DiskBlockManager¶
DiskBlockManager manages the mapping of logical blocks to their physical on-disk locations for a BlockManager.
By default, one block is mapped to one file whose name is given by its BlockId. It is however possible for a block to map to only a segment of a file.
Block files are hashed among the local directories.
DiskBlockManager
is used to create a DiskStore.
Creating Instance¶
DiskBlockManager
takes the following to be created:
- SparkConf
- deleteFilesOnStop flag
When created, DiskBlockManager creates one or more local directories to store block data and initializes the internal subDirs collection of locks for every local directory.
In the end, DiskBlockManager
registers a shutdown hook to clean up the local directories for blocks.
DiskBlockManager
is created for BlockManager.
Local Directories for Blocks¶
localDirs: Array[File]
While being created, DiskBlockManager creates local directories for block data. DiskBlockManager expects at least one local directory; otherwise, it prints out the following ERROR message to the logs and exits the JVM (with exit code 53):
Failed to create any local dir.
localDirs is used when:
- DiskBlockManager is requested to getFile, initialize the subDirs internal registry, and to doStop
- BlockManager is requested to register with an external shuffle server
Creating Local Directories¶
createLocalDirs(
conf: SparkConf): Array[File]
createLocalDirs creates a blockmgr-[random UUID] directory under every local directory to store block data.
Internally, createLocalDirs
finds the configured local directories where Spark can write files and creates a subdirectory blockmgr-[UUID]
under every configured parent directory.
For every local directory, createLocalDirs
prints out the following INFO message to the logs:
Created local directory at [localDir]
In case of an exception, createLocalDirs
prints out the following ERROR message to the logs and skips the directory:
Failed to create local dir in [rootDir]. Ignoring this directory.
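The following is a minimal standalone sketch of the steps above, assuming plain java.io and println in place of Spark's Utils helpers and logging framework:

```scala
import java.io.{File, IOException}
import java.util.UUID

// Sketch: for every configured root directory, try to create a blockmgr-[random UUID]
// subdirectory; directories that cannot be created are reported and skipped.
def createLocalDirs(rootDirs: Seq[String]): Array[File] =
  rootDirs.flatMap { rootDir =>
    try {
      val localDir = new File(rootDir, s"blockmgr-${UUID.randomUUID()}")
      if (!localDir.mkdirs()) {
        throw new IOException(s"Failed to create directory $localDir")
      }
      println(s"Created local directory at $localDir") // INFO in the real logs
      Some(localDir)
    } catch {
      case _: IOException =>
        println(s"Failed to create local dir in $rootDir. Ignoring this directory.") // ERROR in the real logs
        None
    }
  }.toArray
```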
File Locks for Local Block Store Directories¶
subDirs: Array[Array[File]]
subDirs
is a lookup table for file locks of every local block directory (with the first dimension for local directories and the second for locks).
The number of block subdirectories is controlled by spark.diskStore.subDirectories configuration property.
subDirs(dirId)(subDirId) is used to access the subDirId subdirectory of the dirId local directory.
subDirs
is used when DiskBlockManager
is requested for a block file or all block files.
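A rough sketch of how the lookup table can be laid out and accessed follows; the directory path is a hypothetical example and 64 is the default of spark.diskStore.subDirectories:

```scala
import java.io.File

// Sketch of the subDirs layout: one inner array per local directory, each with
// spark.diskStore.subDirectories slots that are lazily filled with subdirectory
// File handles. The inner arrays double as the locks guarding those slots.
val localDirs: Array[File] = Array(new File("/tmp/blockmgr-demo")) // hypothetical local directory
val subDirsPerLocalDir = 64                                        // spark.diskStore.subDirectories

val subDirs: Array[Array[File]] =
  Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

// Access a slot under the per-directory lock
def lookup(dirId: Int, subDirId: Int): Option[File] =
  subDirs(dirId).synchronized {
    Option(subDirs(dirId)(subDirId))
  }
```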
Finding Block File (and Creating Parent Directories)¶
getFile(
blockId: BlockId): File
getFile(
filename: String): File
getFile computes a hash of the block file name and uses it to select the parent (local) directory and the subdirectory within it.
getFile
creates the subdirectory unless it already exists.
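The directory selection can be sketched as follows, assuming nonNegativeHash behaves like Spark's Utils.nonNegativeHash and subdirectories are named by a two-digit hex id:

```scala
import java.io.File

// Non-negative variant of String.hashCode (as in Spark's Utils helper)
def nonNegativeHash(s: String): Int = {
  val h = s.hashCode
  if (h != Int.MinValue) math.abs(h) else 0
}

// Sketch of the hash-based lookup in getFile
def getFile(localDirs: Array[File], subDirsPerLocalDir: Int, filename: String): File = {
  val hash = nonNegativeHash(filename)
  val dirId = hash % localDirs.length                           // which local directory
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir // which subdirectory in it
  val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
  if (!subDir.exists()) subDir.mkdirs()                         // create the subdirectory unless it already exists
  new File(subDir, filename)
}
```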
getFile
is used when:
- DiskBlockManager is requested to containsBlock, createTempLocalBlock, createTempShuffleBlock
- DiskStore is requested to getBytes, remove, contains, and put
- IndexShuffleBlockResolver is requested to getDataFile and getIndexFile
createTempShuffleBlock¶
createTempShuffleBlock(): (TempShuffleBlockId, File)
createTempShuffleBlock creates a temporary block with a TempShuffleBlockId and returns the block ID together with its file.
createTempShuffleBlock
...FIXME
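Until the FIXME above is filled in, the following hedged sketch illustrates the general idea; the retry-until-unused-name loop and the temp_shuffle_ name prefix are assumptions for illustration:

```scala
import java.io.File
import java.util.UUID

// Assumed shape of a temporary shuffle block ID wrapping a random UUID
case class TempShuffleBlockId(id: UUID) {
  def name: String = s"temp_shuffle_$id"
}

// Sketch: keep generating random IDs until one maps to a file that does not exist yet
def createTempShuffleBlock(getFile: String => File): (TempShuffleBlockId, File) = {
  var blockId = TempShuffleBlockId(UUID.randomUUID())
  while (getFile(blockId.name).exists()) {
    blockId = TempShuffleBlockId(UUID.randomUUID())
  }
  (blockId, getFile(blockId.name))
}
```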
Registering Shutdown Hook¶
addShutdownHook(): AnyRef
addShutdownHook
registers a shutdown hook to execute doStop at shutdown.
When executed, addShutdownHook prints out the following DEBUG message to the logs:
Adding shutdown hook
The shutdown hook itself, when triggered at shutdown, prints out the following INFO message to the logs and executes doStop:
Shutdown hook called
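A minimal sketch of the registration, assuming a plain JVM shutdown hook (the real code registers through Spark's ShutdownHookManager instead):

```scala
// Sketch: register a hook that runs doStop at JVM shutdown
def addShutdownHook(doStop: () => Unit): AnyRef = {
  println("Adding shutdown hook")     // DEBUG in the real logs
  val hook = new Thread(() => {
    println("Shutdown hook called")   // INFO in the real logs
    doStop()
  })
  Runtime.getRuntime.addShutdownHook(hook)
  hook
}
```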
Getting Local Directories for Spark to Write Files¶
getConfiguredLocalDirs(
conf: SparkConf): Array[String]
getConfiguredLocalDirs
returns the local directories where Spark can write files.
Internally, getConfiguredLocalDirs
uses conf
SparkConf to know if External Shuffle Service is enabled (based on spark.shuffle.service.enabled configuration property).
getConfiguredLocalDirs
checks if Spark runs on YARN and if so, returns LOCAL_DIRS-controlled local directories.
In non-YARN mode (or for the driver in yarn-client mode), getConfiguredLocalDirs checks the following environment variables (in this order) and returns the value of the first one that is set:
- SPARK_EXECUTOR_DIRS environment variable
- SPARK_LOCAL_DIRS environment variable
- MESOS_DIRECTORY environment variable (only when External Shuffle Service is not used)
In the end, when none of the above environment variables is set, getConfiguredLocalDirs uses the spark.local.dir configuration property or falls back to the java.io.tmpdir System property.
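The non-YARN precedence order can be sketched as follows; the environment and spark.local.dir are passed in explicitly for illustration (the real code reads them via SparkConf and System.getenv, and splits the result into individual directories):

```scala
// Sketch of the fallback chain only; the YARN branch and directory splitting are omitted
def configuredLocalDirs(
    env: Map[String, String],
    sparkLocalDir: Option[String],
    shuffleServiceEnabled: Boolean): String =
  env.get("SPARK_EXECUTOR_DIRS")
    .orElse(env.get("SPARK_LOCAL_DIRS"))
    .orElse(if (!shuffleServiceEnabled) env.get("MESOS_DIRECTORY") else None)
    .orElse(sparkLocalDir)                             // spark.local.dir
    .getOrElse(System.getProperty("java.io.tmpdir"))   // final fallback
```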
getConfiguredLocalDirs is used when:
- DiskBlockManager is requested to createLocalDirs
- Utils helper is requested to getLocalDir and getOrCreateLocalRootDirsImpl
Getting Writable Directories in YARN¶
getYarnLocalDirs(
conf: SparkConf): String
getYarnLocalDirs
uses conf
SparkConf to read LOCAL_DIRS
environment variable with comma-separated local directories (that have already been created and secured so that only the user has access to them).
getYarnLocalDirs throws an Exception when the LOCAL_DIRS environment variable is not set:
Yarn Local dirs can't be empty
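A minimal sketch, assuming the environment is passed in as a Map (the real code reads the variable through SparkConf.getenv):

```scala
// Sketch: return the comma-separated YARN local directories or fail if unset
def getYarnLocalDirs(env: Map[String, String]): String = {
  val localDirs = env.getOrElse("LOCAL_DIRS", "")
  if (localDirs.isEmpty) {
    throw new Exception("Yarn Local dirs can't be empty")
  }
  localDirs
}
```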
Checking Whether Spark Runs on YARN¶
isRunningInYarnContainer(
conf: SparkConf): Boolean
isRunningInYarnContainer
uses conf
SparkConf to read Hadoop YARN's CONTAINER_ID environment variable to find out if Spark runs in a YARN container (that is exported by YARN NodeManager).
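A minimal sketch of the check, again assuming the environment is passed in as a Map for illustration:

```scala
// Sketch: Spark considers itself to run in a YARN container when CONTAINER_ID
// (exported by the YARN NodeManager) is set to a non-empty value
def isRunningInYarnContainer(env: Map[String, String]): Boolean =
  env.get("CONTAINER_ID").exists(_.nonEmpty)
```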
Getting All Blocks (From Files Stored On Disk)¶
getAllBlocks(): Seq[BlockId]
getAllBlocks
gets all the blocks stored on disk.
Internally, getAllBlocks takes all the block files and converts their names back to BlockIds.
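A sketch of that mapping, assuming files is what getAllFiles returns; file names that do not parse as a BlockId (e.g. temporary files) are simply skipped here:

```scala
import java.io.File
import org.apache.spark.storage.BlockId

// Sketch: recover block IDs from block file names
def allBlocks(files: Seq[File]): Seq[BlockId] =
  files.flatMap { file =>
    try Some(BlockId(file.getName))
    catch { case _: Exception => None }
  }
```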
getAllBlocks
is used when BlockManager
is requested to find IDs of existing blocks for a given filter.
All Block Files¶
getAllFiles(): Seq[File]
getAllFiles
...FIXME
Stopping¶
stop(): Unit
stop
...FIXME
stop
is used when BlockManager
is requested to stop.
Stopping DiskBlockManager (Removing Local Directories for Blocks)¶
doStop(): Unit
doStop
deletes the local directories recursively (only when the constructor's deleteFilesOnStop
is enabled and the parent directories are not registered to be removed at shutdown).
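A hedged sketch of the cleanup, where registeredForShutdownDelete stands in for the check against Spark's ShutdownHookManager (the real code uses Utils.deleteRecursively and logs failures):

```scala
import java.io.File

// Plain recursive delete used by the sketch below
def deleteRecursively(file: File): Unit = {
  if (file.isDirectory) {
    Option(file.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
  }
  file.delete()
}

// Sketch: delete every local directory unless it is already registered
// to be removed at shutdown
def doStop(
    localDirs: Array[File],
    deleteFilesOnStop: Boolean,
    registeredForShutdownDelete: File => Boolean): Unit = {
  if (deleteFilesOnStop) {
    localDirs.foreach { localDir =>
      if (localDir.isDirectory && localDir.exists() && !registeredForShutdownDelete(localDir)) {
        deleteRecursively(localDir)
      }
    }
  }
}
```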
doStop
is used when DiskBlockManager
is requested to shut down or stop.
Logging¶
Enable ALL
logging level for org.apache.spark.storage.DiskBlockManager
logger to see what happens inside.
Add the following line to conf/log4j.properties
:
log4j.logger.org.apache.spark.storage.DiskBlockManager=ALL
Refer to Logging.