
Using RabbitMQ in Django projects without Celery, and what's new in Celery 3.0

I think most Python programmers are already familiar, at least to some extent, with the features of Celery. In the first part I will show how to use RabbitMQ without Celery; the second part is a brief overview of what is new in Celery 3.0.
Installing the Django + Celery + RabbitMQ stack is covered in other articles, and the use of RabbitMQ itself is well documented on the RabbitMQ site.

A brief reminder of installation and configuration.
RabbitMQ:

```shell
sudo apt-get install rabbitmq-server
```

Add a user and grant it permissions (note that the vhost passed to `set_permissions` must be one you actually created; `/` exists by default):

```shell
$ rabbitmqctl add_user myuser mypassword
$ rabbitmqctl add_vhost /
$ rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*"
```

Brief Celery and RabbitMQ settings, in settings.py:

```python
import os

import djcelery

os.environ["CELERY_LOADER"] = "django"
djcelery.setup_loader()

BROKER_HOST = 'localhost'
BROKER_PORT = 5672
BROKER_VHOST = '/'
BROKER_USER = 'myuser'
BROKER_PASSWORD = 'mypassword'

INSTALLED_APPS += ('djcelery',)
```

Claim: to make one small task asynchronous, you do not need Celery at all. RabbitMQ alone is enough.
Evidence:
Let's start from the Celery side.
The task: check a mailbox for a message from a given sender; if there is no such message, repeat the check in a minute; if there is, move on (parse it, for example).
We will use poplib and email.
Let's write a function that fetches a message from a predetermined sender and wrap it in the task decorator.
The function accepts a mailbox address, a password and the sender's address, and returns a status ('Ok' or 'Error') plus a message.
in tasks.py:

```python
from celery.task import task

import poplib
import email


@task
def mail_content(user_mail, mail_pass, mail_from):
    mail_server = 'pop.' + user_mail.split('@')[1]
    mail_login = user_mail.split('@')[0]
    p = poplib.POP3(mail_server)
    print p.getwelcome()
    try:
        p.user(mail_login)
        p.pass_(mail_pass)
    except poplib.error_proto:
        return 'Error', 'Email is blocked'
    try:
        print p.list()
    except poplib.error_proto:
        return 'Error', "didn't receive the list of mails"
    numMessages = len(p.list()[1])
    print numMessages
    for i in range(numMessages):
        m = email.message_from_string('\n'.join(p.top(i + 1, 1)[1]))
        try:
            this_email_from = m['From']
            if this_email_from.find(mail_from) >= 0:
                print this_email_from
                m = email.message_from_string('\n'.join(p.retr(i + 1)[1]))
                content = get_text_body(m)  # helper that extracts the text part
                print content
                return 'Ok', content
        except Exception, e:
            return 'Error', unicode(e, 'utf8')
    # no message from the expected sender yet -- retry in 30 seconds
    raise mail_content.retry(countdown=30)
```

The last line of the code restarts the task after 30 seconds if the letter was not found.
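Outside of Celery, retry-until-found is just a loop with a delay. A minimal stdlib sketch of that semantics (the `check` callable, the retry limit and the zero countdown are illustrative assumptions, not part of Celery's API):

```python
import time


def retry_until(check, countdown=30, max_retries=10):
    # Call `check` until it returns a truthy result or retries run out,
    # sleeping `countdown` seconds between attempts.
    for attempt in range(max_retries):
        result = check()
        if result:
            return result
        time.sleep(countdown)
    raise RuntimeError('gave up after %d retries' % max_retries)


# toy check that succeeds on the third call
calls = {'n': 0}

def check():
    calls['n'] += 1
    return 'found' if calls['n'] == 3 else None

print(retry_until(check, countdown=0))  # prints 'found'
```

Celery's `retry()` does essentially this, but by re-enqueueing the task on the broker instead of blocking a worker in `sleep`.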
Now we can run the task like this:

```python
>>> res = mail_content.delay('user@domen', 'password', 'email_from@domen.email.from')
```

In this case execution starts immediately; or like this:

```python
>>> res = mail_content.apply_async(('user@domen', 'password', 'email_from@domen.email.from'), countdown=30)
```

In this case, execution will begin after 30 seconds.
(You must first start the Celery worker:

```shell
python manage.py celeryd
```

and in another window open the shell:

```shell
python manage.py shell
```

and call these commands from that shell.)
We can get the result by doing:

```python
>>> res.get()
>>> res.info
```

(`res.info` returns None if there is no result yet, and the result once there is one.)
But polling for the result is not always convenient, and it always means doing extra work.
To call a function once the task has finished, you can implement a callback. If you have Celery installed and can write a function that accepts the result of a task, you may skip to the next subsection. For those who want to do without Celery, here is a way to organize a callback based on pika and RabbitMQ.
To work with AMQP, install the pika package:

```shell
$ sudo pip install pika==0.9.5
```

A detailed "Hello world" using this library and RabbitMQ is described in the RabbitMQ tutorials.
in decorators.py:

```python
import pika
import pickle
import time

from django.conf import settings


def callback(function_to_decorate):
    user = settings.BROKER_USER
    broker_host = settings.BROKER_HOST
    password = settings.BROKER_PASSWORD
    credentials = pika.PlainCredentials(user, password)
    parameters = pika.ConnectionParameters(host=broker_host,
                                           credentials=credentials)

    def receiver(*args, **kw):
        # the wrapped function returns (backend_function, data)
        (backend_function, data) = function_to_decorate(*args, **kw)
        pickled_obj = pickle.dumps(data)
        queue_name = str(time.time())  # unique, throwaway queue name
        print "call_backend", backend_function.__name__
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        channel.queue_declare(queue=queue_name)
        channel.basic_publish(exchange='', routing_key=queue_name,
                              body=pickled_obj)
        channel.basic_consume(backend_function, queue=queue_name, no_ack=True)
        channel.queue_delete(queue=queue_name)
        connection.close()
    return receiver
```

This is the decorator with which we wrap the mail_content function before (!) wrapping it in the task decorator.
The decorator returns our mail_content function with added instructions for publishing a message to RabbitMQ.
I will not rewrite the entire function in tasks.py; only the following changed:
in tasks.py:

```python
from decorators import *
from tasks_backend import mail_analizer, mail_error


@task
@callback
def mail_content(...):
    ...
    if (...):
        ...
        return mail_analizer, (content,)
    return mail_error, ('error',)
```

We return the next function as the first element and the tuple of arguments we want to pass to it as the second.
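This `(function, args)` convention can be exercised without any broker at all. A minimal sketch of a dispatcher that unpacks the pair a producer returns (all names here are illustrative, not from the article's code):

```python
def dispatch(producer, *args):
    # Call the producer, then hand control to whatever
    # (function, args) pair it returned.
    next_func, next_args = producer(*args)
    return next_func(*next_args)


def classify(text):
    if 'Hello' in text:
        return greet, (text,)
    return ignore, (text,)


def greet(text):
    return 'greeted: ' + text


def ignore(text):
    return 'ignored: ' + text


print(dispatch(classify, 'Hello world'))  # prints 'greeted: greeted text'
```

The `callback` decorator above does the same dispatch, except that the pair travels through a RabbitMQ queue as a pickled payload instead of a direct call.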
in tasks_backend.py:

```python
import pickle

import tasks


def mail_analizer(ch, method, properties, body):
    email_text = pickle.loads(body)
    if email_text.find(u'Hello') >= 0:
        tasks.send_emails.delay(email_text)
    else:
        tasks.send_twitter_status.delay(email_text)
```

We received the email, examined it and launched new tasks.
Note that the pika-style arguments are not very convenient; let's fix this:
in decorators.py:

```python
def backend(function_to_decorate):
    def receive(ch, method, properties, body):
        # unpickle the argument tuple and call the wrapped function with it
        args = pickle.loads(body)
        function_to_decorate(*args)
    return receive
```
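What `backend` expects on the wire is just a pickled argument tuple, so a delivery can be simulated without RabbitMQ. In this sketch the `ch`/`method`/`properties` values are stand-ins for what pika would pass to a consumer:

```python
import pickle

received = []


def backend(function_to_decorate):
    # same shape as the decorator above: unpickle and unpack the args
    def receive(ch, method, properties, body):
        args = pickle.loads(body)
        function_to_decorate(*args)
    return receive


@backend
def handler(text, count):
    received.append((text, count))


# what the producer side would publish
body = pickle.dumps(('hello', 3))
# what pika would do on delivery (ch/method/properties are unused here)
handler(None, None, None, body)
print(received)  # prints [('hello', 3)]
```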

Now we can rewrite the mail_analizer function like this:

```python
@backend
def mail_analizer(email_text):
    if email_text.find(u'Hello') >= 0:
        tasks.send_emails.delay(email_text)
    else:
        tasks.send_twitter_status.delay(email_text)
```

To launch the next function in the chain we use the `@callback` decorator, as in mail_content:

```python
@backend
@callback
def mail_analizer(cont):
    print cont
    return send_twitter_status, (cont,)
```

A simple example of building a chain of functions with this interface:

```python
@callback
def first(*args):
    print first.__name__
    print args
    return senders, args


@backend
@callback
def senders(*args):
    print args
    return analizer, args


@backend
@callback
def analizer(*args):
    print args
    return ended_fun, args


@backend
def ended_fun(*args):
    print ended_fun.__name__
    print args
```

The first function is wrapped only in `@callback`, because it takes nothing from RabbitMQ, and the last only in `@backend`, because it passes nothing on.
Note that a function can call itself. Also note that a function wrapped in the backend decorator can only be invoked through RabbitMQ.
To start a chain, use a function wrapped only in `@callback`:
```python
@callback
def runer(*args):
    return test_func, args


@backend
@callback
def test_func(*args):
    print args
    return test_func, args
```
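The whole chain can be dry-run without a broker by looping on the `(function, args)` pairs until a stage returns nothing. A toy driver, with illustrative stage names mirroring the example above (plain functions, no decorators):

```python
def run_chain(start, *args):
    # Follow (function, args) results until a stage returns None,
    # recording the order in which stages fire.
    step = (start, args)
    trace = []
    while step is not None:
        func, args = step
        trace.append(func.__name__)
        step = func(*args)
    return trace


def first(*args):
    return senders, args


def senders(*args):
    return analizer, args


def analizer(*args):
    return ended_fun, args


def ended_fun(*args):
    return None  # end of the chain


print(run_chain(first, 'payload'))
# prints ['first', 'senders', 'analizer', 'ended_fun']
```

In the real setup RabbitMQ sits between every two steps, so each stage may run in a different process.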

The final version of the functions mail_content, email_analizer, run_email:

```python
@backend
@callback
def mail_content(user_mail, mail_pass, mail_from):
    mail_server = 'pop.' + user_mail.split('@')[1]
    mail_login = user_mail.split('@')[0]
    p = poplib.POP3(mail_server)
    print p.getwelcome()
    try:
        p.user(mail_login)
        p.pass_(mail_pass)
    except poplib.error_proto:
        return mail_error, ('Email is blocked',)
    try:
        print p.list()
    except poplib.error_proto:
        return mail_error, ("didn't receive the list of mails",)
    numMessages = len(p.list()[1])
    print numMessages
    for i in range(numMessages):
        m = email.message_from_string('\n'.join(p.top(i + 1, 1)[1]))
        try:
            this_from = m['From']
            this_from = this_from.decode('cp1251').split(u'<')[1]
            if this_from.find(mail_from) >= 0:
                print m['From']
                m = email.message_from_string('\n'.join(p.retr(i + 1)[1]))
                content = get_text_body(m)  # helper that extracts the text part
                print content
                return email_analizer, (content, mail_from)
        except Exception, e:
            return mail_error, (unicode(e, 'utf8'),)
    # nothing from the expected sender yet -- re-enqueue ourselves
    return mail_content, (user_mail, mail_pass, mail_from)


@backend
@callback
def email_analizer(content, email_from):
    if content.find(u'Hello') >= 0:
        email_to = email_from
        text = u'Hello, my dear friend'
        return send_mail, (email_to, text)
    return send_twitter_status, (content,)


@callback
def run_email():
    '''Entry point of the chain: passes email, password and email_from on.'''
    # email, password and email_from are assumed to be defined elsewhere
    return mail_content, (email, password, email_from)
```

Interim summary:
I hope there was nothing complicated here. You can use this approach instead of Celery if, for example, you have just one small task.

How to do it with Celery 3.0



In Celery 3.0 you can pass a subtask to which the result of a task should be forwarded.
Example from the documentation:

```python
@celery.task
def add(x, y):
    return x + y

add.apply_async((2, 2), link=add.s(16))
```

Here add is our task, and add.s(16) is a subtask that is started after add(2, 2) finishes: the subtask's first argument is the result of add(2, 2) and the second is 16. In total: (2 + 2) + 16 = 20. That is what a subtask is.
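A subtask signature is essentially a partially applied call, so the linking behaviour can be mimicked in plain Python with `functools.partial`. This is a rough analogy to clarify the data flow, not Celery's actual machinery:

```python
from functools import partial


def add(x, y):
    return x + y


def apply_with_link(func, args, link=None):
    # Run func, then feed its result to the linked callback,
    # the way Celery's link= option chains the calls.
    result = func(*args)
    if link is not None:
        return link(result)
    return result


linked = partial(add, 16)  # plays the role of add.s(16)
print(apply_with_link(add, (2, 2), link=linked))  # prints 20
```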
Applied to our task: we turn the mail_analizer function into a task, leave it a single argument, content, drop the callback decorator and call it like this:

```python
>>> mail_content.apply_async((mail_addres, mail_password, email_from), link=mail_analizer.s())
```
There is also a link_error option for the case when the task raises an error. Read more about it in the Celery documentation.
In addition, Celery 3.0 introduced:
Group

A group accepts a list of tasks to be applied in parallel. Example from the documentation:

```python
>>> from celery import group
>>> res = group(add.s(i, i) for i in xrange(10))()
>>> res.get(timeout=1)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

>>> g = group(add.s(i, i) for i in xrange(10))
>>> g.apply_async()
```

chain

Now tasks can be called in chains, for example:

```python
>>> from celery import chain

@task
def mul(x, y):
    return x * y

@task
def div(x, y):
    return x / y

# (2 + 2) * 8 / 2
>>> res = chain(add.subtask((2, 2)), mul.subtask((8,)), div.subtask((2,))).apply_async()
>>> res.get() == 16
>>> res.parent.get() == 32
>>> res.parent.parent.get() == 4

# the same chain with the | operator
>>> (add.s(2, 2) | add.s(4) | add.s(8))().get()
16
```

immutable

A subtask can be made immutable, so that it ignores the result of the previous task in the chain:

```python
>>> add.subtask((2, 2), immutable=True)
```

or the shorthand:

```python
>>> add.si(2, 2)
```

chord

A chord accepts a list of tasks to be executed in parallel, plus a task that receives the list of their results:

```python
@task
def xsum(res_list):
    return sum(res_list)

>>> from celery import chord
>>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())()
>>> res.get()
90
```

Chaining a group into a task gives us the same chord:

```python
>>> c3 = (group(add.s(i, i) for i in xrange(10)) | xsum.s())
>>> res = c3()
>>> res.get()
90
```

map

Works like the built-in map(fun, [1, 2, 3]):

```python
res = task.map([1, 2])  # equivalent to res = [task(1), task(2)]
```

starmap

```python
res = add.starmap([(1, 2), (2, 4)])  # equivalent to res = [add(1, 2), add(2, 4)]
```
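The same semantics exist in the standard library, which is a handy mental model for both primitives (plain functions here, not Celery tasks):

```python
from itertools import starmap


def add(x, y):
    return x + y


# task.map([...]) is like the built-in map
print(list(map(abs, [-1, 2, -3])))  # prints [1, 2, 3]

# task.starmap([...]) is like itertools.starmap
print(list(starmap(add, [(1, 2), (2, 4)])))  # prints [3, 6]
```

The difference is that the Celery versions run inside a single task on a worker rather than in the calling process.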

chunks

Splits a long list of arguments into parts, one task per part:

```python
>>> from proj.tasks import add
>>> res = add.chunks(zip(range(100), range(100)), 10)()
>>> res.get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
 [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
 [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
 [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
 [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
 [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
 [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
 [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
 [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
 [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]
```
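The chunking itself is easy to reproduce in stdlib Python, which also double-checks the output above (a plain `add`, not a Celery task; the `chunks` helper is my sketch, not Celery's implementation):

```python
def add(x, y):
    return x + y


def chunks(pairs, n):
    # split the argument list into groups of n and apply add to each group
    pairs = list(pairs)
    return [[add(x, y) for x, y in pairs[i:i + n]]
            for i in range(0, len(pairs), n)]


res = chunks(zip(range(100), range(100)), 10)
print(len(res))  # prints 10
print(res[0])    # prints [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
print(res[-1])   # prints [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]
```

Celery does the same split but dispatches each group as its own task, so a hundred tiny calls cost only ten messages.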

Interim summary:

Celery 3.0 brings a lot of very handy goodies that are a pleasure to use.
Overall summary:

Celery provides many convenient tools, but for small tasks, where 90% of those features are not needed, you can get by with a message queue (RabbitMQ) alone, freeing yourself from configuring Celery, reducing server load and dropping an extra project dependency.
Thank you all for your attention.

Source: https://habr.com/ru/post/158961/

