= TaskMetrics
TaskMetrics is a collection of metrics (accumulators) tracked during the execution of a scheduler:Task.md[task].

TaskMetrics is created when:

* Stage is requested to scheduler:Stage.md#makeNewStageAttempt[create a new stage attempt] (when DAGScheduler is requested to scheduler:DAGScheduler.md#submitMissingTasks[submit the missing tasks of a stage])
* TaskMetrics utility is requested to <<empty, empty>>, <<registered, registered>>, and <<fromAccumulators, fromAccumulators>>
[[creating-instance]] TaskMetrics takes no arguments to be created.
TaskMetrics is available using TaskContext.taskMetrics.
TIP: Use SparkListener.md#onTaskEnd[SparkListener.onTaskEnd] to intercept SparkListener.md#SparkListenerTaskEnd[SparkListenerTaskEnd] events and access the TaskMetrics of the completed tasks.

TIP: Use spark-history-server:EventLoggingListener.md[EventLoggingListener] for post-execution (history) statistics.
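The tip above can be sketched as a minimal listener. The `SparkListener`, `SparkListenerTaskEnd` and `TaskMetrics` types and their members used below are the public Spark API; the listener class name is illustrative.

[source, scala]
----
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// A minimal SparkListener that reads the TaskMetrics of every completed task.
// Register it with SparkContext.addSparkListener (or spark.extraListeners).
class TaskMetricsListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      println(s"Task ${taskEnd.taskInfo.taskId}: " +
        s"runTime=${metrics.executorRunTime} ms, " +
        s"memoryBytesSpilled=${metrics.memoryBytesSpilled}")
    }
  }
}

// sc.addSparkListener(new TaskMetricsListener)
----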
TaskMetrics uses accumulators to represent the metrics and offers "increment" methods to increment them.
NOTE: The local values of the accumulators for a scheduler:Task.md[task] (as accumulated while the scheduler:Task.md#run[task runs]) are sent from the executor to the driver when the task completes (and the driver merges them into the task-level metrics).
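The same local-then-merged accumulator flow can be observed with a user-facing accumulator (a sketch using the public `SparkContext.longAccumulator` API; the application and accumulator names are illustrative):

[source, scala]
----
import org.apache.spark.sql.SparkSession

// Demonstrates the executor-to-driver accumulator flow that TaskMetrics
// relies on: tasks accumulate locally, and the values are merged on the
// driver when each task completes.
val spark = SparkSession.builder.master("local[*]").appName("acc-demo").getOrCreate()
val sc = spark.sparkContext
val bytesProcessed = sc.longAccumulator("bytesProcessed")

sc.parallelize(1 to 100, numSlices = 4).foreach { n =>
  bytesProcessed.add(n)  // local add on the executor side
}
// The driver sees the merged value once the job finishes.
println(bytesProcessed.value)  // 5050
----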
[[metrics]]
.Metrics
[cols="1,1,1,2",options="header",width="100%"]
|===
| Property | Name | Type | Description

| [[_memoryBytesSpilled]] _memoryBytesSpilled
| internal.metrics.memoryBytesSpilled
| LongAccumulator
| Used in <<memoryBytesSpilled, memoryBytesSpilled>> and <<incMemoryBytesSpilled, incMemoryBytesSpilled>>

| [[_updatedBlockStatuses]] _updatedBlockStatuses
| internal.metrics.updatedBlockStatuses
| CollectionAccumulator[(BlockId, BlockStatus)]
| Used in <<updatedBlockStatuses, updatedBlockStatuses>>, <<incUpdatedBlockStatuses, incUpdatedBlockStatuses>> and <<setUpdatedBlockStatuses, setUpdatedBlockStatuses>>

|===
[[internal-registries]]
.TaskMetrics's Internal Registries and Counters
[cols="1,2",options="header",width="100%"]
|===
| Name | Description

| [[nameToAccums]] nameToAccums
| Internal accumulators indexed by their names.

Used when TaskMetrics <<register, registers the internal accumulators>> and is <<fromAccumulators, re-created from AccumulatorV2s>>.

NOTE: nameToAccums is a transient and lazy value.

| [[internalAccums]] internalAccums
| Collection of internal AccumulatorV2 objects.

Used when...FIXME

NOTE: internalAccums is a transient and lazy value.

| [[externalAccums]] externalAccums
| Collection of external AccumulatorV2 objects.

Used when TaskMetrics is requested for <<accumulators, all the accumulators>> and is <<fromAccumulators, re-created from AccumulatorV2s>>.

NOTE: externalAccums is a transient and lazy value.

|===
== [[accumulators]] accumulators Method
CAUTION: FIXME
== [[mergeShuffleReadMetrics]] mergeShuffleReadMetrics Method
CAUTION: FIXME
== [[memoryBytesSpilled]] memoryBytesSpilled Method
CAUTION: FIXME
== [[updatedBlockStatuses]] updatedBlockStatuses Method
CAUTION: FIXME
== [[setExecutorCpuTime]] setExecutorCpuTime Method
CAUTION: FIXME
== [[setResultSerializationTime]] setResultSerializationTime Method
CAUTION: FIXME
== [[setJvmGCTime]] setJvmGCTime Method
CAUTION: FIXME
== [[setExecutorRunTime]] setExecutorRunTime Method
CAUTION: FIXME
== [[setExecutorDeserializeCpuTime]] setExecutorDeserializeCpuTime Method
CAUTION: FIXME
== [[setExecutorDeserializeTime]] setExecutorDeserializeTime Method
CAUTION: FIXME
== [[setUpdatedBlockStatuses]] setUpdatedBlockStatuses Method
CAUTION: FIXME
== [[fromAccumulators]] Re-Creating TaskMetrics From AccumulatorV2s -- fromAccumulators Method
[source, scala]
----
fromAccumulators(accums: Seq[AccumulatorV2[_, _]]): TaskMetrics
----
fromAccumulators creates a new TaskMetrics and registers accums as internal and external task metrics.

Internally, fromAccumulators creates a new TaskMetrics. It then splits accums into internal and external task metrics collections (using the <<nameToAccums, nameToAccums>> internal registry).

For every internal task metric, fromAccumulators finds the corresponding metric in the <<nameToAccums, nameToAccums>> registry (of the new TaskMetrics instance) and copies over its metadata and value.

In the end, fromAccumulators adds the external accumulators to the <<externalAccums, externalAccums>> registry (of the new TaskMetrics instance).
NOTE: fromAccumulators is used exclusively when DAGScheduler is requested to scheduler:DAGScheduler.md#postTaskEnd[post a task-end event] (when scheduler:DAGSchedulerEventProcessLoop.md#handleTaskCompletion[notified that a task has finished]) and re-creates TaskMetrics.
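The split step above can be sketched as follows. This is an illustrative simplification only (the actual fromAccumulators is private[spark]); `splitAccumulators` and `internalNames` are hypothetical names, while `AccumulatorV2` and its `name: Option[String]` member are the public Spark API.

[source, scala]
----
import org.apache.spark.util.AccumulatorV2

// Sketch: partition incoming accumulators into internal task metrics
// (those whose name is a registered internal metric name, as kept in
// nameToAccums) and external, user-defined accumulators.
def splitAccumulators(
    accums: Seq[AccumulatorV2[_, _]],
    internalNames: Set[String]): (Seq[AccumulatorV2[_, _]], Seq[AccumulatorV2[_, _]]) =
  accums.partition(acc => acc.name.exists(internalNames.contains))
----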
== [[incMemoryBytesSpilled]] Increasing Memory Bytes Spilled -- incMemoryBytesSpilled Method
[source, scala]
----
incMemoryBytesSpilled(v: Long): Unit
----
incMemoryBytesSpilled adds v to <<_memoryBytesSpilled, _memoryBytesSpilled>> task metrics.
[NOTE]
====
incMemoryBytesSpilled is used when:

* rdd:Aggregator.md#updateMetrics[Aggregator updates task metrics]
* CoGroupedRDD is requested to compute a partition
* shuffle:BlockStoreShuffleReader.md#read[BlockStoreShuffleReader reads combined key-value records for a reduce task]
* shuffle:ShuffleExternalSorter.md#spill[ShuffleExternalSorter frees execution memory by spilling to disk]
* shuffle:ExternalSorter.md#writePartitionedFile[ExternalSorter writes the records into a temporary partitioned file in the disk store]
* UnsafeExternalSorter spills current records due to memory pressure
* SpillableIterator spills records to disk
* spark-history-server:JsonProtocol.md#taskMetricsFromJson[JsonProtocol creates TaskMetrics from JSON]
====
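Since <<_memoryBytesSpilled, _memoryBytesSpilled>> is a LongAccumulator, the increments from the many spillers above simply add up. A minimal sketch of what the increment boils down to (using the public `org.apache.spark.util.LongAccumulator`; the standalone val here stands in for the private registry entry):

[source, scala]
----
import org.apache.spark.util.LongAccumulator

// Sketch: incMemoryBytesSpilled amounts to adding to the backing accumulator.
val _memoryBytesSpilled = new LongAccumulator
def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v)

incMemoryBytesSpilled(1024)  // e.g. one spill of 1 KB
incMemoryBytesSpilled(2048)  // e.g. a later spill of 2 KB
println(_memoryBytesSpilled.value)  // 3072
----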
== [[incUpdatedBlockStatuses]] Recording Updated BlockStatus For Block -- incUpdatedBlockStatuses Method
[source, scala]
----
incUpdatedBlockStatuses(v: (BlockId, BlockStatus)): Unit
----
incUpdatedBlockStatuses adds v to the <<_updatedBlockStatuses, _updatedBlockStatuses>> internal registry.
NOTE: incUpdatedBlockStatuses is used exclusively when storage:BlockManager.md#addUpdatedBlockStatusToTaskMetrics[BlockManager does addUpdatedBlockStatusToTaskMetrics].
== [[register]] Registering Internal Accumulators -- register Method

[source, scala]
----
register(
  sc: SparkContext): Unit
----

register registers the internal accumulators (from the <<nameToAccums, nameToAccums>> internal registry) with countFailedValues enabled (true).
NOTE: register is used exclusively when scheduler:Stage.md#makeNewStageAttempt[Stage is requested for its new attempt].
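The registration step can be sketched as below. This is illustrative only: `registerSketch` is a hypothetical name, `SparkContext.register(acc, name)` is the public API, and the countFailedValues behaviour is noted in a comment because the real code sets it through a private[spark] path.

[source, scala]
----
import org.apache.spark.SparkContext
import org.apache.spark.util.AccumulatorV2

// Sketch: every internal accumulator in nameToAccums is registered with
// the SparkContext under its metric name.
def registerSketch(
    sc: SparkContext,
    nameToAccums: Map[String, AccumulatorV2[_, _]]): Unit = {
  nameToAccums.foreach { case (name, acc) =>
    sc.register(acc, name)
    // The real implementation also enables countFailedValues so that
    // internal metrics are counted even for failed tasks.
  }
}
----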
== [[empty]] empty Factory Method
[source, scala]
----
empty: TaskMetrics
----
empty...FIXME
empty is used when:

* TaskContextImpl is created
* TaskMetrics utility is requested to <<registered, registered>>
* JsonProtocol utility is requested to spark-history-server:JsonProtocol.md#taskMetricsFromJson[taskMetricsFromJson]
== [[registered]] registered Factory Method
[source, scala]
----
registered: TaskMetrics
----
registered...FIXME
NOTE: registered is used exclusively when Task is scheduler:Task.md#serializedTaskMetrics[created].
== [[fromAccumulatorInfos]] fromAccumulatorInfos Method

[source, scala]
----
fromAccumulatorInfos(
  infos: Seq[AccumulableInfo]): TaskMetrics
----
fromAccumulatorInfos...FIXME
fromAccumulatorInfos is used when AppStatusListener is requested to onExecutorMetricsUpdate.