OutputCommitCoordinator¶
OutputCommitCoordinator is used to coordinate <
[[result-commits]] Result commits are the outputs of running tasks (and a running task is described by a task attempt for a partition in a stage).
TIP: A partition (of a stage) is unlocked when it is marked as -1 in <
From the scaladoc (it's a private[spark] class so no way to find it https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala[outside the code]):
Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins" policy. OutputCommitCoordinator is instantiated in both the drivers and executors. On executors, it is configured with a reference to the driver's OutputCommitCoordinatorEndpoint, so requests to commit output will be forwarded to the driver's OutputCommitCoordinator.
The most interesting piece is in...
This class was introduced in https://issues.apache.org/jira/browse/SPARK-4879[SPARK-4879]; see that JIRA issue (and the associated pull requests) for an extensive design discussion.
[[authorized-committers]] Authorized committers are task attempts (per partition and stage) that can...FIXME
== [[stop]] stop Method
CAUTION: FIXME
== [[stageStart]] Stage Execution Started Notification
CAUTION: FIXME
== [[taskCompleted]] taskCompleted Method
[source, scala]¶
taskCompleted( stage: StageId, partition: PartitionId, attemptNumber: TaskAttemptNumber, reason: TaskEndReason): Unit
taskCompleted marks the partition (in the stage) completed (and hence a result committed), but only when the attemptNumber is amongst <partition).
Internally, taskCompleted first finds <stage.
For task completions with no stage registered in <taskCompleted simply exits.
DEBUG OutputCommitCoordinator: Ignoring task completion for completed stage
For the reason being Success taskCompleted does nothing and exits.
For the reason being TaskCommitDenied, you should see the following INFO message in the logs and taskCompleted exits.
INFO OutputCommitCoordinator: Task was denied committing, stage: [stage], partition: [partition], attempt: [attemptNumber]
NOTE: For no stage registered or reason being Success or TaskCommitDenied, taskCompleted does nothing (important).
For task completion reasons other than Success or TaskCommitDenied and attemptNumber amongst <taskCompleted <
NOTE: A task attempt can never be -1.
When the lock for partition is cleared, You should see the following DEBUG message in the logs:
DEBUG OutputCommitCoordinator: Authorized committer (attemptNumber=[attemptNumber], stage=[stage], partition=[partition]) failed; clearing lock
NOTE: taskCompleted is executed only when scheduler:DAGSchedulerEventProcessLoop.md#handleTaskCompletion[DAGScheduler informs that a task has completed].
== [[logging]] Logging
Enable ALL logging level for org.apache.spark.scheduler.OutputCommitCoordinator logger to see what happens inside.
Add the following line to conf/log4j.properties:
[source]¶
log4j.logger.org.apache.spark.scheduler.OutputCommitCoordinator=ALL¶
Refer to spark-logging.md[Logging].
== [[internal-properties]] Internal Properties
[cols="30m,70",options="header",width="100%"] |=== | Name | Description
| [[authorizedCommittersByStage]] authorizedCommittersByStage | Tracks commit locks for task attempts for a partition in a stage.
Used in <
|===