Skip to content

介绍 ( 版本>1.2.1 )

对rabbitmq进行二次封装并集成,主要特性包括:

  1. ✅ 消费者默认为手动ack机制,并提供了默认的ack处理逻辑
  2. ✅ 消费者支持消息重试机制,并可以配置重试次数
  3. ✅ 消息消费失败会自动放到死信队列(默认自动创建死信队列)
  4. ✅ 支持发送延迟消息(基于x-delayed-message插件)

依赖库

名称描述
spring-boot-starter-amqp

快速开始

引入

xml
<dependency>
	<groupId>io.github.zhanghongbin</groupId>
	<artifactId>zebra-spring-boot-starter-rabbitmq</artifactId>
</dependency>

在引入依赖后,会自动创建死信队列(默认队列名称为:default.dlx.queue),死信交换机(默认交换机名称为:default.dlx.exchange),并将死信队列绑定到死信交换机上

RabbitProducerClient 消息生产者客户端

RabbitProducerClient 受spring 容器管理,可直接注入使用。

java
@Autowired
private RabbitProducerClient rabbitProducerClient;

主要提供以下方法:

方法名描述
public boolean createQueue(String queueName)创建一个持久化并带死信交换机的队列,如果队列名称存在则不创建
public <T> void sendMessage(String exchangeName, String routingKey, T content)发送消息到指定的交换机和路由键,消息并持久化
public <T> void sendMessage(String queueName, T content)发送消息到指定的队列,消息并持久化,队列不存在则创建并带死信交换机的队列
public <T> void sendDelayedMessage(String queueName, T content, Integer expiration)发送延迟消息,消息并持久化,队列不存在则创建并带死信交换机的队列
public RabbitTemplate getRabbitTemplate()获取rabbitmq 模板对象
public RabbitAdmin getRabbitAdmin()获取rabbitmq 管理对象

AbstractRabbitListener 消息监听器

AbstractRabbitListener 是一个抽象类,用于定义消息消费者的基本行为。它提供了以下方法:

方法名描述
public abstract void onMessage(T message, Channel channel, Message amqpMessage)消息消费方法,由子类实现
使用示例:
java
@Component
public class TestRabbitListener extends AbstractRabbitListener<TestMessage> {
    
	public TestRabbitListener() {
		super("test.queue");
	}

	@Override
    public void onMessage(TestMessage message, Channel channel, Message amqpMessage) {
        // 处理消息
    }
}

首先,需要在子类的构造方法中调用父类的构造方法,传入队列名称。如何队列不存在会自动创建。 然后需要实现onMessage方法,用于处理消息。如果方法抛出异常,且没有配置消息重试,消息会被放到死信队列中。 使用者无需设置ack机制,该模块提供默认的ack处理逻辑。

消费者重试配置

yml
zebra:
  mq:
    rabbit:
      consumer:
        max-retry-count: 3

max-retry-count 配置需要大于0