Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出

aceld · · 120 次点击 · 开始浏览    置顶

[Golang框架实战-KisFlow流式计算框架专栏](https://www.jianshu.com/c/c72f100773a6) --- [Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df) [Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a) [Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7) [Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc) [Golang框架实战-KisFlow流式计算框架(5)-Function调度](https://www.jianshu.com/p/6f54966c9f1e) [Golang框架实战-KisFlow流式计算框架(6)-Connector](https://www.jianshu.com/p/159b14219559) [Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出](https://www.jianshu.com/p/8870598beed0) --- ## 6.1 配置的导入 现在每次建立Flow和Function等,都需要一系列繁琐的添加,不是很方便,接下来,我们可以通过批量读写配置文件,构建KisFlow中的结构关系,并且也可以将KisFlow的结构导出到本地文件中。目前我们先用文件的形式做配置的持久化,开发者也可以今后做数据库或者远程配置的持久化均可。 ## 6.1 配置的导入 ### 6.1.1 创建配置文件 首先我们在`kis-flow/test/load_conf/`下创建需要加载的kisflow业务配置文件。 在`kis-flow/test/load_conf/`下分别创建`conn/`、`flow/`、`func/`三个文件夹分别存放Connector、Flow、Funciton的配置信息。 ```bash ├── conn │ └── conn-ConnName1.yml ├── flow │ └── flow-FlowName1.yml └── func ├── func-FuncName1.yml ├── func-FuncName2.yml └── func-FuncName3.yml ``` 分别创建一些`yml`文件。具体内容如下: #### A.Function > kis-flow/test/load_conf/func/func-FuncNam1.yml ```yaml kistype: func fname: funcName1 fmode: Verify source: name: 公众号抖音商城户订单数据 must: - order_id - user_id ``` > kis-flow/test/load_conf/func/func-FuncNam2.yml ```yaml kistype: func fname: funcName2 fmode: Save source: name: 用户订单错误率 must: - order_id - user_id option: cname: ConnName1 ``` > kis-flow/test/load_conf/func/func-FuncNam2.yml ```yaml kistype: func fname: funcName2 fmode: Save source: name: 用户订单错误率 must: - order_id - user_id option: cname: ConnName1 ``` > kis-flow/test/load_conf/func/func-FuncNam3.yml ```yaml kistype: func fname: funcName3 fmode: Calculate source: name: 用户订单错误率 must: - order_id - user_id ``` #### B.Connector > kis-flow/test/load_conf/func/func-ConnName1.yml ```yaml kistype: conn cname: ConnName1 addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990' type: redis key: redis-key params: args1: value1 args2: value2 load: null save: - funcName2 ``` #### C.Flow > kis-flow/test/load_conf/func/func-FlowName1.yml ```yaml kistype: flow status: 1 flow_name: flowName1 flows: - fname: funcName1 - fname: funcName2 - fname: funcName3 ``` ### 6.1.2 配置解析 创建`kis-flow/file/`目录,且创建`kis-flow/file/config_import.go`文件。 首先定义一个可以存放全部配置的接口: > kis-flow/file/config_import.go ```go type allConfig struct { Flows map[string]*config.KisFlowConfig Funcs map[string]*config.KisFuncConfig Conns map[string]*config.KisConnConfig } ``` key作为各个模块的Name名称字段。 然后分别定义解析Flow、Function、Connector配置的方法。yaml的第三方库,我们用`"gopkg.in/yaml.v3"`这个库。 > kis-flow/go.mod ```go module kis-flow go 1.18 require github.com/google/uuid v1.5.0 require gopkg.in/yaml.v3 v3.0.1 // indirect ``` #### A. Flow 配置解析 > kis-flow/file/config_import.go ```go // kisTypeFlowConfigure 解析Flow配置文件,yaml格式 func kisTypeFlowConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error { flow := new(config.KisFlowConfig) if ok := yaml.Unmarshal(confData, flow); ok != nil { return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType)) } // 如果FLow状态为关闭,则不做配置加载 if common.KisOnOff(flow.Status) == common.FlowDisable { return nil } if _, ok := all.Flows[flow.FlowName]; ok { return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flow.FlowName)) } // 加入配置集合中 all.Flows[flow.FlowName] = flow return nil } ``` - confData:是文件二进制数据 - fileName:是文件路径 - kistype: 为配置文件类别 `kisTypeFlowConfigure` 会将配置信息解析到allConfig的Flows成员中。 同理,Function和Connector的解析办法如下。 #### B. Functioin配置解析 > kis-flow/file/config_import.go ```go // kisTypeFuncConfigure 解析Function配置文件,yaml格式 func kisTypeFuncConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error { function := new(config.KisFuncConfig) if ok := yaml.Unmarshal(confData, function); ok != nil { return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType)) } if _, ok := all.Funcs[function.FName]; ok { return errors.New(fmt.Sprintf("%s set repeat function_id:%s", fileName, function.FName)) } // 加入配置集合中 all.Funcs[function.FName] = function return nil } ``` #### C. Connector配置解析 > kis-flow/file/config_import.go ```go // kisTypeConnConfigure 解析Connector配置文件,yaml格式 func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error { conn := new(config.KisConnConfig) if ok := yaml.Unmarshal(confData, conn); ok != nil { return errors.New(fmt.Sprintf("%s is wrong format nsType = %s", fileName, kisType)) } if _, ok := all.Conns[conn.CName]; ok { return errors.New(fmt.Sprintf("%s set repeat conn_id:%s", fileName, conn.CName)) } // 加入配置集合中 all.Conns[conn.CName] = conn return nil } ``` ### 6.1.3 遍历文件 下面实现一个遍历一个路径`loadPath`下面所有的yml和yaml类型文件,按照kistype类别解析配置信息到allConfig中。 > kis-flow/file/config_import.go ```go // parseConfigWalkYaml 全盘解析配置文件,yaml格式, 讲配置信息解析到allConfig中 func parseConfigWalkYaml(loadPath string) (*allConfig, error) { all := new(allConfig) all.Flows = make(map[string]*config.KisFlowConfig) all.Funcs = make(map[string]*config.KisFuncConfig) all.Conns = make(map[string]*config.KisConnConfig) err := filepath.Walk(loadPath, func(filePath string, info os.FileInfo, err error) error { // 校验文件后缀是否合法 if suffix := path.Ext(filePath); suffix != ".yml" && suffix != ".yaml" { return nil } // 读取文件内容 confData, err := ioutil.ReadFile(filePath) if err != nil { return err } confMap := make(map[string]interface{}) // 校验yaml合法性 if err := yaml.Unmarshal(confData, confMap); err != nil { return err } // 判断kisType是否存在 if kisType, ok := confMap["kistype"]; !ok { return errors.New(fmt.Sprintf("yaml file %s has no file [kistype]!", filePath)) } else { switch kisType { case common.KisIdTypeFlow: return kisTypeFlowConfigure(all, confData, filePath, kisType) case common.KisIdTypeFunction: return kisTypeFuncConfigure(all, confData, filePath, kisType) case common.KisIdTypeConnnector: return kisTypeConnConfigure(all, confData, filePath, kisType) default: return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType)) } } }) if err != nil { return nil, err } return all, nil } ``` ### 6.1.4 导入方法 下面提供一个对外的公开方法`ConfigImportYaml`,需要提供一个导入的文件根路径。 > kis-flow/file/config_import.go ```go // ConfigImportYaml 全盘解析配置文件,yaml格式 func ConfigImportYaml(loadPath string) error { all, err := parseConfigWalkYaml(loadPath) if err != nil { return err } for flowName, flowConfig := range all.Flows { // 构建一个Flow newFlow := flow.NewKisFlow(flowConfig) for _, fp := range flowConfig.Flows { if err := buildFlow(all, fp, newFlow, flowName); err != nil { return err } } //将flow添加到FlowPool中 kis.Pool().AddFlow(flowName, newFlow) } return nil } ``` 首先会调用`parseConfigWalkYaml()`将全部的配置信息加载到内存中。 其次,遍历所有的Flow,依次去构建Flow,最终将flow添加到Pool当中,具体的构建流程如下: > kis-flow/file/config_import.go ```go func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, flowName string) error { //加载当前Flow依赖的Function if funcConfig, ok := all.Funcs[fp.FuncName]; !ok { return errors.New(fmt.Sprintf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName)) } else { //flow add connector if funcConfig.Option.CName != "" { // 加载当前Function依赖的Connector if connConf, ok := all.Conns[funcConfig.Option.CName]; !ok { return errors.New(fmt.Sprintf("FuncName [%s] need ConnName [%s], But has No This ConnName Config", fp.FuncName, funcConfig.Option.CName)) } else { // Function Config 关联 Connector Config _ = funcConfig.AddConnConfig(connConf) } } //flow add function if err := newFlow.Link(funcConfig, fp.Params); err != nil { return err } } return nil } ``` ## 6.2 配置导入单元测试 创建单元测试文件 `kis-flow/test/kis_config_import_test.go`。 > kis-flow/test/kis_config_import_test.go ```go package test import ( "context" "kis-flow/common" "kis-flow/file" "kis-flow/kis" "kis-flow/test/caas" "kis-flow/test/faas" "testing" ) func TestConfigImportYmal(t *testing.T) { ctx := context.Background() // 0. 注册Function 回调业务 kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) // 0. 注册ConnectorInit 和 Connector 回调业务 kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) // 1. 加载配置文件并构建Flow if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil { panic(err) } // 2. 获取Flow flow1 := kis.Pool().GetFlow("flowName1") // 3. 提交原始数据 _ = flow1.CommitRow("This is Data1 from Test") _ = flow1.CommitRow("This is Data2 from Test") _ = flow1.CommitRow("This is Data3 from Test") // 4. 执行flow1 if err := flow1.Run(ctx); err != nil { panic(err) } } ``` 先注册业务方法。然后通过ConfigImportYaml加载配置,之后从Pool中得到flow实例,提交数据,运行。 > 注意,这里的配置文件路径,写的是绝对路径。 cd 到`kis-flow/test/`目录下,执行指令: ```bash go test -test.v -test.paniconexit0 -test.run TestConfigImportYmal ``` 结果如下: ```bash === RUN TestConfigImportYmal Add KisPool FuncName=funcName1 Add KisPool FuncName=funcName2 Add KisPool FuncName=funcName3 Add KisPool CaaSInit CName=ConnName1 Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save ===> Call Connector InitDemo1 &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]} Add FlowRouter FlowName=flowName1 context.Background ====> After CommitSrcData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] KisFunctionV, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114960 ThisFunctionId:func-37c7070f45144529891d433ae9c4ebfc PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]} ---> Call funcName1Handler ---- In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data1 from Test In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data2 from Test In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data3 from Test context.Background ====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] KisFunctionS, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0001149c0 ThisFunctionId:func-5315301ffbbb4ae4be021729ddff1569 PrevFunctionId:func-37c7070f45144529891d433ae9c4ebfc funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]} ---> Call funcName2Handler ---- In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 0 ===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save ===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0 In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 1 ===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save ===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1 In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 2 ===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save ===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2 context.Background ====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]] KisFunctionC, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114a20 ThisFunctionId:func-89a6a662729b4a0895e849c40bf29892 PrevFunctionId:func-5315301ffbbb4ae4be021729ddff1569 funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]} ---> Call funcName3Handler ---- In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 0 In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 1 In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 2 --- PASS: TestConfigImportYmal (0.01s) PASS ok kis-flow/test 0.517s ``` 预期的结果和我们一致,现在我们可以通过配置文件进行加载且构建KisFlow了。 ## 6.3 配置的导出 ### 6.3.1 导出实现 > kis-flow/file/config_export.go ```go package file import ( "errors" "fmt" "gopkg.in/yaml.v3" "io/ioutil" "kis-flow/common" "kis-flow/kis" ) // ConfigExportYaml 将flow配置输出,且存储本地 func ConfigExportYaml(flow kis.Flow, savaPath string) error { if data, err := yaml.Marshal(flow.GetConfig()); err != nil { return err } else { //flow err := ioutil.WriteFile(savaPath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644) if err != nil { return err } //function for _, fp := range flow.GetConfig().Flows { fConf := flow.GetFuncConfigByName(fp.FuncName) if fConf == nil { return errors.New(fmt.Sprintf("function name = %s config is nil ", fp.FuncName)) } if fdata, err := yaml.Marshal(fConf); err != nil { return err } else { if err := ioutil.WriteFile(savaPath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil { return err } } // Connector if fConf.Option.CName != "" { cConf, err := fConf.GetConnConfig() if err != nil { return err } if cdata, err := yaml.Marshal(cConf); err != nil { return err } else { if err := ioutil.WriteFile(savaPath+common.KisIdTypeConnnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil { return err } } } } } return nil } ``` 这里面需要补充一些接口,如下: ### 6.3.2 Flow新增接口 > kis-flow/kis/flow.go ```go package kis import ( "context" "kis-flow/common" "kis-flow/config" ) type Flow interface { // Run 调度Flow,依次调度Flow中的Function并且执行 Run(ctx context.Context) error // Link 将Flow中的Function按照配置文件中的配置进行连接 Link(fConf *config.KisFuncConfig, fParams config.FParam) error // CommitRow 提交Flow数据到即将执行的Function层 CommitRow(row interface{}) error // Input 得到flow当前执行Function的输入源数据 Input() common.KisRowArr // GetName 得到Flow的名称 GetName() string // GetThisFunction 得到当前正在执行的Function GetThisFunction() Function // GetThisFuncConf 得到当前正在执行的Function的配置 GetThisFuncConf() *config.KisFuncConfig // GetConnector 得到当前正在执行的Function的Connector GetConnector() (Connector, error) // GetConnConf 得到当前正在执行的Function的Connector的配置 GetConnConf() (*config.KisConnConfig, error) // +++++++++++++++++++++++++++++++ // GetConfig 得到当前Flow的配置 GetConfig() *config.KisFlowConfig // GetFuncConfigByName 得到当前Flow的配置 GetFuncConfigByName(funcName string) *config.KisFuncConfig // +++++++++++++++++++++++++++++++ } ``` flow新增的接口实现如下: > kis-flow/flow/kis_flow.go ```go func (flow *KisFlow) GetConfig() *config.KisFlowConfig { return flow.Conf } // GetFuncConfigByName 得到当前Flow的配置 func (flow *KisFlow) GetFuncConfigByName(funcName string) *config.KisFuncConfig { if f, ok := flow.Funcs[funcName]; ok { return f.GetConfig() } else { log.Logger().ErrorF("GetFuncConfigByName(): Function %s not found", funcName) return nil } } ``` ### 6.3.3 KisFlow中Funcs修复 这里面之前有个笔误。 > kis-flow/flow/kis_flow.go ```go // KisFlow 用于贯穿整条流式计算的上下文环境 type KisFlow struct { // 基础信息 Id string // Flow的分布式实例ID(用于KisFlow内部区分不同实例) Name string // Flow的可读名称 Conf *config.KisFlowConfig // Flow配置策略 // Function列表 Funcs map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName FlowHead kis.Function // 当前Flow所拥有的Function列表表头 FlowTail kis.Function // 当前Flow所拥有的Function列表表尾 flock sync.RWMutex // 管理链表插入读写的锁 ThisFunction kis.Function // Flow当前正在执行的KisFunction对象 ThisFunctionId string // 当前执行到的Function ID PrevFunctionId string // 当前执行到的Function 上一层FunctionID // Function列表参数 funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam fplock sync.RWMutex // 管理funcParams的读写锁 // 数据 buffer common.KisRowArr // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch data common.KisDataMap // 流式计算各个层级的数据源 inPut common.KisRowArr // 当前Function的计算输入数据 } ``` 这里的Funcs成员,其key的含义,之前我们定义的是KisID,现在要修正为key的含义是FunctionName。 下面想Funcs成员赋值的代码做一个简单的修正 ```go // appendFunc 将Function添加到Flow中, 链表操作 func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) error { // ... ... //将Function Name 详细Hash对应关系添加到flow对象中 flow.Funcs[function.GetConfig().FName] = function // ... ... } ``` ### 6.3.4 KisPool新增方法 > kis-flow/kis/pool.go ```go // GetFlows 得到全部的Flow func (pool *kisPool) GetFlows() []Flow { pool.flowLock.RLock() // 读锁 defer pool.flowLock.RUnlock() var flows []Flow for _, flow := range pool.flowRouter { flows = append(flows, flow) } return flows } ``` KisPool新增 获取全部Flow的方法,以支持导出模块使用。 ## 6.4 配置导出单元测试 在`kis-flow/test/`创建`kis_config_export_test.go`文件。 ```go package test import ( "kis-flow/common" "kis-flow/file" "kis-flow/kis" "kis-flow/test/caas" "kis-flow/test/faas" "testing" ) func TestConfigExportYmal(t *testing.T) { // 0. 注册Function 回调业务 kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler) kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler) kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler) // 0. 注册ConnectorInit 和 Connector 回调业务 kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1) kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1) // 1. 加载配置文件并构建Flow if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil { panic(err) } // 2. 讲构建的内存KisFlow结构配置导出的文件当中 flows := kis.Pool().GetFlows() for _, flow := range flows { if err := file.ConfigExportYaml(flow, "/Users/gopath/src/kis-flow/test/export_conf/"); err != nil { panic(err) } } } ``` cd到`kis-flow/test/`下 执行: ```bash go test -test.v -test.paniconexit0 -test.run TestConfigExportYmal ``` 会在`kis-flow/test/export_conf/`下得到导出的配置。 ```bash ├── export_conf │ ├── conn-ConnName1.yaml │ ├── flow-flowName1.yaml │ ├── func-funcName1.yaml │ ├── func-funcName2.yaml │ └── func-funcName3.yaml ``` ## 6.5 【V0.5】源代码 https://github.com/aceld/kis-flow/releases/tag/v0.5 --- 作者:刘丹冰Aceld github: [https://github.com/aceld](https://github.com/aceld) KisFlow开源项目地址:[https://github.com/aceld/kis-flow](https://github.com/aceld/kis-flow) --- [Golang框架实战-KisFlow流式计算框架专栏](https://www.jianshu.com/c/c72f100773a6) --- [Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df) [Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a) [Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7) [Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc) [Golang框架实战-KisFlow流式计算框架(5)-Function调度](https://www.jianshu.com/p/6f54966c9f1e) [Golang框架实战-KisFlow流式计算框架(6)-Connector](https://www.jianshu.com/p/159b14219559) [Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出](https://www.jianshu.com/p/8870598beed0)

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

120 次点击  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传