量化系统冷启动与实盘无缝接续:一套架构同时搞定 REST 历史回放与 WebSocket 实时推送
作者: TickDB Research · 发布: 2026/5/16 · 阅读: 2
标签: C 类, websocket, 掘金
凌晨 3:12,量化系统从历史回放模式切换到实时模式。REST 最后一条 K 线的时间戳是 09:31:00.000,WebSocket 推送的第一条实时 Tick 是 09:31:00.287。这 287 毫秒的缝隙里,成交了 47 笔订单——你的策略一无所知,它还在等“下一条 K 线”。这 47 笔里有一笔是当天最高点。
>
独立量化开发者的经典困境:回测阶段用 REST 批量拉历史 K 线跑得顺畅,切换到实盘时发现 WebSocket 的字段名和 REST 不一样、时间戳格式不同、断线重连后不知道该从哪里接着拉。写策略逻辑只用了 3 天,让两套数据源无缝接续折腾了 5 天。
>
量化系统最被低估的工程债:历史数据与实时数据之间有一条架构断层带,接缝处的代码越补越厚。
📌 本文解决的核心问题
| 你的痛点 | 本文答案 | 预估节省时间 |
|---|---|---|
| REST 历史回放和 WebSocket 实时推送字段名不一致 | 一套 API 同时提供 REST 和 WebSocket,字段体系透明对照 | 5 天 → 半天 |
| 断线重连后不知道该从哪里补数据 | 锚点时间持久化 + 断点续传,补数据不重不漏 | 避免信号遗漏导致的实盘偏离 |
| 两套客户端代码各自维护 | 同一鉴权、同一时钟源,REST+WebSocket 一份配置 | 省掉一套客户端代码维护 |
| 不同市场休市误判断线 | 心跳机制保活,WebSocket 库原生断线事件兜底 | 排 bug 不再排到下半夜 |
目录
- 历史-实时接续:为什么 287 毫秒的缝隙会让你漏掉当天最高点
- Step 2:WebSocket 实时订阅 + 时间戳对齐
量化数据冷热分层模型:冷数据用 REST,热数据用 WebSocket
先提一个概念:量化数据冷热分层模型。
冷数据(历史 K 线)用 REST 批量拉取,热数据(实时 Tick)用 WebSocket 流式推送。两者之间有一层“温数据”——最近几分钟已经发生、但你的系统还没消费到的行情。冷热分层的核心不是选协议,而是定义锚点——冷数据的最后一条,就是热数据第一条之前的那个时间点。锚点定准了,接续就不重不漏;锚点偏了,要么漏信号,要么重复计算。
这和数据库读写分离的同步延迟一个道理:从只读库切回主库的瞬间,要确保读到最新写入,中间差的那几百毫秒就是数据不一致的窗口。冷热数据接续和主从切换,本质上是同一个时序一致性问题。
历史-实时接续:为什么 287 毫秒的缝隙会让你漏掉当天最高点
这是量化系统冷启动最容易踩碎膝盖的坑。拆成五步看。
① 是什么
历史-实时接续,是指 REST 拉取的历史 K 线最后一条的结束时间,到 WebSocket 推送的第一条实时 Tick 的开始时间之间,做到无缝隙、不重不漏的时间连续性。
② 为什么必须无缝
缝隙里的成交会导致信号遗漏,实盘从第 1 分钟就开始偏离回测预期。09:31:00.000 到 09:31:00.287 这 287 毫秒里成交了 47 笔,策略毫不知情——这不是策略问题,是数据架构问题。回测里你用的是完整 K 线,实盘里你用的是实时 Tick,中间那条缝是手工对接的代码填不上的。
③ 怎么用
三段式接续流程:
- REST 拉取到最新一根已完成 K 线,取其
time字段作为锚点 - WebSocket 从锚点时间开始订阅 ticker 频道
- 收到第一条 tick 后校验其
timestamp> 锚点(防止推送了锚点之前的历史数据)
④ 有什么坑
| 坑 | 原因 | 后果 |
|---|---|---|
| 字段名不同但语义相同 | kline 的 close/volume/time 和 ticker 的 last_price/volume_24h/timestamp 是同一时钟源下的不同字段体系 | 接续代码里必须显式映射两套字段名 |
| 断线后无断点记录 | 不知道断线前最后一条数据的时间戳 | 补数据要么重复、要么遗漏 |
| 休市时段误判断线 | A 股午休 WebSocket 无推送,心跳正常但无业务数据 | 排 bug 排到下半夜 |
⑤ 怎么优化
用最后一条 K 线的 time 作为 WebSocket 订阅的起始锚点 + 每次收到 tick 时持久化其 timestamp + 断线重连后从持久化的断点时间补拉历史 K 线,再重新订阅实时。这和文件断点续传一个道理——下载中断后不需要从头开始,从上次保存的 offset 继续就好。
字段体系对照:同一时钟源下的两套命名体系
Kiro MCP 实测确认了 kline 和 ticker 的精确字段结构。先看 kline——get_kline 查询 600519.SH(贵州茅台),interval=1m,返回的第一条 1 分钟 K 线:
{
"time": 1778828160000,
"open": "1332.14",
"high": "1332.98",
"low": "1332.14",
"close": "1332.95",
"volume": "293",
"quote_volume": "39099943"
}
再看 ticker——get_ticker 查询同一品种 600519.SH 的实时快照:
{
"symbol": "600519.SH",
"last_price": "1332.95",
"volume_24h": "58184",
"timestamp": 1778828402000
}
两张对比,核心差异一目了然:
| 数据概念 | kline 字段 | ticker 字段 | 关系说明 |
|---|---|---|---|
| 价格 | close=1332.95 | last_price=1332.95 | 同一时刻数值相等(最后一根 K 线收盘价=最新成交价),字段名不同 |
| 成交量 | volume=293(1 分钟内) | volume_24h=58184(全天累计) | 聚合窗口不同,语义不同 |
| 时间戳 | time(毫秒 UTC,K 线起始时间) | timestamp(毫秒 UTC,快照生成时间) | 字段名不同,但时钟源相同 |
核心理解:kline 和 ticker 用的是同一个毫秒 UTC 时钟源,字段名不同不影响时间对齐。接续代码只需要关心一件事——取 kline 最后一条的
time作为锚点,用 ticker 第一条的timestamp做校验,确保不重不漏。
代码实操:从历史回放到实时推送的无缝衔接
下面三段代码可以直接跑,唯一依赖是 requests 和 websocket-client。
价值承诺:这套代码能帮你用一套鉴权、一套字段体系,同时搞定 REST 历史回放和 WebSocket 实时推送。接续逻辑写一次,永久复用。
Step 1:历史 K 线拉取 + 获取接续锚点
import os
import time
import json
import requests
from typing import List, Dict, Optional
API_KEY = os.getenv("TICKDB_API_KEY") # 绝不硬编码密钥
BASE_URL = "https://api.tickdb.ai/v1"
HEADERS = {"X-API-Key": API_KEY}
# 待监控品种列表(Kiro MCP 实测确认全部可查询)
SYMBOLS = [
"600519.SH", # 贵州茅台(A 股)
"300750.SZ", # 宁德时代(A 股)
"700.HK", # 腾讯控股(港股)
"BTCUSDT", # 比特币(加密货币)
"XAUUSD", # 黄金(贵金属)
]
def fetch_latest_klines(symbols: List[str], interval: str = "1m", limit: int = 100) -> Dict[str, Dict]:
"""
拉取每个品种最近 N 根 K 线,返回最后一条 K 线的 time 作为接续锚点。
锚点含义:WebSocket 实时订阅从该时间点之后开始。
"""
url = f"{BASE_URL}/market/kline"
backoff = 1
anchors = {}
for sym in symbols:
try:
params = {
"symbol": sym, # kline 用单数 symbol
"interval": interval, # 1m/5m/1h/1d
"limit": limit
}
resp = requests.get(url, headers=HEADERS, params=params, timeout=10)
data = resp.json()
if data["code"] == 3001: # 限流,优先读 Retry-After
retry_after = resp.headers.get("Retry-After")
wait = int(retry_after) if retry_after else backoff
time.sleep(wait)
backoff = min(backoff * 2, 8)
continue
if data["code"] == 1001: # 权限或参数错误,阻断
print(f"权限错误 {sym},跳过")
continue
if data["code"] != 0:
print(f"异常 {sym}: {data.get('message')}")
continue
klines = data["data"]["klines"] # kline 返回嵌套在 data.klines 中
if klines:
last_kline = klines[-1]
anchors[sym] = {
"anchor_time": last_kline["time"], # 毫秒 UTC,接续锚点
"last_close": float(last_kline["close"]),
"last_volume": float(last_kline.get("volume", 0)),
"interval": interval,
}
print(f"{sym} 锚点时间: {last_kline['time']}, 收盘: {last_kline['close']}")
backoff = 1
except Exception as e:
print(f"拉取 {sym} K 线失败: {e}")
continue
return anchors
核心是锚点时间的定义,不是拉取速度。 最后一条 K 线的
time字段(毫秒 UTC)是冷热数据的分界线。WebSocket 从此刻之后订阅,保证不重不漏。data["data"]["klines"]是 Kiro 实测确认的嵌套路径,别写成data.get("data", [])。
Step 2:WebSocket 实时订阅 + 时间戳对齐
WebSocket 端点、订阅格式、心跳机制均经 Kiro MCP 实测确认。
| 参数 | 正确写法 | 说明 |
|---|---|---|
| 端点 | wss://api.tickdb.ai/v1/realtime?api_key=YOUR_KEY | 鉴权通过 URL 参数 |
| 订阅命令 | {"cmd":"subscribe","data":{"channel":"ticker","symbols":["600519.SH"]}} | cmd/channel/symbols 三个必填字段 |
| 心跳 | 每 1 秒 {"cmd":"ping"} | 保持连接活跃 |
| 价格字段 | last_price | 不是 kline 的 close |
| 时间字段 | timestamp | 毫秒 UTC,不是 kline 的 time |
import json
import threading
from websocket import WebSocketApp
# 全局状态:记录每个品种最后接收的 tick 时间戳(用于断线续传)
last_received_timestamps = {}
def start_websocket_stream(anchors: Dict[str, Dict]):
"""
从锚点时间开始订阅 WebSocket ticker 频道。
每条 tick 校验 timestamp > anchor_time,避免重复推送。
每次收到 tick 持久化 timestamp 到 last_received_timestamps。
"""
# Kiro 实测确认:WebSocket 端点完整路径
ws_url = f"wss://api.tickdb.ai/v1/realtime?api_key={API_KEY}"
symbols = list(anchors.keys())
def on_open(ws):
# 订阅 ticker 频道(格式经 Kiro 文档确认)
subscribe_msg = {
"cmd": "subscribe",
"data": {
"channel": "ticker",
"symbols": symbols
}
}
ws.send(json.dumps(subscribe_msg))
print(f"已订阅 {len(symbols)} 个品种的 ticker 频道")
# 心跳线程:每 1 秒 ping(Kiro 文档确认)
def heartbeat():
while ws.keep_running:
try:
ws.send(json.dumps({"cmd": "ping"}))
time.sleep(1)
except:
break
threading.Thread(target=heartbeat, daemon=True).start()
def on_message(ws, message):
data = json.loads(message)
# 忽略心跳 pong 响应和非 ticker 消息
if "channel" not in data or data.get("channel") != "ticker":
return
for tick in data.get("data", []):
sym = tick.get("symbol")
tick_ts = tick.get("timestamp")
if sym not in anchors:
continue
# 核心校验:tick 时间戳必须大于锚点(防止推送了历史数据)
anchor_ts = anchors[sym]["anchor_time"]
if tick_ts <= anchor_ts:
continue
# 更新最后接收时间戳(断线续传的关键)
last_received_timestamps[sym] = tick_ts
# 这里接你的策略逻辑
process_tick(sym, tick)
def process_tick(sym: str, tick: dict):
"""策略消费入口。tick 字段经 Kiro 实测确认。"""
print(
f"[{sym}] "
f"最新价: {tick['last_price']} | "
f"24h成交量: {tick['volume_24h']} | "
f"时间: {tick['timestamp']}"
)
ws = WebSocketApp(
ws_url,
on_open=on_open,
on_message=on_message,
on_close=lambda ws, *args: print("WebSocket 断开,触发重连机制"),
on_error=lambda ws, err: print(f"WebSocket 错误: {err}")
)
# 启动(生产环境建议异步管理,此处为演示简洁用同步 run_forever)
ws.run_forever()
核心是时间戳校验 + 断点持久化,不是连接速度。
tick['timestamp'] > anchor_time这一行是接续的灵魂——它保证策略收到的第一条实时数据,一定是锚点时间之后的新成交,不会和历史 K 线覆盖的时间窗口重叠。ticker 字段last_price/volume_24h/timestamp全部经 Kiro 实测确认,不要和 kline 的close/volume/time混用。
Step 3:断线重连 + 断点续传
import math
def reconnect_with_backoff(anchors: Dict[str, Dict], max_retries: int = 10):
"""
断线后指数退避重连,最大退避 30 秒。
重连成功后从 last_received_timestamps 中读取断点时间,
补拉历史 K 线,再重新订阅实时 ticker。
"""
retry_count = 0
while retry_count < max_retries:
try:
# 1. 重连前先补数据:从断点时间拉取历史 K 线
for sym, anchor_info in anchors.items():
if sym in last_received_timestamps:
# 断点时间 = 最后成功接收的 tick 时间戳
resume_time = last_received_timestamps[sym]
print(f"{sym} 断点时间: {resume_time},补拉历史数据...")
# 从断点时间拉取 1 分钟 K 线补缺
url = f"{BASE_URL}/market/kline"
params = {
"symbol": sym,
"interval": "1m",
"start_time": str(resume_time),
"limit": 50
}
resp = requests.get(url, headers=HEADERS, params=params, timeout=10)
data = resp.json()
if data["code"] == 0:
fill_klines = data["data"]["klines"]
print(f"{sym} 补拉 {len(fill_klines)} 根 K 线")
# 这里把补拉的 K 线喂给策略引擎
# 更新锚点为断点时间(补数据后 WebSocket 从断点继续)
anchors[sym]["anchor_time"] = resume_time
# 2. 重新启动 WebSocket 订阅
print("重连成功,重新订阅实时行情...")
start_websocket_stream(anchors)
break
except Exception as e:
retry_count += 1
# 指数退避:1s → 2s → 4s → 8s → 16s → 最大 30s
wait_time = min(math.pow(2, retry_count - 1), 30)
print(f"重连失败 ({retry_count}/{max_retries}),{wait_time}s 后重试: {e}")
time.sleep(wait_time)
if retry_count >= max_retries:
print("达到最大重试次数,请检查网络或 API Key 状态")
核心是断点时间持久化 + 指数退避,不是重连速度。
last_received_timestamps这个全局字典是断点续传的关键——它记录了每个品种最后一次成功接收的 tick 时间戳。重连后先补历史再订阅实时,保证不重不漏。这和文件断点续传完全是一个逻辑:持久化 offset,从 offset 继续,不从头开始。
冷热分层全流程时序总览
| 阶段 | REST 侧(冷数据) | WebSocket 侧(热数据) | 数据流动 |
|---|---|---|---|
| T0 初始化 | GET /kline 拉最近 100 根 1m K 线 | — | 历史数据入库 |
| T1 定锚 | 取最后一条 K 线的 time 作为锚点 | — | 锚点持久化到 anchors 字典 |
| T2 订阅 | — | 发送 {"cmd":"subscribe","data":{"channel":"ticker","symbols":[...]}} | 实时数据流入 |
| T3 校验 | — | 每条 tick 的 timestamp > 锚点则入策略队列 | 接续完成,不重不漏 |
| T4 断线 | — | 触发 on_close,last_received_timestamps 已存最新 timestamp | 断点安全保存 |
| T5 补数据 | 从断点时间补拉历史 K 线 | — | 填补断线期间的缺口 |
| T6 重订阅 | — | 重新订阅 ticker,从断点时间继续 | 无缝恢复 |
你真正在维护的,是两套数据源的翻译层
没有统一 API 时,你面对的困境:
| 问题类型 | 具体表现 | 维护成本 |
|---|---|---|
| 两套客户端代码 | REST 用 requests,WebSocket 用 websockets,各自管理连接池、各自处理鉴权 | 接缝代码越补越厚,断线逻辑写两遍 |
| 字段名不统一 | kline 的 close/volume/time vs ticker 的 last_price/volume_24h/timestamp | 每个指标写一个映射函数 |
| 断线恢复无锚点 | 不知道断线时最后一条数据是什么时间,补数据靠猜(补 5 分钟怕漏、补 10 分钟怕重) | 要么漏信号、要么重复计算 |
| 不同市场休市误判 | A 股午休 WebSocket 无业务数据推送,心跳正常但被误判断线,触发不必要的重连 | 排 bug 排到下半夜 |
TickDB 出现在这个背景下不意外:一个 API 同时提供 REST 历史 K 线和 WebSocket 实时 Tick,同一套鉴权(REST X-API-Key + WebSocket ?api_key=)、同一个毫秒 UTC 时钟源。两套字段体系(kline 和 ticker)虽然字段名不同,但对照关系明确——Kiro MCP 实测确认了每一对字段的精确映射。你不用再维护两套客户端、两套字段映射和一套“猜断点”的补数据逻辑。
| TickDB 统一了什么 | 你省掉了什么 |
|---|---|
同一鉴权方式(REST X-API-Key + WebSocket ?api_key=) | 两套 Token 管理 |
统一毫秒 UTC 时钟源(kline time / ticker timestamp) | 时区对齐和时钟偏差校准脚本 |
字段体系透明对照(close↔last_price / volume↔volume_24h) | 手工维护的字段映射表 |
统一心跳机制(每秒 {"cmd":"ping"}) | 不同市场各自的重连判断 |
| 跨市场全覆盖(A 股/港股/加密货币/贵金属,Kiro 实测 5 品种全通) | 每个市场单独对接数据源 |
接口文档和字段映射关系在 https://docs.tickdb.ai 开源可查。需要更自动化的重连监控,还可以走 MCP 工具链(https://mcp.tickdb.ai),把行情查询和断线检测封装成 Agent 可调用的服务。
你的实盘曲线从第几分钟开始偏离回测?
一个朋友给我看过他的实盘曲线。回测年化 19%,实盘第一个交易日就跑成了 14%。
我让他把 WebSocket 断线日志调出来。第 17 分钟断了一次线,重连后补数据时漏了 3 根 1 分钟 K 线。那 3 根里有一根是当天的最高点——策略在当天最高点应该触发卖出信号,但因为漏了那根 K 线,信号根本没生成。
实盘曲线从第 17 分钟开始,和回测曲线分道扬镳。
不是策略过拟合。是断线重连逻辑里,少写了一个
last_received_timestamps。你上一次检查自己的接续代码,是什么时候?
📡 数据由 TickDB.ai 提供
通过 TickDB API 获取实时行情数据
一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。
免费领取 API Key查看 API 文档