介绍 ( 版本>1.2.1 )
对rabbitmq进行二次封装并集成,主要特性包括:
- ✅ 消费者默认为手动ack机制,并提供了默认的ack处理逻辑
- ✅ 消费者支持消息重试机制,并可以配置重试次数
- ✅ 消息消费失败会自动放到死信队列(默认自动创建死信队列)
- ✅ 支持发送延迟消息(基于x-delayed-message插件)
依赖库
| 名称 | 描述 |
|---|---|
| spring-boot-starter-amqp |
快速开始
引入
xml
<dependency>
<groupId>io.github.zhanghongbin</groupId>
<artifactId>zebra-spring-boot-starter-rabbitmq</artifactId>
</dependency>在引入依赖后,会自动创建死信队列(默认队列名称为:default.dlx.queue),死信交换机(默认交换机名称为:default.dlx.exchange),并将死信队列绑定到死信交换机上
RabbitProducerClient 消息生产者客户端
RabbitProducerClient 受spring 容器管理,可直接注入使用。
java
@Autowired
private RabbitProducerClient rabbitProducerClient;主要提供以下方法:
| 方法名 | 描述 |
|---|---|
| public boolean createQueue(String queueName) | 创建一个持久化并带死信交换机的队列,如果队列名称存在则不创建 |
| public <T> void sendMessage(String exchangeName, String routingKey, T content) | 发送消息到指定的交换机和路由键,消息并持久化 |
| public <T> void sendMessage(String queueName, T content) | 发送消息到指定的队列,消息并持久化,队列不存在则创建并带死信交换机的队列 |
| public <T> void sendDelayedMessage(String queueName, T content, Integer expiration) | 发送延迟消息,消息并持久化,队列不存在则创建并带死信交换机的队列 |
| public RabbitTemplate getRabbitTemplate() | 获取rabbitmq 模板对象 |
| public RabbitAdmin getRabbitAdmin() | 获取rabbitmq 管理对象 |
AbstractRabbitListener 消息监听器
AbstractRabbitListener 是一个抽象类,用于定义消息消费者的基本行为。它提供了以下方法:
| 方法名 | 描述 |
|---|---|
| public abstract void onMessage(T message, Channel channel, Message amqpMessage) | 消息消费方法,由子类实现 |
| 使用示例: |
java
@Component
public class TestRabbitListener extends AbstractRabbitListener<TestMessage> {
public TestRabbitListener() {
super("test.queue");
}
@Override
public void onMessage(TestMessage message, Channel channel, Message amqpMessage) {
// 处理消息
}
}首先,需要在子类的构造方法中调用父类的构造方法,传入队列名称。如何队列不存在会自动创建。 然后需要实现onMessage方法,用于处理消息。如果方法抛出异常,且没有配置消息重试,消息会被放到死信队列中。 使用者无需设置ack机制,该模块提供默认的ack处理逻辑。
消费者重试配置
yml
zebra:
mq:
rabbit:
consumer:
max-retry-count: 3max-retry-count 配置需要大于0
