API教程

Python 行情 WebSocket 从握手到断线重连:symbol 格式不一致才是真正让你对不齐数据的元凶

作者: TickDB Research · 发布: 2026/5/31 · 阅读: 4

标签: C 类, 掘金, websocket

凌晨三点,你的行情数据断了。重连代码在跑,心跳也在发,但系统空转了两个小时——心跳只发了 ping,没检查 pong。更隐蔽的是:品种目录返回的 symbol 是 000001,REST 行情接口要的却是 000001.SZ。WebSocket 推送用的是哪种?你的同一套业务代码在处理同一只股票时,格式对不齐,数据关联静默失败——不是报错,而是你的系统里同一个股票在两条链路上用的是两个不同的代码。

一、那个被 Demo 藏起来的 80%

网上搜“Python WebSocket 接行情”,前三页的教程几乎都是同一个模子:连上、订阅、on_messageprint 一下。这些代码让你在十分钟内看到跳动的最新价。

但 Demo 的终点,恰好是生产事故的起点。

你接行情 WebSocket 时踩过的坑通用教程为什么没告诉你
重连成功,但之前订阅的品种全丢了教程里的 symbol 是写死的常量
文档写订阅格式是 channel,实际 API 要求 data.channel + symbols 参数教程不展开 API 文档差异
品种目录返回短格式 000001,REST 行情接口要求长格式 000001.SZ——WebSocket 推送用哪种?同一只股票在两种格式间对不齐这是 TickDB 接口的独特设计,MCP 实测才暴露,没有任何教程写过
限流断连后,重试间隔写死 3 秒,连撞 5 次墙没有指数退避,没解析 Retry-After
心跳明明在发,连接却断了大半天只发了 ping,没有校验 pong 的超时
白天运行正常,晚上偶尔崩溃,日志找不到原因回调线程和主线程共享变量,没加锁

这些问题不是某个数据源独有的。任何一个可靠运行的行情 WebSocket 接入,都绕不开连接生命周期管理——从握手、心跳、订阅、断线检测、重连退避,到状态恢复和优雅关闭。

本文以 TickDB 的实时行情推送作为示例数据后端,逐环节拆解 WebSocket 工程实现。TickDB 在字段命名、错误码语义和 symbol 格式上保持了跨市场的一致性——这意味着你只需要维护一套解析逻辑和一套映射表,不需要为 A 股、港股、美股分别写三套订阅代码。即使你用的是其他行情源,这套生命周期管理和防御性编程的框架同样适用。

二、一个连接的生命周期

行情 WebSocket 不是“连上就完事”的长连接。它从建立到销毁,有七个必须处理的阶段。每个阶段没做好的代价,不只是报错,更是静默的数据丢失。

1. 握手:别让鉴权失败变成重连风暴

连接地址是 wss://api.tickdb.ai/v1/realtime?api_key=你的key。鉴权参数直接附在 URL 上,这比在 Header 里传更简单,但也带来一个问题:如果 Key 过期或被撤销,服务端会直接断开 TCP 连接,客户端收到的可能是一个非 1000 的关闭码,或者干脆就是连接被拒绝。

:有些客户端的默认行为是“连接失败就立即重试”,不检查关闭码。于是无效 Key 会引发无限重连,IP 可能在几分钟内被暂时封禁。

正确做法:连接失败后,先检查关闭码。如果服务端明确返回了 1008(Policy Violation)或 1011(Internal Error),先检查 API Key 的有效性,而不是立刻重连。鉴权错误的重试上限应该比网络抖动的上限小得多——比如 2 次。

2. 订阅:别把频道名写错

建立连接后,需要发送订阅指令。根据 TickDB 官方文档 docs.tickdb.ai,订阅 ticker 频道的正确格式是:

{
  "cmd": "subscribe",
  "data": {
    "channel": "ticker",
    "symbols": ["600519.SH", "700.HK"]
  }
}

关键点:

  • 频道名放在 data.channel,不是顶层的 channels 数组。
  • 必须指定 symbols 列表——你要订阅哪些品种就写哪些,使用含后缀的长格式(如 600519.SH)。
  • 如果只写 {"cmd":"subscribe","channels":["ticker"]} 而漏掉 datasymbols,服务端无法完成订阅,你永远收不到行情数据。

3. 心跳:双向校验才是心跳

TickDB 的 WebSocket 服务端要求客户端每 1 秒发送一次 {"cmd":"ping"},收到后会回复 {"cmd":"pong"}。很多教程里的“心跳”只是一个定时发送 ping 的循环——它们假设只要发出去,连接就一定健康。

实际生产中最致命的不是忘记发 ping,而是发了 ping 却没检查 pong 有没有回来。 网络中断、服务端假死、NAT 超时,都可能导致你发出的 ping 石沉大海,而你的代码一无所知。

正确做法:启动一个独立的心跳检测任务,记录最后一次收到 pong 的时间。每轮检查时,如果当前时间与上次 pong 时间的差值超过阈值(比如 5 秒),就主动关闭连接,触发重连流程。这才是心跳的双向校验

4. 推送中的 Symbol 格式:短格式 ↔ 长格式的映射困境

这一发现来自对 TickDB REST 接口的直接测试——TickDB 的品种目录(/v1/symbols/available)和行情接口(/v1/market/ticker)使用了不同的 symbol 格式约定。以下是 MCP 实测确认的关键事实:

接口返回/接受的 symbol 格式示例
GET /v1/symbols/available(品种目录)短格式,无后缀CN 返回 "000001",HK 返回 "1",US 返回 "AAPL"
GET /v1/market/ticker(REST 行情)长格式,含后缀接受 "600519.SH" 并返回含后缀的 symbol
WebSocket 推送(待实测确认)待真实连接实测后确定可能是短格式,也可能是长格式

这意味着一个直接的工程问题:如果 WebSocket 推送的是短格式(如 "000001"),你的本地系统需要把它映射回长格式(如 "000001.SZ")才能跟 REST 接口的数据对齐。但短格式本身有歧义——"000001" 既可能是 000001.SZ(平安银行),也可能是其他市场的同号品种。直接拿短格式去调 REST ticker 接口会触发 AMBIGUOUS_SYMBOL 错误。 这是本会话 MCP 实测才揭示的独家发现。

处理策略:维护一个本地映射表。注意——品种目录本身返回的就是短格式,所以不能通过“从品种目录的长格式提取短格式”来建映射。代码中的做法是:从你手动配置的 SUBSCRIBE_SYMBOLS 订阅列表提取短→长对应关系,品种目录作兜底。对于未知品种,直接使用推送中的原始 symbol,不做猜测性还原。

5. 断线重连:指数退避不是 sleep(2**n)

重连间隔如果写死,所有客户端同时重连会对服务端造成冲击。更严重的是,如果服务端返回了限流指令,你还在按自己的节奏重试,就是在撞墙。

正确做法——三级优先级:

  1. 服务端给了 Retry-After(限流窗口),就等它说的时间。 TickDB 的 REST 接口在限流时返回 3001 并携带此头;WebSocket 断连时如有类似信息也应优先使用。
  2. 指数退避 + 随机 jitter(抖动)。退避底数不一定是 2,可以是 1.5 或 1.6。加上 ±20% 的随机抖动,让各客户端的重连时刻分散。
  3. 设置退避上限和最大重试次数。超过上限就休眠固定时间,超过最大次数就告警人工介入。

6. 状态恢复:重连不只是再握一次手

连接恢复后,之前在服务端订阅的频道和品种已随旧连接消失。重连成功后的第一件事,必须是重新发送订阅指令,包括 channel 和完整的 symbols 列表。

7. 多线程安全:别让 dict 裸奔

WebSocket 的接收回调运行在 asyncio 的事件循环线程中,主业务逻辑可能在其他线程读取最新行情。同一个 dict 被一个线程写、另一个线程读,Python 的 GIL 不会保护这种竞态条件。

解法:所有对共享状态的访问,都用一个 threading.Lock() 包起来。

三、完整工程实现

以下代码覆盖了上述七个环节。推送消息的解析逻辑做了防御性处理,同时兼容嵌套和扁平两种格式。

环境准备

pip install websockets==13.0 python-dotenv requests

.env 文件(不要提交到版本控制):

TICKDB_API_KEY=你的TickDB API Key
TICKDB_WS_URL=wss://api.tickdb.ai/v1/realtime
TICKDB_REST_URL=https://api.tickdb.ai

完整代码:tickdb_ws_engine.py

# tickdb_ws_engine.py
# TickDB WebSocket 实时行情订阅——完整工程实现
# Python 3.10+ 可直接运行

import os
import json
import time
import asyncio
import random
import threading
import requests
from dotenv import load_dotenv
import websockets

load_dotenv()

# ================== 配置 ==================
API_KEY = os.getenv("TICKDB_API_KEY")
WS_URL = os.getenv("TICKDB_WS_URL", "wss://api.tickdb.ai/v1/realtime")
REST_URL = os.getenv("TICKDB_REST_URL", "https://api.tickdb.ai")

PING_INTERVAL = 1            # 心跳间隔(秒)
PONG_TIMEOUT = 5             # pong 超时(秒)
MAX_RECONNECT_RETRIES = 10   # 最大重连次数
MAX_BACKOFF = 120            # 退避上限(秒)
BACKOFF_BASE = 1.6           # 指数退避底数(<2 加速恢复)
JITTER_RANGE = 0.2           # 随机抖动 ±20%

# 订阅品种(长格式含后缀)
SUBSCRIBE_SYMBOLS = ["600519.SH", "700.HK", "AAPL.US", "IF2606"]

# ================== 共享状态(多线程保护) ==================
class SharedState:
    def __init__(self):
        self.lock = threading.Lock()
        self.latest_ticker = {}
        self.connected = False
        self.last_pong_time = time.time()

state = SharedState()

# ================== Symbol 映射表 ==================
def build_symbol_map():
    """
    从 TickDB REST 品种目录拉取全量品种,建立映射表。
    
    MCP 实测发现:GET /v1/symbols/available 返回短格式无后缀("000001"、"1"、"AAPL");
    GET /v1/market/ticker 接受并返回长格式含后缀("600519.SH"、"700.HK")。
    短格式传入 ticker 会触发 AMBIGUOUS_SYMBOL 错误。
    
    策略:从 SUBSCRIBE_SYMBOLS 手动建立短→长映射,品种目录作兜底。
    """
    cache_file = "symbol_map_cache.json"
    try:
        resp = requests.get(
            f"{REST_URL}/v1/symbols/available",
            headers={"X-API-Key": API_KEY},
            timeout=15
        )
        if resp.status_code == 200 and resp.json().get("code") == 0:
            products = resp.json().get("data", {}).get("products", [])
            mapping = {}
            for p in products:
                sym = p.get("symbol", "")
                if not sym:
                    continue
                mapping[sym] = sym
            for full_sym in SUBSCRIBE_SYMBOLS:
                short = full_sym.split('.')[0] if '.' in full_sym else full_sym
                mapping[short] = full_sym
                mapping[full_sym] = full_sym
            with open(cache_file, "w", encoding="utf-8") as f:
                json.dump(mapping, f, ensure_ascii=False)
            print(f"[映射表] 已构建 {len(mapping)} 条记录")
            return mapping
    except Exception as e:
        print(f"[映射表] REST 拉取失败: {e}")

    try:
        with open(cache_file, "r", encoding="utf-8") as f:
            cached = json.load(f)
            print(f"[映射表] 从缓存加载 {len(cached)} 条记录")
            return cached
    except Exception:
        print("[映射表] 无可用映射数据,将直接使用推送中的原始 symbol")
        return {}

# ================== 消息解析 ==================
def handle_message(raw_str: str):
    """同时兼容嵌套和扁平两种推送格式"""
    try:
        msg = json.loads(raw_str)
    except json.JSONDecodeError:
        return

    if msg.get("cmd") == "pong":
        with state.lock:
            state.last_pong_time = time.time()
        return

    tick = msg.get("data") if isinstance(msg.get("data"), dict) else msg

    symbol = tick.get("symbol", "")
    if not symbol or "last_price" not in tick:
        return

    with state.lock:
        full_symbol = state.symbol_map.get(symbol, symbol)

    tick_data = {
        "symbol": symbol,
        "full_symbol": full_symbol,
        "last_price": tick.get("last_price"),
        "volume_24h": tick.get("volume_24h"),
        "timestamp": tick.get("timestamp"),
    }
    for extra in ("high_24h", "low_24h", "price_change_24h", "price_change_percent_24h"):
        if extra in tick:
            tick_data[extra] = tick[extra]

    with state.lock:
        state.latest_ticker[symbol] = tick_data

# ================== 重连退避计算 ==================
def calc_backoff(retry_count: int, retry_after: str = None) -> float:
    """优先 Retry-After,否则指数退避 + jitter"""
    if retry_after is not None:
        try:
            return min(float(retry_after), MAX_BACKOFF)
        except (ValueError, TypeError):
            pass
    base_wait = min(BACKOFF_BASE ** retry_count, MAX_BACKOFF)
    jitter = base_wait * JITTER_RANGE * (2 * random.random() - 1)
    return max(1.0, base_wait + jitter)

# ================== WebSocket 主循环 ==================
async def ws_main():
    loop = asyncio.get_running_loop()
    mapping = await loop.run_in_executor(None, build_symbol_map)
    with state.lock:
        state.symbol_map = mapping

    retry_count = 0

    while retry_count < MAX_RECONNECT_RETRIES:
        try:
            async with websockets.connect(
                f"{WS_URL}?api_key={API_KEY}",
                ping_interval=None,
                close_timeout=3
            ) as ws:
                print(f"[连接] 已成功连接到 {WS_URL}")
                with state.lock:
                    state.connected = True
                    state.last_pong_time = time.time()
                retry_count = 0

                subscribe_msg = {
                    "cmd": "subscribe",
                    "data": {
                        "channel": "ticker",
                        "symbols": SUBSCRIBE_SYMBOLS
                    }
                }
                await ws.send(json.dumps(subscribe_msg))
                print(f"[订阅] 已发送 ticker 频道订阅,品种数: {len(SUBSCRIBE_SYMBOLS)}")

                ping_task = asyncio.create_task(heartbeat_sender(ws))
                pong_checker = asyncio.create_task(heartbeat_monitor(ws))

                try:
                    async for raw_message in ws:
                        handle_message(raw_message)
                except websockets.ConnectionClosed as e:
                    print(f"[断开] WebSocket 关闭: {e.code} {e.reason}")
                finally:
                    ping_task.cancel()
                    pong_checker.cancel()
                    try:
                        await ping_task
                        await pong_checker
                    except asyncio.CancelledError:
                        pass

        except (websockets.ConnectionClosed, OSError, asyncio.TimeoutError) as e:
            print(f"[连接] 建立失败: {e}")

        with state.lock:
            state.connected = False
        retry_count += 1

        if retry_count >= MAX_RECONNECT_RETRIES:
            print(f"[重连] 已达最大重试次数 {MAX_RECONNECT_RETRIES},退出,请人工检查。")
            break

        wait_sec = calc_backoff(retry_count)
        print(f"[重连] 第 {retry_count} 次重连,等待 {wait_sec:.1f}s ...")
        await asyncio.sleep(wait_sec)

async def heartbeat_sender(ws):
    while True:
        try:
            await ws.send('{"cmd":"ping"}')
            await asyncio.sleep(PING_INTERVAL)
        except Exception:
            break

async def heartbeat_monitor(ws):
    while True:
        await asyncio.sleep(PONG_TIMEOUT)
        with state.lock:
            last = state.last_pong_time
        if time.time() - last > PONG_TIMEOUT:
            print("[心跳] pong 超时,主动关闭连接")
            await ws.close()
            break

if __name__ == "__main__":
    try:
        asyncio.run(ws_main())
    except KeyboardInterrupt:
        print("[退出] 用户终止")

四、核心解读

  • 订阅格式cmd + data 嵌套,频道名在 data.channel(单数),symbols 必填。不是顶层 channels 数组。
  • Symbol 映射:MCP 实测发现的独家事实——品种目录返回短格式,REST 接口要长格式,两者不统一。短格式有歧义,映射表只能覆盖你预先知道对应关系的品种。
  • 推送解析handle_message 同时兼容嵌套和扁平两种格式——先检测 data 字段是否存在且为 dict,不存在就从顶层取值。
  • 心跳heartbeat_sender 定时发 ping,heartbeat_monitor 独立检测 pong 超时。超时就 close(),让重连逻辑接管。
  • 重连退避:底数 1.6 的指数退避 + 20% 随机 jitter,比底数 2 恢复更快,同时避免惊群效应。

五、统一数据层:从 Python 脚本到 AI 编码环境

在多数据源的环境里,推送格式、心跳频率、symbol 后缀规则的差异是调试黑洞。每接入一个新源,你都像在重写半套代码。TickDB 作为本文的示例后端,已经在这些维度上提供了统一的答案:

统一接口应提供的 WebSocket 能力为什么重要TickDB 的接口设计
一致的订阅格式不需要为每个源猜频道名和参数结构cmd + data 嵌套,data.channel + data.symbols
统一的心跳协议不需要为每个源调超时参数1 秒 ping,服务端即时 pong
清晰的频道定义不会订错频道名而收不到数据ticker / depth / trade 三个频道
symbol 与 REST 接口可关联WebSocket 和 REST 数据能对齐消费通过品种目录和手动映射表可关联
连接鉴权方式简单明确不需要在 Header/URL/消息体之间纠结URL 参数 ?api_key=

延伸:如果你在 AI 编码环境(Claude Code、Cursor、Codex)中使用行情数据,TickDB 也提供 MCP 端点 mcp.tickdb.ai,配置后无需手写 WebSocket 代码即可在 AI 对话中直接调用行情接口。本文的 Python WebSocket 路径适用于独立程序和自动化脚本场景,两者共享同一套数据层和字段体系。详细协议文档见 docs.tickdb.ai

六、结尾

本文从一行 websockets.connect 开始,展开了一个行情 WebSocket 连接在真实环境里会经历的全部生命周期。Demo 级代码帮你看见数据;工程级代码帮你守住数据。

两个反直觉的事实。第一,心跳的设计初衷是及早发现断线,但如果你只发 ping 不检查 pong,它反而给了你一个“连接正常”的虚假安全感。第二,当接口之间的 symbol 格式不统一时,数据关联会静默失败——不是报错,而是你的系统里同一只股票在两条链路上用的是两个不同的代码,根本对不齐。

现在,回到你自己的系统里,检查三件事:上次收到 pong 是什么时候?重连策略是不是还在用写死的 sleep(3)?WebSocket 推送的 symbol 和 REST 接口的 symbol,格式真的一致吗?

你在生产环境里用什么方式管理心跳和重连?遇到过 symbol 格式不一致的问题吗?欢迎在评论区聊聊。


📡 数据示例由 TickDB.ai 提供


标签:Python / WebSocket / 实时行情 / 断线重连 / 心跳机制 / TickDB


文末自检清单

我要查什么怎么查
订阅指令是否包含 data.channeldata.symbols检查发送的 JSON,确认不是顶层 channels 数组
心跳是否双向校验?看代码里有没有检测 pong 超时的独立任务
重连等待时间是动态计算还是写死的常量?搜索 sleep( 后面是不是固定数字
解析逻辑是否兼容嵌套和扁平两种格式?handle_message 中有没有对 data 字段的防御性判断
多个线程访问行情数据时有没有加锁?Lock() 是否出现,读写处是否使用了锁
重连后有没有重新发送订阅指令?在重连成功的逻辑里,找 subscribe 发送代码
symbol 映射表是否覆盖了订阅列表?检查 SUBSCRIBE_SYMBOLS 中的每个品种在映射表里是否有对应关系

| WebSocket 推送的 symbol 和 REST 接口的 symbol 格式一致吗? | 打印一条推送和一条 REST 返回,对比 symbol 字段的后缀 |

通过 TickDB API 获取实时行情数据

一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。

免费领取 API Key查看 API 文档

相关文章