== [[JobProgressListener]] JobProgressListener Spark Listener
JobProgressListener is a SparkListener.md[] for spark-webui.md[web UI].
JobProgressListener intercepts the following SparkListener.md#SparkListenerEvent[Spark events].
.JobProgressListener Events
[cols="1,2",options="header",width="100%"]
|===
| Handler | Purpose

| <<onJobStart, onJobStart>>
| Creates a JobUIData and updates the internal job and stage registries.

| <<onJobEnd, onJobEnd>>
| Removes the job from the active jobs registry and records it as completed or failed.

| <<onStageCompleted, onStageCompleted>>
| Updates the StageUIData and JobUIData.

| <<onTaskStart, onTaskStart>>
| Updates the StageUIData and JobUIData, and registers a new TaskUIData.

| <<onTaskEnd, onTaskEnd>>
| Updates the StageUIData (and TaskUIData), ExecutorSummary, and JobUIData.

| <<onExecutorMetricsUpdate, onExecutorMetricsUpdate>>
|

| onEnvironmentUpdate
| Sets the schedulingMode property using the current configuration-properties.md#spark.scheduler.mode[spark.scheduler.mode] (from Spark Properties environment details).

Used in spark-webui-AllJobsPage.md[AllJobsPage] (for the Scheduling Mode), and to display pools in spark-webui-JobsTab.md[JobsTab] and spark-webui-StagesTab.md[StagesTab].

FIXME: Add the links/screenshots for pools.

| onBlockManagerAdded
| Records an executor and its block manager in the internal <<executorIdToBlockManagerId, executorIdToBlockManagerId>> registry.

| onBlockManagerRemoved
| Removes the executor from the internal <<executorIdToBlockManagerId, executorIdToBlockManagerId>> registry.

| onApplicationStart
| Records a Spark application's start time (in the internal startTime).

Used in spark-webui-jobs.md[Jobs tab] (for a total uptime and the event timeline) and spark-webui-jobs.md[Job page] (for the event timeline).

| onApplicationEnd
| Records a Spark application's end time (in the internal endTime).

Used in spark-webui-jobs.md[Jobs tab] (for a total uptime).

| onTaskGettingResult
| Does nothing.

FIXME: Why is this event intercepted at all?!
|===
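For illustration, a custom SparkListener that hooks into the same job lifecycle events could look as follows (a minimal sketch; the listener name and log messages are made up):

[source, scala]
----
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// A minimal SparkListener that reacts to job lifecycle events,
// the same hooks JobProgressListener overrides.
class JobEventsListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
}
----

Such a listener can be registered programmatically with `sc.addSparkListener(new JobEventsListener)` or declaratively using the `spark.extraListeners` configuration property.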
=== [[updateAggregateMetrics]] updateAggregateMetrics Method
CAUTION: FIXME
=== [[registries]] Registries and Counters
JobProgressListener uses registries to collect information about job executions.
.JobProgressListener Registries and Counters
[cols="1,2",options="header",width="100%"]
|===
| Name | Description

| [[numCompletedStages]] numCompletedStages
|

| [[numFailedStages]] numFailedStages
|

| [[stageIdToData]] stageIdToData
| Holds <<StageUIData, StageUIData>> per stage, i.e. the stage and stage attempt ids.

| [[stageIdToInfo]] stageIdToInfo
|

| [[stageIdToActiveJobIds]] stageIdToActiveJobIds
|

| [[poolToActiveStages]] poolToActiveStages
|

| [[activeJobs]] activeJobs
|

| [[completedJobs]] completedJobs
|

| [[failedJobs]] failedJobs
|

| [[jobIdToData]] jobIdToData
|

| [[jobGroupToJobIds]] jobGroupToJobIds
|

| [[pendingStages]] pendingStages
|

| [[activeStages]] activeStages
|

| [[completedStages]] completedStages
|

| [[skippedStages]] skippedStages
|

| [[failedStages]] failedStages
|

| [[executorIdToBlockManagerId]] executorIdToBlockManagerId
| The lookup table of storage:BlockManagerId.md[]s per executor id.

Used to track block managers so the Stage page can display Address in spark-webui-StagePage.md#ExecutorTable[Aggregated Metrics by Executor].

FIXME: How does Executors page collect the very same information?
|===
=== [[onJobStart]] onJobStart Callback
[source, scala]
----
onJobStart(jobStart: SparkListenerJobStart): Unit
----
onJobStart creates a <<JobUIData, JobUIData>> for the job and updates the internal registries as follows.

onJobStart reads the optional Spark Job group id as spark.jobGroup.id (from properties in the input jobStart).

onJobStart creates the JobUIData using the input jobStart with the status attribute set to JobExecutionStatus.RUNNING and records it in the <<jobIdToData, jobIdToData>> and <<activeJobs, activeJobs>> registries.

onJobStart looks up the job ids for the group id (in the <<jobGroupToJobIds, jobGroupToJobIds>> registry) and adds the job id.

The internal <<pendingStages, pendingStages>> registry is updated with StageInfo per stage id (for every StageInfo in the SparkListenerJobStart.stageInfos collection).

onJobStart records the stages of the job in <<stageIdToActiveJobIds, stageIdToActiveJobIds>>.

onJobStart records StageInfos in <<stageIdToInfo, stageIdToInfo>> and <<stageIdToData, stageIdToData>>.
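The bookkeeping above boils down to a handful of mutable maps. A minimal, Spark-free sketch (JobData and the registry names below are simplified stand-ins, not Spark's actual types):

[source, scala]
----
import scala.collection.mutable

// Simplified stand-in for Spark's JobUIData (hypothetical, for illustration only)
final case class JobData(jobId: Int, var status: String, stageIds: Seq[Int])

object JobStartSketch {
  val jobIdToData          = mutable.HashMap.empty[Int, JobData]
  val activeJobs           = mutable.HashMap.empty[Int, JobData]
  val jobGroupToJobIds     = mutable.HashMap.empty[String, mutable.Set[Int]]
  val stageIdToActiveJobIds = mutable.HashMap.empty[Int, mutable.Set[Int]]

  // Mirrors the flow of onJobStart: record the job as RUNNING and
  // index it by its group id and by every one of its stages.
  def onJobStart(jobId: Int, groupId: String, stageIds: Seq[Int]): Unit = {
    val job = JobData(jobId, "RUNNING", stageIds)
    jobIdToData(jobId) = job
    activeJobs(jobId) = job
    jobGroupToJobIds.getOrElseUpdate(groupId, mutable.Set.empty) += jobId
    stageIds.foreach { stageId =>
      stageIdToActiveJobIds.getOrElseUpdate(stageId, mutable.Set.empty) += jobId
    }
  }
}
----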
=== [[onJobEnd]] onJobEnd Method
[source, scala]
----
onJobEnd(jobEnd: SparkListenerJobEnd): Unit
----
onJobEnd removes an entry in <<activeJobs, activeJobs>>.

onJobEnd removes the job from the <<jobGroupToJobIds, jobGroupToJobIds>> registry.

When completed successfully, the job is added to the <<completedJobs, completedJobs>> registry with the status attribute set to JobExecutionStatus.SUCCEEDED. The numCompletedJobs counter gets incremented.

When failed, the job is added to the <<failedJobs, failedJobs>> registry with the status attribute set to JobExecutionStatus.FAILED. The numFailedJobs counter gets incremented.

For every stage in the job, the stage is removed from the active jobs (in <<stageIdToActiveJobIds, stageIdToActiveJobIds>>), which can remove the entire entry when no active jobs remain.

Every pending stage in <<stageIdToInfo, stageIdToInfo>> gets added to <<skippedStages, skippedStages>>.
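The success/failure bookkeeping can be sketched with plain collections (JobRecord and the registry names below are simplified stand-ins, not Spark's actual types):

[source, scala]
----
import scala.collection.mutable

// Simplified stand-in for Spark's JobUIData (hypothetical, for illustration only)
final case class JobRecord(jobId: Int, var status: String)

object JobEndSketch {
  val activeJobs    = mutable.HashMap.empty[Int, JobRecord]
  val completedJobs = mutable.ListBuffer.empty[JobRecord]
  val failedJobs    = mutable.ListBuffer.empty[JobRecord]

  // Mirrors the flow of onJobEnd: drop the job from activeJobs and
  // record it as SUCCEEDED or FAILED.
  def onJobEnd(jobId: Int, succeeded: Boolean): Unit =
    activeJobs.remove(jobId).foreach { job =>
      if (succeeded) { job.status = "SUCCEEDED"; completedJobs += job }
      else           { job.status = "FAILED";    failedJobs += job }
    }
}
----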
=== [[onExecutorMetricsUpdate]] onExecutorMetricsUpdate Method
[source, scala]
----
onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit
----
=== [[onTaskStart]] onTaskStart Method
[source, scala]
----
onTaskStart(taskStart: SparkListenerTaskStart): Unit
----
onTaskStart updates StageUIData and JobUIData, and registers a new TaskUIData.
onTaskStart takes TaskInfo from the input taskStart.
onTaskStart looks up the StageUIData for the stage and stage attempt ids (in the <<stageIdToData, stageIdToData>> registry).

onTaskStart increments numActiveTasks and puts a TaskUIData for the task in stageData.taskData.

Ultimately, onTaskStart looks up the stage in the internal <<stageIdToActiveJobIds, stageIdToActiveJobIds>> registry and, for each active job, reads its JobUIData (from <<jobIdToData, jobIdToData>>) and increments numActiveTasks.
=== [[onTaskEnd]] onTaskEnd Method
[source, scala]
----
onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit
----
onTaskEnd updates the StageUIData (and TaskUIData), ExecutorSummary, and JobUIData.
onTaskEnd takes TaskInfo from the input taskEnd.
NOTE: onTaskEnd does its processing when the TaskInfo is available and stageAttemptId is not -1.
onTaskEnd looks up the StageUIData for the stage and stage attempt ids (in the <<stageIdToData, stageIdToData>> registry).
onTaskEnd saves accumulables in the StageUIData.
onTaskEnd reads the ExecutorSummary for the executor (the task has finished on).
Depending on the task end's reason, onTaskEnd increments the succeededTasks, killedTasks or failedTasks counters.

onTaskEnd adds the task's duration to taskTime.

onTaskEnd decrements the number of active tasks (in the StageUIData).

Again depending on the task end's reason, onTaskEnd computes errorMessage and updates the StageUIData.
CAUTION: FIXME Why is the same information in two different registries -- stageData and execSummary?!
If taskMetrics (in the input taskEnd) is available, <<updateAggregateMetrics, updateAggregateMetrics>> is executed.
The task's TaskUIData is looked up in stageData.taskData and updateTaskInfo and updateTaskMetrics are executed. errorMessage is updated.
onTaskEnd makes sure that the number of tasks in the StageUIData (stageData.taskData) is not above <<spark_ui_retainedTasks, spark.ui.retainedTasks>> and removes the excess otherwise.
Ultimately, onTaskEnd looks up the stage in the internal <<stageIdToActiveJobIds, stageIdToActiveJobIds>> registry and, for each active job, reads its JobUIData (from <<jobIdToData, jobIdToData>>). onTaskEnd then decrements numActiveTasks and increments numCompletedTasks, numKilledTasks or numFailedTasks depending on the task's end reason.
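The per-reason counting and the retained-tasks cap can be sketched as follows (the names are simplified stand-ins, not Spark's actual types, and dropping the oldest entries first is an assumption about the trimming strategy):

[source, scala]
----
import scala.collection.mutable

object TaskEndSketch {
  // Simplified stand-in for StageUIData's task bookkeeping (hypothetical)
  val taskData = mutable.LinkedHashMap.empty[Long, String] // taskId -> end reason
  var numActiveTasks, succeededTasks, killedTasks, failedTasks = 0
  val retainedTasks = 100000 // default of spark.ui.retainedTasks

  // Mirrors the flow of onTaskEnd: adjust the counters per end reason
  // and keep the number of retained tasks bounded.
  def onTaskEnd(taskId: Long, reason: String): Unit = {
    numActiveTasks -= 1
    reason match {
      case "Success"    => succeededTasks += 1
      case "TaskKilled" => killedTasks += 1
      case _            => failedTasks += 1
    }
    taskData(taskId) = reason
    // Drop the oldest entries when above the cap (assumed strategy).
    while (taskData.size > retainedTasks)
      taskData -= taskData.head._1
  }
}
----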
=== [[onStageSubmitted]] onStageSubmitted Method
[source, scala]
----
onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit
----
=== [[onStageCompleted]] onStageCompleted Method
[source, scala]
----
onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit
----
onStageCompleted updates the StageUIData and JobUIData.
onStageCompleted reads stageInfo from the input stageCompleted and records it in the <<stageIdToInfo, stageIdToInfo>> registry.

onStageCompleted looks up the StageUIData for the stage and the stage attempt ids in the <<stageIdToData, stageIdToData>> registry.

onStageCompleted records accumulables in the StageUIData.

onStageCompleted removes the stage from the <<poolToActiveStages, poolToActiveStages>> and <<activeStages, activeStages>> registries.

If the stage completed successfully (i.e. has no failureReason), onStageCompleted adds the stage to the <<completedStages, completedStages>> registry and increments the <<numCompletedStages, numCompletedStages>> counter.

Otherwise, when the stage failed, onStageCompleted adds the stage to the <<failedStages, failedStages>> registry and increments the <<numFailedStages, numFailedStages>> counter.

Ultimately, onStageCompleted looks up the stage in the internal <<stageIdToActiveJobIds, stageIdToActiveJobIds>> registry and, for each active job, reads its JobUIData (from <<jobIdToData, jobIdToData>>). It then decrements numActiveStages. When completed successfully, it adds the stage to completedStageIndices. With failure, numFailedStages gets incremented.
=== [[JobUIData]] JobUIData
CAUTION: FIXME
=== [[blockManagerIds]] blockManagerIds Method

[source, scala]
----
blockManagerIds: Seq[BlockManagerId]
----
CAUTION: FIXME
=== [[StageUIData]] StageUIData
CAUTION: FIXME
=== [[settings]] Settings
.Spark Properties
[options="header",width="100%"]
|===
| Setting | Default Value | Description

| [[spark_ui_retainedJobs]] spark.ui.retainedJobs
| 1000
| The number of jobs to hold information about

| [[spark_ui_retainedStages]] spark.ui.retainedStages
| 1000
| The number of stages to hold information about

| [[spark_ui_retainedTasks]] spark.ui.retainedTasks
| 100000
| The number of tasks to hold information about
|===
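These properties can be set like any other Spark property, e.g. on the command line (the values below are arbitrary examples, not recommendations):

[source, shell]
----
# Lower the web UI's memory footprint by retaining fewer jobs, stages and tasks
./bin/spark-submit \
  --conf spark.ui.retainedJobs=500 \
  --conf spark.ui.retainedStages=500 \
  --conf spark.ui.retainedTasks=10000 \
  ...
----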