Commit fb8cdeb

Add KafkaSinkV2 (#79)
* Add AppEventKafkaSink and AppEventKafkaSinkExtended for application-level metrics and custom metadata support
* Add AppEventKafkaSink and AppEventKafkaSinkExtended for application-level metrics and custom metadata support
* Standardised variable name
* Renamed customFields to appLabels to correctly identify in event
* Added applications start and end event in Kafka Sink
* Added applications start and end event in Kafka Sink
* Removed totalExecutorCount metric
* Removed successful metric from app_end
* Reverted KafkaSink to actual position and added new event class KafkaSinkV2, testcase and doc

---------

Co-authored-by: pp-achauhan <achauhan@pulsepoint.com>
1 parent e253a7a commit fb8cdeb

4 files changed

Lines changed: 826 additions & 0 deletions

docs/Flight_recorder_mode_KafkaSink.md

Lines changed: 74 additions & 0 deletions
@@ -14,6 +14,28 @@ provided by the user. Use this mode to monitor Spark execution workload.
 Notes:
 - KafkaSink: the amount of data generated is relatively small in most applications: O(number_of_stages)
 - KafkaSinkExtended can generate a large amount of data O(Number_of_tasks), use with care
+
+## KafkaSinkV2 and KafkaSinkV2Extended
+
+**KafkaSinkV2** is an enhanced version of KafkaSink that adds application-level aggregated metrics and custom labels support.
+It collects all stage/executor/query metrics from the base KafkaSink plus additional application lifecycle events and counters.
+**KafkaSinkV2Extended** extends KafkaSinkV2 to also record detailed metrics for each executed Task.
+
+**Compatibility Note:** KafkaSinkV2 is backward compatible with KafkaSink in terms of configuration and basic event types.
+Existing consumers of KafkaSink events will continue to work as expected. KafkaSinkV2 adds two new event types
+(`applications_started` and `applications_ended`) with additional metadata that existing consumers can safely ignore
+if not needed.
+
+### Key Differences from KafkaSink
+
+1. **Application-Level Metrics:** KafkaSinkV2 emits `applications_started` and `applications_ended` events with aggregated counters
+2. **Custom Labels:** Support for custom metadata labels via `spark.sparkmeasure.appLabels.*` configuration
+3. **Enhanced Application End Event:** Includes executor counts, job/stage/task counters, and selected Spark configurations
+4. **Counter Tracking:** Tracks success/failure counts for jobs, stages, and tasks throughout application lifecycle
+
+## Configuration
+
+### KafkaSink / KafkaSinkExtended Configuration
 
 How to use: attach the KafkaSink to a Spark Context using the extra listener infrastructure. Example:
 - `--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSink`
@@ -40,6 +62,29 @@ Configuration - KafkaSink parameters:
 Example: --conf spark.sparkmeasure.kafka.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
 ```
 
+### KafkaSinkV2 / KafkaSinkV2Extended Configuration
+
+How to use: attach the KafkaSinkV2 listener to the Spark Context:
+
+```
+# Start the listener for KafkaSinkV2 (recommended):
+--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2
+
+# Or use KafkaSinkV2Extended for task-level metrics (generates more data):
+--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2Extended
+
+# Required Kafka configuration (same as KafkaSink):
+--conf spark.sparkmeasure.kafkaBroker = Kafka broker endpoint URL
+Example: --conf spark.sparkmeasure.kafkaBroker=kafka.your-site.com:9092
+--conf spark.sparkmeasure.kafkaTopic = Kafka topic
+Example: --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-metrics
+
+# Optional - Custom application labels:
+--conf spark.sparkmeasure.appLabels.<labelKey> = Custom metadata value
+Example: --conf spark.sparkmeasure.appLabels.project=my-project
+Example: --conf spark.sparkmeasure.appLabels.environment=production
+```
+
 This code depends on "kafka-clients". If you deploy sparkMeasure from maven central,
 the dependency is being taken care of.
 If you run sparkMeasure from a jar instead, you may need to add the dependency manually
@@ -102,3 +147,32 @@ bin/spark-shell \
 "jobId" : "0",
 "appId" : "local-1660057441489"
 ...
+```
+
+**Note:** KafkaSinkV2 also emits all standard events like `stages_started`, `stages_ended`, `stage_metrics`, etc.,
+just like the original KafkaSink, ensuring backward compatibility.
+
+## Migration Guide: KafkaSink to KafkaSinkV2
+
+If you are currently using KafkaSink and want to migrate to KafkaSinkV2:
+
+1. **Configuration Change:** Simply replace the listener class name:
+```
+# Old:
+--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSink
+
+# New:
+--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2
+```
+
+2. **Backward Compatibility:** All existing event types and their schemas remain unchanged. Your existing Kafka consumers will continue to work.
+
+3. **New Events:** KafkaSinkV2 adds two new event types (`applications_started` and `applications_ended`). If your consumers don't need these, they can simply ignore events with these names.
+
+4. **Optional Custom Labels:** Add custom labels for better filtering and organization:
+```
+--conf spark.sparkmeasure.appLabels.project=my-project
+--conf spark.sparkmeasure.appLabels.environment=staging
+```
+
+5. **Benefits:** You gain application-level aggregated metrics, custom labels, and selected Spark configurations without losing any existing functionality.
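
Editorial sketch for migration steps 2 and 3 above: a consumer written for KafkaSink can probably stay as it is if the two new event types are dropped before processing. The Python below is only an illustration and is not part of sparkMeasure. It assumes each Kafka message value is a JSON object that carries its event type under a `name` key (the guide says events can be ignored by name, but the key itself is an assumption), and `drop_v2_app_events` is a hypothetical helper name.

```python
import json

# The two event types that KafkaSinkV2 adds on top of KafkaSink (per the guide above).
V2_ONLY_EVENTS = frozenset({"applications_started", "applications_ended"})


def drop_v2_app_events(raw_messages, name_key="name"):
    """Yield decoded events, skipping the two KafkaSinkV2-only event types.

    Assumption: every message is a JSON object and the event type is stored
    under `name_key`; change the key if your payloads use a different field.
    """
    for raw in raw_messages:
        event = json.loads(raw)  # accepts str or bytes
        if event.get(name_key) in V2_ONLY_EVENTS:
            continue  # a legacy consumer has no use for these
        yield event


if __name__ == "__main__":
    # Hand-written sample payloads, not captured from a real run.
    sample = [
        '{"name": "applications_started", "appId": "local-1"}',
        '{"name": "stages_started", "appId": "local-1", "stageId": 0}',
        '{"name": "applications_ended", "appId": "local-1"}',
    ]
    for event in drop_v2_app_events(sample):
        print(event["name"])
```

In a real consumer the `raw_messages` iterable would be the message values read from the configured topic; the filtering logic is independent of the Kafka client library used.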

docs/Reference_SparkMeasure_API_and_Configs.md

Lines changed: 48 additions & 0 deletions
@@ -433,6 +433,54 @@ This code depends on "kafka-clients", you may need to add the dependency explici
 --packages org.apache.kafka:kafka-clients:3.7.0
 ```
 
+## KafkaSinkV2 and KafkaSinkV2Extended
+
+```
+class KafkaSinkV2(conf: SparkConf) extends KafkaSink(conf)
+class KafkaSinkV2Extended(conf: SparkConf) extends KafkaSinkV2(conf)
+
+**KafkaSinkV2** is an enhanced version of KafkaSink that extends the SparkListener infrastructure
+with application-level aggregated metrics and custom labels support.
+
+Key Features:
+1. All stage/executor/query metrics from the base KafkaSink
+2. Application-level aggregated counters (executor/job/stage/task counts)
+3. Custom labels via spark.sparkmeasure.appLabels.* configurations
+4. Enhanced applications_started and applications_ended events with metadata
+5. Automatic capture of selected Spark configurations
+
+**Backward Compatibility:** KafkaSinkV2 emits all the same event types as KafkaSink,
+ensuring existing consumers continue to work. It adds two new event types that can be
+safely ignored by consumers that don't need them.
+
+How to use:
+* --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2
+
+**KafkaSinkV2Extended** adds verbose task-level metrics (can generate O(Number_of_tasks) data)
+* How to use:
+* --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2Extended
+
+Configuration - KafkaSinkV2 parameters:
+
+Required:
+--conf spark.sparkmeasure.kafkaBroker = Kafka broker endpoint URL
+Example: --conf spark.sparkmeasure.kafkaBroker=kafka.your-site.com:9092
+--conf spark.sparkmeasure.kafkaTopic = Kafka topic
+Example: --conf spark.sparkmeasure.kafkaTopic=spark-metrics
+
+Optional - Custom labels (recommended for filtering and organization):
+--conf spark.sparkmeasure.appLabels.<labelKey> = Custom metadata value
+Example: --conf spark.sparkmeasure.appLabels.project=my-project
+Example: --conf spark.sparkmeasure.appLabels.environment=production
+Example: --conf spark.sparkmeasure.appLabels.team=data-engineering
+
+For detailed event schemas and examples, see:
+- docs/Flight_recorder_mode_KafkaSink.md
+
+This code depends on "kafka-clients", you may need to add the dependency explicitly, example:
+--packages org.apache.kafka:kafka-clients:3.7.0
+```
+
 ## Prometheus PushGatewaySink
 ```
 class PushGatewaySink(conf: SparkConf) extends SparkListener
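
Editorial sketch for the KafkaSinkV2 parameters documented in this diff: the required and optional settings can be assembled programmatically before they are handed to `spark-submit` or `spark-shell`. The Python below is a minimal illustration under stated assumptions; `kafka_sink_v2_conf_args` is a hypothetical helper that does not exist in sparkMeasure, and only the configuration keys and class names are taken from the documentation above.

```python
import shlex


def kafka_sink_v2_conf_args(broker, topic, labels=None, extended=False):
    """Return a list of `--conf key=value` arguments that attach KafkaSinkV2.

    Hypothetical helper: the keys come from the docs in this commit, the
    function itself is illustrative only.
    """
    listener = "ch.cern.sparkmeasure.KafkaSinkV2" + ("Extended" if extended else "")
    conf = {
        "spark.extraListeners": listener,
        "spark.sparkmeasure.kafkaBroker": broker,
        "spark.sparkmeasure.kafkaTopic": topic,
    }
    # Each custom label becomes one spark.sparkmeasure.appLabels.<labelKey> entry.
    for key, value in (labels or {}).items():
        conf[f"spark.sparkmeasure.appLabels.{key}"] = value

    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    args = kafka_sink_v2_conf_args(
        "kafka.your-site.com:9092",
        "spark-metrics",
        labels={"project": "my-project", "environment": "production"},
    )
    # shlex.join (Python 3.8+) quotes values that contain shell metacharacters.
    print(shlex.join(args))
```

Building the arguments as a list rather than one string keeps label values with spaces intact when they are passed to a subprocess. Note that this sketch sets `spark.extraListeners` to a single class, so any other listeners would need to be merged into that value by the caller.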
