Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--  vendor/google.golang.org/grpc/clientconn.go | 359
1 file changed, 246 insertions(+), 113 deletions(-)
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index c3c7691..1d3b46c 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -43,7 +43,6 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/trace"
- "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/transport"
@@ -68,13 +67,15 @@ var (
// errCredentialsConflict indicates that grpc.WithTransportCredentials()
// and grpc.WithInsecure() are both called for a connection.
errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
- // errNetworkIP indicates that the connection is down due to some network I/O error.
+ // errNetworkIO indicates that the connection is down due to some network I/O error.
errNetworkIO = errors.New("grpc: failed with network I/O error")
// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
errConnClosing = errors.New("grpc: the connection is closing")
- errNoAddr = errors.New("grpc: there is no address available to dial")
+ // errConnUnavailable indicates that the connection is unavailable.
+ errConnUnavailable = errors.New("grpc: the connection is unavailable")
+ errNoAddr = errors.New("grpc: there is no address available to dial")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
@@ -196,9 +197,14 @@ func WithTimeout(d time.Duration) DialOption {
}
// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
-func WithDialer(f func(addr string, timeout time.Duration) (net.Conn, error)) DialOption {
+func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
return func(o *dialOptions) {
- o.copts.Dialer = f
+ o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
+ if deadline, ok := ctx.Deadline(); ok {
+ return f(addr, deadline.Sub(time.Now()))
+ }
+ return f(addr, 0)
+ }
}
}
@@ -209,12 +215,34 @@ func WithUserAgent(s string) DialOption {
}
}
-// Dial creates a client connection the given target.
+// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
+ return DialContext(context.Background(), target, opts...)
+}
+
+// DialContext creates a client connection to the given target. ctx can be used to
+// cancel or expire the pending connection. Once this function returns, the
+// cancellation and expiration of ctx will be a no-op. Users should call ClientConn.Close
+// to terminate all the pending operations after this function returns.
+// This is the EXPERIMENTAL API.
+func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
conns: make(map[Address]*addrConn),
}
+ cc.ctx, cc.cancel = context.WithCancel(context.Background())
+ defer func() {
+ select {
+ case <-ctx.Done():
+ conn, err = nil, ctx.Err()
+ default:
+ }
+
+ if err != nil {
+ cc.Close()
+ }
+ }()
+
for _, opt := range opts {
opt(&cc.dopts)
}
@@ -226,31 +254,33 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
if cc.dopts.bs == nil {
cc.dopts.bs = DefaultBackoffConfig
}
- if cc.dopts.balancer == nil {
- cc.dopts.balancer = RoundRobin(nil)
- }
- if err := cc.dopts.balancer.Start(target); err != nil {
- return nil, err
- }
var (
ok bool
addrs []Address
)
- ch := cc.dopts.balancer.Notify()
- if ch == nil {
- // There is no name resolver installed.
+ if cc.dopts.balancer == nil {
+ // Connect to target directly if balancer is nil.
addrs = append(addrs, Address{Addr: target})
} else {
- addrs, ok = <-ch
- if !ok || len(addrs) == 0 {
- return nil, errNoAddr
+ if err := cc.dopts.balancer.Start(target); err != nil {
+ return nil, err
+ }
+ ch := cc.dopts.balancer.Notify()
+ if ch == nil {
+ // There is no name resolver installed.
+ addrs = append(addrs, Address{Addr: target})
+ } else {
+ addrs, ok = <-ch
+ if !ok || len(addrs) == 0 {
+ return nil, errNoAddr
+ }
}
}
waitC := make(chan error, 1)
go func() {
for _, a := range addrs {
- if err := cc.newAddrConn(a, false); err != nil {
+ if err := cc.resetAddrConn(a, false, nil); err != nil {
waitC <- err
return
}
@@ -262,15 +292,17 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
timeoutCh = time.After(cc.dopts.timeout)
}
select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
case err := <-waitC:
if err != nil {
- cc.Close()
return nil, err
}
case <-timeoutCh:
- cc.Close()
return nil, ErrClientConnTimeout
}
+ // If balancer is nil or balancer.Notify() is nil, ok will be false here.
+ // The lbWatcher goroutine will not be created.
if ok {
go cc.lbWatcher()
}
@@ -317,6 +349,9 @@ func (s ConnectivityState) String() string {
// ClientConn represents a client connection to an RPC server.
type ClientConn struct {
+ ctx context.Context
+ cancel context.CancelFunc
+
target string
authority string
dopts dialOptions
@@ -347,11 +382,12 @@ func (cc *ClientConn) lbWatcher() {
}
if !keep {
del = append(del, c)
+ delete(cc.conns, c.addr)
}
}
cc.mu.Unlock()
for _, a := range add {
- cc.newAddrConn(a, true)
+ cc.resetAddrConn(a, true, nil)
}
for _, c := range del {
c.tearDown(errConnDrain)
@@ -359,13 +395,17 @@ func (cc *ClientConn) lbWatcher() {
}
}
-func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
+// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
+// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
+// If tearDownErr is nil, errConnDrain will be used instead.
+func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
ac := &addrConn{
- cc: cc,
- addr: addr,
- dopts: cc.dopts,
- shutdownChan: make(chan struct{}),
+ cc: cc,
+ addr: addr,
+ dopts: cc.dopts,
}
+ ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
+ ac.stateCV = sync.NewCond(&ac.mu)
if EnableTracing {
ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
}
@@ -383,26 +423,44 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
}
}
}
- // Insert ac into ac.cc.conns. This needs to be done before any getTransport(...) is called.
- ac.cc.mu.Lock()
- if ac.cc.conns == nil {
- ac.cc.mu.Unlock()
+ // Track ac in cc. This needs to be done before any getTransport(...) is called.
+ cc.mu.Lock()
+ if cc.conns == nil {
+ cc.mu.Unlock()
return ErrClientConnClosing
}
- stale := ac.cc.conns[ac.addr]
- ac.cc.conns[ac.addr] = ac
- ac.cc.mu.Unlock()
+ stale := cc.conns[ac.addr]
+ cc.conns[ac.addr] = ac
+ cc.mu.Unlock()
if stale != nil {
// There is an addrConn alive on ac.addr already. This could be due to
- // i) stale's Close is undergoing;
- // ii) a buggy Balancer notifies duplicated Addresses.
- stale.tearDown(errConnDrain)
+ // 1) a buggy Balancer notifies duplicated Addresses;
+ // 2) goaway was received, a new ac will replace the old ac.
+ // The old ac should be deleted from cc.conns, but the
+ // underlying transport should drain rather than close.
+ if tearDownErr == nil {
+ // tearDownErr is nil if resetAddrConn is called by
+ // 1) Dial
+ // 2) lbWatcher
+ // In both cases, the stale ac should drain, not close.
+ stale.tearDown(errConnDrain)
+ } else {
+ stale.tearDown(tearDownErr)
+ }
}
- ac.stateCV = sync.NewCond(&ac.mu)
// skipWait may overwrite the decision in ac.dopts.block.
if ac.dopts.block && !skipWait {
if err := ac.resetTransport(false); err != nil {
- ac.tearDown(err)
+ if err != errConnClosing {
+ // Tear down ac and delete it from cc.conns.
+ cc.mu.Lock()
+ delete(cc.conns, ac.addr)
+ cc.mu.Unlock()
+ ac.tearDown(err)
+ }
+ if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
+ return e.Origin()
+ }
return err
}
// Start to monitor the error status of transport.
@@ -412,7 +470,10 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
go func() {
if err := ac.resetTransport(false); err != nil {
grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
- ac.tearDown(err)
+ if err != errConnClosing {
+ // Keep this ac in cc.conns, to get the reason it's torn down.
+ ac.tearDown(err)
+ }
return
}
ac.transportMonitor()
@@ -422,24 +483,48 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
}
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
- addr, put, err := cc.dopts.balancer.Get(ctx, opts)
- if err != nil {
- return nil, nil, toRPCErr(err)
- }
- cc.mu.RLock()
- if cc.conns == nil {
+ var (
+ ac *addrConn
+ ok bool
+ put func()
+ )
+ if cc.dopts.balancer == nil {
+ // If balancer is nil, there should be only one addrConn available.
+ cc.mu.RLock()
+ if cc.conns == nil {
+ cc.mu.RUnlock()
+ return nil, nil, toRPCErr(ErrClientConnClosing)
+ }
+ for _, ac = range cc.conns {
+ // Break after the first iteration to get the first addrConn.
+ ok = true
+ break
+ }
+ cc.mu.RUnlock()
+ } else {
+ var (
+ addr Address
+ err error
+ )
+ addr, put, err = cc.dopts.balancer.Get(ctx, opts)
+ if err != nil {
+ return nil, nil, toRPCErr(err)
+ }
+ cc.mu.RLock()
+ if cc.conns == nil {
+ cc.mu.RUnlock()
+ return nil, nil, toRPCErr(ErrClientConnClosing)
+ }
+ ac, ok = cc.conns[addr]
cc.mu.RUnlock()
- return nil, nil, toRPCErr(ErrClientConnClosing)
}
- ac, ok := cc.conns[addr]
- cc.mu.RUnlock()
if !ok {
if put != nil {
put()
}
- return nil, nil, Errorf(codes.Internal, "grpc: failed to find the transport to send the rpc")
+ return nil, nil, errConnClosing
}
- t, err := ac.wait(ctx, !opts.BlockingWait)
+ t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
if err != nil {
if put != nil {
put()
@@ -451,6 +536,8 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
// Close tears down the ClientConn and all underlying connections.
func (cc *ClientConn) Close() error {
+ cc.cancel()
+
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
@@ -459,7 +546,9 @@ func (cc *ClientConn) Close() error {
conns := cc.conns
cc.conns = nil
cc.mu.Unlock()
- cc.dopts.balancer.Close()
+ if cc.dopts.balancer != nil {
+ cc.dopts.balancer.Close()
+ }
for _, ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
@@ -468,11 +557,13 @@ func (cc *ClientConn) Close() error {
// addrConn is a network connection to a given address.
type addrConn struct {
- cc *ClientConn
- addr Address
- dopts dialOptions
- shutdownChan chan struct{}
- events trace.EventLog
+ ctx context.Context
+ cancel context.CancelFunc
+
+ cc *ClientConn
+ addr Address
+ dopts dialOptions
+ events trace.EventLog
mu sync.Mutex
state ConnectivityState
@@ -482,6 +573,9 @@ type addrConn struct {
// due to timeout.
ready chan struct{}
transport transport.ClientTransport
+
+ // The reason this addrConn is torn down.
+ tearDownErr error
}
// printf records an event in ac's event log, unless ac has been closed.
@@ -537,8 +631,7 @@ func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState Connecti
}
func (ac *addrConn) resetTransport(closeTransport bool) error {
- var retries int
- for {
+ for retries := 0; ; retries++ {
ac.mu.Lock()
ac.printf("connecting")
if ac.state == Shutdown {
@@ -558,13 +651,20 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
t.Close()
}
sleepTime := ac.dopts.bs.backoff(retries)
- ac.dopts.copts.Timeout = sleepTime
- if sleepTime < minConnectTimeout {
- ac.dopts.copts.Timeout = minConnectTimeout
+ timeout := minConnectTimeout
+ if timeout < sleepTime {
+ timeout = sleepTime
}
+ ctx, cancel := context.WithTimeout(ac.ctx, timeout)
connectTime := time.Now()
- newTransport, err := transport.NewClientTransport(ac.addr.Addr, &ac.dopts.copts)
+ newTransport, err := transport.NewClientTransport(ctx, ac.addr.Addr, ac.dopts.copts)
if err != nil {
+ cancel()
+
+ if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
+ return err
+ }
+ grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr)
ac.mu.Lock()
if ac.state == Shutdown {
// ac.tearDown(...) has been invoked.
@@ -579,17 +679,12 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
ac.ready = nil
}
ac.mu.Unlock()
- sleepTime -= time.Since(connectTime)
- if sleepTime < 0 {
- sleepTime = 0
- }
closeTransport = false
select {
- case <-time.After(sleepTime):
- case <-ac.shutdownChan:
+ case <-time.After(sleepTime - time.Since(connectTime)):
+ case <-ac.ctx.Done():
+ return ac.ctx.Err()
}
- retries++
- grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr)
continue
}
ac.mu.Lock()
@@ -607,7 +702,9 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
close(ac.ready)
ac.ready = nil
}
- ac.down = ac.cc.dopts.balancer.Up(ac.addr)
+ if ac.cc.dopts.balancer != nil {
+ ac.down = ac.cc.dopts.balancer.Up(ac.addr)
+ }
ac.mu.Unlock()
return nil
}
@@ -621,14 +718,42 @@ func (ac *addrConn) transportMonitor() {
t := ac.transport
ac.mu.Unlock()
select {
- // shutdownChan is needed to detect the teardown when
+ // This is needed to detect the teardown when
// the addrConn is idle (i.e., no RPC in flight).
- case <-ac.shutdownChan:
+ case <-ac.ctx.Done():
+ select {
+ case <-t.Error():
+ t.Close()
+ default:
+ }
+ return
+ case <-t.GoAway():
+ // If GoAway happens without any network I/O error, ac is closed without shutting down the
+ // underlying transport (the transport will be closed when all the pending RPCs have finished
+ // or failed).
+ // If GoAway and some network I/O error happen concurrently, ac and its underlying transport
+ // are closed.
+ // In both cases, a new ac is created.
+ select {
+ case <-t.Error():
+ ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
+ default:
+ ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
+ }
return
case <-t.Error():
+ select {
+ case <-ac.ctx.Done():
+ t.Close()
+ return
+ case <-t.GoAway():
+ ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
+ return
+ default:
+ }
ac.mu.Lock()
if ac.state == Shutdown {
- // ac.tearDown(...) has been invoked.
+ // ac has been shutdown.
ac.mu.Unlock()
return
}
@@ -640,6 +765,10 @@ func (ac *addrConn) transportMonitor() {
ac.printf("transport exiting: %v", err)
ac.mu.Unlock()
grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
+ if err != errConnClosing {
+ // Keep this ac in cc.conns, to get the reason it's torn down.
+ ac.tearDown(err)
+ }
return
}
}
@@ -647,35 +776,42 @@ func (ac *addrConn) transportMonitor() {
}
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
-// iv) transport is in TransientFailure and the RPC is fail-fast.
-func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTransport, error) {
+// iv) transport is in TransientFailure and there is a balancer or failfast is true.
+func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
for {
ac.mu.Lock()
switch {
case ac.state == Shutdown:
+ if failfast || !hasBalancer {
+ // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
+ err := ac.tearDownErr
+ ac.mu.Unlock()
+ return nil, err
+ }
ac.mu.Unlock()
return nil, errConnClosing
case ac.state == Ready:
ct := ac.transport
ac.mu.Unlock()
return ct, nil
- case ac.state == TransientFailure && failFast:
- ac.mu.Unlock()
- return nil, Errorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure")
- default:
- ready := ac.ready
- if ready == nil {
- ready = make(chan struct{})
- ac.ready = ready
- }
- ac.mu.Unlock()
- select {
- case <-ctx.Done():
- return nil, toRPCErr(ctx.Err())
- // Wait until the new transport is ready or failed.
- case <-ready:
+ case ac.state == TransientFailure:
+ if failfast || hasBalancer {
+ ac.mu.Unlock()
+ return nil, errConnUnavailable
}
}
+ ready := ac.ready
+ if ready == nil {
+ ready = make(chan struct{})
+ ac.ready = ready
+ }
+ ac.mu.Unlock()
+ select {
+ case <-ctx.Done():
+ return nil, toRPCErr(ctx.Err())
+ // Wait until the new transport is ready or failed.
+ case <-ready:
+ }
}
}
@@ -683,24 +819,28 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop).
+// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
+ ac.cancel()
+
ac.mu.Lock()
- defer func() {
- ac.mu.Unlock()
- ac.cc.mu.Lock()
- if ac.cc.conns != nil {
- delete(ac.cc.conns, ac.addr)
- }
- ac.cc.mu.Unlock()
- }()
- if ac.state == Shutdown {
- return
- }
- ac.state = Shutdown
+ defer ac.mu.Unlock()
if ac.down != nil {
ac.down(downErrorf(false, false, "%v", err))
ac.down = nil
}
+ if err == errConnDrain && ac.transport != nil {
+ // GracefulClose(...) may be executed multiple times when
+ // i) receiving multiple GoAway frames from the server; or
+ // ii) there are concurrent name resolver/Balancer triggered
+ // address removal and GoAway.
+ ac.transport.GracefulClose()
+ }
+ if ac.state == Shutdown {
+ return
+ }
+ ac.state = Shutdown
+ ac.tearDownErr = err
ac.stateCV.Broadcast()
if ac.events != nil {
ac.events.Finish()
@@ -710,15 +850,8 @@ func (ac *addrConn) tearDown(err error) {
close(ac.ready)
ac.ready = nil
}
- if ac.transport != nil {
- if err == errConnDrain {
- ac.transport.GracefulClose()
- } else {
- ac.transport.Close()
- }
- }
- if ac.shutdownChan != nil {
- close(ac.shutdownChan)
+ if ac.transport != nil && err != errConnDrain {
+ ac.transport.Close()
}
return
}
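
For orientation, below is a minimal usage sketch of the DialContext path introduced by this diff. The target address "127.0.0.1:50051", the 5-second deadline, and the particular dial options are assumptions made for the sketch, not part of the change. It illustrates two points from the diff: cancelling ctx only affects the pending dial (after DialContext returns, the connection is released with ClientConn.Close), and a timeout-style dialer passed via WithDialer is wrapped internally into the context-based dialer used by the transport.

// Minimal, illustrative caller of the new DialContext API (not part of this
// change). Address, deadline, and option choices are assumptions.
package main

import (
	"log"
	"net"
	"time"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
)

func main() {
	// Bound only the connection attempt; once DialContext returns, cancelling
	// ctx is a no-op and the ClientConn must be shut down with Close.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	conn, err := grpc.DialContext(ctx, "127.0.0.1:50051",
		grpc.WithInsecure(),
		// Block until the first transport is up or ctx expires.
		grpc.WithBlock(),
		// WithDialer keeps its (addr, timeout) signature; per the diff above it
		// is adapted to the context-based transport dialer.
		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
			return net.DialTimeout("tcp", addr, timeout)
		}),
	)
	if err != nil {
		log.Fatalf("grpc: dial %s failed: %v", "127.0.0.1:50051", err)
	}
	defer conn.Close()
	// conn is now ready to back generated client stubs.
}

With WithBlock set, this sketch exercises the new select on ctx.Done() in the dial loop above: DialContext returns either a ready ClientConn or ctx.Err() if the deadline expires first.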