别再手动抄代码了!用Python+efinance批量抓取A股全量数据(附完整脚本与MongoDB存储方案)
Python量化实战用efinance构建A股全量数据库的工程化实践在量化投资领域数据是策略开发的基石。传统的手动下载和整理股票数据不仅效率低下还容易出错。我曾见过一位量化研究员因为手动复制粘贴错了一行数据导致回测结果完全失真——这个教训价值百万。本文将分享如何用Python的efinance库构建自动化数据管道从零开始搭建本地A股数据库。1. 环境准备与工具链搭建1.1 核心工具选型构建稳定可靠的数据采集系统需要精心挑选工具链。经过多次实践验证我确定了以下技术组合efinance非官方但稳定的金融数据接口相比Tushare更轻量MongoDB文档型数据库天然适合存储非结构化的行情数据Loguru比标准logging更友好的日志工具Tqdm进度条可视化让长时间运行的任务不再盲跑安装这些工具只需一行命令pip install efinance loguru tqdm pymongo提示建议使用Python 3.8环境某些库在新版本Python中可能有兼容性问题1.2 工程化目录结构好的项目结构能大幅降低维护成本。这是我常用的目录布局stock_data_pipeline/ ├── config/ # 配置文件 ├── logs/ # 日志文件 ├── src/ # 源代码 │ ├── crawler.py # 数据采集主程序 │ └── utils.py # 工具函数 └── requirements.txt # 依赖清单2. 高效数据采集策略2.1 股票代码批量获取efinance的get_realtime_quotes()能获取全部活跃股票列表但实际使用中我发现几个优化点import efinance as ef def fetch_stock_codes(): 获取全量股票代码并去重 df ef.stock.get_realtime_quotes() # 过滤掉B股和新三板 codes df[ (df[市场类型] 沪深A股) (~df[股票代码].str.startswith(8)) ][股票代码].unique() return codes.tolist()2.2 历史行情数据下载直接循环请求容易被封IP需要加入以下防护措施随机延时2-5秒自动重试机制异常捕获与日志记录from loguru import logger from time import sleep import random def safe_fetch_history(stock_code, retry3): 带防护措施的历史数据下载 for i in range(retry): try: df ef.stock.get_quote_history(stock_code) return df except Exception as e: logger.error(f{stock_code} 第{i1}次失败: {str(e)}) sleep(random.uniform(2, 5)) return None3. MongoDB存储优化方案3.1 批量写入提升性能逐条插入是MongoDB的性能杀手。这是我优化后的批量写入方案from pymongo import UpdateOne def bulk_upsert(collection, data_list): 批量更新插入操作 operations [ UpdateOne( {_id: item[_id]}, {$set: item}, upsertTrue ) for item in data_list ] if operations: collection.bulk_write(operations)3.2 数据模型设计合理的文档结构能提升查询效率。我的设计原则是以股票代码作为_id主键将K线数据按日期嵌套存储添加元数据方便检索示例文档结构{ _id: 600519, name: 贵州茅台, industry: 酿酒, daily_data: { 2023-01-04: { open: 1800.0, close: 1820.5, volume: 32500 } } }4. 实战构建完整数据管道4.1 主程序架构将各个模块组合成完整工作流from tqdm import tqdm def run_pipeline(): codes fetch_stock_codes() client pymongo.MongoClient() db client[stock_db] with tqdm(codes) as pbar: for code in pbar: pbar.set_description(f处理 {code}) data safe_fetch_history(code) if data is not None: processed process_data(code, data) bulk_upsert(db.daily, [processed]) sleep(random.uniform(1, 3))4.2 异常处理与监控添加以下保障措施确保长时间运行稳定断点续传记录已处理的股票代码内存监控定期检查内存使用情况心跳检测每100只股票输出一次状态报告import psutil def memory_guard(threshold0.9): 内存保护机制 if psutil.virtual_memory().percent threshold: logger.warning(内存使用过高暂停处理) sleep(60)5. 进阶优化技巧5.1 分布式采集方案当需要采集全市场多年数据时单机可能需数天时间。可以考虑按股票代码分段多进程处理使用Redis作为任务队列云函数动态扩展采集节点from multiprocessing import Pool def distributed_crawl(): codes fetch_stock_codes() with Pool(processes4) as pool: pool.map(process_stock, codes)5.2 数据质量检查采集完成后应进行完整性验证检查每只股票的数据量是否合理验证关键字段无空值对比最新数据与公开源是否一致def validate_data(db): 数据质量检查 problematic [] for code in db.daily.distinct(_id): count db.daily.count_documents({_id: code}) if count 100: # 假设正常股票至少有100个交易日数据 problematic.append(code) return problematic在实际项目中这套系统帮我节省了数百小时的手工操作时间。最关键的体会是不要追求一次性完美先构建最小可行方案再逐步迭代优化。比如最初可以只采集收盘价等管道稳定后再扩展其他字段。