Implementing The Saga Pattern With Rebus And RabbitMQ

Designing long-lived processes in a distributed environment is an interesting engineering challenge.

A well-known pattern for solving this problem is the Saga.

A Saga is a sequence of local transactions, where each local transaction updates the Saga state and publishes a message triggering the next step in the Saga.

Sagas come in two forms:

  • Orchestrated
  • Choreographed

With an orchestrated Saga, there's a central component responsible for orchestrating the individual steps.

In a choreographed Saga, processes work independently but coordinate with each other using events.

In this week's issue, I'll show you how to create an orchestrated Saga using the Rebus library with RabbitMQ for message transport.

Let's dive in.

Rebus Configuration

Rebus is a free .NET “service bus”, and it's practical for implementing asynchronous messaging-based communication between the components of an application.

Let's install the following libraries:

  • Rebus.ServiceProvider for managing the Rebus instance
  • Rebus.RabbitMq for RabbitMQ message transport
  • Rebus.SqlServer for SQL Server state persistence

Install-Package Rebus.ServiceProvider -Version 8.4.0
Install-Package Rebus.RabbitMq -Version 8.0.0
Install-Package Rebus.SqlServer -Version 7.3.1

Inside of your ASP.NET Core application you will need the following configuration.

services.AddRebus(
    rebus => rebus
        .Routing(r =>
            r.TypeBased().MapAssemblyOf<Program>("newsletter-queue"))
        .Transport(t =>
            t.UseRabbitMq(
                configuration.GetConnectionString("RabbitMq"),
                inputQueueName: "newsletter-queue"))
        .Sagas(s =>
            s.StoreInSqlServer(
                configuration.GetConnectionString("SqlServer"),
                dataTableName: "Sagas",
                indexTableName: "SagaIndexes"))
        .Timeouts(t =>
            t.StoreInSqlServer(
                configuration.GetConnectionString("SqlServer"),
                tableName: "Timeouts"))
);

services.AutoRegisterHandlersFromAssemblyOf<Program>();

Unpacking the individual configuration steps:

  • Routing - Configures messages to be routed by their type
  • Transport - Configures the message transport mechanism
  • Sagas - Configures the Saga persistence store
  • Timeouts - Configures the timeouts persistence store

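You also need to specify the queue name in two places: the routing maps every message type in the assembly to newsletter-queue, and inputQueueName tells the transport to receive messages from that same queue.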

AutoRegisterHandlersFromAssemblyOf will scan the specified assembly and automatically register the respective message handlers.

Creating The Saga With Rebus

We're going to create a Saga for a newsletter onboarding process.

When a user subscribes to the newsletter we want to:

  • Send a welcome email immediately
  • Send a follow-up email after 7 days

The first step in creating the Saga is defining the data model by implementing ISagaData. We'll keep it simple and store the Email for correlation, and add two flags for the distinct steps in our Saga.

public class NewsletterOnboardingSagaData : ISagaData
{
    public Guid Id { get; set; }
    public int Revision { get; set; }

    public string Email { get; set; }

    public bool WelcomeEmailSent { get; set; }

    public bool FollowUpEmailSent { get; set; }
}

Now we can define the NewsletterOnboardingSaga class by inheriting from the Saga class and implementing the CorrelateMessages method.

It's a best practice to use a unique value for correlation. In our case this will be the Email.

You also configure how the Saga starts with IAmInitiatedBy, and the individual messages the Saga handles with IHandleMessages.

public class NewsletterOnboardingSaga :
    Saga<NewsletterOnboardingSagaData>,
    IAmInitiatedBy<SubscribeToNewsletter>,
    IHandleMessages<WelcomeEmailSent>,
    IHandleMessages<FollowUpEmailSent>
{
    private readonly IBus _bus;

    public NewsletterOnboardingSaga(IBus bus)
    {
        _bus = bus;
    }

    protected override void CorrelateMessages(
        ICorrelationConfig<NewsletterOnboardingSagaData> config)
    {
        config.Correlate<SubscribeToNewsletter>(m => m.Email, d => d.Email);

        config.Correlate<WelcomeEmailSent>(m => m.Email, d => d.Email);

        config.Correlate<FollowUpEmailSent>(m => m.Email, d => d.Email);
    }

    /* Handlers omitted for brevity */
}

Message Types And Naming Conventions

There are two types of messages you send in a Saga:

  • Commands
  • Events

Commands instruct the receiving component what to do.
Think: verb, imperative.

Events notify the Saga that a step has just completed.
Think: what happened, past tense.
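
The message contracts themselves aren't shown in this issue. As a minimal sketch, assuming each message carries only the subscriber's Email (which is all the correlation and handler code rely on), they could be declared as positional records:

```csharp
// Hypothetical message contracts. The shape (a single Email property) is an
// assumption inferred from how the messages are used in the Saga.

// Commands: imperative names, they tell a component what to do.
public record SubscribeToNewsletter(string Email);
public record SendWelcomeEmail(string Email);
public record SendFollowUpEmail(string Email);

// Events: past-tense names, they report what has already happened.
public record WelcomeEmailSent(string Email);
public record FollowUpEmailSent(string Email);
```

Records keep the contracts short: each one gets a constructor, an Email property, and value equality without extra code.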

Saga Orchestration With Messages

The NewsletterOnboardingSaga starts by handling the SubscribeToNewsletter command, and sends a SendWelcomeEmail command.

public async Task Handle(SubscribeToNewsletter message)
{
    if (!IsNew)
    {
        return;
    }

    await _bus.Send(new SendWelcomeEmail(message.Email));
}
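
Something still has to send that first SubscribeToNewsletter command. Here's a minimal sketch, assuming a hypothetical NewsletterSubscriptionService (not part of the original example) that you would call from a controller or endpoint, and assuming the command carries only an Email:

```csharp
using System.Threading.Tasks;
using Rebus.Bus;

// Assumed message shape: a single Email property.
public record SubscribeToNewsletter(string Email);

// Hypothetical entry point; the class name is illustrative only.
public class NewsletterSubscriptionService
{
    private readonly IBus _bus;

    public NewsletterSubscriptionService(IBus bus)
    {
        _bus = bus;
    }

    public Task SubscribeAsync(string email)
    {
        // Type-based routing picks the destination queue,
        // which is "newsletter-queue" in the Rebus configuration.
        return _bus.Send(new SubscribeToNewsletter(email));
    }
}
```

Because the Saga declares IAmInitiatedBy<SubscribeToNewsletter>, the first command for a given Email creates a new Saga instance. The IsNew check then ignores repeated subscriptions for an Email whose Saga is still running.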

The SendWelcomeEmail command is handled by a different component, which publishes a WelcomeEmailSent event when it completes.

In the WelcomeEmailSent handler we update the Saga state and schedule a deferred message by calling Defer. Rebus will persist the SendFollowUpEmail command in the timeout store, and deliver it when the timeout expires.

public async Task Handle(WelcomeEmailSent message)
{
    Data.WelcomeEmailSent = true;

    await _bus.Defer(TimeSpan.FromDays(7), new SendFollowUpEmail(message.Email));
}

Finally, the SendFollowUpEmail command is handled by its own handler, which publishes the FollowUpEmailSent event.

We update the Saga state again, and also call MarkAsComplete to complete the Saga.

public Task Handle(FollowUpEmailSent message)
{
    Data.FollowUpEmailSent = true;

    MarkAsComplete();

    return Task.CompletedTask;
}

Completing the Saga will delete it from the database.

Handling Commands With Rebus

Here's what the SendWelcomeEmail command handler looks like.

public class SendWelcomeEmailHandler : IHandleMessages<SendWelcomeEmail>
{
    private readonly IEmailService _emailService;
    private readonly IBus _bus;

    public SendWelcomeEmailHandler(IEmailService emailService, IBus bus)
    {
        _emailService = emailService;
        _bus = bus;
    }

    public async Task Handle(SendWelcomeEmail message)
    {
        await _emailService.SendWelcomeEmailAsync(message.Email);

        await _bus.Reply(new WelcomeEmailSent(message.Email));
    }
}

The important thing to highlight here is that we're using the Reply method to send a message back. Rebus sends the reply to the endpoint specified as the return address on the message being handled.
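
The SendFollowUpEmail handler isn't shown here, but it can presumably mirror the welcome handler. This is a sketch, assuming IEmailService exposes a SendFollowUpEmailAsync method (a hypothetical name) and that the messages carry only an Email. Reply is used by analogy with the handler above.

```csharp
using System.Threading.Tasks;
using Rebus.Bus;
using Rebus.Handlers;

// Assumed declarations so the sketch stands on its own.
public record SendFollowUpEmail(string Email);
public record FollowUpEmailSent(string Email);

public interface IEmailService
{
    // Hypothetical method, named after SendWelcomeEmailAsync.
    Task SendFollowUpEmailAsync(string email);
}

public class SendFollowUpEmailHandler : IHandleMessages<SendFollowUpEmail>
{
    private readonly IEmailService _emailService;
    private readonly IBus _bus;

    public SendFollowUpEmailHandler(IEmailService emailService, IBus bus)
    {
        _emailService = emailService;
        _bus = bus;
    }

    public async Task Handle(SendFollowUpEmail message)
    {
        await _emailService.SendFollowUpEmailAsync(message.Email);

        // Report completion so the Saga can set FollowUpEmailSent
        // and call MarkAsComplete.
        await _bus.Reply(new FollowUpEmailSent(message.Email));
    }
}
```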

In Summary

Sagas are practical for implementing a long-lived process in a distributed system. Each step of the business process is a local transaction that publishes a message to trigger the next step in the Saga.

Although Sagas are very powerful, they are also complicated to develop, maintain and debug.

We didn't cover a few important topics in this newsletter:

  • Error handling
  • Message retries
  • Compensating transactions

I think you'll have some fun researching these on your own.

Take a look at the source code for the example used in this newsletter if you want to learn more about implementing Sagas.

If you have Docker installed, you should be able to run it without a problem and try it out.

Thank you for reading, and have an awesome Saturday.

