diff options
author | Niall Sheridan <nsheridan@gmail.com> | 2016-08-27 01:32:30 +0100 |
---|---|---|
committer | Niall Sheridan <nsheridan@gmail.com> | 2016-08-27 01:32:30 +0100 |
commit | 921818bca208f0c70e85ec670074cb3905cbbc82 (patch) | |
tree | 4aa67ad2bb2083bd486db3f99680d6d08a2c36b3 /vendor/google.golang.org/grpc/stream.go | |
parent | 7f1c9358805302344a89c1fed4eab1342931b061 (diff) |
Update dependencies
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r-- | vendor/google.golang.org/grpc/stream.go | 115 |
1 files changed, 73 insertions, 42 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index 7a3bef5..51df3f0 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -37,6 +37,7 @@ import ( "bytes" "errors" "io" + "math" "sync" "time" @@ -84,12 +85,9 @@ type ClientStream interface { // Header returns the header metadata received from the server if there // is any. It blocks if the metadata is not ready to read. Header() (metadata.MD, error) - // Trailer returns the trailer metadata from the server. It must be called - // after stream.Recv() returns non-nil error (including io.EOF) for - // bi-directional streaming and server streaming or stream.CloseAndRecv() - // returns for client streaming in order to receive trailer metadata if - // present. Otherwise, it could returns an empty MD even though trailer - // is present. + // Trailer returns the trailer metadata from the server, if there is any. + // It must only be called after stream.CloseAndRecv has returned, or + // stream.Recv has returned a non-nil error (including io.EOF). Trailer() metadata.MD // CloseSend closes the send direction of the stream. It closes the stream // when non-nil error is met. @@ -99,11 +97,10 @@ type ClientStream interface { // NewClientStream creates a new Stream for the client side. This is called // by generated code. -func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { +func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { var ( t transport.ClientTransport s *transport.Stream - err error put func() ) c := defaultCallInfo @@ -120,27 +117,24 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth if cc.dopts.cp != nil { callHdr.SendCompress = cc.dopts.cp.Type() } - cs := &clientStream{ - opts: opts, - c: c, - desc: desc, - codec: cc.dopts.codec, - cp: cc.dopts.cp, - dc: cc.dopts.dc, - tracing: EnableTracing, - } - if cc.dopts.cp != nil { - callHdr.SendCompress = cc.dopts.cp.Type() - cs.cbuf = new(bytes.Buffer) - } - if cs.tracing { - cs.trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) - cs.trInfo.firstLine.client = true + var trInfo traceInfo + if EnableTracing { + trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) + trInfo.firstLine.client = true if deadline, ok := ctx.Deadline(); ok { - cs.trInfo.firstLine.deadline = deadline.Sub(time.Now()) + trInfo.firstLine.deadline = deadline.Sub(time.Now()) } - cs.trInfo.tr.LazyLog(&cs.trInfo.firstLine, false) - ctx = trace.NewContext(ctx, cs.trInfo.tr) + trInfo.tr.LazyLog(&trInfo.firstLine, false) + ctx = trace.NewContext(ctx, trInfo.tr) + defer func() { + if err != nil { + // Need to call tr.finish() if error is returned. + // Because tr will not be returned to caller. + trInfo.tr.LazyPrintf("RPC: [%v]", err) + trInfo.tr.SetError() + trInfo.tr.Finish() + } + }() } gopts := BalancerGetOptions{ BlockingWait: !c.failFast, @@ -152,9 +146,9 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth if _, ok := err.(*rpcError); ok { return nil, err } - if err == errConnClosing { + if err == errConnClosing || err == errConnUnavailable { if c.failFast { - return nil, Errorf(codes.Unavailable, "%v", errConnClosing) + return nil, Errorf(codes.Unavailable, "%v", err) } continue } @@ -168,9 +162,8 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth put() put = nil } - if _, ok := err.(transport.ConnectionError); ok { + if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain { if c.failFast { - cs.finish(err) return nil, toRPCErr(err) } continue @@ -179,16 +172,43 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } break } - cs.put = put - cs.t = t - cs.s = s - cs.p = &parser{r: s} - // Listen on ctx.Done() to detect cancellation when there is no pending - // I/O operations on this stream. + cs := &clientStream{ + opts: opts, + c: c, + desc: desc, + codec: cc.dopts.codec, + cp: cc.dopts.cp, + dc: cc.dopts.dc, + + put: put, + t: t, + s: s, + p: &parser{r: s}, + + tracing: EnableTracing, + trInfo: trInfo, + } + if cc.dopts.cp != nil { + cs.cbuf = new(bytes.Buffer) + } + // Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination + // when there is no pending I/O operations on this stream. go func() { select { case <-t.Error(): // Incur transport error, simply exit. + case <-s.Done(): + // TODO: The trace of the RPC is terminated here when there is no pending + // I/O, which is probably not the optimal solution. + if s.StatusCode() == codes.OK { + cs.finish(nil) + } else { + cs.finish(Errorf(s.StatusCode(), "%s", s.StatusDesc())) + } + cs.closeTransportStream(nil) + case <-s.GoAway(): + cs.finish(errConnDrain) + cs.closeTransportStream(errConnDrain) case <-s.Context().Done(): err := s.Context().Err() cs.finish(err) @@ -251,7 +271,17 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if err != nil { cs.finish(err) } - if err == nil || err == io.EOF { + if err == nil { + return + } + if err == io.EOF { + // Specialize the process for server streaming. SendMesg is only called + // once when creating the stream object. io.EOF needs to be skipped when + // the rpc is early finished (before the stream object is created.). + // TODO: It is probably better to move this into the generated code. + if !cs.desc.ClientStreams && cs.desc.ServerStreams { + err = nil + } return } if _, ok := err.(transport.ConnectionError); !ok { @@ -272,7 +302,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } func (cs *clientStream) RecvMsg(m interface{}) (err error) { - err = recv(cs.p, cs.codec, cs.s, cs.dc, m) + err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32) defer func() { // err != nil indicates the termination of the stream. if err != nil { @@ -291,7 +321,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { return } // Special handling for client streaming rpc. - err = recv(cs.p, cs.codec, cs.s, cs.dc, m) + err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32) cs.closeTransportStream(err) if err == nil { return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) @@ -326,7 +356,7 @@ func (cs *clientStream) CloseSend() (err error) { } }() if err == nil || err == io.EOF { - return + return nil } if _, ok := err.(transport.ConnectionError); !ok { cs.closeTransportStream(err) @@ -392,6 +422,7 @@ type serverStream struct { cp Compressor dc Decompressor cbuf *bytes.Buffer + maxMsgSize int statusCode codes.Code statusDesc string trInfo *traceInfo @@ -458,5 +489,5 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { ss.mu.Unlock() } }() - return recv(ss.p, ss.codec, ss.s, ss.dc, m) + return recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize) } |