[Golang框架实战-KisFlow流式计算框架专栏](https://www.jianshu.com/c/c72f100773a6)
---
[Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df)
[Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a)
[Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7)
[Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc)
[Golang框架实战-KisFlow流式计算框架(5)-Function调度](https://www.jianshu.com/p/6f54966c9f1e)
[Golang框架实战-KisFlow流式计算框架(6)-Connector](https://www.jianshu.com/p/159b14219559)
[Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出](https://www.jianshu.com/p/8870598beed0)
---
## 6.1 配置的导入
现在每次建立Flow和Function等,都需要一系列繁琐的添加,不是很方便,接下来,我们可以通过批量读写配置文件,构建KisFlow中的结构关系,并且也可以将KisFlow的结构导出到本地文件中。目前我们先用文件的形式做配置的持久化,开发者也可以今后做数据库或者远程配置的持久化均可。
## 6.1 配置的导入
### 6.1.1 创建配置文件
首先我们在`kis-flow/test/load_conf/`下创建需要加载的kisflow业务配置文件。
在`kis-flow/test/load_conf/`下分别创建`conn/`、`flow/`、`func/`三个文件夹分别存放Connector、Flow、Funciton的配置信息。
```bash
├── conn
│ └── conn-ConnName1.yml
├── flow
│ └── flow-FlowName1.yml
└── func
├── func-FuncName1.yml
├── func-FuncName2.yml
└── func-FuncName3.yml
```
分别创建一些`yml`文件。具体内容如下:
#### A.Function
> kis-flow/test/load_conf/func/func-FuncNam1.yml
```yaml
kistype: func
fname: funcName1
fmode: Verify
source:
name: 公众号抖音商城户订单数据
must:
- order_id
- user_id
```
> kis-flow/test/load_conf/func/func-FuncNam2.yml
```yaml
kistype: func
fname: funcName2
fmode: Save
source:
name: 用户订单错误率
must:
- order_id
- user_id
option:
cname: ConnName1
```
> kis-flow/test/load_conf/func/func-FuncNam2.yml
```yaml
kistype: func
fname: funcName2
fmode: Save
source:
name: 用户订单错误率
must:
- order_id
- user_id
option:
cname: ConnName1
```
> kis-flow/test/load_conf/func/func-FuncNam3.yml
```yaml
kistype: func
fname: funcName3
fmode: Calculate
source:
name: 用户订单错误率
must:
- order_id
- user_id
```
#### B.Connector
> kis-flow/test/load_conf/func/func-ConnName1.yml
```yaml
kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
args1: value1
args2: value2
load: null
save:
- funcName2
```
#### C.Flow
> kis-flow/test/load_conf/func/func-FlowName1.yml
```yaml
kistype: flow
status: 1
flow_name: flowName1
flows:
- fname: funcName1
- fname: funcName2
- fname: funcName3
```
### 6.1.2 配置解析
创建`kis-flow/file/`目录,且创建`kis-flow/file/config_import.go`文件。
首先定义一个可以存放全部配置的接口:
> kis-flow/file/config_import.go
```go
type allConfig struct {
Flows map[string]*config.KisFlowConfig
Funcs map[string]*config.KisFuncConfig
Conns map[string]*config.KisConnConfig
}
```
key作为各个模块的Name名称字段。
然后分别定义解析Flow、Function、Connector配置的方法。yaml的第三方库,我们用`"gopkg.in/yaml.v3"`这个库。
> kis-flow/go.mod
```go
module kis-flow
go 1.18
require github.com/google/uuid v1.5.0
require gopkg.in/yaml.v3 v3.0.1 // indirect
```
#### A. Flow 配置解析
> kis-flow/file/config_import.go
```go
// kisTypeFlowConfigure 解析Flow配置文件,yaml格式
func kisTypeFlowConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
flow := new(config.KisFlowConfig)
if ok := yaml.Unmarshal(confData, flow); ok != nil {
return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))
}
// 如果FLow状态为关闭,则不做配置加载
if common.KisOnOff(flow.Status) == common.FlowDisable {
return nil
}
if _, ok := all.Flows[flow.FlowName]; ok {
return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flow.FlowName))
}
// 加入配置集合中
all.Flows[flow.FlowName] = flow
return nil
}
```
- confData:是文件二进制数据
- fileName:是文件路径
- kistype: 为配置文件类别
`kisTypeFlowConfigure` 会将配置信息解析到allConfig的Flows成员中。
同理,Function和Connector的解析办法如下。
#### B. Functioin配置解析
> kis-flow/file/config_import.go
```go
// kisTypeFuncConfigure 解析Function配置文件,yaml格式
func kisTypeFuncConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
function := new(config.KisFuncConfig)
if ok := yaml.Unmarshal(confData, function); ok != nil {
return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))
}
if _, ok := all.Funcs[function.FName]; ok {
return errors.New(fmt.Sprintf("%s set repeat function_id:%s", fileName, function.FName))
}
// 加入配置集合中
all.Funcs[function.FName] = function
return nil
}
```
#### C. Connector配置解析
> kis-flow/file/config_import.go
```go
// kisTypeConnConfigure 解析Connector配置文件,yaml格式
func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
conn := new(config.KisConnConfig)
if ok := yaml.Unmarshal(confData, conn); ok != nil {
return errors.New(fmt.Sprintf("%s is wrong format nsType = %s", fileName, kisType))
}
if _, ok := all.Conns[conn.CName]; ok {
return errors.New(fmt.Sprintf("%s set repeat conn_id:%s", fileName, conn.CName))
}
// 加入配置集合中
all.Conns[conn.CName] = conn
return nil
}
```
### 6.1.3 遍历文件
下面实现一个遍历一个路径`loadPath`下面所有的yml和yaml类型文件,按照kistype类别解析配置信息到allConfig中。
> kis-flow/file/config_import.go
```go
// parseConfigWalkYaml 全盘解析配置文件,yaml格式, 讲配置信息解析到allConfig中
func parseConfigWalkYaml(loadPath string) (*allConfig, error) {
all := new(allConfig)
all.Flows = make(map[string]*config.KisFlowConfig)
all.Funcs = make(map[string]*config.KisFuncConfig)
all.Conns = make(map[string]*config.KisConnConfig)
err := filepath.Walk(loadPath, func(filePath string, info os.FileInfo, err error) error {
// 校验文件后缀是否合法
if suffix := path.Ext(filePath); suffix != ".yml" && suffix != ".yaml" {
return nil
}
// 读取文件内容
confData, err := ioutil.ReadFile(filePath)
if err != nil {
return err
}
confMap := make(map[string]interface{})
// 校验yaml合法性
if err := yaml.Unmarshal(confData, confMap); err != nil {
return err
}
// 判断kisType是否存在
if kisType, ok := confMap["kistype"]; !ok {
return errors.New(fmt.Sprintf("yaml file %s has no file [kistype]!", filePath))
} else {
switch kisType {
case common.KisIdTypeFlow:
return kisTypeFlowConfigure(all, confData, filePath, kisType)
case common.KisIdTypeFunction:
return kisTypeFuncConfigure(all, confData, filePath, kisType)
case common.KisIdTypeConnnector:
return kisTypeConnConfigure(all, confData, filePath, kisType)
default:
return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType))
}
}
})
if err != nil {
return nil, err
}
return all, nil
}
```
### 6.1.4 导入方法
下面提供一个对外的公开方法`ConfigImportYaml`,需要提供一个导入的文件根路径。
> kis-flow/file/config_import.go
```go
// ConfigImportYaml 全盘解析配置文件,yaml格式
func ConfigImportYaml(loadPath string) error {
all, err := parseConfigWalkYaml(loadPath)
if err != nil {
return err
}
for flowName, flowConfig := range all.Flows {
// 构建一个Flow
newFlow := flow.NewKisFlow(flowConfig)
for _, fp := range flowConfig.Flows {
if err := buildFlow(all, fp, newFlow, flowName); err != nil {
return err
}
}
//将flow添加到FlowPool中
kis.Pool().AddFlow(flowName, newFlow)
}
return nil
}
```
首先会调用`parseConfigWalkYaml()`将全部的配置信息加载到内存中。
其次,遍历所有的Flow,依次去构建Flow,最终将flow添加到Pool当中,具体的构建流程如下:
> kis-flow/file/config_import.go
```go
func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, flowName string) error {
//加载当前Flow依赖的Function
if funcConfig, ok := all.Funcs[fp.FuncName]; !ok {
return errors.New(fmt.Sprintf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName))
} else {
//flow add connector
if funcConfig.Option.CName != "" {
// 加载当前Function依赖的Connector
if connConf, ok := all.Conns[funcConfig.Option.CName]; !ok {
return errors.New(fmt.Sprintf("FuncName [%s] need ConnName [%s], But has No This ConnName Config", fp.FuncName, funcConfig.Option.CName))
} else {
// Function Config 关联 Connector Config
_ = funcConfig.AddConnConfig(connConf)
}
}
//flow add function
if err := newFlow.Link(funcConfig, fp.Params); err != nil {
return err
}
}
return nil
}
```
## 6.2 配置导入单元测试
创建单元测试文件 `kis-flow/test/kis_config_import_test.go`。
> kis-flow/test/kis_config_import_test.go
```go
package test
import (
"context"
"kis-flow/common"
"kis-flow/file"
"kis-flow/kis"
"kis-flow/test/caas"
"kis-flow/test/faas"
"testing"
)
func TestConfigImportYmal(t *testing.T) {
ctx := context.Background()
// 0. 注册Function 回调业务
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. 注册ConnectorInit 和 Connector 回调业务
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("flowName1")
// 3. 提交原始数据
_ = flow1.CommitRow("This is Data1 from Test")
_ = flow1.CommitRow("This is Data2 from Test")
_ = flow1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
if err := flow1.Run(ctx); err != nil {
panic(err)
}
}
```
先注册业务方法。然后通过ConfigImportYaml加载配置,之后从Pool中得到flow实例,提交数据,运行。
> 注意,这里的配置文件路径,写的是绝对路径。
cd 到`kis-flow/test/`目录下,执行指令:
```bash
go test -test.v -test.paniconexit0 -test.run TestConfigImportYmal
```
结果如下:
```bash
=== RUN TestConfigImportYmal
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114960 ThisFunctionId:func-37c7070f45144529891d433ae9c4ebfc PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}
---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionS, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0001149c0 ThisFunctionId:func-5315301ffbbb4ae4be021729ddff1569 PrevFunctionId:func-37c7070f45144529891d433ae9c4ebfc funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}
---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]]
KisFunctionC, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114a20 ThisFunctionId:func-89a6a662729b4a0895e849c40bf29892 PrevFunctionId:func-5315301ffbbb4ae4be021729ddff1569 funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]}
---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 0
In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 1
In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 2
--- PASS: TestConfigImportYmal (0.01s)
PASS
ok kis-flow/test 0.517s
```
预期的结果和我们一致,现在我们可以通过配置文件进行加载且构建KisFlow了。
## 6.3 配置的导出
### 6.3.1 导出实现
> kis-flow/file/config_export.go
```go
package file
import (
"errors"
"fmt"
"gopkg.in/yaml.v3"
"io/ioutil"
"kis-flow/common"
"kis-flow/kis"
)
// ConfigExportYaml 将flow配置输出,且存储本地
func ConfigExportYaml(flow kis.Flow, savaPath string) error {
if data, err := yaml.Marshal(flow.GetConfig()); err != nil {
return err
} else {
//flow
err := ioutil.WriteFile(savaPath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644)
if err != nil {
return err
}
//function
for _, fp := range flow.GetConfig().Flows {
fConf := flow.GetFuncConfigByName(fp.FuncName)
if fConf == nil {
return errors.New(fmt.Sprintf("function name = %s config is nil ", fp.FuncName))
}
if fdata, err := yaml.Marshal(fConf); err != nil {
return err
} else {
if err := ioutil.WriteFile(savaPath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil {
return err
}
}
// Connector
if fConf.Option.CName != "" {
cConf, err := fConf.GetConnConfig()
if err != nil {
return err
}
if cdata, err := yaml.Marshal(cConf); err != nil {
return err
} else {
if err := ioutil.WriteFile(savaPath+common.KisIdTypeConnnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil {
return err
}
}
}
}
}
return nil
}
```
这里面需要补充一些接口,如下:
### 6.3.2 Flow新增接口
> kis-flow/kis/flow.go
```go
package kis
import (
"context"
"kis-flow/common"
"kis-flow/config"
)
type Flow interface {
// Run 调度Flow,依次调度Flow中的Function并且执行
Run(ctx context.Context) error
// Link 将Flow中的Function按照配置文件中的配置进行连接
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow 提交Flow数据到即将执行的Function层
CommitRow(row interface{}) error
// Input 得到flow当前执行Function的输入源数据
Input() common.KisRowArr
// GetName 得到Flow的名称
GetName() string
// GetThisFunction 得到当前正在执行的Function
GetThisFunction() Function
// GetThisFuncConf 得到当前正在执行的Function的配置
GetThisFuncConf() *config.KisFuncConfig
// GetConnector 得到当前正在执行的Function的Connector
GetConnector() (Connector, error)
// GetConnConf 得到当前正在执行的Function的Connector的配置
GetConnConf() (*config.KisConnConfig, error)
// +++++++++++++++++++++++++++++++
// GetConfig 得到当前Flow的配置
GetConfig() *config.KisFlowConfig
// GetFuncConfigByName 得到当前Flow的配置
GetFuncConfigByName(funcName string) *config.KisFuncConfig
// +++++++++++++++++++++++++++++++
}
```
flow新增的接口实现如下:
> kis-flow/flow/kis_flow.go
```go
func (flow *KisFlow) GetConfig() *config.KisFlowConfig {
return flow.Conf
}
// GetFuncConfigByName 得到当前Flow的配置
func (flow *KisFlow) GetFuncConfigByName(funcName string) *config.KisFuncConfig {
if f, ok := flow.Funcs[funcName]; ok {
return f.GetConfig()
} else {
log.Logger().ErrorF("GetFuncConfigByName(): Function %s not found", funcName)
return nil
}
}
```
### 6.3.3 KisFlow中Funcs修复
这里面之前有个笔误。
> kis-flow/flow/kis_flow.go
```go
// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
// 基础信息
Id string // Flow的分布式实例ID(用于KisFlow内部区分不同实例)
Name string // Flow的可读名称
Conf *config.KisFlowConfig // Flow配置策略
// Function列表
Funcs map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName
FlowHead kis.Function // 当前Flow所拥有的Function列表表头
FlowTail kis.Function // 当前Flow所拥有的Function列表表尾
flock sync.RWMutex // 管理链表插入读写的锁
ThisFunction kis.Function // Flow当前正在执行的KisFunction对象
ThisFunctionId string // 当前执行到的Function ID
PrevFunctionId string // 当前执行到的Function 上一层FunctionID
// Function列表参数
funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam
fplock sync.RWMutex // 管理funcParams的读写锁
// 数据
buffer common.KisRowArr // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch
data common.KisDataMap // 流式计算各个层级的数据源
inPut common.KisRowArr // 当前Function的计算输入数据
}
```
这里的Funcs成员,其key的含义,之前我们定义的是KisID,现在要修正为key的含义是FunctionName。
下面想Funcs成员赋值的代码做一个简单的修正
```go
// appendFunc 将Function添加到Flow中, 链表操作
func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) error {
// ... ...
//将Function Name 详细Hash对应关系添加到flow对象中
flow.Funcs[function.GetConfig().FName] = function
// ... ...
}
```
### 6.3.4 KisPool新增方法
> kis-flow/kis/pool.go
```go
// GetFlows 得到全部的Flow
func (pool *kisPool) GetFlows() []Flow {
pool.flowLock.RLock() // 读锁
defer pool.flowLock.RUnlock()
var flows []Flow
for _, flow := range pool.flowRouter {
flows = append(flows, flow)
}
return flows
}
```
KisPool新增 获取全部Flow的方法,以支持导出模块使用。
## 6.4 配置导出单元测试
在`kis-flow/test/`创建`kis_config_export_test.go`文件。
```go
package test
import (
"kis-flow/common"
"kis-flow/file"
"kis-flow/kis"
"kis-flow/test/caas"
"kis-flow/test/faas"
"testing"
)
func TestConfigExportYmal(t *testing.T) {
// 0. 注册Function 回调业务
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. 注册ConnectorInit 和 Connector 回调业务
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. 讲构建的内存KisFlow结构配置导出的文件当中
flows := kis.Pool().GetFlows()
for _, flow := range flows {
if err := file.ConfigExportYaml(flow, "/Users/gopath/src/kis-flow/test/export_conf/"); err != nil {
panic(err)
}
}
}
```
cd到`kis-flow/test/`下 执行:
```bash
go test -test.v -test.paniconexit0 -test.run TestConfigExportYmal
```
会在`kis-flow/test/export_conf/`下得到导出的配置。
```bash
├── export_conf
│ ├── conn-ConnName1.yaml
│ ├── flow-flowName1.yaml
│ ├── func-funcName1.yaml
│ ├── func-funcName2.yaml
│ └── func-funcName3.yaml
```
## 6.5 【V0.5】源代码
https://github.com/aceld/kis-flow/releases/tag/v0.5
---
作者:刘丹冰Aceld github: [https://github.com/aceld](https://github.com/aceld)
KisFlow开源项目地址:[https://github.com/aceld/kis-flow](https://github.com/aceld/kis-flow)
---
[Golang框架实战-KisFlow流式计算框架专栏](https://www.jianshu.com/c/c72f100773a6)
---
[Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df)
[Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a)
[Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)](https://www.jianshu.com/p/77a67359e4a7)
[Golang框架实战-KisFlow流式计算框架(4)-数据流](https://www.jianshu.com/p/f66ca48156dc)
[Golang框架实战-KisFlow流式计算框架(5)-Function调度](https://www.jianshu.com/p/6f54966c9f1e)
[Golang框架实战-KisFlow流式计算框架(6)-Connector](https://www.jianshu.com/p/159b14219559)
[Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出](https://www.jianshu.com/p/8870598beed0)
有疑问加站长微信联系(非本文作者)