Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)

aceld · · 346 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

# 2. V0.1-项目构建及基础模块定义 首先我们创建我们的项目,项目的主文件目录就叫KisFlow,且在Github上创建对应的仓库: [https://github.com/aceld/kis-flow](https://github.com/aceld/kis-flow) 然后将项目代码clone到本地。 ## 2.0 项目构建 (这里如果你是按照本教程开发,需要在自己的仓库重新创建一个新项目,并且clone到本地开发) ### 2.0.1 创建项目目录 接下来,我们先将项目中的必要的文件目录创建好,项目的目录结构如下: ```bash kis-flow / . ├── LICENSE ├── README.md ├── common/ ├── example/ ├── function/ ├── conn/ ├── config/ ├── flow/ └── kis/ ``` 这里我们创建三个文件夹, `common/`为 存放我们一些公用的基础常量和一些枚举参数,还有一些工具类的方法。 `flow/`为存放KisFlow的核心代码。 `function/`为存放KisFunction的核心代码。 `conn/`为存放KisConnector的核心代码。 `config/` 存放flow、functioin、connector等策略配置信息模块。 `example/`为我们针对KisFlow的一些测试案例和test单元测试案例等,能够及时验证我们的项目效果。 `kis/`来存放所有模块的抽象层。 ### 2.0.1 创建go.mod cd 到 kis-flow的项目根目录,执行如下指令: 我们会得到go.mod文件,这个是作为当前项目的包管理文件,如下: ```go module kis-flow go 1.18 ``` 首先因为在之后会有很多调试日志要打印,我们先把日志模块集成了,日志模块KisFlow提供一个默认的标准输出Logger对象,再对我开放一个SetLogger() 方法来进行重新设置开发者自己的Logger模块。 ## 2.1 KisLogger ### 2.1.1 Logger抽象接口 将Logger的定义在`kis-flow/log/`目录下,创建`kis_log.go`文件: > kis-flow/log/kis_log.go ```go package log import "context" type KisLogger interface { // InfoFX 有上下文的Info级别日志接口, format字符串格式 InfoFX(ctx context.Context, str string, v ...interface{}) // ErrorFX 有上下文的Error级别日志接口, format字符串格式 ErrorFX(ctx context.Context, str string, v ...interface{}) // DebugFX 有上下文的Debug级别日志接口, format字符串格式 DebugFX(ctx context.Context, str string, v ...interface{}) // InfoF 无上下文的Info级别日志接口, format字符串格式 InfoF(str string, v ...interface{}) // ErrorF 无上下文的Error级别日志接口, format字符串格式 ErrorF(str string, v ...interface{}) // DebugF 无上下文的Debug级别日志接口, format字符串格式 DebugF(str string, v ...interface{}) } // kisLog 默认的KisLog 对象 var kisLog KisLogger // SetLogger 设置KisLog对象, 可以是用户自定义的Logger对象 func SetLogger(newlog KisLogger) { kisLog = newlog } // Logger 获取到kisLog对象 func Logger() KisLogger { return kisLog } ``` KisLogger提供了三个级别的日志,分别是Info、Error、Debug。且也分别提供了具备context参数与不具备context参数的两套日志接口。 提供一个全局对象`kisLog`,默认的KisLog 对象。以及方法`SetLogger()`和`Logger()`供开发可以设置自己的Logger对象以及获取到Logger对象。 ### 2.1.2 默认的日志对象KisDefaultLogger 如果开发没有自定义的日志对象定义,那么KisFlow会提供一个默认的日志对象`kisDefaultLogger`,这个类实现了`KisLogger`的全部接口,且都是默认打印到标准输出的形式来打印日志,定义在`kis-flow/log/`目录下,创建`kis_default_log.go`文件。 > kis-flow/log/kis_default_log.go ```go package log import ( "context" "fmt" ) // kisDefaultLog 默认提供的日志对象 type kisDefaultLog struct{} func (log *kisDefaultLog) InfoF(str string, v ...interface{}) { fmt.Printf(str, v...) } func (log *kisDefaultLog) ErrorF(str string, v ...interface{}) { fmt.Printf(str, v...) } func (log *kisDefaultLog) DebugF(str string, v ...interface{}) { fmt.Printf(str, v...) } func (log *kisDefaultLog) InfoFX(ctx context.Context, str string, v ...interface{}) { fmt.Println(ctx) fmt.Printf(str, v...) } func (log *kisDefaultLog) ErrorFX(ctx context.Context, str string, v ...interface{}) { fmt.Println(ctx) fmt.Printf(str, v...) } func (log *kisDefaultLog) DebugFX(ctx context.Context, str string, v ...interface{}) { fmt.Println(ctx) fmt.Printf(str, v...) } func init() { // 如果没有设置Logger, 则启动时使用默认的kisDefaultLog对象 if Logger() == nil { SetLogger(&kisDefaultLog{}) } } ``` 这里在`init()`初始化方法中,会判断目前是否已经有设置全局的Logger对象,如果没有,KisFlow会默认选择kisDefaultLog 作为全局Logger日志对象。 ### 2.1.3 单元测试KisLogger 现在,我们先不针对`KisLogger`做过多的方法开发,我们优先将现有的程序跑起来,做一个单元测试来测试创建一个`KisLogger`。 > kis-flow/test/kis_log_test.go ```go package test import ( "context" "kis-flow/log" "testing" ) func TestKisLogger(t *testing.T) { ctx := context.Background() log.Logger().InfoFX(ctx, "TestKisLogger InfoFX") log.Logger().ErrorFX(ctx, "TestKisLogger ErrorFX") log.Logger().DebugFX(ctx, "TestKisLogger DebugFX") log.Logger().InfoF("TestKisLogger InfoF") log.Logger().ErrorF("TestKisLogger ErrorF") log.Logger().DebugF("TestKisLogger DebugF") } ``` 我们`cd`到`kis-flow/test/`目录下执行单元测试指令: ```bash go test -test.v -test.paniconexit0 -test.run TestKisLogger ``` 得到结果如下: ```bash === RUN TestKisLogger context.Background TestKisLogger InfoFX context.Background TestKisLogger ErrorFX context.Background TestKisLogger DebugFX TestKisLogger InfoF TestKisLogger ErrorF TestKisLogger DebugF --- PASS: TestKisLogger (0.00s) PASS ok kis-flow/test 0.509s ``` ## 2.2 KisConfig 在KisFlow中,我们定义了三种核心模块,分别是`KisFunction`, `KisFlow`, `KisConnector` ,所以KisConfig也分别需要针对这三个模块进行定义,我们将全部有关KisConfig的代码都放在`kis-flow/config/`目录下。 ```bash ➜ kis-flow git:(master) ✗ tree . ├── LICENSE ├── README.md ├── common/ │   └── ├── example/ │   └── ├── config/ │   ├── ├── test/ └── go.mod ``` ## 2.2.1 KisFuncConfig 定义 KisFuncConfig在设计文档中的yaml文件形式如下: ```yaml kistype: func fname: 测试KisFunction_S1 fmode: Save source: name: 被校验的测试数据源1-用户订单维度 must: - userid - orderid option: cname: 测试KisConnector_1 retry_times: 3 retry_duration: 500 default_params: default1: default1_param default2: default2_param ``` 参数说明: ![](https://upload-images.jianshu.io/upload_images/11093205-528913c04d3667d3.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) ![](https://upload-images.jianshu.io/upload_images/11093205-a4d12979a833b7f6.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 接下来我们根据上述的配置协议,来定义KisFunction的策略配置结构体,并且提供一些响应的初始化方法。 我们在项目文档中创建`kis_func_config.go`文件,在这里我们将需要的Config定义实现。 ### A. 结构体定义 > kis-flow/config/kis_func_config.go ```go package config import ( "kis-flow/common" "kis-flow/log" ) // FParam 在当前Flow中Function定制固定配置参数类型 type FParam map[string]string // KisSource 表示当前Function的业务源 type KisSource struct { Name string `yaml:"name"` //本层Function的数据源描述 Must []string `yaml:"must"` //source必传字段 } // KisFuncOption 可选配置 type KisFuncOption struct { CName string `yaml:"cname"` //连接器Connector名称 RetryTimes int `yaml:"retry_times"` //选填,Function调度重试(不包括正常调度)最大次数 RetryDuriton int `yaml:"return_duration"` //选填,Function调度每次重试最大时间间隔(单位:ms) Params FParam `yaml:"default_params"` //选填,在当前Flow中Function定制固定配置参数 } // KisFuncConfig 一个KisFunction策略配置 type KisFuncConfig struct { KisType string `yaml:"kistype"` FName string `yaml:"fname"` FMode string `yaml:"fmode"` Source KisSource `yaml:"source"` Option KisFuncOption `yaml:"option"` } ``` 这里`KisFuncConfig`是相关结构体,其中 `FParam`、`KisSource`、`KisFuncOption`均为一些相关的参数类型。 ### B. 相关方法定义 下面我们先简单的提供创建`KisFuncConfig`的构造方法。 > kis-flow/config/kis_func_config.go ```go // NewFuncConfig 创建一个Function策略配置对象, 用于描述一个KisFunction信息 func NewFuncConfig(funcName string, mode common.KisMode, source *KisSource, option *KisFuncOption) *KisFuncConfig { config := new(KisFuncConfig) config.FName = funcName if source == nil { log.Logger().ErrorF("funcName NewConfig Error, source is nil, funcName = %s\n", funcName) return nil } config.Source = *source config.FMode = string(mode) //FunctionS 和 L 需要必传KisConnector参数,原因是S和L需要通过Connector进行建立流式关系 if mode == common.S || mode == common.L { if option == nil { log.Logger().ErrorF("Funcion S/L need option->Cid\n") return nil } else if option.CName == "" { log.Logger().ErrorF("Funcion S/L need option->Cid\n") return nil } } if option != nil { config.Option = *option } return config } ``` 上述代码中提到了`common.S`和 `common.L`两个枚举类型,这是我们针对KisFunction提供的五种类型的枚举值,我们可以将他们定义在 `kis-flow/common/const.go`文件中。 > kis-flow/common/const.go ```go package common type KisMode string const ( // V 为校验特征的KisFunction, // 主要进行数据的过滤,验证,字段梳理,幂等等前置数据处理 V KisMode = "Verify" // S 为存储特征的KisFunction, // S会通过NsConnector进行将数据进行存储,数据的临时声明周期为NsWindow S KisMode = "Save" // L 为加载特征的KisFunction, // L会通过KisConnector进行数据加载,通过该Function可以从逻辑上与对应的S Function进行并流 L KisMode = "Load" // C 为计算特征的KisFunction, // C会通过KisFlow中的数据计算,生成新的字段,将数据流传递给下游S进行存储,或者自己也已直接通过KisConnector进行存储 C KisMode = "Calculate" // E 为扩展特征的KisFunction, // 作为流式计算的自定义特征Function,如,Notify 调度器触发任务的消息发送,删除一些数据,重置状态等。 E KisMode = "Expand" ) ``` 如果`fmode`为`Save`或者`Load`说明这个function有查询库或者存储数据的行为,那么这个Function就需要关联一个KisConnector,那么CName就需要传递进来。 ### C. 创建KisFuncConfig单元测试 现在,我们先不针对`KisFuncConfig`做过多的方法开发,我们优先将现有的程序跑起来,做一个单元测试来测试创建一个`KisFuncConfig`。 > kis-flow/test/kis_config_test.go ```go func TestNewFuncConfig(t *testing.T) { source := config.KisSource{ Name: "公众号抖音商城户订单数据", Must: []string{"order_id", "user_id"}, } option := config.KisFuncOption{ CName: "connectorName1", RetryTimes: 3, RetryDuriton: 300, Params: config.FParam{ "param1": "value1", "param2": "value2", }, } myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option) log.Logger().InfoF("funcName1: %+v\n", myFunc1) } ``` 我们`cd`到`kis-flow/test/`目录下执行单元测试指令: ```bash go test -test.v -test.paniconexit0 -test.run TestNewFuncConfig ``` 得到结果如下: ```bash === RUN TestNewFuncConfig funcName1: &{KisType: FName:funcName1 FMode:Save Source:{Name:公众号抖音商城户订单数据 Must:[order_id user_id]} Option:{CName:connectorName1 RetryTimes:3 RetryDuriton:300 Params:map[param1:value1 param2:value2]}} --- PASS: TestNewFuncConfig (0.00s) PASS ok kis-flow/test 0.545s ``` 好了,现在最简单的KisFuncConfig的策略创建基本完成了。 ## 2.2.2 KisFlowConfig 定义 KisFlowConfig在设计文档中的yaml文件形式如下: ```yaml kistype: flow status: 1 flow_name: MyFlow1 flows: - fname: 测试PrintInput params: args1: value1 args2: value2 - fname: 测试KisFunction_S1 - fname: 测试PrintInput params: args1: value11 args2: value22 default2: newDefault - fname: 测试PrintInput - fname: 测试KisFunction_S1 params: my_user_param1: ffffffxxxxxx - fname: 测试PrintInput ``` 参数说明: ![](https://upload-images.jianshu.io/upload_images/11093205-e68080e6d90f3a91.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) ### A. 结构体定义 接下来我们根据上述的配置协议,来定义KisFlow的策略配置结构体,并且提供一些响应的初始化方法。 我们在项目文档中创建`kis_flow_config.go`文件,在这里我们将需要的Config定义实现。 > kis-flow/config/kis_flow_config.go ```go package config import "kis-flow/common" // KisFlowFunctionParam 一个Flow配置中Function的Id及携带固定配置参数 type KisFlowFunctionParam struct { FuncName string `yaml:"fname"` //必须 Params FParam `yaml:"params"` //选填,在当前Flow中Function定制固定配置参数 } // KisFlowConfig 用户贯穿整条流式计算上下文环境的对象 type KisFlowConfig struct { KisType string `yaml:"kistype"` Status int `yaml:"status"` FlowName string `yaml:"flow_name"` Flows []KisFlowFunctionParam `yaml:"flows"` } ``` 这里提供了一个新的参数类型 `KisFlowFunctionParam` ,这个表示配置KisFlow的时候,在调度的时候,flow默认传递当前被调度Function的自定义默认参数,如果不需要可以不添加此参数。 ### B. 相关方法定义 提供一个新建`KisFlowConfig`的构造方法。 > kis-flow/config/kis_flow_config.go ```go // NewFlowConfig 创建一个Flow策略配置对象, 用于描述一个KisFlow信息 func NewFlowConfig(flowName string, enable common.KisOnOff) *KisFlowConfig { config := new(KisFlowConfig) config.FlowName = flowName config.Flows = make([]KisFlowFunctionParam, 0) config.Status = int(enable) return config } // AppendFunctionConfig 添加一个Function Config 到当前Flow中 func (fConfig *KisFlowConfig) AppendFunctionConfig(params KisFlowFunctionParam) { fConfig.Flows = append(fConfig.Flows, params) } ``` 有关flow携带的Function配置,这里我们采用通过`AppendFunctionConfig`动态的去添加,目的是为了,今后可能有关kisflow的配置会从数据库/动态远程配置等中提取,那么就需要动态的将配置组合进来。 ### C. KisFlowConfig单元测试 同样,我们简单些一个单元测试来测试KisFlowConfig的创建。 > kis-flow/test/kis_config_test.go ```go func TestNewFlowConfig(t *testing.T) { flowFuncParams1 := config.KisFlowFunctionParam{ FuncName: "funcName1", Params: config.FParam{ "flowSetFunParam1": "value1", "flowSetFunParam2": "value2", }, } flowFuncParams2 := config.KisFlowFunctionParam{ FuncName: "funcName2", Params: config.FParam{ "default": "value1", }, } myFlow1 := config.NewFlowConfig("flowName1", common.FlowEnable) myFlow1.AppendFunctionConfig(flowFuncParams1) myFlow1.AppendFunctionConfig(flowFuncParams2) log.Logger().InfoF("myFlow1: %+v\n", myFlow1) } ``` 我们`cd`到`kis-flow/test/`目录下执行单元测试指令: ```go $ go test -test.v -test.paniconexit0 -test.run TestNewFlowConfig ``` 得到结果如下: ```bash === RUN TestNewFlowConfig myFlow1: &{KisType: Status:1 FlowName:flowName1 Flows:[{FuncName:funcName1 Params:map[flowSetFunParam1:value1 flowSetFunParam2:value2]} {FuncName:funcName2 Params:map[default:value1]}]} --- PASS: TestNewFlowConfig (0.00s) PASS ok kis-flow/test 0.251s ``` ## 2.2.3 KisConnConfig KisConnConfig在设计文档中的yaml文件形式如下: ```yaml kistype: conn cname: 测试KisConnector_1 addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990' type: redis key: userid_orderid_option params: args1: value1 args2: value2 load: null save: - 测试KisFunction_S1 ``` ### A. 结构体定义 接下来我们根据上述的配置协议,来定义KisConnector的策略配置结构体,并且提供一些响应的初始化方法。 我们在项目文档中创建`kis_conn_config.go`文件,在这里我们将需要的Config定义实现。 > kis-flow/config/kis_conn_config.go ```go package config import ( "errors" "fmt" "kis-flow/common" ) // KisConnConfig KisConnector 策略配置 type KisConnConfig struct { //配置类型 KisType string `yaml:"kistype"` //唯一描述标识 CName string `yaml:"cname"` //基础存储媒介地址 AddrString string `yaml:"addrs"` //存储媒介引擎类型"Mysql" "Redis" "Kafka"等 Type common.KisConnType `yaml:"type"` //一次存储的标识:如Redis为Key名称、Mysql为Table名称,Kafka为Topic名称等 Key string `yaml:"key"` //配置信息中的自定义参数 Params map[string]string `yaml:"params"` //存储读取所绑定的NsFuncionID Load []string `yaml:"load"` Save []string `yaml:"save"` } ``` ### B. 相关方法定义 > kis-flow/config/kis_conn_config.go ```go // NewConnConfig 创建一个KisConnector策略配置对象, 用于描述一个KisConnector信息 func NewConnConfig(cName string, addr string, t common.KisConnType, key string, param FParam) *KisConnConfig { strategy := new(KisConnConfig) strategy.CName = cName strategy.AddrString = addr strategy.Type = t strategy.Key = key strategy.Params = param return strategy } // WithFunc Connector与Function进行关系绑定 func (cConfig *KisConnConfig) WithFunc(fConfig *KisFuncConfig) error { switch common.KisMode(fConfig.FMode) { case common.S: cConfig.Save = append(cConfig.Save, fConfig.FName) case common.L: cConfig.Load = append(cConfig.Load, fConfig.FName) default: return errors.New(fmt.Sprintf("Wrong KisMode %s", fConfig.FMode)) } return nil } ``` 这里也是通过提供`WithFunc`方法来动态的添加Conn和Function的关联关系 ### C. KisConnConfig 单元测试 同样,我们简单些一个单元测试来测试KisConnConfig的创建。 > kis-flow/test/kis_config_test.go ```go func TestNewConnConfig(t *testing.T) { source := config.KisSource{ Name: "公众号抖音商城户订单数据", Must: []string{"order_id", "user_id"}, } option := config.KisFuncOption{ CName: "connectorName1", RetryTimes: 3, RetryDuriton: 300, Params: config.FParam{ "param1": "value1", "param2": "value2", }, } myFunc1 := config.NewFuncConfig("funcName1", common.S, &source, &option) connParams := config.FParam{ "param1": "value1", "param2": "value2", } myConnector1 := config.NewConnConfig("connectorName1", "0.0.0.0:9987,0.0.0.0:9997", common.REDIS, "key", connParams) if err := myConnector1.WithFunc(myFunc1); err != nil { log.Logger().ErrorF("WithFunc err: %s\n", err.Error()) } log.Logger().InfoF("myConnector1: %+v\n", myConnector1) } ``` 我们`cd`到`kis-fow/test/`目录下执行单元测试指令: ```bash $ go test -test.v -test.paniconexit0 -test.run TestNewConnConfig ``` 得到结果如下: ```bash === RUN TestNewConnConfig myConnector1: &{KisType: CName:connectorName1 AddrString:0.0.0.0:9987,0.0.0.0:9997 Type:redis Key:key Params:map[param1:value1 param2:value2] Load:[] Save:[funcName1]} --- PASS: TestNewConnConfig (0.00s) PASS ok kis-flow/test 0.481s ``` --- 作者:刘丹冰Aceld github: [https://github.com/aceld](https://github.com/aceld) KisFlow开源项目地址:[https://github.com/aceld/kis-flow](https://github.com/aceld/kis-flow) --- 连载中... [Golang框架实战-KisFlow流式计算框架(1)-概述](https://www.jianshu.com/p/ee3b0e9a38df) [Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)](https://www.jianshu.com/p/db9cdb3e9c8a)

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

346 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传