LOADING

加载过慢请开启缓存 浏览器默认开启

RabbitMQ笔记

RabbitMQ笔记

1.MQ概念

1.1 什么是MQ?

MQ(message queue)是消息队列

1.2

MQ具有三大功能:

  1. 流量削峰:如果没有MQ,高并发的时候,系统直接宕机,有了MQ之后,通过排队,虽然时间开销更多了,但是不会导致系统宕机

  2. 应用解耦:消息的发送方和接收方不需要彼此联系,所以不会相互影响,即解耦

  3. 异步处理:对于不需要等待返回结果的操作,如果是同步,则会等待,这段时间就会浪费,而通过异步处理,就可以利用这段时间做其他事,从而减少响应时间,处理完后,被调用者把数据发给MQ,MQ再发回给调用者。比如发送短信、邮件、赠送积分等就可以异步执行

    image-20230430184707808

image-20230430185647956

MQ的分类(主流):

  1. kafka吞吐量高,一般用在日志采集方面
  2. RocketMQ阿里的,能抗住阿里双十一的流量,电商,金融级,大公司
  3. RabbitMQ吞吐量比前两个低,并发性能高,中小公司主流

2.RabbitMQ

2.1 概念:消息中间件,接收、存储和转发消息数据

四大核心概念:

  1. 生产者:发信息
  2. 交换机:接收消息,再推送到队列中
  3. 队列:本质是大的消息缓冲区,很多生产者可以发消息到队列中,很多消费者可以尝试从队列中接收数据
  4. 消费者:接收消息

image-20230430194716630

2.2 核心部分:七大模式

  1. 简单模式,“hello world!”
  2. 工作模式,Work queues
  3. 发布订阅模式,Publish/Subscribe
  4. 路由模式,Routing
  5. 主题模式,Topics
  6. RPC, Request/reply pattern
  7. 发布确认模式,Publisher Confirms

image-20230430195441894

Broker:RabbitMQ Server就是Message Broker

Connection:TCP连接

Channel极大减少了操作系统建立TCP connection的开销,相当于数据库的连接池

Binding:Exchange和Queue之间的虚拟连接

Virtual host:出于多租户和安全因素设计,Broker可以有多个vhost,每个vost可以有多个交换机

2.3 Hello World

1生产者对1消费者

2.4 Work Queue

工作队列(又称任务队列),主要思想是:避免立即执行资源密集型任务,而是将耗时的大量任务分配给多个工作线程

image-20230502172407328

1.轮询分发消息

一个队列对应多个工作线程(消费者)

这些工作线程是轮着处理消息(轮询)

消息只会被处理一次

2.消息应答

如果工作线程在执行一个长任务,但是还没完成的时候,工作线程挂掉了,而且队列删掉了消息,那么这个消息就没被完成而且丢失了,这是我们不想要的。

为了保证其不被丢失,就要采用消息应答

  1. 自动应答(还是可能存在消息丢失,但是更高效)(autoAck默认为false):

    一旦消费者收到消息,就进行应答,如果后续出了问题,消息已经被删除了

  2. 手动应答(安全):

    手动进行应答

    1. Channel.basicAck(手动确认收到消息)

    2. Channel.basicNack(否定确认)

    3. Channel.basicReject(否定确认)

      Reject比Nack少一个

      参数multiple(前两个都有此参数)

      批量应答,可以减少网络拥堵,但是可能存在消息丢失,一般false

      true to reject all messages up to and including the supplied delivery tag;

      false to reject just the supplied delivery tag.

image-20230502175733276

  1. 消息自动重新入队

如果一个消费者死亡(它的通道被关闭,连接被关闭,或者TCP连接丢失)而没有发送ack, RabbitMQ将理解消息没有被完全处理,并将重新将其排队。如果同时有其他消费者在线,它将迅速将其重新交付给另一个消费者。这样就可以确保没有信息丢失,即使工作人员偶尔会死亡。

在消费者交付确认时强制执行超时(默认为30分钟)。这有助于检测从不确认交付的有bug的(卡住的)消费者。您可以根据需要增加这个超时时间

3.遇到的问题

  1. 先启动消费者报错NOT_FOUND - no queue 'XXX',因为在这个程序里是在生产者创建的queue,在消费者里面使用queue,所以应该先启动生产者。之所以之前测试的时候先启动消费者没报错(实际是启动过生产者了),因为autoDelete设置为了false,所以queue仍然存在,就没报错

  2. 关于autoDelete设置为false,queue已存在,此时我再设置为true,会报错received ‘true‘ but current is ‘false‘,其参数durable设置不匹配,再次创建queue的时候也会报错,

    解决方案一:channel.queueDelete(TASK_QUEUE_NAME);删了重创,也可登陆网站,图形化界面来删除

    解决方案二:改为和存在的queue的参数一致

4.RabbitMQ持久化

概念:为了保障当RabbitMQ服务停掉以后消息生产者发送过来的消息不丢失

​ 默认情况下,RabbitMQ崩溃后,会忽视队列和消息,为了不丢失消息,需要将队列和消息都标记为持久化

队列持久化创建队列channel.queueDeclare)时,把durable参数设置为true。这样关机,重启后队列不会丢失

消息持久化发布时持久化,[但仍可能丢失(正在存储时,还没存完,只是缓存了)],

​ 实现方法:把channel.basicPublish的props的参数设置为

MessageProperties.PERSISTENT_TEXT_PLAIN

不公平分发:能者多劳,处理速度快的分配多一些任务,使得处理快的不至于空闲。

​ 设置方法:消费者方的channel.basicQos(x); x为非零

​ 不设置,或者设置成0,就是公平分发(为0,则可以无限堆积消息,因此一定可以一个一个地发消息,所以公平)

​ 其实不公平分发,实质上也是轮询,一个一个发,但是当缓冲区的堆积消息数量超过了 预取值(prefetchCount),就会找处理完了的消费者进行处理,因此会产生能者多劳现象

​ channel.basicQos(1); //basicQos(int prefetchCount)

​ prefetchCount:处理消息最大的数量。举个例子,如果输入1,那如果接收一个消息,但是没有应 答,则客户端不会收到下一个消息,消息只会在队列中阻塞。如果输入3,那么可以最多有3个消息 不应答,如果到达了3个,则发送端发给这个接收方得消息只会在队列中,而接收方不会有接收到消 息的事件产生。总结说,就是在下一次发送应答消息前,客户端可以收到的消息最大数量。

2.5 发布确认 Publisher Confirms

目的是:

防止消息丢失

前提:

1.队列必须持久化2.消息必须持久化

​ 必须再加上3.发布确认

​ 三个都要满足,消息才绝对不会丢失,不然消息可能丢失

开启方法:

​ channel.confirmSelect();

2.5.1 发布确认原理

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上发布的消息都会有一个唯一 的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送确认给生产者(包括唯一ID),

​ 这就使得生产者知道消息正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在消息写入 磁盘后发出

​ confirm模式最大的好处在于它是异步的,一旦发布一条消息,生产者可以在等待返回确认的同时继续发送下 一条消息,当消息最终得到确认后,生产者便可通过回调来处理该确认消息,如果RabbitMQ因为自身内部错 误导致消息丢失,就会发送一条nack消息,生产者同样通过回调来处理nack消息

2.5.2 发布确认策略

​ 分为3种:单个批量异步确认发布

单个确认发布

同步的,发一条就会等待确认信息

​ 实现:每个消息发布后都调用channel.waitForConfirms()或者waitForConfirmsOrDie();

​ 区别:

​ 如果任何消息被删除,waitForConfirmsOrDie将抛出IOException。

​ waitForConfirms()有返回值boolean,waitForConfirmsOrDie没有

​ 理解:

​ 可以将waitForConfirmsOrDie看作是一个同步helper,它依赖于底层的异步通知。

​ 它只是进行了阻塞

​ 注意:信道没有设置成confirm模式,则会throws an IllegalStateException.

批量确认发布

​ 实现:每隔批量大小的消息发布后,再进行发布确认(channel.waitForConfirms();)

​ 优点:速度比单个确认发布快

​ 缺点:但是如果一个批次某一个或一些出了问题,不能知道具体是哪个消息

异步确认发布

异步,一直发消息,对于确认信息,只需要靠broker收到后再发回来,这时候再进行回调,

​ 这样就既速度快,也能确切知道出问题的消息是哪个

//消息确认成功回调
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
    System.out.println("确认的消息" + deliveryTag);
};
//消息确认失败回调
/**
* 参数1 消息的标记
* 参数2 是否批量确认
*/
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
    System.out.println("未确认的消息" + deliveryTag);
};
//监听器
channel.addConfirmListener(ackCallback,nackCallback);

///////////////////
//详细版:
/**
* 线程安全有序的一个哈希表 适用于高并发的情况下
* 1.轻松的将序号和消息进行关联
* 2.轻松批量删除 by 序号
* 3.支持高并发(多线程)
*/
ConcurrentSkipListMap<Long,String> confirms = new ConcurrentSkipListMap<>();

//消息确认成功回调
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            System.out.println("确认的消息" + deliveryTag);
            //根据tag删除确认成功的消息
            if(multiple){
                //因为是批量的,所以删除的不止一个
                //headMap,小于该tag的所有,但是当前这个也应该要删,所以要包括(<=),也就是第二个参数设置为true
                ConcurrentNavigableMap<Long, String> map =
                        confirms.headMap(deliveryTag,true);
                map.clear();
            }else{
                confirms.remove(deliveryTag);
            }
            System.out.println(confirms.size());
        };
        //消息确认失败回调
        /**
         * 参数1 消息的标记
         * 参数2 是否批量确认
         */
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            System.out.println("未确认的消息" + confirms.get(deliveryTag));
        };
        
        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            String msg = i + "";
            //存消息
            confirms.put(channel.getNextPublishSeqNo(),msg);
            //发布在存之后,不然删不完
            channel.basicPublish("",s,null,msg.getBytes());
        }

​ 如果不写监听器,只有channel.confirmSelect();

​ 也就是默认之开启发布确认,肯定是一直在发消息,(确没确认就不清楚了)???

image-20230503222451013

2.5.2 确认失败的消息再次发布

​ 从相应的回调中重新发布nack-ed消息可能很诱人,但这应该避免,因为确认回调在I/O线程中被调度,而信 道不应该在这里执行发布操作。更好的解决方案是在内存队列中对消息进行排队,该队列由发布线程轮询。

​ 像ConcurrentLinkedQueue这样的类可以很好地在确认回调和发布线程之间传输消息。

​ 实现???

2.6 交换机

2.6.1 Exchanges概念

​ 在工作队列模式中,每个任务都只交给一个消费者,因为是通过队列

​ 而将消息传达给多个消费者,这种模式叫做“发布/订阅”

​ 现在通过交换机,使用多个队列,每个队列还是保证消息只消费一次

image-20230504114458139

​ RabbitMQ消息传递模型的核心思想是:

生产者生产的消息不直接发送到队列,实际上,通常生产者甚至不知道 这些消息传递到了哪些队列

​ 相反,生产者只能将消息发到交换机,交换机,一方面接收生产者的消息,一方面把它们推入队列

​ 由交换机的类型,决定如何处理收到的消息

2.6.2 类型以及默认交换机

​ 直接(direct)、主题(topic)、标题(headers),扇出(fanout)(对应的就是发布/订阅)

​ 之前使用的是默认交换机(用的“”空串)

​ 消息能路由发到队列中,是由routingKey(bindingkey)绑定的key指定的,如果它存在

channel.basicPublish("",TASK_QUEUE_NAME,null,msg.getBytes(StandardCharsets.UTF_8));

image-20230504120301446

2.6.3 临时队列

​ 如果不是durable(持久化的),那么就是临时队列

​ 每当连接到Rabbit时,我们都需要一个全新的空队列,为此我们可创建一个随机名称的队列,或者服务器帮 我们选择一个随机队列名称。其次一旦我们断开了消费者连接,队列将被自动删除

​ 创建方式:

//服务器帮我们选择一个随机队列名称
String queueName = channel.queueDeclare().getQueue();
//或者之前那种(durable声明为false)
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);

image-20230504121350757

2.6.4 绑定

​ binding就是exchange 和 queue 的桥梁

​ 如下图中X与Q1、Q2进行了绑定

image-20230505102311074

发消息到交换机(exchange) --> 根据路由规则(Routing Key)--> 找到绑定的队列(queue)

通过routing key确定使用哪个队列

2.6.5 Fanout(发布/订阅模式)

​ 接收到的所有消息广播到它知道的所有队列

image-20230505103500745

//声明交换机(在生产者和消费者其中之一声明,或者都声明,都是一样的)
//默认不自动删除,不持久化
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//发消息,这里的第二个参数(routing key可以随便写,因为是广播,所以不需要它来区分队列)
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
//同理, 消费者的队列绑定交换机时的routing key也能随便写
channel.queueBind(queue,EXCHANGE_NAME,"");

如果只在生产者声明交换机,那么必须先启动生产者,保证存在该交换机,这样消费者的队列才能绑定交换机

2.6.6 Direct exchange(路由模式)

​ 与上面不同,这个是取决于routing key,而不是与它无关

​ 这个模式就是通过routing key来找到接收消息的部分队列

//这里选择DIRECT模式
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//这里的routing key就是用来区分,对该routing key声明的所有队列会收到这个消息
channel.basicPublish(EXCHANGE_NAME,"warning",null,msg.getBytes());

2.6.7 Topic(主题模式)

为什么使用topic:

​ Direct路由模式,在某些情况下具有局限性,比如我们想要对于两种routing key来区别部分队列

​ 而Direct中,只能写一个routing key,所以需要用到topic主题模式

使用方法:

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,false,false,null);

routing key举例:

    * "stock.usd.nyse"
    * "nyse.vmw"
    * "quick.orange.rabbit"

routing key的注意事项:

  • * 代替一个任意单词.
  • # 代替0个或多个单词.

image-20230506185452533

对于routing key

​ 如果是# 相当于fanout,广播,群发

​ 如果**没有#和*出现相当于direct**,针对一个routing key

2.7 死信队列

概念:死掉的消息,无法被消费的消息

应用场景:保证订单业务消息数据不丢失,要使用RabbitMQ的死信队列机制,

​ 当消息消费发生异常时,将消息投入死信队列,还有比如下单没有在指定时间支付时自动生效

目前有两种方式可以设置消息的TTL。

第一种是通过队列的属性设置,队列中的所有消息都有相同的过期时间。

第二种方法是对消息本身进行单独设置,每条消息的TTL可以不同。

如果两种方法同时设置,则TTL以两者之间较小的那个数值为准。

消息在队列中的生存时间一旦超过设置的TTL值时,就会变成“死信”(Dead Message),消费者将无法再收到该消息(不是绝对的)。

image-20230513175815357

死信的来源

  1. 消息TTL过期(time to live)
  2. 队列达到最大长度(无法再添加数据)
  3. 消息被拒绝(basic.reject或basic.nack)并且requeue = false
//不设置TTL,默认情况下,消息是不会过期的
//TTL
AMQP.BasicProperties properties =
    new AMQP.BasicProperties()
    .builder().expiration("10000").build();
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,msg.getBytes());
//最大长度
arguments.put("x-max-length",6);
//拒绝
//不能自动应答
//第二个参数requeue
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);

2.8 延迟队列

2.8.1 概念

延时后处理,存放需要在指定时间被处理元素的队列

​ 相当于死信队列里的消息TTL过期,因为对于死信队列的消费者而言,消息到他那里的延迟时间正好是TTL

使用场景

​ 订单未支付,十分钟后自动取消

与定时器相比

​ 适用于数据量较大,时效性较强

image-20230509163902992

//配置类代码
@Configuration
public class TtlQueueConfig {

    //普通交换机名称
    public static final String X_EXCHANGE = "X";
    //死信交换机名称
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //普通队列名称
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信队列名称
    public static final String DEAD_LETTER_QUEUE = "QD";

    //声明
    //x交换机
    @Bean
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }
    //Y交换机
    @Bean
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }
    //队列QA
    @Bean
    public Queue queueA(){
        return QueueBuilder
                .durable(QUEUE_A)
                .ttl(10_000)
                .deadLetterExchange(Y_DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey("YD")
                .build();
    }
    //队列QB
    @Bean
    public Queue queueB(){
        return QueueBuilder
                .durable(QUEUE_B)
                .ttl(40_000)
                .deadLetterExchange(Y_DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey("YD")
                .build();
    }
    //死信队列
    @Bean
    public Queue queueD(){
        return QueueBuilder
                .durable(DEAD_LETTER_QUEUE)
                .build();
    }
    //绑定
    @Bean
    public Binding queueABindingX(Queue queueA,
                                  DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    @Bean
    public Binding queueBBindingX(Queue queueB,
                                  DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    @Bean
    public Binding queueDBindingY(Queue queueD,
                                  DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }

}

2.8.2 改进1

​ 这样存在一个缺点,就是时间由队列设置,如果有一个新的需求时间,那么就需要再加一个队列

​ 解决此问题,可以再加一个队列,不设置队列的TTL时间,通过消息本身来设置

image-20230511183920433

//配置类
//队列QC
@Bean
public Queue queueC(){
    return QueueBuilder
        .durable(QUEUE_C)
        .deadLetterExchange(Y_DEAD_LETTER_EXCHANGE)
        .deadLetterRoutingKey("YD")
        .build();
}
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                              @Qualifier("xExchange") DirectExchange xExchange){
    return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
//生产者设置消息过期时间
@GetMapping("/sendExpirationMsg/{message}/{ttl}")
public void sendMsg(@PathVariable("message") String message,
                    @PathVariable("ttl") String ttl){
    log.info("当前时间:{},发送一条时长:{}毫秒的TTL信息给队列QC:{}", new Date(),ttl,message);
    rabbitTemplate.convertAndSend("X", "XC", "消息来自TTL的10秒队列:" + message,
                                  msg -> {
                                      msg.getMessageProperties().setExpiration(ttl);
                                      return msg;
                                  });
}

2.8.3 存在的问题

image-20230511190838629

第一条20秒过期,第二条(2秒过期)紧跟着第一条发出,按理说第二条应该是2秒后收到,

结果却是20秒后

原因

RabbitMQ只会检查第一个消息是否过期(队列会排队,先进先出),如果第一个延时很长,而第二个延时很短,那么第二个并不会优先执行

2.8.4 解决方法

插件:

image-20230511220856229

image-20230511220925806

image-20230511221003742

image-20230511220811596

//配置类
@Configuration
public class DelayedQueueConfig {

    //交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    //队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    //routing key
    public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";

    //插件的交换机类型
    public static final String DELAYED_EXCHANGE_TYPE = "x-delayed-message";

    //交换机
    //自定义交换机
    @Bean
    public CustomExchange delayedExchange(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type","direct");

        return new CustomExchange(DELAYED_EXCHANGE_NAME,DELAYED_EXCHANGE_TYPE,true,false,arguments);
    }

    //队列
    @Bean
    public Queue delayedQueue(){
        return QueueBuilder.nonDurable(DELAYED_QUEUE_NAME).build();
    }

    //绑定
    @Bean
    public Binding delayedQueueBingDelayedExchange(Queue delayedQueue,
                                                   CustomExchange delayedExchange){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }

}
//生产者
//基于插件的延迟消息
@GetMapping("/sendDelayedMsg/{message}/{delayedTime}")
public void sendMsg(@PathVariable("message") String message,
                    @PathVariable("delayedTime") Integer delayedTime){
    log.info("当前时间:{},发送一条时长:{}毫秒的延迟信息给延迟队列delayed.queue:{}", new Date(),delayedTime,message);
    rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
                                  DelayedQueueConfig.DELAYED_ROUTING_KEY,
                                  "消息来自插件延迟消息:" + message,
                                  msg -> {
                                      msg.getMessageProperties().setDelay(delayedTime);
                                      return msg;
                                  });
}
//消费者
@Slf4j
@Component
public class DelayedQueueConsumer {

    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayQueue(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到基于插件的延迟队列消息:{}", new Date(),msg);
    }

}

2.8.5 使用总结

​ 一般来说,如果需要延时处理,使用插件的方式来实现,而不是死信队列的TTL来实现

​ 因为TTL的方式可能存在错误的执行

​ 基于插件的方式,其延迟是基于交换机(延时后发送到队列),而不再是队列

​ 使用插件时,因为没有用到TTL,所以发消息时是setDelay(延迟时间)

​ 而不是原来的setExpiration(过期时间)

2.9 延迟队列总结

​ 用于延时处理,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性:

消息可靠发送消息可靠投递死信队列保障消息至少被消费一次

​ 以及未被正确处理的消息不会被丢弃。另外,通过RabbitMQ集群的特性,可以很好地解决单点故障问题

​ ,不会因为单个节点挂掉导致延时队列不可用或者消息丢失

​ 延时队列的其他选择:Java的DelayQueue,利用Redis的zset,利用Quartz或者利用Kafka的时间轮

2.10 发布确认(springboot版)

使用原因

交换机或者队列出问题,消息会丢失,因此需要确认

2.10.1 发布确认

image-20230513160701294

yml配置(必须)

spring:
  rabbitmq:
    # 确保生产者的消息,是否到达交换机
    publisher-confirm-type: correlated

这里的simple会使用同步方式,而且可能会关闭channel信道

image-20230513175011804

//生产者
@GetMapping("/sendMsg/{message}")
public void sendMessage(@PathVariable("message") String message){
    //这里参数是ID,为了后面ConfirmCallback的confirm方法的参数CorrelationData能取到ID
    CorrelationData correlationData1 = new CorrelationData("666");
    //这里交换机和routingKey的名称改成不存在的,可以模拟交换机或队列出问题
    rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME
                                  ,ConfirmConfig.CONFIRM_ROUTING_KEY
                                  ,message
                                  ,correlationData1);
    log.info("发送消息内容为:{}",message);
}
//回调
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;
    
    //该注解保证先完成注入rabbitTemplate,再初始化(该方法),否则rabbitTemplate为null
    @PostConstruct
    public void init(){
        //注入
        //为了把rabbitTemplate这个实例里面的Callback,替换成自己写的Callback,而不是用原来的
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     *交换机确认回调方法
     *生产者的消息,是否到达交换机
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //前面生产者设置了CorrelationData,发送过来,这里的才有
        String id = correlationData == null ? "" : correlationData.getId();
        if(ack){
            log.info("交换机收到ID为:{}的消息",id);
        }else{
            log.info("交换机没有收到ID为:{}的消息,原因是:{}",id,cause);
        }
    }

    //交换机的消息,未到达指定队列时
    //进行回退
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("消息:{},被交换机{}退回,退回的原因{},路由的key:{}",
                new String(returned.getMessage().getBody()),
                returned.getExchange(),
                returned.getReplyText(),
                returned.getRoutingKey());
    }
}

2.10.2 回退消息

yml配置(必须)

spring:
  rabbitmq:
    # 确保交换机的消息,没有被路由到队列时,不丢弃消息
    publisher-returns: true

回退消息的代码在上面的returnedMessage

image-20230513171336562

image-20230513171346149

2.10.3 备份交换机

image-20230513175921759

//交换机
//普通交换机要指明发送到,哪个备份交换机
@Bean
public DirectExchange confirmExchange(){
    return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
        //默认为true,可不写
        .durable(true)
        //指定备份交换机
        .alternate(BACKUP_EXCHANGE_NAME)
        .build();
}

注意:

    1. 需要删除之前创建的交换机
    1. **回退消息**(publisher-returns: true或mandatory)和**备份交换机**==同时使用==,**备份交换机优先级更高**

2.11 RabbitMQ其他知识点

2.11.1 幂等性

​ 就是消息重复消费的问题,

​ 可能由于返回ack时网络中断,MQ没收到确认信息,重发给其他消费者或者

​ 网络重连后再次发送给该消费者,但实际上该消费者已经消费了该消息,导致了重复消费

解决重复消费方法

​ 使用全局ID或者写一个唯一标识(如时间戳或者UUID或者订单消费MQ中的消息的ID或者按自己的规则生成一个全局唯一的ID)每次消费时先用该ID判断该消息是否已经消费过

image-20230513200009691

2.11.2 优先级队列

概念

​ 部分消息优先处理

应用

订单催付功能

实现

image-20230513212810845

image-20230513212827970

image-20230513212855220

但是一般优先级设置比较小,因为这样速度快一些

2.11.3 惰性队列

​ 保存在磁盘

image-20230514000401527

使用场景

消费者下线、宕机或者因为维护而关闭,导致的消息的大量堆积时采用惰性队列

​ 如果都默认积压在内存中,会占用巨大的内存,造成浪费

​ 而使用惰性队列,消息进入MQ时,存放在磁盘里,消费时从磁盘读取到内存,所以速度慢,一般情况不用

使用

image-20230514001351682

image-20230514001452103

2.12 RabbitMQ集群

  1. 搭建集群

    image-20230514004647957

  2. 镜像队列

image-20230514003634054

​ 默认一个队列只存在一个服务器上,所以需要备份

image-20230514003706813

镜像2份,1、2、3台服务器

假设是1的队列,3镜像,现在是1,3都有该队列

1宕机了,会导致2里面继续镜像备份3里面的队列,现在是3、2都有该队列

因此即使集群最后只剩下一台服务器,也能保存有消息

  1. 使用Haproxy+Keepalive实现高可用

生产者连接的只是一个MQ,不知道有集群,代码里面写死了ip,无法改变ip

所以要使用其他的方法

比如Haproxy或者nginx

  1. Federation Exchange

使用原因

​ 服务器在不同地区,为了降低延迟,不同地区的用户,访问离自己近的服务器,使延迟低

​ 但是当涉及到两个不同地区的服务器交互,就需要同步信息,使数据同步

镜像队列指的是集群内备份

联邦队列指的是集群之间远程数据同步

  1. Shovel

同步数据(源端 --> 目的端)

image-20230514145705049