GitHub Actions 定时拉取 K 线:任务跑通了,一年后数据为什么仍会缺?
作者: TickDB Research · 发布: 2026/6/9 · 阅读: 7
标签: W23-T01, 知乎 A002
以下是假设场景。
你写了一个 GitHub Actions 定时任务,每天拉取美股日线 K 线写入 PostgreSQL。push 上去,第一天跑通,数据库里有数据。
三天后你发现同一个交易日出现了重复行。再过一周,有一天数据直接缺了——任务没有失败,只是 schedule 延迟了。你想查上次跑到哪了,才意识到 runner 销毁后本地文件一起消失了。脚本第一天就能跑通。让它连续运行一年不出岔子,需要的是一套调度器本身不提供的保障。
GitHub Actions 是闹钟,不是流水线。它能叫醒你的脚本,但不会替你保存状态、检查数据完整性、保证准点或实现 exactly-once。GitHub schedule 是 best-effort 定时触发,可能延迟或丢弃,不提供准时、必达或 exactly-once 保证。幂等写入之所以必要,是因为手动重跑、失败重跑和恢复重放都可能导致同一段数据被多次拉取。恢复逻辑之所以靠有界回补,是因为写入可能因调度跳过而漏数据。
把定时器跑成可靠数据管道,需要设计三个核心机制:幂等写入,让重复执行不产生重复数据;合同校验,让坏数据在入库前被拒绝;有界重放,让最近窗口内的数据可以被重复拉取并覆盖修正。窗口外缺口需要独立监控与 backfill 流程处理。
调度的真相:best-effort,不是 exactly-once
GitHub Actions 的 schedule 事件在指定时间触发工作流。官方文档写明:高负载时可能延迟,负载足够高时部分排队任务可能被丢弃。定时工作流只在默认分支运行;公开仓库 60 天无活动会自动停用。
GitHub-hosted runner 为每个 job 创建全新虚拟机,job 结束后销毁。本地磁盘只能在同一 job 的步骤间共享,不能作为跨运行的持久化存储。
这意味着三件事同时成立:触发可能延迟或跳过,执行环境每次都是全新的,同一个 job 可能被重复触发。把“每天跑一次”理解成 exactly-once 交付保证,是从一开始就弄错了调度器的定位。
图 1:GitHub Actions 官方文档中的 schedule、GitHub-hosted runner 与 concurrency 边界。本图说明定时触发可能延迟、运行环境不适合作为跨运行持久化存储,并且 concurrency 不提供完整排队或 exactly-once 保证。官方文档要点图,不是工作流运行截图。
五层可靠性模型
每一层只解决一类问题。上一层通过,不代表下一层自动成立。
| 层级 | 职责 | 常见误区 |
|---|---|---|
| 调度层 | 按计划唤醒脚本 | 把 schedule 当成精确时钟或交付保证 |
| 调用层 | 受限重试与错误分类 | 所有错误一视同仁地重试,不区分限流和鉴权 |
| 校验层 | 合同校验,拒绝坏数据 | 只检查 HTTP 200 和 code=0,不验证字段类型和基本关系 |
| 写入层 | 幂等 upsert,重复执行不重复写 | 用 INSERT 遇到重复行直接报错 |
| 恢复层 | 有界重放与覆盖修正 | 只拉最近一天,缺了数据不自知 |
图 2:K 线定时入库的五层可靠性模型。原理示意图,不是运行结果;每一层只处理特定失败模式,不能向下传递可靠性保证。
失败分类
不是所有红字都是同一种问题。错误的响应策略比没有策略更危险。
| 失败类型 | 典型信号 | 正确响应 |
|---|---|---|
| 限流 | HTTP 429 或业务码 3001,带 Retry-After 头 | 按指示等待后重试,使用有上限的退避 |
| 鉴权/权限 | 1001、1002 或 1004 | 阻断重试,非零退出 |
| 网络/服务端异常 | timeout、HTTP 5xx 或无效 JSON | 有限重试,超过上限后非零退出 |
| 数据不完整 | code=0 但 klines 为空数组或字段缺失 | 非零退出,不写入 |
| 字段类型错误 | price 不是字符串或 time 不是正整数毫秒 | 非零退出 |
当前代码只执行最近 30 根 K 线的有界重放与覆盖修正,不负责判断窗口外缺口;窗口外恢复需要独立 backfill 流程。
表结构
CREATE TABLE IF NOT EXISTS kline_1d (
symbol TEXT NOT NULL,
interval TEXT NOT NULL,
time_ms BIGINT NOT NULL,
open NUMERIC NOT NULL,
high NUMERIC NOT NULL,
low NUMERIC NOT NULL,
close NUMERIC NOT NULL,
volume NUMERIC NOT NULL,
quote_volume NUMERIC NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (symbol, interval, time_ms)
);
复合主键 (symbol, interval, time_ms) 是幂等写入的基础。它把同一根 K 线的定义固定在表结构上,让 upsert 能识别重复行。没有这个约束,重复运行就是重复写入。
安装命令
pip install requests==2.31.0 psycopg==3.1.19
同步脚本
import os
import sys
import time
import logging
from decimal import Decimal, InvalidOperation
import requests
import psycopg
from psycopg import sql
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
API_KEY = os.environ["TICKDB_API_KEY"]
DATABASE_URL = os.environ["DATABASE_URL"]
BASE_URL = "https://api.tickdb.ai"
SYMBOL = "AAPL.US"
INTERVAL = "1d"
LOOKBACK = 30
MAX_RETRIES = 3
def retry_delay(resp, attempt, cap=60):
raw = resp.headers.get("Retry-After") if resp is not None else None
if raw is not None:
try:
return min(max(int(raw), 0), cap)
except ValueError:
pass
return min(2 ** (attempt - 1), cap)
def fetch_kline(symbol, interval, limit):
url = f"{BASE_URL}/v1/market/kline"
headers = {"X-API-Key": API_KEY}
params = {"symbol": symbol, "interval": interval, "limit": limit}
last_exception = None
for attempt in range(1, MAX_RETRIES + 1):
try:
resp = requests.get(url, headers=headers, params=params, timeout=30)
if resp.status_code == 429:
delay = retry_delay(resp, attempt)
logging.warning("HTTP 429, retry after %ds", delay)
time.sleep(delay)
continue
if 400 <= resp.status_code < 500 and resp.status_code != 429:
logging.error("HTTP %d, not retrying", resp.status_code)
sys.exit(1)
if resp.status_code >= 500:
delay = retry_delay(resp, attempt)
logging.warning("HTTP %d, attempt %d, retry after %ds", resp.status_code, attempt, delay)
time.sleep(delay)
continue
data = resp.json()
code = data.get("code")
if code == 3001:
delay = retry_delay(resp, attempt)
logging.warning("code=3001, retry after %ds", delay)
time.sleep(delay)
continue
if code in (1001, 1002, 1004):
logging.error("Fatal: code=%d, stopping", code)
sys.exit(1)
if code != 0:
logging.error("Unexpected code=%d", code)
sys.exit(1)
return data
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
delay = retry_delay(None, attempt)
logging.warning("Network error attempt %d: %s, retry after %ds", attempt, e, delay)
last_exception = e
time.sleep(delay)
except ValueError as e:
logging.warning("Invalid JSON attempt %d: %s", attempt, e)
last_exception = e
if attempt == MAX_RETRIES:
logging.error("Max retries exceeded after invalid JSON")
sys.exit(1)
delay = retry_delay(None, attempt)
time.sleep(delay)
logging.error("Max retries exceeded")
raise last_exception or RuntimeError("fetch failed")
def validate_kline_response(data, requested_symbol, requested_interval):
if not isinstance(data, dict):
logging.error("Response is not a dict")
sys.exit(1)
resp_data = data.get("data")
if not isinstance(resp_data, dict):
logging.error("data is not a dict")
sys.exit(1)
resp_symbol = resp_data.get("symbol")
if resp_symbol != requested_symbol:
logging.error("Symbol mismatch in response: %s", resp_symbol)
sys.exit(1)
resp_interval = resp_data.get("interval")
if resp_interval != requested_interval:
logging.error("Interval mismatch in response: %s", resp_interval)
sys.exit(1)
klines = resp_data.get("klines")
if not isinstance(klines, list) or len(klines) == 0:
logging.error("klines empty or missing")
sys.exit(1)
seen_times = set()
for i, bar in enumerate(klines):
if not isinstance(bar, dict):
logging.error("Bar %d is not a dict", i)
sys.exit(1)
time_ms = bar.get("time")
if not isinstance(time_ms, int) or time_ms <= 0:
logging.error("Invalid time at index %d: %s", i, time_ms)
sys.exit(1)
if time_ms in seen_times:
logging.error("Duplicate time at index %d: %d", i, time_ms)
sys.exit(1)
seen_times.add(time_ms)
for price_field in ("open", "high", "low", "close"):
val = bar.get(price_field)
if not isinstance(val, str):
logging.error("Field %s not string at index %d", price_field, i)
sys.exit(1)
try:
d = Decimal(val)
except InvalidOperation:
logging.error("Invalid Decimal for %s at index %d: %s", price_field, i, val)
sys.exit(1)
if not d.is_finite():
logging.error("Non-finite %s at index %d", price_field, i)
sys.exit(1)
open_val = Decimal(bar["open"])
close_val = Decimal(bar["close"])
high_val = Decimal(bar["high"])
low_val = Decimal(bar["low"])
if high_val < max(open_val, close_val, low_val):
logging.error("high < max(open, close, low) at index %d", i)
sys.exit(1)
if low_val > min(open_val, close_val, high_val):
logging.error("low > min(open, close, high) at index %d", i)
sys.exit(1)
for vol_field in ("volume", "quote_volume"):
val = bar.get(vol_field)
if not isinstance(val, str):
logging.error("Field %s not string at index %d", vol_field, i)
sys.exit(1)
try:
d = Decimal(val)
except InvalidOperation:
logging.error("Invalid Decimal for %s at index %d: %s", vol_field, i, val)
sys.exit(1)
if not d.is_finite():
logging.error("Non-finite %s at index %d", vol_field, i)
sys.exit(1)
return klines, resp_symbol, resp_interval
def upsert_klines(conn, klines, symbol, interval):
query = sql.SQL("""
INSERT INTO kline_1d (symbol, interval, time_ms, open, high, low, close, volume, quote_volume, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
ON CONFLICT (symbol, interval, time_ms)
DO UPDATE SET
open = EXCLUDED.open,
high = EXCLUDED.high,
low = EXCLUDED.low,
close = EXCLUDED.close,
volume = EXCLUDED.volume,
quote_volume = EXCLUDED.quote_volume,
updated_at = NOW()
""")
affected = 0
with conn.cursor() as cur:
for bar in klines:
cur.execute(query, (
symbol,
interval,
bar["time"],
bar["open"],
bar["high"],
bar["low"],
bar["close"],
bar["volume"],
bar["quote_volume"],
))
affected += cur.rowcount
return affected
def main():
raw = fetch_kline(SYMBOL, INTERVAL, LOOKBACK)
klines, resp_symbol, resp_interval = validate_kline_response(raw, SYMBOL, INTERVAL)
with psycopg.connect(DATABASE_URL) as conn:
with conn.transaction():
affected = upsert_klines(conn, klines, SYMBOL, INTERVAL)
logging.info(
"symbol=%s interval=%s count=%d affected=%d earliest=%d latest=%d",
SYMBOL, INTERVAL, len(klines), affected,
min(bar["time"] for bar in klines),
max(bar["time"] for bar in klines),
)
if __name__ == "__main__":
main()
核心校验逻辑在 validate_kline_response 里。symbol 和 interval 从 response["data"] 中读取并校验,每根 bar 不再单独携带这两个字段,写入时由 upsert_klines 函数使用请求参数构造规范化记录。价格字段必须是字符串且能转成有限 Decimal,high 不低于 max(open, close, low),low 不高于 min(open, close, high),时间戳是正整数毫秒且不重复。任何一项失败,脚本非零退出,不执行写入。
fetch_kline 对不同类型的错误采取不同策略。429 和业务码 3001 按 Retry-After 指示等待后重试,退避有 60 秒上限。timeout、连接错误、5xx 和无效 JSON 有限重试,超过最大次数后非零退出。1001、1002、1004 阻断重试立即退出。其他 4xx 不重试直接退出。
为核对字段层级,本文于 2026 年 6 月 8 日通过 TickDB MCP 对 AAPL.US、1d、limit=3 做了最小调用验证。该验证只用于确认本次 MCP 返回结构,不代表 REST 接口及全部品种已经实测。
图 3:TickDB MCP 实际调用结果,2026-06-08;由 Codex 执行并核对,敏感信息已脱敏。本图仅证明该次调用的 K 线返回层级和字段类型,不代表 REST 行为、全品种一致性、排序、分页、延迟或 SLA。
工作流文件
name: kline-sync
on:
schedule:
- cron: "17 10 * * *"
workflow_dispatch:
permissions:
contents: read
concurrency:
group: kline-sync-main
cancel-in-progress: false
jobs:
sync:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: actions/setup-python@v6
with:
python-version: "3.12"
- run: pip install requests==2.31.0 psycopg==3.1.19
- name: run sync
env:
TICKDB_API_KEY: ${{ secrets.TICKDB_API_KEY }}
DATABASE_URL: ${{ secrets.DATABASE_URL }}
run: python sync_kline.py
cron 使用 UTC 时间,可能延迟。schedule 只在默认分支运行,公开仓库 60 天无活动自动停用。同一 concurrency group 最多保留一个运行中任务和一个 pending 任务;新的任务进入队列时,已有 pending 任务可能被取消并由新任务替代。该机制既不保证完整排队,也不提供 exactly-once。
API Key 和数据库连接字符串通过 GitHub Secrets 注入,不出现在代码、日志或 YAML 的明文里。
边界与限制
当前实现为每次运行时拉取最近 30 根已结束 K 线并执行覆盖修正,属于有界重放策略。本示例不证明 start_time 参数的包含性行为、返回数据的排序与分页边界、交易日的连续性,也不提供任意断点续传能力。
本示例只覆盖 REST K 线端点。其他端点、其他品种、更长时间跨度需要独立的验证和实现。
如果你只是个人实验,可以简化。但如果这份数据会被下游系统消费——告警、回测、Agent 决策——就需要合同校验和明确的失败信号。数据一旦离开你的脚本,别人会默认它是完整的。这个默认是你的责任。
你现在保存的是数据,还是只保存了一条“这次跑成功了”的日志?
参考来源
通过 TickDB API 获取实时行情数据
一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。
免费领取 API Key查看 API 文档