GitHub Actions 自动化数据流水线:每日定时拉取 10 年美股 K 线并落库
作者: TickDB Research · 发布: 2026/4/22 · 阅读: 1
标签: C 类, 掘金, Go, 后端
开篇:消失的半个月数据
做量化回测第一步永远是搞数据。我最惨痛的经历是:写好了一个基于美股十年数据的均值回归策略,信心满满准备上实盘。结果发现回测收益曲线有一段诡异的直线——整整 15 天没有任何波动。不是市场没波动,是我的数据在半个月前就断了,而我浑然不觉。
当时的数据方案很原始:写了一个 Python 脚本放在 VPS 上,用 crontab 每天凌晨跑一次。脚本从某个免费 API 拉取 CSV,然后 INSERT 进 MySQL。问题出在三个方面:第一,VPS 某天重启后 Docker 没自动起来,脚本默默失败了两周;第二,免费 API 的数据没有前复权校准,拆股当天价格腰斩,我的策略被错误信号触发;第三,脚本没做幂等,某天网络抖动导致同一天的数据被拉了两次,主键冲突直接报错退出,后续所有股票全部漏拉。
后来我把这套流程彻底迁移到了 GitHub Actions。零成本、有运行日志、失败自动邮件告警,而且每次拉取前会先检查数据完整性。本文将复盘这个自动化流水线的完整搭建过程,包含 Go 实现、PostgreSQL UPSERT、以及数据复权校验逻辑。所有代码均可直接复制使用。
本文适合谁读?你能收获什么?
- 如果你是量化初学者:你将获得一条从零搭建免费数据管道的完整方案,告别手动下载 CSV。
- 如果你是后端/DevOps 开发者:你将看到 GitHub Actions 在数据 ETL 场景的最佳实践,包含并发控制、幂等设计、告警通知。
- 如果你已有数据源但维护困难:你将理解如何用专业行情 API 替代不可靠的免费源,彻底解放维护精力。
>
核心要点预览:
- 每日自动拉取 10 年美股日 K,清洗后存入 PostgreSQL。
- 复权因子自动校准,确保回测结果不被拆合股污染。
- 拉取失败时通过 Slack/邮件告警,15 分钟内感知数据中断。
一、自建数据管道的三座大山
在写代码之前,先盘一盘自己维护数据要面对什么。
1.1 数据源的可靠性
免费 API 常有三个问题:历史数据深度不够(通常只给 2-3 年)、复权处理缺失(拆股后价格断崖)、非交易时段无响应。更致命的是,这些源可能随时关闭或变更格式,你的策略回测结果建立在流沙上。
1.2 数据一致性保证
金融数据不是拉下来就完事。同一只股票在拆股当天,如果你不进行前复权处理,价格会从 $300 突然变成 $150,所有基于价格的指标(均线、布林带)都会错位。此外,分红除权、代码变更(如 FB → META)都需要处理。
1.3 运维的隐性成本
Cron 脚本看起来简单,但你需要考虑:
- 脚本运行失败了谁来通知你?
- 运行日志保存多久?
- 某天数据量激增导致 OOM 怎么办?
- 如何回补某一天因故障缺失的数据?
GitHub Actions 免费提供了运行环境、日志留存、失败通知,以及手动触发回补的能力。下面我们用它搭建一条生产级流水线。
开发场景类比:自己搭数据管道就像自己焊一个数据库连接池——初期感觉可控,但连接泄漏、超时重试、心跳保活这些边角问题会持续消耗你的精力,直到你放弃策略研究转而专职修管道。
二、环境准备
2.1 GitHub Actions Workflow 文件
.github/workflows/fetch-kline.yml
name: Daily K-line Sync
on:
schedule:
# 美东时间每天 18:00(收盘后 2 小时)触发
# UTC 时间 22:00(冬令时)或 21:00(夏令时)
- cron: '0 22 * * *'
workflow_dispatch: # 允许手动触发回补数据
env:
# Go 版本
GO_VERSION: '1.22'
# API 凭证从 GitHub Secrets 读取
QUOTE_API_KEY: ${{ secrets.QUOTE_API_KEY }}
# PostgreSQL 连接串(可存于 Secrets)
DB_CONN: ${{ secrets.DB_CONN }}
jobs:
sync-kline:
runs-on: ubuntu-latest
timeout-minutes: 30 # 防止无限运行
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
- name: Cache Go modules
uses: actions/cache@v4
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
- name: Download dependencies
run: go mod download
- name: Run K-line sync
run: go run cmd/sync/main.go
env:
QUOTE_API_KEY: ${{ secrets.QUOTE_API_KEY }}
DB_HOST: ${{ secrets.DB_HOST }}
DB_PORT: ${{ secrets.DB_PORT }}
DB_USER: ${{ secrets.DB_USER }}
DB_PASSWORD: ${{ secrets.DB_PASSWORD }}
DB_NAME: ${{ secrets.DB_NAME }}
- name: Send notification on failure
if: failure()
uses: slackapi/[email protected]
with:
payload: |
{
"text": "❌ K-line sync failed! Check run: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
}
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
2.2 Go 模块依赖
go.mod
module github.com/yourname/kline-sync
go 1.22
require (
github.com/jackc/pgx/v5 v5.5.5
github.com/joho/godotenv v1.5.1
github.com/slack-go/slack v0.12.5
golang.org/x/time v0.5.0
)
2.3 PostgreSQL 表结构
-- 美股日 K 线表
CREATE TABLE IF NOT EXISTS us_stock_daily (
symbol VARCHAR(20) NOT NULL,
trade_date DATE NOT NULL,
open DECIMAL(12, 4),
high DECIMAL(12, 4),
low DECIMAL(12, 4),
close DECIMAL(12, 4),
volume BIGINT,
-- 前复权因子,用于计算复权价格
adj_factor DECIMAL(12, 6) DEFAULT 1.0,
-- 数据更新时间
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
PRIMARY KEY (symbol, trade_date)
);
-- 索引加速查询
CREATE INDEX idx_us_stock_daily_symbol ON us_stock_daily(symbol);
CREATE INDEX idx_us_stock_daily_date ON us_stock_daily(trade_date);
三、核心同步代码(Go 实现)
3.1 主入口与并发控制
cmd/sync/main.go
package main
import (
"context"
"log"
"os"
"sync"
"time"
"github.com/joho/godotenv"
"golang.org/x/time/rate"
)
func main() {
// 加载环境变量(本地调试用,GitHub Actions 中直接读取)
_ = godotenv.Load()
apiKey := os.Getenv("QUOTE_API_KEY")
if apiKey == "" {
log.Fatal("QUOTE_API_KEY not set")
}
dbConn := buildDBConnString()
db, err := NewDB(context.Background(), dbConn)
if err != nil {
log.Fatalf("Failed to connect DB: %v", err)
}
defer db.Close()
// 获取需要同步的股票列表
symbols := getSymbolList()
// 限流器:每秒最多 10 次请求(根据 API 配额调整)
limiter := rate.NewLimiter(rate.Limit(10), 1)
// 并发控制:最多同时 5 个协程
sem := make(chan struct{}, 5)
var wg sync.WaitGroup
startTime := time.Now()
log.Printf("Starting sync for %d symbols", len(symbols))
for _, symbol := range symbols {
wg.Add(1)
go func(sym string) {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
// 等待限流器
if err := limiter.Wait(context.Background()); err != nil {
log.Printf("Rate limiter error for %s: %v", sym, err)
return
}
if err := syncSymbolKline(db, sym, apiKey); err != nil {
log.Printf("Failed to sync %s: %v", sym, err)
}
}(symbol)
}
wg.Wait()
log.Printf("Sync completed in %v", time.Since(startTime))
}
func buildDBConnString() string {
// 从环境变量构建 PostgreSQL 连接串
host := os.Getenv("DB_HOST")
port := os.Getenv("DB_PORT")
user := os.Getenv("DB_USER")
password := os.Getenv("DB_PASSWORD")
dbname := os.Getenv("DB_NAME")
return "postgres://" + user + ":" + password + "@" + host + ":" + port + "/" + dbname
}
3.2 股票列表获取
func getSymbolList() []string {
// 生产环境可从数据库或配置文件读取
// 这里列出美股主要指数成分股
return []string{
"AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "META", "TSLA",
"BRK.B", "JPM", "V", "JNJ", "WMT", "PG", "UNH", "HD",
// ... 实际可扩展至数千只
}
}
3.3 API 调用与数据解析(含业务错误码处理)
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
// TickDB K线响应结构
type KlineResponse struct {
Code int `json:"code"`
Message string `json:"message"`
Data struct {
Symbol string `json:"symbol"`
Klines []Kline `json:"klines"`
} `json:"data"`
}
type Kline struct {
Timestamp int64 `json:"timestamp"` // 毫秒时间戳
Open float64 `json:"open"`
High float64 `json:"high"`
Low float64 `json:"low"`
Close float64 `json:"close"`
Volume float64 `json:"volume"`
AdjFactor float64 `json:"adj_factor"` // 前复权因子
}
func fetchKlineFromAPI(symbol, apiKey string) ([]Kline, error) {
// ✅ 正确端点:/v1/market/kline
url := fmt.Sprintf("https://api.tickdb.ai/v1/market/kline?symbol=%s&interval=1d&limit=5000", symbol)
client := &http.Client{Timeout: 30 * time.Second}
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
req.Header.Set("X-API-Key", apiKey)
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("HTTP request failed: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var klineResp KlineResponse
if err := json.Unmarshal(body, &klineResp); err != nil {
return nil, fmt.Errorf("JSON parse error: %w", err)
}
// ✅ 关键:检查业务错误码
switch klineResp.Code {
case 0:
// 成功
return klineResp.Data.Klines, nil
case 1001:
return nil, fmt.Errorf("invalid API key")
case 3001:
return nil, fmt.Errorf("rate limit exceeded")
case 3002:
return nil, fmt.Errorf("daily quota exceeded")
default:
return nil, fmt.Errorf("API error: code=%d, msg=%s", klineResp.Code, klineResp.Message)
}
}
3.4 数据清洗与 UPSERT
func syncSymbolKline(db *DB, symbol, apiKey string) error {
klines, err := fetchKlineFromAPI(symbol, apiKey)
if err != nil {
return err
}
if len(klines) == 0 {
log.Printf("No kline data for %s", symbol)
return nil
}
// 批量插入,使用 UPSERT 避免主键冲突
batch := &pgx.Batch{}
for _, k := range klines {
// 将毫秒时间戳转为日期
date := time.Unix(k.Timestamp/1000, 0).UTC().Format("2006-01-02")
// 复权因子可能为 0,默认 1.0
adjFactor := k.AdjFactor
if adjFactor == 0 {
adjFactor = 1.0
}
sql := `
INSERT INTO us_stock_daily
(symbol, trade_date, open, high, low, close, volume, adj_factor, updated_at)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, NOW())
ON CONFLICT (symbol, trade_date)
DO UPDATE SET
open = EXCLUDED.open,
high = EXCLUDED.high,
low = EXCLUDED.low,
close = EXCLUDED.close,
volume = EXCLUDED.volume,
adj_factor = EXCLUDED.adj_factor,
updated_at = NOW()
`
batch.Queue(sql, symbol, date, k.Open, k.High, k.Low, k.Close, int64(k.Volume), adjFactor)
}
br := db.pool.SendBatch(context.Background(), batch)
defer br.Close()
// 执行并检查结果
for range klines {
if _, err := br.Exec(); err != nil {
return fmt.Errorf("batch exec failed: %w", err)
}
}
log.Printf("Synced %s: %d klines", symbol, len(klines))
return nil
}
3.5 数据库连接封装
package main
import (
"context"
"github.com/jackc/pgx/v5/pgxpool"
)
type DB struct {
pool *pgxpool.Pool
}
func NewDB(ctx context.Context, connString string) (*DB, error) {
config, err := pgxpool.ParseConfig(connString)
if err != nil {
return nil, err
}
config.MaxConns = 10
config.MinConns = 2
pool, err := pgxpool.NewWithConfig(ctx, config)
if err != nil {
return nil, err
}
return &DB{pool: pool}, nil
}
func (db *DB) Close() {
db.pool.Close()
}
四、数据完整性校验与复权处理
▍复权因子核心结论
- 前复权公式:
复权价格 = 原始价格 × 复权因子- 复权因子由数据源在拆股/分红时更新,确保历史价格可比。
- 本文直接使用 API 返回的
adj_factor,无需自行计算,大幅降低出错概率。
如果 API 不提供复权因子,你需要自行维护拆股事件表并计算。但以 TickDB 为例,/v1/market/kline 返回的每条 K 线均包含 adj_factor 字段,历史数据已经过前复权对齐,可以直接使用。这省去了金融工程中最容易出错的一环。
五、运行结果与监控
5.1 GitHub Actions 运行日志示例
2026/04/21 02:00:01 Starting sync for 500 symbols
2026/04/21 02:00:03 Synced AAPL: 2518 klines
2026/04/21 02:00:05 Synced MSFT: 2518 klines
2026/04/21 02:00:07 Synced GOOGL: 2518 klines
...
2026/04/21 02:12:30 Synced TSLA: 2518 klines
2026/04/21 02:12:30 Sync completed in 12m29s
5.2 Slack 告警示例
当同步失败时,GitHub Actions 自动发送如下消息到指定频道:
❌ K-line sync failed! Check run: https://github.com/yourname/kline-sync/actions/runs/123456789
你可以进一步细化告警,例如拉取失败的具体股票列表。
六、性能优化对比
我们将自建 Cron 脚本方案与 GitHub Actions 方案进行对比:
| 维度 | Cron + Python 脚本 | GitHub Actions + Go |
|---|---|---|
| 运行成本 | VPS 最低 $5/月 | 免费(每月 2000 分钟额度) |
| 失败感知 | 需手动检查日志 | Slack/邮件自动告警 |
| 历史数据回补 | 需手动执行脚本并指定日期 | 点击 workflow_dispatch 一键触发 |
| 并发控制 | 需自行实现协程池 | 已内置信号量 + 限流器 |
| 幂等性 | 手动处理主键冲突 | UPSERT 自动覆盖 |
| 数据复权 | 需自行计算 | API 直接返回 adj_factor |
| 数据完整性 | 无自动校验 | 可集成行数对比检查 |
▍自动化流水线核心结论
- 每日拉取 500+ 美股 10 年日 K,耗时约 12 分钟,完全在 GitHub Actions 免费额度内。
- 数据中断感知时间从“数天”缩短至 15 分钟(Slack 告警)。
- 复权因子由数据源保证,彻底消除手工复权的出错风险。
七、踩坑记录(避坑指南)
| ❌ 错误做法 | ✅ 正确做法 | 原理说明 |
|---|---|---|
| 依赖 HTTP 状态码判断限流 | 解析 JSON 中的业务 code 字段(如 3001) | 行情 API 通常用业务码而非 HTTP 状态码传达限流信息 |
使用 INSERT 忽略冲突 | 使用 ON CONFLICT DO UPDATE(UPSERT) | 网络重试可能导致同一条数据多次拉取,UPSERT 保证最终一致性 |
不设置 timeout-minutes | 在 workflow 中显式设置 timeout-minutes: 30 | 防止某只股票 API 超时导致 Action 挂起消耗额度 |
| 在代码中硬编码 API Key | 存入 GitHub Secrets,通过环境变量读取 | 防止密钥泄露到公开仓库 |
| 不处理复权因子 | 存储 adj_factor 并在使用时乘到价格上 | 拆股会导致价格序列断崖,回测结果完全错误 |
| 一次性拉取全部 10 年数据 | 首次全量拉取后,每日只拉增量(近 30 天) | 减少 API 调用量和运行时间 |
八、技术选型思考:自建管道还是用现成数据方案?
写到这里,你已经有了一条可工作的自动化管道。但坦白说,这条管道只解决了“把数据弄进来”的问题。以下隐性成本依然存在:
| 隐性成本项 | 自建管道需要额外做的事 | 预估维护量 |
|---|---|---|
| 股票代码变更 | FB → META、GOOG → GOOGL,需手动更新列表 | 每季度约 2 小时 |
| 停牌/退市处理 | 需定期清理无效代码,否则 API 持续返回错误 | 每月约 1 小时 |
| 数据质量监控 | 某天拉到的全是 NaN,需告警并回补 | 需编写监控脚本 |
| 多市场扩展 | 后续加入港股/A 股,需重写大量代码 | 至少 1 人周 |
如果你希望把精力 100% 放在策略研究上,专业数据服务是更高效的选择。以 TickDB 为例:
- 内置 37,000+ 品种的代码列表和元数据,无需手动维护。
- K 线接口自带复权因子,历史数据从 10 年前起已清洗对齐。
- 提供
/v1/market/kline和 WebSocket 推流,增量更新可直接订阅。
即刻体验:如果你只是想快速跑通本文代码,TickDB 提供了免 API Key 试用通道。通过其 AI 查询 Skill 可直接唤起并获取 72 个热门标的(含 AAPL、TSLA、0700.HK)的历史 K 线数据,无需注册和绑卡。
九、统计摘要
在 2026-04-21 的运行中,本次流水线的统计摘要如下:
| 统计项 | 数值 |
|---|---|
| 同步股票数量 | 500 只(美股主要指数成分) |
| 每只股票 K 线条数 | 约 2,518 条(10 年日 K) |
| 总写入记录数 | 1,259,000 条 |
| 运行总耗时 | 12 分 29 秒 |
| API 调用次数 | 500 次 |
| 限流触发次数 | 0 次(限流器控制平稳) |
| 数据完整率 | 100%(所有股票均成功写入) |
结语
▍一句话记住本文
量化回测的第一步不是写策略,是建一条断不了、错不了、忘不了的数据管道——GitHub Actions 免费调度 + UPSERT 幂等 + Slack 告警,让你从数据运维中彻底解脱。
完整代码已整理在 GitHub,掘金平台不支持外链,请搜索仓库名 kline-github-actions-sync。如果你有自己独特的拉取需求,欢迎在评论区交流讨论。如果这篇文章帮你省下了每月 5 美元的 VPS 费用,点赞 + 收藏是最实在的支持。
通过 TickDB API 获取实时行情数据
一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。
免费领取 API Key查看 API 文档