RabbitMQ的基础学习(上)
前言:
RabbitMQ是一个基于AMQP规范实现的消息队列。它具有性能好、高可用、跨平台性、社区活跃等优点,比较适合中小型公司使用。掌握RabbitMQ相关知识,对工作和学习都有帮助。下面我讲详细介绍一下Rabbit的相关知识。
正文:
一、AMQP规范:
首先,我们先要说明一下AMQP规范,这有利于我们学习RabbitMQ相关知识。
1. 概念:
AMQP(Advanced Message Queuing Protocol)是一个应用层的高级消息队列协议,它与JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。基于此协议不受客户端,开发语言等条件的限制,RabbitMQ就是基于此协议实现的。
2. 核心组件:
-
ConnectionFactory(连接工厂):生产Connection的的工厂。 Connection(连接):应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手。AMQP连接通常是长连接。 Channel(网络信道):大部分的业务操作是在Channel这个接口中完成的,包括: 队列的声明queueDeclare;交换机的声明exchangeDeclare; 队列的绑定queueBind;发布消息basicPublish;消费消息basicConsume等。 Broker(中间件):接受客户端的连接,实现AMQP实体服务,如RabbitMQ。 Producer(生产者):生产消息。 Consumer(消费者):消费消息。 Queue(队列):存储着即将被应用消费掉的消息。 Message(消息):服务与应用程序之间传送的数据,由Properties(属性)和body(主体)组成。 VirtualHost(虚拟主机):用于进行逻辑隔离,一个虚拟主机理由可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名字的Exchange。 Exchange(交换机):接受消息,根据路由键发送消息到绑定的队列(不具备消息存储的能力)。 Binding(绑定):Exchange和Queue之间的虚拟连接。 Routing Key(路由键):路由规则,虚拟机可以用它来确定如何路由一个特定消息。
3. AMQP工作过程:
- 生成者发布消息到交换机(Exchange)。
- 交换机根据路由规则,将消息分发给与当前交换机绑定的队列中。
- 消费者监听接收到消息之后开始业务处理。
二、4种交换机的使用:
RabbitMQ中一供有四种交换机类型,分别是Direct exchange(直连交换机)、Fanout exchange(扇形交换机)、Topic exchange(主题交换机)、Headers exchange(头交换机)。
1. Direct exchange(直连交换机):
要求消息与一个特定的路由键完全匹配,即一对一的,点对点的发送。
2. Fanout exchange(扇形交换机):
3.Topic exchange(主题交换机):
通配符匹配交换机,使用通配符去匹配,路由到对应的队列。
4. Headers exchange(头交换机):
不适用routingKey进行路由匹配,使用请求头中键值路由匹配。
5. 代码实现,以直连交换机为例:
5.1 直连交换机的Rabbit配置类:
/** * 〈一句话功能简述〉<br> * 〈直连交换机的Rabbit配置类〉 * * @author hanxinghua * @create 2022/9/19 * @since 1.0.0 */ @Order(-1) @Configuration public class DirectRabbitConfig implements BeanPostProcessor { @Resource private RabbitAdmin rabbitAdmin; //初始化rabbitAdmin对象 @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类 rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } @Bean public Queue rabbitDirectQueue() { /** * 1、name: 队列名称 * 2、durable: 是否持久化 * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。 * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。 * */ return new Queue(RabbitConstant.DIRECT_TOPIC, true, false, false); } @Bean public DirectExchange rabbitDirectExchange() { // Direct交换机 return new DirectExchange(RabbitConstant.DIRECT_EXCHANGE, true, false); } @Bean public Binding bindDirect() { //链式写法,绑定交换机和队列,并设置匹配键 return BindingBuilder //绑定队列 .bind(rabbitDirectQueue()) //到交换机 .to(rabbitDirectExchange()) //并设置匹配键 .with(RabbitConstant.DIRECT_ROUTING); } /** * 实例化Bean后的处理器 * Tips: * 由于队列不存在,启动消费者会报错,最好的解决方法是生产者和消费者都尝试声明队列 * * @param bean * @param beanName * @return * @throws BeansException */ @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { // 创建交换机 rabbitAdmin.declareExchange(rabbitDirectExchange()); // 创建队列 rabbitAdmin.declareQueue(rabbitDirectQueue()); return null; } }
5.2 直连交换机的发送消息服务:
/** * 〈一句话功能简述〉<br> * 〈〉 * * @author hanxinghua * @create 2022/9/19 * @since 1.0.0 */ @Service("directRabbitService") public class DirectRabbitServiceImpl implements RabbitService { private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Resource private RabbitTemplate rabbitTemplate; @Override public String sendMsg(String msg) throws Exception { try { rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE, RabbitConstant.DIRECT_ROUTING, msg); return "ok"; } catch (Exception e) { e.printStackTrace(); return "error"; } } }
5.3 直连交换机的消息消费者:
/** * 〈一句话功能简述〉<br> * 〈直连交换机消费者〉 * * @author hanxinghua * @create 2022/9/19 * @since 1.0.0 */ @Component public class DirectRabbitConsumer { enum Action { //处理成功 SUCCESS, //可以重试的错误,消息重回队列 RETRY, //无需重试的错误,拒绝消息,并从队列中删除 REJECT } @RabbitHandler @RabbitListener(queuesToDeclare = @Queue(RabbitConstant.DIRECT_TOPIC)) public void process(String msg, Message message, Channel channel) { long tag = message.getMessageProperties().getDeliveryTag(); Action action = Action.SUCCESS; try { System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:" + msg); if ("bad".equals(msg)) { throw new IllegalArgumentException("测试:抛出可重回队列的异常"); } if ("error".equals(msg)) { throw new Exception("测试:抛出无需重回队列的异常"); } } catch (IllegalArgumentException e1) { e1.printStackTrace(); //根据异常的类型判断,设置action是可重试的,还是无需重试的 action = Action.RETRY; } catch (Exception e2) { //打印异常 e2.printStackTrace(); //根据异常的类型判断,设置action是可重试的,还是无需重试的 action = Action.REJECT; } finally { try { if (action == Action.SUCCESS) { //multiple 表示是否批量处理。true表示批量ack处理小于tag的所有消息。false则处理当前消息 channel.basicAck(tag, false); } else if (action == Action.RETRY) { //Nack,拒绝策略,消息重回队列 channel.basicNack(tag, false, true); } else { //Nack,拒绝策略,并且从队列中删除 channel.basicNack(tag, false, false); } channel.close(); } catch (Exception e) { e.printStackTrace(); } } } }
三、6种通信模型使用:
RabbitMQ中,主要包括6种通信模型,分别是helloworld模型、work模型、pubsub模型、router模型、topic模型、rpc模型。
1. helloworld模型:
一个生产者发送消息,一个接收者接收消息。
相关代码:
/** * 〈一句话功能简述〉<br> * 〈接受队列中的消息〉 * <p> * 一个生成者发送消息,一个接收者接收消息 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Recv { /** * 队列名称 */ private final static String QUEUE_NAME = "hello.mq"; public static void main(String[] argv) throws Exception { // 创建链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ所在主机ip或者主机名 factory.setHost("localhost"); // 创建一个连接 Connection connection = factory.newConnection(); // 创建一个频道 Channel channel = connection.createChannel(); // 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("Waiting for messages."); // 创建消费者 Consumer consumer = new DefaultConsumer(channel) { /** * 处理交付 * * @param consumerTag 这个消息的唯一标记 * @param envelope 信封(请求的消息属性的一个封装) * @param properties 前面队列带过来的值 * @param body 接受到的消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received " + message + ""); } }; // 启动一个消费者,并返回服务端生成的消费者标识 channel.basicConsume(QUEUE_NAME, true, consumer); } } /** * 〈一句话功能简述〉<br> * 〈发送消息到队列中〉 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Send { /** * 队列名称 */ private final static String QUEUE_NAME = "hello.mq"; public static void main(String[] argv) throws Exception { // 创建链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ所在主机ip或者主机名 factory.setHost("localhost"); // 创建一个连接 Connection connection = factory.newConnection(); // 创建一个频道 Channel channel = connection.createChannel(); // 指定一个队列 // 第一个参数:队列名称 // 第二个参数:false:重启后,队列没有。true:持久化队列,重启后,队列依然存在 // 第三个参数:声明一个独占队列,仅限于此连接,连接关闭,删除这个队列 true // 第四个参数:最后一个消费者退出去之后,这个队列是否自动删除 // 第五个参数:队列的其他属性 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 往队列中发出一条消息 String message = "hello world!"; // 第一个参数: 交换机,不能为null,但是可以设置成 "" // 第二个参数:路由key,不能为null,但是可以设置成 "" // 第三个参数:设置的队列的属性 // 第四个参数:消息值 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("Sent " + message + ""); //关闭频道和连接 channel.close(); connection.close(); } }
2. work模型:
多个消费者消费的数据之和才是原来队列中的所有数据,适用于流量的消峰。
相关代码:
/** * 〈一句话功能简述〉<br> * 〈〉 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Task { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); for (int i = 0; i < 100 ; i++) { channel.basicPublish("", TASK_QUEUE_NAME, null, ("我是工作模型:"+i).getBytes("UTF-8")); } System.out.println("Sent over!"); channel.close(); connection.close(); } } /** * 〈一句话功能简述〉<br> * 〈〉 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Worker1 { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println("Waiting for messages."); // // 消费端限流策略,同一时刻服务器只会发送一条消息给消费者 // channel.basicQos(1) final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1接受到的消息是:"+new String(body)); // 进行手动应答 第一个参数:自动应答的这个消息标记 第二个参数:false 就相当于告诉队列受到消息了 channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, consumer); } } /** * 〈一句话功能简述〉<br> * 〈〉 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Worker2 { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println("Waiting for messages."); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2接受到的消息是:"+new String(body)); // 进行手动应答 第一个参数:自动应答的这个消息标记 第二个参数:false 就相当于告诉队列受到消息了 channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, consumer); } }
3. pubsub模型:
发布订阅模式,使用Fanout交换机。
相关代码:
/** * 〈一句话功能简述〉<br> * 〈消息发布者〉 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Publish { /** * 声明交换机的名字 */ private static final String EXCHANGE_NAME="fanout-01"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明交换机 第一个参数:交换机的名字 第二个参数:交换机的类型,如果使用的是发布订阅模型 只能写 fanout channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT); // 发送消息到交换机 for (int i = 0; i <100 ; i++) { channel.basicPublish(EXCHANGE_NAME,"",null,("发布订阅模型的值:"+i).getBytes()); } System.out.println("Sent over!"); // 关闭资源 channel.close(); connection.close(); } } /** * 〈一句话功能简述〉<br> * 〈消息订阅者〉 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Subscribe1 { /** * 声明交换机的名字 */ private static final String EXCHANGE_NAME = "fanout-01"; /** * 队列的名字 */ private static final String QUEUE_NAME = "fanout-queue1"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 声明换机 channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT); // 将队列绑定到交换机 第一个参数:队列的名字 第二个参数:交换机的名字 // 第三个参数:路由的key(现在没有用到这个路由的key) channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); System.out.println("Waiting for messages."); // 声明消费者 Consumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("订阅者1接受到的数据是:" + new String(body)); } }; // 启动一个消费者 channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } } /** * 〈一句话功能简述〉<br> * 〈消息订阅者〉 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Subscribe2 { /** * 声明交换机的名字 */ private static final String EXCHANGE_NAME = "fanout-01"; /** * 队列的名字 */ private static final String QUEUE_NAME = "fanout-queue2"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 声明换机 channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT); // 将队列绑定到交换机 第一个参数:队列的名字 第二个参数:交换机的名字 // 第三个参数:路由的key(现在没有用到这个路由的key) channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); System.out.println("Waiting for messages."); // 声明消费者 Consumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("订阅者1接受到的数据是:" + new String(body)); } }; // 启动一个消费者 channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
4. router模型:
路由模型,相当于是分布订阅的升级版,根据路由的key(routing key)来判断是否路由到哪一个队列里面去,使用Direct交换机
相关代码:
/** * 〈一句话功能简述〉<br> * 〈〉 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Producer { private static final String EXCHANGE_NAME = "direct-01"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个交换机,要是路由模式只能是 direct channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT); // 发送信息到交换机 for (int i = 0; i < 100; i++) { if (i % 2 == 0) { // 这个路由的key是可以随便设置的 channel.basicPublish(EXCHANGE_NAME, "one", null, ("路由模型的值:" + i).getBytes()); } else { // 这个路由的key是可以随便设置的 channel.basicPublish(EXCHANGE_NAME, "two", null, ("路由模型的值:" + i).getBytes()); } } System.out.println("Sent over!"); channel.close(); connection.close(); } } /** * 〈一句话功能简述〉<br> * 〈〉 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Consumer1 { private static final String EXCHANGE_NAME="direct-01"; private static final String QUEUE_NAME="direct-queue-01"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT); // 绑定队列到交换机 第三个参数:表示的是路由key channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one"); System.out.println("Waiting for messages."); // 声明消费者 Consumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //这里就是接受消息的地方 System.out.println("路由key是one的这个队列接受到数据:"+new String(body)); } }; //绑定消费者 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } } /** * 〈一句话功能简述〉<br> * 〈〉 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Consumer2 { private static final String EXCHANGE_NAME="direct-01"; private static final String QUEUE_NAME="direct-queue-02"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT); // 绑定队列到交换机 第三个参数:表示的是路由key channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"two"); System.out.println("Waiting for messages."); // 声明消费者 Consumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //这里就是接受消息的地方 System.out.println("路由key是two的这个队列接受到数据:"+new String(body)); } }; //绑定消费者 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
5. topic模型:
相当于是对路由模式的一个升级,在匹配的规则上可以实现模糊匹配
相关代码:
/** * 〈一句话功能简述〉<br> * 〈〉 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Producer { private static final String EXCHANGE_NAME = "topic-01"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC); // 发送信息到交换机 for (int i = 0; i < 100; i++) { if (i % 2 == 0) { channel.basicPublish(EXCHANGE_NAME, "one.one.one", null, ("路由模型的值:" + i).getBytes()); }else { channel.basicPublish(EXCHANGE_NAME, "one.one", null, ("路由模型的值:" + i).getBytes()); } } System.out.println("Sent over!"); channel.close(); connection.close(); } } /** * 〈一句话功能简述〉<br> * 〈〉 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Consumer1 { private static final String QUEUE_NAME="topic-queue-01"; private static final String EXCHANGE_NAME="topic-01"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); // 绑定队列到交换机 第三个参数:表示的是路由key // 注意 * :只是代表一个单词 # :这个才代表 一个或者多个单词 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one.*"); System.out.println("Waiting for messages."); // 声明消费者 Consumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("路由key是one.*的这个队列接受到数据:"+new String(body)); } }; System.out.println("Waiting for messages."); //绑定消费者 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } } /** * 〈一句话功能简述〉<br> * 〈〉 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Consumer2 { private static final String QUEUE_NAME="topic-queue-02"; private static final String EXCHANGE_NAME="topic-01"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); // 绑定队列到交换机 第三个参数:表示的是路由key // 注意 * :只是代表一个单词 # :这个才代表 一个或者多个单词 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one.#"); System.out.println("Waiting for messages."); // 声明消费者 Consumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("路由key是one.#的这个队列接受到数据:"+new String(body)); } }; //绑定消费者 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
6. rpc模型:
相关代码:
/** * 〈一句话功能简述〉<br> * 〈〉 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Server { private final static String QUEUE_NAME = "rpc-01"; public static void main(String[] args) throws Exception { // 创建链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ所在主机ip或者主机名 factory.setHost("localhost"); // 创建一个连接 Connection connection = factory.newConnection(); // 创建一个频道 Channel channel = connection.createChannel(); // 声明一个队列:客户端向服务器发送数据的队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 启动消费者,用来处理客户端发送到队列的消息 channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 获取参数 String message = new String(body); int n = Integer.parseInt(message); // 模拟服务端的一个功能 String fib = handleInterface(n) + ""; AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(properties.getCorrelationId()) .build(); // 将结果返回会客户端 // 注意从properties去获取客户端传送过来的信息,再返回回去 channel.basicPublish("", properties.getReplyTo(), replyProps, fib.getBytes()); } }); } private static int handleInterface(int n) { if (n == 0) { return 0; } return n + 2; } } /** * 〈一句话功能简述〉<br> * 〈RPC模型〉 * * @author hanxinghua * @create 2022/9/23 * @since 1.0.0 */ public class Client { private final static String QUEUE_NAME = "rpc-01"; public static void main(String[] args) throws Exception { // 创建链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ所在主机ip或者主机名 factory.setHost("localhost"); // 创建一个连接 Connection connection = factory.newConnection(); // 创建一个频道 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 声明一个队列(换了一种方式),用于存储服务器返回到客户端的数据 String replyQueueName = channel.queueDeclare().getQueue(); // 使用UUID随机生成一个id final String correlationId = UUID.randomUUID().toString(); // 客户端发送给服务器添加的额外属性 AMQP.BasicProperties props = new AMQP.BasicProperties() .builder() .correlationId(correlationId) .replyTo(replyQueueName) .build(); // 客户端将数据发送到发送队列 channel.basicPublish("", QUEUE_NAME, props, "4".getBytes()); // 启动消费者,用来客户端从相应队列获取到处理的结果 channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 通过correlationId去保证获取到的是正确的信息 if (properties.getCorrelationId().equals(correlationId)) { // 处理的结果输出 System.out.println("RPC返回结果:" + new String(body)); } // 关闭通道,注意一定要等结果返回后再关闭,不然拿不到返回的数据 try { channel.close(); connection.close(); } catch (TimeoutException e) { e.printStackTrace(); } } }); } }