diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r-- | vendor/google.golang.org/grpc/clientconn.go | 359 |
1 files changed, 246 insertions, 113 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index c3c7691..1d3b46c 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -43,7 +43,6 @@ import ( "golang.org/x/net/context" "golang.org/x/net/trace" - "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/transport" @@ -68,13 +67,15 @@ var ( // errCredentialsConflict indicates that grpc.WithTransportCredentials() // and grpc.WithInsecure() are both called for a connection. errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)") - // errNetworkIP indicates that the connection is down due to some network I/O error. + // errNetworkIO indicates that the connection is down due to some network I/O error. errNetworkIO = errors.New("grpc: failed with network I/O error") // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs. errConnDrain = errors.New("grpc: the connection is drained") // errConnClosing indicates that the connection is closing. errConnClosing = errors.New("grpc: the connection is closing") - errNoAddr = errors.New("grpc: there is no address available to dial") + // errConnUnavailable indicates that the connection is unavailable. + errConnUnavailable = errors.New("grpc: the connection is unavailable") + errNoAddr = errors.New("grpc: there is no address available to dial") // minimum time to give a connection to complete minConnectTimeout = 20 * time.Second ) @@ -196,9 +197,14 @@ func WithTimeout(d time.Duration) DialOption { } // WithDialer returns a DialOption that specifies a function to use for dialing network addresses. -func WithDialer(f func(addr string, timeout time.Duration) (net.Conn, error)) DialOption { +func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { return func(o *dialOptions) { - o.copts.Dialer = f + o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) { + if deadline, ok := ctx.Deadline(); ok { + return f(addr, deadline.Sub(time.Now())) + } + return f(addr, 0) + } } } @@ -209,12 +215,34 @@ func WithUserAgent(s string) DialOption { } } -// Dial creates a client connection the given target. +// Dial creates a client connection to the given target. func Dial(target string, opts ...DialOption) (*ClientConn, error) { + return DialContext(context.Background(), target, opts...) +} + +// DialContext creates a client connection to the given target. ctx can be used to +// cancel or expire the pending connecting. Once this function returns, the +// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close +// to terminate all the pending operations after this function returns. +// This is the EXPERIMENTAL API. +func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ target: target, conns: make(map[Address]*addrConn), } + cc.ctx, cc.cancel = context.WithCancel(context.Background()) + defer func() { + select { + case <-ctx.Done(): + conn, err = nil, ctx.Err() + default: + } + + if err != nil { + cc.Close() + } + }() + for _, opt := range opts { opt(&cc.dopts) } @@ -226,31 +254,33 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { if cc.dopts.bs == nil { cc.dopts.bs = DefaultBackoffConfig } - if cc.dopts.balancer == nil { - cc.dopts.balancer = RoundRobin(nil) - } - if err := cc.dopts.balancer.Start(target); err != nil { - return nil, err - } var ( ok bool addrs []Address ) - ch := cc.dopts.balancer.Notify() - if ch == nil { - // There is no name resolver installed. + if cc.dopts.balancer == nil { + // Connect to target directly if balancer is nil. addrs = append(addrs, Address{Addr: target}) } else { - addrs, ok = <-ch - if !ok || len(addrs) == 0 { - return nil, errNoAddr + if err := cc.dopts.balancer.Start(target); err != nil { + return nil, err + } + ch := cc.dopts.balancer.Notify() + if ch == nil { + // There is no name resolver installed. + addrs = append(addrs, Address{Addr: target}) + } else { + addrs, ok = <-ch + if !ok || len(addrs) == 0 { + return nil, errNoAddr + } } } waitC := make(chan error, 1) go func() { for _, a := range addrs { - if err := cc.newAddrConn(a, false); err != nil { + if err := cc.resetAddrConn(a, false, nil); err != nil { waitC <- err return } @@ -262,15 +292,17 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { timeoutCh = time.After(cc.dopts.timeout) } select { + case <-ctx.Done(): + return nil, ctx.Err() case err := <-waitC: if err != nil { - cc.Close() return nil, err } case <-timeoutCh: - cc.Close() return nil, ErrClientConnTimeout } + // If balancer is nil or balancer.Notify() is nil, ok will be false here. + // The lbWatcher goroutine will not be created. if ok { go cc.lbWatcher() } @@ -317,6 +349,9 @@ func (s ConnectivityState) String() string { // ClientConn represents a client connection to an RPC server. type ClientConn struct { + ctx context.Context + cancel context.CancelFunc + target string authority string dopts dialOptions @@ -347,11 +382,12 @@ func (cc *ClientConn) lbWatcher() { } if !keep { del = append(del, c) + delete(cc.conns, c.addr) } } cc.mu.Unlock() for _, a := range add { - cc.newAddrConn(a, true) + cc.resetAddrConn(a, true, nil) } for _, c := range del { c.tearDown(errConnDrain) @@ -359,13 +395,17 @@ func (cc *ClientConn) lbWatcher() { } } -func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { +// resetAddrConn creates an addrConn for addr and adds it to cc.conns. +// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason. +// If tearDownErr is nil, errConnDrain will be used instead. +func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error { ac := &addrConn{ - cc: cc, - addr: addr, - dopts: cc.dopts, - shutdownChan: make(chan struct{}), + cc: cc, + addr: addr, + dopts: cc.dopts, } + ac.ctx, ac.cancel = context.WithCancel(cc.ctx) + ac.stateCV = sync.NewCond(&ac.mu) if EnableTracing { ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr) } @@ -383,26 +423,44 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { } } } - // Insert ac into ac.cc.conns. This needs to be done before any getTransport(...) is called. - ac.cc.mu.Lock() - if ac.cc.conns == nil { - ac.cc.mu.Unlock() + // Track ac in cc. This needs to be done before any getTransport(...) is called. + cc.mu.Lock() + if cc.conns == nil { + cc.mu.Unlock() return ErrClientConnClosing } - stale := ac.cc.conns[ac.addr] - ac.cc.conns[ac.addr] = ac - ac.cc.mu.Unlock() + stale := cc.conns[ac.addr] + cc.conns[ac.addr] = ac + cc.mu.Unlock() if stale != nil { // There is an addrConn alive on ac.addr already. This could be due to - // i) stale's Close is undergoing; - // ii) a buggy Balancer notifies duplicated Addresses. - stale.tearDown(errConnDrain) + // 1) a buggy Balancer notifies duplicated Addresses; + // 2) goaway was received, a new ac will replace the old ac. + // The old ac should be deleted from cc.conns, but the + // underlying transport should drain rather than close. + if tearDownErr == nil { + // tearDownErr is nil if resetAddrConn is called by + // 1) Dial + // 2) lbWatcher + // In both cases, the stale ac should drain, not close. + stale.tearDown(errConnDrain) + } else { + stale.tearDown(tearDownErr) + } } - ac.stateCV = sync.NewCond(&ac.mu) // skipWait may overwrite the decision in ac.dopts.block. if ac.dopts.block && !skipWait { if err := ac.resetTransport(false); err != nil { - ac.tearDown(err) + if err != errConnClosing { + // Tear down ac and delete it from cc.conns. + cc.mu.Lock() + delete(cc.conns, ac.addr) + cc.mu.Unlock() + ac.tearDown(err) + } + if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { + return e.Origin() + } return err } // Start to monitor the error status of transport. @@ -412,7 +470,10 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { go func() { if err := ac.resetTransport(false); err != nil { grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err) - ac.tearDown(err) + if err != errConnClosing { + // Keep this ac in cc.conns, to get the reason it's torn down. + ac.tearDown(err) + } return } ac.transportMonitor() @@ -422,24 +483,48 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { } func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { - addr, put, err := cc.dopts.balancer.Get(ctx, opts) - if err != nil { - return nil, nil, toRPCErr(err) - } - cc.mu.RLock() - if cc.conns == nil { + var ( + ac *addrConn + ok bool + put func() + ) + if cc.dopts.balancer == nil { + // If balancer is nil, there should be only one addrConn available. + cc.mu.RLock() + if cc.conns == nil { + cc.mu.RUnlock() + return nil, nil, toRPCErr(ErrClientConnClosing) + } + for _, ac = range cc.conns { + // Break after the first iteration to get the first addrConn. + ok = true + break + } + cc.mu.RUnlock() + } else { + var ( + addr Address + err error + ) + addr, put, err = cc.dopts.balancer.Get(ctx, opts) + if err != nil { + return nil, nil, toRPCErr(err) + } + cc.mu.RLock() + if cc.conns == nil { + cc.mu.RUnlock() + return nil, nil, toRPCErr(ErrClientConnClosing) + } + ac, ok = cc.conns[addr] cc.mu.RUnlock() - return nil, nil, toRPCErr(ErrClientConnClosing) } - ac, ok := cc.conns[addr] - cc.mu.RUnlock() if !ok { if put != nil { put() } - return nil, nil, Errorf(codes.Internal, "grpc: failed to find the transport to send the rpc") + return nil, nil, errConnClosing } - t, err := ac.wait(ctx, !opts.BlockingWait) + t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait) if err != nil { if put != nil { put() @@ -451,6 +536,8 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) // Close tears down the ClientConn and all underlying connections. func (cc *ClientConn) Close() error { + cc.cancel() + cc.mu.Lock() if cc.conns == nil { cc.mu.Unlock() @@ -459,7 +546,9 @@ func (cc *ClientConn) Close() error { conns := cc.conns cc.conns = nil cc.mu.Unlock() - cc.dopts.balancer.Close() + if cc.dopts.balancer != nil { + cc.dopts.balancer.Close() + } for _, ac := range conns { ac.tearDown(ErrClientConnClosing) } @@ -468,11 +557,13 @@ func (cc *ClientConn) Close() error { // addrConn is a network connection to a given address. type addrConn struct { - cc *ClientConn - addr Address - dopts dialOptions - shutdownChan chan struct{} - events trace.EventLog + ctx context.Context + cancel context.CancelFunc + + cc *ClientConn + addr Address + dopts dialOptions + events trace.EventLog mu sync.Mutex state ConnectivityState @@ -482,6 +573,9 @@ type addrConn struct { // due to timeout. ready chan struct{} transport transport.ClientTransport + + // The reason this addrConn is torn down. + tearDownErr error } // printf records an event in ac's event log, unless ac has been closed. @@ -537,8 +631,7 @@ func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState Connecti } func (ac *addrConn) resetTransport(closeTransport bool) error { - var retries int - for { + for retries := 0; ; retries++ { ac.mu.Lock() ac.printf("connecting") if ac.state == Shutdown { @@ -558,13 +651,20 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { t.Close() } sleepTime := ac.dopts.bs.backoff(retries) - ac.dopts.copts.Timeout = sleepTime - if sleepTime < minConnectTimeout { - ac.dopts.copts.Timeout = minConnectTimeout + timeout := minConnectTimeout + if timeout < sleepTime { + timeout = sleepTime } + ctx, cancel := context.WithTimeout(ac.ctx, timeout) connectTime := time.Now() - newTransport, err := transport.NewClientTransport(ac.addr.Addr, &ac.dopts.copts) + newTransport, err := transport.NewClientTransport(ctx, ac.addr.Addr, ac.dopts.copts) if err != nil { + cancel() + + if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { + return err + } + grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr) ac.mu.Lock() if ac.state == Shutdown { // ac.tearDown(...) has been invoked. @@ -579,17 +679,12 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { ac.ready = nil } ac.mu.Unlock() - sleepTime -= time.Since(connectTime) - if sleepTime < 0 { - sleepTime = 0 - } closeTransport = false select { - case <-time.After(sleepTime): - case <-ac.shutdownChan: + case <-time.After(sleepTime - time.Since(connectTime)): + case <-ac.ctx.Done(): + return ac.ctx.Err() } - retries++ - grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr) continue } ac.mu.Lock() @@ -607,7 +702,9 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { close(ac.ready) ac.ready = nil } - ac.down = ac.cc.dopts.balancer.Up(ac.addr) + if ac.cc.dopts.balancer != nil { + ac.down = ac.cc.dopts.balancer.Up(ac.addr) + } ac.mu.Unlock() return nil } @@ -621,14 +718,42 @@ func (ac *addrConn) transportMonitor() { t := ac.transport ac.mu.Unlock() select { - // shutdownChan is needed to detect the teardown when + // This is needed to detect the teardown when // the addrConn is idle (i.e., no RPC in flight). - case <-ac.shutdownChan: + case <-ac.ctx.Done(): + select { + case <-t.Error(): + t.Close() + default: + } + return + case <-t.GoAway(): + // If GoAway happens without any network I/O error, ac is closed without shutting down the + // underlying transport (the transport will be closed when all the pending RPCs finished or + // failed.). + // If GoAway and some network I/O error happen concurrently, ac and its underlying transport + // are closed. + // In both cases, a new ac is created. + select { + case <-t.Error(): + ac.cc.resetAddrConn(ac.addr, true, errNetworkIO) + default: + ac.cc.resetAddrConn(ac.addr, true, errConnDrain) + } return case <-t.Error(): + select { + case <-ac.ctx.Done(): + t.Close() + return + case <-t.GoAway(): + ac.cc.resetAddrConn(ac.addr, true, errNetworkIO) + return + default: + } ac.mu.Lock() if ac.state == Shutdown { - // ac.tearDown(...) has been invoked. + // ac has been shutdown. ac.mu.Unlock() return } @@ -640,6 +765,10 @@ func (ac *addrConn) transportMonitor() { ac.printf("transport exiting: %v", err) ac.mu.Unlock() grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err) + if err != errConnClosing { + // Keep this ac in cc.conns, to get the reason it's torn down. + ac.tearDown(err) + } return } } @@ -647,35 +776,42 @@ func (ac *addrConn) transportMonitor() { } // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or -// iv) transport is in TransientFailure and the RPC is fail-fast. -func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTransport, error) { +// iv) transport is in TransientFailure and there's no balancer/failfast is true. +func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) { for { ac.mu.Lock() switch { case ac.state == Shutdown: + if failfast || !hasBalancer { + // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr. + err := ac.tearDownErr + ac.mu.Unlock() + return nil, err + } ac.mu.Unlock() return nil, errConnClosing case ac.state == Ready: ct := ac.transport ac.mu.Unlock() return ct, nil - case ac.state == TransientFailure && failFast: - ac.mu.Unlock() - return nil, Errorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure") - default: - ready := ac.ready - if ready == nil { - ready = make(chan struct{}) - ac.ready = ready - } - ac.mu.Unlock() - select { - case <-ctx.Done(): - return nil, toRPCErr(ctx.Err()) - // Wait until the new transport is ready or failed. - case <-ready: + case ac.state == TransientFailure: + if failfast || hasBalancer { + ac.mu.Unlock() + return nil, errConnUnavailable } } + ready := ac.ready + if ready == nil { + ready = make(chan struct{}) + ac.ready = ready + } + ac.mu.Unlock() + select { + case <-ctx.Done(): + return nil, toRPCErr(ctx.Err()) + // Wait until the new transport is ready or failed. + case <-ready: + } } } @@ -683,24 +819,28 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in // some edge cases (e.g., the caller opens and closes many addrConn's in a // tight loop. +// tearDown doesn't remove ac from ac.cc.conns. func (ac *addrConn) tearDown(err error) { + ac.cancel() + ac.mu.Lock() - defer func() { - ac.mu.Unlock() - ac.cc.mu.Lock() - if ac.cc.conns != nil { - delete(ac.cc.conns, ac.addr) - } - ac.cc.mu.Unlock() - }() - if ac.state == Shutdown { - return - } - ac.state = Shutdown + defer ac.mu.Unlock() if ac.down != nil { ac.down(downErrorf(false, false, "%v", err)) ac.down = nil } + if err == errConnDrain && ac.transport != nil { + // GracefulClose(...) may be executed multiple times when + // i) receiving multiple GoAway frames from the server; or + // ii) there are concurrent name resolver/Balancer triggered + // address removal and GoAway. + ac.transport.GracefulClose() + } + if ac.state == Shutdown { + return + } + ac.state = Shutdown + ac.tearDownErr = err ac.stateCV.Broadcast() if ac.events != nil { ac.events.Finish() @@ -710,15 +850,8 @@ func (ac *addrConn) tearDown(err error) { close(ac.ready) ac.ready = nil } - if ac.transport != nil { - if err == errConnDrain { - ac.transport.GracefulClose() - } else { - ac.transport.Close() - } - } - if ac.shutdownChan != nil { - close(ac.shutdownChan) + if ac.transport != nil && err != errConnDrain { + ac.transport.Close() } return } |