diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/call.go')
| -rw-r--r-- | vendor/google.golang.org/grpc/call.go | 69 | 
1 files changed, 61 insertions, 8 deletions
| diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go index 772c817..4d8023d 100644 --- a/vendor/google.golang.org/grpc/call.go +++ b/vendor/google.golang.org/grpc/call.go @@ -42,6 +42,7 @@ import (  	"golang.org/x/net/context"  	"golang.org/x/net/trace"  	"google.golang.org/grpc/codes" +	"google.golang.org/grpc/stats"  	"google.golang.org/grpc/transport"  ) @@ -49,7 +50,8 @@ import (  // On error, it returns the error and indicates whether the call should be retried.  //  // TODO(zhaoq): Check whether the received message sequence is valid. -func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) { +// TODO ctx is used for stats collection and processing. It is the context passed from the application. +func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {  	// Try to acquire header metadata from the server if there is any.  	defer func() {  		if err != nil { @@ -63,14 +65,25 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s  		return  	}  	p := &parser{r: stream} +	var inPayload *stats.InPayload +	if stats.On() { +		inPayload = &stats.InPayload{ +			Client: true, +		} +	}  	for { -		if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32); err != nil { +		if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inPayload); err != nil {  			if err == io.EOF {  				break  			}  			return  		}  	} +	if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK { +		// TODO in the current implementation, inTrailer may be handled before inPayload in some cases. +		// Fix the order if necessary. +		stats.HandleRPC(ctx, inPayload) +	}  	c.trailerMD = stream.Trailer()  	return nil  } @@ -89,15 +102,27 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd  			}  		}  	}() -	var cbuf *bytes.Buffer +	var ( +		cbuf       *bytes.Buffer +		outPayload *stats.OutPayload +	)  	if compressor != nil {  		cbuf = new(bytes.Buffer)  	} -	outBuf, err := encode(codec, args, compressor, cbuf) +	if stats.On() { +		outPayload = &stats.OutPayload{ +			Client: true, +		} +	} +	outBuf, err := encode(codec, args, compressor, cbuf, outPayload)  	if err != nil {  		return nil, Errorf(codes.Internal, "grpc: %v", err)  	}  	err = t.Write(stream, outBuf, opts) +	if err == nil && outPayload != nil { +		outPayload.SentTime = time.Now() +		stats.HandleRPC(ctx, outPayload) +	}  	// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method  	// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following  	// recvResponse to get the final status. @@ -118,8 +143,16 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli  	return invoke(ctx, method, args, reply, cc, opts...)  } -func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) { +func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {  	c := defaultCallInfo +	if mc, ok := cc.getMethodConfig(method); ok { +		c.failFast = !mc.WaitForReady +		if mc.Timeout > 0 { +			var cancel context.CancelFunc +			ctx, cancel = context.WithTimeout(ctx, mc.Timeout) +			defer cancel() +		} +	}  	for _, o := range opts {  		if err := o.before(&c); err != nil {  			return toRPCErr(err) @@ -140,12 +173,31 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli  		c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)  		// TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set.  		defer func() { -			if err != nil { -				c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) +			if e != nil { +				c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true)  				c.traceInfo.tr.SetError()  			}  		}()  	} +	if stats.On() { +		ctx = stats.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method}) +		begin := &stats.Begin{ +			Client:    true, +			BeginTime: time.Now(), +			FailFast:  c.failFast, +		} +		stats.HandleRPC(ctx, begin) +	} +	defer func() { +		if stats.On() { +			end := &stats.End{ +				Client:  true, +				EndTime: time.Now(), +				Error:   e, +			} +			stats.HandleRPC(ctx, end) +		} +	}()  	topts := &transport.Options{  		Last:  true,  		Delay: false, @@ -167,6 +219,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli  		if cc.dopts.cp != nil {  			callHdr.SendCompress = cc.dopts.cp.Type()  		} +  		gopts := BalancerGetOptions{  			BlockingWait: !c.failFast,  		} @@ -205,7 +258,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli  			}  			return toRPCErr(err)  		} -		err = recvResponse(cc.dopts, t, &c, stream, reply) +		err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)  		if err != nil {  			if put != nil {  				put() | 
