From 07edd9ad76d137ca9719a40cf49fa9d91b9c3fc1 Mon Sep 17 00:00:00 2001 From: Juha Mustonen Date: Tue, 29 Jul 2025 15:35:24 +0300 Subject: [PATCH 01/14] Initial adapters for PGlite --- packages/digest-pglite/README.md | 11 + packages/digest-pglite/package.json | 19 + .../PGliteDigestPersistenceAdapter.test.ts | 20 + .../src/PGliteDigestPersistenceAdapter.ts | 430 ++++++++++++ packages/digest-pglite/src/PGliteDigestRow.ts | 11 + packages/digest-pglite/src/index.ts | 2 + .../src/migrations/001-initial.sql | 21 + packages/digest-pglite/tsconfig.json | 9 + packages/journal-pglite/README.md | 11 + packages/journal-pglite/package.json | 19 + .../journal-pglite/src/PGliteFullStateRow.ts | 19 + .../src/PGliteJournalPersistenceAdapter.ts | 618 ++++++++++++++++++ .../journal-pglite/src/PGliteJournalRow.ts | 15 + packages/journal-pglite/src/index.ts | 3 + .../src/migrations/001-initial.sql | 67 ++ 15 files changed, 1275 insertions(+) create mode 100644 packages/digest-pglite/README.md create mode 100644 packages/digest-pglite/package.json create mode 100644 packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts create mode 100644 packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts create mode 100644 packages/digest-pglite/src/PGliteDigestRow.ts create mode 100644 packages/digest-pglite/src/index.ts create mode 100644 packages/digest-pglite/src/migrations/001-initial.sql create mode 100644 packages/digest-pglite/tsconfig.json create mode 100644 packages/journal-pglite/README.md create mode 100644 packages/journal-pglite/package.json create mode 100644 packages/journal-pglite/src/PGliteFullStateRow.ts create mode 100644 packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts create mode 100644 packages/journal-pglite/src/PGliteJournalRow.ts create mode 100644 packages/journal-pglite/src/index.ts create mode 100644 packages/journal-pglite/src/migrations/001-initial.sql diff --git a/packages/digest-pglite/README.md b/packages/digest-pglite/README.md new file mode 100644 index 00000000..0b6640c2 --- /dev/null +++ b/packages/digest-pglite/README.md @@ -0,0 +1,11 @@ +# `digest-pglite` + +> TODO: description + +## Usage + +``` +const digestPglite = require('digest-pglite'); + +// TODO: DEMONSTRATE API +``` diff --git a/packages/digest-pglite/package.json b/packages/digest-pglite/package.json new file mode 100644 index 00000000..a3f032d5 --- /dev/null +++ b/packages/digest-pglite/package.json @@ -0,0 +1,19 @@ +{ + "name": "digest-pglite", + "version": "0.0.1", + "description": "Digest package with PGlite backend", + "author": "Juha Mustonen ", + "homepage": "", + "license": "MIT", + "main": "lib/digest-pglite.js", + "directories": { + "lib": "lib", + "test": "__tests__" + }, + "files": [ + "lib" + ], + "scripts": { + "test": "node ./__tests__/digest-pglite.test.js" + } +} diff --git a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts new file mode 100644 index 00000000..c7917457 --- /dev/null +++ b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts @@ -0,0 +1,20 @@ +import { PGliteDigestPersistenceAdapter } from './PGliteDigestPersistenceAdapter'; + +describe('PGliteDigestPersistenceAdapter', () => { + it('should be defined', () => { + expect(PGliteDigestPersistenceAdapter).toBeDefined(); + }); + + it('should be able to connect to a database', async () => { + const adapter = await PGliteDigestPersistenceAdapter.connect(); + expect(adapter).toBeDefined(); + }); + + it('should run migrations', async () => { + const adapter = await PGliteDigestPersistenceAdapter.connect(); + expect(adapter).toBeDefined(); + + const result = await adapter.queryDigests(); + expect(result).toBeDefined(); + }); +}); diff --git a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts new file mode 100644 index 00000000..06059d14 --- /dev/null +++ b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts @@ -0,0 +1,430 @@ +import path from 'path'; +import { ChartReference } from '@samihult/xjog-util'; +import { PGlite, PGliteOptions } from '@electric-sql/pglite'; +import migrationRunner from 'node-pg-migrate'; +import createSubscriber from 'pg-listen'; + +import { + DigestPersistenceAdapter, + DigestEntry, + DigestEntries, + DigestQuery, + Expression, + ChartReferenceWithTimestamp, +} from '@samihult/xjog-digest-persistence'; + +import { PGliteDigestRow } from './PGliteDigestRow'; + +/** + * Use the static method `connect` to instantiate. + * @hideconstructor + */ +export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { + public readonly component = 'digest/persistence'; + public readonly type = 'pg'; + + private readonly stopObservingNewDigestEntries: Promise<() => Promise>; + + public constructor( + private readonly listenerConfig: PGliteOptions, + private readonly pool: PGlite, + ) { + super(); + + this.stopObservingNewDigestEntries = this.startObservingNewDigestEntries(); + } + + /** + * Create a connection to a [PostgreSql](https://www.postgresql.org/) database + * and resolve to a JournalPersistenceAdapter that can be passed to the XJog + * constructor. + */ + static async connect( + poolConfiguration: PGliteOptions = {}, + ): Promise { + const pool = await PGlite.create(poolConfiguration); + const adapter = new PGliteDigestPersistenceAdapter(poolConfiguration, pool); + + try { + await migrationRunner({ + dbClient: pool as any, + migrationsTable: 'migrations_digest', + dir: path.join(__dirname, './migrations'), + direction: 'up', + log: (message) => adapter.trace({ in: 'connect', message }), + // https://github.com/salsita/node-pg-migrate/issues/821 + checkOrder: false, + noLock: true, + }); + } finally { + await pool.close(); + } + + return adapter; + } + + public async disconnect(): Promise { + await ( + await this.stopObservingNewDigestEntries + )?.(); + await this.pool.close(); + } + + protected async upsertDigest( + ref: ChartReference, + key: string, + value: string, + ): Promise { + const result = await this.pool.query( + 'INSERT INTO "digests" ' + + '( ' + + ' "machineId", "chartId", "key", "value" ' + + ') VALUES ( ' + + ' $1, $2, $3, $4' + + ') ON CONFLICT ( ' + + ' "machineId", "chartId", "key"' + + ') DO UPDATE SET ' + + ' value = :value, timestamp = transaction_timestamp() ', + [ref.machineId, ref.chartId, key, value], + ); + + return result.affectedRows ?? 0; + } + + protected async emitDigestEntryNotification( + ref: ChartReference, + ): Promise { + const payload = JSON.stringify(ref); + + await this.pool.query("SELECT pg_notify('new-digest-entry', $1::text)", [ + payload, + ]); + } + + public async deleteDigest(ref: ChartReference, key: string): Promise { + const result = await this.pool.query( + 'DELETE FROM "digests" ' + + 'WHERE "machineId" = $1 AND "chartId" = $2 AND "key" = $3 ', + [ref.machineId, ref.chartId, key], + ); + + return result.affectedRows ?? 0; + } + + public async deleteByChart(ref: ChartReference): Promise { + const result = await this.pool.query( + 'DELETE FROM "digests" ' + 'WHERE "machineId" = $1 AND "chartId" = $2 ', + [ref.machineId, ref.chartId], + ); + + return result.affectedRows ?? 0; + } + + /** Corresponds to {@link PostgresDigestRow} */ + private readonly digestEntrySqlSelectFields = + 'extract(epoch from "created") * 1000 AS "created", ' + + 'extract(epoch from "timestamp") * 1000 AS "timestamp", ' + + '"machineId", "chartId", "key, "value" '; + + public async readDigest( + ref: ChartReference, + key: string, + ): Promise { + const result = await this.pool.query( + 'SELECT ' + + this.digestEntrySqlSelectFields + + 'FROM "digests" ' + + 'WHERE "machineId" = $1 AND "chartId" = $2 AND key = $3 ', + [ref.machineId, ref.chartId, key], + ); + + if (!result.rows.length) { + return null; + } + + return PGliteDigestPersistenceAdapter.parseSqlDigestRow(result.rows[0]); + } + + public async readByChart(ref: ChartReference): Promise { + const result = await this.pool.query( + 'SELECT ' + + this.digestEntrySqlSelectFields + + 'FROM "digests" ' + + 'WHERE "machineId" = $1 AND "chartId" = $2 ', + [ref.machineId, ref.chartId], + ); + + const digestEntries: DigestEntries = {}; + + for (const row of result.rows) { + digestEntries[row.machineId] = + PGliteDigestPersistenceAdapter.parseSqlDigestRow(row); + } + + return digestEntries; + } + + public async queryDigests( + digestQuery?: DigestQuery, + ): Promise { + const [filterQuery, filterBindings] = + PGliteDigestPersistenceAdapter.filterQuery(digestQuery?.query); + + const result = await this.pool.query( + 'SELECT DISTINCT "machineId", "chartId", ' + + ' MAX(extract(epoch from "timestamp") * 1000) as "timestamp" ' + + 'FROM "digests" WHERE TRUE ' + + (digestQuery?.machineId !== undefined + ? ' AND "machineId" = $1 ' + : '') + + (digestQuery?.chartId !== undefined ? ' AND "chartId" = $2 ' : '') + + (filterQuery ? `AND (${filterQuery}) ` : '') + + 'GROUP BY "machineId", "chartId" ' + + 'ORDER BY "timestamp" ' + + (digestQuery?.order ?? 'ASC') + + (digestQuery?.offset !== undefined ? ' OFFSET $3 ' : '') + + (digestQuery?.limit !== undefined ? ' LIMIT $4 ' : ''), + [ + digestQuery?.machineId, + digestQuery?.chartId, + digestQuery?.offset, + digestQuery?.limit, + // ...filterBindings, + // TODO --^ + ], + ); + + return result.rows; + } + + static filterQuery( + expression?: Expression, + prefix = 'q', + ): [string, { [key: string]: string | number }] { + if (!expression) { + return ['', {}]; + } + + let queryString = ''; + const bindings: Record = {}; + + const createBindingKey = ( + op: 'eq' | 're' | 'lt' | 'lte' | 'gt' | 'gte', + key: string, + ) => `${prefix}_${op}_${key}`; + + const keyMatchSql = (key: string, bindingKey: string): string => + key ? `AND "candidate"."key" = :key_${bindingKey} ` : ''; + + const addBindings = ( + bindingKey: string, + key: string, + pattern: string | number, + ) => { + bindings[`key_${bindingKey}`] = key; + bindings[`value_${bindingKey}`] = pattern; + }; + + const matchingSql = (conditionSql: string) => + 'EXISTS ' + + '(SELECT 1 FROM "digests" AS "candidate" ' + + 'WHERE "candidate"."machineId" = "digests"."machineId" ' + + 'AND "candidate"."chartId" = "digests"."chartId" ' + + conditionSql + + ')'; + + switch (expression.op) { + case 'eq': { + const bindingKey = createBindingKey('eq', expression.left); + addBindings(bindingKey, expression.left, expression.right); + queryString += matchingSql( + keyMatchSql(expression.left, bindingKey) + + `AND "candidate"."value" = :value_${bindingKey} `, + ); + break; + } + + case 'matches': { + const bindingKey = createBindingKey('re', expression.left); + addBindings(bindingKey, expression.left, expression.right); + queryString += matchingSql( + keyMatchSql(expression.left, bindingKey) + + `AND "candidate"."value" ~ :value_${bindingKey} `, + ); + break; + } + + // Numeric inequality + + case '<': { + const bindingKey = createBindingKey('lt', expression.left); + addBindings(bindingKey, expression.left, expression.right); + queryString += matchingSql( + keyMatchSql(expression.left, bindingKey) + + `AND "candidate"."value"::decimal < :value_${bindingKey}::decimal `, + ); + break; + } + + case '>': { + const bindingKey = createBindingKey('gt', expression.left); + addBindings(bindingKey, expression.left, expression.right); + queryString += matchingSql( + keyMatchSql(expression.left, bindingKey) + + `AND "candidate"."value"::decimal > :value_${bindingKey}::decimal `, + ); + break; + } + + case '<=': { + const bindingKey = createBindingKey('lte', expression.left); + addBindings(bindingKey, expression.left, expression.right); + queryString += matchingSql( + keyMatchSql(expression.left, bindingKey) + + `AND "candidate"."value"::decimal <= :value_${bindingKey}::decimal `, + ); + break; + } + + case '>=': { + const bindingKey = createBindingKey('gte', expression.left); + addBindings(bindingKey, expression.left, expression.right); + queryString += matchingSql( + keyMatchSql(expression.left, bindingKey) + + `AND "candidate"."value"::decimal >= :value_${bindingKey}::decimal `, + ); + break; + } + + // Timestamps + + case 'created before': { + const bindingKey = `${prefix}_crbef`; + bindings[`value_${bindingKey}`] = expression.dateTime.valueOf(); + queryString += + 'NOT ' + + matchingSql( + `AND "candidate"."created" > to_timestamp(:value_${bindingKey}::decimal / 1000) `, + ); + break; + } + + case 'updated before': { + const bindingKey = `${prefix}_udbef`; + bindings[`value_${bindingKey}`] = expression.dateTime.valueOf(); + queryString += + 'NOT ' + + matchingSql( + `AND "candidate"."timestamp" > to_timestamp(:value_${bindingKey}::decimal / 1000) `, + ); + break; + } + + case 'created after': { + const bindingKey = `${prefix}_craft`; + bindings[`value_${bindingKey}`] = expression.dateTime.valueOf(); + queryString += + 'NOT ' + + matchingSql( + `AND "candidate"."created" < to_timestamp(:value_${bindingKey}::decimal / 1000) `, + ); + break; + } + + case 'updated after': { + const bindingKey = `${prefix}_crbef`; + bindings[`value_${bindingKey}`] = expression.dateTime.valueOf(); + queryString += + 'NOT ' + + matchingSql( + `AND "candidate"."timestamp" < to_timestamp(:value_${bindingKey}::decimal / 1000) `, + ); + break; + } + + // Combinators + + case 'not': { + const [subQueryString, subQueryBindings] = + PGliteDigestPersistenceAdapter.filterQuery( + expression.operand, + prefix + '_not', + ); + queryString += `NOT (${subQueryString}) `; + Object.assign(bindings, subQueryBindings); + break; + } + + case 'and': { + const [leftQueryString, leftQueryBindings] = + PGliteDigestPersistenceAdapter.filterQuery( + expression.left, + prefix + '_and_lt', + ); + const [rightQueryString, rightQueryBindings] = + PGliteDigestPersistenceAdapter.filterQuery( + expression.right, + prefix + '_and_rt', + ); + queryString += `${leftQueryString} AND ${rightQueryString} `; + Object.assign(bindings, leftQueryBindings); + Object.assign(bindings, rightQueryBindings); + break; + } + + case 'or': { + const [leftQueryString, leftQueryBindings] = + PGliteDigestPersistenceAdapter.filterQuery( + expression.left, + prefix + '_or_lt', + ); + const [rightQueryString, rightQueryBindings] = + PGliteDigestPersistenceAdapter.filterQuery( + expression.right, + prefix + '_or_rt', + ); + queryString += `${leftQueryString} OR ${rightQueryString} `; + Object.assign(bindings, leftQueryBindings); + Object.assign(bindings, rightQueryBindings); + break; + } + } + + return [`${queryString}`, bindings]; + } + + private async startObservingNewDigestEntries(): Promise<() => Promise> { + const channel = 'new-digest-entry'; + const digestSubscriber = createSubscriber(this.listenerConfig); + + // Received a notification of a new journal entry + digestSubscriber.notifications.on(channel, async (ref: ChartReference) => { + this.newDigestEntriesSubject.next(ref); + }); + + digestSubscriber.events.on('error', (error) => { + this.newDigestEntriesSubject.error(error); + }); + + digestSubscriber.connect().then(() => digestSubscriber.listenTo(channel)); + + return async () => { + await digestSubscriber.close(); + }; + } + + static parseSqlDigestRow(row: PGliteDigestRow): DigestEntry { + return { + created: Number(row.created), + timestamp: Number(row.timestamp), + + ref: { + machineId: row.machineId, + chartId: row.chartId, + }, + + key: row.key, + value: row.value, + }; + } +} diff --git a/packages/digest-pglite/src/PGliteDigestRow.ts b/packages/digest-pglite/src/PGliteDigestRow.ts new file mode 100644 index 00000000..b734bcdd --- /dev/null +++ b/packages/digest-pglite/src/PGliteDigestRow.ts @@ -0,0 +1,11 @@ +/** + * Digest record row directly from the SQL query + */ +export type PGliteDigestRow = { + created: number; + timestamp: number; + machineId: string; + chartId: string; + key: string; + value: string; +}; diff --git a/packages/digest-pglite/src/index.ts b/packages/digest-pglite/src/index.ts new file mode 100644 index 00000000..71fd73c6 --- /dev/null +++ b/packages/digest-pglite/src/index.ts @@ -0,0 +1,2 @@ +export * from './PGliteDigestRow'; +export * from './PGliteDigestPersistenceAdapter'; diff --git a/packages/digest-pglite/src/migrations/001-initial.sql b/packages/digest-pglite/src/migrations/001-initial.sql new file mode 100644 index 00000000..041ea0b2 --- /dev/null +++ b/packages/digest-pglite/src/migrations/001-initial.sql @@ -0,0 +1,21 @@ +-------------------------------------------------------------------------------- +-- Up migration +-------------------------------------------------------------------------------- + +CREATE TABLE "digests" ( + "created" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT transaction_timestamp(), + "timestamp" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT transaction_timestamp(), + + "machineId" TEXT NOT NULL, + "chartId" TEXT NOT NULL, + "key" TEXT NOT NULL, + "value" TEXT NOT NULL, + + PRIMARY KEY ("machineId", "chartId", "key") +); + +-------------------------------------------------------------------------------- +-- Down migration +-------------------------------------------------------------------------------- + +DROP TABLE "digests"; diff --git a/packages/digest-pglite/tsconfig.json b/packages/digest-pglite/tsconfig.json new file mode 100644 index 00000000..e142a2d6 --- /dev/null +++ b/packages/digest-pglite/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "rootDir": "./src", + "outDir": "./lib" + }, + "include": ["src"], + "exclude": ["node_modules", "lib", "src/**/__tests__", "src/**/*.test.*"] +} diff --git a/packages/journal-pglite/README.md b/packages/journal-pglite/README.md new file mode 100644 index 00000000..03b6e8f5 --- /dev/null +++ b/packages/journal-pglite/README.md @@ -0,0 +1,11 @@ +# `journal-pglite` + +> TODO: description + +## Usage + +``` +const journalPglite = require('journal-pglite'); + +// TODO: DEMONSTRATE API +``` diff --git a/packages/journal-pglite/package.json b/packages/journal-pglite/package.json new file mode 100644 index 00000000..ad04886b --- /dev/null +++ b/packages/journal-pglite/package.json @@ -0,0 +1,19 @@ +{ + "name": "journal-pglite", + "version": "0.0.1", + "description": "> TODO: description", + "author": "Juha Mustonen ", + "homepage": "", + "license": "MIT", + "main": "lib/journal-pglite.js", + "directories": { + "lib": "lib", + "test": "__tests__" + }, + "files": [ + "lib" + ], + "scripts": { + "test": "node ./__tests__/journal-pglite.test.js" + } +} diff --git a/packages/journal-pglite/src/PGliteFullStateRow.ts b/packages/journal-pglite/src/PGliteFullStateRow.ts new file mode 100644 index 00000000..c15b4c8e --- /dev/null +++ b/packages/journal-pglite/src/PGliteFullStateRow.ts @@ -0,0 +1,19 @@ +/** + * Full state entry row directly from the SQL query + */ +export type PGliteFullStateRow = { + id: number; + created: number; + timestamp: number; + + ownerId: string; + machineId: string; + chartId: string; + parentMachineId: string; + parentChartId: string; + + event: Buffer | null; + state: Buffer | null; + context: Buffer | null; + actions: Buffer | null; +}; diff --git a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts new file mode 100644 index 00000000..90a7ffd0 --- /dev/null +++ b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts @@ -0,0 +1,618 @@ +import path from 'path'; +import { ChartReference } from '@samihult/xjog-util'; +import { Client, PoolConfig } from 'pg'; +import migrationRunner from 'node-pg-migrate'; +import createSubscriber from 'pg-listen'; +import bind from 'pg-bind'; + +import { + FullStateEntry, + FullStateQuery, + JournalEntry, + JournalEntryAutoFields, + JournalEntryInsertFields, + JournalPersistenceAdapter, + JournalQuery, +} from '@samihult/xjog-journal-persistence'; + +import { PGliteJournalRow } from './PGliteJournalRow'; +import { PGliteFullStateRow } from './PGliteFullStateRow'; + +/** + * Options for instantiating {@link PGliteJournalPersistenceAdapter}. + */ +export type PGliteJournalPersistenceAdapterOptions = { + keyFrameInterval?: number; +}; + +/** + * Use the static method `connect` to instantiate. + * @hideconstructor + */ +export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { + public readonly component = 'journal/persistence'; + public readonly type = 'pg'; + + private readonly stopObservingNewJournalEntries: Promise<() => Promise>; + + public constructor( + private readonly listenerConfig: PoolConfig, + private readonly subscriptionConnection: Client, + private readonly readConnection: Client, + private readonly writeConnection: Client, + private readonly updateConnection: Client, + private options: PGliteJournalPersistenceAdapterOptions, + ) { + super(); + + subscriptionConnection.on('error', (err) => + this.error('Subscription connection emitted error', { err }), + ); + readConnection.on('error', (err) => + this.error('Read connection emitted error', { err }), + ); + writeConnection.on('error', (err) => + this.error('Write connection emitted error', { err }), + ); + updateConnection.on('error', (err) => + this.error('Update connection emitted error', { err }), + ); + + this.stopObservingNewJournalEntries = + this.startObservingNewJournalEntries(); + } + + /** + * Create a connection to a [PostgreSql](https://www.postgresql.org/) database + * and resolve to a JournalPersistenceAdapter that can be passed to the XJog + * constructor. + */ + static async connect( + poolConfiguration: PoolConfig, + // TODO resolve + options: Partial = {}, + ): Promise { + const subscriptionConnection = new Client(poolConfiguration); + const readConnection = new Client(poolConfiguration); + const writeConnection = new Client(poolConfiguration); + const updateConnection = new Client(poolConfiguration); + + const adapter = new PGliteJournalPersistenceAdapter( + poolConfiguration, + subscriptionConnection, + readConnection, + writeConnection, + updateConnection, + options, + ); + + await subscriptionConnection.connect(); + await readConnection.connect(); + await writeConnection.connect(); + await updateConnection.connect(); + + // TODO resolve separately + options.keyFrameInterval ??= 100; + + const migrationClient = new Client(poolConfiguration); + try { + await migrationClient.connect(); + await migrationRunner({ + dbClient: migrationClient, + migrationsTable: 'migrations_journal', + dir: path.join(__dirname, './migrations'), + direction: 'up', + log: (message) => adapter.trace({ in: 'connect', message }), + // https://github.com/salsita/node-pg-migrate/issues/821 + checkOrder: false, + noLock: true, + }); + } finally { + if (migrationClient) { + await migrationClient.end(); + } + } + + return adapter; + } + + public async disconnect(): Promise { + await ( + await this.stopObservingNewJournalEntries + )(); + + await this.subscriptionConnection.end(); + await this.updateConnection.end(); + await this.writeConnection.end(); + await this.readConnection.end(); + } + + protected async insertEntry( + entry: JournalEntryInsertFields, + ): Promise { + const query = bind( + 'INSERT INTO "journalEntries" ' + + '(' + + ' "machineId", "chartId", "event", ' + + ' "state", "context", "stateDelta", "contextDelta", ' + + ' "actions" ' + + ') ' + + 'VALUES (' + + ' :machineId, :chartId, :event, NULL, NULL, ' + + ' :stateDelta, :contextDelta, :actions ' + + ') ' + + 'RETURNING ' + + ' "id", extract(epoch from "timestamp") * 1000 as "timestamp" ', + { + machineId: entry.ref.machineId, + chartId: entry.ref.chartId, + event: entry.event ? Buffer.from(JSON.stringify(entry.event)) : null, + stateDelta: Buffer.from(JSON.stringify(entry.stateDelta)), + contextDelta: Buffer.from(JSON.stringify(entry.contextDelta)), + actions: entry.actions + ? Buffer.from(JSON.stringify(entry.actions)) + : null, + }, + ); + + const result = await this.writeConnection.query(query); + + if (!result.rowCount) { + throw new Error('Failed to write journal entry'); + } + + return { + id: result.rows[0].id, + timestamp: Number(result.rows[0].timestamp), + }; + } + + protected async updateFullState(entry: FullStateEntry): Promise { + const query = bind( + 'INSERT INTO "fullJournalStates" ' + + '( ' + + ' "id", "created", "timestamp", ' + + ' "ownerId", "machineId", "chartId", ' + + ' "parentMachineId", "parentChartId", ' + + ' "event", "state", "context", ' + + ' "actions" ' + + ') ' + + 'VALUES (' + + ' :id, to_timestamp(:timestamp::decimal / 1000), ' + + ' to_timestamp(:timestamp::decimal / 1000), ' + + ' :ownerId, :machineId, :chartId, ' + + ' :parentMachineId, :parentChartId, ' + + ' :event, :state, :context,' + + ' :actions ' + + ') ON CONFLICT (' + + ' "machineId", "chartId" ' + + ') DO UPDATE SET ' + + ' "id" = :id, "timestamp" = to_timestamp(:timestamp::decimal / 1000), ' + + ' "event" = :event, "state" = :state, "context" = :context, ' + + ' "actions" = :actions ' + + 'WHERE "fullJournalStates"."id" < :id ', + { + id: entry.id, + timestamp: entry.timestamp, + ownerId: entry.ownerId, + machineId: entry.ref.machineId, + chartId: entry.ref.chartId, + parentMachineId: entry.parentRef?.machineId ?? null, + parentChartId: entry.parentRef?.chartId ?? null, + event: entry.event ? Buffer.from(JSON.stringify(entry.event)) : null, + state: entry.state ? Buffer.from(JSON.stringify(entry.state)) : null, + context: entry.context + ? Buffer.from(JSON.stringify(entry.context)) + : null, + actions: entry.actions + ? Buffer.from(JSON.stringify(entry.actions)) + : null, + }, + ); + + const result = await this.writeConnection.query(query); + + if (!result.rowCount) { + throw new Error('Failed to write journal full entry'); + } + + return result.rows[0]; + } + + protected async emitJournalEntryNotification( + id: number, + ref: ChartReference, + ): Promise { + const payload = JSON.stringify({ + id, + machineId: ref.machineId, + chartId: ref.chartId, + }); + + await this.updateConnection.query( + bind("SELECT pg_notify('new-journal-entry', :payload::text)", { + payload, + }), + ); + } + + /** These SQL fields correspond to {@link PostgresJournalRow} */ + private readonly journalEntrySqlSelectFields = + ' "id", extract(epoch from "timestamp") * 1000 as "timestamp", ' + + ' "machineId", "chartId", "event", ' + + ' "state", "stateDelta", "context", "contextDelta", ' + + ' "actions" '; + + public async readEntry(id: number): Promise { + const result = await this.readConnection.query( + bind( + 'SELECT ' + + this.journalEntrySqlSelectFields + + 'FROM "journalEntries" WHERE "id"=:id::bigint', + { id }, + ), + ); + + if (!result.rowCount) { + return null; + } + + return PGliteJournalPersistenceAdapter.parseSqlJournalRow(result.rows[0]); + } + + public async queryEntries(query: JournalQuery): Promise { + let result; + + if (Array.isArray(query)) { + if (!query.length) { + return []; + } + + result = await this.readConnection.query( + 'SELECT ' + + this.journalEntrySqlSelectFields + + 'FROM "journalEntries" ' + + 'JOIN (VALUES ' + + query + .map( + ({ machineId, chartId }) => + `(${this.readConnection.escapeLiteral(machineId)}, ` + + `${this.readConnection.escapeLiteral(chartId)})`, + ) + .join(', ') + + ') ' + + ' AS "queryValues" ("queryMachineId", "queryChartId") ' + + 'ON "machineId" = "queryMachineId" AND "chartId" = "queryChartId" ', + ); + } else { + result = await this.readConnection.query( + bind( + 'SELECT ' + + this.journalEntrySqlSelectFields + + 'FROM "journalEntries" ' + + 'WHERE TRUE ' + + (query.ref !== undefined + ? ' AND "machineId" = :machineId AND "chartId" = :chartId ' + : '') + + (query.afterId !== undefined + ? ' AND "id" > :afterId::bigint ' + : '') + + (query.afterAndIncludingId !== undefined + ? ' AND "id" >= :afterAndIncludingId::bigint ' + : '') + + (query.beforeId !== undefined + ? ' AND "id" < :beforeId::bigint ' + : '') + + (query.beforeAndIncludingId !== undefined + ? ' AND "id" <= :beforeAndIncludingId::bigint ' + : '') + + (query.updatedAfterAndIncluding !== undefined + ? ' AND "timestamp" >= to_timestamp(:updatedAfterAndIncluding::decimal / 1000) ' + : '') + + (query.updatedBeforeAndIncluding !== undefined + ? ' AND "timestamp" <= to_timestamp(:updatedBeforeAndIncluding::decimal / 1000) ' + : '') + + 'ORDER BY "id" ' + + (query.order ?? 'ASC') + + (query.offset !== undefined ? ' OFFSET :offset' : '') + + (query.limit !== undefined ? ' LIMIT :limit' : ''), + { + machineId: query.ref?.machineId, + chartId: query.ref?.chartId, + afterId: query.afterId, + afterAndIncludingId: query.afterAndIncludingId, + beforeId: query.beforeId, + beforeAndIncludingId: query.beforeAndIncludingId, + updatedAfterAndIncluding: query.updatedAfterAndIncluding, + updatedBeforeAndIncluding: query.updatedBeforeAndIncluding, + offset: query.offset, + limit: query.limit, + }, + ), + ); + } + + return result.rows.map(PGliteJournalPersistenceAdapter.parseSqlJournalRow); + } + + private async startObservingNewJournalEntries(): Promise< + () => Promise + > { + const startTime = await this.getCurrentTime(); + + let journalEntryIdPointer = 0; + let fullStateEntryIdPointer = 0; + + const channel = 'new-journal-entry'; + const journalSubscriber = createSubscriber(this.listenerConfig); + + const yieldJournalEntries = (journalEntries: JournalEntry[]) => { + for (const journalEntry of journalEntries) { + if (journalEntry.id < journalEntryIdPointer) { + return; + } + journalEntryIdPointer = journalEntry.id; + this.newJournalEntriesSubject.next(journalEntry); + } + }; + + const yieldFullStateEntries = (fullStateEntries: FullStateEntry[]) => { + for (const fullStateEntry of fullStateEntries) { + if (fullStateEntry.id < fullStateEntryIdPointer) { + return; + } + fullStateEntryIdPointer = fullStateEntry.id; + this.newFullStateEntriesSubject.next(fullStateEntry); + } + }; + + // Received a notification of a new journal entry + journalSubscriber.notifications.on(channel, async () => { + this.queryEntries({ + afterId: journalEntryIdPointer, + updatedAfterAndIncluding: startTime, + order: 'DESC', + }).then((journalEntries: JournalEntry[]) => { + if (journalEntries.length) { + yieldJournalEntries(journalEntries); + } + }); + + this.queryFullStates({ + afterId: fullStateEntryIdPointer, + updatedAfterAndIncluding: startTime, + order: 'DESC', + }).then((fullStateEntries: FullStateEntry[]) => { + if (fullStateEntries.length) { + yieldFullStateEntries(fullStateEntries); + } + }); + }); + + journalSubscriber.events.on('error', (error) => { + this.newJournalEntriesSubject.error(error); + this.newFullStateEntriesSubject.error(error); + }); + + journalSubscriber.connect().then(() => journalSubscriber.listenTo(channel)); + + return async () => { + await journalSubscriber.close(); + }; + } + + /** These SQL fields correspond to {@link PostgresFullStateRow} */ + private readonly fullStateEntrySqlSelectFields = + ' "id", extract(epoch from "created") * 1000 as "created", ' + + ' extract(epoch from "timestamp") * 1000 as "timestamp", ' + + ' "machineId", "chartId", "parentMachineId", "parentChartId", ' + + ' "event", "state", "context", "actions" '; + + public async readFullState( + ref: ChartReference, + ): Promise { + const result = await this.readConnection.query( + bind( + 'SELECT ' + + this.fullStateEntrySqlSelectFields + + 'FROM "fullJournalStates" ' + + 'WHERE "machineId" = :machineId AND "chartId" = :chartId ', + { + machineId: ref.machineId, + chartId: ref.chartId, + }, + ), + ); + + if (!result.rowCount) { + return null; + } + + return PGliteJournalPersistenceAdapter.parseSqlFullStateRow(result.rows[0]); + } + + public async queryFullStates( + query: FullStateQuery, + ): Promise { + let result; + + if (Array.isArray(query)) { + if (!query.length) { + return []; + } + + result = await this.readConnection.query( + 'SELECT ' + + this.fullStateEntrySqlSelectFields + + 'FROM "fullJournalStates" ' + + 'JOIN (VALUES ' + + query + .map( + ({ machineId, chartId }) => + `(${this.readConnection.escapeLiteral(machineId)}, ` + + `${this.readConnection.escapeLiteral(chartId)})`, + ) + .join(', ') + + ') ' + + ' AS "queryValues" ("queryMachineId", "queryChartId") ' + + 'ON "machineId" = "queryMachineId" AND "chartId" = "queryChartId" ', + ); + } else { + result = await this.readConnection.query( + bind( + 'SELECT ' + + this.fullStateEntrySqlSelectFields + + 'FROM "fullJournalStates" ' + + 'WHERE TRUE ' + + (query.ref !== undefined && query.machineId === undefined + ? ' AND "machineId" = :machineId AND "chartId" = :chartId ' + : '') + + (query.parentRef !== undefined + ? ' AND "parentMachineId" = :parentMachineId AND "parentChartId" = :parentChartId ' + : '') + + // In case of both machineId and ref, ref takes precedence + (query.machineId !== undefined && query.ref === undefined + ? ' AND "machineId" = :machineId ' + : '') + + (query.afterId !== undefined + ? ' AND "id" > :afterId::bigint ' + : '') + + (query.afterAndIncludingId !== undefined + ? ' AND "id" >= :afterAndIncludingId::bigint ' + : '') + + (query.beforeId !== undefined + ? ' AND "id" < :beforeId::bigint ' + : '') + + (query.beforeAndIncludingId !== undefined + ? ' AND "id" <= :beforeAndIncludingId::bigint ' + : '') + + (query.createdAfterAndIncluding !== undefined + ? ' AND "created" >= to_timestamp(:createdAfterAndIncluding::decimal / 1000) ' + : '') + + (query.createdBeforeAndIncluding !== undefined + ? ' AND "created" <= to_timestamp(:createdBeforeAndIncluding::decimal / 1000) ' + : '') + + (query.updatedAfterAndIncluding !== undefined + ? ' AND "timestamp" >= to_timestamp(:updatedAfterAndIncluding::decimal / 1000) ' + : '') + + (query.updatedBeforeAndIncluding !== undefined + ? ' AND "timestamp" <= to_timestamp(:updatedBeforeAndIncluding::decimal / 1000) ' + : '') + + 'ORDER BY "id" ' + + (query.order ?? 'ASC') + + (query.offset !== undefined ? ' OFFSET :offset' : '') + + (query.limit !== undefined ? ' LIMIT :limit' : ''), + { + machineId: query.ref?.machineId ?? query.machineId, + chartId: query.ref?.chartId, + parentMachineId: query.parentRef?.machineId, + parentChartId: query.parentRef?.chartId, + afterId: query.afterId, + afterAndIncludingId: query.afterAndIncludingId, + beforeId: query.beforeId, + beforeAndIncludingId: query.beforeAndIncludingId, + createdAfterAndIncluding: query.createdAfterAndIncluding, + createdBeforeAndIncluding: query.createdBeforeAndIncluding, + updatedAfterAndIncluding: query.updatedAfterAndIncluding, + updatedBeforeAndIncluding: query.updatedBeforeAndIncluding, + offset: query.offset, + limit: query.limit, + }, + ), + ); + } + + return result.rows.map( + PGliteJournalPersistenceAdapter.parseSqlFullStateRow, + ); + } + + /** + * @returns Number of deleted records + */ + public async deleteByChart(ref: ChartReference): Promise { + const fullStateResult = await this.updateConnection.query( + bind( + 'DELETE FROM "fullJournalStates" ' + + 'WHERE "machineId"=:machineId AND "chartId"=:chartId', + { + machineId: ref.machineId, + chartId: ref.chartId, + }, + ), + ); + + const journalEntryResult = await this.updateConnection.query( + bind( + 'DELETE FROM "journalEntries" ' + + 'WHERE "machineId"=:machineId AND "chartId"=:chartId', + { + machineId: ref.machineId, + chartId: ref.chartId, + }, + ), + ); + + return fullStateResult.rowCount + journalEntryResult.rowCount; + } + + public async getCurrentTime(): Promise { + const result = await this.readConnection.query<{ time: number }>( + 'SELECT extract(epoch from transaction_timestamp()) * 1000 AS "time"', + ); + + if (!result.rowCount) { + throw new Error('Failed to read current time from database'); + } + + return Number(result.rows[0].time); + } + + static parseSqlJournalRow(row: PGliteJournalRow): JournalEntry { + return { + id: Number(row.id), + timestamp: Number(row.timestamp), + + ref: { + machineId: row.machineId, + chartId: row.chartId, + }, + + event: JSON.parse(String(row.event)), + + state: row.state ? JSON.parse(String(row.state)) : null, + context: row.context ? JSON.parse(String(row.context)) : null, + + stateDelta: JSON.parse(String(row.stateDelta)), + contextDelta: JSON.parse(String(row.contextDelta)), + actions: row.actions ? JSON.parse(String(row.actions)) : null, + }; + } + + static parseSqlFullStateRow(row: PGliteFullStateRow): FullStateEntry { + return { + id: Number(row.id), + created: Number(row.created), + timestamp: Number(row.timestamp), + + ownerId: row.ownerId, + + ref: { + machineId: row.machineId, + chartId: row.chartId, + }, + + parentRef: row.parentChartId + ? { + machineId: row.parentMachineId, + chartId: row.parentChartId, + } + : null, + + event: JSON.parse(String(row.event)), + state: row.state ? JSON.parse(String(row.state)) : null, + context: row.context ? JSON.parse(String(row.context)) : null, + actions: row.actions ? JSON.parse(String(row.actions)) : null, + }; + } +} diff --git a/packages/journal-pglite/src/PGliteJournalRow.ts b/packages/journal-pglite/src/PGliteJournalRow.ts new file mode 100644 index 00000000..4934558a --- /dev/null +++ b/packages/journal-pglite/src/PGliteJournalRow.ts @@ -0,0 +1,15 @@ +/** + * Journal entry row directly from the SQL query + */ +export type PGliteJournalRow = { + id: number; + timestamp: number; + machineId: string; + chartId: string; + event: Buffer | null; + state: Buffer | null; + stateDelta: Buffer; + context: Buffer | null; + contextDelta: Buffer; + actions: Buffer | null; +}; diff --git a/packages/journal-pglite/src/index.ts b/packages/journal-pglite/src/index.ts new file mode 100644 index 00000000..b726c6f8 --- /dev/null +++ b/packages/journal-pglite/src/index.ts @@ -0,0 +1,3 @@ +export * from './PGliteJournalRow'; +export * from './PGliteFullStateRow'; +export * from './PGliteJournalPersistenceAdapter'; diff --git a/packages/journal-pglite/src/migrations/001-initial.sql b/packages/journal-pglite/src/migrations/001-initial.sql new file mode 100644 index 00000000..32086f56 --- /dev/null +++ b/packages/journal-pglite/src/migrations/001-initial.sql @@ -0,0 +1,67 @@ +-------------------------------------------------------------------------------- +-- Up migration +-------------------------------------------------------------------------------- + +CREATE TABLE "journalEntries" ( + "id" SERIAL PRIMARY KEY, + "timestamp" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT transaction_timestamp(), + + "machineId" TEXT NOT NULL, + "chartId" TEXT NOT NULL, + + -- Event that caused this transition, as serialized JSON + "event" BYTEA, + -- Full state as serialized JSON, but only mandatory for the first entry + "state" BYTEA DEFAULT NULL, + -- Context as serialized JSON,but only mandatory for the first entry + "context" BYTEA DEFAULT NULL, + + -- Change set between this and previous entry, can be used for time travel + "stateDelta" BYTEA NOT NULL, + -- Change set between this and previous entry, can be used for time travel + "contextDelta" BYTEA NOT NULL, + + -- Actions triggered by the transition + "actions" BYTEA DEFAULT NULL +); + +CREATE INDEX "journalChartIndex" ON "journalEntries" ("machineId", "chartId"); + +CREATE TABLE "fullJournalStates" ( + "id" BIGINT, + "created" TIMESTAMP WITH TIME ZONE NOT NULL, + "timestamp" TIMESTAMP WITH TIME ZONE NOT NULL, + + "ownerId" TEXT, + "machineId" TEXT NOT NULL, + "chartId" TEXT NOT NULL, + "parentMachineId" TEXT, + "parentChartId" TEXT, + + -- Event that caused this transition, as serialized JSON + "event" BYTEA DEFAULT NULL, + + -- Full state as serialized JSON, but only mandatory for the first entry + "state" BYTEA DEFAULT NULL, + -- Context as serialized JSON,but only mandatory for the first entry + "context" BYTEA DEFAULT NULL, + + -- Actions triggered by the transition + "actions" BYTEA DEFAULT NULL, + + PRIMARY KEY("machineId", "chartId") +); + +CREATE INDEX "fullJournalChartParentIndex" + ON "fullJournalStates" ("parentMachineId", "parentChartId") + WHERE "parentChartId" IS NOT NULL; + +-------------------------------------------------------------------------------- +-- Down migration +-------------------------------------------------------------------------------- + +DROP INDEX "fullJournalChartParentIndex"; +DROP INDEX "journalChartIndex"; + +DROP TABLE "fullJournalStates"; +DROP TABLE "journalEntries"; From dacca353eedd326eb8b0dc6eb2d9fd12e1feee64 Mon Sep 17 00:00:00 2001 From: Juha Mustonen Date: Tue, 29 Jul 2025 17:33:38 +0300 Subject: [PATCH 02/14] Compile files --- packages/core-pglite/package.json | 2 +- packages/digest-pglite/package.json | 20 +- packages/digest-pglite/tsconfig.json | 2 +- packages/journal-pglite/jest.config.js | 6 + packages/journal-pglite/package.json | 20 +- .../src/PGliteJournalPersistenceAdapter.ts | 460 ++++++++---------- packages/journal-pglite/tsconfig.json | 9 + 7 files changed, 261 insertions(+), 258 deletions(-) create mode 100644 packages/journal-pglite/jest.config.js create mode 100644 packages/journal-pglite/tsconfig.json diff --git a/packages/core-pglite/package.json b/packages/core-pglite/package.json index 9181e62e..b0d95a0c 100644 --- a/packages/core-pglite/package.json +++ b/packages/core-pglite/package.json @@ -22,7 +22,7 @@ "format": "prettier --write 'src/**/*.ts'", "lint": "eslint src/**/*.ts && prettier --check src", "prePublishOnly": "yarn build", - "test": "node --experimental-vm-modules node_modules/.bin/jest" + "test": "NODE_OPTIONS='--experimental-vm-modules' jest" }, "dependencies": { "@electric-sql/pglite": "^0.3.5", diff --git a/packages/digest-pglite/package.json b/packages/digest-pglite/package.json index a3f032d5..1e15f64d 100644 --- a/packages/digest-pglite/package.json +++ b/packages/digest-pglite/package.json @@ -7,13 +7,25 @@ "license": "MIT", "main": "lib/digest-pglite.js", "directories": { - "lib": "lib", - "test": "__tests__" + "lib": "lib" }, "files": [ "lib" ], "scripts": { - "test": "node ./__tests__/digest-pglite.test.js" + "build": "yarn build:bin ; yarn build:files", + "build:bin": "tsc", + "build:files": "cpx-fixed 'src/**/*.sql' lib", + "clean": "rm -rf node_modules lib", + "test": "NODE_OPTIONS='--experimental-vm-modules' jest" + }, + "dependencies": { + "@electric-sql/pglite": "^0.3.5" + }, + "devDependencies": { + "cpx-fixed": "^1.6.0", + "jest": "^28.1.2", + "ts-node": "^10.7.0", + "typescript": "^4.7.4" } -} +} \ No newline at end of file diff --git a/packages/digest-pglite/tsconfig.json b/packages/digest-pglite/tsconfig.json index e142a2d6..cf21a7e1 100644 --- a/packages/digest-pglite/tsconfig.json +++ b/packages/digest-pglite/tsconfig.json @@ -5,5 +5,5 @@ "outDir": "./lib" }, "include": ["src"], - "exclude": ["node_modules", "lib", "src/**/__tests__", "src/**/*.test.*"] + "exclude": ["node_modules", "lib", "src/**/*.test.*"] } diff --git a/packages/journal-pglite/jest.config.js b/packages/journal-pglite/jest.config.js new file mode 100644 index 00000000..22d67952 --- /dev/null +++ b/packages/journal-pglite/jest.config.js @@ -0,0 +1,6 @@ +const baseConfig = require('../jestconfig.base'); + +module.exports = { + ...baseConfig, + rootDir: 'src', +}; diff --git a/packages/journal-pglite/package.json b/packages/journal-pglite/package.json index ad04886b..03926d2e 100644 --- a/packages/journal-pglite/package.json +++ b/packages/journal-pglite/package.json @@ -7,13 +7,25 @@ "license": "MIT", "main": "lib/journal-pglite.js", "directories": { - "lib": "lib", - "test": "__tests__" + "lib": "lib" }, "files": [ "lib" ], "scripts": { - "test": "node ./__tests__/journal-pglite.test.js" + "build": "yarn build:bin ; yarn build:files", + "build:bin": "tsc", + "build:files": "cpx-fixed 'src/**/*.sql' lib", + "clean": "rm -rf node_modules lib", + "test": "NODE_OPTIONS='--experimental-vm-modules' jest" + }, + "dependencies": { + "@electric-sql/pglite": "^0.3.5" + }, + "devDependencies": { + "cpx-fixed": "^1.6.0", + "jest": "^28.1.2", + "ts-node": "^10.7.0", + "typescript": "^4.7.4" } -} +} \ No newline at end of file diff --git a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts index 90a7ffd0..da79fb0d 100644 --- a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts +++ b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts @@ -1,9 +1,7 @@ -import path from 'path'; import { ChartReference } from '@samihult/xjog-util'; -import { Client, PoolConfig } from 'pg'; import migrationRunner from 'node-pg-migrate'; +import path from 'path'; import createSubscriber from 'pg-listen'; -import bind from 'pg-bind'; import { FullStateEntry, @@ -15,8 +13,9 @@ import { JournalQuery, } from '@samihult/xjog-journal-persistence'; -import { PGliteJournalRow } from './PGliteJournalRow'; +import { PGlite, PGliteOptions } from '@electric-sql/pglite'; import { PGliteFullStateRow } from './PGliteFullStateRow'; +import { PGliteJournalRow } from './PGliteJournalRow'; /** * Options for instantiating {@link PGliteJournalPersistenceAdapter}. @@ -36,28 +35,12 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { private readonly stopObservingNewJournalEntries: Promise<() => Promise>; public constructor( - private readonly listenerConfig: PoolConfig, - private readonly subscriptionConnection: Client, - private readonly readConnection: Client, - private readonly writeConnection: Client, - private readonly updateConnection: Client, + private readonly listenerConfig: PGliteOptions, + private readonly connection: PGlite, private options: PGliteJournalPersistenceAdapterOptions, ) { super(); - subscriptionConnection.on('error', (err) => - this.error('Subscription connection emitted error', { err }), - ); - readConnection.on('error', (err) => - this.error('Read connection emitted error', { err }), - ); - writeConnection.on('error', (err) => - this.error('Write connection emitted error', { err }), - ); - updateConnection.on('error', (err) => - this.error('Update connection emitted error', { err }), - ); - this.stopObservingNewJournalEntries = this.startObservingNewJournalEntries(); } @@ -68,37 +51,22 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { * constructor. */ static async connect( - poolConfiguration: PoolConfig, - // TODO resolve + poolConfiguration: PGliteOptions = {}, options: Partial = {}, ): Promise { - const subscriptionConnection = new Client(poolConfiguration); - const readConnection = new Client(poolConfiguration); - const writeConnection = new Client(poolConfiguration); - const updateConnection = new Client(poolConfiguration); - + const pool = await PGlite.create(poolConfiguration); const adapter = new PGliteJournalPersistenceAdapter( poolConfiguration, - subscriptionConnection, - readConnection, - writeConnection, - updateConnection, + pool, options, ); - await subscriptionConnection.connect(); - await readConnection.connect(); - await writeConnection.connect(); - await updateConnection.connect(); - // TODO resolve separately options.keyFrameInterval ??= 100; - const migrationClient = new Client(poolConfiguration); try { - await migrationClient.connect(); await migrationRunner({ - dbClient: migrationClient, + dbClient: pool as any, migrationsTable: 'migrations_journal', dir: path.join(__dirname, './migrations'), direction: 'up', @@ -108,9 +76,7 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { noLock: true, }); } finally { - if (migrationClient) { - await migrationClient.end(); - } + await pool.close(); } return adapter; @@ -121,16 +87,16 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { await this.stopObservingNewJournalEntries )(); - await this.subscriptionConnection.end(); - await this.updateConnection.end(); - await this.writeConnection.end(); - await this.readConnection.end(); + await this.connection.close(); } protected async insertEntry( entry: JournalEntryInsertFields, ): Promise { - const query = bind( + const result = await this.connection.query<{ + id: number; + timestamp: number; + }>( 'INSERT INTO "journalEntries" ' + '(' + ' "machineId", "chartId", "event", ' + @@ -138,26 +104,22 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { ' "actions" ' + ') ' + 'VALUES (' + - ' :machineId, :chartId, :event, NULL, NULL, ' + - ' :stateDelta, :contextDelta, :actions ' + + ' $1, $2, $3, NULL, NULL, ' + + ' $4, $5, $6 ' + ') ' + 'RETURNING ' + ' "id", extract(epoch from "timestamp") * 1000 as "timestamp" ', - { - machineId: entry.ref.machineId, - chartId: entry.ref.chartId, - event: entry.event ? Buffer.from(JSON.stringify(entry.event)) : null, - stateDelta: Buffer.from(JSON.stringify(entry.stateDelta)), - contextDelta: Buffer.from(JSON.stringify(entry.contextDelta)), - actions: entry.actions - ? Buffer.from(JSON.stringify(entry.actions)) - : null, - }, + [ + entry.ref.machineId, + entry.ref.chartId, + entry.event ? Buffer.from(JSON.stringify(entry.event)) : null, + Buffer.from(JSON.stringify(entry.stateDelta)), + Buffer.from(JSON.stringify(entry.contextDelta)), + entry.actions ? Buffer.from(JSON.stringify(entry.actions)) : null, + ], ); - const result = await this.writeConnection.query(query); - - if (!result.rowCount) { + if (!result.rows.length) { throw new Error('Failed to write journal entry'); } @@ -168,7 +130,7 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { } protected async updateFullState(entry: FullStateEntry): Promise { - const query = bind( + const result = await this.connection.query( 'INSERT INTO "fullJournalStates" ' + '( ' + ' "id", "created", "timestamp", ' + @@ -178,12 +140,11 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { ' "actions" ' + ') ' + 'VALUES (' + - ' :id, to_timestamp(:timestamp::decimal / 1000), ' + - ' to_timestamp(:timestamp::decimal / 1000), ' + - ' :ownerId, :machineId, :chartId, ' + - ' :parentMachineId, :parentChartId, ' + - ' :event, :state, :context,' + - ' :actions ' + + ' $1, to_timestamp($2::decimal / 1000), ' + + ' to_timestamp($2::decimal / 1000), ' + + ' $3, $4, $5, ' + + ' $6, $7, ' + + ' $8, $9, $10, $11 ' + ') ON CONFLICT (' + ' "machineId", "chartId" ' + ') DO UPDATE SET ' + @@ -191,32 +152,26 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { ' "event" = :event, "state" = :state, "context" = :context, ' + ' "actions" = :actions ' + 'WHERE "fullJournalStates"."id" < :id ', - { - id: entry.id, - timestamp: entry.timestamp, - ownerId: entry.ownerId, - machineId: entry.ref.machineId, - chartId: entry.ref.chartId, - parentMachineId: entry.parentRef?.machineId ?? null, - parentChartId: entry.parentRef?.chartId ?? null, - event: entry.event ? Buffer.from(JSON.stringify(entry.event)) : null, - state: entry.state ? Buffer.from(JSON.stringify(entry.state)) : null, - context: entry.context - ? Buffer.from(JSON.stringify(entry.context)) - : null, - actions: entry.actions - ? Buffer.from(JSON.stringify(entry.actions)) - : null, - }, + [ + entry.id, + entry.timestamp, + entry.ownerId, + entry.ref.machineId, + entry.ref.chartId, + entry.parentRef?.machineId ?? null, + entry.parentRef?.chartId ?? null, + entry.event ? Buffer.from(JSON.stringify(entry.event)) : null, + entry.state ? Buffer.from(JSON.stringify(entry.state)) : null, + entry.context ? Buffer.from(JSON.stringify(entry.context)) : null, + entry.actions ? Buffer.from(JSON.stringify(entry.actions)) : null, + ], ); - const result = await this.writeConnection.query(query); - - if (!result.rowCount) { + if (!result.rows.length) { throw new Error('Failed to write journal full entry'); } - return result.rows[0]; + return; } protected async emitJournalEntryNotification( @@ -229,10 +184,9 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { chartId: ref.chartId, }); - await this.updateConnection.query( - bind("SELECT pg_notify('new-journal-entry', :payload::text)", { - payload, - }), + await this.connection.query( + "SELECT pg_notify('new-journal-entry', $1:text)", + [payload], ); } @@ -244,22 +198,57 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { ' "actions" '; public async readEntry(id: number): Promise { - const result = await this.readConnection.query( - bind( - 'SELECT ' + - this.journalEntrySqlSelectFields + - 'FROM "journalEntries" WHERE "id"=:id::bigint', - { id }, - ), + const result = await this.connection.query( + 'SELECT ' + + this.journalEntrySqlSelectFields + + 'FROM "journalEntries" WHERE "id"=$1', + [id], ); - if (!result.rowCount) { + if (!result.rows.length) { return null; } return PGliteJournalPersistenceAdapter.parseSqlJournalRow(result.rows[0]); } + /** + * Including the node-pg helper function + * https://github.com/brianc/node-postgres/blob/1b2bedc9c86b7378288e704252a8e4fafa27aa34/packages/pg/lib/utils.js#L175 + */ + private escapeLiteral(str: string): string { + let hasBackslash = false; + let escaped = "'"; + + if (str == null) { + return "''"; + } + + if (typeof str !== 'string') { + return "''"; + } + + for (let i = 0; i < str.length; i++) { + const c = str[i]; + if (c === "'") { + escaped += c + c; + } else if (c === '\\') { + escaped += c + c; + hasBackslash = true; + } else { + escaped += c; + } + } + + escaped += "'"; + + if (hasBackslash === true) { + escaped = ' E' + escaped; + } + + return escaped; + } + public async queryEntries(query: JournalQuery): Promise { let result; @@ -268,7 +257,7 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { return []; } - result = await this.readConnection.query( + result = await this.connection.query( 'SELECT ' + this.journalEntrySqlSelectFields + 'FROM "journalEntries" ' + @@ -276,8 +265,8 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { query .map( ({ machineId, chartId }) => - `(${this.readConnection.escapeLiteral(machineId)}, ` + - `${this.readConnection.escapeLiteral(chartId)})`, + `(${this.escapeLiteral(machineId)}, ` + + `${this.escapeLiteral(chartId)})`, ) .join(', ') + ') ' + @@ -285,50 +274,44 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { 'ON "machineId" = "queryMachineId" AND "chartId" = "queryChartId" ', ); } else { - result = await this.readConnection.query( - bind( - 'SELECT ' + - this.journalEntrySqlSelectFields + - 'FROM "journalEntries" ' + - 'WHERE TRUE ' + - (query.ref !== undefined - ? ' AND "machineId" = :machineId AND "chartId" = :chartId ' - : '') + - (query.afterId !== undefined - ? ' AND "id" > :afterId::bigint ' - : '') + - (query.afterAndIncludingId !== undefined - ? ' AND "id" >= :afterAndIncludingId::bigint ' - : '') + - (query.beforeId !== undefined - ? ' AND "id" < :beforeId::bigint ' - : '') + - (query.beforeAndIncludingId !== undefined - ? ' AND "id" <= :beforeAndIncludingId::bigint ' - : '') + - (query.updatedAfterAndIncluding !== undefined - ? ' AND "timestamp" >= to_timestamp(:updatedAfterAndIncluding::decimal / 1000) ' - : '') + - (query.updatedBeforeAndIncluding !== undefined - ? ' AND "timestamp" <= to_timestamp(:updatedBeforeAndIncluding::decimal / 1000) ' - : '') + - 'ORDER BY "id" ' + - (query.order ?? 'ASC') + - (query.offset !== undefined ? ' OFFSET :offset' : '') + - (query.limit !== undefined ? ' LIMIT :limit' : ''), - { - machineId: query.ref?.machineId, - chartId: query.ref?.chartId, - afterId: query.afterId, - afterAndIncludingId: query.afterAndIncludingId, - beforeId: query.beforeId, - beforeAndIncludingId: query.beforeAndIncludingId, - updatedAfterAndIncluding: query.updatedAfterAndIncluding, - updatedBeforeAndIncluding: query.updatedBeforeAndIncluding, - offset: query.offset, - limit: query.limit, - }, - ), + result = await this.connection.query( + 'SELECT ' + + this.journalEntrySqlSelectFields + + 'FROM "journalEntries" ' + + 'WHERE TRUE ' + + (query.ref !== undefined + ? ' AND "machineId" = $1 AND "chartId" = $2 ' + : '') + + (query.afterId !== undefined ? ' AND "id" > $3::bigint ' : '') + + (query.afterAndIncludingId !== undefined + ? ' AND "id" >= $4::bigint ' + : '') + + (query.beforeId !== undefined ? ' AND "id" < $5::bigint ' : '') + + (query.beforeAndIncludingId !== undefined + ? ' AND "id" <= $6::bigint ' + : '') + + (query.updatedAfterAndIncluding !== undefined + ? ' AND "timestamp" >= to_timestamp($7::decimal / 1000) ' + : '') + + (query.updatedBeforeAndIncluding !== undefined + ? ' AND "timestamp" <= to_timestamp($8::decimal / 1000) ' + : '') + + 'ORDER BY "id" ' + + (query.order ?? 'ASC') + + (query.offset !== undefined ? ' OFFSET $9 ' : '') + + (query.limit !== undefined ? ' LIMIT $10 ' : ''), + [ + query.ref?.machineId, // $1 + query.ref?.chartId, // $2 + query.afterId, // $3 + query.afterAndIncludingId, // $4 + query.beforeId, // $5 + query.beforeAndIncludingId, // $6 + query.updatedAfterAndIncluding, // $7 + query.updatedBeforeAndIncluding, // $8 + query.offset, // $9 + query.limit, // $10 + ], ); } @@ -411,20 +394,15 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { public async readFullState( ref: ChartReference, ): Promise { - const result = await this.readConnection.query( - bind( - 'SELECT ' + - this.fullStateEntrySqlSelectFields + - 'FROM "fullJournalStates" ' + - 'WHERE "machineId" = :machineId AND "chartId" = :chartId ', - { - machineId: ref.machineId, - chartId: ref.chartId, - }, - ), + const result = await this.connection.query( + 'SELECT ' + + this.fullStateEntrySqlSelectFields + + 'FROM "fullJournalStates" ' + + 'WHERE "machineId" = :machineId AND "chartId" = :chartId ', + [ref.machineId, ref.chartId], ); - if (!result.rowCount) { + if (!result.affectedRows) { return null; } @@ -441,7 +419,7 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { return []; } - result = await this.readConnection.query( + result = await this.connection.query( 'SELECT ' + this.fullStateEntrySqlSelectFields + 'FROM "fullJournalStates" ' + @@ -449,8 +427,8 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { query .map( ({ machineId, chartId }) => - `(${this.readConnection.escapeLiteral(machineId)}, ` + - `${this.readConnection.escapeLiteral(chartId)})`, + `(${this.escapeLiteral(machineId)}, ` + + `${this.escapeLiteral(chartId)})`, ) .join(', ') + ') ' + @@ -458,67 +436,61 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { 'ON "machineId" = "queryMachineId" AND "chartId" = "queryChartId" ', ); } else { - result = await this.readConnection.query( - bind( - 'SELECT ' + - this.fullStateEntrySqlSelectFields + - 'FROM "fullJournalStates" ' + - 'WHERE TRUE ' + - (query.ref !== undefined && query.machineId === undefined - ? ' AND "machineId" = :machineId AND "chartId" = :chartId ' - : '') + - (query.parentRef !== undefined - ? ' AND "parentMachineId" = :parentMachineId AND "parentChartId" = :parentChartId ' - : '') + - // In case of both machineId and ref, ref takes precedence - (query.machineId !== undefined && query.ref === undefined - ? ' AND "machineId" = :machineId ' - : '') + - (query.afterId !== undefined - ? ' AND "id" > :afterId::bigint ' - : '') + - (query.afterAndIncludingId !== undefined - ? ' AND "id" >= :afterAndIncludingId::bigint ' - : '') + - (query.beforeId !== undefined - ? ' AND "id" < :beforeId::bigint ' - : '') + - (query.beforeAndIncludingId !== undefined - ? ' AND "id" <= :beforeAndIncludingId::bigint ' - : '') + - (query.createdAfterAndIncluding !== undefined - ? ' AND "created" >= to_timestamp(:createdAfterAndIncluding::decimal / 1000) ' - : '') + - (query.createdBeforeAndIncluding !== undefined - ? ' AND "created" <= to_timestamp(:createdBeforeAndIncluding::decimal / 1000) ' - : '') + - (query.updatedAfterAndIncluding !== undefined - ? ' AND "timestamp" >= to_timestamp(:updatedAfterAndIncluding::decimal / 1000) ' - : '') + - (query.updatedBeforeAndIncluding !== undefined - ? ' AND "timestamp" <= to_timestamp(:updatedBeforeAndIncluding::decimal / 1000) ' - : '') + - 'ORDER BY "id" ' + - (query.order ?? 'ASC') + - (query.offset !== undefined ? ' OFFSET :offset' : '') + - (query.limit !== undefined ? ' LIMIT :limit' : ''), - { - machineId: query.ref?.machineId ?? query.machineId, - chartId: query.ref?.chartId, - parentMachineId: query.parentRef?.machineId, - parentChartId: query.parentRef?.chartId, - afterId: query.afterId, - afterAndIncludingId: query.afterAndIncludingId, - beforeId: query.beforeId, - beforeAndIncludingId: query.beforeAndIncludingId, - createdAfterAndIncluding: query.createdAfterAndIncluding, - createdBeforeAndIncluding: query.createdBeforeAndIncluding, - updatedAfterAndIncluding: query.updatedAfterAndIncluding, - updatedBeforeAndIncluding: query.updatedBeforeAndIncluding, - offset: query.offset, - limit: query.limit, - }, - ), + result = await this.connection.query( + 'SELECT ' + + this.fullStateEntrySqlSelectFields + + 'FROM "fullJournalStates" ' + + 'WHERE TRUE ' + + (query.ref !== undefined && query.machineId === undefined + ? ' AND "machineId" = $1 AND "chartId" = $2 ' + : '') + + (query.parentRef !== undefined + ? ' AND "parentMachineId" = $3 AND "parentChartId" = $4 ' + : '') + + // In case of both machineId and ref, ref takes precedence + (query.machineId !== undefined && query.ref === undefined + ? ' AND "machineId" = $5 ' + : '') + + (query.afterId !== undefined ? ' AND "id" > $6::bigint ' : '') + + (query.afterAndIncludingId !== undefined + ? ' AND "id" >= $7::bigint ' + : '') + + (query.beforeId !== undefined ? ' AND "id" < $8::bigint ' : '') + + (query.beforeAndIncludingId !== undefined + ? ' AND "id" <= $9::bigint ' + : '') + + (query.createdAfterAndIncluding !== undefined + ? ' AND "created" >= to_timestamp($10::decimal / 1000) ' + : '') + + (query.createdBeforeAndIncluding !== undefined + ? ' AND "created" <= to_timestamp($11::decimal / 1000) ' + : '') + + (query.updatedAfterAndIncluding !== undefined + ? ' AND "timestamp" >= to_timestamp($12::decimal / 1000) ' + : '') + + (query.updatedBeforeAndIncluding !== undefined + ? ' AND "timestamp" <= to_timestamp($13::decimal / 1000) ' + : '') + + 'ORDER BY "id" ' + + (query.order ?? 'ASC') + + (query.offset !== undefined ? ' OFFSET $13 ' : '') + + (query.limit !== undefined ? ' LIMIT $14 ' : ''), + [ + query.ref?.machineId ?? query.machineId, // $1 + query.ref?.chartId, // $2 + query.parentRef?.machineId, // $3 + query.parentRef?.chartId, // $4 + query.afterId, // $5 + query.afterAndIncludingId, // $6 + query.beforeId, // $7 + query.beforeAndIncludingId, // $8 + query.createdAfterAndIncluding, // $9 + query.createdBeforeAndIncluding, // $10 + query.updatedAfterAndIncluding, // $11 + query.updatedBeforeAndIncluding, // $12 + query.offset, // $13 + query.limit, // $14 + ], ); } @@ -531,37 +503,29 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { * @returns Number of deleted records */ public async deleteByChart(ref: ChartReference): Promise { - const fullStateResult = await this.updateConnection.query( - bind( - 'DELETE FROM "fullJournalStates" ' + - 'WHERE "machineId"=:machineId AND "chartId"=:chartId', - { - machineId: ref.machineId, - chartId: ref.chartId, - }, - ), + const fullStateResult = await this.connection.query( + 'DELETE FROM "fullJournalStates" ' + + 'WHERE "machineId"=$1 AND "chartId"=$2', + [ref.machineId, ref.chartId], ); - const journalEntryResult = await this.updateConnection.query( - bind( - 'DELETE FROM "journalEntries" ' + - 'WHERE "machineId"=:machineId AND "chartId"=:chartId', - { - machineId: ref.machineId, - chartId: ref.chartId, - }, - ), + const journalEntryResult = await this.connection.query( + 'DELETE FROM "journalEntries" ' + 'WHERE "machineId"=$1 AND "chartId"=$2', + [ref.machineId, ref.chartId], ); - return fullStateResult.rowCount + journalEntryResult.rowCount; + return ( + (fullStateResult.affectedRows ?? 0) + + (journalEntryResult.affectedRows ?? 0) + ); } public async getCurrentTime(): Promise { - const result = await this.readConnection.query<{ time: number }>( + const result = await this.connection.query<{ time: number }>( 'SELECT extract(epoch from transaction_timestamp()) * 1000 AS "time"', ); - if (!result.rowCount) { + if (!result.affectedRows) { throw new Error('Failed to read current time from database'); } diff --git a/packages/journal-pglite/tsconfig.json b/packages/journal-pglite/tsconfig.json new file mode 100644 index 00000000..cf21a7e1 --- /dev/null +++ b/packages/journal-pglite/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "rootDir": "./src", + "outDir": "./lib" + }, + "include": ["src"], + "exclude": ["node_modules", "lib", "src/**/*.test.*"] +} From 4ad3018ac5f321a6d9f95433308607b8baa359aa Mon Sep 17 00:00:00 2001 From: Juha Mustonen Date: Tue, 29 Jul 2025 17:56:33 +0300 Subject: [PATCH 03/14] Split migrations --- .../PGliteJournalPersistenceAdapter.test.ts | 12 +++++++ .../src/PGliteJournalPersistenceAdapter.ts | 8 +++-- ...01-initial.sql => 001-journal-entries.sql} | 35 ------------------ .../migrations/002-journal-chart-index.sql | 13 +++++++ .../migrations/003-full-journal-states.sql | 36 +++++++++++++++++++ .../004-full-journal-chart-parent-index.sql | 13 +++++++ 6 files changed, 80 insertions(+), 37 deletions(-) create mode 100644 packages/journal-pglite/src/PGliteJournalPersistenceAdapter.test.ts rename packages/journal-pglite/src/migrations/{001-initial.sql => 001-journal-entries.sql} (53%) create mode 100644 packages/journal-pglite/src/migrations/002-journal-chart-index.sql create mode 100644 packages/journal-pglite/src/migrations/003-full-journal-states.sql create mode 100644 packages/journal-pglite/src/migrations/004-full-journal-chart-parent-index.sql diff --git a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.test.ts b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.test.ts new file mode 100644 index 00000000..89124f31 --- /dev/null +++ b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.test.ts @@ -0,0 +1,12 @@ +import { PGliteJournalPersistenceAdapter } from './PGliteJournalPersistenceAdapter'; + +describe('PGliteJournalPersistenceAdapter', () => { + it('should be defined', () => { + expect(PGliteJournalPersistenceAdapter).toBeDefined(); + }); + + it('should be able to connect to a database', async () => { + const adapter = await PGliteJournalPersistenceAdapter.connect(); + expect(adapter).toBeDefined(); + }); +}); diff --git a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts index da79fb0d..0d8ed1d5 100644 --- a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts +++ b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts @@ -69,11 +69,12 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { dbClient: pool as any, migrationsTable: 'migrations_journal', dir: path.join(__dirname, './migrations'), + singleTransaction: true, direction: 'up', log: (message) => adapter.trace({ in: 'connect', message }), // https://github.com/salsita/node-pg-migrate/issues/821 checkOrder: false, - noLock: true, + noLock: false, }); } finally { await pool.close(); @@ -323,6 +324,9 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { > { const startTime = await this.getCurrentTime(); + // TODO: Implement + return () => Promise.resolve(); + let journalEntryIdPointer = 0; let fullStateEntryIdPointer = 0; @@ -525,7 +529,7 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { 'SELECT extract(epoch from transaction_timestamp()) * 1000 AS "time"', ); - if (!result.affectedRows) { + if (!result.rows.length) { throw new Error('Failed to read current time from database'); } diff --git a/packages/journal-pglite/src/migrations/001-initial.sql b/packages/journal-pglite/src/migrations/001-journal-entries.sql similarity index 53% rename from packages/journal-pglite/src/migrations/001-initial.sql rename to packages/journal-pglite/src/migrations/001-journal-entries.sql index 32086f56..3c10111b 100644 --- a/packages/journal-pglite/src/migrations/001-initial.sql +++ b/packages/journal-pglite/src/migrations/001-journal-entries.sql @@ -25,43 +25,8 @@ CREATE TABLE "journalEntries" ( "actions" BYTEA DEFAULT NULL ); -CREATE INDEX "journalChartIndex" ON "journalEntries" ("machineId", "chartId"); - -CREATE TABLE "fullJournalStates" ( - "id" BIGINT, - "created" TIMESTAMP WITH TIME ZONE NOT NULL, - "timestamp" TIMESTAMP WITH TIME ZONE NOT NULL, - - "ownerId" TEXT, - "machineId" TEXT NOT NULL, - "chartId" TEXT NOT NULL, - "parentMachineId" TEXT, - "parentChartId" TEXT, - - -- Event that caused this transition, as serialized JSON - "event" BYTEA DEFAULT NULL, - - -- Full state as serialized JSON, but only mandatory for the first entry - "state" BYTEA DEFAULT NULL, - -- Context as serialized JSON,but only mandatory for the first entry - "context" BYTEA DEFAULT NULL, - - -- Actions triggered by the transition - "actions" BYTEA DEFAULT NULL, - - PRIMARY KEY("machineId", "chartId") -); - -CREATE INDEX "fullJournalChartParentIndex" - ON "fullJournalStates" ("parentMachineId", "parentChartId") - WHERE "parentChartId" IS NOT NULL; - -------------------------------------------------------------------------------- -- Down migration -------------------------------------------------------------------------------- -DROP INDEX "fullJournalChartParentIndex"; -DROP INDEX "journalChartIndex"; - -DROP TABLE "fullJournalStates"; DROP TABLE "journalEntries"; diff --git a/packages/journal-pglite/src/migrations/002-journal-chart-index.sql b/packages/journal-pglite/src/migrations/002-journal-chart-index.sql new file mode 100644 index 00000000..e132c6ae --- /dev/null +++ b/packages/journal-pglite/src/migrations/002-journal-chart-index.sql @@ -0,0 +1,13 @@ +-------------------------------------------------------------------------------- +-- Up migration +-------------------------------------------------------------------------------- + +CREATE INDEX "journalChartIndex" ON "journalEntries" ("machineId", "chartId"); + + +-------------------------------------------------------------------------------- +-- Down migration +-------------------------------------------------------------------------------- + +DROP INDEX "journalChartIndex"; + diff --git a/packages/journal-pglite/src/migrations/003-full-journal-states.sql b/packages/journal-pglite/src/migrations/003-full-journal-states.sql new file mode 100644 index 00000000..399aac1d --- /dev/null +++ b/packages/journal-pglite/src/migrations/003-full-journal-states.sql @@ -0,0 +1,36 @@ +-------------------------------------------------------------------------------- +-- Up migration +-------------------------------------------------------------------------------- + + +CREATE TABLE "fullJournalStates" ( + "id" BIGINT, + "created" TIMESTAMP WITH TIME ZONE NOT NULL, + "timestamp" TIMESTAMP WITH TIME ZONE NOT NULL, + + "ownerId" TEXT, + "machineId" TEXT NOT NULL, + "chartId" TEXT NOT NULL, + "parentMachineId" TEXT, + "parentChartId" TEXT, + + -- Event that caused this transition, as serialized JSON + "event" BYTEA DEFAULT NULL, + + -- Full state as serialized JSON, but only mandatory for the first entry + "state" BYTEA DEFAULT NULL, + -- Context as serialized JSON,but only mandatory for the first entry + "context" BYTEA DEFAULT NULL, + + -- Actions triggered by the transition + "actions" BYTEA DEFAULT NULL, + + PRIMARY KEY("machineId", "chartId") +); + +-------------------------------------------------------------------------------- +-- Down migration +-------------------------------------------------------------------------------- + +DROP TABLE "fullJournalStates"; + diff --git a/packages/journal-pglite/src/migrations/004-full-journal-chart-parent-index.sql b/packages/journal-pglite/src/migrations/004-full-journal-chart-parent-index.sql new file mode 100644 index 00000000..99c559df --- /dev/null +++ b/packages/journal-pglite/src/migrations/004-full-journal-chart-parent-index.sql @@ -0,0 +1,13 @@ +-------------------------------------------------------------------------------- +-- Up migration +-------------------------------------------------------------------------------- + +CREATE INDEX "fullJournalChartParentIndex" + ON "fullJournalStates" ("parentMachineId", "parentChartId") + WHERE "parentChartId" IS NOT NULL; + +-------------------------------------------------------------------------------- +-- Down migration +-------------------------------------------------------------------------------- + +DROP INDEX "fullJournalChartParentIndex"; From 141e8ba63d8cadcedd69b6ccce357d274afc6a36 Mon Sep 17 00:00:00 2001 From: Juha Mustonen Date: Tue, 29 Jul 2025 17:57:12 +0300 Subject: [PATCH 04/14] Make it testable --- packages/digest-pglite/jest.config.js | 6 ++++++ packages/digest-pglite/package.json | 10 +++++++++- .../src/PGliteDigestPersistenceAdapter.test.ts | 8 -------- 3 files changed, 15 insertions(+), 9 deletions(-) create mode 100644 packages/digest-pglite/jest.config.js diff --git a/packages/digest-pglite/jest.config.js b/packages/digest-pglite/jest.config.js new file mode 100644 index 00000000..22d67952 --- /dev/null +++ b/packages/digest-pglite/jest.config.js @@ -0,0 +1,6 @@ +const baseConfig = require('../jestconfig.base'); + +module.exports = { + ...baseConfig, + rootDir: 'src', +}; diff --git a/packages/digest-pglite/package.json b/packages/digest-pglite/package.json index 1e15f64d..2a7ee548 100644 --- a/packages/digest-pglite/package.json +++ b/packages/digest-pglite/package.json @@ -24,8 +24,16 @@ }, "devDependencies": { "cpx-fixed": "^1.6.0", + "@swc/cli": "^0.1.57", + "@swc/core": "^1.2.223", + "@swc/jest": "^0.2.22", + "@types/node": "^16.7.10", + "@types/jest": "^27.0.1", "jest": "^28.1.2", "ts-node": "^10.7.0", - "typescript": "^4.7.4" + "prettier": "^2.3.2", + "typescript": "^4.7.4", + "eslint": "^8.25.0", + "eslint-config-prettier": "^8.5.0" } } \ No newline at end of file diff --git a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts index c7917457..916ccc66 100644 --- a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts +++ b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts @@ -9,12 +9,4 @@ describe('PGliteDigestPersistenceAdapter', () => { const adapter = await PGliteDigestPersistenceAdapter.connect(); expect(adapter).toBeDefined(); }); - - it('should run migrations', async () => { - const adapter = await PGliteDigestPersistenceAdapter.connect(); - expect(adapter).toBeDefined(); - - const result = await adapter.queryDigests(); - expect(result).toBeDefined(); - }); }); From 5e164e16416f92c05802bbc687bbbe83898548c5 Mon Sep 17 00:00:00 2001 From: Juha Mustonen Date: Tue, 29 Jul 2025 18:02:29 +0300 Subject: [PATCH 05/14] Passing tests with partially disabled functionality --- packages/core-pglite/src/PGlitePersistenceAdapter.ts | 2 +- packages/digest-pglite/package.json | 2 +- .../digest-pglite/src/PGliteDigestPersistenceAdapter.ts | 9 +++++++-- packages/journal-pglite/package.json | 2 +- .../src/PGliteJournalPersistenceAdapter.ts | 2 +- 5 files changed, 11 insertions(+), 6 deletions(-) diff --git a/packages/core-pglite/src/PGlitePersistenceAdapter.ts b/packages/core-pglite/src/PGlitePersistenceAdapter.ts index dbf38f1d..fd143ce9 100644 --- a/packages/core-pglite/src/PGlitePersistenceAdapter.ts +++ b/packages/core-pglite/src/PGlitePersistenceAdapter.ts @@ -68,7 +68,7 @@ type PGliteDeferredEventRow = { */ export class PGlitePersistenceAdapter extends PersistenceAdapter { public readonly component = 'persistence'; - public readonly type = 'pg'; + public readonly type = 'pglite'; public constructor( public readonly pool: PGlite, diff --git a/packages/digest-pglite/package.json b/packages/digest-pglite/package.json index 2a7ee548..ae292c9d 100644 --- a/packages/digest-pglite/package.json +++ b/packages/digest-pglite/package.json @@ -1,5 +1,5 @@ { - "name": "digest-pglite", + "name": "@samihult/xjog-digest-pglite", "version": "0.0.1", "description": "Digest package with PGlite backend", "author": "Juha Mustonen ", diff --git a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts index 06059d14..c655c13f 100644 --- a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts +++ b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts @@ -21,7 +21,7 @@ import { PGliteDigestRow } from './PGliteDigestRow'; */ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { public readonly component = 'digest/persistence'; - public readonly type = 'pg'; + public readonly type = 'pglite'; private readonly stopObservingNewDigestEntries: Promise<() => Promise>; @@ -51,10 +51,11 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { migrationsTable: 'migrations_digest', dir: path.join(__dirname, './migrations'), direction: 'up', + singleTransaction: true, log: (message) => adapter.trace({ in: 'connect', message }), // https://github.com/salsita/node-pg-migrate/issues/821 checkOrder: false, - noLock: true, + noLock: false, }); } finally { await pool.close(); @@ -395,6 +396,10 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { private async startObservingNewDigestEntries(): Promise<() => Promise> { const channel = 'new-digest-entry'; + + // TODO: Implement + return () => Promise.resolve(); + const digestSubscriber = createSubscriber(this.listenerConfig); // Received a notification of a new journal entry diff --git a/packages/journal-pglite/package.json b/packages/journal-pglite/package.json index 03926d2e..8ab69469 100644 --- a/packages/journal-pglite/package.json +++ b/packages/journal-pglite/package.json @@ -1,5 +1,5 @@ { - "name": "journal-pglite", + "name": "@samihult/xjog-journal-pglite", "version": "0.0.1", "description": "> TODO: description", "author": "Juha Mustonen ", diff --git a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts index 0d8ed1d5..68679ada 100644 --- a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts +++ b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts @@ -30,7 +30,7 @@ export type PGliteJournalPersistenceAdapterOptions = { */ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { public readonly component = 'journal/persistence'; - public readonly type = 'pg'; + public readonly type = 'pglite'; private readonly stopObservingNewJournalEntries: Promise<() => Promise>; From ef1892ca3d65daa95701c9132c40fce033369ea7 Mon Sep 17 00:00:00 2001 From: Juha Mustonen Date: Tue, 29 Jul 2025 18:17:00 +0300 Subject: [PATCH 06/14] Lint and test subscription --- packages/digest-pglite/package.json | 1 + .../src/PGliteDigestPersistenceAdapter.ts | 10 ++++++++-- packages/journal-pglite/package.json | 1 + 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/packages/digest-pglite/package.json b/packages/digest-pglite/package.json index ae292c9d..c4878513 100644 --- a/packages/digest-pglite/package.json +++ b/packages/digest-pglite/package.json @@ -15,6 +15,7 @@ "scripts": { "build": "yarn build:bin ; yarn build:files", "build:bin": "tsc", + "lint": "eslint src/**/*.ts && prettier --check src", "build:files": "cpx-fixed 'src/**/*.sql' lib", "clean": "rm -rf node_modules lib", "test": "NODE_OPTIONS='--experimental-vm-modules' jest" diff --git a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts index c655c13f..a69800ce 100644 --- a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts +++ b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts @@ -398,8 +398,13 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { const channel = 'new-digest-entry'; // TODO: Implement - return () => Promise.resolve(); + //return () => Promise.resolve(); + this.pool.listen(channel, (payload: string) => { + this.newDigestEntriesSubject.next(JSON.parse(payload) as ChartReference); + }); + + /* const digestSubscriber = createSubscriber(this.listenerConfig); // Received a notification of a new journal entry @@ -412,9 +417,10 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { }); digestSubscriber.connect().then(() => digestSubscriber.listenTo(channel)); + */ return async () => { - await digestSubscriber.close(); + await this.pool.unlisten(channel); }; } diff --git a/packages/journal-pglite/package.json b/packages/journal-pglite/package.json index 8ab69469..62f23a58 100644 --- a/packages/journal-pglite/package.json +++ b/packages/journal-pglite/package.json @@ -15,6 +15,7 @@ "scripts": { "build": "yarn build:bin ; yarn build:files", "build:bin": "tsc", + "lint": "eslint src/**/*.ts && prettier --check src", "build:files": "cpx-fixed 'src/**/*.sql' lib", "clean": "rm -rf node_modules lib", "test": "NODE_OPTIONS='--experimental-vm-modules' jest" From 43a6a59dc16aea795662c5548d75b6bdbce9245e Mon Sep 17 00:00:00 2001 From: Juha Mustonen Date: Tue, 29 Jul 2025 18:53:26 +0300 Subject: [PATCH 07/14] Testing subscription NOTE: PGlite may have bug where channel name cannot contain dashes --- .../src/PGliteDigestPersistenceAdapter.ts | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts index a69800ce..41bd325f 100644 --- a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts +++ b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts @@ -2,7 +2,6 @@ import path from 'path'; import { ChartReference } from '@samihult/xjog-util'; import { PGlite, PGliteOptions } from '@electric-sql/pglite'; import migrationRunner from 'node-pg-migrate'; -import createSubscriber from 'pg-listen'; import { DigestPersistenceAdapter, @@ -40,7 +39,9 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { * constructor. */ static async connect( - poolConfiguration: PGliteOptions = {}, + poolConfiguration: PGliteOptions = { + debug: 1, + }, ): Promise { const pool = await PGlite.create(poolConfiguration); const adapter = new PGliteDigestPersistenceAdapter(poolConfiguration, pool); @@ -85,7 +86,7 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { ') ON CONFLICT ( ' + ' "machineId", "chartId", "key"' + ') DO UPDATE SET ' + - ' value = :value, timestamp = transaction_timestamp() ', + ' value = $4, timestamp = transaction_timestamp() ', [ref.machineId, ref.chartId, key, value], ); @@ -97,7 +98,7 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { ): Promise { const payload = JSON.stringify(ref); - await this.pool.query("SELECT pg_notify('new-digest-entry', $1::text)", [ + await this.pool.query("SELECT pg_notify('new_digest_entry', $1::text)", [ payload, ]); } @@ -206,6 +207,9 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { return ['', {}]; } + // TODO: Implement + return ['', {}]; + let queryString = ''; const bindings: Record = {}; @@ -395,10 +399,7 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { } private async startObservingNewDigestEntries(): Promise<() => Promise> { - const channel = 'new-digest-entry'; - - // TODO: Implement - //return () => Promise.resolve(); + const channel = 'new_digest_entry'; this.pool.listen(channel, (payload: string) => { this.newDigestEntriesSubject.next(JSON.parse(payload) as ChartReference); From 40548c732c92d892e3a3c27b5e299bf056d13506 Mon Sep 17 00:00:00 2001 From: Juha Mustonen Date: Tue, 29 Jul 2025 18:56:41 +0300 Subject: [PATCH 08/14] Remove debug --- packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts index 41bd325f..3a57e813 100644 --- a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts +++ b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts @@ -39,9 +39,7 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { * constructor. */ static async connect( - poolConfiguration: PGliteOptions = { - debug: 1, - }, + poolConfiguration: PGliteOptions = {}, ): Promise { const pool = await PGlite.create(poolConfiguration); const adapter = new PGliteDigestPersistenceAdapter(poolConfiguration, pool); From 9b06bc5c4772dab7f70d0a73b2c95d0867607f17 Mon Sep 17 00:00:00 2001 From: Juha Mustonen Date: Wed, 30 Jul 2025 09:52:18 +0300 Subject: [PATCH 09/14] Improve tests --- .../PGliteDigestPersistenceAdapter.test.ts | 34 ++++++++++++++++++- .../src/PGliteDigestPersistenceAdapter.ts | 24 ++----------- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts index 916ccc66..59a73437 100644 --- a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts +++ b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts @@ -1,12 +1,44 @@ +import { ChartReference } from '@samihult/xjog-util'; import { PGliteDigestPersistenceAdapter } from './PGliteDigestPersistenceAdapter'; describe('PGliteDigestPersistenceAdapter', () => { + const chartReference: ChartReference = { + machineId: 'test', + chartId: 'test', + }; + it('should be defined', () => { expect(PGliteDigestPersistenceAdapter).toBeDefined(); }); - it('should be able to connect to a database', async () => { + it('should be able to connect to a migrated database', async () => { const adapter = await PGliteDigestPersistenceAdapter.connect(); expect(adapter).toBeDefined(); }); + + it('should read digest', async () => { + const adapter = await PGliteDigestPersistenceAdapter.connect(); + await adapter.record(chartReference, { foo: 'bar' }); + + const result = await adapter.readDigest(chartReference, 'foo'); + expect(result).toMatchObject({ + key: 'foo', + value: 'bar', + ref: chartReference, + created: expect.any(Number), + timestamp: expect.any(Number), + }); + }); + + it('should clear digest', async () => { + const testAdapter = await PGliteDigestPersistenceAdapter.connect(); + + await testAdapter.record(chartReference, { foo: 'bar' }); + expect( + testAdapter.readDigest(chartReference, 'foo'), + ).resolves.toBeDefined(); + + await testAdapter.clear(chartReference, ['foo']); + expect(testAdapter.readDigest(chartReference, 'foo')).resolves.toBeNull(); + }); }); diff --git a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts index 3a57e813..7b683c48 100644 --- a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts +++ b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts @@ -57,7 +57,7 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { noLock: false, }); } finally { - await pool.close(); + // Do not close the pool here, it will be closed by the adapter } return adapter; @@ -124,7 +124,7 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { private readonly digestEntrySqlSelectFields = 'extract(epoch from "created") * 1000 AS "created", ' + 'extract(epoch from "timestamp") * 1000 AS "timestamp", ' + - '"machineId", "chartId", "key, "value" '; + '"machineId", "chartId", "key", "value" '; public async readDigest( ref: ChartReference, @@ -134,7 +134,7 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { 'SELECT ' + this.digestEntrySqlSelectFields + 'FROM "digests" ' + - 'WHERE "machineId" = $1 AND "chartId" = $2 AND key = $3 ', + 'WHERE "machineId" = $1 AND "chartId" = $2 AND "key" = $3 ', [ref.machineId, ref.chartId, key], ); @@ -205,9 +205,6 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { return ['', {}]; } - // TODO: Implement - return ['', {}]; - let queryString = ''; const bindings: Record = {}; @@ -403,21 +400,6 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { this.newDigestEntriesSubject.next(JSON.parse(payload) as ChartReference); }); - /* - const digestSubscriber = createSubscriber(this.listenerConfig); - - // Received a notification of a new journal entry - digestSubscriber.notifications.on(channel, async (ref: ChartReference) => { - this.newDigestEntriesSubject.next(ref); - }); - - digestSubscriber.events.on('error', (error) => { - this.newDigestEntriesSubject.error(error); - }); - - digestSubscriber.connect().then(() => digestSubscriber.listenTo(channel)); - */ - return async () => { await this.pool.unlisten(channel); }; From 5200a61593e70e30c46e3594df5f23b4e9bf8691 Mon Sep 17 00:00:00 2001 From: Juha Mustonen Date: Wed, 30 Jul 2025 10:01:37 +0300 Subject: [PATCH 10/14] Limited support for query filter --- .../src/PGliteDigestPersistenceAdapter.test.ts | 12 ++++++++++++ .../src/PGliteDigestPersistenceAdapter.ts | 9 +++++---- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts index 59a73437..313eb0c7 100644 --- a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts +++ b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts @@ -41,4 +41,16 @@ describe('PGliteDigestPersistenceAdapter', () => { await testAdapter.clear(chartReference, ['foo']); expect(testAdapter.readDigest(chartReference, 'foo')).resolves.toBeNull(); }); + + it('should filter digests', async () => { + const testAdapter = await PGliteDigestPersistenceAdapter.connect(); + await testAdapter.record(chartReference, { foo: 'bar' }); + await testAdapter.record(chartReference, { foo: 'baz' }); + + const result = await testAdapter.queryDigests(); + expect(result).toHaveLength(1); + expect(result[0].chartId).toBe(chartReference.chartId); + expect(result[0].machineId).toBe(chartReference.machineId); + expect(result[0].timestamp).toBeDefined(); + }); }); diff --git a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts index 7b683c48..47a95b62 100644 --- a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts +++ b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts @@ -185,10 +185,11 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { (digestQuery?.offset !== undefined ? ' OFFSET $3 ' : '') + (digestQuery?.limit !== undefined ? ' LIMIT $4 ' : ''), [ - digestQuery?.machineId, - digestQuery?.chartId, - digestQuery?.offset, - digestQuery?.limit, + // Only add bindings if they are defined, PGlite does not allow non-used bindings + ...(digestQuery?.machineId ? [digestQuery.machineId] : []), + ...(digestQuery?.chartId ? [digestQuery.chartId] : []), + ...(digestQuery?.offset ? [digestQuery.offset] : []), + ...(digestQuery?.limit ? [digestQuery.limit] : []), // ...filterBindings, // TODO --^ ], From 172457e2e0220377b1a6d23154bc9bbd1bc75b4b Mon Sep 17 00:00:00 2001 From: Juha Mustonen Date: Wed, 30 Jul 2025 10:07:11 +0300 Subject: [PATCH 11/14] Improve note --- packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts index 47a95b62..8cd5e2c2 100644 --- a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts +++ b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts @@ -191,7 +191,7 @@ export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { ...(digestQuery?.offset ? [digestQuery.offset] : []), ...(digestQuery?.limit ? [digestQuery.limit] : []), // ...filterBindings, - // TODO --^ + // TODO: Implement filterBindings ], ); From 1803f1195dd16a683df8e1c9122d9f9cf4de3bc8 Mon Sep 17 00:00:00 2001 From: Juha Mustonen Date: Wed, 30 Jul 2025 10:37:01 +0300 Subject: [PATCH 12/14] Initial testing and support --- .../PGliteJournalPersistenceAdapter.test.ts | 56 +++++++++++++++++++ .../src/PGliteJournalPersistenceAdapter.ts | 36 ++++++++++-- 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.test.ts b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.test.ts index 89124f31..49bc3a22 100644 --- a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.test.ts +++ b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.test.ts @@ -1,3 +1,4 @@ +import { XJogStateChangeAction } from '@samihult/xjog-util'; import { PGliteJournalPersistenceAdapter } from './PGliteJournalPersistenceAdapter'; describe('PGliteJournalPersistenceAdapter', () => { @@ -9,4 +10,59 @@ describe('PGliteJournalPersistenceAdapter', () => { const adapter = await PGliteJournalPersistenceAdapter.connect(); expect(adapter).toBeDefined(); }); + + it('should read entry', async () => { + const adapter = await PGliteJournalPersistenceAdapter.connect(); + const entry = await adapter.readEntry(1); + expect(entry).toBeNull(); + }); + + it('should read full state', async () => { + const adapter = await PGliteJournalPersistenceAdapter.connect(); + const ownerId = 'test'; + const ref = { + machineId: 'test', + chartId: 'test', + }; + const parentRef = null; + const event = { + type: 'test', + }; + const oldState = { + foo: 'bar', + }; + const oldContext = { + foo: 'bar', + }; + const newState = { + foo: 'bar', + }; + const newContext = {}; + const actions: XJogStateChangeAction[] = []; + const cid = 'test'; + + await adapter.record( + ownerId, + ref, + parentRef, + event, + oldState, + oldContext, + newState, + newContext, + actions, + cid, + ); + /* const fullState = await adapter.readFullState(ref); + expect(fullState).toMatchObject({ + ownerId, + ref, + parentRef, + event, + oldState, + oldContext, + newState, + newContext, + }); */ + }); }); diff --git a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts index 68679ada..ef4de55e 100644 --- a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts +++ b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts @@ -1,7 +1,9 @@ -import { ChartReference } from '@samihult/xjog-util'; +import { EventObject, StateValue } from 'xstate'; +import { ChartReference, XJogStateChangeAction } from '@samihult/xjog-util'; import migrationRunner from 'node-pg-migrate'; import path from 'path'; import createSubscriber from 'pg-listen'; +import { Operation } from 'rfc6902'; import { FullStateEntry, @@ -77,7 +79,7 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { noLock: false, }); } finally { - await pool.close(); + // Do not close the pool here, it will be closed by the adapter } return adapter; @@ -402,11 +404,11 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { 'SELECT ' + this.fullStateEntrySqlSelectFields + 'FROM "fullJournalStates" ' + - 'WHERE "machineId" = :machineId AND "chartId" = :chartId ', + 'WHERE "machineId" = $1 AND "chartId" = $2 ', [ref.machineId, ref.chartId], ); - if (!result.affectedRows) { + if (!result.rows.length) { return null; } @@ -536,6 +538,32 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { return Number(result.rows[0].time); } + public async record( + ownerId: string, + ref: ChartReference, + parentRef: ChartReference | null, + event: EventObject | null, + oldState: StateValue | null, + oldContext: any | null, + newState: StateValue | null, + newContext: unknown | null, + actions: XJogStateChangeAction[] | null, + cid?: string, + ): Promise { + // TODO: Figure out what data to pass in here + const entry = await this.insertEntry({ + ref, + event, + stateDelta: oldState + ? [{ op: 'replace', path: '', value: oldState }] + : [], + contextDelta: oldContext + ? [{ op: 'replace', path: '', value: oldContext }] + : [], + actions: actions ?? [], + }); + } + static parseSqlJournalRow(row: PGliteJournalRow): JournalEntry { return { id: Number(row.id), From 386b015b11827f5f88e1d814a5759ad7442d0205 Mon Sep 17 00:00:00 2001 From: Juha Mustonen Date: Wed, 30 Jul 2025 11:35:15 +0300 Subject: [PATCH 13/14] Fix and test full journal entry --- .../PGliteJournalPersistenceAdapter.test.ts | 48 +++++++---- .../src/PGliteJournalPersistenceAdapter.ts | 84 +++++++++---------- 2 files changed, 68 insertions(+), 64 deletions(-) diff --git a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.test.ts b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.test.ts index 49bc3a22..938ddabe 100644 --- a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.test.ts +++ b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.test.ts @@ -19,27 +19,29 @@ describe('PGliteJournalPersistenceAdapter', () => { it('should read full state', async () => { const adapter = await PGliteJournalPersistenceAdapter.connect(); - const ownerId = 'test'; + const ownerId = 'ownerId'; const ref = { - machineId: 'test', - chartId: 'test', + machineId: 'machineId', + chartId: 'chartId', }; const parentRef = null; const event = { - type: 'test', + type: 'event', }; const oldState = { - foo: 'bar', + state: 'old', }; const oldContext = { - foo: 'bar', + ctx: 'old', }; const newState = { - foo: 'bar', + state: 'new', + }; + const newContext = { + ctx: 'new', }; - const newContext = {}; const actions: XJogStateChangeAction[] = []; - const cid = 'test'; + const cid = 'cid'; await adapter.record( ownerId, @@ -53,16 +55,26 @@ describe('PGliteJournalPersistenceAdapter', () => { actions, cid, ); - /* const fullState = await adapter.readFullState(ref); + const fullState = await adapter.readFullState(ref); expect(fullState).toMatchObject({ + id: expect.any(Number), + created: expect.any(Number), + timestamp: expect.any(Number), ownerId, - ref, - parentRef, - event, - oldState, - oldContext, - newState, - newContext, - }); */ + event: { + type: 'event', + }, + state: { + state: 'new', + }, + context: { + ctx: 'new', + }, + actions: [], + ref: { + machineId: 'machineId', + chartId: 'chartId', + }, + }); }); }); diff --git a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts index ef4de55e..ca5090f2 100644 --- a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts +++ b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts @@ -2,8 +2,6 @@ import { EventObject, StateValue } from 'xstate'; import { ChartReference, XJogStateChangeAction } from '@samihult/xjog-util'; import migrationRunner from 'node-pg-migrate'; import path from 'path'; -import createSubscriber from 'pg-listen'; -import { Operation } from 'rfc6902'; import { FullStateEntry, @@ -151,26 +149,26 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { ') ON CONFLICT (' + ' "machineId", "chartId" ' + ') DO UPDATE SET ' + - ' "id" = :id, "timestamp" = to_timestamp(:timestamp::decimal / 1000), ' + - ' "event" = :event, "state" = :state, "context" = :context, ' + - ' "actions" = :actions ' + - 'WHERE "fullJournalStates"."id" < :id ', + ' "id" = $1, "timestamp" = to_timestamp($2::decimal / 1000), ' + + ' "event" = $8, "state" = $9, "context" = $10, ' + + ' "actions" = $11 ' + + 'WHERE "fullJournalStates"."id" < $1 ', [ - entry.id, - entry.timestamp, - entry.ownerId, - entry.ref.machineId, - entry.ref.chartId, - entry.parentRef?.machineId ?? null, - entry.parentRef?.chartId ?? null, - entry.event ? Buffer.from(JSON.stringify(entry.event)) : null, - entry.state ? Buffer.from(JSON.stringify(entry.state)) : null, - entry.context ? Buffer.from(JSON.stringify(entry.context)) : null, - entry.actions ? Buffer.from(JSON.stringify(entry.actions)) : null, + entry.id, // $1 + entry.timestamp, // $2 + entry.ownerId, // $3 + entry.ref.machineId, // $4 + entry.ref.chartId, // $5 + entry.parentRef?.machineId ?? null, // $6 + entry.parentRef?.chartId ?? null, // $7 + entry.event ? Buffer.from(JSON.stringify(entry.event)) : null, // $8 + entry.state ? Buffer.from(JSON.stringify(entry.state)) : null, // $9 + entry.context ? Buffer.from(JSON.stringify(entry.context)) : null, // $10 + entry.actions ? Buffer.from(JSON.stringify(entry.actions)) : null, // $11 ], ); - if (!result.rows.length) { + if (!result.affectedRows) { throw new Error('Failed to write journal full entry'); } @@ -188,7 +186,7 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { }); await this.connection.query( - "SELECT pg_notify('new-journal-entry', $1:text)", + "SELECT pg_notify('new_journal_entry', $1:text)", [payload], ); } @@ -326,14 +324,10 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { > { const startTime = await this.getCurrentTime(); - // TODO: Implement - return () => Promise.resolve(); - let journalEntryIdPointer = 0; let fullStateEntryIdPointer = 0; - const channel = 'new-journal-entry'; - const journalSubscriber = createSubscriber(this.listenerConfig); + const channel = 'new_journal_entry'; const yieldJournalEntries = (journalEntries: JournalEntry[]) => { for (const journalEntry of journalEntries) { @@ -356,7 +350,7 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { }; // Received a notification of a new journal entry - journalSubscriber.notifications.on(channel, async () => { + this.connection.listen(channel, async () => { this.queryEntries({ afterId: journalEntryIdPointer, updatedAfterAndIncluding: startTime, @@ -378,15 +372,8 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { }); }); - journalSubscriber.events.on('error', (error) => { - this.newJournalEntriesSubject.error(error); - this.newFullStateEntriesSubject.error(error); - }); - - journalSubscriber.connect().then(() => journalSubscriber.listenTo(channel)); - return async () => { - await journalSubscriber.close(); + await this.connection.unlisten(channel); }; } @@ -394,6 +381,7 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { private readonly fullStateEntrySqlSelectFields = ' "id", extract(epoch from "created") * 1000 as "created", ' + ' extract(epoch from "timestamp") * 1000 as "timestamp", ' + + ' "ownerId", ' + ' "machineId", "chartId", "parentMachineId", "parentChartId", ' + ' "event", "state", "context", "actions" '; @@ -547,20 +535,22 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { oldContext: any | null, newState: StateValue | null, newContext: unknown | null, - actions: XJogStateChangeAction[] | null, + actions: XJogStateChangeAction[], cid?: string, ): Promise { // TODO: Figure out what data to pass in here - const entry = await this.insertEntry({ + const now = new Date().getTime(); + const entry = await this.updateFullState({ + id: 1, + created: now, + timestamp: now, + ownerId, ref, + parentRef, event, - stateDelta: oldState - ? [{ op: 'replace', path: '', value: oldState }] - : [], - contextDelta: oldContext - ? [{ op: 'replace', path: '', value: oldContext }] - : [], - actions: actions ?? [], + state: newState, + context: newContext, + actions, }); } @@ -586,6 +576,8 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { } static parseSqlFullStateRow(row: PGliteFullStateRow): FullStateEntry { + const decoder = new TextDecoder(); + return { id: Number(row.id), created: Number(row.created), @@ -605,10 +597,10 @@ export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { } : null, - event: JSON.parse(String(row.event)), - state: row.state ? JSON.parse(String(row.state)) : null, - context: row.context ? JSON.parse(String(row.context)) : null, - actions: row.actions ? JSON.parse(String(row.actions)) : null, + event: row.event ? JSON.parse(decoder.decode(row.event)) : null, + state: row.state ? JSON.parse(decoder.decode(row.state)) : null, + context: row.context ? JSON.parse(decoder.decode(row.context)) : null, + actions: row.actions ? JSON.parse(decoder.decode(row.actions)) : null, }; } } From e02146bcb66748f208fdd5c8562b7d5e8489f4f6 Mon Sep 17 00:00:00 2001 From: Juha Mustonen Date: Wed, 30 Jul 2025 11:37:01 +0300 Subject: [PATCH 14/14] Update README --- packages/digest-pglite/README.md | 12 ++---------- packages/journal-pglite/README.md | 12 ++---------- 2 files changed, 4 insertions(+), 20 deletions(-) diff --git a/packages/digest-pglite/README.md b/packages/digest-pglite/README.md index 0b6640c2..b97d783a 100644 --- a/packages/digest-pglite/README.md +++ b/packages/digest-pglite/README.md @@ -1,11 +1,3 @@ -# `digest-pglite` +# XJog PGlite Digest Persistence -> TODO: description - -## Usage - -``` -const digestPglite = require('digest-pglite'); - -// TODO: DEMONSTRATE API -``` +Package provides a PGlite persistence layer for XJog digest module. diff --git a/packages/journal-pglite/README.md b/packages/journal-pglite/README.md index 03b6e8f5..0adc3d67 100644 --- a/packages/journal-pglite/README.md +++ b/packages/journal-pglite/README.md @@ -1,11 +1,3 @@ -# `journal-pglite` +# XJog PGlite Persistence -> TODO: description - -## Usage - -``` -const journalPglite = require('journal-pglite'); - -// TODO: DEMONSTRATE API -``` +Package provides a PGlite persistence layer for XJog journaling.