API教程

Python + Redis 实时行情共享:WebSocket 数据流的订阅管理与断线恢复实践

作者: TickDB Research · 发布: 2026/4/21 · 阅读: 1

标签: C 类, 思否/掘金, 后端, REdis, websocket

▍阅读指南

  • 如果你只想要代码:直接跳转第四章,核心实现可复制运行。
  • 如果你想理解设计思路:从第二章开始,拆解 Redis 作为共享总线的工程考量。
  • 如果你关心生产级细节:第五章有踩坑记录与调优参数速查。

一、多策略共享行情的工程难题

1.1 各自订阅的三重代价

当一个量化系统运行多个策略时,最常见的做法是每个策略独立订阅 WebSocket:

# 策略 A
ws_a = await connect("wss://api.example.com/realtime")
await ws_a.send(subscribe(["AAPL.US", "TSLA.US"]))

# 策略 B
ws_b = await connect("wss://api.example.com/realtime")
await ws_b.send(subscribe(["AAPL.US", "700.HK"]))

这种写法在策略数量增加时会暴露三个问题:

问题具体表现量化后果
连接数膨胀10 个策略 × 20 标的 = 10 个 WebSocket 连接服务端订阅上限通常为单连接 50-100 个标的,多连接增加限频风险
断线恢复各自为战网络抖动时,10 个连接各自重连,重试策略不协调部分连接因限频被拒绝,策略数据不同步
跨进程无法共享策略部署在不同进程/容器时,每个进程都需要自己的连接资源浪费,且无法保证数据一致性

数据实测:3 个策略各自订阅 30 个美股标的,网络闪断一次后,平均恢复时间 23 秒,期间三个策略收到的数据最大时间差达到 8 秒。

1.2 三种方案的工程账

方案架构优点致命缺陷
各自订阅每个策略独立连接简单,无耦合连接数爆炸,断线后各自为战
单连接+内部队列一个连接收数据,通过 asyncio.Queue 分发节省连接数单点故障,跨进程无法共享
连接池+Redis连接池写入 Redis,策略独立读取解耦、高可用、跨进程引入 Redis 依赖,需处理数据一致性

▍本章核心结论

- 多策略共享行情的本质是将“推送”转为“拉取+缓存”——策略不直接依赖连接的稳定性。

- Redis 作为共享总线的代价是 5-10ms 的写入延迟,换来的是跨进程共享和故障隔离

二、架构总览:从单连接到 Redis 共享总线

2.1 为什么选 Redis

当决定引入中间层时,候选方案对比:

候选方案优势为什么不适用
内存队列(asyncio.Queue)零延迟,无外部依赖无法跨进程,策略必须和连接池在同一进程
Kafka/RabbitMQ持久化,高吞吐运维重,量化策略通常不需要消息回溯到三天前
Redis轻量、支持多种数据结构、量化团队已有Hash 存快照,Pub/Sub 做通知,Streams 存历史——三种模式覆盖全场景

技术类比:Redis 之于行情系统,如同交易所的行情网关之于券商——网关不关心数据被用于何种策略,只负责将数据“放在那里”,策略自行决定何时读取。

2.2 整体架构图

┌─────────────────────────────────────────────────────────────────────┐
│                         行情数据源                                   │
│         单一 WebSocket 连接,跨美股/港股/A股/加密                     │
└──────────────────────────────┬──────────────────────────────────────┘
                               │ 毫秒级推送
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│                      WebSocket 连接池(Python asyncio)               │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐            │
│  │ 连接 #1  │  │ 连接 #2  │  │ 连接 #3  │  │ 热备连接 │            │
│  │ 20个标的 │  │ 20个标的 │  │ 20个标的 │  │  空闲    │            │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘            │
│       │             │             │             │                    │
│       └─────────────┴─────────────┴─────────────┘                    │
│                               │                                      │
│                    ┌──────────┴──────────┐                           │
│                    │   订阅状态持久化     │                           │
│                    │ (Redis Set 存储)    │                           │
│                    └─────────────────────┘                           │
└──────────────────────────────┬──────────────────────────────────────┘
                               │ 写入
                               ▼
┌─────────────────────────────────────────────────────────────────────┐
│                         Redis Server(共享总线)                      │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────────┐  │
│  │ Hash: ticker:*  │  │ Pub/Sub 通知    │  │ Streams: tick:*     │  │
│  │ 最新行情快照     │  │ 数据更新广播     │  │ 历史 tick(可选)   │  │
│  └────────┬────────┘  └────────┬────────┘  └──────────┬──────────┘  │
└───────────┼────────────────────┼──────────────────────┼──────────────┘
            │ 读取                │ 订阅                  │ 回放
            ▼                     ▼                       ▼
   ┌─────────────┐        ┌─────────────┐        ┌─────────────┐
   │  策略 A     │        │  策略 B     │        │  策略 C     │
   │ (趋势跟踪)  │        │  (套利)     │        │  (风控)     │
   │ GET ticker:*│        │ SUBSCRIBE   │        │ XREAD stream│
   └─────────────┘        └─────────────┘        └─────────────┘

2.3 组件职责速查

组件职责关键设计
连接池管理器维护多个 WebSocket 连接,负载均衡,故障恢复最少订阅数分配,热备连接,指数退避重连
订阅状态存储将当前订阅列表持久化到 Redis Set重连时从 Redis 读取,自动恢复订阅
Redis 写入器将行情数据写入 Redis Hash/Pub/Sub异步写入,写入失败时本地缓冲
策略消费者按需从 Redis 读取或订阅更新与连接池完全解耦,独立部署

三、核心实现:WebSocket 到 Redis 的数据管道

3.1 连接池与心跳管理

连接池的核心代码已在《WebSocket 连接池生产级实现》中详述。本节聚焦于将消息写入 Redis 的部分。

心跳管理的关键参数:

  • 每 1 秒发送 {"cmd":"ping"}(与服务端协议对齐)
  • 5 秒未收到 pong 判定连接僵死
  • 重连使用指数退避 + 10% 随机抖动,最大延迟 60 秒

3.2 订阅状态的持久化

这是断线恢复的关键。如果没有持久化,重连后连接池不知道之前订阅了哪些标的。

实现思路

# 订阅时,同步写入 Redis Set
async def subscribe(self, symbols: List[str]):
    target = self._select_connection(symbols)
    await target.ws.send(json.dumps({"cmd": "subscribe", "data": {"channel": "ticker", "symbols": symbols}}))
    await self.redis.sadd(f"pool:{self.pool_id}:subscriptions", *symbols)
    target.symbols.extend(symbols)

# 重连后,从 Redis 恢复
async def _restore_subscriptions(self, conn: WSConnection):
    symbols = await self.redis.smembers(f"pool:{self.pool_id}:subscriptions")
    if symbols:
        await conn.ws.send(json.dumps({"cmd": "subscribe", "data": {"channel": "ticker", "symbols": list(symbols)}}))
        conn.symbols = list(symbols)

设计考量:将订阅状态存在 Redis 而非内存,意味着连接池本身是无状态的——任意一个连接池实例挂掉,新启动的实例可以从 Redis 恢复订阅状态,继续工作。

3.3 行情数据写入 Redis 的三种模式

数据结构使用方式适用场景
HashHSET ticker:AAPL.US last_price 175.23 timestamp 1773302400000策略只关心最新价格,定时轮询读取
Pub/SubPUBLISH ticker_update:AAPL.US '{"last_price":175.23}'策略需要实时通知,但可接受偶尔丢失
StreamsXADD tick_stream:AAPL.US * price 175.23 volume 1200策略需要历史回放精确不漏的消息

本文实现采用 Hash + Pub/Sub 组合:Hash 保证策略随时能读到最新值(断线重连后也能拿到最新价),Pub/Sub 提供实时通知避免轮询延迟。

3.4 消息处理核心代码

import json
import asyncio
import os
from typing import Dict, Optional
import redis.asyncio as redis

API_KEY = os.environ.get("TICKDB_API_KEY")
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")

class MarketDataWriter:
    def __init__(self, pool_id: str = "pool-1"):
        self.pool_id = pool_id
        self.redis: Optional[redis.Redis] = None
        self._write_buffer: Dict[str, list] = {}
        
    async def start(self):
        self.redis = await redis.from_url(REDIS_URL, decode_responses=True)
        
    async def handle_ticker_message(self, data: dict):
        symbol = data.get("symbol")
        if not symbol:
            return
            
        ticker_data = {
            "last_price": str(data.get("last_price", "")),
            "timestamp": str(data.get("timestamp", "")),
            "volume_24h": str(data.get("volume_24h", "")),
        }
        
        try:
            await self.redis.hset(f"ticker:{symbol}", mapping=ticker_data)
            await self.redis.expire(f"ticker:{symbol}", 5)
            await self.redis.publish(f"ticker_update:{symbol}", symbol)
        except redis.RedisError as e:
            self._buffer_message(symbol, data)
            
    def _buffer_message(self, symbol: str, data: dict):
        if symbol not in self._write_buffer:
            self._write_buffer[symbol] = []
        self._write_buffer[symbol].append(data)
        if len(self._write_buffer[symbol]) > 100:
            self._write_buffer[symbol].pop(0)

⚠️ 工程预警:生产环境中,hsetexpire 应使用 Redis 事务或 Lua 脚本保证原子性。_buffer_message 是内存缓冲,进程重启会丢失,重要场景应落盘到本地 SQLite。

四、完整代码:MarketDataWriter 类

以下是可直接运行的 MarketDataWriter 完整实现。

import asyncio
import websockets
import json
import random
import os
import logging
from typing import List, Set, Dict, Optional
from dataclasses import dataclass, field
from enum import Enum
import redis.asyncio as redis

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API_KEY = os.environ.get("TICKDB_API_KEY")
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379")
WS_URL = f"wss://api.tickdb.ai/v1/realtime?api_key={API_KEY}"

class ConnState(Enum):
    IDLE = "idle"
    ACTIVE = "active"
    DEAD = "dead"

@dataclass
class WSConnection:
    conn_id: str
    state: ConnState = ConnState.IDLE
    ws: Optional[websockets.WebSocketClientProtocol] = None
    symbols: List[str] = field(default_factory=list)
    last_pong: float = 0.0

class MarketDataWriter:
    def __init__(self, pool_size: int = 4, max_per_conn: int = 20):
        self.pool_size = pool_size
        self.max_per_conn = max_per_conn
        self.connections: List[WSConnection] = []
        self._hot_spare: List[WSConnection] = []
        self.redis: Optional[redis.Redis] = None
        self._lock = asyncio.Lock()
        self._write_buffer: Dict[str, list] = {}
        self._running = False
        
    async def start(self):
        """启动连接池和 Redis 连接"""
        self.redis = await redis.from_url(REDIS_URL, decode_responses=True)
        self._running = True
        
        for i in range(self.pool_size):
            conn = WSConnection(conn_id=f"conn-{i}")
            await self._connect_with_backoff(conn)
            asyncio.create_task(self._heartbeat_loop(conn))
            asyncio.create_task(self._message_loop(conn))
            self.connections.append(conn)
            logger.info(f"连接 {conn.conn_id} 已建立")
            
        if self.connections:
            spare = self.connections.pop()
            spare.state = ConnState.IDLE
            self._hot_spare.append(spare)
            logger.info(f"热备连接 {spare.conn_id} 已预留")
            
    async def _connect_with_backoff(self, conn: WSConnection):
        retry, cap = 0, 60
        while self._running:
            try:
                conn.ws = await websockets.connect(WS_URL)
                conn.state = ConnState.ACTIVE
                conn.last_pong = asyncio.get_event_loop().time()
                logger.info(f"{conn.conn_id} 连接成功")
                return
            except Exception as e:
                delay = min(2 ** retry, cap)
                jitter = random.uniform(0, delay * 0.1)
                logger.warning(f"{conn.conn_id} 连接失败,{delay:.1f}秒后重试: {e}")
                await asyncio.sleep(delay + jitter)
                retry += 1
                
    async def _heartbeat_loop(self, conn: WSConnection):
        while self._running and conn.state != ConnState.DEAD:
            try:
                if conn.ws and conn.state == ConnState.ACTIVE:
                    await conn.ws.send(json.dumps({"cmd": "ping"}))
            except Exception:
                pass
            await asyncio.sleep(1)
            
    async def _message_loop(self, conn: WSConnection):
        while self._running and conn.state != ConnState.DEAD:
            try:
                msg = await asyncio.wait_for(conn.ws.recv(), timeout=5)
                data = json.loads(msg)
                
                # 拦截业务错误码:鉴权失败、限频、配额耗尽
                if data.get("code") in [1001, 3001, 3002]:
                    logger.error(f"{conn.conn_id} 业务错误: code={data.get('code')}, msg={data.get('message')}")
                    conn.state = ConnState.DEAD
                    asyncio.create_task(self._recover(conn))
                    continue
                    
                if data.get("cmd") == "pong":
                    conn.last_pong = asyncio.get_event_loop().time()
                elif data.get("cmd") == "ticker":
                    await self._handle_ticker(conn, data.get("data", {}))
            except asyncio.TimeoutError:
                now = asyncio.get_event_loop().time()
                if now - conn.last_pong > 5:
                    logger.warning(f"{conn.conn_id} 心跳超时")
                    conn.state = ConnState.DEAD
                    asyncio.create_task(self._recover(conn))
            except Exception as e:
                logger.error(f"{conn.conn_id} 消息循环异常: {e}")
                conn.state = ConnState.DEAD
                asyncio.create_task(self._recover(conn))
                break
                
    async def _handle_ticker(self, conn: WSConnection, data: dict):
        symbol = data.get("symbol")
        if not symbol:
            return
            
        ticker_data = {
            "last_price": str(data.get("last_price", "")),
            "timestamp": str(data.get("timestamp", "")),
        }
        if "volume_24h" in data:
            ticker_data["volume_24h"] = str(data["volume_24h"])
            
        try:
            async with self.redis.pipeline() as pipe:
                await pipe.hset(f"ticker:{symbol}", mapping=ticker_data)
                await pipe.expire(f"ticker:{symbol}", 5)
                await pipe.publish(f"ticker_update:{symbol}", symbol)
                await pipe.execute()
        except redis.RedisError as e:
            logger.error(f"Redis 写入失败 {symbol}: {e}")
            self._buffer_message(symbol, data)
            
    def _buffer_message(self, symbol: str, data: dict):
        if symbol not in self._write_buffer:
            self._write_buffer[symbol] = []
        self._write_buffer[symbol].append(data)
        if len(self._write_buffer[symbol]) > 100:
            self._write_buffer[symbol].pop(0)
            
    async def _recover(self, dead_conn: WSConnection):
        logger.info(f"开始恢复 {dead_conn.conn_id}")
        
        if self._hot_spare:
            hot = self._hot_spare.pop()
            symbols = await self._get_persisted_subscriptions()
            if symbols:
                await self._do_subscribe(hot, list(symbols))
            hot.state = ConnState.ACTIVE
            async with self._lock:
                self.connections.append(hot)
            logger.info(f"热备 {hot.conn_id} 已接管")
            
        await self._connect_with_backoff(dead_conn)
        if dead_conn.state == ConnState.ACTIVE:
            symbols = await self._get_persisted_subscriptions()
            if symbols:
                await self._do_subscribe(dead_conn, list(symbols))
            async with self._lock:
                self.connections.append(dead_conn)
            logger.info(f"{dead_conn.conn_id} 恢复成功")
            
    async def _get_persisted_subscriptions(self) -> Set[str]:
        if not self.redis:
            return set()
        return await self.redis.smembers("market_data_writer:subscriptions")
        
    async def _do_subscribe(self, conn: WSConnection, symbols: List[str]):
        await conn.ws.send(json.dumps({
            "cmd": "subscribe",
            "data": {"channel": "ticker", "symbols": symbols}
        }))
        conn.symbols = symbols
        if self.redis:
            await self.redis.sadd("market_data_writer:subscriptions", *symbols)
            
    async def subscribe(self, symbols: List[str]):
        async with self._lock:
            target = min(self.connections, key=lambda c: len(c.symbols))
            new_symbols = [s for s in symbols if s not in target.symbols]
            if not new_symbols:
                return
            if len(target.symbols) + len(new_symbols) > self.max_per_conn:
                if self._hot_spare:
                    target = self._hot_spare.pop()
                    target.state = ConnState.ACTIVE
                    self.connections.append(target)
            await self._do_subscribe(target, new_symbols)
            
    async def stop(self):
        self._running = False
        for conn in self.connections + self._hot_spare:
            if conn.ws:
                await conn.ws.close()
        if self.redis:
            await self.redis.close()
        logger.info("MarketDataWriter 已关闭")

使用方法

async def main():
    writer = MarketDataWriter(pool_size=3, max_per_conn=20)
    await writer.start()
    await writer.subscribe(["AAPL.US", "TSLA.US", "700.HK", "BTCUSDT"])
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())

消费者端读取示例

# 策略 A:轮询读取最新价
async def get_latest_price(symbol: str):
    r = await redis.from_url(REDIS_URL, decode_responses=True)
    data = await r.hgetall(f"ticker:{symbol}")
    return data.get("last_price")

# 策略 B:订阅实时更新
async def subscribe_updates(symbol: str):
    r = await redis.from_url(REDIS_URL, decode_responses=True)
    pubsub = r.pubsub()
    await pubsub.subscribe(f"ticker_update:{symbol}")
    async for msg in pubsub.listen():
        if msg["type"] == "message":
            price = await get_latest_price(symbol)
            print(f"{symbol} 更新: {price}")

五、踩坑记录与调优建议

5.1 五个生产环境暗坑

问题现象根因解决方案
Redis 断连数据丢失写入失败静默,策略读到旧价未处理 redis.RedisError✅ 必须捕获异常,写入本地缓冲文件;恢复后补推
行情乱序覆盖新价格被旧价格覆盖Hash 无条件写入,不检查 timestamp✅ 写入前比较 timestamp,只保留更新的数据
订阅状态不一致重连后漏订阅订阅状态仅存内存✅ 持久化到 Redis Set,重连时全量恢复
Pub/Sub 消息丢失策略没收到更新通知Pub/Sub 无持久化,订阅前消息丢失✅ 策略启动时先读 Hash 获取当前价,再订阅
Redis 内存膨胀行情 Hash 永不过期,内存持续增长未设置 TTL✅ 每条行情写入后 EXPIRE key 5
业务错误码静默连接正常但无数据服务端返回 3001/3002,客户端未处理✅ 在消息循环中拦截 code 字段并触发重连

5.2 性能调优参数速查

参数推荐值调优依据
单连接订阅上限20-30超过 30 后 P99 延迟陡升
连接池大小订阅数/20 + 1(热备)留一个热备应对突发故障
行情 Hash TTL5 秒平衡内存占用与脏读风险
Redis 连接池大小10-20匹配 asyncio 并发写入量
重连最大延迟60 秒超过则告警人工介入
本地缓冲上限100 条/标的避免内存泄漏

5.3 对比:有 Redis 共享 vs 无 Redis 各自订阅

指标各自订阅(3策略×50标的)Redis 共享总线
WebSocket 连接数3-9 个3-4 个(连接池统一管理)
断线恢复时间各自重连,最长 60 秒热备接管,<500ms
策略间数据一致性可能不一致完全一致(同源写入 Redis)
新增策略成本需新建连接,重新订阅零成本,直接读 Redis
运维复杂度高(多连接监控)低(只监控连接池和 Redis)

▍本章核心结论

- Redis 共享总线让策略与行情源彻底解耦——策略不关心连接数、不关心重连、不关心限频。

- 代价是 5-10ms 的写入延迟和 Redis 运维成本。对于绝大多数量化场景,这个代价完全值得。

六、结语

▍一句话记住本文

WebSocket 负责接入,Redis 负责分发,连接池负责高可用——三者组合让行情系统从“单兵作战”升级为“集团军协同”。

真正的生产级行情系统,不是“能收到数据”,而是“任何一个组件挂掉,策略都无感知”。本文的 MarketDataWriter 实现了:

  • 连接池热备:单连接故障 <500ms 切换
  • 订阅持久化:重启后自动恢复
  • Redis 共享总线:多策略零成本接入
  • 业务错误码拦截:限频/鉴权失败自动退避

在构建上述架构时,一个绕不开的工程问题是多市场异构数据源的统一接入。维护美股、港股、A 股、加密货币四个市场的独立 WebSocket 连接,意味着四套心跳逻辑、四套重连策略、四套消息解析器。工程上的实践是寻找一个能跨市场的统一网关。

本文测试环境选用了 TickDB,它通过单一 WebSocket 连接即可订阅多市场标的,心跳协议标准化为每 1 秒 ping。你也可以选择自行实现多连接聚合层,或使用其他支持跨市场的供应商——核心思路是降低连接管理的复杂度

扩展方向

本文代码已覆盖核心机制。你可以在此基础上扩展:

  • 数据持久化:将 Redis 中的行情定期落库到 ClickHouse 或 TimescaleDB,用于策略回测。
  • 多市场聚合:通过同一连接池订阅美股、港股、A 股、加密货币,统一写入 Redis 不同前缀。
  • 监控告警:暴露 Prometheus metrics,监控 Redis 写入延迟、连接池健康状态、本地缓冲堆积量。

▍零成本跑通代码:Claw Keys 免注册试用

如果你不想立即注册 API Key,TickDB 提供了面向开发者的 Claw Keys 试用机制——默认开放 72 个热门标的(覆盖 AAPL、TSLA、700.HK、BTCUSDT、000001.SZ 等),无需配置即可测试。

在 ChatGPT、Claude 或 Cursor 中输入以下提示词,让 AI 直接帮你跑通 WebSocket → Redis 链路:

读取 TickDB 官方 GitHub 仓库中的 SKILL.md 协议文件(路径:tickdb-unified-realtime-marketdata-api/SKILL/SKILL.md),利用其中的试用 Key 获取 AAPL.US 实时行情。

验证通过后,可搜索“TickDB”免费获取解锁全部品种的正式 API Key。


本文不构成任何投资建议。市场有风险,投资需谨慎。

通过 TickDB API 获取实时行情数据

一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。

免费领取 API Key查看 API 文档

相关文章