sudo apt-get install rabbitmq-server
$ rabbitmqctl add_user myuser mypassword $ rabbitmqctl add_vhost '/' $ rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
import djcelery os.environ["CELERY_LOADER"] = "django" djcelery.setup_loader() AMQP_HOST = 'localhost' BROKER_HOST='localhost' BROKER_PORT = 5672 BROKER_VHOST = "/" BROKER_USER = "myuser" BROKER_PASSWORD = "mypassword" INSTALLED_APPS+='djcelery'
from celery.task import task, periodic_task from celery.task.schedules import crontab import poplib import email @task def mail_content(user_mail, mail_pass, mail_from): mail_server = 'pop.'+user_mail.split('@')[1] mail_login = user_mail.split('@')[0] p = poplib.POP3(mail_server) print p.getwelcome() try: p.user(mail_login) p.pass_(mail_pass) except poplib.error_proto: return 'Error', 'Email is blocked' try: print p.list() except: return 'Error', 'dont receive list of mails' numMessages = len(p.list()[1]) print numMessages for i in range(numMessages): m = email.message_from_string("\n".join(p.top(i+1, 1)[1])) try: this_email_from = m['From'] if this_email_from.find(mail_from) >= 0: print this_email_from m = email.message_from_string('\n'.join(p.retr(i+1)[1])) content = get_text_body(m) print content return 'Ok', content else: pass except Exception, e: return 'Error', unicode(e, 'utf8') raise mail_content.retry(exc=e, countdown=30)
>>>res = mail_content.delay('user@domen', 'password', 'email_from@domen.email.from')
>>>res = mail_content.apply_async(('user@domen', 'password', 'email_from@domen.email.from'), countdown=30)
>>>res.get() () >>>res.info
$ sudo pip install pika==0.9.5
import pika import pickle import time importr settings def callback(function_to_decorate): user = settings.BROKER_USER broker_host = settings.BROKER_HOST password = settings.BROKER_PASSWORD credentials = pika.PlainCredentials(user, password) parameters = pika.ConnectionParameters(host=broker_host, credentials=credentials) def receiver(*args, **kw): (backend_function, data) = function_to_decorate(*args, **kw) pickled_obj = pickle.dumps(data) queue_name = str(time.time()) print "call_backend", backend_function.__name__ connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare( queue = queue_name) channel.basic_publish(exchange='', routing_key=queue_name, body=pickled_obj) channel.basic_consume( backend_function, queue=queue_name, no_ack = True) channel.queue_delete(queue=queue_name) connection.close() return receiver
from decorators import * from tasks_backend import mail_analizer, mail_error @task @callback def mail_content(...): ... if (...): ... return mail_analizer, (content,) return mail_error, ('error',)
import tasks def mail_analizer(ch, method, properties, body): email_text = pickle.loads(body) if emai_text.find(u'Hello'): tasks.send_emails.delay(email_text) else: tasks.send_twitter_status.delay(email_text)
def backend(function_to_decorate): def receive(ch, method, properties, body): data=pickle.loads(body) args = data function_to_decorate(*args) return receive
@backend def mail_analizer(email_text): if emai_text.find(u'Hello'): tasks.send_emails.delay(email_text) else: tasks.send_twitter_status.delay(email_text)
@callback
as in mail_content: @backend @callback def mail_analizer(cont): print cont return send_twitter_status, (cont,)
@callback def first(*args): print first.__name__ print args return senders, args @backend @callback def senders(*args): print args return analizer, args @backend @callback def analizer( *args): print args return ended_fun, args @backend def ended_fun(*args): print ended_fun.__name__ print args
@callback
because she takes nothing from the rabbit, and the last - only @backend
- because she does not transmit anything. @callback def runer(*args): return test_func, (args) @backend @callback def test_func( *args): print args return test_func, args
@backend @call_backend def mail_content(user_mail, mail_pass, mail_from): mail_server = 'pop.'+user_mail.split('@')[1] mail_login = user_mail.split('@')[0] p = poplib.POP3(mail_server) print p.getwelcome() try: p.user(mail_login) p.pass_(mail_pass) except poplib.error_proto: return mail_error, 'Email is blocked' try: print p.list() except: return mail_error, 'dont receive list of mails' numMessages = len(p.list()[1]) print numMessages for i in range(numMessages): m = email.message_from_string("\n".join(p.top(i+1, 1)[1])) try: this_from = m['From'] this_from = this_from.decode('cp1251').split('<'.decode('cp1251'))[1] if this_from.find(mail_from) >= 0: print m['From'] m = email.message_from_string('\n'.join(p.retr(i+1)[1])) content = get_text_body(m) print content return email_analizer, (content, email_from) else: pass except Exception, e: return email_error, (unicode(e, 'utf8'),) return mail_content, (user_mail, mail_pass, mail_from) @backend @call_backend def email_analizer(content, email_from): if content.find(u'Hello'): email_to = email_from text=u'Hello, my dear friend' return send_mail, (email_to, text) return send_twitter_status, (cont,) @call_backend def run_email(): ''' , , email, password, email_from ''' return mail_content, (email, password, email_from)
@celery.task def add(x, y): return x + y add.apply_async((2, 2), link=add.s(16))
>>> from celery import group >>> res = group(add.s(i, i) for i in xrange(10))() >>> res.get(timeout=1) [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] >>> g = group(add.s(i, i) for i in xrange(10)) >>> g.apply_async()
>>> from celery import chain @task def mul(x,y): return x*y @task def div(x,y): return x/y # (2 + 2) * 8 / 2 >>> res = chain(add.subtask((2, 2)), mul.subtask((8, )), div.subtask((2,))).apply_async() >>> res.get() == 16 >>> res.parent.get() == 32 >>> res.parent.parent.get() == 4 >>> (add.s(2, 2) | add.s(4) | add.s(8))().get() 16 </source <h5>immutable</h5> , <source lang="python"> >>> add.subtask((2, 2), immutable=True) >>> add.si(2, 2)
@task def xsum(res_list): return sum(res_list) >>> from celery import chord >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())() >>> res.get() 90
>>> c3 = (group(add.s(i, i) for i in xrange(10) | xsum.s())) >>> res = c3() >>> res.get() 90
res=task.map([1,2]) res=[task(1), task(2)]
res=add.starmap([(1,2), (2,4)]) res=[add(1,2), add(2,4)]
>>> from proj.tasks import add >>> res = add.chunks(zip(range(100), range(100)), 10)() >>> res.get() [[0, 2, 4, 6, 8, 10, 12, 14, 16, 18], [20, 22, 24, 26, 28, 30, 32, 34, 36, 38], [40, 42, 44, 46, 48, 50, 52, 54, 56, 58], [60, 62, 64, 66, 68, 70, 72, 74, 76, 78], [80, 82, 84, 86, 88, 90, 92, 94, 96, 98], [100, 102, 104, 106, 108, 110, 112, 114, 116, 118], [120, 122, 124, 126, 128, 130, 132, 134, 136, 138], [140, 142, 144, 146, 148, 150, 152, 154, 156, 158], [160, 162, 164, 166, 168, 170, 172, 174, 176, 178], [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]
Source: https://habr.com/ru/post/158961/
All Articles