T O P

[资源分享]     小白快速入门RabbitMQ,搭建Springboot+RabbitMQ带你避坑

  • By - 楼主

  • 2019-12-20 17:09:03
  • 快速入门RabbitMQ

    	RabbitMQ是一种消息中间件,它接收和转发消息,可以理解为邮局。
    	只是RabbitMQ接收,处理,转发的是二进制的数据,邮局处理的一般为纸。
    

    基本概念

    Producer(生产者): 发送消息的程序
    Consumer(消费者):接收消息的程序
    Queue(队列):像邮局的信箱,在RabbitMQ内部,同一个消息流只能存在一个Queue中,队列只受主机内存,磁盘的大小限制。 生产者像Queue中发送消息,消费者从Queue中取出消息

    详细介绍

    	以熟悉的电商场景为例,如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?这就需要消息队列登场了。
    
    	消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。
    	
    	消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。
    	如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。
    
    	RabbitMQ就是这样一款我们苦苦追寻的消息队列。
    	RabbitMQ是一个开源的消息代理的队列服务器,用来通过普通协议在完全不同的应用之间共享数据。
    
    	RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。
    	Erlang语言在数据交互方面性能优秀,有着和原生Socket一样的延迟,这也是RabbitMQ高性能的原因所在。
    	可谓“人如其名”,RabbitMQ像兔子一样迅速。
    

    在安装使用RabbitMQ必须先有Erlang 的基础

    什么是Erlang

    Erlang(['ə:læŋ])是一种通用的面向并发的编程语言,它由瑞典电信设备制造商
    爱立信所辖的CS-Lab开发,目的是创造一种可以应对大规模并发活动的编程语言和运行环境。Erlang问世于1987年,经过十年的发展,于1998年发布开源版本。Erlang是运行于虚拟机的解释性语言,但是现在也包含有乌普萨拉大学高性能Erlang计划(HiPE)开发的本地代码编译器,自R11B-4版本开始,Erlang也开始支持脚本式解释器。

    在编程范型上,Erlang属于多重范型编程语言,涵盖函数式、并发式及分布式。顺序执行的Erlang是一个及早求值, 单次赋值和动态类型的函数式编程语言。Erlang是一个结构化,动态类型编程语言,内建并行计算支持。最初是由爱立信专门为通信应用设计的,比如控制交换机或者变换协议等,因此非常适 合于构建分布式,实时软并行计算系统。使用Erlang编写出的应用运行时通常由成千上万个轻量级进程组成,并通过消息传递相互通讯。进程间上下文切换对于Erlang来说仅仅 只是一两个环节,比起C程序的线程切换要高效得多得多了。使用Erlang来编写分布式应用要简单的多,因为它的分布式机制是透明的:对于程序来说并不知道自己是在分布式运行。Erlang运行时环境是一个虚拟机,有点像Java虚拟机,这样代码一经编译,同样可以随处运行。

    它的运行时系统甚至允许代码在不被中断 的情况下更新。另外如果需要更高效的话,字节代码也可以编译成本地代码运行。

    Erlang是具有多重范型的编程语言,具有很多特点,主要的特点有以下几个:

    1.函数式
    2.并发性
    3.分布式
    4.健壮性
    5.软实时
    6.热更新
    7.递增式代码加载
    8.动态类型
    9.解释型

    1. RabbitMQ基础概念

    通常我们谈到消息队列服务, 会有三个概念: 发消息者、消息队列、收消息者。RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和队列之间, 加入了交换器 (Exchange)。这样发消息者和消息队列就没有直接联系,转而变成发消息者把消息发给交换器,交换器根据调度策略再把消息转发给消息队列。
    消息生产者并没有直接将消息发送给消息队列,而是通过建立与Exchange的Channel,将消息发送给Exchange。Exchange根据路由规则,将消息转发给指定的消息队列。消息队列储存消息,等待消费者取出消息。消费者通过建立与消息队列相连的Channel,从消息队列中获取消息。

    在这里插入图片描述
    1.Channel(信道):多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道。
    2.Producer(消息的生产者):向消息队列发布消息的客户端应用程序。
    3.Consumer(消息的消费者):从消息队列取得消息的客户端应用程序。
    4.Message(消息):消息由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(消息优先权)、delivery-mode(是否持久性存储)等。
    5.Routing Key(路由键):消息头的一个属性,用于标记消息的路由规则,决定了交换机的转发路径。最大长度255 字节。
    6.Queue(消息队列):存储消息的一种数据结构,用来保存消息,直到消息发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将消息取走。需要注意,当多个消费者订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,每一条消息只能被一个订阅者接收。
    7.Exchange(交换器|路由器):提供Producer到Queue之间的匹配,接收生产者发送的消息并将这些消息按照路由规则转发到消息队列。交换器用于转发消息,它不会存储消息 ,如果没有 Queue绑定到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。交换器有四种消息调度策略(下面会介绍),分别是fanout, direct, topic, headers。
    8.Binding(绑定):用于建立Exchange和Queue之间的关联。一个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规则,所以可以将交换器理解成一个由Binding构成的路由表。
    6.Binding Key(绑定键):Exchange与Queue的绑定关系,用于匹配Routing Key。最大长度255 字节。
    7.Broker:RabbitMQ Server,服务器实体。

    第一步新建springboot项目,添加依赖

    <dependencies>
    	<dependency>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-amqp</artifactId>
    	</dependency>
    	<dependency>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-web</artifactId>
    	</dependency>
    	<dependency>
    		<groupId>org.projectlombok</groupId>
    		<artifactId>lombok</artifactId>
    	</dependency>
    
    	<dependency>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-test</artifactId>
    		<scope>test</scope>
    	</dependency>
    </dependencies>
    

    在 application.yml文件中配置rabbitmq相关内容

    spring:
     	rabbitmq:
    	    host: localhost
    	    port: 5672
    	    username: guest
    	    password: guest
    

    第二步具体编码实现

    1.配置队列

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    @Slf4j
    public class DelayRabbitConfig {
        /**
         * 延迟队列 TTL 名称
         */
        private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";
        /**
         * DLX,dead letter发送到的 exchange
         * 延时消息就是发送到该交换机的
         */
        public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";
        /**
         * routing key 名称
         * 具体消息发送在该 routingKey 的
         */
        public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";
    
        public static final String ORDER_QUEUE_NAME = "user.order.queue";
        public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";
        public static final String ORDER_ROUTING_KEY = "order";
    
        /**
         * 延迟队列配置
         * <p>
         * 1、params.put("x-message-ttl", 5 * 1000);
         * 第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先)
         * 2、rabbitTemplate.convertAndSend(book, message -> {
         * message.getMessageProperties().setExpiration(2 * 1000 + "");
         * return message;
         * });
         * 第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制
         **/
        @Bean
        public Queue delayOrderQueue() {
            Map<String, Object> params = new HashMap<>();
            // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
            params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);
            // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
            params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);
            return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);
        }
        /**
         * 需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
         * 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,
         * 不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
         * @return DirectExchange
         */
        @Bean
        public DirectExchange orderDelayExchange() {
            return new DirectExchange(ORDER_DELAY_EXCHANGE);
        }
        @Bean
        public Binding dlxBinding() {
            return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);
        }
    
        @Bean
        public Queue orderQueue() {
            return new Queue(ORDER_QUEUE_NAME, true);
        }
        /**
         * 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
         * 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
         **/
        @Bean
        public TopicExchange orderTopicExchange() {
            return new TopicExchange(ORDER_EXCHANGE_NAME);
        }
    
        @Bean
        public Binding orderBinding() {
            // TODO 如果要让延迟队列之间有关联,这里的 routingKey 和 绑定的交换机很关键
    	        return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);
        }
    
    }
    

    2.创建一个Order实体类
    package com.amqp.demo.entity;

    import lombok.Data;
    
    import java.io.Serializable;
    @Data
    public class Order implements Serializable {
        private static final long serialVersionUID = -2221214252163879885L;
    
        private String orderId; // 订单id
    
        private Integer orderStatus; // 订单状态 0:未支付,1:已支付,2:订单已取消
    
        private String orderName; // 订单名字
    }
    

    3.接收者
    package com.amqp.demo.config;

    import com.amqp.demo.entity.Order;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    
    
    import java.util.Date;
    
    /**
     * 接受者
     */
    @Component
    @Slf4j
    public class DelayReceiver {
        private static Logger log = LoggerFactory.getLogger(DelayReceiver.class);
    
        @RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME})
        public void orderDelayQueue(Order order, Message message, Channel channel) {
            log.info("###########################################");
            log.info("【orderDelayQueue 监听的消息】 - 【消费时间】 - [{}]- 【订单内容】 - [{}]",  new Date(), order.toString());
            if(order.getOrderStatus() == 0) {
                order.setOrderStatus(2);
                log.info("【该订单未支付,取消订单】" + order.toString());
            } else if(order.getOrderStatus() == 1) {
                log.info("【该订单已完成支付】");
            } else if(order.getOrderStatus() == 2) {
                log.info("【该订单已取消】");
            }
            log.info("###########################################");
        }
    }
    

    4.发送者
    package com.amqp.demo.config;

    import com.amqp.demo.entity.Order;
    import lombok.extern.slf4j.Slf4j;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * 发送者
     */
    @Component
    @Slf4j
    public class DelaySender {
        @Autowired
        private AmqpTemplate amqpTemplate;
        private static Logger log = LoggerFactory.getLogger(DelayReceiver.class);
        public void sendDelay(Order order) {
            log.info("【订单生成时间】" + new Date().toString() +"【1分钟后检查订单是否已经支付】" + order.toString() );
            this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> {
                // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间
                message.getMessageProperties().setExpiration(1 * 1000 * 60 + "");
                return message;
            });
        }
    

    }
    5.测试,访问http://localhost:8080/sendDelay,查看日志输出

    package com.amqp.demo.controller;
    
    import com.amqp.demo.entity.Order;
    import com.amqp.demo.config.DelaySender;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class TestController {
        @Autowired
        private DelaySender delaySender;
    
        @GetMapping("/sendDelay")
        public Object sendDelay() {
            Order order1 = new Order();
            order1.setOrderStatus(0);
            order1.setOrderId("123456");
            order1.setOrderName("小米6");
    
            Order order2 = new Order();
            order2.setOrderStatus(1);
            order2.setOrderId("456789");
            order2.setOrderName("小米8");
    
            delaySender.sendDelay(order1);
            delaySender.sendDelay(order2);
            return "ok";
        }
    }
    

    网页输出:ok:即可 查看日志

    本帖子中包含资源

    您需要 登录 才可以下载,没有帐号?立即注册