Skip to content

KIP-932: Implement destroy APIs#5406

Open
Ojasva Jain (ojasvajain) wants to merge 16 commits into
dev_kip-932_queues-for-kafkafrom
dev_kip-932_destroy
Open

KIP-932: Implement destroy APIs#5406
Ojasva Jain (ojasvajain) wants to merge 16 commits into
dev_kip-932_queues-for-kafkafrom
dev_kip-932_destroy

Conversation

@ojasvajain
Copy link
Copy Markdown
Member

@ojasvajain Ojasva Jain (ojasvajain) commented Apr 16, 2026

This PR implements the following APIs related to share consumers:

  1. void rd_kafka_share_destroy(rd_kafka_share_t *rkshare);
  2. void rd_kafka_share_destroy_flags(rd_kafka_share_t *rkshare, int flags);

Note, handling of _DESTROY* in acknowledgement callback will be covered in a separate PR

Pending:

  • Handing of _DESTROY* in commit_sync
  • Broker decomission test cases
  • More test cases related to destroy API
  • Fix "Build configuration check" job getting stuck in destroy
  • Add missing test cases from this list
  • Add fatal error handling in close API
  • Evaluate RD_KAFKA_RESP_ERR__DESTROY check in rd_kafka_share_fetch_reply_op

Test Cases Added (so far):

  ┌─────┬──────────────────────────────────────────────────┬──────┬────────────────────────────────────────────────────────────────────────────────────────────────────────┐  
  │  #  │             Test (subtest function)              │ Type │                                              Description                                               │
  ├─────┼──────────────────────────────────────────────────┼──────┼────────────────────────────────────────────────────────────────────────────────────────────────────────┤  
  │     │                                                  │      │ Acks across commit_async + commit_sync with 5s ShareAck RTT delays, then destroys. Verifies destroy    │
  │ 1   │ test_destroy_with_cached_acks_and_delayed_broker │ mock │ completes within bounded time and that commit_sync returns REQUEST_TIMED_OUT for the still-pending     │  
  │     │                                                  │      │ partition.                                                                                             │  
  ├─────┼──────────────────────────────────────────────────┼──────┼────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ 2   │ test_broker_decommission_with_commit_sync        │ mock │ Decommissions a broker mid-commit_sync (background thread moves leader + removes broker from           │  
  │     │                                                  │      │ metadata). Asserts per-partition err = __DESTROY_BROKER for the orphaned partition.                    │  
  ├─────┼──────────────────────────────────────────────────┼──────┼────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ 3   │ test_broker_decommission_with_consume_batch      │ mock │ Decommissions a broker mid-consume_batch. Verifies the in-flight ShareFetch fails locally with         │  
  │     │                                                  │      │ __DESTROY_BROKER and the consumer keeps polling the surviving broker.                                  │  
  ├─────┼──────────────────────────────────────────────────┼──────┼────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ 4   │ test_broker_decommission_during_close            │ mock │ Decommissions a broker mid-close. Verifies close completes despite the in-flight leave-ShareAck being  │  
  │     │                                                  │      │ killed.                                                                                                │  
  ├─────┼──────────────────────────────────────────────────┼──────┼────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ 5   │ test_broker_decommission_with_commit_async       │ mock │ Decommissions a broker mid-commit_async. Verifies the async commit returns promptly with NULL error    │  
  │     │                                                  │      │ and the consumer survives.                                                                             │  
  ├─────┼──────────────────────────────────────────────────┼──────┼────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ 6   │ test_destroy_during_rebalance                    │ mock │ Drives the cgrp into wait-incr-unassign-to-complete (heartbeat blocked by injected 10s RTT), then      │  
  │     │                                                  │      │ destroys. NO_CONSUMER_CLOSE must return in <2s; full close must complete in <15s.                      │  
  ├─────┼──────────────────────────────────────────────────┼──────┼────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ 7   │ test_destroy_with_fatal_error                    │ mock │ Acks half, injects a 30s ShareAck delay, triggers fatal error, then destroys. Verifies the fatal-error │  
  │     │                                                  │      │  path (auto-promoted to NO_CONSUMER_CLOSE) returns in <2s regardless of caller flags.                  │  
  ├─────┼──────────────────────────────────────────────────┼──────┼────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ 8   │ do_test_destroy_with_explicit_ack                │ real │ Consumes a batch with explicit-ack mode, acks all or half (parameterised), then destroys without       │  
  │     │                                                  │      │ commit. Verifies destroy handles pending acks correctly.                                               │  
  ├─────┼──────────────────────────────────────────────────┼──────┼────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ 9   │ do_test_destroy_with_implicit_ack                │ real │ Consumes a batch with implicit-ack mode, never makes a follow-up poll (so implicit ack never fires),   │  
  │     │                                                  │      │ then destroys. Verifies destroy handles un-acked records gracefully.                                   │  
  ├─────┼──────────────────────────────────────────────────┼──────┼────────────────────────────────────────────────────────────────────────────────────────────────────────┤
  │ 10  │ do_test_destroy_with_subscribe_unsubscribe       │ real │ Creates a share consumer with various subscribe/unsubscribe combinations and destroys. Sanity check    │  
  │     │                                                  │      │ for state transitions through subscribe/unsubscribe.                                                   │  
  └─────┴──────────────────────────────────────────────────┴──────┴────────────────────────────────────────────────────────────────────────────────────────────────────────┘
                                                                                                                                                                              
  Notes:          
  - Each test runs with both destroy_flags=0 and RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE variants.
  - Tests 2–5 each also run with both explicit_ack=true/false variants (except _with_consume_batch which is implicit only).                                                   
  - Test 8 also runs with both ack_half=true/false variants.                                                               
  - Mock tests live in main_0179_share_consumer_destroy_local; real-broker tests live in main_0179_share_consumer_destroy.                                                    

@confluent-cla-assistant
Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@ojasvajain Ojasva Jain (ojasvajain) marked this pull request as ready for review April 16, 2026 19:29
@ojasvajain Ojasva Jain (ojasvajain) requested a review from a team as a code owner April 16, 2026 19:29
@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_close_only branch from 214b852 to 70c29ba Compare April 18, 2026 19:26
@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_close_only branch 2 times, most recently from 97d35a7 to 105eb74 Compare April 26, 2026 18:17
@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_close_only branch 5 times, most recently from a32c996 to 7b88cec Compare May 4, 2026 14:03
@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_destroy branch from 0b3197d to 9059cbc Compare May 5, 2026 04:27
Base automatically changed from dev_kip-932_close_only to dev_kip-932_queues-for-kafka May 5, 2026 05:45
airlock-confluentinc Bot pushed a commit that referenced this pull request May 6, 2026
1. src/rdkafka_broker.c: Add rd_kafka_broker_share_fetch_session_clear()
   to broker thread exit path — prevents assertion failure in
   rd_kafka_broker_destroy_final when toppars_to_forget is non-NULL
   after a broker is decommissioned during close.

2. tests/0178-share_consumer_close.c:
   - Uncomment test_close_with_broker_down_is_fatal_cb
   - Add delayed_down_state struct + delayed_down_cb background thread
   - Add test_close_with_broker_decommission: validates close()
     completes when a broker goes down mid-close
   - Register test in main_0178_share_consumer_close_local

Note: test passes on both branches (with/without rko_op_cb fix)
because set_down triggers __STATE (not __DESTROY) — decommission
is too slow to race ahead of the leave op processing. Needs
rd_kafka_mock_broker_remove_from_metadata (PR #5406) to decouple
metadata removal from connection drop for a deterministic test.
@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_destroy branch from dff116d to 716df74 Compare May 8, 2026 06:57
@ojasvajain Ojasva Jain (ojasvajain) changed the base branch from dev_kip-932_queues-for-kafka to dev_kip-932_acknowledgement_callback May 8, 2026 06:57
@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_destroy branch from 716df74 to fdbac55 Compare May 8, 2026 10:02
@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_acknowledgement_callback branch from 76eb457 to daa5c73 Compare May 8, 2026 15:46
@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_destroy branch from e26cdc3 to b3426df Compare May 9, 2026 10:38
Base automatically changed from dev_kip-932_acknowledgement_callback to dev_kip-932_queues-for-kafka May 11, 2026 15:08
Copy link
Copy Markdown
Member

@pranavrth Pranav Rathi (pranavrth) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments for the implementation part. I will go through the tests in another pass.

Comment thread .semaphore/semaphore.yml Outdated
Comment thread packaging/tools/run-share-consumer-tests.sh
Comment thread src/rdkafka.c
Comment thread src/rdkafka.c Outdated
Comment thread src/rdkafka.c
* Step 1: Dispatch acknowledgement callbacks.
* Per-partition errors were set by the broker thread.
*/
rd_kafka_share_dispatch_ack_callbacks(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to remember - Let's add ack callback tests as well.

Comment thread src/rdkafka_fetcher.c
Comment thread src/rdkafka_fetcher.h
Comment thread src/rdkafka_int.h
*/
rd_kafka_resp_err_t
rd_kafka_share_consumer_closed_err(rd_kafka_share_t *rkshare);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert.

Comment thread tests/test.c Outdated
Comment thread tests/logs.ci Outdated
test_ack_cb_state_t state;
int consumed = 0;
int attempts = 0;
test_ack_cb_state_t state = {0};
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue was causing the tests fail on the feature branch. It should be fixed now.

ack_cb_state_t state;
int consumed = 0;
int attempts = 0;
ack_cb_state_t state = {0};
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

while ((rcvd < 4 || state.callback_cnt < 1) && attempts-- > 0) {
size_t batch_rcvd = 0;
err = rd_kafka_share_consume_batch(rkshare, 2000, batch,
err = rd_kafka_share_consume_batch(rkshare, 2000, batch + rcvd,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If msgs don't get redelivered msgs in one consume_batch call, some msgs might leak. Observed this issue in one of the runs.

}
}

void rd_kafka_share_acks_clear_during_broker_decommission(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add test case for this flow.

* apply the result to the in-flight commit_sync request (copies
* err into the cgrp's result list, decrements awaiting count,
* completes the sync if this was the last broker outstanding). */
if (rkb->rkb_pending_commit_sync.sync_ack_details) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to dispatch callbacks as well.

}
}

void rd_kafka_share_acks_clear_during_broker_decommission(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add broker id as well in the logs.

Comment thread src/rdkafka.c
* (buf callback init -> INVALID_RECORD_STATE -> per-
* partition err or top-level err). */
if (ack_details) {
if (ack_details && rko_orig->rko_err) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move it to before dispatch ack_details (step 1) part.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add test as well for this.

Comment thread src/rdkafka.c
rd_kafka_share_commit_sync_apply_result(rk, rkcg, ack_details);
rd_kafka_dbg(
rk, CGRP, "SHARE",
"Commit sync reply from broker %s: %s, "
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can change wording for this.

Comment thread src/rdkafka_broker.c
rd_atomic32_add(&rkb->termination_in_progress, 1);

if (RD_KAFKA_IS_SHARE_CONSUMER(rk) &&
rkb->rkb_source == RD_KAFKA_LEARNED) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's verify this is correct.

Comment thread src/rdkafka_int.h
rd_bool_t should_fetch,
rd_bool_t should_leave);

void rd_kafka_share_commit_sync_apply_result(rd_kafka_t *rk,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if this should be moved to any other file?

int receipt_cnt;
int receipt_capacity;
int callback_invocations;
mtx_t lock;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if it is needed.

}
attempts++;
}
TEST_ASSERT(rcvd >= TEST_MSGS,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

==

*
* surviving_partition: TEST_MSGS/2 offsets, NO_ERROR
* target_partition: TEST_MSGS/2 offsets, __DESTROY_BROKER */
{
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants