借鉴akshare设计模式,用Python构建东方财富期权期货数据采集器
1. 为什么需要模块化数据采集器做量化交易的朋友都知道获取实时、准确的期权期货数据是第一步。东方财富网作为国内主流金融数据源提供了丰富的市场行情数据。但直接爬取网页数据会遇到几个典型问题网页结构经常变动导致爬虫失效缺乏统一的数据清洗和格式化处理不同品种的数据接口差异大异常情况处理不够健壮我在早期做量化策略时经常要花70%的时间处理数据问题。直到发现了akshare这个宝藏库它的模块化设计给了我很大启发。akshare把每个数据接口都封装成独立函数统一返回DataFrame格式异常处理也很完善。这让我意识到与其每次临时写爬虫不如构建一个类似akshare的可持续维护的数据采集框架。2. 理解akshare的设计哲学akshare最值得借鉴的是它的小而美架构设计。我拆解过它的源代码发现几个关键特点功能原子化每个函数只做一件事比如stock_zh_a_spot()只获取A股实时行情接口标准化统一返回pandas.DataFrame列名使用中文方便理解参数显式化所有可配置参数都有明确默认值异常包容性网络请求失败会自动重试数据解析失败会返回空DataFrame而非报错基于这些观察我设计东方财富采集器时确立了三个原则一个品种一个函数如期权、期货分开统一数据输出格式内置智能重试机制3. 构建期权数据采集模块先看期权数据的实现。东方财富的期权接口返回的是JSON格式但字段命名很晦涩如f1,f2。我的处理方式是def option_current_em() - pd.DataFrame: url http://77.push2.eastmoney.com/api/qt/clist/get params { pn: 1, pz: 200000, po: 1, np: 1, ut: bd1d9ddb04089700cf9c27f6f7426281, fltt: 2, invt: 2, fid: f3, fs: m:10,m:140,m:141,m:151, fields: f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f12,f13,f14,f15,f16,f17,f18,f20,f21,f23,f24,f25,f22,f28,f11,f62,f128,f136,f115,f152,f133,f108,f163,f161,f162, _: 1606225274063, } # 加入重试逻辑 for _ in range(3): try: r requests.get(url, paramsparams, timeout5) data_json r.json() break except Exception as e: print(f请求失败重试中...{str(e)}) time.sleep(1) else: return pd.DataFrame()数据清洗环节特别注意了几点字段映射将f1,f2转换为最新价等中文列名类型转换所有数值字段强制转为数字类型错误值设为NaN列筛选只保留核心交易指标去掉无用字段temp_df temp_df[[ 代码, 名称, 最新价, 涨跌额, 涨跌幅, 成交量, 成交额, 持仓量, 行权价, 剩余日, 日增, 昨结, 今开, 市场标识 ]]4. 期货数据采集的差异化处理期货接口与期权有三点不同需要指定交易所参数上期所、郑商所等返回数据是JSONP格式需要特殊处理字段命名规则不一致我的解决方案是用字典映射交易所名称和代码用字符串切片去除JSONP回调函数包裹统一字段命名风格def future_current_em(market上期所) - pd.DataFrame: mk_dict { 上期所: 113, 大商所: 114, 郑商所: 115, 上期能源: 142, 中金所: 220 } # 处理JSONP响应 r requests.get(url, paramsparams) jsonDecoder json.decoder.JSONDecoder() data jsonDecoder.decode(r.text[13:-1])[list]特别提醒期货接口的pageSize参数建议设置足够大如20000避免分页问题。我遇到过因为默认只返回20条导致数据不全的情况。5. 异常处理与日志记录金融数据采集最怕的就是静默失败。我总结了几个常见故障点网络波动添加重试机制3次失败才放弃接口变更监控返回数据格式发现异常立即告警数据质量检查关键字段是否存在空值建议在项目中加入日志记录import logging logging.basicConfig( filenamedata_collector.log, levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s ) try: df option_current_em() if df.empty: logging.warning(期权数据返回为空) except Exception as e: logging.error(f期权数据采集失败: {str(e)})6. 性能优化实践当需要采集多个品种时同步请求会很慢。我后来改用了异步IO方案速度提升明显import aiohttp import asyncio async def fetch_data(session, url, params): async with session.get(url, paramsparams) as response: return await response.json() async def get_all_futures(): async with aiohttp.ClientSession() as session: tasks [] for market in [上期所, 郑商所, 大商所]: task asyncio.create_task(fetch_future(session, market)) tasks.append(task) return await asyncio.gather(*tasks)另外发现几个优化点设置合理的超时时间5-10秒使用连接池复用HTTP连接对不变的基础参数使用类变量存储7. 数据质量监控方案采集到数据只是开始如何确保数据质量才是关键。我建立了三层校验机制基础校验检查DataFrame是否为空列是否缺失业务校验检查价格是否在合理范围内比如期货价格不应为0趋势校验检查当前数据与历史数据的波动是否异常实现示例def validate_data(df, product_type): # 基础校验 if df.empty: raise ValueError(空数据) required_cols { option: [代码, 最新价, 成交量], future: [代码, 最新价, 持仓量] } if not set(required_cols[product_type]).issubset(df.columns): raise ValueError(缺失必要字段) # 业务校验 if (df[最新价] 0).any(): raise ValueError(存在无效价格)8. 项目架构建议经过多次迭代我总结出一个可扩展的架构设计eastmoney_collector/ │── constants.py # 存储接口URL、参数等常量 │── options.py # 期权数据采集 │── futures.py # 期货数据采集 │── utils.py # 通用工具函数 │── exceptions.py # 自定义异常 │── tests/ # 单元测试这种结构的好处是新增品种只需添加新模块常量集中管理修改方便便于编写单元测试比如新增股票数据采集时只需新建stocks.py并实现相同规范的函数即可。