diff --git a/docs/Flight_recorder_mode_KafkaSink.md b/docs/Flight_recorder_mode_KafkaSink.md index eedab75..663a9ca 100644 --- a/docs/Flight_recorder_mode_KafkaSink.md +++ b/docs/Flight_recorder_mode_KafkaSink.md @@ -14,6 +14,28 @@ provided by the user. Use this mode to monitor Spark execution workload. Notes: - KafkaSink: the amount of data generated is relatively small in most applications: O(number_of_stages) - KafkaSinkExtended can generate a large amount of data O(Number_of_tasks), use with care + +## KafkaSinkV2 and KafkaSinkV2Extended + +**KafkaSinkV2** is an enhanced version of KafkaSink that adds application-level aggregated metrics and custom labels support. +It collects all stage/executor/query metrics from the base KafkaSink plus additional application lifecycle events and counters. +**KafkaSinkV2Extended** extends KafkaSinkV2 to also record detailed metrics for each executed Task. + +**Compatibility Note:** KafkaSinkV2 is backward compatible with KafkaSink in terms of configuration and basic event types. +Existing consumers of KafkaSink events will continue to work as expected. KafkaSinkV2 adds two new event types +(`applications_started` and `applications_ended`) with additional metadata that existing consumers can safely ignore +if not needed. + +### Key Differences from KafkaSink + +1. **Application-Level Metrics:** KafkaSinkV2 emits `applications_started` and `applications_ended` events with aggregated counters +2. **Custom Labels:** Support for custom metadata labels via `spark.sparkmeasure.appLabels.*` configuration +3. **Enhanced Application End Event:** Includes executor counts, job/stage/task counters, and selected Spark configurations +4. **Counter Tracking:** Tracks success/failure counts for jobs, stages, and tasks throughout application lifecycle + +## Configuration + +### KafkaSink / KafkaSinkExtended Configuration How to use: attach the KafkaSink to a Spark Context using the extra listener infrastructure. Example: - `--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSink` @@ -40,6 +62,29 @@ Configuration - KafkaSink parameters: Example: --conf spark.sparkmeasure.kafka.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks ``` +### KafkaSinkV2 / KafkaSinkV2Extended Configuration + +How to use: attach the KafkaSinkV2 listener to the Spark Context: + +``` +# Start the listener for KafkaSinkV2 (recommended): +--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2 + +# Or use KafkaSinkV2Extended for task-level metrics (generates more data): +--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2Extended + +# Required Kafka configuration (same as KafkaSink): +--conf spark.sparkmeasure.kafkaBroker = Kafka broker endpoint URL + Example: --conf spark.sparkmeasure.kafkaBroker=kafka.your-site.com:9092 +--conf spark.sparkmeasure.kafkaTopic = Kafka topic + Example: --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-metrics + +# Optional - Custom application labels: +--conf spark.sparkmeasure.appLabels. = Custom metadata value + Example: --conf spark.sparkmeasure.appLabels.project=my-project + Example: --conf spark.sparkmeasure.appLabels.environment=production +``` + This code depends on "kafka-clients". If you deploy sparkMeasure from maven central, the dependency is being taken care of. If you run sparkMeasure from a jar instead, you may need to add the dependency manually @@ -102,3 +147,32 @@ bin/spark-shell \ "jobId" : "0", "appId" : "local-1660057441489" ... +``` + +**Note:** KafkaSinkV2 also emits all standard events like `stages_started`, `stages_ended`, `stage_metrics`, etc., +just like the original KafkaSink, ensuring backward compatibility. + +## Migration Guide: KafkaSink to KafkaSinkV2 + +If you are currently using KafkaSink and want to migrate to KafkaSinkV2: + +1. **Configuration Change:** Simply replace the listener class name: + ``` + # Old: + --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSink + + # New: + --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2 + ``` + +2. **Backward Compatibility:** All existing event types and their schemas remain unchanged. Your existing Kafka consumers will continue to work. + +3. **New Events:** KafkaSinkV2 adds two new event types (`applications_started` and `applications_ended`). If your consumers don't need these, they can simply ignore events with these names. + +4. **Optional Custom Labels:** Add custom labels for better filtering and organization: + ``` + --conf spark.sparkmeasure.appLabels.project=my-project + --conf spark.sparkmeasure.appLabels.environment=staging + ``` + +5. **Benefits:** You gain application-level aggregated metrics, custom labels, and selected Spark configurations without losing any existing functionality. diff --git a/docs/Reference_SparkMeasure_API_and_Configs.md b/docs/Reference_SparkMeasure_API_and_Configs.md index a6fc663..52565fd 100644 --- a/docs/Reference_SparkMeasure_API_and_Configs.md +++ b/docs/Reference_SparkMeasure_API_and_Configs.md @@ -433,6 +433,54 @@ This code depends on "kafka-clients", you may need to add the dependency explici --packages org.apache.kafka:kafka-clients:3.7.0 ``` +## KafkaSinkV2 and KafkaSinkV2Extended + +``` +class KafkaSinkV2(conf: SparkConf) extends KafkaSink(conf) +class KafkaSinkV2Extended(conf: SparkConf) extends KafkaSinkV2(conf) + +**KafkaSinkV2** is an enhanced version of KafkaSink that extends the SparkListener infrastructure +with application-level aggregated metrics and custom labels support. + +Key Features: +1. All stage/executor/query metrics from the base KafkaSink +2. Application-level aggregated counters (executor/job/stage/task counts) +3. Custom labels via spark.sparkmeasure.appLabels.* configurations +4. Enhanced applications_started and applications_ended events with metadata +5. Automatic capture of selected Spark configurations + +**Backward Compatibility:** KafkaSinkV2 emits all the same event types as KafkaSink, +ensuring existing consumers continue to work. It adds two new event types that can be +safely ignored by consumers that don't need them. + +How to use: +* --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2 + +**KafkaSinkV2Extended** adds verbose task-level metrics (can generate O(Number_of_tasks) data) +* How to use: +* --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2Extended + +Configuration - KafkaSinkV2 parameters: + +Required: +--conf spark.sparkmeasure.kafkaBroker = Kafka broker endpoint URL + Example: --conf spark.sparkmeasure.kafkaBroker=kafka.your-site.com:9092 +--conf spark.sparkmeasure.kafkaTopic = Kafka topic + Example: --conf spark.sparkmeasure.kafkaTopic=spark-metrics + +Optional - Custom labels (recommended for filtering and organization): +--conf spark.sparkmeasure.appLabels. = Custom metadata value + Example: --conf spark.sparkmeasure.appLabels.project=my-project + Example: --conf spark.sparkmeasure.appLabels.environment=production + Example: --conf spark.sparkmeasure.appLabels.team=data-engineering + +For detailed event schemas and examples, see: + - docs/Flight_recorder_mode_KafkaSink.md + +This code depends on "kafka-clients", you may need to add the dependency explicitly, example: + --packages org.apache.kafka:kafka-clients:3.7.0 +``` + ## Prometheus PushGatewaySink ``` class PushGatewaySink(conf: SparkConf) extends SparkListener diff --git a/src/main/scala/ch/cern/sparkmeasure/KafkaSinkV2.scala b/src/main/scala/ch/cern/sparkmeasure/KafkaSinkV2.scala new file mode 100644 index 0000000..5d06328 --- /dev/null +++ b/src/main/scala/ch/cern/sparkmeasure/KafkaSinkV2.scala @@ -0,0 +1,365 @@ +package ch.cern.sparkmeasure + +import org.apache.spark.scheduler._ +import org.apache.spark.{SparkConf, TaskFailedReason, TaskKilled} + +import scala.collection.mutable +import scala.util.Try + + +/** + * An extended version of ch.cern.sparkmeasure.KafkaSink that adds + * application-level metrics and custom labels support. + * + * This listener combines: + * 1. All stage/executor/query metrics from the base KafkaSink + * 2. Application-level metrics (executor counts, stage counts, task counts) + * 3. Custom Labels passed via spark configurations + * 4. Enhanced applications_started and applications_ended events with metadata + * + * Configuration: + * --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2 + * --conf spark.sparkmeasure.kafkaBroker=kafka.example.com:9092 + * --conf spark.sparkmeasure.kafkaTopic=spark-metrics + * --conf spark.sparkmeasure.kafka.* = Additional Kafka properties + * --conf spark.sparkmeasure.appLabels.* = Custom metadata to include in app events + * + * Example custom configs: + * --conf spark.sparkmeasure.appLabels.project=my-project + * --conf spark.sparkmeasure.appLabels.environment=production + * --conf spark.sparkmeasure.appLabels.team=engineering + * + */ +class KafkaSinkV2(conf: SparkConf) extends KafkaSink(conf) { + + // Application tracking + private var appName: String = "unknown" + private var startTime: Long = 0L + + // Executor tracking + private var executorsFailed: Int = 0 + private var executorsKilled: Int = 0 + + // Job tracking + private var totalJobsCompleted: Int = 0 + private var succeededJobsCount: Int = 0 + private var failedJobsCount: Int = 0 + + // Stage tracking + private var totalStagesCompleted: Int = 0 + private var succeededStagesCount: Int = 0 + private var failedStagesCount: Int = 0 + + // Task tracking + private var totalTaskCount: Int = 0 + private var numTaskFailed: Int = 0 + private var numTaskKilled: Int = 0 + + override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { + super.onApplicationStart(applicationStart) + + appId = applicationStart.appId.getOrElse("noAppId") + appName = applicationStart.appName + startTime = applicationStart.time + val appLabels = extractAppLabels(conf) + + val epochMillis = System.currentTimeMillis() + + val appStartMetrics = Map[String, Any]( + "name" -> "applications_started", + "appId" -> appId, + "appName" -> appName, + "startTime" -> startTime, + "epochMillis" -> epochMillis + ) ++ appLabels + + report(appStartMetrics) + } + + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + super.onStageCompleted(stageCompleted) + + if (stageCompleted != null && stageCompleted.stageInfo != null) { + val stageInfo = stageCompleted.stageInfo + totalStagesCompleted += 1 + + if (stageInfo.failureReason.isDefined) { + failedStagesCount += 1 + } else { + succeededStagesCount += 1 + } + } + } + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + if (taskEnd != null) { + totalTaskCount += 1 + + if (taskEnd.reason != null) { + taskEnd.reason match { + case _: TaskKilled => + numTaskKilled += 1 + case _: TaskFailedReason => + numTaskFailed += 1 + case _ => + } + } + } + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + super.onJobEnd(jobEnd) + + if (jobEnd != null) { + totalJobsCompleted += 1 + + jobEnd.jobResult match { + case org.apache.spark.scheduler.JobSucceeded => + succeededJobsCount += 1 + case _ => + failedJobsCount += 1 + } + } + } + + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + + if (executorRemoved != null && executorRemoved.reason != null) { + executorRemoved.reason match { + case reason if reason.toLowerCase.contains("kill") => + executorsKilled += 1 + case _ => + executorsFailed += 1 + } + } + } + + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + val completionTime = applicationEnd.time + + val safeEndTime = if (completionTime > 0) completionTime else System.currentTimeMillis() + val duration = if (startTime > 0) safeEndTime - startTime else 0L + val epochMillis = System.currentTimeMillis() + val sparkConfigs = extractSparkConfigs(conf) + val appLabels = extractAppLabels(conf) + + val appEndMetrics = Map[String, Any]( + "name" -> "applications_ended", + "appId" -> appId, + "appName" -> appName, + "startTime" -> startTime, + "completionTime" -> completionTime, + "duration" -> duration, + "executorsFailed" -> executorsFailed, + "executorsKilled" -> executorsKilled, + "totalJobsCompleted" -> totalJobsCompleted, + "succeededJobsCount" -> succeededJobsCount, + "failedJobsCount" -> failedJobsCount, + "numStagesCompleted" -> totalStagesCompleted, + "numSucceededStages" -> succeededStagesCount, + "numFailedStages" -> failedStagesCount, + "totalTaskCount" -> totalTaskCount, + "numTaskFailed" -> numTaskFailed, + "numTaskKilled" -> numTaskKilled, + "epochMillis" -> epochMillis, + "sparkConfigs" -> sparkConfigs + ) ++ appLabels + + report(appEndMetrics) + super.onApplicationEnd(applicationEnd) + } + + private def extractAppLabels(conf: SparkConf): Map[String, String] = { + Try { + conf.getAll + .filter { case (key, _) => key.startsWith("spark.sparkmeasure.appLabels.") } + .map { case (key, value) => (key.stripPrefix("spark.sparkmeasure."), value) } + .toMap + }.getOrElse(Map.empty[String, String]) + } + + private def extractSparkConfigs(conf: SparkConf): Map[String, String] = { + val optimizationConfigKeys = List( + // Executor resource configurations + "spark.executor.instances", + "spark.executor.memory", + "spark.executor.memoryOverhead", + "spark.executor.cores", + "spark.executor.heartbeatInterval", + + // Driver resource configurations + "spark.driver.memory", + "spark.driver.memoryOverhead", + "spark.driver.cores", + "spark.driver.maxResultSize", + + // Dynamic allocation + "spark.dynamicAllocation.enabled", + "spark.dynamicAllocation.minExecutors", + "spark.dynamicAllocation.maxExecutors", + "spark.dynamicAllocation.initialExecutors", + "spark.dynamicAllocation.executorIdleTimeout", + "spark.dynamicAllocation.cachedExecutorIdleTimeout", + "spark.dynamicAllocation.schedulerBacklogTimeout", + + // Shuffle and partitioning + "spark.sql.shuffle.partitions", + "spark.default.parallelism", + "spark.sql.adaptive.enabled", + "spark.sql.adaptive.coalescePartitions.enabled", + "spark.sql.adaptive.coalescePartitions.minPartitionNum", + "spark.sql.adaptive.advisoryPartitionSizeInBytes", + "spark.sql.adaptive.skewJoin.enabled", + "spark.sql.adaptive.skewJoin.skewedPartitionFactor", + "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", + + // Shuffle service and compression + "spark.shuffle.service.enabled", + "spark.shuffle.compress", + "spark.shuffle.spill.compress", + "spark.shuffle.file.buffer", + "spark.shuffle.io.retryWait", + "spark.shuffle.io.maxRetries", + + // Memory management + "spark.memory.fraction", + "spark.memory.storageFraction", + "spark.memory.offHeap.enabled", + "spark.memory.offHeap.size", + + // Kubernetes resource configurations + "spark.kubernetes.executor.request.cores", + "spark.kubernetes.executor.limit.cores", + "spark.kubernetes.driver.request.cores", + "spark.kubernetes.driver.limit.cores", + "spark.kubernetes.memoryOverheadFactor", + "spark.kubernetes.allocation.batch.size", + "spark.kubernetes.allocation.batch.delay", + "spark.kubernetes.executor.deleteOnTermination", + + // Serialization + "spark.serializer", + "spark.kryoserializer.buffer.max", + "spark.kryoserializer.buffer", + "spark.rdd.compress", + + // Network and timeouts + "spark.network.timeout", + "spark.rpc.askTimeout", + "spark.rpc.lookupTimeout", + + // SQL optimization + "spark.sql.autoBroadcastJoinThreshold", + "spark.sql.broadcastTimeout", + "spark.sql.files.maxPartitionBytes", + "spark.sql.files.openCostInBytes", + "spark.sql.files.minPartitionNum", + + // Speculation + "spark.speculation", + "spark.speculation.multiplier", + "spark.speculation.quantile", + "spark.speculation.interval" + ) + Try { + conf.getAll.toMap.filter { case (key, _) => + optimizationConfigKeys.contains(key) + } + }.getOrElse(Map.empty[String, String]) + } +} + +/** + * KafkaSinkV2Extended extends the basic KafkaSinkV2 functionality with a verbose dump of tasks metrics + * Note: this can generate a large amount of data O(Number_of_tasks) + * Configuration parameters and how-to use: see KafkaSinkV2Extended + */ +class KafkaSinkV2Extended(conf: SparkConf) extends KafkaSinkV2(conf) { + + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + val taskInfo = taskStart.taskInfo + val epochMillis = System.currentTimeMillis() + + val taskStartMetrics = Map[String, Any]( + "name" -> "tasks_started", + "appId" -> appId, + "taskId" -> taskInfo.taskId, + "attemptNumber" -> taskInfo.attemptNumber, + "stageId" -> taskStart.stageId, + "launchTime" -> taskInfo.launchTime, + "epochMillis" -> epochMillis + ) + report(taskStartMetrics) + } + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { + super.onTaskEnd(taskEnd) + + val taskInfo = taskEnd.taskInfo + val taskmetrics = taskEnd.taskMetrics + val epochMillis = System.currentTimeMillis() + + val point1 = Map[String, Any]( + "name" -> "tasks_ended", + "appId" -> appId, + "taskId" -> taskInfo.taskId, + "attemptNumber" -> taskInfo.attemptNumber, + "stageId" -> taskEnd.stageId, + "launchTime" -> taskInfo.launchTime, + "finishTime" -> taskInfo.finishTime, + "epochMillis" -> epochMillis + ) + report(point1) + + val taskMetricsHeader = Map[String, Any]( + "name" -> "task_metrics", + "appId" -> appId, + // task info + "taskId" -> taskInfo.taskId, + "attemptNumber" -> taskInfo.attemptNumber, + "stageId" -> taskEnd.stageId, + "launchTime" -> taskInfo.launchTime, + "finishTime" -> taskInfo.finishTime, + "failed" -> taskInfo.failed, + "speculative" -> taskInfo.speculative, + "killed" -> taskInfo.killed, + "finished" -> taskInfo.finished, + "executorId" -> taskInfo.executorId, + "duration" -> taskInfo.duration, + "successful" -> taskInfo.successful, + "host" -> taskInfo.host, + "taskLocality" -> Utils.encodeTaskLocality(taskInfo.taskLocality), + "epochMillis" -> epochMillis + ) + val taskMetricsBody = if (taskmetrics != null) Map[String, Any]( + // task metrics + "executorRunTime" -> taskmetrics.executorRunTime, + "executorCpuTime" -> taskmetrics.executorCpuTime, + "executorDeserializeCpuTime" -> taskmetrics.executorDeserializeCpuTime, + "executorDeserializeTime" -> taskmetrics.executorDeserializeTime, + "jvmGCTime" -> taskmetrics.jvmGCTime, + "memoryBytesSpilled" -> taskmetrics.memoryBytesSpilled, + "peakExecutionMemory" -> taskmetrics.peakExecutionMemory, + "resultSerializationTime" -> taskmetrics.resultSerializationTime, + "resultSize" -> taskmetrics.resultSize, + "bytesRead" -> taskmetrics.inputMetrics.bytesRead, + "recordsRead" -> taskmetrics.inputMetrics.recordsRead, + "bytesWritten" -> taskmetrics.outputMetrics.bytesWritten, + "recordsWritten" -> taskmetrics.outputMetrics.recordsWritten, + "shuffleTotalBytesRead" -> taskmetrics.shuffleReadMetrics.totalBytesRead, + "shuffleRemoteBytesRead" -> taskmetrics.shuffleReadMetrics.remoteBytesRead, + "shuffleLocalBytesRead" -> taskmetrics.shuffleReadMetrics.localBytesRead, + "shuffleTotalBlocksFetched" -> taskmetrics.shuffleReadMetrics.totalBlocksFetched, + "shuffleLocalBlocksFetched" -> taskmetrics.shuffleReadMetrics.localBlocksFetched, + "shuffleRemoteBlocksFetched" -> taskmetrics.shuffleReadMetrics.remoteBlocksFetched, + "shuffleRecordsRead" -> taskmetrics.shuffleReadMetrics.recordsRead, + // this requires spark2.3 and above "remoteBytesReadToDisk" -> taskmetrics.shuffleReadMetrics.remoteBytesReadToDisk, + "shuffleFetchWaitTime" -> taskmetrics.shuffleReadMetrics.fetchWaitTime, + "shuffleBytesWritten" -> taskmetrics.shuffleWriteMetrics.bytesWritten, + "shuffleRecordsWritten" -> taskmetrics.shuffleWriteMetrics.recordsWritten, + "shuffleWriteTime" -> taskmetrics.shuffleWriteMetrics.writeTime, + ) else Map() + val point2 = taskMetricsHeader ++ taskMetricsBody + report(point2) + } +} \ No newline at end of file diff --git a/src/test/scala/ch/cern/sparkmeasure/KafkaSinkV2Test.scala b/src/test/scala/ch/cern/sparkmeasure/KafkaSinkV2Test.scala new file mode 100644 index 0000000..28ebdff --- /dev/null +++ b/src/test/scala/ch/cern/sparkmeasure/KafkaSinkV2Test.scala @@ -0,0 +1,339 @@ +package ch.cern.sparkmeasure + +import org.apache.spark.SparkConf +import org.apache.spark.scheduler._ +import org.apache.spark.sql.SparkSession +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import scala.collection.mutable.ListBuffer +import scala.util.Properties + +/** + * Test suite for KafkaSinkV2 to verify: + * 1. Application start/end event schema and payload correctness + * 2. Counter tracking (jobs, stages, tasks, executors) + * 3. Custom labels extraction and inclusion + * 4. Spark configurations extraction + * 5. Backward compatibility with KafkaSink events + */ +class KafkaSinkV2Test extends AnyFunSuite with BeforeAndAfterAll with Matchers { + + var spark: SparkSession = _ + val capturedEvents: ListBuffer[Map[String, Any]] = ListBuffer.empty + + class TestableKafkaSinkV2(conf: SparkConf) extends KafkaSinkV2(conf) { + override protected def report[T <: Any](metrics: Map[String, T]): Unit = { + // Capture events for testing instead of sending to Kafka + capturedEvents += metrics.asInstanceOf[Map[String, Any]] + } + } + + override def beforeAll(): Unit = { + val conf = new SparkConf() + .setMaster("local[*]") + .setAppName("KafkaSinkV2Test") + .set("spark.sparkmeasure.kafkaBroker", "test:9092") + .set("spark.sparkmeasure.kafkaTopic", "test-topic") + .set("spark.sparkmeasure.appLabels.project", "test-project") + .set("spark.sparkmeasure.appLabels.environment", "test-env") + .set("spark.sparkmeasure.appLabels.team", "test-team") + .set("spark.executor.instances", "2") + .set("spark.executor.memory", "1g") + .set("spark.sql.shuffle.partitions", "10") + + if (Properties.versionNumberString.startsWith("2.12")) { + spark = SparkSession.builder() + .config(conf) + .config("spark.jars", "target/scala-2.12/*.jar") + .getOrCreate() + } else if (Properties.versionNumberString.startsWith("2.13")) { + spark = SparkSession.builder() + .config(conf) + .config("spark.jars", "target/scala-2.13/*.jar") + .getOrCreate() + } + } + + override def afterAll(): Unit = { + if (spark != null) { + spark.stop() + } + } + + test("applications_started event should contain correct schema and custom labels") { + capturedEvents.clear() + val listener = new TestableKafkaSinkV2(spark.sparkContext.getConf) + + val appStartEvent = SparkListenerApplicationStart( + appName = "TestApp", + appId = Some("test-app-001"), + time = 1234567890L, + sparkUser = "testuser", + appAttemptId = None, + driverLogs = None + ) + + listener.onApplicationStart(appStartEvent) + + val startEvents = capturedEvents.filter(_("name") == "applications_started") + assert(startEvents.nonEmpty, "applications_started event should be emitted") + + val startEvent = startEvents.head + assert(startEvent("appId") == "test-app-001", "appId should match") + assert(startEvent("appName") == "TestApp", "appName should match") + assert(startEvent("startTime") == 1234567890L, "startTime should match") + assert(startEvent.contains("epochMillis"), "epochMillis should be present") + + // Verify custom labels are included + assert(startEvent("appLabels.project") == "test-project", "project label should be present") + assert(startEvent("appLabels.environment") == "test-env", "environment label should be present") + assert(startEvent("appLabels.team") == "test-team", "team label should be present") + } + + test("applications_ended event should contain basic structure and custom labels") { + capturedEvents.clear() + val listener = new TestableKafkaSinkV2(spark.sparkContext.getConf) + + val appStartEvent = SparkListenerApplicationStart( + appName = "TestApp", + appId = Some("test-app-002"), + time = 1000000000L, + sparkUser = "testuser", + appAttemptId = None, + driverLogs = None + ) + listener.onApplicationStart(appStartEvent) + + val appEndEvent = SparkListenerApplicationEnd(time = 1000010000L) + listener.onApplicationEnd(appEndEvent) + + val endEvents = capturedEvents.filter(_("name") == "applications_ended") + assert(endEvents.nonEmpty, "applications_ended event should be emitted") + + val endEvent = endEvents.head + + // Verify basic fields + assert(endEvent("appId") == "test-app-002", "appId should match") + assert(endEvent("appName") == "TestApp", "appName should match") + assert(endEvent("startTime") == 1000000000L, "startTime should match") + assert(endEvent("completionTime") == 1000010000L, "completionTime should match") + assert(endEvent("duration") == 10000L, "duration should be calculated correctly") + assert(endEvent.contains("epochMillis"), "epochMillis should be present") + + // Verify counters are initialized + assert(endEvent("totalJobsCompleted") == 0, "should have 0 jobs initially") + assert(endEvent("succeededJobsCount") == 0, "should have 0 succeeded jobs") + assert(endEvent("failedJobsCount") == 0, "should have 0 failed jobs") + assert(endEvent("numStagesCompleted") == 0, "should have 0 stages") + assert(endEvent("totalTaskCount") == 0, "should have 0 tasks") + assert(endEvent("executorsFailed") == 0, "should have 0 failed executors") + assert(endEvent("executorsKilled") == 0, "should have 0 killed executors") + + // Verify custom labels + assert(endEvent("appLabels.project") == "test-project", "project label should be present") + assert(endEvent("appLabels.environment") == "test-env", "environment label should be present") + assert(endEvent("appLabels.team") == "test-team", "team label should be present") + + // Verify spark configurations are captured + assert(endEvent.contains("sparkConfigs"), "sparkConfigs should be present") + val sparkConfigs = endEvent("sparkConfigs").asInstanceOf[Map[String, String]] + assert(sparkConfigs.contains("spark.executor.instances"), "executor.instances should be captured") + assert(sparkConfigs("spark.executor.instances") == "2", "executor.instances value should match") + assert(sparkConfigs.contains("spark.executor.memory"), "executor.memory should be captured") + assert(sparkConfigs("spark.executor.memory") == "1g", "executor.memory value should match") + assert(sparkConfigs.contains("spark.sql.shuffle.partitions"), "shuffle.partitions should be captured") + assert(sparkConfigs("spark.sql.shuffle.partitions") == "10", "shuffle.partitions value should match") + } + + test("job counters should track succeeded and failed jobs") { + capturedEvents.clear() + val listener = new TestableKafkaSinkV2(spark.sparkContext.getConf) + + val appStartEvent = SparkListenerApplicationStart( + appName = "TestApp", + appId = Some("test-app-003"), + time = 1000000000L, + sparkUser = "testuser", + appAttemptId = None, + driverLogs = None + ) + listener.onApplicationStart(appStartEvent) + + // Simulate successful job + val jobStart1 = SparkListenerJobStart( + jobId = 1, + time = 1000001000L, + stageInfos = Seq(), + properties = null + ) + listener.onJobStart(jobStart1) + + val jobEnd1 = SparkListenerJobEnd( + jobId = 1, + time = 1000002000L, + jobResult = org.apache.spark.scheduler.JobSucceeded + ) + listener.onJobEnd(jobEnd1) + + // Simulate another successful job + val jobStart2 = SparkListenerJobStart( + jobId = 2, + time = 1000003000L, + stageInfos = Seq(), + properties = null + ) + listener.onJobStart(jobStart2) + + val jobEnd2 = SparkListenerJobEnd( + jobId = 2, + time = 1000004000L, + jobResult = org.apache.spark.scheduler.JobSucceeded + ) + listener.onJobEnd(jobEnd2) + + val appEndEvent = SparkListenerApplicationEnd(time = 1000010000L) + listener.onApplicationEnd(appEndEvent) + + val endEvents = capturedEvents.filter(_("name") == "applications_ended") + val endEvent = endEvents.head + + assert(endEvent("totalJobsCompleted") == 2, "should have 2 total jobs") + assert(endEvent("succeededJobsCount") == 2, "should have 2 succeeded jobs") + assert(endEvent("failedJobsCount") == 0, "should have 0 failed jobs") + } + + test("executor removal tracking should work") { + capturedEvents.clear() + val listener = new TestableKafkaSinkV2(spark.sparkContext.getConf) + + val appStartEvent = SparkListenerApplicationStart( + appName = "TestApp", + appId = Some("test-app-004"), + time = 1000000000L, + sparkUser = "testuser", + appAttemptId = None, + driverLogs = None + ) + listener.onApplicationStart(appStartEvent) + + // Simulate killed executor + val executorRemovedKilled = SparkListenerExecutorRemoved( + time = 1000001000L, + executorId = "exec-1", + reason = "Executor killed by driver" + ) + listener.onExecutorRemoved(executorRemovedKilled) + + // Simulate failed executor + val executorRemovedFailed = SparkListenerExecutorRemoved( + time = 1000002000L, + executorId = "exec-2", + reason = "Executor lost due to node failure" + ) + listener.onExecutorRemoved(executorRemovedFailed) + + val appEndEvent = SparkListenerApplicationEnd(time = 1000010000L) + listener.onApplicationEnd(appEndEvent) + + val endEvents = capturedEvents.filter(_("name") == "applications_ended") + val endEvent = endEvents.head + + assert(endEvent("executorsKilled") == 1, "should have 1 killed executor") + assert(endEvent("executorsFailed") == 1, "should have 1 failed executor") + } + + test("backward compatibility - KafkaSinkV2 should emit KafkaSink event types") { + capturedEvents.clear() + val listener = new TestableKafkaSinkV2(spark.sparkContext.getConf) + + val appStartEvent = SparkListenerApplicationStart( + appName = "TestApp", + appId = Some("test-app-005"), + time = 1000000000L, + sparkUser = "testuser", + appAttemptId = None, + driverLogs = None + ) + listener.onApplicationStart(appStartEvent) + + // Job events + val jobStart = SparkListenerJobStart( + jobId = 1, + time = 1000001000L, + stageInfos = Seq(), + properties = null + ) + listener.onJobStart(jobStart) + + val jobStartedEvents = capturedEvents.filter(_("name") == "jobs_started") + assert(jobStartedEvents.nonEmpty, "jobs_started event should be emitted (backward compatibility)") + assert(jobStartedEvents.head("jobId") == "1", "jobId should match") + + val jobEnd = SparkListenerJobEnd( + jobId = 1, + time = 1000002000L, + jobResult = org.apache.spark.scheduler.JobSucceeded + ) + listener.onJobEnd(jobEnd) + + val jobEndedEvents = capturedEvents.filter(_("name") == "jobs_ended") + assert(jobEndedEvents.nonEmpty, "jobs_ended event should be emitted (backward compatibility)") + } + + test("custom labels extraction should handle missing labels gracefully") { + val conf = new SparkConf() + .set("spark.sparkmeasure.kafkaBroker", "test:9092") + .set("spark.sparkmeasure.kafkaTopic", "test-topic") + // No custom labels set + + capturedEvents.clear() + val listener = new TestableKafkaSinkV2(conf) + + val appStartEvent = SparkListenerApplicationStart( + appName = "TestApp", + appId = Some("test-app-006"), + time = 1234567890L, + sparkUser = "testuser", + appAttemptId = None, + driverLogs = None + ) + + listener.onApplicationStart(appStartEvent) + + val startEvents = capturedEvents.filter(_("name") == "applications_started") + val startEvent = startEvents.head + + // Verify no custom label keys exist + val customLabelKeys = startEvent.keys.filter(_.startsWith("appLabels.")) + assert(customLabelKeys.isEmpty, "no custom labels should be present when not configured") + } + + test("integration test - run actual Spark query and verify metrics") { + capturedEvents.clear() + val listener = new TestableKafkaSinkV2(spark.sparkContext.getConf) + + // Add listener to spark context + spark.sparkContext.addSparkListener(listener) + + // Run a simple query + spark.sql("select count(*) from range(100)").collect() + + // Remove listener + spark.sparkContext.removeSparkListener(listener) + + // Verify that standard KafkaSink events were emitted + val jobStartedEvents = capturedEvents.filter(_("name") == "jobs_started") + assert(jobStartedEvents.nonEmpty, "jobs_started events should be emitted during query execution") + + val jobEndedEvents = capturedEvents.filter(_("name") == "jobs_ended") + assert(jobEndedEvents.nonEmpty, "jobs_ended events should be emitted during query execution") + + val stageStartedEvents = capturedEvents.filter(_("name") == "stages_started") + assert(stageStartedEvents.nonEmpty, "stages_started events should be emitted during query execution") + + val stageEndedEvents = capturedEvents.filter(_("name") == "stages_ended") + assert(stageEndedEvents.nonEmpty, "stages_ended events should be emitted during query execution") + } +}