📜 ⬆️ ⬇️

AsyncIO for practicing python developer

I remember that moment when I thought, “How slowly does everything work, what if I parallelize calls?”, And after 3 days, looking at the code, I couldn’t understand anything in the terrible mess of streams, synchronizers and callback functions.

Then I met with asyncio , and everything changed.

If anyone does not know, asyncio is a new module for organizing competitive programming, which appeared in Python 3.4. It is designed to simplify the use of corutin and futur in asynchronous code - so that the code looks like synchronous, without callbacks.

I remember at that time there were several similar tools, and one of them stood out - this is the gevent library. I advise everyone to read the excellent gevent guide for a practicing python developer , which describes not only how to work with it, but what competition is in the general sense. I liked the article so much that I decided to use it as a template for writing an introduction to asyncio.
')
A small disclaimer is an article not gevent vs asyncio. Nathan Road has already done it for me in his article . All examples you can find on github .

I know you’re not anxious to write code, but first I would like to consider several concepts that will be useful to us in the future.

Threads, event loops, cortices and futures


Threads are the most common tool. I think you have heard of it before, but asyncio operates with slightly different concepts: event loops, cortutina and futures.


Pretty simple? Go!

Synchronous and asynchronous execution


In the video " Competition is not concurrency, it is better. " Rob Pike draws your attention to the key thing. The division of tasks into competitive subtasks is possible only with such parallelism, when it also controls these subtasks.

Asyncio does the same thing - you can break your code into procedures that you define as Korutin, which makes it possible to manage them as you wish, including simultaneous execution. The korutinas contain the yield statements, with the help of which we determine the places where you can switch to other pending tasks.

The context switch in asyncio is answered by yield, which transfers control back to the event loop, and that in turn is transferred to another corortina. Consider a basic example:

import asyncio async def foo(): print('Running in foo') await asyncio.sleep(0) print('Explicit context switch to foo again') async def bar(): print('Explicit context to bar') await asyncio.sleep(0) print('Implicit context switch back to bar') ioloop = asyncio.get_event_loop() tasks = [ioloop.create_task(foo()), ioloop.create_task(bar())] wait_tasks = asyncio.wait(tasks) ioloop.run_until_complete(wait_tasks) ioloop.close() 

 $ python3 1-sync-async-execution-asyncio-await.py Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar 

* First, we announced a couple of simplest corutin, which pretend to be non-blocking, using sleep from asyncio
* Korutiny can only be run from another korutiny, or wrapped in a task using create_task
* After we have 2 tasks, combine them using wait
* And finally, we will send for execution to the event loop via run_until_complete

Using await in any corintine, we thus declare that the coroutine can give control back to the event loop, which, in turn, will launch some of the following tasks: bar. In bar, the same thing happens: on await asyncio.sleep, the control will be transferred back to the event loop, which will return to the execution of foo at the right time.

Imagine 2 blocking tasks: gr1 and gr2, as if they are accessing some third-party services, and while they are waiting for a response, the third function can work asynchronously.

 import time import asyncio start = time.time() def tic(): return 'at %1.1f seconds' % (time.time() - start) async def gr1(): # Busy waits for a second, but we don't want to stick around... print('gr1 started work: {}'.format(tic())) await asyncio.sleep(2) print('gr1 ended work: {}'.format(tic())) async def gr2(): # Busy waits for a second, but we don't want to stick around... print('gr2 started work: {}'.format(tic())) await asyncio.sleep(2) print('gr2 Ended work: {}'.format(tic())) async def gr3(): print("Let's do some stuff while the coroutines are blocked, {}".format(tic())) await asyncio.sleep(1) print("Done!") ioloop = asyncio.get_event_loop() tasks = [ ioloop.create_task(gr1()), ioloop.create_task(gr2()), ioloop.create_task(gr3()) ] ioloop.run_until_complete(asyncio.wait(tasks)) ioloop.close() 

 $ python3 1b-cooperatively-scheduled-asyncio-await.py gr1 started work: at 0.0 seconds gr2 started work: at 0.0 seconds Lets do some stuff while the coroutines are blocked, at 0.0 seconds Done! gr1 ended work: at 2.0 seconds gr2 Ended work: at 2.0 seconds 

Notice how I work with I / O and scheduling, allowing it all to fit in one thread. While two tasks are blocked by waiting for I / O, the third function may take up all the CPU time.

Execution order


In the synchronous world, we think consistently. If we have a list of tasks that take different time to complete, they will end in the same order in which they were processed. However, in the case of competition one cannot be sure of it.

 import random from time import sleep import asyncio def task(pid): """Synchronous non-deterministic task. """ sleep(random.randint(0, 2) * 0.001) print('Task %s done' % pid) async def task_coro(pid): """Coroutine non-deterministic task """ await asyncio.sleep(random.randint(0, 2) * 0.001) print('Task %s done' % pid) def synchronous(): for i in range(1, 10): task(i) async def asynchronous(): tasks = [asyncio.ensure_future(task_coro(i)) for i in range(1, 10)] await asyncio.wait(tasks) print('Synchronous:') synchronous() ioloop = asyncio.get_event_loop() print('Asynchronous:') ioloop.run_until_complete(asynchronous()) ioloop.close() 

 $ python3 1c-determinism-sync-async-asyncio-await.py Synchronous: Task 1 done Task 2 done Task 3 done Task 4 done Task 5 done Task 6 done Task 7 done Task 8 done Task 9 done Asynchronous: Task 2 done Task 5 done Task 6 done Task 8 done Task 9 done Task 1 done Task 4 done Task 3 done Task 7 done 

Of course, your result will be different, because each task will fall asleep for a random time, but notice that the result is completely different, although we always set tasks in the same order.

Also pay attention to quorutine for our rather simple task. It is important to understand that there is no magic in asyncio when implementing non-blocking tasks. During implementation, asyncio stood alone in the standard library, since the remaining modules provided only blocking functionality. You can use the concurrent.futures module to wrap blocking tasks into threads or processes and get futures for use in asyncio. Several such examples are available on GitHub .
This is probably the main drawback now when using asyncio, however there are already several libraries that can help solve this problem.

The most popular blocking task is retrieving data via an HTTP request. Consider working with the magnificent aiohttp library on the example of getting information about public events on GitHub.

 import time import urllib.request import asyncio import aiohttp URL = 'https://api.github.com/events' MAX_CLIENTS = 3 def fetch_sync(pid): print('Fetch sync process {} started'.format(pid)) start = time.time() response = urllib.request.urlopen(URL) datetime = response.getheader('Date') print('Process {}: {}, took: {:.2f} seconds'.format( pid, datetime, time.time() - start)) return datetime async def fetch_async(pid): print('Fetch async process {} started'.format(pid)) start = time.time() response = await aiohttp.request('GET', URL) datetime = response.headers.get('Date') print('Process {}: {}, took: {:.2f} seconds'.format( pid, datetime, time.time() - start)) response.close() return datetime def synchronous(): start = time.time() for i in range(1, MAX_CLIENTS + 1): fetch_sync(i) print("Process took: {:.2f} seconds".format(time.time() - start)) async def asynchronous(): start = time.time() tasks = [asyncio.ensure_future( fetch_async(i)) for i in range(1, MAX_CLIENTS + 1)] await asyncio.wait(tasks) print("Process took: {:.2f} seconds".format(time.time() - start)) print('Synchronous:') synchronous() print('Asynchronous:') ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous()) ioloop.close() 

 $ python3 1d-async-fetch-from-server-asyncio-await.py Synchronous: Fetch sync process 1 started Process 1: Wed, 17 Feb 2016 13:10:11 GMT, took: 0.54 seconds Fetch sync process 2 started Process 2: Wed, 17 Feb 2016 13:10:11 GMT, took: 0.50 seconds Fetch sync process 3 started Process 3: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.48 seconds Process took: 1.54 seconds Asynchronous: Fetch async process 1 started Fetch async process 2 started Fetch async process 3 started Process 3: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.50 seconds Process 2: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.52 seconds Process 1: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.54 seconds Process took: 0.54 seconds 

Here you should pay attention to a couple of points.

First, the difference in time - when using asynchronous calls, we run requests at the same time. As mentioned earlier, each of them passed control to the next and returned the result on completion. That is, the execution speed directly depends on the running time of the slowest query, which took just 0.54 seconds. Cool, right?

Secondly, how much code is similar to synchronous. This is essentially the same thing! The main differences are associated with the implementation of the library to execute queries, creating and waiting for completion of tasks.

Creating competitiveness


So far, we have used the only method to create and retrieve results from coruntine, create a set of tasks, and wait for them to complete. However, quicksies can be scheduled to run and get results in several ways. Imagine a situation where we need to process the results of GET requests as they are received; in fact, the implementation is very similar to the previous one:

 import time import random import asyncio import aiohttp URL = 'https://api.github.com/events' MAX_CLIENTS = 3 async def fetch_async(pid): start = time.time() sleepy_time = random.randint(2, 5) print('Fetch async process {} started, sleeping for {} seconds'.format( pid, sleepy_time)) await asyncio.sleep(sleepy_time) response = await aiohttp.request('GET', URL) datetime = response.headers.get('Date') response.close() return 'Process {}: {}, took: {:.2f} seconds'.format( pid, datetime, time.time() - start) async def asynchronous(): start = time.time() futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)] for i, future in enumerate(asyncio.as_completed(futures)): result = await future print('{} {}'.format(">>" * (i + 1), result)) print("Process took: {:.2f} seconds".format(time.time() - start)) ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous()) ioloop.close() 

 $ python3 2a-async-fetch-from-server-as-completed-asyncio-await.py Fetch async process 1 started, sleeping for 4 seconds Fetch async process 3 started, sleeping for 5 seconds Fetch async process 2 started, sleeping for 3 seconds >> Process 2: Wed, 17 Feb 2016 13:55:19 GMT, took: 3.53 seconds >>>> Process 1: Wed, 17 Feb 2016 13:55:20 GMT, took: 4.49 seconds >>>>>> Process 3: Wed, 17 Feb 2016 13:55:21 GMT, took: 5.48 seconds Process took: 5.48 seconds 

Look at the indents and timings - we run all the tasks at the same time, but they are processed in order of completion. The code in this case is slightly different: we pack the korutiny, each of which is already prepared for execution, in the list. The as_completed function returns an iterator that returns the results of quorutine as they are executed. Cool, right ?! By the way, both as_completed and wait are functions from the concurrent.futures package.

Another example is that if you want to know your IP address. There are a lot of services for this, but you do not know which of them will be available at the time of the program. Instead of sequentially polling each of the list, you can run all queries concurrently and select the first successful one.

Well, for this, in our favorite wait function, there is a special parameter return_when . Until now, we have ignored what the wait returns, because only parallelized tasks. But now we need to get the result from the coroutine, so we will use the set of done and pending futures.

 from collections import namedtuple import time import asyncio from concurrent.futures import FIRST_COMPLETED import aiohttp Service = namedtuple('Service', ('name', 'url', 'ip_attr')) SERVICES = ( Service('ipify', 'https://api.ipify.org?format=json', 'ip'), Service('ip-api', 'http://ip-api.com/json', 'query') ) async def fetch_ip(service): start = time.time() print('Fetching IP from {}'.format(service.name)) response = await aiohttp.request('GET', service.url) json_response = await response.json() ip = json_response[service.ip_attr] response.close() return '{} finished with result: {}, took: {:.2f} seconds'.format( service.name, ip, time.time() - start) async def asynchronous(): futures = [fetch_ip(service) for service in SERVICES] done, pending = await asyncio.wait( futures, return_when=FIRST_COMPLETED) print(done.pop().result()) ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous()) ioloop.close() 

 $ python3 2c-fetch-first-ip-address-response-await.py Fetching IP from ip-api Fetching IP from ipify ip-api finished with result: 82.34.76.170, took: 0.09 seconds Unclosed client session client_session: <aiohttp.client.ClientSession object at 0x10f95c6d8> Task was destroyed but it is pending! task: <Task pending coro=<fetch_ip() running at 2c-fetch-first-ip-address-response.py:20> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(10)(), Task._wakeup()]>> 

What happened? The first service responded successfully, but there is some warning in the logs!

In fact, we started the execution of two tasks, but left the cycle after the first result, while the second quorum was still being executed. Asyncio thought it was a bug and warned us. Probably worth tidying up for themselves and obviously kill unnecessary tasks. How? Glad you asked.

Futur states



Everything is so simple. When the futur is in the done state, you can get the result of the execution. In the pending and running states, such an operation will result in an InvalidStateError exception, and in the case of canelled it will be canceledError, and finally, if the exception occurred in the corortine itself, it will be generated again (as was done when the exception was called). But don't take my word for it .

You can find out the status of the futures using the done , canceled or running methods, but do not forget that in the case of done, the result call can return both the expected result and the exception that occurred during the work. There is a cancel method for canceling the futures. This is suitable for correcting our example.

 from collections import namedtuple import time import asyncio from concurrent.futures import FIRST_COMPLETED import aiohttp Service = namedtuple('Service', ('name', 'url', 'ip_attr')) SERVICES = ( Service('ipify', 'https://api.ipify.org?format=json', 'ip'), Service('ip-api', 'http://ip-api.com/json', 'query') ) async def fetch_ip(service): start = time.time() print('Fetching IP from {}'.format(service.name)) response = await aiohttp.request('GET', service.url) json_response = await response.json() ip = json_response[service.ip_attr] response.close() return '{} finished with result: {}, took: {:.2f} seconds'.format( service.name, ip, time.time() - start) async def asynchronous(): futures = [fetch_ip(service) for service in SERVICES] done, pending = await asyncio.wait( futures, return_when=FIRST_COMPLETED) print(done.pop().result()) for future in pending: future.cancel() ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous()) ioloop.close() 

 $ python3 2c-fetch-first-ip-address-response-no-warning-await.py Fetching IP from ipify Fetching IP from ip-api ip-api finished with result: 82.34.76.170, took: 0.08 seconds 

Simple and accurate conclusion - just what I love!

If you need some additional logic for processing futures, then you can connect callbacks that will be called upon going to the done state. This can be useful for tests when some results need to be redefined with some values ​​of their own.

Exception Handling


asyncio is all about writing a managed and readable competitive code, which is clearly visible when handling exceptions. Let's go back to the example to demonstrate.
Suppose we want to make sure that all requests for services by definition IP returned the same result. However, one of them may be offline and not respond to us. Just use try ... except as usual:

 from collections import namedtuple import time import asyncio import aiohttp Service = namedtuple('Service', ('name', 'url', 'ip_attr')) SERVICES = ( Service('ipify', 'https://api.ipify.org?format=json', 'ip'), Service('ip-api', 'http://ip-api.com/json', 'query'), Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip') ) async def fetch_ip(service): start = time.time() print('Fetching IP from {}'.format(service.name)) try: response = await aiohttp.request('GET', service.url) except: return '{} is unresponsive'.format(service.name) json_response = await response.json() ip = json_response[service.ip_attr] response.close() return '{} finished with result: {}, took: {:.2f} seconds'.format( service.name, ip, time.time() - start) async def asynchronous(): futures = [fetch_ip(service) for service in SERVICES] done, _ = await asyncio.wait(futures) for future in done: print(future.result()) ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous()) ioloop.close() 

 $ python3 3a-fetch-ip-addresses-fail-await.py Fetching IP from ip-api Fetching IP from borken Fetching IP from ipify ip-api finished with result: 85.133.69.250, took: 0.75 seconds ipify finished with result: 85.133.69.250, took: 1.37 seconds borken is unresponsive 

We can also handle the exception that occurred during the execution of the cortina:

 from collections import namedtuple import time import asyncio import aiohttp import traceback Service = namedtuple('Service', ('name', 'url', 'ip_attr')) SERVICES = ( Service('ipify', 'https://api.ipify.org?format=json', 'ip'), Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'), Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip') ) async def fetch_ip(service): start = time.time() print('Fetching IP from {}'.format(service.name)) try: response = await aiohttp.request('GET', service.url) except: return '{} is unresponsive'.format(service.name) json_response = await response.json() ip = json_response[service.ip_attr] response.close() return '{} finished with result: {}, took: {:.2f} seconds'.format( service.name, ip, time.time() - start) async def asynchronous(): futures = [fetch_ip(service) for service in SERVICES] done, _ = await asyncio.wait(futures) for future in done: try: print(future.result()) except: print("Unexpected error: {}".format(traceback.format_exc())) ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous()) ioloop.close() 

 $ python3 3b-fetch-ip-addresses-future-exceptions-await.py Fetching IP from ipify Fetching IP from borken Fetching IP from ip-api ipify finished with result: 85.133.69.250, took: 0.91 seconds borken is unresponsive Unexpected error: Traceback (most recent call last): File “3b-fetch-ip-addresses-future-exceptions.py”, line 39, in asynchronous print(future.result()) File “3b-fetch-ip-addresses-future-exceptions.py”, line 26, in fetch_ip ip = json_response[service.ip_attr] KeyError: 'this-is-not-an-attr' 

Just like launching a task without waiting for its completion is an error, so getting unknown exceptions leaves its traces in the output:

 from collections import namedtuple import time import asyncio import aiohttp Service = namedtuple('Service', ('name', 'url', 'ip_attr')) SERVICES = ( Service('ipify', 'https://api.ipify.org?format=json', 'ip'), Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'), Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip') ) async def fetch_ip(service): start = time.time() print('Fetching IP from {}'.format(service.name)) try: response = await aiohttp.request('GET', service.url) except: print('{} is unresponsive'.format(service.name)) else: json_response = await response.json() ip = json_response[service.ip_attr] response.close() print('{} finished with result: {}, took: {:.2f} seconds'.format( service.name, ip, time.time() - start)) async def asynchronous(): futures = [fetch_ip(service) for service in SERVICES] await asyncio.wait(futures) # intentionally ignore results ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous()) ioloop.close() 

 $ python3 3c-fetch-ip-addresses-ignore-exceptions-await.py Fetching IP from ipify Fetching IP from borken Fetching IP from ip-api borken is unresponsive ipify finished with result: 85.133.69.250, took: 0.78 seconds Task exception was never retrieved future: <Task finished coro=<fetch_ip() done, defined at 3c-fetch-ip-addresses-ignore-exceptions.py:15> exception=KeyError('this-is-not-an-attr',)> Traceback (most recent call last): File “3c-fetch-ip-addresses-ignore-exceptions.py”, line 25, in fetch_ip ip = json_response[service.ip_attr] KeyError: 'this-is-not-an-attr' 

The output looks the same as in the previous example with the exception of the reproachful message from asyncio.

Timeouts


But what if the information about our IP is not so important? This can be a good addition to some composite answer, in which this part will be optional. In this case, we will not keep the user waiting. Ideally, we would set a timeout for calculating the IP, after which, in any case, give the answer to the user, even without this information.

Again wait has a suitable argument:

 import time import random import asyncio import aiohttp import argparse from collections import namedtuple from concurrent.futures import FIRST_COMPLETED Service = namedtuple('Service', ('name', 'url', 'ip_attr')) SERVICES = ( Service('ipify', 'https://api.ipify.org?format=json', 'ip'), Service('ip-api', 'http://ip-api.com/json', 'query'), ) DEFAULT_TIMEOUT = 0.01 async def fetch_ip(service): start = time.time() print('Fetching IP from {}'.format(service.name)) await asyncio.sleep(random.randint(1, 3) * 0.1) try: response = await aiohttp.request('GET', service.url) except: return '{} is unresponsive'.format(service.name) json_response = await response.json() ip = json_response[service.ip_attr] response.close() print('{} finished with result: {}, took: {:.2f} seconds'.format( service.name, ip, time.time() - start)) return ip async def asynchronous(timeout): response = { "message": "Result from asynchronous.", "ip": "not available" } futures = [fetch_ip(service) for service in SERVICES] done, pending = await asyncio.wait( futures, timeout=timeout, return_when=FIRST_COMPLETED) for future in pending: future.cancel() for future in done: response["ip"] = future.result() print(response) parser = argparse.ArgumentParser() parser.add_argument( '-t', '--timeout', help='Timeout to use, defaults to {}'.format(DEFAULT_TIMEOUT), default=DEFAULT_TIMEOUT, type=float) args = parser.parse_args() print("Using a {} timeout".format(args.timeout)) ioloop = asyncio.get_event_loop() ioloop.run_until_complete(asynchronous(args.timeout)) ioloop.close() 


I also added the timeout argument to the script startup line to check what happens if the requests have time to be processed. I also added random delays to prevent the script from ending too quickly, and it was time to figure out exactly how it works.

 $ python 4a-timeout-with-wait-kwarg-await.py Using a 0.01 timeout Fetching IP from ipify Fetching IP from ip-api {'message': 'Result from asynchronous.', 'ip': 'not available'} 

 $ python 4a-timeout-with-wait-kwarg-await.py -t 5 Using a 5.0 timeout Fetching IP from ip-api Fetching IP from ipify ipify finished with result: 82.34.76.170, took: 1.24 seconds {'ip': '82.34.76.170', 'message': 'Result from asynchronous.'} 

Conclusion


Asyncio has strengthened my already great love for python. To be honest, I fell in love with coroutines, even when I met them in Tornado, but asyncio managed to take all the best from him and other libraries in the implementation of competitiveness. And so much so that special efforts were made so that they could use the main I / O cycle. So if you use Tornado or Twisted , you can connect the code intended for asyncio!

As I already mentioned, the main problem is that standard libraries do not yet support non-blocking behavior. Many popular libraries also work so far only in a synchronous style, and those that use competition are still young and experimental. However, their number is growing .

I hope in this tutorial I showed how pleasant it is to work with asyncio, and this technology will encourage you to switch to python 3 if you are stuck on python 2.7 for some reason. One thing is for sure - the future of Python has completely changed.

From the translator:
The original article was published on February 20, 2016, during which time a lot has happened. Python 3.6 was released, in which, in addition to optimizations, asyncio work was improved, the API was transferred to a stable state. Libraries for working with Postgres, Redis, Elasticsearch, etc. were released in non-blocking mode. Even the new framework is Sanic, which resembles Flask, but works asynchronously. In the end, even the event loop was optimized and rewritten in Cython, which happened 2 times faster. So I see no reason to ignore this technology!

Source: https://habr.com/ru/post/337420/


All Articles