Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kafka_consumer/changelog.d/24167.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve the accuracy of ``estimated_consumer_lag`` for consumers that are far behind: cap interpolation for offsets older than the cached broker history, use the low watermark as a floor for the lag offset when cluster monitoring is enabled, and retain a longer broker-timestamp history by compacting the cache (Visvalingam-Whyatt) and pruning samples below the lowest readable offset (the low watermark, or the earliest consumer offset when cluster monitoring is disabled) instead of evicting the oldest one.
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def _parallel_fetch(self, fn: Callable[[str], Any], subjects: list[str], error_l
self.log.warning("Error fetching %s for %s: %s", error_label, subject, e)
return results

def collect_all_metadata(self, highwater_offsets):
def collect_all_metadata(self, highwater_offsets, low_watermark_offsets, topic_partitions):
try:
shared_metadata = self.client.kafka_client.list_topics(timeout=self.config._request_timeout)
except Exception as e:
Expand All @@ -234,7 +234,7 @@ def collect_all_metadata(self, highwater_offsets):
self.log.error("Error collecting broker metadata: %s", e)

try:
self._collect_topic_metadata(shared_metadata, highwater_offsets)
self._collect_topic_metadata(shared_metadata, highwater_offsets, low_watermark_offsets, topic_partitions)
except Exception as e:
self.log.error("Error collecting topic metadata: %s", e)

Expand Down Expand Up @@ -534,7 +534,7 @@ def _collect_broker_metadata(self, metadata=None):
"data-streams-message",
)

def _fetch_earliest_offsets(self, topic_partitions):
def fetch_earliest_offsets(self, topic_partitions):
"""Batch-fetch log-start offsets via AdminClient.list_offsets(earliest).

Uses ListOffsets with the EARLIEST_TIMESTAMP sentinel, which the broker
Expand Down Expand Up @@ -589,11 +589,9 @@ def _fetch_earliest_offsets(self, topic_partitions):
)
return result

def _collect_topic_metadata(self, metadata, highwater_offsets):
def _collect_topic_metadata(self, metadata, highwater_offsets, low_watermark_offsets, topic_partitions):
self.log.debug("Collecting topic metadata")

topic_partitions = self.client.get_topic_partitions()

cluster_id = self.config._kafka_cluster_id_override or (
metadata.cluster_id if hasattr(metadata, 'cluster_id') else 'unknown'
)
Expand All @@ -603,8 +601,6 @@ def _collect_topic_metadata(self, metadata, highwater_offsets):

self.check.gauge('topic.count', len(topic_partitions), tags=self._get_tags(cluster_id))

earliest_offsets = self._fetch_earliest_offsets(topic_partitions)

now_ts = time.time()
prev_ts = None
previous_partition_offsets = {}
Expand Down Expand Up @@ -644,7 +640,7 @@ def _collect_topic_metadata(self, metadata, highwater_offsets):

partition_metadata = topic_metadata.partitions.get(partition_id)
latest = highwater_offsets.get((topic_name, partition_id), 0)
earliest = earliest_offsets.get((topic_name, partition_id))
earliest = low_watermark_offsets.get((topic_name, partition_id))

if earliest is None:
have_all_earliest = False
Expand Down
107 changes: 97 additions & 10 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# (C) Datadog, Inc. 2019-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
import heapq
import json
from collections import defaultdict
from time import time
Expand All @@ -17,6 +18,8 @@

MAX_TIMESTAMPS = 1000

LAG_EXTRAPOLATION_LIMIT_SECONDS = 600


class KafkaCheck(AgentCheck):
__NAMESPACE__ = 'kafka'
Expand Down Expand Up @@ -63,6 +66,8 @@ def check(self, _):
# Fetch the broker highwater offsets
highwater_offsets = {}
broker_timestamps = defaultdict(dict)
low_watermark_offsets = {}
topic_partitions = {}
cluster_id = ""
persistent_cache_key = "broker_timestamps_"
consumer_contexts_count = self.count_consumer_contexts(consumer_offsets)
Expand All @@ -82,9 +87,17 @@ def check(self, _):
partitions.add((topic, partition))
# Expected format: ({(topic, partition): offset}, cluster_id)
highwater_offsets, cluster_id = self.get_highwater_offsets(partitions)
if self.config._cluster_monitoring_enabled:
topic_partitions = self.client.get_topic_partitions()
low_watermark_offsets = self.metadata_collector.fetch_earliest_offsets(topic_partitions)
if self._data_streams_enabled:
broker_timestamps = self._load_broker_timestamps(persistent_cache_key)
self._add_broker_timestamps(broker_timestamps, highwater_offsets)
if low_watermark_offsets:
prune_floors = low_watermark_offsets
else:
self.log.debug("No low watermarks available; pruning cache by earliest consumer offset")
prune_floors = self._earliest_consumer_offsets(consumer_offsets)
self._add_broker_timestamps(broker_timestamps, highwater_offsets, prune_floors)
self._save_broker_timestamps(broker_timestamps, persistent_cache_key)
else:
self.warning("Context limit reached. Skipping highwater offset collection.")
Expand Down Expand Up @@ -125,13 +138,14 @@ def check(self, _):
reporting_limit - len(highwater_offsets),
broker_timestamps,
cluster_id,
low_watermark_offsets,
)

# Collect cluster metadata if enabled
if self.config._cluster_monitoring_enabled:
self._send_cluster_monitoring_heartbeat(total_contexts, cluster_id)
try:
self.metadata_collector.collect_all_metadata(highwater_offsets)
self.metadata_collector.collect_all_metadata(highwater_offsets, low_watermark_offsets, topic_partitions)
except Exception as e:
self.log.error("Error collecting cluster metadata: %s", e)

Expand Down Expand Up @@ -254,7 +268,17 @@ def _load_broker_timestamps(self, persistent_cache_key):
self.log.warning('Could not read broker timestamps from cache: %s', str(e))
return broker_timestamps

def _add_broker_timestamps(self, broker_timestamps, highwater_offsets):
def _earliest_consumer_offsets(self, consumer_offsets):
"""Return the lowest committed offset per (topic, partition) across all consumer groups."""
earliest = {}
for offsets in consumer_offsets.values():
for topic_partition, offset in offsets.items():
if topic_partition not in earliest or offset < earliest[topic_partition]:
earliest[topic_partition] = offset
return earliest

def _add_broker_timestamps(self, broker_timestamps, highwater_offsets, prune_floors=None):
prune_floors = prune_floors or {}
for (topic, partition), highwater_offset in highwater_offsets.items():
timestamps = broker_timestamps["{}_{}".format(topic, partition)]
# If the highwater offset went backwards (topic recreated,
Expand All @@ -265,11 +289,11 @@ def _add_broker_timestamps(self, broker_timestamps, highwater_offsets):
for o in stale:
del timestamps[o]
timestamps[highwater_offset] = time()
# If there's too many timestamps, we delete the oldest one (by
# timestamp, not by offset — evicting by min offset would discard
# the fresh post-reset entries and keep poisonous stale ones).
if len(timestamps) > self._max_timestamps:
del timestamps[min(timestamps, key=timestamps.get)]
if len(timestamps) >= self._max_timestamps:
prune_floor = prune_floors.get((topic, partition))
if prune_floor is not None:
_prune_below_anchor(timestamps, prune_floor)
_visvalingam_whyatt(timestamps, max(2, self._max_timestamps // 2))

def _save_broker_timestamps(self, broker_timestamps, persistent_cache_key):
"""Saves broker timestamps to persistent cache."""
Expand All @@ -292,9 +316,16 @@ def report_highwater_offsets(self, highwater_offsets, contexts_limit, cluster_id
self.log.debug('%s highwater offsets reported', reported_contexts)

def report_consumer_offsets_and_lag(
self, consumer_offsets, highwater_offsets, contexts_limit, broker_timestamps, cluster_id
self,
consumer_offsets,
highwater_offsets,
contexts_limit,
broker_timestamps,
cluster_id,
low_watermark_offsets=None,
):
"""Report the consumer offsets and consumer lag."""
low_watermark_offsets = low_watermark_offsets or {}
reported_contexts = 0
self.log.debug("Reporting consumer offsets and lag metrics")
for consumer_group, offsets in consumer_offsets.items():
Expand Down Expand Up @@ -368,7 +399,9 @@ def report_consumer_offsets_and_lag(
timestamps = broker_timestamps["{}_{}".format(topic, partition)]
# The producer timestamp can be not set if there was an error fetching broker offsets.
producer_timestamp = timestamps.get(producer_offset, None)
consumer_timestamp = _get_interpolated_timestamp(timestamps, consumer_offset)
low_watermark = low_watermark_offsets.get((topic, partition))
effective_offset = consumer_offset if low_watermark is None else max(consumer_offset, low_watermark)
consumer_timestamp = _get_interpolated_timestamp(timestamps, effective_offset)
if consumer_timestamp is None or producer_timestamp is None:
continue
lag = producer_timestamp - consumer_timestamp
Expand Down Expand Up @@ -482,4 +515,58 @@ def _get_interpolated_timestamp(timestamps, offset):
timestamp_after = timestamps[offset_after]
slope = (timestamp_after - timestamp_before) / float(offset_after - offset_before)
timestamp = slope * (offset - offset_after) + timestamp_after

if offset < offset_before:
# Cap how far past the oldest cached sample we extrapolate, so estimated lag stays bounded.
timestamp = max(timestamp, timestamp_before - LAG_EXTRAPOLATION_LIMIT_SECONDS)
return timestamp


def _prune_below_anchor(timestamps, floor):
below = [o for o in timestamps if o < floor]
if len(below) <= 1:
return
anchor = max(below)
for o in below:
if o != anchor:
del timestamps[o]


def _visvalingam_whyatt(timestamps, target_count):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a good bit slower than the previous eviction behavior. Do you think it will take too long on clusters with a lot of partitions?

@piochelepiotr piochelepiotr Jun 25, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's O(nlog(m)), so it will still be very fast. And it only runs 1/500 of the time.
The previous one was O(n
log(n)) and was running every time.
I don't expect that to be an issue.

if len(timestamps) <= target_count:
return timestamps

offsets = sorted(timestamps)
prev = {o: (offsets[i - 1] if i > 0 else None) for i, o in enumerate(offsets)}
nxt = {o: (offsets[i + 1] if i < len(offsets) - 1 else None) for i, o in enumerate(offsets)}
alive = set(offsets)

current = {}
heap = []
for o in offsets:
if prev[o] is not None and nxt[o] is not None:
current[o] = _interpolation_error(o, prev, nxt, timestamps)
heap.append((current[o], o))
Comment on lines +546 to +549

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Allow stale reset endpoints to age out

When a topic is recreated or reset and the new highwater is still above an old cached offset, the stale purge above only removes cached offsets greater than the new highwater, so lower offsets from the previous topic generation can remain. This compactor never enqueues the first/last offset, so the smallest cached offset is preserved forever instead of eventually being evicted by age as before; that stale timestamp can then keep poisoning estimated_consumer_lag interpolation for the new topic generation. Consider allowing endpoint samples that predate a reset to be dropped, or falling back to timestamp-based eviction for those cases.

Useful? React with 👍 / 👎.

heapq.heapify(heap)

remaining = len(offsets)
while remaining > target_count and heap:
error, o = heapq.heappop(heap)
if o not in alive or error != current.get(o):
continue
before, after = prev[o], nxt[o]
alive.discard(o)
del timestamps[o]
remaining -= 1
nxt[before], prev[after] = after, before
for neighbor in (before, after):
if prev[neighbor] is not None and nxt[neighbor] is not None:
current[neighbor] = _interpolation_error(neighbor, prev, nxt, timestamps)
heapq.heappush(heap, (current[neighbor], neighbor))
return timestamps


def _interpolation_error(o, prev, nxt, timestamps):
before, after = prev[o], nxt[o]
predicted = timestamps[before] + (timestamps[after] - timestamps[before]) * (o - before) / (after - before)
return abs(timestamps[o] - predicted)
Loading
Loading