aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc')
-rw-r--r--vendor/google.golang.org/grpc/README.md18
-rw-r--r--vendor/google.golang.org/grpc/call.go10
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go131
-rw-r--r--vendor/google.golang.org/grpc/credentials/credentials.go4
-rw-r--r--vendor/google.golang.org/grpc/credentials/credentials_util_go17.go3
-rw-r--r--vendor/google.golang.org/grpc/credentials/credentials_util_go18.go (renamed from vendor/google.golang.org/grpc/transport/pre_go16.go)28
-rw-r--r--vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go2
-rw-r--r--vendor/google.golang.org/grpc/interceptor.go2
-rw-r--r--vendor/google.golang.org/grpc/keepalive/keepalive.go80
-rw-r--r--vendor/google.golang.org/grpc/metadata/metadata.go38
-rw-r--r--vendor/google.golang.org/grpc/rpc_util.go82
-rw-r--r--vendor/google.golang.org/grpc/server.go216
-rw-r--r--vendor/google.golang.org/grpc/stats/stats.go2
-rw-r--r--vendor/google.golang.org/grpc/status/status.go136
-rw-r--r--vendor/google.golang.org/grpc/stream.go70
-rw-r--r--vendor/google.golang.org/grpc/transport/control.go18
-rw-r--r--vendor/google.golang.org/grpc/transport/handler_server.go15
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_client.go252
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_server.go273
-rw-r--r--vendor/google.golang.org/grpc/transport/http_util.go74
-rw-r--r--vendor/google.golang.org/grpc/transport/transport.go61
21 files changed, 1082 insertions, 433 deletions
diff --git a/vendor/google.golang.org/grpc/README.md b/vendor/google.golang.org/grpc/README.md
index 39120c2..4a65571 100644
--- a/vendor/google.golang.org/grpc/README.md
+++ b/vendor/google.golang.org/grpc/README.md
@@ -16,23 +16,7 @@ $ go get google.golang.org/grpc
Prerequisites
-------------
-This requires Go 1.5 or later.
-
-A note on the version used: significant performance improvements in benchmarks
-of grpc-go have been seen by upgrading the go version from 1.5 to the latest
-1.7.1.
-
-From https://golang.org/doc/install, one way to install the latest version of go is:
-```
-$ GO_VERSION=1.7.1
-$ OS=linux
-$ ARCH=amd64
-$ curl -O https://storage.googleapis.com/golang/go${GO_VERSION}.${OS}-${ARCH}.tar.gz
-$ sudo tar -C /usr/local -xzf go$GO_VERSION.$OS-$ARCH.tar.gz
-$ # Put go on the PATH, keep the usual installation dir
-$ sudo ln -s /usr/local/go/bin/go /usr/bin/go
-$ rm go$GO_VERSION.$OS-$ARCH.tar.gz
-```
+This requires Go 1.6 or later.
Constraints
-----------
diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go
index 81b52be..13ca5b7 100644
--- a/vendor/google.golang.org/grpc/call.go
+++ b/vendor/google.golang.org/grpc/call.go
@@ -36,7 +36,6 @@ package grpc
import (
"bytes"
"io"
- "math"
"time"
"golang.org/x/net/context"
@@ -44,6 +43,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
+ "google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)
@@ -73,14 +73,14 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
}
}
for {
- if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inPayload); err != nil {
+ if err = recv(p, dopts.codec, stream, dopts.dc, reply, dopts.maxMsgSize, inPayload); err != nil {
if err == io.EOF {
break
}
return
}
}
- if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK {
+ if inPayload != nil && err == io.EOF && stream.Status().Code() == codes.OK {
// TODO in the current implementation, inTrailer may be handled before inPayload in some cases.
// Fix the order if necessary.
dopts.copts.StatsHandler.HandleRPC(ctx, inPayload)
@@ -231,7 +231,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
- if _, ok := err.(*rpcError); ok {
+ if _, ok := status.FromError(err); ok {
return err
}
if err == errConnClosing || err == errConnUnavailable {
@@ -285,6 +285,6 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
put()
put = nil
}
- return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
+ return stream.Status().Err()
}
}
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 459ce0b..0879ef0 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -36,8 +36,8 @@ package grpc
import (
"errors"
"fmt"
+ "math"
"net"
- "strings"
"sync"
"time"
@@ -45,6 +45,7 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@@ -78,7 +79,6 @@ var (
errConnClosing = errors.New("grpc: the connection is closing")
// errConnUnavailable indicates that the connection is unavailable.
errConnUnavailable = errors.New("grpc: the connection is unavailable")
- errNoAddr = errors.New("grpc: there is no address available to dial")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
@@ -86,23 +86,33 @@ var (
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
- unaryInt UnaryClientInterceptor
- streamInt StreamClientInterceptor
- codec Codec
- cp Compressor
- dc Decompressor
- bs backoffStrategy
- balancer Balancer
- block bool
- insecure bool
- timeout time.Duration
- scChan <-chan ServiceConfig
- copts transport.ConnectOptions
-}
+ unaryInt UnaryClientInterceptor
+ streamInt StreamClientInterceptor
+ codec Codec
+ cp Compressor
+ dc Decompressor
+ bs backoffStrategy
+ balancer Balancer
+ block bool
+ insecure bool
+ timeout time.Duration
+ scChan <-chan ServiceConfig
+ copts transport.ConnectOptions
+ maxMsgSize int
+}
+
+const defaultClientMaxMsgSize = math.MaxInt32
// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)
+// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive.
+func WithMaxMsgSize(s int) DialOption {
+ return func(o *dialOptions) {
+ o.maxMsgSize = s
+ }
+}
+
// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
func WithCodec(c Codec) DialOption {
return func(o *dialOptions) {
@@ -249,6 +259,13 @@ func WithUserAgent(s string) DialOption {
}
}
+// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
+func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
+ return func(o *dialOptions) {
+ o.copts.KeepaliveParams = kp
+ }
+}
+
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return func(o *dialOptions) {
@@ -288,9 +305,18 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
conns: make(map[Address]*addrConn),
}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
+ cc.dopts.maxMsgSize = defaultClientMaxMsgSize
for _, opt := range opts {
opt(&cc.dopts)
}
+
+ grpcUA := "grpc-go/" + Version
+ if cc.dopts.copts.UserAgent != "" {
+ cc.dopts.copts.UserAgent += " " + grpcUA
+ } else {
+ cc.dopts.copts.UserAgent = grpcUA
+ }
+
if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
@@ -333,23 +359,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
} else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
cc.authority = cc.dopts.copts.Authority
} else {
- colonPos := strings.LastIndex(target, ":")
- if colonPos == -1 {
- colonPos = len(target)
- }
- cc.authority = target[:colonPos]
+ cc.authority = target
}
- var ok bool
waitC := make(chan error, 1)
go func() {
- var addrs []Address
+ defer close(waitC)
if cc.dopts.balancer == nil && cc.sc.LB != nil {
cc.dopts.balancer = cc.sc.LB
}
- if cc.dopts.balancer == nil {
- // Connect to target directly if balancer is nil.
- addrs = append(addrs, Address{Addr: target})
- } else {
+ if cc.dopts.balancer != nil {
var credsClone credentials.TransportCredentials
if creds != nil {
credsClone = creds.Clone()
@@ -362,24 +380,22 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
return
}
ch := cc.dopts.balancer.Notify()
- if ch == nil {
- // There is no name resolver installed.
- addrs = append(addrs, Address{Addr: target})
- } else {
- addrs, ok = <-ch
- if !ok || len(addrs) == 0 {
- waitC <- errNoAddr
- return
+ if ch != nil {
+ if cc.dopts.block {
+ doneChan := make(chan struct{})
+ go cc.lbWatcher(doneChan)
+ <-doneChan
+ } else {
+ go cc.lbWatcher(nil)
}
- }
- }
- for _, a := range addrs {
- if err := cc.resetAddrConn(a, false, nil); err != nil {
- waitC <- err
return
}
}
- close(waitC)
+ // No balancer, or no resolver within the balancer. Connect directly.
+ if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil {
+ waitC <- err
+ return
+ }
}()
select {
case <-ctx.Done():
@@ -390,15 +406,10 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}
- // If balancer is nil or balancer.Notify() is nil, ok will be false here.
- // The lbWatcher goroutine will not be created.
- if ok {
- go cc.lbWatcher()
- }
-
if cc.dopts.scChan != nil {
go cc.scWatcher()
}
+
return cc, nil
}
@@ -449,7 +460,10 @@ type ClientConn struct {
conns map[Address]*addrConn
}
-func (cc *ClientConn) lbWatcher() {
+// lbWatcher watches the Notify channel of the balancer in cc and manages
+// connections accordingly. If doneChan is not nil, it is closed after the
+// first successfull connection is made.
+func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
for addrs := range cc.dopts.balancer.Notify() {
var (
add []Address // Addresses need to setup connections.
@@ -476,7 +490,15 @@ func (cc *ClientConn) lbWatcher() {
}
cc.mu.Unlock()
for _, a := range add {
- cc.resetAddrConn(a, true, nil)
+ if doneChan != nil {
+ err := cc.resetAddrConn(a, true, nil)
+ if err == nil {
+ close(doneChan)
+ doneChan = nil
+ }
+ } else {
+ cc.resetAddrConn(a, false, nil)
+ }
}
for _, c := range del {
c.tearDown(errConnDrain)
@@ -505,7 +527,7 @@ func (cc *ClientConn) scWatcher() {
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
-func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
+func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
ac := &addrConn{
cc: cc,
addr: addr,
@@ -555,8 +577,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err
stale.tearDown(tearDownErr)
}
}
- // skipWait may overwrite the decision in ac.dopts.block.
- if ac.dopts.block && !skipWait {
+ if block {
if err := ac.resetTransport(false); err != nil {
if err != errConnClosing {
// Tear down ac and delete it from cc.conns.
@@ -777,6 +798,8 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
Metadata: ac.addr.Metadata,
}
newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
+ // Don't call cancel in success path due to a race in Go 1.6:
+ // https://github.com/golang/go/issues/15078.
if err != nil {
cancel()
@@ -855,9 +878,9 @@ func (ac *addrConn) transportMonitor() {
// In both cases, a new ac is created.
select {
case <-t.Error():
- ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
+ ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
default:
- ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
+ ac.cc.resetAddrConn(ac.addr, false, errConnDrain)
}
return
case <-t.Error():
@@ -866,7 +889,7 @@ func (ac *addrConn) transportMonitor() {
t.Close()
return
case <-t.GoAway():
- ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
+ ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
return
default:
}
diff --git a/vendor/google.golang.org/grpc/credentials/credentials.go b/vendor/google.golang.org/grpc/credentials/credentials.go
index 4d45c3e..a8114d6 100644
--- a/vendor/google.golang.org/grpc/credentials/credentials.go
+++ b/vendor/google.golang.org/grpc/credentials/credentials.go
@@ -102,6 +102,10 @@ type TransportCredentials interface {
// authentication protocol on rawConn for clients. It returns the authenticated
// connection and the corresponding auth information about the connection.
// Implementations must use the provided context to implement timely cancellation.
+ // gRPC will try to reconnect if the error returned is a temporary error
+ // (io.EOF, context.DeadlineExceeded or err.Temporary() == true).
+ // If the returned error is a wrapper error, implementations should make sure that
+ // the error implements Temporary() to have the correct retry behaviors.
ClientHandshake(context.Context, string, net.Conn) (net.Conn, AuthInfo, error)
// ServerHandshake does the authentication handshake for servers. It returns
// the authenticated connection and the corresponding auth information about
diff --git a/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go b/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go
index 9647b9e..7597b09 100644
--- a/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go
+++ b/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go
@@ -1,4 +1,5 @@
// +build go1.7
+// +build !go1.8
/*
*
@@ -44,8 +45,6 @@ import (
// contains a mutex and must not be copied.
//
// If cfg is nil, a new zero tls.Config is returned.
-//
-// TODO replace this function with official clone function.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
if cfg == nil {
return &tls.Config{}
diff --git a/vendor/google.golang.org/grpc/transport/pre_go16.go b/vendor/google.golang.org/grpc/credentials/credentials_util_go18.go
index 33d91c1..0ecf342 100644
--- a/vendor/google.golang.org/grpc/transport/pre_go16.go
+++ b/vendor/google.golang.org/grpc/credentials/credentials_util_go18.go
@@ -1,7 +1,8 @@
-// +build !go1.6
+// +build go1.8
/*
- * Copyright 2016, Google Inc.
+ *
+ * Copyright 2017, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -32,20 +33,21 @@
*
*/
-package transport
+package credentials
import (
- "net"
- "time"
-
- "golang.org/x/net/context"
+ "crypto/tls"
)
-// dialContext connects to the address on the named network.
-func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
- var dialer net.Dialer
- if deadline, ok := ctx.Deadline(); ok {
- dialer.Timeout = deadline.Sub(time.Now())
+// cloneTLSConfig returns a shallow clone of the exported
+// fields of cfg, ignoring the unexported sync.Once, which
+// contains a mutex and must not be copied.
+//
+// If cfg is nil, a new zero tls.Config is returned.
+func cloneTLSConfig(cfg *tls.Config) *tls.Config {
+ if cfg == nil {
+ return &tls.Config{}
}
- return dialer.Dial(network, address)
+
+ return cfg.Clone()
}
diff --git a/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go b/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go
index 09b8d12..cfd40df 100644
--- a/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go
+++ b/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go
@@ -44,8 +44,6 @@ import (
// contains a mutex and must not be copied.
//
// If cfg is nil, a new zero tls.Config is returned.
-//
-// TODO replace this function with official clone function.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
if cfg == nil {
return &tls.Config{}
diff --git a/vendor/google.golang.org/grpc/interceptor.go b/vendor/google.golang.org/grpc/interceptor.go
index 8d932ef..a692161 100644
--- a/vendor/google.golang.org/grpc/interceptor.go
+++ b/vendor/google.golang.org/grpc/interceptor.go
@@ -40,7 +40,7 @@ import (
// UnaryInvoker is called by UnaryClientInterceptor to complete RPCs.
type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error
-// UnaryClientInterceptor intercepts the execution of a unary RPC on the client. inovker is the handler to complete the RPC
+// UnaryClientInterceptor intercepts the execution of a unary RPC on the client. invoker is the handler to complete the RPC
// and it is the responsibility of the interceptor to call it.
// This is the EXPERIMENTAL API.
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
diff --git a/vendor/google.golang.org/grpc/keepalive/keepalive.go b/vendor/google.golang.org/grpc/keepalive/keepalive.go
new file mode 100644
index 0000000..d492589
--- /dev/null
+++ b/vendor/google.golang.org/grpc/keepalive/keepalive.go
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+// Package keepalive defines configurable parameters for point-to-point healthcheck.
+package keepalive
+
+import (
+ "time"
+)
+
+// ClientParameters is used to set keepalive parameters on the client-side.
+// These configure how the client will actively probe to notice when a connection broken
+// and to cause activity so intermediaries are aware the connection is still in use.
+// Make sure these parameters are set in coordination with the keepalive policy on the server,
+// as incompatible settings can result in closing of connection.
+type ClientParameters struct {
+ // After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive.
+ Time time.Duration // The current default value is infinity.
+ // After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that
+ // the connection is closed.
+ Timeout time.Duration // The current default value is 20 seconds.
+ // If true, client runs keepalive checks even with no active RPCs.
+ PermitWithoutStream bool // false by default.
+}
+
+// ServerParameters is used to set keepalive and max-age parameters on the server-side.
+type ServerParameters struct {
+ // MaxConnectionIdle is a duration for the amount of time after which an idle connection would be closed by sending a GoAway.
+ // Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment.
+ MaxConnectionIdle time.Duration // The current default value is infinity.
+ // MaxConnectionAge is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway.
+ // A random jitter of +/-10% will be added to MaxConnectionAge to spread out connection storms.
+ MaxConnectionAge time.Duration // The current default value is infinity.
+ // MaxConnectinoAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
+ MaxConnectionAgeGrace time.Duration // The current default value is infinity.
+ // After a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive.
+ Time time.Duration // The current default value is 2 hours.
+ // After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that
+ // the connection is closed.
+ Timeout time.Duration // The current default value is 20 seconds.
+}
+
+// EnforcementPolicy is used to set keepalive enforcement policy on the server-side.
+// Server will close connection with a client that violates this policy.
+type EnforcementPolicy struct {
+ // MinTime is the minimum amount of time a client should wait before sending a keepalive ping.
+ MinTime time.Duration // The current default value is 5 minutes.
+ // If true, server expects keepalive pings even when there are no active streams(RPCs).
+ PermitWithoutStream bool // false by default.
+}
diff --git a/vendor/google.golang.org/grpc/metadata/metadata.go b/vendor/google.golang.org/grpc/metadata/metadata.go
index 7332395..7ca4418 100644
--- a/vendor/google.golang.org/grpc/metadata/metadata.go
+++ b/vendor/google.golang.org/grpc/metadata/metadata.go
@@ -136,17 +136,41 @@ func Join(mds ...MD) MD {
return out
}
-type mdKey struct{}
+type mdIncomingKey struct{}
+type mdOutgoingKey struct{}
-// NewContext creates a new context with md attached.
+// NewContext is a wrapper for NewOutgoingContext(ctx, md). Deprecated.
func NewContext(ctx context.Context, md MD) context.Context {
- return context.WithValue(ctx, mdKey{}, md)
+ return NewOutgoingContext(ctx, md)
}
-// FromContext returns the MD in ctx if it exists.
-// The returned md should be immutable, writing to it may cause races.
-// Modification should be made to the copies of the returned md.
+// NewIncomingContext creates a new context with incoming md attached.
+func NewIncomingContext(ctx context.Context, md MD) context.Context {
+ return context.WithValue(ctx, mdIncomingKey{}, md)
+}
+
+// NewOutgoingContext creates a new context with outgoing md attached.
+func NewOutgoingContext(ctx context.Context, md MD) context.Context {
+ return context.WithValue(ctx, mdOutgoingKey{}, md)
+}
+
+// FromContext is a wrapper for FromIncomingContext(ctx). Deprecated.
func FromContext(ctx context.Context) (md MD, ok bool) {
- md, ok = ctx.Value(mdKey{}).(MD)
+ return FromIncomingContext(ctx)
+}
+
+// FromIncomingContext returns the incoming MD in ctx if it exists. The
+// returned md should be immutable, writing to it may cause races.
+// Modification should be made to the copies of the returned md.
+func FromIncomingContext(ctx context.Context) (md MD, ok bool) {
+ md, ok = ctx.Value(mdIncomingKey{}).(MD)
+ return
+}
+
+// FromOutgoingContext returns the outgoing MD in ctx if it exists. The
+// returned md should be immutable, writing to it may cause races.
+// Modification should be made to the copies of the returned md.
+func FromOutgoingContext(ctx context.Context) (md MD, ok bool) {
+ md, ok = ctx.Value(mdOutgoingKey{}).(MD)
return
}
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
index d98a883..db56a88 100644
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -37,7 +37,6 @@ import (
"bytes"
"compress/gzip"
"encoding/binary"
- "fmt"
"io"
"io/ioutil"
"math"
@@ -50,6 +49,7 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
+ "google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)
@@ -189,7 +189,9 @@ func Trailer(md *metadata.MD) CallOption {
// unary RPC.
func Peer(peer *peer.Peer) CallOption {
return afterCall(func(c *callInfo) {
- *peer = *c.peer
+ if c.peer != nil {
+ *peer = *c.peer
+ }
})
}
@@ -370,88 +372,57 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
return nil
}
-// rpcError defines the status from an RPC.
-type rpcError struct {
- code codes.Code
- desc string
-}
-
-func (e *rpcError) Error() string {
- return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc)
-}
-
// Code returns the error code for err if it was produced by the rpc system.
// Otherwise, it returns codes.Unknown.
+//
+// Deprecated; use status.FromError and Code method instead.
func Code(err error) codes.Code {
- if err == nil {
- return codes.OK
- }
- if e, ok := err.(*rpcError); ok {
- return e.code
+ if s, ok := status.FromError(err); ok {
+ return s.Code()
}
return codes.Unknown
}
// ErrorDesc returns the error description of err if it was produced by the rpc system.
// Otherwise, it returns err.Error() or empty string when err is nil.
+//
+// Deprecated; use status.FromError and Message method instead.
func ErrorDesc(err error) string {
- if err == nil {
- return ""
- }
- if e, ok := err.(*rpcError); ok {
- return e.desc
+ if s, ok := status.FromError(err); ok {
+ return s.Message()
}
return err.Error()
}
// Errorf returns an error containing an error code and a description;
// Errorf returns nil if c is OK.
+//
+// Deprecated; use status.Errorf instead.
func Errorf(c codes.Code, format string, a ...interface{}) error {
- if c == codes.OK {
- return nil
- }
- return &rpcError{
- code: c,
- desc: fmt.Sprintf(format, a...),
- }
+ return status.Errorf(c, format, a...)
}
-// toRPCErr converts an error into a rpcError.
+// toRPCErr converts an error into an error from the status package.
func toRPCErr(err error) error {
- switch e := err.(type) {
- case *rpcError:
+ if _, ok := status.FromError(err); ok {
return err
+ }
+ switch e := err.(type) {
case transport.StreamError:
- return &rpcError{
- code: e.Code,
- desc: e.Desc,
- }
+ return status.Error(e.Code, e.Desc)
case transport.ConnectionError:
- return &rpcError{
- code: codes.Internal,
- desc: e.Desc,
- }
+ return status.Error(codes.Internal, e.Desc)
default:
switch err {
case context.DeadlineExceeded:
- return &rpcError{
- code: codes.DeadlineExceeded,
- desc: err.Error(),
- }
+ return status.Error(codes.DeadlineExceeded, err.Error())
case context.Canceled:
- return &rpcError{
- code: codes.Canceled,
- desc: err.Error(),
- }
+ return status.Error(codes.Canceled, err.Error())
case ErrClientConnClosing:
- return &rpcError{
- code: codes.FailedPrecondition,
- desc: err.Error(),
- }
+ return status.Error(codes.FailedPrecondition, err.Error())
}
-
}
- return Errorf(codes.Unknown, "%v", err)
+ return status.Error(codes.Unknown, err.Error())
}
// convertCode converts a standard Go error into its canonical code. Note that
@@ -527,3 +498,6 @@ type ServiceConfig struct {
// requires a synchronised update of grpc-go and protoc-gen-go. This constant
// should not be referenced from any other code.
const SupportPackageIsVersion4 = true
+
+// Version is the current grpc version.
+const Version = "1.3.0-dev"
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 985226d..b15f71c 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -53,8 +53,10 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
+ "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
+ "google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
"google.golang.org/grpc/transport"
)
@@ -116,6 +118,9 @@ type options struct {
statsHandler stats.Handler
maxConcurrentStreams uint32
useHandlerImpl bool // use http.Handler-based server
+ unknownStreamDesc *StreamDesc
+ keepaliveParams keepalive.ServerParameters
+ keepalivePolicy keepalive.EnforcementPolicy
}
var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
@@ -123,6 +128,20 @@ var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size l
// A ServerOption sets options.
type ServerOption func(*options)
+// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
+func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
+ return func(o *options) {
+ o.keepaliveParams = kp
+ }
+}
+
+// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
+func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
+ return func(o *options) {
+ o.keepalivePolicy = kep
+ }
+}
+
// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
func CustomCodec(codec Codec) ServerOption {
return func(o *options) {
@@ -208,6 +227,24 @@ func StatsHandler(h stats.Handler) ServerOption {
}
}
+// UnknownServiceHandler returns a ServerOption that allows for adding a custom
+// unknown service handler. The provided method is a bidi-streaming RPC service
+// handler that will be invoked instead of returning the the "unimplemented" gRPC
+// error whenever a request is received for an unregistered service or method.
+// The handling function has full access to the Context of the request and the
+// stream, and the invocation passes through interceptors.
+func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
+ return func(o *options) {
+ o.unknownStreamDesc = &StreamDesc{
+ StreamName: "unknown_service_handler",
+ Handler: streamHandler,
+ // We need to assume that the users of the streamHandler will want to use both.
+ ClientStreams: true,
+ ServerStreams: true,
+ }
+ }
+}
+
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
@@ -446,10 +483,12 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
// transport.NewServerTransport).
func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
config := &transport.ServerConfig{
- MaxStreams: s.opts.maxConcurrentStreams,
- AuthInfo: authInfo,
- InTapHandle: s.opts.inTapHandle,
- StatsHandler: s.opts.statsHandler,
+ MaxStreams: s.opts.maxConcurrentStreams,
+ AuthInfo: authInfo,
+ InTapHandle: s.opts.inTapHandle,
+ StatsHandler: s.opts.statsHandler,
+ KeepaliveParams: s.opts.keepaliveParams,
+ KeepalivePolicy: s.opts.keepalivePolicy,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
@@ -633,7 +672,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
stream.SetSendCompress(s.opts.cp.Type())
}
p := &parser{r: stream}
- for {
+ for { // TODO: delete
pf, req, err := p.recvMsg(s.opts.maxMsgSize)
if err == io.EOF {
// The entire stream is done (for unary RPC only).
@@ -643,36 +682,37 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
}
if err != nil {
- switch err := err.(type) {
- case *rpcError:
- if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
+ if st, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, st); e != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
- case transport.ConnectionError:
- // Nothing to do here.
- case transport.StreamError:
- if e := t.WriteStatus(stream, err.Code, err.Desc); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ } else {
+ switch st := err.(type) {
+ case transport.ConnectionError:
+ // Nothing to do here.
+ case transport.StreamError:
+ if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+ default:
+ panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
}
- default:
- panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
}
return err
}
if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
- switch err := err.(type) {
- case *rpcError:
- if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
+ if st, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, st); e != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
return err
- default:
- if e := t.WriteStatus(stream, codes.Internal, err.Error()); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
}
+ if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+
+ // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
}
var inPayload *stats.InPayload
if sh != nil {
@@ -680,8 +720,6 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
RecvTime: time.Now(),
}
}
- statusCode := codes.OK
- statusDesc := ""
df := func(v interface{}) error {
if inPayload != nil {
inPayload.WireLength = len(req)
@@ -690,20 +728,16 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
var err error
req, err = s.opts.dc.Do(bytes.NewReader(req))
if err != nil {
- if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
- }
return Errorf(codes.Internal, err.Error())
}
}
if len(req) > s.opts.maxMsgSize {
// TODO: Revisit the error code. Currently keep it consistent with
// java implementation.
- statusCode = codes.Internal
- statusDesc = fmt.Sprintf("grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
+ return status.Errorf(codes.Internal, "grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
}
if err := s.opts.codec.Unmarshal(req, v); err != nil {
- return err
+ return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}
if inPayload != nil {
inPayload.Payload = v
@@ -718,21 +752,20 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
if appErr != nil {
- if err, ok := appErr.(*rpcError); ok {
- statusCode = err.code
- statusDesc = err.desc
- } else {
- statusCode = convertCode(appErr)
- statusDesc = appErr.Error()
+ appStatus, ok := status.FromError(appErr)
+ if !ok {
+ // Convert appErr if it is not a grpc status error.
+ appErr = status.Error(convertCode(appErr), appErr.Error())
+ appStatus, _ = status.FromError(appErr)
}
- if trInfo != nil && statusCode != codes.OK {
- trInfo.tr.LazyLog(stringer(statusDesc), true)
+ if trInfo != nil {
+ trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
trInfo.tr.SetError()
}
- if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
+ if e := t.WriteStatus(stream, appStatus); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", e)
}
- return Errorf(statusCode, statusDesc)
+ return appErr
}
if trInfo != nil {
trInfo.tr.LazyLog(stringer("OK"), false)
@@ -742,26 +775,35 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Delay: false,
}
if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
- switch err := err.(type) {
- case transport.ConnectionError:
- // Nothing to do here.
- case transport.StreamError:
- statusCode = err.Code
- statusDesc = err.Desc
- default:
- statusCode = codes.Unknown
- statusDesc = err.Error()
+ if err == io.EOF {
+ // The entire stream is done (for unary RPC only).
+ return err
+ }
+ if s, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, s); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", e)
+ }
+ } else {
+ switch st := err.(type) {
+ case transport.ConnectionError:
+ // Nothing to do here.
+ case transport.StreamError:
+ if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+ default:
+ panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
+ }
}
return err
}
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
}
- errWrite := t.WriteStatus(stream, statusCode, statusDesc)
- if statusCode != codes.OK {
- return Errorf(statusCode, statusDesc)
- }
- return errWrite
+ // TODO: Should we be logging if writing status failed here, like above?
+ // Should the logging be in WriteStatus? Should we ignore the WriteStatus
+ // error or allow the stats handler to see it?
+ return t.WriteStatus(stream, status.New(codes.OK, ""))
}
}
@@ -815,43 +857,47 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
}()
}
var appErr error
+ var server interface{}
+ if srv != nil {
+ server = srv.server
+ }
if s.opts.streamInt == nil {
- appErr = sd.Handler(srv.server, ss)
+ appErr = sd.Handler(server, ss)
} else {
info := &StreamServerInfo{
FullMethod: stream.Method(),
IsClientStream: sd.ClientStreams,
IsServerStream: sd.ServerStreams,
}
- appErr = s.opts.streamInt(srv.server, ss, info, sd.Handler)
+ appErr = s.opts.streamInt(server, ss, info, sd.Handler)
}
if appErr != nil {
- if err, ok := appErr.(*rpcError); ok {
- ss.statusCode = err.code
- ss.statusDesc = err.desc
- } else if err, ok := appErr.(transport.StreamError); ok {
- ss.statusCode = err.Code
- ss.statusDesc = err.Desc
- } else {
- ss.statusCode = convertCode(appErr)
- ss.statusDesc = appErr.Error()
+ appStatus, ok := status.FromError(appErr)
+ if !ok {
+ switch err := appErr.(type) {
+ case transport.StreamError:
+ appStatus = status.New(err.Code, err.Desc)
+ default:
+ appStatus = status.New(convertCode(appErr), appErr.Error())
+ }
+ appErr = appStatus.Err()
+ }
+ if trInfo != nil {
+ ss.mu.Lock()
+ ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
+ ss.trInfo.tr.SetError()
+ ss.mu.Unlock()
}
+ t.WriteStatus(ss.s, appStatus)
+ // TODO: Should we log an error from WriteStatus here and below?
+ return appErr
}
if trInfo != nil {
ss.mu.Lock()
- if ss.statusCode != codes.OK {
- ss.trInfo.tr.LazyLog(stringer(ss.statusDesc), true)
- ss.trInfo.tr.SetError()
- } else {
- ss.trInfo.tr.LazyLog(stringer("OK"), false)
- }
+ ss.trInfo.tr.LazyLog(stringer("OK"), false)
ss.mu.Unlock()
}
- errWrite := t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
- if ss.statusCode != codes.OK {
- return Errorf(ss.statusCode, ss.statusDesc)
- }
- return errWrite
+ return t.WriteStatus(ss.s, status.New(codes.OK, ""))
}
@@ -867,7 +913,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.SetError()
}
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
- if err := t.WriteStatus(stream, codes.InvalidArgument, errDesc); err != nil {
+ if err := t.WriteStatus(stream, status.New(codes.InvalidArgument, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
@@ -883,12 +929,16 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
method := sm[pos+1:]
srv, ok := s.m[service]
if !ok {
+ if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
+ s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
+ return
+ }
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
trInfo.tr.SetError()
}
errDesc := fmt.Sprintf("unknown service %v", service)
- if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
+ if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
@@ -913,8 +963,12 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
trInfo.tr.SetError()
}
+ if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
+ s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
+ return
+ }
errDesc := fmt.Sprintf("unknown method %v", method)
- if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
+ if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
diff --git a/vendor/google.golang.org/grpc/stats/stats.go b/vendor/google.golang.org/grpc/stats/stats.go
index a82448a..43d6f00 100644
--- a/vendor/google.golang.org/grpc/stats/stats.go
+++ b/vendor/google.golang.org/grpc/stats/stats.go
@@ -184,7 +184,7 @@ type End struct {
Client bool
// EndTime is the time when the RPC ends.
EndTime time.Time
- // Error is the error just happened. Its type is gRPC error.
+ // Error is the error just happened. It implements status.Status if non-nil.
Error error
}
diff --git a/vendor/google.golang.org/grpc/status/status.go b/vendor/google.golang.org/grpc/status/status.go
new file mode 100644
index 0000000..99f1a09
--- /dev/null
+++ b/vendor/google.golang.org/grpc/status/status.go
@@ -0,0 +1,136 @@
+/*
+ *
+ * Copyright 2017, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+// Package status implements errors returned by gRPC. These errors are
+// serialized and transmitted on the wire between server and client, and allow
+// for additional data to be transmitted via the Details field in the status
+// proto. gRPC service handlers should return an error created by this
+// package, and gRPC clients should expect a corresponding error to be
+// returned from the RPC call.
+//
+// This package upholds the invariants that a non-nil error may not
+// contain an OK code, and an OK code must result in a nil error.
+package status
+
+import (
+ "fmt"
+
+ "github.com/golang/protobuf/proto"
+ spb "google.golang.org/genproto/googleapis/rpc/status"
+ "google.golang.org/grpc/codes"
+)
+
+// statusError is an alias of a status proto. It implements error and Status,
+// and a nil statusError should never be returned by this package.
+type statusError spb.Status
+
+func (se *statusError) Error() string {
+ p := (*spb.Status)(se)
+ return fmt.Sprintf("rpc error: code = %s desc = %s", codes.Code(p.GetCode()), p.GetMessage())
+}
+
+func (se *statusError) status() *Status {
+ return &Status{s: (*spb.Status)(se)}
+}
+
+// Status represents an RPC status code, message, and details. It is immutable
+// and should be created with New, Newf, or FromProto.
+type Status struct {
+ s *spb.Status
+}
+
+// Code returns the status code contained in s.
+func (s *Status) Code() codes.Code {
+ return codes.Code(s.s.Code)
+}
+
+// Message returns the message contained in s.
+func (s *Status) Message() string {
+ return s.s.Message
+}
+
+// Proto returns s's status as an spb.Status proto message.
+func (s *Status) Proto() *spb.Status {
+ return proto.Clone(s.s).(*spb.Status)
+}
+
+// Err returns an immutable error representing s; returns nil if s.Code() is
+// OK.
+func (s *Status) Err() error {
+ if s.Code() == codes.OK {
+ return nil
+ }
+ return (*statusError)(s.s)
+}
+
+// New returns a Status representing c and msg.
+func New(c codes.Code, msg string) *Status {
+ return &Status{s: &spb.Status{Code: int32(c), Message: msg}}
+}
+
+// Newf returns New(c, fmt.Sprintf(format, a...)).
+func Newf(c codes.Code, format string, a ...interface{}) *Status {
+ return New(c, fmt.Sprintf(format, a...))
+}
+
+// Error returns an error representing c and msg. If c is OK, returns nil.
+func Error(c codes.Code, msg string) error {
+ return New(c, msg).Err()
+}
+
+// Errorf returns Error(c, fmt.Sprintf(format, a...)).
+func Errorf(c codes.Code, format string, a ...interface{}) error {
+ return Error(c, fmt.Sprintf(format, a...))
+}
+
+// ErrorProto returns an error representing s. If s.Code is OK, returns nil.
+func ErrorProto(s *spb.Status) error {
+ return FromProto(s).Err()
+}
+
+// FromProto returns a Status representing s.
+func FromProto(s *spb.Status) *Status {
+ return &Status{s: proto.Clone(s).(*spb.Status)}
+}
+
+// FromError returns a Status representing err if it was produced from this
+// package, otherwise it returns nil, false.
+func FromError(err error) (s *Status, ok bool) {
+ if err == nil {
+ return &Status{s: &spb.Status{Code: int32(codes.OK)}}, true
+ }
+ if s, ok := err.(*statusError); ok {
+ return s.status(), true
+ }
+ return nil, false
+}
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index bb468dc..ecb1a31 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -37,7 +37,6 @@ import (
"bytes"
"errors"
"io"
- "math"
"sync"
"time"
@@ -46,6 +45,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
+ "google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)
@@ -178,7 +178,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
- if _, ok := err.(*rpcError); ok {
+ if _, ok := status.FromError(err); ok {
return nil, err
}
if err == errConnClosing || err == errConnUnavailable {
@@ -208,13 +208,14 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
break
}
cs := &clientStream{
- opts: opts,
- c: c,
- desc: desc,
- codec: cc.dopts.codec,
- cp: cc.dopts.cp,
- dc: cc.dopts.dc,
- cancel: cancel,
+ opts: opts,
+ c: c,
+ desc: desc,
+ codec: cc.dopts.codec,
+ cp: cc.dopts.cp,
+ dc: cc.dopts.dc,
+ maxMsgSize: cc.dopts.maxMsgSize,
+ cancel: cancel,
put: put,
t: t,
@@ -239,11 +240,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
case <-s.Done():
// TODO: The trace of the RPC is terminated here when there is no pending
// I/O, which is probably not the optimal solution.
- if s.StatusCode() == codes.OK {
- cs.finish(nil)
- } else {
- cs.finish(Errorf(s.StatusCode(), "%s", s.StatusDesc()))
- }
+ cs.finish(s.Status().Err())
cs.closeTransportStream(nil)
case <-s.GoAway():
cs.finish(errConnDrain)
@@ -259,17 +256,18 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
// clientStream implements a client side Stream.
type clientStream struct {
- opts []CallOption
- c callInfo
- t transport.ClientTransport
- s *transport.Stream
- p *parser
- desc *StreamDesc
- codec Codec
- cp Compressor
- cbuf *bytes.Buffer
- dc Decompressor
- cancel context.CancelFunc
+ opts []CallOption
+ c callInfo
+ t transport.ClientTransport
+ s *transport.Stream
+ p *parser
+ desc *StreamDesc
+ codec Codec
+ cp Compressor
+ cbuf *bytes.Buffer
+ dc Decompressor
+ maxMsgSize int
+ cancel context.CancelFunc
tracing bool // set to EnableTracing when the clientStream is created.
@@ -382,7 +380,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
Client: true,
}
}
- err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inPayload)
+ err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, inPayload)
defer func() {
// err != nil indicates the termination of the stream.
if err != nil {
@@ -405,17 +403,17 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
}
// Special handling for client streaming rpc.
// This recv expects EOF or errors, so we don't collect inPayload.
- err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, nil)
+ err = recv(cs.p, cs.codec, cs.s, cs.dc, m, cs.maxMsgSize, nil)
cs.closeTransportStream(err)
if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
}
if err == io.EOF {
- if cs.s.StatusCode() == codes.OK {
- cs.finish(err)
- return nil
+ if se := cs.s.Status().Err(); se != nil {
+ return se
}
- return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
+ cs.finish(err)
+ return nil
}
return toRPCErr(err)
}
@@ -423,11 +421,11 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
cs.closeTransportStream(err)
}
if err == io.EOF {
- if cs.s.StatusCode() == codes.OK {
- // Returns io.EOF to indicate the end of the stream.
- return
+ if statusErr := cs.s.Status().Err(); statusErr != nil {
+ return statusErr
}
- return Errorf(cs.s.StatusCode(), "%s", cs.s.StatusDesc())
+ // Returns io.EOF to indicate the end of the stream.
+ return
}
return toRPCErr(err)
}
@@ -519,8 +517,6 @@ type serverStream struct {
dc Decompressor
cbuf *bytes.Buffer
maxMsgSize int
- statusCode codes.Code
- statusDesc string
trInfo *traceInfo
statsHandler stats.Handler
diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/control.go
index 2586cba..8d29aee 100644
--- a/vendor/google.golang.org/grpc/transport/control.go
+++ b/vendor/google.golang.org/grpc/transport/control.go
@@ -35,7 +35,9 @@ package transport
import (
"fmt"
+ "math"
"sync"
+ "time"
"golang.org/x/net/http2"
)
@@ -44,8 +46,18 @@ const (
// The default value of flow control window size in HTTP2 spec.
defaultWindowSize = 65535
// The initial window size for flow control.
- initialWindowSize = defaultWindowSize // for an RPC
- initialConnWindowSize = defaultWindowSize * 16 // for a connection
+ initialWindowSize = defaultWindowSize // for an RPC
+ initialConnWindowSize = defaultWindowSize * 16 // for a connection
+ infinity = time.Duration(math.MaxInt64)
+ defaultClientKeepaliveTime = infinity
+ defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
+ defaultMaxStreamsClient = 100
+ defaultMaxConnectionIdle = infinity
+ defaultMaxConnectionAge = infinity
+ defaultMaxConnectionAgeGrace = infinity
+ defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
+ defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
+ defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
)
// The following defines various control items which could flow through
@@ -73,6 +85,8 @@ type resetStream struct {
func (*resetStream) item() {}
type goAway struct {
+ code http2.ErrCode
+ debugData []byte
}
func (*goAway) item() {}
diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/transport/handler_server.go
index 10b6dc0..28c9ce0 100644
--- a/vendor/google.golang.org/grpc/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/transport/handler_server.go
@@ -53,6 +53,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
+ "google.golang.org/grpc/status"
)
// NewServerHandlerTransport returns a ServerTransport handling gRPC
@@ -182,7 +183,7 @@ func (ht *serverHandlerTransport) do(fn func()) error {
}
}
-func (ht *serverHandlerTransport) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
+func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
err := ht.do(func() {
ht.writeCommonHeaders(s)
@@ -192,10 +193,13 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, statusCode codes.Code,
ht.rw.(http.Flusher).Flush()
h := ht.rw.Header()
- h.Set("Grpc-Status", fmt.Sprintf("%d", statusCode))
- if statusDesc != "" {
- h.Set("Grpc-Message", encodeGrpcMessage(statusDesc))
+ h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
+ if m := st.Message(); m != "" {
+ h.Set("Grpc-Message", encodeGrpcMessage(m))
}
+
+ // TODO: Support Grpc-Status-Details-Bin
+
if md := s.Trailer(); len(md) > 0 {
for k, vv := range md {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
@@ -234,6 +238,7 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
// and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
h.Add("Trailer", "Grpc-Status")
h.Add("Trailer", "Grpc-Message")
+ // TODO: Support Grpc-Status-Details-Bin
if s.sendCompress != "" {
h.Set("Grpc-Encoding", s.sendCompress)
@@ -314,7 +319,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
if req.TLS != nil {
pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
}
- ctx = metadata.NewContext(ctx, ht.headerMD)
+ ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
ctx = peer.NewContext(ctx, pr)
s.ctx = newContextWithStream(ctx, s)
s.dec = &recvBufferReader{ctx: s.ctx, recv: s.buf}
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go
index 892f8ba..5fc6b75 100644
--- a/vendor/google.golang.org/grpc/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/transport/http2_client.go
@@ -35,12 +35,12 @@ package transport
import (
"bytes"
- "fmt"
"io"
"math"
"net"
"strings"
"sync"
+ "sync/atomic"
"time"
"golang.org/x/net/context"
@@ -49,9 +49,11 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
+ "google.golang.org/grpc/status"
)
// http2Client implements the ClientTransport interface with HTTP2.
@@ -80,6 +82,8 @@ type http2Client struct {
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
+ // awakenKeepalive is used to wake up keepalive when after it has gone dormant.
+ awakenKeepalive chan struct{}
framer *framer
hBuf *bytes.Buffer // the buffer for HPACK encoding
@@ -99,6 +103,11 @@ type http2Client struct {
creds []credentials.PerRPCCredentials
+ // Boolean to keep track of reading activity on transport.
+ // 1 is true and 0 is false.
+ activity uint32 // Accessed atomically.
+ kp keepalive.ClientParameters
+
statsHandler stats.Handler
mu sync.Mutex // guard the following variables
@@ -178,15 +187,19 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
return nil, connectionErrorf(temp, err, "transport: %v", err)
}
}
- ua := primaryUA
- if opts.UserAgent != "" {
- ua = opts.UserAgent + " " + ua
+ kp := opts.KeepaliveParams
+ // Validate keepalive parameters.
+ if kp.Time == 0 {
+ kp.Time = defaultClientKeepaliveTime
+ }
+ if kp.Timeout == 0 {
+ kp.Timeout = defaultClientKeepaliveTimeout
}
var buf bytes.Buffer
t := &http2Client{
ctx: ctx,
target: addr.Addr,
- userAgent: ua,
+ userAgent: opts.UserAgent,
md: addr.Metadata,
conn: conn,
remoteAddr: conn.RemoteAddr(),
@@ -198,6 +211,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
shutdownChan: make(chan struct{}),
errorChan: make(chan struct{}),
goAway: make(chan struct{}),
+ awakenKeepalive: make(chan struct{}, 1),
framer: newFramer(conn),
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
@@ -208,10 +222,15 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
state: reachable,
activeStreams: make(map[uint32]*Stream),
creds: opts.PerRPCCredentials,
- maxStreams: math.MaxInt32,
+ maxStreams: defaultMaxStreamsClient,
+ streamsQuota: newQuotaPool(defaultMaxStreamsClient),
streamSendQuota: defaultWindowSize,
+ kp: kp,
statsHandler: opts.StatsHandler,
}
+ // Make sure awakenKeepalive can't be written upon.
+ // keepalive routine will make it writable, if need be.
+ t.awakenKeepalive <- struct{}{}
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
@@ -256,6 +275,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
}
}
go t.controller()
+ if t.kp.Time != infinity {
+ go t.keepalive()
+ }
t.writableChan <- 0
return t, nil
}
@@ -289,7 +311,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
return s
}
-// NewStream creates a stream and register it into the transport as "active"
+// NewStream creates a stream and registers it into the transport as "active"
// streams.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
pr := &peer.Peer{
@@ -337,21 +359,18 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.mu.Unlock()
return nil, ErrConnClosing
}
- checkStreamsQuota := t.streamsQuota != nil
t.mu.Unlock()
- if checkStreamsQuota {
- sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
- if err != nil {
- return nil, err
- }
- // Returns the quota balance back.
- if sq > 1 {
- t.streamsQuota.add(sq - 1)
- }
+ sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
+ if err != nil {
+ return nil, err
+ }
+ // Returns the quota balance back.
+ if sq > 1 {
+ t.streamsQuota.add(sq - 1)
}
if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
// Return the quota back now because there is no stream returned to the caller.
- if _, ok := err.(StreamError); ok && checkStreamsQuota {
+ if _, ok := err.(StreamError); ok {
t.streamsQuota.add(1)
}
return nil, err
@@ -359,9 +378,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.mu.Lock()
if t.state == draining {
t.mu.Unlock()
- if checkStreamsQuota {
- t.streamsQuota.add(1)
- }
+ t.streamsQuota.add(1)
// Need to make t writable again so that the rpc in flight can still proceed.
t.writableChan <- 0
return nil, ErrStreamDrain
@@ -373,17 +390,17 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
s := t.newStream(ctx, callHdr)
s.clientStatsCtx = userCtx
t.activeStreams[s.id] = s
-
- // This stream is not counted when applySetings(...) initialize t.streamsQuota.
- // Reset t.streamsQuota to the right value.
- var reset bool
- if !checkStreamsQuota && t.streamsQuota != nil {
- reset = true
+ // If the number of active streams change from 0 to 1, then check if keepalive
+ // has gone dormant. If so, wake it up.
+ if len(t.activeStreams) == 1 {
+ select {
+ case t.awakenKeepalive <- struct{}{}:
+ t.framer.writePing(false, false, [8]byte{})
+ default:
+ }
}
+
t.mu.Unlock()
- if reset {
- t.streamsQuota.add(-1)
- }
// HPACK encodes various headers. Note that once WriteField(...) is
// called, the corresponding headers/continuation frame has to be sent
@@ -415,7 +432,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
hasMD bool
endHeaders bool
)
- if md, ok := metadata.FromContext(ctx); ok {
+ if md, ok := metadata.FromOutgoingContext(ctx); ok {
hasMD = true
for k, v := range md {
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
@@ -491,15 +508,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
// CloseStream clears the footprint of a stream when the stream is not needed any more.
// This must not be executed in reader's goroutine.
func (t *http2Client) CloseStream(s *Stream, err error) {
- var updateStreams bool
t.mu.Lock()
if t.activeStreams == nil {
t.mu.Unlock()
return
}
- if t.streamsQuota != nil {
- updateStreams = true
- }
delete(t.activeStreams, s.id)
if t.state == draining && len(t.activeStreams) == 0 {
// The transport is draining and s is the last live stream on t.
@@ -508,10 +521,27 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
return
}
t.mu.Unlock()
- if updateStreams {
- t.streamsQuota.add(1)
- }
+ // rstStream is true in case the stream is being closed at the client-side
+ // and the server needs to be intimated about it by sending a RST_STREAM
+ // frame.
+ // To make sure this frame is written to the wire before the headers of the
+ // next stream waiting for streamsQuota, we add to streamsQuota pool only
+ // after having acquired the writableChan to send RST_STREAM out (look at
+ // the controller() routine).
+ var rstStream bool
+ var rstError http2.ErrCode
+ defer func() {
+ // In case, the client doesn't have to send RST_STREAM to server
+ // we can safely add back to streamsQuota pool now.
+ if !rstStream {
+ t.streamsQuota.add(1)
+ return
+ }
+ t.controlBuf.put(&resetStream{s.id, rstError})
+ }()
s.mu.Lock()
+ rstStream = s.rstStream
+ rstError = s.rstError
if q := s.fc.resetPendingData(); q > 0 {
if n := t.fc.onRead(q); n > 0 {
t.controlBuf.put(&windowUpdate{0, n})
@@ -527,8 +557,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
}
s.state = streamDone
s.mu.Unlock()
- if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded {
- t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
+ if _, ok := err.(StreamError); ok {
+ rstStream = true
+ rstError = http2.ErrCodeCancel
}
}
@@ -742,7 +773,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
}
func (t *http2Client) handleData(f *http2.DataFrame) {
- size := len(f.Data())
+ size := f.Header().Length
if err := t.fc.onData(uint32(size)); err != nil {
t.notifyError(connectionErrorf(true, err, "%v", err))
return
@@ -756,6 +787,11 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
return
}
if size > 0 {
+ if f.Header().Flags.Has(http2.FlagDataPadded) {
+ if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
+ t.controlBuf.put(&windowUpdate{0, w})
+ }
+ }
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
@@ -766,22 +802,27 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
return
}
if err := s.fc.onData(uint32(size)); err != nil {
- s.state = streamDone
- s.statusCode = codes.Internal
- s.statusDesc = err.Error()
- close(s.done)
+ s.rstStream = true
+ s.rstError = http2.ErrCodeFlowControl
+ s.finish(status.New(codes.Internal, err.Error()))
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
- t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return
}
+ if f.Header().Flags.Has(http2.FlagDataPadded) {
+ if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
+ t.controlBuf.put(&windowUpdate{s.id, w})
+ }
+ }
s.mu.Unlock()
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
- data := make([]byte, size)
- copy(data, f.Data())
- s.write(recvMsg{data: data})
+ if len(f.Data()) > 0 {
+ data := make([]byte, len(f.Data()))
+ copy(data, f.Data())
+ s.write(recvMsg{data: data})
+ }
}
// The server has closed the stream without sending trailers. Record that
// the read direction is closed, and set the status appropriately.
@@ -791,10 +832,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
s.mu.Unlock()
return
}
- s.state = streamDone
- s.statusCode = codes.Internal
- s.statusDesc = "server closed the stream without sending trailers"
- close(s.done)
+ s.finish(status.New(codes.Internal, "server closed the stream without sending trailers"))
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
}
@@ -810,18 +848,16 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
s.mu.Unlock()
return
}
- s.state = streamDone
if !s.headerDone {
close(s.headerChan)
s.headerDone = true
}
- s.statusCode, ok = http2ErrConvTab[http2.ErrCode(f.ErrCode)]
+ statusCode, ok := http2ErrConvTab[http2.ErrCode(f.ErrCode)]
if !ok {
grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode)
- s.statusCode = codes.Unknown
+ statusCode = codes.Unknown
}
- s.statusDesc = fmt.Sprintf("stream terminated by RST_STREAM with error code: %d", f.ErrCode)
- close(s.done)
+ s.finish(status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %d", f.ErrCode))
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
}
@@ -849,6 +885,9 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
}
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
+ if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
+ grpclog.Printf("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
+ }
t.mu.Lock()
if t.state == reachable || t.state == draining {
if f.LastStreamID > 0 && f.LastStreamID%2 != 1 {
@@ -897,18 +936,17 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
var state decodeState
for _, hf := range frame.Fields {
- state.processHeaderField(hf)
- }
- if state.err != nil {
- s.mu.Lock()
- if !s.headerDone {
- close(s.headerChan)
- s.headerDone = true
+ if err := state.processHeaderField(hf); err != nil {
+ s.mu.Lock()
+ if !s.headerDone {
+ close(s.headerChan)
+ s.headerDone = true
+ }
+ s.mu.Unlock()
+ s.write(recvMsg{err: err})
+ // Something wrong. Stops reading even when there is remaining.
+ return
}
- s.mu.Unlock()
- s.write(recvMsg{err: state.err})
- // Something wrong. Stops reading even when there is remaining.
- return
}
endStream := frame.StreamEnded()
@@ -951,10 +989,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
if len(state.mdata) > 0 {
s.trailer = state.mdata
}
- s.statusCode = state.statusCode
- s.statusDesc = state.statusDesc
- close(s.done)
- s.state = streamDone
+ s.finish(state.status())
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
}
@@ -982,6 +1017,7 @@ func (t *http2Client) reader() {
t.notifyError(err)
return
}
+ atomic.CompareAndSwapUint32(&t.activity, 0, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.notifyError(err)
@@ -992,6 +1028,7 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport.
for {
frame, err := t.framer.readFrame()
+ atomic.CompareAndSwapUint32(&t.activity, 0, 1)
if err != nil {
// Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response
@@ -1043,16 +1080,10 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
s.Val = math.MaxInt32
}
t.mu.Lock()
- reset := t.streamsQuota != nil
- if !reset {
- t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams))
- }
ms := t.maxStreams
t.maxStreams = int(s.Val)
t.mu.Unlock()
- if reset {
- t.streamsQuota.add(int(s.Val) - ms)
- }
+ t.streamsQuota.add(int(s.Val) - ms)
case http2.SettingInitialWindowSize:
t.mu.Lock()
for _, stream := range t.activeStreams {
@@ -1085,6 +1116,12 @@ func (t *http2Client) controller() {
t.framer.writeSettings(true, i.ss...)
}
case *resetStream:
+ // If the server needs to be to intimated about stream closing,
+ // then we need to make sure the RST_STREAM frame is written to
+ // the wire before the headers of the next stream waiting on
+ // streamQuota. We ensure this by adding to the streamsQuota pool
+ // only after having acquired the writableChan to send RST_STREAM.
+ t.streamsQuota.add(1)
t.framer.writeRSTStream(true, i.streamID, i.code)
case *flushIO:
t.framer.flushWrite()
@@ -1104,6 +1141,61 @@ func (t *http2Client) controller() {
}
}
+// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
+func (t *http2Client) keepalive() {
+ p := &ping{data: [8]byte{}}
+ timer := time.NewTimer(t.kp.Time)
+ for {
+ select {
+ case <-timer.C:
+ if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
+ timer.Reset(t.kp.Time)
+ continue
+ }
+ // Check if keepalive should go dormant.
+ t.mu.Lock()
+ if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
+ // Make awakenKeepalive writable.
+ <-t.awakenKeepalive
+ t.mu.Unlock()
+ select {
+ case <-t.awakenKeepalive:
+ // If the control gets here a ping has been sent
+ // need to reset the timer with keepalive.Timeout.
+ case <-t.shutdownChan:
+ return
+ }
+ } else {
+ t.mu.Unlock()
+ // Send ping.
+ t.controlBuf.put(p)
+ }
+
+ // By the time control gets here a ping has been sent one way or the other.
+ timer.Reset(t.kp.Timeout)
+ select {
+ case <-timer.C:
+ if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
+ timer.Reset(t.kp.Time)
+ continue
+ }
+ t.Close()
+ return
+ case <-t.shutdownChan:
+ if !timer.Stop() {
+ <-timer.C
+ }
+ return
+ }
+ case <-t.shutdownChan:
+ if !timer.Stop() {
+ <-timer.C
+ }
+ return
+ }
+ }
+}
+
func (t *http2Client) Error() <-chan struct{} {
return t.errorChan
}
diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go
index a095dd0..31fefc7 100644
--- a/vendor/google.golang.org/grpc/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/transport/http2_server.go
@@ -38,19 +38,25 @@ import (
"errors"
"io"
"math"
+ "math/rand"
"net"
"strconv"
"sync"
+ "sync/atomic"
+ "time"
+ "github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
+ "google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
)
@@ -90,11 +96,33 @@ type http2Server struct {
stats stats.Handler
+ // Flag to keep track of reading activity on transport.
+ // 1 is true and 0 is false.
+ activity uint32 // Accessed atomically.
+ // Keepalive and max-age parameters for the server.
+ kp keepalive.ServerParameters
+
+ // Keepalive enforcement policy.
+ kep keepalive.EnforcementPolicy
+ // The time instance last ping was received.
+ lastPingAt time.Time
+ // Number of times the client has violated keepalive ping policy so far.
+ pingStrikes uint8
+ // Flag to signify that number of ping strikes should be reset to 0.
+ // This is set whenever data or header frames are sent.
+ // 1 means yes.
+ resetPingStrikes uint32 // Accessed atomically.
+
mu sync.Mutex // guard the following
state transportState
activeStreams map[uint32]*Stream
// the per-stream outbound flow control window size set by the peer.
streamSendQuota uint32
+ // idle is the time instant when the connection went idle.
+ // This is either the begining of the connection or when the number of
+ // RPCs go down to 0.
+ // When the connection is busy, this value is set to 0.
+ idle time.Time
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -128,6 +156,28 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
+ kp := config.KeepaliveParams
+ if kp.MaxConnectionIdle == 0 {
+ kp.MaxConnectionIdle = defaultMaxConnectionIdle
+ }
+ if kp.MaxConnectionAge == 0 {
+ kp.MaxConnectionAge = defaultMaxConnectionAge
+ }
+ // Add a jitter to MaxConnectionAge.
+ kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
+ if kp.MaxConnectionAgeGrace == 0 {
+ kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
+ }
+ if kp.Time == 0 {
+ kp.Time = defaultServerKeepaliveTime
+ }
+ if kp.Timeout == 0 {
+ kp.Timeout = defaultServerKeepaliveTimeout
+ }
+ kep := config.KeepalivePolicy
+ if kep.MinTime == 0 {
+ kep.MinTime = defaultKeepalivePolicyMinTime
+ }
var buf bytes.Buffer
t := &http2Server{
ctx: context.Background(),
@@ -149,6 +199,9 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
activeStreams: make(map[uint32]*Stream),
streamSendQuota: defaultWindowSize,
stats: config.StatsHandler,
+ kp: kp,
+ idle: time.Now(),
+ kep: kep,
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
@@ -159,6 +212,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
t.stats.HandleConn(t.ctx, connBegin)
}
go t.controller()
+ go t.keepalive()
t.writableChan <- 0
return t, nil
}
@@ -175,13 +229,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
var state decodeState
for _, hf := range frame.Fields {
- state.processHeaderField(hf)
- }
- if err := state.err; err != nil {
- if se, ok := err.(StreamError); ok {
- t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
+ if err := state.processHeaderField(hf); err != nil {
+ if se, ok := err.(StreamError); ok {
+ t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
+ }
+ return
}
- return
}
if frame.StreamEnded() {
@@ -208,7 +261,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.ctx = newContextWithStream(s.ctx, s)
// Attach the received metadata to the context.
if len(state.mdata) > 0 {
- s.ctx = metadata.NewContext(s.ctx, state.mdata)
+ s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
}
s.dec = &recvBufferReader{
@@ -248,6 +301,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.maxStreamID = s.id
s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
t.activeStreams[s.id] = s
+ if len(t.activeStreams) == 1 {
+ t.idle = time.Time{}
+ }
t.mu.Unlock()
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
@@ -295,6 +351,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
t.Close()
return
}
+ atomic.StoreUint32(&t.activity, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
@@ -305,6 +362,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
for {
frame, err := t.framer.readFrame()
+ atomic.StoreUint32(&t.activity, 1)
if err != nil {
if se, ok := err.(http2.StreamError); ok {
t.mu.Lock()
@@ -381,7 +439,7 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) {
}
func (t *http2Server) handleData(f *http2.DataFrame) {
- size := len(f.Data())
+ size := f.Header().Length
if err := t.fc.onData(uint32(size)); err != nil {
grpclog.Printf("transport: http2Server %v", err)
t.Close()
@@ -396,6 +454,11 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
return
}
if size > 0 {
+ if f.Header().Flags.Has(http2.FlagDataPadded) {
+ if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
+ t.controlBuf.put(&windowUpdate{0, w})
+ }
+ }
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
@@ -411,13 +474,20 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return
}
+ if f.Header().Flags.Has(http2.FlagDataPadded) {
+ if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
+ t.controlBuf.put(&windowUpdate{s.id, w})
+ }
+ }
s.mu.Unlock()
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
- data := make([]byte, size)
- copy(data, f.Data())
- s.write(recvMsg{data: data})
+ if len(f.Data()) > 0 {
+ data := make([]byte, len(f.Data()))
+ copy(data, f.Data())
+ s.write(recvMsg{data: data})
+ }
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
// Received the end of stream from the client.
@@ -451,6 +521,11 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
t.controlBuf.put(&settings{ack: true, ss: ss})
}
+const (
+ maxPingStrikes = 2
+ defaultPingTimeout = 2 * time.Hour
+)
+
func (t *http2Server) handlePing(f *http2.PingFrame) {
if f.IsAck() { // Do nothing.
return
@@ -458,6 +533,38 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
pingAck := &ping{ack: true}
copy(pingAck.data[:], f.Data[:])
t.controlBuf.put(pingAck)
+
+ now := time.Now()
+ defer func() {
+ t.lastPingAt = now
+ }()
+ // A reset ping strikes means that we don't need to check for policy
+ // violation for this ping and the pingStrikes counter should be set
+ // to 0.
+ if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
+ t.pingStrikes = 0
+ return
+ }
+ t.mu.Lock()
+ ns := len(t.activeStreams)
+ t.mu.Unlock()
+ if ns < 1 && !t.kep.PermitWithoutStream {
+ // Keepalive shouldn't be active thus, this new ping should
+ // have come after atleast defaultPingTimeout.
+ if t.lastPingAt.Add(defaultPingTimeout).After(now) {
+ t.pingStrikes++
+ }
+ } else {
+ // Check if keepalive policy is respected.
+ if t.lastPingAt.Add(t.kep.MinTime).After(now) {
+ t.pingStrikes++
+ }
+ }
+
+ if t.pingStrikes > maxPingStrikes {
+ // Send goaway and close the connection.
+ t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings")})
+ }
}
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
@@ -476,6 +583,13 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e
first := true
endHeaders := false
var err error
+ defer func() {
+ if err == nil {
+ // Reset ping strikes when seding headers since that might cause the
+ // peer to send ping.
+ atomic.StoreUint32(&t.resetPingStrikes, 1)
+ }
+ }()
// Sends the headers in a single batch.
for !endHeaders {
size := t.hBuf.Len()
@@ -557,7 +671,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
// There is no further I/O operations being able to perform on this stream.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
-func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
+func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
var headersSent, hasHeader bool
s.mu.Lock()
if s.state == streamDone {
@@ -588,9 +702,24 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
t.hEnc.WriteField(
hpack.HeaderField{
Name: "grpc-status",
- Value: strconv.Itoa(int(statusCode)),
+ Value: strconv.Itoa(int(st.Code())),
})
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(statusDesc)})
+ t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
+
+ if p := st.Proto(); p != nil && len(p.Details) > 0 {
+ stBytes, err := proto.Marshal(p)
+ if err != nil {
+ // TODO: return error instead, when callers are able to handle it.
+ panic(err)
+ }
+
+ for k, v := range metadata.New(map[string]string{"grpc-status-details-bin": (string)(stBytes)}) {
+ for _, entry := range v {
+ t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
+ }
+ }
+ }
+
// Attach the trailer metadata.
for k, v := range s.trailer {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
@@ -619,7 +748,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
-func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
+func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
// TODO(zhaoq): Support multi-writers for a single stream.
var writeHeaderFrame bool
s.mu.Lock()
@@ -634,6 +763,13 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
if writeHeaderFrame {
t.WriteHeader(s, nil)
}
+ defer func() {
+ if err == nil {
+ // Reset ping strikes when sending data since this might cause
+ // the peer to send ping.
+ atomic.StoreUint32(&t.resetPingStrikes, 1)
+ }
+ }()
r := bytes.NewBuffer(data)
for {
if r.Len() == 0 {
@@ -723,6 +859,91 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
}
}
+// keepalive running in a separate goroutine does the following:
+// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
+// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
+// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
+// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-resposive connection
+// after an additional duration of keepalive.Timeout.
+func (t *http2Server) keepalive() {
+ p := &ping{}
+ var pingSent bool
+ maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
+ maxAge := time.NewTimer(t.kp.MaxConnectionAge)
+ keepalive := time.NewTimer(t.kp.Time)
+ // NOTE: All exit paths of this function should reset their
+ // respecitve timers. A failure to do so will cause the
+ // following clean-up to deadlock and eventually leak.
+ defer func() {
+ if !maxIdle.Stop() {
+ <-maxIdle.C
+ }
+ if !maxAge.Stop() {
+ <-maxAge.C
+ }
+ if !keepalive.Stop() {
+ <-keepalive.C
+ }
+ }()
+ for {
+ select {
+ case <-maxIdle.C:
+ t.mu.Lock()
+ idle := t.idle
+ if idle.IsZero() { // The connection is non-idle.
+ t.mu.Unlock()
+ maxIdle.Reset(t.kp.MaxConnectionIdle)
+ continue
+ }
+ val := t.kp.MaxConnectionIdle - time.Since(idle)
+ if val <= 0 {
+ // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
+ // Gracefully close the connection.
+ t.state = draining
+ t.mu.Unlock()
+ t.Drain()
+ // Reseting the timer so that the clean-up doesn't deadlock.
+ maxIdle.Reset(infinity)
+ return
+ }
+ t.mu.Unlock()
+ maxIdle.Reset(val)
+ case <-maxAge.C:
+ t.mu.Lock()
+ t.state = draining
+ t.mu.Unlock()
+ t.Drain()
+ maxAge.Reset(t.kp.MaxConnectionAgeGrace)
+ select {
+ case <-maxAge.C:
+ // Close the connection after grace period.
+ t.Close()
+ // Reseting the timer so that the clean-up doesn't deadlock.
+ maxAge.Reset(infinity)
+ case <-t.shutdownChan:
+ }
+ return
+ case <-keepalive.C:
+ if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
+ pingSent = false
+ keepalive.Reset(t.kp.Time)
+ continue
+ }
+ if pingSent {
+ t.Close()
+ // Reseting the timer so that the clean-up doesn't deadlock.
+ keepalive.Reset(infinity)
+ return
+ }
+ pingSent = true
+ t.controlBuf.put(p)
+ keepalive.Reset(t.kp.Timeout)
+ case <-t.shutdownChan:
+ return
+ }
+ }
+}
+
// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Server) controller() {
@@ -754,7 +975,10 @@ func (t *http2Server) controller() {
sid := t.maxStreamID
t.state = draining
t.mu.Unlock()
- t.framer.writeGoAway(true, sid, http2.ErrCodeNo, nil)
+ t.framer.writeGoAway(true, sid, i.code, i.debugData)
+ if i.code == http2.ErrCodeEnhanceYourCalm {
+ t.Close()
+ }
case *flushIO:
t.framer.flushWrite()
case *ping:
@@ -804,6 +1028,9 @@ func (t *http2Server) Close() (err error) {
func (t *http2Server) closeStream(s *Stream) {
t.mu.Lock()
delete(t.activeStreams, s.id)
+ if len(t.activeStreams) == 0 {
+ t.idle = time.Now()
+ }
if t.state == draining && len(t.activeStreams) == 0 {
defer t.Close()
}
@@ -831,5 +1058,17 @@ func (t *http2Server) RemoteAddr() net.Addr {
}
func (t *http2Server) Drain() {
- t.controlBuf.put(&goAway{})
+ t.controlBuf.put(&goAway{code: http2.ErrCodeNo})
+}
+
+var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
+
+func getJitter(v time.Duration) time.Duration {
+ if v == infinity {
+ return 0
+ }
+ // Generate a jitter between +/- 10% of the value.
+ r := int64(v / 10)
+ j := rgen.Int63n(2*r) - r
+ return time.Duration(j)
}
diff --git a/vendor/google.golang.org/grpc/transport/http_util.go b/vendor/google.golang.org/grpc/transport/http_util.go
index a3c68d4..6adef77 100644
--- a/vendor/google.golang.org/grpc/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/transport/http_util.go
@@ -44,16 +44,17 @@ import (
"sync/atomic"
"time"
+ "github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
+ spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/status"
)
const (
- // The primary user agent
- primaryUA = "grpc-go/1.0"
// http2MaxFrameLen specifies the max length of a HTTP2 frame.
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues
@@ -92,13 +93,15 @@ var (
// Records the states during HPACK decoding. Must be reset once the
// decoding of the entire headers are finished.
type decodeState struct {
- err error // first error encountered decoding
-
encoding string
- // statusCode caches the stream status received from the trailer
- // the server sent. Client side only.
- statusCode codes.Code
- statusDesc string
+ // statusGen caches the stream status received from the trailer the server
+ // sent. Client side only. Do not access directly. After all trailers are
+ // parsed, use the status method to retrieve the status.
+ statusGen *status.Status
+ // rawStatusCode and rawStatusMsg are set from the raw trailer fields and are not
+ // intended for direct access outside of parsing.
+ rawStatusCode int32
+ rawStatusMsg string
// Server side only fields.
timeoutSet bool
timeout time.Duration
@@ -121,6 +124,7 @@ func isReservedHeader(hdr string) bool {
"grpc-message",
"grpc-status",
"grpc-timeout",
+ "grpc-status-details-bin",
"te":
return true
default:
@@ -139,12 +143,6 @@ func isWhitelistedPseudoHeader(hdr string) bool {
}
}
-func (d *decodeState) setErr(err error) {
- if d.err == nil {
- d.err = err
- }
-}
-
func validContentType(t string) bool {
e := "application/grpc"
if !strings.HasPrefix(t, e) {
@@ -158,56 +156,62 @@ func validContentType(t string) bool {
return true
}
-func (d *decodeState) processHeaderField(f hpack.HeaderField) {
+func (d *decodeState) status() *status.Status {
+ if d.statusGen == nil {
+ // No status-details were provided; generate status using code/msg.
+ d.statusGen = status.New(codes.Code(d.rawStatusCode), d.rawStatusMsg)
+ }
+ return d.statusGen
+}
+
+func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
switch f.Name {
case "content-type":
if !validContentType(f.Value) {
- d.setErr(streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
- return
+ return streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value)
}
case "grpc-encoding":
d.encoding = f.Value
case "grpc-status":
code, err := strconv.Atoi(f.Value)
if err != nil {
- d.setErr(streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
- return
+ return streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err)
}
- d.statusCode = codes.Code(code)
+ d.rawStatusCode = int32(code)
case "grpc-message":
- d.statusDesc = decodeGrpcMessage(f.Value)
+ d.rawStatusMsg = decodeGrpcMessage(f.Value)
+ case "grpc-status-details-bin":
+ _, v, err := metadata.DecodeKeyValue("grpc-status-details-bin", f.Value)
+ if err != nil {
+ return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
+ }
+ s := &spb.Status{}
+ if err := proto.Unmarshal([]byte(v), s); err != nil {
+ return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
+ }
+ d.statusGen = status.FromProto(s)
case "grpc-timeout":
d.timeoutSet = true
var err error
- d.timeout, err = decodeTimeout(f.Value)
- if err != nil {
- d.setErr(streamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
- return
+ if d.timeout, err = decodeTimeout(f.Value); err != nil {
+ return streamErrorf(codes.Internal, "transport: malformed time-out: %v", err)
}
case ":path":
d.method = f.Value
default:
if !isReservedHeader(f.Name) || isWhitelistedPseudoHeader(f.Name) {
- if f.Name == "user-agent" {
- i := strings.LastIndex(f.Value, " ")
- if i == -1 {
- // There is no application user agent string being set.
- return
- }
- // Extract the application user agent string.
- f.Value = f.Value[:i]
- }
if d.mdata == nil {
d.mdata = make(map[string][]string)
}
k, v, err := metadata.DecodeKeyValue(f.Name, f.Value)
if err != nil {
grpclog.Printf("Failed to decode (%q, %q): %v", f.Name, f.Value, err)
- return
+ return nil
}
d.mdata[k] = append(d.mdata[k], v)
}
}
+ return nil
}
type timeoutUnit uint8
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go
index caee54a..b62f702 100644
--- a/vendor/google.golang.org/grpc/transport/transport.go
+++ b/vendor/google.golang.org/grpc/transport/transport.go
@@ -45,10 +45,13 @@ import (
"sync"
"golang.org/x/net/context"
+ "golang.org/x/net/http2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
+ "google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
)
@@ -210,9 +213,13 @@ type Stream struct {
// true iff headerChan is closed. Used to avoid closing headerChan
// multiple times.
headerDone bool
- // the status received from the server.
- statusCode codes.Code
- statusDesc string
+ // the status error received from the server.
+ status *status.Status
+ // rstStream indicates whether a RST_STREAM frame needs to be sent
+ // to the server to signify that this stream is closing.
+ rstStream bool
+ // rstError is the error that needs to be sent along with the RST_STREAM frame.
+ rstError http2.ErrCode
}
// RecvCompress returns the compression algorithm applied to the inbound
@@ -277,14 +284,9 @@ func (s *Stream) Method() string {
return s.method
}
-// StatusCode returns statusCode received from the server.
-func (s *Stream) StatusCode() codes.Code {
- return s.statusCode
-}
-
-// StatusDesc returns statusDesc received from the server.
-func (s *Stream) StatusDesc() string {
- return s.statusDesc
+// Status returns the status received from the server.
+func (s *Stream) Status() *status.Status {
+ return s.status
}
// SetHeader sets the header metadata. This can be called multiple times.
@@ -331,6 +333,20 @@ func (s *Stream) Read(p []byte) (n int, err error) {
return
}
+// finish sets the stream's state and status, and closes the done channel.
+// s.mu must be held by the caller. st must always be non-nil.
+func (s *Stream) finish(st *status.Status) {
+ s.status = st
+ s.state = streamDone
+ close(s.done)
+}
+
+// GoString is implemented by Stream so context.String() won't
+// race when printing %#v.
+func (s *Stream) GoString() string {
+ return fmt.Sprintf("<stream: %p, %v>", s, s.method)
+}
+
// The key to save transport.Stream in the context.
type streamKey struct{}
@@ -358,10 +374,12 @@ const (
// ServerConfig consists of all the configurations to establish a server transport.
type ServerConfig struct {
- MaxStreams uint32
- AuthInfo credentials.AuthInfo
- InTapHandle tap.ServerInHandle
- StatsHandler stats.Handler
+ MaxStreams uint32
+ AuthInfo credentials.AuthInfo
+ InTapHandle tap.ServerInHandle
+ StatsHandler stats.Handler
+ KeepaliveParams keepalive.ServerParameters
+ KeepalivePolicy keepalive.EnforcementPolicy
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
@@ -385,6 +403,8 @@ type ConnectOptions struct {
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials
+ // KeepaliveParams stores the keepalive parameters.
+ KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
}
@@ -492,10 +512,9 @@ type ServerTransport interface {
// Write may not be called on all streams.
Write(s *Stream, data []byte, opts *Options) error
- // WriteStatus sends the status of a stream to the client.
- // WriteStatus is the final call made on a stream and always
- // occurs.
- WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error
+ // WriteStatus sends the status of a stream to the client. WriteStatus is
+ // the final call made on a stream and always occurs.
+ WriteStatus(s *Stream, st *status.Status) error
// Close tears down the transport. Once it is called, the transport
// should not be accessed any more. All the pending streams and their
@@ -561,6 +580,8 @@ var (
ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
)
+// TODO: See if we can replace StreamError with status package errors.
+
// StreamError is an error that only affects one stream within a connection.
type StreamError struct {
Code codes.Code
@@ -568,7 +589,7 @@ type StreamError struct {
}
func (e StreamError) Error() string {
- return fmt.Sprintf("stream error: code = %d desc = %q", e.Code, e.Desc)
+ return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
}
// ContextErr converts the error from context package into a StreamError.