aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc')
-rw-r--r--vendor/google.golang.org/grpc/balancer.go20
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go61
-rw-r--r--vendor/google.golang.org/grpc/credentials/credentials.go42
-rw-r--r--vendor/google.golang.org/grpc/metadata/metadata.go13
-rw-r--r--vendor/google.golang.org/grpc/server.go13
-rw-r--r--vendor/google.golang.org/grpc/stream.go4
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_client.go27
-rw-r--r--vendor/google.golang.org/grpc/transport/http_util.go5
-rw-r--r--vendor/google.golang.org/grpc/transport/transport.go15
9 files changed, 122 insertions, 78 deletions
diff --git a/vendor/google.golang.org/grpc/balancer.go b/vendor/google.golang.org/grpc/balancer.go
index 419e214..e217a20 100644
--- a/vendor/google.golang.org/grpc/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer.go
@@ -38,6 +38,7 @@ import (
"sync"
"golang.org/x/net/context"
+ "google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/naming"
)
@@ -52,6 +53,14 @@ type Address struct {
Metadata interface{}
}
+// BalancerConfig specifies the configurations for Balancer.
+type BalancerConfig struct {
+ // DialCreds is the transport credential the Balancer implementation can
+ // use to dial to a remote load balancer server. The Balancer implementations
+ // can ignore this if it does not need to talk to another party securely.
+ DialCreds credentials.TransportCredentials
+}
+
// BalancerGetOptions configures a Get call.
// This is the EXPERIMENTAL API and may be changed or extended in the future.
type BalancerGetOptions struct {
@@ -66,11 +75,11 @@ type Balancer interface {
// Start does the initialization work to bootstrap a Balancer. For example,
// this function may start the name resolution and watch the updates. It will
// be called when dialing.
- Start(target string) error
+ Start(target string, config BalancerConfig) error
// Up informs the Balancer that gRPC has a connection to the server at
// addr. It returns down which is called once the connection to addr gets
// lost or closed.
- // TODO: It is not clear how to construct and take advantage the meaningful error
+ // TODO: It is not clear how to construct and take advantage of the meaningful error
// parameter for down. Need realistic demands to guide.
Up(addr Address) (down func(error))
// Get gets the address of a server for the RPC corresponding to ctx.
@@ -205,7 +214,12 @@ func (rr *roundRobin) watchAddrUpdates() error {
return nil
}
-func (rr *roundRobin) Start(target string) error {
+func (rr *roundRobin) Start(target string, config BalancerConfig) error {
+ rr.mu.Lock()
+ defer rr.mu.Unlock()
+ if rr.done {
+ return ErrClientConnClosing
+ }
if rr.r == nil {
// If there is no name resolver installed, it is not needed to
// do name resolution. In this case, target is added into rr.addrs
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index a257f01..11dce44 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -270,31 +270,47 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if cc.dopts.bs == nil {
cc.dopts.bs = DefaultBackoffConfig
}
-
- var (
- ok bool
- addrs []Address
- )
- if cc.dopts.balancer == nil {
- // Connect to target directly if balancer is nil.
- addrs = append(addrs, Address{Addr: target})
+ creds := cc.dopts.copts.TransportCredentials
+ if creds != nil && creds.Info().ServerName != "" {
+ cc.authority = creds.Info().ServerName
} else {
- if err := cc.dopts.balancer.Start(target); err != nil {
- return nil, err
+ colonPos := strings.LastIndex(target, ":")
+ if colonPos == -1 {
+ colonPos = len(target)
}
- ch := cc.dopts.balancer.Notify()
- if ch == nil {
- // There is no name resolver installed.
+ cc.authority = target[:colonPos]
+ }
+ var ok bool
+ waitC := make(chan error, 1)
+ go func() {
+ var addrs []Address
+ if cc.dopts.balancer == nil {
+ // Connect to target directly if balancer is nil.
addrs = append(addrs, Address{Addr: target})
} else {
- addrs, ok = <-ch
- if !ok || len(addrs) == 0 {
- return nil, errNoAddr
+ var credsClone credentials.TransportCredentials
+ if creds != nil {
+ credsClone = creds.Clone()
+ }
+ config := BalancerConfig{
+ DialCreds: credsClone,
+ }
+ if err := cc.dopts.balancer.Start(target, config); err != nil {
+ waitC <- err
+ return
+ }
+ ch := cc.dopts.balancer.Notify()
+ if ch == nil {
+ // There is no name resolver installed.
+ addrs = append(addrs, Address{Addr: target})
+ } else {
+ addrs, ok = <-ch
+ if !ok || len(addrs) == 0 {
+ waitC <- errNoAddr
+ return
+ }
}
}
- }
- waitC := make(chan error, 1)
- go func() {
for _, a := range addrs {
if err := cc.resetAddrConn(a, false, nil); err != nil {
waitC <- err
@@ -322,11 +338,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if ok {
go cc.lbWatcher()
}
- colonPos := strings.LastIndex(target, ":")
- if colonPos == -1 {
- colonPos = len(target)
- }
- cc.authority = target[:colonPos]
return cc, nil
}
@@ -680,7 +691,7 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
return err
}
- grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr)
+ grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
ac.mu.Lock()
if ac.state == Shutdown {
// ac.tearDown(...) has been invoked.
diff --git a/vendor/google.golang.org/grpc/credentials/credentials.go b/vendor/google.golang.org/grpc/credentials/credentials.go
index 13be457..5555ef0 100644
--- a/vendor/google.golang.org/grpc/credentials/credentials.go
+++ b/vendor/google.golang.org/grpc/credentials/credentials.go
@@ -72,7 +72,7 @@ type PerRPCCredentials interface {
}
// ProtocolInfo provides information regarding the gRPC wire protocol version,
-// security protocol, security protocol version in use, etc.
+// security protocol, security protocol version in use, server name, etc.
type ProtocolInfo struct {
// ProtocolVersion is the gRPC wire protocol version.
ProtocolVersion string
@@ -80,6 +80,8 @@ type ProtocolInfo struct {
SecurityProtocol string
// SecurityVersion is the security protocol version.
SecurityVersion string
+ // ServerName is the user-configured server name.
+ ServerName string
}
// AuthInfo defines the common interface for the auth information the users are interested in.
@@ -107,6 +109,12 @@ type TransportCredentials interface {
ServerHandshake(net.Conn) (net.Conn, AuthInfo, error)
// Info provides the ProtocolInfo of this TransportCredentials.
Info() ProtocolInfo
+ // Clone makes a copy of this TransportCredentials.
+ Clone() TransportCredentials
+ // OverrideServerName overrides the server name used to verify the hostname on the returned certificates from the server.
+ // gRPC internals also use it to override the virtual hosting name if it is set.
+ // It must be called before dialing. Currently, this is only used by grpclb.
+ OverrideServerName(string) error
}
// TLSInfo contains the auth information for a TLS authenticated connection.
@@ -130,19 +138,10 @@ func (c tlsCreds) Info() ProtocolInfo {
return ProtocolInfo{
SecurityProtocol: "tls",
SecurityVersion: "1.2",
+ ServerName: c.config.ServerName,
}
}
-// GetRequestMetadata returns nil, nil since TLS credentials does not have
-// metadata.
-func (c *tlsCreds) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
- return nil, nil
-}
-
-func (c *tlsCreds) RequireTransportSecurity() bool {
- return true
-}
-
func (c *tlsCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (_ net.Conn, _ AuthInfo, err error) {
// use local cfg to avoid clobbering ServerName if using multiple endpoints
cfg := cloneTLSConfig(c.config)
@@ -179,6 +178,15 @@ func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error)
return conn, TLSInfo{conn.ConnectionState()}, nil
}
+func (c *tlsCreds) Clone() TransportCredentials {
+ return NewTLS(c.config)
+}
+
+func (c *tlsCreds) OverrideServerName(serverNameOverride string) error {
+ c.config.ServerName = serverNameOverride
+ return nil
+}
+
// NewTLS uses c to construct a TransportCredentials based on TLS.
func NewTLS(c *tls.Config) TransportCredentials {
tc := &tlsCreds{cloneTLSConfig(c)}
@@ -187,12 +195,16 @@ func NewTLS(c *tls.Config) TransportCredentials {
}
// NewClientTLSFromCert constructs a TLS from the input certificate for client.
-func NewClientTLSFromCert(cp *x509.CertPool, serverName string) TransportCredentials {
- return NewTLS(&tls.Config{ServerName: serverName, RootCAs: cp})
+// serverNameOverride is for testing only. If set to a non empty string,
+// it will override the virtual host name of authority (e.g. :authority header field) in requests.
+func NewClientTLSFromCert(cp *x509.CertPool, serverNameOverride string) TransportCredentials {
+ return NewTLS(&tls.Config{ServerName: serverNameOverride, RootCAs: cp})
}
// NewClientTLSFromFile constructs a TLS from the input certificate file for client.
-func NewClientTLSFromFile(certFile, serverName string) (TransportCredentials, error) {
+// serverNameOverride is for testing only. If set to a non empty string,
+// it will override the virtual host name of authority (e.g. :authority header field) in requests.
+func NewClientTLSFromFile(certFile, serverNameOverride string) (TransportCredentials, error) {
b, err := ioutil.ReadFile(certFile)
if err != nil {
return nil, err
@@ -201,7 +213,7 @@ func NewClientTLSFromFile(certFile, serverName string) (TransportCredentials, er
if !cp.AppendCertsFromPEM(b) {
return nil, fmt.Errorf("credentials: failed to append certificates")
}
- return NewTLS(&tls.Config{ServerName: serverName, RootCAs: cp}), nil
+ return NewTLS(&tls.Config{ServerName: serverNameOverride, RootCAs: cp}), nil
}
// NewServerTLSFromCert constructs a TLS from the input certificate for server.
diff --git a/vendor/google.golang.org/grpc/metadata/metadata.go b/vendor/google.golang.org/grpc/metadata/metadata.go
index 954c0f7..3c0ca7a 100644
--- a/vendor/google.golang.org/grpc/metadata/metadata.go
+++ b/vendor/google.golang.org/grpc/metadata/metadata.go
@@ -117,10 +117,17 @@ func (md MD) Len() int {
// Copy returns a copy of md.
func (md MD) Copy() MD {
+ return Join(md)
+}
+
+// Join joins any number of MDs into a single MD.
+// The order of values for each key is determined by the order in which
+// the MDs containing those values are presented to Join.
+func Join(mds ...MD) MD {
out := MD{}
- for k, v := range md {
- for _, i := range v {
- out[k] = append(out[k], i)
+ for _, md := range mds {
+ for k, v := range md {
+ out[k] = append(out[k], v...)
}
}
return out
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 2524dd2..debbd79 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -873,25 +873,28 @@ func SendHeader(ctx context.Context, md metadata.MD) error {
}
stream, ok := transport.StreamFromContext(ctx)
if !ok {
- return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx)
+ return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
t := stream.ServerTransport()
if t == nil {
grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
}
- return t.WriteHeader(stream, md)
+ if err := t.WriteHeader(stream, md); err != nil {
+ return toRPCErr(err)
+ }
+ return nil
}
// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
-// It may be called at most once from a unary RPC handler. The ctx is the RPC
-// handler's Context or one derived from it.
+// When called more than once, all the provided metadata will be merged.
+// The ctx is the RPC handler's Context or one derived from it.
func SetTrailer(ctx context.Context, md metadata.MD) error {
if md.Len() == 0 {
return nil
}
stream, ok := transport.StreamFromContext(ctx)
if !ok {
- return fmt.Errorf("grpc: failed to fetch the stream from the context %v", ctx)
+ return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
return stream.SetTrailer(md)
}
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index e1b4759..68d777b 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -414,8 +414,8 @@ type ServerStream interface {
// after SendProto. It fails if called multiple times or if
// called after SendProto.
SendHeader(metadata.MD) error
- // SetTrailer sets the trailer metadata which will be sent with the
- // RPC status.
+ // SetTrailer sets the trailer metadata which will be sent with the RPC status.
+ // When called more than once, all the provided metadata will be merged.
SetTrailer(metadata.MD)
Stream
}
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go
index 4892faa..3c18554 100644
--- a/vendor/google.golang.org/grpc/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/transport/http2_client.go
@@ -252,8 +252,10 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
}
- // Make a stream be able to cancel the pending operations by itself.
- s.ctx, s.cancel = context.WithCancel(ctx)
+ // The client side stream context should have exactly the same life cycle with the user provided context.
+ // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
+ // So we use the original context here instead of creating a copy.
+ s.ctx = ctx
s.dec = &recvBufferReader{
ctx: s.ctx,
goAway: s.goAway,
@@ -265,16 +267,6 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// NewStream creates a stream and register it into the transport as "active"
// streams.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
- // Record the timeout value on the context.
- var timeout time.Duration
- if dl, ok := ctx.Deadline(); ok {
- timeout = dl.Sub(time.Now())
- }
- select {
- case <-ctx.Done():
- return nil, ContextErr(ctx.Err())
- default:
- }
pr := &peer.Peer{
Addr: t.conn.RemoteAddr(),
}
@@ -381,9 +373,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if callHdr.SendCompress != "" {
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
}
- if timeout > 0 {
+ if dl, ok := ctx.Deadline(); ok {
+ // Send out timeout regardless its value. The server can detect timeout context by itself.
+ timeout := dl.Sub(time.Now())
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
}
+
for k, v := range authData {
// Capital header names are illegal in HTTP/2.
k = strings.ToLower(k)
@@ -852,6 +847,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
state.processHeaderField(hf)
}
if state.err != nil {
+ s.mu.Lock()
+ if !s.headerDone {
+ close(s.headerChan)
+ s.headerDone = true
+ }
+ s.mu.Unlock()
s.write(recvMsg{err: state.err})
// Something wrong. Stops reading even when there is remaining.
return
diff --git a/vendor/google.golang.org/grpc/transport/http_util.go b/vendor/google.golang.org/grpc/transport/http_util.go
index b024594..a3c68d4 100644
--- a/vendor/google.golang.org/grpc/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/transport/http_util.go
@@ -253,6 +253,9 @@ func div(d, r time.Duration) int64 {
// TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it.
func encodeTimeout(t time.Duration) string {
+ if t <= 0 {
+ return "0n"
+ }
if d := div(t, time.Nanosecond); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "n"
}
@@ -349,7 +352,7 @@ func decodeGrpcMessageUnchecked(msg string) string {
for i := 0; i < lenMsg; i++ {
c := msg[i]
if c == percentByte && i+2 < lenMsg {
- parsed, err := strconv.ParseInt(msg[i+1:i+3], 16, 8)
+ parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
if err != nil {
buf.WriteByte(c)
} else {
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go
index f4d8daf..3d6b6a6 100644
--- a/vendor/google.golang.org/grpc/transport/transport.go
+++ b/vendor/google.golang.org/grpc/transport/transport.go
@@ -39,7 +39,6 @@ package transport // import "google.golang.org/grpc/transport"
import (
"bytes"
- "errors"
"fmt"
"io"
"net"
@@ -169,7 +168,8 @@ type Stream struct {
// nil for client side Stream.
st ServerTransport
// ctx is the associated context of the stream.
- ctx context.Context
+ ctx context.Context
+ // cancel is always nil for client side Stream.
cancel context.CancelFunc
// done is closed when the final status arrives.
done chan struct{}
@@ -286,19 +286,12 @@ func (s *Stream) StatusDesc() string {
return s.statusDesc
}
-// ErrIllegalTrailerSet indicates that the trailer has already been set or it
-// is too late to do so.
-var ErrIllegalTrailerSet = errors.New("transport: trailer has been set")
-
// SetTrailer sets the trailer metadata which will be sent with the RPC status
-// by the server. This can only be called at most once. Server side only.
+// by the server. This can be called multiple times. Server side only.
func (s *Stream) SetTrailer(md metadata.MD) error {
s.mu.Lock()
defer s.mu.Unlock()
- if s.trailer != nil {
- return ErrIllegalTrailerSet
- }
- s.trailer = md.Copy()
+ s.trailer = metadata.Join(s.trailer, md)
return nil
}