diff --git a/packages/core-pglite/package.json b/packages/core-pglite/package.json index 9181e62e..b0d95a0c 100644 --- a/packages/core-pglite/package.json +++ b/packages/core-pglite/package.json @@ -22,7 +22,7 @@ "format": "prettier --write 'src/**/*.ts'", "lint": "eslint src/**/*.ts && prettier --check src", "prePublishOnly": "yarn build", - "test": "node --experimental-vm-modules node_modules/.bin/jest" + "test": "NODE_OPTIONS='--experimental-vm-modules' jest" }, "dependencies": { "@electric-sql/pglite": "^0.3.5", diff --git a/packages/core-pglite/src/PGlitePersistenceAdapter.ts b/packages/core-pglite/src/PGlitePersistenceAdapter.ts index dbf38f1d..fd143ce9 100644 --- a/packages/core-pglite/src/PGlitePersistenceAdapter.ts +++ b/packages/core-pglite/src/PGlitePersistenceAdapter.ts @@ -68,7 +68,7 @@ type PGliteDeferredEventRow = { */ export class PGlitePersistenceAdapter extends PersistenceAdapter { public readonly component = 'persistence'; - public readonly type = 'pg'; + public readonly type = 'pglite'; public constructor( public readonly pool: PGlite, diff --git a/packages/digest-pglite/README.md b/packages/digest-pglite/README.md new file mode 100644 index 00000000..b97d783a --- /dev/null +++ b/packages/digest-pglite/README.md @@ -0,0 +1,3 @@ +# XJog PGlite Digest Persistence + +Package provides a PGlite persistence layer for XJog digest module. diff --git a/packages/digest-pglite/jest.config.js b/packages/digest-pglite/jest.config.js new file mode 100644 index 00000000..22d67952 --- /dev/null +++ b/packages/digest-pglite/jest.config.js @@ -0,0 +1,6 @@ +const baseConfig = require('../jestconfig.base'); + +module.exports = { + ...baseConfig, + rootDir: 'src', +}; diff --git a/packages/digest-pglite/package.json b/packages/digest-pglite/package.json new file mode 100644 index 00000000..c4878513 --- /dev/null +++ b/packages/digest-pglite/package.json @@ -0,0 +1,40 @@ +{ + "name": "@samihult/xjog-digest-pglite", + "version": "0.0.1", + "description": "Digest package with PGlite backend", + "author": "Juha Mustonen ", + "homepage": "", + "license": "MIT", + "main": "lib/digest-pglite.js", + "directories": { + "lib": "lib" + }, + "files": [ + "lib" + ], + "scripts": { + "build": "yarn build:bin ; yarn build:files", + "build:bin": "tsc", + "lint": "eslint src/**/*.ts && prettier --check src", + "build:files": "cpx-fixed 'src/**/*.sql' lib", + "clean": "rm -rf node_modules lib", + "test": "NODE_OPTIONS='--experimental-vm-modules' jest" + }, + "dependencies": { + "@electric-sql/pglite": "^0.3.5" + }, + "devDependencies": { + "cpx-fixed": "^1.6.0", + "@swc/cli": "^0.1.57", + "@swc/core": "^1.2.223", + "@swc/jest": "^0.2.22", + "@types/node": "^16.7.10", + "@types/jest": "^27.0.1", + "jest": "^28.1.2", + "ts-node": "^10.7.0", + "prettier": "^2.3.2", + "typescript": "^4.7.4", + "eslint": "^8.25.0", + "eslint-config-prettier": "^8.5.0" + } +} \ No newline at end of file diff --git a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts new file mode 100644 index 00000000..313eb0c7 --- /dev/null +++ b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.test.ts @@ -0,0 +1,56 @@ +import { ChartReference } from '@samihult/xjog-util'; +import { PGliteDigestPersistenceAdapter } from './PGliteDigestPersistenceAdapter'; + +describe('PGliteDigestPersistenceAdapter', () => { + const chartReference: ChartReference = { + machineId: 'test', + chartId: 'test', + }; + + it('should be defined', () => { + expect(PGliteDigestPersistenceAdapter).toBeDefined(); + }); + + it('should be able to connect to a migrated database', async () => { + const adapter = await PGliteDigestPersistenceAdapter.connect(); + expect(adapter).toBeDefined(); + }); + + it('should read digest', async () => { + const adapter = await PGliteDigestPersistenceAdapter.connect(); + await adapter.record(chartReference, { foo: 'bar' }); + + const result = await adapter.readDigest(chartReference, 'foo'); + expect(result).toMatchObject({ + key: 'foo', + value: 'bar', + ref: chartReference, + created: expect.any(Number), + timestamp: expect.any(Number), + }); + }); + + it('should clear digest', async () => { + const testAdapter = await PGliteDigestPersistenceAdapter.connect(); + + await testAdapter.record(chartReference, { foo: 'bar' }); + expect( + testAdapter.readDigest(chartReference, 'foo'), + ).resolves.toBeDefined(); + + await testAdapter.clear(chartReference, ['foo']); + expect(testAdapter.readDigest(chartReference, 'foo')).resolves.toBeNull(); + }); + + it('should filter digests', async () => { + const testAdapter = await PGliteDigestPersistenceAdapter.connect(); + await testAdapter.record(chartReference, { foo: 'bar' }); + await testAdapter.record(chartReference, { foo: 'baz' }); + + const result = await testAdapter.queryDigests(); + expect(result).toHaveLength(1); + expect(result[0].chartId).toBe(chartReference.chartId); + expect(result[0].machineId).toBe(chartReference.machineId); + expect(result[0].timestamp).toBeDefined(); + }); +}); diff --git a/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts new file mode 100644 index 00000000..8cd5e2c2 --- /dev/null +++ b/packages/digest-pglite/src/PGliteDigestPersistenceAdapter.ts @@ -0,0 +1,423 @@ +import path from 'path'; +import { ChartReference } from '@samihult/xjog-util'; +import { PGlite, PGliteOptions } from '@electric-sql/pglite'; +import migrationRunner from 'node-pg-migrate'; + +import { + DigestPersistenceAdapter, + DigestEntry, + DigestEntries, + DigestQuery, + Expression, + ChartReferenceWithTimestamp, +} from '@samihult/xjog-digest-persistence'; + +import { PGliteDigestRow } from './PGliteDigestRow'; + +/** + * Use the static method `connect` to instantiate. + * @hideconstructor + */ +export class PGliteDigestPersistenceAdapter extends DigestPersistenceAdapter { + public readonly component = 'digest/persistence'; + public readonly type = 'pglite'; + + private readonly stopObservingNewDigestEntries: Promise<() => Promise>; + + public constructor( + private readonly listenerConfig: PGliteOptions, + private readonly pool: PGlite, + ) { + super(); + + this.stopObservingNewDigestEntries = this.startObservingNewDigestEntries(); + } + + /** + * Create a connection to a [PostgreSql](https://www.postgresql.org/) database + * and resolve to a JournalPersistenceAdapter that can be passed to the XJog + * constructor. + */ + static async connect( + poolConfiguration: PGliteOptions = {}, + ): Promise { + const pool = await PGlite.create(poolConfiguration); + const adapter = new PGliteDigestPersistenceAdapter(poolConfiguration, pool); + + try { + await migrationRunner({ + dbClient: pool as any, + migrationsTable: 'migrations_digest', + dir: path.join(__dirname, './migrations'), + direction: 'up', + singleTransaction: true, + log: (message) => adapter.trace({ in: 'connect', message }), + // https://github.com/salsita/node-pg-migrate/issues/821 + checkOrder: false, + noLock: false, + }); + } finally { + // Do not close the pool here, it will be closed by the adapter + } + + return adapter; + } + + public async disconnect(): Promise { + await ( + await this.stopObservingNewDigestEntries + )?.(); + await this.pool.close(); + } + + protected async upsertDigest( + ref: ChartReference, + key: string, + value: string, + ): Promise { + const result = await this.pool.query( + 'INSERT INTO "digests" ' + + '( ' + + ' "machineId", "chartId", "key", "value" ' + + ') VALUES ( ' + + ' $1, $2, $3, $4' + + ') ON CONFLICT ( ' + + ' "machineId", "chartId", "key"' + + ') DO UPDATE SET ' + + ' value = $4, timestamp = transaction_timestamp() ', + [ref.machineId, ref.chartId, key, value], + ); + + return result.affectedRows ?? 0; + } + + protected async emitDigestEntryNotification( + ref: ChartReference, + ): Promise { + const payload = JSON.stringify(ref); + + await this.pool.query("SELECT pg_notify('new_digest_entry', $1::text)", [ + payload, + ]); + } + + public async deleteDigest(ref: ChartReference, key: string): Promise { + const result = await this.pool.query( + 'DELETE FROM "digests" ' + + 'WHERE "machineId" = $1 AND "chartId" = $2 AND "key" = $3 ', + [ref.machineId, ref.chartId, key], + ); + + return result.affectedRows ?? 0; + } + + public async deleteByChart(ref: ChartReference): Promise { + const result = await this.pool.query( + 'DELETE FROM "digests" ' + 'WHERE "machineId" = $1 AND "chartId" = $2 ', + [ref.machineId, ref.chartId], + ); + + return result.affectedRows ?? 0; + } + + /** Corresponds to {@link PostgresDigestRow} */ + private readonly digestEntrySqlSelectFields = + 'extract(epoch from "created") * 1000 AS "created", ' + + 'extract(epoch from "timestamp") * 1000 AS "timestamp", ' + + '"machineId", "chartId", "key", "value" '; + + public async readDigest( + ref: ChartReference, + key: string, + ): Promise { + const result = await this.pool.query( + 'SELECT ' + + this.digestEntrySqlSelectFields + + 'FROM "digests" ' + + 'WHERE "machineId" = $1 AND "chartId" = $2 AND "key" = $3 ', + [ref.machineId, ref.chartId, key], + ); + + if (!result.rows.length) { + return null; + } + + return PGliteDigestPersistenceAdapter.parseSqlDigestRow(result.rows[0]); + } + + public async readByChart(ref: ChartReference): Promise { + const result = await this.pool.query( + 'SELECT ' + + this.digestEntrySqlSelectFields + + 'FROM "digests" ' + + 'WHERE "machineId" = $1 AND "chartId" = $2 ', + [ref.machineId, ref.chartId], + ); + + const digestEntries: DigestEntries = {}; + + for (const row of result.rows) { + digestEntries[row.machineId] = + PGliteDigestPersistenceAdapter.parseSqlDigestRow(row); + } + + return digestEntries; + } + + public async queryDigests( + digestQuery?: DigestQuery, + ): Promise { + const [filterQuery, filterBindings] = + PGliteDigestPersistenceAdapter.filterQuery(digestQuery?.query); + + const result = await this.pool.query( + 'SELECT DISTINCT "machineId", "chartId", ' + + ' MAX(extract(epoch from "timestamp") * 1000) as "timestamp" ' + + 'FROM "digests" WHERE TRUE ' + + (digestQuery?.machineId !== undefined + ? ' AND "machineId" = $1 ' + : '') + + (digestQuery?.chartId !== undefined ? ' AND "chartId" = $2 ' : '') + + (filterQuery ? `AND (${filterQuery}) ` : '') + + 'GROUP BY "machineId", "chartId" ' + + 'ORDER BY "timestamp" ' + + (digestQuery?.order ?? 'ASC') + + (digestQuery?.offset !== undefined ? ' OFFSET $3 ' : '') + + (digestQuery?.limit !== undefined ? ' LIMIT $4 ' : ''), + [ + // Only add bindings if they are defined, PGlite does not allow non-used bindings + ...(digestQuery?.machineId ? [digestQuery.machineId] : []), + ...(digestQuery?.chartId ? [digestQuery.chartId] : []), + ...(digestQuery?.offset ? [digestQuery.offset] : []), + ...(digestQuery?.limit ? [digestQuery.limit] : []), + // ...filterBindings, + // TODO: Implement filterBindings + ], + ); + + return result.rows; + } + + static filterQuery( + expression?: Expression, + prefix = 'q', + ): [string, { [key: string]: string | number }] { + if (!expression) { + return ['', {}]; + } + + let queryString = ''; + const bindings: Record = {}; + + const createBindingKey = ( + op: 'eq' | 're' | 'lt' | 'lte' | 'gt' | 'gte', + key: string, + ) => `${prefix}_${op}_${key}`; + + const keyMatchSql = (key: string, bindingKey: string): string => + key ? `AND "candidate"."key" = :key_${bindingKey} ` : ''; + + const addBindings = ( + bindingKey: string, + key: string, + pattern: string | number, + ) => { + bindings[`key_${bindingKey}`] = key; + bindings[`value_${bindingKey}`] = pattern; + }; + + const matchingSql = (conditionSql: string) => + 'EXISTS ' + + '(SELECT 1 FROM "digests" AS "candidate" ' + + 'WHERE "candidate"."machineId" = "digests"."machineId" ' + + 'AND "candidate"."chartId" = "digests"."chartId" ' + + conditionSql + + ')'; + + switch (expression.op) { + case 'eq': { + const bindingKey = createBindingKey('eq', expression.left); + addBindings(bindingKey, expression.left, expression.right); + queryString += matchingSql( + keyMatchSql(expression.left, bindingKey) + + `AND "candidate"."value" = :value_${bindingKey} `, + ); + break; + } + + case 'matches': { + const bindingKey = createBindingKey('re', expression.left); + addBindings(bindingKey, expression.left, expression.right); + queryString += matchingSql( + keyMatchSql(expression.left, bindingKey) + + `AND "candidate"."value" ~ :value_${bindingKey} `, + ); + break; + } + + // Numeric inequality + + case '<': { + const bindingKey = createBindingKey('lt', expression.left); + addBindings(bindingKey, expression.left, expression.right); + queryString += matchingSql( + keyMatchSql(expression.left, bindingKey) + + `AND "candidate"."value"::decimal < :value_${bindingKey}::decimal `, + ); + break; + } + + case '>': { + const bindingKey = createBindingKey('gt', expression.left); + addBindings(bindingKey, expression.left, expression.right); + queryString += matchingSql( + keyMatchSql(expression.left, bindingKey) + + `AND "candidate"."value"::decimal > :value_${bindingKey}::decimal `, + ); + break; + } + + case '<=': { + const bindingKey = createBindingKey('lte', expression.left); + addBindings(bindingKey, expression.left, expression.right); + queryString += matchingSql( + keyMatchSql(expression.left, bindingKey) + + `AND "candidate"."value"::decimal <= :value_${bindingKey}::decimal `, + ); + break; + } + + case '>=': { + const bindingKey = createBindingKey('gte', expression.left); + addBindings(bindingKey, expression.left, expression.right); + queryString += matchingSql( + keyMatchSql(expression.left, bindingKey) + + `AND "candidate"."value"::decimal >= :value_${bindingKey}::decimal `, + ); + break; + } + + // Timestamps + + case 'created before': { + const bindingKey = `${prefix}_crbef`; + bindings[`value_${bindingKey}`] = expression.dateTime.valueOf(); + queryString += + 'NOT ' + + matchingSql( + `AND "candidate"."created" > to_timestamp(:value_${bindingKey}::decimal / 1000) `, + ); + break; + } + + case 'updated before': { + const bindingKey = `${prefix}_udbef`; + bindings[`value_${bindingKey}`] = expression.dateTime.valueOf(); + queryString += + 'NOT ' + + matchingSql( + `AND "candidate"."timestamp" > to_timestamp(:value_${bindingKey}::decimal / 1000) `, + ); + break; + } + + case 'created after': { + const bindingKey = `${prefix}_craft`; + bindings[`value_${bindingKey}`] = expression.dateTime.valueOf(); + queryString += + 'NOT ' + + matchingSql( + `AND "candidate"."created" < to_timestamp(:value_${bindingKey}::decimal / 1000) `, + ); + break; + } + + case 'updated after': { + const bindingKey = `${prefix}_crbef`; + bindings[`value_${bindingKey}`] = expression.dateTime.valueOf(); + queryString += + 'NOT ' + + matchingSql( + `AND "candidate"."timestamp" < to_timestamp(:value_${bindingKey}::decimal / 1000) `, + ); + break; + } + + // Combinators + + case 'not': { + const [subQueryString, subQueryBindings] = + PGliteDigestPersistenceAdapter.filterQuery( + expression.operand, + prefix + '_not', + ); + queryString += `NOT (${subQueryString}) `; + Object.assign(bindings, subQueryBindings); + break; + } + + case 'and': { + const [leftQueryString, leftQueryBindings] = + PGliteDigestPersistenceAdapter.filterQuery( + expression.left, + prefix + '_and_lt', + ); + const [rightQueryString, rightQueryBindings] = + PGliteDigestPersistenceAdapter.filterQuery( + expression.right, + prefix + '_and_rt', + ); + queryString += `${leftQueryString} AND ${rightQueryString} `; + Object.assign(bindings, leftQueryBindings); + Object.assign(bindings, rightQueryBindings); + break; + } + + case 'or': { + const [leftQueryString, leftQueryBindings] = + PGliteDigestPersistenceAdapter.filterQuery( + expression.left, + prefix + '_or_lt', + ); + const [rightQueryString, rightQueryBindings] = + PGliteDigestPersistenceAdapter.filterQuery( + expression.right, + prefix + '_or_rt', + ); + queryString += `${leftQueryString} OR ${rightQueryString} `; + Object.assign(bindings, leftQueryBindings); + Object.assign(bindings, rightQueryBindings); + break; + } + } + + return [`${queryString}`, bindings]; + } + + private async startObservingNewDigestEntries(): Promise<() => Promise> { + const channel = 'new_digest_entry'; + + this.pool.listen(channel, (payload: string) => { + this.newDigestEntriesSubject.next(JSON.parse(payload) as ChartReference); + }); + + return async () => { + await this.pool.unlisten(channel); + }; + } + + static parseSqlDigestRow(row: PGliteDigestRow): DigestEntry { + return { + created: Number(row.created), + timestamp: Number(row.timestamp), + + ref: { + machineId: row.machineId, + chartId: row.chartId, + }, + + key: row.key, + value: row.value, + }; + } +} diff --git a/packages/digest-pglite/src/PGliteDigestRow.ts b/packages/digest-pglite/src/PGliteDigestRow.ts new file mode 100644 index 00000000..b734bcdd --- /dev/null +++ b/packages/digest-pglite/src/PGliteDigestRow.ts @@ -0,0 +1,11 @@ +/** + * Digest record row directly from the SQL query + */ +export type PGliteDigestRow = { + created: number; + timestamp: number; + machineId: string; + chartId: string; + key: string; + value: string; +}; diff --git a/packages/digest-pglite/src/index.ts b/packages/digest-pglite/src/index.ts new file mode 100644 index 00000000..71fd73c6 --- /dev/null +++ b/packages/digest-pglite/src/index.ts @@ -0,0 +1,2 @@ +export * from './PGliteDigestRow'; +export * from './PGliteDigestPersistenceAdapter'; diff --git a/packages/digest-pglite/src/migrations/001-initial.sql b/packages/digest-pglite/src/migrations/001-initial.sql new file mode 100644 index 00000000..041ea0b2 --- /dev/null +++ b/packages/digest-pglite/src/migrations/001-initial.sql @@ -0,0 +1,21 @@ +-------------------------------------------------------------------------------- +-- Up migration +-------------------------------------------------------------------------------- + +CREATE TABLE "digests" ( + "created" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT transaction_timestamp(), + "timestamp" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT transaction_timestamp(), + + "machineId" TEXT NOT NULL, + "chartId" TEXT NOT NULL, + "key" TEXT NOT NULL, + "value" TEXT NOT NULL, + + PRIMARY KEY ("machineId", "chartId", "key") +); + +-------------------------------------------------------------------------------- +-- Down migration +-------------------------------------------------------------------------------- + +DROP TABLE "digests"; diff --git a/packages/digest-pglite/tsconfig.json b/packages/digest-pglite/tsconfig.json new file mode 100644 index 00000000..cf21a7e1 --- /dev/null +++ b/packages/digest-pglite/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "rootDir": "./src", + "outDir": "./lib" + }, + "include": ["src"], + "exclude": ["node_modules", "lib", "src/**/*.test.*"] +} diff --git a/packages/journal-pglite/README.md b/packages/journal-pglite/README.md new file mode 100644 index 00000000..0adc3d67 --- /dev/null +++ b/packages/journal-pglite/README.md @@ -0,0 +1,3 @@ +# XJog PGlite Persistence + +Package provides a PGlite persistence layer for XJog journaling. diff --git a/packages/journal-pglite/jest.config.js b/packages/journal-pglite/jest.config.js new file mode 100644 index 00000000..22d67952 --- /dev/null +++ b/packages/journal-pglite/jest.config.js @@ -0,0 +1,6 @@ +const baseConfig = require('../jestconfig.base'); + +module.exports = { + ...baseConfig, + rootDir: 'src', +}; diff --git a/packages/journal-pglite/package.json b/packages/journal-pglite/package.json new file mode 100644 index 00000000..62f23a58 --- /dev/null +++ b/packages/journal-pglite/package.json @@ -0,0 +1,32 @@ +{ + "name": "@samihult/xjog-journal-pglite", + "version": "0.0.1", + "description": "> TODO: description", + "author": "Juha Mustonen ", + "homepage": "", + "license": "MIT", + "main": "lib/journal-pglite.js", + "directories": { + "lib": "lib" + }, + "files": [ + "lib" + ], + "scripts": { + "build": "yarn build:bin ; yarn build:files", + "build:bin": "tsc", + "lint": "eslint src/**/*.ts && prettier --check src", + "build:files": "cpx-fixed 'src/**/*.sql' lib", + "clean": "rm -rf node_modules lib", + "test": "NODE_OPTIONS='--experimental-vm-modules' jest" + }, + "dependencies": { + "@electric-sql/pglite": "^0.3.5" + }, + "devDependencies": { + "cpx-fixed": "^1.6.0", + "jest": "^28.1.2", + "ts-node": "^10.7.0", + "typescript": "^4.7.4" + } +} \ No newline at end of file diff --git a/packages/journal-pglite/src/PGliteFullStateRow.ts b/packages/journal-pglite/src/PGliteFullStateRow.ts new file mode 100644 index 00000000..c15b4c8e --- /dev/null +++ b/packages/journal-pglite/src/PGliteFullStateRow.ts @@ -0,0 +1,19 @@ +/** + * Full state entry row directly from the SQL query + */ +export type PGliteFullStateRow = { + id: number; + created: number; + timestamp: number; + + ownerId: string; + machineId: string; + chartId: string; + parentMachineId: string; + parentChartId: string; + + event: Buffer | null; + state: Buffer | null; + context: Buffer | null; + actions: Buffer | null; +}; diff --git a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.test.ts b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.test.ts new file mode 100644 index 00000000..938ddabe --- /dev/null +++ b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.test.ts @@ -0,0 +1,80 @@ +import { XJogStateChangeAction } from '@samihult/xjog-util'; +import { PGliteJournalPersistenceAdapter } from './PGliteJournalPersistenceAdapter'; + +describe('PGliteJournalPersistenceAdapter', () => { + it('should be defined', () => { + expect(PGliteJournalPersistenceAdapter).toBeDefined(); + }); + + it('should be able to connect to a database', async () => { + const adapter = await PGliteJournalPersistenceAdapter.connect(); + expect(adapter).toBeDefined(); + }); + + it('should read entry', async () => { + const adapter = await PGliteJournalPersistenceAdapter.connect(); + const entry = await adapter.readEntry(1); + expect(entry).toBeNull(); + }); + + it('should read full state', async () => { + const adapter = await PGliteJournalPersistenceAdapter.connect(); + const ownerId = 'ownerId'; + const ref = { + machineId: 'machineId', + chartId: 'chartId', + }; + const parentRef = null; + const event = { + type: 'event', + }; + const oldState = { + state: 'old', + }; + const oldContext = { + ctx: 'old', + }; + const newState = { + state: 'new', + }; + const newContext = { + ctx: 'new', + }; + const actions: XJogStateChangeAction[] = []; + const cid = 'cid'; + + await adapter.record( + ownerId, + ref, + parentRef, + event, + oldState, + oldContext, + newState, + newContext, + actions, + cid, + ); + const fullState = await adapter.readFullState(ref); + expect(fullState).toMatchObject({ + id: expect.any(Number), + created: expect.any(Number), + timestamp: expect.any(Number), + ownerId, + event: { + type: 'event', + }, + state: { + state: 'new', + }, + context: { + ctx: 'new', + }, + actions: [], + ref: { + machineId: 'machineId', + chartId: 'chartId', + }, + }); + }); +}); diff --git a/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts new file mode 100644 index 00000000..ca5090f2 --- /dev/null +++ b/packages/journal-pglite/src/PGliteJournalPersistenceAdapter.ts @@ -0,0 +1,606 @@ +import { EventObject, StateValue } from 'xstate'; +import { ChartReference, XJogStateChangeAction } from '@samihult/xjog-util'; +import migrationRunner from 'node-pg-migrate'; +import path from 'path'; + +import { + FullStateEntry, + FullStateQuery, + JournalEntry, + JournalEntryAutoFields, + JournalEntryInsertFields, + JournalPersistenceAdapter, + JournalQuery, +} from '@samihult/xjog-journal-persistence'; + +import { PGlite, PGliteOptions } from '@electric-sql/pglite'; +import { PGliteFullStateRow } from './PGliteFullStateRow'; +import { PGliteJournalRow } from './PGliteJournalRow'; + +/** + * Options for instantiating {@link PGliteJournalPersistenceAdapter}. + */ +export type PGliteJournalPersistenceAdapterOptions = { + keyFrameInterval?: number; +}; + +/** + * Use the static method `connect` to instantiate. + * @hideconstructor + */ +export class PGliteJournalPersistenceAdapter extends JournalPersistenceAdapter { + public readonly component = 'journal/persistence'; + public readonly type = 'pglite'; + + private readonly stopObservingNewJournalEntries: Promise<() => Promise>; + + public constructor( + private readonly listenerConfig: PGliteOptions, + private readonly connection: PGlite, + private options: PGliteJournalPersistenceAdapterOptions, + ) { + super(); + + this.stopObservingNewJournalEntries = + this.startObservingNewJournalEntries(); + } + + /** + * Create a connection to a [PostgreSql](https://www.postgresql.org/) database + * and resolve to a JournalPersistenceAdapter that can be passed to the XJog + * constructor. + */ + static async connect( + poolConfiguration: PGliteOptions = {}, + options: Partial = {}, + ): Promise { + const pool = await PGlite.create(poolConfiguration); + const adapter = new PGliteJournalPersistenceAdapter( + poolConfiguration, + pool, + options, + ); + + // TODO resolve separately + options.keyFrameInterval ??= 100; + + try { + await migrationRunner({ + dbClient: pool as any, + migrationsTable: 'migrations_journal', + dir: path.join(__dirname, './migrations'), + singleTransaction: true, + direction: 'up', + log: (message) => adapter.trace({ in: 'connect', message }), + // https://github.com/salsita/node-pg-migrate/issues/821 + checkOrder: false, + noLock: false, + }); + } finally { + // Do not close the pool here, it will be closed by the adapter + } + + return adapter; + } + + public async disconnect(): Promise { + await ( + await this.stopObservingNewJournalEntries + )(); + + await this.connection.close(); + } + + protected async insertEntry( + entry: JournalEntryInsertFields, + ): Promise { + const result = await this.connection.query<{ + id: number; + timestamp: number; + }>( + 'INSERT INTO "journalEntries" ' + + '(' + + ' "machineId", "chartId", "event", ' + + ' "state", "context", "stateDelta", "contextDelta", ' + + ' "actions" ' + + ') ' + + 'VALUES (' + + ' $1, $2, $3, NULL, NULL, ' + + ' $4, $5, $6 ' + + ') ' + + 'RETURNING ' + + ' "id", extract(epoch from "timestamp") * 1000 as "timestamp" ', + [ + entry.ref.machineId, + entry.ref.chartId, + entry.event ? Buffer.from(JSON.stringify(entry.event)) : null, + Buffer.from(JSON.stringify(entry.stateDelta)), + Buffer.from(JSON.stringify(entry.contextDelta)), + entry.actions ? Buffer.from(JSON.stringify(entry.actions)) : null, + ], + ); + + if (!result.rows.length) { + throw new Error('Failed to write journal entry'); + } + + return { + id: result.rows[0].id, + timestamp: Number(result.rows[0].timestamp), + }; + } + + protected async updateFullState(entry: FullStateEntry): Promise { + const result = await this.connection.query( + 'INSERT INTO "fullJournalStates" ' + + '( ' + + ' "id", "created", "timestamp", ' + + ' "ownerId", "machineId", "chartId", ' + + ' "parentMachineId", "parentChartId", ' + + ' "event", "state", "context", ' + + ' "actions" ' + + ') ' + + 'VALUES (' + + ' $1, to_timestamp($2::decimal / 1000), ' + + ' to_timestamp($2::decimal / 1000), ' + + ' $3, $4, $5, ' + + ' $6, $7, ' + + ' $8, $9, $10, $11 ' + + ') ON CONFLICT (' + + ' "machineId", "chartId" ' + + ') DO UPDATE SET ' + + ' "id" = $1, "timestamp" = to_timestamp($2::decimal / 1000), ' + + ' "event" = $8, "state" = $9, "context" = $10, ' + + ' "actions" = $11 ' + + 'WHERE "fullJournalStates"."id" < $1 ', + [ + entry.id, // $1 + entry.timestamp, // $2 + entry.ownerId, // $3 + entry.ref.machineId, // $4 + entry.ref.chartId, // $5 + entry.parentRef?.machineId ?? null, // $6 + entry.parentRef?.chartId ?? null, // $7 + entry.event ? Buffer.from(JSON.stringify(entry.event)) : null, // $8 + entry.state ? Buffer.from(JSON.stringify(entry.state)) : null, // $9 + entry.context ? Buffer.from(JSON.stringify(entry.context)) : null, // $10 + entry.actions ? Buffer.from(JSON.stringify(entry.actions)) : null, // $11 + ], + ); + + if (!result.affectedRows) { + throw new Error('Failed to write journal full entry'); + } + + return; + } + + protected async emitJournalEntryNotification( + id: number, + ref: ChartReference, + ): Promise { + const payload = JSON.stringify({ + id, + machineId: ref.machineId, + chartId: ref.chartId, + }); + + await this.connection.query( + "SELECT pg_notify('new_journal_entry', $1:text)", + [payload], + ); + } + + /** These SQL fields correspond to {@link PostgresJournalRow} */ + private readonly journalEntrySqlSelectFields = + ' "id", extract(epoch from "timestamp") * 1000 as "timestamp", ' + + ' "machineId", "chartId", "event", ' + + ' "state", "stateDelta", "context", "contextDelta", ' + + ' "actions" '; + + public async readEntry(id: number): Promise { + const result = await this.connection.query( + 'SELECT ' + + this.journalEntrySqlSelectFields + + 'FROM "journalEntries" WHERE "id"=$1', + [id], + ); + + if (!result.rows.length) { + return null; + } + + return PGliteJournalPersistenceAdapter.parseSqlJournalRow(result.rows[0]); + } + + /** + * Including the node-pg helper function + * https://github.com/brianc/node-postgres/blob/1b2bedc9c86b7378288e704252a8e4fafa27aa34/packages/pg/lib/utils.js#L175 + */ + private escapeLiteral(str: string): string { + let hasBackslash = false; + let escaped = "'"; + + if (str == null) { + return "''"; + } + + if (typeof str !== 'string') { + return "''"; + } + + for (let i = 0; i < str.length; i++) { + const c = str[i]; + if (c === "'") { + escaped += c + c; + } else if (c === '\\') { + escaped += c + c; + hasBackslash = true; + } else { + escaped += c; + } + } + + escaped += "'"; + + if (hasBackslash === true) { + escaped = ' E' + escaped; + } + + return escaped; + } + + public async queryEntries(query: JournalQuery): Promise { + let result; + + if (Array.isArray(query)) { + if (!query.length) { + return []; + } + + result = await this.connection.query( + 'SELECT ' + + this.journalEntrySqlSelectFields + + 'FROM "journalEntries" ' + + 'JOIN (VALUES ' + + query + .map( + ({ machineId, chartId }) => + `(${this.escapeLiteral(machineId)}, ` + + `${this.escapeLiteral(chartId)})`, + ) + .join(', ') + + ') ' + + ' AS "queryValues" ("queryMachineId", "queryChartId") ' + + 'ON "machineId" = "queryMachineId" AND "chartId" = "queryChartId" ', + ); + } else { + result = await this.connection.query( + 'SELECT ' + + this.journalEntrySqlSelectFields + + 'FROM "journalEntries" ' + + 'WHERE TRUE ' + + (query.ref !== undefined + ? ' AND "machineId" = $1 AND "chartId" = $2 ' + : '') + + (query.afterId !== undefined ? ' AND "id" > $3::bigint ' : '') + + (query.afterAndIncludingId !== undefined + ? ' AND "id" >= $4::bigint ' + : '') + + (query.beforeId !== undefined ? ' AND "id" < $5::bigint ' : '') + + (query.beforeAndIncludingId !== undefined + ? ' AND "id" <= $6::bigint ' + : '') + + (query.updatedAfterAndIncluding !== undefined + ? ' AND "timestamp" >= to_timestamp($7::decimal / 1000) ' + : '') + + (query.updatedBeforeAndIncluding !== undefined + ? ' AND "timestamp" <= to_timestamp($8::decimal / 1000) ' + : '') + + 'ORDER BY "id" ' + + (query.order ?? 'ASC') + + (query.offset !== undefined ? ' OFFSET $9 ' : '') + + (query.limit !== undefined ? ' LIMIT $10 ' : ''), + [ + query.ref?.machineId, // $1 + query.ref?.chartId, // $2 + query.afterId, // $3 + query.afterAndIncludingId, // $4 + query.beforeId, // $5 + query.beforeAndIncludingId, // $6 + query.updatedAfterAndIncluding, // $7 + query.updatedBeforeAndIncluding, // $8 + query.offset, // $9 + query.limit, // $10 + ], + ); + } + + return result.rows.map(PGliteJournalPersistenceAdapter.parseSqlJournalRow); + } + + private async startObservingNewJournalEntries(): Promise< + () => Promise + > { + const startTime = await this.getCurrentTime(); + + let journalEntryIdPointer = 0; + let fullStateEntryIdPointer = 0; + + const channel = 'new_journal_entry'; + + const yieldJournalEntries = (journalEntries: JournalEntry[]) => { + for (const journalEntry of journalEntries) { + if (journalEntry.id < journalEntryIdPointer) { + return; + } + journalEntryIdPointer = journalEntry.id; + this.newJournalEntriesSubject.next(journalEntry); + } + }; + + const yieldFullStateEntries = (fullStateEntries: FullStateEntry[]) => { + for (const fullStateEntry of fullStateEntries) { + if (fullStateEntry.id < fullStateEntryIdPointer) { + return; + } + fullStateEntryIdPointer = fullStateEntry.id; + this.newFullStateEntriesSubject.next(fullStateEntry); + } + }; + + // Received a notification of a new journal entry + this.connection.listen(channel, async () => { + this.queryEntries({ + afterId: journalEntryIdPointer, + updatedAfterAndIncluding: startTime, + order: 'DESC', + }).then((journalEntries: JournalEntry[]) => { + if (journalEntries.length) { + yieldJournalEntries(journalEntries); + } + }); + + this.queryFullStates({ + afterId: fullStateEntryIdPointer, + updatedAfterAndIncluding: startTime, + order: 'DESC', + }).then((fullStateEntries: FullStateEntry[]) => { + if (fullStateEntries.length) { + yieldFullStateEntries(fullStateEntries); + } + }); + }); + + return async () => { + await this.connection.unlisten(channel); + }; + } + + /** These SQL fields correspond to {@link PostgresFullStateRow} */ + private readonly fullStateEntrySqlSelectFields = + ' "id", extract(epoch from "created") * 1000 as "created", ' + + ' extract(epoch from "timestamp") * 1000 as "timestamp", ' + + ' "ownerId", ' + + ' "machineId", "chartId", "parentMachineId", "parentChartId", ' + + ' "event", "state", "context", "actions" '; + + public async readFullState( + ref: ChartReference, + ): Promise { + const result = await this.connection.query( + 'SELECT ' + + this.fullStateEntrySqlSelectFields + + 'FROM "fullJournalStates" ' + + 'WHERE "machineId" = $1 AND "chartId" = $2 ', + [ref.machineId, ref.chartId], + ); + + if (!result.rows.length) { + return null; + } + + return PGliteJournalPersistenceAdapter.parseSqlFullStateRow(result.rows[0]); + } + + public async queryFullStates( + query: FullStateQuery, + ): Promise { + let result; + + if (Array.isArray(query)) { + if (!query.length) { + return []; + } + + result = await this.connection.query( + 'SELECT ' + + this.fullStateEntrySqlSelectFields + + 'FROM "fullJournalStates" ' + + 'JOIN (VALUES ' + + query + .map( + ({ machineId, chartId }) => + `(${this.escapeLiteral(machineId)}, ` + + `${this.escapeLiteral(chartId)})`, + ) + .join(', ') + + ') ' + + ' AS "queryValues" ("queryMachineId", "queryChartId") ' + + 'ON "machineId" = "queryMachineId" AND "chartId" = "queryChartId" ', + ); + } else { + result = await this.connection.query( + 'SELECT ' + + this.fullStateEntrySqlSelectFields + + 'FROM "fullJournalStates" ' + + 'WHERE TRUE ' + + (query.ref !== undefined && query.machineId === undefined + ? ' AND "machineId" = $1 AND "chartId" = $2 ' + : '') + + (query.parentRef !== undefined + ? ' AND "parentMachineId" = $3 AND "parentChartId" = $4 ' + : '') + + // In case of both machineId and ref, ref takes precedence + (query.machineId !== undefined && query.ref === undefined + ? ' AND "machineId" = $5 ' + : '') + + (query.afterId !== undefined ? ' AND "id" > $6::bigint ' : '') + + (query.afterAndIncludingId !== undefined + ? ' AND "id" >= $7::bigint ' + : '') + + (query.beforeId !== undefined ? ' AND "id" < $8::bigint ' : '') + + (query.beforeAndIncludingId !== undefined + ? ' AND "id" <= $9::bigint ' + : '') + + (query.createdAfterAndIncluding !== undefined + ? ' AND "created" >= to_timestamp($10::decimal / 1000) ' + : '') + + (query.createdBeforeAndIncluding !== undefined + ? ' AND "created" <= to_timestamp($11::decimal / 1000) ' + : '') + + (query.updatedAfterAndIncluding !== undefined + ? ' AND "timestamp" >= to_timestamp($12::decimal / 1000) ' + : '') + + (query.updatedBeforeAndIncluding !== undefined + ? ' AND "timestamp" <= to_timestamp($13::decimal / 1000) ' + : '') + + 'ORDER BY "id" ' + + (query.order ?? 'ASC') + + (query.offset !== undefined ? ' OFFSET $13 ' : '') + + (query.limit !== undefined ? ' LIMIT $14 ' : ''), + [ + query.ref?.machineId ?? query.machineId, // $1 + query.ref?.chartId, // $2 + query.parentRef?.machineId, // $3 + query.parentRef?.chartId, // $4 + query.afterId, // $5 + query.afterAndIncludingId, // $6 + query.beforeId, // $7 + query.beforeAndIncludingId, // $8 + query.createdAfterAndIncluding, // $9 + query.createdBeforeAndIncluding, // $10 + query.updatedAfterAndIncluding, // $11 + query.updatedBeforeAndIncluding, // $12 + query.offset, // $13 + query.limit, // $14 + ], + ); + } + + return result.rows.map( + PGliteJournalPersistenceAdapter.parseSqlFullStateRow, + ); + } + + /** + * @returns Number of deleted records + */ + public async deleteByChart(ref: ChartReference): Promise { + const fullStateResult = await this.connection.query( + 'DELETE FROM "fullJournalStates" ' + + 'WHERE "machineId"=$1 AND "chartId"=$2', + [ref.machineId, ref.chartId], + ); + + const journalEntryResult = await this.connection.query( + 'DELETE FROM "journalEntries" ' + 'WHERE "machineId"=$1 AND "chartId"=$2', + [ref.machineId, ref.chartId], + ); + + return ( + (fullStateResult.affectedRows ?? 0) + + (journalEntryResult.affectedRows ?? 0) + ); + } + + public async getCurrentTime(): Promise { + const result = await this.connection.query<{ time: number }>( + 'SELECT extract(epoch from transaction_timestamp()) * 1000 AS "time"', + ); + + if (!result.rows.length) { + throw new Error('Failed to read current time from database'); + } + + return Number(result.rows[0].time); + } + + public async record( + ownerId: string, + ref: ChartReference, + parentRef: ChartReference | null, + event: EventObject | null, + oldState: StateValue | null, + oldContext: any | null, + newState: StateValue | null, + newContext: unknown | null, + actions: XJogStateChangeAction[], + cid?: string, + ): Promise { + // TODO: Figure out what data to pass in here + const now = new Date().getTime(); + const entry = await this.updateFullState({ + id: 1, + created: now, + timestamp: now, + ownerId, + ref, + parentRef, + event, + state: newState, + context: newContext, + actions, + }); + } + + static parseSqlJournalRow(row: PGliteJournalRow): JournalEntry { + return { + id: Number(row.id), + timestamp: Number(row.timestamp), + + ref: { + machineId: row.machineId, + chartId: row.chartId, + }, + + event: JSON.parse(String(row.event)), + + state: row.state ? JSON.parse(String(row.state)) : null, + context: row.context ? JSON.parse(String(row.context)) : null, + + stateDelta: JSON.parse(String(row.stateDelta)), + contextDelta: JSON.parse(String(row.contextDelta)), + actions: row.actions ? JSON.parse(String(row.actions)) : null, + }; + } + + static parseSqlFullStateRow(row: PGliteFullStateRow): FullStateEntry { + const decoder = new TextDecoder(); + + return { + id: Number(row.id), + created: Number(row.created), + timestamp: Number(row.timestamp), + + ownerId: row.ownerId, + + ref: { + machineId: row.machineId, + chartId: row.chartId, + }, + + parentRef: row.parentChartId + ? { + machineId: row.parentMachineId, + chartId: row.parentChartId, + } + : null, + + event: row.event ? JSON.parse(decoder.decode(row.event)) : null, + state: row.state ? JSON.parse(decoder.decode(row.state)) : null, + context: row.context ? JSON.parse(decoder.decode(row.context)) : null, + actions: row.actions ? JSON.parse(decoder.decode(row.actions)) : null, + }; + } +} diff --git a/packages/journal-pglite/src/PGliteJournalRow.ts b/packages/journal-pglite/src/PGliteJournalRow.ts new file mode 100644 index 00000000..4934558a --- /dev/null +++ b/packages/journal-pglite/src/PGliteJournalRow.ts @@ -0,0 +1,15 @@ +/** + * Journal entry row directly from the SQL query + */ +export type PGliteJournalRow = { + id: number; + timestamp: number; + machineId: string; + chartId: string; + event: Buffer | null; + state: Buffer | null; + stateDelta: Buffer; + context: Buffer | null; + contextDelta: Buffer; + actions: Buffer | null; +}; diff --git a/packages/journal-pglite/src/index.ts b/packages/journal-pglite/src/index.ts new file mode 100644 index 00000000..b726c6f8 --- /dev/null +++ b/packages/journal-pglite/src/index.ts @@ -0,0 +1,3 @@ +export * from './PGliteJournalRow'; +export * from './PGliteFullStateRow'; +export * from './PGliteJournalPersistenceAdapter'; diff --git a/packages/journal-pglite/src/migrations/001-journal-entries.sql b/packages/journal-pglite/src/migrations/001-journal-entries.sql new file mode 100644 index 00000000..3c10111b --- /dev/null +++ b/packages/journal-pglite/src/migrations/001-journal-entries.sql @@ -0,0 +1,32 @@ +-------------------------------------------------------------------------------- +-- Up migration +-------------------------------------------------------------------------------- + +CREATE TABLE "journalEntries" ( + "id" SERIAL PRIMARY KEY, + "timestamp" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT transaction_timestamp(), + + "machineId" TEXT NOT NULL, + "chartId" TEXT NOT NULL, + + -- Event that caused this transition, as serialized JSON + "event" BYTEA, + -- Full state as serialized JSON, but only mandatory for the first entry + "state" BYTEA DEFAULT NULL, + -- Context as serialized JSON,but only mandatory for the first entry + "context" BYTEA DEFAULT NULL, + + -- Change set between this and previous entry, can be used for time travel + "stateDelta" BYTEA NOT NULL, + -- Change set between this and previous entry, can be used for time travel + "contextDelta" BYTEA NOT NULL, + + -- Actions triggered by the transition + "actions" BYTEA DEFAULT NULL +); + +-------------------------------------------------------------------------------- +-- Down migration +-------------------------------------------------------------------------------- + +DROP TABLE "journalEntries"; diff --git a/packages/journal-pglite/src/migrations/002-journal-chart-index.sql b/packages/journal-pglite/src/migrations/002-journal-chart-index.sql new file mode 100644 index 00000000..e132c6ae --- /dev/null +++ b/packages/journal-pglite/src/migrations/002-journal-chart-index.sql @@ -0,0 +1,13 @@ +-------------------------------------------------------------------------------- +-- Up migration +-------------------------------------------------------------------------------- + +CREATE INDEX "journalChartIndex" ON "journalEntries" ("machineId", "chartId"); + + +-------------------------------------------------------------------------------- +-- Down migration +-------------------------------------------------------------------------------- + +DROP INDEX "journalChartIndex"; + diff --git a/packages/journal-pglite/src/migrations/003-full-journal-states.sql b/packages/journal-pglite/src/migrations/003-full-journal-states.sql new file mode 100644 index 00000000..399aac1d --- /dev/null +++ b/packages/journal-pglite/src/migrations/003-full-journal-states.sql @@ -0,0 +1,36 @@ +-------------------------------------------------------------------------------- +-- Up migration +-------------------------------------------------------------------------------- + + +CREATE TABLE "fullJournalStates" ( + "id" BIGINT, + "created" TIMESTAMP WITH TIME ZONE NOT NULL, + "timestamp" TIMESTAMP WITH TIME ZONE NOT NULL, + + "ownerId" TEXT, + "machineId" TEXT NOT NULL, + "chartId" TEXT NOT NULL, + "parentMachineId" TEXT, + "parentChartId" TEXT, + + -- Event that caused this transition, as serialized JSON + "event" BYTEA DEFAULT NULL, + + -- Full state as serialized JSON, but only mandatory for the first entry + "state" BYTEA DEFAULT NULL, + -- Context as serialized JSON,but only mandatory for the first entry + "context" BYTEA DEFAULT NULL, + + -- Actions triggered by the transition + "actions" BYTEA DEFAULT NULL, + + PRIMARY KEY("machineId", "chartId") +); + +-------------------------------------------------------------------------------- +-- Down migration +-------------------------------------------------------------------------------- + +DROP TABLE "fullJournalStates"; + diff --git a/packages/journal-pglite/src/migrations/004-full-journal-chart-parent-index.sql b/packages/journal-pglite/src/migrations/004-full-journal-chart-parent-index.sql new file mode 100644 index 00000000..99c559df --- /dev/null +++ b/packages/journal-pglite/src/migrations/004-full-journal-chart-parent-index.sql @@ -0,0 +1,13 @@ +-------------------------------------------------------------------------------- +-- Up migration +-------------------------------------------------------------------------------- + +CREATE INDEX "fullJournalChartParentIndex" + ON "fullJournalStates" ("parentMachineId", "parentChartId") + WHERE "parentChartId" IS NOT NULL; + +-------------------------------------------------------------------------------- +-- Down migration +-------------------------------------------------------------------------------- + +DROP INDEX "fullJournalChartParentIndex"; diff --git a/packages/journal-pglite/tsconfig.json b/packages/journal-pglite/tsconfig.json new file mode 100644 index 00000000..cf21a7e1 --- /dev/null +++ b/packages/journal-pglite/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "rootDir": "./src", + "outDir": "./lib" + }, + "include": ["src"], + "exclude": ["node_modules", "lib", "src/**/*.test.*"] +}