= ShuffleWriteMetrics
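ShuffleWriteMetrics is a <<accumulators, collection of accumulators>> that represents task metrics about writing shuffle data.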
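ShuffleWriteMetrics tracks the following task metrics:

- <<bytesWritten, Shuffle bytes written>>
- <<writeTime, Shuffle write time>>
- <<recordsWritten, Shuffle records written>>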
NOTE: Accumulators allow tasks (running on executors) to communicate with the driver.
[[accumulators]]
.ShuffleWriteMetrics's Accumulators
[cols="1,2",options="header",width="100%"]
|===
| Name
| Description

| [[_bytesWritten]] `_bytesWritten`
a| Accumulator to track how many shuffle bytes were written in a shuffle task.

Used when ShuffleWriteMetrics is requested for the <<bytesWritten, shuffle bytes written>> and to <<incBytesWritten, increment>> it.

NOTE: `_bytesWritten` is available as `internal.metrics.shuffle.write.bytesWritten` (internally `shuffleWrite.BYTES_WRITTEN`) in executor:TaskMetrics.md[TaskMetrics].

| [[_writeTime]] `_writeTime`
a| Accumulator to track the shuffle write time (as a 64-bit integer) of a shuffle task.

Used when ShuffleWriteMetrics is requested to <<incWriteTime, increment>> the shuffle write time.

NOTE: `_writeTime` is available as `internal.metrics.shuffle.write.writeTime` (internally `shuffleWrite.WRITE_TIME`) in executor:TaskMetrics.md[TaskMetrics].

| [[_recordsWritten]] `_recordsWritten`
a| Accumulator to track how many shuffle records were written in a shuffle task.

Used when ShuffleWriteMetrics is requested to <<incRecordsWritten, increment>> the shuffle records written.

NOTE: `_recordsWritten` is available as `internal.metrics.shuffle.write.recordsWritten` (internally `shuffleWrite.RECORDS_WRITTEN`) in executor:TaskMetrics.md[TaskMetrics].
|===
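The following minimal sketch models the accumulators table above: one long-valued accumulator per metric, updated on the task side and merged on the driver. `SimpleLongAccumulator` and `WriteMetricsModel` are hypothetical names used only for illustration; they are not Spark classes, and the registration and serialization that real Spark accumulators need are deliberately left out.

```scala
// Hypothetical stand-in for a long-valued accumulator (not Spark's LongAccumulator).
final class SimpleLongAccumulator {
  private var _sum: Long = 0L
  def add(v: Long): Unit = _sum += v
  def sum: Long = _sum
  // Driver-side merge of a copy that a task has updated.
  def merge(other: SimpleLongAccumulator): Unit = _sum += other.sum
}

// Hypothetical model of the three shuffle write accumulators.
final class WriteMetricsModel {
  val _bytesWritten = new SimpleLongAccumulator
  val _writeTime = new SimpleLongAccumulator
  val _recordsWritten = new SimpleLongAccumulator

  // Read-side accessors return the accumulated sums.
  def bytesWritten: Long = _bytesWritten.sum
  def writeTime: Long = _writeTime.sum
  def recordsWritten: Long = _recordsWritten.sum

  // Merge every metric of another (task-side) copy into this one.
  def merge(other: WriteMetricsModel): Unit = {
    _bytesWritten.merge(other._bytesWritten)
    _writeTime.merge(other._writeTime)
    _recordsWritten.merge(other._recordsWritten)
  }
}

// Two tasks update their own copies; the driver merges them.
val task1 = new WriteMetricsModel
task1._bytesWritten.add(1024L)
task1._recordsWritten.add(10L)
task1._writeTime.add(500L)

val task2 = new WriteMetricsModel
task2._bytesWritten.add(2048L)
task2._recordsWritten.add(20L)
task2._writeTime.add(700L)

val driverView = new WriteMetricsModel
driverView.merge(task1)
driverView.merge(task2)
println(s"bytes=${driverView.bytesWritten} records=${driverView.recordsWritten} time=${driverView.writeTime}")
```

Keeping each metric in its own accumulator means the driver can merge task updates metric by metric without knowing which writer produced them.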
== [[decRecordsWritten]] decRecordsWritten Method
CAUTION: FIXME
== [[decBytesWritten]] decBytesWritten Method
CAUTION: FIXME
== [[writeTime]] writeTime Method
CAUTION: FIXME
== [[recordsWritten]] recordsWritten Method
CAUTION: FIXME
== [[bytesWritten]] Returning Number of Shuffle Bytes Written -- bytesWritten Method
[source, scala]
----
bytesWritten: Long
----
`bytesWritten` represents the shuffle bytes written metric of a shuffle task.

Internally, `bytesWritten` returns the sum of the <<_bytesWritten, _bytesWritten>> internal accumulator.
[NOTE]
====
`bytesWritten` is used when:

1. `ShuffleWriteMetricsUIData` is created
2. In <...>
3. spark-SparkListener-StatsReportListener.md#onStageCompleted[`StatsReportListener` intercepts stage completed events] to show shuffle bytes written
4. shuffle:ShuffleExternalSorter.md#writeSortedFile[`ShuffleExternalSorter` does `writeSortedFile`] (to `incDiskBytesSpilled`)
5. spark-history-server:JsonProtocol.md#taskMetricsToJson[`JsonProtocol` converts ShuffleWriteMetrics to JSON]
6. spark-webui-executors-ExecutorsListener.md#onTaskEnd[`ExecutorsListener` intercepts task end events] to update executor metrics
7. spark-webui-JobProgressListener.md#updateAggregateMetrics[`JobProgressListener` updates stage and executor metrics]
====
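Several of the callers listed for `bytesWritten` read the value once per task and aggregate it per stage or per executor. The sketch below assumes a hypothetical `TaskShuffleWrite` record holding the value read when a task ends, and computes a simple total/min/median/max summary. It only approximates the kind of distribution a listener such as `StatsReportListener` reports and is not its implementation.

```scala
// Hypothetical per-task record: the bytesWritten value a listener might read
// from a task's shuffle write metrics when the task ends.
final case class TaskShuffleWrite(taskId: Long, bytesWritten: Long)

// Summarize bytesWritten across the tasks of one stage.
// Returns (total, min, median, max); for an even number of tasks the median
// picks the upper middle element.
def summarize(tasks: Seq[TaskShuffleWrite]): (Long, Long, Long, Long) = {
  require(tasks.nonEmpty, "at least one task is required")
  val sorted = tasks.map(_.bytesWritten).sorted
  (sorted.sum, sorted.head, sorted(sorted.length / 2), sorted.last)
}

val stageTasks = Seq(
  TaskShuffleWrite(0L, 4096L),
  TaskShuffleWrite(1L, 1024L),
  TaskShuffleWrite(2L, 2048L)
)
val (total, min, median, max) = summarize(stageTasks)
println(s"shuffle bytes written: total=$total min=$min median=$median max=$max")
```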
== [[incBytesWritten]] Incrementing Shuffle Bytes Written Metrics -- incBytesWritten Method
[source, scala]
----
incBytesWritten(v: Long): Unit
----
`incBytesWritten` adds `v` to the <<_bytesWritten, _bytesWritten>> internal accumulator.
[NOTE]
====
`incBytesWritten` is used when:

1. shuffle:UnsafeShuffleWriter.md#mergeSpills[`UnsafeShuffleWriter` does `mergeSpills`]
2. storage:DiskBlockObjectWriter.md#updateBytesWritten[`DiskBlockObjectWriter` does `updateBytesWritten`]
3. spark-history-server:JsonProtocol.md#taskMetricsFromJson[`JsonProtocol` creates `TaskMetrics` from JSON]
====
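`incBytesWritten` only adds, so a writer that reports its progress more than once has to pass deltas rather than running totals. A minimal sketch of that approach, assuming the writer remembers the last position it reported; `BytesCounter` and `PositionReportingWriter` are hypothetical names, only loosely inspired by the `DiskBlockObjectWriter.updateBytesWritten` caller listed above and not taken from Spark's code.

```scala
import java.io.ByteArrayOutputStream

// Hypothetical stand-in for the bytes-written metric: increments only.
final class BytesCounter {
  private var total: Long = 0L
  def incBytesWritten(v: Long): Unit = total += v
  def bytesWritten: Long = total
}

// Hypothetical writer that reports bytes written incrementally. It remembers
// the last position it reported and passes only the difference to
// incBytesWritten, so repeated updates never double-count.
final class PositionReportingWriter(metrics: BytesCounter) {
  private val out = new ByteArrayOutputStream()
  private var reportedPosition: Long = 0L

  def write(bytes: Array[Byte]): Unit = out.write(bytes, 0, bytes.length)

  def updateBytesWritten(): Unit = {
    val pos = out.size().toLong
    metrics.incBytesWritten(pos - reportedPosition)
    reportedPosition = pos
  }
}

val counter = new BytesCounter
val writer = new PositionReportingWriter(counter)
writer.write(Array.fill[Byte](100)(1))
writer.updateBytesWritten()
writer.updateBytesWritten() // nothing new was written, so this adds 0
writer.write(Array.fill[Byte](50)(2))
writer.updateBytesWritten()
println(counter.bytesWritten)
```

The second `updateBytesWritten()` call adds `0`, which keeps the total at 150 bytes even though the writer reported three times.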
== [[incWriteTime]] Incrementing Shuffle Write Time Metrics -- incWriteTime Method
[source, scala]
----
incWriteTime(v: Long): Unit
----
`incWriteTime` adds `v` to the <<_writeTime, _writeTime>> internal accumulator.
[NOTE]
====
`incWriteTime` is used when:

1. shuffle:SortShuffleWriter.md#stop[`SortShuffleWriter` stops].
2. `BypassMergeSortShuffleWriter` shuffle:BypassMergeSortShuffleWriter.md#write[writes records] (i.e. when it initializes `DiskBlockObjectWriter` partition writers) and later when it shuffle:BypassMergeSortShuffleWriter.md#writePartitionedFile[concatenates per-partition files into a single file].
3. shuffle:UnsafeShuffleWriter.md#mergeSpillsWithTransferTo[`UnsafeShuffleWriter` does `mergeSpillsWithTransferTo`].
4. storage:DiskBlockObjectWriter.md#commitAndGet[`DiskBlockObjectWriter` does `commitAndGet`] (but only when the `syncWrites` flag, which forces outstanding writes to disk, is enabled).
5. spark-history-server:JsonProtocol.md#taskMetricsFromJson[`JsonProtocol` creates `TaskMetrics` from JSON]
6. `TimeTrackingOutputStream` performs its output operations (it is an output stream that tracks shuffle write time).
====
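The last caller of `incWriteTime` is an output stream that tracks shuffle write time. A minimal sketch of that idea, assuming write time is measured in nanoseconds with `System.nanoTime`; `WriteTimeCounter` and `TimedOutputStream` are hypothetical names, and the code is a simplified stand-in rather than Spark's `TimeTrackingOutputStream`.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}

// Hypothetical stand-in for the write-time metric (nanoseconds, by assumption).
final class WriteTimeCounter {
  private var nanos: Long = 0L
  def incWriteTime(v: Long): Unit = nanos += v
  def writeTime: Long = nanos
}

// Hypothetical wrapper that charges the time spent in each write, flush and
// close of the underlying stream to the write-time metric.
final class TimedOutputStream(metrics: WriteTimeCounter, underlying: OutputStream)
    extends OutputStream {

  // Run op and add its elapsed time, even if op throws.
  private def timed[A](op: => A): A = {
    val start = System.nanoTime()
    try op
    finally metrics.incWriteTime(System.nanoTime() - start)
  }

  override def write(b: Int): Unit = timed(underlying.write(b))
  override def write(b: Array[Byte], off: Int, len: Int): Unit =
    timed(underlying.write(b, off, len))
  override def flush(): Unit = timed(underlying.flush())
  override def close(): Unit = timed(underlying.close())
}

val sink = new ByteArrayOutputStream()
val timeCounter = new WriteTimeCounter
val timedOut = new TimedOutputStream(timeCounter, sink)
timedOut.write("shuffle data".getBytes("UTF-8"))
timedOut.flush()
timedOut.close()
println(s"bytes=${sink.size()} writeTimeNanos=${timeCounter.writeTime}")
```

The single-argument `write(Array[Byte])` is not overridden because `java.io.OutputStream` already delegates it to the three-argument overload, which is timed.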
== [[incRecordsWritten]] Incrementing Shuffle Records Written Metrics -- incRecordsWritten Method
[source, scala]
----
incRecordsWritten(v: Long): Unit
----
`incRecordsWritten` adds `v` to the <<_recordsWritten, _recordsWritten>> internal accumulator.
[NOTE]
====
`incRecordsWritten` is used when:

1. shuffle:ShuffleExternalSorter.md#writeSortedFile[`ShuffleExternalSorter` does `writeSortedFile`]
2. storage:DiskBlockObjectWriter.md#recordWritten[`DiskBlockObjectWriter` does `recordWritten`]
3. spark-history-server:JsonProtocol.md#taskMetricsFromJson[`JsonProtocol` creates `TaskMetrics` from JSON]
====
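Callers of `incRecordsWritten` can either report one record at a time or report a count in bulk once a batch has been written. The sketch below shows both styles against a hypothetical `RecordsCounter`; `RecordCountingWriter` and its `reportEvery` interval callback are illustrative assumptions, not Spark APIs.

```scala
// Hypothetical stand-in for the records-written metric.
final class RecordsCounter {
  private var total: Long = 0L
  def incRecordsWritten(v: Long): Unit = total += v
  def recordsWritten: Long = total
}

// Hypothetical per-record writer: every record is reported as it is written,
// and onInterval fires every `reportEvery` records (e.g. to refresh another
// metric). The interval is an illustrative parameter, not a Spark setting.
final class RecordCountingWriter(metrics: RecordsCounter, reportEvery: Int)(onInterval: () => Unit) {
  require(reportEvery > 0, "reportEvery must be positive")
  private var numRecords: Long = 0L

  def recordWritten(): Unit = {
    numRecords += 1
    metrics.incRecordsWritten(1)
    if (numRecords % reportEvery == 0) onInterval()
  }
}

// Per-record reporting: 10 records, callback after records 4 and 8.
val perRecord = new RecordsCounter
var intervalCallbacks = 0
val w = new RecordCountingWriter(perRecord, reportEvery = 4)(() => intervalCallbacks += 1)
(1 to 10).foreach(_ => w.recordWritten())

// Bulk reporting: count locally, then report once after the whole batch.
val bulk = new RecordsCounter
bulk.incRecordsWritten(10)

println(s"perRecord=${perRecord.recordsWritten} bulk=${bulk.recordsWritten} intervals=$intervalCallbacks")
```

Both styles end with the same count; per-record reporting keeps the metric current while writing, whereas bulk reporting makes a single accumulator update per batch.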