Golang框架实战-KisFlow流式计算框架(4)-数据流

aceld · · 454 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

连载中... [Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df) [Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a) [Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7) [Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc) --- ## 3.1 数据类型定义 KisFlow中可以传递任意类型数据作为Flow的数据源。而且KisFlow支持批量数据的流逝计算处理。 首先需要对KisFlow中内部支持的数据类型做一个基本的定义,我们将这部分的定义代码写在`kis-flow/common/`中的`data_type.go` 文件中。 > kis-flow/common/data_type.go ```go package common // KisRow 一行数据 type KisRow interface{} // KisRowArr 一次业务的批量数据 type KisRowArr []KisRow /* KisDataMap 当前Flow承载的全部数据, key : 数据所在的Function ID value: 对应的KisRow */ type KisDataMap map[string]KisRowArr ``` - `KisRow` :表示一行数据,可以是任意的数据类型,比如字符串,json字符串,一些序列化的二进制数据, protobuf,yaml字符串等,均可。 - `KisRowArr`:表示多行数据,也就是一次提交的批量数据,他是KisRow的数组集合。 - `KisDataMap` :表示当前Flow承载的全部数据。是一个`map[string]KisRowArr`类型,其中key为数据所在的Function ID,value为数据。 ## 3.2 KisFlow数据流处理 在KisFlow模块中,新增一些存放数据的成员,如下: > kis-flow/flow/kis_flow.go ```go // KisFlow 用于贯穿整条流式计算的上下文环境 type KisFlow struct { // 基础信息 Id string // Flow的分布式实例ID(用于KisFlow内部区分不同实例) Name string // Flow的可读名称 Conf *config.KisFlowConfig // Flow配置策略 // Function列表 Funcs map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionID FlowHead kis.Function // 当前Flow所拥有的Function列表表头 FlowTail kis.Function // 当前Flow所拥有的Function列表表尾 flock sync.RWMutex // 管理链表插入读写的锁 ThisFunction kis.Function // Flow当前正在执行的KisFunction对象 ThisFunctionId string // 当前执行到的Function ID (策略配置ID) PrevFunctionId string // 当前执行到的Function 上一层FunctionID(策略配置ID) // Function列表参数 funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam fplock sync.RWMutex // 管理funcParams的读写锁 // ++++++++ 数据 ++++++++++ buffer common.KisRowArr // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch data common.KisDataMap // 流式计算各个层级的数据源 inPut common.KisRowArr // 当前Function的计算输入数据 } ``` - `buffer`: 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch - `data`: 流式计算各个层级的数据源 - `inPut`: 当前Function的计算输入数据 后续章节会使用到这几个成员属性,这里先做为了解。 因为data是一个`map`类型,所以需要在`NewKisFlow() `中,对其进行初始化操作: > kis-flow/flow/kis_flow.go ```go // NewKisFlow 创建一个KisFlow. func NewKisFlow(conf *config.KisFlowConfig) kis.Flow { flow := new(KisFlow) // 实例Id flow.Id = id.KisID(common.KisIdTypeFlow) // 基础信息 flow.Name = conf.FlowName flow.Conf = conf // Function列表 flow.Funcs = make(map[string]kis.Function) flow.funcParams = make(map[string]config.FParam) // ++++++++ 数据data +++++++ flow.data = make(common.KisDataMap) return flow } ``` ## 3.2.2 业务提交数据接口 KisFlow的开发者在编写业务时,可以通过flow实例来进行提交业务源数据,所以我们需要给`Flow`抽象层新增一个提交数据的接口: > kis-flow/kis/flow.go ```go package kis import ( "context" "kis-flow/common" "kis-flow/config" ) type Flow interface { // Run 调度Flow,依次调度Flow中的Function并且执行 Run(ctx context.Context) error // Link 将Flow中的Function按照配置文件中的配置进行连接 Link(fConf *config.KisFuncConfig, fParams config.FParam) error // CommitRow ++++++ 提交Flow数据到即将执行的Function层 ++++ CommitRow(row interface{}) error } ``` 新增接口 `CommitRow(any interface{}) error`。 在`kis-flow/flow/kis_flow_data.go`中实现KisFlow的该接口。 > kis-flow/flow/kis_flow_data.go ```go func (flow *KisFlow) CommitRow(row interface{}) error { flow.buffer = append(flow.buffer, row) return nil } ``` `CommitRow()` 为提交Flow数据, 一行数据,如果是批量数据可以提交多次。 所有提交的数据都会暂存在`flow.buffer` 成员中,作为缓冲区。 ## 3.2.3 KisFlow内部数据提交 现在开发者可以通过`CommitRow()`将数据提交到buffer中,但是在KisFlow内部需要一个内部接口来将buffer提交到KisFlow的data中,作为之后当前Flow全部Function的上下文数据供使用。所以我们这里需要再提供两个接口。分别是首次提交数据`commitSrcData()`和中间层提交数据`commitCurData()`两个函数。 ### A. 首层数据提交 > kis-flow/flow/kis_flow_data.go ```go // commitSrcData 提交当前Flow的数据源数据, 表示首次提交当前Flow的原始数据源 // 将flow的临时数据buffer,提交到flow的data中,(data为各个Function层级的源数据备份) // 会清空之前所有的flow数据 func (flow *KisFlow) commitSrcData(ctx context.Context) error { // 制作批量数据batch dataCnt := len(flow.buffer) batch := make(common.KisRowArr, 0, dataCnt) for _, row := range flow.buffer { batch = append(batch, row) } // 清空之前所有数据 flow.clearData(flow.data) // 首次提交,记录flow原始数据 // 因为首次提交,所以PrevFunctionId为FirstVirtual 因为没有上一层Function flow.data[common.FunctionIdFirstVirtual] = batch // 清空缓冲Buf flow.buffer = flow.buffer[0:0] log.Logger().DebugFX(ctx, "====> After CommitSrcData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data) return nil } //ClearData 清空flow所有数据 func (flow *KisFlow) clearData(data common.KisDataMap) { for k := range data { delete(data, k) } } ``` 实际上`commitSrcData()`在整个的`Flow`运行周期只会执行一次,这个作为当前`Flow`的始祖源数据。 `commitSrcData()` 的最终目的是 将buffer的数据提交到`data[FunctionIdFirstVirtual]` 中。 这里要注意的是`FunctionIdFirstVirtual`是一个虚拟fid,作为所有`Function`的上游Function ID。 并且首次提交之后,`flow.buffer`的数据将被清空。 ### B. 中间层数据提交 > kis-flow/flow/kis_flow_data.go ```go //commitCurData 提交Flow当前执行Function的结果数据 func (flow *KisFlow) commitCurData(ctx context.Context) error { //判断本层计算是否有结果数据,如果没有则退出本次Flow Run循环 if len(flow.buffer) == 0 { return nil } // 制作批量数据batch batch := make(common.KisRowArr, 0, len(flow.buffer)) //如果strBuf为空,则没有添加任何数据 for _, row := range flow.buffer { batch = append(batch, row) } //将本层计算的缓冲数据提交到本层结果数据中 flow.data[flow.ThisFunctionId] = batch //清空缓冲Buf flow.buffer = flow.buffer[0:0] log.Logger().DebugFX(ctx, " ====> After commitCurData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data) return nil } ``` `commitCurData()`会在每次Function执行计算后,将当前Function的计算结果数据进行提交。 commitCurData() 会在Flow的流式计算过程中被执行多次。 `commitCurData()`的最终目的是将将buffer的数据提交到`data[flow.ThisFunctionId]` 中 。ThisFunctionId也就是当前正在执行Function,同时也是下一层将要执行的Function的上一层。 提交之后,`flow.buffer`的数据将被清空。 ## 3.2.4 获取正在执行Function的源数据 至于每层Function的源数据如何得到,我们可以通过`getCurData()`方法得到。 通过`PrevFunctionId`进行索引,因为获取当前Function的源数据,就是上一层Function的结果数据,所以我们通过`PrevFunctionId`来得到上一层Function的Id,从data[`PrevFunctionId`] 中可以得到数据源。 > kis-flow/flow/kis_flow_data.go ```go // getCurData 获取flow当前Function层级的输入数据 func (flow *KisFlow) getCurData() (common.KisRowArr, error) { if flow.PrevFunctionId == "" { return nil, errors.New(fmt.Sprintf("flow.PrevFunctionId is not set")) } if _, ok := flow.data[flow.PrevFunctionId]; !ok { return nil, errors.New(fmt.Sprintf("[%s] is not in flow.data", flow.PrevFunctionId)) } return flow.data[flow.PrevFunctionId], nil } ``` ## 3.2.5 数据流链式调度处理 下面我们就要在`flow.Run()`方法中,来加入数据流的处理动作。 > kis-flow/flow/kis_flow.go ```go // Run 启动KisFlow的流式计算, 从起始Function开始执行流 func (flow *KisFlow) Run(ctx context.Context) error { var fn kis.Function fn = flow.FlowHead if flow.Conf.Status == int(common.FlowDisable) { //flow被配置关闭 return nil } // ========= 数据流 新增 =========== // 因为此时还没有执行任何Function, 所以PrevFunctionId为FirstVirtual 因为没有上一层Function flow.PrevFunctionId = common.FunctionIdFirstVirtual // 提交数据流原始数据 if err := flow.commitSrcData(ctx); err != nil { return err } // ========= 数据流 新增 =========== //流式链式调用 for fn != nil { // ========= 数据流 新增 =========== // flow记录当前执行到的Function 标记 fid := fn.GetId() flow.ThisFunction = fn flow.ThisFunctionId = fid // 得到当前Function要处理与的源数据 if inputData, err := flow.getCurData(); err != nil { log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error()) return err } else { flow.inPut = inputData } // ========= 数据流 新增 =========== if err := fn.Call(ctx, flow); err != nil { //Error return err } else { //Success // ========= 数据流 新增 =========== if err := flow.commitCurData(ctx); err != nil { return err } // 更新上一层FuncitonId游标 flow.PrevFunctionId = flow.ThisFunctionId // ========= 数据流 新增 =========== fn = fn.Next() } } return nil } ``` - 在run() 刚执行的时候,对PrevFunctionId 进行初始化,设置为 `FunctionIdFirstVirtual`。 - 在run() 刚执行的时候,执行`commitSrcData()`将业务赋值的的buffer数据提交到data[`FunctionIdFirstVirtual`]中。 - 进入循环,执行每个Function的时候,`getCurData()`获取到当前Function的源数据,并且放在`flow.inPut` 成员中。 - 进入循环,更正`ThisFunctionId` 游标为当前Function ID。 - 进入循环,每个Funciton执行完毕后,将Function产生的结果数据通过`commitCurData()`进行提交,并且改变`PrevFunctionId`为当前FunctionID, 进入下一层。 很显然,我们还需要让`Flow`给开发者提供一个获取Input数据的接口。 > kis-flow/kis/flow.go ```go package kis import ( "context" "kis-flow/common" "kis-flow/config" ) type Flow interface { // Run 调度Flow,依次调度Flow中的Function并且执行 Run(ctx context.Context) error // Link 将Flow中的Function按照配置文件中的配置进行连接 Link(fConf *config.KisFuncConfig, fParams config.FParam) error // CommitRow 提交Flow数据到即将执行的Function层 CommitRow(row interface{}) error // ++++++++++++++++++++++ // Input 得到flow当前执行Function的输入源数据 Input() common.KisRowArr } ``` 实现如下: > kis-flow/flow/kis_flow_data.go ```go // Input 得到flow当前执行Function的输入源数据 func (flow *KisFlow) Input() common.KisRowArr { return flow.inPut } ``` ## 3.3 KisFunction的数据流处理 由于我们的Function调度模块还目前还没有实现,所以有关Function在执行`Call()`方法的时候,只能暂时将业务计算的逻辑写死在KisFlow框架中。 在下一章节,我们会将这部分的计算逻辑开放给开发者进行注册自己的业务。 现在Flow已经将数据传递给了每层的Function,那么在Function中我们下面来简单模拟一下业务的基础计算逻辑。 我们暂时修改`KisFunctionC`和 `KisFunctionE` 两个模块的`Call()`代码. 假设KisFunctionC 是 KisFunctionE的上层。 > kis-flow/function/kis_function_c.go ```go type KisFunctionC struct { BaseFunction } func (f *KisFunctionC) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionC, flow = %+v\n", flow) //TODO 调用具体的Function执行方法 //处理业务数据 for i, row := range flow.Input() { fmt.Printf("In KisFunctionC, row = %+v\n", row) // 提交本层计算结果数据 _ = flow.CommitRow("Data From KisFunctionC, index " + " " + fmt.Sprintf("%d", i)) } return nil } ``` > kis-flow/function/kis_function_e.go ```go type KisFunctionE struct { BaseFunction } func (f *KisFunctionE) Call(ctx context.Context, flow kis.Flow) error { log.Logger().InfoF("KisFunctionE, flow = %+v\n", flow) // TODO 调用具体的Function执行方法 //处理业务数据 for _, row := range flow.Input() { fmt.Printf("In KisFunctionE, row = %+v\n", row) } return nil } ``` ## 3.4 数据流单元测试 下面我们模拟一个简单的计算业务,测试下每层的Function是否可以得到数据,并且将计算结果传递给下一层。 > kis-flow/test/kis_flow_test.go ```go func TestNewKisFlowData(t *testing.T) { ctx := context.Background() // 1. 创建2个KisFunction配置实例 source1 := config.KisSource{ Name: "公众号抖音商城户订单数据", Must: []string{"order_id", "user_id"}, } source2 := config.KisSource{ Name: "用户订单错误率", Must: []string{"order_id", "user_id"}, } myFuncConfig1 := config.NewFuncConfig("funcName1", common.C, &source1, nil) if myFuncConfig1 == nil { panic("myFuncConfig1 is nil") } myFuncConfig2 := config.NewFuncConfig("funcName2", common.E, &source2, nil) if myFuncConfig2 == nil { panic("myFuncConfig2 is nil") } // 2. 创建一个 KisFlow 配置实例 myFlowConfig1 := config.NewFlowConfig("flowName1", common.FlowEnable) // 3. 创建一个KisFlow对象 flow1 := flow.NewKisFlow(myFlowConfig1) // 4. 拼接Functioin 到 Flow 上 if err := flow1.Link(myFuncConfig1, nil); err != nil { panic(err) } if err := flow1.Link(myFuncConfig2, nil); err != nil { panic(err) } // 5. 提交原始数据 _ = flow1.CommitRow("This is Data1 from Test") _ = flow1.CommitRow("This is Data2 from Test") _ = flow1.CommitRow("This is Data3 from Test") // 6. 执行flow1 if err := flow1.Run(ctx); err != nil { panic(err) } } ``` 这里我们通过`flow.CommitRow()`提交了3行数据,每行数据是一个字符串,当然数据格式可以任意,数据类型也可以任意,只需要在各层的Function业务自身确定拉齐好即可。 cd到`kis-flow/test/`下执行命令: ```bash go test -test.v -test.paniconexit0 -test.run TestNewKisFlowData ``` 结果如下: ```bash === RUN TestNewKisFlowData context.Background ====> After CommitSrcData, flow_name = flowName1, flow_id = flow-8b607ae6d55048408dae1f4e8f6dca6f All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] KisFunctionC, flow = &{Id:flow-8b607ae6d55048408dae1f4e8f6dca6f Name:flowName1 Conf:0xc00015a780 Funcs:map[func-2182fa1a049f4c1c9eeb641f5292f09f:0xc0001381e0 func-f3e7d7868f44448fb532935768ea2ca1:0xc000138190] FlowHead:0xc000138190 FlowTail:0xc0001381e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000138190 ThisFunctionId:func-f3e7d7868f44448fb532935768ea2ca1 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-2182fa1a049f4c1c9eeb641f5292f09f:map[] func-f3e7d7868f44448fb532935768ea2ca1:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]} In KisFunctionC, row = This is Data1 from Test In KisFunctionC, row = This is Data2 from Test In KisFunctionC, row = This is Data3 from Test context.Background ====> After commitCurData, flow_name = flowName1, flow_id = flow-8b607ae6d55048408dae1f4e8f6dca6f All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f3e7d7868f44448fb532935768ea2ca1:[Data From KisFunctionC, index 0 Data From KisFunctionC, index 1 Data From KisFunctionC, index 2]] KisFunctionE, flow = &{Id:flow-8b607ae6d55048408dae1f4e8f6dca6f Name:flowName1 Conf:0xc00015a780 Funcs:map[func-2182fa1a049f4c1c9eeb641f5292f09f:0xc0001381e0 func-f3e7d7868f44448fb532935768ea2ca1:0xc000138190] FlowHead:0xc000138190 FlowTail:0xc0001381e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0001381e0 ThisFunctionId:func-2182fa1a049f4c1c9eeb641f5292f09f PrevFunctionId:func-f3e7d7868f44448fb532935768ea2ca1 funcParams:map[func-2182fa1a049f4c1c9eeb641f5292f09f:map[] func-f3e7d7868f44448fb532935768ea2ca1:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-f3e7d7868f44448fb532935768ea2ca1:[Data From KisFunctionC, index 0 Data From KisFunctionC, index 1 Data From KisFunctionC, index 2]] inPut:[Data From KisFunctionC, index 0 Data From KisFunctionC, index 1 Data From KisFunctionC, index 2]} In KisFunctionE, row = Data From KisFunctionC, index 0 In KisFunctionE, row = Data From KisFunctionC, index 1 In KisFunctionE, row = Data From KisFunctionC, index 2 --- PASS: TestNewKisFlowData (0.00s) PASS ok kis-flow/test 0.636s ``` 经过日志的详细校验,结果是符合我们预期的。 好了,目前数据流的最简单版本已经实现了,下一章我们将Function的业务逻辑开放给开发者,而不是写在KisFlow框架中. ## 3.5 【V0.2】源代码 https://github.com/aceld/kis-flow/releases/tag/v0.2 --- 作者:刘丹冰Aceld github: [https://github.com/aceld](https://github.com/aceld) KisFlow开源项目地址:[https://github.com/aceld/kis-flow](https://github.com/aceld/kis-flow) --- 连载中... [Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df) [Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a) [Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7) [Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc)

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

454 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传