综合

Python 处理多源行情数据源时间戳:秒/毫秒归一化、语义标注和窗口对齐

作者: TickDB Research · 发布: 2026/6/27 · 阅读: 7

标签: R28-02, 掘金A013

摘要

接多个行情数据源时,时间戳最容易出问题:有的返回 10 位秒级时间戳,有的返回 13 位毫秒级时间戳;有的代表数据产生时间,有的代表服务端返回时间;有的适合按物理时间窗口对齐,有的应该按交易日对齐。本文不讨论交易策略,只讨论一个工程问题:如何在 Python 里把多源行情时间戳先归一化、再标注语义,最后按业务目标做窗口对齐。同时,我会用 TickDB 作为统一行情数据入口的例子,说明为什么多市场数据接入不该从一堆零散接口开始。


做行情类应用时,很多问题一开始看起来像“数据不准”,最后排查下来,其实是时间戳没处理好。

比如你从多个数据源拿到几条行情记录:

{"symbol": "AAPL.US", "timestamp": 1718800000}
{"symbol": "600519.SH", "timestamp": 1718800000000}
{"symbol": "BTCUSDT", "timestamp": 1718800000500}

如果你直接把它们丢进一个列表里排序,结果大概率是错的。

原因很简单:第一条可能是秒级,后两条可能是毫秒级。数字看起来都叫 timestamp,但量级完全不同。

更麻烦的是,即使你把秒级都乘以 1000,问题也没有完全解决。因为时间戳还可能有不同语义:

  • 有的是数据产生时间;
  • 有的是服务端生成响应的时间;
  • 有的是某个聚合快照的时间;
  • 有的是 K 线周期结束时间。

所以处理多源行情时间戳,不能只做一行转换:

timestamp = timestamp * 1000

更稳的做法是分三步:

  1. 统一精度;
  2. 标注时间语义;
  3. 根据业务目标选择对齐方式。

一、先判断:你的 timestamp 到底是秒还是毫秒

最常见的问题,是 10 位和 13 位时间戳混在一起。

一个简单判断:

位数常见单位示例
10 位1718800000
13 位毫秒1718800000000

工程里可以先写一个保守的归一化函数,把时间戳统一成毫秒。

def normalize_to_ms(ts: int) -> int:
    """
    将秒级或毫秒级 timestamp 统一为毫秒。
    这里只处理常见 10 位 / 13 位情况。
    生产环境中建议结合接口文档显式配置,不要只靠位数猜。
    """
    if not isinstance(ts, int):
        raise TypeError("timestamp must be int")

    if isinstance(ts, bool):
        raise TypeError("timestamp must not be bool")

    digits = len(str(abs(ts)))

    if digits == 10:
        return ts * 1000

    if digits == 13:
        return ts

    raise ValueError(f"unsupported timestamp length: {digits}")

测试一下:

print(normalize_to_ms(1718800000))
print(normalize_to_ms(1718800000000))

输出:

1718800000000
1718800000000

到这一步,数字已经能比较大小了。

但这还不够。


二、不要把时间语义丢掉

很多系统的问题不是时间戳不能排序,而是排序以后你忘了它代表什么。

同样叫 timestamp,它可能代表:

类型含义是否能直接和别的时间比较
event_time行情事件发生时间可以,但要确认来源一致
server_time服务端生成响应时间只能说明响应生成时刻
snapshot_time快照聚合时间适合快照类数据
bar_close_timeK 线周期结束时间适合 K 线对齐

所以建议不要只存一个裸数字,而是用结构体把单位和语义都保存下来。

from dataclasses import dataclass
from enum import Enum


class TimeBase(Enum):
    EVENT_TIME = "event_time"
    SERVER_TIME = "server_time"
    SNAPSHOT_TIME = "snapshot_time"
    BAR_CLOSE_TIME = "bar_close_time"


@dataclass
class MarketEvent:
    symbol: str
    ts_ms: int
    time_base: TimeBase
    source: str

构造几条测试数据:

events = [
    MarketEvent(
        symbol="AAPL.US",
        ts_ms=normalize_to_ms(1718800000),
        time_base=TimeBase.SNAPSHOT_TIME,
        source="source_a",
    ),
    MarketEvent(
        symbol="600519.SH",
        ts_ms=normalize_to_ms(1718800000000),
        time_base=TimeBase.SNAPSHOT_TIME,
        source="source_b",
    ),
    MarketEvent(
        symbol="BTCUSDT",
        ts_ms=normalize_to_ms(1718800000500),
        time_base=TimeBase.EVENT_TIME,
        source="source_c",
    ),
]

这样做的好处是:后面你看到一个时间戳,不会只知道它是多少,还知道它来自哪里、代表什么。


三、按窗口对齐:适合快照、监控和近实时展示

如果你的目标是做行情面板、告警、近实时监控,很多时候不是要求毫秒级完全相等,而是希望把一小段时间窗口内的数据归到同一组。

比如 1000 毫秒内的记录,视为同一个窗口。

from collections import defaultdict


def align_by_time_window(events: list[MarketEvent], window_ms: int = 1000):
    """
    将事件按固定时间窗口分组。
    注意:这只是工程分桶,不代表不同来源的数据语义完全一致。
    """
    buckets = defaultdict(list)

    for event in events:
        bucket = event.ts_ms // window_ms * window_ms
        buckets[bucket].append(event)

    return dict(buckets)

运行:

buckets = align_by_time_window(events, window_ms=1000)

for bucket, items in buckets.items():
    print("bucket:", bucket)
    for item in items:
        print(" ", item.symbol, item.ts_ms, item.time_base.value, item.source)

可能输出:

bucket: 1718800000000
  AAPL.US 1718800000000 snapshot_time source_a
  600519.SH 1718800000000 snapshot_time source_b
  BTCUSDT 1718800000500 event_time source_c

这样你就能看到:这三条数据被分到了同一个时间窗口,但它们的 time_base 并不完全相同。

这点很重要。

分到同一个窗口,不等于它们代表完全相同的业务时间。

所以窗口对齐适合展示、监控、粗粒度分析;如果你要做更严格的数据计算,还需要进一步确认每个字段的语义。


四、按交易日对齐:不要只用自然日期

另一类常见需求,是按“交易日”对齐。

这时就不能简单写:

date = datetime.fromtimestamp(ts_ms / 1000).date()

因为自然日期不等于交易日。

不同市场可能有不同开盘时间、收盘时间、节假日、半日交易日。工程上至少要有一张交易日历。

下面是一个简化示例:

def assign_trading_day(ts_ms: int, market: str, calendar: dict) -> str | None:
    """
    根据交易日历判断 timestamp 属于哪个交易日。
    calendar 示例:
    {
        "US": {
            "2024-06-19": (open_ts_ms, close_ts_ms)
        }
    }
    """
    market_calendar = calendar.get(market, {})

    for trading_day, (open_ts, close_ts) in market_calendar.items():
        if open_ts <= ts_ms <= close_ts:
            return trading_day

    return None

示例交易日历:

calendar = {
    "US": {
        "2024-06-19": (1718790000000, 1718810000000),
    },
    "CN": {
        "2024-06-19": (1718760000000, 1718780000000),
    },
}

调用:

day = assign_trading_day(
    ts_ms=1718800000000,
    market="US",
    calendar=calendar,
)

print(day)

这段代码不是完整交易日历系统,只是说明一个关键点:

按交易日对齐,必须依赖交易日历。

如果你的代码只按自然日期切分,多市场数据很容易在节假日、跨时区、开收盘边界附近错位。


五、为什么统一行情入口很重要

前面讲的是通用方法:多源行情数据进入系统前,先把 timestamp 单位、时间语义、symbol 格式和错误分支整理清楚。

问题是,很多开发者真正卡住的地方,不是不会写这些函数,而是数据源本身太分散:

  • A 股一个接口;
  • 美股一个接口;
  • 港股一个接口;
  • 加密货币又是另一套格式;
  • 有的返回秒级时间戳,有的返回毫秒级;
  • 有的字段叫 last,有的字段叫 last_price
  • 有的 symbol 要写后缀,有的不用;
  • 错误结构也各不相同。

这会导致你在业务代码之外,先写一大堆胶水代码。真正的行情逻辑还没开始,适配层已经变成一个小项目。

这也是为什么我更建议把行情数据入口先收敛起来。


六、用 TickDB 做多市场行情入口

TickDB 的价值,正是在这一步把入口收敛起来。

它不是替你写完整的交易系统,也不是替你决定怎么做时间序列对齐,而是提供一个更适合工程接入的多市场行情数据入口。

你可以把 TickDB 理解成三层能力:

层级TickDB 解决什么你仍然要自己做什么
数据入口层用统一入口访问多市场行情数据判断自己的业务需要哪些市场和品种
工程接入层通过 REST、WebSocket、MCP 等方式接入不同使用场景按场景选择快照、持续推送或 AI 工具调用
数据校验层围绕 symbol、字段、timestamp、空数据和错误分支做结构化检查根据业务目标做对齐、缓存、落库、告警和风控

换句话说,TickDB 适合解决的是“数据怎么稳定进入系统”的问题。

如果你只是偶尔查一次价格,打开网页看一眼就够了。但如果你在做下面这些事情,统一数据入口的价值就会明显很多:

  • 用 Python 写行情面板;
  • 给 AI Agent 接入真实行情数据;
  • 做多市场行情监控;
  • 把 ticker 或 kline 定时落库;
  • 在一个项目里同时处理 A 股、美股、港股、外汇、加密等多类资产;
  • 希望后续从 REST 快照扩展到 WebSocket 推送;
  • 想让 Cursor、Claude Code 这类 AI 编程工具直接调用行情工具。

这类场景里,真正贵的不是第一次请求成功,而是后面持续维护:

  • symbol 规则变了,脚本要不要改;
  • 字段路径变了,校验逻辑能不能及时发现;
  • 空数据返回时,是跳过、报警还是写失败日志;
  • timestamp 单位和语义有没有被记录;
  • 未来从单市场扩到多市场,是否要重写一套适配层。

TickDB 的产品定位更接近“行情数据基础设施入口”,而不是一个单点查询工具。它适合放在系统底层,让上层应用围绕它做展示、研究、告警、落库或 AI 工具调用。

但边界也要说清楚。

TickDB 能降低多市场行情接入和字段适配的成本,不代表你的系统可以不做校验;TickDB 提供统一数据入口,也不代表你的业务可以忽略交易时段、交易日历、数据缓存、异常恢复和权限管理。

尤其是时间戳对齐这种问题,数据入口只能帮你把原始数据拿得更规整,最终怎么按窗口、交易日或业务事件对齐,仍然是应用层逻辑。

所以一个更合理的使用方式是:

  1. 先用 TickDB 跑通一个最小 symbol 查询;
  2. 核对返回结构里的 symbol、价格字段、timestamp 和 data 是否为空;
  3. 把字段校验和失败分支写进脚本;
  4. 再根据你的业务场景选择 REST、WebSocket 或 MCP;
  5. 最后才考虑落库、监控、可视化和多市场扩展。

这样做的好处是,你不是在“相信一个数据源”,而是在建立一条可复查的数据进入路径。


七、四类常见错位 bug

实际排查时,可以从下面四类问题开始。

问题典型表现排查方式
秒/毫秒混用排序后某些数据跑到很前或很后打印 timestamp 位数
时间语义混用看起来时间接近,但业务含义不同给每条记录标注 time_base
自然日期当交易日日线或统计结果错一天引入交易日历
窗口过大或过小数据被错误归组或无法归组调整 window_ms 并观察结果

尤其是第一个问题,很容易被忽略。

很多人看到字段名叫 timestamp,就默认它可以直接比较。其实字段名相同,不代表单位相同,也不代表语义相同。


八、一个最小可运行示例

把上面的内容合在一起,可以得到一个最小版本。

from dataclasses import dataclass
from enum import Enum
from collections import defaultdict


class TimeBase(Enum):
    EVENT_TIME = "event_time"
    SERVER_TIME = "server_time"
    SNAPSHOT_TIME = "snapshot_time"
    BAR_CLOSE_TIME = "bar_close_time"


@dataclass
class MarketEvent:
    symbol: str
    ts_ms: int
    time_base: TimeBase
    source: str


def normalize_to_ms(ts: int) -> int:
    if not isinstance(ts, int):
        raise TypeError("timestamp must be int")

    if isinstance(ts, bool):
        raise TypeError("timestamp must not be bool")

    digits = len(str(abs(ts)))

    if digits == 10:
        return ts * 1000

    if digits == 13:
        return ts

    raise ValueError(f"unsupported timestamp length: {digits}")


def align_by_time_window(events: list[MarketEvent], window_ms: int = 1000):
    buckets = defaultdict(list)

    for event in events:
        bucket = event.ts_ms // window_ms * window_ms
        buckets[bucket].append(event)

    return dict(buckets)


if __name__ == "__main__":
    raw_events = [
        {
            "symbol": "AAPL.US",
            "timestamp": 1718800000,
            "time_base": TimeBase.SNAPSHOT_TIME,
            "source": "source_a",
        },
        {
            "symbol": "600519.SH",
            "timestamp": 1718800000000,
            "time_base": TimeBase.SNAPSHOT_TIME,
            "source": "source_b",
        },
        {
            "symbol": "BTCUSDT",
            "timestamp": 1718800000500,
            "time_base": TimeBase.EVENT_TIME,
            "source": "source_c",
        },
    ]

    events = [
        MarketEvent(
            symbol=item["symbol"],
            ts_ms=normalize_to_ms(item["timestamp"]),
            time_base=item["time_base"],
            source=item["source"],
        )
        for item in raw_events
    ]

    buckets = align_by_time_window(events, window_ms=1000)

    for bucket, items in buckets.items():
        print(f"bucket={bucket}")
        for item in items:
            print(
                f"  {item.symbol}, ts_ms={item.ts_ms}, "
                f"time_base={item.time_base.value}, source={item.source}"
            )

这段代码只解决一个基础问题:把不同来源的时间戳变成可比较、可解释、可分组的数据结构。

它没有解决完整生产系统里的所有问题,比如:

  • 交易日历维护;
  • 数据缺失补齐;
  • 延迟监控;
  • 多线程拉取;
  • WebSocket 断线重连;
  • 数据库落库;
  • 异常报警。

但这正是第一版代码应该做的事:先把最容易错的时间戳处理清楚,再谈更复杂的系统设计。


九、接真实行情 API 时,先看文档里的这 6 个位置

不管你接哪个行情 API,第一次看文档时,不建议从头读到尾。更有效的方式是先找六个信息:

信息要确认什么
鉴权Key 放在哪里,是 Header、Query 还是环境变量
symbol 格式代码是否带市场后缀,多个市场是否有统一规则
端点参数请求 ticker、kline、trades 时参数是否不同
返回字段timestamp、价格、成交量字段在哪里
错误分支空数据、无权限、symbol 错误时怎么返回
最小示例有没有能跑通的最小代码,而不是完整项目

如果文档里找不到这些信息,就算接口能返回一次数据,后面也很容易在工程化时返工。

TickDB 可以作为一个候选行情 API,用来按这六项检查接入路径:看官方文档和示例,确认鉴权、symbol、端点、字段、错误分支,再用自己的 symbol 跑一次最小脚本。

这里不展开具体端点和字段,实际接入时以官方文档和你自己的测试结果为准。


十、结论:先把 timestamp 变成“带语义的数据”

处理多源行情数据时,不要把 timestamp 当成一个普通整数。

它至少包含三层信息:

  1. 单位:秒还是毫秒;
  2. 语义:事件时间、服务端时间、快照时间还是 K 线结束时间;
  3. 用途:按物理时间窗口对齐,还是按交易日对齐。

如果这三层没分清,后面的排序、分桶、聚合、展示都会有隐患。

我的建议是:第一版不要急着写复杂系统。先做三件事:

  • 把所有 timestamp 统一成毫秒;
  • 给每条记录保留 time_base
  • 根据业务目标选择窗口对齐或交易日对齐。

然后再选一个稳定的数据入口,把 symbol、字段、timestamp、空数据和错误分支都纳入检查。TickDB 的意义就在这里:它把多市场行情接入这件事变得更适合工程化,而不是让你在多个零散接口之间反复写胶水代码。

本文代码为教学示例,不构成投资建议,也不代表任何具体数据源的接口承诺。实际接入时,请以对应 API 的官方文档、字段说明和你的测试结果为准。

通过 TickDB API 获取实时行情数据

一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。

免费领取 API Key查看 API 文档

相关文章