
Some facts about Python asyncio

Hello! I would like to share my experience of using Python asyncio. After a year and a half of running it in production, I have accumulated some general techniques that make life easier. Naturally, there were pitfalls too, and they are worth mentioning because knowing about them will save a lot of time for anyone who is just starting to use asyncio in their applications. If that sounds interesting, read on.

A bit of history


asyncio appeared in Python 3.4, and 3.5 added the more pleasant async/await syntax. Out of the box, asyncio provides an event loop, Future, Task, coroutines, I/O multiplexing, and synchronization primitives. That is quite a lot, but not enough for full-fledged development. Third-party libraries fill the gap (the original post links to an excellent selection of them). At our company we use asyncio together with a set of third-party libraries to write microservices. Our services are bound by I/O rather than by the CPU, so asyncio suits us very well.

The facts themselves


This is not a tutorial on asyncio. I will not explain why asynchronous I/O is good, or why you should not simply use threads. There will be no stories about coroutines, generators, event loops, and so on. There will also be no benchmarks or comparisons with other languages. Let's go!
Debug

First, PYTHONASYNCIODEBUG. This environment variable turns on asyncio's debug mode. For example, you will see a message when a function declared as a coroutine is called like a normal function (relevant for Python 3.4). You also need to set the asyncio logger to the DEBUG level and enable ResourceWarning output. You will then see a lot of interesting things, such as messages that you forgot to close a transport or the event loop itself (in other words, that you forgot to release resources). Compare running the following code with the -Wdefault interpreter option and the environment variable PYTHONASYNCIODEBUG=1 against running it without them (here and below, the code examples may omit non-essential parts such as imports or exception handling):

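    import asyncio

    @asyncio.coroutine  # generator-based coroutine decorator; removed in Python 3.11
    def test():
        pass

    loop = asyncio.get_event_loop()
    test()  # called like a plain function: the coroutine is never scheduled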
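The same switches can also be flipped from inside the program. The following is only a sketch of one way to do it: `make_debug_loop` is a name made up for this illustration, and `loop.set_debug(True)` enables debug mode for that one loop, which is roughly what PYTHONASYNCIODEBUG=1 does globally.

```python
import asyncio
import logging
import warnings


def make_debug_loop():
    """Create an event loop with asyncio debug mode switched on (illustrative helper)."""
    # Show ResourceWarning (unclosed transports, unclosed loops), as -Wdefault would.
    warnings.simplefilter("default", ResourceWarning)
    # The asyncio logger is the standard logger named "asyncio".
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger("asyncio").setLevel(logging.DEBUG)
    loop = asyncio.new_event_loop()
    loop.set_debug(True)  # per-loop debug mode
    return loop


loop = make_debug_loop()
debug_enabled = loop.get_debug()
loop.close()
```

Setting the environment variable is still the simplest option in development, because it needs no code changes at all.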

Proper completion

Speaking of releasing resources: the event loop must be stopped properly, waiting for everything that is still running to finish, closing connections, and so on. With run_until_complete() this causes no particular problems, but with run_forever() things are a bit more complicated. The event loop's close() method can only be called once the loop has already stopped, i.e. after stop(). This is best done using signals:

    import asyncio
    import signal

    def handler(loop):
        loop.remove_signal_handler(signal.SIGTERM)
        loop.stop()

    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGTERM, handler, loop)
    try:
        loop.run_forever()
    finally:
        loop.close()
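Stopping the loop does not by itself finish the tasks that are still pending. A possible way to let them clean up before close() is sketched below. It assumes Python 3.7+ for `asyncio.all_tasks`; `worker` and `shutdown` are hypothetical names, and a timer stands in for the SIGTERM handler so that the sketch runs on any platform.

```python
import asyncio


async def worker(log):
    try:
        await asyncio.sleep(3600)  # pretend to serve requests
    except asyncio.CancelledError:
        log.append("cleaned up")  # release resources here
        raise


def shutdown(loop):
    """Cancel whatever is still pending and let it finish before close()."""
    pending = asyncio.all_tasks(loop)
    for task in pending:
        task.cancel()
    if pending:
        # Run the loop once more so the cancelled tasks can handle CancelledError.
        loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
    loop.close()


log = []
loop = asyncio.new_event_loop()
task = loop.create_task(worker(log))
loop.call_later(0.01, loop.stop)  # stands in for the signal handler
loop.run_forever()
shutdown(loop)
```

The key point is that the loop has to run again after cancel() is called, otherwise the tasks never get a chance to execute their cleanup code.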

In the remaining code examples I will nevertheless concentrate on the essence rather than on shutting the program down correctly.

Running blocking code

Naturally, not everything has an asynchronous library. Some code remains blocking, and it has to be run in a way that does not block the event loop. For this there is the handy run_in_executor() method, which runs whatever you pass to it in one of the threads of the built-in pool, without blocking the main thread where the event loop lives. All good, but there are two problems. First, the default pool has only 5 threads. Second, asyncio uses a synchronous DNS resolver, which is run in that same built-in pool. This means your synchronous operations compete for a pool of just 5 threads with everything that needs to call getaddrinfo(). The way out is to use your own pool. Always:

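    import asyncio
    import multiprocessing
    import time
    from concurrent.futures import ThreadPoolExecutor

    def blocking_function():
        time.sleep(42)

    pool = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
    loop = asyncio.get_event_loop()
    # run_in_executor() returns a future; wait for it before closing the loop
    loop.run_until_complete(loop.run_in_executor(pool, blocking_function))
    loop.close()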
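Inside a coroutine, the future returned by run_in_executor() can simply be awaited. Here is a minimal sketch, assuming Python 3.7+ (`asyncio.run`, `asyncio.get_running_loop`); `blocking_io` and the pool size of 4 are made up for the illustration.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_io(n):
    time.sleep(0.05)  # stands in for a blocking library call
    return n * 2


async def main(pool):
    loop = asyncio.get_running_loop()
    # Each call is submitted to our own pool, not the loop's default executor.
    futures = [loop.run_in_executor(pool, blocking_io, n) for n in range(4)]
    return await asyncio.gather(*futures)


with ThreadPoolExecutor(max_workers=4) as pool:
    results = asyncio.run(main(pool))
```

Because the four calls run in separate threads, the whole batch takes about as long as one call, and the event loop stays free in the meantime.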

Devious Future

Future has one very interesting feature: if an exception occurs inside it, you will learn nothing about it unless you explicitly ask the future itself. The documentation has a good example on this topic. You will only see that there was an exception when the garbage collector deletes the future object. A simple rule follows from this: always check the result of your futures. Even if the code inside the future is meant to spin in an infinite loop and there seems to be no place to check the result, you still need to handle exceptions, like this:

    import asyncio

    async def handle_exception():
        try:
            await bug()
        except Exception:
            print('TADA!')

    async def bug():
        raise Exception()

    loop = asyncio.get_event_loop()
    loop.create_task(handle_exception())
    loop.run_forever()
    loop.close()
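Another option, when wrapping every coroutine in try/except is inconvenient, is to attach a done callback that asks the task for its exception. This is only a sketch: `report_failure` and the `errors` list are hypothetical, and a real service would probably log the error instead of collecting it.

```python
import asyncio

errors = []


def report_failure(task):
    """Done callback: explicitly ask the finished task whether it failed."""
    if task.cancelled():
        return
    exc = task.exception()  # retrieving it also marks the exception as handled
    if exc is not None:
        errors.append(repr(exc))


async def bug():
    raise ValueError("boom")


async def main():
    task = asyncio.get_running_loop().create_task(bug())
    task.add_done_callback(report_failure)
    await asyncio.sleep(0.01)  # give the task time to fail


asyncio.run(main())
```

The callback runs as soon as the task finishes, so the failure surfaces immediately rather than whenever the garbage collector gets to the task object.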

await and __init__()

Impossible. The magic __init__() method cannot contain asynchronous code. There are two ways around this. One is to add another method to the class, for example initialize(), which is a coroutine. It holds all the asynchronous initialization code and has to be called after the object is created. It looks awful. The other, and the customary one, is to use factory functions. Let me explain with an example:

    import asyncio

    class Foo:
        def __init__(self, reader, writer, loop, *args, **kwargs):
            self._reader = reader
            self._writer = writer
            self._loop = loop

    async def create_foo(loop):
        # open_connection() uses the running loop; its loop= argument
        # was removed in Python 3.10
        reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
        return Foo(reader, writer, loop)

    loop = asyncio.get_event_loop()
    foo = loop.run_until_complete(create_foo(loop))
    print(foo)
    loop.close()
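A common variation keeps the factory next to the class as an async classmethod. The sketch below is an assumption of how that might look, not something taken from our code: `Foo.create` and `fetch_value` are hypothetical names, and `fetch_value` stands in for real asynchronous setup such as opening a connection.

```python
import asyncio


async def fetch_value():
    await asyncio.sleep(0)  # placeholder for real asynchronous work
    return 42


class Foo:
    def __init__(self, value):
        # __init__ stays synchronous and only stores ready-made state.
        self.value = value

    @classmethod
    async def create(cls):
        # All asynchronous initialization happens here, before __init__ runs.
        value = await fetch_value()
        return cls(value)


foo = asyncio.run(Foo.create())
```

Either way the object is fully initialized by the time the caller gets it, which is the whole point of the factory.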

Wake up, Neo

Let's say you have a task that spins in the event loop and periodically flushes some kind of buffer. You can write this code:

    async def flush_task():
        while True:
            # flushing...
            await asyncio.sleep(FLUSH_TIMEOUT)

Call create_task() and everything seems fine, except for one thing: what if you need to force the buffer to be flushed on shutdown? How do you make the task "wake up"? This is where synchronization primitives come to the rescue:

    import asyncio

    FLUSH_TIMEOUT = 5  # seconds; example value

    class Foo:
        def __init__(self, loop, *args, **kwargs):
            self._loop = loop
            self._waiter = asyncio.Event()
            self._flush_future = self._loop.create_task(self.flush_task())

        async def flush_task(self):
            while True:
                try:
                    # wake up when the event is set or when the timeout expires
                    await asyncio.wait_for(self._waiter.wait(), timeout=FLUSH_TIMEOUT)
                except asyncio.TimeoutError:
                    pass
                # flushing ...
                self._waiter.clear()

        def force_flush(self):
            self._waiter.set()

    loop = asyncio.get_event_loop()
    foo = Foo(loop)
    loop.run_forever()
    loop.close()
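To see the wake-up in action, here is a small self-contained sketch. The `Buffer` class, its `items` and `flushed` lists, and the 60 second timeout are all made up for the illustration. The timeout is deliberately long, so the only thing that can trigger a flush during the run is `force_flush()`.

```python
import asyncio

FLUSH_TIMEOUT = 60  # assumed value, long enough that the timer never fires here


class Buffer:
    def __init__(self):
        self.items = []
        self.flushed = []
        self._waiter = asyncio.Event()

    async def flush_task(self):
        while True:
            try:
                await asyncio.wait_for(self._waiter.wait(), timeout=FLUSH_TIMEOUT)
            except asyncio.TimeoutError:
                pass
            # "Flushing" here just moves the items to another list.
            self.flushed.extend(self.items)
            self.items.clear()
            self._waiter.clear()

    def force_flush(self):
        self._waiter.set()


async def main():
    buf = Buffer()
    task = asyncio.get_running_loop().create_task(buf.flush_task())
    buf.items.append("a")
    buf.force_flush()  # wake the task up instead of waiting 60 seconds
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return buf.flushed


flushed = asyncio.run(main())
```

The item is flushed within milliseconds even though the periodic timeout is a full minute away.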

Testing

Asynchronous code can and should be tested. And it is as easy to do as with synchronous code:

    import asyncio
    import unittest

    class TestCase(unittest.TestCase):
        def setUp(self):
            self.loop = asyncio.new_event_loop()
            # no global loop: the code under test must use the loop it is given
            asyncio.set_event_loop(None)

        def tearDown(self):
            self.loop.close()

        def test_001(self):
            async def func():
                self.assertEqual(42, 42)
            self.loop.run_until_complete(func())

The tests are perfectly isolated, because each test uses its own event loop. You can go further and use pytest, which has convenient decorators.
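On Python 3.8 and later, the standard library also offers unittest.IsolatedAsyncioTestCase, which creates a fresh event loop per test and lets test methods be coroutines. A minimal sketch, where `double` is a hypothetical function under test:

```python
import asyncio
import unittest


async def double(x):
    await asyncio.sleep(0)  # placeholder for real asynchronous work
    return x * 2


class DoubleTest(unittest.IsolatedAsyncioTestCase):
    async def test_double(self):
        # The test method itself is a coroutine, so it can await directly.
        self.assertEqual(await double(21), 42)


suite = unittest.defaultTestLoader.loadTestsFromTestCase(DoubleTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

This removes the setUp/tearDown boilerplate for the loop while keeping the same isolation between tests.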

Sources of inspiration


First of all, personal experience. Much of the above was learned by stepping on rakes and then studying the documentation and the asyncio source code. The source code of popular libraries such as aiohttp, aioredis, and aiopg also provided excellent examples.

Thanks to everyone who read the article to the end. Good luck with asyncio!

Source: https://habr.com/ru/post/314606/

