Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 77 additions & 33 deletions src/common/notification-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,39 +218,7 @@ export class NotificationClient {
parsedData = event.data as string
}

const data = JSON.parse(parsedData)

if (data.type === "validation") {
this.logger.error(`Bad request: ${data.summary} - ${data.message} - ${data.found} - ${data.errors}`)
return
}

const parsed = JSON.parse(data.message) as {
pattern: string
data: NotificationEventData
}

this.logger.debug(`Received event: ${parsed.pattern}`)

this.observer.next({
pattern: parsed.pattern,
data: {
tenant: parsed.data.tenantId,
eventId: parsed.data.eventId,
dataCoreId: parsed.data.dataCore,
flowType: parsed.data.aggregator,
eventType: parsed.data.eventType,
validTime: parsed.data.validTime,
},
})

this.eventCount++

if (this.options.maxEvents && this.options.maxEvents <= this.eventCount) {
this.observer.complete()
this.eventCount = 0
this.webSocket.close(1000, "Max events received")
}
this.handleMessage(parsedData)
}

this.webSocket.onclose = (event) => {
Expand All @@ -271,6 +239,82 @@ export class NotificationClient {
}
}

/**
* Parses and dispatches a single raw notification frame.
*
* Runs inside the WebSocket `onmessage` callback, where any thrown error is
* uncaught and crashes the host process. Notification frames can occasionally
* be malformed (non-JSON, empty/absent `message`, truncated payloads, or
* non-event control frames), so every parse is guarded — a bad frame is logged
* and skipped instead of taking down the whole connection/process.
*/
private handleMessage(rawData: string): void {
let data: {
type?: string
message?: string
summary?: string
found?: unknown
errors?: unknown
}
try {
data = JSON.parse(rawData)
} catch (error) {
this.logger.warn(
`Discarding notification frame: outer payload is not valid JSON (${
error instanceof Error ? error.message : String(error)
})`,
)
return
}

if (data.type === "validation") {
this.logger.error(`Bad request: ${data.summary} - ${data.message} - ${data.found} - ${data.errors}`)
return
}

let parsed: { pattern: string; data: NotificationEventData }
try {
parsed = JSON.parse(data.message as string) as {
pattern: string
data: NotificationEventData
}
} catch (error) {
this.logger.warn(
`Discarding notification frame: 'message' is not valid JSON (type ${typeof data.message}): ${
error instanceof Error ? error.message : String(error)
}`,
)
return
}

if (!parsed || typeof parsed !== "object" || !parsed.data) {
this.logger.warn("Discarding notification frame: parsed payload is missing a 'data' field")
return
}

this.logger.debug(`Received event: ${parsed.pattern}`)

this.observer.next({
pattern: parsed.pattern,
data: {
tenant: parsed.data.tenantId,
eventId: parsed.data.eventId,
dataCoreId: parsed.data.dataCore,
flowType: parsed.data.aggregator,
eventType: parsed.data.eventType,
validTime: parsed.data.validTime,
},
})

this.eventCount++

if (this.options.maxEvents && this.options.maxEvents <= this.eventCount) {
this.observer.complete()
this.eventCount = 0
this.webSocket.close(1000, "Max events received")
}
}

/**
* Attempts to reconnect to the WebSocket server using exponential backoff
*/
Expand Down
94 changes: 94 additions & 0 deletions test/tests/common/notification-client.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import { assertEquals } from "@test/compat/assert"
import { describe, it } from "bun:test"
import { Subject } from "rxjs"
import { NotificationClient, type NotificationEvent } from "../../../src/common/notification-client.ts"
import type { Logger } from "../../../src/utils/logger.ts"

function recordingLogger(): Logger & { calls: Record<keyof Logger, string[]> } {
const calls: Record<keyof Logger, string[]> = { debug: [], info: [], warn: [], error: [] }
return {
calls,
debug: (m) => calls.debug.push(m),
info: (m) => calls.info.push(m),
warn: (m) => calls.warn.push(m),
error: (m) => calls.error.push(String(m)),
}
}

function makeClient() {
const observer = new Subject<NotificationEvent>()
const events: NotificationEvent[] = []
observer.subscribe((e) => events.push(e))
const logger = recordingLogger()
const client = new NotificationClient(
observer,
{ apiKey: "test-key" },
{ tenant: "t", dataCore: "dc" },
{ logger },
)
// handleMessage is the internal frame parser the WebSocket onmessage delegates to.
const handle = (raw: string) => (client as unknown as { handleMessage(raw: string): void }).handleMessage(raw)
return { handle, events, logger }
}

function validFrame(pattern = "event.stored.x"): string {
const inner = JSON.stringify({
pattern,
data: {
tenantId: "t",
eventId: "evt-1",
dataCore: "dc",
aggregator: "fishfacts-ais.0",
eventType: "ais.position.fix.observed.0",
validTime: "2026-01-01T00:00:00Z",
},
})
return JSON.stringify({ message: inner })
}

describe("NotificationClient.handleMessage", () => {
it("emits a NotificationEvent for a well-formed frame", () => {
const { handle, events } = makeClient()
handle(validFrame())
assertEquals(events.length, 1)
assertEquals(events[0].pattern, "event.stored.x")
assertEquals(events[0].data.eventId, "evt-1")
assertEquals(events[0].data.flowType, "fishfacts-ais.0")
})

// Regression: a single malformed frame must NOT throw. In production handleMessage
// runs inside the WebSocket onmessage callback, so an uncaught throw here crashes
// the whole process (observed repeatedly in prod via an unguarded JSON.parse).
it("does not throw or emit when data.message is not valid JSON", () => {
const { handle, events, logger } = makeClient()
handle(JSON.stringify({ message: "this-is-not-json{" }))
assertEquals(events.length, 0)
assertEquals(logger.calls.warn.length >= 1, true)
})

it("does not throw when message is empty or missing", () => {
const { handle, events } = makeClient()
handle(JSON.stringify({ message: "" }))
handle(JSON.stringify({ foo: "bar" }))
assertEquals(events.length, 0)
})

it("does not throw when the inner payload lacks a data field", () => {
const { handle, events } = makeClient()
handle(JSON.stringify({ message: JSON.stringify({ pattern: "p" }) }))
assertEquals(events.length, 0)
})

it("does not throw on a non-JSON outer frame", () => {
const { handle, events } = makeClient()
handle("not even json")
assertEquals(events.length, 0)
})

it("handles validation frames without emitting", () => {
const { handle, events, logger } = makeClient()
handle(JSON.stringify({ type: "validation", summary: "bad", message: "nope", found: "x", errors: [] }))
assertEquals(events.length, 0)
assertEquals(logger.calls.error.length >= 1, true)
})
})
Loading