代码随记 SpringBoot MQ
1 基本知识
微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用,也可以叫同步通讯。但在很多场景下,我们可能需要采用异步通讯的方式
- 例如支付服务
- 1、扣减余额
- 2、更新交易流水
- 3、更新订单状态
- 如果切换到异步我们可以
- 1、扣减余额
- 2、更新交易流水和订单状态,同时执行
- 发送一条
消息到Broker,所有相关的微服务订阅消息通知,一旦消息到达了Broker那么所有订阅了的微服务同时处理业务。 综上,异步调用的优势包括:
- 发送一条
- 耦合度更低
- 性能更好
- 业务拓展性强
- 故障隔离,避免级联失败
mq:
image: rabbitmq:3.8-management
container_name: mq
hostname: mq
environment:
- RABBITMQ_DEFAULT_USER=itheima
- RABBITMQ_DEFAULT_PASS=123321
# volumes:
# - ./mq/plugins:/plugins
ports:
- "15672:15672"
- "5672:5672"
restart: unless-stopped- 分享几个概念
- publisher 消息发送者
- consumer 消息消费者
- queue 队列 存储消息
- exchange 交换机,负责路由信息
2 机制
默认情况下,MQ会依次将消息轮询分配给每一个消费者,不会考虑消息堆积的情况
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息3 交换机
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
3.1 Fanout
广播给所有队列,下面输出结果是两边队列都收到了
@Component
@Slf4j
public class SpringRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) throws InterruptedException {
System.out.println("fanout.queue1 ... 监听到了简单队列消息:" + message + ", " + LocalTime.now());
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) throws InterruptedException {
System.out.println("fanout.queue2 ... 监听到了简单队列消息:" + message + ", " + LocalTime.now());
}
}@SpringBootTest
class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testWorkQueue() throws Exception {
String queueName = "work.queue";
for (int i = 0; i < 50; i++) {
String message = "Hello, RabbitMQ! " + i;
rabbitTemplate.convertAndSend(queueName, message);
}
}
}3.2 Direct
给 Queue 绑定一个 Key,发生消息的 RoutingKey。
队列1绑定了 red blue,队列2绑定了 red yellow
@Test
public void testDirectQueue() throws Exception {
// 交换机名称
String exchangeName = "hmall.direct";
rabbitTemplate.convertAndSend(exchangeName, "red", "reddd");
rabbitTemplate.convertAndSend(exchangeName, "blue", "blueeee");
rabbitTemplate.convertAndSend(exchangeName, "yellow", "yellow");
}
// direct.queue1 ... 监听到了简单队列消息:reddd, 21:28:19.889317300
// direct.queue2 ... 监听到了简单队列消息:blueeee, 21:28:19.889317300
// direct.queue1 ... 监听到了简单队列消息:blueeee, 21:28:19.891458400
// direct.queue2 ... 监听到了简单队列消息:yellow, 21:28:19.8914584003.3 Topic
通配符规则:
#:匹配一个或多个词*:匹配不多不少恰好1个词 举例:item.#:能够匹配item.spu.insert或者item.spuitem.*:只能匹配item.sputopic.queue1:绑定的是china.#,凡是以china.开头的routing key都会被匹配到,包括:china.newschina.weather
topic.queue2:绑定的是#.news,凡是以.news结尾的routing key都会被匹配。包括:china.newsjapan.news
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "hmall.topic";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", "china.news");
rabbitTemplate.convertAndSend(exchangeName, "A.news", "A.news");
rabbitTemplate.convertAndSend(exchangeName, "china.B", "china.B");
}
// topic.queue2 ... 监听到了简单队列消息:china.news, 22:03:57.451952900
// topic.queue1 ... 监听到了简单队列消息:china.news, 22:03:57.451952900
// topic.queue1 ... 监听到了简单队列消息:china.B, 22:03:57.454631300
// topic.queue2 ... 监听到了简单队列消息:A.news, 22:03:57.4546313003.4 代码生成
利用代码生成交换机 队列 绑定关系
@Configuration
public class FanoutConfiguration {
@Bean
public FanoutExchange fanoutExchange() {
// return new FanoutExchange("hmall.fanout");
return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
}
@Bean
public Queue fanoutQueue1() {
// return new Queue("fanout.queue1");
return QueueBuilder.durable("fanout.queue1").build();
}
@Bean
public Queue fanoutQueue2() {
// return new Queue("fanout.queue1");
return QueueBuilder.durable("fanout.queue2").build();
}
@Bean
public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
}3.5 序列化
引入依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>@SpringBootApplication
public class PublisherApplication {
public static void main(String[] args) {
SpringApplication.run(PublisherApplication.class);
}
@Bean
public Jackson2JsonMessageConverter messageConverter() {
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
}@Test
public void testSendObject() {
Map<String, Object> msg = new HashMap<>();
msg.put("name", "itcast");
msg.put("age", 25);
// 发送消息
rabbitTemplate.convertAndSend("object.queue", msg);
}3.6 总结
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>先设置反序列化
@Configuration
public class MqConfig {
@Bean
public Jackson2JsonMessageConverter messageConverter() {
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
}创建一个队列:trade.pay.success.queue
创建一个交换机:pay.direct
然后交给代码去,绑定并监听。给交换机 pay.direct,绑定一个队列 trade.pay.success.queue,key=pay.success。交易成功后,更新订单数据。
@Component
@RequiredArgsConstructor
public class PayStatusListener {
private final IOrderService orderService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "trade.pay.success.queue", durable = "true"),
exchange = @Exchange(value = "pay.direct"),
key = "pay.success"
))
public void listenPaySuccess(long orderId) {
orderService.markOrderPaySuccess(orderId);
}
}下面这里,发送到交换机 pay.direct,key=pay.success。那么就由上面监听方法触发到了。
@Override
@Transactional
public void tryPayOrderByBalance(PayOrderFormDTO payOrderFormDTO) {
// 修改订单状态
try {
rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());
} catch (Exception e) {
log.error("发送支付成功消息异常, id: {}", po.getBizOrderNo(), e);
}
}最后:我们不需要太关心队列,只需要将
队列和交换机绑定了,那么之后是直接操作交换机和key。
4 高级
4.1 发送者重连
- 发送者重连
- 缺点是,当前线程会阻塞式,影响业务性能
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: hmall
password: 123
virtual-host: /hmall
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier max-attempts: 3 # 最大重试次数4.2 发送者确认
发送者确认
- 开启后,当发送者发送消息给MQ,MQ会返回确认结果给发送者
- 成功发送 ACK
- 失败发生 NACK
none:关闭confirm机制simple:同步阻塞等待MQ的回执correlated:MQ异步回调返回回执
路由失败(交换机到队列),才触发ReturnCallback机制,返回异常信息; 消息投递(发送者到交换机)成功,ConfirmCallback机制返回ack,投递失败返回nck
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制ReturnCallback
package com.itheima.publisher.config;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnsCallback(returned -> {
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
});
}
}ConfirmCallback
我们给 cd 添加了一个回调函数,将来成功和失败的时候就会触发
@Test
void testPublisherConfirm() throws InterruptedException {
// 1.创建CorrelationData
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
// 2.给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1.Future发生异常时的处理逻辑,基本不会触发
log.error("send message fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
if (result.isAck()) { // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
log.debug("发送消息成功,收到 ack!");
} else { // result.getReason(),String类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
});
// 3.发送消息
rabbitTemplate.convertAndSend("hmall.direct", "blue", "hello", cd);
// 我们是在单元测试里面,因此需要等待回调执行
Thread.sleep(2000);
}主要注意,我们是在单元测试里面,因此需要等待回调执行,并且开启日志
logging:
level:
root: DEBUG4.3 消费者确认
由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活auto:自动模式 (默认)。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:- 如果是业务异常,会自动返回
nack; - 如果是消息处理或校验异常,自动返回
reject;
- 如果是业务异常,会自动返回
有一个比较有意思的点是,auto默认模式下,如果是 RuntimeException 运行异常他会一直在队列中重试 nack,如果抛出的是 MessageConversionException 这种消息处理或校验异常则会直接丢弃 reject。
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理- 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring会返回reject,消息会被丢弃
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false消息重试策略
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队- 好处是会等待,没那么频繁去重试。
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机- 一般会将消息传递给
error.direct交换机,由专门的人去处理
- 一般会将消息传递给
@Configuration
@RequiredArgsConstructor
public class ErrorMessageConfiguration {
@Bean
public DirectExchange errorMessageExchange() {
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue() {
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(
rabbitTemplate,
"error.direct",
"error"
);
}
}4.4 幂等性
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:
- 根据id删除数据
- 查询数据
- 新增数据 但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:
- 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
- 退款业务。重复退款对商家而言会有经济损失。
这里给出两种方案:
- 唯一消息ID(一般不推荐)
每一条消息都生成一个唯一的id,与消息一起投递给消费者。 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}那么这个时候就需要用 Message 接消息
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(Message message) {
log.info("监听到了简单队列消息:ID: {}", message.getMessageProperties().getMessageId());
log.info("监听到了简单队列消息:Body: {}", new String(message.getBody()));
// 监听到了简单队列消息:ID: 77cc89bc-8ee2-4e12-b9f8-b690c04f6aeb
// 监听到了简单队列消息:Body: "Hello, RabbitMQ!"
// throw new RuntimeException("模拟异常");
}- 业务状态判断 这个其实也很简单,就是判断下订单是否已经设置了
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "trade.pay.success.queue", durable = "true"),
exchange = @Exchange(value = "pay.direct"),
key = "pay.success"
))
public void listenPaySuccess(long orderId) {
// 1.查询订单
Order old = orderService.getById(orderId);
// 2.判断订单状态
if (old == null || old.getStatus() != 1) {
// 订单不存在或者订单状态不是1,放弃处理
return;
}
// 3.标记订单已支付
orderService.markOrderPaySuccess(orderId);
}4.5 延迟消息
消费者不会立即收到消息,而是会等待一段时间。
场景:
- 1、下单
交易服务订单未支付——商品服务扣减库存 - 2、等待用户付款,因此
交易服务发送延迟消息30分钟,30分钟以后再确认是否已支付
- 1、下单
插件
- https://github.com/rabbitmq/rabbitmq-delayed-message-exchange 他可以将普通交换机改造成支持延迟消息的交换机
声明延迟交换机
- 一定要指定交换机是
delayed
- 一定要指定交换机是
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}基于Bean方式
package com.itheima.consumer.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class DelayExchangeConfig {
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}
@Bean
public Queue delayedQueue(){
return new Queue("delay.queue");
}
@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}- 发送延迟消息:5秒后收到
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}