系列文章第 7 篇:TickDB WebSocket 入门:订阅、心跳与断线后不能想当然的事
作者: TickDB Research · 发布: 2026/6/3 · 阅读: 2
标签: S02, 知乎, WebSocket 订阅
这是 TickDB 实时行情接入系列的第 7 篇。前一篇解决“AI Agent 查询行情”,本文解决“WebSocket 订阅”。
做持续行情接入时,我最早踩过的判断误区是:只要 WebSocket 已连接、控制台也打印了几条消息,就把链路视为完成。
实际上,连接只是起点。一个最小但合格的验证流程,至少需要回答四个问题:
- 客户端是否按正确结构发出了订阅。
- 客户端是否维持了服务端约定的心跳交互。
- 推送消息是否按真实嵌套结构解析,而不是凭印象读字段。
- 连接断开再建连后,程序是否诚实承认中间可能存在数据空档。
本文用 TickDB WebSocket 做一条最小验证链路:只订阅一个 ticker 品种,只验证订阅、心跳、读取一次推送与断线边界。不做告警系统,不讨论部署和交易动作。
先给结论
TickDB 的持续行情订阅应单独处理连接、订阅、心跳和断线恢复。收到推送可以证明当前连接下读到了消息;连接断开后重新订阅,只能说明程序恢复了继续接收的能力,不能直接证明断线期间的数据已被自动补齐。
从一次查询到持续订阅,任务变化在哪里
REST 快照适合回答“现在请求一次,返回了什么”。WebSocket 订阅面对的是另一组工程问题:连接会持续存在,客户端要发送订阅、保持心跳、解析异步消息,也要接受网络连接可能中断的现实。
因此,本篇不重复产品入口介绍,也不把 WebSocket 包装成一个完整监控系统。这里只关心最短的持续接收闭环:
连接 -> 订阅 -> 心跳 -> 收到推送 -> 识别断开 -> 重连并重新订阅
|
+-> 不宣称断线区间已补齐
连接生命周期:每一阶段实际要验证什么
| 阶段 | 客户端动作 | 可观察信号 | 到这一步仍不能推出什么 |
|---|---|---|---|
| 准备连接 | 读取本地 API Key,构造 WebSocket URL | 准备发起连接 | Key 有效、订阅一定成功 |
| 建立连接 | 连接 wss://api.tickdb.ai/v1/realtime?api_key=... | socket 建立或抛出连接异常 | 已开始接收目标品种数据 |
| 发送订阅 | 发送 subscribe,指定 ticker 与 symbol | 订阅命令已从客户端发出 | 指定观察窗口内必有行情更新 |
| 维持心跳 | 每秒发送 {"cmd":"ping"} | 收到 {"cmd":"pong"} | 行情消息没有遗漏 |
| 读取推送 | 从外层 cmd 与内层 data 解析消息 | data.symbol、data.last_price 可读取 | 推送具备连续性、可用于交易判断 |
| 检测断开 | 捕获关闭、异常或本地观察超时 | 明确记录连接状态变化 | 服务端会替客户端保存断线区间 |
| 恢复接收 | 新建连接并重新订阅 | 恢复后继续读取新消息 | 断线期间历史推送自动回放或完整补齐 |
这里最重要的一行是最后一行:恢复接收不是历史补偿的同义词。客户端可以把恢复做得明确、可观测,但不能在未获得明确补齐机制证据时,把中断区间写成“无缺口”。
最小可复现示例:连接、订阅、心跳、读一条推送
运行前提
- 取得当前有效的 TickDB API Key,并只存入本地环境变量。
- 使用 Python 3.10 或更高版本。
- 安装固定范围的 WebSocket 客户端依赖:
python -m pip install 'websockets>=12,<13'
- 本例默认订阅
AAPL.US的ticker更新。若观察窗口内没有收到行情消息,这只说明本次观察期内未拿到目标更新,不应直接判定产品能力或数据连续性。
环境变量
export TICKDB_API_KEY='<your-api-key>'
export TICKDB_SYMBOL='AAPL.US'
export TICKDB_OBSERVE_SECONDS='30'
TICKDB_OBSERVE_SECONDS 仅是本地验证脚本等待一条目标消息的观察窗口,不代表接口性能或服务承诺。
验证脚本
import asyncio
import contextlib
import json
import os
from urllib.parse import quote
import websockets
SYMBOL = os.environ.get("TICKDB_SYMBOL", "AAPL.US")
OBSERVE_SECONDS = float(os.environ.get("TICKDB_OBSERVE_SECONDS", "30"))
async def send_heartbeat(ws):
while True:
await ws.send(json.dumps({"cmd": "ping"}))
await asyncio.sleep(1)
async def verify_once():
api_key = os.environ.get("TICKDB_API_KEY")
if not api_key:
print("[error] set TICKDB_API_KEY in the local environment first")
return
url = (
"wss://api.tickdb.ai/v1/realtime?api_key="
+ quote(api_key, safe="")
)
subscription = {
"cmd": "subscribe",
"data": {"channel": "ticker", "symbols": [SYMBOL]},
}
try:
async with websockets.connect(url) as ws:
await ws.send(json.dumps(subscription))
print(f"[sent] subscribe channel=ticker symbol={SYMBOL}")
heartbeat_task = asyncio.create_task(send_heartbeat(ws))
got_pong = False
deadline = asyncio.get_running_loop().time() + OBSERVE_SECONDS
try:
while True:
remaining = deadline - asyncio.get_running_loop().time()
if remaining <= 0:
print("[observe] no ticker message in local observation window")
return
raw = await asyncio.wait_for(ws.recv(), timeout=remaining)
message = json.loads(raw)
if message.get("cmd") == "pong":
if not got_pong:
print("[recv] pong")
got_pong = True
continue
data = message.get("data")
if (
isinstance(data, dict)
and "symbol" in data
and "last_price" in data
):
redacted = {
"symbol": data["symbol"],
"last_price": "<redacted>",
}
print("[recv]", message.get("cmd"), json.dumps(redacted))
return
print("[recv] other message; inspect cmd/data before extending parser")
finally:
heartbeat_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await heartbeat_task
except (OSError, websockets.WebSocketException) as exc:
# Do not print an exception string that might include a URL carrying the key.
print("[error] websocket connection failed:", type(exc).__name__)
if __name__ == "__main__":
asyncio.run(verify_once())
运行:
python tickdb_ws_verify.py
脱敏输出样式
以下输出用于展示按已核验消息结构脱敏后的日志样式,价格值已遮盖;它不是本文写作线程新执行的一次在线行情记录。正式发布门禁如要求实测凭证,应使用有效 Key 运行上面的脚本并保存去敏后的输出。
[sent] subscribe channel=ticker symbol=AAPL.US
[recv] pong
[recv] ticker {"symbol": "AAPL.US", "last_price": "<redacted>"}
这段示例特意做了三件小事:
- API Key 只从环境变量读取,日志不打印带 Key 的连接地址。
- 订阅明确写成
{"cmd": "subscribe", "data": {"channel": "ticker", "symbols": [...]}}。 - 解析行情时先读外层消息,再从
message["data"]读取字段;没有把推送误写成扁平 JSON。
心跳是活性检查,不是数据补齐证明
在本文使用的 WebSocket 约定中,客户端每秒发送:
{"cmd": "ping"}
服务端返回:
{"cmd": "pong"}
这让客户端能够看到连接仍在发生心跳往返,并在异常时更快转入处理分支。但心跳的含义需要说准:
| 观察到的事情 | 可以怎样解释 | 不应怎样解释 |
|---|---|---|
收到 pong | 本次心跳交互得到了响应 | 所有行情更新都已收到 |
收到一条 ticker 推送 | 当前连接中读到一条符合解析结构的更新 | 从启动到现在没有任何缺口 |
| socket 关闭或抛出异常 | 客户端应记录中断并进入恢复流程 | 中断区间可忽略 |
| 重连后重新收到消息 | 客户端恢复继续接收新消息 | 历史缺失消息已自动回放 |
把心跳写清楚,程序的日志才有审计价值。否则,“有 pong”很容易被误读为“有完整行情”,而这两个判断不是一回事。
断线后如何处理:先标记空档,再恢复继续接收
一个克制的恢复流程可以是:
- 记录断线发生的本地时间、当前订阅集合和异常类型,不在日志中暴露 Key。
- 重新建立 WebSocket 连接。
- 重新发送当前程序需要的订阅命令。
- 将“恢复后第一条成功读取的消息”记为恢复接收起点。
- 对中断期间是否需要另行校准状态,按业务需求和已核验的数据入口单独设计;不要在 WebSocket 重连逻辑中假定已经补齐。
本文故意不继续展开状态校准代码。那会进入另一个需要单独核验的接口主题,也会让一篇讲 WebSocket 边界的文章变成完整监控实现。
能保证 / 不能保证:发布前最值得保留的一张表
| 在本文最小验证范围内可以确认 | 本文不能保证,也不应宣称 |
|---|---|
| 客户端可按所示 URL 形式发起 WebSocket 连接 | 连接永久不断开 |
客户端可发送 ticker 订阅命令 | 观察窗口内必然产生目标品种更新 |
客户端可发送 ping 并识别 pong | 心跳等于行情无遗漏 |
客户端可按 cmd + data 嵌套结构编写解析逻辑 | 所有业务所需字段、市场或频道都已被本文验证 |
| 客户端可发现异常并重新连接、重新订阅 | 断线期间消息自动补齐、自动回放或天然无空档 |
| 本例可作为最小链路验证脚本 | 完整监控部署、通知到达、交易动作或业务结果 |
排错清单:没有得到预期结果时先看这里
| 现象 | 优先检查项 | 处理方式 |
|---|---|---|
| 一开始就无法连接 | URL 是否为 /v1/realtime?api_key=...;环境变量是否实际存在 | 不打印完整 URL;修正 Key 传入方式后重试 |
| socket 已连接但没有目标推送 | 是否发送了 subscribe;频道是否为 ticker;symbol 是否按本文示例填写 | 先保留原始去敏日志,不凭“无消息”外推结论 |
有消息但代码读不到 symbol | 是否误写成 message["symbol"] | 改为检查外层 cmd,再从 message["data"] 读取 |
日志持续看到 pong,却没有行情消息 | 是否把心跳当作行情;本地观察窗口是否过短或目标暂无更新 | 区分活性响应与数据推送,记录观察结果 |
| 连接中断后脚本又有消息了 | 是否把重新接收写成“断线无损” | 记录中断区间和恢复起点,不宣称历史补齐 |
| 想继续加入更多频道或字段 | 是否已有当前文档或实测支持 | 先按对应能力单独核验,再扩展解析逻辑 |
我为什么把示例停在这里
从这里继续写,很容易滑向“多品种告警”“通知机器人”“守护进程部署”或者“交易触发”。它们各自都有价值,但不是本文的问题。
对准备接入持续行情的开发者,第一步更应该是把基础链路写对:明确连接方式,发送真实订阅,按真实结构解析消息,在心跳和断线处留下不会误导自己的日志。做到这些之后,后续应用才有可靠的起点。
系列导航
| ID | 主题 | 与本文的关系 |
|---|---|---|
S00 | TickDB 产品总入口与首次验证路线图 | 系列总入口:先选择 REST、WebSocket 或工具调用路径 |
S01 | REST 完成一次行情查询 | 单次查询的最小验证路线 |
S02 | WebSocket 订阅、心跳与恢复边界 | 本文:持续接收链路与恢复误区 |
S03 | REST / WebSocket / MCP / Skill / CLI 怎样选择 | 多入口决策表 |
S05 | symbol、字段与 timestamp 的数据语义 | 后续数据语义专题 |
S06 | 个人开发者的只读监控工作流 | 后续场景专题,不在本文展开 |
S08 | 鉴权、空数据、时间单位和接入 FAQ | 后续集中排错入口 |
参考入口
- TickDB WebSocket 快速开始:
- TickDB WebSocket 频道与消息:
本文展示的是行情数据接入与连接恢复边界,不构成投资建议,也不对数据补齐、通知送达或业务结果作出保证。
通过 TickDB API 获取实时行情数据
一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。
免费领取 API Key查看 API 文档