aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc
diff options
context:
space:
mode:
authorNiall Sheridan <nsheridan@gmail.com>2016-12-28 21:18:36 +0000
committerNiall Sheridan <nsheridan@gmail.com>2016-12-28 21:18:36 +0000
commit73ef85bc5db590c22689e11be20737a3dd88168f (patch)
treefe393a6f0776bca1889b2113ab341a2922e25d10 /vendor/google.golang.org/grpc
parent9e573e571fe878ed32947cae5a6d43cb5d72d3bb (diff)
Update dependencies
Diffstat (limited to 'vendor/google.golang.org/grpc')
-rw-r--r--vendor/google.golang.org/grpc/README.md25
-rw-r--r--vendor/google.golang.org/grpc/call.go69
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go86
-rwxr-xr-xvendor/google.golang.org/grpc/codegen.sh2
-rwxr-xr-xvendor/google.golang.org/grpc/coverage.sh3
-rw-r--r--vendor/google.golang.org/grpc/credentials/oauth/oauth.go2
-rw-r--r--vendor/google.golang.org/grpc/metadata/metadata.go2
-rw-r--r--vendor/google.golang.org/grpc/rpc_util.go74
-rw-r--r--vendor/google.golang.org/grpc/server.go154
-rw-r--r--vendor/google.golang.org/grpc/stats/handlers.go146
-rw-r--r--vendor/google.golang.org/grpc/stats/stats.go223
-rw-r--r--vendor/google.golang.org/grpc/stream.go146
-rw-r--r--vendor/google.golang.org/grpc/tap/tap.go54
-rw-r--r--vendor/google.golang.org/grpc/transport/control.go34
-rw-r--r--vendor/google.golang.org/grpc/transport/handler_server.go2
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_client.go94
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_server.go87
-rw-r--r--vendor/google.golang.org/grpc/transport/transport.go27
18 files changed, 1077 insertions, 153 deletions
diff --git a/vendor/google.golang.org/grpc/README.md b/vendor/google.golang.org/grpc/README.md
index 110a8cf..39120c2 100644
--- a/vendor/google.golang.org/grpc/README.md
+++ b/vendor/google.golang.org/grpc/README.md
@@ -18,6 +18,22 @@ Prerequisites
This requires Go 1.5 or later.
+A note on the version used: significant performance improvements in benchmarks
+of grpc-go have been seen by upgrading the go version from 1.5 to the latest
+1.7.1.
+
+From https://golang.org/doc/install, one way to install the latest version of go is:
+```
+$ GO_VERSION=1.7.1
+$ OS=linux
+$ ARCH=amd64
+$ curl -O https://storage.googleapis.com/golang/go${GO_VERSION}.${OS}-${ARCH}.tar.gz
+$ sudo tar -C /usr/local -xzf go$GO_VERSION.$OS-$ARCH.tar.gz
+$ # Put go on the PATH, keep the usual installation dir
+$ sudo ln -s /usr/local/go/bin/go /usr/bin/go
+$ rm go$GO_VERSION.$OS-$ARCH.tar.gz
+```
+
Constraints
-----------
The grpc package should only depend on standard Go packages and a small number of exceptions. If your contribution introduces new dependencies which are NOT in the [list](http://godoc.org/google.golang.org/grpc?imports), you need a discussion with gRPC-Go authors and consultants.
@@ -30,3 +46,12 @@ Status
------
GA
+FAQ
+---
+
+#### Compiling error, undefined: grpc.SupportPackageIsVersion
+
+Please update proto package, gRPC package and rebuild the proto files:
+ - `go get -u github.com/golang/protobuf/{proto,protoc-gen-go}`
+ - `go get -u google.golang.org/grpc`
+ - `protoc --go_out=plugins=grpc:. *.proto`
diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go
index 772c817..4d8023d 100644
--- a/vendor/google.golang.org/grpc/call.go
+++ b/vendor/google.golang.org/grpc/call.go
@@ -42,6 +42,7 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@@ -49,7 +50,8 @@ import (
// On error, it returns the error and indicates whether the call should be retried.
//
// TODO(zhaoq): Check whether the received message sequence is valid.
-func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
+// TODO ctx is used for stats collection and processing. It is the context passed from the application.
+func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
// Try to acquire header metadata from the server if there is any.
defer func() {
if err != nil {
@@ -63,14 +65,25 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s
return
}
p := &parser{r: stream}
+ var inPayload *stats.InPayload
+ if stats.On() {
+ inPayload = &stats.InPayload{
+ Client: true,
+ }
+ }
for {
- if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32); err != nil {
+ if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inPayload); err != nil {
if err == io.EOF {
break
}
return
}
}
+ if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK {
+ // TODO in the current implementation, inTrailer may be handled before inPayload in some cases.
+ // Fix the order if necessary.
+ stats.HandleRPC(ctx, inPayload)
+ }
c.trailerMD = stream.Trailer()
return nil
}
@@ -89,15 +102,27 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
}
}
}()
- var cbuf *bytes.Buffer
+ var (
+ cbuf *bytes.Buffer
+ outPayload *stats.OutPayload
+ )
if compressor != nil {
cbuf = new(bytes.Buffer)
}
- outBuf, err := encode(codec, args, compressor, cbuf)
+ if stats.On() {
+ outPayload = &stats.OutPayload{
+ Client: true,
+ }
+ }
+ outBuf, err := encode(codec, args, compressor, cbuf, outPayload)
if err != nil {
return nil, Errorf(codes.Internal, "grpc: %v", err)
}
err = t.Write(stream, outBuf, opts)
+ if err == nil && outPayload != nil {
+ outPayload.SentTime = time.Now()
+ stats.HandleRPC(ctx, outPayload)
+ }
// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
// recvResponse to get the final status.
@@ -118,8 +143,16 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
return invoke(ctx, method, args, reply, cc, opts...)
}
-func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
+func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
c := defaultCallInfo
+ if mc, ok := cc.getMethodConfig(method); ok {
+ c.failFast = !mc.WaitForReady
+ if mc.Timeout > 0 {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
+ defer cancel()
+ }
+ }
for _, o := range opts {
if err := o.before(&c); err != nil {
return toRPCErr(err)
@@ -140,12 +173,31 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)
// TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set.
defer func() {
- if err != nil {
- c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+ if e != nil {
+ c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true)
c.traceInfo.tr.SetError()
}
}()
}
+ if stats.On() {
+ ctx = stats.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
+ begin := &stats.Begin{
+ Client: true,
+ BeginTime: time.Now(),
+ FailFast: c.failFast,
+ }
+ stats.HandleRPC(ctx, begin)
+ }
+ defer func() {
+ if stats.On() {
+ end := &stats.End{
+ Client: true,
+ EndTime: time.Now(),
+ Error: e,
+ }
+ stats.HandleRPC(ctx, end)
+ }
+ }()
topts := &transport.Options{
Last: true,
Delay: false,
@@ -167,6 +219,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
}
+
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
}
@@ -205,7 +258,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
return toRPCErr(err)
}
- err = recvResponse(cc.dopts, t, &c, stream, reply)
+ err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
if err != nil {
if put != nil {
put()
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 6167472..aa6b63d 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -54,6 +54,8 @@ var (
ErrClientConnClosing = errors.New("grpc: the client connection is closing")
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
+ // DEPRECATED: Please use context.DeadlineExceeded instead. This error will be
+ // removed in Q1 2017.
ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
// errNoTransportSecurity indicates that there is no transport security
@@ -93,6 +95,7 @@ type dialOptions struct {
block bool
insecure bool
timeout time.Duration
+ scChan <-chan ServiceConfig
copts transport.ConnectOptions
}
@@ -129,6 +132,13 @@ func WithBalancer(b Balancer) DialOption {
}
}
+// WithServiceConfig returns a DialOption which has a channel to read the service configuration.
+func WithServiceConfig(c <-chan ServiceConfig) DialOption {
+ return func(o *dialOptions) {
+ o.scChan = c
+ }
+}
+
// WithBackoffMaxDelay configures the dialer to use the provided maximum delay
// when backing off after failed connection attempts.
func WithBackoffMaxDelay(md time.Duration) DialOption {
@@ -199,6 +209,8 @@ func WithTimeout(d time.Duration) DialOption {
}
// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
+// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
+// Temporary() method to decide if it should try to reconnect to the network address.
func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
return func(o *dialOptions) {
o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
@@ -210,6 +222,17 @@ func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
}
}
+// FailOnNonTempDialError returns a DialOption that specified if gRPC fails on non-temporary dial errors.
+// If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
+// address and won't try to reconnect.
+// The default value of FailOnNonTempDialError is false.
+// This is an EXPERIMENTAL API.
+func FailOnNonTempDialError(f bool) DialOption {
+ return func(o *dialOptions) {
+ o.copts.FailOnNonTempDialError = f
+ }
+}
+
// WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
func WithUserAgent(s string) DialOption {
return func(o *dialOptions) {
@@ -247,6 +270,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
conns: make(map[Address]*addrConn),
}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
+ for _, opt := range opts {
+ opt(&cc.dopts)
+ }
+ if cc.dopts.timeout > 0 {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
+ defer cancel()
+ }
+
defer func() {
select {
case <-ctx.Done():
@@ -259,10 +291,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}()
- for _, opt := range opts {
- opt(&cc.dopts)
+ if cc.dopts.scChan != nil {
+ // Wait for the initial service config.
+ select {
+ case sc, ok := <-cc.dopts.scChan:
+ if ok {
+ cc.sc = sc
+ }
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
}
-
// Set defaults.
if cc.dopts.codec == nil {
cc.dopts.codec = protoCodec{}
@@ -284,6 +323,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
waitC := make(chan error, 1)
go func() {
var addrs []Address
+ if cc.dopts.balancer == nil && cc.sc.LB != nil {
+ cc.dopts.balancer = cc.sc.LB
+ }
if cc.dopts.balancer == nil {
// Connect to target directly if balancer is nil.
addrs = append(addrs, Address{Addr: target})
@@ -319,10 +361,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
close(waitC)
}()
- var timeoutCh <-chan time.Time
- if cc.dopts.timeout > 0 {
- timeoutCh = time.After(cc.dopts.timeout)
- }
select {
case <-ctx.Done():
return nil, ctx.Err()
@@ -330,14 +368,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if err != nil {
return nil, err
}
- case <-timeoutCh:
- return nil, ErrClientConnTimeout
}
+
// If balancer is nil or balancer.Notify() is nil, ok will be false here.
// The lbWatcher goroutine will not be created.
if ok {
go cc.lbWatcher()
}
+
+ if cc.dopts.scChan != nil {
+ go cc.scWatcher()
+ }
return cc, nil
}
@@ -384,6 +425,7 @@ type ClientConn struct {
dopts dialOptions
mu sync.RWMutex
+ sc ServiceConfig
conns map[Address]*addrConn
}
@@ -422,6 +464,24 @@ func (cc *ClientConn) lbWatcher() {
}
}
+func (cc *ClientConn) scWatcher() {
+ for {
+ select {
+ case sc, ok := <-cc.dopts.scChan:
+ if !ok {
+ return
+ }
+ cc.mu.Lock()
+ // TODO: load balance policy runtime change is ignored.
+ // We may revist this decision in the future.
+ cc.sc = sc
+ cc.mu.Unlock()
+ case <-cc.ctx.Done():
+ return
+ }
+ }
+}
+
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
@@ -509,6 +569,14 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err
return nil
}
+// TODO: Avoid the locking here.
+func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) {
+ cc.mu.RLock()
+ defer cc.mu.RUnlock()
+ m, ok = cc.sc.Methods[method]
+ return
+}
+
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
var (
ac *addrConn
diff --git a/vendor/google.golang.org/grpc/codegen.sh b/vendor/google.golang.org/grpc/codegen.sh
index b009488..4cdc6ba 100755
--- a/vendor/google.golang.org/grpc/codegen.sh
+++ b/vendor/google.golang.org/grpc/codegen.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
# This script serves as an example to demonstrate how to generate the gRPC-Go
# interface and the related messages from .proto file.
diff --git a/vendor/google.golang.org/grpc/coverage.sh b/vendor/google.golang.org/grpc/coverage.sh
index 1202353..b85f918 100755
--- a/vendor/google.golang.org/grpc/coverage.sh
+++ b/vendor/google.golang.org/grpc/coverage.sh
@@ -1,4 +1,5 @@
-#!/bin/bash
+#!/usr/bin/env bash
+
set -e
diff --git a/vendor/google.golang.org/grpc/credentials/oauth/oauth.go b/vendor/google.golang.org/grpc/credentials/oauth/oauth.go
index 8e68c4d..25393cc 100644
--- a/vendor/google.golang.org/grpc/credentials/oauth/oauth.go
+++ b/vendor/google.golang.org/grpc/credentials/oauth/oauth.go
@@ -61,7 +61,7 @@ func (ts TokenSource) GetRequestMetadata(ctx context.Context, uri ...string) (ma
}, nil
}
-// RequireTransportSecurity indicates whether the credentails requires transport security.
+// RequireTransportSecurity indicates whether the credentials requires transport security.
func (ts TokenSource) RequireTransportSecurity() bool {
return true
}
diff --git a/vendor/google.golang.org/grpc/metadata/metadata.go b/vendor/google.golang.org/grpc/metadata/metadata.go
index 3c0ca7a..65dc5af 100644
--- a/vendor/google.golang.org/grpc/metadata/metadata.go
+++ b/vendor/google.golang.org/grpc/metadata/metadata.go
@@ -141,6 +141,8 @@ func NewContext(ctx context.Context, md MD) context.Context {
}
// FromContext returns the MD in ctx if it exists.
+// The returned md should be immutable, writing to it may cause races.
+// Modification should be made to the copies of the returned md.
func FromContext(ctx context.Context) (md MD, ok bool) {
md, ok = ctx.Value(mdKey{}).(MD)
return
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
index 6b60095..2619d39 100644
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -42,11 +42,13 @@ import (
"io/ioutil"
"math"
"os"
+ "time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@@ -255,9 +257,11 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro
// encode serializes msg and prepends the message header. If msg is nil, it
// generates the message header of 0 message length.
-func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte, error) {
- var b []byte
- var length uint
+func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayload *stats.OutPayload) ([]byte, error) {
+ var (
+ b []byte
+ length uint
+ )
if msg != nil {
var err error
// TODO(zhaoq): optimize to reduce memory alloc and copying.
@@ -265,6 +269,12 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte
if err != nil {
return nil, err
}
+ if outPayload != nil {
+ outPayload.Payload = msg
+ // TODO truncate large payload.
+ outPayload.Data = b
+ outPayload.Length = len(b)
+ }
if cp != nil {
if err := cp.Do(cbuf, b); err != nil {
return nil, err
@@ -295,6 +305,10 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte
// Copy encoded msg to buf
copy(buf[5:], b)
+ if outPayload != nil {
+ outPayload.WireLength = len(buf)
+ }
+
return buf, nil
}
@@ -311,11 +325,14 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er
return nil
}
-func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int) error {
+func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error {
pf, d, err := p.recvMsg(maxMsgSize)
if err != nil {
return err
}
+ if inPayload != nil {
+ inPayload.WireLength = len(d)
+ }
if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
return err
}
@@ -333,6 +350,13 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
if err := c.Unmarshal(d, m); err != nil {
return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
}
+ if inPayload != nil {
+ inPayload.RecvTime = time.Now()
+ inPayload.Payload = m
+ // TODO truncate large payload.
+ inPayload.Data = d
+ inPayload.Length = len(d)
+ }
return nil
}
@@ -448,10 +472,48 @@ func convertCode(err error) codes.Code {
return codes.Unknown
}
-// SupportPackageIsVersion3 is referenced from generated protocol buffer files
+// MethodConfig defines the configuration recommended by the service providers for a
+// particular method.
+// This is EXPERIMENTAL and subject to change.
+type MethodConfig struct {
+ // WaitForReady indicates whether RPCs sent to this method should wait until
+ // the connection is ready by default (!failfast). The value specified via the
+ // gRPC client API will override the value set here.
+ WaitForReady bool
+ // Timeout is the default timeout for RPCs sent to this method. The actual
+ // deadline used will be the minimum of the value specified here and the value
+ // set by the application via the gRPC client API. If either one is not set,
+ // then the other will be used. If neither is set, then the RPC has no deadline.
+ Timeout time.Duration
+ // MaxReqSize is the maximum allowed payload size for an individual request in a
+ // stream (client->server) in bytes. The size which is measured is the serialized,
+ // uncompressed payload in bytes. The actual value used is the minumum of the value
+ // specified here and the value set by the application via the gRPC client API. If
+ // either one is not set, then the other will be used. If neither is set, then the
+ // built-in default is used.
+ // TODO: support this.
+ MaxReqSize uint64
+ // MaxRespSize is the maximum allowed payload size for an individual response in a
+ // stream (server->client) in bytes.
+ // TODO: support this.
+ MaxRespSize uint64
+}
+
+// ServiceConfig is provided by the service provider and contains parameters for how
+// clients that connect to the service should behave.
+// This is EXPERIMENTAL and subject to change.
+type ServiceConfig struct {
+ // LB is the load balancer the service providers recommends. The balancer specified
+ // via grpc.WithBalancer will override this.
+ LB Balancer
+ // Methods contains a map for the methods in this service.
+ Methods map[string]MethodConfig
+}
+
+// SupportPackageIsVersion4 is referenced from generated protocol buffer files
// to assert that that code is compatible with this version of the grpc package.
//
// This constant may be renamed in the future if a change in the generated code
// requires a synchronised update of grpc-go and protoc-gen-go. This constant
// should not be referenced from any other code.
-const SupportPackageIsVersion3 = true
+const SupportPackageIsVersion4 = true
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index e0bb187..b52a563 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -54,6 +54,8 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/stats"
+ "google.golang.org/grpc/tap"
"google.golang.org/grpc/transport"
)
@@ -110,6 +112,7 @@ type options struct {
maxMsgSize int
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
+ inTapHandle tap.ServerInHandle
maxConcurrentStreams uint32
useHandlerImpl bool // use http.Handler-based server
}
@@ -186,6 +189,17 @@ func StreamInterceptor(i StreamServerInterceptor) ServerOption {
}
}
+// InTapHandle returns a ServerOption that sets the tap handle for all the server
+// transport to be created. Only one can be installed.
+func InTapHandle(h tap.ServerInHandle) ServerOption {
+ return func(o *options) {
+ if o.inTapHandle != nil {
+ panic("The tap handle has been set.")
+ }
+ o.inTapHandle = h
+ }
+}
+
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
@@ -329,6 +343,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
+// Serve always returns non-nil error.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("serving")
@@ -412,17 +427,22 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
if s.opts.useHandlerImpl {
s.serveUsingHandler(conn)
} else {
- s.serveNewHTTP2Transport(conn, authInfo)
+ s.serveHTTP2Transport(conn, authInfo)
}
}
-// serveNewHTTP2Transport sets up a new http/2 transport (using the
+// serveHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go) and
// serves streams on it.
// This is run in its own goroutine (it does network I/O in
// transport.NewServerTransport).
-func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
- st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
+func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
+ config := &transport.ServerConfig{
+ MaxStreams: s.opts.maxConcurrentStreams,
+ AuthInfo: authInfo,
+ InTapHandle: s.opts.inTapHandle,
+ }
+ st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
s.mu.Lock()
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
@@ -448,6 +468,12 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
+ }, func(ctx context.Context, method string) context.Context {
+ if !EnableTracing {
+ return ctx
+ }
+ tr := trace.New("grpc.Recv."+methodFamily(method), method)
+ return trace.NewContext(ctx, tr)
})
wg.Wait()
}
@@ -497,15 +523,17 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
- if !EnableTracing {
+ tr, ok := trace.FromContext(stream.Context())
+ if !ok {
return nil
}
+
trInfo = &traceInfo{
- tr: trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()),
+ tr: tr,
}
trInfo.firstLine.client = false
trInfo.firstLine.remoteAddr = st.RemoteAddr()
- stream.TraceContext(trInfo.tr)
+
if dl, ok := stream.Context().Deadline(); ok {
trInfo.firstLine.deadline = dl.Sub(time.Now())
}
@@ -532,11 +560,17 @@ func (s *Server) removeConn(c io.Closer) {
}
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
- var cbuf *bytes.Buffer
+ var (
+ cbuf *bytes.Buffer
+ outPayload *stats.OutPayload
+ )
if cp != nil {
cbuf = new(bytes.Buffer)
}
- p, err := encode(s.opts.codec, msg, cp, cbuf)
+ if stats.On() {
+ outPayload = &stats.OutPayload{}
+ }
+ p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
if err != nil {
// This typically indicates a fatal issue (e.g., memory
// corruption or hardware faults) the application program
@@ -547,10 +581,32 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
// the optimal option.
grpclog.Fatalf("grpc: Server failed to encode response %v", err)
}
- return t.Write(stream, p, opts)
+ err = t.Write(stream, p, opts)
+ if err == nil && outPayload != nil {
+ outPayload.SentTime = time.Now()
+ stats.HandleRPC(stream.Context(), outPayload)
+ }
+ return err
}
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
+ if stats.On() {
+ begin := &stats.Begin{
+ BeginTime: time.Now(),
+ }
+ stats.HandleRPC(stream.Context(), begin)
+ }
+ defer func() {
+ if stats.On() {
+ end := &stats.End{
+ EndTime: time.Now(),
+ }
+ if err != nil && err != io.EOF {
+ end.Error = toRPCErr(err)
+ }
+ stats.HandleRPC(stream.Context(), end)
+ }
+ }()
if trInfo != nil {
defer trInfo.tr.Finish()
trInfo.firstLine.client = false
@@ -579,14 +635,14 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err != nil {
switch err := err.(type) {
case *rpcError:
- if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
+ if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
case transport.ConnectionError:
// Nothing to do here.
case transport.StreamError:
- if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
+ if e := t.WriteStatus(stream, err.Code, err.Desc); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
default:
panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
@@ -597,20 +653,29 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
switch err := err.(type) {
case *rpcError:
- if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
+ if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
+ return err
default:
- if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
+ if e := t.WriteStatus(stream, codes.Internal, err.Error()); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
-
+ // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
+ }
+ }
+ var inPayload *stats.InPayload
+ if stats.On() {
+ inPayload = &stats.InPayload{
+ RecvTime: time.Now(),
}
- return err
}
statusCode := codes.OK
statusDesc := ""
df := func(v interface{}) error {
+ if inPayload != nil {
+ inPayload.WireLength = len(req)
+ }
if pf == compressionMade {
var err error
req, err = s.opts.dc.Do(bytes.NewReader(req))
@@ -618,7 +683,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
}
- return err
+ return Errorf(codes.Internal, err.Error())
}
}
if len(req) > s.opts.maxMsgSize {
@@ -630,6 +695,12 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := s.opts.codec.Unmarshal(req, v); err != nil {
return err
}
+ if inPayload != nil {
+ inPayload.Payload = v
+ inPayload.Data = req
+ inPayload.Length = len(req)
+ stats.HandleRPC(stream.Context(), inPayload)
+ }
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
}
@@ -650,9 +721,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
- return err
}
- return nil
+ return Errorf(statusCode, statusDesc)
}
if trInfo != nil {
trInfo.tr.LazyLog(stringer("OK"), false)
@@ -677,11 +747,32 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
}
- return t.WriteStatus(stream, statusCode, statusDesc)
+ errWrite := t.WriteStatus(stream, statusCode, statusDesc)
+ if statusCode != codes.OK {
+ return Errorf(statusCode, statusDesc)
+ }
+ return errWrite
}
}
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
+ if stats.On() {
+ begin := &stats.Begin{
+ BeginTime: time.Now(),
+ }
+ stats.HandleRPC(stream.Context(), begin)
+ }
+ defer func() {
+ if stats.On() {
+ end := &stats.End{
+ EndTime: time.Now(),
+ }
+ if err != nil && err != io.EOF {
+ end.Error = toRPCErr(err)
+ }
+ stats.HandleRPC(stream.Context(), end)
+ }
+ }()
if s.opts.cp != nil {
stream.SetSendCompress(s.opts.cp.Type())
}
@@ -744,7 +835,11 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
}
ss.mu.Unlock()
}
- return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
+ errWrite := t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
+ if ss.statusCode != codes.OK {
+ return Errorf(ss.statusCode, ss.statusDesc)
+ }
+ return errWrite
}
@@ -759,7 +854,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
trInfo.tr.SetError()
}
- if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil {
+ errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
+ if err := t.WriteStatus(stream, codes.InvalidArgument, errDesc); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
@@ -779,7 +875,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
trInfo.tr.SetError()
}
- if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil {
+ errDesc := fmt.Sprintf("unknown service %v", service)
+ if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
@@ -804,7 +901,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
trInfo.tr.SetError()
}
- if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {
+ errDesc := fmt.Sprintf("unknown method %v", method)
+ if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
diff --git a/vendor/google.golang.org/grpc/stats/handlers.go b/vendor/google.golang.org/grpc/stats/handlers.go
new file mode 100644
index 0000000..ce47786
--- /dev/null
+++ b/vendor/google.golang.org/grpc/stats/handlers.go
@@ -0,0 +1,146 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package stats
+
+import (
+ "net"
+ "sync/atomic"
+
+ "golang.org/x/net/context"
+ "google.golang.org/grpc/grpclog"
+)
+
+// ConnTagInfo defines the relevant information needed by connection context tagger.
+type ConnTagInfo struct {
+ // RemoteAddr is the remote address of the corresponding connection.
+ RemoteAddr net.Addr
+ // LocalAddr is the local address of the corresponding connection.
+ LocalAddr net.Addr
+ // TODO add QOS related fields.
+}
+
+// RPCTagInfo defines the relevant information needed by RPC context tagger.
+type RPCTagInfo struct {
+ // FullMethodName is the RPC method in the format of /package.service/method.
+ FullMethodName string
+}
+
+var (
+ on = new(int32)
+ rpcHandler func(context.Context, RPCStats)
+ connHandler func(context.Context, ConnStats)
+ connTagger func(context.Context, *ConnTagInfo) context.Context
+ rpcTagger func(context.Context, *RPCTagInfo) context.Context
+)
+
+// HandleRPC processes the RPC stats using the rpc handler registered by the user.
+func HandleRPC(ctx context.Context, s RPCStats) {
+ if rpcHandler == nil {
+ return
+ }
+ rpcHandler(ctx, s)
+}
+
+// RegisterRPCHandler registers the user handler function for RPC stats processing.
+// It should be called only once. The later call will overwrite the former value if it is called multiple times.
+// This handler function will be called to process the rpc stats.
+func RegisterRPCHandler(f func(context.Context, RPCStats)) {
+ rpcHandler = f
+}
+
+// HandleConn processes the stats using the call back function registered by user.
+func HandleConn(ctx context.Context, s ConnStats) {
+ if connHandler == nil {
+ return
+ }
+ connHandler(ctx, s)
+}
+
+// RegisterConnHandler registers the user handler function for conn stats.
+// It should be called only once. The later call will overwrite the former value if it is called multiple times.
+// This handler function will be called to process the conn stats.
+func RegisterConnHandler(f func(context.Context, ConnStats)) {
+ connHandler = f
+}
+
+// TagConn calls user registered connection context tagger.
+func TagConn(ctx context.Context, info *ConnTagInfo) context.Context {
+ if connTagger == nil {
+ return ctx
+ }
+ return connTagger(ctx, info)
+}
+
+// RegisterConnTagger registers the user connection context tagger function.
+// The connection context tagger can attach some information to the given context.
+// The returned context will be used for stats handling.
+// For conn stats handling, the context used in connHandler for this
+// connection will be derived from the context returned.
+// For RPC stats handling,
+// - On server side, the context used in rpcHandler for all RPCs on this
+// connection will be derived from the context returned.
+// - On client side, the context is not derived from the context returned.
+func RegisterConnTagger(t func(context.Context, *ConnTagInfo) context.Context) {
+ connTagger = t
+}
+
+// TagRPC calls the user registered RPC context tagger.
+func TagRPC(ctx context.Context, info *RPCTagInfo) context.Context {
+ if rpcTagger == nil {
+ return ctx
+ }
+ return rpcTagger(ctx, info)
+}
+
+// RegisterRPCTagger registers the user RPC context tagger function.
+// The RPC context tagger can attach some information to the given context.
+// The context used in stats rpcHandler for this RPC will be derived from the
+// context returned.
+func RegisterRPCTagger(t func(context.Context, *RPCTagInfo) context.Context) {
+ rpcTagger = t
+}
+
+// Start starts the stats collection and processing if there is a registered stats handle.
+func Start() {
+ if rpcHandler == nil && connHandler == nil {
+ grpclog.Println("rpcHandler and connHandler are both nil when starting stats. Stats is not started")
+ return
+ }
+ atomic.StoreInt32(on, 1)
+}
+
+// On indicates whether the stats collection and processing is on.
+func On() bool {
+ return atomic.CompareAndSwapInt32(on, 1, 1)
+}
diff --git a/vendor/google.golang.org/grpc/stats/stats.go b/vendor/google.golang.org/grpc/stats/stats.go
new file mode 100644
index 0000000..a82448a
--- /dev/null
+++ b/vendor/google.golang.org/grpc/stats/stats.go
@@ -0,0 +1,223 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+// Package stats is for collecting and reporting various network and RPC stats.
+// This package is for monitoring purpose only. All fields are read-only.
+// All APIs are experimental.
+package stats // import "google.golang.org/grpc/stats"
+
+import (
+ "net"
+ "time"
+)
+
+// RPCStats contains stats information about RPCs.
+type RPCStats interface {
+ isRPCStats()
+ // IsClient returns true if this RPCStats is from client side.
+ IsClient() bool
+}
+
+// Begin contains stats when an RPC begins.
+// FailFast are only valid if Client is true.
+type Begin struct {
+ // Client is true if this Begin is from client side.
+ Client bool
+ // BeginTime is the time when the RPC begins.
+ BeginTime time.Time
+ // FailFast indicates if this RPC is failfast.
+ FailFast bool
+}
+
+// IsClient indicates if this is from client side.
+func (s *Begin) IsClient() bool { return s.Client }
+
+func (s *Begin) isRPCStats() {}
+
+// InPayload contains the information for an incoming payload.
+type InPayload struct {
+ // Client is true if this InPayload is from client side.
+ Client bool
+ // Payload is the payload with original type.
+ Payload interface{}
+ // Data is the serialized message payload.
+ Data []byte
+ // Length is the length of uncompressed data.
+ Length int
+ // WireLength is the length of data on wire (compressed, signed, encrypted).
+ WireLength int
+ // RecvTime is the time when the payload is received.
+ RecvTime time.Time
+}
+
+// IsClient indicates if this is from client side.
+func (s *InPayload) IsClient() bool { return s.Client }
+
+func (s *InPayload) isRPCStats() {}
+
+// InHeader contains stats when a header is received.
+// FullMethod, addresses and Compression are only valid if Client is false.
+type InHeader struct {
+ // Client is true if this InHeader is from client side.
+ Client bool
+ // WireLength is the wire length of header.
+ WireLength int
+
+ // FullMethod is the full RPC method string, i.e., /package.service/method.
+ FullMethod string
+ // RemoteAddr is the remote address of the corresponding connection.
+ RemoteAddr net.Addr
+ // LocalAddr is the local address of the corresponding connection.
+ LocalAddr net.Addr
+ // Compression is the compression algorithm used for the RPC.
+ Compression string
+}
+
+// IsClient indicates if this is from client side.
+func (s *InHeader) IsClient() bool { return s.Client }
+
+func (s *InHeader) isRPCStats() {}
+
+// InTrailer contains stats when a trailer is received.
+type InTrailer struct {
+ // Client is true if this InTrailer is from client side.
+ Client bool
+ // WireLength is the wire length of trailer.
+ WireLength int
+}
+
+// IsClient indicates if this is from client side.
+func (s *InTrailer) IsClient() bool { return s.Client }
+
+func (s *InTrailer) isRPCStats() {}
+
+// OutPayload contains the information for an outgoing payload.
+type OutPayload struct {
+ // Client is true if this OutPayload is from client side.
+ Client bool
+ // Payload is the payload with original type.
+ Payload interface{}
+ // Data is the serialized message payload.
+ Data []byte
+ // Length is the length of uncompressed data.
+ Length int
+ // WireLength is the length of data on wire (compressed, signed, encrypted).
+ WireLength int
+ // SentTime is the time when the payload is sent.
+ SentTime time.Time
+}
+
+// IsClient indicates if this is from client side.
+func (s *OutPayload) IsClient() bool { return s.Client }
+
+func (s *OutPayload) isRPCStats() {}
+
+// OutHeader contains stats when a header is sent.
+// FullMethod, addresses and Compression are only valid if Client is true.
+type OutHeader struct {
+ // Client is true if this OutHeader is from client side.
+ Client bool
+ // WireLength is the wire length of header.
+ WireLength int
+
+ // FullMethod is the full RPC method string, i.e., /package.service/method.
+ FullMethod string
+ // RemoteAddr is the remote address of the corresponding connection.
+ RemoteAddr net.Addr
+ // LocalAddr is the local address of the corresponding connection.
+ LocalAddr net.Addr
+ // Compression is the compression algorithm used for the RPC.
+ Compression string
+}
+
+// IsClient indicates if this is from client side.
+func (s *OutHeader) IsClient() bool { return s.Client }
+
+func (s *OutHeader) isRPCStats() {}
+
+// OutTrailer contains stats when a trailer is sent.
+type OutTrailer struct {
+ // Client is true if this OutTrailer is from client side.
+ Client bool
+ // WireLength is the wire length of trailer.
+ WireLength int
+}
+
+// IsClient indicates if this is from client side.
+func (s *OutTrailer) IsClient() bool { return s.Client }
+
+func (s *OutTrailer) isRPCStats() {}
+
+// End contains stats when an RPC ends.
+type End struct {
+ // Client is true if this End is from client side.
+ Client bool
+ // EndTime is the time when the RPC ends.
+ EndTime time.Time
+ // Error is the error just happened. Its type is gRPC error.
+ Error error
+}
+
+// IsClient indicates if this is from client side.
+func (s *End) IsClient() bool { return s.Client }
+
+func (s *End) isRPCStats() {}
+
+// ConnStats contains stats information about connections.
+type ConnStats interface {
+ isConnStats()
+ // IsClient returns true if this ConnStats is from client side.
+ IsClient() bool
+}
+
+// ConnBegin contains the stats of a connection when it is established.
+type ConnBegin struct {
+ // Client is true if this ConnBegin is from client side.
+ Client bool
+}
+
+// IsClient indicates if this is from client side.
+func (s *ConnBegin) IsClient() bool { return s.Client }
+
+func (s *ConnBegin) isConnStats() {}
+
+// ConnEnd contains the stats of a connection when it ends.
+type ConnEnd struct {
+ // Client is true if this ConnEnd is from client side.
+ Client bool
+}
+
+// IsClient indicates if this is from client side.
+func (s *ConnEnd) IsClient() bool { return s.Client }
+
+func (s *ConnEnd) isConnStats() {}
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 4681054..d3a4deb 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -45,6 +45,7 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@@ -97,7 +98,7 @@ type ClientStream interface {
// NewClientStream creates a new Stream for the client side. This is called
// by generated code.
-func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
+func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
if cc.dopts.streamInt != nil {
return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
}
@@ -106,11 +107,18 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
var (
- t transport.ClientTransport
- s *transport.Stream
- put func()
+ t transport.ClientTransport
+ s *transport.Stream
+ put func()
+ cancel context.CancelFunc
)
c := defaultCallInfo
+ if mc, ok := cc.getMethodConfig(method); ok {
+ c.failFast = !mc.WaitForReady
+ if mc.Timeout > 0 {
+ ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
+ }
+ }
for _, o := range opts {
if err := o.before(&c); err != nil {
return nil, toRPCErr(err)
@@ -143,6 +151,25 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
}()
}
+ if stats.On() {
+ ctx = stats.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
+ begin := &stats.Begin{
+ Client: true,
+ BeginTime: time.Now(),
+ FailFast: c.failFast,
+ }
+ stats.HandleRPC(ctx, begin)
+ }
+ defer func() {
+ if err != nil && stats.On() {
+ // Only handle end stats if err != nil.
+ end := &stats.End{
+ Client: true,
+ Error: err,
+ }
+ stats.HandleRPC(ctx, end)
+ }
+ }()
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
}
@@ -180,12 +207,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
break
}
cs := &clientStream{
- opts: opts,
- c: c,
- desc: desc,
- codec: cc.dopts.codec,
- cp: cc.dopts.cp,
- dc: cc.dopts.dc,
+ opts: opts,
+ c: c,
+ desc: desc,
+ codec: cc.dopts.codec,
+ cp: cc.dopts.cp,
+ dc: cc.dopts.dc,
+ cancel: cancel,
put: put,
t: t,
@@ -194,6 +222,8 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
tracing: EnableTracing,
trInfo: trInfo,
+
+ statsCtx: ctx,
}
if cc.dopts.cp != nil {
cs.cbuf = new(bytes.Buffer)
@@ -227,16 +257,17 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
// clientStream implements a client side Stream.
type clientStream struct {
- opts []CallOption
- c callInfo
- t transport.ClientTransport
- s *transport.Stream
- p *parser
- desc *StreamDesc
- codec Codec
- cp Compressor
- cbuf *bytes.Buffer
- dc Decompressor
+ opts []CallOption
+ c callInfo
+ t transport.ClientTransport
+ s *transport.Stream
+ p *parser
+ desc *StreamDesc
+ codec Codec
+ cp Compressor
+ cbuf *bytes.Buffer
+ dc Decompressor
+ cancel context.CancelFunc
tracing bool // set to EnableTracing when the clientStream is created.
@@ -246,6 +277,11 @@ type clientStream struct {
// trInfo.tr is set when the clientStream is created (if EnableTracing is true),
// and is set to nil when the clientStream's finish method is called.
trInfo traceInfo
+
+ // statsCtx keeps the user context for stats handling.
+ // All stats collection should use the statsCtx (instead of the stream context)
+ // so that all the generated stats for a particular RPC can be associated in the processing phase.
+ statsCtx context.Context
}
func (cs *clientStream) Context() context.Context {
@@ -274,6 +310,8 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
cs.mu.Unlock()
}
+ // TODO Investigate how to signal the stats handling party.
+ // generate error stats if err != nil && err != io.EOF?
defer func() {
if err != nil {
cs.finish(err)
@@ -296,7 +334,13 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
err = toRPCErr(err)
}()
- out, err := encode(cs.codec, m, cs.cp, cs.cbuf)
+ var outPayload *stats.OutPayload
+ if stats.On() {
+ outPayload = &stats.OutPayload{
+ Client: true,
+ }
+ }
+ out, err := encode(cs.codec, m, cs.cp, cs.cbuf, outPayload)
defer func() {
if cs.cbuf != nil {
cs.cbuf.Reset()
@@ -305,11 +349,37 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if err != nil {
return Errorf(codes.Internal, "grpc: %v", err)
}
- return cs.t.Write(cs.s, out, &transport.Options{Last: false})
+ err = cs.t.Write(cs.s, out, &transport.Options{Last: false})
+ if err == nil && outPayload != nil {
+ outPayload.SentTime = time.Now()
+ stats.HandleRPC(cs.statsCtx, outPayload)
+ }
+ return err
}
func (cs *clientStream) RecvMsg(m interface{}) (err error) {
- err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
+ defer func() {
+ if err != nil && stats.On() {
+ // Only generate End if err != nil.
+ // If err == nil, it's not the last RecvMsg.
+ // The last RecvMsg gets either an RPC error or io.EOF.
+ end := &stats.End{
+ Client: true,
+ EndTime: time.Now(),
+ }
+ if err != io.EOF {
+ end.Error = toRPCErr(err)
+ }
+ stats.HandleRPC(cs.statsCtx, end)
+ }
+ }()
+ var inPayload *stats.InPayload
+ if stats.On() {
+ inPayload = &stats.InPayload{
+ Client: true,
+ }
+ }
+ err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inPayload)
defer func() {
// err != nil indicates the termination of the stream.
if err != nil {
@@ -324,11 +394,15 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
}
cs.mu.Unlock()
}
+ if inPayload != nil {
+ stats.HandleRPC(cs.statsCtx, inPayload)
+ }
if !cs.desc.ClientStreams || cs.desc.ServerStreams {
return
}
// Special handling for client streaming rpc.
- err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
+ // This recv expects EOF or errors, so we don't collect inPayload.
+ err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, nil)
cs.closeTransportStream(err)
if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
@@ -384,6 +458,11 @@ func (cs *clientStream) closeTransportStream(err error) {
}
func (cs *clientStream) finish(err error) {
+ defer func() {
+ if cs.cancel != nil {
+ cs.cancel()
+ }
+ }()
cs.mu.Lock()
defer cs.mu.Unlock()
for _, o := range cs.opts {
@@ -482,7 +561,11 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
ss.mu.Unlock()
}
}()
- out, err := encode(ss.codec, m, ss.cp, ss.cbuf)
+ var outPayload *stats.OutPayload
+ if stats.On() {
+ outPayload = &stats.OutPayload{}
+ }
+ out, err := encode(ss.codec, m, ss.cp, ss.cbuf, outPayload)
defer func() {
if ss.cbuf != nil {
ss.cbuf.Reset()
@@ -495,6 +578,10 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
+ if outPayload != nil {
+ outPayload.SentTime = time.Now()
+ stats.HandleRPC(ss.s.Context(), outPayload)
+ }
return nil
}
@@ -513,7 +600,11 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
ss.mu.Unlock()
}
}()
- if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize); err != nil {
+ var inPayload *stats.InPayload
+ if stats.On() {
+ inPayload = &stats.InPayload{}
+ }
+ if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, inPayload); err != nil {
if err == io.EOF {
return err
}
@@ -522,5 +613,8 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}
return toRPCErr(err)
}
+ if inPayload != nil {
+ stats.HandleRPC(ss.s.Context(), inPayload)
+ }
return nil
}
diff --git a/vendor/google.golang.org/grpc/tap/tap.go b/vendor/google.golang.org/grpc/tap/tap.go
new file mode 100644
index 0000000..0f36647
--- /dev/null
+++ b/vendor/google.golang.org/grpc/tap/tap.go
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+// Package tap defines the function handles which are executed on the transport
+// layer of gRPC-Go and related information. Everything here is EXPERIMENTAL.
+package tap
+
+import (
+ "golang.org/x/net/context"
+)
+
+// Info defines the relevant information needed by the handles.
+type Info struct {
+ // FullMethodName is the string of grpc method (in the format of
+ // /package.service/method).
+ FullMethodName string
+ // TODO: More to be added.
+}
+
+// ServerInHandle defines the function which runs when a new stream is created
+// on the server side. Note that it is executed in the per-connection I/O goroutine(s) instead
+// of per-RPC goroutine. Therefore, users should NOT have any blocking/time-consuming
+// work in this handle. Otherwise all the RPCs would slow down.
+type ServerInHandle func(ctx context.Context, info *Info) (context.Context, error)
diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/control.go
index 4ef0830..2586cba 100644
--- a/vendor/google.golang.org/grpc/transport/control.go
+++ b/vendor/google.golang.org/grpc/transport/control.go
@@ -111,35 +111,9 @@ func newQuotaPool(q int) *quotaPool {
return qb
}
-// add adds n to the available quota and tries to send it on acquire.
-func (qb *quotaPool) add(n int) {
- qb.mu.Lock()
- defer qb.mu.Unlock()
- qb.quota += n
- if qb.quota <= 0 {
- return
- }
- select {
- case qb.c <- qb.quota:
- qb.quota = 0
- default:
- }
-}
-
-// cancel cancels the pending quota sent on acquire, if any.
-func (qb *quotaPool) cancel() {
- qb.mu.Lock()
- defer qb.mu.Unlock()
- select {
- case n := <-qb.c:
- qb.quota += n
- default:
- }
-}
-
-// reset cancels the pending quota sent on acquired, incremented by v and sends
+// add cancels the pending quota sent on acquired, incremented by v and sends
// it back on acquire.
-func (qb *quotaPool) reset(v int) {
+func (qb *quotaPool) add(v int) {
qb.mu.Lock()
defer qb.mu.Unlock()
select {
@@ -151,6 +125,10 @@ func (qb *quotaPool) reset(v int) {
if qb.quota <= 0 {
return
}
+ // After the pool has been created, this is the only place that sends on
+ // the channel. Since mu is held at this point and any quota that was sent
+ // on the channel has been retrieved, we know that this code will always
+ // place any positive quota value on the channel.
select {
case qb.c <- qb.quota:
qb.quota = 0
diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/transport/handler_server.go
index 114e349..10b6dc0 100644
--- a/vendor/google.golang.org/grpc/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/transport/handler_server.go
@@ -268,7 +268,7 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
})
}
-func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
+func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
// With this transport type there will be exactly 1 stream: this HTTP request.
var ctx context.Context
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go
index 2b0f680..605b1e5 100644
--- a/vendor/google.golang.org/grpc/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/transport/http2_client.go
@@ -51,16 +51,20 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
+ "google.golang.org/grpc/stats"
)
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
- target string // server name/addr
- userAgent string
- md interface{}
- conn net.Conn // underlying communication channel
- authInfo credentials.AuthInfo // auth info about the connection
- nextID uint32 // the next stream ID to be used
+ ctx context.Context
+ target string // server name/addr
+ userAgent string
+ md interface{}
+ conn net.Conn // underlying communication channel
+ remoteAddr net.Addr
+ localAddr net.Addr
+ authInfo credentials.AuthInfo // auth info about the connection
+ nextID uint32 // the next stream ID to be used
// writableChan synchronizes write access to the transport.
// A writer acquires the write lock by sending a value on writableChan
@@ -150,6 +154,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
scheme := "http"
conn, err := dial(ctx, opts.Dialer, addr.Addr)
if err != nil {
+ if opts.FailOnNonTempDialError {
+ return nil, connectionErrorf(isTemporary(err), err, "transport: %v", err)
+ }
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Any further errors will close the underlying connection
@@ -175,11 +182,14 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
}
var buf bytes.Buffer
t := &http2Client{
- target: addr.Addr,
- userAgent: ua,
- md: addr.Metadata,
- conn: conn,
- authInfo: authInfo,
+ ctx: ctx,
+ target: addr.Addr,
+ userAgent: ua,
+ md: addr.Metadata,
+ conn: conn,
+ remoteAddr: conn.RemoteAddr(),
+ localAddr: conn.LocalAddr(),
+ authInfo: authInfo,
// The client initiated stream id is odd starting from 1.
nextID: 1,
writableChan: make(chan int, 1),
@@ -199,6 +209,16 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
maxStreams: math.MaxInt32,
streamSendQuota: defaultWindowSize,
}
+ if stats.On() {
+ t.ctx = stats.TagConn(t.ctx, &stats.ConnTagInfo{
+ RemoteAddr: t.remoteAddr,
+ LocalAddr: t.localAddr,
+ })
+ connBegin := &stats.ConnBegin{
+ Client: true,
+ }
+ stats.HandleConn(t.ctx, connBegin)
+ }
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
// dispatches the frame to the corresponding stream entity.
@@ -270,12 +290,13 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// streams.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
pr := &peer.Peer{
- Addr: t.conn.RemoteAddr(),
+ Addr: t.remoteAddr,
}
// Attach Auth info if there is any.
if t.authInfo != nil {
pr.AuthInfo = t.authInfo
}
+ userCtx := ctx
ctx = peer.NewContext(ctx, pr)
authData := make(map[string]string)
for _, c := range t.creds {
@@ -347,6 +368,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, ErrConnClosing
}
s := t.newStream(ctx, callHdr)
+ s.clientStatsCtx = userCtx
t.activeStreams[s.id] = s
// This stream is not counted when applySetings(...) initialize t.streamsQuota.
@@ -357,7 +379,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
t.mu.Unlock()
if reset {
- t.streamsQuota.reset(-1)
+ t.streamsQuota.add(-1)
}
// HPACK encodes various headers. Note that once WriteField(...) is
@@ -413,6 +435,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
}
first := true
+ bufLen := t.hBuf.Len()
// Sends the headers in a single batch even when they span multiple frames.
for !endHeaders {
size := t.hBuf.Len()
@@ -447,6 +470,17 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
+ if stats.On() {
+ outHeader := &stats.OutHeader{
+ Client: true,
+ WireLength: bufLen,
+ FullMethod: callHdr.Method,
+ RemoteAddr: t.remoteAddr,
+ LocalAddr: t.localAddr,
+ Compression: callHdr.SendCompress,
+ }
+ stats.HandleRPC(s.clientStatsCtx, outHeader)
+ }
t.writableChan <- 0
return s, nil
}
@@ -525,6 +559,12 @@ func (t *http2Client) Close() (err error) {
s.mu.Unlock()
s.write(recvMsg{err: ErrConnClosing})
}
+ if stats.On() {
+ connEnd := &stats.ConnEnd{
+ Client: true,
+ }
+ stats.HandleConn(t.ctx, connEnd)
+ }
return
}
@@ -582,19 +622,14 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
var p []byte
if r.Len() > 0 {
size := http2MaxFrameLen
- s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
- t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
- if _, ok := err.(StreamError); ok || err == io.EOF {
- t.sendQuotaPool.cancel()
- }
return err
}
if sq < size {
@@ -874,6 +909,24 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
endStream := frame.StreamEnded()
+ var isHeader bool
+ defer func() {
+ if stats.On() {
+ if isHeader {
+ inHeader := &stats.InHeader{
+ Client: true,
+ WireLength: int(frame.Header().Length),
+ }
+ stats.HandleRPC(s.clientStatsCtx, inHeader)
+ } else {
+ inTrailer := &stats.InTrailer{
+ Client: true,
+ WireLength: int(frame.Header().Length),
+ }
+ stats.HandleRPC(s.clientStatsCtx, inTrailer)
+ }
+ }
+ }()
s.mu.Lock()
if !endStream {
@@ -885,6 +938,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
close(s.headerChan)
s.headerDone = true
+ isHeader = true
}
if !endStream || s.state == streamDone {
s.mu.Unlock()
@@ -994,13 +1048,13 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
t.maxStreams = int(s.Val)
t.mu.Unlock()
if reset {
- t.streamsQuota.reset(int(s.Val) - ms)
+ t.streamsQuota.add(int(s.Val) - ms)
}
case http2.SettingInitialWindowSize:
t.mu.Lock()
for _, stream := range t.activeStreams {
// Adjust the sending quota for each stream.
- stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
+ stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota))
}
t.streamSendQuota = s.Val
t.mu.Unlock()
diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go
index a62fb7c..316188e 100644
--- a/vendor/google.golang.org/grpc/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/transport/http2_server.go
@@ -50,6 +50,8 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
+ "google.golang.org/grpc/stats"
+ "google.golang.org/grpc/tap"
)
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
@@ -58,9 +60,13 @@ var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHe
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
+ ctx context.Context
conn net.Conn
+ remoteAddr net.Addr
+ localAddr net.Addr
maxStreamID uint32 // max stream ID ever seen
authInfo credentials.AuthInfo // auth info about the connection
+ inTapHandle tap.ServerInHandle
// writableChan synchronizes write access to the transport.
// A writer acquires the write lock by receiving a value on writableChan
// and releases it by sending on writableChan.
@@ -91,12 +97,13 @@ type http2Server struct {
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
-func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) {
+func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
framer := newFramer(conn)
// Send initial settings as connection preface to client.
var settings []http2.Setting
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
+ maxStreams := config.MaxStreams
if maxStreams == 0 {
maxStreams = math.MaxUint32
} else {
@@ -121,12 +128,16 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
}
var buf bytes.Buffer
t := &http2Server{
+ ctx: context.Background(),
conn: conn,
- authInfo: authInfo,
+ remoteAddr: conn.RemoteAddr(),
+ localAddr: conn.LocalAddr(),
+ authInfo: config.AuthInfo,
framer: framer,
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
maxStreams: maxStreams,
+ inTapHandle: config.InTapHandle,
controlBuf: newRecvBuffer(),
fc: &inFlow{limit: initialConnWindowSize},
sendQuotaPool: newQuotaPool(defaultWindowSize),
@@ -136,13 +147,21 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
activeStreams: make(map[uint32]*Stream),
streamSendQuota: defaultWindowSize,
}
+ if stats.On() {
+ t.ctx = stats.TagConn(t.ctx, &stats.ConnTagInfo{
+ RemoteAddr: t.remoteAddr,
+ LocalAddr: t.localAddr,
+ })
+ connBegin := &stats.ConnBegin{}
+ stats.HandleConn(t.ctx, connBegin)
+ }
go t.controller()
t.writableChan <- 0
return t, nil
}
// operateHeader takes action on the decoded headers.
-func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
+func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
buf := newRecvBuffer()
s := &Stream{
id: frame.Header().StreamID,
@@ -168,12 +187,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
s.recvCompress = state.encoding
if state.timeoutSet {
- s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
+ s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
} else {
- s.ctx, s.cancel = context.WithCancel(context.TODO())
+ s.ctx, s.cancel = context.WithCancel(t.ctx)
}
pr := &peer.Peer{
- Addr: t.conn.RemoteAddr(),
+ Addr: t.remoteAddr,
}
// Attach Auth info if there is any.
if t.authInfo != nil {
@@ -195,6 +214,18 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
s.recvCompress = state.encoding
s.method = state.method
+ if t.inTapHandle != nil {
+ var err error
+ info := &tap.Info{
+ FullMethodName: state.method,
+ }
+ s.ctx, err = t.inTapHandle(s.ctx, info)
+ if err != nil {
+ // TODO: Log the real error.
+ t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
+ return
+ }
+ }
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
@@ -218,13 +249,26 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
}
+ s.ctx = traceCtx(s.ctx, s.method)
+ if stats.On() {
+ s.ctx = stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
+ inHeader := &stats.InHeader{
+ FullMethod: s.method,
+ RemoteAddr: t.remoteAddr,
+ LocalAddr: t.localAddr,
+ Compression: s.recvCompress,
+ WireLength: int(frame.Header().Length),
+ }
+ stats.HandleRPC(s.ctx, inHeader)
+ }
handle(s)
return
}
// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
-func (t *http2Server) HandleStreams(handle func(*Stream)) {
+// traceCtx attaches trace to ctx and returns the new context.
+func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
// Check the validity of client preface.
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(t.conn, preface); err != nil {
@@ -279,7 +323,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
- if t.operateHeaders(frame, handle) {
+ if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
break
}
@@ -492,9 +536,16 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
}
}
+ bufLen := t.hBuf.Len()
if err := t.writeHeaders(s, t.hBuf, false); err != nil {
return err
}
+ if stats.On() {
+ outHeader := &stats.OutHeader{
+ WireLength: bufLen,
+ }
+ stats.HandleRPC(s.Context(), outHeader)
+ }
t.writableChan <- 0
return nil
}
@@ -547,10 +598,17 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
}
}
+ bufLen := t.hBuf.Len()
if err := t.writeHeaders(s, t.hBuf, true); err != nil {
t.Close()
return err
}
+ if stats.On() {
+ outTrailer := &stats.OutTrailer{
+ WireLength: bufLen,
+ }
+ stats.HandleRPC(s.Context(), outTrailer)
+ }
t.closeStream(s)
t.writableChan <- 0
return nil
@@ -579,19 +637,14 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
return nil
}
size := http2MaxFrameLen
- s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
- t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
- if _, ok := err.(StreamError); ok {
- t.sendQuotaPool.cancel()
- }
return err
}
if sq < size {
@@ -659,7 +712,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
t.mu.Lock()
defer t.mu.Unlock()
for _, stream := range t.activeStreams {
- stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
+ stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota))
}
t.streamSendQuota = s.Val
}
@@ -736,6 +789,10 @@ func (t *http2Server) Close() (err error) {
for _, s := range streams {
s.cancel()
}
+ if stats.On() {
+ connEnd := &stats.ConnEnd{}
+ stats.HandleConn(t.ctx, connEnd)
+ }
return
}
@@ -767,7 +824,7 @@ func (t *http2Server) closeStream(s *Stream) {
}
func (t *http2Server) RemoteAddr() net.Addr {
- return t.conn.RemoteAddr()
+ return t.remoteAddr
}
func (t *http2Server) Drain() {
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go
index 413f749..4726bb2 100644
--- a/vendor/google.golang.org/grpc/transport/transport.go
+++ b/vendor/google.golang.org/grpc/transport/transport.go
@@ -45,10 +45,10 @@ import (
"sync"
"golang.org/x/net/context"
- "golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/tap"
)
// recvMsg represents the received msg from the transport. All transport
@@ -167,6 +167,11 @@ type Stream struct {
id uint32
// nil for client side Stream.
st ServerTransport
+ // clientStatsCtx keeps the user context for stats handling.
+ // It's only valid on client side. Server side stats context is same as s.ctx.
+ // All client side stats collection should use the clientStatsCtx (instead of the stream context)
+ // so that all the generated stats for a particular RPC can be associated in the processing phase.
+ clientStatsCtx context.Context
// ctx is the associated context of the stream.
ctx context.Context
// cancel is always nil for client side Stream.
@@ -266,11 +271,6 @@ func (s *Stream) Context() context.Context {
return s.ctx
}
-// TraceContext recreates the context of s with a trace.Trace.
-func (s *Stream) TraceContext(tr trace.Trace) {
- s.ctx = trace.NewContext(s.ctx, tr)
-}
-
// Method returns the method for the stream.
func (s *Stream) Method() string {
return s.method
@@ -355,10 +355,17 @@ const (
draining
)
+// ServerConfig consists of all the configurations to establish a server transport.
+type ServerConfig struct {
+ MaxStreams uint32
+ AuthInfo credentials.AuthInfo
+ InTapHandle tap.ServerInHandle
+}
+
// NewServerTransport creates a ServerTransport with conn or non-nil error
// if it fails.
-func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (ServerTransport, error) {
- return newHTTP2Server(conn, maxStreams, authInfo)
+func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
+ return newHTTP2Server(conn, config)
}
// ConnectOptions covers all relevant options for communicating with the server.
@@ -367,6 +374,8 @@ type ConnectOptions struct {
UserAgent string
// Dialer specifies how to dial a network address.
Dialer func(context.Context, string) (net.Conn, error)
+ // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
+ FailOnNonTempDialError bool
// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
@@ -466,7 +475,7 @@ type ClientTransport interface {
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
// HandleStreams receives incoming streams using the given handler.
- HandleStreams(func(*Stream))
+ HandleStreams(func(*Stream), func(context.Context, string) context.Context)
// WriteHeader sends the header metadata for the given stream.
// WriteHeader may not be called on all streams.