aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/clientconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go158
1 files changed, 79 insertions, 79 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 53a1212..c3c7691 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -53,24 +53,28 @@ var (
// ErrClientConnClosing indicates that the operation is illegal because
// the ClientConn is closing.
ErrClientConnClosing = errors.New("grpc: the client connection is closing")
+ // ErrClientConnTimeout indicates that the ClientConn cannot establish the
+ // underlying connections within the specified timeout.
+ ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
// errNoTransportSecurity indicates that there is no transport security
// being set for ClientConn. Users should either set one or explicitly
// call WithInsecure DialOption to disable security.
errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
- // errCredentialsMisuse indicates that users want to transmit security information
- // (e.g., oauth2 token) which requires secure connection on an insecure
+ // errTransportCredentialsMissing indicates that users want to transmit security
+ // information (e.g., oauth2 token) which requires secure connection on an insecure
// connection.
- errCredentialsMisuse = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportAuthenticator() to set)")
- // errClientConnTimeout indicates that the connection could not be
- // established or re-established within the specified timeout.
- errClientConnTimeout = errors.New("grpc: timed out trying to connect")
+ errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
+ // errCredentialsConflict indicates that grpc.WithTransportCredentials()
+ // and grpc.WithInsecure() are both called for a connection.
+ errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
// errNetworkIP indicates that the connection is down due to some network I/O error.
errNetworkIO = errors.New("grpc: failed with network I/O error")
// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
errConnClosing = errors.New("grpc: the connection is closing")
+ errNoAddr = errors.New("grpc: there is no address available to dial")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
@@ -85,6 +89,7 @@ type dialOptions struct {
balancer Balancer
block bool
insecure bool
+ timeout time.Duration
copts transport.ConnectOptions
}
@@ -168,24 +173,25 @@ func WithInsecure() DialOption {
// WithTransportCredentials returns a DialOption which configures a
// connection level security credentials (e.g., TLS/SSL).
-func WithTransportCredentials(creds credentials.TransportAuthenticator) DialOption {
+func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
return func(o *dialOptions) {
- o.copts.AuthOptions = append(o.copts.AuthOptions, creds)
+ o.copts.TransportCredentials = creds
}
}
// WithPerRPCCredentials returns a DialOption which sets
// credentials which will place auth state on each outbound RPC.
-func WithPerRPCCredentials(creds credentials.Credentials) DialOption {
+func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
return func(o *dialOptions) {
- o.copts.AuthOptions = append(o.copts.AuthOptions, creds)
+ o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
}
}
-// WithTimeout returns a DialOption that configures a timeout for dialing a client connection.
+// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
+// initially. This is valid if and only if WithBlock() is present.
func WithTimeout(d time.Duration) DialOption {
return func(o *dialOptions) {
- o.copts.Timeout = d
+ o.timeout = d
}
}
@@ -212,42 +218,62 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
for _, opt := range opts {
opt(&cc.dopts)
}
+
+ // Set defaults.
if cc.dopts.codec == nil {
- // Set the default codec.
cc.dopts.codec = protoCodec{}
}
-
if cc.dopts.bs == nil {
cc.dopts.bs = DefaultBackoffConfig
}
-
- cc.balancer = cc.dopts.balancer
- if cc.balancer == nil {
- cc.balancer = RoundRobin(nil)
+ if cc.dopts.balancer == nil {
+ cc.dopts.balancer = RoundRobin(nil)
}
- if err := cc.balancer.Start(target); err != nil {
+
+ if err := cc.dopts.balancer.Start(target); err != nil {
return nil, err
}
- ch := cc.balancer.Notify()
+ var (
+ ok bool
+ addrs []Address
+ )
+ ch := cc.dopts.balancer.Notify()
if ch == nil {
// There is no name resolver installed.
- addr := Address{Addr: target}
- if err := cc.newAddrConn(addr, false); err != nil {
- return nil, err
- }
+ addrs = append(addrs, Address{Addr: target})
} else {
- addrs, ok := <-ch
+ addrs, ok = <-ch
if !ok || len(addrs) == 0 {
- return nil, fmt.Errorf("grpc: there is no address available to dial")
+ return nil, errNoAddr
}
+ }
+ waitC := make(chan error, 1)
+ go func() {
for _, a := range addrs {
if err := cc.newAddrConn(a, false); err != nil {
- return nil, err
+ waitC <- err
+ return
}
}
+ close(waitC)
+ }()
+ var timeoutCh <-chan time.Time
+ if cc.dopts.timeout > 0 {
+ timeoutCh = time.After(cc.dopts.timeout)
+ }
+ select {
+ case err := <-waitC:
+ if err != nil {
+ cc.Close()
+ return nil, err
+ }
+ case <-timeoutCh:
+ cc.Close()
+ return nil, ErrClientConnTimeout
+ }
+ if ok {
go cc.lbWatcher()
}
-
colonPos := strings.LastIndex(target, ":")
if colonPos == -1 {
colonPos = len(target)
@@ -292,7 +318,6 @@ func (s ConnectivityState) String() string {
// ClientConn represents a client connection to an RPC server.
type ClientConn struct {
target string
- balancer Balancer
authority string
dopts dialOptions
@@ -301,7 +326,7 @@ type ClientConn struct {
}
func (cc *ClientConn) lbWatcher() {
- for addrs := range cc.balancer.Notify() {
+ for addrs := range cc.dopts.balancer.Notify() {
var (
add []Address // Addresses need to setup connections.
del []*addrConn // Connections need to tear down.
@@ -345,19 +370,16 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
}
if !ac.dopts.insecure {
- var ok bool
- for _, cd := range ac.dopts.copts.AuthOptions {
- if _, ok = cd.(credentials.TransportAuthenticator); ok {
- break
- }
- }
- if !ok {
+ if ac.dopts.copts.TransportCredentials == nil {
return errNoTransportSecurity
}
} else {
- for _, cd := range ac.dopts.copts.AuthOptions {
+ if ac.dopts.copts.TransportCredentials != nil {
+ return errCredentialsConflict
+ }
+ for _, cd := range ac.dopts.copts.PerRPCCredentials {
if cd.RequireTransportSecurity() {
- return errCredentialsMisuse
+ return errTransportCredentialsMissing
}
}
}
@@ -400,15 +422,14 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
}
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
- // TODO(zhaoq): Implement fail-fast logic.
- addr, put, err := cc.balancer.Get(ctx, opts)
+ addr, put, err := cc.dopts.balancer.Get(ctx, opts)
if err != nil {
- return nil, nil, err
+ return nil, nil, toRPCErr(err)
}
cc.mu.RLock()
if cc.conns == nil {
cc.mu.RUnlock()
- return nil, nil, ErrClientConnClosing
+ return nil, nil, toRPCErr(ErrClientConnClosing)
}
ac, ok := cc.conns[addr]
cc.mu.RUnlock()
@@ -416,9 +437,9 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
if put != nil {
put()
}
- return nil, nil, transport.StreamErrorf(codes.Internal, "grpc: failed to find the transport to send the rpc")
+ return nil, nil, Errorf(codes.Internal, "grpc: failed to find the transport to send the rpc")
}
- t, err := ac.wait(ctx)
+ t, err := ac.wait(ctx, !opts.BlockingWait)
if err != nil {
if put != nil {
put()
@@ -438,7 +459,7 @@ func (cc *ClientConn) Close() error {
conns := cc.conns
cc.conns = nil
cc.mu.Unlock()
- cc.balancer.Close()
+ cc.dopts.balancer.Close()
for _, ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
@@ -517,7 +538,6 @@ func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState Connecti
func (ac *addrConn) resetTransport(closeTransport bool) error {
var retries int
- start := time.Now()
for {
ac.mu.Lock()
ac.printf("connecting")
@@ -537,29 +557,13 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
if closeTransport && t != nil {
t.Close()
}
- // Adjust timeout for the current try.
- copts := ac.dopts.copts
- if copts.Timeout < 0 {
- ac.tearDown(errClientConnTimeout)
- return errClientConnTimeout
- }
- if copts.Timeout > 0 {
- copts.Timeout -= time.Since(start)
- if copts.Timeout <= 0 {
- ac.tearDown(errClientConnTimeout)
- return errClientConnTimeout
- }
- }
sleepTime := ac.dopts.bs.backoff(retries)
- timeout := sleepTime
- if timeout < minConnectTimeout {
- timeout = minConnectTimeout
- }
- if copts.Timeout == 0 || copts.Timeout > timeout {
- copts.Timeout = timeout
+ ac.dopts.copts.Timeout = sleepTime
+ if sleepTime < minConnectTimeout {
+ ac.dopts.copts.Timeout = minConnectTimeout
}
connectTime := time.Now()
- newTransport, err := transport.NewClientTransport(ac.addr.Addr, &copts)
+ newTransport, err := transport.NewClientTransport(ac.addr.Addr, &ac.dopts.copts)
if err != nil {
ac.mu.Lock()
if ac.state == Shutdown {
@@ -579,14 +583,6 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
if sleepTime < 0 {
sleepTime = 0
}
- // Fail early before falling into sleep.
- if ac.dopts.copts.Timeout > 0 && ac.dopts.copts.Timeout < sleepTime+time.Since(start) {
- ac.mu.Lock()
- ac.errorf("connection timeout")
- ac.mu.Unlock()
- ac.tearDown(errClientConnTimeout)
- return errClientConnTimeout
- }
closeTransport = false
select {
case <-time.After(sleepTime):
@@ -611,7 +607,7 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
close(ac.ready)
ac.ready = nil
}
- ac.down = ac.cc.balancer.Up(ac.addr)
+ ac.down = ac.cc.dopts.balancer.Up(ac.addr)
ac.mu.Unlock()
return nil
}
@@ -650,8 +646,9 @@ func (ac *addrConn) transportMonitor() {
}
}
-// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed.
-func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error) {
+// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
+// iv) transport is in TransientFailure and the RPC is fail-fast.
+func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTransport, error) {
for {
ac.mu.Lock()
switch {
@@ -662,6 +659,9 @@ func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error)
ct := ac.transport
ac.mu.Unlock()
return ct, nil
+ case ac.state == TransientFailure && failFast:
+ ac.mu.Unlock()
+ return nil, Errorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure")
default:
ready := ac.ready
if ready == nil {
@@ -671,7 +671,7 @@ func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error)
ac.mu.Unlock()
select {
case <-ctx.Done():
- return nil, transport.ContextErr(ctx.Err())
+ return nil, toRPCErr(ctx.Err())
// Wait until the new transport is ready or failed.
case <-ready:
}