File tree Expand file tree Collapse file tree
src/main/scala/ch/cern/sparkmeasure Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -298,7 +298,7 @@ class KafkaSinkExtended(conf: SparkConf) extends KafkaSink(conf) {
298298 )
299299 report(point1)
300300
301- val point2 = Map [String , Any ](
301+ val taskMetricsHeader = Map [String , Any ](
302302 " name" -> " task_metrics" ,
303303 " appId" -> appId,
304304 // task info
@@ -316,6 +316,9 @@ class KafkaSinkExtended(conf: SparkConf) extends KafkaSink(conf) {
316316 " successful" -> taskInfo.successful,
317317 " host" -> taskInfo.host,
318318 " taskLocality" -> Utils .encodeTaskLocality(taskInfo.taskLocality),
319+ " epochMillis" -> epochMillis
320+ )
321+ val taskMetricsBody = if (taskmetrics != null ) Map [String , Any ](
319322 // task metrics
320323 " executorRunTime" -> taskmetrics.executorRunTime,
321324 " executorCpuTime" -> taskmetrics.executorCpuTime,
@@ -342,8 +345,8 @@ class KafkaSinkExtended(conf: SparkConf) extends KafkaSink(conf) {
342345 " shuffleBytesWritten" -> taskmetrics.shuffleWriteMetrics.bytesWritten,
343346 " shuffleRecordsWritten" -> taskmetrics.shuffleWriteMetrics.recordsWritten,
344347 " shuffleWriteTime" -> taskmetrics.shuffleWriteMetrics.writeTime,
345- " epochMillis " -> epochMillis
346- )
348+ ) else Map ()
349+ val point2 = taskMetricsHeader ++ taskMetricsBody
347350 report(point2)
348351 }
349352}
You can’t perform that action at this time.
0 commit comments