RabbitMQ初探:死信队列、延迟队列、保证消息可靠性

在之前,已对RabbitMQ的一些基本知识做了一些了解,详见RabbitMq:6种工作模式通俗理解

本文主要介绍RabbitMQ的死信队列、延迟队列、保证消息可靠性等的方法和实现,本文在正式开始之前补充交换机类型知识。

交换机分类:

  • 扇形交换机 fanoutExchange
  • 直连交换机 directExchange
  • 主题交换机 topicExchange
  • 头交换机 headersExchange

死信队列(Dead-Letter-Exchange)

可以称为死信交换器,也有人称之为死信邮箱。

当消息在一个队列中变成死信 (deal message) 之后,它能被重新被发送到另一个交换机中(实际上也可以直接使用已存在的交换机作为死信交换机,如一个正常交换机,只要队列指定他为死信交换机),这个交换机就是 DLX ,绑定 DLX 的队列就称之为死信队列。

上述的死信是什么呢?——没有被消费者使用的信息
消息变成死信 般是由于以下几种情况:

  • 消息被拒绝 (BasicReject/BasicNack),井且设置 requeue 参数为 false,即被拒绝也不重新入队的消息;
  • 消息过期,当消息设置了过期时间后,消息过期就成为死信,当队列设置了过期时间后,消息在队列中存在的时间大于队列过期时间,变为死信;
  • 队列达到最大长度后,新来的消息进入不了队列,只能进入死信队列。

DLX 是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定。实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息发布到设置的 DLX ,进而被路由到另一个队列,即死信队列。

死信队列也是一个正常的队列,只不过他保存死信而已,我们仍然可以像正常的队列那样去消费里面的信息。

以下是他的简单结构图(来源于网络):
image.png

图中存在两个交换机,一个正常交换机,一个死信交换机,实际上我们也可以直接使用一个交换机Exchange,将死信队列直接绑定到Exchange即可,只需要正常队列去指定死信交换机为Exchange和死信队列即可。

例子:此文不给出具体的例子,将会在文后附一个说明性的通用例子,如需查看直接滑至文章底部。

延迟队列

延迟队列是死信队列的一种情况。

延迟队列存储的对象是对应的延迟消息,所谓"延迟消息"是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费延迟队列的使用场景有很多,比如(摘自《RabbitMQ实战指南》):

  • 在订单系统中, 个用户下单之后通常有 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延迟队列来处理这些订单了
  • 用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。

延迟队列示意图:
image.png

此时你应该看出来了,这就是死信队列,对不同延迟时间的消息存放在不同的正常队列中,消息过期后,然后放进不同的死信队列,只不过消费者只从死信队列中取消息而已。(建议:最好设置队列过期时间)

此时可能会有人产生疑问,根据死信队列的介绍,成为死信可以是到达了消息的过期时间,也可以是到达了队列的过期时间,那么我们为什么不直接设置消息的过期时间,然后不同过期时间的消息都放在一个一个队列中呢?到了时间,自动去到对应的死信队列?

上述是不行的,这是由于队列的性质决定,队列是先进先出结构,假设第一条消息过期时间为10s,第二条为5s,当到达10s后,取出第一条消息,放入10s的死信队列,此时才能取第二条消息,但是实际上第二条消息早就应该放进5s的死信队列了,而此时才刚放入,这显然是不符合要求的。

例子:此文不给出具体的例子,将会在文后附一个说明性的通用例子,如需查看直接滑至文章底部。

保障消息的可靠性

首先我们应该开启交换机、队列、消息的持久化,保证出现断电之类的情况后可以恢复。然后消息的投递大致可以分为以下3步,示意图如下:
image.png

3步分别是:

  • (1)消息生产者将消息投递到交换机,Producer ——> Exchange
  • (2)消息生产者将消息投递到交换机,Exchange ——> Queue
  • (3)消息生产者将消息投递到交换机,Queue ——> Consumer

(1)保证Producer ——> Exchange

保证该步骤,依次做如下操作
1.开启RabbitMQ的确认功能 publisher-confirm-type: correlated
2.实现RabbitTemplate.ConfirmCallback接口,该接口只有一个方法confirm,当Producer发送消息后,mq将会调用该方法,并注入参数值,当我们实现该方法后,将该接口实现类初始化到rabbitTemplate中(见文章末尾代码示例),该接口解释及简易实现如下:

/**
* 参数correlationData为相关数据,该数据在我们发送消息时设置,一般为设置一个唯一ID
* 参数b为消息成功标识,当为true表示成功发送到Exchange,false表示未成功,当未成功后,我们可以在该方法进行处理。
* 参数s表示原因,即当b为false时,s表示出现false的原因
*/
public void confirm(CorrelationData correlationData, boolean b, String s) {
    if (b){
       log.info("消息发送成功");
       return;
    }
    log.info("消息发送失败");
    log.info("消息的唯一标识:{}",correlationData);
    log.info("未成功原因:{}",s);
}

(2)Exchange ——> Queue

保证该步骤,依次做如下操作:
1.开启returns功能 publisher-returns: true
2.实现 RabbitTemplate.ReturnsCallback 接口,该接口只有一个方法 returnedMessage(ReturnedMessage returnedMessage) ,当Exchange向Queue投递消息后,如果出现错误(比如路由key写错找不到queue),调用该方法,并自动注入参数值,当我们实现该方法后,将该接口实现类初始化到rabbitTemplate中(见文章末尾代码示例),该接口解释及简易实现如下:

public void returnedMessage(ReturnedMessage returnedMessage) {
    log.info("消息主体: {}", 
    new String(returnedMessage.getMessage().getBody()));
    log.info("回复编码: {}", returnedMessage.getReplyCode());
    log.info("回复内容: {}", returnedMessage.getReplyText());
    log.info("交换器: {}", returnedMessage.getExchange());
    log.info("路由键: {}", returnedMessage.getRoutingKey());
}

(3)Queue ——> Consumer

默认情况下,当我们取出一条消息后,MQ自动删除队列中的这个消息。但是如果我们并没有成功消费消息,这时候MQ中也已经删除了消息,这显然是不符合需求的。
要保证Queue ——> Consumer,我们可以采用以下步骤:
1.开启消费者手动确认和重试机制:

listener:
      simple:
        #开启消费者手动确认
        acknowledge-mode: manual
        #消息被拒绝是否重新入队 true入队 false不入队
        default-requeue-rejected: true
        retry:
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # true无状态;false有状态。如果业务中包含事务,这里改为false
          stateless: true
          # 初始的失败等待时长为1000ms
          initial-interval: 1000
          # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          multiplier: 1

2.消费者手动确认,给出一个简单例子,并介绍几个API

    @RabbitListener(queues = "normalQueue")
    public void useMsg(Message message, Channel channel)  {
        String msg = new String(message.getBody());
        try{
	//确认消息,参数1:消息的交付标识 参数2:是否批量确认 true批量确认,false不批量
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            log.info("消费消息:{}",msg);
        }catch (Exception e){
            try {
	//拒绝消息:参数1:消息的交付标识 参数2:是否批量确认,true批量确认,false不批量 
	//参数3:是否重新入队 true重新入队false不入队
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                log.info("拒绝消息:{},重新入队",msg);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }

    }

简单例子

给出一个包含了上述一些情况的简易例子,结构如下:

image.png

1.配置文件

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: yanzi
    password: 123456
    virtual-host: yanziMq
    #确认消息的类型,确认消息已发送到交换机(Exchange),
#    这里支持两种类型:simple:同步等待confirm结果,直到超时;# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    publisher-confirm-type: correlated
    # 确认消息投放到队列,开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
    publisher-returns: true
    listener:
      simple:
        #开启消费者手动确认
        acknowledge-mode: manual
        #消息被拒绝是否重新入队 true入队 false不入队
        default-requeue-rejected: true
        retry:
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # true无状态;false有状态。如果业务中包含事务,这里改为false
          stateless: true
          # 初始的失败等待时长为1000ms
          initial-interval: 1000
          # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          multiplier: 1
    template:
      mandatory: true

2.配置MQ结构

@Configuration
public class Config {
    //创建交换机
    @Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange("direxchange").build();
    }
    //创建死信队列
    @Bean
    public Queue deadLetterQueue(){
        return QueueBuilder.durable("deadLetterQueue").build();
    }
    //死信队列绑定到交换价
    @Bean
    public Binding deadBind(Queue deadLetterQueue,DirectExchange directExchange){
        return BindingBuilder.bind(deadLetterQueue).to(directExchange).with("deadkey");
    }
    //创建正常队列,并指定该队列的死信交换机、死信路由key、过期时间
    @Bean
    public Queue normalQueue(){
        return QueueBuilder
                .durable("normalQueue")
                .withArgument("x-dead-letter-exchange","direxchange")
                .withArgument("x-dead-letter-routing-key","deadkey")
                .withArgument("x-message-ttl",20000)
                .build();
    }
    //正常队列绑定交换机
    @Bean
    public Binding normalBind(Queue normalQueue,DirectExchange directExchange){
        return BindingBuilder.bind(normalQueue).to(directExchange).with("normalkey");
    }
}

3.生产消息并实现ConfirmCallback、ReturnsCallback 接口
```Java
@Component
@Slf4j
public class Productor implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
    @Autowired
    RabbitTemplate rabbitTemplate;
    //将ConfirmCallback和ReturnsCallback初始化到 rabbitTemplate
    @PostConstruct
    public void rabbitmqInit(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }
    
    //发送消息
    public void sendMsg() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            Message message = new Message(("xiaoxi:" + i).getBytes());
            log.info("发送消息:"+i);
            rabbitTemplate.sendAndReceive("direxchange","normalkey",message);
        }

    }

    @Override
    //实现ConfirmCallback接口
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        if (b){
            log.info("消息发送成功");
            return;
        }
        log.info("消息发送失败");
        log.info("消息的唯一标识:{}",correlationData);
        log.info("未成功原因:{}",s);
    }

    //实现ReturnsCallback 接口
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("消息主体: {}", new String(returnedMessage.getMessage().getBody()));
        log.info("回复编码: {}", returnedMessage.getReplyCode());
        log.info("回复内容: {}", returnedMessage.getReplyText());
        log.info("交换器: {}", returnedMessage.getExchange());
        log.info("路由键: {}", returnedMessage.getRoutingKey());
    }
}

4.消费消息并手动确认或拒绝

@Component
@Slf4j
public class Consumer {
    RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "normalQueue")
    public void useMsg(Message message, Channel channel)  {
        String msg = new String(message.getBody());
        try{
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            log.info("消费消息:{}",msg);
        }catch (Exception e){
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                log.info("拒绝消息:{},重新入队",msg);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}