aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/transport
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/transport')
-rw-r--r--vendor/google.golang.org/grpc/transport/control.go5
-rw-r--r--vendor/google.golang.org/grpc/transport/go16.go46
-rw-r--r--vendor/google.golang.org/grpc/transport/go17.go46
-rw-r--r--vendor/google.golang.org/grpc/transport/handler_server.go10
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_client.go237
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_server.go76
-rw-r--r--vendor/google.golang.org/grpc/transport/http_util.go85
-rw-r--r--vendor/google.golang.org/grpc/transport/pre_go16.go51
-rw-r--r--vendor/google.golang.org/grpc/transport/transport.go93
9 files changed, 536 insertions, 113 deletions
diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/control.go
index 7e9bdf3..4ef0830 100644
--- a/vendor/google.golang.org/grpc/transport/control.go
+++ b/vendor/google.golang.org/grpc/transport/control.go
@@ -72,6 +72,11 @@ type resetStream struct {
func (*resetStream) item() {}
+type goAway struct {
+}
+
+func (*goAway) item() {}
+
type flushIO struct {
}
diff --git a/vendor/google.golang.org/grpc/transport/go16.go b/vendor/google.golang.org/grpc/transport/go16.go
new file mode 100644
index 0000000..ee1c46b
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/go16.go
@@ -0,0 +1,46 @@
+// +build go1.6,!go1.7
+
+/*
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package transport
+
+import (
+ "net"
+
+ "golang.org/x/net/context"
+)
+
+// dialContext connects to the address on the named network.
+func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
+ return (&net.Dialer{Cancel: ctx.Done()}).Dial(network, address)
+}
diff --git a/vendor/google.golang.org/grpc/transport/go17.go b/vendor/google.golang.org/grpc/transport/go17.go
new file mode 100644
index 0000000..356f13f
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/go17.go
@@ -0,0 +1,46 @@
+// +build go1.7
+
+/*
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package transport
+
+import (
+ "net"
+
+ "golang.org/x/net/context"
+)
+
+// dialContext connects to the address on the named network.
+func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
+ return (&net.Dialer{}).DialContext(ctx, network, address)
+}
diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/transport/handler_server.go
index 4b0d525..f23b2da 100644
--- a/vendor/google.golang.org/grpc/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/transport/handler_server.go
@@ -83,7 +83,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTr
}
if v := r.Header.Get("grpc-timeout"); v != "" {
- to, err := timeoutDecode(v)
+ to, err := decodeTimeout(v)
if err != nil {
return nil, StreamErrorf(codes.Internal, "malformed time-out: %v", err)
}
@@ -194,7 +194,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, statusCode codes.Code,
h := ht.rw.Header()
h.Set("Grpc-Status", fmt.Sprintf("%d", statusCode))
if statusDesc != "" {
- h.Set("Grpc-Message", statusDesc)
+ h.Set("Grpc-Message", encodeGrpcMessage(statusDesc))
}
if md := s.Trailer(); len(md) > 0 {
for k, vv := range md {
@@ -370,6 +370,10 @@ func (ht *serverHandlerTransport) runStream() {
}
}
+func (ht *serverHandlerTransport) Drain() {
+ panic("Drain() is not implemented")
+}
+
// mapRecvMsgError returns the non-nil err into the appropriate
// error value as expected by callers of *grpc.parser.recvMsg.
// In particular, in can only be:
@@ -389,5 +393,5 @@ func mapRecvMsgError(err error) error {
}
}
}
- return ConnectionError{Desc: err.Error()}
+ return ConnectionErrorf(true, err, err.Error())
}
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go
index f66435f..afbba45 100644
--- a/vendor/google.golang.org/grpc/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/transport/http2_client.go
@@ -35,6 +35,7 @@ package transport
import (
"bytes"
+ "fmt"
"io"
"math"
"net"
@@ -71,6 +72,9 @@ type http2Client struct {
shutdownChan chan struct{}
// errorChan is closed to notify the I/O error to the caller.
errorChan chan struct{}
+ // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
+ // that the server sent GoAway on this transport.
+ goAway chan struct{}
framer *framer
hBuf *bytes.Buffer // the buffer for HPACK encoding
@@ -97,41 +101,73 @@ type http2Client struct {
maxStreams int
// the per-stream outbound flow control window size set by the peer.
streamSendQuota uint32
+ // goAwayID records the Last-Stream-ID in the GoAway frame from the server.
+ goAwayID uint32
+ // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
+ prevGoAwayID uint32
+}
+
+func dial(fn func(context.Context, string) (net.Conn, error), ctx context.Context, addr string) (net.Conn, error) {
+ if fn != nil {
+ return fn(ctx, addr)
+ }
+ return dialContext(ctx, "tcp", addr)
+}
+
+func isTemporary(err error) bool {
+ switch err {
+ case io.EOF:
+ // Connection closures may be resolved upon retry, and are thus
+ // treated as temporary.
+ return true
+ case context.DeadlineExceeded:
+ // In Go 1.7, context.DeadlineExceeded implements Timeout(), and this
+ // special case is not needed. Until then, we need to keep this
+ // clause.
+ return true
+ }
+
+ switch err := err.(type) {
+ case interface {
+ Temporary() bool
+ }:
+ return err.Temporary()
+ case interface {
+ Timeout() bool
+ }:
+ // Timeouts may be resolved upon retry, and are thus treated as
+ // temporary.
+ return err.Timeout()
+ }
+ return false
}
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
-func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err error) {
- if opts.Dialer == nil {
- // Set the default Dialer.
- opts.Dialer = func(addr string, timeout time.Duration) (net.Conn, error) {
- return net.DialTimeout("tcp", addr, timeout)
- }
- }
+func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ ClientTransport, err error) {
scheme := "http"
- startT := time.Now()
- timeout := opts.Timeout
- conn, connErr := opts.Dialer(addr, timeout)
- if connErr != nil {
- return nil, ConnectionErrorf("transport: %v", connErr)
+ conn, err := dial(opts.Dialer, ctx, addr)
+ if err != nil {
+ return nil, ConnectionErrorf(true, err, "transport: %v", err)
}
+ // Any further errors will close the underlying connection
+ defer func(conn net.Conn) {
+ if err != nil {
+ conn.Close()
+ }
+ }(conn)
var authInfo credentials.AuthInfo
- if opts.TransportCredentials != nil {
+ if creds := opts.TransportCredentials; creds != nil {
scheme = "https"
- if timeout > 0 {
- timeout -= time.Since(startT)
- }
- conn, authInfo, connErr = opts.TransportCredentials.ClientHandshake(addr, conn, timeout)
- }
- if connErr != nil {
- return nil, ConnectionErrorf("transport: %v", connErr)
- }
- defer func() {
+ conn, authInfo, err = creds.ClientHandshake(ctx, addr, conn)
if err != nil {
- conn.Close()
+ // Credentials handshake errors are typically considered permanent
+ // to avoid retrying on e.g. bad certificates.
+ temp := isTemporary(err)
+ return nil, ConnectionErrorf(temp, err, "transport: %v", err)
}
- }()
+ }
ua := primaryUA
if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua
@@ -147,6 +183,7 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
writableChan: make(chan int, 1),
shutdownChan: make(chan struct{}),
errorChan: make(chan struct{}),
+ goAway: make(chan struct{}),
framer: newFramer(conn),
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
@@ -168,11 +205,11 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
n, err := t.conn.Write(clientPreface)
if err != nil {
t.Close()
- return nil, ConnectionErrorf("transport: %v", err)
+ return nil, ConnectionErrorf(true, err, "transport: %v", err)
}
if n != len(clientPreface) {
t.Close()
- return nil, ConnectionErrorf("transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
+ return nil, ConnectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
}
if initialWindowSize != defaultWindowSize {
err = t.framer.writeSettings(true, http2.Setting{
@@ -184,13 +221,13 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
}
if err != nil {
t.Close()
- return nil, ConnectionErrorf("transport: %v", err)
+ return nil, ConnectionErrorf(true, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
t.Close()
- return nil, ConnectionErrorf("transport: %v", err)
+ return nil, ConnectionErrorf(true, err, "transport: %v", err)
}
}
go t.controller()
@@ -202,6 +239,8 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
s := &Stream{
id: t.nextID,
+ done: make(chan struct{}),
+ goAway: make(chan struct{}),
method: callHdr.Method,
sendCompress: callHdr.SendCompress,
buf: newRecvBuffer(),
@@ -216,8 +255,9 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// Make a stream be able to cancel the pending operations by itself.
s.ctx, s.cancel = context.WithCancel(ctx)
s.dec = &recvBufferReader{
- ctx: s.ctx,
- recv: s.buf,
+ ctx: s.ctx,
+ goAway: s.goAway,
+ recv: s.buf,
}
return s
}
@@ -271,6 +311,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.mu.Unlock()
return nil, ErrConnClosing
}
+ if t.state == draining {
+ t.mu.Unlock()
+ return nil, ErrStreamDrain
+ }
if t.state != reachable {
t.mu.Unlock()
return nil, ErrConnClosing
@@ -278,7 +322,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
checkStreamsQuota := t.streamsQuota != nil
t.mu.Unlock()
if checkStreamsQuota {
- sq, err := wait(ctx, t.shutdownChan, t.streamsQuota.acquire())
+ sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
if err != nil {
return nil, err
}
@@ -287,7 +331,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.streamsQuota.add(sq - 1)
}
}
- if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil {
+ if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
// Return the quota back now because there is no stream returned to the caller.
if _, ok := err.(StreamError); ok && checkStreamsQuota {
t.streamsQuota.add(1)
@@ -295,6 +339,15 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, err
}
t.mu.Lock()
+ if t.state == draining {
+ t.mu.Unlock()
+ if checkStreamsQuota {
+ t.streamsQuota.add(1)
+ }
+ // Need to make t writable again so that the rpc in flight can still proceed.
+ t.writableChan <- 0
+ return nil, ErrStreamDrain
+ }
if t.state != reachable {
t.mu.Unlock()
return nil, ErrConnClosing
@@ -329,7 +382,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
}
if timeout > 0 {
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: timeoutEncode(timeout)})
+ t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
}
for k, v := range authData {
// Capital header names are illegal in HTTP/2.
@@ -384,7 +437,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
if err != nil {
t.notifyError(err)
- return nil, ConnectionErrorf("transport: %v", err)
+ return nil, ConnectionErrorf(true, err, "transport: %v", err)
}
}
t.writableChan <- 0
@@ -403,22 +456,17 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
if t.streamsQuota != nil {
updateStreams = true
}
- if t.state == draining && len(t.activeStreams) == 1 {
+ delete(t.activeStreams, s.id)
+ if t.state == draining && len(t.activeStreams) == 0 {
// The transport is draining and s is the last live stream on t.
t.mu.Unlock()
t.Close()
return
}
- delete(t.activeStreams, s.id)
t.mu.Unlock()
if updateStreams {
t.streamsQuota.add(1)
}
- // In case stream sending and receiving are invoked in separate
- // goroutines (e.g., bi-directional streaming), the caller needs
- // to call cancel on the stream to interrupt the blocking on
- // other goroutines.
- s.cancel()
s.mu.Lock()
if q := s.fc.resetPendingData(); q > 0 {
if n := t.fc.onRead(q); n > 0 {
@@ -445,13 +493,13 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
// accessed any more.
func (t *http2Client) Close() (err error) {
t.mu.Lock()
- if t.state == reachable {
- close(t.errorChan)
- }
if t.state == closing {
t.mu.Unlock()
return
}
+ if t.state == reachable || t.state == draining {
+ close(t.errorChan)
+ }
t.state = closing
t.mu.Unlock()
close(t.shutdownChan)
@@ -475,10 +523,35 @@ func (t *http2Client) Close() (err error) {
func (t *http2Client) GracefulClose() error {
t.mu.Lock()
- if t.state == closing {
+ switch t.state {
+ case unreachable:
+ // The server may close the connection concurrently. t is not available for
+ // any streams. Close it now.
+ t.mu.Unlock()
+ t.Close()
+ return nil
+ case closing:
t.mu.Unlock()
return nil
}
+ // Notify the streams which were initiated after the server sent GOAWAY.
+ select {
+ case <-t.goAway:
+ n := t.prevGoAwayID
+ if n == 0 && t.nextID > 1 {
+ n = t.nextID - 2
+ }
+ m := t.goAwayID + 2
+ if m == 2 {
+ m = 1
+ }
+ for i := m; i <= n; i += 2 {
+ if s, ok := t.activeStreams[i]; ok {
+ close(s.goAway)
+ }
+ }
+ default:
+ }
if t.state == draining {
t.mu.Unlock()
return nil
@@ -504,15 +577,15 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
size := http2MaxFrameLen
s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
- sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire())
+ sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
- tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire())
+ tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
- if _, ok := err.(StreamError); ok {
+ if _, ok := err.(StreamError); ok || err == io.EOF {
t.sendQuotaPool.cancel()
}
return err
@@ -544,8 +617,8 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
// Indicate there is a writer who is about to write a data frame.
t.framer.adjustNumWriters(1)
// Got some quota. Try to acquire writing privilege on the transport.
- if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
- if _, ok := err.(StreamError); ok {
+ if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil {
+ if _, ok := err.(StreamError); ok || err == io.EOF {
// Return the connection quota back.
t.sendQuotaPool.add(len(p))
}
@@ -578,7 +651,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
// invoked.
if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
t.notifyError(err)
- return ConnectionErrorf("transport: %v", err)
+ return ConnectionErrorf(true, err, "transport: %v", err)
}
if t.framer.adjustNumWriters(-1) == 0 {
t.framer.flushWrite()
@@ -593,11 +666,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
}
s.mu.Lock()
if s.state != streamDone {
- if s.state == streamReadDone {
- s.state = streamDone
- } else {
- s.state = streamWriteDone
- }
+ s.state = streamWriteDone
}
s.mu.Unlock()
return nil
@@ -630,7 +699,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
func (t *http2Client) handleData(f *http2.DataFrame) {
size := len(f.Data())
if err := t.fc.onData(uint32(size)); err != nil {
- t.notifyError(ConnectionErrorf("%v", err))
+ t.notifyError(ConnectionErrorf(true, err, "%v", err))
return
}
// Select the right stream to dispatch.
@@ -655,6 +724,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
s.state = streamDone
s.statusCode = codes.Internal
s.statusDesc = err.Error()
+ close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
@@ -672,13 +742,14 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
// the read direction is closed, and set the status appropriately.
if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
s.mu.Lock()
- if s.state == streamWriteDone {
- s.state = streamDone
- } else {
- s.state = streamReadDone
+ if s.state == streamDone {
+ s.mu.Unlock()
+ return
}
+ s.state = streamDone
s.statusCode = codes.Internal
s.statusDesc = "server closed the stream without sending trailers"
+ close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
}
@@ -704,6 +775,8 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode)
s.statusCode = codes.Unknown
}
+ s.statusDesc = fmt.Sprintf("stream terminated by RST_STREAM with error code: %d", f.ErrCode)
+ close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
}
@@ -728,7 +801,32 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
}
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
- // TODO(zhaoq): GoAwayFrame handler to be implemented
+ t.mu.Lock()
+ if t.state == reachable || t.state == draining {
+ if f.LastStreamID > 0 && f.LastStreamID%2 != 1 {
+ t.mu.Unlock()
+ t.notifyError(ConnectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
+ return
+ }
+ select {
+ case <-t.goAway:
+ id := t.goAwayID
+ // t.goAway has been closed (i.e.,multiple GoAways).
+ if id < f.LastStreamID {
+ t.mu.Unlock()
+ t.notifyError(ConnectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
+ return
+ }
+ t.prevGoAwayID = id
+ t.goAwayID = f.LastStreamID
+ t.mu.Unlock()
+ return
+ default:
+ }
+ t.goAwayID = f.LastStreamID
+ close(t.goAway)
+ }
+ t.mu.Unlock()
}
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
@@ -780,11 +878,11 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
if len(state.mdata) > 0 {
s.trailer = state.mdata
}
- s.state = streamDone
s.statusCode = state.statusCode
s.statusDesc = state.statusDesc
+ close(s.done)
+ s.state = streamDone
s.mu.Unlock()
-
s.write(recvMsg{err: io.EOF})
}
@@ -937,13 +1035,22 @@ func (t *http2Client) Error() <-chan struct{} {
return t.errorChan
}
+func (t *http2Client) GoAway() <-chan struct{} {
+ return t.goAway
+}
+
func (t *http2Client) notifyError(err error) {
t.mu.Lock()
- defer t.mu.Unlock()
// make sure t.errorChan is closed only once.
+ if t.state == draining {
+ t.mu.Unlock()
+ t.Close()
+ return
+ }
if t.state == reachable {
t.state = unreachable
close(t.errorChan)
grpclog.Printf("transport: http2Client.notifyError got notified that the client transport was broken %v.", err)
}
+ t.mu.Unlock()
}
diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go
index cee1542..16010d5 100644
--- a/vendor/google.golang.org/grpc/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/transport/http2_server.go
@@ -111,12 +111,12 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
Val: uint32(initialWindowSize)})
}
if err := framer.writeSettings(true, settings...); err != nil {
- return nil, ConnectionErrorf("transport: %v", err)
+ return nil, ConnectionErrorf(true, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
- return nil, ConnectionErrorf("transport: %v", err)
+ return nil, ConnectionErrorf(true, err, "transport: %v", err)
}
}
var buf bytes.Buffer
@@ -142,7 +142,7 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
}
// operateHeader takes action on the decoded headers.
-func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) {
+func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
buf := newRecvBuffer()
s := &Stream{
id: frame.Header().StreamID,
@@ -205,6 +205,13 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
return
}
+ if s.id%2 != 1 || s.id <= t.maxStreamID {
+ t.mu.Unlock()
+ // illegal gRPC stream id.
+ grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", s.id)
+ return true
+ }
+ t.maxStreamID = s.id
s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
t.activeStreams[s.id] = s
t.mu.Unlock()
@@ -212,6 +219,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.updateWindow(s, uint32(n))
}
handle(s)
+ return
}
// HandleStreams receives incoming streams using the given handler. This is
@@ -231,6 +239,10 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
}
frame, err := t.framer.readFrame()
+ if err == io.EOF || err == io.ErrUnexpectedEOF {
+ t.Close()
+ return
+ }
if err != nil {
grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
t.Close()
@@ -257,20 +269,20 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
t.controlBuf.put(&resetStream{se.StreamID, se.Code})
continue
}
+ if err == io.EOF || err == io.ErrUnexpectedEOF {
+ t.Close()
+ return
+ }
+ grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
t.Close()
return
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
- id := frame.Header().StreamID
- if id%2 != 1 || id <= t.maxStreamID {
- // illegal gRPC stream id.
- grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", id)
+ if t.operateHeaders(frame, handle) {
t.Close()
break
}
- t.maxStreamID = id
- t.operateHeaders(frame, handle)
case *http2.DataFrame:
t.handleData(frame)
case *http2.RSTStreamFrame:
@@ -282,7 +294,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
case *http2.GoAwayFrame:
- break
+ // TODO: Handle GoAway from the client appropriately.
default:
grpclog.Printf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
}
@@ -364,11 +376,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
// Received the end of stream from the client.
s.mu.Lock()
if s.state != streamDone {
- if s.state == streamWriteDone {
- s.state = streamDone
- } else {
- s.state = streamReadDone
- }
+ s.state = streamReadDone
}
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
@@ -440,7 +448,7 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e
}
if err != nil {
t.Close()
- return ConnectionErrorf("transport: %v", err)
+ return ConnectionErrorf(true, err, "transport: %v", err)
}
}
return nil
@@ -455,7 +463,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
}
s.headerOk = true
s.mu.Unlock()
- if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
+ if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
}
t.hBuf.Reset()
@@ -495,7 +503,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
headersSent = true
}
s.mu.Unlock()
- if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
+ if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
}
t.hBuf.Reset()
@@ -508,7 +516,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
Name: "grpc-status",
Value: strconv.Itoa(int(statusCode)),
})
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: statusDesc})
+ t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(statusDesc)})
// Attach the trailer metadata.
for k, v := range s.trailer {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
@@ -544,7 +552,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
}
s.mu.Unlock()
if writeHeaderFrame {
- if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
+ if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
}
t.hBuf.Reset()
@@ -560,7 +568,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
}
if err := t.framer.writeHeaders(false, p); err != nil {
t.Close()
- return ConnectionErrorf("transport: %v", err)
+ return ConnectionErrorf(true, err, "transport: %v", err)
}
t.writableChan <- 0
}
@@ -572,13 +580,13 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
size := http2MaxFrameLen
s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
- sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire())
+ sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
- tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire())
+ tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
if _, ok := err.(StreamError); ok {
t.sendQuotaPool.cancel()
@@ -604,7 +612,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
t.framer.adjustNumWriters(1)
// Got some quota. Try to acquire writing privilege on the
// transport.
- if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
+ if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
if _, ok := err.(StreamError); ok {
// Return the connection quota back.
t.sendQuotaPool.add(ps)
@@ -634,7 +642,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
}
if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
t.Close()
- return ConnectionErrorf("transport: %v", err)
+ return ConnectionErrorf(true, err, "transport: %v", err)
}
if t.framer.adjustNumWriters(-1) == 0 {
t.framer.flushWrite()
@@ -679,6 +687,17 @@ func (t *http2Server) controller() {
}
case *resetStream:
t.framer.writeRSTStream(true, i.streamID, i.code)
+ case *goAway:
+ t.mu.Lock()
+ if t.state == closing {
+ t.mu.Unlock()
+ // The transport is closing.
+ return
+ }
+ sid := t.maxStreamID
+ t.state = draining
+ t.mu.Unlock()
+ t.framer.writeGoAway(true, sid, http2.ErrCodeNo, nil)
case *flushIO:
t.framer.flushWrite()
case *ping:
@@ -724,6 +743,9 @@ func (t *http2Server) Close() (err error) {
func (t *http2Server) closeStream(s *Stream) {
t.mu.Lock()
delete(t.activeStreams, s.id)
+ if t.state == draining && len(t.activeStreams) == 0 {
+ defer t.Close()
+ }
t.mu.Unlock()
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
@@ -746,3 +768,7 @@ func (t *http2Server) closeStream(s *Stream) {
func (t *http2Server) RemoteAddr() net.Addr {
return t.conn.RemoteAddr()
}
+
+func (t *http2Server) Drain() {
+ t.controlBuf.put(&goAway{})
+}
diff --git a/vendor/google.golang.org/grpc/transport/http_util.go b/vendor/google.golang.org/grpc/transport/http_util.go
index f2e23dc..79da512 100644
--- a/vendor/google.golang.org/grpc/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/transport/http_util.go
@@ -35,6 +35,7 @@ package transport
import (
"bufio"
+ "bytes"
"fmt"
"io"
"net"
@@ -52,7 +53,7 @@ import (
const (
// The primary user agent
- primaryUA = "grpc-go/0.11"
+ primaryUA = "grpc-go/1.0"
// http2MaxFrameLen specifies the max length of a HTTP2 frame.
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues
@@ -174,11 +175,11 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
}
d.statusCode = codes.Code(code)
case "grpc-message":
- d.statusDesc = f.Value
+ d.statusDesc = decodeGrpcMessage(f.Value)
case "grpc-timeout":
d.timeoutSet = true
var err error
- d.timeout, err = timeoutDecode(f.Value)
+ d.timeout, err = decodeTimeout(f.Value)
if err != nil {
d.setErr(StreamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
return
@@ -251,7 +252,7 @@ func div(d, r time.Duration) int64 {
}
// TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it.
-func timeoutEncode(t time.Duration) string {
+func encodeTimeout(t time.Duration) string {
if d := div(t, time.Nanosecond); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "n"
}
@@ -271,7 +272,7 @@ func timeoutEncode(t time.Duration) string {
return strconv.FormatInt(div(t, time.Hour), 10) + "H"
}
-func timeoutDecode(s string) (time.Duration, error) {
+func decodeTimeout(s string) (time.Duration, error) {
size := len(s)
if size < 2 {
return 0, fmt.Errorf("transport: timeout string is too short: %q", s)
@@ -288,6 +289,80 @@ func timeoutDecode(s string) (time.Duration, error) {
return d * time.Duration(t), nil
}
+const (
+ spaceByte = ' '
+ tildaByte = '~'
+ percentByte = '%'
+)
+
+// encodeGrpcMessage is used to encode status code in header field
+// "grpc-message".
+// It checks to see if each individual byte in msg is an
+// allowable byte, and then either percent encoding or passing it through.
+// When percent encoding, the byte is converted into hexadecimal notation
+// with a '%' prepended.
+func encodeGrpcMessage(msg string) string {
+ if msg == "" {
+ return ""
+ }
+ lenMsg := len(msg)
+ for i := 0; i < lenMsg; i++ {
+ c := msg[i]
+ if !(c >= spaceByte && c < tildaByte && c != percentByte) {
+ return encodeGrpcMessageUnchecked(msg)
+ }
+ }
+ return msg
+}
+
+func encodeGrpcMessageUnchecked(msg string) string {
+ var buf bytes.Buffer
+ lenMsg := len(msg)
+ for i := 0; i < lenMsg; i++ {
+ c := msg[i]
+ if c >= spaceByte && c < tildaByte && c != percentByte {
+ buf.WriteByte(c)
+ } else {
+ buf.WriteString(fmt.Sprintf("%%%02X", c))
+ }
+ }
+ return buf.String()
+}
+
+// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
+func decodeGrpcMessage(msg string) string {
+ if msg == "" {
+ return ""
+ }
+ lenMsg := len(msg)
+ for i := 0; i < lenMsg; i++ {
+ if msg[i] == percentByte && i+2 < lenMsg {
+ return decodeGrpcMessageUnchecked(msg)
+ }
+ }
+ return msg
+}
+
+func decodeGrpcMessageUnchecked(msg string) string {
+ var buf bytes.Buffer
+ lenMsg := len(msg)
+ for i := 0; i < lenMsg; i++ {
+ c := msg[i]
+ if c == percentByte && i+2 < lenMsg {
+ parsed, err := strconv.ParseInt(msg[i+1:i+3], 16, 8)
+ if err != nil {
+ buf.WriteByte(c)
+ } else {
+ buf.WriteByte(byte(parsed))
+ i += 2
+ }
+ } else {
+ buf.WriteByte(c)
+ }
+ }
+ return buf.String()
+}
+
type framer struct {
numWriters int32
reader io.Reader
diff --git a/vendor/google.golang.org/grpc/transport/pre_go16.go b/vendor/google.golang.org/grpc/transport/pre_go16.go
new file mode 100644
index 0000000..33d91c1
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/pre_go16.go
@@ -0,0 +1,51 @@
+// +build !go1.6
+
+/*
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package transport
+
+import (
+ "net"
+ "time"
+
+ "golang.org/x/net/context"
+)
+
+// dialContext connects to the address on the named network.
+func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
+ var dialer net.Dialer
+ if deadline, ok := ctx.Deadline(); ok {
+ dialer.Timeout = deadline.Sub(time.Now())
+ }
+ return dialer.Dial(network, address)
+}
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go
index d4c220a..b31769a 100644
--- a/vendor/google.golang.org/grpc/transport/transport.go
+++ b/vendor/google.golang.org/grpc/transport/transport.go
@@ -44,7 +44,6 @@ import (
"io"
"net"
"sync"
- "time"
"golang.org/x/net/context"
"golang.org/x/net/trace"
@@ -120,10 +119,11 @@ func (b *recvBuffer) get() <-chan item {
// recvBufferReader implements io.Reader interface to read the data from
// recvBuffer.
type recvBufferReader struct {
- ctx context.Context
- recv *recvBuffer
- last *bytes.Reader // Stores the remaining data in the previous calls.
- err error
+ ctx context.Context
+ goAway chan struct{}
+ recv *recvBuffer
+ last *bytes.Reader // Stores the remaining data in the previous calls.
+ err error
}
// Read reads the next len(p) bytes from last. If last is drained, it tries to
@@ -141,6 +141,8 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) {
select {
case <-r.ctx.Done():
return 0, ContextErr(r.ctx.Err())
+ case <-r.goAway:
+ return 0, ErrStreamDrain
case i := <-r.recv.get():
r.recv.load()
m := i.(*recvMsg)
@@ -158,7 +160,7 @@ const (
streamActive streamState = iota
streamWriteDone // EndStream sent
streamReadDone // EndStream received
- streamDone // sendDone and recvDone or RSTStreamFrame is sent or received.
+ streamDone // the entire stream is finished.
)
// Stream represents an RPC in the transport layer.
@@ -169,6 +171,10 @@ type Stream struct {
// ctx is the associated context of the stream.
ctx context.Context
cancel context.CancelFunc
+ // done is closed when the final status arrives.
+ done chan struct{}
+ // goAway is closed when the server sent GoAways signal before this stream was initiated.
+ goAway chan struct{}
// method records the associated RPC method of the stream.
method string
recvCompress string
@@ -214,6 +220,18 @@ func (s *Stream) SetSendCompress(str string) {
s.sendCompress = str
}
+// Done returns a chanel which is closed when it receives the final status
+// from the server.
+func (s *Stream) Done() <-chan struct{} {
+ return s.done
+}
+
+// GoAway returns a channel which is closed when the server sent GoAways signal
+// before this stream was initiated.
+func (s *Stream) GoAway() <-chan struct{} {
+ return s.goAway
+}
+
// Header acquires the key-value pairs of header metadata once it
// is available. It blocks until i) the metadata is ready or ii) there is no
// header metadata or iii) the stream is cancelled/expired.
@@ -221,6 +239,8 @@ func (s *Stream) Header() (metadata.MD, error) {
select {
case <-s.ctx.Done():
return nil, ContextErr(s.ctx.Err())
+ case <-s.goAway:
+ return nil, ErrStreamDrain
case <-s.headerChan:
return s.header.Copy(), nil
}
@@ -335,19 +355,17 @@ type ConnectOptions struct {
// UserAgent is the application user agent.
UserAgent string
// Dialer specifies how to dial a network address.
- Dialer func(string, time.Duration) (net.Conn, error)
+ Dialer func(context.Context, string) (net.Conn, error)
// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials
- // Timeout specifies the timeout for dialing a ClientTransport.
- Timeout time.Duration
}
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
-func NewClientTransport(target string, opts *ConnectOptions) (ClientTransport, error) {
- return newHTTP2Client(target, opts)
+func NewClientTransport(ctx context.Context, target string, opts ConnectOptions) (ClientTransport, error) {
+ return newHTTP2Client(ctx, target, opts)
}
// Options provides additional hints and information for message
@@ -417,6 +435,11 @@ type ClientTransport interface {
// and create a new one) in error case. It should not return nil
// once the transport is initiated.
Error() <-chan struct{}
+
+ // GoAway returns a channel that is closed when ClientTranspor
+ // receives the draining signal from the server (e.g., GOAWAY frame in
+ // HTTP/2).
+ GoAway() <-chan struct{}
}
// ServerTransport is the common interface for all gRPC server-side transport
@@ -448,6 +471,9 @@ type ServerTransport interface {
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
+
+ // Drain notifies the client this ServerTransport stops accepting new RPCs.
+ Drain()
}
// StreamErrorf creates an StreamError with the specified error code and description.
@@ -459,9 +485,11 @@ func StreamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
}
// ConnectionErrorf creates an ConnectionError with the specified error description.
-func ConnectionErrorf(format string, a ...interface{}) ConnectionError {
+func ConnectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
return ConnectionError{
Desc: fmt.Sprintf(format, a...),
+ temp: temp,
+ err: e,
}
}
@@ -469,14 +497,36 @@ func ConnectionErrorf(format string, a ...interface{}) ConnectionError {
// entire connection and the retry of all the active streams.
type ConnectionError struct {
Desc string
+ temp bool
+ err error
}
func (e ConnectionError) Error() string {
return fmt.Sprintf("connection error: desc = %q", e.Desc)
}
-// ErrConnClosing indicates that the transport is closing.
-var ErrConnClosing = ConnectionError{Desc: "transport is closing"}
+// Temporary indicates if this connection error is temporary or fatal.
+func (e ConnectionError) Temporary() bool {
+ return e.temp
+}
+
+// Origin returns the original error of this connection error.
+func (e ConnectionError) Origin() error {
+ // Never return nil error here.
+ // If the original error is nil, return itself.
+ if e.err == nil {
+ return e
+ }
+ return e.err
+}
+
+var (
+ // ErrConnClosing indicates that the transport is closing.
+ ErrConnClosing = ConnectionErrorf(true, nil, "transport is closing")
+ // ErrStreamDrain indicates that the stream is rejected by the server because
+ // the server stops accepting new RPCs.
+ ErrStreamDrain = StreamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
+)
// StreamError is an error that only affects one stream within a connection.
type StreamError struct {
@@ -501,12 +551,25 @@ func ContextErr(err error) StreamError {
// wait blocks until it can receive from ctx.Done, closing, or proceed.
// If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err.
+// If it receives from done, it returns 0, io.EOF if ctx is not done; otherwise
+// it return the StreamError for ctx.Err.
+// If it receives from goAway, it returns 0, ErrStreamDrain.
// If it receives from closing, it returns 0, ErrConnClosing.
// If it receives from proceed, it returns the received integer, nil.
-func wait(ctx context.Context, closing <-chan struct{}, proceed <-chan int) (int, error) {
+func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-chan int) (int, error) {
select {
case <-ctx.Done():
return 0, ContextErr(ctx.Err())
+ case <-done:
+ // User cancellation has precedence.
+ select {
+ case <-ctx.Done():
+ return 0, ContextErr(ctx.Err())
+ default:
+ }
+ return 0, io.EOF
+ case <-goAway:
+ return 0, ErrStreamDrain
case <-closing:
return 0, ErrConnClosing
case i := <-proceed: