Multitasking is what Go is really good at, though it is not perfect. Pleasant syntax with a tart aftertaste and simple yet powerful abstractions captivate with their elegance compared to other imperative languages. And once you have tasted the best, you do not want to sink back into mediocrity. So if you switch to another language, it should be even more expressive and have a no less sensible implementation of multitasking.
If you have played with Go long enough, are tired of copy-paste and manual mutex juggling, and are seriously considering buying prosthetic hands, let me offer you a translation of the Tour of Go with equivalent D code and brief explanations.
package main

import (
	"fmt"
	"time"
)

func say(s string) {
	for i := 0; i < 5; i++ {
		time.Sleep(100 * time.Millisecond)
		fmt.Println(s)
	}
}

func main() {
	go say("world")
	say("hello")
}
Go lets you easily start any function in a parallel thread of execution and carry on without waiting for it to finish. All such lightweight threads (also known as fibers, coroutines or greenlets) are multitasked cooperatively on a limited number of native threads, thereby maximizing utilization of the processor cores. The D standard library supports fibers, but only within a single thread, and it cannot balance fibers across several threads. Such a scheduler is implemented in the vibe.d project, although its syntax for launching a parallel task is still not as concise as in Go. Therefore we will use the go.d library, which provides the "go!" template for starting functions in parallel. In addition, following best practice, we will write the example code as tests.
unittest
{
	import core.time;
	import std.range;
	import jin.go;

	__gshared static string[] log;

	static void saying( string message )
	{
		foreach( _ ; 3.iota ) {
			sleep( 100.msecs );
			log ~= message;
		}
	}

	go!saying( "hello" );
	sleep( 50.msecs );
	saying( "world" );

	log.assertEq([ "hello" , "world" , "hello" , "world" , "hello" , "world" ]);
}
In D it is not customary to write loops by hand unnecessarily, so we implemented the loop as iteration over a sequence of natural numbers. We had to declare the function "saying" static so that it has no access to local variables, which would be unsafe when it runs in parallel on different threads. If you turn this function into a closure by removing "static", the code will not compile: thanks to template magic, the compiler will not let us point the gun at our own limbs. In Go, the question of concurrent access is left to the programmer's conscience, which in most cases is absent.
package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	fmt.Println(<-ch)
	fmt.Println(<-ch)
}
Running parallel threads would not be nearly as useful without a way to synchronize them. Go uses a rather elegant abstraction for this: channels. Channels are typed message queues. If a goroutine tries to read from an empty channel, it blocks until another goroutine writes data there. Conversely, if it tries to write to a full channel, it blocks until another goroutine reads at least one message from the channel. Channels easily replace abstractions such as lazy generators, events and promises, and open up many more usage scenarios.
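The blocking rules above can be observed without actually hanging the program. The following is a minimal sketch, not part of the Tour: the helpers trySend and tryReceive are hypothetical names, and they rely on Go's select statement with a default branch (both appear later in this article) to report whether a plain send or receive would have blocked.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a send without blocking
// and reports whether the value was accepted by the channel.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // buffer is full: a plain "ch <- v" would block here
	}
}

// tryReceive is a hypothetical helper: it attempts a receive without
// blocking and reports whether a value was available.
func tryReceive(ch chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		return 0, false // channel is empty: a plain "<-ch" would block here
	}
}

func main() {
	ch := make(chan int, 2)

	fmt.Println(trySend(ch, 1)) // true
	fmt.Println(trySend(ch, 2)) // true
	fmt.Println(trySend(ch, 3)) // false, the buffer holds only two values

	v, ok := tryReceive(ch)
	fmt.Println(v, ok) // 1 true
}
```

Values leave the channel in the order they were sent, which is why the first receive yields 1.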
In the D standard library, communication between threads is done by sending and receiving abstract messages. That is, knowing a thread's id you can send it an arbitrary message, which it must then unpack and process somehow. A rather inconvenient mechanism. Vibe.d introduces a byte-stream abstraction that behaves similarly to Go channels. But often you need to transfer not just bytes but whole structures. In addition, both in Go and in D, inter-thread communication is implemented by locking a mutex, which has well-known problems. Therefore we will again use the go.d library, which provides typed wait-free channels.
unittest
{
	import jin.go;

	auto numbers = new Channel!int(2);

	numbers.next = 1;
	numbers.next = 2;

	numbers.next.assertEq( 1 );
	numbers.next.assertEq( 2 );
}
The virtual property "next" is, of course, not as clear as the arrow in Go, but the compiler keeps a close eye on where our gun is pointing and does not allow types that are unsafe for concurrent use from different threads to be passed through the channel. There is one caveat, however: these channels require that they have no more than one reader and no more than one writer. Unfortunately, for now this has to be checked manually, but in the future the compiler will probably be enlisted as our ally here too.
It is also worth noting that a channel in Go is unbuffered by default (a send waits until a receiver is ready), while in go.d the default channel size is about 512 bytes.
package main

import "fmt"

func sum(s []int, c chan int) {
	sum := 0
	for _, v := range s {
		sum += v
	}
	c <- sum // send sum to c
}

func main() {
	s := []int{7, 2, 8, -9, 4, 0}

	c := make(chan int)
	go sum(s[:len(s)/2], c)
	go sum(s[len(s)/2:], c)
	x, y := <-c, <-c // receive from c

	fmt.Println(x, y, x+y) // -5 17 12
}
In Go, access to a channel is protected by a mutex, so you can use one channel to communicate with several goroutines at once when the order in which they deliver data does not matter. Channels from the go.d library, by contrast, are non-blocking, so they cannot be used in this scenario: you need to create a separate channel for each thread. To simplify work with lists of channels, the library provides the balancer structures Inputs and Outputs. Here we need Inputs, which reads in turn from each non-empty channel registered in it.
unittest
{
	import std.algorithm;
	import std.range;
	import jin.go;

	static auto summing( Channel!int sums , const int[] numbers )
	{
		sums.next = numbers.sum;
	}

	immutable int[] numbers = [ 7 , 2 , 8 , -9 , 4 , 0 ];

	Inputs!int sums;
	go!summing( sums.make(1) , numbers[ 0 .. $/2 ] );
	go!summing( sums.make(1) , numbers[ $/2 .. $ ] );
	auto res = sums.take(2).array;

	( res ~ res.sum ).assertEq([ 17 , -5 , 12 ]);
}
As usual, we do not write the summation of a range by hand but use the standard generic "sum" algorithm. For such algorithms to work with your data type, you only need to implement one of the range interfaces, which are of course implemented by Channel, Inputs and Outputs. The "take" algorithm produces a lazy range that yields the specified number of leading elements of the source range. The "array" algorithm consumes all elements of a range and returns them as a native array. Note that we pass each thread a separate channel of length one and a slice of an immutable array (hello, parallelism!).
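For readers who think in Go, the "one channel per worker, read in turn from every non-empty channel" idea behind Inputs can be approximated as follows. This is only a sketch under assumptions: takeRoundRobin is a hypothetical helper written for this illustration, it says nothing about how go.d implements Inputs internally, and it assumes the channels are never closed and together deliver at least n values.

```go
package main

import (
	"fmt"
	"runtime"
)

// takeRoundRobin (hypothetical helper) polls the inputs in turn and collects
// n values, skipping channels that currently have nothing to offer.
// Assumption: the inputs are never closed and deliver at least n values.
func takeRoundRobin(inputs []chan int, n int) []int {
	out := make([]int, 0, n)
	for i := 0; len(out) < n; i = (i + 1) % len(inputs) {
		select {
		case v := <-inputs[i]:
			out = append(out, v)
		default:
			runtime.Gosched() // nothing here yet: let the producers run
		}
	}
	return out
}

// sum sends the total of s into its own private channel.
func sum(s []int, c chan<- int) {
	total := 0
	for _, v := range s {
		total += v
	}
	c <- total
}

func main() {
	s := []int{7, 2, 8, -9, 4, 0}

	// One channel of length one per worker, as in the D example.
	a, b := make(chan int, 1), make(chan int, 1)
	go sum(s[:len(s)/2], a)
	go sum(s[len(s)/2:], b)

	res := takeRoundRobin([]chan int{a, b}, 2)
	fmt.Println(res[0] + res[1]) // 12; the order of the two partial sums may vary
}
```

Because every channel has exactly one writer and one reader, no two workers ever contend for the same queue, which is the property the single-reader, single-writer channels of go.d depend on.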
package main

import (
	"fmt"
)

func fibonacci(n int, c chan int) {
	x, y := 0, 1
	for i := 0; i < n; i++ {
		c <- x
		x, y = y, x+y
	}
	close(c)
}

func main() {
	c := make(chan int, 10)
	go fibonacci(cap(c), c)
	for i := range c {
		fmt.Println(i)
	}
}
As you can see, in Go we can also iterate over a channel, receiving its elements one after another. To avoid hanging in an infinite loop, such a channel must be closed by the sending side so that the receiver knows no more data is coming and it is time to end the loop. In D we would write almost the same thing, except that we would declare the Fibonacci sequence as a mathematical recurrence formula.
unittest
{
	import std.range;
	import jin.go;

	static auto fibonacci( Channel!int numbers , int count )
	{
		auto range = recurrence!q{ a[n-1] + a[n-2] }( 0 , 1 ).take( count );
		foreach( x ; range ) numbers.next = x;
		numbers.close();
	}

	auto numbers = new Channel!int(10);
	go!fibonacci( numbers , numbers.size );

	numbers.array.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]);
}
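On the Go side, the way a receiver learns that a channel has been closed can be made explicit with the two-value receive form, which is what a range loop over a channel relies on. A minimal sketch follows; drain is a hypothetical helper name introduced only for this illustration.

```go
package main

import "fmt"

// drain (hypothetical helper) receives until the channel is closed and
// empty, using the two-value receive to detect the end of the data.
func drain(c <-chan int) []int {
	var out []int
	for {
		v, ok := <-c
		if !ok {
			return out // channel closed and fully consumed
		}
		out = append(out, v)
	}
}

func main() {
	c := make(chan int, 3)
	c <- 0
	c <- 1
	c <- 1
	close(c) // values already buffered remain readable after close

	fmt.Println(drain(c)) // [0 1 1]
}
```

If the sender never called close, the loop in drain would block forever on the fourth receive, which is exactly the hang described above.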
But the D code can be simplified further, since the "go!" template is itself able to transfer the values of a range into a channel.
unittest
{
	import std.range;
	import jin.go;

	static auto fibonacci( int limit )
	{
		return recurrence!q{ a[n-1] + a[n-2] }( 0 , 1 ).take( limit );
	}

	fibonacci( 10 ).array.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]);
	go!fibonacci( 10 ).array.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 ]);
}
Thus a function does not need to know anything about channels in order to be run in a parallel thread and have its result awaited.
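The same separation can be imitated in Go with a small wrapper. The sketch below is an assumption-laden illustration, not an equivalent of "go!": spawn is a hypothetical helper, and unlike a lazy D range the Go function here computes its whole result as a slice before anything is sent.

```go
package main

import "fmt"

// fibonacci knows nothing about channels: it simply returns the first n
// Fibonacci numbers.
func fibonacci(n int) []int {
	out := make([]int, 0, n)
	x, y := 0, 1
	for i := 0; i < n; i++ {
		out = append(out, x)
		x, y = y, x+y
	}
	return out
}

// spawn (hypothetical helper) runs f in a goroutine and streams its result
// through a channel, closing the channel once everything has been sent.
func spawn(f func() []int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range f() {
			out <- v
		}
	}()
	return out
}

func main() {
	// The caller iterates over the channel until the producer closes it.
	for v := range spawn(func() []int { return fibonacci(10) }) {
		fmt.Println(v)
	}
}
```

The producer function stays an ordinary, directly testable function; only the wrapper deals with goroutines and channel closing.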
package main

import "fmt"

func fibonacci(c, quit chan int) {
	x, y := 0, 1
	for {
		select {
		case c <- x:
			x, y = y, x+y
		case <-quit:
			fmt.Println("quit")
			return
		}
	}
}

func main() {
	c := make(chan int)
	quit := make(chan int)
	go func() {
		for i := 0; i < 10; i++ {
			fmt.Println(<-c)
		}
		quit <- 0
	}()
	fibonacci(c, quit)
}
Go has a special concise syntax for working with several channels simultaneously. D, of course, has nothing of the kind. However, equivalent functionality is not hard to implement with a hand-written polling loop.
unittest
{
	import std.range;
	import jin.go;

	__gshared int[] log;

	static auto fibonacci( Channel!int numbers , Channel!bool control )
	{
		auto range = recurrence!q{ a[n-1] + a[n-2] }( 0 , 1 );

		while( !control.closed )
		{
			if( numbers.needed ) numbers.next = range.next;
			yield;
		}

		log ~= -1;
		numbers.close();
	}

	static void print( Channel!bool control , Channel!int numbers )
	{
		foreach( i ; 10.iota ) log ~= numbers.next;
		control.close();
	}

	auto numbers = new Channel!int(1);
	auto control = new Channel!bool(1);

	go!print( control , numbers );
	go!fibonacci( numbers , control );

	while( !control.empty || !numbers.empty ) yield;

	log.assertEq([ 0 , 1 , 1 , 2 , 3 , 5 , 8 , 13 , 21 , 34 , -1 ]);
}
As you can see, we had to get rid of the closure and add "yield" inside the loops so that competing fibers can make progress while the current one is waiting.
package main

import (
	"fmt"
	"time"
)

func main() {
	tick := time.Tick(100 * time.Millisecond)
	boom := time.After(500 * time.Millisecond)
	for {
		select {
		case <-tick:
			fmt.Println("tick.")
		case <-boom:
			fmt.Println("BOOM!")
			return
		default:
			fmt.Println(" .")
			time.Sleep(50 * time.Millisecond)
		}
	}
}
Go's special syntax also lets you do something when there is no activity on any of the channels. In D, however, you have more control over the flow of execution.
unittest
{
	import core.time;
	import jin.go;

	static auto after( Channel!bool channel , Duration dur )
	{
		sleep( dur );
		if( !channel.closed ) channel.next = true;
	}

	static auto tick( Channel!bool channel , Duration dur )
	{
		while( !channel.closed ) after( channel , dur );
	}

	auto ticks = go!tick( 101.msecs );
	auto booms = go!after( 501.msecs );

	string log;

	while( booms.clear )
	{
		while( !ticks.clear ) {
			log ~= "tick";
			ticks.popFront;
		}
		log ~= ".";
		sleep( 51.msecs );
	}
	log ~= "BOOM!";

	log.assertEq( "..tick..tick..tick..tick..BOOM!" );
}
A nice feature is that we did not need to create the channel manually. If a function accepts a channel as its first argument and we do not pass one, it is created automatically and returned as the result of the "go!" template, which is very convenient. The "after" and "tick" functions are too specific to be included in the general library, but their implementations are very simple.
In some cases we still cannot do without shared mutable state, and then locks come to our aid.
package main

import (
	"fmt"
	"sync"
	"time"
)

// SafeCounter is safe to use concurrently.
type SafeCounter struct {
	v   map[string]int
	mux sync.Mutex
}

// Inc increments the counter for the given key.
func (c *SafeCounter) Inc(key string) {
	c.mux.Lock()
	// Lock so only one goroutine at a time can access the map c.v.
	c.v[key]++
	c.mux.Unlock()
}

// Value returns the current value of the counter for the given key.
func (c *SafeCounter) Value(key string) int {
	c.mux.Lock()
	// Lock so only one goroutine at a time can access the map c.v.
	defer c.mux.Unlock()
	return c.v[key]
}

func main() {
	c := SafeCounter{v: make(map[string]int)}
	for i := 0; i < 1000; i++ {
		go c.Inc("somekey")
	}

	time.Sleep(time.Second)
	fmt.Println(c.Value("somekey"))
}
Yes, exactly: implementing shared mutable state in Go is pain and suffering. One wrong move with mutexes and you suddenly find yourself with phantom limbs. Not to mention that the compiler gives you no hint as to where mutexes are needed. The D compiler, by contrast, complains loudly whenever you try to work with unprotected mutable state from different threads. And the easiest way to protect state under multithreaded execution is to implement a synchronized class.
unittest
{
	import core.atomic;
	import core.time;
	import std.range;
	import std.typecons;
	import jin.go;

	synchronized class SafeCounter
	{
		private int[string] store;

		void inc( string key )
		{
			++ store[key];
		}

		auto opIndex( string key )
		{
			return store[ key ];
		}

		void opIndexUnary( string op = "++" )( string key )
		{
			this.inc( key );
		}
	}

	static counter = new shared SafeCounter;

	static void working( int i )
	{
		++ counter["somekey"];
	}

	foreach( i ; 1000.iota ) {
		go!working( i );
	}

	sleep( 1.seconds );

	counter["somekey"].assertEq( 1000 );
}
The distinguishing feature of a synchronized class is that a mutex is created for it automatically; whenever any public method is called, this mutex is locked, and it is released only when the method returns. All internal state must be private. But there is one unpleasant peculiarity (in fact, a very dangerous and annoying compiler bug): template methods, such as "opIndexUnary!", are not wrapped in a mutex lock. Therefore we created a separate public method "inc", which we call from the template method. The internal implementation is not so pretty, but the external interface looks native. The resulting "shared SafeCounter" can safely be passed through a channel and used directly from different threads.
Source: https://habr.com/ru/post/280378/