diff --git a/docs/feature-walkthrough.md b/docs/feature-walkthrough.md index 6ff4dc6..1402974 100644 --- a/docs/feature-walkthrough.md +++ b/docs/feature-walkthrough.md @@ -865,7 +865,7 @@ try (ServiceScope scope = provider.createScope()) { ### Scheduling Messages -Delay message delivery by setting the scheduled enqueue time on the send or publish context or by using the `IMessageScheduler` service. External schedulers such as Quartz or Hangfire can be plugged in by providing a custom `IJobScheduler`/`JobScheduler` implementation. +Delay message delivery by setting the scheduled enqueue time on the send or publish context or by using the `IMessageScheduler` service. `IMessageScheduler` returns a `ScheduledMessageHandle` that can be used to cancel a scheduled message. External schedulers such as Quartz or Hangfire can be plugged in by providing a custom `IJobScheduler`/`JobScheduler` implementation. #### C# @@ -874,8 +874,12 @@ await bus.Publish(new OrderSubmitted(), ctx => ctx.SetScheduledEnqueueTime(TimeS var endpoint = await bus.GetSendEndpoint(new Uri("queue:submit-order")); await endpoint.Send(new SubmitOrder(), ctx => ctx.SetScheduledEnqueueTime(TimeSpan.FromSeconds(30))); +await bus.SchedulePublish(new OrderSubmitted(), TimeSpan.FromSeconds(30)); +await endpoint.ScheduleSend(new SubmitOrder(), TimeSpan.FromSeconds(30)); + var scheduler = provider.GetRequiredService(); -await scheduler.SchedulePublish(new OrderSubmitted(), TimeSpan.FromSeconds(30)); +var handle = await scheduler.SchedulePublish(new OrderSubmitted(), TimeSpan.FromSeconds(30)); +await scheduler.CancelScheduledPublish(handle); await scheduler.ScheduleSend(new Uri("queue:submit-order"), new SubmitOrder(), TimeSpan.FromSeconds(30)); ``` @@ -887,10 +891,61 @@ SendEndpoint endpoint = bus.getSendEndpoint("queue:submit-order"); endpoint.send(new SubmitOrder(), ctx -> ctx.setScheduledEnqueueTime(Duration.ofSeconds(30))).get(); MessageScheduler scheduler = services.getService(MessageScheduler.class); -scheduler.schedulePublish(new OrderSubmitted(), Duration.ofSeconds(30)).get(); +ScheduledMessageHandle handle = scheduler.schedulePublish(new OrderSubmitted(), Duration.ofSeconds(30)) + .toCompletableFuture().get(); +scheduler.cancelScheduledPublish(handle).toCompletableFuture().get(); scheduler.scheduleSend("queue:submit-order", new SubmitOrder(), Duration.ofSeconds(30)).get(); ``` +##### Custom schedulers + +`AddServiceBus` registers a simple timer-based `DefaultJobScheduler`. To integrate a production scheduler such as Quartz or Hangfire, implement `IJobScheduler`/`JobScheduler` and register it so it replaces the default. + +**C#** + +```csharp +class HangfireJobScheduler : IJobScheduler +{ + readonly IBackgroundJobClient jobs; + public HangfireJobScheduler(IBackgroundJobClient jobs) => this.jobs = jobs; + + public Task Schedule(DateTime scheduledTime, Func callback, CancellationToken token = default) + { + jobs.Schedule(() => callback(token), scheduledTime); + return Task.FromResult(Guid.NewGuid()); + } + + public Task Cancel(Guid tokenId) => Task.CompletedTask; +} + +services.AddSingleton(); +services.AddServiceBus(cfg => { /* ... */ }); +``` + +**Java** + +```java +class QuartzJobScheduler implements JobScheduler { + private final Scheduler scheduler; + QuartzJobScheduler(Scheduler scheduler) { this.scheduler = scheduler; } + + public CompletionStage schedule(Instant scheduledTime, + Function> callback, + CancellationToken token) { + scheduler.scheduleJob(() -> callback.apply(token), Date.from(scheduledTime)); + return CompletableFuture.completedFuture(UUID.randomUUID()); + } + + public CompletionStage cancel(UUID tokenId) { + return CompletableFuture.completedFuture(null); + } +} + +ServiceCollection services = ServiceCollection.create(); +services.addSingleton(JobScheduler.class, sp -> new QuartzJobScheduler(quartz)); +services.addServiceBus(cfg -> { /* ... */ }); +``` + ### Unit Testing with the In-Memory Test Harness #### C# diff --git a/src/Java/myservicebus-abstractions/src/main/java/com/myservicebus/JobScheduler.java b/src/Java/myservicebus-abstractions/src/main/java/com/myservicebus/JobScheduler.java index 605c398..7367c1a 100644 --- a/src/Java/myservicebus-abstractions/src/main/java/com/myservicebus/JobScheduler.java +++ b/src/Java/myservicebus-abstractions/src/main/java/com/myservicebus/JobScheduler.java @@ -2,13 +2,32 @@ import java.time.Duration; import java.time.Instant; -import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; +import java.util.UUID; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; + +import com.myservicebus.tasks.CancellationToken; public interface JobScheduler { - CompletableFuture schedule(Instant scheduledTime, Supplier> callback); + CompletionStage schedule(Instant scheduledTime, + Function> callback, + CancellationToken cancellationToken); + + default CompletionStage schedule(Instant scheduledTime, + Function> callback) { + return schedule(scheduledTime, callback, CancellationToken.none); + } - default CompletableFuture schedule(Duration delay, Supplier> callback) { - return schedule(Instant.now().plus(delay), callback); + default CompletionStage schedule(Duration delay, + Function> callback, + CancellationToken cancellationToken) { + return schedule(Instant.now().plus(delay), callback, cancellationToken); } + + default CompletionStage schedule(Duration delay, + Function> callback) { + return schedule(delay, callback, CancellationToken.none); + } + + CompletionStage cancel(UUID tokenId); } diff --git a/src/Java/myservicebus-abstractions/src/main/java/com/myservicebus/MessageScheduler.java b/src/Java/myservicebus-abstractions/src/main/java/com/myservicebus/MessageScheduler.java index e59d021..4b515ea 100644 --- a/src/Java/myservicebus-abstractions/src/main/java/com/myservicebus/MessageScheduler.java +++ b/src/Java/myservicebus-abstractions/src/main/java/com/myservicebus/MessageScheduler.java @@ -2,15 +2,72 @@ import java.time.Duration; import java.time.Instant; -import java.util.concurrent.CompletableFuture; +import java.util.UUID; +import java.util.concurrent.CompletionStage; + +import com.myservicebus.tasks.CancellationToken; public interface MessageScheduler { - CompletableFuture schedulePublish(T message, Instant scheduledTime); - default CompletableFuture schedulePublish(T message, Duration delay) { - return schedulePublish(message, Instant.now().plus(delay)); + CompletionStage schedulePublish(T message, + Instant scheduledTime, + CancellationToken cancellationToken); + + default CompletionStage schedulePublish(T message, Instant scheduledTime) { + return schedulePublish(message, scheduledTime, CancellationToken.none); + } + + default CompletionStage schedulePublish(T message, Duration delay, + CancellationToken cancellationToken) { + return schedulePublish(message, Instant.now().plus(delay), cancellationToken); + } + + default CompletionStage schedulePublish(T message, Duration delay) { + return schedulePublish(message, delay, CancellationToken.none); + } + + CompletionStage scheduleSend(String destination, + T message, + Instant scheduledTime, + CancellationToken cancellationToken); + + default CompletionStage scheduleSend(String destination, T message, Instant scheduledTime) { + return scheduleSend(destination, message, scheduledTime, CancellationToken.none); + } + + default CompletionStage scheduleSend(String destination, T message, Duration delay, + CancellationToken cancellationToken) { + return scheduleSend(destination, message, Instant.now().plus(delay), cancellationToken); + } + + default CompletionStage scheduleSend(String destination, T message, Duration delay) { + return scheduleSend(destination, message, delay, CancellationToken.none); } - CompletableFuture scheduleSend(String destination, T message, Instant scheduledTime); - default CompletableFuture scheduleSend(String destination, T message, Duration delay) { - return scheduleSend(destination, message, Instant.now().plus(delay)); + + CompletionStage cancelScheduledPublish(UUID tokenId, CancellationToken cancellationToken); + + default CompletionStage cancelScheduledPublish(UUID tokenId) { + return cancelScheduledPublish(tokenId, CancellationToken.none); + } + + default CompletionStage cancelScheduledPublish(ScheduledMessageHandle handle, CancellationToken cancellationToken) { + return cancelScheduledPublish(handle.getTokenId(), cancellationToken); + } + + default CompletionStage cancelScheduledPublish(ScheduledMessageHandle handle) { + return cancelScheduledPublish(handle, CancellationToken.none); + } + + CompletionStage cancelScheduledSend(UUID tokenId, CancellationToken cancellationToken); + + default CompletionStage cancelScheduledSend(UUID tokenId) { + return cancelScheduledSend(tokenId, CancellationToken.none); + } + + default CompletionStage cancelScheduledSend(ScheduledMessageHandle handle, CancellationToken cancellationToken) { + return cancelScheduledSend(handle.getTokenId(), cancellationToken); + } + + default CompletionStage cancelScheduledSend(ScheduledMessageHandle handle) { + return cancelScheduledSend(handle, CancellationToken.none); } } diff --git a/src/Java/myservicebus-abstractions/src/main/java/com/myservicebus/ScheduledMessageHandle.java b/src/Java/myservicebus-abstractions/src/main/java/com/myservicebus/ScheduledMessageHandle.java new file mode 100644 index 0000000..86fbf2f --- /dev/null +++ b/src/Java/myservicebus-abstractions/src/main/java/com/myservicebus/ScheduledMessageHandle.java @@ -0,0 +1,15 @@ +package com.myservicebus; + +import java.util.UUID; + +public class ScheduledMessageHandle { + private final UUID tokenId; + + public ScheduledMessageHandle(UUID tokenId) { + this.tokenId = tokenId; + } + + public UUID getTokenId() { + return tokenId; + } +} diff --git a/src/Java/myservicebus-testing/src/test/java/com/myservicebus/SchedulingTest.java b/src/Java/myservicebus-testing/src/test/java/com/myservicebus/SchedulingTest.java index c16b335..29dd583 100644 --- a/src/Java/myservicebus-testing/src/test/java/com/myservicebus/SchedulingTest.java +++ b/src/Java/myservicebus-testing/src/test/java/com/myservicebus/SchedulingTest.java @@ -1,18 +1,23 @@ package com.myservicebus; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import java.time.Duration; import java.time.Instant; +import java.util.UUID; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; -import java.util.concurrent.CompletableFuture; import com.myservicebus.tasks.CancellationToken; public class SchedulingTest { @Test - void scheduleSend_delays_message() { + void scheduleSend_delays_message() throws Exception { InMemoryTestHarness harness = new InMemoryTestHarness(); CompletableFuture handled = new CompletableFuture<>(); harness.registerHandler(String.class, ctx -> { @@ -31,13 +36,13 @@ public CompletableFuture publish(T message, CancellationToken token) { new DefaultJobScheduler()); Instant start = Instant.now(); Duration delay = Duration.ofMillis(100); - scheduler.scheduleSend("loopback://localhost/queue", "hi", delay).join(); + scheduler.scheduleSend("loopback://localhost/queue", "hi", delay); + handled.join(); Instant end = Instant.now(); Duration elapsed = Duration.between(start, end); Duration tolerance = Duration.ofMillis(20); assertTrue(elapsed.toMillis() >= delay.minus(tolerance).toMillis()); assertTrue(harness.wasConsumed(String.class)); - handled.join(); } @Test @@ -69,7 +74,18 @@ void customScheduler_is_used() { return CompletableFuture.completedFuture(null); }); - JobScheduler immediate = (time, cb) -> cb.get(); + JobScheduler immediate = new JobScheduler() { + + public CompletionStage schedule(Instant time, Function> cb, CancellationToken token) { + cb.apply(token); + return CompletableFuture.completedFuture(UUID.randomUUID()); + } + + + public CompletionStage cancel(UUID tokenId) { + return CompletableFuture.completedFuture(null); + } + }; MessageScheduler scheduler = new MessageSchedulerImpl( new PublishEndpoint() { @Override @@ -80,11 +96,34 @@ public CompletableFuture publish(T message, CancellationToken token) { uri -> harness.getSendEndpoint(uri), immediate); Instant start = Instant.now(); - scheduler.scheduleSend("loopback://localhost/queue", "hi", Duration.ofMillis(100)).join(); + scheduler.scheduleSend("loopback://localhost/queue", "hi", Duration.ofMillis(100)); Instant end = Instant.now(); assertTrue(Duration.between(start, end).toMillis() < 100); assertTrue(harness.wasConsumed(String.class)); handled.join(); } + + @Test + void cancelScheduledSend_prevents_delivery() throws Exception { + InMemoryTestHarness harness = new InMemoryTestHarness(); + harness.registerHandler(String.class, ctx -> CompletableFuture.completedFuture(null)); + + MessageScheduler scheduler = new MessageSchedulerImpl( + new PublishEndpoint() { + @Override + public CompletableFuture publish(T message, CancellationToken token) { + return harness.send(message); + } + }, + uri -> harness.getSendEndpoint(uri), + new DefaultJobScheduler()); + ScheduledMessageHandle handle = scheduler + .scheduleSend("loopback://localhost/queue", "hi", Duration.ofMillis(200)) + .toCompletableFuture().get(); + scheduler.cancelScheduledSend(handle).toCompletableFuture().get(); + + TimeUnit.MILLISECONDS.sleep(300); + assertFalse(harness.wasConsumed(String.class)); + } } diff --git a/src/Java/myservicebus/src/main/java/com/myservicebus/DefaultJobScheduler.java b/src/Java/myservicebus/src/main/java/com/myservicebus/DefaultJobScheduler.java index 2c29291..60f94f0 100644 --- a/src/Java/myservicebus/src/main/java/com/myservicebus/DefaultJobScheduler.java +++ b/src/Java/myservicebus/src/main/java/com/myservicebus/DefaultJobScheduler.java @@ -2,26 +2,67 @@ import java.time.Duration; import java.time.Instant; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; +import java.util.function.Function; + +import com.myservicebus.tasks.CancellationToken; +import com.myservicebus.tasks.CancellationTokenSource; public class DefaultJobScheduler implements JobScheduler { + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final ConcurrentMap jobs = new ConcurrentHashMap<>(); + + private static class ScheduledJob { + final CancellationTokenSource cts; + final ScheduledFuture future; + ScheduledJob(CancellationTokenSource cts, ScheduledFuture future) { + this.cts = cts; + this.future = future; + } + } + @Override - public CompletableFuture schedule(Instant scheduledTime, Supplier> callback) { + public CompletionStage schedule(Instant scheduledTime, + Function> callback, + CancellationToken cancellationToken) { + UUID id = UUID.randomUUID(); Duration delay = Duration.between(Instant.now(), scheduledTime); if (delay.isNegative()) { delay = Duration.ZERO; } - CompletableFuture future = new CompletableFuture<>(); - CompletableFuture.delayedExecutor(delay.toMillis(), TimeUnit.MILLISECONDS) - .execute(() -> callback.get().whenComplete((r, e) -> { - if (e != null) { - future.completeExceptionally(e); - } else { - future.complete(r); - } - })); - return future; + CancellationTokenSource cts = new CancellationTokenSource(); + Runnable task = () -> { + try { + if (cancellationToken.isCancelled() || cts.isCancelled()) { + return; + } + callback.apply(cts.getToken()).whenComplete((r, e) -> { + jobs.remove(id); + }); + } catch (Throwable t) { + jobs.remove(id); + } + }; + ScheduledFuture future = executor.schedule(task, delay.toMillis(), TimeUnit.MILLISECONDS); + jobs.put(id, new ScheduledJob(cts, future)); + return CompletableFuture.completedFuture(id); + } + + @Override + public CompletionStage cancel(UUID tokenId) { + ScheduledJob job = jobs.remove(tokenId); + if (job != null) { + job.cts.cancel(); + job.future.cancel(false); + } + return CompletableFuture.completedFuture(null); } } diff --git a/src/Java/myservicebus/src/main/java/com/myservicebus/MessageSchedulerImpl.java b/src/Java/myservicebus/src/main/java/com/myservicebus/MessageSchedulerImpl.java index 6d50e90..220c48c 100644 --- a/src/Java/myservicebus/src/main/java/com/myservicebus/MessageSchedulerImpl.java +++ b/src/Java/myservicebus/src/main/java/com/myservicebus/MessageSchedulerImpl.java @@ -1,29 +1,50 @@ package com.myservicebus; import java.time.Instant; -import java.util.concurrent.CompletableFuture; +import java.util.UUID; +import java.util.concurrent.CompletionStage; + +import com.myservicebus.tasks.CancellationToken; public class MessageSchedulerImpl implements MessageScheduler { private final PublishEndpoint publishEndpoint; private final SendEndpointProvider sendEndpointProvider; private final JobScheduler jobScheduler; - public MessageSchedulerImpl(PublishEndpoint publishEndpoint, SendEndpointProvider sendEndpointProvider, JobScheduler jobScheduler) { + public MessageSchedulerImpl(PublishEndpoint publishEndpoint, + SendEndpointProvider sendEndpointProvider, + JobScheduler jobScheduler) { this.publishEndpoint = publishEndpoint; this.sendEndpointProvider = sendEndpointProvider; this.jobScheduler = jobScheduler; } @Override - public CompletableFuture schedulePublish(T message, Instant scheduledTime) { - return jobScheduler.schedule(scheduledTime, () -> publishEndpoint.publish(message)); + public CompletionStage schedulePublish(T message, + Instant scheduledTime, + CancellationToken cancellationToken) { + return jobScheduler.schedule(scheduledTime, token -> publishEndpoint.publish(message, token), cancellationToken) + .thenApply(ScheduledMessageHandle::new); } @Override - public CompletableFuture scheduleSend(String destination, T message, Instant scheduledTime) { - return jobScheduler.schedule(scheduledTime, () -> { + public CompletionStage scheduleSend(String destination, + T message, + Instant scheduledTime, + CancellationToken cancellationToken) { + return jobScheduler.schedule(scheduledTime, token -> { SendEndpoint endpoint = sendEndpointProvider.getSendEndpoint(destination); - return endpoint.send(message); - }); + return endpoint.send(message, token); + }, cancellationToken).thenApply(ScheduledMessageHandle::new); + } + + @Override + public CompletionStage cancelScheduledPublish(UUID tokenId, CancellationToken cancellationToken) { + return jobScheduler.cancel(tokenId); + } + + @Override + public CompletionStage cancelScheduledSend(UUID tokenId, CancellationToken cancellationToken) { + return jobScheduler.cancel(tokenId); } } diff --git a/src/MyServiceBus.Abstractions/IJobScheduler.cs b/src/MyServiceBus.Abstractions/IJobScheduler.cs index 9d1e389..ae63a2f 100644 --- a/src/MyServiceBus.Abstractions/IJobScheduler.cs +++ b/src/MyServiceBus.Abstractions/IJobScheduler.cs @@ -6,7 +6,8 @@ namespace MyServiceBus; public interface IJobScheduler { - Task Schedule(DateTime scheduledTime, Func callback, CancellationToken cancellationToken = default); - Task Schedule(TimeSpan delay, Func callback, CancellationToken cancellationToken = default) => + Task Schedule(DateTime scheduledTime, Func callback, CancellationToken cancellationToken = default); + Task Schedule(TimeSpan delay, Func callback, CancellationToken cancellationToken = default) => Schedule(DateTime.UtcNow + delay, callback, cancellationToken); + Task Cancel(Guid tokenId); } diff --git a/src/MyServiceBus.Abstractions/IMessageScheduler.cs b/src/MyServiceBus.Abstractions/IMessageScheduler.cs index ef7e658..9e2bf54 100644 --- a/src/MyServiceBus.Abstractions/IMessageScheduler.cs +++ b/src/MyServiceBus.Abstractions/IMessageScheduler.cs @@ -6,8 +6,14 @@ namespace MyServiceBus; public interface IMessageScheduler { - Task SchedulePublish(T message, DateTime scheduledTime, CancellationToken cancellationToken = default) where T : class; - Task SchedulePublish(T message, TimeSpan delay, CancellationToken cancellationToken = default) where T : class; - Task ScheduleSend(Uri destination, T message, DateTime scheduledTime, CancellationToken cancellationToken = default) where T : class; - Task ScheduleSend(Uri destination, T message, TimeSpan delay, CancellationToken cancellationToken = default) where T : class; + Task SchedulePublish(T message, DateTime scheduledTime, CancellationToken cancellationToken = default) where T : class; + Task SchedulePublish(T message, TimeSpan delay, CancellationToken cancellationToken = default) where T : class; + Task ScheduleSend(Uri destination, T message, DateTime scheduledTime, CancellationToken cancellationToken = default) where T : class; + Task ScheduleSend(Uri destination, T message, TimeSpan delay, CancellationToken cancellationToken = default) where T : class; + Task CancelScheduledPublish(Guid tokenId, CancellationToken cancellationToken = default); + Task CancelScheduledPublish(ScheduledMessageHandle handle, CancellationToken cancellationToken = default) + => CancelScheduledPublish(handle.TokenId, cancellationToken); + Task CancelScheduledSend(Guid tokenId, CancellationToken cancellationToken = default); + Task CancelScheduledSend(ScheduledMessageHandle handle, CancellationToken cancellationToken = default) + => CancelScheduledSend(handle.TokenId, cancellationToken); } diff --git a/src/MyServiceBus.Abstractions/ScheduledMessageHandle.cs b/src/MyServiceBus.Abstractions/ScheduledMessageHandle.cs new file mode 100644 index 0000000..b0850f1 --- /dev/null +++ b/src/MyServiceBus.Abstractions/ScheduledMessageHandle.cs @@ -0,0 +1,5 @@ +using System; + +namespace MyServiceBus; + +public record ScheduledMessageHandle(Guid TokenId, DateTime ScheduledTime); diff --git a/src/MyServiceBus/DefaultJobScheduler.cs b/src/MyServiceBus/DefaultJobScheduler.cs index 190eb1a..92c342a 100644 --- a/src/MyServiceBus/DefaultJobScheduler.cs +++ b/src/MyServiceBus/DefaultJobScheduler.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; @@ -6,15 +7,49 @@ namespace MyServiceBus; public class DefaultJobScheduler : IJobScheduler { - public async Task Schedule(DateTime scheduledTime, Func callback, CancellationToken cancellationToken = default) - { - var delay = scheduledTime - DateTime.UtcNow; - if (delay > TimeSpan.Zero) - await Task.Delay(delay, cancellationToken).ConfigureAwait(false); + private readonly ConcurrentDictionary _jobs = new(); - await callback(cancellationToken).ConfigureAwait(false); + public Task Schedule(DateTime scheduledTime, Func callback, CancellationToken cancellationToken = default) + { + var id = Guid.NewGuid(); + var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + _jobs[id] = cts; + _ = Execute(id, scheduledTime, callback, cts); + return Task.FromResult(id); } - public Task Schedule(TimeSpan delay, Func callback, CancellationToken cancellationToken = default) + public Task Schedule(TimeSpan delay, Func callback, CancellationToken cancellationToken = default) => Schedule(DateTime.UtcNow + delay, callback, cancellationToken); + + [Throws(typeof(AggregateException))] + public Task Cancel(Guid tokenId) + { + if (_jobs.TryRemove(tokenId, out var cts)) + { + cts.Cancel(); + cts.Dispose(); + } + return Task.CompletedTask; + } + + private async Task Execute(Guid id, DateTime scheduledTime, Func callback, CancellationTokenSource cts) + { + try + { + var delay = scheduledTime - DateTime.UtcNow; + if (delay > TimeSpan.Zero) + await Task.Delay(delay, cts.Token).ConfigureAwait(false); + + if (!cts.IsCancellationRequested) + await callback(cts.Token).ConfigureAwait(false); + } + catch (TaskCanceledException) + { + } + finally + { + _jobs.TryRemove(id, out _); + cts.Dispose(); + } + } } diff --git a/src/MyServiceBus/MessageScheduler.cs b/src/MyServiceBus/MessageScheduler.cs index 1194c91..db8e43d 100644 --- a/src/MyServiceBus/MessageScheduler.cs +++ b/src/MyServiceBus/MessageScheduler.cs @@ -17,19 +17,38 @@ public MessageScheduler(IPublishEndpoint publishEndpoint, ISendEndpointProvider _jobScheduler = jobScheduler; } - public Task SchedulePublish(T message, DateTime scheduledTime, CancellationToken cancellationToken = default) where T : class - => _jobScheduler.Schedule(scheduledTime, ct => _publishEndpoint.Publish(message, cancellationToken: ct), cancellationToken); + public async Task SchedulePublish(T message, DateTime scheduledTime, CancellationToken cancellationToken = default) where T : class + { + var tokenId = await _jobScheduler.Schedule(scheduledTime, ct => _publishEndpoint.Publish(message, cancellationToken: ct), cancellationToken); + return new ScheduledMessageHandle(tokenId, scheduledTime); + } - public Task SchedulePublish(T message, TimeSpan delay, CancellationToken cancellationToken = default) where T : class - => _jobScheduler.Schedule(delay, ct => _publishEndpoint.Publish(message, cancellationToken: ct), cancellationToken); + public Task SchedulePublish(T message, TimeSpan delay, CancellationToken cancellationToken = default) where T : class + => SchedulePublish(message, DateTime.UtcNow + delay, cancellationToken); - public Task ScheduleSend(Uri destination, T message, DateTime scheduledTime, CancellationToken cancellationToken = default) where T : class - => _jobScheduler.Schedule(scheduledTime, async ct => + public async Task ScheduleSend(Uri destination, T message, DateTime scheduledTime, CancellationToken cancellationToken = default) where T : class + { + async Task Callback(CancellationToken ct) { var endpoint = await _sendEndpointProvider.GetSendEndpoint(destination); await endpoint.Send(message, cancellationToken: ct); - }, cancellationToken); + } + var tokenId = await _jobScheduler.Schedule(scheduledTime, Callback, cancellationToken); + return new ScheduledMessageHandle(tokenId, scheduledTime); + } - public Task ScheduleSend(Uri destination, T message, TimeSpan delay, CancellationToken cancellationToken = default) where T : class + public Task ScheduleSend(Uri destination, T message, TimeSpan delay, CancellationToken cancellationToken = default) where T : class => ScheduleSend(destination, message, DateTime.UtcNow + delay, cancellationToken); + + public Task CancelScheduledPublish(Guid tokenId, CancellationToken cancellationToken = default) + => _jobScheduler.Cancel(tokenId); + + public Task CancelScheduledPublish(ScheduledMessageHandle handle, CancellationToken cancellationToken = default) + => CancelScheduledPublish(handle.TokenId, cancellationToken); + + public Task CancelScheduledSend(Guid tokenId, CancellationToken cancellationToken = default) + => _jobScheduler.Cancel(tokenId); + + public Task CancelScheduledSend(ScheduledMessageHandle handle, CancellationToken cancellationToken = default) + => CancelScheduledSend(handle.TokenId, cancellationToken); } diff --git a/src/MyServiceBus/ScheduleExtensions.cs b/src/MyServiceBus/ScheduleExtensions.cs new file mode 100644 index 0000000..ba5daae --- /dev/null +++ b/src/MyServiceBus/ScheduleExtensions.cs @@ -0,0 +1,36 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace MyServiceBus; + +public static class ScheduleExtensions +{ + public static Task SchedulePublish(this IPublishEndpoint endpoint, T message, DateTime scheduledTime, CancellationToken cancellationToken = default) + where T : class + => endpoint.Publish(message, ctx => ctx.SetScheduledEnqueueTime(scheduledTime), cancellationToken); + + public static Task SchedulePublish(this IPublishEndpoint endpoint, T message, TimeSpan delay, CancellationToken cancellationToken = default) + where T : class + => endpoint.Publish(message, ctx => ctx.SetScheduledEnqueueTime(delay), cancellationToken); + + public static Task ScheduleSend(this ISendEndpoint endpoint, T message, DateTime scheduledTime, CancellationToken cancellationToken = default) + where T : class + => endpoint.Send(message, ctx => ctx.SetScheduledEnqueueTime(scheduledTime), cancellationToken); + + public static Task ScheduleSend(this ISendEndpoint endpoint, T message, TimeSpan delay, CancellationToken cancellationToken = default) + where T : class + => endpoint.Send(message, ctx => ctx.SetScheduledEnqueueTime(delay), cancellationToken); + + public static Task CancelScheduledPublish(this IPublishEndpoint endpoint, IMessageScheduler scheduler, Guid tokenId, CancellationToken cancellationToken = default) + => scheduler.CancelScheduledPublish(tokenId, cancellationToken); + + public static Task CancelScheduledPublish(this IPublishEndpoint endpoint, IMessageScheduler scheduler, ScheduledMessageHandle handle, CancellationToken cancellationToken = default) + => scheduler.CancelScheduledPublish(handle, cancellationToken); + + public static Task CancelScheduledSend(this ISendEndpoint endpoint, IMessageScheduler scheduler, Guid tokenId, CancellationToken cancellationToken = default) + => scheduler.CancelScheduledSend(tokenId, cancellationToken); + + public static Task CancelScheduledSend(this ISendEndpoint endpoint, IMessageScheduler scheduler, ScheduledMessageHandle handle, CancellationToken cancellationToken = default) + => scheduler.CancelScheduledSend(handle, cancellationToken); +} diff --git a/test/MyServiceBus.Tests/SchedulingTests.cs b/test/MyServiceBus.Tests/SchedulingTests.cs index 2e0ba63..fee46ee 100644 --- a/test/MyServiceBus.Tests/SchedulingTests.cs +++ b/test/MyServiceBus.Tests/SchedulingTests.cs @@ -17,20 +17,74 @@ class TestMessage { } class TestConsumer : IConsumer { public static int Received; + public static TaskCompletionSource? Completed; public Task Consume(ConsumeContext context) { Received++; + Completed?.TrySetResult(DateTime.UtcNow); return Task.CompletedTask; } } class ImmediateJobScheduler : IJobScheduler { - public Task Schedule(DateTime scheduledTime, Func callback, CancellationToken cancellationToken = default) - => callback(cancellationToken); + public Task Schedule(DateTime scheduledTime, Func callback, CancellationToken cancellationToken = default) + { + _ = callback(cancellationToken); + return Task.FromResult(Guid.NewGuid()); + } + + public Task Schedule(TimeSpan delay, Func callback, CancellationToken cancellationToken = default) + { + _ = callback(cancellationToken); + return Task.FromResult(Guid.NewGuid()); + } + + public Task Cancel(Guid tokenId) => Task.CompletedTask; + } + + class StubSendContext : IPublishContext + { + public string MessageId { get; set; } = string.Empty; + public string RoutingKey { get; set; } = string.Empty; + public IDictionary Headers { get; } = new Dictionary(); + public string? CorrelationId { get; set; } + public Uri? ResponseAddress { get; set; } + public Uri? FaultAddress { get; set; } + public DateTime? ScheduledEnqueueTime { get; set; } + public CancellationToken CancellationToken { get; } = CancellationToken.None; + } + + class StubPublishEndpoint : IPublishEndpoint + { + public StubSendContext? Context; + + public Task Publish(object message, Action? contextCallback = null, CancellationToken cancellationToken = default) where T : class + => Publish((T)message, contextCallback, cancellationToken); + + public Task Publish(T message, Action? contextCallback = null, CancellationToken cancellationToken = default) where T : class + { + var ctx = new StubSendContext(); + contextCallback?.Invoke(ctx); + Context = ctx; + return Task.CompletedTask; + } + } + + class StubSendEndpoint : ISendEndpoint + { + public StubSendContext? Context; + + public Task Send(T message, Action? contextCallback = null, CancellationToken cancellationToken = default) where T : class + { + var ctx = new StubSendContext(); + contextCallback?.Invoke(ctx); + Context = ctx; + return Task.CompletedTask; + } - public Task Schedule(TimeSpan delay, Func callback, CancellationToken cancellationToken = default) - => callback(cancellationToken); + public Task Send(object message, Action? contextCallback = null, CancellationToken cancellationToken = default) where T : class + => Send((T)message, contextCallback, cancellationToken); } [Fact] @@ -51,14 +105,15 @@ public async Task SchedulePublish_delays_message() var scheduler = provider.GetRequiredService(); TestConsumer.Received = 0; + TestConsumer.Completed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var delay = TimeSpan.FromMilliseconds(100); - var sw = Stopwatch.StartNew(); + var before = DateTime.UtcNow; await scheduler.SchedulePublish(new TestMessage(), delay); - sw.Stop(); - + var consumedAt = await TestConsumer.Completed.Task; var tolerance = TimeSpan.FromMilliseconds(20); - Assert.True(sw.Elapsed >= delay - tolerance); + Assert.True(consumedAt - before >= delay - tolerance); Assert.Equal(1, TestConsumer.Received); + TestConsumer.Completed = null; await hosted.StopAsync(CancellationToken.None); } @@ -112,13 +167,69 @@ public async Task Custom_scheduler_is_used() var scheduler = provider.GetRequiredService(); TestConsumer.Received = 0; + TestConsumer.Completed = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var delay = TimeSpan.FromSeconds(1); - var sw = Stopwatch.StartNew(); await scheduler.SchedulePublish(new TestMessage(), delay); - sw.Stop(); - - Assert.True(sw.Elapsed < delay); + await TestConsumer.Completed.Task; // should complete immediately + Assert.True(TestConsumer.Completed.Task.IsCompleted); Assert.Equal(1, TestConsumer.Received); + TestConsumer.Completed = null; + + await hosted.StopAsync(CancellationToken.None); + } + + [Fact] + [Throws(typeof(NotNullException), typeof(InRangeException))] + public async Task Publish_extension_sets_scheduled_time() + { + var endpoint = new StubPublishEndpoint(); + var delay = TimeSpan.FromMilliseconds(100); + var before = DateTime.UtcNow; + await endpoint.SchedulePublish(new TestMessage(), delay); + + Assert.NotNull(endpoint.Context); + var scheduled = endpoint.Context!.ScheduledEnqueueTime; + var tolerance = TimeSpan.FromMilliseconds(50); + Assert.InRange(scheduled!.Value, before + delay - tolerance, before + delay + tolerance); + } + + [Fact] + [Throws(typeof(NotNullException), typeof(InRangeException))] + public async Task Send_extension_sets_scheduled_time() + { + var endpoint = new StubSendEndpoint(); + var delay = TimeSpan.FromMilliseconds(100); + var before = DateTime.UtcNow; + await endpoint.ScheduleSend(new TestMessage(), delay); + + Assert.NotNull(endpoint.Context); + var scheduled = endpoint.Context!.ScheduledEnqueueTime; + var tolerance = TimeSpan.FromMilliseconds(50); + Assert.InRange(scheduled!.Value, before + delay - tolerance, before + delay + tolerance); + } + + [Fact] + public async Task Cancel_prevents_scheduled_publish() + { + var services = new ServiceCollection(); + services.AddLogging(); + services.AddServiceBus(cfg => + { + cfg.UsingMediator(); + cfg.AddConsumer(); + }); + + await using var provider = services.BuildServiceProvider(); + var hosted = provider.GetRequiredService(); + await hosted.StartAsync(CancellationToken.None); + + var scheduler = provider.GetRequiredService(); + TestConsumer.Received = 0; + var delay = TimeSpan.FromMilliseconds(200); + var handle = await scheduler.SchedulePublish(new TestMessage(), delay); + await scheduler.CancelScheduledPublish(handle); + await Task.Delay(delay + TimeSpan.FromMilliseconds(50)); + Assert.Equal(0, TestConsumer.Received); await hosted.StopAsync(CancellationToken.None); }