This week's issue is sponsored by Rebus Pro. Rebus Pro is the perfect one-up for serious Rebus users - use Fleet Manager to get Slack alerts when something fails, and retry dead-lettered messages with a click of the mouse.
And by Uno Platform. Uno Platfrom is a free and open-source development platform that allows you to build single codebase applications for Windows, iOS, Android, WebAssembly, macOS, and Linux with C# and XAML. Easily create applications that look and feel native to each platform without having to rewrite your codebase from scratch.
MediatR is a popular library with a simple mediator pattern implementation in .NET.
Here's a definiton taken from MediatR's GitHub: "In-process messaging with no dependencies."
With the rise in popularity of the CQRS pattern, MediatR became the go-to library to implement commands and queries.
However, MediatR also has support for the publish-subscribe pattern using notifications.
You can publish an INotification
instance and have multiple subscribers handle the published message.
Until recently, the handlers subscribing to an INotification
message could only execute serially, one by one.
In this week's newsletter, I'll show you how to configure MediatR to execute the handlers in parallel.
Let's dive in.
How Publish-Subscribe Works With MediatR
Before I talk about notification publishing strategies, let's see how publish-subscribe works with MediatR.
You need a class implementing the INotification
interface:
public record OrderCreated(Guid OrderId) : INotification;
Then you need a respective INotificationHandler
implementation:
public class OrderCreatedHandler : INotificationHandler<OrderCreated>
{
private readonly INotificationService _notificationService;
public OrderCreatedHandler(INotificationService notificationService)
{
_notificationService = notificationService;
}
public async Task Handle(
OrderCreated notification,
CancellationToken cancellationToken)
{
await _notificationService.SendOrderCreatedEmail(
notification.OrderId,
cancellationToken);
}
}
And then you simply publish a message using either IMediator
or IPublisher
.
I prefer using the IPublisher
because it's more expressive:
await publisher.Publish(new OrderCreated(order.Id), cancellationToken);
MediatR will invoke all the respective handlers.
Introducing Notification Publisher Strategies
Before MediatR v12, the publishing strategy would invoke each handler individually.
However, there's a new interface INotificationPublisher
controlling how the handlers are called.
The default implementation of this interface is ForeachAwaitPublisher
:
public class ForeachAwaitPublisher : INotificationPublisher
{
public async Task Publish(
IEnumerable<NotificationHandlerExecutor> handlerExecutors,
INotification notification,
CancellationToken cancellationToken)
{
foreach (var handler in handlerExecutors)
{
await handler
.HandlerCallback(notification, cancellationToken)
.ConfigureAwait(false);
}
}
}
But now you can also use the TaskWhenAllPublisher
:
public class TaskWhenAllPublisher : INotificationPublisher
{
public Task Publish(
IEnumerable<NotificationHandlerExecutor> handlerExecutors,
INotification notification,
CancellationToken cancellationToken)
{
var tasks = handlerExecutors
.Select(handler => handler.HandlerCallback(
notification,
cancellationToken))
.ToArray();
return Task.WhenAll(tasks);
}
}
Here's a comparison between these two strategies.
ForeachAwaitPublisher
:
- Invokes each handler one by one
- Fails when an exception occurs in one of the handlers
TaskWhenAllPublisher
:
- Invokes all the handlers at the same time
- Executes all the handlers regardless of one of them throwing an exception
If you store the task returned by TaskWhenAllPublisher
you can access the Task.Exception
property, which will contain an AggregateException
instance.
You can then implement more robust exception handling.
Configuring MediatR Notification Publishing Strategy
How do we configure which INotificationPublisher
strategy MediatR will use?
There's a new way to apply configuration options when calling the AddMediatR
method.
You supply an Action<MediatRServiceConfiguration>
delegate and configure the MediatRServiceConfiguration
instance.
If you want to use the TaskWhenAllPublisher
strategy, you can either:
- Provide a value for the
NotificationPublisher
property - Specify the strategy type on the
NotificationPublisherType
property
services.AddMediatR(config => {
config.RegisterServicesFromAssemblyContaining<Program>();
// Setting the publisher directly will make the instance a Singleton.
config.NotificationPublisher = new TaskWhenAllPublisher();
// Seting the publisher type will:
// 1. Override the value set on NotificationPublisher
// 2. Use the service lifetime from the ServiceLifetime property below
config.NotificationPublisherType = typeof(TaskWhenAllPublisher);
config.ServiceLifetime = ServiceLifetime.Transient;
});
You can also implement a custom INotificationPublisher
instance and use your own implementation instead.
How Is This Useful?
Being able to run notification handlers in parallel provides a significant performance improvement over the default behavior.
However, note that all handlers will use the same service scope.
If you have service instances that don't support concurrent access you may run into problems.
Unfortunately, one such service instance is the EF Core DbContext
.
In any case, I think this is a great addition to the already amazing MediatR library.
That's all for today.
See you next week.