We once worked on audio processing in Java using complex algorithms. Each piece of audio had to pass through a long processing chain (20-50 algorithms of varying complexity). Audio streams arrived in parallel, and the algorithms ran in parallel and finished at different times. Some algorithms needed different amounts of buffering. Information of an increasing level of abstraction was extracted from the audio; that is, from some stage onward, what flowed through the chain was no longer audio but information extracted from it.
All of this was supposed to run within a single application instance, which nevertheless had to host several nested, almost independent, and very similar containers for client code (something like beans).
From the very beginning we did not aim for universal unification, and each part of the system solved the problem in its own way. In some places we used threads for long-running tasks, in others chains of calls, in others a subscription model. Since the system was quite large, almost every known method of decomposition and processing was used to some degree. Then we noticed the commonality and started implementing similar solutions in different parts of the system. That is how we came up with the first version of what we now call the contact system, or SynapseGrid.
Modularity
Since we were not solving a single one-off applied task of the "write it and forget it" kind, but building tools for a whole family of tasks, one of our development principles was the principle of modularity. That is, we created separate reusable components that could be combined almost arbitrarily to build a system for any task in the family.
Audio stream processing
When we first started, we did not really understand how to model an audio stream. We began by representing it as an AudioInputStream. It turned out that this representation does not allow modularity. That is, one module can easily read from a byte stream with blocking reads and do its processing. But how does it pass the data on? If you connect modules with pipes, you have to put each tiny module in a separate thread, which stays blocked until a new piece of data arrives. This, of course, was extremely inefficient.
Then we found that if the audio stream is divided into frames (pieces of fixed length, 10-40 ms each), then the modules are much easier to connect. Pieces of audio could be placed in a normal queue and the queue could be checked without blocking.
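The frame idea can be illustrated with a minimal sketch. The sample rate, the frame length and all names here (Frame, split, nextFrame) are assumptions made for illustration, not the original code:

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object FrameQueueSketch {
  // A frame is a fixed-length piece of audio.
  final case class Frame(samples: Array[Short])

  val sampleRate = 16000                      // samples per second (assumed)
  val frameMs = 20                            // frame length in ms (assumed, within 10-40 ms)
  val frameSize = sampleRate * frameMs / 1000 // 320 samples per frame

  // Cut a buffer into whole frames; an incomplete tail is simply dropped here.
  def split(samples: Array[Short]): List[Frame] =
    samples.grouped(frameSize).filter(_.length == frameSize).map(a => Frame(a)).toList

  // An ordinary thread-safe queue connects two modules.
  val queue = new ConcurrentLinkedQueue[Frame]()

  // Non-blocking check: poll() returns null at once when the queue is empty.
  def nextFrame(): Option[Frame] = Option(queue.poll())
}
```

A consumer module can call nextFrame() whenever it is scheduled and simply return if there is nothing to do, so it does not need a thread of its own.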
Primary audio processing in many tasks turned out to be almost linear. That is, the modules were lined up in a chain in which the output of each module was connected to the input of the next. This structure is usually called a pipeline.
There are three ways to organize data transfer:
1. pull
def getData = process(previousModule.getData)
2. push
def pushData(d: data): Unit = {
  val res = process(d)
  nextModule.pushData(res)
}
3. functional aggregation
def f(d: data) = {
  val r1 = module1.f(d)
  val r2 = module2.f(r1)
  r2
}
In the pull case, data is "pulled" out of the last component. The chain of preceding components is not observable, and a call may block because some buffer lacks data. If any exception occurs, the whole call stack unwinds, which makes it impossible to continue working.
In the push case, data is "pushed" into the first component and its further fate is unknown. If some buffer does not yet hold enough data, nothing appears at the output of the chain. When an exception occurs, the stack unwinds again and the exception surfaces at the very beginning of the chain, where it is again unclear what can be done about it. Continuing work is also impossible. In addition, implementing modules in push mode is rather inconvenient and unusual for a programmer, and it definitely requires side effects.
When we began to use the functional approach, it turned out to have advantages over both pull and push. First, the entire chain of components is clearly visible and easy to trace. Second, when an exception occurs in a component, it is easy to catch and, importantly, it affects only that one component, so processing can continue with minimal problems.
Since development at that time was in Java, we made an interface for the functional approach that all modules had to implement. It turned out that modules fall into two fundamental types: those that always produce exactly one output element for each input element, and those that can produce 0..n output elements. Modules of the first type (the map type) are a special case of the second, but they occurred quite often and their interface was simpler:
trait MapModule[TIn, TOut] { def f(data:TIn):TOut }
Modules of the second type (flatMap) contained module-specific internal buffering and returned a collection of 0..n elements:
trait FlatMapModule[TIn, TOut] { def f(data:TIn):Collection[TOut] }
To "pump" data through the pipeline, two strategies were possible: depth first or breadth first (width first).
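def widthFirst(d: Data) = {
  // every stage processes all of its inputs before the next stage starts
  val r1: Collection[...] = module1.f(d)
  val r2 = for (d1 <- r1; d2 <- module2.f(d1)) yield d2
  val r3 = for (d2 <- r2; d3 <- module3.f(d2)) yield d3
  r3
}

def depthFirst(d: Data) = {
  // each intermediate element is carried through all remaining stages at once
  for (
    d1 <- module1.f(d);
    d2 <- module2.f(d1);
    d3 <- module3.f(d2)
  ) yield d3
}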
With depth first, each element was carried to the end of the chain before the next input element was processed. With breadth first, all elements went through the first stage, then all the results went through the second stage, and so on.
The most general interface is 1 -> 0..n (flatMap), so this interface was taken as the basis for development. It turned out that modules with such an interface are very easy to combine, by functional aggregation, into larger modules with exactly the same interface. For a linear pipeline, nothing else was required.
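The aggregation property can be shown with a small sketch. Seq stands in for Collection, and the helpers chain, lift and PairSum are hypothetical names introduced only for this illustration:

```scala
object PipelineSketch {
  // The 1 -> 0..n interface from the text, with Seq standing in for Collection.
  trait FlatMapModule[TIn, TOut] { def f(data: TIn): Seq[TOut] }

  // Functional aggregation: two modules become one module with the same interface.
  def chain[A, B, C](m1: FlatMapModule[A, B], m2: FlatMapModule[B, C]): FlatMapModule[A, C] =
    new FlatMapModule[A, C] {
      def f(data: A): Seq[C] = m1.f(data).flatMap(b => m2.f(b))
    }

  // A map-style module lifted into the general interface (always one output).
  def lift[A, B](g: A => B): FlatMapModule[A, B] =
    new FlatMapModule[A, B] { def f(data: A): Seq[B] = Seq(g(data)) }

  // A module with internal buffering: emits the sum of every two consecutive inputs.
  final class PairSum extends FlatMapModule[Int, Int] {
    private var pending: Option[Int] = None
    def f(data: Int): Seq[Int] = pending match {
      case Some(prev) => pending = None; Seq(prev + data)
      case None       => pending = Some(data); Seq.empty
    }
  }

  val doubled = lift((i: Int) => i * 2)
  val summed = chain(doubled, new PairSum)
  val pipeline = chain(summed, lift((i: Int) => s"sum=$i"))
}
```

The aggregated pipeline is itself a FlatMapModule, so it can be chained further in exactly the same way.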
Subscription model
At some point, the processing forked: the same data had to be passed to several modules. The scheme that worked well for a pipeline created problems here. First, to pass data to several places, we needed an algorithm in which both continuations were stated explicitly. Second, when aggregating a chain with two ends, it was impossible to keep the same interface with one input and one output.
We quickly came up with a scheme based on subscription to events. We created an Event[T] object: data could be put into it on one side, and new data could be subscribed to on the other. This decoupled the modules. The event was made asynchronous and thread safe. The resulting scheme is very similar to the RxJava library (where the event is called Observable[T]).
The subscription model was quite good and made it fairly simple to organize work in several independent threads. One difference from Observable was that in our scheme data could arrive at an Event from several sources.
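A deliberately simplified sketch of such an event is shown here. It is synchronous (handlers run in the publishing thread), whereas the real Event[T] was asynchronous; the method names subscribe and publish are assumptions:

```scala
import scala.collection.mutable.ListBuffer

object EventSketch {
  // Simplified stand-in for Event[T]: many publishers, many subscribers.
  final class Event[T] {
    private var handlers = Vector.empty[T => Unit]

    def subscribe(handler: T => Unit): Unit =
      synchronized { handlers = handlers :+ handler }

    // Handlers are called in subscription order, in the caller's thread.
    def publish(data: T): Unit = {
      val snapshot = synchronized { handlers }
      snapshot.foreach(h => h(data))
    }
  }

  val received = ListBuffer.empty[String]
  val frames = new Event[Int]
  frames.subscribe(i => { received += s"A:$i"; () })
  frames.subscribe(i => { received += s"B:$i"; () })
  frames.publish(1) // data from one source
  frames.publish(2) // data from another source
}
```

Note that nothing in this object tells a reader who calls publish or subscribe, which is the root of the limitations that showed up later.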

Fig.1 Many-to-many, directly

Fig.2 Many-to-many under the "star" scheme
As the project progressed, the number of events grew and the limitations of this scheme became apparent.
1. It is not known whether this event is generated by anyone.
2. It is not known whether anyone has subscribed to the event.
3. Data flows are not observable: an event, a handler, a hidden internal generation of another event, another handler, and so on.
4. If the subscription is synchronous, i.e. the handler is executed in the calling thread, it is not clear what to do with an exception.
While a handler's subscription to an event could still be traced, and the event-to-subscriber links could even be drawn, the link from generator to event was completely hidden: it was impossible to tell whether anyone generated a given event, and under what conditions.
Akka
At this point Akka was already being used here for remoting and looked promising. At the very least, we could hope to hand multithreading over to Akka and solve some of the problems with events.
Akka offers a lightweight actor that runs in a conditionally separate thread but does not consume many resources. "Conditionally separate" means that while an actor is busy processing a message, no other thread will process other messages for the same actor at the same time. That is, a single actor processes its messages strictly sequentially, but with a large number of actors everything happily runs in parallel without interference.
It turned out that in the actor approach the links between actors are not designed at all and are not observable. That is, relations between actors are entirely on the developer's conscience and can spread through the program in any way. Moreover, although in theory we can track the fate of a single reference to an actor, Akka also provides a way to obtain a reference from the actor's textual URL. That is, in principle we can neither track nor limit the spread of references in the system.
What does this lead to? In a multithreaded system with a large number of similar actors and a huge number of unobserved links, poorly understood phenomena occur from time to time. For example, infinite loops of three kinds: divergent, where the number of messages grows with each iteration; monotonous, which simply lives its own life and eats processor resources but does not lead to a catastrophe; and fading, where the number of messages slowly decreases. Debugging such a system is next to impossible. In a number of critical places we had to use the Model Checking approach. Even so, there was no complete certainty that some messages were not circulating somewhere.
Upstart
Some time ago, a boot system based on an event model (Upstart) appeared in Ubuntu. The system is remarkable for its approach to events. Each event has a name and can have parameters. Handlers (tasks) can subscribe to a combination of events (with boolean conditions on the parameters) and are called when the combination is satisfied. Handlers also declaratively describe the events they generate (although any event can also be generated in other ways).
This system describes a so-called "lattice", or directed graph (digraph). The nodes of the lattice are events, and the edges are handlers. Thanks to the parallelism built into the graph, Upstart can start handlers much earlier than System V init, which is based on fixed separating layers (runlevels 0..6). Independent handlers simply run in parallel.
The ideas of Upstart influenced the further development of the contact system.
Contact system version 1
As long as we consider the picture of a star with a single event (shown in the figure above), we cannot come up with a good mechanism for organizing a multithreaded system. But if we construct a directed graph, interesting prospects open up.

Fig.3 The grid of events and handlers
The scheme is very similar to Upstart. If we require that links be declared declaratively and prohibit the implicit creation of links, then the whole graph describing the operation of the entire system is visible at a glance and available for static analysis. Just by looking at such a graph, one can detect unwanted cycles, missing links, and events or handlers that hang disconnected.
In the first version, we just generalized and somewhat adapted Event[T], but kept the distributed ("smeared") message-passing mechanism. The mere availability of the processing graph made it possible to understand how the system worked and to solve the problems at hand. But this "smeared" message-passing mechanism, with its mandatory buffering and thread safety, was not suitable for solving similar problems in other parts of the system. Something similar but more lightweight was needed.
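To make the static-analysis claim concrete, here is a minimal sketch over a declared graph. The Handler model (one input event, one output event, events named by strings) and the three checks are assumptions for illustration, not the checks the original system performed:

```scala
object GraphCheckSketch {
  // One declared handler: subscribes to event `from`, generates event `to`.
  final case class Handler(name: String, from: String, to: String)

  // Events that somebody listens to but nobody generates (apart from system inputs).
  def neverGenerated(hs: Seq[Handler], inputs: Set[String]): Set[String] =
    hs.map(_.from).toSet -- hs.map(_.to).toSet -- inputs

  // Events that are generated but nobody listens to (apart from system outputs).
  def neverConsumed(hs: Seq[Handler], outputs: Set[String]): Set[String] =
    hs.map(_.to).toSet -- hs.map(_.from).toSet -- outputs

  // Naive depth-first search for a cycle; fine for small graphs.
  def hasCycle(hs: Seq[Handler]): Boolean = {
    val next: Map[String, Seq[String]] =
      hs.groupBy(_.from).map { case (k, v) => k -> v.map(_.to) }
    def visit(node: String, path: Set[String]): Boolean =
      path.contains(node) || next.getOrElse(node, Nil).exists(n => visit(n, path + node))
    next.keys.exists(k => visit(k, Set.empty))
  }

  val handlers = Seq(
    Handler("decode", "audio", "frames"),
    Handler("features", "frames", "mfcc"),
    Handler("vad", "frames", "speech"),
    Handler("orphan", "noise", "clean"))
}
```

All three checks need only the declarations, so they can run before the system processes a single message.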
How lightweight actors work in Akka
A short digression is needed here on the mechanism of lightweight actors, which has much in common with the SynapseGrid architecture.
Consider this code:
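import akka.actor.ActorSystem
import akka.actor.ActorDSL._ // the actor DSL of older Akka 2.x releases

// The DSL needs an implicit ActorSystem in scope (the name "demo" is arbitrary).
implicit val system: ActorSystem = ActorSystem("demo")

val helloActorRef = actor(new Act {
  become {
    case "Hello" => sender() ! "hi"
  }
})
helloActorRef ! "Hello"
When we send the "Hello" message to the actor, a wrapper is created around our message: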
Message(helloActorRef, "Hello")
This wrapper is placed in a queue. Somewhere in parallel there are 16 threads that read messages from this big queue, and a dispatcher that makes sure two threads never work with the same actor at the same time. A thread grabs a message, finds the actor and simply applies that actor's function to our message ("Hello").
Such a scheme allows millions of actors to exist without consuming resources (apart from a little memory) while they are idle. Processor time is consumed only in the 16 threads and only for processing real data.
In the user's model, each actor has its own message queue, and with the ! command we send our message to that queue. That is, it looks as if there are a million queues. This user model is implemented with a global queue and a global set of threads. In effect, the user's view is virtualized on top of an efficient system view.
In this scheme, the actorRef plays the role of a handle, a tag, or a label. That is, we attach to our data a label that shows where the "Hello" message is located in the virtual space of the actors' virtual queues.
Virtually, for a million actors we have a million queues. Imagine an array of 1,000,000 queues that has to be scanned: implemented literally, the user's model would not be very efficient. Implementing the same structure with labels instead gives one big queue of tagged messages, and that queue works very efficiently.
Another analogy that comes to mind is using a HashMap to store huge sparse arrays. How can we store some data for a few points in three-dimensional space? Let the size of the space be 1000x1000x1000. If you store it as an array, you need 10^9 cells. If you use the label (x, y, z) as a key, you need much less memory. This is where HashMap[(x,y,z), data] works.
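The sparse-array analogy can be written down in a few lines. The stored values and the default of 0.0 for empty cells are assumptions of this sketch:

```scala
import scala.collection.mutable

object SparseGridSketch {
  // The label (x, y, z) is the key; only occupied points take up memory.
  val grid = mutable.HashMap.empty[(Int, Int, Int), Double]

  grid((1, 2, 3)) = 0.5
  grid((999, 999, 999)) = 1.5

  // Points that were never written read as a default value.
  def valueAt(x: Int, y: Int, z: Int): Double = grid.getOrElse((x, y, z), 0.0)
}
```

Two entries are stored instead of 10^9 cells, and the key plays the same role as the actorRef label on a message.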
Contact System Version 2 or SynapseGrid
In the scheme with the "smeared" message-passing mechanism, messages were stored directly in Event. It is as if the million actor queues really existed. But if events and links are kept in a single collection, an efficient centralized message distribution mechanism becomes possible.
So, to represent events that can be subscribed to, we introduce the concept of a contact,
val contact1 = Contact[T]("1")
and to represent the messages associated with a contact, a signal.
Signal[T](contact1, data:T)
As you can see, the contact is similar to ActorRef, and the signal is similar to Message. The difference is that neither state nor behavior is associated with a contact. It is just an identifier. If we recall the analogy with the virtual "queue", the contact identifies this queue.
Since a contact holds neither data nor behavior, it is simply impossible to send a message to it. This rules out the hidden links that usually arise inside handlers. Links can only be declared.
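A minimal sketch of these two notions as plain data follows. The case-class definitions are hypothetical and may differ from the library's real classes:

```scala
object ContactSketch {
  // A contact is only a typed name: no queue, no state, no send method.
  final case class Contact[T](name: String)

  // A signal pairs a contact with a piece of data.
  final case class Signal[T](contact: Contact[T], data: T)

  val contact1 = Contact[Int]("1")
  val log = Contact[String]("log")

  // "Sending" is nothing more than creating a value; whoever owns the
  // collection of signals decides what happens to it.
  val queue: List[Signal[_]] =
    List(Signal(contact1, 1), Signal(log, "hi"), Signal(contact1, 2))

  // The virtual queue of contact1 is a filter over the single real queue.
  val forContact1: List[Any] = queue.filter(_.contact == contact1).map(_.data)
}
```

Because Contact has no methods that deliver data, a handler holding a contact cannot bypass the declared graph.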
External binding
In the use of contacts and signals, and of ActorRef and Message, we see a common pattern which, apparently, is quite widespread in functional programming. I call this pattern "external binding". The idea is not to store data in the object that is used to address this data. Instead, the address (link, identifier) is an immutable structure, similar to an index, and the value is stored somewhere else or nowhere at all. To access the data, we use some kind of monad or system that manages our data.
For example, a signal is a temporary object that states, for a short time, that such-and-such data is present on such-and-such a contact. Instead of a list of signals, we could just as well use a HashMap or any other structure that seems convenient.
Because data is not stored in the contact itself, we can, for example, work with the history of the "state" of a variable: it is enough to keep a list of the signals related to one contact. And processing the signals of the whole system lets us reconstruct how all of its "states" evolve over time.
As mentioned above, the model of the whole system is a directed graph. If you look closely, you can see that it is a bipartite (two-colored) graph. But in most cases elementary handlers are map or flatMap components with one input and one output. Therefore SynapseGrid is based on a simpler graph whose nodes correspond to contacts and whose arcs correspond to the components that process the data. That is, the graph is defined by a set of triples (contact 1, contact 2, handler):
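The history of a "state" can be recovered from a plain list of signals, as in this sketch. The Contact and Signal definitions, the contact names and the history helper are all assumptions for illustration:

```scala
object HistorySketch {
  final case class Contact[T](name: String)
  final case class Signal[T](contact: Contact[T], data: T)

  val volume = Contact[Int]("volume")
  val muted = Contact[Boolean]("muted")

  // Everything that ever happened, in order of appearance.
  val log: List[Signal[_]] =
    List(Signal(volume, 3), Signal(muted, false), Signal(volume, 5), Signal(volume, 4))

  // All past values of one contact. The cast relies on contact names
  // being unique, which this sketch assumes.
  def history[T](c: Contact[T], signals: List[Signal[_]]): List[T] =
    signals.filter(_.contact == c).map(_.data.asInstanceOf[T])

  // The current "state" is just the latest signal on the contact.
  def current[T](c: Contact[T], signals: List[Signal[_]]): Option[T] =
    history(c, signals).lastOption
}
```

Nothing was ever overwritten, so earlier states stay available for inspection.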
Link[TIn, TOut](from:Contact[TIn], to:Contact[TOut],function: TIn=>Collection[TOut])
(here we use the fact that map components are a special case of flatMap components).
The SynapseGrid description offers an interesting breadboard metaphor that illustrates the idea of SynapseGrid well.
When data (signals) appear on a contact, we can easily find all the handlers that want to process this data and schedule their calls in one or several threads. The results produced by each handler become signals on the output contacts to which the handlers are attached, and go back into the common processing queue.
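The dispatch loop described above might look roughly like this single-threaded sketch. It is not the library's implementation: Seq replaces Collection, the fire, step and run helpers are hypothetical, and the graph is assumed to be acyclic:

```scala
object StepSketch {
  final case class Contact[T](name: String)
  final case class Signal[T](contact: Contact[T], data: T)
  final case class Link[TIn, TOut](from: Contact[TIn], to: Contact[TOut],
                                   function: TIn => Seq[TOut]) {
    // Apply the handler to a signal that sits on `from`; the cast is safe
    // only if contact names are unique, which this sketch assumes.
    def fire(s: Signal[_]): Seq[Signal[_]] =
      function(s.data.asInstanceOf[TIn]).map(d => Signal(to, d))
  }

  // One pass: every pending signal goes to every link attached to its contact.
  def step(links: Seq[Link[_, _]], pending: Seq[Signal[_]]): Seq[Signal[_]] =
    for (s <- pending; l <- links if l.from == s.contact; out <- l.fire(s)) yield out

  // Repeat until nothing is pending; signals on output contacts are collected.
  def run(links: Seq[Link[_, _]], pending: Seq[Signal[_]],
          outputs: Set[Contact[_]]): Seq[Signal[_]] =
    if (pending.isEmpty) Seq.empty
    else {
      val (done, rest) = pending.partition(s => outputs.contains(s.contact))
      done ++ run(links, step(links, rest), outputs)
    }

  val in = Contact[String]("in")
  val words = Contact[String]("words")
  val lengths = Contact[Int]("lengths")
  val links: Seq[Link[_, _]] = Seq(
    Link(in, words, (s: String) => s.split(" ").toSeq),
    Link(words, lengths, (w: String) => Seq(w.length)))

  val result = run(links, Seq(Signal(in, "hello big world")), Set[Contact[_]](lengths))
}
```

The handlers themselves are ordinary functions; only step and run know about contacts and signals.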
Builders and DSL
The mechanism described above assumes an immutable graph given by a set of triples. Such a graph would not be very convenient to create all at once in a single expression. Instead, it is convenient to use builders, which allow the system graph to be constructed step by step by adding individual arcs.
For the contact system, we developed the SystemBuilder DSL, which contains many methods for all occasions and lets us construct the system step by step.
val in = input[Int]("in")
Using the standard method names map, flatMap and filter makes it possible to use Scala's syntactic sugar, the for-comprehension:
val c2 = for(i <- in) yield i * 2
More examples of using the DSL are in the SynapseGrid description.
Metamorphosis of the system
In SynapseGrid, two phases are clearly distinguished: the system design phase and the runtime phase. Usually these phases do not overlap: new elements cannot simply be added to a running system. Something similar happens, for example, in Guice: modules are created first, and then a working system is constructed from the description contained in the modules.
The code that constructs the system uses the builder API. The code that runs in the real system knows nothing about contacts and signals: it simply processes incoming data and returns the result.
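The two phases can be sketched as a mutable builder that produces an immutable description. This is not the SystemBuilder API: Builder, link, toStaticSystem and StaticSystem are hypothetical names, and contacts are simplified to strings carrying Int data:

```scala
object BuilderSketch {
  final case class Link(from: String, to: String, f: Int => Seq[Int])

  // Runtime phase: an immutable description that can only be executed.
  final case class StaticSystem(links: List[Link])

  // Design phase: arcs are added one by one.
  final class Builder {
    private var links = List.empty[Link]
    private var frozen = false

    def link(from: String, to: String)(f: Int => Seq[Int]): Builder = {
      require(!frozen, "the system has already been built")
      links = links :+ Link(from, to, f)
      this
    }

    // After this call the builder refuses further changes.
    def toStaticSystem: StaticSystem = { frozen = true; StaticSystem(links) }
  }

  val builder = new Builder
  builder.link("in", "doubled")(i => Seq(i * 2))
  builder.link("doubled", "even")(i => if (i % 4 == 0) Seq(i) else Seq.empty)
  val system: StaticSystem = builder.toStaticSystem
}
```

The functions passed to link receive and return plain data, which matches the point that runtime code knows nothing about contacts and signals.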
Trellis, discrete time, synchronism and determinism
One feature of Akka and other actor-based approaches is that the order in which different actors process messages is undefined and left to chance. Most likely this yields a performance gain, but it also hurts the predictability of the system.
Since unpredictability is much worse for us than underusing the processor, we decided to guarantee the order of signals in the main modes of operation. (A faster but less deterministic mode is used only for optimization in rare cases.)
All signals are placed in discrete time and are ordered within each time step. At the initial moment there is a single signal on the input contact. The transition from one moment to the next happens only when all signals of the current moment have been processed. The generated signals are ordered as if the handlers had been called strictly sequentially (in single-threaded mode this is literally the case; in multithreaded mode the order is enforced).
If you look at the set of signals at each moment of time, the resulting picture is called a trellis and is used, for example, in the Viterbi algorithm.
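A trellis can be produced by recording the signals of every time step, as in this simplified sketch. Signals are reduced to (contact name, Int) pairs, the names Sig, Link, step and trellis are assumptions, and the graph must be acyclic for the iteration to stop:

```scala
object TrellisSketch {
  type Sig = (String, Int)                      // (contact name, data)
  type Link = (String, String, Int => Seq[Int]) // (from, to, handler)

  // One time step: signals are handled in order, links in declaration order,
  // so the output order is the same on every run.
  def step(links: Seq[Link], now: Seq[Sig]): Seq[Sig] =
    for ((c, d) <- now; (from, to, f) <- links if from == c; out <- f(d)) yield (to, out)

  // The trellis: the list of signals present at time 0, 1, 2, ...
  def trellis(links: Seq[Link], start: Sig): List[Seq[Sig]] =
    Iterator.iterate(Seq(start))(step(links, _)).takeWhile(_.nonEmpty).toList

  val links: Seq[Link] = Seq(
    ("in", "a", (x: Int) => Seq(x, x + 1)),
    ("a", "b", (x: Int) => Seq(x * 10)),
    ("in", "c", (x: Int) => Seq(-x)))

  val result = trellis(links, ("in", 1))
}
```

Here result holds three time steps: the input signal, then the signals on a and c, then the signals on b.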
Since all processing is centralized, nothing prevents us from drawing such a picture and analyzing how signals pass through the system.
If the graph forks, it is fairly easy to guarantee that signals appear synchronously on the given contacts. If alternative chains have different lengths, synchronism is achieved very simply, by adding a few empty arrows ("delay lines").
Subsystems
The contact system replaced both the pipeline and event subscriptions remarkably well, and gradually many components were moved to it. However, to integrate several systems, we needed components with several inputs and several outputs. Two-terminal components have an analogue in ordinary functions, but we found no such analogue for multi-contact components.
The solution was to use the same labels to distinguish the inputs from one another and the outputs from one another. The component representing a subsystem got the following signature:
type Component = Signal => Collection[Signal]
That is, the data we want to send to the in1 input is wrapped in a Signal and marked with the in1 contact. Data appearing at the out1 output is represented as Signal(out1, data).
From the point of view of the parent system's trellis, all processing in a child subsystem takes place in one time step. Inside, the subsystem builds its own trellis.
Parallelism
A subsystem turned out to be a good candidate for execution in a separate thread, and we first implemented a mechanism that turns any subsystem into a separate Akka actor.
This scheme covered 90% of all our multithreading needs. Akka guarantees that everything that happens within one actor can be considered single-threaded, and SynapseGrid made it possible to join any number of actors into a connected system with observable links. An actor built on a subsystem has no direct way to create a hidden link to another subsystem. The picture of the world becomes much more transparent.
The advantages of an actor system on SynapseGrid
, SynapseGrid, , Akka . Namely:
1. - . ().
2. .
3. , .
4. , .
However, the remaining 10% of multithreading needs gave us no peace. Namely, when we needed to process a mass of same-type data in parallel with a single component, actors based on subsystems fell short: they processed the data strictly sequentially.
So we decided to write our own scheduler, which distributes work among 16 ExecutorContext threads according to SynapseGrid semantics. The synapse-grid-concurrent library runs a system of arbitrary nesting on a thread pool while preserving the determinism of the single-threaded version.
Conclusion
SynapseGrid allows you to build modular systems that process data as soon as possible (ASAP), without unnecessary buffering. The library is based on a functional approach, external binding and a declarative approach. The system is built with an advanced builder API in design mode, and at runtime the system is immutable.
The processing logic can be executed in one or several threads with a guarantee of identical results. SynapseGrid extends Akka by adding strongly typed inputs, outputs and connections between actors.
The library is published on GitHub under a free (BSD-like) license.
As experience with SynapseGrid has shown, the library makes it easy to design efficient real-time streaming data processing systems.