AMQP概念
通过消息机制,可以实现数据传输,非阻塞型操作,推送通知,发布/订阅,异步处理,work队列。
AMQP当中有四个概念非常重要:虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding)。
virutal host相当于namespace,用于不同tenant之间的exchange, queue, binding的隔离。
Queue队列, 每个消息都会被投入到一个或多个队列。消息就一直在里面,直到有客户端(也就是消费者,Consumer)连接到这个队列并且将其取走为止。队列是由消费者(Consumer)通过程序建立的,不是通过配置文件或者命令行工具。
Binding绑定, 它的作用就是把exchange和queue按照路由规则绑定起来。
Routing_Key路由关键字:exchange根据这个关键字进行消息投递。
Channele消息通道:在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
Exchange交换机,对消息进行路由,当收到Publisher传递给它的消息后,Excahnge会根据路由键routing_key决定将消息加入到哪些消息队列中。
消息的类型:
Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。一对一交换类型。
Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。一对多主题多播交换类型。
Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。一对多广播交换类型。
RabbitMQ简介与特点
RabbitMQ是一个开源的AMQP协议的实现,它具有如下特点:可靠性(Reliability), RabbitMQ使用一些机制来保证程序的可靠性,如持久化、传输确认机制、发布确认、高可用性。灵活的路由机制(Flexible Routing), 在消息进入队列之前,通过Exchange来路由消息的。对于典型的路由功能,RabbitMQ已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。消息集群(Clustering)多个RabbitMQ服务器可以组成一个集群,形成单个逻辑Broker。Federation, For servers that need to be more loosely and unreliably connected than clustering allows, RabbitMQ offers a federation model.队列高可用(Highly Available Queues), 队列可以在集群中的机器上进行镜像,以确保在硬件问题下还保证消息安全。多种协议的支持(Multi-protocol), RabbitMQ支持多种消息队列协议。
一个rabbitmq python例子
#coding:utf-8
import sys
from amqplib import client_0_8 as amqp
if __name__ == '__main__':
if (len(sys.argv) <= 1):
ispublisher = '0'
print "Then pls run 'rabbittest 1' to sent message."
else:
ispublisher = sys.argv[1]
conn = amqp.Connection(host="localhost:5672 ", userid="guest", password="password", virtual_host="/", insist=False)
# 每个channel都被分配了一个整数标识
chan = conn.channel()
# 创建一个队列,它是durable的(重启后会重新建立)a
# 消费者断开时不会自动删除(auto_delte=False)
chan.queue_declare(queue="queue1", durable=True, exclusive=False, auto_delete=False)
# 创建交换机,参数意思和上面的队列是一样的,还有一个type类型:fanout, direct, topic
chan.exchange_declare(exchange="switch1", type="direct",
durable=True, auto_delete=False,)
# 绑定交换机和队列
chan.queue_bind(queue="queue1", exchange="switch1", routing_key="key1")
if (ispublisher == '1'):
# 生产者
msg = amqp.Message("Test message!")
msg.properties["delivery_mode"] = 2
chan.basic_publish(msg, exchange="switch1", routing_key="key1")
else:
# 主动从队列拉消息
msg = chan.basic_get("queue1")
print msg.body
chan.basic_ack(msg.delivery_tag)
# 消息来了通知回调
# 如果no_ack=True可以使用chan.basic_ack()人工确认,使用delivery_tag参数
def recv_callback(msg):
print 'Received: ' + msg.body
chan.basic_consume(queue='queue1', no_ack=False,
callback=recv_callback, consumer_tag="testtag")
# chan.basic_cancel("testtag") # 取消回调函数
while True:
chan.wait() # 等待在队列上,直到下一个消息到达队列。
chan.close()
conn.close()
RabbitMQ CLI
安装,sudo apt-get install rabbitmq-server
重启,sudo service rabbitmq-server restart
sudo rabbitmqctl list_vhostssudo rabbitmqctl add_vhost demo
sudo rabbitmqctl list_users
sudo rabbitmqctl add_user test password
sudo rabbitmqctl change_password test password
sudo rabbitmqctl clear_password test
sudo rabbitmqctl list_user_permissions test
sudo rabbitmqctl set_permissions -p demo test ".*" ".*" ".*"
sudo rabbitmqctl clear_permissions -p demo test
sudo rabbitmqctl list_queues -p demo name durable auto_delete slave_pids synchronised_slave_pids
sudo rabbitmqadmin delete queue name='queuename'
sudo rabbitmqctl list_exchanges -p demosudo rabbitmqctl list_bindings -p demo
sudo rabbitmqctl list_consumers -p demosudo rabbitmqctl statussudo rabbitmqctl report
RabbitMQ GUI
Enable it, sudo rabbitmq-plugins enable rabbitmq_management
http://localhost:15672 (guest/guest)
RabbitMQ配置文件
http://www.rabbitmq.com/configure.html#configuration-file
sudo find / -name rabbitmq.config*
sudo mv /usr/share/doc/rabbitmq-server/rabbitmq.config.example.gz /etc/rabbitmq/cd /etc/rabbitmq/ && sudo gunzip rabbitmq.config.example.gz
sudo mv rabbitmq.config.example rabbitmq.config
RabbitMQ调优
1, 流控(Flow Control)机制,默认RabbitMQ在使用机器的40%以上的内存时流控机制会起作用block掉所有连接。故确保使用64位操作系统与64位Erlang VM。当RabbitMQ是集群情况下,当其中有一台机器硬盘不足的时候,所有节点的producer链接也会被阻止。
rabbitmqctl set_vm_memory_high_watermark 0.4
rabbitmqctl set_vm_memory_high_watermark_paging_ratio 0.75
rabbitmqctl status
http://www.rabbitmq.com/memory.html
Max open files,/etc/default/rabbitmq-server
ulimit -n 65535
cat /proc/$RABBITMQ_BEAM_PROCESS_PID/limits
2, Erlang的Hipe优化, 可以设置hipe_compiles设置。可以看到有20-50%的性能优化。而你只需要付出1分钟左右的延迟启动。 HiPE需要你检查是否编译进入你的Erlang安装环境。Ubuntu,需要安装erlang-base-hipe.默认有些平台不支持。如果Erlang VM segfaults,请关闭这个选项。
[{rabbit, [{hipe_compile, true}]}].