mirror of
https://github.com/nesquena/hermes-webui.git
synced 2026-05-25 03:00:23 +00:00
fix: shorten cron profile lock for manual runs
This commit is contained in:
+67
-3
@@ -313,6 +313,71 @@ def _profile_home_for_cron_job(job: dict):
|
||||
return get_hermes_home_for_profile(raw)
|
||||
|
||||
|
||||
def _cron_job_subprocess_main(job, execution_profile_home, result_queue):
|
||||
"""Run one cron job inside a child process pinned to a profile home."""
|
||||
try:
|
||||
def _run():
|
||||
from cron.scheduler import run_job
|
||||
|
||||
return run_job(job)
|
||||
|
||||
if execution_profile_home is None:
|
||||
result = _run()
|
||||
else:
|
||||
from api.profiles import cron_profile_context_for_home
|
||||
|
||||
with cron_profile_context_for_home(execution_profile_home):
|
||||
result = _run()
|
||||
result_queue.put(("ok", result))
|
||||
except BaseException as exc: # pragma: no cover - surfaced in parent
|
||||
import traceback
|
||||
|
||||
result_queue.put(("error", f"{type(exc).__name__}: {exc}", traceback.format_exc()))
|
||||
|
||||
|
||||
def _run_cron_job_in_profile_subprocess(job, execution_profile_home):
|
||||
"""Execute cron.scheduler.run_job without holding the parent cron env lock.
|
||||
|
||||
cron.scheduler/cron.jobs still rely on process-global HERMES_HOME and module
|
||||
constants, so running the job body in a child process gives each long cron
|
||||
execution its own globals. The parent process only uses cron_profile_context
|
||||
for short metadata reads/writes and remains responsive to unrelated cron UI
|
||||
and API calls while the job runs.
|
||||
"""
|
||||
import multiprocessing
|
||||
import queue
|
||||
|
||||
ctx = multiprocessing.get_context("fork")
|
||||
result_queue = ctx.Queue(maxsize=1)
|
||||
process = ctx.Process(
|
||||
target=_cron_job_subprocess_main,
|
||||
args=(job, execution_profile_home, result_queue),
|
||||
)
|
||||
process.start()
|
||||
process.join()
|
||||
|
||||
try:
|
||||
status, *payload = result_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
status = "error"
|
||||
payload = [
|
||||
f"cron run subprocess exited with code {process.exitcode}",
|
||||
"",
|
||||
]
|
||||
finally:
|
||||
result_queue.close()
|
||||
result_queue.join_thread()
|
||||
|
||||
if status == "ok":
|
||||
return payload[0]
|
||||
|
||||
message = payload[0]
|
||||
traceback_text = payload[1] if len(payload) > 1 else ""
|
||||
if traceback_text:
|
||||
logger.error("Manual cron subprocess failed:\n%s", traceback_text)
|
||||
raise RuntimeError(message)
|
||||
|
||||
|
||||
def _run_cron_tracked(job, profile_home=None, execution_profile_home=None):
|
||||
"""Wrapper that tracks running state around cron.scheduler.run_job.
|
||||
|
||||
@@ -321,7 +386,6 @@ def _run_cron_tracked(job, profile_home=None, execution_profile_home=None):
|
||||
agent config/.env while running. When no job profile is selected, both homes
|
||||
are the same and legacy server-default behavior is preserved.
|
||||
"""
|
||||
from cron.scheduler import run_job # import here — runs inside a worker thread
|
||||
from cron.jobs import mark_job_run, save_job_output
|
||||
|
||||
job_id = job.get("id", "")
|
||||
@@ -336,8 +400,8 @@ def _run_cron_tracked(job, profile_home=None, execution_profile_home=None):
|
||||
return fn()
|
||||
|
||||
try:
|
||||
success, output, final_response, error = _with_cron_home(
|
||||
execution_profile_home, lambda: run_job(job)
|
||||
success, output, final_response, error = _run_cron_job_in_profile_subprocess(
|
||||
job, execution_profile_home
|
||||
)
|
||||
|
||||
# Persist output and run metadata back to the job's owning cron store,
|
||||
|
||||
@@ -28,9 +28,9 @@ class TestRunCronTrackedImport:
|
||||
"""_run_cron_tracked must be self-contained — it runs in a worker thread."""
|
||||
|
||||
def test_run_job_imported_inside_function(self):
|
||||
"""run_job must be imported inside _run_cron_tracked, not relied on
|
||||
"""run_job must be imported inside the subprocess target, not relied on
|
||||
from a caller's local scope."""
|
||||
src = _get_function_source("_run_cron_tracked")
|
||||
src = _get_function_source("_cron_job_subprocess_main")
|
||||
tree = ast.parse(src)
|
||||
names_used = set()
|
||||
|
||||
@@ -86,7 +86,12 @@ class TestRunCronTrackedImport:
|
||||
"_run_cron_tracked to avoid the NameError in worker threads."
|
||||
)
|
||||
|
||||
def test_run_cron_tracked_calls_run_job(self):
|
||||
"""Sanity: the function still actually calls run_job."""
|
||||
def test_run_cron_tracked_calls_run_job_helper(self):
|
||||
"""Sanity: the function still delegates to the cron job runner."""
|
||||
src = _get_function_source("_run_cron_tracked")
|
||||
assert "run_job" in src, "_run_cron_tracked should call run_job"
|
||||
assert "_run_cron_job_in_profile_subprocess" in src
|
||||
|
||||
def test_cron_subprocess_target_calls_run_job(self):
|
||||
"""Sanity: the subprocess target still actually calls run_job."""
|
||||
src = _get_function_source("_cron_job_subprocess_main")
|
||||
assert "run_job" in src, "cron subprocess target should call run_job"
|
||||
|
||||
@@ -0,0 +1,113 @@
|
||||
import sys
|
||||
import threading
|
||||
import types
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def _install_fake_cron(monkeypatch, run_job, events):
|
||||
cron_pkg = types.ModuleType("cron")
|
||||
cron_pkg.__path__ = []
|
||||
|
||||
cron_jobs = types.ModuleType("cron.jobs")
|
||||
cron_jobs.HERMES_DIR = Path("/tmp/hermes")
|
||||
cron_jobs.CRON_DIR = cron_jobs.HERMES_DIR / "cron"
|
||||
cron_jobs.JOBS_FILE = cron_jobs.CRON_DIR / "jobs.json"
|
||||
cron_jobs.OUTPUT_DIR = cron_jobs.CRON_DIR / "output"
|
||||
cron_jobs.save_job_output = lambda job_id, output: events.append(("save", job_id, output))
|
||||
cron_jobs.mark_job_run = lambda job_id, success, error=None: events.append(("mark", job_id, success, error))
|
||||
|
||||
cron_scheduler = types.ModuleType("cron.scheduler")
|
||||
cron_scheduler._hermes_home = Path("/tmp/hermes")
|
||||
cron_scheduler._LOCK_DIR = cron_scheduler._hermes_home / "cron"
|
||||
cron_scheduler._LOCK_FILE = cron_scheduler._LOCK_DIR / ".tick.lock"
|
||||
cron_scheduler.run_job = run_job
|
||||
|
||||
monkeypatch.setitem(sys.modules, "cron", cron_pkg)
|
||||
monkeypatch.setitem(sys.modules, "cron.jobs", cron_jobs)
|
||||
monkeypatch.setitem(sys.modules, "cron.scheduler", cron_scheduler)
|
||||
return cron_jobs, cron_scheduler
|
||||
|
||||
|
||||
def test_manual_cron_run_does_not_hold_profile_lock_for_job_duration(tmp_path, monkeypatch):
|
||||
"""A long manual run must not freeze unrelated cron/profile operations.
|
||||
|
||||
The parent WebUI process still needs the cron profile lock for short metadata
|
||||
writes, but the potentially minutes-long run_job body should execute outside
|
||||
that process-wide critical section.
|
||||
"""
|
||||
import api.routes as routes
|
||||
from api.profiles import cron_profile_context_for_home
|
||||
|
||||
events = []
|
||||
run_started = threading.Event()
|
||||
release_run = threading.Event()
|
||||
|
||||
def fake_run_job_subprocess(job, execution_profile_home):
|
||||
events.append(("run", job["id"], str(execution_profile_home)))
|
||||
run_started.set()
|
||||
assert release_run.wait(2), "test timed out waiting to release fake cron run"
|
||||
return True, "output", "final", None
|
||||
|
||||
_install_fake_cron(monkeypatch, lambda job: (True, "unused", "unused", None), events)
|
||||
monkeypatch.setattr(routes, "_run_cron_job_in_profile_subprocess", fake_run_job_subprocess)
|
||||
|
||||
job_home = tmp_path / "owner"
|
||||
exec_home = tmp_path / "exec"
|
||||
other_home = tmp_path / "other"
|
||||
|
||||
routes._mark_cron_running("job1574")
|
||||
worker = threading.Thread(
|
||||
target=routes._run_cron_tracked,
|
||||
args=({"id": "job1574"}, job_home, exec_home),
|
||||
)
|
||||
worker.start()
|
||||
assert run_started.wait(2), "fake run_job did not start"
|
||||
|
||||
contender_entered = threading.Event()
|
||||
|
||||
def contender():
|
||||
with cron_profile_context_for_home(other_home):
|
||||
events.append(("contender", str(other_home)))
|
||||
contender_entered.set()
|
||||
|
||||
contender_thread = threading.Thread(target=contender)
|
||||
contender_thread.start()
|
||||
|
||||
assert contender_entered.wait(0.5), (
|
||||
"cron_profile_context_for_home stayed blocked while run_job was active; "
|
||||
"the global cron profile lock is still held for the full job duration"
|
||||
)
|
||||
|
||||
release_run.set()
|
||||
worker.join(2)
|
||||
contender_thread.join(2)
|
||||
|
||||
assert not worker.is_alive()
|
||||
assert not contender_thread.is_alive()
|
||||
assert ("run", "job1574", str(exec_home)) in events
|
||||
assert ("save", "job1574", "output") in events
|
||||
assert ("mark", "job1574", True, None) in events
|
||||
assert routes._is_cron_running("job1574") == (False, 0.0)
|
||||
|
||||
|
||||
def test_cron_job_subprocess_executes_under_selected_profile_home(tmp_path, monkeypatch):
|
||||
import api.routes as routes
|
||||
|
||||
def fake_run_job(job):
|
||||
import cron.scheduler as scheduler
|
||||
|
||||
return True, str(scheduler._hermes_home), "final", None
|
||||
|
||||
events = []
|
||||
_, cron_scheduler = _install_fake_cron(monkeypatch, fake_run_job, events)
|
||||
exec_home = tmp_path / "exec-profile"
|
||||
|
||||
success, output, final_response, error = routes._run_cron_job_in_profile_subprocess(
|
||||
{"id": "job1574"}, exec_home
|
||||
)
|
||||
|
||||
assert success is True
|
||||
assert output == str(exec_home)
|
||||
assert final_response == "final"
|
||||
assert error is None
|
||||
assert cron_scheduler._hermes_home == Path("/tmp/hermes")
|
||||
@@ -184,7 +184,12 @@ def test_manual_cron_run_uses_execution_profile_but_persists_to_owning_store(mon
|
||||
cron_scheduler = types.ModuleType("cron.scheduler")
|
||||
cron_scheduler.run_job = lambda job: events.append(("run", job["id"])) or (True, "output", "final", None)
|
||||
|
||||
def fake_subprocess_run(job, execution_profile_home):
|
||||
events.append(("run", job["id"], str(execution_profile_home)))
|
||||
return True, "output", "final", None
|
||||
|
||||
monkeypatch.setattr(profiles, "cron_profile_context_for_home", Ctx)
|
||||
monkeypatch.setattr(routes, "_run_cron_job_in_profile_subprocess", fake_subprocess_run)
|
||||
monkeypatch.setitem(sys.modules, "cron", cron_pkg)
|
||||
monkeypatch.setitem(sys.modules, "cron.jobs", cron_jobs)
|
||||
monkeypatch.setitem(sys.modules, "cron.scheduler", cron_scheduler)
|
||||
@@ -197,9 +202,7 @@ def test_manual_cron_run_uses_execution_profile_but_persists_to_owning_store(mon
|
||||
)
|
||||
|
||||
assert events == [
|
||||
("enter", "/hermes/profiles/research"),
|
||||
("run", "job617"),
|
||||
("exit", "/hermes/profiles/research"),
|
||||
("run", "job617", "/hermes/profiles/research"),
|
||||
("enter", "/hermes/default"),
|
||||
("save", "job617", "output"),
|
||||
("mark", "job617", True, None),
|
||||
|
||||
@@ -288,30 +288,24 @@ def test_scheduler_run_job_wrapper_does_not_reenter_manual_cron_context(tmp_path
|
||||
|
||||
|
||||
def test_cron_worker_does_not_silently_fall_back_on_profile_context_failure():
|
||||
"""_run_cron_tracked must NOT silently set ctx=None when
|
||||
cron_profile_context_for_home(...).__enter__() raises.
|
||||
"""The subprocess target must not fall back to an unpinned cron run.
|
||||
|
||||
A silent fallback in the worker thread would leave the job running
|
||||
unpinned against process-global HERMES_HOME, silently corrupting
|
||||
cross-profile state — the same class of bug as #1573. We'd rather
|
||||
let the exception propagate and kill the worker thread than risk
|
||||
that.
|
||||
|
||||
Source-level assertion to catch any future re-introduction of the
|
||||
over-broad except clause around the context setup.
|
||||
A silent fallback would leave the job running against process-global
|
||||
HERMES_HOME, silently corrupting cross-profile state — the same class of bug
|
||||
as #1573. The child process may report the exception to the parent, but it
|
||||
must not continue into run_job outside the requested profile context.
|
||||
"""
|
||||
from pathlib import Path
|
||||
src = (Path(__file__).resolve().parent.parent / "api" / "routes.py").read_text(encoding="utf-8")
|
||||
|
||||
idx = src.find("def _run_cron_tracked(job")
|
||||
assert idx != -1, "_run_cron_tracked not found"
|
||||
idx = src.find("def _cron_job_subprocess_main(job")
|
||||
assert idx != -1, "_cron_job_subprocess_main not found"
|
||||
body = src[idx : idx + 2000]
|
||||
|
||||
# The profile-context setup must NOT be wrapped in try/except that
|
||||
# silently falls back to ctx=None.
|
||||
assert "except Exception" not in body[:body.find("run_job(job)")], (
|
||||
"_run_cron_tracked silently falls back to ctx=None when "
|
||||
"cron_profile_context_for_home(...).__enter__() raises. That leaves "
|
||||
"the worker thread unpinned against process-global HERMES_HOME. "
|
||||
"Let the exception propagate rather than corrupt cross-profile state."
|
||||
assert "with cron_profile_context_for_home(execution_profile_home):" in body
|
||||
assert "result = _run()" in body
|
||||
assert "ctx = None" not in body
|
||||
assert "except Exception" not in body[:body.find("with cron_profile_context_for_home")], (
|
||||
"cron subprocess target appears to catch profile-context setup before "
|
||||
"entering the context; do not fall back to an unpinned run_job call."
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user