Here is a translation of Gary Willoughby's article “Interesting ways of using Go channels”. It is intended for readers who already know a little Go.
Interesting ways to use Go channels
I wrote this post to document a talk about Go channels that John Graham-Cumming gave at the GopherCon 2014 conference. The talk was called “A Channel Compendium” and is available to watch on youtube.com.
Throughout the talk we are shown interesting ways to use Go channels, along with the possibilities and advantages of concurrent programming. Personally, the talk opened my eyes to several new ways of structuring programs and new techniques for synchronizing work across several processor cores.
The following examples demonstrate different techniques for using channels in Go. The code has been deliberately simplified to make it easier to understand. You should not use it in production; for example, all error handling is missing.
Signals
Waiting for an event
In this example a goroutine starts, does some work (here it simply waits 5 seconds), and then closes the channel. A receive on an unbuffered channel blocks the current goroutine until a value arrives. Closing the channel tells the waiting goroutine that it can continue, because no more data will ever be sent. A receive on a closed channel never blocks.
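package main

import (
	"fmt"
	"time"
)

func main() {
	c := make(chan bool)

	go func() {
		// ... do some work
		time.Sleep(5 * time.Second)
		close(c)
	}()

	// Block until the channel is closed.
	<-c
	fmt.Println("done")
}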
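To make the last point concrete, here is a small sketch showing what a receive on a closed channel actually returns. The helper name drainClosed is made up for this illustration and does not appear in the talk.

```go
package main

import "fmt"

// drainClosed (a hypothetical helper) closes a channel and then receives
// from it n times. Every receive returns immediately with the zero value
// of the element type and ok == false.
func drainClosed(n int) (values []bool, oks []bool) {
	c := make(chan bool)
	close(c)
	for i := 0; i < n; i++ {
		v, ok := <-c
		values = append(values, v)
		oks = append(oks, ok)
	}
	return values, oks
}

func main() {
	values, oks := drainClosed(3)
	fmt.Println(values, oks) // [false false false] [false false false]
}
```

The second result of the receive, ok, is how a reader tells a real false value from the signal that the channel has been closed.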
Coordinating several goroutines
In this example a hundred goroutines are started, each waiting for a value on the start channel (or for the channel to be closed). When the channel is closed, all of the goroutines start working at once.
package main

func worker(start chan bool) {
	<-start // wait until start is closed (or a value is sent)
	// ... do the actual work
}
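The listing above shows only the worker. A minimal complete sketch of the same idea might look like the following. The function runWorkers, the sync.WaitGroup, and the counter are additions for this illustration and are not part of the original talk; the channel carries struct{} rather than bool because no value is ever sent on it.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers (a hypothetical helper) starts n goroutines that all wait on
// the start channel, releases them together by closing it, and returns
// how many of them ran.
func runWorkers(n int) int {
	start := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	ran := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-start // wait here until start is closed
			mu.Lock()
			ran++
			mu.Unlock()
		}()
	}

	close(start) // a single close releases every waiting worker
	wg.Wait()    // wait until all workers have finished
	return ran
}

func main() {
	fmt.Println(runWorkers(100)) // 100
}
```

Closing the channel works as a broadcast: one close wakes every receiver, whereas a send would wake only one of them.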
Coordinated termination of workers
In this example a hundred goroutines are launched, each waiting for a value on the die channel (or for the channel to be closed). When the channel is closed, all of the goroutines stop.
package main

func worker(die chan bool) {
	for {
		select {
		// ... do the actual work in other cases
		case <-die:
			return
		}
	}
}

func main() {
	die := make(chan bool)

	for i := 0; i < 100; i++ {
		go worker(die)
	}

	// Stop all workers.
	close(die)
}
Verifying that a worker has stopped
In this example a goroutine is launched that waits for a value on the die channel (or for the channel to be closed). When the signal arrives, the goroutine performs its final cleanup and then sends a signal back to the main function, through the same die channel, to report that it has finished.
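package main

func worker(die chan bool) {
	for {
		select {
		// ... do the actual work in other cases
		case <-die:
			// ... do the cleanup work
			die <- true
			return
		}
	}
}

func main() {
	die := make(chan bool)
	go worker(die)

	// Ask the worker to stop.
	die <- true

	// Wait until the worker has finished its cleanup.
	<-die
}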
Encapsulating state
Unique ID service
In this example a goroutine generates unique hexadecimal IDs. Each ID is sent on the id channel, and the goroutine blocks until the value is read. Each time the channel is read, the goroutine increments the counter and sends the next value.
package main

import "fmt"

func main() {
	id := make(chan string)

	go func() {
		var counter int64 = 1
		for {
			id <- fmt.Sprintf("%x", counter)
			counter++
		}
	}()

	fmt.Printf("%s\n", <-id) // 1
	fmt.Printf("%s\n", <-id) // 2
}
Memory reuse
In this example a goroutine recycles memory buffers. Old buffers arrive on the give channel and are stored in a list, while the get channel hands stored buffers out for reuse. If the list is empty, a new buffer is created.
Translator's note: Simply put, we reuse memory aggressively to avoid allocating it again (as we know, asking the OS for memory can be slow). The list always holds at least one buffer, and buffers that are no longer needed are sent back to the same list.
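package main

import "container/list"

func main() {
	give := make(chan []byte)
	get := make(chan []byte)

	go func() {
		q := new(list.List)
		for {
			// Make sure there is always at least one buffer to hand out.
			if q.Len() == 0 {
				q.PushFront(make([]byte, 100))
			}
			e := q.Front()
			select {
			case s := <-give:
				q.PushFront(s)
			case get <- e.Value.([]byte):
				q.Remove(e)
			}
		}
	}()

	buffer := <-get // take a buffer from the recycler
	give <- buffer  // hand it back
	buffer = <-get  // receive the recycled buffer again
}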
Limited memory reuse
In this example a buffered channel serves as the buffer store. The channel is configured to hold five buffers at any one time, which means that a send does not block the current goroutine as long as the channel has room for another entry.
The select statements give non-blocking access to the channel. The first select creates a new buffer if none can be taken from the store. The second select does nothing in its default case when the store is full and the buffer cannot be put back, which leaves that buffer to the garbage collector.
Translator's note: Let me add a remark of my own here. We simply create a buffered channel that limits the number of buffers we want to reuse. If an extra buffer comes back, we ignore it and let the garbage collector do its work. Both functions are non-blocking: neither of them waits for another goroutine.
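package main

// get returns a buffer from the store, or a new one if the store is empty.
func get(store chan []byte) []byte {
	select {
	case b := <-store:
		return b
	default:
		return make([]byte, 100)
	}
}

// give puts b back into the store, or drops it if the store is full.
func give(store chan []byte, b []byte) {
	select {
	case store <- b:
	default:
		return
	}
}

func main() {
	store := make(chan []byte, 5) // holds at most five buffers

	buffer := get(store) // the store is empty, so a new buffer is allocated
	give(store, buffer)  // put it back
	buffer = get(store)  // this time the recycled buffer is returned
}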
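To see the limit in action, the sketch below hands eight buffers to a store with room for five. It is only an illustration: here give returns a bool that reports whether the buffer was kept, which the version in the article does not do.

```go
package main

import "fmt"

// get returns a pooled buffer if one is available, otherwise a new one.
func get(store chan []byte) []byte {
	select {
	case b := <-store:
		return b
	default:
		return make([]byte, 100)
	}
}

// give tries to put b back into the store. The bool result is an addition
// for this sketch: it is false when the store is full and b is dropped.
func give(store chan []byte, b []byte) bool {
	select {
	case store <- b:
		return true
	default:
		return false // left for the garbage collector
	}
}

func main() {
	store := make(chan []byte, 5)
	kept := 0
	for i := 0; i < 8; i++ {
		if give(store, make([]byte, 100)) {
			kept++
		}
	}
	fmt.Println(kept, len(store)) // 5 5
}
```

Neither function ever blocks, so the pool cannot stall its callers; the price is that the store may drop buffers under load.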
Nil channels
Disabling receive in a case statement
In this example a goroutine is started and select is used to receive messages from two channels. When a channel is closed, the variable holding it is set to nil. Since operations on a nil channel block forever, that case is never selected again. Once both channels are nil, we exit the goroutine because it can no longer receive anything.
Translator's note: In the example the second value printed is false, because after c1 is closed the receive yields x = false and ok = false. If we did not set c1 to nil, the infinite loop would keep receiving x = false, ok = false forever.
An important point: once a channel is closed, its case is always ready and yields the zero value. That is why you should set the closed channel's variable to nil. After that, do not forget to check whether all the channels are nil, otherwise the goroutine will block forever.
package main

import "fmt"

func main() {
	c1 := make(chan bool)
	c2 := make(chan bool)

	go func() {
		for {
			select {
			case x, ok := <-c1:
				if !ok {
					c1 = nil // disable this case
				}
				fmt.Println(x)
			case x, ok := <-c2:
				if !ok {
					c2 = nil // disable this case
				}
				fmt.Println(x)
			}
			if c1 == nil && c2 == nil {
				return
			}
		}
	}()

	c1 <- true
	close(c1)

	c2 <- true
	close(c2)
}
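The same technique is easier to check when it is wrapped in a function that returns what it received. The sketch below is a variation written for this purpose, not code from the talk: merge and its int channels are assumptions, and unlike the listing above it skips the zero value that a closed channel yields instead of printing it.

```go
package main

import "fmt"

// merge (a hypothetical helper) reads from a and b until both are closed
// and returns every value it received.
func merge(a, b <-chan int) []int {
	var out []int
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // a nil channel blocks forever, so this case is disabled
				continue
			}
			out = append(out, v)
		case v, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			out = append(out, v)
		}
	}
	return out
}

func main() {
	a := make(chan int, 2)
	b := make(chan int, 1)
	a <- 1
	a <- 2
	b <- 3
	close(a)
	close(b)
	fmt.Println(len(merge(a, b))) // 3
}
```

The loop condition doubles as the nil check the translator recommends: as soon as both variables are nil, the function returns instead of blocking in select.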
Disabling the sending of messages in the case statement
In this example a goroutine is launched that generates random numbers and sends them on channel c. When a message arrives on channel d, the goroutine sets its copy of the channel to nil, which disables the corresponding case statement. From then on the goroutine never sends a random number again.
Translator's note: A check for nil would not hurt here, otherwise the goroutine never finishes. It stays blocked in select, because the first case can never run again and the second no longer serves any purpose.
The comment in the original listing is also wrong: c in the second-to-last line is not nil, because nil was assigned only to the local variable src. The deadlock occurs because nobody sends on channel c anymore.
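package main

import (
	"fmt"
	"math/rand"
)

func main() {
	c := make(chan int)
	d := make(chan bool)

	go func(src chan int) {
		for {
			select {
			case src <- rand.Intn(100):
			case <-d:
				src = nil // disable the send case
			}
		}
	}(c)

	// Print some random numbers.
	fmt.Printf("%d\n", <-c)
	fmt.Printf("%d\n", <-c)

	// Stop generating random numbers.
	d <- true

	// This receive deadlocks: nothing sends on c anymore.
	fmt.Printf("%d\n", <-c)
}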
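One way to avoid the stuck goroutine the translator describes is to let the generator return when it is told to stop and to close its output channel on the way out. This is a sketch under my own assumptions, not the approach shown in the talk: generate, the stop channel, and the use of struct{} are all illustrative choices.

```go
package main

import (
	"fmt"
	"math/rand"
)

// generate (a hypothetical helper) sends random numbers on out until stop
// is closed, then closes out so that readers see the end of the stream.
func generate(out chan<- int, stop <-chan struct{}) {
	defer close(out)
	for {
		select {
		case out <- rand.Intn(100):
		case <-stop:
			return
		}
	}
}

func main() {
	out := make(chan int)
	stop := make(chan struct{})
	go generate(out, stop)

	fmt.Println(<-out)
	fmt.Println(<-out)

	close(stop)
	// Drain: a few more values may arrive before the generator picks the
	// stop case, because select chooses at random among ready cases.
	for range out {
	}
	fmt.Println("generator stopped")
}
```

Because out is closed when the generator exits, a later receive returns immediately instead of deadlocking.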
Timers
Timeout
In this example a goroutine is launched to do some work. A timeout channel is created so that one case of the select fires if the select waits too long. Here the goroutine ends after 30 seconds of waiting. The timeout is created anew on every iteration of the loop, so each pass through select starts with a fresh timer.
Translator's note: There is a small misprint: the code uses 5 seconds, while the article says 30.
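package main

import "time"

func worker() {
	for {
		timeout := time.After(5 * time.Second)
		select {
		// ... cases that do the actual work
		case <-timeout:
			// Nothing happened in time: stop this goroutine.
			return
		}
	}
}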
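The same pattern can be tried out quickly with a short timeout. In the sketch below, recvOrTimeout and the 50 millisecond duration are assumptions chosen for the demonstration; only time.After and select come from the technique itself.

```go
package main

import (
	"fmt"
	"time"
)

// recvOrTimeout (a hypothetical helper) waits for a value on c and gives
// up after d. The bool result reports whether a value was received.
func recvOrTimeout(c <-chan string, d time.Duration) (string, bool) {
	select {
	case v := <-c:
		return v, true
	case <-time.After(d):
		return "", false
	}
}

func main() {
	c := make(chan string, 1)
	c <- "ready"

	fmt.Println(recvOrTimeout(c, 50*time.Millisecond)) // ready true

	// The channel is empty now, so this call returns an empty string and
	// false once the timeout has elapsed.
	fmt.Println(recvOrTimeout(c, 50*time.Millisecond))
}
```

Each call creates its own timer, which matches the article's point that the timeout starts afresh for every select.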
Heartbeat
In this example a goroutine is started to do some work. The heartbeat channel makes a case statement run at regular intervals. It is created once, outside the loop, and is not reset on each iteration, so the case always fires on schedule.
package main

import "time"

func worker() {
	heartbeat := time.Tick(30 * time.Second)
	for {
		select {
		// ... cases that do the actual work
		case <-heartbeat:
			// ... do the periodic heartbeat work
		}
	}
}
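For a worker that can also be stopped, a possible variation uses time.NewTicker, whose Stop method releases the ticker when the worker exits (before Go 1.23, a ticker created with time.Tick could not be reclaimed). The function countBeats, the done channel, and the short intervals are assumptions made for this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// countBeats (a hypothetical helper) runs until done is closed and
// returns how many heartbeats fired in the meantime.
func countBeats(interval time.Duration, done <-chan struct{}) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker when the worker exits

	beats := 0
	for {
		select {
		case <-ticker.C:
			beats++ // ... do the periodic heartbeat work
		case <-done:
			return beats
		}
	}
}

func main() {
	done := make(chan struct{})
	go func() {
		time.Sleep(55 * time.Millisecond)
		close(done)
	}()

	// Prints the number of heartbeats seen before done was closed.
	fmt.Println(countBeats(10*time.Millisecond, done))
}
```

As in the article's version, the ticker is created once outside the loop, so the interval is measured from tick to tick rather than from the end of each iteration.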
Examples
Network multiplexer
This example demonstrates a simple network multiplexer. The main goroutine creates a channel for the outgoing messages and opens a network connection. It then starts a hundred goroutines that generate strings (our messages) and send them on the channel. An infinite loop reads each message from the channel and writes it to the network connection.
The example does not actually run, because it tries to connect to a placeholder domain, but it shows how easy it is to have many concurrent goroutines send messages over a single network connection.
package main

import "net"

func worker(messages chan string) {
	for {
		var msg string // ... generate a message
		messages <- msg
	}
}
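The listing above shows only the worker. Since the original cannot run without a server to connect to, here is a sketch of the same idea that writes to any io.Writer, with a bytes.Buffer standing in for the network connection. The names multiplex and collect, the message text, and the fact that each worker sends just one message are all assumptions made so that the program terminates.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// multiplex (a hypothetical helper) starts n workers that each send one
// message, and writes every message to w from a single goroutine, so
// the writes never interleave.
func multiplex(w io.Writer, n int) error {
	messages := make(chan string)
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			messages <- fmt.Sprintf("message from worker %d\n", id)
		}(i)
	}

	// Close the channel once every worker has sent its message.
	go func() {
		wg.Wait()
		close(messages)
	}()

	var firstErr error
	for msg := range messages {
		if firstErr != nil {
			continue // keep draining so that the workers can finish
		}
		if _, err := io.WriteString(w, msg); err != nil {
			firstErr = err
		}
	}
	return firstErr
}

// collect runs multiplex against an in-memory buffer that stands in for
// the network connection and returns everything that was written.
func collect(n int) (string, error) {
	var buf bytes.Buffer
	err := multiplex(&buf, n)
	return buf.String(), err
}

func main() {
	out, err := collect(3)
	if err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Print(out)
}
```

A net.Conn also implements io.Writer, so the same function could be pointed at a real connection; the order of the lines depends on goroutine scheduling.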
First response
In this example each URL in the slice is handed to a separate goroutine. The goroutines run concurrently, and each one requests the URL it was given. Every response is sent on the first channel, so the response that arrives soonest is the first one available on the channel. We can then read that response from the channel and process it.
Translator's note: As published, this example does not compile either (the compiler complains about the unused variable r), so the version below closes the response body to put r to use. There is a similar example on youtube.com. I recommend watching that video in full.

package main

import "net/http"

type response struct {
	resp *http.Response
	url  string
}

func get(url string, r chan response) {
	if resp, err := http.Get(url); err == nil {
		r <- response{resp, url}
	}
}

func main() {
	first := make(chan response)

	for _, url := range []string{
		"http://code.jquery.com/jquery-1.9.1.min.js",
		"http://cdnjs.cloudflare.com/ajax/libs/jquery/1.9.1/jquery.min.js",
		"http://ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js",
		"http://ajax.aspnetcdn.com/ajax/jQuery/jquery-1.9.1.min.js",
	} {
		go get(url, first)
	}

	r := <-first
	// ... process the first response, then release it.
	r.resp.Body.Close()
}
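The pattern can be tried without network access by replacing the HTTP requests with plain functions. The sketch below makes that substitution; first, the two fake mirrors, and the sleep duration are assumptions for the illustration. It also uses a buffered channel, so that the slower goroutines can deliver their result and exit instead of blocking forever on a send that nobody receives.

```go
package main

import (
	"fmt"
	"time"
)

// first (a hypothetical helper) runs every source concurrently and
// returns the first result. It expects at least one source.
func first(sources ...func() string) string {
	// One slot per source: late finishers can still send and exit.
	results := make(chan string, len(sources))
	for _, src := range sources {
		go func(f func() string) { results <- f() }(src)
	}
	return <-results
}

func main() {
	fast := func() string { return "fast mirror" }
	slow := func() string {
		time.Sleep(200 * time.Millisecond)
		return "slow mirror"
	}
	fmt.Println(first(slow, fast)) // fast mirror
}
```

With an unbuffered channel, as in the listing above, every goroutine except the winner stays blocked on its send for the lifetime of the program.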
Passing a response channel
In this example the channel w is created to pass tasks to a goroutine. The goroutine receives a task and requests the URL it contains. The task also carries a resp channel. As soon as the request completes, the response is sent back on resp. This lets the goroutine process tasks and return each result through a channel set up for that particular task.
Translator's note: Put more simply, each task gets its own reply channel, through which the result is sent back to us.
package main

import "net/http"

type work struct {
	url  string
	resp chan *http.Response
}

func getter(w chan work) {
	for {
		do := <-w
		resp, _ := http.Get(do.url)
		do.resp <- resp
	}
}

func main() {
	w := make(chan work)
	go getter(w)

	resp := make(chan *http.Response)
	w <- work{"http://cdnjs.cloudflare.com/jquery/1.9.1/jquery.min.js", resp}

	r := <-resp
	// r is nil if the request failed, because getter ignores the error.
	if r != nil {
		r.Body.Close()
	}
}
HTTP load balancer
This example builds a load balancer out of the previous examples. It reads URLs from stdin and starts a goroutine for each one. Every request passes through the load balancer, which distributes the jobs among a limited number of workers. The workers process the requests and return the results on a single answer channel.
A load balancer like this lets you submit a huge number of requests, spread them across all available resources, and process them in an orderly manner.
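package main

import (
	"fmt"
	"net/http"
)

type job struct {
	url  string
	resp chan *http.Response
}

type worker struct {
	jobs  chan *job
	count int
}

// getter processes jobs from the worker's queue and reports each
// completed job on done.
func (w *worker) getter(done chan *worker) {
	for {
		j := <-w.jobs
		resp, _ := http.Get(j.url)
		j.resp <- resp
		done <- w
	}
}

// get submits one URL to the balancer and forwards the result to answer.
func get(jobs chan *job, url string, answer chan string) {
	resp := make(chan *http.Response)
	jobs <- &job{url, resp}
	r := <-resp
	if r == nil {
		// The request failed; still answer so that main is not left waiting.
		answer <- url + ": request failed"
		return
	}
	r.Body.Close()
	answer <- r.Request.URL.String()
}

func balancer(count int, depth int) chan *job {
	jobs := make(chan *job)
	done := make(chan *worker)
	workers := make([]*worker, count)

	for i := 0; i < count; i++ {
		workers[i] = &worker{make(chan *job, depth), 0}
		go workers[i].getter(done)
	}

	go func() {
		for {
			// Pick the least loaded worker that still has room in its queue.
			var free *worker
			lowest := depth
			for _, w := range workers {
				if w.count < lowest {
					free = w
					lowest = w.count
				}
			}

			// If every worker is full, jobsource stays nil and its case
			// is disabled until a worker reports a finished job.
			var jobsource chan *job
			if free != nil {
				jobsource = jobs
			}

			select {
			case j := <-jobsource:
				free.jobs <- j
				free.count++
			case w := <-done:
				w.count--
			}
		}
	}()

	return jobs
}

func main() {
	jobs := balancer(10, 10)
	answer := make(chan string)
	pending := 0

	for {
		var url string
		if _, err := fmt.Scanln(&url); err != nil {
			break
		}
		pending++
		go get(jobs, url, answer)
	}

	// Read exactly one answer per submitted URL. Ranging over answer
	// would never end, because the channel is never closed.
	for ; pending > 0; pending-- {
		fmt.Println(<-answer)
	}
}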
Conclusion
Go is a language that, from my point of view, has its problems, but it is one I am willing to learn and use. The ideas in this talk introduced me to new concepts, and afterwards I wanted to start a new project that takes advantage of Go's fantastic support for concurrency. The talk also underlined the need to read and understand the standard library of a language such as Go in order to better understand the nature and design of the language.