ExecutorAllocationManager¶
ExecutorAllocationManager can be used to dynamically allocate executors based on processing workload.
ExecutorAllocationManager intercepts Spark events using the internal ExecutorAllocationListener, which keeps track of the workload.
Creating Instance¶
ExecutorAllocationManager takes the following to be created:
- ExecutorAllocationClient
- LiveListenerBus
- SparkConf
- ContextCleaner (default: None)
- Clock (default: SystemClock)
ExecutorAllocationManager is created when SparkContext is created and dynamic allocation is enabled (spark.dynamicAllocation.enabled).
Validating Configuration¶
validateSettings(): Unit
validateSettings makes sure that the settings for dynamic allocation are correct.
validateSettings throws a SparkException when any of the following conditions is not met:
- spark.dynamicAllocation.minExecutors must not be negative
- spark.dynamicAllocation.maxExecutors must be greater than 0
- spark.dynamicAllocation.minExecutors must be less than or equal to spark.dynamicAllocation.maxExecutors
- spark.dynamicAllocation.executorIdleTimeout must be greater than 0
- spark.shuffle.service.enabled must be true
- The number of tasks per executor (spark.executor.cores divided by spark.task.cpus) must not be zero
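The checks above can be sketched in plain Scala. This is a hypothetical, simplified stand-in, not Spark's code: `DynAllocSettings` and `DynAllocValidation.validate` are illustrative names, `require` (which throws `IllegalArgumentException`) replaces `SparkException` so the sketch has no Spark dependency, and it assumes minExecutors may be 0 (its default) while maxExecutors must be at least 1.

```scala
// Hypothetical, simplified holder for the settings that validateSettings inspects
final case class DynAllocSettings(
    minExecutors: Int,
    maxExecutors: Int,
    executorIdleTimeoutSec: Long,
    shuffleServiceEnabled: Boolean,
    executorCores: Int,
    taskCpus: Int)

object DynAllocValidation {
  // Throws IllegalArgumentException (standing in for SparkException) on the first failed check
  def validate(s: DynAllocSettings): Unit = {
    require(s.minExecutors >= 0,
      "spark.dynamicAllocation.minExecutors must not be negative")
    require(s.maxExecutors > 0,
      "spark.dynamicAllocation.maxExecutors must be greater than 0")
    require(s.minExecutors <= s.maxExecutors,
      "spark.dynamicAllocation.minExecutors must be <= spark.dynamicAllocation.maxExecutors")
    require(s.executorIdleTimeoutSec > 0,
      "spark.dynamicAllocation.executorIdleTimeout must be > 0")
    require(s.shuffleServiceEnabled,
      "spark.shuffle.service.enabled must be true")
    // Integer division: fewer cores than CPUs per task yields 0 tasks per executor
    require(s.executorCores / s.taskCpus != 0,
      "spark.executor.cores must not be less than spark.task.cpus")
  }
}
```

For example, `DynAllocValidation.validate(DynAllocSettings(5, 2, 60L, true, 4, 1))` fails because minExecutors exceeds maxExecutors, and `DynAllocSettings(0, 10, 60L, true, 1, 2)` fails because 1 core cannot run a task that needs 2 CPUs.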
Performance Metrics¶
ExecutorAllocationManager uses ExecutorAllocationManagerSource for performance metrics.
ExecutorMonitor¶
ExecutorAllocationManager creates an ExecutorMonitor when created.
ExecutorMonitor is added to the management queue (of LiveListenerBus) when ExecutorAllocationManager is started.
ExecutorMonitor is attached (to the ContextCleaner) when ExecutorAllocationManager is started.
ExecutorMonitor is requested to reset when ExecutorAllocationManager is requested to reset.
ExecutorMonitor is used for the performance metrics:
- numberExecutorsPendingToRemove (based on pendingRemovalCount)
- numberAllExecutors (based on executorCount)
ExecutorMonitor is used for the following:
- timedOutExecutors when ExecutorAllocationManager is requested to schedule
- executorCount when ExecutorAllocationManager is requested to addExecutors
- executorCount, pendingRemovalCount and executorsKilled when ExecutorAllocationManager is requested to removeExecutors
ExecutorAllocationListener¶
ExecutorAllocationManager creates an ExecutorAllocationListener when created to intercept Spark events that impact the allocation policy.
ExecutorAllocationListener is added to the management queue (of LiveListenerBus) when ExecutorAllocationManager is started.
ExecutorAllocationListener is used to calculate the maximum number of executors needed.
spark.dynamicAllocation.executorAllocationRatio¶
ExecutorAllocationManager uses the spark.dynamicAllocation.executorAllocationRatio configuration property for maxNumExecutorsNeeded.
tasksPerExecutorForFullParallelism¶
ExecutorAllocationManager uses the spark.executor.cores and spark.task.cpus configuration properties for the number of tasks that can be submitted to an executor for full parallelism (spark.executor.cores divided by spark.task.cpus).
Used when ExecutorAllocationManager is requested for maxNumExecutorsNeeded.
Maximum Number of Executors Needed¶
maxNumExecutorsNeeded(): Int
maxNumExecutorsNeeded requests the ExecutorAllocationListener for the number of pending and running tasks.
maxNumExecutorsNeeded is the smallest integer greater than or equal to the total number of pending and running tasks multiplied by executorAllocationRatio and divided by tasksPerExecutorForFullParallelism, i.e. ceil((pending + running tasks) × executorAllocationRatio / tasksPerExecutorForFullParallelism).
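The arithmetic can be sketched as a standalone Scala object. This is a hypothetical illustration, not Spark's code: `ExecutorsNeeded` and its method names are assumptions, and the pending and running task counts are passed in directly instead of being read from ExecutorAllocationListener.

```scala
object ExecutorsNeeded {
  // spark.executor.cores / spark.task.cpus (integer division, so 5 cores / 2 cpus = 2 tasks)
  def tasksPerExecutorForFullParallelism(executorCores: Int, taskCpus: Int): Int =
    executorCores / taskCpus

  // Smallest integer >= (pending + running tasks) * ratio / tasksPerExecutor
  def maxNumExecutorsNeeded(
      pendingTasks: Int,
      runningTasks: Int,
      executorAllocationRatio: Double,
      tasksPerExecutor: Int): Int = {
    val numRunningOrPendingTasks = pendingTasks + runningTasks
    math.ceil(numRunningOrPendingTasks * executorAllocationRatio / tasksPerExecutor).toInt
  }
}
```

For example, with spark.executor.cores of 4 and spark.task.cpus of 1, 13 pending and running tasks need ceil(13 × 1.0 / 4) = 4 executors, and only ceil(13 × 0.5 / 4) = 2 executors with an executorAllocationRatio of 0.5.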
maxNumExecutorsNeeded is used for:
- updateAndSyncNumExecutorsTarget
- numberMaxNeededExecutors performance metric
ExecutorAllocationClient¶
ExecutorAllocationManager is given an ExecutorAllocationClient when created.
Starting ExecutorAllocationManager¶
start(): Unit
start requests the LiveListenerBus to add the following to the management queue:
- ExecutorAllocationListener
- ExecutorMonitor
start requests the ContextCleaner (if defined) to attach the ExecutorMonitor.
start creates a scheduleTask (a Java Runnable) that calls schedule when run.
start requests the ScheduledExecutorService (the spark-dynamic-executor-allocation allocation executor) to schedule the scheduleTask every 100 ms.
Note
The schedule delay of 100 ms is not configurable.
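The scheduling setup described above can be sketched with plain `java.util.concurrent`. This is a hypothetical model, not Spark's implementation: the `AllocationScheduler` class, the choice of `scheduleWithFixedDelay` (rather than a fixed rate), the daemon thread factory and the error handling in the Runnable are assumptions of this sketch. Only the thread name, the 100 ms delay and the 10-second wait in stop come from this page.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import scala.util.control.NonFatal

// Hypothetical stand-in for the periodic scheduling set up by start (and torn down by stop)
final class AllocationScheduler(schedule: () => Unit) {
  // Hard-coded, mirroring the non-configurable 100 ms delay
  private val intervalMillis = 100L

  // Single daemon thread named after the allocation executor
  private val executor: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "spark-dynamic-executor-allocation")
        t.setDaemon(true)
        t
      }
    })

  // scheduleTask: calls schedule; a non-fatal error is reported so later runs still happen
  private val scheduleTask: Runnable = new Runnable {
    override def run(): Unit =
      try schedule()
      catch { case NonFatal(e) => System.err.println(s"Error in schedule: $e") }
  }

  // First run immediately, then 100 ms after each run completes
  def start(): Unit = {
    executor.scheduleWithFixedDelay(scheduleTask, 0L, intervalMillis, TimeUnit.MILLISECONDS)
    ()
  }

  // Shut down and wait up to 10 seconds; returns true if the executor terminated in time
  def stop(): Boolean = {
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

Usage: `val s = new AllocationScheduler(() => println("tick")); s.start()` prints roughly ten lines per second until `s.stop()` is called. A fixed delay (rather than a fixed rate) means a slow schedule call never causes overlapping or bursty catch-up runs.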
start requests the ExecutorAllocationClient to request the total executors with the following:
start is used when SparkContext is created.
Scheduling Executors¶
schedule(): Unit
schedule calls …
It then goes over …
updateAndSyncNumExecutorsTarget¶
updateAndSyncNumExecutorsTarget(now: Long): Int
updateAndSyncNumExecutorsTarget uses maxNumExecutorsNeeded.
updateAndSyncNumExecutorsTarget ...FIXME
Stopping ExecutorAllocationManager¶
stop(): Unit
stop shuts down …
Note
stop waits 10 seconds for the termination to complete.
stop is used when SparkContext is requested to stop.
spark-dynamic-executor-allocation Allocation Executor¶
spark-dynamic-executor-allocation allocation executor is a...FIXME
ExecutorAllocationManagerSource¶
ExecutorAllocationManagerSource
Logging¶
Enable ALL logging level for the org.apache.spark.ExecutorAllocationManager logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.ExecutorAllocationManager=ALL
Refer to Logging.