ads_fk_credit_score.py 优化对比说明

本文档详细说明了 ads_fk_credit_score_parallel.py 相对于原始版本 ads_fk_credit_score.py 的优化改进。

1. 代码结构优化

1.1 类封装

# 原始版本:全局函数
def s06_fk_mongodb2sr(batch_size=500000):
    # ...

# 优化版本:面向对象封装
class CreditScoreProcessor:
    def __init__(self):
        self.batch_size = 500000
        self.max_workers = 4
        # ...

优化点:

2. MongoDB查询优化

2.1 游标优化

# 原始版本
result_cursor = mongo_link.find(
    {
        "interfaceType": {"$in": args.part_list},
        "createTime": {"$gte": args.start_date, "$lt": args.end_date},
        "entrance": 0
    },
    {"creditId": 1, ...}
)

# 优化版本
batch_cursor = (mongo_link.find(query, projection)
    .hint([("createTime", 1)])
    .batch_size(10000)
    .max_time_ms(600000)
    .no_cursor_timeout())

try:
    for doc in batch_cursor:
        yield doc
finally:
    batch_cursor.close()

优化点:

3. 并行处理优化

3.1 多线程处理

# 原始版本:串行处理
def process_and_stream_load_batch(batch):
    result_df = pd.DataFrame(batch)
    # ...

# 优化版本:并行处理
def process_batch_parallel(self, batch: List[Dict]) -> Optional[pd.DataFrame]:
    batch_len = len(batch)
    optimal_batch_size = min(50000, max(1000, batch_len // (self.max_workers * 2)))
    sub_batches = [batch[i:i + optimal_batch_size] for i in range(0, batch_len, optimal_batch_size)]
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        futures = {executor.submit(self._process_sub_batch_stream, sub_batch): sub_batch 
                  for sub_batch in sub_batches}

优化点:

4. 内存管理优化

4.1 流式处理

# 原始版本:一次性加载
result_df = pd.DataFrame(batch)

# 优化版本:流式处理
def _process_docs_stream(self, docs: Iterator[Dict]) -> Generator[pd.DataFrame, None, None]:
    batch = []
    batch_size = 0
    max_batch_size_bytes = 100 * 1024 * 1024  # 100MB
    
    for doc in docs:
        doc_size = sys.getsizeof(str(doc))
        if batch_size + doc_size > max_batch_size_bytes and batch:
            if processed_df := self.process_batch_parallel(batch):
                yield processed_df
            batch = []
            batch_size = 0
            gc.collect()

优化点:

5. Stream Load优化

5.1 重试机制和错误处理

# 原始版本
result = subprocess.run(curl_command, capture_output=True, text=True)
if result.returncode == 0:
    logger.info(f"第 {idx + 1} 批数据插入成功")

# 优化版本
max_retries = 3
retry_count = 0
while retry_count < max_retries:
    try:
        result = subprocess.run(curl_command, 
                              capture_output=True, 
                              text=True,
                              timeout=300)
        if result.returncode == 0:
            response = json.loads(result.stdout)
            if response.get("Status") == "Success":
                logger.info(f"成功导入 {len(chunk)} 条记录")
                break
        retry_count += 1
        time.sleep(5 * retry_count)  # 指数退避

优化点:

6. 日志和监控优化

6.1 增强的日志记录

# 原始版本
logger.error(f"处理数据时出错: {str(e)}")

# 优化版本
logger.error(f"处理第 {chunk_idx}/{len(chunks)} 批数据时出错: {str(e)}")
logger.error(traceback.format_exc())

优化点:

7. 数据验证

7.1 新增结果验证功能

def validate_results(self, original_df: pd.DataFrame, parallel_df: pd.DataFrame) -> bool:
    # 检查行数是否一致
    if len(original_df) != len(parallel_df):
        logger.error(f"行数不一致: 原始={len(original_df)}, 并行={len(parallel_df)}")
        return False
    
    # 检查关键字段
    key_columns = ['credit_id', 'source_id', 'event_code']
    for col in key_columns:
        if not original_df[col].equals(parallel_df[col]):
            logger.error(f"关键字段{col}不一致")
            return False
    
    return True

优化点:

业务逻辑保持

在所有优化中,保持了原有的业务逻辑不变:

  1. 数据处理流程不变
  2. 配置参数保持
  3. 输出结果一致

性能提升

我预估的优化后的版本在以下方面实现了性能提升:

  1. 查询性能
  2. 处理速度
  3. 资源利用