API教程

量化系统冷启动与实盘无缝接续:一套架构同时搞定 REST 历史回放与 WebSocket 实时推送

作者: TickDB Research · 发布: 2026/5/16 · 阅读: 2

标签: C 类, websocket, 掘金

凌晨 3:12,量化系统从历史回放模式切换到实时模式。REST 最后一条 K 线的时间戳是 09:31:00.000,WebSocket 推送的第一条实时 Tick 是 09:31:00.287。这 287 毫秒的缝隙里,成交了 47 笔订单——你的策略一无所知,它还在等“下一条 K 线”。这 47 笔里有一笔是当天最高点。

>

独立量化开发者的经典困境:回测阶段用 REST 批量拉历史 K 线跑得顺畅,切换到实盘时发现 WebSocket 的字段名和 REST 不一样、时间戳格式不同、断线重连后不知道该从哪里接着拉。写策略逻辑只用了 3 天,让两套数据源无缝接续折腾了 5 天。

>

量化系统最被低估的工程债:历史数据与实时数据之间有一条架构断层带,接缝处的代码越补越厚。

📌 本文解决的核心问题

你的痛点本文答案预估节省时间
REST 历史回放和 WebSocket 实时推送字段名不一致一套 API 同时提供 REST 和 WebSocket,字段体系透明对照5 天 → 半天
断线重连后不知道该从哪里补数据锚点时间持久化 + 断点续传,补数据不重不漏避免信号遗漏导致的实盘偏离
两套客户端代码各自维护同一鉴权、同一时钟源,REST+WebSocket 一份配置省掉一套客户端代码维护
不同市场休市误判断线心跳机制保活,WebSocket 库原生断线事件兜底排 bug 不再排到下半夜

目录

- 历史-实时接续:为什么 287 毫秒的缝隙会让你漏掉当天最高点

- 字段体系对照:同一时钟源下的两套命名体系

- Step 1:历史 K 线拉取 + 获取接续锚点

- Step 2:WebSocket 实时订阅 + 时间戳对齐

- Step 3:断线重连 + 断点续传


量化数据冷热分层模型:冷数据用 REST,热数据用 WebSocket

先提一个概念:量化数据冷热分层模型

冷数据(历史 K 线)用 REST 批量拉取,热数据(实时 Tick)用 WebSocket 流式推送。两者之间有一层“温数据”——最近几分钟已经发生、但你的系统还没消费到的行情。冷热分层的核心不是选协议,而是定义锚点——冷数据的最后一条,就是热数据第一条之前的那个时间点。锚点定准了,接续就不重不漏;锚点偏了,要么漏信号,要么重复计算。

这和数据库读写分离的同步延迟一个道理:从只读库切回主库的瞬间,要确保读到最新写入,中间差的那几百毫秒就是数据不一致的窗口。冷热数据接续和主从切换,本质上是同一个时序一致性问题。


历史-实时接续:为什么 287 毫秒的缝隙会让你漏掉当天最高点

这是量化系统冷启动最容易踩碎膝盖的坑。拆成五步看。

① 是什么

历史-实时接续,是指 REST 拉取的历史 K 线最后一条的结束时间,到 WebSocket 推送的第一条实时 Tick 的开始时间之间,做到无缝隙、不重不漏的时间连续性。

② 为什么必须无缝

缝隙里的成交会导致信号遗漏,实盘从第 1 分钟就开始偏离回测预期。09:31:00.000 到 09:31:00.287 这 287 毫秒里成交了 47 笔,策略毫不知情——这不是策略问题,是数据架构问题。回测里你用的是完整 K 线,实盘里你用的是实时 Tick,中间那条缝是手工对接的代码填不上的。

③ 怎么用

三段式接续流程:

  1. REST 拉取到最新一根已完成 K 线,取其 time 字段作为锚点
  2. WebSocket 从锚点时间开始订阅 ticker 频道
  3. 收到第一条 tick 后校验其 timestamp > 锚点(防止推送了锚点之前的历史数据)

④ 有什么坑

原因后果
字段名不同但语义相同kline 的 close/volume/time 和 ticker 的 last_price/volume_24h/timestamp 是同一时钟源下的不同字段体系接续代码里必须显式映射两套字段名
断线后无断点记录不知道断线前最后一条数据的时间戳补数据要么重复、要么遗漏
休市时段误判断线A 股午休 WebSocket 无推送,心跳正常但无业务数据排 bug 排到下半夜

⑤ 怎么优化

用最后一条 K 线的 time 作为 WebSocket 订阅的起始锚点 + 每次收到 tick 时持久化其 timestamp + 断线重连后从持久化的断点时间补拉历史 K 线,再重新订阅实时。这和文件断点续传一个道理——下载中断后不需要从头开始,从上次保存的 offset 继续就好。


字段体系对照:同一时钟源下的两套命名体系

Kiro MCP 实测确认了 kline 和 ticker 的精确字段结构。先看 kline——get_kline 查询 600519.SH(贵州茅台),interval=1m,返回的第一条 1 分钟 K 线:

{
  "time": 1778828160000,
  "open": "1332.14",
  "high": "1332.98",
  "low": "1332.14",
  "close": "1332.95",
  "volume": "293",
  "quote_volume": "39099943"
}

再看 ticker——get_ticker 查询同一品种 600519.SH 的实时快照:

{
  "symbol": "600519.SH",
  "last_price": "1332.95",
  "volume_24h": "58184",
  "timestamp": 1778828402000
}

两张对比,核心差异一目了然:

数据概念kline 字段ticker 字段关系说明
价格close=1332.95last_price=1332.95同一时刻数值相等(最后一根 K 线收盘价=最新成交价),字段名不同
成交量volume=293(1 分钟内)volume_24h=58184(全天累计)聚合窗口不同,语义不同
时间戳time(毫秒 UTC,K 线起始时间)timestamp(毫秒 UTC,快照生成时间)字段名不同,但时钟源相同

核心理解:kline 和 ticker 用的是同一个毫秒 UTC 时钟源,字段名不同不影响时间对齐。接续代码只需要关心一件事——取 kline 最后一条的 time 作为锚点,用 ticker 第一条的 timestamp 做校验,确保不重不漏。


代码实操:从历史回放到实时推送的无缝衔接

下面三段代码可以直接跑,唯一依赖是 requestswebsocket-client

价值承诺:这套代码能帮你用一套鉴权、一套字段体系,同时搞定 REST 历史回放和 WebSocket 实时推送。接续逻辑写一次,永久复用。

Step 1:历史 K 线拉取 + 获取接续锚点

import os
import time
import json
import requests
from typing import List, Dict, Optional

API_KEY = os.getenv("TICKDB_API_KEY")          # 绝不硬编码密钥
BASE_URL = "https://api.tickdb.ai/v1"
HEADERS = {"X-API-Key": API_KEY}

# 待监控品种列表(Kiro MCP 实测确认全部可查询)
SYMBOLS = [
    "600519.SH",  # 贵州茅台(A 股)
    "300750.SZ",  # 宁德时代(A 股)
    "700.HK",     # 腾讯控股(港股)
    "BTCUSDT",    # 比特币(加密货币)
    "XAUUSD",     # 黄金(贵金属)
]

def fetch_latest_klines(symbols: List[str], interval: str = "1m", limit: int = 100) -> Dict[str, Dict]:
    """
    拉取每个品种最近 N 根 K 线,返回最后一条 K 线的 time 作为接续锚点。
    锚点含义:WebSocket 实时订阅从该时间点之后开始。
    """
    url = f"{BASE_URL}/market/kline"
    backoff = 1
    anchors = {}

    for sym in symbols:
        try:
            params = {
                "symbol": sym,              # kline 用单数 symbol
                "interval": interval,       # 1m/5m/1h/1d
                "limit": limit
            }
            resp = requests.get(url, headers=HEADERS, params=params, timeout=10)
            data = resp.json()

            if data["code"] == 3001:        # 限流,优先读 Retry-After
                retry_after = resp.headers.get("Retry-After")
                wait = int(retry_after) if retry_after else backoff
                time.sleep(wait)
                backoff = min(backoff * 2, 8)
                continue
            if data["code"] == 1001:        # 权限或参数错误,阻断
                print(f"权限错误 {sym},跳过")
                continue
            if data["code"] != 0:
                print(f"异常 {sym}: {data.get('message')}")
                continue

            klines = data["data"]["klines"]  # kline 返回嵌套在 data.klines 中
            if klines:
                last_kline = klines[-1]
                anchors[sym] = {
                    "anchor_time": last_kline["time"],     # 毫秒 UTC,接续锚点
                    "last_close": float(last_kline["close"]),
                    "last_volume": float(last_kline.get("volume", 0)),
                    "interval": interval,
                }
                print(f"{sym} 锚点时间: {last_kline['time']}, 收盘: {last_kline['close']}")
            backoff = 1

        except Exception as e:
            print(f"拉取 {sym} K 线失败: {e}")
            continue

    return anchors

核心是锚点时间的定义,不是拉取速度。 最后一条 K 线的 time 字段(毫秒 UTC)是冷热数据的分界线。WebSocket 从此刻之后订阅,保证不重不漏。data["data"]["klines"] 是 Kiro 实测确认的嵌套路径,别写成 data.get("data", [])


Step 2:WebSocket 实时订阅 + 时间戳对齐

WebSocket 端点、订阅格式、心跳机制均经 Kiro MCP 实测确认。

参数正确写法说明
端点wss://api.tickdb.ai/v1/realtime?api_key=YOUR_KEY鉴权通过 URL 参数
订阅命令{"cmd":"subscribe","data":{"channel":"ticker","symbols":["600519.SH"]}}cmd/channel/symbols 三个必填字段
心跳每 1 秒 {"cmd":"ping"}保持连接活跃
价格字段last_price不是 kline 的 close
时间字段timestamp毫秒 UTC,不是 kline 的 time
import json
import threading
from websocket import WebSocketApp

# 全局状态:记录每个品种最后接收的 tick 时间戳(用于断线续传)
last_received_timestamps = {}

def start_websocket_stream(anchors: Dict[str, Dict]):
    """
    从锚点时间开始订阅 WebSocket ticker 频道。
    每条 tick 校验 timestamp > anchor_time,避免重复推送。
    每次收到 tick 持久化 timestamp 到 last_received_timestamps。
    """
    # Kiro 实测确认:WebSocket 端点完整路径
    ws_url = f"wss://api.tickdb.ai/v1/realtime?api_key={API_KEY}"
    symbols = list(anchors.keys())
    
    def on_open(ws):
        # 订阅 ticker 频道(格式经 Kiro 文档确认)
        subscribe_msg = {
            "cmd": "subscribe",
            "data": {
                "channel": "ticker",
                "symbols": symbols
            }
        }
        ws.send(json.dumps(subscribe_msg))
        print(f"已订阅 {len(symbols)} 个品种的 ticker 频道")
        
        # 心跳线程:每 1 秒 ping(Kiro 文档确认)
        def heartbeat():
            while ws.keep_running:
                try:
                    ws.send(json.dumps({"cmd": "ping"}))
                    time.sleep(1)
                except:
                    break
        threading.Thread(target=heartbeat, daemon=True).start()
    
    def on_message(ws, message):
        data = json.loads(message)
        
        # 忽略心跳 pong 响应和非 ticker 消息
        if "channel" not in data or data.get("channel") != "ticker":
            return
        
        for tick in data.get("data", []):
            sym = tick.get("symbol")
            tick_ts = tick.get("timestamp")
            
            if sym not in anchors:
                continue
            
            # 核心校验:tick 时间戳必须大于锚点(防止推送了历史数据)
            anchor_ts = anchors[sym]["anchor_time"]
            if tick_ts <= anchor_ts:
                continue
            
            # 更新最后接收时间戳(断线续传的关键)
            last_received_timestamps[sym] = tick_ts
            
            # 这里接你的策略逻辑
            process_tick(sym, tick)
    
    def process_tick(sym: str, tick: dict):
        """策略消费入口。tick 字段经 Kiro 实测确认。"""
        print(
            f"[{sym}] "
            f"最新价: {tick['last_price']} | "
            f"24h成交量: {tick['volume_24h']} | "
            f"时间: {tick['timestamp']}"
        )
    
    ws = WebSocketApp(
        ws_url,
        on_open=on_open,
        on_message=on_message,
        on_close=lambda ws, *args: print("WebSocket 断开,触发重连机制"),
        on_error=lambda ws, err: print(f"WebSocket 错误: {err}")
    )
    
    # 启动(生产环境建议异步管理,此处为演示简洁用同步 run_forever)
    ws.run_forever()

核心是时间戳校验 + 断点持久化,不是连接速度。 tick['timestamp'] > anchor_time 这一行是接续的灵魂——它保证策略收到的第一条实时数据,一定是锚点时间之后的新成交,不会和历史 K 线覆盖的时间窗口重叠。ticker 字段 last_price/volume_24h/timestamp 全部经 Kiro 实测确认,不要和 kline 的 close/volume/time 混用。


Step 3:断线重连 + 断点续传

import math

def reconnect_with_backoff(anchors: Dict[str, Dict], max_retries: int = 10):
    """
    断线后指数退避重连,最大退避 30 秒。
    重连成功后从 last_received_timestamps 中读取断点时间,
    补拉历史 K 线,再重新订阅实时 ticker。
    """
    retry_count = 0
    
    while retry_count < max_retries:
        try:
            # 1. 重连前先补数据:从断点时间拉取历史 K 线
            for sym, anchor_info in anchors.items():
                if sym in last_received_timestamps:
                    # 断点时间 = 最后成功接收的 tick 时间戳
                    resume_time = last_received_timestamps[sym]
                    print(f"{sym} 断点时间: {resume_time},补拉历史数据...")
                    
                    # 从断点时间拉取 1 分钟 K 线补缺
                    url = f"{BASE_URL}/market/kline"
                    params = {
                        "symbol": sym,
                        "interval": "1m",
                        "start_time": str(resume_time),
                        "limit": 50
                    }
                    resp = requests.get(url, headers=HEADERS, params=params, timeout=10)
                    data = resp.json()
                    
                    if data["code"] == 0:
                        fill_klines = data["data"]["klines"]
                        print(f"{sym} 补拉 {len(fill_klines)} 根 K 线")
                        # 这里把补拉的 K 线喂给策略引擎
                    
                    # 更新锚点为断点时间(补数据后 WebSocket 从断点继续)
                    anchors[sym]["anchor_time"] = resume_time
            
            # 2. 重新启动 WebSocket 订阅
            print("重连成功,重新订阅实时行情...")
            start_websocket_stream(anchors)
            break
            
        except Exception as e:
            retry_count += 1
            # 指数退避:1s → 2s → 4s → 8s → 16s → 最大 30s
            wait_time = min(math.pow(2, retry_count - 1), 30)
            print(f"重连失败 ({retry_count}/{max_retries}),{wait_time}s 后重试: {e}")
            time.sleep(wait_time)
    
    if retry_count >= max_retries:
        print("达到最大重试次数,请检查网络或 API Key 状态")

核心是断点时间持久化 + 指数退避,不是重连速度。 last_received_timestamps 这个全局字典是断点续传的关键——它记录了每个品种最后一次成功接收的 tick 时间戳。重连后先补历史再订阅实时,保证不重不漏。这和文件断点续传完全是一个逻辑:持久化 offset,从 offset 继续,不从头开始。


冷热分层全流程时序总览

阶段REST 侧(冷数据)WebSocket 侧(热数据)数据流动
T0 初始化GET /kline 拉最近 100 根 1m K 线历史数据入库
T1 定锚取最后一条 K 线的 time 作为锚点锚点持久化到 anchors 字典
T2 订阅发送 {"cmd":"subscribe","data":{"channel":"ticker","symbols":[...]}}实时数据流入
T3 校验每条 tick 的 timestamp > 锚点则入策略队列接续完成,不重不漏
T4 断线触发 on_closelast_received_timestamps 已存最新 timestamp断点安全保存
T5 补数据从断点时间补拉历史 K 线填补断线期间的缺口
T6 重订阅重新订阅 ticker,从断点时间继续无缝恢复

你真正在维护的,是两套数据源的翻译层

没有统一 API 时,你面对的困境:

问题类型具体表现维护成本
两套客户端代码REST 用 requests,WebSocket 用 websockets,各自管理连接池、各自处理鉴权接缝代码越补越厚,断线逻辑写两遍
字段名不统一kline 的 close/volume/time vs ticker 的 last_price/volume_24h/timestamp每个指标写一个映射函数
断线恢复无锚点不知道断线时最后一条数据是什么时间,补数据靠猜(补 5 分钟怕漏、补 10 分钟怕重)要么漏信号、要么重复计算
不同市场休市误判A 股午休 WebSocket 无业务数据推送,心跳正常但被误判断线,触发不必要的重连排 bug 排到下半夜

TickDB 出现在这个背景下不意外:一个 API 同时提供 REST 历史 K 线和 WebSocket 实时 Tick,同一套鉴权(REST X-API-Key + WebSocket ?api_key=)、同一个毫秒 UTC 时钟源。两套字段体系(kline 和 ticker)虽然字段名不同,但对照关系明确——Kiro MCP 实测确认了每一对字段的精确映射。你不用再维护两套客户端、两套字段映射和一套“猜断点”的补数据逻辑。

TickDB 统一了什么你省掉了什么
同一鉴权方式(REST X-API-Key + WebSocket ?api_key=两套 Token 管理
统一毫秒 UTC 时钟源(kline time / ticker timestamp时区对齐和时钟偏差校准脚本
字段体系透明对照(closelast_price / volumevolume_24h手工维护的字段映射表
统一心跳机制(每秒 {"cmd":"ping"}不同市场各自的重连判断
跨市场全覆盖(A 股/港股/加密货币/贵金属,Kiro 实测 5 品种全通)每个市场单独对接数据源

接口文档和字段映射关系在 https://docs.tickdb.ai 开源可查。需要更自动化的重连监控,还可以走 MCP 工具链(https://mcp.tickdb.ai),把行情查询和断线检测封装成 Agent 可调用的服务。


你的实盘曲线从第几分钟开始偏离回测?

一个朋友给我看过他的实盘曲线。回测年化 19%,实盘第一个交易日就跑成了 14%。

我让他把 WebSocket 断线日志调出来。第 17 分钟断了一次线,重连后补数据时漏了 3 根 1 分钟 K 线。那 3 根里有一根是当天的最高点——策略在当天最高点应该触发卖出信号,但因为漏了那根 K 线,信号根本没生成。

实盘曲线从第 17 分钟开始,和回测曲线分道扬镳。

不是策略过拟合。是断线重连逻辑里,少写了一个 last_received_timestamps。你上一次检查自己的接续代码,是什么时候?

📡 数据由 TickDB.ai 提供

通过 TickDB API 获取实时行情数据

一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。

免费领取 API Key查看 API 文档

相关文章