From 97542e25bb816ae1f8f26260a7676b2e42f1bc78 Mon Sep 17 00:00:00 2001 From: Abhishek Shaw Date: Sun, 28 Jun 2026 21:26:05 -0700 Subject: [PATCH] Add Kafka broker setup and topic creation to Prepare. PiperOrigin-RevId: 939597473 --- .../linux_benchmarks/kafka_benchmark.py | 83 ++++++++++++++++++- 1 file changed, 79 insertions(+), 4 deletions(-) diff --git a/perfkitbenchmarker/linux_benchmarks/kafka_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kafka_benchmark.py index ac822a4b86..0e8caee7fe 100644 --- a/perfkitbenchmarker/linux_benchmarks/kafka_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kafka_benchmark.py @@ -14,8 +14,10 @@ """Runs Apache Kafka benchmarks.""" import concurrent.futures +import time from typing import Any, Mapping +from absl import flags from perfkitbenchmarker import benchmark_spec as bm_spec from perfkitbenchmarker import configs from perfkitbenchmarker import sample @@ -38,9 +40,30 @@ KAFKA_TAR = f'kafka_{KAFKA_VERSION}.tgz' KAFKA_URL = f'https://downloads.apache.org/kafka/4.2.0/{KAFKA_TAR}' +KAFKA_DIR = f'kafka_{KAFKA_VERSION}' +KAFKA_TOPIC_NAME = 'kafka_benchmark_test' +KAFKA_BROKER_PORT = 9092 +KAFKA_CONSUMER_TIMEOUT_MS = 100_000 + +FLAGS = flags.FLAGS +_KAFKA_NUM_PARTITIONS = flags.DEFINE_integer( + 'kafka_num_partitions', + 256, + 'Number of partitions for the topic.', +) +_KAFKA_REPLICATION_FACTOR = flags.DEFINE_integer( + 'kafka_replication_factor', + 1, + 'Replication factor for the topic.', +) + def _InstallKafka(vm): - """Installs Kafka on a VM.""" + """Installs Kafka on a VM. + + Args: + vm: The VM to install Kafka on. + """ vm.InstallPackages('openjdk-17-jdk') vm.Install('build_tools') vm.Install('pip') @@ -72,12 +95,64 @@ def Prepare(benchmark_spec: bm_spec.BenchmarkSpec) -> None: with concurrent.futures.ThreadPoolExecutor(max_workers=len(vms)) as executor: executor.map(_InstallKafka, vms) + broker_vm = benchmark_spec.vm_groups['broker'][0] + + broker_vm.RemoteCommand( + f'cd {KAFKA_DIR} && sed -i ' + f"'s/advertised.listeners=PLAINTEXT:\\/\\/localhost:{KAFKA_BROKER_PORT}/advertised.listeners=PLAINTEXT:\\/\\/{broker_vm.internal_ip}:{KAFKA_BROKER_PORT}/g' " + 'config/server.properties' + ) + + kafka_cluster_id, _ = broker_vm.RemoteCommand( + f'cd {KAFKA_DIR} && bin/kafka-storage.sh random-uuid' + ) + broker_vm.RemoteCommand( + f'cd {KAFKA_DIR} && bin/kafka-storage.sh format --standalone -t' + f' {kafka_cluster_id.strip()} -c config/server.properties' + ) + + # Start the broker + broker_vm.RemoteCommand( + f'cd {KAFKA_DIR} && bin/kafka-server-start.sh -daemon' + ' config/server.properties' + ) + # Wait for the broker to start. + time.sleep(10) + + broker_vm.RemoteCommand( + f'cd {KAFKA_DIR} && bin/kafka-topics.sh --create --topic' + f' {KAFKA_TOPIC_NAME} --bootstrap-server localhost:{KAFKA_BROKER_PORT}' + f' --partitions={_KAFKA_NUM_PARTITIONS.value}' + f' --replication-factor={_KAFKA_REPLICATION_FACTOR.value}' + f' --config min.insync.replicas={_KAFKA_REPLICATION_FACTOR.value}' + ' --if-not-exists' + ) + def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> list[sample.Sample]: - """Runs Kafka benchmarks.""" + """Runs Kafka benchmarks. + + Args: + benchmark_spec: The benchmark specification. + + Returns: + A list of samples. + """ return [] def Cleanup(benchmark_spec: bm_spec.BenchmarkSpec) -> None: - """Cleans up the benchmark.""" - pass + """Cleans up the benchmark. + + Args: + benchmark_spec: The benchmark specification. + """ + broker_vm = benchmark_spec.vm_groups['broker'][0] + + broker_vm.RemoteCommand( + f'cd {KAFKA_DIR} && bin/kafka-server-stop.sh', ignore_failure=True + ) + + broker_vm.RemoteCommand( + 'rm -rf /tmp/kraft-combined-logs', ignore_failure=True + )