This document describes the optimizations made in ads_fk_credit_score_parallel.py compared with the original version, ads_fk_credit_score.py. Each section below shows the original code, the optimized code, and the resulting optimization points.
# Original version: module-level functions
def s06_fk_mongodb2sr(batch_size=500000):
    # ...

# Optimized version: object-oriented encapsulation
class CreditScoreProcessor:
    def __init__(self):
        self.batch_size = 500000
        self.max_workers = 4
        # ...
Optimization points:
- Module-level functions are replaced by a CreditScoreProcessor class that owns the shared configuration.
- batch_size and max_workers are set in one place instead of being hard-coded or passed between functions.
- Each processing step becomes a method on the class, which keeps related state and logic together.
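As a quick illustration of what the encapsulation buys, here is a minimal sketch; the run() entry point and its arguments are hypothetical and only stand in for the script's real driver.

class CreditScoreProcessor:
    """Sketch only: the tuning knobs live on the instance instead of in globals."""

    def __init__(self, batch_size: int = 500000, max_workers: int = 4):
        self.batch_size = batch_size    # documents pulled from MongoDB per outer batch
        self.max_workers = max_workers  # threads used for parallel transformation

    def run(self, start_date: str, end_date: str) -> None:
        # Hypothetical driver: fetch -> transform -> load, one chunk at a time.
        ...

# The same pipeline can now be exercised with small, test-friendly settings:
processor = CreditScoreProcessor(batch_size=10000, max_workers=2)
processor.run("2024-01-01", "2024-01-02")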
# Original version
result_cursor = mongo_link.find(
    {
        "interfaceType": {"$in": args.part_list},
        "createTime": {"$gte": args.start_date, "$lt": args.end_date},
        "entrance": 0
    },
    {"creditId": 1, ...}
)

# Optimized version
# Note: no_cursor_timeout is a find() argument in pymongo, not a chainable cursor method.
batch_cursor = (mongo_link.find(query, projection, no_cursor_timeout=True)
                .hint([("createTime", 1)])   # stay on the createTime index
                .batch_size(10000)           # documents per network round trip
                .max_time_ms(600000))        # 10-minute server-side time limit
try:
    for doc in batch_cursor:
        yield doc
finally:
    batch_cursor.close()
Optimization points:
- An explicit hint keeps the query on the createTime index.
- batch_size(10000) and max_time_ms(600000) bound the per-round-trip size and the total server-side query time.
- Disabling the cursor timeout supports long-running scans, and the try/finally guarantees the cursor is closed.
- Documents are yielded as a generator, so downstream processing can start before the scan completes.
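To connect the two versions, the excerpt above can be packaged as a self-contained generator. This is only a sketch: the function name fetch_credit_documents and the way the original arguments (part_list, start_date, end_date) are passed in are assumptions for illustration.

from typing import Dict, Iterator

def fetch_credit_documents(mongo_link, part_list, start_date, end_date) -> Iterator[Dict]:
    # Same filter as the original version, built once as a reusable query object.
    query = {
        "interfaceType": {"$in": part_list},
        "createTime": {"$gte": start_date, "$lt": end_date},
        "entrance": 0,
    }
    projection = {"creditId": 1}  # extend with the remaining output fields

    cursor = (mongo_link.find(query, projection, no_cursor_timeout=True)
              .hint([("createTime", 1)])   # stay on the createTime index
              .batch_size(10000)           # documents per network round trip
              .max_time_ms(600000))        # 10-minute server-side limit
    try:
        yield from cursor                  # stream documents to the caller
    finally:
        cursor.close()                     # required because the timeout is disabled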
# Original version: serial processing
def process_and_stream_load_batch(batch):
    result_df = pd.DataFrame(batch)
    # ...

# Optimized version: parallel processing
def process_batch_parallel(self, batch: List[Dict]) -> Optional[pd.DataFrame]:
    batch_len = len(batch)
    optimal_batch_size = min(50000, max(1000, batch_len // (self.max_workers * 2)))
    sub_batches = [batch[i:i + optimal_batch_size] for i in range(0, batch_len, optimal_batch_size)]
    with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        futures = {executor.submit(self._process_sub_batch_stream, sub_batch): sub_batch
                   for sub_batch in sub_batches}
        # ... gather results from the futures (see the collection sketch below)
Optimization points:
- Each batch is split into sub-batches sized from the batch length and the worker count, clamped between 1,000 and 50,000 documents.
- Sub-batches are processed concurrently by a ThreadPoolExecutor with max_workers threads.
- The with block ensures worker threads are cleaned up once all sub-batches are handled.
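The excerpt above stops after the futures are submitted. A minimal sketch of the missing collection step, assuming each _process_sub_batch_stream call returns an optional DataFrame, could look like this:

import concurrent.futures
from typing import List, Optional

import pandas as pd

def collect_parallel_results(futures) -> Optional[pd.DataFrame]:
    # Gather sub-batch results as they finish and merge them into one DataFrame.
    frames: List[pd.DataFrame] = []
    for future in concurrent.futures.as_completed(futures):
        sub_df = future.result()            # re-raises any exception from the worker
        if sub_df is not None and not sub_df.empty:
            frames.append(sub_df)
    if not frames:
        return None
    return pd.concat(frames, ignore_index=True)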
# Original version: load everything at once
result_df = pd.DataFrame(batch)

# Optimized version: streaming processing
def _process_docs_stream(self, docs: Iterator[Dict]) -> Generator[pd.DataFrame, None, None]:
    batch = []
    batch_size = 0
    max_batch_size_bytes = 100 * 1024 * 1024  # 100 MB per in-memory batch
    for doc in docs:
        doc_size = sys.getsizeof(str(doc))  # rough per-document size estimate
        if batch_size + doc_size > max_batch_size_bytes and batch:
            if (processed_df := self.process_batch_parallel(batch)) is not None:
                yield processed_df
            batch = []
            batch_size = 0
            gc.collect()
        batch.append(doc)
        batch_size += doc_size
    # Flush the final partial batch.
    if batch and (processed_df := self.process_batch_parallel(batch)) is not None:
        yield processed_df
Optimization points:
- Documents are accumulated until a rough in-memory limit of 100 MB is reached, then processed and yielded as one DataFrame.
- The caller holds only one processed chunk at a time instead of the whole result set.
- Explicit garbage collection between chunks helps return memory promptly during long runs.
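How the streaming generator is consumed determines whether the memory savings materialize. The sketch below shows one possible driver loop, with the processing and loading steps passed in as callables so nothing from the script's internals has to be assumed.

from typing import Callable, Dict, Iterable, Iterator

import pandas as pd

def drive_pipeline(docs: Iterator[Dict],
                   process_stream: Callable[[Iterator[Dict]], Iterable[pd.DataFrame]],
                   load_chunk: Callable[[pd.DataFrame], None]) -> int:
    """Load each processed chunk as soon as it is yielded, then let it be freed."""
    total_rows = 0
    for chunk_df in process_stream(docs):
        load_chunk(chunk_df)        # e.g. the Stream Load step shown in the next section
        total_rows += len(chunk_df)
    return total_rows               # peak memory stays near one chunk at a time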
# Original version
result = subprocess.run(curl_command, capture_output=True, text=True)
if result.returncode == 0:
    logger.info(f"Batch {idx + 1} inserted successfully")

# Optimized version
max_retries = 3
retry_count = 0
while retry_count < max_retries:
    try:
        result = subprocess.run(curl_command,
                                capture_output=True,
                                text=True,
                                timeout=300)
        if result.returncode == 0:
            response = json.loads(result.stdout)
            if response.get("Status") == "Success":
                logger.info(f"Successfully loaded {len(chunk)} records")
                break
    except (subprocess.TimeoutExpired, json.JSONDecodeError):
        pass  # treat a timeout or an unparsable response as a failed attempt
    retry_count += 1
    time.sleep(5 * retry_count)  # wait longer after each failed attempt (5s, 10s, 15s)
Optimization points:
- Each Stream Load call now has a 300-second timeout instead of waiting indefinitely.
- The curl response body is parsed, and the load counts as successful only when Status is "Success", not merely when curl exits with code 0.
- Failed loads are retried up to 3 times, waiting longer after each attempt.
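One detail worth noting: time.sleep(5 * retry_count) grows linearly, not exponentially. If true exponential backoff is wanted, a small helper along these lines (a sketch, not the script's actual code) could replace the inline loop.

import subprocess
import time
from typing import List, Optional

def run_with_backoff(curl_command: List[str],
                     max_retries: int = 3,
                     base_delay: float = 5.0,
                     timeout: int = 300) -> Optional[subprocess.CompletedProcess]:
    """Run a Stream Load curl command, doubling the wait after each failed attempt."""
    for attempt in range(max_retries):
        try:
            result = subprocess.run(curl_command, capture_output=True,
                                    text=True, timeout=timeout)
            if result.returncode == 0:
                return result                        # caller still checks the JSON Status
        except subprocess.TimeoutExpired:
            pass                                     # treat a timeout like any other failure
        if attempt < max_retries - 1:
            time.sleep(base_delay * (2 ** attempt))  # 5s, 10s, 20s, ...
    return None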
# Original version
logger.error(f"Error while processing data: {str(e)}")

# Optimized version
logger.error(f"Error while processing chunk {chunk_idx}/{len(chunks)}: {str(e)}")
logger.error(traceback.format_exc())
Optimization points:
- Error messages include the chunk position and the total number of chunks, so a failure can be located immediately.
- The full traceback is logged in addition to the exception message.
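For context, here is a minimal sketch of the loop these log lines would sit in, assuming chunks is the list of chunks being loaded; the loop itself is illustrative, not quoted from the script.

import logging
import traceback

logger = logging.getLogger(__name__)

def load_chunks(chunks) -> None:
    for chunk_idx, chunk in enumerate(chunks, start=1):
        try:
            ...  # build and run the Stream Load request for this chunk
        except Exception as e:
            # The chunk position and the full traceback pinpoint the failure.
            logger.error(f"Error while processing chunk {chunk_idx}/{len(chunks)}: {str(e)}")
            logger.error(traceback.format_exc())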
def validate_results(self, original_df: pd.DataFrame, parallel_df: pd.DataFrame) -> bool:
    # Row counts must match
    if len(original_df) != len(parallel_df):
        logger.error(f"Row count mismatch: original={len(original_df)}, parallel={len(parallel_df)}")
        return False
    # Key columns must match value by value
    key_columns = ['credit_id', 'source_id', 'event_code']
    for col in key_columns:
        if not original_df[col].equals(parallel_df[col]):
            logger.error(f"Key column {col} does not match")
            return False
    return True
Optimization points:
- Row counts of the serial and parallel outputs are compared first as a cheap sanity check.
- Key columns (credit_id, source_id, event_code) are compared value by value.
- Any mismatch is logged and validation fails fast.
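Because the parallel path may emit rows in a different order than the serial one, Series.equals can fail even when the data is identical. A small sketch, assuming the three key columns determine a stable ordering, normalizes both frames before the comparison.

import pandas as pd

def normalize_for_comparison(df: pd.DataFrame) -> pd.DataFrame:
    # Sort on the key columns and rebuild the index so equals() compares values,
    # not accidental row order.
    key_columns = ["credit_id", "source_id", "event_code"]
    return df.sort_values(key_columns).reset_index(drop=True)

# Usage (hypothetical call site):
# is_consistent = processor.validate_results(
#     normalize_for_comparison(original_df),
#     normalize_for_comparison(parallel_df),
# )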
All of the optimizations keep the original business logic unchanged.

The optimized version is expected to improve performance in the following areas:
- MongoDB query efficiency (index hints, bounded batches, server-side time limits)
- CPU utilization (multi-threaded batch processing)
- Memory footprint (streaming with size-bounded batches and explicit garbage collection)
- Reliability (timeouts, retries, richer error logging, and result validation)