diff options
author | Niall Sheridan <nsheridan@gmail.com> | 2016-07-17 17:16:14 +0100 |
---|---|---|
committer | Niall Sheridan <nsheridan@gmail.com> | 2016-07-17 18:24:20 +0100 |
commit | c9849d667ab55c23d343332a11afb3eb8ede3f2d (patch) | |
tree | 86684d5481d8b12be84ea1a3a8f32afaac007efa /vendor/google.golang.org/grpc/stream.go | |
parent | 49f40a952943f26494d6407dc608b50b2ec0df7f (diff) |
Update vendor libs
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r-- | vendor/google.golang.org/grpc/stream.go | 70 |
1 files changed, 54 insertions, 16 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index de125d5..7a3bef5 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -79,7 +79,7 @@ type Stream interface { RecvMsg(m interface{}) error } -// ClientStream defines the interface a client stream has to satify. +// ClientStream defines the interface a client stream has to satisfy. type ClientStream interface { // Header returns the header metadata received from the server if there // is any. It blocks if the metadata is not ready to read. @@ -102,16 +102,15 @@ type ClientStream interface { func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { var ( t transport.ClientTransport + s *transport.Stream err error put func() ) - // TODO(zhaoq): CallOption is omitted. Add support when it is needed. - gopts := BalancerGetOptions{ - BlockingWait: false, - } - t, put, err = cc.getTransport(ctx, gopts) - if err != nil { - return nil, toRPCErr(err) + c := defaultCallInfo + for _, o := range opts { + if err := o.before(&c); err != nil { + return nil, toRPCErr(err) + } } callHdr := &transport.CallHdr{ Host: cc.authority, @@ -122,8 +121,9 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth callHdr.SendCompress = cc.dopts.cp.Type() } cs := &clientStream{ + opts: opts, + c: c, desc: desc, - put: put, codec: cc.dopts.codec, cp: cc.dopts.cp, dc: cc.dopts.dc, @@ -142,11 +142,44 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth cs.trInfo.tr.LazyLog(&cs.trInfo.firstLine, false) ctx = trace.NewContext(ctx, cs.trInfo.tr) } - s, err := t.NewStream(ctx, callHdr) - if err != nil { - cs.finish(err) - return nil, toRPCErr(err) + gopts := BalancerGetOptions{ + BlockingWait: !c.failFast, } + for { + t, put, err = cc.getTransport(ctx, gopts) + if err != nil { + // TODO(zhaoq): Probably revisit the error handling. + if _, ok := err.(*rpcError); ok { + return nil, err + } + if err == errConnClosing { + if c.failFast { + return nil, Errorf(codes.Unavailable, "%v", errConnClosing) + } + continue + } + // All the other errors are treated as Internal errors. + return nil, Errorf(codes.Internal, "%v", err) + } + + s, err = t.NewStream(ctx, callHdr) + if err != nil { + if put != nil { + put() + put = nil + } + if _, ok := err.(transport.ConnectionError); ok { + if c.failFast { + cs.finish(err) + return nil, toRPCErr(err) + } + continue + } + return nil, toRPCErr(err) + } + break + } + cs.put = put cs.t = t cs.s = s cs.p = &parser{r: s} @@ -167,6 +200,8 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth // clientStream implements a client side Stream. type clientStream struct { + opts []CallOption + c callInfo t transport.ClientTransport s *transport.Stream p *parser @@ -312,15 +347,18 @@ func (cs *clientStream) closeTransportStream(err error) { } func (cs *clientStream) finish(err error) { - if !cs.tracing { - return - } cs.mu.Lock() defer cs.mu.Unlock() + for _, o := range cs.opts { + o.after(&cs.c) + } if cs.put != nil { cs.put() cs.put = nil } + if !cs.tracing { + return + } if cs.trInfo.tr != nil { if err == nil || err == io.EOF { cs.trInfo.tr.LazyPrintf("RPC: [OK]") |