Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions clients/python/src/taskbroker_client/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def __init__(
grpc_port: int = 50052,
push_task_timeout: float = 5,
update_in_batches: bool = False,
skip_awaiting_futures: bool = True,
) -> None:
app = import_app(app_module)

Expand All @@ -166,6 +167,7 @@ def __init__(
pod_name=pod_name,
process_type=process_type,
update_in_batches=update_in_batches,
skip_awaiting_futures=skip_awaiting_futures,
)

logger.info("Running in PUSH mode")
Expand Down Expand Up @@ -503,6 +505,7 @@ def __init__(
process_type: str = "spawn",
health_check_file_path: str | None = None,
health_check_sec_per_touch: float = DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH,
skip_awaiting_futures: bool = True,
) -> None:
self._namespace = namespace
app = import_app(app_module)
Expand All @@ -526,6 +529,7 @@ def __init__(
result_queue_maxsize=result_queue_maxsize,
processing_pool_name=processing_pool_name,
process_type=process_type,
skip_awaiting_futures=skip_awaiting_futures,
)

logger.info("Running in PULL mode")
Expand Down Expand Up @@ -709,6 +713,7 @@ def __init__(
pod_name: str | None = None,
process_type: str = "spawn",
update_in_batches: bool = False,
skip_awaiting_futures: bool = True,
Comment thread
cursor[bot] marked this conversation as resolved.
) -> None:
self._concurrency = concurrency
self._processing_pool_name = processing_pool_name or "unknown"
Comment thread
bmckerry marked this conversation as resolved.
Expand All @@ -721,6 +726,7 @@ def __init__(
self._app_module = app_module
app = import_app(app_module)
self._metrics = app.metrics
self._skip_awaiting_futures = skip_awaiting_futures

self._mp_context = mp_context
self._process_type = process_type
Expand Down Expand Up @@ -859,6 +865,7 @@ def spawn_children_thread() -> None:
self._max_child_task_count,
self._processing_pool_name,
self._process_type,
self._skip_awaiting_futures,
),
)
process.start()
Expand Down
27 changes: 13 additions & 14 deletions clients/python/src/taskbroker_client/worker/workerchild.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def child_process(
max_task_count: int | None,
processing_pool_name: str,
process_type: str,
skip_awaiting_futures: bool,
) -> None:
"""
The entrypoint for spawned worker children.
Expand Down Expand Up @@ -220,6 +221,7 @@ def run_worker(
max_task_count: int | None,
processing_pool_name: str,
process_type: str,
skip_awaiting_futures: bool,
) -> None:
processed_task_count = 0
pending_task_futures: list[ActivationWithPendingFutures] = []
Expand Down Expand Up @@ -272,16 +274,14 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None:
},
)
pending_task_futures.remove(task)
# FIXME(benm): Use passthrough option to get future-related metrics
# without placing a duplicate ProcessingResult
_task_execution_complete(
inflight=task.inflight,
next_state=task.status,
execution_start_time=task.execution_start_time,
execution_end_time=task.futures_start_time,
task_func=task.task_func,
futures_start_time=task.futures_start_time,
passthrough=True,
passthrough=skip_awaiting_futures,
)

def get_oldest_pending_activation() -> ActivationWithPendingFutures | None:
Comment thread
bmckerry marked this conversation as resolved.
Expand Down Expand Up @@ -515,13 +515,13 @@ def check_task_future_completion(
task_func,
)
else:
# FIXME(benm): Temporarily bypass producer futures needing to be completed
# before writing to `processed_tasks` while still recording metrics.
_place_processing_result(
inflight,
next_state,
task_func,
)
# Place ProcessingResult immediately if we're skipping awaiting futures
if skip_awaiting_futures:
_place_processing_result(
inflight,
next_state,
task_func,
)
for name, futures in task_produced_futures.items():
# How many futures were produced in the executed task,
# tagged by producer name
Expand Down Expand Up @@ -782,12 +782,10 @@ def _task_execution_complete(
execution_end_time: float,
task_func: Task[Any, Any] | None,
futures_start_time: float | None = None,
# FIXME(benm): Temp option to skip placing a task in processed_tasks.
# This is for tasks with pending producer futures, as we still want to record
# metrics as usual but want to have a `ProcessingResult` placed immediately
# while we troubleshoot why futures are never being marked as done.
passthrough: bool = False,
) -> None:
# If passthrough is enabled, we already placed a ProcessingResult
# as soon as the task function finished execution
if not passthrough:
_place_processing_result(
inflight,
Expand All @@ -812,4 +810,5 @@ def _task_execution_complete(
max_task_count,
processing_pool_name,
process_type,
skip_awaiting_futures,
)
Loading
Loading