Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action

aceld · · 756 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

[Golang框架实战-KisFlow流式计算框架专栏](https://www.jianshu.com/c/c72f100773a6) --- [Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df) [Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a) [Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7) [Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc) [Golang框架实战-KisFlow流式计算框架(5)-Function调度](https://www.jianshu.com/p/6f54966c9f1e) [Golang框架实战-KisFlow流式计算框架(6)-Connector](https://www.jianshu.com/p/159b14219559) [Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出](https://www.jianshu.com/p/8870598beed0) --- ## 7.1 Action Abort(终止流程) KisFlow Action 是指在执行Function的时候,同时可以控制Flow的调度逻辑,KisFlow提供一些Action动作让开发者做选择,本节先介绍最简单的Action动作,Abort(终止当前Flow)。 我们最终的Abort的使用形式如下: ```go func AbortFuncHandler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call AbortFuncHandler ----") for _, row := range flow.Input() { str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) fmt.Println(str) } return flow.Next(kis.ActionAbort) // 终止Flow } ``` `AbortFuncHandler()`是一个Function 的业务回调方法,是由开发者自定义的,在执行完当前Funciton之后,正常的情况是继续执行下一个Funciton,但是如果传递`flow.Next(kis.ActionAbort)` 作为当前Funciton的返回值,那么则不会执行到下一个Funciton,而是直接终止当前Flow的调度计算流。 下面我们先来实现KisFlow的 Abort Action动作模式。 ## 7.1.1 Abort接口定义 首先,先对Flow的Abort()接口做定义。 > kis-flow/kis/flow.go ```go type Flow interface { // Run 调度Flow,依次调度Flow中的Function并且执行 Run(ctx context.Context) error // Link 将Flow中的Function按照配置文件中的配置进行连接 Link(fConf *config.KisFuncConfig, fParams config.FParam) error // CommitRow 提交Flow数据到即将执行的Function层 CommitRow(row interface{}) error // Input 得到flow当前执行Function的输入源数据 Input() common.KisRowArr // GetName 得到Flow的名称 GetName() string // GetThisFunction 得到当前正在执行的Function GetThisFunction() Function // GetThisFuncConf 得到当前正在执行的Function的配置 GetThisFuncConf() *config.KisFuncConfig // GetConnector 得到当前正在执行的Function的Connector GetConnector() (Connector, error) // GetConnConf 得到当前正在执行的Function的Connector的配置 GetConnConf() (*config.KisConnConfig, error) // GetConfig 得到当前Flow的配置 GetConfig() *config.KisFlowConfig // GetFuncConfigByName 得到当前Flow的配置 GetFuncConfigByName(funcName string) *config.KisFuncConfig // --- KisFlow Action --- // Next 当前Flow执行到的Function进入下一层Function所携带的Action动作 Next(acts ...ActionFunc) error } ``` 这里面提供一个接口`Next(acts ...ActionFunc) error`,其中参数是一个可变参数,类型为`ActionFunc`,这个是我们给KisFlow定义的Action相关的方法。有关Action的定义模块如下: ## 7.1.2 Action模块定义 Action是用来在Flow执行过程中,通过Function来控制Flow执行特殊动作的行为配置模块,包括上面的Abort行为,Abort也属于其中一个Action。Action的模块定义如下,在`kis-flow/kis/`下创建`action.go`文件,实现: > kis-flow/kis/action.go ```go package kis // Action KisFlow执行流程Actions type Action struct { // Abort 终止Flow的执行 Abort bool } // ActionFunc KisFlow Functional Option 类型 type ActionFunc func(ops *Action) // LoadActions 加载Actions,依次执行ActionFunc操作函数 func LoadActions(acts []ActionFunc) Action { action := Action{} if acts == nil { return action } for _, act := range acts { act(&action) } return action } // ActionAbort 终止Flow的执行 func ActionAbort(action *Action) { action.Abort = true } ``` 首先,现在Action只有Abort一个行为,我们用bool类型来表示Abort是否为终止,true则为需要终止flow的调用。 其次,`type ActionFunc func(ops *Action)`这个函数原型为一个函数类型,函数的形参是传递进来一个`Action{}` 指针,而 `func ActionAbort(action *Action)`则是它的一个具体的函数,ActionAbort()的方法的目的就是将Action的Abort成员设置为true。 最后看`func LoadActions(acts []ActionFunc) Action`方法。这个形参是一个`ActionFunc`函数数组,`LoadActions()`则是创建一个新的Action{} ,然后依次执行`[]ActionFunc`的函数来改变Aciton{}的成员,最终将新的Action{}返回上层。 ## 7.1.3 Next方法实现 接下来,我们需要给KisFlow模块实现这个接口,首先需要给KisFlow添加一个Action{}成员,表示每次执行完Function之后所携带的动作。 > kis-flow/flow/kis_flow.go ```go // KisFlow 用于贯穿整条流式计算的上下文环境 type KisFlow struct { // 基础信息 Id string // Flow的分布式实例ID(用于KisFlow内部区分不同实例) Name string // Flow的可读名称 Conf *config.KisFlowConfig // Flow配置策略 // Function列表 Funcs map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName FlowHead kis.Function // 当前Flow所拥有的Function列表表头 FlowTail kis.Function // 当前Flow所拥有的Function列表表尾 flock sync.RWMutex // 管理链表插入读写的锁 ThisFunction kis.Function // Flow当前正在执行的KisFunction对象 ThisFunctionId string // 当前执行到的Function ID PrevFunctionId string // 当前执行到的Function 上一层FunctionID // Function列表参数 funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam fplock sync.RWMutex // 管理funcParams的读写锁 // 数据 buffer common.KisRowArr // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch data common.KisDataMap // 流式计算各个层级的数据源 inPut common.KisRowArr // 当前Function的计算输入数据 // +++++++++++++++++++++ // KisFlow Action action kis.Action // 当前Flow所携带的Action动作 } ``` 然后实现`KisFlow`的Next()接口,如下: > kis-flow/flow/kis_flow.go ```go // Next 当前Flow执行到的Function进入下一层Function所携带的Action动作 func (flow *KisFlow) Next(acts ...kis.ActionFunc) error { // 加载Function FaaS 传递的 Action动作 flow.action = kis.LoadActions(acts) return nil } ``` 每次开发者在执行Function的自定义业务回调中,最后会调用`flow.Next()`来传递Action,所以` Next(acts ...kis.ActionFunc) error`就是讲传递的Action属性加载进来并且在`flow.action`保存。 ## 7.1.4 Abort控制Flow流程 现在有个Abort来控制Flow流,那么我们需要给KisFlow添加一个成员来表示这个状态 > kis-flow/flow/kis_flow.go ```go // KisFlow 用于贯穿整条流式计算的上下文环境 type KisFlow struct { // 基础信息 Id string // Flow的分布式实例ID(用于KisFlow内部区分不同实例) Name string // Flow的可读名称 Conf *config.KisFlowConfig // Flow配置策略 // Function列表 Funcs map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName FlowHead kis.Function // 当前Flow所拥有的Function列表表头 FlowTail kis.Function // 当前Flow所拥有的Function列表表尾 flock sync.RWMutex // 管理链表插入读写的锁 ThisFunction kis.Function // Flow当前正在执行的KisFunction对象 ThisFunctionId string // 当前执行到的Function ID PrevFunctionId string // 当前执行到的Function 上一层FunctionID // Function列表参数 funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam fplock sync.RWMutex // 管理funcParams的读写锁 // 数据 buffer common.KisRowArr // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch data common.KisDataMap // 流式计算各个层级的数据源 inPut common.KisRowArr // 当前Function的计算输入数据 action kis.Action // 当前Flow所携带的Action动作 // +++++++++ abort bool // 是否中断Flow } ``` 在每次执行到flow.Run()方法时,需要重置abort变量,并且在循环调度的时候加上对flow.abort的判断。 > kis-flow/flow/kis_flow.go ```go // Run 启动KisFlow的流式计算, 从起始Function开始执行流 func (flow *KisFlow) Run(ctx context.Context) error { // +++++++++ // 重置 abort flow.abort = false // 每次进入调度,要重置abort状态 // ... ... // ... ... //流式链式调用 for fn != nil && flow.abort != true { // ++++ 如果设置abort则不进入下次循环调度 // ... ... // ... ... if err := fn.Call(ctx, flow); err != nil { //Error return err } else { //Success // ... ... fn = fn.Next() } } return nil ``` 这样在每次Call()调度到Funciton的自定方法时,如果`return flow.Next(ActionAbort)`就会对flow的Action状态进行改变,从而就控制了flow的流程终止。最后就是将Action的Abort状态传递给KisFlow的Abort状态。 既然有了Abort状态,那么我们可以通过给Flow执行过程中添加一个设定,如果当前的Function没有提交本层的结果数据,也就是flow.buffer为空,那么将不会进入下一层,在本层直接结束退出Flow的Run()调用。 > kis-flow/flow/kis_flow_data.go ```go //commitCurData 提交Flow当前执行Function的结果数据 func (flow *KisFlow) commitCurData(ctx context.Context) error { // 判断本层计算是否有结果数据,如果没有则退出本次Flow Run循环 if len(flow.buffer) == 0 { // ++++++++++++ flow.abort = true return nil } // ... ... // ... ... return nil ``` ## 7.1.5 Action捕获及处理 接下来来实现一个专门处理Action动作的方法,定义在`kis-flow/flow/kis_flow_action.go`文件中,如下: > kis-flow/flow/kis_flow_action.go ```go package flow import ( "context" "errors" "fmt" "kis-flow/kis" ) // dealAction 处理Action,决定接下来Flow的流程走向 func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) { if err := flow.commitCurData(ctx); err != nil { return nil, err } // 更新上一层 FuncitonId 游标 flow.PrevFunctionId = flow.ThisFunctionId fn = fn.Next() // Abort Action 强制终止 if flow.action.Abort { flow.abort = true } // 清空Action flow.action = kis.Action{} return fn, nil } ``` 然后稍微改进下`KisFlow的Run()` 流程,将`dealAction() `方法嵌入进去。 > kis-flow/flow/kis_flow.go ```go // Run 启动KisFlow的流式计算, 从起始Function开始执行流 func (flow *KisFlow) Run(ctx context.Context) error { var fn kis.Function fn = flow.FlowHead flow.abort = false if flow.Conf.Status == int(common.FlowDisable) { //flow被配置关闭 return nil } // 因为此时还没有执行任何Function, 所以PrevFunctionId为FirstVirtual 因为没有上一层Function flow.PrevFunctionId = common.FunctionIdFirstVirtual // 提交数据流原始数据 if err := flow.commitSrcData(ctx); err != nil { return err } //流式链式调用 for fn != nil && flow.abort == false { // flow记录当前执行到的Function 标记 fid := fn.GetId() flow.ThisFunction = fn flow.ThisFunctionId = fid // 得到当前Function要处理与的源数据 if inputData, err := flow.getCurData(); err != nil { log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error()) return err } else { flow.inPut = inputData } if err := fn.Call(ctx, flow); err != nil { //Error return err } else { //Success // +++++++++++++++++++++++++++++++ fn, err = flow.dealAction(ctx, fn) if err != nil { return err } // +++++++++++++++++++++++++++++++ } } return nil } ``` ## 7.1.6 Action Abort单元测试 首先我们新建一个Function业务,配置文件如下: > kis-flow/test/load_conf/func/func-AbortFunc.yml ```yaml kistype: func fname: abortFunc fmode: Calculate source: name: 用户订单错误率 must: - order_id - user_id ``` 当前的Funciton的名称为`abortFunc`,然后实现其FaaS函数,如下: > kis-flow/test/faas/faas_abort.go ```go package faas import ( "context" "fmt" "kis-flow/kis" ) // type FaaS func(context.Context, Flow) error func AbortFuncHandler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call AbortFuncHandler ----") for _, row := range flow.Input() { str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) fmt.Println(str) } return flow.Next(kis.ActionAbort) } ``` 这个Function就会最终调用`flow.Next(kis.ActionAbort)`来终止Flow,接下来我们新建一个Flow,将上面的Function作为中间的Function,看检测是否会终止之后的Function被执行。 新建的flow的配置如下: > kis-flow/test/load_conf/flow/flow-FlowName2.yml ```yaml kistype: flow status: 1 flow_name: flowName2 flows: - fname: funcName1 - fname: abortFunc - fname: funcName3 ``` 当前Flow的名称为`flowName2`,当前的Flow有三个Function,其中funcNam1 和 funcName2我们之前都已经定义好了,abortFunc是我们新建的,并且在中间。如果abort功能满足,则funcName3将不会被调度。 接下来实现单元测试用例。 > kis-flow/test/kis_action_test.go ```go package test import ( "context" "kis-flow/common" "kis-flow/file" "kis-flow/kis" "kis-flow/test/caas" "kis-flow/test/faas" "testing" ) func TestActionAbort(t *testing.T) { ctx := context.Background() // 0. 注册Function 回调业务 kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // 添加abortFunc 业务 kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) // 0. 注册ConnectorInit 和 Connector 回调业务 kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) // 1. 加载配置文件并构建Flow if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil { panic(err) } // 2. 获取Flow flow1 := kis.Pool().GetFlow("flowName2") // 3. 提交原始数据 _ = flow1.CommitRow("This is Data1 from Test") _ = flow1.CommitRow("This is Data2 from Test") _ = flow1.CommitRow("This is Data3 from Test") // 4. 执行flow1 if err := flow1.Run(ctx); err != nil { panic(err) } } ``` 其中下面的代码是初始化注册的代码,大家也可以写在其他文件中,这样就不需要每次都携带这部分代码了。 ```go // 0. 注册Function 回调业务 kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // 添加abortFunc 业务 kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) // 0. 注册ConnectorInit 和 Connector 回调业务 kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) ``` cd 到`kis-flow/test/`目录下执行如下指令: ```bash go test -test.v -test.paniconexit0 -test.run TestActionAbort ``` 结果如下: ```bash === RUN TestActionAbort Add KisPool FuncName=funcName1 Add KisPool FuncName=abortFunc Add KisPool FuncName=funcName3 Add KisPool CaaSInit CName=ConnName1 Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save ===> Call Connector InitDemo1 &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]} Add FlowRouter FlowName=flowName1 Add FlowRouter FlowName=flowName2 context.Background ====> After CommitSrcData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513 All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] KisFunctionV, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094cc0 ThisFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] jumpFunc:NoJump abort:false nextOpt:<nil>} ---> Call funcName1Handler ---- In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data1 from Test In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data2 from Test In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data3 from Test context.Background ====> After commitCurData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513 All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] KisFunctionC, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094d20 ThisFunctionId:func-7f5af1521fd64d08839d5bdd26de5254 PrevFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] jumpFunc:NoJump abort:false nextOpt:<nil>} ---> Call AbortFuncHandler ---- In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 0 In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 1 In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 2 --- PASS: TestActionAbort (0.00s) PASS ok kis-flow/test 0.487s ``` 通过结果可以看到,在执行完 AbortFuncHandler 后,没有继续执行,而是退出了Flow的Run()方法。 ## 7.2 Action DataReuse(复用上层数据) Action DataReuse 为服用上层数据,含义为,当前的执行Function提交到下一层的结果将不被使用,而是直接将当前Function的上一层结果数据,复用到下一层,作为下一层Funciton的数据源。 下面来实现Action DataReuse功能。 ## 7.2.1 DataReuse Action添加 在Action中添加DataReuse成员,是一个bool类型。 > kis-flow/kis/action.go ```go // Action KisFlow执行流程Actions type Action struct { // +++++++++++++ // DataReuse 是否复用上层Function数据 DataReuse bool // Abort 终止Flow的执行 Abort bool } // ActionDataReuse Next复用上层Function数据Option func ActionDataReuse(act *Action) { act.DataReuse = true } ``` 然后提供一个ActionFunc,命名为:`ActionDataReuse`,实现中为改变DataReuse状态为true。 ## 7.2.2 复用上层数据到下层 这里需要再实现一个提交数据的方法,为如何提交复用数据,具体逻辑如下: > kis-flow/flow/kis_flow_data.go ```go // commitReuseData func (flow *KisFlow) commitReuseData(ctx context.Context) error { // 判断上层是否有结果数据, 如果没有则退出本次Flow Run循环 if len(flow.data[flow.PrevFunctionId]) == 0 { flow.abort = true return nil } // 本层结果数据等于上层结果数据(复用上层结果数据到本层) flow.data[flow.ThisFunctionId] = flow.data[flow.PrevFunctionId] // 清空缓冲Buf (如果是ReuseData选项,那么提交的全部数据,都将不会携带到下一层) flow.buffer = flow.buffer[0:0] log.Logger().DebugFX(ctx, " ====> After commitReuseData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data) return nil } ``` 逻辑很简单,与`commitCurData()`不同的是,`commitCurData()`为将flow.buffer的数据提交到`flow.data[flow.ThisFunctionId]`中,而`commitReuseData()`为将上一层的结果数据提交到`flow.data[flow.ThisFunctionId]`中。 ## 7.2.3 处理DataReuse Action动作 然后在`dealAction()`方法中添加对Action DataReuse的动作捕获,如下: > kis-flow/flow/kis_flow_action.go ```go // dealAction 处理Action,决定接下来Flow的流程走向 func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) { // ++++++++++++++++ // DataReuse Action if flow.action.DataReuse { if err := flow.commitReuseData(ctx); err != nil { return nil, err } } else { if err := flow.commitCurData(ctx); err != nil { return nil, err } } // 更新上一层 FuncitonId 游标 flow.PrevFunctionId = flow.ThisFunctionId fn = fn.Next() // Abort Action 强制终止 if flow.action.Abort { flow.abort = true } // 清空Action flow.action = kis.Action{} return fn, nil } ``` ## 7.2.4 单元测试 下面来针对DataReuse做单元测试,首先创建一个名字为dataReuseFunc 的Funciton,先创建配置文件。 > kis-flow/test/load_conf/func/func-dataReuseFunc.yml ```yaml kistype: func fname: dataReuseFunc fmode: Calculate source: name: 用户订单错误率 must: - order_id - user_id ``` 同时新建一个Flow流,名称为flowName3,配置如下: > kis-flow/test/load_conf/flow/func-FlowName3.yml ```yaml kistype: flow status: 1 flow_name: flowName3 flows: - fname: funcName1 - fname: dataReuseFunc - fname: funcName3 ``` 针对dataReuseFunc的Function的逻辑业务,如下: > kis-flow/test/faas/faas_data_reuse.go ```go package faas import ( "context" "fmt" "kis-flow/kis" ) // type FaaS func(context.Context, Flow) error func DataReuseFuncHandler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call DataReuseFuncHandler ----") for index, row := range flow.Input() { str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) fmt.Println(str) // 计算结果数据 resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index) // 提交结果数据 _ = flow.CommitRow(resultStr) } return flow.Next(kis.ActionDataReuse) } ``` 最后实现测试用例,如下: > kis-flow/test/kis_action_test.go ```go func TestActionDataReuse(t *testing.T) { ctx := context.Background() // 0. 注册Function 回调业务 kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) kis.Pool().FaaS("dataReuseFunc", faas.DataReuseFuncHandler) // 添加dataReuesFunc 业务 kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) // 0. 注册ConnectorInit 和 Connector 回调业务 kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) // 1. 加载配置文件并构建Flow if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil { panic(err) } // 2. 获取Flow flow1 := kis.Pool().GetFlow("flowName3") // 3. 提交原始数据 _ = flow1.CommitRow("This is Data1 from Test") _ = flow1.CommitRow("This is Data2 from Test") _ = flow1.CommitRow("This is Data3 from Test") // 4. 执行flow1 if err := flow1.Run(ctx); err != nil { panic(err) } } ``` cd 到 `kis-flow/test/`下执行: ```bash go test -test.v -test.paniconexit0 -test.run TestActionDataReuse ``` 结果是: ```bash === RUN TestActionDataReuse Add KisPool FuncName=funcName1 Add KisPool FuncName=dataReuseFunc Add KisPool FuncName=funcName3 Add KisPool CaaSInit CName=ConnName1 Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save ===> Call Connector InitDemo1 &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]} Add FlowRouter FlowName=flowName5 ===> Call Connector InitDemo1 &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]} Add FlowRouter FlowName=flowName1 Add FlowRouter FlowName=flowName2 Add FlowRouter FlowName=flowName3 Add FlowRouter FlowName=flowName4 context.Background ====> After CommitSrcData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] KisFunctionV, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000955c0 ThisFunctionId:func-7886178381634f05b302841141382e59 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}} ---> Call funcName1Handler ---- In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data1 from Test In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data2 from Test In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data3 from Test context.Background ====> After commitCurData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095620 ThisFunctionId:func-ef567879d0dd45b287ed709e549e9d32 PrevFunctionId:func-7886178381634f05b302841141382e59 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}} ---> Call DataReuseFuncHandler ---- In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 0 In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 1 In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 2 context.Background ====> After commitReuseData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095680 ThisFunctionId:func-cfe66e39aba54ff989d6764cc4edda20 PrevFunctionId:func-ef567879d0dd45b287ed709e549e9d32 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}} ---> Call funcName3Handler ---- In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 0 In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 1 In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 2 --- PASS: TestActionDataReuse (0.02s) PASS ok kis-flow/test 0.523s ``` 通过结果可以看出,在最后的`funcName3Handler`中得到的数据是funcName1传递下来的数据,中间的ReuseFunction将上层的数据复用到了下一层,变成了FuncName3的数据源。 ## 7.3 Action ForceEntryNext(强制进入下一层) ## 7.3.1 ForceEntryNext Action属性 目前的KisFlow为,如果当前的Function没有commit数据(本层的结果数据),那么当前的Function结束后,将不会继续调度下一层Function。 但是有的Flow的流式计算可能需要继续向下执行,哪怕没有数据,所以这里可以通过`ForceEntryNext`这个动作来触发。 首先我们在Action中新增一个`ForceEntryNext` 属性。 > kis-flow/kis/action.go ```go // Action KisFlow执行流程Actions type Action struct { // DataReuse 是否复用上层Function数据 DataReuse bool // 默认Next()为如果本层Function计算结果为0条数据,之后Function将不会继续执行 // ForceEntryNext 为忽略上述默认规则,没有数据强制进入下一层Function ForceEntryNext bool // Abort 终止Flow的执行 Abort bool } // ActionForceEntryNext 强制进入下一层 func ActionForceEntryNext(act *Action) { act.ForceEntryNext = true } ``` 且提供配置函数`ActionForceEntryNext()`来修改这个属性状态。 ## 7.3.2 捕获Action 在捕获Action的`dealAction()`方法中,加上对这个状态的判断,如果被设置,则需要将flow.Abort状态改成false,flow将继续执行下一层。 > kis-flow/flow/kis_flow_action.go ```go // dealAction 处理Action,决定接下来Flow的流程走向 func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) { // DataReuse Action if flow.action.DataReuse { if err := flow.commitReuseData(ctx); err != nil { return nil, err } } else { if err := flow.commitCurData(ctx); err != nil { return nil, err } } // ++++++++++++++++++++++++++++ // ForceEntryNext Action if flow.action.ForceEntryNext { if err := flow.commitVoidData(ctx); err != nil { return nil, err } flow.abort = false } // 更新上一层 FuncitonId 游标 flow.PrevFunctionId = flow.ThisFunctionId fn = fn.Next() // Abort Action 强制终止 if flow.action.Abort { flow.abort = true } // 清空Action flow.action = kis.Action{} return fn, nil } ``` 这里有一个细节,我们需要调用一个方法`commitVoidData()`,即提交空数据,原因是,如果不提交空数据,那么flow.buffer依然为空,那么不会执行数据的提交动作,那么会导致`flow.data[flow.ThisFunctionId]`这条不存在,也就是key不存在,那么再执行到`flow.getCurData()`会出现找不到key的异常而panic。所以这里需要提交一个空的数据到`flow.data[flow.ThisFunctionId]`中。 具体的`commitVoidData()`实现如下: > kis-flow/flow/kis_flow_data.go ```go func (flow *KisFlow) commitVoidData(ctx context.Context) error { if len(flow.buffer) != 0 { return nil } // 制作空数据 batch := make(common.KisRowArr, 0) // 将本层计算的缓冲数据提交到本层结果数据中 flow.data[flow.ThisFunctionId] = batch log.Logger().DebugFX(ctx, " ====> After commitVoidData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data) return nil } ``` ## 7.3.3 单元测试,不设置ForceEntryNext 首先,创建一个noResultFunc的Function配置,且实现相关的回调业务函数。 > kis-flow/test/load_conf/func/func-NoResultFunc.yml ```yaml kistype: func fname: noResultFunc fmode: Calculate source: name: 用户订单错误率 must: - order_id - user_id ``` > kis-flow/test/faas/faas_no_result.go ```go package faas import ( "context" "fmt" "kis-flow/kis" ) // type FaaS func(context.Context, Flow) error func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call NoResultFuncHandler ----") for _, row := range flow.Input() { str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) fmt.Println(str) } return flow.Next() } ``` 这里面在Function的最后,只调用`flow.Next()` 不传递任何Action动作。 然后新建一个FlowName4,配置如下: > kis-flow/test/load_conf/flow-FlowName4.yml ```yaml kistype: flow status: 1 flow_name: flowName4 flows: - fname: funcName1 - fname: noResultFunc - fname: funcName3 ``` 最后我们编写单元测试用例代码,将noResultFunc放在中间的部分。 > kis-flow/test/kis_action_test.go ```go func TestActionForceEntry(t *testing.T) { ctx := context.Background() // 0. 注册Function 回调业务 kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) kis.Pool().FaaS("noResultFunc", faas.NoResultFuncHandler) // 添加noResultFunc 业务 kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) // 0. 注册ConnectorInit 和 Connector 回调业务 kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) // 1. 加载配置文件并构建Flow if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil { panic(err) } // 2. 获取Flow flow1 := kis.Pool().GetFlow("flowName4") // 3. 提交原始数据 _ = flow1.CommitRow("This is Data1 from Test") _ = flow1.CommitRow("This is Data2 from Test") _ = flow1.CommitRow("This is Data3 from Test") // 4. 执行flow1 if err := flow1.Run(ctx); err != nil { panic(err) } } ``` cd到`kis-flow/test/` 下执行: ```bash go test -test.v -test.paniconexit0 -test.run TestActionForceEntry ``` 结果如下: ```bash === RUN TestActionForceEntry Add KisPool FuncName=funcName1 Add KisPool FuncName=noResultFunc Add KisPool FuncName=funcName3 Add KisPool CaaSInit CName=ConnName1 Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save ===> Call Connector InitDemo1 &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]} Add FlowRouter FlowName=flowName1 Add FlowRouter FlowName=flowName2 Add FlowRouter FlowName=flowName3 Add FlowRouter FlowName=flowName4 ===> Call Connector InitDemo1 &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]} Add FlowRouter FlowName=flowName5 context.Background ====> After CommitSrcData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9 All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] KisFunctionV, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d560 ThisFunctionId:func-4d113d6a8e744d30a906db310f2d7818 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}} ---> Call funcName1Handler ---- In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data1 from Test In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data2 from Test In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data3 from Test context.Background ====> After commitCurData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9 All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] KisFunctionC, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d5c0 ThisFunctionId:func-47cb6f9ae464484aa779c18284035705 PrevFunctionId:func-4d113d6a8e744d30a906db310f2d7818 funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}} ---> Call NoResultFuncHandler ---- In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 0 In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 1 In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 2 --- PASS: TestActionForceEntry (0.02s) PASS ok kis-flow/test 0.958s ``` 因为noResultFunc不会生成任何的结果数据,所以下一层Function将不会被执行,最后只执行到 ```bash ---> Call NoResultFuncHandler ---- ``` ## 7.3.4 单元测试,设置ForceEntryNext 下面我们将Action为ForceEntryNext加上,在`NoResultFuncHandler()` 中,加上`flow.Next(kis.ActionForceEntryNext)`,如下: > kis-flow/test/faas/faas_no_result.go ```go package faas import ( "context" "fmt" "kis-flow/kis" ) // type FaaS func(context.Context, Flow) error func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call NoResultFuncHandler ----") for _, row := range flow.Input() { str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) fmt.Println(str) } return flow.Next(kis.ActionForceEntryNext) } ``` cd到`kis-flow/test/` 下执行: ```bash go test -test.v -test.paniconexit0 -test.run TestActionForceEntry ``` 结果如下: ```bash === RUN TestActionForceEntry Add KisPool FuncName=funcName1 Add KisPool FuncName=noResultFunc Add KisPool FuncName=funcName3 Add KisPool CaaSInit CName=ConnName1 Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save ===> Call Connector InitDemo1 &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]} Add FlowRouter FlowName=flowName5 ===> Call Connector InitDemo1 &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]} Add FlowRouter FlowName=flowName1 Add FlowRouter FlowName=flowName2 Add FlowRouter FlowName=flowName3 Add FlowRouter FlowName=flowName4 context.Background ====> After CommitSrcData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] KisFunctionV, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-ecddaee7d7d447a9852d07088732f509 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}} ---> Call funcName1Handler ---- In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data1 from Test In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data2 from Test In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data3 from Test context.Background ====> After commitCurData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013740 ThisFunctionId:func-c9817c7993894919b8463dea1757544e PrevFunctionId:func-ecddaee7d7d447a9852d07088732f509 funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}} ---> Call NoResultFuncHandler ---- In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 0 In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 1 In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 2 context.Background ====> After commitVoidData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000137a0 ThisFunctionId:func-5729600ae6ea4d6f879eb5832c638e1a PrevFunctionId:func-c9817c7993894919b8463dea1757544e funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}} ---> Call funcName3Handler ---- --- PASS: TestActionForceEntry (0.01s) PASS ok kis-flow/test 0.348s ``` 会发现,Function第三层funcName3Handler 被执行到,但是没有任何的数据。 ## 7.4 Action JumpFunc(流程跳转) 接下来,来实现JumpFunc Action,JumpFunc是可以在当前Flow中任意跳转到指定的FuncName继续执行(前提是跳转的FuncName当当前Flow中存在) > 注意:JumpFunc容易出现无限循环流,所以在业务的设计要慎用。 ## 7.4.1 Action添加JumpFunc 首先在Action添加一个JumpFunc属性,注意,JunpFunc不是一个bool状态,而是一个string字符串,表示具体要跳转的FunctionName名称。 > kis-flow/kis/action.go ```go // Action KisFlow执行流程Actions type Action struct { // DataReuse 是否复用上层Function数据 DataReuse bool // 默认Next()为如果本层Function计算结果为0条数据,之后Function将不会继续执行 // ForceEntryNext 为忽略上述默认规则,没有数据强制进入下一层Function ForceEntryNext bool // ++++++++++ // JumpFunc 跳转到指定Function继续执行 JumpFunc string // Abort 终止Flow的执行 Abort bool } // ActionJumpFunc 会返回一个ActionFunc函数,并且会将funcName赋值给Action.JumpFunc // (注意:容易出现Flow循环调用,导致死循环) func ActionJumpFunc(funcName string) ActionFunc { return func(act *Action) { act.JumpFunc = funcName } } ``` 然后提供一个修改JumpFunc的配置方法`ActionJumpFunc()`,注意这个方法和之前的方法写法有一些不同,主要是返回一个匿名函数并且执行,目的则是修改Action中的JumpFunc属性。 ## 7.4.2 捕获Action 接下来,我们来捕获JumpFunc的Action动作,判断JumpFunc是否为空字符串即可。 > kis-flow/flow/kis_flow_action.go ```go // dealAction 处理Action,决定接下来Flow的流程走向 func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) { // DataReuse Action if flow.action.DataReuse { if err := flow.commitReuseData(ctx); err != nil { return nil, err } } else { if err := flow.commitCurData(ctx); err != nil { return nil, err } } // ForceEntryNext Action if flow.action.ForceEntryNext { if err := flow.commitVoidData(ctx); err != nil { return nil, err } flow.abort = false } // ++++++++++++++++++++++++++++++++ // JumpFunc Action if flow.action.JumpFunc != "" { if _, ok := flow.Funcs[flow.action.JumpFunc]; !ok { //当前JumpFunc不在flow中 return nil, errors.New(fmt.Sprintf("Flow Jump -> %s is not in Flow", flow.action.JumpFunc)) } jumpFunction := flow.Funcs[flow.action.JumpFunc] // 更新上层Function flow.PrevFunctionId = jumpFunction.GetPrevId() fn = jumpFunction // 如果设置跳跃,强制跳跃 flow.abort = false // ++++++++++++++++++++++++++++++++ } else { // 更新上一层 FuncitonId 游标 flow.PrevFunctionId = flow.ThisFunctionId fn = fn.Next() } // Abort Action 强制终止 if flow.action.Abort { flow.abort = true } // 清空Action flow.action = kis.Action{} return fn, nil } ``` 如果设置JumpFunc,则需要修改下次执行的`fn`指针,否则则正常寻址`fn.Next()`。 ## 7.4.3 单元测试 接下来来定义一个跳转Action的Function,配置,如下: > kis-flow/test/load_conf/func/func-jumpFunc.yml ```yaml kistype: func fname: jumpFunc fmode: Calculate source: name: 用户订单错误率 must: - order_id - user_id ``` 并且实现相关的Funciton业务逻辑,如下: > kis-flow/test/faas/faas_jump.go ```go package faas import ( "context" "fmt" "kis-flow/kis" ) // type FaaS func(context.Context, Flow) error func JumpFuncHandler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call JumpFuncHandler ----") for _, row := range flow.Input() { str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) fmt.Println(str) } return flow.Next(kis.ActionJumpFunc("funcName1")) } ``` 这里,最后通过`flow.Next(kis.ActionJumpFunc("funcName1"))`来指定跳转到funcName1的Function。 新建一个Flow,为FlowName5,配置如下: > kis-flow/test/load_conf/flow/flow-FlowName5.yml ```yaml kistype: flow status: 1 flow_name: flowName5 flows: - fname: funcName1 - fname: funcName2 - fname: jumpFunc ``` 之后,来实现单元测试用例代码,如下: > kis-flow/test/kis_action_test.go ```go func TestActionJumpFunc(t *testing.T) { ctx := context.Background() // 0. 注册Function 回调业务 kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) kis.Pool().FaaS("jumpFunc", faas.JumpFuncHandler) // 添加jumpFunc 业务 // 0. 注册ConnectorInit 和 Connector 回调业务 kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) // 1. 加载配置文件并构建Flow if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil { panic(err) } // 2. 获取Flow flow1 := kis.Pool().GetFlow("flowName5") // 3. 提交原始数据 _ = flow1.CommitRow("This is Data1 from Test") _ = flow1.CommitRow("This is Data2 from Test") _ = flow1.CommitRow("This is Data3 from Test") // 4. 执行flow1 if err := flow1.Run(ctx); err != nil { panic(err) } } ``` cd到`kis-flow/test/`执行: ```bash go test -test.v -test.paniconexit0 -test.run TestActionJumpFunc ``` 结果如下: ```bash ... ... ---> Call funcName1Handler ---- In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data1 from Test In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data2 from Test In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data3 from Test context.Background ====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866 All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] KisFunctionS, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013680 ThisFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 PrevFunctionId:func-f6ca8010d66744429bf6069c9897a928 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}} ---> Call funcName2Handler ---- In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 0 ===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save ===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0 In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 1 ===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save ===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1 In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 2 ===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save ===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2 context.Background ====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866 All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] KisFunctionC, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-4faf8f019f4a4a48b84ef27abfad53d1 PrevFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}} ---> Call JumpFuncHandler ---- In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 0 In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 1 In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 2 KisFunctionV, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013620 ThisFunctionId:func-f6ca8010d66744429bf6069c9897a928 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}} ---> Call funcName1Handler ---- ... ... ``` 发现我们会无限循环的调度Flow,这样说明我们的JumpFunc Action已经生效。 ## 7.5【V0.6】源代码 https://github.com/aceld/kis-flow/releases/tag/v0.6 --- 作者:刘丹冰Aceld github: [https://github.com/aceld](https://github.com/aceld) KisFlow开源项目地址:[https://github.com/aceld/kis-flow](https://github.com/aceld/kis-flow) [Golang框架实战-KisFlow流式计算框架专栏](https://www.jianshu.com/c/c72f100773a6) --- [Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df) [Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a) [Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7) [Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc) [Golang框架实战-KisFlow流式计算框架(5)-Function调度](https://www.jianshu.com/p/6f54966c9f1e) [Golang框架实战-KisFlow流式计算框架(6)-Connector](https://www.jianshu.com/p/159b14219559) [Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出](https://www.jianshu.com/p/8870598beed0)

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

756 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传