Skip to content
Merged
74 changes: 74 additions & 0 deletions docs/Flight_recorder_mode_KafkaSink.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,28 @@ provided by the user. Use this mode to monitor Spark execution workload.
Notes:
- KafkaSink: the amount of data generated is relatively small in most applications: O(number_of_stages)
- KafkaSinkExtended can generate a large amount of data O(Number_of_tasks), use with care

## KafkaSinkV2 and KafkaSinkV2Extended

**KafkaSinkV2** is an enhanced version of KafkaSink that adds application-level aggregated metrics and custom labels support.
It collects all stage/executor/query metrics from the base KafkaSink plus additional application lifecycle events and counters.
**KafkaSinkV2Extended** extends KafkaSinkV2 to also record detailed metrics for each executed Task.

**Compatibility Note:** KafkaSinkV2 is backward compatible with KafkaSink in terms of configuration and basic event types.
Existing consumers of KafkaSink events will continue to work as expected. KafkaSinkV2 adds two new event types
(`applications_started` and `applications_ended`) with additional metadata that existing consumers can safely ignore
if not needed.

### Key Differences from KafkaSink

1. **Application-Level Metrics:** KafkaSinkV2 emits `applications_started` and `applications_ended` events with aggregated counters
2. **Custom Labels:** Support for custom metadata labels via `spark.sparkmeasure.appLabels.*` configuration
3. **Enhanced Application End Event:** Includes executor counts, job/stage/task counters, and selected Spark configurations
4. **Counter Tracking:** Tracks success/failure counts for jobs, stages, and tasks throughout application lifecycle

## Configuration

### KafkaSink / KafkaSinkExtended Configuration

How to use: attach the KafkaSink to a Spark Context using the extra listener infrastructure. Example:
- `--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSink`
Expand All @@ -40,6 +62,29 @@ Configuration - KafkaSink parameters:
Example: --conf spark.sparkmeasure.kafka.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
```

### KafkaSinkV2 / KafkaSinkV2Extended Configuration

How to use: attach the KafkaSinkV2 listener to the Spark Context:

```
# Start the listener for KafkaSinkV2 (recommended):
--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2

# Or use KafkaSinkV2Extended for task-level metrics (generates more data):
--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2Extended

# Required Kafka configuration (same as KafkaSink):
--conf spark.sparkmeasure.kafkaBroker = Kafka broker endpoint URL
Example: --conf spark.sparkmeasure.kafkaBroker=kafka.your-site.com:9092
--conf spark.sparkmeasure.kafkaTopic = Kafka topic
Example: --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-metrics

# Optional - Custom application labels:
--conf spark.sparkmeasure.appLabels.<labelKey> = Custom metadata value
Example: --conf spark.sparkmeasure.appLabels.project=my-project
Example: --conf spark.sparkmeasure.appLabels.environment=production
```

This code depends on "kafka-clients". If you deploy sparkMeasure from maven central,
the dependency is being taken care of.
If you run sparkMeasure from a jar instead, you may need to add the dependency manually
Expand Down Expand Up @@ -102,3 +147,32 @@ bin/spark-shell \
"jobId" : "0",
"appId" : "local-1660057441489"
...
```

**Note:** KafkaSinkV2 also emits all standard events like `stages_started`, `stages_ended`, `stage_metrics`, etc.,
just like the original KafkaSink, ensuring backward compatibility.

## Migration Guide: KafkaSink to KafkaSinkV2

If you are currently using KafkaSink and want to migrate to KafkaSinkV2:

1. **Configuration Change:** Simply replace the listener class name:
```
# Old:
--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSink

# New:
--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2
```

2. **Backward Compatibility:** All existing event types and their schemas remain unchanged. Your existing Kafka consumers will continue to work.

3. **New Events:** KafkaSinkV2 adds two new event types (`applications_started` and `applications_ended`). If your consumers don't need these, they can simply ignore events with these names.

4. **Optional Custom Labels:** Add custom labels for better filtering and organization:
```
--conf spark.sparkmeasure.appLabels.project=my-project
--conf spark.sparkmeasure.appLabels.environment=staging
```

5. **Benefits:** You gain application-level aggregated metrics, custom labels, and selected Spark configurations without losing any existing functionality.
48 changes: 48 additions & 0 deletions docs/Reference_SparkMeasure_API_and_Configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,54 @@ This code depends on "kafka-clients", you may need to add the dependency explici
--packages org.apache.kafka:kafka-clients:3.7.0
```

## KafkaSinkV2 and KafkaSinkV2Extended

```
class KafkaSinkV2(conf: SparkConf) extends KafkaSink(conf)
class KafkaSinkV2Extended(conf: SparkConf) extends KafkaSinkV2(conf)

**KafkaSinkV2** is an enhanced version of KafkaSink that extends the SparkListener infrastructure
with application-level aggregated metrics and custom labels support.

Key Features:
1. All stage/executor/query metrics from the base KafkaSink
2. Application-level aggregated counters (executor/job/stage/task counts)
3. Custom labels via spark.sparkmeasure.appLabels.* configurations
4. Enhanced applications_started and applications_ended events with metadata
5. Automatic capture of selected Spark configurations

**Backward Compatibility:** KafkaSinkV2 emits all the same event types as KafkaSink,
ensuring existing consumers continue to work. It adds two new event types that can be
safely ignored by consumers that don't need them.

How to use:
* --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2

**KafkaSinkV2Extended** adds verbose task-level metrics (can generate O(Number_of_tasks) data)
* How to use:
* --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSinkV2Extended

Configuration - KafkaSinkV2 parameters:

Required:
--conf spark.sparkmeasure.kafkaBroker = Kafka broker endpoint URL
Example: --conf spark.sparkmeasure.kafkaBroker=kafka.your-site.com:9092
--conf spark.sparkmeasure.kafkaTopic = Kafka topic
Example: --conf spark.sparkmeasure.kafkaTopic=spark-metrics

Optional - Custom labels (recommended for filtering and organization):
--conf spark.sparkmeasure.appLabels.<labelKey> = Custom metadata value
Example: --conf spark.sparkmeasure.appLabels.project=my-project
Example: --conf spark.sparkmeasure.appLabels.environment=production
Example: --conf spark.sparkmeasure.appLabels.team=data-engineering

For detailed event schemas and examples, see:
- docs/Flight_recorder_mode_KafkaSink.md

This code depends on "kafka-clients", you may need to add the dependency explicitly, example:
--packages org.apache.kafka:kafka-clients:3.7.0
```

## Prometheus PushGatewaySink
```
class PushGatewaySink(conf: SparkConf) extends SparkListener
Expand Down
Loading
Loading