aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go70
1 files changed, 54 insertions, 16 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index de125d5..7a3bef5 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -79,7 +79,7 @@ type Stream interface {
RecvMsg(m interface{}) error
}
-// ClientStream defines the interface a client stream has to satify.
+// ClientStream defines the interface a client stream has to satisfy.
type ClientStream interface {
// Header returns the header metadata received from the server if there
// is any. It blocks if the metadata is not ready to read.
@@ -102,16 +102,15 @@ type ClientStream interface {
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
var (
t transport.ClientTransport
+ s *transport.Stream
err error
put func()
)
- // TODO(zhaoq): CallOption is omitted. Add support when it is needed.
- gopts := BalancerGetOptions{
- BlockingWait: false,
- }
- t, put, err = cc.getTransport(ctx, gopts)
- if err != nil {
- return nil, toRPCErr(err)
+ c := defaultCallInfo
+ for _, o := range opts {
+ if err := o.before(&c); err != nil {
+ return nil, toRPCErr(err)
+ }
}
callHdr := &transport.CallHdr{
Host: cc.authority,
@@ -122,8 +121,9 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
callHdr.SendCompress = cc.dopts.cp.Type()
}
cs := &clientStream{
+ opts: opts,
+ c: c,
desc: desc,
- put: put,
codec: cc.dopts.codec,
cp: cc.dopts.cp,
dc: cc.dopts.dc,
@@ -142,11 +142,44 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
cs.trInfo.tr.LazyLog(&cs.trInfo.firstLine, false)
ctx = trace.NewContext(ctx, cs.trInfo.tr)
}
- s, err := t.NewStream(ctx, callHdr)
- if err != nil {
- cs.finish(err)
- return nil, toRPCErr(err)
+ gopts := BalancerGetOptions{
+ BlockingWait: !c.failFast,
}
+ for {
+ t, put, err = cc.getTransport(ctx, gopts)
+ if err != nil {
+ // TODO(zhaoq): Probably revisit the error handling.
+ if _, ok := err.(*rpcError); ok {
+ return nil, err
+ }
+ if err == errConnClosing {
+ if c.failFast {
+ return nil, Errorf(codes.Unavailable, "%v", errConnClosing)
+ }
+ continue
+ }
+ // All the other errors are treated as Internal errors.
+ return nil, Errorf(codes.Internal, "%v", err)
+ }
+
+ s, err = t.NewStream(ctx, callHdr)
+ if err != nil {
+ if put != nil {
+ put()
+ put = nil
+ }
+ if _, ok := err.(transport.ConnectionError); ok {
+ if c.failFast {
+ cs.finish(err)
+ return nil, toRPCErr(err)
+ }
+ continue
+ }
+ return nil, toRPCErr(err)
+ }
+ break
+ }
+ cs.put = put
cs.t = t
cs.s = s
cs.p = &parser{r: s}
@@ -167,6 +200,8 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
// clientStream implements a client side Stream.
type clientStream struct {
+ opts []CallOption
+ c callInfo
t transport.ClientTransport
s *transport.Stream
p *parser
@@ -312,15 +347,18 @@ func (cs *clientStream) closeTransportStream(err error) {
}
func (cs *clientStream) finish(err error) {
- if !cs.tracing {
- return
- }
cs.mu.Lock()
defer cs.mu.Unlock()
+ for _, o := range cs.opts {
+ o.after(&cs.c)
+ }
if cs.put != nil {
cs.put()
cs.put = nil
}
+ if !cs.tracing {
+ return
+ }
if cs.trInfo.tr != nil {
if err == nil || err == io.EOF {
cs.trInfo.tr.LazyPrintf("RPC: [OK]")