[Golang框架实战-KisFlow流式计算框架专栏](https://www.jianshu.com/c/c72f100773a6)
---
[Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df)
[Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a)
[Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7)
[Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc)
[Golang框架实战-KisFlow流式计算框架(5)-Function调度](https://www.jianshu.com/p/6f54966c9f1e)
[Golang框架实战-KisFlow流式计算框架(6)-Connector](https://www.jianshu.com/p/159b14219559)
[Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出](https://www.jianshu.com/p/8870598beed0)
[Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action](https://www.jianshu.com/p/6d43e9708752)
[Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数](https://www.jianshu.com/p/39f55a631d87)
---
## 8.1 Flow Cache 数据流缓存
KisFlow也提供流式计算中的共享缓存,采用简单的本地缓存供开发者按需使用,有关本地缓存的第三方技术依赖选型: [https://github.com/patrickmn/go-cache](https://github.com/patrickmn/go-cache)
## 8.1.1 go-cache
(1)安装
```bash
go get github.com/patrickmn/go-cache
```
(2)使用
```go
import (
"fmt"
"github.com/patrickmn/go-cache"
"time"
)
func main() {
// Create a cache with a default expiration time of 5 minutes, and which
// purges expired items every 10 minutes
c := cache.New(5*time.Minute, 10*time.Minute)
// Set the value of the key "foo" to "bar", with the default expiration time
c.Set("foo", "bar", cache.DefaultExpiration)
// Set the value of the key "baz" to 42, with no expiration time
// (the item won't be removed until it is re-set, or removed using
// c.Delete("baz")
c.Set("baz", 42, cache.NoExpiration)
// Get the string associated with the key "foo" from the cache
foo, found := c.Get("foo")
if found {
fmt.Println(foo)
}
// Since Go is statically typed, and cache values can be anything, type
// assertion is needed when values are being passed to functions that don't
// take arbitrary types, (i.e. interface{}). The simplest way to do this for
// values which will only be used once--e.g. for passing to another
// function--is:
foo, found := c.Get("foo")
if found {
MyFunction(foo.(string))
}
// This gets tedious if the value is used several times in the same function.
// You might do either of the following instead:
if x, found := c.Get("foo"); found {
foo := x.(string)
// ...
}
// or
var foo string
if x, found := c.Get("foo"); found {
foo = x.(string)
}
// ...
// foo can then be passed around freely as a string
// Want performance? Store pointers!
c.Set("foo", &MyStruct, cache.DefaultExpiration)
if x, found := c.Get("foo"); found {
foo := x.(*MyStruct)
// ...
}
}
```
详细参考:[https://github.com/patrickmn/go-cache](https://github.com/patrickmn/go-cache)
### 8.1.2 KisFlow集成go-cache能力
#### (1) Flow提供抽象层接口
在Flow中提供有关Cache的操作的接口,如下:
> kis-flow/kis/flow.go
```go
type Flow interface {
// Run 调度Flow,依次调度Flow中的Function并且执行
Run(ctx context.Context) error
// Link 将Flow中的Function按照配置文件中的配置进行连接
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow 提交Flow数据到即将执行的Function层
CommitRow(row interface{}) error
// Input 得到flow当前执行Function的输入源数据
Input() common.KisRowArr
// GetName 得到Flow的名称
GetName() string
// GetThisFunction 得到当前正在执行的Function
GetThisFunction() Function
// GetThisFuncConf 得到当前正在执行的Function的配置
GetThisFuncConf() *config.KisFuncConfig
// GetConnector 得到当前正在执行的Function的Connector
GetConnector() (Connector, error)
// GetConnConf 得到当前正在执行的Function的Connector的配置
GetConnConf() (*config.KisConnConfig, error)
// GetConfig 得到当前Flow的配置
GetConfig() *config.KisFlowConfig
// GetFuncConfigByName 得到当前Flow的配置
GetFuncConfigByName(funcName string) *config.KisFuncConfig
// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
Next(acts ...ActionFunc) error
// ++++++++++++++++++++++++++++++++++++++++
// GetCacheData 得到当前Flow的缓存数据
GetCacheData(key string) interface{}
// SetCacheData 设置当前Flow的缓存数据
SetCacheData(key string, value interface{}, Exp time.Duration)
}
```
`SetCacheData()`为设置本地缓存,Exp为超时时间,如果Exp为0,则为永久。
`GetCacheData()`为读取本地缓存。
#### (2)提供一些常量
提供有关缓存超时时间的一些常量。
> kis-flow/common/const.go
```go
// cache
const (
// DeFaultFlowCacheCleanUp KisFlow中Flow对象Cache缓存默认的清理内存时间
DeFaultFlowCacheCleanUp = 5 //单位 min
// DefaultExpiration 默认GoCahce时间 ,永久保存
DefaultExpiration time.Duration = 0
)
```
#### (3) KisFlow新增成员及初始化
> #### kis-flow/flow/kis_flow.go
```go
// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
// ... ...
// ... ...
// flow的本地缓存
cache *cache.Cache // Flow流的临时缓存上线文环境
}
// NewKisFlow 创建一个KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
flow := new(KisFlow)
// ... ...
// ... ...
// 初始化本地缓存
flow.cache = cache.New(cache.NoExpiration, common.DeFaultFlowCacheCleanUp*time.Minute)
return flow
}
```
#### (4)实现接口
最后实现有关缓存读写操作的两个接口,代码如下:
> kis-flow/flow/kis_flow_data.go
```go
func (flow *KisFlow) GetCacheData(key string) interface{} {
if data, found := flow.cache.Get(key); found {
return data
}
return nil
}
func (flow *KisFlow) SetCacheData(key string, value interface{}, Exp time.Duration) {
if Exp == common.DefaultExpiration {
flow.cache.Set(key, value, cache.DefaultExpiration)
} else {
flow.cache.Set(key, value, Exp)
}
}
```
## 8.2 MetaData 临时缓存参数
MetaData我们定义为Flow、Function、Connector每个层级都会提供一个`map[string]interface{}` 的结构来存放临时数据,这个数据的生命周期与各个实例的生命周期一致。
## 8.2.1 Flow添加MetaData
首先,KisFlow的成员新增`metaData map[string]interface{}`和对应的读写锁。
> kis-flow/flow/kis_flow.go
```go
// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
// ... ...
// ... ...
// +++++++++++++++++++++++++++++++++++++++++++
// flow的metaData
metaData map[string]interface{} // Flow的自定义临时数据
mLock sync.RWMutex // 管理metaData的读写锁
}
```
且在KisFlow的构造函数下对`metaData`成员进行内存初始化,如下:
> kis-flow/flow/kis_flow.go
```go
// NewKisFlow 创建一个KisFlow.
func NewKisFlow(conf *config.KisFlowConfig) kis.Flow {
flow := new(KisFlow)
// ... ...
// ... ...
// ++++++++++++++++++++++++++++++++++++++
// 初始化临时数据
flow.metaData = make(map[string]interface{})
return flow
}
```
之后,给Flow添加MetaData的读写接口,实现非常的简单,如下:
> kis-flow/kis/flow.go
```go
type Flow interface {
// Run 调度Flow,依次调度Flow中的Function并且执行
Run(ctx context.Context) error
// Link 将Flow中的Function按照配置文件中的配置进行连接
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow 提交Flow数据到即将执行的Function层
CommitRow(row interface{}) error
// Input 得到flow当前执行Function的输入源数据
Input() common.KisRowArr
// GetName 得到Flow的名称
GetName() string
// GetThisFunction 得到当前正在执行的Function
GetThisFunction() Function
// GetThisFuncConf 得到当前正在执行的Function的配置
GetThisFuncConf() *config.KisFuncConfig
// GetConnector 得到当前正在执行的Function的Connector
GetConnector() (Connector, error)
// GetConnConf 得到当前正在执行的Function的Connector的配置
GetConnConf() (*config.KisConnConfig, error)
// GetConfig 得到当前Flow的配置
GetConfig() *config.KisFlowConfig
// GetFuncConfigByName 得到当前Flow的配置
GetFuncConfigByName(funcName string) *config.KisFuncConfig
// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
Next(acts ...ActionFunc) error
// GetCacheData 得到当前Flow的缓存数据
GetCacheData(key string) interface{}
// SetCacheData 设置当前Flow的缓存数据
SetCacheData(key string, value interface{}, Exp time.Duration)
// ++++++++++++++++++++++++++++
// GetMetaData 得到当前Flow的临时数据
GetMetaData(key string) interface{}
// SetMetaData 设置当前Flow的临时数据
SetMetaData(key string, value interface{})
}
```
定义接口`GetMetaData()`、`SetMetaData()`,分别作为读写接口。
最后来实现,如下:
> kis-flow/flow/kis_flow_data.go
```go
// GetMetaData 得到当前Flow对象的临时数据
func (flow *KisFlow) GetMetaData(key string) interface{} {
flow.mLock.RLock()
defer flow.mLock.RUnlock()
data, ok := flow.metaData[key]
if !ok {
return nil
}
return data
}
// SetMetaData 设置当前Flow对象的临时数据
func (flow *KisFlow) SetMetaData(key string, value interface{}) {
flow.mLock.Lock()
defer flow.mLock.Unlock()
flow.metaData[key] = value
}
```
## 8.2.2 Function 添加MetaData
首先在BaseFunciton中添加成员metaData,如下:
> kis-flow/function/kis_base_funciton.go
```go
type BaseFunction struct {
// Id , KisFunction的实例ID,用于KisFlow内部区分不同的实例对象
Id string
Config *config.KisFuncConfig
// flow
flow kis.Flow //上下文环境KisFlow
// connector
connector kis.Connector
// ++++++++++++++++++++++++
// Function的自定义临时数据
metaData map[string]interface{}
// 管理metaData的读写锁
mLock sync.RWMutex
// link
N kis.Function //下一个流计算Function
P kis.Function //上一个流计算Function
```
在Funciton构造函数的地方,这里需要进行改进下,每个具体的Funciton都需要一个构造函数来初始化`metaData`成员,改动如下:
> kis-flow/function/kis_base_function.go
```go
func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function {
var f kis.Function
//工厂生产泛化对象
// ++++++++++++++
switch common.KisMode(config.FMode) {
case common.V:
f = NewKisFunctionV() // +++
case common.S:
f = NewKisFunctionS() // +++
case common.L:
f = NewKisFunctionL() // +++
case common.C:
f = NewKisFunctionC() // +++
case common.E:
f = NewKisFunctionE() // +++
default:
//LOG ERROR
return nil
}
// 生成随机实例唯一ID
f.CreateId()
// 设置基础信息属性
if err := f.SetConfig(config); err != nil {
panic(err)
}
// 设置Flow
if err := f.SetFlow(flow); err != nil {
panic(err)
}
return f
}
```
其中每个构造函数如下:
> kis-flow/function/kis_function_c.go
```go
func NewKisFunctionC() kis.Function {
f := new(KisFunctionC)
// 初始化metaData
f.metaData = make(map[string]interface{})
return f
}
```
> kis-flow/function/kis_function_v.go
```go
func NewKisFunctionV() kis.Function {
f := new(KisFunctionV)
// 初始化metaData
f.metaData = make(map[string]interface{})
return f
}
```
> kis-flow/function/kis_function_e.go
```go
func NewKisFunctionE() kis.Function {
f := new(KisFunctionE)
// 初始化metaData
f.metaData = make(map[string]interface{})
return f
}
```
> kis-flow/function/kis_function_s.go
```go
func NewKisFunctionS() kis.Function {
f := new(KisFunctionS)
// 初始化metaData
f.metaData = make(map[string]interface{})
return f
}
```
> kis-flow/function/kis_function_l.go
```go
func NewKisFunctionL() kis.Function {
f := new(KisFunctionL)
// 初始化metaData
f.metaData = make(map[string]interface{})
return f
}
```
接下来,给Funciton抽象层,添加获取`metaData`成员的接口,如下:
> kis-flow/kis/function.go
```go
type Function interface {
// Call 执行流式计算逻辑
Call(ctx context.Context, flow Flow) error
// SetConfig 给当前Function实例配置策略
SetConfig(s *config.KisFuncConfig) error
// GetConfig 获取当前Function实例配置策略
GetConfig() *config.KisFuncConfig
// SetFlow 给当前Function实例设置所依赖的Flow实例
SetFlow(f Flow) error
// GetFlow 获取当前Functioin实力所依赖的Flow
GetFlow() Flow
// AddConnector 给当前Function实例添加一个Connector
AddConnector(conn Connector) error
// GetConnector 获取当前Function实例所关联的Connector
GetConnector() Connector
// CreateId 给当前Funciton实力生成一个随机的实例KisID
CreateId()
// GetId 获取当前Function的FID
GetId() string
// GetPrevId 获取当前Function上一个Function节点FID
GetPrevId() string
// GetNextId 获取当前Function下一个Function节点FID
GetNextId() string
// Next 返回下一层计算流Function,如果当前层为最后一层,则返回nil
Next() Function
// Prev 返回上一层计算流Function,如果当前层为最后一层,则返回nil
Prev() Function
// SetN 设置下一层Function实例
SetN(f Function)
// SetP 设置上一层Function实例
SetP(f Function)
// ++++++++++++++++++++++++++++++++++
// GetMetaData 得到当前Function的临时数据
GetMetaData(key string) interface{}
// SetMetaData 设置当前Function的临时数据
SetMetaData(key string, value interface{})
}
```
对上述新增的两个接口的实现,在BaseFunction中实现就可以了。
> kis-flow/funciton/kis_base_function.go
```go
// GetMetaData 得到当前Function的临时数据
func (base *BaseFunction) GetMetaData(key string) interface{} {
base.mLock.RLock()
defer base.mLock.RUnlock()
data, ok := base.metaData[key]
if !ok {
return nil
}
return data
}
// SetMetaData 设置当前Function的临时数据
func (base *BaseFunction) SetMetaData(key string, value interface{}) {
base.mLock.Lock()
defer base.mLock.Unlock()
base.metaData[key] = value
}
```
## 8.2.3 Connector添加MetaData
首先,给KisConnector添加`metaData`成员,如下:
> kis-flow/conn/kis_connector.go
```go
type KisConnector struct {
// Connector ID
CId string
// Connector Name
CName string
// Connector Config
Conf *config.KisConnConfig
// Connector Init
onceInit sync.Once
// ++++++++++++++
// KisConnector的自定义临时数据
metaData map[string]interface{}
// 管理metaData的读写锁
mLock sync.RWMutex
}
// NewKisConnector 根据配置策略创建一个KisConnector
func NewKisConnector(config *config.KisConnConfig) *KisConnector {
conn := new(KisConnector)
conn.CId = id.KisID(common.KisIdTypeConnnector)
conn.CName = config.CName
conn.Conf = config
// +++++++++++++++++++++++++++++++++++
conn.metaData = make(map[string]interface{})
return conn
}
```
且在构造函数中进行对metaData的初始化。
其次,给Connector抽象层,提供获取和设置MetaData的接口,如下:
> kis-flow/kis/connector.go
```go
type Connector interface {
// Init 初始化Connector所关联的存储引擎链接等
Init() error
// Call 调用Connector 外挂存储逻辑的读写操作
Call(ctx context.Context, flow Flow, args interface{}) error
// GetId 获取Connector的ID
GetId() string
// GetName 获取Connector的名称
GetName() string
// GetConfig 获取Connector的配置信息
GetConfig() *config.KisConnConfig
// GetMetaData 得到当前Connector的临时数据
// +++++++++++++++++++++++++++++++
GetMetaData(key string) interface{}
// SetMetaData 设置当前Connector的临时数据
SetMetaData(key string, value interface{})
}
```
最后在KisConnector实现上述两个接口,如下:
> kis-flow/conn/kis_connector.go
```go
// GetMetaData 得到当前Connector的临时数据
func (conn *KisConnector) GetMetaData(key string) interface{} {
conn.mLock.RLock()
defer conn.mLock.RUnlock()
data, ok := conn.metaData[key]
if !ok {
return nil
}
return data
}
// SetMetaData 设置当前Connector的临时数据
func (conn *KisConnector) SetMetaData(key string, value interface{}) {
conn.mLock.Lock()
defer conn.mLock.Unlock()
conn.metaData[key] = value
}
```
## 8.3 Params 配置文件参数
KisFlow提供了配置文件中,在配置Flow、Function、Connector等的默认携带参数:Params。
如下:
Function:
```yaml
kistype: func
fname: funcName1
fmode: Verify
source:
name: 公众号抖音商城户订单数据
must:
- order_id
- user_id
option:
default_params:
default1: funcName1_param1
default2: funcName1_param2
```
Flow:
```yaml
kistype: flow
status: 1
flow_name: flowName1
flows:
- fname: funcName1
params:
myKey1: flowValue1-1
myKey2: flowValue1-2
- fname: funcName2
params:
myKey1: flowValue2-1
myKey2: flowValue2-2
- fname: funcName3
params:
myKey1: flowValue3-1
myKey2: flowValue3-2
```
Connector:
```yaml
kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
args1: value1
args2: value2
load: null
save:
- funcName2
```
这里面开发者均可以给定义的模块,提供Params,其中Flow提供的Params也会叠加到Function中去。
我们在之前构建Flow模块的时候,已经将这些参数读取进了每个模块的内存中,但是并没有给开发者暴露接口。
### 8.3.1 Flow添加Param读取接口
首先给Flow提供Param的查询接口:
> kis-flow/kis/flow.go
```go
type Flow interface {
// ... ...
// ... ...
// GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value
GetFuncParam(key string) string
// GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value
GetFuncParamAll() config.FParam
}
```
实现如下:
> kis-flow/flow/kis_flow_data.go
```go
// GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value
func (flow *KisFlow) GetFuncParam(key string) string {
flow.fplock.RLock()
defer flow.fplock.RUnlock()
if param, ok := flow.funcParams[flow.ThisFunctionId]; ok {
if value, vok := param[key]; vok {
return value
}
}
return ""
}
// GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value
func (flow *KisFlow) GetFuncParamAll() config.FParam {
flow.fplock.RLock()
defer flow.fplock.RUnlock()
param, ok := flow.funcParams[flow.ThisFunctionId]
if !ok {
return nil
}
return param
}
```
`GetFuncParam()`和 `GetFuncParamAll()`分别为取出一个key,和取出全部的参数,但都是取出当前正在执行的Function的Params参数。
### 8.3.2 单元测试
我们这里给FlowName1中的每个Function添加一些参数。
> kis-flow/test/load_conf/flow-FlowName1.yml
```yaml
kistype: flow
status: 1
flow_name: flowName1
flows:
- fname: funcName1
params:
myKey1: flowValue1-1
myKey2: flowValue1-2
- fname: funcName2
params:
myKey1: flowValue2-1
myKey2: flowValue2-2
- fname: funcName3
params:
myKey1: flowValue3-1
myKey2: flowValue3-2
```
然后再分别给这里面关联的Function依次配置一些默认的自定义配置参数,如下:
> kis-flow/test/load_conf/func/func-FuncName1.yml
```yaml
kistype: func
fname: funcName1
fmode: Verify
source:
name: 公众号抖音商城户订单数据
must:
- order_id
- user_id
option:
default_params:
default1: funcName1_param1
default2: funcName1_param2
```
> kis-flow/test/load_conf/func/func-FuncName2.yml
```yaml
kistype: func
fname: funcName2
fmode: Save
source:
name: 用户订单错误率
must:
- order_id
- user_id
option:
cname: ConnName1
default_params:
default1: funcName2_param1
default2: funcName2_param2
```
> kis-flow/test/load_conf/func/func-FuncName3.yml
```yaml
kistype: func
fname: funcName3
fmode: Calculate
source:
name: 用户订单错误率
must:
- order_id
- user_id
option:
default_params:
default1: funcName3_param1
default2: funcName3_param2
```
我们给FuncName2关联的Connector也配置一些Param参数,如下:
> kis-flow/test/load_conf/conn/conn-ConnName1.yml
```yaml
kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
args1: value1
args2: value2
load: null
save:
- funcName2
```
最后,为了验证我们的配置参数可以在Function执行的过程中被准确的取出,我们依次改造了每个Funciton和Connector的业务函数,把各自Param打印出来,如下:
> kis-flow/test/faas/faas_demo1.go
```go
func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName1Handler ----")
// ++++++++++++++++
fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())
for index, row := range flow.Input() {
// 打印数据
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
// 计算结果数据
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
// 提交结果数据
_ = flow.CommitRow(resultStr)
}
return nil
}
```
> kis-flow/test/faas/faas_demo2.go
```go
func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName2Handler ----")
// ++++++++++++++++
fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())
for index, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
conn, err := flow.GetConnector()
if err != nil {
log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): GetConnector err = %s\n", err.Error())
return err
}
if conn.Call(ctx, flow, row) != nil {
log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error())
return err
}
// 计算结果数据
resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)
// 提交结果数据
_ = flow.CommitRow(resultStr)
}
return nil
}
```
> kis-flow/test/faas/faas_demo3.go
```go
func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
fmt.Println("---> Call funcName3Handler ----")
// ++++
fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())
for _, row := range flow.Input() {
str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
fmt.Println(str)
}
return nil
}
```
> kis-flow/test/caas/caas_demo1.go
```go
func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error {
fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n",
flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode)
// +++++++++++
fmt.Printf("Params = %+v\n", conn.GetConfig().Params)
fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args)
return nil
}
```
最后,我们来编写单元测试用例代码,如下:
> kis-flow/test/kis_params_test.go
```go
package test
import (
"context"
"kis-flow/common"
"kis-flow/file"
"kis-flow/kis"
"kis-flow/test/caas"
"kis-flow/test/faas"
"testing"
)
func TestParams(t *testing.T) {
ctx := context.Background()
// 0. 注册Function 回调业务
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. 注册ConnectorInit 和 Connector 回调业务
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("flowName1")
// 3. 提交原始数据
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
```
cd到`kis-flow/test/`下,执行
```bash
go test -test.v -test.paniconexit0 -test.run TestParams
```
结果如下:
```bash
=== RUN TestParams
....
....
---> Call funcName1Handler ----
Params = map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]
...
...
---> Call funcName2Handler ----
Params = map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]
...
...
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
...
...
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
...
...
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
...
...
---> Call funcName3Handler ----
Params = map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2]
...
...
--- PASS: TestParams (0.01s)
PASS
ok kis-flow/test 0.433s
```
我们可以看到,现在可以正确的取出各个层级的Params的配置参数了。
## 8.4 【V0.7】源代码
https://github.com/aceld/kis-flow/releases/tag/v0.7
---
作者:刘丹冰Aceld github: [https://github.com/aceld](https://github.com/aceld)
KisFlow开源项目地址:[https://github.com/aceld/kis-flow](https://github.com/aceld/kis-flow)
[Golang框架实战-KisFlow流式计算框架专栏](https://www.jianshu.com/c/c72f100773a6)
---
[Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df)
[Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a)
[Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7)
[Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc)
[Golang框架实战-KisFlow流式计算框架(5)-Function调度](https://www.jianshu.com/p/6f54966c9f1e)
[Golang框架实战-KisFlow流式计算框架(6)-Connector](https://www.jianshu.com/p/159b14219559)
[Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出](https://www.jianshu.com/p/8870598beed0)
[Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action](https://www.jianshu.com/p/6d43e9708752)
[Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数](https://www.jianshu.com/p/39f55a631d87)
有疑问加站长微信联系(非本文作者)