# kafka **Repository Path**: gitSovereign/kafka ## Basic Information - **Project Name**: kafka - **Description**: kafka 场景实战 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2026-03-20 - **Last Updated**: 2026-03-20 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Spring Kafka 电商消息系统 基于 Spring Boot 3.2 + Spring Kafka 3.1 的电商场景消息系统,提供完整的订单消息生产与消费流程。 --- # 🆕 新人入门指南(必读) ## 一句话介绍这个项目 这是一个**电商订单消息系统**,用Kafka消息队列来处理订单相关的业务,比如: - 用户下单后通知仓库备货 - 支付成功后通知物流发货 - 订单状态变更时通知用户 就像一个**订单调度中心**,负责协调各个业务系统之间的消息传递。 --- ## 🗂️ 项目结构一览(新人必看) ``` src/main/java/com/ecommerce/kafka/ ├── EcommerceKafkaApplication.java # 🚀 项目入口(启动类) ├── config/ # ⚙️ 配置类(Kafka相关配置) │ ├── KafkaProducerConfig # 生产者配置(怎么发消息) │ ├── KafkaConsumerConfig # 消费者配置(怎么收消息) │ └── KafkaTopicConfig # Topic配置(消息主题) ├── producer/ # 📤 生产者(发送消息) │ └── OrderMessageProducer # 订单消息生产者 ├── consumer/ # 📥 消费者(接收消息) │ ├── OrderMessageConsumer # 订单消息消费者 │ ├── InventoryMessageConsumer # 库存消息消费者 │ ├── PaymentMessageConsumer # 支付消息消费者 │ └── ShippingMessageConsumer # 物流消息消费者 ├── service/ # 🏢 业务服务(处理业务逻辑) │ ├── OrderService # 订单服务 │ ├── InventoryService # 库存服务(面试题2) │ ├── PaymentService # 支付服务(面试题3) │ ├── ShippingService # 物流服务(面试题4) │ ├── CouponService # 优惠券服务(面试题5) │ ├── PointsService # 积分服务(面试题6) │ ├── NotificationService # 通知服务(面试题7) │ ├── ProductService # 商品服务 │ ├── UserBehaviorService # 用户行为分析(面试题13) │ ├── RealTimeStatisticsService # 实时统计(面试题15) │ ├── TransactionService # 分布式事务(面试题1) │ ├── DistributedLockService # 分布式锁(面试题16) │ ├── MessageRetryService # 消息重试(面试题20) │ └── MessageMonitorService # 消息监控(面试题18) ├── model/ # 📝 数据模型(消息格式) │ ├── OrderMessage # 订单消息 │ ├── InventoryMessage # 库存消息 │ ├── PaymentMessage # 支付消息 │ ├── ShippingMessage # 物流消息 │ ├── CouponMessage # 优惠券消息 │ ├── PointsMessage # 积分消息 │ ├── NotificationMessage # 通知消息 │ ├── ProductMessage # 商品消息 │ └── UserBehaviorLog # 用户行为日志 ├── dlq/ # 💀 死信队列(处理失败的消息) │ ├── DeadLetterQueueService # 死信队列服务 │ ├── DlqRetryService # 死信重试服务 │ └── DlqProgressManager # 重试进度管理 ├── util/ # 🛠️ 工具类 │ ├── IdempotentManager # 幂等性管理器 │ └── JsonUtils # JSON工具 └── exception/ # ⚠️ 异常处理 ├── OrderException # 订单异常 └── GlobalExceptionHandler # 全局异常处理器 ``` --- ## 🔑 核心概念(新人必须理解) ### 1. 什么是Kafka? Kafka就像一个**消息邮局**: - **Producer(生产者)**:寄信人,把消息寄出去 - **Consumer(消费者)**:收信人,从邮局取信 - **Topic(主题)**:就像信箱,一个Topic对应一类消息 - **Partition(分区)**:把一个大信箱分成多个小信箱,提高并发 ### 2. 什么是消息生产者? ```java // 就像寄信 kafkaTemplate.send("订单主题", "订单ID", 订单消息); ``` ### 3. 什么是消息消费者? ```java // 就像收信 @KafkaListener(topics = "订单主题") public void receive(订单消息) { // 处理订单 } ``` --- ## 🚀 如何启动项目 ### 1. 启动Kafka(需要先安装Docker) ```bash cd deploy docker-compose up -d ``` ### 2. 运行项目 ```bash mvn spring-boot:run ``` ### 3. 运行测试 ```bash mvn test ``` --- ## 🔄 消息流转流程(新人必看) ``` 用户下单 ↓ 【OrderMessageProducer】生成订单消息 ↓ 【Kafka Topic】消息队列 ↓ 【OrderMessageConsumer】消费订单消息 ↓ ├→ 【OrderService】处理订单业务 ├→ 【InventoryService】扣减库存 ├→ 【PaymentService】处理支付 └→ 【ShippingService】处理物流 ``` --- ## 📞 各模块调用关系(新人必看) ### 主流程:订单处理 ``` 入口: EcommerceKafkaApplication ↓ 配置: KafkaProducerConfig / KafkaConsumerConfig ↓ 生产者: OrderMessageProducer.sendOrder() ↓ 消息队列: Kafka (Topic: ecommerce-order) ↓ 消费者: OrderMessageConsumer.consume() ↓ 业务处理: OrderService / InventoryService / PaymentService ↓ 失败处理: DeadLetterQueueService (写入DLQ) ``` ### 分布式事务流程(面试题1) ``` TransactionService.executeOrderTransaction() ↓ kafkaTemplate.executeInTransaction() ← 开启事务 ├→ 发送订单消息 → order-topic ├→ 发送库存消息 → inventory-topic └→ 发送支付消息 → payment-topic ↓ processLocalBusiness() ← 本地业务处理 ``` --- ## 🎯 快速定位代码指南 | 我想找... | 去哪里找 | |-----------|---------| | 项目入口 | `EcommerceKafkaApplication.java` | | Kafka配置 | `config/KafkaProducerConfig.java`, `config/KafkaConsumerConfig.java` | | 怎么发消息 | `producer/OrderMessageProducer.java` | | 怎么收消息 | `consumer/OrderMessageConsumer.java` | | 订单业务逻辑 | `service/OrderServiceImpl.java` | | 库存扣减逻辑 | `service/InventoryService.java` (面试题2) | | 支付处理逻辑 | `service/PaymentService.java` (面试题3) | | 消息格式定义 | `model/OrderMessage.java` 等 | | 失败了怎么办 | `dlq/DeadLetterQueueService.java` | | 幂等性处理 | `util/IdempotentManager.java` | --- ## 📚 面试题对应的代码位置 | 面试题 | 核心代码位置 | |--------|-------------| | 1. Kafka事务 | `service/TransactionService.java` | | 2. 库存扣减 | `service/InventoryService.java` | | 3. 支付回调 | `service/PaymentService.java` | | 4. 物流通知 | `service/ShippingService.java` | | 5. 优惠券发放 | `service/CouponService.java` | | 6. 会员积分 | `service/PointsService.java` | | 7. 消息通知 | `service/NotificationService.java` | | 13. 用户行为分析 | `service/UserBehaviorService.java` | | 15. 实时统计 | `service/RealTimeStatisticsService.java` | | 16. 分布式锁 | `service/DistributedLockService.java` | | 18. 消息监控 | `service/MessageMonitorService.java` | | 20. 消息重试 | `service/MessageRetryService.java` | --- > **📑 知识点代码索引**:想要快速查找某个知识点对应的代码文件?请查看 [Kafka 知识点代码索引](docs/KAFKA_CODE_INDEX.md) --- ## ✅ 测试用例位置 | 测试内容 | 测试类位置 | |---------|-----------| | 订单服务测试 | `src/test/java/.../OrderServiceTest.java` | | 库存服务测试 | `src/test/java/.../InventoryServiceTest.java` | | 支付服务测试 | `src/test/java/.../PaymentServiceTest.java` | | 所有测试 | `mvn test` | --- # 下面是详细文档... ## 技术栈 | 组件 | 版本 | |------|------| | Java | 17+ | | Spring Boot | 3.2.0 | | Spring Kafka | 3.1.0 | | Apache Kafka | 3.6 | ## 核心功能 ### 1. 完整的生产者-消费者流程 - **生产者**:支持同步/异步/回调三种发送模式 - **消费者**:单条消费 + 手动提交offset - **Topic管理**:自动创建订单Topic和死信队列 ### 2. 全链路消息不丢失 ``` 生产端 消费端 ┌─────────┐ ┌─────────┐ │acks=all │ ──────────────────▶│手动提交 │ │无限重试 │ │offset │ └─────────┘ └─────────┘ ``` ### 3. 消息幂等性保证 - **生产端**:`enable.idempotence=true`,Broker自动去重 - **消费端**:`IdempotentManager`记录已处理消息ID ### 4. 高并发订单状态有序性 - 分区Key = 订单ID,同一订单消息进入同一分区 - 版本号乐观锁,防止乱序消费 - 状态机校验,非法状态转换直接拒绝 ### 5. 死信队列(DLQ) 消费失败的消息写入本地文件,便于后续人工处理或重试: ``` logs/dlq/ ├── dlq-2024-01-01.log ├── dlq-2024-01-02.log └── dlq-2024-01-03.log ``` **特点**: - 顺序追加写入,只添加不删除 - 文件锁保护,多线程安全 - 按天滚动,便于管理 - JSON格式,包含完整异常信息 ## 满足电商高并发场景的关键设计 ### 1. 消息可靠性(不丢失) | 配置项 | 值 | 作用 | |--------|-----|------| | `acks` | all | 所有ISR副本确认才算成功 | | `retries` | Integer.MAX_VALUE | 无限重试直到成功 | | `enable.idempotence` | true | 幂等生产者,防止重复 | | `enable.auto.commit` | false | 手动提交,处理成功才提交 | ### 2. 顺序性保证 ```java // 订单ID作为分区Key,保证同一订单消息有序 kafkaTemplate.send(topic, orderId, orderMessage); // 版本号校验,跳过过期消息 if (!idempotentManager.isVersionValid(orderId, version)) { acknowledgment.acknowledge(); // 跳过不处理 return; } ``` ### 3. 幂等性设计 ``` 消息ID = {orderId}_{version}_{timestamp} ┌─────────────────────────────────────────┐ │ IdempotentManager │ ├─────────────────────────────────────────┤ │ isProcessed(messageId) → 检查是否已处理 │ │ recordProcessing() → 记录处理结果 │ │ isVersionValid() → 校验版本连续性 │ └─────────────────────────────────────────┘ ``` ### 4. 并发处理能力 - 3个分区 × 3个消费者线程 = 9倍并发提升 - 同一订单的消息串行处理,不同订单并行处理 - ConcurrentHashMap保证线程安全 ## 项目结构 ``` src/main/java/com/ecommerce/kafka/ ├── config/ # Kafka配置 │ ├── KafkaTopicConfig # Topic配置 │ ├── KafkaProducerConfig # 生产者配置 │ └── KafkaConsumerConfig # 消费者配置 ├── consumer/ │ └── OrderMessageConsumer # 消息消费(幂等+顺序+DLQ) ├── dlq/ │ └── DeadLetterQueueService # 死信队列服务(文件追加写入) ├── exception/ │ ├── OrderException # 业务异常 │ └── GlobalExceptionHandler # 全局异常处理 ├── model/ │ ├── OrderMessage # 订单消息 │ ├── OrderStatus # 订单状态枚举 │ └── IdempotentRecord # 幂等性记录 ├── producer/ │ └── OrderMessageProducer # 消息生产(同步/异步/回调) ├── service/ │ ├── OrderService # 订单服务接口 │ └── OrderServiceImpl # 订单服务实现 └── util/ ├── JsonUtils # JSON工具类 └── IdempotentManager # 幂等性管理器 ``` ## 快速开始 ### 1. 启动Kafka集群 ```bash cd deploy docker-compose up -d ``` ### 2. 运行测试 ```bash mvn test ``` ### 3. 启动应用 ```bash mvn spring-boot:run ``` ## 可取之处 ### 1. 配置即文档 所有Kafka配置项都有详细注释,说明用途和注意事项: ```yaml # acks=all: Leader和所有ISR副本都确认后才认为发送成功,保证消息不丢失 acks: all ``` ### 2. 防御性编程 - 参数非空校验 - 状态转换合法性校验 - 版本号连续性校验 - 异常统一处理 ### 3. 可观测性 - 关键操作日志记录 - 消息发送/消费结果追踪 - 错误场景详细日志 ### 4. 测试覆盖完整 - 单元测试:Model、Util、Service、DLQ层 - 集成测试:完整Kafka生产消费流程 - 88个测试用例全部通过 ### 5. 遵循规范 - 阿里巴巴Java开发规范 - 使用commons-lang3/collections4/io工具包 - 完整的类/方法注释 ## 不足之处 ### 1. 幂等性存储 **现状**:基于内存的ConcurrentHashMap **问题**:单机有效,重启丢失 **改进**:生产环境应使用Redis或数据库 ```java // TODO: 改为Redis实现 @Service public class RedisIdempotentManager implements IdempotentManager { @Autowired private RedisTemplate redisTemplate; } ``` ### 2. 死信队列处理 **现状**:失败消息写入本地文件 **问题**:文件管理需人工介入 **改进**:添加DLQ消息重试机制和管理界面 ### 3. 分布式事务 **现状**:没有实现分布式事务 **问题**:消息发送和业务操作不在同一事务 **改进**:使用Kafka事务或本地消息表 ### 4. 监控告警 **现状**:只有日志 **问题**:无法实时监控和告警 **改进**:集成Prometheus + Grafana ### 5. 消费者组管理 **现状**:固定消费者组 **问题**:不支持动态扩展 **改进**:支持消费者组动态配置 ### 6. 消息追踪 **现状**:日志中记录关键信息 **问题**:无法端到端追踪 **改进**:集成分布式追踪(如SkyWalking) ## 订单状态流转 ``` ┌─────────┐ │ PENDING │ (待支付) └────┬────┘ │ ┌────────────┼────────────┐ ▼ │ ▼ ┌────────┐ │ ┌──────────┐ │ PAID │ │ │CANCELLED │ │(已支付)│ │ │ (已取消) │ └────┬───┘ │ └──────────┘ │ │ ▼ │ ┌─────────┐ │ │ SHIPPED │ │ │(已发货) │ │ └────┬────┘ │ │ │ ▼ │ ┌──────────┐ │ │ DELIVERED│ │ │ (已送达) │ │ └────┬─────┘ │ │ │ ▼ │ ┌──────────┐ │ │COMPLETED │ │ │ (已完成) │ │ └──────────┘ │ │ ``` ## 测试结果 ``` Tests run: 97, Failures: 0, Errors: 0, Skipped: 0 BUILD SUCCESS ``` | 测试类 | 测试数 | |--------|--------| | OrderStatusTest | 13 | | OrderMessageTest | 10 | | JsonUtilsTest | 17 | | IdempotentManagerTest | 15 | | OrderServiceTest | 16 | | DeadLetterQueueServiceTest | 9 | | DlqRetryServiceTest | 9 | | KafkaIntegrationTest | 8 | ## 死信队列重试机制 ### 文件结构 ``` logs/dlq/ ├── dlq-2024-01-01.log # 死信队列文件(按天滚动) ├── dlq-2024-01-02.log ├── progress/ # 进度文件目录 │ └── dlq-2024-01-01.log.progress.json └── exhausted/ # 终极失败文件目录 └── exhausted-2024-01-01.log ``` ### 重试配置 ```yaml app: kafka: dlq: retry: enabled: true # 启用定时重试 cron: "0 0/5 * * * ?" # 每5分钟执行一次 max-count: 3 # 最大重试次数 batch-size: 100 # 每次处理批量大小 ``` ### 进度记录格式 ```json { "fileName": "dlq-2024-01-01.log", "lastProcessedLine": 5, "totalLines": 10, "records": { "0": {"status": "SUCCESS", "retryCount": 1}, "1": {"status": "FAILED", "retryCount": 3}, "2": {"status": "PENDING", "retryCount": 0} } } ``` ## 作者 lvdaxianer --- # Kafka 场景面试题(20个) 本文档整理了20个Kafka常见面试场景,每个场景包含面试问题、详细解答和对应的代码实现。 ## 面试题1:如何保证消息的顺序性? ### 面试问题 Kafka中如何保证消息的顺序性?如果需要保证同一订单的消息按顺序处理,应该如何设计? ### 解答要点 1. **生产者端**:使用相同的Key发送到同一分区 2. **消费者端**:同一分区内消息按offset顺序消费 3. **订单场景**:使用订单ID作为Key,保证同一订单的消息进入同一分区 ### 代码实现 ```java /** * 使用订单ID作为分区Key,保证同一订单的消息有序 * * @param orderMessage 订单消息 * @return 发送结果 */ public SendResult sendInOrder(OrderMessage orderMessage) { // 订单ID作为Key,同一订单的消息会发送到同一分区 String orderId = orderMessage.getOrderId(); // KafkaTemplate.send(topic, key, value) - 相同key进入相同分区 CompletableFuture> future = kafkaTemplate.send(orderTopic, orderId, orderMessage); return future.get(); } /** * 消费者端:同一分区内消息顺序消费 * 由于同一订单消息在同一分区,消费者按offset顺序消费,自然保证顺序 */ @KafkaListener(topics = "${app.kafka.topic.order}", groupId = "order-group") public void consume(ConsumerRecord record) { // 同一订单的消息会按发送顺序被消费 log.info("消费消息: orderId={}, offset={}", record.key(), record.offset()); } ``` --- ## 面试题2:如何实现消息幂等性? ### 面试问题 什么是消息幂等性?如何防止消息重复消费导致的数据重复处理? ### 解答要点 1. **生产端幂等**:配置`enable.idempotence=true` 2. **消费端幂等**:使用唯一消息ID进行去重 3. **业务层面**:业务ID + 版本号双重校验 ### 代码实现 ```java /** * 幂等性管理器 - 消费端去重 */ @Component public class IdempotentManager { private final Map idempotentMap = new ConcurrentHashMap<>(); /** * 检查消息是否已处理 * * @param messageId 消息ID(唯一标识) * @return true表示已处理,false表示未处理 */ public boolean isProcessed(String messageId) { return idempotentMap.containsKey(messageId); } /** * 记录消息处理结果 * * @param messageId 消息ID * @param orderId 订单ID * @param version 版本号 */ public void recordProcessing(String messageId, String orderId, Long version) { IdempotentRecord record = new IdempotentRecord(); record.setMessageId(messageId); record.setOrderId(orderId); record.setVersion(version); record.setProcessTime(System.currentTimeMillis()); idempotentMap.put(messageId, record); } /** * 校验版本号 - 防止消息乱序 * * @param orderId 订单ID * @param version 消息版本号 * @return true表示版本有效,false表示是过期消息 */ public boolean isVersionValid(String orderId, Long version) { // 获取该订单最新处理的版本号 IdempotentRecord existing = idempotentMap.values().stream() .filter(r -> orderId.equals(r.getOrderId())) .max(Comparator.comparingLong(IdempotentRecord::getVersion)) .orElse(null); if (existing == null) { return true; // 首次处理,版本有效 } // 只有新版本才能处理 return version > existing.getVersion(); } } /** * 消费者使用幂等性检查 */ @KafkaListener(topics = "${app.kafka.topic.order}") public void consume(ConsumerRecord record) { String messageId = record.value().getMessageId(); // 1. 检查是否已处理 if (idempotentManager.isProcessed(messageId)) { log.warn("消息已处理,跳过: {}", messageId); return; } // 2. 处理业务逻辑 processOrder(record.value()); // 3. 记录处理结果 idempotentManager.recordProcessing(messageId, record.value().getOrderId(), record.value().getVersion()); } ``` --- ## 面试题3:如何保证消息不丢失? ### 面试问题 Kafka中消息丢失的场景有哪些?如何配置才能保证消息不丢失? ### 解答要点 1. **生产者配置**:acks=all, retries=Integer.MAX_VALUE, enable.idempotence=true 2. **Broker配置**:replication.factor >= 3, min.insync.replicas >= 2 3. **消费者配置**:enable.auto.commit=false, 手动提交offset ### 代码实现 ```java /** * Kafka生产者配置 - 保证消息不丢失 */ @Configuration public class KafkaProducerConfig { @Bean public ProducerFactory producerFactory() { Map props = new HashMap<>(); // 1. 服务器地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 2. 序列化器 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // 3. acks配置 - all表示所有ISR副本确认 // 只有Leader和所有ISR副本都确认,才认为发送成功 props.put(ProducerConfig.ACKS_CONFIG, "all"); // 4. 重试次数 - 无限重试直到成功 props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 5. 幂等性保证 - 防止消息重复 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 6. 发送超时 props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 7. 批次大小和等待时间 - 批量发送提高吞吐量 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 8. 缓冲区大小 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); return new DefaultKafkaProducerFactory<>(props); } } /** * Kafka消费者配置 - 手动提交offset */ @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory consumerFactory() { Map props = new HashMap<>(); // 1. 服务器地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 2. 消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group"); // 3. 序列化器 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); // 4. 关闭自动提交 - 手动提交确保消息处理后才提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 5. 消费者.poll()返回的最大记录数 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 6. 消费者心跳超时 props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); // 7. 消费者会话超时 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); return new DefaultKafkaConsumerFactory<>(props); } @Bean public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 手动确认模式 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); return factory; } } ``` --- ## 面试题4:如何处理消息重复消费? ### 面试问题 什么情况下会导致消息重复消费?如何处理? ### 解答要点 1. **重复原因**:消费者处理成功但未提交offset就崩溃;生产者重试导致重复 2. **解决方案**:消费端幂等性设计(见面试题2) 3. **最佳实践**:业务ID + 版本号双重去重 ### 版本号深度解析 **问题:版本号是什么?谁来维护?** 版本号是**生产者**在发送消息时维护的,用于保证**同一订单的消息顺序**。 ``` 订单状态变更流程: ┌─────────────────────────────────────────────────────────────────┐ │ 订单状态流转 + 版本号变化 │ │ │ │ 订单创建 ──v1──▶ 待支付 ──v2──▶ 已支付 ──v3──▶ 已发货 ──v4 │ │ │ │ 每条消息都携带版本号: │ │ - 消息1: orderId=ORDER-001, version=1, status=创建 │ │ - 消息2: orderId=ORDER-001, version=2, status=待支付 │ │ - 消息3: orderId=ORDER-001, version=3, status=已支付 │ │ - 消息4: orderId=ORDER-001, version=4, status=已发货 │ └─────────────────────────────────────────────────────────────────┘ ``` **为什么需要版本号?** | 场景 | 说明 | |------|------| | **消息ID去重** | 防止同一条消息重复消费(messageId唯一) | | **版本号校验** | 防止同一订单的消息乱序(版本号必须递增) | **版本号的作用:** ``` 场景:消费者收到了version=3的消息,但本地记录只有version=1 可能原因: 1. version=2的消息在传输过程中丢失了 2. version=2的消息还在处理中,暂时未到达 处理方式: - 拒绝处理version=3的消息(跳过) - 等待version=2的消息到来后再处理 - 防止业务数据不一致 ``` **版本号由谁维护?** ``` 生产者端维护: ┌─────────────────────────────────────────────────────────────────┐ │ OrderMessage message = new OrderMessage(); │ │ message.setOrderId("ORDER-001"); │ │ message.setVersion(1); // 生产者维护版本号 │ │ message.setStatus(OrderStatus.CREATED); │ │ │ │ // 状态变更时,版本号递增 │ │ message.setStatus(OrderStatus.PENDING); │ │ message.setVersion(2); // 每次状态变更,version+1 │ └─────────────────────────────────────────────────────────────────┘ 消息ID格式:{orderId}_{version}_{timestamp} 示例:ORDER-001_2_1700000000000 这样消费者可以: 1. 通过messageId判断是否重复(同一条消息) 2. 通过version判断消息顺序(同一订单的消息) ``` ### 代码实现 ```java /** * 消息重复消费处理策略 */ @Component public class DuplicateMessageHandler { private final IdempotentManager idempotentManager; /** * 处理消息(带幂等性保护) * * @param record Kafka记录 * @param ack 确认对象 */ public void handleMessage(ConsumerRecord record, Acknowledgment ack) { OrderMessage message = record.value(); // ============ 第一层:消息ID去重 ============ // messageId格式: {orderId}_{version}_{timestamp} // 作用:防止同一条消息重复消费 String messageId = message.getMessageId(); if (idempotentManager.isProcessed(messageId)) { log.info("消息已处理过(messageId去重): {}", messageId); ack.acknowledge(); return; } // ============ 第二层:版本号校验 ============ // version: 订单状态版本号 // 作用:防止同一订单的消息乱序 String orderId = message.getOrderId(); Integer version = message.getVersion(); // 检查版本号是否有效(必须递增) if (!idempotentManager.isVersionValid(orderId, version)) { log.warn("消息版本号无效,跳过: orderId={}, version={}", orderId, version); ack.acknowledge(); // 跳过无效版本的消息 return; } // ============ 第三层:业务处理 ============ try { processMessage(message); // 更新版本号 idempotentManager.updateVersion(orderId, version); // 记录处理结果(用于消息ID去重) idempotentManager.recordProcessing(messageId, orderId, version); // 手动提交offset ack.acknowledge(); } catch (Exception e) { log.error("消息处理失败: {}", messageId, e); // 不提交offset,消息会被重新投递 } } private void processMessage(OrderMessage message) { // 业务处理逻辑 } } ``` **双重去重总结:** | 防护层 | 检查内容 | 作用 | 维护者 | |--------|----------|------|--------| | 第一层 | messageId(消息ID) | 防止同一条消息重复消费 | 生产者生成 | | 第二层 | version(版本号) | 防止同一订单消息乱序 | 生产者维护 | | 第三层 | 业务逻辑 | 最终数据一致性保证 | 业务代码 | --- ## 面试题5:消费者组与分区分配策略 ### 面试问题 什么是消费者组?分区是如何分配给消费者的? ### 解答要点 1. **消费者组**:同一消费者组内的消费者共同消费主题消息 2. **分区分配**:一个分区只能被同一消费者组的一个消费者消费 3. **分配策略**:RangeAssignor、RoundRobinAssignor、StickyAssignor ### 代码实现 ```java /** * 消费者组配置示例 */ @Configuration public class ConsumerGroupConfig { /** * 消费者组1 - 处理订单消息 */ @Bean public KafkaListenerContainerFactory orderListenerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(orderConsumerFactory()); factory.setConcurrency(3); // 3个消费者线程 return factory; } /** * 消费者组2 - 处理通知消息(不同消费者组独立消费) */ @Bean public KafkaListenerContainerFactory notificationListenerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(notificationConsumerFactory()); factory.setConcurrency(3); return factory; } } /** * 不同消费者组监听同一主题 */ @Component public class OrderMessageConsumer { // 消费者组1 @KafkaListener(topics = "order-topic", groupId = "order-processor-group") public void consumeOrder(ConsumerRecord record) { log.info("订单处理组消费: {}", record.value()); } } @Component public class OrderAnalyticsConsumer { // 消费者组2 - 同一主题,独立消费 @KafkaListener(topics = "order-topic", groupId = "order-analytics-group") public void consumeForAnalytics(ConsumerRecord record) { log.info("订单分析组消费: {}", record.value()); } } /** * 自定义分区分配策略 */ @Component public class CustomPartitionAssignor implements PartitionAssignor { @Override public Subscription subscription(Set topics) { return new Subscription(new ArrayList<>(topics)); } @Override public Map> assign(List partitions, List subscriptions) { // 自定义分配逻辑:例如按用户ID哈希分配 Map> assignment = new HashMap<>(); for (Subscription sub : subscriptions) { List assigned = new ArrayList<>(); for (Metadata.TopicPartition tp : partitions) { // 自定义分配逻辑 if (shouldAssign(sub.userData(), tp)) { assigned.add(tp); } } assignment.put(sub.memberId(), assigned); } return assignment; } } ``` --- ## 面试题6:分区再均衡(Rebalance) ### 面试问题 什么是分区再均衡?什么情况下会触发再均衡?如何减少再均衡对业务的影响? ### 解答要点 1. **触发条件**:消费者加入/离开、订阅主题变化、分区数变化 2. **影响**:再均衡期间停止消费 3. **优化**:设置合理的session.timeout.ms、heartbeat.interval.ms ### 代码实现 ```java /** * 分区再均衡监听器 */ @Component public class RebalanceListener implements ConsumerRebalanceListener { private static final Logger log = LoggerFactory.getLogger(RebalanceListener.class); /** * 再均衡开始前的回调 * 保存当前处理进度,避免消息丢失 */ @Override public void onPartitionsRevokedBeforeCommitRebalance(Collection partitions) { log.info("分区即将被回收,准备提交offset: {}", partitions); // 保存处理进度到数据库 saveProgressToDatabase(); // 或者提交当前处理的offset // kafkaTemplate.commitOffsets(); } /** * 再均衡完成后的回调 */ @Override public void onPartitionsLost(Collection partitions) { log.warn("分区丢失(消费者崩溃): {}", partitions); } /** * 再均衡完成后,重新分配分区 */ @Override public void onPartitionsAssigned(Collection partitions) { log.info("分区重新分配完成: {}", partitions); // 可以从数据库恢复处理进度 restoreProgressFromDatabase(partitions); } private void saveProgressToDatabase() { // 保存处理进度 } private void restoreProgressFromDatabase(Collection partitions) { // 恢复处理进度 } } /** * 使用再均衡监听器的消费者 */ @KafkaListener(topics = "${app.kafka.topic.order}", groupId = "order-group") public void consumeWithRebalance(ConsumerRecord record, Acknowledgment ack, Consumer consumer) { // 设置再均衡监听器 consumer.subscribe(Arrays.asList("order-topic"), new RebalanceListener()); // 处理消息 processMessage(record.value()); ack.acknowledge(); } ``` --- ## 面试题7:死信队列(DLQ)设计 ### 面试问题 如何设计死信队列处理消费失败的消息? ### 解答要点 1. **失败消息处理**:重试次数超过阈值后进入死信队列 2. **死信队列用途**:人工处理、后续重试、数据分析 3. **实现方式**:本地文件或Kafka独立主题 ### 代码实现 ```java /** * 死信队列服务 - 文件实现 */ @Component public class DeadLetterQueueService { private final String dlqDirectory = "./logs/dlq"; /** * 写入死信队列 */ public void writeToDeadLetterQueue(OrderMessage message, Exception e, int partition, long offset, int retryCount) { DeadLetterRecord record = new DeadLetterRecord(); record.setMessageId(message.getMessageId()); record.setOrderId(message.getOrderId()); record.setExceptionMessage(e.getMessage()); record.setRetryCount(retryCount); record.setPartition(partition); record.setOffset(offset); record.setTimestamp(LocalDateTime.now()); // 写入文件 String fileName = String.format("dlq-%s.log", LocalDate.now().format(DateTimeFormatter.ISO_LOCAL_DATE)); Path filePath = Paths.get(dlqDirectory, fileName); try { Files.createDirectories(filePath.getParent()); String json = new ObjectMapper().writeValueAsString(record); Files.write(filePath, (json + "\n").getBytes(), StandardOpenOption.CREATE, StandardOpenOption.APPEND); } catch (IOException ex) { log.error("写入DLQ失败", ex); } } } /** * 死信队列消费者 */ @Component public class DeadLetterConsumer { @KafkaListener(topics = "${app.kafka.dlq.topic}", groupId = "dlq-consumer-group") public void consume(ConsumerRecord record) { try { // 解析死信消息 DeadLetterRecord dlqRecord = new ObjectMapper() .readValue(record.value(), DeadLetterRecord.class); log.error("处理死信消息: orderId={}, error={}", dlqRecord.getOrderId(), dlqRecord.getExceptionMessage()); // 处理策略: // 1. 人工处理 - 记录告警 // 2. 修复后重试 - 重新发送到原主题 // 3. 跳过 - 直接丢弃 } catch (Exception e) { log.error("处理死信消息失败", e); } } } ``` --- ## 面试题8:消息积压处理 ### 面试问题 如果消息出现积压(积压了上百万条消息),应该如何处理? ### 解答要点 1. **临时扩容**:增加消费者实例 2. **跳过积压**:跳过历史消息,从最新开始消费 3. **清理积压**:清理不重要消息 4. **限流**:生产者端限流 ### 代码实现 ```java /** * 消息积压处理策略 */ @Component public class MessageBacklogHandler { /** * 策略1:跳到最新位置消费 * 适用于历史消息不需要处理的场景 */ public void skipToLatest(Consumer consumer, String topic) { // 跳过所有积压消息,从最新开始消费 consumer.seekToEnd(consumer.assignment()); } /** * 策略2:跳到指定时间点 * 适用于只需要处理某时间点之后的消息 */ public void skipToTimestamp(Consumer consumer, String topic, long timestamp) { Map timestampsToSearch = new HashMap<>(); for (TopicPartition partition : consumer.assignment()) { timestampsToSearch.put(partition, timestamp); } Map offsets = consumer.offsetsForTimes(timestampsToSearch); for (Map.Entry entry : offsets.entrySet()) { if (entry.getValue() != null) { consumer.seek(entry.getKey(), entry.getValue().offset()); } } } /** * 策略3:动态调整消费者并发 * 根据积压量动态调整处理能力 */ @Component public static class DynamicConsumerScaler { @Autowired private KafkaListenerEndpointRegistry registry; public void scaleConsumer(String listenerId, int targetConcurrency) { MessageListenerContainer container = registry.getListenerContainer(listenerId); if (container instanceof ConcurrentMessageListenerContainer) { ((ConcurrentMessageListenerContainer) container) .setConcurrency(targetConcurrency); } } } /** * 监控积压量并自动扩容 */ @Scheduled(fixedDelay = 60000) public void monitorAndScale() { // 获取积压量 Map> lag = getConsumerLag(); for (Map.Entry> topicEntry : lag.entrySet()) { long totalLag = topicEntry.getValue().values().stream() .mapToLong(Long::longValue).sum(); // 积压超过100万条,自动扩容 if (totalLag > 1_000_000) { log.warn("消息积压严重: topic={}, lag={}", topicEntry.getKey(), totalLag); // 触发告警 sendAlert(topicEntry.getKey(), totalLag); } } } private Map> getConsumerLag() { // 通过AdminClient获取消费者积压量 return new HashMap<>(); } private void sendAlert(String topic, long lag) { // 发送告警通知 } } ``` --- ## 面试题9:高吞吐量设计 ### 面试问题 如何设计Kafka系统以支持高吞吐量(每秒百万级消息)? ### 解答要点 1. **生产者**:批量发送、压缩、同步改异步 2. **Broker**:增加分区数、提高副本因子 3. **消费者**:增加消费者数量、提高并发 ### 代码实现 ```java /** * 高吞吐量生产者配置 */ @Configuration public class HighThroughputProducerConfig { @Bean public ProducerFactory highThroughputProducerFactory() { Map props = new HashMap<>(); // 1. 批量发送配置 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KB props.put(ProducerConfig.LINGER_MS_CONFIG, 20); // 等待20ms批量发送 // 2. 压缩配置 - 推荐使用LZ4或ZSTD props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // 3. 缓冲区大小 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB // 4. 并发请求数 props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // 5. 顺序保证(高吞吐场景可关闭) // props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); return new DefaultKafkaProducerFactory<>(props); } } /** * 批量消息生产者 */ @Component public class BatchMessageProducer { private final KafkaTemplate kafkaTemplate; private final List> buffer = new ArrayList<>(); /** * 异步批量发送 */ public void sendBatch(List messages) { List>> futures = new ArrayList<>(); for (OrderMessage message : messages) { ProducerRecord record = new ProducerRecord<>( "order-topic", message.getOrderId(), message); futures.add(kafkaTemplate.send(record)); } // 等待所有消息发送完成 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .whenComplete((result, ex) -> { if (ex != null) { log.error("批量发送失败", ex); } else { log.info("批量发送成功: count={}", messages.size()); } }); } /** * 使用ProducerInterceptor批量发送 */ @Component public static class BatchProducerInterceptor implements ProducerInterceptor { private static final int BATCH_SIZE = 100; private final List> batch = new ArrayList<>(); @Override public ProducerRecord onSend(ProducerRecord record) { batch.add(record); if (batch.size() >= BATCH_SIZE) { // 触发批量发送 flushBatch(); } return record; } private void flushBatch() { // 批量发送到Kafka batch.clear(); } } } /** * 高并发消费者配置 */ @Configuration public class HighThroughputConsumerConfig { @Bean public ConsumerFactory highThroughputConsumerFactory() { Map props = new HashMap<>(); // 1. 增加每次poll的记录数 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000); // 2. 减少poll间隔 props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 3. 增加fetch.min.bytes,减少网络往返 props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024); // 4. 增加fetch.max.wait.ms props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); return new DefaultKafkaConsumerFactory<>(props); } } ``` --- ## 面试题10:Kafka事务消息 ### 面试问题 Kafka事务是什么?如何使用Kafka事务保证消息发送和业务操作的原子性? ### 解答要点 1. **事务API**:KafkaProducer + 事务ID 2. **事务语义**:多消息发送的原子性 3. **隔离级别**:read_committed, read_uncommitted 4. **使用场景**:跨分区的消息一致性 ### 代码实现 ```java /** * Kafka事务配置 */ @Configuration public class KafkaTransactionConfig { @Bean public ProducerFactory transactionProducerFactory() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // 事务ID,同一应用内必须唯一 props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-transaction-1"); // 幂等性必须开启 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); return new DefaultKafkaProducerFactory<>(props); } @Bean public KafkaTemplate transactionKafkaTemplate() { return new KafkaTemplate<>(transactionProducerFactory()); } } /** * 事务消息生产者 */ @Component public class TransactionalMessageProducer { private final KafkaTemplate kafkaTemplate; /** * 发送事务消息 * 订单创建 + 库存扣减在同一事务中 */ @Transactional(rollbackFor = Exception.class) public void sendTransactionMessage(OrderMessage orderMessage, InventoryOperation inventoryOp) { // 初始化事务 kafkaTemplate.initTransactions(); // 开启事务 kafkaTemplate.executeInTransaction(operations -> { // 1. 发送订单消息 operations.send("order-topic", orderMessage.getOrderId(), orderMessage); // 2. 发送库存扣减消息 operations.send("inventory-topic", inventoryOp.getProductId(), inventoryOp); // 3. 发送通知消息 operations.send("notification-topic", orderMessage.getUserId(), buildNotification(orderMessage)); return true; }); } /** * 批量事务发送 */ public void sendBatchTransaction(List orders) { kafkaTemplate.executeInTransaction(operations -> { for (OrderMessage order : orders) { operations.send("order-topic", order.getOrderId(), order); } return true; }); } } /** * 事务消费者 */ @Component public class TransactionalConsumer { /** * 读取已提交的事务消息 */ @KafkaListener(topics = "order-topic", groupId = "order-group", properties = {"isolation.level=read_committed"}) public void consumeCommitted(ConsumerRecord record) { // 只读取已提交的事务消息 log.info("消费事务消息: {}", record.value()); } /** * 读取所有消息(包括未提交的) */ @KafkaListener(topics = "order-topic", groupId = "order-group-uncommitted", properties = {"isolation.level=read_uncommitted"}) public void consumeUncommitted(ConsumerRecord record) { log.info("消费所有消息: {}", record.value()); } } ``` --- ## 面试题11:延迟消息 ### 面试问题 如何在Kafka中实现延迟消息(例如订单30分钟后自动取消)? ### 延迟消息概述 延迟消息是指消息在指定的时间点或延迟时间后才被消费者处理。在电商场景中,延迟消息常用于: - 订单超时自动取消 - 支付超时提醒 - 物流状态延迟通知 - 评价提醒 - 优惠券过期提醒 ### 三种实现方案对比(分布式环境) | 特性 | 方案1:时间轮算法 | 方案2:延时主题+定时扫描 | 方案3:外部延迟队列(Redis) | |------|------------------|------------------------|-------------------------| | **精度** | 毫秒级 | 秒级(依赖扫描频率) | 毫秒级 | | **性能** | 极高 | 中 | 高 | | **复杂度** | 中 | 低 | 低 | | **可靠性** | 一般(内存存储) | 一般(Kafka持久化) | 高(Redis持久化) | | **分布式支持** | ✅ Redis ZSET | ✅ Kafka消费者组 | ✅ Redis集群 | | **消息持久化** | ✅ Redis | ✅ Kafka | ✅ Redis | | **实现难度** | 中等 | 简单 | 简单 | | **适用场景** | 高并发、低延迟 | 简单场景 | 需要高可靠、精确延迟 | ### 分布式环境关键点 #### 分布式时间轮(Redis实现) - 使用Redis ZSET替代内存存储 - 每个槽位对应一个ZSET key - 使用Redis分布式锁保证同一时刻只有一个实例扫描 - 支持多实例部署,自动负载均衡 #### 分布式延时主题(Kafka消费者组) - 消息存储在Kafka分区中(天然分布式) - 使用消费者组实现负载均衡 - 多个消费者实例自动分配分区 - Kafka副本机制保证高可用 #### 分布式外部延迟队列(Redis) - 使用Redis ZSET存储消息 - 使用Redis SET实现消息处理锁(防止重复处理) - 实例ID标识,支持追踪消息处理来源 - 支持Redis集群部署 --- ### 方案1:时间轮算法(Time Wheel)- 分布式版 #### 原理 时间轮是一种**O(1)**时间复杂度的延迟任务调度算法,类似于时钟的指针转动机制: ``` ┌─────────────────────────────────────────────────────┐ │ 时间轮结构 │ │ ┌───┬───┬───┬───┬───┬───┬───┐ │ │ │ 0 │ 1 │ 2 │ 3 │...│58 │59 │ ← 60个槽位 │ │ └───┴───┴───┴───┴───┴───┴───┘ │ │ ↑ │ │ └── 当前指针位置 │ │ │ │ 每个槽位代表1秒,60个槽位组成一圈 │ │ 任务根据延迟时间计算应落入的槽位 │ └─────────────────────────────────────────────────────┘ ``` #### 核心概念 1. **槽位(Slot)**:时间轮被分成固定数量的槽位,每个槽位代表一个时间单位 2. **指针(Pointer)**:当前时间对应的槽位位置,每隔固定时间移动一次 3. **圈数(Round)**:任务需要等多圈才能被执行 #### 优缺点 **优点**: - 插入和删除操作时间复杂度为**O(1)** - 无需遍历所有任务,效率极高 - 适合海量延迟消息场景 - 内存占用小 **缺点**: - 消息存储在内存,重启会丢失 - 需要维护额外的槽位数据结构 - 时间精度取决于槽位大小 - 不支持消息持久化 #### 代码实现(分布式版) > 代码已更新为分布式版本,使用Redis ZSET存储,支持多实例部署。 ```java /** * 分布式时间轮延迟队列服务 * 面试题11:延迟消息 - 方案1:时间轮算法(分布式版) * * 分布式实现原理: * - 使用Redis ZSET存储时间轮数据,替代内存存储 * - 每个槽位对应一个ZSET key * - 使用Redis Lua脚本保证原子性操作 * - 支持多实例部署,自动负载均衡 */ @Service public class TimeWheelDelayQueueService { private static final int WHEEL_SIZE = 60; private static final String WHEEL_SLOT_KEY_PREFIX = "delay:wheel:slot:"; private static final String WHEEL_TASK_KEY_PREFIX = "delay:wheel:task:"; private static final String WHEEL_LOCK_KEY = "delay:wheel:lock:"; private final StringRedisTemplate redisTemplate; /** * 添加延迟任务(分布式版) */ public void addDelayTask(String topic, String key, Object message, long delayMs) { String taskId = key + "-" + System.currentTimeMillis(); long executeTime = System.currentTimeMillis() + delayMs; String messageJson = JsonUtils.toJson(message); int slot = calculateSlot(executeTime); String slotKey = WHEEL_SLOT_KEY_PREFIX + slot; redisTemplate.opsForZSet().add(slotKey, taskId, executeTime); } /** * 扫描并处理到期任务(使用分布式锁) */ @Scheduled(fixedDelay = 1000) public void scanAndProcessExpiredTasks() { // 使用分布式锁保证只有一个实例执行 String lockKey = WHEEL_LOCK_KEY + slot; Boolean acquired = redisTemplate.opsForValue() .setIfAbsent(lockKey, "scanner", 1, TimeUnit.SECONDS); if (Boolean.TRUE.equals(acquired)) { // 执行扫描逻辑 } } } } } /** * 启动时间轮(每秒tick一次) */ private void startTimeWheel() { scheduler.scheduleAtFixedRate(() -> tick(), 0, TICK_MS, TimeUnit.MILLISECONDS); } } ``` --- ### 方案2:延时主题 + 定时扫描 - 分布式版 #### 原理 将消息发送到专门的延时Topic,然后使用定时任务扫描到期的消息并转发到目标Topic。**分布式版使用Kafka消费者组实现多实例负载均衡**。 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 分布式延时主题架构(Kafka消费者组) │ │ │ │ 生产者 Kafka延时Topic 消费者组 │ │ │ │ │ │ │ │ send(delay=30min) │ │ │ │ │──────────────────────>│ partition0 │ │ │ │ │──────────────────────>│ 实例1 │ │ │ │ partition1 │ │ │ │ │──────────────────────>│ 实例2 │ │ │ │ partition2 │ │ │ │ │──────────────────────>│ 实例3 │ │ │ │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` #### 两种实现方式 **方式1:单延时Topic** - 所有延迟消息发送到同一个Topic - 消息内容包含执行时间 - 消费者根据执行时间判断是否处理 **方式2:分段时间Topic** - 按延迟时间分段创建多个Topic - 如:`delay-1min`、`delay-5min`、`delay-30min`、`delay-1hour` - 消费者只监听较短时间内到期的Topic #### 优缺点 **优点**: - 基于Kafka原生功能,实现简单 - 消息可靠存储在Kafka中 - 可利用Kafka的分区和副本机制 - 支持消息重试和死信队列 - 无需额外组件 **缺点**: - 有一定的延迟精度问题(取决于扫描频率) - 需要额外的Topic和消费者资源 - 消息需要在Kafka中长时间存储 - 不适合海量延迟消息场景 #### 代码实现 ```java /** * 延时主题+定时扫描延迟队列服务 * 面试题11:延迟消息 - 方案2:单独延时主题 + 定时扫描 */ @Service public class ScheduledTopicDelayQueueService { private static final String DELAY_TOPIC_PREFIX = "delay-"; private final ConcurrentHashMap waitingMessages; /** * 发送延迟消息到延时Topic */ public void sendToDelayTopic(String topic, String key, Object message, long delayMs) { String messageId = key + "-" + System.currentTimeMillis(); long executeTime = System.currentTimeMillis() + delayMs; String messageJson = JsonUtils.toJson(message); WaitingMessage waitingMessage = new WaitingMessage(messageId, topic, key, messageJson, executeTime); waitingMessages.put(messageId, waitingMessage); } /** * 定时扫描并处理到期的延迟消息(每秒执行) */ @Scheduled(fixedDelay = 1000) public void scanAndProcessExpiredMessages() { long now = System.currentTimeMillis(); for (Map.Entry entry : waitingMessages.entrySet()) { WaitingMessage message = entry.getValue(); if (message.getExecuteTime() <= now) { processExpiredMessage(message); waitingMessages.remove(entry.getKey()); } } } /** * 处理到期的消息,转发到目标Topic */ private void processExpiredMessage(WaitingMessage message) { kafkaTemplate.send(message.getTargetTopic(), message.getKey(), message.getMessageJson()); } } ``` --- ### 方案3:外部延迟队列(Redis ZSET)- 分布式版 #### 原理 使用Redis的ZSET(有序集合)存储延迟消息,通过score(分数)存储执行时间戳,定时扫描到期的消息。**分布式版使用Redis分布式锁保证消息只被处理一次**。 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 分布式Redis延迟队列架构(分布式锁) │ │ │ │ 实例A Redis 实例B │ │ ┌─────┐ ┌─────┐ ┌─────┐ │ │ │Scan │◄────────┬──────►│Lock │◄────────┬─────►│Scan │ │ │ └─────┘ │ └─────┘ │ └─────┘ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ │ │ ┌─────────┐ │ │ │ │ │ └─────►│ ZSET │◄───────┘ │ │ │ │ └─────────┘ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ┌─────────┐ │ │ │ └──────────────────►│ Kafka │◄─────────────────────┘ │ │ └─────────┘ │ └─────────────────────────────────────────────────────────────────┘ ``` │ │───────────────────────>│ │ │ │ │ │ │ │ │ │ │ ZRANGEBYSCORE │ │ │ │ │<──────────────────────│ │ │ │ │──────────────────────>│ │ │ │ │ (expired messages) │ │ │ │ │ │ │ │ │ │ send to Kafka │ │ │ │ │──────────────────────────────> │ │ │ │ │ │ │ │ │ ZREM │ │ │ │ │──────────────────────>│ │ │ │ │ │ │ └─────────────────────────────────────────────────────────────────┘ ``` #### Redis ZSET 操作 1. **添加延迟消息**:`ZADD delay:queue {score: executeTime, member: messageJson}` 2. **查询到期消息**:`ZRANGEBYSCORE delay:queue 0 {currentTime}` 3. **移除消息**:`ZREM delay:queue {member}` #### 优缺点 **优点**: - 高性能:Redis操作是O(logN) - 支持分布式:Redis支持集群部署 - 消息持久化:消息存储在Redis中 - 支持精确延迟:毫秒级精度 - 支持消息取消和查询 **缺点**: - 需要额外依赖Redis - Redis重启可能导致消息丢失(需配置持久化) - 不适合超大批量延迟消息 #### 代码实现 ```java /** * 外部延迟队列服务 - 基于Redis ZSET实现 * 面试题11:延迟消息 - 方案3:外部延迟队列 */ @Service public class ExternalDelayQueueService { private static final String DELAY_QUEUE_KEY_PREFIX = "delay:queue:"; private final StringRedisTemplate redisTemplate; private final KafkaTemplate kafkaTemplate; /** * 添加延迟消息到Redis ZSET */ public void addDelayMessage(String topic, String key, Object message, long delayMs) { String messageId = key + "-" + System.currentTimeMillis(); long executeTime = System.currentTimeMillis() + delayMs; String messageJson = JsonUtils.toJson(message); String queueKey = DELAY_QUEUE_KEY_PREFIX + topic; String messageValue = messageId + "|" + key + "|" + messageJson; redisTemplate.opsForZSet().add(queueKey, messageValue, executeTime); } /** * 扫描并处理到期的延迟消息(每秒执行) */ @Scheduled(fixedDelay = 1000) public void scanAndProcessDelayMessages() { for (String topic : Arrays.asList("order-topic", "payment-topic")) { String queueKey = DELAY_QUEUE_KEY_PREFIX + topic; long now = System.currentTimeMillis(); Set> expiredMessages = redisTemplate.opsForZSet().rangeByScoreWithScores(queueKey, 0, now, 0, 100); for (ZSetOperations.TypedTuple tuple : expiredMessages) { String value = tuple.getValue(); String[] parts = value.split("\\|", 3); String messageId = parts[0]; String key = parts[1]; String messageJson = parts[2]; kafkaTemplate.send(topic, key, messageJson); redisTemplate.opsForZSet().remove(queueKey, value); } } } } ``` --- ### 面试加分回答 > "延迟消息的实现有多种方案,我分别介绍: > > **方案1:时间轮算法** > - 适用于高频、低延迟场景 > - 优点:O(1)插入删除,性能极高 > - 缺点:消息存储在内存,重启会丢失 > > **方案2:延时主题+定时扫描** > - 适用于简单场景 > - 优点:基于Kafka原生功能,实现简单 > - 缺点:精度依赖扫描频率,不适合高并发 > > **方案3:外部延迟队列(推荐)** > - 使用Redis ZSET实现 > - 优点:支持精确延迟、高可靠、可分布式 > - 缺点:需要额外依赖Redis > > 在实际项目中,我推荐**Redis延迟队列方案**,通过ZSET的score实现精确延迟,性能高且消息持久化不丢失。" --- ### 代码位置 | 方案 | 服务类 | 测试类 | |------|--------|--------| | 时间轮算法 | `TimeWheelDelayQueueService.java` | `TimeWheelDelayQueueServiceTest.java` | | 延时主题+扫描 | `ScheduledTopicDelayQueueService.java` | `ScheduledTopicDelayQueueServiceTest.java` | | 外部延迟队列(Redis) | `ExternalDelayQueueService.java` | `ExternalDelayQueueServiceTest.java` --- ### 代码实现 ```java /** * 延迟消息服务 - 基于时间轮算法 */ @Component public class DelayedMessageService { private final KafkaTemplate kafkaTemplate; private final DelayWheel delayWheel; /** * 发送延迟消息 * * @param message 消息 * @param delayMs 延迟毫秒数 */ public void sendDelayedMessage(OrderMessage message, long delayMs) { // 计算执行时间 long executeTime = System.currentTimeMillis() + delayMs; // 添加到时间轮 delayWheel.addTimerTask(message, executeTime, () -> { // 延迟到期后发送到Kafka kafkaTemplate.send("order-topic", message.getOrderId(), message); }); } /** * 订单延迟取消场景 */ public void scheduleOrderCancel(String orderId, long delayMinutes) { OrderMessage cancelMessage = new OrderMessage(); cancelMessage.setOrderId(orderId); cancelMessage.setStatus(OrderStatus.CANCELLED); // 30分钟后自动取消 sendDelayedMessage(cancelMessage, delayMinutes * 60 * 1000); } } /** * 简单时间轮实现 */ public class DelayWheel { private final long tickMs; private final int wheelSize; private final Map>> timerTaskMap = new ConcurrentHashMap<>(); private final Function taskExecutor; public DelayWheel(long tickMs, int wheelSize, Function taskExecutor) { this.tickMs = tickMs; this.wheelSize = wheelSize; this.taskExecutor = taskExecutor; } public void addTimerTask(T task, long executeTime, Runnable action) { long delay = executeTime - System.currentTimeMillis(); if (delay <= 0) { action.run(); return; } // 计算时间轮位置 long slot = (executeTime / tickMs) % wheelSize; // 提交到调度器 ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); executor.schedule(action, delay, TimeUnit.MILLISECONDS); } } /** * 延迟消息消费者 */ @Component public class DelayedMessageConsumer { @KafkaListener(topics = "order-delay-topic") public void consumeDelayMessage(ConsumerRecord record) { OrderMessage message = record.value(); // 延迟消息处理逻辑 if (message.getStatus() == OrderStatus.CANCELLED) { // 检查订单状态,如果未支付则取消 log.info("处理延迟取消订单: {}", message.getOrderId()); } } } ``` --- ## 面试题12:消息过滤 ### 面试问题 如何在Kafka中实现消息过滤(例如按地区、按用户类型过滤)? ### 解答要点 1. **生产者端过滤**:发送时过滤 2. **消费者端过滤**:消费时过滤 3. **Broker端过滤**:使用Kafka Streams ### 代码实现 ```java /** * 消息过滤器 */ @Component public class MessageFilter { /** * 按地区过滤 */ public boolean filterByRegion(OrderMessage message, Set allowedRegions) { if (allowedRegions == null || allowedRegions.isEmpty()) { return true; } return allowedRegions.contains(message.getRegion()); } /** * 按用户类型过滤 */ public boolean filterByUserType(OrderMessage message, Set allowedUserTypes) { if (allowedUserTypes == null || allowedUserTypes.isEmpty()) { return true; } return allowedUserTypes.contains(message.getUserType()); } /** * 复合过滤条件 */ public boolean filter(OrderMessage message, FilterCriteria criteria) { if (criteria.getMinAmount() != null && message.getAmount().compareTo(criteria.getMinAmount()) < 0) { return false; } if (criteria.getRegions() != null && !criteria.getRegions().isEmpty() && !criteria.getRegions().contains(message.getRegion())) { return false; } return true; } } /** * 带过滤的消费者 */ @Component public class FilteredConsumer { private final MessageFilter messageFilter; @KafkaListener(topics = "order-topic", groupId = "filtered-group") public void consume(ConsumerRecord record) { OrderMessage message = record.value(); // 过滤条件:从配置或数据库获取 FilterCriteria criteria = getFilterCriteria(); // 执行过滤 if (!messageFilter.filter(message, criteria)) { log.debug("消息被过滤: orderId={}", message.getOrderId()); return; } // 处理符合条件的消息 processMessage(message); } private FilterCriteria getFilterCriteria() { // 从配置或数据库获取过滤条件 FilterCriteria criteria = new FilterCriteria(); criteria.setRegions(Set.of("北京", "上海", "广州")); criteria.setMinAmount(new BigDecimal("100")); return criteria; } private void processMessage(OrderMessage message) { // 业务处理 } } /** * 使用Kafka Streams进行消息过滤 */ @Configuration public class KafkaStreamsFilterConfig { @Bean public KStream kStream(StreamsBuilder streamsBuilder) { KStream source = streamsBuilder.stream("order-topic"); // 过滤 KStream filtered = source.filter( (key, value) -> { try { OrderMessage message = new ObjectMapper() .readValue(value, OrderMessage.class); return "VIP".equals(message.getUserType()); } catch (Exception e) { return false; } }); // 发送到新主题 filtered.to("order-vip-topic"); return null; } } ``` --- ## 面试题13:多消费者监听 ### 面试问题 如何在同一个应用中监听多个Kafka主题? ### 解答要点 1. **@KafkaListener**:支持多主题 2. **ConsumerFactory**:共享或独立 3. **并发配置**:每个监听器独立配置 ### 代码实现 ```java /** * 多主题监听器 */ @Component public class MultiTopicConsumer { /** * 方式1:单个监听器监听多个主题 */ @KafkaListener(topics = {"order-topic", "payment-topic", "shipping-topic"}, groupId = "multi-topic-group") public void consumeMultiTopics(ConsumerRecord record) { String topic = record.topic(); switch (topic) { case "order-topic": handleOrderMessage(record.value()); break; case "payment-topic": handlePaymentMessage(record.value()); break; case "shipping-topic": handleShippingMessage(record.value()); break; } } /** * 方式2:每个主题单独监听器 */ @KafkaListener(topics = "order-topic", groupId = "order-group") public void consumeOrder(ConsumerRecord record) { handleOrderMessage(record.value()); } @KafkaListener(topics = "payment-topic", groupId = "payment-group") public void consumePayment(ConsumerRecord record) { handlePaymentMessage(record.value()); } /** * 方式3:使用@KafkaHandler区分不同消息类型 */ @KafkaListener(topics = "business-topic", groupId = "handler-group") public void consume(ConsumerRecord record, Acknowledgment ack) { // 根据消息类型分发 String json = record.value(); try { JsonNode node = new ObjectMapper().readTree(json); String type = node.get("type").asText(); switch (type) { case "ORDER": handleOrderMessage(json); break; case "PAYMENT": handlePaymentMessage(json); break; default: log.warn("未知消息类型: {}", type); } ack.acknowledge(); } catch (Exception e) { log.error("消息处理失败", e); } } private void handleOrderMessage(String json) { // 处理订单消息 } private void handlePaymentMessage(String json) { // 处理支付消息 } private void handleShippingMessage(String json) { // 处理物流消息 } } ``` --- ## 面试题14:拦截器机制 ### 面试问题 Kafka拦截器是什么?如何使用拦截器实现消息追踪和监控? ### 解答要点 1. **生产者拦截器**:发送前/发送后拦截 2. **消费者拦截器**:消费前/消费后拦截 3. **使用场景**:日志记录、监控、消息追踪 ### 代码实现 ```java /** * 生产者拦截器 - 消息追踪 */ public class TracingProducerInterceptor implements ProducerInterceptor { private static final Logger log = LoggerFactory.getLogger(TracingProducerInterceptor.class); private static final String TRACE_ID = "traceId"; @Override public ProducerRecord onSend(ProducerRecord record) { // 添加追踪ID String traceId = UUID.randomUUID().toString(); // 添加消息头 record.headers().add(TRACE_ID, traceId.getBytes()); log.info("消息发送拦截: traceId={}, topic={}, key={}", traceId, record.topic(), record.key()); return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { String traceId = new String(metadata.headers() .lastHeader(TRACE_ID).value()); if (exception != null) { log.error("消息发送失败: traceId={}, error={}", traceId, exception.getMessage()); } else { log.info("消息发送成功: traceId={}, partition={}, offset={}", traceId, metadata.partition(), metadata.offset()); } } @Override public void close() { // 清理资源 } @Override public void configure(Map configs) { // 配置 } } /** * 消费者拦截器 - 消息追踪 */ public class TracingConsumerInterceptor implements ConsumerInterceptor { private static final Logger log = LoggerFactory.getLogger(TracingConsumerInterceptor.class); private static final String TRACE_ID = "traceId"; @Override public ConsumerRecord onConsume(ConsumerRecord record) { // 获取追踪ID Header traceHeader = record.headers().lastHeader(TRACE_ID); String traceId = traceHeader != null ? new String(traceHeader.value()) : "unknown"; long startTime = System.currentTimeMillis(); // 添加处理时间指标 record.headers().add("processStartTime", String.valueOf(startTime).getBytes()); log.info("消息消费拦截: traceId={}, topic={}, partition={}, offset={}", traceId, record.topic(), record.partition(), record.offset()); return record; } @Override public void onCommit(Map offsets) { for (Map.Entry entry : offsets.entrySet()) { log.info("Offset提交: partition={}, offset={}", entry.getKey().partition(), entry.getValue().offset()); } } @Override public void close() { // 清理资源 } @Override public void configure(Map configs) { // 配置 } } /** * 配置拦截器 */ @Configuration public class InterceptorConfig { @Bean public ProducerInterceptor tracingProducerInterceptor() { return new TracingProducerInterceptor(); } @Bean public ConsumerInterceptor tracingConsumerInterceptor() { return new TracingConsumerInterceptor(); } } ``` --- ## 面试题15:分区副本与ISR机制 ### 面试问题 什么是ISR(In-Sync Replicas)?Leader选举是如何进行的? ### 解答要点 1. **ISR**:与Leader保持同步的副本集合 2. **Leader选举**:从ISR中选举新的Leader 3. **副本同步**:Follower从Leader拉取数据 ### 代码实现 ```java /** * 分区和副本配置 */ @Configuration public class TopicReplicationConfig { @Autowired private KafkaAdmin kafkaAdmin; /** * 创建高可用主题 */ @PostConstruct public void createHighAvailabilityTopic() { // 订单主题配置 NewTopic orderTopic = TopicBuilder.name("order-topic") .partitions(6) // 6个分区 .replicas(3) // 3个副本 .minInsyncReplicas(2) // 最小ISR数量 .compact() // 日志压缩 .build(); // 创建主题 kafkaAdmin.createOrModifyTopics(orderTopic); log.info("高可用主题创建完成: partitions=6, replicas=3, minISR=2"); } } /** * 副本同步状态监控 */ @Component public class ReplicationMonitor { @Autowired private KafkaAdmin kafkaAdmin; /** * 监控ISR变化 */ public void monitorISR(String topic) { try (AdminClient adminClient = KafkaAdminClient.create(getProperties())) { // 获取主题描述 DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topic)); Map topicDescriptionMap = result.all().get(); TopicDescription topicDescription = topicDescriptionMap.get(topic); for (TopicPartitionInfo partitionInfo : topicDescription.partitions()) { // 获取Leader Node leader = partitionInfo.leader(); // 获取ISR列表 List isr = partitionInfo.isr(); // 获取所有副本 List replicas = partitionInfo.replicas(); log.info("分区{}: Leader={}, ISR={}, Replicas={}, 同步延迟={}", partitionInfo.partition(), leader != null ? leader.id() : "none", isr.stream().map(Node::id).collect(Collectors.toList()), replicas.stream().map(Node::id).collect(Collectors.toList()), calculateLag(partitionInfo)); } } catch (Exception e) { log.error("监控ISR失败", e); } } private long calculateLag(TopicPartitionInfo partitionInfo) { // 计算副本与Leader的偏移量差距 return 0; } private Map getProperties() { Map props = new HashMap<>(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); return props; } } /** * 手动触发Preferred Leader选举 */ @Component public class LeaderElectionService { /** * 执行Preferred Leader选举 * 让每个分区的第一个副本成为Leader */ public void electPreferredLeaders(String topic) { try (AdminClient adminClient = KafkaAdminClient.create(createProperties())) { // 创建选举请求 Map> election = new HashMap<>(); // 获取分区信息后设置选举 // election.put(new TopicPartition(topic, 0), Optional.empty()); // 执行选举 adminClient.alterPartitionReassignments(election).all().get(); log.info("Preferred Leader选举完成: topic={}", topic); } catch (Exception e) { log.error("选举失败", e); } } } ``` --- ## 面试题16:Controller选举 ### 面试问题 Kafka Controller是什么?如何选举?Controller的作用是什么? ### 解答要点 1. **Controller**:Kafka集群的协调者 2. **选举机制**:ZooKeeper/KRaft模式 3. **职责**:Leader选举、分区管理、Broker上下线处理 ### 代码实现 ```java /** * Controller相关配置 */ @Configuration public class ControllerConfig { /** * KRaft模式配置 - 不使用ZooKeeper */ @Bean public KafkaServer kafkaServer() { // 配置Kafka服务器 return null; } } /** * Controller状态监控 */ @Component public class ControllerMonitor { @Autowired private KafkaAdmin kafkaAdmin; /** * 获取当前Controller信息 */ public void getControllerInfo() { try (AdminClient adminClient = KafkaAdminClient.create(createProperties())) { // 获取集群元数据 DescribeClusterResult clusterResult = adminClient.describeCluster(); String controllerId = clusterResult.controller().get().idString(); log.info("当前Controller: {}", controllerId); } catch (Exception e) { log.error("获取Controller信息失败", e); } } /** * 监听Controller变更 */ @PostConstruct public void watchControllerChanges() { // 监听ZooKeeper节点变化 // /kafka/controller } } ``` --- ## 面试题17:日志压缩(Log Compaction) ### 面试问题 Kafka日志压缩是什么?适用于什么场景?如何配置? ### 解答要点 1. **日志压缩**:只保留每个Key的最新值 2. **使用场景**:数据库变更日志、配置变更 3. **配置**:cleanup.policy=compact ### 代码实现 ```java /** * 日志压缩配置 */ @Configuration public class LogCompactionConfig { @Autowired private KafkaAdmin kafkaAdmin; /** * 创建日志压缩主题 */ @PostConstruct public void createCompactedTopic() { // 用户配置主题 - 使用日志压缩 NewTopic userConfigTopic = TopicBuilder.name("user-config-topic") .partitions(3) .replicas(3) .compact() // 启用日志压缩 .build(); // 数据库变更日志主题 NewTopic changeLogTopic = TopicBuilder.name("change-log-topic") .partitions(6) .replicas(3) .compact() .build(); kafkaAdmin.createOrModifyTopics(userConfigTopic, changeLogTopic); log.info("日志压缩主题创建完成"); } } /** * 日志压缩消费 - 获取最新配置 */ @Component public class CompactionConsumer { /** * 消费日志压缩主题 - seek到开始位置获取最新值 */ @KafkaListener(topics = "user-config-topic", groupId = "config-group") public void consumeLatestConfig(ConsumerRecord record, Consumer consumer) { String key = record.key(); String value = record.value(); log.info("配置变更: key={}, value={}, offset={}", key, value, record.offset()); // 注意:需要记录每个key的最新offset // 下次从最新offset开始消费 seekToLatest(consumer, record.topic(), key); } private void seekToLatest(Consumer consumer, String topic, String key) { // 记录每个key的最新offset // 实际实现需要维护key -> offset的映射 } } /** * 手动触发日志压缩 */ @Component public class LogCompactionManager { /** * 手动执行日志压缩 */ public void triggerLogCompaction(String topic) { // 使用kafka-log-dirs脚本或AdminClient // bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 // --topic-list topicName --action execute } } ``` --- ## 面试题18:消费者位移管理 ### 面试问题 如何管理消费者offset?earliest、latest策略是什么?如何实现自定义offset管理? ### 解答要点 1. **Offset策略**:earliest、latest、none 2. **自动提交**:enable.auto.commit 3. **手动提交**:acknowledge.acknowledge() 4. **自定义存储**:将offset存储到数据库 ### 代码实现 ```java /** * Offset管理策略 */ @Component public class OffsetManagementStrategy { /** * 策略1:从最早位置开始消费 * 适用于需要处理历史消息的场景 */ public void consumeFromEarliest(Consumer consumer, String topic) { consumer.subscribe(Collections.singletonList(topic)); consumer.seekToBeginning(consumer.assignment()); } /** * 策略2:从最新位置开始消费 * 适用于只关心新消息的场景 */ public void consumeFromLatest(Consumer consumer, String topic) { consumer.subscribe(Collections.singletonList(topic)); consumer.seekToEnd(consumer.assignment()); } /** * 策略3:从指定时间开始消费 */ public void consumeFromTimestamp(Consumer consumer, String topic, long timestamp) { consumer.subscribe(Collections.singletonList(topic)); // 查找每个分区的offset Map timestampsToSearch = new HashMap<>(); for (TopicPartition partition : consumer.assignment()) { timestampsToSearch.put(partition, timestamp); } // 获取时间对应的offset Map offsets = consumer.offsetsForTimes(timestampsToSearch); // 跳转到对应offset for (Map.Entry entry : offsets.entrySet()) { if (entry.getValue() != null) { consumer.seek(entry.getKey(), entry.getValue().offset()); } } } /** * 策略4:从指定offset开始消费 */ public void consumeFromOffset(Consumer consumer, String topic, Map partitionOffsetMap) { consumer.subscribe(Collections.singletonList(topic)); // 等待分区分配完成 while (consumer.assignment().isEmpty()) { consumer.poll(Duration.ofMillis(100)); } // 跳转到指定offset for (Map.Entry entry : partitionOffsetMap.entrySet()) { TopicPartition partition = new TopicPartition(topic, entry.getKey()); consumer.seek(partition, entry.getValue()); } } } /** * 自定义Offset存储 - 存储到数据库 */ @Component public class CustomOffsetStorage { @Autowired private JdbcTemplate jdbcTemplate; /** * 保存消费者offset到数据库 */ public void saveOffset(String groupId, String topic, int partition, long offset) { String sql = "INSERT INTO consumer_offsets (group_id, topic, partition, offset, update_time) " + "VALUES (?, ?, ?, ?, NOW()) " + "ON CONFLICT (group_id, topic, partition) DO UPDATE SET offset = ?"; jdbcTemplate.update(sql, groupId, topic, partition, offset, offset); } /** * 从数据库恢复offset */ public void restoreOffset(Consumer consumer, String groupId, String topic) { String sql = "SELECT partition, offset FROM consumer_offsets " + "WHERE group_id = ? AND topic = ?"; List> offsets = jdbcTemplate.queryForList(sql, groupId, topic); for (Map row : offsets) { int partition = (int) row.get("partition"); long offset = (long) row.get("offset"); TopicPartition topicPartition = new TopicPartition(topic, partition); consumer.seek(topicPartition, offset); log.info("恢复offset: groupId={}, partition={}, offset={}", groupId, partition, offset); } } } ``` --- ## 面试题19:Spring Kafka集成 ### 面试问题 Spring Kafka如何使用?如何配置生产者和消费者? ### 解答要点 1. **依赖配置**:spring-kafka依赖 2. **自动配置**:application.yml配置 3. **注解驱动**:@KafkaListener ### 代码实现 ```java /** * Spring Kafka集成配置 */ @Configuration @EnableKafka public class SpringKafkaConfig { /** * KafkaTemplate配置 */ @Bean public KafkaTemplate kafkaTemplate( ProducerFactory producerFactory) { KafkaTemplate template = new KafkaTemplate<>(producerFactory); // 设置默认主题 template.setDefaultTopic("default-topic"); // 设置事务ID前缀 template.setTransactionIdPrefix("tx-"); return template; } /** * 监听器容器工厂配置 */ @Bean public KafkaListenerContainerFactory> kafkaListenerContainerFactory( ConsumerFactory consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); // 消费者并发数 factory.setConcurrency(3); // 批量消费 factory.setBatchListener(true); // 手动提交 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // 消息转换器 factory.setMessageConverter(new JsonMessageConverter()); return factory; } } /** * Spring Kafka监听器 */ @Component public class SpringKafkaListener { /** * 简单监听 */ @KafkaListener(topics = "order-topic", groupId = "order-group") public void listen(OrderMessage message) { log.info("收到消息: {}", message); } /** * 手动确认 */ @KafkaListener(topics = "order-topic", groupId = "manual-ack-group") public void listenWithManualAck( @Payload OrderMessage message, @Header(KafkaHeaders.OFFSET) long offset, Acknowledgment ack) { log.info("收到消息: offset={}, message={}", offset, message); try { // 业务处理 process(message); // 手动提交 ack.acknowledge(); } catch (Exception e) { log.error("处理失败", e); } } /** * 批量监听 */ @KafkaListener(topics = "order-topic", groupId = "batch-group", containerFactory = "kafkaListenerContainerFactory") public void listenBatch(List> records) { log.info("批量收到消息: count={}", records.size()); for (ConsumerRecord record : records) { process(record.value()); } } /** * 监听多个主题 */ @KafkaListener(topics = {"order-topic", "payment-topic"}, groupId = "multi-topic-group") public void listenMultiTopics(ConsumerRecord record) { log.info("收到消息: topic={}, value={}", record.topic(), record.value()); } private void process(OrderMessage message) { // 业务处理 } } /** * application.yml配置示例 */ ``` ```yaml spring: kafka: bootstrap-servers: localhost:9092 producer: # 生产者配置 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer acks: all retries: 3 batch-size: 16384 buffer-memory: 33554432 compression-type: lz4 consumer: # 消费者配置 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer group-id: order-group auto-offset-reset: earliest enable-auto-commit: false max-poll-records: 500 listener: # 监听器配置 ack-mode: manual concurrency: 3 ``` --- ## 面试题20:性能优化 ### 面试问题 Kafka性能优化的关键点有哪些? ### 解答要点 1. **生产者优化**:批量发送、压缩、连接池 2. **Broker优化**:分区数、副本数、JVM 3. **消费者优化**:并发、批量消费、缓存 ### 代码实现 ```java /** * 生产者性能优化 */ @Configuration public class ProducerOptimizationConfig { @Bean public ProducerFactory optimizedProducerFactory() { Map props = new HashMap<>(); // 1. 连接池配置 props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // 2. 批量发送优化 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 32KB props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 10ms // 3. 缓冲区优化 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB // 4. 压缩优化 props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // 5. 超时优化 props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 6. 重试优化 props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100); return new DefaultKafkaProducerFactory<>(props); } } /** * 消费者性能优化 */ @Configuration public class ConsumerOptimizationConfig { @Bean public ConsumerFactory optimizedConsumerFactory() { Map props = new HashMap<>(); // 1. 批量拉取优化 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 1024); // 1MB props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); // 2. 并发配置 - 与分区数匹配 // 建议:concurrency = 分区数 / 消费者数 // 3. 偏移量提交优化 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // 4. 心跳优化 props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 5. 拉取超时 props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); return new DefaultKafkaConsumerFactory<>(props); } } /** * 性能监控指标 */ @Component public class PerformanceMonitor { private final MeterRegistry meterRegistry; @PostConstruct public void init() { // 监控消息发送速率 Gauge.builder("kafka.send.rate", this, m -> calculateSendRate()).register(meterRegistry); // 监控消费速率 Gauge.builder("kafka.consume.rate", this, m -> calculateConsumeRate()).register(meterRegistry); // 监控消费延迟 Gauge.builder("kafka.consumer.lag", this, m -> calculateLag()).register(meterRegistry); } private double calculateSendRate() { // 计算消息发送速率 return 0; } private double calculateConsumeRate() { // 计算消息消费速率 return 0; } private long calculateLag() { // 计算消息积压量 return 0; } } ``` --- # 第二部分:商城高并发Kafka使用场景与20个新面试题 ## 一、商城高并发Kafka使用场景梳理 在电商系统中,Kafka作为核心的消息中间件,承担着异步通信、系统解耦、数据同步等重要职责。以下是商城高并发场景下Kafka的典型使用场景: ### 1. 订单消息处理 - **场景**:用户下单后,需要通知库存、支付、物流等多个系统 - **实现**:`OrderMessageProducer` + `OrderMessageConsumer` - **关键点**:消息顺序性、幂等性 ### 2. 库存扣减 - **场景**:下单时扣减商品库存,防止超卖 - **实现**:`InventoryMessage` + `InventoryService` + `InventoryMessageConsumer` - **关键点**:乐观锁、分布式事务 ### 3. 支付回调 - **场景**:第三方支付完成后,接收异步回调通知 - **实现**:`PaymentMessage` + `PaymentService` + `PaymentMessageConsumer` - **关键点**:幂等性、消息去重 ### 4. 物流通知 - **场景**:订单发货后,通知用户物流状态 - **实现**:`ShippingMessage` + `ShippingService` + `ShippingMessageConsumer` - **关键点**:消息驱动、状态更新 ### 5. 优惠券发放 - **场景**:新人注册、活动发放优惠券 - **实现**:`CouponMessage` + `CouponService` - **关键点**:消息广播、海量用户 ### 6. 会员积分 - **场景**:订单完成后发放积分、积分过期 - **实现**:`PointsMessage` + `PointsService` - **关键点**:异步处理、积分计算 ### 7. 消息通知 - **场景**:短信、邮件、推送通知 - **实现**:`NotificationMessage` + `NotificationService` - **关键点**:多渠道、重试机制 ### 8. 异步下单 - **场景**:下单操作异步化,提升用户体验 - **实现**:订单消息异步写入队列 - **关键点**:事务消息、本地消息表 ### 9. 订单状态变更 - **场景**:订单状态流转通知各相关系统 - **实现**:状态变更消息 - **关键点**:状态机、顺序性 ### 10. 商品上架同步 - **场景**:商品信息同步到搜索、推荐系统 - **实现**:`ProductMessage` + `ProductService` - **关键点**:数据同步、实时性 ### 11. 促销活动 - **场景**:活动开始/结束通知、优惠计算 - **实现**:活动消息 - **关键点**:消息广播、时效性 ### 12. 日志收集 - **场景**:收集系统运行日志 - **实现**:日志消息 -> ELK - **关键点**:海量数据、压缩传输 ### 13. 用户行为分析 - **场景**:埋点上报、用户行为分析 - **实现**:`UserBehaviorLog` + `UserBehaviorService` - **关键点**:实时分析、转化率 ### 14. 搜索推荐 - **场景**:商品变更后实时更新搜索索引 - **实现**:商品消息 -> Elasticsearch - **关键点**:实时同步、近实时 ### 15. 实时统计 - **场景**:实时计算订单量、销售额等指标 - **实现**:`RealTimeStatisticsService` - **关键点**:实时计算、窗口统计 --- # 20个新面试题(商城高并发场景) 以下20个面试题是基于商城高并发场景的全新设计,与之前的面试题不重复。 ## 面试题1:Kafka事务实现分布式事务 ### 面试问题 如何在Kafka中实现分布式事务?订单创建、库存扣减、支付处理如何保证原子性? ### 解答要点 1. **事务API**:使用`executeInTransaction`方法 2. **事务ID**:配置`transactional.id` 3. **幂等性**:开启`enable.idempotence` 4. **隔离级别**:`read_committed` ### 代码实现 ```java /** * 分布式事务服务 * 使用Kafka事务保证多操作原子性 */ @Service public class TransactionService { @Autowired private KafkaTemplate kafkaTemplate; /** * 执行分布式事务 * 订单创建 + 库存扣减 + 支付处理在同一事务中 */ public boolean executeOrderTransaction(OrderMessage orderMessage, InventoryMessage inventoryMessage, PaymentMessage paymentMessage) { log.info("开始执行分布式事务: orderId={}", orderMessage.getOrderId()); try { // 开启Kafka事务 kafkaTemplate.executeInTransaction(operations -> { // 1. 发送订单消息 operations.send("order-topic", orderMessage.getOrderId(), orderMessage).get(); // 2. 发送库存扣减消息 operations.send("inventory-topic", inventoryMessage.getProductId(), inventoryMessage).get(); // 3. 发送支付消息 operations.send("payment-topic", paymentMessage.getOrderId(), paymentMessage).get(); return true; }); // 本地业务处理 processLocalBusiness(orderMessage, inventoryMessage, paymentMessage); log.info("分布式事务执行成功: orderId={}", orderMessage.getOrderId()); return true; } catch (Exception e) { log.error("分布式事务执行失败: orderId={}, error={}", orderMessage.getOrderId(), e.getMessage(), e); return false; } } } /** * 事务配置 */ @Configuration public class KafkaTransactionConfig { @Bean public ProducerFactory transactionProducerFactory() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 事务ID,同一应用内必须唯一 props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-transaction-1"); // 幂等性必须开启 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); return new DefaultKafkaProducerFactory<>(props); } } ``` --- ## 面试题2:库存扣减(乐观锁防超卖) ### 面试问题 如何保证库存不超卖?使用Kafka如何实现库存扣减? ### 解答要点 1. **乐观锁**:使用版本号防止并发问题 2. **原子性扣减**:库存扣减必须是原子的 3. **消息补偿**:扣减失败发送补偿消息 ### 代码实现 ```java /** * 库存服务 - 使用乐观锁保证库存不超卖 */ @Service public class InventoryService { private final Map inventoryStore = new ConcurrentHashMap<>(); static class InventoryData { private Integer stock; private Long version; public InventoryData(Integer stock, Long version) { this.stock = stock; this.version = version; } } /** * 扣减库存 - 乐观锁实现 */ public boolean deductInventory(String productId, Integer quantity, String orderId, Long version) { InventoryData current = inventoryStore.get(productId); if (current == null) { log.error("商品不存在: productId={}", productId); return false; } // 乐观锁检查版本 if (version != null && !version.equals(current.getVersion())) { log.warn("版本不匹配,库存扣减失败: productId={}", productId); return false; } // 检查库存是否充足 if (current.getStock() < quantity) { log.error("库存不足: productId={}, currentStock={}, required={}", productId, current.getStock(), quantity); return false; } // 扣减库存 int newStock = current.getStock() - quantity; Long newVersion = current.getVersion() + 1; inventoryStore.put(productId, new InventoryData(newStock, newVersion)); log.info("库存扣减成功: productId={}, newStock={}", productId, newStock); return true; } } ``` --- ## 面试题3:支付回调(幂等性处理) ### 面试问题 支付回调如何处理重复通知?如何保证幂等性? ### 解答要点 1. **唯一标识**:使用支付单号作为唯一标识 2. **状态检查**:已支付订单不再重复处理 3. **版本号**:使用消息版本号防止乱序 ### 代码实现 ```java /** * 支付服务 - 幂等性处理 */ @Service public class PaymentService { private final Map paymentStore = new ConcurrentHashMap<>(); /** * 处理支付回调 - 幂等性实现 */ public boolean processPaymentCallback(PaymentMessage paymentMessage) { String orderId = paymentMessage.getOrderId(); Long version = paymentMessage.getVersion(); PaymentData existingPayment = paymentStore.get(orderId); if (existingPayment != null) { // 版本号检查,防止重复处理 if (version != null && version <= existingPayment.getVersion()) { log.warn("支付消息版本过低,忽略: orderId={}", orderId); return true; } // 状态检查,防止重复更新 if (PaymentMessage.STATUS_SUCCESS.equals(existingPayment.getStatus())) { log.warn("订单已支付,忽略重复回调: orderId={}", orderId); return true; } } // 更新支付状态 PaymentData newPayment = new PaymentData( orderId, paymentMessage.getAmount(), paymentMessage.getPaymentStatus(), paymentMessage.getTransactionId(), version); paymentStore.put(orderId, newPayment); return true; } } ``` --- ## 面试题4:物流通知(消息驱动) ### 面试问题 如何使用Kafka实现物流状态变更通知? ### 解答要点 1. **消息驱动**:物流状态变更通过消息驱动 2. **状态追踪**:记录物流全链路状态 3. **用户通知**:状态变更通知用户 ### 代码实现 ```java /** * 物流服务 - 消息驱动实现 */ @Service public class ShippingService { /** * 处理物流状态更新 */ public boolean processShippingUpdate(ShippingMessage shippingMessage) { String orderId = shippingMessage.getOrderId(); Long version = shippingMessage.getVersion(); ShippingData existing = shippingStore.get(orderId); // 验证版本号,防止乱序 if (version != null && version <= existing.getVersion()) { log.warn("物流消息版本过低: orderId={}", orderId); return true; } // 更新物流状态 existing.setStatus(shippingMessage.getShippingStatus()); existing.setCurrentLocation(shippingMessage.getCurrentLocation()); existing.setVersion(version); shippingStore.put(orderId, existing); // 发送用户通知 sendShippingNotification(orderId, shippingMessage); return true; } } ``` --- ## 面试题5:优惠券广播发放 ### 面试问题 如何实现优惠券的批量发放?优惠券发放需要注意什么? ### 解答要点 1. **消息广播**:活动优惠券需要广播给所有用户 2. **幂等发放**:防止重复发放 3. **有效期管理**:优惠券有有效期 ### 代码实现 ```java /** * 优惠券服务 - 批量发放 */ @Service public class CouponService { /** * 批量发放优惠券 - 消息广播场景 */ public int batchIssueCoupon(String couponTemplateId, String[] userIds) { log.info("批量发放优惠券: templateId={}, userCount={}", couponTemplateId, userIds.length); int successCount = 0; for (String userId : userIds) { CouponMessage message = new CouponMessage(); message.setMessageId("COUPON-" + System.currentTimeMillis() + "-" + userId); message.setUserId(userId); message.setCouponId(couponTemplateId + "-" + userId); message.setCouponType(CouponMessage.TYPE_CASH); message.setDiscountAmount(new BigDecimal("10")); message.setValidFrom(LocalDateTime.now()); message.setValidTo(LocalDateTime.now().plusDays(30)); if (issueCoupon(message)) { successCount++; } } return successCount; } } ``` --- ## 面试题6:会员积分计算 ### 面试问题 如何使用Kafka实现会员积分的异步计算? ### 解答要点 1. **异步处理**:积分计算不阻塞主流程 2. **积分规则**:不同行为获取不同积分 3. **过期机制**:积分定期过期 ### 代码实现 ```java /** * 积分服务 - 异步积分计算 */ @Service public class PointsService { /** * 处理积分消息 */ public boolean processPointsMessage(PointsMessage pointsMessage) { String userId = pointsMessage.getUserId(); String pointsType = pointsMessage.getPointsType(); switch (pointsType) { case PointsMessage.TYPE_EARN: return earnPoints(userId, pointsMessage.getPoints(), pointsMessage.getOrderId()); case PointsMessage.TYPE_DEDUCT: return deductPoints(userId, pointsMessage.getPoints(), pointsMessage.getOrderId()); case PointsMessage.TYPE_EXPIRE: return expirePoints(userId, pointsMessage.getPoints()); default: return false; } } private boolean earnPoints(String userId, Integer points, String orderId) { UserPoints userPoints = userPointsStore.get(userId); if (userPoints == null) { userPoints = new UserPoints(userId, points, points, 1L); } else { userPoints.setTotalPoints(userPoints.getTotalPoints() + points); userPoints.setAvailablePoints(userPoints.getAvailablePoints() + points); } userPointsStore.put(userId, userPoints); return true; } } ``` --- ## 面试题7:消息通知服务 ### 面试问题 如何设计一个通用的消息通知服务?支持短信、邮件、推送? ### 解答要点 1. **多渠道**:支持短信、邮件、APP推送 2. **模板化**:使用消息模板 3. **重试机制**:发送失败支持重试 ### 代码实现 ```java /** * 通知服务 - 多渠道通知 */ @Service public class NotificationService { /** * 发送通知 */ public boolean sendNotification(NotificationMessage notificationMessage) { String channel = notificationMessage.getChannel(); // 根据渠道发送 switch (channel) { case NotificationMessage.CHANNEL_SMS: return sendSms(notificationMessage); case NotificationMessage.CHANNEL_EMAIL: return sendEmail(notificationMessage); case NotificationMessage.CHANNEL_PUSH: return sendPush(notificationMessage); default: return false; } } /** * 失败重试机制 */ public boolean retryFailedNotification(NotificationMessage message) { NotificationRecord record = notificationStore.get(message.getMessageId()); if (record.getRetryCount() >= MAX_RETRY_COUNT) { log.error("通知重试次数超限: messageId={}", message.getMessageId()); return false; } record.setRetryCount(record.getRetryCount() + 1); return sendNotification(message); } } ``` --- ## 面试题8:异步下单流程 ### 面试问题 如何实现异步下单?下单流程如何拆分? ### 解答要点 1. **异步化**:下单操作不等待所有后续处理 2. **消息驱动**:各环节通过消息驱动 3. **最终一致性**:保证最终一致性 ### 代码实现 ```java /** * 异步下单服务 */ @Service public class AsyncOrderService { @Autowired private KafkaTemplate kafkaTemplate; /** * 异步下单 */ public String submitOrderAsync(OrderCreateRequest request) { // 1. 创建订单(待支付状态) String orderId = createPendingOrder(request); // 2. 发送订单创建消息(异步) OrderMessage orderMessage = buildOrderMessage(orderId, request); kafkaTemplate.send("order-topic", orderId, orderMessage); // 3. 发送库存预扣消息 kafkaTemplate.send("inventory-topic", request.getProductId(), buildInventoryMessage(orderId, request)); return orderId; } } ``` --- ## 面试题9:订单状态流转 ### 面试问题 订单状态变更如何保证有序性?如何实现状态机? ### 解答要点 1. **状态机**:定义状态流转规则 2. **顺序消费**:同一订单消息有序 3. **幂等处理**:防止重复处理 ### 代码实现 ```java /** * 订单状态服务 - 状态机实现 */ @Service public class OrderStatusService { /** * 状态转换校验 */ public boolean canTransition(OrderStatus from, OrderStatus to) { Map> transitions = new HashMap<>(); transitions.put(OrderStatus.PENDING, Set.of(OrderStatus.PAID, OrderStatus.CANCELLED)); transitions.put(OrderStatus.PAID, Set.of(OrderStatus.SHIPPED, OrderStatus.CANCELLED)); transitions.put(OrderStatus.SHIPPED, Set.of(OrderStatus.DELIVERED)); transitions.put(OrderStatus.DELIVERED, Set.of(OrderStatus.COMPLETED)); Set allowed = transitions.get(from); return allowed != null && allowed.contains(to); } /** * 处理订单状态变更 */ public boolean processStatusChange(String orderId, OrderStatus newStatus, Long version) { OrderStatus currentStatus = getCurrentStatus(orderId); // 校验状态转换 if (!canTransition(currentStatus, newStatus)) { log.error("非法状态转换: orderId={}, from={}, to={}", orderId, currentStatus, newStatus); return false; } // 更新状态 return updateOrderStatus(orderId, newStatus, version); } } ``` --- ## 面试题10:商品数据同步 ### 面试问题 商品信息变更后如何同步到搜索、推荐系统? ### 解答要点 1. **消息驱动**:商品变更发送消息 2. **多消费者**:搜索、推荐分别消费 3. **实时性**:保证数据实时同步 ### 代码实现 ```java /** * 商品服务 - 数据同步 */ @Service public class ProductService { /** * 处理商品消息 - 同步到各系统 */ public boolean processProductMessage(ProductMessage productMessage) { String productId = productMessage.getProductId(); String status = productMessage.getStatus(); switch (status) { case ProductMessage.STATUS_ONLINE: return createOrUpdateProduct(productMessage); case ProductMessage.STATUS_OFFLINE: return offlineProduct(productId); case ProductMessage.STATUS_DELETED: return deleteProduct(productId); default: return false; } } /** * 创建/更新商品 */ private boolean createOrUpdateProduct(ProductMessage productMessage) { ProductData product = new ProductData( productMessage.getProductId(), productMessage.getProductName(), productMessage.getPrice(), productMessage.getStock(), productMessage.getStatus() ); productStore.put(productMessage.getProductId(), product); // 同步到搜索系统 syncToSearchEngine(productMessage); // 同步到推荐系统 syncToRecommendationEngine(productMessage); return true; } } ``` --- ## 面试题11:促销活动通知 ### 面试问题 大促期间如何处理海量促销活动消息? ### 解答要点 1. **消息广播**:活动开始/结束广播 2. **限流降级**:系统压力大时降级 3. **缓存预热**:提前缓存活动信息 ### 代码实现 ```java /** * 活动服务 - 促销通知 */ @Service public class PromotionService { /** * 活动开始通知 */ public void notifyPromotionStart(PromotionMessage message) { log.info("活动开始通知: promotionId={}", message.getPromotionId()); // 1. 更新缓存 updatePromotionCache(message); // 2. 发送广播消息 kafkaTemplate.send("promotion-broadcast-topic", message.getPromotionId(), message); // 3. 通知各业务系统 notifyRelatedSystems(message); } /** * 限流降级 */ public boolean checkRateLimit(String userId) { String key = "rate_limit:" + userId; Long count = redisTemplate.opsForValue().increment(key); if (count == 1) { redisTemplate.expire(key, 60, TimeUnit.SECONDS); } return count <= 100; // 每分钟100次限制 } } ``` --- ## 面试题12:日志收集(ELK集成) ### 面试问题 如何使用Kafka收集海量日志?与ELK如何集成? ### 解答要点 1. **消息队列**:Kafka作为日志收集缓冲 2. **Logstash**:从Kafka消费日志 3. **ES存储**:Elasticsearch存储和检索 4. **Grafana**:可视化展示 ### 代码实现 ```java /** * 日志服务 - 消息收集 */ @Service public class LogCollectionService { @Autowired private KafkaTemplate kafkaTemplate; /** * 收集应用日志 */ public void collectLog(LogMessage logMessage) { // 发送到Kafka kafkaTemplate.send("application-logs", logMessage.getTraceId(), logMessage); } /** * Logback配置示例 */ // // application-logs // // // localhost:9092 // } ``` --- ## 面试题13:用户行为分析 ### 面试问题 如何收集和分析用户行为数据?转化率如何计算? ### 解答要点 1. **埋点上报**:前端埋点 2. **实时分析**:Kafka + Flink实时处理 3. **转化漏斗**:转化率分析 ### 代码实现 ```java /** * 用户行为分析服务 */ @Service public class UserBehaviorService { /** * 处理用户行为日志 */ public boolean processUserBehavior(UserBehaviorLog behaviorLog) { String userId = behaviorLog.getUserId(); String eventType = behaviorLog.getEventType(); // 存储行为记录 UserBehaviorRecord record = new UserBehaviorRecord( behaviorLog.getMessageId(), userId, behaviorLog.getSessionId(), eventType, behaviorLog.getPageUrl(), behaviorLog.getEventTime() ); behaviorStore.put(behaviorLog.getMessageId(), record); // 更新事件计数器 eventCounter.computeIfAbsent(eventType, k -> new AtomicLong(0)) .incrementAndGet(); return true; } /** * 计算转化率 */ public double calculateConversionRate(String userId) { long viewCount = userBehaviors.stream() .filter(r -> UserBehaviorLog.EVENT_VIEW.equals(r.getEventType())) .count(); long purchaseCount = userBehaviors.stream() .filter(r -> UserBehaviorLog.EVENT_PURCHASE.equals(r.getEventType())) .count(); if (viewCount == 0) return 0.0; return (double) purchaseCount / viewCount * 100; } } ``` --- ## 面试题14:搜索推荐同步 ### 面试问题 商品变更后如何实时更新搜索索引? ### 解答要点 1. **消息驱动**:商品变更发送消息 2. **实时索引**:Elasticsearch实时更新 3. **增量同步**:只同步变更部分 ### 代码实现 ```java /** * 搜索同步服务 */ @Service public class SearchSyncService { @Autowired private ElasticsearchRestTemplate elasticsearchTemplate; /** * 同步商品到搜索索引 */ public void syncProductToSearch(ProductMessage productMessage) { String status = productMessage.getStatus(); switch (status) { case ProductMessage.STATUS_ONLINE: indexProduct(productMessage); break; case ProductMessage.STATUS_OFFLINE: case ProductMessage.STATUS_DELETED: deleteProductFromIndex(productMessage.getProductId()); break; } } /** * 索引商品 */ private void indexProduct(ProductMessage productMessage) { ProductDocument doc = new ProductDocument(); doc.setId(productMessage.getProductId()); doc.setName(productMessage.getProductName()); doc.setPrice(productMessage.getPrice()); doc.setCategoryId(productMessage.getCategoryId()); elasticsearchTemplate.save(doc); log.info("商品索引更新成功: productId={}", productMessage.getProductId()); } } ``` --- ## 面试题15:实时统计计算 ### 面试问题 如何实现实时统计?订单量、销售额如何实时计算? ### 解答要点 1. **滑动窗口**:使用滑动窗口统计 2. **预聚合**:提前聚合常用指标 3. **秒级延迟**:保证实时性 ### 代码实现 ```java /** * 实时统计服务 */ @Service public class RealTimeStatisticsService { private final Map orderCounters = new ConcurrentHashMap<>(); private final Map> amountAccumulators = new ConcurrentHashMap<>(); /** * 记录订单统计 */ public void recordOrder(String orderId, BigDecimal amount, String status) { // 按小时统计 String key = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH")); // 更新订单数 orderCounters.computeIfAbsent(key, k -> new AtomicLong(0)).incrementAndGet(); // 更新销售额 amountAccumulators.computeIfAbsent(key, k -> new AtomicReference<>(BigDecimal.ZERO)) .getAndUpdate(v -> v.add(amount)); } /** * 获取实时QPS */ public double calculateQPS() { long recentCount = getRecentOrderCount(1); return recentCount / 60.0; } /** * 获取最近N分钟的订单量 */ public long getRecentOrderCount(int minutes) { LocalDateTime cutoffTime = LocalDateTime.now().minusMinutes(minutes); return recentOrders.stream() .filter(r -> r.getCreateTime().isAfter(cutoffTime)) .count(); } } ``` --- ## 面试题16:分布式锁 ### 面试问题 如何使用Kafka实现分布式锁?与Redis分布式锁对比如何? ### 解答要点 1. **去中心化**:Kafka实现分布式锁 2. **过期机制**:锁自动过期 3. **对比Redis**:优缺点分析 ### 代码实现 ```java /** * 分布式锁服务 */ @Service public class DistributedLockService { private final Map lockMap = new ConcurrentHashMap<>(); /** * 尝试获取锁 */ public boolean tryLock(String lockKey, String owner, long expireMs) { LockData existing = lockMap.get(lockKey); // 检查锁是否存在且未过期 if (existing != null) { if (existing.isExpired()) { lockMap.remove(lockKey); } else if (!existing.getOwner().equals(owner)) { return false; } } // 创建新锁 long expireTime = System.currentTimeMillis() + expireMs; LockData newLock = new LockData(lockKey, owner, expireTime); return lockMap.putIfAbsent(lockKey, newLock) == null; } /** * 释放锁 */ public boolean unlock(String lockKey, String owner) { LockData lockData = lockMap.get(lockKey); if (lockData == null) return false; if (!lockData.getOwner().equals(owner)) { return false; } lockMap.remove(lockKey); return true; } /** * 使用锁执行操作 */ public void executeWithLock(String lockKey, String owner, long expireMs, Runnable runnable) { if (!tryLock(lockKey, owner, expireMs)) { throw new RuntimeException("获取锁失败: " + lockKey); } try { runnable.run(); } finally { unlock(lockKey, owner); } } } ``` --- ## 面试题17:消息顺序控制 ### 面试问题 如何保证消息的全局有序?分区内有序和全局有序的区别? ### 解答要点 1. **分区内有序**:相同Key进入相同分区 2. **全局有序**:单分区或自定义排序 3. **权衡**:性能与顺序的取舍 ### 代码实现 ```java /** * 全局有序控制 */ @Service public class GlobalOrderService { /** * 保证全局有序发送 * 只有一个分区,所有消息进入同一分区 */ public void sendInGlobalOrder(String topic, String key, Object message) { // 发送到单分区主题 kafkaTemplate.send(topic, "global", message); } /** * 分区内有序发送 * 相同订单ID的消息进入同一分区,保证有序 */ public void sendInPartitionOrder(String topic, String orderId, Object message) { // 使用订单ID作为Key,保证同一订单消息进入同一分区 kafkaTemplate.send(topic, orderId, message); } /** * 自定义分区策略 * 根据业务需求自定义分区 */ public int customPartition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); String keyStr = new String(keyBytes); // 按用户ID分区,同一用户的消息有序 if (keyStr.startsWith("user:")) { String userId = keyStr.substring(5); return Math.abs(userId.hashCode()) % numPartitions; } return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } ``` --- ## 面试题18:消息积压监控与告警 ### 面试问题 如何监控消息积压?积压过多如何处理? ### 解答要点 1. **监控指标**:积压量、消费延迟 2. **告警机制**:超过阈值告警 3. **应急处理**:扩容、跳消息 ### 代码实现 ```java /** * 消息监控服务 */ @Service public class MessageMonitorService { private final Map topicMetrics = new ConcurrentHashMap<>(); /** * 检查消息积压 */ public void checkMessageLag() { for (String topic : topicMetrics.keySet()) { TopicMetrics metrics = topicMetrics.get(topic); // 计算积压量 long lag = calculateLag(topic); metrics.setConsumerLag(lag); // 告警检查 if (lag > 100000) { log.warn("消息积压严重: topic={}, lag={}", topic, lag); sendAlert(topic, lag); } } } /** * 发送告警 */ private void sendAlert(String topic, long lag) { // 发送邮件、短信、钉钉等 log.error("发送告警: topic={}, lag={}", topic, lag); } /** * 跳到最新位置(应急处理) */ public void skipToLatest(Consumer consumer, String topic) { consumer.seekToEnd(consumer.assignment()); log.info("已跳过积压消息,从最新位置开始消费: topic={}", topic); } /** * 跳到指定时间点 */ public void skipToTimestamp(Consumer consumer, String topic, long timestamp) { Map timestampsToSearch = new HashMap<>(); for (TopicPartition partition : consumer.assignment()) { timestampsToSearch.put(partition, timestamp); } Map offsets = consumer.offsetsForTimes(timestampsToSearch); for (Map.Entry entry : offsets.entrySet()) { if (entry.getValue() != null) { consumer.seek(entry.getKey(), entry.getValue().offset()); } } } } ``` --- ## 面试题19:自定义分区策略 ### 面试问题 如何根据业务需求实现自定义分区策略? ### 解答要点 1. **默认策略**:哈希分区 2. **自定义策略**:按业务规则分区 3. **负载均衡**:避免分区倾斜 ### 代码实现 ```java /** * 自定义分区服务 */ @Service public class CustomPartitionService implements Partitioner { private final Map regionPartitionMap = new ConcurrentHashMap<>(); /** * 自定义分区策略 */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions; } String keyStr = new String(keyBytes); // 按用户ID分区 if (keyStr.startsWith("user:")) { String userId = keyStr.substring(5); return Math.abs(userId.hashCode()) % numPartitions; } // 按地区分区 if (keyStr.startsWith("region:")) { String region = keyStr.substring(7); return regionPartitionMap.computeIfAbsent(region, r -> Math.abs(r.hashCode()) % numPartitions); } // 按订单ID分区(保证同一订单消息有序) if (keyStr.startsWith("order:")) { String orderId = keyStr.substring(6); return Math.abs(orderId.hashCode()) % numPartitions; } return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } ``` --- ## 面试题20:消息重试机制 ### 面试问题 消息消费失败后如何重试?如何实现指数退避? ### 解答要点 1. **重试队列**:失败消息进入重试队列 2. **指数退避**:重试间隔递增 3. **最大次数**:超过次数进入死信队列 ### 代码实现 ```java /** * 消息重试服务 */ @Service public class MessageRetryService { private final DelayQueue retryQueue = new DelayQueue<>(); /** * 添加重试任务 */ public void addRetryTask(String topic, String key, Object message, String errorMsg) { String messageId = UUID.randomUUID().toString(); // 计算重试间隔(指数退避) long delay = calculateBackoff(0); RetryRecord record = new RetryRecord( messageId, topic, key, message, 0, System.currentTimeMillis() + delay, errorMsg); retryQueue.offer(new RetryTask(record, delay)); log.info("添加重试任务: messageId={}, delay={}ms", messageId, delay); } /** * 计算退避时间 */ private long calculateBackoff(int retryCount) { // 基础延迟:1秒 long baseDelay = 1000; // 指数退避:2^retryCount long delay = (long) (baseDelay * Math.pow(2, retryCount)); // 添加随机抖动 delay += (long) (Math.random() * baseDelay); // 限制最大延迟:5分钟 return Math.min(delay, 5 * 60 * 1000); } /** * 处理重试 */ private void processRetry(RetryRecord record) { kafkaTemplate.send(record.getTopic(), record.getKey(), record.getMessage()) .whenComplete((result, ex) -> { if (ex != null) { handleRetryFailure(record, ex); } else { handleRetrySuccess(record); } }); } /** * 处理重试失败 */ private void handleRetryFailure(RetryRecord record, Throwable ex) { int newRetryCount = record.getRetryCount() + 1; if (newRetryCount >= 5) { // 超过最大重试次数,发送到死信队列 sendToDeadLetterQueue(record); return; } // 计算下一次重试延迟 long delay = calculateBackoff(newRetryCount); record.setRetryCount(newRetryCount); retryQueue.offer(new RetryTask(record, delay)); log.warn("重试失败,计划下一次重试: retryCount={}, delay={}ms", newRetryCount, delay); } } ``` --- ## 测试用例代码 ### 面试题1测试用例:Kafka事务 ```java /** * 事务服务测试 * 面试题1:Kafka事务实现分布式事务 */ @ExtendWith(MockitoExtension.class) class TransactionServiceTest { @Mock private KafkaTemplate kafkaTemplate; @Mock private OrderService orderService; @Mock private InventoryService inventoryService; @Mock private PaymentService paymentService; @InjectMocks private TransactionService transactionService; /** * 测试分布式事务执行成功 */ @Test void testExecuteOrderTransaction_Success() { // 准备测试数据 OrderMessage orderMessage = createOrderMessage(); InventoryMessage inventoryMessage = createInventoryMessage(); PaymentMessage paymentMessage = createPaymentMessage(); // Mock KafkaTemplate.executeInTransaction when(kafkaTemplate.executeInTransaction(any())).thenAnswer(invocation -> true); // 执行测试 boolean result = transactionService.executeOrderTransaction( orderMessage, inventoryMessage, paymentMessage); // 验证结果 assertTrue(result); verify(orderService).createOrder(any(OrderMessage.class)); } } ``` ### 面试题2测试用例:库存扣减 ```java /** * 库存服务测试 * 面试题2:库存扣减(乐观锁防超卖) */ class InventoryServiceTest { private InventoryService inventoryService; @BeforeEach void setUp() { inventoryService = new InventoryService(); inventoryService.initInventory("PRODUCT-001", 100); } /** * 测试库存扣减成功 */ @Test void testDeductInventory_Success() { boolean result = inventoryService.deductInventory("PRODUCT-001", 10, "ORDER-001", 0L); assertTrue(result); assertEquals(90, inventoryService.getStock("PRODUCT-001")); } /** * 测试库存不足 */ @Test void testDeductInventory_InsufficientStock() { boolean result = inventoryService.deductInventory("PRODUCT-001", 150, "ORDER-001", 0L); assertFalse(result); assertEquals(100, inventoryService.getStock("PRODUCT-001")); } /** * 测试乐观锁 - 版本号不匹配 */ @Test void testDeductInventory_VersionMismatch() { inventoryService.deductInventory("PRODUCT-001", 10, "ORDER-001", 0L); boolean result = inventoryService.deductInventory("PRODUCT-001", 10, "ORDER-002", 0L); assertFalse(result); } } ``` ### 面试题3测试用例:支付回调 ```java /** * 支付服务测试 * 面试题3:支付回调(幂等性处理) */ class PaymentServiceTest { private PaymentService paymentService; @BeforeEach void setUp() { paymentService = new PaymentService(); } /** * 测试支付回调成功 */ @Test void testProcessPaymentCallback_Success() { PaymentMessage message = createPaymentMessage("ORDER-001", "PAY-001", PaymentMessage.STATUS_SUCCESS, 1L); boolean result = paymentService.processPaymentCallback(message); assertTrue(result); assertEquals(PaymentMessage.STATUS_SUCCESS, paymentService.getPaymentStatus("ORDER-001")); } /** * 测试重复通知(版本号相同) */ @Test void testProcessPaymentCallback_DuplicateWithSameVersion() { PaymentMessage message1 = createPaymentMessage("ORDER-001", "PAY-001", PaymentMessage.STATUS_SUCCESS, 1L); paymentService.processPaymentCallback(message1); PaymentMessage message2 = createPaymentMessage("ORDER-001", "PAY-001", PaymentMessage.STATUS_SUCCESS, 1L); boolean result = paymentService.processPaymentCallback(message2); assertTrue(result); // 应该被忽略 } } ``` ### 面试题4测试用例:物流通知 ```java /** * 物流服务测试 * 面试题4:物流通知(消息驱动) */ class ShippingServiceTest { private ShippingService shippingService; @BeforeEach void setUp() { shippingService = new ShippingService(); shippingService.createShipping("ORDER-001", "SF"); } /** * 测试物流状态更新 */ @Test void testProcessShippingUpdate_Shipped() { ShippingMessage message = createShippingMessage("ORDER-001", ShippingMessage.STATUS_SHIPPED, 1L); boolean result = shippingService.processShippingUpdate(message); assertTrue(result); ShippingService.ShippingData shipping = shippingService.getShippingInfo("ORDER-001"); assertEquals(ShippingMessage.STATUS_SHIPPED, shipping.getStatus()); } /** * 测试版本号过低 - 忽略旧消息 */ @Test void testProcessShippingUpdate_OldVersion() { shippingService.processShippingUpdate(createShippingMessage("ORDER-001", ShippingMessage.STATUS_SHIPPED, 1L)); ShippingMessage message = createShippingMessage("ORDER-001", ShippingMessage.STATUS_IN_TRANSIT, 0L); boolean result = shippingService.processShippingUpdate(message); assertTrue(result); // 旧版本应该被忽略 } } ``` ### 面试题5测试用例:优惠券发放 ```java /** * 优惠券服务测试 * 面试题5:优惠券广播发放 */ class CouponServiceTest { private CouponService couponService; @BeforeEach void setUp() { couponService = new CouponService(); } /** * 测试发放优惠券成功 */ @Test void testIssueCoupon_Success() { CouponMessage message = createCouponMessage("USER-001", "COUPON-001", CouponMessage.TYPE_CASH, new BigDecimal("10")); boolean result = couponService.issueCoupon(message); assertTrue(result); } /** * 测试重复发放优惠券 */ @Test void testIssueCoupon_Duplicate() { CouponMessage message1 = createCouponMessage("USER-001", "COUPON-001", CouponMessage.TYPE_CASH, new BigDecimal("10")); couponService.issueCoupon(message1); CouponMessage message2 = createCouponMessage("USER-001", "COUPON-001", CouponMessage.TYPE_CASH, new BigDecimal("10")); boolean result = couponService.issueCoupon(message2); assertFalse(result); // 应该失败 } /** * 测试使用优惠券 */ @Test void testUseCoupon_Success() { CouponMessage message = createCouponMessage("USER-001", "COUPON-001", CouponMessage.TYPE_CASH, new BigDecimal("10")); couponService.issueCoupon(message); boolean result = couponService.useCoupon("USER-001", "COUPON-001", new BigDecimal("100")); assertTrue(result); } } ``` ### 面试题6测试用例:会员积分 ```java /** * 积分服务测试 * 面试题6:会员积分计算 */ class PointsServiceTest { private PointsService pointsService; @BeforeEach void setUp() { pointsService = new PointsService(); } /** * 测试获取积分 */ @Test void testProcessPointsMessage_Earn() { PointsMessage message = createPointsMessage("USER-001", PointsMessage.TYPE_EARN, 100); boolean result = pointsService.processPointsMessage(message); assertTrue(result); PointsService.UserPoints userPoints = pointsService.getUserPoints("USER-001"); assertEquals(100, userPoints.getTotalPoints()); } /** * 测试扣减积分 */ @Test void testProcessPointsMessage_Deduct() { PointsMessage earnMessage = createPointsMessage("USER-001", PointsMessage.TYPE_EARN, 100); pointsService.processPointsMessage(earnMessage); PointsMessage deductMessage = createPointsMessage("USER-001", PointsMessage.TYPE_DEDUCT, 30); boolean result = pointsService.processPointsMessage(deductMessage); assertTrue(result); PointsService.UserPoints userPoints = pointsService.getUserPoints("USER-001"); assertEquals(70, userPoints.getTotalPoints()); } /** * 测试扣减积分 - 积分不足 */ @Test void testProcessPointsMessage_Deduct_Insufficient() { PointsMessage earnMessage = createPointsMessage("USER-001", PointsMessage.TYPE_EARN, 50); pointsService.processPointsMessage(earnMessage); PointsMessage deductMessage = createPointsMessage("USER-001", PointsMessage.TYPE_DEDUCT, 100); boolean result = pointsService.processPointsMessage(deductMessage); assertFalse(result); } } ``` ### 面试题7测试用例:消息通知 ```java /** * 通知服务测试 * 面试题7:消息通知服务 */ class NotificationServiceTest { private NotificationService notificationService; @BeforeEach void setUp() { notificationService = new NotificationService(); } /** * 测试发送短信通知 */ @Test void testSendNotification_SMS() { NotificationMessage message = createNotificationMessage("USER-001", NotificationMessage.CHANNEL_SMS); boolean result = notificationService.sendNotification(message); assertTrue(result); } /** * 测试重复发送通知(幂等性) */ @Test void testSendNotification_Duplicate() { NotificationMessage message1 = createNotificationMessage("USER-001", NotificationMessage.CHANNEL_SMS, "MSG-001"); notificationService.sendNotification(message1); NotificationMessage message2 = createNotificationMessage("USER-001", NotificationMessage.CHANNEL_SMS, "MSG-001"); boolean result = notificationService.sendNotification(message2); assertTrue(result); // 应该被跳过 } /** * 测试批量发送通知 */ @Test void testBatchSendNotification() { NotificationMessage[] messages = new NotificationMessage[]{ createNotificationMessage("USER-001", NotificationMessage.CHANNEL_SMS), createNotificationMessage("USER-002", NotificationMessage.CHANNEL_EMAIL), createNotificationMessage("USER-003", NotificationMessage.CHANNEL_PUSH) }; int successCount = notificationService.batchSendNotification(messages); assertEquals(3, successCount); } } ``` ### 面试题10测试用例:商品数据同步 ```java /** * 商品服务测试 * 面试题10:商品数据同步 */ class ProductServiceTest { private ProductService productService; @BeforeEach void setUp() { productService = new ProductService(); } /** * 测试创建商品 */ @Test void testProcessProductMessage_Create() { ProductMessage message = createProductMessage("PRODUCT-001", ProductMessage.STATUS_ONLINE, 1L); boolean result = productService.processProductMessage(message); assertTrue(result); List products = productService.getProductsByCategory("CATEGORY-001"); assertEquals(1, products.size()); } /** * 测试下架商品 */ @Test void testProcessProductMessage_Offline() { ProductMessage createMsg = createProductMessage("PRODUCT-001", ProductMessage.STATUS_ONLINE, 1L); productService.processProductMessage(createMsg); ProductMessage offlineMsg = createProductMessage("PRODUCT-001", ProductMessage.STATUS_OFFLINE, 2L); boolean result = productService.processProductMessage(offlineMsg); assertTrue(result); List products = productService.getProductsByCategory("CATEGORY-001"); assertEquals(0, products.size()); } /** * 测试版本号过低 - 跳过更新 */ @Test void testProcessProductMessage_OldVersion() { ProductMessage createMsg = createProductMessage("PRODUCT-001", ProductMessage.STATUS_ONLINE, 1L); productService.processProductMessage(createMsg); ProductMessage updateMsg = createProductMessage("PRODUCT-001", ProductMessage.STATUS_ONLINE, 0L); boolean result = productService.processProductMessage(updateMsg); assertTrue(result); // 应该被跳过 } } ``` ### 面试题13测试用例:用户行为分析 ```java /** * 用户行为分析服务测试 * 面试题13:用户行为分析 */ class UserBehaviorServiceTest { private UserBehaviorService userBehaviorService; @BeforeEach void setUp() { userBehaviorService = new UserBehaviorService(); } /** * 测试处理用户浏览行为 */ @Test void testProcessUserBehavior_View() { UserBehaviorLog log = createBehaviorLog("USER-001", UserBehaviorLog.EVENT_VIEW); boolean result = userBehaviorService.processUserBehavior(log); assertTrue(result); } /** * 测试获取事件统计 */ @Test void testGetEventStatistics() { userBehaviorService.processUserBehavior(createBehaviorLog("USER-001", UserBehaviorLog.EVENT_VIEW)); userBehaviorService.processUserBehavior(createBehaviorLog("USER-002", UserBehaviorLog.EVENT_VIEW)); userBehaviorService.processUserBehavior(createBehaviorLog("USER-003", UserBehaviorLog.EVENT_CLICK)); Map stats = userBehaviorService.getEventStatistics(); assertEquals(2, stats.get(UserBehaviorLog.EVENT_VIEW)); assertEquals(1, stats.get(UserBehaviorLog.EVENT_CLICK)); } /** * 测试计算转化率 */ @Test void testCalculateConversionRate_WithPurchase() { userBehaviorService.processUserBehavior(createBehaviorLog("USER-001", UserBehaviorLog.EVENT_VIEW)); userBehaviorService.processUserBehavior(createBehaviorLog("USER-001", UserBehaviorLog.EVENT_VIEW)); userBehaviorService.processUserBehavior(createBehaviorLog("USER-001", UserBehaviorLog.EVENT_PURCHASE)); double rate = userBehaviorService.calculateConversionRate("USER-001"); assertEquals(50.0, rate, 0.1); } } ``` ### 面试题15测试用例:实时统计 ```java /** * 实时统计服务测试 * 面试题15:实时统计计算 */ class RealTimeStatisticsServiceTest { private RealTimeStatisticsService statisticsService; @BeforeEach void setUp() { statisticsService = new RealTimeStatisticsService(); } /** * 测试记录订单统计 */ @Test void testRecordOrder() { statisticsService.recordOrder("ORDER-001", new BigDecimal("100.00"), "PAID"); long todayCount = statisticsService.getTodayOrderCount(); assertTrue(todayCount > 0); } /** * 测试获取平均订单金额 */ @Test void testGetAverageOrderAmount() { statisticsService.recordOrder("ORDER-001", new BigDecimal("100.00"), "PAID"); statisticsService.recordOrder("ORDER-002", new BigDecimal("200.00"), "PAID"); BigDecimal avgAmount = statisticsService.getAverageOrderAmount(); assertEquals(new BigDecimal("150.00").setScale(2, BigDecimal.ROUND_HALF_UP), avgAmount.setScale(2, BigDecimal.ROUND_HALF_UP)); } /** * 测试计算QPS */ @Test void testCalculateQPS() { statisticsService.recordOrder("ORDER-001", new BigDecimal("100.00"), "PAID"); double qps = statisticsService.calculateQPS(); assertTrue(qps >= 0); } } ``` ### 面试题16测试用例:分布式锁 ```java /** * 分布式锁服务测试 * 面试题16:分布式锁 */ class DistributedLockServiceTest { private DistributedLockService lockService; @BeforeEach void setUp() { lockService = new DistributedLockService(); } /** * 测试获取锁成功 */ @Test void testTryLock_Success() { boolean result = lockService.tryLock("ORDER-001", "USER-001", 5000L); assertTrue(result); } /** * 测试同一锁被其他持有者占用 */ @Test void testTryLock_OtherOwner() { lockService.tryLock("ORDER-001", "USER-001", 5000L); boolean result = lockService.tryLock("ORDER-001", "USER-002", 5000L); assertFalse(result); } /** * 测试释放自己的锁 */ @Test void testUnlock_OwnLock() { lockService.tryLock("ORDER-001", "USER-001", 5000L); boolean result = lockService.unlock("ORDER-001", "USER-001"); assertTrue(result); boolean canAcquire = lockService.tryLock("ORDER-001", "USER-002", 5000L); assertTrue(canAcquire); } } ``` ### 面试题18测试用例:消息积压监控 ```java /** * 消息监控服务测试 * 面试题18:消息积压监控告警 */ class MessageMonitorServiceTest { private MessageMonitorService monitorService; @BeforeEach void setUp() { monitorService = new MessageMonitorService(); } /** * 测试记录消费进度 */ @Test void testRecordConsumption() { assertDoesNotThrow(() -> monitorService.recordConsumption("order-topic", 100L) ); } /** * 测试获取主题指标 */ @Test void testGetTopicMetrics() { monitorService.recordConsumption("order-topic", 100L); var metrics = monitorService.getTopicMetrics("order-topic"); assertNotNull(metrics); } /** * 测试获取消费速率 */ @Test void testGetConsumptionRate() { monitorService.recordConsumption("order-topic", 100L); double rate = monitorService.getConsumptionRate("order-topic"); assertTrue(rate >= 0); } } ``` ### 面试题20测试用例:消息重试机制 ```java /** * 消息重试服务测试 * 面试题20:消息重试机制 */ class MessageRetryServiceTest { @Mock private KafkaTemplate kafkaTemplate; private MessageRetryService retryService; @BeforeEach void setUp() { retryService = new MessageRetryService(kafkaTemplate); } /** * 测试添加重试任务 */ @Test void testAddRetryTask() { OrderMessage message = new OrderMessage(); message.setOrderId("ORDER-001"); message.setStatus(OrderStatus.PENDING); assertDoesNotThrow(() -> retryService.addRetryTask("order-topic", "ORDER-001", message, "Test error") ); } /** * 测试获取重试状态 */ @Test void testGetRetryStatus() { OrderMessage message = new OrderMessage(); message.setOrderId("ORDER-001"); retryService.addRetryTask("order-topic", "ORDER-001", message, "Test error"); var status = retryService.getRetryStatus(); assertNotNull(status); assertTrue(status.containsKey("totalRetryRecords")); } /** * 测试手动重试 */ @Test void testManualRetry() { OrderMessage message = new OrderMessage(); message.setOrderId("ORDER-001"); retryService.addRetryTask("order-topic", "ORDER-001", message, "Test error"); assertDoesNotThrow(() -> retryService.manualRetry("MSG-001") ); } } ``` --- ## 总结 以上20个新面试题涵盖了商城高并发场景的核心问题: | 类别 | 面试题 | |------|--------| | 分布式事务 | 1, 8 | | 库存管理 | 2, 9 | | 支付回调 | 3 | | 消息通知 | 4, 7, 11 | | 用户体系 | 5, 6, 13 | | 数据同步 | 10, 14 | | 实时统计 | 12, 15 | | 分布式锁 | 16 | | 消息顺序 | 17 | | 监控运维 | 18, 19, 20 | --- # 生产环境Kafka延迟消息最优方案 ## 一、延迟消息场景分析 在电商系统中,延迟消息是常见需求: | 场景 | 延迟时间 | 典型例子 | |------|----------|----------| | 订单超时取消 | 15-30分钟 | 订单未支付自动取消 | | 支付超时提醒 | 5-10分钟 | 支付提醒 | | 物流状态通知 | 1-2小时 | 发货提醒 | | 评价提醒 | 1-7天 | 收货后提醒评价 | | 优惠券过期 | 1-30天 | 优惠券到期提醒 | ## 二、三种方案对比 | 特性 | 方案1:时间轮算法 | 方案2:延时主题 | 方案3:Redis ZSET | |------|-------------------|-----------------|-------------------| | **精度** | 毫秒级 | 秒级(依赖扫描频率) | 毫秒级 | | **性能** | 极高 O(1) | 中 | 高 | | **复杂度** | 中 | 低 | 低 | | **可靠性** | 高(Redis持久化) | 高(Kafka持久化) | 高(Redis持久化) | | **分布式支持** | ✅ Redis分布式锁 | ✅ Kafka消费者组 | ✅ Redis集群 | | **消息持久化** | ✅ Redis | ✅ Kafka | ✅ Redis | | **实现难度** | 中等 | 简单 | 简单 | | **额外依赖** | Redis | 无 | Redis | ## 三、生产环境最优方案推荐 ### 方案选择原则 ``` 延迟时间 < 1分钟 → 推荐:时间轮算法(最高性能) 延迟时间 1-60分钟 → 推荐:延时主题(最简单可靠) 延迟时间 > 1小时 → 推荐:Redis ZSET(高可靠、易管理) ``` ### 推荐方案:分层混合架构 ``` ┌─────────────────────────────────────────────────────────────────┐ │ 延迟消息分层架构 │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ 时间轮 │ │ 延时主题 │ │ Redis ZSET │ │ │ │ (毫秒级) │ │ (分钟级) │ │ (小时/天级) │ │ │ │ │ │ │ │ │ │ │ │ 0-60秒 │ │ 1-60分钟 │ │ 1小时以上 │ │ │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ │ │ │ │ └───────────────────┼───────────────────┘ │ │ ▼ │ │ ┌─────────────┐ │ │ │ 目标Topic │ → 最终消费者处理 │ │ └─────────────┘ │ └─────────────────────────────────────────────────────────────────┘ ``` ## 四、最优方案代码实现 ### 1. 时间轮算法(短延迟 0-60秒) **代码位置:** `service/TimeWheelDelayQueueService.java` ```java /** * 分布式时间轮延迟队列 - 适用于短延迟场景(毫秒级精度) * * 优点: * - O(1)时间复杂度,性能极高 * - 毫秒级精度 * - 支持分布式部署 * * 适用场景: * - 订单超时取消(30秒内) * - 实时消息推送 * - 短时延迟任务 */ @Service public class TimeWheelDelayQueueService { private static final int WHEEL_SIZE = 60; // 60个槽位,每秒移动一次 private static final String WHEEL_SLOT_KEY_PREFIX = "delay:wheel:slot:"; /** * 添加延迟任务 * * @param topic 目标Topic * @param key 消息Key * @param message 消息内容 * @param delayMs 延迟毫秒数(建议 < 60000ms) */ public void addDelayTask(String topic, String key, Object message, long delayMs) { String taskId = key + "-" + System.currentTimeMillis(); long executeTime = System.currentTimeMillis() + delayMs; // 计算槽位:当前时间 + 延迟时间 / 1000 % 60 int slot = (int) ((executeTime / 1000) % WHEEL_SIZE); // 存储到Redis ZSET String slotKey = WHEEL_SLOT_KEY_PREFIX + slot; redisTemplate.opsForZSet().add(slotKey, taskId, executeTime); log.info("时间轮添加任务: topic={}, delay={}ms, slot={}", topic, delayMs, slot); } /** * 每秒扫描到期任务 */ @Scheduled(fixedDelay = 1000) public void scanAndProcessExpiredTasks() { int currentSlot = (int) ((System.currentTimeMillis() / 1000) % WHEEL_SIZE); String slotKey = WHEEL_SLOT_KEY_PREFIX + currentSlot; // 获取到期任务 Set> expiredTasks = redisTemplate.opsForZSet().rangeByScoreWithScores(slotKey, 0, System.currentTimeMillis()); for (var task : expiredTasks) { // 发送到目标Topic processTask(task.getValue()); } } } ``` ### 2. 延时主题(中延迟 1-60分钟) **代码位置:** `service/ScheduledTopicDelayQueueService.java` > **核心原理:消息发送到"延时Topic"而非"目标Topic",消费者检查执行时间后再转发** ``` ┌─────────────────────────────────────────────────────────────────┐ │ 延时主题 + 定时扫描 架构 │ │ │ │ 1. 发送阶段 │ │ ┌─────────┐ │ │ │ 业务调用 │ sendToDelayTopic(order-topic, msg, 30分钟) │ │ └────┬────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │ 延时Topic: delay-30min (不是目标topic!) │ │ │ │ 消息内容: {targetTopic:"order-topic", executeTime:xx} │ │ │ └─────────────────────────────────────────────────────────┘ │ │ │ │ 2. 消费阶段(关键!) │ │ ┌─────────────────────────────────────────────────────────┐│ │ │ 消费者从delay-30min消费 ││ │ │ ││ │ │ 检查 executeTime = 30分钟后的时间戳 ││ │ │ ↓ ││ │ │ 如果 还没到执行时间 → Thread.sleep(等待) ││ │ │ 如果 已经到执行时间 → 转发到目标Topic ││ │ └─────────────────────────────────────────────────────────┘│ └─────────────────────────────────────────────────────────────────┘ ``` **两种实现方式:** | 方式 | 说明 | 优点 | 缺点 | |------|------|------|------| | **消费者等待** | 消费到消息后,Thread.sleep()等待到期 | 简单 | 占用消费者线程 | | **定时扫描** | 消息存入内存Map,定时扫描到期转发 | 不占用线程 | 需要额外存储 | ```java /** * 延时主题延迟队列 - 适用于中延迟场景(简单可靠) * * 优点: * - 无需额外依赖,利用Kafka自身能力 * - 消息持久化在Kafka,天然高可用 * - 消费者组实现负载均衡 * * 适用场景: * - 订单超时取消(15-30分钟) * - 支付超时提醒(5-10分钟) * - 物流状态通知(1-2小时) * * ⚠️ 关键说明: * - 消息发送到"延时Topic"(如delay-30min),不是目标Topic! * - 消费者从延时Topic消费后,检查executeTime是否到期 * - 时间没到就等待/暂存,时间到了才转发到目标Topic */ @Service public class ScheduledTopicDelayQueueService { private static final String DELAY_TOPIC_PREFIX = "delay-"; /** * 发送延迟消息 - 发送到延时Topic,不是目标Topic! * * @param topic 目标Topic(最终消费的实际Topic) * @param key 消息Key * @param message 消息内容 * @param delayMs 延迟毫秒数(建议 60000ms - 3600000ms) */ public void sendToDelayTopic(String topic, String key, Object message, long delayMs) { // 根据延迟时间选择不同的延时Topic String delayTopic = calculateDelayTopic(delayMs); // 消息内容包含:目标Topic、执行时间、实际消息 Map delayMessage = new HashMap<>(); delayMessage.put("targetTopic", topic); // 目标Topic delayMessage.put("executeTime", System.currentTimeMillis() + delayMs); // 执行时间戳 delayMessage.put("message", JsonUtils.toJson(message)); // 实际消息 // 发送到延时Topic! kafkaTemplate.send(delayTopic, key, delayMessage); } /** * 根据延迟时间计算延时Topic */ private String calculateDelayTopic(long delayMs) { if (delayMs <= 60 * 1000) return "delay-1min"; if (delayMs <= 5 * 60 * 1000) return "delay-5min"; if (delayMs <= 30 * 60 * 1000) return "delay-30min"; if (delayMs <= 60 * 60 * 1000) return "delay-1hour"; return "delay-1day"; } /** * 消费延时消息 - 关键:检查是否到期,到期才转发 * * @param message 延时消息(包含targetTopic、executeTime、message) * @param ack 确认机制 */ @KafkaListener(topics = "delay-1min,delay-5min,delay-30min,delay-1hour", groupId = "delay-queue-consumer-group") public void consumeDelayMessage(Map message, Acknowledgment ack) { // 获取执行时间 long executeTime = (long) message.get("executeTime"); long now = System.currentTimeMillis(); // 关键:检查是否到期 if (executeTime > now) { // 时间还没到! long waitMs = executeTime - now; if (waitMs <= 60000) { // 小于1分钟:Thread.sleep等待(简单方案) try { Thread.sleep(waitMs); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } else { // 大于1分钟:暂存到内存,稍后定时扫描处理 // (代码中有waitingMessages Map暂存逻辑) log.info("延迟时间较长,暂存等待扫描: waitMs={}", waitMs); return; // 不提交ack,消息会重新被消费 } } // 时间到了,转发到目标Topic! String targetTopic = (String) message.get("targetTopic"); String messageJson = (String) message.get("message"); kafkaTemplate.send(targetTopic, messageJson); ack.acknowledge(); log.info("延时消息已转发到目标Topic: targetTopic={}", targetTopic); } } ``` ### 3. Redis ZSET(长延迟 > 1小时) **代码位置:** `service/ExternalDelayQueueService.java` > **核心原理:延迟消息存放到Redis ZSET,score=执行时间戳,定时扫描到期消息转发到Kafka** #### Redis ZSET 存放示例 ``` 添加延迟消息(7天后评价提醒): ┌─────────────────────────────────────────────────────────────────┐ │ Redis ZSET key: delay:queue:review-topic │ │ │ │ score (执行时间戳) member (消息内容) │ │ ───────────────────────────────────────────────────────── │ │ 1750000000000 ──▶ {messageId, key, message, executeTime} │ │ 1750000100000 ──▶ {messageId, key, message, executeTime} │ │ 1751000000000 ──▶ {messageId, key, message, executeTime} │ │ │ │ ZRANGEBYSCORE delay:queue:review-topic 0 1750000000000 │ │ → 获取所有到期消息(score <= 当前时间戳) │ └─────────────────────────────────────────────────────────────────┘ 实际数据示例: ┌─────────────────────────────────────────────────────────────────┐ │ Redis数据: │ │ │ │ Key: delay:queue:review-topic │ │ │ │ score=1750000000000 → "review-ORDER-001|ORDER-001|{...}|1750000000000" │ │ score=1750000100000 → "review-ORDER-002|ORDER-002|{...}|1750000100000" │ │ score=1751000000000 → "review-ORDER-003|ORDER-003|{...}|1751000000000" │ │ │ │ 消息格式:messageId|key|messageJson|executeTime │ └─────────────────────────────────────────────────────────────────┘ ``` #### 定时扫描机制 ``` 扫描频率:每秒执行一次 (@Scheduled(fixedDelay = 1000)) 扫描流程: ┌─────────────────────────────────────────────────────────────────┐ │ 1. 获取分布式锁(防止多实例重复处理) │ │ SET delay:lock:review-topic "instance-1" EX 2 NX │ │ │ │ 2. 查询到期消息 │ │ ZRANGEBYSCORE delay:queue:review-topic 0 1750000000000 │ │ │ │ 3. 遍历处理 │ │ for each message in expiredMessages: │ │ - 发送Kafka: send(topic, key, message) │ │ - 移除ZSET: ZREM delay:queue:review-topic message │ │ │ │ 4. 释放锁 │ │ DEL delay:lock:review-topic │ └─────────────────────────────────────────────────────────────────┘ ``` #### 使用示例 ```java /** * Redis ZSET延迟队列 - 适用于长延迟场景(高可靠、易管理) * * 优点: * - 数据持久化,不丢失(Redis持久化) * - 支持精确延迟(毫秒级) * - 易于监控和管理(Redis可查看ZSET内容) * - 支持Redis集群 * - 不占用消费者线程 * * 适用场景: * - 评价提醒(1-7天) * - 优惠券过期提醒(1-30天) * - 会员权益过期提醒 */ @Service public class ExternalDelayQueueService { private static final String DELAY_QUEUE_KEY_PREFIX = "delay:queue:"; private static final String PROCESSING_KEY_PREFIX = "delay:processing:"; private static final String LOCK_KEY_PREFIX = "delay:lock:"; /** * 添加延迟消息到Redis ZSET * * @param topic 目标Topic * @param key 消息Key * @param message 消息内容 * @param delayMs 延迟毫秒数(建议 > 3600000ms,即1小时以上) */ public void addDelayMessage(String topic, String key, Object message, long delayMs) { // 生成唯一消息ID String messageId = key + "-" + System.currentTimeMillis(); // 计算执行时间戳(当前时间 + 延迟时间) long executeTime = System.currentTimeMillis() + delayMs; // 构建消息内容 String messageValue = buildMessageValue(messageId, key, message, executeTime); // 存放到Redis ZSET,score=执行时间戳 String queueKey = DELAY_QUEUE_KEY_PREFIX + topic; redisTemplate.opsForZSet().add(queueKey, messageValue, executeTime); log.info("Redis延迟消息已添加: topic={}, key={}, delay={}ms, executeTime={}", topic, key, delayMs, executeTime); } /** * 定时扫描到期消息 - 每秒执行 */ @Scheduled(fixedDelay = 1000) public void scanAndProcessDelayMessages() { // 遍历所有Topic for (String topic : Arrays.asList("order-topic", "payment-topic", "review-topic")) { processTopicMessages(topic); } } /** * 处理特定Topic的到期消息 */ private void processTopicMessages(String topic) { String queueKey = DELAY_QUEUE_KEY_PREFIX + topic; String lockKey = LOCK_KEY_PREFIX + topic; long now = System.currentTimeMillis(); // 1. 获取分布式锁(2秒自动过期) Boolean lockAcquired = redisTemplate.opsForValue() .setIfAbsent(lockKey, INSTANCE_ID, 2, TimeUnit.SECONDS); if (!Boolean.TRUE.equals(lockAcquired)) { return; // 其他实例正在处理 } try { // 2. 查询到期消息(score从0到当前时间) Set> expiredMessages = redisTemplate.opsForZSet().rangeByScoreWithScores(queueKey, 0, now, 0, 100); if (expiredMessages == null || expiredMessages.isEmpty()) { return; } // 3. 遍历处理每条消息 for (ZSetOperations.TypedTuple tuple : expiredMessages) { processMessage(topic, queueKey, tuple.getValue()); } } finally { redisTemplate.delete(lockKey); } } /** * 处理单条消息 */ private void processMessage(String topic, String queueKey, String value) { // 解析消息 String[] parts = value.split("\\|", 4); String messageId = parts[0]; String key = parts[1]; String messageJson = parts[2]; // 4. 获取消息处理锁(防止重复处理) String processingKey = PROCESSING_KEY_PREFIX + topic + ":" + messageId; Boolean acquired = redisTemplate.opsForValue() .setIfAbsent(processingKey, INSTANCE_ID, 5, TimeUnit.MINUTES); if (!Boolean.TRUE.equals(acquired)) { return; // 消息正在被其他实例处理 } try { // 5. 发送Kafka kafkaTemplate.send(topic, key, messageJson).get(); // 6. 从ZSET中移除 redisTemplate.opsForZSet().remove(queueKey, value); log.info("Redis延迟消息已处理: topic={}, key={}, messageId={}", topic, key, messageId); } catch (Exception e) { // 发送失败,保留消息稍后重试 redisTemplate.delete(processingKey); } } } ``` #### Redis中的实际数据 ``` # 添加3条延迟消息(评价提醒) # 消息1: 1小时后提醒 > ZADD delay:queue:review-topic 1750010000000 "review-001|ORDER-001|{...}|1750010000000" # 消息2: 7天后提醒 > ZADD delay:queue:review-topic 1750800000000 "review-002|ORDER-002|{...}|1750800000000" # 消息3: 30天后提醒 > ZADD delay:queue:review-topic 1753500000000 "review-003|ORDER-003|{...}|1753500000000" # 查看所有消息(按执行时间排序) > ZRANGE delay:queue:review-topic 0 -1 WITHSCORES 1) "review-001|ORDER-001|{...}|1750010000000" 2) "1750010000000" 3) "review-002|ORDER-002|{...}|1750800000000" 4) "1750800000000" 5) "review-003|ORDER-003|{...}|1753500000000" 6) "1753500000000" # 查询已到期的消息(当前时间戳=1750010000001) > ZRANGEBYSCORE delay:queue:review-topic 0 1750010000001 1) "review-001|ORDER-001|{...}|1750010000000" ``` ## 五、实际使用示例 ```java @Service public class OrderDelayService { @Autowired private TimeWheelDelayQueueService timeWheelService; @Autowired private ScheduledTopicDelayQueueService scheduledTopicService; @Autowired private ExternalDelayQueueService externalDelayService; /** * 订单超时自动取消 - 使用时间轮(30秒) */ public void scheduleOrderCancelShort(String orderId) { OrderMessage cancelMsg = new OrderMessage(); cancelMsg.setOrderId(orderId); cancelMsg.setStatus(OrderStatus.CANCELLED); // 30秒后执行 timeWheelService.addDelayTask("order-topic", orderId, cancelMsg, 30 * 1000); } /** * 订单超时自动取消 - 使用延时主题(30分钟) */ public void scheduleOrderCancelMedium(String orderId) { OrderMessage cancelMsg = new OrderMessage(); cancelMsg.setOrderId(orderId); cancelMsg.setStatus(OrderStatus.CANCELLED); // 30分钟后执行 scheduledTopicService.sendToDelayTopic("order-topic", orderId, cancelMsg, 30 * 60 * 1000); } /** * 评价提醒 - 使用Redis(7天后) */ public void scheduleReviewReminder(String orderId) { OrderMessage reminderMsg = new OrderMessage(); reminderMsg.setOrderId(orderId); reminderMsg.setStatus(OrderStatus.COMPLETED); // 7天后执行 externalDelayService.addDelayMessage("review-topic", orderId, reminderMsg, 7L * 24 * 60 * 60 * 1000); } } ``` ## 六、方案总结 | 延迟范围 | 推荐方案 | 理由 | |----------|----------|------| | 0-60秒 | 时间轮算法 | 毫秒级精度,O(1)性能 | | 1-60分钟 | 延时主题 | 无额外依赖,Kafka天然高可用 | | 1小时以上 | Redis ZSET | 高可靠,易监控和扩展 | **生产环境建议:** 1. 短延迟(< 1分钟)使用**时间轮算法**,性能最优 2. 中延迟(1分钟-1小时)使用**延时主题**,简单可靠 3. 长延迟(> 1小时)使用**Redis ZSET**,易于管理 4. 复杂场景可以使用**分层混合架构**,根据延迟时间自动选择方案