aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/balancer_v1_wrapper.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/balancer_v1_wrapper.go')
-rw-r--r--vendor/google.golang.org/grpc/balancer_v1_wrapper.go99
1 files changed, 30 insertions, 69 deletions
diff --git a/vendor/google.golang.org/grpc/balancer_v1_wrapper.go b/vendor/google.golang.org/grpc/balancer_v1_wrapper.go
index 9d06160..e0ce32c 100644
--- a/vendor/google.golang.org/grpc/balancer_v1_wrapper.go
+++ b/vendor/google.golang.org/grpc/balancer_v1_wrapper.go
@@ -19,6 +19,7 @@
package grpc
import (
+ "strings"
"sync"
"golang.org/x/net/context"
@@ -27,6 +28,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/status"
)
type balancerWrapperBuilder struct {
@@ -34,20 +36,27 @@ type balancerWrapperBuilder struct {
}
func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
- bwb.b.Start(cc.Target(), BalancerConfig{
+ targetAddr := cc.Target()
+ targetSplitted := strings.Split(targetAddr, ":///")
+ if len(targetSplitted) >= 2 {
+ targetAddr = targetSplitted[1]
+ }
+
+ bwb.b.Start(targetAddr, BalancerConfig{
DialCreds: opts.DialCreds,
Dialer: opts.Dialer,
})
_, pickfirst := bwb.b.(*pickFirst)
bw := &balancerWrapper{
- balancer: bwb.b,
- pickfirst: pickfirst,
- cc: cc,
- startCh: make(chan struct{}),
- conns: make(map[resolver.Address]balancer.SubConn),
- connSt: make(map[balancer.SubConn]*scState),
- csEvltr: &connectivityStateEvaluator{},
- state: connectivity.Idle,
+ balancer: bwb.b,
+ pickfirst: pickfirst,
+ cc: cc,
+ targetAddr: targetAddr,
+ startCh: make(chan struct{}),
+ conns: make(map[resolver.Address]balancer.SubConn),
+ connSt: make(map[balancer.SubConn]*scState),
+ csEvltr: &balancer.ConnectivityStateEvaluator{},
+ state: connectivity.Idle,
}
cc.UpdateBalancerState(connectivity.Idle, bw)
go bw.lbWatcher()
@@ -68,11 +77,8 @@ type balancerWrapper struct {
balancer Balancer // The v1 balancer.
pickfirst bool
- cc balancer.ClientConn
-
- // To aggregate the connectivity state.
- csEvltr *connectivityStateEvaluator
- state connectivity.State
+ cc balancer.ClientConn
+ targetAddr string // Target without the scheme.
mu sync.Mutex
conns map[resolver.Address]balancer.SubConn
@@ -82,18 +88,21 @@ type balancerWrapper struct {
// - NewSubConn is created, cc wants to notify balancer of state changes;
// - Build hasn't return, cc doesn't have access to balancer.
startCh chan struct{}
+
+ // To aggregate the connectivity state.
+ csEvltr *balancer.ConnectivityStateEvaluator
+ state connectivity.State
}
// lbWatcher watches the Notify channel of the balancer and manages
// connections accordingly.
func (bw *balancerWrapper) lbWatcher() {
<-bw.startCh
- grpclog.Infof("balancerWrapper: is pickfirst: %v\n", bw.pickfirst)
notifyCh := bw.balancer.Notify()
if notifyCh == nil {
// There's no resolver in the balancer. Connect directly.
a := resolver.Address{
- Addr: bw.cc.Target(),
+ Addr: bw.targetAddr,
Type: resolver.Backend,
}
sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
@@ -103,7 +112,7 @@ func (bw *balancerWrapper) lbWatcher() {
bw.mu.Lock()
bw.conns[a] = sc
bw.connSt[sc] = &scState{
- addr: Address{Addr: bw.cc.Target()},
+ addr: Address{Addr: bw.targetAddr},
s: connectivity.Idle,
}
bw.mu.Unlock()
@@ -165,10 +174,10 @@ func (bw *balancerWrapper) lbWatcher() {
sc.Connect()
}
} else {
- oldSC.UpdateAddresses(newAddrs)
bw.mu.Lock()
bw.connSt[oldSC].addr = addrs[0]
bw.mu.Unlock()
+ oldSC.UpdateAddresses(newAddrs)
}
} else {
var (
@@ -221,7 +230,6 @@ func (bw *balancerWrapper) lbWatcher() {
}
func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
- grpclog.Infof("balancerWrapper: handle subconn state change: %p, %v", sc, s)
bw.mu.Lock()
defer bw.mu.Unlock()
scSt, ok := bw.connSt[sc]
@@ -240,7 +248,7 @@ func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s conne
scSt.down(errConnClosing)
}
}
- sa := bw.csEvltr.recordTransition(oldS, s)
+ sa := bw.csEvltr.RecordTransition(oldS, s)
if bw.state != sa {
bw.state = sa
}
@@ -249,7 +257,6 @@ func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s conne
// Remove state for this sc.
delete(bw.connSt, sc)
}
- return
}
func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
@@ -262,7 +269,6 @@ func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
}
// There should be a resolver inside the balancer.
// All updates here, if any, are ignored.
- return
}
func (bw *balancerWrapper) Close() {
@@ -274,7 +280,6 @@ func (bw *balancerWrapper) Close() {
close(bw.startCh)
}
bw.balancer.Close()
- return
}
// The picker is the balancerWrapper itself.
@@ -310,58 +315,14 @@ func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions)
Metadata: a.Metadata,
}]
if !ok && failfast {
- return nil, nil, Errorf(codes.Unavailable, "there is no connection available")
+ return nil, nil, status.Errorf(codes.Unavailable, "there is no connection available")
}
if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) {
// If the returned sc is not ready and RPC is failfast,
// return error, and this RPC will fail.
- return nil, nil, Errorf(codes.Unavailable, "there is no connection available")
+ return nil, nil, status.Errorf(codes.Unavailable, "there is no connection available")
}
}
return sc, done, nil
}
-
-// connectivityStateEvaluator gets updated by addrConns when their
-// states transition, based on which it evaluates the state of
-// ClientConn.
-type connectivityStateEvaluator struct {
- mu sync.Mutex
- numReady uint64 // Number of addrConns in ready state.
- numConnecting uint64 // Number of addrConns in connecting state.
- numTransientFailure uint64 // Number of addrConns in transientFailure.
-}
-
-// recordTransition records state change happening in every subConn and based on
-// that it evaluates what aggregated state should be.
-// It can only transition between Ready, Connecting and TransientFailure. Other states,
-// Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection
-// before any subConn is created ClientConn is in idle state. In the end when ClientConn
-// closes it is in Shutdown state.
-// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
-func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
- cse.mu.Lock()
- defer cse.mu.Unlock()
-
- // Update counters.
- for idx, state := range []connectivity.State{oldState, newState} {
- updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
- switch state {
- case connectivity.Ready:
- cse.numReady += updateVal
- case connectivity.Connecting:
- cse.numConnecting += updateVal
- case connectivity.TransientFailure:
- cse.numTransientFailure += updateVal
- }
- }
-
- // Evaluate.
- if cse.numReady > 0 {
- return connectivity.Ready
- }
- if cse.numConnecting > 0 {
- return connectivity.Connecting
- }
- return connectivity.TransientFailure
-}