目录

代码随记 SpringBoot MQ

微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用,也可以叫同步通讯。但在很多场景下,我们可能需要采用异步通讯的方式

  • 例如支付服务
    • 1、扣减余额
    • 2、更新交易流水
    • 3、更新订单状态
  • 如果切换到异步我们可以
    • 1、扣减余额
    • 2、更新交易流水和订单状态,同时执行
      • 发送一条 消息Broker,所有相关的微服务订阅 消息通知,一旦消息到达了 Broker 那么所有订阅了的微服务同时处理业务。 综上,异步调用的优势包括:
  • 耦合度更低
  • 性能更好
  • 业务拓展性强
  • 故障隔离,避免级联失败
  mq:
    image: rabbitmq:3.8-management
    container_name: mq
    hostname: mq
    environment:
      - RABBITMQ_DEFAULT_USER=itheima
      - RABBITMQ_DEFAULT_PASS=123321
    # volumes:
    #   - ./mq/plugins:/plugins
    ports:
      - "15672:15672"
      - "5672:5672"
    restart: unless-stopped
  • 分享几个概念
    • publisher 消息发送者
    • consumer 消息消费者
    • queue 队列 存储消息
    • exchange 交换机,负责路由信息

默认情况下,MQ会依次将消息轮询分配给每一个消费者,不会考虑消息堆积的情况

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

广播给所有队列,下面输出结果是两边队列都收到了

@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String message) throws InterruptedException {
        System.out.println("fanout.queue1 ... 监听到了简单队列消息:" + message + ", " + LocalTime.now());
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String message) throws InterruptedException {
        System.out.println("fanout.queue2 ... 监听到了简单队列消息:" + message + ", " + LocalTime.now());
    }

}
@SpringBootTest
class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testWorkQueue() throws Exception {
        String queueName = "work.queue";
        for (int i = 0; i < 50; i++) {
            String message = "Hello, RabbitMQ! " + i;
            rabbitTemplate.convertAndSend(queueName, message);
        }
    }
}

给 Queue 绑定一个 Key,发生消息的 RoutingKey。

队列1绑定了 red blue,队列2绑定了 red yellow

@Test
public void testDirectQueue() throws Exception {
    // 交换机名称
    String exchangeName = "hmall.direct";
    rabbitTemplate.convertAndSend(exchangeName, "red", "reddd");
    rabbitTemplate.convertAndSend(exchangeName, "blue", "blueeee");
    rabbitTemplate.convertAndSend(exchangeName, "yellow", "yellow");
}
// direct.queue1 ... 监听到了简单队列消息:reddd, 21:28:19.889317300
// direct.queue2 ... 监听到了简单队列消息:blueeee, 21:28:19.889317300

// direct.queue1 ... 监听到了简单队列消息:blueeee, 21:28:19.891458400
// direct.queue2 ... 监听到了简单队列消息yellow, 21:28:19.891458400

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词 举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu

  • item.*:只能匹配item.spu

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:

    • china.news
    • china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:

    • china.news
    • japan.news
@Test
public void testSendTopicExchange() {
    // 交换机名称
    String exchangeName = "hmall.topic";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", "china.news");
    rabbitTemplate.convertAndSend(exchangeName, "A.news", "A.news");
    rabbitTemplate.convertAndSend(exchangeName, "china.B", "china.B");
}

// topic.queue2 ... 监听到了简单队列消息:china.news, 22:03:57.451952900
// topic.queue1 ... 监听到了简单队列消息:china.news, 22:03:57.451952900

// topic.queue1 ... 监听到了简单队列消息:china.B, 22:03:57.454631300
// topic.queue2 ... 监听到了简单队列消息A.news, 22:03:57.454631300

利用代码生成交换机 队列 绑定关系

@Configuration
public class FanoutConfiguration {
    @Bean
    public FanoutExchange fanoutExchange() {
        // return new FanoutExchange("hmall.fanout");
        return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
    }

    @Bean
    public Queue fanoutQueue1() {
        // return new Queue("fanout.queue1");
        return QueueBuilder.durable("fanout.queue1").build();
    }

    @Bean
    public Queue fanoutQueue2() {
        // return new Queue("fanout.queue1");
        return QueueBuilder.durable("fanout.queue2").build();
    }

    @Bean
    public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
}

引入依赖

<dependency>  
    <groupId>com.fasterxml.jackson.dataformat</groupId>  
    <artifactId>jackson-dataformat-xml</artifactId>  
    <version>2.9.10</version>  
</dependency>
@SpringBootApplication  
public class PublisherApplication {  
    public static void main(String[] args) {  
        SpringApplication.run(PublisherApplication.class);  
    }  
  
    @Bean  
    public Jackson2JsonMessageConverter messageConverter() {  
        // 1.定义消息转换器  
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();  
        // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息  
        jackson2JsonMessageConverter.setCreateMessageIds(true);  
        return jackson2JsonMessageConverter;  
    }  
}
@Test  
public void testSendObject() {  
    Map<String, Object> msg = new HashMap<>();  
    msg.put("name", "itcast");  
    msg.put("age", 25);  
    // 发送消息  
    rabbitTemplate.convertAndSend("object.queue", msg);  
}
<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>

先设置反序列化

@Configuration
public class MqConfig {
    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        // 1.定义消息转换器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
        jackson2JsonMessageConverter.setCreateMessageIds(true);
        return jackson2JsonMessageConverter;
    }
}

创建一个队列:trade.pay.success.queue 创建一个交换机:pay.direct

然后交给代码去,绑定并监听。给交换机 pay.direct,绑定一个队列 trade.pay.success.queuekey=pay.success。交易成功后,更新订单数据。

@Component  
@RequiredArgsConstructor  
public class PayStatusListener {  
  
    private final IOrderService orderService;  
  
    @RabbitListener(bindings = @QueueBinding(  
            value = @Queue(value = "trade.pay.success.queue", durable = "true"),  
            exchange = @Exchange(value = "pay.direct"),  
            key = "pay.success"  
    ))  
    public void listenPaySuccess(long orderId) {  
        orderService.markOrderPaySuccess(orderId);  
    }  
}

下面这里,发送到交换机 pay.directkey=pay.success。那么就由上面监听方法触发到了。

@Override
@Transactional
public void tryPayOrderByBalance(PayOrderFormDTO payOrderFormDTO) {
    // 修改订单状态
    try {
        rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());
    } catch (Exception e) {
        log.error("发送支付成功消息异常, id: {}", po.getBizOrderNo(), e);
    }
}

最后:我们不需要太关心队列,只需要将 队列交换机 绑定了,那么之后是直接操作 交换机key

  • 发送者重连
    • 缺点是,当前线程会阻塞式,影响业务性能
spring:  
  rabbitmq:  
    host: 127.0.0.1  
    port: 5672  
    username: hmall  
    password: 123  
    virtual-host: /hmall  
    connection-timeout: 1s # 设置MQ的连接超时时间  
    template:  
      retry:  
        enabled: true # 开启超时重试机制  
        initial-interval: 1000ms # 失败后的初始等待时间  
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier        max-attempts: 3 # 最大重试次数
  • 发送者确认

    • 开启后,当发送者发送消息给MQ,MQ会返回确认结果给发送者
    • 成功发送 ACK
    • 失败发生 NACK
  • none:关闭confirm机制

  • simple:同步阻塞等待MQ的回执

  • correlated:MQ异步回调返回回执

路由失败(交换机到队列),才触发ReturnCallback机制,返回异常信息; 消息投递(发送者到交换机)成功,ConfirmCallback机制返回ack,投递失败返回nck

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

ReturnCallback

package com.itheima.publisher.config;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("触发return callback,");
            log.debug("exchange: {}", returned.getExchange());
            log.debug("routingKey: {}", returned.getRoutingKey());
            log.debug("message: {}", returned.getMessage());
            log.debug("replyCode: {}", returned.getReplyCode());
            log.debug("replyText: {}", returned.getReplyText());
        });
    }
}

ConfirmCallback

我们给 cd 添加了一个回调函数,将来成功和失败的时候就会触发

@Test
void testPublisherConfirm() throws InterruptedException {
    // 1.创建CorrelationData
    CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
    // 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            // 2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("send message fail", ex);
        }

        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
            if (result.isAck()) { // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");
            } else { // result.getReason(),String类型,返回nack时的异常描述
                log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
            }
        }
    });
    // 3.发送消息
    rabbitTemplate.convertAndSend("hmall.direct", "blue", "hello", cd);
    // 我们是在单元测试里面,因此需要等待回调执行
    Thread.sleep(2000);
}

主要注意,我们是在单元测试里面,因此需要等待回调执行,并且开启日志

logging:  
  level:  
    root: DEBUG

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活
  • auto:自动模式 (默认)。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或校验异常,自动返回reject;

有一个比较有意思的点是,auto默认模式下,如果是 RuntimeException 运行异常他会一直在队列中重试 nack,如果抛出的是 MessageConversionException 这种消息处理或校验异常则会直接丢弃 reject

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 不做处理
  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回reject,消息会被丢弃
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

消息重试策略

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
    • 好处是会等待,没那么频繁去重试。
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
    • 一般会将消息传递给 error.direct 交换机,由专门的人去处理
@Configuration
@RequiredArgsConstructor
public class ErrorMessageConfiguration {

    @Bean
    public DirectExchange errorMessageExchange() {
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue", true);
    }

    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(
                rabbitTemplate,
                "error.direct",
                "error"
        );
    }
}

在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

  • 根据id删除数据
  • 查询数据
  • 新增数据 但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:
  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

这里给出两种方案:

  • 唯一消息ID(一般不推荐)

每一条消息都生成一个唯一的id,与消息一起投递给消费者。 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

那么这个时候就需要用 Message 接消息

@RabbitListener(queues = "simple.queue")  
public void listenSimpleQueue(Message message) {  
    log.info("监听到了简单队列消息:ID: {}", message.getMessageProperties().getMessageId());  
    log.info("监听到了简单队列消息:Body: {}", new String(message.getBody()));  
    // 监听到了简单队列消息:ID: 77cc89bc-8ee2-4e12-b9f8-b690c04f6aeb  
    // 监听到了简单队列消息:Body: "Hello, RabbitMQ!"  
    // throw new RuntimeException("模拟异常");  
}
  • 业务状态判断 这个其实也很简单,就是判断下订单是否已经设置了
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "trade.pay.success.queue", durable = "true"),
        exchange = @Exchange(value = "pay.direct"),
        key = "pay.success"
))
public void listenPaySuccess(long orderId) {
    // 1.查询订单
    Order old = orderService.getById(orderId);
    // 2.判断订单状态
    if (old == null || old.getStatus() != 1) {
        // 订单不存在或者订单状态不是1,放弃处理
        return;
    }
    // 3.标记订单已支付
    orderService.markOrderPaySuccess(orderId);
}

消费者不会立即收到消息,而是会等待一段时间。

  • 场景:

    • 1、下单 交易服务 订单未支付——商品服务 扣减库存
    • 2、等待用户付款,因此 交易服务 发送延迟消息 30分钟,30分钟以后再确认是否已支付
  • 插件

  • 声明延迟交换机

    • 一定要指定交换机是 delayed
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于Bean方式

package com.itheima.consumer.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交换机类型和名称
                .delayed() // 设置delay的属性为true
                .durable(true) // 持久化
                .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}
  • 发送延迟消息:5秒后收到
@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}