
A load tester in Go, version 2

I never got around to rewriting go-meter. I wanted to increase its performance, gain more control over the process, and bring it closer to wrk. Ideally, I would like to see an easily and conveniently extensible alternative. Yes, wrk recently added support for Lua scripts, which solves many inconveniences, but there are some unpleasant nuances. For example, you cannot collect extended statistics, because the statistics output methods run only in the first thread and have no access to the data collected in the other threads. So it again comes down to understanding the source code and adapting it yourself, which is not a trivial task. And so, we are building a load tester in Go, with extras. If you are interested, read on.

What we have and what we need

First, let's work out what we need:
- sending GET / POST / PUT / DELETE requests
- iterating over a set of URLs and POST bodies
- control over open connections
- control over the number of threads
- specifying the test duration
- a limit on the maximum number of requests per second
- the ability to exclude the first few seconds from the statistics to avoid distortion when the HTTP server is warming up

Plan

- connection pool
- simple Request / Response
- statistics
- profit

Thinking out loud

If you need to control connections, the standard http.Client does not suit us (and it is overkill for this task): it does too much, and performance suffers because of it. Since we have several worker threads sending requests, we need a pool of connections that they share with each other. There is no point in a worker waiting for the server's response; that just wastes precious time. How do we measure the traffic passing through? The standard http.Request and http.Response do not expose that information, so they will not do. That means we need to implement a simple Request / Response of our own that gives us everything we need. Collecting raw data and aggregating it at the end will not work either, because memory is not unlimited. We collect the statistics on the fly.


Go


The connection pool is built on a buffered channel. It works like a simple object pool: take an object from the channel, use it, put it back.
    type Connection struct {
        conn    net.Conn
        manager *ConnectionManager
    }

    type ConnectionManager struct {
        conns  chan *Connection
        config *Config
    }

    func NewConnectionManager(config *Config) (result *ConnectionManager) {
        result = &ConnectionManager{config: config, conns: make(chan *Connection, config.Connections)}
        for i := 0; i < config.Connections; i++ {
            connection := &Connection{manager: result}
            if connection.Dial() != nil {
                ConnectionErrors++
            }
            result.conns <- connection
        }
        return
    }

    func (this *ConnectionManager) Get() *Connection {
        return <-this.conns
    }

    func (this *Connection) Dial() error {
        if this.IsConnected() {
            this.Disconnect()
        }
        conn, err := net.Dial("tcp4", this.manager.config.Url.Host)
        if err == nil {
            this.conn = conn
        }
        return err
    }

    func (this *Connection) Disconnect() {
        this.conn.Close()
        this.conn = nil
    }

    func (this *Connection) IsConnected() bool {
        return this.conn != nil
    }

    func (this *Connection) Return() {
        this.manager.conns <- this
    }
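To see the channel-as-pool idea in isolation, here is a minimal sketch without any networking. The `resource` and `pool` types and their methods are hypothetical names for illustration only; they are not part of go-meter.

```go
package main

import "fmt"

// resource stands in for a network connection (hypothetical type).
type resource struct{ id int }

// pool hands out resources through a buffered channel.
type pool struct{ items chan *resource }

// newPool pre-fills the channel with size resources.
func newPool(size int) *pool {
	p := &pool{items: make(chan *resource, size)}
	for i := 0; i < size; i++ {
		p.items <- &resource{id: i}
	}
	return p
}

// get blocks until a resource is free.
func (p *pool) get() *resource { return <-p.items }

// put returns a resource to the pool.
func (p *pool) put(r *resource) { p.items <- r }

func main() {
	p := newPool(2)
	a, b := p.get(), p.get()
	fmt.Println("free after two gets:", len(p.items))
	p.put(a)
	p.put(b)
	fmt.Println("free after returning both:", len(p.items))
}
```

Because the channel's capacity equals the pool size, `put` never blocks as long as every resource that is returned was first taken from the same pool.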

Request / Response: here you can read the Go source, see how it is implemented there, and write a simplified analogue. The main difference is that ours can report the traffic volume of each request and response, and it saves valuable time.
Request
    type Request struct {
        Method        string
        URL           *url.URL
        Header        map[string][]string
        Body          io.Reader
        ContentLength int64
        Host          string
        BufferSize    int64
    }

    func (req *Request) Write(w io.Writer) error {
        bw := &bytes.Buffer{}
        fmt.Fprintf(bw, "%s %s HTTP/1.1\r\n", valueOrDefault(req.Method, "GET"), req.URL.RequestURI())
        fmt.Fprintf(bw, "Host: %s\r\n", req.Host)

        userAgent := ""
        if req.Header != nil {
            if ua := req.Header["User-Agent"]; len(ua) > 0 {
                userAgent = ua[0]
            }
        }
        if userAgent != "" {
            fmt.Fprintf(bw, "User-Agent: %s\r\n", userAgent)
        }
        if req.Method == "POST" || req.Method == "PUT" {
            fmt.Fprintf(bw, "Content-Length: %d\r\n", req.ContentLength)
        }
        if req.Header != nil {
            for key, values := range req.Header {
                if key == "User-Agent" || key == "Content-Length" || key == "Host" {
                    continue
                }
                for _, value := range values {
                    fmt.Fprintf(bw, "%s: %s\r\n", key, value)
                }
            }
        }
        io.WriteString(bw, "\r\n")

        if req.Method == "POST" || req.Method == "PUT" {
            bodyReader := bufio.NewReader(req.Body)
            _, err := bodyReader.WriteTo(bw)
            if err != nil {
                return err
            }
        }

        req.BufferSize = int64(bw.Len())
        _, err := bw.WriteTo(w)
        return err
    }
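The trick that makes the outgoing traffic measurable is to serialize the whole request into a buffer first and take its length before writing it to the socket. A stripped-down sketch of that idea follows; `buildRequest` is a hypothetical helper, not a function from go-meter, and it handles only body-less requests.

```go
package main

import (
	"bytes"
	"fmt"
)

// buildRequest serializes a minimal body-less HTTP/1.1 request
// (hypothetical helper for illustration).
func buildRequest(method, host, uri string) []byte {
	var buf bytes.Buffer
	fmt.Fprintf(&buf, "%s %s HTTP/1.1\r\n", method, uri)
	fmt.Fprintf(&buf, "Host: %s\r\n", host)
	buf.WriteString("\r\n") // blank line ends the header block
	return buf.Bytes()
}

func main() {
	raw := buildRequest("GET", "localhost:3001", "/index.html")
	fmt.Printf("%q\n", raw)
	// The length of the buffer is exactly what goes over the wire.
	fmt.Println("bytes on the wire:", len(raw))
}
```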


Response
    type Response struct {
        Status        string
        StatusCode    int
        Header        map[string][]string
        ContentLength int64
        BufferSize    int64
    }

    func ReadResponse(r *bufio.Reader) (*Response, error) {
        tp := textproto.NewReader(r)
        resp := &Response{}

        // Status line, e.g. "HTTP/1.1 200 OK"
        line, err := tp.ReadLine()
        if err != nil {
            return nil, err
        }
        f := strings.SplitN(line, " ", 3)
        // Count the bytes of the line plus the stripped CRLF
        resp.BufferSize += int64(len(line) + 2)
        if len(f) < 2 {
            return nil, errors.New("Response Header ERROR")
        }
        reasonPhrase := ""
        if len(f) > 2 {
            reasonPhrase = f[2]
        }
        resp.Status = f[1] + " " + reasonPhrase
        resp.StatusCode, err = strconv.Atoi(f[1])
        if err != nil {
            return nil, errors.New("malformed HTTP status code")
        }

        // Headers, up to the first empty line
        resp.Header = make(map[string][]string)
        for {
            line, err := tp.ReadLine()
            if err != nil {
                return nil, errors.New("Response Header ERROR")
            }
            resp.BufferSize += int64(len(line) + 2)
            if len(line) == 0 {
                break
            }
            f := strings.SplitN(line, ":", 2)
            if len(f) < 2 {
                return nil, errors.New("Response Header ERROR")
            }
            // Use the same trimmed key for lookup and for storage
            key := strings.TrimSpace(f[0])
            resp.Header[key] = append(resp.Header[key], strings.TrimSpace(f[1]))
        }

        if cl := resp.Header["Content-Length"]; len(cl) > 0 {
            i, err := strconv.ParseInt(cl[0], 10, 0)
            if err == nil {
                resp.ContentLength = i
            }
        }

        // Read the whole body; a single Read may return fewer bytes than requested
        buff := make([]byte, resp.ContentLength)
        if _, err := io.ReadFull(r, buff); err != nil {
            return nil, err
        }
        resp.BufferSize += int64(resp.ContentLength)
        return resp, nil
    }


So that our threads shut down when the test time is over, let's create one channel for stopping the threads and another through which each thread reports that it has finished its work cleanly.
    WorkerQuit := make(chan bool, *_threads)
    WorkerQuited := make(chan bool, *_threads)

We watch the timer and also wait for Ctrl+C (SIGINT) or SIGTERM, so that the application can finish the test at any moment.
    // Start listening for Ctrl+C
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)

    // Wait for the timer or a signal
    select {
    case <-time.After(config.Duration):
    case <-signalChan:
    }

    // Ask every thread to stop
    for i := 0; i < config.Threads; i++ {
        config.WorkerQuit <- true
    }

    // Wait for the threads to complete
    for i := 0; i < config.Threads; i++ {
        <-config.WorkerQuited
    }

Now let's look at the worker itself. To limit the number of requests per second, each worker takes its share of the total limit. Four times per second we reset its counter, and in between the worker waits for either a connection to be released or a shutdown signal.
    func NewThread(config *Config) {
        timerAllow := time.NewTicker(time.Duration(250) * time.Millisecond)
        allow := int32(config.MRQ / 4 / config.Threads)
        if config.MRQ == -1 {
            allow = 2147483647
        } else if allow <= 0 {
            allow = 1
        }
        var connectionErrors int32 = 0
        currentAllow := allow
        for {
            select {
            // Timer tick: reset the allowance for the next quarter of a second
            case <-timerAllow.C:
                currentAllow = allow
            // A connection has been released
            case connection := <-config.ConnectionManager.conns:
                currentAllow--
                // Limit exhausted: put the connection back
                if currentAllow < 0 {
                    connection.Return()
                } else {
                    // Build the next request
                    req := getRequest(config.Method, config.Url, config.Source.GetNext())
                    // Drop the connection if we reconnect on every request
                    if config.Reconnect && connection.IsConnected() {
                        connection.Disconnect()
                    }
                    // Not connected: try to connect
                    if !connection.IsConnected() {
                        if connection.Dial() != nil {
                            connectionErrors++
                        }
                    }
                    // Connected: send the request, otherwise return the connection
                    if connection.IsConnected() {
                        go writeSocket(connection, req, config.RequestStats)
                    } else {
                        connection.Return()
                    }
                }
            // Shutdown requested
            case <-config.WorkerQuit:
                // Publish the connection error count
                atomic.AddInt32(&ConnectionErrors, connectionErrors)
                // Confirm completion
                config.WorkerQuited <- true
                return
            }
        }
    }

As soon as a connection is released, we build the next request and start sending it asynchronously, and so on in a loop until the time runs out. After the request has been sent and the response read, the connection is returned to the pool, and a worker picks it up again.
Sending the request
    // Note: http here has to be the simplified Request / Response package
    // from above (net/http types have no BufferSize field).
    func writeSocket(connection *Connection, req *http.Request, read chan *RequestStats) {
        result := &RequestStats{}
        // Always return the connection to the pool and report the result
        defer func() {
            connection.Return()
            read <- result
        }()

        now := time.Now()
        conn := connection.conn
        bw := bufio.NewWriter(conn)
        // Write the request
        err := req.Write(bw)
        if err != nil {
            result.WriteError = err
            return
        }
        err = bw.Flush()
        if err != nil {
            result.WriteError = err
            return
        }
        // Read the response
        res, err := http.ReadResponse(bufio.NewReader(conn))
        if err != nil {
            result.ReadError = err
            return
        }
        // Record the measurements
        result.Duration = time.Now().Sub(now)
        result.NetOut = req.BufferSize
        result.NetIn = res.BufferSize
        result.ResponseCode = res.StatusCode
        req.Body = nil
    }


Only a small matter remains: collecting statistics from the RequestStats objects and formatting them.
    // Aggregated statistics for the whole run
    type StatsSource struct {
        Readed          int64
        Writed          int64
        Requests        int
        Skiped          int
        Min             time.Duration
        Max             time.Duration
        Sum             int64
        Codes           map[int]int
        DurationPercent map[time.Duration]int
        ReadErrors      int
        WriteErrors     int
        Work            time.Duration
    }

    // Statistics for the current second
    type StatsSourcePerSecond struct {
        Readed   int64
        Writed   int64
        Requests int
        Skiped   int
        Sum      int64
    }

    // Statistics aggregator
    func StartStatsAggregator(config *Config) {
        allowStore := true
        allowStoreTime := time.After(config.ExcludeSeconds)
        if config.ExcludeSeconds.Seconds() > 0 {
            allowStore = false
        }

        verboseTimer := time.NewTicker(time.Duration(1) * time.Second)
        if config.Verbose {
            fmt.Printf("%s %s %s %s %s %s\n",
                newSpancesFormatRightf("Second", 10, "%s"),
                newSpancesFormatRightf("Total", 10, "%s"),
                newSpancesFormatRightf("Req/sec", 10, "%s"),
                newSpancesFormatRightf("Avg/sec", 10, "%s"),
                newSpancesFormatRightf("In/sec", 10, "%s"),
                newSpancesFormatRightf("Out/sec", 10, "%s"),
            )
        } else {
            verboseTimer.Stop()
        }

        source = StatsSource{
            Codes:           make(map[int]int),
            DurationPercent: make(map[time.Duration]int),
        }
        perSecond := StatsSourcePerSecond{}
        start := time.Now()

        for {
            select {
            // Once a second: print the per-second line
            case <-verboseTimer.C:
                if perSecond.Requests-perSecond.Skiped > 0 && config.Verbose {
                    // Average latency over the last second
                    avgMilliseconds := perSecond.Sum / int64(perSecond.Requests-perSecond.Skiped)
                    avg := time.Duration(avgMilliseconds) * time.Millisecond
                    // Print the line
                    fmt.Printf("%s %s %s %s %s %s\n",
                        newSpancesFormatRightf(roundToSecondDuration(time.Now().Sub(start)), 10, "%v"),
                        newSpancesFormatRightf(source.Requests, 10, "%d"),
                        newSpancesFormatRightf(perSecond.Requests, 10, "%d"),
                        newSpancesFormatRightf(avg, 10, "%v"),
                        newSpancesFormatRightf(Bites(perSecond.Readed), 10, "%s"),
                        newSpancesFormatRightf(Bites(perSecond.Writed), 10, "%s"),
                    )
                }
                // Reset the per-second counters
                perSecond = StatsSourcePerSecond{}
            // The warm-up period is over: start storing latencies
            case <-allowStoreTime:
                allowStore = true
            // A request has completed
            case res := <-config.RequestStats:
                // On an error, count only the error and skip the rest
                if res.ReadError != nil {
                    source.ReadErrors++
                    continue
                } else if res.WriteError != nil {
                    source.WriteErrors++
                    continue
                }
                // Count requests and traffic
                source.Requests++
                perSecond.Requests++
                perSecond.Readed += res.NetIn
                perSecond.Writed += res.NetOut
                source.Readed += res.NetIn
                source.Writed += res.NetOut
                // Count HTTP status codes
                source.Codes[res.ResponseCode]++
                if !allowStore {
                    perSecond.Skiped++
                    source.Skiped++
                    continue
                }
                // Latency sum, in milliseconds
                sum := int64(res.Duration.Seconds() * 1000)
                source.Sum += sum
                perSecond.Sum += sum
                // Track minimum and maximum. Min starts at zero, so it is
                // seeded from the first stored sample.
                if source.Requests-source.Skiped == 1 || source.Min > res.Duration {
                    source.Min = roundDuration(res.Duration)
                }
                if source.Max < res.Duration {
                    source.Max = roundDuration(res.Duration)
                }
                // Latency histogram in buckets of 10 ms
                duration := time.Duration(res.Duration.Nanoseconds()/10000000) * time.Millisecond * 10
                source.DurationPercent[duration]++
            // Shutdown requested
            case <-config.StatsQuit:
                // Total running time
                source.Work = time.Duration(time.Now().Sub(start).Seconds()*1000) * time.Millisecond
                if config.Verbose {
                    fmt.Println(strings.Repeat("-", 61))
                }
                // Confirm completion
                config.StatsQuit <- true
                return
            }
        }
    }

Summing up

I will skip how the launch arguments are parsed and how the statistics output is formatted, since that is not interesting. Now let's check what we got. As a baseline, we run wrk against a Node.js cluster:
    % ./wrk -c 21 -t 7 -d 30s -L http://localhost:3001/index.html
    Running 30s test @ http://localhost:3001/index.html
      7 threads and 21 connections
      Thread Stats   Avg      Stdev     Max   +/- Stdev
        Latency     1.09ms    6.55ms 152.07ms   99.63%
        Req/Sec     5.20k     3.08k   14.33k    58.75%
      Latency Distribution
         50%  490.00us
         75%    0.89ms
         90%    1.83ms
         99%    5.04ms
      1031636 requests in 30.00s, 153.48MB read
    Requests/sec:  34388.25
    Transfer/sec:      5.12MB

and the same with go-meter, with GOMAXPROCS=1:
    % ./go-meter -t 7 -c 21 -d 30s -u http://localhost:3001/index.html
    Running test threads: 7, connections: 21 in 30s GET http://localhost:3001/index.html
    Stats:       Min    Avg    Max
      Latency    0      0      83ms
      843183 requests in 30s, net: in 103MB, out 62MB
    HTTP Codes:
      200          100.00%
    Latency:
      0            99.99%
      10ms - 80ms  0.01%
    Requests: 28106.10/sec
    Net In: 27MBit/sec
    Net Out: 17MBit/sec
    Transfer: 5.5MB/sec

We get 28106 requests per second against 34388, which is about 20% less than pure C + event loop + nio. Not bad. Changing GOMAXPROCS makes almost no difference, since Node.js takes up most of the CPU time.
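The two headline numbers can be cross-checked from the raw output. The short sketch below recomputes the go-meter request rate from the request count and duration, and the relative gap to wrk; `relativeDrop` is a hypothetical helper, and the figures are copied from the two runs shown above.

```go
package main

import "fmt"

// relativeDrop returns how much lower got is than base, as a fraction of base.
func relativeDrop(base, got float64) float64 {
	return (base - got) / base
}

func main() {
	// 843183 requests in 30 s, as reported by go-meter.
	fmt.Printf("go-meter rate: %.2f req/s\n", 843183.0/30)
	// wrk: 34388.25 req/s, go-meter: 28106.10 req/s.
	fmt.Printf("drop vs wrk: %.1f%%\n", 100*relativeDrop(34388.25, 28106.10))
}
```

The exact gap comes out a little over 18%, which the text rounds up to 20%.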
Drawbacks:
- a loss of about 20% in performance; simplifying Request / Response further might win a little of it back
- no HTTPS support yet
- custom HTTP headers and a timeout cannot be specified yet

All the source code is on GitHub.

How to use
    % go get github.com/a696385/go-meter
    % $GOPATH/bin/go-meter -h


Thanks for your attention!

Source: https://habr.com/ru/post/203328/

