第一部分:基础配置与简单示例
1. 项目初始化
使用Spring Boot创建一个项目,添加RocketMQ依赖。
-
POM依赖(Maven):
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>3.2.3</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.3.0</version> </dependency>
-
application.yml 配置:
rocketmq: name-server: localhost:9876 producer: group: default-producer-group consumer: group: default-consumer-group
2. 简单生产者与消费者
-
生产者:
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class SimpleProducerController { @Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/send") public String sendMessage() { rocketMQTemplate.convertAndSend("SimpleTopic", "Hello, RocketMQ with Spring Boot!"); return "Message sent!"; } }
-
消费者:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Service @RocketMQMessageListener(topic = "SimpleTopic", consumerGroup = "simple-consumer-group") public class SimpleConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("Received message: " + message); } }
-
启动类:
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RocketMQApplication { public static void main(String[] args) { SpringApplication.run(RocketMQApplication.class, args); } }
启动项目后,访问 http://localhost:8080/send
,消费者会打印消息。这是最基础的用法,面试中常被问到如何快速集成。
第二部分:真实项目应用场景
以下是RocketMQ在Spring Boot中的典型应用场景,涵盖面试常见问题。
1. 电商订单系统(异步消息)
场景:用户下单后,异步通知库存扣减和物流系统。
-
生产者(订单服务):
@RestController public class OrderController { @Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/place-order") public String placeOrder() { String orderJson = "{\"orderId\":\"12345\",\"item\":\"Laptop\",\"quantity\":1}"; // 异步发送消息 rocketMQTemplate.asyncSend("OrderTopic", orderJson, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("Order sent successfully: " + sendResult.getMsgId()); } @Override public void onException(Throwable throwable) { System.err.println("Order send failed: " + throwable.getMessage()); } }); return "Order placed!"; } }
-
消费者(库存服务):
@Service @RocketMQMessageListener(topic = "OrderTopic", consumerGroup = "inventory-consumer-group", selectorExpression = "Inventory") public class InventoryConsumer implements RocketMQListener<String> { @Override public void onMessage(String orderJson) { System.out.println("Processing inventory for: " + orderJson); // 假设这里调用库存扣减逻辑 } }
-
配置Tag(application.yml):
rocketmq: name-server: localhost:9876 producer: group: order-producer-group
面试问题:如何确保消息不丢失?
- 回答:使用异步发送时,结合
SendCallback
检查发送结果;在生产者端开启retries
(默认3次重试);Broker端开启持久化。
2. 事务消息(支付系统)
场景:用户支付后,确保订单状态更新和消息发送一致。
-
生产者(事务消息):
@RestController public class PaymentController { @Autowired private RocketMQTemplate rocketMQTemplate; @Autowired private OrderService orderService; @GetMapping("/pay") public String payOrder() { String orderId = "12345"; rocketMQTemplate.sendMessageInTransaction("TransactionTopic", MessageBuilder.withPayload("Payment for " + orderId).build(), orderId); return "Payment processed!"; } } @Service @RocketMQTransactionListener public class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Autowired private OrderService orderService; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { String orderId = (String) arg; try { orderService.updateOrderStatus(orderId, "PAID"); // 本地事务 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String orderId = new String(msg.getPayload()).split(" ")[2]; String status = orderService.getOrderStatus(orderId); return "PAID".equals(status) ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK; } } @Service public class OrderService { // 模拟数据库操作 private Map<String, String> orderStatus = new HashMap<>(); public void updateOrderStatus(String orderId, String status) { orderStatus.put(orderId, status); } public String getOrderStatus(String orderId) { return orderStatus.getOrDefault(orderId, "UNPAID"); } }
面试问题:事务消息的实现原理?
- 回答:分为两阶段提交。Producer先发送Half消息,执行本地事务后提交或回滚;Broker定时检查未决事务,调用
checkLocalTransaction
确认状态。
3. 日志收集(顺序消息)
场景:收集应用日志,确保按时间顺序处理。
-
生产者:
@RestController public class LogController { @Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/log") public String sendLog() { String log = "{\"timestamp\":\"2025-02-25 10:00:00\",\"message\":\"User login\"}"; rocketMQTemplate.syncSendOrderly("LogTopic", log, "user123"); // 使用userId作为hashKey保证顺序 return "Log sent!"; } }
-
消费者:
@Service @RocketMQMessageListener(topic = "LogTopic", consumerGroup = "log-consumer-group", messageModel = MessageModel.CLUSTERING) public class LogConsumer implements RocketMQListener<String> { @Override public void onMessage(String log) { System.out.println("Processing log: " + log); } }
面试问题:如何保证消息顺序?
- 回答:使用
syncSendOrderly
,通过hashKey(如用户ID)将消息路由到同一队列,消费者单线程消费该队列。
4. 延迟消息(促销提醒)
场景:订单未支付30分钟后发送提醒。
-
生产者:
@RestController public class ReminderController { @Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/remind") public String sendReminder() { String reminder = "Order 12345 unpaid"; rocketMQTemplate.syncSend("ReminderTopic", MessageBuilder.withPayload(reminder).build(), 1000, 18); // 18代表30分钟延迟 return "Reminder scheduled!"; } }
-
消费者:
@Service @RocketMQMessageListener(topic = "ReminderTopic", consumerGroup = "reminder-consumer-group") public class ReminderConsumer implements RocketMQListener<String> { @Override public void onMessage(String reminder) { System.out.println("Sending reminder: " + reminder); } }
面试问题:延迟消息的实现机制?
- 回答:RocketMQ内置18个延迟级别(1s到2h),消息先存储到延迟队列,到期后投递到目标队列。
第三部分:优化与高可用
1. 高可用性配置
-
多NameServer:
rocketmq: name-server: localhost:9876;localhost:9877
-
多消费者实例:ConsumerGroup内多实例负载均衡。
2. 性能优化
-
批量发送:
List<Message> messages = Arrays.asList( MessageBuilder.withPayload("msg1").build(), MessageBuilder.withPayload("msg2").build() ); rocketMQTemplate.syncSend("BatchTopic", messages);
-
调整线程池:
rocketmq: consumer: pull-batch-size: 32 consume-thread-max: 64
3. 异常处理
-
消费者重试:
@Service @RocketMQMessageListener(topic = "RetryTopic", consumerGroup = "retry-consumer-group") public class RetryConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { if (true) { // 模拟失败 throw new RuntimeException("Processing failed"); } } }
默认重试16次,可通过
maxReconsumeTimes
调整。
第四部分:面试常见问题与回答
-
RocketMQ与Kafka的区别?
- RocketMQ支持事务消息和延迟消息,Kafka不支持。
- RocketMQ拉模式和推模式都支持,Kafka主要拉模式。
- RocketMQ适合业务场景,Kafka更偏大数据处理。
-
如何处理消息重复消费?
- 在消费者端实现幂等性(如数据库唯一约束或Redis去重)。
-
如何监控RocketMQ?
- 使用RocketMQ Dashboard查看Topic、消费进度;集成Prometheus+Grafana监控性能。