aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/call.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/call.go')
-rw-r--r--vendor/google.golang.org/grpc/call.go29
1 files changed, 21 insertions, 8 deletions
diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go
index 84ac178..fea0799 100644
--- a/vendor/google.golang.org/grpc/call.go
+++ b/vendor/google.golang.org/grpc/call.go
@@ -36,6 +36,7 @@ package grpc
import (
"bytes"
"io"
+ "math"
"time"
"golang.org/x/net/context"
@@ -51,13 +52,20 @@ import (
func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) error {
// Try to acquire header metadata from the server if there is any.
var err error
+ defer func() {
+ if err != nil {
+ if _, ok := err.(transport.ConnectionError); !ok {
+ t.CloseStream(stream, err)
+ }
+ }
+ }()
c.headerMD, err = stream.Header()
if err != nil {
return err
}
p := &parser{r: stream}
for {
- if err = recv(p, dopts.codec, stream, dopts.dc, reply); err != nil {
+ if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32); err != nil {
if err == io.EOF {
break
}
@@ -76,6 +84,7 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
}
defer func() {
if err != nil {
+ // If err is connection error, t will be closed, no need to close stream here.
if _, ok := err.(transport.ConnectionError); !ok {
t.CloseStream(stream, err)
}
@@ -90,7 +99,10 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err)
}
err = t.Write(stream, outBuf, opts)
- if err != nil {
+ // t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
+ // does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
+ // recvResponse to get the final status.
+ if err != nil && err != io.EOF {
return nil, err
}
// Sent successfully.
@@ -158,9 +170,9 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if _, ok := err.(*rpcError); ok {
return err
}
- if err == errConnClosing {
+ if err == errConnClosing || err == errConnUnavailable {
if c.failFast {
- return Errorf(codes.Unavailable, "%v", errConnClosing)
+ return Errorf(codes.Unavailable, "%v", err)
}
continue
}
@@ -176,7 +188,10 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
put()
put = nil
}
- if _, ok := err.(transport.ConnectionError); ok {
+ // Retry a non-failfast RPC when
+ // i) there is a connection error; or
+ // ii) the server started to drain before this RPC was initiated.
+ if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
}
@@ -184,20 +199,18 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
return toRPCErr(err)
}
- // Receive the response
err = recvResponse(cc.dopts, t, &c, stream, reply)
if err != nil {
if put != nil {
put()
put = nil
}
- if _, ok := err.(transport.ConnectionError); ok {
+ if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
}
continue
}
- t.CloseStream(stream, err)
return toRPCErr(err)
}
if c.traceInfo.tr != nil {