= ShuffleBlockFetcherIterator
ShuffleBlockFetcherIterator is a Scala http://www.scala-lang.org/api/current/scala/collection/Iterator.html[Iterator] that fetches shuffle blocks (aka shuffle map outputs) from block managers.
ShuffleBlockFetcherIterator is created when BlockStoreShuffleReader is requested to shuffle:BlockStoreShuffleReader.md#read[read combined key-value records for a reduce task].
ShuffleBlockFetcherIterator allows for <<next, iterating over>> (BlockId, InputStream) pairs so a caller can handle shuffle blocks in a pipelined fashion as they are received.
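ShuffleBlockFetcherIterator is exhausted (i.e. <<hasNext, can provide no more elements>>) when the <<numBlocksProcessed, number of blocks already processed>> is at least the <<numBlocksToFetch, total number of blocks to fetch>>.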
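To make the pull model concrete, here is a toy iterator that hands out (block id, InputStream) pairs as soon as fetches complete and reports exhaustion from two counters. It is a minimal sketch, not Spark's code: ToyFetchResult and ToyBlockFetcherIterator are hypothetical names, block ids are plain Strings, and a background thread stands in for the real fetching machinery.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for a fetch result: a block id and the block's bytes
final case class ToyFetchResult(blockId: String, bytes: Array[Byte])

// Hypothetical toy iterator, not Spark's ShuffleBlockFetcherIterator:
// fetching threads put results into `results`, the consumer takes them in FIFO order
final class ToyBlockFetcherIterator(numBlocksToFetch: Int)
    extends Iterator[(String, InputStream)] {

  val results = new LinkedBlockingQueue[ToyFetchResult]()

  private var numBlocksProcessed = 0

  // Exhausted once every expected block has been handed out
  override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

  override def next(): (String, InputStream) = {
    if (!hasNext) throw new NoSuchElementException("all blocks processed")
    val result = results.take() // blocks until some fetch has completed
    numBlocksProcessed += 1
    (result.blockId, new ByteArrayInputStream(result.bytes))
  }
}

// Usage: a background thread simulates three fetches completing one by one
val blocks = new ToyBlockFetcherIterator(numBlocksToFetch = 3)
val fetcher = new Thread(new Runnable {
  def run(): Unit =
    for (i <- 0 until 3)
      blocks.results.put(ToyFetchResult(s"shuffle_0_${i}_0", Array.fill[Byte](i + 1)(0.toByte)))
})
fetcher.start()
// The consumer processes each block as soon as it arrives (pipelining)
val sizes = blocks.map { case (id, in) => id -> in.available() }.toList
fetcher.join()
```

The blocking take() is what lets the consumer start working on the first block while later ones are still being fetched.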
ShuffleBlockFetcherIterator <
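[[internal-registries]]
.ShuffleBlockFetcherIterator's Internal Registries and Counters
[cols="1,2",options="header",width="100%"]
|===
| Name
| Description

| numBlocksProcessed
| [[numBlocksProcessed]] The number of blocks <<next, processed>> (i.e. returned to the caller) so far.

| numBlocksToFetch
a| [[numBlocksToFetch]] Total number of blocks to fetch and <<next, iterate over>>.

ShuffleBlockFetcherIterator can <<next, produce>> up to numBlocksToFetch elements.

numBlocksToFetch is increased every time ShuffleBlockFetcherIterator is requested to <<splitLocalRemoteBlocks, split local and remote blocks>>, which prints out the following INFO message to the logs:

----
INFO ShuffleBlockFetcherIterator: Getting [numBlocksToFetch] non-empty blocks out of [totalBlocks] blocks
----

| [[results]] results
a| Internal FIFO blocking queue (using Java's https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingQueue.html[java.util.concurrent.LinkedBlockingQueue]) to hold FetchResult remote and local fetch results.

Used in:

- <<next, next>> to take one FetchResult off the queue
- <<sendRequest, sendRequest>> to put SuccessFetchResult or FailureFetchResult remote fetch results (as part of the BlockFetchingListener callbacks)
- <<fetchLocalBlocks, fetchLocalBlocks>> (similarly to <<sendRequest, sendRequest>>) to put local fetch results
- <<cleanup, cleanup>> to release managed buffers for SuccessFetchResult results

| [[maxBytesInFlight]] maxBytesInFlight
| The maximum size (in bytes) of remote shuffle blocks being fetched at any given point.

Set when ShuffleBlockFetcherIterator is created.

| [[maxReqsInFlight]] maxReqsInFlight
| The maximum number of remote requests to fetch shuffle blocks at any given point.

Set when ShuffleBlockFetcherIterator is created.

| [[bytesInFlight]] bytesInFlight
| The size (in bytes) of remote shuffle blocks requested but not yet processed (in flight).

Starts at 0 when ShuffleBlockFetcherIterator is created.

Incremented on every <<sendRequest, sendRequest>> and decremented every time <<next, next>> processes a remote block.

ShuffleBlockFetcherIterator makes sure that bytesInFlight does not exceed <<maxBytesInFlight, maxBytesInFlight>> every time it <<fetchUpToMaxBytes, sends remote fetch requests>>, except that a request is always sent when nothing is in flight (even if it alone exceeds the limit).

| [[reqsInFlight]] reqsInFlight
| The number of remote shuffle block fetch requests in flight.

Starts at 0 when ShuffleBlockFetcherIterator is created.

Incremented on every <<sendRequest, sendRequest>> and decremented every time <<next, next>> processes the last block of a request.

ShuffleBlockFetcherIterator makes sure that reqsInFlight does not exceed <<maxReqsInFlight, maxReqsInFlight>> every time it <<fetchUpToMaxBytes, sends remote fetch requests>>.

| [[isZombie]] isZombie
| Flag whether ShuffleBlockFetcherIterator is no longer active (a zombie). It is disabled (false) when ShuffleBlockFetcherIterator is created.

When enabled (when the task using ShuffleBlockFetcherIterator finishes and <<cleanup, cleanup>> is called), the <<sendRequest-BlockFetchingListener-onBlockFetchSuccess, onBlockFetchSuccess callback>> (registered in <<sendRequest, sendRequest>>) no longer adds fetched remote shuffle blocks to the <<results, results>> internal queue.

| [[currentResult]] currentResult
| The currently-processed SuccessFetchResult.

Set when ShuffleBlockFetcherIterator <<next, returns the next (BlockId, InputStream) pair>> and reset to null by <<releaseCurrentResultBuffer, releaseCurrentResultBuffer>>.
|===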
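The two in-flight limits amount to an admission check applied before each remote request is sent. The following is a simplified, hypothetical model (FlowControl and Req are illustrative names, not Spark classes). It assumes, following Spark 2.x sources, that a request is always admitted when nothing is in flight, even if it alone exceeds maxBytesInFlight, so that a single oversized block can still be fetched.

```scala
import scala.collection.mutable

// Hypothetical remote fetch request: target address and total size in bytes
final case class Req(address: String, size: Long)

// Hypothetical model of the in-flight limits, not Spark's implementation
final class FlowControl(maxBytesInFlight: Long, maxReqsInFlight: Int) {
  var bytesInFlight = 0L
  var reqsInFlight = 0
  val pending = mutable.Queue.empty[Req]
  val sent = mutable.ArrayBuffer.empty[Req]

  // Admit a request if nothing is in flight, or if it respects both limits
  private def canSend(req: Req): Boolean =
    bytesInFlight == 0 ||
      (reqsInFlight + 1 <= maxReqsInFlight &&
        bytesInFlight + req.size <= maxBytesInFlight)

  // Send queued requests in order while the limits allow it
  def fetchUpToMax(): Unit =
    while (pending.nonEmpty && canSend(pending.front)) {
      val req = pending.dequeue()
      bytesInFlight += req.size
      reqsInFlight += 1
      sent += req
    }

  // A request's blocks have been processed: free its share of the limits
  def complete(req: Req): Unit = {
    bytesInFlight -= req.size
    reqsInFlight -= 1
  }
}

val fc = new FlowControl(maxBytesInFlight = 100L, maxReqsInFlight = 2)
fc.pending ++= Seq(Req("host-a", 60L), Req("host-b", 30L), Req("host-c", 50L))
fc.fetchUpToMax()         // host-c waits: it would exceed both limits
val firstRound = fc.sent.map(_.address).toList
fc.complete(fc.sent.head) // host-a's blocks were processed
fc.fetchUpToMax()         // host-c now fits: 30 + 50 <= 100 bytes, 2 requests

// An oversized request is still sent when nothing else is in flight
val big = new FlowControl(maxBytesInFlight = 100L, maxReqsInFlight = 2)
big.pending += Req("host-d", 500L)
big.fetchUpToMax()
```

Without the "nothing in flight" escape hatch, a block larger than maxBytesInFlight would never be admitted and the reduce task would hang.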
[TIP]
====
Enable ERROR, WARN, INFO, DEBUG or TRACE logging levels for org.apache.spark.storage.ShuffleBlockFetcherIterator logger to see what happens in ShuffleBlockFetcherIterator.

Add the following line to conf/log4j.properties:

----
log4j.logger.org.apache.spark.storage.ShuffleBlockFetcherIterator=TRACE
----

Refer to spark-logging.md[Logging].
====
== [[fetchUpToMaxBytes]] fetchUpToMaxBytes Method
CAUTION: FIXME
== Creating Instance
When created, ShuffleBlockFetcherIterator takes the following:

- [[context]] TaskContext
- [[shuffleClient]] storage:ShuffleClient.md[]
- [[blockManager]] storage:BlockManager.md[BlockManager]
- [[blocksByAddress]] Blocks to fetch per storage:BlockManager.md[BlockManager] (as `Seq[(BlockManagerId, Seq[(BlockId, Long)])]`)
- [[streamWrapper]] Function to wrap the returned input stream (as `(BlockId, InputStream) => InputStream`)
- <<maxBytesInFlight, maxBytesInFlight>>: the maximum size (in bytes) of map outputs to fetch simultaneously from each reduce task (controlled by shuffle:BlockStoreShuffleReader.md#spark_reducer_maxSizeInFlight[spark.reducer.maxSizeInFlight] Spark property)
- <<maxReqsInFlight, maxReqsInFlight>>: the maximum number of remote requests to fetch blocks at any given point (controlled by shuffle:BlockStoreShuffleReader.md#spark_reducer_maxReqsInFlight[spark.reducer.maxReqsInFlight] Spark property)
- [[maxBlocksInFlightPerAddress]] maxBlocksInFlightPerAddress: the maximum number of remote blocks fetched from a single host at any given point (controlled by spark.reducer.maxBlocksInFlightPerAddress Spark property)
- [[maxReqSizeShuffleToMem]] maxReqSizeShuffleToMem: the size (in bytes) above which the blocks of a remote fetch request are fetched to disk rather than into memory
- [[detectCorrupt]] detectCorrupt flag to detect any corruption in fetched blocks (controlled by shuffle:BlockStoreShuffleReader.md#spark_shuffle_detectCorrupt[spark.shuffle.detectCorrupt] Spark property)
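The streamWrapper parameter is an ordinary function that decorates every input stream before the caller sees it. As a hypothetical illustration (StreamWrapper, gunzipWrapper and the helpers are made-up names, and String replaces BlockId), a wrapper that decompresses GZIP data could look like this:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Hypothetical: block ids are plain Strings here (Spark uses BlockId)
type StreamWrapper = (String, InputStream) => InputStream

// A wrapper that transparently decompresses every block's stream
val gunzipWrapper: StreamWrapper = (_, in) => new GZIPInputStream(in)

// Helper: gzip-compress some text to simulate a compressed block
def gzip(text: String): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new GZIPOutputStream(bytes)
  try out.write(text.getBytes(StandardCharsets.UTF_8)) finally out.close()
  bytes.toByteArray()
}

// Read a whole stream as UTF-8 text and close it
def readAll(in: InputStream): String =
  try scala.io.Source.fromInputStream(in, "UTF-8").mkString finally in.close()

val compressedBlock = new ByteArrayInputStream(gzip("key1,value1\nkey2,value2"))
val text = readAll(gunzipWrapper("shuffle_0_0_0", compressedBlock))
```

Because the wrapper also receives the block id, a caller could decide per block how (or whether) to wrap the stream.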
== [[initialize]] Initializing ShuffleBlockFetcherIterator -- initialize Internal Method
[source, scala]
----
initialize(): Unit
----
initialize registers a task cleanup and fetches shuffle blocks from remote and local storage:BlockManager.md[BlockManagers].
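Internally, initialize registers a TaskCompletionListener (that will <<cleanup, clean up>> right after the task finishes).

initialize <<splitLocalRemoteBlocks, splits local and remote blocks>>.

initialize queues the remote fetch requests (returned by splitLocalRemoteBlocks) in random order.

As ShuffleBlockFetcherIterator is in initialization phase, initialize makes sure that the <<reqsInFlight, reqsInFlight>> and <<bytesInFlight, bytesInFlight>> internal counters are consistent, i.e. either both are 0 or neither is (no request has been sent yet, so both should be 0). Otherwise, initialize throws an exception.

initialize <<fetchUpToMaxBytes, sends remote fetch requests up to maxBytesInFlight>>.

You should see the following INFO message in the logs:

----
INFO ShuffleBlockFetcherIterator: Started [numFetches] remote fetches in [time] ms
----

initialize <<fetchLocalBlocks, fetches local blocks>>.

You should see the following DEBUG message in the logs:

----
DEBUG ShuffleBlockFetcherIterator: Got local blocks in [time] ms
----

NOTE: initialize is used exclusively when ShuffleBlockFetcherIterator is created.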
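The ordering in initialize (register cleanup, check the counters, start remote fetches, then fetch local blocks) can be mimicked with hypothetical stand-ins. In this sketch ToyTaskContext replaces TaskContext and ToyFetcher only records its steps; no real fetching happens.

```scala
import scala.collection.mutable

// Hypothetical stand-in for TaskContext: collects and runs completion callbacks
final class ToyTaskContext {
  private val listeners = mutable.ArrayBuffer.empty[() => Unit]
  def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
  def markTaskCompleted(): Unit = listeners.foreach(f => f())
}

// Hypothetical fetcher that records the order of its initialization steps
final class ToyFetcher(context: ToyTaskContext) {
  val steps = mutable.ArrayBuffer.empty[String]
  private var reqsInFlight = 0
  private var bytesInFlight = 0L

  initialize() // called while constructing the object

  private def initialize(): Unit = {
    // Register cleanup first so it runs however the task ends
    context.addTaskCompletionListener(() => cleanup())
    // Nothing can be in flight yet: both counters must agree
    assert((reqsInFlight == 0) == (bytesInFlight == 0L),
      s"reqsInFlight = $reqsInFlight, bytesInFlight = $bytesInFlight")
    steps += "send remote requests" // would be asynchronous in a real system
    steps += "fetch local blocks"   // read after the remote requests go out
  }

  private def cleanup(): Unit = steps += "cleanup"
}

val ctx = new ToyTaskContext
val fetcher = new ToyFetcher(ctx)
val beforeCompletion = fetcher.steps.toList
ctx.markTaskCompleted()
```

Starting the remote requests before reading local blocks lets network transfers overlap with local I/O.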
== [[sendRequest]] Sending Remote Shuffle Block Fetch Request -- sendRequest Internal Method
[source, scala]
----
sendRequest(req: FetchRequest): Unit
----
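Internally, when sendRequest runs, you should see the following DEBUG message in the logs:

----
DEBUG ShuffleBlockFetcherIterator: Sending request for [blocks.size] blocks ([size] B) from [hostPort]
----

sendRequest increments the <<bytesInFlight, bytesInFlight>> internal counter by the size of the request and the <<reqsInFlight, reqsInFlight>> internal counter by 1.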
NOTE: The input FetchRequest contains the remote storage:BlockManagerId.md[] address and the shuffle blocks to fetch (as a sequence of storage:BlockId.md[] and their sizes).
sendRequest storage:ShuffleClient.md#fetchBlocks[requests ShuffleClient to fetch shuffle blocks] (from the host, the port, and the executor as defined in the input FetchRequest).
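NOTE: ShuffleClient is given when ShuffleBlockFetcherIterator is created.

sendRequest registers a BlockFetchingListener with ShuffleClient that:

- <<sendRequest-BlockFetchingListener-onBlockFetchSuccess, For every successfully fetched shuffle block>> adds it as SuccessFetchResult to the <<results, results>> internal queue.
- <<sendRequest-BlockFetchingListener-onBlockFetchFailure, For every shuffle block fetch failure>> adds it as FailureFetchResult to the <<results, results>> internal queue.

NOTE: sendRequest is used exclusively when ShuffleBlockFetcherIterator is requested to <<fetchUpToMaxBytes, fetch remote shuffle blocks up to maxBytesInFlight>>.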
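The callback wiring of sendRequest can be sketched without Spark. Everything below is hypothetical: FetchListener plays the role of BlockFetchingListener, FakeClient the role of ShuffleClient (serving blocks from a map and failing for unknown ones), and ToySuccess/ToyFailure the roles of SuccessFetchResult/FailureFetchResult. It shows the in-flight counters being bumped per request and each callback turning into one entry of the results queue.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical result types mirroring SuccessFetchResult and FailureFetchResult
sealed trait ToyFetchResult { def blockId: String }
final case class ToySuccess(blockId: String, address: String, data: Array[Byte])
    extends ToyFetchResult
final case class ToyFailure(blockId: String, address: String, cause: Throwable)
    extends ToyFetchResult

// Hypothetical callback interface in the spirit of BlockFetchingListener
trait FetchListener {
  def onBlockFetchSuccess(blockId: String, data: Array[Byte]): Unit
  def onBlockFetchFailure(blockId: String, e: Throwable): Unit
}

// Fake client: serves blocks from an in-memory map and fails for unknown ids
final class FakeClient(store: Map[String, Array[Byte]]) {
  def fetchBlocks(address: String, blockIds: Seq[String], listener: FetchListener): Unit =
    blockIds.foreach { id =>
      store.get(id) match {
        case Some(data) => listener.onBlockFetchSuccess(id, data)
        case None       => listener.onBlockFetchFailure(id, new RuntimeException(s"missing $id"))
      }
    }
}

final class ToySender(client: FakeClient) {
  val results = new LinkedBlockingQueue[ToyFetchResult]()
  var bytesInFlight = 0L
  var reqsInFlight = 0

  // `blocks` pairs every block id with its size, like the blocks of a FetchRequest
  def sendRequest(address: String, blocks: Seq[(String, Long)]): Unit = {
    bytesInFlight += blocks.map(_._2).sum
    reqsInFlight += 1
    client.fetchBlocks(address, blocks.map(_._1), new FetchListener {
      def onBlockFetchSuccess(blockId: String, data: Array[Byte]): Unit =
        results.put(ToySuccess(blockId, address, data))
      def onBlockFetchFailure(blockId: String, e: Throwable): Unit =
        results.put(ToyFailure(blockId, address, e))
    })
  }
}

val sender = new ToySender(new FakeClient(Map("shuffle_0_0_1" -> Array.fill[Byte](3)(1.toByte))))
sender.sendRequest("host-a", Seq("shuffle_0_0_1" -> 3L, "shuffle_0_1_1" -> 5L))
```

Failures travel through the same queue as successes, so the consumer sees them in order and can decide how to react when it takes them off.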
=== [[sendRequest-BlockFetchingListener-onBlockFetchSuccess]] onBlockFetchSuccess Callback
[source, scala]
----
onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit
----
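Internally, onBlockFetchSuccess checks (while holding the iterator's lock) if the <<isZombie, iterator is not a zombie>> and does the further processing only if it is not.

onBlockFetchSuccess marks the input blockId as received (i.e. removes it from all the blocks to fetch as requested in <<sendRequest, sendRequest>>).

onBlockFetchSuccess retains the managed buf (i.e. increments its reference count, as the buffer is handed over to another thread) and adds it (as SuccessFetchResult) to the <<results, results>> internal queue.

You should see the following DEBUG message in the logs:

----
DEBUG ShuffleBlockFetcherIterator: remainingBlocks: [blocks]
----

Regardless of the zombie state of ShuffleBlockFetcherIterator, you should see the following TRACE message in the logs:

----
TRACE ShuffleBlockFetcherIterator: Got remote block [blockId] after [time] ms
----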
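The zombie check is what keeps late network callbacks from filling the queue after the task is over. A hypothetical sketch (ZombieAwareFetcher and Fetched are illustrative names): the callback and cleanup share one lock, and the callback tracks the request's remaining blocks. The lastInRequest flag is an assumption modelled on the isNetworkReqDone field of Spark 2.x's SuccessFetchResult; it is not described above.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical success result; lastInRequest marks the final block of a request
final case class Fetched(blockId: String, size: Long, lastInRequest: Boolean)

// Hypothetical model of the zombie guard, not Spark's code
final class ZombieAwareFetcher {
  val results = new LinkedBlockingQueue[Fetched]()
  private var isZombie = false

  // Builds the success callback for one request covering `blocks` (id -> size)
  def successCallback(blocks: Map[String, Long]): String => Unit = {
    val remainingBlocks = mutable.HashSet.empty[String] ++= blocks.keys
    blockId => this.synchronized {
      // Once the iterator is a zombie, late blocks are no longer queued
      if (!isZombie) {
        remainingBlocks -= blockId
        results.put(Fetched(blockId, blocks(blockId), remainingBlocks.isEmpty))
      }
    }
  }

  // Same lock as the callback, so flipping the flag cannot race with enqueuing
  def cleanup(): Unit = this.synchronized { isZombie = true }
}

val f = new ZombieAwareFetcher
val onSuccess = f.successCallback(Map("shuffle_0_0_2" -> 10L, "shuffle_0_1_2" -> 20L))
onSuccess("shuffle_0_0_2") // queued
f.cleanup()                // the task has finished
onSuccess("shuffle_0_1_2") // dropped: arrives after cleanup

val g = new ZombieAwareFetcher
val onSuccessG = g.successCallback(Map("shuffle_1_0_0" -> 1L, "shuffle_1_1_0" -> 2L))
onSuccessG("shuffle_1_0_0")
onSuccessG("shuffle_1_1_0") // last block of the request
```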
=== [[sendRequest-BlockFetchingListener-onBlockFetchFailure]] onBlockFetchFailure Callback
[source, scala]
----
onBlockFetchFailure(blockId: String, e: Throwable): Unit
----
When onBlockFetchFailure is called, you should see the following ERROR message in the logs:

----
ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from [hostPort]
----

onBlockFetchFailure adds the block (as FailureFetchResult) to the <<results, results>> internal queue.
== [[throwFetchFailedException]] Throwing FetchFailedException (for ShuffleBlockId) -- throwFetchFailedException Internal Method
[source, scala]
----
throwFetchFailedException(
  blockId: BlockId,
  address: BlockManagerId,
  e: Throwable): Nothing
----
throwFetchFailedException throws a shuffle:FetchFailedException.md[FetchFailedException] when the input blockId is a ShuffleBlockId.
NOTE: throwFetchFailedException creates a FetchFailedException passing on the root cause of a failure, i.e. the input e.
Otherwise, throwFetchFailedException throws a SparkException:

----
Failed to get block [blockId], which is not a shuffle block
----

NOTE: throwFetchFailedException is used when ShuffleBlockFetcherIterator is requested for the <<next, next element>>.
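The dispatch on the block id type can be sketched with a hypothetical, simplified block id hierarchy and two stand-in exception classes (ToyBlockId, ToyShuffleBlockId, ToyRDDBlockId, ToyFetchFailedException and ToySparkException are illustrative, not Spark's classes). Declaring the result type as Nothing tells the compiler that the method never returns normally, so a call can appear wherever a value of any type is expected.

```scala
// Hypothetical, simplified block id hierarchy (Spark has its own BlockId types)
sealed trait ToyBlockId { def name: String }
final case class ToyShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends ToyBlockId {
  def name: String = s"shuffle_${shuffleId}_${mapId}_$reduceId"
}
final case class ToyRDDBlockId(rddId: Int, splitIndex: Int) extends ToyBlockId {
  def name: String = s"rdd_${rddId}_$splitIndex"
}

// Hypothetical stand-ins for FetchFailedException and SparkException
final class ToyFetchFailedException(
    val address: String, val shuffleId: Int, val mapId: Int, val reduceId: Int, cause: Throwable)
  extends Exception(s"Failed to fetch shuffle $shuffleId map $mapId reduce $reduceId from $address", cause)
final class ToySparkException(message: String, cause: Throwable) extends Exception(message, cause)

// Never returns normally, hence the Nothing result type
def throwFetchFailedException(blockId: ToyBlockId, address: String, e: Throwable): Nothing =
  blockId match {
    case ToyShuffleBlockId(shuffleId, mapId, reduceId) =>
      // Keep the original failure as the root cause
      throw new ToyFetchFailedException(address, shuffleId, mapId, reduceId, e)
    case other =>
      throw new ToySparkException(
        s"Failed to get block ${other.name}, which is not a shuffle block", e)
  }

// Helper for the example: run code that must throw and return what it threw
def thrownBy(body: => Nothing): Throwable =
  try body catch { case t: Throwable => t }

val rootCause = new java.io.IOException("Connection reset by peer")
val shuffleFailure = thrownBy(throwFetchFailedException(ToyShuffleBlockId(0, 3, 1), "host-a", rootCause))
val otherFailure = thrownBy(throwFetchFailedException(ToyRDDBlockId(2, 0), "host-a", rootCause))
```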
== [[cleanup]] Releasing Resources -- cleanup Internal Method
[source, scala]
----
cleanup(): Unit
----
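Internally, cleanup marks ShuffleBlockFetcherIterator a <<isZombie, zombie>>.

cleanup <<releaseCurrentResultBuffer, releases the current result buffer>>.

cleanup iterates over the <<results, results>> internal queue and, for every SuccessFetchResult, increments the remote bytes read and remote blocks fetched shuffle task metrics (for blocks from remote block managers only) and eventually releases the managed buffer.

NOTE: cleanup is registered as a task completion listener when ShuffleBlockFetcherIterator <<initialize, initializes itself>>.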
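A hypothetical sketch of the draining step in cleanup: ToyBuffer stands in for a reference-counted ManagedBuffer and two plain counters stand in for the shuffle read metrics. The local-versus-remote check on the result's address is an assumption about which results count towards the remote metrics, and this sketch removes results with poll() while draining.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical reference-counted buffer, standing in for ManagedBuffer
final class ToyBuffer(val size: Long) {
  var refCnt = 1
  def release(): Unit = refCnt -= 1
}

// Hypothetical results, tagged with the address of the block manager they came from
sealed trait Result
final case class SuccessResult(address: String, buf: ToyBuffer) extends Result
final case class FailureResult(address: String, e: Throwable) extends Result

final class ToyCleaner(localAddress: String) {
  val results = new LinkedBlockingQueue[Result]()
  var isZombie = false
  var remoteBytesRead = 0L    // stand-in for the remote bytes read metric
  var remoteBlocksFetched = 0 // stand-in for the remote blocks fetched metric

  def cleanup(): Unit = {
    this.synchronized { isZombie = true }
    // Drain every result still queued so no buffer is left unreleased
    var result = results.poll()
    while (result != null) {
      result match {
        case SuccessResult(address, buf) =>
          if (address != localAddress) {
            remoteBytesRead += buf.size
            remoteBlocksFetched += 1
          }
          buf.release()
        case FailureResult(_, _) => () // failures hold no buffer
      }
      result = results.poll()
    }
  }
}

val cleaner = new ToyCleaner(localAddress = "exec-1")
val remoteBuf = new ToyBuffer(100L)
val localBuf = new ToyBuffer(40L)
cleaner.results.put(SuccessResult("exec-2", remoteBuf))
cleaner.results.put(SuccessResult("exec-1", localBuf))
cleaner.results.put(FailureResult("exec-3", new RuntimeException("fetch failed")))
cleaner.cleanup()
```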
== [[releaseCurrentResultBuffer]] Decrementing Reference Count Of and Releasing Result Buffer (for SuccessFetchResult) -- releaseCurrentResultBuffer Internal Method
[source, scala]
----
releaseCurrentResultBuffer(): Unit
----
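releaseCurrentResultBuffer decrements the reference count of the buffer of the <<currentResult, currentResult>>, if there is one.

releaseCurrentResultBuffer then resets <<currentResult, currentResult>> to null.

NOTE: releaseCurrentResultBuffer is used when ShuffleBlockFetcherIterator <<cleanup, releases resources>> and when BufferReleasingInputStream closes.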
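The release-on-close pattern behind BufferReleasingInputStream can be sketched as a FilterInputStream that releases the current result buffer the first time it is closed. CountedBuffer, ResultHolder and ReleasingInputStream are hypothetical names. Resetting the current buffer to null makes a second release a no-op, and the closed flag keeps repeated close() calls harmless.

```scala
import java.io.{ByteArrayInputStream, FilterInputStream, InputStream}
import java.nio.charset.StandardCharsets

// Hypothetical reference-counted buffer (stand-in for ManagedBuffer)
final class CountedBuffer(bytes: Array[Byte]) {
  var refCnt = 1
  def release(): Unit = refCnt -= 1
  def createInputStream(): InputStream = new ByteArrayInputStream(bytes)
}

// Hypothetical holder of the currently-processed result's buffer
final class ResultHolder {
  var currentBuffer: CountedBuffer = null

  def releaseCurrentResultBuffer(): Unit = {
    if (currentBuffer != null) currentBuffer.release() // decrement the reference count
    currentBuffer = null                                // a second call is a no-op
  }
}

// Releases the current result buffer on the first close() only
final class ReleasingInputStream(delegate: InputStream, holder: ResultHolder)
    extends FilterInputStream(delegate) {
  private var closed = false

  override def close(): Unit =
    if (!closed) {
      super.close() // closes the delegate stream
      holder.releaseCurrentResultBuffer()
      closed = true
    }
}

val holder = new ResultHolder
val buffer = new CountedBuffer("payload".getBytes(StandardCharsets.UTF_8))
holder.currentBuffer = buffer
val stream = new ReleasingInputStream(buffer.createInputStream(), holder)
val firstByte = stream.read()
stream.close()
stream.close() // already closed: nothing is released twice
```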
== [[fetchLocalBlocks]] fetchLocalBlocks Internal Method
[source, scala]
----
fetchLocalBlocks(): Unit
----
fetchLocalBlocks...FIXME
NOTE: fetchLocalBlocks is used when...FIXME
== [[hasNext]] hasNext Method
[source, scala]
----
hasNext: Boolean
----
NOTE: hasNext is part of Scala's ++https://www.scala-lang.org/api/current/scala/collection/Iterator.html#hasNext:Boolean++[Iterator Contract] to test whether this iterator can provide another element.
hasNext is positive (true) when <<numBlocksProcessed, numBlocksProcessed>> is less than <<numBlocksToFetch, numBlocksToFetch>>.
Otherwise, hasNext is negative (false).
== [[splitLocalRemoteBlocks]] splitLocalRemoteBlocks Internal Method
[source, scala]
----
splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest]
----
splitLocalRemoteBlocks...FIXME
NOTE: splitLocalRemoteBlocks is used exclusively when ShuffleBlockFetcherIterator is requested to <<initialize, initialize>> itself.
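The INFO message Getting [numBlocksToFetch] non-empty blocks out of [totalBlocks] blocks hints at what splitLocalRemoteBlocks does. A minimal sketch under stated assumptions: addresses and block ids are Strings, a block is local when its address equals the local one, zero-size blocks are skipped, and every remote address yields one hypothetical ToyFetchRequest. Spark's version also breaks large per-address groups into several smaller requests; that part is left out here.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for FetchRequest: one address and its (block id, size) pairs
final case class ToyFetchRequest(address: String, blocks: Seq[(String, Long)]) {
  val size: Long = blocks.map(_._2).sum
}

// Hypothetical result of the split
final case class Split(
    localBlocks: Seq[String],
    remoteRequests: ArrayBuffer[ToyFetchRequest],
    numBlocksToFetch: Int,
    totalBlocks: Int)

def splitLocalRemoteBlocks(
    localAddress: String,
    blocksByAddress: Seq[(String, Seq[(String, Long)])]): Split = {
  val localBlocks = ArrayBuffer.empty[String]
  val remoteRequests = ArrayBuffer.empty[ToyFetchRequest]
  var numBlocksToFetch = 0
  var totalBlocks = 0
  for ((address, blockInfos) <- blocksByAddress) {
    totalBlocks += blockInfos.size
    val nonEmpty = blockInfos.filter(_._2 > 0) // zero-size blocks need no fetching
    numBlocksToFetch += nonEmpty.size
    if (address == localAddress) localBlocks ++= nonEmpty.map(_._1)
    else if (nonEmpty.nonEmpty) remoteRequests += ToyFetchRequest(address, nonEmpty)
  }
  println(s"Getting $numBlocksToFetch non-empty blocks out of $totalBlocks blocks")
  Split(localBlocks.toList, remoteRequests, numBlocksToFetch, totalBlocks)
}

val split = splitLocalRemoteBlocks(
  localAddress = "exec-1",
  blocksByAddress = Seq(
    "exec-1" -> Seq("shuffle_0_0_4" -> 10L, "shuffle_0_1_4" -> 0L),
    "exec-2" -> Seq("shuffle_0_2_4" -> 25L, "shuffle_0_3_4" -> 5L),
    "exec-3" -> Seq("shuffle_0_4_4" -> 0L)))
```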
== [[next]] Retrieving Next Element -- next Method
[source, scala]
----
next(): (BlockId, InputStream)
----
NOTE: next is part of Scala's ++https://www.scala-lang.org/api/current/scala/collection/Iterator.html#next():A++[Iterator Contract] to produce the next element of this iterator.
next...FIXME