Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 36 additions & 10 deletions kafka_actions/assets/configuration/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -500,58 +500,84 @@ files:
description: |
Configuration for updating consumer group offsets.
WARNING: Can cause duplicate processing or data loss.
Consumer group should be stopped (no active members).
The consumer group must have no active members before this action runs;
the check enforces this and will fail with a clear error if members are found.

The 'cluster' value must be the Kafka-internal cluster UUID (e.g.
"MkU3OEVBNTcwNTJENDM2Qg"), not a human-readable name. Retrieve it with:
kafka-topics.sh --bootstrap-server <host>:9092 --describe | head -1
or via the AdminClient metadata: admin.list_topics().cluster_id.

Each entry in 'offsets' must specify either an explicit 'offset' (a
non-negative integer) or a 'reset_to' policy ('earliest' or 'latest').
Mixing both fields in the same entry is an error.
value:
type: object
required:
- cluster
- consumer_group
- offsets
example:
cluster: prod-kafka-1
cluster: MkU3OEVBNTcwNTJENDM2Qg
consumer_group: order-processor
offsets:
- topic: orders
partition: 0
offset: 1000
- topic: orders
partition: 1
offset: 1500
reset_to: earliest
properties:
- name: cluster
type: string
description: Kafka cluster identifier
example: prod-kafka-1
description: |
Kafka cluster UUID as returned by the broker metadata (AdminClient.list_topics().cluster_id).
This is NOT a human-readable name — it is the internal UUID assigned when the cluster was
created (e.g. "MkU3OEVBNTcwNTJENDM2Qg"). The check compares this against the live
cluster ID and aborts if they differ, preventing accidental execution on the wrong cluster.
example: MkU3OEVBNTcwNTJENDM2Qg
- name: consumer_group
type: string
description: Consumer group ID to update
example: order-processor
- name: offsets
type: array
description: List of topic-partition-offset tuples to update
description: |
List of topic-partition offset specifications. Each entry targets one
partition and must specify exactly one of:
- offset: explicit non-negative integer offset to commit
- reset_to: 'earliest' or 'latest' — the check resolves the actual
partition offset at runtime via list_offsets before committing
items:
type: object
required:
- topic
- partition
- offset
properties:
- name: topic
type: string
description: Topic name
- name: partition
type: integer
description: Partition number
description: Non-negative partition number
- name: offset
type: integer
description: New offset value
description: Explicit offset to commit (non-negative). Mutually exclusive with reset_to.
- name: reset_to
type: string
description: |
Reset policy: 'earliest' commits the log-start offset; 'latest' commits
the high-watermark offset. Mutually exclusive with offset.
enum:
- earliest
- latest
example:
- topic: orders
partition: 0
offset: 1000
- topic: orders
partition: 1
offset: 1500
reset_to: earliest
fleet_configurable: false

# ========================================================================
Expand Down
1 change: 1 addition & 0 deletions kafka_actions/changelog.d/24165.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Harden reset-consumer-offsets action: enforce inactive group check, surface per-partition errors, add reset_to earliest/latest support, validate non-negative offsets, and document that cluster must be the Kafka cluster UUID.
6 changes: 5 additions & 1 deletion kafka_actions/datadog_checks/kafka_actions/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,10 +744,14 @@ def _action_update_consumer_group_offsets(self):
consumer_group = config['consumer_group']
offsets = config['offsets']

self.kafka_client.check_consumer_group_inactive(consumer_group)

self.log.warning(
"Updating offsets for consumer group '%s' on cluster '%s' - may cause duplicate processing or data loss",
"Updating offsets for consumer group '%s' on cluster '%s' - may cause duplicate processing or data loss. "
"Offsets: %s",
consumer_group,
self.cluster,
offsets,
)

success = self.kafka_client.update_consumer_group_offsets(consumer_group=consumer_group, offsets=offsets)
Expand Down
23 changes: 17 additions & 6 deletions kafka_actions/datadog_checks/kafka_actions/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,14 +313,25 @@ def _validate_update_consumer_group_offsets(self):
if 'partition' not in offset_entry:
raise ConfigurationError(f"offsets[{i}] requires 'partition' parameter")

if 'offset' not in offset_entry:
raise ConfigurationError(f"offsets[{i}] requires 'offset' parameter")
if not isinstance(offset_entry.get('partition'), int) or offset_entry['partition'] < 0:
raise ConfigurationError(f"offsets[{i}].partition must be a non-negative integer")

if not isinstance(offset_entry.get('partition'), int):
raise ConfigurationError(f"offsets[{i}].partition must be an integer")
has_offset = 'offset' in offset_entry
has_reset_to = 'reset_to' in offset_entry

if not isinstance(offset_entry.get('offset'), int):
raise ConfigurationError(f"offsets[{i}].offset must be an integer")
if not has_offset and not has_reset_to:
raise ConfigurationError(f"offsets[{i}] requires 'offset' or 'reset_to'")

if has_offset and has_reset_to:
raise ConfigurationError(f"offsets[{i}] cannot specify both 'offset' and 'reset_to'")

if has_offset:
if not isinstance(offset_entry['offset'], int) or offset_entry['offset'] < 0:
raise ConfigurationError(f"offsets[{i}].offset must be a non-negative integer")

if has_reset_to:
if offset_entry['reset_to'] not in ('earliest', 'latest'):
raise ConfigurationError(f"offsets[{i}].reset_to must be 'earliest' or 'latest'")

def _validate_produce_message(self):
"""Validate produce_message action configuration."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import Any, Optional

from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
from typing_extensions import Literal

from datadog_checks.base.utils.functions import identity
from datadog_checks.base.utils.models import validation
Expand Down Expand Up @@ -205,8 +206,15 @@ class Offset(BaseModel):
arbitrary_types_allowed=True,
frozen=True,
)
offset: int = Field(..., description='New offset value')
partition: int = Field(..., description='Partition number')
offset: Optional[int] = Field(
None,
description='Explicit offset to commit (non-negative). Mutually exclusive with reset_to.',
)
partition: int = Field(..., description='Non-negative partition number')
reset_to: Optional[Literal['earliest', 'latest']] = Field(
None,
description="Reset policy: 'earliest' commits the log-start offset; 'latest' commits\nthe high-watermark offset. Mutually exclusive with offset.\n",
)
topic: str = Field(..., description='Topic name')


Expand All @@ -215,15 +223,19 @@ class UpdateConsumerGroupOffsets(BaseModel):
arbitrary_types_allowed=True,
frozen=True,
)
cluster: str = Field(..., description='Kafka cluster identifier', examples=['prod-kafka-1'])
cluster: str = Field(
...,
description='Kafka cluster UUID as returned by the broker metadata (AdminClient.list_topics().cluster_id).\nThis is NOT a human-readable name — it is the internal UUID assigned when the cluster was\ncreated (e.g. "MkU3OEVBNTcwNTJENDM2Qg"). The check compares this against the live\ncluster ID and aborts if they differ, preventing accidental execution on the wrong cluster.\n',
examples=['MkU3OEVBNTcwNTJENDM2Qg'],
)
consumer_group: str = Field(..., description='Consumer group ID to update', examples=['order-processor'])
offsets: tuple[Offset, ...] = Field(
...,
description='List of topic-partition-offset tuples to update',
description="List of topic-partition offset specifications. Each entry targets one\npartition and must specify exactly one of:\n - offset: explicit non-negative integer offset to commit\n - reset_to: 'earliest' or 'latest' — the check resolves the actual\n partition offset at runtime via list_offsets before committing\n",
examples=[
[
{'offset': 1000, 'partition': 0, 'topic': 'orders'},
{'offset': 1500, 'partition': 1, 'topic': 'orders'},
{'partition': 1, 'reset_to': 'earliest', 'topic': 'orders'},
]
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,18 +257,28 @@ instances:
## @param update_consumer_group_offsets - mapping - optional
## Configuration for updating consumer group offsets.
## WARNING: Can cause duplicate processing or data loss.
## Consumer group should be stopped (no active members).
## The consumer group must have no active members before this action runs;
## the check enforces this and will fail with a clear error if members are found.
##
## The 'cluster' value must be the Kafka-internal cluster UUID (e.g.
## "MkU3OEVBNTcwNTJENDM2Qg"), not a human-readable name. Retrieve it with:
## kafka-topics.sh --bootstrap-server <host>:9092 --describe | head -1
## or via the AdminClient metadata: admin.list_topics().cluster_id.
##
## Each entry in 'offsets' must specify either an explicit 'offset' (a
## non-negative integer) or a 'reset_to' policy ('earliest' or 'latest').
## Mixing both fields in the same entry is an error.
#
# update_consumer_group_offsets:
# cluster: prod-kafka-1
# cluster: MkU3OEVBNTcwNTJENDM2Qg
# consumer_group: order-processor
# offsets:
# - topic: orders
# partition: 0
# offset: 1000
# - topic: orders
# partition: 1
# offset: 1500
# reset_to: earliest

## @param produce_message - mapping - optional
## Configuration for producing a message to a Kafka topic.
Expand Down
59 changes: 53 additions & 6 deletions kafka_actions/datadog_checks/kafka_actions/kafka_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@
import time
from typing import TYPE_CHECKING, Any

from confluent_kafka import Consumer, KafkaError, KafkaException, Producer, TopicPartition
from confluent_kafka import (
Consumer,
ConsumerGroupTopicPartitions,
KafkaError,
KafkaException,
Producer,
TopicPartition,
)
from confluent_kafka.admin import AdminClient, ConfigResource, NewTopic, OffsetSpec, ResourceType

try:
Expand Down Expand Up @@ -510,6 +517,31 @@ def delete_consumer_group(self, consumer_group: str) -> bool:
self.log.error("Failed to delete consumer group '%s': %s", group_id, e)
raise

def check_consumer_group_inactive(self, consumer_group: str) -> None:
"""Raise if the consumer group has active members.

alter_consumer_group_offsets requires a dead or empty group; active members
cause Kafka to return NON_EMPTY_GROUP errors per partition.
"""
admin = self.get_admin_client()
futures = admin.describe_consumer_groups([consumer_group])
future = futures[consumer_group]
description = future.result()
if description.members:
raise Exception(
f"Consumer group '{consumer_group}' has {len(description.members)} active member(s). "
"Stop all consumers in the group before resetting offsets."
)

def resolve_offset(self, topic: str, partition: int, reset_to: str) -> int:
"""Resolve 'earliest' or 'latest' to a concrete partition offset."""
admin = self.get_admin_client()
tp = TopicPartition(topic, partition)
spec = OffsetSpec.earliest() if reset_to == 'earliest' else OffsetSpec.latest()
futures = admin.list_offsets({tp: spec}, request_timeout=10)
result = futures[tp].result()
return result.offset

def update_consumer_group_offsets(self, consumer_group: str, offsets: list[dict[str, Any]]) -> bool:
"""Update consumer group offsets for specific topic-partitions.

Expand All @@ -518,7 +550,8 @@ def update_consumer_group_offsets(self, consumer_group: str, offsets: list[dict[
offsets: List of offset specifications, each with:
- topic: Topic name
- partition: Partition number
- offset: New offset value
- offset: Explicit offset value (mutually exclusive with reset_to)
- reset_to: 'earliest' or 'latest' (mutually exclusive with offset)

Returns:
True if successful
Expand All @@ -530,18 +563,32 @@ def update_consumer_group_offsets(self, consumer_group: str, offsets: list[dict[
topic = offset_spec.get('topic')
partition = offset_spec.get('partition')
offset = offset_spec.get('offset')
reset_to = offset_spec.get('reset_to')

if topic is None or partition is None or offset is None:
raise ValueError("Each offset specification must have 'topic', 'partition', and 'offset'")
if topic is None or partition is None:
raise ValueError("Each offset specification must have 'topic' and 'partition'")
if offset is None and reset_to is None:
raise ValueError("Each offset specification must have 'offset' or 'reset_to'")

if reset_to is not None:
offset = self.resolve_offset(topic, partition, reset_to)
self.log.debug("Resolved '%s' for %s[%d] to offset %d", reset_to, topic, partition, offset)

tp = TopicPartition(topic, partition, offset)
topic_partitions.append(tp)

futures = admin.alter_consumer_group_offsets(consumer_group, topic_partitions)
futures = admin.alter_consumer_group_offsets([ConsumerGroupTopicPartitions(consumer_group, topic_partitions)])

for group_id, future in futures.items():
try:
future.result()
result = future.result()
partition_errors = [
f"{tp.topic}[{tp.partition}]: {tp.error}"
for tp in result.topic_partitions
if tp.error is not None
]
if partition_errors:
raise Exception(f"Per-partition errors for group '{group_id}': {'; '.join(partition_errors)}")
self.log.debug("Consumer group '%s' offsets updated for %d partitions", group_id, len(topic_partitions))
return True
except Exception as e:
Expand Down
1 change: 0 additions & 1 deletion kafka_actions/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import socket

from confluent_kafka.admin import AdminClient

from datadog_checks.dev import get_docker_hostname

HERE = os.path.dirname(os.path.abspath(__file__))
Expand Down
1 change: 0 additions & 1 deletion kafka_actions/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient
from confluent_kafka.cimpl import NewTopic

from datadog_checks.dev import WaitFor, docker_run
from datadog_checks.dev._env import e2e_testing
from datadog_checks.kafka_actions import KafkaActionsCheck
Expand Down
Loading
Loading