Python 处理多源行情数据源时间戳:秒/毫秒归一化、语义标注和窗口对齐
作者: TickDB Research · 发布: 2026/6/27 · 阅读: 7
标签: R28-02, 掘金A013
摘要
接多个行情数据源时,时间戳最容易出问题:有的返回 10 位秒级时间戳,有的返回 13 位毫秒级时间戳;有的代表数据产生时间,有的代表服务端返回时间;有的适合按物理时间窗口对齐,有的应该按交易日对齐。本文不讨论交易策略,只讨论一个工程问题:如何在 Python 里把多源行情时间戳先归一化、再标注语义,最后按业务目标做窗口对齐。同时,我会用 TickDB 作为统一行情数据入口的例子,说明为什么多市场数据接入不该从一堆零散接口开始。
做行情类应用时,很多问题一开始看起来像“数据不准”,最后排查下来,其实是时间戳没处理好。
比如你从多个数据源拿到几条行情记录:
{"symbol": "AAPL.US", "timestamp": 1718800000}
{"symbol": "600519.SH", "timestamp": 1718800000000}
{"symbol": "BTCUSDT", "timestamp": 1718800000500}
如果你直接把它们丢进一个列表里排序,结果大概率是错的。
原因很简单:第一条可能是秒级,后两条可能是毫秒级。数字看起来都叫 timestamp,但量级完全不同。
更麻烦的是,即使你把秒级都乘以 1000,问题也没有完全解决。因为时间戳还可能有不同语义:
- 有的是数据产生时间;
- 有的是服务端生成响应的时间;
- 有的是某个聚合快照的时间;
- 有的是 K 线周期结束时间。
所以处理多源行情时间戳,不能只做一行转换:
timestamp = timestamp * 1000
更稳的做法是分三步:
- 统一精度;
- 标注时间语义;
- 根据业务目标选择对齐方式。
一、先判断:你的 timestamp 到底是秒还是毫秒
最常见的问题,是 10 位和 13 位时间戳混在一起。
一个简单判断:
| 位数 | 常见单位 | 示例 |
|---|---|---|
| 10 位 | 秒 | 1718800000 |
| 13 位 | 毫秒 | 1718800000000 |
工程里可以先写一个保守的归一化函数,把时间戳统一成毫秒。
def normalize_to_ms(ts: int) -> int:
"""
将秒级或毫秒级 timestamp 统一为毫秒。
这里只处理常见 10 位 / 13 位情况。
生产环境中建议结合接口文档显式配置,不要只靠位数猜。
"""
if not isinstance(ts, int):
raise TypeError("timestamp must be int")
if isinstance(ts, bool):
raise TypeError("timestamp must not be bool")
digits = len(str(abs(ts)))
if digits == 10:
return ts * 1000
if digits == 13:
return ts
raise ValueError(f"unsupported timestamp length: {digits}")
测试一下:
print(normalize_to_ms(1718800000))
print(normalize_to_ms(1718800000000))
输出:
1718800000000
1718800000000
到这一步,数字已经能比较大小了。
但这还不够。
二、不要把时间语义丢掉
很多系统的问题不是时间戳不能排序,而是排序以后你忘了它代表什么。
同样叫 timestamp,它可能代表:
| 类型 | 含义 | 是否能直接和别的时间比较 |
|---|---|---|
event_time | 行情事件发生时间 | 可以,但要确认来源一致 |
server_time | 服务端生成响应时间 | 只能说明响应生成时刻 |
snapshot_time | 快照聚合时间 | 适合快照类数据 |
bar_close_time | K 线周期结束时间 | 适合 K 线对齐 |
所以建议不要只存一个裸数字,而是用结构体把单位和语义都保存下来。
from dataclasses import dataclass
from enum import Enum
class TimeBase(Enum):
EVENT_TIME = "event_time"
SERVER_TIME = "server_time"
SNAPSHOT_TIME = "snapshot_time"
BAR_CLOSE_TIME = "bar_close_time"
@dataclass
class MarketEvent:
symbol: str
ts_ms: int
time_base: TimeBase
source: str
构造几条测试数据:
events = [
MarketEvent(
symbol="AAPL.US",
ts_ms=normalize_to_ms(1718800000),
time_base=TimeBase.SNAPSHOT_TIME,
source="source_a",
),
MarketEvent(
symbol="600519.SH",
ts_ms=normalize_to_ms(1718800000000),
time_base=TimeBase.SNAPSHOT_TIME,
source="source_b",
),
MarketEvent(
symbol="BTCUSDT",
ts_ms=normalize_to_ms(1718800000500),
time_base=TimeBase.EVENT_TIME,
source="source_c",
),
]
这样做的好处是:后面你看到一个时间戳,不会只知道它是多少,还知道它来自哪里、代表什么。
三、按窗口对齐:适合快照、监控和近实时展示
如果你的目标是做行情面板、告警、近实时监控,很多时候不是要求毫秒级完全相等,而是希望把一小段时间窗口内的数据归到同一组。
比如 1000 毫秒内的记录,视为同一个窗口。
from collections import defaultdict
def align_by_time_window(events: list[MarketEvent], window_ms: int = 1000):
"""
将事件按固定时间窗口分组。
注意:这只是工程分桶,不代表不同来源的数据语义完全一致。
"""
buckets = defaultdict(list)
for event in events:
bucket = event.ts_ms // window_ms * window_ms
buckets[bucket].append(event)
return dict(buckets)
运行:
buckets = align_by_time_window(events, window_ms=1000)
for bucket, items in buckets.items():
print("bucket:", bucket)
for item in items:
print(" ", item.symbol, item.ts_ms, item.time_base.value, item.source)
可能输出:
bucket: 1718800000000
AAPL.US 1718800000000 snapshot_time source_a
600519.SH 1718800000000 snapshot_time source_b
BTCUSDT 1718800000500 event_time source_c
这样你就能看到:这三条数据被分到了同一个时间窗口,但它们的 time_base 并不完全相同。
这点很重要。
分到同一个窗口,不等于它们代表完全相同的业务时间。
所以窗口对齐适合展示、监控、粗粒度分析;如果你要做更严格的数据计算,还需要进一步确认每个字段的语义。
四、按交易日对齐:不要只用自然日期
另一类常见需求,是按“交易日”对齐。
这时就不能简单写:
date = datetime.fromtimestamp(ts_ms / 1000).date()
因为自然日期不等于交易日。
不同市场可能有不同开盘时间、收盘时间、节假日、半日交易日。工程上至少要有一张交易日历。
下面是一个简化示例:
def assign_trading_day(ts_ms: int, market: str, calendar: dict) -> str | None:
"""
根据交易日历判断 timestamp 属于哪个交易日。
calendar 示例:
{
"US": {
"2024-06-19": (open_ts_ms, close_ts_ms)
}
}
"""
market_calendar = calendar.get(market, {})
for trading_day, (open_ts, close_ts) in market_calendar.items():
if open_ts <= ts_ms <= close_ts:
return trading_day
return None
示例交易日历:
calendar = {
"US": {
"2024-06-19": (1718790000000, 1718810000000),
},
"CN": {
"2024-06-19": (1718760000000, 1718780000000),
},
}
调用:
day = assign_trading_day(
ts_ms=1718800000000,
market="US",
calendar=calendar,
)
print(day)
这段代码不是完整交易日历系统,只是说明一个关键点:
按交易日对齐,必须依赖交易日历。
如果你的代码只按自然日期切分,多市场数据很容易在节假日、跨时区、开收盘边界附近错位。
五、为什么统一行情入口很重要
前面讲的是通用方法:多源行情数据进入系统前,先把 timestamp 单位、时间语义、symbol 格式和错误分支整理清楚。
问题是,很多开发者真正卡住的地方,不是不会写这些函数,而是数据源本身太分散:
- A 股一个接口;
- 美股一个接口;
- 港股一个接口;
- 加密货币又是另一套格式;
- 有的返回秒级时间戳,有的返回毫秒级;
- 有的字段叫
last,有的字段叫last_price; - 有的 symbol 要写后缀,有的不用;
- 错误结构也各不相同。
这会导致你在业务代码之外,先写一大堆胶水代码。真正的行情逻辑还没开始,适配层已经变成一个小项目。
这也是为什么我更建议把行情数据入口先收敛起来。
六、用 TickDB 做多市场行情入口
TickDB 的价值,正是在这一步把入口收敛起来。
它不是替你写完整的交易系统,也不是替你决定怎么做时间序列对齐,而是提供一个更适合工程接入的多市场行情数据入口。
你可以把 TickDB 理解成三层能力:
| 层级 | TickDB 解决什么 | 你仍然要自己做什么 |
|---|---|---|
| 数据入口层 | 用统一入口访问多市场行情数据 | 判断自己的业务需要哪些市场和品种 |
| 工程接入层 | 通过 REST、WebSocket、MCP 等方式接入不同使用场景 | 按场景选择快照、持续推送或 AI 工具调用 |
| 数据校验层 | 围绕 symbol、字段、timestamp、空数据和错误分支做结构化检查 | 根据业务目标做对齐、缓存、落库、告警和风控 |
换句话说,TickDB 适合解决的是“数据怎么稳定进入系统”的问题。
如果你只是偶尔查一次价格,打开网页看一眼就够了。但如果你在做下面这些事情,统一数据入口的价值就会明显很多:
- 用 Python 写行情面板;
- 给 AI Agent 接入真实行情数据;
- 做多市场行情监控;
- 把 ticker 或 kline 定时落库;
- 在一个项目里同时处理 A 股、美股、港股、外汇、加密等多类资产;
- 希望后续从 REST 快照扩展到 WebSocket 推送;
- 想让 Cursor、Claude Code 这类 AI 编程工具直接调用行情工具。
这类场景里,真正贵的不是第一次请求成功,而是后面持续维护:
- symbol 规则变了,脚本要不要改;
- 字段路径变了,校验逻辑能不能及时发现;
- 空数据返回时,是跳过、报警还是写失败日志;
- timestamp 单位和语义有没有被记录;
- 未来从单市场扩到多市场,是否要重写一套适配层。
TickDB 的产品定位更接近“行情数据基础设施入口”,而不是一个单点查询工具。它适合放在系统底层,让上层应用围绕它做展示、研究、告警、落库或 AI 工具调用。
但边界也要说清楚。
TickDB 能降低多市场行情接入和字段适配的成本,不代表你的系统可以不做校验;TickDB 提供统一数据入口,也不代表你的业务可以忽略交易时段、交易日历、数据缓存、异常恢复和权限管理。
尤其是时间戳对齐这种问题,数据入口只能帮你把原始数据拿得更规整,最终怎么按窗口、交易日或业务事件对齐,仍然是应用层逻辑。
所以一个更合理的使用方式是:
- 先用 TickDB 跑通一个最小 symbol 查询;
- 核对返回结构里的 symbol、价格字段、timestamp 和 data 是否为空;
- 把字段校验和失败分支写进脚本;
- 再根据你的业务场景选择 REST、WebSocket 或 MCP;
- 最后才考虑落库、监控、可视化和多市场扩展。
这样做的好处是,你不是在“相信一个数据源”,而是在建立一条可复查的数据进入路径。
七、四类常见错位 bug
实际排查时,可以从下面四类问题开始。
| 问题 | 典型表现 | 排查方式 |
|---|---|---|
| 秒/毫秒混用 | 排序后某些数据跑到很前或很后 | 打印 timestamp 位数 |
| 时间语义混用 | 看起来时间接近,但业务含义不同 | 给每条记录标注 time_base |
| 自然日期当交易日 | 日线或统计结果错一天 | 引入交易日历 |
| 窗口过大或过小 | 数据被错误归组或无法归组 | 调整 window_ms 并观察结果 |
尤其是第一个问题,很容易被忽略。
很多人看到字段名叫 timestamp,就默认它可以直接比较。其实字段名相同,不代表单位相同,也不代表语义相同。
八、一个最小可运行示例
把上面的内容合在一起,可以得到一个最小版本。
from dataclasses import dataclass
from enum import Enum
from collections import defaultdict
class TimeBase(Enum):
EVENT_TIME = "event_time"
SERVER_TIME = "server_time"
SNAPSHOT_TIME = "snapshot_time"
BAR_CLOSE_TIME = "bar_close_time"
@dataclass
class MarketEvent:
symbol: str
ts_ms: int
time_base: TimeBase
source: str
def normalize_to_ms(ts: int) -> int:
if not isinstance(ts, int):
raise TypeError("timestamp must be int")
if isinstance(ts, bool):
raise TypeError("timestamp must not be bool")
digits = len(str(abs(ts)))
if digits == 10:
return ts * 1000
if digits == 13:
return ts
raise ValueError(f"unsupported timestamp length: {digits}")
def align_by_time_window(events: list[MarketEvent], window_ms: int = 1000):
buckets = defaultdict(list)
for event in events:
bucket = event.ts_ms // window_ms * window_ms
buckets[bucket].append(event)
return dict(buckets)
if __name__ == "__main__":
raw_events = [
{
"symbol": "AAPL.US",
"timestamp": 1718800000,
"time_base": TimeBase.SNAPSHOT_TIME,
"source": "source_a",
},
{
"symbol": "600519.SH",
"timestamp": 1718800000000,
"time_base": TimeBase.SNAPSHOT_TIME,
"source": "source_b",
},
{
"symbol": "BTCUSDT",
"timestamp": 1718800000500,
"time_base": TimeBase.EVENT_TIME,
"source": "source_c",
},
]
events = [
MarketEvent(
symbol=item["symbol"],
ts_ms=normalize_to_ms(item["timestamp"]),
time_base=item["time_base"],
source=item["source"],
)
for item in raw_events
]
buckets = align_by_time_window(events, window_ms=1000)
for bucket, items in buckets.items():
print(f"bucket={bucket}")
for item in items:
print(
f" {item.symbol}, ts_ms={item.ts_ms}, "
f"time_base={item.time_base.value}, source={item.source}"
)
这段代码只解决一个基础问题:把不同来源的时间戳变成可比较、可解释、可分组的数据结构。
它没有解决完整生产系统里的所有问题,比如:
- 交易日历维护;
- 数据缺失补齐;
- 延迟监控;
- 多线程拉取;
- WebSocket 断线重连;
- 数据库落库;
- 异常报警。
但这正是第一版代码应该做的事:先把最容易错的时间戳处理清楚,再谈更复杂的系统设计。
九、接真实行情 API 时,先看文档里的这 6 个位置
不管你接哪个行情 API,第一次看文档时,不建议从头读到尾。更有效的方式是先找六个信息:
| 信息 | 要确认什么 |
|---|---|
| 鉴权 | Key 放在哪里,是 Header、Query 还是环境变量 |
| symbol 格式 | 代码是否带市场后缀,多个市场是否有统一规则 |
| 端点参数 | 请求 ticker、kline、trades 时参数是否不同 |
| 返回字段 | timestamp、价格、成交量字段在哪里 |
| 错误分支 | 空数据、无权限、symbol 错误时怎么返回 |
| 最小示例 | 有没有能跑通的最小代码,而不是完整项目 |
如果文档里找不到这些信息,就算接口能返回一次数据,后面也很容易在工程化时返工。
TickDB 可以作为一个候选行情 API,用来按这六项检查接入路径:看官方文档和示例,确认鉴权、symbol、端点、字段、错误分支,再用自己的 symbol 跑一次最小脚本。
这里不展开具体端点和字段,实际接入时以官方文档和你自己的测试结果为准。
十、结论:先把 timestamp 变成“带语义的数据”
处理多源行情数据时,不要把 timestamp 当成一个普通整数。
它至少包含三层信息:
- 单位:秒还是毫秒;
- 语义:事件时间、服务端时间、快照时间还是 K 线结束时间;
- 用途:按物理时间窗口对齐,还是按交易日对齐。
如果这三层没分清,后面的排序、分桶、聚合、展示都会有隐患。
我的建议是:第一版不要急着写复杂系统。先做三件事:
- 把所有 timestamp 统一成毫秒;
- 给每条记录保留
time_base; - 根据业务目标选择窗口对齐或交易日对齐。
然后再选一个稳定的数据入口,把 symbol、字段、timestamp、空数据和错误分支都纳入检查。TickDB 的意义就在这里:它把多市场行情接入这件事变得更适合工程化,而不是让你在多个零散接口之间反复写胶水代码。
本文代码为教学示例,不构成投资建议,也不代表任何具体数据源的接口承诺。实际接入时,请以对应 API 的官方文档、字段说明和你的测试结果为准。
通过 TickDB API 获取实时行情数据
一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。
免费领取 API Key查看 API 文档