/// <summary>
/// Contract of an application service: a component that accepts a command
/// and executes it.
/// </summary>
public interface IApplicationService
{
    /// <summary>Executes the given command.</summary>
    /// <param name="cmd">The command to execute.</param>
    void Execute(ICommand cmd);
}
/// <summary>
/// Executes a command by forwarding it to the <c>When</c> method of this
/// object that can handle it.
/// </summary>
/// <param name="cmd">The command to dispatch.</param>
public void Execute(ICommand cmd)
{
    // Both the receiver and the argument are typed as 'dynamic', so the
    // 'When' overload is resolved at run time from the command's actual type.
    dynamic self = this;
    dynamic command = cmd;
    self.When(command);
}
/// <summary>
/// Command carrying the data needed to create a customer: its identity,
/// its name and its currency.
/// </summary>
[Serializable]
public sealed class CreateCustomer : ICommand
{
    /// <summary>Identity of the customer to create.</summary>
    public CustomerId Id { get; set; }

    /// <summary>Name of the customer to create.</summary>
    public string Name { get; set; }

    /// <summary>Currency of the customer to create.</summary>
    public Currency Currency { get; set; }

    /// <summary>Returns a human-readable description of the command.</summary>
    public override string ToString()
    {
        const string template = "Create {0} named '{1}' with {2}";
        return string.Format(template, Id, Name, Currency);
    }
}
/// <summary>
/// Handles <see cref="CreateCustomer"/>: runs <c>Create</c> on the customer
/// aggregate identified by the command, via <c>Update</c>.
/// </summary>
/// <param name="c">The command to handle.</param>
public void When(CreateCustomer c)
{
    Update(c.Id, customer =>
    {
        // The current UTC time is read when the delegate runs and is passed
        // into the aggregate together with the pricing service.
        customer.Create(c.Id, c.Name, c.Currency, _pricingService, DateTime.UtcNow);
    });
}
/// <summary>
/// Loads the customer's event stream, rebuilds the aggregate from it, runs
/// the supplied action and appends the resulting changes to the stream.
/// </summary>
/// <param name="id">Identity of the customer whose stream is used.</param>
/// <param name="execute">Action to run against the rebuilt aggregate.</param>
void Update(CustomerId id, Action<Customer> execute)
{
    // Fetch the recorded history for this customer from the event store.
    EventStream history = _eventStore.LoadEventStream(id);

    // Rebuild the aggregate from that history.
    Customer aggregate = new Customer(history.Events);

    // Let the caller perform its operation on the aggregate.
    execute(aggregate);

    // Append whatever the operation produced, passing the version that was loaded.
    _eventStore.AppendToStream(id, history.Version, aggregate.Changes);
}
/// <summary>
/// State of the aggregate. It lives in a separate class so that it is
/// modified ONLY by passing events to it.
/// </summary>
readonly CustomerState _state;

/// <summary>
/// Creates the aggregate, building its state from the given events.
/// </summary>
/// <param name="events">Events handed to the <see cref="CustomerState"/> constructor.</param>
public Customer(IEnumerable<IEvent> events)
{
    _state = new CustomerState(events);
}
/// <summary>
/// State of the customer aggregate. All property setters are private; the
/// state is changed by passing events to <see cref="Mutate"/>.
/// </summary>
public class CustomerState
{
    public string Name { get; private set; }
    public bool Created { get; private set; }
    public CustomerId Id { get; private set; }
    public bool ConsumptionLocked { get; private set; }
    public bool ManualBilling { get; private set; }
    public Currency Currency { get; private set; }
    public CurrencyAmount Balance { get; private set; }
    public int MaxTransactionId { get; private set; }

    /// <summary>
    /// Builds the state by passing every given event, in order, to <see cref="Mutate"/>.
    /// </summary>
    /// <param name="events">Events to apply.</param>
    public CustomerState(IEnumerable<IEvent> events)
    {
        foreach (var past in events)
        {
            Mutate(past);
        }
    }

    // ... (further members are elided at this point in the original listing)

    /// <summary>Marks the customer as created and records its name, identity, currency and a zero balance.</summary>
    public void When(CustomerCreated e)
    {
        Id = e.Id;
        Name = e.Name;
        Currency = e.Currency;
        Balance = new CurrencyAmount(0, e.Currency);
        Created = true;
    }

    /// <summary>Records the new customer name.</summary>
    public void When(CustomerRenamed e)
    {
        Name = e.Name;
    }

    /// <summary>
    /// Passes an event to the <c>When</c> handler with the matching signature.
    /// </summary>
    /// <param name="e">The event to apply.</param>
    public void Mutate(IEvent e)
    {
        // 'dynamic' postpones overload resolution to run time, so the handler
        // is chosen by the event's actual type.
        dynamic self = this;
        dynamic occurred = e;
        self.When(occurred);
    }
}
/// <summary>
/// Creates the customer: applies a <c>CustomerCreated</c> event and, when
/// the pricing service returns a welcome bonus with a positive amount, adds
/// it as a payment.
/// </summary>
/// <param name="id">Identity of the customer.</param>
/// <param name="name">Name of the customer.</param>
/// <param name="currency">Currency of the customer.</param>
/// <param name="service">Pricing service asked for the welcome bonus.</param>
/// <param name="utc">Timestamp stored in the produced events.</param>
/// <exception cref="InvalidOperationException">The state says the customer was already created.</exception>
public void Create(CustomerId id, string name, Currency currency, IPricingService service, DateTime utc)
{
    if (_state.Created)
    {
        throw new InvalidOperationException("Customer was already created");
    }

    var created = new CustomerCreated
    {
        Created = utc,
        Name = name,
        Id = id,
        Currency = currency
    };
    Apply(created);

    // The bonus is requested only after the creation event has been applied.
    var welcomeBonus = service.GetWelcomeBonus(currency);
    if (welcomeBonus.Amount > 0)
    {
        AddPayment("Welcome bonus", welcomeBonus, utc);
    }
}
/// <summary>
/// Events added by <see cref="Apply"/>, kept for further persistence.
/// </summary>
public readonly IList<IEvent> Changes = new List<IEvent>();

/// <summary>In-memory state of the aggregate; modified by passing events to it.</summary>
readonly CustomerState _state;

/// <summary>
/// Passes an event to the in-memory state and then records it in
/// <see cref="Changes"/>.
/// </summary>
/// <param name="e">The event to apply.</param>
void Apply(IEvent e)
{
    // First let the state react to the event ...
    _state.Mutate(e);

    // ... then remember the event so that it can be persisted later.
    Changes.Add(e);
}
/// <summary>
/// Applies a <c>CustomerPaymentAdded</c> event for the given payment.
/// </summary>
/// <param name="name">Name of the payment.</param>
/// <param name="amount">Amount of the payment; added to the current balance to form the new balance.</param>
/// <param name="utc">Timestamp stored in the event.</param>
public void AddPayment(string name, CurrencyAmount amount, DateTime utc)
{
    // The new balance and the transaction number are both derived from the
    // current state at the moment the event is built.
    var paymentAdded = new CustomerPaymentAdded
    {
        Id = _state.Id,
        Payment = amount,
        NewBalance = _state.Balance + amount,
        PaymentName = name,
        Transaction = _state.MaxTransactionId + 1,
        TimeUtc = utc
    };

    Apply(paymentAdded);
}
// Append the changes produced by the aggregate to its stream, passing the
// version of the stream that was loaded.
var loadedVersion = stream.Version;
var pendingChanges = customer.Changes;
_eventStore.AppendToStream(id, loadedVersion, pendingChanges);
/// <summary>
/// Serializes the given events and appends them to the stream of the given
/// identity, then prints each event to the console. Does nothing when
/// <paramref name="events"/> is empty.
/// </summary>
/// <param name="id">Identity whose stream receives the events.</param>
/// <param name="originalVersion">Version passed to the append-only store together with the data.</param>
/// <param name="events">Events to append.</param>
/// <remarks>
/// When the append-only store reports a concurrency conflict, the stream is
/// reloaded and the exception produced by
/// <c>OptimisticConcurrencyException.Create</c> is thrown instead.
/// </remarks>
public void AppendToStream(IIdentity id, long originalVersion, ICollection<IEvent> events)
{
    if (events.Count == 0)
    {
        return;
    }

    var streamName = IdentityToString(id);
    var payload = SerializeEvent(events.ToArray());

    try
    {
        _appendOnlyStore.Append(streamName, payload, originalVersion);
    }
    catch (AppendOnlyStoreConcurrencyException conflict)
    {
        // Reload the events that are on the server ...
        var current = LoadEventStream(id, 0, int.MaxValue);

        // ... and report the conflict as the "real" problem.
        throw OptimisticConcurrencyException.Create(
            current.Version,
            conflict.ExpectedStreamVersion,
            id,
            current.Events);
    }

    // Technically a parallel process should query new changes from the event
    // store and send them out as messages (avoiding the 2PC problem). For
    // demo purposes the events are simply written to the console from here.
    Console.ForegroundColor = ConsoleColor.DarkGreen;
    foreach (var appended in events)
    {
        Console.WriteLine(" {0} r{1} Event: {2}", id, originalVersion, appended);
    }
    Console.ForegroundColor = ConsoleColor.DarkGray;
}
// Snapshot cadence: the Update method in this listing saves a snapshot when
// the new stream version is a multiple of this value.
private const int SnapshotInterval = 100;
/// <summary>
/// Rebuilds a user aggregate from its latest snapshot (if any) plus the
/// stream read from the version after it, runs the given action, appends the
/// resulting changes and saves a new snapshot every
/// <c>SnapshotInterval</c> versions.
/// </summary>
/// <param name="userId">Identifier of the user whose stream is used.</param>
/// <param name="updateAction">Action to run against the rebuilt aggregate.</param>
private void Update(string userId, Action<UserAR> updateAction)
{
    var snapshot = _snapshotRepository.Load(userId);

    // With a snapshot, reading starts right after the version it covers;
    // without one, it starts from version 0.
    var firstVersionToRead = snapshot != null ? snapshot.StreamVersion + 1 : 0;
    var stream = _eventStore.OpenStream(userId, firstVersionToRead, int.MaxValue);

    var user = new UserAR(snapshot, stream);
    updateAction(user);

    var versionBefore = stream.GetVersion();
    _eventStore.AppendToStream(userId, versionBefore, user.Changes);

    // NOTE(review): this assumes the append above always advances the stream
    // by exactly one version, even when user.Changes is empty - confirm
    // against the event store implementation.
    var versionAfter = versionBefore + 1;
    var snapshotIsDue = versionAfter % SnapshotInterval == 0;
    if (snapshotIsDue)
    {
        var fresh = new Snapshot(userId, versionAfter, user.State);
        _snapshotRepository.Save(fresh);
    }
}
/// <summary>
/// Creates the aggregate from an optional snapshot and a transition stream:
/// the state starts from the snapshot payload (or a new <c>UserState</c> when
/// there is no snapshot) and every event read from the stream is then passed
/// to it.
/// </summary>
/// <param name="snapshot">Snapshot to start from, or <c>null</c>.</param>
/// <param name="stream">Stream whose transitions are read and applied.</param>
public UserAR(Snapshot snapshot, TransitionStream stream)
{
    if (snapshot != null)
    {
        _state = (UserState)snapshot.Payload;
    }
    else
    {
        _state = new UserState();
    }

    // Each transition read from the stream carries a group of events; apply
    // them in the order they are read.
    foreach (var transition in stream.Read())
    {
        foreach (var recorded in transition.Events)
        {
            var domainEvent = (IEvent)recorded.Data;
            _state.Mutate(domainEvent);
        }
    }
}
Source: https://habr.com/ru/post/149464/