Skip to content

Commit 7e439d4

Browse files
committed
requested changes
1 parent daa5c73 commit 7e439d4

11 files changed

Lines changed: 678 additions & 382 deletions

src/rdkafka.c

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3223,6 +3223,13 @@ rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk,
32233223
records_fetched, should_leave,
32243224
reply_rkb ? rd_kafka_broker_name(reply_rkb) : "none");
32253225

3226+
/*
3227+
* Step 1: Dispatch acknowledgement callbacks.
3228+
* Per-partition errors were set by the broker thread.
3229+
*/
3230+
rd_kafka_share_dispatch_ack_callbacks(
3231+
rk, rko_orig->rko_u.share_fetch.ack_details);
3232+
32263233
reply_rkb->rkb_share_fetch_enqueued = rd_false;
32273234

32283235
if (should_fetch)
@@ -3234,15 +3241,15 @@ rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk,
32343241
}
32353242

32363243
/*
3237-
* Step 1: If records were fetched and broker is not terminating,
3244+
* Step 2: If records were fetched and broker is not terminating,
32383245
* reset the global fetch guard so the next FANOUT can select
32393246
* a new fetch broker.
32403247
*/
32413248
if (records_fetched && rko_orig->rko_err != RD_KAFKA_RESP_ERR__DESTROY)
32423249
rkcg->rkcg_share.share_fetch_more_records = rd_false;
32433250

32443251
/*
3245-
* Step 2: If the consumer has been marked for termination,
3252+
* Step 3: If the consumer has been marked for termination,
32463253
* enqueue a session leave op on the replying broker thread and return
32473254
*/
32483255
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) {
@@ -3252,7 +3259,7 @@ rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk,
32523259
}
32533260

32543261
/*
3255-
* Step 3: Handle commit_sync reply if this op belongs to
3262+
* Step 4: Handle commit_sync reply if this op belongs to
32563263
* the current commit_sync request.
32573264
*/
32583265
if (rko_orig->rko_u.share_fetch.commit_sync_request_id != 0 &&
@@ -3307,13 +3314,6 @@ rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk,
33073314
rd_kafka_share_commit_sync_maybe_complete(rk, rkcg);
33083315
}
33093316

3310-
/*
3311-
* Step 4: Dispatch acknowledgement callbacks.
3312-
* Per-partition errors were set by the broker thread.
3313-
*/
3314-
rd_kafka_share_dispatch_ack_callbacks(
3315-
rk, rko_orig->rko_u.share_fetch.ack_details);
3316-
33173317
/*
33183318
* Step 5: Dispatch pending commit_sync to the replying broker
33193319
* (highest priority). Acks must always be sent.
@@ -5725,7 +5725,7 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,
57255725
rko->rko_u.offset_commit.opaque);
57265726
break;
57275727

5728-
case RD_KAFKA_OP_SHARE_ACK_COMMIT:
5728+
case RD_KAFKA_OP_SHARE_ACK_COMMIT_CB:
57295729
if (!rko->rko_u.share_ack_commit.cb)
57305730
return RD_KAFKA_OP_RES_PASS; /* Dont handle here */
57315731
rko->rko_u.share_ack_commit.cb(

src/rdkafka_op.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) {
131131
[RD_KAFKA_OP_SHARE_SESSION_PARTITION_REMOVE] =
132132
"REPLY:SHARE_SESSION_PARTITION_REMOVE",
133133
[RD_KAFKA_OP_SHARE_FETCH_RESPONSE] = "REPLY:SHARE_FETCH_RESPONSE",
134-
[RD_KAFKA_OP_SHARE_ACK_COMMIT] = "REPLY:SHARE_ACK_COMMIT",
134+
[RD_KAFKA_OP_SHARE_ACK_COMMIT_CB] = "REPLY:SHARE_ACK_COMMIT_CB",
135135
[RD_KAFKA_OP_SHARE_COMMIT_ASYNC_FANOUT] =
136136
"REPLY:SHARE_COMMIT_ASYNC_FANOUT",
137137
[RD_KAFKA_OP_SHARE_COMMIT_SYNC_FANOUT] =
@@ -309,7 +309,7 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) {
309309
[RD_KAFKA_OP_SHARE_SESSION_PARTITION_REMOVE] = _RD_KAFKA_OP_EMPTY,
310310
[RD_KAFKA_OP_SHARE_FETCH_RESPONSE] =
311311
sizeof(rko->rko_u.share_fetch_response),
312-
[RD_KAFKA_OP_SHARE_ACK_COMMIT] =
312+
[RD_KAFKA_OP_SHARE_ACK_COMMIT_CB] =
313313
sizeof(rko->rko_u.share_ack_commit),
314314
[RD_KAFKA_OP_SHARE_COMMIT_ASYNC_FANOUT] =
315315
sizeof(rko->rko_u.share_commit_async_fanout),
@@ -584,7 +584,7 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) {
584584
break;
585585
}
586586

587-
case RD_KAFKA_OP_SHARE_ACK_COMMIT:
587+
case RD_KAFKA_OP_SHARE_ACK_COMMIT_CB:
588588
RD_IF_FREE(rko->rko_u.share_ack_commit.partitions,
589589
rd_kafka_share_partition_offsets_list_destroy);
590590
break;

src/rdkafka_op.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ typedef enum {
212212
RD_KAFKA_OP_SHARE_FETCH_RESPONSE, /**< Share fetch response containing
213213
* all messages and partition acks
214214
* from a single broker response. */
215-
RD_KAFKA_OP_SHARE_ACK_COMMIT, /**< Share acknowledgement callback
215+
RD_KAFKA_OP_SHARE_ACK_COMMIT_CB, /**< Share acknowledgement callback
216216
* reply: main -> app */
217217

218218
RD_KAFKA_OP__END

src/rdkafka_share_acknowledgement.c

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -792,7 +792,7 @@ rd_kafka_share_acknowledge_offset(rd_kafka_share_t *rkshare,
792792
* @param partition Partition id.
793793
* @param offsets_cnt Number of offsets to allocate space for.
794794
*/
795-
rd_kafka_share_partition_offsets_t *
795+
static rd_kafka_share_partition_offsets_t *
796796
rd_kafka_share_partition_offsets_new(rd_kafka_Uuid_t topic_id,
797797
const char *topic,
798798
int32_t partition,
@@ -816,7 +816,7 @@ rd_kafka_share_partition_offsets_new(rd_kafka_Uuid_t topic_id,
816816
*
817817
* @param elem Element to destroy.
818818
*/
819-
void rd_kafka_share_partition_offsets_destroy(
819+
static void rd_kafka_share_partition_offsets_destroy(
820820
rd_kafka_share_partition_offsets_t *elem) {
821821
if (!elem)
822822
return;
@@ -832,7 +832,7 @@ void rd_kafka_share_partition_offsets_destroy(
832832
* Caller must destroy with
833833
* rd_kafka_share_partition_offsets_list_destroy().
834834
*/
835-
rd_kafka_share_partition_offsets_list_t *
835+
static rd_kafka_share_partition_offsets_list_t *
836836
rd_kafka_share_partition_offsets_list_new(int capacity) {
837837
rd_kafka_share_partition_offsets_list_t *list;
838838
size_t size;
@@ -897,9 +897,10 @@ rd_kafka_share_build_partition_offsets_list(
897897
}
898898

899899

900-
void rd_kafka_share_enqueue_ack_callback(rd_kafka_t *rk,
901-
rd_kafka_share_ack_batches_t *batches,
902-
rd_kafka_resp_err_t err) {
900+
void rd_kafka_share_enqueue_ack_commit_cb_op(
901+
rd_kafka_t *rk,
902+
rd_kafka_share_ack_batches_t *batches,
903+
rd_kafka_resp_err_t err) {
903904
rd_kafka_op_t *cb_rko;
904905
rd_kafka_share_partition_offsets_list_t *partitions;
905906

@@ -910,7 +911,7 @@ void rd_kafka_share_enqueue_ack_callback(rd_kafka_t *rk,
910911
if (!partitions)
911912
return;
912913

913-
cb_rko = rd_kafka_op_new(RD_KAFKA_OP_SHARE_ACK_COMMIT);
914+
cb_rko = rd_kafka_op_new(RD_KAFKA_OP_SHARE_ACK_COMMIT_CB);
914915
cb_rko->rko_err = err;
915916
cb_rko->rko_u.share_ack_commit.partitions = partitions;
916917
cb_rko->rko_u.share_ack_commit.cb =
@@ -984,9 +985,7 @@ void rd_kafka_share_dispatch_ack_callbacks(rd_kafka_t *rk,
984985

985986
/* Use per-partition error from each batch */
986987
RD_LIST_FOREACH(ack_batch, ack_details, k) {
987-
rd_kafka_share_enqueue_ack_callback(
988-
rk, ack_batch,
989-
ack_batch->rktpar ? ack_batch->rktpar->err
990-
: RD_KAFKA_RESP_ERR_NO_ERROR);
988+
rd_kafka_share_enqueue_ack_commit_cb_op(rk, ack_batch,
989+
ack_batch->rktpar->err);
991990
}
992991
}

src/rdkafka_share_acknowledgement.h

Lines changed: 16 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,6 @@
2828
#ifndef _RDKAFKA_SHARE_ACKNOWLEDGEMENT_H_
2929
#define _RDKAFKA_SHARE_ACKNOWLEDGEMENT_H_
3030

31-
#include "rdlist.h"
32-
#include "rdkafka.h"
33-
34-
/* Forward declarations */
3531
typedef struct rd_kafka_op_s rd_kafka_op_t;
3632

3733
typedef enum rd_kafka_internal_ShareAcknowledgement_type_s {
@@ -250,39 +246,6 @@ struct rd_kafka_share_partition_offsets_list_s {
250246
offsets */
251247
};
252248

253-
/**
254-
* @brief Allocate and initialize a partition offsets element.
255-
*
256-
* @param topic_id Topic UUID.
257-
* @param topic Topic name (will be duplicated).
258-
* @param partition Partition id.
259-
* @param offsets_cnt Number of offsets to allocate space for.
260-
* @returns Newly allocated partition offsets element.
261-
*/
262-
rd_kafka_share_partition_offsets_t *
263-
rd_kafka_share_partition_offsets_new(rd_kafka_Uuid_t topic_id,
264-
const char *topic,
265-
int32_t partition,
266-
int offsets_cnt);
267-
268-
/**
269-
* @brief Destroy a partition offsets element.
270-
*
271-
* @param elem Element to destroy.
272-
*/
273-
void rd_kafka_share_partition_offsets_destroy(
274-
rd_kafka_share_partition_offsets_t *elem);
275-
276-
/**
277-
* @brief Allocate and initialize a partition offsets list.
278-
*
279-
* @param capacity Initial capacity for elements.
280-
* @returns Newly allocated list, or NULL if capacity is 0.
281-
* Caller must destroy with
282-
* rd_kafka_share_partition_offsets_list_destroy().
283-
*/
284-
rd_kafka_share_partition_offsets_list_t *
285-
rd_kafka_share_partition_offsets_list_new(int capacity);
286249

287250
/**
288251
* @brief Destroy a partition offsets list.
@@ -320,21 +283,28 @@ rd_kafka_share_build_partition_offsets_list(
320283
* @param batches The ack batches for this partition (contains offsets).
321284
* @param err Error code to report in callback.
322285
*/
323-
void rd_kafka_share_enqueue_ack_callback(rd_kafka_t *rk,
324-
rd_kafka_share_ack_batches_t *batches,
325-
rd_kafka_resp_err_t err);
286+
void rd_kafka_share_enqueue_ack_commit_cb_op(
287+
rd_kafka_t *rk,
288+
rd_kafka_share_ack_batches_t *batches,
289+
rd_kafka_resp_err_t err);
326290

327291

328292
/**
329-
* @brief Dispatch ack callbacks for all partitions in ack_details.
293+
* @brief Enqueue acknowledgement callbacks to application for each partition.
294+
*
295+
* Iterates through each partition in ack_details and enqueues one callback
296+
* operation (RD_KAFKA_OP_SHARE_ACK_COMMIT_CB) per partition to the
297+
* application's reply queue. Each operation contains:
298+
* - The partition's acknowledged offsets
299+
* - Per-partition error code from batch->rktpar->err
330300
*
331-
* If err is set (top-level error), all partitions receive the same error.
332-
* Otherwise, per-partition results from ack_results are used.
301+
* The application's share_acknowledgement_commit_cb is invoked once per
302+
* partition when the app calls rd_kafka_consumer_poll() or
303+
* rd_kafka_queue_poll().
333304
*
334305
* @param rk Kafka handle.
335-
* @param ack_details List of ack batches.
336-
* @param ack_results Per-partition results (may be NULL).
337-
* @param err Top-level error code.
306+
* @param ack_details List of rd_kafka_share_ack_batches_t* with acknowledgement
307+
* results and per-partition error codes.
338308
*/
339309
void rd_kafka_share_dispatch_ack_callbacks(rd_kafka_t *rk,
340310
rd_list_t *ack_details);

0 commit comments

Comments
 (0)