celery学习笔记(celery怎么记)

网友投稿 415 2022-08-22


celery学习笔记(celery怎么记)

目录​

​​celery实现异步任务: 1​​​

​​celery定时任务: 2​​​

​​celery组件: 5​​​

​​实例化celery: 5​​​

​​发送1个celery任务: 5​​​

​​任务组: 6​​​

​​任务链: 6​​​

​​重写celery基类; 6​​​

​​celery命令: 7​​​

​​celery监控: 7​​​

celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度;​

celery的架构由三部分组成:​

消息中间件message broker,celery本身不提供消息服务,但可方便的和第三方提供的消息中间件集成包括RabbitMQ、redis等;​

任务执行单元worker,worker是celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中;​

任务执行结果存储task result store组成,task result store用来存储worker执行的任务的结果,celery支持以不同方式存储任务的结果,包括AMQP、redis等;​

另celery还支持不同的并发和序列化的手段:​

并发,prefork、eventlet、gevent、threads/singleThreaded;​

序列化,pickle、json、yaml、mspack、zlib、bzip2Compression、cryptographic message signing等;​

使用场景:​

celery是一个强大的分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行,我们通常使用它来实现异步任务async task和定时任务crontab;​

异步任务:将耗时操作任务提交给celery去异步执行,比如发送短信|邮件、消息推送、音视频处理等;​

定时任务:定时执行某件事情,如每天数据统计;​

celery具有以下优点:​

simple简单:celery使用和维护都非常简单,并且不需要配置文件;​

highly available高可用:worker和client会在网络连接丢失或失败时,自动进行重试,并且有的brokers也支持双主或主从实现高可用;​

fast快速:单个的celery进程每分钟可处理百万级的任务,并且只需要ms级的往返延迟(使用rabbitmq|librabbitmq|优化设置时);​

flexible灵活:celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消息者、生产者、broker传输等;​

安装:​

pip install -U Celery​

取执行结果:​

from celery.result import AsyncResult​

from celery_task import cel​

async_result = AsyncResult(id='477c9d77-c62b-4fe5-9035-9087f7ad018a', app=cel)​

if async_result.successful():​

result = async_result.get()​

执行成功')​

将结果删除​

无论现在是什么时候,都要终止​

如果任务还没有开始执行,就可以终止​

elif async_result.failed():​

执行失败')​

elif async_result.status == 'PENDING':​

任务等待中被执行')​

elif async_result.status == 'RETRY':​

任务出错正在重试')​

elif async_result.status == 'STARTED':​

任务已经开始执行')​

celery_tasks/{__init__.py, celery.py, task01.py, task02.py}​

celery_tasks/celery.py​

from celery import Celery​

cel = Celery('celery_demo', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=['celery_tasks.task01', 'celery_tasks.task02'])​

cel.conf.timezone = 'Asia/Shanghai'​

cel.conf.enable_utc = False​

check_result.py​

from celery.result import AsyncResult​

from celery_task import cel​

res = AsyncResult(id='...', app=cel)​

if res.successful():​

result = res.get()​

print(result)​

elif res.failed():​

print('failed')​

elif res.status == 'PENDING':​

任务等待中被执行')​

elif res.status == 'RETRY':​

任务异常后正在重试')​

elif res.status == 'STARTED':​

任务已经开始被执行')​

ctime = datetime.now()​

utc_ctime = datetime.utcfromtimestramp(ctime.timestamp())​

time_delay = timedelta(seconds=10)​

task_time = utc_ctime + time_delay​

result = send_email.apply_async(args=['egon'], eta=task_time) # 有eta就是定时任务​

celery实现异步任务:​

​​install celery eventlet redis #eventlet仅win下需要​

tasks.py #用于配置任务,main.py用来执行​

from celery import Celery​

import time​

celery = Celery("tasks",​

broker="redis://:ane56pda@10.10.101.47:6378/0",​

backend="redis://:ane56pda@10.10.101.47:6378/0")​

@celery.task #加上此装饰器,这个函数就变成celery任务了(task)​

def send_mail():​

邮件开始发送....')​

time.sleep(10)​

邮件发送结束!')​

main.py #执行后,会发现立马就结束,不会被阻塞等待10s​

from tasks import send_mail​

if __name__ == '__main__':​

res = send_mail.delay() #这样调用,就变成异步任务了,不会被阻塞,是.apply_async()是快捷方式,.apply_async((2, 2), queue='lopri', countdown=10)可指定运行参数|运行的时间|使用的任务队列,返回结果为AsyncResult实例,可用于跟踪任务状况;res.ready()检测是否已处理完毕;res.get(timeout=1)将异步调用转为同步调用,res.get(propagate=False)如果任务出现异常,get()会再次引发异常使用此参数覆盖,res.traceback进行回溯;res.id获取任务ID;res.failed(),res.successful(),检查任务执行成功或失败;res.state,PENDING-->STARED-->SUCCESS,另RETRY;​

>celery -A tasks.celery --pool=eventlet worker --loglevel=info上不指定--pool​

注:​

# broker(中间人):存储任务的队列​

# worker:真正执行任务的工作者,单独手动运行worker,celery -A tasks.celery worker --loglevel=info​

# backend:用来存储任务执行后的结果​

# redis://:password@hostname:port/db_number​

注:​

Run a worker​

If you jumped in and already executed the above code you will be disappointed to learn that .wait() will never actually return. That’s because you also need to run a Celery worker to receive and execute the task.​

$ celery -A your_application.celery worker​

The your_application string has to point to your application’s package or module that creates the celery object.​

Now that the worker is running, wait will return the result once the task is finished.​

celery定时任务:​

生产配置:​

redis​

celery​

django-celery-results​

django-celery-beat​

eventlet​

INSTALLED_APPS = [​

...​

'django_celery_results',​

'django_celery_beat'​

]​

>python manage.py makemigrations django_celery_beat​

>python manage.py migrate django_celery_beat​

>python manage.py makemigrations django_celery_results​

>python manage.py migrate django_celery_results​

CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'​

# CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'​

CELERY_RESULT_BACKEND = 'django-db'​

CELERY_RESULT_SERIALIZER = 'json'​

# CELERY_TIMEZONE = TIME_ZONE​

celery.py​

from __future__ import absolute_import, unicode_literals​

import os​

from celery import Celery, platforms​

from celery.schedules import crontab​

# set the default Django settings module for the 'celery' program.​

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'yto_monitor.settings_prd')​

# app = Celery('yto_monitor', broker='redis://127.0.0.1:6379/', backend='redis://127.0.0.1:6379/')​

app = Celery('yto_monitor')​

# Using a string here means the worker doesn't have to serialize​

# the configuration object to child processes.​

# - namespace='CELERY' means all celery-related configuration keys​

# should have a `CELERY_` prefix.​

app.config_from_object('django.conf:settings_prd', namespace='CELERY')​

# Load task modules from all registered Django app configs.​

app.autodiscover_tasks()​

@app.task(bind=True)​

def debug_task(self):​

print('Request: {0!r}'.format(self.request))​

# platforms.C_FORCE_ROOT = True​

app.conf.beat_schedule = {​

'add-task': {​

'task': 'midmonitor.tasks.add',​

另crontab(minute=30, hour=0),crontab(hour=6, minute=0, day_of_month='1'),datetime.timedelta(seconds=20),​

'args': (5, 6)​

}​

}​

app.conf.timezone = 'Asia/Shanghai'​

midmonitor/tasks.py​

# Create your tasks here​

from __future__ import absolute_import, unicode_literals​

from celery import shared_task​

@shared_task​

def inspect_redis():​

pass​

>celery -A yto_monitor --pool=eventlet worker -l info #处理任务​

>celery -A yto_monitor beat -l info #发送任务​

> /usr/local/python368/bin/celery --workdir=/data/app/yto_monitor -A yto_monitor worker --loglevel=info​

> /usr/local/python368/bin/celery --workdir=/data/app/yto_monitor -A yto_monitor beat --loglevel=info​

/usr/local/python368/bin/uwsgi --ini /data/app/yto_monitor/yto_monitor/uwsgi_test.ini​

celery组件:​

worker (任务执行者),用来执行具体任务,可在多台服务器部署实现扩展,项目中我们使用 python 进行开发​

broker (中间人),用来实现任务调度、worker 管理等功能;支持 RabbitMQ、Redis、Zookeeper 等中间件,项目中我们使用 redis​

backend 用来存储任务结果,项目中我们使用 redis​

application (应用),用来实例化 celery​

tasks (任务),用来构建 application​

实例化celery:​

# 最简化构建一个 celery 应用,指定了 broker 和 backend​

from celery import Celery​

# 定义 broker 和 backend,分别为任务中间人和结果保存路径​

BROKER = "redis://:@127.0.0.1:6379/3"​

BACKEND = "redis://:@127.0.0.1:6379/4"​

app = Celery("tasks",broker=BROKER,backend=BACKEND,)​

# 定义一个任务,名字为 add​

@app.task​

def add(x, y): ​

c = x + y​

print('计算结果为: %d ' % c)​

return c​

@app.task(bind=True,max_retries=3) # 最大重试 3 次​

def test_retry(self):​

print('执行 Celery 重试')​

raise self.retry(countdown=1) # 1 秒后执行重试​

@app.task(bind=True)​

def test_fail(self):​

print('执行 Celery 失败')​

raise RuntimeError('测试 celery 失败')​

发送1个celery任务:​

test_sender.py​

# 脚本用来发送 celery 任务​

from test_celery import * ​

# 最简洁推送一个任务,不支持任何选项​

add.delay(3,6)​

# 推送一个任务,第一个参数,​

# 如果任务只需要一个参数,必须添加逗号进行转换,格式 (var1,)​

# countdown=10,10 秒后开始执行​

add.apply_async((2,5), countdown=10)​

# 参数的其他写法,​

add.apply_async(kwargs={'x':4, 'y':8}) ​

add.s(5,6).apply_async()​

# 任务失败重试​

test_retry.apply_async()​

任务组:​

test_sender.py中追加​

# 任务组​

from celery import group​

numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]​

res = group(add.subtask(i) for i in numbers).apply_async()​

print(res.get())​

任务链:​

test_sender.py中追加​

# 使用 link,将任务结果作为第一个参数传递到下一个任务​

add.apply_async((2, 3), link=add.s(16))​

# 同样,前一个任务结果作为下一个任务的第一个参数​

from celery import chain​

res = chain(add.s(2, 2), add.s(4), add.s(8))()​

print(res.get())​

# 使用管道符​

(add.s(2, 2) | add.s(4) | add.s(8))().get()​

重写celery基类;​

celery event,监控celery相关事件,可定制worker进程报警|任务失败报警等功能;​

定时任务;​

celery命令:​

celery -A test_celery worker -l info #worker会一直占用终端,可用-D放至后台;worker进程中包含多个子进程,默认为cpu核数,可用-c指定启动子进程个数;-Q queue_name1,queue_name2指定队列名称,多个用逗号分隔,推送任务到指定队列add.apply_async((10,20),queue='queue_name1')​

celery -A test_celery report #查看celery相关信息​

celery -A test_celery inspect active_queues #查看活动队列​

celery -A test_celery inspect stats #检查状态​

celery -A test_celery inspect report #检查报告​

celery -A tset_celery purge #清除队列中的任务​

celery -A test_celery inspect ping #发送ping​

celery -A test_celery control shutdown #关闭worker进程​

celery -A test_celery worker --autoscale=10,2 -n au #动态加载celery pool个数​

celery -A test_celery status #查看worker集群中存活的节点​

]# vim celery.py​

app.conf.task_routes = {'cmdb.api.v_center.get_cluster_data': {'queue': 'vcenter'}}​

/usr/local/python368/bin/celery --workdir=/data/app/yto_monitor -A yto_monitor worker -Q vcenter,celery -l info # -Q指定队列名,celery为默认的​

celery监控:​

pip install flower #安装celery监控插件​

celery multi start 3 -A test_celery -l info -c 4 --pidfile=tmp/celery_%n.pid -f logs/celery.log #启动3个worker,每个worker启动4个子进程​

celery flower -A test_celery --port=8080​

/usr/local/python368/bin/celery flower --workdir=/data/app/yto_monitor -A yto_monitor --port=8080​


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Maven依赖junit @Test报错的解决方案
下一篇:Python中的循环结构(python里的循环结构)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~