kafka_consumer: bound and refine estimated_consumer_lag#24167

Open
piochelepiotr wants to merge 12 commits into master from pwolski/kafka-consumer-lag-time-caps


@piochelepiotr

Contributor

What does this PR do?

Two refinements to the kafka.estimated_consumer_lag (lag-in-time) calculation in the kafka_consumer check:

  1. Bound left-extrapolation. The broker timestamp cache stores (highwater_offset → time) samples clustered near the latest offset. When a consumer offset is older than the oldest cached sample, the lag is estimated by extrapolating the affine offset/time fit into the past — an unreliable region that could produce an arbitrarily large lag. We now clamp the extrapolated timestamp to at most LAG_EXTRAPOLATION_LIMIT_SECONDS (10 minutes) before the oldest cached sample. This is a continuous cap: reported lag rises smoothly up to roughly cache_window + 10min rather than jumping. Interpolation between known offsets is unaffected.

  2. Use the low watermark as a floor for the lag offset. A consumer that has fallen behind the partition low watermark can never read the messages between its committed offset and the low watermark — they are out of retention. So for lag-in-time we now interpolate from max(consumer_offset, low_watermark) instead of the raw consumer offset. The offset-based consumer_lag metric is unchanged. Low watermark offsets are fetched (via AdminClient.list_offsets(earliest)) only when enable_cluster_monitoring is enabled; with cluster monitoring off, behavior is unchanged.
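The two refinements above can be sketched as follows. This is a minimal illustration, not the check's actual code: the function names, the list-of-tuples cache layout, the choice of the two oldest samples for the affine fit, and measuring lag against the newest cached timestamp are all assumptions made for the example. Only `LAG_EXTRAPOLATION_LIMIT_SECONDS` (10 minutes) and the `max(consumer_offset, low_watermark)` basis come from the description.

```python
import bisect

LAG_EXTRAPOLATION_LIMIT_SECONDS = 600  # 10 minutes, as stated in the PR description


def estimate_offset_timestamp(samples, offset):
    """Estimate when `offset` was produced.

    `samples` is a hypothetical cache layout: a list of (offset, timestamp)
    pairs sorted by offset, with at least two distinct offsets.
    """
    offsets = [o for o, _ in samples]
    if offset <= offsets[0]:
        # Left extrapolation: extend the line through the two oldest samples
        # (an assumption about which fit is used), then clamp the result.
        (o0, t0), (o1, t1) = samples[0], samples[1]
        slope = (t1 - t0) / (o1 - o0)
        extrapolated = t0 + slope * (offset - o0)
        return max(extrapolated, t0 - LAG_EXTRAPOLATION_LIMIT_SECONDS)
    i = bisect.bisect_left(offsets, offset)
    if i >= len(samples):
        # At or past the newest sample: treat the consumer as caught up.
        return samples[-1][1]
    # Interpolation between two known samples is left untouched by the cap.
    (o0, t0), (o1, t1) = samples[i - 1], samples[i]
    return t0 + (t1 - t0) * (offset - o0) / (o1 - o0)


def estimated_lag_seconds(samples, consumer_offset, low_watermark=None):
    """Lag in time, using the low watermark as a floor when it is known."""
    basis = consumer_offset if low_watermark is None else max(consumer_offset, low_watermark)
    newest_timestamp = samples[-1][1]
    return newest_timestamp - estimate_offset_timestamp(samples, basis)
```

With a cache spanning 200 seconds, a consumer arbitrarily far behind reports at most 200 + 600 seconds of lag in this sketch, which matches the "cache_window + 10min" bound described above.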

Motivation

Consumers replaying old data or lagging beyond the cache/retention window produced wildly overestimated estimated_consumer_lag values. Both changes keep the metric meaningful and bounded in those cases.

Review checklist (to be filled by reviewers)

  • Feature or bugfix MUST have appropriate tests (unit, integration, e2e)
  • Add qa/required if this PR needs QA validation, or qa/skip-qa if it does not. Exactly one of the two is required.
  • If you need to backport this PR to another branch, you can add the backport/<branch-name> label to the PR and it will automatically open a backport PR once this one is merged

🤖 Generated with Claude Code

Cap left-extrapolation of the broker timestamp cache so a consumer offset
older than the oldest cached sample cannot extrapolate more than 10 minutes
past it, keeping estimated_consumer_lag bounded.

Use max(consumer_offset, low_watermark) as the offset basis for lag-in-time
when cluster monitoring is enabled: messages below the low watermark are out
of retention and unreachable, so they should not inflate the time lag.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

datadog-datadog-prod-us1-2 Bot commented Jun 24, 2026


Tests  Code Coverage

🎉 All green!

🧪 All tests passed
❄️ No new flaky tests detected

🎯 Code Coverage (details)
Patch Coverage: 96.69%
Overall Coverage: 91.87% (+3.87%)

This comment will be updated automatically if new data arrives.
🔗 Commit SHA: 9f76e78

piochelepiotr and others added 10 commits June 24, 2026 16:45
Replace single-oldest eviction with batch compaction (Visvalingam-Whyatt)
triggered when the cache reaches capacity: keep the oldest and newest samples
and drop the points that least distort the offset/timestamp curve, so the
cache spans a longer history at a coarsening resolution and high lag is
interpolated rather than extrapolated.
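
As a rough sketch of the batch compaction described above, the following removes the interior samples whose removal least distorts the offset/timestamp curve, always keeping the oldest and newest samples. The function names, the dict-based cache layout, and the use of linear interpolation error as the significance measure are assumptions for illustration (classic Visvalingam-Whyatt uses triangle area); the real helper may differ.

```python
import heapq


def interpolation_error(o, prev, nxt, timestamps):
    """Error made at offset `o` if it were replaced by a straight line
    between its current neighbours."""
    p, n = prev[o], nxt[o]
    predicted = timestamps[p] + (timestamps[n] - timestamps[p]) * (o - p) / (n - p)
    return abs(timestamps[o] - predicted)


def compact(timestamps, target_count):
    """Return a copy of `timestamps` (offset -> timestamp) reduced to
    `target_count` samples, never dropping the first or last offset."""
    target = max(target_count, 2)
    offsets = sorted(timestamps)
    kept = dict(timestamps)
    if len(offsets) <= target:
        return kept
    last = len(offsets) - 1
    prev = {o: (offsets[i - 1] if i > 0 else None) for i, o in enumerate(offsets)}
    nxt = {o: (offsets[i + 1] if i < last else None) for i, o in enumerate(offsets)}
    current = {}
    heap = []
    for o in offsets[1:-1]:  # endpoints are never enqueued
        current[o] = interpolation_error(o, prev, nxt, kept)
        heap.append((current[o], o))
    heapq.heapify(heap)
    while len(kept) > target and heap:
        err, o = heapq.heappop(heap)
        if o not in kept or current[o] != err:
            continue  # stale heap entry left over from an earlier re-score
        p, n = prev[o], nxt[o]
        del kept[o]
        nxt[p], prev[n] = n, p  # unlink `o` from the neighbour chain
        for neighbour in (p, n):
            if prev[neighbour] is not None and nxt[neighbour] is not None:
                current[neighbour] = interpolation_error(neighbour, prev, nxt, kept)
                heapq.heappush(heap, (current[neighbour], neighbour))
    return kept
```

For example, compacting `{0: 0.0, 10: 10.0, 20: 20.0, 30: 50.0, 40: 80.0}` to three samples keeps offsets 0, 20, and 40: the two collinear points go first, and the bend at offset 20 survives.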

At the same trigger, prune samples below the earliest consumer offset (keeping
one anchor) since no consumer will ever interpolate there.
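
A minimal sketch of the pruning step, assuming the anchor is the highest cached sample below the floor (so that interpolation at the floor itself still has a left neighbour). The function name and the dict-based cache layout are hypothetical.

```python
def prune_below(timestamps, floor_offset):
    """Drop cached samples below `floor_offset`, keeping one anchor.

    `timestamps` maps offset -> timestamp. The anchor kept here is the
    highest sample strictly below the floor (an assumption for this sketch).
    """
    below = [o for o in timestamps if o < floor_offset]
    if len(below) <= 1:
        return dict(timestamps)  # nothing to prune beyond the anchor
    anchor = max(below)
    return {o: ts for o, ts in timestamps.items() if o >= anchor}
```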

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Use the partition low watermark as the prune floor when cluster monitoring is
enabled (the physically meaningful "lowest readable offset"), falling back to
the earliest committed consumer offset otherwise. The low watermark is now
fetched before the cache update and reused for both pruning and the lag-in-time
floor, so there is no extra broker call.
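
The floor selection described above might look like the sketch below. The function name and both mapping shapes are assumptions: `low_watermark_offsets` is taken to map a (topic, partition) tuple to an offset and to be `None` when cluster monitoring is off, and `committed_offsets` is taken to map (consumer_group, (topic, partition)) to an offset.

```python
def choose_prune_floor(topic_partition, low_watermark_offsets, committed_offsets):
    """Return the offset below which cached samples may be pruned, or None."""
    if low_watermark_offsets is not None:
        low_watermark = low_watermark_offsets.get(topic_partition)
        if low_watermark is not None:
            # Preferred floor: the lowest offset that is still readable.
            return low_watermark
    # Fallback: the earliest committed offset of any group on this partition.
    candidates = [
        offset
        for (_group, tp), offset in committed_offsets.items()
        if tp == topic_partition
    ]
    return min(candidates) if candidates else None
```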

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Previously the log-start (low watermark) offsets were fetched twice per run
when cluster monitoring and data streams were both enabled: once by the
metadata collector for partition.size/topic.size/throughput, and again by the
lag path for the lag-in-time and cache-pruning floor.

Fetch them once in check(), gated on cluster monitoring, over all non-internal
topic partitions, and share the result with both the data-streams lag path and
the metadata collector. Removes the duplicate list_offsets(earliest) call and
the divergent internal-topic handling.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…etch

Drop the PR-added Client.get_low_watermark_offsets and the
_get_low_watermark_offsets wrapper, which duplicated the existing
ClusterMetadataCollector._fetch_earliest_offsets. The check now calls
_fetch_earliest_offsets once under cluster monitoring and shares the result
with both the data-streams lag/pruning path and the topic-metadata collection,
so the earliest offsets are still fetched only once per run.

This reverts client.py to master and keeps the cluster_metadata.py change to a
small signature tweak.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Drop the redundant earliest_offsets alias and reference the passed-in
low_watermark_offsets directly.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
- Clarify that the left-extrapolation cap bounds lag-in-time regardless of
  cluster monitoring or the low-watermark floor, and document why there is no
  symmetric right-side clamp (the newest cached sample is the just-collected
  highwater, which the consumer offset can never exceed).
- Promote ClusterMetadataCollector.fetch_earliest_offsets to a public method
  since KafkaCheck now calls it across the class boundary.
- Log a debug line when the cache-prune floor falls back from the low watermark
  to the earliest consumer offset.
- Extract the Visvalingam-Whyatt significance closure into a module-level
  _interpolation_error helper.
- Parameterize the _visvalingam_whyatt tests; add direct tests for
  _earliest_consumer_offsets, _prune_below_anchor, and the left-extrapolation
  cap through report_consumer_offsets_and_lag without a low watermark.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ection

Pass the topic-partition map computed in check() through collect_all_metadata
into _collect_topic_metadata instead of fetching it again, so the cluster
monitoring path makes the same number of get_topic_partitions calls as before.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@piochelepiotr piochelepiotr marked this pull request as ready for review June 25, 2026 13:34
@piochelepiotr piochelepiotr requested a review from a team as a code owner June 25, 2026 13:34

dd-octo-sts Bot commented Jun 25, 2026

Contributor

Validation Report

All 21 validations passed.

Validation Description Status
agent-reqs Verify check versions match the Agent requirements file
ci Validate CI configuration and code coverage settings
codeowners Validate every integration has a CODEOWNERS entry
config Validate default configuration files against spec.yaml
dep Verify dependency pins are consistent and Agent-compatible
http Validate integrations use the HTTP wrapper correctly
imports Validate check imports do not use deprecated modules
integration-style Validate check code style conventions
jmx-metrics Validate JMX metrics definition files and config
labeler Validate PR labeler config matches integration directories
legacy-signature Validate no integration uses the legacy Agent check signature
license-headers Validate Python files have proper license headers
licenses Validate third-party license attribution list
metadata Validate metadata.csv metric definitions
models Validate configuration data models match spec.yaml
openmetrics Validate OpenMetrics integrations disable the metric limit
package Validate Python package metadata and naming
qa-label Validate the pull request declares whether it needs QA for the next Agent release
readmes Validate README files have required sections
saved-views Validate saved view JSON file structure and fields
version Validate version consistency between package and changelog


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 9f76e7821a

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +546 to +549
for o in offsets:
    if prev[o] is not None and nxt[o] is not None:
        current[o] = _interpolation_error(o, prev, nxt, timestamps)
        heap.append((current[o], o))


P2: Allow stale reset endpoints to age out

When a topic is recreated or reset and the new highwater is still above an old cached offset, the stale purge above only removes cached offsets greater than the new highwater, so lower offsets from the previous topic generation can remain. This compactor never enqueues the first/last offset, so the smallest cached offset is preserved forever instead of eventually being evicted by age as before; that stale timestamp can then keep poisoning estimated_consumer_lag interpolation for the new topic generation. Consider allowing endpoint samples that predate a reset to be dropped, or falling back to timestamp-based eviction for those cases.


del timestamps[o]


def _visvalingam_whyatt(timestamps, target_count):

Contributor


This could be a good bit slower than the previous eviction behavior. Do you think it will take too long on clusters with a lot of partitions?

@piochelepiotr piochelepiotr Jun 25, 2026

Contributor Author


It's O(n*log(m)), so it will still be very fast. And it only runs 1/500 of the time.
The previous one was O(n*log(n)) and ran every time.
I don't expect that to be an issue.


2 participants