From 9b9ed710cda04a79cda20803ef5a63b04d5f3029 Mon Sep 17 00:00:00 2001 From: pp-achauhan Date: Thu, 26 Feb 2026 00:45:21 +0530 Subject: [PATCH 1/9] Add AppEventKafkaSink and AppEventKafkaSinkExtended for application-level metrics and custom metadata support --- .../cern/sparkmeasure/AppEventKafkaSink.scala | 294 ++++++++++++++++++ 1 file changed, 294 insertions(+) create mode 100644 src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala diff --git a/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala b/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala new file mode 100644 index 0000000..fd3b065 --- /dev/null +++ b/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala @@ -0,0 +1,294 @@ +package ch.cern.sparkmeasure + +import org.apache.spark.scheduler._ +import org.apache.spark.{SparkConf, TaskFailedReason, TaskKilled} + +import scala.collection.mutable +import scala.util.Try + +/** + * AppEventKafkaSink: An extended version of ch.cern.sparkmeasure.KafkaSink that adds + * application-level metrics and custom fields support. + * + * This listener combines: + * 1. All stage/executor/query metrics from the base KafkaSink + * 2. Application-level metrics (executor counts, stage counts, task counts) + * 3. Custom fields passed via spark configurations + * 4. 
Enhanced app_start and app_end events with metadata + * + * Configuration: + * --conf spark.extraListeners=com.example.listener.AppEventKafkaSink + * --conf spark.sparkmeasure.kafkaBroker=kafka.example.com:9092 + * --conf spark.sparkmeasure.kafkaTopic=spark-metrics + * --conf spark.sparkmeasure.kafka.* = Additional Kafka properties + * --conf spark.sparkmeasure.customFields.* = Custom metadata to include in app events + * + * More configuration parameters and how-to use: see KafkaSink + * + * Example custom configs: + * --conf spark.sparkmeasure.customFields.project=my-project + * --conf spark.sparkmeasure.customFields.environment=production + * --conf spark.sparkmeasure.customFields.team=engineering + * + */ +class AppEventKafkaSink(conf: SparkConf) extends KafkaSink(conf) { + + // Application tracking + private var appName: String = "unknown" + private var start_time: Long = 0L + + // Executor tracking + private val executorIds: mutable.HashSet[String] = mutable.HashSet.empty[String] + private var totalExecutorCount: Int = 0 + private var executorsFailed: Int = 0 + private var executorsKilled: Int = 0 + + // Job tracking + private var totalJobsCompleted: Int = 0 + private var succeededJobsCount: Int = 0 + private var failedJobsCount: Int = 0 + + // Stage tracking + private var totalStagesCompleted: Int = 0 + private var succeededStagesCount: Int = 0 + private var failedStagesCount: Int = 0 + + // Task tracking + private var totalTaskCount: Int = 0 + private var numTaskFailed: Int = 0 + private var numTaskKilled: Int = 0 + + override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { + super.onApplicationStart(applicationStart) + + appId = applicationStart.appId.getOrElse("noAppId") + appName = applicationStart.appName + start_time = applicationStart.time + val customFields = extractCustomFields(conf) + + val epochMillis = System.currentTimeMillis() + + val appStartMetrics = Map[String, Any]( + "name" -> "application_started", + 
"appId" -> appId, + "appName" -> appName, + "start_time" -> start_time, + "epochMillis" -> epochMillis + ) ++ customFields + + report(appStartMetrics) + } + + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + super.onStageCompleted(stageCompleted) + + if (stageCompleted != null && stageCompleted.stageInfo != null) { + val stageInfo = stageCompleted.stageInfo + totalStagesCompleted += 1 + + if (stageInfo.failureReason.isDefined) { + failedStagesCount += 1 + } else { + succeededStagesCount += 1 + } + } + } + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + if (taskEnd != null) { + totalTaskCount += 1 + + if (taskEnd.reason != null) { + taskEnd.reason match { + case _: TaskKilled => + numTaskKilled += 1 + case _: TaskFailedReason => + numTaskFailed += 1 + case _ => + } + } + } + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + super.onJobEnd(jobEnd) + + if (jobEnd != null) { + totalJobsCompleted += 1 + + jobEnd.jobResult match { + case org.apache.spark.scheduler.JobSucceeded => + succeededJobsCount += 1 + case _ => + failedJobsCount += 1 + } + } + } + + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { + super.onExecutorAdded(executorAdded) + + if (executorAdded != null && executorAdded.executorId != null) { + val execId = executorAdded.executorId + if (!executorIds.contains(execId) && execId != "driver") { + executorIds += execId + totalExecutorCount += 1 + } + } + } + + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + super.onExecutorRemoved(executorRemoved) + + if (executorRemoved != null && executorRemoved.reason != null) { + executorRemoved.reason match { + case reason if reason.toLowerCase.contains("kill") => + executorsKilled += 1 + case _ => + executorsFailed += 1 + } + } + } + + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + val completionTime = applicationEnd.time + + val 
safeEndTime = if (completionTime > 0) completionTime else System.currentTimeMillis() + val duration = if (start_time > 0) safeEndTime - start_time else 0L + val successful = succeededJobsCount > 0 && failedJobsCount == 0 + val epochMillis = System.currentTimeMillis() + val configurations = conf.getAll.toMap + val customFields = extractCustomFields(conf) + + val appEndMetrics = Map[String, Any]( + "name" -> "applications_ended", + "appId" -> appId, + "appName" -> appName, + "start_time" -> start_time, + "completionTime" -> completionTime, + "duration" -> duration, + "successful" -> successful, + "totalExecutorCount" -> totalExecutorCount, + "executorsFailed" -> executorsFailed, + "executorsKilled" -> executorsKilled, + "totalJobsCompleted" -> totalJobsCompleted, + "succeededJobsCount" -> succeededJobsCount, + "failedJobsCount" -> failedJobsCount, + "numStagesCompleted" -> totalStagesCompleted, + "numSucceededStages" -> succeededStagesCount, + "numFailedStages" -> failedStagesCount, + "totalTaskCount" -> totalTaskCount, + "numTaskFailed" -> numTaskFailed, + "numTaskKilled" -> numTaskKilled, + "epochMillis" -> epochMillis, + "configurations" -> configurations + ) ++ customFields + + report(appEndMetrics) + super.onApplicationEnd(applicationEnd) + } + + private def extractCustomFields(conf: SparkConf): Map[String, String] = { + Try { + conf.getAll + .filter { case (key, _) => key.startsWith("spark.sparkmeasure.customFields.") } + .map { case (key, value) => (key.stripPrefix("spark.sparkmeasure.customFields."), value) } + .toMap + }.getOrElse(Map.empty[String, String]) + } +} + +/** + * AppEventKafkaSinkExtended extends the AppEventKafkaSink functionality with a verbose dump of tasks metrics + * Note: this can generate a large amount of data O(Number_of_tasks) + * Configuration parameters and how-to use: see AppEventKafkaSinkExtended + */ +class AppEventKafkaSinkExtended(conf: SparkConf) extends AppEventKafkaSink(conf) { + + override def onTaskStart(taskStart: 
SparkListenerTaskStart): Unit = { + val taskInfo = taskStart.taskInfo + val epochMillis = System.currentTimeMillis() + + val taskStartMetrics = Map[String, Any]( + "name" -> "tasks_started", + "appId" -> appId, + "taskId" -> taskInfo.taskId, + "attemptNumber" -> taskInfo.attemptNumber, + "stageId" -> taskStart.stageId, + "launchTime" -> taskInfo.launchTime, + "epochMillis" -> epochMillis + ) + report(taskStartMetrics) + } + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + super.onTaskEnd(taskEnd) + + val taskInfo = taskEnd.taskInfo + val taskmetrics = taskEnd.taskMetrics + val epochMillis = System.currentTimeMillis() + + val point1 = Map[String, Any]( + "name" -> "tasks_ended", + "appId" -> appId, + "taskId" -> taskInfo.taskId, + "attemptNumber" -> taskInfo.attemptNumber, + "stageId" -> taskEnd.stageId, + "launchTime" -> taskInfo.launchTime, + "finishTime" -> taskInfo.finishTime, + "epochMillis" -> epochMillis + ) + report(point1) + + val taskMetricsHeader = Map[String, Any]( + "name" -> "task_metrics", + "appId" -> appId, + // task info + "taskId" -> taskInfo.taskId, + "attemptNumber" -> taskInfo.attemptNumber, + "stageId" -> taskEnd.stageId, + "launchTime" -> taskInfo.launchTime, + "finishTime" -> taskInfo.finishTime, + "failed" -> taskInfo.failed, + "speculative" -> taskInfo.speculative, + "killed" -> taskInfo.killed, + "finished" -> taskInfo.finished, + "executorId" -> taskInfo.executorId, + "duration" -> taskInfo.duration, + "successful" -> taskInfo.successful, + "host" -> taskInfo.host, + "taskLocality" -> Utils.encodeTaskLocality(taskInfo.taskLocality), + "epochMillis" -> epochMillis + ) + val taskMetricsBody = if (taskmetrics != null) Map[String, Any]( + // task metrics + "executorRunTime" -> taskmetrics.executorRunTime, + "executorCpuTime" -> taskmetrics.executorCpuTime, + "executorDeserializeCpuTime" -> taskmetrics.executorDeserializeCpuTime, + "executorDeserializeTime" -> taskmetrics.executorDeserializeTime, + "jvmGCTime" -> 
taskmetrics.jvmGCTime, + "memoryBytesSpilled" -> taskmetrics.memoryBytesSpilled, + "peakExecutionMemory" -> taskmetrics.peakExecutionMemory, + "resultSerializationTime" -> taskmetrics.resultSerializationTime, + "resultSize" -> taskmetrics.resultSize, + "bytesRead" -> taskmetrics.inputMetrics.bytesRead, + "recordsRead" -> taskmetrics.inputMetrics.recordsRead, + "bytesWritten" -> taskmetrics.outputMetrics.bytesWritten, + "recordsWritten" -> taskmetrics.outputMetrics.recordsWritten, + "shuffleTotalBytesRead" -> taskmetrics.shuffleReadMetrics.totalBytesRead, + "shuffleRemoteBytesRead" -> taskmetrics.shuffleReadMetrics.remoteBytesRead, + "shuffleLocalBytesRead" -> taskmetrics.shuffleReadMetrics.localBytesRead, + "shuffleTotalBlocksFetched" -> taskmetrics.shuffleReadMetrics.totalBlocksFetched, + "shuffleLocalBlocksFetched" -> taskmetrics.shuffleReadMetrics.localBlocksFetched, + "shuffleRemoteBlocksFetched" -> taskmetrics.shuffleReadMetrics.remoteBlocksFetched, + "shuffleRecordsRead" -> taskmetrics.shuffleReadMetrics.recordsRead, + // this requires spark2.3 and above "remoteBytesReadToDisk" -> taskmetrics.shuffleReadMetrics.remoteBytesReadToDisk, + "shuffleFetchWaitTime" -> taskmetrics.shuffleReadMetrics.fetchWaitTime, + "shuffleBytesWritten" -> taskmetrics.shuffleWriteMetrics.bytesWritten, + "shuffleRecordsWritten" -> taskmetrics.shuffleWriteMetrics.recordsWritten, + "shuffleWriteTime" -> taskmetrics.shuffleWriteMetrics.writeTime, + ) else Map() + val point2 = taskMetricsHeader ++ taskMetricsBody + report(point2) + } +} \ No newline at end of file From f291460387cc9c82da172ca83ab6d3a03231010f Mon Sep 17 00:00:00 2001 From: pp-achauhan Date: Thu, 26 Feb 2026 00:51:14 +0530 Subject: [PATCH 2/9] Add AppEventKafkaSink and AppEventKafkaSinkExtended for application-level metrics and custom metadata support --- src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala b/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala index fd3b065..d0e9789 100644 --- a/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala +++ b/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala @@ -14,7 +14,7 @@ import scala.util.Try * 1. All stage/executor/query metrics from the base KafkaSink * 2. Application-level metrics (executor counts, stage counts, task counts) * 3. Custom fields passed via spark configurations - * 4. Enhanced app_start and app_end events with metadata + * 4. Enhanced application_started and applications_ended events with metadata * * Configuration: * --conf spark.extraListeners=com.example.listener.AppEventKafkaSink From bd5497adca270cd9c789796dfc097a99bb8e5bdb Mon Sep 17 00:00:00 2001 From: pp-achauhan Date: Thu, 26 Feb 2026 01:15:30 +0530 Subject: [PATCH 3/9] Standardised variable name --- .../scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala b/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala index d0e9789..566353b 100644 --- a/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala +++ b/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala @@ -35,7 +35,7 @@ class AppEventKafkaSink(conf: SparkConf) extends KafkaSink(conf) { // Application tracking private var appName: String = "unknown" - private var start_time: Long = 0L + private var startTime: Long = 0L // Executor tracking private val executorIds: mutable.HashSet[String] = mutable.HashSet.empty[String] @@ -63,7 +63,7 @@ class AppEventKafkaSink(conf: SparkConf) extends KafkaSink(conf) { appId = applicationStart.appId.getOrElse("noAppId") appName = applicationStart.appName - start_time = applicationStart.time + startTime = applicationStart.time val customFields = extractCustomFields(conf) val epochMillis = System.currentTimeMillis() @@ 
-72,7 +72,7 @@ class AppEventKafkaSink(conf: SparkConf) extends KafkaSink(conf) { "name" -> "application_started", "appId" -> appId, "appName" -> appName, - "start_time" -> start_time, + "startTime" -> startTime, "epochMillis" -> epochMillis ) ++ customFields @@ -154,7 +154,7 @@ class AppEventKafkaSink(conf: SparkConf) extends KafkaSink(conf) { val completionTime = applicationEnd.time val safeEndTime = if (completionTime > 0) completionTime else System.currentTimeMillis() - val duration = if (start_time > 0) safeEndTime - start_time else 0L + val duration = if (startTime > 0) safeEndTime - startTime else 0L val successful = succeededJobsCount > 0 && failedJobsCount == 0 val epochMillis = System.currentTimeMillis() val configurations = conf.getAll.toMap @@ -164,7 +164,7 @@ class AppEventKafkaSink(conf: SparkConf) extends KafkaSink(conf) { "name" -> "applications_ended", "appId" -> appId, "appName" -> appName, - "start_time" -> start_time, + "startTime" -> startTime, "completionTime" -> completionTime, "duration" -> duration, "successful" -> successful, From d9b2532d860aa48180ef6578a19f8d3a3553aa42 Mon Sep 17 00:00:00 2001 From: pp-achauhan Date: Thu, 26 Feb 2026 20:22:32 +0530 Subject: [PATCH 4/9] Renamed customFields to appLabels to correctly identify in event --- .../cern/sparkmeasure/AppEventKafkaSink.scala | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala b/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala index 566353b..6989870 100644 --- a/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala +++ b/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala @@ -8,12 +8,12 @@ import scala.util.Try /** * AppEventKafkaSink: An extended version of ch.cern.sparkmeasure.KafkaSink that adds - * application-level metrics and custom fields support. + * application-level metrics and custom labels support. * * This listener combines: * 1. 
All stage/executor/query metrics from the base KafkaSink * 2. Application-level metrics (executor counts, stage counts, task counts) - * 3. Custom fields passed via spark configurations + * 3. Custom labels passed via spark configurations * 4. Enhanced application_started and applications_ended events with metadata * * Configuration: @@ -21,14 +21,14 @@ import scala.util.Try * --conf spark.sparkmeasure.kafkaBroker=kafka.example.com:9092 * --conf spark.sparkmeasure.kafkaTopic=spark-metrics * --conf spark.sparkmeasure.kafka.* = Additional Kafka properties - * --conf spark.sparkmeasure.customFields.* = Custom metadata to include in app events + * --conf spark.sparkmeasure.appLabels.* = Custom metadata to include in app events * * More configuration parameters and how-to use: see KafkaSink * * Example custom configs: - * --conf spark.sparkmeasure.customFields.project=my-project - * --conf spark.sparkmeasure.customFields.environment=production - * --conf spark.sparkmeasure.customFields.team=engineering + * --conf spark.sparkmeasure.appLabels.project=my-project + * --conf spark.sparkmeasure.appLabels.environment=production + * --conf spark.sparkmeasure.appLabels.team=engineering * */ class AppEventKafkaSink(conf: SparkConf) extends KafkaSink(conf) { @@ -64,7 +64,7 @@ class AppEventKafkaSink(conf: SparkConf) extends KafkaSink(conf) { appId = applicationStart.appId.getOrElse("noAppId") appName = applicationStart.appName startTime = applicationStart.time - val customFields = extractCustomFields(conf) + val appLabels = extractAppLabels(conf) val epochMillis = System.currentTimeMillis() @@ -74,7 +74,7 @@ class AppEventKafkaSink(conf: SparkConf) extends KafkaSink(conf) { "appName" -> appName, "startTime" -> startTime, "epochMillis" -> epochMillis - ) ++ customFields + ) ++ appLabels report(appStartMetrics) } @@ -158,7 +158,7 @@ class AppEventKafkaSink(conf: SparkConf) extends KafkaSink(conf) { val successful = succeededJobsCount > 0 && failedJobsCount == 0 val epochMillis = 
System.currentTimeMillis() val configurations = conf.getAll.toMap - val customFields = extractCustomFields(conf) + val appLabels = extractAppLabels(conf) val appEndMetrics = Map[String, Any]( "name" -> "applications_ended", @@ -182,17 +182,17 @@ class AppEventKafkaSink(conf: SparkConf) extends KafkaSink(conf) { "numTaskKilled" -> numTaskKilled, "epochMillis" -> epochMillis, "configurations" -> configurations - ) ++ customFields + ) ++ appLabels report(appEndMetrics) super.onApplicationEnd(applicationEnd) } - private def extractCustomFields(conf: SparkConf): Map[String, String] = { + private def extractAppLabels(conf: SparkConf): Map[String, String] = { Try { conf.getAll - .filter { case (key, _) => key.startsWith("spark.sparkmeasure.customFields.") } - .map { case (key, value) => (key.stripPrefix("spark.sparkmeasure.customFields."), value) } + .filter { case (key, _) => key.startsWith("spark.sparkmeasure.appLabels.") } + .map { case (key, value) => (key.stripPrefix("spark.sparkmeasure."), value) } .toMap }.getOrElse(Map.empty[String, String]) } From 3220b4b09aba17e7221ad64bc1fbf63ef4d09aae Mon Sep 17 00:00:00 2001 From: pp-achauhan Date: Thu, 26 Feb 2026 21:28:25 +0530 Subject: [PATCH 5/9] Added applications start and end event in Kafka Sink --- .../ch/cern/sparkmeasure/KafkaSink.scala | 146 +++++++++++++++++- 1 file changed, 144 insertions(+), 2 deletions(-) diff --git a/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala b/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala index 17c58c3..cd007e8 100644 --- a/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala +++ b/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala @@ -2,7 +2,7 @@ package ch.cern.sparkmeasure import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerRecord} import org.apache.kafka.common.serialization.ByteArraySerializer -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, TaskFailedReason, TaskKilled} import org.apache.spark.scheduler._ import 
org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart} @@ -10,6 +10,7 @@ import org.slf4j.{Logger, LoggerFactory} import java.nio.charset.StandardCharsets import java.util.Properties +import scala.collection.mutable import scala.util.Try /** @@ -29,6 +30,8 @@ import scala.util.Try * example: --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-stageinfo * spark.sparkmeasure.kafka.* = Other kafka properties * example: --conf spark.sparkmeasure.kafka.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks + * spark.sparkmeasure.appLabels.* = Custom labels to include in application start and end events + * example: --conf spark.sparkmeasure.appLabels.environment=production * * This code depends on "kafka clients", you may need to add the dependency: * --packages org.apache.kafka:kafka-clients:3.2.1 @@ -49,6 +52,31 @@ class KafkaSink(conf: SparkConf) extends SparkListener { case _ => "noAppId" } + // Application tracking + private var appName: String = "noAppName" + private var startTime: Long = 0L + + // Executor tracking + private val executorIds: mutable.HashSet[String] = mutable.HashSet.empty[String] + private var totalExecutorCount: Int = 0 + private var executorsFailed: Int = 0 + private var executorsKilled: Int = 0 + + // Job tracking + private var totalJobsCompleted: Int = 0 + private var succeededJobsCount: Int = 0 + private var failedJobsCount: Int = 0 + + // Stage tracking + private var totalStagesCompleted: Int = 0 + private var succeededStagesCount: Int = 0 + private var failedStagesCount: Int = 0 + + // Task tracking + private var totalTaskCount: Int = 0 + private var numTaskFailed: Int = 0 + private var numTaskKilled: Int = 0 + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { val executorInfo = executorAdded.executorInfo val epochMillis = System.currentTimeMillis() @@ -62,6 +90,25 @@ class KafkaSink(conf: SparkConf) extends 
SparkListener { "epochMillis" -> epochMillis ) report(metrics) + + if (executorAdded != null && executorAdded.executorId != null) { + val execId = executorAdded.executorId + if (!executorIds.contains(execId) && execId != "driver") { + executorIds += execId + totalExecutorCount += 1 + } + } + } + + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + if (executorRemoved != null && executorRemoved.reason != null) { + executorRemoved.reason match { + case reason if reason.toLowerCase.contains("kill") => + executorsKilled += 1 + case _ => + executorsFailed += 1 + } + } } override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { @@ -81,7 +128,6 @@ class KafkaSink(conf: SparkConf) extends SparkListener { report(metrics) } - override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { val stageId = stageCompleted.stageInfo.stageId.toString val submissionTime = stageCompleted.stageInfo.submissionTime.getOrElse(0L) @@ -140,6 +186,17 @@ class KafkaSink(conf: SparkConf) extends SparkListener { ) report(stageTaskMetrics) + + if (stageCompleted != null && stageCompleted.stageInfo != null) { + val stageInfo = stageCompleted.stageInfo + totalStagesCompleted += 1 + + if (stageInfo.failureReason.isDefined) { + failedStagesCount += 1 + } else { + succeededStagesCount += 1 + } + } } override def onOtherEvent(event: SparkListenerEvent): Unit = { @@ -203,13 +260,72 @@ class KafkaSink(conf: SparkConf) extends SparkListener { "epochMillis" -> epochMillis ) report(jobEndMetrics) + + if (jobEnd != null) { + totalJobsCompleted += 1 + + jobEnd.jobResult match { + case org.apache.spark.scheduler.JobSucceeded => + succeededJobsCount += 1 + case _ => + failedJobsCount += 1 + } + } } override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { appId = applicationStart.appId.getOrElse("noAppId") + appName = applicationStart.appName + startTime = applicationStart.time + val 
appLabels = extractAppLabels(conf) + val epochMillis = System.currentTimeMillis() + + val appStartMetrics = Map[String, Any]( + "name" -> "applications_started", + "appId" -> appId, + "appName" -> appName, + "startTime" -> startTime, + "epochMillis" -> epochMillis + ) ++ appLabels + + report(appStartMetrics) } override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + val completionTime = applicationEnd.time + val safeEndTime = if (completionTime > 0) completionTime else System.currentTimeMillis() + val duration = if (startTime > 0) safeEndTime - startTime else 0L + val successful = succeededJobsCount > 0 && failedJobsCount == 0 + val epochMillis = System.currentTimeMillis() + val configurations = conf.getAll.toMap + val appLabels = extractAppLabels(conf) + + val appEndMetrics = Map[String, Any]( + "name" -> "applications_ended", + "appId" -> appId, + "appName" -> appName, + "startTime" -> startTime, + "completionTime" -> completionTime, + "duration" -> duration, + "successful" -> successful, + "totalExecutorCount" -> totalExecutorCount, + "executorsFailed" -> executorsFailed, + "executorsKilled" -> executorsKilled, + "totalJobsCompleted" -> totalJobsCompleted, + "succeededJobsCount" -> succeededJobsCount, + "failedJobsCount" -> failedJobsCount, + "numStagesCompleted" -> totalStagesCompleted, + "numSucceededStages" -> succeededStagesCount, + "numFailedStages" -> failedStagesCount, + "totalTaskCount" -> totalTaskCount, + "numTaskFailed" -> numTaskFailed, + "numTaskKilled" -> numTaskKilled, + "epochMillis" -> epochMillis, + "configurations" -> configurations + ) ++ appLabels + + report(appEndMetrics) + logger.info(s"Spark application ended, timestamp = ${applicationEnd.time}, closing Kafka connection.") synchronized( if (Option(producer).isDefined) { @@ -223,6 +339,22 @@ class KafkaSink(conf: SparkConf) extends SparkListener { ) } + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + if (taskEnd != null) { + totalTaskCount += 
1 + + if (taskEnd.reason != null) { + taskEnd.reason match { + case _: TaskKilled => + numTaskKilled += 1 + case _: TaskFailedReason => + numTaskFailed += 1 + case _ => + } + } + } + } + protected def report[T <: Any](metrics: Map[String, T]): Unit = { val result: Unit = Try { ensureProducer() @@ -256,6 +388,14 @@ class KafkaSink(conf: SparkConf) extends SparkListener { ) } + private def extractAppLabels(conf: SparkConf): Map[String, String] = { + Try { + conf.getAll + .filter { case (key, _) => key.startsWith("spark.sparkmeasure.appLabels.") } + .map { case (key, value) => (key.stripPrefix("spark.sparkmeasure."), value) } + .toMap + }.getOrElse(Map.empty[String, String]) + } } /** @@ -282,6 +422,8 @@ class KafkaSinkExtended(conf: SparkConf) extends KafkaSink(conf) { } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + super.onTaskEnd(taskEnd) + val taskInfo = taskEnd.taskInfo val taskmetrics = taskEnd.taskMetrics val epochMillis = System.currentTimeMillis() From 46c83d015e515e384e06efb8de44f67004f6d2a0 Mon Sep 17 00:00:00 2001 From: pp-achauhan Date: Thu, 26 Feb 2026 21:28:31 +0530 Subject: [PATCH 6/9] Added applications start and end event in Kafka Sink --- .../cern/sparkmeasure/AppEventKafkaSink.scala | 294 ------------------ 1 file changed, 294 deletions(-) delete mode 100644 src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala diff --git a/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala b/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala deleted file mode 100644 index 6989870..0000000 --- a/src/main/scala/ch/cern/sparkmeasure/AppEventKafkaSink.scala +++ /dev/null @@ -1,294 +0,0 @@ -package ch.cern.sparkmeasure - -import org.apache.spark.scheduler._ -import org.apache.spark.{SparkConf, TaskFailedReason, TaskKilled} - -import scala.collection.mutable -import scala.util.Try - -/** - * AppEventKafkaSink: An extended version of ch.cern.sparkmeasure.KafkaSink that adds - * application-level metrics and custom labels 
support. - * - * This listener combines: - * 1. All stage/executor/query metrics from the base KafkaSink - * 2. Application-level metrics (executor counts, stage counts, task counts) - * 3. Custom labels passed via spark configurations - * 4. Enhanced application_started and applications_ended events with metadata - * - * Configuration: - * --conf spark.extraListeners=com.example.listener.AppEventKafkaSink - * --conf spark.sparkmeasure.kafkaBroker=kafka.example.com:9092 - * --conf spark.sparkmeasure.kafkaTopic=spark-metrics - * --conf spark.sparkmeasure.kafka.* = Additional Kafka properties - * --conf spark.sparkmeasure.appLabels.* = Custom metadata to include in app events - * - * More configuration parameters and how-to use: see KafkaSink - * - * Example custom configs: - * --conf spark.sparkmeasure.appLabels.project=my-project - * --conf spark.sparkmeasure.appLabels.environment=production - * --conf spark.sparkmeasure.appLabels.team=engineering - * - */ -class AppEventKafkaSink(conf: SparkConf) extends KafkaSink(conf) { - - // Application tracking - private var appName: String = "unknown" - private var startTime: Long = 0L - - // Executor tracking - private val executorIds: mutable.HashSet[String] = mutable.HashSet.empty[String] - private var totalExecutorCount: Int = 0 - private var executorsFailed: Int = 0 - private var executorsKilled: Int = 0 - - // Job tracking - private var totalJobsCompleted: Int = 0 - private var succeededJobsCount: Int = 0 - private var failedJobsCount: Int = 0 - - // Stage tracking - private var totalStagesCompleted: Int = 0 - private var succeededStagesCount: Int = 0 - private var failedStagesCount: Int = 0 - - // Task tracking - private var totalTaskCount: Int = 0 - private var numTaskFailed: Int = 0 - private var numTaskKilled: Int = 0 - - override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { - super.onApplicationStart(applicationStart) - - appId = applicationStart.appId.getOrElse("noAppId") - 
appName = applicationStart.appName - startTime = applicationStart.time - val appLabels = extractAppLabels(conf) - - val epochMillis = System.currentTimeMillis() - - val appStartMetrics = Map[String, Any]( - "name" -> "application_started", - "appId" -> appId, - "appName" -> appName, - "startTime" -> startTime, - "epochMillis" -> epochMillis - ) ++ appLabels - - report(appStartMetrics) - } - - override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { - super.onStageCompleted(stageCompleted) - - if (stageCompleted != null && stageCompleted.stageInfo != null) { - val stageInfo = stageCompleted.stageInfo - totalStagesCompleted += 1 - - if (stageInfo.failureReason.isDefined) { - failedStagesCount += 1 - } else { - succeededStagesCount += 1 - } - } - } - - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - if (taskEnd != null) { - totalTaskCount += 1 - - if (taskEnd.reason != null) { - taskEnd.reason match { - case _: TaskKilled => - numTaskKilled += 1 - case _: TaskFailedReason => - numTaskFailed += 1 - case _ => - } - } - } - } - - override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { - super.onJobEnd(jobEnd) - - if (jobEnd != null) { - totalJobsCompleted += 1 - - jobEnd.jobResult match { - case org.apache.spark.scheduler.JobSucceeded => - succeededJobsCount += 1 - case _ => - failedJobsCount += 1 - } - } - } - - override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { - super.onExecutorAdded(executorAdded) - - if (executorAdded != null && executorAdded.executorId != null) { - val execId = executorAdded.executorId - if (!executorIds.contains(execId) && execId != "driver") { - executorIds += execId - totalExecutorCount += 1 - } - } - } - - override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { - super.onExecutorRemoved(executorRemoved) - - if (executorRemoved != null && executorRemoved.reason != null) { - executorRemoved.reason match { - case reason if 
reason.toLowerCase.contains("kill") => - executorsKilled += 1 - case _ => - executorsFailed += 1 - } - } - } - - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { - val completionTime = applicationEnd.time - - val safeEndTime = if (completionTime > 0) completionTime else System.currentTimeMillis() - val duration = if (startTime > 0) safeEndTime - startTime else 0L - val successful = succeededJobsCount > 0 && failedJobsCount == 0 - val epochMillis = System.currentTimeMillis() - val configurations = conf.getAll.toMap - val appLabels = extractAppLabels(conf) - - val appEndMetrics = Map[String, Any]( - "name" -> "applications_ended", - "appId" -> appId, - "appName" -> appName, - "startTime" -> startTime, - "completionTime" -> completionTime, - "duration" -> duration, - "successful" -> successful, - "totalExecutorCount" -> totalExecutorCount, - "executorsFailed" -> executorsFailed, - "executorsKilled" -> executorsKilled, - "totalJobsCompleted" -> totalJobsCompleted, - "succeededJobsCount" -> succeededJobsCount, - "failedJobsCount" -> failedJobsCount, - "numStagesCompleted" -> totalStagesCompleted, - "numSucceededStages" -> succeededStagesCount, - "numFailedStages" -> failedStagesCount, - "totalTaskCount" -> totalTaskCount, - "numTaskFailed" -> numTaskFailed, - "numTaskKilled" -> numTaskKilled, - "epochMillis" -> epochMillis, - "configurations" -> configurations - ) ++ appLabels - - report(appEndMetrics) - super.onApplicationEnd(applicationEnd) - } - - private def extractAppLabels(conf: SparkConf): Map[String, String] = { - Try { - conf.getAll - .filter { case (key, _) => key.startsWith("spark.sparkmeasure.appLabels.") } - .map { case (key, value) => (key.stripPrefix("spark.sparkmeasure."), value) } - .toMap - }.getOrElse(Map.empty[String, String]) - } -} - -/** - * AppEventKafkaSinkExtended extends the AppEventKafkaSink functionality with a verbose dump of tasks metrics - * Note: this can generate a large amount of data 
O(Number_of_tasks) - * Configuration parameters and how-to use: see AppEventKafkaSinkExtended - */ -class AppEventKafkaSinkExtended(conf: SparkConf) extends AppEventKafkaSink(conf) { - - override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { - val taskInfo = taskStart.taskInfo - val epochMillis = System.currentTimeMillis() - - val taskStartMetrics = Map[String, Any]( - "name" -> "tasks_started", - "appId" -> appId, - "taskId" -> taskInfo.taskId, - "attemptNumber" -> taskInfo.attemptNumber, - "stageId" -> taskStart.stageId, - "launchTime" -> taskInfo.launchTime, - "epochMillis" -> epochMillis - ) - report(taskStartMetrics) - } - - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - super.onTaskEnd(taskEnd) - - val taskInfo = taskEnd.taskInfo - val taskmetrics = taskEnd.taskMetrics - val epochMillis = System.currentTimeMillis() - - val point1 = Map[String, Any]( - "name" -> "tasks_ended", - "appId" -> appId, - "taskId" -> taskInfo.taskId, - "attemptNumber" -> taskInfo.attemptNumber, - "stageId" -> taskEnd.stageId, - "launchTime" -> taskInfo.launchTime, - "finishTime" -> taskInfo.finishTime, - "epochMillis" -> epochMillis - ) - report(point1) - - val taskMetricsHeader = Map[String, Any]( - "name" -> "task_metrics", - "appId" -> appId, - // task info - "taskId" -> taskInfo.taskId, - "attemptNumber" -> taskInfo.attemptNumber, - "stageId" -> taskEnd.stageId, - "launchTime" -> taskInfo.launchTime, - "finishTime" -> taskInfo.finishTime, - "failed" -> taskInfo.failed, - "speculative" -> taskInfo.speculative, - "killed" -> taskInfo.killed, - "finished" -> taskInfo.finished, - "executorId" -> taskInfo.executorId, - "duration" -> taskInfo.duration, - "successful" -> taskInfo.successful, - "host" -> taskInfo.host, - "taskLocality" -> Utils.encodeTaskLocality(taskInfo.taskLocality), - "epochMillis" -> epochMillis - ) - val taskMetricsBody = if (taskmetrics != null) Map[String, Any]( - // task metrics - "executorRunTime" -> 
taskmetrics.executorRunTime, - "executorCpuTime" -> taskmetrics.executorCpuTime, - "executorDeserializeCpuTime" -> taskmetrics.executorDeserializeCpuTime, - "executorDeserializeTime" -> taskmetrics.executorDeserializeTime, - "jvmGCTime" -> taskmetrics.jvmGCTime, - "memoryBytesSpilled" -> taskmetrics.memoryBytesSpilled, - "peakExecutionMemory" -> taskmetrics.peakExecutionMemory, - "resultSerializationTime" -> taskmetrics.resultSerializationTime, - "resultSize" -> taskmetrics.resultSize, - "bytesRead" -> taskmetrics.inputMetrics.bytesRead, - "recordsRead" -> taskmetrics.inputMetrics.recordsRead, - "bytesWritten" -> taskmetrics.outputMetrics.bytesWritten, - "recordsWritten" -> taskmetrics.outputMetrics.recordsWritten, - "shuffleTotalBytesRead" -> taskmetrics.shuffleReadMetrics.totalBytesRead, - "shuffleRemoteBytesRead" -> taskmetrics.shuffleReadMetrics.remoteBytesRead, - "shuffleLocalBytesRead" -> taskmetrics.shuffleReadMetrics.localBytesRead, - "shuffleTotalBlocksFetched" -> taskmetrics.shuffleReadMetrics.totalBlocksFetched, - "shuffleLocalBlocksFetched" -> taskmetrics.shuffleReadMetrics.localBlocksFetched, - "shuffleRemoteBlocksFetched" -> taskmetrics.shuffleReadMetrics.remoteBlocksFetched, - "shuffleRecordsRead" -> taskmetrics.shuffleReadMetrics.recordsRead, - // this requires spark2.3 and above "remoteBytesReadToDisk" -> taskmetrics.shuffleReadMetrics.remoteBytesReadToDisk, - "shuffleFetchWaitTime" -> taskmetrics.shuffleReadMetrics.fetchWaitTime, - "shuffleBytesWritten" -> taskmetrics.shuffleWriteMetrics.bytesWritten, - "shuffleRecordsWritten" -> taskmetrics.shuffleWriteMetrics.recordsWritten, - "shuffleWriteTime" -> taskmetrics.shuffleWriteMetrics.writeTime, - ) else Map() - val point2 = taskMetricsHeader ++ taskMetricsBody - report(point2) - } -} \ No newline at end of file From ebd5eb47fa1de32d99ab726e4b564b9e8a479290 Mon Sep 17 00:00:00 2001 From: pp-achauhan Date: Thu, 26 Feb 2026 23:08:12 +0530 Subject: [PATCH 7/9] Removed totalExecutorCount metric --- 
src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala b/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala index cd007e8..b558047 100644 --- a/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala +++ b/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala @@ -57,8 +57,6 @@ class KafkaSink(conf: SparkConf) extends SparkListener { private var startTime: Long = 0L // Executor tracking - private val executorIds: mutable.HashSet[String] = mutable.HashSet.empty[String] - private var totalExecutorCount: Int = 0 private var executorsFailed: Int = 0 private var executorsKilled: Int = 0 @@ -90,14 +88,6 @@ class KafkaSink(conf: SparkConf) extends SparkListener { "epochMillis" -> epochMillis ) report(metrics) - - if (executorAdded != null && executorAdded.executorId != null) { - val execId = executorAdded.executorId - if (!executorIds.contains(execId) && execId != "driver") { - executorIds += execId - totalExecutorCount += 1 - } - } } override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { @@ -308,7 +298,6 @@ class KafkaSink(conf: SparkConf) extends SparkListener { "completionTime" -> completionTime, "duration" -> duration, "successful" -> successful, - "totalExecutorCount" -> totalExecutorCount, "executorsFailed" -> executorsFailed, "executorsKilled" -> executorsKilled, "totalJobsCompleted" -> totalJobsCompleted, From a75775015bcc889facb88aa221a58fe5e553e357 Mon Sep 17 00:00:00 2001 From: pp-achauhan Date: Fri, 27 Feb 2026 17:02:41 +0530 Subject: [PATCH 8/9] Removed successful metric from app_end --- src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala b/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala index b558047..2e9ddd7 100644 --- a/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala +++ 
b/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala @@ -285,7 +285,6 @@ class KafkaSink(conf: SparkConf) extends SparkListener { val completionTime = applicationEnd.time val safeEndTime = if (completionTime > 0) completionTime else System.currentTimeMillis() val duration = if (startTime > 0) safeEndTime - startTime else 0L - val successful = succeededJobsCount > 0 && failedJobsCount == 0 val epochMillis = System.currentTimeMillis() val configurations = conf.getAll.toMap val appLabels = extractAppLabels(conf) @@ -297,7 +296,6 @@ class KafkaSink(conf: SparkConf) extends SparkListener { "startTime" -> startTime, "completionTime" -> completionTime, "duration" -> duration, - "successful" -> successful, "executorsFailed" -> executorsFailed, "executorsKilled" -> executorsKilled, "totalJobsCompleted" -> totalJobsCompleted, From cd7d29d7e5929459995c0e80c247985a3aca1984 Mon Sep 17 00:00:00 2001 From: pp-achauhan Date: Thu, 2 Apr 2026 21:00:50 +0530 Subject: [PATCH 9/9] Reverted KafkaSink to actual position and added new event class KafkaSinkV2, testcase and doc --- docs/Flight_recorder_mode_KafkaSink.md | 74 ++++ .../Reference_SparkMeasure_API_and_Configs.md | 48 +++ .../ch/cern/sparkmeasure/KafkaSink.scala | 133 +------ .../ch/cern/sparkmeasure/KafkaSinkV2.scala | 365 ++++++++++++++++++ .../cern/sparkmeasure/KafkaSinkV2Test.scala | 339 ++++++++++++++++ 5 files changed, 828 insertions(+), 131 deletions(-) create mode 100644 src/main/scala/ch/cern/sparkmeasure/KafkaSinkV2.scala create mode 100644 src/test/scala/ch/cern/sparkmeasure/KafkaSinkV2Test.scala diff --git a/docs/Flight_recorder_mode_KafkaSink.md b/docs/Flight_recorder_mode_KafkaSink.md index eedab75..663a9ca 100644 --- a/docs/Flight_recorder_mode_KafkaSink.md +++ b/docs/Flight_recorder_mode_KafkaSink.md @@ -14,6 +14,28 @@ provided by the user. Use this mode to monitor Spark execution workload. 
Notes: - KafkaSink: the amount of data generated is relatively small in most applications: O(number_of_stages) - KafkaSinkExtended can generate a large amount of data O(Number_of_tasks), use with care + +## KafkaSinkV2 and KafkaSinkV2Extended + +**KafkaSinkV2** is an enhanced version of KafkaSink that adds application-level aggregated metrics and custom labels support. +It collects all stage/executor/query metrics from the base KafkaSink plus additional application lifecycle events and counters. +**KafkaSinkV2Extended** extends KafkaSinkV2 to also record detailed metrics for each executed Task. + +**Compatibility Note:** KafkaSinkV2 is backward compatible with KafkaSink in terms of configuration and basic event types. +Existing consumers of KafkaSink events will continue to work as expected. KafkaSinkV2 adds two new event types +(`applications_started` and `applications_ended`) with additional metadata that existing consumers can safely ignore +if not needed. + +### Key Differences from KafkaSink + +1. **Application-Level Metrics:** KafkaSinkV2 emits `applications_started` and `applications_ended` events with aggregated counters +2. **Custom Labels:** Support for custom metadata labels via `spark.sparkmeasure.appLabels.*` configuration +3. **Enhanced Application End Event:** Includes failed/killed executor counts, job/stage/task counters, and selected Spark configurations +4. **Counter Tracking:** Tracks success/failure counts for jobs, stages, and tasks throughout application lifecycle + +## Configuration + +### KafkaSink / KafkaSinkExtended Configuration How to use: attach the KafkaSink to a Spark Context using the extra listener infrastructure.
Example: - `--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSink` @@ -40,6 +62,29 @@ Configuration - KafkaSink parameters: Example: --conf spark.sparkmeasure.kafka.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks ``` +### KafkaSinkV2 / KafkaSinkV2Extended Configuration + +How to use: attach the KafkaSinkV2 listener to the Spark Context: + +``` +# Start the listener for KafkaSinkV2 (recommended): +--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2 + +# Or use KafkaSinkV2Extended for task-level metrics (generates more data): +--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2Extended + +# Required Kafka configuration (same as KafkaSink): +--conf spark.sparkmeasure.kafkaBroker = Kafka broker endpoint URL + Example: --conf spark.sparkmeasure.kafkaBroker=kafka.your-site.com:9092 +--conf spark.sparkmeasure.kafkaTopic = Kafka topic + Example: --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-metrics + +# Optional - Custom application labels: +--conf spark.sparkmeasure.appLabels.* = Custom metadata value + Example: --conf spark.sparkmeasure.appLabels.project=my-project + Example: --conf spark.sparkmeasure.appLabels.environment=production +``` + This code depends on "kafka-clients". If you deploy sparkMeasure from maven central, the dependency is being taken care of. If you run sparkMeasure from a jar instead, you may need to add the dependency manually @@ -102,3 +147,32 @@ bin/spark-shell \ "jobId" : "0", "appId" : "local-1660057441489" ... +``` + +**Note:** KafkaSinkV2 also emits all standard events like `stages_started`, `stages_ended`, `stage_metrics`, etc., +just like the original KafkaSink, ensuring backward compatibility. + +## Migration Guide: KafkaSink to KafkaSinkV2 + +If you are currently using KafkaSink and want to migrate to KafkaSinkV2: + +1.
**Configuration Change:** Simply replace the listener class name: + ``` + # Old: + --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSink + + # New: + --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2 + ``` + +2. **Backward Compatibility:** All existing event types and their schemas remain unchanged. Your existing Kafka consumers will continue to work. + +3. **New Events:** KafkaSinkV2 adds two new event types (`applications_started` and `applications_ended`). If your consumers don't need these, they can simply ignore events with these names. + +4. **Optional Custom Labels:** Add custom labels for better filtering and organization: + ``` + --conf spark.sparkmeasure.appLabels.project=my-project + --conf spark.sparkmeasure.appLabels.environment=staging + ``` + +5. **Benefits:** You gain application-level aggregated metrics, custom labels, and selected Spark configurations without losing any existing functionality. diff --git a/docs/Reference_SparkMeasure_API_and_Configs.md b/docs/Reference_SparkMeasure_API_and_Configs.md index a6fc663..52565fd 100644 --- a/docs/Reference_SparkMeasure_API_and_Configs.md +++ b/docs/Reference_SparkMeasure_API_and_Configs.md @@ -433,6 +433,54 @@ This code depends on "kafka-clients", you may need to add the dependency explici --packages org.apache.kafka:kafka-clients:3.7.0 ``` +## KafkaSinkV2 and KafkaSinkV2Extended + +``` +class KafkaSinkV2(conf: SparkConf) extends KafkaSink(conf) +class KafkaSinkV2Extended(conf: SparkConf) extends KafkaSinkV2(conf) + +**KafkaSinkV2** is an enhanced version of KafkaSink that extends the SparkListener infrastructure +with application-level aggregated metrics and custom labels support. + +Key Features: +1. All stage/executor/query metrics from the base KafkaSink +2. Application-level aggregated counters (executor/job/stage/task counts) +3. Custom labels via spark.sparkmeasure.appLabels.* configurations +4. Enhanced applications_started and applications_ended events with metadata +5. 
Automatic capture of selected Spark configurations + +**Backward Compatibility:** KafkaSinkV2 emits all the same event types as KafkaSink, +ensuring existing consumers continue to work. It adds two new event types that can be +safely ignored by consumers that don't need them. + +How to use: +* --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2 + +**KafkaSinkV2Extended** adds verbose task-level metrics (can generate O(Number_of_tasks) data) +* How to use: +* --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2Extended + +Configuration - KafkaSinkV2 parameters: + +Required: +--conf spark.sparkmeasure.kafkaBroker = Kafka broker endpoint URL + Example: --conf spark.sparkmeasure.kafkaBroker=kafka.your-site.com:9092 +--conf spark.sparkmeasure.kafkaTopic = Kafka topic + Example: --conf spark.sparkmeasure.kafkaTopic=spark-metrics + +Optional - Custom labels (recommended for filtering and organization): +--conf spark.sparkmeasure.appLabels.* = Custom metadata value + Example: --conf spark.sparkmeasure.appLabels.project=my-project + Example: --conf spark.sparkmeasure.appLabels.environment=production + Example: --conf spark.sparkmeasure.appLabels.team=data-engineering + +For detailed event schemas and examples, see: + - docs/Flight_recorder_mode_KafkaSink.md + +This code depends on "kafka-clients", you may need to add the dependency explicitly, example: + --packages org.apache.kafka:kafka-clients:3.7.0 +``` + ## Prometheus PushGatewaySink ``` class PushGatewaySink(conf: SparkConf) extends SparkListener diff --git a/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala b/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala index 2e9ddd7..17c58c3 100644 --- a/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala +++ b/src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala @@ -2,7 +2,7 @@ package ch.cern.sparkmeasure import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerRecord} import org.apache.kafka.common.serialization.ByteArraySerializer -import
org.apache.spark.{SparkConf, TaskFailedReason, TaskKilled} +import org.apache.spark.SparkConf import org.apache.spark.scheduler._ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart} @@ -10,7 +10,6 @@ import org.slf4j.{Logger, LoggerFactory} import java.nio.charset.StandardCharsets import java.util.Properties -import scala.collection.mutable import scala.util.Try /** @@ -30,8 +29,6 @@ import scala.util.Try * example: --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-stageinfo * spark.sparkmeasure.kafka.* = Other kafka properties * example: --conf spark.sparkmeasure.kafka.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks - * spark.sparkmeasure.appLabels.* = Custom labels to include in application start and end events - * example: --conf spark.sparkmeasure.appLabels.environment=production * * This code depends on "kafka clients", you may need to add the dependency: * --packages org.apache.kafka:kafka-clients:3.2.1 @@ -52,29 +49,6 @@ class KafkaSink(conf: SparkConf) extends SparkListener { case _ => "noAppId" } - // Application tracking - private var appName: String = "noAppName" - private var startTime: Long = 0L - - // Executor tracking - private var executorsFailed: Int = 0 - private var executorsKilled: Int = 0 - - // Job tracking - private var totalJobsCompleted: Int = 0 - private var succeededJobsCount: Int = 0 - private var failedJobsCount: Int = 0 - - // Stage tracking - private var totalStagesCompleted: Int = 0 - private var succeededStagesCount: Int = 0 - private var failedStagesCount: Int = 0 - - // Task tracking - private var totalTaskCount: Int = 0 - private var numTaskFailed: Int = 0 - private var numTaskKilled: Int = 0 - override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { val executorInfo = executorAdded.executorInfo val epochMillis = System.currentTimeMillis() @@ -90,17 +64,6 @@ class KafkaSink(conf: SparkConf) 
extends SparkListener { report(metrics) } - override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { - if (executorRemoved != null && executorRemoved.reason != null) { - executorRemoved.reason match { - case reason if reason.toLowerCase.contains("kill") => - executorsKilled += 1 - case _ => - executorsFailed += 1 - } - } - } - override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { val submissionTime = stageSubmitted.stageInfo.submissionTime.getOrElse(0L) val attemptNumber = stageSubmitted.stageInfo.attemptNumber() @@ -118,6 +81,7 @@ class KafkaSink(conf: SparkConf) extends SparkListener { report(metrics) } + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { val stageId = stageCompleted.stageInfo.stageId.toString val submissionTime = stageCompleted.stageInfo.submissionTime.getOrElse(0L) @@ -176,17 +140,6 @@ class KafkaSink(conf: SparkConf) extends SparkListener { ) report(stageTaskMetrics) - - if (stageCompleted != null && stageCompleted.stageInfo != null) { - val stageInfo = stageCompleted.stageInfo - totalStagesCompleted += 1 - - if (stageInfo.failureReason.isDefined) { - failedStagesCount += 1 - } else { - succeededStagesCount += 1 - } - } } override def onOtherEvent(event: SparkListenerEvent): Unit = { @@ -250,69 +203,13 @@ class KafkaSink(conf: SparkConf) extends SparkListener { "epochMillis" -> epochMillis ) report(jobEndMetrics) - - if (jobEnd != null) { - totalJobsCompleted += 1 - - jobEnd.jobResult match { - case org.apache.spark.scheduler.JobSucceeded => - succeededJobsCount += 1 - case _ => - failedJobsCount += 1 - } - } } override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { appId = applicationStart.appId.getOrElse("noAppId") - appName = applicationStart.appName - startTime = applicationStart.time - val appLabels = extractAppLabels(conf) - val epochMillis = System.currentTimeMillis() - - val appStartMetrics = Map[String, 
Any]( - "name" -> "applications_started", - "appId" -> appId, - "appName" -> appName, - "startTime" -> startTime, - "epochMillis" -> epochMillis - ) ++ appLabels - - report(appStartMetrics) } override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { - val completionTime = applicationEnd.time - val safeEndTime = if (completionTime > 0) completionTime else System.currentTimeMillis() - val duration = if (startTime > 0) safeEndTime - startTime else 0L - val epochMillis = System.currentTimeMillis() - val configurations = conf.getAll.toMap - val appLabels = extractAppLabels(conf) - - val appEndMetrics = Map[String, Any]( - "name" -> "applications_ended", - "appId" -> appId, - "appName" -> appName, - "startTime" -> startTime, - "completionTime" -> completionTime, - "duration" -> duration, - "executorsFailed" -> executorsFailed, - "executorsKilled" -> executorsKilled, - "totalJobsCompleted" -> totalJobsCompleted, - "succeededJobsCount" -> succeededJobsCount, - "failedJobsCount" -> failedJobsCount, - "numStagesCompleted" -> totalStagesCompleted, - "numSucceededStages" -> succeededStagesCount, - "numFailedStages" -> failedStagesCount, - "totalTaskCount" -> totalTaskCount, - "numTaskFailed" -> numTaskFailed, - "numTaskKilled" -> numTaskKilled, - "epochMillis" -> epochMillis, - "configurations" -> configurations - ) ++ appLabels - - report(appEndMetrics) - logger.info(s"Spark application ended, timestamp = ${applicationEnd.time}, closing Kafka connection.") synchronized( if (Option(producer).isDefined) { @@ -326,22 +223,6 @@ class KafkaSink(conf: SparkConf) extends SparkListener { ) } - override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - if (taskEnd != null) { - totalTaskCount += 1 - - if (taskEnd.reason != null) { - taskEnd.reason match { - case _: TaskKilled => - numTaskKilled += 1 - case _: TaskFailedReason => - numTaskFailed += 1 - case _ => - } - } - } - } - protected def report[T <: Any](metrics: Map[String, T]): Unit = { val 
result: Unit = Try { ensureProducer() @@ -375,14 +256,6 @@ class KafkaSink(conf: SparkConf) extends SparkListener { ) } - private def extractAppLabels(conf: SparkConf): Map[String, String] = { - Try { - conf.getAll - .filter { case (key, _) => key.startsWith("spark.sparkmeasure.appLabels.") } - .map { case (key, value) => (key.stripPrefix("spark.sparkmeasure."), value) } - .toMap - }.getOrElse(Map.empty[String, String]) - } } /** @@ -409,8 +282,6 @@ class KafkaSinkExtended(conf: SparkConf) extends KafkaSink(conf) { } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { - super.onTaskEnd(taskEnd) - val taskInfo = taskEnd.taskInfo val taskmetrics = taskEnd.taskMetrics val epochMillis = System.currentTimeMillis() diff --git a/src/main/scala/ch/cern/sparkmeasure/KafkaSinkV2.scala b/src/main/scala/ch/cern/sparkmeasure/KafkaSinkV2.scala new file mode 100644 index 0000000..5d06328 --- /dev/null +++ b/src/main/scala/ch/cern/sparkmeasure/KafkaSinkV2.scala @@ -0,0 +1,365 @@ +package ch.cern.sparkmeasure + +import org.apache.spark.scheduler._ +import org.apache.spark.{SparkConf, TaskFailedReason, TaskKilled} + +import scala.collection.mutable +import scala.util.Try + + +/** + * An extended version of ch.cern.sparkmeasure.KafkaSink that adds + * application-level metrics and custom labels support. + * + * This listener combines: + * 1. All stage/executor/query metrics from the base KafkaSink + * 2. Application-level metrics (executor counts, stage counts, task counts) + * 3. Custom Labels passed via spark configurations + * 4. 
Enhanced applications_started and applications_ended events with metadata + * + * Configuration: + * --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2 + * --conf spark.sparkmeasure.kafkaBroker=kafka.example.com:9092 + * --conf spark.sparkmeasure.kafkaTopic=spark-metrics + * --conf spark.sparkmeasure.kafka.* = Additional Kafka properties + * --conf spark.sparkmeasure.appLabels.* = Custom metadata to include in app events + * + * Example custom configs: + * --conf spark.sparkmeasure.appLabels.project=my-project + * --conf spark.sparkmeasure.appLabels.environment=production + * --conf spark.sparkmeasure.appLabels.team=engineering + * + */ +class KafkaSinkV2(conf: SparkConf) extends KafkaSink(conf) { + + // Application tracking + private var appName: String = "unknown" + private var startTime: Long = 0L + + // Executor tracking + private var executorsFailed: Int = 0 + private var executorsKilled: Int = 0 + + // Job tracking + private var totalJobsCompleted: Int = 0 + private var succeededJobsCount: Int = 0 + private var failedJobsCount: Int = 0 + + // Stage tracking + private var totalStagesCompleted: Int = 0 + private var succeededStagesCount: Int = 0 + private var failedStagesCount: Int = 0 + + // Task tracking + private var totalTaskCount: Int = 0 + private var numTaskFailed: Int = 0 + private var numTaskKilled: Int = 0 + + override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { + super.onApplicationStart(applicationStart) + + appId = applicationStart.appId.getOrElse("noAppId") + appName = applicationStart.appName + startTime = applicationStart.time + val appLabels = extractAppLabels(conf) + + val epochMillis = System.currentTimeMillis() + + val appStartMetrics = Map[String, Any]( + "name" -> "applications_started", + "appId" -> appId, + "appName" -> appName, + "startTime" -> startTime, + "epochMillis" -> epochMillis + ) ++ appLabels + + report(appStartMetrics) + } + + override def onStageCompleted(stageCompleted: 
SparkListenerStageCompleted): Unit = { + super.onStageCompleted(stageCompleted) + + if (stageCompleted != null && stageCompleted.stageInfo != null) { + val stageInfo = stageCompleted.stageInfo + totalStagesCompleted += 1 + + if (stageInfo.failureReason.isDefined) { + failedStagesCount += 1 + } else { + succeededStagesCount += 1 + } + } + } + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + if (taskEnd != null) { + totalTaskCount += 1 + + if (taskEnd.reason != null) { + taskEnd.reason match { + case _: TaskKilled => + numTaskKilled += 1 + case _: TaskFailedReason => + numTaskFailed += 1 + case _ => + } + } + } + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + super.onJobEnd(jobEnd) + + if (jobEnd != null) { + totalJobsCompleted += 1 + + jobEnd.jobResult match { + case org.apache.spark.scheduler.JobSucceeded => + succeededJobsCount += 1 + case _ => + failedJobsCount += 1 + } + } + } + + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + + if (executorRemoved != null && executorRemoved.reason != null) { + executorRemoved.reason match { + case reason if reason.toLowerCase.contains("kill") => + executorsKilled += 1 + case _ => + executorsFailed += 1 + } + } + } + + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + val completionTime = applicationEnd.time + + val safeEndTime = if (completionTime > 0) completionTime else System.currentTimeMillis() + val duration = if (startTime > 0) safeEndTime - startTime else 0L + val epochMillis = System.currentTimeMillis() + val sparkConfigs = extractSparkConfigs(conf) + val appLabels = extractAppLabels(conf) + + val appEndMetrics = Map[String, Any]( + "name" -> "applications_ended", + "appId" -> appId, + "appName" -> appName, + "startTime" -> startTime, + "completionTime" -> completionTime, + "duration" -> duration, + "executorsFailed" -> executorsFailed, + "executorsKilled" -> executorsKilled, + "totalJobsCompleted" -> 
totalJobsCompleted, + "succeededJobsCount" -> succeededJobsCount, + "failedJobsCount" -> failedJobsCount, + "numStagesCompleted" -> totalStagesCompleted, + "numSucceededStages" -> succeededStagesCount, + "numFailedStages" -> failedStagesCount, + "totalTaskCount" -> totalTaskCount, + "numTaskFailed" -> numTaskFailed, + "numTaskKilled" -> numTaskKilled, + "epochMillis" -> epochMillis, + "sparkConfigs" -> sparkConfigs + ) ++ appLabels + + report(appEndMetrics) + super.onApplicationEnd(applicationEnd) + } + + private def extractAppLabels(conf: SparkConf): Map[String, String] = { + Try { + conf.getAll + .filter { case (key, _) => key.startsWith("spark.sparkmeasure.appLabels.") } + .map { case (key, value) => (key.stripPrefix("spark.sparkmeasure."), value) } + .toMap + }.getOrElse(Map.empty[String, String]) + } + + private def extractSparkConfigs(conf: SparkConf): Map[String, String] = { + val optimizationConfigKeys = List( + // Executor resource configurations + "spark.executor.instances", + "spark.executor.memory", + "spark.executor.memoryOverhead", + "spark.executor.cores", + "spark.executor.heartbeatInterval", + + // Driver resource configurations + "spark.driver.memory", + "spark.driver.memoryOverhead", + "spark.driver.cores", + "spark.driver.maxResultSize", + + // Dynamic allocation + "spark.dynamicAllocation.enabled", + "spark.dynamicAllocation.minExecutors", + "spark.dynamicAllocation.maxExecutors", + "spark.dynamicAllocation.initialExecutors", + "spark.dynamicAllocation.executorIdleTimeout", + "spark.dynamicAllocation.cachedExecutorIdleTimeout", + "spark.dynamicAllocation.schedulerBacklogTimeout", + + // Shuffle and partitioning + "spark.sql.shuffle.partitions", + "spark.default.parallelism", + "spark.sql.adaptive.enabled", + "spark.sql.adaptive.coalescePartitions.enabled", + "spark.sql.adaptive.coalescePartitions.minPartitionNum", + "spark.sql.adaptive.advisoryPartitionSizeInBytes", + "spark.sql.adaptive.skewJoin.enabled", + 
"spark.sql.adaptive.skewJoin.skewedPartitionFactor",
      "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes",

      // Shuffle service and compression
      "spark.shuffle.service.enabled",
      "spark.shuffle.compress",
      "spark.shuffle.spill.compress",
      "spark.shuffle.file.buffer",
      "spark.shuffle.io.retryWait",
      "spark.shuffle.io.maxRetries",

      // Memory management
      "spark.memory.fraction",
      "spark.memory.storageFraction",
      "spark.memory.offHeap.enabled",
      "spark.memory.offHeap.size",

      // Kubernetes resource configurations
      "spark.kubernetes.executor.request.cores",
      "spark.kubernetes.executor.limit.cores",
      "spark.kubernetes.driver.request.cores",
      "spark.kubernetes.driver.limit.cores",
      "spark.kubernetes.memoryOverheadFactor",
      "spark.kubernetes.allocation.batch.size",
      "spark.kubernetes.allocation.batch.delay",
      "spark.kubernetes.executor.deleteOnTermination",

      // Serialization
      "spark.serializer",
      "spark.kryoserializer.buffer.max",
      "spark.kryoserializer.buffer",
      "spark.rdd.compress",

      // Network and timeouts
      "spark.network.timeout",
      "spark.rpc.askTimeout",
      "spark.rpc.lookupTimeout",

      // SQL optimization
      "spark.sql.autoBroadcastJoinThreshold",
      "spark.sql.broadcastTimeout",
      "spark.sql.files.maxPartitionBytes",
      "spark.sql.files.openCostInBytes",
      "spark.sql.files.minPartitionNum",

      // Speculation
      "spark.speculation",
      "spark.speculation.multiplier",
      "spark.speculation.quantile",
      "spark.speculation.interval"
    )
    Try {
      conf.getAll.toMap.filter { case (key, _) =>
        optimizationConfigKeys.contains(key)
      }
    }.getOrElse(Map.empty[String, String])
  }
}

/**
 * KafkaSinkV2Extended extends the basic KafkaSinkV2 functionality with a verbose dump of
 * per-task events and metrics.
 *
 * For every task it reports a `tasks_started` record, a `tasks_ended` record and a
 * `task_metrics` record, in addition to whatever the base sink reports.
 *
 * Note: this can generate a large amount of data, O(Number_of_tasks).
 * Configuration parameters and how-to use: see KafkaSinkV2.
 *
 * @param conf the Spark configuration, passed through unchanged to KafkaSinkV2
 */
class KafkaSinkV2Extended(conf: SparkConf) extends KafkaSinkV2(conf) {

  /**
   * Reports a `tasks_started` record for the task that has just been launched.
   *
   * @param taskStart the task-start event delivered by the Spark listener bus
   */
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    // Chain to the parent first, mirroring onTaskEnd, so that any task-start handling
    // implemented by the base sink keeps running when this subclass is used.
    super.onTaskStart(taskStart)

    val taskInfo = taskStart.taskInfo
    val epochMillis = System.currentTimeMillis()

    report(Map[String, Any](
      "name" -> "tasks_started",
      "appId" -> appId,
      "taskId" -> taskInfo.taskId,
      "attemptNumber" -> taskInfo.attemptNumber,
      "stageId" -> taskStart.stageId,
      "launchTime" -> taskInfo.launchTime,
      "epochMillis" -> epochMillis
    ))
  }

  /**
   * Reports a `tasks_ended` record followed by a `task_metrics` record for the finished task.
   *
   * The `task_metrics` record always carries the task-info fields; the executor, I/O and
   * shuffle counters are added only when the event carries a non-null `taskMetrics`.
   *
   * @param taskEnd the task-end event delivered by the Spark listener bus
   */
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    super.onTaskEnd(taskEnd)

    val taskInfo = taskEnd.taskInfo
    // Both records emitted below share this single timestamp.
    val epochMillis = System.currentTimeMillis()

    report(Map[String, Any](
      "name" -> "tasks_ended",
      "appId" -> appId,
      "taskId" -> taskInfo.taskId,
      "attemptNumber" -> taskInfo.attemptNumber,
      "stageId" -> taskEnd.stageId,
      "launchTime" -> taskInfo.launchTime,
      "finishTime" -> taskInfo.finishTime,
      "epochMillis" -> epochMillis
    ))

    val taskMetricsHeader = Map[String, Any](
      "name" -> "task_metrics",
      "appId" -> appId,
      // task info
      "taskId" -> taskInfo.taskId,
      "attemptNumber" -> taskInfo.attemptNumber,
      "stageId" -> taskEnd.stageId,
      "launchTime" -> taskInfo.launchTime,
      "finishTime" -> taskInfo.finishTime,
      "failed" -> taskInfo.failed,
      "speculative" -> taskInfo.speculative,
      "killed" -> taskInfo.killed,
      "finished" -> taskInfo.finished,
      "executorId" -> taskInfo.executorId,
      "duration" -> taskInfo.duration,
      "successful" -> taskInfo.successful,
      "host" -> taskInfo.host,
      "taskLocality" -> Utils.encodeTaskLocality(taskInfo.taskLocality),
      "epochMillis" -> epochMillis
    )

    // taskMetrics may be null; Option(...) maps that case to an empty, correctly typed body
    // so the header-only record is still reported.
    val taskMetricsBody = Option(taskEnd.taskMetrics).fold(Map.empty[String, Any]) { taskMetrics =>
      Map[String, Any](
        // task metrics
        "executorRunTime" -> taskMetrics.executorRunTime,
        "executorCpuTime" -> taskMetrics.executorCpuTime,
        "executorDeserializeCpuTime" -> taskMetrics.executorDeserializeCpuTime,
        "executorDeserializeTime" -> taskMetrics.executorDeserializeTime,
        "jvmGCTime" -> taskMetrics.jvmGCTime,
        "memoryBytesSpilled" -> taskMetrics.memoryBytesSpilled,
        "peakExecutionMemory" -> taskMetrics.peakExecutionMemory,
        "resultSerializationTime" -> taskMetrics.resultSerializationTime,
        "resultSize" -> taskMetrics.resultSize,
        "bytesRead" -> taskMetrics.inputMetrics.bytesRead,
        "recordsRead" -> taskMetrics.inputMetrics.recordsRead,
        "bytesWritten" -> taskMetrics.outputMetrics.bytesWritten,
        "recordsWritten" -> taskMetrics.outputMetrics.recordsWritten,
        "shuffleTotalBytesRead" -> taskMetrics.shuffleReadMetrics.totalBytesRead,
        "shuffleRemoteBytesRead" -> taskMetrics.shuffleReadMetrics.remoteBytesRead,
        "shuffleLocalBytesRead" -> taskMetrics.shuffleReadMetrics.localBytesRead,
        "shuffleTotalBlocksFetched" -> taskMetrics.shuffleReadMetrics.totalBlocksFetched,
        "shuffleLocalBlocksFetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched,
        "shuffleRemoteBlocksFetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched,
        "shuffleRecordsRead" -> taskMetrics.shuffleReadMetrics.recordsRead,
        // this requires spark2.3 and above "remoteBytesReadToDisk" -> taskmetrics.shuffleReadMetrics.remoteBytesReadToDisk,
        "shuffleFetchWaitTime" -> taskMetrics.shuffleReadMetrics.fetchWaitTime,
        "shuffleBytesWritten" -> taskMetrics.shuffleWriteMetrics.bytesWritten,
        "shuffleRecordsWritten" -> taskMetrics.shuffleWriteMetrics.recordsWritten,
        "shuffleWriteTime" -> taskMetrics.shuffleWriteMetrics.writeTime
      )
    }

    report(taskMetricsHeader ++ taskMetricsBody)
  }
}
\ No newline at end of file
diff --git a/src/test/scala/ch/cern/sparkmeasure/KafkaSinkV2Test.scala b/src/test/scala/ch/cern/sparkmeasure/KafkaSinkV2Test.scala
new file mode 100644
index 0000000..28ebdff
--- /dev/null
+++ b/src/test/scala/ch/cern/sparkmeasure/KafkaSinkV2Test.scala
@@ -0,0 +1,339 @@
package ch.cern.sparkmeasure

import org.apache.spark.SparkConf
import org.apache.spark.scheduler._
import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{Millis, Seconds, Span}

import scala.collection.mutable.ListBuffer
import scala.util.Properties

/**
 * Test suite for KafkaSinkV2 to verify:
 * 1. Application start/end event schema and payload correctness
 * 2. Counter tracking (jobs, stages, tasks, executors)
 * 3. Custom labels extraction and inclusion
 * 4. Spark configurations extraction
 * 5. Backward compatibility with KafkaSink events
 *
 * Events are never sent to Kafka: a test subclass of the sink overrides `report` and
 * stores every reported map in `capturedEvents`.
 */
class KafkaSinkV2Test extends AnyFunSuite with BeforeAndAfterAll with Matchers with Eventually {

  var spark: SparkSession = _
  val capturedEvents: ListBuffer[Map[String, Any]] = ListBuffer.empty

  /** KafkaSinkV2 variant that records reported metrics in `capturedEvents` instead of sending them. */
  class TestableKafkaSinkV2(conf: SparkConf) extends KafkaSinkV2(conf) {
    override protected def report[T <: Any](metrics: Map[String, T]): Unit = {
      // In the integration test this runs on Spark's listener thread while the test thread
      // reads the buffer, so every access goes through the buffer's monitor.
      capturedEvents.synchronized {
        capturedEvents += metrics
        ()
      }
    }
  }

  // Custom labels configured in beforeAll, keyed by the suffix after "appLabels.".
  private val expectedLabels: Seq[(String, String)] = Seq(
    "project" -> "test-project",
    "environment" -> "test-env",
    "team" -> "test-team"
  )

  override def beforeAll(): Unit = {
    super.beforeAll()

    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("KafkaSinkV2Test")
      .set("spark.sparkmeasure.kafkaBroker", "test:9092")
      .set("spark.sparkmeasure.kafkaTopic", "test-topic")
      .set("spark.sparkmeasure.appLabels.project", "test-project")
      .set("spark.sparkmeasure.appLabels.environment", "test-env")
      .set("spark.sparkmeasure.appLabels.team", "test-team")
      .set("spark.executor.instances", "2")
      .set("spark.executor.memory", "1g")
      .set("spark.sql.shuffle.partitions", "10")

    // Derive "2.12" / "2.13" / ... from the running Scala version so the session is always
    // created; previously any other version left `spark` null and every test failed with an NPE.
    val scalaBinaryVersion = Properties.versionNumberString.split('.').take(2).mkString(".")
    spark = SparkSession.builder()
      .config(conf)
      .config("spark.jars", s"target/scala-$scalaBinaryVersion/*.jar")
      .getOrCreate()
  }

  override def afterAll(): Unit = {
    try Option(spark).foreach(_.stop())
    finally super.afterAll()
  }

  /** Clears previously captured events and returns a new capturing listener built from `conf`. */
  private def freshListener(conf: SparkConf = spark.sparkContext.getConf): TestableKafkaSinkV2 = {
    capturedEvents.synchronized(capturedEvents.clear())
    new TestableKafkaSinkV2(conf)
  }

  /** Snapshot of the captured events whose "name" field equals `name`; events without a name are skipped. */
  private def eventsNamed(name: String): List[Map[String, Any]] =
    capturedEvents.synchronized {
      capturedEvents.filter(_.get("name").contains(name)).toList
    }

  /** First captured event called `name`; fails the test with a readable message when there is none. */
  private def firstEvent(name: String): Map[String, Any] =
    eventsNamed(name).headOption.getOrElse(fail(s"$name event should be emitted"))

  /** Application-start event for "TestApp" submitted by "testuser". */
  private def applicationStart(appId: String, time: Long = 1000000000L): SparkListenerApplicationStart =
    SparkListenerApplicationStart(
      appName = "TestApp",
      appId = Some(appId),
      time = time,
      sparkUser = "testuser",
      appAttemptId = None,
      driverLogs = None
    )

  /** Job-start event with no stages; `properties` is left null exactly as the events under test were built before. */
  private def jobStart(jobId: Int, time: Long): SparkListenerJobStart =
    SparkListenerJobStart(
      jobId = jobId,
      time = time,
      stageInfos = Seq(),
      properties = null
    )

  /** Job-end event reporting success. */
  private def jobSucceeded(jobId: Int, time: Long): SparkListenerJobEnd =
    SparkListenerJobEnd(
      jobId = jobId,
      time = time,
      jobResult = org.apache.spark.scheduler.JobSucceeded
    )

  /** Asserts that every label configured in beforeAll is present on `event` under "appLabels.<label>". */
  private def assertCustomLabels(event: Map[String, Any]): Unit =
    expectedLabels.foreach { case (label, value) =>
      assert(event.get(s"appLabels.$label").contains(value), s"$label label should be present")
    }

  test("applications_started event should contain correct schema and custom labels") {
    val listener = freshListener()

    listener.onApplicationStart(applicationStart("test-app-001", time = 1234567890L))

    val startEvent = firstEvent("applications_started")
    assert(startEvent("appId") == "test-app-001", "appId should match")
    assert(startEvent("appName") == "TestApp", "appName should match")
    assert(startEvent("startTime") == 1234567890L, "startTime should match")
    assert(startEvent.contains("epochMillis"), "epochMillis should be present")

    // Verify custom labels are included
    assertCustomLabels(startEvent)
  }

  test("applications_ended event should contain basic structure and custom labels") {
    val listener = freshListener()

    listener.onApplicationStart(applicationStart("test-app-002"))
    listener.onApplicationEnd(SparkListenerApplicationEnd(time = 1000010000L))

    val endEvent = firstEvent("applications_ended")

    // Verify basic fields
    assert(endEvent("appId") == "test-app-002", "appId should match")
    assert(endEvent("appName") == "TestApp", "appName should match")
    assert(endEvent("startTime") == 1000000000L, "startTime should match")
    assert(endEvent("completionTime") == 1000010000L, "completionTime should match")
    assert(endEvent("duration") == 10000L, "duration should be calculated correctly")
    assert(endEvent.contains("epochMillis"), "epochMillis should be present")

    // Verify counters are initialized
    assert(endEvent("totalJobsCompleted") == 0, "should have 0 jobs initially")
    assert(endEvent("succeededJobsCount") == 0, "should have 0 succeeded jobs")
    assert(endEvent("failedJobsCount") == 0, "should have 0 failed jobs")
    assert(endEvent("numStagesCompleted") == 0, "should have 0 stages")
    assert(endEvent("totalTaskCount") == 0, "should have 0 tasks")
    assert(endEvent("executorsFailed") == 0, "should have 0 failed executors")
    assert(endEvent("executorsKilled") == 0, "should have 0 killed executors")

    // Verify custom labels
    assertCustomLabels(endEvent)

    // Verify spark configurations are captured
    assert(endEvent.contains("sparkConfigs"), "sparkConfigs should be present")
    // The payload is typed as Any; the cast is unchecked, but reading a value as String below
    // still fails the test if the sink stored something else.
    val sparkConfigs = endEvent("sparkConfigs").asInstanceOf[Map[String, String]]
    assert(sparkConfigs.contains("spark.executor.instances"), "executor.instances should be captured")
    assert(sparkConfigs("spark.executor.instances") == "2", "executor.instances value should match")
    assert(sparkConfigs.contains("spark.executor.memory"), "executor.memory should be captured")
    assert(sparkConfigs("spark.executor.memory") == "1g", "executor.memory value should match")
    assert(sparkConfigs.contains("spark.sql.shuffle.partitions"), "shuffle.partitions should be captured")
    assert(sparkConfigs("spark.sql.shuffle.partitions") == "10", "shuffle.partitions value should match")
  }

  // NOTE(review): despite its name this test only drives the success path; no failed job
  // is simulated, so failedJobsCount is only checked to stay at 0.
  test("job counters should track succeeded and failed jobs") {
    val listener = freshListener()

    listener.onApplicationStart(applicationStart("test-app-003"))

    // Simulate successful job
    listener.onJobStart(jobStart(jobId = 1, time = 1000001000L))
    listener.onJobEnd(jobSucceeded(jobId = 1, time = 1000002000L))

    // Simulate another successful job
    listener.onJobStart(jobStart(jobId = 2, time = 1000003000L))
    listener.onJobEnd(jobSucceeded(jobId = 2, time = 1000004000L))

    listener.onApplicationEnd(SparkListenerApplicationEnd(time = 1000010000L))

    val endEvent = firstEvent("applications_ended")

    assert(endEvent("totalJobsCompleted") == 2, "should have 2 total jobs")
    assert(endEvent("succeededJobsCount") == 2, "should have 2 succeeded jobs")
    assert(endEvent("failedJobsCount") == 0, "should have 0 failed jobs")
  }

  test("executor removal tracking should work") {
    val listener = freshListener()

    listener.onApplicationStart(applicationStart("test-app-004"))

    // Simulate killed executor
    listener.onExecutorRemoved(SparkListenerExecutorRemoved(
      time = 1000001000L,
      executorId = "exec-1",
      reason = "Executor killed by driver"
    ))

    // Simulate failed executor
    listener.onExecutorRemoved(SparkListenerExecutorRemoved(
      time = 1000002000L,
      executorId = "exec-2",
      reason = "Executor lost due to node failure"
    ))

    listener.onApplicationEnd(SparkListenerApplicationEnd(time = 1000010000L))

    val endEvent = firstEvent("applications_ended")

    assert(endEvent("executorsKilled") == 1, "should have 1 killed executor")
    assert(endEvent("executorsFailed") == 1, "should have 1 failed executor")
  }

  test("backward compatibility - KafkaSinkV2 should emit KafkaSink event types") {
    val listener = freshListener()

    listener.onApplicationStart(applicationStart("test-app-005"))

    // Job events
    listener.onJobStart(jobStart(jobId = 1, time = 1000001000L))

    val jobStartedEvents = eventsNamed("jobs_started")
    assert(jobStartedEvents.nonEmpty, "jobs_started event should be emitted (backward compatibility)")
    assert(jobStartedEvents.head("jobId") == "1", "jobId should match")

    listener.onJobEnd(jobSucceeded(jobId = 1, time = 1000002000L))

    val jobEndedEvents = eventsNamed("jobs_ended")
    assert(jobEndedEvents.nonEmpty, "jobs_ended event should be emitted (backward compatibility)")
  }

  test("custom labels extraction should handle missing labels gracefully") {
    val conf = new SparkConf()
      .set("spark.sparkmeasure.kafkaBroker", "test:9092")
      .set("spark.sparkmeasure.kafkaTopic", "test-topic")
    // No custom labels set

    val listener = freshListener(conf)

    listener.onApplicationStart(applicationStart("test-app-006", time = 1234567890L))

    val startEvent = firstEvent("applications_started")

    // Verify no custom label keys exist
    val customLabelKeys = startEvent.keys.filter(_.startsWith("appLabels."))
    assert(customLabelKeys.isEmpty, "no custom labels should be present when not configured")
  }

  test("integration test - run actual Spark query and verify metrics") {
    val listener = freshListener()

    // Add listener to spark context
    spark.sparkContext.addSparkListener(listener)
    try {
      // Run a simple query
      spark.sql("select count(*) from range(100)").collect()

      // Listener callbacks are delivered asynchronously, so collect() can return before the
      // job/stage end events reach the listener. Poll until all standard KafkaSink events
      // have arrived instead of asserting (and detaching the listener) immediately.
      eventually(timeout(Span(30, Seconds)), interval(Span(100, Millis))) {
        Seq("jobs_started", "jobs_ended", "stages_started", "stages_ended").foreach { name =>
          assert(eventsNamed(name).nonEmpty, s"$name events should be emitted during query execution")
        }
      }
    } finally {
      // Remove listener even when the query or the assertions fail, so it cannot leak
      // into other tests that share this SparkContext.
      spark.sparkContext.removeSparkListener(listener)
    }
  }
}