WebSocket 行情重连成功,K线缺口不会自动消失
作者: TickDB Research · 发布: 2026/7/2 · 阅读: 9
标签: W27-T01, 掘金A059
WebSocket 行情重连成功,K线缺口不会自动消失
摘要
WebSocket 断流后重连成功,watch dog 显示一切正常——但中间窗口的 K 线缺口不会自己补回来。连接恢复不等于数据连续,断流窗口必须通过独立的缺口检测、REST K 线回补和 gap_report 留痕三步完成修复。本文给出一套可集成的 Python 方案,附带状态判断的严格标准:不是回补条数够了就算完,recovered_times 必须覆盖 missing_times。
一、重连成功,K线图上出现了一段平直曲线
凌晨三点,行情监控告警:WebSocket 断开。五秒后重连成功,日志显示连接恢复,心跳正常。
你看了一眼 K 线图——没有跳空,没有断崖,曲线平滑。你关掉告警继续睡觉。
两周后跑回测,发现那天的分钟线数据不对。排查了半天才定位到根因:凌晨断流那段时间缺了几分钟数据。重连后 WebSocket 只推送了当前快照,没有把断流窗口的数据补回来。你看到的平滑曲线,是前端把缺失段直接连起来了。
重连成功 ≠ 数据连续。watch dog 只能告诉你“通道恢复了”,不能告诉你“中间窗口的数据是完整的”。
二、三个问题,通道层回答不了
一个典型的 WebSocket watch dog 只做三件事:心跳超时检测、连接状态监控、重连触发。三者都围绕传输通道展开。
它回答不了这三个问题:
- 断流期间到底缺了几根 K 线?
- 重连后第一条数据的时间戳,和断流前最后一条是否连续?
- 回补窗口有没有被完整覆盖,还是只补了一部分?
连接是传输层的概念,K 线是业务层的概念。 通道恢复不能自动推导出数据完整。这个推导必须由业务代码主动完成。
断流后常见三种数据异常:
- 缺段:窗口内 K 线全部缺失,重连后直接跳到当前时间。
- 缺条:窗口内部分 K 线缺失,比如 10 分钟内缺了 3 根。
- 假补全:重连后推送了部分重叠数据,但不是完整窗口,你以为补全了,其实还有缺口。
三、缺口检测:expected_times vs actual_times
缺口检测的逻辑:根据 K 线周期和交易日历,生成断流窗口内“应该有”的时间序列(expected_times)。然后跟“实际有”的时间序列(actual_times)做集合差。
actual_times 从哪里来? 不是从 WebSocket 推送的 K 线频道直接提取——当前只确认 WebSocket 可用于 ticker/depth/trade 持续推送,K 线频道不在本次讨论范围内。actual_times 应从本地聚合/入库后的 bar_time 提取,或者从业务侧记录的“有效行情时间轴”中获取。两者精度必须一致,不一致时先归一化再做比对。
expected_times 从哪里来? 根据 interval 和交易日历推算。这一步如果不参照交易日历排除非交易时段、午休和节假日,就会把不该有 K 线的时段也计为“缺失”,expected_count 虚高,回补永远补不齐。
from typing import List
from datetime import datetime
def detect_gaps(expected_times: List[datetime],
actual_times: List[datetime]) -> dict:
"""
缺口检测:expected_times 应该有,actual_times 实际有。
actual_times 来源:本地聚合/入库后的 bar_time,或业务侧记录的有效行情时间轴。
expected_times 来源:根据 interval 和交易日历推算。
返回 missing_times、overlap、unexpected。
"""
expected_set = set(expected_times)
actual_set = set(actual_times)
missing_times = sorted(expected_set - actual_set)
overlap = sorted(expected_set & actual_set)
unexpected = sorted(actual_set - expected_set)
return {
"gap_count": len(missing_times),
"missing_times": missing_times,
"overlap_count": len(overlap),
"unexpected_count": len(unexpected),
"is_complete": len(missing_times) == 0 and len(unexpected) == 0
}
四、为什么数量够了也可能没补全
回补最容易踩的坑:只看 recovered_count >= expected_count 就标 full。
假设缺口窗口内 missing_times 是 10:01、10:02、10:03 三根 1 分钟 K 线。REST 回补请求返回了三条记录,recovered_count = 3,和 expected_count 一致。但仔细看返回的 K 线时间字段——补回来的是 10:02、10:03、10:04。10:01 仍然缺失,10:04 是多余的。
数量对了,不代表缺口被覆盖了。 判定 full 必须比较 recovered_times 是否覆盖 missing_times,不是只比数量。
def verify_gap_coverage(missing_times: List[datetime],
recovered_klines: list,
time_field: str = "time") -> dict:
"""
验证回补数据是否真正覆盖了缺失窗口。
不是比数量——是比 recovered_times 是否包含 missing_times。
time_field 为 K 线记录中的时间字段名,以数据源文档为准。
"""
recovered_times = set()
for bar in recovered_klines:
t = bar.get(time_field)
if t is not None:
recovered_times.add(t)
missing_set = set(missing_times)
still_missing = sorted(missing_set - recovered_times)
unexpected = sorted(recovered_times - missing_set)
return {
"coverage_complete": len(still_missing) == 0,
"still_missing": still_missing,
"unexpected_times": unexpected,
"expected_count": len(missing_times),
"recovered_count": len(recovered_klines)
}
判定 full 的双重标准:
recovered_count >= expected_countverify_gap_coverage返回coverage_complete == True
两条同时满足,才是 full。任一条不满足,最多标 partial。
五、回补:用 REST K 线接口独立查询缺口窗口
为什么回补不能靠 WebSocket 重新推送?WebSocket 是面向实时推送的轻量通道,推送的是“当前及未来”的增量,不包含断开期间的历史快照。REST K 线接口适合独立查询指定时间窗口,但返回结果是否完整覆盖缺口,仍要校验 recovered_times 和 missing_times 的覆盖关系。
MCP 工具是给 AI 按需查询用的,不适合放进自动化断流回补链路。
def fetch_kline_gap(symbol: str,
interval: str,
start: datetime,
end: datetime) -> dict:
"""
用 REST K 线接口回补缺失窗口。
实际调用时替换为具体数据源的端点、鉴权方式和参数。
回补前先查 gap_report 表,用 gap_start + gap_end + symbol
+ interval 四个字段去重。已有同窗口成功回补记录则跳过。
"""
# 教学骨架,具体实现以数据源官方文档为准
return {
"symbol": symbol,
"interval": interval,
"start": start.isoformat(),
"end": end.isoformat(),
"klines": [], # 实际返回的 K 线数组
"status": "success",
"raw_snapshot": {}
}
分段拉取,不要一次覆盖太长窗口。 缺口跨越数天时按天或按小时分段。单段失败不影响其他段,也能精确定位哪一段没补上。
六、留痕:gap_report 字段表与写入逻辑
每次回补都生成一条 gap_report。事后排查“那段行情为什么不对”,全靠这条记录。
| 字段名 | 来源 | 说明 |
|---|---|---|
symbol | 请求参数 | 回补标的 |
interval | 请求参数 | K 线周期 |
gap_start | 检测结果 | 缺口起始 |
gap_end | 检测结果 | 缺口结束 |
expected_count | 检测结果 | 应有 K 线条数 |
recovered_count | 回补结果 | 实际回补条数 |
still_missing_count | 覆盖验证 | 仍未覆盖条数 |
raw_snapshot_id | 客户端生成 | 原始响应哈希摘要 |
status | 回补结果 | full / partial / empty / unrecoverable / failed |
note | 手动或自动 | 异常说明 |
import hashlib
import json
from datetime import datetime, timezone
def write_gap_report(symbol: str,
interval: str,
gap_start: datetime,
gap_end: datetime,
expected_count: int,
recovered_klines: list,
missing_times: list,
raw_response: dict,
request_error: str = None) -> dict:
"""
生成 gap_report。不直接写库,由调用方决定存储方式。
status 判断基于覆盖验证结果,而非仅比数量。
"""
raw_id = hashlib.sha256(
json.dumps(raw_response, sort_keys=True, ensure_ascii=False, default=str).encode()
).hexdigest()[:16]
recovered_count = len(recovered_klines) if recovered_klines else 0
# 覆盖验证
coverage = verify_gap_coverage(missing_times, recovered_klines)
still_missing_count = len(coverage["still_missing"])
# 判断状态
if request_error:
status = "failed"
elif recovered_count == 0:
status = "empty"
elif coverage["coverage_complete"]:
status = "full"
elif recovered_count > 0 and still_missing_count > 0:
status = "partial"
else:
status = "partial"
note = f"expected {expected_count}, recovered {recovered_count}, still missing {still_missing_count}"
if status == "failed":
note = request_error
elif status == "empty":
note += ", check if gap window is outside available history range"
return {
"symbol": symbol,
"interval": interval,
"gap_start": gap_start.isoformat(),
"gap_end": gap_end.isoformat(),
"expected_count": expected_count,
"recovered_count": recovered_count,
"still_missing_count": still_missing_count,
"raw_snapshot_id": raw_id,
"status": status,
"note": note,
"reported_at": datetime.now(timezone.utc).isoformat()
}
empty 不等于 unrecoverable。 REST 返回空且参数权限无误,同时确认该时段在数据源历史覆盖范围内,才标 unrecoverable。如果无法确认,先标 empty 等待人工判断。不要把 unrecoverable 标成 partial——下游会一直等一个永远不会来的回补。
七、6 个失败分支
| # | 失败场景 | 处理方式 |
|---|---|---|
| ① | REST 回补返回空 data | 先标 empty,确认参数和权限后,若不在历史覆盖范围则标 unrecoverable |
| ② | 回补条数够了但 recovered_times 未覆盖 missing_times | status=partial,标注仍缺失的具体时间点 |
| ③ | 交易日历误判导致 expected 不准确 | note 标注“交易日历可能不准确”,不强制补数 |
| ④ | interval 不一致 | 阻断,修正参数后重新拉取 |
| ⑤ | 同一缺口被重复回补 | 回补前用 gap_start+end+symbol+interval 去重查询 |
| ⑥ | raw_snapshot 未保存 | status=failed,note 标注“缺失原始快照”,该记录不可复查 |
八、TickDB 的工程边界
上面这套方案是通用的,不绑定任何特定数据源。如果你用 TickDB 做行情接入,它在断流回补中的角色:
- WebSocket:负责 ticker/depth/trade 的持续推送。鉴权使用
api_keyquery 参数。 - REST K 线:负责断流后按缺口窗口独立查询历史 K 线。鉴权使用
X-API-KeyHeader。 - MCP:供 AI 工具按需查询,不适合放进自动化断流回补链路。
三者各司其职。WebSocket 不负责历史回补,REST K 线不是实时推送通道,MCP 不替代监控链路。所有端点、字段路径和 timestamp 口径,以 TickDB 官方文档和实际测试为准。
九、断流回补的最小检查清单
- [ ] 重连后,有没有根据 interval 和交易日历生成 expected_times?
- [ ] actual_times 来源是否明确(本地聚合 bar_time 或业务侧有效行情时间轴)?
- [ ] 缺口回补是否用 REST K 线独立查询,而不是靠 WebSocket 重新推送?
- [ ] 每次回补是否生成 gap_report,包含 expected_count 和 recovered_count?
- [ ] 是否执行了
verify_gap_coverage,比较 recovered_times 和 missing_times? - [ ]
status=partial是否标注了仍缺失的具体时间点? - [ ]
status=empty是否已确认是否为unrecoverable? - [ ] 回补前是否做了去重查询,防止重复写入?
- [ ] 原始响应是否保存了 raw_snapshot_id?
- [ ] 全部回补完成后是否检查了 status 分布?
你们现在监控 WebSocket 断流,是只看连接状态,还是已经把缺口检测和回补留痕做进了系统?遇到过最隐蔽的一次数据缺失,最后是怎么定位到的?
📡 本文以 TickDB 作为行情接入示例。文中代码为 Python 教学骨架,不依赖任何特定数据源的端点或字段。本文仅讨论断流回补的工程方法,不构成投资建议。
标签: Python, WebSocket, K线, 数据回补, 工程实践, TickDB
通过 TickDB API 获取实时行情数据
一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。
免费领取 API Key查看 API 文档