
3 cases for using Celery in a Django application


I build web applications with Django, mostly SaaS services for business. All of them need asynchronous tasks, and I use Celery to implement them. In this article I will describe the situations in which I use Celery, with code examples.

Celery is a task queue manager. At its core it does two things: it takes tasks from a queue and executes them, and it runs tasks on a schedule. The queue broker is usually RabbitMQ or Redis. Tasks are put into the queue, and Celery workers pick them up from there and execute them.

You can find a use for Celery in almost any application, but below I describe only the cases in which I use it myself.

1. Scheduled Tasks


Often there are tasks that need to be completed on a specific date and time: send a reminder to the user, end the trial period of the account, publish a post on social networks.

Celery lets you pass an ETA parameter when calling a task: the time at which the task should run. But scheduling tasks this way turns out to be unreliable: they may silently fail to run, and cancelling them is inconvenient.

A more reliable way is to use the celery beat schedule: define a schedule of tasks that run at a given frequency or at a specific time. For example, if you need to publish posts on social networks on a schedule, the corresponding task runs once a minute. If you need to end account trial periods, the task can run once a day.

```python
# schedule.py
from datetime import timedelta

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'publish_post_starter': {
        'task': 'publish_post_starter',
        'schedule': timedelta(minutes=1),
    },
    'end_trial_starter': {
        'task': 'end_trial_starter',
        'schedule': crontab(hour=10, minute=21),
    },
}
```

In the starter task we fetch all the instances whose scheduled time has already come, iterate over them, and call the main task for each one. As arguments we pass only the instance id, so as not to clog the queue with unnecessary data. We could process all the instances right in the starter, but it is usually better to call a separate task per instance: execution is faster, and if an error occurs it affects only one of the tasks.

```python
# tasks.py
from django.utils import timezone

from .models import Post  # the application's Post model
# `app` is the project's Celery instance

@app.task(name='publish_post')
def publish_post(post_id):
    ...

@app.task(name='publish_post_starter')
def publish_post_starter():
    post_ids = list(
        Post.objects.filter(
            publish_dt__lte=timezone.now(),
            is_published=False
        ).values_list('id', flat=True)
    )
    for post_id in post_ids:
        publish_post.delay(post_id)
```

2. Long computations and API calls from WSGI


By WSGI I mean the context in which user requests are processed (the request-response cycle), as opposed to the context of asynchronous tasks, which is Celery.

For an interface to feel responsive, every button must react instantly and must not block the rest of the UI. To achieve this, on click the button is disabled, a spinner is shown on it, and an AJAX request is sent to the server. If processing the request takes longer than a couple of seconds, move the computation into a Celery task.

In the WSGI handler we call the task and immediately return a response. On the frontend we re-enable the button, remove the spinner, and show the user a message that the action is in progress. Meanwhile the Celery task runs and, on completion, sends the result back over a websocket. Having received the result on the frontend, we show it to the user.

```python
# rest_views.py
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

from tasks import send_emails

class SendEmailView(APIView):
    def post(self, request):
        # this id will be used to send the response over the websocket
        request_uuid = request.data.get('request_uuid')
        if not request_uuid:
            return Response(status=status.HTTP_400_BAD_REQUEST)

        send_emails.delay(request.user.id, request_uuid)
        return Response(status=status.HTTP_200_OK)
```

External API calls from WSGI deserve a separate mention. Here all calls, regardless of how long they take, go through a Celery task. This is foolproofing: the user interface must never freeze because some external API happens to be unavailable.

3. Calls from Tornado


When integrating with a social network, Telegram, or a payment service, you need a webhook URL that notifications will be sent to. The number of such requests cannot always be estimated in advance, but it will most likely exceed the number of requests from users. And these requests keep arriving until the sender receives a response with HTTP code 200.

The asynchronous framework Tornado is well suited to handling such requests. To keep the processing from becoming synchronous, the Tornado handler must contain no blocking IO operations (database or file access). This is where Celery comes in: the Tornado handler receives the request, validates the data, calls a Celery task, and returns a successful response.

```python
# tornado_handlers.py
from tornado import escape, gen
from tornado.web import RequestHandler

from tasks import handle_vk_callback

class VkCallbackHandler(RequestHandler):
    @gen.coroutine
    def post(self, *args, **kwargs):
        try:
            data = escape.json_decode(self.request.body)
        except ValueError:
            self.set_status(status_code=400, reason='Invalid data')
            return
        handle_vk_callback.delay(data)
        self.write('ok')
```

Source: https://habr.com/ru/post/461775/

