IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    使用Celery实现Python分布式任务处理

    Derobukal发表于 2023-11-11 04:55:35
    love 0

    Celery是一个任务队列,它可以实现跨进程和机器的分布式任务处理。任务队列的输入端会输入各种任务(task),这些任务会在输出端由worker进行处理,这些任务会由客户端通过发送消息的方式交给broker,随后broker把任务分发给worker。

    安装组件

    本文使用到的组件版本

    组件版本
    Python2.7.16
    Celery4.4.7
    Redis6.2.4
    redis-py3.2.1

    首先我们需要安装celery和Redis的依赖包

    pip install celery==4.4.7pip install redis==3.2.1

    Celery支持多种类型的broker,在这里我们主要使用Redis作为Celery的broker,关于Redis的安装和使用可以参考我之前的文章Redis failover。

    构建应用

    我们首先创建如下的目录结构(本文的示例代码都放在了GitHub上面)

    .├── run.py└── search    ├── __init__.py    ├── config.py    └── tasks.py

    创建celery应用

    search/config.py包含了一些celery的配置文件,具体配置如下

    1
    2
    3
    4
    5
    6
    7
    8
    # 设置任务模块
    include = ['search.tasks']

    # 对指定的任务使用一个特定的队列进行路由,该任务在发送时会被发送到该指定队列中
    # 未指定队列的任务默认发送到一个名叫celery的队列
    task_routes = {
    'search.tasks.sort_list': {'queue': 'queue_1'}
    }

    在search/__init__.py中我们利用如上的配置信息初始化一个celery的app

    1
    2
    3
    4
    5
    6
    from __future__ import absolute_import
    from celery import Celery
    from . import config

    app = Celery('search', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/1')
    app.config_from_object(config)

    创建celery任务

    如上我们创建了一个celery的app,该app使用Redis作为broker和backend,之后我们在search/tasks.py中创建任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    import requests
    from . import app

    @app.task
    def search_url(url):
    r = requests.get(url)
    return r.status_code

    @app.task
    def add(x, y):
    return x + y

    @app.task
    def sort_list(data):
    return sorted(data)

    启动celery的worker

    写完代码之后我们在项目根目录执行如下命令启动一个celery的worker

    celery -A search worker -Q queue_1,celery -l info

    其中,-A是--app的简写,代表启动的应用;worker表示当前命令是要启动一个celery的worker;-Q queue_1,celery表示当前worker监听queue_1和celery队列,不指定的话默认使用一个名叫celery的队列;-l info表示日志的级别。启动后输出如下内容,代表celery的worker已经成功启动了

    -------------- celery@Mac v4.4.7 (cliffs)--- ***** ----- -- ******* ---- Darwin-19.6.0-x86_64-i386-64bit 2022-04-15 14:30:06- *** --- * --- - ** ---------- [config]- ** ---------- .> app:         search:0x104604090- ** ---------- .> transport:   redis://127.0.0.1:6379/1- ** ---------- .> results:     redis://127.0.0.1:6379/1- *** --- * --- .> concurrency: 4 (prefork)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** ----- -------------- [queues]                .> celery           exchange=celery(direct) key=celery                .> queue_1          exchange=queue_1(direct) key=queue_1[tasks]. search.tasks.add. search.tasks.search_url. search.tasks.sort_list[2022-04-15 14:30:06,838: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1[2022-04-15 14:30:06,855: INFO/MainProcess] mingle: searching for neighbors[2022-04-15 14:30:07,930: INFO/MainProcess] mingle: all alone[2022-04-15 14:30:07,983: INFO/MainProcess] celery@Mac ready.

    发送任务

    之后我们在run.py中发送celery任务并交给worker执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    import logging
    from search.tasks import search_url, add, sort_list, get_redis_keys

    if __name__ == '__main__':
    logging.info(search_url.delay('https://www.jd.com').get(5))
    logging.info(add.delay(1.5, 3.5).get(5))
    data = [5, 86, 59, 17, 24, 92, 38, 95, 13, 89, 63, 3, 4, 60, 6]
    logging.info(sort_list.delay(data).get(5))
    # 将任务发送到指定队列,如果该队列没有worker监听,则此任务不会执行,5秒后超时
    logging.info(add.apply_async((1.5, 3.5), queue='queue_2').get(5))

    只需要在原有的方法基础上添加一个delay方法,就可以实现任务的发送并交给worker执行,非常简单。delay是apply_async方法的简化版,在apply_async方法中我们还可以指定该任务的发送队列,以及一些其它的配置。

    apply_async方法的返回值是一个AsyncResult类型,该类型的对象可以获取任务的信息,例如successful()和failed()方法可以获取到该任务是否执行成功,id和state属性可以获取到该任务的id和状态。如上所示,get()方法可以获取到该任务的返回值,为了避免卡死可以在执行时添加get(timeout)方法的超时时间。

    通过设置队列实现路由功能

    celery可以通过设置队列来实现任务的路由。假设我们有三个任务,它们发送任务的队列设置分别为q1,q2,q3。同时我们还有三个worker,它们的队列设置分别为q1,q1,q2,q3。那么任务1将会发送到worker1或者worker2上执行,而任务2和任务3都会在worker3上执行,通过队列就可以实现把任务发送到指定的worker上执行的功能。

    workerqueuetask
    111
    211
    32,32,3

    由上可见,task是和queue绑定的,一个task只能发送到一个指定的queue。而一个worker既可以监听多个queue,也可以多个worker监听一个queue,前者可以实现worker能力的扩展,后者可以实现任务的多负载均衡。

    celery的监控

    Flower是一个celery的网页监控和管理工具,使用前需要先安装

    pip install flower==0.9.2

    之后我们可以启动它

    celery -A search flower --port=5555 -Q queue_1,celery -l info

    随后访问http://127.0.0.1:5555就可以查看celery的监控信息了,我们也可以在网页上对celery进行一些管理操作。

    参考

    https://docs.celeryq.dev/en/v4.4.7/index.html
    https://github.com/RitterHou/celery_demo



沪ICP备19023445号-2号
友情链接