
Can I use the CQRS pattern in Go?

CQRS (Command and Query Responsibility Segregation) separates the operations that read data from the commands that modify or add data. This separation helps with performance, scalability, and security. It also makes the system easier to change over time and reduces the number of errors as the system logic grows more complex, which usually happens in data processing at the domain level.
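To make the split concrete, here is a minimal sketch of a write side and a read side in Go. All names in it (CreateUser, GetUserName, store, newStore) are hypothetical and do not come from the article's code. A real system would usually keep separate read and write models rather than one in-memory map.

```go
package main

import (
	"errors"
	"fmt"
)

// CreateUser is a command: it asks the system to change state.
type CreateUser struct {
	ID   int
	Name string
}

// GetUserName is a query: it only reads state.
type GetUserName struct {
	ID int
}

// store is a hypothetical in-memory store used by both sides of this sketch.
type store struct {
	names map[int]string
}

func newStore() *store {
	return &store{names: make(map[int]string)}
}

// HandleCreateUser is the write side: it validates the command and mutates
// state, returning only an error.
func (s *store) HandleCreateUser(c CreateUser) error {
	if c.Name == "" {
		return errors.New("name must not be empty")
	}
	if _, exists := s.names[c.ID]; exists {
		return fmt.Errorf("user %d already exists", c.ID)
	}
	s.names[c.ID] = c.Name
	return nil
}

// HandleGetUserName is the read side: it returns data and never mutates state.
func (s *store) HandleGetUserName(q GetUserName) (string, bool) {
	name, ok := s.names[q.ID]
	return name, ok
}

func main() {
	s := newStore()
	if err := s.HandleCreateUser(CreateUser{ID: 1, Name: "Ann"}); err != nil {
		fmt.Println("command failed:", err)
		return
	}
	name, ok := s.HandleGetUserName(GetUserName{ID: 1})
	fmt.Println(name, ok)
}
```

Because the command handler returns nothing but an error and the query handler never writes, each side can be tested, secured, and scaled on its own.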

The pattern is not new: it traces back to the command-query separation principle that Bertrand Meyer introduced with the Eiffel language. Thanks to the efforts of Greg Young and Martin Fowler it was revived, especially in the .NET world.

From personal experience, I can say that the pattern is very convenient: it makes it fairly easy to separate business logic, it is easy to maintain, beginners pick it up quickly, and it isolates errors well. A lot has already been written about it on Habr, and there is no point in retyping it all here, so I decided to focus entirely on porting the pattern to Go.

So, let's begin…
I would like to say up front that CQRS does not necessarily imply Event Sourcing, although the two are often used together in a CQRS + DDD + Event Sourcing bundle because of the simplicity of the latter. I have tried projects both with and without Event Sourcing, and in both cases CQRS was a good fit for the business logic. Event Sourcing does bring its own advantages, namely a denormalized database and fast saving and reading of data, but it complicates understanding the data, migrations, and reporting.
Simply put, Event Sourcing requires CQRS, but not the other way around.
For that reason I will leave Event Sourcing out, so as not to complicate the article.

So, to process a Command we need a bus. Let's define its basic interface:

    type EventBus interface {
        Subscribe(handler handlers.Handler) error
        Publish(eventName string, args ...interface{})
        Unsubscribe(eventName string) error
    }

We also need a place to keep the command-processing logic. For that, we define a handler interface, Handler:

    type Handler interface {
        Event() string
        Execute(...interface{}) error
        OnSubscribe()
        OnUnsubscribe()
    }
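As an illustration, a concrete handler could look like the sketch below. The Handler interface is repeated so the example compiles on its own. The userCreatedHandler type, its fields, and the "user.created" event name are hypothetical. The sketch also assumes that the bus forwards the published arguments to Execute unchanged.

```go
package main

import (
	"errors"
	"fmt"
)

// Handler repeats the interface from the article so this sketch is standalone.
type Handler interface {
	Event() string
	Execute(...interface{}) error
	OnSubscribe()
	OnUnsubscribe()
}

// userCreatedHandler is a hypothetical handler that records user names.
type userCreatedHandler struct {
	active bool     // set while the handler is subscribed
	seen   []string // names received so far
}

// Event returns the name of the event this handler reacts to.
func (h *userCreatedHandler) Event() string { return "user.created" }

// Execute expects exactly one string argument (an assumption of this sketch)
// and rejects anything else with an error.
func (h *userCreatedHandler) Execute(args ...interface{}) error {
	if len(args) != 1 {
		return fmt.Errorf("expected 1 argument, got %d", len(args))
	}
	name, ok := args[0].(string)
	if !ok {
		return errors.New("argument must be a string")
	}
	h.seen = append(h.seen, name)
	return nil
}

func (h *userCreatedHandler) OnSubscribe()   { h.active = true }
func (h *userCreatedHandler) OnUnsubscribe() { h.active = false }

// Compile-time check that the type satisfies Handler.
var _ Handler = (*userCreatedHandler)(nil)

func main() {
	h := &userCreatedHandler{}
	h.OnSubscribe()
	if err := h.Execute("Ann"); err != nil {
		fmt.Println("execute failed:", err)
	}
	fmt.Println(h.Event(), h.seen)
}
```

Since Execute receives untyped arguments, each handler has to validate the count and types itself, as the sketch does.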

In many languages with explicit contracts an interface is usually added to the Command as well, but because of how interfaces work in Go this makes little sense. Instead, the command is replaced by an event (Event) that can be processed by multiple handlers.

As you may have noticed, Publish takes a variadic set of parameters, Publish(eventName string, args ...interface{}), rather than a specific type. As a result, you can pass any type or set of types to the method.
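The flip side of this flexibility is that the receiver has to inspect the arguments at run time. A small sketch of what that looks like, using a hypothetical helper named describe that is not part of the article's code:

```go
package main

import "fmt"

// describe is a hypothetical helper that accepts any set of values, the same
// way Publish does, and reports the dynamic type of each one.
func describe(args ...interface{}) []string {
	out := make([]string, 0, len(args))
	for _, a := range args {
		// A type switch recovers the concrete type behind interface{}.
		switch v := a.(type) {
		case string:
			out = append(out, "string:"+v)
		case int:
			out = append(out, fmt.Sprintf("int:%d", v))
		default:
			out = append(out, fmt.Sprintf("other:%T", v))
		}
	}
	return out
}

func main() {
	// Mixed types are accepted in a single call.
	fmt.Println(describe("a", 1, 2.5)) // prints [string:a int:1 other:float64]
}
```

The compiler cannot catch a mismatch between what a publisher sends and what a handler expects, so mistakes show up only when the handler runs.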

The full implementation of EventBus will look like this:

    import (
        "errors"
        "fmt"
        "log"
        "sync"
        // Also import the package that declares handlers.Handler.
    )

    type eventBus struct {
        mtx      sync.RWMutex
        handlers map[string][]handlers.Handler
    }

    // Publish runs every handler subscribed to eventName in its own goroutine.
    func (b *eventBus) Publish(eventName string, args ...interface{}) {
        b.mtx.RLock()
        defer b.mtx.RUnlock()

        for _, h := range b.handlers[eventName] {
            // Before Go 1.22 the loop variable is shared between iterations,
            // so copy it before the closure captures it.
            h := h
            go func() {
                // Recover from a panic in Handler.Execute.
                defer func() {
                    if r := recover(); r != nil {
                        log.Printf("Panic in EventBus.Publish: %v", r)
                    }
                }()
                // Forward the published arguments to the handler unchanged.
                if err := h.Execute(args...); err != nil {
                    log.Printf("EventBus.Publish: handler for %q failed: %v", eventName, err)
                }
            }()
        }
    }

    // Subscribe registers a handler under the event name it reports.
    func (b *eventBus) Subscribe(h handlers.Handler) (err error) {
        b.mtx.Lock()
        // Unlock and turn a panic raised by the handler into an error.
        defer func() {
            b.mtx.Unlock()
            if r := recover(); r != nil {
                log.Printf("Panic in EventBus.Subscribe: %v", r)
                err = fmt.Errorf("panic in Subscribe: %v", r)
            }
        }()

        if h == nil {
            return errors.New("handler can not be nil")
        }
        if len(h.Event()) == 0 {
            return errors.New("handlers with empty Event are not allowed")
        }

        h.OnSubscribe()
        b.handlers[h.Event()] = append(b.handlers[h.Event()], h)
        return nil
    }

    // Unsubscribe removes all handlers registered for eventName.
    func (b *eventBus) Unsubscribe(eventName string) (err error) {
        b.mtx.Lock()
        // Unlock and turn a panic raised by a handler into an error.
        defer func() {
            b.mtx.Unlock()
            if r := recover(); r != nil {
                log.Printf("Panic in EventBus.Unsubscribe: %v", r)
                err = fmt.Errorf("panic in Unsubscribe: %v", r)
            }
        }()

        hs, ok := b.handlers[eventName]
        if !ok {
            return fmt.Errorf("event %s does not exist", eventName)
        }
        // Notify every handler first, then drop the whole entry. Deleting
        // elements from the slice while ranging over it would skip handlers.
        for _, h := range hs {
            if h != nil {
                h.OnUnsubscribe()
            }
        }
        delete(b.handlers, eventName)
        return nil
    }

    // New creates a new EventBus.
    func New() EventBus {
        return &eventBus{
            handlers: make(map[string][]handlers.Handler),
        }
    }

Inside Publish, each handler is called in its own goroutine with panic recovery, in case something unexpected happens.
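The goroutine-plus-recover idiom can be tried in isolation. The sketch below uses a hypothetical helper, runIsolated, which is not part of the article's code. Unlike the bus, it waits for the goroutines with a sync.WaitGroup, purely so that the example finishes deterministically.

```go
package main

import (
	"fmt"
	"sync"
)

// runIsolated is a hypothetical helper: it runs each task in its own
// goroutine, recovers from panics, and returns how many tasks panicked.
func runIsolated(tasks ...func()) int {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		panicked int
	)
	for _, task := range tasks {
		task := task // per-iteration copy, needed before Go 1.22
		wg.Add(1)
		go func() {
			defer wg.Done()
			// recover only works inside a deferred function that runs in
			// the same goroutine as the panic.
			defer func() {
				if r := recover(); r != nil {
					mu.Lock()
					panicked++
					mu.Unlock()
				}
			}()
			task()
		}()
	}
	wg.Wait()
	return panicked
}

func main() {
	n := runIsolated(
		func() { panic("boom") }, // fails, but does not crash the program
		func() {},                // succeeds
	)
	fmt.Println("panicked tasks:", n) // prints "panicked tasks: 1"
}
```

Without the deferred recover, a panic in any single handler goroutine would terminate the whole process, which is why the bus wraps every Execute call this way.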

As you can see, the implementation is very simple, much simpler than an equivalent in .NET or Java.

You can download the full code with examples from GitHub.

Source: https://habr.com/ru/post/348126/

