
A Million WebSockets and Go



Hi everyone! My name is Sergey Kamardin, and I am a developer on the Mail.Ru Mail team.


This article is about how we developed a high-load WebSocket server in Go.


If you are familiar with WebSocket but know little about Go, I hope you will still find the article interesting for its ideas and optimization techniques.


1. Preface


To set the context for the story, a few words about why we needed such a server.


Mail.Ru Mail has many stateful systems. The storage of user emails is an obvious example. There are several ways to learn about state changes (events) in such a system. Essentially, it is either periodic polling of the system or, in the opposite direction, notifications from the system about changes in its state.


Both approaches have pros and cons, but when it comes to mail, the sooner the user receives a new email, the better. Polling in Mail amounts to about 50 thousand HTTP requests per second, 60% of which return status 304, meaning that nothing has changed in the mailbox.


Therefore, to reduce the load on the servers and speed up email delivery to users, we decided to write a publisher-subscriber server (also known as a bus, message broker, or event channel), which on one side receives notifications about state changes and on the other side accepts subscriptions to such notifications.


Before:


    +-----------+           +-----------+           +-----------+
    |           | ◄-------+ |           | ◄-------+ |           |
    |  Storage  |           |    API    |   HTTP    |  Browser  |
    |           | +-------► |           | +-------► |           |
    +-----------+           +-----------+           +-----------+

After:


     +-------------+     +---------+   WebSocket   +-----------+
     |   Storage   |     |   API * | +-----------► |  Browser  |
     +-------------+     +---------+      (3)      +-----------+
            +              (2) ▲
            |                  |
        (1) ▼                  +
    +---------------------------------+
    |               Bus               |
    +---------------------------------+

The first diagram shows how things were before. The browser periodically polled the API and asked about changes in Storage (the email storage).


The second diagram shows the new architecture. The browser establishes a WebSocket connection to the API, over which it is notified about Storage events. The API is a client of the Bus server and sends it the data of its subscribers (this server will not be discussed today; perhaps I will cover it in future publications). When a new email arrives, Storage sends a notification about it to the Bus (1), and the Bus forwards it to its subscribers (2). The API determines which connection should receive the notification and sends it to the user's browser (3).


As you might have guessed, today we will talk about the API, that is, the WebSocket server. Looking ahead, I will say that the server will hold about 3 million live connections. This figure will come up more than once in the discussion of optimizations below.


2. The idiomatic way


Let's look at how we would implement parts of our server using plain Go features, without making system calls in our own code.


Before we look at working with net/http, let's talk about sending and receiving data. From here on, I will call the data that travels on top of the WebSocket protocol (for example, JSON envelopes) packets. Let's begin with the Channel structure, which will contain the logic for receiving and sending packets over a WebSocket connection.


2.1. Channel struct


    // Packet represents application level data.
    type Packet struct {
        ...
    }

    // Channel wraps user connection.
    type Channel struct {
        conn net.Conn    // WebSocket connection.
        send chan Packet // Outgoing packets queue.
    }

    func NewChannel(conn net.Conn) *Channel {
        c := &Channel{
            conn: conn,
            send: make(chan Packet, N),
        }

        go c.reader()
        go c.writer()

        return c
    }

I want to draw your attention to the launch of two goroutines, one for reading and one for writing. Each goroutine needs its own stack, which, depending on the operating system and Go version, has an initial size of 2 to 8 KB. Given the figure mentioned earlier (3 million live connections), we will need 24 GB of memory for all connections (with a 4 KB stack). And that is without counting the memory allocated for the Channel structure, the ch.send queue of outgoing packets, and other internal fields.


2.2. I/O goroutines


Let's look at the implementation of the "reader" from the connection:


    func (c *Channel) reader() {
        // We make buffered read to reduce read syscalls.
        buf := bufio.NewReader(c.conn)

        for {
            pkt, _ := readPacket(buf)
            c.handle(pkt)
        }
    }

Simple enough, right? We use a buffer to reduce the number of read syscalls and read as much as the size of buf allows. In an infinite loop, we wait for new data to arrive on the connection and read the next packet. Please remember the words "wait for new data to arrive": we will return to them later.
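To make the effect of buffering more tangible, here is a minimal sketch that counts calls to the underlying Read, each of which stands in for one read syscall on a real connection. The countingReader type and both helper functions are hypothetical names invented for this illustration; they are not part of the server.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// countingReader counts how many times Read is called on the wrapped reader.
// On a real connection, each call would correspond to one read syscall.
type countingReader struct {
	r     io.Reader
	calls int
}

func (c *countingReader) Read(p []byte) (int, error) {
	c.calls++
	return c.r.Read(p)
}

// readUnbuffered reads n single bytes directly from the source
// and returns the number of Read calls it took.
func readUnbuffered(data []byte, n int) int {
	src := &countingReader{r: bytes.NewReader(data)}
	one := make([]byte, 1)
	for i := 0; i < n; i++ {
		if _, err := io.ReadFull(src, one); err != nil {
			break
		}
	}
	return src.calls
}

// readBuffered reads n single bytes through a bufio.Reader with the
// default buffer size and returns the number of underlying Read calls.
func readBuffered(data []byte, n int) int {
	src := &countingReader{r: bytes.NewReader(data)}
	buf := bufio.NewReader(src)
	for i := 0; i < n; i++ {
		if _, err := buf.ReadByte(); err != nil {
			break
		}
	}
	return src.calls
}

func main() {
	data := make([]byte, 1000)
	fmt.Println("unbuffered reads:", readUnbuffered(data, len(data)))
	fmt.Println("buffered reads:  ", readBuffered(data, len(data)))
}
```

The price of the saved calls is the buffer itself, which stays allocated for as long as the reader lives.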


We will leave the parsing and processing of incoming packets aside, since they do not matter for the optimizations we are going to discuss. But buf deserves attention right now: by default it is 4 KB, which means another 12 GB of memory. The situation with the "writer" is similar:


    func (c *Channel) writer() {
        // We make buffered write to reduce write syscalls.
        buf := bufio.NewWriter(c.conn)

        for pkt := range c.send {
            _ = writePacket(buf, pkt)
            buf.Flush()
        }
    }

We iterate over the c.send channel of outgoing packets and write them to the buffer. As the attentive reader has already guessed, this is another 4 KB per connection and 12 GB of memory for our 3 million connections.


2.3. HTTP


We have a simple implementation of Channel; now we need to get a WebSocket connection to work with. Since we are still under the "Idiomatic way" heading, we will do it in the same spirit.


If you are not familiar with how WebSocket works, it is worth mentioning that the client switches to the WebSocket protocol using a special HTTP mechanism called Upgrade. After the Upgrade request is processed successfully, the server and the client use the TCP connection to exchange binary WebSocket frames.

The frame structure inside the connection is described in the WebSocket specification (RFC 6455).

    import (
        "net/http"
        "some/websocket"
    )

    http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
        conn, _ := websocket.Upgrade(r, w)
        ch := NewChannel(conn)
        //...
    })

Note that http.ResponseWriter holds a bufio.Writer write buffer internally, and a bufio.Reader read buffer is also allocated to initialize *http.Request (4 KB each).


Regardless of the WebSocket library used, after a successful response to the Upgrade request the server receives the I/O buffers together with the TCP connection when it calls responseWriter.Hijack().


Hint: in some cases, go:linkname can be used to return the buffers to the pool inside net/http by calling net/http.putBufio{Reader,Writer}.

Thus, we need another 24 GB of memory for 3 million connections.


In total, that is already 72 GB of memory for an application that does not do anything yet!
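The arithmetic behind these figures can be written down as a short sketch. The memoryGB helper is a hypothetical name, and it deliberately uses the same rounding as the text, where one million connections at 1 KB each are counted as 1 GB.

```go
package main

import "fmt"

// memoryGB estimates memory in GB for the given number of connections when
// each connection holds perConnKB kilobytes. It uses the rounding from the
// text: 1 million connections x 1 KB is counted as 1 GB.
func memoryGB(connections, perConnKB int) int {
	return connections * perConnKB / 1000000
}

func main() {
	const conns = 3000000

	stacks := memoryGB(conns, 2*4)   // reader and writer goroutines, 4 KB stack each
	readBuf := memoryGB(conns, 4)    // bufio.Reader inside Channel.reader()
	writeBuf := memoryGB(conns, 4)   // bufio.Writer inside Channel.writer()
	httpBufs := memoryGB(conns, 2*4) // read and write buffers allocated by net/http

	fmt.Println("goroutine stacks:", stacks, "GB")
	fmt.Println("reader buffers:  ", readBuf, "GB")
	fmt.Println("writer buffers:  ", writeBuf, "GB")
	fmt.Println("net/http buffers:", httpBufs, "GB")
	fmt.Println("total:           ", stacks+readBuf+writeBuf+httpBufs, "GB")
}
```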


3. Optimization


It is worth recalling what we said in the preface and remembering how a user connection behaves. After switching to WebSocket, the client sends a packet with the events it is interested in, that is, it subscribes to events. After that (apart from technical messages such as ping/pong), the client may send nothing else for the entire lifetime of the connection.


The lifetime of a connection can range from several seconds to several days.

So most of the time our Channel.reader() and Channel.writer() are waiting for data to receive or send. And the I/O buffers, 4 KB each, are waiting along with them.


Now it is obvious that some things could be done better, right?


3.1. netpoll


Remember the implementation of Channel.reader(), which waited for new data to arrive by blocking on the conn.Read() call inside bufio.Reader? When data appeared in the connection, the Go runtime "woke up" our goroutine and allowed it to read the next packet. After that, the goroutine blocked again, waiting for new data. Let's see how the Go runtime knows that the goroutine must be "woken up".


If we look at the implementation of conn.Read(), we will see that it calls net.netFD.Read():


    // net/fd_unix.go

    func (fd *netFD) Read(p []byte) (n int, err error) {
        //...
        for {
            n, err = syscall.Read(fd.sysfd, p)
            if err != nil {
                n = 0
                if err == syscall.EAGAIN {
                    if err = fd.pd.waitRead(); err == nil {
                        continue
                    }
                }
            }
            //...
            break
        }
        //...
    }

Go uses sockets in non-blocking mode. EAGAIN means that there is no data in the socket; instead of blocking on a read from an empty socket, the OS returns control to us.

We see a read() system call on the connection's file descriptor. If the read returns EAGAIN, the runtime calls pollDesc.waitRead():


    // net/fd_poll_runtime.go

    func (pd *pollDesc) waitRead() error {
        return pd.wait('r')
    }

    func (pd *pollDesc) wait(mode int) error {
        res := runtime_pollWait(pd.runtimeCtx, mode)
        //...
    }

If we dig deeper, we will see that on Linux netpoll is implemented using epoll. Why not use the same approach for our connections? We could allocate a read buffer and start a goroutine only when it is really necessary: when there is definitely data in the socket.


There is an issue at github.com/golang/go about exporting the netpoll functions.

3.2. Getting rid of goroutines


Suppose we have a netpoll implementation for Go. Now we can avoid starting the Channel.reader() goroutine with its internal buffer and instead "subscribe" to the event of data becoming available in the connection:


    ch := NewChannel(conn)

    // Make conn to be observed by netpoll instance.
    // Note that EventRead is identical to EPOLLIN on Linux.
    poller.Start(conn, netpoll.EventRead, func() {
        // We spawn goroutine here to prevent poller wait loop
        // to become locked during receiving packet from ch.
        go ch.Receive()
    })

    // Receive reads a packet from conn and handles it somehow.
    func (ch *Channel) Receive() {
        buf := bufio.NewReader(ch.conn)
        pkt, _ := readPacket(buf)
        ch.handle(pkt)
    }

With Channel.writer() things are simpler: we can start the goroutine and allocate the buffer only when we are about to send a packet:


    func (ch *Channel) Send(p Packet) {
        if ch.noWriterYet() {
            go ch.writer()
        }
        ch.send <- p
    }

After reading the outgoing packets from ch.send (one or several), the writer finishes its work and frees its stack and buffer.


Great! We have saved 48 GB by getting rid of the stacks and I/O buffers inside two permanently running goroutines.


3.3. Resource control


A large number of connections is not only about high memory consumption. While developing the server, we repeatedly ran into race conditions and deadlocks, which were often accompanied by so-called self-DDoS: a situation where the application's clients tried to reconnect to the server uncontrollably and broke it even further.


For example, if for some reason we suddenly could not process ping/pong messages, but the idle connection handler kept closing such connections (assuming that they had been closed improperly and therefore sent no data), then clients, instead of waiting for events after connecting, lost the connection every N seconds and tried to connect again.


It would be great if a blocked or overloaded server simply stopped accepting new connections, and the balancer in front of it (for example, nginx) moved on to the next server instance.


Moreover, regardless of the server load, if all clients suddenly want to send us a packet for whatever reason, the previously saved 48 GB will be back in use: in effect, we return to the initial state of one goroutine and one buffer per connection.


3.3.1. Goroutine pool


We can limit the number of packets processed simultaneously with a goroutine pool. Here is a naive implementation of such a pool:


    package gpool

    // Pool limits the number of goroutines that run tasks concurrently.
    type Pool struct {
        work chan func()
        sem  chan struct{}
    }

    func New(size int) *Pool {
        return &Pool{
            work: make(chan func()),
            sem:  make(chan struct{}, size),
        }
    }

    func (p *Pool) Schedule(task func()) error {
        select {
        case p.work <- task:
        case p.sem <- struct{}{}:
            go p.worker(task)
        }
        return nil
    }

    func (p *Pool) worker(task func()) {
        defer func() { <-p.sem }()
        for {
            task()
            task = <-p.work
        }
    }

And now our code with netpoll takes the following form:


    pool := gpool.New(128)

    poller.Start(conn, netpoll.EventRead, func() {
        // We will block poller wait loop when
        // all pool workers are busy.
        pool.Schedule(func() {
            ch.Receive()
        })
    })

That is, we now read the packet not immediately when data is detected in the socket, but at the first opportunity to take a free goroutine from the pool.


Similarly, we change Send():


    pool := gpool.New(128)

    func (ch *Channel) Send(p Packet) {
        if ch.noWriterYet() {
            pool.Schedule(ch.writer)
        }
        ch.send <- p
    }

Instead of go ch.writer(), we want to write in one of the reused goroutines. Thus, with a pool of N goroutines, we guarantee that when N requests are being processed simultaneously and request N + 1 arrives, we will not allocate an N + 1 read buffer. The goroutine pool also lets us limit Accept() and Upgrade() of new connections and avoid most DDoS situations.


3.4. Zero-copy upgrade


Let's digress a little toward the WebSocket protocol. As mentioned above, the client switches to the WebSocket protocol using an HTTP Upgrade request. Here is what it looks like:


    GET /ws HTTP/1.1
    Host: mail.ru
    Connection: Upgrade
    Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
    Sec-Websocket-Version: 13
    Upgrade: websocket

    HTTP/1.1 101 Switching Protocols
    Connection: Upgrade
    Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
    Upgrade: websocket
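The only computation the server has to perform for this handshake is deriving Sec-Websocket-Accept from Sec-Websocket-Key. As defined in RFC 6455, the key is concatenated with a fixed GUID, hashed with SHA-1, and base64-encoded. A minimal sketch follows; acceptKey is a hypothetical helper name, and the sample key in main is the one from the RFC's own example.

```go
package main

import (
	"crypto/sha1"
	"encoding/base64"
	"fmt"
)

// websocketGUID is the fixed value defined by RFC 6455 for the handshake.
const websocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

// acceptKey derives the Sec-WebSocket-Accept value from the
// client's Sec-WebSocket-Key header.
func acceptKey(key string) string {
	sum := sha1.Sum([]byte(key + websocketGUID))
	return base64.StdEncoding.EncodeToString(sum[:])
}

func main() {
	// Sample key from RFC 6455, section 1.3.
	fmt.Println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="))
	// Output: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
}
```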

In other words, in our case the HTTP request and its headers are needed only to switch to the WebSocket protocol. This, together with the knowledge of what is stored inside http.Request, suggests that for the sake of optimization we could avoid unnecessary allocations and copying when parsing the HTTP request and abandon the standard net/http server.


For example, http.Request contains a Header field, which is unconditionally filled with all the request headers by copying data from the connection into strings. Imagine how much extra data can be kept inside this field, for example, when the Cookie header is large.

But what should we use instead?


3.4.1. WebSocket implementations


Unfortunately, all the libraries that existed at the time we were optimizing our server allowed an upgrade only with the standard net/http server. Moreover, neither of the two libraries made it possible to apply all the read and write optimizations described above. For these optimizations to work, we need a fairly low-level API for working with WebSocket. To reuse buffers, we need the functions that work with the connection to look like this:


    func ReadFrame(io.Reader) (Frame, error)
    func WriteFrame(io.Writer, Frame) error

Having a library with such an API, we could read the packets from the connection as follows (writing packets would look the same):


    // getReadBuf, putReadBuf are intended to
    // reuse *bufio.Reader (with sync.Pool for example).
    func getReadBuf(io.Reader) *bufio.Reader
    func putReadBuf(*bufio.Reader)

    // readPacket must be called when data could be read from conn.
    func readPacket(conn io.Reader) error {
        // getReadBuf returns a pooled reader already reset to read from conn.
        buf := getReadBuf(conn)
        defer putReadBuf(buf)

        frame, _ := ReadFrame(buf)
        parsePacket(frame.Payload)
        //...
    }

In short, it was time to write our own library.


3.4.2. github.com/gobwas/ws


Ideologically, the ws library was written so that it does not impose its own logic of working with the protocol on the user. All read and write methods accept the standard io.Reader and io.Writer interfaces, which makes it possible to use buffering or not, as well as any other I/O wrappers.


In addition to upgrade requests from the standard net/http, ws supports zero-copy upgrade: handling upgrade requests and switching to WebSocket without memory allocations or copying. ws.Upgrade() accepts an io.ReadWriter (net.Conn implements this interface). That is, we could use the standard net.Listen() and pass the connection returned by ln.Accept() straight to ws.Upgrade(). At the same time, the library allows copying any request data for later use in the application (for example, a Cookie to verify the session).


Below is a comparison of upgrade request processing: the standard net/http server versus net.Listen() with zero-copy upgrade:


    BenchmarkUpgradeHTTP    5156 ns/op    8576 B/op    9 allocs/op
    BenchmarkUpgradeTCP      973 ns/op       0 B/op    0 allocs/op

Switching to ws and zero-copy upgrade saved us another 24 GB: the memory that was allocated for I/O buffers while processing the request in the net/http handler.


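3.5. Putting it together


Let's summarize the optimizations I have described.

A read goroutine with a buffer inside is expensive. Solution: netpoll (epoll) and reusing buffers.

A write goroutine with a buffer inside is expensive. Solution: start the goroutine only when needed and reuse buffers.

When all clients become active at once, netpoll alone does not save us. Solution: reuse goroutines and limit their number.

net/http is not the fastest way to handle the Upgrade to WebSocket. Solution: zero-copy upgrade on a bare TCP connection.

The server code might then look something like this: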


    import (
        "net"
        "time"

        "github.com/gobwas/ws"
    )

    ln, _ := net.Listen("tcp", ":8080")

    for {
        // Try to accept incoming connection inside free pool worker.
        // If there no free workers for 1ms, do not accept anything and try later.
        // This will help us to prevent many self-ddos or out of resource limit cases.
        err := pool.ScheduleTimeout(time.Millisecond, func() {
            conn, err := ln.Accept()
            if err != nil {
                return
            }
            if _, err := ws.Upgrade(conn); err != nil {
                conn.Close()
                return
            }

            // Wrap WebSocket connection with our Channel struct.
            // This will help us to handle/send our app's packets.
            ch := NewChannel(conn)

            // Wait for incoming bytes from connection.
            poller.Start(conn, netpoll.EventRead, func() {
                // Do not cross the resource limits.
                pool.Schedule(func() {
                    // Read and handle incoming packet(s).
                    ch.Receive()
                })
            })
        })
        if err != nil {
            time.Sleep(time.Millisecond)
        }
    }

4. Conclusion


Premature optimization is the root of all evil (or at least most of it) in programming. Donald Knuth

Of course, the optimizations above are not relevant in every case. For example, if the ratio of free resources (memory, CPU) to the number of live connections is high enough, there is probably no point in optimizing. However, I hope it is useful to know where and what can be improved.


Thank you for your attention!



Source: https://habr.com/ru/post/331784/

