Event Driven Architecture: Publishing Events using an IOC container
In my last post I talked about advanced IOC usage and how it is possible to use an IOC container to resolve an open generic from a closed implementation. This is technically cool, but it does not explain why this is important. In this post I want to show some additional code that will demonstrate a basic concept of Event Driven Architecture: Publishing an Event.
Why is Event Driven Architecture good?
Event driven architecture is extremely extensible. In my mind, it is the perfect solution for honoring the Open/Closed Principle and enabling Single Responsibility in code.
Consider a system that processes orders. There is a method called SubmitOrder(order) on an OrderController. It just validates and saves the new order to the database today.
In a following story, the product owner says “I need the system to send an email to the customer when the order is submitted”. No problem: inject the MailService into the OrderController and send the mail in the SubmitOrder(order) method. Great.
In the next story, the product owner says “I need you to deduct the ordered quantities from the OnHand inventory”. Check. We inject the inventory service into our OrderController and call it from the SubmitOrder(order) method.
More stories… the product owner says “We need to send a message to the warehouse fulfillment system to let them know a new order has been submitted”.
Ugh, another service, another method. The tests for SubmitOrder get out of control. Even after extracting methods and encapsulating more logic into services, SubmitOrder still violates SRP & OCP. Not to mention the possible performance degradation from performing all of these tasks synchronously while the user waits.
It’s not the job of SubmitOrder to do all of this. There has to be a better way.
Publishing an Event Example
What if I “tell” the system that an event has occurred, and anyone interested can take action? (Like an old school observer, right?) In the following code sample I inject an ‘EventPublisher’ into my OrderController, then use that object to notify the rest of the system by publishing an event using this line:
_eventPublisher.Publish(new OrderSubmittedEvent{OrderId = order.Id});
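The OrderSubmittedEvent type itself is not shown in this post. A minimal sketch, assuming it is a plain message object that carries only the order id (the int type of OrderId is my assumption), might look like this:

```csharp
// Assumed shape of the event message: a plain data object with no behavior.
// The int type for OrderId is an assumption made for this sketch.
public class OrderSubmittedEvent
{
    public int OrderId { get; set; }
}
```

Keeping the event this small means consumers look up whatever else they need themselves, as the email consumer does later through its repository.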
The OrderController
public class OrderController : Controller
{
    private readonly IRepository<Order> _orderRepository;
    private readonly IEventPublisher _eventPublisher;
    public OrderController(IRepository<Order> orderRepository,
                           IEventPublisher eventPublisher)
    {
        _orderRepository = orderRepository;
        _eventPublisher = eventPublisher;
    }
    public ActionResult SubmitOrder(OrderViewModel viewModel)
    {
        try
        {
            if (ModelState.IsValid)
            {
                var order = MapOrder(viewModel);
                _orderRepository.Add(order);
                _orderRepository.SaveChanges();
                _eventPublisher.Publish(new OrderSubmittedEvent{OrderId = order.Id});
                //Display success message
                //ViewInfo.AddSuccessMessage(Language.SubmitOrderSuccess);
            }
        }
        catch
        {
            ModelState.AddModelError("__Form", Language.SubmitOrderError);
        }
        return View(viewModel);
    }
    //other
}
Now, using this simple Consumer (Handler) interface:
public interface IConsumer<T>
{
    void Handle(T eventMessage);
}
I can independently implement each of the system requirements, as needed:
public class EmailOrderConfirmation : IConsumer<OrderSubmittedEvent>
{
    public void Handle(OrderSubmittedEvent eventMessage)
    {
        //send email
    }
}
public class NotifyWarehouse : IConsumer<OrderSubmittedEvent>
{
    public void Handle(OrderSubmittedEvent eventMessage)
    {
        //notify warehouse
    }
}
public class DeductOnHandInventory : IConsumer<OrderSubmittedEvent>
{
    public void Handle(OrderSubmittedEvent eventMessage)
    {
        //deduct inventory
    }
}
One of my favorite parts about this code: the container builds up all of these event consumers, which makes it easy to satisfy all of their dependencies using constructor injection. This makes testing each of these handlers a breeze, and everything follows the same patterns and conventions, keeping the code base understandable and clean. (Services, Controllers, and Consumers all work the same.)
Just plug in the dependencies in the EmailOrderConfirmation consumer ctor* like so:
public class EmailOrderConfirmation : IConsumer<OrderSubmittedEvent>
{
    private readonly IRepository<Order> _orderRepository;
    private readonly ISmtpService _smtpService;
    private readonly ILogger _logger;
    public EmailOrderConfirmation(IRepository<Order> orderRepository,
                                  ISmtpService smtpService,
                                  ILogger logger)
    {
        _orderRepository = orderRepository;
        _smtpService = smtpService;
        _logger = logger;
    }
    public void Handle(OrderSubmittedEvent eventMessage)
    {
        var order = _orderRepository.Single(x => x.Id == eventMessage.OrderId);
        var message = new SmtpMessage();
        //get customer info from order & populate message
        _smtpService.SendMessage(message);
    }
}
* Note that you could also inject the IEventPublisher into a Consumer and publish more events.
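To illustrate that note, here is a rough sketch of a consumer that publishes a follow-up event once its own work is done. InventoryDeductedEvent and RecordingPublisher are hypothetical names I made up for this example, and the int OrderId is an assumption; the two interfaces match the ones used in this post.

```csharp
using System.Collections.Generic;

public interface IConsumer<T> { void Handle(T eventMessage); }
public interface IEventPublisher { void Publish<T>(T eventMessage); }

public class OrderSubmittedEvent { public int OrderId { get; set; } }

// Hypothetical follow-up event, not part of the original code.
public class InventoryDeductedEvent { public int OrderId { get; set; } }

public class DeductOnHandInventory : IConsumer<OrderSubmittedEvent>
{
    private readonly IEventPublisher _eventPublisher;

    public DeductOnHandInventory(IEventPublisher eventPublisher)
    {
        _eventPublisher = eventPublisher;
    }

    public void Handle(OrderSubmittedEvent eventMessage)
    {
        // The inventory deduction would happen here; afterwards the
        // consumer announces the result as a new event.
        _eventPublisher.Publish(new InventoryDeductedEvent { OrderId = eventMessage.OrderId });
    }
}

// Hypothetical test double that records everything published through it.
public class RecordingPublisher : IEventPublisher
{
    public List<object> Published { get; } = new List<object>();

    public void Publish<T>(T eventMessage)
    {
        Published.Add(eventMessage);
    }
}
```

Because the consumer only depends on IEventPublisher, a recording double like this is enough to check in a unit test that the follow-up event was raised.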
How Does it Work?
Under the hood the IoC container is doing most of the work. It keeps track of the event subscriptions and also provides the consumer instantiation.
Event Subscriptions
Here is how the event subscriptions are added to the container. In this example I use an interface so that I can enable dependency injection into components, and then mock the GetSubscriptions<T> dependency for easy testing. I also use a static method for adding subscriptions since registration usually happens at bootstrapping time, which is generally all static. The IoC.Container here is thread safe.
public interface ISubscriptionService
{
    IEnumerable<IConsumer<T>> GetSubscriptions<T>();
}
public class EventSubscriptions : ISubscriptionService
{
    public static void Add<T>()
    {
        var consumerType = typeof(T);
        consumerType.GetInterfaces()
            .Where(x => x.IsGenericType)
            .Where(x => x.GetGenericTypeDefinition() == typeof(IConsumer<>))
            .ToList()
            .ForEach(x => IoC.Container.RegisterType(x,
                consumerType,
                consumerType.FullName));
    }
    public IEnumerable<IConsumer<T>> GetSubscriptions<T>()
    {
        var consumers = IoC.Container.ResolveAll(typeof(IConsumer<T>));
        return consumers.Cast<IConsumer<T>>();
    }
}
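The reflection filter inside Add<T>() is the part that decides which registrations get made, so it can help to look at it in isolation. The sketch below runs the same filter without any container. AuditTrail, OrderCancelledEvent, and ConsumerInspector are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface IConsumer<T> { void Handle(T eventMessage); }

public class OrderSubmittedEvent { public int OrderId { get; set; } }

// Hypothetical second event, added only to show a multi-event consumer.
public class OrderCancelledEvent { public int OrderId { get; set; } }

// Hypothetical consumer that handles two events and also implements an
// unrelated interface, which the filter should ignore.
public class AuditTrail : IConsumer<OrderSubmittedEvent>,
                          IConsumer<OrderCancelledEvent>,
                          IDisposable
{
    public void Handle(OrderSubmittedEvent eventMessage) { }
    public void Handle(OrderCancelledEvent eventMessage) { }
    public void Dispose() { }
}

public static class ConsumerInspector
{
    // Returns every closed IConsumer<T> interface the type implements,
    // using the same filter as EventSubscriptions.Add<T>().
    public static List<Type> ClosedConsumerInterfaces(Type consumerType)
    {
        return consumerType.GetInterfaces()
            .Where(x => x.IsGenericType)
            .Where(x => x.GetGenericTypeDefinition() == typeof(IConsumer<>))
            .ToList();
    }
}
```

For AuditTrail the filter yields the two closed IConsumer interfaces and skips IDisposable, so a single Add<AuditTrail>() call would subscribe the class to both events.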
Event Publishing
Now that the subscriptions are set up, the publisher can read those subscriptions, get the consumers from the container, and pass the event message instance to each of them. Again I am using an interface so I can enable dependency injection (and mocking capabilities) for the components which need to do publishing (controllers, services).
public interface IEventPublisher
{
    void Publish<T>(T eventMessage);
}
public class EventPublisher : IEventPublisher
{
    private readonly ISubscriptionService _subscriptionService;
    public EventPublisher(ISubscriptionService subscriptionService)
    {
        _subscriptionService = subscriptionService;
    }
    public void Publish<T>(T eventMessage)
    {
        var subscriptions = _subscriptionService.GetSubscriptions<T>();
        subscriptions.ToList().ForEach(x => PublishToConsumer(x, eventMessage));
    }
    private static void PublishToConsumer<T>(IConsumer<T> x, T eventMessage)
    {
        try
        {
            x.Handle(eventMessage);
        }
        catch (Exception e)
        {
            //log and handle internally
        }
        finally
        {
            var instance = x as IDisposable;
            if (instance != null)
            {
                instance.Dispose();
            }
        }
    }
}
- Keep in mind that this example all runs on the same thread. Even though semantically we are handling events as if they were asynchronous, publishing is a blocking operation.
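If the blocking behavior becomes a problem, one possible direction is to hand each consumer to the thread pool. This is only a sketch of that idea and not part of the publisher above: ParallelDispatch and CountingConsumer are hypothetical names, and it relies on Task.Run and Task.WhenAll from the Task Parallel Library rather than anything used elsewhere in this post.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public interface IConsumer<T> { void Handle(T eventMessage); }

public static class ParallelDispatch
{
    // Starts every consumer on a thread-pool thread and returns a task that
    // completes once all of them have finished. An exception in one consumer
    // is swallowed so it cannot stop the others; real code would log it.
    public static Task PublishAsync<T>(IEnumerable<IConsumer<T>> consumers, T eventMessage)
    {
        var tasks = consumers
            .Select(consumer => Task.Run(() =>
            {
                try { consumer.Handle(eventMessage); }
                catch (Exception) { /* log and continue */ }
            }))
            .ToList(); // ToList forces all tasks to start immediately

        return Task.WhenAll(tasks);
    }
}

// Hypothetical consumer used to observe the dispatch; thread-safe counter.
public class CountingConsumer : IConsumer<string>
{
    private int _count;
    public int Count { get { return _count; } }

    public void Handle(string eventMessage)
    {
        Interlocked.Increment(ref _count);
    }
}
```

The caller decides whether to wait on the returned task. A web request could return without waiting, while a spec would wait before checking its assertions.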
Additional Benefits of this Architecture
- Events are messages, which can be serialized and processed on different threads, or even on different nodes in a cluster. In a web environment, this might mean offloading the workload from your IIS processes, freeing web-server threads for performance & scalability (see the “This is not a Service Bus” section)
- Get multicast-delegate, observer-like functionality without having to deal with static events, event registration & deregistration (the whole += -= biz)
- Each event handler has its own class, each having 1 responsibility
- Very extensible, very maintainable (wait, I said that already?)
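To make the serialization point a bit more concrete, here is a small sketch that round-trips an event through JSON. It assumes the System.Text.Json serializer from current .NET, which the code in this post does not use, and EventSerializer is a hypothetical helper name.

```csharp
using System.Text.Json;

public class OrderSubmittedEvent { public int OrderId { get; set; } }

// Hypothetical helper: turns an event into text that could be placed on a
// queue, and rebuilds the event on the receiving side.
public static class EventSerializer
{
    public static string ToJson<T>(T eventMessage)
    {
        return JsonSerializer.Serialize(eventMessage);
    }

    public static T FromJson<T>(string json)
    {
        return JsonSerializer.Deserialize<T>(json);
    }
}
```

Since the event is a plain object with public properties, nothing about it has to change for it to travel to another process. That is a good reason to keep events free of behavior and service references.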
This is not a Service Bus
This code above could work great in a simple ASP.NET MVC application for implementing clean separation and structuring your code base around events. Though it takes a lot of influence from the OSS service buses, it is greatly simplified and merely demonstrates the simple concept of publishing & consuming domain events using an IoC container.
A service bus does quite a bit more than just publish local messages on a single thread. If you find yourself enjoying these posts and want to take it to the next level, check out MassTransit (Chris Patterson & Dru Sellers) & NServiceBus (Udi Dahan). I would also recommend the Enterprise Integration Patterns book by Gregor Hohpe for getting to know the reference patterns. (One of these days I’ll make it through it instead of using it for reference!)
Some additional concepts and common patterns you might find:
- Publish-Subscribe
- Request-Response
- Sagas (Workflow)
- Correlation
- Competing Consumers
- Distributed Load (processing across many nodes in cluster)
- Transport (Http, MSMQ, ActiveMq, WCF TCP)
- Serialization (Binary, XML, JSON)
- Threadpooling
The Specification
Normally I don’t make a habit of testing the container, but in this case the code is so close to the container that I feel it’s appropriate, versus mocking out the subscriptions. Plus, since this is a blog post demonstrating advanced IOC usage, I want to explicitly show all of the dependencies required at bootstrap time.
Please note the LoggerSpy for assertions. In this spec we are really only concerned with whether or not the handlers get called. Because of the nature of eventing, using a Test Spy is the only good way I have found to assert whether or not your handlers get invoked. (If you’re processing on multiple threads you will have to make sure they are all done before you check assertions; perhaps a future blog post.)
[TestFixture]
public class When_An_Order_Is_Submitted : Specification_Context
{
    private LoggerSpy _loggerSpy;
    private Mock<IRepository<Order>> _orderRepositoryMock;
    readonly IUnityContainer _container = IoC.Container;
    protected override void Context()
    {
        //setup EF repository mock
        _orderRepositoryMock = new Mock<IRepository<Order>>();
        //logger spy to capture handler messages
        _loggerSpy = new LoggerSpy();
        //register with container
        _container.RegisterInstance<ILogger>(_loggerSpy);
        _container.RegisterInstance(_orderRepositoryMock.Object);
        _container.RegisterType<IEventPublisher, EventPublisher>();
        _container.RegisterType<ISubscriptionService, EventSubscriptions>();
        _container.RegisterType<OrderController>();
        //add event subscriptions
        EventSubscriptions.Add<EmailOrderConfirmation>();
        EventSubscriptions.Add<NotifyWarehouse>();
        EventSubscriptions.Add<DeductOnHandInventory>();
    }
    protected override void Because()
    {
        _container.Resolve<OrderController>().SubmitOrder(new OrderViewModel());
    }
    [Test]
    public void Order_Should_Be_Persisted()
    {
        _orderRepositoryMock.Verify(x => x.Add(It.IsAny<Order>()));
        _orderRepositoryMock.Verify(x => x.SaveChanges());
    }
    [Test]
    public void Email_Notification_Should_Be_Sent_To_Customer()
    {
        _loggerSpy.LoggedMessages.ShouldContain(Language.OrderConfirmationEmailSuccess);
    }
    [Test]
    public void On_Hand_Inventory_Should_Be_Reduced()
    {
        _loggerSpy.LoggedMessages.ShouldContain(Language.DeductOnHandInventorySuccess);
    }
    [Test]
    public void Warehouse_Should_Be_Notified()
    {
        _loggerSpy.LoggedMessages.ShouldContain(Language.NotifyWarehouseSuccess);
    }
}
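The LoggerSpy used by the spec is not shown in this post. A minimal sketch of what such a spy could look like follows, assuming ILogger exposes a single Log(string) method. That signature is my assumption, since the real interface is not listed here.

```csharp
using System.Collections.Generic;

// Assumed logger contract; the post does not show the real ILogger.
public interface ILogger
{
    void Log(string message);
}

// Test spy: records every message so a spec can assert on it afterwards.
public class LoggerSpy : ILogger
{
    private readonly object _sync = new object();
    private readonly List<string> _messages = new List<string>();

    public void Log(string message)
    {
        // Lock so handlers running on other threads can log safely.
        lock (_sync)
        {
            _messages.Add(message);
        }
    }

    // Returns a snapshot copy so assertions are not affected by later writes.
    public IList<string> LoggedMessages
    {
        get
        {
            lock (_sync)
            {
                return new List<string>(_messages);
            }
        }
    }
}
```

For the spec assertions to pass, each handler would have to log its success message through the injected ILogger once its work is done.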





