1. http server端
package main

import (
	"log"
	"net/http"
	"time"
)

// sayHello answers every request with the plain-text body "hello world".
func sayHello(w http.ResponseWriter, r *http.Request) {
	// A failed write usually means the client has gone away; nothing more can
	// be sent at that point, so the error is only recorded.
	if _, err := w.Write([]byte("hello world")); err != nil {
		log.Println("Failed to write response,err:", err)
	}
}

// addr is the TCP address the demo server listens on.
var addr = "0.0.0.0:9090"

// newServer builds the demo HTTP server listening on listenAddr.
//
// It creates a ServeMux, registers sayHello under "/test" and returns a
// server whose read and write phases are all bounded by timeouts, so a client
// that never finishes sending its request cannot hold a connection forever.
func newServer(listenAddr string) *http.Server {
	// Create the router and register the route.
	mux := http.NewServeMux()
	mux.HandleFunc("/test", sayHello)

	return &http.Server{
		Addr:              listenAddr,
		ReadHeaderTimeout: time.Second * 3,
		ReadTimeout:       time.Second * 5,
		WriteTimeout:      time.Second * 3,
		Handler:           mux,
	}
}

// main starts the demo server and exits with a fatal log entry when the
// server stops.
func main() {
	server := newServer(addr)
	log.Println("Start to serve at : ", addr)
	// Start serving; ListenAndServe only returns with a non-nil error.
	if err := server.ListenAndServe(); err != nil {
		log.Fatal("Failed to start http server,err:", err)
	}
}
1.1 http server 源码分析
关键步骤:
注册路由
启动服务
连接处理
1.2 http server 源码走读
1.2.1 注册路由
// NewServeMux allocates and returns a new ServeMux.
// The value is created with new, so all of its fields start out zeroed.
func NewServeMux() *ServeMux { return new(ServeMux) }
ServeMux 结构体
// ServeMux holds the registered routes: a pattern-keyed map of entries plus
// the bookkeeping used when looking a request up.
type ServeMux struct {
	mu    sync.RWMutex
	m     map[string]muxEntry
	es    []muxEntry // slice of entries sorted from longest to shortest.
	hosts bool       // whether any patterns contain hostnames
}
muxEntry结构体
// muxEntry pairs a handler with the pattern it was registered under.
type muxEntry struct {
	h       Handler
	pattern string
}
Handler:一个接口,任何实现了 ServeHTTP 方法的类型都可以作为 Handler
// Handler is satisfied by any type that has a ServeHTTP method taking a
// ResponseWriter and a *Request.
type Handler interface {
	ServeHTTP(ResponseWriter, *Request)
}
// 注册handler
// The HandlerFunc type is an adapter to allow the use of
// ordinary functions as HTTP handlers. If f is a function
// with the appropriate signature, HandlerFunc(f) is a
// Handler that calls f.
type HandlerFunc func(ResponseWriter, *Request)

// HandleFunc registers the handler function for the given pattern.
// It panics when handler is nil; otherwise the function is converted to a
// HandlerFunc and passed on to Handle.
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
	if handler == nil {
		panic("http: nil handler")
	}
	mux.Handle(pattern, HandlerFunc(handler))
}
路由注册具体实现
// Handle registers the handler for the given pattern.
// If a handler already exists for pattern, Handle panics.
// It also panics on an empty pattern or a nil handler.
func (mux *ServeMux) Handle(pattern string, handler Handler) {
	mux.mu.Lock()
	defer mux.mu.Unlock()

	// Reject an empty pattern, a nil handler, and a pattern that has
	// already been registered.
	if pattern == "" {
		panic("http: invalid pattern")
	}
	if handler == nil {
		panic("http: nil handler")
	}
	if _, exist := mux.m[pattern]; exist {
		panic("http: multiple registrations for " + pattern)
	}

	// Create the mux.m map on first registration.
	if mux.m == nil {
		mux.m = make(map[string]muxEntry)
	}
	e := muxEntry{h: handler, pattern: pattern}
	// The map key is the pattern, the value is the muxEntry.
	mux.m[pattern] = e
	// Patterns ending in '/' are additionally kept in the sorted mux.es slice.
	if pattern[len(pattern)-1] == '/' {
		mux.es = appendSorted(mux.es, e)
	}

	// A pattern that does not start with '/' sets the hosts flag.
	if pattern[0] != '/' {
		mux.hosts = true
	}
}
因此,注册路由实际上就是构造一个map[pattern] = muxEntry
1.2.2 启动服务
// ListenAndServe listens on the TCP network address srv.Addr and then
// calls Serve to handle requests on incoming connections.
// Accepted connections are configured to enable TCP keep-alives.
//
// If srv.Addr is blank, ":http" is used.
//
// ListenAndServe always returns a non-nil error. After Shutdown or Close,
// the returned error is ErrServerClosed.
func (srv *Server) ListenAndServe() error {
	// Refuse to start again once the server is shutting down.
	if srv.shuttingDown() {
		return ErrServerClosed
	}
	addr := srv.Addr
	if addr == "" {
		addr = ":http"
	}
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	// Hand the TCP listener, wrapped in tcpKeepAliveListener, to Serve.
	return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}
// Serve runs the accept loop on l: every accepted connection is wrapped in a
// conn and served on its own goroutine. The "......" lines mark code the
// article left out of this excerpt.
func (srv *Server) Serve(l net.Listener) error {
	......
	l = &onceCloseListener{Listener: l}
	defer l.Close()
	......
	var tempDelay time.Duration // how long to sleep on accept failure
	for {
		// Accept the next connection from the listener.
		rw, e := l.Accept()
		if e != nil {
			// Check whether the server has been told to shut down.
			select {
			case <-srv.getDoneChan():
				return ErrServerClosed
			default:
			}
			......
		}
		tempDelay = 0
		// Wrap the accepted connection in a conn struct.
		c := srv.newConn(rw)
		c.setState(c.rwc, StateNew) // before Serve can return
		go c.serve(ctx)
	}
}
conn 结构体
// A conn represents the server side of an HTTP connection.
// (Field list as quoted in the article.)
type conn struct {
	server     *Server
	rwc        net.Conn // presumably the connection returned by Accept; confirm against newConn
	remoteAddr string
	r          *connReader
	bufr       *bufio.Reader
	bufw       *bufio.Writer
	mu         sync.Mutex
}
1.2.3 处理连接
// ServeHTTP dispatches the request to the handler whose
// pattern most closely matches the request URL.
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
	// A request URI of "*" is answered with 400 Bad Request; for HTTP/1.1
	// and later the response also carries "Connection: close".
	if r.RequestURI == "*" {
		if r.ProtoAtLeast(1, 1) {
			w.Header().Set("Connection", "close")
		}
		w.WriteHeader(StatusBadRequest)
		return
	}
	// Look up the handler for this request and delegate to it.
	h, _ := mux.Handler(r)
	h.ServeHTTP(w, r)
}
// Handler returns the handler to use for request r together with the
// matching pattern. Depending on the path it may return a redirect handler
// instead of a registered one.
func (mux *ServeMux) Handler(r *Request) (h Handler, pattern string) {

	// CONNECT requests are not canonicalized.
	if r.Method == "CONNECT" {
		// If r.URL.Path is /tree and its handler is not registered,
		// the /tree -> /tree/ redirect applies to CONNECT requests
		// but the path canonicalization does not.
		if u, ok := mux.redirectToPathSlash(r.URL.Host, r.URL.Path, r.URL); ok {
			return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
		}

		return mux.handler(r.Host, r.URL.Path)
	}

	// All other requests have any port stripped and path cleaned
	// before passing to mux.handler.
	host := stripHostPort(r.Host)
	path := cleanPath(r.URL.Path)

	// If the given path is /tree and its handler is not registered,
	// redirect for /tree/.
	if u, ok := mux.redirectToPathSlash(host, path, r.URL); ok {
		return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
	}

	// The cleaned path differs from the requested one: redirect to the
	// cleaned path.
	if path != r.URL.Path {
		_, pattern = mux.handler(host, path)
		url := *r.URL
		url.Path = path
		return RedirectHandler(url.String(), StatusMovedPermanently), pattern
	}

	return mux.handler(host, r.URL.Path)
}
// handler is the main implementation of Handler.
// The path is known to be in canonical form, except for CONNECT methods.
// Lookups run under the read lock; when nothing matches, the result is
// NotFoundHandler() with an empty pattern.
func (mux *ServeMux) handler(host, path string) (h Handler, pattern string) {
	mux.mu.RLock()
	defer mux.mu.RUnlock()

	// Host-specific pattern takes precedence over generic ones
	if mux.hosts {
		h, pattern = mux.match(host + path)
	}
	if h == nil {
		h, pattern = mux.match(path)
	}
	if h == nil {
		h, pattern = NotFoundHandler(), ""
	}
	return
}
match:在第一步注册路由时构建的 map(以及按长度排序的 es 切片)中查找匹配的 handler
// Find a handler on a handler map given a path string.
// Most-specific (longest) pattern wins.
// Returns a nil handler and an empty pattern when nothing matches.
func (mux *ServeMux) match(path string) (h Handler, pattern string) {
	// Check for exact match first.
	v, ok := mux.m[path]
	if ok {
		return v.h, v.pattern
	}

	// Check for longest valid match.  mux.es contains all patterns
	// that end in / sorted from longest to shortest.
	for _, e := range mux.es {
		if strings.HasPrefix(path, e.pattern) {
			return e.h, e.pattern
		}
	}
	return nil, ""
}
主要逻辑如下:
mux.ServeHTTP -> mux.Handler(r) -> mux.handler(host, r.URL.Path) -> mux.match()
获取到 h Handler 之后一步步返回,最终调用 h.ServeHTTP(w, r)。这里的 h 是注册路由时由 HandlerFunc(handler) 包装得到的 Handler,因此也就相当于调用了第一步注册路由时传入的处理函数(本例中的 sayHello)
2. http client端
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net"
	"net/http"
	"time"
)

// newClient builds the demo HTTP client on top of its own connection pool.
func newClient() *http.Client {
	// Connection pool.
	transport := &http.Transport{
		DialContext: (&net.Dialer{
			Timeout:   time.Second * 30, // connect timeout
			KeepAlive: time.Second * 30, // keep-alive probe interval
		}).DialContext,
		MaxIdleConns:          100,              // maximum number of idle connections
		IdleConnTimeout:       time.Second * 90, // how long an idle connection is kept
		TLSHandshakeTimeout:   time.Second * 10, // TLS handshake timeout
		ExpectContinueTimeout: time.Second * 1,  // wait for "100 Continue" timeout
	}
	return &http.Client{
		Transport: transport,
		Timeout:   time.Second * 30, // overall request timeout
	}
}

// fetch issues a GET for url with client and returns the response body.
//
// It returns an error when the request cannot be sent, when the body cannot
// be read, or when the server answers with a non-2xx status. The response
// body is closed before fetch returns.
func fetch(client *http.Client, url string) ([]byte, error) {
	resp, err := client.Get(url)
	if err != nil {
		// resp is nil on error, so it must not be touched (or closed) here.
		return nil, fmt.Errorf("GET %s failed,err: %v", url, err)
	}
	defer resp.Body.Close()

	data, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("Read content failed,err: %v", err)
	}
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return nil, fmt.Errorf("GET %s failed,status: %s", url, resp.Status)
	}
	return data, nil
}

// main fetches the demo endpoint and prints the body it returns.
func main() {
	var url = "http://127.0.0.1:9090/test"

	// log.Fatal is only called here, after fetch has returned and closed the
	// response body, so no deferred cleanup is skipped by the exit.
	data, err := fetch(newClient(), url)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("Data : ", string(data))
}
2.1 http client 源码分析
主要结构体
// Client is the HTTP client type whose Get and Do methods are walked through
// in the article. (Field list as quoted in the article.)
type Client struct {
	// Transport specifies the mechanism by which individual
	// HTTP requests are made.
	// If nil, DefaultTransport is used.
	Transport RoundTripper

	CheckRedirect func(req *Request, via []*Request) error

	Jar CookieJar

	// Timeout specifies a time limit for requests made by this
	// Client. The timeout includes connection time, any
	// redirects, and reading the response body. The timer remains
	// running after Get, Head, Post, or Do return and will
	// interrupt reading of the Response.Body.
	//
	// A Timeout of zero means no timeout.
	Timeout time.Duration
}
// RoundTripper is an interface representing the ability to execute a
// single HTTP transaction, obtaining the Response for a given Request.
//
// A RoundTripper must be safe for concurrent use by multiple
// goroutines.
type RoundTripper interface {
	// RoundTrip executes a single HTTP transaction, returning
	// a Response for the provided Request.
	RoundTrip(*Request) (*Response, error)
}
// Request is the request value handed to Client.Do and
// RoundTripper.RoundTrip. (Field list as quoted in the article.)
type Request struct {
	Method           string
	URL              *url.URL
	Proto            string // "HTTP/1.0"
	Header           Header
	Body             io.ReadCloser
	GetBody          func() (io.ReadCloser, error)
	ContentLength    int64
	TransferEncoding []string
	Close            bool
	Host             string
	Form             url.Values
	PostForm         url.Values
	MultipartForm    *multipart.Form
	RemoteAddr       string
	RequestURI       string
	TLS              *tls.ConnectionState
	Cancel           <-chan struct{}
	Response         *Response
	ctx              context.Context
}
主要处理流程
2.2 http client 源码走读
2.2.1 client.Get
// Get builds a GET request for url with no body and sends it through Do.
// An error from NewRequest is returned with a nil response.
func (c *Client) Get(url string) (resp *Response, err error) {
	req, err := NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	return c.Do(req)
}
// Do sends req; it is a thin exported wrapper around the unexported do.
func (c *Client) Do(req *Request) (*Response, error) {
	return c.do(req)
}
// do is the implementation behind Do. In this excerpt it computes the
// deadline, records the request in reqs and passes it to c.send; the
// "......" lines mark code the article left out.
func (c *Client) do(req *Request) (retres *Response, reterr error) {
	......
	var (
		deadline      = c.deadline()
		reqs          []*Request
		resp          *Response
		copyHeaders   = c.makeHeadersCopier(req)
		reqBodyClosed = false // have we closed the current req.Body?

		// Redirect behavior:
		redirectMethod string
		includeBody    bool
	)
	reqs = append(reqs, req)
	var err error
	var didTimeout func() bool
	if resp, didTimeout, err = c.send(req, deadline); err != nil {
		// c.send() always closes req.Body
		......
	}
	......
}
// send forwards req to the package-level send function, using the
// transport returned by c.transport() and the given deadline.
// didTimeout is non-nil only if err != nil.
// The "......" lines mark code the article left out.
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
	......
	resp, didTimeout, err = send(req, c.transport(), deadline)
	......
	return resp, nil, nil
}
// send issues an HTTP request.
// Caller should close resp.Body when done reading from it.
// The request is executed by calling rt.RoundTrip; the "......" lines mark
// code the article left out.
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
	req := ireq // req is either the original request, or a modified fork
	......
	resp, err = rt.RoundTrip(req)
	......
	return resp, nil, nil
}
2.2.2 连接处理roundTrip()
2.2.2.1 主要逻辑
// roundTrip implements a RoundTripper over HTTP.
// Each loop iteration obtains a connection from getConn and sends the
// request over it; the "......" lines mark code the article left out.
func (t *Transport) roundTrip(req *Request) (*Response, error) {
	......
	for {
		// Get the cached or newly-created connection to either the
		// host (for http or https), the http proxy, or the http proxy
		// pre-CONNECTed to https server. In any case, we'll be ready
		// to send it requests.
		pconn, err := t.getConn(treq, cm)
		......
		var resp *Response
		if pconn.alt != nil {
			// HTTP/2 path.
			t.decHostConnCount(cm.key()) // don't count cached http2 conns toward conns per host
			t.setReqCanceler(req, nil)   // not cancelable with CancelRequest
			resp, err = pconn.alt.RoundTrip(req)
		} else {
			resp, err = pconn.roundTrip(treq) // called once the persistent connection has been obtained
		}
		......
	}
}
获取空闲连接
// getConn returns a persistent connection for cm: an idle one when
// available, otherwise whichever arrives first of a freshly dialed
// connection and an idle connection released by another request.
// Cancellation of the request aborts the wait with an error.
// The "......" line marks code the article left out (names such as req,
// ctx, trace, dialc and cmKey are defined there).
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
	// Try to reuse an idle connection first.
	if pc, idleSince := t.getIdleConn(cm); pc != nil {
		// set request canceler to some non-nil function so we
		// can detect whether it was cleared between now and when
		// we enter roundTrip
		t.setReqCanceler(req, func(error) {})
		return pc, nil
	}
	......
	// handlePendingDial disposes of a dial that is still in flight after
	// this call has already returned: a successful connection is handed to
	// putOrCloseIdleConn, a failed one releases its per-host count.
	handlePendingDial := func() {
		testHookPrePendingDial()
		go func() {
			if v := <-dialc; v.err == nil {
				t.putOrCloseIdleConn(v.pc)
			} else {
				t.decHostConnCount(cmKey)
			}
			testHookPostPendingDial()
		}()
	}

	cancelc := make(chan error, 1)
	t.setReqCanceler(req, func(err error) { cancelc <- err })

	if t.MaxConnsPerHost > 0 {
		select {
		case <-t.incHostConnCount(cmKey): // a per-host limit is configured; continue once this channel yields
		case pc := <-t.getIdleConnCh(cm): // wait for an idle connection to be released
			if trace != nil && trace.GotConn != nil {
				trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
			}
			return pc, nil
		case <-req.Cancel: // watch for cancellation
			return nil, errRequestCanceledConn
		case <-req.Context().Done():
			return nil, req.Context().Err()
		case err := <-cancelc:
			if err == errRequestCanceled {
				err = errRequestCanceledConn
			}
			return nil, err
		}
	}

	// No idle connection was available: dial a new one asynchronously.
	go func() {
		pc, err := t.dialConn(ctx, cm)
		dialc <- dialRes{pc, err}
	}()

	idleConnCh := t.getIdleConnCh(cm)
	select {
	// The dial goroutine delivered its result.
	case v := <-dialc:
		// Our dial finished.
		if v.pc != nil {
			if trace != nil && trace.GotConn != nil && v.pc.alt == nil {
				trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})
			}
			return v.pc, nil
		}
		// Our dial failed. See why to return a nicer error
		// value.
		t.decHostConnCount(cmKey)
		select {
		case <-req.Cancel:
			// It was an error due to cancelation, so prioritize that
			// error value. (Issue 16049)
			return nil, errRequestCanceledConn
		case <-req.Context().Done():
			return nil, req.Context().Err()
		case err := <-cancelc:
			if err == errRequestCanceled {
				err = errRequestCanceledConn
			}
			return nil, err
		default:
			// It wasn't an error due to cancelation, so
			// return the original error message:
			return nil, v.err
		}
	// An idle connection became available.
	case pc := <-idleConnCh:
		// Another request finished first and its net.Conn
		// became available before our dial. Or somebody
		// else's dial that they didn't use.
		// But our dial is still going, so give it away
		// when it finishes:
		handlePendingDial()
		if trace != nil && trace.GotConn != nil {
			trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
		}
		return pc, nil
	case <-req.Cancel:
		handlePendingDial()
		return nil, errRequestCanceledConn
	case <-req.Context().Done():
		handlePendingDial()
		return nil, req.Context().Err()
	case err := <-cancelc:
		handlePendingDial()
		if err == errRequestCanceled {
			err = errRequestCanceledConn
		}
		return nil, err
	}
}
// dialConn creates a new persistConn for cm, gives it buffered reader and
// writer wrappers, and starts its readLoop and writeLoop goroutines before
// returning it. The "......" line marks code the article left out.
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) {
	pconn := &persistConn{
		t:             t,
		cacheKey:      cm.key(),
		reqch:         make(chan requestAndChan, 1),
		writech:       make(chan writeRequest, 1),
		closech:       make(chan struct{}),
		writeErrCh:    make(chan error, 1),
		writeLoopDone: make(chan struct{}),
	}
	......
	pconn.br = bufio.NewReader(pconn)
	pconn.bw = bufio.NewWriter(persistConnWriter{pconn})
	go pconn.readLoop()  // article's note: readLoop listens on pc.reqch (rc := <-pc.reqch)
	go pconn.writeLoop() // article's note: writeLoop listens on writech (wr := <-pc.writech)
	return pconn, nil
}
// getIdleConn removes and returns an idle connection for cm together with
// its idleAt time. It returns a nil connection and the zero time when no
// idle connections are recorded under the key. The idle-connection map is
// accessed under t.idleMu. The "......" line marks code the article left
// out.
func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn, idleSince time.Time) {
	key := cm.key()
	t.idleMu.Lock()
	defer t.idleMu.Unlock()
	for {
		pconns, ok := t.idleConn[key]
		if !ok {
			return nil, time.Time{}
		}
		// Only one connection is left: take it and drop the map entry.
		if len(pconns) == 1 {
			pconn = pconns[0]
			delete(t.idleConn, key)
		} else {
			// Take the connection at the end of the slice (the article
			// describes it as the most recently used one).
			pconn = pconns[len(pconns)-1]
			t.idleConn[key] = pconns[:len(pconns)-1]
		}
		t.idleLRU.remove(pconn)
		......
		return pconn, pconn.idleAt
	}
}
// persistConn is the connection type returned by getConn and dialConn.
// (Field list as quoted in the article.)
type persistConn struct {
	t        *Transport
	cacheKey connectMethodKey
	conn     net.Conn
	br       *bufio.Reader       // from conn
	bw       *bufio.Writer       // to conn
	reqch    chan requestAndChan // written by roundTrip; read by readLoop
	writech  chan writeRequest   // written by roundTrip; read by writeLoop
	mu       sync.Mutex
}
// roundTrip sends req over this persistent connection: it queues the
// request on pc.writech and pc.reqch, then loops on a select until one of
// the watched events occurs. The "......" lines mark code the article left
// out.
// NOTE(review): the excerpt shows no case that receives from resc, the
// channel passed along in requestAndChan; it is presumably among the
// omitted code.
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	......
	// Write the request concurrently with waiting for a response,
	// in case the server decides to reply before reading our full
	// request body.
	startBytesWritten := pc.nwrite
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh} // queue the request on writech

	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{ // queue the request on pc.reqch
		req:        req.Request,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	var respHeaderTimer <-chan time.Time
	cancelChan := req.Request.Cancel
	ctxDoneChan := req.Context().Done()
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh:
			......
		case <-pc.closech:
			......
		case <-respHeaderTimer:
			......
		case <-cancelChan:
			......
		case <-ctxDoneChan:
			......
		}
	}
}
有疑问加站长微信联系(非本文作者)