
How Discord handles more than 1,000,000 push requests per minute with Elixir's GenStage


Discord

Discord has seen unprecedented growth. To keep up with it, our engineering team has had the pleasant problem of figuring out how to scale our backend services.

One technology that has been a big win for us here is Elixir's GenStage.

The Perfect Storm: Overwatch and Pokémon GO


This past summer, our mobile push notification system started creaking under the load. The /r/Overwatch chat server passed 25,000 concurrent users, and Pokémon GO chat groups were popping up everywhere, so sudden spikes in the flow of notifications became a serious problem.

These bursts slowed down the entire push notification pipeline and sometimes brought it down entirely: push notifications arrived late or not at all.

GenStage comes to the rescue


After a bit of investigation, we determined that the main bottleneck was sending push requests to Google's Firebase Cloud Messaging.

We realized we could get an immediate throughput win by sending push requests to Firebase over XMPP instead of HTTP.

Firebase's XMPP interface is a bit trickier than HTTP, though. Firebase requires that each XMPP connection have no more than 100 pending requests at a time. Once you have 100 requests in flight, you have to wait for Firebase to acknowledge one of them before sending the next.

Since only 100 requests may be pending at a time, we had to design the new system so that the XMPP connections would not get overwhelmed during bursts of push requests.
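To make the constraint concrete, here is a minimal sketch of the bookkeeping it implies. The module and function names are illustrative, not part of our codebase: it tracks the unacknowledged message IDs on one connection, allows sends only while fewer than 100 are outstanding, and frees a slot when an acknowledgement arrives.

defmodule FirebaseWindow do
  # Illustrative only: tracks unacknowledged message IDs for one XMPP connection.
  defstruct pending: MapSet.new(), max: 100

  # True while fewer than `max` requests are awaiting acknowledgement.
  def can_send?(%__MODULE__{pending: pending, max: max}) do
    MapSet.size(pending) < max
  end

  # Record a request as in flight.
  def sent(%__MODULE__{pending: pending} = window, message_id) do
    %{window | pending: MapSet.put(pending, message_id)}
  end

  # An acknowledgement from Firebase frees one slot.
  def acked(%__MODULE__{pending: pending} = window, message_id) do
    %{window | pending: MapSet.delete(pending, message_id)}
  end
end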

At first glance, GenStage looked like the perfect tool for the job.



What is GenStage?

GenStage is a new Elixir behaviour for exchanging events with back-pressure between Elixir processes. [0]

What does that actually mean? Essentially, it gives you the tools you need to make sure no part of your system gets overloaded.

In practice, a system built with GenStage usually has several stages.

Stages are computational steps that send and/or receive data from other stages.

When a stage sends data, it acts as a producer. When it receives data, it acts as a consumer. A stage can play both the producer and consumer roles at the same time.

In addition to the producer and consumer roles, a stage is called a "source" if it only produces events, and a "sink" if it only consumes them. [1]
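If you have not seen GenStage before, here is a minimal pipeline adapted from the GenStage documentation: a producer that emits increasing integers on demand and a consumer that prints whatever it receives. The module names are illustrative and have nothing to do with our push system.

defmodule Counter do
  use GenStage

  def init(initial), do: {:producer, initial}

  # The consumer's demand drives how many events we emit.
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule Printer do
  use GenStage

  def init(:ok), do: {:consumer, :ok}

  def handle_events(events, _from, state) do
    Enum.each(events, &IO.inspect/1)
    {:noreply, [], state}
  end
end

# Run in IEx; this prints numbers until you stop it.
{:ok, counter} = GenStage.start_link(Counter, 0)
{:ok, printer} = GenStage.start_link(Printer, :ok)
GenStage.sync_subscribe(printer, to: counter)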

Our approach




We split the system into two GenStage stages: one source and one sink.
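Roughly, wiring the two stages together looks like the sketch below. This is a simplification, not our production code: FCM.Connection.start_link/0 and the shape of a push request are stand-ins, in reality there is one Pusher per XMPP connection, and everything runs under a supervisor. The two stage modules themselves are shown in full later in this post.

# One Push Collector (source) feeding one Pusher (sink).
{:ok, collector} = GenStage.start_link(GCM.PushCollector, :ok)
{:ok, fcm_conn}  = FCM.Connection.start_link()
{:ok, _pusher}   = GCM.Pusher.start_link(collector, fcm_conn)

# Callers hand push requests to the source stage; everything downstream
# is driven by demand from the Pusher.
GCM.PushCollector.push(collector, [%{user_id: 123, body: "You got a message!"}])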


Back pressure and load shedding with GenStage


GenStage has two key features that help us during spikes in push requests: back-pressure and load shedding.

Back pressure


The Pushers use GenStage to ask the Push Collector for the maximum number of push requests they can handle. This guarantees an upper bound on the number of push requests that are pending. When Firebase acknowledges a request, the Pusher asks the Push Collector for more.

Each Pusher knows exactly how many requests its Firebase XMPP connection can handle and never asks for too many, and the Push Collector never sends a request to a Pusher that has not asked for one.
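Boiled down, the pattern looks like the sketch below. It is a simplification of the Pusher shown later: the module name is made up, and demand here is replenished as soon as an event has been handled, whereas the real Pusher only asks for more once Firebase has acknowledged the request.

defmodule WindowedConsumer do
  use GenStage

  # Never let more than this many events be outstanding at once.
  @max_demand 100

  def init(:ok), do: {:consumer, %{producer_from: nil}}

  def handle_subscribe(:producer, _opts, from, state) do
    # Take manual control of demand and prime the window.
    GenStage.ask(from, @max_demand)
    {:manual, %{state | producer_from: from}}
  end

  def handle_events(events, _from, %{producer_from: from} = state) do
    Enum.each(events, fn event ->
      handle_event(event)
      # Each completed event frees one slot of demand.
      GenStage.ask(from, 1)
    end)

    {:noreply, [], state}
  end

  defp handle_event(_event), do: :ok
end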

Load shedding


Because the Pushers apply back-pressure to the Push Collector, the Push Collector itself becomes a potential bottleneck: a sufficiently large burst could overwhelm it.

GenStage has another built-in feature for exactly these situations: event buffering.

In the Push Collector we specify how many push requests to buffer. Under normal operation the buffer stays empty, but during the roughly once-a-month catastrophic event, it comes in handy.

If a large spike moves through the system and fills the buffer, the Push Collector sheds the incoming push requests. Getting this behavior requires nothing more than specifying the buffer_size option in the Push Collector's init function.
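Concretely, that is a single option on the value returned from the producer's init callback. The number below is illustrative rather than our production setting; GenStage's related :buffer_keep option additionally controls which end of the buffer is dropped once it overflows.

def init(_args) do
  # Buffer up to this many push requests while no Pusher has spare demand;
  # anything beyond that gets shed. The value here is illustrative.
  {:producer, :ok, buffer_size: 100_000}
end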

With these two features in place, we can handle bursts of push notifications gracefully.

Code (finally, the most important part)


Below is sample code showing how we set up the Pusher and Push Collector stages. For simplicity, we have removed many of the pieces that handle failures such as lost connections and errors returned by Firebase.

Feel free to skip past the code if you just want to see the results.

Push Collector (producer)


push_collector.ex

defmodule GCM.PushCollector do
  use GenStage

  # Maximum number of push requests to buffer before shedding load.
  # (The value here is illustrative; the real limit is a tuning decision.)
  @max_buffer_size 100_000

  # Client

  def push(pid, push_requests) do
    GenServer.cast(pid, {:push, push_requests})
  end

  # Server

  def init(_args) do
    # Run as producer and specify the max amount
    # of push requests to buffer.
    {:producer, :ok, buffer_size: @max_buffer_size}
  end

  def handle_cast({:push, push_requests}, state) do
    # Dispatch the push_requests as events.
    # These will be buffered if there are no consumers ready.
    {:noreply, push_requests, state}
  end

  def handle_demand(_demand, state) do
    # Do nothing. Events will be dispatched as-is.
    {:noreply, [], state}
  end
end

Pusher (consumer)


pusher.ex

defmodule GCM.Pusher do
  use GenStage

  # The maximum number of requests Firebase allows at once per XMPP connection
  @max_demand 100

  defstruct [
    :producer,
    :producer_from,
    :fcm_conn_pid,
    :pending_requests,
    :next_id,
  ]

  def start_link(producer, fcm_conn_pid, opts \\ []) do
    GenStage.start_link(__MODULE__, {producer, fcm_conn_pid}, opts)
  end

  def init({producer, fcm_conn_pid}) do
    state = %__MODULE__{
      next_id: 1,
      pending_requests: Map.new(),
      producer: producer,
      fcm_conn_pid: fcm_conn_pid,
    }
    send(self(), :init)
    # Run as consumer
    {:consumer, state}
  end

  def handle_info(:init, %{producer: producer} = state) do
    # Subscribe to the Push Collector
    GenStage.async_subscribe(self(), to: producer, cancel: :temporary)
    {:noreply, [], state}
  end

  def handle_subscribe(:producer, _opts, from, state) do
    # Start demanding requests now that we are subscribed
    GenStage.ask(from, @max_demand)
    {:manual, %{state | producer_from: from}}
  end

  def handle_events(push_requests, _from, state) do
    # We got some push requests from the Push Collector.
    # Let's send them.
    state = Enum.reduce(push_requests, state, &do_send/2)
    {:noreply, [], state}
  end

  # Send the message to FCM, track as a pending request.
  # (Enum.reduce/3 passes the event first and the accumulator second.)
  defp do_send(push_request, %{fcm_conn_pid: fcm_conn_pid, pending_requests: pending_requests} = state) do
    {message_id, state} = generate_id(state)
    xml = PushRequest.to_xml(push_request, message_id)
    :ok = FCM.Connection.send(fcm_conn_pid, xml)
    pending_requests = Map.put(pending_requests, message_id, push_request)
    %{state | pending_requests: pending_requests}
  end

  # FCM response handling
  defp handle_response(%{message_id: message_id} = _response, %{pending_requests: pending_requests, producer_from: producer_from} = state) do
    {_push_request, pending_requests} = Map.pop(pending_requests, message_id)
    # Since we finished a request, ask the Push Collector for more.
    GenStage.ask(producer_from, 1)
    %{state | pending_requests: pending_requests}
  end

  defp generate_id(%{next_id: next_id} = state) do
    {to_string(next_id), %{state | next_id: next_id + 1}}
  end
end

Example incident

Below is a real incident the system handled. The upper graph shows the number of push requests per second flowing through the system; the lower graph shows the number of push requests sitting in the Push Collector's buffer.





Timeline of events:


Success with Elixir


We at Discord are thrilled to have Elixir and Erlang as a core technology in our backend services. It's great to see extensions like GenStage that build on the rock-solid foundations of Erlang/OTP.

We're looking for adventurous souls to help us solve problems like these as Discord keeps growing. If you love games and these kinds of challenges make your heart beat faster, check out our jobs.

Source: https://habr.com/ru/post/317724/

