首先,玩go-kit,他不是一个mvc框架,他的架构层次比较多一些。
大致上分为transport
, endpoint
, service
http-demo
import (
"context"
"encoding/json"
"fmt"
transport "github.com/go-kit/kit/transport/http"
"net/http"
)
type InfoDto struct {
Version string
}
func main() {
var endpoint = func(ctx context.Context, request interface{}) (response interface{}, err error) {
dto := request.(*InfoDto)
fmt.Println("version :", dto.Version)
response = map[string]interface{}{
"data": "ok",
}
err = nil
return
}
// transport 需要将 service+编解码 柔和起来
hand := transport.NewServer(endpoint, func(i context.Context, req *http.Request) (request interface{}, err error) {
request = &InfoDto{}
err = json.NewDecoder(req.Body).Decode(&request)
return
}, func(i context.Context, writer http.ResponseWriter, response interface{}) (err error) {
err = json.NewEncoder(writer).Encode(response)
return
})
http.Handle("/", hand)
http.ListenAndServe(":8888", nil)
}
复制代码
简单的来看,其实他就是将 transport 抽象了一下,其实做的事controller的事情,endpoint可以理解为是service。
定义middlerware
中间件其实是修饰的 endpoint ,类似于Spring的interceptor
// 限流
type Limier interface {
Allow() bool
}
type defaultLimiter struct {
}
func (*defaultLimiter) Allow() bool {
return rand.Intn(3) == 1
}
// 添加限流
func addLimier(limier Limier) endpoint.Middleware {
return func(next endpoint.Endpoint) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
if limier.Allow() {
return next(ctx, request)
}
return endpoint.Nop(ctx, request)
}
}
}
// 使用
end = addLimier(&defaultLimiter{})(end)
复制代码
定义前置、后置器等
类似于Java的Servlet的Filter,但是不具备拦截功能
// option 的方法,很好的解决go的重载
option := transport.ServerBefore(func(ctx context.Context, request *http.Request) context.Context {
fmt.Println("http before")
return ctx
})
复制代码
然后看看zipkin的组合实现 ,其实就是在前后结束实现了一个拦截,但是真的符合我们的要求吗。显然不符合。
func HTTPServerTrace(tracer *zipkin.Tracer, options ...TracerOption) kithttp.ServerOption {
serverBefore := kithttp.ServerBefore(
)
serverAfter := kithttp.ServerAfter(
)
serverFinalizer := kithttp.ServerFinalizer(
)
return func(s *kithttp.Server) {
serverBefore(s)
serverAfter(s)
serverFinalizer(s)
}
}
复制代码
go-kit http整体设计
结构
type Server struct {
e endpoint.Endpoint //service
dec DecodeRequestFunc // 编解码
enc EncodeResponseFunc
before []RequestFunc //前置处理器
after []ServerResponseFunc //后置处理器
errorEncoder ErrorEncoder// error处理器
finalizer []ServerFinalizerFunc
errorHandler transport.ErrorHandler// error处理器
}
复制代码
处理逻辑
// ServeHTTP implements http.Handler.
func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// defer
if len(s.finalizer) > 0 {
iw := &interceptingWriter{w, http.StatusOK, 0}
defer func() {
ctx = context.WithValue(ctx, ContextKeyResponseHeaders, iw.Header())
ctx = context.WithValue(ctx, ContextKeyResponseSize, iw.written)
for _, f := range s.finalizer {
f(ctx, iw.code, r)
}
}()
w = iw
}
// before
for _, f := range s.before {
ctx = f(ctx, r)
}
// 解码
request, err := s.dec(ctx, r)
if err != nil {
// 异常处理器
s.errorHandler.Handle(ctx, err)
s.errorEncoder(ctx, err, w)
return
}
// 处理器,异常处理器
response, err := s.e(ctx, request)
if err != nil {
s.errorHandler.Handle(ctx, err)
s.errorEncoder(ctx, err, w)
return
}
// 后置处理器
for _, f := range s.after {
ctx = f(ctx, w)
}
// 解码
if err := s.enc(ctx, w, response); err != nil {
s.errorHandler.Handle(ctx, err)
s.errorEncoder(ctx, err, w)
return
}
}
复制代码
基本流程
go-kit 整合grpc
首先整合过程很麻烦,它基本是基于service实现的,
syntax = "proto3";
package grpc_demo;
service Add {
rpc Sum (SumRequest) returns (SumReply) {
}
}
message SumRequest {
int64 a = 1;
int64 b = 2;
}
message SumReply {
int64 v = 1;
string err = 2;
}
复制代码
脚本
#!/usr/bin/env sh
# Install proto3 from source
# brew install autoconf automake libtool
# git clone https://github.com/google/protobuf
# ./autogen.sh ; ./configure ; make ; make install
#
# Update protoc Go bindings via
# go get -u github.com/golang/protobuf/{proto,protoc-gen-go}
#
# See also
# https://github.com/grpc/grpc-go/tree/master/examples
protoc addsvc.proto --go_out=plugins=grpc:.
复制代码
程序
package grpc_demo
import (
"context"
"fmt"
grpctransport "github.com/go-kit/kit/transport/grpc"
)
// 实现sum方法
type grpcServer struct {
sum grpctransport.Handler
concat grpctransport.Handler
}
//生成器
func NewGRPCServer(service endpoint.Endpoint) AddServer {
return &grpcServer{
sum: grpctransport.NewServer(
service,
decodeGRPCSumRequest,
encodeGRPCSumResponse,
),
}
}
//代码生成器生成
func (s *grpcServer) Sum(ctx context.Context, req *SumRequest) (*SumReply, error) {
_, rep, err := s.sum.ServeGRPC(ctx, req)
if err != nil {
return nil, err
}
return rep.(*SumReply), nil
}
// 代码生成器生成
func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req, _ := grpcReq.(*SumRequest)
return &SumRequest{A: int64(req.A), B: int64(req.B)}, nil
}
// 代码生成器
func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) {
resp, _ := response.(*SumReply)
return resp, nil
}
复制代码
以上代码均可以使用代码生成器生成
一下,是主方法
func Main() {
grpcListener, err := net.Listen("tcp", ":8888")
if err != nil {
panic(err)
}
g := grpc.NewServer()
RegisterAddServer(g, NewGRPCServer(func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(*SumRequest)
return &SumReply{V: req.B + req.A, Err: ""}, nil
}))
err = g.Serve(grpcListener)
if err != nil {
panic(err)
}
}
复制代码
客户端方法
func NewClient() {
// 1. 创建一个连接
conn, err := grpc.Dial(":8888", grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
// 2. 然后创建客户端
client := NewAddClient(conn)
// 3. rpc调用
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
response, err := client.Sum(ctx, &SumRequest{A: 1, B: 2})
if err != nil {
fmt.Printf("超时 , 原因 : %s\n", err)
} else {
fmt.Printf("结果 : %v\n", response.V)
}
}
复制代码
这里我并没有get 到 go-kit的好处,发现很多没有用,也就是编解码器,根本没有用处,我觉得他的源码中,是不是可以加入一个如果他为空,可以跳过呢。
其实性能压测来说,grpc本身的效率只和 http快2-3倍,并没有想象中那么快。
grpc真的那么快吗
我自己测试,本地测试,grpc并没有http快,可能是go的http的transport比较好,复用连接,不需要重复建立,但是可以看到http在大多数场景下还是可以起到很大作用的。
我感觉dubbo会很快。
import (
"bytes"
"encoding/json"
"fmt"
"go-kit/demo"
"io"
"io/ioutil"
"net/http"
"runtime"
"time"
)
func main() {
server()
runtime.Gosched()
client()
}
func server() {
http.HandleFunc("/add", func(writer http.ResponseWriter, request *http.Request) {
req := demo.AddReqeust{}
err := json.NewDecoder(request.Body).Decode(&req)
if err != nil {
panic(err)
}
defer request.Body.Close()
_ = json.NewEncoder(writer).Encode(map[string]interface{}{
"result": req.A + req.B,
})
})
go http.ListenAndServe(":8888", nil)
}
func client() {
now := time.Now()
count := 10000
for x := 0; x < count; x++ {
request()
}
fmt.Println(time.Now().Sub(now).Milliseconds())
}
func request() {
reader, err := addJsonRequestParams(&demo.AddReqeust{
A: 1,
B: 2,
})
if err != nil {
return
}
resp, err := http.Post("http://127.0.0.1:8888/add", "application/json", reader)
if err != nil {
return
}
defer resp.Body.Close()
all, err := ioutil.ReadAll(resp.Body)
fmt.Printf("%s", all)
}
func addJsonRequestParams(params interface{}) (io.Reader, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(params); err != nil {
return nil, err
}
return ioutil.NopCloser(&buf), nil
}
复制代码
grpc测试
import (
"context"
"fmt"
"go-kit/grep_demo"
"google.golang.org/grpc"
"log"
"net"
"runtime"
"time"
)
type demos struct {
}
func (*demos) Sum(ctx context.Context, req *grep_demo.SumRequest) (*grep_demo.SumReply, error) {
return &grep_demo.SumReply{V: req.A + req.B, Err: ""}, nil
}
func main() {
rpcserver()
runtime.Gosched()
rpcclient()
}
func rpcserver() {
listener, err := net.Listen("tcp", ":8888")
if err != nil {
return
}
server := grpc.NewServer()
dd := demos{}
grep_demo.RegisterAddServer(server, &dd)
go server.Serve(listener)
}
func rpcclient() {
// 1. 创建一个连接
conn, err := grpc.Dial(":8888", grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
return
}
defer conn.Close()
// 2. 然后创建客户端
client := grep_demo.NewAddClient(conn)
now := time.Now()
for x := 0; x < 10000; x++ {
func() {
// 3. rpc调用
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
response, err := client.Sum(ctx, &grep_demo.SumRequest{A: 1, B: 2})
if err != nil {
fmt.Printf("err=%s\n", err)
} else {
fmt.Printf("{\"result\":%d}\n", response.V)
}
}()
}
fmt.Println(time.Now().Sub(now).Milliseconds())
}
复制代码
有疑问加站长微信联系(非本文作者)