From 9f381ea378adbe5fbf00fbefda028454f2e20231 Mon Sep 17 00:00:00 2001 From: Eric Lee Date: Thu, 11 Jun 2026 19:36:27 -0700 Subject: [PATCH] bridge: port the concurrent-session registry; publish the bridge ID (#284) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two Python bridges in the same workspace did not dedup — the TS updateSessionBridgeId() publish was a TODO-noop and the registry module didn't exist. - src/utils/concurrent_sessions.py ports utils/concurrentSessions.ts: PID files under ~/.clawcodex/sessions/, cleanup-before-write ordering, session-switch refresh, strict <pid>.json filename guard (TS #34210), pid<=1 probe guard, WSL-aware stale sweep chained after registration (TS main.tsx); BG_SESSIONS/UDS_INBOX subsetting and the EPERM-alive probe divergence are documented - set_repl_bridge_handle publishes get_self_bridge_compat_id() on every set/clear so peer enumeration dedups a session reachable over both UDS and bridge (clearing prevents a stale ID from suppressing a legitimately-remote session after reconnect) - remote_io wires session_id from bootstrap get_session_id() (the TS getSessionId() parity TODO) - register_session() wired into the TUI/headless/REPL entrypoints, fail-soft Closes #284 Co-Authored-By: Claude Opus 4.7 --- src/bridge/repl_bridge_handle.py | 42 +++--- src/entrypoints/headless.py | 9 ++ src/entrypoints/tui.py | 9 ++ src/repl/core.py | 8 + src/transports/remote_io.py | 12 +- src/utils/concurrent_sessions.py | 226 +++++++++++++++++++++++++++++ tests/test_concurrent_sessions.py | 161 ++++++++++++++++++++ tests/transports/test_remote_io.py | 7 +- 8 files changed, 444 insertions(+), 30 deletions(-) create mode 100644 src/utils/concurrent_sessions.py create mode 100644 tests/test_concurrent_sessions.py diff --git a/src/bridge/repl_bridge_handle.py b/src/bridge/repl_bridge_handle.py index e7340a4a..dd61fa23 100644 --- a/src/bridge/repl_bridge_handle.py +++ b/src/bridge/repl_bridge_handle.py @@ -11,17 
+11,10 @@ Set from the orchestrator (Phase 5/6) when init completes; cleared on teardown. Reading is best-effort: ``None`` means no bridge is connected. -⚠️ **Phase 1 caveat — multi-peer dedup is BROKEN until Phase 2** ⚠️ - -The TS version calls ``utils/concurrentSessions.updateSessionBridgeId()`` -on every set, which publishes the local bridge ID so other peers can -dedup. That cross-folder dep lives in Phase 2 (per refactoring plan §5 -second-wave list). The Python port replaces it with a debug-logged -TODO-noop until Phase 2 lands the publisher. Operationally this means -**two concurrent Python bridges in the same workspace will not dedup -against each other** — both will appear in claude.ai/code session lists. -The public API of this module is unaffected; only the cross-process -side effect is missing. +Every set/clear publishes the local bridge ID to the concurrent-session +PID registry (``src/utils/concurrent_sessions.py``, #284) so other peers +can dedup us out of session lists — mirroring TS +``utils/concurrentSessions.updateSessionBridgeId()``. """ from __future__ import annotations @@ -39,24 +32,23 @@ def set_repl_bridge_handle(h: ReplBridgeHandle | None) -> None: """Register (or clear) the active REPL bridge handle. - Mirrors TS ``setReplBridgeHandle`` on ``replBridgeHandle.ts:18-23``. - - The TS version also calls - ``updateSessionBridgeId(getSelfBridgeCompatId() ?? null)`` to publish - the local bridge ID for cross-peer dedup. That helper doesn't exist - in Python yet (Phase 2). Until then, calling ``set_repl_bridge_handle`` - just updates the in-process pointer; cross-process dedup is a no-op. + Mirrors TS ``setReplBridgeHandle`` on ``replBridgeHandle.ts:18-23``, + including the ``updateSessionBridgeId(getSelfBridgeCompatId() ?? 
+ null)`` publish: setting the handle records our bridge compat ID in + the PID registry so peer enumeration dedups us; clearing publishes + ``None`` so a stale ID doesn't suppress a legitimately-remote + session after reconnect (#284). """ global _handle _handle = h - # TODO(phase2): once src/utils/concurrent_sessions.py exists, call - # update_session_bridge_id(get_self_bridge_compat_id()) - # to publish the local bridge ID so other peers can dedup us out. - # Until then, cross-peer dedup is silently broken — a debug log keeps - # the gap visible during development. + try: + from src.utils.concurrent_sessions import update_session_bridge_id + + update_session_bridge_id(get_self_bridge_compat_id()) + except Exception: + logger.debug('[bridge:handle] bridge-id publish failed', exc_info=True) logger.debug( - '[bridge:handle] %s (multi-peer dedup not yet wired — Phase 2)', - 'set' if h is not None else 'cleared', + '[bridge:handle] %s', 'set' if h is not None else 'cleared', ) diff --git a/src/entrypoints/headless.py b/src/entrypoints/headless.py index cbd2ad36..d4600df3 100644 --- a/src/entrypoints/headless.py +++ b/src/entrypoints/headless.py @@ -198,6 +198,15 @@ def run_headless(options: HeadlessOptions) -> int: abort_controller=abort_controller, ) tool_context.options.is_non_interactive_session = True + + # #284: publish this session's PID file so peers can enumerate and + # dedup concurrent sessions (best-effort, never blocks startup). 
+ try: + from src.utils.concurrent_sessions import register_session + + register_session() + except Exception: + pass if options.skip_permissions or effective_mode == "bypassPermissions": tool_context.allow_docs = True tool_context.permission_handler = None diff --git a/src/entrypoints/tui.py b/src/entrypoints/tui.py index 7f421a84..978bbf6e 100644 --- a/src/entrypoints/tui.py +++ b/src/entrypoints/tui.py @@ -142,6 +142,15 @@ def run_tui(options: TUIOptions) -> int: tool_context.allow_docs = True tool_context.options.is_non_interactive_session = False + # #284: publish this session's PID file so peers can enumerate and + # dedup concurrent sessions (best-effort, never blocks startup). + try: + from src.utils.concurrent_sessions import register_session + + register_session() + except Exception: + pass + # Build and run app --------------------------------------------------- from src.tui.app import ClawCodexTUI diff --git a/src/repl/core.py b/src/repl/core.py index 10c7ba13..de9c8809 100644 --- a/src/repl/core.py +++ b/src/repl/core.py @@ -467,6 +467,14 @@ def _get_mcp_servers_for_prompt() -> list[str]: workspace_root=Path.cwd(), permission_context=_perm_setup.context, ) + # #284: publish this session's PID file so peers can enumerate + # and dedup concurrent sessions (best-effort, never blocks startup). 
+ try: + from src.utils.concurrent_sessions import register_session + + register_session() + except Exception: + pass self.tool_context.ask_user = self._ask_user_questions # Permission handler with status control for proper input handling self._current_status = None diff --git a/src/transports/remote_io.py b/src/transports/remote_io.py index 3381752d..bf7c09fe 100644 --- a/src/transports/remote_io.py +++ b/src/transports/remote_io.py @@ -139,9 +139,15 @@ def build_headers() -> dict[str, str]: if get_session_ingress_auth_token() is None: logger.error("[remote-io] No session ingress token available") - # TODO(parity): wire session_id from bootstrap once the - # equivalent of TS getSessionId() is ported. Currently None. - session_id: str | None = None + # TS getSessionId() parity (#284): the bootstrap session ID + # identifies this peer to the transport layer. Fail-soft — a + # broken bootstrap import must not block the transport. + try: + from src.bootstrap.state import get_session_id + + session_id: str | None = get_session_id() + except Exception: + session_id = None self._transport: Transport = get_transport_for_url( self._url_str, diff --git a/src/utils/concurrent_sessions.py b/src/utils/concurrent_sessions.py new file mode 100644 index 00000000..b21bea42 --- /dev/null +++ b/src/utils/concurrent_sessions.py @@ -0,0 +1,226 @@ +"""Concurrent-session PID registry. + +Port of ``typescript/src/utils/concurrentSessions.ts`` (#284): each +top-level session writes ``~/.clawcodex/sessions/<pid>.json`` so peers +can enumerate live sessions and — the bridge use case — dedup a session +reachable over both UDS and bridge (local wins). The bridge handle +publishes its compat ID here via :func:`update_session_bridge_id`; +cleared on teardown so a stale ID doesn't suppress a legitimately-remote +session after reconnect. 
+ +Deliberate subsetting vs TS: the ``BG_SESSIONS`` / ``UDS_INBOX`` +feature-gated fields (session names, log paths, messaging sockets, +activity pushes) are omitted — those subsystems aren't ported. +Everything is best-effort and fail-soft: registry problems must never +break a session. +""" + +from __future__ import annotations + +import json +import logging +import os +import re +import sys +import time +from pathlib import Path + +logger = logging.getLogger(__name__) + +_PID_FILE_RE = re.compile(r"^\d+\.json$") + +_registered = False +_unsubscribe_session_switch = None + + +def _sessions_dir() -> Path: + # Resolve through the module attribute at call time so test isolation + # that re-points GLOBAL_CONFIG_DIR covers this module too. + from src import config as config_mod + + return Path(config_mod.GLOBAL_CONFIG_DIR) / "sessions" + + +def _pid_file() -> Path: + return _sessions_dir() / f"{os.getpid()}.json" + + +def _get_agent_id() -> str | None: + """Teammate/subagent marker. This port's teammates run in-process + today and never reach the session entrypoints, so the skip is a + forward-compat guard: if process-spawned teammates land, set + ``CLAUDE_CODE_AGENT_ID`` in their environment (the TS env mechanism + for process-based teammates) and they'll be excluded here.""" + return os.environ.get("CLAUDE_CODE_AGENT_ID") or None + + +def register_session() -> bool: + """Write this session's PID file and register cleanup-on-exit. + + Registers top-level sessions only — teammates/subagents are skipped + (they'd conflate swarm usage with genuine concurrency). Returns True + if registered. Errors are logged at debug, never raised. 
+ """ + global _registered, _unsubscribe_session_switch + if _get_agent_id() is not None: + return False + if _registered: + return True + + from src.bootstrap.state import ( + get_original_cwd, + get_session_id, + on_session_switch, + ) + from src.utils.graceful_shutdown import register_cleanup + + pid_file = _pid_file() + + def _cleanup() -> None: + try: + pid_file.unlink() + except OSError: + pass # ENOENT is fine (already deleted or never written) + + # Registered BEFORE the write (TS ordering): if anything after the + # write throws, the file is still reaped at exit. + register_cleanup(_cleanup) + + try: + directory = _sessions_dir() + directory.mkdir(parents=True, exist_ok=True, mode=0o700) + # No inner try/except (TS semantics): a directory whose perms + # can't be tightened to 0700 must not receive the PID file — + # the registry carries sessionId/cwd/bridgeSessionId. + os.chmod(directory, 0o700) + payload = { + "pid": os.getpid(), + "sessionId": get_session_id(), + "cwd": get_original_cwd(), + "startedAt": int(time.time() * 1000), + "kind": "interactive", + "entrypoint": os.environ.get("CLAUDE_CODE_ENTRYPOINT"), + } + pid_file.write_text(json.dumps(payload), encoding="utf-8") + # --resume / /resume mutates get_session_id() via the session + # switch; without this, the PID file's sessionId goes stale. + _unsubscribe_session_switch = on_session_switch( + lambda sid: _update_pid_file({"sessionId": sid}) + ) + _registered = True + # TS sweeps stale files at startup right after registering + # (main.tsx chains countConcurrentSessions); without it, + # hard-crashed sessions' PID files would accumulate forever. + count_concurrent_sessions() + return True + except Exception as exc: # noqa: BLE001 — registry is best-effort + logger.debug("[concurrent_sessions] register failed: %s", exc) + return False + + +def _update_pid_file(patch: dict) -> None: + """Merge ``patch`` into this session's PID file. 
Best-effort: silently + no-op when the file doesn't exist (session not registered) or + read/write fails.""" + try: + pid_file = _pid_file() + data = json.loads(pid_file.read_text(encoding="utf-8")) + if not isinstance(data, dict): + return + data.update(patch) + pid_file.write_text(json.dumps(data), encoding="utf-8") + except Exception as exc: # noqa: BLE001 — Signal.emit propagates + # listener exceptions, so this must be literally best-effort + logger.debug("[concurrent_sessions] update_pid_file failed: %s", exc) + + +def update_session_name(name: str | None) -> None: + if not name: + return + _update_pid_file({"name": name}) + + +def update_session_bridge_id(bridge_session_id: str | None) -> None: + """Record this session's Remote Control session ID so peer + enumeration can dedup: a session reachable over both UDS and bridge + should only appear once (local wins). Cleared (``None``) on bridge + teardown so stale IDs don't suppress a legitimately-remote session + after reconnect.""" + _update_pid_file({"bridgeSessionId": bridge_session_id}) + + +def _is_process_running(pid: int) -> bool: + # TS isProcessRunning returns false for pid <= 1: signal 0 to pid 0 + # targets our own process group (always "alive"), and init/launchd + # would otherwise be counted as a live session forever. + if pid <= 1: + return False + try: + os.kill(pid, 0) + return True + except ProcessLookupError: + return False + except PermissionError: + # Deliberate divergence from TS (which maps EPERM to "not + # running"): EPERM proves the PID exists, just owned by someone + # else — counting it live avoids sweeping a real session's file. + return True + except (OSError, OverflowError): + # OverflowError: a regex-valid but absurd PID exceeds C long. 
+ return False + + +def _is_wsl() -> bool: + if sys.platform != "linux": + return False + try: + with open("/proc/version", encoding="utf-8", errors="replace") as fh: + version = fh.read().lower() + # TS getPlatform matches either marker; custom WSL2 kernels + # often carry only "wsl". + return "microsoft" in version or "wsl" in version + except OSError: + return False + + +def count_concurrent_sessions() -> int: + """Count live concurrent sessions (including this one), sweeping + stale PID files from crashed sessions. Returns 0 on any error.""" + directory = _sessions_dir() + try: + files = os.listdir(directory) + except OSError as exc: + logger.debug("[concurrent_sessions] readdir failed: %s", exc) + return 0 + + count = 0 + for name in files: + # Strict filename guard: only ``<pid>.json`` is a candidate — a + # lenient prefix-parse would sweep unrelated files as "stale" + # (TS issue #34210: silent user data loss). + if not _PID_FILE_RE.match(name): + continue + pid = int(name[:-5]) + if pid == os.getpid(): + count += 1 + continue + if _is_process_running(pid): + count += 1 + elif not _is_wsl(): + # Stale file from a crashed session — sweep it. Skip on WSL: + # a Windows PID isn't probeable from WSL and we'd falsely + # delete a live session's file (conservative undercount is + # acceptable; this is telemetry). + try: + (directory / name).unlink() + except OSError: + pass + return count + + +def reset_for_testing() -> None: + global _registered, _unsubscribe_session_switch + _registered = False + if _unsubscribe_session_switch is not None: + _unsubscribe_session_switch() + _unsubscribe_session_switch = None diff --git a/tests/test_concurrent_sessions.py b/tests/test_concurrent_sessions.py new file mode 100644 index 00000000..830b7f2b --- /dev/null +++ b/tests/test_concurrent_sessions.py @@ -0,0 +1,161 @@ +"""#284 — concurrent-session PID registry (port of utils/concurrentSessions.ts). 
+ +Covers registration, the bridge-ID publish/clear cycle that lets peers +dedup a session reachable over both UDS and bridge, live-session +counting with stale-file sweeping, and the strict ``<pid>.json`` +filename guard (TS issue #34210: a lenient prefix-parse swept unrelated +files as "stale"). +""" +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from src.utils import concurrent_sessions as cs + + +@pytest.fixture(autouse=True) +def _isolated_registry(tmp_path, monkeypatch): + import src.config as config_mod + + monkeypatch.setattr(config_mod, "GLOBAL_CONFIG_DIR", tmp_path / ".clawcodex") + cs.reset_for_testing() + yield + cs.reset_for_testing() + + +def _pid_file(tmp_path: Path) -> Path: + return tmp_path / ".clawcodex" / "sessions" / f"{os.getpid()}.json" + + +class TestRegisterSession: + def test_register_writes_pid_file(self, tmp_path): + assert cs.register_session() is True + data = json.loads(_pid_file(tmp_path).read_text()) + assert data["pid"] == os.getpid() + assert data["sessionId"] + assert data["cwd"] + assert data["kind"] == "interactive" + + def test_register_is_idempotent(self, tmp_path): + assert cs.register_session() is True + before = _pid_file(tmp_path).read_text() + assert cs.register_session() is True + assert _pid_file(tmp_path).read_text() == before + + def test_teammates_are_skipped(self, tmp_path, monkeypatch): + monkeypatch.setenv("CLAUDE_CODE_AGENT_ID", "a123") + assert cs.register_session() is False + assert not _pid_file(tmp_path).exists() + + def test_session_switch_updates_pid_file(self, tmp_path): + from src.bootstrap.state import _session_switched + + cs.register_session() + _session_switched.emit("switched-session-id") + data = json.loads(_pid_file(tmp_path).read_text()) + assert data["sessionId"] == "switched-session-id" + + +class TestBridgeIdPublish: + def test_publish_and_clear_round_trip(self, tmp_path): + cs.register_session() + 
cs.update_session_bridge_id("session_abc123") + data = json.loads(_pid_file(tmp_path).read_text()) + assert data["bridgeSessionId"] == "session_abc123" + + # Cleared on bridge teardown so a stale ID doesn't suppress a + # legitimately-remote session after reconnect. + cs.update_session_bridge_id(None) + data = json.loads(_pid_file(tmp_path).read_text()) + assert data["bridgeSessionId"] is None + + def test_update_without_registration_is_noop(self, tmp_path): + cs.update_session_bridge_id("session_abc123") # must not raise + assert not _pid_file(tmp_path).exists() + + def test_set_repl_bridge_handle_publishes_compat_id(self, tmp_path): + from src.bridge import repl_bridge_handle as rbh + + cs.register_session() + + class _Handle: + bridge_session_id = "cse_deadbeef" + + try: + rbh.set_repl_bridge_handle(_Handle()) + data = json.loads(_pid_file(tmp_path).read_text()) + assert data["bridgeSessionId"] == rbh.get_self_bridge_compat_id() + assert data["bridgeSessionId"].startswith("session_") + + rbh.set_repl_bridge_handle(None) + data = json.loads(_pid_file(tmp_path).read_text()) + assert data["bridgeSessionId"] is None + finally: + rbh.set_repl_bridge_handle(None) + + +class TestUpdateSessionName: + def test_name_round_trip(self, tmp_path): + cs.register_session() + cs.update_session_name("my-session") + data = json.loads(_pid_file(tmp_path).read_text()) + assert data["name"] == "my-session" + + def test_falsy_name_is_noop(self, tmp_path): + cs.register_session() + before = _pid_file(tmp_path).read_text() + cs.update_session_name(None) + cs.update_session_name("") + assert _pid_file(tmp_path).read_text() == before + + +class TestCountConcurrentSessions: + def test_counts_self(self): + cs.register_session() + assert cs.count_concurrent_sessions() == 1 + + def test_missing_dir_returns_zero(self): + assert cs.count_concurrent_sessions() == 0 + + def test_stale_pid_file_is_swept(self, tmp_path): + cs.register_session() + sessions = tmp_path / ".clawcodex" / "sessions" + # 
A PID that cannot exist (max pid is bounded well below this). + stale = sessions / "999999999.json" + stale.write_text("{}") + assert cs.count_concurrent_sessions() == 1 + assert not stale.exists() + + def test_live_peer_pid_is_counted(self, tmp_path): + cs.register_session() + sessions = tmp_path / ".clawcodex" / "sessions" + # Our parent (the test runner) is a genuinely live peer PID. + (sessions / f"{os.getppid()}.json").write_text("{}") + assert cs.count_concurrent_sessions() == 2 + + def test_pid_one_and_zero_are_never_counted_live(self, tmp_path): + # TS isProcessRunning: pid <= 1 -> false. Signal 0 to pid 0 hits + # our own process group (always succeeds) and init/launchd would + # otherwise count as a live session forever. + cs.register_session() + sessions = tmp_path / ".clawcodex" / "sessions" + (sessions / "1.json").write_text("{}") + (sessions / "0.json").write_text("{}") + assert cs.count_concurrent_sessions() == 1 + assert not (sessions / "1.json").exists() # swept as stale + assert not (sessions / "0.json").exists() + + def test_non_pid_filenames_are_never_swept(self, tmp_path): + # TS issue #34210: parseInt's prefix-parse swept + # "2026-03-14_notes.md" as PID 2026 — silent user data loss. 
+ cs.register_session() + sessions = tmp_path / ".clawcodex" / "sessions" + bystander = sessions / "2026-03-14_notes.md" + bystander.write_text("important") + assert cs.count_concurrent_sessions() == 1 + assert bystander.exists() + assert bystander.read_text() == "important" diff --git a/tests/transports/test_remote_io.py b/tests/transports/test_remote_io.py index 5765457a..d08d240b 100644 --- a/tests/transports/test_remote_io.py +++ b/tests/transports/test_remote_io.py @@ -155,8 +155,11 @@ async def test_constructor_uses_get_transport_for_url(stub_transport): args, kwargs = stub_transport.factory_calls[0] # type: ignore[attr-defined] assert args == ("ws://example.com/x",) assert callable(kwargs.get("refresh_headers")) - # session_id is currently None (TODO marker in remote_io.py). - assert kwargs.get("session_id") is None + # #284: the bootstrap session ID identifies this peer to the + # transport layer (TS getSessionId() parity). + from src.bootstrap.state import get_session_id + + assert kwargs.get("session_id") == get_session_id() finally: io.close()