aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/clientconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go986
1 files changed, 727 insertions, 259 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index a34bd98..84ba9e5 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -31,24 +31,54 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/balancer"
+ _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal"
+ "google.golang.org/grpc/internal/backoff"
+ "google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
+ _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
+ _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
"google.golang.org/grpc/stats"
+ "google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)
+const (
+ // minimum time to give a connection to complete
+ minConnectTimeout = 20 * time.Second
+ // must match grpclbName in grpclb/grpclb.go
+ grpclbName = "grpclb"
+)
+
var (
// ErrClientConnClosing indicates that the operation is illegal because
// the ClientConn is closing.
- ErrClientConnClosing = errors.New("grpc: the client connection is closing")
- // ErrClientConnTimeout indicates that the ClientConn cannot establish the
- // underlying connections within the specified timeout.
- // DEPRECATED: Please use context.DeadlineExceeded instead.
- ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
+ //
+ // Deprecated: this error should not be relied upon by users; use the status
+ // code of Canceled instead.
+ ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
+ // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
+ errConnDrain = errors.New("grpc: the connection is drained")
+ // errConnClosing indicates that the connection is closing.
+ errConnClosing = errors.New("grpc: the connection is closing")
+ // errConnUnavailable indicates that the connection is unavailable.
+ errConnUnavailable = errors.New("grpc: the connection is unavailable")
+ // errBalancerClosed indicates that the balancer is closed.
+ errBalancerClosed = errors.New("grpc: balancer is closed")
+ // We use an accessor so that minConnectTimeout can be
+ // atomically read and updated while testing.
+ getMinConnectTimeout = func() time.Duration {
+ return minConnectTimeout
+ }
+)
+// The following errors are returned from Dial and DialContext
+var (
// errNoTransportSecurity indicates that there is no transport security
// being set for ClientConn. Users should either set one or explicitly
// call WithInsecure DialOption to disable security.
@@ -62,16 +92,6 @@ var (
errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
// errNetworkIO indicates that the connection is down due to some network I/O error.
errNetworkIO = errors.New("grpc: failed with network I/O error")
- // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
- errConnDrain = errors.New("grpc: the connection is drained")
- // errConnClosing indicates that the connection is closing.
- errConnClosing = errors.New("grpc: the connection is closing")
- // errConnUnavailable indicates that the connection is unavailable.
- errConnUnavailable = errors.New("grpc: the connection is unavailable")
- // errBalancerClosed indicates that the balancer is closed.
- errBalancerClosed = errors.New("grpc: balancer is closed")
- // minimum time to give a connection to complete
- minConnectTimeout = 20 * time.Second
)
// dialOptions configure a Dial call. dialOptions are set by the DialOption
@@ -79,18 +99,23 @@ var (
type dialOptions struct {
unaryInt UnaryClientInterceptor
streamInt StreamClientInterceptor
- codec Codec
cp Compressor
dc Decompressor
- bs backoffStrategy
+ bs backoff.Strategy
block bool
insecure bool
timeout time.Duration
scChan <-chan ServiceConfig
copts transport.ConnectOptions
callOptions []CallOption
- // This is to support v1 balancer.
+ // This is used by v1 balancer dial option WithBalancer to support v1
+ // balancer, and also by WithBalancerName dial option.
balancerBuilder balancer.Builder
+ // This is to support grpclb.
+ resolverBuilder resolver.Builder
+ waitForHandshake bool
+ channelzParentID int64
+ disableServiceConfig bool
}
const (
@@ -98,9 +123,24 @@ const (
defaultClientMaxSendMessageSize = math.MaxInt32
)
+// RegisterChannelz turns on channelz service.
+// This is an EXPERIMENTAL API.
+func RegisterChannelz() {
+ channelz.TurnOn()
+}
+
// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)
+// WithWaitForHandshake blocks until the initial settings frame is received from the
+// server before assigning RPCs to the connection.
+// Experimental API.
+func WithWaitForHandshake() DialOption {
+ return func(o *dialOptions) {
+ o.waitForHandshake = true
+ }
+}
+
// WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
// before doing a write on the wire.
func WithWriteBufferSize(s int) DialOption {
@@ -133,7 +173,9 @@ func WithInitialConnWindowSize(s int32) DialOption {
}
}
-// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
+// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive.
+//
+// Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
func WithMaxMsgSize(s int) DialOption {
return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
}
@@ -146,22 +188,32 @@ func WithDefaultCallOptions(cos ...CallOption) DialOption {
}
// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
+//
+// Deprecated: use WithDefaultCallOptions(CallCustomCodec(c)) instead.
func WithCodec(c Codec) DialOption {
- return func(o *dialOptions) {
- o.codec = c
- }
+ return WithDefaultCallOptions(CallCustomCodec(c))
}
-// WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
-// compressor.
+// WithCompressor returns a DialOption which sets a Compressor to use for
+// message compression. It has lower priority than the compressor set by
+// the UseCompressor CallOption.
+//
+// Deprecated: use UseCompressor instead.
func WithCompressor(cp Compressor) DialOption {
return func(o *dialOptions) {
o.cp = cp
}
}
-// WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
-// message decompressor.
+// WithDecompressor returns a DialOption which sets a Decompressor to use for
+// incoming message decompression. If incoming response messages are encoded
+// using the decompressor's Type(), it will be used. Otherwise, the message
+// encoding will be used to look up the compressor registered via
+// encoding.RegisterCompressor, which will then be used to decompress the
+// message. If no compressor is registered for the encoding, an Unimplemented
+// status error will be returned.
+//
+// Deprecated: use encoding.RegisterCompressor instead.
func WithDecompressor(dc Decompressor) DialOption {
return func(o *dialOptions) {
o.dc = dc
@@ -170,7 +222,8 @@ func WithDecompressor(dc Decompressor) DialOption {
// WithBalancer returns a DialOption which sets a load balancer with the v1 API.
// Name resolver will be ignored if this DialOption is specified.
-// Deprecated: use the new balancer APIs in balancer package instead.
+//
+// Deprecated: use the new balancer APIs in balancer package and WithBalancerName.
func WithBalancer(b Balancer) DialOption {
return func(o *dialOptions) {
o.balancerBuilder = &balancerWrapperBuilder{
@@ -179,16 +232,35 @@ func WithBalancer(b Balancer) DialOption {
}
}
-// WithBalancerBuilder is for testing only. Users using custom balancers should
-// register their balancer and use service config to choose the balancer to use.
-func WithBalancerBuilder(b balancer.Builder) DialOption {
- // TODO(bar) remove this when switching balancer is done.
+// WithBalancerName sets the balancer that the ClientConn will be initialized
+// with. Balancer registered with balancerName will be used. This function
+// panics if no balancer was registered by balancerName.
+//
+// The balancer cannot be overridden by balancer option specified by service
+// config.
+//
+// This is an EXPERIMENTAL API.
+func WithBalancerName(balancerName string) DialOption {
+ builder := balancer.Get(balancerName)
+ if builder == nil {
+ panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
+ }
+ return func(o *dialOptions) {
+ o.balancerBuilder = builder
+ }
+}
+
+// withResolverBuilder is only for grpclb.
+func withResolverBuilder(b resolver.Builder) DialOption {
return func(o *dialOptions) {
- o.balancerBuilder = b
+ o.resolverBuilder = b
}
}
// WithServiceConfig returns a DialOption which has a channel to read the service configuration.
+//
+// Deprecated: service config should be received through name resolver, as specified here.
+// https://github.com/grpc/grpc/blob/master/doc/service_config.md
func WithServiceConfig(c <-chan ServiceConfig) DialOption {
return func(o *dialOptions) {
o.scChan = c
@@ -207,17 +279,17 @@ func WithBackoffMaxDelay(md time.Duration) DialOption {
// Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
// for use.
func WithBackoffConfig(b BackoffConfig) DialOption {
- // Set defaults to ensure that provided BackoffConfig is valid and
- // unexported fields get default values.
- setDefaults(&b)
- return withBackoff(b)
+
+ return withBackoff(backoff.Exponential{
+ MaxDelay: b.MaxDelay,
+ })
}
-// withBackoff sets the backoff strategy used for retries after a
+// withBackoff sets the backoff strategy used for connectRetryNum after a
// failed connection attempt.
//
// This can be exported if arbitrary backoff strategies are allowed by gRPC.
-func withBackoff(bs backoffStrategy) DialOption {
+func withBackoff(bs backoff.Strategy) DialOption {
return func(o *dialOptions) {
o.bs = bs
}
@@ -258,6 +330,7 @@ func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
// initially. This is valid if and only if WithBlock() is present.
+//
// Deprecated: use DialContext and context.WithTimeout instead.
func WithTimeout(d time.Duration) DialOption {
return func(o *dialOptions) {
@@ -265,18 +338,28 @@ func WithTimeout(d time.Duration) DialOption {
}
}
+func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
+ return func(o *dialOptions) {
+ o.copts.Dialer = f
+ }
+}
+
+func init() {
+ internal.WithContextDialer = withContextDialer
+ internal.WithResolverBuilder = withResolverBuilder
+}
+
// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
// Temporary() method to decide if it should try to reconnect to the network address.
func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
- return func(o *dialOptions) {
- o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
+ return withContextDialer(
+ func(ctx context.Context, addr string) (net.Conn, error) {
if deadline, ok := ctx.Deadline(); ok {
return f(addr, deadline.Sub(time.Now()))
}
return f(addr, 0)
- }
- }
+ })
}
// WithStatsHandler returns a DialOption that specifies the stats handler
@@ -335,15 +418,44 @@ func WithAuthority(a string) DialOption {
}
}
+// WithChannelzParentID returns a DialOption that specifies the channelz ID of current ClientConn's
+// parent. This function is used in nested channel creation (e.g. grpclb dial).
+func WithChannelzParentID(id int64) DialOption {
+ return func(o *dialOptions) {
+ o.channelzParentID = id
+ }
+}
+
+// WithDisableServiceConfig returns a DialOption that causes grpc to ignore any
+// service config provided by the resolver and provides a hint to the resolver
+// to not fetch service configs.
+func WithDisableServiceConfig() DialOption {
+ return func(o *dialOptions) {
+ o.disableServiceConfig = true
+ }
+}
+
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
-// DialContext creates a client connection to the given target. ctx can be used to
-// cancel or expire the pending connection. Once this function returns, the
-// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
-// to terminate all the pending operations after this function returns.
+// DialContext creates a client connection to the given target. By default, it's
+// a non-blocking dial (the function won't wait for connections to be
+// established, and connecting happens in the background). To make it a blocking
+// dial, use WithBlock() dial option.
+//
+// In the non-blocking case, the ctx does not act against the connection. It
+// only controls the setup steps.
+//
+// In the blocking case, ctx can be used to cancel or expire the pending
+// connection. Once this function returns, the cancellation and expiration of
+// ctx will be noop. Users should call ClientConn.Close to terminate all the
+// pending operations after this function returns.
+//
+// The target name syntax is defined in
+// https://github.com/grpc/grpc/blob/master/doc/naming.md.
+// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
@@ -358,6 +470,14 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
opt(&cc.dopts)
}
+ if channelz.IsOn() {
+ if cc.dopts.channelzParentID != 0 {
+ cc.channelzID = channelz.RegisterChannel(cc, cc.dopts.channelzParentID, target)
+ } else {
+ cc.channelzID = channelz.RegisterChannel(cc, 0, target)
+ }
+ }
+
if !cc.dopts.insecure {
if cc.dopts.copts.TransportCredentials == nil {
return nil, errNoTransportSecurity
@@ -378,7 +498,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if cc.dopts.copts.Dialer == nil {
cc.dopts.copts.Dialer = newProxyDialer(
func(ctx context.Context, addr string) (net.Conn, error) {
- return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
+ network, addr := parseDialTarget(addr)
+ return dialContext(ctx, network, addr)
},
)
}
@@ -419,12 +540,29 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
default:
}
}
- // Set defaults.
- if cc.dopts.codec == nil {
- cc.dopts.codec = protoCodec{}
- }
if cc.dopts.bs == nil {
- cc.dopts.bs = DefaultBackoffConfig
+ cc.dopts.bs = backoff.Exponential{
+ MaxDelay: DefaultBackoffConfig.MaxDelay,
+ }
+ }
+ if cc.dopts.resolverBuilder == nil {
+ // Only try to parse target when resolver builder is not already set.
+ cc.parsedTarget = parseTarget(cc.target)
+ grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
+ cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
+ if cc.dopts.resolverBuilder == nil {
+ // If resolver builder is still nil, the parse target's scheme is
+ // not registered. Fallback to default resolver and set Endpoint to
+ // the original unparsed target.
+ grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
+ cc.parsedTarget = resolver.Target{
+ Scheme: resolver.GetDefaultScheme(),
+ Endpoint: target,
+ }
+ cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
+ }
+ } else {
+ cc.parsedTarget = resolver.Target{Endpoint: target}
}
creds := cc.dopts.copts.TransportCredentials
if creds != nil && creds.Info().ServerName != "" {
@@ -432,45 +570,11 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
} else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
cc.authority = cc.dopts.copts.Authority
} else {
- cc.authority = target
+ // Use endpoint from "scheme://authority/endpoint" as the default
+ // authority for ClientConn.
+ cc.authority = cc.parsedTarget.Endpoint
}
- if cc.dopts.balancerBuilder != nil {
- var credsClone credentials.TransportCredentials
- if creds != nil {
- credsClone = creds.Clone()
- }
- buildOpts := balancer.BuildOptions{
- DialCreds: credsClone,
- Dialer: cc.dopts.copts.Dialer,
- }
- // Build should not take long time. So it's ok to not have a goroutine for it.
- // TODO(bar) init balancer after first resolver result to support service config balancer.
- cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, buildOpts)
- } else {
- waitC := make(chan error, 1)
- go func() {
- defer close(waitC)
- // No balancer, or no resolver within the balancer. Connect directly.
- ac, err := cc.newAddrConn([]resolver.Address{{Addr: target}})
- if err != nil {
- waitC <- err
- return
- }
- if err := ac.connect(cc.dopts.block); err != nil {
- waitC <- err
- return
- }
- }()
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case err := <-waitC:
- if err != nil {
- return nil, err
- }
- }
- }
if cc.dopts.scChan != nil && !scSet {
// Blocking wait for the initial service config.
select {
@@ -486,19 +590,29 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
go cc.scWatcher()
}
+ var credsClone credentials.TransportCredentials
+ if creds := cc.dopts.copts.TransportCredentials; creds != nil {
+ credsClone = creds.Clone()
+ }
+ cc.balancerBuildOpts = balancer.BuildOptions{
+ DialCreds: credsClone,
+ Dialer: cc.dopts.copts.Dialer,
+ ChannelzParentID: cc.channelzID,
+ }
+
// Build the resolver.
cc.resolverWrapper, err = newCCResolverWrapper(cc)
if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err)
}
-
- if cc.balancerWrapper != nil && cc.resolverWrapper == nil {
- // TODO(bar) there should always be a resolver (DNS as the default).
- // Unblock balancer initialization with a fake resolver update if there's no resolver.
- // The balancer wrapper will not read the addresses, so an empty list works.
- // TODO(bar) remove this after the real resolver is started.
- cc.balancerWrapper.handleResolvedAddrs([]resolver.Address{}, nil)
- }
+ // Start the resolver wrapper goroutine after resolverWrapper is created.
+ //
+ // If the goroutine is started before resolverWrapper is ready, the
+ // following may happen: The goroutine sends updates to cc. cc forwards
+ // those to balancer. Balancer creates new addrConn. addrConn fails to
+ // connect, and calls resolveNow(). resolveNow() tries to use the non-ready
+ // resolverWrapper.
+ cc.resolverWrapper.start()
// A blocking dial blocks until the clientConn is ready.
if cc.dopts.block {
@@ -565,21 +679,33 @@ type ClientConn struct {
ctx context.Context
cancel context.CancelFunc
- target string
- authority string
- dopts dialOptions
- csMgr *connectivityStateManager
-
- balancerWrapper *ccBalancerWrapper
- resolverWrapper *ccResolverWrapper
+ target string
+ parsedTarget resolver.Target
+ authority string
+ dopts dialOptions
+ csMgr *connectivityStateManager
- blockingpicker *pickerWrapper
+ balancerBuildOpts balancer.BuildOptions
+ resolverWrapper *ccResolverWrapper
+ blockingpicker *pickerWrapper
mu sync.RWMutex
sc ServiceConfig
+ scRaw string
conns map[*addrConn]struct{}
// Keepalive parameter can be updated if a GoAway is received.
- mkp keepalive.ClientParameters
+ mkp keepalive.ClientParameters
+ curBalancerName string
+ preBalancerName string // previous balancer name.
+ curAddresses []resolver.Address
+ balancerWrapper *ccBalancerWrapper
+
+ channelzID int64 // channelz unique identification number
+ czmu sync.RWMutex
+ callsStarted int64
+ callsSucceeded int64
+ callsFailed int64
+ lastCallStartedTime time.Time
}
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
@@ -615,6 +741,7 @@ func (cc *ClientConn) scWatcher() {
// TODO: load balance policy runtime change is ignored.
// We may revist this decision in the future.
cc.sc = sc
+ cc.scRaw = ""
cc.mu.Unlock()
case <-cc.ctx.Done():
return
@@ -622,7 +749,115 @@ func (cc *ClientConn) scWatcher() {
}
}
+func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ if cc.conns == nil {
+ // cc was closed.
+ return
+ }
+
+ if reflect.DeepEqual(cc.curAddresses, addrs) {
+ return
+ }
+
+ cc.curAddresses = addrs
+
+ if cc.dopts.balancerBuilder == nil {
+ // Only look at balancer types and switch balancer if balancer dial
+ // option is not set.
+ var isGRPCLB bool
+ for _, a := range addrs {
+ if a.Type == resolver.GRPCLB {
+ isGRPCLB = true
+ break
+ }
+ }
+ var newBalancerName string
+ if isGRPCLB {
+ newBalancerName = grpclbName
+ } else {
+ // Address list doesn't contain grpclb address. Try to pick a
+ // non-grpclb balancer.
+ newBalancerName = cc.curBalancerName
+ // If current balancer is grpclb, switch to the previous one.
+ if newBalancerName == grpclbName {
+ newBalancerName = cc.preBalancerName
+ }
+ // The following could be true in two cases:
+ // - the first time handling resolved addresses
+ // (curBalancerName="")
+ // - the first time handling non-grpclb addresses
+ // (curBalancerName="grpclb", preBalancerName="")
+ if newBalancerName == "" {
+ newBalancerName = PickFirstBalancerName
+ }
+ }
+ cc.switchBalancer(newBalancerName)
+ } else if cc.balancerWrapper == nil {
+ // Balancer dial option was set, and this is the first time handling
+ // resolved addresses. Build a balancer with dopts.balancerBuilder.
+ cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
+ }
+
+ cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
+}
+
+// switchBalancer starts the switching from current balancer to the balancer
+// with the given name.
+//
+// It will NOT send the current address list to the new balancer. If needed,
+// caller of this function should send address list to the new balancer after
+// this function returns.
+//
+// Caller must hold cc.mu.
+func (cc *ClientConn) switchBalancer(name string) {
+ if cc.conns == nil {
+ return
+ }
+
+ if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
+ return
+ }
+
+ grpclog.Infof("ClientConn switching balancer to %q", name)
+ if cc.dopts.balancerBuilder != nil {
+ grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
+ return
+ }
+ // TODO(bar switching) change this to two steps: drain and close.
+ // Keep track of sc in wrapper.
+ if cc.balancerWrapper != nil {
+ cc.balancerWrapper.close()
+ }
+ // Clear all stickiness state.
+ cc.blockingpicker.clearStickinessState()
+
+ builder := balancer.Get(name)
+ if builder == nil {
+ grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
+ builder = newPickfirstBuilder()
+ }
+ cc.preBalancerName = cc.curBalancerName
+ cc.curBalancerName = builder.Name()
+ cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
+}
+
+func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
+ cc.mu.Lock()
+ if cc.conns == nil {
+ cc.mu.Unlock()
+ return
+ }
+ // TODO(bar switching) send updates to all balancer wrappers when balancer
+ // gracefully switching is supported.
+ cc.balancerWrapper.handleSubConnStateChange(sc, s)
+ cc.mu.Unlock()
+}
+
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
+//
+// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
ac := &addrConn{
cc: cc,
@@ -636,6 +871,9 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
cc.mu.Unlock()
return nil, ErrClientConnClosing
}
+ if channelz.IsOn() {
+ ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
+ }
cc.conns[ac] = struct{}{}
cc.mu.Unlock()
return ac, nil
@@ -654,12 +892,48 @@ func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
ac.tearDown(err)
}
+// ChannelzMetric returns ChannelInternalMetric of current ClientConn.
+// This is an EXPERIMENTAL API.
+func (cc *ClientConn) ChannelzMetric() *channelz.ChannelInternalMetric {
+ state := cc.GetState()
+ cc.czmu.RLock()
+ defer cc.czmu.RUnlock()
+ return &channelz.ChannelInternalMetric{
+ State: state,
+ Target: cc.target,
+ CallsStarted: cc.callsStarted,
+ CallsSucceeded: cc.callsSucceeded,
+ CallsFailed: cc.callsFailed,
+ LastCallStartedTimestamp: cc.lastCallStartedTime,
+ }
+}
+
+func (cc *ClientConn) incrCallsStarted() {
+ cc.czmu.Lock()
+ cc.callsStarted++
+ // TODO(yuxuanli): will make this a time.Time pointer improve performance?
+ cc.lastCallStartedTime = time.Now()
+ cc.czmu.Unlock()
+}
+
+func (cc *ClientConn) incrCallsSucceeded() {
+ cc.czmu.Lock()
+ cc.callsSucceeded++
+ cc.czmu.Unlock()
+}
+
+func (cc *ClientConn) incrCallsFailed() {
+ cc.czmu.Lock()
+ cc.callsFailed++
+ cc.czmu.Unlock()
+}
+
// connect starts to creating transport and also starts the transport monitor
// goroutine for this ac.
// It does nothing if the ac is not IDLE.
// TODO(bar) Move this to the addrConn section.
// This was part of resetAddrConn, keep it here to make the diff look clean.
-func (ac *addrConn) connect(block bool) error {
+func (ac *addrConn) connect() error {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
@@ -670,39 +944,21 @@ func (ac *addrConn) connect(block bool) error {
return nil
}
ac.state = connectivity.Connecting
- if ac.cc.balancerWrapper != nil {
- ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
- } else {
- ac.cc.csMgr.updateState(ac.state)
- }
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.mu.Unlock()
- if block {
+ // Start a goroutine connecting to the server asynchronously.
+ go func() {
if err := ac.resetTransport(); err != nil {
+ grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
if err != errConnClosing {
+ // Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
- if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
- return e.Origin()
- }
- return err
+ return
}
- // Start to monitor the error status of transport.
- go ac.transportMonitor()
- } else {
- // Start a goroutine connecting to the server asynchronously.
- go func() {
- if err := ac.resetTransport(); err != nil {
- grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
- if err != errConnClosing {
- // Keep this ac in cc.conns, to get the reason it's torn down.
- ac.tearDown(err)
- }
- return
- }
- ac.transportMonitor()
- }()
- }
+ ac.transportMonitor()
+ }()
return nil
}
@@ -731,6 +987,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
if curAddrFound {
ac.addrs = addrs
+ ac.reconnectIdx = 0 // Start reconnecting from beginning in the new list.
}
return curAddrFound
@@ -741,7 +998,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
// the corresponding MethodConfig.
// If there isn't an exact match for the input method, we look for the default config
// under the service (i.e /service/). If there is a default MethodConfig for
-// the serivce, we return it.
+// the service, we return it.
// Otherwise, we return an empty MethodConfig.
func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
// TODO: Avoid the locking here.
@@ -750,37 +1007,12 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
m, ok := cc.sc.Methods[method]
if !ok {
i := strings.LastIndex(method, "/")
- m, _ = cc.sc.Methods[method[:i+1]]
+ m = cc.sc.Methods[method[:i+1]]
}
return m
}
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool) (transport.ClientTransport, func(balancer.DoneInfo), error) {
- if cc.balancerWrapper == nil {
- // If balancer is nil, there should be only one addrConn available.
- cc.mu.RLock()
- if cc.conns == nil {
- cc.mu.RUnlock()
- // TODO this function returns toRPCErr and non-toRPCErr. Clean up
- // the errors in ClientConn.
- return nil, nil, toRPCErr(ErrClientConnClosing)
- }
- var ac *addrConn
- for ac = range cc.conns {
- // Break after the first iteration to get the first addrConn.
- break
- }
- cc.mu.RUnlock()
- if ac == nil {
- return nil, nil, errConnClosing
- }
- t, err := ac.wait(ctx, false /*hasBalancer*/, failfast)
- if err != nil {
- return nil, nil, err
- }
- return t, nil, nil
- }
-
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{})
if err != nil {
return nil, nil, toRPCErr(err)
@@ -788,9 +1020,61 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool) (transpor
return t, done, nil
}
+// handleServiceConfig parses the service config string in JSON format to Go native
+// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
+func (cc *ClientConn) handleServiceConfig(js string) error {
+ if cc.dopts.disableServiceConfig {
+ return nil
+ }
+ sc, err := parseServiceConfig(js)
+ if err != nil {
+ return err
+ }
+ cc.mu.Lock()
+ cc.scRaw = js
+ cc.sc = sc
+ if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
+ if cc.curBalancerName == grpclbName {
+ // If current balancer is grpclb, there's at least one grpclb
+ // balancer address in the resolved list. Don't switch the balancer,
+ // but change the previous balancer name, so if a new resolved
+ // address list doesn't contain grpclb address, balancer will be
+ // switched to *sc.LB.
+ cc.preBalancerName = *sc.LB
+ } else {
+ cc.switchBalancer(*sc.LB)
+ cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
+ }
+ }
+
+ if envConfigStickinessOn {
+ var newStickinessMDKey string
+ if sc.stickinessMetadataKey != nil && *sc.stickinessMetadataKey != "" {
+ newStickinessMDKey = *sc.stickinessMetadataKey
+ }
+ // newStickinessMDKey is "" if one of the following happens:
+ // - stickinessMetadataKey is set to ""
+ // - stickinessMetadataKey field doesn't exist in service config
+ cc.blockingpicker.updateStickinessMDKey(strings.ToLower(newStickinessMDKey))
+ }
+
+ cc.mu.Unlock()
+ return nil
+}
+
+func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
+ cc.mu.RLock()
+ r := cc.resolverWrapper
+ cc.mu.RUnlock()
+ if r == nil {
+ return
+ }
+ go r.resolveNow(o)
+}
+
// Close tears down the ClientConn and all underlying connections.
func (cc *ClientConn) Close() error {
- cc.cancel()
+ defer cc.cancel()
cc.mu.Lock()
if cc.conns == nil {
@@ -800,17 +1084,28 @@ func (cc *ClientConn) Close() error {
conns := cc.conns
cc.conns = nil
cc.csMgr.updateState(connectivity.Shutdown)
+
+ rWrapper := cc.resolverWrapper
+ cc.resolverWrapper = nil
+ bWrapper := cc.balancerWrapper
+ cc.balancerWrapper = nil
cc.mu.Unlock()
+
cc.blockingpicker.close()
- if cc.resolverWrapper != nil {
- cc.resolverWrapper.close()
+
+ if rWrapper != nil {
+ rWrapper.close()
}
- if cc.balancerWrapper != nil {
- cc.balancerWrapper.close()
+ if bWrapper != nil {
+ bWrapper.close()
}
+
for ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
+ if channelz.IsOn() {
+ channelz.RemoveEntry(cc.channelzID)
+ }
return nil
}
@@ -819,15 +1114,16 @@ type addrConn struct {
ctx context.Context
cancel context.CancelFunc
- cc *ClientConn
- curAddr resolver.Address
- addrs []resolver.Address
- dopts dialOptions
- events trace.EventLog
- acbw balancer.SubConn
+ cc *ClientConn
+ addrs []resolver.Address
+ dopts dialOptions
+ events trace.EventLog
+ acbw balancer.SubConn
- mu sync.Mutex
- state connectivity.State
+ mu sync.Mutex
+ curAddr resolver.Address
+ reconnectIdx int // The index in addrs list to start reconnecting from.
+ state connectivity.State
// ready is closed and becomes nil when a new transport is up or failed
// due to timeout.
ready chan struct{}
@@ -835,13 +1131,28 @@ type addrConn struct {
// The reason this addrConn is torn down.
tearDownErr error
+
+ connectRetryNum int
+ // backoffDeadline is the time until which resetTransport needs to
+ // wait before increasing connectRetryNum count.
+ backoffDeadline time.Time
+ // connectDeadline is the time by which all connection
+ // negotiations must complete.
+ connectDeadline time.Time
+
+ channelzID int64 // channelz unique identification number
+ czmu sync.RWMutex
+ callsStarted int64
+ callsSucceeded int64
+ callsFailed int64
+ lastCallStartedTime time.Time
}
// adjustParams updates parameters used to create transports upon
// receiving a GoAway.
func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
switch r {
- case transport.TooManyPings:
+ case transport.GoAwayTooManyPings:
v := 2 * ac.dopts.copts.KeepaliveParams.Time
ac.cc.mu.Lock()
if v > ac.cc.mkp.Time {
@@ -869,6 +1180,16 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {
// resetTransport recreates a transport to the address for ac. The old
// transport will close itself on error or when the clientconn is closed.
+// The created transport must receive initial settings frame from the server.
+// In case that doesn't happen, transportMonitor will kill the newly created
+// transport after connectDeadline has expired.
+// In case there was an error on the transport before the settings frame was
+// received, resetTransport resumes connecting to backends after the one that
+// was previously connected to. In case end of the list is reached, resetTransport
+// backs off until the original deadline.
+// If the DialOption WithWaitForHandshake was set, resetTrasport returns
+// successfully only after server settings are received.
+//
// TODO(bar) make sure all state transitions are valid.
func (ac *addrConn) resetTransport() error {
ac.mu.Lock()
@@ -876,135 +1197,222 @@ func (ac *addrConn) resetTransport() error {
ac.mu.Unlock()
return errConnClosing
}
- ac.state = connectivity.TransientFailure
- if ac.cc.balancerWrapper != nil {
- ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
- } else {
- ac.cc.csMgr.updateState(ac.state)
- }
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
ac.transport = nil
- ac.curAddr = resolver.Address{}
+ ridx := ac.reconnectIdx
ac.mu.Unlock()
ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock()
- for retries := 0; ; retries++ {
- sleepTime := ac.dopts.bs.backoff(retries)
- timeout := minConnectTimeout
+ var backoffDeadline, connectDeadline time.Time
+ for connectRetryNum := 0; ; connectRetryNum++ {
ac.mu.Lock()
- if timeout < time.Duration(int(sleepTime)/len(ac.addrs)) {
- timeout = time.Duration(int(sleepTime) / len(ac.addrs))
+ if ac.backoffDeadline.IsZero() {
+ // This means either a successful HTTP2 connection was established
+ // or this is the first time this addrConn is trying to establish a
+ // connection.
+ backoffFor := ac.dopts.bs.Backoff(connectRetryNum) // time.Duration.
+ // This will be the duration that dial gets to finish.
+ dialDuration := getMinConnectTimeout()
+ if backoffFor > dialDuration {
+ // Give dial more time as we keep failing to connect.
+ dialDuration = backoffFor
+ }
+ start := time.Now()
+ backoffDeadline = start.Add(backoffFor)
+ connectDeadline = start.Add(dialDuration)
+ ridx = 0 // Start connecting from the beginning.
+ } else {
+ // Continue trying to connect with the same deadlines.
+ connectRetryNum = ac.connectRetryNum
+ backoffDeadline = ac.backoffDeadline
+ connectDeadline = ac.connectDeadline
+ ac.backoffDeadline = time.Time{}
+ ac.connectDeadline = time.Time{}
+ ac.connectRetryNum = 0
}
- connectTime := time.Now()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
ac.printf("connecting")
- ac.state = connectivity.Connecting
- // TODO(bar) remove condition once we always have a balancer.
- if ac.cc.balancerWrapper != nil {
- ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
- } else {
- ac.cc.csMgr.updateState(ac.state)
+ if ac.state != connectivity.Connecting {
+ ac.state = connectivity.Connecting
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
// copy ac.addrs in case of race
addrsIter := make([]resolver.Address, len(ac.addrs))
copy(addrsIter, ac.addrs)
copts := ac.dopts.copts
ac.mu.Unlock()
- for _, addr := range addrsIter {
+ connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
+ if err != nil {
+ return err
+ }
+ if connected {
+ return nil
+ }
+ }
+}
+
+// createTransport creates a connection to one of the backends in addrs.
+// It returns true if a connection was established.
+func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error) {
+ for i := ridx; i < len(addrs); i++ {
+ addr := addrs[i]
+ target := transport.TargetInfo{
+ Addr: addr.Addr,
+ Metadata: addr.Metadata,
+ Authority: ac.cc.authority,
+ }
+ done := make(chan struct{})
+ onPrefaceReceipt := func() {
ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- // ac.tearDown(...) has been invoked.
- ac.mu.Unlock()
- return errConnClosing
+ close(done)
+ if !ac.backoffDeadline.IsZero() {
+ // If we haven't already started reconnecting to
+ // other backends.
+ // Note, this can happen when writer notices an error
+ // and triggers resetTransport while at the same time
+ // reader receives the preface and invokes this closure.
+ ac.backoffDeadline = time.Time{}
+ ac.connectDeadline = time.Time{}
+ ac.connectRetryNum = 0
}
ac.mu.Unlock()
- sinfo := transport.TargetInfo{
- Addr: addr.Addr,
- Metadata: addr.Metadata,
- }
- newTransport, err := transport.NewClientTransport(ac.cc.ctx, sinfo, copts, timeout)
- if err != nil {
- if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
- return err
- }
- grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, addr)
- ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- // ac.tearDown(...) has been invoked.
- ac.mu.Unlock()
- return errConnClosing
- }
- ac.mu.Unlock()
- continue
- }
+ }
+ // Do not cancel in the success path because of
+ // this issue in Go1.6: https://github.com/golang/go/issues/15078.
+ connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
+ if channelz.IsOn() {
+ copts.ChannelzParentID = ac.channelzID
+ }
+ newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt)
+ if err != nil {
+ cancel()
+ ac.cc.blockingpicker.updateConnectionError(err)
ac.mu.Lock()
- ac.printf("ready")
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
- newTransport.Close()
- return errConnClosing
- }
- ac.state = connectivity.Ready
- if ac.cc.balancerWrapper != nil {
- ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
- } else {
- ac.cc.csMgr.updateState(ac.state)
- }
- t := ac.transport
- ac.transport = newTransport
- if t != nil {
- t.Close()
- }
- ac.curAddr = addr
- if ac.ready != nil {
- close(ac.ready)
- ac.ready = nil
+ return false, errConnClosing
}
ac.mu.Unlock()
- return nil
+ grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
+ continue
+ }
+ if ac.dopts.waitForHandshake {
+ select {
+ case <-done:
+ case <-connectCtx.Done():
+ // Didn't receive server preface, must kill this new transport now.
+ grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.")
+ newTr.Close()
+ break
+ case <-ac.ctx.Done():
+ }
}
ac.mu.Lock()
- ac.state = connectivity.TransientFailure
- if ac.cc.balancerWrapper != nil {
- ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
- } else {
- ac.cc.csMgr.updateState(ac.state)
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ // ac.tearDonn(...) has been invoked.
+ newTr.Close()
+ return false, errConnClosing
}
+ ac.printf("ready")
+ ac.state = connectivity.Ready
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ ac.transport = newTr
+ ac.curAddr = addr
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
- ac.mu.Unlock()
- timer := time.NewTimer(sleepTime - time.Since(connectTime))
select {
- case <-timer.C:
- case <-ac.ctx.Done():
- timer.Stop()
- return ac.ctx.Err()
+ case <-done:
+ // If the server has responded back with preface already,
+ // don't set the reconnect parameters.
+ default:
+ ac.connectRetryNum = connectRetryNum
+ ac.backoffDeadline = backoffDeadline
+ ac.connectDeadline = connectDeadline
+ ac.reconnectIdx = i + 1 // Start reconnecting from the next backend in the list.
}
+ ac.mu.Unlock()
+ return true, nil
+ }
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return false, errConnClosing
+ }
+ ac.state = connectivity.TransientFailure
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ ac.cc.resolveNow(resolver.ResolveNowOption{})
+ if ac.ready != nil {
+ close(ac.ready)
+ ac.ready = nil
+ }
+ ac.mu.Unlock()
+ timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
+ select {
+ case <-timer.C:
+ case <-ac.ctx.Done():
timer.Stop()
+ return false, ac.ctx.Err()
}
+ return false, nil
}
// Run in a goroutine to track the error in transport and create the
// new transport if an error happens. It returns when the channel is closing.
func (ac *addrConn) transportMonitor() {
for {
+ var timer *time.Timer
+ var cdeadline <-chan time.Time
ac.mu.Lock()
t := ac.transport
+ if !ac.connectDeadline.IsZero() {
+ timer = time.NewTimer(ac.connectDeadline.Sub(time.Now()))
+ cdeadline = timer.C
+ }
ac.mu.Unlock()
// Block until we receive a goaway or an error occurs.
select {
case <-t.GoAway():
+ done := t.Error()
+ cleanup := t.Close
+ // Since this transport will be orphaned (won't have a transportMonitor)
+ // we need to launch a goroutine to keep track of clientConn.Close()
+ // happening since it might not be noticed by any other goroutine for a while.
+ go func() {
+ <-done
+ cleanup()
+ }()
case <-t.Error():
+ // In case this is triggered because clientConn.Close()
+ // was called, we want to immeditately close the transport
+ // since no other goroutine might notice it for a while.
+ t.Close()
+ case <-cdeadline:
+ ac.mu.Lock()
+ // This implies that client received server preface.
+ if ac.backoffDeadline.IsZero() {
+ ac.mu.Unlock()
+ continue
+ }
+ ac.mu.Unlock()
+ timer = nil
+ // No server preface received until deadline.
+ // Kill the connection.
+ grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
+ t.Close()
+ }
+ if timer != nil {
+ timer.Stop()
}
// If a GoAway happened, regardless of error, adjust our keepalive
// parameters as appropriate.
@@ -1013,6 +1421,18 @@ func (ac *addrConn) transportMonitor() {
ac.adjustParams(t.GetGoAwayReason())
default:
}
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return
+ }
+ // Set connectivity state to TransientFailure before calling
+ // resetTransport. Transition READY->CONNECTING is not valid.
+ ac.state = connectivity.TransientFailure
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ ac.cc.resolveNow(resolver.ResolveNowOption{})
+ ac.curAddr = resolver.Address{}
+ ac.mu.Unlock()
if err := ac.resetTransport(); err != nil {
ac.mu.Lock()
ac.printf("transport exiting: %v", err)
@@ -1084,7 +1504,7 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
ac.mu.Unlock()
// Trigger idle ac to connect.
if idle {
- ac.connect(false)
+ ac.connect()
}
return nil, false
}
@@ -1097,8 +1517,11 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
func (ac *addrConn) tearDown(err error) {
ac.cancel()
ac.mu.Lock()
- ac.curAddr = resolver.Address{}
defer ac.mu.Unlock()
+ if ac.state == connectivity.Shutdown {
+ return
+ }
+ ac.curAddr = resolver.Address{}
if err == errConnDrain && ac.transport != nil {
// GracefulClose(...) may be executed multiple times when
// i) receiving multiple GoAway frames from the server; or
@@ -1106,16 +1529,9 @@ func (ac *addrConn) tearDown(err error) {
// address removal and GoAway.
ac.transport.GracefulClose()
}
- if ac.state == connectivity.Shutdown {
- return
- }
ac.state = connectivity.Shutdown
ac.tearDownErr = err
- if ac.cc.balancerWrapper != nil {
- ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
- } else {
- ac.cc.csMgr.updateState(ac.state)
- }
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
if ac.events != nil {
ac.events.Finish()
ac.events = nil
@@ -1124,7 +1540,9 @@ func (ac *addrConn) tearDown(err error) {
close(ac.ready)
ac.ready = nil
}
- return
+ if channelz.IsOn() {
+ channelz.RemoveEntry(ac.channelzID)
+ }
}
func (ac *addrConn) getState() connectivity.State {
@@ -1132,3 +1550,53 @@ func (ac *addrConn) getState() connectivity.State {
defer ac.mu.Unlock()
return ac.state
}
+
+func (ac *addrConn) getCurAddr() (ret resolver.Address) {
+ ac.mu.Lock()
+ ret = ac.curAddr
+ ac.mu.Unlock()
+ return
+}
+
+func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
+ ac.mu.Lock()
+ addr := ac.curAddr.Addr
+ ac.mu.Unlock()
+ state := ac.getState()
+ ac.czmu.RLock()
+ defer ac.czmu.RUnlock()
+ return &channelz.ChannelInternalMetric{
+ State: state,
+ Target: addr,
+ CallsStarted: ac.callsStarted,
+ CallsSucceeded: ac.callsSucceeded,
+ CallsFailed: ac.callsFailed,
+ LastCallStartedTimestamp: ac.lastCallStartedTime,
+ }
+}
+
+func (ac *addrConn) incrCallsStarted() {
+ ac.czmu.Lock()
+ ac.callsStarted++
+ ac.lastCallStartedTime = time.Now()
+ ac.czmu.Unlock()
+}
+
+func (ac *addrConn) incrCallsSucceeded() {
+ ac.czmu.Lock()
+ ac.callsSucceeded++
+ ac.czmu.Unlock()
+}
+
+func (ac *addrConn) incrCallsFailed() {
+ ac.czmu.Lock()
+ ac.callsFailed++
+ ac.czmu.Unlock()
+}
+
+// ErrClientConnTimeout indicates that the ClientConn cannot establish the
+// underlying connections within the specified timeout.
+//
+// Deprecated: This error is never returned by grpc and should not be
+// referenced by users.
+var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")