RabbitMQ的基础学习(上)

前言:

RabbitMQ是一个基于AMQP规范实现的消息队列。它具有性能好、高可用、跨平台性、社区活跃等优点,比较适合中小型公司使用。掌握RabbitMQ相关知识,对工作和学习都有帮助。下面我讲详细介绍一下Rabbit的相关知识。

正文:

一、AMQP规范:

首先,我们先要说明一下AMQP规范,这有利于我们学习RabbitMQ相关知识。

1. 概念:

AMQP(Advanced Message Queuing Protocol)是一个应用层的高级消息队列协议,它与JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。基于此协议不受客户端,开发语言等条件的限制,RabbitMQ就是基于此协议实现的。

2. 核心组件:

    ConnectionFactory(连接工厂):生产Connection的的工厂。 Connection(连接):应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手。AMQP连接通常是长连接。 Channel(网络信道):大部分的业务操作是在Channel这个接口中完成的,包括: 队列的声明queueDeclare;交换机的声明exchangeDeclare; 队列的绑定queueBind;发布消息basicPublish;消费消息basicConsume等。 Broker(中间件):接受客户端的连接,实现AMQP实体服务,如RabbitMQ。 Producer(生产者):生产消息。 Consumer(消费者):消费消息。 Queue(队列):存储着即将被应用消费掉的消息。 Message(消息):服务与应用程序之间传送的数据,由Properties(属性)和body(主体)组成。 VirtualHost(虚拟主机):用于进行逻辑隔离,一个虚拟主机理由可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名字的Exchange。 Exchange(交换机):接受消息,根据路由键发送消息到绑定的队列(不具备消息存储的能力)。 Binding(绑定):Exchange和Queue之间的虚拟连接。 Routing Key(路由键):路由规则,虚拟机可以用它来确定如何路由一个特定消息。

3. AMQP工作过程:

  1. 生成者发布消息到交换机(Exchange)。
  2. 交换机根据路由规则,将消息分发给与当前交换机绑定的队列中。
  3. 消费者监听接收到消息之后开始业务处理。

二、4种交换机的使用:

RabbitMQ中一供有四种交换机类型,分别是Direct exchange(直连交换机)、Fanout exchange(扇形交换机)、Topic exchange(主题交换机)、Headers exchange(头交换机)。

1. Direct exchange(直连交换机):

要求消息与一个特定的路由键完全匹配,即一对一的,点对点的发送。

2. Fanout exchange(扇形交换机):

3.Topic exchange(主题交换机):

通配符匹配交换机,使用通配符去匹配,路由到对应的队列。

4. Headers exchange(头交换机):

不适用routingKey进行路由匹配,使用请求头中键值路由匹配。

5. 代码实现,以直连交换机为例:

5.1 直连交换机的Rabbit配置类:

/**
 * 〈一句话功能简述〉<br>
 * 〈直连交换机的Rabbit配置类〉
 *
 * @author hanxinghua
 * @create 2022/9/19
 * @since 1.0.0
 */
@Order(-1)
@Configuration
public class DirectRabbitConfig implements BeanPostProcessor {


    @Resource
    private RabbitAdmin rabbitAdmin;


    //初始化rabbitAdmin对象
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    @Bean
    public Queue rabbitDirectQueue() {
        /**
         * 1、name:    队列名称
         * 2、durable: 是否持久化
         * 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
         * 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
         * */
        return new Queue(RabbitConstant.DIRECT_TOPIC, true, false, false);
    }

    @Bean
    public DirectExchange rabbitDirectExchange() {
        // Direct交换机
        return new DirectExchange(RabbitConstant.DIRECT_EXCHANGE, true, false);
    }

    @Bean
    public Binding bindDirect() {
        //链式写法,绑定交换机和队列,并设置匹配键
        return BindingBuilder
                //绑定队列
                .bind(rabbitDirectQueue())
                //到交换机
                .to(rabbitDirectExchange())
                //并设置匹配键
                .with(RabbitConstant.DIRECT_ROUTING);
    }

    /**
     * 实例化Bean后的处理器
     * Tips:
     * 由于队列不存在,启动消费者会报错,最好的解决方法是生产者和消费者都尝试声明队列
     *
     * @param bean
     * @param beanName
     * @return
     * @throws BeansException
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        // 创建交换机
        rabbitAdmin.declareExchange(rabbitDirectExchange());
        // 创建队列
        rabbitAdmin.declareQueue(rabbitDirectQueue());
        return null;
    }

}

5.2 直连交换机的发送消息服务:

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/19
 * @since 1.0.0
 */
@Service("directRabbitService")
public class DirectRabbitServiceImpl implements RabbitService {

    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Override
    public String sendMsg(String msg) throws Exception {
        try {
            rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE, RabbitConstant.DIRECT_ROUTING, msg);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }
}

5.3 直连交换机的消息消费者:

/**
 * 〈一句话功能简述〉<br>
 * 〈直连交换机消费者〉
 *
 * @author hanxinghua
 * @create 2022/9/19
 * @since 1.0.0
 */
@Component
public class DirectRabbitConsumer {

    enum Action {
        //处理成功
        SUCCESS,
        //可以重试的错误,消息重回队列
        RETRY,
        //无需重试的错误,拒绝消息,并从队列中删除
        REJECT
    }

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue(RabbitConstant.DIRECT_TOPIC))
    public void process(String msg, Message message, Channel channel) {
        long tag = message.getMessageProperties().getDeliveryTag();
        Action action = Action.SUCCESS;
        try {
            System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:" + msg);
            if ("bad".equals(msg)) {
                throw new IllegalArgumentException("测试:抛出可重回队列的异常");
            }
            if ("error".equals(msg)) {
                throw new Exception("测试:抛出无需重回队列的异常");
            }
        } catch (IllegalArgumentException e1) {
            e1.printStackTrace();
            //根据异常的类型判断,设置action是可重试的,还是无需重试的
            action = Action.RETRY;
        } catch (Exception e2) {
            //打印异常
            e2.printStackTrace();
            //根据异常的类型判断,设置action是可重试的,还是无需重试的
            action = Action.REJECT;
        } finally {
            try {
                if (action == Action.SUCCESS) {
                    //multiple 表示是否批量处理。true表示批量ack处理小于tag的所有消息。false则处理当前消息
                    channel.basicAck(tag, false);
                } else if (action == Action.RETRY) {
                    //Nack,拒绝策略,消息重回队列
                    channel.basicNack(tag, false, true);
                } else {
                    //Nack,拒绝策略,并且从队列中删除
                    channel.basicNack(tag, false, false);
                }
                channel.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

三、6种通信模型使用:

RabbitMQ中,主要包括6种通信模型,分别是helloworld模型、work模型、pubsub模型、router模型、topic模型、rpc模型。

1. helloworld模型:

一个生产者发送消息,一个接收者接收消息。

相关代码:

/**
 * 〈一句话功能简述〉<br>
 * 〈接受队列中的消息〉
 * <p>
 * 一个生成者发送消息,一个接收者接收消息
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Recv {

    /**
     * 队列名称
     */
    private final static String QUEUE_NAME = "hello.mq";

    public static void main(String[] argv) throws Exception {

        // 创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ所在主机ip或者主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("Waiting for messages.");

        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel) {

            /**
             * 处理交付
             *
             * @param consumerTag 这个消息的唯一标记
             * @param envelope 信封(请求的消息属性的一个封装)
             * @param properties 前面队列带过来的值
             * @param body 接受到的消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received " + message + "");
            }
        };

        // 启动一个消费者,并返回服务端生成的消费者标识
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}




/**
 * 〈一句话功能简述〉<br>
 * 〈发送消息到队列中〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Send {

    /**
     * 队列名称
     */
    private final static String QUEUE_NAME = "hello.mq";

    public static void main(String[] argv) throws Exception {

        // 创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ所在主机ip或者主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 指定一个队列
        // 第一个参数:队列名称
        // 第二个参数:false:重启后,队列没有。true:持久化队列,重启后,队列依然存在
        // 第三个参数:声明一个独占队列,仅限于此连接,连接关闭,删除这个队列 true
        // 第四个参数:最后一个消费者退出去之后,这个队列是否自动删除
        // 第五个参数:队列的其他属性
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 往队列中发出一条消息
        String message = "hello world!";
        // 第一个参数: 交换机,不能为null,但是可以设置成 ""
        // 第二个参数:路由key,不能为null,但是可以设置成 ""
        // 第三个参数:设置的队列的属性
        // 第四个参数:消息值
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("Sent " + message + "");
        //关闭频道和连接
        channel.close();
        connection.close();
    }
}

2. work模型:

多个消费者消费的数据之和才是原来队列中的所有数据,适用于流量的消峰。

相关代码:

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Task {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        for (int i = 0; i < 100 ; i++) {
            channel.basicPublish("", TASK_QUEUE_NAME, null,
                    ("我是工作模型:"+i).getBytes("UTF-8"));
        }
        System.out.println("Sent over!");

        channel.close();
        connection.close();
    }

}



/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Worker1 {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages.");

//        // 消费端限流策略,同一时刻服务器只会发送一条消息给消费者
//        channel.basicQos(1)

        final Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("消费者1接受到的消息是:"+new String(body));
                // 进行手动应答  第一个参数:自动应答的这个消息标记  第二个参数:false 就相当于告诉队列受到消息了
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    }

}




/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Worker2 {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages.");


        final Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("消费者2接受到的消息是:"+new String(body));
                // 进行手动应答  第一个参数:自动应答的这个消息标记  第二个参数:false 就相当于告诉队列受到消息了
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    }
}

3. pubsub模型:

发布订阅模式,使用Fanout交换机。

相关代码:

/**
 * 〈一句话功能简述〉<br>
 * 〈消息发布者〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Publish {

    /**
     * 声明交换机的名字
     */
    private static final String EXCHANGE_NAME="fanout-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明交换机 第一个参数:交换机的名字  第二个参数:交换机的类型,如果使用的是发布订阅模型  只能写 fanout
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);

        // 发送消息到交换机
        for (int i = 0; i <100 ; i++) {
            channel.basicPublish(EXCHANGE_NAME,"",null,("发布订阅模型的值:"+i).getBytes());
        }
        System.out.println("Sent over!");

        // 关闭资源
        channel.close();
        connection.close();
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈消息订阅者〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Subscribe1 {

    /**
     * 声明交换机的名字
     */
    private static final String EXCHANGE_NAME = "fanout-01";

    /**
     * 队列的名字
     */
    private static final String QUEUE_NAME = "fanout-queue1";


    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 声明换机
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);

        // 将队列绑定到交换机  第一个参数:队列的名字 第二个参数:交换机的名字
        // 第三个参数:路由的key(现在没有用到这个路由的key)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("订阅者1接受到的数据是:" + new String(body));
            }
        };

        // 启动一个消费者
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈消息订阅者〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Subscribe2 {

    /**
     * 声明交换机的名字
     */
    private static final String EXCHANGE_NAME = "fanout-01";

    /**
     * 队列的名字
     */
    private static final String QUEUE_NAME = "fanout-queue2";


    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 声明换机
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);

        // 将队列绑定到交换机  第一个参数:队列的名字 第二个参数:交换机的名字
        // 第三个参数:路由的key(现在没有用到这个路由的key)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("订阅者1接受到的数据是:" + new String(body));
            }
        };

        // 启动一个消费者
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

4. router模型:

路由模型,相当于是分布订阅的升级版,根据路由的key(routing key)来判断是否路由到哪一个队列里面去,使用Direct交换机

相关代码:

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Producer {

    private static final String EXCHANGE_NAME = "direct-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明一个交换机,要是路由模式只能是 direct
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);

        // 发送信息到交换机
        for (int i = 0; i < 100; i++) {
            if (i % 2 == 0) {
                // 这个路由的key是可以随便设置的
                channel.basicPublish(EXCHANGE_NAME, "one", null, ("路由模型的值:" + i).getBytes());
            } else {
                // 这个路由的key是可以随便设置的
                channel.basicPublish(EXCHANGE_NAME, "two", null, ("路由模型的值:" + i).getBytes());
            }
        }
        System.out.println("Sent over!");

        channel.close();
        connection.close();
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Consumer1 {

    private static final String EXCHANGE_NAME="direct-01";
    private static final String QUEUE_NAME="direct-queue-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);

        // 绑定队列到交换机  第三个参数:表示的是路由key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //这里就是接受消息的地方
                System.out.println("路由key是one的这个队列接受到数据:"+new String(body));
            }
        };

        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Consumer2 {

    private static final String EXCHANGE_NAME="direct-01";
    private static final String QUEUE_NAME="direct-queue-02";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);

        // 绑定队列到交换机  第三个参数:表示的是路由key
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"two");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //这里就是接受消息的地方
                System.out.println("路由key是two的这个队列接受到数据:"+new String(body));
            }
        };

        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

5. topic模型:

相当于是对路由模式的一个升级,在匹配的规则上可以实现模糊匹配

相关代码:

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Producer {

    private static final String EXCHANGE_NAME = "topic-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);

        // 发送信息到交换机
        for (int i = 0; i < 100; i++) {
            if (i % 2 == 0) {
                channel.basicPublish(EXCHANGE_NAME, "one.one.one", null, ("路由模型的值:" + i).getBytes());
            }else {
                channel.basicPublish(EXCHANGE_NAME, "one.one", null, ("路由模型的值:" + i).getBytes());
            }
        }
        System.out.println("Sent over!");

        channel.close();
        connection.close();
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Consumer1 {

    private static final String QUEUE_NAME="topic-queue-01";
    private static final String EXCHANGE_NAME="topic-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        // 绑定队列到交换机  第三个参数:表示的是路由key
        // 注意  * :只是代表一个单词  # :这个才代表  一个或者多个单词
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one.*");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("路由key是one.*的这个队列接受到数据:"+new String(body));
            }
        };
        System.out.println("Waiting for messages.");
        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Consumer2 {

    private static final String QUEUE_NAME="topic-queue-02";
    private static final String EXCHANGE_NAME="topic-01";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        // 绑定队列到交换机  第三个参数:表示的是路由key
        // 注意  * :只是代表一个单词  # :这个才代表  一个或者多个单词
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one.#");
        System.out.println("Waiting for messages.");

        // 声明消费者
        Consumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("路由key是one.#的这个队列接受到数据:"+new String(body));
            }
        };
        //绑定消费者
        channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
    }
}

6. rpc模型:

相关代码:

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Server {

    private final static String QUEUE_NAME = "rpc-01";

    public static void main(String[] args) throws Exception {
        // 创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ所在主机ip或者主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 声明一个队列:客户端向服务器发送数据的队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 启动消费者,用来处理客户端发送到队列的消息
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 获取参数
                String message = new String(body);
                int n = Integer.parseInt(message);
                // 模拟服务端的一个功能
                String fib = handleInterface(n) + "";
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(properties.getCorrelationId())
                        .build();

                // 将结果返回会客户端
                // 注意从properties去获取客户端传送过来的信息,再返回回去
                channel.basicPublish("", properties.getReplyTo(), replyProps, fib.getBytes());
            }
        });
    }

    private static int handleInterface(int n) {
        if (n == 0) {
            return 0;
        }
        return n + 2;
    }
}



/**
 * 〈一句话功能简述〉<br>
 * 〈RPC模型〉
 *
 * @author hanxinghua
 * @create 2022/9/23
 * @since 1.0.0
 */
public class Client {

    private final static String QUEUE_NAME = "rpc-01";

    public static void main(String[] args) throws Exception {
        // 创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ所在主机ip或者主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 声明一个队列(换了一种方式),用于存储服务器返回到客户端的数据
        String replyQueueName = channel.queueDeclare().getQueue();

        // 使用UUID随机生成一个id
        final String correlationId = UUID.randomUUID().toString();

        // 客户端发送给服务器添加的额外属性
        AMQP.BasicProperties props = new AMQP.BasicProperties()
                .builder()
                .correlationId(correlationId)
                .replyTo(replyQueueName)
                .build();

        // 客户端将数据发送到发送队列
        channel.basicPublish("", QUEUE_NAME, props, "4".getBytes());

        // 启动消费者,用来客户端从相应队列获取到处理的结果
        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                // 通过correlationId去保证获取到的是正确的信息
                if (properties.getCorrelationId().equals(correlationId)) {
                    // 处理的结果输出
                    System.out.println("RPC返回结果:" + new String(body));
                }

                // 关闭通道,注意一定要等结果返回后再关闭,不然拿不到返回的数据
                try {
                    channel.close();
                    connection.close();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
经验分享 程序员 微信小程序 职场和发展