diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r-- | vendor/google.golang.org/grpc/clientconn.go | 158 |
1 files changed, 79 insertions, 79 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index 53a1212..c3c7691 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -53,24 +53,28 @@ var ( // ErrClientConnClosing indicates that the operation is illegal because // the ClientConn is closing. ErrClientConnClosing = errors.New("grpc: the client connection is closing") + // ErrClientConnTimeout indicates that the ClientConn cannot establish the + // underlying connections within the specified timeout. + ErrClientConnTimeout = errors.New("grpc: timed out when dialing") // errNoTransportSecurity indicates that there is no transport security // being set for ClientConn. Users should either set one or explicitly // call WithInsecure DialOption to disable security. errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)") - // errCredentialsMisuse indicates that users want to transmit security information - // (e.g., oauth2 token) which requires secure connection on an insecure + // errTransportCredentialsMissing indicates that users want to transmit security + // information (e.g., oauth2 token) which requires secure connection on an insecure // connection. - errCredentialsMisuse = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportAuthenticator() to set)") - // errClientConnTimeout indicates that the connection could not be - // established or re-established within the specified timeout. - errClientConnTimeout = errors.New("grpc: timed out trying to connect") + errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)") + // errCredentialsConflict indicates that grpc.WithTransportCredentials() + // and grpc.WithInsecure() are both called for a connection. + errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)") // errNetworkIP indicates that the connection is down due to some network I/O error. errNetworkIO = errors.New("grpc: failed with network I/O error") // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs. errConnDrain = errors.New("grpc: the connection is drained") // errConnClosing indicates that the connection is closing. errConnClosing = errors.New("grpc: the connection is closing") + errNoAddr = errors.New("grpc: there is no address available to dial") // minimum time to give a connection to complete minConnectTimeout = 20 * time.Second ) @@ -85,6 +89,7 @@ type dialOptions struct { balancer Balancer block bool insecure bool + timeout time.Duration copts transport.ConnectOptions } @@ -168,24 +173,25 @@ func WithInsecure() DialOption { // WithTransportCredentials returns a DialOption which configures a // connection level security credentials (e.g., TLS/SSL). -func WithTransportCredentials(creds credentials.TransportAuthenticator) DialOption { +func WithTransportCredentials(creds credentials.TransportCredentials) DialOption { return func(o *dialOptions) { - o.copts.AuthOptions = append(o.copts.AuthOptions, creds) + o.copts.TransportCredentials = creds } } // WithPerRPCCredentials returns a DialOption which sets // credentials which will place auth state on each outbound RPC. -func WithPerRPCCredentials(creds credentials.Credentials) DialOption { +func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption { return func(o *dialOptions) { - o.copts.AuthOptions = append(o.copts.AuthOptions, creds) + o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds) } } -// WithTimeout returns a DialOption that configures a timeout for dialing a client connection. +// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn +// initially. This is valid if and only if WithBlock() is present. func WithTimeout(d time.Duration) DialOption { return func(o *dialOptions) { - o.copts.Timeout = d + o.timeout = d } } @@ -212,42 +218,62 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { for _, opt := range opts { opt(&cc.dopts) } + + // Set defaults. if cc.dopts.codec == nil { - // Set the default codec. cc.dopts.codec = protoCodec{} } - if cc.dopts.bs == nil { cc.dopts.bs = DefaultBackoffConfig } - - cc.balancer = cc.dopts.balancer - if cc.balancer == nil { - cc.balancer = RoundRobin(nil) + if cc.dopts.balancer == nil { + cc.dopts.balancer = RoundRobin(nil) } - if err := cc.balancer.Start(target); err != nil { + + if err := cc.dopts.balancer.Start(target); err != nil { return nil, err } - ch := cc.balancer.Notify() + var ( + ok bool + addrs []Address + ) + ch := cc.dopts.balancer.Notify() if ch == nil { // There is no name resolver installed. - addr := Address{Addr: target} - if err := cc.newAddrConn(addr, false); err != nil { - return nil, err - } + addrs = append(addrs, Address{Addr: target}) } else { - addrs, ok := <-ch + addrs, ok = <-ch if !ok || len(addrs) == 0 { - return nil, fmt.Errorf("grpc: there is no address available to dial") + return nil, errNoAddr } + } + waitC := make(chan error, 1) + go func() { for _, a := range addrs { if err := cc.newAddrConn(a, false); err != nil { - return nil, err + waitC <- err + return } } + close(waitC) + }() + var timeoutCh <-chan time.Time + if cc.dopts.timeout > 0 { + timeoutCh = time.After(cc.dopts.timeout) + } + select { + case err := <-waitC: + if err != nil { + cc.Close() + return nil, err + } + case <-timeoutCh: + cc.Close() + return nil, ErrClientConnTimeout + } + if ok { go cc.lbWatcher() } - colonPos := strings.LastIndex(target, ":") if colonPos == -1 { colonPos = len(target) @@ -292,7 +318,6 @@ func (s ConnectivityState) String() string { // ClientConn represents a client connection to an RPC server. type ClientConn struct { target string - balancer Balancer authority string dopts dialOptions @@ -301,7 +326,7 @@ type ClientConn struct { } func (cc *ClientConn) lbWatcher() { - for addrs := range cc.balancer.Notify() { + for addrs := range cc.dopts.balancer.Notify() { var ( add []Address // Addresses need to setup connections. del []*addrConn // Connections need to tear down. @@ -345,19 +370,16 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr) } if !ac.dopts.insecure { - var ok bool - for _, cd := range ac.dopts.copts.AuthOptions { - if _, ok = cd.(credentials.TransportAuthenticator); ok { - break - } - } - if !ok { + if ac.dopts.copts.TransportCredentials == nil { return errNoTransportSecurity } } else { - for _, cd := range ac.dopts.copts.AuthOptions { + if ac.dopts.copts.TransportCredentials != nil { + return errCredentialsConflict + } + for _, cd := range ac.dopts.copts.PerRPCCredentials { if cd.RequireTransportSecurity() { - return errCredentialsMisuse + return errTransportCredentialsMissing } } } @@ -400,15 +422,14 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { } func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { - // TODO(zhaoq): Implement fail-fast logic. - addr, put, err := cc.balancer.Get(ctx, opts) + addr, put, err := cc.dopts.balancer.Get(ctx, opts) if err != nil { - return nil, nil, err + return nil, nil, toRPCErr(err) } cc.mu.RLock() if cc.conns == nil { cc.mu.RUnlock() - return nil, nil, ErrClientConnClosing + return nil, nil, toRPCErr(ErrClientConnClosing) } ac, ok := cc.conns[addr] cc.mu.RUnlock() @@ -416,9 +437,9 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) if put != nil { put() } - return nil, nil, transport.StreamErrorf(codes.Internal, "grpc: failed to find the transport to send the rpc") + return nil, nil, Errorf(codes.Internal, "grpc: failed to find the transport to send the rpc") } - t, err := ac.wait(ctx) + t, err := ac.wait(ctx, !opts.BlockingWait) if err != nil { if put != nil { put() @@ -438,7 +459,7 @@ func (cc *ClientConn) Close() error { conns := cc.conns cc.conns = nil cc.mu.Unlock() - cc.balancer.Close() + cc.dopts.balancer.Close() for _, ac := range conns { ac.tearDown(ErrClientConnClosing) } @@ -517,7 +538,6 @@ func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState Connecti func (ac *addrConn) resetTransport(closeTransport bool) error { var retries int - start := time.Now() for { ac.mu.Lock() ac.printf("connecting") @@ -537,29 +557,13 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { if closeTransport && t != nil { t.Close() } - // Adjust timeout for the current try. - copts := ac.dopts.copts - if copts.Timeout < 0 { - ac.tearDown(errClientConnTimeout) - return errClientConnTimeout - } - if copts.Timeout > 0 { - copts.Timeout -= time.Since(start) - if copts.Timeout <= 0 { - ac.tearDown(errClientConnTimeout) - return errClientConnTimeout - } - } sleepTime := ac.dopts.bs.backoff(retries) - timeout := sleepTime - if timeout < minConnectTimeout { - timeout = minConnectTimeout - } - if copts.Timeout == 0 || copts.Timeout > timeout { - copts.Timeout = timeout + ac.dopts.copts.Timeout = sleepTime + if sleepTime < minConnectTimeout { + ac.dopts.copts.Timeout = minConnectTimeout } connectTime := time.Now() - newTransport, err := transport.NewClientTransport(ac.addr.Addr, &copts) + newTransport, err := transport.NewClientTransport(ac.addr.Addr, &ac.dopts.copts) if err != nil { ac.mu.Lock() if ac.state == Shutdown { @@ -579,14 +583,6 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { if sleepTime < 0 { sleepTime = 0 } - // Fail early before falling into sleep. - if ac.dopts.copts.Timeout > 0 && ac.dopts.copts.Timeout < sleepTime+time.Since(start) { - ac.mu.Lock() - ac.errorf("connection timeout") - ac.mu.Unlock() - ac.tearDown(errClientConnTimeout) - return errClientConnTimeout - } closeTransport = false select { case <-time.After(sleepTime): @@ -611,7 +607,7 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { close(ac.ready) ac.ready = nil } - ac.down = ac.cc.balancer.Up(ac.addr) + ac.down = ac.cc.dopts.balancer.Up(ac.addr) ac.mu.Unlock() return nil } @@ -650,8 +646,9 @@ func (ac *addrConn) transportMonitor() { } } -// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed. -func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error) { +// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or +// iv) transport is in TransientFailure and the RPC is fail-fast. +func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTransport, error) { for { ac.mu.Lock() switch { @@ -662,6 +659,9 @@ func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error) ct := ac.transport ac.mu.Unlock() return ct, nil + case ac.state == TransientFailure && failFast: + ac.mu.Unlock() + return nil, Errorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure") default: ready := ac.ready if ready == nil { @@ -671,7 +671,7 @@ func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error) ac.mu.Unlock() select { case <-ctx.Done(): - return nil, transport.ContextErr(ctx.Err()) + return nil, toRPCErr(ctx.Err()) // Wait until the new transport is ready or failed. case <-ready: } |