⬆️ ⬇️

Introduction to CQRS + Event Sourcing: Part 2

In the last article, I started with the basics of CQRS + Event Sourcing. This time I propose to continue and look at ES in more detail.



In the example that I posted with my previous article, the magic of Event Sourcing was hidden behind the abstraction of IRepository and the two methods IRepository.Save () and IRepository.GetById <> ().

In order to understand in more detail what is happening, I decided to talk about the process of saving and loading the aggregate from the Event Store using the example of the Lokad IDDD Sample project from Rinat Abdulin. He has a direct appeal to the Event Store in the application services, without additional abstractions, so everything looks very clear. Application Service is an analogue of CommandHandler, but which handles all commands of one unit. This approach is very convenient and we also switched to it in one project.



Applicationservice



The IApplicationService interface is extremely simple.

public interface IApplicationService { void Execute(ICommand cmd); } 


In the Execute method, we pass any commands and we hope that the service will redirect them to the desired handler.



Since Rinat in the example has only one Customer unit, the service is also only one CustomerApplicationService. Actually, therefore, there is no need to make any logic in the base class. An excellent solution for example in my opinion.

')

The Execute method transfers the command processing to one of the overloads of the When method that matches the signature. The implementation of the Execute method is very simple using speakers, and there is no need to run on reflection on all methods.

 public void Execute(ICommand cmd) { // pass command to a specific method named when // that can handle the command ((dynamic)this).When((dynamic)cmd); } 


Let's start with the creation command - CreateCustomer.

 [Serializable] public sealed class CreateCustomer : ICommand { public CustomerId Id { get; set; } public string Name { get; set; } public Currency Currency { get; set; } public override string ToString() { return string.Format("Create {0} named '{1}' with {2}", Id, Name, Currency); } } 


In a real project, you will most likely have a message queue between the UI and the ApplicationService, but for an example, Rinat limited himself to directly passing the command to the service's uplink object (see class ApplicationServer).

After the CreateCustomer command gets into the Execute method, it is redirected to the When method.

 public void When(CreateCustomer c) { Update(c.Id, a => a.Create(c.Id,c.Name, c.Currency, _pricingService, DateTime.UtcNow)); } 


In the Update method, we pass an aggregate aydishka and an action that calls the method of changing the state of the aggregate. In general, in my opinion, it is better not to do the Create method of the aggregate, but to create another constructor, since calling the Create method in this context looks a bit strange. We seem to create an aggregate, but for some reason we pass the Create method as a method of changing the state. With the designer so would not have happened.



Let's go back to the Update method, the next task is to 1) load all events for the current aggregate, 2) create an aggregate and restore its state using loaded events, 3) execute the transferred Action Action execute on the aggregate and 4) save new events if they exist.

 void Update(CustomerId id, Action<Customer> execute) { // Load event stream from the store EventStream stream = _eventStore.LoadEventStream(id); // create new Customer aggregate from the history Customer customer = new Customer(stream.Events); // execute delegated action execute(customer); // append resulting changes to the stream _eventStore.AppendToStream(id, stream.Version, customer.Changes); } 




Recovery condition



In the example that I showed in the last article, the state of the aggregate was stored as private fields in the aggregate class. This is not very convenient if you want to add snapshots, because you have to somehow suck up the state every time or use reflection. Rinat has a much more convenient approach — for the state, a separate CustomerState class, which makes it possible to take out projection methods from the aggregate and it is much easier to save and load snapshots, although they are not in the example.

As you can see, the list of events of the same unit is transferred to the constructor of the unit, as it is not difficult to guess, in order for it to restore its state.

The unit in turn delegates the restoration of the state to the CustomerState class.

 /// <summary> /// Aggregate state, which is separate from this class in order to ensure, /// that we modify it ONLY by passing events. /// </summary> readonly CustomerState _state; public Customer(IEnumerable<IEvent> events) { _state = new CustomerState(events); } 


I will give the code for the entire CustomerState class, I’ll only remove a few When projection methods.

 /// <summary> /// This is the state of the customer aggregate. /// It can be mutated only by passing events to it. /// </summary> public class CustomerState { public string Name { get; private set; } public bool Created { get; private set; } public CustomerId Id { get; private set; } public bool ConsumptionLocked { get; private set; } public bool ManualBilling { get; private set; } public Currency Currency { get; private set; } public CurrencyAmount Balance { get; private set; } public int MaxTransactionId { get; private set; } public CustomerState(IEnumerable<IEvent> events) { foreach (var e in events) { Mutate(e); } } ... public void When(CustomerCreated e) { Created = true; Name = e.Name; Id = e.Id; Currency = e.Currency; Balance = new CurrencyAmount(0, e.Currency); } public void When(CustomerRenamed e) { Name = e.Name; } public void Mutate(IEvent e) { // .NET magic to call one of the 'When' handlers with // matching signature ((dynamic) this).When((dynamic)e); } } 


As you can see in the constructor, we forychem run on the transmitted events and transfer them to the Mutate method, which in turn sets them further into a suitable projection method.

You may notice that all properties have a private setter method, which guarantees that the state from the outside can be changed only by passing the corresponding event.



Condition restored, now you can and try to change it. Go back a little to the state change method. Since I started to deal with the CreateCustomer command, then the aggregate will have a look at the Create method.

 public void Create(CustomerId id, string name, Currency currency, IPricingService service, DateTime utc) { if (_state.Created) throw new InvalidOperationException("Customer was already created"); Apply(new CustomerCreated { Created = utc, Name = name, Id = id, Currency = currency }); var bonus = service.GetWelcomeBonus(currency); if (bonus.Amount > 0) AddPayment("Welcome bonus", bonus, utc); } 


Here is the place to check our business rules, since we have access to the current state of the unit. We have a business rule that Customer can only be created once (although there is also a technical limitation), so when we try to call Create on an already created aggregate, we throw an exception.

If all business rules are satisfied, then we pass the CustomerCreated event to the Apply method. The Apply method is very simple, it only sends the event to the state object and adds it to the list of current changes.

 public readonly IList<IEvent> Changes = new List<IEvent>(); readonly CustomerState _state; void Apply(IEvent e) { // pass each event to modify current in-memory state _state.Mutate(e); // append event to change list for further persistence Changes.Add(e); } 


In most cases, one operation with an aggregate accounts for one event. But just in the case of the Create method, there may be two events.

After we passed the CustomerCreate event to the Apply method, we can add a welcome bonus to the current customer, if the business rule is satisfied that the bonus amount should be greater than zero. In this case, the aggregation method AddPayment is called, which does not reduce any checks, but simply triggers the CustomerPaymentAdded event.

 public void AddPayment(string name, CurrencyAmount amount, DateTime utc) { Apply(new CustomerPaymentAdded() { Id = _state.Id, Payment = amount, NewBalance = _state.Balance + amount, PaymentName = name, Transaction = _state.MaxTransactionId + 1, TimeUtc = utc }); } 


Now we have to save new events and publish them in the Read model. I suspect that the next line does it.

 // append resulting changes to the stream _eventStore.AppendToStream(id, stream.Version, customer.Changes); 


Make sure ...

 public void AppendToStream(IIdentity id, long originalVersion, ICollection<IEvent> events) { if (events.Count == 0) return; var name = IdentityToString(id); var data = SerializeEvent(events.ToArray()); try { _appendOnlyStore.Append(name, data, originalVersion); } catch(AppendOnlyStoreConcurrencyException e) { // load server events var server = LoadEventStream(id, 0, int.MaxValue); // throw a real problem throw OptimisticConcurrencyException.Create(server.Version, e.ExpectedStreamVersion, id, server.Events); } // technically there should be a parallel process that queries new changes // from the event store and sends them via messages (avoiding 2PC problem). // however, for demo purposes, we'll just send them to the console from here Console.ForegroundColor = ConsoleColor.DarkGreen; foreach (var @event in events) { Console.WriteLine(" {0} r{1} Event: {2}", id,originalVersion, @event); } Console.ForegroundColor = ConsoleColor.DarkGray; } 


Well, almost pleased. Events are serialized and saved in the append only store (we are not going to delete and change them). But instead of sending them to the read-model (to the presentation level), Rinat simply displays them to the console.

However, for example, this is enough.

If you want to see how it all will work with the message queue, you can see an example on the github from my previous article. I changed it a bit and also introduced part of the Event Sourcing infrastructure to the solution. In this example, I want to show how you can add snapshots.



Snapshots



Snapshots are needed to take intermediate pictures of the state of the unit. This allows us not to load the entire stream of aggregate events, but only those that occurred after we made the last snapshot.

So look at the implementation.

In the UserCommandHandler class, there is an Update method very similar to the one that Rinat has, but for me it still saves and loads snapshots. Snapshots do every 100 versions.

 private const int SnapshotInterval = 100; 


First we bring up a snapshot from the repository, then we load the stream of events that occurred after we made this snapshot. Then we transfer all this to the designer of the unit.

 private void Update(string userId, Action<UserAR> updateAction) { var snapshot = _snapshotRepository.Load(userId); var startVersion = snapshot == null ? 0 : snapshot.StreamVersion + 1; var stream = _eventStore.OpenStream(userId, startVersion, int.MaxValue); var user = new UserAR(snapshot, stream); updateAction(user); var originalVersion = stream.GetVersion(); _eventStore.AppendToStream(userId, originalVersion, user.Changes); var newVersion = originalVersion + 1; if (newVersion % SnapshotInterval == 0) { _snapshotRepository.Save(new Snapshot(userId, newVersion,user.State)); } } 


In the constructor, we are trying to get the state from the snapshot or create a new state if there is no snapshot. On the received state we lose all the missing events, and as a result we get the current version of the aggregate.

 public UserAR(Snapshot snapshot, TransitionStream stream) { _state = snapshot != null ? (UserState) snapshot.Payload : new UserState(); foreach (var transition in stream.Read()) { foreach (var @event in transition.Events) { _state.Mutate((IEvent) @event.Data); } } } 


After the manipulations with the unit, we check whether the unit version is multiple to the interval through which we make snapshots, and if so, then save the new snapshot to the repository. To get the state of the aggregate from UserCommandHandler, we had to set the internal getter property of the State for it.



That's all, now we have snapshots, which allowed us to restore the state of the unit much faster if it has a lot of events.



Feedback



If you are interested in the topic CQRS + ES, please do not hesitate to ask questions in the comments. You can also write to me in Skype if there is no aka on Habré. Recently, a fellow from Chelyabinsk tapped me on Skype, and thanks to his questions, I had many ideas for the next article. Since I now have more free time at my disposal, I plan to write them more often.

Source: https://habr.com/ru/post/149464/



All Articles