aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/balancer_conn_wrappers.go')
-rw-r--r--vendor/google.golang.org/grpc/balancer_conn_wrappers.go68
1 files changed, 60 insertions, 8 deletions
diff --git a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
index e4a95fd..c23f817 100644
--- a/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
+++ b/vendor/google.golang.org/grpc/balancer_conn_wrappers.go
@@ -19,6 +19,7 @@
package grpc
import (
+ "fmt"
"sync"
"google.golang.org/grpc/balancer"
@@ -73,7 +74,7 @@ func (b *scStateUpdateBuffer) load() {
}
}
-// get returns the channel that receives a recvMsg in the buffer.
+// get returns the channel that the scStateUpdate will be sent to.
//
// Upon receiving, the caller should call load to send another
// scStateChangeTuple onto the channel if there is any.
@@ -96,6 +97,9 @@ type ccBalancerWrapper struct {
stateChangeQueue *scStateUpdateBuffer
resolverUpdateCh chan *resolverUpdate
done chan struct{}
+
+ mu sync.Mutex
+ subConns map[*acBalancerWrapper]struct{}
}
func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {
@@ -104,21 +108,34 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
stateChangeQueue: newSCStateUpdateBuffer(),
resolverUpdateCh: make(chan *resolverUpdate, 1),
done: make(chan struct{}),
+ subConns: make(map[*acBalancerWrapper]struct{}),
}
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
return ccb
}
-// watcher balancer functions sequencially, so the balancer can be implemeneted
+// watcher balancer functions sequentially, so the balancer can be implemented
// lock-free.
func (ccb *ccBalancerWrapper) watcher() {
for {
select {
case t := <-ccb.stateChangeQueue.get():
ccb.stateChangeQueue.load()
+ select {
+ case <-ccb.done:
+ ccb.balancer.Close()
+ return
+ default:
+ }
ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
case t := <-ccb.resolverUpdateCh:
+ select {
+ case <-ccb.done:
+ ccb.balancer.Close()
+ return
+ default:
+ }
ccb.balancer.HandleResolvedAddrs(t.addrs, t.err)
case <-ccb.done:
}
@@ -126,6 +143,13 @@ func (ccb *ccBalancerWrapper) watcher() {
select {
case <-ccb.done:
ccb.balancer.Close()
+ ccb.mu.Lock()
+ scs := ccb.subConns
+ ccb.subConns = nil
+ ccb.mu.Unlock()
+ for acbw := range scs {
+ ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
+ }
return
default:
}
@@ -165,31 +189,54 @@ func (ccb *ccBalancerWrapper) handleResolvedAddrs(addrs []resolver.Address, err
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
- grpclog.Infof("ccBalancerWrapper: new subconn: %v", addrs)
+ if len(addrs) <= 0 {
+ return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")
+ }
+ ccb.mu.Lock()
+ defer ccb.mu.Unlock()
+ if ccb.subConns == nil {
+ return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")
+ }
ac, err := ccb.cc.newAddrConn(addrs)
if err != nil {
return nil, err
}
acbw := &acBalancerWrapper{ac: ac}
+ acbw.ac.mu.Lock()
ac.acbw = acbw
+ acbw.ac.mu.Unlock()
+ ccb.subConns[acbw] = struct{}{}
return acbw, nil
}
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
- grpclog.Infof("ccBalancerWrapper: removing subconn")
acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
}
+ ccb.mu.Lock()
+ defer ccb.mu.Unlock()
+ if ccb.subConns == nil {
+ return
+ }
+ delete(ccb.subConns, acbw)
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}
func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
- grpclog.Infof("ccBalancerWrapper: updating state and picker called by balancer: %v, %p", s, p)
+ ccb.mu.Lock()
+ defer ccb.mu.Unlock()
+ if ccb.subConns == nil {
+ return
+ }
ccb.cc.csMgr.updateState(s)
ccb.cc.blockingpicker.updatePicker(p)
}
+func (ccb *ccBalancerWrapper) ResolveNow(o resolver.ResolveNowOption) {
+ ccb.cc.resolveNow(o)
+}
+
func (ccb *ccBalancerWrapper) Target() string {
return ccb.cc.target
}
@@ -202,9 +249,12 @@ type acBalancerWrapper struct {
}
func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
- grpclog.Infof("acBalancerWrapper: UpdateAddresses called with %v", addrs)
acbw.mu.Lock()
defer acbw.mu.Unlock()
+ if len(addrs) <= 0 {
+ acbw.ac.tearDown(errConnDrain)
+ return
+ }
if !acbw.ac.tryUpdateAddrs(addrs) {
cc := acbw.ac.cc
acbw.ac.mu.Lock()
@@ -228,9 +278,11 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
return
}
acbw.ac = ac
+ ac.mu.Lock()
ac.acbw = acbw
+ ac.mu.Unlock()
if acState != connectivity.Idle {
- ac.connect(false)
+ ac.connect()
}
}
}
@@ -238,7 +290,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
func (acbw *acBalancerWrapper) Connect() {
acbw.mu.Lock()
defer acbw.mu.Unlock()
- acbw.ac.connect(false)
+ acbw.ac.connect()
}
func (acbw *acBalancerWrapper) getAddrConn() *addrConn {