aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r--vendor/google.golang.org/grpc/server.go58
1 files changed, 35 insertions, 23 deletions
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index b52a563..985226d 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -113,6 +113,7 @@ type options struct {
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
inTapHandle tap.ServerInHandle
+ statsHandler stats.Handler
maxConcurrentStreams uint32
useHandlerImpl bool // use http.Handler-based server
}
@@ -200,6 +201,13 @@ func InTapHandle(h tap.ServerInHandle) ServerOption {
}
}
+// StatsHandler returns a ServerOption that sets the stats handler for the server.
+func StatsHandler(h stats.Handler) ServerOption {
+ return func(o *options) {
+ o.statsHandler = h
+ }
+}
+
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
@@ -438,9 +446,10 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
// transport.NewServerTransport).
func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
config := &transport.ServerConfig{
- MaxStreams: s.opts.maxConcurrentStreams,
- AuthInfo: authInfo,
- InTapHandle: s.opts.inTapHandle,
+ MaxStreams: s.opts.maxConcurrentStreams,
+ AuthInfo: authInfo,
+ InTapHandle: s.opts.inTapHandle,
+ StatsHandler: s.opts.statsHandler,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
@@ -567,7 +576,7 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
if cp != nil {
cbuf = new(bytes.Buffer)
}
- if stats.On() {
+ if s.opts.statsHandler != nil {
outPayload = &stats.OutPayload{}
}
p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
@@ -584,27 +593,28 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
err = t.Write(stream, p, opts)
if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
- stats.HandleRPC(stream.Context(), outPayload)
+ s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
}
return err
}
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
- if stats.On() {
+ sh := s.opts.statsHandler
+ if sh != nil {
begin := &stats.Begin{
BeginTime: time.Now(),
}
- stats.HandleRPC(stream.Context(), begin)
+ sh.HandleRPC(stream.Context(), begin)
}
defer func() {
- if stats.On() {
+ if sh != nil {
end := &stats.End{
EndTime: time.Now(),
}
if err != nil && err != io.EOF {
end.Error = toRPCErr(err)
}
- stats.HandleRPC(stream.Context(), end)
+ sh.HandleRPC(stream.Context(), end)
}
}()
if trInfo != nil {
@@ -665,7 +675,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
}
var inPayload *stats.InPayload
- if stats.On() {
+ if sh != nil {
inPayload = &stats.InPayload{
RecvTime: time.Now(),
}
@@ -699,7 +709,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
inPayload.Payload = v
inPayload.Data = req
inPayload.Length = len(req)
- stats.HandleRPC(stream.Context(), inPayload)
+ sh.HandleRPC(stream.Context(), inPayload)
}
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
@@ -756,35 +766,37 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
- if stats.On() {
+ sh := s.opts.statsHandler
+ if sh != nil {
begin := &stats.Begin{
BeginTime: time.Now(),
}
- stats.HandleRPC(stream.Context(), begin)
+ sh.HandleRPC(stream.Context(), begin)
}
defer func() {
- if stats.On() {
+ if sh != nil {
end := &stats.End{
EndTime: time.Now(),
}
if err != nil && err != io.EOF {
end.Error = toRPCErr(err)
}
- stats.HandleRPC(stream.Context(), end)
+ sh.HandleRPC(stream.Context(), end)
}
}()
if s.opts.cp != nil {
stream.SetSendCompress(s.opts.cp.Type())
}
ss := &serverStream{
- t: t,
- s: stream,
- p: &parser{r: stream},
- codec: s.opts.codec,
- cp: s.opts.cp,
- dc: s.opts.dc,
- maxMsgSize: s.opts.maxMsgSize,
- trInfo: trInfo,
+ t: t,
+ s: stream,
+ p: &parser{r: stream},
+ codec: s.opts.codec,
+ cp: s.opts.cp,
+ dc: s.opts.dc,
+ maxMsgSize: s.opts.maxMsgSize,
+ trInfo: trInfo,
+ statsHandler: sh,
}
if ss.cp != nil {
ss.cbuf = new(bytes.Buffer)