前端开发入门到精通的在线学习网站

网站首页 > 资源文章 正文

RabbitMQ的一些实践总结(kafka和rabbitmq的区别)

qiguaw 2024-09-26 16:36:41 资源文章 24 ℃ 0 评论

MQ使用场景

  1. 流程异步化
  2. 业务解耦
  3. 流量削峰填谷

RabbitMQ核心概念

先来看一眼rabbitMQ的架构图:

  1. broker:接受客户端连接,实现AMQP协议。
  2. connection:和具体broker建立网络连接。
  3. channel:逻辑概念,几乎所有操作都在channel中进行,channel是消息读写的通道,一个connection可以建立多个channel。
  4. message:应用程序和broker之间传递的数据,由properties和body组成。properties可以对消息进行修饰,比如消息的TTL,correlationId等高级特性;body是消息实体内容。
  5. Virtual host:虚拟主机,用于逻辑隔离,最上层消息的路由。一个Virtual host可以若干个Exchange和Queue,同一个Virtual host不能有同名的Exchange或Queue。
  6. Exchange:交换器,接受消息,根据路由键转发消息到绑定的队列上。
  7. binding:Exchange和Queue之间的绑定关系,告诉exchange把消息路由到哪个队列
  8. routing key:exchange结合routing key来确定如何路由一条消息。
  9. Queue:消息队列,用来存放消息的队列。

消息确认机制

结合上图中消息发送的链路, 我们可以看出一条消息发出去后可能在哪些环节出问题:

  1. Producer发出后由于网络故障Broker没有收到
  2. Producer发出后Broker宕机导致消息丢失
  3. Broker发送给Consumer后由于网络故障Consumer没有收到
  4. Broker发送给Consumer后Consumer宕机导致没有处理

RabbitMQ提供了发布者确认和消费者确认机制来解决这些问题,发布者确认是指Broker收到Producer的消息后,会给Producer发送一条确认消息,如果消息的durable属性为true, Broker会等消息成功持久化到磁盘后再发送publisher-confirm,发布者确认是异步的,不会影响发布的性能;消费者确认是指消费者收到消息后需要发送一条确认消息给
broker(可以开启自动确认模式,但可能丢消息),broker如果没有收到consumer的确认,会把消息重发.

生产者增加确认机制非常简单,channel开启confirm模式,然后增加监听, 如果使用的spring-rabbit框架, 把connection-factory的publisher-confirms配置为true,
并在RabbitTemplate配置一个confirm-callback的Listener:

  <rabbit:connection-factory id="rabbitConnectFactory"  publisher-confirms="true"/>
  <bean id="confirmCallback" class="com.xxx.MessageConfirmCallback"/>
  <rabbit:template id="rabbitTemplate"  connection-factory="rabbitConnectFactory" confirm-callback="confirmCallback"/>

  public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

    }
}

有一些场景, 我们需要确保消息被正确的路由到队列:

  1. 如果消息发送到交换器后找不到可路由的队列,这时候消息会被丢弃, publisher-confirm返回的ack为true,相当于消息石沉大海了.

这种情况需要发送的时候把mandatory设置true,并且设置一个return-callback,当RabbitMQ找不到可路由的队列时返回publish-return告诉你无法路由的原因,可以记录日志或者重试.
如果用的spring-rabbit框架,把rabbit:template的mandatory属性设置为true,并且配置return-callback,
只配置return-callback没有配置mandatory时callback不会生效

  <bean id="returnCallback" class="com.xxx.MessageReturnCallback">
    <property name="rabbitTemplate" ref="rabbitTemplate"/>
  </bean>

  <rabbit:template id="rabbitTemplate" mandatory="true"
                   connection-factory="rabbitConnectFactory" confirm-callback="confirmCallback"
                   return-callback="returnCallback" message-converter="messageConverter"/>

  public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}
  1. 消息被拒绝时进入死信队列

设想有这种场景, Consumer侧接收消息的Listener抛出了未捕获的异常, spring-rabbit的默认处理策略是返回Basic.Reject给broker,并且设置requeue=true,
消息会被重新入队列再次被消费,如果这种异常是无法恢复的(比如出现了NPE),消息会一直在Broker和消费者之间
无限投递,
经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部,消费者会立刻消费这条消息,业务处理再抛出异常,消息再重新入队,如此反复进行

这可能不是我们期望的逻辑,比较好的方式是把消息放入死信队列,
后面修复程序后再对死信队列中的消息进行消费.

消息变为死信的几种情况:

  1. 消息被拒绝(basic.reject/basic.nack)同时requeue=false(不重回队列)
  2. TTL过期
  3. 队列达到最大长度

死信队列如何使用,以spring-rabbit为例:
rabbit:listener-container 把requeue-rejected属性设置为false,表示消息拒绝时不再重新入队列, exchange配置x-dead-letter-exchange(在后台或者代码中给
死信交换器绑定好死信队列),

<rabbit:queue id="rabbit.queue.test" name="rabbit.queue.test">
  <rabbit:queue-arguments>
    <entry key="x-dead-letter-exchange" value="dlx_exchange"/>
  </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:listener-container acknowledge="auto" requeue-rejected="false">
</rabbit:listener-container> 

配置好之后如果消费者出现未捕获的异常,或者消费者手动抛出AmqpRejectAndDontRequeueException,消息会进入到死信队列

  1. 消息进死信队列前重试几次

考虑这样一种场景, 收到rabbitMQ消息后调用RPC接口请求数据超时了,可能只是网络抖动或者服务端突然响应慢了一下导致的, 这种情况下可以在消费端结合spring-retry
框架进行重试, 需要特别注意的是spring-retry重试是在当前接收线程处理的,重试的次数和总时长不应太长,否则如果重试一直失败会严重影响性能,spring-rabbit
和spring-retry结合起来也很方便,配置如下:

  <bean id="messageRecoverer" class="com.xxxx.rabbit.MessageRecover" />

  <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
      <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
        <property name="initialInterval" value="500" />
        <property name="maxInterval" value="3000" />
      </bean>
    </property>
    <property name="retryPolicy">
      <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
        <property name="maxAttempts" value="3" />
      </bean>
    </property>
  </bean>

  <bean id="retryInterceptorFactoryBean" class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="messageRecoverer" />
    <property name="retryOperations" ref="retryTemplate" />
  </bean>


  <bean id="someListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="prefetchCount" value="10" />
    <property name="defaultRequeueRejected" value="false" />
    <property name="acknowledgeMode" value="AUTO" />
    <property name="queues" ref="rabbit.queue.test" />
    <property name="messageListener" ref="commonMessageListener" />
    <property name="adviceChain">
      <array>
        <ref bean="retryInterceptorFactoryBean" />
      </array>
    </property>
  </bean>

//重试多少次都失败后执行的Recover逻辑, 把消息持久化到磁盘或数据库,报警,或者抛出AmqpRejectAndDontRequeueException进死信队列
public class MessageRecover implements MessageRecoverer {

  @Override
  public void recover(Message message, Throwable throwable) {
    //消息持久化逻辑
    //报警
    logger.error("xxx");
    throw new AmqpRejectAndDontRequeueException(throwable);
  }

高可用

为了防止丢消息,交换器,队列和消息都应该设置成持久化,具体配置就是把durable设置为true.

我们线上的RabbitMQ采用的是集群模式,集群模式下所有节点会存储交换器和绑定关系以及队列的名字, 但是队列只存储在某一个节点,如果某个节点出现故障, 该节点上的
队列将会不可用,公司6月4号出现过一次交换机故障导致RabbitMQ集群脑裂的场景, 就算网络恢复了脑裂的状态还是一直保持,需要重启集群的一个节点才能解决,由于我们用的
是虚拟IP去连的集群的一个节点,如果你请求的队列是在另一个节点上RabbitMQ会给你返回404 queue not exist,spring-rabbit遇到这个错误会重启Consumer线程
,然后重新尝试创建队列,由于队列的元数据已有了, 在脑裂期间是无法再创建该队列的,spring默认只重试3次,每次间隔5s,如果15s内集群未恢复,你的rabbit消费者就死掉了,
需要重启业务进程解决,针对这个问题可以把rabbit:listener-container的declaration-retries设置的大一些, 这样集群恢复的时候consumer会自动恢复无需重启.

为了应对操作系统重启,掉电导致未及时fsync刷盘的场景,可以采用镜像队列, 镜像队列可以在rabbitmq管理后台配置策略, 从而解决队列单点故障问题.
配置了镜像队列后, 集群会自动选举一个rabbit节点为Master节点,再往镜像队列发送数据时,Rabbit会把数据发给所有节点中的队列,从而保证高可用.

镜像队列的架构如下:

监控

Rabbit管理后台上的监控是基于connection,exchange,queue, 很多时候发现某个队列积压了,但并不知道队列是谁创建的,只能在群里@所有人, 其实RabbitMQ是
提供了关联应用的能力,创建队列时可以加上队列参数,如下所示:

  <rabbit:queue id="rabbit.queue.test" name="rabbit.queue.test">
    <rabbit:queue-arguments>
      <entry key="application" value="xxx-application"/>
    </rabbit:queue-arguments>
  </rabbit:queue>

RabbitMQ会为每个consumer分配一个ConsumerTag,客户端可以自己指定,如果没指定,RabbitMQ会自动创建一个随机的,有时候想知道某个队列是谁在消费,
spring-rabbit提供了consumer-tag-strategy这个属性来配置一个生成ConsumerTag的Bean:

  <bean id="appNameConsumerTagStrategy" class="com.ximalaya.flash.rabbit.AppNameConsumerTagStrategy"/>

  <rabbit:listener-container concurrency="10" prefetch="10" declaration-retries="2147483647" acknowledge="auto"
                             requeue-rejected="false" consumer-tag-strategy="appNameConsumerTagStrategy"
                             connection-factory="rabbitConnectFactory" message-converter="messageConverter"
                              >
    <rabbit:listener ref="commonMessageListener"
                     queue-names="rabbit.queue.test"/>
  </rabbit:listener-container>

    public class AppNameConsumerTagStrategy implements ConsumerTagStrategy {
      private AtomicInteger cnt = new AtomicInteger(0);
      @Override
      public String createConsumerTag(String queue) {
        return "xxx-consumer-" + cnt.getAndIncrement();
      }
    }

最后可以在rabbit后台看到队列的创建者和消费者

rabbitmq在资源不足时会给所有Publish Connection发送Connection.Blocked指令, 这时候所有的Producer都无法发送信息, 在极端情况下会导致大部分线程都
阻塞在Rabbit的发送, AMQP的connection提供了BlockListener和UnBlockListener,业务方可以实现对应的监听器, 在连接被Block住后把消息写入到磁盘或者数据库,等
恢复后再从磁盘或数据库中恢复重新发送,避免进程卡死的情况.

    cachingConnectionFactory.addConnectionListener(new ConnectionListener() {
      @Override
      public void onCreate(Connection connection) {
        if (connection instanceof ConnectionProxy) {
          Connection targetConnection = ((ConnectionProxy) connection).getTargetConnection();
          if (targetConnection instanceof SimpleConnection) {
            try {
              SimpleConnection simpleConnection = (SimpleConnection) targetConnection;
              Field field = SimpleConnection.class.getDeclaredField("delegate");
              field.setAccessible(true);
              com.rabbitmq.client.Connection originConnection =
                  (com.rabbitmq.client.Connection) field.get(simpleConnection);
              originConnection.addBlockedListener(new BlockedListener() {
                @Override
                public void handleBlocked(String reason) throws IOException {
                 //do something
                }

                @Override
                public void handleUnblocked() throws IOException {
                  //do something
                }
              });
            } catch (Exception e) {
            }
          }
        }
      }

      @Override
      public void onClose(Connection connection) {
      }
    });

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表