# cloudflow
**Repository Path**: yaozhicheng/cloudflow
## Basic Information
- **Project Name**: cloudflow
- **Description**: 面向编程的 工作流编程 框架
- **Primary Language**: Go
- **License**: MulanPSL-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 2
- **Forks**: 1
- **Created**: 2023-06-02
- **Last Updated**: 2023-08-28
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# CloudFlow
#### (一)简介
CloudFlow 利用"编程"的方式尝试解决如下问题:
1. **屏蔽云执行环境**。在当前云原生环境中,应用开发者需要在"编码以外"的事情上投入大量精力。基于CloudFlow编写程序,可在单节点,多节点,以及k8s环境下进行调试,程序员不需要频繁登录到其他节点,不需要知道k8s如何操作。部署时,可以根据目标自动生成Docker镜像以及配置文件,免去配置文件编写过程过程。
2. **数据流编程框架**。CloudFlow是基于DataFlow概念设计的数据处理框架,用户只需要考虑数据如何流动,不需要考虑其执行环境。CloudFlow根据处理数据量的大小以及可用资源情况,自动化的进行调度,实现动态扩缩容。
3. **微服务编程框架**。除去数据流外,CloudFlow 提供 restful apt编程接口,支持多语言(golang, C++, python)混合编程,内置KV存储,对象存储等服务,编程者只用关注业务逻辑,CloudFLow提供统一调试和部署功能。
4. **执行流测试框架**。对应传统多阶段,多工具任务(例如EDA任务需要组合多个小工具进行级联运行),可以通过CloudFlow提供的流构建功能,实现多阶段测试与验证。在流处理模式中,只有当上级节点测试完成,才会执行下一个节点。基于该特征可以实现应用持续集成。
5. **分布式环境构建**。基于k8s环境,CloudFlow中的Node可以按需求被调度到各个物理节点,并且保证节点之间能相互通讯,因此基于该框架可以快速的运行其他框架程序,例如MPI程序、Pytorch/Tensorflow等分布式机器学习框架,以及Ray框架等。
#### (二)逻辑架构

图1 cloudflow 组成
如上图所示,CloudFLow的核心服务有:执行器,全局状态服务,调度器组成
- **执行器:** 执行器(worker)负责从全局状态服务中拉取属于自己的任务进行执行,并实时汇报任务的执行状态,统计信息等。
- **调度器:** 调度器(scheduler)负责进行任务调度,把用户提交的任务指派给最适合的执行器。并对全局状态进行维护,例如清空超时任务,检查任务状态等。
- **状态服务:** 用来保存整个cloudflow的全局信息,例如任务执行状态,worker和scheduler信息等。
- **任务:** 在CloudFlow中,一个应用(Job)由多个Task组织,Job对应应用中的整个DAG图,Task对应其中的每个节点。
- **消息通讯:** worker之间通过消息中间件进行数据传递,目前支持 Jet-Stream,后继计划支持 Rocket-MQ等
#### (三)基本概念
在使用Cloudflow进行开发时,需要理清楚如下基本概念:
1. **APP:** 应用主体,表示应用本身,是"服务"的集合。
2. **Session:** 会话,当应用支持"常驻"时,一个Session表示一次用户任务执行(仅仅在APP Reuse模型下有多个session),Session内的Flow,Node等可资源相互可见,在同一命名空间中。一个APP下可以有多个Session。
3. **Flow:** 控制流,应用的数据流程逻辑。属于Session,一个Session下可以有多个Flow。
4. **Node:** 控制流的基本组成单元,一个Node代表一个进程。通常情况下,一个Node独立运行一个物理节点或者docker。
5. **Service:** 控制服务,用于实现rest ful API 或者 RPC,共享APP命名空间内数据。
各组件层级关系如下:
```
APP
+Session
+Flow
+Node
+Service
```
其中 Node 和 Service 是基础调度单元,可以进行相互访问。

图 2 cloudflow 应用
在CloudFlow应用中,节点和服务之间通过消息进行通讯。每个Node和Service可以有多个实例。如图2所示,Node构成的DAG图主要用于数据流处理,Service针对微服务应用。
#### (四)使用案例
"词频统计"是最常见的数据流程处理任务,统计文件/字符串中词语出现的次数。示例编码如下:
```
package main
import (
cf "cloudflow/sdk/golang/cloudflow"
)
func statistics(app *cf.App) string {
return "Hello word"
}
func ReadWords(self *cf.Node, count int) string {
// ...
}
func CountWords(self *cf.Node, txt string) map[string]float64 {
// ...
}
func ReduceWords(se *cf.Node, statistic []map[string]float64) map[string]float64 {
// ...
}
func main() {
var app = cf.NewApp("test-app")
var ses = app.CreateSession("session-1")
var flw = ses.CreateFlow("flow-1")
app.Reg(statistics, "record the process")
flw.Add(ReadWords, "read", 10_0000).Map(CountWords, "count", 10).Reduce(ReduceWords, "reduce", 10)
app.Run()
}
```
先定义数据产生,处理函数,如果通过CloudFlow接口构建Flow,然后执行。执行过程中CloudFlow会根据参数选择是单节点,多节点还是基于K8s环境执行。对于用户而言,感觉是在本地执行。
#### (五)快速开始
**1. 安装&运行**
```
# 安装 etcd
请参考 https://etcd.io/docs/v3.5/install/
# 安装 jet stream
请参考 https://nats.io/download/
# 启动前置服务
bash script/startsrvs.bash
# 执行用例
bash script/reinitall.bash; bash script/cloudflow.bash -d example gigasort
```
2. 验证 TBD
#### (六)开发文档
1. [全局数据格式定义](doc/01-runstate.md)
2. [消息中间件](doc/02-message.md)
3. [文件存储](doc/03-filestore.md)
4. KV存储
5. TBD
#### (七)开发任务
[任务清单](doc/99-todolist.md)