
Processing 1 million requests per minute with Go

I offer the readers of Habrahabr a translation of an article by the chief architect of Malwarebytes on how they came to handle 1 million requests per minute with just 4 servers.

Here at Malwarebytes we are experiencing phenomenal growth, and since I joined the company about a year ago in Silicon Valley, one of my main responsibilities has been to design and develop the architecture of several systems that power a fast-growing company, along with all the infrastructure needed to support a product used by millions of people every day. I have worked in the antivirus industry for more than 12 years at several different companies, and I know how complex these systems end up being because of the enormous amount of data we deal with daily.

What is interesting is that for the last 9 years or so, all the web development I have been involved in has been done in Ruby on Rails. Don't get me wrong, I love Ruby on Rails and I believe it is a great environment, but after a while you start thinking about system design the Ruby way, and you forget how efficient and simple your architecture could be if you could take advantage of multi-threading, concurrency, fast execution and efficient use of memory. For many years I wrote in C/C++, Delphi and C#, and I began to realize how much less complicated things can be when you pick the right tool for the job.

As Chief Architect, I am not a fan of the holy wars over languages and frameworks that are so popular on the web. I believe that efficiency, productivity and code maintainability depend mostly on how simply you can architect your solution.

Problem


While working on one part of our anonymous telemetry and analytics system, we faced the task of handling a huge number of POST requests from millions of clients. The web handler would receive a JSON document that may contain a collection of payloads, each of which has to be written to Amazon S3 so that our map-reduce systems can process the data later.

Traditionally, we would look toward a worker-tier architecture built around a background job queue, and we would set up 2 separate clusters, one for the web front end and another for the workers, so that the background work could be scaled independently.

But from the very beginning our team knew that we should write this in Go, because during the discussion phase we already saw that this system would have to cope with very heavy traffic. I had been using Go for about 2 years, and we had developed several systems with it, but none of them had handled this kind of load.

We started by creating a few structures to describe the request data that we would receive in the POST calls, and a method to upload it to our S3 bucket.

    type PayloadCollection struct {
        WindowsVersion string    `json:"version"`
        Token          string    `json:"token"`
        Payloads       []Payload `json:"data"`
    }

    type Payload struct {
        // [redacted]
    }

    func (p *Payload) UploadToS3() error {
        // the storageFolder method ensures that there are no name collision in
        // case we get same timestamp in the key name
        storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

        bucket := S3Bucket

        // serialize this payload as JSON into an in-memory buffer
        b := new(bytes.Buffer)
        encodeErr := json.NewEncoder(b).Encode(p)
        if encodeErr != nil {
            return encodeErr
        }

        // Everything we post to the S3 bucket should be marked 'private'
        var acl = s3.Private
        var contentType = "application/octet-stream"

        return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
    }

A naive approach with goroutines


Initially, we took a very naive implementation of the POST handler, just trying to parallelize the job processing with a simple goroutine:

    func payloadHandler(w http.ResponseWriter, r *http.Request) {

        if r.Method != "POST" {
            w.WriteHeader(http.StatusMethodNotAllowed)
            return
        }

        // Read the body into a string for json decoding
        var content = &PayloadCollection{}
        err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
        if err != nil {
            w.Header().Set("Content-Type", "application/json; charset=UTF-8")
            w.WriteHeader(http.StatusBadRequest)
            return
        }

        // Go through each payload and queue items individually to be posted to S3
        for _, payload := range content.Payloads {
            go payload.UploadToS3() // <----- DON'T DO THIS
        }

        w.WriteHeader(http.StatusOK)
    }

For moderate loads this approach would work for most people, but it quickly proved not to work well at a larger scale. We expected a lot of requests, but when we deployed the first version to production we realized we were off by orders of magnitude. We had completely underestimated the amount of traffic.

The approach above is bad for several reasons. There is no way to control how many goroutines we spawn. And since we were receiving 1 million POST requests per minute, this code, of course, crashed very quickly.
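To make the lack of control concrete, here is a minimal sketch. It is not the production code: the `release` channel is a hypothetical stand-in for a slow S3 upload, and `peakInFlight` is a name invented for this illustration. It starts one goroutine per payload, as the naive handler does, and reports how many are alive at the same time while the "uploads" are stuck.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// peakInFlight starts one goroutine per payload and returns how many of them
// are in flight at the same time while none of the simulated uploads can finish.
func peakInFlight(payloads int) int64 {
	var inFlight int64
	var started, done sync.WaitGroup
	release := make(chan struct{}) // closed to let the simulated uploads finish

	for i := 0; i < payloads; i++ {
		started.Add(1)
		done.Add(1)
		go func() {
			defer done.Done()
			atomic.AddInt64(&inFlight, 1)
			started.Done()
			<-release // simulated slow upload (assumption, not a real S3 call)
		}()
	}

	started.Wait() // every goroutine is now blocked in its "upload"
	peak := atomic.LoadInt64(&inFlight)
	close(release)
	done.Wait()
	return peak
}

func main() {
	fmt.Println(peakInFlight(10000)) // prints 10000
}
```

Nothing in this layout caps the count: the number of live goroutines simply tracks the number of payloads received while the uploads are slow.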

Try again


We had to find another way. From the very beginning, we had discussed the need to keep the request handler's lifetime very short and do the heavy work in the background. Of course, this is what you have to do in the Ruby on Rails world, otherwise you will block all the available web workers, whether you use puma, unicorn or passenger (let's not get into the JRuby discussion here, please). There we would have needed the usual solutions for this, such as Resque, Sidekiq, SQS, etc. The list goes on, since there are many ways to solve this problem.

So our second attempt was to create a buffered channel where we could queue up jobs and upload them to S3. Since we could control the maximum number of items in the queue, and we had plenty of RAM to keep the jobs in memory, we thought it would be enough to simply buffer the jobs in the channel queue.

    var Queue chan Payload

    func init() {
        Queue = make(chan Payload, MAX_QUEUE)
    }

    func payloadHandler(w http.ResponseWriter, r *http.Request) {
        ...
        // Go through each payload and queue items individually to be posted to S3
        for _, payload := range content.Payloads {
            Queue <- payload
        }
        ...
    }

And then, to actually dequeue the jobs and process them, we used something similar to this:

    func StartProcessor() {
        for {
            select {
            case job := <-Queue:
                // Queue carries Payload values, so the job itself is the payload
                job.UploadToS3() // <-- STILL NOT GOOD
            }
        }
    }

Honestly, I have no idea what we were thinking. It must have been late at night, after too many Red Bulls. This approach bought us nothing: we had traded flawed concurrency for a buffered queue, which simply postponed the problem. Our synchronous processor uploaded only one payload to S3 at a time, and since the rate of incoming requests was much higher than the rate at which a single processor could upload to S3, our buffered channel quickly reached its limit and blocked the request handler from queuing any more items.
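The effect of a full buffer can be sketched in a few lines. This is an illustration only: `tryEnqueue` and `countAccepted` are hypothetical helpers, the jobs are plain integers, and the consumer is modeled as completely stuck so that the outcome is deterministic. A non-blocking send is used purely to make the full queue observable; the handler in the article uses a plain send, which would block at the same point.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send and reports whether the job fit.
func tryEnqueue(queue chan<- int, job int) bool {
	select {
	case queue <- job:
		return true
	default:
		return false // queue is full; a plain `queue <- job` would block here
	}
}

// countAccepted offers `jobs` items to a queue of the given capacity while
// nothing drains it, which models a consumer stuck on a slow upload.
func countAccepted(capacity, jobs int) int {
	queue := make(chan int, capacity)
	accepted := 0
	for i := 0; i < jobs; i++ {
		if tryEnqueue(queue, i) {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println(countAccepted(100, 1000)) // prints 100
}
```

Once the buffer is full, its size no longer matters: throughput is set entirely by how fast the consumer side drains it.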

We were simply avoiding the problem and had started the countdown to the eventual collapse of our system. Latency kept climbing at a steady rate within minutes of deploying this flawed version.



A better solution


We decided to use a common pattern for working with Go channels to create a two-tier channel system: one channel for queuing jobs, and another to control how many workers operate on the job queue at the same time.

The idea was to parallelize the uploads to S3 at a sustainable rate, one that would neither overload the machine nor start producing connection errors from S3. So we opted for a Job/Worker pattern. For those familiar with Java, C#, etc., think of this as the Go way of implementing a worker thread pool, using channels instead.

    var (
        MaxWorker = os.Getenv("MAX_WORKERS")
        MaxQueue  = os.Getenv("MAX_QUEUE")
    )

    // Job represents the job to be run
    type Job struct {
        Payload Payload
    }

    // A buffered channel that we can send work requests on.
    var JobQueue chan Job

    // Worker represents the worker that executes the job
    type Worker struct {
        WorkerPool chan chan Job
        JobChannel chan Job
        quit       chan bool
    }

    func NewWorker(workerPool chan chan Job) Worker {
        return Worker{
            WorkerPool: workerPool,
            JobChannel: make(chan Job),
            quit:       make(chan bool)}
    }

    // Start method starts the run loop for the worker, listening for a quit channel in
    // case we need to stop it
    func (w Worker) Start() {
        go func() {
            for {
                // register the current worker into the worker queue.
                w.WorkerPool <- w.JobChannel

                select {
                case job := <-w.JobChannel:
                    // we have received a work request.
                    if err := job.Payload.UploadToS3(); err != nil {
                        log.Errorf("Error uploading to S3: %s", err.Error())
                    }

                case <-w.quit:
                    // we have received a signal to stop
                    return
                }
            }
        }()
    }

    // Stop signals the worker to stop listening for work requests.
    func (w Worker) Stop() {
        go func() {
            w.quit <- true
        }()
    }

We changed our request handler to create a Job object with the data, and send it to the JobQueue channel to be picked up by task handlers.

    func payloadHandler(w http.ResponseWriter, r *http.Request) {

        if r.Method != "POST" {
            w.WriteHeader(http.StatusMethodNotAllowed)
            return
        }

        // Read the body into a string for json decoding
        var content = &PayloadCollection{}
        err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
        if err != nil {
            w.Header().Set("Content-Type", "application/json; charset=UTF-8")
            w.WriteHeader(http.StatusBadRequest)
            return
        }

        // Go through each payload and queue items individually to be posted to S3
        for _, payload := range content.Payloads {

            // let's create a job with the payload
            work := Job{Payload: payload}

            // Push the work onto the queue.
            JobQueue <- work
        }

        w.WriteHeader(http.StatusOK)
    }

During server initialization we create a Dispatcher and call Run() to create the pool of workers and start listening for jobs arriving on the JobQueue.

    dispatcher := NewDispatcher(MaxWorker)
    dispatcher.Run()

Below is our implementation code for the dispatcher:

    type Dispatcher struct {
        // A pool of workers channels that are registered with the dispatcher
        WorkerPool chan chan Job
        // number of workers to start in Run
        maxWorkers int
    }

    func NewDispatcher(maxWorkers int) *Dispatcher {
        pool := make(chan chan Job, maxWorkers)
        return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers}
    }

    func (d *Dispatcher) Run() {
        // starting n number of workers
        for i := 0; i < d.maxWorkers; i++ {
            worker := NewWorker(d.WorkerPool)
            worker.Start()
        }

        go d.dispatch()
    }

    func (d *Dispatcher) dispatch() {
        for {
            select {
            case job := <-JobQueue:
                // a job request has been received
                go func(job Job) {
                    // try to obtain a worker job channel that is available.
                    // this will block until a worker is idle
                    jobChannel := <-d.WorkerPool

                    // dispatch the job to the worker job channel
                    jobChannel <- job
                }(job)
            }
        }
    }
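For readers who want to run the pattern end to end, the sketch below condenses the two tiers into one self-contained program. It is an illustration under stated assumptions, not the production code: the S3 upload is replaced by a counter increment, `runPool` and the `Job.ID` field are hypothetical, a shared `quit` channel is added so the goroutines exit, and the dispatcher hands jobs to workers inline rather than in a goroutine per job.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Job carries a hypothetical ID in place of the article's Payload.
type Job struct{ ID int }

// runPool pushes `jobs` jobs through `workers` workers and returns the number
// of jobs processed and the highest number that were being processed at once.
func runPool(workers, jobs int) (int64, int64) {
	jobQueue := make(chan Job, jobs)           // tier 1: queued jobs
	workerPool := make(chan chan Job, workers) // tier 2: channels of idle workers
	quit := make(chan struct{})
	var processed, running, peak int64
	var wg sync.WaitGroup
	wg.Add(jobs)

	for i := 0; i < workers; i++ {
		jobChannel := make(chan Job)
		go func() {
			for {
				workerPool <- jobChannel // register as idle; capacity == workers, so this never blocks
				select {
				case <-jobChannel:
					n := atomic.AddInt64(&running, 1)
					for { // remember the highest concurrency observed
						p := atomic.LoadInt64(&peak)
						if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
							break
						}
					}
					atomic.AddInt64(&processed, 1) // stand-in for the S3 upload
					atomic.AddInt64(&running, -1)
					wg.Done()
				case <-quit:
					return
				}
			}
		}()
	}

	// Dispatcher: hand each queued job to the next idle worker.
	go func() {
		for {
			select {
			case job := <-jobQueue:
				jobChannel := <-workerPool // blocks until a worker is idle
				jobChannel <- job
			case <-quit:
				return
			}
		}
	}()

	for i := 0; i < jobs; i++ {
		jobQueue <- Job{ID: i}
	}
	wg.Wait()
	close(quit)
	return atomic.LoadInt64(&processed), atomic.LoadInt64(&peak)
}

func main() {
	processed, peak := runPool(4, 1000)
	fmt.Println(processed, peak <= 4) // prints "1000 true"
}
```

Whatever the number of queued jobs, the work in progress never exceeds the number of workers, which is the property the naive goroutine-per-payload version lacked.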

Notice that we specify the number of workers to be started and added to the pool. Since we used Amazon Elasticbeanstalk with its Go environment for this project, and we always try to follow the twelve-factor methodology when configuring our production systems, we read these values from environment variables. That way we can control the number of workers and the maximum queue size, and tune these parameters quickly without redeploying the whole cluster.

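    // os.Getenv returns strings; convert them to int (for example with
    // strconv.Atoi) before passing them to NewDispatcher or make
    var (
        MaxWorker = os.Getenv("MAX_WORKERS")
        MaxQueue  = os.Getenv("MAX_QUEUE")
    )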

Immediate results


Immediately after we deployed the final solution, we saw our latency drop to insignificant numbers and our capacity to handle requests rise dramatically.



A few minutes after our Elastic Load Balancers had warmed up, we saw our ElasticBeanstalk application handling about 1 million requests per minute. We usually have a few hours in the morning when traffic peaks at over 1 million requests per minute.

As soon as we deployed the new code, the number of servers we needed dropped considerably, from 100 to about 20.



After we had properly configured our cluster and the auto-scaling settings, we were able to lower the count even further, to just 4 EC2 c4.large instances, with Elastic Auto-Scaling set to launch a new instance if CPU usage stays above 90% for 5 minutes.



Conclusion


I am deeply convinced that simplicity always wins. We could have designed a complex system with many queues, background workers and complicated deployments, but instead we decided to rely on the power of ElasticBeanstalk auto-scaling and on the efficient, simple approach to concurrency that Golang gives us out of the box.

It is not every day that you see a cluster of only 4 machines, each less powerful than my current MacBook Pro, handling POST requests and writing to an Amazon S3 bucket 1 million times every minute.

There is always a right tool for the job. And for those cases where your Ruby on Rails system needs a more powerful web handler, consider stepping a little outside the Ruby ecosystem for simpler, more powerful solutions.

Source: https://habr.com/ru/post/262045/

