Rabbitmq & Celery

概述:Rabbitmq & Celery

[TOC]

Rabbitmq & Celery

一、Rabbitmq

1.安装

mac OS为例:

(1)安装Homebrew [已安装可忽略]

1
ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

(2)通过brew安装Rabbitmq

1
brew install rabbitmq

(3)环境变量

1
2
vi /etc/profile
PATH=$PATH:/usr/local/sbin

2.基础概念

(1)高级消息队列协议(Advanced Message Queueing Protocol, AMQP)是面向消息中间件的开放标准应用程序层协议。

(2)在AMQP中,发送消息的应用程序(称为生产者)将消息发布到AMQP代理,该代理将消息分发给处理消息的应用程序(消费者)。

(3)AMQP 消息队列服务器实体(Broker)的内部结构主要包括三种实体,分别是交换器(Exchange)、队列(Queue)、绑定(bindings),如下图:

img

其中bingings是指将Exchange和Queue按路由规则绑定。

(4)Exchange分发消息时根据类型的不同分发策略有区别,目前共三种类型:direct、fanout、topic。

  • direct

img

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

应用场景:现在我们考虑只把重要的日志消息写入磁盘文件,例如只把 Error 级别的日志发送给负责记录写入磁盘文件的 Queue。这种场景下我们可以使用指定的 RoutingKey(例如 error)将写入磁盘文件的 Queue 绑定到 Direct Exchange 上。

  • fanout

img

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键(没有路由键概念),只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的,也实现了一个消息被多个消费者获取的需求。

应用场景:以日志系统为例:假设我们定义了一个 Exchange 来接收日志消息,同时定义了两个 Queue 来存储消息:一个记录将被打印到控制台的日志消息;另一个记录将被写入磁盘文件的日志消息。我们希望 Exchange 接收到的每一条消息都会同时被转发到两个 Queue,这种场景下就可以使用 Fanout Exchange 来广播消息到所有绑定的 Queue。

  • topic

img

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词匹配不多不少一个单词。

应用场景:假设我们的消息路由规则除了需要根据日志级别来分发之外还需要根据消息来源分发,可以将 RoutingKey 定义为 消息来源.级别order.infouser.error等。处理所有来源为 user 的 Queue 就可以通过 user.* 绑定到 Topic Exchange 上,而处理所有日志级别为 info 的 Queue 可以通过 *.info 绑定到 Exchange上。

3.常见命令

(1)启动服务

1
2
3
rabbitmq-server
// 备注:启动失败可尝试 sudo rabbitmq-server
// 后台启动:rabbitmq-server -detached

(2)查看状态

1
rabbitmqctl status

(3)访问

1
2
3
http://localhost:15672

# 默认账号及密码均为guest

(4)停止服务

1
rabbitmq-service stop

(5)查看所有队列

1
rabbitmqctl list_queues

(6)清除所有的队列

1
rabbitmqctl reset

(7)添加用户

1
rabbitmqctl add_user username password

(8)分配角色

1
rabbitmqctl set_user_tags username administrator

(9)新增虚拟主机

1
rabbitmqctl add_vhost vhost_name

4.Hello Rabbitmq

Hello World将展示生产者传输数据至消息队列,消费者从消息队列中取数据的过程,类似于下图:

![image-20201017214611765](/Users/junmingguo/Library/Application Support/typora-user-images/image-20201017214611765.png)

其中P表示生产者,C表示消费者,红色矩阵表示消息队列,hello表示传输的数据。

(1)生产者

send.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import pika
import json

queue = 'demo_queue' # 队列名称
routing_key = 'url_test' # 路由键
exchange = 'oss_test' # 交换器名称

# 新建连接,rabbitmq安装在本地则hostname为'localhost'
hostname = '127.0.0.1'
port = 5672

credentials = pika.PlainCredentials(username='guest', password='guest')
parameters = pika.ConnectionParameters(host=hostname, port=port, credentials=credentials)
connection = pika.BlockingConnection(parameters=parameters)

# 创建通道
channel = connection.channel()
# 创建broker: 交换器类型为direct,durable属性为true时表示会创建一个持久化的交换器
channel.exchange_declare(exchange=exchange, exchange_type='direct', durable=True)
# 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
channel.queue_declare(queue=queue, durable=True)
# 把队列和中间人绑定
channel.queue_bind(exchange=exchange, queue=queue, routing_key=routing_key)
# 交换机; 队列名,写明将消息发往哪个队列; 消息内容
# routing_key在使用匿名交换机的时候才需要指定,表示发送到哪个队列,注意当未定义exchange时,routing_key需和queue的值保持一致
data = {
"name": "jimmy"
}
channel.basic_publish(exchange=exchange, routing_key=routing_key, body=json.dumps(data))
# 关闭连接
connection.close()

(2)消费者

receive.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import pika

queue = 'demo_queue' # 队列名

hostname = '127.0.0.1'
port = 5672

credentials = pika.PlainCredentials(username='guest', password='guest')
parameters = pika.ConnectionParameters(host=hostname, port=port, credentials=credentials)
connection = pika.BlockingConnection(parameters=parameters)
# 创建通道
channel = connection.channel()
channel.queue_declare(queue=queue, durable=True)


def callback(ch, method, properties, body):
print(f"接收到的消息 ======> {body}")
# ch.basic_ack(delivery_tag=method.delivery_tag) # 发送ack消息


# 添加不按顺序分配消息的参数(可选)
# channel.basic_qos(prefetch_count=1)
# rabbitmq使用callback来接收信息
channel.basic_consume(queue=queue,
on_message_callback=callback,
auto_ack=True)

# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理,按ctrl+c退出
print('正在等待接收消息,退出请按CTRL+C ……')
channel.start_consuming()

  • 执行receive.py
  • 执行2次send.py后,查看队列,会发现Ready=2:

![image-20201017213921348](/Users/junmingguo/Library/Application Support/typora-user-images/image-20201017213921348.png)

  • 查看消费者的终端输出
1
2
3
正在等待接收消息,退出请按CTRL+C ……
接收到的消息 ======> b'{"name": "jimmy"}'
接收到的消息 ======> b'{"name": "jimmy"}'

5.问题记录

(1)消费者接收异常

1
2
channel.basic_consume(callback, queue=queue, no_ack=False)
# TypeError: basic_consume() got multiple values for argument 'queue'

版本更新导致参数位置和参数名称均涉及更新:

1
channel.basic_consume(queue=queue, on_message_callback=callback,auto_ack=True)

(2)启动过程异常

1
zsh: command not found: rabbitmq-server

编辑~/.zshrc并写入以下3行配置

1
2
3
4
5
6
7
8
9
10
11
vim ~/.zshrc

PATH=$PATH:/usr/local/sbin
export PATH=$PATH:/usr/local/sbin
export PATH=/usr/local/sbin:$PATH

source .zshrc

# 打开新窗口

rabbitmq-server

(3)启动过程异常

1
2
3
4
5
6
Configuring logger redirection
10:03:26.920 [warning] Failed to write PID file "/usr/local/var/lib/rabbitmq/mnesia/rabbit@localhost.pid": permission denied
10:03:27.093 [error] Supervisor ra_system_sup had child ra_log_ets started with ra_log_ets:start_link......
Kernel pid terminated (application_controller) ({application_start_failure,ra,{{shutdown,{failed_to_start_child,ra_system_sup,{shutdown,{failed_to_start_child,ra_log_ets,{{badmatch,{error,{file_error,

Crash dump is being written to: erl_crash.dump...done

解决: sudo rabbitmq-server

(3)启动失败

zsh: command not found: rabbitmq-server

编辑~/.zshrc并写入以下3行配置

1
2
3
4
5
6
7
8
9
10
11
vim ~/.zshrc

PATH=$PATH:/usr/local/sbin
export PATH=$PATH:/usr/local/sbin
export PATH=/usr/local/sbin:$PATH

source .zshrc

# 打开新窗口

rabbitmq-server

(4)安装相关异常

(1)问题:执行sudo make test编译过程可能报错

解决方案:执行命令清理再编译:sudo make distclean && sudo make && sudo make test

(2)问题:安装Redis的编译过程出现问题“Failed opening the RDB file dump.rdb (in server root dir /usr/local/redis-6.0.5) for saving: Permission denied”,说明未赋予权限,需要对所安装的redis目录给予777权限:

解决方案:sudo chmod -R 0777 redis-6.0.5

二、Celery

1.模块架构

M1 M2 M3
Async Task 异步任务
Producer 任务发布者
发送➡️ ⬅️监控 Worker 任务消费者
Celery Beat 任务调度/定时任务 发送➡️ Broker消息代理 ⬅️监控 Worker 任务消费者
⬇️ 存储
Backend 结果存储

Celery模块:

  • 定时任务/任务调度 Celery Beat

读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。

  • 任务模块 Task

包含异步任务和定时任务,其中异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列,通常由Producer(任务生产者)产生任务交给任务队列。

  • 消息中间件 / 消息代理 Broker

Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

  • 任务执行单元/ 任务消费者 Worker

Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。

  • 任务结果存储 Backend

Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。

2.基本概念

任务生产者 (task producer) 负责产生计算任务,交给任务队列去处理。

任务调度器 (celery beat)会读取配置文件的内容,周期性地将执行任务的请求发送给任务队列。

任务代理 (broker)负责接受任务生产者发送过来的任务处理消息,存进队列之后再进行调度,分发给任务消费方 (celery worker)。

任务消费方 (celery worker)负责接收任务处理中间方发来的任务处理请求,完成这些任务,并且返回任务处理的结果。

Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

3.Hello Celery

(1)安装

1
pip install celery==4.4.7

Tip:celery( v5.0.0 )存在缺陷——backend找不到amqp

(2)创建celery应用

tasks.py

1
2
3
4
5
6
7
from celery import Celery

app = Celery('task_name', broker='amqp://guest:guest@127.0.0.1:5672') # 第一个参数为当前模块名称,第二个参数为中间件参数——指定你使用的消息中间件URL

@app.task
def add(x, y):
return x + y

(3)启动worker

tasks为工程名

1
celery -A tasks worker --loglevel=INFO [-Q QUEUE_NAME]

(4)调用application

触发消息提交至队列

1
2
3
python
from tasks import add
result = add.delay(1,2)

调用(4)后可查看Terminals的worker相关信息输出

(5)查看执行结果

1
2
result.result
# result: 3

(6)使用config加载Celery信息

除了上述app = Celery('tasks', broker='amqp://guest:guest@127.0.0.1:5672')的方法外,还可以将Celery的配置存入celeryconfig.py文件中,使用app.config_from_object加载:

tasks.py

1
2
3
from celery import Celery
app = Celery('task_name')
app.config_from_object('celery_config')

celery_config.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from datetime import timedelta

BROKER_URL = 'amqp://guest:guest@127.0.0.1:5672' # 中间件地址
CELERY_RESULT_BACKEND = 'amqp://' # 后端: 存储任务执行结果
CELERY_TIMEZONE = 'Asia/Shanghai' # 指定时区,不指定默认为 'UTC'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间

CELERYBEAT_SCHEDULE = {
'mybeat': {
# task:需要执行计划任务的函数
'task': 'tasks.add',
# 配置计划任务的执行时间(3秒)
'schedule': timedelta(seconds=3),
# 传入计划任务函数的参数
'args': (1, 200)
}
}
# ……

4.在Django中使用信号量

信号量(Signal):在特定事件发生时,可以发送一个信号去通知注册了这个信号的一个或者多个回调,在回调里进行逻辑处理。

(1)自定义信号量

event.py

1
2
from django.dispatch import Signal  # 内置库 
mySignal = Signal(providing_args=["信号参数1","信号参数2"])

(2)加载信号量 - 绑定处理函数

__init__.py

1
from .event import mySignal

apps.py

1
2
3
4
5
6
from django.apps import AppConfig  # 内置库 
class MyAppConfig(AppConfig):
name = "app"

def ready(self):
from .signals import 处理函数

(3)事件触发 - send

view.py

1
2
from .event import mySignal
mySignal.send(sender="function or class", 信号参数1="something", 信号参数2="others")

(4)接收信号 + 执行(signals.py处理信号的函数)

1
2
3
4
5
6
7
8
9
from django.dispatch import receiver  # 内置库 
from .event import mySignal

@receiver(mySignal)
def 处理函数(sender, instance, **kwargs):
# :param sender: 信号发送类
# :param instance: 实际要保存的实例
# :param kwargs: post_save 参数
print("do something")

(5)celery内置的几种信号

from celery.signals import before_task_publish, task_postrun

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
before_task_publish - sender 发布 task 之前。
after_task_publish - sender 发布 task 之后。

task_prerun - receiver 执行 task 之前。
task_postrun - receiver 执行 task 之后。

task_retry - receiver 执行 task 时,发生「重试」。
task_success - receiver 执行 task 时,发生「成功」。
task_failure - receiver 执行 task 时,发生「失败」。
task_revoked - receiver 执行 task 时,发生「注销」。

task_unknown - receiver 无法辨识。当 project 和 celery 的 task 产生落差时,会发生的状况。

补充:before_task_publish提供了很多参数:任务消息体body、交换器exchange、路由键routing_key、应用头部映射headers、消息属性properties、实体列表declare、重试策略retry_policy。(具体描述可参考官网)

其中 headers = {'lang': 'py', 'task': 'tasks.add', 'id': '15856ead-8a38-4a65-95d5-9baffc3c301c', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '15856ead-8a38-4a65-95d5-9baffc3c301c', 'parent_id': None, 'argsrepr': '[1, 200]', 'kwargsrepr': '{}', 'origin': 'gen29622@JUNMINGGUO-MB0'}

补充: ETA(estimated time of arrival, 预计到底时间)让你设置一个日期和时间,在这个时间之前任务将被执行。 countdown 是一种以秒为单位设置ETA的快捷方式。 确保任务在指定的日期和时间之后的某个时间执行,但不一定在该时间执行。

(6)django内置的几种信号

from django.db.models.signals import pre_save,post_save

1
2
pre_save - models对象保存之前触发
post_save - models对象保存之后触发

(7)自动收集异步任务

1
2
3
app = Celery('CELERY_NAME')
app.config_from_object('settings', namespace='CELERY')
app.autodiscover_tasks()

(8)使用日志记录

1
2
3
4
5
6
7
8
9
from celery.utils.log import get_task_logger                                                     

logger = get_task_logger(__name__)

@app.task(bind=True)
def mytask(self):
logger.info(self.request)

# bind设置为True时,mytask的形参必须填写self,可通过self获取上下文

5.任务调度器(Beat)

(1)配置

celeryconfig.py中配置CELERYBEAT_SCHEDULE:

1
2
3
4
5
6
7
8
9
10
CELERYBEAT_SCHEDULE = {
'mybeat': {
# task:需要执行计划任务的函数(完整路径:文件路径.方法)
'task': 'tasks.add',
# 配置计划任务的执行时间(3秒)
'schedule': timedelta(seconds=3),
# 传入计划任务函数的参数
'args': (1, 200)
}
}

(2)启动周期性任务

任务调度器会周期性(3秒)地启动任务,将需要执行的任务发送给任务队列。

1
2
3
celery -A YOUR_TASK_NAME beat --loglevel=INFO [-Q QUEUE_NAME]
# YOUR_TASK为clery任务名
# QUEUE_NAME为对应的队列名

(3)查看周期性任务

1
2
3
celery -A YOUR_TASK_NAME worker --loglevel=INFO [-Q QUEUE_NAME] 

# QUEUE_NAME 为Beat队列的名称

6.Django-setting其它相关配置

(1)异步任务绑定自定义任务队列名

1
2
3
4
5
6
7
8
9
10
11
12
from kombu import Queue, Exchange
CELERY_TASK_QUEUES=(
Queue('队列名',
exchange=Exchange('队列名'),
routing_key='队列名'),
...
)

CELERY_TASK_ROUTES={
'文件路径.异步方法名': '队列名',
...
}

(2)周期性任务

1
2
3
4
5
6
CELERY_BEAT_SCHEDULE={
'周期性任务名': {
'task': '周期性任务文件路径.周期性任务名',
'schedule': 60, # 间隔触发时间:60秒
},
}

实践

1.安装celery

2.安装rabbitmq

3.实践celery + rabbitmq

1
2
3
4
5
6
7
from celery import Celery

app = Celery('tasks', broker='amqp://guest:guest@127.0.0.1:5672')

@app.task
def add(x, y):
return x + y

4.运行celery

1
celery -A tasks worker --loglevel=INFO

5.查看完整的命令行参数列表

1
celery worker --help

6.调用任务:使用delay()调用任务,是apply_async()的快捷方式

1
2
from tasks import add
add.delay(4,2)

三、Celery_once

1.作用

可防止Celery重复执行相同任务,即避免队列中存在重复异步任务。

2.安装

1
pip install celery_once

3.实现原理

采用redis分布式锁,重写了Celery的apply_async(),重写的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def apply_async(self, args=None, kwargs=None, **options):
"""
Attempts to queues a task.
Will raises an AlreadyQueued exception if already queued.

:param \*args: positional arguments passed on to the task.
:param \*\*kwargs: keyword arguments passed on to the task.
:keyword \*\*once: (optional)
:param: graceful: (optional)
If True, wouldn't raise an exception if already queued.
Instead will return none.
:param: timeout: (optional)
An `int' number of seconds after which the lock will expire.
If not set, defaults to 1 hour.
:param: keys: (optional)

"""
once_options = options.get('once', {})
once_graceful = once_options.get(
'graceful', self.once.get('graceful', False))
once_timeout = once_options.get(
'timeout', self.once.get('timeout', self.default_timeout))

if not options.get('retries'):
key = self.get_key(args, kwargs)
try:
self.once_backend.raise_or_lock(key, timeout=once_timeout)
except AlreadyQueued as e:
if once_graceful:
return EagerResult(None, None, states.REJECTED)
raise e
return super(QueueOnce, self).apply_async(args, kwargs, **options)

重点内容在于:

1
2
3
4
5
6
7
8
9
if not options.get('retries'):
key = self.get_key(args, kwargs)
try:
self.once_backend.raise_or_lock(key, timeout=once_timeout)
except AlreadyQueued as e:
if once_graceful:
return EagerResult(None, None, states.REJECTED)
raise e
return super(QueueOnce, self).apply_async(args, kwargs, **options)

(1)try中去判断队列中是否存在(尝试加锁进行判断),若加锁失败则表示队列中存在相同异步任务,因而触发except AlreadyQueued,如果once_graceful=True则表示不抛错,反之设为Falseraise e

(2)注意到如果触发AlreadyQueued时会返回EagerResult对象,其中它的__init__

1
2
3
4
5
6
7
8
9
def __init__(self, id, ret_value, state, traceback=None):
# pylint: disable=super-init-not-called
# XXX should really not be inheriting from AsyncResult
self.id = id
self._result = ret_value
self._state = state
self._traceback = traceback
self.on_ready = promise()
self.on_ready(self)

原因在于Celery启动周期性任务Beat时,会触发celery/beat.py中的apply_entry方法,源码:

1
2
3
4
5
6
7
8
9
def apply_entry(self, entry, producer=None):
info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
try:
result = self.apply_async(entry, producer=producer, advance=False)
except Exception as exc: # pylint: disable=broad-except
error('Message Error: %s\n%s',
exc, traceback.format_stack(), exc_info=True)
else:
debug('%s sent. id->%s', entry.task, result.id)

分析可知,apply_entry中会获取apply_async的返回结果赋给result,其中发布异步任务失败(当队列中存在相同任务时),会进入else分支,而该部分需要获取result.id,所以EagerResult中需要赋予id字段为None,从而避免触发NoneType has not key ‘id’,而其它字段为可选字段,例如_state,_traceback等。

4.备注

Celery_once实现原理采用的redis共享锁,因此中间件(broker)仅支持redis,不支持其它broker,例如rabbitmq

1
2
3
4
5
6
7
CELERY_ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': YOUR_REDIS_URL,
'default_timeout': 600, # 任务下发后10分钟后删掉对应的celery-once锁,即10分钟后该任务可重新放入队列
}
}

四、Flower-Celery

Flower-Celery是一款Celery的监控可视化工具

1.安装

1
pip install flower

2.启动

1
2
flower -A tasks --port=5555
# tasks为Celery.app名称

3.访问

http://localhost:5555/tasks

4.可视化展示

![image-20201020162051867](/Users/junmingguo/Library/Application Support/typora-user-images/image-20201020162051867.png)

文档补充