For the past 7 years I have been working with the team that supports and develops the core of the Miro product (formerly RealtimeBoard): client-server and cluster interaction and work with the database.
We run Java with a variety of libraries on board. Everything starts outside a container, through a Maven plugin. At the heart of it all is our partners' platform, which lets us work with the database and threads, manage client-server interaction, and so on. The databases are Redis and PostgreSQL (my colleague wrote about how we migrate from one database to the other).
From the point of view of business logic, the application contains:
- work with user boards and their content;
- functionality for user registration and for creating and managing boards;
- a generator of user resources. For example, it optimizes large images uploaded to the application so that they do not slow down our clients;
- many integrations with third-party services.
In 2011, when we started, all of Miro lived on a single server. It had everything: Nginx with PHP for the website, the Java application, and the databases.
The product grew, along with the number of users and the amount of content they added to boards, so the load on the server grew too. Because so many applications lived on one server, we could not tell what exactly was creating the load and, accordingly, could not optimize it. To fix this, we split everything across different servers: a web server, a server with our application, and a server with the databases.
Unfortunately, after a while problems arose again, as the load on the application kept growing. Then we started thinking about how to scale the infrastructure.

Below I will talk about the difficulties we faced while developing clustering and scaling the Java application and the infrastructure.
Scaling the infrastructure horizontally
We started by collecting metrics: memory and CPU usage, the time of user requests, system resources, database operations. The metrics made it clear that generating user resources is an unpredictable process: it can load the CPU to 100%, and we may wait tens of seconds until everything is done. User requests to boards also sometimes produced unexpected load, for example when a user selects a thousand widgets and starts moving them around.
We started to think about how to scale these parts of the system and came up with obvious solutions.
Scaling work with boards and content. Opening a board looks like this: the user opens the client → indicates which board he wants to open → connects to the server → a thread is created on the server → all users of this board connect to that one thread → any change or widget creation happens within this thread. It turns out that all work with a board is strictly confined to its thread, which means we can spread these threads across servers.
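As a toy illustration of how work with a board is pinned to a single thread, here is a minimal sketch (the class and names are made up for this article, not the real product code):
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: each board gets its own single-threaded executor,
// so all changes to one board are serialized, and boards (with their threads)
// can be spread across different servers.
public class BoardThreads {
    private final Map<String, ExecutorService> boardThreads = new ConcurrentHashMap<>();

    public void submit(String boardId, Runnable change) {
        boardThreads
            .computeIfAbsent(boardId, id -> Executors.newSingleThreadExecutor())
            .submit(change);
    }
}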
Scaling the generation of user resources. We can move resource generation to a separate server, which will receive generation requests and then respond when everything has been generated.
It seems simple. But as soon as we started digging into the topic, it turned out that we had to solve a number of side problems as well. For example, if a user's paid subscription expires, we must notify them about it, whichever board they are on. Or, if a user has updated the version of a resource, we need to make sure the cache is correctly invalidated on all servers and that we serve the correct version.
We defined the system requirements. The next step was to understand how to put them into practice. Essentially, we needed a system that would let the servers in a cluster communicate with each other, on top of which we would implement all our ideas.
The first cluster out of the box
We did not have to choose the first version of the system, because it was already partially implemented in the partner platform we used. In it, all servers were connected to each other via TCP, and we could use this connection to send RPC messages to one server or to all of them.
For example, we have three servers connected to each other via TCP, and in Redis we keep a list of these servers. When we start a new server in the cluster, it adds itself to the list in Redis → reads the list to learn about all the servers in the cluster → connects to all of them.
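Very roughly, that registration step could look like this (a sketch using the Jedis client; the key name, myHost/myPort, and connectTo() are made up for illustration):
import java.util.List;
import redis.clients.jedis.Jedis;

// Hypothetical registration step; "cluster:servers", myHost, myPort
// and connectTo() are illustrative names.
Jedis redis = new Jedis("redis-host");

// 1. Add ourselves to the list of cluster servers.
redis.rpush("cluster:servers", myHost + ":" + myPort);

// 2. Read the list to learn about all the servers in the cluster.
List<String> servers = redis.lrange("cluster:servers", 0, -1);

// 3. Open a TCP connection to every other server.
for (String address : servers) {
    if (!address.equals(myHost + ":" + myPort)) {
        connectTo(address); // our own TCP/RPC layer
    }
}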

On top of this RPC, cache flushing and redirecting users to the correct server were already implemented. We had to add the generation of user resources and notifications to users that something had happened (for example, that an account had expired). For resource generation we picked an arbitrary server and sent it a generation request, and for subscription-expiration notifications we sent a command to all servers in the hope that the message would reach its target.
The server itself decides which server to send the message to
This sounds like a feature, not a problem. But the server only looks at whether a connection to another server exists: if there is a connection, there is a candidate for the message.
The problem is that server #1 does not know that server #4 is under heavy load right now and cannot respond quickly enough. As a result, requests from server #1 are processed more slowly than they could be.

The server does not know that another server is frozen
And what if a server is not just heavily loaded, but completely frozen? Frozen so badly that it will never come back to life, for example, because it has exhausted all available memory.
In this case server #1 does not know what the problem is, so it keeps waiting for an answer. The other servers in the cluster do not know about the situation with server #4 either, so they keep sending it messages and waiting for responses. This goes on until server #4 finally dies.

What to do? We could add server health checks to the system ourselves, or redirect messages from “sick” servers to “healthy” ones. But all of this would take too much developer time. In 2012 we had little experience in this area, so we started looking for a ready-made solution to all our problems at once.
Message broker. ActiveMQ
We decided to go with a message broker to organize the communication between the servers properly. We chose ActiveMQ because of its ability to deliver a message to a consumer at a specified time. True, we never used this feature, so we could just as well have chosen RabbitMQ, for example.
As a result, we moved our entire cluster system to ActiveMQ. Here is what it gave us:
- A server no longer decides whom to send a message to, because all messages go through queues.
- Fault tolerance is built in. More than one server can read a queue, so even if one of them goes down, the system keeps working.
- Servers got roles, which made it possible to divide them by load type. For example, a resource generator connects only to the queue with resource-generation messages, and a board server connects to the queue for opening boards.
- We got RPC communication: each server has its own private queue, to which other servers send events (see the sketch after this list).
- Messages can be sent to all servers at once via a Topic, which we use for subscription-expiration notifications.
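Here is a rough sketch of this wiring in plain JMS against ActiveMQ (queue, topic, and message names are illustrative, not our real configuration):
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class BrokerWiring {
    public static void main(String[] args) throws JMSException {
        String serverId = "server-1"; // illustrative id of this server

        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://broker:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Role-specific queue: resource generators consume generation requests from it.
        MessageProducer generation = session.createProducer(session.createQueue("resource.generation"));
        generation.send(session.createTextMessage("generate preview for image 123"));

        // Private queue of one server: other servers send RPC events to it.
        MessageProducer rpc = session.createProducer(session.createQueue("server." + serverId + ".rpc"));
        rpc.send(session.createTextMessage("flush cache for board 42"));

        // Topic: a broadcast to every server, e.g. subscription-expiration notifications.
        MessageProducer broadcast = session.createProducer(session.createTopic("subscription.events"));
        broadcast.send(session.createTextMessage("subscription expired for user 7"));

        connection.close();
    }
}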
The scheme looks simple: all servers connect to the broker, and it manages the communication between them. Everything works: messages are sent and received, resources are generated. But new problems appeared.
What to do when the needed servers are all gone?
Suppose server #3 wants to send a resource-generation message to the queue. It waits for its message to be processed. But it does not know that, for some reason, there is not a single consumer of that message: for example, all the consumers crashed because of an error.
While waiting, the server keeps sending request messages, so a backlog builds up in the queue. When working servers do appear, they first have to process the accumulated backlog, which takes time. For the user this means the image he is uploading does not appear right away. He is not willing to wait, so he leaves the board.
As a result, we spend server capacity on generating resources whose results nobody needs anymore.

How can we solve this? We could set up monitoring that notifies us about what is happening. But between the moment monitoring reports something and the moment we realize our servers are in trouble, time will pass. That does not work for us.
Another option is to run Service Discovery, a registry of services, which knows which servers are running and with which roles. In that case we get an error message immediately if there are no free servers.
Some services cannot scale horizontally.
This is a problem of our early code, not of ActiveMQ. Let me show it with an example:
Permission ownerPermission = service.getOwnerPermission(board);
Permission permission = service.getPermission(board, user);
ownerPermission.setRole(EDITOR);
permission.setRole(OWNER);
We have a service that manages user permissions on a board: a user can be the board's owner or its editor, and a board can have only one owner. Suppose we have a scenario where we want to transfer ownership of a board from one user to another. On the first line we get the current owner's permission, on the second we take the user who was an editor and is now becoming the owner. Then we set the EDITOR role on the current owner and the OWNER role on the former editor.
Consider how this works in a multithreaded environment. If the first thread has already set the EDITOR role while the second thread is trying to get the current OWNER, it may turn out that there is no OWNER at all, but there are two EDITORs.
The reason is the lack of synchronization. We can solve the problem by adding a synchronized block on the board.
synchronized (board) {
    Permission ownerPermission = service.getOwnerPermission(board);
    Permission permission = service.getPermission(board, user);
    ownerPermission.setRole(EDITOR);
    permission.setRole(OWNER);
}
This solution will not work in a cluster. An SQL database could help us here with transactions, but we have Redis.
Another solution is to add distributed locks to the cluster, so that synchronization spans the whole cluster and not just a single server.
Single point of failure when opening a board
The model of interaction between the client and the server is stateful, so we have to keep the state of a board on the server. Therefore we introduced a separate server role, BoardServer, which handles user requests related to boards.
Imagine we have three BoardServers, one of which is the main one. The user sends it the request “open the board with id = 123” → the server looks up in its database whether the board is already open and on which server. In this example the board is open.

The main server replies that the user needs to connect to server #1 → the user connects. Obviously, if the main server dies, users will no longer be able to open new boards.
Then why do we need a server that knows where the boards are open? To have a single point of decision. If something happens to the servers, we need to know whether a board is actually available in order to remove it from the registry or reopen it somewhere else. This could be organized with a quorum, where several servers solve the problem together, but at that time we did not have the knowledge to implement a quorum ourselves.
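Purely for illustration, the main server's registry can be imagined as a map from board id to the server that holds the board (a hypothetical sketch, not our actual code):
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical registry of open boards kept by the main server.
public class BoardRegistry {
    private final ConcurrentMap<String, String> openBoards = new ConcurrentHashMap<>();
    private final List<String> boardServers; // addresses of the available BoardServers

    public BoardRegistry(List<String> boardServers) {
        this.boardServers = boardServers;
    }

    // Tells the client which server to connect to; if the board is not open yet,
    // a server is picked for it. This registry is exactly the single point of failure.
    public String resolve(String boardId) {
        return openBoards.computeIfAbsent(boardId, id ->
                boardServers.get(ThreadLocalRandom.current().nextInt(boardServers.size())));
    }
}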
Switch to Hazelcast
One way or another, we coped with the problems as they arose, though perhaps not in the most elegant way. Now we needed to understand how to solve them properly, so we formulated a list of requirements for a new cluster solution:
- We need something that monitors the status of all servers and their roles. Let's call it Service Discovery.
- We need cluster-wide locks that guarantee consistency when performing dangerous operations.
- We need a distributed data structure that ensures that boards live on specific servers and tells us if something goes wrong.
It was 2015. We opted for Hazelcast, an In-Memory Data Grid: a cluster system for storing data in RAM. At the time we thought we had found a miracle solution, the holy grail of cluster interaction, a miracle framework that can do everything and combines distributed data structures, locks, RPC messages, and queues.

As with ActiveMQ, we moved almost everything to Hazelcast:
- generation of user resources via ExecutorService;
- distributed locking when permissions are changed (see the sketch after this list);
- server roles and attributes (Service Discovery);
- a unified registry of open boards, and so on.
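For example, the permission transfer from the earlier example can be wrapped in a cluster-wide lock (a sketch against the Hazelcast 3.x API; service, board, and user come from the snippet above, and board.getId() is assumed):
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ILock;

HazelcastInstance hz = Hazelcast.newHazelcastInstance();

// A lock with the same name is the same lock on every cluster member,
// so the critical section is serialized across the whole cluster, not just one JVM.
ILock lock = hz.getLock("permissions-" + board.getId());
lock.lock();
try {
    Permission ownerPermission = service.getOwnerPermission(board);
    Permission permission = service.getPermission(board, user);
    ownerPermission.setRole(EDITOR);
    permission.setRole(OWNER);
} finally {
    lock.unlock();
}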
Hazelcast Topologies
Hazelcast can be configured in two topologies. The first is Client-Server, where the members live separately from the main application and form a cluster of their own, and all applications connect to them as they would to a database.

The second topology is embedded: Hazelcast members are embedded in the application itself. In this case we can use fewer instances, and data access is faster because the data and the business logic live in the same place.

We chose the second option because we considered it more efficient and more economical to implement. Efficient, because access to Hazelcast data is faster: the data may well live on the current server. Economical, because we do not need to spend money on extra instances.
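In code, the difference between the two topologies looks roughly like this (a sketch against the Hazelcast 3.x API; the addresses are illustrative):
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class Topologies {
    public static void main(String[] args) {
        // Embedded: the application process itself becomes a cluster member and stores part of the data.
        HazelcastInstance member = Hazelcast.newHazelcastInstance(new Config());

        // Client-Server: members run separately, and the application only connects to them, like to a database.
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.getNetworkConfig().addAddress("hazelcast-1:5701", "hazelcast-2:5701");
        HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
    }
}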
Cluster hangs when member hangs
A couple of weeks after we turned Hazelcast on, problems appeared in production.
At first, monitoring showed that one of the servers was gradually running out of memory. While we were watching that server, the rest of the servers started loading up too: first CPU, then RAM, and within five minutes all the servers had used up all available memory.
At this point in the consoles, we saw the following messages:
2015-07-15 15:35:51,466 [WARN] (cached18) com.hazelcast.spi.impl.operationservice.impl.Invocation: [my.host.address.com]:5701 [dev] [3.5] Asking if operation execution has been started: com.hazelcast.spi.impl.operationservice.impl.IsStillRunningService$InvokeIsStillRunningOperationRunnable@6d4274d7
2015-07-15 15:35:51,467 [WARN] (hz._hzInstance_1_dev.async.thread-3) com.hazelcast.spi.impl.operationservice.impl.Invocation: [my.host.address.com]:5701 [dev] [3.5] 'is-executing': true -> Invocation{serviceName='hz:impl:executorService', op=com.hazelcast.executor.impl.operations.MemberCallableTaskOperation{serviceName='null', partitionId=-1, callId=18062, invocationTime=1436974430783, waitTimeout=-1, callTimeout=60000}, partitionId=-1, replicaIndex=0, tryCount=250, tryPauseMillis=500, invokeCount=1, callTimeout=60000, target=Address[my.host2.address.com]:5701, backupsExpected=0, backupsCompleted=0}
Here Hazelcast is checking whether an operation that was sent to the first, “dying” server is still running. Hazelcast tried to keep a finger on the pulse and checked the status of the operation several times per second. As a result it spammed all the other servers with this check, within a few minutes they ran out of memory, and we collected several GB of logs from each of them.
The situation repeated itself several times. It turned out to be a bug in Hazelcast 3.5, in the newly introduced heartbeating mechanism that checks the status of requests: it did not handle some of the edge cases we ran into. We had to tweak the application to avoid those cases, and a few weeks later Hazelcast fixed the bug.
Frequent adding and removing of Hazelcast members
The next problem we discovered was adding and removing members from Hazelcast.
First, a brief description of how Hazelcast works with partitions. Suppose there are four servers, each storing part of the data (in the figure they have different colors). 1 marks a primary partition, 2 marks a secondary partition, i.e. a backup of the primary one.

When a server is shut down, its partitions migrate to other servers. If a server dies, its partitions are not migrated from it but restored from the servers that are still alive and hold backups of those partitions.

This is a reliable mechanism. The problem is that we turn servers on and off frequently for load balancing, and partition rebalancing also takes time. The more servers are running and the more data we store in Hazelcast, the longer partition rebalancing takes.
Of course, we could reduce the number of backups, i.e. secondary partitions, but that is not safe, because something will definitely go wrong sooner or later.
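For reference, the number of backups is a single setting in the Hazelcast configuration (a sketch; zero backups is shown only to make the trade-off visible):
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

// backup-count = 1 is the default: one synchronous copy of every partition.
// Lowering it makes rebalancing cheaper but risks data loss when a member dies.
Config config = new Config();
config.getMapConfig("default").setBackupCount(0);
HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);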
Another solution is to switch to the Client-Server topology, so that turning servers on and off does not affect the core Hazelcast cluster. We tried, and it turned out that RPC calls cannot be executed on clients. Let's see why.
Consider an example of sending an RPC request to another server. We take the ExecutorService, which lets us send RPC messages, and submit a new task.
hazelcastInstance
    .getExecutorService(...)
    .submit(new Task(), ...);
By itself, the task looks like a normal Java class that implements Callable.
import java.util.concurrent.Callable;

public class Task implements Callable<Long> {
    @Override
    public Long call() {
        return 42L;
    }
}
The problem is that Hazelcast clients can be not only Java applications but also C++ applications, .NET clients, and others. Naturally, we cannot convert our Java class and ship it to another platform.
One option is to switch to HTTP requests when we want to send something from one server to another and get an answer. But then we would have to partially abandon Hazelcast.
So instead of ExecutorService we chose to use queues. We implemented our own mechanism that waits for an element in the queue to be processed, handles the edge cases, and returns the result to the requesting server.
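Very schematically, the idea is this: the request goes into a Hazelcast queue together with the name of a reply queue, and the sender blocks on that reply queue until the result arrives (a simplified sketch without the timeout and edge-case handling we actually had to implement; the queue names are illustrative):
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;

import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class QueueRpc {
    // The request carries the name of the queue where the reply should be put.
    public static class Request implements Serializable {
        public final String replyQueue = "reply-" + UUID.randomUUID();
        public final String payload;
        public Request(String payload) { this.payload = payload; }
    }

    private final HazelcastInstance hz;

    public QueueRpc(HazelcastInstance hz) { this.hz = hz; }

    // Sender: enqueue the request and block until a worker puts the result
    // into the per-request reply queue (or the timeout expires).
    public String call(String payload) throws InterruptedException {
        Request request = new Request(payload);
        hz.<Request>getQueue("resource.generation").offer(request);
        return hz.<String>getQueue(request.replyQueue).poll(30, TimeUnit.SECONDS);
    }

    // Worker: take a request, do the work, put the result into the reply queue.
    public void serve() throws InterruptedException {
        IQueue<Request> requests = hz.getQueue("resource.generation");
        while (true) {
            Request request = requests.take();
            String result = "generated:" + request.payload; // real resource generation goes here
            hz.<String>getQueue(request.replyQueue).offer(result);
        }
    }
}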
What we have learned
Build flexibility into the system. The future is constantly changing, so there are no perfect solutions. You cannot get everything right at once, but you can try to stay flexible and build that flexibility into the system. This allowed us to postpone important architectural decisions until the moment when they could no longer be postponed.
Robert Martin writes about this principle in Clean Architecture:
“The goal of the architect is to create a shape for the system that recognizes policy as the most essential element, while making the details irrelevant to that policy. This allows decisions about those details to be delayed and deferred.”
Universal tools and solutions do not exist. If it seems to you that some framework solves all your problems, most likely it does not. So when adopting any framework, it is important to understand not only which problems it will solve, but also which ones it will bring with it.
Do not rewrite everything at once. If you run into an architectural problem and it seems that the only right decision is to rewrite everything from scratch, wait. If the problem is really serious, find a quick fix and watch how the system behaves. Most likely it will not be the only problem in the architecture; over time you will find more. Only when you have collected enough problem areas should you start refactoring. Only then will the benefits outweigh the cost.