综合

WebSocket 行情重连成功,K线缺口不会自动消失

作者: TickDB Research · 发布: 2026/7/2 · 阅读: 9

标签: W27-T01, 掘金A059

WebSocket 行情重连成功,K线缺口不会自动消失

摘要

WebSocket 断流后重连成功,watch dog 显示一切正常——但中间窗口的 K 线缺口不会自己补回来。连接恢复不等于数据连续,断流窗口必须通过独立的缺口检测、REST K 线回补和 gap_report 留痕三步完成修复。本文给出一套可集成的 Python 方案,附带状态判断的严格标准:不是回补条数够了就算完,recovered_times 必须覆盖 missing_times。

一、重连成功,K线图上出现了一段平直曲线

凌晨三点,行情监控告警:WebSocket 断开。五秒后重连成功,日志显示连接恢复,心跳正常。

你看了一眼 K 线图——没有跳空,没有断崖,曲线平滑。你关掉告警继续睡觉。

两周后跑回测,发现那天的分钟线数据不对。排查了半天才定位到根因:凌晨断流那段时间缺了几分钟数据。重连后 WebSocket 只推送了当前快照,没有把断流窗口的数据补回来。你看到的平滑曲线,是前端把缺失段直接连起来了。

重连成功 ≠ 数据连续。watch dog 只能告诉你“通道恢复了”,不能告诉你“中间窗口的数据是完整的”。

二、三个问题,通道层回答不了

一个典型的 WebSocket watch dog 只做三件事:心跳超时检测、连接状态监控、重连触发。三者都围绕传输通道展开。

它回答不了这三个问题:

  • 断流期间到底缺了几根 K 线?
  • 重连后第一条数据的时间戳,和断流前最后一条是否连续?
  • 回补窗口有没有被完整覆盖,还是只补了一部分?

连接是传输层的概念,K 线是业务层的概念。 通道恢复不能自动推导出数据完整。这个推导必须由业务代码主动完成。

断流后常见三种数据异常:

  • 缺段:窗口内 K 线全部缺失,重连后直接跳到当前时间。
  • 缺条:窗口内部分 K 线缺失,比如 10 分钟内缺了 3 根。
  • 假补全:重连后推送了部分重叠数据,但不是完整窗口,你以为补全了,其实还有缺口。

三、缺口检测:expected_times vs actual_times

缺口检测的逻辑:根据 K 线周期和交易日历,生成断流窗口内“应该有”的时间序列(expected_times)。然后跟“实际有”的时间序列(actual_times)做集合差。

actual_times 从哪里来? 不是从 WebSocket 推送的 K 线频道直接提取——当前只确认 WebSocket 可用于 ticker/depth/trade 持续推送,K 线频道不在本次讨论范围内。actual_times 应从本地聚合/入库后的 bar_time 提取,或者从业务侧记录的“有效行情时间轴”中获取。两者精度必须一致,不一致时先归一化再做比对。

expected_times 从哪里来? 根据 interval 和交易日历推算。这一步如果不参照交易日历排除非交易时段、午休和节假日,就会把不该有 K 线的时段也计为“缺失”,expected_count 虚高,回补永远补不齐。

from typing import List
from datetime import datetime


def detect_gaps(expected_times: List[datetime],
                actual_times: List[datetime]) -> dict:
    """
    缺口检测:expected_times 应该有,actual_times 实际有。
    actual_times 来源:本地聚合/入库后的 bar_time,或业务侧记录的有效行情时间轴。
    expected_times 来源:根据 interval 和交易日历推算。
    返回 missing_times、overlap、unexpected。
    """
    expected_set = set(expected_times)
    actual_set = set(actual_times)

    missing_times = sorted(expected_set - actual_set)
    overlap = sorted(expected_set & actual_set)
    unexpected = sorted(actual_set - expected_set)

    return {
        "gap_count": len(missing_times),
        "missing_times": missing_times,
        "overlap_count": len(overlap),
        "unexpected_count": len(unexpected),
        "is_complete": len(missing_times) == 0 and len(unexpected) == 0
    }

四、为什么数量够了也可能没补全

回补最容易踩的坑:只看 recovered_count >= expected_count 就标 full。

假设缺口窗口内 missing_times 是 10:01、10:02、10:03 三根 1 分钟 K 线。REST 回补请求返回了三条记录,recovered_count = 3,和 expected_count 一致。但仔细看返回的 K 线时间字段——补回来的是 10:02、10:03、10:04。10:01 仍然缺失,10:04 是多余的。

数量对了,不代表缺口被覆盖了。 判定 full 必须比较 recovered_times 是否覆盖 missing_times,不是只比数量。

def verify_gap_coverage(missing_times: List[datetime],
                         recovered_klines: list,
                         time_field: str = "time") -> dict:
    """
    验证回补数据是否真正覆盖了缺失窗口。
    不是比数量——是比 recovered_times 是否包含 missing_times。
    time_field 为 K 线记录中的时间字段名,以数据源文档为准。
    """
    recovered_times = set()
    for bar in recovered_klines:
        t = bar.get(time_field)
        if t is not None:
            recovered_times.add(t)

    missing_set = set(missing_times)
    still_missing = sorted(missing_set - recovered_times)
    unexpected = sorted(recovered_times - missing_set)

    return {
        "coverage_complete": len(still_missing) == 0,
        "still_missing": still_missing,
        "unexpected_times": unexpected,
        "expected_count": len(missing_times),
        "recovered_count": len(recovered_klines)
    }

判定 full 的双重标准

  • recovered_count >= expected_count
  • verify_gap_coverage 返回 coverage_complete == True

两条同时满足,才是 full。任一条不满足,最多标 partial。

五、回补:用 REST K 线接口独立查询缺口窗口

为什么回补不能靠 WebSocket 重新推送?WebSocket 是面向实时推送的轻量通道,推送的是“当前及未来”的增量,不包含断开期间的历史快照。REST K 线接口适合独立查询指定时间窗口,但返回结果是否完整覆盖缺口,仍要校验 recovered_times 和 missing_times 的覆盖关系。

MCP 工具是给 AI 按需查询用的,不适合放进自动化断流回补链路。

def fetch_kline_gap(symbol: str,
                    interval: str,
                    start: datetime,
                    end: datetime) -> dict:
    """
    用 REST K 线接口回补缺失窗口。
    实际调用时替换为具体数据源的端点、鉴权方式和参数。

    回补前先查 gap_report 表,用 gap_start + gap_end + symbol
    + interval 四个字段去重。已有同窗口成功回补记录则跳过。
    """
    # 教学骨架,具体实现以数据源官方文档为准
    return {
        "symbol": symbol,
        "interval": interval,
        "start": start.isoformat(),
        "end": end.isoformat(),
        "klines": [],  # 实际返回的 K 线数组
        "status": "success",
        "raw_snapshot": {}
    }

分段拉取,不要一次覆盖太长窗口。 缺口跨越数天时按天或按小时分段。单段失败不影响其他段,也能精确定位哪一段没补上。

六、留痕:gap_report 字段表与写入逻辑

每次回补都生成一条 gap_report。事后排查“那段行情为什么不对”,全靠这条记录。

字段名来源说明
symbol请求参数回补标的
interval请求参数K 线周期
gap_start检测结果缺口起始
gap_end检测结果缺口结束
expected_count检测结果应有 K 线条数
recovered_count回补结果实际回补条数
still_missing_count覆盖验证仍未覆盖条数
raw_snapshot_id客户端生成原始响应哈希摘要
status回补结果full / partial / empty / unrecoverable / failed
note手动或自动异常说明
import hashlib
import json
from datetime import datetime, timezone


def write_gap_report(symbol: str,
                     interval: str,
                     gap_start: datetime,
                     gap_end: datetime,
                     expected_count: int,
                     recovered_klines: list,
                     missing_times: list,
                     raw_response: dict,
                     request_error: str = None) -> dict:
    """
    生成 gap_report。不直接写库,由调用方决定存储方式。
    status 判断基于覆盖验证结果,而非仅比数量。
    """
    raw_id = hashlib.sha256(
        json.dumps(raw_response, sort_keys=True, ensure_ascii=False, default=str).encode()
    ).hexdigest()[:16]

    recovered_count = len(recovered_klines) if recovered_klines else 0

    # 覆盖验证
    coverage = verify_gap_coverage(missing_times, recovered_klines)
    still_missing_count = len(coverage["still_missing"])

    # 判断状态
    if request_error:
        status = "failed"
    elif recovered_count == 0:
        status = "empty"
    elif coverage["coverage_complete"]:
        status = "full"
    elif recovered_count > 0 and still_missing_count > 0:
        status = "partial"
    else:
        status = "partial"

    note = f"expected {expected_count}, recovered {recovered_count}, still missing {still_missing_count}"
    if status == "failed":
        note = request_error
    elif status == "empty":
        note += ", check if gap window is outside available history range"

    return {
        "symbol": symbol,
        "interval": interval,
        "gap_start": gap_start.isoformat(),
        "gap_end": gap_end.isoformat(),
        "expected_count": expected_count,
        "recovered_count": recovered_count,
        "still_missing_count": still_missing_count,
        "raw_snapshot_id": raw_id,
        "status": status,
        "note": note,
        "reported_at": datetime.now(timezone.utc).isoformat()
    }

empty 不等于 unrecoverable REST 返回空且参数权限无误,同时确认该时段在数据源历史覆盖范围内,才标 unrecoverable。如果无法确认,先标 empty 等待人工判断。不要把 unrecoverable 标成 partial——下游会一直等一个永远不会来的回补。

七、6 个失败分支

#失败场景处理方式
REST 回补返回空 data先标 empty,确认参数和权限后,若不在历史覆盖范围则标 unrecoverable
回补条数够了但 recovered_times 未覆盖 missing_timesstatus=partial,标注仍缺失的具体时间点
交易日历误判导致 expected 不准确note 标注“交易日历可能不准确”,不强制补数
interval 不一致阻断,修正参数后重新拉取
同一缺口被重复回补回补前用 gap_start+end+symbol+interval 去重查询
raw_snapshot 未保存status=failednote 标注“缺失原始快照”,该记录不可复查

八、TickDB 的工程边界

上面这套方案是通用的,不绑定任何特定数据源。如果你用 TickDB 做行情接入,它在断流回补中的角色:

  • WebSocket:负责 ticker/depth/trade 的持续推送。鉴权使用 api_key query 参数。
  • REST K 线:负责断流后按缺口窗口独立查询历史 K 线。鉴权使用 X-API-Key Header。
  • MCP:供 AI 工具按需查询,不适合放进自动化断流回补链路。

三者各司其职。WebSocket 不负责历史回补,REST K 线不是实时推送通道,MCP 不替代监控链路。所有端点、字段路径和 timestamp 口径,以 TickDB 官方文档和实际测试为准。

九、断流回补的最小检查清单

  • [ ] 重连后,有没有根据 interval 和交易日历生成 expected_times?
  • [ ] actual_times 来源是否明确(本地聚合 bar_time 或业务侧有效行情时间轴)?
  • [ ] 缺口回补是否用 REST K 线独立查询,而不是靠 WebSocket 重新推送?
  • [ ] 每次回补是否生成 gap_report,包含 expected_count 和 recovered_count?
  • [ ] 是否执行了 verify_gap_coverage,比较 recovered_times 和 missing_times?
  • [ ] status=partial 是否标注了仍缺失的具体时间点?
  • [ ] status=empty 是否已确认是否为 unrecoverable
  • [ ] 回补前是否做了去重查询,防止重复写入?
  • [ ] 原始响应是否保存了 raw_snapshot_id?
  • [ ] 全部回补完成后是否检查了 status 分布?

你们现在监控 WebSocket 断流,是只看连接状态,还是已经把缺口检测和回补留痕做进了系统?遇到过最隐蔽的一次数据缺失,最后是怎么定位到的?

📡 本文以 TickDB 作为行情接入示例。文中代码为 Python 教学骨架,不依赖任何特定数据源的端点或字段。本文仅讨论断流回补的工程方法,不构成投资建议。

标签: Python, WebSocket, K线, 数据回补, 工程实践, TickDB

通过 TickDB API 获取实时行情数据

一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。

免费领取 API Key查看 API 文档

相关文章