Spring Boot集成RocketMQ

引入相关依赖

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

添加RocketMQ的相关配置

rocketmq:
    consumer:
        group: springboot_consumer_group
        # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
        pull-batch-size: 10
    name-server: 127.0.0.1:9876
    producer:
        # 发送同一类消息的设置为同一个group,保证唯一
        group: springboot_producer_group
        # 发送消息超时时间,默认3000
        sendMessageTimeout: 10000
        # 发送消息失败重试次数,默认2
        retryTimesWhenSendFailed: 2
        # 异步消息重试此处,默认2
        retryTimesWhenSendAsyncFailed: 2
        # 消息最大长度,默认1024 * 1024 * 4(默认4M)
        maxMessageSize: 4096
        # 压缩消息阈值,默认4k(1024 * 4)
        compressMessageBodyThreshold: 4096
        # 是否在内部发送失败时重试另一个broker,默认false
        retryNextServer: false

模板工具类RocketMQTemplate发送消息

@Setter(onMethod_ = @Autowired)
private RocketMQTemplate rocketmqTemplate;
private String topic = "test-topic";
  
public void testSend() {
  Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();
  SendResult sendResult = rocketmqTemplate.send(topic, msg);
}

使用RocketMQListener消费消息

@Component
@RocketMQMessageListener(topic = "your_topic_name", consumerGroup = "your_consumer_group_name")
public class MyConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        // 处理消息的逻辑
        System.out.println("Received message: " + message);
    }
}

关闭RocketMQ的日志输出

1.在启动类中添加:

System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");

1.在spring配置文件中添加:

logging.level.RocketmqClient=error