Skip to content

Commit cd7d29d

Browse files
committed
Reverted KafkaSink to actual position and added new event class KafkaSinkV2, testcase and doc
1 parent a757750 commit cd7d29d

5 files changed

Lines changed: 828 additions & 131 deletions

File tree

docs/Flight_recorder_mode_KafkaSink.md

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,28 @@ provided by the user. Use this mode to monitor Spark execution workload.
1414
Notes:
1515
- KafkaSink: the amount of data generated is relatively small in most applications: O(number_of_stages)
1616
- KafkaSinkExtended can generate a large amount of data O(Number_of_tasks), use with care
17+
18+
## KafkaSinkV2 and KafkaSinkV2Extended
19+
20+
**KafkaSinkV2** is an enhanced version of KafkaSink that adds application-level aggregated metrics and custom labels support.
21+
It collects all stage/executor/query metrics from the base KafkaSink plus additional application lifecycle events and counters.
22+
**KafkaSinkV2Extended** extends KafkaSinkV2 to also record detailed metrics for each executed Task.
23+
24+
**Compatibility Note:** KafkaSinkV2 is backward compatible with KafkaSink in terms of configuration and basic event types.
25+
Existing consumers of KafkaSink events will continue to work as expected. KafkaSinkV2 adds two new event types
26+
(`applications_started` and `applications_ended`) with additional metadata that existing consumers can safely ignore
27+
if not needed.
28+
29+
### Key Differences from KafkaSink
30+
31+
1. **Application-Level Metrics:** KafkaSinkV2 emits `applications_started` and `applications_ended` events with aggregated counters
32+
2. **Custom Labels:** Support for custom metadata labels via `spark.sparkmeasure.appLabels.*` configuration
33+
3. **Enhanced Application End Event:** Includes executor counts, job/stage/task counters, and selected Spark configurations
34+
4. **Counter Tracking:** Tracks success/failure counts for jobs, stages, and tasks throughout application lifecycle
35+
36+
## Configuration
37+
38+
### KafkaSink / KafkaSinkExtended Configuration
1739

1840
How to use: attach the KafkaSink to a Spark Context using the extra listener infrastructure. Example:
1941
- `--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSink`
@@ -40,6 +62,29 @@ Configuration - KafkaSink parameters:
4062
Example: --conf spark.sparkmeasure.kafka.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
4163
```
4264

65+
### KafkaSinkV2 / KafkaSinkV2Extended Configuration
66+
67+
How to use: attach the KafkaSinkV2 listener to the Spark Context:
68+
69+
```
70+
# Start the listener for KafkaSinkV2 (recommended):
71+
--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2
72+
73+
# Or use KafkaSinkV2Extended for task-level metrics (generates more data):
74+
--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2Extended
75+
76+
# Required Kafka configuration (same as KafkaSink):
77+
--conf spark.sparkmeasure.kafkaBroker = Kafka broker endpoint URL
78+
Example: --conf spark.sparkmeasure.kafkaBroker=kafka.your-site.com:9092
79+
--conf spark.sparkmeasure.kafkaTopic = Kafka topic
80+
Example: --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-metrics
81+
82+
# Optional - Custom application labels:
83+
--conf spark.sparkmeasure.appLabels.<labelKey> = Custom metadata value
84+
Example: --conf spark.sparkmeasure.appLabels.project=my-project
85+
Example: --conf spark.sparkmeasure.appLabels.environment=production
86+
```
87+
4388
This code depends on "kafka-clients". If you deploy sparkMeasure from maven central,
4489
the dependency is being taken care of.
4590
If you run sparkMeasure from a jar instead, you may need to add the dependency manually
@@ -102,3 +147,32 @@ bin/spark-shell \
102147
"jobId" : "0",
103148
"appId" : "local-1660057441489"
104149
...
150+
```
151+
152+
**Note:** KafkaSinkV2 also emits all standard events like `stages_started`, `stages_ended`, `stage_metrics`, etc.,
153+
just like the original KafkaSink, ensuring backward compatibility.
154+
155+
## Migration Guide: KafkaSink to KafkaSinkV2
156+
157+
If you are currently using KafkaSink and want to migrate to KafkaSinkV2:
158+
159+
1. **Configuration Change:** Simply replace the listener class name:
160+
```
161+
# Old:
162+
--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSink
163+
164+
# New:
165+
--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2
166+
```
167+
168+
2. **Backward Compatibility:** All existing event types and their schemas remain unchanged. Your existing Kafka consumers will continue to work.
169+
170+
3. **New Events:** KafkaSinkV2 adds two new event types (`applications_started` and `applications_ended`). If your consumers don't need these, they can simply ignore events with these names.
171+
172+
4. **Optional Custom Labels:** Add custom labels for better filtering and organization:
173+
```
174+
--conf spark.sparkmeasure.appLabels.project=my-project
175+
--conf spark.sparkmeasure.appLabels.environment=staging
176+
```
177+
178+
5. **Benefits:** You gain application-level aggregated metrics, custom labels, and selected Spark configurations without losing any existing functionality.

docs/Reference_SparkMeasure_API_and_Configs.md

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,54 @@ This code depends on "kafka-clients", you may need to add the dependency explici
433433
--packages org.apache.kafka:kafka-clients:3.7.0
434434
```
435435

436+
## KafkaSinkV2 and KafkaSinkV2Extended
437+
438+
```
439+
class KafkaSinkV2(conf: SparkConf) extends KafkaSink(conf)
440+
class KafkaSinkV2Extended(conf: SparkConf) extends KafkaSinkV2(conf)
441+
442+
**KafkaSinkV2** is an enhanced version of KafkaSink that extends the SparkListener infrastructure
443+
with application-level aggregated metrics and custom labels support.
444+
445+
Key Features:
446+
1. All stage/executor/query metrics from the base KafkaSink
447+
2. Application-level aggregated counters (executor/job/stage/task counts)
448+
3. Custom labels via spark.sparkmeasure.appLabels.* configurations
449+
4. Enhanced applications_started and applications_ended events with metadata
450+
5. Automatic capture of selected Spark configurations
451+
452+
**Backward Compatibility:** KafkaSinkV2 emits all the same event types as KafkaSink,
453+
ensuring existing consumers continue to work. It adds two new event types that can be
454+
safely ignored by consumers that don't need them.
455+
456+
How to use:
457+
* --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2
458+
459+
**KafkaSinkV2Extended** adds verbose task-level metrics (can generate O(Number_of_tasks) data)
460+
* How to use:
461+
* --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2Extended
462+
463+
Configuration - KafkaSinkV2 parameters:
464+
465+
Required:
466+
--conf spark.sparkmeasure.kafkaBroker = Kafka broker endpoint URL
467+
Example: --conf spark.sparkmeasure.kafkaBroker=kafka.your-site.com:9092
468+
--conf spark.sparkmeasure.kafkaTopic = Kafka topic
469+
Example: --conf spark.sparkmeasure.kafkaTopic=spark-metrics
470+
471+
Optional - Custom labels (recommended for filtering and organization):
472+
--conf spark.sparkmeasure.appLabels.<labelKey> = Custom metadata value
473+
Example: --conf spark.sparkmeasure.appLabels.project=my-project
474+
Example: --conf spark.sparkmeasure.appLabels.environment=production
475+
Example: --conf spark.sparkmeasure.appLabels.team=data-engineering
476+
477+
For detailed event schemas and examples, see:
478+
- docs/Flight_recorder_mode_KafkaSink.md
479+
480+
This code depends on "kafka-clients", you may need to add the dependency explicitly, example:
481+
--packages org.apache.kafka:kafka-clients:3.7.0
482+
```
483+
436484
## Prometheus PushGatewaySink
437485
```
438486
class PushGatewaySink(conf: SparkConf) extends SparkListener

src/main/scala/ch/cern/sparkmeasure/KafkaSink.scala

Lines changed: 2 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@ package ch.cern.sparkmeasure
22

33
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerRecord}
44
import org.apache.kafka.common.serialization.ByteArraySerializer
5-
import org.apache.spark.{SparkConf, TaskFailedReason, TaskKilled}
5+
import org.apache.spark.SparkConf
66
import org.apache.spark.scheduler._
77
import org.apache.spark.sql.SparkSession
88
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
99
import org.slf4j.{Logger, LoggerFactory}
1010

1111
import java.nio.charset.StandardCharsets
1212
import java.util.Properties
13-
import scala.collection.mutable
1413
import scala.util.Try
1514

1615
/**
@@ -30,8 +29,6 @@ import scala.util.Try
3029
* example: --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-stageinfo
3130
* spark.sparkmeasure.kafka.* = Other kafka properties
3231
* example: --conf spark.sparkmeasure.kafka.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
33-
* spark.sparkmeasure.appLabels.* = Custom labels to include in application start and end events
34-
* example: --conf spark.sparkmeasure.appLabels.environment=production
3532
*
3633
* This code depends on "kafka clients", you may need to add the dependency:
3734
* --packages org.apache.kafka:kafka-clients:3.2.1
@@ -52,29 +49,6 @@ class KafkaSink(conf: SparkConf) extends SparkListener {
5249
case _ => "noAppId"
5350
}
5451

55-
// Application tracking
56-
private var appName: String = "noAppName"
57-
private var startTime: Long = 0L
58-
59-
// Executor tracking
60-
private var executorsFailed: Int = 0
61-
private var executorsKilled: Int = 0
62-
63-
// Job tracking
64-
private var totalJobsCompleted: Int = 0
65-
private var succeededJobsCount: Int = 0
66-
private var failedJobsCount: Int = 0
67-
68-
// Stage tracking
69-
private var totalStagesCompleted: Int = 0
70-
private var succeededStagesCount: Int = 0
71-
private var failedStagesCount: Int = 0
72-
73-
// Task tracking
74-
private var totalTaskCount: Int = 0
75-
private var numTaskFailed: Int = 0
76-
private var numTaskKilled: Int = 0
77-
7852
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
7953
val executorInfo = executorAdded.executorInfo
8054
val epochMillis = System.currentTimeMillis()
@@ -90,17 +64,6 @@ class KafkaSink(conf: SparkConf) extends SparkListener {
9064
report(metrics)
9165
}
9266

93-
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
94-
if (executorRemoved != null && executorRemoved.reason != null) {
95-
executorRemoved.reason match {
96-
case reason if reason.toLowerCase.contains("kill") =>
97-
executorsKilled += 1
98-
case _ =>
99-
executorsFailed += 1
100-
}
101-
}
102-
}
103-
10467
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
10568
val submissionTime = stageSubmitted.stageInfo.submissionTime.getOrElse(0L)
10669
val attemptNumber = stageSubmitted.stageInfo.attemptNumber()
@@ -118,6 +81,7 @@ class KafkaSink(conf: SparkConf) extends SparkListener {
11881
report(metrics)
11982
}
12083

84+
12185
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
12286
val stageId = stageCompleted.stageInfo.stageId.toString
12387
val submissionTime = stageCompleted.stageInfo.submissionTime.getOrElse(0L)
@@ -176,17 +140,6 @@ class KafkaSink(conf: SparkConf) extends SparkListener {
176140
)
177141

178142
report(stageTaskMetrics)
179-
180-
if (stageCompleted != null && stageCompleted.stageInfo != null) {
181-
val stageInfo = stageCompleted.stageInfo
182-
totalStagesCompleted += 1
183-
184-
if (stageInfo.failureReason.isDefined) {
185-
failedStagesCount += 1
186-
} else {
187-
succeededStagesCount += 1
188-
}
189-
}
190143
}
191144

192145
override def onOtherEvent(event: SparkListenerEvent): Unit = {
@@ -250,69 +203,13 @@ class KafkaSink(conf: SparkConf) extends SparkListener {
250203
"epochMillis" -> epochMillis
251204
)
252205
report(jobEndMetrics)
253-
254-
if (jobEnd != null) {
255-
totalJobsCompleted += 1
256-
257-
jobEnd.jobResult match {
258-
case org.apache.spark.scheduler.JobSucceeded =>
259-
succeededJobsCount += 1
260-
case _ =>
261-
failedJobsCount += 1
262-
}
263-
}
264206
}
265207

266208
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
267209
appId = applicationStart.appId.getOrElse("noAppId")
268-
appName = applicationStart.appName
269-
startTime = applicationStart.time
270-
val appLabels = extractAppLabels(conf)
271-
val epochMillis = System.currentTimeMillis()
272-
273-
val appStartMetrics = Map[String, Any](
274-
"name" -> "applications_started",
275-
"appId" -> appId,
276-
"appName" -> appName,
277-
"startTime" -> startTime,
278-
"epochMillis" -> epochMillis
279-
) ++ appLabels
280-
281-
report(appStartMetrics)
282210
}
283211

284212
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
285-
val completionTime = applicationEnd.time
286-
val safeEndTime = if (completionTime > 0) completionTime else System.currentTimeMillis()
287-
val duration = if (startTime > 0) safeEndTime - startTime else 0L
288-
val epochMillis = System.currentTimeMillis()
289-
val configurations = conf.getAll.toMap
290-
val appLabels = extractAppLabels(conf)
291-
292-
val appEndMetrics = Map[String, Any](
293-
"name" -> "applications_ended",
294-
"appId" -> appId,
295-
"appName" -> appName,
296-
"startTime" -> startTime,
297-
"completionTime" -> completionTime,
298-
"duration" -> duration,
299-
"executorsFailed" -> executorsFailed,
300-
"executorsKilled" -> executorsKilled,
301-
"totalJobsCompleted" -> totalJobsCompleted,
302-
"succeededJobsCount" -> succeededJobsCount,
303-
"failedJobsCount" -> failedJobsCount,
304-
"numStagesCompleted" -> totalStagesCompleted,
305-
"numSucceededStages" -> succeededStagesCount,
306-
"numFailedStages" -> failedStagesCount,
307-
"totalTaskCount" -> totalTaskCount,
308-
"numTaskFailed" -> numTaskFailed,
309-
"numTaskKilled" -> numTaskKilled,
310-
"epochMillis" -> epochMillis,
311-
"configurations" -> configurations
312-
) ++ appLabels
313-
314-
report(appEndMetrics)
315-
316213
logger.info(s"Spark application ended, timestamp = ${applicationEnd.time}, closing Kafka connection.")
317214
synchronized(
318215
if (Option(producer).isDefined) {
@@ -326,22 +223,6 @@ class KafkaSink(conf: SparkConf) extends SparkListener {
326223
)
327224
}
328225

329-
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
330-
if (taskEnd != null) {
331-
totalTaskCount += 1
332-
333-
if (taskEnd.reason != null) {
334-
taskEnd.reason match {
335-
case _: TaskKilled =>
336-
numTaskKilled += 1
337-
case _: TaskFailedReason =>
338-
numTaskFailed += 1
339-
case _ =>
340-
}
341-
}
342-
}
343-
}
344-
345226
protected def report[T <: Any](metrics: Map[String, T]): Unit = {
346227
val result: Unit = Try {
347228
ensureProducer()
@@ -375,14 +256,6 @@ class KafkaSink(conf: SparkConf) extends SparkListener {
375256
)
376257
}
377258

378-
private def extractAppLabels(conf: SparkConf): Map[String, String] = {
379-
Try {
380-
conf.getAll
381-
.filter { case (key, _) => key.startsWith("spark.sparkmeasure.appLabels.") }
382-
.map { case (key, value) => (key.stripPrefix("spark.sparkmeasure."), value) }
383-
.toMap
384-
}.getOrElse(Map.empty[String, String])
385-
}
386259
}
387260

388261
/**
@@ -409,8 +282,6 @@ class KafkaSinkExtended(conf: SparkConf) extends KafkaSink(conf) {
409282
}
410283

411284
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
412-
super.onTaskEnd(taskEnd)
413-
414285
val taskInfo = taskEnd.taskInfo
415286
val taskmetrics = taskEnd.taskMetrics
416287
val epochMillis = System.currentTimeMillis()

0 commit comments

Comments
 (0)