
How we built query monitoring for MongoDB


Using MongoDB in production is a rather controversial topic.
On the one hand, everything is simple and convenient: you store the data, set up replication, and it is clear how to shard the database as the data grows. On the other hand, there are quite a few horror stories, and Aphyr did not draw very positive conclusions in his latest Jepsen test.


In practice, it turns out that there are quite a few projects where MongoDB is the primary data store, and we were often asked about MongoDB support in our monitoring service. We spent a long time on this task, because building "sensible" monitoring is much harder than just collecting some metrics and setting up some alerts. You first have to understand how the software behaves in order to know exactly which indicators to track.


I want to illustrate this complexity, and the problems that come with it, using the example of how we implemented monitoring of MongoDB queries.


You need to look at any database from three sides:



So far we have limited ourselves to monitoring queries.


Since we are talking about monitoring, we are not interested in each individual query. Instead, we want to group queries that are likely to share an execution plan (for example, pg_stat_statements in PostgreSQL groups queries by their normalized form, with constants stripped out).


For MongoDB, a query is identified by its type (find, insert, update, findAndModify, aggregate, and others), the database, the collection, and the BSON document with the query itself.
For simplicity, we decided to group queries by replacing all field values in the query with "?" and sorting the fields by key.


For example, the query:


{"country": "RU", "city": "Moscow", "$orderby": {"age": -1}} 

turns into


 {country: ?, city: ?, $orderby: {age: ?}} 

and then, with the keys sorted, into


 {$orderby: {age: ?}, city: ?, country: ?} 

Most likely, such queries will use the same indexes regardless of the specific conditions.


The next big question is how to get the whole stream of queries in real time.


The only standard way to do this in MongoDB is the profiler. It writes statistics for each query to a capped (size-limited) collection. The profiler can record either only slow queries (those whose execution time exceeds slowOpThresholdMs) or absolutely all queries. In the second case, the performance of MongoDB itself may degrade.


The advantage of this approach is the very detailed statistics it records for each query.


But for us it is critical not to hurt the performance of our clients' servers, so we cannot run the profiler in the mode that records all queries. And "slow" queries alone are not enough for us, since we would not see the full picture.



In our experience, problems are often caused by high-frequency queries that used to take 1 ms and then, for some reason, started taking 5 ms, for example. Queries slower than 100 ms (the default slowOpThresholdMs) are usually service queries (admin / statistics) and are very rare.


Since the standard profiler did not fit, we started digging in the direction of traffic sniffing. First, we needed to answer a number of questions:



The prototype of our MongoDB plugin was written in a few days using the gopacket library. We captured packets through libpcap, parsed the protocol, and deserialized BSON documents with mgo.


Since we do not have a MongoDB installation under load, we set up a test stand and ran a ready-made benchmark. In our case, MongoDB and the sniffer lived on the same virtual machine with 2 cores and 2 GB of memory. Under load, we saw about 10 thousand packets per second with ~60 Mbit/s of traffic.


Under this load, our prototype used about 70% of a single CPU core. It became clear that we had to profile and optimize the code. Credit is due here to the standard Go profiler: we did not have to invent anything, only optimize the most CPU-hungry sections of the code and allocate as little memory as possible to reduce the load on the GC.


I can't reproduce the optimization process step by step, but here are the most significant changes:


bson.Unmarshal is slow


A BSON query document in MongoDB is, roughly speaking, a dictionary whose values can themselves be dictionaries.
Since we decided from the very beginning to normalize queries, we do not need to read the values of the source dictionary at all unless they are dictionaries themselves.
So we took the specification and wrote our own primitive deserializer. The result was a function of about 100 lines.


For example, here is the fragment that parses a single dictionary element:
    elementValueType, err = reader.ReadByte()
    if err != nil {
        break
    }
    payload, err = reader.ReadBytes(nullByte)
    if err != nil {
        break
    }
    elementName = string(payload)
    switch elementValueType {
    case bsonDouble, bsonDatetime, bsonTimestamp, bsonInt64:
        if _, err = reader.ReadN(8); err != nil {
            break
        }
    case bsonString:
        l, err = reader.ReadInt()
        if err != nil {
            break
        }
        payload, err = reader.ReadN(l)
        if err != nil {
            break
        }
        elementValue = string(payload[:len(payload)-1])
    case bsonJsCode, bsonDeprecated, bsonBinary, bsonJsWithScope, bsonArray:
        l, err = reader.ReadInt()
        if err != nil {
            break
        }
        if _, err = reader.ReadN(l - 4); err != nil {
            break
        }
    case bsonDoc:
        elementValue, _, _, err = readDocument(reader)
        if err != nil {
            break
        }
    case bsonObjId:
        if _, err = reader.ReadN(12); err != nil {
            break
        }
    case bsonBool:
        if _, err = reader.ReadByte(); err != nil {
            break
        }
    case bsonRegexp:
        if _, err = reader.ReadBytes(nullByte); err != nil {
            break
        }
        if _, err = reader.ReadBytes(nullByte); err != nil {
            break
        }
    case bsonDbPointer:
        l, err = reader.ReadInt()
        if err != nil {
            break
        }
        if _, err = reader.ReadN(l - 4 + 12); err != nil {
            break
        }
    case bsonInt32:
        if _, err = reader.ReadN(4); err != nil {
            break
        }
    }

Of all the field types, we read values only for bsonDoc (by calling ourselves recursively) and bsonString (we have additional logic for determining the collection and the query type); all other fields are simply skipped.


How to capture packets


In our tests, using raw sockets directly turned out to be faster than going through pcap.
Perhaps this was due to an old version of libpcap, but we planned to build the sniffer for Linux only, so we decided not to investigate and simply used gopacket's afpacket package (with the added benefit that the agent no longer needs to be linked against libpcap).


Raw sockets are special sockets in Linux through which you can send a packet that was fully formed in userspace (rather than by the kernel) or receive packets from a specific network interface. When sniffing, packets travel from the kernel to userspace through a ring buffer, which avoids a syscall for every captured packet. The kernel documentation covers this topic in hardcore detail.


ZeroCopy


Since we process packets in a single thread, we can use the sniffer's "ZeroCopy" interface. But we must remember that the code cannot keep references to this memory area afterwards, because the buffer is reused for subsequent packets.
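The hazard of keeping such references can be shown without any capture library. In the sketch below, `reusingSource` is a hypothetical stand-in for a zero-copy packet source: every call returns a slice into the same buffer, the way a zero-copy read hands out memory it still owns. Anything that must outlive the current packet has to be copied out first.

```go
package main

import "fmt"

// reusingSource imitates a zero-copy reader: next always returns a
// slice into the same internal buffer, overwriting earlier data.
type reusingSource struct {
	buf     [8]byte
	packets []string
}

func (s *reusingSource) next() ([]byte, bool) {
	if len(s.packets) == 0 {
		return nil, false
	}
	n := copy(s.buf[:], s.packets[0])
	s.packets = s.packets[1:]
	return s.buf[:n], true
}

// collect drains the source twice over: once keeping the returned
// slices as they are, once keeping private copies.
func collect(s *reusingSource) (retained, copied [][]byte) {
	for {
		data, ok := s.next()
		if !ok {
			return retained, copied
		}
		// Unsafe: this slice aliases the shared buffer.
		retained = append(retained, data)
		// Safe: a private copy survives the next read.
		copied = append(copied, append([]byte(nil), data...))
	}
}

func main() {
	src := &reusingSource{packets: []string{"first", "second"}}
	retained, copied := collect(src)
	// The retained slice now shows bytes of the second packet.
	fmt.Printf("retained=%q copied=%q\n", retained[0], copied[0])
}
```

In a single-threaded pipeline that fully processes each packet before reading the next one, no copy is needed at all, which is what makes the zero-copy interface attractive here.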


Parsing packets


The packet parsing interface in gopacket is quite flexible: it supports many protocols out of the box, and the user does not have to think about how upper-layer data is encapsulated. But this interface also forces a lot of data copying and, as a consequence, puts a heavy load on both the CPU and the GC.


Once again, we decided to throw away everything unnecessary :)


Our task is to extract the following from the original Ethernet frame (AF_PACKET always gives us Ethernet frames):



For simplicity, we decided not to support IPv6 for now.


The result was this rather scary function:
    func DecodePacket(data []byte, linkType layers.LinkType, packet *TcpIpPacket) (err error) {
        var l uint16
        switch linkType {
        case layers.LinkTypeEthernet:
            if len(data) < 14 {
                ethernetTooSmall.Inc(1)
                err = errors.New("Ethernet packet too small")
                return
            }
            l = binary.BigEndian.Uint16(data[12:14])
            switch layers.EthernetType(l) {
            case layers.EthernetTypeIPv4:
                data = data[14:]
            case layers.EthernetTypeLLC:
                l = uint16(data[2])
                if l&0x1 == 0 || l&0x3 == 0x1 {
                    data = data[4:]
                } else {
                    data = data[3:]
                }
            default:
                ethernetUnsupportedType.Inc(1)
                err = errors.New("Unsupported ethernet type")
                return
            }
        default:
            unsupportedLinkProto.Inc(1)
            err = errors.New("Unsupported link protocol")
            return
        }

        //IP
        var cmp int
        if len(data) < 20 {
            ipTooSmallLength.Inc(1)
            err = errors.New("Too small IP length")
            return
        }
        version := data[0] >> 4
        switch version {
        case 4:
            if binary.BigEndian.Uint16(data[6:8])&0x1FFF != 0 {
                ipNonFirstFragment.Inc(1)
                err = errors.New("Non first IP fragment")
                return
            }
            if len(data) < 20 {
                ipTooSmall.Inc(1)
                err = errors.New("Too small IP packet")
                return
            }
            hl := uint8(data[0]) & 0x0F
            l = binary.BigEndian.Uint16(data[2:4])
            packet.SrcIp[0] = data[12]
            packet.SrcIp[1] = data[13]
            packet.SrcIp[2] = data[14]
            packet.SrcIp[3] = data[15]
            packet.DstIp[0] = data[16]
            packet.DstIp[1] = data[17]
            packet.DstIp[2] = data[18]
            packet.DstIp[3] = data[19]
            if l < 20 {
                ipTooSmallLength.Inc(1)
                err = errors.New("Too small IP length")
                return
            } else if hl < 5 {
                ipTooSmallHeaderLength.Inc(1)
                err = errors.New("Too small IP header length")
                return
            } else if int(hl*4) > int(l) {
                ipInvalieHeaderLength.Inc(1)
                err = errors.New("Invalid IP header length > IP length")
                return
            }
            if cmp = len(data) - int(l); cmp > 0 {
                data = data[:l]
            } else if cmp < 0 {
                if int(hl)*4 > len(data) {
                    ipTruncatedHeader.Inc(1)
                    err = errors.New("Not all IP header bytes available")
                    return
                }
            }
            data = data[hl*4:]
        case 6:
            ipV6IsNotSupported.Inc(1)
            err = errors.New("IPv6 is not supported")
            return
        default:
            ipInvalidVersion.Inc(1)
            err = errors.New("Invalid IP packet version")
            return
        }

        //TCP
        if len(data) < 13 {
            tcpTooSmall.Inc(1)
            err = errors.New("Too small TCP packet")
            return
        }
        packet.SrcPort = binary.BigEndian.Uint16(data[0:2])
        packet.DstPort = binary.BigEndian.Uint16(data[2:4])
        packet.Seq = binary.BigEndian.Uint32(data[4:8])
        dataOffset := data[12] >> 4
        if dataOffset < 5 {
            tcpInvalidDataOffset.Inc(1)
            err = errors.New("Invalid TCP data offset")
            return
        }
        dataStart := int(dataOffset) * 4
        if dataStart > len(data) {
            tcpOffsetGreaterThanPacket.Inc(1)
            err = errors.New("TCP data offset greater than packet length")
            return
        }
        packet.Payload = data[dataStart:]
        return
    }

For functions like this, it is always worth writing benchmarks. This time the picture turned out rather nice:


    Benchmark_DecodePacket-4    50000000    27.9 ns/op
    Benchmark_Gopacket-4         1000000    3351 ns/op

That is, we got a speedup of more than 100x.


A significant part of this function is error handling. You can see it incrementing various counters, which we later turn into the agent's service metrics, so we can easily understand why the sniffer is misbehaving. For example, this is exactly the metric that will tell us when we need to add IPv6 support.


We also do not try to reassemble the TCP payload from multiple packets when the data does not fit into a single Ethernet frame.
If such a packet is a MongoDB response, we are only interested in the header, and for large insert queries, for example, we simply take the part of the query that is in the first packet.


Duplicate packets


It turned out that if the client and the server are on the same host, we capture each packet twice.
We had to write a simple packet deduplicator based on source IP + port, destination IP + port, and TCP seq.
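A deduplicator of this kind can be sketched as a set of recently seen keys with bounded memory. The types and names below (`flowKey`, `deduper`) are hypothetical, and the fixed-size ring used for eviction is an assumption of this sketch rather than something the article describes.

```go
package main

import "fmt"

// flowKey identifies a TCP segment by its endpoints and sequence number.
type flowKey struct {
	SrcIP, DstIP     [4]byte
	SrcPort, DstPort uint16
	Seq              uint32
}

// deduper remembers the most recent keys; the ring bounds memory by
// evicting the oldest key once it is full.
type deduper struct {
	seen map[flowKey]struct{}
	ring []flowKey
	next int
	full bool
}

func newDeduper(size int) *deduper {
	return &deduper{
		seen: make(map[flowKey]struct{}, size),
		ring: make([]flowKey, size),
	}
}

// isDuplicate reports whether k was seen recently and records it if not.
func (d *deduper) isDuplicate(k flowKey) bool {
	if _, ok := d.seen[k]; ok {
		return true
	}
	if d.full {
		delete(d.seen, d.ring[d.next]) // evict the oldest key
	}
	d.ring[d.next] = k
	d.seen[k] = struct{}{}
	d.next++
	if d.next == len(d.ring) {
		d.next = 0
		d.full = true
	}
	return false
}

func main() {
	d := newDeduper(1024)
	k := flowKey{
		SrcIP:   [4]byte{127, 0, 0, 1},
		DstIP:   [4]byte{127, 0, 0, 1},
		SrcPort: 50000,
		DstPort: 27017,
		Seq:     1000,
	}
	fmt.Println(d.isDuplicate(k), d.isDuplicate(k))
}
```

Note that a key like this cannot distinguish a genuine TCP retransmission from a doubly captured packet, and an empty ACK carries the same seq as the data segment that follows it, so this sketch is meant to be applied only to segments that have a payload.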


Summary




Source: https://habr.com/ru/post/308328/

