aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go29
1 files changed, 24 insertions, 5 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 51df3f0..e1b4759 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -97,7 +97,14 @@ type ClientStream interface {
// NewClientStream creates a new Stream for the client side. This is called
// by generated code.
-func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
+func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
+ if cc.dopts.streamInt != nil {
+ return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
+ }
+ return newClientStream(ctx, desc, cc, method, opts...)
+}
+
+func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
var (
t transport.ClientTransport
s *transport.Stream
@@ -296,7 +303,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
}()
if err != nil {
- return transport.StreamErrorf(codes.Internal, "grpc: %v", err)
+ return Errorf(codes.Internal, "grpc: %v", err)
}
return cs.t.Write(cs.s, out, &transport.Options{Last: false})
}
@@ -468,10 +475,13 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
}
}()
if err != nil {
- err = transport.StreamErrorf(codes.Internal, "grpc: %v", err)
+ err = Errorf(codes.Internal, "grpc: %v", err)
return err
}
- return ss.t.Write(ss.s, out, &transport.Options{Last: false})
+ if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil {
+ return toRPCErr(err)
+ }
+ return nil
}
func (ss *serverStream) RecvMsg(m interface{}) (err error) {
@@ -489,5 +499,14 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
ss.mu.Unlock()
}
}()
- return recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize)
+ if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize); err != nil {
+ if err == io.EOF {
+ return err
+ }
+ if err == io.ErrUnexpectedEOF {
+ err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
+ }
+ return toRPCErr(err)
+ }
+ return nil
}