综合

GitHub Actions 定时拉取 K 线:任务跑通了,一年后数据为什么仍会缺?

作者: TickDB Research · 发布: 2026/6/9 · 阅读: 7

标签: W23-T01, 知乎 A002

以下是假设场景。

你写了一个 GitHub Actions 定时任务,每天拉取美股日线 K 线写入 PostgreSQL。push 上去,第一天跑通,数据库里有数据。

三天后你发现同一个交易日出现了重复行。再过一周,有一天数据直接缺了——任务没有失败,只是 schedule 延迟了。你想查上次跑到哪了,才意识到 runner 销毁后本地文件一起消失了。脚本第一天就能跑通。让它连续运行一年不出岔子,需要的是一套调度器本身不提供的保障。

GitHub Actions 是闹钟,不是流水线。它能叫醒你的脚本,但不会替你保存状态、检查数据完整性、保证准点或实现 exactly-once。GitHub schedule 是 best-effort 定时触发,可能延迟或丢弃,不提供准时、必达或 exactly-once 保证。幂等写入之所以必要,是因为手动重跑、失败重跑和恢复重放都可能导致同一段数据被多次拉取。恢复逻辑之所以靠有界回补,是因为写入可能因调度跳过而漏数据。

把定时器跑成可靠数据管道,需要设计三个核心机制:幂等写入,让重复执行不产生重复数据;合同校验,让坏数据在入库前被拒绝;有界重放,让最近窗口内的数据可以被重复拉取并覆盖修正。窗口外缺口需要独立监控与 backfill 流程处理。


调度的真相:best-effort,不是 exactly-once

GitHub Actions 的 schedule 事件在指定时间触发工作流。官方文档写明:高负载时可能延迟,负载足够高时部分排队任务可能被丢弃。定时工作流只在默认分支运行;公开仓库 60 天无活动会自动停用。

GitHub-hosted runner 为每个 job 创建全新虚拟机,job 结束后销毁。本地磁盘只能在同一 job 的步骤间共享,不能作为跨运行的持久化存储。

这意味着三件事同时成立:触发可能延迟或跳过,执行环境每次都是全新的,同一个 job 可能被重复触发。把“每天跑一次”理解成 exactly-once 交付保证,是从一开始就弄错了调度器的定位。

!image.png

图 1:GitHub Actions 官方文档中的 schedule、GitHub-hosted runner 与 concurrency 边界。本图说明定时触发可能延迟、运行环境不适合作为跨运行持久化存储,并且 concurrency 不提供完整排队或 exactly-once 保证。官方文档要点图,不是工作流运行截图。


五层可靠性模型

每一层只解决一类问题。上一层通过,不代表下一层自动成立。

层级职责常见误区
调度层按计划唤醒脚本把 schedule 当成精确时钟或交付保证
调用层受限重试与错误分类所有错误一视同仁地重试,不区分限流和鉴权
校验层合同校验,拒绝坏数据只检查 HTTP 200 和 code=0,不验证字段类型和基本关系
写入层幂等 upsert,重复执行不重复写用 INSERT 遇到重复行直接报错
恢复层有界重放与覆盖修正只拉最近一天,缺了数据不自知

!image.png

图 2:K 线定时入库的五层可靠性模型。原理示意图,不是运行结果;每一层只处理特定失败模式,不能向下传递可靠性保证。


失败分类

不是所有红字都是同一种问题。错误的响应策略比没有策略更危险。

失败类型典型信号正确响应
限流HTTP 429 或业务码 3001,带 Retry-After 头按指示等待后重试,使用有上限的退避
鉴权/权限1001、1002 或 1004阻断重试,非零退出
网络/服务端异常timeout、HTTP 5xx 或无效 JSON有限重试,超过上限后非零退出
数据不完整code=0 但 klines 为空数组或字段缺失非零退出,不写入
字段类型错误price 不是字符串或 time 不是正整数毫秒非零退出

当前代码只执行最近 30 根 K 线的有界重放与覆盖修正,不负责判断窗口外缺口;窗口外恢复需要独立 backfill 流程。


表结构

CREATE TABLE IF NOT EXISTS kline_1d (
    symbol      TEXT NOT NULL,
    interval    TEXT NOT NULL,
    time_ms     BIGINT NOT NULL,
    open        NUMERIC NOT NULL,
    high        NUMERIC NOT NULL,
    low         NUMERIC NOT NULL,
    close       NUMERIC NOT NULL,
    volume      NUMERIC NOT NULL,
    quote_volume NUMERIC NOT NULL,
    created_at  TIMESTAMPTZ DEFAULT NOW(),
    updated_at  TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (symbol, interval, time_ms)
);

复合主键 (symbol, interval, time_ms) 是幂等写入的基础。它把同一根 K 线的定义固定在表结构上,让 upsert 能识别重复行。没有这个约束,重复运行就是重复写入。


安装命令

pip install requests==2.31.0 psycopg==3.1.19

同步脚本

import os
import sys
import time
import logging
from decimal import Decimal, InvalidOperation

import requests
import psycopg
from psycopg import sql

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

API_KEY = os.environ["TICKDB_API_KEY"]
DATABASE_URL = os.environ["DATABASE_URL"]
BASE_URL = "https://api.tickdb.ai"
SYMBOL = "AAPL.US"
INTERVAL = "1d"
LOOKBACK = 30
MAX_RETRIES = 3

def retry_delay(resp, attempt, cap=60):
    raw = resp.headers.get("Retry-After") if resp is not None else None
    if raw is not None:
        try:
            return min(max(int(raw), 0), cap)
        except ValueError:
            pass
    return min(2 ** (attempt - 1), cap)

def fetch_kline(symbol, interval, limit):
    url = f"{BASE_URL}/v1/market/kline"
    headers = {"X-API-Key": API_KEY}
    params = {"symbol": symbol, "interval": interval, "limit": limit}
    last_exception = None
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            resp = requests.get(url, headers=headers, params=params, timeout=30)
            if resp.status_code == 429:
                delay = retry_delay(resp, attempt)
                logging.warning("HTTP 429, retry after %ds", delay)
                time.sleep(delay)
                continue
            if 400 <= resp.status_code < 500 and resp.status_code != 429:
                logging.error("HTTP %d, not retrying", resp.status_code)
                sys.exit(1)
            if resp.status_code >= 500:
                delay = retry_delay(resp, attempt)
                logging.warning("HTTP %d, attempt %d, retry after %ds", resp.status_code, attempt, delay)
                time.sleep(delay)
                continue
            data = resp.json()
            code = data.get("code")
            if code == 3001:
                delay = retry_delay(resp, attempt)
                logging.warning("code=3001, retry after %ds", delay)
                time.sleep(delay)
                continue
            if code in (1001, 1002, 1004):
                logging.error("Fatal: code=%d, stopping", code)
                sys.exit(1)
            if code != 0:
                logging.error("Unexpected code=%d", code)
                sys.exit(1)
            return data
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            delay = retry_delay(None, attempt)
            logging.warning("Network error attempt %d: %s, retry after %ds", attempt, e, delay)
            last_exception = e
            time.sleep(delay)
        except ValueError as e:
            logging.warning("Invalid JSON attempt %d: %s", attempt, e)
            last_exception = e
            if attempt == MAX_RETRIES:
                logging.error("Max retries exceeded after invalid JSON")
                sys.exit(1)
            delay = retry_delay(None, attempt)
            time.sleep(delay)
    logging.error("Max retries exceeded")
    raise last_exception or RuntimeError("fetch failed")

def validate_kline_response(data, requested_symbol, requested_interval):
    if not isinstance(data, dict):
        logging.error("Response is not a dict")
        sys.exit(1)
    resp_data = data.get("data")
    if not isinstance(resp_data, dict):
        logging.error("data is not a dict")
        sys.exit(1)
    resp_symbol = resp_data.get("symbol")
    if resp_symbol != requested_symbol:
        logging.error("Symbol mismatch in response: %s", resp_symbol)
        sys.exit(1)
    resp_interval = resp_data.get("interval")
    if resp_interval != requested_interval:
        logging.error("Interval mismatch in response: %s", resp_interval)
        sys.exit(1)
    klines = resp_data.get("klines")
    if not isinstance(klines, list) or len(klines) == 0:
        logging.error("klines empty or missing")
        sys.exit(1)
    seen_times = set()
    for i, bar in enumerate(klines):
        if not isinstance(bar, dict):
            logging.error("Bar %d is not a dict", i)
            sys.exit(1)
        time_ms = bar.get("time")
        if not isinstance(time_ms, int) or time_ms <= 0:
            logging.error("Invalid time at index %d: %s", i, time_ms)
            sys.exit(1)
        if time_ms in seen_times:
            logging.error("Duplicate time at index %d: %d", i, time_ms)
            sys.exit(1)
        seen_times.add(time_ms)
        for price_field in ("open", "high", "low", "close"):
            val = bar.get(price_field)
            if not isinstance(val, str):
                logging.error("Field %s not string at index %d", price_field, i)
                sys.exit(1)
            try:
                d = Decimal(val)
            except InvalidOperation:
                logging.error("Invalid Decimal for %s at index %d: %s", price_field, i, val)
                sys.exit(1)
            if not d.is_finite():
                logging.error("Non-finite %s at index %d", price_field, i)
                sys.exit(1)
        open_val = Decimal(bar["open"])
        close_val = Decimal(bar["close"])
        high_val = Decimal(bar["high"])
        low_val = Decimal(bar["low"])
        if high_val < max(open_val, close_val, low_val):
            logging.error("high < max(open, close, low) at index %d", i)
            sys.exit(1)
        if low_val > min(open_val, close_val, high_val):
            logging.error("low > min(open, close, high) at index %d", i)
            sys.exit(1)
        for vol_field in ("volume", "quote_volume"):
            val = bar.get(vol_field)
            if not isinstance(val, str):
                logging.error("Field %s not string at index %d", vol_field, i)
                sys.exit(1)
            try:
                d = Decimal(val)
            except InvalidOperation:
                logging.error("Invalid Decimal for %s at index %d: %s", vol_field, i, val)
                sys.exit(1)
            if not d.is_finite():
                logging.error("Non-finite %s at index %d", vol_field, i)
                sys.exit(1)
    return klines, resp_symbol, resp_interval

def upsert_klines(conn, klines, symbol, interval):
    query = sql.SQL("""
        INSERT INTO kline_1d (symbol, interval, time_ms, open, high, low, close, volume, quote_volume, updated_at)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
        ON CONFLICT (symbol, interval, time_ms)
        DO UPDATE SET
            open = EXCLUDED.open,
            high = EXCLUDED.high,
            low = EXCLUDED.low,
            close = EXCLUDED.close,
            volume = EXCLUDED.volume,
            quote_volume = EXCLUDED.quote_volume,
            updated_at = NOW()
    """)
    affected = 0
    with conn.cursor() as cur:
        for bar in klines:
            cur.execute(query, (
                symbol,
                interval,
                bar["time"],
                bar["open"],
                bar["high"],
                bar["low"],
                bar["close"],
                bar["volume"],
                bar["quote_volume"],
            ))
            affected += cur.rowcount
    return affected

def main():
    raw = fetch_kline(SYMBOL, INTERVAL, LOOKBACK)
    klines, resp_symbol, resp_interval = validate_kline_response(raw, SYMBOL, INTERVAL)

    with psycopg.connect(DATABASE_URL) as conn:
        with conn.transaction():
            affected = upsert_klines(conn, klines, SYMBOL, INTERVAL)

        logging.info(
            "symbol=%s interval=%s count=%d affected=%d earliest=%d latest=%d",
            SYMBOL, INTERVAL, len(klines), affected,
            min(bar["time"] for bar in klines),
            max(bar["time"] for bar in klines),
        )

if __name__ == "__main__":
    main()

核心校验逻辑在 validate_kline_response 里。symbolintervalresponse["data"] 中读取并校验,每根 bar 不再单独携带这两个字段,写入时由 upsert_klines 函数使用请求参数构造规范化记录。价格字段必须是字符串且能转成有限 Decimal,high 不低于 max(open, close, low)low 不高于 min(open, close, high),时间戳是正整数毫秒且不重复。任何一项失败,脚本非零退出,不执行写入。

fetch_kline 对不同类型的错误采取不同策略。429 和业务码 3001 按 Retry-After 指示等待后重试,退避有 60 秒上限。timeout、连接错误、5xx 和无效 JSON 有限重试,超过最大次数后非零退出。1001、1002、1004 阻断重试立即退出。其他 4xx 不重试直接退出。

为核对字段层级,本文于 2026 年 6 月 8 日通过 TickDB MCP 对 AAPL.US1dlimit=3 做了最小调用验证。该验证只用于确认本次 MCP 返回结构,不代表 REST 接口及全部品种已经实测。

!image.png

图 3:TickDB MCP 实际调用结果,2026-06-08;由 Codex 执行并核对,敏感信息已脱敏。本图仅证明该次调用的 K 线返回层级和字段类型,不代表 REST 行为、全品种一致性、排序、分页、延迟或 SLA。


工作流文件

name: kline-sync
on:
  schedule:
    - cron: "17 10 * * *"
  workflow_dispatch:
permissions:
  contents: read
concurrency:
  group: kline-sync-main
  cancel-in-progress: false
jobs:
  sync:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v6
      - uses: actions/setup-python@v6
        with:
          python-version: "3.12"
      - run: pip install requests==2.31.0 psycopg==3.1.19
      - name: run sync
        env:
          TICKDB_API_KEY: ${{ secrets.TICKDB_API_KEY }}
          DATABASE_URL: ${{ secrets.DATABASE_URL }}
        run: python sync_kline.py

cron 使用 UTC 时间,可能延迟。schedule 只在默认分支运行,公开仓库 60 天无活动自动停用。同一 concurrency group 最多保留一个运行中任务和一个 pending 任务;新的任务进入队列时,已有 pending 任务可能被取消并由新任务替代。该机制既不保证完整排队,也不提供 exactly-once。

API Key 和数据库连接字符串通过 GitHub Secrets 注入,不出现在代码、日志或 YAML 的明文里。


边界与限制

当前实现为每次运行时拉取最近 30 根已结束 K 线并执行覆盖修正,属于有界重放策略。本示例不证明 start_time 参数的包含性行为、返回数据的排序与分页边界、交易日的连续性,也不提供任意断点续传能力。

本示例只覆盖 REST K 线端点。其他端点、其他品种、更长时间跨度需要独立的验证和实现。

如果你只是个人实验,可以简化。但如果这份数据会被下游系统消费——告警、回测、Agent 决策——就需要合同校验和明确的失败信号。数据一旦离开你的脚本,别人会默认它是完整的。这个默认是你的责任。

你现在保存的是数据,还是只保存了一条“这次跑成功了”的日志?


参考来源

通过 TickDB API 获取实时行情数据

一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。

免费领取 API Key查看 API 文档

相关文章