Go微服务 - 第八部分 - 使用Viper和Spring Cloud Config进行集中配置

WalkerQiao · · 1474 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

第八部分: Go微服务 - 使用Viper和Spring Cloud Config进行集中配置

在第八部分,我们探索Go微服务中使用Spring Cloud Config进行集中配置。

简介

考虑到微服务毕竟是用来分解应用为独立软件片段的,在微服务中集中处理一些东西感觉有些不太搭配。然而我们通常在后面的是进程之间的独立。微服务的其他操作应该集中处理。例如,日志应该在你的日志解决, 比如elk stack中终结, 监控应该纳入专用的监控中。 在这部分,我们将使用Spring Cloud Config和git处理外部化和集中配置。

集中处理组成我们应用程序的各种微服务的配置实际上是很自然的事情。 特别是在未知数量底层硬件节点上的容器环境运行的时候,管理配置文件构建到每个微服务的映像中或放到不同的安装卷中,很快就会变成真正的难题。有很多行之有效的项目可以帮我们处理这些问题,例如etcd, consul和ZooKeeper。然而,应该注意的是,这些项目提供的不仅仅是配置服务。既然本文聚焦的是集成Go微服务和Spring Cloud/Netflix OSS生态的支持服务, 我们将基于Spring Cloud配置进行集中配置, Spring Cloud Config是一个提供精确配置的专用软件。

Spring Cloud Config

Spring Cloud生态提供了集中配置的解决方案,也没有什么创意,就叫Spring Cloud Config。Spring Cloud Config服务器可以被视为服务和真正配置之间的代理, 提供了一些非常整洁的特性:

  • 支持多种不同的后端,例如git(默认), 用于etcd、consul和ZooKeeper的文件系统和插件。
  • 加密属性的透明解密。
  • 可插拔安全性。
  • 使用git钩子/REST API以及Spring Cloud Bus(例如RabbitMQ)的推送机制来将配置文件中的改变传播到服务,使得配置的实时更新成为可能。

我的同事Magnus最近的一篇文章对Spring Cloud Config进行特别深入的探讨, 见参考连接。在本文中,我们将集成我们的accountservice服务和Spring Cloud Config服务,配置后端使用公开的位于github上的git仓库, 从仓库中我们可以获取配置,解密/加密属性以及实现实时重载配置。

下面是我们整个解决方案目标的简单概述:

clipboard.png

概述

既然我们以Swarm模式运行Docker, 我们将继续以各种方式使用Docker的机制。在Swarm内部,我们应该运行至少一个(可以更多)Spring Cloud Config服务器。当我们的微服务中的一个启动的时候,它们要知道:

  • 配置服务器的逻辑服务名和端口号。也就是说,我们把我们的配置服务器也部署到Docker Swarm上作为服务,这里我们称之为configserver。意味着这是微服务要请求配置的时候唯一需要知道的东西。
  • 它们的名字是什么, 例如"accountservice"。
  • 它运行在什么样的执行配置文件上,例如"dev", "test", "prod"。 如果你对spring.profiles.active概念比较熟悉的话,这用于Go语言一样很自然。
  • 如果我们使用git作为后端,并想从特定的分支获取配置信息,我们就需要提前知道(可选的)。

鉴于上面四个标准, 请求配置的简单GET可能看起来像下面的样子:

resp, err := http.Get("http://configserver:8888/accountservice/dev/P8")

也就是下面的协议:

protocol://url:port/applicationName/profile/branch

在Swarm中搭建一个Spring Cloud配置服务器

本文代码可以从github直接克隆下来: https://github.com/callistaen...

你也可以用其他方式来设置和部署配置服务器。而我在goblog目录下面准备了一个support目录,用于存放https://github.com/callistaen...,里边包含了我们后面需要的第三方服务。

一般来说,每个必要的支持组件要么是简单的便于构建和部署组件的现成的Dockerfile, 要么是(java)源代码和配置(Spring Cloud应用通常是基于Spring Boot的), 这样我们需要自己使用Gradle构建。(不需要担心,我们只需要安装JDK就可以了)。

(这些Spring Cloud应用程序大部分我的同事都已经提前准备好了。具体可以参考Java微服务)

RabbitMQ

什么情况? 我们不是要安装Spring Cloud Config服务器吗? 好吧,这个依赖的软件具有一个消息中间人,可以使用支持RabbitMQ的Spring Cloud Bus来传播配置改变。

有RabbitMQ是一个很好的事情,不管怎么说,我们文章后面还会用到它。所以将从RabbitMQ开始,并在我们的Swarm中作为服务来运行。

我已经在/goblog/support/rabbitmq目录下面准备了一个Dockerfile,可以使用我在Docker Swarm服务中提前准备好的映像。

# use rabbitmq official
FROM rabbitmq

# enable management plugin
RUN rabbitmq-plugins enable --offline rabbitmq_management

# enable mqtt plugin
RUN rabbitmq-plugins enable --offline rabbitmq_mqtt

# expose management port
EXPOSE 15672
EXPOSE 5672

然后我们可以创建一个脚本文件, 在需要更新的时候帮我们自动做这些事情。

#!/bin/bash

# RabbitMQ
docker service rm rabbitmq
docker build -t someprefix/rabbitmq support/rabbitmq/
docker service create --name=rabbitmq --replicas=1 --network=my_network -p 1883:1883 -p 5672:5672 -p 15672:15672 someprefix/rabbitmq

(注意,你可能需要给这个脚本语言添加可执行权限。)

运行它,等待Docker下载必要的映像,并将它部署到Swarm中。 当它完成的时候,你就可以打开RabbitMQ管理UI,并且能使用guest/guest来登录进去。

Spring Cloud Config服务器

在/support/config-server中你会发现一个提前配置好的Spring Boot应用程序,它用于运行配置服务器。我们会使用一个git仓库来保存和访问我们的yaml文件存储的配置。

---
# For deployment in Docker containers
spring:
  profiles: docker
  cloud:
    config:
      server:
        git:
          uri: https://github.com/eriklupander/go-microservice-config.git

# Home-baked keystore for encryption. Of course, a real environment wouldn't expose passwords in a blog...
encrypt:
  key-store:
    location: file:/server.jks
    password: letmein
    alias: goblogkey
    secret: changeme

# Since we're running in Docker Swarm mode, disable Eureka Service Discovery
eureka:
  client:
    enabled: false

# Spring Cloud Config requires rabbitmq, use the service name.
spring.rabbitmq.host: rabbitmq
spring.rabbitmq.port: 5672

上面是配置服务器的配置文件。我们可以看到一些东西:

  • 我们告诉config-server到我们指定的URL来获取配置。
  • 一个密钥库,用于加密(自签名)和解密的密钥存储库。
  • 既然我们是运行在Docker Swarm模式下的,因此eureka的服务发现功能是禁用的。
  • 配置服务器期望找到一个RabbitMQ, 它的host名为rabbitmq, 端口为5672, host刚好是刚才我们给我们的RabbitMQ服务起的Docker Swarm服务名。

下面是配置服务器的Dockerfile内容, 相当简单:

FROM davidcaste/alpine-java-unlimited-jce

EXPOSE 8888

ADD ./build/libs/*.jar app.jar
ADD ./server.jks /

ENTRYPOINT ["java","-Dspring.profiles.active=docker","-Djava.security.egd=file:/dev/./urandom","-jar","/app.jar"]

不要介意java.security.egd的东西,这是这个文章系列中我们不需要关心的问题的解决办法。

这里有几点需要注意:

  • 我们使用的镜像是基于Alpine Linux的,没有限制Java的加密扩展安装的。 这是一个必要要求,如果我们想要Spring Cloud Config的加密/解密功能。
  • 容器镜像的根目录中我们加入了在提前准备好的keystore。

编译keystore

后面我们要使用加密属性,我们需要为配置服务器带一个自签名证书。(这里我们需要使用keytool工具。)

在/goblog/support/config-server目录下面执行下面的命令:

keytool -genkeypair -alias goblogkey -keyalg RSA -dname "CN=Go Blog,OU=Unit,O=Organization,L=City,S=State,C=SE" -keypass changeme -keystore server.jks -storepass letmein -validity 730

keytool是一个密钥和证书管理工具。它具有很多选项:

  • -certreq: 生成证书请求。
  • -changealias: 更改条目的别名。
  • -delete: 删除条目。
  • -exportcert: 导出证书。
  • -genkeypair: 生成密钥对。
  • -genseckey: 生成密钥。
  • -gencert: 根据证书请求生成证书。
  • -importcert: 导入证书或证书链。
  • -importpass: 导入口令。
  • -importkeystore: 从其他密钥库导入一个或所有条目。
  • -keypasswd: 更改条目的密钥口令。
  • -list: 列出密钥库中的条目。
  • -printcert: 打印证书内容。
  • -printcertreq: 打印证书请求的内容。
  • -printcrl: 打印 CRL 文件的内容。
  • -storepasswd: 更改密钥库的存储口令。

执行完上面命令后在当前目录下面生成一个server.jks keystore签名证书。你可以随意修改任何属性/密码, 主要记住相应的更改application.yml就可以了。

...
encrypt:
  key-store:
    location: file:/server.jks
    password: letmein
    alias: goblogkey
    secret: changeme
...

构建部署

是时候构建部署服务器了。 我们先创建一个shell脚本来节约我们时间,因为我们可能会需要重复做很多次。 记住 - 你需要Java运行时环境来构建它。 在/goblog目录,我们创建一个springcloud.sh的脚本文件。 我们把所有真正需要构建的东西都放这里(构建可能需要很长时间):

#!/bin/bash

cd support/config-server
./gradlew build
cd ../..
docker build -t someprefix/configserver support/config-server/
docker service rm configserver
docker service create --replicas 1 --name configserver -p 8888:8888 --network my_network --update-delay 10s --with-registry-auth  --update-parallelism 1 someprefix/configserver

然后运行脚本,需要修改脚本的可执行权限。
等待几分钟时间,然后检查它是否在docker服务中启动运行了:

> docker service ls

ID                  NAME                MODE                REPLICAS            IMAGE
39d26cc3zeor        rabbitmq            replicated          1/1                 someprefix/rabbitmq
eu00ii1zoe76        viz                 replicated          1/1                 manomarks/visualizer:latest
q36gw6ee6wry        accountservice      replicated          1/1                 someprefix/accountservice
t105u5bw2cld        quotes-service      replicated          1/1                 eriklupander/quotes-service:latest
urrfsu262e9i        dvizz               replicated          1/1                 eriklupander/dvizz:latest
w0jo03yx79mu        configserver        replicated          1/1                 someprefix/configserver

然后可以通过curl来加载accountservice的JSON配置。

> curl http://$ManagerIP:8888/accountservice/dev/master
{"name":"accountservice","profiles":["dev"],"label":"master","version":"b8cfe2779e9604804e625135b96b4724ea378736",
    "propertySources":[
    {"name":"https://github.com/eriklupander/go-microservice-config.git/accountservice-dev.yml",
    "source":
        {"server_port":6767,"server_name":"Accountservice DEV"}
    }]
}

(这里输出为了简洁,我们格式化了的)。实际配置保存在source属性中,在那里包含有所有.yml文件的属性值,它们以key-value对的形式出现。加载并解析source属性到Go语言可用的配置中, 是本文的中间件来完成的。

yaml配置文件

在我们深入到Go代码之前,我们先看看https://github.com/eriklupand...:

accountservice-dev.yml
accountservice-test.yml

这两个文件目前里边的内容都非常少。

server_port: 6767
server_name: Accountservice TEST
the_password: (we'll get back to this one)

这里我们只配置了我们希望绑定服务的HTTP端口号。真实的服务可能在里边设置很多东西。

使用解密/加密

Spring Cloud Config其中一个灵活的地方就是在配置文件中支持内置支持透明的解密被加密值。例如,可以看看accountservice-test.yml文件,那里我们有the_password属性:

server_port: 6767
server_name: Accountservice TEST
the_password: '{cipher}AQB1BMFCu5UsCcTWUwEQt293nPq0ElEFHHp5B2SZY8m4kUzzqxOFsMXHaH7SThNNjOUDGxRVkpPZEkdgo6aJFSPRzVF04SXOVZ6Rjg6hml1SAkLy/k1R/E0wp0RrgySbgh9nNEbhzqJz8OgaDvRdHO5VxzZGx8uj5KN+x6nrQobbIv6xTyVj9CSqJ/Btf/u1T8/OJ54vHwi5h1gSvdox67teta0vdpin2aSKKZ6w5LyQocRJbONUuHyP5roCONw0pklP+2zhrMCy0mXhCJSnjoHvqazmPRUkyGcjcY3LHjd39S2eoyDmyz944TKheI6rWtCfozLcIr/wAZwOTD5sIuA9q8a9nG2GppclGK7X649aYQynL+RUy1q7T7FbW/TzSBg='

使用字符串{cipher}作为解密前缀,我们的Spring Cloud配置服务器将在传递结果给服务器之前,知道如何自动为我们解密值。在所有配置都正确的运行实例中,curl请求REST API来获取这个配置将返回:

...
      "source": {
        "server_port": 6767,
        "server_name": "Accountservice TEST",
        "the_password": "password"
....

相当灵活吧, 对吧?the_password属性可以在公网服务器和Spring Cloud服务器(它可能在不安全环境或内部服务器外部可见的任何环境都不可用。)中用保存明文加密的字符串(如果你相信加密算法和签名密钥的完整性)透明解密这个属性为真正的password。

当然,你需要使用相同的key作为Spring Cloud Config的解密key来解密,有些事情可以通过配置服务器的HTTP API来完成。

curl http://$ManagerIP:8888/encrypt -d 'password'
AQClKEMzqsGiVpKx+Vx6vz+7ww00n... (rest omitted for brevity)

Viper

我们的基于Go的配置框架选择的是Viper。 Viper具有很好的API可以用, 并且很方便扩展, 并且不会妨碍我们正常的应用代码。虽然Viper不愿生的支持从Spring Cloud配置服务器加载配置, 但是我们可以写一小片代码可以帮我们做到这点。 Viper也可以处理很多种文件类型作为配置源 - 例如json, yaml, 普通属性文件。 Viper可以为我们从OS读取环境变量, 相当整洁。 一旦初始化并产生后,我们的配置总是可以使用各种的viper.Get函数获取来使用,确实很方便。

还记得在本文开头的图片吗? 好吧,如果不记得了, 我们再重复一遍:

clipboard.png

我们将让微服务启动的时候发起一个HTTP请求, 获取JSON响应的source部分,并将它们放到Viper中,这样我们就可以在那里获取我们的web服务器的端口号了。 让我们开始吧。

加载配置

正如使用curl的已展示示例,我们可以对配置服务器进行简单HTTP请求,那里我们只需要知道名字和我们的profile即可。 我们将添加一些解析flag的功能到我们的accountservice main.go, 因此在启动的时候,我们可以指定一个环境profile,也可以指定到配置服务器的可选的URI。

var appName = "accountservice"

// Init function, runs before main()
func init() {
    // Read command line flags
    profile := flag.String("profile", "test", "Environment profile, something similar to spring profiles")
    configServerUrl := flag.String("configServerUrl", "http://configserver:8888", "Address to config server")
    configBranch := flag.String("configBranch", "master", "git branch to fetch configuration from")
    flag.Parse()
    
    // Pass the flag values into viper.
    viper.Set("profile", *profile)
    viper.Set("configServerUrl", *configServerUrl)
    viper.Set("configBranch", *configBranch)
}

func main() {
    fmt.Printf("Starting %v\n", appName)

    // NEW - load the config
    config.LoadConfigurationFromBranch(
        viper.GetString("configServerUrl"),
        appName,
        viper.GetString("profile"),
        viper.GetString("configBranch"))
    initializeBoltClient()
    service.StartWebServer(viper.GetString("server_port"))    // NEW, use port from loaded config 
}

init函数比较简单,就是从命令行参数解析flag参数值,然后设置到viper中。 在main函数中,调用config.LoadConfigurationFromBranch, 从远程git仓库加载配置。这里config.LoadConfigurationFromBranch是在goblog/common/config/loader.go中定义的:

package config

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net/http"

    "github.com/Sirupsen/logrus"
    "github.com/spf13/viper"
)

// LoadConfigurationFromBranch loads config from for example http://configserver:8888/accountservice/test/P8
func LoadConfigurationFromBranch(configServerURL string, appName string, profile string, branch string) {
    url := fmt.Sprintf("%s/%s/%s/%s", configServerURL, appName, profile, branch)
    logrus.Printf("Loading config from %s\n", url)
    body, err := fetchConfiguration(url)
    if err != nil {
        logrus.Errorf("Couldn't load configuration, cannot start. Terminating. Error: %v", err.Error())
        panic("Couldn't load configuration, cannot start. Terminating. Error: " + err.Error())
    }
    parseConfiguration(body)
}

func fetchConfiguration(url string) ([]byte, error) {
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("Recovered in f", r)
        }
    }()
    logrus.Printf("Getting config from %v\n", url)
    resp, err := http.Get(url)
    if err != nil || resp.StatusCode != 200 {
        logrus.Errorf("Couldn't load configuration, cannot start. Terminating. Error: %v", err.Error())
        panic("Couldn't load configuration, cannot start. Terminating. Error: " + err.Error())
    }
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        panic("Error reading configuration: " + err.Error())
    }
    return body, err
}

func parseConfiguration(body []byte) {
    var cloudConfig springCloudConfig
    err := json.Unmarshal(body, &cloudConfig)
    if err != nil {
        panic("Cannot parse configuration, message: " + err.Error())
    }

    for key, value := range cloudConfig.PropertySources[0].Source {
        viper.Set(key, value)
        logrus.Printf("Loading config property %v => %v\n", key, value)
    }
    if viper.IsSet("server_name") {
        logrus.Printf("Successfully loaded configuration for service %s\n", viper.GetString("server_name"))
    }
}

type springCloudConfig struct {
    Name            string           `json:"name"`
    Profiles        []string         `json:"profiles"`
    Label           string           `json:"label"`
    Version         string           `json:"version"`
    PropertySources []propertySource `json:"propertySources"`
}

type propertySource struct {
    Name   string                 `json:"name"`
    Source map[string]interface{} `json:"source"`
}

本代码引入了三个包logrus, viper和amqp。因为我没有使用deps之类的包管理工具,因此我们在安装logrus和viper包的时候,这两个包也有依赖的第三方包,我们手工进行一些go get:

mkdir -p $GOPATH/src/golang.org/x
cd !$

git clone https://github.com/golang/text.git
git clone https://github.com/golang/sys.git


logrus go get问题
git clone https://github.com/golang/crypto.git

loadConfigurationFromBranch函数根据提供的参数获取配置并解析配置到viper中。

基本上来说就是我们发起一个带有appName, profile, git branch参数的HTTP GET请求到配置服务器, 然后解码响应JSON到在同一文件中声明的springCloudConfig结构体中。最后我们简单迭代cloudConfig.PropertySources[0]的所有key-value对, 并将它们分别放入viper, 这样我们可以随处都可以使用viper.GetString(key)或其他的Viper提供的其他Get方法来获取它们。

注意,如果我们连接配置服务器或解析响应发生错误的话,就会panic()整个微服务, 这样就会kill掉它。Docker Swarm将检测这个并尝试在数秒之内部署一个新的实例。 拥有这样行为的典型原因在于集群冷启动的时候,基于Go的微服务要比基于Sping Boot的配置服务器启动要快得多。让Swarm尝试几次,事情会自己解决掉的。

我们吧实际工作分割到一个公共函数和一些包级别的函数单元,主要是便于单元测试。 单元测试检查,以便我们能将JSON转换为实际的viper属性,看起来想GoConvey样式的测试:

func TestParseConfiguration(t *testing.T) {
    Convey("Given a JSON configuration response body", t, func() {
        var body = `{"name":"accountservice-dev","profiles":["dev"],"label":null,"version":null,"propertySources":[{"name":"file:/config-repo/accountservice-dev.yml","source":{"server_port":6767"}}]}`

        Convey("When parsed", func() {
            parseConfiguration([]byte(body))

            Convey("Then Viper should have been populated with values from Source", func() {
                So(viper.GetString("server_port"), ShouldEqual, "6767")
            })
        })
    })
}

然后在goblog/accountservice目录运行测试: go test ./...

更新Dockerfile

鉴于我们是从外部源加载配置,我们的服务需要一个查找的线索。 这可以在容器和服务启动的时候,通过使用flag作为命令行参数来执行。

FROM iron/base
EXPOSE 6767

ADD accountservice-linux-amd64 /
ADD healthchecker-linux-amd64 /

HEALTHCHECK --interval=3s --timeout=3s CMD ["./healthchecker-linux-amd64", "-port=6767"] || exit 1
ENTRYPOINT ["./accountservice-linux-amd64", "-configServerUrl=http://configserver:8888", "-profile=test", "-configBranch=P8"]

ENTRYPOINT现在提供了一些值,使得它可以到达配置,这样可以加载配置。

放入Swarm

你可能已经注意到我们不再使用6767端口号作为端口号的硬编码了, 也就是:

service.StartWebServer(viper.GetString("server_port"))

使用copyall.sh脚本重新构建并部署更新后的accountservice到Docker Swarm中。

所有事情都完成的时候,服务依然如本博客系列那样运行,例外的是它实际上是从外部和集中化配置服务器拿的端口号,而不是硬编码到编译二进制文件的端口号。

我们可以看看我们的accountservice的日志:

docker logs -f [containerid]
Starting accountservice
Loading config from http://configserver:8888/accountservice/test/P8
Loading config property the_password => password
Loading config property server_port => 6767
Loading config property server_name => Accountservice TEST
Successfully loaded configuration for service Accountservice TEST

这里我们又get新技能了,使用docker logs可以查看具体容器的日志:

Usage:    docker logs [OPTIONS] CONTAINER

Fetch the logs of a container

Options:
      --details        Show extra details provided to logs
  -f, --follow         Follow log output
      --since string   Show logs since timestamp (e.g. 2013-01-02T13:23:37) or relative (e.g. 42m for 42 minutes)
      --tail string    Number of lines to show from the end of the logs (default "all")
  -t, --timestamps     Show timestamps
      --until string   Show logs before a timestamp (e.g. 2013-01-02T13:23:37) or relative (e.g. 42m for 42 minutes)

docker logs支持查询某个时间点前后的日志。

实际上打印配置值是错误的做法,这里我们只是出于学习目的作出的输出。这里我们使用logrus来打印日志。

实时配置更新

1. 哦,我们用于某种目的的外部服务器的URL是否改变了呢?
2. 该死,怎么没有人告诉我!

假设我们很多人都遇到下面情况, 我们需要重建整个应用或至少重启来更新一些无效或改变的配置值。Spring Cloud具有刷新域的概念,其中bean可以实时更新,使用配置修改通过git commit hook传播。

下图提供了一个如何推送到git仓库,传播到我们Go微服务的概览:

clipboard.png

在本文中,我们使用的是github仓库,它完全不知道如何执行post-commit hook操作到我的笔记本的Spring Cloud Server, 因此我们将模拟一个提交挂钩使用Spring Cloud服务器的内置/监控端点来推送。

curl -H "X-Github-Event: push" -H "Content-Type: application/json" -X POST -d '{"commits": [{"modified": ["accountservice.yml"]}],"name":"some name..."}' -ki http://$ManagerIP:8888/monitor

Spring Cloud服务器将知道使用这个POST做什么,并在RabbitMQ(由Spring Cloud Bus抽象出来的)的交换上发送一个RefreshRemoteApplicationEvent。如果在成功引导了Spring Cloud Config之后,看看RabbitMQ的管理界面,应该创建了exchange。

clipboard.png

exchange和传统的消息控制例如publisher, consumer, queue的区别是什么?

Publisher -> Exchange -> (Routing) -> Queue -> Consumer

也就是消息被发布到exchange, 然后基于路由规则和可能注册了消费者的捆绑将消息副本分布到queue。

因此为了消费RefreshRemoteApplicationEvent消息(我更喜欢调用它们的refresh tokens), 所有我们需要做的是确保我们的Go服务在springCloudBus exchange上监听这样的消息, 如果我们的目标应用执行了配置重载。 下面我们来实现它。

Go语言中使用AMQP协议来消费消息

RabbitMQ中间人可以通过使用AMQP协议来访问。我们将使用一个叫做streadway/amqp的Go版本的AMQP客户端。 大部分AMQP/RabbitMQ管道代码都应该使用一些可复用工具,可能我们稍后会重构它。 基于这个例子的管道代码是来自streadway/amqp仓库的:

// This example declares a durable Exchange, an ephemeral (auto-delete) Queue,
// binds the Queue to the Exchange with a binding key, and consumes every
// message published to that Exchange with that routing key.
//
package main

import (
    "flag"
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "time"
)

var (
    uri          = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
    exchange     = flag.String("exchange", "test-exchange", "Durable, non-auto-deleted AMQP exchange name")
    exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
    queue        = flag.String("queue", "test-queue", "Ephemeral AMQP queue name")
    bindingKey   = flag.String("key", "test-key", "AMQP binding key")
    consumerTag  = flag.String("consumer-tag", "simple-consumer", "AMQP consumer tag (should not be blank)")
    lifetime     = flag.Duration("lifetime", 5*time.Second, "lifetime of process before shutdown (0s=infinite)")
)

func init() {
    flag.Parse()
}

func main() {
    c, err := NewConsumer(*uri, *exchange, *exchangeType, *queue, *bindingKey, *consumerTag)
    if err != nil {
        log.Fatalf("%s", err)
    }

    if *lifetime > 0 {
        log.Printf("running for %s", *lifetime)
        time.Sleep(*lifetime)
    } else {
        log.Printf("running forever")
        select {}
    }

    log.Printf("shutting down")

    if err := c.Shutdown(); err != nil {
        log.Fatalf("error during shutdown: %s", err)
    }
}

type Consumer struct {
    conn    *amqp.Connection
    channel *amqp.Channel
    tag     string
    done    chan error
}

func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) (*Consumer, error) {
    c := &Consumer{
        conn:    nil,
        channel: nil,
        tag:     ctag,
        done:    make(chan error),
    }

    var err error

    log.Printf("dialing %q", amqpURI)
    c.conn, err = amqp.Dial(amqpURI)
    if err != nil {
        return nil, fmt.Errorf("Dial: %s", err)
    }

    go func() {
        fmt.Printf("closing: %s", <-c.conn.NotifyClose(make(chan *amqp.Error)))
    }()

    log.Printf("got Connection, getting Channel")
    c.channel, err = c.conn.Channel()
    if err != nil {
        return nil, fmt.Errorf("Channel: %s", err)
    }

    log.Printf("got Channel, declaring Exchange (%q)", exchange)
    if err = c.channel.ExchangeDeclare(
        exchange,     // name of the exchange
        exchangeType, // type
        true,         // durable
        false,        // delete when complete
        false,        // internal
        false,        // noWait
        nil,          // arguments
    ); err != nil {
        return nil, fmt.Errorf("Exchange Declare: %s", err)
    }

    log.Printf("declared Exchange, declaring Queue %q", queueName)
    queue, err := c.channel.QueueDeclare(
        queueName, // name of the queue
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // noWait
        nil,       // arguments
    )
    if err != nil {
        return nil, fmt.Errorf("Queue Declare: %s", err)
    }

    log.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)",
        queue.Name, queue.Messages, queue.Consumers, key)

    if err = c.channel.QueueBind(
        queue.Name, // name of the queue
        key,        // bindingKey
        exchange,   // sourceExchange
        false,      // noWait
        nil,        // arguments
    ); err != nil {
        return nil, fmt.Errorf("Queue Bind: %s", err)
    }

    log.Printf("Queue bound to Exchange, starting Consume (consumer tag %q)", c.tag)
    deliveries, err := c.channel.Consume(
        queue.Name, // name
        c.tag,      // consumerTag,
        false,      // noAck
        false,      // exclusive
        false,      // noLocal
        false,      // noWait
        nil,        // arguments
    )
    if err != nil {
        return nil, fmt.Errorf("Queue Consume: %s", err)
    }

    go handle(deliveries, c.done)

    return c, nil
}

func (c *Consumer) Shutdown() error {
    // will close() the deliveries channel
    if err := c.channel.Cancel(c.tag, true); err != nil {
        return fmt.Errorf("Consumer cancel failed: %s", err)
    }

    if err := c.conn.Close(); err != nil {
        return fmt.Errorf("AMQP connection close error: %s", err)
    }

    defer log.Printf("AMQP shutdown OK")

    // wait for handle() to exit
    return <-c.done
}

func handle(deliveries <-chan amqp.Delivery, done chan error) {
    for d := range deliveries {
        log.Printf(
            "got %dB delivery: [%v] %q",
            len(d.Body),
            d.DeliveryTag,
            d.Body,
        )
        d.Ack(false)
    }
    log.Printf("handle: deliveries channel closed")
    done <- nil
}

载goblog/accountservice/main.go main函数中添加新行, 为我们启动一个AMQP消费者:

func main() {
    fmt.Printf("Starting %v\n", appName)

    config.LoadConfigurationFromBranch(
            viper.GetString("configServerUrl"),
            appName,
            viper.GetString("profile"),
            viper.GetString("configBranch"))
    initializeBoltClient()
    
    // NEW
    go config.StartListener(appName, viper.GetString("amqp_server_url"), viper.GetString("config_event_bus"))   
    service.StartWebServer(viper.GetString("server_port"))
}

注意上面的StartListener的两个参数服务器url和事件bus两个属性,它们是在下面的文件中定义的:

server_port: 6767
server_name: Accountservice TEST
the_password: '{cipher}AQB1BMFC....'
amqp_server_url: amqp://guest:guest@rabbitmq:5672/
config_event_bus: springCloudBus
func StartListener(appName string, amqpServer string, exchangeName string) {
    err := NewConsumer(amqpServer, exchangeName, "topic", "config-event-queue", exchangeName, appName)
    if err != nil {
        log.Fatalf("%s", err)
    }

    log.Printf("running forever")
    select {}   // Yet another way to stop a Goroutine from finishing...
}

NewConsumer是样板代码的实际位置,这里先忽略过它,直接看看实际处理进来请求的代码:

func handleRefreshEvent(body []byte, consumerTag string) {
    updateToken := &UpdateToken{}
    err := json.Unmarshal(body, updateToken)
    if err != nil {
        log.Printf("Problem parsing UpdateToken: %v", err.Error())
    } else {
        if strings.Contains(updateToken.DestinationService, consumerTag) {
            log.Println("Reloading Viper config from Spring Cloud Config server")

            // Consumertag is same as application name.
            LoadConfigurationFromBranch(
                viper.GetString("configServerUrl"),
                consumerTag,
                viper.GetString("profile"),
                viper.GetString("configBranch"))
        }
    }
}

// {"type":"RefreshRemoteApplicationEvent","timestamp":1494514362123,"originService":"config-server:docker:8888","destinationService":"xxxaccoun:**","id":"53e61c71-cbae-4b6d-84bb-d0dcc0aeb4dc"}
type UpdateToken struct {
    Type string `json:"type"`
    Timestamp int `json:"timestamp"`
    OriginService string `json:"originService"`
    DestinationService string `json:"destinationService"`
    Id string `json:"id"`
}

这个代码尝试解析到达的消息为UpdateToken结构体,并且如果destinationService匹配我们的consumerTag(也就是 appName accountservice), 我们就调用同样的最初服务启动时调用的LoadConfigurationFromBranch函数。

请注意在实际场景中,NewConsumer函数和一般的消息处理代码将需要更多的错误处理、确保只处理恰当的消息等等工作。

单元测试

让我们为handleRefreshEvent()函数写一个单元测试。 创建一个新的测试文件:

var SERVICE_NAME = "accountservice"

func TestHandleRefreshEvent(t *testing.T) {
    // Configure initial viper values
    viper.Set("configServerUrl", "http://configserver:8888")
    viper.Set("profile", "test")
    viper.Set("configBranch", "master")

    // Mock the expected outgoing request for new config
    defer gock.Off()
    gock.New("http://configserver:8888").
        Get("/accountservice/test/master").
        Reply(200).
        BodyString(`{"name":"accountservice-test","profiles":["test"],"label":null,"version":null,"propertySources":[{"name":"file:/config-repo/accountservice-test.yml","source":{"server_port":6767,"server_name":"Accountservice RELOADED"}}]}`)

Convey("Given a refresh event received, targeting our application", t, func() {
        var body = `{"type":"RefreshRemoteApplicationEvent","timestamp":1494514362123,"originService":"config-server:docker:8888","destinationService":"accountservice:**","id":"53e61c71-cbae-4b6d-84bb-d0dcc0aeb4dc"}
`
        Convey("When handled", func() {
            handleRefreshEvent([]byte(body), SERVICE_NAME)

            Convey("Then Viper should have been re-populated with values from Source", func() {
                So(viper.GetString("server_name"), ShouldEqual, "Accountservice RELOADED")
            })
        })
    })
}

我希望BDD样式的GoConvey传达(双关语!)测试如何工作。 注意我们如何使用gock来拦截对外的请求新配置的HTTP请求,以及我们预先产生的带有一些初始值的viper。

运行它

是时候测试了。 重新使用copyall.sh脚本部署服务。

检查accountservice的日志:

> docker logs -f [containerid]
Starting accountservice
... [truncated for brevity] ...
Successfully loaded configuration for service Accountservice TEST    <-- LOOK HERE!!!!
... [truncated for brevity] ...
2017/05/12 12:06:36 dialing amqp://guest:guest@rabbitmq:5672/
2017/05/12 12:06:36 got Connection, getting Channel
2017/05/12 12:06:36 got Channel, declaring Exchange (springCloudBus)
2017/05/12 12:06:36 declared Exchange, declaring Queue (config-event-queue)
2017/05/12 12:06:36 declared Queue (0 messages, 0 consumers), binding to Exchange (key 'springCloudBus')
2017/05/12 12:06:36 Queue bound to Exchange, starting Consume (consumer tag 'accountservice')
2017/05/12 12:06:36 running forever

现在对accountservice-test.yml和service name进行修改,并使用前面展示的使用monitor API POST来伪造一个提交hook:

我修改了accountservice-test.yml文件和它的service name属性,从accountservice TEST到Temporary test string, 然后推送改变。

接着,我们使用curl来让我们的Spring Cloud Config服务器知道这些更新:

> curl -H "X-Github-Event: push" -H "Content-Type: application/json" -X POST -d '{"commits": [{"modified": ["accountservice.yml"]}],"name":"what is this?"}' -ki http://192.168.99.100:8888/monitor

如果所有都正常工作,就会触发一个refresh token从Config服务器,我们的accountservice就会捡起它. 再次检查下log:

> docker logs -f [containerid]
2017/05/12 12:13:22 got 195B consumer: [accountservice] delivery: [1] routingkey: [springCloudBus] {"type":"RefreshRemoteApplicationEvent","timestamp":1494591202057,"originService":"config-server:docker:8888","destinationService":"accountservice:**","id":"1f421f58-cdd6-44c8-b5c4-fbf1e2839baa"}
2017/05/12 12:13:22 Reloading Viper config from Spring Cloud Config server
Loading config from http://configserver:8888/accountservice/test/P8
Loading config property server_port => 6767
Loading config property server_name => Temporary test string!
Loading config property amqp_server_url => amqp://guest:guest@rabbitmq:5672/
Loading config property config_event_bus => springCloudBus
Loading config property the_password => password
Successfully loaded configuration for service Temporary test string!      <-- LOOK HERE!!!!

正如你所见的,最后一行打印了"Successfully loaded configuration for service Temporary test string!", 源代码如下:

if viper.IsSet("server_name") {
    fmt.Printf("Successfully loaded configuration for service %s\n", viper.GetString("server_name"))
} 

也就是说,我们已经动态修改了的之前存储在Viper中的属性值, 而没告诉我们的服务!这是真正的酷!

重要提示: 虽然动态更新属性是非常酷的,但是它本身不会更新这些东西,比如我们运行服务器的端口,池中已存在连接对象, 或RabbitMQ中间人的活动连接。 这些类型的已运行东西需要花费一些时间来使用新的配置来重启, 这些内容超出了本文的范围。

Footprint及性能

在启动时添加配置加载不应该影响运行时性能, 事实上它确实不影响。每秒1千个请求和之前具有同样的吞吐,CPU和内存使用。相信我的话或者你自己试试。我们将在第一次启动后快速查看内存使用情况:

CONTAINER                                    CPU %               MEM USAGE / LIMIT     MEM %               NET I/O             BLOCK I/O           PIDS
accountservice.1.pi7wt0wmh2quwm8kcw4e82ay4   0.02%               4.102MiB / 1.955GiB   0.20%               18.8kB / 16.5kB     0B / 1.92MB         6
configserver.1.3joav3m6we6oimg28879gii79     0.13%               568.7MiB / 1.955GiB   28.41%              171kB / 130kB       72.9MB / 225kB      50
rabbitmq.1.kfmtsqp5fnw576btraq19qel9         0.19%               125.5MiB / 1.955GiB   6.27%               6.2MB / 5.18MB      31MB / 414kB        75
quotes-service.1.q81deqxl50n3xmj0gw29mp7jy   0.05%               340.1MiB / 1.955GiB   16.99%              2.97kB / 0B         48.1MB / 0B         30

甚至和AMQP、Viper作为配置框架的集成,我们最初运行信息大概4MB左右。我们的Spring Boot实现的配置服务器使用了超过500MB的内存,而RabbitMQ(我认为是用Erlang写的)使用125MB。

我可以肯定的是,我们可以使用一些标准的JVM -xmx参数可以让配置服务器的尺寸下降到256MB初始化堆尺寸,但是它绝对是需要大量RAM的。然而,在生产环境中我希望运行2个配置服务器,而非几十个或几百个。 当谈及支持服务,从Spring Cloud生态,内存使用并不是什么大事,因为我们通常不会有这种服务的多余一个或几个实例。

备忘

// 查找需要加入的swarm的token, 需要在Leader中查询。
docker swarm join-token -q worker

// 以worker节点的形式加入Swarm
docker swarm join --token tokenstring worker ip:port
docker swarm join --token tokenstring manager ip:port

// ssh到具体的机器
docker-machine ssh docker-name # swarm-manager-1

总结

在本文中我们部署了一个Spring Cloud配置服务器,和它的RabbitMQ依赖到我们的Swarm中。然后我们写了一些Go代码,使用一些简单的HTTP, JSON和Viper框架从配置服务器启动时加载配置并填充到Viper中,方便我们整个微服务代码方便使用。

在下一节中,我们会继续探索AMQP和RabbitMQ, 深入更多细节,看看我们自己如何发送一些消息。

参考连接


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1474 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传