# stock-storage-frame **Repository Path**: panhuachao/stock-storage-frame ## Basic Information - **Project Name**: stock-storage-frame - **Description**: 最便捷的股票数据存储框架,安全实现配置驱动,不需要任何代码即可下载各种数据。 - **Primary Language**: Python - **License**: MIT - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 7 - **Forks**: 1 - **Created**: 2025-12-05 - **Last Updated**: 2026-02-27 ## Categories & Tags **Categories**: stocks **Tags**: None ## README # Stock Storage Frame 一个配置驱动的股票数据存储框架,专注于后端数据处理流程。 ## 特性 - **配置驱动**:完全通过YAML配置文件定义数据处理流程 - **模块化设计**:采集器、处理器、存储器分离,易于扩展 - **灵活的数据处理**:支持自定义Python脚本进行数据转换 - **多存储支持**:支持SQLite、MySQL、PostgreSQL、CSV等多种存储后端 - **通知功能**:支持钉钉通知,工作流执行成功/失败时自动发送通知 - **简单易用**:无需编写复杂代码,通过配置即可完成数据流程 ## 架构 框架采用分层架构设计,各层职责清晰,便于扩展和维护。整体架构如下: ```mermaid graph TB subgraph "配置层" A1[主配置文件 config.yaml] A2[Workflow配置文件] A3[环境变量配置] end subgraph "调度层" B1[定时调度器] B2[条件检查器] B3[任务执行器] end subgraph "采集层" C1[akshare采集器] C2[tushare采集器] C3[自定义脚本采集器] end subgraph "处理层" D1[数据清洗] D2[数据转换] D3[自定义脚本处理] end subgraph "存储层" E1[SQLite存储] E2[MySQL存储] E3[CSV存储] E4[PostgreSQL存储] end subgraph "数据层" F1[原始数据源] F2[处理后的数据] F3[持久化存储] end A1 --> B1 A2 --> B1 A3 --> B1 B1 --> B2 B2 --> B3 B3 --> C1 B3 --> C2 B3 --> C3 C1 --> D1 C2 --> D1 C3 --> D1 D1 --> D2 D2 --> D3 D3 --> E1 D3 --> E2 D3 --> E3 D3 --> E4 F1 --> C1 F1 --> C2 F1 --> C3 E1 --> F3 E2 --> F3 E3 --> F3 E4 --> F3 D3 --> F2 ``` **架构说明**: 1. **配置层**: - **主配置文件** (`config.yaml`):定义全局的采集器、处理器和存储器配置。 - **Workflow配置文件**:定义具体的数据处理流程,包括调度计划、采集器选择、处理器配置和存储目标。 - **环境变量配置**:通过 `.env` 文件管理敏感信息,支持 `${ENV_VAR}` 变量替换。 2. **调度层**: - **定时调度器**:根据 Workflow 中的 `schedule` 字段(cron表达式)自动触发任务执行。 - **条件检查器**:执行条件脚本,验证是否满足执行条件(如是否为交易日)。 - **任务执行器**:协调整个数据处理流程,按顺序调用采集层、处理层和存储层。 3. **采集层**: - **akshare采集器**:通过 akshare 库获取股票数据,支持多种数据接口。 - **tushare采集器**:通过 tushare 库获取专业金融数据,需要 token 认证。 - **自定义脚本采集器**:执行用户自定义的 Python 脚本,实现灵活的数据采集逻辑。 4. **处理层**: - **数据清洗**:对原始数据进行去重、缺失值处理、格式标准化等操作。 - **数据转换**:计算技术指标、数据聚合、字段映射等转换操作。 - **自定义脚本处理**:通过用户自定义的 Python 脚本实现复杂的数据处理逻辑。 5. **存储层**: - **SQLite存储**:轻量级本地数据库,适合单机部署和小规模数据。 - **MySQL存储**:关系型数据库,支持多用户并发访问和大规模数据存储。 - **CSV存储**:文件存储格式,便于数据导出和外部工具处理。 - **PostgreSQL存储**:高级关系型数据库,支持复杂查询和地理空间数据。 6. **数据层**: - **原始数据源**:外部数据源,如股票交易所、金融数据 API 等。 - **处理后的数据**:经过清洗和转换的标准化数据,准备存储。 - **持久化存储**:最终存储位置,数据可供后续分析和应用使用。 **数据处理流程**: 1. 调度层根据配置触发任务执行。 2. 采集层从原始数据源获取数据。 3. 处理层对数据进行清洗、转换和自定义处理。 4. 存储层将处理后的数据持久化到指定存储后端。 5. 整个流程完全由配置文件驱动,无需编写复杂代码。 ## 安装 ### 从源码安装 ```bash ## 从源码安装 git clone https://gitee.com/panhuachao/stock-storage-frame.git cd stock-storage-frame python3 -m venv venv source venv/bin/activate ## 国内源 pip install -i https://mirrors.aliyun.com/pypi/simple -r requirements.txt ## 官方源 pip install -i https://pypi.org/simple/ -r requirements.txt ``` ### 从PyPI安装 ```bash # 使用pip安装 pip install stock-storage-frame # 如使用postgresql数据库,pg数据库并不默认安装,需要额外安装 pip install stock-storage-frame[postgresql] ``` ## 快速开始 ### 1. 创建配置文件 创建 `config.yaml`: ```yaml app: name: "stock-data-pipeline" version: "1.0.0" log_level: "INFO" log_dir: "./logs" # 数据采集器配置 collectors: akshare1: type: "akshare" # method: "stock_zh_a_hist" # 默认方法,可在workflow中设置 config: timeout: 30 retry_times: 3 tushare1: type: "tushare" config: token: "${TUSHARE_TOKEN}" # 环境变量,.env文件中定义,参考.env.example,下述相同 timeout: 30 gainiancollector: type: "custom" # script: "./scripts/collector_demo.py" #默认配置脚本,可在workflow中设置 config: host: "${MYSQL_HOST}" # 环境变量,.env文件中定义,参考.env.example,下述相同 port: "${MYSQL_PORT}" # 环境变量,.env文件中定义,参考.env.example,下述相同 database: "${MYSQL_DB}" # 环境变量,.env文件中定义,参考.env.example,下述相同 username: "${MYSQL_USER}" # 环境变量,.env文件中定义,参考.env.example,下述相同 password: "${MYSQL_PASSWORD}" # 环境变量,.env文件中定义,参考.env.example,下述相同 customcollector: type: "custom" # script: "./scripts/collector_demo.py" #默认配置脚本,可在workflow中设置 config: api_url: "http://api_url" api_key: "http_api_key" # 数据存储配置,根据自身需要进行配置 storages: sqlite1: type: "sqlite" config: database: "./data/stock_data.db" csv1: type: "csv" config: directory: "./data/csv" mysql1: type: "mysql" config: host: "${MYSQL_HOST}" port: "${MYSQL_PORT}" database: "${MYSQL_DB}" username: "${MYSQL_USER}" password: "${MYSQL_PASSWORD}" postgresql1: type: "postgresql" config: host: "${POSTGRES_HOST}" port: "${POSTGRES_PORT}" database: "${POSTGRES_DB}" username: "${POSTGRES_USER}" password: "${POSTGRES_PASSWORD}" schema: "${POSTGRES_SCHEMA}" # 通知配置(可选) notifications: dingtalk1: type: "dingtalk" config: webhook_url: "${DINGTALK_WEBHOOK_URL}" secret: "${DINGTALK_SECRET}" at_mobiles: [] at_all: false timeout: 30 retry_times: 3 ``` ### 2. 创建Workflow配置 创建 `workflows/daily_stock_data.yaml`: ```yaml # workflow: stock_index_data.yaml name: "stock_index_data" description: "新浪财经-当天中国股票指数数据" schedule: "40 20 * * *" # 每天20:40执行,命令--schedule开启定时任务 # 条件配置(可选),如判定是否是交易日,只有定时任务才判断 condition: script: "./scripts/condition_trade_day.py" # 数据采集配置 - 使用不同的akshare方法 collector: name: "akshare1" #对应config.yaml中定义的akshare1名称,用于支持不同yaml不同数据源 type: "akshare" method: "stock_zh_index_spot_sina" # 股票指数日线数据 # 数据处理配置(可选) processor: # 使用Python脚本处理数据 script: "./scripts/process_add_date.py" # 数据存储配置 storage: name: "sqlite1" #对应config.yaml中定义的sqlite1名称,用于支持不同yaml使用相同类型但位置不同的数据存储 type: "sqlite" config: table_name: "stock_index_data" ``` ### 3. 创建自定义处理脚本 创建 `scripts/process_add_date.py`: ```python import pandas as pd from datetime import datetime def process(df: pd.DataFrame) -> pd.DataFrame: """自定义数据处理逻辑""" # 计算技术指标,根据需要自身添加 # df['ma5'] = df['close'].rolling(5).mean() # df['ma10'] = df['close'].rolling(10).mean() # 添加自定义字段,如日期,akshare中基本的数据都需要增加日期 df['日期'] = datetime.now().strftime("%Y-%m-%d") # 添加日期 return df ``` ### 4. 执行Workflow #### a. 库使用(通过pip安装) 如通过pip安装,则可以使用命令stock-storage-frame,如下: ```bash # 执行单个workflow stock-storage-frame --workflow workflows/stock_daily.yaml # 对特定workflow开启定时任务 stock-storage-frame --workflow workflows/stock_daily.yaml --schedule # 执行特定目录下所有workflow stock-storage-frame --workflows-dir workflows # 对目录下所有workflow执行定时任务 stock-storage-frame --workflows-dir workflows --schedule # 执行所有workflow(默认目录) stock-storage-frame --all # 测试所有组件 stock-storage-frame --test # 验证workflow配置 stock-storage-frame --validate workflows/stock_daily.yaml # 查看调度器状态 stock-storage-frame --scheduler-status ``` #### b. 源码使用(从源码项目) 如直接下载本项目源码进行使用,则可以使用命令python -m src.stock_storage.main: ```bash # 执行单个workflow python -m src.stock_storage.main --workflow workflows/stock_daily.yaml # 对特定workflow开启定时任务 python -m src.stock_storage.main --workflow workflows/stock_daily.yaml --schedule # 执行特定目录下所有workflow python -m src.stock_storage.main --workflows-dir workflows # 对目录下所有workflow执行定时任务 python -m src.stock_storage.main --workflows-dir workflows --schedule # 执行所有workflow(默认目录) python -m src.stock_storage.main --all # 测试所有组件 python -m src.stock_storage.main --test # 验证workflow配置 python -m src.stock_storage.main --validate workflows/stock_daily.yaml # 查看调度器状态 python -m src.stock_storage.main --scheduler-status ``` ### 5. 定时器调度 框架提供了内置的定时器调度功能,可以根据workflow配置中的schedule字段自动执行任务。 ```bash # 启动调度器(后台运行)- 调度所有有schedule的workflow python -m src.stock_storage.main --schedule # 对特定workflow开启定时任务 python -m src.stock_storage.main --workflow workflows/stock_daily.yaml --schedule # 对目录下所有workflow执行定时任务 python -m src.stock_storage.main --workflows-dir workflows --schedule # 查看调度器状态 python -m src.stock_storage.main --scheduler-status # 使用自定义配置和workflow目录 python -m src.stock_storage.main --schedule --config custom_config.yaml --workflows-dir custom_workflows ``` 调度器功能: - 自动加载所有包含schedule字段的workflow配置 - 根据cron表达式计算下一次执行时间 - 支持优雅关闭(Ctrl+C) - 实时日志记录执行结果 - 支持多workflow并发调度 - 支持调度特定workflow或整个目录 ## 项目结构 ``` stock-storage-frame/ ├── README.md ├── pyproject.toml ├── config.yaml # 主配置文件 ├── workflows/ # workflow配置目录 │ ├── daily_stock_data.yaml │ ├── weekly_report.yaml │ └── realtime_data.yaml ├── scripts/ # 自定义处理脚本 │ ├── process_daily_data.py │ └── calculate_indicators.py ├── src/ │ └── stock_storage/ │ ├── __init__.py │ ├── main.py # 主程序入口 │ ├── engine.py # Workflow引擎 │ ├── models.py # 数据模型 │ ├── factories.py # 组件工厂 │ ├── collectors/ # 采集器实现 │ │ ├── __init__.py │ │ ├── base.py │ │ ├── akshare.py │ │ └── tushare.py │ ├── processors/ # 处理器实现 │ │ ├── __init__.py │ │ ├── base.py │ │ ├── pandas.py │ │ └── custom.py │ └── storages/ # 存储器实现 │ ├── __init__.py │ ├── base.py │ ├── sqlite.py │ ├── mysql.py │ └── csv.py └── data/ # 数据存储目录 ├── stock_data.db # SQLite数据库 └── csv/ # CSV文件 ``` ## 配置说明 ### 主配置文件 (config.yaml) 主配置文件定义了全局的采集器、处理器和存储器配置。支持环境变量替换 `${ENV_VAR}`。 ### Workflow配置文件 每个workflow配置文件定义了一个完整的数据处理流程,包括: - **name**: workflow名称 - **description**: 描述信息 - **schedule**: 执行计划(cron表达式) - **collector**: 数据采集配置 - **processor**: 数据处理配置 - **storage**: 数据存储配置 ### 模板变量 workflow配置支持以下模板变量: - `{ today:YYYYMMDD }}或{{ today:YYYY-MM-DD }}`: 今天日期 - `{{ yesterday:YYYYMMDD }}或{{ yesterday:YYYY-MM-DD }}`: 昨天日期 - `{{ now }}`: 当前时间 ## 通知功能 框架支持在工作流执行成功或失败时发送通知。目前支持钉钉机器人通知。 ### 配置通知 1. 在 `config.yaml` 中添加通知配置(如上所示) 2. 在 `.env` 文件中配置钉钉机器人参数: ``` DINGTALK_WEBHOOK_URL=https://oapi.dingtalk.com/robot/send?access_token=your_token DINGTALK_SECRET=your_secret ``` 3. 工作流执行时会自动发送成功/失败通知 ### 通知内容 - **成功通知**:包含工作流名称、描述、处理记录数、保存记录数、执行时长 - **失败通知**:包含工作流名称、描述、错误信息、执行时长 ### 高级配置 - 支持多个通知器同时发送 - 支持通知级别过滤(info、success、warning、error) - 支持@指定人员或@所有人 - 支持自定义通知内容 详细使用指南请参考 [NOTIFICATION_GUIDE.md](NOTIFICATION_GUIDE.md) ## 扩展开发 ### 添加新的采集器 1. 在 `src/stock_storage/collectors/` 目录下创建新的采集器类 2. 继承 `BaseCollector` 类并实现必要的方法 3. 在 `factories.py` 中注册新的采集器 ### 添加新的存储器 1. 在 `src/stock_storage/storages/` 目录下创建新的存储器类 2. 继承 `BaseStorage` 类并实现必要的方法 3. 在 `factories.py` 中注册新的存储器 ### 添加新的处理器 1. 在 `src/stock_storage/processors/` 目录下创建新的处理器类 2. 继承 `BaseProcessor` 类并实现必要的方法 3. 在 `factories.py` 中注册新的处理器 ### 添加新的通知器 1. 在 `src/stock_storage/notifiers/` 目录下创建新的通知器类 2. 继承 `BaseNotifier` 类并实现必要的方法 3. 在 `NotifierFactory` 中注册新的通知器 ## 许可证 MIT License ## 贡献 欢迎提交Issue和Pull Request! ## 联系方式 - 项目地址: https://gitee.com/panhuachao/stock-storage-frame - 如有量化交易系统相关需求,可联系作者( email: tzzjchao@126.com ) - 作者: Pan Huachao