aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go70
1 files changed, 33 insertions, 37 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index bb468dc..ecb1a31 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -37,7 +37,6 @@ import (
"bytes"
"errors"
"io"
- "math"
"sync"
"time"
@@ -46,6 +45,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
+ "google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)
@@ -178,7 +178,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
- if _, ok := err.(*rpcError); ok {
+ if _, ok := status.FromError(err); ok {
return nil, err
}
if err == errConnClosing || err == errConnUnavailable {
@@ -208,13 +208,14 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
break
}
cs := &clientStream{
- opts: opts,
- c: c,
- desc: desc,
- codec: cc.dopts.codec,
- cp: cc.dopts.cp,
- dc: cc.dopts.dc,
- cancel: cancel,
+ opts: opts,
+ c: c,
+ desc: desc,
+ codec: cc.dopts.codec,
+ cp: cc.dopts.cp,
+ dc: cc.dopts.dc,
+ maxMsgSize: cc.dopts.maxMsgSize,
+ cancel: cancel,
put: put,
t: t,
@@ -239,11 +240,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
case <-s.Done():
// TODO: The trace of the RPC is terminated here when there is no pending
// I/O, which is probably not the optimal solution.
- if s.StatusCode() == codes.OK {
- cs.finish(nil)
- } else {
- cs.finish(Errorf(s.StatusCode(), "%s", s.StatusDesc()))
- }
+ cs.finish(s.Status().Err())
cs.closeTransportStream(nil)
case <-s.GoAway():
cs.finish(errConnDrain)
@@ -259,17 +256,18 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
// clientStream implements a client side Stream.
type clientStream struct {
- opts []CallOption
- c callInfo
- t transport.ClientTransport
- s *transport.Stream
- p *parser
- desc *StreamDesc
- codec Codec
- cp Compressor
- cbuf *bytes.Buffer
- dc Decompressor
- cancel context.CancelFunc
+ opts []CallOption
+ c callInfo
+ t transport.ClientTransport
+ s *transport.Stream
+ p *parser
+ desc *StreamDesc
+ codec Codec
+ cp Compressor
+ cbuf *bytes.Buffer
+ dc Decompressor
+ maxMsgSize int
+ cancel context.CancelFunc
tracing bool // set to EnableTracing when the clientStream is created.
@@ -382,7 +380,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
Client: true,
}
}
- err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inPayload)
+ err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, inPayload)
defer func() {
// err != nil indicates the termination of the stream.
if err != nil {
@@ -405,17 +403,17 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
}
// Special handling for client streaming rpc.
// This recv expects EOF or errors, so we don't collect inPayload.
- err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, nil)
+ err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, nil)
cs.closeTransportStream(err)
if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}
if err == io.EOF {
- if cs.s.StatusCode() == codes.OK {
- cs.finish(err)
- return nil
+ if se := cs.s.Status().Err(); se != nil {
+ return se
}
- return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
+ cs.finish(err)
+ return nil
}
return toRPCErr(err)
}
@@ -423,11 +421,11 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
cs.closeTransportStream(err)
}
if err == io.EOF {
- if cs.s.StatusCode() == codes.OK {
- // Returns io.EOF to indicate the end of the stream.
- return
+ if statusErr := cs.s.Status().Err(); statusErr != nil {
+ return statusErr
}
- return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
+ // Returns io.EOF to indicate the end of the stream.
+ return
}
return toRPCErr(err)
}
@@ -519,8 +517,6 @@ type serverStream struct {
dc Decompressor
cbuf *bytes.Buffer
maxMsgSize int
- statusCode codes.Code
- statusDesc string
trInfo *traceInfo
statsHandler stats.Handler