Recommender Systems from 0 to 1: Service Discovery

By 金小锋

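Let's go!

Preface

First, a few words on how I see the engine and the algorithms of a recommender system.

Most of what you read about recommender systems today is about algorithms, presented as dazzling and unfathomable. In fact, many of these algorithms are variations on a theme: the core ideas are similar and only the implementation details differ. In industry, I don't think many of the complex algorithms actually make it into production; most companies rely on a fairly small, concentrated set of them.

A good recommender system depends heavily on its engine. Experiments have shown that response latency is directly linked to the key metrics: the longer the response time, the worse the metrics.

As a down-to-earth recommendation engineer, I want to start from the basics and get the engine right first. The algorithms will come later, of course; the engine and the algorithms are both indispensable.

So, enough talk: let's build a recommendation engine!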

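Service Discovery

Since we are building an engine, which means backend work, the first step is to set up the architecture.

Microservices have become the mainstream way to build backends. They have many advantages, such as high cohesion, independent deployment and independent load balancing per service; they also have drawbacks, for example more complex communication between services. I won't go into detail here; search Baidu or Google if you're interested.

When microservices talk to each other, a client needs some way to learn the server's address. That is what service discovery is for.

The process splits into what the server has to do and what the client has to do; let's look at each in turn.

The server side is simple: it only needs to write its own information (such as its address) into a shared store.

The client first fetches the list of server entries from that store, then picks one address according to a load-balancing policy, and finally makes the call.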

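Simple in principle, right? Time to get hands-on!

Introducing etcd

ZooKeeper used to be the usual choice for this job, and it is still maintained as an Apache project.

etcd is written in Go and became well known through Kubernetes, which uses etcd as the distributed key-value store for all of its cluster state.

Still, we'll go with etcd: it is younger, lighter, and also very stable. Yes, I like shiny new things = =

etcd achieves consistency with the Raft algorithm. For Raft itself, the animated demo below explains it clearly and vividly.

Raft animated demo

etcd in Practice

I use Docker for my test environment. Here is my docker-compose.yaml: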

version: '2.2'
services:
  etcd:
    image: gcr.io/etcd-development/etcd:v3.4.13
    container_name: etcd
    restart: always
    ports:
      - 2379:2379
      - 2380:2380
    command:
      - "/usr/local/bin/etcd"
      - "--name"
      - "s1"
      - "--data-dir"
      - "/etcd-data"
      - "--advertise-client-urls"
      - "http://0.0.0.0:2379"
      - "--listen-client-urls"
      - "http://0.0.0.0:2379"
      - "--initial-advertise-peer-urls"
      - "http://0.0.0.0:2380"
      - "--listen-peer-urls"
      - "http://0.0.0.0:2380"
      - "--initial-cluster-token"
      - "tkn"
      - "--initial-cluster"
      - "s1=http://0.0.0.0:2380"
      - "--initial-cluster-state"
      - "new"

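If you want to install it some other way, see the official instructions:

Installing etcd

Since etcd is a store, let's test basic CRUD first, and then etcd's lease feature.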

CRUD:

# etcdctl put test/key hello
OK
# etcdctl get test/key
test/key
hello
# etcdctl put test/key goodbye
OK
# etcdctl get test/key
test/key
goodbye
# etcdctl del test/key
1
# etcdctl get test/key

Leases:

Create a lease with a 120 s TTL:

# etcdctl lease grant 120
lease 3f3575c45fa5ff26 granted with TTL(120s)

List the leases:

# etcdctl lease list
found 1 leases
3f3575c45fa5ff26

Create a key and attach it to the lease:

# etcdctl put test/key hello --lease="3f3575c45fa5ff26"
OK

Check how long the lease has left and which keys are attached to it:

# etcdctl lease timetolive 3f3575c45fa5ff26 --keys
lease 3f3575c45fa5ff26 granted with TTL(120s), remaining(46s), attached keys([test/key])

List the keys that still exist:

# etcdctl get --prefix ""
test/key
hello

After the lease expires, check again: the key has been deleted automatically.

# etcdctl lease timetolive 3f3575c45fa5ff26 --keys
lease 3f3575c45fa5ff26 already expired
# etcdctl get --prefix ""

Renewing a lease:

As before, create a lease and attach a key to it:

# etcdctl lease grant 30
lease 3f3575c45fa5ff2c granted with TTL(30s)
# etcdctl put test/key hello --lease="3f3575c45fa5ff2c"
OK

Keep the lease alive (the command keeps running and renews the lease periodically):

# etcdctl lease keep-alive 3f3575c45fa5ff2c
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)

Open a new terminal and check the lease and the key:

# etcdctl lease timetolive 3f3575c45fa5ff2c --keys
lease 3f3575c45fa5ff2c granted with TTL(30s), remaining(23s), attached keys([test/key])
# etcdctl get --prefix ""
test/key
hello

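The key has not expired: as long as keep-alive keeps running, the lease is renewed before its 30 s TTL runs out.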

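golang + gRPC + etcd: the ultimate hands-on service discovery!

The code first, on GitHub: GitHub repository

Code directory: /go_server/src/lib/discovery/

The overall flow:

The server registers itself with etcd, which simply means writing this service's information into etcd.

The client flow, roughly:

  1. Fetch the list of server addresses from etcd, watch the list for changes, and keep it up to date.
  2. Write the address list into the address list of gRPC's resolver.ClientConn.
  3. gRPC creates the connections and sends requests according to the load-balancing policy.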

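The module consists of 7 files:

  • config.go: configuration constants.
  • discovery.go: initialization.
  • register.go: service registration.
  • resolver.go: resolves the service addresses registered in etcd, and sets up gRPC load balancing.
  • util.go: shared helpers.
  • wrapper.go: wrappers exposed to callers.
  • ctx.go: context helpers that set timeouts.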

config.go

package config

import "time"

// etcd
const (
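    Timeout        = 15 * time.Second // dial timeout for the etcd client
    Expires        = 10               // lease TTL, in seconds
    TickerInterval = 5                // interval between registration checks, in seconds
    // scheme
    Scheme = "etcd"
    // key prefix format in etcd: /scheme/authority/endpoint/
    DirFormat = "/%s/%s/%s/"
    // target format for a custom gRPC resolver: scheme://authority/endpoint
    // scheme selects the resolver; in this code authority carries the environment
    // (e.g. test) and endpoint carries the service name
    TargetFormat = "%s://%s/%s"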
)

// server name
const (
    GreetServer = "greet_server"
)
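A worked example of what these formats produce, using the environment test, the service name greet_server and an example host 192.168.31.71:52963. The constants are copied locally so the snippet runs on its own; registryKey, dialTarget and prefixFromTarget are hypothetical helper names. The point to notice is that the key prefix can be rebuilt from the three parts of the dial target, which is how client and server agree on where to look.

```go
package main

import "fmt"

// Local copies of the formats from config.go, so this snippet runs on its own.
const (
	Scheme       = "etcd"
	DirFormat    = "/%s/%s/%s/"
	TargetFormat = "%s://%s/%s"
)

// registryKey is the etcd key a server instance writes: /scheme/env/name/host
func registryKey(env, name, host string) string {
	return fmt.Sprintf(DirFormat, Scheme, env, name) + host
}

// dialTarget is the target string a client passes to grpc.Dial: scheme://env/name
func dialTarget(env, name string) string {
	return fmt.Sprintf(TargetFormat, Scheme, env, name)
}

// prefixFromTarget rebuilds the key prefix from the three parts of a target,
// the same way the resolver's Build method does.
func prefixFromTarget(scheme, authority, endpoint string) string {
	return fmt.Sprintf(DirFormat, scheme, authority, endpoint)
}

func main() {
	fmt.Println(registryKey("test", "greet_server", "192.168.31.71:52963"))
	// /etcd/test/greet_server/192.168.31.71:52963
	fmt.Println(dialTarget("test", "greet_server"))
	// etcd://test/greet_server
	fmt.Println(prefixFromTarget("etcd", "test", "greet_server"))
	// /etcd/test/greet_server/
}
```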

discovery.go

package discovery

import (
    "fmt"
    "go_server/src/lib/discovery/config"
    "go_server/src/lib/logger"
    "strings"

    "go.etcd.io/etcd/clientv3"
)

var (
    client *clientv3.Client
)

// Init creates the shared etcd client; etcdAddr may list several endpoints separated by ";"
func Init(etcdAddr string) error {
    var err error
    if client == nil {
        // build the etcd client
        client, err = clientv3.New(clientv3.Config{
            Endpoints:   strings.Split(etcdAddr, ";"),
            DialTimeout: config.Timeout,
        })
        if err != nil {
            logger.Error("failed to connect to etcd: %s\n", err)
            fmt.Printf("failed to connect to etcd: %s\n", err)
            return err
        }
    }
    return nil
}

register.go

package discovery

import (
    "context"
    "errors"
    "fmt"
    "go_server/src/lib/discovery/config"
    "os"
    "os/signal"
    "syscall"
    "time"

    "go.etcd.io/etcd/clientv3"
)

// Service is the object a server uses to register itself
type Service struct {
    Name string // service name
    Host string // {ip}:{port}
    Env  string // environment the service belongs to

    Key string // key stored in etcd
}

var service *Service

// register writes the service into etcd and, every TickerInterval seconds,
// re-registers it with a new lease if the key has disappeared (e.g. the lease expired)
func (s *Service) register() error {
    if s.Env == "" {
        return errors.New("env is empty")
    }
    s.Key = fmt.Sprintf(config.DirFormat, config.Scheme, s.Env, s.Name) + s.Host
    ticker := time.NewTicker(time.Second * time.Duration(config.TickerInterval))
    go func() {
        for {
            resp, err := client.Get(context.Background(), s.Key)
            if err != nil {
                fmt.Printf("failed to get service address: %s\n", err)
            } else if resp.Count == 0 { // not registered (yet)
                err = s.keepAlive()
                if err != nil {
                    fmt.Printf("failed to keep the registration alive: %s\n", err)
                }
            }
            <-ticker.C
        }
    }()
    return nil
}

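// keepAlive creates a lease, binds the service key to it, and keeps renewing it
func (s *Service) keepAlive() error {
    // create a lease with a TTL of config.Expires seconds
    leaseResp, err := client.Grant(context.Background(), config.Expires)
    if err != nil {
        fmt.Printf("failed to create lease: %s\n", err)
        return err
    }

    // write the service address into etcd, bound to the lease
    _, err = client.Put(context.Background(), s.Key, s.Host, clientv3.WithLease(leaseResp.ID))
    if err != nil {
        fmt.Printf("failed to register service: %s\n", err)
        return err
    }

    // keep renewing the lease in the background
    ch, err := client.KeepAlive(context.Background(), leaseResp.ID)
    if err != nil {
        fmt.Printf("failed to keep lease alive: %s\n", err)
        return err
    }

    // drain the keep-alive responses; the channel is closed when renewal stops,
    // and `for range` then exits instead of spinning on the closed channel
    go func() {
        for range ch {
        }
    }()
    return nil
}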

// unRegister removes the service key from etcd
func (s *Service) unRegister() {
    if s != nil && client != nil {
        _, _ = client.Delete(context.Background(), s.Key)
    }
}

// WaitForClose blocks until a termination signal arrives, unregisters the service, and exits.
// SIGKILL is not listed because it cannot be caught.
func WaitForClose() {
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
    sig := <-ch
    service.unRegister()
    if i, ok := sig.(syscall.Signal); ok {
        os.Exit(int(i))
    } else {
        os.Exit(0)
    }
}
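A detail about the goroutine that drains the keep-alive channel: the etcd client closes that channel once it stops renewing the lease (for example after the lease expires or its context is cancelled). A receive on a closed channel returns immediately, so a bare for { <-ch } loop would then spin at full CPU, whereas for range ch exits cleanly. The stdlib-only demo below uses a plain chan int as a stand-in for the keep-alive channel; drain is a hypothetical helper name.

```go
package main

import "fmt"

// drain consumes values until the channel is closed and reports how many it saw.
// `for range` stops on close; a bare `for { <-ch }` would keep receiving zero values forever.
func drain(ch <-chan int) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	ch := make(chan int)
	go func() {
		// Simulate three keep-alive responses, then the client giving up.
		for i := 0; i < 3; i++ {
			ch <- i
		}
		close(ch)
	}()
	fmt.Println("responses drained:", drain(ch)) // responses drained: 3
}
```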

resolver.go

package discovery

import (
    "context"
    "fmt"
    "go_server/src/lib/discovery/config"
    "strings"

    "go.etcd.io/etcd/clientv3"
    "google.golang.org/grpc"
    "google.golang.org/grpc/resolver"
)

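// EtcdResolver is a gRPC resolver that reads service addresses from etcd
type EtcdResolver struct {
    dir        string
    clientConn resolver.ClientConn
}

// Resolver registers the etcd resolver and dials the service env/name through it
func Resolver(env string, name string) *grpc.ClientConn {
    // register the etcd resolver for the "etcd" scheme
    r := &EtcdResolver{}
    resolver.Register(r)
    target := fmt.Sprintf(config.TargetFormat, r.Scheme(), env, name)
    // connect to the servers with round-robin load balancing; Dial calls r.Build() synchronously
    dialOpts := []grpc.DialOption{
        // gRPC's built-in round-robin balancer (WithBalancerName is deprecated
        // in newer gRPC releases in favour of WithDefaultServiceConfig)
        grpc.WithBalancerName("round_robin"),
        grpc.WithInsecure(),
        grpc.WithDefaultCallOptions(
            grpc.MaxCallRecvMsgSize(1024 * 1024 * 16), // 16 MiB
        ),
    }
    conn, err := grpc.Dial(target, dialOpts...)
    if err != nil {
        fmt.Println("failed to connect to server:", err)
    }
    return conn
}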

func (r *EtcdResolver) Scheme() string {
    return config.Scheme
}

// Build creates the resolver; grpc.Dial() calls it synchronously
func (r *EtcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
    r.clientConn = clientConn
    r.dir = fmt.Sprintf(config.DirFormat, target.Scheme, target.Authority, target.Endpoint)
    go r.watch()
    return r, nil
}

// watch keeps the address list under the key prefix r.dir in sync with etcd
func (r *EtcdResolver) watch() {
    // initial address list
    var addrList []resolver.Address
    watchOpts := []clientv3.OpOption{clientv3.WithPrefix()}

    resp, err := client.Get(context.Background(), r.dir, clientv3.WithPrefix())
    if err != nil {
        fmt.Println("failed to get service address list:", err)
    } else {
        for i := range resp.Kvs {
            addr := strings.TrimPrefix(string(resp.Kvs[i].Key), r.dir)
            fmt.Println(addr)
            addrList = append(addrList, resolver.Address{Addr: addr})
        }
        // watch from the revision right after this snapshot, so changes made
        // between Get and Watch are not missed
        watchOpts = append(watchOpts, clientv3.WithRev(resp.Header.Revision+1))
    }

    r.clientConn.NewAddress(addrList)

    // watch the service address list for changes
    rch := client.Watch(context.Background(), r.dir, watchOpts...)
    for n := range rch {
        for _, ev := range n.Events {
            addr := strings.TrimPrefix(string(ev.Kv.Key), r.dir)
            switch ev.Type {
            case clientv3.EventTypePut:
                if !exists(addrList, addr) {
                    addrList = append(addrList, resolver.Address{Addr: addr})
                    r.clientConn.NewAddress(addrList)
                }
            case clientv3.EventTypeDelete:
                if s, ok := remove(addrList, addr); ok {
                    addrList = s
                    r.clientConn.NewAddress(addrList)
                }
            }
        }
    }
}

func exists(l []resolver.Address, addr string) bool {
    for i := range l {
        if l[i].Addr == addr {
            return true
        }
    }
    return false
}

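// remove returns a new list without addr; it does not edit s in place,
// because s has already been handed to gRPC
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
    for i := range s {
        if s[i].Addr == addr {
            out := make([]resolver.Address, 0, len(s)-1)
            out = append(out, s[:i]...)
            return append(out, s[i+1:]...), true
        }
    }
    return nil, false
}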

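// Close is a no-op: the watch goroutine runs for the lifetime of the process
func (r *EtcdResolver) Close() {}

// ResolveNow is a no-op: address updates are pushed by the etcd watch instead
func (r *EtcdResolver) ResolveNow(_ resolver.ResolveNowOptions) {}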

util.go

package discovery

import (
    "fmt"
    "net"
)

// getIntranetIP returns the first non-loopback IPv4 address of this host
func getIntranetIP() (ip string) {
    if addrs, err := net.InterfaceAddrs(); err == nil {
        for _, address := range addrs {
            if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
                if ipnet.IP.To4() != nil {
                    ip = ipnet.IP.String()
                    break
                }
            }
        }
    }
    return
}

// getListener listens on a random free port and returns the listener plus this host's address as ip:port
func getListener() (listener net.Listener, host string, err error) {
    host = "0.0.0.0:0"
    listener, err = net.Listen("tcp", host)
    if err == nil {
        addr := listener.Addr().String()
        _, portString, _ := net.SplitHostPort(addr)
        host = fmt.Sprintf("%s:%s", getIntranetIP(), portString)
    }
    return
}

wrapper.go

package discovery

import (
    "fmt"
    "go_server/src/lib/discovery/config"
    "go_server/src/lib/proto/greet"

    "google.golang.org/grpc"
)

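// GreetRegister starts a Greet gRPC server on a random port and registers it in etcd
func GreetRegister(env string, server greet.GreetServer) error {
    listener, host, err := getListener()
    if err != nil {
        fmt.Println("failed to listen:", err)
        return err
    }
    fmt.Println("host:", host)
    srv := grpc.NewServer()
    // register the service before Serve starts; gRPC does not allow registering afterwards
    greet.RegisterGreetServer(srv, server)
    go func() {
        if err := srv.Serve(listener); err != nil {
            fmt.Println("gRPC server stopped:", err)
        }
    }()
    service = &Service{Name: config.GreetServer, Host: host, Env: env}
    err = service.register()
    if err != nil {
        fmt.Println(err)
        return err
    }
    return nil
}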

func GreetResolve(env string) greet.GreetClient {
    return greet.NewGreetClient(Resolver(env, config.GreetServer))
}

ctx.go

package discovery

import (
    "context"
    "time"
)

// Context1s returns a context with a 1-second timeout
func Context1s() (ctx context.Context, cancel context.CancelFunc) {
    return context.WithTimeout(context.TODO(), time.Second)
}

Let's test it; the test files are in the GitHub repository as well:

Here is a test proto plus a server and a client, again straight to the code:

greet.proto

syntax = "proto3";


option go_package = "src/lib/proto/greet";

service Greet {
  rpc Morning(GreetRequest) returns (GreetResponse) {}
  rpc Night(GreetRequest) returns (GreetResponse) {}
}

message GreetRequest {
  string name = 1;
}

message GreetResponse {
  string message = 1;
  string from = 2;
}

server main.go

package main

import (
    "context"
    "flag"
    "fmt"
    "go_server/src/lib/discovery"
    proto "go_server/src/lib/proto/greet"
)

var (
    Flag     = flag.String("flag", "a", "instance label returned in responses")
    EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address")
    Env      = flag.String("Env", "test", "env")
)

// GreetServer implements the Greet RPC service
type GreetServer struct{}

func (gs *GreetServer) Morning(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
    fmt.Printf("Morning called: %s\n", req.Name)
    return &proto.GreetResponse{
        Message: "Good morning, " + req.Name,
        From:    *Flag,
    }, nil
}

func (gs *GreetServer) Night(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
    fmt.Printf("Night called: %s\n", req.Name)
    return &proto.GreetResponse{
        Message: "Good night, " + req.Name,
        From:    *Flag,
    }, nil
}

func main() {
    flag.Parse()
    err := discovery.Init(*EtcdAddr)
    if err != nil {
        fmt.Println(err)
        return
    }
    err = discovery.GreetRegister(*Env, &GreetServer{})
    if err != nil {
        fmt.Println(err)
        return
    }
    discovery.WaitForClose()
}

client main.go

package main

import (
    "flag"
    "fmt"
    "go_server/src/lib/discovery"
    proto "go_server/src/lib/proto/greet"
    "time"
)

var (
    EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address")
    Env      = flag.String("Env", "test", "env")
)

func main() {
    flag.Parse()
    err := discovery.Init(*EtcdAddr)
    if err != nil {
        fmt.Println(err)
        return
    }
    c := discovery.GreetResolve(*Env)
    ticker := time.NewTicker(1 * time.Second)
    for range ticker.C {
        fmt.Println("Calling Morning...")
        ctx, cancel := discovery.Context1s()
        resp1, err := c.Morning(
            ctx,
            &proto.GreetRequest{Name: "Jinfeng"},
        )
        cancel()
        if err != nil {
            fmt.Println("Morning call failed:", err)
            return
        }
        fmt.Printf("Morning response: %s, from: %s\n", resp1.Message, resp1.From)

        fmt.Println("Calling Night...")
        ctx, cancel = discovery.Context1s()
        resp2, err := c.Night(
            ctx,
            &proto.GreetRequest{Name: "Jinfeng"},
        )
        cancel()
        if err != nil {
            fmt.Println("Night call failed:", err)
            return
        }
        fmt.Printf("Night response: %s, from: %s\n", resp2.Message, resp2.From)
    }
}

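Let's run it. Start 3 servers (each with a different -flag value, a, b and c, which the server echoes back as From), and you can see that 3 instances are now registered in etcd: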

# etcdctl get --prefix ""
/etcd/test/greet_server/192.168.31.71:52963
192.168.31.71:52963
/etcd/test/greet_server/192.168.31.71:52969
192.168.31.71:52969
/etcd/test/greet_server/192.168.31.71:52973
192.168.31.71:52973

Calling from the client:

➜  client git:(main) ✗ go run .
192.168.31.71:52963
192.168.31.71:52969
192.168.31.71:52973
Calling Morning...
Morning response: Good morning, Jinfeng, from: c
Calling Night...
Night response: Good night, Jinfeng, from: a
Calling Morning...
Morning response: Good morning, Jinfeng, from: b
Calling Night...
Night response: Good night, Jinfeng, from: c
Calling Morning...
Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b
Calling Morning...
Morning response: Good morning, Jinfeng, from: c

Shut down one server (c); the calls now alternate between a and b:

Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b
Calling Morning...
Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b
Calling Morning...
Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b

Start it again; c rejoins the rotation:

Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b
Calling Morning...
Morning response: Good morning, Jinfeng, from: c
Calling Night...
Night response: Good night, Jinfeng, from: a
Calling Morning...
Morning response: Good morning, Jinfeng, from: b
Calling Night...
Night response: Good night, Jinfeng, from: c
Calling Morning...
Morning response: Good morning, Jinfeng, from: a
Calling Night...
Night response: Good night, Jinfeng, from: b

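This round only uses gRPC's simple built-in round-robin for load balancing; when I have time I'll add consistent hashing and other methods!

With service discovery in place, the next step is to build a simple recommender system and get the whole pipeline running!

The plan is to start with a recommender that has only a simple recall (candidate retrieval) stage, and then gradually optimize the whole system.

Let's go, folks!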


Originally published on Jianshu (简书).
