综合

系列文章第 7 篇:TickDB WebSocket 入门:订阅、心跳与断线后不能想当然的事

作者: TickDB Research · 发布: 2026/6/3 · 阅读: 2

标签: S02, 知乎, WebSocket 订阅

这是 TickDB 实时行情接入系列的第 7 篇。前一篇解决“AI Agent 查询行情”,本文解决“WebSocket 订阅”。

做持续行情接入时,我最早踩过的判断误区是:只要 WebSocket 已连接、控制台也打印了几条消息,就把链路视为完成。

实际上,连接只是起点。一个最小但合格的验证流程,至少需要回答四个问题:

  1. 客户端是否按正确结构发出了订阅。
  2. 客户端是否维持了服务端约定的心跳交互。
  3. 推送消息是否按真实嵌套结构解析,而不是凭印象读字段。
  4. 连接断开再建连后,程序是否诚实承认中间可能存在数据空档。

本文用 TickDB WebSocket 做一条最小验证链路:只订阅一个 ticker 品种,只验证订阅、心跳、读取一次推送与断线边界。不做告警系统,不讨论部署和交易动作。

先给结论

TickDB 的持续行情订阅应单独处理连接、订阅、心跳和断线恢复。收到推送可以证明当前连接下读到了消息;连接断开后重新订阅,只能说明程序恢复了继续接收的能力,不能直接证明断线期间的数据已被自动补齐。

从一次查询到持续订阅,任务变化在哪里

REST 快照适合回答“现在请求一次,返回了什么”。WebSocket 订阅面对的是另一组工程问题:连接会持续存在,客户端要发送订阅、保持心跳、解析异步消息,也要接受网络连接可能中断的现实。

因此,本篇不重复产品入口介绍,也不把 WebSocket 包装成一个完整监控系统。这里只关心最短的持续接收闭环:

连接 -> 订阅 -> 心跳 -> 收到推送 -> 识别断开 -> 重连并重新订阅
                                              |
                                              +-> 不宣称断线区间已补齐

连接生命周期:每一阶段实际要验证什么

阶段客户端动作可观察信号到这一步仍不能推出什么
准备连接读取本地 API Key,构造 WebSocket URL准备发起连接Key 有效、订阅一定成功
建立连接连接 wss://api.tickdb.ai/v1/realtime?api_key=...socket 建立或抛出连接异常已开始接收目标品种数据
发送订阅发送 subscribe,指定 ticker 与 symbol订阅命令已从客户端发出指定观察窗口内必有行情更新
维持心跳每秒发送 {"cmd":"ping"}收到 {"cmd":"pong"}行情消息没有遗漏
读取推送从外层 cmd 与内层 data 解析消息data.symboldata.last_price 可读取推送具备连续性、可用于交易判断
检测断开捕获关闭、异常或本地观察超时明确记录连接状态变化服务端会替客户端保存断线区间
恢复接收新建连接并重新订阅恢复后继续读取新消息断线期间历史推送自动回放或完整补齐

这里最重要的一行是最后一行:恢复接收不是历史补偿的同义词。客户端可以把恢复做得明确、可观测,但不能在未获得明确补齐机制证据时,把中断区间写成“无缺口”。

最小可复现示例:连接、订阅、心跳、读一条推送

运行前提

  1. 取得当前有效的 TickDB API Key,并只存入本地环境变量。
  2. 使用 Python 3.10 或更高版本。
  3. 安装固定范围的 WebSocket 客户端依赖:
python -m pip install 'websockets>=12,<13'
  1. 本例默认订阅 AAPL.USticker 更新。若观察窗口内没有收到行情消息,这只说明本次观察期内未拿到目标更新,不应直接判定产品能力或数据连续性。

环境变量

export TICKDB_API_KEY='<your-api-key>'
export TICKDB_SYMBOL='AAPL.US'
export TICKDB_OBSERVE_SECONDS='30'

TICKDB_OBSERVE_SECONDS 仅是本地验证脚本等待一条目标消息的观察窗口,不代表接口性能或服务承诺。

验证脚本

import asyncio
import contextlib
import json
import os
from urllib.parse import quote

import websockets


SYMBOL = os.environ.get("TICKDB_SYMBOL", "AAPL.US")
OBSERVE_SECONDS = float(os.environ.get("TICKDB_OBSERVE_SECONDS", "30"))


async def send_heartbeat(ws):
    while True:
        await ws.send(json.dumps({"cmd": "ping"}))
        await asyncio.sleep(1)


async def verify_once():
    api_key = os.environ.get("TICKDB_API_KEY")
    if not api_key:
        print("[error] set TICKDB_API_KEY in the local environment first")
        return

    url = (
        "wss://api.tickdb.ai/v1/realtime?api_key="
        + quote(api_key, safe="")
    )
    subscription = {
        "cmd": "subscribe",
        "data": {"channel": "ticker", "symbols": [SYMBOL]},
    }

    try:
        async with websockets.connect(url) as ws:
            await ws.send(json.dumps(subscription))
            print(f"[sent] subscribe channel=ticker symbol={SYMBOL}")

            heartbeat_task = asyncio.create_task(send_heartbeat(ws))
            got_pong = False
            deadline = asyncio.get_running_loop().time() + OBSERVE_SECONDS

            try:
                while True:
                    remaining = deadline - asyncio.get_running_loop().time()
                    if remaining <= 0:
                        print("[observe] no ticker message in local observation window")
                        return

                    raw = await asyncio.wait_for(ws.recv(), timeout=remaining)
                    message = json.loads(raw)

                    if message.get("cmd") == "pong":
                        if not got_pong:
                            print("[recv] pong")
                            got_pong = True
                        continue

                    data = message.get("data")
                    if (
                        isinstance(data, dict)
                        and "symbol" in data
                        and "last_price" in data
                    ):
                        redacted = {
                            "symbol": data["symbol"],
                            "last_price": "<redacted>",
                        }
                        print("[recv]", message.get("cmd"), json.dumps(redacted))
                        return

                    print("[recv] other message; inspect cmd/data before extending parser")
            finally:
                heartbeat_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await heartbeat_task
    except (OSError, websockets.WebSocketException) as exc:
        # Do not print an exception string that might include a URL carrying the key.
        print("[error] websocket connection failed:", type(exc).__name__)


if __name__ == "__main__":
    asyncio.run(verify_once())

运行:

python tickdb_ws_verify.py

脱敏输出样式

以下输出用于展示按已核验消息结构脱敏后的日志样式,价格值已遮盖;它不是本文写作线程新执行的一次在线行情记录。正式发布门禁如要求实测凭证,应使用有效 Key 运行上面的脚本并保存去敏后的输出。

[sent] subscribe channel=ticker symbol=AAPL.US
[recv] pong
[recv] ticker {"symbol": "AAPL.US", "last_price": "<redacted>"}

这段示例特意做了三件小事:

  1. API Key 只从环境变量读取,日志不打印带 Key 的连接地址。
  2. 订阅明确写成 {"cmd": "subscribe", "data": {"channel": "ticker", "symbols": [...]}}
  3. 解析行情时先读外层消息,再从 message["data"] 读取字段;没有把推送误写成扁平 JSON。

心跳是活性检查,不是数据补齐证明

在本文使用的 WebSocket 约定中,客户端每秒发送:

{"cmd": "ping"}

服务端返回:

{"cmd": "pong"}

这让客户端能够看到连接仍在发生心跳往返,并在异常时更快转入处理分支。但心跳的含义需要说准:

观察到的事情可以怎样解释不应怎样解释
收到 pong本次心跳交互得到了响应所有行情更新都已收到
收到一条 ticker 推送当前连接中读到一条符合解析结构的更新从启动到现在没有任何缺口
socket 关闭或抛出异常客户端应记录中断并进入恢复流程中断区间可忽略
重连后重新收到消息客户端恢复继续接收新消息历史缺失消息已自动回放

把心跳写清楚,程序的日志才有审计价值。否则,“有 pong”很容易被误读为“有完整行情”,而这两个判断不是一回事。

断线后如何处理:先标记空档,再恢复继续接收

一个克制的恢复流程可以是:

  1. 记录断线发生的本地时间、当前订阅集合和异常类型,不在日志中暴露 Key。
  2. 重新建立 WebSocket 连接。
  3. 重新发送当前程序需要的订阅命令。
  4. 将“恢复后第一条成功读取的消息”记为恢复接收起点。
  5. 对中断期间是否需要另行校准状态,按业务需求和已核验的数据入口单独设计;不要在 WebSocket 重连逻辑中假定已经补齐。

本文故意不继续展开状态校准代码。那会进入另一个需要单独核验的接口主题,也会让一篇讲 WebSocket 边界的文章变成完整监控实现。

能保证 / 不能保证:发布前最值得保留的一张表

在本文最小验证范围内可以确认本文不能保证,也不应宣称
客户端可按所示 URL 形式发起 WebSocket 连接连接永久不断开
客户端可发送 ticker 订阅命令观察窗口内必然产生目标品种更新
客户端可发送 ping 并识别 pong心跳等于行情无遗漏
客户端可按 cmd + data 嵌套结构编写解析逻辑所有业务所需字段、市场或频道都已被本文验证
客户端可发现异常并重新连接、重新订阅断线期间消息自动补齐、自动回放或天然无空档
本例可作为最小链路验证脚本完整监控部署、通知到达、交易动作或业务结果

排错清单:没有得到预期结果时先看这里

现象优先检查项处理方式
一开始就无法连接URL 是否为 /v1/realtime?api_key=...;环境变量是否实际存在不打印完整 URL;修正 Key 传入方式后重试
socket 已连接但没有目标推送是否发送了 subscribe;频道是否为 ticker;symbol 是否按本文示例填写先保留原始去敏日志,不凭“无消息”外推结论
有消息但代码读不到 symbol是否误写成 message["symbol"]改为检查外层 cmd,再从 message["data"] 读取
日志持续看到 pong,却没有行情消息是否把心跳当作行情;本地观察窗口是否过短或目标暂无更新区分活性响应与数据推送,记录观察结果
连接中断后脚本又有消息了是否把重新接收写成“断线无损”记录中断区间和恢复起点,不宣称历史补齐
想继续加入更多频道或字段是否已有当前文档或实测支持先按对应能力单独核验,再扩展解析逻辑

我为什么把示例停在这里

从这里继续写,很容易滑向“多品种告警”“通知机器人”“守护进程部署”或者“交易触发”。它们各自都有价值,但不是本文的问题。

对准备接入持续行情的开发者,第一步更应该是把基础链路写对:明确连接方式,发送真实订阅,按真实结构解析消息,在心跳和断线处留下不会误导自己的日志。做到这些之后,后续应用才有可靠的起点。

系列导航

ID主题与本文的关系
S00TickDB 产品总入口与首次验证路线图系列总入口:先选择 REST、WebSocket 或工具调用路径
S01REST 完成一次行情查询单次查询的最小验证路线
S02WebSocket 订阅、心跳与恢复边界本文:持续接收链路与恢复误区
S03REST / WebSocket / MCP / Skill / CLI 怎样选择多入口决策表
S05symbol、字段与 timestamp 的数据语义后续数据语义专题
S06个人开发者的只读监控工作流后续场景专题,不在本文展开
S08鉴权、空数据、时间单位和接入 FAQ后续集中排错入口

参考入口

  • TickDB WebSocket 快速开始:
  • TickDB WebSocket 频道与消息:

本文展示的是行情数据接入与连接恢复边界,不构成投资建议,也不对数据补齐、通知送达或业务结果作出保证。


通过 TickDB API 获取实时行情数据

一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。

免费领取 API Key查看 API 文档

相关文章