RabbitMQ 消息可靠投递-消息落库

概述

本文源码 https://github.com/nixuechao/rabbit-reliable-delivery 可直接运行并且有详细的注释和readMe

RabbitMQ,保证消息不丢失的唯一方法是使用事务,使用事务会将吞吐量降低250倍,代价太大,所以引入了消费者确认模式机制:

ConfirmCallback 异步,消息确认回调,告知放松成功或失败.

ReturnCallback 异步,消息回退回调,告知消息没有可用队列,设置了强制标志 setMandatory(true) 才会有此回调

ConfirmCallback

1.对于可路由消息,当所有队列都接受消息时.对于路由到持久队列的持久性消息,这意味着持久化到磁盘.对于镜像队列,这意味着所有镜像都已接受该消息.
2.对于不可路由的消息, 当获取到对应消息的队列列表为空时就会返回(成功).
3.只有在负责队列的Erlang进程中发生内部错误时,才会传递失败(如连接断开).
4.从消息发送到收到确认,中间会有一定时间,特别是如果队列和消息都需要持久化的时候,延迟会更高,而且不保证收到确认的顺序与发送的顺序一样,所以我们不能发送一条,等待确认后再发送第二条.

ReturnCallback

  1. 在设置了强制标志后,只有消息不可路由时才会有这个回调
  2. 不可路由即发送时指定的交换机和路由Key不能找到对列
  3. ReturnCallback 总是在ReturnCallback 之前调用

上面的内容大部分是官方文档的翻译,总结一下就是 

1.确认为成功的消息不一定成功(有可能是无法路由的消息)
2.确认为失败的消息不一定失败(消息投递成功,在回调时连接断开,会收到失败的回调)

策略

成功的不一定成功,失败的不一定失败,这不就是不可靠么,网上找到了以下策略(消息落库方案)

  1. 收到 ConfirmCallback 的失败确认时保存对应消息, 定时任务每隔一段时间查找保存下来的失败消息进行重发. 收到ReturnCallback 时,保存对应消息进行人工处理

  2. 发送消息前,保存消息,收到成功确认时删除对应消息, 定时任务每隔一段时间查找保存下来消息进行重发. 收到ReturnCallback 时,保存对应消息进行人工处理

  3. 与2类似,只不过把成功的消息和失败的消息设置为不同的状态,然后定时补偿

..如果有好的方案留言交流啊QAQ

Coding

发现上面的这些策略基本都在这些地方搞事情

  • 消息发送前
  • 收到确认失败的回调
  • 收到确认成功的回调
  • 收到消息无法路由的回调
  • 定时任务查找需要重发的消息

在上面这些点上都加入一个方法,把这些方法构造成一个类,称之为一个策略, 然后自定义一个继承自 RabbitTemplate 的 ReliableRabbitTemplate,加入定义的策略,那么只要是通过ReliableRabbitTemplate发送的消息就都是可靠投递的了(建议配合本文源码食用)

定义策略:

new MessageProcess.Builder()
                .preSend(messageInfo -> LOGGER.debug("第一次发送前.."))

                .messageAck(messageInfo -> LOGGER.debug("消息投递成功.."))

                .messageNack(messageInfo -> LOGGER.debug("消息投递失败"))

                .pickResendMessages(() -> {
                    LOGGER.debug("定时任务: 查找需要重发的消息..");
                    return null;
                })

                .preResend(id -> LOGGER.debug("消息重发前.."))

                .messageRouteFail((messageProperties, messageBody, exchange, routingKey) ->
                        LOGGER.debug("路由失败的消息"))

                .build();
    }

只要在对应LOG的地方加入自己的逻辑就好了

下面是1,2策略的实现,想用哪个策略在哪个方法上@Bean就行

@Bean
public MessageProcess myStrategy1() {
    return new MessageProcess.Builder()
        //消息第一次发送前
        .preSend(messageInfo -> {

        })

        //消息投递成功(包含路由失败的)
        .messageAck(messageInfo -> {
            messageService.deleteIfExist(messageInfo.getId());
        })

        //消息投递失败
        .messageNack(messageInfo -> {
            //若消息不存在就保存
            messageService.saveIfNotExist(messageInfo);
        })

        //提供需要重发的消息
        .pickResendMessages(() -> {
            //查找需要重发的消息,满足
            // 1.重发次数小于3
            // 2.上一次发送时间为3秒钟之前的(confirmCallback是异步的)
            // 3.按上一次发送时间顺序排序(先发等待久的)
            return messageService.selectNeedResendMessage();
        })

        //消息重发前
        .preResend(id -> {
            //消息重发数+1,修改上一次发送时间为当前时间
            messageService.updateResendTimesAndLastSendTime(id);
        })

        //消息路由失败
        .messageRouteFail((messageProperties, messageBody, exchange, routingKey) -> {
            //保存路由失败的消息
            messageService.saveUnRoutableMessage(messageProperties, messageBody, exchange, routingKey);
        })

        .build();
}



public MessageProcess myStrategy2() {
    return new MessageProcess.Builder()
        //消息第一次发送前
        .preSend(messageInfo -> {
            //保存消息
            messageService.save(messageInfo);
        })

        //消息投递成功(包含路由失败的)
        .messageAck(messageInfo -> {
            //删除消息
            messageService.deleteById(messageInfo.getId());
        })

        //消息投递失败
        .messageNack(messageInfo -> {
            //添加失败原因
            messageService.addFailReason(messageInfo.getId(), messageInfo.getFailReason());
        })

        //提供需要重发的消息
        .pickResendMessages(() -> {
            //查找需要重发的消息,满足
            // 1.重发次数小于3
            // 2.上一次发送时间为3秒钟之前的(confirmCallback是异步的)
            // 3.按上一次发送时间顺序排序(先发等待久的)
            return messageService.selectNeedResendMessage();
        })

        //消息重发前
        .preResend(id -> {
            //消息重发数+1,修改上一次发送时间为当前时间
            messageService.updateResendTimesAndLastSendTime(id);
        })

        //消息路由失败
        .messageRouteFail((messageProperties, messageBody, exchange, routingKey) -> {
            //保存路由失败的消息
            messageService.saveUnRoutableMessage(messageProperties, messageBody, exchange, routingKey);
        })

        .build();
}

在1,2策略下30个线程,每个线程发送3000条消息,发送过程中手动断开连接好几次,最终都能100%收到消息(都有重复发送的问题)…断连时可以不用硬核拔网线,登上RabbitMQ Management在Connections里强制断开就行

转载请注明出处


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!