Go实现简单负载均衡

mob604756f04b77 · · 1410 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

 平台开发 360云计算 

女主宣言



今天小编为大家分享一篇关于Go实现简单的负载均衡器的文章,只是对负载均衡进行了基础的功能实现,有助于对负载均衡的理解。如果有兴趣,也可以以此为基础进行功能扩展,希望能对大家有所帮助。

PS:丰富的一线技术、多元化的表现形式,尽在“360云计算”,点关注哦!

载均衡在Web体系架构中一直是扮演着非常关键的角色。它可以实现在一组后端服务之间进行负载分配,从而增加服务的可扩展性。由于配置了多个后端服务,因此可以提高服务的可用性。在发生故障的时,负载均衡器可以帮我们把请求转发到正常运行的节点上。

在使用过像 Nginx 之类的专业的负载均衡之后,为了加深对负载均衡的原理理解,本次我们使用 Golang 也来实现一个简单的负载均衡。 

1

工作原理

均衡器可以使用不同的分配策略来分发请求到后端服务。

例如:

  • 轮询 - 平均的分配负载,假定所有后端服务具有相同的处理能力

  • 加权轮询 - 根据后端服务的处理能力,可以赋予相应权重

  • 最少连接 - 负载分配到活跃连接最少的服务器上

对于我们将要实现的简单负载均衡,我们首先尝试实现这些方式中最简单的一种,即轮询方式。

图片

轮询在实现方面非常简单,它以均等的机会让后端服务轮流执行请求任务。

图片

如图所示,请求周期性的轮流转发给后端服务。但是我们不能直接这样简单来实现,需要考虑其他因素。

如果后端服务宕机了怎么办?我们肯定不想把流量转发到这台挂掉的节点上。因此,除非有附加条件,否则不能直接轮流转发负载。我们需要将流量仅路由到已启动并正常运行的后端服务节点上。

2

数据结构

思路梳理完成,现在我们需要一种方法,来跟踪关于后端的所有详细信息。我们需要知道它是否存活,同时还需要跟踪 URL。

可以简单地定义下面的结构来保存后端服务信息。

type Backend struct {
 URL          *url.URL
 Alive        bool
 mux          sync.RWMutex
 ReverseProxy *httputil.ReverseProxy
}

同时还需要另外一种方法,来跟踪我们负载均衡中所有的后端服务。为此,我们可以直接使用切片及一个计数器来实现。

type ServerPool struct {
 backends []*Backend
 current  uint64
}

3

ReverseProxy

如我们上面确定,负载均衡的唯一目的就是将流量路由到不同的后端服务,并将结果返回给原始请求端。

Go 的文档中有相关的描述:

ReverseProxy 是一个HTTP处理程序,它接收传入的请求并将其发送到另一台服务器,并将响应代理回客户端。

这正是我们想要的。所以不需要重复造轮子了。我们可以简单的使用它来转发我们的请求。

u, _ := url.Parse("http://localhost:8080")
rp := httputil.NewSingleHostReverseProxy(u)
 
// initialize your server and add this as handler
http.HandlerFunc(rp.ServeHTTP)

使用 httputil.NewSingleHostReverseProxy(url) 我们可以初始化一个反向代理,它将请求转发给传入的 url。在上面的示例中,所有请求都将转发到 localhost:8080 上,并将结果返回给原始请求端。

如果我们看一下 ServeHTTP 方法签名,会发现它具有 HTTP 处理程序的签名,这就是为什么我们可以将其传递给 http 中的 HandlerFunc 的原因。

对于我们简单的负载均衡实现,我们可以使用后端服务的 URL 来初始化 ReverseProxy,以便 ReverseProxy 将请求发送到URL。

4

选择过程

在下一个轮询选择中,我们需要跳过挂掉的后端服务。这里我们需要一种计数方法。

多个客户端将连接到负载均衡中,并且当每个客户端请求下一个连接时,都有可能会发生竞争情况。所以为了防止这种情况,我们可以使用 mutex 来锁定 ServerPool。但这将是一个过大的杀伤力,我们原本不想锁定 ServerPool。而只想将计数器增加一。

为了满足该要求,理想的解决方案是原子增加。可以通过 Go 的原子包来实现。

func (s *ServerPool) NextIndex() int {
 return int(atomic.AddUint64(&s.current, uint64(1)) % uint64(len(s.backends)))
}

这里,我们以原子方式将当前值进行加,并通过修改切片的长度来返回索引。这意味着该值将始终在 0 到切片的长度之间。但是,我们只关心索引,而不是总数。

5

取回一个存活后端

GetNext() 始终返回一个介于 0 和切片长度之间的值。在任何时候,我们都会得到下一个端点,如果它不存在,我们将循环搜索整个切片。

image.png

如上图所示,我们想从整个列表的下一个元素开始遍历,这可以简单地通过遍历 next + length 来完成。但是要选择一个索引,我们希望将其限制在切片长度之间。 通过修改可以轻松完成此操作。

通过搜索找到可用后端,将其标记为当前后端。

// GetNextPeer returns next active peer to take a connection
func (s *ServerPool) GetNextPeer() *Backend {
 // loop entire backends to find out an Alive backend
 next := s.NextIndex()
 l := len(s.backends) + next // start from next and move a full cycle
 for i := next; i < l; i++ {
   idx := i % len(s.backends) // take an index by modding with length
   // if we have an alive backend, use it and store if its not the original one
   if s.backends[idx].IsAlive() {
     if i != next {
       atomic.StoreUint64(&s.current, uint64(idx)) // mark the current one
     }
     return s.backends[idx]
   }
 }
 return nil
}

6

处理竞争

有一个严重的问题,我们的 Backend 数据结构中有个变量 Alive,可以由不同的 goroutine 同时修改或访问。

我们知道,将会有更多的 goroutine 从中读取而不是写入。因此,我们使用RWMutex 来序列化对变量 Alive 的访问。

// SetAlive for this backend
func (b *Backend) SetAlive(alive bool) {
 b.mux.Lock()
 b.Alive = alive
 b.mux.Unlock()
}

// IsAlive returns true when backend is alive
func (b *Backend) IsAlive() (alive bool) {
 b.mux.RLock()
 alive = b.Alive
 b.mux.RUnlock()
 return
}

7

请求处理

我们可以实现下面简单的方法来负载均衡请求。

// lb load balances the incoming request
func lb(w http.ResponseWriter, r *http.Request) {
 peer := serverPool.GetNextPeer()
 if peer != nil {
   peer.ReverseProxy.ServeHTTP(w, r)
   return
 }
 http.Error(w, "Service not available", http.StatusServiceUnavailable)
}

方法可以简单地作为 HandlerFunc 传递给 http 服务器。

server := http.Server{
 Addr:    fmt.Sprintf(":%d", port),
 Handler: http.HandlerFunc(lb),
}

8

只路由到健康节点

现在 lb 有一个很严重的问题,我们不知道哪一个后端节点是健康的。所以还需要一个后端服务健康检查。

proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) {
 log.Printf("[%s] %s\n", serverUrl.Host, e.Error())
 retries := GetRetryFromContext(request)
 if retries < 3 {
   select {
     case <-time.After(10 * time.Millisecond):
       ctx := context.WithValue(request.Context(), Retry, retries+1)
       proxy.ServeHTTP(writer, request.WithContext(ctx))
     }
     return
   }

 // after 3 retries, mark this backend as down
 serverPool.MarkBackendStatus(serverUrl, false)

 // if the same request routing for few attempts with different backends, increase the count
 attempts := GetAttemptsFromContext(request)
 log.Printf("%s(%s) Attempting retry %d\n", request.RemoteAddr, request.URL.Path, attempts)
 ctx := context.WithValue(request.Context(), Attempts, attempts+1)
 lb(writer, request.WithContext(ctx))
}

我们利用闭包设计错误处理程序。它允许我们将外部变量捕获到方法中。它将检查现有的重试计数,如果小于3,我们将再次向相同的后端发送相同的请求。

每次重试失败后,我们将此后端标记为下线。

我们可以简单地从请求中获取尝试次数,如果超过最大次数,则消除请求。

// lb load balances the incoming request
func lb(w http.ResponseWriter, r *http.Request) {
 attempts := GetAttemptsFromContext(r)
 if attempts > 3 {
   log.Printf("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path)
   http.Error(w, "Service not available", http.StatusServiceUnavailable)
   return
 }

 peer := serverPool.GetNextPeer()
 if peer != nil {
   peer.ReverseProxy.ServeHTTP(w, r)
   return
 }
 http.Error(w, "Service not available", http.StatusServiceUnavailable)
}

使用 context

context 包允许将有用的数据存储在 Http 请求中。我们利用它来跟踪请求的特定数据,例如尝试次数和重试次数。

const (
 Attempts int = iota
 Retry
)

然后,我们可以像使用 HashMap 一样来检索值。

// GetAttemptsFromContext returns the attempts for request
func GetRetryFromContext(r *http.Request) int {
 if retry, ok := r.Context().Value(Retry).(int); ok {
   return retry
 }
 return 0
}

9

被动健康检查

被动健康检查可以恢复挂掉的节点。以固定的间隔 ping 后端节点来检查状态。

// isAlive checks whether a backend is Alive by establishing a TCP connection
func isBackendAlive(u *url.URL) bool {
 timeout := 2 * time.Second
 conn, err := net.DialTimeout("tcp", u.Host, timeout)
 if err != nil {
   log.Println("Site unreachable, error: ", err)
   return false
 }
 _ = conn.Close() // close it, we dont need to maintain this connection
 return true
}

现在,我们可以递归检查节点并标记状态。

// HealthCheck pings the backends and update the status
func (s *ServerPool) HealthCheck() {
 for _, b := range s.backends {
   status := "up"
   alive := isBackendAlive(b.URL)
   b.SetAlive(alive)
   if !alive {
     status = "down"
   }
   log.Printf("%s [%s]\n", b.URL, status)
 }
}

定期执行,可以在 Go 中启动一个计时器,然后使用通道来监听事件。

// healthCheck runs a routine for check status of the backends every 20 secs
func healthCheck() {
 t := time.NewTicker(time.Second * 20)
 for {
   select {
   case <-t.C:
     log.Println("Starting health check...")
     serverPool.HealthCheck()
     log.Println("Health check completed")
   }
 }
}

最后,我们单独启一个 goroutine 来运行它。

go healthCheck()

总结

以上我们只是实现了一个简单的负载均衡,当然我们还可以对它做很多的改善提升。

例如:

  • 使用堆来整理存活的后端节点可以减少搜索范围

  • 收集统计数据

  • 实现加权轮询/最少连接

  • 添加配置文件的支持

  • ...

如果对负载均衡有兴趣,可以做相应的功能扩展开发。



有疑问加站长微信联系(非本文作者)

本文来自:51CTO博客

感谢作者:mob604756f04b77

查看原文:Go实现简单负载均衡

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1410 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传