Haskell: Testing a Multithreaded Application

This article was compiled by Valery Isaev, a lecturer at the Academic University, based on the practice materials for the functional programming course.

It is probably no secret that writing multithreaded applications brings a number of problems that do not arise in single-threaded programs.
One of these problems is testing.
We cannot control the order in which operations are performed, so we cannot control the result of the program either. Even if we do hit an error, stepping on the same rake a second time is not easy.
I want to offer a small recipe for testing a multithreaded application.
The ingredients we need: Haskell, QuickCheck, a few monads, and salt and pepper to taste.

Working example


As a working example, we take the dining philosophers problem.

MVar a is a reference that either contains a value of type a or is empty.
putMVar ref x puts the value x into the reference ref.
takeMVar ref reads the contents of the reference, leaving it empty afterwards.
If the reference was already empty, the thread sleeps until another thread writes something into it.
() is a type with a single value, which is written the same way as the type itself: ().
We model forks with references of type MVar ().
Thus, a fork can be in one of two states: if the fork is held by a philosopher, the reference is empty; if the fork is free, it contains the value ().
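
The two fork states can be observed directly. The following is a minimal sketch that uses only Control.Concurrent.MVar from base. forkStates is a hypothetical helper that is not part of the article's program, and tryTakeMVar is used as the non-blocking counterpart of takeMVar so that the example cannot hang.

```haskell
import Control.Concurrent.MVar
import Data.Maybe (isJust)

-- | Walk a single fork through its states and report what was observed.
-- Hypothetical helper for illustration only.
forkStates :: IO [Bool]
forkStates = do
    fork <- newMVar ()                   -- the fork starts out free (full MVar)
    free0 <- not <$> isEmptyMVar fork
    takeMVar fork                        -- a philosopher picks the fork up
    free1 <- not <$> isEmptyMVar fork
    second <- tryTakeMVar fork           -- non-blocking attempt on a taken fork
    putMVar fork ()                      -- the philosopher puts the fork back
    free2 <- not <$> isEmptyMVar fork
    return [free0, free1, isJust second, free2]
```

The result is [True, False, False, True]: the fork is free, then taken, a second take attempt fails, and finally the fork is free again.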
    import System.Random
    import Control.Monad
    import Control.Concurrent
    import Control.Monad.Cont
    import Control.Monad.Trans
    import Data.IORef
    import Test.QuickCheck
    import Test.QuickCheck.Gen
    import Test.QuickCheck.Monadic

    -- sleep pauses the thread for a random time (from 0 to 0.3 seconds).
    sleep :: IO ()
    sleep = randomRIO (0, 300000) >>= threadDelay

    phil
        :: Int     -- Philosopher number.
        -> MVar () -- Reference to the left fork.
        -> MVar () -- Reference to the right fork.
        -> IO ()
    phil n leftFork rightFork = forever $ do
        putStrLn $ show n ++ " is awaiting"
        sleep
        takeMVar leftFork
        putStrLn $ show n ++ " took left fork"
        -- sleep
        takeMVar rightFork
        putStrLn $ show n ++ " took right fork"
        sleep
        putMVar leftFork ()
        putMVar rightFork ()
        putStrLn $ show n ++ " put forks"
        sleep

    runPhil :: Int -> IO ()
    runPhil n = do
        -- Create the references that represent the forks.
        forks <- replicateM n $ newMVar ()
        -- Start the threads (5 in our case), each running the phil function.
        forM_ [1..n] $ \i -> forkIO $ phil i (forks !! (i - 1)) (forks !! (i `mod` n))

    main = do
        runPhil 5
        -- Wait forever so that the program does not exit while the threads are running.
        forever (threadDelay 1000000000)

A deadlock can occur in this program.
To see it happen, uncomment the line -- sleep and wait a bit.
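
The deadlocked state can also be reached deterministically, without waiting for an unlucky interleaving. The following is a minimal sketch, assuming we put the forks by hand into the situation where every philosopher holds only the left fork. canAnyoneProceed is a hypothetical helper, and tryTakeMVar stands in for the blocking takeMVar so that the check itself cannot hang.

```haskell
import Control.Concurrent.MVar
import Control.Monad (forM, replicateM)
import Data.Maybe (isJust)

-- | Put n philosophers into the state where each one holds only the left
-- fork, then ask, without blocking, whether anyone could take the right fork.
-- Hypothetical helper for illustration only.
canAnyoneProceed :: Int -> IO Bool
canAnyoneProceed n = do
    forks <- replicateM n (newMVar ())
    -- Philosopher i takes the left fork, forks !! (i - 1), so all forks are taken.
    mapM_ takeMVar forks
    -- Philosopher i now needs the right fork, forks !! (i `mod` n).
    attempts <- forM [1 .. n] $ \i -> tryTakeMVar (forks !! (i `mod` n))
    return (any isJust attempts)
```

canAnyoneProceed 5 returns False: every right fork is somebody else's left fork, so with the blocking takeMVar all five threads would wait forever.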
Our goal is to write tests that detect this error.
But before we can do that, we need to understand how we will control the order of operations. For this, we will use another monad instead of IO.

Let's generalize the definitions of the functions sleep, phil and runPhil so that they work in other monads.

    sleep :: MonadIO m => m ()
    sleep = do
        r <- liftIO $ randomRIO (0, 100)
        r `times` liftIO (threadDelay 300)
      where
        -- Repeat the action a the given number of times.
        times :: Monad m => Int -> m () -> m ()
        times r a = mapM_ (\_ -> a) [1..r]

Now the sleep function can work in any monad that supports IO operations. The MonadIO class declares a single function, liftIO, which makes this possible.
Note that instead of sleeping once for a random amount of time, we sleep a random number of times for 0.3 milliseconds each. The reason is that in our monad the actions inside liftIO are performed atomically. Accordingly, the duration of a single sleep does not affect anything; only the number of sleeps matters.

Since our monad will run in a single thread, MVar is of no use to us there, and we will later define our own reference type. The phil function should therefore be able to work both with MVar and with other reference types.
To achieve this, we define a class of monads, MonadConcurrent, with operations for creating, reading and writing references, as well as for creating threads.

    class Monad m => MonadConcurrent m where
        type CVar m :: * -> *
        newCVar :: a -> m (CVar m a)
        takeCVar :: CVar m a -> m a
        putCVar :: CVar m a -> a -> m ()
        fork :: m () -> m ()


Here we used type families, which are a language extension.
In this case we need the extension so that we can define different reference types for different monads.
To use an extension, add the following line to the beginning of the file (it also enables the extensions we will need later):

 {-# LANGUAGE TypeFamilies, ExistentialQuantification, GeneralizedNewtypeDeriving #-} 
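
To illustrate what an associated type family buys us, here is a minimal sketch that depends only on base. The class MonadRef, its methods and the function counter are hypothetical names chosen for this example. The same generic code runs in IO with IORef and in ST with STRef, because each instance picks its own reference type.

```haskell
{-# LANGUAGE TypeFamilies #-}
import Control.Monad.ST (ST, runST)
import Data.IORef (IORef, newIORef, readIORef, modifyIORef)
import Data.Kind (Type)
import Data.STRef (STRef, newSTRef, readSTRef, modifySTRef)

-- | Monads that come with their own kind of mutable reference (hypothetical class).
class Monad m => MonadRef m where
    type Ref m :: Type -> Type
    newRef    :: a -> m (Ref m a)
    readRef   :: Ref m a -> m a
    modifyRef :: Ref m a -> (a -> a) -> m ()

instance MonadRef IO where
    type Ref IO = IORef
    newRef    = newIORef
    readRef   = readIORef
    modifyRef = modifyIORef

instance MonadRef (ST s) where
    type Ref (ST s) = STRef s
    newRef    = newSTRef
    readRef   = readSTRef
    modifyRef = modifySTRef

-- | Generic code: increments a fresh counter twice and reads it back.
counter :: MonadRef m => m Int
counter = do
    r <- newRef 0
    modifyRef r (+ 1)
    modifyRef r (+ 1)
    readRef r
```

Both runST counter and counter executed in IO yield 2.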

Let's define an instance of this class for the IO monad.
Everything is easy here: we simply use the corresponding operations for MVar.

    instance MonadConcurrent IO where
        type CVar IO = MVar
        newCVar = newMVar
        takeCVar = takeMVar
        putCVar = putMVar
        fork m = forkIO m >> return ()

Now let's rewrite the functions phil and runPhil.

    phil :: (MonadIO m, MonadConcurrent m) => Int -> CVar m () -> CVar m () -> m ()
    phil n leftFork rightFork = forever $ do
        liftIO $ putStrLn $ show n ++ " is awaiting"
        sleep
        takeCVar leftFork
        liftIO $ putStrLn $ show n ++ " took left fork"
        takeCVar rightFork
        liftIO $ putStrLn $ show n ++ " took right fork"
        sleep
        putCVar leftFork ()
        putCVar rightFork ()
        liftIO $ putStrLn $ show n ++ " put forks"
        sleep

    runPhil :: (MonadIO m, MonadConcurrent m) => Int -> m ()
    runPhil n = do
        forks <- replicateM n $ newCVar ()
        forM_ [1..n] $ \i -> fork $ phil i (forks !! (i - 1)) (forks !! (i `mod` n))

Run the program and make sure that it works as before.

The Concurrent monad


And now the fun begins.

We define the monad in which we will work (looking ahead, I will say that it is a wrapper around Cont). I would also venture to say that Cont is at once one of the most complex and one of the most powerful monads.
With this monad you can do anything you like with the control flow: for example, instead of performing actions, you can save them in a data structure (for this purpose we declare the Action type) and perform them later, perhaps in a different order.

    data Action
        = Atom (IO Action)
        | forall a. ReadRef (MaybeRef a) (a -> Action)
        | forall a. WriteRef (MaybeRef a) a Action
        | Fork Action Action
        | Stop

Let's look at each constructor in turn.
The Stop action means that the computation is complete.
The Fork action means that the computation branches, that is, we now have two threads that can run concurrently.
The Atom action atomically performs an IO operation that returns a new Action, namely the action to be performed at the next step.

For example:
The getSum function defines an action that reads two numbers from the keyboard, prints their sum and finishes.

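    getSum :: Action
    getSum = Atom $ do
        x <- readLn                   -- Read the first number.
        return $ Atom $ do            -- Return the continuation.
            y <- readLn               -- Read the second number.
            return $ Atom $ do        -- Return the continuation.
                print (x + y)         -- Print the sum.
                return Stop           -- Return the continuation.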
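
To see how such a chain of atomic steps is consumed, here is a minimal sketch with a cut-down Action type that has only the Atom and Stop constructors. The type is redeclared locally so that the block is self-contained, and countdown, runChain and demo are hypothetical names introduced for this illustration.

```haskell
import Data.IORef

-- | Cut-down version of the action type: atomic steps only.
data Action = Atom (IO Action) | Stop

-- | An action that records n, n-1, ..., 1 in a log, one number per atomic step.
countdown :: IORef [Int] -> Int -> Action
countdown logRef n
    | n <= 0    = Stop
    | otherwise = Atom $ do
        modifyIORef logRef (n :)
        return (countdown logRef (n - 1))   -- the continuation

-- | Run a single chain to completion and count the atomic steps performed.
runChain :: Action -> IO Int
runChain Stop = return 0
runChain (Atom m) = do
    next <- m
    steps <- runChain next
    return (steps + 1)

-- | Run a countdown from 3 and return the step count and the log in order.
demo :: IO (Int, [Int])
demo = do
    logRef <- newIORef []
    steps <- runChain (countdown logRef 3)
    entries <- readIORef logRef
    return (steps, reverse entries)
```

demo returns (3, [3,2,1]). Nothing happens when countdown builds the structure; the effects run only when an interpreter such as runChain walks it, which is what lets a scheduler choose the order.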

Next:
The action WriteRef ref val act writes the value val to the reference ref; act is the continuation.
The action ReadRef ref act reads the value from the reference ref; act takes this value and returns the continuation.
To be able to store references of arbitrary types inside Action, we use another language extension, existential quantification.
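
As a small illustration of existential quantification, here is a minimal sketch that depends only on base. The Probe type and the functions runProbe and demo are hypothetical. As with ReadRef, a constructor hides the type of the reference together with a function that knows how to consume its contents, so values over different types fit in one list.

```haskell
{-# LANGUAGE ExistentialQuantification #-}
import Data.IORef

-- | A reference of some hidden type a plus a way to render its contents.
data Probe = forall a. Probe (IORef a) (a -> String)

-- | Read the hidden reference and render the value.
runProbe :: Probe -> IO String
runProbe (Probe ref render) = render <$> readIORef ref

-- | Probes over an Int reference and a String reference in the same list.
demo :: IO [String]
demo = do
    n <- newIORef (42 :: Int)
    s <- newIORef "fork"
    mapM runProbe [Probe n show, Probe s id]
```

demo returns ["42","fork"]. Once a Probe is unpacked, the only thing we can do with the value is pass it to the function stored next to it, which is exactly how ReadRef hands the value to its continuation.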

The MaybeRef type is the reference type that we will use instead of MVar; it is defined as a reference to a Maybe value.

 newtype MaybeRef a = MaybeRef (IORef (Maybe a)) 

Now we can define our monad.
As promised, we simply wrap the Cont monad.

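    -- Functor and Applicative are superclasses of Monad since GHC 7.10,
    -- so they have to be derived as well.
    newtype Concurrent a = Concurrent (Cont Action a)
        deriving (Functor, Applicative, Monad)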

The Cont Action monad works as follows.
Instead of returning a value of type a, a computation takes a continuation of type (a -> Action), passes the value to it, and returns the result.
That is, we can think of Cont Action a as (a -> Action) -> Action.
More precisely, the following pair of functions converts (a -> Action) -> Action into Cont Action a and back.

    cont :: ((a -> Action) -> Action) -> Cont Action a
    runCont :: Cont Action a -> (a -> Action) -> Action
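
To make the reading "Cont r a is (a -> r) -> r" concrete, here is a minimal sketch of a hand-rolled continuation monad that depends only on base. The names CPS, runCPS, say and script are hypothetical; the sketch is a stand-in for the library's Cont, not its actual definition. The result type is a list of strings, so a primitive can record a line in the result instead of performing an effect.

```haskell
-- | A computation that, given a continuation for its result, produces an r.
newtype CPS r a = CPS { runCPS :: (a -> r) -> r }

instance Functor (CPS r) where
    fmap f (CPS m) = CPS $ \k -> m (k . f)

instance Applicative (CPS r) where
    pure x = CPS $ \k -> k x
    CPS mf <*> CPS mx = CPS $ \k -> mf (\f -> mx (k . f))

instance Monad (CPS r) where
    CPS m >>= f = CPS $ \k -> m (\x -> runCPS (f x) k)

-- | Record a line in the result, then carry on with the continuation.
say :: String -> CPS [String] ()
say line = CPS $ \k -> line : k ()

-- | Ordinary do-notation; the "effects" end up as data in the final list.
script :: CPS [String] Int
script = do
    say "first"
    say "second"
    return 7
```

runCPS script (\n -> [show n]) evaluates to ["first","second","7"]. Replacing the list with Action, and say with constructors such as Atom or Fork, gives the approach used in this article.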

Now we can define the MonadIO and MonadConcurrent instances.

    instance MonadIO Concurrent where
        liftIO m = Concurrent $ cont $ \c -> Atom $ do
            a <- m
            return (c a)

Let's see what happens here.
liftIO takes an IO operation and wraps it in an atomic action. Namely, we pass to cont a function that accepts a continuation (that is, c has type a -> Action) and returns an atomic action that performs the IO operation m.
We defined Atom so that an atomic operation must return an Action that is the continuation.
This is exactly what we do: after executing m, we call c, which returns the required continuation.

Now let's define the MonadConcurrent instance.
In newCVar we create a reference using the liftIO function just defined.
In takeCVar and putCVar we return the corresponding action and store the continuation inside it.
In fork we return an action that stores both threads: one is passed as the argument of fork, the other comes from the continuation.

    instance MonadConcurrent Concurrent where
        type CVar Concurrent = MaybeRef
        newCVar a = liftIO $ liftM MaybeRef $ newIORef (Just a)
        takeCVar v = Concurrent $ cont (ReadRef v)
        putCVar v a = Concurrent $ cont $ \c -> WriteRef v a $ c ()
        fork (Concurrent m) = Concurrent $ cont $ \c -> Fork (runCont m $ \_ -> Stop) $ c ()

Our monad is almost ready; all that remains is to learn how to run it.
To begin with, we write a function that runs an Action. It takes a list of actions, each element of which is a separate thread.
There are different strategies for running actions. We have to decide two things: in what order to execute the threads, and what to do if a thread tries to read from a variable that is empty. Recall that a variable may contain nothing, in which case we need to wait until another thread puts something into it.
Let's first write a simple version in which we execute the threads in turn, and a thread that tries to read from an empty variable is moved to the end of the queue.

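    runAction :: [Action] -> IO ()
    -- If the list of threads is empty, we are done.
    runAction [] = return ()
    -- Perform the atomic action and put its continuation at the end of the queue.
    runAction (Atom m : as) = do
        a' <- m
        runAction $ as ++ [a']
    -- Put both threads at the end of the queue.
    runAction (Fork a1 a2 : as) = runAction $ as ++ [a1,a2]
    -- Drop the finished thread.
    runAction (Stop : as) = runAction as
    runAction (ReadRef (MaybeRef ref) c : as) = do
        -- Read the reference.
        ma <- readIORef ref
        case ma of
            -- If it contains a value,
            Just a -> do
                -- empty the reference
                writeIORef ref Nothing
                -- and put the continuation at the end of the queue.
                runAction (as ++ [c a])
            -- If the reference is empty, put the same action at the end of the queue
            -- so that the read is retried later.
            Nothing -> runAction (as ++ [ReadRef (MaybeRef ref) c])
    -- Write the value to the reference and put the continuation at the end of the queue.
    runAction (WriteRef (MaybeRef ref) val a : as) = do
        writeIORef ref (Just val)
        runAction (as ++ [a])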

Note that putMVar works slightly differently from our implementation of WriteRef.
If the reference already contains a value, putMVar blocks the thread until the variable is emptied, whereas we simply overwrite the value.
We will not write a version that behaves like putMVar in this situation, so as not to over-complicate the code.
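
The difference can be seen in a minimal sketch that depends only on base. overwriteVsBlock is a hypothetical helper; tryPutMVar is used instead of putMVar so that the example reports "would block" as False rather than actually hanging, and a plain IORef (Maybe Int) stands in for MaybeRef.

```haskell
import Control.Concurrent.MVar
import Data.IORef

-- | Write to an already full MVar and to an already full Maybe-reference.
-- Hypothetical helper for illustration only.
overwriteVsBlock :: IO (Bool, Maybe Int, Maybe Int)
overwriteVsBlock = do
    mv <- newMVar (1 :: Int)
    accepted <- tryPutMVar mv 2        -- False: the MVar is full, putMVar would block
    kept <- tryReadMVar mv             -- the old value is still there
    ref <- newIORef (Just (1 :: Int))
    writeIORef ref (Just 2)            -- the model's WriteRef: overwrite unconditionally
    replaced <- readIORef ref
    return (accepted, kept, replaced)
```

overwriteVsBlock returns (False, Just 1, Just 2). For the philosophers this difference does not matter, because a fork is only ever put back by the thread that took it.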

Next, we write the function that runs a Concurrent computation, and redefine main.

    runConcurrent :: Concurrent () -> IO ()
    runConcurrent (Concurrent c) = runAction [runCont c $ \_ -> Stop]

    main = runConcurrent (runPhil 5)

Since everything now runs in a single thread, and threadDelay suspends all work, the program runs somewhat slower.

Writing tests


It is time to "add seasoning to the dish", that is, to write tests for our example.
To do this, we use the QuickCheck library, which generates random input data for tests. Since we want to run our threads in different orders, the input of our tests is the order in which we pick the next thread from the list.
We could encode the input as a list of numbers, but the problem is that we do not know in advance from which range these numbers should be chosen, since the number of threads can vary.
Therefore, we encode the input as a list of functions of type Int -> Int, each of which takes a number n and returns a number from the interval [0,n-1].
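
The following minimal sketch shows how one such function selects a thread whatever the current length of the list is. pickThread is a hypothetical helper that is not part of the article's code; it relies on the assumption stated above that the function returns a number from [0,n-1] for every n > 0.

```haskell
-- | Use a choice function to pick one element; return it together with the rest.
-- Returns Nothing for an empty list or an out-of-range choice.
pickThread :: (Int -> Int) -> [a] -> Maybe (a, [a])
pickThread _ [] = Nothing
pickThread r xs =
    case splitAt (r (length xs)) xs of
        (before, x : after) -> Just (x, before ++ after)
        _                   -> Nothing

-- | One choice function that is valid for any n > 0 (hypothetical example).
seventh :: Int -> Int
seventh n = 7 `mod` n
```

With three threads, pickThread seventh "abc" gives Just ('b',"ac"); with five, pickThread seventh "abcde" gives Just ('c',"abde"). A bare number 7 would be out of range in both cases.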

 newtype Route = Route [Int -> Int] 

The Arbitrary class provided by QuickCheck describes types whose values can be generated at random.
The class declares two functions, shrink and arbitrary.
shrink has a default implementation, so we will not override it.
In arbitrary, we generate a list of random functions, each of which returns a number from the interval [0,n-1].

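    instance Arbitrary Route where
        arbitrary = fmap Route (listOf arbitraryFun)
          where
            -- q is the random generator and s is the size parameter;
            -- the generated value is a function of n.
            arbitraryFun = MkGen $ \q s n -> unGen (choose (0, n - 1)) q s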

We also define a Show instance for Route, since QuickCheck requires it.
Unfortunately, we cannot write a particularly useful show. Moreover, we will not make use of this function, so we leave it undefined.

 instance Show Route where show = undefined 

Now we can start writing a smarter version of runAction.
The first difference is that we separate the execution of atomic actions from the work with references.
To begin with, we write an auxiliary function, skipAtoms, which performs the atomic actions: it takes a list of actions, executes Atom, Fork and Stop, and returns the rest as the result.

    skipAtoms :: [Action] -> IO [Action]
    skipAtoms [] = return []
    skipAtoms (Atom m : as) = do
        a <- m
        skipAtoms (as ++ [a])
    skipAtoms (Fork a1 a2 : as) = skipAtoms (as ++ [a1,a2])
    skipAtoms (Stop : as) = skipAtoms as
    skipAtoms (a : as) = fmap (a:) (skipAtoms as)

The second difference between the new version of runAction and the previous one is that we now detect deadlocks.
To do this, we keep two lists of actions. The first stores the active threads (the ones we are executing). The second stores the threads that are waiting for some reference to be updated.
If the list of active threads is empty but the list of waiting threads is not, we have a deadlock, and in this case we throw an exception.

The third change is the argument of type Route, which is used to select the number of the thread to execute at the current step.

    runAction :: Route -> [Action] -> [Action] -> IO ()
    -- No active and no waiting threads: everything has finished.
    runAction _ [] [] = return ()
    -- No active threads, but some are still waiting: deadlock.
    runAction _ [] _ = fail "Deadlock"
    -- The route is exhausted: stop the run.
    runAction (Route []) _ _ = return ()
    runAction (Route (r:rs)) as bs = do
        as <- skipAtoms as
        let n = length as
        case splitAt (r n) as of
            (as1, ReadRef (MaybeRef ref) c : as2) -> do
                ma <- readIORef ref
                case ma of
                    Just a -> do
                        writeIORef ref Nothing
                        runAction (Route rs) (as1 ++ [c a] ++ as2) bs
                    -- The reference is empty: move the thread to the waiting list.
                    Nothing -> runAction (Route rs) (as1 ++ as2) (bs ++ [ReadRef (MaybeRef ref) c])
            (as1, WriteRef (MaybeRef ref) x c : as2) -> do
                writeIORef ref (Just x)
                -- A reference has changed: wake up all waiting threads.
                runAction (Route rs) (as1 ++ [c] ++ as2 ++ bs) []

The runConcurrent function has hardly changed.

    runConcurrent :: Route -> Concurrent () -> IO ()
    runConcurrent r (Concurrent c) = runAction r [runCont c $ \_ -> Stop] []

You can check how the new version works by passing round_robin as the first argument. This is a simple execution strategy, similar to how runAction worked before. We simply generate an infinite list of numbers and, for each element, take the remainder modulo the number of threads.

    round_robin :: Route
    round_robin = Route $ map rem [0..]
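
To check what this route actually selects, here is a minimal sketch that depends only on base. Route is redeclared locally so that the block is self-contained, roundRobin mirrors the definition above under a different name, and schedule is a hypothetical helper that applies the first few choice functions to a fixed number of threads.

```haskell
-- | Local copy of the article's Route type.
newtype Route = Route [Int -> Int]

-- | Same definition as round_robin: the k-th function maps n to k `rem` n.
roundRobin :: Route
roundRobin = Route (map rem [0 ..])

-- | The thread indices chosen in the first `steps` steps for n threads.
schedule :: Int -> Int -> Route -> [Int]
schedule steps n (Route rs) = map ($ n) (take steps rs)
```

schedule 7 3 roundRobin evaluates to [0,1,2,0,1,2,0]. In the real runAction the number of active threads changes from step to step, so the chosen indices do not always cycle this regularly.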

Running the computation on this input, we will most likely get a deadlock fairly quickly. The reason is that our example relies on a random number generator, so even though the input data is always the same, the execution order is random.
If our example were more deterministic, we would have to vary the input data randomly, which is what we do now.

    main = quickCheck $ monadicIO $ do
        r <- pick arbitrary
        run $ runConcurrent r (runPhil 5)

We pick an arbitrary value of type Route using the arbitrary function implemented earlier. Then we run our computation on this input.
QuickCheck takes care of the rest: it runs our test 100 times, each time increasing the size of the input data.

When we run the program, we see the following:

    ...
    3 took left fork
    4 put forks
    4 is awaiting
    5 took left fork
    4 took left fork
    1 took right fork
    1 put forks
    1 is awaiting
    1 took left fork
    2 took left fork
    *** Failed! Exception: 'user error (Deadlock)' (after 36 tests):

Which is exactly what we wanted!

Conclusion


We have learned how to write tests that can detect a deadlock in a multithreaded application.
Along the way, we have seen examples of using the Cont monad, type families, existential quantification, and the QuickCheck library.
In addition, we have learned how to build a model of multithreaded program execution from the materials at hand.

Source: https://habr.com/ru/post/224075/
