In this article, I invite the reader to join me on a rather fascinating journey into the depths of asyncio to figure out how it implements asynchronous code execution. We will saddle callbacks and ride through the event loop, past a couple of key abstractions, right into coroutines. If these sights are not yet on your Python map, read on.
asyncio is an asynchronous I/O library. According to PEP 3153, it was created to provide a standardized foundation for building asynchronous frameworks. PEP 3156 adds another requirement: it must be extremely easy to integrate with existing asynchronous frameworks (Twisted, Tornado, Gevent). As we can see today, these goals were achieved. A new asyncio-based framework appeared, aiohttp; in Tornado, AsyncIOMainLoop is the default event loop since version 5.0; Twisted has offered asyncioreactor since version 16.5.0; and for Gevent there is the third-party aiogevent library.
asyncio is a hybrid library that uses two approaches at once to execute code asynchronously: the classic one based on callbacks and the relatively new one (at least for Python) based on coroutines. It is built on three main abstractions, analogues of which exist in third-party frameworks: the event loop, Future, and Task.
The event loop is the main component of the library: data travels along its roads to every other component. It is large and complex, so let's start with a trimmed-down version.
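```python
# ~/inside_asyncio/base_loop.py
import collections
import random


class Loop:
    def __init__(self):
        # Queue of callbacks that are ready to run
        self.ready = collections.deque()

    def call_soon(self, callback, *args):
        # Put a callback and its arguments at the end of the queue
        self.ready.append((callback, args))

    def run_until_complete(self, callback, *args):
        # Queue the starting callback
        self.call_soon(callback, *args)
        # The event loop: run while there is anything in the queue
        while self.ready:
            ntodo = len(self.ready)
            # Run only the callbacks queued before this pass;
            # callbacks they schedule will run on the next pass
            for _ in range(ntodo):
                callback, args = self.ready.popleft()
                callback(*args)


def callback(loop):
    print('Saddling the callback')
    loop.call_soon(print, 'Reached the screen')


loop = Loop()
loop.run_until_complete(callback, loop)
```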
Having saddled our little callback, we set off along the road through call_soon, get into the queue and, after a brief wait, appear on the screen.
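The ntodo snapshot in run_until_complete decides when newly queued callbacks get their turn. The sketch below is a self-contained copy of the trimmed loop with one hypothetical addition, an iteration counter, so that the execution order can be logged; child, parent and main are illustrative names. It shows that a callback queued during a pass waits for the next pass instead of jumping ahead.

```python
import collections


class Loop:
    """The trimmed loop from above, plus a hypothetical `iteration` counter."""

    def __init__(self):
        self.ready = collections.deque()
        self.iteration = 0

    def call_soon(self, callback, *args):
        self.ready.append((callback, args))

    def run_until_complete(self, callback, *args):
        self.call_soon(callback, *args)
        while self.ready:
            self.iteration += 1
            # Only callbacks queued before this pass run in it
            ntodo = len(self.ready)
            for _ in range(ntodo):
                callback, args = self.ready.popleft()
                callback(*args)


log = []


def child(loop, name):
    log.append((loop.iteration, name))


def parent(loop, name):
    log.append((loop.iteration, name))
    # Queued now, but runs only on the next pass of the loop
    loop.call_soon(child, loop, name + '.child')


def main(loop):
    log.append((loop.iteration, 'main'))
    loop.call_soon(parent, loop, 'a')
    loop.call_soon(parent, loop, 'b')


loop = Loop()
loop.run_until_complete(main, loop)
print(log)
# [(1, 'main'), (2, 'a'), (2, 'b'), (3, 'a.child'), (3, 'b.child')]
```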
It is worth mentioning that callbacks are dangerous horses: if one throws you in the middle of the road, the Python interpreter will not be able to help you figure out where it happened. If you don't believe it, ride the same route on the maybe_print callback, which reaches the finish line in only about half of the cases.
```python
# ~/inside_asyncio/base_loop.py
def maybe_print(msg):
    # Fails in roughly half of the runs
    if random.randint(0, 1):
        raise Exception(msg)
    else:
        print(msg)


def starting_point(loop):
    print('Starting point')
    loop.call_soon(maybe_print, 'Finish line')


def main(loop):
    loop.call_soon(starting_point, loop)
    loop.call_soon(starting_point, loop)


loop = Loop()
loop.run_until_complete(main, loop)
```

Below is the full trace of one run of the previous example. Because maybe_print was started by the event loop rather than directly from starting_point, the traceback ends at the loop, in the run_until_complete method. From such a traceback it is impossible to tell which call to starting_point led to the failure, which makes debugging much harder if starting_point is called from several places in the code base.

```
$: python3 base_loop.py
>> Starting point
>> Starting point
>> Finish line
>> Traceback (most recent call last):
>>   File "base_loop.py", line 42, in <module>
>>     loop.run_until_complete(main, loop)
>>   File "base_loop.py", line 17, in run_until_complete
>>     callback(*args)
>>   File "base_loop.py", line 29, in maybe_print
>>     raise Exception(msg)
>> Exception: Finish line
```
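One way to get back the lost information is to record the stack at the moment a callback is scheduled; asyncio's debug mode keeps a similar creation traceback for its handles. The sketch below is a hypothetical TracingLoop, not asyncio's code: call_soon stores traceback.extract_stack() next to each callback, and run_until_complete re-raises a failure together with the function and line that scheduled it. explode is an illustrative callback that always fails, to keep the demo deterministic.

```python
import collections
import traceback


class TracingLoop:
    """Hypothetical variant of the trimmed Loop that remembers call sites."""

    def __init__(self):
        self.ready = collections.deque()

    def call_soon(self, callback, *args):
        # Capture the stack at scheduling time, minus this call_soon frame
        source = traceback.extract_stack()[:-1]
        self.ready.append((callback, args, source))

    def run_until_complete(self, callback, *args):
        self.call_soon(callback, *args)
        while self.ready:
            for _ in range(len(self.ready)):
                callback, args, source = self.ready.popleft()
                try:
                    callback(*args)
                except Exception as exc:
                    # Re-raise, naming the place the callback was scheduled from
                    where = source[-1]
                    raise RuntimeError(
                        f'callback {callback.__name__!r} raised; it was '
                        f'scheduled from {where.name}() at line {where.lineno}'
                    ) from exc


def explode(msg):
    raise Exception(msg)


def starting_point(loop):
    loop.call_soon(explode, 'Finish line')


loop = TracingLoop()
try:
    loop.run_until_complete(starting_point, loop)
except RuntimeError as err:
    message = str(err)
print(message)
```

The original exception is still attached as `__cause__`, so the full traceback shows both where the callback failed and where it came from.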
A continuous call stack is needed not only to display a full traceback but also to implement other language features; exception handling, for example, is built on it. The example below will not work, because by the time starting_point runs, main will have already finished:
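```python
# ~/inside_asyncio/base_loop.py
def main(loop):
    try:
        loop.call_soon(starting_point, loop)
        loop.call_soon(starting_point, loop)
    except Exception:
        # Never reached: call_soon only queues the callbacks, and any
        # exception is raised later, from inside run_until_complete
        pass


loop = Loop()
loop.run_until_complete(main, loop)
```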
The following example does not work either: the context manager in main opens and closes the file before processing has even started.
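```python
# ~/inside_asyncio/base_loop.py
def main(loop):
    with open('file.txt', 'rb') as f:
        # Only queues process_file; the with block exits right after this
        loop.call_soon(process_file, f)


loop = Loop()
# By the time process_file runs, the file is already closed =(
loop.run_until_complete(main, loop)
```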
The lack of a continuous call stack restricts the use of familiar language features. To partially work around this flaw, asyncio had to add a lot of extra code that is not directly related to the task it solves. For the most part, this code is left out of the examples: they are complex enough without it.
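For contrast, here are the same two situations written with real asyncio coroutines. Plain try/except and with behave as usual, because a suspended coroutine keeps its frame alive and the Task resumes it in place. maybe_fail and read_later are hypothetical helpers, and the file comes from tempfile so the sketch can run anywhere.

```python
import asyncio
import os
import tempfile


async def maybe_fail(msg):
    # Suspend once, so the exception is raised on a later pass of the loop
    await asyncio.sleep(0)
    raise ValueError(msg)


async def read_later(f):
    # Suspend once before touching the file
    await asyncio.sleep(0)
    return f.read()


async def main(path):
    try:
        await maybe_fail('boom')
    except ValueError as exc:
        # Caught as usual: main and maybe_fail form one chain of frames
        caught = str(exc)
    with open(path, 'rb') as f:
        # The file stays open until read_later has finished
        data = await read_later(f)
    return caught, data


with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b'hello')
result = asyncio.run(main(tmp.name))
os.remove(tmp.name)
print(result)  # ('boom', b'hello')
```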
The event loop communicates with the outside world through the operating system, by means of events. The code that knows how to talk to the OS is provided by the standard library module selectors. It lets you tell the operating system that we are waiting for some event and then ask whether it has happened. In the example below, the awaited event is a socket becoming readable.
```python
# ~/inside_asyncio/event_loop.py
import collections
import selectors
import socket


class EventLoop:
    def __init__(self):
        # Queue of callbacks that are ready to run
        self.ready = collections.deque()
        # Selector used to ask the OS about socket events
        self.selector = selectors.DefaultSelector()

    def add_reader(self, sock, callback):
        # Ask to be notified when the listening socket is readable, i.e.
        # a client is connecting; the tuple says what to call at that point
        self.selector.register(
            sock, selectors.EVENT_READ, (self._accept_conn, sock, callback)
        )

    def _accept_conn(self, sock, callback):
        # Accept the incoming connection without blocking
        conn, addr = sock.accept()
        conn.setblocking(False)
        # Call `callback` whenever the new connection has data to read
        self.selector.register(conn, selectors.EVENT_READ, (callback, conn))

    def run_until_complete(self, callback, *args):
        self.call_soon(callback, *args)
        # Run while there are ready callbacks or sockets being watched
        while self.ready or self.selector.get_map():
            ntodo = len(self.ready)
            for _ in range(ntodo):
                callback, args = self.ready.popleft()
                callback(*args)
            # Ask the OS which events have happened, without blocking
            for key, events in self.selector.select(timeout=0):
                # Unpack the callback and arguments stored at registration
                callback, *args = key.data
                # and schedule it as an ordinary callback
                self.call_soon(callback, *args)

    def call_soon(self, callback, *args):
        self.ready.append((callback, args))


def print_data(conn):
    print(conn.recv(1000))


def main(loop):
    # Listening socket on port 8086
    sock = socket.socket()
    sock.bind(('localhost', 8086))
    sock.listen(100)
    sock.setblocking(False)
    # Print whatever clients send
    loop.add_reader(sock, print_data)


loop = EventLoop()
# Start the event loop
loop.run_until_complete(main, loop)
```
The messenger from the outside world leaves its message or parcel with the selector, and the selector passes it on to the recipient. We can now read from a socket using the event loop. If you run this code and connect with netcat, it will faithfully print everything sent to it.
```
$: nc localhost 8086          $: python3 event_loop.py
"Hi there!"                   b'"Hi there!"\n'
"Hello!"                      b'"Hello!"\n'
"Answer me, please!"          b'"Answer me, please!"\n'
```
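The same register/select dialogue with the OS can be watched without netcat or a listening port. This self-contained sketch uses socket.socketpair(), a pair of already connected sockets, in place of a real client; on_readable is an illustrative callback stored as the registration's data, just as the EventLoop above stores its callback tuples.

```python
import selectors
import socket

selector = selectors.DefaultSelector()
# A connected pair of sockets: whatever is sent on one can be read on the other
reader, writer = socket.socketpair()
reader.setblocking(False)

received = []


def on_readable(sock):
    received.append(sock.recv(1000))


# Tell the OS we are waiting for `reader` to become readable;
# `data` can be any object, here it is the callback to run
selector.register(reader, selectors.EVENT_READ, data=on_readable)

# Ask whether anything has happened yet: nothing was sent, so no events
events_before = selector.select(timeout=0)

writer.send(b'Hi there!')
for key, mask in selector.select(timeout=1):
    callback = key.data
    callback(key.fileobj)

selector.unregister(reader)
selector.close()
reader.close()
writer.close()
print(events_before, received)  # [] [b'Hi there!']
```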
At the beginning of the article we said that asyncio is a hybrid library in which coroutines run on top of callbacks. This is implemented with the two remaining main abstractions: Task and Future. Below we first show the code of these abstractions and then how the event loop uses them to execute coroutines.
Below is the code of the Future class. It lets a coroutine wait until a callback has completed and then get its result.
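```python
# ~/inside_asyncio/future.py
import sys
import traceback
from asyncio import CancelledError


class Future:
    def __init__(self, loop):
        # Possible states: PENDING, FINISHED, CANCELLED
        self._state = 'PENDING'
        self._result = None
        self._exception = None
        # Done-callbacks; created per instance so Futures don't share a list
        self._callbacks = []
        self._loop = loop
        # Where the Future was created, to help with debugging
        self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def add_done_callback(self, callback):
        # Register a callback to run when the Future is done
        self._callbacks.append(callback)

    def _schedule_callbacks(self):
        # Hand every done-callback to the event loop, passing this Future
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)
        self._callbacks[:] = []

    def set_exception(self, exception):
        # Store the exception, mark the Future as done, wake up the waiters
        self._exception = exception
        self._state = 'FINISHED'
        self._schedule_callbacks()

    def set_result(self, result):
        # Store the result, mark the Future as done, wake up the waiters
        self._result = result
        self._state = 'FINISHED'
        self._schedule_callbacks()

    def cancel(self):
        self._state = 'CANCELLED'
        self._schedule_callbacks()

    def result(self):
        # A cancelled Future raises CancelledError
        if self._state == 'CANCELLED':
            raise CancelledError
        # A failed Future re-raises the stored exception
        if self._exception is not None:
            raise self._exception
        return self._result

    def __await__(self):
        # Called by `await`: if there is no result yet, suspend the
        # coroutine and pass this Future up to the Task driving it
        if self._state == 'PENDING':
            yield self
        # When resumed, return the result (or raise the exception)
        return self.result()
```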
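The heart of the class is __await__, and its generator protocol can be driven by hand. In this sketch MiniFuture is a hypothetical, stripped-down Future (no loop, no callbacks) that keeps only the __await__ logic: the first send(None) returns the Future itself out of the coroutine, and after set_result the next send(None) resumes the coroutine, which finishes with StopIteration carrying its return value. This is exactly the handshake Task relies on.

```python
class MiniFuture:
    """Hypothetical stripped-down Future: just enough for `await`."""

    def __init__(self):
        self._state = 'PENDING'
        self._result = None

    def set_result(self, result):
        self._result = result
        self._state = 'FINISHED'

    def __await__(self):
        if self._state == 'PENDING':
            # Hand this Future to whoever is driving the coroutine
            yield self
        return self._result


async def main(fut):
    value = await fut
    return value * 2


fut = MiniFuture()
coro = main(fut)
yielded = coro.send(None)   # runs main() up to `await fut`
print(yielded is fut)       # True: the Future came out of send()
fut.set_result(21)          # e.g. an I/O callback completes the Future
try:
    coro.send(None)         # resume main() right after the yield
except StopIteration as stop:
    result = stop.value
print(result)               # 42
```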
Task is a special subclass of Future. It is needed to run coroutines on the callback-based event loop.
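```python
# ~/inside_asyncio/task.py
from future import Future


class Task(Future):
    def __init__(self, coro, *, loop=None):
        super().__init__(loop=loop)
        # The coroutine this Task drives
        self._coro = coro

    def _step(self, exc=None):
        # Resume the coroutine, either normally or by throwing an exception into it
        try:
            if exc is None:
                # Send None to run the coroutine up to its next suspension point
                result = self._coro.send(None)
            else:
                # Raise the exception inside the coroutine, at its await
                result = self._coro.throw(exc)
        except StopIteration as stop:
            # The coroutine returned: its return value is the Task's result
            self.set_result(stop.value)
        except Exception as error:
            self.set_exception(error)
        else:
            # The coroutine is waiting for a Future: when the Future is
            # done, it will call _wakeup
            if isinstance(result, Future):
                result.add_done_callback(self._wakeup)
            # A bare yield: simply schedule the next step
            elif result is None:
                self._loop.call_soon(self._step)

    def _wakeup(self, future):
        # Called by the loop once the awaited Future is done
        try:
            future.result()
        except Exception as exc:
            # Forward the Future's exception into the coroutine
            self._step(exc)
        else:
            # The Future has a result: resume the coroutine
            self._step()
```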
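When the awaited Future fails, _wakeup calls _step(exc), which uses coroutine.throw. This sketch shows that mechanism on its own: Pending is a hypothetical awaitable that always suspends once, like a pending Future, and the exception thrown in from outside surfaces at the await inside main, where an ordinary try/except handles it.

```python
class Pending:
    """Hypothetical awaitable that suspends once, like a pending Future."""

    def __await__(self):
        yield self


async def main():
    try:
        await Pending()
    except ValueError as exc:
        return f'handled inside the coroutine: {exc}'


coro = main()
coro.send(None)  # run up to the await; the Pending object is yielded
try:
    # What Task._step(exc) does with a failed Future's exception
    coro.throw(ValueError('socket error'))
except StopIteration as stop:
    outcome = stop.value
print(outcome)  # handled inside the coroutine: socket error
```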
```python
# ~/inside_asyncio/future_event_loop.py
import collections
import selectors
import socket
from selectors import EVENT_READ

from future import Future
from task import Task


class EventLoop:
    def __init__(self):
        self._ready = collections.deque()
        self.selector = selectors.DefaultSelector()

    def run_until_complete(self, callback, *args):
        self.call_soon(callback, *args)
        while self._ready or self.selector.get_map():
            ntodo = len(self._ready)
            for _ in range(ntodo):
                callback, args = self._ready.popleft()
                callback(*args)
            for key, events in self.selector.select(timeout=0):
                callback, *args = key.data
                self.call_soon(callback, *args)

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def _unregister(self, sock):
        # Stop watching the socket, but only if it is being watched: when
        # the operation succeeds on the first try, it was never registered
        if sock in self.selector.get_map():
            self.selector.unregister(sock)

    # Socket operations that return a Future
    def sock_accept(self, sock, fut=None):
        # First call: create a Future; retries reuse the one passed in
        fut = fut if fut is not None else Future(loop=self)
        try:
            # Try to accept a connection without blocking
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            # No client yet: ask the selector to call sock_accept again,
            # with the same Future, once the socket becomes readable
            self.selector.register(
                sock, EVENT_READ, (self.sock_accept, sock, fut)
            )
        except Exception as exc:
            fut.set_exception(exc)
            self._unregister(sock)
        else:
            # A client connected: complete the Future with (conn, address)
            fut.set_result((conn, address))
            self._unregister(sock)
        return fut

    def sock_recv(self, sock, n, fut=None):
        # Same pattern as sock_accept: try to read now and, if there is
        # no data yet, retry once the selector reports the socket readable
        fut = fut if fut is not None else Future(loop=self)
        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            self.selector.register(
                sock, EVENT_READ, (self.sock_recv, sock, n, fut)
            )
        except Exception as exc:
            fut.set_exception(exc)
            self._unregister(sock)
        else:
            fut.set_result(data)
            self._unregister(sock)
        return fut


async def main(loop):
    sock = socket.socket()
    sock.bind(('localhost', 8080))
    sock.listen(100)
    sock.setblocking(False)
    # Wait for a client to connect
    conn, addr = await loop.sock_accept(sock)
    # Wait for the client to send data
    result = await loop.sock_recv(conn, 1000)
    print(result)


loop = EventLoop()
# Wrap the coroutine in a Task
task = Task(coro=main(loop), loop=loop)
# Start the loop with the Task's first step
loop.run_until_complete(task._step)
```
Now let's see how the main coroutine is executed, step by step:
```python
# 1. The loop queues task._step and then calls it
class EventLoop:
    def run_until_complete(self, callback, *args):
        # callback is task._step
        self.call_soon(callback, *args)
        while self._ready or self.selector.get_map():
            ntodo = len(self._ready)
            for _ in range(ntodo):
                callback, args = self._ready.popleft()
                # Calls task._step()
                callback(*args)


# 2. The Task starts the coroutine by sending None into it
class Task(Future):
    def _step(self, exc=None):
        try:
            if exc is None:
                # Runs main() up to its first suspension point
                result = self._coro.send(None)
            else:
                ...


# 3. main() prepares the listening socket and reaches the first await
async def main(loop):
    sock = socket.socket()
    sock.bind(('localhost', 8080))
    sock.listen(100)
    sock.setblocking(False)
    # Calls loop.sock_accept(sock)
    conn, addr = await loop.sock_accept(sock)
    result = await loop.sock_recv(conn, 1000)
    print(result)


# 4. No client has connected yet, so sock_accept cannot finish right away
class EventLoop:
    def sock_accept(self, sock, fut=None):
        # First call: create a new Future
        fut = fut if fut else Future(loop=self)
        try:
            # Raises BlockingIOError: nobody is connecting yet
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            # Ask the selector to call sock_accept(sock, fut) again
            # once the socket becomes readable
            self.selector.register(
                sock, EVENT_READ, (self.sock_accept, sock, fut)
            )
        except Exception as exc:
            ...
        # Return the still pending Future
        return fut


# 5. await calls the Future's __await__
async def main(loop):
    ...
    conn, addr = await loop.sock_accept(sock)
    ...


# 6. The Future is pending, so it yields itself out of the coroutine
class Future:
    def __await__(self):
        if self._state == 'PENDING':
            # main() is suspended here
            yield self
        return self.result()


# 7. send() returns the Future to the Task, which subscribes to it
class Task(Future):
    def _step(self, exc=None):
        try:
            if exc is None:
                # result is fut, the Future yielded by __await__
                result = self._coro.send(None)
            else:
                ...
        except StopIteration as stop:
            ...
        except Exception as exc:
            ...
        else:
            # For a Future, ask it to call _wakeup when it is done
            if isinstance(result, Future):
                result.add_done_callback(self._wakeup)
            elif result is None:
                self._loop.call_soon(self._step)
    # The Task now sleeps until the Future completes


# 8. The loop polls the selector; when a client connects,
#    the listening socket becomes readable
class EventLoop:
    def run_until_complete(self, callback, *args):
        self.call_soon(callback, *args)
        while self._ready or self.selector.get_map():
            ntodo = len(self._ready)
            for _ in range(ntodo):
                callback, args = self._ready.popleft()
                callback(*args)
            for key, events in self.selector.select(timeout=0):
                # key.data is (self.sock_accept, sock, fut)
                callback, *args = key.data
                # Schedules loop.sock_accept(sock, fut)
                self.call_soon(callback, *args)


# 9. sock_accept runs again, and this time accept() succeeds
class EventLoop:
    def sock_accept(self, sock, fut=None):
        # fut is the Future created on the first call
        fut = fut if fut else Future(loop=self)
        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            ...
        else:
            # Complete the Future with the new connection
            fut.set_result((conn, address))
            self.selector.unregister(sock)
        return fut


# 10. The Future stores the result and schedules its done-callbacks
class Future:
    def set_result(self, result):
        # Store the result
        self._result = result
        # Mark the Future as done
        self._state = 'FINISHED'
        # Hand the callbacks to the loop
        self._schedule_callbacks()

    def _schedule_callbacks(self):
        for callback in self._callbacks:
            # callback is task._wakeup; queues (task._wakeup, fut)
            self._loop.call_soon(callback, self)
        self._callbacks[:] = []


# 11. On its next pass the loop calls task._wakeup(fut)
class EventLoop:
    def run_until_complete(self, callback, *args):
        self.call_soon(callback, *args)
        while self._ready or self.selector.get_map():
            ntodo = len(self._ready)
            for _ in range(ntodo):
                callback, args = self._ready.popleft()
                # Calls task._wakeup(fut)
                callback(*args)


# 12. The Future holds a result, so the Task resumes the coroutine
class Task(Future):
    def _wakeup(self, future):
        try:
            future.result()
        except Exception as exc:
            self._step(exc)
        else:
            # No exception: continue with task._step()
            self._step()

    def _step(self, exc=None):
        try:
            if exc is None:
                # Resumes main() where it was suspended
                result = self._coro.send(None)
            else:
                ...


# 13. Execution continues inside the await in main()
async def main(loop):
    ...
    # Still inside Future.__await__
    conn, addr = await loop.sock_accept(sock)
    ...


# 14. __await__ continues after the yield and returns the result
class Future:
    def __await__(self):
        if self._state == 'PENDING':
            yield self
        # Returns (conn, address)
        return self.result()


# 15. main() receives the result of the Future
async def main(loop):
    ...
    # conn and addr now hold the accepted connection and its address
    conn, addr = await loop.sock_accept(sock)
    result = await loop.sock_recv(conn, 1000)
    print(result)
```
That, in simplified form, is how asyncio executes coroutines.
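To watch the whole chain run end to end without sockets, here is a compressed, self-contained sketch that folds a trimmed loop, Future and Task into one file. It is a simplification under stated assumptions: every await ends in one of these Futures, nothing fails or gets cancelled, the loop pops one callback at a time instead of snapshotting ntodo, and each Task queues its own first _step in __init__ (as asyncio's real Task does). deliver_later is a hypothetical helper standing in for an I/O callback that completes a Future. Two coroutines take turns on a single thread.

```python
import collections


class Loop:
    def __init__(self):
        self.ready = collections.deque()

    def call_soon(self, callback, *args):
        self.ready.append((callback, args))

    def run(self):
        while self.ready:
            callback, args = self.ready.popleft()
            callback(*args)


class Future:
    def __init__(self, loop):
        self._loop = loop
        self._state = 'PENDING'
        self._result = None
        self._callbacks = []

    def add_done_callback(self, callback):
        self._callbacks.append(callback)

    def set_result(self, result):
        self._result = result
        self._state = 'FINISHED'
        # Done-callbacks go through the loop, as in the Future above
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)
        self._callbacks = []

    def result(self):
        return self._result

    def __await__(self):
        if self._state == 'PENDING':
            yield self
        return self.result()


class Task(Future):
    def __init__(self, coro, loop):
        super().__init__(loop)
        self._coro = coro
        # Schedule the first step right away
        loop.call_soon(self._step)

    def _step(self):
        try:
            result = self._coro.send(None)
        except StopIteration as stop:
            self.set_result(stop.value)
        else:
            # Assumes every await ends in one of our Futures
            result.add_done_callback(self._wakeup)

    def _wakeup(self, future):
        self._step()


log = []


def deliver_later(loop, value):
    # Hypothetical helper: a plain callback will complete this Future later
    fut = Future(loop)
    loop.call_soon(fut.set_result, value)
    return fut


async def worker(loop, name):
    for i in range(2):
        value = await deliver_later(loop, f'{name}{i}')
        log.append(value)
    return name


loop = Loop()
a = Task(worker(loop, 'a'), loop)
b = Task(worker(loop, 'b'), loop)
loop.run()
print(log)                    # ['a0', 'b0', 'a1', 'b1']
print(a.result(), b.result())  # a b
```

The interleaved log shows the point of the whole construction: while one coroutine waits for its Future, the loop keeps running the other one's callbacks.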
asyncio achieved its goal. It not only solved the compatibility problem but also sparked a huge surge of interest in concurrent programming in the community: new articles and libraries began springing up like mushrooms after the rain. asyncio also influenced the language itself: native coroutines and the new async/await keywords were added to it. New keywords appear in Python rarely; yield, for instance, became one back in 2003.
One of the goals of asyncio was to make integration with existing asynchronous frameworks (Twisted, Tornado, Gevent) extremely simple. The choice of tools follows logically from this goal: without the compatibility requirement, coroutines might well have been given the leading role. Because a continuous call stack cannot be preserved when programming with callbacks, an additional layer had to be built at the boundary between callbacks and coroutines to support the language features that rely on that stack.
Now the main question: why should an ordinary library user, who follows the recommendations in the documentation and uses only coroutines and the high-level API, know all this?
Take, for example, StreamWriter from the asyncio documentation.
An instance of it is returned (together with a StreamReader) by the asyncio.open_connection function; it is an async/await API on top of the callback API, and those callbacks poke through it. The write and writelines methods are synchronous: they try to write to the socket and, if that is not possible, put the data into an internal buffer and set up callbacks to write it later. The drain coroutine lets you wait until the amount of data in the buffer drops back to an acceptable level.
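A minimal sketch of the write/drain pattern, assuming a throwaway local server started with asyncio.start_server on a free port (port 0); handle and send_chunks are hypothetical names. write() only hands the bytes to the transport, and awaiting drain() after each write is what keeps the internal buffer bounded.

```python
import asyncio


async def handle(reader, writer):
    # Server side: read everything until EOF, then reply with its size
    data = await reader.read()
    writer.write(str(len(data)).encode())
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def send_chunks(chunks):
    # A local server on a free port, so the sketch needs no external host
    server = await asyncio.start_server(handle, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    for chunk in chunks:
        # Synchronous: the data goes to the socket or to the buffer
        writer.write(chunk)
        # Wait until the buffer has been flushed below its limit
        await writer.drain()
    # Signal EOF so the server's read() returns
    writer.write_eof()
    reply = await reader.read()
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return int(reply.decode())


total = asyncio.run(send_chunks([b'x' * 1000] * 10))
print(total)  # 10000
```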
If you forget to call drain between write calls, the internal buffer can grow to indecent sizes. Even if you keep this in mind, a couple of unpleasant points remain. First, if a write callback fails, the coroutine using this API will not know about it and therefore cannot handle the error. Second, if the coroutine fails, the write callback will not know about it either and will keep writing data from the buffer.
So even if you use only coroutines, be prepared for callbacks to remind you of their existence.
You can read about working with databases from asynchronous code in this article on our corporate blog, Antida software.
Source: https://habr.com/ru/post/453348/