From 73ef85bc5db590c22689e11be20737a3dd88168f Mon Sep 17 00:00:00 2001 From: Niall Sheridan Date: Wed, 28 Dec 2016 21:18:36 +0000 Subject: Update dependencies --- vendor/google.golang.org/grpc/clientconn.go | 86 ++++++++++++++++++++++++++--- 1 file changed, 77 insertions(+), 9 deletions(-) (limited to 'vendor/google.golang.org/grpc/clientconn.go') diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index 6167472..aa6b63d 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -54,6 +54,8 @@ var ( ErrClientConnClosing = errors.New("grpc: the client connection is closing") // ErrClientConnTimeout indicates that the ClientConn cannot establish the // underlying connections within the specified timeout. + // DEPRECATED: Please use context.DeadlineExceeded instead. This error will be + // removed in Q1 2017. ErrClientConnTimeout = errors.New("grpc: timed out when dialing") // errNoTransportSecurity indicates that there is no transport security @@ -93,6 +95,7 @@ type dialOptions struct { block bool insecure bool timeout time.Duration + scChan <-chan ServiceConfig copts transport.ConnectOptions } @@ -129,6 +132,13 @@ func WithBalancer(b Balancer) DialOption { } } +// WithServiceConfig returns a DialOption which has a channel to read the service configuration. +func WithServiceConfig(c <-chan ServiceConfig) DialOption { + return func(o *dialOptions) { + o.scChan = c + } +} + // WithBackoffMaxDelay configures the dialer to use the provided maximum delay // when backing off after failed connection attempts. func WithBackoffMaxDelay(md time.Duration) DialOption { @@ -199,6 +209,8 @@ func WithTimeout(d time.Duration) DialOption { } // WithDialer returns a DialOption that specifies a function to use for dialing network addresses. +// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's +// Temporary() method to decide if it should try to reconnect to the network address. func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { return func(o *dialOptions) { o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) { @@ -210,6 +222,17 @@ func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { } } +// FailOnNonTempDialError returns a DialOption that specified if gRPC fails on non-temporary dial errors. +// If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network +// address and won't try to reconnect. +// The default value of FailOnNonTempDialError is false. +// This is an EXPERIMENTAL API. +func FailOnNonTempDialError(f bool) DialOption { + return func(o *dialOptions) { + o.copts.FailOnNonTempDialError = f + } +} + // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs. func WithUserAgent(s string) DialOption { return func(o *dialOptions) { @@ -247,6 +270,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * conns: make(map[Address]*addrConn), } cc.ctx, cc.cancel = context.WithCancel(context.Background()) + for _, opt := range opts { + opt(&cc.dopts) + } + if cc.dopts.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) + defer cancel() + } + defer func() { select { case <-ctx.Done(): @@ -259,10 +291,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } }() - for _, opt := range opts { - opt(&cc.dopts) + if cc.dopts.scChan != nil { + // Wait for the initial service config. + select { + case sc, ok := <-cc.dopts.scChan: + if ok { + cc.sc = sc + } + case <-ctx.Done(): + return nil, ctx.Err() + } } - // Set defaults. if cc.dopts.codec == nil { cc.dopts.codec = protoCodec{} @@ -284,6 +323,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * waitC := make(chan error, 1) go func() { var addrs []Address + if cc.dopts.balancer == nil && cc.sc.LB != nil { + cc.dopts.balancer = cc.sc.LB + } if cc.dopts.balancer == nil { // Connect to target directly if balancer is nil. addrs = append(addrs, Address{Addr: target}) @@ -319,10 +361,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } close(waitC) }() - var timeoutCh <-chan time.Time - if cc.dopts.timeout > 0 { - timeoutCh = time.After(cc.dopts.timeout) - } select { case <-ctx.Done(): return nil, ctx.Err() @@ -330,14 +368,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * if err != nil { return nil, err } - case <-timeoutCh: - return nil, ErrClientConnTimeout } + // If balancer is nil or balancer.Notify() is nil, ok will be false here. // The lbWatcher goroutine will not be created. if ok { go cc.lbWatcher() } + + if cc.dopts.scChan != nil { + go cc.scWatcher() + } return cc, nil } @@ -384,6 +425,7 @@ type ClientConn struct { dopts dialOptions mu sync.RWMutex + sc ServiceConfig conns map[Address]*addrConn } @@ -422,6 +464,24 @@ func (cc *ClientConn) lbWatcher() { } } +func (cc *ClientConn) scWatcher() { + for { + select { + case sc, ok := <-cc.dopts.scChan: + if !ok { + return + } + cc.mu.Lock() + // TODO: load balance policy runtime change is ignored. + // We may revist this decision in the future. + cc.sc = sc + cc.mu.Unlock() + case <-cc.ctx.Done(): + return + } + } +} + // resetAddrConn creates an addrConn for addr and adds it to cc.conns. // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason. // If tearDownErr is nil, errConnDrain will be used instead. @@ -509,6 +569,14 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err return nil } +// TODO: Avoid the locking here. +func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) { + cc.mu.RLock() + defer cc.mu.RUnlock() + m, ok = cc.sc.Methods[method] + return +} + func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { var ( ac *addrConn -- cgit v1.2.3