Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 78 additions & 3 deletions apps/backend/services/cdc/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,98 @@
*
* Wire format and `onCdcEvent` API are unchanged — both still come from
* `@wxyc/database` and remain the cross-consumer contract.
*
* BS#1120 fallback-channel sinks: `cdc_oversized` and `cdc_error` notifications
* (emitted by migration 0094 when the primary `cdc` payload would have been
* dropped) are wired here to `Sentry.captureMessage` so AC #3's "emit a metric
* Sentry can alert on" is satisfied. Subscribing only — the cdc-listener owns
* the LISTEN; this module owns the Sentry signal.
*/

import * as Sentry from '@sentry/node';
import { onCdcErrorEvent, onCdcOversizedEvent, startCdcListener, stopCdcListener } from '@wxyc/database';

/**
* Stable Sentry fingerprints for the BS#1120 fallback channels. Each channel
* gets a single issue group so alert thresholds count notifications, not
* per-table churn. (The `table` is on `tags` for breakdown queries.)
*/
const OVERSIZED_FINGERPRINT = ['cdc-oversized-payload'];
const ERROR_FINGERPRINT = ['cdc-trigger-exception'];

let fallbackSinksRegistered = false;

/**
* Wires the BS#1120 fallback channels to Sentry. Idempotent: a second call is
* a no-op so a stray `startCdcDispatcher()` (e.g. dev hot-reload) doesn't
* stack duplicate captures. Exported so tests can drive the wiring without
* coupling to module-init order. `__resetCdcFallbackSinksForTests` lets the
* test harness drop the latch between cases.
*/
export function registerCdcFallbackSinks(): void {
  // Module-level latch: only the first call (until the latch is reset)
  // registers callbacks, so repeated calls never stack duplicate captures.
  if (fallbackSinksRegistered) return;
  fallbackSinksRegistered = true;

  // `cdc_oversized`: reported at `warning` level. The table/action/reason go
  // on `tags` for breakdown queries, while the fixed fingerprint keeps every
  // notification in a single Sentry issue group.
  onCdcOversizedEvent((event) => {
    Sentry.captureMessage('cdc.oversized_payload', {
      level: 'warning',
      tags: {
        subsystem: 'cdc',
        table: event.table,
        action: event.action,
        reason: event.reason,
      },
      extra: {
        schema: event.schema,
        primary_key: event.primary_key,
        payload_bytes: event.payload_bytes,
        timestamp: event.timestamp,
      },
      fingerprint: OVERSIZED_FINGERPRINT,
    });
  });

  // `cdc_error`: reported at `error` level, with the SQLSTATE as an extra tag
  // and the SQLERRM text carried in `extra`.
  onCdcErrorEvent((event) => {
    Sentry.captureMessage('cdc.trigger_exception', {
      level: 'error',
      tags: {
        subsystem: 'cdc',
        table: event.table,
        action: event.action,
        reason: event.reason,
        sqlstate: event.sqlstate,
      },
      extra: {
        schema: event.schema,
        sqlerrm: event.sqlerrm,
        timestamp: event.timestamp,
      },
      fingerprint: ERROR_FINGERPRINT,
    });
  });
}

/**
* Starts the per-process CDC LISTEN connection. Idempotent at the listener
* layer (`startCdcListener` warns and returns on a second call) and at the
* fallback-sink layer (`registerCdcFallbackSinks` no-ops on the second call
* via its module-level latch). Call once at startup, before any consumer that
* registers via `onCdcEvent`.
*
* @returns A promise that settles once `startCdcListener()` has settled.
*/
export async function startCdcDispatcher(): Promise<void> {
// Sinks are registered first so they are already in place when the listener
// starts.
registerCdcFallbackSinks();
await startCdcListener();
}

/**
* Stops the per-process CDC LISTEN connection and clears registered
* callbacks. Safe to call unconditionally during shutdown.
*
* Drops the fallback-sink latch so a subsequent `startCdcDispatcher()` (test
* harness, future hot-reload) re-wires the captures against the freshly
* cleared `@wxyc/database` callback arrays.
*
* @returns A promise that settles once `stopCdcListener()` has settled.
*/
export async function shutdownCdcDispatcher(): Promise<void> {
await stopCdcListener();
// Reset the latch only after the listener has stopped: if `stopCdcListener()`
// rejects, this line is never reached and the latch stays set.
fallbackSinksRegistered = false;
}
47 changes: 47 additions & 0 deletions apps/enrichment-worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import {
closeDatabaseConnection,
enableLivenessProbe,
onCdcConnectionStateChange,
onCdcErrorEvent,
onCdcEvent,
onCdcOversizedEvent,
startCdcListener,
stopCdcListener,
} from '@wxyc/database';
Expand Down Expand Up @@ -214,6 +216,51 @@ const main = async (): Promise<void> => {
});

onCdcEvent(makeEnrichmentHandler());

// BS#1120: wire the migration-0094 fallback channels to Sentry so a dropped
// primary `cdc` payload (oversized row) or an unexpected trigger exception
// produces a metric the alert can fire on. Both the worker and the
// backend's CDC dispatcher subscribe; they're independent LISTEN
// connections, so each process needs its own sink.
onCdcOversizedEvent((event) => {
Sentry.captureMessage('cdc.oversized_payload', {
level: 'warning',
tags: {
subsystem: 'cdc',
consumer: 'enrichment-worker',
table: event.table,
action: event.action,
reason: event.reason,
},
extra: {
schema: event.schema,
primary_key: event.primary_key,
payload_bytes: event.payload_bytes,
timestamp: event.timestamp,
},
fingerprint: ['cdc-oversized-payload'],
});
});
onCdcErrorEvent((event) => {
Sentry.captureMessage('cdc.trigger_exception', {
level: 'error',
tags: {
subsystem: 'cdc',
consumer: 'enrichment-worker',
table: event.table,
action: event.action,
reason: event.reason,
sqlstate: event.sqlstate,
},
extra: {
schema: event.schema,
sqlerrm: event.sqlerrm,
timestamp: event.timestamp,
},
fingerprint: ['cdc-trigger-exception'],
});
});

await startCdcListener();
// Belt-and-suspenders: the onlisten hook should have already flipped this
// to true; re-assert in case a future cdc-listener change skips dispatch.
Expand Down
125 changes: 124 additions & 1 deletion shared/database/src/cdc-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@
* worker's /healthcheck within ~one probe-cycle + echo-timeout. Postgres-js's
* `onlisten` callback (third arg of `listen()`) also dispatches connected=true
* on the initial subscribe and on every auto-reconnect.
*
* Oversized + error visibility (BS#1120): migration 0094 routes payloads that
* exceed Postgres's 8000-byte `pg_notify` cap to a `cdc_oversized` channel and
* unexpected trigger exceptions to a `cdc_error` channel. Both have distinct
* payload shapes from the main `cdc` channel — see `CdcOversizedEvent` and
* `CdcErrorEvent` — and consumers register via `onCdcOversizedEvent` /
* `onCdcErrorEvent`. Wiring a Sentry sink in the process entry point (the
* dispatcher in the backend, `worker.ts` in the enrichment worker) gives the
* BS#1120 AC #3 metric the alert hook can drive off.
*/

import postgres from 'postgres';
Expand All @@ -22,15 +31,58 @@ export interface CdcEvent {
timestamp: number;
}

/**
* Payload shape emitted by `pg_notify('cdc_oversized', ...)` in migration 0094
* when the would-be `cdc` payload exceeds the 7800-byte safety threshold.
*
* The originating mutation still committed — only the live notification was
* dropped. Consumers that need the row's new state must refetch it (by
* `primary_key` when present, otherwise by a source-of-truth scan).
*/
export interface CdcOversizedEvent {
/** Presumably the table whose mutation produced the notification — confirm against migration 0094. */
table: string;
/** Presumably the Postgres schema containing `table` — confirm against migration 0094. */
schema: string;
/** Kind of row mutation the notification relates to. */
action: 'INSERT' | 'UPDATE' | 'DELETE';
/** `data->>'id'` from the row, when the table has an `id` column. Null otherwise. */
primary_key: string | null;
/** `octet_length(payload::text)` of the would-be `cdc` payload, in bytes. */
payload_bytes: number;
/** Numeric timestamp set by the trigger; units are not visible here — confirm against migration 0094. */
timestamp: number;
/** Constant literal; the only value this payload shape allows. */
reason: 'payload_too_large';
}

/**
* Payload shape emitted by `pg_notify('cdc_error', ...)` in migration 0094 when
* the trigger body raised an unexpected exception. Paired with a `RAISE
* WARNING` so PG logs still record the failure for forensics.
*/
export interface CdcErrorEvent {
/** Presumably the table whose mutation triggered the failing trigger — confirm against migration 0094. */
table: string;
/** Presumably the Postgres schema containing `table` — confirm against migration 0094. */
schema: string;
/** Kind of row mutation the notification relates to. */
action: 'INSERT' | 'UPDATE' | 'DELETE';
/** SQLSTATE of the underlying PL/pgSQL exception. */
sqlstate: string;
/** SQLERRM of the underlying PL/pgSQL exception. */
sqlerrm: string;
/** Numeric timestamp set by the trigger; units are not visible here — confirm against migration 0094. */
timestamp: number;
/** Constant literal; the only value this payload shape allows. */
reason: 'trigger_exception';
}

export type CdcEventCallback = (event: CdcEvent) => void;
export type CdcConnectionStateCallback = (connected: boolean) => void;
export type CdcOversizedEventCallback = (event: CdcOversizedEvent) => void;
export type CdcErrorEventCallback = (event: CdcErrorEvent) => void;

const CDC_CHANNEL = 'cdc';
const HEALTH_CHANNEL = 'cdc_health';
const CDC_OVERSIZED_CHANNEL = 'cdc_oversized';
const CDC_ERROR_CHANNEL = 'cdc_error';

let listenConnection: ReturnType<typeof postgres> | null = null;
let callbacks: CdcEventCallback[] = [];
let stateCallbacks: CdcConnectionStateCallback[] = [];
let oversizedCallbacks: CdcOversizedEventCallback[] = [];
let errorCallbacks: CdcErrorEventCallback[] = [];

let livenessTimer: ReturnType<typeof setInterval> | null = null;
let outstandingProbeToken: string | null = null;
Expand All @@ -45,6 +97,40 @@ export function onCdcEvent(callback: CdcEventCallback): void {
callbacks.push(callback);
}

/**
* Registers a callback to receive `cdc_oversized` events (BS#1120).
*
* Fired when a row's would-be `cdc` payload would exceed Postgres's 8000-byte
* `pg_notify` cap (migration 0094 cuts over at 7800 bytes to leave wire
* headroom). The originating mutation already committed; the live notification
* was dropped. Sinks are typically:
*
* - Sentry signal so an alert can fire on AC #3 (see `dispatcher.ts`,
* `worker.ts` for the wiring).
* - A refetch path keyed off `primary_key` (when non-null) for downstream
* consumers (SSE, enrichment, reconciliation) that need the row state.
*
* Multiple callbacks can be registered; all are invoked for each event.
* An exception thrown by one callback is caught and logged by the listener,
* so it does not prevent the remaining callbacks from running.
* Registrations are dropped when `stopCdcListener()` clears its callback
* arrays, so callers must register again after a stop.
*
* @param callback Invoked with each parsed `cdc_oversized` payload.
*/
export function onCdcOversizedEvent(callback: CdcOversizedEventCallback): void {
// Appends without de-duplication: registering the same function twice means
// it is invoked twice per event.
oversizedCallbacks.push(callback);
}

/**
* Registers a callback to receive `cdc_error` events (BS#1120).
*
* Fired when the `cdc_notify()` trigger body raised an unexpected exception
* (anything other than the oversized branch, which has its own channel). The
* trigger also emits `RAISE WARNING` so PG logs still record it for forensics;
* this callback is the listener-side visibility path so the failure isn't
* confined to PG logs the application servers don't tail.
*
* Multiple callbacks can be registered; all are invoked for each event.
* An exception thrown by one callback is caught and logged by the listener,
* so it does not prevent the remaining callbacks from running.
* Registrations are dropped when `stopCdcListener()` clears its callback
* arrays, so callers must register again after a stop.
*
* @param callback Invoked with each parsed `cdc_error` payload.
*/
export function onCdcErrorEvent(callback: CdcErrorEventCallback): void {
// Appends without de-duplication: registering the same function twice means
// it is invoked twice per event.
errorCallbacks.push(callback);
}

/**
* Registers a callback fired on CDC connection-state transitions.
*
Expand Down Expand Up @@ -118,7 +204,42 @@ export async function startCdcListener(): Promise<void> {
}
);

console.log('[cdc-listener] Listening on channel:', CDC_CHANNEL);
// BS#1120: fallback channels carry oversized/error notifications when the
// primary `cdc` payload would have been dropped. Subscribed alongside `cdc`
// so a single LISTEN connection covers all three. State callback intentionally
// omitted — the `cdc` re-LISTEN above already covers reconnect signaling,
// and these channels reuse the same socket.
await listenConnection.listen(CDC_OVERSIZED_CHANNEL, (payload: string) => {
try {
const event = JSON.parse(payload) as CdcOversizedEvent;
for (const cb of oversizedCallbacks) {
try {
cb(event);
} catch (err) {
console.error('[cdc-listener] Oversized callback error:', err);
}
}
} catch (err) {
console.error('[cdc-listener] Failed to parse cdc_oversized payload:', err);
}
});

await listenConnection.listen(CDC_ERROR_CHANNEL, (payload: string) => {
try {
const event = JSON.parse(payload) as CdcErrorEvent;
for (const cb of errorCallbacks) {
try {
cb(event);
} catch (err) {
console.error('[cdc-listener] Error callback error:', err);
}
}
} catch (err) {
console.error('[cdc-listener] Failed to parse cdc_error payload:', err);
}
});

console.log('[cdc-listener] Listening on channels:', CDC_CHANNEL, CDC_OVERSIZED_CHANNEL, CDC_ERROR_CHANNEL);
}

export interface LivenessProbeOptions {
Expand Down Expand Up @@ -227,6 +348,8 @@ export async function stopCdcListener(): Promise<void> {
listenConnection = null;
callbacks = [];
stateCallbacks = [];
oversizedCallbacks = [];
errorCallbacks = [];
console.log('[cdc-listener] Stopped');
}
}
Loading
Loading