From 30802e07b2d84fbc213b490d3402707dffe60096 Mon Sep 17 00:00:00 2001 From: Niall Sheridan Date: Mon, 10 Apr 2017 21:18:42 +0100 Subject: update dependencies --- .../grpc/transport/http2_client.go | 252 ++++++++++++++------- 1 file changed, 172 insertions(+), 80 deletions(-) (limited to 'vendor/google.golang.org/grpc/transport/http2_client.go') diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go index 892f8ba..5fc6b75 100644 --- a/vendor/google.golang.org/grpc/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/transport/http2_client.go @@ -35,12 +35,12 @@ package transport import ( "bytes" - "fmt" "io" "math" "net" "strings" "sync" + "sync/atomic" "time" "golang.org/x/net/context" @@ -49,9 +49,11 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/stats" + "google.golang.org/grpc/status" ) // http2Client implements the ClientTransport interface with HTTP2. @@ -80,6 +82,8 @@ type http2Client struct { // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) // that the server sent GoAway on this transport. goAway chan struct{} + // awakenKeepalive is used to wake up keepalive when after it has gone dormant. + awakenKeepalive chan struct{} framer *framer hBuf *bytes.Buffer // the buffer for HPACK encoding @@ -99,6 +103,11 @@ type http2Client struct { creds []credentials.PerRPCCredentials + // Boolean to keep track of reading activity on transport. + // 1 is true and 0 is false. + activity uint32 // Accessed atomically. + kp keepalive.ClientParameters + statsHandler stats.Handler mu sync.Mutex // guard the following variables @@ -178,15 +187,19 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( return nil, connectionErrorf(temp, err, "transport: %v", err) } } - ua := primaryUA - if opts.UserAgent != "" { - ua = opts.UserAgent + " " + ua + kp := opts.KeepaliveParams + // Validate keepalive parameters. + if kp.Time == 0 { + kp.Time = defaultClientKeepaliveTime + } + if kp.Timeout == 0 { + kp.Timeout = defaultClientKeepaliveTimeout } var buf bytes.Buffer t := &http2Client{ ctx: ctx, target: addr.Addr, - userAgent: ua, + userAgent: opts.UserAgent, md: addr.Metadata, conn: conn, remoteAddr: conn.RemoteAddr(), @@ -198,6 +211,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( shutdownChan: make(chan struct{}), errorChan: make(chan struct{}), goAway: make(chan struct{}), + awakenKeepalive: make(chan struct{}, 1), framer: newFramer(conn), hBuf: &buf, hEnc: hpack.NewEncoder(&buf), @@ -208,10 +222,15 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( state: reachable, activeStreams: make(map[uint32]*Stream), creds: opts.PerRPCCredentials, - maxStreams: math.MaxInt32, + maxStreams: defaultMaxStreamsClient, + streamsQuota: newQuotaPool(defaultMaxStreamsClient), streamSendQuota: defaultWindowSize, + kp: kp, statsHandler: opts.StatsHandler, } + // Make sure awakenKeepalive can't be written upon. + // keepalive routine will make it writable, if need be. + t.awakenKeepalive <- struct{}{} if t.statsHandler != nil { t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ RemoteAddr: t.remoteAddr, @@ -256,6 +275,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( } } go t.controller() + if t.kp.Time != infinity { + go t.keepalive() + } t.writableChan <- 0 return t, nil } @@ -289,7 +311,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { return s } -// NewStream creates a stream and register it into the transport as "active" +// NewStream creates a stream and registers it into the transport as "active" // streams. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { pr := &peer.Peer{ @@ -337,21 +359,18 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Unlock() return nil, ErrConnClosing } - checkStreamsQuota := t.streamsQuota != nil t.mu.Unlock() - if checkStreamsQuota { - sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire()) - if err != nil { - return nil, err - } - // Returns the quota balance back. - if sq > 1 { - t.streamsQuota.add(sq - 1) - } + sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire()) + if err != nil { + return nil, err + } + // Returns the quota balance back. + if sq > 1 { + t.streamsQuota.add(sq - 1) } if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { // Return the quota back now because there is no stream returned to the caller. - if _, ok := err.(StreamError); ok && checkStreamsQuota { + if _, ok := err.(StreamError); ok { t.streamsQuota.add(1) } return nil, err @@ -359,9 +378,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Lock() if t.state == draining { t.mu.Unlock() - if checkStreamsQuota { - t.streamsQuota.add(1) - } + t.streamsQuota.add(1) // Need to make t writable again so that the rpc in flight can still proceed. t.writableChan <- 0 return nil, ErrStreamDrain @@ -373,17 +390,17 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea s := t.newStream(ctx, callHdr) s.clientStatsCtx = userCtx t.activeStreams[s.id] = s - - // This stream is not counted when applySetings(...) initialize t.streamsQuota. - // Reset t.streamsQuota to the right value. - var reset bool - if !checkStreamsQuota && t.streamsQuota != nil { - reset = true + // If the number of active streams change from 0 to 1, then check if keepalive + // has gone dormant. If so, wake it up. + if len(t.activeStreams) == 1 { + select { + case t.awakenKeepalive <- struct{}{}: + t.framer.writePing(false, false, [8]byte{}) + default: + } } + t.mu.Unlock() - if reset { - t.streamsQuota.add(-1) - } // HPACK encodes various headers. Note that once WriteField(...) is // called, the corresponding headers/continuation frame has to be sent @@ -415,7 +432,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea hasMD bool endHeaders bool ) - if md, ok := metadata.FromContext(ctx); ok { + if md, ok := metadata.FromOutgoingContext(ctx); ok { hasMD = true for k, v := range md { // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. @@ -491,15 +508,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea // CloseStream clears the footprint of a stream when the stream is not needed any more. // This must not be executed in reader's goroutine. func (t *http2Client) CloseStream(s *Stream, err error) { - var updateStreams bool t.mu.Lock() if t.activeStreams == nil { t.mu.Unlock() return } - if t.streamsQuota != nil { - updateStreams = true - } delete(t.activeStreams, s.id) if t.state == draining && len(t.activeStreams) == 0 { // The transport is draining and s is the last live stream on t. @@ -508,10 +521,27 @@ func (t *http2Client) CloseStream(s *Stream, err error) { return } t.mu.Unlock() - if updateStreams { - t.streamsQuota.add(1) - } + // rstStream is true in case the stream is being closed at the client-side + // and the server needs to be intimated about it by sending a RST_STREAM + // frame. + // To make sure this frame is written to the wire before the headers of the + // next stream waiting for streamsQuota, we add to streamsQuota pool only + // after having acquired the writableChan to send RST_STREAM out (look at + // the controller() routine). + var rstStream bool + var rstError http2.ErrCode + defer func() { + // In case, the client doesn't have to send RST_STREAM to server + // we can safely add back to streamsQuota pool now. + if !rstStream { + t.streamsQuota.add(1) + return + } + t.controlBuf.put(&resetStream{s.id, rstError}) + }() s.mu.Lock() + rstStream = s.rstStream + rstError = s.rstError if q := s.fc.resetPendingData(); q > 0 { if n := t.fc.onRead(q); n > 0 { t.controlBuf.put(&windowUpdate{0, n}) @@ -527,8 +557,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) { } s.state = streamDone s.mu.Unlock() - if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded { - t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel}) + if _, ok := err.(StreamError); ok { + rstStream = true + rstError = http2.ErrCodeCancel } } @@ -742,7 +773,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) { } func (t *http2Client) handleData(f *http2.DataFrame) { - size := len(f.Data()) + size := f.Header().Length if err := t.fc.onData(uint32(size)); err != nil { t.notifyError(connectionErrorf(true, err, "%v", err)) return @@ -756,6 +787,11 @@ func (t *http2Client) handleData(f *http2.DataFrame) { return } if size > 0 { + if f.Header().Flags.Has(http2.FlagDataPadded) { + if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { + t.controlBuf.put(&windowUpdate{0, w}) + } + } s.mu.Lock() if s.state == streamDone { s.mu.Unlock() @@ -766,22 +802,27 @@ func (t *http2Client) handleData(f *http2.DataFrame) { return } if err := s.fc.onData(uint32(size)); err != nil { - s.state = streamDone - s.statusCode = codes.Internal - s.statusDesc = err.Error() - close(s.done) + s.rstStream = true + s.rstError = http2.ErrCodeFlowControl + s.finish(status.New(codes.Internal, err.Error())) s.mu.Unlock() s.write(recvMsg{err: io.EOF}) - t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) return } + if f.Header().Flags.Has(http2.FlagDataPadded) { + if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { + t.controlBuf.put(&windowUpdate{s.id, w}) + } + } s.mu.Unlock() // TODO(bradfitz, zhaoq): A copy is required here because there is no // guarantee f.Data() is consumed before the arrival of next frame. // Can this copy be eliminated? - data := make([]byte, size) - copy(data, f.Data()) - s.write(recvMsg{data: data}) + if len(f.Data()) > 0 { + data := make([]byte, len(f.Data())) + copy(data, f.Data()) + s.write(recvMsg{data: data}) + } } // The server has closed the stream without sending trailers. Record that // the read direction is closed, and set the status appropriately. @@ -791,10 +832,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) { s.mu.Unlock() return } - s.state = streamDone - s.statusCode = codes.Internal - s.statusDesc = "server closed the stream without sending trailers" - close(s.done) + s.finish(status.New(codes.Internal, "server closed the stream without sending trailers")) s.mu.Unlock() s.write(recvMsg{err: io.EOF}) } @@ -810,18 +848,16 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { s.mu.Unlock() return } - s.state = streamDone if !s.headerDone { close(s.headerChan) s.headerDone = true } - s.statusCode, ok = http2ErrConvTab[http2.ErrCode(f.ErrCode)] + statusCode, ok := http2ErrConvTab[http2.ErrCode(f.ErrCode)] if !ok { grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode) - s.statusCode = codes.Unknown + statusCode = codes.Unknown } - s.statusDesc = fmt.Sprintf("stream terminated by RST_STREAM with error code: %d", f.ErrCode) - close(s.done) + s.finish(status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %d", f.ErrCode)) s.mu.Unlock() s.write(recvMsg{err: io.EOF}) } @@ -849,6 +885,9 @@ func (t *http2Client) handlePing(f *http2.PingFrame) { } func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { + if f.ErrCode == http2.ErrCodeEnhanceYourCalm { + grpclog.Printf("Client received GoAway with http2.ErrCodeEnhanceYourCalm.") + } t.mu.Lock() if t.state == reachable || t.state == draining { if f.LastStreamID > 0 && f.LastStreamID%2 != 1 { @@ -897,18 +936,17 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } var state decodeState for _, hf := range frame.Fields { - state.processHeaderField(hf) - } - if state.err != nil { - s.mu.Lock() - if !s.headerDone { - close(s.headerChan) - s.headerDone = true + if err := state.processHeaderField(hf); err != nil { + s.mu.Lock() + if !s.headerDone { + close(s.headerChan) + s.headerDone = true + } + s.mu.Unlock() + s.write(recvMsg{err: err}) + // Something wrong. Stops reading even when there is remaining. + return } - s.mu.Unlock() - s.write(recvMsg{err: state.err}) - // Something wrong. Stops reading even when there is remaining. - return } endStream := frame.StreamEnded() @@ -951,10 +989,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { if len(state.mdata) > 0 { s.trailer = state.mdata } - s.statusCode = state.statusCode - s.statusDesc = state.statusDesc - close(s.done) - s.state = streamDone + s.finish(state.status()) s.mu.Unlock() s.write(recvMsg{err: io.EOF}) } @@ -982,6 +1017,7 @@ func (t *http2Client) reader() { t.notifyError(err) return } + atomic.CompareAndSwapUint32(&t.activity, 0, 1) sf, ok := frame.(*http2.SettingsFrame) if !ok { t.notifyError(err) @@ -992,6 +1028,7 @@ func (t *http2Client) reader() { // loop to keep reading incoming messages on this transport. for { frame, err := t.framer.readFrame() + atomic.CompareAndSwapUint32(&t.activity, 0, 1) if err != nil { // Abort an active stream if the http2.Framer returns a // http2.StreamError. This can happen only if the server's response @@ -1043,16 +1080,10 @@ func (t *http2Client) applySettings(ss []http2.Setting) { s.Val = math.MaxInt32 } t.mu.Lock() - reset := t.streamsQuota != nil - if !reset { - t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams)) - } ms := t.maxStreams t.maxStreams = int(s.Val) t.mu.Unlock() - if reset { - t.streamsQuota.add(int(s.Val) - ms) - } + t.streamsQuota.add(int(s.Val) - ms) case http2.SettingInitialWindowSize: t.mu.Lock() for _, stream := range t.activeStreams { @@ -1085,6 +1116,12 @@ func (t *http2Client) controller() { t.framer.writeSettings(true, i.ss...) } case *resetStream: + // If the server needs to be to intimated about stream closing, + // then we need to make sure the RST_STREAM frame is written to + // the wire before the headers of the next stream waiting on + // streamQuota. We ensure this by adding to the streamsQuota pool + // only after having acquired the writableChan to send RST_STREAM. + t.streamsQuota.add(1) t.framer.writeRSTStream(true, i.streamID, i.code) case *flushIO: t.framer.flushWrite() @@ -1104,6 +1141,61 @@ func (t *http2Client) controller() { } } +// keepalive running in a separate goroutune makes sure the connection is alive by sending pings. +func (t *http2Client) keepalive() { + p := &ping{data: [8]byte{}} + timer := time.NewTimer(t.kp.Time) + for { + select { + case <-timer.C: + if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { + timer.Reset(t.kp.Time) + continue + } + // Check if keepalive should go dormant. + t.mu.Lock() + if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { + // Make awakenKeepalive writable. + <-t.awakenKeepalive + t.mu.Unlock() + select { + case <-t.awakenKeepalive: + // If the control gets here a ping has been sent + // need to reset the timer with keepalive.Timeout. + case <-t.shutdownChan: + return + } + } else { + t.mu.Unlock() + // Send ping. + t.controlBuf.put(p) + } + + // By the time control gets here a ping has been sent one way or the other. + timer.Reset(t.kp.Timeout) + select { + case <-timer.C: + if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { + timer.Reset(t.kp.Time) + continue + } + t.Close() + return + case <-t.shutdownChan: + if !timer.Stop() { + <-timer.C + } + return + } + case <-t.shutdownChan: + if !timer.Stop() { + <-timer.C + } + return + } + } +} + func (t *http2Client) Error() <-chan struct{} { return t.errorChan } -- cgit v1.2.3