From 347c11ec42264c579eb3f19494e4f75ab8bb8f0d Mon Sep 17 00:00:00 2001 From: Niall Sheridan Date: Thu, 9 Aug 2018 23:29:20 +0100 Subject: Remove gRPC This hasn't been enabled in a while due to gRPC limitations --- vendor/google.golang.org/grpc/stream.go | 694 +++++++++++++++++++++----------- 1 file changed, 469 insertions(+), 225 deletions(-) (limited to 'vendor/google.golang.org/grpc/stream.go') diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index 152d9ec..65d45a1 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -21,6 +21,8 @@ package grpc import ( "errors" "io" + "math" + "strconv" "sync" "time" @@ -29,11 +31,13 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/encoding" + "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/internal/grpcrand" + "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/metadata" "google.golang.org/grpc/stats" "google.golang.org/grpc/status" - "google.golang.org/grpc/transport" ) // StreamHandler defines the handler called by gRPC server to complete the @@ -55,31 +59,20 @@ type StreamDesc struct { // Stream defines the common interface a client or server stream has to satisfy. // -// All errors returned from Stream are compatible with the status package. +// Deprecated: See ClientStream and ServerStream documentation instead. type Stream interface { - // Context returns the context for this stream. + // Deprecated: See ClientStream and ServerStream documentation instead. Context() context.Context - // SendMsg blocks until it sends m, the stream is done or the stream - // breaks. - // On error, it aborts the stream and returns an RPC status on client - // side. On server side, it simply returns the error to the caller. - // SendMsg is called by generated code. Also Users can call SendMsg - // directly when it is really needed in their use cases. - // It's safe to have a goroutine calling SendMsg and another goroutine calling - // recvMsg on the same stream at the same time. - // But it is not safe to call SendMsg on the same stream in different goroutines. + // Deprecated: See ClientStream and ServerStream documentation instead. SendMsg(m interface{}) error - // RecvMsg blocks until it receives a message or the stream is - // done. On client side, it returns io.EOF when the stream is done. On - // any other error, it aborts the stream and returns an RPC status. On - // server side, it simply returns the error to the caller. - // It's safe to have a goroutine calling SendMsg and another goroutine calling - // recvMsg on the same stream at the same time. - // But it is not safe to call RecvMsg on the same stream in different goroutines. + // Deprecated: See ClientStream and ServerStream documentation instead. RecvMsg(m interface{}) error } -// ClientStream defines the interface a client stream has to satisfy. +// ClientStream defines the client-side behavior of a streaming RPC. +// +// All errors returned from ClientStream methods are compatible with the +// status package. type ClientStream interface { // Header returns the header metadata received from the server if there // is any. It blocks if the metadata is not ready to read. @@ -91,13 +84,38 @@ type ClientStream interface { // CloseSend closes the send direction of the stream. It closes the stream // when non-nil error is met. CloseSend() error - // Stream.SendMsg() may return a non-nil error when something wrong happens sending - // the request. The returned error indicates the status of this sending, not the final - // status of the RPC. + // Context returns the context for this stream. + // + // It should not be called until after Header or RecvMsg has returned. Once + // called, subsequent client-side retries are disabled. + Context() context.Context + // SendMsg is generally called by generated code. On error, SendMsg aborts + // the stream. If the error was generated by the client, the status is + // returned directly; otherwise, io.EOF is returned and the status of + // the stream may be discovered using RecvMsg. + // + // SendMsg blocks until: + // - There is sufficient flow control to schedule m with the transport, or + // - The stream is done, or + // - The stream breaks. // - // Always call Stream.RecvMsg() to drain the stream and get the final - // status, otherwise there could be leaked resources. - Stream + // SendMsg does not wait until the message is received by the server. An + // untimely stream closure may result in lost messages. To ensure delivery, + // users should ensure the RPC completed successfully using RecvMsg. + // + // It is safe to have a goroutine calling SendMsg and another goroutine + // calling RecvMsg on the same stream at the same time, but it is not safe + // to call SendMsg on the same stream in different goroutines. + SendMsg(m interface{}) error + // RecvMsg blocks until it receives a message into m or the stream is + // done. It returns io.EOF when the stream completes successfully. On + // any other error, the stream is aborted and the error contains the RPC + // status. + // + // It is safe to have a goroutine calling SendMsg and another goroutine + // calling RecvMsg on the same stream at the same time, but it is not + // safe to call RecvMsg on the same stream in different goroutines. + RecvMsg(m interface{}) error } // NewStream creates a new Stream for the client side. This is typically @@ -128,8 +146,6 @@ func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method st } // NewClientStream is a wrapper for ClientConn.NewStream. -// -// DEPRECATED: Use ClientConn.NewStream instead. func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { return cc.NewStream(ctx, desc, method, opts...) } @@ -178,13 +194,8 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } callHdr := &transport.CallHdr{ - Host: cc.authority, - Method: method, - // If it's not client streaming, we should already have the request to be sent, - // so we don't flush the header. - // If it's client streaming, the user may never send a request or send it any - // time soon, so we ask the transport to flush the header. - Flush: desc.ClientStreams, + Host: cc.authority, + Method: method, ContentSubtype: c.contentSubtype, } @@ -218,15 +229,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } trInfo.tr.LazyLog(&trInfo.firstLine, false) ctx = trace.NewContext(ctx, trInfo.tr) - defer func() { - if err != nil { - // Need to call tr.finish() if error is returned. - // Because tr will not be returned to caller. - trInfo.tr.LazyPrintf("RPC: [%v]", err) - trInfo.tr.SetError() - trInfo.tr.Finish() - } - }() } ctx = newContextWithRPCInfo(ctx, c.failFast) sh := cc.dopts.copts.StatsHandler @@ -240,80 +242,41 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth FailFast: c.failFast, } sh.HandleRPC(ctx, begin) - defer func() { - if err != nil { - // Only handle end stats if err != nil. - end := &stats.End{ - Client: true, - Error: err, - BeginTime: beginTime, - EndTime: time.Now(), - } - sh.HandleRPC(ctx, end) - } - }() } - var ( - t transport.ClientTransport - s *transport.Stream - done func(balancer.DoneInfo) - ) - for { - // Check to make sure the context has expired. This will prevent us from - // looping forever if an error occurs for wait-for-ready RPCs where no data - // is sent on the wire. - select { - case <-ctx.Done(): - return nil, toRPCErr(ctx.Err()) - default: - } - - t, done, err = cc.getTransport(ctx, c.failFast) - if err != nil { - return nil, err - } + cs := &clientStream{ + callHdr: callHdr, + ctx: ctx, + methodConfig: &mc, + opts: opts, + callInfo: c, + cc: cc, + desc: desc, + codec: c.codec, + cp: cp, + comp: comp, + cancel: cancel, + beginTime: beginTime, + firstAttempt: true, + } + if !cc.dopts.disableRetry { + cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler) + } + + cs.callInfo.stream = cs + // Only this initial attempt has stats/tracing. + // TODO(dfawley): move to newAttempt when per-attempt stats are implemented. + if err := cs.newAttemptLocked(sh, trInfo); err != nil { + cs.finish(err) + return nil, err + } - s, err = t.NewStream(ctx, callHdr) - if err != nil { - if done != nil { - done(balancer.DoneInfo{Err: err}) - done = nil - } - // In the event of any error from NewStream, we never attempted to write - // anything to the wire, so we can retry indefinitely for non-fail-fast - // RPCs. - if !c.failFast { - continue - } - return nil, toRPCErr(err) - } - break + op := func(a *csAttempt) error { return a.newStream() } + if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil { + cs.finish(err) + return nil, err } - cs := &clientStream{ - opts: opts, - c: c, - cc: cc, - desc: desc, - codec: c.codec, - cp: cp, - comp: comp, - cancel: cancel, - attempt: &csAttempt{ - t: t, - s: s, - p: &parser{r: s}, - done: done, - dc: cc.dopts.dc, - ctx: ctx, - trInfo: trInfo, - statsHandler: sh, - beginTime: beginTime, - }, - } - cs.c.stream = cs - cs.attempt.cs = cs if desc != unaryStreamDesc { // Listen on cc and stream contexts to cleanup when the user closes the // ClientConn or cancels the stream context. In all other cases, an error @@ -332,12 +295,45 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth return cs, nil } +func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo traceInfo) error { + cs.attempt = &csAttempt{ + cs: cs, + dc: cs.cc.dopts.dc, + statsHandler: sh, + trInfo: trInfo, + } + + if err := cs.ctx.Err(); err != nil { + return toRPCErr(err) + } + t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method) + if err != nil { + return err + } + cs.attempt.t = t + cs.attempt.done = done + return nil +} + +func (a *csAttempt) newStream() error { + cs := a.cs + cs.callHdr.PreviousAttempts = cs.numRetries + s, err := a.t.NewStream(cs.ctx, cs.callHdr) + if err != nil { + return toRPCErr(err) + } + cs.attempt.s = s + cs.attempt.p = &parser{r: s} + return nil +} + // clientStream implements a client side Stream. type clientStream struct { - opts []CallOption - c *callInfo - cc *ClientConn - desc *StreamDesc + callHdr *transport.CallHdr + opts []CallOption + callInfo *callInfo + cc *ClientConn + desc *StreamDesc codec baseCodec cp Compressor @@ -345,13 +341,25 @@ type clientStream struct { cancel context.CancelFunc // cancels all attempts - sentLast bool // sent an end stream + sentLast bool // sent an end stream + beginTime time.Time + + methodConfig *MethodConfig + + ctx context.Context // the application's context, wrapped by stats/tracing - mu sync.Mutex // guards finished - finished bool // TODO: replace with atomic cmpxchg or sync.Once? + retryThrottler *retryThrottler // The throttler active when the RPC began. - attempt *csAttempt // the active client stream attempt + mu sync.Mutex + firstAttempt bool // if true, transparent retry is valid + numRetries int // exclusive of transparent retry attempt(s) + numRetriesSincePushback int // retries since pushback; to reset backoff + finished bool // TODO: replace with atomic cmpxchg or sync.Once? + attempt *csAttempt // the active client stream attempt // TODO(hedging): hedging will have multiple attempts simultaneously. + committed bool // active attempt committed for retry? + buffer []func(a *csAttempt) error // operations to replay on retry + bufferSize int // current size of buffer } // csAttempt implements a single transport stream attempt within a @@ -363,53 +371,298 @@ type csAttempt struct { p *parser done func(balancer.DoneInfo) + finished bool dc Decompressor decomp encoding.Compressor decompSet bool - ctx context.Context // the application's context, wrapped by stats/tracing - mu sync.Mutex // guards trInfo.tr // trInfo.tr is set when created (if EnableTracing is true), // and cleared when the finish method is called. trInfo traceInfo statsHandler stats.Handler - beginTime time.Time +} + +func (cs *clientStream) commitAttemptLocked() { + cs.committed = true + cs.buffer = nil +} + +func (cs *clientStream) commitAttempt() { + cs.mu.Lock() + cs.commitAttemptLocked() + cs.mu.Unlock() +} + +// shouldRetry returns nil if the RPC should be retried; otherwise it returns +// the error that should be returned by the operation. +func (cs *clientStream) shouldRetry(err error) error { + if cs.attempt.s == nil && !cs.callInfo.failFast { + // In the event of any error from NewStream (attempt.s == nil), we + // never attempted to write anything to the wire, so we can retry + // indefinitely for non-fail-fast RPCs. + return nil + } + if cs.finished || cs.committed { + // RPC is finished or committed; cannot retry. + return err + } + // Wait for the trailers. + if cs.attempt.s != nil { + <-cs.attempt.s.Done() + } + if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) { + // First attempt, wait-for-ready, stream unprocessed: transparently retry. + cs.firstAttempt = false + return nil + } + cs.firstAttempt = false + if cs.cc.dopts.disableRetry { + return err + } + + pushback := 0 + hasPushback := false + if cs.attempt.s != nil { + if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil { + // Context error; stop now. + return toErr + } else if !to { + return err + } + + // TODO(retry): Move down if the spec changes to not check server pushback + // before considering this a failure for throttling. + sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"] + if len(sps) == 1 { + var e error + if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 { + grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0]) + cs.retryThrottler.throttle() // This counts as a failure for throttling. + return err + } + hasPushback = true + } else if len(sps) > 1 { + grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps) + cs.retryThrottler.throttle() // This counts as a failure for throttling. + return err + } + } + + var code codes.Code + if cs.attempt.s != nil { + code = cs.attempt.s.Status().Code() + } else { + code = status.Convert(err).Code() + } + + rp := cs.methodConfig.retryPolicy + if rp == nil || !rp.retryableStatusCodes[code] { + return err + } + + // Note: the ordering here is important; we count this as a failure + // only if the code matched a retryable code. + if cs.retryThrottler.throttle() { + return err + } + if cs.numRetries+1 >= rp.maxAttempts { + return err + } + + var dur time.Duration + if hasPushback { + dur = time.Millisecond * time.Duration(pushback) + cs.numRetriesSincePushback = 0 + } else { + fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback)) + cur := float64(rp.initialBackoff) * fact + if max := float64(rp.maxBackoff); cur > max { + cur = max + } + dur = time.Duration(grpcrand.Int63n(int64(cur))) + cs.numRetriesSincePushback++ + } + + // TODO(dfawley): we could eagerly fail here if dur puts us past the + // deadline, but unsure if it is worth doing. + t := time.NewTimer(dur) + select { + case <-t.C: + cs.numRetries++ + return nil + case <-cs.ctx.Done(): + t.Stop() + return status.FromContextError(cs.ctx.Err()).Err() + } +} + +// Returns nil if a retry was performed and succeeded; error otherwise. +func (cs *clientStream) retryLocked(lastErr error) error { + for { + cs.attempt.finish(lastErr) + if err := cs.shouldRetry(lastErr); err != nil { + cs.commitAttemptLocked() + return err + } + if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil { + return err + } + if lastErr = cs.replayBufferLocked(); lastErr == nil { + return nil + } + } } func (cs *clientStream) Context() context.Context { - // TODO(retry): commit the current attempt (the context has peer-aware data). - return cs.attempt.context() + cs.commitAttempt() + // No need to lock before using attempt, since we know it is committed and + // cannot change. + return cs.attempt.s.Context() +} + +func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error { + cs.mu.Lock() + for { + if cs.committed { + cs.mu.Unlock() + return op(cs.attempt) + } + a := cs.attempt + cs.mu.Unlock() + err := op(a) + cs.mu.Lock() + if a != cs.attempt { + // We started another attempt already. + continue + } + if err == io.EOF { + <-a.s.Done() + } + if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) { + onSuccess() + cs.mu.Unlock() + return err + } + if err := cs.retryLocked(err); err != nil { + cs.mu.Unlock() + return err + } + } } func (cs *clientStream) Header() (metadata.MD, error) { - m, err := cs.attempt.header() + var m metadata.MD + err := cs.withRetry(func(a *csAttempt) error { + var err error + m, err = a.s.Header() + return toRPCErr(err) + }, cs.commitAttemptLocked) if err != nil { - // TODO(retry): maybe retry on error or commit attempt on success. - err = toRPCErr(err) cs.finish(err) } return m, err } func (cs *clientStream) Trailer() metadata.MD { - // TODO(retry): on error, maybe retry (trailers-only). - return cs.attempt.trailer() + // On RPC failure, we never need to retry, because usage requires that + // RecvMsg() returned a non-nil error before calling this function is valid. + // We would have retried earlier if necessary. + // + // Commit the attempt anyway, just in case users are not following those + // directions -- it will prevent races and should not meaningfully impact + // performance. + cs.commitAttempt() + if cs.attempt.s == nil { + return nil + } + return cs.attempt.s.Trailer() +} + +func (cs *clientStream) replayBufferLocked() error { + a := cs.attempt + for _, f := range cs.buffer { + if err := f(a); err != nil { + return err + } + } + return nil +} + +func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) { + // Note: we still will buffer if retry is disabled (for transparent retries). + if cs.committed { + return + } + cs.bufferSize += sz + if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize { + cs.commitAttemptLocked() + return + } + cs.buffer = append(cs.buffer, op) } func (cs *clientStream) SendMsg(m interface{}) (err error) { - // TODO(retry): buffer message for replaying if not committed. - return cs.attempt.sendMsg(m) + defer func() { + if err != nil && err != io.EOF { + // Call finish on the client stream for errors generated by this SendMsg + // call, as these indicate problems created by this client. (Transport + // errors are converted to an io.EOF error in csAttempt.sendMsg; the real + // error will be returned from RecvMsg eventually in that case, or be + // retried.) + cs.finish(err) + } + }() + if cs.sentLast { + return status.Errorf(codes.Internal, "SendMsg called after CloseSend") + } + if !cs.desc.ClientStreams { + cs.sentLast = true + } + data, err := encode(cs.codec, m) + if err != nil { + return err + } + compData, err := compress(data, cs.cp, cs.comp) + if err != nil { + return err + } + hdr, payload := msgHeader(data, compData) + // TODO(dfawley): should we be checking len(data) instead? + if len(payload) > *cs.callInfo.maxSendMessageSize { + return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize) + } + op := func(a *csAttempt) error { + err := a.sendMsg(m, hdr, payload, data) + // nil out the message and uncomp when replaying; they are only needed for + // stats which is disabled for subsequent attempts. + m, data = nil, nil + return err + } + return cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) }) } -func (cs *clientStream) RecvMsg(m interface{}) (err error) { - // TODO(retry): maybe retry on error or commit attempt on success. - return cs.attempt.recvMsg(m) +func (cs *clientStream) RecvMsg(m interface{}) error { + err := cs.withRetry(func(a *csAttempt) error { + return a.recvMsg(m) + }, cs.commitAttemptLocked) + if err != nil || !cs.desc.ServerStreams { + // err != nil or non-server-streaming indicates end of stream. + cs.finish(err) + } + return err } func (cs *clientStream) CloseSend() error { - cs.attempt.closeSend() + if cs.sentLast { + // TODO: return an error and finish the stream instead, due to API misuse? + return nil + } + cs.sentLast = true + op := func(a *csAttempt) error { return a.t.Write(a.s, nil, nil, &transport.Options{Last: true}) } + cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }) + // We never returned an error here for reasons. return nil } @@ -424,7 +677,11 @@ func (cs *clientStream) finish(err error) { return } cs.finished = true + cs.commitAttemptLocked() cs.mu.Unlock() + if err == nil { + cs.retryThrottler.successfulRPC() + } if channelz.IsOn() { if err != nil { cs.cc.incrCallsFailed() @@ -432,46 +689,20 @@ func (cs *clientStream) finish(err error) { cs.cc.incrCallsSucceeded() } } - // TODO(retry): commit current attempt if necessary. - cs.attempt.finish(err) - for _, o := range cs.opts { - o.after(cs.c) + if cs.attempt != nil { + cs.attempt.finish(err) + } + // after functions all rely upon having a stream. + if cs.attempt.s != nil { + for _, o := range cs.opts { + o.after(cs.callInfo) + } } cs.cancel() } -func (a *csAttempt) context() context.Context { - return a.s.Context() -} - -func (a *csAttempt) header() (metadata.MD, error) { - return a.s.Header() -} - -func (a *csAttempt) trailer() metadata.MD { - return a.s.Trailer() -} - -func (a *csAttempt) sendMsg(m interface{}) (err error) { - // TODO Investigate how to signal the stats handling party. - // generate error stats if err != nil && err != io.EOF? +func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { cs := a.cs - defer func() { - // For non-client-streaming RPCs, we return nil instead of EOF on success - // because the generated code requires it. finish is not called; RecvMsg() - // will call it with the stream's status independently. - if err == io.EOF && !cs.desc.ClientStreams { - err = nil - } - if err != nil && err != io.EOF { - // Call finish on the client stream for errors generated by this SendMsg - // call, as these indicate problems created by this client. (Transport - // errors are converted to an io.EOF error below; the real error will be - // returned from RecvMsg eventually in that case, or be retried.) - cs.finish(err) - } - }() - // TODO: Check cs.sentLast and error if we already ended the stream. if EnableTracing { a.mu.Lock() if a.trInfo.tr != nil { @@ -479,44 +710,26 @@ func (a *csAttempt) sendMsg(m interface{}) (err error) { } a.mu.Unlock() } - data, err := encode(cs.codec, m) - if err != nil { - return err - } - compData, err := compress(data, cs.cp, cs.comp) - if err != nil { - return err - } - hdr, payload := msgHeader(data, compData) - // TODO(dfawley): should we be checking len(data) instead? - if len(payload) > *cs.c.maxSendMessageSize { - return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.c.maxSendMessageSize) + if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil { + if !cs.desc.ClientStreams { + // For non-client-streaming RPCs, we return nil instead of EOF on error + // because the generated code requires it. finish is not called; RecvMsg() + // will call it with the stream's status independently. + return nil + } + return io.EOF } - - if !cs.desc.ClientStreams { - cs.sentLast = true + if a.statsHandler != nil { + a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now())) } - err = a.t.Write(a.s, hdr, payload, &transport.Options{Last: !cs.desc.ClientStreams}) - if err == nil { - if a.statsHandler != nil { - a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payload, time.Now())) - } - if channelz.IsOn() { - a.t.IncrMsgSent() - } - return nil + if channelz.IsOn() { + a.t.IncrMsgSent() } - return io.EOF + return nil } func (a *csAttempt) recvMsg(m interface{}) (err error) { cs := a.cs - defer func() { - if err != nil || !cs.desc.ServerStreams { - // err != nil or non-server-streaming indicates end of stream. - cs.finish(err) - } - }() var inPayload *stats.InPayload if a.statsHandler != nil { inPayload = &stats.InPayload{ @@ -539,7 +752,7 @@ func (a *csAttempt) recvMsg(m interface{}) (err error) { // Only initialize this state once per stream. a.decompSet = true } - err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, inPayload, a.decomp) + err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, inPayload, a.decomp) if err != nil { if err == io.EOF { if statusErr := a.s.Status().Err(); statusErr != nil { @@ -557,7 +770,7 @@ func (a *csAttempt) recvMsg(m interface{}) (err error) { a.mu.Unlock() } if inPayload != nil { - a.statsHandler.HandleRPC(a.ctx, inPayload) + a.statsHandler.HandleRPC(cs.ctx, inPayload) } if channelz.IsOn() { a.t.IncrMsgRecv() @@ -569,7 +782,7 @@ func (a *csAttempt) recvMsg(m interface{}) (err error) { // Special handling for non-server-stream rpcs. // This recv expects EOF or errors, so we don't collect inPayload. - err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.c.maxReceiveMessageSize, nil, a.decomp) + err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp) if err == nil { return toRPCErr(errors.New("grpc: client streaming protocol violation: get , want ")) } @@ -579,37 +792,40 @@ func (a *csAttempt) recvMsg(m interface{}) (err error) { return toRPCErr(err) } -func (a *csAttempt) closeSend() { - cs := a.cs - if cs.sentLast { - return - } - cs.sentLast = true - cs.attempt.t.Write(cs.attempt.s, nil, nil, &transport.Options{Last: true}) - // We ignore errors from Write. Any error it would return would also be - // returned by a subsequent RecvMsg call, and the user is supposed to always - // finish the stream by calling RecvMsg until it returns err != nil. -} - func (a *csAttempt) finish(err error) { a.mu.Lock() - a.t.CloseStream(a.s, err) + if a.finished { + a.mu.Unlock() + return + } + a.finished = true + if err == io.EOF { + // Ending a stream with EOF indicates a success. + err = nil + } + if a.s != nil { + a.t.CloseStream(a.s, err) + } if a.done != nil { + br := false + if a.s != nil { + br = a.s.BytesReceived() + } a.done(balancer.DoneInfo{ Err: err, - BytesSent: true, - BytesReceived: a.s.BytesReceived(), + BytesSent: a.s != nil, + BytesReceived: br, }) } if a.statsHandler != nil { end := &stats.End{ Client: true, - BeginTime: a.beginTime, + BeginTime: a.cs.beginTime, EndTime: time.Now(), Error: err, } - a.statsHandler.HandleRPC(a.ctx, end) + a.statsHandler.HandleRPC(a.cs.ctx, end) } if a.trInfo.tr != nil { if err == nil { @@ -624,7 +840,10 @@ func (a *csAttempt) finish(err error) { a.mu.Unlock() } -// ServerStream defines the interface a server stream has to satisfy. +// ServerStream defines the server-side behavior of a streaming RPC. +// +// All errors returned from ServerStream methods are compatible with the +// status package. type ServerStream interface { // SetHeader sets the header metadata. It may be called multiple times. // When call multiple times, all the provided metadata will be merged. @@ -640,7 +859,32 @@ type ServerStream interface { // SetTrailer sets the trailer metadata which will be sent with the RPC status. // When called more than once, all the provided metadata will be merged. SetTrailer(metadata.MD) - Stream + // Context returns the context for this stream. + Context() context.Context + // SendMsg sends a message. On error, SendMsg aborts the stream and the + // error is returned directly. + // + // SendMsg blocks until: + // - There is sufficient flow control to schedule m with the transport, or + // - The stream is done, or + // - The stream breaks. + // + // SendMsg does not wait until the message is received by the client. An + // untimely stream closure may result in lost messages. + // + // It is safe to have a goroutine calling SendMsg and another goroutine + // calling RecvMsg on the same stream at the same time, but it is not safe + // to call SendMsg on the same stream in different goroutines. + SendMsg(m interface{}) error + // RecvMsg blocks until it receives a message into m or the stream is + // done. It returns io.EOF when the client has performed a CloseSend. On + // any non-EOF error, the stream is aborted and the error contains the + // RPC status. + // + // It is safe to have a goroutine calling SendMsg and another goroutine + // calling RecvMsg on the same stream at the same time, but it is not + // safe to call RecvMsg on the same stream in different goroutines. + RecvMsg(m interface{}) error } // serverStream implements a server side Stream. -- cgit v1.2.3