From 7b320119ba532fd409ec7dade7ad02011c309599 Mon Sep 17 00:00:00 2001 From: Niall Sheridan Date: Wed, 18 Oct 2017 13:15:14 +0100 Subject: Update dependencies --- .../grpc/transport/http2_client.go | 969 ++++++++++++--------- 1 file changed, 535 insertions(+), 434 deletions(-) (limited to 'vendor/google.golang.org/grpc/transport/http2_client.go') diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go index 5fc6b75..6ca6cc6 100644 --- a/vendor/google.golang.org/grpc/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/transport/http2_client.go @@ -1,33 +1,18 @@ /* * - * Copyright 2014, Google Inc. - * All rights reserved. + * Copyright 2014 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ @@ -48,7 +33,6 @@ import ( "golang.org/x/net/http2/hpack" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" - "google.golang.org/grpc/grpclog" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" @@ -59,6 +43,7 @@ import ( // http2Client implements the ClientTransport interface with HTTP2. type http2Client struct { ctx context.Context + cancel context.CancelFunc target string // server name/addr userAgent string md interface{} @@ -68,17 +53,6 @@ type http2Client struct { authInfo credentials.AuthInfo // auth info about the connection nextID uint32 // the next stream ID to be used - // writableChan synchronizes write access to the transport. - // A writer acquires the write lock by sending a value on writableChan - // and releases it by receiving from writableChan. - writableChan chan int - // shutdownChan is closed when Close is called. - // Blocking operations should select on shutdownChan to avoid - // blocking forever after Close. - // TODO(zhaoq): Maybe have a channel context? - shutdownChan chan struct{} - // errorChan is closed to notify the I/O error to the caller. - errorChan chan struct{} // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) // that the server sent GoAway on this transport. goAway chan struct{} @@ -91,7 +65,7 @@ type http2Client struct { // controlBuf delivers all the control related tasks (e.g., window // updates, reset streams, and various settings) to the controller. - controlBuf *recvBuffer + controlBuf *controlBuffer fc *inFlow // sendQuotaPool provides flow control to outbound message. sendQuotaPool *quotaPool @@ -101,6 +75,8 @@ type http2Client struct { // The scheme used: https if TLS is on, http otherwise. scheme string + isSecure bool + creds []credentials.PerRPCCredentials // Boolean to keep track of reading activity on transport. @@ -110,6 +86,11 @@ type http2Client struct { statsHandler stats.Handler + initialWindowSize int32 + + bdpEst *bdpEstimator + outQuotaVersion uint32 + mu sync.Mutex // guard the following variables state transportState // the state of underlying connection activeStreams map[uint32]*Stream @@ -117,17 +98,18 @@ type http2Client struct { maxStreams int // the per-stream outbound flow control window size set by the peer. streamSendQuota uint32 - // goAwayID records the Last-Stream-ID in the GoAway frame from the server. - goAwayID uint32 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. prevGoAwayID uint32 + // goAwayReason records the http2.ErrCode and debug data received with the + // GoAway frame. + goAwayReason GoAwayReason } func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { if fn != nil { return fn(ctx, addr) } - return dialContext(ctx, "tcp", addr) + return (&net.Dialer{}).DialContext(ctx, "tcp", addr) } func isTemporary(err error) bool { @@ -161,14 +143,23 @@ func isTemporary(err error) bool { // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 // and starts to receive messages on it. Non-nil error returns if construction // fails. -func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (_ ClientTransport, err error) { +func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, timeout time.Duration) (_ ClientTransport, err error) { scheme := "http" - conn, err := dial(ctx, opts.Dialer, addr.Addr) + ctx, cancel := context.WithCancel(ctx) + connectCtx, connectCancel := context.WithTimeout(ctx, timeout) + defer func() { + connectCancel() + if err != nil { + cancel() + } + }() + + conn, err := dial(connectCtx, opts.Dialer, addr.Addr) if err != nil { if opts.FailOnNonTempDialError { - return nil, connectionErrorf(isTemporary(err), err, "transport: %v", err) + return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err) } - return nil, connectionErrorf(true, err, "transport: %v", err) + return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err) } // Any further errors will close the underlying connection defer func(conn net.Conn) { @@ -176,16 +167,20 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( conn.Close() } }(conn) - var authInfo credentials.AuthInfo + var ( + isSecure bool + authInfo credentials.AuthInfo + ) if creds := opts.TransportCredentials; creds != nil { scheme = "https" - conn, authInfo, err = creds.ClientHandshake(ctx, addr.Addr, conn) + conn, authInfo, err = creds.ClientHandshake(connectCtx, addr.Addr, conn) if err != nil { // Credentials handshake errors are typically considered permanent // to avoid retrying on e.g. bad certificates. temp := isTemporary(err) - return nil, connectionErrorf(temp, err, "transport: %v", err) + return nil, connectionErrorf(temp, err, "transport: authentication handshake failed: %v", err) } + isSecure = true } kp := opts.KeepaliveParams // Validate keepalive parameters. @@ -195,9 +190,24 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( if kp.Timeout == 0 { kp.Timeout = defaultClientKeepaliveTimeout } + dynamicWindow := true + icwz := int32(initialWindowSize) + if opts.InitialConnWindowSize >= defaultWindowSize { + icwz = opts.InitialConnWindowSize + dynamicWindow = false + } var buf bytes.Buffer + writeBufSize := defaultWriteBufSize + if opts.WriteBufferSize > 0 { + writeBufSize = opts.WriteBufferSize + } + readBufSize := defaultReadBufSize + if opts.ReadBufferSize > 0 { + readBufSize = opts.ReadBufferSize + } t := &http2Client{ ctx: ctx, + cancel: cancel, target: addr.Addr, userAgent: opts.UserAgent, md: addr.Metadata, @@ -206,27 +216,36 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( localAddr: conn.LocalAddr(), authInfo: authInfo, // The client initiated stream id is odd starting from 1. - nextID: 1, - writableChan: make(chan int, 1), - shutdownChan: make(chan struct{}), - errorChan: make(chan struct{}), - goAway: make(chan struct{}), - awakenKeepalive: make(chan struct{}, 1), - framer: newFramer(conn), - hBuf: &buf, - hEnc: hpack.NewEncoder(&buf), - controlBuf: newRecvBuffer(), - fc: &inFlow{limit: initialConnWindowSize}, - sendQuotaPool: newQuotaPool(defaultWindowSize), - scheme: scheme, - state: reachable, - activeStreams: make(map[uint32]*Stream), - creds: opts.PerRPCCredentials, - maxStreams: defaultMaxStreamsClient, - streamsQuota: newQuotaPool(defaultMaxStreamsClient), - streamSendQuota: defaultWindowSize, - kp: kp, - statsHandler: opts.StatsHandler, + nextID: 1, + goAway: make(chan struct{}), + awakenKeepalive: make(chan struct{}, 1), + hBuf: &buf, + hEnc: hpack.NewEncoder(&buf), + framer: newFramer(conn, writeBufSize, readBufSize), + controlBuf: newControlBuffer(), + fc: &inFlow{limit: uint32(icwz)}, + sendQuotaPool: newQuotaPool(defaultWindowSize), + scheme: scheme, + state: reachable, + activeStreams: make(map[uint32]*Stream), + isSecure: isSecure, + creds: opts.PerRPCCredentials, + maxStreams: defaultMaxStreamsClient, + streamsQuota: newQuotaPool(defaultMaxStreamsClient), + streamSendQuota: defaultWindowSize, + kp: kp, + statsHandler: opts.StatsHandler, + initialWindowSize: initialWindowSize, + } + if opts.InitialWindowSize >= defaultWindowSize { + t.initialWindowSize = opts.InitialWindowSize + dynamicWindow = false + } + if dynamicWindow { + t.bdpEst = &bdpEstimator{ + bdp: initialWindowSize, + updateFlowControl: t.updateFlowControl, + } } // Make sure awakenKeepalive can't be written upon. // keepalive routine will make it writable, if need be. @@ -249,65 +268,75 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( n, err := t.conn.Write(clientPreface) if err != nil { t.Close() - return nil, connectionErrorf(true, err, "transport: %v", err) + return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err) } if n != len(clientPreface) { t.Close() return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) } - if initialWindowSize != defaultWindowSize { - err = t.framer.writeSettings(true, http2.Setting{ + if t.initialWindowSize != defaultWindowSize { + err = t.framer.fr.WriteSettings(http2.Setting{ ID: http2.SettingInitialWindowSize, - Val: uint32(initialWindowSize), + Val: uint32(t.initialWindowSize), }) } else { - err = t.framer.writeSettings(true) + err = t.framer.fr.WriteSettings() } if err != nil { t.Close() - return nil, connectionErrorf(true, err, "transport: %v", err) + return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err) } // Adjust the connection flow control window if needed. - if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 { - if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil { + if delta := uint32(icwz - defaultWindowSize); delta > 0 { + if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil { t.Close() - return nil, connectionErrorf(true, err, "transport: %v", err) + return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err) } } - go t.controller() + t.framer.writer.Flush() + go func() { + loopyWriter(t.ctx, t.controlBuf, t.itemHandler) + t.Close() + }() if t.kp.Time != infinity { go t.keepalive() } - t.writableChan <- 0 return t, nil } func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { // TODO(zhaoq): Handle uint32 overflow of Stream.id. s := &Stream{ - id: t.nextID, - done: make(chan struct{}), - goAway: make(chan struct{}), - method: callHdr.Method, - sendCompress: callHdr.SendCompress, - buf: newRecvBuffer(), - fc: &inFlow{limit: initialWindowSize}, - sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), - headerChan: make(chan struct{}), + id: t.nextID, + done: make(chan struct{}), + goAway: make(chan struct{}), + method: callHdr.Method, + sendCompress: callHdr.SendCompress, + buf: newRecvBuffer(), + fc: &inFlow{limit: uint32(t.initialWindowSize)}, + sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), + localSendQuota: newQuotaPool(defaultLocalSendQuota), + headerChan: make(chan struct{}), } t.nextID += 2 - s.windowHandler = func(n int) { - t.updateWindow(s, uint32(n)) + s.requestRead = func(n int) { + t.adjustWindow(s, uint32(n)) } // The client side stream context should have exactly the same life cycle with the user provided context. // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done. // So we use the original context here instead of creating a copy. s.ctx = ctx - s.dec = &recvBufferReader{ - ctx: s.ctx, - goAway: s.goAway, - recv: s.buf, + s.trReader = &transportReader{ + reader: &recvBufferReader{ + ctx: s.ctx, + goAway: s.goAway, + recv: s.buf, + }, + windowHandler: func(n int) { + t.updateWindow(s, uint32(n)) + }, } + return s } @@ -321,31 +350,51 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea if t.authInfo != nil { pr.AuthInfo = t.authInfo } - userCtx := ctx ctx = peer.NewContext(ctx, pr) - authData := make(map[string]string) - for _, c := range t.creds { + var ( + authData = make(map[string]string) + audience string + ) + // Create an audience string only if needed. + if len(t.creds) > 0 || callHdr.Creds != nil { // Construct URI required to get auth request metadata. - var port string - if pos := strings.LastIndex(t.target, ":"); pos != -1 { - // Omit port if it is the default one. - if t.target[pos+1:] != "443" { - port = ":" + t.target[pos+1:] - } - } + // Omit port if it is the default one. + host := strings.TrimSuffix(callHdr.Host, ":443") pos := strings.LastIndex(callHdr.Method, "/") if pos == -1 { - return nil, streamErrorf(codes.InvalidArgument, "transport: malformed method name: %q", callHdr.Method) + pos = len(callHdr.Method) } - audience := "https://" + callHdr.Host + port + callHdr.Method[:pos] + audience = "https://" + host + callHdr.Method[:pos] + } + for _, c := range t.creds { data, err := c.GetRequestMetadata(ctx, audience) if err != nil { - return nil, streamErrorf(codes.InvalidArgument, "transport: %v", err) + return nil, streamErrorf(codes.Internal, "transport: %v", err) } for k, v := range data { + // Capital header names are illegal in HTTP/2. + k = strings.ToLower(k) authData[k] = v } } + callAuthData := map[string]string{} + // Check if credentials.PerRPCCredentials were provided via call options. + // Note: if these credentials are provided both via dial options and call + // options, then both sets of credentials will be applied. + if callCreds := callHdr.Creds; callCreds != nil { + if !t.isSecure && callCreds.RequireTransportSecurity() { + return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection") + } + data, err := callCreds.GetRequestMetadata(ctx, audience) + if err != nil { + return nil, streamErrorf(codes.Internal, "transport: %v", err) + } + for k, v := range data { + // Capital header names are illegal in HTTP/2 + k = strings.ToLower(k) + callAuthData[k] = v + } + } t.mu.Lock() if t.activeStreams == nil { t.mu.Unlock() @@ -360,7 +409,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea return nil, ErrConnClosing } t.mu.Unlock() - sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire()) + sq, err := wait(ctx, t.ctx, nil, nil, t.streamsQuota.acquire()) if err != nil { return nil, err } @@ -368,140 +417,108 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea if sq > 1 { t.streamsQuota.add(sq - 1) } - if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { - // Return the quota back now because there is no stream returned to the caller. - if _, ok := err.(StreamError); ok { - t.streamsQuota.add(1) - } - return nil, err - } - t.mu.Lock() - if t.state == draining { - t.mu.Unlock() - t.streamsQuota.add(1) - // Need to make t writable again so that the rpc in flight can still proceed. - t.writableChan <- 0 - return nil, ErrStreamDrain - } - if t.state != reachable { - t.mu.Unlock() - return nil, ErrConnClosing - } - s := t.newStream(ctx, callHdr) - s.clientStatsCtx = userCtx - t.activeStreams[s.id] = s - // If the number of active streams change from 0 to 1, then check if keepalive - // has gone dormant. If so, wake it up. - if len(t.activeStreams) == 1 { - select { - case t.awakenKeepalive <- struct{}{}: - t.framer.writePing(false, false, [8]byte{}) - default: - } - } - - t.mu.Unlock() - - // HPACK encodes various headers. Note that once WriteField(...) is - // called, the corresponding headers/continuation frame has to be sent - // because hpack.Encoder is stateful. - t.hBuf.Reset() - t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"}) - t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme}) - t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method}) - t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host}) - t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) - t.hEnc.WriteField(hpack.HeaderField{Name: "user-agent", Value: t.userAgent}) - t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"}) + // TODO(mmukhi): Benchmark if the perfomance gets better if count the metadata and other header fields + // first and create a slice of that exact size. + // Make the slice of certain predictable size to reduce allocations made by append. + hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te + hfLen += len(authData) + len(callAuthData) + headerFields := make([]hpack.HeaderField, 0, hfLen) + headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"}) + headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme}) + headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method}) + headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host}) + headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) + headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent}) + headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"}) if callHdr.SendCompress != "" { - t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress}) + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress}) } if dl, ok := ctx.Deadline(); ok { // Send out timeout regardless its value. The server can detect timeout context by itself. + // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire. timeout := dl.Sub(time.Now()) - t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)}) + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)}) } - for k, v := range authData { - // Capital header names are illegal in HTTP/2. - k = strings.ToLower(k) - t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: v}) + headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) + } + for k, v := range callAuthData { + headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) + } + if b := stats.OutgoingTags(ctx); b != nil { + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)}) + } + if b := stats.OutgoingTrace(ctx); b != nil { + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)}) } - var ( - hasMD bool - endHeaders bool - ) if md, ok := metadata.FromOutgoingContext(ctx); ok { - hasMD = true - for k, v := range md { + for k, vv := range md { // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. if isReservedHeader(k) { continue } - for _, entry := range v { - t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry}) + for _, v := range vv { + headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) } } } if md, ok := t.md.(*metadata.MD); ok { - for k, v := range *md { + for k, vv := range *md { if isReservedHeader(k) { continue } - for _, entry := range v { - t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry}) + for _, v := range vv { + headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) } } } - first := true - bufLen := t.hBuf.Len() - // Sends the headers in a single batch even when they span multiple frames. - for !endHeaders { - size := t.hBuf.Len() - if size > http2MaxFrameLen { - size = http2MaxFrameLen - } else { - endHeaders = true - } - var flush bool - if endHeaders && (hasMD || callHdr.Flush) { - flush = true - } - if first { - // Sends a HeadersFrame to server to start a new stream. - p := http2.HeadersFrameParam{ - StreamID: s.id, - BlockFragment: t.hBuf.Next(size), - EndStream: false, - EndHeaders: endHeaders, - } - // Do a force flush for the buffered frames iff it is the last headers frame - // and there is header metadata to be sent. Otherwise, there is flushing until - // the corresponding data frame is written. - err = t.framer.writeHeaders(flush, p) - first = false - } else { - // Sends Continuation frames for the leftover headers. - err = t.framer.writeContinuation(flush, s.id, endHeaders, t.hBuf.Next(size)) - } - if err != nil { - t.notifyError(err) - return nil, connectionErrorf(true, err, "transport: %v", err) + t.mu.Lock() + if t.state == draining { + t.mu.Unlock() + t.streamsQuota.add(1) + return nil, ErrStreamDrain + } + if t.state != reachable { + t.mu.Unlock() + return nil, ErrConnClosing + } + s := t.newStream(ctx, callHdr) + t.activeStreams[s.id] = s + // If the number of active streams change from 0 to 1, then check if keepalive + // has gone dormant. If so, wake it up. + if len(t.activeStreams) == 1 { + select { + case t.awakenKeepalive <- struct{}{}: + t.controlBuf.put(&ping{data: [8]byte{}}) + // Fill the awakenKeepalive channel again as this channel must be + // kept non-writable except at the point that the keepalive() + // goroutine is waiting either to be awaken or shutdown. + t.awakenKeepalive <- struct{}{} + default: } } + t.controlBuf.put(&headerFrame{ + streamID: s.id, + hf: headerFields, + endStream: false, + }) + t.mu.Unlock() + + s.mu.Lock() + s.bytesSent = true + s.mu.Unlock() + if t.statsHandler != nil { outHeader := &stats.OutHeader{ Client: true, - WireLength: bufLen, FullMethod: callHdr.Method, RemoteAddr: t.remoteAddr, LocalAddr: t.localAddr, Compression: callHdr.SendCompress, } - t.statsHandler.HandleRPC(s.clientStatsCtx, outHeader) + t.statsHandler.HandleRPC(s.ctx, outHeader) } - t.writableChan <- 0 return s, nil } @@ -513,6 +530,10 @@ func (t *http2Client) CloseStream(s *Stream, err error) { t.mu.Unlock() return } + if err != nil { + // notify in-flight streams, before the deletion + s.write(recvMsg{err: err}) + } delete(t.activeStreams, s.id) if t.state == draining && len(t.activeStreams) == 0 { // The transport is draining and s is the last live stream on t. @@ -542,11 +563,6 @@ func (t *http2Client) CloseStream(s *Stream, err error) { s.mu.Lock() rstStream = s.rstStream rstError = s.rstError - if q := s.fc.resetPendingData(); q > 0 { - if n := t.fc.onRead(q); n > 0 { - t.controlBuf.put(&windowUpdate{0, n}) - } - } if s.state == streamDone { s.mu.Unlock() return @@ -572,12 +588,9 @@ func (t *http2Client) Close() (err error) { t.mu.Unlock() return } - if t.state == reachable || t.state == draining { - close(t.errorChan) - } t.state = closing t.mu.Unlock() - close(t.shutdownChan) + t.cancel() err = t.conn.Close() t.mu.Lock() streams := t.activeStreams @@ -599,41 +612,18 @@ func (t *http2Client) Close() (err error) { } t.statsHandler.HandleConn(t.ctx, connEnd) } - return + return err } +// GracefulClose sets the state to draining, which prevents new streams from +// being created and causes the transport to be closed when the last active +// stream is closed. If there are no active streams, the transport is closed +// immediately. This does nothing if the transport is already draining or +// closing. func (t *http2Client) GracefulClose() error { t.mu.Lock() switch t.state { - case unreachable: - // The server may close the connection concurrently. t is not available for - // any streams. Close it now. - t.mu.Unlock() - t.Close() - return nil - case closing: - t.mu.Unlock() - return nil - } - // Notify the streams which were initiated after the server sent GOAWAY. - select { - case <-t.goAway: - n := t.prevGoAwayID - if n == 0 && t.nextID > 1 { - n = t.nextID - 2 - } - m := t.goAwayID + 2 - if m == 2 { - m = 1 - } - for i := m; i <= n; i += 2 { - if s, ok := t.activeStreams[i]; ok { - close(s.goAway) - } - } - default: - } - if t.state == draining { + case closing, draining: t.mu.Unlock() return nil } @@ -648,21 +638,38 @@ func (t *http2Client) GracefulClose() error { // Write formats the data into HTTP2 data frame(s) and sends it out. The caller // should proceed only if Write returns nil. -// TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later -// if it improves the performance. -func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { - r := bytes.NewBuffer(data) - for { - var p []byte - if r.Len() > 0 { +func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { + select { + case <-s.ctx.Done(): + return ContextErr(s.ctx.Err()) + case <-t.ctx.Done(): + return ErrConnClosing + default: + } + + if hdr == nil && data == nil && opts.Last { + // stream.CloseSend uses this to send an empty frame with endStream=True + t.controlBuf.put(&dataFrame{streamID: s.id, endStream: true, f: func() {}}) + return nil + } + // Add data to header frame so that we can equally distribute data across frames. + emptyLen := http2MaxFrameLen - len(hdr) + if emptyLen > len(data) { + emptyLen = len(data) + } + hdr = append(hdr, data[:emptyLen]...) + data = data[emptyLen:] + for idx, r := range [][]byte{hdr, data} { + for len(r) > 0 { size := http2MaxFrameLen // Wait until the stream has some quota to send the data. - sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire()) + quotaChan, quotaVer := s.sendQuotaPool.acquireWithVersion() + sq, err := wait(s.ctx, t.ctx, s.done, s.goAway, quotaChan) if err != nil { return err } // Wait until the transport has some quota to send the data. - tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire()) + tq, err := wait(s.ctx, t.ctx, s.done, s.goAway, t.sendQuotaPool.acquire()) if err != nil { return err } @@ -672,69 +679,51 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { if tq < size { size = tq } - p = r.Next(size) - ps := len(p) - if ps < sq { - // Overbooked stream quota. Return it back. - s.sendQuotaPool.add(sq - ps) + if size > len(r) { + size = len(r) } + p := r[:size] + ps := len(p) if ps < tq { // Overbooked transport quota. Return it back. t.sendQuotaPool.add(tq - ps) } - } - var ( - endStream bool - forceFlush bool - ) - if opts.Last && r.Len() == 0 { - endStream = true - } - // Indicate there is a writer who is about to write a data frame. - t.framer.adjustNumWriters(1) - // Got some quota. Try to acquire writing privilege on the transport. - if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil { - if _, ok := err.(StreamError); ok || err == io.EOF { - // Return the connection quota back. - t.sendQuotaPool.add(len(p)) + // Acquire local send quota to be able to write to the controlBuf. + ltq, err := wait(s.ctx, t.ctx, s.done, s.goAway, s.localSendQuota.acquire()) + if err != nil { + if _, ok := err.(ConnectionError); !ok { + t.sendQuotaPool.add(ps) + } + return err } - if t.framer.adjustNumWriters(-1) == 0 { - // This writer is the last one in this batch and has the - // responsibility to flush the buffered frames. It queues - // a flush request to controlBuf instead of flushing directly - // in order to avoid the race with other writing or flushing. - t.controlBuf.put(&flushIO{}) + s.localSendQuota.add(ltq - ps) // It's ok if we make it negative. + var endStream bool + // See if this is the last frame to be written. + if opts.Last { + if len(r)-size == 0 { // No more data in r after this iteration. + if idx == 0 { // We're writing data header. + if len(data) == 0 { // There's no data to follow. + endStream = true + } + } else { // We're writing data. + endStream = true + } + } } - return err - } - select { - case <-s.ctx.Done(): - t.sendQuotaPool.add(len(p)) - if t.framer.adjustNumWriters(-1) == 0 { - t.controlBuf.put(&flushIO{}) + success := func() { + t.controlBuf.put(&dataFrame{streamID: s.id, endStream: endStream, d: p, f: func() { s.localSendQuota.add(ps) }}) + if ps < sq { + s.sendQuotaPool.lockedAdd(sq - ps) + } + r = r[ps:] + } + failure := func() { + s.sendQuotaPool.lockedAdd(sq) + } + if !s.sendQuotaPool.compareAndExecute(quotaVer, success, failure) { + t.sendQuotaPool.add(ps) + s.localSendQuota.add(ps) } - t.writableChan <- 0 - return ContextErr(s.ctx.Err()) - default: - } - if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 { - // Do a force flush iff this is last frame for the entire gRPC message - // and the caller is the only writer at this moment. - forceFlush = true - } - // If WriteData fails, all the pending streams will be handled - // by http2Client.Close(). No explicit CloseStream() needs to be - // invoked. - if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil { - t.notifyError(err) - return connectionErrorf(true, err, "transport: %v", err) - } - if t.framer.adjustNumWriters(-1) == 0 { - t.framer.flushWrite() - } - t.writableChan <- 0 - if r.Len() == 0 { - break } } if !opts.Last { @@ -755,6 +744,24 @@ func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) { return s, ok } +// adjustWindow sends out extra window update over the initial window size +// of stream if the application is requesting data larger in size than +// the window. +func (t *http2Client) adjustWindow(s *Stream, n uint32) { + s.mu.Lock() + defer s.mu.Unlock() + if s.state == streamDone { + return + } + if w := s.fc.maybeAdjust(n); w > 0 { + // Piggyback connection's window update along. + if cw := t.fc.resetPendingUpdate(); cw > 0 { + t.controlBuf.put(&windowUpdate{0, cw}) + } + t.controlBuf.put(&windowUpdate{s.id, w}) + } +} + // updateWindow adjusts the inbound quota for the stream and the transport. // Window updates will deliver to the controller for sending when // the cumulative quota exceeds the corresponding threshold. @@ -764,41 +771,76 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) { if s.state == streamDone { return } - if w := t.fc.onRead(n); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } if w := s.fc.onRead(n); w > 0 { + if cw := t.fc.resetPendingUpdate(); cw > 0 { + t.controlBuf.put(&windowUpdate{0, cw}) + } t.controlBuf.put(&windowUpdate{s.id, w}) } } +// updateFlowControl updates the incoming flow control windows +// for the transport and the stream based on the current bdp +// estimation. +func (t *http2Client) updateFlowControl(n uint32) { + t.mu.Lock() + for _, s := range t.activeStreams { + s.fc.newLimit(n) + } + t.initialWindowSize = int32(n) + t.mu.Unlock() + t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n)}) + t.controlBuf.put(&settings{ + ack: false, + ss: []http2.Setting{ + { + ID: http2.SettingInitialWindowSize, + Val: uint32(n), + }, + }, + }) +} + func (t *http2Client) handleData(f *http2.DataFrame) { size := f.Header().Length - if err := t.fc.onData(uint32(size)); err != nil { - t.notifyError(connectionErrorf(true, err, "%v", err)) - return + var sendBDPPing bool + if t.bdpEst != nil { + sendBDPPing = t.bdpEst.add(uint32(size)) + } + // Decouple connection's flow control from application's read. + // An update on connection's flow control should not depend on + // whether user application has read the data or not. Such a + // restriction is already imposed on the stream's flow control, + // and therefore the sender will be blocked anyways. + // Decoupling the connection flow control will prevent other + // active(fast) streams from starving in presence of slow or + // inactive streams. + // + // Furthermore, if a bdpPing is being sent out we can piggyback + // connection's window update for the bytes we just received. + if sendBDPPing { + if size != 0 { // Could've been an empty data frame. + t.controlBuf.put(&windowUpdate{0, uint32(size)}) + } + t.controlBuf.put(bdpPing) + } else { + if err := t.fc.onData(uint32(size)); err != nil { + t.Close() + return + } + if w := t.fc.onRead(uint32(size)); w > 0 { + t.controlBuf.put(&windowUpdate{0, w}) + } } // Select the right stream to dispatch. s, ok := t.getStream(f) if !ok { - if w := t.fc.onRead(uint32(size)); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } return } if size > 0 { - if f.Header().Flags.Has(http2.FlagDataPadded) { - if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } - } s.mu.Lock() if s.state == streamDone { s.mu.Unlock() - // The stream has been closed. Release the corresponding quota. - if w := t.fc.onRead(uint32(size)); w > 0 { - t.controlBuf.put(&windowUpdate{0, w}) - } return } if err := s.fc.onData(uint32(size)); err != nil { @@ -854,10 +896,10 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { } statusCode, ok := http2ErrConvTab[http2.ErrCode(f.ErrCode)] if !ok { - grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode) + warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode) statusCode = codes.Unknown } - s.finish(status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %d", f.ErrCode)) + s.finish(status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode)) s.mu.Unlock() s.write(recvMsg{err: io.EOF}) } @@ -876,7 +918,11 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) { } func (t *http2Client) handlePing(f *http2.PingFrame) { - if f.IsAck() { // Do nothing. + if f.IsAck() { + // Maybe it's a BDP ping. + if t.bdpEst != nil { + t.bdpEst.calculate(f.Data) + } return } pingAck := &ping{ack: true} @@ -885,35 +931,76 @@ func (t *http2Client) handlePing(f *http2.PingFrame) { } func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { + t.mu.Lock() + if t.state != reachable && t.state != draining { + t.mu.Unlock() + return + } if f.ErrCode == http2.ErrCodeEnhanceYourCalm { - grpclog.Printf("Client received GoAway with http2.ErrCodeEnhanceYourCalm.") + infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.") } - t.mu.Lock() - if t.state == reachable || t.state == draining { - if f.LastStreamID > 0 && f.LastStreamID%2 != 1 { - t.mu.Unlock() - t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID)) - return - } - select { - case <-t.goAway: - id := t.goAwayID - // t.goAway has been closed (i.e.,multiple GoAways). - if id < f.LastStreamID { - t.mu.Unlock() - t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID)) - return - } - t.prevGoAwayID = id - t.goAwayID = f.LastStreamID + id := f.LastStreamID + if id > 0 && id%2 != 1 { + t.mu.Unlock() + t.Close() + return + } + // A client can receive multiple GoAways from server (look at https://github.com/grpc/grpc-go/issues/1387). + // The idea is that the first GoAway will be sent with an ID of MaxInt32 and the second GoAway will be sent after an RTT delay + // with the ID of the last stream the server will process. + // Therefore, when we get the first GoAway we don't really close any streams. While in case of second GoAway we + // close all streams created after the second GoAwayId. This way streams that were in-flight while the GoAway from server + // was being sent don't get killed. + select { + case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways). + // If there are multiple GoAways the first one should always have an ID greater than the following ones. + if id > t.prevGoAwayID { t.mu.Unlock() + t.Close() return - default: } - t.goAwayID = f.LastStreamID + default: + t.setGoAwayReason(f) close(t.goAway) + t.state = draining } + // All streams with IDs greater than the GoAwayId + // and smaller than the previous GoAway ID should be killed. + upperLimit := t.prevGoAwayID + if upperLimit == 0 { // This is the first GoAway Frame. + upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID. + } + for streamID, stream := range t.activeStreams { + if streamID > id && streamID <= upperLimit { + close(stream.goAway) + } + } + t.prevGoAwayID = id + active := len(t.activeStreams) t.mu.Unlock() + if active == 0 { + t.Close() + } +} + +// setGoAwayReason sets the value of t.goAwayReason based +// on the GoAway frame received. +// It expects a lock on transport's mutext to be held by +// the caller. +func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) { + t.goAwayReason = NoReason + switch f.ErrCode { + case http2.ErrCodeEnhanceYourCalm: + if string(f.DebugData()) == "too_many_pings" { + t.goAwayReason = TooManyPings + } + } +} + +func (t *http2Client) GetGoAwayReason() GoAwayReason { + t.mu.Lock() + defer t.mu.Unlock() + return t.goAwayReason } func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) { @@ -934,19 +1021,20 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { if !ok { return } + s.mu.Lock() + s.bytesReceived = true + s.mu.Unlock() var state decodeState - for _, hf := range frame.Fields { - if err := state.processHeaderField(hf); err != nil { - s.mu.Lock() - if !s.headerDone { - close(s.headerChan) - s.headerDone = true - } - s.mu.Unlock() - s.write(recvMsg{err: err}) - // Something wrong. Stops reading even when there is remaining. - return + if err := state.decodeResponseHeader(frame); err != nil { + s.mu.Lock() + if !s.headerDone { + close(s.headerChan) + s.headerDone = true } + s.mu.Unlock() + s.write(recvMsg{err: err}) + // Something wrong. Stops reading even when there is remaining. + return } endStream := frame.StreamEnded() @@ -958,13 +1046,13 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { Client: true, WireLength: int(frame.Header().Length), } - t.statsHandler.HandleRPC(s.clientStatsCtx, inHeader) + t.statsHandler.HandleRPC(s.ctx, inHeader) } else { inTrailer := &stats.InTrailer{ Client: true, WireLength: int(frame.Header().Length), } - t.statsHandler.HandleRPC(s.clientStatsCtx, inTrailer) + t.statsHandler.HandleRPC(s.ctx, inTrailer) } } }() @@ -1012,22 +1100,22 @@ func handleMalformedHTTP2(s *Stream, err error) { // TODO(zhaoq): Check the validity of the incoming frame sequence. func (t *http2Client) reader() { // Check the validity of server preface. - frame, err := t.framer.readFrame() + frame, err := t.framer.fr.ReadFrame() if err != nil { - t.notifyError(err) + t.Close() return } atomic.CompareAndSwapUint32(&t.activity, 0, 1) sf, ok := frame.(*http2.SettingsFrame) if !ok { - t.notifyError(err) + t.Close() return } t.handleSettings(sf) // loop to keep reading incoming messages on this transport. for { - frame, err := t.framer.readFrame() + frame, err := t.framer.fr.ReadFrame() atomic.CompareAndSwapUint32(&t.activity, 0, 1) if err != nil { // Abort an active stream if the http2.Framer returns a @@ -1039,12 +1127,12 @@ func (t *http2Client) reader() { t.mu.Unlock() if s != nil { // use error detail to provide better err message - handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail())) + handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.fr.ErrorDetail())) } continue } else { // Transport error. - t.notifyError(err) + t.Close() return } } @@ -1064,7 +1152,7 @@ func (t *http2Client) reader() { case *http2.WindowUpdateFrame: t.handleWindowUpdate(frame) default: - grpclog.Printf("transport: http2Client.reader got unhandled frame type %v.", frame) + errorf("transport: http2Client.reader got unhandled frame type %v.", frame) } } } @@ -1088,7 +1176,7 @@ func (t *http2Client) applySettings(ss []http2.Setting) { t.mu.Lock() for _, stream := range t.activeStreams { // Adjust the sending quota for each stream. - stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota)) + stream.sendQuotaPool.addAndUpdate(int(s.Val) - int(t.streamSendQuota)) } t.streamSendQuota = s.Val t.mu.Unlock() @@ -1096,49 +1184,78 @@ func (t *http2Client) applySettings(ss []http2.Setting) { } } -// controller running in a separate goroutine takes charge of sending control -// frames (e.g., window update, reset stream, setting, etc.) to the server. -func (t *http2Client) controller() { - for { - select { - case i := <-t.controlBuf.get(): - t.controlBuf.load() - select { - case <-t.writableChan: - switch i := i.(type) { - case *windowUpdate: - t.framer.writeWindowUpdate(true, i.streamID, i.increment) - case *settings: - if i.ack { - t.framer.writeSettingsAck(true) - t.applySettings(i.ss) - } else { - t.framer.writeSettings(true, i.ss...) - } - case *resetStream: - // If the server needs to be to intimated about stream closing, - // then we need to make sure the RST_STREAM frame is written to - // the wire before the headers of the next stream waiting on - // streamQuota. We ensure this by adding to the streamsQuota pool - // only after having acquired the writableChan to send RST_STREAM. - t.streamsQuota.add(1) - t.framer.writeRSTStream(true, i.streamID, i.code) - case *flushIO: - t.framer.flushWrite() - case *ping: - t.framer.writePing(true, i.ack, i.data) - default: - grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i) - } - t.writableChan <- 0 - continue - case <-t.shutdownChan: - return +// TODO(mmukhi): A lot of this code(and code in other places in the tranpsort layer) +// is duplicated between the client and the server. +// The transport layer needs to be refactored to take care of this. +func (t *http2Client) itemHandler(i item) error { + var err error + switch i := i.(type) { + case *dataFrame: + err = t.framer.fr.WriteData(i.streamID, i.endStream, i.d) + if err == nil { + i.f() + } + case *headerFrame: + t.hBuf.Reset() + for _, f := range i.hf { + t.hEnc.WriteField(f) + } + endHeaders := false + first := true + for !endHeaders { + size := t.hBuf.Len() + if size > http2MaxFrameLen { + size = http2MaxFrameLen + } else { + endHeaders = true } - case <-t.shutdownChan: - return + if first { + first = false + err = t.framer.fr.WriteHeaders(http2.HeadersFrameParam{ + StreamID: i.streamID, + BlockFragment: t.hBuf.Next(size), + EndStream: i.endStream, + EndHeaders: endHeaders, + }) + } else { + err = t.framer.fr.WriteContinuation( + i.streamID, + endHeaders, + t.hBuf.Next(size), + ) + } + if err != nil { + return err + } + } + case *windowUpdate: + err = t.framer.fr.WriteWindowUpdate(i.streamID, i.increment) + case *settings: + if i.ack { + t.applySettings(i.ss) + err = t.framer.fr.WriteSettingsAck() + } else { + err = t.framer.fr.WriteSettings(i.ss...) + } + case *resetStream: + // If the server needs to be to intimated about stream closing, + // then we need to make sure the RST_STREAM frame is written to + // the wire before the headers of the next stream waiting on + // streamQuota. We ensure this by adding to the streamsQuota pool + // only after having acquired the writableChan to send RST_STREAM. + err = t.framer.fr.WriteRSTStream(i.streamID, i.code) + t.streamsQuota.add(1) + case *flushIO: + err = t.framer.writer.Flush() + case *ping: + if !i.ack { + t.bdpEst.timesnap(i.data) } + err = t.framer.fr.WritePing(i.ack, i.data) + default: + errorf("transport: http2Client.controller got unexpected item type %v", i) } + return err } // keepalive running in a separate goroutune makes sure the connection is alive by sending pings. @@ -1162,7 +1279,7 @@ func (t *http2Client) keepalive() { case <-t.awakenKeepalive: // If the control gets here a ping has been sent // need to reset the timer with keepalive.Timeout. - case <-t.shutdownChan: + case <-t.ctx.Done(): return } } else { @@ -1181,13 +1298,13 @@ func (t *http2Client) keepalive() { } t.Close() return - case <-t.shutdownChan: + case <-t.ctx.Done(): if !timer.Stop() { <-timer.C } return } - case <-t.shutdownChan: + case <-t.ctx.Done(): if !timer.Stop() { <-timer.C } @@ -1197,25 +1314,9 @@ func (t *http2Client) keepalive() { } func (t *http2Client) Error() <-chan struct{} { - return t.errorChan + return t.ctx.Done() } func (t *http2Client) GoAway() <-chan struct{} { return t.goAway } - -func (t *http2Client) notifyError(err error) { - t.mu.Lock() - // make sure t.errorChan is closed only once. - if t.state == draining { - t.mu.Unlock() - t.Close() - return - } - if t.state == reachable { - t.state = unreachable - close(t.errorChan) - grpclog.Printf("transport: http2Client.notifyError got notified that the client transport was broken %v.", err) - } - t.mu.Unlock() -} -- cgit v1.2.3