本文共 11167 字,大约阅读时间需要 37 分钟。
消息队列:异步之间的线程方式
JMS
相当于jdbc,也是一个规范
(包括点对点、发布者、订阅者)ActiveMQ
相当于驱动,是具体的实现
添加依赖
SpringWeb、Spring for RabbitMQ
依赖配置
spring.rabbitmq.host=xx.xx.xx.xx//rabbitmq的端口号spring.rabbitmq.username=guestspring.rabbitmq.password=guest#5672通讯端口 (映射到了32773)spring.rabbitmq.port=32773
Broker
:简单来说就是消息队列服务器实体。
Exchange
:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue
:消息队列载体,每个消息都会被投入到一个或多个队列。 Binding
:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。 Routing Key
:路由关键字,exchange根据这个关键字进行消息投递。 vhost
:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。 producer
:消息生产者,就是投递消息的程序。 consumer
:消息消费者,就是接受消息的程序。 channel
:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。 (1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。 (3)客户端声明一个queue,并设置相关属性。 (4)客户端使用routing key,在exchange和queue之间建立好绑定关系。 (5)客户端投递消息到exchange。exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
exchange也有几个类型
完全根据key进行投递的叫做Direct交换机
,例如,绑定时设置了routing key为”abc”
,那么客户端提交的消息,只有设置了key为”abc”
的才会投递到队列。
对key进行模式匹配后进行投递的叫做Topic交换机
,符号”#”
匹配一个或多个词,符号”*”
匹配正好一个词。例如”abc.#”
匹配”abc.def.ghi”
,”abc.*”
只匹配”abc.def”
。
还有一种不需要key的,叫做Fanout交换机
,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储
RabbitMQ提供了四种Exchange:fanout,direct,topic,header
header模式在实际使用中较少,本文只对前三种模式进行比较。
性能排序:fanout > direct >> topic。比例大约为11:10:6
在日志系统中,所有的消息被广播给所有的消费者,但是如果我们只是希望有一个程序可以只接收error级别的日志并保存到磁盘中,而不用浪费空间去存储那些info、warning级别的日志。
为了说明这点,请看下图:
orange
,第二个队列有两个绑定键:black和green
。Direct Exchange - 处理路由关键字。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。
一般情况可以使用rabbitMQ自带的Exchang: " "
(该Exchange的名字为空字符串,下文称其为default Exchange)。
这种模式下不需要将Exchange进行任何绑定(binding)操作
消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。
package org.ql.rabbitmq.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @author ql * @version 1.0 2020/12/17 */@Configurationpublic class RabbitDirectConfig { public final static String DIRECTNAME="javaboy-direct"; @Bean Queue queue(){ return new Queue("hello.java"); } //用DirectExchange的话下面的两个可以省略掉 @Bean DirectExchange directExchange(){ //第二个参数:重启后是否有效,第三个参数,长久不用后是否删除 return new DirectExchange(DIRECTNAME,true,false); } @Bean Binding binding(){ //把Exchange和Queue到一起 return BindingBuilder.bind(queue()).to(directExchange()).with("direct"); }}
发送:
package org.ql.rabbitmq;import org.junit.jupiter.api.Test;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTestclass RabbitmqApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test void contextLoads() { //directExchange的作用,当你收到一条消息的时候,会转发到相同的routingKey(如:hello.java)上去 rabbitTemplate.convertAndSend("hello.java","hello java rabbitmq!"); //在这里发routingKey,会转发到config/RabbitDirectConfig的相同的Queue下 }}
接收器:监听,收到消息
@Componentpublic class DirectReceiver { //定义消费者,监听的routingKey也是hello.java所以会收到 @RabbitListener(queues = "hello.java")//定义了消费的方法,消费的参数是hello.java public void handler1(String msg){ System.out.println("handler1>>>"+msg); }}
Fanout Exchange – 不处理路由关键字。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
可以理解为路由表的模式
这种模式不需要RouteKey
这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
package org.ql.rabbitmq.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @author ql * @version 1.0 2020/12/17 */@Configurationpublic class RabbitFanoutConfig { public static final String FANOUTNAME = "java-fanout"; @Bean Queue queueOne() { return new Queue("queue-one"); } @Bean Queue queueTwo() { return new Queue("queue-two"); } @Bean FanoutExchange fanoutExchange() { //第二个参数:重启后是否有效,第三个参数,长久不用后是否删除 return new FanoutExchange(FANOUTNAME, true, false); } @Bean Binding bindingOne() { //把Exchange和Queue到一起 return BindingBuilder.bind(queueOne()).to(fanoutExchange()); } @Bean Binding bindingTwo() { return BindingBuilder.bind(queueTwo()).to(fanoutExchange()); }}
@Componentpublic class FanoutReceiver { @RabbitListener(queues = "queue-one") public void handleer1(String msg){ System.out.println("FanoutReceiver handleer1:"+msg); } @RabbitListener(queues = "queue-two") public void handleer2(String msg){ System.out.println("FanoutReceiver handleer2:"+msg); }}
@Autowired RabbitTemplate rabbitTemplate; @Test public void test1(){ //null表示的是routingKey,这时候是不生效的,写不写无所谓,只要是绑定了fanout方法,都会进行发送 rabbitTemplate.convertAndSend(RabbitFanoutConfig.FANOUTNAME,null,"hello fanout!"); }
Topic Exchange – 将路由关键字和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”
匹配一个或多个词,符号“*”
匹配不多不少一个词。因此“audit.#”
能够匹配到“audit.irs.corporate”
,但是“audit.*”
只会匹配到“audit.irs”
任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上
这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey)
,Exchange
会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
这种模式需要RouteKey
,也许要提前绑定Exchange
与Queue
。
在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”
表示该队列关心所有涉及log的消息(一个RouteKey
为”MQ.log.error”
的消息会被转发到该队列)。
“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”
能与“log.warn”
匹配,无法与“log.warn.timeout”
匹配;但是“log.#”
能与上述两者匹配。
同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。
package org.ql.rabbitmq.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @author ql * @version 1.0 2020/12/17 */@Configurationpublic class RabbitTopicConfig { public static final String TOPICNAME="java-topic"; @Bean TopicExchange topicExchange(){ return new TopicExchange(TOPICNAME,true,false); } @Bean Queue xiaomi(){ return new Queue("xiaomi"); } @Bean Queue huawei(){ return new Queue("huawei"); } @Bean Queue phone(){ return new Queue("phone"); } @Bean Binding xiaomiBinding(){ //xiaomi.#只要以小米开头的,都会传到这里来 return BindingBuilder.bind(xiaomi()).to(topicExchange()).with("xiaomi.#"); } @Bean Binding huaweiBinding(){ //huawei.#只要以huawei开头的,都会传到这里来 return BindingBuilder.bind(huawei()).to(topicExchange()).with("huawei.#"); } @Bean Binding phoneBinding(){ //#.phone.#只要有phone的,都会传到这里来 return BindingBuilder.bind(phone()).to(topicExchange()).with("#.phone.#"); }}
@Componentpublic class TopicRecevier { @RabbitListener(queues = "xiaomi") public void handler1(String msg){ System.out.println("TopicRecevier handler1:"+msg); } @RabbitListener(queues = "huawei") public void handler2(String msg){ System.out.println("TopicRecevier handler2:"+msg); } @RabbitListener(queues = "phone") public void handler3(String msg){ System.out.println("TopicRecevier handler3:"+msg); }}
@Autowired RabbitTemplate rabbitTemplate; @Test public void test2(){ rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiaomi.news","小米新闻"); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"vivo.phone","vivo手机"); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"huawei.phone","华为手机"); rabbitTemplate.convertAndSend(RabbitTopicConfig.TOPICNAME,"xiao.xiao","不存在的队列"); }
ackage org.ql.rabbitmq.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.HeadersExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/** * @author ql * @version 1.0 2020/12/17 */@Configurationpublic class RabbitHenderConfig { public static final String HEADERNAME="java-header"; @Bean HeadersExchange headersExchange(){ return new HeadersExchange(HEADERNAME,true,false); } @Bean Queue queueName(){ return new Queue("name-queue"); } @Bean Queue queueAge(){ return new Queue("age-queue"); } @Bean Binding bindingName(){ Mapmap=new HashMap<>(); map.put("name","java"); //匹配 return BindingBuilder.bind(queueName()).to(headersExchange()).whereAny(map).match(); } @Bean Binding bindingAge(){ //只要有age就匹配上 return BindingBuilder.bind(queueAge()).to(headersExchange()).where("age").exists(); }}
@Componentpublic class HeaderReceiver { //这时候是数组 @RabbitListener(queues = "name-queue") public void handler1(byte[] msg){ System.out.println("HeaderReceiver handler1"+new String(msg,0,msg.length)); } @RabbitListener(queues = "age-queue") public void handler2(byte[] msg){ System.out.println("HeaderReceiver handler2"+new String(msg,0,msg.length)); }}
@Autowired RabbitTemplate rabbitTemplate; @Test public void test3(){ //发送的是name,并且其里面的值是java Message nameMsg= MessageBuilder.withBody("hello java!".getBytes()).setHeader("name","java").build(); //这时候name找不到了 Message name1Msg= MessageBuilder.withBody("hello java!".getBytes()).setHeader("name","啦啦啦").build(); //age一定要有,99就无所谓了 Message ageMsg= MessageBuilder.withBody("hello 99!".getBytes()).setHeader("age","99").build(); rabbitTemplate.send(RabbitHenderConfig.HEADERNAME,null,nameMsg); rabbitTemplate.send(RabbitHenderConfig.HEADERNAME,null,ageMsg); }
参考:https://www.cnblogs.com/wuhenzhidu/p/10801103.html
https://blog.csdn.net/qq_26597927/article/details/95353748 https://www.cnblogs.com/shenyixin/p/9084249.html