以下是基于Python、Redis队列和MongoDB的分布式并发写入方案,适用于5000个接口数据处理:
方案架构
- 技术栈
• 任务队列:Redis(List结构实现FIFO队列,支持分布式消费)。
• 并发处理:Python多进程/线程(或异步框架如FastAPI+协程)。
• 存储层:MongoDB(分片集群或副本集,支持高并发写入)。
核心步骤
- 数据入队
• 接口请求:
◦ 用Python遍历5000个接口,将每个接口URL/参数作为任务存入Redis队列(如task_queue)。
◦ 示例代码: import redis r = redis.Redis(host='localhost', port=6379, db=0) for url in interface_urls: # interface_urls为5000个接口列表 r.lpush('task_queue', url) 2. 分布式消费者(多进程/线程)
• 启动多个工作进程:
◦ 使用multiprocessing.Pool或concurrent.futures.ThreadPoolExecutor创建worker,从Redis队列中拉取任务并处理。
◦ 关键逻辑: import requests from pymongo import MongoClient
def worker(): mongo_client = MongoClient('mongodb://localhost:27017/') db = mongo_client['mydb'] collection = db['data']
while True:
task = r.brpop('task_queue', timeout=0)[1].decode() # 阻塞式取任务
if not task:
break
try:
# 请求接口
response = requests.get(task, timeout=10)
data = response.json()
# 写入MongoDB(批量写入提升性能)
collection.insert_one(data)
except Exception as e:
# 失败任务重新入队或记录日志
r.lpush('task_queue', task)
print(f"任务失败:{task}, 错误:{e}")
- 性能优化
• 批量写入MongoDB: 使用collection.insert_many()批量处理多个数据(如每100条提交一次)。
• 连接池管理:
◦ 为MongoDB创建连接池(pymongo.MongoClient(maxPoolSize=100)),避免频繁创建连接。
◦ Redis连接可复用,或使用连接池(redis.ConnectionPool)。
• 任务分片: 若为分布式集群,不同服务器的worker监听同一队列,自动负载均衡。
关键组件
- Redis队列设计
• 队列类型:
◦ task_queue:存储待处理的接口任务(字符串)。
◦ error_queue:存储失败任务,便于重试(可选)。
• 原子操作: 使用brpop(阻塞式弹出)避免任务重复消费,确保每个任务仅被一个worker处理。
- MongoDB集群配置
• 分片集群: 当数据量/写入并发极高时,采用MongoDB分片(Sharding),按业务字段(如接口ID)分片,提升写入吞吐量。
• 写入策略: 使用w=1(默认,主节点确认)提升写入速度,若需强一致性可设为w=majority。
扩展建议
-
异步框架(更高并发): 使用aiohttp(异步请求接口)+ asyncio + redis-py-cluster(异步Redis客户端),配合motor(异步MongoDB驱动)实现全异步处理。
-
任务优先级: 若部分接口数据需优先处理,可使用Redis有序集合(ZSET)按优先级排序任务。
-
监控与告警:
◦ 监控Redis队列长度(避免积压)和MongoDB写入延迟(db.currentOp())。
◦ 使用Prometheus+Grafana或Python的logging模块记录任务成功率、耗时等指标。
示例流程总结 接口列表 → Redis队列 → 多worker并发消费 → 接口请求 → 数据清洗 → MongoDB批量写入 此方案通过Redis队列解耦任务生产与消费,利用Python多进程/异步特性实现分布式并发,适合大规模数据的可靠写入。根据实际服务器资源调整worker数量(建议为CPU核心数2-4倍),并通过压测优化MongoDB索引和写入批次大小。
以下是从数据库取URL、入队、消费队列到写入MongoDB的完整Python实现方案,包含详细代码和流程说明:
一、从数据库获取URL(以MySQL为例)
假设URL存储在MySQL数据库的interface_urls表中,字段包含id和url。
- 数据库连接配置
config.py
DB_CONFIG = { "host": "localhost", "user": "root", "password": "your_password", "database": "your_db", "charset": "utf8mb4" }
REDIS_CONFIG = { "host": "localhost", "port": 6379, "db": 0, "password": "" # 若有密码请添加 }
MONGODB_CONFIG = { "uri": "mongodb://localhost:27017/", "database": "data_db", "collection": "interface_data" } 2. 从MySQL获取URL
fetch_urls.py
import pymysql from config import DB_CONFIG
def get_interface_urls(): """从数据库获取所有接口URL""" try: conn = pymysql.connect(**DB_CONFIG) with conn.cursor() as cursor: sql = "SELECT url FROM interface_urls;" cursor.execute(sql) urls = [row[0] for row in cursor.fetchall()] return urls except Exception as e: print(f"获取URL失败: {e}") raise finally: conn.close()
if name == "main": urls = get_interface_urls() print(f"成功获取 {len(urls)} 个URL") 二、将URL存入Redis队列
使用Redis的列表(List)结构实现任务队列,支持FIFO顺序消费。
- 入队逻辑
push_to_redis.py
import redis from config import REDIS_CONFIG, get_interface_urls # 导入URL获取函数
def push_urls_to_queue(urls): """将URL列表存入Redis队列""" try: pool = redis.ConnectionPool(REDIS_CONFIG) r = redis.Redis(connection_pool=pool) # 使用LPUSH从队列头部批量入队(也可用RPUSH从尾部入队,消费时对应BRPOP) r.lpush("task_queue", urls) # urls解包为多个参数 print(f"成功入队 {len(urls)} 个任务") except Exception as e: print(f"入队失败: {e}") raise
if name == "main": urls = get_interface_urls() push_urls_to_queue(urls) 2. 关键说明
• 队列名称:使用task_queue作为主队列,失败任务可存入failed_task_queue。
• 批量操作:lpush key url1 url2 url3 一次性入队多个任务,比循环单个入队更高效。
三、Python并发消费队列并写入MongoDB
使用多进程(适用于CPU密集型)或多线程(适用于IO密集型,如网络请求)处理任务。
- 多进程Worker实现(推荐)
worker_process.py
import os import requests import redis from pymongo import MongoClient from config import REDIS_CONFIG, MONGODB_CONFIG from typing import Optional
class Worker: def init(self): self.redis_pool = redis.ConnectionPool(**REDIS_CONFIG) self.mongo_client = MongoClient(MONGODB_CONFIG["uri"]) self.db = self.mongo_client[MONGODB_CONFIG["database"]] self.collection = self.db[MONGODB_CONFIG["collection"]] self.redis = redis.Redis(connection_pool=self.redis_pool)
def process_task(self, url: str) -> Optional[str]:
"""处理单个任务:请求接口 + 写入MongoDB"""
try:
# 1. 请求接口
response = requests.get(url, timeout=10)
response.raise_for_status() # 抛异常处理HTTP错误
data = response.json()
# 2. 写入MongoDB(可添加数据清洗逻辑)
data["_task_url"] = url # 记录来源URL
data["_timestamp"] = int(self.redis.time()[0]) # 添加处理时间戳
self.collection.insert_one(data)
print(f"进程{os.getpid()}成功处理: {url}")
return None # 成功返回None
except Exception as e:
print(f"进程{os.getpid()}处理失败: {url}, 错误: {e}")
return url # 失败返回URL,用于重新入队
def run(self):
"""循环消费队列"""
while True:
# 阻塞式获取任务(无任务时阻塞,避免空转)
result = self.redis.brpop("task_queue", timeout=0) # (b'task_queue', b'url')
if not result:
break # 队列空时退出(需配合主进程控制)
_, url = result
url = url.decode("utf-8")
# 处理任务
failed_url = self.process_task(url)
if failed_url:
# 失败任务重新入队(可添加重试次数限制,如存入失败队列并设置TTL)
self.redis.lpush("failed_task_queue", failed_url)
print(f"失败任务重新入队: {failed_url}")
if name == "main": worker = Worker() worker.run() 2. 启动多进程Worker
main.py
import multiprocessing from worker_process import Worker
def start_workers(num_workers: int = 10): """启动多个Worker进程""" processes = [] for _ in range(num_workers): p = multiprocessing.Process(target=Worker().run) p.daemon = True # 主进程退出时自动终止子进程 p.start() processes.append(p) print(f"启动 {num_workers} 个Worker进程")
# 阻塞主进程,避免退出
for p in processes:
p.join()
if name == "main": # 1. 先执行入队操作(可单独部署为定时任务) # python push_to_redis.py
# 2. 启动Worker进程
start_workers(num_workers=20) # 根据CPU核心数调整,建议2-4倍核心数
四、优化点与异常处理
- 批量写入MongoDB
修改process_task方法中的写入逻辑
def process_task(self, url: str) -> Optional[str]: # ...(请求接口逻辑不变) batch = [data] # 假设每次处理1条数据,可积累到一定数量再批量写入 if len(batch) >= 100: # 每100条批量写入 self.collection.insert_many(batch) batch.clear() # ... 2. 失败任务处理
• 有限重试:为失败任务添加重试次数字段(如retry_count),超过阈值后记录到日志或通知人工处理。
• 独立失败队列:使用failed_task_queue存储失败任务,单独启动少量Worker处理(避免阻塞主队列)。
- 连接池优化
• Redis连接池:全局共享一个连接池,避免频繁创建连接。
• MongoDB连接池:通过MongoClient(maxPoolSize=100)设置连接池大小(默认100)。
五、分布式部署方案
-
多服务器消费: 在多台服务器上运行main.py,所有服务器的Worker共享同一Redis队列,自动实现负载均衡。
-
任务分片(可选): 若接口属于不同业务分组,可按分组创建多个队列(如task_queue_GroupA),避免混合消费导致资源抢占。
六、完整流程验证
-
初始化数据库: 向MySQL的interface_urls表插入5000条URL数据。
-
执行入队: python push_to_redis.py # 输出:成功入队5000个任务
- 查看队列长度: redis-cli LLEN task_queue # 应返回5000
- 启动Worker: python main.py # 启动20个Worker进程开始消费
- 验证MongoDB数据:
检查写入数量
count = self.collection.count_documents({}) print(f"已写入数据:{count}条") 此方案实现了从数据库取数据、队列分发、并发处理到持久化的全流程自动化,可根据实际场景调整Worker数量、批量大小和重试策略,确保高可靠性和性能。