Skip to content

Commit fbc313a

Browse files
fjammesLucaCanali
andauthored
Add support for jmx prometheus exporter
* Add support for jmx prometheus exporter - Write unit test for jmxexport function - Add documentation - Document how to run unit test - Add integration tests over k8s * Avoid importing implicits * Add support for e2e run in PR * Improve Dockerfile and provide on-liner to run e2e tests --------- Co-authored-by: Luca Canali <luca.canali@cern.ch>
1 parent eab34df commit fbc313a

36 files changed

Lines changed: 1289 additions & 164 deletions

.ciux

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
apiVersion: v1alpha1
2+
registry: k8sschool
3+
sourcePathes:
4+
- .ciux
5+
- Dockerfile
6+
- e2e/rootfs
7+
dependencies:
8+
- package: github.com/k8s-school/ink@v0.0.1-rc5
9+
labels:
10+
itest: "true"
11+
- package: github.com/k8s-school/ktbx@v1.1.4-rc7
12+
labels:
13+
itest: "true"
14+

.github/workflows/build_with_scala_and_python_tests.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ name: sparkMeasure CI
22

33
on:
44
push:
5-
branches: [ master ]
65
pull_request:
76
branches: [ master ]
87

.github/workflows/e2e.yml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
name: "e2e tests"
2+
on:
3+
push:
4+
pull_request:
5+
branches:
6+
- master
7+
jobs:
8+
main:
9+
name: Run spark-measure end-to-end tests
10+
runs-on: ubuntu-24.04
11+
steps:
12+
- name: Checkout code
13+
uses: actions/checkout@v2
14+
# Required by ciux
15+
with:
16+
fetch-depth: 0
17+
- name: Stop apparmor
18+
run: |
19+
sudo /etc/init.d/apparmor stop
20+
- uses: actions/setup-go@v3
21+
with:
22+
go-version: '^1.21.4'
23+
- name: Run ciux and create k8s/kind cluster
24+
run: |
25+
./e2e/prereq.sh
26+
- name: Build spark-measure image
27+
run: |
28+
./e2e/build.sh
29+
- name: Load spark-measure image into k8s/kind cluster
30+
run: |
31+
./e2e/push-image.sh -k -d
32+
- name: Run argocd
33+
run: |
34+
./e2e/argocd.sh
35+
- name: Access prometheus exporter metrics
36+
run: |
37+
./e2e/check-metrics.sh
38+
# - name: Push image
39+
# run: |
40+
# ./push-image.sh

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
/.ciux.d/
2+
/.vscode
13
.idea
24
target/
35
project/project/
46
project/target/
7+
__pycache__

Dockerfile

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# Use a Scala SBT base image with Java 17 for building sparkMeasure jar files
2+
FROM sbtscala/scala-sbt:eclipse-temurin-alpine-17.0.15_6_1.11.3_2.13.16 AS builder
3+
4+
# Set the working directory
5+
WORKDIR /app
6+
7+
# Copy the SBT configuration file
8+
COPY build.sbt ./
9+
# Copy the SBT project configuration directory
10+
COPY project ./project
11+
12+
# Copy the application source code
13+
COPY src ./src
14+
15+
# Compile the project with Scala 2.12.18
16+
ENV SCALA_VERSION=2.12.18
17+
RUN sbt ++${SCALA_VERSION} package
18+
19+
# Use the official Spark image with Scala 2.12, Java 17, and Python 3 for runtime
20+
FROM docker.io/library/spark:3.5.6-scala2.12-java17-python3-ubuntu
21+
22+
USER root
23+
24+
# Set up the Prometheus JMX exporter to expose metrics to Prometheus
25+
ENV JMX_EXPORTER_AGENT_VERSION=1.1.0
26+
ADD https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_AGENT_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_AGENT_VERSION}.jar /opt/spark/jars
27+
RUN chmod 644 /opt/spark/jars/jmx_prometheus_javaagent-${JMX_EXPORTER_AGENT_VERSION}.jar
28+
29+
# Add the local sparkMeasure python code to the image
30+
ADD python /opt/src/python
31+
32+
# Install the local sparkMeasure python package
33+
RUN pip install /opt/src/python
34+
35+
# Add rootfs filesystem, it contains python scripts for runnning end-to-end tests
36+
ADD e2e/rootfs/ /
37+
38+
# Copy the compiled jar from the build stage
39+
COPY --from=builder /app/target/scala-2.12/*.jar /opt/spark/jars/
40+
41+
# Set the Spark user
42+
ARG spark_uid=185
43+
ENV spark_uid=${spark_uid}
44+
USER ${spark_uid}
45+
46+
# Expose port 4040 for the Spark UI
47+
EXPOSE 4040
48+
49+
# Set the default entrypoint
50+
CMD ["/bin/bash"]

README.md

Lines changed: 91 additions & 72 deletions
Large diffs are not rendered by default.

docs/Instrument_Scala_code.md

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,19 @@
33
SparkMeasure can be used to instrument parts of your Scala code to measure Apache Spark workload.
44
Use this for example for performance troubleshooting, application instrumentation, workload studies, etc.
55

6-
### Example code
7-
8-
You can find an example of how to instrument a Scala application running Apache Spark jobs at this link:
6+
### Example code
7+
8+
You can find an example of how to instrument a Scala application running Apache Spark jobs at this link:
99
[link to example application](../examples/testSparkMeasureScala)
10-
10+
1111
How to run the example:
1212
```
1313
# build the example jar
1414
sbt package
1515
1616
bin/spark-submit --master local[*] --packages ch.cern.sparkmeasure:spark-measure_2.13:0.25 --class ch.cern.testSparkMeasure.testSparkMeasure <path_to_the_example_jar>/testsparkmeasurescala_2.13-0.1.jar
1717
```
18-
18+
1919
### Collect and save Stage Metrics
2020
An example of how to collect task metrics aggregated at the stage execution level.
2121
Some relevant snippets of code are:
@@ -29,7 +29,7 @@ Some relevant snippets of code are:
2929
stageMetrics.runAndMeasure {
3030
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
3131
}
32-
32+
3333
// print report to standard output
3434
stageMetrics.printReport()
3535

@@ -39,11 +39,11 @@ Some relevant snippets of code are:
3939

4040
// Introduced in sparkMeasure v0.21, memory metrics report:
4141
stageMetrics.printMemoryReport()
42-
42+
4343
//save session metrics data
4444
val df = stageMetrics.createStageMetricsDF("PerfStageMetrics")
4545
stageMetrics.saveData(df.orderBy("jobId", "stageId"), "/tmp/stagemetrics_test1")
46-
46+
4747
val aggregatedDF = stageMetrics.aggregateStageMetrics("PerfStageMetrics")
4848
stageMetrics.saveData(aggregatedDF, "/tmp/stagemetrics_report_test2")
4949
```
@@ -64,8 +64,9 @@ to study skew effects, otherwise consider using stagemetrics aggregation as pref
6464

6565
### Export to Prometheus PushGateway
6666

67-
You have the option to export aggregated stage metrics and/or task metrics to a Prometheus push gateway.
68-
See details at: [Prometheus Pushgateway](Prometheus.md)
67+
You have the option to export aggregated stage metrics and/or task metrics to:
68+
- a Prometheus push gateway, see details at: [Prometheus Pushgateway](Prometheus.md)
69+
- the JMX Prometheus exporter, see details at: [Prometheus exporter through JMX](Prometheus_through_JMX.md)
6970

7071
### Run sparkMeasure using the packaged version from Maven Central
7172

@@ -89,7 +90,7 @@ See details at: [Prometheus Pushgateway](Prometheus.md)
8990
9091
# Run as in one of these examples:
9192
bin/spark-submit --jars path>/spark-measure_2.13-0.26-SNAPSHOT.jar
92-
93+
9394
# alternative, set classpath for the driver (it is only needed in the driver)
9495
bin/spark-submit --conf spark.driver.extraClassPath=<path>/spark-measure_2.13-0.26-SNAPSHOT.jar ...
9596
```

docs/Prometheus_through_JMX.md

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
## Exporting sparkMeasure Metrics to Prometheus via JMX
2+
3+
`sparkMeasure` collects execution metrics from Spark jobs at the driver or executor level. While it does not expose its metrics directly via JMX, it can be used alongside Spark's JMX metrics system to enable Prometheus-based monitoring.
4+
5+
In a Kubernetes environment using the **Spark Operator**, you can configure the Spark driver and executor to expose their sparkMeasure metrics through JMX Prometheus exporter and scrape them with Prometheus.
6+
7+
> ✅ This setup has been validated **only** in **Kubernetes environments using the [Spark Operator](https://www.kubeflow.org/docs/components/spark-operator)**.
8+
9+
### Enable the JMX prometheus exporter in Spark
10+
11+
To configure JMX and Prometheus exporter monitoring with Spark on Kubernetes, follow the official Kubeflow Spark Operator documentation:
12+
13+
📖 [Monitoring with JMX and Prometheus — Kubeflow Spark Operator Guide](https://www.kubeflow.org/docs/components/spark-operator/user-guide/monitoring-with-jmx-and-prometheus/)
14+
15+
### Exporting `sparkMeasure` Metrics via JMX in Python
16+
17+
To programmatically export `sparkMeasure` metrics in Python alongside standard JMX metrics, you can leverage the `jmxexport` function from the `sparkmeasure.jmx` module. This enables custom metrics collected during job execution to be exposed through the same Prometheus exporter as native Spark metrics.
18+
19+
#### Example Usage
20+
21+
```python
22+
from sparkmeasure import StageMetrics
23+
from sparkmeasure.jmx import jmxexport
24+
25+
stage_metrics = StageMetrics(spark)
26+
stage_metrics.begin()
27+
28+
# ... run your Spark jobs here ...
29+
30+
stage_metrics.end()
31+
current_metrics = stagemetrics.aggregate_stagemetrics()
32+
33+
# export the metrics to JMX Prometheus exporter
34+
jmxexport(spark, current_metrics)
35+
```
36+
37+
The `jmxexport()` call updates the current Spark application’s JMX metrics with the `sparkMeasure` results, making them available to any configured Prometheus instance.
38+
39+
See a full implementation example here:
40+
📄 [How to use the JMX exporter in Python code](../e2e/rootfs/opt/spark/examples/spark-sql.py)
41+
42+
---
43+
44+
### Prometheus Exporter Configuration
45+
46+
In addition to exposing metrics via JMX, you must configure the Prometheus JMX exporter in the Spark driver and executor pods to make the custom `sparkMeasure` metrics queryable by Prometheus. This configuration should be added *on top of* the existing JMX metrics exporter configuration.
47+
48+
Ensure your Spark pod manifest or Helm chart includes a properly configured `ConfigMap` for the JMX exporter. Specifically, you’ll need to add mappings for the custom `sparkMeasure` metrics to the YAML under the `rules` section used by the Prometheus JMX exporter.
49+
50+
A production-ready configuration example is available here:
51+
📄 [How to configure the Prometheus exporter to expose sparkMeasure metrics](../e2e/charts/spark-demo/templates/jmx-configmap.yaml)
52+
53+
---
54+
55+
By combining Python-based metric collection with a Prometheus-compatible JMX exporter, you can ensure comprehensive observability for Spark applications, including custom performance instrumentation through `sparkMeasure`.
56+
57+
> **Security Tip:** In production environments, ensure that JMX ports are protected using appropriate Kubernetes NetworkPolicies or service mesh configurations. Avoid exposing unauthenticated JMX endpoints externally to mitigate the risk of unauthorized access.

docs/Python_shell_and_Jupyter.md

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
# sparkMeasure on PySpark
22

3-
Notes on how to use sparkMeasure to collect Spark workload metrics when using PySpark from command line
4-
or from a Jupyter notebook.
3+
Notes on how to use sparkMeasure to collect Spark workload metrics when using PySpark from command line
4+
or from a Jupyter notebook.
55
See also [README](../README.md) for an introduction to sparkMeasure and its architecture.
66

77
### Deployment and installation
88

9-
- Use PyPi to install the Python wrapper and take the jar from Maven central:
9+
- Use PyPi to install the Python wrapper and take the jar from Maven central:
1010
```
1111
pip install pyspark # Spark 4
1212
pip install sparkmeasure
@@ -18,51 +18,51 @@ See also [README](../README.md) for an introduction to sparkMeasure and its arch
1818
cd sparkmeasure
1919
sbt +package
2020
ls -l target/scala-2.13/spark-measure*.jar # note location of the compiled and packaged jar
21-
21+
2222
# Install the Python wrapper package
2323
cd python
2424
pip install .
25-
25+
2626
# Run as in one of these examples:
2727
bin/pyspark --jars path>/spark-measure_2.13-0.26-SNAPSHOT.jar
28-
28+
2929
#Alternative:
3030
bin/pyspark --conf spark.driver.extraClassPath=<path>/spark-measure_2.13-0.26-SNAPSHOT.jar
3131
```
32-
33-
32+
33+
3434
### PySpark example
3535
1. How to collect and print Spark task stage metrics using sparkMeasure, example in Python:
3636
```python
3737
from sparkmeasure import StageMetrics
3838
stagemetrics = StageMetrics(spark)
39-
39+
4040
stagemetrics.begin()
4141
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()
4242
stagemetrics.end()
4343
4444
stagemetrics.print_report()
4545
stagemetrics.print_memory_report()
46-
46+
4747
# get metrics as a dictionary
4848
metrics = stagemetrics.aggregate_stage_metrics()
4949
```
5050
2. Similar to example 1, but with a shortcut to run code and measure it with one command line:
5151
```python
5252
from sparkmeasure import StageMetrics
5353
stagemetrics = StageMetrics(spark)
54-
54+
5555
stagemetrics.runandmeasure(globals(),
5656
'spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()')
57-
57+
5858
stageMetrics.print_memory_report()
5959
```
6060

6161
### Jupyter notebook example
6262

63-
Jupyter notebooks are a popular way to interact with PySpark for data analysis.
63+
Jupyter notebooks are a popular way to interact with PySpark for data analysis.
6464
Example Jupyter notebook showing the use of basic sparkMeasure instrumentation:
65-
65+
6666
[SparkMeasure_Jupyer_Python_getting_started.ipynb](examples/SparkMeasure_Jupyer_Python_getting_started.ipynb)
6767

6868
Note, in particular with Jupyter notebooks it can be handy to write cell magic to wrap the instrumentation,
@@ -84,7 +84,7 @@ def sparkmeasure(line, cell=None):
8484
### Collecting metrics at finer granularity: use Task metrics
8585

8686
Collecting Spark task metrics at the granularity of each task completion has additional overhead
87-
compare to collecting at the stage completion level, therefore this option should only be used if you need data with
87+
compare to collecting at the stage completion level, therefore this option should only be used if you need data with
8888
this finer granularity, for example because you want to study skew effects, otherwise consider using
8989
stagemetrics aggregation as preferred choice.
9090

@@ -98,7 +98,7 @@ stagemetrics aggregation as preferred choice.
9898
taskmetrics.end()
9999
taskmetrics.print_report()
100100
```
101-
101+
102102
```python
103103
from sparkmeasure import TaskMetrics
104104
taskmetrics = TaskMetrics(spark)
@@ -108,18 +108,18 @@ stagemetrics aggregation as preferred choice.
108108

109109
### Exporting metrics data for archiving and/or further analysis
110110

111-
One simple use case is to make use of the data collected and reported by stagemetrics and taskmetrics
112-
printReport methods for immediate troubleshooting and workload analysis.
113-
You also have options to save metrics aggregated as in the printReport output.
111+
One simple use case is to make use of the data collected and reported by stagemetrics and taskmetrics
112+
printReport methods for immediate troubleshooting and workload analysis.
113+
You also have options to save metrics aggregated as in the printReport output.
114+
115+
Another option is to export the metrics to an external system, see details at [Prometheus Pushgateway](Prometheus.md) or or [Prometheus exporter through JMX](Prometheus_through_JMX.md).
114116

115-
Another option is to export the metrics to an external system, see details at [Prometheus Pushgateway](Prometheus.md)
116-
117117
- Example on how to export raw Stage metrics data in json format
118118
```python
119119
from sparkmeasure import StageMetrics
120120
stagemetrics = StageMetrics(spark)
121121
stagemetrics.runandmeasure(globals(), ...your workload here ... )
122-
122+
123123
df = stagemetrics.create_stagemetrics_DF("PerfStageMetrics")
124124
df.show()
125125
stagemetrics.save_data(df.orderBy("jobId", "stageId"), "stagemetrics_test1", "json")
@@ -156,4 +156,4 @@ Stage 3 JVMHeapMemory maxVal bytes => 279558120 (266.6 MB)
156156
Stage 3 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
157157
```
158158
159-
159+

0 commit comments

Comments
 (0)