SparkEnv
SparkEnv is a handle to the Spark Execution Environment with the core services of Apache Spark that interact with each other to establish a distributed computing platform for a Spark application.
There are two separate SparkEnvs: one for the driver and one for executors.
Core Services
Property | Service |
---|---|
blockManager | BlockManager |
broadcastManager | BroadcastManager |
closureSerializer | Serializer |
conf | SparkConf |
mapOutputTracker | MapOutputTracker |
memoryManager | MemoryManager |
metricsSystem | MetricsSystem |
outputCommitCoordinator | OutputCommitCoordinator |
rpcEnv | RpcEnv |
securityManager | SecurityManager |
serializer | Serializer |
serializerManager | SerializerManager |
shuffleManager | ShuffleManager |
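Most of these services can be inspected at runtime using the SparkEnv.get accessor described below. A minimal sketch (assuming an active SparkContext, e.g. the one created by spark-shell):

```scala
import org.apache.spark.SparkEnv

// A sketch: peek at a few core services of the current SparkEnv
// (assumes an active SparkContext, e.g. in spark-shell)
val env = SparkEnv.get
println(env.conf.get("spark.app.name"))         // the SparkConf of the Spark application
println(env.serializer.getClass.getName)        // the Serializer, e.g. JavaSerializer or KryoSerializer
println(env.closureSerializer.getClass.getName) // the closure Serializer
```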
Creating Instance
SparkEnv takes the following to be created:
- Executor ID
- RpcEnv
- Serializer
- Serializer (closure serializer)
- SerializerManager
- MapOutputTracker
- ShuffleManager
- BroadcastManager
- BlockManager
- SecurityManager
- MetricsSystem
- MemoryManager
- OutputCommitCoordinator
- SparkConf
SparkEnv is created using the create utility.
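As a quick check of the first argument (Executor ID), the driver's SparkEnv uses a well-known identifier. A minimal sketch (assuming an active SparkContext):

```scala
import org.apache.spark.SparkEnv

// A sketch: the Executor ID of the driver's SparkEnv is the special value "driver"
// (SparkContext.DRIVER_IDENTIFIER; assumes an active SparkContext, e.g. in spark-shell)
assert(SparkEnv.get.executorId == "driver")
```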
Temporary Directory of Driver
driverTmpDir: Option[String]
SparkEnv defines a driverTmpDir internal registry for the driver to be used as the root directory of files added using SparkContext.addFile.
driverTmpDir is undefined initially and is defined for the driver only when the SparkEnv utility is used to create a "base" SparkEnv.
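As an illustration, files added with SparkContext.addFile on the driver end up under this directory, which is what SparkFiles.getRootDirectory reports on the driver. A minimal sketch (assuming an active SparkContext sc, e.g. in spark-shell):

```scala
import java.io.File
import java.nio.file.Files
import org.apache.spark.SparkFiles

// A sketch: a file added with SparkContext.addFile lands under the driver's
// temporary directory (driverTmpDir), the root directory of added files
val file = Files.createTempFile("hello", ".txt").toFile
sc.addFile(file.getAbsolutePath)

println(SparkFiles.getRootDirectory())                // the driver's temporary directory
assert(new File(SparkFiles.get(file.getName)).exists) // the added file, resolved against it
```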
Creating SparkEnv for Driver
createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv
createDriverEnv creates a SparkEnv execution environment for the driver.
createDriverEnv accepts a SparkConf, whether Spark runs in local mode or not, a LiveListenerBus, the number of cores to use for execution in local mode or 0 otherwise, and an optional OutputCommitCoordinator (default: none).
createDriverEnv ensures that the spark.driver.host and spark.driver.port settings are defined.
It then passes the call straight on to the create utility (with the driver executor ID, the isDriver flag enabled, and the input arguments).
createDriverEnv is used when SparkContext is created.
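SparkContext normally defines the two settings itself before createDriverEnv is called. A minimal sketch of setting them explicitly on a SparkConf (the host and port values are made up):

```scala
import org.apache.spark.SparkConf

// A sketch: the two settings createDriverEnv expects to be defined
// (the host and port values below are made up)
val conf = new SparkConf()
  .setAppName("sparkenv-demo")
  .setMaster("local[*]")
  .set("spark.driver.host", "localhost")
  .set("spark.driver.port", "35000")

assert(conf.contains("spark.driver.host") && conf.contains("spark.driver.port"))
```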
Creating SparkEnv for Executor
createExecutorEnv(
conf: SparkConf,
executorId: String,
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv
createExecutorEnv(
conf: SparkConf,
executorId: String,
bindAddress: String,
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv
createExecutorEnv creates the Spark execution environment for an executor.
createExecutorEnv simply creates the "base" SparkEnv (passing in the given arguments) and sets it as the current SparkEnv.
NOTE: The number of cores (numCores) is configured using the --cores command-line option of CoarseGrainedExecutorBackend and is specific to a cluster manager.
createExecutorEnv is used when the CoarseGrainedExecutorBackend utility is requested to run.
Creating "Base" SparkEnv (for Driver and Executors)
create(
conf: SparkConf,
executorId: String,
bindAddress: String,
advertiseAddress: String,
port: Option[Int],
isLocal: Boolean,
numUsableCores: Int,
ioEncryptionKey: Option[Array[Byte]],
listenerBus: LiveListenerBus = null,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv
create is a utility to create the "base" SparkEnv (that is "enhanced" for the driver and executors later on).
create's Input Arguments and Their Usage:

Input Argument | Usage |
---|---|
bindAddress | Used to create the RpcEnv and NettyBlockTransferService |
advertiseAddress | Used to create the RpcEnv and NettyBlockTransferService |
numUsableCores | Used to create the MemoryManager, NettyBlockTransferService and BlockManager |
create creates a Serializer (based on the spark.serializer configuration property). You should see the following DEBUG message in the logs:
Using serializer: [serializer]
create creates a closure Serializer (a JavaSerializer).
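For example, the Serializer that create instantiates is controlled by the spark.serializer configuration property. A minimal sketch switching it to Kryo:

```scala
import org.apache.spark.SparkConf

// A sketch: use KryoSerializer instead of the default JavaSerializer;
// the DEBUG message above then reports the Kryo class
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```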
create creates a ShuffleManager given the value of the spark.shuffle.manager configuration property.
create creates a MemoryManager based on the spark.memory.useLegacyMode setting (with UnifiedMemoryManager being the default and numCores being the input numUsableCores).
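A minimal sketch of opting back into the legacy memory manager (note that spark.memory.useLegacyMode is a Spark 1.6-2.x property and was removed in Spark 3.0):

```scala
import org.apache.spark.SparkConf

// A sketch: request the legacy StaticMemoryManager instead of UnifiedMemoryManager
// (Spark 1.6-2.x only; the property was removed in Spark 3.0)
val conf = new SparkConf()
  .set("spark.memory.useLegacyMode", "true")
```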
create creates a NettyBlockTransferService with the following ports:
- spark.driver.blockManager.port for the driver (default: 0)
- spark.blockManager.port for an executor (default: 0)
NOTE: create uses the NettyBlockTransferService to create the BlockManager.
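A minimal sketch of pinning the ports instead of relying on the random (0) defaults (the port numbers are made up):

```scala
import org.apache.spark.SparkConf

// A sketch: fix the ports used by NettyBlockTransferService
// (the port numbers below are made up)
val conf = new SparkConf()
  .set("spark.driver.blockManager.port", "7070") // the driver
  .set("spark.blockManager.port", "7071")        // executors
```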
CAUTION: FIXME A picture with SparkEnv, NettyBlockTransferService
and the ports "armed".
create creates a BlockManagerMaster object with the BlockManagerMaster RPC endpoint reference (obtained by registering or looking it up by name), the SparkConf, and the input isDriver flag.
![Creating BlockManager for the Driver](sparkenv-driver-blockmanager.png)
NOTE: create registers the BlockManagerMaster RPC endpoint for the driver and looks it up for executors.
![Creating BlockManager for Executor](sparkenv-executor-blockmanager.png)
create creates a BlockManager (using the above BlockManagerMaster, NettyBlockTransferService and the other services).
create creates a BroadcastManager.
create creates a MapOutputTrackerMaster or a MapOutputTrackerWorker for the driver and executors, respectively.
NOTE: The choice of the MapOutputTracker implementation is based on whether the input executorId is driver or not.
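A minimal sketch of observing the choice on the driver (assumes an active SparkContext; note that MapOutputTracker is an internal API):

```scala
import org.apache.spark.SparkEnv

// A sketch: on the driver the MapOutputTracker is a MapOutputTrackerMaster
// (assumes an active SparkContext, e.g. in spark-shell)
println(SparkEnv.get.mapOutputTracker.getClass.getSimpleName) // MapOutputTrackerMaster
```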
create registers the MapOutputTrackerMasterEndpoint RPC endpoint on the driver (or looks up a reference to it on executors) and assigns the RPC endpoint reference to the MapOutputTracker.
CAUTION: FIXME
create creates a CacheManager.
create creates a MetricsSystem (separately for the driver and an executor).
create initializes the userFiles temporary directory that is used for downloading dependencies on the driver, while for an executor it is the executor's current working directory.
create creates an OutputCommitCoordinator (unless a mock one is given).
Usage
create is used when the SparkEnv utility is used to create a SparkEnv for the driver (createDriverEnv) or an executor (createExecutorEnv).
Accessing SparkEnv
get: SparkEnv
get returns the SparkEnv on the driver and executors.
import org.apache.spark.SparkEnv
assert(SparkEnv.get.isInstanceOf[SparkEnv])
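get also works inside tasks, where it returns the executor's SparkEnv. A minimal sketch (assuming an active SparkContext sc; in local mode the driver and executor share a single SparkEnv):

```scala
// A sketch: SparkEnv.get inside a task returns the executor's SparkEnv
// (printed to the executor's stdout; in local mode executorId is "driver")
sc.parallelize(1 to 4, 2).foreachPartition { _ =>
  val env = org.apache.spark.SparkEnv.get
  println(s"SparkEnv of executorId=${env.executorId}")
}
```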
Registering or Looking up RPC Endpoint by Name
registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint)
registerOrLookupEndpoint registers or looks up an RPC endpoint by name.
If called from the driver, you should see the following INFO message in the logs:
Registering [name]
And the RPC endpoint is registered in the RPC environment.
Otherwise, it obtains an RPC endpoint reference by name.
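The branching is essentially as follows (a paraphrased sketch of the helper nested inside SparkEnv.create; it relies on the Spark-internal RpcEnv, RpcEndpoint and RpcUtils APIs and on the rpcEnv, conf and isDriver values available in create, so it is not standalone user code):

```scala
// A paraphrased sketch of the register-or-look-up logic (Spark-internal APIs)
def registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef =
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator) // register in the driver's RpcEnv
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)  // look up a reference to the driver-side endpoint
  }
```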
Stopping SparkEnv
stop(): Unit
stop checks the isStopped internal flag and does nothing when the flag is already enabled.
Otherwise, stop turns the isStopped flag on, stops all pythonWorkers and requests the following services to stop:
- MapOutputTracker
- ShuffleManager
- BroadcastManager
- BlockManager
- BlockManagerMaster
- MetricsSystem
- OutputCommitCoordinator
stop requests the RpcEnv to shut down and waits till it terminates.
Only on the driver, stop deletes the temporary directory of the driver (driverTmpDir). You should see the following WARN message in the logs if the deletion fails:
Exception while deleting Spark temp dir: [path]
NOTE: stop is used when SparkContext stops (on the driver) and Executor stops.
set Method
set(e: SparkEnv): Unit
set saves the input SparkEnv so that it can be accessed using get.
NOTE: set is used when...FIXME
environmentDetails Utility
environmentDetails(
conf: SparkConf,
schedulingMode: String,
addedJars: Seq[String],
addedFiles: Seq[String]): Map[String, Seq[(String, String)]]
environmentDetails...FIXME
environmentDetails is used when SparkContext is requested to post a SparkListenerEnvironmentUpdate event.
Logging
Enable ALL logging level for the org.apache.spark.SparkEnv logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.SparkEnv=ALL
Refer to Logging.
Internal Properties
Name | Description |
---|---|
isStopped | Used to mark SparkEnv stopped (default: false) |