[Improve] [Zeta] Add observability metrics for engine state stores #10860
JeremyXin wants to merge 1 commit into apache:dev
Conversation
DanielLeens left a comment
Thanks for working on this. I like the goal here: state-store visibility is genuinely useful for operating Zeta. I traced the full /metrics path on the latest head, and I found one blocking issue before this can safely go into seatunnel-engine: the new collector performs distributed IMap.size() calls on the metrics scrape hot path.
What problem this PR solves
- User pain point: Today Zeta exposes JVM, thread-pool, cluster, and node metrics, but it does not directly expose the size/resource footprint of engine state stores such as `runningJobState`, `checkpointMonitor`, or `connectorJarRefCounters`. That makes state growth harder to observe during production troubleshooting.
- Fix approach: This PR adds `EngineStateStoreMetricExports` to publish global entry counts plus local owned/backup/heap stats for 12 engine-side Hazelcast IMaps, updates the English/Chinese telemetry docs, and adds collector/API tests.
- One-line summary: The observability direction makes sense, but the current implementation puts distributed global-size collection into the Prometheus scrape path, which is too expensive for the engine control plane.
I. Code review
1.1 Core logic analysis
Precise change summary
- Main files
  - `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreMetricExports.java:32-119`
  - `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/ExportsInstanceInitializer.java:41-53`
  - `docs/en/engines/zeta/telemetry.md:54-67`
  - `docs/zh/engines/zeta/telemetry.md:54-66`
  - `.../EngineStateStoreMetricExportsTest.java:55-159`
  - `.../MetricsApiTest.java:52-59`
- Main path: `ExportsInstanceInitializer.init(...)` registers the collector; each scrape then runs `MetricsServlet.doGet(...)` -> `EngineStateStoreMetricExports.collect()` -> `EngineStateStoreMetricExports.collectStoreMetrics(...)`
Before / after snippets
Before, the default registry did not include this collector:

```java
new JobMetricExports(node).register(collectorRegistry);
new JobThreadPoolStatusExports(node).register(collectorRegistry);
new NodeMetricExports(node).register(collectorRegistry);
new ClusterMetricExports(node).register(collectorRegistry);
```

After:

```java
new JobMetricExports(node).register(collectorRegistry);
new JobThreadPoolStatusExports(node).register(collectorRegistry);
new NodeMetricExports(node).register(collectorRegistry);
new EngineStateStoreMetricExports(node).register(collectorRegistry);
new ClusterMetricExports(node).register(collectorRegistry);
```

The key runtime logic is:
```java
for (String storeName : ENGINE_STATE_STORES) {
    collectStoreMetrics(
            storeName, entries, localOwnedEntries, localBackupEntries, localHeapCostBytes);
}
...
IMap<?, ?> map = getNode().hazelcastInstance.getMap(storeName);
if (isMaster()) {
    entries.addMetric(labelValues(storeName, BACKEND), map.size());
}
LocalMapStats localMapStats = map.getLocalMapStats();
```

And /metrics collects on every request:

```java
TextFormat.writeFormat(
        contentType, stringWriter, collectorRegistry.metricFamilySamples());
```

Key findings
- The normal path absolutely hits this PR: once the collector is registered at startup, every `/metrics` or `/openmetrics` scrape executes `collect()`.
- The local-stat part is fine in principle: `getLocalMapStats()` is a local read.
- The risky part is the master-only global metric: `map.size()` is executed for each of the 12 state stores on every scrape (see the contrast sketch below).
- SeaTunnel already uses `IMap.size()` in admin-style code paths such as `GetOverviewOperation.getOverviewInfo(...)`, but that is a one-off management query. Using the same kind of global read in a Prometheus collector is a very different load pattern.
- So this is not just "adding a few gauges"; it changes the runtime cost of the metrics endpoint itself.
Deep logic analysis
Full runtime path:

```
Node startup
-> ExportsInstanceInitializer.init(node) [ExportsInstanceInitializer.java:36-54]
   -> new EngineStateStoreMetricExports(node).register(defaultRegistry)
Prometheus scrape
-> MetricsServlet.doGet(...) [MetricsServlet.java:45-62]
   -> collectorRegistry.metricFamilySamples()
      -> EngineStateStoreMetricExports.collect() [EngineStateStoreMetricExports.java:55-85]
         -> iterate over 12 ENGINE_STATE_STORES
            -> collectStoreMetrics(storeName, ...) [87-118]
               -> hazelcastInstance.getMap(storeName)
               -> if master: map.size()      // distributed global size
               -> map.getLocalMapStats()     // local owned/backup/heap stats
```
That means:
- It is hit whenever metrics scraping is enabled and active.
- It is not limited to debugging or an explicit admin action.
- The cost scales with scrape frequency and with the number/size of engine state stores.
- The master node becomes the place where the extra distributed work is concentrated.
1.2 Compatibility impact
- Conclusion: partially incompatible
- API: no REST path change, but `/metrics` now exposes four new metric families (a hypothetical sample follows this list)
- Config options: unchanged
- Defaults: telemetry-enabled deployments now implicitly do more work per scrape
- Protocol: Prometheus/OpenMetrics output is extended
- Serialization format: unchanged
- Historical behavior: metrics scraping changes from mostly lightweight local/JMX-style reads to a path that now also performs distributed global-size collection on the master
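For orientation only, the four families would surface in the `/metrics` text exposition roughly like this; the metric names below are hypothetical placeholders (not the PR's actual names), and the label scheme (`storeName` plus `backend="hazelcast"`) is approximated from the diff excerpts above:

```
st_engine_state_store_entries{storeName="runningJobState",backend="hazelcast"} 3.0
st_engine_state_store_local_owned_entries{storeName="runningJobState",backend="hazelcast"} 1.0
st_engine_state_store_local_backup_entries{storeName="runningJobState",backend="hazelcast"} 1.0
st_engine_state_store_local_heap_cost_bytes{storeName="runningJobState",backend="hazelcast"} 2048.0
```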
1.3 Performance / side effects
- CPU: every scrape iterates all 12 stores; the master additionally performs 12 global `map.size()` calls
- Memory / GC: repeated label/sample allocation is modest, but continuous under scrape load
- Network: the global-size path is not a local field read; it introduces distributed-statistics cost
- Concurrency / cluster load: no explicit locking issue in the collector itself, but it adds recurring management-side pressure to the master and Hazelcast cluster
- Retry / idempotency: metrics scrapes are naturally periodic, so this cost is amplified by every Prometheus replica and scrape interval
- Resource release: no leak risk, but real steady-state overhead risk
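A rough worked example (numbers assumed for illustration): with a 15s scrape interval and two Prometheus replicas, the master serves 8 scrapes per minute, so the global path alone costs 12 stores × 8 scrapes = 96 distributed `map.size()` operations per minute, sustained for as long as scraping is enabled.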
1.4 Error handling and logging
- Good:
  - `HazelcastInstanceNotActiveException` is handled gracefully during shutdown/inactive periods.
  - Other exceptions are logged instead of being swallowed silently.
- Remaining issue:
  - There is no throttling, caching, or cheap fallback for the expensive global-size part.
Issue 1: Putting distributed global IMap.size() calls on the /metrics scrape hot path adds steady extra load to the master and Hazelcast cluster
- Location: `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreMetricExports.java:93-102`
- Description: This code runs inside `MetricsServlet.doGet(...) -> collectorRegistry.metricFamilySamples() -> EngineStateStoreMetricExports.collectStoreMetrics(...)`, so it is part of the normal Prometheus scrape path. `getLocalMapStats()` is a local stats read and is fine conceptually, but `if (isMaster()) entries.addMetric(..., map.size())` turns each scrape into a sequence of global-size lookups across 12 distributed Hazelcast IMaps. That is very different from the existing one-off admin query in `GetOverviewOperation.getOverviewInfo(...)`, and it means the metrics endpoint itself now performs recurring cluster-wide work.
- Risk: This is a control-plane risk, not a cosmetic one. In larger clusters or under frequent scraping, `/metrics` can become a stable source of extra work on the master. For `seatunnel-engine`, that is a merge-blocking concern because the monitoring path should not become a new source of management pressure by default.
- Best improvement:
  - Option A: export only local metrics (`ownedEntryCount`, `backupEntryCount`, `heapCost`) and let Prometheus compute cluster totals with `sum(...)`.
  - Option B: if you really need a global total metric, collect/cache it asynchronously in the background rather than triggering distributed `map.size()` work on every scrape (see the sketch below).
- Severity: High
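For Option B, a minimal sketch of the shape I mean (all class and method names here are invented for illustration, not the PR's API): move the distributed calls onto a background schedule and let `collect()` read only cached values.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.hazelcast.core.HazelcastInstance;

public class CachedStoreSizeSampler {
    private final Map<String, Long> cachedSizes = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public CachedStoreSizeSampler(
            HazelcastInstance hz, Iterable<String> storeNames, long periodSeconds) {
        // The distributed map.size() calls run here, off the scrape path.
        scheduler.scheduleAtFixedRate(
                () -> {
                    for (String name : storeNames) {
                        try {
                            cachedSizes.put(name, (long) hz.getMap(name).size());
                        } catch (RuntimeException e) {
                            // Keep the last good value; a real implementation would log.
                        }
                    }
                },
                0, periodSeconds, TimeUnit.SECONDS);
    }

    /** Cheap read for the Prometheus collector; never triggers cluster work. */
    public long size(String storeName) {
        return cachedSizes.getOrDefault(storeName, 0L);
    }

    public void close() {
        scheduler.shutdownNow();
    }
}
```

Option A remains simpler still: deleting the `isMaster()` branch and keeping only the `getLocalMapStats()` gauges removes the problem without any new moving parts.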
II. Code quality
2.1 Code conventions
- The overall coding style is consistent.
- I would still recommend a short class-level or field-level note explaining why this exact fixed store list is tracked and which metrics are intentionally local vs. master-only.
2.2 Test coverage
- Added coverage
  - `EngineStateStoreMetricExportsTest` checks sample presence, label structure, and store coverage.
  - `MetricsApiTest` verifies the metrics endpoint exposes the new family.
- Missing coverage
  - There is no test exercising the cost/behavior boundary of the distributed global-size path (a test-shape sketch follows this list).
  - Current tests are single-node happy-path checks, so they cannot expose the operational issue above.
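As a shape for the missing test (all names hypothetical; it targets the cached-sampler pattern from Option B rather than the PR's actual collector): assert that repeated scrape-side reads never re-trigger the expensive distributed call.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.junit.jupiter.api.Test;

class CachedSizeReadTest {

    /** Toy cache: refresh() pays the cost once; get() is free. */
    static final class CachedValue {
        private final Supplier<Long> expensive;
        private volatile long value;

        CachedValue(Supplier<Long> expensive) { this.expensive = expensive; }
        void refresh() { value = expensive.get(); }
        long get() { return value; }
    }

    @Test
    void scrapeReadsDoNotInvokeDistributedCall() {
        AtomicInteger distributedCalls = new AtomicInteger();
        CachedValue size = new CachedValue(
                () -> { distributedCalls.incrementAndGet(); return 42L; });

        size.refresh(); // one background sampler tick
        // Simulate many Prometheus scrapes between sampler ticks.
        for (int i = 0; i < 1000; i++) {
            assertEquals(42L, size.get());
        }
        assertEquals(1, distributedCalls.get());
    }
}
```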
2.3 Documentation updates
- This is a user-visible metrics change, and both `docs/en` and `docs/zh` were updated.
- The two docs are broadly consistent in metric names, labels, and descriptions.
III. Architecture
3.1 Elegance of the solution
- The local-state metrics are a good direction.
- The global total metric is where the design becomes too heavy: it mixes admin-style distributed counting into a collector that runs on every scrape.
3.2 Maintainability
- The implementation is easy to read mechanically.
- The harder long-term issue is performance semantics: the code looks like “just another gauge,” but it actually carries distributed collection cost.
3.3 Extensibility
- If we keep this pattern, it becomes too easy to add more expensive cluster reads into collectors later.
- It would be healthier to keep collectors cheap and move aggregation either to Prometheus or to cached background sampling.
3.4 Historical-version compatibility
- The metric names themselves do not break existing endpoints.
- The runtime behavior does change by adding new default work to metrics scraping, which is why I’m calling this partially incompatible from an operational perspective.
IV. Issue summary
| No. | Issue | Location | Severity |
|---|---|---|---|
| 1 | Putting distributed global `IMap.size()` calls on the `/metrics` scrape hot path adds steady extra load to the master and Hazelcast cluster | `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/metrics/exports/EngineStateStoreMetricExports.java:93-102` | High |
V. Merge conclusion
Conclusion: can merge after fixes
- Blocking items
- Issue 1: the current collector performs distributed global-size work in the default metrics scrape path, which is too expensive for the engine control plane.
- Suggested but non-blocking improvements
- Add a short comment explaining the fixed store list and the local-vs-global metric split.
Overall assessment:
The feature idea is worthwhile, and the local state-store visibility is useful. The problem is the cost model of the global total metric in its current form. If you switch that part to Prometheus-side aggregation or a cached background sample, the design becomes much safer for seatunnel-engine.
Possible alternatives:
- Option A: keep only local metrics and compute totals in Prometheus. This is the cleanest option.
- Option B: keep the global-total semantics, but compute/cache them asynchronously rather than on every scrape.
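For Option A, the Prometheus-side aggregation is a one-liner, e.g. `sum by (storeName) (st_engine_state_store_local_owned_entries)` using the hypothetical local-gauge name from earlier. Because each IMap entry is owned by exactly one member, summing owned entries across members reproduces the global entry count, so dropping the master-only `map.size()` gauge loses no information.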
CI note:
- The `Build` check is currently red, but the available GitHub metadata points its details URL to the `Pull Request Labeler` run rather than a telemetry-specific failing step, so I could not attribute a metrics-specific failure to this diff from the exposed logs alone.
Purpose of this pull request
fix #10766
This pull request adds basic observability metrics for Zeta engine state stores backed by Hazelcast `IMap`.
Main changes in this PR:
- `EngineStateStoreMetricExports`
This PR is intentionally kept within the Phase 1 scope discussed in issue #10766:
- `backend="hazelcast"` label
Does this PR introduce any user-facing change?
Yes. This PR adds new user-visible telemetry metrics to the existing Zeta Prometheus metrics endpoint.
How was this patch tested?
Check list
- New License Guide
- `incompatible-changes.md` to describe the incompatibility caused by this PR.