dateo. Coding Blog

How to implement the mediator pattern in ASP.NET using MassTransit

Dennis Frühauff on March 23rd, 2023

I hope you remember the article in which I explored integrating messaging in your ASP.NET application to create loosely coupled code units, especially for performing long-running tasks and improving fault-tolerance.


We also briefly touched on the topic of conversation-like workflows that are useful in call-and-response scenarios, in which your code needs some feedback from the operation in question, and I showed you a way of using the messaging infrastructure to make that work.


Today, the time has come to elaborate on a better way to tackle this problem, leveraging the internal mediator implementation of MassTransit without abusing the underlying bus infrastructure of Amazon SNS/SQS. With that in place, you can benefit from building a vertically sliced and feature-oriented architecture for your application.


So let's dive into what that is all about and how you can implement it in your code base.


What is the mediator pattern?

The mediator pattern is a software design pattern that helps reduce the direct dependencies in your code by introducing some kind of dispatcher (= the mediator) in between. Instead of issuing a direct call to your dependencies, you hand over a description of what you are trying to do to this dispatcher, which will then deliver this job to the class that needs to take care of it. Sounds vaguely familiar? Many real-world applications adopt this sort of style to reduce the number of dependencies. Your traditional courier mail is a mediator, your email provider is one as well, and you can even think of .NET's intermediate language (IL) as a mediator.


In short, instead of having information of all its dependencies, your code only knows the mediator, thereby reducing the dependency graph from N*N (worst case) to 2N.


This pattern is especially famous for implementations of Command Query Responsibility Segregation (CQRS) and variants of it. By moving away from classic layered architecture styles that can tend to make code inflexible and can add a lot of boilerplate for even simple tasks, it enables your software to be cut into small, feature-related, vertical slices that each cover one specific use case in your system.



Consider the above diagram. It shows the sequence of events for a classic HTTP GET request for the status of an arbitrary order.


  • The controller issues a query to the mediator to fetch the status of that order.
  • The mediator delivers this query to the specific handler implementation for this type of query.
  • The handler a) validates the query, b) accesses the data store, and, c) returns a result.

Please note that the handler implementation will not use data store abstractions like IOrderService. The handler is application logic - it is the direct consequence of your business requirement.


The mediator pattern or, by extension, CQRS is especially useful in event-driven systems or in systems where you want to create loosely coupled and independent islands of functionality. Since the handler implementations are so independent (loose coupling + high cohesion), the approach also scales very well in a growing code base with a high number of developers working in it.


How does this pattern relate to messaging?

It is easy to be confused about when to use a mediator implementation and when to use messaging. As discussed in one of the previous articles, messaging is the right way to go if you need to perform an asynchronous task but don't expect a result back. The messaging infrastructure, if configured correctly, will handle transient errors and retry on failures, but you are not interested in a full status report at the time of issuing the command or the event.
On the other hand, if you do expect a result, e.g., a validation message or a proper response, then you would go for the mediator pattern.


Also, since many of the implementations work in-memory, your response times will be much faster compared to going all the way through your messaging infrastructure.


Available implementations

There are a couple of ready-to-use implementations out there:


MassTransit also features a built-in implementation of the mediator pattern and since we were already dealing with MassTransit in previous articles, why not give it a try?
If your solution does not use MassTransit yet, adopting it only for implementing mediator/CQRS is probably like taking a sledgehammer to crack a nut, but please do read on in any case, since the patterns are very similar in every implementation out there.


Lastly, there is also the option for you to implement the necessary infrastructure yourself. It is actually not very hard to do that and there might be good reasons for it. More on that later.


Mediator pattern in MassTransit

You can find this code on GitHub.


Now, let's assume we have an ASP.NET application that returns the status of an order on a particular endpoint. We want this query for the order to be handled via the mediator pattern. The first thing we'll need is an object that represents this type of query:


public record GetOrderStatus(int OrderId) 
  : Request<OrderStatusResult>;

In MassTransit's mediator implementation, everything is a Request<T> by default. That being said, if you want to go fully CQRS, you'll have to do some extra work to separate commands and queries at design time, but we will not do that now.
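For reference, that separation could be as small as a pair of marker interfaces layered on top of Request<T>. This is only a sketch under assumptions: IQuery<T>, ICommand<T>, CancelOrder and CancelOrderResult are hypothetical names introduced here, the OrderState values are made up, and the local Request<T> declaration is a stand-in for the MassTransit marker interface so that the snippet compiles on its own.

```csharp
using System;

// Stand-in for MassTransit's Request<T> marker interface (assumption for self-containment);
// in a real project, delete this declaration and reference MassTransit instead.
public interface Request<out TResponse> where TResponse : class { }

// Hypothetical markers: queries only read state, commands change it.
public interface IQuery<out TResult> : Request<TResult> where TResult : class { }
public interface ICommand<out TResult> : Request<TResult> where TResult : class { }

// Assumed enum values; the article does not show OrderState.
public enum OrderState { Open, Shipped, Cancelled }

public record OrderStatusResult(int OrderId, OrderState State);
public record CancelOrderResult(int OrderId); // hypothetical result type

// The query from the article, now tagged as a query.
public record GetOrderStatus(int OrderId) : IQuery<OrderStatusResult>;

// A hypothetical command, tagged as a command.
public record CancelOrder(int OrderId) : ICommand<CancelOrderResult>;
```

Because both markers extend Request<T>, such messages would still satisfy any constraint on Request<T>, while a generic constraint like where TRequest : IQuery<TResult> could restrict a code path to queries at compile time.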


This query object can be published via an interface that is provided by MassTransit and that you can inject into, e.g., your controller:


[Route("[controller]")]
public class OrdersController : Controller
{
    private readonly IRequestClient<GetOrderStatus> client;
    
    public OrdersController(IRequestClient<GetOrderStatus> client)
    {
        this.client = client;
    }

    [HttpGet]
    public async Task<IActionResult> Get(int orderId)
    {
        // The type argument names the expected response type; the request type is fixed by the client.
        var response = await client.GetResponse<OrderStatusResult>(new GetOrderStatus(orderId));

        return new OkObjectResult(response.Message);
    }
}

As you can see from the above snippet, you pass a type argument to the client to specify what type of result you are expecting from the query:


public record OrderStatusResult(int OrderId, OrderState State);

Finally, the implementation of the handler for this query is as simple as implementing the interface IConsumer<T>:


public class OrderStatusRequestConsumer : IConsumer<GetOrderStatus>
{
    private readonly OrderProvider provider;
    public OrderStatusRequestConsumer(OrderProvider provider)
    {
        this.provider = provider;
    }
    public async Task Consume(ConsumeContext<GetOrderStatus> context)
    {
        var order = await provider.GetByIdAsync(context.Message.OrderId);

        await context.RespondAsync(new OrderStatusResult(order.Id, order.State));
    }
}

After getting the necessary data to create the response object, the consumer simply calls RespondAsync() on the message context for the result to be returned to the controller.


So that's it - we're done, right? Well, no...
This is only the happy path, clearly. What if something goes wrong? What if we want to do validation on the request and return the results back to the controller/user?


Making the implementation actually usable

First off, any exception thrown in the consumer will be propagated back to the code making the request (wrapped inside a RequestException), so that's a good thing. But to hide some of the boilerplate code, validate requests, and return different results in these cases, we have to put in some extra work of abstraction.
As a first step, let's have a dedicated object representing the result of the request that is in our control:


public class RequestResult<T>
{
    public RequestResult(T result)
    {
        Result = result;
    }

    public RequestResult(ValidationErrors validationErrors)
    {
        ValidationErrors = validationErrors;
    }

    public T? Result { get; }
    public ValidationErrors ValidationErrors { get; } = new();
    public bool IsValid => !ValidationErrors.Errors.Any();
}

It either contains the expected result type in case of success or a number of validation errors. Why do we do this? Because MassTransit's way of extracting results other than the expected response type is a bit cumbersome. You'll see what I mean if we take a look at the wrapper for IRequestClient<T>:


public class MassTransitRequestBus : IRequestBus
{
    private readonly IServiceScopeFactory scopeFactory;

    public MassTransitRequestBus(IServiceScopeFactory scopeFactory)
    {
        this.scopeFactory = scopeFactory;
    }

    public async Task<RequestResult<TResult>> GetResponse<TRequest, TResult>(TRequest message)
        where TRequest : class
        where TResult : class
    {
        using var scope = scopeFactory.CreateScope();
        var clientType = typeof(IRequestClient<>).MakeGenericType(typeof(TRequest));
        var client = scope.ServiceProvider.GetService(clientType);

        if (client is not IRequestClient<TRequest> requestClient)
        {
            throw new InvalidOperationException($"Could not create client for type {clientType}");
        }

        // If the request handler throws an exception, it is propagated to the caller inside
        // a RequestException. Custom wrapping/unwrapping could be placed here.
        var response = await requestClient.GetResponse<TResult, ValidationErrors>(message);

        if (response.Is(out Response<ValidationErrors>? validationResult) && validationResult != null)
        {
            return new RequestResult<TResult>(validationResult.Message);
        }

        if (response.Message is TResult requestResult)
        {
            return new RequestResult<TResult>(requestResult);
        }
        
        throw new InvalidOperationException($"Response is not of the expected type {typeof(TResult)}");
    }
}

The response.Is(out ...) approach is a bit nasty, so I assure you we will be happy to have concealed it in this class.
To properly handle the requests and perform validation, we will also need a base class for the handlers. And there is really not much going on here:


public abstract class RequestHandler<TRequest, TResponse> : IConsumer<TRequest>
    where TRequest : class, Request<TResponse>
    where TResponse : class
{
    public async Task Consume(ConsumeContext<TRequest> context)
    {
        var validationResult = await Validate(context.Message, context.CancellationToken);
        if (validationResult.Errors.Any())
        {
            await context.RespondAsync(validationResult);
            return;
        }
        
        var response = await Handle(context.Message, context.CancellationToken).ConfigureAwait(false);
        await context.RespondAsync(response);
    }

    protected virtual Task<ValidationErrors> Validate(TRequest request, CancellationToken token)
    {
        return Task.FromResult(new ValidationErrors());
    }

    protected abstract Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken);
}
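The ValidationErrors type used by RequestResult<T> and by the handler base class is not shown in this article. A minimal sketch of what it could look like, assuming only what the snippets above require (a parameterless constructor, a constructor taking messages, and an Errors collection); the actual class in the GitHub sample may well differ:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed shape of ValidationErrors, inferred from how the article's snippets use it.
// It is a class (not a struct) because it is also used as a response type.
public class ValidationErrors
{
    // An empty instance means "no validation errors".
    public ValidationErrors() : this(Array.Empty<string>())
    {
    }

    public ValidationErrors(IEnumerable<string> errors)
    {
        // Copy the messages so later changes to the source collection do not leak in.
        Errors = errors.ToList();
    }

    public IReadOnlyList<string> Errors { get; }
}
```

With this shape, new ValidationErrors().Errors.Any() is false, which is what the default Validate() implementation relies on to signal a valid request.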

After having these in place, the code in the controller will be something like this:


[Route("[controller]")]
public class OrdersController : Controller
{
    private readonly IRequestBus requestBus;

    public OrdersController(IRequestBus requestBus)
    {
        this.requestBus = requestBus;
    }

    [HttpGet]
    public async Task<IActionResult> Get(int orderId)
    {
        var response = await requestBus.GetResponse<GetOrderStatus, OrderStatusResult>(new GetOrderStatus(orderId));

        if (response.IsValid)
        {
            return new OkObjectResult(response.Result);
        }

        return new BadRequestObjectResult(response.ValidationErrors);
    }
}

And the request handler for the order status becomes:


public class OrderStatusRequestHandler : RequestHandler<GetOrderStatus, OrderStatusResult>
{
    private readonly OrderProvider provider;

    public OrderStatusRequestHandler(OrderProvider provider)
    {
        this.provider = provider;
    }

    // No await is needed here, so the method returns completed tasks instead of being async.
    protected override Task<ValidationErrors> Validate(GetOrderStatus request, CancellationToken token)
    {
        if (request.OrderId <= 0)
        {
            return Task.FromResult(new ValidationErrors(new[] {$"Order ID {request.OrderId} is invalid."}));
        }

        return Task.FromResult(new ValidationErrors());
    }

    protected override async Task<OrderStatusResult> Handle(GetOrderStatus request, CancellationToken cancellationToken)
    {
        var order = await provider.GetByIdAsync(request.OrderId);

        return new OrderStatusResult(order.Id, order.State);
    }
}

Now, there is only one thing left. Let's have a look at our startup file, Program.cs:


var builder = WebApplication.CreateBuilder(args);
builder.Services.AddTransient<IRequestBus, MassTransitRequestBus>();
builder.Services.AddControllers();
builder.Services.AddMassTransit(busConfigurator =>
{
    busConfigurator.UsingInMemory();
    busConfigurator.AddMediator(x =>
    {
        x.AddConsumers(typeof(Program).Assembly);
        x.AddRequestClient(typeof(GetOrderStatus));
    });
});

var app = builder.Build();

app.MapControllers();

app.Run();

There we have it, a fully working mediator implementation using MassTransit, with the unnecessary boilerplate code encapsulated in our own classes. Still, there are a few things I would like to mention, because I feel they are important:


  • As stated earlier, the handlers are core/application logic. If you follow clean/hexagonal architecture principles, these would naturally reside in your core project. And while we did some work to hide the inner workings of MassTransit from the domain code, your domain code will at least have a transitive reference to MassTransit. Whether or not you think that is a bad thing, just be aware that this is the case. The code I've shown you here does not attempt to fully decouple on a DLL level.
  • If you really want to address this decoupling, you would need an additional layer between your domain code and MassTransit. I am sure that the amount of code necessary for this operation is about the same as the code that is actually necessary to create a mediator implementation on your own. So, to get rid of this external dependency, I actually encourage you to do an implementation of the mediator pattern that directly suits your needs. I'd figure one could get away with some 100 or 200 lines of actual code; and it's also a very good exercise at API design.
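To give an idea of the scale of such a home-grown implementation, here is a deliberately small sketch. It is not how MassTransit works internally, and every name in it (IRequest<T>, IRequestHandler<,>, SimpleMediator, Ping, PingHandler) is hypothetical. It assumes handlers are registered by hand rather than resolved from a DI container, and it leaves out validation and pipeline behaviors.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Marker interface: a request that expects a TResponse.
public interface IRequest<TResponse> { }

public interface IRequestHandler<TRequest, TResponse> where TRequest : IRequest<TResponse>
{
    Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken);
}

public sealed class SimpleMediator
{
    // Maps a concrete request type to a delegate that invokes its handler.
    private readonly Dictionary<Type, Func<object, CancellationToken, Task<object?>>> handlers = new();

    public void Register<TRequest, TResponse>(IRequestHandler<TRequest, TResponse> handler)
        where TRequest : IRequest<TResponse>
    {
        // A later registration for the same request type replaces the earlier one.
        handlers[typeof(TRequest)] = async (request, token) =>
            await handler.Handle((TRequest)request, token);
    }

    public async Task<TResponse> Send<TResponse>(IRequest<TResponse> request, CancellationToken token = default)
    {
        if (!handlers.TryGetValue(request.GetType(), out var handle))
        {
            throw new InvalidOperationException($"No handler registered for {request.GetType().Name}");
        }

        // The caller only knows the mediator; the handler stays hidden behind it.
        return (TResponse)(await handle(request, token))!;
    }
}

// Hypothetical request/handler pair for illustration.
public record Ping(string Text) : IRequest<string>;

public class PingHandler : IRequestHandler<Ping, string>
{
    public Task<string> Handle(Ping request, CancellationToken cancellationToken)
        => Task.FromResult($"Pong: {request.Text}");
}
```

After mediator.Register<Ping, string>(new PingHandler()), awaiting mediator.Send(new Ping("hi")) yields the string "Pong: hi". A production version would typically resolve handlers from the container per request and add validation around Handle, which is where most of the remaining lines would go.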

Conclusion

In this post, we took a look at the mediator pattern in general and at its implementation in MassTransit. I also showed you a way of encapsulating important functionality in classes that lie in your own control. Feel free to have a look at the code on GitHub. I also added sample HTTP requests to test the functionality.


