
Asynchronous Monad Example

Suppose two programs communicate with each other over the network but do not wait for each reply, so replies can arrive in any order. To tell them apart, each message carries a number, and each reply carries both the number of the message it answers and its own number for use in further communication.



Our goal is to describe the sequence of messages received and sent while talking to a particular peer, and to be able to perform I/O (for example, querying a database) between receiving and sending messages.



Suppose that, in your preferred language, such a dialogue looks like the steps below, bearing in mind that at any moment (between any of these steps) other requests may arrive that also need to be processed without getting tangled up in this dialogue:

1. Send a number

2. A number comes in response

3. We send back the square of the number from step 2

4. Another number comes in response

5. Print the sum of the numbers from steps 2 and 4 to the console


Here is how it looks in Haskell (the example function is, of course, non-blocking):

    example :: Int -> AIO ()
    example v = do
        x <- request v
        y <- request (x * x)
        io $ print (x + y)



Compare this with a similar blocking function, which, for example, requests a response from the user:

    example :: Int -> IO ()
    example v = do
        x <- request v
        y <- request (x * x)
        print (x + y)
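
The article does not show the blocking `request`, so the following is only a sketch of what it might look like. The names `blockingRequest`, `askUser` and `exampleSum` are assumptions made for illustration, and the transport is passed in as a function so that the dialogue can be tried without a real user.

```haskell
-- Hypothetical blocking counterpart of `request`: `ask` is an assumed
-- transport that sends a string and waits for the reply.
blockingRequest :: (Show a, Read b) => (String -> IO String) -> a -> IO b
blockingRequest ask v = fmap read (ask (show v))

-- One possible transport: print the question and wait for a typed answer.
askUser :: String -> IO String
askUser question = putStrLn question >> getLine

-- The same dialogue as above, returning the sum instead of printing it
-- so that the result is easy to inspect.
exampleSum :: (String -> IO String) -> Int -> IO Int
exampleSum ask v = do
  x <- blockingRequest ask v
  y <- blockingRequest ask (x * x)
  return (x + y)
```

Calling `exampleSum askUser 10` in GHCi blocks on each question until an answer is typed, which is exactly what the non-blocking version avoids.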





To avoid getting distracted by unnecessary details and having to run several programs, I will simplify the task for this article. We will read from and write to a channel (Chan a), and a message will have type (Int, String), i.e. a message number and a serialized value.
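
As a minimal sketch of that message format, the snippet below pushes one numbered, serialized value through a channel and reads it back. The helper names `sendValue`, `receiveValue` and `roundTrip` are hypothetical and are not used by the rest of the article.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)

-- A message is a number plus a serialized payload, as described above.
type Message = (Int, String)

-- Send a value under a given message number (hypothetical helper).
sendValue :: Show a => Chan Message -> Int -> a -> IO ()
sendValue ch n v = writeChan ch (n, show v)

-- Receive the next message and decode its payload (hypothetical helper).
-- Note that `read` throws an exception on a malformed payload.
receiveValue :: Read a => Chan Message -> IO (Int, a)
receiveValue ch = do
  (n, s) <- readChan ch
  return (n, read s)

-- Write one message and read it back from the same channel.
roundTrip :: IO (Int, Int)
roundTrip = do
  ch <- newChan
  sendValue ch 7 (42 :: Int)
  receiveValue ch
```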



We import all the necessary modules:



> module Test (
>     ) where
>
> import Control.Arrow
> import Control.Monad
> import Control.Concurrent.MVar
> import Control.Concurrent.Chan
> import Control.Concurrent
> import Data.List
> import Data.Maybe



Before writing a monad, let's first try to do everything with callbacks.

When sending a message, we need to generate a number and add a callback to a list. That is, we need a counter variable and a list of (number, callback) pairs. We also need the channels themselves. Unlike a socket, we need two of them, since we write to one and read from the other. We put all of this into a separate type:



> data AState = AState {
>     aCurrent :: MVar Int,
>     aWait :: MVar [(Int, String -> IO ())],
>     aChanOut :: Chan (Int, String),
>     aChanIn :: Chan (Int, String) }
>
> newA = liftM4 AState (newMVar 0) (newMVar []) newChan newChan



The client-side message handler must read from the channel and call the callback that matches the message number.

We read from the channel, look up the matching callback (removing it from the list at the same time) and call it. It's simple:



> listener (AState _ w _ chIn) = forever $ do
>     (i, s) <- readChan chIn
>     -- modifyMVar takes a function of type a -> IO (a, b):
>     -- it stores the new value and returns the extra result,
>     -- which here is the callback.
>     callback <- modifyMVar w $ \callbacks -> do
>         -- Separate the callbacks registered under other numbers
>         -- from those registered under i.
>         let (past, ok) = partition ((/= i) . fst) callbacks
>         -- Pick the callback to call (if there is one).
>         case ok of
>             ((_, f) : _) -> return (past, f) -- found: drop it from the list and return it
>             _ -> return (past, \_ -> return ()) -- not found: ignore the message
>     callback s -- call the callback with the message body
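
The lookup-and-remove step is the only non-trivial logic in the listener, so it can help to look at it as a pure function. The sketch below extracts it under the hypothetical name `takeCallback`; it returns `Nothing` where the listener substitutes a do-nothing callback.

```haskell
import Data.List (partition)

-- Split off the callback registered under number i, if any,
-- and return the remaining entries unchanged.
takeCallback :: Int -> [(Int, c)] -> (Maybe c, [(Int, c)])
takeCallback i callbacks =
  case partition ((== i) . fst) callbacks of
    ((_, f) : _, rest) -> (Just f, rest)
    ([], rest)         -> (Nothing, rest)
```

Because the function is pure, it can be checked in GHCi with plain values in place of callbacks, for example `takeCallback 2 [(1, "a"), (2, "b")]`.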



So that we can see the messages arriving at the “server” with our own eyes, we write a handler that prints everything sent to our peer (on the aChanOut channel).

It reads from aChanOut and prints to the screen:



> tracer (AState _ _ chOut _) = forever $ readChan chOut >>= print



The pre-monadic approach


To begin with, we do without monads and simply write a function that sends a message.

It should generate a message number, register a callback, and send the (already serialized) message.

    sendAndReceive1 :: AState -> String -> (String -> IO ()) -> IO ()
    sendAndReceive1 (AState cur w chOut _) msg onMsg = do
        i <- modifyMVar cur (return . (succ &&& id)) -- take the current number and increase the counter by 1
        modifyMVar_ w (return . ((i, onMsg) :)) -- register the callback
        writeChan chOut (i, msg) -- send the message
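
The expression `return . (succ &&& id)` is compact enough to deserve a closer look. The sketch below isolates it in a helper with the hypothetical name `nextNumber` and shows what two consecutive calls return.

```haskell
import Control.Arrow ((&&&))
import Control.Concurrent.MVar (MVar, modifyMVar, newMVar, readMVar)

-- (succ &&& id) n == (succ n, n): modifyMVar stores the first component
-- back into the MVar and returns the second, so the caller gets the
-- current number while the counter moves on by one.
nextNumber :: MVar Int -> IO Int
nextNumber cur = modifyMVar cur (return . (succ &&& id))

-- Two numbers drawn from a fresh counter, plus the counter's final contents.
counterDemo :: IO (Int, Int, Int)
counterDemo = do
  cur <- newMVar 0
  a <- nextNumber cur
  b <- nextNumber cur
  c <- readMVar cur
  return (a, b, c)
```

Here `counterDemo` yields `(0, 1, 2)`: the two calls hand out 0 and 1, and the counter is left at 2.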



This is usable in principle, but it has shortcomings:

    sendAndReceive1 a (show 123) $ \ans -> do
        let x = read ans :: Int -- deserialize by hand
        print x
        sendAndReceive1 a (show x) $ \ans2 -> do
            -- ...



First, we can pass the serialization and deserialization functions as arguments so that they need not be written in every callback, and then write sendAndReceive2, which uses the standard show and read by default.

    sendAndReceive1 :: AState -> a -> (a -> String) -> (String -> b) -> (b -> IO ()) -> IO ()
    sendAndReceive1 (AState cur w chOut _) msg show_ read_ onMsg = do
        i <- modifyMVar cur (return . (succ &&& id))
        modifyMVar_ w (return . ((i, onMsg . read_) :))
        writeChan chOut (i, show_ msg)

    sendAndReceive2 :: (Show a, Read b) => AState -> a -> (b -> IO ()) -> IO ()
    sendAndReceive2 a msg onMsg = sendAndReceive1 a msg show read onMsg

    -- Usage.
    sendAndReceive2 a 23 $ \x -> do
        print x
        sendAndReceive2 a (x + 10) $ \z -> ...



We could stop here, but then we might as well write in assembler, so why not take advantage of a more powerful abstraction?



The monadic approach


If we recall that the main function of a monad (not in category theory, but in Haskell) has type m a -> (a -> m b) -> m b, then our callback naturally fits as the second argument. But it must also be possible to pass in an ordinary computation such as print.

To distinguish the two, we create a new type with two alternatives:

1. A message plus a callback

2. A pure value



> data AS a = Send String (String -> AIO a) | Pure a



And we wrap it in the IO monad.



> data AIO a = AIO { aio :: IO (AS a) }



Thus computations fall into two camps: sending a message, and everything else.



We write a function that “lifts” ordinary IO into our monad. It simply runs the IO action and wraps its result in the Pure constructor:



> io :: IO a -> AIO a

    io act = AIO $ do
        v <- act
        return (Pure v)



Or, more simply:



> io = AIO . liftM Pure



The function for sending a message uses the second constructor, Send; in essence it simply wraps its arguments in the constructor:



> sendAndReceive :: a -> (a -> String) -> (String -> b) -> AIO b
> sendAndReceive msg to from = AIO $ return $ Send (to msg) (return . from)



And, analogous to it, request, which uses show and read for serialization:



> request :: (Show a, Read b) => a -> AIO b
> request msg = sendAndReceive msg show read
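
One caveat with `read` is that it throws an exception when the payload does not parse. The article does not deal with this; as a sketch of one possible alternative, `readMaybe` from `Text.Read` returns `Nothing` instead. The names `encode` and `decode` are hypothetical.

```haskell
import Text.Read (readMaybe)

-- Encoding is plain `show`, exactly as in `request`.
encode :: Show a => a -> String
encode = show

-- Decode a payload without throwing: Nothing signals a malformed message.
decode :: Read a => String -> Maybe a
decode = readMaybe
```

A decoder of this shape could be passed to `sendAndReceive` as the `from` argument, at the cost of the dialogue receiving a `Maybe` value and having to handle the failure case itself.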



The trick is that we do not compute anything inside our monad; we only build something like a tree of computations. By themselves, these functions merely construct a value of type AIO.

It is time to write a function that can actually execute all of this, i.e. run a dialogue we have described (for example, example). A constructed dialogue comes down to two cases:

1. Pure: just extract the value and return it.

2. Send: the main work happens here; we generate a number, register a callback and send the message.



> run :: AState -> AIO () -> IO ()
> run a@(AState cur w chOut chIn) act = run' act where
>     run' (AIO actIO) = do
>         as <- actIO
>         case as of
>             Pure value -> return value
>             Send msg onMsg -> do
>                 i <- modifyMVar cur (return . (succ &&& id)) -- generate a message number
>                 modifyMVar_ w (return . ((i, run' . onMsg) :)) -- register the callback
>                 writeChan chOut (i, msg) -- send the message



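Now everything is ready to write the Monad instance. GHC 7.10 and later also require Functor and Applicative instances, so they are included here, defined through the monad operations:



> instance Functor AIO where
>     fmap = liftM
>
> instance Applicative AIO where
>     pure = AIO . return . Pure -- wrap a plain value
>     (<*>) = ap
>
> instance Monad AIO where
>     return = pure
>     AIO v >>= f = AIO $ do
>         x <- v -- run the IO action to get an AS: is it Send or Pure?
>         case x of
>             -- Pure: there is no callback, just pass the value on to f.
>             Pure value -> aio $ f value
>             -- Send: extend the callback so that f runs on its result.
>             Send msg onMsg -> return $ Send msg (\s -> onMsg s >>= f)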
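
What `>>=` does to the callback is perhaps easier to see on a pure analogue with the IO layer removed. The sketch below is not part of the article's code: the type `Dialog` and the functions `ask`, `runScripted` and `exampleSum` are hypothetical names introduced only for illustration. The interpreter feeds a scripted list of replies and records what the dialogue sends.

```haskell
import Control.Monad (ap, liftM)

-- Pure analogue of AS/AIO: a dialogue either asks something and
-- continues with the reply, or is finished with a value.
data Dialog a = Ask String (String -> Dialog a) | Done a

instance Functor Dialog where
  fmap = liftM

instance Applicative Dialog where
  pure = Done
  (<*>) = ap

instance Monad Dialog where
  Done value >>= f = f value
  -- Same idea as the Send case: push f to the end of the callback.
  Ask msg k  >>= f = Ask msg (\s -> k s >>= f)

ask :: (Show a, Read b) => a -> Dialog b
ask v = Ask (show v) (Done . read)

-- Feed scripted replies; collect the messages sent and the final
-- result, if the dialogue got that far.
runScripted :: [String] -> Dialog a -> ([String], Maybe a)
runScripted _ (Done value) = ([], Just value)
runScripted [] (Ask msg _) = ([msg], Nothing)
runScripted (r : rs) (Ask msg k) =
  let (sent, result) = runScripted rs (k r)
  in (msg : sent, result)

exampleSum :: Int -> Dialog Int
exampleSum v = do
  x <- ask v
  y <- ask (x * x)
  return (x + y)
```

With these definitions, `runScripted ["11", "122"] (exampleSum 10)` gives `(["10","121"], Just 133)`, and with only the first reply supplied the dialogue stops after sending `"121"` and the result is `Nothing`.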



The last thing we need is a function for trying it all out. It starts the listener thread, which processes messages arriving at the client, and the tracer thread, which prints messages arriving at the server, and returns a function for sending messages from the server back to our client. That is, we ourselves play the role of the peer, typing in what we want to send to our client:



> start :: IO (AState, (Int, String) -> IO ())
> start = newA >>= forks where
>     forks a = mapM_ forkIO [listener a, tracer a] >> return (a, writeChan (aChanIn a))



Ta-da!


Now we can check it in the interpreter using the original example.

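    -- start the listener and tracer threads;
    -- a is the state, f sends a reply to the client
    ghci> (a, f) <- start
    -- start the first dialogue
    ghci> run a (example 10)
    -- the tracer prints the message sent to the "server"
    (0,"10")
    -- start a second dialogue
    ghci> run a (example 20)
    -- it gets the next message number
    (1,"20")
    -- the "server" replies to the first dialogue
    ghci> f (0, "11")
    -- the client sends the square
    (2,"121")
    -- the "server" replies to the second dialogue
    ghci> f (1, "21")
    -- again the square is sent
    (3,"441")
    -- the "server" replies out of order, to the second dialogue first
    ghci> f (3, "444")
    -- 21 + 444
    465
    -- the "server" replies to the first dialogue
    ghci> f (2, "122")
    133
    ghci>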



As you can see, even though two example dialogues run at the same time, they do not get mixed up.



By the way, this whole post is a Literate Haskell program: you can copy it into test.lhs and try it yourself.



PS Many thanks to pechlambda for helping to improve the article.

Source: https://habr.com/ru/post/117031/


