Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions integration_tests/integration_tests/test_consumer_rebalancing.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,22 @@ def manage_taskbroker(
taskbroker_index: int,
taskbroker_path: str,
config_file_path: str,
config: TaskbrokerConfig,
iterations: int,
min_sleep: int,
max_sleep: int,
log_file_path: str,
ports: list[int],
) -> None:
with open(log_file_path, "a") as log_file:
print(f"Starting taskbroker {taskbroker_index}, writing log file to " f"{log_file_path}")
for i in range(iterations):
# Use a distinct gRPC port each restart so we never re-bind a port
# still in TIME_WAIT (the original GHA flake mode).
config.grpc_port = ports[i]
with open(config_file_path, "w") as f:
yaml.safe_dump(config.to_dict(), f)

process = subprocess.Popen(
[taskbroker_path, "-c", config_file_path],
stderr=subprocess.STDOUT,
Expand Down Expand Up @@ -86,11 +94,9 @@ def test_tasks_written_once_during_rebalancing() -> None:
max_pending_count = 15_000
topic_name = "taskworker"
kafka_deadletter_topic = "taskworker-dlq"
grpc_ports = get_available_ports(num_consumers)
curr_time = int(time.time())

print(
f"""
print(f"""
Running test with the following configuration:
num of consumers: {num_consumers},
num of messages: {num_messages},
Expand All @@ -99,10 +105,8 @@ def test_tasks_written_once_during_rebalancing() -> None:
min restart duration: {min_restart_duration} seconds,
max restart duration: {max_restart_duration} seconds,
topic name: {topic_name},
grpc ports: {grpc_ports},
random seed value: 42
"""
)
""")
random.seed(42)

# Ensure topic exists and has correct number of partitions
Expand All @@ -123,27 +127,32 @@ def test_tasks_written_once_during_rebalancing() -> None:
kafka_deadletter_topic=kafka_deadletter_topic,
kafka_consumer_group=topic_name,
kafka_auto_offset_reset="earliest",
grpc_port=grpc_ports[i],
grpc_port=0,
)

for filename, config in taskbroker_configs.items():
with open(str(TEST_OUTPUT_PATH / filename), "w") as f:
yaml.safe_dump(config.to_dict(), f)
# Pre-allocate one gRPC port per (consumer, restart) so no port is ever
# rebound during the test
all_ports = get_available_ports(num_consumers * num_restarts)
ports_per_consumer = [
all_ports[i * num_restarts : (i + 1) * num_restarts] for i in range(num_consumers)
]

try:
send_generic_messages_to_topic(topic_name, num_messages)
threads: list[Thread] = []
for i in range(num_consumers):
for i, (filename, config) in enumerate(taskbroker_configs.items()):
thread = threading.Thread(
target=manage_taskbroker,
args=(
i,
taskbroker_path,
str(TEST_OUTPUT_PATH / f"config_{i}.yml"),
str(TEST_OUTPUT_PATH / filename),
config,
num_restarts,
min_restart_duration,
max_restart_duration,
str(TEST_OUTPUT_PATH / f"taskbroker_{i}_{curr_time}.log"),
ports_per_consumer[i],
),
)
thread.start()
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[flake8]
# File filtering is taken care of in pre-commit.
extend-ignore = E501
extend-ignore = E501, E203
Loading