RabbitMQ是使用最广泛的开源消息队列中间件之一,它是用Erlang写的,并实现了高级消息队列协议AMQP。消息队列可以在应用间交换消息,实现解耦、异步处理、削峰、缓冲等目的,应用场景还是比较丰富的。除了RabbitMQ,还有其他的消息队列可用,比如Kafka、Java写的ActiveMQ和阿里出品的RocketMQ等,现在网上有很多他们的对比,你可以具体分析分析那种可能更适合你。
RabbitMQ现在属于Pivotal公司,而Spring框架也是Pivotal出品,所以两者搭配使用也是顺理成章,Spring Boot也有AMQP Starter以提供支持。在打开IDE演示代码如何编写之前,先说说消息队列以及RabbitMQ具体是怎么工作的。
消息队列,顾名思义,应用程序之间传递的消息会放在队列里。消息的发送者叫做生产者,消息的接收者叫消费者。生产者的消息不直接发送到消费者,而是经过队列中转,队列可以保存许多消息,大小可以增长到内存的限制,也可以持久化到磁盘。生产者、消费者、和队列保持了一定程度的独立性,大部分情况下运行于不同的机器上。
一个RabbitMQ实例中,可能有多个队列,每个队列都有自己的名字,消费者可以根据队列的名字,获取队列里的消息。单个队列可以对接多个消费者,然而并不是每个消费者都能拿到所有消息,而是像扑克发牌每人一张一样,队列会轮训所有消费者,挨个发消息(在Spring AMQP中默认会有点不一样)。
生产者发送消息的过程,会比较复杂一点。消息并不是直接到达队列,而是要经过交换器(exchange)的分配。交换器和队列是有对应关系的,这个关系可以使用绑定键(binding key)来表示,如下图。交换器可以有多个绑定键,绑定键可以对应一个或多个队列,队列也可以对应多个绑定键。
另外生产者发送消息的时候,需要提供交换器的名字和一个路由键(routing key),这样交换器会根据路由键,对比绑定键,将消息投递到具体的队列上。路由键、绑定键和交换器的存在,其实也可以理解为一种高层次的抽象,为的是承载不同的交换器类型以及工作模式,所以刚开始的时候,这些概念会让人很疑惑,不知这样设计的意图。我们不妨从交换器的具体类型说起,这样更好理解一点。
fanout类型的交换器,收到消息之后,会忽略路由键,直接投递到所有绑定了当前交换器的队列中,很像广播。direct类型的交换器,会将消息投递给“绑定键==路由键”的队列,这种类型的交换器使用最简单。RabbitMQ默认会创建一个direct类型的交换器,名称为空字符串,你可以通过rabbitmqctl list_exchanges
命令查看现有的交换器。你创建队列的时候,默认情况下会和默认交换器绑定,绑定键就是队列的名字,这样简化了使用,最终导致的效果就是:你给出的路由键就可以等于队列名,直接将消息发送到某个队列。
topic类型的交换器比direct类型的更灵活一点,在比较路由键和绑定键的时候,并不是比较是否相等,而是进行匹配,类似与正则。这种情况下路由键必须是由”.”分隔的字符串,比如”foo.bar.baz”,而绑定键必须是和路由键的形式是一样的,只是额外可以模糊匹配,”*”可以匹配一个单词,”#”可以匹配0个1个或者多个单词,比如”foo.*.*”就可以匹配”foo.bar.baz”。当topic类型的交换器收到消息后,会将路由键和绑定键比较,是否匹配,然后才会把消息放到队列里。还有一些交换器类型,用的少一些,在此就不多说了。
RabbitMQ的安装还是比较简单的,成熟的包管理工具基本都可以用一个命令安装,比如Mac用brew install rabbitmq
,Ubuntu用apt install rabbitmq-server
。
RabbitMQ的默认端口是5672,默认会有个用户,用户名和密码都是guest,不过guest只能在本机(localhost)使用。使用rabbitmqctl add_user user pass
命令来添加新用户。
rabbitmqctl命令的操作和参数还是挺多的,你可以通过官方文档查看。不过还有种比较直观的方式是使用RabbitMQ自带的一套网页UI。当你的RabbitMQ在后台运行起来之后,他自动启动了个服务器,你可以通过15672端口访问。他可以增删查改交换器和队列等,开发时使用比较方便直观。
RabbitMQ官方提供了一个Java的编程客户端,如果你使用的是Gradle,那么可以通过下面的方式来引入,其中的版本号你可以按照最新的更改:
implementation 'com.rabbitmq:amqp-client:5.9.0'
Spring有个项目叫做Spring AMQP,它针对AMQP封装了个抽象层,底层可以有多个实现,最常见的实现就是RabbitMQ了。Spring AMQP封装了RabbitMQ的Java客户端,提供了更加方便的API,使用模版就可以进行大部分的操作。比如AMQPTemplate
接口,它覆盖了AMQP协议里通用的发送和接受消息的方法,RabbitTemplate
实现了AMQPTemplate
,并提供了更丰富操作。从抽象和可替换的角度来讲,使用AMQPTemplate
比较好,其实我都是用RabbitTemplate
,反正也不打算换其他AMQP实现。
Spring Boot针对Spring AMQP也提供了自动配置,使用起来很简单,第一步加对应的Starter依赖,第二步配置连接信息,第三步就能编码了。在配置连接信息的时候,host默认是localhost,port默认是5672,用户名和密码默认是上面提到的guest用户。
send
方法有很多重载形式,可以指定路由键,不指定的话默认是空字符串;可以指定交换器,不指定的话默认为空,就是我们上面提到的RabbitMQ提供的direct类型的交换器;sendAndReceive
是用来进行RPC调用的,暂且不议。
Message类是用来表示消息的,它由byte[]
和MessageProperties
类型的成员变量组成,作用类似于HTTP响应报文中的响应体和首部。convertAndSend
方法可以传递一个Object对象进去(一般是你的业务对象),然后转换为Message对象,这个转换操作需要借助MessageConverter
来完成。RabbitTemplate
会有一个默认的SimpleMessageConverter
,后者会将byte[]
、String
和Serializable
类型的对象处理成Message对象。
虽然Message的内容是byte[]
,但是具体什么格式可以根据自己项目的需要自定义,搞成自己的二进制的或者XML或者JSON等格式。咱就统一弄成JSON,写REST接口和使用Redis的时候用的都是JSON,这时候也用,多统一,反正后端搞的都是这些东西。可以通过Jackson2JsonMessageConverter
来将业务对象转换成JSON,只要将其对象放入容器中即可,Spring Boot会自动将其配置给RabbitTemplate
。
上图中使用了容器中的ObjectMapper
,这个对象可能在其他使用了Jackson的地方用过了。如果不符合你的需求,可以再根据自己需求创建个。我比较倾向于在项目里用JSON的地方使用统一的ObjectMapper
。
既然已经可以把消息发送出去了,那么哪个队列接收呢?在使用默认direct交换器的情况下,需要一个与路由键同名的队列。可以使用声明(declare)的方式告诉RabbitMQ,我需要一个什么名字什么属性的队列,如果此队列不存在,则会创建一个新的,如果已经存在,声明不会有什么效果,只不过已有队列属性和你声明的不一致,会报错的。
你可以使用rabbitmqadmin命令在终端中声明队列,或者使用比较常见的方式,在Spring容器中创建个Queue
类型的Bean,Spring AMQP会自动监测到并向RabbitMQ发出声明。我实践了下,单纯创建一个Queue
并不会立马创建队列,而是在程序发送或者接收等场景,连接建立的时候创建的。
Queue
类有很多参数,其中比较常用的就是名称和是否持久化。上图的task-queue队列是持久的,也就是说RabbitMQ重启之后,队列和其中的消息还在。还有其他许多参数可以使用的,可以在构造方法的arguments参数中添加,比如x-max-length表示队列的最大长度,x-overflow表示队列超长后的行为,x-expires设置队列多久不用会被删除等等。
交换器和绑定同样也是需要声明的,形式也是和队列类似,需要在容器里添加对象。交换器有个对应的接口Exchange
,你需要声明什么类型的交换器,就创建对应类型的对象。
绑定对应的类是Binding
,不过一般通过BindingBuilder
来创建:
上图中将队列和交换器对应了起来,并以队列的名字作为绑定键。我们声明的这些对象,一旦在RabbitMQ中创建成功了,就不可以更改,比如你想让Queue的durable属性从true改为false,直接改代码是不能让RabbitMQ中队列属性也变的。真的需要的话,可以给声明的对象换个名字,相当于创建个新的。
对于复杂的业务,一次要声明很多队列和交换器的时候,代码会显得冗长,可以把他们都创建在一个@Bean方法里,然后返回一个合并了的Declarables
对象。其实这些声明操作最后都是由RabbitAdmin
(实现了AmqpAdmin
)完成的,还可以对队列、交换器和绑定进行删除,RabbitAdmin
和RabbitTemplate
一样,会自动配置到容器中,可以直接引入。
终于该说怎么接收消息了。一种方式是使用RabbitTemplate
的receive*方法,另一种是使用@RabbitListener
注解。
RabbitTemplate
有receive和receiveAndConvert方法,前者返回Message
对象,后者返回convert之后的对象,具体根据你的MessageConverter
而定。如果队列里没有消息,receive*会返回null。
这两个方法都有多种重载形式,可以指定队列名字,超时时间和类型应用等等。跟send*方法不同的是,receive*没有路由键、绑定键和交换器等参数,因为消息已经分发到队列里了,只要知道队列名字就可以取消息了。如果不传超时时间,默认时间是0,可以用spring.rabbitmq.template.receive-timeout
属性设置一个默认的超时时间。
使用receive*主动从队列主动拉取消息在简单的场景下使用不太方便,不如用@RabbitListener
注解。首先需要将@EnableRabbit
添加到任何一个配置类,然后给处理消息的方法添加@RabbitListener
注解。消息处理方法的参数可以是你自定义的业务对象,只要你提供了对应的MessageConverter
。
这个监听器呢,默认是单线程的,只会从队列里一个一个取消息取处理。如果你需要多线程处理,也就是多个消费者同时干,可以设置属性spring.rabbitmq.listener.simple.concurrency
,表示可以有多少个消息并行处理。这种情况下,消息就不会是顺序处理了,在需要顺序处理的情况下,你可能需要另想办法。
RabbitMQ在给消费者发送消息之后,有时候需要消费者确认下有没有处理好,如果消费者在处理消息的时候出了问题,比如挂了,则会让消息返回队列重新等待处理。这样会确保消息会正确地处理,不会遗漏。虽然一个消息可能会多次收到,但是只要代码逻辑做好判断,避免第二次再去处理就好了。
在@RabbitListener
方法的执行过程中,如果正常执行完成,那么会给RabbitMQ发送一个ack确认,表示消息真正被处理掉了。如果发生了异常,那么则会停止告诉队列此消息没有正确地被处理,可以再给其他消费者处理。
上面我们说到RabbitMQ默认会像发牌一样发送消息,然而,不同的消费者处理消息的速度可能不一样,消息可能在某一处理慢的消费者上积压,为了避免这一问题,提高吞吐量,Spring AMQP给消费者的缓冲大小进行了限制,大小设置为250,如果一个消费者有250个未确认的消息,就不再给他继续投递消息,而是给其他处理快的消费者。这个大小是可以通过spring.rabbitmq.listener.simple.prefetch
配置的。
本来只是想简单总结下RabbitMQ的要点,没想到还是写了5000+字。不过,就算知道的再多,不动手实践,不在项目里用,都是会很快忘记的。另外记得研究下RabbitMQ官方的Java客户端和Spring AMQP的文档,能给你的编程思路提供更多可能性。
The post Spring Boot教程(29) – RabbitMQ必备基础 first appeared on 闷瓜蛋子的BLOG.