diff --git a/integration_tests/integration_tests/test_consumer_rebalancing.py b/integration_tests/integration_tests/test_consumer_rebalancing.py index b9d21732..7d27be6f 100644 --- a/integration_tests/integration_tests/test_consumer_rebalancing.py +++ b/integration_tests/integration_tests/test_consumer_rebalancing.py @@ -25,14 +25,22 @@ def manage_taskbroker( taskbroker_index: int, taskbroker_path: str, config_file_path: str, + config: TaskbrokerConfig, iterations: int, min_sleep: int, max_sleep: int, log_file_path: str, + ports: list[int], ) -> None: with open(log_file_path, "a") as log_file: print(f"Starting taskbroker {taskbroker_index}, writing log file to " f"{log_file_path}") for i in range(iterations): + # Use a distinct gRPC port each restart so we never re-bind a port + # still in TIME_WAIT (the original GHA flake mode). + config.grpc_port = ports[i] + with open(config_file_path, "w") as f: + yaml.safe_dump(config.to_dict(), f) + process = subprocess.Popen( [taskbroker_path, "-c", config_file_path], stderr=subprocess.STDOUT, @@ -86,11 +94,9 @@ def test_tasks_written_once_during_rebalancing() -> None: max_pending_count = 15_000 topic_name = "taskworker" kafka_deadletter_topic = "taskworker-dlq" - grpc_ports = get_available_ports(num_consumers) curr_time = int(time.time()) - print( - f""" + print(f""" Running test with the following configuration: num of consumers: {num_consumers}, num of messages: {num_messages}, @@ -99,10 +105,8 @@ def test_tasks_written_once_during_rebalancing() -> None: min restart duration: {min_restart_duration} seconds, max restart duration: {max_restart_duration} seconds, topic name: {topic_name}, - grpc ports: {grpc_ports}, random seed value: 42 - """ - ) + """) random.seed(42) # Ensure topic exists and has correct number of partitions @@ -123,27 +127,32 @@ def test_tasks_written_once_during_rebalancing() -> None: kafka_deadletter_topic=kafka_deadletter_topic, kafka_consumer_group=topic_name, kafka_auto_offset_reset="earliest", - grpc_port=grpc_ports[i], + grpc_port=0, ) - for filename, config in taskbroker_configs.items(): - with open(str(TEST_OUTPUT_PATH / filename), "w") as f: - yaml.safe_dump(config.to_dict(), f) + # Pre-allocate one gRPC port per (consumer, restart) so no port is ever + # rebound during the test + all_ports = get_available_ports(num_consumers * num_restarts) + ports_per_consumer = [ + all_ports[i * num_restarts : (i + 1) * num_restarts] for i in range(num_consumers) + ] try: send_generic_messages_to_topic(topic_name, num_messages) threads: list[Thread] = [] - for i in range(num_consumers): + for i, (filename, config) in enumerate(taskbroker_configs.items()): thread = threading.Thread( target=manage_taskbroker, args=( i, taskbroker_path, - str(TEST_OUTPUT_PATH / f"config_{i}.yml"), + str(TEST_OUTPUT_PATH / filename), + config, num_restarts, min_restart_duration, max_restart_duration, str(TEST_OUTPUT_PATH / f"taskbroker_{i}_{curr_time}.log"), + ports_per_consumer[i], ), ) thread.start() diff --git a/setup.cfg b/setup.cfg index 741973e6..11e9216f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,3 +1,3 @@ [flake8] # File filtering is taken care of in pre-commit. -extend-ignore = E501 +extend-ignore = E501, E203