
Celery - distributed task queue

This time we want to talk about a wonderful product that we use in our work: Celery, a "distributed task queue". It is a distributed asynchronous task queue with broad functionality. In our site builder, we often need to run tasks asynchronously with respect to the response sent to the user. Unfortunately, there is little information about this product on Habr, and it deserves a separate mention, so we want to correct that.

So, what can Celery do?

Interested? Read on.

Let's start with the worker configuration. The worker is the daemon that actually receives tasks from the queue and executes them. The recommended broker is RabbitMQ, but for now we have limited ourselves to ghettoq backed by MongoDB. Redis and relational databases are also supported.

CARROT_BACKEND = "ghettoq.taproot.MongoDB"
BROKER_HOST = "xxx"
BROKER_PORT = 27017
BROKER_VHOST = "celery"

CELERY_SEND_TASK_ERROR_EMAILS = True
ADMINS = (
    ('Admin', 'admin@localhost'),
)

CELERYD_MAX_TASKS_PER_CHILD = 5
CELERY_IMPORTS = ("tasks", )
CELERY_DISABLE_RATE_LIMITS = True

CELERY_RESULT_BACKEND = "mongodb"
CELERY_MONGODB_BACKEND_SETTINGS = {
    "host": "xxx",
    "port": 27017,
    "database": "celery",
    "taskmeta_collection": "my_taskmeta_collection",
}

Start the daemon: celeryd -l INFO -B
Here we turn on INFO-level logging to the console and pass the -B option, which also starts the periodic task scheduler. The scheduler can instead be run separately with the celerybeat command.

Now let's create a test task. The config imports the module tasks, so the task file is tasks.py:

from datetime import timedelta

from celery.decorators import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab

# some_function and some_heavy_function stand for your own application code.

@periodic_task(run_every=timedelta(seconds=60))
def mail_queue():
    print("Task is executed every minute")

@periodic_task(run_every=crontab(hour=0, minute=10))
def transactions():
    print("Task is executed every day at 0:10")

@task
def delayed_function(id):
    some_function()

@task
def delayed_heavy_function(id):
    some_heavy_function()
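To make the daily schedule concrete, here is a minimal standard-library sketch (not Celery code) of when a job set for 0:10 would fire next. The helper next_daily_run is hypothetical and only mirrors what crontab(hour=0, minute=10) expresses.

```python
from datetime import datetime, timedelta


def next_daily_run(now, hour=0, minute=10):
    """Return the next datetime strictly after `now` that falls on hour:minute."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so the job fires tomorrow.
        candidate += timedelta(days=1)
    return candidate


if __name__ == "__main__":
    print(next_daily_run(datetime(2010, 8, 27, 23, 50)))
    print(next_daily_run(datetime(2010, 8, 27, 0, 5)))
```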

So, tasks.py contains four tasks. The first two run on a schedule; they are marked with the @periodic_task decorator. The last two are called directly from program code, like this:

from tasks import delayed_function, delayed_heavy_function

delayed_function.apply_async(args=[id], countdown=300)  # will run in 300 seconds
r = delayed_heavy_function.delay(id)  # queued to run right away (asynchronously); r is the result handle
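The countdown argument is shorthand for an absolute execution time; apply_async also accepts an eta datetime. The arithmetic can be sketched with the standard library alone. The helper countdown_to_eta below is hypothetical and is not part of Celery.

```python
from datetime import datetime, timedelta


def countdown_to_eta(countdown, now=None):
    """Translate a delay in seconds into the earliest time the task may run."""
    if now is None:
        now = datetime.now()
    return now + timedelta(seconds=countdown)


if __name__ == "__main__":
    # countdown=300 corresponds to "five minutes from now".
    print(countdown_to_eta(300))
```

Note that this is the earliest moment the task may start; the actual start also depends on when a worker is free to pick it up.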

Now, to track the result and find out whether the last task has completed, we can use:

r.ready()  # Returns True if the task has completed
r.result   # The task's return value, or None if it has not finished yet (asynchronous)
r.get()    # Waits for the task to finish and returns its result (synchronous)
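The difference between the non-blocking check and the blocking wait can be tried out without a broker. The sketch below is an analogy built on the standard library's concurrent.futures, not Celery's API: future.done() plays the role of r.ready(), and future.result() plays the role of r.get(). Unlike r.result, a Future has no attribute that quietly returns None while the job is still running. The function some_heavy_function is a hypothetical stand-in for a slow job.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def some_heavy_function(x):
    """Hypothetical stand-in for a slow job."""
    time.sleep(0.2)
    return x * 2


executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(some_heavy_function, 21)  # roughly like .delay(21)

finished_early = future.done()     # non-blocking check, like r.ready()
value = future.result(timeout=5)   # blocks until the job is done, like r.get()
finished_late = future.done()      # the job has certainly finished by now

executor.shutdown()
```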

The variable r can be serialized with cPickle and stored in a cache, and the task status can then be polled via AJAX. Alternatively, you can take the task id and put that in the cache. You can also set the task id yourself; the main thing is that it is unique.
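The "task id in the cache" variant might look roughly like the sketch below. Everything in it is an assumption made for illustration: the dict cache stands in for a real cache such as memcached, and the names remember_task, status_response and is_ready are hypothetical. Only the standard library is used.

```python
import json
import uuid

cache = {}  # hypothetical stand-in for memcached or a similar cache


def remember_task(user_key):
    """Generate a unique task id and store it under the user's cache key."""
    task_id = str(uuid.uuid4())
    cache[user_key] = task_id
    return task_id


def status_response(user_key, is_ready):
    """Build the JSON body that an AJAX poll could receive.

    `is_ready` is a callable mapping a task id to True or False; with Celery
    it would wrap a result lookup for that id.
    """
    task_id = cache.get(user_key)
    if task_id is None:
        return json.dumps({"status": "unknown"})
    status = "done" if is_ready(task_id) else "pending"
    return json.dumps({"task_id": task_id, "status": status})
```

An id generated this way could then be handed to the task call through the task_id option of apply_async, so that the cache and the queue agree on the same identifier.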

After using Celery heavily, we found several bugs related to deferred execution of tasks with the ghettoq queue manager, but the author fixed each of them on the day the issue was filed on GitHub, for which we thank him.

Version 2.0 was released not long ago. It no longer depends on Django, and the Django integration has moved to a separate sub-project, django-celery.

Celery has two limitations, or more precisely, peculiarities. First, workers will not run on a stock FreeBSD, because Python's multiprocessing does not work there, although there are recipes for building a kernel suitable for Celery. Second, to pick up changed tasks, the worker must be restarted so that it loads the new Python code of the tasks and related functions. On Linux it works great.
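One likely reason for the restart requirement is ordinary Python behaviour: a running process keeps the module it has already imported, even if the file changes on disk. The sketch below shows this with plain Python rather than Celery; the module name demo_tasks is hypothetical, and importlib.reload only stands in for what a worker restart achieves.

```python
import importlib
import os
import sys
import tempfile

sys.dont_write_bytecode = True  # keep the demo independent of .pyc caching

workdir = tempfile.mkdtemp()
module_path = os.path.join(workdir, "demo_tasks.py")  # hypothetical module

with open(module_path, "w") as f:
    f.write("def job():\n    return 'old code'\n")

sys.path.insert(0, workdir)
sys.modules.pop("demo_tasks", None)  # make sure we start from a clean state
import demo_tasks

before = demo_tasks.job()

# Edit the file on disk while the "worker" process keeps running.
with open(module_path, "w") as f:
    f.write("def job():\n    return 'brand new code'\n")

import demo_tasks  # a second import is a no-op: sys.modules already has it
after_edit = demo_tasks.job()

importlib.reload(demo_tasks)  # explicit re-execution, akin to a restart
after_reload = demo_tasks.job()
```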

Source: https://habr.com/ru/post/102742/
