The main project at the company where I work is optimizing ad impressions in Facebook applications and on mobile devices. The project currently serves up to 400 million unique visitors per month and runs on more than a thousand virtual servers. The number of servers and the volume of data that must be processed around the clock pose a number of interesting scalability and stability problems for the developers.
Impression optimization is a large process, and one part of it is storing and analyzing the chain of events in a banner's life cycle: impression, click, conversion, and so on. It all starts with recording these events. Each event occurs on one of many servers, and for obvious reasons we try to handle the entire chain in one place; that way we don't have to worry about reassembling scattered pieces into a whole. But in real life anything can happen: a server crashes, the network fails, software is upgraded or overloaded. For many reasons, consecutive events in a chain are sometimes handled on different servers and even in different data centers, and you have to be prepared for that.
The task, then, is to store, look up, and modify information about a sequence of events under the following conditions:
- events can occur on different servers and in different data centers (US East Coast, US West Coast, Europe)
- the interval between events ranges from fractions of a second to several days
- by the time the final event (for example, a conversion) arrives, information about the entire chain must be on hand
- the information lives for about ten days, after which it should be deleted, preferably automatically via TTL
- event read/write rates are in the hundreds to thousands per second
- response time: ideally up to 10 ms, acceptably within 50 ms, at most 100 ms
- the information should be available "always," regardless of hardware failures, network problems, or upgrades
- the system should be easy to scale: adding new servers and data centers should be transparent to other services (response-time degradation within the limits above is acceptable)
The last two points are very important for the business, and simply vital for the ops engineers if they want to go about their duties calmly during the day and sleep soundly at night.
By the time the task had taken definite shape, we already had some hands-on, purely research experience with Cassandra. That experience was quite positive; moreover, it was clear that Cassandra is well designed and has an active community and serious companies and developers behind it. All of that, plus the fact that it fit the task perfectly, led to the decision to try it out in real production.
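To make the data model concrete, here is a minimal sketch of one way such an event chain could be stored, using the Hector client: all events of a chain land in a single row keyed by a chain identifier, and every column carries a roughly ten-day TTL so the data expires on its own. The keyspace, column family, key, and column names are illustrative assumptions, not our production schema.

```java
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

public class EventWriter {
    private static final int TTL_SECONDS = 10 * 24 * 3600;   // ~ten days, then the data expires on its own
    private static final StringSerializer SS = StringSerializer.get();

    public static void main(String[] args) {
        // Cluster, keyspace, and column family names are illustrative.
        Cluster cluster = HFactory.getOrCreateCluster("events-cluster", "localhost:9160");
        Keyspace keyspace = HFactory.createKeyspace("events", cluster);

        // All events of one banner chain share a row key, so the whole chain
        // can later be read back with a single row lookup.
        String chainId = "banner42:user1234";
        Mutator<String> mutator = HFactory.createMutator(keyspace, SS);
        mutator.addInsertion(chainId, "EventChain",
                HFactory.createColumn("2012-03-01T12:00:00.123Z|impression", "{...}", TTL_SECONDS, SS, SS));
        mutator.addInsertion(chainId, "EventChain",
                HFactory.createColumn("2012-03-01T12:00:05.456Z|click", "{...}", TTL_SECONDS, SS, SS));
        mutator.execute();
    }
}
```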
Choosing the hardware
The biggest concern was the response-time requirement, at the level of 10 ms. The second parameter to watch was free disk space for all of Cassandra's internal tasks (compaction, repair, and so on). Third, the hardware had to be "simple," so that it could easily be replaced after a failure and added during expansion.
Response times
Cassandra is very fast at writing. This is because incoming data goes into an in-memory table (memtable) and into the commit log, so writes to disk are always sequential and there are no significant head movements (which is why it is recommended to keep the commit log and the randomly accessed data tables on different disks).
Read speed depends on many parameters and ultimately comes down to random disk access speed, if the data does not fit entirely in memory, and to network speed, if the data needed to answer a request lives on different servers. Ways of fighting read latency include adding RAM, using compressed tables (which appeared in Cassandra 1.0.x), and careful planning of data placement, the data schema, and the queries against the database. As you can see, not everything is in the hands of operations, so you have to plan with some margin.
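For example, with the chain-per-row layout sketched earlier, the whole history of a banner can be read with a single slice query against one row, which normally touches a single node. A hedged Hector illustration (column family name and limits are placeholders):

```java
import java.util.List;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceQuery;

public class ChainReader {
    // Reads the whole event chain (a single row) in one request.
    public static List<HColumn<String, String>> readChain(Keyspace keyspace, String chainId) {
        StringSerializer ss = StringSerializer.get();
        SliceQuery<String, String, String> query = HFactory.createSliceQuery(keyspace, ss, ss, ss);
        query.setColumnFamily("EventChain");       // column family name is illustrative
        query.setKey(chainId);
        query.setRange("", "", false, 1000);       // up to 1000 events, oldest first
        QueryResult<ColumnSlice<String, String>> result = query.execute();
        return result.get().getColumns();
    }
}
```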
After a series of tests we settled on 8 GB of RAM, a 15,000 rpm SAS disk for the data tables, and an ordinary 7,200 rpm SATA disk for the commit log (Cassandra itself is installed on the same disk).
The choice of processor model is less important; we use Intel Q9400 @ 2.66 GHz and X3470 @ 2.93 GHz processors.
The choice of 8 GB of memory and Cassandra's default JVM memory settings later required tuning, since in some situations we ran into high JVM garbage-collection costs.
Today Cassandra's response times are almost always within 5 ms, rising to 30 ms during compaction and briefly spiking to around 100 ms when there are network problems.
Free disk space
Having a reserve of free space on the data disk is very important. No, it is simply critical: if you watch it carelessly, you may find that one or several nodes have suddenly become inoperable. We estimated the amount of data per disk by multiplying the size of an event record by the expected number of records stored at any one time and by the replication factor, and then dividing the result by the number of nodes in the ring.
In normal operation (no compaction in progress, no new node being added) Cassandra needs at least a 50% disk reserve, because the next compaction can temporarily double the space used, and in unlucky cases the repair process can even triple it. We therefore took the disk size with a threefold reserve (300 GB) and set disk-usage thresholds in Nagios: warning at 30%, critical at 50%. So far practice has confirmed the calculation: in normal conditions we occasionally hit the warning level and never the critical one. During repair the used space grows to 50-60%, occasionally to 70%.
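As a worked illustration of that estimate (the record size, record count, and replica count below are assumed round numbers, not our real figures, but they land near the 300 GB we provisioned):

```java
public class DiskEstimate {
    public static void main(String[] args) {
        // All figures below are illustrative assumptions, not our production numbers.
        double recordKb = 0.5;                  // average on-disk size of one event record, KB
        long liveRecords = 400L * 1000 * 1000;  // records stored at any one time (~10 days of events)
        int replicationFactor = 6;              // one replica per data center
        int nodes = 12;                         // nodes in the ring
        double headroom = 3.0;                  // threefold reserve for compaction and repair

        double dataPerNodeGb = recordKb * liveRecords * replicationFactor / nodes / 1024 / 1024;
        System.out.printf("expected data per node: %.0f GB, disk to provision: %.0f GB%n",
                dataPerNodeGb, dataPerNodeGb * headroom);
        // prints roughly: expected data per node: 95 GB, disk to provision: 286 GB
    }
}
```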
Commissioning
Packages
Before going to production we prepared Debian packages containing Cassandra itself along with all the necessary local settings, Munin plugins, and Nagios checks. Manual work when bringing up a new node is minimized: we partition the disk and assign the initial token by hand.
The cluster started out as two nodes in one data center on the US East Coast, to which pairs of nodes in other data centers were gradually added into a single ring. Adding nodes to the ring without regard to geography turned out to be a mistake: as soon as we had to connect a data center in Europe, it became clear that cross-data-center latencies of about 100 ms between the US and Europe would not let us stay within the required response times. Fortunately, Cassandra has a mechanism for controlling how replicas are placed across data centers, NetworkTopologyStrategy, and we had to switch to it right under load. After an experiment on a test ring it became clear how to do this painlessly and with minimal impact on the system. For a few minutes right after the switch some of the data was unavailable for reads, which nobody really noticed.
Data centers
Today our cluster consists of twelve nodes in six data centers. Bringing up a new data center comes down to ordering servers of the right configuration, partitioning the disks, choosing initial tokens for the new nodes, and installing the packages. The whole procedure on a new server takes up to an hour, including the initial streaming of data replicas (up to 60 GB).
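Choosing the initial tokens is simple arithmetic. The sketch below assumes RandomPartitioner, whose token space is [0, 2^127): each data center's nodes are spread evenly around the ring, and each data center is offset by a small constant so that no two nodes share a token. The node and data center counts are illustrative.

```java
import java.math.BigInteger;

public class InitialTokens {
    public static void main(String[] args) {
        // Assumes RandomPartitioner, whose token range is [0, 2^127).
        BigInteger range = BigInteger.valueOf(2).pow(127);
        int nodesPerDataCenter = 2;
        int dataCenters = 6;

        for (int dc = 0; dc < dataCenters; dc++) {
            for (int i = 0; i < nodesPerDataCenter; i++) {
                // Spread each data center's nodes evenly around the ring,
                // then offset each data center by a small amount so tokens never collide.
                BigInteger token = range.multiply(BigInteger.valueOf(i))
                        .divide(BigInteger.valueOf(nodesPerDataCenter))
                        .add(BigInteger.valueOf(dc));
                System.out.printf("dc%d node%d initial_token: %s%n", dc + 1, i + 1, token);
            }
        }
    }
}
```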
Monitoring and Documentation
Back at the stage of experimenting with Cassandra we wrote Munin plugins that gave us all the necessary information about the nodes: response times for each keyspace/column family, the number of reads, writes, and compactions, queue lengths, the amount of data on each node, the number of SSTables on disk, and so on. Having these graphs is very important for understanding what is happening over the long term, and what goes on during internal procedures such as repair and compaction.
Finding correlations between the different parameters helps us understand how to avoid an increase in the baseline response time during difficult moments in the cluster's life.
We tried DataStax OpsCenter (it lets you see the cluster state not only historically but also in real time, and manage the nodes from a single console), but because of certain problems with our software configuration we could not use it, so we wrote our own monitor that gathers the current values of the most important metrics into a single browser window. Example output for two servers during normal night-time operation:
| Node  | Read lat (ms) | Write lat (ms) | Reads/s | Writes/s | Disk (%) | Java (%) | Data (GB) | Stream out (GB) | Stream out (%) | Stream in (GB) | Stream in (%) | Compaction (GB) | Compaction (%) |
|-------|---------------|----------------|---------|----------|----------|----------|-----------|-----------------|----------------|----------------|----------------|-----------------|----------------|
| node1 | 0.7 | 0.6 | 66.7 | 10.8 | 15.0 | 60.1 | 39.4 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| node2 | 0.7 | 0.6 | 66.5 | 10.3 | 15.0 | 20.7 | 39.7 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
The same two servers, but with repair running on the second one:
| Node  | Read lat (ms) | Write lat (ms) | Reads/s | Writes/s | Disk (%) | Java (%) | Data (GB) | Stream out (GB) | Stream out (%) | Stream in (GB) | Stream in (%) | Compaction (GB) | Compaction (%) |
|-------|---------------|----------------|---------|----------|----------|----------|-----------|-----------------|----------------|----------------|----------------|-----------------|----------------|
| node1 | 4.1 | 0.6 | 54.3 | 9.5 | 17.1 | 60.1 | 44.9 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| node2 | 2.6 | 0.5 | 54.5 | 9.5 | 18.5 | 55.2 | 48.9 | 1.9 | 22.3 | 0.0 | 0.0 | 2.5 | 1.2 |
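For the curious, metrics like the latencies in the tables above can be pulled from Cassandra over JMX roughly as in the sketch below. The MBean and attribute names correspond to the 1.0.x era and may differ in other versions, and the host, keyspace, and column family names are placeholders; this is not our actual Munin plugin code.

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class CfLatencyProbe {
    public static void main(String[] args) throws Exception {
        // Cassandra exposes its metrics over JMX, port 7199 by default.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://node1.example.com:7199/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // MBean name format of the 1.0.x era; keyspace and column family are placeholders.
            ObjectName cf = new ObjectName(
                    "org.apache.cassandra.db:type=ColumnFamilies,keyspace=events,columnfamily=EventChain");
            Object readLatency = mbs.getAttribute(cf, "RecentReadLatencyMicros");
            Object writeLatency = mbs.getAttribute(cf, "RecentWriteLatencyMicros");
            System.out.println("read lat, us: " + readLatency + ", write lat, us: " + writeLatency);
        } finally {
            connector.close();
        }
    }
}
```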
In addition, we have documented the main procedures for ourselves, either in the corporate wiki or as detailed comments on all Cassandra-related tickets.
Lessons learned the hard way
NTS
The mistake made from the very beginning was using SimpleStrategy instead of NetworkTopologyStrategy, which caused problems as soon as nodes in a remote data center were brought up. We had to fix the mistake on the fly, although nothing had prevented us from using NTS from the start. Our replication is now arranged so that we keep one replica of the data in each data center. NetworkTopologyStrategy lets you describe the cluster topology, that is, which data center and which rack each node belongs to, after which Cassandra optimizes data access accordingly. Any write or read is served locally, and only if the required node in the local data center is unavailable does the client wait for data from another one.
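For illustration, a keyspace that keeps one replica in each data center can be defined through Hector roughly as follows. This is a sketch, assuming that Hector's ThriftKsDef exposes setStrategyOptions for the per-data-center replica counts (check your client version); the keyspace, column family, and data center names are placeholders, and the data center names must match what your snitch reports.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import me.prettyprint.cassandra.service.ThriftKsDef;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;

public class CreateKeyspace {
    public static void main(String[] args) {
        Cluster cluster = HFactory.getOrCreateCluster("events-cluster", "localhost:9160");

        // Column family definition; names are illustrative.
        ColumnFamilyDefinition cf = HFactory.createColumnFamilyDefinition("events", "EventChain");

        // One replica of every row in each data center.
        Map<String, String> dcReplicas = new HashMap<String, String>();
        dcReplicas.put("us-east", "1");   // names must match what the snitch reports
        dcReplicas.put("us-west", "1");
        dcReplicas.put("eu-west", "1");

        KeyspaceDefinition ksDef = HFactory.createKeyspaceDefinition(
                "events",
                "org.apache.cassandra.locator.NetworkTopologyStrategy",
                1,                        // the effective replica counts come from the options below
                Arrays.asList(cf));
        // Assumption: ThriftKsDef carries the per-DC options for NetworkTopologyStrategy.
        ((ThriftKsDef) ksDef).setStrategyOptions(dcReplicas);

        cluster.addKeyspace(ksDef, true); // true = block until the cluster agrees on the schema
    }
}
```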
Client failover
Our application is written in Java and uses the Hector library to talk to the database. Hector is fairly smart and offers several policies for choosing which cluster node a client connects to. The first option we settled on, where each client worked with the node that answered it fastest, seemed intuitively right, but reality turned out to be harsh: if one of the nodes responded noticeably faster than the others, all the clients rushed to it and effectively DDoSed it. As soon as that node started responding slowly under the increased load, everyone rushed to the next one. We had to rewrite the logic urgently: now the clients cycle round-robin through all the "live" nodes in their own data center, so the load within a data center is spread evenly. If a node does not respond within 100 ms, the client marks it as "dead" and only pings it occasionally to check its status. This solved our problem completely.
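Hector's stock connection-pool settings can get close to the same behavior out of the box. The sketch below is not our actual client code (host names, timeouts, and cluster/keyspace names are illustrative): it round-robins over the listed hosts, fails over from a host that does not answer within 100 ms, and re-probes downed hosts periodically.

```java
import me.prettyprint.cassandra.connection.RoundRobinBalancingPolicy;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;

public class ClientSetup {
    public static Keyspace connect() {
        // Only the nodes of the local data center are listed; host names are illustrative.
        CassandraHostConfigurator conf =
                new CassandraHostConfigurator("cass1.dc1.example.com:9160,cass2.dc1.example.com:9160");

        // Cycle through live hosts in a circle instead of chasing the fastest one.
        conf.setLoadBalancingPolicy(new RoundRobinBalancingPolicy());

        // A host that does not answer within 100 ms times out and the request fails over...
        conf.setCassandraThriftSocketTimeout(100);
        // ...and downed hosts are re-checked from time to time so they can come back.
        conf.setRetryDownedHosts(true);
        conf.setRetryDownedHostsDelayInSeconds(30);

        Cluster cluster = HFactory.getOrCreateCluster("events-cluster", conf);
        return HFactory.createKeyspace("events", cluster);
    }
}
```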
Compactions, repair
Cassandra stores data on disk as SSTables sorted by key (plus several auxiliary files per table). A new SSTable is created every time a memtable is flushed from memory to disk. Once written, an SSTable never changes. The only way to get rid of old SSTables and old data is the compaction procedure, which merges several (or all) SSTables into one. A lot of important work happens during this merge: expired tombstones (markers of deleted data) are dropped, and data whose TTL has run out is marked as deleted, with a tombstone for it placed in the new SSTable. Some kinds of compaction run automatically in the background; some can be started manually. The result of the merge is a new set of tables (usually fewer and smaller) consisting of a mix of data and of tombstones that have not yet expired.
Reducing the impact of this heavy merge operation comes down to sensibly choosing the merge frequency (based on the sizes of the tables involved), the degree of parallelism (fewer parallel merges means a longer operation but less disk load), and the disk throughput allowed for compaction (in Cassandra 1.0.x these knobs correspond to the concurrent_compactors and compaction_throughput_mb_per_sec settings in cassandra.yaml).
The repair operation synchronizes data and tombstones between replicas. Although Cassandra has on-the-fly mechanisms for the case where a write did not reach one of the replicas the first time, none of them gives a one-hundred-percent guarantee. So for some data-usage scenarios running repair periodically is recommended, and for some it is required. Since this process can move fairly large amounts of data between nodes (and between data centers), it is important to minimize the discrepancies between replicas right before a repair. Choosing the optimal repair procedure and the preparation for it took quite a while, since the operation is heavy and you don't get to experiment with it very often.
As a result, we arrived at the following repair strategy: the procedure runs at night once a week; beforehand, the small column families get a full merge of their on-disk tables (this takes few resources), and the large column families get a full merge of their "old" tables (older than four days). The latter operation clears out a lot of tombstones on the one hand and consumes relatively few resources on the other. By the time repair starts, the amount of data on disk is minimal, so computing the differences and synchronizing the nodes takes the minimum time, about an hour.
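Schematically, the weekly night job looks like the sketch below. It simply shells out to nodetool, which is a simplification: merging only the "old" SSTables of the large column families requires a user-defined compaction over JMX rather than a plain nodetool compact, and the keyspace and column family names are placeholders.

```java
import java.io.IOException;

public class WeeklyMaintenance {
    // Runs a command and waits for it; in real life you would also check the exit code.
    private static void run(String... cmd) throws IOException, InterruptedException {
        new ProcessBuilder(cmd).inheritIO().start().waitFor();
    }

    public static void main(String[] args) throws Exception {
        String keyspace = "events";   // placeholder names throughout

        // 1. Full merge of the small column families (cheap, clears tombstones).
        run("nodetool", "-h", "localhost", "compact", keyspace, "SmallCf1");
        run("nodetool", "-h", "localhost", "compact", keyspace, "SmallCf2");

        // 2. For the large column families, only SSTables older than ~4 days are merged;
        //    that needs a user-defined compaction over JMX and is omitted here.

        // 3. With the data trimmed down, run the actual repair.
        run("nodetool", "-h", "localhost", "repair", keyspace);
    }
}
```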
Repair suffers especially badly from network problems: if the connection is interrupted for a long time (minutes) during synchronization, or there is heavy packet loss, the TCP exchange between the nodes gets stuck. In newer versions of Cassandra you can set a timeout for such hangs.
Memory and garbage collection
Another subtle point is JVM memory usage: heap pressure grows especially during parallel compactions and repair. The defaults (MAX_HEAP_SIZE and HEAP_NEWSIZE in cassandra-env.sh, plus the GC options set there) may not suit your workload, so be prepared to experiment.
Recommendations
- Determine as early as possible the nature of the load and the parameters the application has to guarantee. Request detailed information from whoever is specifying the task. Choose your hardware based on that analysis. Ask the developers to design the data layout in the database carefully.
- Give yourself a reserve in disk space, disk speed, and RAM if response times matter to you.
- Use the same hardware and software on all nodes; don't breed a zoo.
- Set up detailed monitoring of everything; you never know in advance which data you will need to analyze a problem.
- Automate the installation of new nodes.
- Understand in detail the life cycle of your data in Cassandra: how and where it is stored, how it is read, and how it leaves the database.
Conclusions
Over the past year we have launched three clusters, two of which serve production projects while one is used for development. Overall, Cassandra can be called an ops engineer's paradise, provided you give it careful attention. In a year of operation we have not had a single Cassandra problem, either on an individual node or with a cluster as a whole, despite frequent hardware failures, replacement and addition of individual servers, regular upgrades, and constant experiments with different settings and processes on live nodes. In the end, both the stability and the scalability come from one basic principle: all nodes play the same role and are only loosely coupled to each other.