Skip to content

Commit b3426df

Browse files
committed
KIP-932: fix destroy tests acc to modified impl + leader-migration session-list fix
1 parent 7e439d4 commit b3426df

20 files changed

Lines changed: 1808 additions & 244 deletions

.semaphore/semaphore.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ blocks:
222222
--version ${KAFKA_VERSION} --cpversion ${CP_VERSION}
223223
--brokers 1
224224
--conf '["share.coordinator.state.topic.replication.factor=1", "share.coordinator.state.topic.min.isr=1", "group.share.min.record.lock.duration.ms=1000", "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1"]'
225-
--cmd 'TESTS_SKIP_BEFORE=0170 python3 run-test-batches.py')
225+
--cmd 'TESTS_SKIP_BEFORE=0157 python3 run-test-batches.py')
226226
- cache store trivup-kafka-${KAFKA_VERSION}-${CACHE_TAG} tests/tmp-KafkaCluster/KafkaCluster/KafkaBrokerApp/kafka/${KAFKA_VERSION}
227227

228228
- name: 'Windows MSVC: share consumer tests'
@@ -260,7 +260,7 @@ blocks:
260260
- Invoke-WebRequest -Uri "https://archive.apache.org/dist/kafka/${env:KAFKA_VERSION}/kafka_2.13-${env:KAFKA_VERSION}.tgz" -OutFile C:\kafka.tgz -UseBasicParsing
261261
- tar -xzf C:\kafka.tgz -C C:\
262262
- Add-Content "C:\kafka_2.13-${env:KAFKA_VERSION}\config\server.properties" "`ngroup.share.min.record.lock.duration.ms=1000`ntransaction.state.log.replication.factor=1`ntransaction.state.log.min.isr=1"
263-
- $env:PATH = [System.Environment]::GetEnvironmentVariable("PATH", "Machine") + ";" + [System.Environment]::GetEnvironmentVariable("PATH", "User"); $env:KAFKA_CLUSTER_ID = & "C:\kafka_2.13-${env:KAFKA_VERSION}\bin\windows\kafka-storage.bat" random-uuid; & "C:\kafka_2.13-${env:KAFKA_VERSION}\bin\windows\kafka-storage.bat" format --standalone -t $env:KAFKA_CLUSTER_ID -c "C:\kafka_2.13-${env:KAFKA_VERSION}\config\server.properties"; Start-Job -Name kafka-broker -ScriptBlock { param($v) & "C:\kafka_2.13-$v\bin\windows\kafka-server-start.bat" "C:\kafka_2.13-$v\config\server.properties" } -ArgumentList $env:KAFKA_VERSION; Start-Sleep 15; cd tests; Set-Content test.conf "bootstrap.servers=localhost:9092`nbroker.address.family=v4"; Remove-Item Env:TESTS -ErrorAction SilentlyContinue; Remove-Item Env:TESTS_SKIP -ErrorAction SilentlyContinue; $env:RDKAFKA_TEST_CONF = "test.conf"; $env:TESTS_SKIP_BEFORE = "0170"; $env:TEST_BROKER_OS = "windows"; & ..\win32\outdir\v142\x64\Release\tests.exe; Stop-Job -Name kafka-broker
263+
- $env:PATH = [System.Environment]::GetEnvironmentVariable("PATH", "Machine") + ";" + [System.Environment]::GetEnvironmentVariable("PATH", "User"); $env:KAFKA_CLUSTER_ID = & "C:\kafka_2.13-${env:KAFKA_VERSION}\bin\windows\kafka-storage.bat" random-uuid; & "C:\kafka_2.13-${env:KAFKA_VERSION}\bin\windows\kafka-storage.bat" format --standalone -t $env:KAFKA_CLUSTER_ID -c "C:\kafka_2.13-${env:KAFKA_VERSION}\config\server.properties"; Start-Job -Name kafka-broker -ScriptBlock { param($v) & "C:\kafka_2.13-$v\bin\windows\kafka-server-start.bat" "C:\kafka_2.13-$v\config\server.properties" } -ArgumentList $env:KAFKA_VERSION; Start-Sleep 15; cd tests; Set-Content test.conf "bootstrap.servers=localhost:9092`nbroker.address.family=v4"; Remove-Item Env:TESTS -ErrorAction SilentlyContinue; Remove-Item Env:TESTS_SKIP -ErrorAction SilentlyContinue; $env:RDKAFKA_TEST_CONF = "test.conf"; $env:TESTS_SKIP_BEFORE = "0157"; $env:TEST_BROKER_OS = "windows"; & ..\win32\outdir\v142\x64\Release\tests.exe; Stop-Job -Name kafka-broker
264264
- name: 'Share consumer tests: x86'
265265
env_vars:
266266
- name: triplet
@@ -271,7 +271,7 @@ blocks:
271271
- Invoke-WebRequest -Uri "https://archive.apache.org/dist/kafka/${env:KAFKA_VERSION}/kafka_2.13-${env:KAFKA_VERSION}.tgz" -OutFile C:\kafka.tgz -UseBasicParsing
272272
- tar -xzf C:\kafka.tgz -C C:\
273273
- Add-Content "C:\kafka_2.13-${env:KAFKA_VERSION}\config\server.properties" "`ngroup.share.min.record.lock.duration.ms=1000`ntransaction.state.log.replication.factor=1`ntransaction.state.log.min.isr=1"
274-
- $env:PATH = [System.Environment]::GetEnvironmentVariable("PATH", "Machine") + ";" + [System.Environment]::GetEnvironmentVariable("PATH", "User"); $env:KAFKA_CLUSTER_ID = & "C:\kafka_2.13-${env:KAFKA_VERSION}\bin\windows\kafka-storage.bat" random-uuid; & "C:\kafka_2.13-${env:KAFKA_VERSION}\bin\windows\kafka-storage.bat" format --standalone -t $env:KAFKA_CLUSTER_ID -c "C:\kafka_2.13-${env:KAFKA_VERSION}\config\server.properties"; Start-Job -Name kafka-broker -ScriptBlock { param($v) & "C:\kafka_2.13-$v\bin\windows\kafka-server-start.bat" "C:\kafka_2.13-$v\config\server.properties" } -ArgumentList $env:KAFKA_VERSION; Start-Sleep 15; cd tests; Set-Content test.conf "bootstrap.servers=localhost:9092`nbroker.address.family=v4"; Remove-Item Env:TESTS -ErrorAction SilentlyContinue; Remove-Item Env:TESTS_SKIP -ErrorAction SilentlyContinue; $env:RDKAFKA_TEST_CONF = "test.conf"; $env:TESTS_SKIP_BEFORE = "0170"; $env:TEST_BROKER_OS = "windows"; & ..\win32\outdir\v142\Win32\Release\tests.exe; Stop-Job -Name kafka-broker
274+
- $env:PATH = [System.Environment]::GetEnvironmentVariable("PATH", "Machine") + ";" + [System.Environment]::GetEnvironmentVariable("PATH", "User"); $env:KAFKA_CLUSTER_ID = & "C:\kafka_2.13-${env:KAFKA_VERSION}\bin\windows\kafka-storage.bat" random-uuid; & "C:\kafka_2.13-${env:KAFKA_VERSION}\bin\windows\kafka-storage.bat" format --standalone -t $env:KAFKA_CLUSTER_ID -c "C:\kafka_2.13-${env:KAFKA_VERSION}\config\server.properties"; Start-Job -Name kafka-broker -ScriptBlock { param($v) & "C:\kafka_2.13-$v\bin\windows\kafka-server-start.bat" "C:\kafka_2.13-$v\config\server.properties" } -ArgumentList $env:KAFKA_VERSION; Start-Sleep 15; cd tests; Set-Content test.conf "bootstrap.servers=localhost:9092`nbroker.address.family=v4"; Remove-Item Env:TESTS -ErrorAction SilentlyContinue; Remove-Item Env:TESTS_SKIP -ErrorAction SilentlyContinue; $env:RDKAFKA_TEST_CONF = "test.conf"; $env:TESTS_SKIP_BEFORE = "0157"; $env:TEST_BROKER_OS = "windows"; & ..\win32\outdir\v142\Win32\Release\tests.exe; Stop-Job -Name kafka-broker
275275

276276
- name: 'Linux x64: release artifact docker builds'
277277
dependencies: []

packaging/tools/run-share-consumer-tests.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@ echo "arguments: $TEST_ARGS"
2222
--conf '["group.share.min.record.lock.duration.ms=1000", "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1"]' \
2323
--version "$TEST_KAFKA_GIT_REF" \
2424
--cpversion "$TEST_CP_VERSION" \
25-
--cmd "TESTS_SKIP_BEFORE=0170 python run-test-batches.py $TEST_ARGS")
25+
--cmd "TESTS_SKIP_BEFORE=0157 python run-test-batches.py $TEST_ARGS")

src/rdkafka.c

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1119,6 +1119,7 @@ void rd_kafka_destroy_final(rd_kafka_t *rk) {
11191119
mtx_destroy(&rk->rk_conf.sasl.lock);
11201120
rwlock_destroy(&rk->rk_lock);
11211121

1122+
rd_free(rk->rk_rkshare);
11221123
rd_free(rk);
11231124
rd_kafka_global_cnt_decr();
11241125
}
@@ -1201,6 +1202,18 @@ static void rd_kafka_destroy_app(rd_kafka_t *rk, int flags) {
12011202
rd_kafka_consumer_close(rk);
12021203
}
12031204

1205+
if (RD_KAFKA_IS_SHARE_CONSUMER(rk)) {
1206+
/* Destroy inflight acks map to release
1207+
* toppar references held by topic_partition objects in the map.
1208+
* Otherwise rd_kafka_destroy_app() deadlocks: it joins broker
1209+
* threads, which wait for refcnt <= 1, but the toppar holds a
1210+
* broker ref via rktp_leader that is only released when the
1211+
* toppar is destroyed, which requires refcnt 0, which requires
1212+
* releasing the rktp ref held by the inflight_acks map entry.
1213+
*/
1214+
RD_MAP_DESTROY(&rk->rk_rkshare->rkshare_inflight_acks);
1215+
}
1216+
12041217
/* Await telemetry termination. This method blocks until the last
12051218
* PushTelemetry request is sent (if possible). */
12061219
if (!(flags & RD_KAFKA_DESTROY_F_IMMEDIATE))
@@ -1258,23 +1271,17 @@ void rd_kafka_share_destroy(rd_kafka_share_t *rkshare) {
12581271
* TODO KIP-932: Guard this with checks for rkshare and
12591272
* rkshare->rkshare_rk?
12601273
*/
1261-
1262-
/* Destroy inflight acks map before rd_kafka_destroy() to release
1263-
* toppar references held by topic_partition objects in the map.
1264-
* Otherwise rd_kafka_destroy() deadlocks: it joins broker threads,
1265-
* which wait for refcnt <= 1, but the toppar holds a broker ref
1266-
* via rktp_leader that is only released when the toppar is destroyed,
1267-
* which requires refcnt 0, which requires releasing the rktp ref
1268-
* held by the inflight_acks map entry. */
1269-
RD_MAP_DESTROY(&rkshare->rkshare_inflight_acks);
12701274
rd_kafka_destroy(rkshare->rkshare_rk);
1271-
rd_free(rkshare);
12721275
}
12731276

12741277
void rd_kafka_destroy_flags(rd_kafka_t *rk, int flags) {
12751278
rd_kafka_destroy_app(rk, flags);
12761279
}
12771280

1281+
void rd_kafka_share_destroy_flags(rd_kafka_share_t *rkshare, int flags) {
1282+
rd_kafka_destroy_flags(rkshare->rkshare_rk, flags);
1283+
}
1284+
12781285

12791286
/**
12801287
* Main destructor for rd_kafka_t
@@ -1393,6 +1400,18 @@ static void rd_kafka_destroy_internal(rd_kafka_t *rk) {
13931400
thrd = rd_malloc(sizeof(*thrd));
13941401
*thrd = rk->rk_internal_rkb->rkb_thread;
13951402

1403+
/* TODO KIP-932: Clear any share-session state that may
1404+
* have been populated on the internal broker during a
1405+
* leader migration (PARTITION_JOIN runs on whichever
1406+
* broker the toppar is transiently delegated to,
1407+
* including :0/internal). Once the leader-migration
1408+
* path is fixed to skip non-LEARNED brokers, this
1409+
* SESSION_CLEAR enqueue can be removed. */
1410+
if (RD_KAFKA_IS_SHARE_CONSUMER(rk))
1411+
rd_kafka_q_enq(
1412+
rk->rk_internal_rkb->rkb_ops,
1413+
rd_kafka_op_new(RD_KAFKA_OP_SHARE_SESSION_CLEAR));
1414+
13961415
/* Send op to trigger queue wake-up.
13971416
* WARNING: This is last time we can read
13981417
* from rk_internal_rkb in this thread! */
@@ -3215,7 +3234,10 @@ rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk,
32153234
rd_bool_t records_fetched = rko_orig->rko_u.share_fetch.records_fetched;
32163235
rd_bool_t should_leave = rko_orig->rko_u.share_fetch.should_leave;
32173236

3218-
rd_kafka_assert(rk, thrd_is_current(rk->rk_thread));
3237+
/* TODO KIP-932: This assertion has been commented out as the function
3238+
* can also be called in broker thread during when rko has __DESTROY set
3239+
* (client termination). Check if it is safe to keep it this way */
3240+
// rd_kafka_assert(rk, thrd_is_current(rk->rk_thread));
32193241
rd_kafka_dbg(rk, CGRP, "SHAREFETCH",
32203242
"Share fetch reply: %s, should_fetch=%d, "
32213243
"records_fetched=%d, should_leave=%d, broker=%s",
@@ -3253,8 +3275,9 @@ rd_kafka_op_res_t rd_kafka_share_fetch_reply_op(rd_kafka_t *rk,
32533275
* enqueue a session leave op on the replying broker thread and return
32543276
*/
32553277
if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) {
3256-
rd_kafka_share_enqueue_fetch_op(rkcg->rkcg_rk, reply_rkb,
3257-
rd_false, rd_true);
3278+
if (!rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk))
3279+
rd_kafka_share_enqueue_fetch_op(
3280+
rkcg->rkcg_rk, reply_rkb, rd_false, rd_true);
32583281
return RD_KAFKA_OP_RES_HANDLED;
32593282
}
32603283

@@ -3455,6 +3478,7 @@ void rd_kafka_share_enqueue_fetch_op(rd_kafka_t *rk,
34553478
rko_sf->rko_u.share_fetch.target_broker = rkb;
34563479
rko_sf->rko_replyq = RD_KAFKA_REPLYQ(rk->rk_ops, 0);
34573480
rko_sf->rko_op_cb = rd_kafka_share_fetch_reply_op_cb;
3481+
rko_sf->rko_rk = rk;
34583482

34593483
/* Set fetch guard flag to prevent multiple in-flight fetches to the
34603484
* same broker.*/
@@ -3978,6 +4002,7 @@ static void rd_kafka_share_enqueue_sync_ack_op(rd_kafka_t *rk,
39784002
rko_sf->rko_u.share_fetch.target_broker = rkb;
39794003
rko_sf->rko_replyq = RD_KAFKA_REPLYQ(rk->rk_ops, 0);
39804004
rko_sf->rko_op_cb = rd_kafka_share_fetch_reply_op_cb;
4005+
rko_sf->rko_rk = rk;
39814006

39824007
rkb->rkb_share_fetch_enqueued = rd_true;
39834008

src/rdkafka.h

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3413,17 +3413,6 @@ void rd_kafka_destroy(rd_kafka_t *rk);
34133413
RD_EXPORT
34143414
void rd_kafka_destroy_flags(rd_kafka_t *rk, int flags);
34153415

3416-
3417-
/**
3418-
* @brief Destroy the share consumer instance.
3419-
*
3420-
* @remark This is a blocking operation.
3421-
*
3422-
* @sa rd_kafka_share_consumer_close()
3423-
*/
3424-
RD_EXPORT
3425-
void rd_kafka_share_destroy(rd_kafka_share_t *rkshare);
3426-
34273416
/**
34283417
* @brief Enable SASL background callbacks for a share consumer.
34293418
*
@@ -5218,6 +5207,24 @@ RD_EXPORT
52185207
int rd_kafka_share_consumer_closed(rd_kafka_share_t *rkshare);
52195208

52205209

5210+
/**
5211+
* @brief Destroy the share consumer instance and free all associated resources.
5212+
*
5213+
* @param rkshare Share consumer instance.
5214+
*/
5215+
RD_EXPORT
5216+
void rd_kafka_share_destroy(rd_kafka_share_t *rkshare);
5217+
5218+
/**
5219+
* @brief Destroy the share consumer instance according to specified destroy
5220+
* flags.
5221+
*
5222+
* @param rkshare Share consumer instance.
5223+
*
5224+
* @param flags Destroy flags (see RD_KAFKA_DESTROY_F_* defines).
5225+
*/
5226+
RD_EXPORT
5227+
void rd_kafka_share_destroy_flags(rd_kafka_share_t *rkshare, int flags);
52215228
/**@}*/
52225229

52235230

0 commit comments

Comments
 (0)