Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 58 additions & 3 deletions docs/feature-walkthrough.md
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ try (ServiceScope scope = provider.createScope()) {

### Scheduling Messages

Delay message delivery by setting the scheduled enqueue time on the send or publish context or by using the `IMessageScheduler` service. External schedulers such as Quartz or Hangfire can be plugged in by providing a custom `IJobScheduler`/`JobScheduler` implementation.
Delay message delivery by setting the scheduled enqueue time on the send or publish context or by using the `IMessageScheduler` service. `IMessageScheduler` returns a `ScheduledMessageHandle` that can be used to cancel a scheduled message. External schedulers such as Quartz or Hangfire can be plugged in by providing a custom `IJobScheduler`/`JobScheduler` implementation.

#### C#

Expand All @@ -874,8 +874,12 @@ await bus.Publish(new OrderSubmitted(), ctx => ctx.SetScheduledEnqueueTime(TimeS
var endpoint = await bus.GetSendEndpoint(new Uri("queue:submit-order"));
await endpoint.Send(new SubmitOrder(), ctx => ctx.SetScheduledEnqueueTime(TimeSpan.FromSeconds(30)));

await bus.SchedulePublish(new OrderSubmitted(), TimeSpan.FromSeconds(30));
await endpoint.ScheduleSend(new SubmitOrder(), TimeSpan.FromSeconds(30));

var scheduler = provider.GetRequiredService<IMessageScheduler>();
await scheduler.SchedulePublish(new OrderSubmitted(), TimeSpan.FromSeconds(30));
var handle = await scheduler.SchedulePublish(new OrderSubmitted(), TimeSpan.FromSeconds(30));
await scheduler.CancelScheduledPublish(handle);
await scheduler.ScheduleSend(new Uri("queue:submit-order"), new SubmitOrder(), TimeSpan.FromSeconds(30));
```

Expand All @@ -887,10 +891,61 @@ SendEndpoint endpoint = bus.getSendEndpoint("queue:submit-order");
endpoint.send(new SubmitOrder(), ctx -> ctx.setScheduledEnqueueTime(Duration.ofSeconds(30))).get();

MessageScheduler scheduler = services.getService(MessageScheduler.class);
scheduler.schedulePublish(new OrderSubmitted(), Duration.ofSeconds(30)).get();
ScheduledMessageHandle handle = scheduler.schedulePublish(new OrderSubmitted(), Duration.ofSeconds(30))
.toCompletableFuture().get();
scheduler.cancelScheduledPublish(handle).toCompletableFuture().get();
scheduler.scheduleSend("queue:submit-order", new SubmitOrder(), Duration.ofSeconds(30)).get();
```

##### Custom schedulers

`AddServiceBus` registers a simple timer-based `DefaultJobScheduler`. To integrate a production scheduler such as Quartz or Hangfire, implement `IJobScheduler`/`JobScheduler` and register it so it replaces the default.

**C#**

```csharp
class HangfireJobScheduler : IJobScheduler
{
readonly IBackgroundJobClient jobs;
public HangfireJobScheduler(IBackgroundJobClient jobs) => this.jobs = jobs;

public Task<Guid> Schedule(DateTime scheduledTime, Func<CancellationToken, Task> callback, CancellationToken token = default)
{
jobs.Schedule(() => callback(token), scheduledTime);
return Task.FromResult(Guid.NewGuid());
}

public Task Cancel(Guid tokenId) => Task.CompletedTask;
}

services.AddSingleton<IJobScheduler, HangfireJobScheduler>();
services.AddServiceBus(cfg => { /* ... */ });
```

**Java**

```java
class QuartzJobScheduler implements JobScheduler {
private final Scheduler scheduler;
QuartzJobScheduler(Scheduler scheduler) { this.scheduler = scheduler; }

public CompletionStage<UUID> schedule(Instant scheduledTime,
Function<CancellationToken, CompletionStage<Void>> callback,
CancellationToken token) {
scheduler.scheduleJob(() -> callback.apply(token), Date.from(scheduledTime));
return CompletableFuture.completedFuture(UUID.randomUUID());
}

public CompletionStage<Void> cancel(UUID tokenId) {
return CompletableFuture.completedFuture(null);
}
}

ServiceCollection services = ServiceCollection.create();
services.addSingleton(JobScheduler.class, sp -> new QuartzJobScheduler(quartz));
services.addServiceBus(cfg -> { /* ... */ });
```

### Unit Testing with the In-Memory Test Harness

#### C#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,32 @@

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import com.myservicebus.tasks.CancellationToken;

public interface JobScheduler {
CompletableFuture<Void> schedule(Instant scheduledTime, Supplier<CompletableFuture<Void>> callback);
CompletionStage<UUID> schedule(Instant scheduledTime,
Function<CancellationToken, CompletionStage<Void>> callback,
CancellationToken cancellationToken);

default CompletionStage<UUID> schedule(Instant scheduledTime,
Function<CancellationToken, CompletionStage<Void>> callback) {
return schedule(scheduledTime, callback, CancellationToken.none);
}

default CompletableFuture<Void> schedule(Duration delay, Supplier<CompletableFuture<Void>> callback) {
return schedule(Instant.now().plus(delay), callback);
default CompletionStage<UUID> schedule(Duration delay,
Function<CancellationToken, CompletionStage<Void>> callback,
CancellationToken cancellationToken) {
return schedule(Instant.now().plus(delay), callback, cancellationToken);
}

default CompletionStage<UUID> schedule(Duration delay,
Function<CancellationToken, CompletionStage<Void>> callback) {
return schedule(delay, callback, CancellationToken.none);
}

CompletionStage<Void> cancel(UUID tokenId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,72 @@

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.UUID;
import java.util.concurrent.CompletionStage;

import com.myservicebus.tasks.CancellationToken;

public interface MessageScheduler {
<T> CompletableFuture<Void> schedulePublish(T message, Instant scheduledTime);
default <T> CompletableFuture<Void> schedulePublish(T message, Duration delay) {
return schedulePublish(message, Instant.now().plus(delay));
<T> CompletionStage<ScheduledMessageHandle> schedulePublish(T message,
Instant scheduledTime,
CancellationToken cancellationToken);

default <T> CompletionStage<ScheduledMessageHandle> schedulePublish(T message, Instant scheduledTime) {
return schedulePublish(message, scheduledTime, CancellationToken.none);
}

default <T> CompletionStage<ScheduledMessageHandle> schedulePublish(T message, Duration delay,
CancellationToken cancellationToken) {
return schedulePublish(message, Instant.now().plus(delay), cancellationToken);
}

default <T> CompletionStage<ScheduledMessageHandle> schedulePublish(T message, Duration delay) {
return schedulePublish(message, delay, CancellationToken.none);
}

<T> CompletionStage<ScheduledMessageHandle> scheduleSend(String destination,
T message,
Instant scheduledTime,
CancellationToken cancellationToken);

default <T> CompletionStage<ScheduledMessageHandle> scheduleSend(String destination, T message, Instant scheduledTime) {
return scheduleSend(destination, message, scheduledTime, CancellationToken.none);
}

default <T> CompletionStage<ScheduledMessageHandle> scheduleSend(String destination, T message, Duration delay,
CancellationToken cancellationToken) {
return scheduleSend(destination, message, Instant.now().plus(delay), cancellationToken);
}

default <T> CompletionStage<ScheduledMessageHandle> scheduleSend(String destination, T message, Duration delay) {
return scheduleSend(destination, message, delay, CancellationToken.none);
}
<T> CompletableFuture<Void> scheduleSend(String destination, T message, Instant scheduledTime);
default <T> CompletableFuture<Void> scheduleSend(String destination, T message, Duration delay) {
return scheduleSend(destination, message, Instant.now().plus(delay));

CompletionStage<Void> cancelScheduledPublish(UUID tokenId, CancellationToken cancellationToken);

default CompletionStage<Void> cancelScheduledPublish(UUID tokenId) {
return cancelScheduledPublish(tokenId, CancellationToken.none);
}

default CompletionStage<Void> cancelScheduledPublish(ScheduledMessageHandle handle, CancellationToken cancellationToken) {
return cancelScheduledPublish(handle.getTokenId(), cancellationToken);
}

default CompletionStage<Void> cancelScheduledPublish(ScheduledMessageHandle handle) {
return cancelScheduledPublish(handle, CancellationToken.none);
}

CompletionStage<Void> cancelScheduledSend(UUID tokenId, CancellationToken cancellationToken);

default CompletionStage<Void> cancelScheduledSend(UUID tokenId) {
return cancelScheduledSend(tokenId, CancellationToken.none);
}

default CompletionStage<Void> cancelScheduledSend(ScheduledMessageHandle handle, CancellationToken cancellationToken) {
return cancelScheduledSend(handle.getTokenId(), cancellationToken);
}

default CompletionStage<Void> cancelScheduledSend(ScheduledMessageHandle handle) {
return cancelScheduledSend(handle, CancellationToken.none);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.myservicebus;

import java.util.UUID;

public class ScheduledMessageHandle {
private final UUID tokenId;

public ScheduledMessageHandle(UUID tokenId) {
this.tokenId = tokenId;
}

public UUID getTokenId() {
return tokenId;
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
package com.myservicebus;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletableFuture;
import com.myservicebus.tasks.CancellationToken;

public class SchedulingTest {
@Test
void scheduleSend_delays_message() {
void scheduleSend_delays_message() throws Exception {
InMemoryTestHarness harness = new InMemoryTestHarness();
CompletableFuture<Void> handled = new CompletableFuture<>();
harness.registerHandler(String.class, ctx -> {
Expand All @@ -31,13 +36,13 @@ public <T> CompletableFuture<Void> publish(T message, CancellationToken token) {
new DefaultJobScheduler());
Instant start = Instant.now();
Duration delay = Duration.ofMillis(100);
scheduler.scheduleSend("loopback://localhost/queue", "hi", delay).join();
scheduler.scheduleSend("loopback://localhost/queue", "hi", delay);
handled.join();
Instant end = Instant.now();
Duration elapsed = Duration.between(start, end);
Duration tolerance = Duration.ofMillis(20);
assertTrue(elapsed.toMillis() >= delay.minus(tolerance).toMillis());
assertTrue(harness.wasConsumed(String.class));
handled.join();
}

@Test
Expand Down Expand Up @@ -69,7 +74,18 @@ void customScheduler_is_used() {
return CompletableFuture.completedFuture(null);
});

JobScheduler immediate = (time, cb) -> cb.get();
JobScheduler immediate = new JobScheduler() {

public CompletionStage<UUID> schedule(Instant time, Function<CancellationToken, CompletionStage<Void>> cb, CancellationToken token) {
cb.apply(token);
return CompletableFuture.completedFuture(UUID.randomUUID());
}


public CompletionStage<Void> cancel(UUID tokenId) {
return CompletableFuture.completedFuture(null);
}
};
MessageScheduler scheduler = new MessageSchedulerImpl(
new PublishEndpoint() {
@Override
Expand All @@ -80,11 +96,34 @@ public <T> CompletableFuture<Void> publish(T message, CancellationToken token) {
uri -> harness.getSendEndpoint(uri),
immediate);
Instant start = Instant.now();
scheduler.scheduleSend("loopback://localhost/queue", "hi", Duration.ofMillis(100)).join();
scheduler.scheduleSend("loopback://localhost/queue", "hi", Duration.ofMillis(100));
Instant end = Instant.now();

assertTrue(Duration.between(start, end).toMillis() < 100);
assertTrue(harness.wasConsumed(String.class));
handled.join();
}

@Test
void cancelScheduledSend_prevents_delivery() throws Exception {
InMemoryTestHarness harness = new InMemoryTestHarness();
harness.registerHandler(String.class, ctx -> CompletableFuture.completedFuture(null));

MessageScheduler scheduler = new MessageSchedulerImpl(
new PublishEndpoint() {
@Override
public <T> CompletableFuture<Void> publish(T message, CancellationToken token) {
return harness.send(message);
}
},
uri -> harness.getSendEndpoint(uri),
new DefaultJobScheduler());
ScheduledMessageHandle handle = scheduler
.scheduleSend("loopback://localhost/queue", "hi", Duration.ofMillis(200))
.toCompletableFuture().get();
scheduler.cancelScheduledSend(handle).toCompletableFuture().get();

TimeUnit.MILLISECONDS.sleep(300);
assertFalse(harness.wasConsumed(String.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,67 @@

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.function.Function;

import com.myservicebus.tasks.CancellationToken;
import com.myservicebus.tasks.CancellationTokenSource;

public class DefaultJobScheduler implements JobScheduler {
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<UUID, ScheduledJob> jobs = new ConcurrentHashMap<>();

private static class ScheduledJob {
final CancellationTokenSource cts;
final ScheduledFuture<?> future;
ScheduledJob(CancellationTokenSource cts, ScheduledFuture<?> future) {
this.cts = cts;
this.future = future;
}
}

@Override
public CompletableFuture<Void> schedule(Instant scheduledTime, Supplier<CompletableFuture<Void>> callback) {
public CompletionStage<UUID> schedule(Instant scheduledTime,
Function<CancellationToken, CompletionStage<Void>> callback,
CancellationToken cancellationToken) {
UUID id = UUID.randomUUID();
Duration delay = Duration.between(Instant.now(), scheduledTime);
if (delay.isNegative()) {
delay = Duration.ZERO;
}
CompletableFuture<Void> future = new CompletableFuture<>();
CompletableFuture.delayedExecutor(delay.toMillis(), TimeUnit.MILLISECONDS)
.execute(() -> callback.get().whenComplete((r, e) -> {
if (e != null) {
future.completeExceptionally(e);
} else {
future.complete(r);
}
}));
return future;
CancellationTokenSource cts = new CancellationTokenSource();
Runnable task = () -> {
try {
if (cancellationToken.isCancelled() || cts.isCancelled()) {
return;
}
callback.apply(cts.getToken()).whenComplete((r, e) -> {
jobs.remove(id);
});
} catch (Throwable t) {
jobs.remove(id);
}
};
ScheduledFuture<?> future = executor.schedule(task, delay.toMillis(), TimeUnit.MILLISECONDS);
jobs.put(id, new ScheduledJob(cts, future));
return CompletableFuture.completedFuture(id);
}

@Override
public CompletionStage<Void> cancel(UUID tokenId) {
ScheduledJob job = jobs.remove(tokenId);
if (job != null) {
job.cts.cancel();
job.future.cancel(false);
}
return CompletableFuture.completedFuture(null);
}
}
Loading
Loading