From d43f73d17ae73ee96406dfd1554895d3d9d294d5 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Wed, 24 Jun 2026 09:06:58 -0400 Subject: [PATCH 1/2] ref(workerchild): remove passthrough option on future resolution --- .../taskbroker_client/worker/workerchild.py | 26 +-- clients/python/tests/worker/test_worker.py | 184 +++++++++--------- 2 files changed, 96 insertions(+), 114 deletions(-) diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index e8d7a8a3..4f315c8b 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -272,8 +272,6 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: }, ) pending_task_futures.remove(task) - # FIXME(benm): Use passthrough option to get future-related metrics - # without placing a duplicate ProcessingResult _task_execution_complete( inflight=task.inflight, next_state=task.status, @@ -281,7 +279,6 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: execution_end_time=task.futures_start_time, task_func=task.task_func, futures_start_time=task.futures_start_time, - passthrough=True, ) def get_oldest_pending_activation() -> ActivationWithPendingFutures | None: @@ -515,13 +512,6 @@ def check_task_future_completion( task_func, ) else: - # FIXME(benm): Temporarily bypass producer futures needing to be completed - # before writing to `processed_tasks` while still recording metrics. - _place_processing_result( - inflight, - next_state, - task_func, - ) for name, futures in task_produced_futures.items(): # How many futures were produced in the executed task, # tagged by producer name @@ -782,18 +772,12 @@ def _task_execution_complete( execution_end_time: float, task_func: Task[Any, Any] | None, futures_start_time: float | None = None, - # FIXME(benm): Temp option to skip placing a task in processed_tasks. - # This is for tasks with pending producer futures, as we still want to record - # metrics as usual but want to have a `ProcessingResult` placed immediately - # while we troubleshoot why futures are never being marked as done. - passthrough: bool = False, ) -> None: - if not passthrough: - _place_processing_result( - inflight, - next_state, - task_func, - ) + _place_processing_result( + inflight, + next_state, + task_func, + ) record_task_execution( inflight.activation, next_state, diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index 2d35a903..059995ef 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -1437,55 +1437,54 @@ def test_child_process_tracks_producer_futures( assert result.status == TASK_ACTIVATION_STATUS_COMPLETE -# FIXME(benm): Skip this test while we're bypassing awaiting future completion -# def test_child_process_holds_result_until_futures_done( -# clear_pending_futures: None, restore_signal_handlers: None -# ) -> None: -# task = _producing_task() -# todo: queue.Queue[InflightTaskActivation] = queue.Queue() -# processed: queue.Queue[ProcessingResult] = queue.Queue() -# shutdown = Event() - -# pending_future: Future[BrokerValue[KafkaPayload]] = Future() -# todo.put(task) - -# # `child_process` calls `signal.signal`, which must run on the main thread. -# # Use a helper thread to observe the queue while the future is still -# # pending, then resolve the future so the drain can complete. -# observed_empty_while_pending = threading.Event() - -# def observe_and_resolve() -> None: -# # Wait for child_process to process the task and enter the drain loop. -# time.sleep(0.5) -# if processed.qsize() == 0: -# observed_empty_while_pending.set() -# pending_future.set_result(_make_broker_value()) - -# observer = threading.Thread(target=observe_and_resolve, name="future-observer") -# observer.start() -# try: -# with mock.patch.object( -# TaskProducer, "collect_futures", return_value={"test.producer": {pending_future}} -# ): -# child_process( -# "examples.app:app", -# todo, -# processed, -# shutdown, -# max_task_count=1, -# processing_pool_name="test", -# process_type="fork", -# ) -# finally: -# observer.join(timeout=5) -# shutdown.set() - -# assert ( -# observed_empty_while_pending.is_set() -# ), "result was pushed before the producer future was resolved" -# result = processed.get(timeout=5) -# assert result.task_id == task.activation.id -# assert result.status == TASK_ACTIVATION_STATUS_COMPLETE +def test_child_process_holds_result_until_futures_done( + clear_pending_futures: None, restore_signal_handlers: None +) -> None: + task = _producing_task() + todo: queue.Queue[InflightTaskActivation] = queue.Queue() + processed: queue.Queue[ProcessingResult] = queue.Queue() + shutdown = Event() + + pending_future: Future[BrokerValue[KafkaPayload]] = Future() + todo.put(task) + + # `child_process` calls `signal.signal`, which must run on the main thread. + # Use a helper thread to observe the queue while the future is still + # pending, then resolve the future so the drain can complete. + observed_empty_while_pending = threading.Event() + + def observe_and_resolve() -> None: + # Wait for child_process to process the task and enter the drain loop. + time.sleep(0.5) + if processed.qsize() == 0: + observed_empty_while_pending.set() + pending_future.set_result(_make_broker_value()) + + observer = threading.Thread(target=observe_and_resolve, name="future-observer") + observer.start() + try: + with mock.patch.object( + TaskProducer, "collect_futures", return_value={"test.producer": {pending_future}} + ): + child_process( + "examples.app:app", + todo, + processed, + shutdown, + max_task_count=1, + processing_pool_name="test", + process_type="fork", + ) + finally: + observer.join(timeout=5) + shutdown.set() + + assert ( + observed_empty_while_pending.is_set() + ), "result was pushed before the producer future was resolved" + result = processed.get(timeout=5) + assert result.task_id == task.activation.id + assert result.status == TASK_ACTIVATION_STATUS_COMPLETE def test_child_process_drains_pending_futures_on_sigterm( @@ -1531,50 +1530,49 @@ def deliver_sigterm() -> None: assert result.status == TASK_ACTIVATION_STATUS_COMPLETE -# FIXME(benm): Skip this test while we're bypassing awaiting future completion -# def test_child_process_retries_on_failed_future( -# clear_pending_futures: None, restore_signal_handlers: None -# ) -> None: -# retriable_task = InflightTaskActivation( -# host="localhost:50051", -# receive_timestamp=0, -# activation=TaskActivation( -# id="failed-future-retry", -# taskname="examples.will_retry", -# namespace="examples", -# parameters_bytes=msgpack.packb({"args": ["noop"], "kwargs": {}}, use_bin_type=True), -# processing_deadline_duration=2, -# retry_state=RetryState( -# attempts=0, -# max_attempts=3, -# on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD, -# ), -# ), -# ) -# todo: queue.Queue[InflightTaskActivation] = queue.Queue() -# processed: queue.Queue[ProcessingResult] = queue.Queue() -# shutdown = Event() - -# failed_future: Future[BrokerValue[KafkaPayload]] = Future() -# failed_future.set_exception(RuntimeError("kafka produce failed")) - -# todo.put(retriable_task) -# with mock.patch.object( -# TaskProducer, "collect_futures", return_value={"test.producer": {failed_future}} -# ): -# child_process( -# "examples.app:app", -# todo, -# processed, -# shutdown, -# max_task_count=1, -# processing_pool_name="test", -# process_type="fork", -# ) - -# result = processed.get(timeout=5) -# assert result.task_id == retriable_task.activation.id -# assert result.status == TASK_ACTIVATION_STATUS_RETRY +def test_child_process_retries_on_failed_future( + clear_pending_futures: None, restore_signal_handlers: None +) -> None: + retriable_task = InflightTaskActivation( + host="localhost:50051", + receive_timestamp=0, + activation=TaskActivation( + id="failed-future-retry", + taskname="examples.will_retry", + namespace="examples", + parameters_bytes=msgpack.packb({"args": ["noop"], "kwargs": {}}, use_bin_type=True), + processing_deadline_duration=2, + retry_state=RetryState( + attempts=0, + max_attempts=3, + on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD, + ), + ), + ) + todo: queue.Queue[InflightTaskActivation] = queue.Queue() + processed: queue.Queue[ProcessingResult] = queue.Queue() + shutdown = Event() + + failed_future: Future[BrokerValue[KafkaPayload]] = Future() + failed_future.set_exception(RuntimeError("kafka produce failed")) + + todo.put(retriable_task) + with mock.patch.object( + TaskProducer, "collect_futures", return_value={"test.producer": {failed_future}} + ): + child_process( + "examples.app:app", + todo, + processed, + shutdown, + max_task_count=1, + processing_pool_name="test", + process_type="fork", + ) + + result = processed.get(timeout=5) + assert result.task_id == retriable_task.activation.id + assert result.status == TASK_ACTIVATION_STATUS_RETRY def test_child_process_clears_pending_futures_when_task_fails( From 6e4888921bfbde454da34bf36c69e32027c0af7f Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Wed, 24 Jun 2026 09:45:31 -0400 Subject: [PATCH 2/2] Allow user to specify passthrough value rather than delete everything --- .../src/taskbroker_client/worker/worker.py | 7 ++ .../taskbroker_client/worker/workerchild.py | 25 ++++-- clients/python/tests/worker/test_worker.py | 82 +++++++++++++++++++ 3 files changed, 109 insertions(+), 5 deletions(-) diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index f15d3755..1ae6aa3d 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -142,6 +142,7 @@ def __init__( grpc_port: int = 50052, push_task_timeout: float = 5, update_in_batches: bool = False, + skip_awaiting_futures: bool = True, ) -> None: app = import_app(app_module) @@ -166,6 +167,7 @@ def __init__( pod_name=pod_name, process_type=process_type, update_in_batches=update_in_batches, + skip_awaiting_futures=skip_awaiting_futures, ) logger.info("Running in PUSH mode") @@ -503,6 +505,7 @@ def __init__( process_type: str = "spawn", health_check_file_path: str | None = None, health_check_sec_per_touch: float = DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH, + skip_awaiting_futures: bool = True, ) -> None: self._namespace = namespace app = import_app(app_module) @@ -526,6 +529,7 @@ def __init__( result_queue_maxsize=result_queue_maxsize, processing_pool_name=processing_pool_name, process_type=process_type, + skip_awaiting_futures=skip_awaiting_futures, ) logger.info("Running in PULL mode") @@ -709,6 +713,7 @@ def __init__( pod_name: str | None = None, process_type: str = "spawn", update_in_batches: bool = False, + skip_awaiting_futures: bool = True, ) -> None: self._concurrency = concurrency self._processing_pool_name = processing_pool_name or "unknown" @@ -721,6 +726,7 @@ def __init__( self._app_module = app_module app = import_app(app_module) self._metrics = app.metrics + self._skip_awaiting_futures = skip_awaiting_futures self._mp_context = mp_context self._process_type = process_type @@ -859,6 +865,7 @@ def spawn_children_thread() -> None: self._max_child_task_count, self._processing_pool_name, self._process_type, + self._skip_awaiting_futures, ), ) process.start() diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 4f315c8b..c2c658b3 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -171,6 +171,7 @@ def child_process( max_task_count: int | None, processing_pool_name: str, process_type: str, + skip_awaiting_futures: bool, ) -> None: """ The entrypoint for spawned worker children. @@ -220,6 +221,7 @@ def run_worker( max_task_count: int | None, processing_pool_name: str, process_type: str, + skip_awaiting_futures: bool, ) -> None: processed_task_count = 0 pending_task_futures: list[ActivationWithPendingFutures] = [] @@ -279,6 +281,7 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: execution_end_time=task.futures_start_time, task_func=task.task_func, futures_start_time=task.futures_start_time, + passthrough=skip_awaiting_futures, ) def get_oldest_pending_activation() -> ActivationWithPendingFutures | None: @@ -512,6 +515,13 @@ def check_task_future_completion( task_func, ) else: + # Place ProcessingResult immediately if we're skipping awaiting futures + if skip_awaiting_futures: + _place_processing_result( + inflight, + next_state, + task_func, + ) for name, futures in task_produced_futures.items(): # How many futures were produced in the executed task, # tagged by producer name @@ -772,12 +782,16 @@ def _task_execution_complete( execution_end_time: float, task_func: Task[Any, Any] | None, futures_start_time: float | None = None, + passthrough: bool = False, ) -> None: - _place_processing_result( - inflight, - next_state, - task_func, - ) + # If passthrough is enabled, we already placed a ProcessingResult + # as soon as the task function finished execution + if not passthrough: + _place_processing_result( + inflight, + next_state, + task_func, + ) record_task_execution( inflight.activation, next_state, @@ -796,4 +810,5 @@ def _task_execution_complete( max_task_count, processing_pool_name, process_type, + skip_awaiting_futures, ) diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index 059995ef..38940875 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -768,6 +768,7 @@ def test_child_process_complete(mock_capture_checkin: mock.MagicMock) -> None: max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) assert todo.empty() @@ -804,6 +805,7 @@ def test_child_process_remove_start_time_kwargs() -> None: max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) assert todo.empty() @@ -826,6 +828,7 @@ def test_child_process_retry_task() -> None: max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) assert todo.empty() @@ -869,6 +872,7 @@ def test_child_process_retry_task_max_attempts( max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) assert todo.empty() @@ -910,6 +914,7 @@ def test_child_process_failure_task() -> None: max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) assert todo.empty() @@ -933,6 +938,7 @@ def test_child_process_shutdown() -> None: max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) # When shutdown has been set, the child should not process more tasks. @@ -955,6 +961,7 @@ def test_child_process_unknown_task() -> None: max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) result = processed.get() @@ -982,6 +989,7 @@ def test_child_process_at_most_once() -> None: max_task_count=2, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) assert todo.empty() @@ -1009,6 +1017,7 @@ def test_child_process_record_checkin(mock_capture_checkin: mock.Mock) -> None: max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) assert todo.empty() @@ -1040,6 +1049,7 @@ def test_child_process_pass_headers() -> None: max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) assert todo.empty() @@ -1080,6 +1090,7 @@ def test_child_process_terminate_task(mock_logger: mock.Mock) -> None: max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) assert todo.empty() @@ -1114,6 +1125,7 @@ def test_child_process_decompression(mock_capture_checkin: mock.MagicMock) -> No max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) assert todo.empty() @@ -1167,6 +1179,7 @@ def on_execute(self, headers: dict[str, str]) -> contextlib.AbstractContextManag max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) result = processed.get() @@ -1193,6 +1206,7 @@ def test_child_process_silenced_timeout(mock_logger: mock.Mock) -> None: max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) assert todo.empty() @@ -1222,6 +1236,7 @@ def test_child_process_silenced_exception_with_retries(mock_capture: mock.Mock) max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) assert todo.empty() @@ -1249,6 +1264,7 @@ def test_child_process_expected_ignored_exception_max_attempts(mock_capture: moc max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) # No reporting, but exception type is retriable @@ -1276,6 +1292,7 @@ def test_child_process_retry_on_deadline_exceeded(mock_logger: mock.Mock) -> Non max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) assert todo.empty() @@ -1308,6 +1325,7 @@ def test_child_process_general_exception_logs_task_failed(mock_logger: mock.Mock max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) result = processed.get() @@ -1343,6 +1361,7 @@ def test_child_process_silenced_exception_does_not_log_task_failed( max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) result = processed.get() @@ -1427,6 +1446,7 @@ def test_child_process_tracks_producer_futures( max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) # collect_futures is called once per executed task @@ -1474,6 +1494,7 @@ def observe_and_resolve() -> None: max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) finally: observer.join(timeout=5) @@ -1487,6 +1508,64 @@ def observe_and_resolve() -> None: assert result.status == TASK_ACTIVATION_STATUS_COMPLETE +def test_child_process_skip_awaiting_futures_places_result_immediately( + clear_pending_futures: None, restore_signal_handlers: None +) -> None: + task = _producing_task() + todo: queue.Queue[InflightTaskActivation] = queue.Queue() + processed: queue.Queue[ProcessingResult] = queue.Queue() + shutdown = Event() + + pending_future: Future[BrokerValue[KafkaPayload]] = Future() + todo.put(task) + + # With skip_awaiting_futures=True the ProcessingResult is placed as soon as + # the task function finishes executing, without waiting for the producer + # future to resolve. Observe the queue while the future is still pending to + # prove the result is available immediately, then resolve the future so the + # drain loop can complete. + observed_result_while_pending = threading.Event() + + def observe_and_resolve() -> None: + start = time.time() + while time.time() - start < 2: + if processed.qsize() > 0: + observed_result_while_pending.set() + break + time.sleep(0.01) + pending_future.set_result(_make_broker_value()) + + observer = threading.Thread(target=observe_and_resolve, name="future-observer") + observer.start() + try: + with mock.patch.object( + TaskProducer, "collect_futures", return_value={"test.producer": {pending_future}} + ): + child_process( + "examples.app:app", + todo, + processed, + shutdown, + max_task_count=1, + processing_pool_name="test", + process_type="fork", + skip_awaiting_futures=True, + ) + finally: + observer.join(timeout=5) + shutdown.set() + + assert ( + observed_result_while_pending.is_set() + ), "result was not placed immediately after the task function executed" + result = processed.get(timeout=5) + assert result.task_id == task.activation.id + assert result.status == TASK_ACTIVATION_STATUS_COMPLETE + # Awaiting the futures afterwards must not enqueue a second ProcessingResult + # (the immediate placement is the only result). + assert processed.empty() + + def test_child_process_drains_pending_futures_on_sigterm( clear_pending_futures: None, restore_signal_handlers: None ) -> None: @@ -1520,6 +1599,7 @@ def deliver_sigterm() -> None: max_task_count=None, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) finally: sigterm_thread.join(timeout=5) @@ -1568,6 +1648,7 @@ def test_child_process_retries_on_failed_future( max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) result = processed.get(timeout=5) @@ -1596,6 +1677,7 @@ def test_child_process_clears_pending_futures_when_task_fails( max_task_count=1, processing_pool_name="test", process_type="fork", + skip_awaiting_futures=False, ) result = processed.get(timeout=5)