前沿
首先谈谈我对推荐系统的引擎和算法的理解。
现在市面上讲起推荐系统,大多都是讲各种算法,讲的天花乱坠,高深莫测,其实很多算法都是大同小异,核心思想是差不多的,只不过实现手段略有差异。而在工业上,各种复杂算法能够落地的,我认为不多,大部分的厂商,运用的算法都是很集中的那一部分算法。
一套好的推荐系统,对于引擎是非常依赖的,实验显示,响应时长与各项指标之间都是有直接关联的,响应时长越长,指标越低。
作为一个朴实的推荐码农,我还是想从基础做起,朴朴实实,脚踏实地,先把引擎部分做好。当然,算法后面也会有,毕竟引擎和算法缺一不可。
那么,废话少说,推荐引擎,搞起吧!
服务发现
既然是搞引擎,也就是后端,当然要先把架构先搭建起来。
后端服务,微服务已经成为了当前的主流,具有非常多的优点,比如高内聚,单独部署,各自负载均衡等,当然缺点也有,通信更复杂了等。具体就不在这里展开了,有兴趣的兄弟们可以百度,google一下。
而微服务之间的通信中,客户端如何确定服务端的地址,就需要服务发现了。
在整个流程中,可以分为服务端的要做的,以及客户端要做的,下面依次来看一下。
服务端比较简单,只需要将自己的信息存储到某个存储中。
客户端呢,首先要从存储中拿到服务端信息列表,然后根据一些负载均衡的原则,选择一个地址,最终来调用。
是不是原理上非常简单!那么进入实操吧!
etcd介绍
以前有zookeeper,而zookeeper可以看到,早就不再维护更新了。
而etcd,用go语言开发,因kubernetes而闻名,在kubernetes中,使用etcd作为分布式存储获取分布式锁。
所以我们当然用更年轻,更轻量,并且也非常稳定的etcd搞了!就是这么喜新厌旧= =
etcd使用raft算法实现的一致性,至于raft算法,可以看下面这个动画演示,很完美生动。
etcd实战
我这边用docker来做自己的测试环境,上我的docker-compose.yaml
version: '2.2'
services:
etcd:
image: gcr.io/etcd-development/etcd:v3.4.13
container_name: etcd
restart: always
ports:
- 2379:2379
- 2380:2380
command:
- "/usr/local/bin/etcd"
- "--name"
- "s1"
- "--data-dir"
- "/etcd-data"
- "--advertise-client-urls"
- "http://0.0.0.0:2379"
- "--listen-client-urls"
- "http://0.0.0.0:2379"
- "--initial-advertise-peer-urls"
- "http://0.0.0.0:2380"
- "--listen-peer-urls"
- "http://0.0.0.0:2380"
- "--initial-cluster-token"
- "tkn"
- "--initial-cluster"
- "s1=http://0.0.0.0:2380"
- "--initial-cluster-state"
- "new"
如果想通过其他途径安装可以看官方的说明:
那么,既然是存储,我们就来测试一下CRUD吧,还有etcd的租约功能。
CRUD:
# etcdctl put test/key hello
OK
# etcdctl get test/key
test/key
hello
# etcdctl put test/key goodbye
OK
# etcdctl get test/key
test/key
goodbye
# etcdctl del test/key
1
# etcdctl get test/key
租约:
创建租约,120s过期
# etcdctl lease grant 120
lease 3f3575c45fa5ff26 granted with TTL(120s)
查看租约列表
# etcdctl lease list
found 1 leases
3f3575c45fa5ff26
新建kv,并绑定租约
# etcdctl put test/key hello --lease="3f3575c45fa5ff26"
OK
查看租约下的key剩余时间
# etcdctl lease timetolive 3f3575c45fa5ff26 --keys
lease 3f3575c45fa5ff26 granted with TTL(120s), remaining(46s), attached keys([test/key])
查看还存在的key
# etcdctl get --prefix ""
test/key
hello
等租约过期后,查看key,key已被自动删除
# etcdctl lease timetolive 3f3575c45fa5ff26 --keys
lease 3f3575c45fa5ff26 already expired
# etcdctl get --prefix ""
租约续约:
同样创建租约,绑定kv
# etcdctl lease grant 30
lease 3f3575c45fa5ff2c granted with TTL(30s)
# etcdctl put test/key hello --lease="3f3575c45fa5ff2c"
OK
续约
# etcdctl lease keep-alive 3f3575c45fa5ff2c
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)
lease 3f3575c45fa5ff2c keepalived with TTL(30)
打开个新窗口查看租约与key
# etcdctl lease timetolive 3f3575c45fa5ff2c --keys
lease 3f3575c45fa5ff2c granted with TTL(30s), remaining(23s), attached keys([test/key])
# etcdctl get --prefix ""
test/key
hello
发现并没有过期。
golang+grpc+etcd 服务发现终极实战!
先上github仓库:github仓库
代码目录/go_server/src/lib/discovery/
说一下整个流程:
服务端向etcd注册服务,就是将本服务的信息写进etcd。
客户端大体流程:
- 从etcd取服务端地址列表,并watch列表变化,并更新。
- 把地址列表写进grpc resolver的resolver.ClientConn的地址列表中。
- grpc创建连接,根据负载均衡请求。
整个模块分为7个文件:
- config.go,配置文件。
- discovery.go,用于初始化。
- register.go,用于服务注册。
- resolver.go,用于解析etcd里注册的服务地址,以及grpc负载均衡。
- util.go,公共方法。
- wrapper.go,对外部提供的调用封装。
- ctx.go,context,设置超时时间。
config.go
package config
import "time"
// etcd
const (
Timeout = 15 * time.Second
Expires = 10
TickerInterval = 5
// scheme
Scheme = "etcd"
// etcd中存储key的格式前缀:/scheme/authority/endpoint
DirFormat = "/%s/%s/%s/"
// grpc resolver中自定义解析需要提供的格式:scheme://authority/endpoint
// 其中scheme可以理解为解析策略,authority可以理解为权限管理,endpoint为地址
TargetFormat = "%s://%s/%s"
)
// server name
const (
GreetServer = "greet_server"
)
discovery.go
package discovery
import (
"fmt"
"go_server/src/lib/discovery/config"
"go_server/src/lib/logger"
"strings"
"go.etcd.io/etcd/clientv3"
)
var (
client *clientv3.Client
)
// Init 初始化etcd
func Init(etcdAddr string) error {
var err error
if client == nil {
//构建etcd client
client, err = clientv3.New(clientv3.Config{
Endpoints: strings.Split(etcdAddr, ";"),
DialTimeout: config.Timeout,
})
if err != nil {
logger.Error("连接etcd失败:%s\n", err)
fmt.Printf("连接etcd失败:%s\n", err)
return err
}
}
return nil
}
register.go
package discovery
import (
"context"
"errors"
"fmt"
"go_server/src/lib/discovery/config"
"os"
"os/signal"
"syscall"
"time"
"go.etcd.io/etcd/clientv3"
)
//Service 服务端用于服务注册的对象
type Service struct {
Name string //服务名称
Host string //{ip}:{port}
Env string //所属环境
Key string //保存在etcd中的key
}
var service *Service
func (s *Service) register() error {
if s.Env == "" {
return errors.New("env is null")
}
s.Key = fmt.Sprintf(config.DirFormat, config.Scheme, s.Env, s.Name) + s.Host
ticker := time.NewTicker(time.Second * time.Duration(config.TickerInterval))
go func() {
for {
resp, err := client.Get(context.Background(), s.Key)
if err != nil {
fmt.Printf("获取服务地址失败:%s", err)
} else if resp.Count == 0 { //尚未注册
err = s.keepAlive()
if err != nil {
fmt.Printf("保持连接失败:%s", err)
}
}
<-ticker.C
}
}()
return nil
}
// keepAlive 创建租约,绑定,并续期
func (s *Service) keepAlive() error {
//创建租约
leaseResp, err := client.Grant(context.Background(), config.Expires)
if err != nil {
fmt.Printf("创建租期失败:%s\n", err)
return err
}
//将服务地址注册到etcd中
_, err = client.Put(context.Background(), s.Key, s.Host, clientv3.WithLease(leaseResp.ID))
if err != nil {
fmt.Printf("注册服务失败:%s", err)
return err
}
//租约续期
ch, err := client.KeepAlive(context.Background(), leaseResp.ID)
if err != nil {
fmt.Printf("租约续期失败:%s\n", err)
return err
}
//清空keepAlive返回的channel
go func() {
for {
<-ch
}
}()
return nil
}
//取消注册
func (s *Service) unRegister() {
if client != nil {
_, _ = client.Delete(context.Background(), s.Key)
}
}
func WaitForClose() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
sig := <-ch
service.unRegister()
if i, ok := sig.(syscall.Signal); ok {
os.Exit(int(i))
} else {
os.Exit(0)
}
}
resolver.go
package discovery
import (
"context"
"fmt"
"go_server/src/lib/discovery/config"
"strings"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
"google.golang.org/grpc/resolver"
)
//EtcdResolver解析器
type EtcdResolver struct {
dir string
clientConn resolver.ClientConn
}
func Resolver(env string, name string) *grpc.ClientConn {
//注册etcd解析器
r := &EtcdResolver{}
resolver.Register(r)
target := fmt.Sprintf(config.TargetFormat, r.Scheme(), env, name)
//客户端连接服务器(负载均衡:轮询) 会同步调用r.Build()
dailOpts := []grpc.DialOption{
grpc.WithBalancerName("round_robin"), // grpc内部提供的轮询负载均衡
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(1024 * 1024 * 16),
),
}
conn, err := grpc.Dial(target, dailOpts...)
if err != nil {
fmt.Println("连接服务器失败:", err)
}
return conn
}
func (r *EtcdResolver) Scheme() string {
return config.Scheme
}
//构建解析器 grpc.Dial()同步调用
func (r *EtcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
r.clientConn = clientConn
r.dir = fmt.Sprintf(config.DirFormat, target.Scheme, target.Authority, target.Endpoint)
go r.watch()
return r, nil
}
//监听etcd中某个key前缀的服务地址列表的变化
func (r *EtcdResolver) watch() {
//初始化服务地址列表
var addrList []resolver.Address
resp, err := client.Get(context.Background(), r.dir, clientv3.WithPrefix())
if err != nil {
fmt.Println("获取服务地址列表失败:", err)
} else {
for i := range resp.Kvs {
fmt.Println(strings.TrimPrefix(string(resp.Kvs[i].Key), r.dir))
addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), r.dir)})
}
}
r.clientConn.NewAddress(addrList)
//监听服务地址列表的变化
rch := client.Watch(context.Background(), r.dir, clientv3.WithPrefix())
for n := range rch {
for _, ev := range n.Events {
addr := strings.TrimPrefix(string(ev.Kv.Key), r.dir)
switch ev.Type {
case clientv3.EventTypePut:
if !exists(addrList, addr) {
addrList = append(addrList, resolver.Address{Addr: addr})
r.clientConn.NewAddress(addrList)
}
case clientv3.EventTypeDelete:
if s, ok := remove(addrList, addr); ok {
addrList = s
r.clientConn.NewAddress(addrList)
}
}
}
}
}
func exists(l []resolver.Address, addr string) bool {
for i := range l {
if l[i].Addr == addr {
return true
}
}
return false
}
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
for i := range s {
if s[i].Addr == addr {
s[i] = s[len(s)-1]
return s[:len(s)-1], true
}
}
return nil, false
}
//Close ...
func (r *EtcdResolver) Close() {}
//ResolveNow ...
func (r *EtcdResolver) ResolveNow(_ resolver.ResolveNowOption) {}
util.go
package discovery
import (
"fmt"
"net"
)
// 获取本机ip地址
func getIntranetIP() (ip string) {
if addrs, err := net.InterfaceAddrs(); err == nil {
for _, address := range addrs {
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
ip = ipnet.IP.String()
break
}
}
}
}
return
}
// 自动获取本机的ip以及端口号,ip:port格式
func getListener() (listener net.Listener, host string, err error) {
host = "0.0.0.0:0"
listener, err = net.Listen("tcp", host)
if err == nil {
addr := listener.Addr().String()
_, portString, _ := net.SplitHostPort(addr)
host = fmt.Sprintf("%s:%s", getIntranetIP(), portString)
}
return
}
wrapper.go
package discovery
import (
"fmt"
"go_server/src/lib/discovery/config"
"go_server/src/lib/proto/greet"
"google.golang.org/grpc"
)
func GreetRegister(env string, server greet.GreetServer) error {
listener, host, err := getListener()
if err != nil {
fmt.Println("监听网络失败:", err)
return err
}
fmt.Println("host:", host)
srv := grpc.NewServer()
go srv.Serve(listener)
greet.RegisterGreetServer(srv, server)
service = &Service{Name: config.GreetServer, Host: host, Env: env}
err = service.register()
if err != nil {
fmt.Println(err)
return err
}
return nil
}
func GreetResolve(env string) greet.GreetClient {
return greet.NewGreetClient(Resolver(env, config.GreetServer))
}
ctx.go
package discovery
import (
"context"
"time"
)
// 1s超时
func Context1s() (ctx context.Context, cancel context.CancelFunc) {
return context.WithTimeout(context.TODO(), time.Second)
}
测试一下吧,测试文件也都在github仓库里:
搞个测试的proto,server和client,也直接上代码:
greet.proto
syntax = "proto3";
option go_package = "src/lib/proto/greet";
service Greet {
rpc Morning(GreetRequest)returns(GreetResponse){}
rpc Night(GreetRequest)returns(GreetResponse){}
}
message GreetRequest {
string name = 1;
}
message GreetResponse {
string message = 1;
string from = 2;
}
server main.go
package main
import (
"context"
"flag"
"fmt"
"go_server/src/lib/discovery"
proto "go_server/src/lib/proto/greet"
)
var (
Flag = flag.String("flag", "a", "flag")
EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address")
Env = flag.String("Env", "test", "env")
)
//rpc服务接口
type GreetServer struct{}
func (gs *GreetServer) Morning(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
fmt.Printf("Morning 调用: %s\n", req.Name)
return &proto.GreetResponse{
Message: "Good morning, " + req.Name,
From: *Flag,
}, nil
}
func (gs *GreetServer) Night(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
fmt.Printf("Night 调用: %s\n", req.Name)
return &proto.GreetResponse{
Message: "Good night, " + req.Name,
From: *Flag,
}, nil
}
func main() {
flag.Parse()
err := discovery.Init(*EtcdAddr)
if err != nil {
fmt.Println(err)
return
}
err = discovery.GreetRegister(*Env, &GreetServer{})
if err != nil {
fmt.Println(err)
return
}
discovery.WaitForClose()
}
client main.go
package main
import (
"flag"
"fmt"
"go_server/src/lib/discovery"
proto "go_server/src/lib/proto/greet"
"time"
)
var (
EtcdAddr = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address")
Env = flag.String("Env", "test", "env")
)
func main() {
flag.Parse()
err := discovery.Init(*EtcdAddr)
if err != nil {
fmt.Println(err)
return
}
c := discovery.GreetResolve(*Env)
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
fmt.Println("Morning 调用...")
ctx, cancel := discovery.Context1s()
resp1, err := c.Morning(
ctx,
&proto.GreetRequest{Name: "Jinfeng"},
)
cancel()
if err != nil {
fmt.Println("Morning调用失败:", err)
return
}
fmt.Printf("Morning 响应:%s,来自:%s\n", resp1.Message, resp1.From)
fmt.Println("Night 调用...")
ctx, cancel = discovery.Context1s()
resp2, err := c.Night(
ctx,
&proto.GreetRequest{Name: "Jinfeng"},
)
cancel()
if err != nil {
fmt.Println("Night调用失败:", err)
return
}
fmt.Printf("Night 响应:%s,来自:%s\n", resp2.Message, resp2.From)
}
}
跑起来吧,起3个server,可以看到,在etcd已经注册了3台服务。
# etcdctl get --prefix ""
/etcd/test/greet_server/192.168.31.71:52963
192.168.31.71:52963
/etcd/test/greet_server/192.168.31.71:52969
192.168.31.71:52969
/etcd/test/greet_server/192.168.31.71:52973
192.168.31.71:52973
client调用
➜ client git:(main) ✗ go run .
192.168.31.71:52963
192.168.31.71:52969
192.168.31.71:52973
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:c
Night 调用...
Night 响应:Good night, Jinfeng,来自:a
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:b
Night 调用...
Night 响应:Good night, Jinfeng,来自:c
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:c
shutdown一台服务
Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b
重新启动
Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:c
Night 调用...
Night 响应:Good night, Jinfeng,来自:a
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:b
Night 调用...
Night 响应:Good night, Jinfeng,来自:c
Morning 调用...
Morning 响应:Good morning, Jinfeng,来自:a
Night 调用...
Night 响应:Good night, Jinfeng,来自:b
这一轮,只是用grpc内部简单的轮训来做负载均衡,后面有空了,再加入一致性哈希等方法吧!
到现在,服务发现已经有了,下面就可以先做一个简单的推荐系统,把流程跑起来了!
后面计划先做一个只有简单召回的推荐系统,然后再慢慢优化整套系统。
兄弟们,奥利给!
有疑问加站长微信联系(非本文作者)