def _cron_job_subprocess_main(job, execution_profile_home, result_queue):
    """Run one cron job inside a child process pinned to a profile home.

    Args:
        job: Cron job dict, passed unchanged to ``cron.scheduler.run_job``.
        execution_profile_home: Profile home to enter (through
            ``api.profiles.cron_profile_context_for_home``) around the run, or
            ``None`` to run without entering any profile context.
        result_queue: Queue that receives exactly one tuple:
            ``("ok", result)`` on success, or
            ``("error", "<ExcType>: <message>", traceback_text)`` on failure.

    Nothing is raised to the caller: every exception, including a failure to
    enter the profile context, is reported through ``result_queue`` so the job
    never continues outside the requested profile.
    """
    try:
        def _run():
            # Imported here so the child resolves run_job in its own process.
            from cron.scheduler import run_job

            return run_job(job)

        if execution_profile_home is None:
            result = _run()
        else:
            from api.profiles import cron_profile_context_for_home

            with cron_profile_context_for_home(execution_profile_home):
                result = _run()
        result_queue.put(("ok", result))
    except BaseException as exc:  # pragma: no cover - surfaced in parent
        import traceback

        result_queue.put(("error", f"{type(exc).__name__}: {exc}", traceback.format_exc()))


def _run_cron_job_in_profile_subprocess(job, execution_profile_home):
    """Execute cron.scheduler.run_job without holding the parent cron env lock.

    cron.scheduler/cron.jobs still rely on process-global HERMES_HOME and module
    constants, so running the job body in a child process gives each long cron
    execution its own globals. The parent process only uses cron_profile_context
    for short metadata reads/writes and remains responsive to unrelated cron UI
    and API calls while the job runs.

    Args:
        job: Cron job dict forwarded to the child process.
        execution_profile_home: Profile home the child should run under, or
            ``None`` to run without a profile context.

    Returns:
        Whatever ``run_job`` returned in the child (callers unpack it as
        ``(success, output, final_response, error)``).

    Raises:
        RuntimeError: If the child reported an exception, or exited without
            reporting any result (the message then carries its exit code).

    NOTE(review): the "fork" start method is POSIX-only; ``get_context`` raises
    ``ValueError`` where it is unavailable — confirm the supported platforms.
    """
    import multiprocessing
    import queue

    poll_seconds = 0.2

    ctx = multiprocessing.get_context("fork")
    result_queue = ctx.Queue(maxsize=1)
    process = ctx.Process(
        target=_cron_job_subprocess_main,
        args=(job, execution_profile_home, result_queue),
    )
    process.start()

    try:
        # Drain the queue BEFORE joining. A child that has queued a result
        # larger than the pipe buffer cannot exit until the parent reads it, so
        # join()-then-get() deadlocks on jobs with large output.
        while True:
            try:
                status, *payload = result_queue.get(timeout=poll_seconds)
            except queue.Empty:
                if process.is_alive():
                    continue  # job still running; keep waiting
                # The child exited since the last poll. Anything it queued is
                # flushed by now, so one final non-blocking read is conclusive.
                try:
                    status, *payload = result_queue.get_nowait()
                except queue.Empty:
                    status = "error"
                    payload = [
                        f"cron run subprocess exited with code {process.exitcode}",
                        "",
                    ]
            break
        process.join()
    finally:
        result_queue.close()
        result_queue.join_thread()

    if status == "ok":
        return payload[0]

    message = payload[0]
    traceback_text = payload[1] if len(payload) > 1 else ""
    if traceback_text:
        logger.error("Manual cron subprocess failed:\n%s", traceback_text)
    raise RuntimeError(message)
""" - from cron.scheduler import run_job # import here — runs inside a worker thread from cron.jobs import mark_job_run, save_job_output job_id = job.get("id", "") @@ -336,8 +400,8 @@ def _run_cron_tracked(job, profile_home=None, execution_profile_home=None): return fn() try: - success, output, final_response, error = _with_cron_home( - execution_profile_home, lambda: run_job(job) + success, output, final_response, error = _run_cron_job_in_profile_subprocess( + job, execution_profile_home ) # Persist output and run metadata back to the job's owning cron store, diff --git a/tests/test_cron_run_job_import.py b/tests/test_cron_run_job_import.py index d0533fa3..b1bec76a 100644 --- a/tests/test_cron_run_job_import.py +++ b/tests/test_cron_run_job_import.py @@ -28,9 +28,9 @@ class TestRunCronTrackedImport: """_run_cron_tracked must be self-contained — it runs in a worker thread.""" def test_run_job_imported_inside_function(self): - """run_job must be imported inside _run_cron_tracked, not relied on + """run_job must be imported inside the subprocess target, not relied on from a caller's local scope.""" - src = _get_function_source("_run_cron_tracked") + src = _get_function_source("_cron_job_subprocess_main") tree = ast.parse(src) names_used = set() @@ -86,7 +86,12 @@ class TestRunCronTrackedImport: "_run_cron_tracked to avoid the NameError in worker threads." 
import sys
import threading
import types
from pathlib import Path


def _install_fake_cron(monkeypatch, run_job, events):
    """Install stand-in ``cron`` modules into ``sys.modules`` for one test.

    Args:
        monkeypatch: pytest ``monkeypatch`` fixture; its ``setitem`` registers
            the fake modules.
        run_job: Callable exposed as ``cron.scheduler.run_job``.
        events: List that the fake ``save_job_output`` / ``mark_job_run``
            append ``("save", ...)`` / ``("mark", ...)`` tuples to.

    Returns:
        The ``(cron.jobs, cron.scheduler)`` fake module objects.
    """
    cron_pkg = types.ModuleType("cron")
    cron_pkg.__path__ = []  # mark as a package so submodule imports resolve

    cron_jobs = types.ModuleType("cron.jobs")
    cron_jobs.HERMES_DIR = Path("/tmp/hermes")
    cron_jobs.CRON_DIR = cron_jobs.HERMES_DIR / "cron"
    cron_jobs.JOBS_FILE = cron_jobs.CRON_DIR / "jobs.json"
    cron_jobs.OUTPUT_DIR = cron_jobs.CRON_DIR / "output"
    cron_jobs.save_job_output = lambda job_id, output: events.append(("save", job_id, output))
    cron_jobs.mark_job_run = lambda job_id, success, error=None: events.append(("mark", job_id, success, error))

    cron_scheduler = types.ModuleType("cron.scheduler")
    cron_scheduler._hermes_home = Path("/tmp/hermes")
    cron_scheduler._LOCK_DIR = cron_scheduler._hermes_home / "cron"
    cron_scheduler._LOCK_FILE = cron_scheduler._LOCK_DIR / ".tick.lock"
    cron_scheduler.run_job = run_job

    monkeypatch.setitem(sys.modules, "cron", cron_pkg)
    monkeypatch.setitem(sys.modules, "cron.jobs", cron_jobs)
    monkeypatch.setitem(sys.modules, "cron.scheduler", cron_scheduler)
    return cron_jobs, cron_scheduler


def test_manual_cron_run_does_not_hold_profile_lock_for_job_duration(tmp_path, monkeypatch):
    """A long manual run must not freeze unrelated cron/profile operations.

    The parent WebUI process still needs the cron profile lock for short metadata
    writes, but the potentially minutes-long run_job body should execute outside
    that process-wide critical section.
    """
    import api.routes as routes
    from api.profiles import cron_profile_context_for_home

    events = []
    run_started = threading.Event()
    release_run = threading.Event()

    def fake_run_job_subprocess(job, execution_profile_home):
        events.append(("run", job["id"], str(execution_profile_home)))
        run_started.set()
        assert release_run.wait(2), "test timed out waiting to release fake cron run"
        return True, "output", "final", None

    _install_fake_cron(monkeypatch, lambda job: (True, "unused", "unused", None), events)
    monkeypatch.setattr(routes, "_run_cron_job_in_profile_subprocess", fake_run_job_subprocess)

    job_home = tmp_path / "owner"
    exec_home = tmp_path / "exec"
    other_home = tmp_path / "other"

    contender_entered = threading.Event()

    def contender():
        with cron_profile_context_for_home(other_home):
            events.append(("contender", str(other_home)))
            contender_entered.set()

    routes._mark_cron_running("job1574")
    worker = threading.Thread(
        target=routes._run_cron_tracked,
        args=({"id": "job1574"}, job_home, exec_home),
    )
    contender_thread = threading.Thread(target=contender)
    contender_started = False

    worker.start()
    try:
        assert run_started.wait(2), "fake run_job did not start"

        contender_thread.start()
        contender_started = True

        assert contender_entered.wait(0.5), (
            "cron_profile_context_for_home stayed blocked while run_job was active; "
            "the global cron profile lock is still held for the full job duration"
        )
    finally:
        # Always unblock and reap the threads, even when an assertion above
        # failed; otherwise the worker stays parked on release_run and fails
        # later inside the thread instead of in this test.
        release_run.set()
        worker.join(2)
        if contender_started:
            contender_thread.join(2)

    assert not worker.is_alive()
    assert not contender_thread.is_alive()
    assert ("run", "job1574", str(exec_home)) in events
    assert ("save", "job1574", "output") in events
    assert ("mark", "job1574", True, None) in events
    assert routes._is_cron_running("job1574") == (False, 0.0)


def test_cron_job_subprocess_executes_under_selected_profile_home(tmp_path, monkeypatch):
    """The child must see the selected profile home; the parent's stays untouched."""
    import api.routes as routes

    def fake_run_job(job):
        import cron.scheduler as scheduler

        # Report the home the child observes while the job body runs.
        return True, str(scheduler._hermes_home), "final", None

    events = []
    _, cron_scheduler = _install_fake_cron(monkeypatch, fake_run_job, events)
    exec_home = tmp_path / "exec-profile"

    success, output, final_response, error = routes._run_cron_job_in_profile_subprocess(
        {"id": "job1574"}, exec_home
    )

    assert success is True
    assert output == str(exec_home)
    assert final_response == "final"
    assert error is None
    # The parent's module global must not have been re-pointed by the run.
    assert cron_scheduler._hermes_home == Path("/tmp/hermes")
def test_cron_worker_does_not_silently_fall_back_on_profile_context_failure():
    """The subprocess target must not fall back to an unpinned cron run.

    A silent fallback would leave the job running against process-global
    HERMES_HOME, silently corrupting cross-profile state — the same class of bug
    as #1573. The child process may report the exception to the parent, but it
    must not continue into run_job outside the requested profile context.

    This is a source-level check. Only the text of ``_cron_job_subprocess_main``
    itself is inspected, so the assertions cannot be satisfied or tripped by
    code that merely sits next to it in api/routes.py.
    """
    import ast
    from pathlib import Path

    src = (Path(__file__).resolve().parent.parent / "api" / "routes.py").read_text(encoding="utf-8")
    target = next(
        (
            node
            for node in ast.walk(ast.parse(src))
            if isinstance(node, ast.FunctionDef) and node.name == "_cron_job_subprocess_main"
        ),
        None,
    )
    assert target is not None, "_cron_job_subprocess_main not found"

    # Exact source of the function, instead of a fixed-size character window
    # that would spill into whatever definition follows it.
    body = ast.get_source_segment(src, target)
    assert body, "could not extract _cron_job_subprocess_main source"

    assert "with cron_profile_context_for_home(execution_profile_home):" in body
    assert "result = _run()" in body
    assert "ctx = None" not in body
    assert "except Exception" not in body[:body.find("with cron_profile_context_for_home")], (
        "cron subprocess target appears to catch profile-context setup before "
        "entering the context; do not fall back to an unpinned run_job call."
    )