aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/clientconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go86
1 files changed, 77 insertions, 9 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 6167472..aa6b63d 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -54,6 +54,8 @@ var (
ErrClientConnClosing = errors.New("grpc: the client connection is closing")
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
+ // DEPRECATED: Please use context.DeadlineExceeded instead. This error will be
+ // removed in Q1 2017.
ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
// errNoTransportSecurity indicates that there is no transport security
@@ -93,6 +95,7 @@ type dialOptions struct {
block bool
insecure bool
timeout time.Duration
+ scChan <-chan ServiceConfig
copts transport.ConnectOptions
}
@@ -129,6 +132,13 @@ func WithBalancer(b Balancer) DialOption {
}
}
+// WithServiceConfig returns a DialOption which has a channel to read the service configuration.
+func WithServiceConfig(c <-chan ServiceConfig) DialOption {
+ return func(o *dialOptions) {
+ o.scChan = c
+ }
+}
+
// WithBackoffMaxDelay configures the dialer to use the provided maximum delay
// when backing off after failed connection attempts.
func WithBackoffMaxDelay(md time.Duration) DialOption {
@@ -199,6 +209,8 @@ func WithTimeout(d time.Duration) DialOption {
}
// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
+// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
+// Temporary() method to decide if it should try to reconnect to the network address.
func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
return func(o *dialOptions) {
o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
@@ -210,6 +222,17 @@ func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
}
}
+// FailOnNonTempDialError returns a DialOption that specified if gRPC fails on non-temporary dial errors.
+// If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
+// address and won't try to reconnect.
+// The default value of FailOnNonTempDialError is false.
+// This is an EXPERIMENTAL API.
+func FailOnNonTempDialError(f bool) DialOption {
+ return func(o *dialOptions) {
+ o.copts.FailOnNonTempDialError = f
+ }
+}
+
// WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
func WithUserAgent(s string) DialOption {
return func(o *dialOptions) {
@@ -247,6 +270,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
conns: make(map[Address]*addrConn),
}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
+ for _, opt := range opts {
+ opt(&cc.dopts)
+ }
+ if cc.dopts.timeout > 0 {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
+ defer cancel()
+ }
+
defer func() {
select {
case <-ctx.Done():
@@ -259,10 +291,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}()
- for _, opt := range opts {
- opt(&cc.dopts)
+ if cc.dopts.scChan != nil {
+ // Wait for the initial service config.
+ select {
+ case sc, ok := <-cc.dopts.scChan:
+ if ok {
+ cc.sc = sc
+ }
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
}
-
// Set defaults.
if cc.dopts.codec == nil {
cc.dopts.codec = protoCodec{}
@@ -284,6 +323,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
waitC := make(chan error, 1)
go func() {
var addrs []Address
+ if cc.dopts.balancer == nil && cc.sc.LB != nil {
+ cc.dopts.balancer = cc.sc.LB
+ }
if cc.dopts.balancer == nil {
// Connect to target directly if balancer is nil.
addrs = append(addrs, Address{Addr: target})
@@ -319,10 +361,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
close(waitC)
}()
- var timeoutCh <-chan time.Time
- if cc.dopts.timeout > 0 {
- timeoutCh = time.After(cc.dopts.timeout)
- }
select {
case <-ctx.Done():
return nil, ctx.Err()
@@ -330,14 +368,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if err != nil {
return nil, err
}
- case <-timeoutCh:
- return nil, ErrClientConnTimeout
}
+
// If balancer is nil or balancer.Notify() is nil, ok will be false here.
// The lbWatcher goroutine will not be created.
if ok {
go cc.lbWatcher()
}
+
+ if cc.dopts.scChan != nil {
+ go cc.scWatcher()
+ }
return cc, nil
}
@@ -384,6 +425,7 @@ type ClientConn struct {
dopts dialOptions
mu sync.RWMutex
+ sc ServiceConfig
conns map[Address]*addrConn
}
@@ -422,6 +464,24 @@ func (cc *ClientConn) lbWatcher() {
}
}
+func (cc *ClientConn) scWatcher() {
+ for {
+ select {
+ case sc, ok := <-cc.dopts.scChan:
+ if !ok {
+ return
+ }
+ cc.mu.Lock()
+ // TODO: load balance policy runtime change is ignored.
+ // We may revist this decision in the future.
+ cc.sc = sc
+ cc.mu.Unlock()
+ case <-cc.ctx.Done():
+ return
+ }
+ }
+}
+
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
@@ -509,6 +569,14 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err
return nil
}
+// TODO: Avoid the locking here.
+func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) {
+ cc.mu.RLock()
+ defer cc.mu.RUnlock()
+ m, ok = cc.sc.Methods[method]
+ return
+}
+
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
var (
ac *addrConn