diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheBootstrapFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheBootstrapFactory.java index 88c0633bc8..093b612c8b 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheBootstrapFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheBootstrapFactory.java @@ -35,10 +35,10 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaBindingConfig; import io.aklivity.zilla.runtime.binding.kafka.internal.types.Array32FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW; -import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConfigFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConfigDetailFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType; -import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaPartitionFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaPartitionMetadataFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.AbortFW; @@ -666,14 +666,14 @@ private void doBootstrapCleanup( private void onTopicConfigChanged( long traceId, - ArrayFW changedConfigs) + ArrayFW changedConfigs) { metaStream.doMetaInitialBeginIfNecessary(traceId); } private void onTopicMetaDataChanged( long traceId, - ArrayFW partitions) + ArrayFW partitions) { leadersByPartitionId.clear(); partitions.forEach(p -> leadersByPartitionId.put(p.partitionId(), p.leaderId())); @@ -698,7 +698,7 @@ private void onTopicMetaDataChanged( private void onPartitionMetaDataChangedIfNecessary( long traceId, - KafkaPartitionFW partition) + KafkaPartitionMetadataFW partition) { final int partitionId = partition.partitionId(); final int leaderId = partition.leaderId(); @@ -886,7 +886,7 @@ private void doDescribeInitialBegin( traceId, bootstrap.authorization, 0L, ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) .typeId(kafkaTypeId) - .describe(m -> m.topic(bootstrap.topic) + .describe(m -> m.name(bootstrap.topic) .configsItem(ci -> ci.set(CONFIG_NAME_CLEANUP_POLICY)) .configsItem(ci -> ci.set(CONFIG_NAME_MAX_MESSAGE_BYTES)) .configsItem(ci -> ci.set(CONFIG_NAME_SEGMENT_BYTES)) @@ -1009,7 +1009,7 @@ private void onDescribeReplyData( { final KafkaDataExFW kafkaDataEx = extension.get(kafkaDataExRO::wrap); final KafkaDescribeDataExFW kafkaDescribeDataEx = kafkaDataEx.describe(); - final ArrayFW changedConfigs = kafkaDescribeDataEx.configs(); + final ArrayFW changedConfigs = kafkaDescribeDataEx.configs(); bootstrap.onTopicConfigChanged(traceId, changedConfigs); @@ -1266,7 +1266,7 @@ private void onMetaReplyData( { final KafkaDataExFW kafkaDataEx = extension.get(kafkaDataExRO::wrap); final KafkaMetaDataExFW kafkaMetaDataEx = kafkaDataEx.meta(); - final ArrayFW partitions = kafkaMetaDataEx.partitions(); + final ArrayFW partitions = kafkaMetaDataEx.partitions(); bootstrap.onTopicMetaDataChanged(traceId, partitions); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheClientDescribeFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheClientDescribeFactory.java index ee6eca6f42..726304f679 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheClientDescribeFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheClientDescribeFactory.java @@ -35,7 +35,7 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig; import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight; -import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConfigFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConfigDetailFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.AbortFW; @@ -127,7 +127,7 @@ public MessageConsumer newStream( final KafkaBeginExFW kafkaBeginEx = extension.get(kafkaBeginExRO::tryWrap); assert kafkaBeginEx.kind() == KafkaBeginExFW.KIND_DESCRIBE; final KafkaDescribeBeginExFW kafkaDescribeBeginEx = kafkaBeginEx.describe(); - final String16FW topic = kafkaDescribeBeginEx.topic(); + final String16FW topic = kafkaDescribeBeginEx.name(); final String topicName = topic.asString(); MessageConsumer newStream = null; @@ -484,7 +484,7 @@ private void doDescribeFanoutInitialBegin( traceId, authorization, 0L, ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) .typeId(kafkaTypeId) - .describe(d -> d.topic(topic).configs(cs -> configNames.forEach(c -> cs.item(i -> i.set(c, UTF_8))))) + .describe(d -> d.name(topic).configs(cs -> configNames.forEach(c -> cs.item(i -> i.set(c, UTF_8))))) .build() .sizeof())); state = KafkaState.openingInitial(state); @@ -606,7 +606,7 @@ private void onDescribeFanoutReplyData( if (kafkaDescribeDataEx != null) { - final ArrayFW changedConfigs = kafkaDescribeDataEx.configs(); + final ArrayFW changedConfigs = kafkaDescribeDataEx.configs(); if (configValues == null) { configValues = new LinkedHashMap<>(); @@ -852,7 +852,7 @@ private void doDescribeReplyBegin( traceId, authorization, affinity, ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) .typeId(kafkaTypeId) - .describe(m -> m.topic(group.topic) + .describe(m -> m.name(group.topic) .configs(cs -> group.configNames.forEach(c -> cs.item(i -> i.set(c, UTF_8))))) .build() .sizeof())); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheMetaFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheMetaFactory.java index 06e00f0ea9..c3729a8c5a 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheMetaFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheMetaFactory.java @@ -41,7 +41,7 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig; import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight; -import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaPartitionFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaPartitionMetadataFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.AbortFW; @@ -471,8 +471,10 @@ private void onMetaFanoutMemberOpened( final KafkaDataExFW kafkaDataEx = kafkaDataExRW.wrap(extBuffer, 0, extBuffer.capacity()) .typeId(kafkaTypeId) - .meta(m -> leadersByPartitionId.forEach((p, l) -> m.partitionsItem(i -> i.partitionId(p) - .leaderId(l)))) + .meta(m -> m.replicationFactor((short) 0) + .partitions(ps -> leadersByPartitionId.forEach( + (p, l) -> ps.item(i -> i.partitionId(p).leaderId(l) + .replicas(r -> {}).isr(r -> {}))))) .build(); member.doMetaReplyDataIfNecessary(traceId, kafkaDataEx); } @@ -699,7 +701,7 @@ private void onMetaFanoutReplyData( if (kafkaMetaDataEx != null) { - final ArrayFW partitions = kafkaMetaDataEx.partitions(); + final ArrayFW partitions = kafkaMetaDataEx.partitions(); leadersByPartitionId.clear(); partitions.forEach(p -> leadersByPartitionId.put(p.partitionId(), p.leaderId())); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerDescribeFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerDescribeFactory.java index 18440bf2de..413ca080cb 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerDescribeFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerDescribeFactory.java @@ -42,7 +42,7 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaRouteConfig; import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight; -import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConfigFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConfigDetailFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.AbortFW; @@ -152,7 +152,7 @@ public MessageConsumer newStream( final KafkaBeginExFW kafkaBeginEx = extension.get(kafkaBeginExRO::tryWrap); assert kafkaBeginEx.kind() == KafkaBeginExFW.KIND_DESCRIBE; final KafkaDescribeBeginExFW kafkaDescribeBeginEx = kafkaBeginEx.describe(); - final String16FW beginTopic = kafkaDescribeBeginEx.topic(); + final String16FW beginTopic = kafkaDescribeBeginEx.name(); final String topicName = beginTopic.asString(); MessageConsumer newStream = null; @@ -471,7 +471,8 @@ private void onDescribeFanoutMemberOpened( final KafkaDataExFW kafkaDataEx = kafkaDataExRW.wrap(extBuffer, 0, extBuffer.capacity()) .typeId(kafkaTypeId) - .describe(d -> configValues.forEach((n, v) -> d.configsItem(i -> i.name(n).value(v)))) + .describe(d -> d.configs(cs -> configValues.forEach( + (n, v) -> cs.item(i -> i.name(n).value(v).isDefault(0).isSensitive(0))))) .build(); member.doDescribeReplyDataIfNecessary(traceId, kafkaDataEx); } @@ -527,7 +528,7 @@ private void doDescribeFanoutInitialBegin( traceId, authorization, 0L, ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) .typeId(kafkaTypeId) - .describe(d -> d.topic(topic.name()) + .describe(d -> d.name(topic.name()) .configs(cs -> configNames.forEach(c -> cs.item(i -> i.set(c, UTF_8))))) .build() .sizeof())); @@ -711,7 +712,7 @@ private void onDescribeFanoutReplyData( if (kafkaDescribeDataEx != null) { - final ArrayFW changedConfigs = kafkaDescribeDataEx.configs(); + final ArrayFW changedConfigs = kafkaDescribeDataEx.configs(); if (configValues == null) { configValues = new LinkedHashMap<>(); @@ -726,7 +727,7 @@ private void onDescribeFanoutReplyData( } private void onDescribeFanoutConfigChanged( - KafkaConfigFW config) + KafkaConfigDetailFW config) { final String16FW configName = config.name(); final String16FW configValue = config.value(); @@ -1021,7 +1022,7 @@ private void doDescribeReplyBegin( traceId, authorization, affinity, ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) .typeId(kafkaTypeId) - .describe(m -> m.topic(group.topic.name()) + .describe(m -> m.name(group.topic.name()) .configs(cs -> group.configNames.forEach(c -> cs.item(i -> i.set(c, UTF_8))))) .build() .sizeof())); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java index d9c6ff6a85..3d276ad0f4 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientDescribeFactory.java @@ -206,7 +206,7 @@ public MessageConsumer newStream( assert kafkaBeginEx.kind() == KafkaBeginExFW.KIND_DESCRIBE; final KafkaDescribeBeginExFW kafkaDescribeBeginEx = kafkaBeginEx.describe(); - final String16FW beginTopic = kafkaDescribeBeginEx.topic(); + final String16FW beginTopic = kafkaDescribeBeginEx.name(); final String topicName = beginTopic.asString(); MessageConsumer newStream = null; @@ -823,7 +823,7 @@ private void doApplicationBegin( ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) .typeId(kafkaTypeId) .describe(m -> m - .topic(topic) + .name(topic) .configs(cs -> configs.forEach(n -> { cs.item(i -> i.set(n, UTF_8)); @@ -1675,7 +1675,7 @@ private void onDecodeDescribeResponse( { final KafkaDataExFW kafkaDataEx = kafkaDataExRW.wrap(extBuffer, 0, extBuffer.capacity()) .typeId(kafkaTypeId) - .describe(d -> changedConfigs.forEach(n -> d.configsItem(ci -> ci.name(n).value(configs.get(n))))) + .describe(d -> changedConfigs.forEach(n -> d.configsItem(ci -> ci.name(n).value(configs.get(n)).isDefault(0).isSensitive(0)))) .build(); doApplicationData(traceId, authorization, kafkaDataEx); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java index 4964c96090..12f3480f52 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientMetaFactory.java @@ -1891,7 +1891,9 @@ private void onDecodeMetaResponse( final KafkaDataExFW kafkaDataEx = kafkaDataExRW.wrap(extBuffer, 0, extBuffer.capacity()) .typeId(kafkaTypeId) - .meta(m -> partitions.forEach((k, v) -> m.partitionsItem(pi -> pi.partitionId(k).leaderId(v)))) + .meta(m -> m.replicationFactor((short) 0) + .partitions(ps -> partitions.forEach((k, v) -> + ps.item(pi -> pi.partitionId(k).leaderId(v).replicas(r -> {}).isr(r -> {}))))) .build(); doApplicationData(traceId, authorization, kafkaDataEx); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java index 901f0fb455..e01f273c91 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaMergedFactory.java @@ -53,7 +53,7 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaAckMode; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaCapabilities; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConditionFW; -import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConfigFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaConfigDetailFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaType; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaEvaluation; @@ -65,7 +65,7 @@ import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaNotFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType; -import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaPartitionFW; +import io.aklivity.zilla.runtime.binding.kafka.internal.types.stream.KafkaPartitionMetadataFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaValueMatchFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.OctetsFW; import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW; @@ -1984,7 +1984,7 @@ private void cleanupBudgetCreditorIfNecessary() private void onTopicConfigChanged( long traceId, - ArrayFW configs) + ArrayFW configs) { configs.forEach(c -> { @@ -1998,7 +1998,7 @@ private void onTopicConfigChanged( private void onTopicMetaDataChanged( long traceId, - ArrayFW partitions) + ArrayFW partitions) { leadersByPartitionId.clear(); partitions.forEach(p -> leadersByPartitionId.put(p.partitionId(), p.leaderId())); @@ -2390,7 +2390,7 @@ private void doDescribeInitialBegin( traceId, merged.authorization, 0L, ex -> ex.set((b, o, l) -> kafkaBeginExRW.wrap(b, o, l) .typeId(kafkaTypeId) - .describe(m -> m.topic(merged.topic) + .describe(m -> m.name(merged.topic) .configsItem(ci -> ci.set(CONFIG_NAME_CLEANUP_POLICY)) .configsItem(ci -> ci.set(CONFIG_NAME_MAX_MESSAGE_BYTES)) .configsItem(ci -> ci.set(CONFIG_NAME_SEGMENT_BYTES)) @@ -2513,7 +2513,7 @@ private void onDescribeReplyData( { final KafkaDataExFW kafkaDataEx = extension.get(kafkaDataExRO::wrap); final KafkaDescribeDataExFW kafkaDescribeDataEx = kafkaDataEx.describe(); - final ArrayFW configs = kafkaDescribeDataEx.configs(); + final ArrayFW configs = kafkaDescribeDataEx.configs(); merged.onTopicConfigChanged(traceId, configs); doDescribeReplyWindow(traceId, 0, replyMax); @@ -2769,7 +2769,7 @@ private void onMetaReplyData( { final KafkaDataExFW kafkaDataEx = extension.get(kafkaDataExRO::wrap); final KafkaMetaDataExFW kafkaMetaDataEx = kafkaDataEx.meta(); - final ArrayFW partitions = kafkaMetaDataEx.partitions(); + final ArrayFW partitions = kafkaMetaDataEx.partitions(); merged.onTopicMetaDataChanged(traceId, partitions); doMetaReplyWindow(traceId, 0, replyMax); diff --git a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl index 4677f3a27f..06615a66e1 100644 --- a/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl +++ b/specs/binding-kafka.spec/src/main/resources/META-INF/zilla/kafka.idl @@ -63,10 +63,10 @@ scope kafka HEADERS (3) } - enum KafkaResourceType (uint8) + enum KafkaResourceType (int8) { - BROKER(4), - TOPIC(2) + TOPIC (2), + BROKER (4) } union KafkaCondition switch (uint8) @@ -187,10 +187,13 @@ scope kafka BOOTSTRAP (254), MERGED (255), DESCRIBE_CLUSTER (60), + ALTER_CONSUMER_GROUP_OFFSETS (53), ALTER_CONFIGS (33), INIT_PRODUCER_ID (22), DELETE_TOPICS (20), CREATE_TOPICS (19), + LIST_GROUPS (16), + DESCRIBE_GROUPS (15), META (3), OFFSET_COMMIT (8), OFFSET_FETCH (9), @@ -330,20 +333,38 @@ scope kafka string16 topic; } + struct KafkaPartitionMetadata + { + int32 partitionId; + int32 leaderId; + int32[] replicas; + int32[] isr; + } + struct KafkaMetaDataEx { - KafkaPartition[] partitions; + int16 replicationFactor; + KafkaPartitionMetadata[] partitions; } struct KafkaDescribeBeginEx { - string16 topic; + KafkaResourceType resourceType = TOPIC; + string16 name; string16[] configs; } + struct KafkaConfigDetail + { + string16 name; + string16 value; + uint8 isDefault; + uint8 isSensitive; + } + struct KafkaDescribeDataEx { - KafkaConfig[] configs; + KafkaConfigDetail[] configs; } struct KafkaFetchBeginEx @@ -564,12 +585,74 @@ scope kafka uint8 includeAuthorizedOperations; } + struct KafkaGroupState + { + string16 groupId; + string16 protocolType; + string16 groupState; + } + + struct KafkaListGroupsRequestBeginEx + { + string16[] statesFilter; + } + + struct KafkaDescribeGroupMember + { + string16 memberId; + string16 clientId; + string16 clientHost; + varint32 metadataLen; + octets[metadataLen] metadata = null; + varint32 assignmentLen; + octets[assignmentLen] assignment = null; + } + + struct KafkaDescribeGroupInfo + { + int16 error; + string16 groupId; + string16 groupState; + string16 protocolType; + string16 protocol; + KafkaDescribeGroupMember[] members; + } + + struct KafkaDescribeGroupsRequestBeginEx + { + string16[] groupIds; + uint8 includeAuthorizedOperations; + } + + struct KafkaAlterGroupTopicPartition + { + int32 partitionId; + int64 offset; + int32 leaderEpoch = -1; + string16 metadata = null; + } + + struct KafkaAlterGroupTopic + { + string16 name; + KafkaAlterGroupTopicPartition[] partitions; + } + + struct KafkaAlterConsumerGroupOffsetsRequestBeginEx + { + string16 groupId; + KafkaAlterGroupTopic[] topics; + } + union KafkaRequestBeginEx switch (uint8) { case 19: kafka::stream::KafkaCreateTopicsRequestBeginEx createTopics; case 20: kafka::stream::KafkaDeleteTopicsRequestBeginEx deleteTopics; case 33: kafka::stream::KafkaAlterConfigsRequestBeginEx alterConfigs; case 60: kafka::stream::KafkaDescribeClusterRequestBeginEx describeCluster; + case 15: kafka::stream::KafkaDescribeGroupsRequestBeginEx describeGroups; + case 16: kafka::stream::KafkaListGroupsRequestBeginEx listGroups; + case 53: kafka::stream::KafkaAlterConsumerGroupOffsetsRequestBeginEx alterConsumerGroupOffsets; } struct KafkaCreateTopicStatus @@ -630,12 +713,46 @@ scope kafka int32 authorizedOperations; } + struct KafkaListGroupsResponseBeginEx + { + int32 throttle; + int16 error; + KafkaGroupState[] groups; + } + + struct KafkaDescribeGroupsResponseBeginEx + { + int32 throttle; + KafkaDescribeGroupInfo[] groups; + } + + struct KafkaAlterGroupPartitionResult + { + int32 partitionId; + int16 error; + } + + struct KafkaAlterGroupTopicResult + { + string16 name; + KafkaAlterGroupPartitionResult[] partitions; + } + + struct KafkaAlterConsumerGroupOffsetsResponseBeginEx + { + int32 throttle; + KafkaAlterGroupTopicResult[] topics; + } + union KafkaResponseBeginEx switch (uint8) { case 19: kafka::stream::KafkaCreateTopicsResponseBeginEx createTopics; case 20: kafka::stream::KafkaDeleteTopicsResponseBeginEx deleteTopics; case 33: kafka::stream::KafkaAlterConfigsResponseBeginEx alterConfigs; case 60: kafka::stream::KafkaDescribeClusterResponseBeginEx describeCluster; + case 15: kafka::stream::KafkaDescribeGroupsResponseBeginEx describeGroups; + case 16: kafka::stream::KafkaListGroupsResponseBeginEx listGroups; + case 53: kafka::stream::KafkaAlterConsumerGroupOffsetsResponseBeginEx alterConsumerGroupOffsets; } }