From 5331ba9dab5cfecf38a14c1c5a48f50e7bbd8767 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Wed, 24 Jun 2026 15:58:50 +0200 Subject: [PATCH 01/12] kafka_consumer: bound and refine estimated_consumer_lag Cap left-extrapolation of the broker timestamp cache so a consumer offset older than the oldest cached sample cannot extrapolate more than 10 minutes past it, keeping estimated_consumer_lag bounded. Use max(consumer_offset, low_watermark) as the offset basis for lag-in-time when cluster monitoring is enabled: messages below the low watermark are out of retention and unreachable, so they should not inflate the time lag. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../datadog_checks/kafka_consumer/client.py | 27 +++++++++++++- .../kafka_consumer/kafka_consumer.py | 37 ++++++++++++++++++- kafka_consumer/tests/test_unit.py | 37 +++++++++++++++++++ 3 files changed, 97 insertions(+), 4 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client.py b/kafka_consumer/datadog_checks/kafka_consumer/client.py index d26c81beed385..58962591e08e0 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client.py @@ -3,8 +3,8 @@ # Licensed under a 3-clause BSD style license (see LICENSE) from concurrent.futures import as_completed -from confluent_kafka import Consumer, ConsumerGroupTopicPartitions, KafkaException, TopicPartition -from confluent_kafka.admin import AdminClient +from confluent_kafka import Consumer, ConsumerGroupTopicPartitions, IsolationLevel, KafkaException, TopicPartition +from confluent_kafka.admin import AdminClient, OffsetSpec # AWS MSK IAM authentication support try: @@ -168,6 +168,29 @@ def consumer_offsets_for_times(self, partitions, offset=-1): results.append((tp.topic, tp.partition, tp.offset)) return results + def get_low_watermark_offsets(self, partitions): + """Batch-fetch log-start (low watermark) offsets via AdminClient.list_offsets(earliest).""" + requests = {TopicPartition(topic, partition): OffsetSpec.earliest() for topic, partition in partitions} + if not requests: + return {} + + result = {} + try: + futures = self.kafka_client.list_offsets( + requests, + isolation_level=IsolationLevel.READ_UNCOMMITTED, + request_timeout=self.config._request_timeout, + ) + for tp, future in futures.items(): + try: + result[(tp.topic, tp.partition)] = future.result().offset + except Exception as e: + self.log.debug("Failed to fetch low watermark for %s:%s: %s", tp.topic, tp.partition, e) + except Exception as e: + self.log.warning("Failed to issue list_offsets request for low watermarks: %s", e) + return {} + return result + def _list_topics(self): if self._cluster_metadata: return self._cluster_metadata diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 0ac2e213a5748..050af96ce4d0f 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -17,6 +17,11 @@ MAX_TIMESTAMPS = 1000 +# When the consumer offset predates the oldest cached broker sample we extrapolate into the past, +# where the affine offset/timestamp assumption is unreliable. Cap how far back we extrapolate so the +# reported estimated_consumer_lag never exceeds the cached history window by more than this many seconds. +LAG_EXTRAPOLATION_LIMIT_SECONDS = 600 + class KafkaCheck(AgentCheck): __NAMESPACE__ = 'kafka' @@ -118,6 +123,16 @@ def check(self, _): if self.config._kafka_cluster_id_override: cluster_id = self.config._kafka_cluster_id_override + # Low watermark offsets let us cap lag-in-time for consumers that have fallen out of + # retention. They are only needed (and only fetched) when cluster monitoring is enabled. + low_watermark_offsets = {} + if self.config._cluster_monitoring_enabled and self._data_streams_enabled: + consumer_partitions = {tp for offsets in consumer_offsets.values() for tp in offsets} + try: + low_watermark_offsets = self.client.get_low_watermark_offsets(consumer_partitions) + except Exception: + self.log.exception("There was a problem collecting the low watermark offsets.") + self.report_highwater_offsets(highwater_offsets, reporting_limit, cluster_id) self.report_consumer_offsets_and_lag( consumer_offsets, @@ -125,6 +140,7 @@ def check(self, _): reporting_limit - len(highwater_offsets), broker_timestamps, cluster_id, + low_watermark_offsets, ) # Collect cluster metadata if enabled @@ -292,9 +308,16 @@ def report_highwater_offsets(self, highwater_offsets, contexts_limit, cluster_id self.log.debug('%s highwater offsets reported', reported_contexts) def report_consumer_offsets_and_lag( - self, consumer_offsets, highwater_offsets, contexts_limit, broker_timestamps, cluster_id + self, + consumer_offsets, + highwater_offsets, + contexts_limit, + broker_timestamps, + cluster_id, + low_watermark_offsets=None, ): """Report the consumer offsets and consumer lag.""" + low_watermark_offsets = low_watermark_offsets or {} reported_contexts = 0 self.log.debug("Reporting consumer offsets and lag metrics") for consumer_group, offsets in consumer_offsets.items(): @@ -368,7 +391,12 @@ def report_consumer_offsets_and_lag( timestamps = broker_timestamps["{}_{}".format(topic, partition)] # The producer timestamp can be not set if there was an error fetching broker offsets. producer_timestamp = timestamps.get(producer_offset, None) - consumer_timestamp = _get_interpolated_timestamp(timestamps, consumer_offset) + # A consumer that has fallen behind the low watermark can never read the messages + # between its committed offset and the low watermark (they are out of retention), so + # its effective position for lag-in-time is the low watermark. + low_watermark = low_watermark_offsets.get((topic, partition)) + effective_offset = consumer_offset if low_watermark is None else max(consumer_offset, low_watermark) + consumer_timestamp = _get_interpolated_timestamp(timestamps, effective_offset) if consumer_timestamp is None or producer_timestamp is None: continue lag = producer_timestamp - consumer_timestamp @@ -482,4 +510,9 @@ def _get_interpolated_timestamp(timestamps, offset): timestamp_after = timestamps[offset_after] slope = (timestamp_after - timestamp_before) / float(offset_after - offset_before) timestamp = slope * (offset - offset_after) + timestamp_after + + if offset < offset_before: + # Extrapolating to the left of the oldest cached sample: clamp the timestamp so the + # reported lag stays bounded instead of growing arbitrarily with the extrapolation. + timestamp = max(timestamp, timestamp_before - LAG_EXTRAPOLATION_LIMIT_SECONDS) return timestamp diff --git a/kafka_consumer/tests/test_unit.py b/kafka_consumer/tests/test_unit.py index 0653cbd44a1c4..4f9ff723ffc30 100644 --- a/kafka_consumer/tests/test_unit.py +++ b/kafka_consumer/tests/test_unit.py @@ -444,6 +444,16 @@ def test_get_interpolated_timestamp(): assert _get_interpolated_timestamp({10: 200}, 15) is None +def test_get_interpolated_timestamp_caps_left_extrapolation(): + # Slope is 1 timestamp unit per offset; offset 0 is 1100 offsets before the oldest sample. + # Unclamped this would extrapolate to 9000, but we cap it at oldest_sample - 600 = 9400. + assert _get_interpolated_timestamp({1000: 10000, 1100: 10100}, 0) == 9400 + # A modest left-extrapolation that stays within the 600s budget is left untouched. + assert _get_interpolated_timestamp({1000: 10000, 1100: 10100}, 900) == 9900 + # Interpolation between known offsets is never affected by the cap. + assert _get_interpolated_timestamp({1000: 10000, 1100: 10100}, 1050) == 10050 + + @pytest.mark.parametrize( 'persistent_cache_contents, instance_overrides, consumer_lag_seconds_count', [ @@ -530,6 +540,33 @@ def test_add_broker_timestamps_evicts_by_oldest_timestamp(kafka_instance, check) assert 600 in timestamps +def test_report_lag_in_time_uses_low_watermark(kafka_instance, check, aggregator): + # A consumer behind the low watermark can't read the out-of-retention messages between its + # committed offset and the low watermark, so lag-in-time is interpolated from the low watermark. + kafka_instance['data_streams_enabled'] = True + check = check(kafka_instance) + mock_client = seed_mock_client() + mock_client.get_partitions_for_topic.return_value = [0] + check.client = mock_client + + consumer_offsets = {"consumer_group1": {("topic1", 0): 5}} + highwater_offsets = {("topic1", 0): 100} + broker_timestamps = {"topic1_0": {50: 1000.0, 100: 1100.0}} + low_watermark_offsets = {("topic1", 0): 50} + + check.report_consumer_offsets_and_lag( + consumer_offsets, + highwater_offsets, + float('inf'), + broker_timestamps, + "cluster_id", + low_watermark_offsets, + ) + + # effective offset = max(5, 50) = 50 -> consumer_timestamp 1000, producer_timestamp 1100, lag 100. + aggregator.assert_metric("kafka.estimated_consumer_lag", value=100.0, count=1) + + def test_count_consumer_contexts(check, kafka_instance): kafka_consumer_check = check(kafka_instance) consumer_offsets = { From a8d4125c2a8b692b9f915a7d52fd7b0dd5c7fffb Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Wed, 24 Jun 2026 15:59:29 +0200 Subject: [PATCH 02/12] kafka_consumer: add changelog entry for PR #24167 Co-Authored-By: Claude Opus 4.8 (1M context) --- kafka_consumer/changelog.d/24167.fixed | 1 + 1 file changed, 1 insertion(+) create mode 100644 kafka_consumer/changelog.d/24167.fixed diff --git a/kafka_consumer/changelog.d/24167.fixed b/kafka_consumer/changelog.d/24167.fixed new file mode 100644 index 0000000000000..981496a746905 --- /dev/null +++ b/kafka_consumer/changelog.d/24167.fixed @@ -0,0 +1 @@ +Cap interpolated estimated_consumer_lag for offsets older than the cached history and use the low watermark as a floor when cluster monitoring is enabled. From 04836ec4eac0e7fe32fab22478c283d7aec7ba23 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Wed, 24 Jun 2026 16:45:39 +0200 Subject: [PATCH 03/12] kafka_consumer: compact and prune the broker-timestamp cache Replace single-oldest eviction with batch compaction (Visvalingam-Whyatt) triggered when the cache reaches capacity: keep the oldest and newest samples and drop the points that least distort the offset/timestamp curve, so the cache spans a longer history at a coarsening resolution and high lag is interpolated rather than extrapolated. At the same trigger, prune samples below the earliest consumer offset (keeping one anchor) since no consumer will ever interpolate there. Co-Authored-By: Claude Opus 4.8 (1M context) --- kafka_consumer/changelog.d/24167.fixed | 2 +- .../kafka_consumer/kafka_consumer.py | 97 +++++++++++++++++-- kafka_consumer/tests/test_unit.py | 56 ++++++++--- 3 files changed, 136 insertions(+), 19 deletions(-) diff --git a/kafka_consumer/changelog.d/24167.fixed b/kafka_consumer/changelog.d/24167.fixed index 981496a746905..e012c0dbcf413 100644 --- a/kafka_consumer/changelog.d/24167.fixed +++ b/kafka_consumer/changelog.d/24167.fixed @@ -1 +1 @@ -Cap interpolated estimated_consumer_lag for offsets older than the cached history and use the low watermark as a floor when cluster monitoring is enabled. +Improve the accuracy of ``estimated_consumer_lag`` for consumers that are far behind: cap interpolation for offsets older than the cached broker history, use the low watermark as a floor for the lag offset when cluster monitoring is enabled, and retain a longer broker-timestamp history by compacting the cache (Visvalingam-Whyatt) and pruning samples below the earliest consumer offset instead of evicting the oldest one. diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 050af96ce4d0f..7ec34c70c83e5 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -1,6 +1,7 @@ # (C) Datadog, Inc. 2019-present # All rights reserved # Licensed under Simplified BSD License (see LICENSE) +import heapq import json from collections import defaultdict from time import time @@ -89,7 +90,8 @@ def check(self, _): highwater_offsets, cluster_id = self.get_highwater_offsets(partitions) if self._data_streams_enabled: broker_timestamps = self._load_broker_timestamps(persistent_cache_key) - self._add_broker_timestamps(broker_timestamps, highwater_offsets) + earliest_consumer_offsets = self._earliest_consumer_offsets(consumer_offsets) + self._add_broker_timestamps(broker_timestamps, highwater_offsets, earliest_consumer_offsets) self._save_broker_timestamps(broker_timestamps, persistent_cache_key) else: self.warning("Context limit reached. Skipping highwater offset collection.") @@ -270,7 +272,17 @@ def _load_broker_timestamps(self, persistent_cache_key): self.log.warning('Could not read broker timestamps from cache: %s', str(e)) return broker_timestamps - def _add_broker_timestamps(self, broker_timestamps, highwater_offsets): + def _earliest_consumer_offsets(self, consumer_offsets): + """Return the lowest committed offset per (topic, partition) across all consumer groups.""" + earliest = {} + for offsets in consumer_offsets.values(): + for topic_partition, offset in offsets.items(): + if topic_partition not in earliest or offset < earliest[topic_partition]: + earliest[topic_partition] = offset + return earliest + + def _add_broker_timestamps(self, broker_timestamps, highwater_offsets, earliest_consumer_offsets=None): + earliest_consumer_offsets = earliest_consumer_offsets or {} for (topic, partition), highwater_offset in highwater_offsets.items(): timestamps = broker_timestamps["{}_{}".format(topic, partition)] # If the highwater offset went backwards (topic recreated, @@ -281,11 +293,17 @@ def _add_broker_timestamps(self, broker_timestamps, highwater_offsets): for o in stale: del timestamps[o] timestamps[highwater_offset] = time() - # If there's too many timestamps, we delete the oldest one (by - # timestamp, not by offset — evicting by min offset would discard - # the fresh post-reset entries and keep poisonous stale ones). - if len(timestamps) > self._max_timestamps: - del timestamps[min(timestamps, key=timestamps.get)] + # Once the cache fills up, prune and compact it instead of dropping a single entry. + if len(timestamps) >= self._max_timestamps: + # Samples older than the earliest consumer offset are useless — no consumer will ever + # interpolate there — so drop them, keeping one anchor at/below that offset. + earliest_consumer_offset = earliest_consumer_offsets.get((topic, partition)) + if earliest_consumer_offset is not None: + _prune_below_anchor(timestamps, earliest_consumer_offset) + # Compact what remains down to half capacity. The Visvalingam-Whyatt simplification keeps + # the oldest and newest samples and discards the points that least affect the + # offset/timestamp curve, so the cache spans a longer history at a coarsening resolution. + _visvalingam_whyatt(timestamps, max(2, self._max_timestamps // 2)) def _save_broker_timestamps(self, broker_timestamps, persistent_cache_key): """Saves broker timestamps to persistent cache.""" @@ -516,3 +534,68 @@ def _get_interpolated_timestamp(timestamps, offset): # reported lag stays bounded instead of growing arbitrarily with the extrapolation. timestamp = max(timestamp, timestamp_before - LAG_EXTRAPOLATION_LIMIT_SECONDS) return timestamp + + +def _prune_below_anchor(timestamps, floor): + """Drop cached samples strictly below the largest offset at or below ``floor``, in place. + + Samples below the earliest offset any consumer needs are useless, but we keep one anchor at or + below ``floor`` so the most-lagging consumer can still interpolate (rather than extrapolate). + """ + below = [o for o in timestamps if o < floor] + if len(below) <= 1: + return + anchor = max(below) + for o in below: + if o != anchor: + del timestamps[o] + + +def _visvalingam_whyatt(timestamps, target_count): + """Downsample an offset->timestamp series to ``target_count`` points in place. + + Implements the Visvalingam-Whyatt line simplification algorithm: the two endpoints (the oldest + and newest samples) are always kept, and the interior point whose removal least distorts the + linear offset/timestamp interpolation is dropped repeatedly until the target size is reached. + A point's "significance" is the time error introduced by reconstructing it from its neighbors. + + Runs in O(n log n) using a min-heap keyed by significance, a doubly-linked list of neighbors, + and lazy deletion of heap entries that become stale when a neighbor is removed. + """ + if len(timestamps) <= target_count: + return timestamps + + offsets = sorted(timestamps) + prev = {o: (offsets[i - 1] if i > 0 else None) for i, o in enumerate(offsets)} + nxt = {o: (offsets[i + 1] if i < len(offsets) - 1 else None) for i, o in enumerate(offsets)} + alive = set(offsets) + + def significance(o): + before, after = prev[o], nxt[o] + predicted = timestamps[before] + (timestamps[after] - timestamps[before]) * (o - before) / (after - before) + return abs(timestamps[o] - predicted) + + current = {} + heap = [] + for o in offsets: + if prev[o] is not None and nxt[o] is not None: + current[o] = significance(o) + heap.append((current[o], o)) + heapq.heapify(heap) + + remaining = len(offsets) + while remaining > target_count and heap: + error, o = heapq.heappop(heap) + if o not in alive or error != current.get(o): + # Stale entry: the point was already removed, or its significance was recomputed. + continue + before, after = prev[o], nxt[o] + alive.discard(o) + del timestamps[o] + remaining -= 1 + nxt[before], prev[after] = after, before + for neighbor in (before, after): + if prev[neighbor] is not None and nxt[neighbor] is not None: + current[neighbor] = significance(neighbor) + heapq.heappush(heap, (current[neighbor], neighbor)) + return timestamps diff --git a/kafka_consumer/tests/test_unit.py b/kafka_consumer/tests/test_unit.py index 4f9ff723ffc30..069fc4b899889 100644 --- a/kafka_consumer/tests/test_unit.py +++ b/kafka_consumer/tests/test_unit.py @@ -10,7 +10,7 @@ from datadog_checks.kafka_consumer import KafkaCheck from datadog_checks.kafka_consumer.client import KafkaClient -from datadog_checks.kafka_consumer.kafka_consumer import _get_interpolated_timestamp +from datadog_checks.kafka_consumer.kafka_consumer import _get_interpolated_timestamp, _visvalingam_whyatt pytestmark = [pytest.mark.unit] @@ -454,6 +454,27 @@ def test_get_interpolated_timestamp_caps_left_extrapolation(): assert _get_interpolated_timestamp({1000: 10000, 1100: 10100}, 1050) == 10050 +def test_visvalingam_whyatt_keeps_endpoints_and_drops_collinear(): + # A perfectly linear series: every interior point is redundant, so any of them can go. + timestamps = {0: 0.0, 10: 10.0, 20: 20.0, 30: 30.0, 40: 40.0} + _visvalingam_whyatt(timestamps, 3) + assert len(timestamps) == 3 + assert 0 in timestamps and 40 in timestamps # endpoints are always preserved + + +def test_visvalingam_whyatt_keeps_rate_changes(): + # Constant rate up to offset 20, then a sharp acceleration: 10 is redundant, the kink at 20 isn't. + timestamps = {0: 0.0, 10: 10.0, 20: 20.0, 30: 100.0} + _visvalingam_whyatt(timestamps, 3) + assert set(timestamps) == {0, 20, 30} + + +def test_visvalingam_whyatt_noop_when_within_target(): + timestamps = {0: 0.0, 10: 10.0} + _visvalingam_whyatt(timestamps, 5) + assert timestamps == {0: 0.0, 10: 10.0} + + @pytest.mark.parametrize( 'persistent_cache_contents, instance_overrides, consumer_lag_seconds_count', [ @@ -525,19 +546,32 @@ def test_add_broker_timestamps_purges_stale_offsets_on_reset(kafka_instance, che assert 170 in timestamps -def test_add_broker_timestamps_evicts_by_oldest_timestamp(kafka_instance, check): - # Eviction must drop the entry with the oldest timestamp, not the smallest - # offset. Evicting by min(offset) would discard fresh post-reset entries - # and keep poisonous ones. - kafka_instance['timestamp_history_size'] = 2 +def test_add_broker_timestamps_compacts_when_full(kafka_instance, check): + # When the cache reaches capacity it is compacted down to half its size, keeping the oldest and + # newest samples and dropping the points that least affect the offset/timestamp curve. + kafka_instance['timestamp_history_size'] = 4 + check = check(kafka_instance) + broker_timestamps = {"topic1_0": {10: 1.0, 20: 2.0, 30: 3.0}} + check._add_broker_timestamps(broker_timestamps, {("topic1", 0): 40}) + + timestamps = broker_timestamps["topic1_0"] + assert len(timestamps) == 2 + assert 10 in timestamps # oldest endpoint preserved + assert 40 in timestamps # newest endpoint preserved + + +def test_add_broker_timestamps_prunes_below_earliest_consumer_offset(kafka_instance, check): + # At compaction time, samples older than the earliest consumer offset are useless (no consumer + # will ever interpolate there), so they are pruned — keeping one anchor at/below that offset. + kafka_instance['timestamp_history_size'] = 4 check = check(kafka_instance) - broker_timestamps = {"topic1_0": {500: 50.0, 400: 999.0}} - check._add_broker_timestamps(broker_timestamps, {("topic1", 0): 600}) + broker_timestamps = {"topic1_0": {10: 1.0, 20: 2.0, 30: 3.0}} + check._add_broker_timestamps(broker_timestamps, {("topic1", 0): 40}, {("topic1", 0): 25}) timestamps = broker_timestamps["topic1_0"] - assert 500 not in timestamps # oldest by timestamp - assert 400 in timestamps - assert 600 in timestamps + assert 10 not in timestamps # useless: below the earliest consumer offset and not the anchor + assert 20 in timestamps # anchor: largest sample at/below the earliest consumer offset + assert 40 in timestamps def test_report_lag_in_time_uses_low_watermark(kafka_instance, check, aggregator): From fefa2a2baccdca69f243098ac438bd821a4b0df2 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Wed, 24 Jun 2026 16:58:37 +0200 Subject: [PATCH 04/12] kafka_consumer: prune broker-timestamp cache by low watermark Use the partition low watermark as the prune floor when cluster monitoring is enabled (the physically meaningful "lowest readable offset"), falling back to the earliest committed consumer offset otherwise. The low watermark is now fetched before the cache update and reused for both pruning and the lag-in-time floor, so there is no extra broker call. Co-Authored-By: Claude Opus 4.8 (1M context) --- kafka_consumer/changelog.d/24167.fixed | 2 +- .../kafka_consumer/kafka_consumer.py | 45 +++++++++++-------- kafka_consumer/tests/test_unit.py | 22 +++++++++ 3 files changed, 49 insertions(+), 20 deletions(-) diff --git a/kafka_consumer/changelog.d/24167.fixed b/kafka_consumer/changelog.d/24167.fixed index e012c0dbcf413..3898228f1d51c 100644 --- a/kafka_consumer/changelog.d/24167.fixed +++ b/kafka_consumer/changelog.d/24167.fixed @@ -1 +1 @@ -Improve the accuracy of ``estimated_consumer_lag`` for consumers that are far behind: cap interpolation for offsets older than the cached broker history, use the low watermark as a floor for the lag offset when cluster monitoring is enabled, and retain a longer broker-timestamp history by compacting the cache (Visvalingam-Whyatt) and pruning samples below the earliest consumer offset instead of evicting the oldest one. +Improve the accuracy of ``estimated_consumer_lag`` for consumers that are far behind: cap interpolation for offsets older than the cached broker history, use the low watermark as a floor for the lag offset when cluster monitoring is enabled, and retain a longer broker-timestamp history by compacting the cache (Visvalingam-Whyatt) and pruning samples below the lowest readable offset (the low watermark, or the earliest consumer offset when cluster monitoring is disabled) instead of evicting the oldest one. diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 7ec34c70c83e5..827bb8ef8e62f 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -69,6 +69,7 @@ def check(self, _): # Fetch the broker highwater offsets highwater_offsets = {} broker_timestamps = defaultdict(dict) + low_watermark_offsets = {} cluster_id = "" persistent_cache_key = "broker_timestamps_" consumer_contexts_count = self.count_consumer_contexts(consumer_offsets) @@ -89,9 +90,16 @@ def check(self, _): # Expected format: ({(topic, partition): offset}, cluster_id) highwater_offsets, cluster_id = self.get_highwater_offsets(partitions) if self._data_streams_enabled: + # Low watermark offsets are the physically meaningful floor for lag-in-time (and + # for pruning the cache), but require an extra broker call, so we only fetch them + # when cluster monitoring is enabled. + if self.config._cluster_monitoring_enabled: + low_watermark_offsets = self._get_low_watermark_offsets(consumer_offsets) broker_timestamps = self._load_broker_timestamps(persistent_cache_key) - earliest_consumer_offsets = self._earliest_consumer_offsets(consumer_offsets) - self._add_broker_timestamps(broker_timestamps, highwater_offsets, earliest_consumer_offsets) + # Prune cache entries below the lowest offset any consumer could read: the low + # watermark when we have it, otherwise the earliest committed consumer offset. + prune_floors = low_watermark_offsets or self._earliest_consumer_offsets(consumer_offsets) + self._add_broker_timestamps(broker_timestamps, highwater_offsets, prune_floors) self._save_broker_timestamps(broker_timestamps, persistent_cache_key) else: self.warning("Context limit reached. Skipping highwater offset collection.") @@ -125,16 +133,6 @@ def check(self, _): if self.config._kafka_cluster_id_override: cluster_id = self.config._kafka_cluster_id_override - # Low watermark offsets let us cap lag-in-time for consumers that have fallen out of - # retention. They are only needed (and only fetched) when cluster monitoring is enabled. - low_watermark_offsets = {} - if self.config._cluster_monitoring_enabled and self._data_streams_enabled: - consumer_partitions = {tp for offsets in consumer_offsets.values() for tp in offsets} - try: - low_watermark_offsets = self.client.get_low_watermark_offsets(consumer_partitions) - except Exception: - self.log.exception("There was a problem collecting the low watermark offsets.") - self.report_highwater_offsets(highwater_offsets, reporting_limit, cluster_id) self.report_consumer_offsets_and_lag( consumer_offsets, @@ -272,6 +270,15 @@ def _load_broker_timestamps(self, persistent_cache_key): self.log.warning('Could not read broker timestamps from cache: %s', str(e)) return broker_timestamps + def _get_low_watermark_offsets(self, consumer_offsets): + """Fetch low watermark (earliest) offsets for every consumed partition.""" + consumer_partitions = {tp for offsets in consumer_offsets.values() for tp in offsets} + try: + return self.client.get_low_watermark_offsets(consumer_partitions) + except Exception: + self.log.exception("There was a problem collecting the low watermark offsets.") + return {} + def _earliest_consumer_offsets(self, consumer_offsets): """Return the lowest committed offset per (topic, partition) across all consumer groups.""" earliest = {} @@ -281,8 +288,8 @@ def _earliest_consumer_offsets(self, consumer_offsets): earliest[topic_partition] = offset return earliest - def _add_broker_timestamps(self, broker_timestamps, highwater_offsets, earliest_consumer_offsets=None): - earliest_consumer_offsets = earliest_consumer_offsets or {} + def _add_broker_timestamps(self, broker_timestamps, highwater_offsets, prune_floors=None): + prune_floors = prune_floors or {} for (topic, partition), highwater_offset in highwater_offsets.items(): timestamps = broker_timestamps["{}_{}".format(topic, partition)] # If the highwater offset went backwards (topic recreated, @@ -295,11 +302,11 @@ def _add_broker_timestamps(self, broker_timestamps, highwater_offsets, earliest_ timestamps[highwater_offset] = time() # Once the cache fills up, prune and compact it instead of dropping a single entry. if len(timestamps) >= self._max_timestamps: - # Samples older than the earliest consumer offset are useless — no consumer will ever - # interpolate there — so drop them, keeping one anchor at/below that offset. - earliest_consumer_offset = earliest_consumer_offsets.get((topic, partition)) - if earliest_consumer_offset is not None: - _prune_below_anchor(timestamps, earliest_consumer_offset) + # Samples below the lowest offset any consumer can read are useless — no consumer will + # ever interpolate there — so drop them, keeping one anchor at/below that floor. + prune_floor = prune_floors.get((topic, partition)) + if prune_floor is not None: + _prune_below_anchor(timestamps, prune_floor) # Compact what remains down to half capacity. The Visvalingam-Whyatt simplification keeps # the oldest and newest samples and discards the points that least affect the # offset/timestamp curve, so the cache spans a longer history at a coarsening resolution. diff --git a/kafka_consumer/tests/test_unit.py b/kafka_consumer/tests/test_unit.py index 069fc4b899889..46e31f1e5149d 100644 --- a/kafka_consumer/tests/test_unit.py +++ b/kafka_consumer/tests/test_unit.py @@ -601,6 +601,28 @@ def test_report_lag_in_time_uses_low_watermark(kafka_instance, check, aggregator aggregator.assert_metric("kafka.estimated_consumer_lag", value=100.0, count=1) +def test_get_low_watermark_offsets_delegates_to_client(kafka_instance, check): + check = check(kafka_instance) + mock_client = seed_mock_client() + mock_client.get_low_watermark_offsets.return_value = {("topic1", 0): 5} + check.client = mock_client + + consumer_offsets = {"group1": {("topic1", 0): 10, ("topic1", 1): 20}} + result = check._get_low_watermark_offsets(consumer_offsets) + + assert result == {("topic1", 0): 5} + assert mock_client.get_low_watermark_offsets.call_args[0][0] == {("topic1", 0), ("topic1", 1)} + + +def test_get_low_watermark_offsets_handles_errors(kafka_instance, check): + check = check(kafka_instance) + mock_client = seed_mock_client() + mock_client.get_low_watermark_offsets.side_effect = Exception("boom") + check.client = mock_client + + assert check._get_low_watermark_offsets({"group1": {("topic1", 0): 10}}) == {} + + def test_count_consumer_contexts(check, kafka_instance): kafka_consumer_check = check(kafka_instance) consumer_offsets = { From ac84e55aaf8e9e46aeadc199a90682fde6a46b7f Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Thu, 25 Jun 2026 12:57:56 +0200 Subject: [PATCH 05/12] kafka_consumer: fetch low watermark offsets once and share them Previously the log-start (low watermark) offsets were fetched twice per run when cluster monitoring and data streams were both enabled: once by the metadata collector for partition.size/topic.size/throughput, and again by the lag path for the lag-in-time and cache-pruning floor. Fetch them once in check(), gated on cluster monitoring, over all non-internal topic partitions, and share the result with both the data-streams lag path and the metadata collector. Removes the duplicate list_offsets(earliest) call and the divergent internal-topic handling. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../datadog_checks/kafka_consumer/client.py | 20 +++++- .../kafka_consumer/cluster_metadata.py | 68 ++----------------- .../kafka_consumer/kafka_consumer.py | 26 ++++--- kafka_consumer/tests/test_cluster_metadata.py | 5 ++ kafka_consumer/tests/test_unit.py | 8 ++- 5 files changed, 51 insertions(+), 76 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client.py b/kafka_consumer/datadog_checks/kafka_consumer/client.py index 58962591e08e0..ecc77a72a0c2b 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client.py @@ -169,12 +169,19 @@ def consumer_offsets_for_times(self, partitions, offset=-1): return results def get_low_watermark_offsets(self, partitions): - """Batch-fetch log-start (low watermark) offsets via AdminClient.list_offsets(earliest).""" + """Batch-fetch log-start (low watermark) offsets via AdminClient.list_offsets(earliest). + + Uses ListOffsets with the EARLIEST_TIMESTAMP sentinel, which the broker services from + in-memory state without scanning .timeindex segment files. Failures surface as missing + entries so callers skip the earliest-dependent work for those partitions rather than + aborting the whole collection. + """ requests = {TopicPartition(topic, partition): OffsetSpec.earliest() for topic, partition in partitions} if not requests: return {} result = {} + errors = 0 try: futures = self.kafka_client.list_offsets( requests, @@ -185,10 +192,19 @@ def get_low_watermark_offsets(self, partitions): try: result[(tp.topic, tp.partition)] = future.result().offset except Exception as e: - self.log.debug("Failed to fetch low watermark for %s:%s: %s", tp.topic, tp.partition, e) + errors += 1 + if errors <= 3: + self.log.debug("Failed to fetch low watermark for %s:%s: %s", tp.topic, tp.partition, e) except Exception as e: self.log.warning("Failed to issue list_offsets request for low watermarks: %s", e) return {} + if errors: + self.log.warning( + "Failed to fetch low watermark for %d/%d partitions; " + "earliest-dependent metrics will be skipped for those partitions", + errors, + len(requests), + ) return result def _list_topics(self): diff --git a/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py b/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py index 684dc5b02c002..c336522285349 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py @@ -13,8 +13,7 @@ from typing import Any, NotRequired, TypedDict from urllib.parse import quote -from confluent_kafka import IsolationLevel, TopicPartition -from confluent_kafka.admin import ConfigResource, OffsetSpec, ResourceType +from confluent_kafka.admin import ConfigResource, ResourceType from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS @@ -221,7 +220,7 @@ def _parallel_fetch(self, fn: Callable[[str], Any], subjects: list[str], error_l self.log.warning("Error fetching %s for %s: %s", error_label, subject, e) return results - def collect_all_metadata(self, highwater_offsets): + def collect_all_metadata(self, highwater_offsets, low_watermark_offsets): try: shared_metadata = self.client.kafka_client.list_topics(timeout=self.config._request_timeout) except Exception as e: @@ -234,7 +233,7 @@ def collect_all_metadata(self, highwater_offsets): self.log.error("Error collecting broker metadata: %s", e) try: - self._collect_topic_metadata(shared_metadata, highwater_offsets) + self._collect_topic_metadata(shared_metadata, highwater_offsets, low_watermark_offsets) except Exception as e: self.log.error("Error collecting topic metadata: %s", e) @@ -534,62 +533,7 @@ def _collect_broker_metadata(self, metadata=None): "data-streams-message", ) - def _fetch_earliest_offsets(self, topic_partitions): - """Batch-fetch log-start offsets via AdminClient.list_offsets(earliest). - - Uses ListOffsets with the EARLIEST_TIMESTAMP sentinel, which the broker - services from in-memory state without scanning .timeindex segment files. - Failures are logged and surface as missing entries — the caller skips - the earliest-dependent metrics rather than aborting the whole topic - metadata collection. - """ - requests = { - TopicPartition(topic, partition): OffsetSpec.earliest() - for topic, partitions in topic_partitions.items() - if topic not in KAFKA_INTERNAL_TOPICS - for partition in partitions - } - if not requests: - return {} - - result = {} - errors = 0 - try: - futures = self.client.kafka_client.list_offsets( - requests, - isolation_level=IsolationLevel.READ_UNCOMMITTED, - request_timeout=self.config._request_timeout, - ) - for tp, future in futures.items(): - try: - info = future.result() - result[(tp.topic, tp.partition)] = info.offset - except Exception as e: - errors += 1 - if errors <= 3: - self.log.debug( - "Failed to fetch earliest offset for %s:%s: %s", - tp.topic, - tp.partition, - e, - ) - except Exception as e: - self.log.warning( - "Failed to issue list_offsets request; partition.beginning_offset, " - "partition.size, and topic.size will be skipped this run: %s", - e, - ) - return {} - if errors: - self.log.warning( - "Failed to fetch earliest offset for %d/%d partitions; " - "earliest-dependent metrics will be skipped for those partitions", - errors, - len(requests), - ) - return result - - def _collect_topic_metadata(self, metadata, highwater_offsets): + def _collect_topic_metadata(self, metadata, highwater_offsets, low_watermark_offsets): self.log.debug("Collecting topic metadata") topic_partitions = self.client.get_topic_partitions() @@ -603,7 +547,9 @@ def _collect_topic_metadata(self, metadata, highwater_offsets): self.check.gauge('topic.count', len(topic_partitions), tags=self._get_tags(cluster_id)) - earliest_offsets = self._fetch_earliest_offsets(topic_partitions) + # Low watermark (log-start) offsets are fetched once per run by the check and shared here, + # so partition.size, topic.size, and throughput reuse the same broker call as the lag path. + earliest_offsets = low_watermark_offsets now_ts = time.time() prev_ts = None diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 827bb8ef8e62f..1949df851508d 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -89,12 +89,13 @@ def check(self, _): partitions.add((topic, partition)) # Expected format: ({(topic, partition): offset}, cluster_id) highwater_offsets, cluster_id = self.get_highwater_offsets(partitions) + # Low watermark (log-start) offsets feed cluster-metadata metrics (partition.size, + # topic.size, throughput) and, when data streams is enabled, the lag-in-time and + # cache-pruning floor. They require an extra broker call, so fetch them once here and + # only under cluster monitoring, then share with the metadata collector below. + if self.config._cluster_monitoring_enabled: + low_watermark_offsets = self._get_low_watermark_offsets() if self._data_streams_enabled: - # Low watermark offsets are the physically meaningful floor for lag-in-time (and - # for pruning the cache), but require an extra broker call, so we only fetch them - # when cluster monitoring is enabled. - if self.config._cluster_monitoring_enabled: - low_watermark_offsets = self._get_low_watermark_offsets(consumer_offsets) broker_timestamps = self._load_broker_timestamps(persistent_cache_key) # Prune cache entries below the lowest offset any consumer could read: the low # watermark when we have it, otherwise the earliest committed consumer offset. @@ -147,7 +148,7 @@ def check(self, _): if self.config._cluster_monitoring_enabled: self._send_cluster_monitoring_heartbeat(total_contexts, cluster_id) try: - self.metadata_collector.collect_all_metadata(highwater_offsets) + self.metadata_collector.collect_all_metadata(highwater_offsets, low_watermark_offsets) except Exception as e: self.log.error("Error collecting cluster metadata: %s", e) @@ -270,11 +271,16 @@ def _load_broker_timestamps(self, persistent_cache_key): self.log.warning('Could not read broker timestamps from cache: %s', str(e)) return broker_timestamps - def _get_low_watermark_offsets(self, consumer_offsets): - """Fetch low watermark (earliest) offsets for every consumed partition.""" - consumer_partitions = {tp for offsets in consumer_offsets.values() for tp in offsets} + def _get_low_watermark_offsets(self): + """Fetch low watermark (earliest) offsets for every non-internal partition in the cluster.""" + partitions = { + (topic, partition) + for topic, parts in self.client.get_topic_partitions().items() + if topic not in KAFKA_INTERNAL_TOPICS + for partition in parts + } try: - return self.client.get_low_watermark_offsets(consumer_partitions) + return self.client.get_low_watermark_offsets(partitions) except Exception: self.log.exception("There was a problem collecting the low watermark offsets.") return {} diff --git a/kafka_consumer/tests/test_cluster_metadata.py b/kafka_consumer/tests/test_cluster_metadata.py index 878625d41b868..103fa480f0140 100644 --- a/kafka_consumer/tests/test_cluster_metadata.py +++ b/kafka_consumer/tests/test_cluster_metadata.py @@ -178,6 +178,11 @@ def mock_list_offsets(requests, **_kwargs): return result mock_admin_client.list_offsets.side_effect = mock_list_offsets + + def mock_low_watermark_offsets(partitions): + return {(topic, partition): 10 if partition == 0 else 20 for topic, partition in partitions} + + client.get_low_watermark_offsets.side_effect = mock_low_watermark_offsets client.consumer_get_cluster_id_and_list_topics.return_value = (cluster_id, [('test-topic', [0, 1])]) client.list_consumer_group_offsets.return_value = [] client.open_consumer.return_value = None diff --git a/kafka_consumer/tests/test_unit.py b/kafka_consumer/tests/test_unit.py index 46e31f1e5149d..bf748714779b7 100644 --- a/kafka_consumer/tests/test_unit.py +++ b/kafka_consumer/tests/test_unit.py @@ -604,23 +604,25 @@ def test_report_lag_in_time_uses_low_watermark(kafka_instance, check, aggregator def test_get_low_watermark_offsets_delegates_to_client(kafka_instance, check): check = check(kafka_instance) mock_client = seed_mock_client() + mock_client.get_topic_partitions.return_value = {"topic1": [0, 1], "__consumer_offsets": [0]} mock_client.get_low_watermark_offsets.return_value = {("topic1", 0): 5} check.client = mock_client - consumer_offsets = {"group1": {("topic1", 0): 10, ("topic1", 1): 20}} - result = check._get_low_watermark_offsets(consumer_offsets) + result = check._get_low_watermark_offsets() assert result == {("topic1", 0): 5} + # Internal topics are excluded and every non-internal partition is requested. assert mock_client.get_low_watermark_offsets.call_args[0][0] == {("topic1", 0), ("topic1", 1)} def test_get_low_watermark_offsets_handles_errors(kafka_instance, check): check = check(kafka_instance) mock_client = seed_mock_client() + mock_client.get_topic_partitions.return_value = {"topic1": [0]} mock_client.get_low_watermark_offsets.side_effect = Exception("boom") check.client = mock_client - assert check._get_low_watermark_offsets({"group1": {("topic1", 0): 10}}) == {} + assert check._get_low_watermark_offsets() == {} def test_count_consumer_contexts(check, kafka_instance): From 17b30ca80d9df3b7d4197887c8348fae5a932eb1 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Thu, 25 Jun 2026 14:08:55 +0200 Subject: [PATCH 06/12] kafka_consumer: reuse _fetch_earliest_offsets instead of a parallel fetch Drop the PR-added Client.get_low_watermark_offsets and the _get_low_watermark_offsets wrapper, which duplicated the existing ClusterMetadataCollector._fetch_earliest_offsets. The check now calls _fetch_earliest_offsets once under cluster monitoring and shares the result with both the data-streams lag/pruning path and the topic-metadata collection, so the earliest offsets are still fetched only once per run. This reverts client.py to master and keeps the cluster_metadata.py change to a small signature tweak. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../datadog_checks/kafka_consumer/client.py | 43 +------------- .../kafka_consumer/cluster_metadata.py | 58 ++++++++++++++++++- .../kafka_consumer/kafka_consumer.py | 18 +----- kafka_consumer/tests/test_cluster_metadata.py | 5 -- kafka_consumer/tests/test_unit.py | 24 -------- 5 files changed, 62 insertions(+), 86 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/client.py b/kafka_consumer/datadog_checks/kafka_consumer/client.py index ecc77a72a0c2b..d26c81beed385 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/client.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/client.py @@ -3,8 +3,8 @@ # Licensed under a 3-clause BSD style license (see LICENSE) from concurrent.futures import as_completed -from confluent_kafka import Consumer, ConsumerGroupTopicPartitions, IsolationLevel, KafkaException, TopicPartition -from confluent_kafka.admin import AdminClient, OffsetSpec +from confluent_kafka import Consumer, ConsumerGroupTopicPartitions, KafkaException, TopicPartition +from confluent_kafka.admin import AdminClient # AWS MSK IAM authentication support try: @@ -168,45 +168,6 @@ def consumer_offsets_for_times(self, partitions, offset=-1): results.append((tp.topic, tp.partition, tp.offset)) return results - def get_low_watermark_offsets(self, partitions): - """Batch-fetch log-start (low watermark) offsets via AdminClient.list_offsets(earliest). - - Uses ListOffsets with the EARLIEST_TIMESTAMP sentinel, which the broker services from - in-memory state without scanning .timeindex segment files. Failures surface as missing - entries so callers skip the earliest-dependent work for those partitions rather than - aborting the whole collection. - """ - requests = {TopicPartition(topic, partition): OffsetSpec.earliest() for topic, partition in partitions} - if not requests: - return {} - - result = {} - errors = 0 - try: - futures = self.kafka_client.list_offsets( - requests, - isolation_level=IsolationLevel.READ_UNCOMMITTED, - request_timeout=self.config._request_timeout, - ) - for tp, future in futures.items(): - try: - result[(tp.topic, tp.partition)] = future.result().offset - except Exception as e: - errors += 1 - if errors <= 3: - self.log.debug("Failed to fetch low watermark for %s:%s: %s", tp.topic, tp.partition, e) - except Exception as e: - self.log.warning("Failed to issue list_offsets request for low watermarks: %s", e) - return {} - if errors: - self.log.warning( - "Failed to fetch low watermark for %d/%d partitions; " - "earliest-dependent metrics will be skipped for those partitions", - errors, - len(requests), - ) - return result - def _list_topics(self): if self._cluster_metadata: return self._cluster_metadata diff --git a/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py b/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py index c336522285349..734f6be7cee90 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py @@ -13,7 +13,8 @@ from typing import Any, NotRequired, TypedDict from urllib.parse import quote -from confluent_kafka.admin import ConfigResource, ResourceType +from confluent_kafka import IsolationLevel, TopicPartition +from confluent_kafka.admin import ConfigResource, OffsetSpec, ResourceType from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS @@ -533,6 +534,61 @@ def _collect_broker_metadata(self, metadata=None): "data-streams-message", ) + def _fetch_earliest_offsets(self, topic_partitions): + """Batch-fetch log-start offsets via AdminClient.list_offsets(earliest). + + Uses ListOffsets with the EARLIEST_TIMESTAMP sentinel, which the broker + services from in-memory state without scanning .timeindex segment files. + Failures are logged and surface as missing entries — the caller skips + the earliest-dependent metrics rather than aborting the whole topic + metadata collection. + """ + requests = { + TopicPartition(topic, partition): OffsetSpec.earliest() + for topic, partitions in topic_partitions.items() + if topic not in KAFKA_INTERNAL_TOPICS + for partition in partitions + } + if not requests: + return {} + + result = {} + errors = 0 + try: + futures = self.client.kafka_client.list_offsets( + requests, + isolation_level=IsolationLevel.READ_UNCOMMITTED, + request_timeout=self.config._request_timeout, + ) + for tp, future in futures.items(): + try: + info = future.result() + result[(tp.topic, tp.partition)] = info.offset + except Exception as e: + errors += 1 + if errors <= 3: + self.log.debug( + "Failed to fetch earliest offset for %s:%s: %s", + tp.topic, + tp.partition, + e, + ) + except Exception as e: + self.log.warning( + "Failed to issue list_offsets request; partition.beginning_offset, " + "partition.size, and topic.size will be skipped this run: %s", + e, + ) + return {} + if errors: + self.log.warning( + "Failed to fetch earliest offset for %d/%d partitions; " + "earliest-dependent metrics will be skipped for those partitions", + errors, + len(requests), + ) + return result + def _collect_topic_metadata(self, metadata, highwater_offsets, low_watermark_offsets): self.log.debug("Collecting topic metadata") diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 1949df851508d..d4e42e06ebdb8 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -94,7 +94,9 @@ def check(self, _): # cache-pruning floor. They require an extra broker call, so fetch them once here and # only under cluster monitoring, then share with the metadata collector below. if self.config._cluster_monitoring_enabled: - low_watermark_offsets = self._get_low_watermark_offsets() + low_watermark_offsets = self.metadata_collector._fetch_earliest_offsets( + self.client.get_topic_partitions() + ) if self._data_streams_enabled: broker_timestamps = self._load_broker_timestamps(persistent_cache_key) # Prune cache entries below the lowest offset any consumer could read: the low @@ -271,20 +273,6 @@ def _load_broker_timestamps(self, persistent_cache_key): self.log.warning('Could not read broker timestamps from cache: %s', str(e)) return broker_timestamps - def _get_low_watermark_offsets(self): - """Fetch low watermark (earliest) offsets for every non-internal partition in the cluster.""" - partitions = { - (topic, partition) - for topic, parts in self.client.get_topic_partitions().items() - if topic not in KAFKA_INTERNAL_TOPICS - for partition in parts - } - try: - return self.client.get_low_watermark_offsets(partitions) - except Exception: - self.log.exception("There was a problem collecting the low watermark offsets.") - return {} - def _earliest_consumer_offsets(self, consumer_offsets): """Return the lowest committed offset per (topic, partition) across all consumer groups.""" earliest = {} diff --git a/kafka_consumer/tests/test_cluster_metadata.py b/kafka_consumer/tests/test_cluster_metadata.py index 103fa480f0140..878625d41b868 100644 --- a/kafka_consumer/tests/test_cluster_metadata.py +++ b/kafka_consumer/tests/test_cluster_metadata.py @@ -178,11 +178,6 @@ def mock_list_offsets(requests, **_kwargs): return result mock_admin_client.list_offsets.side_effect = mock_list_offsets - - def mock_low_watermark_offsets(partitions): - return {(topic, partition): 10 if partition == 0 else 20 for topic, partition in partitions} - - client.get_low_watermark_offsets.side_effect = mock_low_watermark_offsets client.consumer_get_cluster_id_and_list_topics.return_value = (cluster_id, [('test-topic', [0, 1])]) client.list_consumer_group_offsets.return_value = [] client.open_consumer.return_value = None diff --git a/kafka_consumer/tests/test_unit.py b/kafka_consumer/tests/test_unit.py index bf748714779b7..069fc4b899889 100644 --- a/kafka_consumer/tests/test_unit.py +++ b/kafka_consumer/tests/test_unit.py @@ -601,30 +601,6 @@ def test_report_lag_in_time_uses_low_watermark(kafka_instance, check, aggregator aggregator.assert_metric("kafka.estimated_consumer_lag", value=100.0, count=1) -def test_get_low_watermark_offsets_delegates_to_client(kafka_instance, check): - check = check(kafka_instance) - mock_client = seed_mock_client() - mock_client.get_topic_partitions.return_value = {"topic1": [0, 1], "__consumer_offsets": [0]} - mock_client.get_low_watermark_offsets.return_value = {("topic1", 0): 5} - check.client = mock_client - - result = check._get_low_watermark_offsets() - - assert result == {("topic1", 0): 5} - # Internal topics are excluded and every non-internal partition is requested. - assert mock_client.get_low_watermark_offsets.call_args[0][0] == {("topic1", 0), ("topic1", 1)} - - -def test_get_low_watermark_offsets_handles_errors(kafka_instance, check): - check = check(kafka_instance) - mock_client = seed_mock_client() - mock_client.get_topic_partitions.return_value = {"topic1": [0]} - mock_client.get_low_watermark_offsets.side_effect = Exception("boom") - check.client = mock_client - - assert check._get_low_watermark_offsets() == {} - - def test_count_consumer_contexts(check, kafka_instance): kafka_consumer_check = check(kafka_instance) consumer_offsets = { From 70abe3b83bd579001be287a0d0e0ee7e8a230c96 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Thu, 25 Jun 2026 14:25:44 +0200 Subject: [PATCH 07/12] kafka_consumer: use low_watermark_offsets directly in topic metadata Drop the redundant earliest_offsets alias and reference the passed-in low_watermark_offsets directly. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../datadog_checks/kafka_consumer/cluster_metadata.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py b/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py index 734f6be7cee90..a98f4c6289981 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py @@ -603,10 +603,6 @@ def _collect_topic_metadata(self, metadata, highwater_offsets, low_watermark_off self.check.gauge('topic.count', len(topic_partitions), tags=self._get_tags(cluster_id)) - # Low watermark (log-start) offsets are fetched once per run by the check and shared here, - # so partition.size, topic.size, and throughput reuse the same broker call as the lag path. - earliest_offsets = low_watermark_offsets - now_ts = time.time() prev_ts = None previous_partition_offsets = {} @@ -646,7 +642,7 @@ def _collect_topic_metadata(self, metadata, highwater_offsets, low_watermark_off partition_metadata = topic_metadata.partitions.get(partition_id) latest = highwater_offsets.get((topic_name, partition_id), 0) - earliest = earliest_offsets.get((topic_name, partition_id)) + earliest = low_watermark_offsets.get((topic_name, partition_id)) if earliest is None: have_all_earliest = False From 686a373954fefe5db0889c276e08823dc89a06d6 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Thu, 25 Jun 2026 14:52:09 +0200 Subject: [PATCH 08/12] kafka_consumer: address review feedback on lag bounding - Clarify that the left-extrapolation cap bounds lag-in-time regardless of cluster monitoring or the low-watermark floor, and document why there is no symmetric right-side clamp (the newest cached sample is the just-collected highwater, which the consumer offset can never exceed). - Promote ClusterMetadataCollector.fetch_earliest_offsets to a public method since KafkaCheck now calls it across the class boundary. - Log a debug line when the cache-prune floor falls back from the low watermark to the earliest consumer offset. - Extract the Visvalingam-Whyatt significance closure into a module-level _interpolation_error helper. - Parameterize the _visvalingam_whyatt tests; add direct tests for _earliest_consumer_offsets, _prune_below_anchor, and the left-extrapolation cap through report_consumer_offsets_and_lag without a low watermark. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../kafka_consumer/cluster_metadata.py | 2 +- .../kafka_consumer/kafka_consumer.py | 33 +++-- kafka_consumer/tests/test_unit.py | 130 +++++++++++++++--- 3 files changed, 133 insertions(+), 32 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py b/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py index a98f4c6289981..3ee7038317c9d 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py @@ -534,7 +534,7 @@ def _collect_broker_metadata(self, metadata=None): "data-streams-message", ) - def _fetch_earliest_offsets(self, topic_partitions): + def fetch_earliest_offsets(self, topic_partitions): """Batch-fetch log-start offsets via AdminClient.list_offsets(earliest). Uses ListOffsets with the EARLIEST_TIMESTAMP sentinel, which the broker diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index d4e42e06ebdb8..82e3ed297b54d 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -94,14 +94,18 @@ def check(self, _): # cache-pruning floor. They require an extra broker call, so fetch them once here and # only under cluster monitoring, then share with the metadata collector below. if self.config._cluster_monitoring_enabled: - low_watermark_offsets = self.metadata_collector._fetch_earliest_offsets( + low_watermark_offsets = self.metadata_collector.fetch_earliest_offsets( self.client.get_topic_partitions() ) if self._data_streams_enabled: broker_timestamps = self._load_broker_timestamps(persistent_cache_key) # Prune cache entries below the lowest offset any consumer could read: the low # watermark when we have it, otherwise the earliest committed consumer offset. - prune_floors = low_watermark_offsets or self._earliest_consumer_offsets(consumer_offsets) + if low_watermark_offsets: + prune_floors = low_watermark_offsets + else: + self.log.debug("No low watermarks available; pruning cache by earliest consumer offset") + prune_floors = self._earliest_consumer_offsets(consumer_offsets) self._add_broker_timestamps(broker_timestamps, highwater_offsets, prune_floors) self._save_broker_timestamps(broker_timestamps, persistent_cache_key) else: @@ -531,9 +535,14 @@ def _get_interpolated_timestamp(timestamps, offset): timestamp = slope * (offset - offset_after) + timestamp_after if offset < offset_before: - # Extrapolating to the left of the oldest cached sample: clamp the timestamp so the - # reported lag stays bounded instead of growing arbitrarily with the extrapolation. + # Extrapolating to the left of the oldest cached sample: clamp the timestamp so the reported + # lag stays bounded (at most LAG_EXTRAPOLATION_LIMIT_SECONDS beyond the cache window) instead + # of growing arbitrarily with the extrapolation. This bound is independent of cluster + # monitoring and the low-watermark floor — it always applies to lag-in-time. timestamp = max(timestamp, timestamp_before - LAG_EXTRAPOLATION_LIMIT_SECONDS) + # No symmetric right-side clamp: the newest cached sample is the highwater offset we just + # collected this run, and a consumer offset can never exceed the highwater, so extrapolating + # to the right of the newest sample is unreachable here. return timestamp @@ -571,16 +580,11 @@ def _visvalingam_whyatt(timestamps, target_count): nxt = {o: (offsets[i + 1] if i < len(offsets) - 1 else None) for i, o in enumerate(offsets)} alive = set(offsets) - def significance(o): - before, after = prev[o], nxt[o] - predicted = timestamps[before] + (timestamps[after] - timestamps[before]) * (o - before) / (after - before) - return abs(timestamps[o] - predicted) - current = {} heap = [] for o in offsets: if prev[o] is not None and nxt[o] is not None: - current[o] = significance(o) + current[o] = _interpolation_error(o, prev, nxt, timestamps) heap.append((current[o], o)) heapq.heapify(heap) @@ -597,6 +601,13 @@ def significance(o): nxt[before], prev[after] = after, before for neighbor in (before, after): if prev[neighbor] is not None and nxt[neighbor] is not None: - current[neighbor] = significance(neighbor) + current[neighbor] = _interpolation_error(neighbor, prev, nxt, timestamps) heapq.heappush(heap, (current[neighbor], neighbor)) return timestamps + + +def _interpolation_error(o, prev, nxt, timestamps): + """Time error introduced by reconstructing sample ``o`` from its neighbors via linear interpolation.""" + before, after = prev[o], nxt[o] + predicted = timestamps[before] + (timestamps[after] - timestamps[before]) * (o - before) / (after - before) + return abs(timestamps[o] - predicted) diff --git a/kafka_consumer/tests/test_unit.py b/kafka_consumer/tests/test_unit.py index 069fc4b899889..37d9d6e3a4a08 100644 --- a/kafka_consumer/tests/test_unit.py +++ b/kafka_consumer/tests/test_unit.py @@ -10,7 +10,11 @@ from datadog_checks.kafka_consumer import KafkaCheck from datadog_checks.kafka_consumer.client import KafkaClient -from datadog_checks.kafka_consumer.kafka_consumer import _get_interpolated_timestamp, _visvalingam_whyatt +from datadog_checks.kafka_consumer.kafka_consumer import ( + _get_interpolated_timestamp, + _prune_below_anchor, + _visvalingam_whyatt, +) pytestmark = [pytest.mark.unit] @@ -454,25 +458,36 @@ def test_get_interpolated_timestamp_caps_left_extrapolation(): assert _get_interpolated_timestamp({1000: 10000, 1100: 10100}, 1050) == 10050 -def test_visvalingam_whyatt_keeps_endpoints_and_drops_collinear(): - # A perfectly linear series: every interior point is redundant, so any of them can go. - timestamps = {0: 0.0, 10: 10.0, 20: 20.0, 30: 30.0, 40: 40.0} - _visvalingam_whyatt(timestamps, 3) - assert len(timestamps) == 3 - assert 0 in timestamps and 40 in timestamps # endpoints are always preserved - - -def test_visvalingam_whyatt_keeps_rate_changes(): - # Constant rate up to offset 20, then a sharp acceleration: 10 is redundant, the kink at 20 isn't. - timestamps = {0: 0.0, 10: 10.0, 20: 20.0, 30: 100.0} - _visvalingam_whyatt(timestamps, 3) - assert set(timestamps) == {0, 20, 30} - - -def test_visvalingam_whyatt_noop_when_within_target(): - timestamps = {0: 0.0, 10: 10.0} - _visvalingam_whyatt(timestamps, 5) - assert timestamps == {0: 0.0, 10: 10.0} +@pytest.mark.parametrize( + 'timestamps, target_count, expected_len, required_offsets', + [ + pytest.param( + {0: 0.0, 10: 10.0, 20: 20.0, 30: 30.0, 40: 40.0}, + 3, + 3, + {0, 40}, + id='linear series drops collinear interior points, keeps endpoints', + ), + pytest.param( + {0: 0.0, 10: 10.0, 20: 20.0, 30: 100.0}, + 3, + 3, + {0, 20, 30}, + id='keeps the rate-change kink at offset 20, drops redundant offset 10', + ), + pytest.param( + {0: 0.0, 10: 10.0}, + 5, + 2, + {0, 10}, + id='no-op when already within target', + ), + ], +) +def test_visvalingam_whyatt(timestamps, target_count, expected_len, required_offsets): + _visvalingam_whyatt(timestamps, target_count) + assert len(timestamps) == expected_len + assert required_offsets <= set(timestamps) @pytest.mark.parametrize( @@ -601,6 +616,81 @@ def test_report_lag_in_time_uses_low_watermark(kafka_instance, check, aggregator aggregator.assert_metric("kafka.estimated_consumer_lag", value=100.0, count=1) +def test_report_lag_in_time_caps_left_extrapolation_without_low_watermark(kafka_instance, check, aggregator): + # With cluster monitoring off there is no low watermark, so the consumer offset is used as-is. + # When it predates every cached sample, the lag is still bounded by the left-extrapolation cap + # (cache window + LAG_EXTRAPOLATION_LIMIT_SECONDS) rather than growing without limit. + kafka_instance['data_streams_enabled'] = True + check = check(kafka_instance) + mock_client = seed_mock_client() + mock_client.get_partitions_for_topic.return_value = [0] + check.client = mock_client + + consumer_offsets = {"consumer_group1": {("topic1", 0): 0}} + highwater_offsets = {("topic1", 0): 1100} + broker_timestamps = {"topic1_0": {1000: 10000.0, 1100: 10100.0}} + + check.report_consumer_offsets_and_lag( + consumer_offsets, + highwater_offsets, + float('inf'), + broker_timestamps, + "cluster_id", + ) + + # Unclamped extrapolation would give consumer_timestamp 9000 (lag 1100). The cap raises it to + # oldest_sample - 600 = 9400, so lag = 10100 - 9400 = 700 (cache window 100 + 600s budget). + aggregator.assert_metric("kafka.estimated_consumer_lag", value=700.0, count=1) + + +def test_earliest_consumer_offsets_takes_cross_group_min(kafka_instance, check): + # The pruning floor is the lowest committed offset for a partition across every consumer group. + check = check(kafka_instance) + consumer_offsets = { + "group1": {("topic1", 0): 10, ("topic1", 1): 50}, + "group2": {("topic1", 0): 5, ("topic2", 0): 99}, + } + assert check._earliest_consumer_offsets(consumer_offsets) == { + ("topic1", 0): 5, + ("topic1", 1): 50, + ("topic2", 0): 99, + } + + +@pytest.mark.parametrize( + 'timestamps, floor, expected_offsets', + [ + pytest.param( + {10: 1.0, 20: 2.0, 30: 3.0, 40: 4.0}, + 35, + {30, 40}, + id='keeps one anchor (max below floor) and everything at/above it', + ), + pytest.param( + {10: 1.0, 20: 2.0, 30: 3.0}, + 30, + {20, 30}, + id='offset equal to floor is retained, not pruned', + ), + pytest.param( + {30: 3.0, 40: 4.0}, + 10, + {30, 40}, + id='no samples below floor: nothing pruned', + ), + pytest.param( + {20: 2.0, 30: 3.0}, + 25, + {20, 30}, + id='single sample below floor is the anchor and is kept', + ), + ], +) +def test_prune_below_anchor(timestamps, floor, expected_offsets): + _prune_below_anchor(timestamps, floor) + assert set(timestamps) == expected_offsets + + def test_count_consumer_contexts(check, kafka_instance): kafka_consumer_check = check(kafka_instance) consumer_offsets = { From 6559b08f25b0f0b81ad97f4d38abc65d173df8e6 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Thu, 25 Jun 2026 15:05:39 +0200 Subject: [PATCH 09/12] kafka_consumer: trim comments to a single note on the extrapolation cap Co-Authored-By: Claude Opus 4.8 (1M context) --- .../kafka_consumer/kafka_consumer.py | 43 +------------------ 1 file changed, 1 insertion(+), 42 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 82e3ed297b54d..8c28b7ece46d5 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -18,9 +18,7 @@ MAX_TIMESTAMPS = 1000 -# When the consumer offset predates the oldest cached broker sample we extrapolate into the past, -# where the affine offset/timestamp assumption is unreliable. Cap how far back we extrapolate so the -# reported estimated_consumer_lag never exceeds the cached history window by more than this many seconds. +# Cap how far past the oldest cached sample we extrapolate, so estimated lag stays bounded. LAG_EXTRAPOLATION_LIMIT_SECONDS = 600 @@ -89,18 +87,12 @@ def check(self, _): partitions.add((topic, partition)) # Expected format: ({(topic, partition): offset}, cluster_id) highwater_offsets, cluster_id = self.get_highwater_offsets(partitions) - # Low watermark (log-start) offsets feed cluster-metadata metrics (partition.size, - # topic.size, throughput) and, when data streams is enabled, the lag-in-time and - # cache-pruning floor. They require an extra broker call, so fetch them once here and - # only under cluster monitoring, then share with the metadata collector below. if self.config._cluster_monitoring_enabled: low_watermark_offsets = self.metadata_collector.fetch_earliest_offsets( self.client.get_topic_partitions() ) if self._data_streams_enabled: broker_timestamps = self._load_broker_timestamps(persistent_cache_key) - # Prune cache entries below the lowest offset any consumer could read: the low - # watermark when we have it, otherwise the earliest committed consumer offset. if low_watermark_offsets: prune_floors = low_watermark_offsets else: @@ -298,16 +290,10 @@ def _add_broker_timestamps(self, broker_timestamps, highwater_offsets, prune_flo for o in stale: del timestamps[o] timestamps[highwater_offset] = time() - # Once the cache fills up, prune and compact it instead of dropping a single entry. if len(timestamps) >= self._max_timestamps: - # Samples below the lowest offset any consumer can read are useless — no consumer will - # ever interpolate there — so drop them, keeping one anchor at/below that floor. prune_floor = prune_floors.get((topic, partition)) if prune_floor is not None: _prune_below_anchor(timestamps, prune_floor) - # Compact what remains down to half capacity. The Visvalingam-Whyatt simplification keeps - # the oldest and newest samples and discards the points that least affect the - # offset/timestamp curve, so the cache spans a longer history at a coarsening resolution. _visvalingam_whyatt(timestamps, max(2, self._max_timestamps // 2)) def _save_broker_timestamps(self, broker_timestamps, persistent_cache_key): @@ -414,9 +400,6 @@ def report_consumer_offsets_and_lag( timestamps = broker_timestamps["{}_{}".format(topic, partition)] # The producer timestamp can be not set if there was an error fetching broker offsets. producer_timestamp = timestamps.get(producer_offset, None) - # A consumer that has fallen behind the low watermark can never read the messages - # between its committed offset and the low watermark (they are out of retention), so - # its effective position for lag-in-time is the low watermark. low_watermark = low_watermark_offsets.get((topic, partition)) effective_offset = consumer_offset if low_watermark is None else max(consumer_offset, low_watermark) consumer_timestamp = _get_interpolated_timestamp(timestamps, effective_offset) @@ -535,23 +518,11 @@ def _get_interpolated_timestamp(timestamps, offset): timestamp = slope * (offset - offset_after) + timestamp_after if offset < offset_before: - # Extrapolating to the left of the oldest cached sample: clamp the timestamp so the reported - # lag stays bounded (at most LAG_EXTRAPOLATION_LIMIT_SECONDS beyond the cache window) instead - # of growing arbitrarily with the extrapolation. This bound is independent of cluster - # monitoring and the low-watermark floor — it always applies to lag-in-time. timestamp = max(timestamp, timestamp_before - LAG_EXTRAPOLATION_LIMIT_SECONDS) - # No symmetric right-side clamp: the newest cached sample is the highwater offset we just - # collected this run, and a consumer offset can never exceed the highwater, so extrapolating - # to the right of the newest sample is unreachable here. return timestamp def _prune_below_anchor(timestamps, floor): - """Drop cached samples strictly below the largest offset at or below ``floor``, in place. - - Samples below the earliest offset any consumer needs are useless, but we keep one anchor at or - below ``floor`` so the most-lagging consumer can still interpolate (rather than extrapolate). - """ below = [o for o in timestamps if o < floor] if len(below) <= 1: return @@ -562,16 +533,6 @@ def _prune_below_anchor(timestamps, floor): def _visvalingam_whyatt(timestamps, target_count): - """Downsample an offset->timestamp series to ``target_count`` points in place. - - Implements the Visvalingam-Whyatt line simplification algorithm: the two endpoints (the oldest - and newest samples) are always kept, and the interior point whose removal least distorts the - linear offset/timestamp interpolation is dropped repeatedly until the target size is reached. - A point's "significance" is the time error introduced by reconstructing it from its neighbors. - - Runs in O(n log n) using a min-heap keyed by significance, a doubly-linked list of neighbors, - and lazy deletion of heap entries that become stale when a neighbor is removed. - """ if len(timestamps) <= target_count: return timestamps @@ -592,7 +553,6 @@ def _visvalingam_whyatt(timestamps, target_count): while remaining > target_count and heap: error, o = heapq.heappop(heap) if o not in alive or error != current.get(o): - # Stale entry: the point was already removed, or its significance was recomputed. continue before, after = prev[o], nxt[o] alive.discard(o) @@ -607,7 +567,6 @@ def _visvalingam_whyatt(timestamps, target_count): def _interpolation_error(o, prev, nxt, timestamps): - """Time error introduced by reconstructing sample ``o`` from its neighbors via linear interpolation.""" before, after = prev[o], nxt[o] predicted = timestamps[before] + (timestamps[after] - timestamps[before]) * (o - before) / (after - before) return abs(timestamps[o] - predicted) From b9fde56a662ccf4a9ba60bd6033675aab1ae32b0 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Thu, 25 Jun 2026 15:11:25 +0200 Subject: [PATCH 10/12] kafka_consumer: move extrapolation-cap comment to the clamp line Co-Authored-By: Claude Opus 4.8 (1M context) --- kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 8c28b7ece46d5..8d4223fe76b7b 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -18,7 +18,6 @@ MAX_TIMESTAMPS = 1000 -# Cap how far past the oldest cached sample we extrapolate, so estimated lag stays bounded. LAG_EXTRAPOLATION_LIMIT_SECONDS = 600 @@ -518,6 +517,7 @@ def _get_interpolated_timestamp(timestamps, offset): timestamp = slope * (offset - offset_after) + timestamp_after if offset < offset_before: + # Cap how far past the oldest cached sample we extrapolate, so estimated lag stays bounded. timestamp = max(timestamp, timestamp_before - LAG_EXTRAPOLATION_LIMIT_SECONDS) return timestamp From 83c37e4133a2369c3f6d23342afba3c6a7cb573d Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Thu, 25 Jun 2026 15:15:34 +0200 Subject: [PATCH 11/12] kafka_consumer: reuse fetched topic partitions in topic metadata collection Pass the topic-partition map computed in check() through collect_all_metadata into _collect_topic_metadata instead of fetching it again, so the cluster monitoring path makes the same number of get_topic_partitions calls as before. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../datadog_checks/kafka_consumer/cluster_metadata.py | 8 +++----- .../datadog_checks/kafka_consumer/kafka_consumer.py | 10 ++++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py b/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py index 3ee7038317c9d..272467baf2a9c 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py @@ -221,7 +221,7 @@ def _parallel_fetch(self, fn: Callable[[str], Any], subjects: list[str], error_l self.log.warning("Error fetching %s for %s: %s", error_label, subject, e) return results - def collect_all_metadata(self, highwater_offsets, low_watermark_offsets): + def collect_all_metadata(self, highwater_offsets, low_watermark_offsets, topic_partitions): try: shared_metadata = self.client.kafka_client.list_topics(timeout=self.config._request_timeout) except Exception as e: @@ -234,7 +234,7 @@ def collect_all_metadata(self, highwater_offsets, low_watermark_offsets): self.log.error("Error collecting broker metadata: %s", e) try: - self._collect_topic_metadata(shared_metadata, highwater_offsets, low_watermark_offsets) + self._collect_topic_metadata(shared_metadata, highwater_offsets, low_watermark_offsets, topic_partitions) except Exception as e: self.log.error("Error collecting topic metadata: %s", e) @@ -589,11 +589,9 @@ def fetch_earliest_offsets(self, topic_partitions): ) return result - def _collect_topic_metadata(self, metadata, highwater_offsets, low_watermark_offsets): + def _collect_topic_metadata(self, metadata, highwater_offsets, low_watermark_offsets, topic_partitions): self.log.debug("Collecting topic metadata") - topic_partitions = self.client.get_topic_partitions() - cluster_id = self.config._kafka_cluster_id_override or ( metadata.cluster_id if hasattr(metadata, 'cluster_id') else 'unknown' ) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 8d4223fe76b7b..6ba292e9e6e1b 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -67,6 +67,7 @@ def check(self, _): highwater_offsets = {} broker_timestamps = defaultdict(dict) low_watermark_offsets = {} + topic_partitions = {} cluster_id = "" persistent_cache_key = "broker_timestamps_" consumer_contexts_count = self.count_consumer_contexts(consumer_offsets) @@ -87,9 +88,8 @@ def check(self, _): # Expected format: ({(topic, partition): offset}, cluster_id) highwater_offsets, cluster_id = self.get_highwater_offsets(partitions) if self.config._cluster_monitoring_enabled: - low_watermark_offsets = self.metadata_collector.fetch_earliest_offsets( - self.client.get_topic_partitions() - ) + topic_partitions = self.client.get_topic_partitions() + low_watermark_offsets = self.metadata_collector.fetch_earliest_offsets(topic_partitions) if self._data_streams_enabled: broker_timestamps = self._load_broker_timestamps(persistent_cache_key) if low_watermark_offsets: @@ -145,7 +145,9 @@ def check(self, _): if self.config._cluster_monitoring_enabled: self._send_cluster_monitoring_heartbeat(total_contexts, cluster_id) try: - self.metadata_collector.collect_all_metadata(highwater_offsets, low_watermark_offsets) + self.metadata_collector.collect_all_metadata( + highwater_offsets, low_watermark_offsets, topic_partitions + ) except Exception as e: self.log.error("Error collecting cluster metadata: %s", e) From 9f76e7821a0c4b7592155549a2cb0fd631e38060 Mon Sep 17 00:00:00 2001 From: Piotr Wolski Date: Thu, 25 Jun 2026 15:19:07 +0200 Subject: [PATCH 12/12] kafka_consumer: satisfy ruff formatting for collect_all_metadata call Co-Authored-By: Claude Opus 4.8 (1M context) --- .../datadog_checks/kafka_consumer/kafka_consumer.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 6ba292e9e6e1b..20e2e30b72302 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -145,9 +145,7 @@ def check(self, _): if self.config._cluster_monitoring_enabled: self._send_cluster_monitoring_heartbeat(total_contexts, cluster_id) try: - self.metadata_collector.collect_all_metadata( - highwater_offsets, low_watermark_offsets, topic_partitions - ) + self.metadata_collector.collect_all_metadata(highwater_offsets, low_watermark_offsets, topic_partitions) except Exception as e: self.log.error("Error collecting cluster metadata: %s", e)