diff options
author | Niall Sheridan <nsheridan@gmail.com> | 2016-12-28 21:18:36 +0000 |
---|---|---|
committer | Niall Sheridan <nsheridan@gmail.com> | 2016-12-28 21:18:36 +0000 |
commit | 73ef85bc5db590c22689e11be20737a3dd88168f (patch) | |
tree | fe393a6f0776bca1889b2113ab341a2922e25d10 /vendor/google.golang.org/grpc | |
parent | 9e573e571fe878ed32947cae5a6d43cb5d72d3bb (diff) |
Update dependencies
Diffstat (limited to 'vendor/google.golang.org/grpc')
18 files changed, 1077 insertions, 153 deletions
diff --git a/vendor/google.golang.org/grpc/README.md b/vendor/google.golang.org/grpc/README.md index 110a8cf..39120c2 100644 --- a/vendor/google.golang.org/grpc/README.md +++ b/vendor/google.golang.org/grpc/README.md @@ -18,6 +18,22 @@ Prerequisites This requires Go 1.5 or later. +A note on the version used: significant performance improvements in benchmarks +of grpc-go have been seen by upgrading the go version from 1.5 to the latest +1.7.1. + +From https://golang.org/doc/install, one way to install the latest version of go is: +``` +$ GO_VERSION=1.7.1 +$ OS=linux +$ ARCH=amd64 +$ curl -O https://storage.googleapis.com/golang/go${GO_VERSION}.${OS}-${ARCH}.tar.gz +$ sudo tar -C /usr/local -xzf go$GO_VERSION.$OS-$ARCH.tar.gz +$ # Put go on the PATH, keep the usual installation dir +$ sudo ln -s /usr/local/go/bin/go /usr/bin/go +$ rm go$GO_VERSION.$OS-$ARCH.tar.gz +``` + Constraints ----------- The grpc package should only depend on standard Go packages and a small number of exceptions. If your contribution introduces new dependencies which are NOT in the [list](http://godoc.org/google.golang.org/grpc?imports), you need a discussion with gRPC-Go authors and consultants. @@ -30,3 +46,12 @@ Status ------ GA +FAQ +--- + +#### Compiling error, undefined: grpc.SupportPackageIsVersion + +Please update proto package, gRPC package and rebuild the proto files: + - `go get -u github.com/golang/protobuf/{proto,protoc-gen-go}` + - `go get -u google.golang.org/grpc` + - `protoc --go_out=plugins=grpc:. *.proto` diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go index 772c817..4d8023d 100644 --- a/vendor/google.golang.org/grpc/call.go +++ b/vendor/google.golang.org/grpc/call.go @@ -42,6 +42,7 @@ import ( "golang.org/x/net/context" "golang.org/x/net/trace" "google.golang.org/grpc/codes" + "google.golang.org/grpc/stats" "google.golang.org/grpc/transport" ) @@ -49,7 +50,8 @@ import ( // On error, it returns the error and indicates whether the call should be retried. // // TODO(zhaoq): Check whether the received message sequence is valid. -func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) { +// TODO ctx is used for stats collection and processing. It is the context passed from the application. +func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) { // Try to acquire header metadata from the server if there is any. defer func() { if err != nil { @@ -63,14 +65,25 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s return } p := &parser{r: stream} + var inPayload *stats.InPayload + if stats.On() { + inPayload = &stats.InPayload{ + Client: true, + } + } for { - if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32); err != nil { + if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inPayload); err != nil { if err == io.EOF { break } return } } + if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK { + // TODO in the current implementation, inTrailer may be handled before inPayload in some cases. + // Fix the order if necessary. + stats.HandleRPC(ctx, inPayload) + } c.trailerMD = stream.Trailer() return nil } @@ -89,15 +102,27 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd } } }() - var cbuf *bytes.Buffer + var ( + cbuf *bytes.Buffer + outPayload *stats.OutPayload + ) if compressor != nil { cbuf = new(bytes.Buffer) } - outBuf, err := encode(codec, args, compressor, cbuf) + if stats.On() { + outPayload = &stats.OutPayload{ + Client: true, + } + } + outBuf, err := encode(codec, args, compressor, cbuf, outPayload) if err != nil { return nil, Errorf(codes.Internal, "grpc: %v", err) } err = t.Write(stream, outBuf, opts) + if err == nil && outPayload != nil { + outPayload.SentTime = time.Now() + stats.HandleRPC(ctx, outPayload) + } // t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method // does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following // recvResponse to get the final status. @@ -118,8 +143,16 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli return invoke(ctx, method, args, reply, cc, opts...) } -func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) { +func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) { c := defaultCallInfo + if mc, ok := cc.getMethodConfig(method); ok { + c.failFast = !mc.WaitForReady + if mc.Timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, mc.Timeout) + defer cancel() + } + } for _, o := range opts { if err := o.before(&c); err != nil { return toRPCErr(err) @@ -140,12 +173,31 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false) // TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set. defer func() { - if err != nil { - c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) + if e != nil { + c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true) c.traceInfo.tr.SetError() } }() } + if stats.On() { + ctx = stats.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method}) + begin := &stats.Begin{ + Client: true, + BeginTime: time.Now(), + FailFast: c.failFast, + } + stats.HandleRPC(ctx, begin) + } + defer func() { + if stats.On() { + end := &stats.End{ + Client: true, + EndTime: time.Now(), + Error: e, + } + stats.HandleRPC(ctx, end) + } + }() topts := &transport.Options{ Last: true, Delay: false, @@ -167,6 +219,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli if cc.dopts.cp != nil { callHdr.SendCompress = cc.dopts.cp.Type() } + gopts := BalancerGetOptions{ BlockingWait: !c.failFast, } @@ -205,7 +258,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } return toRPCErr(err) } - err = recvResponse(cc.dopts, t, &c, stream, reply) + err = recvResponse(ctx, cc.dopts, t, &c, stream, reply) if err != nil { if put != nil { put() diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index 6167472..aa6b63d 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -54,6 +54,8 @@ var ( ErrClientConnClosing = errors.New("grpc: the client connection is closing") // ErrClientConnTimeout indicates that the ClientConn cannot establish the // underlying connections within the specified timeout. + // DEPRECATED: Please use context.DeadlineExceeded instead. This error will be + // removed in Q1 2017. ErrClientConnTimeout = errors.New("grpc: timed out when dialing") // errNoTransportSecurity indicates that there is no transport security @@ -93,6 +95,7 @@ type dialOptions struct { block bool insecure bool timeout time.Duration + scChan <-chan ServiceConfig copts transport.ConnectOptions } @@ -129,6 +132,13 @@ func WithBalancer(b Balancer) DialOption { } } +// WithServiceConfig returns a DialOption which has a channel to read the service configuration. +func WithServiceConfig(c <-chan ServiceConfig) DialOption { + return func(o *dialOptions) { + o.scChan = c + } +} + // WithBackoffMaxDelay configures the dialer to use the provided maximum delay // when backing off after failed connection attempts. func WithBackoffMaxDelay(md time.Duration) DialOption { @@ -199,6 +209,8 @@ func WithTimeout(d time.Duration) DialOption { } // WithDialer returns a DialOption that specifies a function to use for dialing network addresses. +// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's +// Temporary() method to decide if it should try to reconnect to the network address. func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { return func(o *dialOptions) { o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) { @@ -210,6 +222,17 @@ func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { } } +// FailOnNonTempDialError returns a DialOption that specified if gRPC fails on non-temporary dial errors. +// If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network +// address and won't try to reconnect. +// The default value of FailOnNonTempDialError is false. +// This is an EXPERIMENTAL API. +func FailOnNonTempDialError(f bool) DialOption { + return func(o *dialOptions) { + o.copts.FailOnNonTempDialError = f + } +} + // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs. func WithUserAgent(s string) DialOption { return func(o *dialOptions) { @@ -247,6 +270,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * conns: make(map[Address]*addrConn), } cc.ctx, cc.cancel = context.WithCancel(context.Background()) + for _, opt := range opts { + opt(&cc.dopts) + } + if cc.dopts.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) + defer cancel() + } + defer func() { select { case <-ctx.Done(): @@ -259,10 +291,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } }() - for _, opt := range opts { - opt(&cc.dopts) + if cc.dopts.scChan != nil { + // Wait for the initial service config. + select { + case sc, ok := <-cc.dopts.scChan: + if ok { + cc.sc = sc + } + case <-ctx.Done(): + return nil, ctx.Err() + } } - // Set defaults. if cc.dopts.codec == nil { cc.dopts.codec = protoCodec{} @@ -284,6 +323,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * waitC := make(chan error, 1) go func() { var addrs []Address + if cc.dopts.balancer == nil && cc.sc.LB != nil { + cc.dopts.balancer = cc.sc.LB + } if cc.dopts.balancer == nil { // Connect to target directly if balancer is nil. addrs = append(addrs, Address{Addr: target}) @@ -319,10 +361,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } close(waitC) }() - var timeoutCh <-chan time.Time - if cc.dopts.timeout > 0 { - timeoutCh = time.After(cc.dopts.timeout) - } select { case <-ctx.Done(): return nil, ctx.Err() @@ -330,14 +368,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * if err != nil { return nil, err } - case <-timeoutCh: - return nil, ErrClientConnTimeout } + // If balancer is nil or balancer.Notify() is nil, ok will be false here. // The lbWatcher goroutine will not be created. if ok { go cc.lbWatcher() } + + if cc.dopts.scChan != nil { + go cc.scWatcher() + } return cc, nil } @@ -384,6 +425,7 @@ type ClientConn struct { dopts dialOptions mu sync.RWMutex + sc ServiceConfig conns map[Address]*addrConn } @@ -422,6 +464,24 @@ func (cc *ClientConn) lbWatcher() { } } +func (cc *ClientConn) scWatcher() { + for { + select { + case sc, ok := <-cc.dopts.scChan: + if !ok { + return + } + cc.mu.Lock() + // TODO: load balance policy runtime change is ignored. + // We may revist this decision in the future. + cc.sc = sc + cc.mu.Unlock() + case <-cc.ctx.Done(): + return + } + } +} + // resetAddrConn creates an addrConn for addr and adds it to cc.conns. // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason. // If tearDownErr is nil, errConnDrain will be used instead. @@ -509,6 +569,14 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err return nil } +// TODO: Avoid the locking here. +func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) { + cc.mu.RLock() + defer cc.mu.RUnlock() + m, ok = cc.sc.Methods[method] + return +} + func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { var ( ac *addrConn diff --git a/vendor/google.golang.org/grpc/codegen.sh b/vendor/google.golang.org/grpc/codegen.sh index b009488..4cdc6ba 100755 --- a/vendor/google.golang.org/grpc/codegen.sh +++ b/vendor/google.golang.org/grpc/codegen.sh @@ -1,4 +1,4 @@ -#!/bin/bash +#!/usr/bin/env bash # This script serves as an example to demonstrate how to generate the gRPC-Go # interface and the related messages from .proto file. diff --git a/vendor/google.golang.org/grpc/coverage.sh b/vendor/google.golang.org/grpc/coverage.sh index 1202353..b85f918 100755 --- a/vendor/google.golang.org/grpc/coverage.sh +++ b/vendor/google.golang.org/grpc/coverage.sh @@ -1,4 +1,5 @@ -#!/bin/bash +#!/usr/bin/env bash + set -e diff --git a/vendor/google.golang.org/grpc/credentials/oauth/oauth.go b/vendor/google.golang.org/grpc/credentials/oauth/oauth.go index 8e68c4d..25393cc 100644 --- a/vendor/google.golang.org/grpc/credentials/oauth/oauth.go +++ b/vendor/google.golang.org/grpc/credentials/oauth/oauth.go @@ -61,7 +61,7 @@ func (ts TokenSource) GetRequestMetadata(ctx context.Context, uri ...string) (ma }, nil } -// RequireTransportSecurity indicates whether the credentails requires transport security. +// RequireTransportSecurity indicates whether the credentials requires transport security. func (ts TokenSource) RequireTransportSecurity() bool { return true } diff --git a/vendor/google.golang.org/grpc/metadata/metadata.go b/vendor/google.golang.org/grpc/metadata/metadata.go index 3c0ca7a..65dc5af 100644 --- a/vendor/google.golang.org/grpc/metadata/metadata.go +++ b/vendor/google.golang.org/grpc/metadata/metadata.go @@ -141,6 +141,8 @@ func NewContext(ctx context.Context, md MD) context.Context { } // FromContext returns the MD in ctx if it exists. +// The returned md should be immutable, writing to it may cause races. +// Modification should be made to the copies of the returned md. func FromContext(ctx context.Context) (md MD, ok bool) { md, ok = ctx.Value(mdKey{}).(MD) return diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go index 6b60095..2619d39 100644 --- a/vendor/google.golang.org/grpc/rpc_util.go +++ b/vendor/google.golang.org/grpc/rpc_util.go @@ -42,11 +42,13 @@ import ( "io/ioutil" "math" "os" + "time" "github.com/golang/protobuf/proto" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/stats" "google.golang.org/grpc/transport" ) @@ -255,9 +257,11 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro // encode serializes msg and prepends the message header. If msg is nil, it // generates the message header of 0 message length. -func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte, error) { - var b []byte - var length uint +func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayload *stats.OutPayload) ([]byte, error) { + var ( + b []byte + length uint + ) if msg != nil { var err error // TODO(zhaoq): optimize to reduce memory alloc and copying. @@ -265,6 +269,12 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte if err != nil { return nil, err } + if outPayload != nil { + outPayload.Payload = msg + // TODO truncate large payload. + outPayload.Data = b + outPayload.Length = len(b) + } if cp != nil { if err := cp.Do(cbuf, b); err != nil { return nil, err @@ -295,6 +305,10 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte // Copy encoded msg to buf copy(buf[5:], b) + if outPayload != nil { + outPayload.WireLength = len(buf) + } + return buf, nil } @@ -311,11 +325,14 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er return nil } -func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int) error { +func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error { pf, d, err := p.recvMsg(maxMsgSize) if err != nil { return err } + if inPayload != nil { + inPayload.WireLength = len(d) + } if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil { return err } @@ -333,6 +350,13 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{ if err := c.Unmarshal(d, m); err != nil { return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) } + if inPayload != nil { + inPayload.RecvTime = time.Now() + inPayload.Payload = m + // TODO truncate large payload. + inPayload.Data = d + inPayload.Length = len(d) + } return nil } @@ -448,10 +472,48 @@ func convertCode(err error) codes.Code { return codes.Unknown } -// SupportPackageIsVersion3 is referenced from generated protocol buffer files +// MethodConfig defines the configuration recommended by the service providers for a +// particular method. +// This is EXPERIMENTAL and subject to change. +type MethodConfig struct { + // WaitForReady indicates whether RPCs sent to this method should wait until + // the connection is ready by default (!failfast). The value specified via the + // gRPC client API will override the value set here. + WaitForReady bool + // Timeout is the default timeout for RPCs sent to this method. The actual + // deadline used will be the minimum of the value specified here and the value + // set by the application via the gRPC client API. If either one is not set, + // then the other will be used. If neither is set, then the RPC has no deadline. + Timeout time.Duration + // MaxReqSize is the maximum allowed payload size for an individual request in a + // stream (client->server) in bytes. The size which is measured is the serialized, + // uncompressed payload in bytes. The actual value used is the minumum of the value + // specified here and the value set by the application via the gRPC client API. If + // either one is not set, then the other will be used. If neither is set, then the + // built-in default is used. + // TODO: support this. + MaxReqSize uint64 + // MaxRespSize is the maximum allowed payload size for an individual response in a + // stream (server->client) in bytes. + // TODO: support this. + MaxRespSize uint64 +} + +// ServiceConfig is provided by the service provider and contains parameters for how +// clients that connect to the service should behave. +// This is EXPERIMENTAL and subject to change. +type ServiceConfig struct { + // LB is the load balancer the service providers recommends. The balancer specified + // via grpc.WithBalancer will override this. + LB Balancer + // Methods contains a map for the methods in this service. + Methods map[string]MethodConfig +} + +// SupportPackageIsVersion4 is referenced from generated protocol buffer files // to assert that that code is compatible with this version of the grpc package. // // This constant may be renamed in the future if a change in the generated code // requires a synchronised update of grpc-go and protoc-gen-go. This constant // should not be referenced from any other code. -const SupportPackageIsVersion3 = true +const SupportPackageIsVersion4 = true diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index e0bb187..b52a563 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -54,6 +54,8 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/stats" + "google.golang.org/grpc/tap" "google.golang.org/grpc/transport" ) @@ -110,6 +112,7 @@ type options struct { maxMsgSize int unaryInt UnaryServerInterceptor streamInt StreamServerInterceptor + inTapHandle tap.ServerInHandle maxConcurrentStreams uint32 useHandlerImpl bool // use http.Handler-based server } @@ -186,6 +189,17 @@ func StreamInterceptor(i StreamServerInterceptor) ServerOption { } } +// InTapHandle returns a ServerOption that sets the tap handle for all the server +// transport to be created. Only one can be installed. +func InTapHandle(h tap.ServerInHandle) ServerOption { + return func(o *options) { + if o.inTapHandle != nil { + panic("The tap handle has been set.") + } + o.inTapHandle = h + } +} + // NewServer creates a gRPC server which has no service registered and has not // started to accept requests yet. func NewServer(opt ...ServerOption) *Server { @@ -329,6 +343,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti // read gRPC requests and then call the registered handlers to reply to them. // Serve returns when lis.Accept fails with fatal errors. lis will be closed when // this method returns. +// Serve always returns non-nil error. func (s *Server) Serve(lis net.Listener) error { s.mu.Lock() s.printf("serving") @@ -412,17 +427,22 @@ func (s *Server) handleRawConn(rawConn net.Conn) { if s.opts.useHandlerImpl { s.serveUsingHandler(conn) } else { - s.serveNewHTTP2Transport(conn, authInfo) + s.serveHTTP2Transport(conn, authInfo) } } -// serveNewHTTP2Transport sets up a new http/2 transport (using the +// serveHTTP2Transport sets up a http/2 transport (using the // gRPC http2 server transport in transport/http2_server.go) and // serves streams on it. // This is run in its own goroutine (it does network I/O in // transport.NewServerTransport). -func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) { - st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo) +func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) { + config := &transport.ServerConfig{ + MaxStreams: s.opts.maxConcurrentStreams, + AuthInfo: authInfo, + InTapHandle: s.opts.inTapHandle, + } + st, err := transport.NewServerTransport("http2", c, config) if err != nil { s.mu.Lock() s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err) @@ -448,6 +468,12 @@ func (s *Server) serveStreams(st transport.ServerTransport) { defer wg.Done() s.handleStream(st, stream, s.traceInfo(st, stream)) }() + }, func(ctx context.Context, method string) context.Context { + if !EnableTracing { + return ctx + } + tr := trace.New("grpc.Recv."+methodFamily(method), method) + return trace.NewContext(ctx, tr) }) wg.Wait() } @@ -497,15 +523,17 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled. // If tracing is not enabled, it returns nil. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) { - if !EnableTracing { + tr, ok := trace.FromContext(stream.Context()) + if !ok { return nil } + trInfo = &traceInfo{ - tr: trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()), + tr: tr, } trInfo.firstLine.client = false trInfo.firstLine.remoteAddr = st.RemoteAddr() - stream.TraceContext(trInfo.tr) + if dl, ok := stream.Context().Deadline(); ok { trInfo.firstLine.deadline = dl.Sub(time.Now()) } @@ -532,11 +560,17 @@ func (s *Server) removeConn(c io.Closer) { } func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error { - var cbuf *bytes.Buffer + var ( + cbuf *bytes.Buffer + outPayload *stats.OutPayload + ) if cp != nil { cbuf = new(bytes.Buffer) } - p, err := encode(s.opts.codec, msg, cp, cbuf) + if stats.On() { + outPayload = &stats.OutPayload{} + } + p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload) if err != nil { // This typically indicates a fatal issue (e.g., memory // corruption or hardware faults) the application program @@ -547,10 +581,32 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str // the optimal option. grpclog.Fatalf("grpc: Server failed to encode response %v", err) } - return t.Write(stream, p, opts) + err = t.Write(stream, p, opts) + if err == nil && outPayload != nil { + outPayload.SentTime = time.Now() + stats.HandleRPC(stream.Context(), outPayload) + } + return err } func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { + if stats.On() { + begin := &stats.Begin{ + BeginTime: time.Now(), + } + stats.HandleRPC(stream.Context(), begin) + } + defer func() { + if stats.On() { + end := &stats.End{ + EndTime: time.Now(), + } + if err != nil && err != io.EOF { + end.Error = toRPCErr(err) + } + stats.HandleRPC(stream.Context(), end) + } + }() if trInfo != nil { defer trInfo.tr.Finish() trInfo.firstLine.client = false @@ -579,14 +635,14 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if err != nil { switch err := err.(type) { case *rpcError: - if err := t.WriteStatus(stream, err.code, err.desc); err != nil { - grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err) + if e := t.WriteStatus(stream, err.code, err.desc); e != nil { + grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e) } case transport.ConnectionError: // Nothing to do here. case transport.StreamError: - if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil { - grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err) + if e := t.WriteStatus(stream, err.Code, err.Desc); e != nil { + grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e) } default: panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err)) @@ -597,20 +653,29 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil { switch err := err.(type) { case *rpcError: - if err := t.WriteStatus(stream, err.code, err.desc); err != nil { - grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err) + if e := t.WriteStatus(stream, err.code, err.desc); e != nil { + grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e) } + return err default: - if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil { - grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err) + if e := t.WriteStatus(stream, codes.Internal, err.Error()); e != nil { + grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e) } - + // TODO checkRecvPayload always return RPC error. Add a return here if necessary. + } + } + var inPayload *stats.InPayload + if stats.On() { + inPayload = &stats.InPayload{ + RecvTime: time.Now(), } - return err } statusCode := codes.OK statusDesc := "" df := func(v interface{}) error { + if inPayload != nil { + inPayload.WireLength = len(req) + } if pf == compressionMade { var err error req, err = s.opts.dc.Do(bytes.NewReader(req)) @@ -618,7 +683,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil { grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err) } - return err + return Errorf(codes.Internal, err.Error()) } } if len(req) > s.opts.maxMsgSize { @@ -630,6 +695,12 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if err := s.opts.codec.Unmarshal(req, v); err != nil { return err } + if inPayload != nil { + inPayload.Payload = v + inPayload.Data = req + inPayload.Length = len(req) + stats.HandleRPC(stream.Context(), inPayload) + } if trInfo != nil { trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) } @@ -650,9 +721,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil { grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err) - return err } - return nil + return Errorf(statusCode, statusDesc) } if trInfo != nil { trInfo.tr.LazyLog(stringer("OK"), false) @@ -677,11 +747,32 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if trInfo != nil { trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true) } - return t.WriteStatus(stream, statusCode, statusDesc) + errWrite := t.WriteStatus(stream, statusCode, statusDesc) + if statusCode != codes.OK { + return Errorf(statusCode, statusDesc) + } + return errWrite } } func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { + if stats.On() { + begin := &stats.Begin{ + BeginTime: time.Now(), + } + stats.HandleRPC(stream.Context(), begin) + } + defer func() { + if stats.On() { + end := &stats.End{ + EndTime: time.Now(), + } + if err != nil && err != io.EOF { + end.Error = toRPCErr(err) + } + stats.HandleRPC(stream.Context(), end) + } + }() if s.opts.cp != nil { stream.SetSendCompress(s.opts.cp.Type()) } @@ -744,7 +835,11 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp } ss.mu.Unlock() } - return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc) + errWrite := t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc) + if ss.statusCode != codes.OK { + return Errorf(ss.statusCode, ss.statusDesc) + } + return errWrite } @@ -759,7 +854,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true) trInfo.tr.SetError() } - if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil { + errDesc := fmt.Sprintf("malformed method name: %q", stream.Method()) + if err := t.WriteStatus(stream, codes.InvalidArgument, errDesc); err != nil { if trInfo != nil { trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) trInfo.tr.SetError() @@ -779,7 +875,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true) trInfo.tr.SetError() } - if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil { + errDesc := fmt.Sprintf("unknown service %v", service) + if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil { if trInfo != nil { trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) trInfo.tr.SetError() @@ -804,7 +901,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true) trInfo.tr.SetError() } - if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil { + errDesc := fmt.Sprintf("unknown method %v", method) + if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil { if trInfo != nil { trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) trInfo.tr.SetError() diff --git a/vendor/google.golang.org/grpc/stats/handlers.go b/vendor/google.golang.org/grpc/stats/handlers.go new file mode 100644 index 0000000..ce47786 --- /dev/null +++ b/vendor/google.golang.org/grpc/stats/handlers.go @@ -0,0 +1,146 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +package stats + +import ( + "net" + "sync/atomic" + + "golang.org/x/net/context" + "google.golang.org/grpc/grpclog" +) + +// ConnTagInfo defines the relevant information needed by connection context tagger. +type ConnTagInfo struct { + // RemoteAddr is the remote address of the corresponding connection. + RemoteAddr net.Addr + // LocalAddr is the local address of the corresponding connection. + LocalAddr net.Addr + // TODO add QOS related fields. +} + +// RPCTagInfo defines the relevant information needed by RPC context tagger. +type RPCTagInfo struct { + // FullMethodName is the RPC method in the format of /package.service/method. + FullMethodName string +} + +var ( + on = new(int32) + rpcHandler func(context.Context, RPCStats) + connHandler func(context.Context, ConnStats) + connTagger func(context.Context, *ConnTagInfo) context.Context + rpcTagger func(context.Context, *RPCTagInfo) context.Context +) + +// HandleRPC processes the RPC stats using the rpc handler registered by the user. +func HandleRPC(ctx context.Context, s RPCStats) { + if rpcHandler == nil { + return + } + rpcHandler(ctx, s) +} + +// RegisterRPCHandler registers the user handler function for RPC stats processing. +// It should be called only once. The later call will overwrite the former value if it is called multiple times. +// This handler function will be called to process the rpc stats. +func RegisterRPCHandler(f func(context.Context, RPCStats)) { + rpcHandler = f +} + +// HandleConn processes the stats using the call back function registered by user. +func HandleConn(ctx context.Context, s ConnStats) { + if connHandler == nil { + return + } + connHandler(ctx, s) +} + +// RegisterConnHandler registers the user handler function for conn stats. +// It should be called only once. The later call will overwrite the former value if it is called multiple times. +// This handler function will be called to process the conn stats. +func RegisterConnHandler(f func(context.Context, ConnStats)) { + connHandler = f +} + +// TagConn calls user registered connection context tagger. +func TagConn(ctx context.Context, info *ConnTagInfo) context.Context { + if connTagger == nil { + return ctx + } + return connTagger(ctx, info) +} + +// RegisterConnTagger registers the user connection context tagger function. +// The connection context tagger can attach some information to the given context. +// The returned context will be used for stats handling. +// For conn stats handling, the context used in connHandler for this +// connection will be derived from the context returned. +// For RPC stats handling, +// - On server side, the context used in rpcHandler for all RPCs on this +// connection will be derived from the context returned. +// - On client side, the context is not derived from the context returned. +func RegisterConnTagger(t func(context.Context, *ConnTagInfo) context.Context) { + connTagger = t +} + +// TagRPC calls the user registered RPC context tagger. +func TagRPC(ctx context.Context, info *RPCTagInfo) context.Context { + if rpcTagger == nil { + return ctx + } + return rpcTagger(ctx, info) +} + +// RegisterRPCTagger registers the user RPC context tagger function. +// The RPC context tagger can attach some information to the given context. +// The context used in stats rpcHandler for this RPC will be derived from the +// context returned. +func RegisterRPCTagger(t func(context.Context, *RPCTagInfo) context.Context) { + rpcTagger = t +} + +// Start starts the stats collection and processing if there is a registered stats handle. +func Start() { + if rpcHandler == nil && connHandler == nil { + grpclog.Println("rpcHandler and connHandler are both nil when starting stats. Stats is not started") + return + } + atomic.StoreInt32(on, 1) +} + +// On indicates whether the stats collection and processing is on. +func On() bool { + return atomic.CompareAndSwapInt32(on, 1, 1) +} diff --git a/vendor/google.golang.org/grpc/stats/stats.go b/vendor/google.golang.org/grpc/stats/stats.go new file mode 100644 index 0000000..a82448a --- /dev/null +++ b/vendor/google.golang.org/grpc/stats/stats.go @@ -0,0 +1,223 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +// Package stats is for collecting and reporting various network and RPC stats. +// This package is for monitoring purpose only. All fields are read-only. +// All APIs are experimental. +package stats // import "google.golang.org/grpc/stats" + +import ( + "net" + "time" +) + +// RPCStats contains stats information about RPCs. +type RPCStats interface { + isRPCStats() + // IsClient returns true if this RPCStats is from client side. + IsClient() bool +} + +// Begin contains stats when an RPC begins. +// FailFast are only valid if Client is true. +type Begin struct { + // Client is true if this Begin is from client side. + Client bool + // BeginTime is the time when the RPC begins. + BeginTime time.Time + // FailFast indicates if this RPC is failfast. + FailFast bool +} + +// IsClient indicates if this is from client side. +func (s *Begin) IsClient() bool { return s.Client } + +func (s *Begin) isRPCStats() {} + +// InPayload contains the information for an incoming payload. +type InPayload struct { + // Client is true if this InPayload is from client side. + Client bool + // Payload is the payload with original type. + Payload interface{} + // Data is the serialized message payload. + Data []byte + // Length is the length of uncompressed data. + Length int + // WireLength is the length of data on wire (compressed, signed, encrypted). + WireLength int + // RecvTime is the time when the payload is received. + RecvTime time.Time +} + +// IsClient indicates if this is from client side. +func (s *InPayload) IsClient() bool { return s.Client } + +func (s *InPayload) isRPCStats() {} + +// InHeader contains stats when a header is received. +// FullMethod, addresses and Compression are only valid if Client is false. +type InHeader struct { + // Client is true if this InHeader is from client side. + Client bool + // WireLength is the wire length of header. + WireLength int + + // FullMethod is the full RPC method string, i.e., /package.service/method. + FullMethod string + // RemoteAddr is the remote address of the corresponding connection. + RemoteAddr net.Addr + // LocalAddr is the local address of the corresponding connection. + LocalAddr net.Addr + // Compression is the compression algorithm used for the RPC. + Compression string +} + +// IsClient indicates if this is from client side. +func (s *InHeader) IsClient() bool { return s.Client } + +func (s *InHeader) isRPCStats() {} + +// InTrailer contains stats when a trailer is received. +type InTrailer struct { + // Client is true if this InTrailer is from client side. + Client bool + // WireLength is the wire length of trailer. + WireLength int +} + +// IsClient indicates if this is from client side. +func (s *InTrailer) IsClient() bool { return s.Client } + +func (s *InTrailer) isRPCStats() {} + +// OutPayload contains the information for an outgoing payload. +type OutPayload struct { + // Client is true if this OutPayload is from client side. + Client bool + // Payload is the payload with original type. + Payload interface{} + // Data is the serialized message payload. + Data []byte + // Length is the length of uncompressed data. + Length int + // WireLength is the length of data on wire (compressed, signed, encrypted). + WireLength int + // SentTime is the time when the payload is sent. + SentTime time.Time +} + +// IsClient indicates if this is from client side. +func (s *OutPayload) IsClient() bool { return s.Client } + +func (s *OutPayload) isRPCStats() {} + +// OutHeader contains stats when a header is sent. +// FullMethod, addresses and Compression are only valid if Client is true. +type OutHeader struct { + // Client is true if this OutHeader is from client side. + Client bool + // WireLength is the wire length of header. + WireLength int + + // FullMethod is the full RPC method string, i.e., /package.service/method. + FullMethod string + // RemoteAddr is the remote address of the corresponding connection. + RemoteAddr net.Addr + // LocalAddr is the local address of the corresponding connection. + LocalAddr net.Addr + // Compression is the compression algorithm used for the RPC. + Compression string +} + +// IsClient indicates if this is from client side. +func (s *OutHeader) IsClient() bool { return s.Client } + +func (s *OutHeader) isRPCStats() {} + +// OutTrailer contains stats when a trailer is sent. +type OutTrailer struct { + // Client is true if this OutTrailer is from client side. + Client bool + // WireLength is the wire length of trailer. + WireLength int +} + +// IsClient indicates if this is from client side. +func (s *OutTrailer) IsClient() bool { return s.Client } + +func (s *OutTrailer) isRPCStats() {} + +// End contains stats when an RPC ends. +type End struct { + // Client is true if this End is from client side. + Client bool + // EndTime is the time when the RPC ends. + EndTime time.Time + // Error is the error just happened. Its type is gRPC error. + Error error +} + +// IsClient indicates if this is from client side. +func (s *End) IsClient() bool { return s.Client } + +func (s *End) isRPCStats() {} + +// ConnStats contains stats information about connections. +type ConnStats interface { + isConnStats() + // IsClient returns true if this ConnStats is from client side. + IsClient() bool +} + +// ConnBegin contains the stats of a connection when it is established. +type ConnBegin struct { + // Client is true if this ConnBegin is from client side. + Client bool +} + +// IsClient indicates if this is from client side. +func (s *ConnBegin) IsClient() bool { return s.Client } + +func (s *ConnBegin) isConnStats() {} + +// ConnEnd contains the stats of a connection when it ends. +type ConnEnd struct { + // Client is true if this ConnEnd is from client side. + Client bool +} + +// IsClient indicates if this is from client side. +func (s *ConnEnd) IsClient() bool { return s.Client } + +func (s *ConnEnd) isConnStats() {} diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index 4681054..d3a4deb 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -45,6 +45,7 @@ import ( "golang.org/x/net/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/stats" "google.golang.org/grpc/transport" ) @@ -97,7 +98,7 @@ type ClientStream interface { // NewClientStream creates a new Stream for the client side. This is called // by generated code. -func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { +func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { if cc.dopts.streamInt != nil { return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...) } @@ -106,11 +107,18 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { var ( - t transport.ClientTransport - s *transport.Stream - put func() + t transport.ClientTransport + s *transport.Stream + put func() + cancel context.CancelFunc ) c := defaultCallInfo + if mc, ok := cc.getMethodConfig(method); ok { + c.failFast = !mc.WaitForReady + if mc.Timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, mc.Timeout) + } + } for _, o := range opts { if err := o.before(&c); err != nil { return nil, toRPCErr(err) @@ -143,6 +151,25 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } }() } + if stats.On() { + ctx = stats.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method}) + begin := &stats.Begin{ + Client: true, + BeginTime: time.Now(), + FailFast: c.failFast, + } + stats.HandleRPC(ctx, begin) + } + defer func() { + if err != nil && stats.On() { + // Only handle end stats if err != nil. + end := &stats.End{ + Client: true, + Error: err, + } + stats.HandleRPC(ctx, end) + } + }() gopts := BalancerGetOptions{ BlockingWait: !c.failFast, } @@ -180,12 +207,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth break } cs := &clientStream{ - opts: opts, - c: c, - desc: desc, - codec: cc.dopts.codec, - cp: cc.dopts.cp, - dc: cc.dopts.dc, + opts: opts, + c: c, + desc: desc, + codec: cc.dopts.codec, + cp: cc.dopts.cp, + dc: cc.dopts.dc, + cancel: cancel, put: put, t: t, @@ -194,6 +222,8 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth tracing: EnableTracing, trInfo: trInfo, + + statsCtx: ctx, } if cc.dopts.cp != nil { cs.cbuf = new(bytes.Buffer) @@ -227,16 +257,17 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth // clientStream implements a client side Stream. type clientStream struct { - opts []CallOption - c callInfo - t transport.ClientTransport - s *transport.Stream - p *parser - desc *StreamDesc - codec Codec - cp Compressor - cbuf *bytes.Buffer - dc Decompressor + opts []CallOption + c callInfo + t transport.ClientTransport + s *transport.Stream + p *parser + desc *StreamDesc + codec Codec + cp Compressor + cbuf *bytes.Buffer + dc Decompressor + cancel context.CancelFunc tracing bool // set to EnableTracing when the clientStream is created. @@ -246,6 +277,11 @@ type clientStream struct { // trInfo.tr is set when the clientStream is created (if EnableTracing is true), // and is set to nil when the clientStream's finish method is called. trInfo traceInfo + + // statsCtx keeps the user context for stats handling. + // All stats collection should use the statsCtx (instead of the stream context) + // so that all the generated stats for a particular RPC can be associated in the processing phase. + statsCtx context.Context } func (cs *clientStream) Context() context.Context { @@ -274,6 +310,8 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } cs.mu.Unlock() } + // TODO Investigate how to signal the stats handling party. + // generate error stats if err != nil && err != io.EOF? defer func() { if err != nil { cs.finish(err) @@ -296,7 +334,13 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } err = toRPCErr(err) }() - out, err := encode(cs.codec, m, cs.cp, cs.cbuf) + var outPayload *stats.OutPayload + if stats.On() { + outPayload = &stats.OutPayload{ + Client: true, + } + } + out, err := encode(cs.codec, m, cs.cp, cs.cbuf, outPayload) defer func() { if cs.cbuf != nil { cs.cbuf.Reset() @@ -305,11 +349,37 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if err != nil { return Errorf(codes.Internal, "grpc: %v", err) } - return cs.t.Write(cs.s, out, &transport.Options{Last: false}) + err = cs.t.Write(cs.s, out, &transport.Options{Last: false}) + if err == nil && outPayload != nil { + outPayload.SentTime = time.Now() + stats.HandleRPC(cs.statsCtx, outPayload) + } + return err } func (cs *clientStream) RecvMsg(m interface{}) (err error) { - err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32) + defer func() { + if err != nil && stats.On() { + // Only generate End if err != nil. + // If err == nil, it's not the last RecvMsg. + // The last RecvMsg gets either an RPC error or io.EOF. + end := &stats.End{ + Client: true, + EndTime: time.Now(), + } + if err != io.EOF { + end.Error = toRPCErr(err) + } + stats.HandleRPC(cs.statsCtx, end) + } + }() + var inPayload *stats.InPayload + if stats.On() { + inPayload = &stats.InPayload{ + Client: true, + } + } + err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inPayload) defer func() { // err != nil indicates the termination of the stream. if err != nil { @@ -324,11 +394,15 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { } cs.mu.Unlock() } + if inPayload != nil { + stats.HandleRPC(cs.statsCtx, inPayload) + } if !cs.desc.ClientStreams || cs.desc.ServerStreams { return } // Special handling for client streaming rpc. - err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32) + // This recv expects EOF or errors, so we don't collect inPayload. + err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, nil) cs.closeTransportStream(err) if err == nil { return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) @@ -384,6 +458,11 @@ func (cs *clientStream) closeTransportStream(err error) { } func (cs *clientStream) finish(err error) { + defer func() { + if cs.cancel != nil { + cs.cancel() + } + }() cs.mu.Lock() defer cs.mu.Unlock() for _, o := range cs.opts { @@ -482,7 +561,11 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { ss.mu.Unlock() } }() - out, err := encode(ss.codec, m, ss.cp, ss.cbuf) + var outPayload *stats.OutPayload + if stats.On() { + outPayload = &stats.OutPayload{} + } + out, err := encode(ss.codec, m, ss.cp, ss.cbuf, outPayload) defer func() { if ss.cbuf != nil { ss.cbuf.Reset() @@ -495,6 +578,10 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil { return toRPCErr(err) } + if outPayload != nil { + outPayload.SentTime = time.Now() + stats.HandleRPC(ss.s.Context(), outPayload) + } return nil } @@ -513,7 +600,11 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { ss.mu.Unlock() } }() - if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize); err != nil { + var inPayload *stats.InPayload + if stats.On() { + inPayload = &stats.InPayload{} + } + if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, inPayload); err != nil { if err == io.EOF { return err } @@ -522,5 +613,8 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { } return toRPCErr(err) } + if inPayload != nil { + stats.HandleRPC(ss.s.Context(), inPayload) + } return nil } diff --git a/vendor/google.golang.org/grpc/tap/tap.go b/vendor/google.golang.org/grpc/tap/tap.go new file mode 100644 index 0000000..0f36647 --- /dev/null +++ b/vendor/google.golang.org/grpc/tap/tap.go @@ -0,0 +1,54 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +// Package tap defines the function handles which are executed on the transport +// layer of gRPC-Go and related information. Everything here is EXPERIMENTAL. +package tap + +import ( + "golang.org/x/net/context" +) + +// Info defines the relevant information needed by the handles. +type Info struct { + // FullMethodName is the string of grpc method (in the format of + // /package.service/method). + FullMethodName string + // TODO: More to be added. +} + +// ServerInHandle defines the function which runs when a new stream is created +// on the server side. Note that it is executed in the per-connection I/O goroutine(s) instead +// of per-RPC goroutine. Therefore, users should NOT have any blocking/time-consuming +// work in this handle. Otherwise all the RPCs would slow down. +type ServerInHandle func(ctx context.Context, info *Info) (context.Context, error) diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/control.go index 4ef0830..2586cba 100644 --- a/vendor/google.golang.org/grpc/transport/control.go +++ b/vendor/google.golang.org/grpc/transport/control.go @@ -111,35 +111,9 @@ func newQuotaPool(q int) *quotaPool { return qb } -// add adds n to the available quota and tries to send it on acquire. -func (qb *quotaPool) add(n int) { - qb.mu.Lock() - defer qb.mu.Unlock() - qb.quota += n - if qb.quota <= 0 { - return - } - select { - case qb.c <- qb.quota: - qb.quota = 0 - default: - } -} - -// cancel cancels the pending quota sent on acquire, if any. -func (qb *quotaPool) cancel() { - qb.mu.Lock() - defer qb.mu.Unlock() - select { - case n := <-qb.c: - qb.quota += n - default: - } -} - -// reset cancels the pending quota sent on acquired, incremented by v and sends +// add cancels the pending quota sent on acquired, incremented by v and sends // it back on acquire. -func (qb *quotaPool) reset(v int) { +func (qb *quotaPool) add(v int) { qb.mu.Lock() defer qb.mu.Unlock() select { @@ -151,6 +125,10 @@ func (qb *quotaPool) reset(v int) { if qb.quota <= 0 { return } + // After the pool has been created, this is the only place that sends on + // the channel. Since mu is held at this point and any quota that was sent + // on the channel has been retrieved, we know that this code will always + // place any positive quota value on the channel. select { case qb.c <- qb.quota: qb.quota = 0 diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/transport/handler_server.go index 114e349..10b6dc0 100644 --- a/vendor/google.golang.org/grpc/transport/handler_server.go +++ b/vendor/google.golang.org/grpc/transport/handler_server.go @@ -268,7 +268,7 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error { }) } -func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) { +func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) { // With this transport type there will be exactly 1 stream: this HTTP request. var ctx context.Context diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go index 2b0f680..605b1e5 100644 --- a/vendor/google.golang.org/grpc/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/transport/http2_client.go @@ -51,16 +51,20 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" + "google.golang.org/grpc/stats" ) // http2Client implements the ClientTransport interface with HTTP2. type http2Client struct { - target string // server name/addr - userAgent string - md interface{} - conn net.Conn // underlying communication channel - authInfo credentials.AuthInfo // auth info about the connection - nextID uint32 // the next stream ID to be used + ctx context.Context + target string // server name/addr + userAgent string + md interface{} + conn net.Conn // underlying communication channel + remoteAddr net.Addr + localAddr net.Addr + authInfo credentials.AuthInfo // auth info about the connection + nextID uint32 // the next stream ID to be used // writableChan synchronizes write access to the transport. // A writer acquires the write lock by sending a value on writableChan @@ -150,6 +154,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( scheme := "http" conn, err := dial(ctx, opts.Dialer, addr.Addr) if err != nil { + if opts.FailOnNonTempDialError { + return nil, connectionErrorf(isTemporary(err), err, "transport: %v", err) + } return nil, connectionErrorf(true, err, "transport: %v", err) } // Any further errors will close the underlying connection @@ -175,11 +182,14 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( } var buf bytes.Buffer t := &http2Client{ - target: addr.Addr, - userAgent: ua, - md: addr.Metadata, - conn: conn, - authInfo: authInfo, + ctx: ctx, + target: addr.Addr, + userAgent: ua, + md: addr.Metadata, + conn: conn, + remoteAddr: conn.RemoteAddr(), + localAddr: conn.LocalAddr(), + authInfo: authInfo, // The client initiated stream id is odd starting from 1. nextID: 1, writableChan: make(chan int, 1), @@ -199,6 +209,16 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( maxStreams: math.MaxInt32, streamSendQuota: defaultWindowSize, } + if stats.On() { + t.ctx = stats.TagConn(t.ctx, &stats.ConnTagInfo{ + RemoteAddr: t.remoteAddr, + LocalAddr: t.localAddr, + }) + connBegin := &stats.ConnBegin{ + Client: true, + } + stats.HandleConn(t.ctx, connBegin) + } // Start the reader goroutine for incoming message. Each transport has // a dedicated goroutine which reads HTTP2 frame from network. Then it // dispatches the frame to the corresponding stream entity. @@ -270,12 +290,13 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { // streams. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { pr := &peer.Peer{ - Addr: t.conn.RemoteAddr(), + Addr: t.remoteAddr, } // Attach Auth info if there is any. if t.authInfo != nil { pr.AuthInfo = t.authInfo } + userCtx := ctx ctx = peer.NewContext(ctx, pr) authData := make(map[string]string) for _, c := range t.creds { @@ -347,6 +368,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea return nil, ErrConnClosing } s := t.newStream(ctx, callHdr) + s.clientStatsCtx = userCtx t.activeStreams[s.id] = s // This stream is not counted when applySetings(...) initialize t.streamsQuota. @@ -357,7 +379,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea } t.mu.Unlock() if reset { - t.streamsQuota.reset(-1) + t.streamsQuota.add(-1) } // HPACK encodes various headers. Note that once WriteField(...) is @@ -413,6 +435,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea } } first := true + bufLen := t.hBuf.Len() // Sends the headers in a single batch even when they span multiple frames. for !endHeaders { size := t.hBuf.Len() @@ -447,6 +470,17 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea return nil, connectionErrorf(true, err, "transport: %v", err) } } + if stats.On() { + outHeader := &stats.OutHeader{ + Client: true, + WireLength: bufLen, + FullMethod: callHdr.Method, + RemoteAddr: t.remoteAddr, + LocalAddr: t.localAddr, + Compression: callHdr.SendCompress, + } + stats.HandleRPC(s.clientStatsCtx, outHeader) + } t.writableChan <- 0 return s, nil } @@ -525,6 +559,12 @@ func (t *http2Client) Close() (err error) { s.mu.Unlock() s.write(recvMsg{err: ErrConnClosing}) } + if stats.On() { + connEnd := &stats.ConnEnd{ + Client: true, + } + stats.HandleConn(t.ctx, connEnd) + } return } @@ -582,19 +622,14 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { var p []byte if r.Len() > 0 { size := http2MaxFrameLen - s.sendQuotaPool.add(0) // Wait until the stream has some quota to send the data. sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire()) if err != nil { return err } - t.sendQuotaPool.add(0) // Wait until the transport has some quota to send the data. tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire()) if err != nil { - if _, ok := err.(StreamError); ok || err == io.EOF { - t.sendQuotaPool.cancel() - } return err } if sq < size { @@ -874,6 +909,24 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } endStream := frame.StreamEnded() + var isHeader bool + defer func() { + if stats.On() { + if isHeader { + inHeader := &stats.InHeader{ + Client: true, + WireLength: int(frame.Header().Length), + } + stats.HandleRPC(s.clientStatsCtx, inHeader) + } else { + inTrailer := &stats.InTrailer{ + Client: true, + WireLength: int(frame.Header().Length), + } + stats.HandleRPC(s.clientStatsCtx, inTrailer) + } + } + }() s.mu.Lock() if !endStream { @@ -885,6 +938,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } close(s.headerChan) s.headerDone = true + isHeader = true } if !endStream || s.state == streamDone { s.mu.Unlock() @@ -994,13 +1048,13 @@ func (t *http2Client) applySettings(ss []http2.Setting) { t.maxStreams = int(s.Val) t.mu.Unlock() if reset { - t.streamsQuota.reset(int(s.Val) - ms) + t.streamsQuota.add(int(s.Val) - ms) } case http2.SettingInitialWindowSize: t.mu.Lock() for _, stream := range t.activeStreams { // Adjust the sending quota for each stream. - stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota)) + stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota)) } t.streamSendQuota = s.Val t.mu.Unlock() diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go index a62fb7c..316188e 100644 --- a/vendor/google.golang.org/grpc/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/transport/http2_server.go @@ -50,6 +50,8 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" + "google.golang.org/grpc/stats" + "google.golang.org/grpc/tap" ) // ErrIllegalHeaderWrite indicates that setting header is illegal because of @@ -58,9 +60,13 @@ var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHe // http2Server implements the ServerTransport interface with HTTP2. type http2Server struct { + ctx context.Context conn net.Conn + remoteAddr net.Addr + localAddr net.Addr maxStreamID uint32 // max stream ID ever seen authInfo credentials.AuthInfo // auth info about the connection + inTapHandle tap.ServerInHandle // writableChan synchronizes write access to the transport. // A writer acquires the write lock by receiving a value on writableChan // and releases it by sending on writableChan. @@ -91,12 +97,13 @@ type http2Server struct { // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is // returned if something goes wrong. -func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) { +func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) { framer := newFramer(conn) // Send initial settings as connection preface to client. var settings []http2.Setting // TODO(zhaoq): Have a better way to signal "no limit" because 0 is // permitted in the HTTP2 spec. + maxStreams := config.MaxStreams if maxStreams == 0 { maxStreams = math.MaxUint32 } else { @@ -121,12 +128,16 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI } var buf bytes.Buffer t := &http2Server{ + ctx: context.Background(), conn: conn, - authInfo: authInfo, + remoteAddr: conn.RemoteAddr(), + localAddr: conn.LocalAddr(), + authInfo: config.AuthInfo, framer: framer, hBuf: &buf, hEnc: hpack.NewEncoder(&buf), maxStreams: maxStreams, + inTapHandle: config.InTapHandle, controlBuf: newRecvBuffer(), fc: &inFlow{limit: initialConnWindowSize}, sendQuotaPool: newQuotaPool(defaultWindowSize), @@ -136,13 +147,21 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI activeStreams: make(map[uint32]*Stream), streamSendQuota: defaultWindowSize, } + if stats.On() { + t.ctx = stats.TagConn(t.ctx, &stats.ConnTagInfo{ + RemoteAddr: t.remoteAddr, + LocalAddr: t.localAddr, + }) + connBegin := &stats.ConnBegin{} + stats.HandleConn(t.ctx, connBegin) + } go t.controller() t.writableChan <- 0 return t, nil } // operateHeader takes action on the decoded headers. -func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) { +func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) { buf := newRecvBuffer() s := &Stream{ id: frame.Header().StreamID, @@ -168,12 +187,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( } s.recvCompress = state.encoding if state.timeoutSet { - s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout) + s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout) } else { - s.ctx, s.cancel = context.WithCancel(context.TODO()) + s.ctx, s.cancel = context.WithCancel(t.ctx) } pr := &peer.Peer{ - Addr: t.conn.RemoteAddr(), + Addr: t.remoteAddr, } // Attach Auth info if there is any. if t.authInfo != nil { @@ -195,6 +214,18 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( } s.recvCompress = state.encoding s.method = state.method + if t.inTapHandle != nil { + var err error + info := &tap.Info{ + FullMethodName: state.method, + } + s.ctx, err = t.inTapHandle(s.ctx, info) + if err != nil { + // TODO: Log the real error. + t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream}) + return + } + } t.mu.Lock() if t.state != reachable { t.mu.Unlock() @@ -218,13 +249,26 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( s.windowHandler = func(n int) { t.updateWindow(s, uint32(n)) } + s.ctx = traceCtx(s.ctx, s.method) + if stats.On() { + s.ctx = stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) + inHeader := &stats.InHeader{ + FullMethod: s.method, + RemoteAddr: t.remoteAddr, + LocalAddr: t.localAddr, + Compression: s.recvCompress, + WireLength: int(frame.Header().Length), + } + stats.HandleRPC(s.ctx, inHeader) + } handle(s) return } // HandleStreams receives incoming streams using the given handler. This is // typically run in a separate goroutine. -func (t *http2Server) HandleStreams(handle func(*Stream)) { +// traceCtx attaches trace to ctx and returns the new context. +func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { // Check the validity of client preface. preface := make([]byte, len(clientPreface)) if _, err := io.ReadFull(t.conn, preface); err != nil { @@ -279,7 +323,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) { } switch frame := frame.(type) { case *http2.MetaHeadersFrame: - if t.operateHeaders(frame, handle) { + if t.operateHeaders(frame, handle, traceCtx) { t.Close() break } @@ -492,9 +536,16 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry}) } } + bufLen := t.hBuf.Len() if err := t.writeHeaders(s, t.hBuf, false); err != nil { return err } + if stats.On() { + outHeader := &stats.OutHeader{ + WireLength: bufLen, + } + stats.HandleRPC(s.Context(), outHeader) + } t.writableChan <- 0 return nil } @@ -547,10 +598,17 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry}) } } + bufLen := t.hBuf.Len() if err := t.writeHeaders(s, t.hBuf, true); err != nil { t.Close() return err } + if stats.On() { + outTrailer := &stats.OutTrailer{ + WireLength: bufLen, + } + stats.HandleRPC(s.Context(), outTrailer) + } t.closeStream(s) t.writableChan <- 0 return nil @@ -579,19 +637,14 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error { return nil } size := http2MaxFrameLen - s.sendQuotaPool.add(0) // Wait until the stream has some quota to send the data. sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire()) if err != nil { return err } - t.sendQuotaPool.add(0) // Wait until the transport has some quota to send the data. tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire()) if err != nil { - if _, ok := err.(StreamError); ok { - t.sendQuotaPool.cancel() - } return err } if sq < size { @@ -659,7 +712,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) { t.mu.Lock() defer t.mu.Unlock() for _, stream := range t.activeStreams { - stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota)) + stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota)) } t.streamSendQuota = s.Val } @@ -736,6 +789,10 @@ func (t *http2Server) Close() (err error) { for _, s := range streams { s.cancel() } + if stats.On() { + connEnd := &stats.ConnEnd{} + stats.HandleConn(t.ctx, connEnd) + } return } @@ -767,7 +824,7 @@ func (t *http2Server) closeStream(s *Stream) { } func (t *http2Server) RemoteAddr() net.Addr { - return t.conn.RemoteAddr() + return t.remoteAddr } func (t *http2Server) Drain() { diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go index 413f749..4726bb2 100644 --- a/vendor/google.golang.org/grpc/transport/transport.go +++ b/vendor/google.golang.org/grpc/transport/transport.go @@ -45,10 +45,10 @@ import ( "sync" "golang.org/x/net/context" - "golang.org/x/net/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/tap" ) // recvMsg represents the received msg from the transport. All transport @@ -167,6 +167,11 @@ type Stream struct { id uint32 // nil for client side Stream. st ServerTransport + // clientStatsCtx keeps the user context for stats handling. + // It's only valid on client side. Server side stats context is same as s.ctx. + // All client side stats collection should use the clientStatsCtx (instead of the stream context) + // so that all the generated stats for a particular RPC can be associated in the processing phase. + clientStatsCtx context.Context // ctx is the associated context of the stream. ctx context.Context // cancel is always nil for client side Stream. @@ -266,11 +271,6 @@ func (s *Stream) Context() context.Context { return s.ctx } -// TraceContext recreates the context of s with a trace.Trace. -func (s *Stream) TraceContext(tr trace.Trace) { - s.ctx = trace.NewContext(s.ctx, tr) -} - // Method returns the method for the stream. func (s *Stream) Method() string { return s.method @@ -355,10 +355,17 @@ const ( draining ) +// ServerConfig consists of all the configurations to establish a server transport. +type ServerConfig struct { + MaxStreams uint32 + AuthInfo credentials.AuthInfo + InTapHandle tap.ServerInHandle +} + // NewServerTransport creates a ServerTransport with conn or non-nil error // if it fails. -func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (ServerTransport, error) { - return newHTTP2Server(conn, maxStreams, authInfo) +func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) { + return newHTTP2Server(conn, config) } // ConnectOptions covers all relevant options for communicating with the server. @@ -367,6 +374,8 @@ type ConnectOptions struct { UserAgent string // Dialer specifies how to dial a network address. Dialer func(context.Context, string) (net.Conn, error) + // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors. + FailOnNonTempDialError bool // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs. PerRPCCredentials []credentials.PerRPCCredentials // TransportCredentials stores the Authenticator required to setup a client connection. @@ -466,7 +475,7 @@ type ClientTransport interface { // Write methods for a given Stream will be called serially. type ServerTransport interface { // HandleStreams receives incoming streams using the given handler. - HandleStreams(func(*Stream)) + HandleStreams(func(*Stream), func(context.Context, string) context.Context) // WriteHeader sends the header metadata for the given stream. // WriteHeader may not be called on all streams. |