
Efficient multithreading in Python

I want to share a simple recipe for efficiently performing a large number of HTTP requests and other I/O tasks from plain Python. The most correct approach would be to use an asynchronous framework like Tornado or gevent, but sometimes that is not an option, because fitting an event loop into an already existing project is problematic.

In my case there was an existing Django application from which some very small files had to be uploaded to AWS S3 roughly once a month. Over time the number of files approached 50,000, and uploading them one by one became tiresome. As you know, S3 does not support updating multiple objects with a single PUT request, and the experimentally determined maximum request rate from an EC2 server in the same data center is no more than 17 per second (which, by the way, is not much). So updating 50 thousand files began to take close to an hour.

Pythonists know from childhood that there is no point in using threads (operating system threads) because of the global interpreter lock. But few realize that, like any lock, the GIL is released from time to time. In particular, this happens during input/output operations, including network ones. This means threads can be used to parallelize HTTP requests: while one thread is waiting for a response, another can calmly process the result of the previous request or prepare the next one.
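As a quick illustration (a minimal sketch, not part of the original article; the URLs are placeholders): two threads fetching pages finish in roughly the time of the slowest request rather than the sum of both, because the GIL is released while each thread waits on the network.

    import time
    import threading
    import urllib2  # urllib.request on Python 3

    def fetch(url):
        # the GIL is released while this thread waits for the response
        urllib2.urlopen(url).read()

    start = time.time()
    threads = [threading.Thread(target=fetch, args=(url,))
               for url in ['https://example.com/', 'https://example.org/']]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print('elapsed: %.2f s' % (time.time() - start))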
It turns out that all you need is a pool of threads to execute the requests. Fortunately, such a pool has already been written. Starting with version 3.2, the concurrent.futures library appeared to unify all asynchronous work in Python; for Python 2 there is a backport named futures. The code is embarrassingly simple:

    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(concurrency) as executor:
        for _ in executor.map(upload, queryset):
            pass

Here concurrency is the number of worker threads, upload is the function that performs the task itself, and queryset is an iterator of objects that are passed one at a time to the task. With a concurrency of 150, this code alone could already push roughly 450 requests per second to Amazon's servers.
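The article does not show the upload function itself, so here is only a rough sketch of what it might look like with boto; the bucket name, the key layout and the render_thumbnail method are made-up assumptions.

    import boto

    # boto connections and buckets are thread-safe, so a single bucket object
    # can be shared by all worker threads
    bucket = boto.connect_s3().get_bucket('my-bucket')  # hypothetical bucket name

    def upload(obj):
        # obj is one Django model instance taken from the queryset
        key = bucket.new_key('thumbnails/%d.jpg' % obj.pk)    # assumed key layout
        key.set_contents_from_string(obj.render_thumbnail())  # assumed model method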

One note about the tasks: they must be thread-safe. That is, several parallel tasks should not have shared resources, or should manage them correctly. The global interpreter lock is a poor helper here: it does not guarantee that a thread's execution will not be interrupted at the most inconvenient moment. If you only use urllib3, requests or boto, there is nothing to worry about; they are already thread-safe. Check the documentation for other libraries. And your own code also needs to be written thread-safe.
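For your own code, the usual recipe is to protect any shared state explicitly; a minimal sketch (the do_upload call and the failure counter are hypothetical, not from the article):

    import threading

    counter = {'failed': 0}
    counter_lock = threading.Lock()

    def upload(obj):
        try:
            do_upload(obj)         # placeholder for the actual work
        except Exception:
            with counter_lock:     # += is a read-modify-write, so guard it
                counter['failed'] += 1
            raise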

Time passed and the number of files approached 200 thousand. What do you think, how much memory do 200 thousand Django models take? And 200 thousand futures? And 200 thousand tasks? All together, about a gigabyte. It became clear that submitting everything to the executor at once was not an option. But what if we add a new task whenever a previous one finishes? At the very beginning we submit as many tasks as there are threads, keep track of how many tasks have been submitted and how many have completed, do not store the futures ourselves and do not hand them out. The result is a very neat reusable function (careful, this is not the final version):

    from concurrent.futures import ThreadPoolExecutor, Future

    def task_queue(task, iterator, concurrency=10):
        def submit():
            try:
                obj = next(iterator)
            except StopIteration:
                return
            stats['delayed'] += 1
            future = executor.submit(task, obj)
            future.add_done_callback(upload_done)

        def upload_done(future):
            submit()
            stats['delayed'] -= 1
            stats['done'] += 1

        executor = ThreadPoolExecutor(concurrency)
        stats = {'done': 0, 'delayed': 0}
        for _ in range(concurrency):
            submit()
        return stats

There are only three actions here: the submit function, which takes the next object from the iterator and creates a task for it; upload_done, which is called when a task finishes and submits the next one; and the loop in which the initial tasks are submitted. Let's try to run it:

    stats = task_queue(upload, queryset.iterator(), concurrency=5)
    while True:
        print '\rdone {done}, in work: {delayed} '.format(**stats),
        sys.stdout.flush()
        if stats['delayed'] == 0:
            break
        time.sleep(0.2)

Great, it works! It already uses the queryset's iterator() method. It might seem that it could have been used in the first example with executor.map, but executor.map consumes the entire iterator at once, which defeats its purpose. Here the objects really are fetched one at a time, one for each running thread.
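To see why executor.map does not help here, a small sketch (not from the article): all tasks are submitted, and therefore the whole generator is drained, before a single result is consumed.

    from concurrent.futures import ThreadPoolExecutor

    def numbers():
        for i in range(5):
            print('fetched %d from the iterator' % i)
            yield i

    with ThreadPoolExecutor(2) as executor:
        results = executor.map(lambda x: x * 2, numbers())
        # at this point all five "fetched ..." lines have been printed:
        # map() created five futures up front
        for result in results:
            pass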

However, there is a problem: as soon as you increase the number of threads, exceptions like “ValueError: generator already executing” start pouring in. The code uses the same generator from all threads, so sooner or later two threads try to pull a value at the same time (in fact this can happen even with only two threads, just with a lower probability). The same applies to the counters: sooner or later two threads read the same value at once, both add one, and both write back “initial value + 1” instead of “initial value + 2”. Therefore, all work with shared objects must be wrapped in locks.
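The fix for the generator itself is simply to serialize access to it; a minimal illustration of the idea (the full version below goes further and also guards the counters):

    import threading

    iterator_lock = threading.Lock()

    def next_object(iterator):
        # only one thread at a time may advance the shared generator
        with iterator_lock:
            return next(iterator)  # StopIteration still propagates to the caller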

There are other problems too. There is no handling of errors that may occur while a task runs. If you interrupt execution with Ctrl+C, the main thread will throw an exception while the rest carry on to the very end, so a mechanism for forcibly stopping the queue is needed. The executor has a shutdown method for exactly this purpose, and we could return the executor so that it can be stopped when the user presses Ctrl+C. But there is a better option: we can create a future that is resolved when all the work is finished, and shut the executor down if someone cancels that future from the outside. Here is a version that takes all of these problems into account:

    import threading
    from concurrent.futures import ThreadPoolExecutor, Future

    def task_queue(task, iterator, concurrency=10, on_fail=lambda _: None):
        def submit():
            try:
                obj = next(iterator)
            except StopIteration:
                return
            if result.cancelled():
                return
            stats['delayed'] += 1
            future = executor.submit(task, obj)
            future.obj = obj
            future.add_done_callback(upload_done)

        def upload_done(future):
            with io_lock:
                submit()
                stats['delayed'] -= 1
                stats['done'] += 1
            if future.exception():
                on_fail(future.exception(), future.obj)
            if stats['delayed'] == 0:
                result.set_result(stats)

        def cleanup(_):
            with io_lock:
                executor.shutdown(wait=False)

        io_lock = threading.RLock()
        executor = ThreadPoolExecutor(concurrency)
        result = Future()
        result.stats = stats = {'done': 0, 'delayed': 0}
        result.add_done_callback(cleanup)

        with io_lock:
            for _ in range(concurrency):
                submit()

        return result

A reentrant lock is needed here because there is a certain probability that a very short task finishes before its handler has even been attached with add_done_callback; in that case the handler is executed immediately in the same thread and tries to acquire the lock again. A deadlock would result. A reentrant lock lets the thread that acquired it first calmly enter again, but will not let another thread acquire it until the first thread has released it as many times as it acquired it. The code that uses this task queue changes only a little:
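A toy demonstration of the difference (not from the article): a thread may re-enter an RLock it already holds, while a plain Lock would block it forever.

    import threading

    rlock = threading.RLock()
    with rlock:
        with rlock:      # fine: the owning thread may acquire an RLock again
            pass

    lock = threading.Lock()
    # with lock:
    #     with lock:     # deadlock: a plain Lock cannot be re-acquired,
    #         pass       # even by the thread that already holds it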

    from concurrent.futures import ThreadPoolExecutor, Future, TimeoutError

    result = task_queue(upload, queryset.iterator(), concurrency=5)
    try:
        while not result.done():
            try:
                result.result(.2)
            except TimeoutError:
                pass
            print '\rdone {done}, in work: {delayed} '.format(**result.stats),
            sys.stdout.flush()
    except KeyboardInterrupt:
        result.cancel()
        raise

Now there is no need to blindly sleep for 200 milliseconds at a time; we wait intelligently for the queue to finish, and if the run is interrupted, we stop the queue.

It was getting dark. Time passed and the number of files approached 1.5 million. Although it looked as if everything ran with constant memory consumption (the number of threads, futures and Django models should not change during a run), memory usage still kept growing. It turned out that queryset.iterator() does not work quite as expected: objects are indeed instantiated only as they are pulled from the iterator, but the raw database response is still fetched by the driver all at once. That comes to roughly 500 megabytes per million rows. The solution is fairly obvious: query not all objects at once but in chunks. At the same time we must avoid selecting with an offset, because a query like LIMIT 100 OFFSET 200000 actually means the DBMS has to walk through 200,100 records. Instead of an offset, filter on an indexed field.
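Roughly speaking, the two ways of paginating translate into the following kinds of queries (a sketch; the exact SQL Django generates differs in details such as the column list):

    # offset pagination: the server still has to walk past the first 200000 rows
    chunk = queryset.order_by('pk')[200000:200100]
    # SELECT ... ORDER BY pk LIMIT 100 OFFSET 200000

    # keyset pagination: an index scan starts right after the last seen pk
    # (last_pk is the key of the last object from the previous chunk)
    chunk = queryset.order_by('pk').filter(pk__gt=last_pk)[:100]
    # SELECT ... WHERE pk > %s ORDER BY pk LIMIT 100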

    def real_queryset_iterator(qs, pk='pk', chunk_size=5000):
        qs = qs.order_by(pk)
        chunk = list(qs[:chunk_size])
        while chunk:
            for item in chunk:
                yield item
            last_pk = getattr(chunk[-1], pk)
            chunk = list(qs.filter(**{pk + '__gt': last_pk})[:chunk_size])

Here pk is more of a pagination key than a primary key, although the primary key often fits the role well. Such an iterator really does consume a fixed amount of memory and runs no slower than fetching everything at once. But another problem appears if you increase the number of threads. In Django, database connections are local to threads, so when a new thread issues a query, a new connection is created. Sooner or later the number of connections reaches a critical value and an exception like this occurs:

 OperationalError: FATAL: remaining connection slots are reserved for non-replication superuser connections 

The correct solution would be to use the same connection for all threads, since we have already restricted simultaneous database access from different threads. There is no standard way to do this in Django, but it can be achieved with a hack: replace the threading.local object with an ordinary object:

    from django.db import connections, DEFAULT_DB_ALIAS

    connections._connections = type('empty', (object,), {})()
    connections[DEFAULT_DB_ALIAS].allow_thread_sharing = True

But you have to understand that this kills database thread safety in the rest of the application, so this option is only suitable for commands run from the console. A more humane option is to close the connection after each query, or after each item, which adds a small but acceptable overhead of re-opening connections.

    def close_connection_iterator(iterator, db=DEFAULT_DB_ALIAS):
        for item in iterator:
            connections[db].close()
            yield item

    result = task_queue(
        upload,
        close_connection_iterator(real_queryset_iterator(queryset)),
        concurrency=150
    )

There is also a third solution: use a dedicated thread that talks to the database and hands the objects over to the other threads. It does not break anything else in the application and does not introduce the overhead of constantly re-opening connections, but its implementation is fairly involved and deserves an article of its own.
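The article does not give that implementation, but the core idea can be sketched very roughly (no error handling or early shutdown; the queue size of 100 is an arbitrary assumption): one producer thread iterates the queryset and is therefore the only thread that touches the database, while the workers receive ready-made objects through a bounded queue.

    import threading
    import Queue  # 'queue' on Python 3

    def queue_iterator(iterator, maxsize=100):
        q = Queue.Queue(maxsize)

        def produce():
            for obj in iterator:
                q.put(obj)   # blocks while the queue is full
            q.put(None)      # sentinel: nothing more to process

        threading.Thread(target=produce).start()
        while True:
            obj = q.get()
            if obj is None:
                return
            yield obj

    # result = task_queue(upload, queue_iterator(real_queryset_iterator(queryset)),
    #                     concurrency=150)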

Perhaps, in time, the number of files will grow to 10 million and new problems will appear. But for now it seems the main problem will be that such an update will take about eight hours and, at current Amazon prices, cost $50 for the PUT requests alone.

A few takeaways:
  1. Threads in Python work well for I/O, but you need to take care of isolation.
  2. Tens and hundreds of thousands of tasks must be launched very carefully, keeping an eye on memory consumption.
  3. queryset.iterator() in the Django ORM does not work quite as expected.

task_queue and real_queryset_iterator are available as a gist on GitHub:
https://gist.github.com/homm/b8caf60c11997da69b1e

Source: https://habr.com/ru/post/229767/

