From 73ef85bc5db590c22689e11be20737a3dd88168f Mon Sep 17 00:00:00 2001 From: Niall Sheridan Date: Wed, 28 Dec 2016 21:18:36 +0000 Subject: Update dependencies --- .../grpc/transport/http2_server.go | 87 ++++++++++++++++++---- 1 file changed, 72 insertions(+), 15 deletions(-) (limited to 'vendor/google.golang.org/grpc/transport/http2_server.go') diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go index a62fb7c..316188e 100644 --- a/vendor/google.golang.org/grpc/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/transport/http2_server.go @@ -50,6 +50,8 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" + "google.golang.org/grpc/stats" + "google.golang.org/grpc/tap" ) // ErrIllegalHeaderWrite indicates that setting header is illegal because of @@ -58,9 +60,13 @@ var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHe // http2Server implements the ServerTransport interface with HTTP2. type http2Server struct { + ctx context.Context conn net.Conn + remoteAddr net.Addr + localAddr net.Addr maxStreamID uint32 // max stream ID ever seen authInfo credentials.AuthInfo // auth info about the connection + inTapHandle tap.ServerInHandle // writableChan synchronizes write access to the transport. // A writer acquires the write lock by receiving a value on writableChan // and releases it by sending on writableChan. @@ -91,12 +97,13 @@ type http2Server struct { // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is // returned if something goes wrong. -func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) { +func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) { framer := newFramer(conn) // Send initial settings as connection preface to client. var settings []http2.Setting // TODO(zhaoq): Have a better way to signal "no limit" because 0 is // permitted in the HTTP2 spec. + maxStreams := config.MaxStreams if maxStreams == 0 { maxStreams = math.MaxUint32 } else { @@ -121,12 +128,16 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI } var buf bytes.Buffer t := &http2Server{ + ctx: context.Background(), conn: conn, - authInfo: authInfo, + remoteAddr: conn.RemoteAddr(), + localAddr: conn.LocalAddr(), + authInfo: config.AuthInfo, framer: framer, hBuf: &buf, hEnc: hpack.NewEncoder(&buf), maxStreams: maxStreams, + inTapHandle: config.InTapHandle, controlBuf: newRecvBuffer(), fc: &inFlow{limit: initialConnWindowSize}, sendQuotaPool: newQuotaPool(defaultWindowSize), @@ -136,13 +147,21 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI activeStreams: make(map[uint32]*Stream), streamSendQuota: defaultWindowSize, } + if stats.On() { + t.ctx = stats.TagConn(t.ctx, &stats.ConnTagInfo{ + RemoteAddr: t.remoteAddr, + LocalAddr: t.localAddr, + }) + connBegin := &stats.ConnBegin{} + stats.HandleConn(t.ctx, connBegin) + } go t.controller() t.writableChan <- 0 return t, nil } // operateHeader takes action on the decoded headers. -func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) { +func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) { buf := newRecvBuffer() s := &Stream{ id: frame.Header().StreamID, @@ -168,12 +187,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( } s.recvCompress = state.encoding if state.timeoutSet { - s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout) + s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout) } else { - s.ctx, s.cancel = context.WithCancel(context.TODO()) + s.ctx, s.cancel = context.WithCancel(t.ctx) } pr := &peer.Peer{ - Addr: t.conn.RemoteAddr(), + Addr: t.remoteAddr, } // Attach Auth info if there is any. if t.authInfo != nil { @@ -195,6 +214,18 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( } s.recvCompress = state.encoding s.method = state.method + if t.inTapHandle != nil { + var err error + info := &tap.Info{ + FullMethodName: state.method, + } + s.ctx, err = t.inTapHandle(s.ctx, info) + if err != nil { + // TODO: Log the real error. + t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream}) + return + } + } t.mu.Lock() if t.state != reachable { t.mu.Unlock() @@ -218,13 +249,26 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( s.windowHandler = func(n int) { t.updateWindow(s, uint32(n)) } + s.ctx = traceCtx(s.ctx, s.method) + if stats.On() { + s.ctx = stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) + inHeader := &stats.InHeader{ + FullMethod: s.method, + RemoteAddr: t.remoteAddr, + LocalAddr: t.localAddr, + Compression: s.recvCompress, + WireLength: int(frame.Header().Length), + } + stats.HandleRPC(s.ctx, inHeader) + } handle(s) return } // HandleStreams receives incoming streams using the given handler. This is // typically run in a separate goroutine. -func (t *http2Server) HandleStreams(handle func(*Stream)) { +// traceCtx attaches trace to ctx and returns the new context. +func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { // Check the validity of client preface. preface := make([]byte, len(clientPreface)) if _, err := io.ReadFull(t.conn, preface); err != nil { @@ -279,7 +323,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) { } switch frame := frame.(type) { case *http2.MetaHeadersFrame: - if t.operateHeaders(frame, handle) { + if t.operateHeaders(frame, handle, traceCtx) { t.Close() break } @@ -492,9 +536,16 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry}) } } + bufLen := t.hBuf.Len() if err := t.writeHeaders(s, t.hBuf, false); err != nil { return err } + if stats.On() { + outHeader := &stats.OutHeader{ + WireLength: bufLen, + } + stats.HandleRPC(s.Context(), outHeader) + } t.writableChan <- 0 return nil } @@ -547,10 +598,17 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry}) } } + bufLen := t.hBuf.Len() if err := t.writeHeaders(s, t.hBuf, true); err != nil { t.Close() return err } + if stats.On() { + outTrailer := &stats.OutTrailer{ + WireLength: bufLen, + } + stats.HandleRPC(s.Context(), outTrailer) + } t.closeStream(s) t.writableChan <- 0 return nil @@ -579,19 +637,14 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { return nil } size := http2MaxFrameLen - s.sendQuotaPool.add(0) // Wait until the stream has some quota to send the data. sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire()) if err != nil { return err } - t.sendQuotaPool.add(0) // Wait until the transport has some quota to send the data. tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire()) if err != nil { - if _, ok := err.(StreamError); ok { - t.sendQuotaPool.cancel() - } return err } if sq < size { @@ -659,7 +712,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) { t.mu.Lock() defer t.mu.Unlock() for _, stream := range t.activeStreams { - stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota)) + stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota)) } t.streamSendQuota = s.Val } @@ -736,6 +789,10 @@ func (t *http2Server) Close() (err error) { for _, s := range streams { s.cancel() } + if stats.On() { + connEnd := &stats.ConnEnd{} + stats.HandleConn(t.ctx, connEnd) + } return } @@ -767,7 +824,7 @@ func (t *http2Server) closeStream(s *Stream) { } func (t *http2Server) RemoteAddr() net.Addr { - return t.conn.RemoteAddr() + return t.remoteAddr } func (t *http2Server) Drain() { -- cgit v1.2.3