快捷搜索:

RabbitMQ入门5:交换机类型;(fanout模式,direct模式,topic模式,headers模式;)

说明: (1)本篇博客主要介绍RabbitMQ交换机的内容,包括fanout模式,direct模式,topic模式,headers模式等;

一:RabbitMQ的交换机,在前面粗浅的介绍其概念,但我们没有详细介绍:

(1)在【】中,简单介绍了交换机的概念; ● 交换机的主要作用是,Producer在发送消息时候,是通过交换机向对应的队列中发送消息的; ● 然后,可以使用RoutingKey来指定,交换机和哪些队列绑定;即,Routing Key路由键就是专用于决定交换机和队列是如何进行绑定的; 简单来说,消息发送方即Producer生产者,在发布消息时候,可以设置交换机和路由键,以把消息发送到指定的队列中去;即,交换机的主要作用是,Producer生产者发布消息用的;
(2)在【】中,演示通过客户端连接使用RabbitMQ的时候,也涉及到了交换机的一点内容; ●在这儿,我们发布消息的时候,没有指定交换机(即使用的是RabbitMQ的默认交换机);如果我们没有指定交换机,那么我们就需要设置RoutingKey,以让RabbitMQ知道,Producer发送的消息要发送到那个(些)队列上去;

二:RabbitMQ中,交换机的四种工作模式:简述;

(1)fanout:广播模式的交换机; ● 即,如果Producer生产者在发布消息的时候,采用的是fanout模式的交换机,那么RabbitMQ会向【所有与这个交换机绑定的队列】都发送这个(同样的)消息; ● 所以,如果Producer生产者在发布消息的时候,采用的是fanout模式的交换机;其发布消息的时候,就不用设置RoutingKey了;因为,其会向【所有与这个交换机绑定的队列】都发送这个(同样的)消息,就不需要RoutingKey来指定向哪些队列发送消息了;
(2)direct:直接模式; ● 其会根据,RoutingKey路由键,去决定交换机把消息发送到哪个(些)队列上去; ● Producer在发送消息时,如果使用direct模式;那么,我们在发送消息的时候,就需要指定一个Routing Key(否则交换机,将不知道把消息发送到哪个(些)队列上去); ● 然后Consumer在接收消息的时候,需要创建一个队列;然后,Consumer要想使用队列接收到某个消息,那么在通过队列接受消息时,就需要设置对应的Routing Key;
(3)topic模式; ● Topic模式可以认为是direct模式的一个威力加强版; ● Topic之所以,可以应对“多个条件”的复杂逻辑;主要是其可以进行模糊匹配; ● 其中,*可以匹配任何一个单词(不包含"."的任意字符串);例如,item.*可以匹配item.pikapika,或者item.&gdygj。diu,或者item.ji。。joidjfi%djl但不能匹配item.ji。。joidjf.i%djl(实测如此;;;;;;貌似在RabbitMQ的Topic模式中,"."是一种特殊的存在,其可以作为分隔符,每隔一个"."就表示多了一个单词); #可以匹配0至多个单词;例如:item.# 能够匹配 item.insert.abc 或者 item.insert;(实测如此) ● 关于,topic模式,可以看下下面的案例;

三:交换机类型之:fanout广播模式;

0.问题引入; (1)一个适合使用fanout模式交换机的,场景; 试想一下这种场景:我们要构建一个日志记录系统,其情况和需求如下: ● 其中,比如我们的【业务代码】会发出日志消息,然后我们的【日志记录代码】负责接收并存储日志消息; ● 然后,同时既希望把日志存到磁盘上,同时也希望把日志打印到屏幕上; ● 那么,对于这种需求,我们就需要有多个接收日志的地方;即,也就是,我们有一个发送日志消息的,但却有多个接收日志消息的(对于同样的一条消息,这个多个接收日志消息的地方,都需收到); 那么,对于这种需求,那么Producer在发布消息的时候,使用fanout模式的交换机,就很适合; ● 如果Producer生产者在发布消息的时候,使用fanout模式交换机,会有如下特点:如果Producer生产者,在发布消息的时候,采用fanout模式的交换机;那么,在发布消息时,那么所有与这个交换机绑定的队列(一般这个队列声明在Consumer消费者处),都会收到这个消息; ● 即,Producer生产者在发布消息的时候,如果采用fanout模式的交换机;那么,其发布消息的时候,会向RabbitMQ中、所有、与这个交换机绑定的,队列中,都发送这个消息; ● 如果,RabbitMQ中、当前、没有与这个交换机绑定的队列,那么消息就相当于是一个无的之矢,这个消息就会在设定的存活时间内容,静静的等待;一旦有【与这个交换机绑定的队列】被创建后,这个消息就会把发送到那个队列中去;(PS:这一点,可能存在不准确、甚至错误的地方;但是,这也是自己经过实测的……) …………………………………………………… (2)关于日志存储,还有第二个需求; ● 需求:由于日志存储这个业务的特殊性,我们只想记录当前产生的日志,即【日志记录代码】只需要记录【日志记录代码启动后,业务代码产生的日志】;(因为,以前产生的日志,自有当时的日志记录代码负责;;;;同时,日志也有时效性,记录一些旧日志的意义不是很大;;;); ● 那么,针对这个需求,Consumer在接收消息的时候,就可以使用临时队列;(有关临时队列的内容,在下面会介绍)
1.创建EmitLog类,这个类可以认为是一个产生日志的业务类;其角色是【Producer生产者】; EmitLog类: package fanout; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 描述:发送日志信息; */ public class EmitLog { //定义交换机的名字; private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址) connectionFactory.setHost("1**.***.***.**8"); //然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行; // 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】 connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); //PS:记得要放开RabbitMQ部署的服务器的,5672端口; //3.建立连接 Connection connection = connectionFactory.newConnection(); //4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了) Channel channel = connection.createChannel(); //定义交换机;第一个参数,交换机的名字;第二个参数,交换机的类型; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //模拟一下,日志的内容 String message = "info:hello world"; //6.发布消息 //参数说明:第一个参数(exchange)是交换机; // 第二个参数(routingKey)是路由键,这儿因为交换机是fanout模式,所以路由键不需要写; //第三个参数(props),消息除了消息体外,还要有props作为它的配置; // 第四个参数(body)消息的内容,要求是byte[]类型的,同时,需要指定编码类型 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println("发送了消息:" + message); channel.close(); connection.close(); } } 说明: (1)类内容说明; (2)关于fanout交换机,和,RabbitMQ默认交换机,在使用时的区别; ● 在【】中,我们一开始演示RabbitMQ的时候,并没有主动去关心交换机的内容,当时使用的是默认交换机;;;所以,在当时我们发送消息的时候,为了能让RabbitMQ知道要被这个消息发送到哪个(些)队列中,我们需要指定Routing Key;;;然后,当时,我们在Producer生产者中就定义了一个队列,然后把这个队列作为RoutingKey,就相当于告诉RabbitMQ,我们发送的这个消息就是发送到这个队列中的;;;自然,RabbitMQ通过默认交换机,把消息发送到那个队列中后,这个消息就在队列中了;;;如果,当前没有任何Consumer消费者连接这个队列,那么这个队列中的消息,就会默默的等待;;;如果,有Consumer消费者连接了这个队列,那么Consumer就可以从这个队列中获取消息;;;而且,使用默认交换机的时候,RabbitMQ会采用分发的方式,把队列中的消息,分发给所有(连接了这个队列的)的Consumer,每个消息只会发送给一个Consumer(比如,有1,2,3,4四条消息;然后,有两个队列;那么第一个队列会分到1,3两条消息,第二个队列会分到2,4两条消息);;;这在 【】有着很明显的体现; ● 但是,Producer生产者在发送消息的时候,如果采用的是fanout模式的交换机;;;那么,其发布消息的时候,会向RabbitMQ中、所有、与这个交换机绑定的,队列中,都发送这个消息;;;如果,RabbitMQ中、当前、没有与这个交换机绑定的队列,那么消息就相当于是一个无的之矢,这个消息就会在设定的存活时间内容,静静的等待;一旦有【与这个交换机绑定的队列】被创建后,这个消息就会把发送到那个队列中去;(PS:这一点,可能存在不准确、甚至错误的地方;但是,这也是自己经过实测的……);;;同时,如果Producer生产者在发布消息的时候,采用的是fanout模式的交换机;其发布消息的时候,就不用设置RoutingKey了;因为,其会向【所有与这个交换机绑定的队列】都发送这个(同样的)消息,就不需要RoutingKey来指定向哪些队列发送消息了; (3)运行结果;发现,我们成功了发送了消息;(但其实,因为当前在RabbitMQ中,没有在任何Consumer中定义队列,来与这个交换机绑定;;;所以,这个消息就相当于是一个无的之矢,这个消息就会在设定的存活时间内容,静静的等待;一旦有【与这个交换机绑定的队列】被创建后,这个消息就会把发送到那个队列中去;(PS:这一点,可能存在不准确、甚至错误的地方;但是,这也是自己经过实测的……);;)
2.创建ReceiveLog类,这个类可以认为是一个存储日志的类;其角色是【Consumer消费者】; ReceiveLog类: package fanout; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 描述:接收日志消息 */ public class ReceiveLog { //定义交换机的名字 private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址) //这里面填写是RabbitMQ服务端所在服务器的ip地址 connectionFactory.setHost("1**.***.***.**8"); //然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行; // 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】 connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); //PS:记得要放开RabbitMQ部署的服务器的,5672端口; //3.建立连接 Connection connection = connectionFactory.newConnection(); //4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了) Channel channel = connection.createChannel(); //定义交换机;第一个参数,交换机的名字;第二个参数,交换机的类型; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //5.声明队列;我们生成一个非持久、会自动删除的、临时队列 String queueName = channel.queueDeclare().getQueue(); System.out.println(queueName); //然后,将队列绑定到交换机上;第一个参数是交换机名;第二个参数是Routing key(在fanout模式下,不需要Routing key); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("开始接收消息;"); //6.接收消息,并消费 channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("收到了消息:" + message); } }); } } 说明: (1.1)类内容说明: (1.2)针对我们这儿的【日志存储】业务来说,因为我们有个需求是【日志记录代码只用记录当前生成的日志信息,以前产生的日志就没必要记录了;(因为,以前产生的日志,自有当时的日志记录代码负责)】;所以,Consumer消费者,在接收日志消息时候,选用的是一种临时队列; ● 临时队列的声明方式; ● 上面我们使用临时队列的方式:我们每次启动一次Consumer消费者的时候,都会创建一个全新的(因为临时队列的名字是RabbitMQ随机、自动生成的)临时队列; ● 临时队列的本质特点,自己并没有很好的掌握; ● 有关临时队列,在接收消息时的特点,在下面会介绍; (1.3)然后,就可以把队列绑定到交换机上,然后去队列中拿消息了; (2)强调说明:接收消息的时候,Consumer是与队列绑定的,和交换机是不绑定的;这是一种解耦的关系;即,Producer通过交换机把消息发送到队列中去,然后Consumer从队列中拿消息; (3)运行结果; ● 情况1:Producer在Consumer未启动前(即,临时队列未创建前)发送的消息,在Consumer启动后(即,临时队列创建后),临时队列是接收不到的消息的; ● 情况2:我们先启动Consumer(即,先创建临时队列),然后Produce发送的消息,临时队列是可以收到的; ● 情况3:启动多个Consumer(自然,每个Consumer的队列是不同的),然后Producer再发送消息; 这正好可以满足日志系统的,第一个需求; ● 情况4(情况1的变种):如果Consumer中使用的不是临时队列,Producer在Consumer未启动前(其队列未创建前)发送的消息,在Consumer启动后(即,临时队列创建后),队列是可以接受的到的; 所以,可以感受到,RabbitMQ的内容,其实还是有一些的,对于这些自己暂时不了解的内容,暂时搁置吧;以后有精力再了解;;而且,以后的项目中,万一消息中间件没有使用RabbitMQ,而是使用的RockerMQ呐,对吧;

四:交换机类型之:direct直接模式;

0.direct模式,简介; (1)direct模式简介; ● Producer在发送消息时,如果使用direct模式;那么,我们在发送消息的时候,就需要指定一个Routing Key(否则交换机,将不知道把消息发送到哪个(些)队列上去); ● 然后Consumer在接收消息的时候,需要创建一个队列;然后,Consumer要想使用队列接收到某个消息,那么在通过队列接受消息时,就需要设置对应的Routing Key; …………………………………………………… (2)direct模式的一个应用场景; ● 比如,日志分为info,warning,error等等级;对于【把日志打印在控制台】来说,info,warning,error这三个等级都可以打印;对于【把日志写在日志文件中】来说,有时我们只需要写error等级的即可; ● 所以,对于这种场景,使用fanout广播模式的交换机,就不太合适了; ● 对于这种场景,我们可以使用direct模式;业务代码通过交换机发送日志消息的时候,给日志消息设置上对应的RoutingKey(此时,Routing Key就可以设置为info,warning或error这三个日志等级); ● 那么此时,【把日志打印在控制台】在通过队列去接收消息的时候,这个队列接收消息的时候Routing Key就可以设置为info,warning或error这三个日志等级;;;【把日志写在日志文件中】在通过队列去接收消息的时候,就可以设置Routing key为error这一个日志等级
Tips:定义(声明)一个交换机的时候,如果RabbitMQ已经存在叫这个名字的交换机时(PS:RabbitMQ创建交换机的时候,貌似可以设置过期时间等;RabbitMQ这种更复杂的内容,后续再了解吧),如果交换机类型不一致,就会报IO异常;
1.创建EmitLog类,这个类可以认为是一个产生日志的业务类;其角色是【Producer生产者】; EmitLog类: package direct; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 描述:direct类型的交换机时,发送消息; */ public class EmitLog { //定义交换机的名字; private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址) connectionFactory.setHost("1**.***.***.**8"); //然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行; // 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】 connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); //PS:记得要放开RabbitMQ部署的服务器的,5672端口; //3.建立连接 Connection connection = connectionFactory.newConnection(); //4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了) Channel channel = connection.createChannel(); //定义交换机;第一个参数,交换机的名字;第二个参数,交换机的类型; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //模拟一下,日志的内容 String messageInfo = "info:hello world"; String messageWarning = "warning:hello world"; String messageError = "error:hello world"; //6.发布消息 //参数说明:第一个参数(exchange)是交换机;这儿使用的是direct模式的交换机; // 第二个参数(routingKey)是路由键;然后,我们这儿发送的消息的routingKey设为info; //第三个参数(props),消息除了消息体外,还要有props作为它的配置; // 第四个参数(body)消息的内容,要求是byte[]类型的,同时,需要指定编码类型 channel.basicPublish(EXCHANGE_NAME, "info", null, messageInfo.getBytes("UTF-8")); System.out.println("发送了消息:" +"日志等级为info;消息内容为:"+ messageInfo); channel.basicPublish(EXCHANGE_NAME, "warning", null, messageWarning.getBytes("UTF-8")); System.out.println("发送了消息:" +"日志等级为warning;消息内容为:"+ messageWarning); channel.basicPublish(EXCHANGE_NAME, "error", null, messageError.getBytes("UTF-8")); System.out.println("发送了消息:" +"日志等级为error;消息内容为:"+ messageError); channel.close(); connection.close(); } } 说明: (1)使用direct类型的交换机; (2)通过交换机发送消息的时候,设置对应的Routing Key;
2.创建ReceiveLog1类,这个类可以认为是【把日志打印在控制台】那个接收日志信息的类;其角色是【Consumer消费者】; ReceiveLog1类: package direct; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 描述:direct类型的交换机时,接收消息; */ public class ReceiveLog1 { //定义交换机的名字 private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址) //这里面填写是RabbitMQ服务端所在服务器的ip地址 connectionFactory.setHost("1**.***.***.**8"); //然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行; // 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】 connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); //PS:记得要放开RabbitMQ部署的服务器的,5672端口; //3.建立连接 Connection connection = connectionFactory.newConnection(); //4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了) Channel channel = connection.createChannel(); //定义交换机;第一个参数,交换机的名字;第二个参数,交换机的类型; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.声明队列;我们生成一个非持久、会自动删除的、临时队列 String queueName = channel.queueDeclare().getQueue(); //然后,将队列绑定到交换机上;第一个参数是交换机名;第二个参数是Routing key; channel.queueBind(queueName, EXCHANGE_NAME, "info"); channel.queueBind(queueName, EXCHANGE_NAME, "warning"); channel.queueBind(queueName, EXCHANGE_NAME, "error"); System.out.println("开始接收消息;"); //6.接收消息,并消费 channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("收到了消息:" + message); } }); } } 说明: (1)定义交换机,声明一个临时队列; (2.1)然后,将队列绑定到交换机上去;同时,因为这儿想要接收info,warning,error类型的(而Producer在发送消息的时候,正好也对应的把routing key设为了info,warning,error);;;所以,这儿在接收消息的时候,该队列的routingkey也可以设为info,warning,error; (2.2)PS:经过实测,这儿绑定三次,其是同一个队列;
3.创建ReceiveLog2类,这个类可以认为是【把日志写在日志文件中】那个接收日志信息的类;其角色是【Consumer消费者】; ReceiveLog2类: package direct; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 描述:direct类型的交换机时,接收消息; */ public class ReceiveLog2 { //定义交换机的名字 private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址) //这里面填写是RabbitMQ服务端所在服务器的ip地址 connectionFactory.setHost("1**.***.***.**8"); //然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行; // 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】 connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); //PS:记得要放开RabbitMQ部署的服务器的,5672端口; //3.建立连接 Connection connection = connectionFactory.newConnection(); //4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了) Channel channel = connection.createChannel(); //定义交换机;第一个参数,交换机的名字;第二个参数,交换机的类型; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //5.声明队列;我们生成一个非持久、会自动删除的、临时队列 String queueName = channel.queueDeclare().getQueue(); //然后,将队列绑定到交换机上;第一个参数是交换机名;第二个参数是Routing key; channel.queueBind(queueName, EXCHANGE_NAME, "error"); System.out.println("开始接收消息;"); //6.接收消息,并消费 channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("收到了消息:" + message); } }); } } 说明: (1)这儿在接收消息是时候,只接受error类型的日志,所以其接收消息的时候routingKey设为了error;
4.运行效果; 先启动ReceiveLog1和ReceiveLog2两个Consumer(其本质,就是先创建两个接收消息的临时队列); 然后,启动EmitLog这个Producer,去发送消息; 那么,此时ReceiveLog1和ReceiveLog2两个Consumer,接收消息的情况为; 发现结果OK,符合预期效果;

五:交换机类型之:topic模式;

Topic模式可以认为是direct模式的一个威力加强版:逻辑阐述如下; (1)先回顾一下,direct模式: (2)但是,对于一些业务比较复杂的情况,为了能够更加灵活的应对,就需要引入Topic模式; ● 此时,我们有一个日志记录系统,其只想记录来自于【用户模块】的error级别的日志信息;那么,对于这种需求,使用direct模式的交换机自然也是可以完成的,只是不太灵活,不简洁; ● 那么,对于这种“多个条件”的复杂逻辑,使用topic就可以比较好的应对; (3)Topic模式简介; ● Topic之所以,可以应对“多个条件”的复杂逻辑;主要是其可以进行模糊匹配; ● 其中,*可以匹配任何一个单词(不包含"."的任意字符串);例如,item.*可以匹配item.pikapika,或者item.&gdygj。diu,或者item.ji。。joidjfi%djl但不能匹配item.ji。。joidjf.i%djl(实测如此;;;;;;貌似在RabbitMQ的Topic模式中,"."是一种特殊的存在,其可以作为分隔符,每隔一个"."就表示多了一个单词); #可以匹配0至多个单词;例如:item.# 能够匹配 item.insert.abc 或者 item.insert;(实测如此) ● 关于,topic模式,可以看下下面的案例;
1.创建EmitLog类,这个类可以认为是一个产生日志的业务类;其角色是【Producer生产者】; EmitLog类: package topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 描述:topic类型的交换机时,发送消息; */ public class EmitLog { //定义交换机的名字; private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址) connectionFactory.setHost("1**.***.***.**8"); //然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行; // 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】 connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); //PS:记得要放开RabbitMQ部署的服务器的,5672端口; //3.建立连接 Connection connection = connectionFactory.newConnection(); //4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了) Channel channel = connection.createChannel(); //定义交换机;第一个参数,交换机的名字;第二个参数,交换机的类型; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String[] routingKeys = new String[11]; routingKeys[0] = "quick.orange.rabbit"; routingKeys[1] = "lazy.orange.elephant"; routingKeys[2] = "quick.orange.fox"; routingKeys[3] = "lazy.brown.fox"; routingKeys[4] = "lazy.pink.rabbit"; routingKeys[5] = "quick.brown.fox"; routingKeys[6] = "orange"; routingKeys[7] = "quick.orange.male.rabbit"; routingKeys[8] = "lazy.orange.male.rabbit"; routingKeys[9] = "lazy.orange.。male.rabbit"; routingKeys[10] = "item&gdygj。。。fyui。。diu.orange.fox"; //6.发布消息;;;但是,发送消息时设置的Routing Key各不相同 //参数说明:第一个参数(exchange)是交换机;这儿使用的是direct模式的交换机; // 第二个参数(routingKey)是路由键;然后,我们这儿发送的消息的routingKey设为info; //第三个参数(props),消息除了消息体外,还要有props作为它的配置; // 第四个参数(body)消息的内容,要求是byte[]类型的,同时,需要指定编码类型 for (int i = 0; i < routingKeys.length; i++) { channel.basicPublish(EXCHANGE_NAME, routingKeys[i], null, (routingKeys[i] + "(" + i + ")").getBytes("UTF-8")); System.out.println("发送了:" + routingKeys[i] + "。设置的Routing Key是:" + routingKeys[i]); } channel.close(); connection.close(); } } 说明: (1)这儿我们使用topic模式的交换机; (2)然后,这儿我们发送11条消息,分别设置不同的Routing Key;(然后,为了方便观察效果,我们把消息的内容,设置成和Routing Key的内容一样了);
2.创建Receive1类,其角色是【Consumer消费者】;然后,其接收消息时,设置的Routing Key是【*.orange.*】; Receive1类: package topic; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 描述:topic类型的交换机时,接收消息; */ public class Receive1 { //定义交换机的名字 private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址) //这里面填写是RabbitMQ服务端所在服务器的ip地址 connectionFactory.setHost("1**.***.***.**8"); //然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行; // 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】 connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); //PS:记得要放开RabbitMQ部署的服务器的,5672端口; //3.建立连接 Connection connection = connectionFactory.newConnection(); //4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了) Channel channel = connection.createChannel(); //定义交换机;第一个参数,交换机的名字;第二个参数,交换机的类型; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //5.声明队列;我们生成一个非持久、会自动删除的、临时队列 String queueName = channel.queueDeclare().getQueue(); //然后,将队列绑定到交换机上;第一个参数是交换机名;第二个参数是Routing key; channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*"); System.out.println("开始接收消息;"); //6.接收消息,并消费 channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("收到了消息:" + message+"。接收这个消息时,是通过Routing Key="+"[*.orange.*],来接收的。"); } }); } } 说明: (1)类内容说明;
3.创建Receive2类,其角色是【Consumer消费者】;然后,其接收消息时,设置的Routing Key是【*.*.rabbit】和【lazy.#】; Receive2类: package topic; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 描述:topic类型的交换机时,接收消息; */ public class Receive2 { //定义交换机的名字 private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //2.设置RabbitMQ的地址(即RabbitMQ的服务端的地址) //这里面填写是RabbitMQ服务端所在服务器的ip地址 connectionFactory.setHost("1**.***.***.**8"); //然后,要想连接RabbitMQ的服务端,我么还需要通过一个用户才行; // 所以,这儿我们使用【前面我们设置的,能够在其他服务器上访问RabbitMQ所在服务器的,admin用户】 connectionFactory.setUsername("admin"); connectionFactory.setPassword("password"); //PS:记得要放开RabbitMQ部署的服务器的,5672端口; //3.建立连接 Connection connection = connectionFactory.newConnection(); //4.获得Channel信道(我们大部分的操作,都是在信道上完成的;有了信道后,我们就可以进行操作了) Channel channel = connection.createChannel(); //定义交换机;第一个参数,交换机的名字;第二个参数,交换机的类型; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //5.声明队列;我们生成一个非持久、会自动删除的、临时队列 String queueName = channel.queueDeclare().getQueue(); //然后,将队列绑定到交换机上;第一个参数是交换机名;第二个参数是Routing key; channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#"); System.out.println("开始接收消息;"); //6.接收消息,并消费 channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("收到了消息:" + message+"。接收这个消息时,是通过Routing Key="+"[*.*.rabbit]或[lazy.#],来接收的。"); } }); } } 说明: (1)类内容说明;

4.运行效果;

(1)先启动Receive1和Receive2两个Consumer(本质上就是先创建对应的两个临时队列); (2)然后,启动EmitLog发送消息; (3)Receive1和Receive2接收消息的情况是;(结果就不详细分析了,稍微一瞅,就能明白) ● Receive1,接收消息,队列绑定交换机时,Routing Key设置的是【*.orange.*】; ● Receive2,接收消息,队列绑定交换机时,Routing Key设置的是【*.*.rabbit】和【lazy.#】; (4)可以看到,RabbitMQ的Topic模式的交换机的提供的这个功能,能够帮助我们灵活的应对实际业务开发中的复杂需求;;;;;但是,在实际开发中具体使用RabbitMQ的规范和常用写法,需要积累和总结;

六:交换机类型之:headers模式;

● 如果,我们在发送消息的时候,设置了properties,并且传入了指定的规则;那么,我们就可以使用设置的规则发送给指定的队列; ● 只是,在实际开中,很少使用这种模式;
不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。 在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
经验分享 程序员 微信小程序 职场和发展