使用 Go 语言徒手撸一个简单的负载均衡器

mob604756ef1373 · · 769 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

图片

作者 | kasvith
译者 | 无明策划 | 赵钰莹负载均衡器在 Web 架构中扮演着非常重要的角色,被用于为多个后端分发流量负载,提升服务的伸缩性。负载均衡器后面配置了多个服务,在某个服务发生故障时,负载均衡器可以很快地选择另一个可用的服务,所以整体的服务可用性得到了提升。

在使用了专业的负载均衡器(比如 NGINX)之后,我试着自己开发一个简单的负载均衡器。我选择使用 Go 语言来实现,因为它对并发支持得非常好,还提供了丰富的标准库,只需要几行代码就可以开发出高性能的应用程序。

自研负载均衡器的工作原理   

负载均衡器在向后端服务分发流量负载时可以使用几种策略。

  • 轮询(Round Robin)——均匀地分发流量负载,假设所有后端服务都具有同样的处理能力;

  • 加权轮询(Weighted Round Robin)——根据后端服务的处理能力加权;

  • 最少连接(Least Connections)——优先把流量负载分发给连接最少的后端。

我打算实现最简单的策略,即轮询。

图片

简单的轮询负载均衡器

轮询选择   

轮询的原理非常简单,后端服务有平等的机会处理任务。

图片

轮询处理请求

如上图所示,轮询过程是循环不断的,但我们不能直接使用这种方式。

如果其中的一个后端发生故障该怎么办?我们当然不希望把流量定向给它。我们只能把流量路由给正常运行的服务。

定义结构体   

我们需要知道所有后端服务器的状态,比如一个服务是死了还是活着,还要跟踪它们的 url。

我们可以定义一个结构体来保存后端的信息。

type Backend struct {  URL          *url.URL  Alive        bool  mux          sync.RWMutex  ReverseProxy *httputil.ReverseProxy}

我们还需要一种方式来跟踪所有后端,以及一个计算器变量。

type ServerPool struct {  backends []*Backend  current  uint64}
使用 ReverseProxy   

之前说过,负载均衡器的作用是将流量负载分发到后端的服务器上,并将结果返回给客户端。

根据 Go 语言文档的描述:

ReverseProxy 是一种 HTTP 处理器,它接收入向请求,将请求发送给另一个服务器,然后将响应发送回客户端。

这刚好是我们想要的,所以我们没有必要重复发明轮子。我们可以直接使用 ReverseProxy 来中继初始请求。

u, _ := url.Parse("http://localhost:8080")rp := httputil.NewSingleHostReverseProxy(u)
// 初始化服务器,并添加处理器http.HandlerFunc(rp.ServeHTTP)

我们使用 httputil.NewSingleHostReverseProxy(url) 初始化一个反向代理,这个反向代理可以将请求中继到指定的 url。在上面的例子中,所有的请求都会被中继到 localhost:8080,结果被发送给初始客户端。

如果看一下 ServeHTTP 方法的签名,我们会发现它返回的是一个 HTTP 处理器,所以我们可以将它传给 http 的 HandlerFunc。

在我们的例子中,可以使用 Backend 里的 URL 来初始化 ReverseProxy,这样反向代理就会把请求路由给指定的 URL。

选择的过程   

在选择下一个服务器时,我们需要跳过已经死掉的服务器,但不管怎样,我们都需要一个计数器。

因为有很多客户端连接到负载均衡器,所以发生竟态条件是不可避免的。为了防止这种情况,我们需要使用 mutex 给 ServerPool 加锁。但这样做对性能会有影响,更何况我们并不是真想要给 ServerPool 加锁,我们只是想要更新计数器。

最理想的解决方案是使用原子操作,Go 语言的 atomic 包为此提供了很好的支持。

func (s *ServerPool) NextIndex() int {  return int(atomic.AddUint64(&s.current, uint64(1)) % uint64(len(s.backends)))}

我们通过原子操作递增 current 的值,并通过对 slice 的长度取模来获得当前索引值。所以,返回值总是介于 0 和 slice 的长度之间,毕竟我们想要的是索引值,而不是总的计数值。

选择可用的后端   

我们需要循环将请求路由到后端的每一台服务器上,但要跳过已经死掉的服务。

GetNext() 方法总是返回一个介于 0 和 slice 长度之间的值,如果这个值对应的服务器不可用,我们需要遍历一遍 slice。

图片

遍历一遍 slice

如上图所示,我们将从 next 位置开始遍历整个列表,但在选择索引时,需要保证它处在 slice 的长度之内,这个可以通过取模运算来保证。

在找到可用的服务器后,我们将它标记为当前可用服务器。

上述操作对应的代码如下。

// GetNextPeer 返回下一个可用的服务器func (s *ServerPool) GetNextPeer() *Backend {  // 遍历后端列表,找到可用的服务器  next := s.NextIndex()  l := len(s.backends) + next // 从 next 开始遍历  for i := next; i < l; i++ {    idx := i % len(s.backends) // 通过取模计算获得索引    // 如果找到一个可用的服务器,将它作为当前服务器。如果不是初始的那个,就把它保存下来    if s.backends[idx].IsAlive() {      if i != next {        atomic.StoreUint64(&s.current, uint64(idx)) // 标记当前可用服务器      }      return s.backends[idx]    }  }  return nil}
避免竟态条件   

我们还需要考虑到一些情况,比如不同的 goroutine 会同时访问 Backend 结构体里的一个变量。

我们知道,读取这个变量的 goroutine 比修改这个变量的要多,所以我们使用 RWMutex 来串行化对 Alive 的访问操作。

// SetAlivefunc (b *Backend) SetAlive(alive bool) {  b.mux.Lock()  b.Alive = alive  b.mux.Unlock()}
// 如果后端还活着,IsAlive 返回 truefunc (b *Backend) IsAlive() (alive bool) {  b.mux.RLock()  alive = b.Alive  b.mux.RUnlock()  return}
对请求进行负载均衡   

在有了上述的这些东西之后,接下来就可以用下面这个简单的办法来对请求进行负载均衡。只有当所有的后端服务都死掉它才会退出。

// lb 对入向请求进行负载均衡func lb(w http.ResponseWriter, r *http.Request) {  peer := serverPool.GetNextPeer()  if peer != nil {    peer.ReverseProxy.ServeHTTP(w, r)    return  }  http.Error(w, "Service not available", http.StatusServiceUnavailable)}

这个方法可以作为 HandlerFunc 传给 http 服务器。

server := http.Server{  Addr:    fmt.Sprintf(":%d", port),  Handler: http.HandlerFunc(lb),}

只将流量路由给活跃的服务器  

现在的 lb 方法存在一个严重的问题,我们并不知道后端服务是否处于正常的运行状态。为此,我们需要尝试发送请求,检查一下它是否正常。

我们可以通过两种方法来达到目的:

  • 主动(Active):在处理当前请求时,如果发现当前的后端没有响应,就把它标记为已宕机。

  • 被动(Passive):在固定的时间间隔内对后端服务器执行 ping 操作,以此来检查服务器的状态。

主动模式   

在发生错误时,ReverseProxy 会触发 ErrorHandler 回调函数,我们可以利用它来检查故障。

proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) {  log.Printf("[%s] %s\n", serverUrl.Host, e.Error())  retries := GetRetryFromContext(request)  if retries < 3 {    select {      case <-time.After(10 * time.Millisecond):        ctx := context.WithValue(request.Context(), Retry, retries+1)        proxy.ServeHTTP(writer, request.WithContext(ctx))      }      return    }
 // 在三次重试之后,把这个后端标记为宕机  serverPool.MarkBackendStatus(serverUrl, false)
 // 同一个请求在尝试了几个不同的后端之后,增加计数  attempts := GetAttemptsFromContext(request)  log.Printf("%s(%s) Attempting retry %d\n", request.RemoteAddr, request.URL.Path, attempts)  ctx := context.WithValue(request.Context(), Attempts, attempts+1)  lb(writer, request.WithContext(ctx))}

我们使用强大的闭包来实现错误处理器,它可以捕获外部变量错误。它会检查重试次数,如果小于 3,就把同一个请求发送给同一个后端服务器。之所以要进行重试,是因为服务器可能会发生临时错误,在经过短暂的延迟(比如服务器没有足够的 socket 来接收请求)之后,服务器又可以继续处理请求。我们使用了一个计时器,把重试时间间隔设定在 10 毫秒左右。

在重试失败之后,我们就把这个后端标记为宕机。

接下来,我们要找出新的可用后端。我们使用 context 来维护重试次数。在增加重试次数后,我们把它传回 lb,选择一个新的后端来处理请求。

但我们不能不加以限制,所以我们会在进一步处理请求之前检查是否达到了最大的重试上限。

我们从请求里拿到重试次数,如果已经达到最大上限,就终结这个请求。

// lb 对传入的请求进行负载均衡func lb(w http.ResponseWriter, r *http.Request) {  attempts := GetAttemptsFromContext(r)  if attempts > 3 {    log.Printf("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path)    http.Error(w, "Service not available", http.StatusServiceUnavailable)    return  }
 peer := serverPool.GetNextPeer()  if peer != nil {    peer.ReverseProxy.ServeHTTP(w, r)    return  }  http.Error(w, "Service not available", http.StatusServiceUnavailable)}
context 的使用   

我们可以利用 context 在 http 请求中保存有用的信息,用它来跟踪重试次数。

首先,我们需要为 context 指定键。我们建议使用不冲突的整数值作为键,而不是字符串。Go 语言提供了 iota 关键字,可以用来实现递增的常量,每一个常量都包含了唯一值。这是一种完美的整型键解决方案。

const (  Attempts int = iota  Retry)

然后我们就可以像操作 HashMap 那样获取这个值。默认返回值要视情况而定。

// GetAttemptsFromContext 返回尝试次数func GetRetryFromContext(r *http.Request) int {  if retry, ok := r.Context().Value(Retry).(int); ok {    return retry  }  return 0}
被动模式   

被动模式就是定时对后端执行 ping 操作,以此来检查它们的状态。

我们通过建立 TCP 连接来执行 ping 操作。如果后端及时响应,我们就认为它还活着。当然,如果你喜欢,也可以改成直接调用某个端点,比如 /status。切记,在执行完操作后要关闭连接,避免给服务器造成额外的负担,否则服务器会一直维护连接,最后把资源耗尽。

// isAlive 通过建立 TCP 连接检查后端是否还活着func isBackendAlive(u *url.URL) bool {  timeout := 2 * time.Second  conn, err := net.DialTimeout("tcp", u.Host, timeout)  if err != nil {    log.Println("Site unreachable, error: ", err)    return false  }  _ = conn.Close() // 不需要维护连接,把它关闭  return true}

现在我们可以遍历服务器,并标记它们的状态。

// HealthCheck 对后端执行 ping 操作,并更新状态func (s *ServerPool) HealthCheck() {  for _, b := range s.backends {    status := "up"    alive := isBackendAlive(b.URL)    b.SetAlive(alive)    if !alive {      status = "down"    }    log.Printf("%s [%s]\n", b.URL, status)  }}

我们可以启动定时器来定时发起 ping 操作。

// healthCheck 返回一个 routine,每 2 分钟检查一次后端的状态func healthCheck() {  t := time.NewTicker(time.Second * 20)  for {    select {    case <-t.C:      log.Println("Starting health check...")      serverPool.HealthCheck()      log.Println("Health check completed")    }  }}

在上面的例子中,<-t.C 每 20 秒返回一个值,select 会检测到这个事件。在没有 default case 的情况下,select 会一直等待,直到有满足条件的 case 被执行。

最后,使用单独的 goroutine 来执行。

go healthCheck()
结论   

这篇文章提到了很多东西:

  • 轮询;

  • Go 语言标准库里的 ReverseProxy;

  • mutex;

  • 原子操作;

  • 闭包;

  • 回调;

  • select。

这个简单的负载均衡器还有很多可以改进的地方:

  • 使用堆来维护后端的状态,以此来降低搜索成本;

  • 收集统计信息;

  • 实现加权轮询或最少连接策略;

  • 支持文件配置。

代码地址:

https://github.com/kasvith/simplelb/

原文连接:

https://kasvith.github.io/posts/lets-create-a-simple-lb-go/




有疑问加站长微信联系(非本文作者)

本文来自:51CTO博客

感谢作者:mob604756ef1373

查看原文:使用 Go 语言徒手撸一个简单的负载均衡器

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

769 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传