Skip to content

Commit cb593b2

Browse files
authored
Merge pull request #15 from fireflyframework/develop
release(eda): merge Postgres EDA adapter from develop into main
2 parents 2a76812 + 01a4e6e commit cb593b2

18 files changed

Lines changed: 2233 additions & 54 deletions

README.md

Lines changed: 74 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
[![Java](https://img.shields.io/badge/Java-21%2B-orange.svg)](https://openjdk.org)
66
[![Spring Boot](https://img.shields.io/badge/Spring%20Boot-3.x-green.svg)](https://spring.io/projects/spring-boot)
77

8-
> Unified event-driven architecture library with Kafka, RabbitMQ, and Spring Application Events support.
8+
> Unified event-driven architecture library with Kafka, RabbitMQ, PostgreSQL (LISTEN/NOTIFY + outbox), and Spring Application Events support.
99
1010
---
1111

@@ -23,15 +23,15 @@
2323

2424
## Overview
2525

26-
Firefly Framework EDA provides a standardized messaging abstraction for event-driven architectures, supporting multiple broker implementations through a unified publisher/consumer API. It enables reactive event publishing and consumption with built-in support for Apache Kafka, RabbitMQ, and Spring Application Events as transport mechanisms.
26+
Firefly Framework EDA provides a standardized messaging abstraction for event-driven architectures, supporting multiple broker implementations through a unified publisher/consumer API. It enables reactive event publishing and consumption with built-in support for Apache Kafka, RabbitMQ, PostgreSQL (via `LISTEN`/`NOTIFY` backed by a transactional outbox table), and Spring Application Events as transport mechanisms.
2727

2828
The library features annotation-driven event publishing (`@EventPublisher`, `@PublishResult`), declarative event listeners (`@EventListener`), and a comprehensive set of event filtering, serialization, and error handling capabilities. It includes support for JSON, Avro, and Protobuf message serialization formats.
2929

3030
The resilient publisher wrapper provides circuit breaker integration, while the dead letter queue handler ensures no events are lost during processing failures. Metrics collection and health indicators provide full observability into the messaging infrastructure.
3131

3232
## Features
3333

34-
- Multi-broker support: Apache Kafka, RabbitMQ, Spring Application Events
34+
- Multi-broker support: Apache Kafka, RabbitMQ, PostgreSQL (`LISTEN`/`NOTIFY` + outbox), Spring Application Events
3535
- Annotation-driven publishing: `@EventPublisher`, `@PublishResult`
3636
- Declarative event listening: `@EventListener` with SpEL-based filtering
3737
- Event envelope pattern with metadata propagation
@@ -41,16 +41,17 @@ The resilient publisher wrapper provides circuit breaker integration, while the
4141
- Resilient publisher with circuit breaker support
4242
- Dynamic event listener registration at runtime
4343
- AMQP admin auto-configuration for RabbitMQ exchanges and queues
44+
- Transactional outbox table for PostgreSQL with auto-created schema and trigger
4445
- Custom error handling strategies with metrics and notification handlers
4546
- Health indicators and metrics for Actuator integration
46-
- Spring Boot auto-configuration for Kafka and RabbitMQ
47+
- Spring Boot auto-configuration for Kafka, RabbitMQ, and PostgreSQL
4748

4849
## Requirements
4950

5051
- Java 21+
5152
- Spring Boot 3.x
5253
- Maven 3.9+
53-
- Apache Kafka or RabbitMQ (depending on chosen broker)
54+
- Apache Kafka, RabbitMQ, or PostgreSQL 11+ (depending on chosen transport)
5455

5556
## Installation
5657

@@ -95,16 +96,76 @@ public class OrderEventHandler {
9596
```yaml
9697
firefly:
9798
eda:
98-
broker: kafka # kafka, rabbitmq, spring
99-
kafka:
100-
bootstrap-servers: localhost:9092
101-
consumer:
102-
group-id: my-service
103-
rabbitmq:
104-
host: localhost
105-
port: 5672
99+
enabled: true
100+
default-publisher-type: AUTO # AUTO chooses KAFKA → RABBITMQ → POSTGRES → APPLICATION_EVENT
101+
publishers:
102+
enabled: true
103+
kafka:
104+
default:
105+
enabled: true
106+
bootstrap-servers: localhost:9092
107+
rabbitmq:
108+
default:
109+
enabled: true
110+
host: localhost
111+
port: 5672
112+
postgres:
113+
default:
114+
enabled: true
115+
host: localhost
116+
port: 5432
117+
database: app
118+
username: app
119+
password: secret
120+
schema: public
121+
outbox-table: firefly_eda_outbox
122+
default-destination: events
123+
auto-create-schema: true # provision outbox table + NOTIFY trigger at startup
124+
consumer:
125+
enabled: true
126+
group-id: my-service
127+
kafka:
128+
default:
129+
enabled: true
130+
bootstrap-servers: localhost:9092
131+
rabbitmq:
132+
default:
133+
enabled: true
134+
host: localhost
135+
port: 5672
136+
queues: events-queue
137+
postgres:
138+
default:
139+
enabled: true
140+
host: localhost
141+
port: 5432
142+
database: app
143+
username: app
144+
password: secret
145+
channels: events,order-events # destinations to LISTEN on
146+
polling-interval: 30s # NOTIFY-loss fallback poll cadence
147+
max-attempts: 3 # outbox row moves to DEAD_LETTER after N failures
106148
```
107149
150+
### PostgreSQL transport at a glance
151+
152+
- Each `publish()` performs a single `INSERT` into `firefly_eda_outbox`. A
153+
database trigger fires `pg_notify(channel, id)` for every inserted row.
154+
- The consumer holds a dedicated R2DBC connection that runs `LISTEN <channel>`
155+
for every subscribed destination. Notifications carry only the outbox row
156+
id so payloads can be arbitrarily large.
157+
- On dispatch, the listener pipeline either marks the row `PROCESSED` or
158+
increments `attempts`; once `attempts` reaches `max-attempts`, the row
159+
moves to `DEAD_LETTER` status.
160+
- A periodic poll (`polling-interval`) catches rows that slipped past the
161+
live channel (e.g., consumer offline at insert time, payload too large,
162+
connection reset). Set it to `0s` to disable polling.
163+
- Channel names are derived deterministically from destinations via the
164+
built-in mapper: non-alphanumeric characters become `_`, the result is
165+
lower-cased, prefixed with `firefly_eda_`, and truncated to fit
166+
PostgreSQL's 63-byte identifier limit with a stable hash suffix when
167+
needed.
168+
108169
## Documentation
109170

110171
Additional documentation is available in the [docs/](docs/) directory:

docs/CONFIGURATION.md

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,54 @@ firefly:
179179
| `default-routing-key` | string | `"event"` | Default routing key |
180180
| `properties` | map | `{}` | Additional RabbitMQ connection properties |
181181

182+
### PostgreSQL Publisher
183+
184+
The PostgreSQL publisher uses an outbox table together with `pg_notify` to deliver events. Configuration lives under `firefly.eda.publishers.postgres.<connection-id>` -- never under `spring.r2dbc.*`.
185+
186+
```yaml
187+
firefly:
188+
eda:
189+
publishers:
190+
postgres:
191+
default: # Connection ID
192+
enabled: true
193+
host: "localhost"
194+
port: 5432
195+
database: "app"
196+
username: "app"
197+
password: "secret"
198+
schema: "public"
199+
outbox-table: "firefly_eda_outbox"
200+
default-destination: "events"
201+
auto-create-schema: true
202+
max-pool-size: 10
203+
properties:
204+
statement_timeout: "30000"
205+
```
206+
207+
| Property | Type | Default | Description |
208+
|----------|------|---------|-------------|
209+
| `enabled` | boolean | `false` | Whether this PostgreSQL publisher connection is enabled |
210+
| `host` | string | `"localhost"` | PostgreSQL host |
211+
| `port` | int | `5432` | PostgreSQL port |
212+
| `database` | string | `null` | Database name |
213+
| `username` | string | `null` | Database username |
214+
| `password` | string | `null` | Database password |
215+
| `schema` | string | `"public"` | Schema containing the outbox table |
216+
| `outbox-table` | string | `"firefly_eda_outbox"` | Name of the outbox table |
217+
| `default-destination` | string | `"events"` | Default destination when none is provided to `publish()` |
218+
| `auto-create-schema` | boolean | `true` | Provision the outbox table, index, NOTIFY function and trigger at startup |
219+
| `max-pool-size` | int | `10` | Maximum R2DBC connection pool size used for outbox inserts |
220+
| `properties` | map | `{}` | Additional R2DBC PostgreSQL startup options (e.g., statement timeout) |
221+
222+
When `auto-create-schema: true`, the publisher provisions:
223+
224+
- `firefly_eda_outbox` table with `id`, `destination`, `channel`, `payload (BYTEA)`, `headers (JSONB)`, `status`, `attempts`, `error_message`, and timestamp columns
225+
- An index on `(status, created_at)` filtered to `status = 'PENDING'`
226+
- An index on `(channel, status)`
227+
- A `firefly_eda_notify_event()` trigger function that calls `pg_notify(NEW.channel, NEW.id::text)`
228+
- An `AFTER INSERT` trigger on the outbox table that runs the function
229+
182230
## Consumer Configuration
183231

184232
Consumers are configured under `firefly.eda.consumer`.
@@ -289,6 +337,51 @@ firefly:
289337

290338
**Important**: RabbitMQ queues must be pre-declared and configured here. The `@EventListener` destinations are used for filtering messages after they are consumed from these queues, not for determining which queues to subscribe to.
291339

340+
### PostgreSQL Consumer
341+
342+
The PostgreSQL consumer holds one long-lived R2DBC connection to receive `NOTIFY` messages and creates short-lived connections to drain the outbox table. Configuration lives under `firefly.eda.consumer.postgres.<connection-id>`. Channels are derived from `@EventListener` annotations with `consumerType=POSTGRES` or `consumerType=AUTO`, and additionally from the `channels` property.
343+
344+
```yaml
345+
firefly:
346+
eda:
347+
consumer:
348+
postgres:
349+
default:
350+
enabled: true
351+
host: "localhost"
352+
port: 5432
353+
database: "app"
354+
username: "app"
355+
password: "secret"
356+
schema: "public"
357+
outbox-table: "firefly_eda_outbox"
358+
channels: "events,order-events" # comma-separated destinations to LISTEN on
359+
polling-interval: 30s # NOTIFY-loss fallback poll cadence; set to 0s to disable
360+
max-attempts: 3 # rows beyond this attempt count are marked DEAD_LETTER
361+
batch-size: 50 # max rows fetched per poll cycle
362+
max-pool-size: 5
363+
properties: {}
364+
```
365+
366+
| Property | Type | Default | Description |
367+
|----------|------|---------|-------------|
368+
| `enabled` | boolean | `false` | Whether this PostgreSQL consumer connection is enabled |
369+
| `host` | string | `"localhost"` | PostgreSQL host |
370+
| `port` | int | `5432` | PostgreSQL port |
371+
| `database` | string | `null` | Database name |
372+
| `username` | string | `null` | Database username |
373+
| `password` | string | `null` | Database password |
374+
| `schema` | string | `"public"` | Schema containing the outbox table |
375+
| `outbox-table` | string | `"firefly_eda_outbox"` | Outbox table the consumer reads from |
376+
| `channels` | string | `"events"` | Comma-separated destinations to LISTEN on (combined with any from `@EventListener`) |
377+
| `polling-interval` | duration | `30s` | Fallback poll cadence for rows missed by NOTIFY; `0s` disables polling |
378+
| `max-attempts` | int | `3` | Failure attempts before a row transitions to `DEAD_LETTER` |
379+
| `batch-size` | int | `50` | Maximum rows fetched per poll cycle |
380+
| `max-pool-size` | int | `5` | Connection pool size for outbox queries; LISTEN uses one connection on top of this budget |
381+
| `properties` | map | `{}` | Additional R2DBC PostgreSQL startup options |
382+
383+
**How acknowledgement works**: On successful dispatch, the row is updated to `status='PROCESSED', processed_at=NOW()`. On failure, `attempts` is incremented and `error_message` is recorded; once `attempts >= max-attempts`, the row moves to `status='DEAD_LETTER'`. Use the outbox table directly to inspect, reset, or requeue events.
384+
292385
### Application Event Consumer
293386

294387
```yaml

docs/PUBLISHER_TYPES.md

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ The `PublisherType` enum defines the following supported messaging platforms:
88

99
### AUTO
1010
**Description**: Automatically select the best available publisher
11-
**Selection Priority**: KAFKA → RABBITMQ → APPLICATION_EVENT → NOOP
11+
**Selection Priority**: KAFKA → RABBITMQ → POSTGRES → APPLICATION_EVENT → NOOP
1212
**Use Case**: Let the system choose the optimal publisher based on availability and configuration
1313

1414
```java
@@ -60,6 +60,53 @@ firefly:
6060
default-exchange: events
6161
```
6262
63+
### POSTGRES
64+
**Description**: PostgreSQL `LISTEN`/`NOTIFY` with a transactional outbox table
65+
**Features**: Persistent storage, payload sizes beyond `NOTIFY`'s 8 kB limit, retry counts, dead-letter status, polling fallback for missed notifications
66+
**Best For**: Services that already use PostgreSQL and want reliable event publishing without an additional broker; outbox-style transactional event publishing
67+
**Persistence**: ✅ Yes (outbox table rows)
68+
**Ordering**: ✅ Yes (per destination -- rows are ordered by `created_at` / `id`)
69+
**Cloud Service**: ❌ No (self-hosted; works with managed PostgreSQL services)
70+
71+
```yaml
72+
firefly:
73+
eda:
74+
publishers:
75+
postgres:
76+
default:
77+
enabled: true
78+
host: localhost
79+
port: 5432
80+
database: app
81+
username: app
82+
password: secret
83+
schema: public
84+
outbox-table: firefly_eda_outbox
85+
default-destination: events
86+
auto-create-schema: true # provision outbox table + NOTIFY trigger at startup
87+
consumer:
88+
postgres:
89+
default:
90+
enabled: true
91+
host: localhost
92+
port: 5432
93+
database: app
94+
username: app
95+
password: secret
96+
channels: events,order-events # destinations to LISTEN on
97+
polling-interval: 30s
98+
max-attempts: 3
99+
```
100+
101+
**How it works**: Each `publish()` performs a single `INSERT` into the
102+
configured outbox table. A trigger fires `pg_notify(channel, id)` for the
103+
new row, so any consumer that has issued `LISTEN` on the matching channel
104+
receives the row id and fetches the full payload from the table. The
105+
consumer marks rows as `PROCESSED` after successful dispatch, increments
106+
`attempts` on failure, and transitions to `DEAD_LETTER` once `max-attempts`
107+
is reached. A periodic poll catches rows that slipped past the live
108+
channel (consumer offline, payload too large, etc.).
109+
63110
### APPLICATION_EVENT
64111
**Description**: Spring Application Event Bus (in-memory)
65112
**Features**: Synchronous processing, JVM-local, simple integration
@@ -99,23 +146,25 @@ class ServiceTest {
99146
When using `PublisherType.AUTO`, the system selects publishers in this priority order:
100147

101148
1. **KAFKA** - If Kafka is configured and available
102-
2. **RABBITMQ** - If RabbitMQ is configured and available
103-
3. **APPLICATION_EVENT** - If Spring context is available (always true)
104-
4. **NOOP** - If explicitly enabled for testing
149+
2. **RABBITMQ** - If RabbitMQ is configured and available
150+
3. **POSTGRES** - If a PostgreSQL EDA connection is configured and available
151+
4. **APPLICATION_EVENT** - If Spring context is available (always true)
152+
5. **NOOP** - If explicitly enabled for testing
105153

106154
## Feature Comparison Matrix
107155

108-
| Feature | KAFKA | RABBITMQ | APPLICATION_EVENT | NOOP |
109-
|---------|-------|----------|------------------|------|
110-
| **Throughput** | Very High | High | High | N/A |
111-
| **Persistence** |||||
112-
| **Ordering** |||||
113-
| **Partitioning** |||||
114-
| **Complex Routing** |||||
115-
| **Guaranteed Delivery** |||||
116-
| **Multi-Instance** |||||
117-
| **Cloud Native** |||||
118-
| **Setup Complexity** | Medium | Medium | Low | None |
156+
| Feature | KAFKA | RABBITMQ | POSTGRES | APPLICATION_EVENT | NOOP |
157+
|---------|-------|----------|----------|------------------|------|
158+
| **Throughput** | Very High | High | Medium | High | N/A |
159+
| **Persistence** | ✅ | ✅ | ✅ | ❌ | ❌ |
160+
| **Ordering** | ✅ | ❌ | ✅ | ❌ | ❌ |
161+
| **Partitioning** | ✅ | ❌ | ❌ | ❌ | ❌ |
162+
| **Complex Routing** | ❌ | ✅ | ❌ | ❌ | ❌ |
163+
| **Guaranteed Delivery** | ✅ | ✅ | ✅ | ❌ | ❌ |
164+
| **Multi-Instance** | ✅ | ✅ | ✅ | ❌ | ❌ |
165+
| **Cloud Native** | ✅ | ✅ | ✅ | ❌ | ❌ |
166+
| **Setup Complexity** | Medium | Medium | Low | Low | None |
167+
| **Requires extra broker** | Yes | Yes | No (reuses DB) | No | No |
119168

120169
## Configuration Examples
121170

@@ -182,6 +231,13 @@ public class EventService {
182231
- Fan-out patterns are common
183232
- Priority queues are required
184233

234+
**Use POSTGRES when:**
235+
- The service already uses PostgreSQL and you want event publishing without operating a separate broker
236+
- You need transactional outbox semantics (event publish ties to a database transaction)
237+
- Moderate throughput is sufficient (the table acts as the queue)
238+
- You want persistent, auditable events with built-in retry / dead-letter status
239+
- You can tolerate a small NOTIFY-delivery delay covered by the polling fallback
240+
185241
**Use APPLICATION_EVENT when:**
186242
- Simple internal communication is needed
187243
- Single-instance deployment

pom.xml

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>org.fireflyframework</groupId>
99
<artifactId>fireflyframework-parent</artifactId>
10-
<version>26.04.01</version>
10+
<version>26.05.05</version>
1111
</parent>
1212

1313
<artifactId>fireflyframework-eda</artifactId>
@@ -103,6 +103,20 @@
103103
<artifactId>spring-boot-starter-amqp</artifactId>
104104
</dependency>
105105

106+
<!-- PostgreSQL via R2DBC (LISTEN/NOTIFY + outbox transport) -->
107+
<dependency>
108+
<groupId>io.r2dbc</groupId>
109+
<artifactId>r2dbc-spi</artifactId>
110+
</dependency>
111+
<dependency>
112+
<groupId>io.r2dbc</groupId>
113+
<artifactId>r2dbc-pool</artifactId>
114+
</dependency>
115+
<dependency>
116+
<groupId>org.postgresql</groupId>
117+
<artifactId>r2dbc-postgresql</artifactId>
118+
</dependency>
119+
106120

107121
<!-- Circuit Breaker for Resilience -->
108122
<dependency>
@@ -175,6 +189,11 @@
175189
<artifactId>rabbitmq</artifactId>
176190
<scope>test</scope>
177191
</dependency>
192+
<dependency>
193+
<groupId>org.testcontainers</groupId>
194+
<artifactId>postgresql</artifactId>
195+
<scope>test</scope>
196+
</dependency>
178197

179198
<!-- Spring Boot Test Auto-configuration -->
180199
<dependency>

0 commit comments

Comments
 (0)