Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数

aceld · · 168 次点击 · 开始浏览    置顶

[Golang框架实战-KisFlow流式计算框架专栏](https://www.jianshu.com/c/c72f100773a6) --- [Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df) [Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a) [Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7) [Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc) [Golang框架实战-KisFlow流式计算框架(5)-Function调度](https://www.jianshu.com/p/6f54966c9f1e) [Golang框架实战-KisFlow流式计算框架(6)-Connector](https://www.jianshu.com/p/159b14219559) [Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出](https://www.jianshu.com/p/8870598beed0) [Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action](https://www.jianshu.com/p/6d43e9708752) [Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数](https://www.jianshu.com/p/39f55a631d87) --- ## 8.1 Flow Cache 数据流缓存 KisFlow也提供流式计算中的共享缓存,采用简单的本地缓存供开发者按需使用,有关本地缓存的第三方技术依赖选型: [https://github.com/patrickmn/go-cache](https://github.com/patrickmn/go-cache) ## 8.1.1 go-cache (1)安装 ```bash go get github.com/patrickmn/go-cache ``` (2)使用 ```go import ( "fmt" "github.com/patrickmn/go-cache" "time" ) func main() { // Create a cache with a default expiration time of 5 minutes, and which // purges expired items every 10 minutes c := cache.New(5*time.Minute, 10*time.Minute) // Set the value of the key "foo" to "bar", with the default expiration time c.Set("foo", "bar", cache.DefaultExpiration) // Set the value of the key "baz" to 42, with no expiration time // (the item won't be removed until it is re-set, or removed using // c.Delete("baz") c.Set("baz", 42, cache.NoExpiration) // Get the string associated with the key "foo" from the cache foo, found := c.Get("foo") if found { fmt.Println(foo) } // Since Go is statically typed, and cache values can be anything, type // assertion is needed when values are being passed to functions that don't // take arbitrary types, (i.e. interface{}). The simplest way to do this for // values which will only be used once--e.g. for passing to another // function--is: foo, found := c.Get("foo") if found { MyFunction(foo.(string)) } // This gets tedious if the value is used several times in the same function. // You might do either of the following instead: if x, found := c.Get("foo"); found { foo := x.(string) // ... } // or var foo string if x, found := c.Get("foo"); found { foo = x.(string) } // ... // foo can then be passed around freely as a string // Want performance? Store pointers! c.Set("foo", &MyStruct, cache.DefaultExpiration) if x, found := c.Get("foo"); found { foo := x.(*MyStruct) // ... } } ``` 详细参考:[https://github.com/patrickmn/go-cache](https://github.com/patrickmn/go-cache) ### 8.1.2 KisFlow集成go-cache能力 #### (1) Flow提供抽象层接口 在Flow中提供有关Cache的操作的接口,如下: > kis-flow/kis/flow.go ```go type Flow interface { // Run 调度Flow,依次调度Flow中的Function并且执行 Run(ctx context.Context) error // Link 将Flow中的Function按照配置文件中的配置进行连接 Link(fConf *config.KisFuncConfig, fParams config.FParam) error // CommitRow 提交Flow数据到即将执行的Function层 CommitRow(row interface{}) error // Input 得到flow当前执行Function的输入源数据 Input() common.KisRowArr // GetName 得到Flow的名称 GetName() string // GetThisFunction 得到当前正在执行的Function GetThisFunction() Function // GetThisFuncConf 得到当前正在执行的Function的配置 GetThisFuncConf() *config.KisFuncConfig // GetConnector 得到当前正在执行的Function的Connector GetConnector() (Connector, error) // GetConnConf 得到当前正在执行的Function的Connector的配置 GetConnConf() (*config.KisConnConfig, error) // GetConfig 得到当前Flow的配置 GetConfig() *config.KisFlowConfig // GetFuncConfigByName 得到当前Flow的配置 GetFuncConfigByName(funcName string) *config.KisFuncConfig // Next 当前Flow执行到的Function进入下一层Function所携带的Action动作 Next(acts ...ActionFunc) error // ++++++++++++++++++++++++++++++++++++++++ // GetCacheData 得到当前Flow的缓存数据 GetCacheData(key string) interface{} // SetCacheData 设置当前Flow的缓存数据 SetCacheData(key string, value interface{}, Exp time.Duration) } ``` `SetCacheData()`为设置本地缓存,Exp为超时时间,如果Exp为0,则为永久。 `GetCacheData()`为读取本地缓存。 #### (2)提供一些常量 提供有关缓存超时时间的一些常量。 > kis-flow/common/const.go ```go // cache const ( // DeFaultFlowCacheCleanUp KisFlow中Flow对象Cache缓存默认的清理内存时间 DeFaultFlowCacheCleanUp = 5 //单位 min // DefaultExpiration 默认GoCahce时间 ,永久保存 DefaultExpiration time.Duration = 0 ) ``` #### (3) KisFlow新增成员及初始化 > #### kis-flow/flow/kis_flow.go ```go // KisFlow 用于贯穿整条流式计算的上下文环境 type KisFlow struct { // ... ... // ... ... // flow的本地缓存 cache *cache.Cache // Flow流的临时缓存上线文环境 } // NewKisFlow 创建一个KisFlow. func NewKisFlow(conf *config.KisFlowConfig) kis.Flow { flow := new(KisFlow) // ... ... // ... ... // 初始化本地缓存 flow.cache = cache.New(cache.NoExpiration, common.DeFaultFlowCacheCleanUp*time.Minute) return flow } ``` #### (4)实现接口 最后实现有关缓存读写操作的两个接口,代码如下: > kis-flow/flow/kis_flow_data.go ```go func (flow *KisFlow) GetCacheData(key string) interface{} { if data, found := flow.cache.Get(key); found { return data } return nil } func (flow *KisFlow) SetCacheData(key string, value interface{}, Exp time.Duration) { if Exp == common.DefaultExpiration { flow.cache.Set(key, value, cache.DefaultExpiration) } else { flow.cache.Set(key, value, Exp) } } ``` ## 8.2 MetaData 临时缓存参数 MetaData我们定义为Flow、Function、Connector每个层级都会提供一个`map[string]interface{}` 的结构来存放临时数据,这个数据的生命周期与各个实例的生命周期一致。 ## 8.2.1 Flow添加MetaData 首先,KisFlow的成员新增`metaData map[string]interface{}`和对应的读写锁。 > kis-flow/flow/kis_flow.go ```go // KisFlow 用于贯穿整条流式计算的上下文环境 type KisFlow struct { // ... ... // ... ... // +++++++++++++++++++++++++++++++++++++++++++ // flow的metaData metaData map[string]interface{} // Flow的自定义临时数据 mLock sync.RWMutex // 管理metaData的读写锁 } ``` 且在KisFlow的构造函数下对`metaData`成员进行内存初始化,如下: > kis-flow/flow/kis_flow.go ```go // NewKisFlow 创建一个KisFlow. func NewKisFlow(conf *config.KisFlowConfig) kis.Flow { flow := new(KisFlow) // ... ... // ... ... // ++++++++++++++++++++++++++++++++++++++ // 初始化临时数据 flow.metaData = make(map[string]interface{}) return flow } ``` 之后,给Flow添加MetaData的读写接口,实现非常的简单,如下: > kis-flow/kis/flow.go ```go type Flow interface { // Run 调度Flow,依次调度Flow中的Function并且执行 Run(ctx context.Context) error // Link 将Flow中的Function按照配置文件中的配置进行连接 Link(fConf *config.KisFuncConfig, fParams config.FParam) error // CommitRow 提交Flow数据到即将执行的Function层 CommitRow(row interface{}) error // Input 得到flow当前执行Function的输入源数据 Input() common.KisRowArr // GetName 得到Flow的名称 GetName() string // GetThisFunction 得到当前正在执行的Function GetThisFunction() Function // GetThisFuncConf 得到当前正在执行的Function的配置 GetThisFuncConf() *config.KisFuncConfig // GetConnector 得到当前正在执行的Function的Connector GetConnector() (Connector, error) // GetConnConf 得到当前正在执行的Function的Connector的配置 GetConnConf() (*config.KisConnConfig, error) // GetConfig 得到当前Flow的配置 GetConfig() *config.KisFlowConfig // GetFuncConfigByName 得到当前Flow的配置 GetFuncConfigByName(funcName string) *config.KisFuncConfig // Next 当前Flow执行到的Function进入下一层Function所携带的Action动作 Next(acts ...ActionFunc) error // GetCacheData 得到当前Flow的缓存数据 GetCacheData(key string) interface{} // SetCacheData 设置当前Flow的缓存数据 SetCacheData(key string, value interface{}, Exp time.Duration) // ++++++++++++++++++++++++++++ // GetMetaData 得到当前Flow的临时数据 GetMetaData(key string) interface{} // SetMetaData 设置当前Flow的临时数据 SetMetaData(key string, value interface{}) } ``` 定义接口`GetMetaData()`、`SetMetaData()`,分别作为读写接口。 最后来实现,如下: > kis-flow/flow/kis_flow_data.go ```go // GetMetaData 得到当前Flow对象的临时数据 func (flow *KisFlow) GetMetaData(key string) interface{} { flow.mLock.RLock() defer flow.mLock.RUnlock() data, ok := flow.metaData[key] if !ok { return nil } return data } // SetMetaData 设置当前Flow对象的临时数据 func (flow *KisFlow) SetMetaData(key string, value interface{}) { flow.mLock.Lock() defer flow.mLock.Unlock() flow.metaData[key] = value } ``` ## 8.2.2 Function 添加MetaData 首先在BaseFunciton中添加成员metaData,如下: > kis-flow/function/kis_base_funciton.go ```go type BaseFunction struct { // Id , KisFunction的实例ID,用于KisFlow内部区分不同的实例对象 Id string Config *config.KisFuncConfig // flow flow kis.Flow //上下文环境KisFlow // connector connector kis.Connector // ++++++++++++++++++++++++ // Function的自定义临时数据 metaData map[string]interface{} // 管理metaData的读写锁 mLock sync.RWMutex // link N kis.Function //下一个流计算Function P kis.Function //上一个流计算Function ``` 在Funciton构造函数的地方,这里需要进行改进下,每个具体的Funciton都需要一个构造函数来初始化`metaData`成员,改动如下: > kis-flow/function/kis_base_function.go ```go func NewKisFunction(flow kis.Flow, config *config.KisFuncConfig) kis.Function { var f kis.Function //工厂生产泛化对象 // ++++++++++++++ switch common.KisMode(config.FMode) { case common.V: f = NewKisFunctionV() // +++ case common.S: f = NewKisFunctionS() // +++ case common.L: f = NewKisFunctionL() // +++ case common.C: f = NewKisFunctionC() // +++ case common.E: f = NewKisFunctionE() // +++ default: //LOG ERROR return nil } // 生成随机实例唯一ID f.CreateId() // 设置基础信息属性 if err := f.SetConfig(config); err != nil { panic(err) } // 设置Flow if err := f.SetFlow(flow); err != nil { panic(err) } return f } ``` 其中每个构造函数如下: > kis-flow/function/kis_function_c.go ```go func NewKisFunctionC() kis.Function { f := new(KisFunctionC) // 初始化metaData f.metaData = make(map[string]interface{}) return f } ``` > kis-flow/function/kis_function_v.go ```go func NewKisFunctionV() kis.Function { f := new(KisFunctionV) // 初始化metaData f.metaData = make(map[string]interface{}) return f } ``` > kis-flow/function/kis_function_e.go ```go func NewKisFunctionE() kis.Function { f := new(KisFunctionE) // 初始化metaData f.metaData = make(map[string]interface{}) return f } ``` > kis-flow/function/kis_function_s.go ```go func NewKisFunctionS() kis.Function { f := new(KisFunctionS) // 初始化metaData f.metaData = make(map[string]interface{}) return f } ``` > kis-flow/function/kis_function_l.go ```go func NewKisFunctionL() kis.Function { f := new(KisFunctionL) // 初始化metaData f.metaData = make(map[string]interface{}) return f } ``` 接下来,给Funciton抽象层,添加获取`metaData`成员的接口,如下: > kis-flow/kis/function.go ```go type Function interface { // Call 执行流式计算逻辑 Call(ctx context.Context, flow Flow) error // SetConfig 给当前Function实例配置策略 SetConfig(s *config.KisFuncConfig) error // GetConfig 获取当前Function实例配置策略 GetConfig() *config.KisFuncConfig // SetFlow 给当前Function实例设置所依赖的Flow实例 SetFlow(f Flow) error // GetFlow 获取当前Functioin实力所依赖的Flow GetFlow() Flow // AddConnector 给当前Function实例添加一个Connector AddConnector(conn Connector) error // GetConnector 获取当前Function实例所关联的Connector GetConnector() Connector // CreateId 给当前Funciton实力生成一个随机的实例KisID CreateId() // GetId 获取当前Function的FID GetId() string // GetPrevId 获取当前Function上一个Function节点FID GetPrevId() string // GetNextId 获取当前Function下一个Function节点FID GetNextId() string // Next 返回下一层计算流Function,如果当前层为最后一层,则返回nil Next() Function // Prev 返回上一层计算流Function,如果当前层为最后一层,则返回nil Prev() Function // SetN 设置下一层Function实例 SetN(f Function) // SetP 设置上一层Function实例 SetP(f Function) // ++++++++++++++++++++++++++++++++++ // GetMetaData 得到当前Function的临时数据 GetMetaData(key string) interface{} // SetMetaData 设置当前Function的临时数据 SetMetaData(key string, value interface{}) } ``` 对上述新增的两个接口的实现,在BaseFunction中实现就可以了。 > kis-flow/funciton/kis_base_function.go ```go // GetMetaData 得到当前Function的临时数据 func (base *BaseFunction) GetMetaData(key string) interface{} { base.mLock.RLock() defer base.mLock.RUnlock() data, ok := base.metaData[key] if !ok { return nil } return data } // SetMetaData 设置当前Function的临时数据 func (base *BaseFunction) SetMetaData(key string, value interface{}) { base.mLock.Lock() defer base.mLock.Unlock() base.metaData[key] = value } ``` ## 8.2.3 Connector添加MetaData 首先,给KisConnector添加`metaData`成员,如下: > kis-flow/conn/kis_connector.go ```go type KisConnector struct { // Connector ID CId string // Connector Name CName string // Connector Config Conf *config.KisConnConfig // Connector Init onceInit sync.Once // ++++++++++++++ // KisConnector的自定义临时数据 metaData map[string]interface{} // 管理metaData的读写锁 mLock sync.RWMutex } // NewKisConnector 根据配置策略创建一个KisConnector func NewKisConnector(config *config.KisConnConfig) *KisConnector { conn := new(KisConnector) conn.CId = id.KisID(common.KisIdTypeConnnector) conn.CName = config.CName conn.Conf = config // +++++++++++++++++++++++++++++++++++ conn.metaData = make(map[string]interface{}) return conn } ``` 且在构造函数中进行对metaData的初始化。 其次,给Connector抽象层,提供获取和设置MetaData的接口,如下: > kis-flow/kis/connector.go ```go type Connector interface { // Init 初始化Connector所关联的存储引擎链接等 Init() error // Call 调用Connector 外挂存储逻辑的读写操作 Call(ctx context.Context, flow Flow, args interface{}) error // GetId 获取Connector的ID GetId() string // GetName 获取Connector的名称 GetName() string // GetConfig 获取Connector的配置信息 GetConfig() *config.KisConnConfig // GetMetaData 得到当前Connector的临时数据 // +++++++++++++++++++++++++++++++ GetMetaData(key string) interface{} // SetMetaData 设置当前Connector的临时数据 SetMetaData(key string, value interface{}) } ``` 最后在KisConnector实现上述两个接口,如下: > kis-flow/conn/kis_connector.go ```go // GetMetaData 得到当前Connector的临时数据 func (conn *KisConnector) GetMetaData(key string) interface{} { conn.mLock.RLock() defer conn.mLock.RUnlock() data, ok := conn.metaData[key] if !ok { return nil } return data } // SetMetaData 设置当前Connector的临时数据 func (conn *KisConnector) SetMetaData(key string, value interface{}) { conn.mLock.Lock() defer conn.mLock.Unlock() conn.metaData[key] = value } ``` ## 8.3 Params 配置文件参数 KisFlow提供了配置文件中,在配置Flow、Function、Connector等的默认携带参数:Params。 如下: Function: ```yaml kistype: func fname: funcName1 fmode: Verify source: name: 公众号抖音商城户订单数据 must: - order_id - user_id option: default_params: default1: funcName1_param1 default2: funcName1_param2 ``` Flow: ```yaml kistype: flow status: 1 flow_name: flowName1 flows: - fname: funcName1 params: myKey1: flowValue1-1 myKey2: flowValue1-2 - fname: funcName2 params: myKey1: flowValue2-1 myKey2: flowValue2-2 - fname: funcName3 params: myKey1: flowValue3-1 myKey2: flowValue3-2 ``` Connector: ```yaml kistype: conn cname: ConnName1 addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990' type: redis key: redis-key params: args1: value1 args2: value2 load: null save: - funcName2 ``` 这里面开发者均可以给定义的模块,提供Params,其中Flow提供的Params也会叠加到Function中去。 我们在之前构建Flow模块的时候,已经将这些参数读取进了每个模块的内存中,但是并没有给开发者暴露接口。 ### 8.3.1 Flow添加Param读取接口 首先给Flow提供Param的查询接口: > kis-flow/kis/flow.go ```go type Flow interface { // ... ... // ... ... // GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value GetFuncParam(key string) string // GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value GetFuncParamAll() config.FParam } ``` 实现如下: > kis-flow/flow/kis_flow_data.go ```go // GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value func (flow *KisFlow) GetFuncParam(key string) string { flow.fplock.RLock() defer flow.fplock.RUnlock() if param, ok := flow.funcParams[flow.ThisFunctionId]; ok { if value, vok := param[key]; vok { return value } } return "" } // GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value func (flow *KisFlow) GetFuncParamAll() config.FParam { flow.fplock.RLock() defer flow.fplock.RUnlock() param, ok := flow.funcParams[flow.ThisFunctionId] if !ok { return nil } return param } ``` `GetFuncParam()`和 `GetFuncParamAll()`分别为取出一个key,和取出全部的参数,但都是取出当前正在执行的Function的Params参数。 ### 8.3.2 单元测试 我们这里给FlowName1中的每个Function添加一些参数。 > kis-flow/test/load_conf/flow-FlowName1.yml ```yaml kistype: flow status: 1 flow_name: flowName1 flows: - fname: funcName1 params: myKey1: flowValue1-1 myKey2: flowValue1-2 - fname: funcName2 params: myKey1: flowValue2-1 myKey2: flowValue2-2 - fname: funcName3 params: myKey1: flowValue3-1 myKey2: flowValue3-2 ``` 然后再分别给这里面关联的Function依次配置一些默认的自定义配置参数,如下: > kis-flow/test/load_conf/func/func-FuncName1.yml ```yaml kistype: func fname: funcName1 fmode: Verify source: name: 公众号抖音商城户订单数据 must: - order_id - user_id option: default_params: default1: funcName1_param1 default2: funcName1_param2 ``` > kis-flow/test/load_conf/func/func-FuncName2.yml ```yaml kistype: func fname: funcName2 fmode: Save source: name: 用户订单错误率 must: - order_id - user_id option: cname: ConnName1 default_params: default1: funcName2_param1 default2: funcName2_param2 ``` > kis-flow/test/load_conf/func/func-FuncName3.yml ```yaml kistype: func fname: funcName3 fmode: Calculate source: name: 用户订单错误率 must: - order_id - user_id option: default_params: default1: funcName3_param1 default2: funcName3_param2 ``` 我们给FuncName2关联的Connector也配置一些Param参数,如下: > kis-flow/test/load_conf/conn/conn-ConnName1.yml ```yaml kistype: conn cname: ConnName1 addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990' type: redis key: redis-key params: args1: value1 args2: value2 load: null save: - funcName2 ``` 最后,为了验证我们的配置参数可以在Function执行的过程中被准确的取出,我们依次改造了每个Funciton和Connector的业务函数,把各自Param打印出来,如下: > kis-flow/test/faas/faas_demo1.go ```go func FuncDemo1Handler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call funcName1Handler ----") // ++++++++++++++++ fmt.Printf("Params = %+v\n", flow.GetFuncParamAll()) for index, row := range flow.Input() { // 打印数据 str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) fmt.Println(str) // 计算结果数据 resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index) // 提交结果数据 _ = flow.CommitRow(resultStr) } return nil } ``` > kis-flow/test/faas/faas_demo2.go ```go func FuncDemo2Handler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call funcName2Handler ----") // ++++++++++++++++ fmt.Printf("Params = %+v\n", flow.GetFuncParamAll()) for index, row := range flow.Input() { str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) fmt.Println(str) conn, err := flow.GetConnector() if err != nil { log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): GetConnector err = %s\n", err.Error()) return err } if conn.Call(ctx, flow, row) != nil { log.Logger().ErrorFX(ctx, "FuncDemo2Handler(): Call err = %s\n", err.Error()) return err } // 计算结果数据 resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index) // 提交结果数据 _ = flow.CommitRow(resultStr) } return nil } ``` > kis-flow/test/faas/faas_demo3.go ```go func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error { fmt.Println("---> Call funcName3Handler ----") // ++++ fmt.Printf("Params = %+v\n", flow.GetFuncParamAll()) for _, row := range flow.Input() { str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row) fmt.Println(str) } return nil } ``` > kis-flow/test/caas/caas_demo1.go ```go func CaasDemoHanler1(ctx context.Context, conn kis.Connector, fn kis.Function, flow kis.Flow, args interface{}) error { fmt.Printf("===> In CaasDemoHanler1: flowName: %s, cName:%s, fnName:%s, mode:%s\n", flow.GetName(), conn.GetName(), fn.GetConfig().FName, fn.GetConfig().FMode) // +++++++++++ fmt.Printf("Params = %+v\n", conn.GetConfig().Params) fmt.Printf("===> Call Connector CaasDemoHanler1, args from funciton: %s\n", args) return nil } ``` 最后,我们来编写单元测试用例代码,如下: > kis-flow/test/kis_params_test.go ```go package test import ( "context" "kis-flow/common" "kis-flow/file" "kis-flow/kis" "kis-flow/test/caas" "kis-flow/test/faas" "testing" ) func TestParams(t *testing.T) { ctx := context.Background() // 0. 注册Function 回调业务 kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) // 0. 注册ConnectorInit 和 Connector 回调业务 kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) // 1. 加载配置文件并构建Flow if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil { panic(err) } // 2. 获取Flow flow1 := kis.Pool().GetFlow("flowName1") // 3. 提交原始数据 _ = flow1.CommitRow("This is Data1 from Test") _ = flow1.CommitRow("This is Data2 from Test") _ = flow1.CommitRow("This is Data3 from Test") // 4. 执行flow1 if err := flow1.Run(ctx); err != nil { panic(err) } } ``` cd到`kis-flow/test/`下,执行 ```bash go test -test.v -test.paniconexit0 -test.run TestParams ``` 结果如下: ```bash === RUN TestParams .... .... ---> Call funcName1Handler ---- Params = map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] ... ... ---> Call funcName2Handler ---- Params = map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2] ... ... ===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save Params = map[args1:value1 args2:value2] ... ... ===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save Params = map[args1:value1 args2:value2] ... ... ===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save Params = map[args1:value1 args2:value2] ... ... ---> Call funcName3Handler ---- Params = map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] ... ... --- PASS: TestParams (0.01s) PASS ok kis-flow/test 0.433s ``` 我们可以看到,现在可以正确的取出各个层级的Params的配置参数了。 ## 8.4 【V0.7】源代码 https://github.com/aceld/kis-flow/releases/tag/v0.7 --- 作者:刘丹冰Aceld github: [https://github.com/aceld](https://github.com/aceld) KisFlow开源项目地址:[https://github.com/aceld/kis-flow](https://github.com/aceld/kis-flow) [Golang框架实战-KisFlow流式计算框架专栏](https://www.jianshu.com/c/c72f100773a6) --- [Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df) [Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a) [Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7) [Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc) [Golang框架实战-KisFlow流式计算框架(5)-Function调度](https://www.jianshu.com/p/6f54966c9f1e) [Golang框架实战-KisFlow流式计算框架(6)-Connector](https://www.jianshu.com/p/159b14219559) [Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出](https://www.jianshu.com/p/8870598beed0) [Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action](https://www.jianshu.com/p/6d43e9708752) [Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数](https://www.jianshu.com/p/39f55a631d87)

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

168 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传