diff --git a/pom.xml b/pom.xml index 96be046..40718bf 100644 --- a/pom.xml +++ b/pom.xml @@ -19,6 +19,13 @@ A comprehensive Spring Boot library that provides Event Sourcing pattern implementation with reactive programming support, featuring event stores, aggregates, snapshots, and integration with EDA messaging. + + + org.fireflyframework + fireflyframework-kernel + ${project.version} + + org.springframework.boot diff --git a/src/main/java/org/fireflyframework/eventsourcing/aggregate/EventHandlerException.java b/src/main/java/org/fireflyframework/eventsourcing/aggregate/EventHandlerException.java index 4784bcb..0d5ec40 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/aggregate/EventHandlerException.java +++ b/src/main/java/org/fireflyframework/eventsourcing/aggregate/EventHandlerException.java @@ -16,6 +16,8 @@ package org.fireflyframework.eventsourcing.aggregate; +import org.fireflyframework.kernel.exception.FireflyException; + /** * Exception thrown when there's an issue with event handler methods in aggregates. *

@@ -25,7 +27,7 @@ * - Event handler method has invalid signature * - Reflection issues when accessing event handlers */ -public class EventHandlerException extends RuntimeException { +public class EventHandlerException extends FireflyException { public EventHandlerException(String message) { super(message); diff --git a/src/main/java/org/fireflyframework/eventsourcing/config/EventSourcingAutoConfiguration.java b/src/main/java/org/fireflyframework/eventsourcing/config/EventSourcingAutoConfiguration.java index 9002911..cb0aa74 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/config/EventSourcingAutoConfiguration.java +++ b/src/main/java/org/fireflyframework/eventsourcing/config/EventSourcingAutoConfiguration.java @@ -17,20 +17,29 @@ package org.fireflyframework.eventsourcing.config; import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.core.instrument.MeterRegistry; import org.fireflyframework.eda.publisher.EventPublisherFactory; +import org.fireflyframework.eventsourcing.monitoring.EventStoreMetrics; +import org.fireflyframework.eventsourcing.outbox.EventOutboxProcessor; +import org.fireflyframework.eventsourcing.outbox.EventOutboxRepository; +import org.fireflyframework.eventsourcing.outbox.EventOutboxService; import org.fireflyframework.eventsourcing.publisher.EventSourcingPublisher; +import org.fireflyframework.eventsourcing.transaction.EventSourcingTransactionalAspect; +import org.fireflyframework.eventsourcing.upcasting.EventUpcaster; +import org.fireflyframework.eventsourcing.upcasting.EventUpcastingService; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.FilterType; -import org.springframework.context.annotation.Import; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.transaction.ReactiveTransactionManager; + +import java.util.List; /** * Auto-configuration for the Event Sourcing library. @@ -59,23 +68,8 @@ @AutoConfiguration @ConditionalOnProperty(prefix = "firefly.eventsourcing", name = "enabled", havingValue = "true", matchIfMissing = true) @EnableConfigurationProperties(EventSourcingProperties.class) -@ComponentScan( - basePackages = "org.fireflyframework.eventsourcing", - excludeFilters = @ComponentScan.Filter( - type = FilterType.REGEX, - pattern = "com\\.firefly\\.common\\.eventsourcing\\.examples\\..*" - ) -) @EnableAsync @EnableScheduling -@Import({ - EventStoreAutoConfiguration.class, - SnapshotAutoConfiguration.class, - EventSourcingHealthConfiguration.class, - EventSourcingMetricsConfiguration.class, - org.fireflyframework.core.config.R2dbcConfig.class, - org.fireflyframework.core.config.R2dbcTransactionConfig.class -}) @Slf4j public class EventSourcingAutoConfiguration { @@ -111,4 +105,76 @@ public ObjectMapper eventSourcingObjectMapper() { mapper.findAndRegisterModules(); return mapper; } + + /** + * Creates the EventStoreMetrics bean for monitoring event store operations. + */ + @Bean + @ConditionalOnMissingBean + public EventStoreMetrics eventStoreMetrics(MeterRegistry meterRegistry) { + log.debug("Creating EventStoreMetrics bean"); + return new EventStoreMetrics(meterRegistry); + } + + /** + * Creates the EventTypeRegistry bean for automatic event type discovery and registration. + */ + @Bean + @ConditionalOnMissingBean + public EventTypeRegistry eventTypeRegistry(ApplicationContext applicationContext, ObjectMapper objectMapper) { + log.debug("Creating EventTypeRegistry bean"); + return new EventTypeRegistry(applicationContext, objectMapper); + } + + /** + * Creates the EventSourcingTransactionalAspect bean for transactional event sourcing operations. + */ + @Bean + @ConditionalOnMissingBean + @ConditionalOnBean(ReactiveTransactionManager.class) + public EventSourcingTransactionalAspect eventSourcingTransactionalAspect( + ReactiveTransactionManager transactionManager, + EventSourcingPublisher eventPublisher) { + log.debug("Creating EventSourcingTransactionalAspect bean"); + return new EventSourcingTransactionalAspect(transactionManager, eventPublisher); + } + + /** + * Creates the EventUpcastingService bean for managing event upcasting. + */ + @Bean + @ConditionalOnMissingBean + public EventUpcastingService eventUpcastingService(List upcasters) { + log.debug("Creating EventUpcastingService bean"); + return new EventUpcastingService(upcasters); + } + + /** + * Creates the EventOutboxService bean for managing Event Outbox operations. + */ + @Bean + @ConditionalOnMissingBean + public EventOutboxService eventOutboxService( + EventOutboxRepository outboxRepository, + EventSourcingPublisher eventPublisher, + ObjectMapper objectMapper) { + log.debug("Creating EventOutboxService bean"); + return new EventOutboxService(outboxRepository, eventPublisher, objectMapper); + } + + /** + * Creates the EventOutboxProcessor bean for background processing of outbox entries. + */ + @Bean + @ConditionalOnMissingBean + @ConditionalOnProperty( + prefix = "firefly.eventsourcing.outbox.processor", + name = "enabled", + havingValue = "true", + matchIfMissing = false + ) + public EventOutboxProcessor eventOutboxProcessor(EventOutboxService outboxService) { + log.debug("Creating EventOutboxProcessor bean"); + return new EventOutboxProcessor(outboxService); + } } \ No newline at end of file diff --git a/src/main/java/org/fireflyframework/eventsourcing/config/EventSourcingHealthConfiguration.java b/src/main/java/org/fireflyframework/eventsourcing/config/EventSourcingHealthConfiguration.java index 86fd2bd..a05354e 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/config/EventSourcingHealthConfiguration.java +++ b/src/main/java/org/fireflyframework/eventsourcing/config/EventSourcingHealthConfiguration.java @@ -17,15 +17,28 @@ package org.fireflyframework.eventsourcing.config; import lombok.extern.slf4j.Slf4j; +import org.fireflyframework.eventsourcing.health.EventStoreHealthIndicator; +import org.fireflyframework.eventsourcing.health.OutboxHealthIndicator; +import org.fireflyframework.eventsourcing.health.ProjectionHealthIndicator; +import org.fireflyframework.eventsourcing.health.SnapshotStoreHealthIndicator; +import org.fireflyframework.eventsourcing.outbox.EventOutboxService; +import org.fireflyframework.eventsourcing.projection.ProjectionService; +import org.fireflyframework.eventsourcing.snapshot.SnapshotStore; +import org.fireflyframework.eventsourcing.store.EventStore; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.List; + /** * Auto-configuration for event sourcing health indicators. *

- * This configuration sets up health checks for event stores, - * snapshot stores, and other event sourcing components. + * Registers Spring Boot Actuator health checks for the event store, + * outbox, snapshot store, and projections when Actuator is on the classpath. */ @Configuration @ConditionalOnClass(name = "org.springframework.boot.actuator.health.HealthIndicator") @@ -37,7 +50,39 @@ public EventSourcingHealthConfiguration() { log.debug("Event Sourcing Health Configuration initialized"); } - // TODO: Add health indicator bean configurations - // @Bean - // public EventStoreHealthIndicator eventStoreHealthIndicator(EventStore eventStore) { ... } -} \ No newline at end of file + @Bean + @ConditionalOnBean(EventStore.class) + @ConditionalOnMissingBean(EventStoreHealthIndicator.class) + public EventStoreHealthIndicator eventStoreHealthIndicator(EventStore eventStore) { + log.debug("Creating EventStoreHealthIndicator bean"); + return new EventStoreHealthIndicator(eventStore); + } + + @Bean + @ConditionalOnBean(EventOutboxService.class) + @ConditionalOnMissingBean(OutboxHealthIndicator.class) + public OutboxHealthIndicator outboxHealthIndicator(EventOutboxService outboxService) { + log.debug("Creating OutboxHealthIndicator bean"); + return new OutboxHealthIndicator(outboxService); + } + + @Bean + @ConditionalOnBean(SnapshotStore.class) + @ConditionalOnMissingBean(SnapshotStoreHealthIndicator.class) + public SnapshotStoreHealthIndicator snapshotStoreHealthIndicator(SnapshotStore snapshotStore) { + log.debug("Creating SnapshotStoreHealthIndicator bean"); + return new SnapshotStoreHealthIndicator(snapshotStore); + } + + @Bean + @ConditionalOnMissingBean(ProjectionHealthIndicator.class) + public ProjectionHealthIndicator projectionHealthIndicator(List> projectionServices, + EventSourcingProjectionProperties projectionProperties) { + log.debug("Creating ProjectionHealthIndicator bean with {} projections", projectionServices.size()); + return new ProjectionHealthIndicator( + projectionServices, + projectionProperties.getHealthCheck().getTimeout(), + projectionProperties.getHealthCheck().getMaxAcceptableLag() + ); + } +} diff --git a/src/main/java/org/fireflyframework/eventsourcing/config/EventSourcingMetricsConfiguration.java b/src/main/java/org/fireflyframework/eventsourcing/config/EventSourcingMetricsConfiguration.java index 3e66c77..8e14fb9 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/config/EventSourcingMetricsConfiguration.java +++ b/src/main/java/org/fireflyframework/eventsourcing/config/EventSourcingMetricsConfiguration.java @@ -16,16 +16,26 @@ package org.fireflyframework.eventsourcing.config; +import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; +import org.fireflyframework.eventsourcing.monitoring.EventStoreMetrics; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Auto-configuration for event sourcing metrics collection. *

- * This configuration sets up Micrometer metrics for event sourcing - * operations like event appends, reads, snapshot operations, etc. + * Wires {@link EventStoreMetrics} to the Micrometer {@link MeterRegistry} when + * Micrometer is on the classpath and metrics are enabled. + * + *

Note: The {@link EventStoreMetrics} bean is also defined in + * {@link EventSourcingAutoConfiguration}. This configuration class ensures + * the metrics bean is available even when loaded through component scanning + * independently of the main auto-configuration.

*/ @Configuration @ConditionalOnClass(name = "io.micrometer.core.instrument.MeterRegistry") @@ -37,7 +47,11 @@ public EventSourcingMetricsConfiguration() { log.debug("Event Sourcing Metrics Configuration initialized"); } - // TODO: Add metrics bean configurations - // @Bean - // public EventStoreMetrics eventStoreMetrics(MeterRegistry meterRegistry) { ... } -} \ No newline at end of file + @Bean + @ConditionalOnBean(MeterRegistry.class) + @ConditionalOnMissingBean + public EventStoreMetrics eventStoreMetrics(MeterRegistry meterRegistry) { + log.info("Creating EventStoreMetrics bean via metrics configuration"); + return new EventStoreMetrics(meterRegistry); + } +} diff --git a/src/main/java/org/fireflyframework/eventsourcing/config/EventTypeRegistry.java b/src/main/java/org/fireflyframework/eventsourcing/config/EventTypeRegistry.java index c7fb4f2..523420a 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/config/EventTypeRegistry.java +++ b/src/main/java/org/fireflyframework/eventsourcing/config/EventTypeRegistry.java @@ -25,7 +25,6 @@ import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationContext; import org.springframework.context.event.EventListener; -import org.springframework.stereotype.Component; import java.util.Map; import java.util.Set; @@ -47,7 +46,6 @@ *

* The event will be automatically discovered and registered when the application starts. */ -@Component @RequiredArgsConstructor @Slf4j public class EventTypeRegistry { diff --git a/src/main/java/org/fireflyframework/eventsourcing/config/SnapshotAutoConfiguration.java b/src/main/java/org/fireflyframework/eventsourcing/config/SnapshotAutoConfiguration.java index a4fad11..28e938a 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/config/SnapshotAutoConfiguration.java +++ b/src/main/java/org/fireflyframework/eventsourcing/config/SnapshotAutoConfiguration.java @@ -16,15 +16,22 @@ package org.fireflyframework.eventsourcing.config; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; +import org.fireflyframework.eventsourcing.snapshot.SnapshotStore; +import org.fireflyframework.eventsourcing.snapshot.SnapshotTrigger; +import org.fireflyframework.eventsourcing.snapshot.r2dbc.R2dbcSnapshotStore; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.r2dbc.core.DatabaseClient; /** * Auto-configuration for snapshot stores. *

- * This configuration class sets up snapshot store implementations and - * related components like caching and cleanup schedulers. + * This configuration class sets up the R2DBC-backed snapshot store and a + * configurable snapshot trigger that creates snapshots after every N events. */ @Configuration @ConditionalOnProperty(prefix = "firefly.eventsourcing.snapshot", name = "enabled", havingValue = "true", matchIfMissing = true) @@ -35,8 +42,18 @@ public SnapshotAutoConfiguration() { log.debug("Snapshot Auto-Configuration initialized"); } - // TODO: Add snapshot store bean configurations - // @Bean - // @ConditionalOnMissingBean - // public SnapshotStore snapshotStore(...) { ... } -} \ No newline at end of file + @Bean + @ConditionalOnMissingBean + public SnapshotStore snapshotStore(DatabaseClient databaseClient, ObjectMapper objectMapper) { + log.info("Creating R2dbcSnapshotStore bean"); + return new R2dbcSnapshotStore(databaseClient, objectMapper); + } + + @Bean + @ConditionalOnMissingBean + public SnapshotTrigger snapshotTrigger(SnapshotStore snapshotStore, EventSourcingProperties properties) { + int frequency = properties.getSnapshot().getThreshold(); + log.info("Creating SnapshotTrigger bean with frequency: {} events", frequency); + return new SnapshotTrigger(snapshotStore, frequency); + } +} diff --git a/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/AccountClosedException.java b/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/AccountClosedException.java index b1fee14..b708e91 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/AccountClosedException.java +++ b/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/AccountClosedException.java @@ -16,10 +16,12 @@ package org.fireflyframework.eventsourcing.examples.ledger.exceptions; +import org.fireflyframework.kernel.exception.FireflyException; + /** * Exception thrown when attempting to perform operations on a closed account. */ -public class AccountClosedException extends RuntimeException { +public class AccountClosedException extends FireflyException { public AccountClosedException(String message) { super(message); diff --git a/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/AccountFrozenException.java b/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/AccountFrozenException.java index 2f42172..0e4e69c 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/AccountFrozenException.java +++ b/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/AccountFrozenException.java @@ -16,10 +16,12 @@ package org.fireflyframework.eventsourcing.examples.ledger.exceptions; +import org.fireflyframework.kernel.exception.FireflyException; + /** * Exception thrown when attempting to withdraw from a frozen account. */ -public class AccountFrozenException extends RuntimeException { +public class AccountFrozenException extends FireflyException { public AccountFrozenException(String message) { super(message); diff --git a/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/InsufficientFundsException.java b/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/InsufficientFundsException.java index 75181dd..8c18d4b 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/InsufficientFundsException.java +++ b/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/InsufficientFundsException.java @@ -16,10 +16,12 @@ package org.fireflyframework.eventsourcing.examples.ledger.exceptions; +import org.fireflyframework.kernel.exception.FireflyException; + /** * Exception thrown when attempting to withdraw more than the available balance. */ -public class InsufficientFundsException extends RuntimeException { +public class InsufficientFundsException extends FireflyException { public InsufficientFundsException(String message) { super(message); diff --git a/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/InvalidAmountException.java b/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/InvalidAmountException.java index a5993dc..87470e1 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/InvalidAmountException.java +++ b/src/main/java/org/fireflyframework/eventsourcing/examples/ledger/exceptions/InvalidAmountException.java @@ -16,10 +16,12 @@ package org.fireflyframework.eventsourcing.examples.ledger.exceptions; +import org.fireflyframework.kernel.exception.FireflyException; + /** * Exception thrown when an invalid amount is provided (e.g., negative or zero). */ -public class InvalidAmountException extends RuntimeException { +public class InvalidAmountException extends FireflyException { public InvalidAmountException(String message) { super(message); diff --git a/src/main/java/org/fireflyframework/eventsourcing/health/EventStoreHealthIndicator.java b/src/main/java/org/fireflyframework/eventsourcing/health/EventStoreHealthIndicator.java new file mode 100644 index 0000000..3e544ef --- /dev/null +++ b/src/main/java/org/fireflyframework/eventsourcing/health/EventStoreHealthIndicator.java @@ -0,0 +1,82 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eventsourcing.health; + +import lombok.extern.slf4j.Slf4j; +import org.fireflyframework.eventsourcing.store.EventStore; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.HealthIndicator; + +import java.time.Duration; + +/** + * Health indicator for the event store. + * Reports R2DBC connectivity, event count, and latest global sequence. + */ +@Slf4j +public class EventStoreHealthIndicator implements HealthIndicator { + + private final EventStore eventStore; + private final Duration timeout; + + public EventStoreHealthIndicator(EventStore eventStore) { + this(eventStore, Duration.ofSeconds(5)); + } + + public EventStoreHealthIndicator(EventStore eventStore, Duration timeout) { + this.eventStore = eventStore; + this.timeout = timeout; + } + + @Override + public Health health() { + try { + Boolean healthy = eventStore.isHealthy() + .timeout(timeout) + .block(); + + if (Boolean.TRUE.equals(healthy)) { + Health.Builder builder = Health.up(); + + // Attempt to fetch statistics (non-critical) + try { + var stats = eventStore.getStatistics() + .timeout(timeout) + .block(); + if (stats != null) { + builder.withDetail("totalEvents", stats.getTotalEvents()) + .withDetail("totalAggregates", stats.getTotalAggregates()) + .withDetail("currentGlobalSequence", stats.getCurrentGlobalSequence()); + } + } catch (Exception e) { + log.debug("Could not fetch event store statistics for health: {}", e.getMessage()); + } + + return builder.build(); + } + + return Health.down() + .withDetail("error", "Event store health check returned false") + .build(); + } catch (Exception e) { + log.error("Event store health check failed", e); + return Health.down() + .withDetail("error", "Health check failed: " + e.getMessage()) + .build(); + } + } +} diff --git a/src/main/java/org/fireflyframework/eventsourcing/health/OutboxHealthIndicator.java b/src/main/java/org/fireflyframework/eventsourcing/health/OutboxHealthIndicator.java new file mode 100644 index 0000000..47d66e4 --- /dev/null +++ b/src/main/java/org/fireflyframework/eventsourcing/health/OutboxHealthIndicator.java @@ -0,0 +1,89 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eventsourcing.health; + +import lombok.extern.slf4j.Slf4j; +import org.fireflyframework.eventsourcing.outbox.EventOutboxService; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.HealthIndicator; + +import java.time.Duration; + +/** + * Health indicator for the event outbox. + * Reports pending and failed entry counts, with configurable thresholds. + */ +@Slf4j +public class OutboxHealthIndicator implements HealthIndicator { + + private final EventOutboxService outboxService; + private final Duration timeout; + private final long pendingWarningThreshold; + private final long failedDownThreshold; + + public OutboxHealthIndicator(EventOutboxService outboxService) { + this(outboxService, Duration.ofSeconds(5), 1000L, 100L); + } + + public OutboxHealthIndicator(EventOutboxService outboxService, Duration timeout, + long pendingWarningThreshold, long failedDownThreshold) { + this.outboxService = outboxService; + this.timeout = timeout; + this.pendingWarningThreshold = pendingWarningThreshold; + this.failedDownThreshold = failedDownThreshold; + } + + @Override + public Health health() { + try { + var stats = outboxService.getStatistics() + .timeout(timeout) + .block(); + + if (stats == null) { + return Health.unknown() + .withDetail("error", "Could not retrieve outbox statistics") + .build(); + } + + Health.Builder builder = Health.up() + .withDetail("pendingEntries", stats.pendingCount()) + .withDetail("failedEntries", stats.failedCount()) + .withDetail("completedEntries", stats.completedCount()) + .withDetail("deadLetterEntries", stats.deadLetterCount()); + + if (stats.failedCount() > failedDownThreshold) { + return builder.down() + .withDetail("warning", "Failed entries (" + stats.failedCount() + + ") exceed threshold (" + failedDownThreshold + ")") + .build(); + } + + if (stats.pendingCount() > pendingWarningThreshold) { + builder.withDetail("warning", "Pending entries (" + stats.pendingCount() + + ") exceed threshold (" + pendingWarningThreshold + ")"); + } + + return builder.build(); + } catch (Exception e) { + log.error("Outbox health check failed", e); + return Health.down() + .withDetail("error", "Health check failed: " + e.getMessage()) + .build(); + } + } +} diff --git a/src/main/java/org/fireflyframework/eventsourcing/health/SnapshotStoreHealthIndicator.java b/src/main/java/org/fireflyframework/eventsourcing/health/SnapshotStoreHealthIndicator.java new file mode 100644 index 0000000..5a9db02 --- /dev/null +++ b/src/main/java/org/fireflyframework/eventsourcing/health/SnapshotStoreHealthIndicator.java @@ -0,0 +1,83 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eventsourcing.health; + +import lombok.extern.slf4j.Slf4j; +import org.fireflyframework.eventsourcing.snapshot.SnapshotStore; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.HealthIndicator; + +import java.time.Duration; + +/** + * Health indicator for the snapshot store. + * Reports connectivity and basic statistics. + */ +@Slf4j +public class SnapshotStoreHealthIndicator implements HealthIndicator { + + private final SnapshotStore snapshotStore; + private final Duration timeout; + + public SnapshotStoreHealthIndicator(SnapshotStore snapshotStore) { + this(snapshotStore, Duration.ofSeconds(5)); + } + + public SnapshotStoreHealthIndicator(SnapshotStore snapshotStore, Duration timeout) { + this.snapshotStore = snapshotStore; + this.timeout = timeout; + } + + @Override + public Health health() { + try { + Boolean healthy = snapshotStore.isHealthy() + .timeout(timeout) + .block(); + + if (Boolean.TRUE.equals(healthy)) { + Health.Builder builder = Health.up(); + + try { + var stats = snapshotStore.getStatistics() + .timeout(timeout) + .block(); + if (stats != null) { + builder.withDetail("totalSnapshots", stats.getTotalSnapshots()) + .withDetail("totalAggregatesWithSnapshots", stats.getTotalAggregatesWithSnapshots()); + if (stats.getTotalStorageSizeBytes() != null) { + builder.withDetail("totalStorageBytes", stats.getTotalStorageSizeBytes()); + } + } + } catch (Exception e) { + log.debug("Could not fetch snapshot statistics for health: {}", e.getMessage()); + } + + return builder.build(); + } + + return Health.down() + .withDetail("error", "Snapshot store health check returned false") + .build(); + } catch (Exception e) { + log.error("Snapshot store health check failed", e); + return Health.down() + .withDetail("error", "Health check failed: " + e.getMessage()) + .build(); + } + } +} diff --git a/src/main/java/org/fireflyframework/eventsourcing/monitoring/EventStoreMetrics.java b/src/main/java/org/fireflyframework/eventsourcing/monitoring/EventStoreMetrics.java index f1ed16d..b2c60ed 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/monitoring/EventStoreMetrics.java +++ b/src/main/java/org/fireflyframework/eventsourcing/monitoring/EventStoreMetrics.java @@ -21,8 +21,6 @@ import lombok.extern.slf4j.Slf4j; import org.fireflyframework.observability.metrics.FireflyMetricsSupport; import org.springframework.boot.actuate.health.Health; -import org.springframework.stereotype.Component; - import java.time.Duration; import java.time.Instant; import java.util.concurrent.atomic.AtomicLong; @@ -42,7 +40,6 @@ * - firefly.eventsourcing.connection.pool.active (Gauge) * - firefly.eventsourcing.batch.size (DistributionSummary) */ -@Component @Slf4j public class EventStoreMetrics extends FireflyMetricsSupport { diff --git a/src/main/java/org/fireflyframework/eventsourcing/outbox/EventOutboxProcessor.java b/src/main/java/org/fireflyframework/eventsourcing/outbox/EventOutboxProcessor.java index 66476c4..1878302 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/outbox/EventOutboxProcessor.java +++ b/src/main/java/org/fireflyframework/eventsourcing/outbox/EventOutboxProcessor.java @@ -19,9 +19,7 @@ import org.fireflyframework.eventsourcing.logging.EventSourcingLoggingContext; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; /** * Background processor for Event Outbox entries. @@ -50,13 +48,6 @@ * - Quartz for more advanced scheduling * - Dedicated outbox processor service */ -@Component -@ConditionalOnProperty( - prefix = "eventsourcing.outbox.processor", - name = "enabled", - havingValue = "true", - matchIfMissing = false -) @RequiredArgsConstructor @Slf4j public class EventOutboxProcessor { diff --git a/src/main/java/org/fireflyframework/eventsourcing/outbox/EventOutboxService.java b/src/main/java/org/fireflyframework/eventsourcing/outbox/EventOutboxService.java index 9278fcf..192eb6f 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/outbox/EventOutboxService.java +++ b/src/main/java/org/fireflyframework/eventsourcing/outbox/EventOutboxService.java @@ -26,7 +26,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; -import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -53,7 +52,6 @@ * 3. Retries failed publications with exponential backoff * 4. Marks entries as completed after successful publication */ -@Service @RequiredArgsConstructor @Slf4j public class EventOutboxService { diff --git a/src/main/java/org/fireflyframework/eventsourcing/publisher/EventPublishingException.java b/src/main/java/org/fireflyframework/eventsourcing/publisher/EventPublishingException.java index ef25883..b0d7666 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/publisher/EventPublishingException.java +++ b/src/main/java/org/fireflyframework/eventsourcing/publisher/EventPublishingException.java @@ -16,6 +16,8 @@ package org.fireflyframework.eventsourcing.publisher; +import org.fireflyframework.kernel.exception.FireflyInfrastructureException; + /** * Exception thrown when event publishing fails. *

@@ -25,7 +27,7 @@ * - Messaging infrastructure is unavailable * - Publisher configuration is invalid */ -public class EventPublishingException extends RuntimeException { +public class EventPublishingException extends FireflyInfrastructureException { public EventPublishingException(String message) { super(message); diff --git a/src/main/java/org/fireflyframework/eventsourcing/publisher/EventSourcingPublisher.java b/src/main/java/org/fireflyframework/eventsourcing/publisher/EventSourcingPublisher.java index a1012e4..aed8ac2 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/publisher/EventSourcingPublisher.java +++ b/src/main/java/org/fireflyframework/eventsourcing/publisher/EventSourcingPublisher.java @@ -112,9 +112,13 @@ public Mono publishEvents(List envelopes) { log.info("Publishing batch of {} events", envelopes.size()); return Flux.fromIterable(envelopes) - .flatMap(this::publishEvent) - .onErrorContinue((error, envelope) -> - log.error("Failed to publish event in batch: {}", envelope, error)) + .flatMap(envelope -> publishEvent(envelope) + .onErrorResume(error -> { + log.error("Failed to publish event in batch: eventId={}, aggregateId={}, eventType={}: {}", + envelope.getEventId(), envelope.getAggregateId(), + envelope.getEventType(), error.getMessage(), error); + return Mono.empty(); + })) .then() .doOnSuccess(v -> { long duration = System.currentTimeMillis() - startTime; diff --git a/src/main/java/org/fireflyframework/eventsourcing/snapshot/SnapshotException.java b/src/main/java/org/fireflyframework/eventsourcing/snapshot/SnapshotException.java index c86e8f9..4f4d32c 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/snapshot/SnapshotException.java +++ b/src/main/java/org/fireflyframework/eventsourcing/snapshot/SnapshotException.java @@ -16,6 +16,8 @@ package org.fireflyframework.eventsourcing.snapshot; +import org.fireflyframework.kernel.exception.FireflyInfrastructureException; + /** * Exception thrown when snapshot operations fail. *

@@ -26,7 +28,7 @@ * - Invalid snapshot data * - Configuration issues */ -public class SnapshotException extends RuntimeException { +public class SnapshotException extends FireflyInfrastructureException { public SnapshotException(String message) { super(message); diff --git a/src/main/java/org/fireflyframework/eventsourcing/snapshot/SnapshotTrigger.java b/src/main/java/org/fireflyframework/eventsourcing/snapshot/SnapshotTrigger.java new file mode 100644 index 0000000..5c32bd4 --- /dev/null +++ b/src/main/java/org/fireflyframework/eventsourcing/snapshot/SnapshotTrigger.java @@ -0,0 +1,90 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eventsourcing.snapshot; + +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +/** + * Determines when snapshots should be created and triggers their creation asynchronously. + * + *

After events are appended to an aggregate, the trigger checks whether the new version + * crosses the configured frequency threshold (e.g., every 100 events). If so, a snapshot + * save is initiated on a bounded-elastic scheduler to avoid blocking the main event-append path.

+ * + *

Snapshot creation is fire-and-forget from the caller's perspective — failures are logged + * but never propagate to the event-append operation.

+ */ +@Slf4j +public class SnapshotTrigger { + + private final SnapshotStore snapshotStore; + private final int frequency; + + /** + * @param snapshotStore the store to save snapshots to + * @param frequency create a snapshot every N events (e.g., 100) + */ + public SnapshotTrigger(SnapshotStore snapshotStore, int frequency) { + this.snapshotStore = snapshotStore; + this.frequency = frequency; + log.info("SnapshotTrigger initialized with frequency: {} events", frequency); + } + + /** + * Checks whether a snapshot should be taken at the given aggregate version and, + * if so, saves it asynchronously. + * + * @param snapshot the snapshot to potentially save + * @return true if the snapshot was triggered, false if skipped + */ + public boolean maybeTrigger(Snapshot snapshot) { + if (snapshot.getVersion() > 0 && snapshot.getVersion() % frequency == 0) { + log.info("Snapshot trigger fired for aggregate {} at version {} (frequency={})", + snapshot.getAggregateId(), snapshot.getVersion(), frequency); + + snapshotStore.saveSnapshot(snapshot) + .subscribeOn(Schedulers.boundedElastic()) + .doOnError(e -> log.error("Async snapshot save failed for aggregate {}: {}", + snapshot.getAggregateId(), e.getMessage())) + .subscribe(); + return true; + } + return false; + } + + /** + * Reactive variant — returns a Mono that completes after the save (or empty if no trigger). + * Useful when callers want to chain snapshot creation into a reactive pipeline. + */ + public Mono maybeTriggerReactive(Snapshot snapshot) { + if (snapshot.getVersion() > 0 && snapshot.getVersion() % frequency == 0) { + log.info("Snapshot trigger fired for aggregate {} at version {}", + snapshot.getAggregateId(), snapshot.getVersion()); + return snapshotStore.saveSnapshot(snapshot) + .doOnError(e -> log.error("Snapshot save failed for aggregate {}: {}", + snapshot.getAggregateId(), e.getMessage())) + .onErrorResume(e -> Mono.empty()); + } + return Mono.empty(); + } + + public int getFrequency() { + return frequency; + } +} diff --git a/src/main/java/org/fireflyframework/eventsourcing/snapshot/r2dbc/R2dbcSnapshotStore.java b/src/main/java/org/fireflyframework/eventsourcing/snapshot/r2dbc/R2dbcSnapshotStore.java new file mode 100644 index 0000000..ad1360c --- /dev/null +++ b/src/main/java/org/fireflyframework/eventsourcing/snapshot/r2dbc/R2dbcSnapshotStore.java @@ -0,0 +1,397 @@ +/* + * Copyright 2024-2026 Firefly Software Solutions Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.fireflyframework.eventsourcing.snapshot.r2dbc; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.fireflyframework.eventsourcing.snapshot.*; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.r2dbc.core.DatabaseClient; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +/** + * R2DBC-based implementation of {@link SnapshotStore}. + * + *

Uses {@link DatabaseClient} for reactive, non-blocking snapshot persistence + * against the {@code snapshots} table created by the V2 migration.

+ * + *

Snapshots are serialized as JSON with an embedded class name wrapper so that + * concrete {@link Snapshot} subtypes can be deserialized polymorphically without + * requiring {@code @JsonTypeInfo} on the Snapshot interface.

+ */ +@Slf4j +public class R2dbcSnapshotStore implements SnapshotStore { + + private final DatabaseClient databaseClient; + private final ObjectMapper objectMapper; + + public R2dbcSnapshotStore(DatabaseClient databaseClient, ObjectMapper objectMapper) { + this.databaseClient = databaseClient; + this.objectMapper = objectMapper; + log.info("R2dbcSnapshotStore initialized"); + } + + @Override + public Mono saveSnapshot(Snapshot snapshot) { + String snapshotData; + try { + snapshotData = serializeSnapshot(snapshot); + } catch (JsonProcessingException e) { + return Mono.error(new SnapshotException("Failed to serialize snapshot for aggregate " + + snapshot.getAggregateId(), e)); + } + + String sql = """ + INSERT INTO snapshots (aggregate_id, aggregate_type, aggregate_version, snapshot_data, created_at) + VALUES (:aggregateId, :aggregateType, :aggregateVersion, :snapshotData, :createdAt) + ON CONFLICT (aggregate_id, aggregate_type, aggregate_version) + DO UPDATE SET snapshot_data = :snapshotData, updated_at = NOW() + """; + + return databaseClient.sql(sql) + .bind("aggregateId", snapshot.getAggregateId()) + .bind("aggregateType", snapshot.getSnapshotType()) + .bind("aggregateVersion", snapshot.getVersion()) + .bind("snapshotData", snapshotData) + .bind("createdAt", snapshot.getCreatedAt()) + .fetch() + .rowsUpdated() + .doOnSuccess(rows -> log.debug("Saved snapshot for aggregate {} at version {}", + snapshot.getAggregateId(), snapshot.getVersion())) + .then(); + } + + @Override + public Mono loadLatestSnapshot(UUID aggregateId, String snapshotType) { + String sql = """ + SELECT aggregate_id, aggregate_type, aggregate_version, snapshot_data, created_at + FROM snapshots + WHERE aggregate_id = :aggregateId AND aggregate_type = :aggregateType + ORDER BY aggregate_version DESC + LIMIT 1 + """; + + return databaseClient.sql(sql) + .bind("aggregateId", aggregateId) + .bind("aggregateType", snapshotType) + .map((row, metadata) -> deserializeSnapshot( + row.get("snapshot_data", String.class), + row.get("aggregate_id", UUID.class), + row.get("aggregate_type", String.class), + row.get("aggregate_version", Long.class), + row.get("created_at", Instant.class))) + .one() + .doOnNext(s -> log.debug("Loaded latest snapshot for aggregate {} at version {}", + aggregateId, s.getVersion())); + } + + @Override + public Mono loadSnapshotAtOrBeforeVersion(UUID aggregateId, String snapshotType, long maxVersion) { + String sql = """ + SELECT aggregate_id, aggregate_type, aggregate_version, snapshot_data, created_at + FROM snapshots + WHERE aggregate_id = :aggregateId AND aggregate_type = :aggregateType + AND aggregate_version <= :maxVersion + ORDER BY aggregate_version DESC + LIMIT 1 + """; + + return databaseClient.sql(sql) + .bind("aggregateId", aggregateId) + .bind("aggregateType", snapshotType) + .bind("maxVersion", maxVersion) + .map((row, metadata) -> deserializeSnapshot( + row.get("snapshot_data", String.class), + row.get("aggregate_id", UUID.class), + row.get("aggregate_type", String.class), + row.get("aggregate_version", Long.class), + row.get("created_at", Instant.class))) + .one(); + } + + @Override + public Mono loadSnapshotAtVersion(UUID aggregateId, String snapshotType, long version) { + String sql = """ + SELECT aggregate_id, aggregate_type, aggregate_version, snapshot_data, created_at + FROM snapshots + WHERE aggregate_id = :aggregateId AND aggregate_type = :aggregateType + AND aggregate_version = :version + """; + + return databaseClient.sql(sql) + .bind("aggregateId", aggregateId) + .bind("aggregateType", snapshotType) + .bind("version", version) + .map((row, metadata) -> deserializeSnapshot( + row.get("snapshot_data", String.class), + row.get("aggregate_id", UUID.class), + row.get("aggregate_type", String.class), + row.get("aggregate_version", Long.class), + row.get("created_at", Instant.class))) + .one(); + } + + @Override + public Mono snapshotExists(UUID aggregateId, String snapshotType) { + String sql = """ + SELECT COUNT(*) as cnt FROM snapshots + WHERE aggregate_id = :aggregateId AND aggregate_type = :aggregateType + """; + + return databaseClient.sql(sql) + .bind("aggregateId", aggregateId) + .bind("aggregateType", snapshotType) + .map(row -> row.get("cnt", Long.class) > 0) + .one() + .defaultIfEmpty(false); + } + + @Override + public Mono getLatestSnapshotVersion(UUID aggregateId, String snapshotType) { + String sql = """ + SELECT COALESCE(MAX(aggregate_version), 0) as max_version FROM snapshots + WHERE aggregate_id = :aggregateId AND aggregate_type = :aggregateType + """; + + return databaseClient.sql(sql) + .bind("aggregateId", aggregateId) + .bind("aggregateType", snapshotType) + .map(row -> row.get("max_version", Long.class)) + .one() + .defaultIfEmpty(0L); + } + + @Override + public Mono deleteSnapshot(UUID aggregateId, String snapshotType, long version) { + String sql = """ + DELETE FROM snapshots + WHERE aggregate_id = :aggregateId AND aggregate_type = :aggregateType + AND aggregate_version = :version + """; + + return databaseClient.sql(sql) + .bind("aggregateId", aggregateId) + .bind("aggregateType", snapshotType) + .bind("version", version) + .fetch() + .rowsUpdated() + .then(); + } + + @Override + public Mono deleteAllSnapshots(UUID aggregateId, String snapshotType) { + String sql = """ + DELETE FROM snapshots + WHERE aggregate_id = :aggregateId AND aggregate_type = :aggregateType + """; + + return databaseClient.sql(sql) + .bind("aggregateId", aggregateId) + .bind("aggregateType", snapshotType) + .fetch() + .rowsUpdated() + .then(); + } + + @Override + public Mono deleteSnapshotsOlderThan(Instant olderThan) { + String sql = "DELETE FROM snapshots WHERE created_at < :olderThan"; + + return databaseClient.sql(sql) + .bind("olderThan", olderThan) + .fetch() + .rowsUpdated(); + } + + @Override + public Mono keepLatestSnapshots(UUID aggregateId, String snapshotType, int keepCount) { + String sql = """ + DELETE FROM snapshots + WHERE aggregate_id = :aggregateId AND aggregate_type = :aggregateType + AND aggregate_version NOT IN ( + SELECT aggregate_version FROM snapshots + WHERE aggregate_id = :aggregateId AND aggregate_type = :aggregateType + ORDER BY aggregate_version DESC + LIMIT :keepCount + ) + """; + + return databaseClient.sql(sql) + .bind("aggregateId", aggregateId) + .bind("aggregateType", snapshotType) + .bind("keepCount", keepCount) + .fetch() + .rowsUpdated(); + } + + @Override + public Flux listSnapshots(UUID aggregateId, String snapshotType) { + String sql = """ + SELECT aggregate_id, aggregate_type, aggregate_version, snapshot_data, created_at + FROM snapshots + WHERE aggregate_id = :aggregateId AND aggregate_type = :aggregateType + ORDER BY aggregate_version DESC + """; + + return databaseClient.sql(sql) + .bind("aggregateId", aggregateId) + .bind("aggregateType", snapshotType) + .map((row, metadata) -> deserializeSnapshot( + row.get("snapshot_data", String.class), + row.get("aggregate_id", UUID.class), + row.get("aggregate_type", String.class), + row.get("aggregate_version", Long.class), + row.get("created_at", Instant.class))) + .all(); + } + + @Override + public Flux listSnapshots(UUID aggregateId, String snapshotType, long fromVersion, long toVersion) { + String sql = """ + SELECT aggregate_id, aggregate_type, aggregate_version, snapshot_data, created_at + FROM snapshots + WHERE aggregate_id = :aggregateId AND aggregate_type = :aggregateType + AND aggregate_version >= :fromVersion AND aggregate_version <= :toVersion + ORDER BY aggregate_version DESC + """; + + return databaseClient.sql(sql) + .bind("aggregateId", aggregateId) + .bind("aggregateType", snapshotType) + .bind("fromVersion", fromVersion) + .bind("toVersion", toVersion) + .map((row, metadata) -> deserializeSnapshot( + row.get("snapshot_data", String.class), + row.get("aggregate_id", UUID.class), + row.get("aggregate_type", String.class), + row.get("aggregate_version", Long.class), + row.get("created_at", Instant.class))) + .all(); + } + + @Override + public Mono countSnapshots(UUID aggregateId, String snapshotType) { + String sql = """ + SELECT COUNT(*) as cnt FROM snapshots + WHERE aggregate_id = :aggregateId AND aggregate_type = :aggregateType + """; + + return databaseClient.sql(sql) + .bind("aggregateId", aggregateId) + .bind("aggregateType", snapshotType) + .map(row -> row.get("cnt", Long.class)) + .one() + .defaultIfEmpty(0L); + } + + @Override + public Mono getStatistics() { + String sql = """ + SELECT + COUNT(*) as total_snapshots, + COUNT(DISTINCT aggregate_id) as total_aggregates, + COALESCE(SUM(snapshot_size_bytes), 0) as total_size + FROM snapshots + """; + + return databaseClient.sql(sql) + .map(row -> SnapshotStatistics.builder() + .totalSnapshots(row.get("total_snapshots", Long.class)) + .totalAggregatesWithSnapshots(row.get("total_aggregates", Long.class)) + .totalStorageSizeBytes(row.get("total_size", Long.class)) + .build()) + .one(); + } + + @Override + public Mono isHealthy() { + return databaseClient.sql("SELECT 1 FROM snapshots LIMIT 1") + .fetch() + .first() + .map(row -> true) + .defaultIfEmpty(true) + .onErrorReturn(false); + } + + @Override + public Mono optimize() { + return databaseClient.sql("ANALYZE snapshots") + .fetch() + .rowsUpdated() + .doOnSuccess(v -> log.info("Snapshot table optimized")) + .then(); + } + + // --- Serialization helpers --- + + private String serializeSnapshot(Snapshot snapshot) throws JsonProcessingException { + Map wrapper = new HashMap<>(); + wrapper.put("@class", snapshot.getClass().getName()); + wrapper.put("state", objectMapper.writeValueAsString(snapshot)); + return objectMapper.writeValueAsString(wrapper); + } + + private Snapshot deserializeSnapshot(String snapshotData, UUID aggregateId, + String aggregateType, Long version, Instant createdAt) { + try { + JsonNode wrapper = objectMapper.readTree(snapshotData); + if (wrapper.has("@class") && wrapper.has("state")) { + String className = wrapper.get("@class").asText(); + Class clazz = Class.forName(className); + return (Snapshot) objectMapper.readValue(wrapper.get("state").asText(), clazz); + } + // Fallback: data stored without wrapper — return a generic snapshot + return new GenericSnapshot(aggregateId, aggregateType, version, createdAt, snapshotData); + } catch (Exception e) { + log.warn("Failed to deserialize snapshot for aggregate {}, returning generic snapshot: {}", + aggregateId, e.getMessage()); + return new GenericSnapshot(aggregateId, aggregateType, version, createdAt, snapshotData); + } + } + + /** + * Fallback snapshot when the concrete class cannot be resolved. + */ + private record GenericSnapshot( + UUID aggregateId, + String snapshotType, + long version, + Instant createdAt, + String rawData + ) implements Snapshot { + + @Override + public UUID getAggregateId() { return aggregateId; } + + @Override + public String getSnapshotType() { return snapshotType; } + + @Override + public long getVersion() { return version; } + + @Override + public Instant getCreatedAt() { return createdAt; } + } +} diff --git a/src/main/java/org/fireflyframework/eventsourcing/store/EventStoreException.java b/src/main/java/org/fireflyframework/eventsourcing/store/EventStoreException.java index 2cd8df1..069a295 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/store/EventStoreException.java +++ b/src/main/java/org/fireflyframework/eventsourcing/store/EventStoreException.java @@ -16,6 +16,8 @@ package org.fireflyframework.eventsourcing.store; +import org.fireflyframework.kernel.exception.FireflyInfrastructureException; + /** * Base exception for event store operations. *

@@ -26,7 +28,7 @@ * - Storage capacity issues * - Configuration problems */ -public class EventStoreException extends RuntimeException { +public class EventStoreException extends FireflyInfrastructureException { public EventStoreException(String message) { super(message); @@ -37,6 +39,6 @@ public EventStoreException(String message, Throwable cause) { } public EventStoreException(Throwable cause) { - super(cause); + super(cause.getMessage(), cause); } } \ No newline at end of file diff --git a/src/main/java/org/fireflyframework/eventsourcing/store/r2dbc/R2dbcEventStore.java b/src/main/java/org/fireflyframework/eventsourcing/store/r2dbc/R2dbcEventStore.java index 996255c..6029f4a 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/store/r2dbc/R2dbcEventStore.java +++ b/src/main/java/org/fireflyframework/eventsourcing/store/r2dbc/R2dbcEventStore.java @@ -348,9 +348,31 @@ public Flux streamEventsByTimeRange(Instant from, Instant to) { @Override public Flux streamEventsByMetadata(Map metadataCriteria) { - // This is a simplified implementation - in practice, you'd want more sophisticated JSON querying - log.warn("streamEventsByMetadata is not fully implemented in this version"); - return Flux.empty(); + if (metadataCriteria == null || metadataCriteria.isEmpty()) { + return streamAllEvents(); + } + + // Build a JSONB containment query: metadata @> '{"key":"value"}'::jsonb + // This uses PostgreSQL's JSONB containment operator for efficient indexed lookups. + String criteriaJson; + try { + criteriaJson = objectMapper.writeValueAsString(metadataCriteria); + } catch (JsonProcessingException e) { + return Flux.error(new EventStoreException("Failed to serialize metadata criteria", e)); + } + + String sql = """ + SELECT event_id, aggregate_id, aggregate_type, aggregate_version, global_sequence, + event_type, event_data, metadata, created_at + FROM events + WHERE metadata::jsonb @> :criteriaJson::jsonb + ORDER BY global_sequence ASC + """; + + return databaseClient.sql(sql) + .bind("criteriaJson", criteriaJson) + .map((row, metadata) -> mapToEventEnvelope(row, metadata)) + .all(); } @Override diff --git a/src/main/java/org/fireflyframework/eventsourcing/transaction/EventSourcingTransactionalAspect.java b/src/main/java/org/fireflyframework/eventsourcing/transaction/EventSourcingTransactionalAspect.java index 4052df3..27819dc 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/transaction/EventSourcingTransactionalAspect.java +++ b/src/main/java/org/fireflyframework/eventsourcing/transaction/EventSourcingTransactionalAspect.java @@ -27,9 +27,7 @@ import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.core.annotation.AnnotationUtils; -import org.springframework.stereotype.Component; import org.springframework.transaction.ReactiveTransactionManager; import org.springframework.transaction.reactive.TransactionalOperator; import org.springframework.transaction.support.DefaultTransactionDefinition; @@ -141,8 +139,6 @@ * @see org.springframework.transaction.ReactiveTransactionManager */ @Aspect -@Component -@ConditionalOnBean(ReactiveTransactionManager.class) public class EventSourcingTransactionalAspect { private static final Logger log = LoggerFactory.getLogger(EventSourcingTransactionalAspect.class); diff --git a/src/main/java/org/fireflyframework/eventsourcing/upcasting/EventUpcastingService.java b/src/main/java/org/fireflyframework/eventsourcing/upcasting/EventUpcastingService.java index 5867bd1..c12a80c 100644 --- a/src/main/java/org/fireflyframework/eventsourcing/upcasting/EventUpcastingService.java +++ b/src/main/java/org/fireflyframework/eventsourcing/upcasting/EventUpcastingService.java @@ -18,7 +18,6 @@ import org.fireflyframework.eventsourcing.domain.Event; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; import java.util.Comparator; import java.util.List; @@ -28,7 +27,6 @@ * Service for managing event upcasting. * Automatically applies registered upcasters to events when loading from the event store. */ -@Service @Slf4j public class EventUpcastingService { diff --git a/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 5909fe8..ffbda97 100644 --- a/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,6 +1,10 @@ org.fireflyframework.eventsourcing.config.R2dbcBeansAutoConfiguration +org.fireflyframework.eventsourcing.config.EventStoreAutoConfiguration +org.fireflyframework.eventsourcing.config.SnapshotAutoConfiguration org.fireflyframework.eventsourcing.config.EventSourcingAutoConfiguration org.fireflyframework.eventsourcing.config.EventSourcingProjectionAutoConfiguration +org.fireflyframework.eventsourcing.config.EventSourcingHealthConfiguration +org.fireflyframework.eventsourcing.config.EventSourcingMetricsConfiguration org.fireflyframework.eventsourcing.resilience.CircuitBreakerConfiguration org.fireflyframework.eventsourcing.tracing.OpenTelemetryConfiguration org.fireflyframework.eventsourcing.multitenancy.MultiTenancyConfiguration