diff --git a/kafka_consumer/changelog.d/24167.fixed b/kafka_consumer/changelog.d/24167.fixed new file mode 100644 index 0000000000000..3898228f1d51c --- /dev/null +++ b/kafka_consumer/changelog.d/24167.fixed @@ -0,0 +1 @@ +Improve the accuracy of ``estimated_consumer_lag`` for consumers that are far behind: cap extrapolation for offsets older than the cached broker history, use the low watermark as a floor for the lag offset when cluster monitoring is enabled, and retain a longer broker-timestamp history by compacting the cache (Visvalingam-Whyatt) and pruning samples below the lowest readable offset (the low watermark, or the earliest consumer offset when cluster monitoring is disabled) instead of evicting the oldest one. diff --git a/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py b/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py index 684dc5b02c002..272467baf2a9c 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/cluster_metadata.py @@ -221,7 +221,7 @@ def _parallel_fetch(self, fn: Callable[[str], Any], subjects: list[str], error_l self.log.warning("Error fetching %s for %s: %s", error_label, subject, e) return results - def collect_all_metadata(self, highwater_offsets): + def collect_all_metadata(self, highwater_offsets, low_watermark_offsets, topic_partitions): try: shared_metadata = self.client.kafka_client.list_topics(timeout=self.config._request_timeout) except Exception as e: @@ -234,7 +234,7 @@ def collect_all_metadata(self, highwater_offsets): self.log.error("Error collecting broker metadata: %s", e) try: - self._collect_topic_metadata(shared_metadata, highwater_offsets) + self._collect_topic_metadata(shared_metadata, highwater_offsets, low_watermark_offsets, topic_partitions) except Exception as e: self.log.error("Error collecting topic metadata: %s", e) @@ -534,7 +534,7 @@ def _collect_broker_metadata(self, metadata=None): 
"data-streams-message", ) - def _fetch_earliest_offsets(self, topic_partitions): + def fetch_earliest_offsets(self, topic_partitions): """Batch-fetch log-start offsets via AdminClient.list_offsets(earliest). Uses ListOffsets with the EARLIEST_TIMESTAMP sentinel, which the broker @@ -589,11 +589,9 @@ def _fetch_earliest_offsets(self, topic_partitions): ) return result - def _collect_topic_metadata(self, metadata, highwater_offsets): + def _collect_topic_metadata(self, metadata, highwater_offsets, low_watermark_offsets, topic_partitions): self.log.debug("Collecting topic metadata") - topic_partitions = self.client.get_topic_partitions() - cluster_id = self.config._kafka_cluster_id_override or ( metadata.cluster_id if hasattr(metadata, 'cluster_id') else 'unknown' ) @@ -603,8 +601,6 @@ def _collect_topic_metadata(self, metadata, highwater_offsets): self.check.gauge('topic.count', len(topic_partitions), tags=self._get_tags(cluster_id)) - earliest_offsets = self._fetch_earliest_offsets(topic_partitions) - now_ts = time.time() prev_ts = None previous_partition_offsets = {} @@ -644,7 +640,7 @@ def _collect_topic_metadata(self, metadata, highwater_offsets): partition_metadata = topic_metadata.partitions.get(partition_id) latest = highwater_offsets.get((topic_name, partition_id), 0) - earliest = earliest_offsets.get((topic_name, partition_id)) + earliest = low_watermark_offsets.get((topic_name, partition_id)) if earliest is None: have_all_earliest = False diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 0ac2e213a5748..20e2e30b72302 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -1,6 +1,7 @@ # (C) Datadog, Inc. 
2019-present # All rights reserved # Licensed under Simplified BSD License (see LICENSE) +import heapq import json from collections import defaultdict from time import time @@ -17,6 +18,8 @@ MAX_TIMESTAMPS = 1000 +LAG_EXTRAPOLATION_LIMIT_SECONDS = 600 + class KafkaCheck(AgentCheck): __NAMESPACE__ = 'kafka' @@ -63,6 +66,8 @@ def check(self, _): # Fetch the broker highwater offsets highwater_offsets = {} broker_timestamps = defaultdict(dict) + low_watermark_offsets = {} + topic_partitions = {} cluster_id = "" persistent_cache_key = "broker_timestamps_" consumer_contexts_count = self.count_consumer_contexts(consumer_offsets) @@ -82,9 +87,17 @@ def check(self, _): partitions.add((topic, partition)) # Expected format: ({(topic, partition): offset}, cluster_id) highwater_offsets, cluster_id = self.get_highwater_offsets(partitions) + if self.config._cluster_monitoring_enabled: + topic_partitions = self.client.get_topic_partitions() + low_watermark_offsets = self.metadata_collector.fetch_earliest_offsets(topic_partitions) if self._data_streams_enabled: broker_timestamps = self._load_broker_timestamps(persistent_cache_key) - self._add_broker_timestamps(broker_timestamps, highwater_offsets) + if low_watermark_offsets: + prune_floors = low_watermark_offsets + else: + self.log.debug("No low watermarks available; pruning cache by earliest consumer offset") + prune_floors = self._earliest_consumer_offsets(consumer_offsets) + self._add_broker_timestamps(broker_timestamps, highwater_offsets, prune_floors) self._save_broker_timestamps(broker_timestamps, persistent_cache_key) else: self.warning("Context limit reached. 
Skipping highwater offset collection.") @@ -125,13 +138,14 @@ def check(self, _): reporting_limit - len(highwater_offsets), broker_timestamps, cluster_id, + low_watermark_offsets, ) # Collect cluster metadata if enabled if self.config._cluster_monitoring_enabled: self._send_cluster_monitoring_heartbeat(total_contexts, cluster_id) try: - self.metadata_collector.collect_all_metadata(highwater_offsets) + self.metadata_collector.collect_all_metadata(highwater_offsets, low_watermark_offsets, topic_partitions) except Exception as e: self.log.error("Error collecting cluster metadata: %s", e) @@ -254,7 +268,17 @@ def _load_broker_timestamps(self, persistent_cache_key): self.log.warning('Could not read broker timestamps from cache: %s', str(e)) return broker_timestamps - def _add_broker_timestamps(self, broker_timestamps, highwater_offsets): + def _earliest_consumer_offsets(self, consumer_offsets): + """Return the lowest committed offset per (topic, partition) across all consumer groups.""" + earliest = {} + for offsets in consumer_offsets.values(): + for topic_partition, offset in offsets.items(): + if topic_partition not in earliest or offset < earliest[topic_partition]: + earliest[topic_partition] = offset + return earliest + + def _add_broker_timestamps(self, broker_timestamps, highwater_offsets, prune_floors=None): + prune_floors = prune_floors or {} for (topic, partition), highwater_offset in highwater_offsets.items(): timestamps = broker_timestamps["{}_{}".format(topic, partition)] # If the highwater offset went backwards (topic recreated, @@ -265,11 +289,11 @@ def _add_broker_timestamps(self, broker_timestamps, highwater_offsets): for o in stale: del timestamps[o] timestamps[highwater_offset] = time() - # If there's too many timestamps, we delete the oldest one (by - # timestamp, not by offset — evicting by min offset would discard - # the fresh post-reset entries and keep poisonous stale ones). 
- if len(timestamps) > self._max_timestamps: - del timestamps[min(timestamps, key=timestamps.get)] + if len(timestamps) >= self._max_timestamps: + prune_floor = prune_floors.get((topic, partition)) + if prune_floor is not None: + _prune_below_anchor(timestamps, prune_floor) + _visvalingam_whyatt(timestamps, max(2, self._max_timestamps // 2)) def _save_broker_timestamps(self, broker_timestamps, persistent_cache_key): """Saves broker timestamps to persistent cache.""" @@ -292,9 +316,16 @@ def report_highwater_offsets(self, highwater_offsets, contexts_limit, cluster_id self.log.debug('%s highwater offsets reported', reported_contexts) def report_consumer_offsets_and_lag( - self, consumer_offsets, highwater_offsets, contexts_limit, broker_timestamps, cluster_id + self, + consumer_offsets, + highwater_offsets, + contexts_limit, + broker_timestamps, + cluster_id, + low_watermark_offsets=None, ): """Report the consumer offsets and consumer lag.""" + low_watermark_offsets = low_watermark_offsets or {} reported_contexts = 0 self.log.debug("Reporting consumer offsets and lag metrics") for consumer_group, offsets in consumer_offsets.items(): @@ -368,7 +399,9 @@ def report_consumer_offsets_and_lag( timestamps = broker_timestamps["{}_{}".format(topic, partition)] # The producer timestamp can be not set if there was an error fetching broker offsets. 
producer_timestamp = timestamps.get(producer_offset, None) - consumer_timestamp = _get_interpolated_timestamp(timestamps, consumer_offset) + low_watermark = low_watermark_offsets.get((topic, partition)) + effective_offset = consumer_offset if low_watermark is None else max(consumer_offset, low_watermark) + consumer_timestamp = _get_interpolated_timestamp(timestamps, effective_offset) if consumer_timestamp is None or producer_timestamp is None: continue lag = producer_timestamp - consumer_timestamp @@ -482,4 +515,58 @@ def _get_interpolated_timestamp(timestamps, offset): timestamp_after = timestamps[offset_after] slope = (timestamp_after - timestamp_before) / float(offset_after - offset_before) timestamp = slope * (offset - offset_after) + timestamp_after + + if offset < offset_before: + # Cap how far past the oldest cached sample we extrapolate, so estimated lag stays bounded. + timestamp = max(timestamp, timestamp_before - LAG_EXTRAPOLATION_LIMIT_SECONDS) return timestamp + + +def _prune_below_anchor(timestamps, floor): + below = [o for o in timestamps if o < floor] + if len(below) <= 1: + return + anchor = max(below) + for o in below: + if o != anchor: + del timestamps[o] + + +def _visvalingam_whyatt(timestamps, target_count): + if len(timestamps) <= target_count: + return timestamps + + offsets = sorted(timestamps) + prev = {o: (offsets[i - 1] if i > 0 else None) for i, o in enumerate(offsets)} + nxt = {o: (offsets[i + 1] if i < len(offsets) - 1 else None) for i, o in enumerate(offsets)} + alive = set(offsets) + + current = {} + heap = [] + for o in offsets: + if prev[o] is not None and nxt[o] is not None: + current[o] = _interpolation_error(o, prev, nxt, timestamps) + heap.append((current[o], o)) + heapq.heapify(heap) + + remaining = len(offsets) + while remaining > target_count and heap: + error, o = heapq.heappop(heap) + if o not in alive or error != current.get(o): + continue + before, after = prev[o], nxt[o] + alive.discard(o) + del timestamps[o] + 
remaining -= 1 + nxt[before], prev[after] = after, before + for neighbor in (before, after): + if prev[neighbor] is not None and nxt[neighbor] is not None: + current[neighbor] = _interpolation_error(neighbor, prev, nxt, timestamps) + heapq.heappush(heap, (current[neighbor], neighbor)) + return timestamps + + +def _interpolation_error(o, prev, nxt, timestamps): + before, after = prev[o], nxt[o] + predicted = timestamps[before] + (timestamps[after] - timestamps[before]) * (o - before) / (after - before) + return abs(timestamps[o] - predicted) diff --git a/kafka_consumer/tests/test_unit.py b/kafka_consumer/tests/test_unit.py index 0653cbd44a1c4..37d9d6e3a4a08 100644 --- a/kafka_consumer/tests/test_unit.py +++ b/kafka_consumer/tests/test_unit.py @@ -10,7 +10,11 @@ from datadog_checks.kafka_consumer import KafkaCheck from datadog_checks.kafka_consumer.client import KafkaClient -from datadog_checks.kafka_consumer.kafka_consumer import _get_interpolated_timestamp +from datadog_checks.kafka_consumer.kafka_consumer import ( + _get_interpolated_timestamp, + _prune_below_anchor, + _visvalingam_whyatt, +) pytestmark = [pytest.mark.unit] @@ -444,6 +448,48 @@ def test_get_interpolated_timestamp(): assert _get_interpolated_timestamp({10: 200}, 15) is None +def test_get_interpolated_timestamp_caps_left_extrapolation(): + # Slope is 1 timestamp unit per offset; offset 0 is 1100 offsets before the oldest sample. + # Unclamped this would extrapolate to 9000, but we cap it at oldest_sample - 600 = 9400. + assert _get_interpolated_timestamp({1000: 10000, 1100: 10100}, 0) == 9400 + # A modest left-extrapolation that stays within the 600s budget is left untouched. + assert _get_interpolated_timestamp({1000: 10000, 1100: 10100}, 900) == 9900 + # Interpolation between known offsets is never affected by the cap. 
+ assert _get_interpolated_timestamp({1000: 10000, 1100: 10100}, 1050) == 10050 + + +@pytest.mark.parametrize( + 'timestamps, target_count, expected_len, required_offsets', + [ + pytest.param( + {0: 0.0, 10: 10.0, 20: 20.0, 30: 30.0, 40: 40.0}, + 3, + 3, + {0, 40}, + id='linear series drops collinear interior points, keeps endpoints', + ), + pytest.param( + {0: 0.0, 10: 10.0, 20: 20.0, 30: 100.0}, + 3, + 3, + {0, 20, 30}, + id='keeps the rate-change kink at offset 20, drops redundant offset 10', + ), + pytest.param( + {0: 0.0, 10: 10.0}, + 5, + 2, + {0, 10}, + id='no-op when already within target', + ), + ], +) +def test_visvalingam_whyatt(timestamps, target_count, expected_len, required_offsets): + _visvalingam_whyatt(timestamps, target_count) + assert len(timestamps) == expected_len + assert required_offsets <= set(timestamps) + + @pytest.mark.parametrize( 'persistent_cache_contents, instance_overrides, consumer_lag_seconds_count', [ @@ -515,19 +561,134 @@ def test_add_broker_timestamps_purges_stale_offsets_on_reset(kafka_instance, che assert 170 in timestamps -def test_add_broker_timestamps_evicts_by_oldest_timestamp(kafka_instance, check): - # Eviction must drop the entry with the oldest timestamp, not the smallest - # offset. Evicting by min(offset) would discard fresh post-reset entries - # and keep poisonous ones. - kafka_instance['timestamp_history_size'] = 2 +def test_add_broker_timestamps_compacts_when_full(kafka_instance, check): + # When the cache reaches capacity it is compacted down to half its size, keeping the oldest and + # newest samples and dropping the points that least affect the offset/timestamp curve. 
+ kafka_instance['timestamp_history_size'] = 4 check = check(kafka_instance) - broker_timestamps = {"topic1_0": {500: 50.0, 400: 999.0}} - check._add_broker_timestamps(broker_timestamps, {("topic1", 0): 600}) + broker_timestamps = {"topic1_0": {10: 1.0, 20: 2.0, 30: 3.0}} + check._add_broker_timestamps(broker_timestamps, {("topic1", 0): 40}) timestamps = broker_timestamps["topic1_0"] - assert 500 not in timestamps # oldest by timestamp - assert 400 in timestamps - assert 600 in timestamps + assert len(timestamps) == 2 + assert 10 in timestamps # oldest endpoint preserved + assert 40 in timestamps # newest endpoint preserved + + +def test_add_broker_timestamps_prunes_below_earliest_consumer_offset(kafka_instance, check): + # At compaction time, samples older than the earliest consumer offset are useless (no consumer + # will ever interpolate there), so they are pruned — keeping one anchor at/below that offset. + kafka_instance['timestamp_history_size'] = 4 + check = check(kafka_instance) + broker_timestamps = {"topic1_0": {10: 1.0, 20: 2.0, 30: 3.0}} + check._add_broker_timestamps(broker_timestamps, {("topic1", 0): 40}, {("topic1", 0): 25}) + + timestamps = broker_timestamps["topic1_0"] + assert 10 not in timestamps # useless: below the earliest consumer offset and not the anchor + assert 20 in timestamps # anchor: largest sample at/below the earliest consumer offset + assert 40 in timestamps + + +def test_report_lag_in_time_uses_low_watermark(kafka_instance, check, aggregator): + # A consumer behind the low watermark can't read the out-of-retention messages between its + # committed offset and the low watermark, so lag-in-time is interpolated from the low watermark. 
+ kafka_instance['data_streams_enabled'] = True + check = check(kafka_instance) + mock_client = seed_mock_client() + mock_client.get_partitions_for_topic.return_value = [0] + check.client = mock_client + + consumer_offsets = {"consumer_group1": {("topic1", 0): 5}} + highwater_offsets = {("topic1", 0): 100} + broker_timestamps = {"topic1_0": {50: 1000.0, 100: 1100.0}} + low_watermark_offsets = {("topic1", 0): 50} + + check.report_consumer_offsets_and_lag( + consumer_offsets, + highwater_offsets, + float('inf'), + broker_timestamps, + "cluster_id", + low_watermark_offsets, + ) + + # effective offset = max(5, 50) = 50 -> consumer_timestamp 1000, producer_timestamp 1100, lag 100. + aggregator.assert_metric("kafka.estimated_consumer_lag", value=100.0, count=1) + + +def test_report_lag_in_time_caps_left_extrapolation_without_low_watermark(kafka_instance, check, aggregator): + # With cluster monitoring off there is no low watermark, so the consumer offset is used as-is. + # When it predates every cached sample, the lag is still bounded by the left-extrapolation cap + # (cache window + LAG_EXTRAPOLATION_LIMIT_SECONDS) rather than growing without limit. + kafka_instance['data_streams_enabled'] = True + check = check(kafka_instance) + mock_client = seed_mock_client() + mock_client.get_partitions_for_topic.return_value = [0] + check.client = mock_client + + consumer_offsets = {"consumer_group1": {("topic1", 0): 0}} + highwater_offsets = {("topic1", 0): 1100} + broker_timestamps = {"topic1_0": {1000: 10000.0, 1100: 10100.0}} + + check.report_consumer_offsets_and_lag( + consumer_offsets, + highwater_offsets, + float('inf'), + broker_timestamps, + "cluster_id", + ) + + # Unclamped extrapolation would give consumer_timestamp 9000 (lag 1100). The cap raises it to + # oldest_sample - 600 = 9400, so lag = 10100 - 9400 = 700 (cache window 100 + 600s budget). 
+ aggregator.assert_metric("kafka.estimated_consumer_lag", value=700.0, count=1) + + +def test_earliest_consumer_offsets_takes_cross_group_min(kafka_instance, check): + # The pruning floor is the lowest committed offset for a partition across every consumer group. + check = check(kafka_instance) + consumer_offsets = { + "group1": {("topic1", 0): 10, ("topic1", 1): 50}, + "group2": {("topic1", 0): 5, ("topic2", 0): 99}, + } + assert check._earliest_consumer_offsets(consumer_offsets) == { + ("topic1", 0): 5, + ("topic1", 1): 50, + ("topic2", 0): 99, + } + + +@pytest.mark.parametrize( + 'timestamps, floor, expected_offsets', + [ + pytest.param( + {10: 1.0, 20: 2.0, 30: 3.0, 40: 4.0}, + 35, + {30, 40}, + id='keeps one anchor (max below floor) and everything at/above it', + ), + pytest.param( + {10: 1.0, 20: 2.0, 30: 3.0}, + 30, + {20, 30}, + id='offset equal to floor is retained, not pruned', + ), + pytest.param( + {30: 3.0, 40: 4.0}, + 10, + {30, 40}, + id='no samples below floor: nothing pruned', + ), + pytest.param( + {20: 2.0, 30: 3.0}, + 25, + {20, 30}, + id='single sample below floor is the anchor and is kept', + ), + ], +) +def test_prune_below_anchor(timestamps, floor, expected_offsets): + _prune_below_anchor(timestamps, floor) + assert set(timestamps) == expected_offsets def test_count_consumer_contexts(check, kafka_instance):