
Asyncio coroutine patterns: beyond await

Translator's Preface:
Having once again stepped on a rake while working with Python's asyncio, I went looking on the Internet for something more approachable than the dry official documentation. I came across the article "Asyncio Coroutine Patterns: Beyond await" by Yeray Diaz, in which the author writes engagingly about using asyncio and shares a number of techniques. Since I found nothing of the kind in Russian, I decided to translate it.


Asyncio is the Python programmer's concurrency dream: you write nearly synchronous code and let Python do the rest. It is another import of the antigravity library: import antigravity


In reality it is nothing of the sort: concurrent programming is hard work, and while coroutines let us avoid callback hell, which only gets you so far, you still need to think about creating tasks, retrieving results and intercepting exceptions elegantly. Sad but true.


The good news is that all of this is possible in asyncio. The bad news is that it is not always immediately obvious what is wrong and how to fix it. Below are a few patterns that I found while working with asyncio.



Before we begin:


I used my favorite library, aiohttp, to perform the asynchronous HTTP requests, and the Hacker News API, since HN is a simple, well-known site that follows a familiar usage scenario. Prompted by the response to my previous article, I also used the async/await syntax introduced in Python 3.5, and I assume the reader is familiar with the ideas described there. Finally, all of the examples are available in this article's GitHub repository.


Ok, let's get started!


Recursive coroutines


Creating and running tasks is trivial in asyncio: the API includes several methods on the AbstractEventLoop class as well as module-level functions for the purpose. But usually you want to combine the results of those tasks and process them in some way, and recursion is an excellent example of this scheme, as well as a demonstration of how simple coroutines are compared to other means of concurrency.
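For instance, a coroutine can be wrapped in a Task either through the loop or through a module-level helper. A minimal sketch (the say coroutine is made up for illustration; this is the pre-3.7 API the article targets):

import asyncio

async def say(text):
    await asyncio.sleep(1)
    print(text)

loop = asyncio.get_event_loop()

# both calls wrap the coroutine in a Task and schedule it on the loop
task_a = loop.create_task(say("via AbstractEventLoop.create_task"))
task_b = asyncio.ensure_future(say("via asyncio.ensure_future"))

# run the loop until both tasks have completed
loop.run_until_complete(asyncio.gather(task_a, task_b))
loop.close()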


A common use case for asyncio is building a web crawler of some kind. Imagine we are simply too busy to keep checking Hacker News, or perhaps you just enjoy a good flame war, so you want a system that fetches the number of comments for a given HN post and notifies you if it exceeds a threshold. You google a little and find the HN API documentation, just what you need, except you notice the following in it:


Want to know the total number of comments on an article? Walk the tree and count them.

Challenge accepted!


"""          ,       ,      . ,         Hacker News      """ import asyncio import argparse import logging from urllib.parse import urlparse, parse_qs from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the comments of a Hacker News post.') parser.add_argument('--id', type=int, default=8863, help='ID of the post in HN, defaults to 8863') parser.add_argument('--url', type=str, help='URL of a post in HN') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """          . """ url = URL_TEMPLATE.format(post_id) now = datetime.now() response = await fetch(session, url) log.debug('{:^6} > Fetching of {} took {} seconds'.format( post_id, url, (datetime.now() - now).total_seconds())) if 'kids' not in response: #  .   return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       log.debug('{:^6} > Fetching {} child posts'.format( post_id, number_of_comments)) tasks = [post_number_of_comments(loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments def id_from_HN_url(url): """   `id` URL       None. """ parse_result = urlparse(url) try: return parse_qs(parse_result.query)['id'][0] except (KeyError, IndexError): return None if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) post_id = id_from_HN_url(args.url) if args.url else args.id loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: now = datetime.now() comments = loop.run_until_complete( post_number_of_comments(loop, session, post_id)) log.info( '> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("-- Post {} has {} comments".format(post_id, comments)) loop.close() 

Feel free to try running the script with the --verbose flag for more detailed output.


[14:47:32] > Calculating comments took 2.23 seconds and 73 fetches
[14:47:32] -- Post 8863 has 72 comments

Let's skip the boilerplate and go straight to the recursive coroutine. Note that it reads almost exactly as the equivalent synchronous code would.


async def post_number_of_comments(loop, session, post_id):
    """Retrieve data for current post and recursively for all comments."""
    url = URL_TEMPLATE.format(post_id)
    now = datetime.now()
    response = await fetch(session, url)
    log.debug('{:^6} > Fetching of {} took {} seconds'.format(
        post_id, url, (datetime.now() - now).total_seconds()))

    if 'kids' not in response:  # base case, there are no comments
        return 0

    # calculate this post's comments as the number of direct replies ("kids")
    number_of_comments = len(response['kids'])

    # create recursive tasks for all comments
    log.debug('{:^6} > Fetching {} child posts'.format(
        post_id, number_of_comments))
    tasks = [post_number_of_comments(loop, session, kid_id)
             for kid_id in response['kids']]

    # schedule the tasks and retrieve results
    results = await asyncio.gather(*tasks)

    # reduce the descendants' comments and add them to this post's
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    return number_of_comments

  1. First we fetch the JSON with the post data.
  2. Recursively walk each of its descendants.
  3. Eventually hit the base case and return zero
     when the post has no replies.
  4. When returning from the base case, add the current post's direct
     replies to those of its descendants and return the total.

This is a great example of what Brett Slatkin describes as fan-out and fan-in: we fan out to retrieve the data for the descendants, and we fan in to reduce the retrieved data into the total number of comments.


The asyncio API gives us a couple of ways to perform this fan-out. Here I use the gather function, which effectively waits until all the coroutines are done and returns a list of their results.
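As a minimal standalone illustration of this fan-out/fan-in (a sketch, not part of the article's crawler; the square coroutine is hypothetical):

import asyncio

async def square(x):
    await asyncio.sleep(0.1)  # stand-in for real I/O
    return x * x

async def main():
    # fan out: one coroutine per input, all running concurrently ...
    results = await asyncio.gather(*(square(n) for n in range(5)))
    # ... fan in: gather returns the results as a list, in submission order
    print(results)  # [0, 1, 4, 9, 16]

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()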


Notice how well the use of coroutines fits with recursion: at any point in time, any number of coroutines are awaiting the responses to their requests inside the gather call, resuming execution once the I/O operation completes. This lets us express rather complex behaviour in a single elegant and (easily) readable coroutine.


"Very simple" - you say? All right, let's go up a notch.


Fire and forget


Imagine you want to send yourself an email with any posts that have more comments than a certain value, and you'd like to do it as we walk the post tree. We could simply add an if statement at the end of the recursive function:


from random import random  # needed by log_post below

async def post_number_of_comments(loop, session, post_id):
    """Retrieve data for current post and recursively for all comments."""
    url = URL_TEMPLATE.format(post_id)
    response = await fetch(session, url)

    if 'kids' not in response:  # base case, there are no comments
        return 0

    # calculate this post's comments as the number of direct replies ("kids")
    number_of_comments = len(response['kids'])

    # create recursive tasks for all comments
    tasks = [post_number_of_comments(loop, session, kid_id)
             for kid_id in response['kids']]

    # schedule the tasks and retrieve results
    results = await asyncio.gather(*tasks)

    # reduce the descendants' comments and add them to this post's
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    # Log if the number of comments is over a threshold
    if number_of_comments > MIN_COMMENTS:
        await log_post(response)

    return number_of_comments


async def log_post(post):
    """Simulate logging the post."""
    await asyncio.sleep(random() * 3)
    log.info("Post logged")

Yes, I used asyncio.sleep. This is the last time. I promise.


[09:41:02] Post logged
[09:41:04] Post logged
[09:41:06] Post logged
[09:41:06] > Calculating comments took 6.35 seconds and 73 fetches
[09:41:06] -- Post 8863 has 72 comments

This is significantly slower than before!
The reason is that, as we discussed earlier, await suspends the coroutine until the future resolves; but since we do not need the result of the logging, there is no real reason to wait for it.


We need to "shoot and forget" with our cororlina, and since we cannot wait for it to complete using await, we need another way to start the execution of cororutine without her waiting. Quickly looking at the asyncio API, we’ll find the function ensure_future , which will schedule the launch of the coroutine, wrap it in the Task object and return it. Remembering that earlier korutina was planned, the cycle of events will monitor the result of our korutina’s work at some point in the future, when another korutina will be in a state of waiting.


Great, let's replace await log_post with the following:


async def post_number_of_comments(loop, session, post_id):
    """Retrieve data for current post and recursively for all comments."""
    url = URL_TEMPLATE.format(post_id)
    response = await fetch(session, url)

    if 'kids' not in response:  # base case, there are no comments
        return 0

    # calculate this post's comments as the number of direct replies ("kids")
    number_of_comments = len(response['kids'])

    # create recursive tasks for all comments
    tasks = [post_number_of_comments(loop, session, kid_id)
             for kid_id in response['kids']]

    # schedule the tasks and retrieve results
    results = await asyncio.gather(*tasks)

    # reduce the descendants' comments and add them to this post's
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    # Log if the number of comments is over a threshold
    if number_of_comments > MIN_COMMENTS:
        asyncio.ensure_future(log_post(response))

    return number_of_comments

[09:42:57] > Calculating comments took 1.69 seconds and 73 fetches
[09:42:57] -- Post 8863 has 72 comments
[09:42:57] Task was destroyed but it is pending!
task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1109197f8>()]>>
[09:42:57] Task was destroyed but it is pending!
task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919948>()]>>
[09:42:57] Task was destroyed but it is pending!
task: <Task pending coro=<log_post() done, defined at 02_fire_and_forget.py:82> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x110919978>()]>>

Hmm, the dreaded "Task was destroyed but it is pending!" that haunts asyncio users worldwide. The good news is that we are back to the time we measured before (1.69 s); the bad news is that asyncio does not appreciate fire-and-forget being pushed beyond the lifetime of the loop.


The problem is that we forcibly close the event loop right after obtaining the result of the post_number_of_comments coroutine, leaving our log_post tasks no time to complete.


We have two options:
we either allow the event loop to run forever with run_forever and terminate the script by hand, or we use the Task class's all_tasks method to find any pending tasks and wait for them once the comment counting is done. A sketch of the first option follows; after it we'll implement the second.
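A rough sketch of the first option, reusing the loop, session and post_id from the script above (you stop the script yourself, e.g. with Ctrl-C, once the pending log_post tasks have had time to run):

loop = asyncio.get_event_loop()
with aiohttp.ClientSession(loop=loop) as session:
    # schedule the main coroutine instead of blocking on run_until_complete
    asyncio.ensure_future(post_number_of_comments(loop, session, post_id))
    try:
        # let the loop run until we terminate the script manually
        loop.run_forever()
    except KeyboardInterrupt:
        pass
loop.close()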


Let's try the second option, with a quick change after our call to post_number_of_comments:


if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    post_id = id_from_HN_url(args.url) if args.url else args.id

    loop = asyncio.get_event_loop()
    with aiohttp.ClientSession(loop=loop) as session:
        now = datetime.now()
        comments = loop.run_until_complete(
            post_number_of_comments(loop, session, post_id))
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
        log.info("-- Post {} has {} comments".format(post_id, comments))

        pending_tasks = [
            task for task in asyncio.Task.all_tasks() if not task.done()]
        loop.run_until_complete(asyncio.gather(*pending_tasks))

    loop.close()

[09:47:29] > Calculating comments took 1.72 seconds and 73 fetches
[09:47:29] -- Post 8863 has 72 comments
[09:47:30] Post logged
[09:47:31] Post logged
[09:47:32] Post logged

Now we can be sure the logging tasks are completed.
Relying on all_tasks works fine when we know exactly which tasks are running in our event loop, but in more complex cases there may be any number of pending tasks whose source lies outside our code.


Another approach is to clean up after ourselves by registering every coroutine we schedule ourselves, and allowing the pending ones to execute once the comment counting is done. As we know, ensure_future returns a Task object, which we can use to register our low-priority tasks. Let's simply define a task_registry list and store the futures in it:


async def post_number_of_comments(loop, session, post_id):
    """Retrieve data for current post and recursively for all comments."""
    url = URL_TEMPLATE.format(post_id)
    response = await fetch(session, url)

    if 'kids' not in response:  # base case, there are no comments
        return 0

    # calculate this post's comments as number of comments
    number_of_comments = len(response['kids'])

    # create recursive tasks for all comments
    tasks = [post_number_of_comments(loop, session, kid_id)
             for kid_id in response['kids']]

    # schedule the tasks and retrieve results
    results = await asyncio.gather(*tasks)

    # reduce the descendants' comments and add them to this post's
    number_of_comments += sum(results)
    log.debug('{:^6} > {} comments'.format(post_id, number_of_comments))

    # Log if number of comments is over a threshold
    if number_of_comments > MIN_COMMENTS:
        # Add the future to the registry
        task_registry.append(asyncio.ensure_future(log_post(response)))

    return number_of_comments


# (... omitted code ...)


if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    post_id = id_from_HN_url(args.url) if args.url else args.id

    loop = asyncio.get_event_loop()
    task_registry = []  # define our task registry

    with aiohttp.ClientSession(loop=loop) as session:
        now = datetime.now()
        comments = loop.run_until_complete(
            post_number_of_comments(loop, session, post_id))
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
        log.info("-- Post {} has {} comments".format(post_id, comments))

        pending_tasks = [
            task for task in task_registry if not task.done()]
        loop.run_until_complete(asyncio.gather(*pending_tasks))

    loop.close()

[09:53:46] > Calculating comments took 1.68 seconds and 73 fetches
[09:53:46] -- Post 8863 has 72 comments
[09:53:46] Post logged
[09:53:48] Post logged
[09:53:49] Post logged

Here we learn the next lesson: asyncio should not be treated as a distributed task queue such as Celery. All tasks run in a single thread, and the event loop must be managed accordingly, giving each task time to complete.
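To make the single-threaded nature concrete, here is a toy sketch (not from the article): a task that blocks the thread with time.sleep stalls everything else, whereas awaiting asyncio.sleep yields control back to the loop so other tasks can run:

import asyncio
import time

async def well_behaved():
    await asyncio.sleep(1)  # suspends and lets other tasks run meanwhile
    print("cooperative task done")

async def hog():
    time.sleep(1)  # blocks the only thread: nothing else runs meanwhile
    print("blocking task done")

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(well_behaved(), hog()))
loop.close()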


Which leads to another generally accepted pattern:


Periodically triggered coroutines


Continuing with our HN example, and given what a great job we did earlier, we have decided that it is crucial to calculate the number of comments on HN posts as soon as they come out, and while they remain on the list of the five most recent entries.


A quick look at the HN API reveals an endpoint that returns the 500 most recent stories. Great, so we can simply poll that endpoint for new posts and calculate the number of comments on them, say, every five seconds.


Right, since we are now polling periodically, we can simply use an infinite while loop, await the polling task, and sleep for the required interval. I made a few minor changes to fetch the top stories instead of hitting a post's URL directly.


 """ An example of periodically scheduling coroutines using an infinite loop of awaiting and sleeping. """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5,help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) fetch_counter = 0 async def fetch(session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ global fetch_counter with async_timeout.timeout(FETCH_TIMEOUT): fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, post_id): """          . """ url = URL_TEMPLATE.format(post_id) response = await fetch(session, url) if 'kids' not in response: # base case, there are no comments return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments( loop, session, kid_id) for kid_id in response['kids']] #      results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments async def get_comments_of_top_stories(loop, session, limit, iteration): """     HN. """ response = await fetch(session, TOP_STORIES_URL) tasks = [post_number_of_comments(loop, session, post_id) for post_id in response[:limit]] results = await asyncio.gather(*tasks) for post_id, num_comments in zip(response[:limit], results): log.info("Post {} has {} comments ({})".format(post_id, num_comments, iteration)) async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ global fetch_counter iteration = 1 while True: now = datetime.now() log.info("Calculating comments for top {} stories. ({})".format(limit, iteration)) await get_comments_of_top_stories(loop, session, limit, iteration) log.info('> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_counter)) log.info("Waiting for {} seconds...".format(period)) iteration += 1 fetch_counter = 0 await asyncio.sleep(period) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: loop.run_until_complete( poll_top_stories_for_comments( loop, session, args.period, args.limit)) loop.close() 

[10:14:03] Calculating comments for top 5 stories. (1)
[10:14:06] Post 13848196 has 31 comments (1)
[10:14:06] Post 13849430 has 37 comments (1)
[10:14:06] Post 13849037 has 15 comments (1)
[10:14:06] Post 13845337 has 128 comments (1)
[10:14:06] Post 13847465 has 27 comments (1)
[10:14:06] > Calculating comments took 2.96 seconds and 244 fetches
[10:14:06] Waiting for 5 seconds…
[10:14:11] Calculating comments for top 5 stories. (2)
[10:14:14] Post 13848196 has 31 comments (2)
[10:14:14] Post 13849430 has 37 comments (2)
[10:14:14] Post 13849037 has 15 comments (2)
[10:14:14] Post 13845337 has 128 comments (2)
[10:14:14] Post 13847465 has 27 comments (2)
[10:14:14] > Calculating comments took 3.04 seconds and 244 fetches
[10:14:14] Waiting for 5 seconds…

Great, but there is a subtle problem: if you look at the timestamps, the task does not run strictly every 5 seconds; it runs 5 seconds after get_comments_of_top_stories completes. Again, a consequence of using await and blocking until we get our results back. This may not be an issue, unless the task itself takes longer than five seconds. Also, it feels somewhat wrong to use run_until_complete when the coroutine is designed to be infinite.


The good news is that we are ensure_future experts by now, and we can just throw it in instead of using await...


async def poll_top_stories_for_comments(loop, session, period, limit):
    """Periodically poll for new stories and retrieve their comment count."""
    global fetch_counter
    iteration = 1
    while True:
        now = datetime.now()
        log.info("Calculating comments for top {} stories. ({})".format(
            limit, iteration))

        asyncio.ensure_future(
            get_comments_of_top_stories(loop, session, limit, iteration))

        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_counter))
        log.info("Waiting for {} seconds...".format(period))

        iteration += 1
        fetch_counter = 0
        await asyncio.sleep(period)

[10:55:40] Calculating comments for top 5 stories. (1)
[10:55:40] > Calculating comments took 0.00 seconds and 0 fetches
[10:55:40] Waiting for 5 seconds…
[10:55:43] Post 13848196 has 32 comments (1)
[10:55:43] Post 13849430 has 48 comments (1)
[10:55:43] Post 13849037 has 16 comments (1)
[10:55:43] Post 13845337 has 129 comments (1)
[10:55:43] Post 13847465 has 29 comments (1)
[10:55:45] Calculating comments for top 5 stories. (2)
[10:55:45] > Calculating comments took 0.00 seconds and 260 fetches
[10:55:45] Waiting for 5 seconds…
[10:55:48] Post 13848196 has 32 comments (2)
[10:55:48] Post 13849430 has 48 comments (2)
[10:55:48] Post 13849037 has 16 comments (2)
[10:55:48] Post 13845337 has 129 comments (2)
[10:55:48] Post 13847465 has 29 comments (2)

Hmm... OK, the good news is that the timestamps are now exactly five seconds apart, but what are those 0.00 seconds and zero fetches? And why does the next iteration take zero seconds and 260 fetches?


This is one of the consequences of moving away from await: since we no longer block, the coroutine simply proceeds to the next line, which prints zero seconds and, the first time around, a fetch counter of zero. These are fairly minor issues, since we could live without the timing messages, but what if we actually need the results of our tasks?


Then, my friend, we need to resort to... callbacks (shudder).


I know, I know, the whole point of coroutines is to avoid callbacks, but the article's dramatic subtitle is "Beyond await" for a reason. We are no longer in await territory; we are off adventuring with manually scheduled tasks, and that is exactly what our use case calls for.


As we discussed earlier, ensure_future returns a Future object, to which we can attach a callback using add_done_callback.
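In its simplest form that looks something like this (a standalone sketch with a made-up compute coroutine, not yet the article's code):

import asyncio

async def compute():
    await asyncio.sleep(1)
    return 42

def on_done(fut):
    # invoked by the loop once the task completes; fut.result() returns
    # the coroutine's return value (or re-raises its exception)
    print("result:", fut.result())

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(compute())
task.add_done_callback(on_done)
loop.run_until_complete(task)
loop.close()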


Before we do that, and in order to get a correct fetch count, we will have to encapsulate our fetch coroutine in a URLFetcher class and create one instance per task, so that each task has its own correct counter. This also removes the global variable, which was harbouring a bug anyway:


 """               ensure_future   sleep.      future,    ensure_future,         ,    URLFetcher   . """ import asyncio import argparse import logging from datetime import datetime import aiohttp import async_timeout LOGGER_FORMAT = '%(asctime)s %(message)s' URL_TEMPLATE = "https://hacker-news.firebaseio.com/v0/item/{}.json" TOP_STORIES_URL = "https://hacker-news.firebaseio.com/v0/topstories.json" FETCH_TIMEOUT = 10 parser = argparse.ArgumentParser(description='Calculate the number of comments of the top stories in HN.') parser.add_argument('--period', type=int, default=5, help='Number of seconds between poll') parser.add_argument('--limit', type=int, default=5, help='Number of new stories to calculate comments for') parser.add_argument('--verbose', action='store_true', help='Detailed output') logging.basicConfig(format=LOGGER_FORMAT, datefmt='[%H:%M:%S]') log = logging.getLogger() log.setLevel(logging.INFO) class URLFetcher(): """   URL     """ def __init__(self): self.fetch_counter = 0 async def fetch(self, session, url): """  URL   aiohttp,    JSON .     aiohttp    . """ with async_timeout.timeout(FETCH_TIMEOUT): self.fetch_counter += 1 async with session.get(url) as response: return await response.json() async def post_number_of_comments(loop, session, fetcher, post_id): """          . """ url = URL_TEMPLATE.format(post_id) response = await fetcher.fetch(session, url) #  .  . if response is None or 'kids' not in response: return 0 #     ,    (  ) number_of_comments = len(response['kids']) #       tasks = [post_number_of_comments( loop, session, fetcher, kid_id) for kid_id in response['kids']] # s     results = await asyncio.gather(*tasks) #            number_of_comments += sum(results) log.debug('{:^6} > {} comments'.format(post_id, number_of_comments)) return number_of_comments async def get_comments_of_top_stories(loop, session, limit, iteration): """    HN. """ fetcher = URLFetcher() # create a new fetcher for this task response = await fetcher.fetch(session, TOP_STORIES_URL) tasks = [post_number_of_comments( loop, session, fetcher, post_id) for post_id in response[:limit]] results = await asyncio.gather(*tasks) for post_id, num_comments in zip(response[:limit], results): log.info("Post {} has {} comments ({})".format( post_id, num_comments, iteration)) return fetcher.fetch_counter # return the fetch count async def poll_top_stories_for_comments(loop, session, period, limit): """         . """ iteration = 1 while True: log.info("Calculating comments for top {} stories. ({})".format(limit, iteration)) future = asyncio.ensure_future(get_comments_of_top_stories(loop, session, limit, iteration)) now = datetime.now() def callback(fut): fetch_count = fut.result() log.info('> Calculating comments took {:.2f} seconds and {} fetches'.format( (datetime.now() - now).total_seconds(), fetch_count)) future.add_done_callback(callback) log.info("Waiting for {} seconds...".format(period)) iteration += 1 await asyncio.sleep(period) if __name__ == '__main__': args = parser.parse_args() if args.verbose: log.setLevel(logging.DEBUG) loop = asyncio.get_event_loop() with aiohttp.ClientSession(loop=loop) as session: loop.run_until_complete( poll_top_stories_for_comments( loop, session, args.period, args.limit)) loop.close() 

[12:23:40] Calculating comments for top 5 stories. (1)
[12:23:40] Waiting for 5 seconds...
[12:23:43] Post 13848196 has 38 comments (1)
[12:23:43] Post 13849430 has 72 comments (1)
[12:23:43] Post 13849037 has 19 comments (1)
[12:23:43] Post 13848283 has 64 comments (1)
[12:23:43] Post 13847465 has 34 comments (1)
[12:23:43] > Calculating comments took 3.17 seconds and 233 fetches
[12:23:45] Calculating comments for top 5 stories. (2)
[12:23:45] Waiting for 5 seconds...
[12:23:47] Post 13848196 has 38 comments (2)
[12:23:47] Post 13849430 has 72 comments (2)
[12:23:47] Post 13849037 has 19 comments (2)
[12:23:47] Post 13848283 has 64 comments (2)
[12:23:47] Post 13847465 has 34 comments (2)
[12:23:47] > Calculating comments took 2.47 seconds and 233 fetches
[12:23:50] Calculating comments for top 5 stories. (3)
[12:23:50] Waiting for 5 seconds...

Nice. Let's take a closer look at the callback part:


async def poll_top_stories_for_comments(loop, session, period, limit):
    """Periodically poll for new stories and retrieve their comment count."""
    iteration = 1
    while True:
        log.info("Calculating comments for top {} stories. ({})".format(
            limit, iteration))

        future = asyncio.ensure_future(
            get_comments_of_top_stories(loop, session, limit, iteration))

        now = datetime.now()

        def callback(fut):
            fetch_count = fut.result()
            log.info(
                '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                    (datetime.now() - now).total_seconds(), fetch_count))

        future.add_done_callback(callback)

        log.info("Waiting for {} seconds...".format(period))
        iteration += 1
        await asyncio.sleep(period)

Note that the callback must accept a single argument: the future it is attached to. We return the fetch count of the URLFetcher instance from get_comments_of_top_stories, and that becomes the result of the future.


See? It is not that bad, considering we had to do without await.


While we are dealing in callbacks anyway, the asyncio loop API (AbstractEventLoop) also includes the call_later and call_at methods, which schedule a plain function, rather than a coroutine, to run at some point in the future. We can use them to reschedule the poll, turning poll_top_stories_for_comments into a regular function:


from functools import partial  # needed for the rescheduling below


def poll_top_stories_for_comments(loop, session, period, limit, iteration=0):
    """Periodic function that schedules get_comments_of_top_stories."""
    log.info("Calculating comments for top {} stories ({})".format(
        limit, iteration))

    future = asyncio.ensure_future(
        get_comments_of_top_stories(loop, session, limit, iteration))

    now = datetime.now()

    def callback(fut):
        fetch_count = fut.result()
        log.info(
            '> Calculating comments took {:.2f} seconds and {} fetches'.format(
                (datetime.now() - now).total_seconds(), fetch_count))

    future.add_done_callback(callback)

    log.info("Waiting for {} seconds...".format(period))

    iteration += 1
    loop.call_later(
        period,
        partial(  # or call_at(loop.time() + period)
            poll_top_stories_for_comments,
            loop, session, period, limit, iteration
        )
    )


if __name__ == '__main__':
    args = parser.parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    loop = asyncio.get_event_loop()
    with aiohttp.ClientSession(loop=loop) as session:
        poll_top_stories_for_comments(
            loop, session, args.period, args.limit)
        loop.run_forever()

    loop.close()

The result is similar to the previous output.



Nice, but what if, heaven forbid, something goes wrong? What if one of the URL fetches fails? Let's find out by making URLFetcher raise once a maximum number of fetches is exceeded:


MAXIMUM_FETCHES = 5


class URLFetcher():
    """Provides counting of URL fetches for a specific task."""

    def __init__(self):
        self.fetch_counter = 0

    async def fetch(self, session, url):
        """Fetch a URL using aiohttp, returning the parsed JSON response.
        As suggested by the aiohttp docs we reuse the session.
        """
        with async_timeout.timeout(FETCH_TIMEOUT):
            self.fetch_counter += 1
            if self.fetch_counter > MAXIMUM_FETCHES:
                raise Exception('BOOM!')

            async with session.get(url) as response:
                return await response.json()

[12:51:00] Calculating comments for top 5 stories. (1)
[12:51:00] Waiting for 5 seconds…
[12:51:01] Exception in callback poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121
handle: <Handle poll_top_stories_for_comments.<locals>.callback(<Task finishe…ion('BOOM!',)>) at 05_periodic_coroutines.py:121>
Traceback (most recent call last):
  File "/Users/yeray/.pyenv/versions/3.6.0/lib/python3.6/asyncio/events.py", line 126, in _run
    self._callback(*self._args)
  File "05_periodic_coroutines.py", line 122, in callback
    fetch_count = fut.result()
  File "05_periodic_coroutines.py", line 100, in get_comments_of_top_stories
    results = await asyncio.gather(*tasks)
  File "05_periodic_coroutines.py", line 69, in post_number_of_comments
    response = await fetcher.fetch(session, url)
  File "05_periodic_coroutines.py", line 58, in fetch
    raise Exception('BOOM!')
Exception: BOOM!
[12:51:05] Calculating comments for top 5 stories. (2)
[12:51:05] Waiting for 5 seconds…

Notice that the exception surfaces inside the callback, at the moment we call fut.result(), and that the loop simply carries on and schedules iteration 2 regardless. Lovely, isn't it?


What can we do about it? Stay tuned: error handling is the subject of the follow-up article, Asyncio coroutine patterns: Errors and cancellation.
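In the meantime, a minimal defensive tweak (my own sketch, not the author's solution) is to guard the result retrieval inside the callback, since fut.result() re-raises any exception that escaped the coroutine:

def callback(fut):
    try:
        fetch_count = fut.result()
    except Exception:
        # without this guard the exception is raised inside the event
        # loop's callback machinery, as in the traceback above
        log.exception('Retrieving the fetch count failed')
        return
    log.info('> Task used {} fetches'.format(fetch_count))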



Source: https://habr.com/ru/post/420129/

