
Dive into Centrifugo

In the previous article about Centrifugo, I said that the server had been rewritten from Python to Go (the Centrifugo code is on GitHub, and there is a description on opensource.mail.ru). Several months have passed since then. During that time Centrifugo reached version 1.0.0 and even moved a little further: the latest version at the time of writing is 1.4.2.

This article covers a quick start with Centrifugo, examples of real-world use, thoughts on Centrifugo's place and purpose in the realities of 2016, a description of some architectural features and capabilities of the real-time server, and Go code examples that implement its main features. Welcome aboard!

Let me remind you what Centrifugo is. It is a server that runs next to your application's backend. Application users connect to Centrifugo using the Websocket protocol or the SockJS polyfill library. After connecting and authenticating with an HMAC token (received from the application backend), they subscribe to the channels they are interested in. When the application backend learns about a new event, it sends it to the appropriate channel in Centrifugo using the HTTP API or a queue in Redis. Centrifugo, in turn, delivers the message to all connected users interested in it. Nothing fundamentally new: many products solve real-time problems, and some of them work in a similar way.

If you have heard of the pusher.com service, you can roughly think of Centrifugo as a self-hosted counterpart.


So conceptually nothing has changed. Let's take a closer look at how Centrifugo works and how some of the related problems are solved. But first, let's look at how the server appears from the outside, to the developers who use it.

Quick start


Let's look at a quick example of using Centrifugo to prototype a simple real-time idea. In a real production setup everything is a bit more complicated: for example, connection parameters have to be generated on your application's backend. Here we will disable all such checks so that we can do without a backend. This gets us a working version as quickly as possible and shows the main purpose of Centrifugo.

The task is simple: a user opens the page in a browser and sees a picture. Using the Centrifugo JavaScript client, we subscribe to the screen-updates channel and wait for incoming messages. Then we send (publish) messages like this one from the Centrifugo admin web interface:

{ "image": "https://habrastorage.org/files/6b3/ae5/fcb/6b3ae5fcbeaf49c480baca60f88e7d40.jpg" } 

where image is the URL of the image.

When users receive this message, the picture on their page automatically changes to the one sent in the message.

Download the latest Centrifugo release for your operating system from the releases page on GitHub. At the time of writing, this is version 1.4.2. Unpack the zip archive and run the binary:

 CENTRIFUGO_SECRET=secret ./centrifugo --insecure --insecure_admin --web 

Open http://localhost:8000 to see the web interface. We will send messages to clients using the form in its Actions tab.

By the way, there are other ways to install Centrifugo: for example, use the Docker image or download an RPM/DEB package from packagecloud.io.

In a new directory, create an `index.html` file with the following contents:

```html
<!doctype html>
<html lang="en">
<head>
  <script src="//cdn.jsdelivr.net/sockjs/1.0/sockjs.min.js"></script>
  <script src="//rawgit.com/centrifugal/centrifuge-js/1.3.4/centrifuge.min.js"></script>
</head>
<body>
  <img id="img" src="//habrastorage.org/files/d8e/0f5/46c/d8e0f546cb374398a636f8b54f9fca54.jpg" width="500px" />
  <script type="text/javascript">
    var img = document.getElementById("img");
    var centrifuge = new Centrifuge({
      url: 'http://localhost:8000/connection',
      insecure: true
    });
    centrifuge.subscribe('screen-updates', function (message) {
      img.src = message.data.image;
    });
    centrifuge.connect();
  </script>
</body>
</html>
```

Then start an HTTP server that serves this HTML page:

 python -m SimpleHTTPServer 3000 

(on Python 3 the equivalent is python3 -m http.server 3000), then open several browser tabs at http://localhost:3000.

In the Actions tab of the Centrifugo admin web interface, enter screen-updates in the channel field, put JSON of the form shown above into the data field, and publish it to the channel. For example:

 { "image": "https://habrastorage.org/files/6b3/ae5/fcb/6b3ae5fcbeaf49c480baca60f88e7d40.jpg" } 




On all open pages, the picture changes to the one you just sent. There is also a ready-made CodePen example that uses the Centrifugo demo instance on Heroku.



It is a slightly prettier version of the example above. You can log in to the demo instance on Heroku (password: demo) and send messages to the screen-updates channel from the Actions tab, watching the result in the CodePen window. This way you do not have to install anything to get a general idea.

That was a quick start, and the result is rather primitive. But imagine the possibilities once you move this into a live web application: they are limited only by your imagination. In a real project you will not send messages manually from the web interface; you will use the Centrifugo API and the client libraries for it. You will also most likely not use the insecure flags and options seen above. A full description of all features is in the documentation.

As I said before, at Mail.Ru Group we use Centrifugo on our intranet (an internal social network for company employees) for likes, comments, meeting room availability, voting results and more. We currently have 700 users online at the same time and about 300 new messages published per minute. This is, of course, very little. CPU consumption averages about 1%, memory about 200 MB. There are cases (for example, after corporate mailings) when the number of published messages reaches 2 thousand per minute and the fan-out (the number of messages actually delivered to client connections) reaches 300 thousand messages per minute. Again, nothing fantastic. One such case is shown on the chart below:



Unfortunately, there are no properly made Centrifugo benchmarks at the moment, so I have to rely on numbers from real projects. In our case, the load on Centrifugo is insignificant.

There are several interesting uses of Centrifugo at Mail.Ru Group that I would like to talk about.

The first is a game for employees. Players gather in one room and are split into teams at the touch of a button. As soon as the teams are seated at their tables, the game begins. Players open the site on their devices (smartphones, tablets, laptops), receive questions via push messages and answer them. Meanwhile, game statistics are shown on a big screen (a TV or a projector), and the host controls the game from their own device: moving from round to round, sending players new questions and then the correct answers. If you have heard of The Jackbox Party Games, this is something similar.

My colleague is currently working on https://drawr.ru. It is a cooperative real-time game in which people have fun drawing and guessing what others have drawn. Internally, the game is much like the one described in the previous paragraph: Django plus Centrifugo under the hood. Centrifugo is responsible for notifying users about changes in the game state and for the in-game chat. You can try it right now, but you will need company: the minimum number of players is three.

Also, our big Mail.Ru office has a lot of doors, which open when a pass is held up to a reader. This allowed us to do some interesting things. For example, the so-called exit poll: an employee enters a room and answers a question that instantly appears on a screen in front of them. In this unobtrusive way, our HR staff can collect a wide range of statistics and employees' opinions on various issues. The interface on the screen is simply a browser tab subscribed to a Centrifugo channel, through which data about people passing through the door arrives.

Using roughly the same scheme, our technical support administrators show on a big screen information about an employee who walks in, together with the Jira task they came about. This saves a few seconds in the conversation that follows.

One of the developers of a site from the Alexa TOP 250 (if you really want to know which site, read the Centrifugo chat on gitter.com) launched Centrifugo in test mode for 50 thousand users online. At the same time, 6 thousand new messages per second are published (new messages, not fan-out). Clients were balanced between two m4.xlarge Amazon instances running Centrifugo. This example suggests that Centrifugo's performance should be more than enough for the overwhelming majority of projects.

Server-side only push in 2016


In the example above, messages were pushed to clients from the server side. Centrifugo lets you publish new messages directly from the client, bypassing the application backend, but its main use is still server-side only push: sending server-initiated messages in one direction, from the server to the client.

Such a one-way scheme is enough to solve real-time problems in most applications, except perhaps dynamic multiplayer games and the like, which need the lowest possible latency and tighter integration between the backend and the real-time core.

Most modern applications are mostly read-oriented: new content is created relatively rarely, so any backend can easily handle the new events its users generate. For example, if a user adds a comment in a web application, you simply send it with an HTTP POST request (regular or AJAX), process it, validate it, save it to the database and then broadcast it to everyone. You do not need two-way communication between the user and the server over the persistent connection that Websockets provide. Meteor- and Derby-like frameworks are beautiful in their own way, but they are not the only tools for building real-time applications simply.

A significant advantage of Centrifugo is that you do not have to change the way new events are generated in your application. This matters especially if the application backend is written in a language or framework that cannot handle a large number of persistent connections (PHP and many PHP frameworks, Django, etc.).

Let me stress the main point once more: in most cases we do not need bidirectional interaction between the client and the server over a persistent connection. All we need is a way to deliver new content to users quickly, as soon as the application backend learns about it.

Recently the Django framework received a MOSS grant. Judging by this news, we (Django users) can soon expect the Channels project to be merged into the framework core. This will turn Django into a set of workers that communicate through channels with so-called interface servers via a broker. Interface servers can serve client requests over various protocols: HTTP, Websocket, etc. The concept is very elegant and interesting, and among other things it makes bidirectional communication between clients and a Django backend over Websockets possible; special Websocket interface servers will be responsible for it. However, this brings problems: since events may arrive at different Django workers in a different order, you will need to synchronize them using locks in the broker (one of the possible brokers is Redis). It is not clear how these innovations will turn out in practice, but moving locks into a separate broker is a controversial decision, to say the least. This will let Django occupy an additional niche in the market of frameworks and real-time solutions, but whether that is good, given the above, remains to be seen.

Django is not the only example. I am not an expert on what is happening in the Ruby on Rails world, but judging by this article, Rails will also soon receive (or has already received?) support for bidirectional communication with users via Websockets, and some developers are alarmed.

The funny thing is that just as the Internet has almost completely moved to browsers that support Websockets (we waited for this, right?), doubts arise about how good Websockets are for the web in the era of HTTP/2. Each browser tab is a new persistent client connection to the server. Why this redundancy, if HTTP/2 can multiplex all requests to a domain from different browser tabs over one connection? Already now, when a client and a server communicate over HTTP/2, the HTTP/1.1-era limit on the number of connections to one domain no longer applies. You can open many browser tabs, each using EventSource or XHR-Streaming to deliver messages to the user, without hitting the limit (usually five or six connections, depending on the browser).

That said, Websocket as a protocol is very good and, importantly, allows connecting to the server from non-browser environments: Websocket clients exist for almost every programming language. The drawback described above (separate connections for separate tabs) mostly concerns websites, but there are other kinds of applications: desktop, mobile. It is a pity that the draft on multiplexing Websockets over an HTTP/2 connection was never finished. Apparently there were good reasons that I do not know; maybe someone has a guess?

Unfortunately, now, at the beginning of 2016, there is no ideal transport for real-time communication. Every existing option has its advantages and disadvantages. As with many things in programming, choosing a transport is a trade-off. We will wait for new features, especially given the PUSH_PROMISE frames in the HTTP/2 specification (for example, there is a working draft for delivering notifications, online and offline, to browsers based on PUSH_PROMISE; an implementation should appear in Firefox soon, and then in other browsers). Perhaps some day Centrifugo will support this draft to send offline push messages to website users.

In any case, Centrifugo lets you forget about transport headaches and deliver real-time messages to clients even if your application backend is not suited for this (or not yet suited, like Rails and Django in the examples above). Neither the code nor the philosophy of a working project has to change: you can keep using your favorite framework and hand the job of maintaining persistent connections and sending real-time messages over to Centrifugo. Even if you are writing an application in Go or Node.js, you can focus on its core functionality and use Centrifugo for real-time messaging. Later, if you wish, you can easily replace it with your own solution integrated into the project (handling persistent connections yourself, for example with Primus in the case of Node.js). As a transport, as mentioned above, you can use plain Websockets or any of the SockJS transports. In the near future Centrifugo will learn to talk to clients over HTTP/2, thanks to the HTTP/2 implementation built into the Go 1.6 standard library. This will benefit connections that use SockJS HTTP transports (eventsource, xhr-streaming, etc.).

Do not forget the quick example we built above: you can prototype a real-time application without a backend (or with one), since there is a ready-made JavaScript client and a server shipped as a single binary without dependencies. In my somewhat biased opinion, this is one of the most convenient and fastest ways to get started.

An interesting model can be built by using Centrifugo together with RethinkDB on the backend: by subscribing to document changefeeds, you can build applications that keep a data structure in sync between the server and the client, à la Firebase.

Internal organization


Now about what is inside. In previous articles I wrote a lot about Centrifugo's capabilities, but little about how it all works. Much has changed as the project evolved, including the language (Python -> Go). The approaches and tools used now are fairly stable, so this seems like the right time to talk about the server's internal structure.

HTTP / Websocket server


To a first approximation, Centrifugo is a Websocket/HTTP server. A standard Go HTTP server starts on a separate port with registered request handlers. The handlers fall into three categories:

  1. Client connections (Websocket or SockJS).
  2. HTTP API (publishing messages, exporting metrics, etc.).
  3. Administrative endpoints, including those the web interface needs.

Client connections are not short stateless HTTP requests but persistent connections. For Websocket connections and the HTTP streaming transports (xhr-streaming, eventsource) that SockJS uses when Websockets are unavailable, everything is fairly clear: once the connection is established, we keep it open as long as the client is online. (Strictly speaking, this is not entirely true: with streaming transports you sometimes have to drop the client connection so that the garbage collector can reclaim a response body that has grown to a certain size.) Go makes it almost effortless to handle a huge number of such connections. Polling transports such as xhr-polling (aka long polling) are a bit trickier, because the connection is closed after each message is sent and the client then reconnects. The client's session has to be kept for a while, long enough for the reconnect; this is handled by the server-side SockJS implementation. The client, in turn, should reconnect to the same server instance, which is the load balancer's job (for example, sticky sessions in Nginx).

Now about the HTTP API. To publish a new message to a channel, send a well-formed POST request to `/api/`. Note that by default every API request must be signed with a secret key that only Centrifugo and your backend know. In most cases this extra protection can be disabled, since API requests will most likely come from known IP addresses, so firewall rules in production are enough. An API request is then just a POST request with a JSON body containing commands, roughly like this:

 { "method": "publish", "params": { "channel": "updates", "data": { "text": "hello world" } } } 


The administrative web interface consists of an endpoint for logging in with a password, a handler that serves the web interface's static files, and a Websocket endpoint. Through that endpoint the web interface receives real-time messages published to channels (optional) and executes commands, for example publishing a message to a channel, as in the example above.

Engine


Perhaps the most interesting part of Centrifugo's internals is concentrated in the so-called Engine.

Before describing how the built-in Engines work, let's look at the Centrifugo features that are tied to the Engine in one way or another:

  1. Scalability: several Centrifugo instances can run together, connected with each other.
  2. Presence: information about the clients currently subscribed to a channel.
  3. History: recent messages published to a channel.

The first feature creates a problem: clients subscribed to the same channel can be connected to different instances, so a message published to the channel must be delivered to all of them. The same goes for presence and history: every Centrifugo instance must have access to the full information about a channel in order to return it to a client on request.

Add to this that Centrifugo does not store anything in persistent storage. For presence information this is not a problem: we only store as much data as there are users in channels right now. History is different: for each channel, only a limited number of recent messages is kept, and only for a limited time.

All of this is hidden behind the Engine, which is described by the following Go interface:

```go
type Engine interface {
	name() string
	run() error
	publish(chID ChannelID, msg []byte, opts *publishOpts) <-chan error
	subscribe(chID ChannelID) error
	unsubscribe(chID ChannelID) error
	channels() ([]ChannelID, error)
	addPresence(chID ChannelID, uid ConnID, info ClientInfo) error
	removePresence(chID ChannelID, uid ConnID) error
	presence(chID ChannelID) (map[ConnID]ClientInfo, error)
	history(chID ChannelID, opts historyOpts) ([]Message, error)
}
```


Centrifugo currently has two implementations of this interface: Memory Engine and Redis Engine.

Memory Engine


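Let's start with the simpler one, Memory Engine. It keeps everything in the memory of the Centrifugo process, so it only works with a single Centrifugo instance.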

Memory Engine is straightforward: presence and history are stored in data structures in the process memory, protected by mutexes.

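Presence is stored in a map: the key is the channel name, and the value holds information about the clients subscribed to it.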
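A minimal sketch of such an in-memory presence store follows, assuming a map of maps (channel, then connection ID, then client info) guarded by a `sync.RWMutex`. The `presenceStore` interface is a hypothetical, simplified subset of the Engine methods above, with strings instead of Centrifugo's `ChannelID`/`ConnID` types; `clientInfo` is likewise a made-up placeholder.

```go
package main

import "sync"

// clientInfo is a hypothetical, cut-down version of per-connection info.
type clientInfo struct {
	User string
}

// presenceStore is a hypothetical subset of the Engine interface.
type presenceStore interface {
	addPresence(channel, connID string, info clientInfo) error
	removePresence(channel, connID string) error
	presence(channel string) (map[string]clientInfo, error)
}

// memoryPresenceHub maps channel -> connection ID -> client info.
type memoryPresenceHub struct {
	sync.RWMutex
	data map[string]map[string]clientInfo
}

// Compile-time check that the hub satisfies the interface.
var _ presenceStore = (*memoryPresenceHub)(nil)

func newMemoryPresenceHub() *memoryPresenceHub {
	return &memoryPresenceHub{data: make(map[string]map[string]clientInfo)}
}

func (h *memoryPresenceHub) addPresence(channel, connID string, info clientInfo) error {
	h.Lock()
	defer h.Unlock()
	if _, ok := h.data[channel]; !ok {
		h.data[channel] = make(map[string]clientInfo)
	}
	h.data[channel][connID] = info
	return nil
}

func (h *memoryPresenceHub) removePresence(channel, connID string) error {
	h.Lock()
	defer h.Unlock()
	conns, ok := h.data[channel]
	if !ok {
		return nil
	}
	delete(conns, connID)
	// Drop the channel entry when it becomes empty so maps do not pile up.
	if len(conns) == 0 {
		delete(h.data, channel)
	}
	return nil
}

// presence returns a copy, so callers never touch the shared map without the lock.
func (h *memoryPresenceHub) presence(channel string) (map[string]clientInfo, error) {
	h.RLock()
	defer h.RUnlock()
	result := make(map[string]clientInfo, len(h.data[channel]))
	for id, info := range h.data[channel] {
		result[id] = info
	}
	return result, nil
}
```

Returning a copy costs an allocation per call, but it keeps the lock scope short and the shared state private to the hub.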

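History is more interesting, because it has to expire: each channel keeps a bounded list of recent messages that must be removed once its lifetime is over. Here is the history part of Memory Engine; messages are stored as string.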

```go
type historyItem struct {
	messages []string
	expireAt int64
}

func (i historyItem) expired() bool {
	return i.expireAt < time.Now().Unix()
}

type memoryHistoryHub struct {
	sync.RWMutex
	history   map[string]historyItem
	queue     priority.Queue
	nextCheck int64
}

func newMemoryHistoryHub() *memoryHistoryHub {
	hub := &memoryHistoryHub{
		history:   make(map[string]historyItem),
		queue:     priority.MakeQueue(),
		nextCheck: 0,
	}
	go hub.expire()
	return hub
}
```

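queue is a priority queue built on the container/heap package from the Go standard library. Its items are channel names, prioritized by their expiration time.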

Push and Pop on the heap both run in O(log(n)).

The priority queue itself has to be written by hand for its concrete item type, since Go has no generics.

Adding a message to the history looks like this:

```go
func (h *memoryHistoryHub) add(channel string, message string, size, lifetime int) error {
	h.Lock()
	defer h.Unlock()

	_, ok := h.history[channel]

	// Every publication pushes a new expiration entry for the channel.
	expireAt := time.Now().Unix() + int64(lifetime)
	heap.Push(&h.queue, &priority.Item{Value: channel, Priority: expireAt})

	if !ok {
		h.history[channel] = historyItem{
			messages: []string{message},
			expireAt: expireAt,
		}
	} else {
		messages := h.history[channel].messages
		// The newest message goes first; anything beyond size is dropped.
		messages = append([]string{message}, messages...)
		if len(messages) > size {
			messages = messages[0:size]
		}
		h.history[channel] = historyItem{
			messages: messages,
			expireAt: expireAt,
		}
	}

	// Remember the earliest moment the expire goroutine has work to do.
	if h.nextCheck == 0 || h.nextCheck > expireAt {
		h.nextCheck = expireAt
	}

	return nil
}
```

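Besides the channel and the message, the method takes size and lifetime: the maximum number of messages kept in the channel's history and how many seconds the history lives after the last publication.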

Reading a channel's history:

```go
func (h *memoryHistoryHub) get(channel string) ([]string, error) {
	h.RLock()
	defer h.RUnlock()

	hItem, ok := h.history[channel]
	if !ok {
		return []string{}, nil
	}
	if hItem.expired() {
		// Deleting from the map under a read lock would be a data race,
		// so an expired entry is only skipped here; expire() removes it.
		return []string{}, nil
	}
	return hItem.messages, nil
}
```

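Expired histories are removed by the expire method, which runs in a separate goroutine started in newMemoryHistoryHub: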

```go
func (h *memoryHistoryHub) expire() {
	var nextCheck int64
	for {
		time.Sleep(time.Second)
		h.Lock()
		// Nothing is due yet: do not touch the queue.
		if h.nextCheck == 0 || h.nextCheck > time.Now().Unix() {
			h.Unlock()
			continue
		}
		nextCheck = 0
		for h.queue.Len() > 0 {
			item := heap.Pop(&h.queue).(*priority.Item)
			expireAt := item.Priority
			if expireAt > time.Now().Unix() {
				// The earliest remaining item is still alive: put it back
				// and remember when to look again.
				heap.Push(&h.queue, item)
				nextCheck = expireAt
				break
			}
			channel := item.Value
			hItem, ok := h.history[channel]
			if !ok {
				continue
			}
			// Delete only if no later publication extended the lifetime.
			if hItem.expireAt <= expireAt {
				delete(h.history, channel)
			}
		}
		h.nextCheck = nextCheck
		h.Unlock()
	}
}
```

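nextCheck holds the nearest expiration time, so the goroutine does not touch the queue until something is actually due.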


Redis Engine


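Redis Engine lets you run several Centrifugo instances: it uses Redis PUB/SUB to deliver messages between instances and stores presence and history in Redis.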

To use it, start Centrifugo with the engine option:

 centrifugo --config=config.json --engine=redis 


Here is how Redis Engine differs from Memory Engine.


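First, PUB/SUB. When the first client of an instance subscribes to a channel, the instance subscribes to that channel in Redis with SUBSCRIBE. When the last such client leaves, the instance unsubscribes (UNSUBSCRIBE). This way, messages published to Redis reach only the instances that have interested clients.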
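The bookkeeping this requires can be sketched as reference counting per channel. In this hypothetical sketch the Redis SUBSCRIBE and UNSUBSCRIBE calls are replaced with callbacks, so the logic can be exercised without a Redis server; none of the names come from Centrifugo.

```go
package main

import "sync"

// subHub tracks which local connections are subscribed to which channel
// and calls subscribe/unsubscribe only on the first join and last leave.
type subHub struct {
	mu          sync.Mutex
	subs        map[string]map[string]struct{} // channel -> set of connection IDs
	subscribe   func(channel string) error     // stands in for Redis SUBSCRIBE
	unsubscribe func(channel string) error     // stands in for Redis UNSUBSCRIBE
}

func newSubHub(sub, unsub func(string) error) *subHub {
	return &subHub{
		subs:        make(map[string]map[string]struct{}),
		subscribe:   sub,
		unsubscribe: unsub,
	}
}

func (h *subHub) add(channel, connID string) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	conns, ok := h.subs[channel]
	if !ok {
		// First local subscriber: this instance starts listening in Redis.
		if err := h.subscribe(channel); err != nil {
			return err
		}
		conns = make(map[string]struct{})
		h.subs[channel] = conns
	}
	conns[connID] = struct{}{}
	return nil
}

func (h *subHub) remove(channel, connID string) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	conns, ok := h.subs[channel]
	if !ok {
		return nil
	}
	delete(conns, connID)
	if len(conns) > 0 {
		return nil
	}
	// Last local subscriber left: the instance no longer needs this channel.
	delete(h.subs, channel)
	return h.unsubscribe(channel)
}
```

Holding the mutex while calling into Redis keeps the sketch simple and rules out a subscribe racing with an unsubscribe for the same channel, at the cost of serializing those calls.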

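Now about presence. The difficulty is expiration: Redis EXPIRE applies to a whole key, not to individual fields of a HASH. If presence were kept only in a HASH, entries of clients that disappeared without cleaning up after themselves would stay there as long as anyone else kept the key alive. So Redis Engine stores presence for each channel in two structures: a HASH with client information and a sorted SET (ZSET) with the same connection IDs, scored by their expiration time. Centrifugo talks to Redis through the Redigo client library, whose API is used in the code below.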

Adding presence information:

```go
func (e *RedisEngine) addPresence(channel string, uid string, info []byte) error {
	conn := e.pool.Get()
	defer conn.Close()

	expire := 60
	expireAt := time.Now().Unix() + int64(expire)
	hashKey := "hash." + channel
	setKey := "set." + channel

	// MULTI/EXEC runs the four commands as one transaction.
	conn.Send("MULTI")
	conn.Send("ZADD", setKey, expireAt, uid)
	conn.Send("HSET", hashKey, uid, info)
	conn.Send("EXPIRE", setKey, expire)
	conn.Send("EXPIRE", hashKey, expire)
	_, err := conn.Do("EXEC")
	return err
}
```

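addPresence is called not only on subscription but also periodically for every connection (by default every 25 seconds), which keeps information about live clients fresh. channel is the channel name, uid is the connection ID, and info is the client information. The client information goes into the HASH, and the connection ID goes into the sorted SET with its expiration time as the score. Both keys also get a 60-second EXPIRE, so the HASH and the SET disappear entirely once nobody in the channel refreshes them.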

Removing presence information when a client leaves the channel:

```go
func (e *RedisEngine) removePresence(channel string, uid string) error {
	conn := e.pool.Get()
	defer conn.Close()

	conn.Send("MULTI")
	conn.Send("HDEL", "hash."+channel, uid)
	conn.Send("ZREM", "set."+channel, uid)
	_, err := conn.Do("EXEC")
	return err
}
```

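Reading presence is a little more involved: before reading the HASH, we remove expired entries, that is, those whose score in the sorted SET is already in the past: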

```go
func (e *RedisEngine) presence(channel string) (map[string][]byte, error) {
	conn := e.pool.Get()
	defer conn.Close()

	now := time.Now().Unix()
	hashKey := "hash." + channel
	setKey := "set." + channel

	// Members whose score (expiration time) is already in the past are stale.
	expiredKeys, err := redis.Strings(conn.Do("ZRANGEBYSCORE", setKey, 0, now))
	if err != nil {
		return nil, err
	}
	if len(expiredKeys) > 0 {
		conn.Send("ZREMRANGEBYSCORE", setKey, 0, now)
		for _, key := range expiredKeys {
			conn.Send("HDEL", hashKey, key)
		}
	}
	// Do sends HGETALL together with the queued cleanup commands
	// and reads all pending replies.
	reply, err := conn.Do("HGETALL", hashKey)
	if err != nil {
		return nil, err
	}
	return replyToPresenceInfo(reply)
}
```

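In Centrifugo itself these operations are implemented as Lua scripts run with EVALSHA. This makes each presence operation atomic and lets it complete in a single round trip to Redis.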

History in Redis Engine is built on a Redis LIST, using the LPUSH, LTRIM, LRANGE and EXPIRE commands.
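To show how these four commands fit together, here is a sketch that only builds the command sequences instead of sending them, so it runs without Redis. The `history.` key prefix and the `command` type are hypothetical; with Redigo, each entry would become a `conn.Send(name, args...)` call.

```go
package main

// command is one Redis command with its arguments.
type command struct {
	Name string
	Args []interface{}
}

// historyAddCommands returns the commands that add a message to a
// channel's history stored in a Redis LIST (hypothetical key naming).
func historyAddCommands(channel, message string, size, lifetime int) []command {
	key := "history." + channel
	return []command{
		// The newest message goes to the head of the list.
		{"LPUSH", []interface{}{key, message}},
		// Keep only the size most recent messages (indexes 0..size-1).
		{"LTRIM", []interface{}{key, 0, size - 1}},
		// The whole list disappears lifetime seconds after the last publication.
		{"EXPIRE", []interface{}{key, lifetime}},
	}
}

// historyGetCommand reads the full history, newest message first.
func historyGetCommand(channel string) command {
	return command{"LRANGE", []interface{}{"history." + channel, 0, -1}}
}
```

Sending the three write commands inside MULTI/EXEC (or a Lua script) keeps the list from ever being observed untrimmed.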

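Another feature of Redis Engine is the ability to send API commands through a Redis list used as a queue. The application backend adds commands with RPUSH, and Centrifugo takes them out with BLPOP, which blocks until a command arrives.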

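A single queue, however, is processed sequentially and can become a bottleneck. To speed things up, several queues can be used. To keep the messages of a channel in order, all commands for one channel must go to the same queue, which can be chosen like this: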

 crc16(CHANNEL_NAME) mod N 

where N is the number of queues.
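The formula above can be sketched in Go as follows. This assumes the CRC-16/XMODEM variant (polynomial 0x1021, initial value 0), the one Redis Cluster uses for key slots; which CRC16 variant Centrifugo actually uses, and the `centrifugo.api.N` queue naming, are assumptions for illustration.

```go
package main

import "fmt"

// crc16 computes CRC-16/XMODEM: polynomial 0x1021, initial value 0,
// processed bit by bit, most significant bit first.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = crc<<1 ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// queueFor picks one of n queues for a channel: crc16(channel) mod n.
// The same channel always maps to the same queue, which preserves order.
func queueFor(channel string, n int) string {
	shard := int(crc16([]byte(channel))) % n
	return fmt.Sprintf("centrifugo.api.%d", shard)
}
```

A backend would RPUSH each command to `queueFor(channel, N)`; the only requirement is that the backend and Centrifugo agree on the hash function and on N.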

The HTTP API, of course, remains available with Redis Engine; the queue is an alternative way to send commands.

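Redis itself is a single point of failure, so its availability needs attention. The standard solution is Redis Sentinel. Other options are proxies such as Haproxy, twemproxy or codis. Many PaaS providers also offer High-Availability (HA) Redis.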

Centrifugo 1.4.2 supports Sentinel: it finds the current Redis master through the Sentinels. twemproxy and codis cannot be used, because they do not support PUB/SUB commands.

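One more interesting point in Redis Engine is publishing. Every new message (from the API or from a client) is sent to Redis with the PUBLISH command. If each message were published separately, every publication would cost a full round trip (RTT) to Redis, and under load latency would grow. So Centrifugo collects the publish requests that have accumulated into a batch and sends them in a single Redis pipeline, which greatly reduces the number of round trips under high load. Here is a simplified version of the code: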

```go
func fillPublishBatch(ch chan *pubRequest, prs *[]*pubRequest) {
	for len(*prs) < RedisPublishBatchLimit {
		select {
		case pr := <-ch:
			*prs = append(*prs, pr)
		default:
			return
		}
	}
}

func (e *RedisEngine) runPublishPipeline() {
	var prs []*pubRequest
	for {
		// Block until one request arrives, then grab whatever else is queued.
		pr := <-e.pubCh
		prs = append(prs, pr)
		fillPublishBatch(e.pubCh, &prs)

		conn := e.pool.Get()
		for i := range prs {
			conn.Send("PUBLISH", prs[i].channel, prs[i].message)
		}
		// One Flush sends the whole batch in a single round trip.
		// Errors are ignored for brevity; real code reports them to publishers.
		_ = conn.Flush()
		for range prs {
			_, _ = conn.Receive()
		}
		conn.Close()
		prs = nil
	}
}
```
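The same batching technique can be shown without Redis. In this sketch `flush` stands in for one pipelined round trip, and every publisher gets the batch result through its own `err` channel, which the code above leaves out. The `pubRequest` fields and the function names here are hypothetical.

```go
package main

// pubRequest is a publish request; err must be buffered (capacity 1)
// so that reporting the result never blocks the batching loop.
type pubRequest struct {
	channel string
	message string
	err     chan error
}

// collectBatch blocks for the first request, then drains whatever is
// already queued without blocking, up to limit requests in total.
func collectBatch(ch <-chan *pubRequest, limit int) []*pubRequest {
	batch := []*pubRequest{<-ch}
	for len(batch) < limit {
		select {
		case pr := <-ch:
			batch = append(batch, pr)
		default:
			return batch
		}
	}
	return batch
}

// runBatches sends each batch with flush (one round trip) and reports
// the result to every publisher in the batch. It never returns.
func runBatches(ch <-chan *pubRequest, limit int, flush func([]*pubRequest) error) {
	for {
		batch := collectBatch(ch, limit)
		err := flush(batch)
		for _, pr := range batch {
			pr.err <- err
		}
	}
}
```

Under light load each batch holds a single request, so latency stays as low as with unbatched publishing; batches only grow when requests arrive faster than Redis round trips complete.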



Messages arriving from PUB/SUB then have to be delivered to every client on this instance that is subscribed to the channel.

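It is important that one slow client does not hold up delivery to everyone else. So each client has its own buffered Go channel for outgoing messages, and messages are put into it without blocking: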

```go
select {
case client.messageCh <- message:
	return nil
default:
	return ErrChannelFull
}
```

If the buffer is full, the client cannot keep up, and its connection is closed.
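A runnable version of the non-blocking send from the snippet above, as a sketch: `client`, `newClient` and the buffer size are hypothetical, and the caller decides what to do on `ErrChannelFull` (for example, close the connection).

```go
package main

import "errors"

// ErrChannelFull mirrors the error name used in the snippet above.
var ErrChannelFull = errors.New("client message channel is full")

// client is a hypothetical connection with a bounded outgoing buffer.
type client struct {
	messageCh chan []byte
}

func newClient(buffer int) *client {
	return &client{messageCh: make(chan []byte, buffer)}
}

// send never blocks: a select with a default branch falls through
// immediately when the buffered channel has no free slot.
func (c *client) send(message []byte) error {
	select {
	case c.messageCh <- message:
		return nil
	default:
		return ErrChannelFull
	}
}
```

A separate writer goroutine per connection would read from `messageCh` and write to the socket, so a slow socket only fills that one client's buffer.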

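To connect to Centrifugo, a client sends its connection parameters (user ID, timestamp, optional JSON info) together with an HMAC SHA-256 token that the application backend generates from these parameters using the secret key. Centrifugo computes the token from the same parameters and compares the two. The secret never leaves the backend; the JavaScript client only passes the parameters and the token along.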
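A sketch of generating and checking such a token in Go. The exact signed string is an assumption here (user ID, then timestamp, then info, concatenated and hex-encoded); check the documentation of your Centrifugo version before relying on it.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"strconv"
)

// connectionToken signs the connection parameters with the secret.
// Assumption: the token covers user + timestamp + info in this order.
func connectionToken(secret, user string, timestamp int64, info string) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte(user))
	mac.Write([]byte(strconv.FormatInt(timestamp, 10)))
	mac.Write([]byte(info))
	return hex.EncodeToString(mac.Sum(nil))
}

// checkToken recomputes the token on the server side and compares in
// constant time, so the comparison does not leak timing information.
func checkToken(secret, user string, timestamp int64, info, token string) bool {
	expected := connectionToken(secret, user, timestamp, info)
	return hmac.Equal([]byte(expected), []byte(token))
}
```

Because the user ID is part of the signed data, a client cannot reuse someone else's token with its own user ID.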


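Channels can be grouped into namespaces. If a channel name contains a colon, like public:news, Centrifugo looks up the settings of the public namespace and applies them to the channel. This way different channels can have different options, for example presence or history enabled only where they are needed.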
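A sketch of the lookup described above: split the namespace off the channel name at the colon and find its settings. `namespaceOptions` is a hypothetical subset of settings. Rejecting unknown namespaces, instead of silently falling back to defaults, is a design choice of this sketch.

```go
package main

import "strings"

// namespaceOf returns the part of a channel name before the first
// colon, e.g. "public" for "public:news"; "" means no namespace.
func namespaceOf(channel string) string {
	if i := strings.Index(channel, ":"); i >= 0 {
		return channel[:i]
	}
	return ""
}

// namespaceOptions is a hypothetical subset of per-namespace settings.
type namespaceOptions struct {
	Presence    bool
	HistorySize int
}

// optionsFor returns the settings for a channel: the top-level defaults
// for channels without a namespace, the namespace's own settings
// otherwise, and ok == false for an unknown namespace.
func optionsFor(channel string, defaults namespaceOptions, namespaces map[string]namespaceOptions) (namespaceOptions, bool) {
	ns := namespaceOf(channel)
	if ns == "" {
		return defaults, true
	}
	opts, ok := namespaces[ns]
	return opts, ok
}
```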






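Message delivery in Centrifugo is best effort: a message is delivered at most once and can be lost, for example if the client is disconnected at the moment of publication.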

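This follows from PUB/SUB: Redis PUB/SUB does not store messages, so a subscriber that is not connected at the moment of publication never receives them.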


Conclusion


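Centrifugo is developed at Mail.Ru Group and is open source (MIT license). For comparison, at the beginning of 2016 a pusher.com plan with 20 million messages per day (fan-out included) and 10 thousand connections cost $499 per month (https://pusher.com/pricing).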




What is still missing are clients for iOS and Android. One possible approach is gomobile, which lets you build libraries for iOS and Android from Go code.



PS Thanks to Paul Banks.

Source: https://habr.com/ru/post/280346/

