实时行情数据监控:接口返回正常,告警为什么没响?
作者: TickDB Research · 发布: 2026/7/3 · 阅读: 7
标签: RANK-2026-07-03-01, 知乎A007
摘要
行情告警没响,你先查的是阈值,还是数据链路?很多告警失效的根本原因不在价格判断——定时任务静默停跑、接口返回空data、WebSocket假在线、缓存价格没有stale标记、异常发生但日志里什么都没留下。本文给出一套从数据链路健康检查出发的5层排查方案:先问行情数据是否还活着、是否新鲜、异常是否可追溯,最后再问价格有没有触发告警。附带五层速查表、最小监控字段表和常见问题,适合直接复制到团队排查文档。
一、这篇文章解决什么问题,不解决什么
适合谁看
- 实时行情API接入者:你写了取数脚本,但不确定拿到的是有效数据还是空壳。
- 行情面板开发者:你维护着价格展示面板,但不确定展示的是实时价还是缓存旧值。
- AI行情分析工具维护者:你的Agent依赖实时行情数据,但告警静默时你不知道是模型问题还是数据问题。
- 技术管理者:团队里每个人都在盯自己那块,但没有人确认数据链路本身是否健康。
解决什么
告诉你告警没响时,应该按什么顺序排查。不是调阈值,而是从定时任务心跳开始,逐层往下查。读完你会知道每一层该查什么、怎么查、用什么字段记录。
不解决什么
不教你设什么价格阈值,不讨论策略有效性,不比较数据源优劣。只解决一件事:告警静默时,怎么判断是行情没触发,还是系统已经失明。
方法论:五层排查,从链路到价格
价格告警是最后一层,数据生命体征才是第一层。五层排查按从底层到上层的顺序:先查任务是否执行(L1),再查接口是否返回有效数据(L2),再查数据是否新鲜(L3),再查缓存是否污染了展示(L4),最后查异常是否可追溯(L5)。五层全部通过,才轮到你怀疑价格阈值。
二、为什么排查顺序不能反过来
如果你先查价格阈值,调了半天发现阈值没问题——回头一查,是定时任务三天前就停了。如果你先查接口返回,发现HTTP 200,以为没问题——其实WebSocket已经假在线30分钟了,数据早就stale了。
告警失效的根因,越靠近底层越容易被忽略,也越致命。 L1任务停跑会让后面一切逻辑空转。L2空data会让后续所有判断基于缺失数据。L5异常不留痕会让你永远不知道前四层出过什么问题。
五层必须从L1开始,逐层往下。每一层通过,才进入下一层。
| 层次 | 检查对象 | 核心问题 | 如果这层坏了 |
|---|---|---|---|
| L1 定时任务心跳 | 取数任务是否按时执行 | 告警逻辑跑了吗? | 任务停了,后续一切免谈 |
| L2 接口返回完整性 | HTTP 200后payload是否有效 | 数据拿到了吗? | 拿到空数据,告警在空跑 |
| L3 数据新鲜度 | 数据是新的还是旧的 | 拿到的是此刻行情吗? | 盯着30分钟前的价格 |
| L4 缓存可信度 | 展示的价格有没有stale标记 | 你看到的是真数据吗? | 看着旧价格,不知道它过期 |
| L5 异常可追溯 | 失败时有没有留下现场 | 出事能查吗? | 告警静默,日志空白 |
三、L1:定时任务心跳——告警逻辑跑了吗?
假象:监控面板还在,告警规则还配着,一切看起来正常。
真问题:取数任务可能已经悄无声息地停了。不是报错崩溃——报错至少会在日志里留下痕迹。更常见的是任务超时被调度器跳过、依赖的配置文件被误改导致import失败、或者进程还在但取数线程已经卡死。
该查什么:取数任务的最近一次执行时间是否在预期间隔内。
from datetime import datetime, timezone
from typing import Dict
def check_heartbeat(job_state: Dict) -> Dict:
"""
检查定时任务心跳。
job_state 由调度器维护,包含 last_run_time 和 expected_interval_sec。
"""
last_run = job_state.get("last_run_time")
expected_interval = job_state.get("expected_interval_sec", 300)
if last_run is None:
return {"heartbeat_ok": False,
"reason": "no last_run_time recorded",
"action": "check if scheduler is running"}
now = datetime.now(timezone.utc)
elapsed = (now - last_run).total_seconds()
if elapsed > expected_interval * 2:
return {"heartbeat_ok": False,
"reason": f"last run {elapsed:.0f}s ago, expected every {expected_interval}s",
"action": "check scheduler logs and task status"}
return {"heartbeat_ok": True,
"last_run": last_run.isoformat(),
"elapsed_sec": elapsed}
四、L2:接口返回200,为什么行情数据仍可能失效?
假象:HTTP 200,请求成功,数据拿到了。
真问题:接口返回200,只说明请求成功,不说明数据还新鲜。 返回的data数组可能只有部分symbol,last_price可能是空字符串或null,服务端可能返回了业务错误码但你的代码只检查了HTTP状态码。你请求了5个symbol,返回了3条,另外2条被静默丢弃——代码继续往下跑,告警在空数据上做判断。
该查什么:data数组长度是否等于请求的symbol数量,每个核心字段是否可解析。
from decimal import Decimal, InvalidOperation
from typing import List, Dict
def check_payload(response: Dict, expected_symbols: List[str]) -> Dict:
"""
检查接口返回完整性。
返回检查结果和失败详情。
"""
data = response.get("data", [])
if not isinstance(data, list) or len(data) == 0:
return {"payload_ok": False,
"reason": "data is empty or not an array",
"action": "check symbol format, permissions, and market hours"}
returned_symbols = {item.get("symbol") for item in data if isinstance(item, dict)}
missing = set(expected_symbols) - returned_symbols
field_issues = []
for item in data:
sym = item.get("symbol", "?")
raw_price = item.get("last_price")
if raw_price is None:
field_issues.append(f"{sym}: last_price missing")
continue
try:
d = Decimal(str(raw_price))
if not d.is_finite():
field_issues.append(f"{sym}: last_price not finite")
except (InvalidOperation, ValueError):
field_issues.append(f"{sym}: last_price unparseable")
payload_ok = len(missing) == 0 and len(field_issues) == 0
return {"payload_ok": payload_ok,
"missing_symbols": list(missing),
"field_issues": field_issues,
"action": "check business code, symbol format, and field contract" if not payload_ok else None}
五、L3:WebSocket连接还在,为什么数据可能已经过期?
假象:WebSocket连接状态显示“已连接”,心跳正常。
真问题:服务端可能在某个时间点之后就不再推送新数据了。连接watch dog只管通道,不管数据内容是否新鲜。你看到的那个“最新价”,实际上是最后一次推送的价格——可能是30分钟前的。这一步不解决价格是否合理,只解决数据是否还新鲜。
该查什么:最后一条数据的payload_timestamp和当前时间的差值,同时考虑market_state。
from datetime import datetime, timezone
from typing import Optional
def check_staleness(payload_timestamp: Optional[int],
now: Optional[datetime] = None,
market_state: str = "unknown",
max_age_sec: int = 300) -> Dict:
"""
检查行情数据是否过期。
市场状态为closed时按市场规则调整新鲜度判断标准。
"""
if now is None:
now = datetime.now(timezone.utc)
if payload_timestamp is None:
return {"is_stale": True,
"stale_reason": "payload_timestamp is missing",
"action": "check timestamp field contract and server clock"}
ts_sec = payload_timestamp / 1000.0
age_sec = now.timestamp() - ts_sec
effective_max_age = max_age_sec
if market_state in ("closed", "after_hours", "pre_market"):
effective_max_age = max_age_sec * 3
is_stale = age_sec > effective_max_age
return {"is_stale": is_stale,
"age_sec": age_sec,
"stale_reason": f"data age {age_sec:.0f}s exceeds max {effective_max_age}s" if is_stale else None,
"market_state": market_state,
"action": "trigger rest query to verify current price" if is_stale else None}
实测:通过 TickDB 查询 AAPL.US、TSLA.US、NVDA.US 的 ticker 快照,并核对 data、symbol、last_price、timestamp、交易时段和最近成交字段。
六、L4:页面上挂着缓存价格,为什么没有stale标记?
假象:面板上价格还在跳动,颜色还在变化。
真问题:那是上一次成功请求的快照。真正的数据已经断流很久了,但缓存值旁边没有stale标记——用户看到的和真实行情已经完全脱节。
该查什么:缓存回填时有没有触发用户可见的状态变化,面板上有没有数据更新时间展示。
from datetime import datetime, timezone
def mark_stale(symbol: str, reason: str) -> Dict:
"""
标记某个symbol的当前展示数据为stale。
由业务侧监控逻辑调用,记录stale原因和时间。
此标记写入业务侧自定义字段,不依赖行情数据源原生字段。
"""
return {"symbol": symbol,
"is_stale": True,
"stale_reason": reason,
"marked_at": datetime.now(timezone.utc).isoformat(),
"action": "flag this symbol on dashboard and trigger rest query to refresh"}
七、L5:异常发生但日志里什么都没有
假象:出问题的时候日志里会有报错信息。
真问题:那次异常的请求参数没记录,原始返回体没保存,失败原因只写了一句“Error”——没有任何上下文。你不知道当时请求了什么symbol,服务端返回了什么,是超时还是业务码异常。告警没响不可怕,可怕的是你不知道它为什么没响。
该查什么:最近一次异常是否有完整的请求参数、原始返回体和可读的失败原因记录。
from datetime import datetime, timezone
import hashlib
import json
from typing import Any
def write_alert_event(symbol: str,
source: str,
request_time: str,
payload_timestamp: Any,
local_receive_time: str,
market_state: str,
is_stale: bool,
stale_reason: str,
raw_response: Dict,
failure_reason: str = None) -> Dict:
"""
记录一次告警相关事件。
不管告警是否触发,只要数据链路出现异常,都写入此事件供事后复盘。
字段名(如market_state、is_stale)为业务侧自定义监控字段。
"""
raw_snapshot_hash = hashlib.sha256(
json.dumps(raw_response, sort_keys=True, ensure_ascii=False, default=str).encode()
).hexdigest()[:16]
event = {"symbol": symbol,
"source": source,
"request_time": request_time,
"last_success_time": local_receive_time if not failure_reason else None,
"payload_timestamp": payload_timestamp,
"local_receive_time": local_receive_time,
"market_state": market_state,
"is_stale": is_stale,
"stale_reason": stale_reason,
"raw_snapshot_hash": raw_snapshot_hash,
"failure_reason": failure_reason,
"logged_at": datetime.now(timezone.utc).isoformat()}
return event
八、五层排查速查表
| 失效层 | 典型假象 | 要查的字段/日志(业务侧建议记录) | 处理动作 |
|---|---|---|---|
| L1 任务心跳 | 面板还在,告警规则配着 | last_run_time、expected_interval_sec | 检查调度器日志和任务状态 |
| L2 接口返回 | HTTP 200,请求成功 | data长度、symbol集合、last_price可解析性 | 检查业务码、symbol格式、字段契约 |
| L3 数据新鲜度 | WebSocket连接正常 | payload_timestamp、local_receive_time、market_state | 计算age,超过阈值用REST重新查询 |
| L4 缓存可信度 | 价格还在跳 | is_stale、stale_reason、last_success_time | 打stale标记,触发刷新 |
| L5 异常可追溯 | 日志里应该有报错 | request_params、raw_snapshot、failure_reason | 补全write_alert_event记录逻辑 |
九、最小监控字段表
以下字段为业务侧建议记录字段,用于五层排查。不是任何行情数据源的原生字段。
| 字段名 | 含义 | 为什么监控需要它 |
|---|---|---|
symbol | 监控标的代码 | 定位问题品种 |
source | 数据源标识 | 多源对比时知道数据来自哪里 |
request_time | 发起请求的时间 | 判断任务是否按时执行 |
last_success_time | 最近一次获取有效数据的时间 | 判断数据断流起点 |
payload_timestamp | 行情数据自带的时间戳 | 判断数据新鲜度 |
local_receive_time | 本地收到响应的时间 | 判断网络延迟 |
market_state | 交易状态 | 非交易时段调整新鲜度标准 |
is_stale | 数据是否过期 | 控制缓存可信度 |
stale_reason | 过期原因 | 辅助排查 |
raw_snapshot_hash | 原始响应哈希 | 事后校验数据完整性 |
failure_reason | 失败原因 | 让异常可读、可追溯 |
十、告警失效排查顺序
按以下顺序排查,不要跳步。可以直接复制到团队文档。
- 先查任务是否执行:取数任务的
last_run_time是否在预期间隔内。 - 再查接口是否返回有效payload:
data非空,symbol集合一致,核心字段可解析。 - 再查timestamp是否过期:
payload_timestamp和当前时间的差值是否在新鲜度阈值内。 - 再查缓存是否污染:页面展示的价格有没有
is_stale标记,缓存回填有没有触发提示。 - 最后查价格口径和阈值:确认数据链路健康后,再检查告警规则的阈值和逻辑。
如果团队里每个人都只盯价格阈值,真正的数据链路问题很容易没人负责。
十一、TickDB 的工程边界
上面这套5层排查方案是通用的,不绑定任何特定数据源。如果你用 TickDB 做行情接入,它在监控链路中的角色:
TickDB 是统一实时行情数据API,支持 REST、WebSocket、MCP 等多种接入方式。
- REST:适合快照查询、字段核对和定时探测。L2的check_payload和L3的check_staleness可以直接消费返回的symbol和timestamp字段。
- WebSocket:适合持续行情推送和新鲜度观察。L3的新鲜度检查可以通过观察WebSocket最后一条推送的时间戳来实现。
- MCP:适合AI工具按需查询真实行情,不替代生产监控。
TickDB可以帮助围绕symbol、timestamp、结构化字段和原始响应建立自己的监控检查流程,但不内置生产级告警能力。 本文中所有业务侧监控字段(如market_state、is_stale、stale_reason、failure_reason)均为自定义设计,不是TickDB的原生字段。具体端点、鉴权方式和字段路径以官方文档和实测为准。
十二、常见问题
Q1:行情告警没响,第一步查什么?
先查取数任务的定时心跳——任务是否还在按时执行。如果任务已经静默停跑,后续的接口、数据新鲜度、缓存和价格判断全部失效。确认任务正常后,再逐层排查接口返回、数据新鲜度和缓存可信度。
Q2:接口返回200是否代表行情数据正常?
不代表。HTTP 200只说明请求成功到达服务端并收到了响应。返回的data数组可能为空,symbol可能缺失或被自动修正,last_price可能是无法解析的空字符串。必须检查业务码、data长度、symbol集合和核心字段的可解析性。
Q3:WebSocket连接还在但行情不更新,怎么判断?
不要只看连接状态。检查最近一条数据的payload_timestamp和当前时间的差值。如果差值超过预期阈值,即使连接状态显示正常,数据也已经是stale data。WebSocket watch dog只管传输通道,不管数据内容是否新鲜。
Q4:怎么判断实时行情数据是不是stale data?
用check_staleness函数比较payload_timestamp和当前时间的差值。注意根据market_state调整新鲜度阈值——非交易时段的阈值应放宽。如果payload_timestamp缺失或类型异常,直接标记为stale并触发REST重新查询。
Q5:TickDB在行情监控里适合放在哪一层?
TickDB作为统一实时行情数据API,适合作为L2(接口返回完整性)和L3(数据新鲜度)的行情数据入口。REST适合定时探测和字段核对,WebSocket适合持续推送和新鲜度观察。但TickDB不内置生产级告警能力——L1的任务调度、L4的缓存管理和L5的异常留痕需要你自己实现。
十三、边界声明
- 本文所有业务侧监控字段(market_state、is_stale、stale_reason、failure_reason等)均为自定义设计,不是任何行情数据源的原生字段。
- TickDB不内置生产级告警能力,不保证不会断流、不会stale、不会丢数据。
- 本文不涉及投资建议、收益判断、策略有效性。
- 不写延迟、SLA、覆盖数量、价格、排名。
- 所有端点、字段路径和timestamp口径以TickDB官方文档和实测为准。
你们的行情告警,现在监控的是价格,还是也监控last_message_time和payload_timestamp?有没有把缓存值标记成stale?
📡 本文以 TickDB 作为行情接入示例。文中代码为 Python 教学骨架,所有业务侧监控字段均为自定义设计。本文仅讨论监控告警的工程排查方法,不构成投资建议。
标签: Python, 行情监控, 告警排查, WebSocket, 数据链路, TickDB
通过 TickDB API 获取实时行情数据
一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。
免费领取 API Key查看 API 文档