Inside Creating SparkContext¶
This document describes the internals of what happens when a new SparkContext is created.
import org.apache.spark.{SparkConf, SparkContext}

// 1. Create Spark configuration
val conf = new SparkConf()
  .setAppName("SparkMe Application")
  .setMaster("local[*]")

// 2. Create Spark context
val sc = new SparkContext(conf)
creationSite¶
creationSite: CallSite
SparkContext determines the call site.
assertOnDriver¶
SparkContext...FIXME
markPartiallyConstructed¶
SparkContext...FIXME
startTime¶
startTime: Long
SparkContext records the current time (in ms).
stopped¶
stopped: AtomicBoolean
SparkContext initializes the stopped flag to false.
Printing Out Spark Version¶
SparkContext prints out the following INFO message to the logs:
Running Spark version [SPARK_VERSION]
sparkUser¶
sparkUser: String
SparkContext determines the Spark user.
SparkConf¶
_conf: SparkConf
SparkContext clones the SparkConf and requests it to validateSettings.
Enforcing Mandatory Configuration Properties¶
SparkContext asserts that spark.master and spark.app.name are defined (in the SparkConf).
A master URL must be set in your configuration
An application name must be set in your configuration
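The two checks can be observed with a minimal sketch, assuming Spark is on the classpath. MandatoryConfDemo and its members are illustrative names, not part of Spark.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

object MandatoryConfDemo {
  // Tries to create a SparkContext and returns the error message on failure.
  def tryCreate(conf: SparkConf): Either[String, SparkContext] =
    try Right(new SparkContext(conf))
    catch { case e: SparkException => Left(e.getMessage) }

  // loadDefaults = false ignores spark.* Java system properties,
  // so neither property can be defined outside of this code.
  def noMaster: SparkConf = new SparkConf(false).setAppName("SparkMe Application")
  def noAppName: SparkConf = new SparkConf(false).setMaster("local[*]")
}
```

Calling MandatoryConfDemo.tryCreate with either configuration should return a Left with the corresponding message shown above.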
DriverLogger¶
_driverLogger: Option[DriverLogger]
SparkContext creates a DriverLogger.
ResourceInformation¶
_resources: Map[String, ResourceInformation]
SparkContext uses the spark.driver.resourcesFile configuration property to discover driver resources and prints out the following INFO message to the logs:
==============================================================
Resources for [componentName]:
[resources]
==============================================================
Submitted Application¶
SparkContext prints out the following INFO message to the logs (with the value of spark.app.name configuration property):
Submitted application: [appName]
Spark on YARN and spark.yarn.app.id¶
For Spark on YARN in cluster deploy mode, SparkContext checks whether the spark.yarn.app.id configuration property is defined. A SparkException is thrown if it is not.
Detected yarn cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.
Displaying Spark Configuration¶
With spark.logConf configuration property enabled, SparkContext prints out the following INFO message to the logs:
Spark configuration:
[conf.toDebugString]
Note
SparkConf.toDebugString is used very early in the initialization process and other settings configured afterwards are not included. Use SparkContext.getConf.toDebugString once SparkContext is initialized.
Setting Configuration Properties¶
- spark.driver.host to the current value of the property (to override the default)
- spark.driver.port to 0 unless defined already
- spark.executor.id to driver
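The difference between the configuration logged early on and the final one can be seen with a small sketch, assuming Spark is on the classpath and a local master. LogConfDemo is an illustrative name.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LogConfDemo {
  // Returns the configuration as seen before and after SparkContext initialization.
  def debugStrings(): (String, String) = {
    val conf = new SparkConf(false)
      .setAppName("SparkMe Application")
      .setMaster("local[*]")
      .set("spark.logConf", "true") // logs "Spark configuration:" at INFO level
    val before = conf.toDebugString
    val sc = new SparkContext(conf)
    // SparkContext works on a clone, so conf itself is left untouched;
    // properties set during initialization are only visible through sc.getConf.
    try (before, sc.getConf.toDebugString)
    finally sc.stop()
  }
}
```

The second string should include properties set while SparkContext was being created, such as spark.executor.id and spark.app.id, which the first one lacks.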
User-Defined Jar Files¶
_jars: Seq[String]
SparkContext sets _jars to the value of the spark.jars configuration property.
User-Defined Files¶
_files: Seq[String]
SparkContext sets _files to the value of the spark.files configuration property.
spark.eventLog.dir¶
_eventLogDir: Option[URI]
If event logging is enabled, i.e. the spark.eventLog.enabled flag is true, the internal field _eventLogDir is set to the value of the spark.eventLog.dir setting or the default value /tmp/spark-events.
spark.eventLog.compress¶
_eventLogCodec: Option[String]
Also, if spark.eventLog.compress is enabled (it is not by default), the short name of the CompressionCodec is assigned to _eventLogCodec. The config key is spark.io.compression.codec (default: lz4).
Creating LiveListenerBus¶
_listenerBus: LiveListenerBus
SparkContext creates a LiveListenerBus.
Creating AppStatusStore¶
_statusStore: AppStatusStore
SparkContext requests AppStatusStore to create a live store (i.e. the AppStatusStore for a live Spark application).
NOTE: The current AppStatusStore is available as the statusStore property of the SparkContext.
Creating SparkEnv¶
_env: SparkEnv
SparkContext creates a SparkEnv and requests SparkEnv to use the instance as the default SparkEnv.
spark.repl.class.uri¶
With spark.repl.class.outputDir configuration property defined, SparkContext sets spark.repl.class.uri configuration property to be...FIXME
Creating SparkStatusTracker¶
_statusTracker: SparkStatusTracker
SparkContext creates a SparkStatusTracker (with itself and the AppStatusStore).
Creating ConsoleProgressBar¶
_progressBar: Option[ConsoleProgressBar]
SparkContext creates a ConsoleProgressBar only when spark.ui.showConsoleProgress configuration property is enabled.
Creating SparkUI¶
_ui: Option[SparkUI]
SparkContext creates a SparkUI only when spark.ui.enabled configuration property is enabled.
SparkContext requests the SparkUI to bind.
Hadoop Configuration¶
_hadoopConfiguration: Configuration
SparkContext creates a new Hadoop Configuration.
Adding User-Defined Jar Files¶
If there are jars given through the SparkContext constructor, they are added using addJar.
Adding User-Defined Files¶
SparkContext adds the files in spark.files configuration property.
_executorMemory¶
_executorMemory: Int
SparkContext determines the amount of memory to allocate to each executor. It is the value of the spark.executor.memory setting, or the SPARK_EXECUTOR_MEMORY environment variable (or the currently-deprecated SPARK_MEM), or defaults to 1024 (MB).
_executorMemory is later available as sc.executorMemory and is used for LOCAL_CLUSTER_REGEX, Spark Standalone's SparkDeploySchedulerBackend, to set executorEnvs("SPARK_EXECUTOR_MEMORY"), MesosSchedulerBackend, and CoarseMesosSchedulerBackend.
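The lookup order described above can be sketched in plain Scala. This is not Spark's code: ExecutorMemorySketch, toMb and executorMemoryMb are hypothetical names, the configuration and environment are passed in as plain maps, and toMb is a simplified stand-in for Spark's memory-string parsing that accepts only a number followed by a k, m, g or t suffix.

```scala
object ExecutorMemorySketch {
  // Simplified (hypothetical) parser: "512m" -> 512, "2g" -> 2048
  def toMb(s: String): Int = {
    val (digits, unit) = s.trim.toLowerCase.span(_.isDigit)
    require(digits.nonEmpty, s"No size found in '$s'")
    val n = digits.toLong
    val mb = unit match {
      case "k" => n / 1024
      case "m" => n
      case "g" => n * 1024
      case "t" => n * 1024 * 1024
      case other => throw new IllegalArgumentException(s"Unsupported unit '$other' in '$s'")
    }
    mb.toInt
  }

  // Configuration property first, then the environment variables, then 1024 MB
  def executorMemoryMb(conf: Map[String, String], env: Map[String, String]): Int =
    conf.get("spark.executor.memory")
      .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
      .orElse(env.get("SPARK_MEM")) // deprecated
      .map(toMb)
      .getOrElse(1024)
}
```

For example, executorMemoryMb(Map.empty, sys.env) would resolve the value from the environment of the current process only.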
SPARK_PREPEND_CLASSES Environment Variable¶
The value of SPARK_PREPEND_CLASSES environment variable is included in executorEnvs.
For Mesos SchedulerBackend Only¶
The Mesos scheduler backend's configuration is included in executorEnvs, i.e. SPARK_EXECUTOR_MEMORY, _conf.getExecutorEnv, and SPARK_USER.
ShuffleDriverComponents¶
_shuffleDriverComponents: ShuffleDriverComponents
SparkContext...FIXME
Registering HeartbeatReceiver¶
SparkContext registers HeartbeatReceiver RPC endpoint.
PluginContainer¶
_plugins: Option[PluginContainer]
SparkContext creates a PluginContainer (with itself and the _resources).
Creating SchedulerBackend and TaskScheduler¶
The SparkContext object is requested to create the SchedulerBackend with the TaskScheduler (for the given master URL) and the result becomes the internal _schedulerBackend and _taskScheduler.
A DAGScheduler is created (as _dagScheduler).
Sending Blocking TaskSchedulerIsSet¶
SparkContext sends a blocking TaskSchedulerIsSet message to HeartbeatReceiver RPC endpoint (to inform that the TaskScheduler is now available).
Heartbeater¶
_heartbeater: Heartbeater
SparkContext creates a Heartbeater and starts it.
Starting TaskScheduler¶
SparkContext requests the TaskScheduler to start.
Setting Spark Application's and Execution Attempt's IDs¶
SparkContext sets the internal fields _applicationId and _applicationAttemptId (using the applicationId and applicationAttemptId methods from the TaskScheduler Contract).
NOTE: SparkContext requests the TaskScheduler for the unique identifier of a Spark application (that is currently only implemented by TaskSchedulerImpl, which uses the SchedulerBackend to request the identifier).
NOTE: The unique identifier of a Spark application is used to initialize SparkUI and BlockManager.
NOTE: _applicationAttemptId is used when SparkContext is requested for the unique identifier of the execution attempt of a Spark application and when EventLoggingListener is created.
Setting spark.app.id Spark Property in SparkConf¶
SparkContext sets the spark.app.id property to the unique identifier of a Spark application (_applicationId) and, if the web UI is enabled, passes it on to SparkUI.
spark.ui.proxyBase¶
Initializing SparkUI¶
SparkContext requests the SparkUI (if defined) to setAppId with the _applicationId.
Initializing BlockManager¶
The BlockManager (for the driver) is initialized (with _applicationId).
Starting MetricsSystem¶
SparkContext requests the MetricsSystem to start.
NOTE: SparkContext starts MetricsSystem after setting the spark.app.id property because MetricsSystem uses it to build unique identifiers for metrics sources.
Attaching JSON Servlet Handler¶
SparkContext requests the MetricsSystem for a JSON servlet handler and requests the SparkUI to attach it.
Starting EventLoggingListener (with Event Log Enabled)¶
_eventLogger: Option[EventLoggingListener]
With spark.eventLog.enabled configuration property enabled, SparkContext creates an EventLoggingListener and requests it to start.
SparkContext requests the LiveListenerBus to add the EventLoggingListener to the eventLog event queue.
With spark.eventLog.enabled disabled, _eventLogger is None (undefined).
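A minimal sketch of enabling event logging, assuming Spark is on the classpath and a local master. EventLogDemo is an illustrative name, and a temporary directory is used in place of the default /tmp/spark-events.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object EventLogDemo {
  // Runs a tiny job with event logging enabled and returns the names
  // of the files found in the event log directory afterwards.
  def run(): Seq[String] = {
    // The event log directory must already exist when SparkContext is created
    val dir = Files.createTempDirectory("spark-events")
    val conf = new SparkConf(false)
      .setAppName("SparkMe Application")
      .setMaster("local[*]")
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", dir.toUri.toString)
    val sc = new SparkContext(conf)
    try sc.parallelize(1 to 10).count()
    finally sc.stop()
    dir.toFile.list().toSeq
  }
}
```

After the run, the directory should contain an event log file for the application.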
ContextCleaner¶
_cleaner: Option[ContextCleaner]
With spark.cleaner.referenceTracking configuration property enabled, SparkContext creates a ContextCleaner (with itself and the _shuffleDriverComponents).
SparkContext requests the ContextCleaner to start.
ExecutorAllocationManager¶
_executorAllocationManager: Option[ExecutorAllocationManager]
SparkContext initializes _executorAllocationManager internal registry.
SparkContext creates an ExecutorAllocationManager when:
- Dynamic Allocation of Executors is enabled (based on the spark.dynamicAllocation.enabled configuration property and the master URL)
The ExecutorAllocationManager is requested to start.
Registering User-Defined SparkListeners¶
SparkContext registers user-defined listeners and starts SparkListenerEvent event delivery to the listeners.
postEnvironmentUpdate¶
postEnvironmentUpdate is called to post a SparkListenerEnvironmentUpdate message to the LiveListenerBus with information about the TaskScheduler's scheduling mode, added jar and file paths, and other environmental details. They are displayed in the web UI's Environment tab.
postApplicationStart¶
A SparkListenerApplicationStart message is posted to the LiveListenerBus (using the internal postApplicationStart method).
postStartHook¶
The TaskScheduler is notified that SparkContext is almost fully initialized.
NOTE: TaskScheduler.postStartHook does nothing by default, but custom implementations offer more advanced features, e.g. TaskSchedulerImpl blocks the current thread until the SchedulerBackend is ready. There is also YarnClusterScheduler for Spark on YARN in cluster deploy mode.
Registering Metrics Sources¶
SparkContext requests MetricsSystem to register metrics sources for the following services:
Adding Shutdown Hook¶
SparkContext adds a shutdown hook (using ShutdownHookManager.addShutdownHook()).
SparkContext prints out the following DEBUG message to the logs:
Adding shutdown hook
CAUTION: FIXME ShutdownHookManager.addShutdownHook()
Any non-fatal Exception leads to termination of the Spark context instance.
CAUTION: FIXME What does NonFatal represent in Scala?
CAUTION: FIXME Finish me
Initializing nextShuffleId and nextRddId Internal Counters¶
nextShuffleId and nextRddId start with 0.
CAUTION: FIXME Where are nextShuffleId and nextRddId used?
A new instance of Spark context is created and ready for operation.
Loading External Cluster Manager for URL (getClusterManager method)¶
getClusterManager(
url: String): Option[ExternalClusterManager]
getClusterManager loads an ExternalClusterManager that can handle the input url.
If there are two or more external cluster managers that could handle url, a SparkException is thrown:
Multiple Cluster Managers ([serviceLoaders]) registered for the url [url].
NOTE: getClusterManager uses Java's ServiceLoader.load method (https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html#load-java.lang.Class-java.lang.ClassLoader-).
NOTE: getClusterManager is used to find a cluster manager for a master URL when creating a SchedulerBackend and a TaskScheduler for the driver.
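The lookup can be sketched with Java's ServiceLoader in plain Scala. This is an illustration under assumptions, not Spark's code: ClusterManagerLike is a hypothetical stand-in for ExternalClusterManager, ClusterManagerLookup is an illustrative name, and IllegalStateException replaces SparkException to keep the sketch free of Spark dependencies.

```scala
import java.util.ServiceLoader
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the ExternalClusterManager contract
trait ClusterManagerLike {
  def canCreate(masterURL: String): Boolean
}

object ClusterManagerLookup {
  // Picks the single candidate that accepts the URL; fails on ambiguity.
  def select(candidates: Seq[ClusterManagerLike], url: String): Option[ClusterManagerLike] = {
    val matching = candidates.filter(_.canCreate(url))
    if (matching.size > 1) {
      throw new IllegalStateException(
        s"Multiple Cluster Managers (${matching.mkString(", ")}) registered for the url $url.")
    }
    matching.headOption
  }

  // Discovers implementations registered under META-INF/services on the classpath.
  def load(url: String): Option[ClusterManagerLike] = {
    val loader = Thread.currentThread().getContextClassLoader
    val it = ServiceLoader.load(classOf[ClusterManagerLike], loader).iterator()
    val found = ArrayBuffer.empty[ClusterManagerLike]
    while (it.hasNext) found += it.next()
    select(found.toList, url)
  }
}
```

With no implementation registered on the classpath, load returns None for any url.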
setupAndStartListenerBus¶
setupAndStartListenerBus(): Unit
setupAndStartListenerBus is an internal method that reads the spark.extraListeners configuration property from the current SparkConf to create and register SparkListenerInterface listeners.
It expects that the class name represents a SparkListenerInterface listener with one of the following constructors (in this order):
- a single-argument constructor that accepts SparkConf
- a zero-argument constructor
setupAndStartListenerBus registers every listener class.
You should see the following INFO message in the logs:
INFO Registered listener [className]
It starts the LiveListenerBus and records it in the internal _listenerBusStarted.
When no single-SparkConf or zero-argument constructor could be found for a class name in the spark.extraListeners configuration property, a SparkException is thrown with the message:
[className] did not have a zero-argument constructor or a single-argument constructor that accepts SparkConf. Note: if the class is defined inside of another Scala class, then its constructors may accept an implicit parameter that references the enclosing class; in this case, you must define the listener as a top-level class in order to prevent this extra parameter from breaking Spark's ability to find a valid constructor.
Any exception while registering a SparkListenerInterface listener stops the SparkContext, and a SparkException is thrown with the following message and the source exception's message:
Exception when registering SparkListener
Tip
Set INFO logging level for org.apache.spark.SparkContext logger to see the extra listeners being registered.
Registered listener pl.japila.spark.CustomSparkListener
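A minimal sketch of registering an extra listener, assuming Spark is on the classpath and a local master. ExtraListenersDemo is an illustrative name; org.apache.spark.scheduler.StatsReportListener is a listener that ships with Spark and has a zero-argument constructor.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

object ExtraListenersDemo {
  // Creates (and immediately stops) a SparkContext with the given extra listeners.
  // Returns the error message when listener registration fails.
  def start(listenerClasses: String): Either[String, Unit] = {
    val conf = new SparkConf(false)
      .setAppName("SparkMe Application")
      .setMaster("local[*]")
      .set("spark.extraListeners", listenerClasses)
    try {
      val sc = new SparkContext(conf)
      sc.stop()
      Right(())
    } catch {
      case e: SparkException => Left(e.getMessage)
    }
  }
}
```

With INFO logging enabled, ExtraListenersDemo.start("org.apache.spark.scheduler.StatsReportListener") should log the Registered listener message, while a class name that cannot be loaded should give a Left with the Exception when registering SparkListener message.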