"""Show explicit context switches between two asyncio coroutines."""
import asyncio


async def foo():
    """Print a message, yield to the event loop once, then print again."""
    print('Running in foo')
    # sleep(0) suspends this coroutine so the other scheduled task can run.
    await asyncio.sleep(0)
    print('Explicit context switch to foo again')


async def bar():
    """Print a message, yield to the event loop once, then print again."""
    print('Explicit context to bar')
    await asyncio.sleep(0)
    print('Implicit context switch back to bar')


def main():
    """Run foo() and bar() concurrently on a dedicated event loop."""
    # A private loop replaces asyncio.get_event_loop(), whose implicit loop
    # creation outside a running loop is deprecated in recent Python versions.
    ioloop = asyncio.new_event_loop()
    try:
        tasks = [ioloop.create_task(foo()), ioloop.create_task(bar())]
        ioloop.run_until_complete(asyncio.wait(tasks))
    finally:
        # Close the loop even if one of the coroutines raised.
        ioloop.close()


if __name__ == '__main__':
    main()
$ python3 1-sync-async-execution-asyncio-await.py Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar
"""Show cooperative scheduling: a short coroutine finishes while two wait."""
import time
import asyncio

start = time.time()


def tic():
    """Return the time elapsed since module start as 'at N.N seconds'."""
    return 'at %1.1f seconds' % (time.time() - start)


async def gr1():
    """Report start, sleep two seconds without blocking, report end."""
    # asyncio.sleep() suspends only this coroutine; the loop keeps running
    # the other tasks in the meantime.
    print('gr1 started work: {}'.format(tic()))
    await asyncio.sleep(2)
    print('gr1 ended work: {}'.format(tic()))


async def gr2():
    """Report start, sleep two seconds without blocking, report end."""
    print('gr2 started work: {}'.format(tic()))
    await asyncio.sleep(2)
    print('gr2 Ended work: {}'.format(tic()))


async def gr3():
    """Do one second of simulated work while gr1 and gr2 are suspended."""
    print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))
    await asyncio.sleep(1)
    print("Done!")


def main():
    """Schedule gr1, gr2 and gr3 together and wait for all of them."""
    # A private loop replaces asyncio.get_event_loop(), whose implicit loop
    # creation outside a running loop is deprecated in recent Python versions.
    ioloop = asyncio.new_event_loop()
    try:
        tasks = [
            ioloop.create_task(gr1()),
            ioloop.create_task(gr2()),
            ioloop.create_task(gr3()),
        ]
        ioloop.run_until_complete(asyncio.wait(tasks))
    finally:
        ioloop.close()


if __name__ == '__main__':
    main()
$ python3 1b-cooperatively-scheduled-asyncio-await.py gr1 started work: at 0.0 seconds gr2 started work: at 0.0 seconds Let's do some stuff while the coroutines are blocked, at 0.0 seconds Done! gr1 ended work: at 2.0 seconds gr2 Ended work: at 2.0 seconds
"""Compare completion order of synchronous and asynchronous tasks."""
import random
from time import sleep
import asyncio


def task(pid):
    """Synchronous non-deterministic task: block 0-2 ms, then report."""
    sleep(random.randint(0, 2) * 0.001)
    print('Task %s done' % pid)


async def task_coro(pid):
    """Coroutine non-deterministic task: suspend 0-2 ms, then report."""
    await asyncio.sleep(random.randint(0, 2) * 0.001)
    print('Task %s done' % pid)


def synchronous():
    """Run tasks 1..9 one after another; they finish in numeric order."""
    for i in range(1, 10):
        task(i)


async def asynchronous():
    """Run tasks 1..9 concurrently; completion order depends on the delays."""
    tasks = [asyncio.ensure_future(task_coro(i)) for i in range(1, 10)]
    await asyncio.wait(tasks)


def main():
    """Run the synchronous variant, then the asynchronous one."""
    print('Synchronous:')
    synchronous()

    # A private loop replaces asyncio.get_event_loop(), whose implicit loop
    # creation outside a running loop is deprecated in recent Python versions.
    ioloop = asyncio.new_event_loop()
    try:
        print('Asynchronous:')
        ioloop.run_until_complete(asynchronous())
    finally:
        ioloop.close()


if __name__ == '__main__':
    main()
$ python3 1c-determinism-sync-async-asyncio-await.py Synchronous: Task 1 done Task 2 done Task 3 done Task 4 done Task 5 done Task 6 done Task 7 done Task 8 done Task 9 done Asynchronous: Task 2 done Task 5 done Task 6 done Task 8 done Task 9 done Task 1 done Task 4 done Task 3 done Task 7 done
"""Compare sequential urllib fetches with concurrent aiohttp fetches."""
import time
import urllib.request
import asyncio

import aiohttp

URL = 'https://api.github.com/events'
MAX_CLIENTS = 3


def fetch_sync(pid):
    """Fetch URL with urllib, print the timing, and return the Date header."""
    print('Fetch sync process {} started'.format(pid))
    start = time.time()
    # The context manager closes the HTTP response once the header is read.
    with urllib.request.urlopen(URL) as response:
        date_header = response.getheader('Date')

    print('Process {}: {}, took: {:.2f} seconds'.format(
        pid, date_header, time.time() - start))
    return date_header


async def fetch_async(pid):
    """Fetch URL with aiohttp, print the timing, and return the Date header."""
    print('Fetch async process {} started'.format(pid))
    start = time.time()
    # An explicit session used via "async with" closes both the response and
    # the session, including when the request fails.
    async with aiohttp.ClientSession() as session:
        async with session.get(URL) as response:
            date_header = response.headers.get('Date')

    print('Process {}: {}, took: {:.2f} seconds'.format(
        pid, date_header, time.time() - start))
    return date_header


def synchronous():
    """Fetch MAX_CLIENTS times in sequence and print the total duration."""
    start = time.time()
    for i in range(1, MAX_CLIENTS + 1):
        fetch_sync(i)
    print("Process took: {:.2f} seconds".format(time.time() - start))


async def asynchronous():
    """Fetch MAX_CLIENTS times concurrently and print the total duration."""
    start = time.time()
    tasks = [asyncio.ensure_future(fetch_async(i))
             for i in range(1, MAX_CLIENTS + 1)]
    await asyncio.wait(tasks)
    print("Process took: {:.2f} seconds".format(time.time() - start))


def main():
    """Run the synchronous benchmark, then the asynchronous one."""
    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    # A private loop replaces asyncio.get_event_loop(), whose implicit loop
    # creation outside a running loop is deprecated in recent Python versions.
    ioloop = asyncio.new_event_loop()
    try:
        ioloop.run_until_complete(asynchronous())
    finally:
        ioloop.close()


if __name__ == '__main__':
    main()
$ python3 1d-async-fetch-from-server-asyncio-await.py Synchronous: Fetch sync process 1 started Process 1: Wed, 17 Feb 2016 13:10:11 GMT, took: 0.54 seconds Fetch sync process 2 started Process 2: Wed, 17 Feb 2016 13:10:11 GMT, took: 0.50 seconds Fetch sync process 3 started Process 3: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.48 seconds Process took: 1.54 seconds Asynchronous: Fetch async process 1 started Fetch async process 2 started Fetch async process 3 started Process 3: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.50 seconds Process 2: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.52 seconds Process 1: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.54 seconds Process took: 0.54 seconds
"""Print fetch results in completion order using asyncio.as_completed."""
import time
import random
import asyncio

import aiohttp

URL = 'https://api.github.com/events'
MAX_CLIENTS = 3


async def fetch_async(pid):
    """Sleep 2-5 seconds, fetch URL, and return a formatted result line."""
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('Fetch async process {} started, sleeping for {} seconds'.format(
        pid, sleepy_time))

    await asyncio.sleep(sleepy_time)

    # An explicit session used via "async with" closes both the response and
    # the session, including when the request fails.
    async with aiohttp.ClientSession() as session:
        async with session.get(URL) as response:
            date_header = response.headers.get('Date')

    return 'Process {}: {}, took: {:.2f} seconds'.format(
        pid, date_header, time.time() - start)


async def asynchronous():
    """Start MAX_CLIENTS fetches and print each result as it completes."""
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    # as_completed yields awaitables in the order the fetches finish.
    for i, future in enumerate(asyncio.as_completed(futures)):
        result = await future
        print('{} {}'.format(">>" * (i + 1), result))

    print("Process took: {:.2f} seconds".format(time.time() - start))


def main():
    """Run the as_completed demonstration on a dedicated event loop."""
    # A private loop replaces asyncio.get_event_loop(), whose implicit loop
    # creation outside a running loop is deprecated in recent Python versions.
    ioloop = asyncio.new_event_loop()
    try:
        ioloop.run_until_complete(asynchronous())
    finally:
        ioloop.close()


if __name__ == '__main__':
    main()
$ python3 2a-async-fetch-from-server-as-completed-asyncio-await.py Fetch async process 1 started, sleeping for 4 seconds Fetch async process 3 started, sleeping for 5 seconds Fetch async process 2 started, sleeping for 3 seconds >> Process 2: Wed, 17 Feb 2016 13:55:19 GMT, took: 3.53 seconds >>>> Process 1: Wed, 17 Feb 2016 13:55:20 GMT, took: 4.49 seconds >>>>>> Process 3: Wed, 17 Feb 2016 13:55:21 GMT, took: 5.48 seconds Process took: 5.48 seconds
"""Query several IP services and print whichever answers first."""
from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED

import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query')
)


async def fetch_ip(service):
    """Fetch the JSON reply of *service* and return a formatted result line.

    Raises KeyError when the reply has no ``service.ip_attr`` key.
    """
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    # An explicit session used via "async with" closes both the response and
    # the session, including when the request fails.
    async with aiohttp.ClientSession() as session:
        async with session.get(service.url) as response:
            json_response = await response.json()

    ip = json_response[service.ip_attr]
    return '{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start)


async def asynchronous():
    """Start one fetch per service and print the first completed result."""
    # asyncio.wait() no longer accepts bare coroutines on recent Python
    # versions, so each one is wrapped in a task first.
    futures = [asyncio.ensure_future(fetch_ip(service))
               for service in SERVICES]
    done, pending = await asyncio.wait(
        futures, return_when=FIRST_COMPLETED)

    print(done.pop().result())
    # The tasks in `pending` are not cancelled here, so the event loop is
    # closed while they are still pending and asyncio reports
    # "Task was destroyed but it is pending!".


def main():
    """Run the first-completed demonstration on a dedicated event loop."""
    # A private loop replaces asyncio.get_event_loop(), whose implicit loop
    # creation outside a running loop is deprecated in recent Python versions.
    ioloop = asyncio.new_event_loop()
    try:
        ioloop.run_until_complete(asynchronous())
    finally:
        ioloop.close()


if __name__ == '__main__':
    main()
$ python3 2c-fetch-first-ip-address-response-await.py Fetching IP from ip-api Fetching IP from ipify ip-api finished with result: 82.34.76.170, took: 0.09 seconds Unclosed client session client_session: <aiohttp.client.ClientSession object at 0x10f95c6d8> Task was destroyed but it is pending! task: <Task pending coro=<fetch_ip() running at 2c-fetch-first-ip-address-response.py:20> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(10)(), Task._wakeup()]>>
"""Query several IP services, print the first answer, cancel the rest."""
from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED

import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query')
)


async def fetch_ip(service):
    """Fetch the JSON reply of *service* and return a formatted result line.

    Raises KeyError when the reply has no ``service.ip_attr`` key.
    """
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    # An explicit session used via "async with" closes both the response and
    # the session, including when the task is cancelled mid-request.
    async with aiohttp.ClientSession() as session:
        async with session.get(service.url) as response:
            json_response = await response.json()

    ip = json_response[service.ip_attr]
    return '{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start)


async def asynchronous():
    """Print the first completed result and cancel the remaining fetches."""
    # asyncio.wait() no longer accepts bare coroutines on recent Python
    # versions, so each one is wrapped in a task first.
    futures = [asyncio.ensure_future(fetch_ip(service))
               for service in SERVICES]
    done, pending = await asyncio.wait(
        futures, return_when=FIRST_COMPLETED)

    print(done.pop().result())

    for future in pending:
        future.cancel()
    if pending:
        # cancel() only requests cancellation; waiting lets the tasks
        # finish their cleanup before the loop is closed.
        await asyncio.wait(pending)


def main():
    """Run the cancel-pending demonstration on a dedicated event loop."""
    # A private loop replaces asyncio.get_event_loop(), whose implicit loop
    # creation outside a running loop is deprecated in recent Python versions.
    ioloop = asyncio.new_event_loop()
    try:
        ioloop.run_until_complete(asynchronous())
    finally:
        ioloop.close()


if __name__ == '__main__':
    main()
$ python3 2c-fetch-first-ip-address-response-no-warning-await.py Fetching IP from ipify Fetching IP from ip-api ip-api finished with result: 82.34.76.170, took: 0.08 seconds
"""Query IP services, reporting unreachable ones instead of failing."""
from collections import namedtuple
import time
import asyncio

import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query'),
    Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
)


async def fetch_ip(service):
    """Return a result line for *service*, or an 'unresponsive' message.

    Only a failure of the request itself is turned into a message; errors
    while decoding or reading the reply (e.g. KeyError) still propagate.
    """
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    async with aiohttp.ClientSession() as session:
        try:
            response = await session.get(service.url)
        except (aiohttp.ClientError, asyncio.TimeoutError, OSError):
            # Narrower than a bare except, so task cancellation and
            # KeyboardInterrupt are not swallowed.
            return '{} is unresponsive'.format(service.name)

        try:
            json_response = await response.json()
        finally:
            # Close the response even if decoding the body fails.
            response.close()

    ip = json_response[service.ip_attr]
    return '{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start)


async def asynchronous():
    """Fetch from every service and print each result once all are done."""
    # asyncio.wait() no longer accepts bare coroutines on recent Python
    # versions, so each one is wrapped in a task first.
    futures = [asyncio.ensure_future(fetch_ip(service))
               for service in SERVICES]
    done, _ = await asyncio.wait(futures)

    for future in done:
        print(future.result())


def main():
    """Run the unresponsive-service demonstration on a dedicated loop."""
    # A private loop replaces asyncio.get_event_loop(), whose implicit loop
    # creation outside a running loop is deprecated in recent Python versions.
    ioloop = asyncio.new_event_loop()
    try:
        ioloop.run_until_complete(asynchronous())
    finally:
        ioloop.close()


if __name__ == '__main__':
    main()
$ python3 3a-fetch-ip-addresses-fail-await.py Fetching IP from ip-api Fetching IP from borken Fetching IP from ipify ip-api finished with result: 85.133.69.250, took: 0.75 seconds ipify finished with result: 85.133.69.250, took: 1.37 seconds borken is unresponsive
"""Query IP services and print the traceback of any task that raised."""
from collections import namedtuple
import time
import asyncio
import traceback

import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'),
    Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
)


async def fetch_ip(service):
    """Return a result line for *service*, or an 'unresponsive' message.

    Only a failure of the request itself is turned into a message; a missing
    ``service.ip_attr`` key in the reply raises KeyError to the caller.
    """
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    async with aiohttp.ClientSession() as session:
        try:
            response = await session.get(service.url)
        except (aiohttp.ClientError, asyncio.TimeoutError, OSError):
            # Narrower than a bare except, so task cancellation and
            # KeyboardInterrupt are not swallowed.
            return '{} is unresponsive'.format(service.name)

        try:
            json_response = await response.json()
        finally:
            # Close the response even if decoding the body fails.
            response.close()

    ip = json_response[service.ip_attr]
    return '{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start)


async def asynchronous():
    """Fetch from every service; print each result or its traceback."""
    # asyncio.wait() no longer accepts bare coroutines on recent Python
    # versions, so each one is wrapped in a task first.
    futures = [asyncio.ensure_future(fetch_ip(service))
               for service in SERVICES]
    done, _ = await asyncio.wait(futures)

    for future in done:
        try:
            # result() re-raises whatever exception the task ended with.
            print(future.result())
        except Exception:
            print("Unexpected error: {}".format(traceback.format_exc()))


def main():
    """Run the future-exceptions demonstration on a dedicated event loop."""
    # A private loop replaces asyncio.get_event_loop(), whose implicit loop
    # creation outside a running loop is deprecated in recent Python versions.
    ioloop = asyncio.new_event_loop()
    try:
        ioloop.run_until_complete(asynchronous())
    finally:
        ioloop.close()


if __name__ == '__main__':
    main()
$ python3 3b-fetch-ip-addresses-future-exceptions-await.py Fetching IP from ipify Fetching IP from borken Fetching IP from ip-api ipify finished with result: 85.133.69.250, took: 0.91 seconds borken is unresponsive Unexpected error: Traceback (most recent call last): File "3b-fetch-ip-addresses-future-exceptions.py", line 39, in asynchronous print(future.result()) File "3b-fetch-ip-addresses-future-exceptions.py", line 26, in fetch_ip ip = json_response[service.ip_attr] KeyError: 'this-is-not-an-attr'
"""Query IP services while ignoring the task results entirely."""
from collections import namedtuple
import time
import asyncio

import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'),
    Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
)


async def fetch_ip(service):
    """Print a result line for *service*, or an 'unresponsive' message.

    Only a failure of the request itself is reported as unresponsive; a
    missing ``service.ip_attr`` key in the reply raises KeyError.
    """
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    async with aiohttp.ClientSession() as session:
        try:
            response = await session.get(service.url)
        except (aiohttp.ClientError, asyncio.TimeoutError, OSError):
            # Narrower than a bare except, so task cancellation and
            # KeyboardInterrupt are not swallowed.
            print('{} is unresponsive'.format(service.name))
            return

        try:
            json_response = await response.json()
        finally:
            # Close the response even if decoding the body fails.
            response.close()

    ip = json_response[service.ip_attr]
    print('{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start))


async def asynchronous():
    """Fetch from every service and wait for completion, ignoring results."""
    # asyncio.wait() no longer accepts bare coroutines on recent Python
    # versions, so each one is wrapped in a task first.
    futures = [asyncio.ensure_future(fetch_ip(service))
               for service in SERVICES]
    # Results are never read, so an exception raised inside a task is only
    # reported by asyncio as "Task exception was never retrieved".
    await asyncio.wait(futures)  # intentionally ignore results


def main():
    """Run the ignored-exceptions demonstration on a dedicated event loop."""
    # A private loop replaces asyncio.get_event_loop(), whose implicit loop
    # creation outside a running loop is deprecated in recent Python versions.
    ioloop = asyncio.new_event_loop()
    try:
        ioloop.run_until_complete(asynchronous())
    finally:
        ioloop.close()


if __name__ == '__main__':
    main()
$ python3 3c-fetch-ip-addresses-ignore-exceptions-await.py Fetching IP from ipify Fetching IP from borken Fetching IP from ip-api borken is unresponsive ipify finished with result: 85.133.69.250, took: 0.78 seconds Task exception was never retrieved future: <Task finished coro=<fetch_ip() done, defined at 3c-fetch-ip-addresses-ignore-exceptions.py:15> exception=KeyError('this-is-not-an-attr',)> Traceback (most recent call last): File "3c-fetch-ip-addresses-ignore-exceptions.py", line 25, in fetch_ip ip = json_response[service.ip_attr] KeyError: 'this-is-not-an-attr'
"""Fetch the public IP from the first service to answer within a timeout."""
import time
import random
import asyncio
import argparse
from collections import namedtuple
from concurrent.futures import FIRST_COMPLETED

import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query'),
)

DEFAULT_TIMEOUT = 0.01


async def fetch_ip(service):
    """Return the IP reported by *service*, or an 'unresponsive' message.

    Waits a random 0.1-0.3 seconds before issuing the request. Only a failure
    of the request itself yields the message; a missing ``service.ip_attr``
    key in the reply raises KeyError.
    """
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    await asyncio.sleep(random.randint(1, 3) * 0.1)

    async with aiohttp.ClientSession() as session:
        try:
            response = await session.get(service.url)
        except (aiohttp.ClientError, asyncio.TimeoutError, OSError):
            # A bare except here would also swallow the cancellation that
            # asynchronous() sends to tasks which miss the timeout.
            return '{} is unresponsive'.format(service.name)

        try:
            json_response = await response.json()
        finally:
            # Close the response even if decoding the body fails.
            response.close()

    ip = json_response[service.ip_attr]
    print('{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start))
    return ip


async def asynchronous(timeout):
    """Print a response dict holding the first IP obtained within *timeout*.

    The "ip" entry stays "not available" when no fetch completes in time.
    """
    response = {
        "message": "Result from asynchronous.",
        "ip": "not available"
    }

    # asyncio.wait() no longer accepts bare coroutines on recent Python
    # versions, so each one is wrapped in a task first.
    futures = [asyncio.ensure_future(fetch_ip(service))
               for service in SERVICES]
    done, pending = await asyncio.wait(
        futures, timeout=timeout, return_when=FIRST_COMPLETED)

    for future in pending:
        future.cancel()
    if pending:
        # cancel() only requests cancellation; waiting lets the tasks
        # finish their cleanup before the loop is closed.
        await asyncio.wait(pending)

    for future in done:
        response["ip"] = future.result()

    print(response)


def main():
    """Parse the command line and run asynchronous() with the timeout."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-t', '--timeout',
        help='Timeout to use, defaults to {}'.format(DEFAULT_TIMEOUT),
        default=DEFAULT_TIMEOUT, type=float)
    args = parser.parse_args()

    print("Using a {} timeout".format(args.timeout))
    # A private loop replaces asyncio.get_event_loop(), whose implicit loop
    # creation outside a running loop is deprecated in recent Python versions.
    ioloop = asyncio.new_event_loop()
    try:
        ioloop.run_until_complete(asynchronous(args.timeout))
    finally:
        ioloop.close()


if __name__ == '__main__':
    main()
$ python 4a-timeout-with-wait-kwarg-await.py Using a 0.01 timeout Fetching IP from ipify Fetching IP from ip-api {'message': 'Result from asynchronous.', 'ip': 'not available'}
$ python 4a-timeout-with-wait-kwarg-await.py -t 5 Using a 5.0 timeout Fetching IP from ip-api Fetching IP from ipify ipify finished with result: 82.34.76.170, took: 1.24 seconds {'ip': '82.34.76.170', 'message': 'Result from asynchronous.'}
Source: https://habr.com/ru/post/337420/
All Articles