[Golang框架实战-KisFlow流式计算框架专栏](https://www.jianshu.com/c/c72f100773a6)
---
[Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df)
[Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a)
[Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7)
[Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc)
[Golang框架实战-KisFlow流式计算框架(5)-Function调度](https://www.jianshu.com/p/6f54966c9f1e)
[Golang框架实战-KisFlow流式计算框架(6)-Connector](https://www.jianshu.com/p/159b14219559)
[Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出](https://www.jianshu.com/p/8870598beed0)
---
## 7.1 Action Abort(终止流程)
KisFlow Action 是指在执行Function的时候,同时可以控制Flow的调度逻辑,KisFlow提供一些Action动作让开发者做选择,本节先介绍最简单的Action动作,Abort(终止当前Flow)。
我们最终的Abort的使用形式如下:
```go
func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call AbortFuncHandler ----")
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return flow.Next(kis.ActionAbort) // 终止Flow
}
```
`AbortFuncHandler()`是一个Function 的业务回调方法,是由开发者自定义的,在执行完当前Funciton之后,正常的情况是继续执行下一个Funciton,但是如果传递`flow.Next(kis.ActionAbort)` 作为当前Funciton的返回值,那么则不会执行到下一个Funciton,而是直接终止当前Flow的调度计算流。
下面我们先来实现KisFlow的 Abort Action动作模式。
## 7.1.1 Abort接口定义
首先,先对Flow的Abort()接口做定义。
> kis-flow/kis/flow.go
```go
type Flow interface {
// Run 调度Flow,依次调度Flow中的Function并且执行
Run(ctx context.Context) error
// Link 将Flow中的Function按照配置文件中的配置进行连接
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow 提交Flow数据到即将执行的Function层
CommitRow(row interface{}) error
// Input 得到flow当前执行Function的输入源数据
Input() common.KisRowArr
// GetName 得到Flow的名称
GetName() string
// GetThisFunction 得到当前正在执行的Function
GetThisFunction() Function
// GetThisFuncConf 得到当前正在执行的Function的配置
GetThisFuncConf() *config.KisFuncConfig
// GetConnector 得到当前正在执行的Function的Connector
GetConnector() (Connector, error)
// GetConnConf 得到当前正在执行的Function的Connector的配置
GetConnConf() (*config.KisConnConfig, error)
// GetConfig 得到当前Flow的配置
GetConfig() *config.KisFlowConfig
// GetFuncConfigByName 得到当前Flow的配置
GetFuncConfigByName(funcName string) *config.KisFuncConfig
// --- KisFlow Action ---
// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
Next(acts ...ActionFunc) error
}
```
这里面提供一个接口`Next(acts ...ActionFunc) error`,其中参数是一个可变参数,类型为`ActionFunc`,这个是我们给KisFlow定义的Action相关的方法。有关Action的定义模块如下:
## 7.1.2 Action模块定义
Action是用来在Flow执行过程中,通过Function来控制Flow执行特殊动作的行为配置模块,包括上面的Abort行为,Abort也属于其中一个Action。Action的模块定义如下,在`kis-flow/kis/`下创建`action.go`文件,实现:
> kis-flow/kis/action.go
```go
package kis
// Action KisFlow执行流程Actions
type Action struct {
// Abort 终止Flow的执行
Abort bool
}
// ActionFunc KisFlow Functional Option 类型
type ActionFunc func(ops *Action)
// LoadActions 加载Actions,依次执行ActionFunc操作函数
func LoadActions(acts []ActionFunc) Action {
action := Action{}
if acts == nil {
return action
}
for _, act := range acts {
act(&action)
}
return action
}
// ActionAbort 终止Flow的执行
func ActionAbort(action *Action) {
action.Abort = true
}
```
首先,现在Action只有Abort一个行为,我们用bool类型来表示Abort是否为终止,true则为需要终止flow的调用。
其次,`type ActionFunc func(ops *Action)`这个函数原型为一个函数类型,函数的形参是传递进来一个`Action{}` 指针,而 `func ActionAbort(action *Action)`则是它的一个具体的函数,ActionAbort()的方法的目的就是将Action的Abort成员设置为true。
最后看`func LoadActions(acts []ActionFunc) Action`方法。这个形参是一个`ActionFunc`函数数组,`LoadActions()`则是创建一个新的Action{} ,然后依次执行`[]ActionFunc`的函数来改变Aciton{}的成员,最终将新的Action{}返回上层。
## 7.1.3 Next方法实现
接下来,我们需要给KisFlow模块实现这个接口,首先需要给KisFlow添加一个Action{}成员,表示每次执行完Function之后所携带的动作。
> kis-flow/flow/kis_flow.go
```go
// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
// 基础信息
Id string // Flow的分布式实例ID(用于KisFlow内部区分不同实例)
Name string // Flow的可读名称
Conf *config.KisFlowConfig // Flow配置策略
// Function列表
Funcs map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName
FlowHead kis.Function // 当前Flow所拥有的Function列表表头
FlowTail kis.Function // 当前Flow所拥有的Function列表表尾
flock sync.RWMutex // 管理链表插入读写的锁
ThisFunction kis.Function // Flow当前正在执行的KisFunction对象
ThisFunctionId string // 当前执行到的Function ID
PrevFunctionId string // 当前执行到的Function 上一层FunctionID
// Function列表参数
funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam
fplock sync.RWMutex // 管理funcParams的读写锁
// 数据
buffer common.KisRowArr // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch
data common.KisDataMap // 流式计算各个层级的数据源
inPut common.KisRowArr // 当前Function的计算输入数据
// +++++++++++++++++++++
// KisFlow Action
action kis.Action // 当前Flow所携带的Action动作
}
```
然后实现`KisFlow`的Next()接口,如下:
> kis-flow/flow/kis_flow.go
```go
// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
func (flow *KisFlow) Next(acts ...kis.ActionFunc) error {
// 加载Function FaaS 传递的 Action动作
flow.action = kis.LoadActions(acts)
return nil
}
```
每次开发者在执行Function的自定义业务回调中,最后会调用`flow.Next()`来传递Action,所以` Next(acts ...kis.ActionFunc) error`就是讲传递的Action属性加载进来并且在`flow.action`保存。
## 7.1.4 Abort控制Flow流程
现在有个Abort来控制Flow流,那么我们需要给KisFlow添加一个成员来表示这个状态
> kis-flow/flow/kis_flow.go
```go
// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
// 基础信息
Id string // Flow的分布式实例ID(用于KisFlow内部区分不同实例)
Name string // Flow的可读名称
Conf *config.KisFlowConfig // Flow配置策略
// Function列表
Funcs map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName
FlowHead kis.Function // 当前Flow所拥有的Function列表表头
FlowTail kis.Function // 当前Flow所拥有的Function列表表尾
flock sync.RWMutex // 管理链表插入读写的锁
ThisFunction kis.Function // Flow当前正在执行的KisFunction对象
ThisFunctionId string // 当前执行到的Function ID
PrevFunctionId string // 当前执行到的Function 上一层FunctionID
// Function列表参数
funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam
fplock sync.RWMutex // 管理funcParams的读写锁
// 数据
buffer common.KisRowArr // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch
data common.KisDataMap // 流式计算各个层级的数据源
inPut common.KisRowArr // 当前Function的计算输入数据
action kis.Action // 当前Flow所携带的Action动作
// +++++++++
abort bool // 是否中断Flow
}
```
在每次执行到flow.Run()方法时,需要重置abort变量,并且在循环调度的时候加上对flow.abort的判断。
> kis-flow/flow/kis_flow.go
```go
// Run 启动KisFlow的流式计算, 从起始Function开始执行流
func (flow *KisFlow) Run(ctx context.Context) error {
// +++++++++
// 重置 abort
flow.abort = false // 每次进入调度,要重置abort状态
// ... ...
// ... ...
//流式链式调用
for fn != nil && flow.abort != true { // ++++ 如果设置abort则不进入下次循环调度
// ... ...
// ... ...
if err := fn.Call(ctx, flow); err != nil {
//Error
return err
} else {
//Success
// ... ...
fn = fn.Next()
}
}
return nil
```
这样在每次Call()调度到Funciton的自定方法时,如果`return flow.Next(ActionAbort)`就会对flow的Action状态进行改变,从而就控制了flow的流程终止。最后就是将Action的Abort状态传递给KisFlow的Abort状态。
既然有了Abort状态,那么我们可以通过给Flow执行过程中添加一个设定,如果当前的Function没有提交本层的结果数据,也就是flow.buffer为空,那么将不会进入下一层,在本层直接结束退出Flow的Run()调用。
> kis-flow/flow/kis_flow_data.go
```go
//commitCurData 提交Flow当前执行Function的结果数据
func (flow *KisFlow) commitCurData(ctx context.Context) error {
// 判断本层计算是否有结果数据,如果没有则退出本次Flow Run循环
if len(flow.buffer) == 0 {
// ++++++++++++
flow.abort = true
return nil
}
// ... ...
// ... ...
return nil
```
## 7.1.5 Action捕获及处理
接下来来实现一个专门处理Action动作的方法,定义在`kis-flow/flow/kis_flow_action.go`文件中,如下:
> kis-flow/flow/kis_flow_action.go
```go
package flow
import (
"context"
"errors"
"fmt"
"kis-flow/kis"
)
// dealAction 处理Action,决定接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
if err := flow.commitCurData(ctx); err != nil {
return nil, err
}
// 更新上一层 FuncitonId 游标
flow.PrevFunctionId = flow.ThisFunctionId
fn = fn.Next()
// Abort Action 强制终止
if flow.action.Abort {
flow.abort = true
}
// 清空Action
flow.action = kis.Action{}
return fn, nil
}
```
然后稍微改进下`KisFlow的Run()` 流程,将`dealAction() `方法嵌入进去。
> kis-flow/flow/kis_flow.go
```go
// Run 启动KisFlow的流式计算, 从起始Function开始执行流
func (flow *KisFlow) Run(ctx context.Context) error {
var fn kis.Function
fn = flow.FlowHead
flow.abort = false
if flow.Conf.Status == int(common.FlowDisable) {
//flow被配置关闭
return nil
}
// 因为此时还没有执行任何Function, 所以PrevFunctionId为FirstVirtual 因为没有上一层Function
flow.PrevFunctionId = common.FunctionIdFirstVirtual
// 提交数据流原始数据
if err := flow.commitSrcData(ctx); err != nil {
return err
}
//流式链式调用
for fn != nil && flow.abort == false {
// flow记录当前执行到的Function 标记
fid := fn.GetId()
flow.ThisFunction = fn
flow.ThisFunctionId = fid
// 得到当前Function要处理与的源数据
if inputData, err := flow.getCurData(); err != nil {
log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
return err
} else {
flow.inPut = inputData
}
if err := fn.Call(ctx, flow); err != nil {
//Error
return err
} else {
//Success
// +++++++++++++++++++++++++++++++
fn, err = flow.dealAction(ctx, fn)
if err != nil {
return err
}
// +++++++++++++++++++++++++++++++
}
}
return nil
}
```
## 7.1.6 Action Abort单元测试
首先我们新建一个Function业务,配置文件如下:
> kis-flow/test/load_conf/func/func-AbortFunc.yml
```yaml
kistype: func
fname: abortFunc
fmode: Calculate
source:
name: 用户订单错误率
must:
- order_id
- user_id
```
当前的Funciton的名称为`abortFunc`,然后实现其FaaS函数,如下:
> kis-flow/test/faas/faas_abort.go
```go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call AbortFuncHandler ----")
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return flow.Next(kis.ActionAbort)
}
```
这个Function就会最终调用`flow.Next(kis.ActionAbort)`来终止Flow,接下来我们新建一个Flow,将上面的Function作为中间的Function,看检测是否会终止之后的Function被执行。
新建的flow的配置如下:
> kis-flow/test/load_conf/flow/flow-FlowName2.yml
```yaml
kistype: flow
status: 1
flow_name: flowName2
flows:
- fname: funcName1
- fname: abortFunc
- fname: funcName3
```
当前Flow的名称为`flowName2`,当前的Flow有三个Function,其中funcNam1 和 funcName2我们之前都已经定义好了,abortFunc是我们新建的,并且在中间。如果abort功能满足,则funcName3将不会被调度。
接下来实现单元测试用例。
> kis-flow/test/kis_action_test.go
```go
package test
import (
"context"
"kis-flow/common"
"kis-flow/file"
"kis-flow/kis"
"kis-flow/test/caas"
"kis-flow/test/faas"
"testing"
)
func TestActionAbort(t *testing.T) {
ctx := context.Background()
// 0. 注册Function 回调业务
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // 添加abortFunc 业务
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. 注册ConnectorInit 和 Connector 回调业务
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("flowName2")
// 3. 提交原始数据
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
```
其中下面的代码是初始化注册的代码,大家也可以写在其他文件中,这样就不需要每次都携带这部分代码了。
```go
// 0. 注册Function 回调业务
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // 添加abortFunc 业务
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. 注册ConnectorInit 和 Connector 回调业务
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
```
cd 到`kis-flow/test/`目录下执行如下指令:
```bash
go test -test.v -test.paniconexit0 -test.run TestActionAbort
```
结果如下:
```bash
=== RUN TestActionAbort
Add KisPool FuncName=funcName1
Add KisPool FuncName=abortFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
context.Background
====> After CommitSrcData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094cc0 ThisFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] jumpFunc:NoJump abort:false nextOpt:<nil>}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094d20 ThisFunctionId:func-7f5af1521fd64d08839d5bdd26de5254 PrevFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] jumpFunc:NoJump abort:false nextOpt:<nil>}
---> Call AbortFuncHandler ----
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 0
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 1
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 2
--- PASS: TestActionAbort (0.00s)
PASS
ok kis-flow/test 0.487s
```
通过结果可以看到,在执行完 AbortFuncHandler 后,没有继续执行,而是退出了Flow的Run()方法。
## 7.2 Action DataReuse(复用上层数据)
Action DataReuse 为服用上层数据,含义为,当前的执行Function提交到下一层的结果将不被使用,而是直接将当前Function的上一层结果数据,复用到下一层,作为下一层Funciton的数据源。
下面来实现Action DataReuse功能。
## 7.2.1 DataReuse Action添加
在Action中添加DataReuse成员,是一个bool类型。
> kis-flow/kis/action.go
```go
// Action KisFlow执行流程Actions
type Action struct {
// +++++++++++++
// DataReuse 是否复用上层Function数据
DataReuse bool
// Abort 终止Flow的执行
Abort bool
}
// ActionDataReuse Next复用上层Function数据Option
func ActionDataReuse(act *Action) {
act.DataReuse = true
}
```
然后提供一个ActionFunc,命名为:`ActionDataReuse`,实现中为改变DataReuse状态为true。
## 7.2.2 复用上层数据到下层
这里需要再实现一个提交数据的方法,为如何提交复用数据,具体逻辑如下:
> kis-flow/flow/kis_flow_data.go
```go
// commitReuseData
func (flow *KisFlow) commitReuseData(ctx context.Context) error {
// 判断上层是否有结果数据, 如果没有则退出本次Flow Run循环
if len(flow.data[flow.PrevFunctionId]) == 0 {
flow.abort = true
return nil
}
// 本层结果数据等于上层结果数据(复用上层结果数据到本层)
flow.data[flow.ThisFunctionId] = flow.data[flow.PrevFunctionId]
// 清空缓冲Buf (如果是ReuseData选项,那么提交的全部数据,都将不会携带到下一层)
flow.buffer = flow.buffer[0:0]
log.Logger().DebugFX(ctx, " ====> After commitReuseData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
```
逻辑很简单,与`commitCurData()`不同的是,`commitCurData()`为将flow.buffer的数据提交到`flow.data[flow.ThisFunctionId]`中,而`commitReuseData()`为将上一层的结果数据提交到`flow.data[flow.ThisFunctionId]`中。
## 7.2.3 处理DataReuse Action动作
然后在`dealAction()`方法中添加对Action DataReuse的动作捕获,如下:
> kis-flow/flow/kis_flow_action.go
```go
// dealAction 处理Action,决定接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
// ++++++++++++++++
// DataReuse Action
if flow.action.DataReuse {
if err := flow.commitReuseData(ctx); err != nil {
return nil, err
}
} else {
if err := flow.commitCurData(ctx); err != nil {
return nil, err
}
}
// 更新上一层 FuncitonId 游标
flow.PrevFunctionId = flow.ThisFunctionId
fn = fn.Next()
// Abort Action 强制终止
if flow.action.Abort {
flow.abort = true
}
// 清空Action
flow.action = kis.Action{}
return fn, nil
}
```
## 7.2.4 单元测试
下面来针对DataReuse做单元测试,首先创建一个名字为dataReuseFunc 的Funciton,先创建配置文件。
> kis-flow/test/load_conf/func/func-dataReuseFunc.yml
```yaml
kistype: func
fname: dataReuseFunc
fmode: Calculate
source:
name: 用户订单错误率
must:
- order_id
- user_id
```
同时新建一个Flow流,名称为flowName3,配置如下:
> kis-flow/test/load_conf/flow/func-FlowName3.yml
```yaml
kistype: flow
status: 1
flow_name: flowName3
flows:
- fname: funcName1
- fname: dataReuseFunc
- fname: funcName3
```
针对dataReuseFunc的Function的逻辑业务,如下:
> kis-flow/test/faas/faas_data_reuse.go
```go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func DataReuseFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call DataReuseFuncHandler ----")
for index, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
// 计算结果数据
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
// 提交结果数据
_ = flow.CommitRow(resultStr)
}
return flow.Next(kis.ActionDataReuse)
}
```
最后实现测试用例,如下:
> kis-flow/test/kis_action_test.go
```go
func TestActionDataReuse(t *testing.T) {
ctx := context.Background()
// 0. 注册Function 回调业务
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("dataReuseFunc", faas.DataReuseFuncHandler) // 添加dataReuesFunc 业务
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. 注册ConnectorInit 和 Connector 回调业务
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("flowName3")
// 3. 提交原始数据
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
```
cd 到 `kis-flow/test/`下执行:
```bash
go test -test.v -test.paniconexit0 -test.run TestActionDataReuse
```
结果是:
```bash
=== RUN TestActionDataReuse
Add KisPool FuncName=funcName1
Add KisPool FuncName=dataReuseFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
context.Background
====> After CommitSrcData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000955c0 ThisFunctionId:func-7886178381634f05b302841141382e59 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095620 ThisFunctionId:func-ef567879d0dd45b287ed709e549e9d32 PrevFunctionId:func-7886178381634f05b302841141382e59 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call DataReuseFuncHandler ----
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 0
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 1
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 2
context.Background
====> After commitReuseData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095680 ThisFunctionId:func-cfe66e39aba54ff989d6764cc4edda20 PrevFunctionId:func-ef567879d0dd45b287ed709e549e9d32 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 0
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 1
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 2
--- PASS: TestActionDataReuse (0.02s)
PASS
ok kis-flow/test 0.523s
```
通过结果可以看出,在最后的`funcName3Handler`中得到的数据是funcName1传递下来的数据,中间的ReuseFunction将上层的数据复用到了下一层,变成了FuncName3的数据源。
## 7.3 Action ForceEntryNext(强制进入下一层)
## 7.3.1 ForceEntryNext Action属性
目前的KisFlow为,如果当前的Function没有commit数据(本层的结果数据),那么当前的Function结束后,将不会继续调度下一层Function。 但是有的Flow的流式计算可能需要继续向下执行,哪怕没有数据,所以这里可以通过`ForceEntryNext`这个动作来触发。
首先我们在Action中新增一个`ForceEntryNext` 属性。
> kis-flow/kis/action.go
```go
// Action KisFlow执行流程Actions
type Action struct {
// DataReuse 是否复用上层Function数据
DataReuse bool
// 默认Next()为如果本层Function计算结果为0条数据,之后Function将不会继续执行
// ForceEntryNext 为忽略上述默认规则,没有数据强制进入下一层Function
ForceEntryNext bool
// Abort 终止Flow的执行
Abort bool
}
// ActionForceEntryNext 强制进入下一层
func ActionForceEntryNext(act *Action) {
act.ForceEntryNext = true
}
```
且提供配置函数`ActionForceEntryNext()`来修改这个属性状态。
## 7.3.2 捕获Action
在捕获Action的`dealAction()`方法中,加上对这个状态的判断,如果被设置,则需要将flow.Abort状态改成false,flow将继续执行下一层。
> kis-flow/flow/kis_flow_action.go
```go
// dealAction 处理Action,决定接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
// DataReuse Action
if flow.action.DataReuse {
if err := flow.commitReuseData(ctx); err != nil {
return nil, err
}
} else {
if err := flow.commitCurData(ctx); err != nil {
return nil, err
}
}
// ++++++++++++++++++++++++++++
// ForceEntryNext Action
if flow.action.ForceEntryNext {
if err := flow.commitVoidData(ctx); err != nil {
return nil, err
}
flow.abort = false
}
// 更新上一层 FuncitonId 游标
flow.PrevFunctionId = flow.ThisFunctionId
fn = fn.Next()
// Abort Action 强制终止
if flow.action.Abort {
flow.abort = true
}
// 清空Action
flow.action = kis.Action{}
return fn, nil
}
```
这里有一个细节,我们需要调用一个方法`commitVoidData()`,即提交空数据,原因是,如果不提交空数据,那么flow.buffer依然为空,那么不会执行数据的提交动作,那么会导致`flow.data[flow.ThisFunctionId]`这条不存在,也就是key不存在,那么再执行到`flow.getCurData()`会出现找不到key的异常而panic。所以这里需要提交一个空的数据到`flow.data[flow.ThisFunctionId]`中。
具体的`commitVoidData()`实现如下:
> kis-flow/flow/kis_flow_data.go
```go
func (flow *KisFlow) commitVoidData(ctx context.Context) error {
if len(flow.buffer) != 0 {
return nil
}
// 制作空数据
batch := make(common.KisRowArr, 0)
// 将本层计算的缓冲数据提交到本层结果数据中
flow.data[flow.ThisFunctionId] = batch
log.Logger().DebugFX(ctx, " ====> After commitVoidData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)
return nil
}
```
## 7.3.3 单元测试,不设置ForceEntryNext
首先,创建一个noResultFunc的Function配置,且实现相关的回调业务函数。
> kis-flow/test/load_conf/func/func-NoResultFunc.yml
```yaml
kistype: func
fname: noResultFunc
fmode: Calculate
source:
name: 用户订单错误率
must:
- order_id
- user_id
```
> kis-flow/test/faas/faas_no_result.go
```go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call NoResultFuncHandler ----")
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return flow.Next()
}
```
这里面在Function的最后,只调用`flow.Next()` 不传递任何Action动作。
然后新建一个FlowName4,配置如下:
> kis-flow/test/load_conf/flow-FlowName4.yml
```yaml
kistype: flow
status: 1
flow_name: flowName4
flows:
- fname: funcName1
- fname: noResultFunc
- fname: funcName3
```
最后我们编写单元测试用例代码,将noResultFunc放在中间的部分。
> kis-flow/test/kis_action_test.go
```go
func TestActionForceEntry(t *testing.T) {
ctx := context.Background()
// 0. 注册Function 回调业务
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("noResultFunc", faas.NoResultFuncHandler) // 添加noResultFunc 业务
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. 注册ConnectorInit 和 Connector 回调业务
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("flowName4")
// 3. 提交原始数据
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
```
cd到`kis-flow/test/` 下执行:
```bash
go test -test.v -test.paniconexit0 -test.run TestActionForceEntry
```
结果如下:
```bash
=== RUN TestActionForceEntry
Add KisPool FuncName=funcName1
Add KisPool FuncName=noResultFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
context.Background
====> After CommitSrcData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d560 ThisFunctionId:func-4d113d6a8e744d30a906db310f2d7818 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d5c0 ThisFunctionId:func-47cb6f9ae464484aa779c18284035705 PrevFunctionId:func-4d113d6a8e744d30a906db310f2d7818 funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call NoResultFuncHandler ----
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 0
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 1
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 2
--- PASS: TestActionForceEntry (0.02s)
PASS
ok kis-flow/test 0.958s
```
因为noResultFunc不会生成任何的结果数据,所以下一层Function将不会被执行,最后只执行到
```bash
---> Call NoResultFuncHandler ----
```
## 7.3.4 单元测试,设置ForceEntryNext
下面我们将Action为ForceEntryNext加上,在`NoResultFuncHandler()` 中,加上`flow.Next(kis.ActionForceEntryNext)`,如下:
> kis-flow/test/faas/faas_no_result.go
```go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call NoResultFuncHandler ----")
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return flow.Next(kis.ActionForceEntryNext)
}
```
cd到`kis-flow/test/` 下执行:
```bash
go test -test.v -test.paniconexit0 -test.run TestActionForceEntry
```
结果如下:
```bash
=== RUN TestActionForceEntry
Add KisPool FuncName=funcName1
Add KisPool FuncName=noResultFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
context.Background
====> After CommitSrcData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-ecddaee7d7d447a9852d07088732f509 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013740 ThisFunctionId:func-c9817c7993894919b8463dea1757544e PrevFunctionId:func-ecddaee7d7d447a9852d07088732f509 funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call NoResultFuncHandler ----
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 0
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 1
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 2
context.Background
====> After commitVoidData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000137a0 ThisFunctionId:func-5729600ae6ea4d6f879eb5832c638e1a PrevFunctionId:func-c9817c7993894919b8463dea1757544e funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName3Handler ----
--- PASS: TestActionForceEntry (0.01s)
PASS
ok kis-flow/test 0.348s
```
会发现,Function第三层funcName3Handler 被执行到,但是没有任何的数据。
## 7.4 Action JumpFunc(流程跳转)
接下来,来实现JumpFunc Action,JumpFunc是可以在当前Flow中任意跳转到指定的FuncName继续执行(前提是跳转的FuncName当当前Flow中存在)
> 注意:JumpFunc容易出现无限循环流,所以在业务的设计要慎用。
## 7.4.1 Action添加JumpFunc
首先在Action添加一个JumpFunc属性,注意,JunpFunc不是一个bool状态,而是一个string字符串,表示具体要跳转的FunctionName名称。
> kis-flow/kis/action.go
```go
// Action KisFlow执行流程Actions
type Action struct {
// DataReuse 是否复用上层Function数据
DataReuse bool
// 默认Next()为如果本层Function计算结果为0条数据,之后Function将不会继续执行
// ForceEntryNext 为忽略上述默认规则,没有数据强制进入下一层Function
ForceEntryNext bool
// ++++++++++
// JumpFunc 跳转到指定Function继续执行
JumpFunc string
// Abort 终止Flow的执行
Abort bool
}
// ActionJumpFunc 会返回一个ActionFunc函数,并且会将funcName赋值给Action.JumpFunc
// (注意:容易出现Flow循环调用,导致死循环)
func ActionJumpFunc(funcName string) ActionFunc {
return func(act *Action) {
act.JumpFunc = funcName
}
}
```
然后提供一个修改JumpFunc的配置方法`ActionJumpFunc()`,注意这个方法和之前的方法写法有一些不同,主要是返回一个匿名函数并且执行,目的则是修改Action中的JumpFunc属性。
## 7.4.2 捕获Action
接下来,我们来捕获JumpFunc的Action动作,判断JumpFunc是否为空字符串即可。
> kis-flow/flow/kis_flow_action.go
```go
// dealAction 处理Action,决定接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {
// DataReuse Action
if flow.action.DataReuse {
if err := flow.commitReuseData(ctx); err != nil {
return nil, err
}
} else {
if err := flow.commitCurData(ctx); err != nil {
return nil, err
}
}
// ForceEntryNext Action
if flow.action.ForceEntryNext {
if err := flow.commitVoidData(ctx); err != nil {
return nil, err
}
flow.abort = false
}
// ++++++++++++++++++++++++++++++++
// JumpFunc Action
if flow.action.JumpFunc != "" {
if _, ok := flow.Funcs[flow.action.JumpFunc]; !ok {
//当前JumpFunc不在flow中
return nil, errors.New(fmt.Sprintf("Flow Jump -> %s is not in Flow", flow.action.JumpFunc))
}
jumpFunction := flow.Funcs[flow.action.JumpFunc]
// 更新上层Function
flow.PrevFunctionId = jumpFunction.GetPrevId()
fn = jumpFunction
// 如果设置跳跃,强制跳跃
flow.abort = false
// ++++++++++++++++++++++++++++++++
} else {
// 更新上一层 FuncitonId 游标
flow.PrevFunctionId = flow.ThisFunctionId
fn = fn.Next()
}
// Abort Action 强制终止
if flow.action.Abort {
flow.abort = true
}
// 清空Action
flow.action = kis.Action{}
return fn, nil
}
```
如果设置JumpFunc,则需要修改下次执行的`fn`指针,否则则正常寻址`fn.Next()`。
## 7.4.3 单元测试
接下来来定义一个跳转Action的Function,配置,如下:
> kis-flow/test/load_conf/func/func-jumpFunc.yml
```yaml
kistype: func
fname: jumpFunc
fmode: Calculate
source:
name: 用户订单错误率
must:
- order_id
- user_id
```
并且实现相关的Funciton业务逻辑,如下:
> kis-flow/test/faas/faas_jump.go
```go
package faas
import (
"context"
"fmt"
"kis-flow/kis"
)
// type FaaS func(context.Context, Flow) error
func JumpFuncHandler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call JumpFuncHandler ----")
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return flow.Next(kis.ActionJumpFunc("funcName1"))
}
```
这里,最后通过`flow.Next(kis.ActionJumpFunc("funcName1"))`来指定跳转到funcName1的Function。
新建一个Flow,为FlowName5,配置如下:
> kis-flow/test/load_conf/flow/flow-FlowName5.yml
```yaml
kistype: flow
status: 1
flow_name: flowName5
flows:
- fname: funcName1
- fname: funcName2
- fname: jumpFunc
```
之后,来实现单元测试用例代码,如下:
> kis-flow/test/kis_action_test.go
```go
func TestActionJumpFunc(t *testing.T) {
ctx := context.Background()
// 0. 注册Function 回调业务
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
kis.Pool().FaaS("jumpFunc", faas.JumpFuncHandler) // 添加jumpFunc 业务
// 0. 注册ConnectorInit 和 Connector 回调业务
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("flowName5")
// 3. 提交原始数据
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
```
cd到`kis-flow/test/`执行:
```bash
go test -test.v -test.paniconexit0 -test.run TestActionJumpFunc
```
结果如下:
```bash
...
...
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionS, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013680 ThisFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 PrevFunctionId:func-f6ca8010d66744429bf6069c9897a928 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionC, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-4faf8f019f4a4a48b84ef27abfad53d1 PrevFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call JumpFuncHandler ----
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 0
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 1
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 2
KisFunctionV, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013620 ThisFunctionId:func-f6ca8010d66744429bf6069c9897a928 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}
---> Call funcName1Handler ----
...
...
```
发现我们会无限循环的调度Flow,这样说明我们的JumpFunc Action已经生效。
## 7.5【V0.6】源代码
https://github.com/aceld/kis-flow/releases/tag/v0.6
---
作者:刘丹冰Aceld github: [https://github.com/aceld](https://github.com/aceld)
KisFlow开源项目地址:[https://github.com/aceld/kis-flow](https://github.com/aceld/kis-flow)
[Golang框架实战-KisFlow流式计算框架专栏](https://www.jianshu.com/c/c72f100773a6)
---
[Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df)
[Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a)
[Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7)
[Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc)
[Golang框架实战-KisFlow流式计算框架(5)-Function调度](https://www.jianshu.com/p/6f54966c9f1e)
[Golang框架实战-KisFlow流式计算框架(6)-Connector](https://www.jianshu.com/p/159b14219559)
[Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出](https://www.jianshu.com/p/8870598beed0)
有疑问加站长微信联系(非本文作者))