BlockManager¶
BlockManager manages the storage for blocks (chunks of data) that can be stored in memory and on disk.

BlockManager runs as part of the driver and executor processes.

BlockManager provides an interface for uploading and fetching blocks both locally and remotely using various stores (i.e. memory, disk, and off-heap).

Cached blocks are blocks with a non-zero sum of memory and disk sizes.
Tip
Use spark-submit's command-line options (i.e. --driver-memory for the driver and --executor-memory for executors) or their equivalents as Spark properties (i.e. spark.driver.memory and spark.executor.memory) to control the memory available for storage.
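For example (a sketch with illustrative values; note that in client deploy mode spark.driver.memory must be set before the driver JVM starts, e.g. via spark-submit or spark-defaults.conf, not programmatically):

```scala
import org.apache.spark.SparkConf

// Illustrative values only
val conf = new SparkConf()
  .set("spark.driver.memory", "2g")
  .set("spark.executor.memory", "4g")
```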
When External Shuffle Service is enabled, BlockManager uses ExternalShuffleClient to read shuffle files (of other executors).
Creating Instance¶
BlockManager takes the following to be created:

- Executor ID
- RpcEnv
- BlockManagerMaster
- SerializerManager
- SparkConf
- MemoryManager
- MapOutputTracker
- ShuffleManager
- BlockTransferService
- SecurityManager
- Optional ExternalBlockStoreClient
When created, BlockManager sets the externalShuffleServiceEnabled internal flag based on the spark.shuffle.service.enabled configuration property.

BlockManager then creates an instance of DiskBlockManager (requesting deleteFilesOnStop when an external shuffle service is not in use).

BlockManager creates a block-manager-future daemon cached thread pool with 128 threads maximum (as futureExecutionContext).

BlockManager calculates the maximum memory to use (as maxMemory) by requesting the maximum on-heap and off-heap storage memory from the assigned MemoryManager.
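That calculation is simply the sum of the two storage regions. A minimal sketch (assuming a memoryManager reference in scope):

```scala
// Sketch: maxMemory is the sum of on-heap and off-heap storage memory
val maxMemory: Long =
  memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
```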
BlockManager calculates the port used by the external shuffle service (as externalShuffleServicePort).

BlockManager creates a client to read other executors' shuffle files (as shuffleClient). If the external shuffle service is used, an ExternalShuffleClient is created; otherwise, the input BlockTransferService is used.

BlockManager sets the maximum number of failures before this block manager refreshes the block locations from the driver (as maxFailuresBeforeLocationRefresh).

BlockManager registers a BlockManagerSlaveEndpoint with the input RpcEnv, itself, and the MapOutputTracker (as slaveEndpoint).

BlockManager is created when SparkEnv is created (for the driver and executors) when a Spark application starts.
Initializing BlockManager¶
```scala
initialize(
  appId: String): Unit
```

initialize requests the BlockTransferService to initialize.

initialize requests the ExternalBlockStoreClient to initialize (if given).

initialize determines the BlockReplicationPolicy based on the spark.storage.replication.policy configuration property and prints out the following INFO message to the logs:

Using [priorityClass] for block replication policy

initialize creates a BlockManagerId and requests the BlockManagerMaster to registerBlockManager (with the BlockManagerId, the local directories of the DiskBlockManager, the maxOnHeapMemory, the maxOffHeapMemory and the slaveEndpoint).

initialize sets the internal BlockManagerId to be the response from the BlockManagerMaster (if available) or the BlockManagerId just created.

initialize initializes the External Shuffle Server's Address when enabled and prints out the following INFO message to the logs (with the externalShuffleServicePort):

external shuffle service port = [externalShuffleServicePort]

(Only for executors with the External Shuffle Service enabled) initialize registers with the External Shuffle Server.

initialize determines the hostLocalDirManager. With the spark.shuffle.readHostLocalDisk configuration property enabled and spark.shuffle.useOldFetchProtocol disabled, initialize uses the ExternalBlockStoreClient to create a HostLocalDirManager (with the spark.storage.localDiskByExecutors.cacheSize configuration property).

In the end, initialize prints out the following INFO message to the logs (with the blockManagerId):

Initialized BlockManager: [blockManagerId]
initialize is used when:

- SparkContext is created (on the driver)
- Executor is created (on an executor)
Registering Executor's BlockManager with External Shuffle Server¶
```scala
registerWithExternalShuffleServer(): Unit
```

registerWithExternalShuffleServer registers the BlockManager (for an executor) with the External Shuffle Service.

registerWithExternalShuffleServer prints out the following INFO message to the logs:

Registering executor with local external shuffle service.

registerWithExternalShuffleServer creates an ExecutorShuffleInfo (with the localDirs and subDirsPerLocalDir of the DiskBlockManager, and the class name of the ShuffleManager).

registerWithExternalShuffleServer uses the spark.shuffle.registration.maxAttempts configuration property and a 5-second sleep time between attempts when requesting the ExternalBlockStoreClient to registerWithShuffleServer (using the BlockManagerId and the ExecutorShuffleInfo).

In case of an exception (while below the maximum number of attempts), registerWithExternalShuffleServer prints out the following ERROR message to the logs and sleeps 5 seconds:

Failed to connect to external shuffle server, will retry [attempts] more times after waiting 5 seconds...
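A minimal sketch of that retry loop (not the actual Spark code; the configuration default and the client and helper values in scope are assumptions):

```scala
// Sketch: retry registration up to maxAttempts times with a 5-second pause in between
val maxAttempts = conf.getInt("spark.shuffle.registration.maxAttempts", 3)
val sleepTimeSecs = 5
var attempt = 0
var registered = false
while (!registered && attempt < maxAttempts) {
  attempt += 1
  try {
    // blockStoreClient, shuffleServerId and shuffleInfo assumed in scope
    blockStoreClient.registerWithShuffleServer(
      shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleInfo)
    registered = true
  } catch {
    case e: Exception if attempt < maxAttempts =>
      logError(s"Failed to connect to external shuffle server, will retry " +
        s"${maxAttempts - attempt} more times after waiting $sleepTimeSecs seconds...", e)
      Thread.sleep(sleepTimeSecs * 1000)
  }
}
```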
BlockManagerId¶
BlockManager uses a BlockManagerId for...FIXME
HostLocalDirManager¶
BlockManager can use a HostLocalDirManager.

Default: (undefined)
BlockReplicationPolicy¶
BlockManager uses a BlockReplicationPolicy for...FIXME
External Shuffle Service's Port¶
BlockManager determines the port of an external shuffle service when created.

The port is used to create the shuffleServerId and a HostLocalDirManager.

The port is also used for preferExecutors.
spark.diskStore.subDirectories Configuration Property¶
BlockManager uses the spark.diskStore.subDirectories configuration property to initialize a subDirsPerLocalDir local value.

subDirsPerLocalDir is used when:

- IndexShuffleBlockResolver is requested to getDataFile and getIndexFile
- BlockManager is requested to readDiskBlockFromSameHostExecutor
Fetching Block or Computing (and Storing) it¶
```scala
getOrElseUpdate[T](
  blockId: BlockId,
  level: StorageLevel,
  classTag: ClassTag[T],
  makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]]
```

Map.getOrElseUpdate

I think it is fair to say that getOrElseUpdate is like getOrElseUpdate of scala.collection.mutable.Map in Scala.

```scala
getOrElseUpdate(key: K, op: ⇒ V): V
```

Quoting the official scaladoc: If given key K is already in this map, getOrElseUpdate returns the associated value V. Otherwise, getOrElseUpdate computes a value V from given expression op, stores it with the key K in the map and returns that value.

Since BlockManager is a key-value store of blocks of data (identified by a block ID), that seems to fit well.
getOrElseUpdate first attempts to get the block by the BlockId (from the local block manager first and, if unavailable, requesting remote peers).

getOrElseUpdate gives the BlockResult of the block if found.

If however the block was not found (in any block manager in a Spark cluster), getOrElseUpdate uses doPutIterator (with the input BlockId, the makeIterator function and the StorageLevel).

getOrElseUpdate branches off per the result.

For None, getOrElseUpdate uses getLocalValues for the BlockId and eventually returns the BlockResult (unless terminated by a SparkException due to some internal error).

For Some(iter), getOrElseUpdate returns an iterator of T values.

getOrElseUpdate is used when:

- RDD is requested to get or compute an RDD partition (for an RDDBlockId with the RDD's id and partition index)
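As an illustration only (the block ID, storage level and compute function below are made up), a caller could use getOrElseUpdate like this:

```scala
import scala.reflect.ClassTag
import org.apache.spark.SparkEnv
import org.apache.spark.storage.{RDDBlockId, StorageLevel}

// Hypothetical compute function standing in for an RDD partition computation
def compute(): Iterator[Int] = Iterator(1, 2, 3)

val blockId = RDDBlockId(rddId = 0, splitIndex = 0)
SparkEnv.get.blockManager.getOrElseUpdate(
    blockId, StorageLevel.MEMORY_ONLY, implicitly[ClassTag[Int]], () => compute()) match {
  case Left(blockResult) => blockResult.data // the block was found or stored successfully
  case Right(iterator)   => iterator         // the block could not be stored; values returned directly
}
```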
Fetching Block from Local or Remote Block Managers¶
```scala
get[T: ClassTag](
  blockId: BlockId): Option[BlockResult]
```

get attempts to get the blockId block from a local block manager first before requesting it from remote block managers.

Internally, get tries to get the block from the local BlockManager. If the block was found, you should see the following INFO message in the logs and get returns the local BlockResult.

Found block [blockId] locally

If however the block was not found locally, get tries to get the block from remote block managers. If retrieved from a remote block manager, you should see the following INFO message in the logs and get returns the remote BlockResult.

Found block [blockId] remotely

In the end, get returns "nothing" (i.e. None) when the blockId block was not found either in the local BlockManager or any remote BlockManager.

get is used when:

- BlockManager is requested to getOrElseUpdate
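The local-then-remote lookup boils down to the following sketch (simplified; the real get also logs where the block was found):

```scala
// Sketch: try the local block manager first, then remote block managers
def get[T: ClassTag](blockId: BlockId): Option[BlockResult] =
  getLocalValues(blockId).orElse(getRemoteValues[T](blockId))
```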
getRemoteValues¶
```scala
getRemoteValues[T: ClassTag](
  blockId: BlockId): Option[BlockResult]
```

getRemoteValues uses getRemoteBlock with a bufferTransformer function that takes a ManagedBuffer and does the following:

- Requests the SerializerManager to deserialize values from an input stream from the ManagedBuffer
- Creates a BlockResult with the values (and their total size, and the Network read method)
Fetching Block Bytes From Remote Block Managers¶
```scala
getRemoteBytes(
  blockId: BlockId): Option[ChunkedByteBuffer]
```

getRemoteBytes uses getRemoteBlock with a bufferTransformer function that takes a ManagedBuffer and creates a ChunkedByteBuffer.

getRemoteBytes is used when:

- TorrentBroadcast is requested to readBlocks
- TaskResultGetter is requested to enqueueSuccessfulTask
Fetching Remote Block¶
```scala
getRemoteBlock[T](
  blockId: BlockId,
  bufferTransformer: ManagedBuffer => T): Option[T]
```

getRemoteBlock is used for getRemoteValues and getRemoteBytes.

getRemoteBlock prints out the following DEBUG message to the logs:

Getting remote block [blockId]

getRemoteBlock requests the BlockManagerMaster for the locations and status of the input BlockId (with the host of this BlockManagerId).

With some locations, getRemoteBlock determines the size of the block (the maximum of diskSize and memSize) and tries to read the block from the local directories of another executor on the same host. getRemoteBlock prints out the following INFO message to the logs:

Read [blockId] from the disk of a same host executor is [successful|failed].

When the block could not be found in any of the local directories, getRemoteBlock uses fetchRemoteManagedBuffer.
For no locations from the BlockManagerMaster, getRemoteBlock prints out the following DEBUG message to the logs:

Block [blockId] is unknown by block manager master
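Putting the pieces together, the fetch path described above is roughly the following (a simplified sketch; the helper names match the sections below):

```scala
// Sketch: resolving a remote block
master.getLocationsAndStatus(blockId, blockManagerId.host) match {
  case Some(locationsAndStatus) =>
    // the block size is the larger of the on-disk and in-memory sizes
    val blockSize = locationsAndStatus.status.diskSize.max(locationsAndStatus.status.memSize)
    locationsAndStatus.localDirs
      .flatMap(dirs => readDiskBlockFromSameHostExecutor(blockId, dirs, blockSize))
      .orElse(fetchRemoteManagedBuffer(blockId, blockSize, locationsAndStatus))
      .map(bufferTransformer)
  case None =>
    None // the block is unknown to the block manager master
}
```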
readDiskBlockFromSameHostExecutor¶
```scala
readDiskBlockFromSameHostExecutor(
  blockId: BlockId,
  localDirs: Array[String],
  blockSize: Long): Option[ManagedBuffer]
```

readDiskBlockFromSameHostExecutor...FIXME
fetchRemoteManagedBuffer¶
```scala
fetchRemoteManagedBuffer(
  blockId: BlockId,
  blockSize: Long,
  locationsAndStatus: BlockManagerMessages.BlockLocationsAndStatus): Option[ManagedBuffer]
```

fetchRemoteManagedBuffer...FIXME
sortLocations¶
```scala
sortLocations(
  locations: Seq[BlockManagerId]): Seq[BlockManagerId]
```

sortLocations...FIXME
preferExecutors¶
```scala
preferExecutors(
  locations: Seq[BlockManagerId]): Seq[BlockManagerId]
```

preferExecutors...FIXME
ExecutionContextExecutorService¶
BlockManager uses a Scala ExecutionContextExecutorService to execute FIXME asynchronously (on a thread pool with the block-manager-future prefix and a maximum of 128 threads).
BlockEvictionHandler¶
BlockManager is a BlockEvictionHandler that can drop a block from memory (and store it on disk when necessary).
ShuffleClient and External Shuffle Service¶
BlockManager manages the lifecycle of a ShuffleClient:

- Creates it when created
- Inits it (and possibly registers with an external shuffle server) when requested to initialize
- Closes it when requested to stop

The ShuffleClient can be an ExternalShuffleClient or the given BlockTransferService based on the spark.shuffle.service.enabled configuration property. When enabled, BlockManager uses the ExternalShuffleClient.

The ShuffleClient is available to other Spark services (using the shuffleClient value) and is used when BlockStoreShuffleReader is requested to read combined key-value records for a reduce task.

When requested for shuffle metrics, BlockManager simply requests them from the ShuffleClient.
BlockManager and RpcEnv¶
BlockManager is given a RpcEnv when created.

The RpcEnv is used to set up a BlockManagerSlaveEndpoint.
BlockInfoManager¶
BlockManager creates a BlockInfoManager when created.

BlockManager requests the BlockInfoManager to clear when requested to stop.

BlockManager uses the BlockInfoManager to create a MemoryStore.

BlockManager uses the BlockInfoManager when requested for the following:

- removeRdd, removeBroadcast, removeBlock, removeBlockInternal
- downgradeLock, releaseLock, registerTask, releaseAllLocksForTask
BlockManager and BlockManagerMaster¶
BlockManager is given a BlockManagerMaster when created.
BlockManager as BlockDataManager¶
BlockManager is a BlockDataManager.
BlockManager and MapOutputTracker¶
BlockManager is given a MapOutputTracker when created.
Executor ID¶
BlockManager is given an Executor ID when created.

The Executor ID is one of the following:

- driver (SparkContext.DRIVER_IDENTIFIER) for the driver
- The value of the --executor-id command-line argument for CoarseGrainedExecutorBackend executors
BlockManagerEndpoint RPC Endpoint¶
BlockManager requests the RpcEnv to register a BlockManagerSlaveEndpoint under the name BlockManagerEndpoint[ID].

The RPC endpoint is used when BlockManager is requested to initialize and reregister (to register the BlockManager on an executor with the BlockManagerMaster on the driver).

The endpoint is stopped (by requesting the RpcEnv to stop the reference) when BlockManager is requested to stop.
Accessing BlockManager¶
BlockManager is available using SparkEnv on the driver and executors.

```scala
import org.apache.spark.SparkEnv
val bm = SparkEnv.get.blockManager

scala> :type bm
org.apache.spark.storage.BlockManager
```
BlockTransferService¶
BlockManager is given a BlockTransferService when created.

The BlockTransferService is used as the ShuffleClient when BlockManager is configured with no external shuffle service (based on the spark.shuffle.service.enabled configuration property).

The BlockTransferService is initialized when BlockManager is.

The BlockTransferService is closed when BlockManager is requested to stop.

The BlockTransferService is used when BlockManager is requested to fetch a block from or replicate a block to remote block managers.
MemoryManager¶
BlockManager is given a MemoryManager when created.

BlockManager uses the MemoryManager for the following:

- Creating the MemoryStore (that is then assigned to the MemoryManager as a "circular dependency")
- Initializing maxOnHeapMemory and maxOffHeapMemory (for reporting)
ShuffleManager¶
BlockManager is given a ShuffleManager when created.

BlockManager uses the ShuffleManager for the following:

- Retrieving a block data (for shuffle blocks)
- Retrieving a non-shuffle block data (for shuffle blocks anyway)
- Registering an executor with a local external shuffle service (when initialized on an executor with externalShuffleServiceEnabled)
DiskBlockManager¶
BlockManager creates a DiskBlockManager when created.

BlockManager uses the DiskBlockManager for the following:

- Creating a DiskStore
- Registering an executor with a local external shuffle service (when initialized on an executor with externalShuffleServiceEnabled)

The DiskBlockManager is available as the diskBlockManager reference to other Spark systems.

```scala
import org.apache.spark.SparkEnv
SparkEnv.get.blockManager.diskBlockManager
```
MemoryStore¶
BlockManager creates a MemoryStore when created (with the BlockInfoManager, the SerializerManager, the MemoryManager and itself as a BlockEvictionHandler).

BlockManager requests the MemoryManager to use the MemoryStore.

BlockManager uses the MemoryStore for the following:

The MemoryStore is requested to clear when BlockManager is requested to stop.

The MemoryStore is available as the memoryStore private reference to other Spark services.

```scala
import org.apache.spark.SparkEnv
SparkEnv.get.blockManager.memoryStore
```

The MemoryStore is used (via the SparkEnv.get.blockManager.memoryStore reference) when Task is requested to run (and, once finished, requests the MemoryStore to releaseUnrollMemoryForThisTask).
DiskStore¶
BlockManager creates a DiskStore (with the DiskBlockManager) when created.
BlockManager uses the DiskStore when requested to getStatus, getCurrentBlockStatus, getLocalValues, doGetLocalBytes, doPutBytes, doPutIterator, dropFromMemory, removeBlockInternal.
Performance Metrics¶
BlockManager uses BlockManagerSource to report metrics under the name BlockManager.
getPeers¶
```scala
getPeers(
  forceFetch: Boolean): Seq[BlockManagerId]
```

getPeers...FIXME

getPeers is used when BlockManager is requested to replicateBlock and replicate.
Releasing All Locks For Task¶
```scala
releaseAllLocksForTask(
  taskAttemptId: Long): Seq[BlockId]
```

releaseAllLocksForTask...FIXME

releaseAllLocksForTask is used when TaskRunner is requested to run (at the end of a task).
Stopping BlockManager¶
```scala
stop(): Unit
```

stop...FIXME

stop is used when SparkEnv is requested to stop.
Getting IDs of Existing Blocks (For a Given Filter)¶
```scala
getMatchingBlockIds(
  filter: BlockId => Boolean): Seq[BlockId]
```

getMatchingBlockIds...FIXME

getMatchingBlockIds is used when BlockManagerSlaveEndpoint is requested to handle a GetMatchingBlockIds message.
Getting Local Block¶
```scala
getLocalValues(
  blockId: BlockId): Option[BlockResult]
```

getLocalValues prints out the following DEBUG message to the logs:

Getting local block [blockId]

getLocalValues obtains a read lock for blockId.

When no blockId block was found, you should see the following DEBUG message in the logs and getLocalValues returns "nothing" (i.e. None).

Block [blockId] was not found

When the blockId block was found, you should see the following DEBUG message in the logs:

Level for block [blockId] is [level]

If the blockId block uses a memory storage level and is registered in the MemoryStore, getLocalValues returns a BlockResult (with the Memory read method and a CompletionIterator over one of the following iterators):

- The values iterator from the MemoryStore for blockId for "deserialized" persistence levels
- The iterator from the SerializerManager after the data stream has been deserialized for the blockId block and the bytes for the blockId block for "serialized" persistence levels
getLocalValues is used when:

- TorrentBroadcast is requested to readBroadcastBlock
- BlockManager is requested to get and getOrElseUpdate
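The two memory-read paths can be sketched as follows (simplified, with assumed method signatures for the MemoryStore and SerializerManager):

```scala
// Sketch: reading a memory-level block as values
val values: Iterator[_] =
  if (level.deserialized) {
    memoryStore.getValues(blockId).get // stored as objects; hand them back as-is
  } else {
    serializerManager.dataDeserializeStream( // stored as bytes; deserialize while reading
      blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
  }
```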
maybeCacheDiskValuesInMemory¶
```scala
maybeCacheDiskValuesInMemory[T](
  blockInfo: BlockInfo,
  blockId: BlockId,
  level: StorageLevel,
  diskIterator: Iterator[T]): Iterator[T]
```

maybeCacheDiskValuesInMemory...FIXME

maybeCacheDiskValuesInMemory is used when BlockManager is requested to getLocalValues.
Retrieving Block Data¶
```scala
getBlockData(
  blockId: BlockId): ManagedBuffer
```

getBlockData is part of the BlockDataManager abstraction.

For a BlockId of a shuffle (a ShuffleBlockId), getBlockData requests the ShuffleManager for the ShuffleBlockResolver that is then requested for the block data.

Otherwise, getBlockData uses getLocalBytes for the given BlockId.

If found, getBlockData creates a new BlockManagerManagedBuffer (with the BlockInfoManager, the input BlockId, the retrieved BlockData and the dispose flag enabled).

If not found, getBlockData reports the block status to the driver (so the master no longer assumes the block is available on this executor) and throws a BlockNotFoundException.

NOTE: getBlockData is executed for shuffle blocks or local blocks that the BlockManagerMaster knows this executor really has (unless BlockManagerMaster is outdated).
Retrieving Non-Shuffle Local Block Data¶
```scala
getLocalBytes(
  blockId: BlockId): Option[BlockData]
```

getLocalBytes...FIXME

getLocalBytes is used when:

- TorrentBroadcast is requested to readBlocks
- BlockManager is requested for the block data (of a non-shuffle block)
removeBlockInternal¶
```scala
removeBlockInternal(
  blockId: BlockId,
  tellMaster: Boolean): Unit
```

removeBlockInternal...FIXME

removeBlockInternal is used when BlockManager is requested to doPut and removeBlock.
Stores¶
A Store is the place where blocks are held.
There are the following possible stores:
- MemoryStore for memory storage level.
- DiskStore for disk storage level.
- ExternalBlockStore for OFF_HEAP storage level.
Storing Block Data Locally¶
```scala
putBlockData(
  blockId: BlockId,
  data: ManagedBuffer,
  level: StorageLevel,
  classTag: ClassTag[_]): Boolean
```

putBlockData simply stores blockId locally (with the given storage level).

putBlockData is part of the BlockDataManager abstraction.

Internally, putBlockData wraps a ChunkedByteBuffer around the data buffer's NIO ByteBuffer and calls putBytes.
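The delegation is essentially a one-liner; a sketch (mirroring the description above):

```scala
import scala.reflect.ClassTag
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.util.io.ChunkedByteBuffer

// Sketch: putBlockData re-wraps the buffer and delegates to putBytes
def putBlockData(
    blockId: BlockId,
    data: ManagedBuffer,
    level: StorageLevel,
    classTag: ClassTag[_]): Boolean = {
  putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)(classTag)
}
```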
Storing Block Bytes Locally¶
```scala
putBytes(
  blockId: BlockId,
  bytes: ChunkedByteBuffer,
  level: StorageLevel,
  tellMaster: Boolean = true): Boolean
```

putBytes makes sure that the bytes are not null and then doPutBytes.

putBytes is used when:

- BlockManager is requested to putBlockData
- TaskRunner is requested to run (and the result size is above maxDirectResultSize)
- TorrentBroadcast is requested to writeBlocks and readBlocks
doPutBytes¶
```scala
doPutBytes[T](
  blockId: BlockId,
  bytes: ChunkedByteBuffer,
  level: StorageLevel,
  classTag: ClassTag[T],
  tellMaster: Boolean = true,
  keepReadLock: Boolean = false): Boolean
```

doPutBytes calls the internal helper doPut with a putBody function that accepts a BlockInfo and does the uploading.

Inside the function, if the storage level's replication is greater than 1, replication of the blockId block starts immediately on a separate thread (from the futureExecutionContext thread pool). The replication uses the input bytes and the level storage level.

For a memory storage level, the function checks whether the storage level is deserialized or not. For a deserialized storage level, BlockManager's SerializerManager deserializes bytes into an iterator of values that the MemoryStore stores. If however the storage level is not deserialized, the function requests the MemoryStore to store the bytes.

If the put did not succeed and the storage level is to use disk, you should see the following WARN message in the logs:

Persisting block [blockId] to disk instead.

And the DiskStore stores the bytes.

NOTE: DiskStore is requested to store the bytes of a block with memory and disk storage level only when MemoryStore has failed.

If the storage level is to use disk only, the DiskStore stores the bytes.

doPutBytes requests the current block status and, if the block was successfully stored and the driver should know about it (tellMaster), reports the current storage status of the block to the driver. The current TaskContext metrics are updated with the updated block status (only when executed inside a task where TaskContext is available).

You should see the following DEBUG message in the logs:

Put block [blockId] locally took [time] ms

The function waits till the earlier asynchronous replication finishes for a block with replication level greater than 1.

The final result of doPutBytes is whether the block was stored successfully or not (as computed earlier).

NOTE: doPutBytes is used exclusively when BlockManager is requested to putBytes.
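The branching above can be sketched like this (simplified and with assumed method signatures; not the actual doPutBytes body):

```scala
// Sketch: where the bytes of a block end up, per storage level
if (level.useMemory) {
  val storedInMemory =
    if (level.deserialized) {
      // deserialize the bytes and store them as values
      val values = serializerManager.dataDeserializeStream(
        blockId, bytes.toInputStream())(classTag)
      memoryStore.putIteratorAsValues(blockId, values, classTag).isRight
    } else {
      // store the serialized bytes as-is
      memoryStore.putBytes(blockId, bytes.size, level.memoryMode, () => bytes)
    }
  if (!storedInMemory && level.useDisk) {
    logWarning(s"Persisting block $blockId to disk instead.")
    diskStore.putBytes(blockId, bytes)
  }
} else if (level.useDisk) {
  diskStore.putBytes(blockId, bytes)
}
```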
doPut¶
```scala
doPut[T](
  blockId: BlockId,
  level: StorageLevel,
  classTag: ClassTag[_],
  tellMaster: Boolean,
  keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T]
```

doPut executes the input putBody function with a new BlockInfo object (with the level storage level) that the BlockInfoManager managed to create a write lock for.

If the block has already been created (and the BlockInfoManager did not manage to create a write lock for it), the following WARN message is printed out to the logs:

Block [blockId] already exists on this machine; not re-adding it

doPut releases the read lock for the block when the keepReadLock flag is disabled and returns None immediately.

If however the write lock has been given, doPut executes putBody.

If the result of putBody is None the block is considered saved successfully.

For a successful save and keepReadLock enabled, the BlockInfoManager is requested to downgrade the exclusive write lock for blockId to a shared read lock.

For a successful save and keepReadLock disabled, the BlockInfoManager is requested to release the lock on blockId.

For an unsuccessful save, the block is removed (removeBlockInternal) and the following WARN message is printed out to the logs:

Putting block [blockId] failed

In the end, doPut prints out the following DEBUG message to the logs:

Putting block [blockId] [withOrWithout] replication took [usedTime] ms

doPut is used when BlockManager is requested to doPutBytes and doPutIterator.
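A simplified sketch of that lock lifecycle (names follow the sections on this page; not the actual doPut body):

```scala
// Sketch: run putBody under the write lock and settle the lock afterwards
val result = putBody(blockInfo)
if (result.isEmpty) {
  // the block was saved successfully
  if (keepReadLock) blockInfoManager.downgradeLock(blockId) // write lock -> read lock
  else blockInfoManager.unlock(blockId)                     // release the lock
} else {
  // the save failed: clean up and warn
  removeBlockInternal(blockId, tellMaster = false)
  logWarning(s"Putting block $blockId failed")
}
```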
Removing Block From Memory and Disk¶
```scala
removeBlock(
  blockId: BlockId,
  tellMaster: Boolean = true): Unit
```

removeBlock removes the blockId block from the MemoryStore and DiskStore.

When executed, it prints out the following DEBUG message to the logs:

Removing block [blockId]

It requests the BlockInfoManager for a write lock for the blockId block. If it receives none, it prints out the following WARN message to the logs and quits.

Asked to remove block [blockId], which does not exist

Otherwise, with a write lock for the block, the block is removed from the MemoryStore and DiskStore.

If both removals fail, it prints out the following WARN message:

Block [blockId] could not be removed as it was not found in either the disk, memory, or external block store

The block is removed from the BlockInfoManager.

removeBlock then reports the block status to the driver (only when the input tellMaster and the info's tellMaster are both enabled, i.e. true) and the current TaskContext metrics are updated with the change.

removeBlock is used when:

- BlockManager is requested to handleLocalReadFailure, removeRdd and removeBroadcast
- BlockManagerSlaveEndpoint is requested to handle a RemoveBlock message
Removing RDD Blocks¶
```scala
removeRdd(rddId: Int): Int
```

removeRdd removes all the blocks that belong to the rddId RDD.

It prints out the following INFO message to the logs:

Removing RDD [rddId]

It then requests the RDD blocks from the BlockInfoManager and removes them.

The number of blocks removed is the final result.

NOTE: removeRdd is used by BlockManagerSlaveEndpoint while handling RemoveRdd messages.
Removing All Blocks of Broadcast Variable¶
```scala
removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int
```

removeBroadcast removes all the blocks of the input broadcastId broadcast.

Internally, it starts by printing out the following DEBUG message to the logs:

Removing broadcast [broadcastId]

It then requests all the BroadcastBlockId objects that belong to the broadcastId broadcast from the BlockInfoManager and removes them.

The number of blocks removed is the final result.

NOTE: removeBroadcast is used by BlockManagerSlaveEndpoint while handling RemoveBroadcast messages.
External Shuffle Server's Address¶
```scala
shuffleServerId: BlockManagerId
```

When requested to initialize, BlockManager records the location (BlockManagerId) of the External Shuffle Service if enabled or simply uses the non-external-shuffle-service BlockManagerId.

The BlockManagerId is used to register an executor with a local external shuffle service.

The BlockManagerId is used as the location of a shuffle map output when:

- BypassMergeSortShuffleWriter is requested to write partition records to a shuffle file
- UnsafeShuffleWriter is requested to close and write output
- SortShuffleWriter is requested to write output
getStatus¶
```scala
getStatus(
  blockId: BlockId): Option[BlockStatus]
```

getStatus...FIXME

getStatus is used when BlockManagerSlaveEndpoint is requested to handle a GetBlockStatus message.
Re-registering BlockManager with Driver and Reporting Blocks¶
```scala
reregister(): Unit
```

When executed, reregister prints the following INFO message to the logs:

BlockManager [blockManagerId] re-registering with master

reregister then registers itself with the driver's BlockManagerMaster (just as it was when BlockManager was initializing). It passes the BlockManagerId, the maximum memory (as maxMemory), and the BlockManagerSlaveEndpoint.

reregister will then report all the local blocks to the BlockManagerMaster.

You should see the following INFO message in the logs:

Reporting [blockInfoManager.size] blocks to the master.

For each block metadata (in the BlockInfoManager) it gets the block's current status and tries to send it to the BlockManagerMaster.

If there is an issue communicating with the BlockManagerMaster, you should see the following ERROR message in the logs:

Failed to report [blockId] to master; giving up.

After the ERROR message, reregister stops reporting.

reregister is used when an Executor was informed to re-register while sending heartbeats.
reportAllBlocks¶
```scala
reportAllBlocks(): Unit
```

reportAllBlocks...FIXME
Calculate Current Block Status¶
```scala
getCurrentBlockStatus(
  blockId: BlockId,
  info: BlockInfo): BlockStatus
```

getCurrentBlockStatus gives the current BlockStatus of the BlockId block (with the block's current StorageLevel, memory and disk sizes). It uses the MemoryStore and DiskStore for size and other information.

NOTE: Most of the information to build BlockStatus is already in BlockInfo except that it may not necessarily reflect the current state per the MemoryStore and DiskStore.

Internally, it uses the input BlockInfo to know about the block's storage level. If the storage level is not set (i.e. null), the returned BlockStatus assumes the default NONE storage level and memory and disk sizes of 0.

If however the storage level is set, getCurrentBlockStatus uses the MemoryStore and DiskStore to check whether the block is stored in the storages or not and requests their sizes in the storages respectively (using their getSize, or assumes 0).

NOTE: It is acceptable that the BlockInfo says to use memory or disk yet the block is not in the storages (yet or anymore). The method gives the current status.
getCurrentBlockStatus is used when BlockManager is requested to report block statuses (reportAllBlocks), doPutBytes, doPutIterator, dropFromMemory and removeBlockInternal.
Reporting Current Storage Status of Block to Driver¶
```scala
reportBlockStatus(
  blockId: BlockId,
  info: BlockInfo,
  status: BlockStatus,
  droppedMemorySize: Long = 0L): Unit
```

reportBlockStatus reports the current status of a block to the driver (using tryToReportBlockStatus) and, if told to re-register, prints out the following INFO message to the logs:

Got told to re-register updating block [blockId]

It then does an asynchronous re-registration (using asyncReregister).

In either case, it prints out the following DEBUG message to the logs:

Told master about block [blockId]
reportBlockStatus is used when BlockManager is requested to getBlockData, doPutBytes, doPutIterator, dropFromMemory and removeBlockInternal.
Reporting Block Status Update to Driver¶
```scala
tryToReportBlockStatus(
  blockId: BlockId,
  info: BlockInfo,
  status: BlockStatus,
  droppedMemorySize: Long = 0L): Boolean
```

tryToReportBlockStatus reports a block status update to the BlockManagerMaster and returns its response.

tryToReportBlockStatus is used when BlockManager is requested to reportAllBlocks or reportBlockStatus.
Execution Context¶
block-manager-future is the execution context for...FIXME
ByteBuffer¶
The underlying abstraction for blocks in Spark is a ByteBuffer that limits the size of a block to 2GB (Integer.MAX_VALUE; see Why does FileChannel.map take up to Integer.MAX_VALUE of data? and SPARK-1476 2GB limit in spark for blocks). This has implications not just for managed blocks in use, but also for shuffle blocks (memory-mapped blocks are limited to 2GB, even though the API allows for long) and for ser-deser via byte array-backed output streams.
BlockResult¶
BlockResult is a description of a fetched block with the readMethod and bytes.
Registering Task¶
```scala
registerTask(
  taskAttemptId: Long): Unit
```

registerTask requests the BlockInfoManager to register a given task.

registerTask is used when Task is requested to run (at the start of a task).
Creating DiskBlockObjectWriter¶
```scala
getDiskWriter(
  blockId: BlockId,
  file: File,
  serializerInstance: SerializerInstance,
  bufferSize: Int,
  writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter
```

getDiskWriter creates a DiskBlockObjectWriter (with the spark.shuffle.sync configuration property for the syncWrites argument).

getDiskWriter uses the SerializerManager.

getDiskWriter is used when:

- BypassMergeSortShuffleWriter is requested to write records (of a partition)
- ShuffleExternalSorter is requested to writeSortedFile
- ExternalAppendOnlyMap is requested to spillMemoryIteratorToDisk
- ExternalSorter is requested to spillMemoryIteratorToDisk and writePartitionedFile
- UnsafeSorterSpillWriter is created
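As an illustration (a sketch only; the serializer instance and write metrics are assumed to be in scope), a spilling collection could obtain a writer for a temporary block like this:

```scala
// Sketch: create a temporary local block and a disk writer for it
val (tempBlockId, file) = blockManager.diskBlockManager.createTempLocalBlock()
val writer = blockManager.getDiskWriter(
  tempBlockId,
  file,
  serializerInstance, // a SerializerInstance, assumed in scope
  32 * 1024,          // buffer size in bytes (illustrative)
  writeMetrics)       // a ShuffleWriteMetrics, assumed in scope
```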
Recording Updated BlockStatus In Current Task's TaskMetrics¶
```scala
addUpdatedBlockStatusToTaskMetrics(
  blockId: BlockId,
  status: BlockStatus): Unit
```

addUpdatedBlockStatusToTaskMetrics takes an active TaskContext (if available) and records the updated BlockStatus for the block (in the task's TaskMetrics).

addUpdatedBlockStatusToTaskMetrics is used when BlockManager doPutBytes (for a block that was successfully stored), doPut, doPutIterator, removes a block from memory (possibly spilling it to disk) and removes a block from memory and disk.
Requesting Shuffle-Related Spark Metrics Source¶
```scala
shuffleMetricsSource: Source
```

shuffleMetricsSource requests the ShuffleClient for the shuffle metrics and creates a ShuffleMetricsSource with the source name based on the spark.shuffle.service.enabled configuration property:

- ExternalShuffle when spark.shuffle.service.enabled is on (true)
- NettyBlockTransfer when spark.shuffle.service.enabled is off (false)

shuffleMetricsSource is used when Executor is created (for non-local / cluster modes).
Replicating Block To Peers¶
```scala
replicate(
  blockId: BlockId,
  data: BlockData,
  level: StorageLevel,
  classTag: ClassTag[_],
  existingReplicas: Set[BlockManagerId] = Set.empty): Unit
```

replicate...FIXME

replicate is used when BlockManager is requested to doPutBytes, doPutIterator and replicateBlock.
replicateBlock¶
```scala
replicateBlock(
  blockId: BlockId,
  existingReplicas: Set[BlockManagerId],
  maxReplicas: Int): Unit
```

replicateBlock...FIXME

replicateBlock is used when BlockManagerSlaveEndpoint is requested to handle a ReplicateBlock message.
putIterator¶
```scala
putIterator[T: ClassTag](
  blockId: BlockId,
  values: Iterator[T],
  level: StorageLevel,
  tellMaster: Boolean = true): Boolean
```

putIterator...FIXME

putIterator is used when:

- BlockManager is requested to putSingle
putSingle Method¶
```scala
putSingle[T: ClassTag](
  blockId: BlockId,
  value: T,
  level: StorageLevel,
  tellMaster: Boolean = true): Boolean
```

putSingle...FIXME

putSingle is used when TorrentBroadcast is requested to write the blocks and readBroadcastBlock.
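For illustration only (the block ID and value below are made up), putSingle stores one object as a single block:

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}

// Sketch: keep a single value as one local block (as a broadcast implementation might)
val stored: Boolean = SparkEnv.get.blockManager.putSingle(
  BroadcastBlockId(0), "hello", StorageLevel.MEMORY_AND_DISK, tellMaster = false)
```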
doPutIterator¶
```scala
doPutIterator[T](
  blockId: BlockId,
  iterator: () => Iterator[T],
  level: StorageLevel,
  classTag: ClassTag[T],
  tellMaster: Boolean = true,
  keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]]
```

doPutIterator simply doPut with a putBody function that accepts a BlockInfo and does the following:

1. putBody branches off per whether the StorageLevel indicates to use memory or simply a disk:

   - When the input StorageLevel indicates to use memory for storage in deserialized format, putBody requests the MemoryStore to putIteratorAsValues (for the BlockId and with the iterator factory function). If the MemoryStore returned a correct value, the internal size is set to the value. If however the MemoryStore failed to give a correct value, FIXME
   - When the input StorageLevel indicates to use memory for storage in serialized format, putBody...FIXME
   - When the input StorageLevel does not indicate to use memory for storage but disk instead, putBody...FIXME

2. putBody requests the current block status.

3. Only when the block was successfully stored in either the memory or disk store:

   - putBody reports the block status to the BlockManagerMaster when the input tellMaster flag (default: enabled) and the tellMaster flag of the block info are both enabled
   - putBody addUpdatedBlockStatusToTaskMetrics (with the BlockId and BlockStatus)
   - putBody prints out the following DEBUG message to the logs: Put block [blockId] locally took [time] ms
   - When the input StorageLevel indicates to use replication, putBody doGetLocalBytes followed by replicate (with the input BlockId and the StorageLevel as well as the BlockData to replicate)
   - With a successful replication, putBody prints out the following DEBUG message to the logs: Put block [blockId] remotely took [time] ms

4. In the end, putBody may or may not give a PartiallyUnrolledIterator if...FIXME

NOTE: doPutIterator is used when BlockManager is requested to getOrElseUpdate and putIterator.
Dropping Block from Memory¶
```scala
dropFromMemory(
  blockId: BlockId,
  data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel
```

dropFromMemory prints out the following INFO message to the logs:

Dropping block [blockId] from memory

dropFromMemory then asserts that the given block is locked for writing.

If the block's StorageLevel uses disks and the internal DiskStore object (diskStore) does not contain the block, it is saved then. You should see the following INFO message in the logs:

Writing block [blockId] to disk

CAUTION: FIXME Describe the case with saving a block to disk.

The block's memory size is fetched and recorded (using MemoryStore.getSize).

The block is removed from memory if it exists there. If not, you should see the following WARN message in the logs:

Block [blockId] could not be dropped from memory as it does not exist

It then reports the current storage status of the block to the driver (if info.tellMaster).

CAUTION: FIXME When would info.tellMaster be true?

A block is considered updated when it was written to disk or removed from memory or both. If either happened, the current TaskContext metrics are updated with the change.

In the end, dropFromMemory returns the current storage level of the block.

dropFromMemory is part of the BlockEvictionHandler abstraction.
releaseLock Method¶
```scala
releaseLock(
  blockId: BlockId,
  taskAttemptId: Option[Long] = None): Unit
```
releaseLock requests the BlockInfoManager to unlock the given block.
releaseLock is part of the BlockDataManager abstraction.
putBlockDataAsStream¶
```scala
putBlockDataAsStream(
  blockId: BlockId,
  level: StorageLevel,
  classTag: ClassTag[_]): StreamCallbackWithID
```

putBlockDataAsStream...FIXME

putBlockDataAsStream is part of the BlockDataManager abstraction.
Maximum Memory¶
Total maximum value that BlockManager can ever possibly use (that depends on the MemoryManager).

Total available on-heap and off-heap memory for storage (in bytes).
Maximum Off-Heap Memory¶
Maximum On-Heap Memory¶
Logging¶
Enable ALL logging level for the org.apache.spark.storage.BlockManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

```text
log4j.logger.org.apache.spark.storage.BlockManager=ALL
```
Refer to Logging.