-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathzoom_engine.py
More file actions
2234 lines (2029 loc) · 109 KB
/
zoom_engine.py
File metadata and controls
2234 lines (2029 loc) · 109 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
zoom_engine.py — Headless WAL poller for Zoom Notes.
Replaces zoom_menu_bar.py. No rumps, no UI — designed to run as a child process
of the Swift ZoomNotesApp. Emits newline-delimited JSON events to stdout and
accepts JSON commands on stdin (for future extensibility).
State machine: idle → active → generating → idle
JSON events emitted to stdout:
{"event": "state", "value": "idle"}
{"event": "state", "value": "active", "meeting_id": "<id>"}
{"event": "state", "value": "generating"}
{"event": "done", "title": "...", "path": "...", "transcript_path": "...", "attendees": [...], "meeting_id": "..."}
{"event": "recovery_available", "meeting_id": "...", "entry_count": N, "last_updated": "...", "slug_hint": "...", "location": "root|failed", "title": "...?", "failed_at": "...?", "last_error": "...?"}
{"event": "error", "message": "..."}
stdin commands accepted (one JSON object per line):
{"cmd": "generate"} — manual trigger
{"cmd": "reload"} — reload settings (also triggered by SIGHUP)
{"cmd": "retry", "meeting_id": "..."} — retry a meeting whose LLM call just failed
{"cmd": "recover", "meeting_id": "..."} — recover a meeting from a prior crash
"""
import concurrent.futures
import json
import re
import signal
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
def _friendly_error(exc: Exception) -> str:
"""Convert a raw exception into a short, human-readable menu bar message."""
msg = str(exc)
# HTTP status codes from LLM providers
for code, label in [
("429", "LLM quota exceeded — try again later"),
("401", "LLM authentication failed — check your API key in Settings"),
("403", "LLM access denied — check your API key in Settings"),
("500", "LLM server error — try again later"),
("503", "LLM service unavailable — try again later"),
]:
if code in msg:
return label
# Truncate anything else to a reasonable length
first_line = msg.splitlines()[0] if msg else "Unknown error"
return first_line[:80] + ("…" if len(first_line) > 80 else "")
from zoom_config import (
get_config,
invalidate_config_cache,
get_api_key,
resolve_subfolder,
resolve_filename,
)
from zoom_notes import (
CancelledError,
find_origin_dir,
find_wal,
parse_transcript,
parse_meeting_title,
read_calendar_title,
format_transcript,
summarize,
generate_title,
build_note_content,
build_placeholder_note,
build_transcript_content,
save_transcript_only,
save_note_only,
overwrite_note,
slugify_title,
detect_active_meeting_id,
persist_accumulator,
load_persisted_accumulator,
load_failed_sidecar,
_persisted_paths,
delete_persisted_accumulator,
purge_stale_accumulators,
list_recoverable_meetings,
mark_meeting_failed,
_path_to_vault_link,
_atomic_write_text,
_CACHE_DIR,
_safe_meeting_id_slug,
)
# ── Event emission ────────────────────────────────────────────────────────────
_emit_lock = threading.Lock()
def emit(payload: dict) -> None:
"""Write a JSON event to stdout, thread-safe."""
with _emit_lock:
sys.stdout.write(json.dumps(payload, ensure_ascii=False) + "\n")
sys.stdout.flush()
# ── WAL watcher ───────────────────────────────────────────────────────────────
class EngineState:
IDLE = "idle"
ACTIVE = "active"
GENERATING = "generating"
# Hard ceiling for one note-generation pass (parse + LLM + write).
# urllib's per-call timeout is 180s and _http_retry sleeps 15+30+60 between
# attempts, so the worst-case for one summarize() call is ~13 minutes. We cap
# the whole pipeline at 5 minutes — anything longer is almost certainly stuck.
_GENERATE_TIMEOUT_SECS = 5 * 60
# Force a fresh on-disk snapshot every N ACTIVE ticks regardless of whether
# the in-memory accumulator changed. Belt-and-suspenders against a SQLite WAL
# checkpoint that resets the file without the parser yielding new entries —
# the in-RAM accumulator stays correct, but the on-disk copy could otherwise
# go stale right when a crash would leave it as the only surviving copy.
# At the default 5s poll interval, 6 ticks ≈ 30s.
_PERIODIC_PERSIST_TICKS = 6
# A WAL "shrink" is suspicious only when the new size is well below the
# previous size — normal frame rotation can drop a few bytes between ticks
# without truncating the log. Treat <50% of last_size as a truncate signal.
_TRUNCATE_RATIO = 0.5
class ZoomEngine:
def __init__(self):
self._state = EngineState.IDLE
self._state_lock = threading.Lock()
self._generating_lock = threading.Lock()
# All WAL-tracking variables share a single lock. Read and write only
# via the helpers below; never touch the underscored fields directly
# from outside the lock.
self._tracking_lock = threading.Lock()
self._last_mtime: float | None = None
self._last_size: int | None = None
self._last_active_ts: float | None = None
self._active_meeting_id: str | None = None
# WAL mtime captured when the current ACTIVE session first started
# (the IDLE→ACTIVE transition tick). Combined with `_active_meeting_id`
# it forms a session fingerprint that distinguishes recurring Zoom
# meetings (same meeting_id, different start time) from a checkpoint
# mutation of the same meeting we just generated for.
self._active_session_mtime: float | None = None
# Fingerprint of the last successfully-generated meeting session.
# Tuple of (meeting_id, session_start_mtime). Guards against:
# 1. Zoom's post-meeting WAL checkpoint replaying the same meeting.
# 2. False suppression of recurring meetings that reuse the same
# Zoom meeting_id — a new IDLE→ACTIVE produces a different
# session_start_mtime so the guard correctly lets it through.
self._last_generated_session: tuple[str, float] | None = None
# `(meeting_id, latest_ts_secs)` for the session we most recently
# generated notes for, where `latest_ts_secs` is the wall-clock
# seconds-since-midnight of the last transcript entry we wrote.
# Passed to `detect_active_meeting_id` on the next IDLE -> ACTIVE
# transition so the just-completed meeting can't trap detection
# while its data is still resident in the WAL (the 2026-04-30
# back-to-back-meeting overwrite scenario). Distinct from
# `_last_generated_session`: the fingerprint there guards against
# re-summarizing the same checkpoint, this guards against picking
# up the wrong meeting when a NEW one starts immediately after.
self._last_completed_boundary: tuple[str, int] | None = None
# Set by _generate_notes when it writes a placeholder note instead of
# a final note. Read by the worker so it knows to keep the persisted
# accumulator (for retry) rather than deleting it.
self._last_run_note_failed: bool = False
# Cooperative cancellation signal for in-flight note generation.
# Cleared at the start of every _trigger_generate / _trigger_retry
# and set when the outer 5-min wall-clock timeout fires (so the
# still-running summarize() thread can abort at its next checkpoint
# instead of zombie-running until the urllib per-call timeout).
self._cancel_event = threading.Event()
# Most recent failed-note metadata, used to drive a retry without
# asking the user to remember the meeting ID. Cleared on retry success.
self._last_failed_meeting: dict | None = None
# Accumulated transcript entries keyed by msg_id. Populated on every
# poll tick so a WAL checkpoint can't lose data already read.
self._accumulated: dict[str, dict] = {}
self._accumulated_lock = threading.Lock()
# Monotonic timestamp of when the WAL was last seen while ACTIVE.
# Set when the WAL disappears mid-meeting (Zoom checkpoint) so we can
# fire generation after idle_threshold even without the WAL present.
self._wal_gone_since: float | None = None
# Counter of ACTIVE ticks since the last accumulator persist call. Used
# to force a fresh disk snapshot every _PERIODIC_PERSIST_TICKS even when
# nothing changed in the parser output — guards against the gap where a
# SQLite WAL checkpoint truncates the file without producing parser
# output that flips `changed_in_acc` true.
self._ticks_since_persist: int = 0
# Streak counter for change-ticks while ACTIVE that yielded zero
# new accumulator entries. A growing streak strongly suggests a
# parser regression vs. a Zoom WAL format change — the WAL is
# being written (mtime/size moved) but our parser sees nothing.
# Emitted via diag at threshold so a degraded session is visible
# in logs without re-creating the issue.
self._empty_parse_streak: int = 0
# Monotonic timestamp of the last tick that added new entries to
# the accumulator. Used by the secondary idle trigger to detect
# meetings where Zoom's post-meeting WAL checkpoint writes keep
# the WAL "alive" (resetting the normal idle clock) even after
# the last speech was captured — causing the meeting to never
# trigger generation until the app is restarted.
self._last_acc_changed_wall_time: float | None = None
# Purge stale in-progress cache files left over from prior crashes.
# Order matters: purge first so the recovery scan that follows only
# surfaces snapshots inside the live retention window.
purge_stale_accumulators()
# Snapshot any in-progress accumulators that survived purge so run()
# can emit `recovery_available` events on startup. We capture this in
# __init__ rather than at run() time so a crash between those two
# phases doesn't lose the list. The Swift menu bar uses these events
# to surface a "Recover unfinished meeting" item — without this,
# cached transcripts from a prior crash are unreachable through the
# UI even though they're sitting on disk.
try:
self._recoverable_at_startup: list[dict] = list_recoverable_meetings()
except Exception:
self._recoverable_at_startup = []
# Config (reloaded on SIGHUP or "reload" command)
self._cfg_lock = threading.Lock()
self._reload_requested = False
# Cleared together with config so a settings change picks up new WAL
# prefixes / a Zoom install relocation on the next tick.
self._origin_invalidated = False
# Resolved WAL paths, keyed by (origin_str, kind). Populated lazily on
# first successful resolution and reused on every subsequent poll —
# the underlying IndexedDB folder structure is stable for the life of
# an engine session. Cleared whenever origin is invalidated (settings
# reload, Zoom relocation) so a config change picks up fresh paths.
# Only positive results are cached — `None` is intentionally NOT
# cached so we keep retrying when the user enables Notetaker / starts
# their first meeting after launching the app.
self._wal_cache: dict[tuple[str, str], Path] = {}
# One-shot guard for the "Zoom installed but no transcript WAL"
# setup error. Without this, a misconfigured Zoom would re-emit the
# error every 5-second poll and spam the user. Reset on origin
# invalidation so settings changes give us a fresh chance to retry.
self._setup_error_emitted = False
signal.signal(signal.SIGHUP, self._on_sighup)
signal.signal(signal.SIGTERM, self._on_sigterm)
def _on_sighup(self, signum, frame):
"""SIGHUP triggers a config reload on the next poll tick."""
self._reload_requested = True
def _on_sigterm(self, signum, frame):
"""SIGTERM: finish any in-flight generation before exiting.
Without this, app termination mid-generation kills the background
thread before it can call delete_persisted_accumulator(), leaving
an orphaned snapshot that surfaces as a spurious "recover unfinished
meeting" item on the next launch. Cancelling the LLM call and waiting
up to 10 seconds gives the worker a clean exit path.
"""
self._cancel_event.set()
acquired = self._generating_lock.acquire(timeout=10)
if acquired:
self._generating_lock.release()
sys.exit(0)
# ── Tracking helpers (always held under _tracking_lock) ─────────────────
def _read_tracking(self):
with self._tracking_lock:
return (
self._last_mtime,
self._last_size,
self._last_active_ts,
self._active_meeting_id,
)
def _read_session_mtime(self) -> float | None:
with self._tracking_lock:
return self._active_session_mtime
def _latest_acc_ts_secs_for(self, meeting_id: str) -> int | None:
"""Latest accumulator timestamp for `meeting_id`, in seconds-since-midnight.
Used to stamp `_last_completed_boundary` after generation. Returns
None if the accumulator has no entries for that meeting (we won't
block detection on a non-existent boundary).
"""
latest = -1
with self._accumulated_lock:
for e in self._accumulated.values():
if e.get("meeting_id") != meeting_id:
continue
ts = e.get("timestamp")
if not ts:
continue
try:
parts = [int(x) for x in ts.split(":")]
except ValueError:
continue
if len(parts) != 3:
continue
secs = parts[0] * 3600 + parts[1] * 60 + parts[2]
if secs > latest:
latest = secs
return latest if latest >= 0 else None
def _write_tracking(self, *, mtime=..., size=..., active_ts=..., meeting_id=..., session_mtime=...):
with self._tracking_lock:
if mtime is not ...:
self._last_mtime = mtime
if size is not ...:
self._last_size = size
if active_ts is not ...:
self._last_active_ts = active_ts
if meeting_id is not ...:
self._active_meeting_id = meeting_id
if session_mtime is not ...:
self._active_session_mtime = session_mtime
def _reset_tracking(self) -> None:
with self._tracking_lock:
self._last_mtime = None
self._last_size = None
self._last_active_ts = None
self._active_meeting_id = None
self._active_session_mtime = None
# Reset the periodic-persist counter alongside tracking — its meaning
# is "ticks since last persist while ACTIVE", and a tracking reset
# always implies we're leaving ACTIVE. Same reasoning for the
# empty-parse streak and accumulator-change timestamp.
self._ticks_since_persist = 0
self._empty_parse_streak = 0
self._last_acc_changed_wall_time = None
# ── State helpers ───────────────────────────────────────────────────────
def _set_state(self, new_state: str, **extra) -> None:
with self._state_lock:
self._state = new_state
payload: dict = {"event": "state", "value": new_state}
payload.update(extra)
emit(payload)
def _get_state(self) -> str:
with self._state_lock:
return self._state
# ── Diagnostics ─────────────────────────────────────────────────────────
def _emit_diag(self, kind: str, **fields) -> None:
"""Emit a structured diagnostic event when diagnostics is enabled.
These show up in the engine's stdout JSON stream and are mirrored to
the Swift app's log file by EngineManager. Useful for post-mortem
investigation of "why was my meeting skipped" without code reading.
"""
try:
if not self._diagnostics_enabled():
return
except Exception:
return
payload = {"event": "diag", "kind": kind}
payload.update(fields)
emit(payload)
def _diagnostics_enabled(self) -> bool:
try:
cfg = self._get_cfg()
return bool(getattr(cfg, "diagnostics", False))
except Exception:
return False
# ── Config ─────────────────────────────────────────────────────────────
def _get_cfg(self):
with self._cfg_lock:
if self._reload_requested:
invalidate_config_cache()
self._reload_requested = False
self._origin_invalidated = True
return get_config()
def _consume_origin_invalidated(self) -> bool:
with self._cfg_lock:
was = self._origin_invalidated
self._origin_invalidated = False
if was:
# Drop cached WAL paths and re-arm the one-shot setup error so a
# settings change (or a Zoom reinstall mid-session) gets a fresh
# resolution and a fresh chance to surface a real diagnostic.
self._wal_cache.clear()
self._setup_error_emitted = False
return was
# ── WAL resolution (with cache and setup-error fallback) ────────────────
def _resolve_wal(self, origin, cfg, kind: str) -> Path | None:
"""Resolve the transcript or blocks WAL, caching positive results.
`kind` is `"transcript"` or `"blocks"`. Calls into `find_wal()` —
which tries the configured prefix first and falls back to scanning
WAL contents — and caches the resolved Path in memory. Subsequent
polls re-use the cached path until the file disappears or origin is
invalidated. Negative results (None) are NOT cached: that's the
common cold-start state for users who haven't yet run a meeting
with AI Notetaker, and we want every poll to keep trying until the
WAL appears.
"""
if origin is None:
return None
cache_key = (str(origin), kind)
cached = self._wal_cache.get(cache_key)
if cached is not None:
try:
if cached.exists() and cached.stat().st_size > 256:
return cached
except OSError:
pass
# Stale cache — Zoom rotated the WAL or the user reinstalled.
self._wal_cache.pop(cache_key, None)
prefix = (
cfg.transcript_db_prefix
if kind == "transcript"
else cfg.blocks_db_prefix
)
wal = find_wal(origin, prefix, kind=kind)
if wal is not None:
self._wal_cache[cache_key] = wal
via = "prefix" if wal.parent.name.startswith(prefix) else "content"
self._emit_diag(
"wal_resolved",
wal_kind=kind,
path=str(wal),
via=via,
)
return wal
def _maybe_emit_setup_error(self, origin) -> None:
"""Emit a one-shot UI error when origin is found but no transcript
WAL exists.
This is the failure mode that previously left the engine silently
stuck in IDLE forever: Zoom is installed (we have an origin), but
none of its IndexedDB stores contain transcript data — typically
because AI Companion / Notetaker is disabled or has never been
used on this account. Surfacing this in the UI tells the user
what to fix instead of leaving them staring at "Waiting for
meeting..." through an entire call.
"""
if origin is None or self._setup_error_emitted:
return
self._setup_error_emitted = True
emit({
"event": "error",
"message": (
"Couldn't find Zoom's transcript database. Make sure "
"AI Companion / Notetaker is enabled in your Zoom client "
"and has been used at least once on this account."
),
})
# ── Persistence helpers ─────────────────────────────────────────────────
def _persist_accumulator_now(self, meeting_id: str, reason: str) -> None:
"""Snapshot and write the in-memory accumulator to disk, atomically.
Resets the periodic-persist tick counter so a same-tick persist can't
be immediately followed by another forced one. `reason` is included in
the diag event for post-mortem analysis ("changed", "truncated",
"periodic"). Safe to call even with an empty accumulator — it just
writes an empty snapshot, which is a valid resume state.
"""
if not meeting_id:
return
with self._accumulated_lock:
snapshot = dict(self._accumulated)
try:
persist_accumulator(meeting_id, snapshot)
except Exception:
return
self._ticks_since_persist = 0
self._emit_diag(
"accumulator_persisted",
count=len(snapshot),
meeting_id=meeting_id,
reason=reason,
)
# ── Polling loop ────────────────────────────────────────────────────────
def run(self) -> None:
# Emit a one-shot readiness event with Zoom-detection status before
# the regular state stream, so the UI can show a setup error if
# Zoom isn't installed / WAL paths don't resolve.
initial_origin = find_origin_dir()
emit({
"event": "ready",
"zoom_installed": initial_origin is not None,
"wal_path": str(initial_origin) if initial_origin else None,
})
# Surface any in-progress accumulators left over from a prior crash.
# Emitted between `ready` and the first `state` event so the UI can
# render a "Recover unfinished meeting" menu before the engine starts
# actively polling. The UI is responsible for suppressing recovery
# entries whose meeting_id later matches an `active` state event —
# those will be auto-resumed via the existing IDLE→ACTIVE seed path.
for rec in self._recoverable_at_startup:
evt = {
"event": "recovery_available",
"meeting_id": rec["meeting_id"],
"entry_count": rec["entry_count"],
"last_updated": rec["last_updated"],
"slug_hint": rec["slug_hint"],
"location": rec.get("location", "root"),
}
# Confirmed-failed snapshots (in `failed/`) carry sidecar metadata
# — pass through what we have so the menu bar can render a richer
# label like "Sales Sync — failed 3 days ago" instead of the
# speaker-name slug hint.
if rec.get("title"):
evt["title"] = rec["title"]
if rec.get("failed_at"):
evt["failed_at"] = rec["failed_at"]
if rec.get("last_error"):
evt["last_error"] = rec["last_error"]
emit(evt)
emit({"event": "state", "value": EngineState.IDLE})
threading.Thread(target=self._stdin_reader, daemon=True).start()
origin = initial_origin
while True:
try:
cfg = self._get_cfg()
poll_interval = cfg.poll_interval_secs
idle_threshold = cfg.idle_threshold_secs
# Reset cached origin whenever config was reloaded OR we never
# found one — a re-poll is cheap (single directory iter) and
# lets us recover when Zoom is installed/relocated mid-session.
if origin is None or self._consume_origin_invalidated():
origin = find_origin_dir()
self._poll_once(origin, cfg, idle_threshold)
except Exception as exc:
emit({"event": "error", "message": f"Poll error: {exc}"})
time.sleep(poll_interval)
def _poll_once(self, origin, cfg, idle_threshold: int) -> None:
if origin is None:
return
wal = self._resolve_wal(origin, cfg, "transcript")
if not wal:
# Zoom is installed but we can't find a transcript-shaped WAL.
# Surface the diagnostic the first time we hit this so the user
# sees a real error instead of silently sitting in IDLE. We
# only emit when we've never been ACTIVE — once a meeting has
# been seen, a transient WAL-gone window is the existing
# "checkpoint mid-meeting" path below and shouldn't trigger a
# setup-error message.
if self._get_state() == EngineState.IDLE:
self._maybe_emit_setup_error(origin)
state = self._get_state()
if state == EngineState.ACTIVE:
with self._accumulated_lock:
has_entries = bool(self._accumulated)
if has_entries:
# WAL disappeared mid-meeting (Zoom checkpoint). Keep ACTIVE
# and fire generation once the idle threshold elapses.
if self._wal_gone_since is None:
self._wal_gone_since = time.monotonic()
idle_secs = time.monotonic() - self._wal_gone_since
if idle_secs >= idle_threshold:
self._wal_gone_since = None
self._trigger_generate(origin, cfg)
return
# No entries yet — WAL gone before we accumulated anything
self._wal_gone_since = None
self._set_state(EngineState.IDLE)
self._reset_tracking()
return
# WAL present — reset the gone timer
self._wal_gone_since = None
try:
stat = wal.stat()
except OSError:
return
mtime = stat.st_mtime
size = stat.st_size
now = time.time()
last_mtime, last_size, last_active_ts, _ = self._read_tracking()
# First poll after startup (or after _reset_tracking): we don't know
# whether the WAL is currently changing or just has stale state from a
# past meeting. Anchor to the observed values WITHOUT marking it as
# "changed" — otherwise a stale WAL from a meeting that ended hours
# ago would immediately flip us to ACTIVE on engine restart and
# re-trigger generation 90 seconds later.
if last_mtime is None or last_size is None:
self._write_tracking(mtime=mtime, size=size)
return
changed = (mtime != last_mtime) or (size != last_size)
# Truncate detection: the WAL shrunk well below its previous size,
# which on SQLite means a checkpoint truncated the journal. The new
# entries we already accumulated are still valid in RAM, but the
# parser will yield nothing new from this point until fresh writes
# land — so this is a critical moment to force a persist.
truncated = (
last_size is not None
and size < last_size
and size < last_size * _TRUNCATE_RATIO
)
self._write_tracking(mtime=mtime, size=size)
state = self._get_state()
# Force a fresh disk snapshot when the WAL just got truncated (a
# SQLite checkpoint cleared the journal) or when we've gone too many
# ACTIVE ticks without persisting. Run this BEFORE the change-handling
# branch so a tick that sees a truncate ALSO benefits from the regular
# accumulator-update logic below if there happens to be fresh data.
if state == EngineState.ACTIVE:
self._ticks_since_persist += 1
_, _, _, current_meeting_id = self._read_tracking()
if current_meeting_id and (
truncated or self._ticks_since_persist >= _PERIODIC_PERSIST_TICKS
):
if truncated:
self._emit_diag(
"wal_truncated",
meeting_id=current_meeting_id,
old_size=last_size, new_size=size,
)
self._persist_accumulator_now(
current_meeting_id,
reason="truncated" if truncated else "periodic",
)
if changed:
self._write_tracking(active_ts=now)
# On IDLE -> ACTIVE we ask detection to ignore the meeting we
# just finished generating notes for. Without this the still-
# resident WAL data from that meeting (huge entry count + still
# within the recency window) outscores the one or two entries
# the freshly-starting meeting has produced, and tracking locks
# onto the wrong id for the rest of the session — the 2026-04-30
# back-to-back-meeting failure mode.
#
# Boundary filter applies in TWO cases (the 2026-04-30 PM lesson):
# 1. The IDLE -> ACTIVE transition tick (original case).
# 2. While ACTIVE but with no tracked meeting_id yet — i.e. we
# went ACTIVE on a tick where Zoom hadn't written its
# `meetingId` token yet, so detection returned None. On
# subsequent ticks, while tracking is still empty, the
# boundary filter must STILL apply or the just-ended
# meeting (whose data is still resident in the WAL) gets
# promoted to active and contaminates the new session.
_, _, _, current_tracking_id = self._read_tracking()
tracking_is_empty = not current_tracking_id
apply_boundary = (
state == EngineState.IDLE
or (state == EngineState.ACTIVE and tracking_is_empty)
)
exclude_id = None
freshness_floor = None
if apply_boundary and self._last_completed_boundary is not None:
exclude_id, freshness_floor = self._last_completed_boundary
try:
meeting_id = detect_active_meeting_id(
wal,
exclude_meeting_id=exclude_id,
freshness_floor_secs=freshness_floor,
)
except Exception:
meeting_id = None
# Blocked-meeting guard: if the detected meeting_id is in the
# user's block list, refuse the transition entirely and stay IDLE.
# Normalize by stripping whitespace so copy-pasted IDs with a
# trailing space or newline still match.
if meeting_id:
blocked = [b.strip() for b in getattr(cfg, "blocked_meeting_ids", [])]
if meeting_id.strip() in blocked:
self._emit_diag(
"meeting_blocked",
meeting_id=meeting_id,
)
return
if state == EngineState.IDLE:
# Stamp the session fingerprint as we enter ACTIVE. `mtime`
# is the current WAL mtime; combined with meeting_id it lets
# the post-success dedupe distinguish a recurring meeting's
# new session from a checkpoint of the one we just finished.
self._write_tracking(meeting_id=meeting_id, session_mtime=mtime)
# Only clear the boundary anchor once we've successfully
# tracked the new meeting. If meeting_id is None here (Zoom
# hadn't written meetingId yet), keep the boundary in place
# so subsequent ACTIVE-with-empty-tracking ticks can still
# apply it.
if meeting_id:
self._last_completed_boundary = None
# Fresh ACTIVE period — restart the periodic-persist clock so
# we don't immediately force a snapshot on the very first
# change tick before the accumulator has any new content.
self._ticks_since_persist = 0
with self._accumulated_lock:
# Seed from any persisted snapshot so a mid-meeting engine
# restart doesn't lose entries collected before the restart.
# Validate every entry's meeting_id matches — guards against
# a mismatched snapshot ever poisoning the accumulator.
self._accumulated = {}
if meeting_id:
persisted = load_persisted_accumulator(meeting_id)
if persisted:
# Only seed from a snapshot written today — a prior-day
# snapshot is kept for manual recovery but must not
# auto-load into a fresh live session. Loading it would
# cause Case-B abandoned-meeting logic to fire with stale
# data and generate notes for the wrong (yesterday's)
# meeting instead of the one that just started.
slug = _safe_meeting_id_slug(meeting_id)
snap_path = _CACHE_DIR / f"in-progress-{slug}.json"
try:
snap_mtime = datetime.fromtimestamp(snap_path.stat().st_mtime)
age_hours = (datetime.now() - snap_mtime).total_seconds() / 3600
is_recent = age_hours < 4
except OSError:
is_recent = True # no stat → treat as safe to load
if is_recent:
self._accumulated = {
k: v for k, v in persisted.items()
if not v.get("meeting_id") or v.get("meeting_id") == meeting_id
}
else:
# Snapshot is stale (>4 h old) — a prior-day or
# earlier same-day occurrence of a recurring meeting.
# Delete it so the failed/ recovery item (if any) is
# also cleaned up, and start with an empty accumulator.
delete_persisted_accumulator(meeting_id)
acc_size = len(self._accumulated)
self._emit_diag(
"meeting_id_changed",
from_id=None, to_id=meeting_id or "",
reason="idle_to_active", seeded_from_snapshot=acc_size > 0,
)
self._set_state(
EngineState.ACTIVE,
meeting_id=meeting_id or "",
accumulator_size=acc_size,
)
elif state == EngineState.ACTIVE and meeting_id:
# Re-evaluate the active meeting ID on every tick. Two distinct
# cases to handle:
#
# (A) Upgrade-from-empty: tracking is currently empty (because
# IDLE -> ACTIVE happened before Zoom wrote `meetingId`),
# and detection has now produced a real id. Adopt it
# WITHOUT clearing the accumulator — the entries we
# captured during the empty-tracking window are real
# for this same meeting; their `meeting_id` field was
# just None at parse time. The generation-time filter
# keeps no-id entries alongside matching-id entries.
# This is the 2026-04-30 PM Brand Team Meeting fix.
#
# (B) Meeting actually changed: tracking has a real id but
# the WAL now scores a different one as best. A new
# meeting started while the old one's data was still
# in the WAL — switch and clear the accumulator so the
# two don't mix.
if tracking_is_empty:
# Case A: upgrade without clearing.
self._write_tracking(meeting_id=meeting_id, session_mtime=mtime)
self._last_completed_boundary = None
with self._accumulated_lock:
# Stamp the late-arriving meeting_id onto any entries
# that lacked one so they survive the strict filter
# at generation time even if anything later changes
# the inclusion rules.
for entry in self._accumulated.values():
if not entry.get("meeting_id"):
entry["meeting_id"] = meeting_id
acc_size = len(self._accumulated)
self._emit_diag(
"meeting_id_changed",
from_id="", to_id=meeting_id,
reason="upgrade_from_empty",
accumulator_preserved=True,
)
self._set_state(
EngineState.ACTIVE,
meeting_id=meeting_id,
accumulator_size=acc_size,
)
elif meeting_id != current_tracking_id:
# If the newly detected meeting_id is blocked, don't
# perform the Case B transition — stay on the current
# tracked meeting (or let idle detection handle it).
_blocked_ids = [b.strip() for b in getattr(cfg, "blocked_meeting_ids", [])]
if meeting_id.strip() in _blocked_ids:
self._emit_diag(
"meeting_blocked",
meeting_id=meeting_id,
context="case_b",
)
else:
# Case B: real meeting change. Two sub-scenarios:
#
# 1) BACK-TO-BACK MEETINGS: the previous meeting really
# happened, has a real accumulator with real
# conversation. Auto-generate its note now (the
# 2026-05-04 AEO GA fix) before clearing local
# state. Runs in a background thread so the new
# meeting's tracking proceeds without delay.
#
# 2) MISIDENTIFICATION: scoring just corrected itself
# after briefly tracking the wrong meeting (the
# original 2026-04-30 reason this branch existed).
# Accumulator is small / has no real speakers.
# Discard as before — generating a note from
# misidentified noise wastes an LLM call and
# produces a confusing artifact.
#
# `_abandoned_looks_real` is the gate between the two.
with self._accumulated_lock:
abandoned_snapshot = dict(self._accumulated)
# Gate on entries that *explicitly* belong to the
# abandoned meeting_id. The full snapshot may contain
# early utterances from the new meeting whose WAL pages
# haven't established a meeting_id context yet
# (meeting_id=None). Including them inflates the count
# and causes false-positives where a stale meeting +
# 3 new-meeting utterances triggers abandoned generation.
strict_snapshot = {
k: v for k, v in abandoned_snapshot.items()
if v.get("meeting_id") == current_tracking_id
}
if self._abandoned_looks_real(strict_snapshot):
self._trigger_abandoned_generation(
current_tracking_id, abandoned_snapshot,
)
# Note: do NOT delete_persisted_accumulator here —
# _trigger_abandoned_generation persists a fresh
# snapshot synchronously and cleans up after itself
# on success.
else:
try:
delete_persisted_accumulator(current_tracking_id)
except Exception:
pass
self._write_tracking(meeting_id=meeting_id, session_mtime=mtime)
with self._accumulated_lock:
self._accumulated = {}
self._emit_diag(
"meeting_id_changed",
from_id=current_tracking_id, to_id=meeting_id,
reason="active_reevaluation",
abandoned_snapshot_size=len(abandoned_snapshot),
strict_snapshot_size=len(strict_snapshot),
abandoned_auto_generated=self._abandoned_looks_real(strict_snapshot),
)
self._set_state(
EngineState.ACTIVE,
meeting_id=meeting_id,
accumulator_size=0,
)
# Snapshot new entries into the accumulator on every change tick.
# No meeting_id_filter here — we accumulate everything and filter
# at generation time. This prevents a stale meeting ID (detected
# at IDLE→ACTIVE when old data was still in the WAL) from causing
# new utterances to be silently dropped.
try:
_, _, _, current_meeting_id = self._read_tracking()
fresh = parse_transcript(wal)
changed_in_acc = False
with self._accumulated_lock:
_today_str = datetime.now().strftime("%Y-%m-%d")
for entry in fresh:
mid_key = entry["msg_id"]
if mid_key not in self._accumulated:
entry["_session_date"] = _today_str
self._accumulated[mid_key] = entry
changed_in_acc = True
else:
existing = self._accumulated[mid_key]
# Cross-meeting guard (the 2026-05-04 Quick Chat
# incident): if the fresh parse claims this msg_id
# belongs to a different meeting than the one we
# already have stamped on the accumulated entry,
# treat the fresh metadata as untrustworthy and
# skip the speaker/timestamp overwrites. The
# `meeting_id` field is already protected by the
# `not existing.get("meeting_id")` clause below;
# without this guard, a parse_transcript run that
# leaked metadata from an adjacent corrupted-
# boundary entry could rewrite the speaker and
# timestamp of an entry already stamped to the
# active session via Case A upgrade-from-empty.
fresh_mid = entry.get("meeting_id")
existing_mid = existing.get("meeting_id")
mid_conflict = bool(fresh_mid) and bool(existing_mid) \
and fresh_mid != existing_mid
if len(entry.get("text", "")) > len(existing.get("text", "")):
existing["text"] = entry["text"]
changed_in_acc = True
if not mid_conflict:
if entry.get("speaker") and entry["speaker"] != "Unknown" \
and existing.get("speaker") != entry["speaker"]:
existing["speaker"] = entry["speaker"]
changed_in_acc = True
if entry.get("timestamp") and existing.get("timestamp") != entry["timestamp"]:
existing["timestamp"] = entry["timestamp"]
changed_in_acc = True
if entry.get("meeting_id") and not existing.get("meeting_id"):
existing["meeting_id"] = entry["meeting_id"]
changed_in_acc = True
# Persist on any change (new entry, longer text, speaker fix,
# timestamp update). Speaker corrections and timestamp fills
# matter for retry just as much as new utterances.
if changed_in_acc and current_meeting_id:
self._persist_accumulator_now(current_meeting_id, reason="changed")
if changed_in_acc:
self._last_acc_changed_wall_time = time.monotonic()
# Empty-parse streak: WAL moved (mtime/size) but the parser
# found nothing new. Reset on real changes; emit a diag at
# threshold so a degraded session shows up in logs.
if changed_in_acc:
self._empty_parse_streak = 0
else:
self._empty_parse_streak += 1
if self._empty_parse_streak in (5, 20, 60):
self._emit_diag(
"empty_parse_streak",
count=self._empty_parse_streak,
meeting_id=current_meeting_id or "",
)
except Exception as exc:
# Previously a silent `pass`. Surface as a diag event so
# parser regressions are visible in logs without users
# having to recreate the issue.
self._emit_diag(
"parse_error",
error=str(exc)[:200],
meeting_id=(self._read_tracking()[3] or ""),
)
# Secondary idle trigger: fire generation when the accumulator
# has been frozen for a long time even though the WAL file is
# still being updated by Zoom checkpoint writes. This is the
# "Jose Ocando" failure mode — meeting ends, Zoom keeps the WAL
# alive overnight with periodic checkpoint writes, the normal
# idle clock never elapses because `changed` stays True.
# Threshold is max(3× idle_threshold, 5 min) so a brief
# mid-meeting Notetaker pause doesn't trigger prematurely.
if (
state == EngineState.ACTIVE
and self._last_acc_changed_wall_time is not None
):
acc_stale_secs = time.monotonic() - self._last_acc_changed_wall_time
acc_stale_threshold = max(idle_threshold * 3, 5 * 60)
if acc_stale_secs >= acc_stale_threshold:
_, _, _, _stale_mid = self._read_tracking()
if _stale_mid:
with self._accumulated_lock: