【Go 夜读】第 16 期 OpenFaas 介绍及源码分析

yangwen13 · · 492 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

>文章来自于:https://reading.developerlearning.cn/reading/other/16-2018-09-06-openfaas-guide/ ## 观看视频 https://youtu.be/bZtgrAVR9HQ ### 关于我 网名: Lucas Github:https://github.com/zhenfeng-zhu 博客:https://zhenfeng-zhu.github.io/ 知乎:https://www.zhihu.com/people/zhu-zhen-feng-96/activities 专栏:https://zhuanlan.zhihu.com/openfaas-cn 微信:zhuzhenfeng1993 ### 主要内容 - OpenFaaS的简介 - OpenFaaS的快速入门 - OpenFaaS的基础组件 - OpenFaaS的源码分析 - OpenFaaS的定制 OpenFaaS的Gateway是一个golang实现的请求转发的网关,在这个网关服务中,主要有以下几个功能: - UI - 部署函数 - 监控 - 自动伸缩 ## 架构分析 ![图:Kubernetes作为Provider的架构图](https://ws1.sinaimg.cn/large/b831e4c7gy1fttxzimdp3j20sm0hqabx.jpg) 从图中可以发现,当Gateway作为一个入口,当CLI或者web页面发来要部署或者调用一个函数的时候,Gateway会将请求转发给Provider,同时会将监控指标发给Prometheus。AlterManager会根据需求,调用API自动伸缩函数。 ## 源码分析 ### 依赖 ```go github.com/gorilla/mux github.com/nats-io/go-nats-streaming github.com/nats-io/go-nats github.com/openfaas/nats-queue-worker github.com/prometheus/client_golang ``` mux 是一个用来执行http请求的路由和分发的第三方扩展包。 go-nats-streaming,go-nats,nats-queue-worker这三个依赖是异步函数的时候才会用到,在分析queue-worker的时候有说到Gateway也是一个发布者。 client_golang是Prometheus的客户端。 ### 项目结构 ```bash ├── Dockerfile ├── Dockerfile.arm64 ├── Dockerfile.armhf ├── Gopkg.lock ├── Gopkg.toml ├── README.md ├── assets ├── build.sh ├── handlers │   ├── alerthandler.go │   ├── alerthandler_test.go │   ├── asyncreport.go │   ├── baseurlresolver_test.go │   ├── basic_auth.go │   ├── basic_auth_test.go │   ├── callid_middleware.go │   ├── cors.go │   ├── cors_test.go │   ├── forwarding_proxy.go │   ├── forwarding_proxy_test.go │   ├── function_cache.go │   ├── function_cache_test.go │   ├── infohandler.go │   ├── metrics.go │   ├── queueproxy.go │   ├── scaling.go │   └── service_query.go ├── metrics │   ├── add_metrics.go │   ├── add_metrics_test.go │   ├── externalwatcher.go │   ├── metrics.go │   └── prometheus_query.go ├── plugin │   ├── external.go │   └── external_test.go ├── queue │   └── types.go ├── requests │   ├── forward_request.go │   ├── forward_request_test.go │   ├── prometheus.go │   ├── prometheus_test.go │   └── requests.go ├── server.go ├── tests │   └── integration ├── types │   ├── handler_set.go │   ├── inforequest.go │   ├── load_credentials.go │   ├── proxy_client.go │   ├── readconfig.go │   └── readconfig_test.go ├── vendor │   └── github.com └── version └── version.go ``` Gateway的目录明显多了很多,看源码的时候,首先要找到的是main包,从main函数看起,就能很容易分析出来项目是如何运行的。 从server.go的main函数中我们可以看到,其实有如下几个模块: - 基本的安全验证 - 和函数相关的代理转发 - 同步函数 - 列出函数 - 部署函数 - 删除函数 - 更新函数 - 异步函数 - Prometheus的监控 - ui - 自动伸缩 ### 基本的安全验证 如果配置了开启基本安全验证,会从磁盘中读取密钥: ```go var credentials *types.BasicAuthCredentials if config.UseBasicAuth { var readErr error reader := types.ReadBasicAuthFromDisk{ SecretMountPath: config.SecretMountPath, } credentials, readErr = reader.Read() if readErr != nil { log.Panicf(readErr.Error()) } } ``` 在Gateway的配置相关的,都会有一个read()方法,进行初始化赋值。 如果credentials被赋值之后,就会对一些要加密的API handler进行一个修饰,被修饰的API有: - UpdateFunction - DeleteFunction - DeployFunction - ListFunctions - ScaleFunction ```go if credentials != nil { faasHandlers.UpdateFunction = handlers.DecorateWithBasicAuth(faasHandlers.UpdateFunction, credentials) faasHandlers.DeleteFunction = handlers.DecorateWithBasicAuth(faasHandlers.DeleteFunction, credentials) faasHandlers.DeployFunction = handlers.DecorateWithBasicAuth(faasHandlers.DeployFunction, credentials) faasHandlers.ListFunctions = handlers.DecorateWithBasicAuth(faasHandlers.ListFunctions, credentials) faasHandlers.ScaleFunction = handlers.DecorateWithBasicAuth(faasHandlers.ScaleFunction, credentials) } ``` 这个DecorateWithBasicAuth()方法是一个路由中间件: 1. 调用mux路由的BasicAuth(),从http的header中取到用户名和密码 2. 然后给请求头上设置一个字段`WWW-Authenticate`,值为`Basic realm="Restricted"` 3. 如果校验失败,则返回错误,成功的话调用next方法继续进入下一个handler。 ```go // DecorateWithBasicAuth enforces basic auth as a middleware with given credentials func DecorateWithBasicAuth(next http.HandlerFunc, credentials *types.BasicAuthCredentials) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { user, password, ok := r.BasicAuth() w.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`) if !ok || !(credentials.Password == password && user == credentials.User) { w.WriteHeader(http.StatusUnauthorized) w.Write([]byte("invalid credentials")) return } next.ServeHTTP(w, r) } } ``` ### 代理转发 Gateway本身不做任何和部署发布函数的事情,它只是作为一个代理,把请求转发给相应的Provider去处理,所有的请求都要通过这个网关。 #### 同步函数转发 主要转发的API有: - RoutelessProxy - ListFunctions - DeployFunction - DeleteFunction - UpdateFunction ```go faasHandlers.RoutelessProxy = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver) faasHandlers.ListFunctions = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver) faasHandlers.DeployFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver) faasHandlers.DeleteFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver) faasHandlers.UpdateFunction = handlers.MakeForwardingProxyHandler(reverseProxy, forwardingNotifiers, urlResolver) ``` MakeForwardingProxyHandler()有三个参数: - proxy 这是一个http的客户端,作者把这个客户端抽成一个类,然后使用该类的NewHTTPClientReverseProxy方法创建实例,这样就简化了代码,不用每次都得写一堆相同的配置。 - notifiers 这个其实是要打印的日志,这里是一个HTTPNotifier的接口。而在这个MakeForwardingProxyHandler中其实有两个实现类,一个是LoggingNotifier,一个是PrometheusFunctionNotifier,分别用来打印和函数http请求相关的日志以及和Prometheus监控相关的日志。 - baseURLResolver 这个就是Provider的url地址。 在这个MakeForwardingProxyHandler中主要做了三件事儿: 1. 解析要转发的url 2. 调用forwardRequest方法转发请求, forwardRequest方法的逻辑比较简单,只是把请求发出去。这里就不深入分析了。 3. 打印日志 ```go // MakeForwardingProxyHandler create a handler which forwards HTTP requests func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy, notifiers []HTTPNotifier, baseURLResolver BaseURLResolver) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { baseURL := baseURLResolver.Resolve(r) requestURL := r.URL.Path start := time.Now() statusCode, err := forwardRequest(w, r, proxy.Client, baseURL, requestURL, proxy.Timeout) seconds := time.Since(start) if err != nil { log.Printf("error with upstream request to: %s, %s\n", requestURL, err.Error()) } for _, notifier := range notifiers { notifier.Notify(r.Method, requestURL, statusCode, seconds) } } } ``` #### 异步函数转发 前面说过,如果是异步函数,Gateway就作为一个发布者,将函数放到队列里。MakeQueuedProxy方法就是做这件事儿的: 1. 读取请求体 2. 将`X-Callback-Url`参数从参数中http的header中读出来 3. 实例化用于异步处理的Request对象 4. 调用canQueueRequests.Queue(req),将请求发布到队列中 ```go // MakeQueuedProxy accepts work onto a queue func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueRequests queue.CanQueueRequests) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() body, err := ioutil.ReadAll(r.Body) // 省略错误处理代码 vars := mux.Vars(r) name := vars["name"] callbackURLHeader := r.Header.Get("X-Callback-Url") var callbackURL *url.URL if len(callbackURLHeader) > 0 { urlVal, urlErr := url.Parse(callbackURLHeader) // 省略错误处理代码 callbackURL = urlVal } req := &queue.Request{ Function: name, Body: body, Method: r.Method, QueryString: r.URL.RawQuery, Header: r.Header, CallbackURL: callbackURL, } err = canQueueRequests.Queue(req) // 省略错误处理代码 w.WriteHeader(http.StatusAccepted) } } ``` #### 自动伸缩 伸缩性其实有两种,一种是可以通过调用API接口,来将函数进行缩放。另外一种就是通过AlertHandler。 自动伸缩是OpenFaaS的一大特点,触发自动伸缩主要是根据不同的指标需求。 - 根据每秒请求数来做伸缩 OpenFaaS附带了一个自动伸缩的规则,这个规则是在AlertManager配置文件中定义。AlertManager从Prometheus中读取使用情况(每秒请求数),然后在满足一定条件时向Gateway发送警报。 可以通过删除AlertManager,或者将部署扩展的环境变量设置为0,来禁用此方式。 - 最小/最大副本数 通过向函数添加标签, 可以在部署时设置最小 (初始) 和最大副本数。 - `com.openfaas.scale.min` 默认是 `1` - `com.openfaas.scale.max` 默认是 `20` - `com.openfaas.scale.factor` 默认是 `20%` ,在0-100之间,这是每次扩容的时候,新增实例的百分比,若是100的话,会瞬间飙升到副本数的最大值。 `com.openfaas.scale.min` 和 `com.openfaas.scale.max`值一样的时候,可以关闭自动伸缩。 `com.openfaas.scale.factor`是0时,也会关闭自动伸缩。 - 通过内存和CPU的使用量。 使用k8s内置的HPA,也可以触发AlertManager。 ##### 手动指定伸缩的值 可以从这句代码中发现,调用这个路由,转发给了provider处理。 ``` r.HandleFunc("/system/scale-function/{name:[-a-zA-Z_0-9]+}", faasHandlers.ScaleFunction).Methods(http.MethodPost) ``` ##### 处理AlertManager的伸缩请求 Prometheus将监控指标发给AlertManager之后,会触发AlterManager调用`/system/alert`接口,这个接口的handler是由`handlers.MakeAlertHandler`方法生成。 MakeAlertHandler方法接收的参数是ServiceQuery。ServiceQuery是一个接口,它有两个函数,用来get或者ser最大的副本数。Gateway中实现这个接口的类是ExternalServiceQuery,这个实现类是在plugin包中,我们也可以直接定制这个实现类,用来实现满足特定条件。 ```go // ServiceQuery provides interface for replica querying/setting type ServiceQuery interface { GetReplicas(service string) (response ServiceQueryResponse, err error) SetReplicas(service string, count uint64) error } // ExternalServiceQuery proxies service queries to external plugin via HTTP type ExternalServiceQuery struct { URL url.URL ProxyClient http.Client } ``` 这个ExternalServiceQuery有一个`NewExternalServiceQuery`方法,这个方法也是一个工厂方法,用来创建实例。这个url其实就是provider的url,proxyClient就是一个http的client对象。 - `GetReplicas`方法 从`system/function/:name`接口获取到函数的信息,组装一个`ServiceQueryResponse`对象即可。 - `SetReplicas`方法 调用`system/scale-function/:name`接口,设置副本数。 MakeAlertHandler的函数主要是从`http.Request`中读取body,然后反序列化成`PrometheusAlert`对象: ```go // PrometheusAlert as produced by AlertManager type PrometheusAlert struct { Status string `json:"status"` Receiver string `json:"receiver"` Alerts []PrometheusInnerAlert `json:"alerts"` } ``` 可以发现,这个Alerts是一个数组对象,所以可以是对多个函数进行缩放。反序列化之后,调用`handleAlerts`方法,而`handleAlerts`对Alerts进行遍历,针对每个Alerts调用了`scaleService`方法。`scaleService`才是真正处理伸缩服务的函数。 ```go func scaleService(alert requests.PrometheusInnerAlert, service ServiceQuery) error { var err error serviceName := alert.Labels.FunctionName if len(serviceName) > 0 { queryResponse, getErr := service.GetReplicas(serviceName) if getErr == nil { status := alert.Status newReplicas := CalculateReplicas(status, queryResponse.Replicas, uint64(queryResponse.MaxReplicas), queryResponse.MinReplicas, queryResponse.ScalingFactor) log.Printf("[Scale] function=%s %d => %d.\n", serviceName, queryResponse.Replicas, newReplicas) if newReplicas == queryResponse.Replicas { return nil } updateErr := service.SetReplicas(serviceName, newReplicas) if updateErr != nil { err = updateErr } } } return err } ``` 从代码总就可以看到,scaleService做了三件事儿: - 获取现在的副本数 - 计算新的副本数 新副本数的计算方法是根据`com.openfaas.scale.factor`计算步长: ```go step := uint64((float64(maxReplicas) / 100) * float64(scalingFactor)) ``` - 设置为新的副本数 ##### 从0增加副本到的最小值 我们在调用函数的时候,用的路由是:`/function/:name`。如果环境变量里有配置`scale_from_zero`为true,先用`MakeScalingHandler()`方法对proxyHandler进行一次包装。 `MakeScalingHandler`接受参数主要是: - next:就是下一个httpHandlerFunc,中间件都会有这样一个参数 - config:`ScalingConfig`的对象: ```go // ScalingConfig for scaling behaviours type ScalingConfig struct { MaxPollCount uint // 查到的最大数量 FunctionPollInterval time.Duration // 函数调用时间间隔 CacheExpiry time.Duration // 缓存过期时间 ServiceQuery ServiceQuery // 外部服务调用的一个接口 } ``` 这个`MakeScalingHandler`中间件主要做了如下的事情: - 先从FunctionCache缓存中获取该函数的基本信息,从这个缓存可以拿到每个函数的副本数量。 - 为了加快函数的启动速度,如果缓存中可以获该得函数,且函数的副本数大于0,满足条件,return即可。 - 如果不满足上一步,就会调用`SetReplicas`方法设置副本数,并更新FunctionCache的缓存。 ```go // MakeScalingHandler creates handler which can scale a function from // zero to 1 replica(s). func MakeScalingHandler(next http.HandlerFunc, upstream http.HandlerFunc, config ScalingConfig) http.HandlerFunc { cache := FunctionCache{ Cache: make(map[string]*FunctionMeta), Expiry: config.CacheExpiry, } return func(w http.ResponseWriter, r *http.Request) { functionName := getServiceName(r.URL.String()) if serviceQueryResponse, hit := cache.Get(functionName); hit && serviceQueryResponse.AvailableReplicas > 0 { next.ServeHTTP(w, r) return } queryResponse, err := config.ServiceQuery.GetReplicas(functionName) cache.Set(functionName, queryResponse) // 省略错误处理 if queryResponse.AvailableReplicas == 0 { minReplicas := uint64(1) if queryResponse.MinReplicas > 0 { minReplicas = queryResponse.MinReplicas } err := config.ServiceQuery.SetReplicas(functionName, minReplicas) // 省略错误处理代码 for i := 0; i < int(config.MaxPollCount); i++ { queryResponse, err := config.ServiceQuery.GetReplicas(functionName) cache.Set(functionName, queryResponse) // 省略错误处理 time.Sleep(config.FunctionPollInterval) } } next.ServeHTTP(w, r) } } ``` ### 监控 监控是一个定时任务,开启了一个新协程,利用go的ticker.C的间隔不停的去调用`/system/functions`接口。反序列化到MetricOptions对象中。 ```go func AttachExternalWatcher(endpointURL url.URL, metricsOptions MetricOptions, label string, interval time.Duration) { ticker := time.NewTicker(interval) quit := make(chan struct{}) proxyClient := // 省略创建一个http.Client对象 go func() { for { select { case <-ticker.C: get, _ := http.NewRequest(http.MethodGet, endpointURL.String()+"system/functions", nil) services := []requests.Function{} res, err := proxyClient.Do(get) // 省略反序列的代码 for _, service := range services { metricsOptions.ServiceReplicasCounter. WithLabelValues(service.Name). Set(float64(service.Replicas)) } break case <-quit: return } } }() } ``` ### UI UI的代码很简单,主要就是一些前端的代码,调用上面的讲的一些API接口即可,这里就略去不表。 ## 总结 Gateway是OpenFaaS最为重要的一个组件。回过头看整个项目的结构,Gateway就是一个rest转发服务,一个一个的handler,每个模块之间的耦合性不是很高,可以很容易的去拆卸,自定义实现相应的模块。

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

492 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传