基于 Python 的分布式并发写入方案


以下是基于Python、Redis队列和MongoDB的分布式并发写入方案,适用于5000个接口数据处理:

方案架构

  1. 技术栈

• 任务队列:Redis(List结构实现FIFO队列,支持分布式消费)。

• 并发处理:Python多进程/线程(或异步框架如FastAPI+协程)。

• 存储层:MongoDB(分片集群或副本集,支持高并发写入)。

核心步骤

  1. 数据入队

• 接口请求:

◦ 用Python遍历5000个接口,将每个接口URL/参数作为任务存入Redis队列(如task_queue)。

◦ 示例代码: import redis r = redis.Redis(host='localhost', port=6379, db=0) for url in interface_urls: # interface_urls为5000个接口列表 r.lpush('task_queue', url) 2. 分布式消费者(多进程/线程)

• 启动多个工作进程:

◦ 使用multiprocessing.Pool或concurrent.futures.ThreadPoolExecutor创建worker,从Redis队列中拉取任务并处理。

◦ 关键逻辑: import requests from pymongo import MongoClient

def worker(): mongo_client = MongoClient('mongodb://localhost:27017/') db = mongo_client['mydb'] collection = db['data']

while True:
    task = r.brpop('task_queue', timeout=0)[1].decode()  # 阻塞式取任务
    if not task:
        break
    try:
        # 请求接口
        response = requests.get(task, timeout=10)
        data = response.json()
        # 写入MongoDB(批量写入提升性能)
        collection.insert_one(data)
    except Exception as e:
        # 失败任务重新入队或记录日志
        r.lpush('task_queue', task)
        print(f"任务失败:{task}, 错误:{e}")
  1. 性能优化

• 批量写入MongoDB: 使用collection.insert_many()批量处理多个数据(如每100条提交一次)。

• 连接池管理:

◦ 为MongoDB创建连接池(pymongo.MongoClient(maxPoolSize=100)),避免频繁创建连接。

◦ Redis连接可复用,或使用连接池(redis.ConnectionPool)。

• 任务分片: 若为分布式集群,不同服务器的worker监听同一队列,自动负载均衡。

关键组件

  1. Redis队列设计

• 队列类型:

◦ task_queue:存储待处理的接口任务(字符串)。

◦ error_queue:存储失败任务,便于重试(可选)。

• 原子操作: 使用brpop(阻塞式弹出)避免任务重复消费,确保每个任务仅被一个worker处理。

  1. MongoDB集群配置

• 分片集群: 当数据量/写入并发极高时,采用MongoDB分片(Sharding),按业务字段(如接口ID)分片,提升写入吞吐量。

• 写入策略: 使用w=1(默认,主节点确认)提升写入速度,若需强一致性可设为w=majority。

扩展建议

  1. 异步框架(更高并发): 使用aiohttp(异步请求接口)+ asyncio + redis-py-cluster(异步Redis客户端),配合motor(异步MongoDB驱动)实现全异步处理。

  2. 任务优先级: 若部分接口数据需优先处理,可使用Redis有序集合(ZSET)按优先级排序任务。

  3. 监控与告警:

◦ 监控Redis队列长度(避免积压)和MongoDB写入延迟(db.currentOp())。

◦ 使用Prometheus+Grafana或Python的logging模块记录任务成功率、耗时等指标。

示例流程总结 接口列表 → Redis队列 → 多worker并发消费 → 接口请求 → 数据清洗 → MongoDB批量写入 此方案通过Redis队列解耦任务生产与消费,利用Python多进程/异步特性实现分布式并发,适合大规模数据的可靠写入。根据实际服务器资源调整worker数量(建议为CPU核心数2-4倍),并通过压测优化MongoDB索引和写入批次大小。

以下是从数据库取URL、入队、消费队列到写入MongoDB的完整Python实现方案,包含详细代码和流程说明:

一、从数据库获取URL(以MySQL为例)

假设URL存储在MySQL数据库的interface_urls表中,字段包含id和url。

  1. 数据库连接配置

config.py

DB_CONFIG = { "host": "localhost", "user": "root", "password": "your_password", "database": "your_db", "charset": "utf8mb4" }

REDIS_CONFIG = { "host": "localhost", "port": 6379, "db": 0, "password": "" # 若有密码请添加 }

MONGODB_CONFIG = { "uri": "mongodb://localhost:27017/", "database": "data_db", "collection": "interface_data" } 2. 从MySQL获取URL

fetch_urls.py

import pymysql from config import DB_CONFIG

def get_interface_urls(): """从数据库获取所有接口URL""" try: conn = pymysql.connect(**DB_CONFIG) with conn.cursor() as cursor: sql = "SELECT url FROM interface_urls;" cursor.execute(sql) urls = [row[0] for row in cursor.fetchall()] return urls except Exception as e: print(f"获取URL失败: {e}") raise finally: conn.close()

if name == "main": urls = get_interface_urls() print(f"成功获取 {len(urls)} 个URL") 二、将URL存入Redis队列

使用Redis的列表(List)结构实现任务队列,支持FIFO顺序消费。

  1. 入队逻辑

push_to_redis.py

import redis from config import REDIS_CONFIG, get_interface_urls # 导入URL获取函数

def push_urls_to_queue(urls): """将URL列表存入Redis队列""" try: pool = redis.ConnectionPool(REDIS_CONFIG) r = redis.Redis(connection_pool=pool) # 使用LPUSH从队列头部批量入队(也可用RPUSH从尾部入队,消费时对应BRPOP) r.lpush("task_queue", urls) # urls解包为多个参数 print(f"成功入队 {len(urls)} 个任务") except Exception as e: print(f"入队失败: {e}") raise

if name == "main": urls = get_interface_urls() push_urls_to_queue(urls) 2. 关键说明

• 队列名称:使用task_queue作为主队列,失败任务可存入failed_task_queue。

• 批量操作:lpush key url1 url2 url3 一次性入队多个任务,比循环单个入队更高效。

三、Python并发消费队列并写入MongoDB

使用多进程(适用于CPU密集型)或多线程(适用于IO密集型,如网络请求)处理任务。

  1. 多进程Worker实现(推荐)

worker_process.py

import os import requests import redis from pymongo import MongoClient from config import REDIS_CONFIG, MONGODB_CONFIG from typing import Optional

class Worker: def init(self): self.redis_pool = redis.ConnectionPool(**REDIS_CONFIG) self.mongo_client = MongoClient(MONGODB_CONFIG["uri"]) self.db = self.mongo_client[MONGODB_CONFIG["database"]] self.collection = self.db[MONGODB_CONFIG["collection"]] self.redis = redis.Redis(connection_pool=self.redis_pool)

def process_task(self, url: str) -> Optional[str]:
    """处理单个任务:请求接口 + 写入MongoDB"""
    try:
        # 1. 请求接口
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # 抛异常处理HTTP错误
        data = response.json()

        # 2. 写入MongoDB可添加数据清洗逻辑
        data["_task_url"] = url  # 记录来源URL
        data["_timestamp"] = int(self.redis.time()[0])  # 添加处理时间戳
        self.collection.insert_one(data)
        print(f"进程{os.getpid()}成功处理: {url}")
        return None  # 成功返回None

    except Exception as e:
        print(f"进程{os.getpid()}处理失败: {url}, 错误: {e}")
        return url  # 失败返回URL用于重新入队

def run(self):
    """循环消费队列"""
    while True:
        # 阻塞式获取任务无任务时阻塞避免空转
        result = self.redis.brpop("task_queue", timeout=0)  # (b'task_queue', b'url')
        if not result:
            break  # 队列空时退出需配合主进程控制
        _, url = result
        url = url.decode("utf-8")

        # 处理任务
        failed_url = self.process_task(url)
        if failed_url:
            # 失败任务重新入队可添加重试次数限制如存入失败队列并设置TTL
            self.redis.lpush("failed_task_queue", failed_url)
            print(f"失败任务重新入队: {failed_url}")

if name == "main": worker = Worker() worker.run() 2. 启动多进程Worker

main.py

import multiprocessing from worker_process import Worker

def start_workers(num_workers: int = 10): """启动多个Worker进程""" processes = [] for _ in range(num_workers): p = multiprocessing.Process(target=Worker().run) p.daemon = True # 主进程退出时自动终止子进程 p.start() processes.append(p) print(f"启动 {num_workers} 个Worker进程")

# 阻塞主进程,避免退出
for p in processes:
    p.join()

if name == "main": # 1. 先执行入队操作(可单独部署为定时任务) # python push_to_redis.py

# 2. 启动Worker进程
start_workers(num_workers=20)  # 根据CPU核心数调整,建议2-4倍核心数

四、优化点与异常处理

  1. 批量写入MongoDB

修改process_task方法中的写入逻辑

def process_task(self, url: str) -> Optional[str]: # ...(请求接口逻辑不变) batch = [data] # 假设每次处理1条数据,可积累到一定数量再批量写入 if len(batch) >= 100: # 每100条批量写入 self.collection.insert_many(batch) batch.clear() # ... 2. 失败任务处理

• 有限重试:为失败任务添加重试次数字段(如retry_count),超过阈值后记录到日志或通知人工处理。

• 独立失败队列:使用failed_task_queue存储失败任务,单独启动少量Worker处理(避免阻塞主队列)。

  1. 连接池优化

• Redis连接池:全局共享一个连接池,避免频繁创建连接。

• MongoDB连接池:通过MongoClient(maxPoolSize=100)设置连接池大小(默认100)。

五、分布式部署方案

  1. 多服务器消费: 在多台服务器上运行main.py,所有服务器的Worker共享同一Redis队列,自动实现负载均衡。

  2. 任务分片(可选): 若接口属于不同业务分组,可按分组创建多个队列(如task_queue_GroupA),避免混合消费导致资源抢占。

六、完整流程验证

  1. 初始化数据库: 向MySQL的interface_urls表插入5000条URL数据。

  2. 执行入队: python push_to_redis.py # 输出:成功入队5000个任务

  3. 查看队列长度: redis-cli LLEN task_queue # 应返回5000
  4. 启动Worker: python main.py # 启动20个Worker进程开始消费
  5. 验证MongoDB数据:

检查写入数量

count = self.collection.count_documents({}) print(f"已写入数据:{count}条") 此方案实现了从数据库取数据、队列分发、并发处理到持久化的全流程自动化,可根据实际场景调整Worker数量、批量大小和重试策略,确保高可靠性和性能。