88import time
99from collections .abc import Callable , Generator , Sequence
1010from dataclasses import dataclass
11+ from functools import partial
1112from multiprocessing .synchronize import Event
1213from types import FrameType
1314from typing import Any
@@ -294,43 +295,57 @@ def get_oldest_pending_activation() -> ActivationWithPendingFutures | None:
294295 oldest = task
295296 return oldest
296297
297- def check_task_future_completion () -> None :
298- if len (pending_task_futures ) > 0 :
299- # Records how many activations with pending producer futures
300- # the worker child has. Only records when there are pending activations.
301- metrics .gauge (
302- "taskworker.worker.activations_with_pending_futures" ,
303- len (pending_task_futures ),
304- tags = {
305- "processing_pool" : processing_pool_name ,
306- },
307- )
308- for task in pending_task_futures .copy ():
309- future_status = [f .done () for fut in task .pending_futures .values () for f in fut ]
310- if all (future_status ):
311- await_task_futures (task )
312- else :
313- # How many futures are still pending in this task
298+ def check_task_future_completion (
299+ shutdown_event : Event , local_shutdown : threading .Event
300+ ) -> None :
301+ while not shutdown_event .is_set () and not local_shutdown .is_set ():
302+ if len (pending_task_futures ) > 0 :
303+ # Records how many activations with pending producer futures
304+ # the worker child has. Only records when there are pending activations.
305+ metrics .gauge (
306+ "taskworker.worker.activations_with_pending_futures" ,
307+ len (pending_task_futures ),
308+ tags = {
309+ "processing_pool" : processing_pool_name ,
310+ },
311+ )
312+ for task in pending_task_futures .copy ():
313+ future_status = [
314+ f .done () for fut in task .pending_futures .values () for f in fut
315+ ]
316+ if all (future_status ):
317+ await_task_futures (task )
318+ else :
319+ # How many futures are still pending in this task
320+ metrics .distribution (
321+ "taskworker.task.incomplete_futures" ,
322+ len ([f for f in future_status if not f ]),
323+ tags = {
324+ "processing_pool" : processing_pool_name ,
325+ "namespace" : task .inflight .activation .namespace ,
326+ "taskname" : task .inflight .activation .taskname ,
327+ },
328+ )
329+ # How long has the oldest pending task been sitting in the queue
330+ if oldest := get_oldest_pending_activation ():
314331 metrics .distribution (
315- "taskworker.task.incomplete_futures " ,
316- len ([ f for f in future_status if not f ]) ,
332+ "taskworker.worker.oldest_pending_activation_age " ,
333+ time . time () - oldest . futures_start_time ,
317334 tags = {
318335 "processing_pool" : processing_pool_name ,
319- "namespace" : task .inflight .activation .namespace ,
320- "taskname" : task .inflight .activation .taskname ,
336+ "namespace" : oldest .inflight .activation .namespace ,
337+ "taskname" : oldest .inflight .activation .taskname ,
321338 },
322339 )
323- # How long has the oldest pending task been sitting in the queue
324- if oldest := get_oldest_pending_activation ():
325- metrics .distribution (
326- "taskworker.worker.oldest_pending_activation_age" ,
327- time .time () - oldest .futures_start_time ,
328- tags = {
329- "processing_pool" : processing_pool_name ,
330- "namespace" : oldest .inflight .activation .namespace ,
331- "taskname" : oldest .inflight .activation .taskname ,
332- },
333- )
340+ else :
341+ time .sleep (0.1 )
342+
343+ _future_completion_thread = threading .Thread (
344+ name = "check-future-completion" ,
345+ target = partial (check_task_future_completion , shutdown_event , local_shutdown ),
346+ daemon = True ,
347+ )
348+ _future_completion_thread .start ()
334349
335350 while not shutdown_event .is_set () and not local_shutdown .is_set ():
336351 if max_task_count and processed_task_count >= max_task_count :
@@ -341,10 +356,11 @@ def check_task_future_completion() -> None:
341356 logger .info (
342357 "taskworker.max_task_count_reached" , extra = {"count" : processed_task_count }
343358 )
359+ # Still set the shutdown signal to trigger shutdown of the future checker thread
360+ local_shutdown .set ()
344361 break
345362
346363 try :
347- check_task_future_completion ()
348364 inflight = child_tasks .get (timeout = 1.0 )
349365 except queue .Empty :
350366 metrics .incr (
@@ -530,6 +546,7 @@ def check_task_future_completion() -> None:
530546 pending_task_futures .append (pending_task )
531547
532548 # Once we get the shutdown signal, drain any pending futures
549+ _future_completion_thread .join ()
533550 for task in pending_task_futures .copy ():
534551 await_task_futures (task )
535552
0 commit comments