aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go39
1 files changed, 22 insertions, 17 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index d3a4deb..bb468dc 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -151,23 +151,24 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
}()
}
- if stats.On() {
- ctx = stats.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
+ sh := cc.dopts.copts.StatsHandler
+ if sh != nil {
+ ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
FailFast: c.failFast,
}
- stats.HandleRPC(ctx, begin)
+ sh.HandleRPC(ctx, begin)
}
defer func() {
- if err != nil && stats.On() {
+ if err != nil && sh != nil {
// Only handle end stats if err != nil.
end := &stats.End{
Client: true,
Error: err,
}
- stats.HandleRPC(ctx, end)
+ sh.HandleRPC(ctx, end)
}
}()
gopts := BalancerGetOptions{
@@ -223,7 +224,8 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
tracing: EnableTracing,
trInfo: trInfo,
- statsCtx: ctx,
+ statsCtx: ctx,
+ statsHandler: cc.dopts.copts.StatsHandler,
}
if cc.dopts.cp != nil {
cs.cbuf = new(bytes.Buffer)
@@ -281,7 +283,8 @@ type clientStream struct {
// statsCtx keeps the user context for stats handling.
// All stats collection should use the statsCtx (instead of the stream context)
// so that all the generated stats for a particular RPC can be associated in the processing phase.
- statsCtx context.Context
+ statsCtx context.Context
+ statsHandler stats.Handler
}
func (cs *clientStream) Context() context.Context {
@@ -335,7 +338,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
err = toRPCErr(err)
}()
var outPayload *stats.OutPayload
- if stats.On() {
+ if cs.statsHandler != nil {
outPayload = &stats.OutPayload{
Client: true,
}
@@ -352,14 +355,14 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
err = cs.t.Write(cs.s, out, &transport.Options{Last: false})
if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
- stats.HandleRPC(cs.statsCtx, outPayload)
+ cs.statsHandler.HandleRPC(cs.statsCtx, outPayload)
}
return err
}
func (cs *clientStream) RecvMsg(m interface{}) (err error) {
defer func() {
- if err != nil && stats.On() {
+ if err != nil && cs.statsHandler != nil {
// Only generate End if err != nil.
// If err == nil, it's not the last RecvMsg.
// The last RecvMsg gets either an RPC error or io.EOF.
@@ -370,11 +373,11 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
if err != io.EOF {
end.Error = toRPCErr(err)
}
- stats.HandleRPC(cs.statsCtx, end)
+ cs.statsHandler.HandleRPC(cs.statsCtx, end)
}
}()
var inPayload *stats.InPayload
- if stats.On() {
+ if cs.statsHandler != nil {
inPayload = &stats.InPayload{
Client: true,
}
@@ -395,7 +398,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
cs.mu.Unlock()
}
if inPayload != nil {
- stats.HandleRPC(cs.statsCtx, inPayload)
+ cs.statsHandler.HandleRPC(cs.statsCtx, inPayload)
}
if !cs.desc.ClientStreams || cs.desc.ServerStreams {
return
@@ -520,6 +523,8 @@ type serverStream struct {
statusDesc string
trInfo *traceInfo
+ statsHandler stats.Handler
+
mu sync.Mutex // protects trInfo.tr after the service handler runs.
}
@@ -562,7 +567,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
}
}()
var outPayload *stats.OutPayload
- if stats.On() {
+ if ss.statsHandler != nil {
outPayload = &stats.OutPayload{}
}
out, err := encode(ss.codec, m, ss.cp, ss.cbuf, outPayload)
@@ -580,7 +585,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
}
if outPayload != nil {
outPayload.SentTime = time.Now()
- stats.HandleRPC(ss.s.Context(), outPayload)
+ ss.statsHandler.HandleRPC(ss.s.Context(), outPayload)
}
return nil
}
@@ -601,7 +606,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}
}()
var inPayload *stats.InPayload
- if stats.On() {
+ if ss.statsHandler != nil {
inPayload = &stats.InPayload{}
}
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, inPayload); err != nil {
@@ -614,7 +619,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
return toRPCErr(err)
}
if inPayload != nil {
- stats.HandleRPC(ss.s.Context(), inPayload)
+ ss.statsHandler.HandleRPC(ss.s.Context(), inPayload)
}
return nil
}