How To Publish MediatR Notifications In Parallel

How To Publish MediatR Notifications In Parallel

3 min read ·

This week's issue is sponsored by Rebus Pro. Rebus Pro is the perfect one-up for serious Rebus users - use Fleet Manager to get Slack alerts when something fails, and retry dead-lettered messages with a click of the mouse.

And by Uno Platform. Uno Platfrom is a free and open-source development platform that allows you to build single codebase applications for Windows, iOS, Android, WebAssembly, macOS, and Linux with C# and XAML. Easily create applications that look and feel native to each platform without having to rewrite your codebase from scratch.

MediatR is a popular library with a simple mediator pattern implementation in .NET.

Here's a definiton taken from MediatR's GitHub: "In-process messaging with no dependencies."

With the rise in popularity of the CQRS pattern, MediatR became the go-to library to implement commands and queries.

However, MediatR also has support for the publish-subscribe pattern using notifications. You can publish an INotification instance and have multiple subscribers handle the published message.

Until recently, the handlers subscribing to an INotification message could only execute serially, one by one.

In this week's newsletter, I'll show you how to configure MediatR to execute the handlers in parallel.

Let's dive in.

How Publish-Subscribe Works With MediatR

Before I talk about notification publishing strategies, let's see how publish-subscribe works with MediatR.

You need a class implementing the INotification interface:

public record OrderCreated(Guid OrderId) : INotification;

Then you need a respective INotificationHandler implementation:

public class OrderCreatedHandler : INotificationHandler<OrderCreated>
{
    private readonly INotificationService _notificationService;

    public OrderCreatedHandler(INotificationService notificationService)
    {
        _notificationService = notificationService;
    }

    public async Task Handle(
        OrderCreated notification,
        CancellationToken cancellationToken)
    {
        await _notificationService.SendOrderCreatedEmail(
            notification.OrderId,
            cancellationToken);
    }
}

And then you simply publish a message using either IMediator or IPublisher. I prefer using the IPublisher because it's more expressive:

await publisher.Publish(new OrderCreated(order.Id), cancellationToken);

MediatR will invoke all the respective handlers.

Introducing Notification Publisher Strategies

Before MediatR v12, the publishing strategy would invoke each handler individually.

However, there's a new interface INotificationPublisher controlling how the handlers are called.

The default implementation of this interface is ForeachAwaitPublisher:

public class ForeachAwaitPublisher : INotificationPublisher
{
    public async Task Publish(
        IEnumerable<NotificationHandlerExecutor> handlerExecutors,
        INotification notification,
        CancellationToken cancellationToken)
    {
        foreach (var handler in handlerExecutors)
        {
            await handler
                .HandlerCallback(notification, cancellationToken)
                .ConfigureAwait(false);
        }
    }
}

But now you can also use the TaskWhenAllPublisher:

public class TaskWhenAllPublisher : INotificationPublisher
{
    public Task Publish(
        IEnumerable<NotificationHandlerExecutor> handlerExecutors,
        INotification notification,
        CancellationToken cancellationToken)
    {
        var tasks = handlerExecutors
            .Select(handler => handler.HandlerCallback(
                notification,
                cancellationToken))
            .ToArray();

        return Task.WhenAll(tasks);
    }
}

Here's a comparison between these two strategies.

ForeachAwaitPublisher:

  • Invokes each handler one by one
  • Fails when an exception occurs in one of the handlers

TaskWhenAllPublisher:

  • Invokes all the handlers at the same time
  • Executes all the handlers regardless of one of them throwing an exception

If you store the task returned by TaskWhenAllPublisher you can access the Task.Exception property, which will contain an AggregateException instance. You can then implement more robust exception handling.

Configuring MediatR Notification Publishing Strategy

How do we configure which INotificationPublisher strategy MediatR will use?

There's a new way to apply configuration options when calling the AddMediatR method.

You supply an Action<MediatRServiceConfiguration> delegate and configure the MediatRServiceConfiguration instance.

If you want to use the TaskWhenAllPublisher strategy, you can either:

  • Provide a value for the NotificationPublisher property
  • Specify the strategy type on the NotificationPublisherType property
services.AddMediatR(config => {
    config.RegisterServicesFromAssemblyContaining<Program>();

    // Setting the publisher directly will make the instance a Singleton.
    config.NotificationPublisher = new TaskWhenAllPublisher();

    // Seting the publisher type will:
    // 1. Override the value set on NotificationPublisher
    // 2. Use the service lifetime from the ServiceLifetime property below
    config.NotificationPublisherType = typeof(TaskWhenAllPublisher);

    config.ServiceLifetime = ServiceLifetime.Transient;
});

You can also implement a custom INotificationPublisher instance and use your own implementation instead.

How Is This Useful?

Being able to run notification handlers in parallel provides a significant performance improvement over the default behavior.

However, note that all handlers will use the same service scope.

If you have service instances that don't support concurrent access you may run into problems.

Unfortunately, one such service instance is the EF Core DbContext.

In any case, I think this is a great addition to the already amazing MediatR library.

That's all for today.

See you next week.


Whenever you're ready, there are 4 ways I can help you:

  1. (COMING SOON) Pragmatic REST APIs: You will learn how to build production-ready REST APIs using the latest ASP.NET Core features and best practices. It includes a fully functional UI application that we'll integrate with the REST API. Join the waitlist!
  2. Pragmatic Clean Architecture: Join 3,700+ students in this comprehensive course that will teach you the system I use to ship production-ready applications using Clean Architecture. Learn how to apply the best practices of modern software architecture.
  3. Modular Monolith Architecture: Join 1,600+ engineers in this in-depth course that will transform the way you build modern systems. You will learn the best practices for applying the Modular Monolith architecture in a real-world scenario.
  4. Patreon Community: Join a community of 1,000+ engineers and software architects. You will also unlock access to the source code I use in my YouTube videos, early access to future videos, and exclusive discounts for my courses.

Become a Better .NET Software Engineer

Join 61,000+ engineers who are improving their skills every Saturday morning.