Skip to content

Commit 35af476

Browse files
committed
Merge branch 'claude/actor-context-hook'
2 parents fce160a + 525700c commit 35af476

3 files changed

Lines changed: 238 additions & 6 deletions

File tree

kiln/src/kiln/events.py

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ async def on_print_done(event: Event) -> None:
3636
from __future__ import annotations
3737

3838
import asyncio
39+
import contextvars
3940
import enum
4041
import logging
4142
import threading
@@ -49,6 +50,32 @@ async def on_print_done(event: Event) -> None:
4950
logger = logging.getLogger(__name__)
5051

5152

53+
# ---------------------------------------------------------------------------
54+
# Ambient actor context
55+
# ---------------------------------------------------------------------------
56+
#
57+
# Generic "who triggered this event" envelope. An orchestrator (typically a
58+
# REST dispatcher, but anything that wraps a unit of work) ``set``s this
59+
# before invoking code that may publish events. :class:`EventBus` attaches
60+
# its current value to every :class:`Event` whose ``actor`` field is not
61+
# already explicitly set, so downstream subscribers can attribute events to
62+
# their originating call.
63+
#
64+
# Deliberately opaque — Kiln itself has no schema for the dict. Conventions
65+
# used by consumers (e.g. ``caller_tier``, ``caller_id``, ``request_id``) are
66+
# defined by the consumer, not the bus.
67+
#
68+
# Background-thread propagation: long-running publishers that spawn threads
69+
# (see :class:`kiln.plugins.monitoring_tools._PrintWatcher`) must capture
70+
# the current context via :func:`contextvars.copy_context` and run the
71+
# thread body inside it, otherwise the ContextVar's value is lost at the
72+
# thread boundary. Synchronous publishers don't need to do anything —
73+
# the value is read at publish time on the same thread that set it.
74+
current_actor_context: contextvars.ContextVar[dict[str, Any] | None] = (
75+
contextvars.ContextVar("kiln_actor_context", default=None)
76+
)
77+
78+
5279
class EventType(enum.Enum):
5380
"""All event types emitted by the Kiln system."""
5481

@@ -169,15 +196,25 @@ class Event:
169196
data: dict[str, Any] = field(default_factory=dict)
170197
timestamp: float = field(default_factory=time.time)
171198
source: str = "" # e.g. "printer:voron-350" or "queue"
199+
# Ambient actor context — who triggered this event (caller tier,
200+
# tenant id, request correlation id, etc.). Populated automatically
201+
# by :class:`EventBus` from :data:`current_actor_context` at publish
202+
# time when not explicitly set. ``None`` means no actor metadata is
203+
# attached; an empty dict means "actor explicitly empty" (e.g. a
204+
# system-initiated event with no caller).
205+
actor: dict[str, Any] | None = None
172206

173207
def to_dict(self) -> dict[str, Any]:
    """Serialise this event into a plain dictionary.

    The result always carries ``type`` (the enum member's ``value``),
    ``data``, ``timestamp`` and ``source``. The ``actor`` key is added
    only when ``self.actor`` is not ``None``, so an explicitly empty
    actor dict is still emitted while an un-stamped event omits the key.

    ``data`` and ``actor`` are placed in the result by reference, not
    copied.
    """
    payload: dict[str, Any] = dict(
        type=self.type.value,
        data=self.data,
        timestamp=self.timestamp,
        source=self.source,
    )
    actor = self.actor
    if actor is None:
        # No actor metadata attached: keep the key out of the payload.
        return payload
    payload["actor"] = actor
    return payload
181218

182219

183220
# Type aliases for event handlers.
@@ -287,10 +324,24 @@ def _resolve_event(
287324
data: dict[str, Any] | None,
288325
source: str,
289326
) -> Event:
290-
"""Normalise the flexible publish signature into an :class:`Event`."""
327+
"""Normalise the flexible publish signature into an :class:`Event`.
328+
329+
Attaches the ambient :data:`current_actor_context` to events that
330+
don't carry an ``actor`` of their own. Explicit
331+
``Event(actor=...)`` always wins over the ambient value.
332+
"""
291333
if isinstance(event_or_type, EventType):
292-
return Event(type=event_or_type, data=data or {}, source=source)
293-
return event_or_type
334+
event = Event(type=event_or_type, data=data or {}, source=source)
335+
else:
336+
event = event_or_type
337+
if event.actor is None:
338+
ambient = current_actor_context.get()
339+
if ambient is not None:
340+
# Defensive copy — the ambient dict can be mutated by its
341+
# owner after publish, and we want the event's actor to be
342+
# a stable snapshot at publish time.
343+
event.actor = dict(ambient)
344+
return event
294345

295346
def publish(
296347
self,

kiln/src/kiln/plugins/monitoring_tools.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from __future__ import annotations
1515

16+
import contextvars
1617
import logging
1718
import os
1819
import secrets
@@ -126,10 +127,22 @@ def __init__(
126127
# -- public API --------------------------------------------------------
127128

128129
def start(self) -> None:
129-
"""Start the background monitoring thread."""
130+
"""Start the background monitoring thread.
131+
132+
Captures the current Python context (including
133+
:data:`kiln.events.current_actor_context`) via :func:`contextvars.copy_context` and
134+
runs the watcher body inside it, so events published from the
135+
background thread carry the caller's ambient actor metadata
136+
(caller tier, tenant id, etc.) instead of falling off the
137+
thread boundary.
138+
"""
130139
self._start_time = time.time()
140+
# Snapshot the caller's context at start-time so the watcher
141+
# thread sees the same ContextVar values the caller saw. Python
142+
# ContextVars do not auto-propagate across thread boundaries.
143+
ctx = contextvars.copy_context()
131144
self._thread = threading.Thread(
132-
target=self._run,
145+
target=lambda: ctx.run(self._run),
133146
name=f"print-watcher-{self._watch_id}",
134147
daemon=True,
135148
)

kiln/tests/test_events.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,3 +463,171 @@ def publish_batch(event_type: EventType, count: int) -> None:
463463
started = [e for e in events if e.type == EventType.PRINT_STARTED]
464464
assert len(queued) == 50
465465
assert len(started) == 50
466+
467+
468+
# ---------------------------------------------------------------------------
469+
# Ambient actor context
470+
# ---------------------------------------------------------------------------
471+
472+
473+
class TestActorContext:
    """Tests for ``current_actor_context`` and its propagation onto events.

    The ContextVar is a generic "who triggered this event" envelope. An
    orchestrator (e.g. kiln-pro's REST dispatcher) sets it before invoking
    code that publishes events; :class:`EventBus` attaches its current
    value to every event whose ``actor`` is not already set. This test
    class pins the contract — bus-level behaviour only; cross-repo
    integration is tested kiln-pro-side.
    """

    @staticmethod
    def _run_in_thread(target) -> None:
        """Run *target* on a fresh thread and fail loudly if it hangs.

        ``Thread.join(timeout=...)`` returns silently on timeout, so the
        liveness check is what turns a stuck publisher into a clear
        failure instead of a confusing event-count mismatch later on.
        """
        worker = threading.Thread(target=target)
        worker.start()
        worker.join(timeout=2)
        assert not worker.is_alive(), "publisher thread did not finish within 2s"

    def test_actor_defaults_to_none_on_event(self):
        """The dataclass default — un-stamped Events have actor=None."""
        e = Event(type=EventType.JOB_QUEUED, data={"x": 1})
        assert e.actor is None

    def test_context_var_default_is_none(self):
        """With nothing set, the ContextVar reads as ``None``."""
        from kiln.events import current_actor_context

        assert current_actor_context.get() is None

    def test_publish_without_context_leaves_actor_none(self):
        """Publishing with no ambient context leaves ``actor`` unset."""
        from kiln.events import current_actor_context

        bus = EventBus()
        # ContextVar default is None — sanity that no test before us leaked.
        assert current_actor_context.get() is None
        bus.publish(EventType.JOB_QUEUED, {"job_id": "j1"})
        events = bus.recent_events()
        assert len(events) == 1
        assert events[0].actor is None

    def test_publish_with_ambient_context_stamps_event(self):
        """An ambient context set at publish time is copied onto the event."""
        from kiln.events import current_actor_context

        bus = EventBus()
        token = current_actor_context.set({"caller_tier": "pro", "caller_id": "t-7"})
        try:
            bus.publish(EventType.JOB_QUEUED, {"job_id": "j2"})
        finally:
            current_actor_context.reset(token)

        events = bus.recent_events()
        assert len(events) == 1
        assert events[0].actor == {"caller_tier": "pro", "caller_id": "t-7"}

    def test_explicit_event_actor_wins_over_ambient(self):
        """Pre-built Event with actor set is NOT overwritten by the
        ambient ContextVar — explicit always beats ambient."""
        from kiln.events import current_actor_context

        bus = EventBus()
        explicit = Event(
            type=EventType.JOB_QUEUED,
            data={"job_id": "j3"},
            actor={"caller_tier": "business"},
        )
        token = current_actor_context.set({"caller_tier": "free"})
        try:
            bus.publish(explicit)
        finally:
            current_actor_context.reset(token)

        events = bus.recent_events()
        assert len(events) == 1
        assert events[0].actor == {"caller_tier": "business"}

    def test_actor_snapshot_is_defensive_copy(self):
        """Mutating the ambient dict AFTER publish must not mutate the
        already-stamped event's actor."""
        from kiln.events import current_actor_context

        bus = EventBus()
        ambient = {"caller_tier": "pro"}
        token = current_actor_context.set(ambient)
        try:
            bus.publish(EventType.JOB_QUEUED, {"job_id": "j4"})
        finally:
            current_actor_context.reset(token)

        # Mutate the live dict — the event's actor must not reflect this.
        ambient["caller_tier"] = "free"
        ambient["leaked"] = True
        events = bus.recent_events()
        assert events[0].actor == {"caller_tier": "pro"}

    def test_to_dict_includes_actor_when_set(self):
        """``to_dict`` emits the ``actor`` key for a stamped event."""
        e = Event(
            type=EventType.JOB_QUEUED,
            data={"job_id": "j5"},
            actor={"caller_tier": "pro"},
        )
        d = e.to_dict()
        assert d["actor"] == {"caller_tier": "pro"}

    def test_to_dict_omits_actor_when_none(self):
        """``to_dict`` leaves the ``actor`` key out for an un-stamped event."""
        e = Event(type=EventType.JOB_QUEUED, data={"job_id": "j6"})
        d = e.to_dict()
        assert "actor" not in d

    def test_context_propagates_to_background_thread_via_copy_context(self):
        """The pattern :class:`_PrintWatcher.start` uses: capture
        :func:`contextvars.copy_context` in the calling thread, run the
        target inside ``ctx.run(...)`` from the background thread. This
        test pins that the value actually survives the thread boundary
        even though the parent has already reset the ContextVar by the
        time the thread runs.
        """
        import contextvars

        from kiln.events import current_actor_context

        bus = EventBus()
        token = current_actor_context.set({"caller_tier": "pro", "caller_id": "t-9"})
        try:
            ctx = contextvars.copy_context()
        finally:
            current_actor_context.reset(token)

        # ContextVar is back to default in the parent thread now.
        assert current_actor_context.get() is None

        def _publish_from_thread():
            # Inside ctx.run, the ContextVar's value is the one snapshotted above.
            bus.publish(EventType.JOB_QUEUED, {"job_id": "j7"})

        self._run_in_thread(lambda: ctx.run(_publish_from_thread))

        events = bus.recent_events()
        assert len(events) == 1
        assert events[0].actor == {"caller_tier": "pro", "caller_id": "t-9"}

    def test_naked_thread_without_copy_context_loses_actor(self):
        """Sanity check / regression guard: a thread that does NOT use
        ``copy_context`` does NOT see the parent thread's ContextVar
        value, unless the interpreter is configured to inherit context
        into new threads.

        Python 3.14 added ``sys.flags.thread_inherit_context``; when it
        is true, a new thread starts with a copy of the context of the
        caller of ``Thread.start()``. In that mode the ``copy_context``
        ceremony in _PrintWatcher.start is redundant (but harmless), and
        the expected outcome here flips rather than the test failing.
        """
        import sys

        from kiln.events import current_actor_context

        bus = EventBus()
        token = current_actor_context.set({"caller_tier": "pro"})
        try:
            def _publish_from_naked_thread():
                # No ctx.run — the thread only sees whatever context the
                # interpreter gave it at start().
                bus.publish(EventType.JOB_QUEUED, {"job_id": "j8"})

            self._run_in_thread(_publish_from_naked_thread)
        finally:
            current_actor_context.reset(token)

        events = bus.recent_events()
        assert len(events) == 1
        # ``getattr`` keeps this working on interpreters that predate the flag.
        if getattr(sys.flags, "thread_inherit_context", False):
            # Inheriting interpreter → the thread saw the parent's value.
            assert events[0].actor == {"caller_tier": "pro"}
        else:
            # Naked thread → ContextVar default (None) → event.actor stays None.
            assert events[0].actor is None

0 commit comments

Comments
 (0)