用全量 6986 只 A 股做日频选股回测:从数据接入到结果出炉
作者: TickDB Research · 发布: 2026/5/12 · 阅读: 15
标签: C 类, 掘金, A股回测
6986 只 A 股做日频选股回测,策略代码写了 3 天,数据接入修了 5 天。关键问题不是代码量,是复权因子方向用反了——前复权价格算信号,后复权净值算绩效,敞口漂了 0.3%,半年累积偏离 17%。回测年化 22%,实盘只剩 6%,不是过拟合,是两把不同的尺子量了同一段行情。
>
全市场回测最被低估的工程债:数据中间件的维护成本,远高于策略开发本身。
本文解决的核心问题
| 你的痛点 | 本文答案 | 预估节省时间 |
|---|---|---|
| 数据接入拼多个源,字段对不齐 | 一个 API 覆盖 6,986 只 A 股,字段统一 | 5 天 → 30 分钟 |
| 复权因子不统一导致回测偏差 | 复权因子内置 kline,后复权一步到位 | 避免累积 17% 的绩效偏差 |
| vnpy DataFeed 不知道怎么写 | Step 3 完整可跑代码,直接继承 BaseDataFeed | 节省 2 天调试时间 |
| 限流后不知道等多久 | 优先读 Retry-After 头部,服务端给什么等什么 | 避免盲目 sleep 浪费时间 |
目录
- Step 1:拉取 6986 只 A 股全量品种列表并缓存
- Step 3:封装成 vnpy DataFeed 并跑完整回测
回测的第一性原理:所有偏离都会在时间轴上被放大
复权因子对齐:为什么 T+1 制度下后复权是唯一解
这是全市场回测里最容易踩碎的坑。拆成五步看。
① 是什么
股票在分红、送股、拆股后,价格会产生断崖式跳变。复权因子通过乘法链将这些断点衔接起来,让价格序列连续。
| 复权类型 | 计算方向 | 价格基准 | 日频回测是否可用 |
|---|---|---|---|
| 前复权 | 向后修正历史价格 | 最新价 | ❌ 信号历史价格会不断变动 |
| 后复权 | 向前修正未来价格 | 上市首日 | ✅ 钉死首日价格,后续只加因子 |
② 为什么用后复权
A 股 T+1 制度下,选股信号在今天收盘后生成,明天开盘才能交易。前复权会不断改变历史价格——你 11 月 10 日生成的信号,到了 11 月 20 日除权后,前复权价格一变动,历史信号对应的价格就不是你当时看到的那一个了。后复权是固定的:上市第一天的价格钉死不动,后续只加复权因子。
③ 怎么用
把 K 线数据灌进回测引擎前,对每一根 bar 做:
adj_close = close × 复权因子
复权因子的实际字段名需以调用 https://docs.tickdb.ai 中 kline 接口返回的具体键名为准,接入前务必先打印一条 bar 确认。别用 last_price 做计算——那是 ticker 快照字段,不带复权因子。
④ 有什么坑
| 坑 | 原因 | 后果 |
|---|---|---|
| 数据源拼接基准不一致 | 基准日不统一(一个用 2010-01-01,一个用上市日) | 复权因子链在拼接点断裂 |
| ticker 和 kline 字段混淆 | ticker 用 high_24h/volume_24h,kline 用 high/volume | 日频 K 线全部偏差 |
⑤ 怎么优化
单独拉取复权因子序列缓存,向量化计算。用 kline 的 close 做基础价,配合本地缓存的因子列一次性做矩阵乘法——6986 只品种的复权对齐瞬间跑完。这和 ORM 统一不同数据库的 SQL 方言一个道理:所有人面对同一个查询接口,方言的翻译在底层一次性处理干净。
vnpy DataFeed 对接:翻译层的脏活
vnpy 回测引擎要求 DataFeed 提供规范的 BarData 结构。你的数据源里缺一个字段,或者时区不对,回测就会静默失败——不会报错,只会吐出错误的绩效数字。
| 步骤 | 操作 | 关键细节 |
|---|---|---|
| 第一步 | 继承 BaseDataFeed,重写 load_bar(days) | 按天吐出 BarData 列表 |
| 第二步 | 确保品种代码后缀正确 | 上海 .SH(如 600519.SH 贵州茅台),深圳 .SZ(如 300750.SZ 宁德时代) |
常识级注意:日频回测中,今日生成的信号只能在下一交易日执行(A 股 T+1)。引擎里的信号日期和交易日期必须偏移一天。
代码实操:从全量品种到回测报告,跨三道坎
下面三段代码可以直接跑,唯一依赖是 requests、sqlite3、numpy 和 vnpy。
价值承诺:这套代码能帮你从"手工拼接三个数据源"直接跳到单次批量调取,调试时间从 5 天压到 30 分钟。
Step 1:拉取 6986 只 A 股全量品种列表并缓存
import os
import time
import sqlite3
import requests
from typing import List, Dict
API_KEY = os.getenv("TICKDB_API_KEY") # 绝不硬编码密钥
BASE_URL = "https://api.tickdb.ai/v1"
HEADERS = {"X-API-Key": API_KEY}
def fetch_all_a_stock_symbols() -> List[Dict]:
"""通过 /v1/symbols/available 枚举全量 A 股品种,指数退避处理限流,SQLite 批量缓存。"""
conn = sqlite3.connect("tickdb_cache.db")
conn.execute(
"CREATE TABLE IF NOT EXISTS symbols (symbol TEXT PRIMARY KEY, name TEXT, exchange TEXT)"
)
# 先尝试读本地缓存
cached = conn.execute("SELECT COUNT(*) FROM symbols").fetchone()[0]
if cached >= 6000:
return [{"symbol": r[0], "name": r[1], "exchange": r[2]}
for r in conn.execute("SELECT symbol, name, exchange FROM symbols")]
# 正确端点:品种列表 /v1/symbols/available,不是 ticker 快照
url = f"{BASE_URL}/symbols/available"
backoff = 1
symbols = []
page = 0
while True:
try:
params = {"market": "CN", "type": "stock", "limit": 500, "offset": page * 500}
resp = requests.get(url, headers=HEADERS, params=params, timeout=10)
data = resp.json()
if data["code"] == 3001: # 限流,指数退避
time.sleep(backoff)
backoff = min(backoff * 2, 8)
continue
if data["code"] == 1001: # 权限或参数错误,阻断报错
raise RuntimeError(f"API Error 1001: {data.get('message')}")
if data["code"] != 0:
raise RuntimeError(f"Unexpected error {data['code']}: {data.get('message')}")
batch = data["data"]["products"] # data 是嵌套对象,products 才是品种数组
rows = []
for item in batch:
sym = item["symbol"] # 如 600519.SH, 300750.SZ
name = item.get("name", "")
ex = item.get("exchange", "")
symbols.append({"symbol": sym, "name": name, "exchange": ex})
rows.append((sym, name, ex))
# 批量写入,避免逐条触发 fdatasync()
conn.executemany("INSERT OR REPLACE INTO symbols VALUES (?, ?, ?)", rows)
conn.commit()
if len(batch) < 500:
break
page += 1
backoff = 1
except requests.exceptions.Timeout:
time.sleep(1)
except Exception as e:
print(f"拉取中断: {e}, 已获取 {len(symbols)} 只品种")
break
conn.close()
return symbols
核心是指数退避重试 + 批量缓存,不是请求速度。
code字段显式判断 3001 限流和 1001 阻断,避免静默失败。HTTP 示例适用于快速集成,生产环境全量拉取建议走 WebSocket 长连接(端点:wss://api.tickdb.ai),减少反复建连开销。
Step 2:日频 K 线批量拉取 + 复权对齐
| 参数 | 正确写法 | 错误写法 | 说明 |
|---|---|---|---|
| 品种参数 | symbol= | symbols= | kline 用单数 |
| 周期参数 | interval="1d" | period="1d" | API 文档规范 |
| 时间字段 | time(毫秒 UTC) | timestamp | ticker 才用 timestamp |
| 成交量字段 | volume | volume_24h | ticker 字段名不同 |
| 收盘价字段 | close | last_price | kline 返回该周期收盘价 |
from datetime import datetime
def fetch_kline_batch(symbols: List[str], start_date: str, end_date: str):
"""逐只拉取日频 K 线。优先解析 Retry-After 做限流背压。"""
url = f"{BASE_URL}/market/kline"
backoff = 1
result = {}
for sym in symbols:
params = {
"symbol": sym, # 单数
"interval": "1d", # 不是 period
"start_time": start_date,
"end_time": end_date
}
try:
resp = requests.get(url, headers=HEADERS, params=params, timeout=10)
data = resp.json()
if data["code"] == 3001:
# 优先读取 Retry-After 头部,服务端精确指示等待时长
retry_after = resp.headers.get("Retry-After")
wait = int(retry_after) if retry_after else backoff
time.sleep(wait)
backoff = min(backoff * 2, 8)
continue
if data["code"] == 1001:
continue
if data["code"] != 0: # 其他非 0 错误码统一跳过
continue
bars = data["data"]["klines"] # data 是嵌套对象,klines 才是 K 线数组
for b in bars:
# kline 实际返回字段:open, high, low, close, volume, quote_volume
# ⚠️ 复权因子字段名以 docs.tickdb.ai 返回结构为准
# 接入前先 print(bars[0].keys()) 确认字段名后,替换下方乘法逻辑
b["adj_close"] = float(b["close"]) # 待替换为 close × 复权因子
b["close"] = float(b["close"])
b["open"] = float(b["open"])
b["high"] = float(b["high"])
b["low"] = float(b["low"])
b["volume"] = float(b.get("volume", 0))
b["datetime"] = datetime.utcfromtimestamp(b["time"] / 1000) # 毫秒 UTC
result[sym] = bars
backoff = 1
except Exception as e:
print(f"拉取 {sym} 失败: {e}")
continue
return result
核心是字段映射与复权因子的向量化使用,不是简单数据请求。 kline 和 ticker 的字段名体系不同,一步写错全部偏差。限流处理优先解析 HTTP 头部
Retry-After,服务端给什么等什么,不给才退避指数自算——就像处理数据库连接池泄漏:集中管理所有连接,而不是每个请求里 new 一个新连接。
Step 3:封装成 vnpy DataFeed 并跑完整回测
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData
from vnpy.trader.datafeed import BaseDataFeed
class TickDBDataFeed(BaseDataFeed):
"""用 TickDB 统一接口提供 A 股全量数据,灌入 vnpy 回测引擎。"""
def __init__(self, symbols: List[str], start: str, end: str):
self.symbols = symbols
self.start = start
self.end = end
self._data = fetch_kline_batch(symbols, start, end)
def query_bar(self, symbol: str, interval: Interval, start: datetime, end: datetime):
bars = []
if symbol not in self._data:
return bars
for b in self._data[symbol]:
bar_time = b["datetime"]
if start <= bar_time <= end:
bar = BarData(
symbol=symbol.split(".")[0],
exchange=Exchange.SSE if symbol.endswith(".SH") else Exchange.SZSE,
datetime=bar_time,
interval=Interval.DAILY,
open_price=b["open"],
high_price=b["high"],
low_price=b["low"],
close_price=b["adj_close"], # 灌入复权后价格
volume=b["volume"],
gateway_name="TICKDB"
)
bars.append(bar)
return bars
核心是把带复权的
adj_close灌入引擎,而不是数据拉取的速度。 vnpy 不再面对多个数据源的字段名差异,不需要任何 parser 二次加工——就像单数据连接池统一管理所有数据库连接,全市场数据请求收归到一个出口。
你真正在维护的,是一个数据中间件
没有统一 API 时,你面对的困境:
| 问题类型 | 具体表现 | 维护成本 |
|---|---|---|
| 字段命名不一致 | 今天叫 vol,明天叫 volume;AKShare 取北交所网页表格列名带空格 | 每个源写一个 parser |
| 品种代码规范混乱 | 科创板 688981 要补 SE. 前缀才能被 vnpy 认 | 手工维护映射表 |
| 时区不统一 | UTC / 北京时间 / 交易所时间混用 | 排 bug 排到下半夜 |
| 复权因子缺失或基准不一致 | 拼接收据复权链断裂 | 回测绩效系统性偏移 |
TickDB 出现在这个背景下不意外:一个 REST + WebSocket 长连接,覆盖 A 股 6,986 只,字段命名、鉴权方式、UTC 毫秒时间戳在所有交易所保持一致,复权因子直接内置在 kline 返回中。你不用再维护三个 parser、两套字段映射和一套时区转换脚本。
| TickDB 统一了什么 | 你省掉了什么 |
|---|---|
| 一套字段命名(所有市场) | 多源字段映射表 |
一种鉴权方式(X-API-Key) | 多套 Token 管理 |
| 统一 UTC 毫秒时间戳 | 时区转换脚本 |
| 复权因子内置 kline | 外部复权因子表拼接 |
接口文档和字段映射关系在 https://docs.tickdb.ai 开源可查。需要更自动化的行情查询,还可以走 MCP 工具链(https://mcp.tickdb.ai),把行情封装成 Agent 可调用的服务。
你的复权因子用对了方向吗?
一个朋友问:"回测年化 22%,实盘只剩 6%,是不是过拟合?"
我让他截图 DataFeed 代码,第 42 行用的是前复权 close,资金曲线计算用的却是后复权净值。
这不是过拟合,是用两根不同的尺子量了同一段行情。
你上一次检查自己的复权坐标,是什么时候?
📡 数据由 TickDB.ai 提供
通过 TickDB API 获取实时行情数据
一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。
免费领取 API Key查看 API 文档