= NettyBlockTransferService
NettyBlockTransferService is a storage:BlockTransferService.md[] that uses Netty for uploading and fetching blocks of data.
.NettyBlockTransferService, SparkEnv and BlockManager
image::NettyBlockTransferService.png[align="center"]
== [[creating-instance]] Creating Instance
NettyBlockTransferService takes the following to be created:
- [[conf]] SparkConf.md[SparkConf]
- [[securityManager]] SecurityManager
- [[bindAddress]] Bind address
- [[hostName]] Host name
- [[_port]] Port number
- [[numCores]] Number of CPU cores
NettyBlockTransferService is created when SparkEnv is core:SparkEnv.md#create-NettyBlockTransferService[created] for the driver and executors (and core:SparkEnv.md#create-BlockManager[creates the BlockManager]).
== [[transportConf]][[transportContext]] TransportConf, TransportContext
NettyBlockTransferService creates a network:TransportConf.md[] for the shuffle module (using the network:SparkTransportConf.md#fromSparkConf[SparkTransportConf] utility) when <<creating-instance, created>>.
NettyBlockTransferService uses the TransportConf for the following:

* Create a network:TransportContext.md[] when requested to <<init, init>>

* Create a storage:OneForOneBlockFetcher.md[] and a core:RetryingBlockFetcher.md[RetryingBlockFetcher] when requested to <<fetchBlocks, fetch blocks>>

NettyBlockTransferService uses the TransportContext to create the <<clientFactory, TransportClientFactory>> and the <<server, TransportServer>>.
== [[clientFactory]] TransportClientFactory
NettyBlockTransferService creates a network:TransportClientFactory.md[] when requested to <<init, init>>.
NettyBlockTransferService uses the TransportClientFactory for the following:
* <<fetchBlocks, Fetching blocks>>

* <<uploadBlock, Uploading blocks>>

* <<shuffleMetrics, Shuffle metrics>>
NettyBlockTransferService requests the TransportClientFactory to network:TransportClientFactory.md#close[close] when requested to <<close, close>>.
== [[server]] TransportServer
NettyBlockTransferService <<createServer, creates a TransportServer>> when requested to <<init, init>>.
NettyBlockTransferService uses the TransportServer for the following:
* <<port, Port>>

* <<shuffleMetrics, Shuffle metrics>>
NettyBlockTransferService requests the TransportServer to network:TransportServer.md#close[close] when requested to <<close, close>>.
== [[port]] Port
NettyBlockTransferService simply requests the <<server, TransportServer>> for the port.
== [[shuffleMetrics]] Shuffle Metrics
[source,scala]
----
shuffleMetrics(): MetricSet
----
shuffleMetrics...FIXME
shuffleMetrics is part of the storage:ShuffleClient.md#shuffleMetrics[ShuffleClient] abstraction.
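The metric aggregation behind shuffleMetrics can be pictured with a minimal sketch. The real service combines the metrics of its <<clientFactory, TransportClientFactory>> and <<server, TransportServer>> into one Dropwizard MetricSet; plain Scala maps stand in for MetricSets here, and the function and metric names are illustrative, not Spark's API:

```scala
// Hedged sketch: merge client-side and server-side metrics into one view,
// as shuffleMetrics conceptually does (names below are made up).
def combineMetrics(
    clientMetrics: Map[String, Long],
    serverMetrics: Map[String, Long]): Map[String, Long] =
  clientMetrics ++ serverMetrics

val combined = combineMetrics(
  Map("openClientConnections" -> 2L),
  Map("openServerConnections" -> 5L))
```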
== [[fetchBlocks]] Fetching Blocks
[source, scala]
----
fetchBlocks(
  host: String,
  port: Int,
  execId: String,
  blockIds: Array[String],
  listener: BlockFetchingListener): Unit
----
When executed, fetchBlocks prints out the following TRACE message in the logs:
----
Fetch blocks from [host]:[port] (executor id [execId])
----
fetchBlocks then creates a RetryingBlockFetcher.BlockFetchStarter whose createAndStart method...FIXME
If the maximum number of acceptable IO exceptions (such as connection timeouts) per request is greater than 0, fetchBlocks creates a core:RetryingBlockFetcher.md#creating-instance[RetryingBlockFetcher] and core:RetryingBlockFetcher.md#start[starts] it immediately.
NOTE: RetryingBlockFetcher is created with the RetryingBlockFetcher.BlockFetchStarter created earlier, the input blockIds and listener.
If however the number of retries is not greater than 0 (it could be 0 or less), the RetryingBlockFetcher.BlockFetchStarter created earlier is started (with the input blockIds and listener).
In case of any Exception, you should see the following ERROR message in the logs and the input BlockFetchingListener gets notified (using onBlockFetchFailure for every block id):

----
Exception while beginning fetchBlocks
----
fetchBlocks is part of storage:BlockTransferService.md#fetchBlocks[BlockTransferService] abstraction.
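The retry decision described above can be sketched in a few lines of self-contained Scala. The fetch attempt is modelled as a plain closure and the names (fetchWithRetries, maxIORetries) are illustrative, not Spark's API; the point is only the branch: a positive retry budget wraps the starter in retries, otherwise it runs exactly once:

```scala
import scala.util.{Failure, Success, Try}

// Hedged sketch of the fetchBlocks control flow (illustrative names):
// fetchStarter performs one fetch attempt; when maxIORetries > 0 the
// attempt is retried on failure, otherwise it is started exactly once.
def fetchWithRetries[A](maxIORetries: Int)(fetchStarter: () => A): Try[A] = {
  val attempts = math.max(1, maxIORetries) // always at least one attempt
  def loop(remaining: Int): Try[A] =
    Try(fetchStarter()) match {
      case s @ Success(_)                   => s
      case f @ Failure(_) if remaining <= 1 => f
      case Failure(_)                       => loop(remaining - 1)
    }
  loop(attempts)
}
```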
== [[appId]] Application Id
== [[close]] Closing NettyBlockTransferService
[source, scala]
----
close(): Unit
----
close...FIXME
close is part of the storage:BlockTransferService.md#close[BlockTransferService] abstraction.
== [[init]] Initializing NettyBlockTransferService
[source, scala]
----
init(
  blockDataManager: BlockDataManager): Unit
----
init creates a storage:NettyBlockRpcServer.md[] (for the SparkConf.md#getAppId[application id], a JavaSerializer and the given storage:BlockDataManager.md[BlockDataManager]) that is used to create a <<transportContext, TransportContext>>.
init creates the internal <<clientFactory, clientFactory>> and the <<server, TransportServer>> (using <<createServer, createServer>>).
In the end, you should see the following INFO message in the logs:

----
Server created on [hostName]:[port]
----
NOTE: The hostName is given when core:SparkEnv.md#NettyBlockTransferService[NettyBlockTransferService is created] and is controlled by the spark-driver.md#spark_driver_host[spark.driver.host Spark property] for the driver, while it differs per deployment environment for executors (as controlled by the executor:CoarseGrainedExecutorBackend.md#main[--hostname option of CoarseGrainedExecutorBackend]).
init is part of the storage:BlockTransferService.md#init[BlockTransferService] abstraction.
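The wiring order of init can be modelled with placeholder types: an RPC handler is built first, a transport context wraps it, and the context then produces both the client factory and the server. All type and method names below are illustrative stand-ins, not Spark's classes:

```scala
// Hedged sketch of the init wiring order (stand-in types, not Spark's):
// RPC handler -> transport context -> (client factory, server).
final case class RpcHandlerSketch(appId: String)          // NettyBlockRpcServer stand-in
final case class ClientFactorySketch(context: ContextSketch)
final case class ServerSketch(context: ContextSketch, host: String, port: Int)

final case class ContextSketch(handler: RpcHandlerSketch) { // TransportContext stand-in
  def createClientFactory(): ClientFactorySketch = ClientFactorySketch(this)
  def createServer(host: String, port: Int): ServerSketch = ServerSketch(this, host, port)
}

def initSketch(appId: String, host: String, port: Int): (ClientFactorySketch, ServerSketch) = {
  val handler = RpcHandlerSketch(appId)
  val context = ContextSketch(handler)
  (context.createClientFactory(), context.createServer(host, port))
}
```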
== [[uploadBlock]] Uploading Block
[source, scala]
----
uploadBlock(
  hostname: String,
  port: Int,
  execId: String,
  blockId: BlockId,
  blockData: ManagedBuffer,
  level: StorageLevel,
  classTag: ClassTag[_]): Future[Unit]
----
Internally, uploadBlock creates a TransportClient client to send an <<UploadBlock, UploadBlock>> message (to the input hostname and port).
NOTE: The UploadBlock message is processed by storage:NettyBlockRpcServer.md[NettyBlockRpcServer].
The UploadBlock message holds the <<appId, application id>> and the input execId and blockId. It also holds the serialized bytes for the block metadata with level and classTag serialized (using the internal JavaSerializer) as well as the serialized bytes for the input blockData itself (this time however the serialization uses the storage:BlockDataManager.md#ManagedBuffer[ManagedBuffer.nioByteBuffer method]).
The entire UploadBlock message is further serialized before sending (using TransportClient.sendRpc).
CAUTION: FIXME Describe TransportClient
and clientFactory.createClient
.
When the blockId block was successfully uploaded, you should see the following TRACE message in the logs:

----
Successfully uploaded block [blockId]
----
When an upload failed, you should see the following ERROR message in the logs:

----
Error while uploading block [blockId]
----
uploadBlock is part of the storage:BlockTransferService.md#uploadBlock[BlockTransferService] abstraction.
== [[UploadBlock]] UploadBlock Message
UploadBlock is a BlockTransferMessage that describes a block being uploaded, i.e. sent over the wire from a <<uploadBlock, NettyBlockTransferService>> to a storage:NettyBlockRpcServer.md[NettyBlockRpcServer].
.UploadBlock Attributes
[cols="1,2",options="header",width="100%"]
|===
| Attribute | Description

| appId
| The application id (the block belongs to)

| execId
| The executor id

| blockId
| The block id

| metadata
|

| blockData
| The block data as an array of bytes

|===
As an Encodable, UploadBlock can calculate the encoded size and do encoding and decoding itself to or from a ByteBuf, respectively.
== [[createServer]] createServer Internal Method
[source, scala]
----
createServer(
  bootstraps: List[TransportServerBootstrap]): TransportServer
----
createServer...FIXME
createServer is used when NettyBlockTransferService is requested to <<init, init>>.
== [[logging]] Logging
Enable `ALL` logging level for `org.apache.spark.network.netty.NettyBlockTransferService` logger to see what happens inside.

Add the following line to `conf/log4j.properties`:
[source,plaintext]
----
log4j.logger.org.apache.spark.network.netty.NettyBlockTransferService=ALL
----
Refer to spark-logging.md[Logging].