diff --git a/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala b/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala index 02208d4..17c58c3 100644 --- a/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala +++ b/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala @@ -298,7 +298,7 @@ class KafkaSinkExtended(conf: SparkConf) extends KafkaSink(conf) { ) report(point1) - val point2 = Map[String, Any]( + val taskMetricsHeader = Map[String, Any]( "name" -> "task_metrics", "appId" -> appId, // task info @@ -316,6 +316,9 @@ class KafkaSinkExtended(conf: SparkConf) extends KafkaSink(conf) { "successful" -> taskInfo.successful, "host" -> taskInfo.host, "taskLocality" -> Utils.encodeTaskLocality(taskInfo.taskLocality), + "epochMillis" -> epochMillis + ) + val taskMetricsBody = if (taskmetrics != null) Map[String, Any]( // task metrics "executorRunTime" -> taskmetrics.executorRunTime, "executorCpuTime" -> taskmetrics.executorCpuTime, @@ -342,8 +345,8 @@ class KafkaSinkExtended(conf: SparkConf) extends KafkaSink(conf) { "shuffleBytesWritten" -> taskmetrics.shuffleWriteMetrics.bytesWritten, "shuffleRecordsWritten" -> taskmetrics.shuffleWriteMetrics.recordsWritten, "shuffleWriteTime" -> taskmetrics.shuffleWriteMetrics.writeTime, - "epochMillis" -> epochMillis - ) + ) else Map() + val point2 = taskMetricsHeader ++ taskMetricsBody report(point2) } }