SparkEnv¶
SparkEnv is a handle to the Spark Execution Environment with the core services of Apache Spark (that interact with each other to establish a distributed computing platform for a Spark application).
There are two separate SparkEnvs: one for the driver and one for every executor.
Core Services¶
| Property | Service |
|---|---|
| blockManager | BlockManager |
| broadcastManager | BroadcastManager |
| closureSerializer | Serializer |
| conf | SparkConf |
| mapOutputTracker | MapOutputTracker |
| memoryManager | MemoryManager |
| metricsSystem | MetricsSystem |
| outputCommitCoordinator | OutputCommitCoordinator |
| rpcEnv | RpcEnv |
| securityManager | SecurityManager |
| serializer | Serializer |
| serializerManager | SerializerManager |
| shuffleManager | ShuffleManager |
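With a running Spark application, the services above are reachable through the property accessors. A quick sketch (a spark-shell session is assumed, so SparkEnv.get returns the driver's SparkEnv):

```scala
import org.apache.spark.SparkEnv

// The current SparkEnv (the driver's one when executed in spark-shell)
val env = SparkEnv.get

// A couple of the core services from the table above
val conf = env.conf                 // SparkConf
val blockManager = env.blockManager // BlockManager
val serializer = env.serializer     // Serializer
```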
Creating Instance¶
SparkEnv takes the following to be created:
- Executor ID
- RpcEnv
- Serializer
- Serializer
- SerializerManager
- MapOutputTracker
- ShuffleManager
- BroadcastManager
- BlockManager
- SecurityManager
- MetricsSystem
- MemoryManager
- OutputCommitCoordinator
- SparkConf
SparkEnv is created using the create utility.
Temporary Directory of Driver¶
driverTmpDir: Option[String]
SparkEnv defines the driverTmpDir internal registry for the driver, used as the root directory of files added using SparkContext.addFile.
driverTmpDir is undefined initially and is defined for the driver only when SparkEnv utility is used to create a "base" SparkEnv.
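For illustration, a spark-shell fragment (a running SparkContext sc and an existing file /tmp/data.txt are assumed): files added with SparkContext.addFile land under the driverTmpDir-backed root directory on the driver and can be located with SparkFiles.

```scala
import org.apache.spark.SparkFiles

// Distribute a file with the application; on the driver it is copied
// under the root directory backed by driverTmpDir
sc.addFile("/tmp/data.txt")

// Resolve the local path of the downloaded copy
val localPath = SparkFiles.get("data.txt")
```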
Creating SparkEnv for Driver¶
createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv
createDriverEnv creates a SparkEnv execution environment for the driver.

createDriverEnv accepts an instance of SparkConf, whether it runs in local mode or not, a LiveListenerBus, the number of cores to use for execution in local mode or 0 otherwise, and an OutputCommitCoordinator (default: none).
createDriverEnv ensures that the spark.driver.host and spark.driver.port settings are defined.
It then passes the call straight on to the create utility (with the driver executor ID, the isDriver flag enabled, and the input parameters).
createDriverEnv is used when SparkContext is created.
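createDriverEnv is an internal API, so you never call it directly; creating a SparkContext is what triggers it. A minimal sketch (the application name and port value are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Creating a SparkContext triggers createDriverEnv under the covers;
// spark.driver.host and spark.driver.port get defaults when not set
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("sparkenv-demo")
  .set("spark.driver.port", "0") // 0 = pick an ephemeral port

val sc = new SparkContext(conf)
```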
Creating SparkEnv for Executor¶
createExecutorEnv(
conf: SparkConf,
executorId: String,
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv
createExecutorEnv(
conf: SparkConf,
executorId: String,
bindAddress: String,
hostname: String,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv
createExecutorEnv creates the Spark execution environment for an executor.

createExecutorEnv simply creates the "base" SparkEnv (passing in the input arguments) and sets it as the current SparkEnv.
NOTE: The number of cores numCores is configured using the --cores command-line option of CoarseGrainedExecutorBackend and is specific to a cluster manager.
createExecutorEnv is used when CoarseGrainedExecutorBackend utility is requested to run.
Creating "Base" SparkEnv (for Driver and Executors)¶
create(
conf: SparkConf,
executorId: String,
bindAddress: String,
advertiseAddress: String,
port: Option[Int],
isLocal: Boolean,
numUsableCores: Int,
ioEncryptionKey: Option[Array[Byte]],
listenerBus: LiveListenerBus = null,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv
create is a utility to create the "base" SparkEnv (that is "enhanced" for the driver and executors later on).
create's Input Arguments and Their Usage
| Input Argument | Usage |
|---|---|
| bindAddress | Used to create RpcEnv and NettyBlockTransferService |
| advertiseAddress | Used to create RpcEnv and NettyBlockTransferService |
| numUsableCores | Used to create MemoryManager, NettyBlockTransferService and BlockManager |
create creates a Serializer (based on the spark.serializer configuration property). You should see the following DEBUG message in the logs:
Using serializer: [serializer]
create creates a closure Serializer (based on spark.closure.serializer).
create creates a ShuffleManager given the value of the spark.shuffle.manager configuration property.
create creates a MemoryManager based on the spark.memory.useLegacyMode configuration property (with UnifiedMemoryManager being the default) and the input numUsableCores.
create creates a NettyBlockTransferService with the following ports:
- spark.driver.blockManager.port for the driver (default: 0)
- spark.blockManager.port for an executor (default: 0)
NOTE: create uses the NettyBlockTransferService to create a BlockManager.
CAUTION: FIXME A picture with SparkEnv, NettyBlockTransferService and the ports "armed".
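The block-transfer ports can be pinned explicitly when random ports are not an option (e.g. with restrictive firewalls); the port numbers below are made up for illustration:

```scala
import org.apache.spark.SparkConf

// Pin the block-transfer ports instead of the default 0 (random port)
val conf = new SparkConf()
  .set("spark.driver.blockManager.port", "7078") // the driver
  .set("spark.blockManager.port", "7079")        // the executors
```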
create creates a BlockManagerMaster object with the BlockManagerMaster RPC endpoint reference (registered or looked up by name), the input SparkConf, and the input isDriver flag.
![Creating BlockManager for the Driver](sparkenv-driver-blockmanager.png)
NOTE: create registers the BlockManagerMaster RPC endpoint for the driver and looks it up for executors.
![Creating BlockManager for Executor](sparkenv-executor-blockmanager.png)
create creates a BlockManager (using the above BlockManagerMaster object, the NettyBlockTransferService and the other services).
create creates a BroadcastManager.
create creates a MapOutputTrackerMaster or a MapOutputTrackerWorker for the driver and executors, respectively.
NOTE: The choice of the MapOutputTracker implementation is based on whether the input executorId is driver or not.
create registers (on the driver) or looks up (on executors) the MapOutputTracker RPC endpoint.
CAUTION: FIXME
create creates a CacheManager.
create creates a MetricsSystem, differently for the driver and for an executor.
create initializes the userFiles temporary directory that the driver uses for downloading dependencies, while for an executor it is the executor's current working directory.
create creates an OutputCommitCoordinator.
Usage¶
create is used when SparkEnv utility is used to create a SparkEnv for the driver and executors.
Accessing SparkEnv¶
get: SparkEnv
get returns the SparkEnv on the driver and executors.
import org.apache.spark.SparkEnv
assert(SparkEnv.get.isInstanceOf[SparkEnv])
Registering or Looking up RPC Endpoint by Name¶
registerOrLookupEndpoint(
name: String,
endpointCreator: => RpcEndpoint): RpcEndpointRef
registerOrLookupEndpoint registers or looks up an RPC endpoint by name.
When called on the driver, you should see the following INFO message in the logs:
Registering [name]
And the RPC endpoint is registered in the RPC environment.
Otherwise, it obtains a RPC endpoint reference by name.
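The pattern can be sketched with a minimal, self-contained model (SimpleRpcEnv and friends are hypothetical, not Spark's actual RpcEnv API): the driver registers the endpoint under a well-known name, executors only resolve a reference to it, and the by-name endpointCreator is never evaluated on executors.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Spark's RPC abstractions
trait RpcEndpoint { def name: String }
case class RpcEndpointRef(name: String)

class SimpleRpcEnv {
  private val endpoints = mutable.Map.empty[String, RpcEndpoint]

  // Register an endpoint and hand back a reference to it
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    endpoints(name) = endpoint
    RpcEndpointRef(name)
  }

  // Resolve a reference to an already-registered endpoint
  def setupEndpointRef(name: String): RpcEndpointRef = {
    require(endpoints.contains(name), s"No endpoint registered under $name")
    RpcEndpointRef(name)
  }
}

def registerOrLookupEndpoint(
    rpcEnv: SimpleRpcEnv,
    isDriver: Boolean,
    name: String,
    endpointCreator: => RpcEndpoint): RpcEndpointRef =
  if (isDriver) {
    println(s"Registering $name")
    rpcEnv.setupEndpoint(name, endpointCreator) // creator evaluated here only
  } else {
    rpcEnv.setupEndpointRef(name) // executors never evaluate the creator
  }
```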
Stopping SparkEnv¶
stop(): Unit
stop does nothing when the isStopped internal flag is enabled already.
Otherwise, stop turns isStopped flag on, stops all pythonWorkers and requests the following services to stop:
- scheduler:MapOutputTracker.md#stop[MapOutputTracker]
- shuffle:ShuffleManager.md#stop[ShuffleManager]
- core:BroadcastManager.md#stop[BroadcastManager]
- storage:BlockManager.md#stop[BlockManager]
- storage:BlockManagerMaster.md#stop[BlockManagerMaster]
- MetricsSystem
- scheduler:OutputCommitCoordinator.md#stop[OutputCommitCoordinator]
stop requests the RpcEnv to shut down and waits till it terminates.
Only on the driver, stop deletes the temporary directory. You can see the following WARN message in the logs if the deletion fails:
Exception while deleting Spark temp dir: [path]
NOTE: stop is used when SparkContext stops (on the driver) and when Executor stops.
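The guard on isStopped makes stop idempotent, which matters because both shutdown paths above can reach it. A simplified, self-contained sketch (hypothetical types, not Spark's classes):

```scala
// Each service records how many times it was asked to stop
final class StoppableService(val name: String) {
  var stopCount = 0
  def stop(): Unit = stopCount += 1
}

// The isStopped flag ensures services are stopped only on the first call
final class SimpleEnv(services: Seq[StoppableService]) {
  private var isStopped = false
  def stop(): Unit =
    if (!isStopped) {
      isStopped = true
      services.foreach(_.stop())
    }
}
```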
set Method¶
set(e: SparkEnv): Unit
set saves the input SparkEnv as the current SparkEnv (available later using get).
NOTE: set is used when...FIXME
environmentDetails Utility¶
environmentDetails(
conf: SparkConf,
schedulingMode: String,
addedJars: Seq[String],
addedFiles: Seq[String]): Map[String, Seq[(String, String)]]
environmentDetails...FIXME
environmentDetails is used when SparkContext is requested to post a SparkListenerEnvironmentUpdate event.
Logging¶
Enable ALL logging level for org.apache.spark.SparkEnv logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.SparkEnv=ALL
Refer to Logging.
Internal Properties¶
| Name | Description |
|---|---|
| isStopped | Used to mark SparkEnv stopped. Default: false |