
Cooking up multithreading with core.async



The pattern of using channels to build multi-threaded applications is becoming increasingly popular. The idea is not new: its design dates back to 1978 in the form of CSP. Its best-known implementation is now in wide use in Golang.


In this article we will look at the CSP implementation that core.async brings to Clojure; if that sounds interesting, read on.



The article covers simple, basic practices for working with core.async; what is described here is enough for a good start in multi-threaded programming.


Unlike Golang, where the paradigm of working with threads through channels is built into the language itself, core.async is just a library for Clojure. If a different paradigm appeals to you more, there are alternatives: pulsar, promesa, manifold.


At the same time, core.async and promesa can also be used on the browser side with ClojureScript. Naturally, there is no multi-threading in that case, since everything is compiled to ES5 and executed in the browser, but the familiar interface and convenient handling of asynchrony still serve you well.


So what does core.async give us? In simple terms, core.async schedules go-blocks on its own fixed thread pool of 8 threads (the pool size can be changed via a special option). When a message arrives on a channel, core.async itself finds a free thread and hands the task to it, or puts the message into a queue. If this is the first time you are hearing about thread pools, there is a good write-up on the Worker Thread pattern.
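
All of the examples below assume that core.async is on the classpath and its vars are referred into the current namespace. A minimal setup sketch (the namespace name and the exact :refer list are my assumptions, not part of the original article):

(ns user
  (:require [clojure.core.async
             :refer [chan go go-loop <! >! <!! >!! alts!! timeout close!]]))

;; Assumption: the go-block pool size is controlled by the
;; clojure.core.async.pool-size JVM system property, which must be set
;; before the library is loaded, e.g. -Dclojure.core.async.pool-size=16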




Example 1


(defonce log-chan (chan))

(defn loop-worker [msg]
  (println msg))

(go-loop []
  (let [msg (<! log-chan)]
    (loop-worker msg)
    (recur)))

In the example above we created a log-chan channel and defined a loop-worker function that will process messages from the channel. Then we created an infinite go-loop block and placed our loop-worker inside it. Now we can send data to the channel: (>!! log-chan "")


The loop-worker function was deliberately pulled out of the go-block for the sake of convenient debugging through the REPL.


The go-loop body itself is awkward to redefine: the macro is expanded somewhere inside core.async, and recompiling it on the fly in the REPL behaves strangely, so it is easier to keep the handler in a separate function and live in peace.
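
As a sketch of that REPL workflow (the message text is just an illustration), note that the running go-loop resolves loop-worker through its var on every call, so redefining the function takes effect immediately without touching the go-block:

;; redefine the handler while the go-loop from Example 1 keeps running
(defn loop-worker [msg]
  (println "formatted:" msg))

(>!! log-chan "test")   ; the running loop now prints "formatted: test"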


It is worth noting that go-loop does not create an infinite loop in the usual sense.
After a message is received, the handler function executes once, and then the go-block is parked by <!, which waits for a new message. Thus you can create as many channels and handlers for them as you like.


Inside a go-block, the channel read function <! parks the thread.
Outside a go-block you can use the function <!!, which blocks the calling thread until a message is received. The behavior of <!! can be compared with the await keyword in ES7.
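
A minimal sketch of the difference (the channel c and the value :hello are hypothetical, not taken from the article):

(def c (chan))

;; inside a go-block, >! and <! park the go-block and release the pool thread
(go (>! c :hello))

;; outside a go-block, <!! blocks the calling thread until a value arrives,
;; similar in spirit to await
(println "got:" (<!! c))   ; prints "got: :hello"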


Parking a go-block is a core.async term meaning that the thread is released and becomes available for other tasks. There is also the term blocking, which means that the thread is literally blocked and unavailable for new tasks until it is released.


Example 1 has a flaw: if an Exception is thrown inside loop-worker, the form is interrupted and (recur) is never called, so listening for data on the log-chan channel stops. We fix this in Example 2.


Example 2


(defonce log-chan (chan))

(defn loop-worker [msg]
  (throw (Exception. "my exception message")))

(go-loop []
  (let [msg (<! log-chan)
        res (try
              (loop-worker msg)
              :ok
              (catch Exception e
                (println (.getMessage e))
                :error))]
    (recur)))

In this example we wrapped the loop-worker call in a try form, and the res variable holds a flag indicating whether the form succeeded or an error occurred. This flag can be useful, for example, if we want to close the channel in case of an error. A working example of this approach can be found here.
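
For illustration, here is a sketch of how that flag could be used to close the channel (this particular go-loop is my variation, not the article's code):

(go-loop []
  (let [msg (<! log-chan)
        res (try
              (loop-worker msg)
              :ok
              (catch Exception e
                (println (.getMessage e))
                :error))]
    (if (= res :error)
      (close! log-chan)   ; close the channel and stop looping
      (recur))))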


Example 3


(let [c1 (go (<! (timeout (rand-int 1000))) 5)
      c2 (go (<! (timeout (rand-int 1000))) 7)]
  (go
    (let [v1 (<! c1)
          v2 (<! c2)]
      (println {:v1 v1 :v2 v2 :summ (+ v1 v2)}))))

This example waits for the results of all the asynchronous operations listed in the let block. This practice is very convenient for solving the callback hell problem familiar from JavaScript, and it is one more reason to rejoice that it can be used on the browser side in the form of ClojureScript.
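
The same pattern scales to any number of channels. Here is a sketch (the gather helper is my own name, not part of core.async or of the article) that collects the results in order:

(defn gather
  "Returns a channel that delivers a vector with one result per input channel."
  [chans]
  (go (loop [cs chans, acc []]
        (if (seq cs)
          (recur (rest cs) (conj acc (<! (first cs))))
          acc))))

;; usage: block the REPL thread until both emulated operations are done
(<!! (gather [(go (<! (timeout (rand-int 1000))) 5)
              (go (<! (timeout (rand-int 1000))) 7)]))   ; => [5 7]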


Example 4


(defn upload
  "upload emulator"
  [headshot c time]
  (go
    (Thread/sleep time)
    (>! c headshot)))

(let [c1 (chan)
      c2 (chan)]
  (upload "pic1.jpg" c1 30)
  (upload "pic2.jpg" c2 40)
  (let [[headshot channel] (alts!! [c1 c2 (timeout 20)])]
    (if headshot
      (println "Sending headshot notification for" headshot)
      (println "Timed out!"))))

In this example we created an upload function that emulates an asynchronous operation, in this case a file upload. The final argument to upload takes the delay time in milliseconds. Using the alts!! function we get the first result returned by any of the channels listed in the vector. The last channel in our vector is (timeout 20); it closes after 20 milliseconds and yields nil, which will be the first value written to the headshot variable, after which the form continues and prints "Timed out!". Thus, this example emulates setting a timeout within which we are willing to wait for a set of asynchronous operations to complete.
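
As a small variation (my assumption for illustration, not in the original article), increasing the timeout lets the fastest upload win, and the second element returned by alts!! tells us which channel delivered the value:

(let [c1 (chan)
      c2 (chan)]
  (upload "pic1.jpg" c1 30)
  (upload "pic2.jpg" c2 40)
  (let [[headshot channel] (alts!! [c1 c2 (timeout 100)])]
    (println "winner:" headshot "from c1?" (= channel c1))))
;; prints: winner: pic1.jpg from c1? true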


Example 5


(def ping (chan))
(def pong (chan))

(go-loop []
  (let [msg (<! ping)]
    (when (= msg :ping)
      (println msg)
      (>! pong :pong)
      (Thread/sleep 1000))
    (recur)))

(go-loop []
  (let [msg (<! pong)]
    (when (= msg :pong)
      (println msg)
      (>! ping :ping)
      (Thread/sleep 1000))
    (recur)))

(>!! ping :ping)

An example of communication between two go-loops over a pair of channels: the classic ping-pong.


That was the last example I wanted to show. Separately, it is worth highlighting that Clojure has data types created specifically for writing data from several threads, namely atom and agent, as well as the general immutability of its other types; all of this greatly simplifies the developer's life when building a multi-threaded application.
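
A minimal sketch of those reference types (the names and values are mine, not from the article):

;; atom: synchronous and uncoordinated; swap! is safe from any thread or go-block
(def counter (atom 0))
(dotimes [_ 10]
  (go (swap! counter inc)))

;; agent: asynchronous; actions sent to the same agent are applied one at a time
(def log-agent (agent []))
(send log-agent conj :started)
(await log-agent)               ; wait for the sent action to be applied

(Thread/sleep 100)              ; crude wait for the go-blocks to finish
(println @counter @log-agent)   ; => 10 [:started]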




Useful links:


"Http://clojure.com/blog/2013/06/28/clojure-core-async-channels.html
»Https://github.com/clojure/core.async
»Https://github.com/clojure/core.async/wiki/Getting-Started
"Http://www.braveclojure.com/core-async/
»Http://go.cognitect.com/core_async_webinar_recording


Source: https://habr.com/ru/post/312748/

