# kafkaTask **Repository Path**: TengOne/kafka-task ## Basic Information - **Project Name**: kafkaTask - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2026-03-27 - **Last Updated**: 2026-03-28 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # Kafka Task Consumer 基于Kafka的异步任务处理系统,用于替代Celery任务。 ## 概述 Kafka Task Consumer是一个基于Kafka SDK的异步任务处理系统,旨在替代原有的Celery + RabbitMQ架构。它提供了以下优势: - **标准化消息格式**: 使用统一的标准化消息格式,便于跨系统集成 - **分布式追踪**: 内置分布式Trace ID追踪 - **高性能**: 基于Kafka的高吞吐量消息处理 - **可扩展性**: 支持水平扩展,轻松应对高并发场景 - **监控指标**: 内置丰富的监控指标和统计信息 ## 架构对比 ### 原有架构 (Celery + RabbitMQ) ``` 前端应用 → Celery Task → RabbitMQ → Celery Worker ↓ 异步处理(下载页面、邮件发送、数据抽取等) ``` ### 新架构 (Kafka Task Consumer) ``` 前端应用 → Kafka Producer → Kafka Topic → Kafka Task Consumer ↓ 标准化消息处理(下载页面、邮件发送、数据抽取等) ``` ## 快速开始 ### 1. 环境准备 ```bash # 复制环境配置文件 cp .env.template .env # 编辑 .env 文件,配置必要的参数 # 主要需要配置: # - Kafka服务器地址 (BOOTSTRAP_SERVERS) # - Redis连接信息 (REDIS_*) # - 邮件服务配置 (SMTP_*) # - 抽取模块URL (EXTRACTOR_URL) ``` ### 2. 启动消费者 ```bash # 方法1: 使用启动脚本(推荐) chmod +x run_consumer.sh ./run_consumer.sh # 方法2: 直接运行Python脚本 python3 start_consumer.py # 方法3: 自定义消费者ID python3 start_consumer.py --consumer-id "my-consumer-001" ``` ### 3. 查看运行状态 消费者启动后会输出以下信息: ``` [INFO] Kafka Task Consumer 已启动 [INFO] 消费者ID: kafka-task-consumer-1648000000 [INFO] 消费者组: tf-data-task-consumer-group [INFO] 订阅主题: ['tfdata_referral_reward', 'tfdata_register_email_code', 'tfdata_download_page', 'tfdata_extractor'] [INFO] Kafka服务器: 192.168.2.10:19092 ``` ## 任务处理器 Kafka Task Consumer包含以下任务处理器,对应原有的Celery任务: ### 1. 推荐奖励邮件处理器 (`ReferralRewardHandler`) - **对应Celery任务**: `send_referral_reward_email` - **Kafka主题**: `tfdata_referral_reward` - **消息格式**: ```json { "trace_id": "uuid", "task_id": "referral_reward_task_001", "timestamp": "2024-01-01T12:00:00Z", "payload": { "referrer_email": "user@example.com", "referrer_username": "user123", "new_username": "newuser456", "reward_points": 100 }, "metadata": { "message_type": "referral_reward", "version": "1.0", "priority": 5 } } ``` ### 2. 注册验证码邮件处理器 (`RegisterEmailCodeHandler`) - **对应Celery任务**: `send_register_email_code` - **Kafka主题**: `tfdata_register_email_code` - **消息格式**: ```json { "trace_id": "uuid", "task_id": "register_code_task_001", "timestamp": "2024-01-01T12:00:00Z", "payload": { "email": "user@example.com", "code": "123456" }, "metadata": { "message_type": "register_email_code", "version": "1.0", "priority": 7 } } ``` ### 3. 下载页面处理器 (`DownloadPageHandler`) - **对应Celery任务**: `download_page` - **Kafka主题**: `tfdata_download_page` - **消息格式**: 包含完整的任务信息、请求配置和页面数据 ### 4. 抽取任务处理器 (`ExtractorHandler`) - **对应Celery任务**: `extractor` - **Kafka主题**: `tfdata_extractor` - **消息格式**: 包含任务信息和页面数据,提交到抽取模块 ## 从Celery迁移指南 ### 1. 生产者端迁移 原有的Celery调用需要改为Kafka生产者调用: #### 原有Celery调用 ```python from celery_app.app.tasks import send_referral_reward_email, send_register_email_code, download_page, extractor # 发送推荐奖励邮件 send_referral_reward_email.delay( referrer_email="user@example.com", referrer_username="user123", new_username="newuser456", reward_points=100 ) # 发送注册验证码 send_register_email_code.delay(email="user@example.com", code="123456") # 下载页面 download_page.delay(data) ``` #### 新的Kafka调用 ```python from kafkaSdk import KafkaProducer, KafkaConfig # 创建Kafka生产者 config = KafkaConfig(bootstrap_servers="192.168.2.10:19092") producer = KafkaProducer(config) # 发送推荐奖励消息 producer.send_standardized_message( topic="tfdata_referral_reward", message_type="referral_reward", payload={ "referrer_email": "user@example.com", "referrer_username": "user123", "new_username": "newuser456", "reward_points": 100 }, task_id="referral_reward_task_001", priority=5 ) # 发送注册验证码消息 producer.send_standardized_message( topic="tfdata_register_email_code", message_type="register_email_code", payload={ "email": "user@example.com", "code": "123456" }, task_id="register_code_task_001", priority=7 ) # 发送下载页面消息 producer.send_standardized_message( topic="tfdata_download_page", message_type="download_page", payload=data, # 完整的任务数据 task_id=data.get("task", {}).get("id", "download_task_001") ) producer.close() ``` ### 2. 配置迁移 #### 原有Celery配置 (celery_app/app/settings.py) ```python # RabbitMQ配置 RABBITMQ_USERNAME = os.getenv('RABBITMQ_USERNAME') RABBITMQ_PASSWORD = os.getenv('RABBITMQ_PASSWORD') RABBITMQ_HOST = os.getenv('RABBITMQ_HOST') RABBITMQ_PORT = os.getenv('RABBITMQ_PORT') ``` #### 新的Kafka配置 (kafkaTask/.env) ```bash # Kafka配置 BOOTSTRAP_SERVERS=192.168.2.10:19092 SCHEMA_REGISTRY_URL=http://192.168.2.10:8081 KAFKA_CONSUMER_GROUP_ID=tf-data-task-consumer-group ``` ### 3. 部署方式迁移 #### 原有Celery部署 ```bash # 启动Celery worker celery -A app worker -l INFO -Q referral_reward,register_email_code,download_page,extractor # 启动Celery beat(定时任务) celery -A app beat -l INFO ``` #### 新的Kafka Task Consumer部署 ```bash # 启动Kafka任务消费者 ./kafkaTask/run_consumer.sh # 或者直接运行 cd kafkaTask python3 start_consumer.py ``` ## MirrorMaker 2 跨集群同步 当前仓库同时提供了基于 Kafka Connect / MirrorMaker 2 的跨集群同步配置,用于在以下两个 Kafka 集群之间同步消息: - `192.168.2.10:9092`:国内集群 `cn` - `192.168.2.11:9092`:海外集群 `overseas` ### 1. 配置文件说明 - `docker-compose.yml` - 启动两个独立的 MirrorMaker 2 实例 - `mm2-cn-to-overseas.properties` - 单向同步 `cn -> overseas` - 同步 topics: `overseas_download_topic,online_test_overseas_download_topic,parse_template_overseas_download_topic` - `mm2-overseas-to-cn.properties` - 单向同步 `overseas -> cn` - 同步 topics: `overseas_download_result_topic,online_test_overseas_download_result_topic,parse_template_overseas_download_result_topic` ### 2. 启动同步服务 ```bash docker compose up -d ``` 启动后会有两个容器: - `kafka-connect-mm2-cn-to-overseas` - `kafka-connect-mm2-overseas-to-cn` ### 3. 查看运行状态 ```bash docker compose ps docker logs -f kafka-connect-mm2-cn-to-overseas docker logs -f kafka-connect-mm2-overseas-to-cn ``` ### 4. 单独重建某一个方向 ```bash docker compose up -d --force-recreate kafka-connect-mm2-cn-to-overseas docker compose up -d --force-recreate kafka-connect-mm2-overseas-to-cn ``` ### 5. 停止同步服务 ```bash docker compose down ``` ### 6. 常见排查命令 ```bash # 查看正向同步实例实际加载的配置 docker exec kafka-connect-mm2-cn-to-overseas cat /etc/kafka/mm2-cn-to-overseas.properties # 查看反向同步实例实际加载的配置 docker exec kafka-connect-mm2-overseas-to-cn cat /etc/kafka/mm2-overseas-to-cn.properties # 查看最近 200 行日志 docker logs --tail 200 kafka-connect-mm2-cn-to-overseas docker logs --tail 200 kafka-connect-mm2-overseas-to-cn ``` ### 7. 常见问题 1. **报错 `UnknownHostException: kafka`** - 根因通常是 Kafka broker 的 `advertised.listeners` 配成了容器内地址,例如 `kafka:9092` - 需要改成 MirrorMaker 容器可访问的真实地址,例如 `192.168.2.10:9092` 或 `192.168.2.11:9092` 2. **无法创建 topic** - 如果集群是单 broker,必须确保 Connect internal topics 和 MM2 internal topics 的 replication factor 为 `1` - 当前两个 MM2 properties 已显式按单 broker 场景配置 3. **启动后仍然跑旧配置** - 需要强制重建容器 - 执行 `docker compose up -d --force-recreate` 4. **两个方向不能共用同一个 worker group** - 当前配置已拆分为两个独立 group.id 和 internal topics - 不要手动改成相同值,否则两个实例会互相干扰 ## 监控和运维 ### 1. 监控指标 Kafka Task Consumer提供以下监控指标: - **消息接收统计**: `messages_received` - **消息处理统计**: `messages_processed`, `messages_failed`, `messages_retried` - **处理时间**: `average_processing_time_ms` - **消费者状态**: `status` (running, stopped, error) ### 2. 日志查看 消费者输出详细的日志信息,可通过以下方式查看: ```bash # 查看实时日志 tail -f kafkaTask/logs/kafka_task_consumer.log # 查看错误日志 grep -i "error\|exception" kafkaTask/logs/kafka_task_consumer.log # 查看处理统计 grep -i "消费者统计" kafkaTask/logs/kafka_task_consumer.log ``` ### 3. 故障排查 常见问题及解决方案: 1. **消费者无法连接Kafka** - 检查Kafka服务器地址配置 (`BOOTSTRAP_SERVERS`) - 检查网络连接 - 检查Kafka服务状态 2. **消息处理失败** - 检查Redis连接配置 - 检查邮件服务配置 - 检查抽取模块URL 3. **消费者卡死或无响应** - 重启消费者进程 - 检查系统资源(内存、CPU) - 查看详细错误日志 ## 扩展和定制 ### 1. 添加新的任务处理器 如果需要添加新的任务处理器,参考以下步骤: 1. 在 `app/handlers/` 目录下创建新的处理器类 2. 继承 `MessageHandler` 基类 3. 实现 `handle_message` 方法 4. 在 `app/handlers/__init__.py` 中注册处理器 5. 在 `app/consumer.py` 中订阅对应的主题 ### 2. 自定义配置 可以通过修改 `app/config.py` 中的 `KafkaTaskConfig` 类来自定义配置: ```python config = KafkaTaskConfig( bootstrap_servers="custom-kafka-server:9092", consumer_group_id="custom-consumer-group", max_retries=5, enable_metrics=True, enable_tracing=True ) ``` ### 3. 性能调优 根据实际负载调整以下参数: - **批量大小**: `producer_batch_size` - **重试次数**: `max_retries` - **并发消费者**: 启动多个消费者实例 - **主题分区**: 增加Kafka主题分区数 ## 目录结构 ``` kafkaTask/ ├── README.md # 本文档 ├── .env.template # 环境配置模板 ├── requirements.txt # 依赖包列表 ├── run_consumer.sh # 启动脚本 ├── start_consumer.py # Python启动脚本 ├── app/ │ ├── __init__.py # 应用入口 │ ├── config.py # 配置管理 │ ├── consumer.py # 主消费者类 │ └── handlers/ # 任务处理器 │ ├── __init__.py │ ├── referral_reward_handler.py │ ├── register_email_code_handler.py │ ├── download_page_handler.py │ └── extractor_handler.py └── templates/ # 邮件模板(可选) ``` ## 贡献指南 1. Fork 项目 2. 创建功能分支 (`git checkout -b feature/amazing-feature`) 3. 提交更改 (`git commit -m 'Add amazing feature'`) 4. 推送到分支 (`git push origin feature/amazing-feature`) 5. 开启 Pull Request ## 许可证 本项目遵循 MIT 许可证。 ## 技术支持 如有问题或建议,请联系: - 邮箱: tf-data@example.com - 文档: [项目文档](#) - 问题: [GitHub Issues](#)