Asynchronous code is complicated. Everybody knows it. Writing asynchronous tests is even harder. I recently fixed a flaky test and would like to share some thoughts on writing asynchronous tests.

In this article we will deal with a common problem of asynchronous tests: how to force threads into a specific order, making certain operations of one thread run before certain operations of another. Under normal conditions we do not try to impose an execution order on different threads, since that contradicts the very reason for using threads, which is to allow parallelism and let the processor pick the best order of execution based on the available resources and the state of the application. But during testing a specific order may be required to keep the tests stable.

Testing the throttler
A throttler (limiter) is a class responsible for limiting the number of simultaneous operations performed on a certain resource (for example, a connection pool, a network buffer, or CPU-intensive operations). Unlike other synchronization tools, a throttler makes requests that exceed the quota fail immediately instead of waiting. Failing fast matters because the alternative, waiting, consumes resources: ports, threads, and memory.
Here is a simple implementation of a throttler (basically a wrapper around the Semaphore class; in the real world there could also be waiting, retries, and so on):
import java.util.concurrent.Semaphore

class ThrottledException extends RuntimeException("Throttled!")

class Throttler(count: Int) {
  private val semaphore = new Semaphore(count)

  def apply(f: => Unit): Unit = {
    if (!semaphore.tryAcquire()) throw new ThrottledException
    try {
      f
    } finally {
      semaphore.release()
    }
  }
}
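For a quick feel of the fail-fast behavior, here is a minimal sketch of my own (not from the original article): with a limit of 1, a nested call exceeds the quota and throws immediately instead of waiting.

object ThrottlerDemo extends App {
  val throttler = new Throttler(1)

  throttler {
    // The single permit is held by the outer call, so the nested call fails fast.
    try throttler { println("never reached") }
    catch { case _: ThrottledException => println("throttled, as expected") }
  }
}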
Let's start with a simple unit test: we test the throttler from a single thread (the tests use the specs2 library). Here we verify that we can make more sequential calls than the maximum number of simultaneous calls configured for the throttler (the maxCount variable below). Note that since we are using a single thread, we are not testing the throttler's ability to fail fast, because we never saturate it. In fact, we only check that as long as the throttler is not saturated, it does not interfere with the execution of operations.
import org.specs2.mutable.Specification
import org.specs2.specification.Scope

class ThrottlerTest extends Specification {

  "Throttler" should {
    "execute sequential" in new ctx {
      var invocationCount = 0
      for (i <- 0 to maxCount) {
        throttler {
          invocationCount += 1
        }
      }
      invocationCount must be_==(maxCount + 1)
    }
  }

  trait ctx extends Scope {
    val maxCount = 3
    val throttler = new Throttler(maxCount)
  }
}
Testing the throttler asynchronously
In the previous test we did not saturate the throttler, simply because that cannot be done with a single thread. So our next step is to check how the throttler behaves in a multi-threaded environment.
Setup:
val e = Executors.newCachedThreadPool()
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(e)

private val waitForeverLatch = new CountDownLatch(1)

override def after: Any = {
  waitForeverLatch.countDown()
  e.shutdownNow()
}

def waitForever(): Unit = try {
  waitForeverLatch.await()
} catch {
  case _: InterruptedException =>
  case ex: Throwable => throw ex
}
The ExecutionContext is needed to construct Futures. The waitForever method holds the calling thread until the waitForeverLatch counter is released, which happens at the end of the test: in the after method we count the latch down and then shut down the ExecutorService.
A naive way to check the multi-threaded behavior of the throttler looks like this:

"throw exception once reached the limit [naive,flaky]" in new ctx {
  for (i <- 1 to maxCount) {
    Future {
      throttler(waitForever())
    }
  }

  throttler {} must throwA[ThrottledException]
}
Here we create as many futures as maxCount. Inside each future we call the waitForever function, which waits until the end of the test. Then we make one more call to the throttler, the (maxCount + 1)-th, expecting to get a ThrottledException at that point. However, the expected exception sometimes does not occur: the last call to the throttler (the one under the expectation) may run before any of the futures has started, in which case the exception is thrown later inside one of the futures rather than inside the expectation.
The problem with the test above is that we do not know whether all the futures have started and are already waiting inside waitForever by the time we make the extra call that is supposed to make the throttler throw. To fix this, we need to somehow wait until all the futures are running. Here is an approach familiar to many of us: just add a call to the sleep method with some "reasonable" duration.

"throw exception once reached the limit [naive, bad]" in new ctx {
  for (i <- 1 to maxCount) {
    Future {
      throttler(waitForever())
    }
  }

  Thread.sleep(1000)

  throttler {} must throwA[ThrottledException]
}
Well, now our test will almost always pass. But this is the wrong approach for at least two reasons:
The test now always takes at least the "reasonable duration" we chose, even when everything is ready much sooner.
In rare situations, for example when the machine is under heavy load, this "reasonable duration" may turn out to be not enough.
If you're still in doubt, search Google for other reasons.
A better approach is to synchronize the start of our threads (futures) with our expectation.
We will use the CountDownLatch class from the java.util.concurrent package:

"throw exception once reached the limit [working]" in new ctx {
  val barrier = new CountDownLatch(maxCount)

  for (i <- 1 to maxCount) {
    Future {
      throttler {
        barrier.countDown()
        waitForever()
      }
    }
  }

  barrier.await(5, TimeUnit.SECONDS) must beTrue

  throttler {} must throwA[ThrottledException]
}
We use the CountDownLatch as a barrier. The await method blocks the main thread until the latch counter reaches zero. As the other threads start (let's call them futures), each future calls countDown on the barrier, decreasing the latch counter by one. Once the counter reaches zero, all the futures are inside the waitForever method.
At this point we know that the throttler is saturated: it holds exactly maxCount threads. Any other thread that tries to use the throttler will get an exception. We have thereby obtained a deterministic execution order in which we can check the behavior of the throttler from the main thread. The main thread can and will continue at this point, because the barrier counter has reached zero and the CountDownLatch releases the waiting thread.
We pass a rather generous timeout to await to avoid blocking forever if something unexpected happens; in that case the test will simply fail. This timeout does not affect the duration of the test, because when nothing unexpected happens we never actually wait for it.
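To illustrate that last point, here is a small standalone sketch (the object name and the 30-second timeout are my own, not from the article): await returns as soon as the counter reaches zero, so a generous timeout costs nothing on the happy path.

import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object AwaitTimeoutDemo extends App {
  val latch = new CountDownLatch(1)

  // The future counts the latch down almost immediately.
  Future { latch.countDown() }

  val start = System.nanoTime()
  // await returns true as soon as the count reaches zero, long before the timeout expires.
  val released = latch.await(30, TimeUnit.SECONDS)
  val elapsedMs = (System.nanoTime() - start) / 1000000

  println(s"released=$released after ~$elapsedMs ms")
}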
In conclusion
When testing asynchronous code, we quite often need a specific thread ordering for a specific test. Without any synchronization we get flaky tests that sometimes pass and sometimes fail. Thread.sleep reduces the flakiness but does not solve the problem. In most cases where we need to control the order of threads in a test, we can use CountDownLatch instead of Thread.sleep. With CountDownLatch we decide exactly when a waiting thread is released, which gives us two important benefits: a deterministic order, and therefore more stable tests, and faster tests. Even for a plain wait such as the waitForever function we could use something like Thread.sleep(Long.MaxValue), but unreliable approaches should always be avoided.
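As a side-by-side sketch of that last remark (the sleep-based variant and its name are my own, not part of the article's setup), the latch-based wait is released deterministically when the test counts the latch down in after, while the sleep-based one can only be stopped by interrupting the thread:

import java.util.concurrent.CountDownLatch

val waitForeverLatch = new CountDownLatch(1)

// Latch-based wait: released deterministically when after() calls countDown().
def waitForever(): Unit =
  try waitForeverLatch.await()
  catch { case _: InterruptedException => }

// Sleep-based alternative: also waits "forever", but can only be stopped by interruption,
// which hides the test's intent and makes shutdown less explicit.
def waitForeverWithSleep(): Unit =
  try Thread.sleep(Long.MaxValue)
  catch { case _: InterruptedException => }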
Dmitry Komanov, Wix
Original article: Wix Engineering Blog