agents: entity re-runs scheduled action on every redeploy — unwired shared-state handle on wiring race (+ pull-wake wake_stream -1 replay) #4625

@balegas

Summary

Two independent defects in @electric-ax/agents-runtime / @electric-ax/agents-server cause a long-lived entity to re-run a scheduled action on every server/runner redeploy. Together they produced duplicate daily-digest posts in electric-sql/open-factory on every deploy (a first idempotency fix there, open-factory#52, did not help because it reads the very collection that Defect B leaves unwired).

Observed on: runner @electric-ax/agents-runtime@0.4.0, server @electric-ax/agents-server@0.4.16 (deployed via stratovolt cloud). Both defects also verified present in the agents-runtime@0.4.1 / agents-server@0.5.1 source. File:line citations below are from the 0.4.0/0.4.1 source unless noted.


Defect B (primary) — ctx.observe(db(...)) can resolve with an unwired shared-state handle

When shared-state wiring throws a transient error, ctx.observe(db(...)) resolves successfully with a handle whose backing collection was never wired (__electricCollection === undefined), instead of rejecting. Reads then silently see the collection as empty, so any idempotency/dedup decision based on "is this row absent?" acts on stale-empty data.

Mechanism (packages/agents-runtime/src/process-wake.ts)

  1. The db-source branch of doObserve runs scheduleSharedStateWiring(); await waitForSharedStateWiring() (0.4.0:1805-1816 / 0.4.1:1824-1836).
  2. scheduleSharedStateWiring chains .then(wirePendingSharedStates).catch(err => failBackgroundWake(err, 'SHARED_STATE_WIRING_FAILED')), so the rejection is swallowed (0.4.1:1886-1892). failBackgroundWake only sets liveProcessError; it does not interrupt the running handler.
  3. wirePendingSharedStates does await createSharedStateDb(...) then await ssHandle.wireDb(ssDb) (0.4.1:1872-1883). wireDb is what sets backingDb.
  4. createSharedStateDb calls await sharedDb.preload() with no try/catch (0.4.0:1513 / 0.4.1:1547) — unlike the entity/child observe path, which explicitly wraps preload to tolerate Stream not found|404 (0.4.0:1422 / 0.4.1:1446-1457, comment literally cites the daily-digest case).
  5. So a transient 'control' in undefined (durable-streams @durable-streams/state batch-processor race while a subscription is registering) or STREAM_NOT_FOUND (eager preload of a not-yet-ready stream) during a cold reconnect rejects preload → swallowed → wireDb never runs → backingDb stays null.
  6. waitForSharedStateWiring() awaits the already-.catch-ed promise → resolves → ctx.observe(db(...)) returns a truthy, unwired handle.
  7. The __electricCollection getter returns backingDb?.collections[name] → undefined while backingDb is null (setup-context.ts, 0.4.0:411-412 + getter). Note the inconsistency: the public proxy methods (.toArray/.get/.insert) call assertConnectedSharedState, which throws in this state (connect mode) (setup-context.ts 0.4.0:503-510), but raw __electricCollection access returns undefined silently.
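
The chain above can be reproduced in isolation. This is a minimal sketch, not the runtime's code: SketchHandle, observeDbSketch, failingPreload and recordedError are hypothetical stand-ins. The only behavior taken from the report is that the .catch sits on the same promise the observer awaits, and that the getter returns undefined while the backing db is null.

```typescript
// Hypothetical stand-ins for illustration; not the agents-runtime internals.
type SketchDb = { collections: Record<string, unknown[]> };

class SketchHandle {
  private backingDb: SketchDb | null = null;

  wireDb(db: SketchDb): void {
    this.backingDb = db;
  }

  // Mirrors the described getter: undefined while backingDb is null.
  get __electricCollection(): unknown[] | undefined {
    return this.backingDb?.collections["digests"];
  }
}

// Records the failure without interrupting the caller, like liveProcessError.
let recordedError: unknown = null;

// Simulates the transient race rejecting preload during a cold reconnect.
async function failingPreload(): Promise<SketchDb> {
  throw new TypeError("'control' in undefined");
}

async function observeDbSketch(
  preload: () => Promise<SketchDb>,
): Promise<SketchHandle> {
  const handle = new SketchHandle();
  const wiring = Promise.resolve()
    .then(async () => handle.wireDb(await preload()))
    .catch((err: unknown) => {
      recordedError = err; // swallowed: the chain now fulfills
    });
  await wiring; // resolves even though wireDb never ran
  return handle;
}
```

Calling observeDbSketch(failingPreload) resolves with a handle whose __electricCollection is undefined, while the error is only visible in recordedError. The caller has no rejection to catch.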

Impact

Most likely on the first wake after a restart/redeploy, when stream connections are cold and the transient race window is open. Any consumer reading shared state to decide idempotency will treat "couldn't read" as "nothing there". In open-factory this re-dispatched the already-posted morning digest on every deploy:

// open-factory packages/factory/src/helpers/daily-digest.ts
const col = proxy?.__electricCollection
if (!col) return false   // unwired -> "no digest posted" -> re-dispatch

Client-side guards don't catch it: open-factory wraps observe in observeWithRaceGuard (which already knows 'control' in undefined / STREAM_NOT_FOUND are benign), but it only catches observe rejections — and the db-source path resolves here rather than rejecting.
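
A client can at least stop treating "couldn't read" as "nothing there" by making the read three-valued. The sketch below is an assumption about how such a gate might look, not the open-factory code: DigestRow, DigestStatus, digestStatus and shouldDispatch are hypothetical names, and the row shape is invented for illustration.

```typescript
// Hypothetical types for illustration; the row shape is an assumption.
type DigestRow = { date: string };
type DigestStatus = "posted" | "not-posted" | "unknown";

// `col` stands in for proxy?.__electricCollection after it has been
// materialized to an array; undefined means the handle is unwired.
function digestStatus(
  col: ReadonlyArray<DigestRow> | undefined,
  date: string,
): DigestStatus {
  if (col === undefined) return "unknown"; // could not read, not "absent"
  return col.some((row) => row.date === date) ? "posted" : "not-posted";
}

// Dispatch only on a positive "not posted" reading; skip when unknown.
function shouldDispatch(status: DigestStatus): boolean {
  return status === "not-posted";
}
```

This only covers the unwired case. A collection that is wired but has not yet loaded its rows would still read as "not-posted", so a gate like this is a mitigation rather than a fix.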

Suggested fix

Make shared-state wiring fail-closed rather than resolve-unwired. Either:

  • (a) propagate the wiring failure so ctx.observe(db(...)) rejects on a wiring/preload error (don't swallow in scheduleSharedStateWiring's .catch for the path that waitForSharedStateWiring awaits) — then existing client guards (observeWithRaceGuard, if (!shared) return) handle it correctly; or
  • (b) wrap the shared-state preload() to tolerate the transient batch race and retry until wired (do not return a handle until backingDb is set).

⚠️ Note: simply treating the error as "empty stream" (mirroring the entity-path 404 swallow at 0.4.1:1446-1457) is not sufficient for shared-state in connect mode — an existing shared stream that is merely racing must not read as empty, or the dedup bug persists with a wired-but-empty collection. Fail-closed (reject/retry) is the correct semantics.
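
As a rough illustration of option (b) with fail-closed semantics, the sketch below retries a preload on the two transient errors named above and rejects if wiring never succeeds. Everything in it is hypothetical: preloadOrReject, isTransientWiringError, the message matching, and the attempt and delay defaults are assumptions, not the runtime's API.

```typescript
// Hypothetical sketch of option (b); names are not from agents-runtime.
type WiredDb = { collections: Record<string, unknown[]> };

// Message matching is an assumption based on the two errors named above.
function isTransientWiringError(err: unknown): boolean {
  const message = err instanceof Error ? err.message : String(err);
  return (
    message.includes("'control' in undefined") ||
    message.includes("STREAM_NOT_FOUND")
  );
}

// Retries preload on transient errors and rejects if wiring never succeeds,
// so the caller never receives a handle without a backing db.
async function preloadOrReject(
  preload: () => Promise<WiredDb>,
  maxAttempts = 3,
  delayMs = 10,
): Promise<WiredDb> {
  let lastError: unknown = new Error("preload was never attempted");
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await preload();
    } catch (err) {
      if (!isTransientWiringError(err)) throw err; // real failure: fail fast
      lastError = err;
      if (attempt < maxAttempts) {
        await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
      }
    }
  }
  throw lastError; // fail closed: reject rather than resolve unwired
}
```

Because the final outcome is either a wired db or a rejection, existing client guards that catch observe rejections would see the failure instead of an empty read.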


Defect A (secondary) — pull-wake consumer replays the whole wake_stream from -1 on cold restart

The runner's wake_stream read position is in-memory only and the server never returns a durable resume offset, so every cold restart re-reads the entire append-only wake_stream from the beginning, re-presenting historical wakes.

Mechanism

  • createPullWakeRunner initializes let currentOffset = config.offset ?? '-1' and resets it on every start(); there is no disk/DB persistence and no read-back at startup (agents-runtime dist/index.js createPullWakeRunner). -1 = read the wake_stream from the start.
  • Runner registration POST /_electric/runners does not return a resume offset: rowToRunner omits wake_stream_offset (agents-server src/entity-registry.ts). The runner's reported offset is stored only in runner_runtime_diagnostics and read back only by the /health endpoint (src/routing/runners-router.ts) — never used to seek/resume.
  • Consumers that try to use it get undefined and fall back to -1. For example, open-factory runtime.ts reads registered.wake_stream_offset and passes it as offset, but receives nothing.

The per-subscription durable consume cursor (acked_offset) is correct and is not the problem; this is specifically the runner's wake_stream read position. (Cron observations themselves do not re-fire on re-observe — they're Postgres-scheduled ticks with no in-memory baseline — and firstWake is correctly false after restart; neither is the trigger.)
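
The replay can be illustrated with a small simulation. Only the config.offset ?? '-1' default and the in-memory cursor come from the description above; createRunnerSketch, readWakeStream and WakeEvent are hypothetical, and offsets are modeled as plain integers for simplicity, which the real stream may not use.

```typescript
// Hypothetical simulation; offsets are simplified to integers.
type WakeEvent = { offset: number; entity: string };

// Returns events strictly after the given offset; "-1" means "from the start".
function readWakeStream(stream: WakeEvent[], offset: string): WakeEvent[] {
  const after = Number(offset);
  return stream.filter((wake) => wake.offset > after);
}

function createRunnerSketch(config: { offset?: string }) {
  let currentOffset = config.offset ?? "-1"; // lives only in this closure
  return {
    poll(stream: WakeEvent[]): WakeEvent[] {
      const wakes = readWakeStream(stream, currentOffset);
      if (wakes.length > 0) {
        currentOffset = String(wakes[wakes.length - 1].offset);
      }
      return wakes;
    },
  };
}

const wakeStream: WakeEvent[] = [
  { offset: 0, entity: "daily-digest" },
  { offset: 1, entity: "daily-digest" },
];

// Each createRunnerSketch call models a cold start of the runner process.
const beforeRestart = createRunnerSketch({}).poll(wakeStream).length;
const afterRestart = createRunnerSketch({}).poll(wakeStream).length;
const seeded = createRunnerSketch({ offset: "1" }).poll(wakeStream).length;
console.log(beforeRestart, afterRestart, seeded); // prints "2 2 0"
```

The second cold start re-reads both historical wakes because nothing carried the cursor across the restart. A runner seeded with the last offset reads none of them.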

Impact

Every runner redeploy re-presents historical wake events → wake/claim churn and re-delivery of already-handled wakes to long-lived entities. Catch-up-style handlers (which act on any wake when a scheduled occurrence is newer than the last run) then re-run. This is what wakes the daily-digest on every deploy; Defect B then turns that wake into a duplicate post.

Suggested fix

Give the runner a durable resume position for the wake_stream. Options:

  • Return a durable resume offset from runner registration (add wake_stream_offset to rowToRunner / the createRunner response) so the host can seed config.offset (the plumbing already exists on the consumer side); or
  • Default a fresh-boot consumer to the wake_stream tail rather than -1; or
  • Server-side: trim/seek the per-runner wake_stream to the runner's last-acked offset.

Repro sketch

  1. Create a long-lived entity that calls observe(db(sharedId)) on a shared stream and uses it for idempotency, plus a catch-up handler that acts on any wake when due (e.g. daily-digest).
  2. Post once (writes a row to the shared stream).
  3. Redeploy the runner. On the first post-restart wake, the shared-state preload hits the transient wiring race → observe(db) resolves unwired → idempotency read returns empty → action re-runs → duplicate.

Happy to provide the open-factory entity + the mitigation we're shipping there (a client-side "is the shared collection wired?" gate) as a concrete example.
