Diffstat (limited to 'vendor/google.golang.org/grpc/balancer_v1_wrapper.go')
-rw-r--r--  vendor/google.golang.org/grpc/balancer_v1_wrapper.go  367
1 file changed, 367 insertions(+), 0 deletions(-)
diff --git a/vendor/google.golang.org/grpc/balancer_v1_wrapper.go b/vendor/google.golang.org/grpc/balancer_v1_wrapper.go
new file mode 100644
index 0000000..9d06160
--- /dev/null
+++ b/vendor/google.golang.org/grpc/balancer_v1_wrapper.go
@@ -0,0 +1,367 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpc
+
+import (
+ "sync"
+
+ "golang.org/x/net/context"
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/resolver"
+)
+
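+// balancerWrapperBuilder implements balancer.Builder on top of a v1 Balancer,
+// so that the deprecated v1 API can drive the v2 balancer machinery.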
+type balancerWrapperBuilder struct {
+ b Balancer // The v1 balancer.
+}
+
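+// Build starts the v1 balancer with the dial target and credentials, then
+// spawns lbWatcher to translate its Notify updates into SubConn operations.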
+func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
+ bwb.b.Start(cc.Target(), BalancerConfig{
+ DialCreds: opts.DialCreds,
+ Dialer: opts.Dialer,
+ })
+ _, pickfirst := bwb.b.(*pickFirst)
+ bw := &balancerWrapper{
+ balancer: bwb.b,
+ pickfirst: pickfirst,
+ cc: cc,
+ startCh: make(chan struct{}),
+ conns: make(map[resolver.Address]balancer.SubConn),
+ connSt: make(map[balancer.SubConn]*scState),
+ csEvltr: &connectivityStateEvaluator{},
+ state: connectivity.Idle,
+ }
+ cc.UpdateBalancerState(connectivity.Idle, bw)
+ go bw.lbWatcher()
+ return bw
+}
+
+func (bwb *balancerWrapperBuilder) Name() string {
+ return "wrapper"
+}
+
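+// scState tracks the v1 address, the connectivity state, and the teardown
+// callback (returned by Balancer.Up) for one SubConn.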
+type scState struct {
+ addr Address // The v1 address type.
+ s connectivity.State
+ down func(error)
+}
+
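+// balancerWrapper is the v2 balancer.Balancer built around a v1 Balancer.
+// It also acts as its own balancer.Picker (see Pick below).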
+type balancerWrapper struct {
+ balancer Balancer // The v1 balancer.
+ pickfirst bool
+
+ cc balancer.ClientConn
+
+ // To aggregate the connectivity state.
+ csEvltr *connectivityStateEvaluator
+ state connectivity.State
+
+ mu sync.Mutex
+ conns map[resolver.Address]balancer.SubConn
+ connSt map[balancer.SubConn]*scState
+ // This channel is closed when handling the first resolver result.
+ // lbWatcher blocks until this is closed, to avoid a race between:
+ // - a new SubConn being created while cc wants to notify the balancer of
+ //   state changes;
+ // - Build not having returned yet, so cc has no access to the balancer.
+ startCh chan struct{}
+}
+
+// lbWatcher watches the Notify channel of the balancer and manages
+// connections accordingly.
+func (bw *balancerWrapper) lbWatcher() {
+ <-bw.startCh
+ grpclog.Infof("balancerWrapper: is pickfirst: %v\n", bw.pickfirst)
+ notifyCh := bw.balancer.Notify()
+ if notifyCh == nil {
+ // There's no resolver in the balancer. Connect directly.
+ a := resolver.Address{
+ Addr: bw.cc.Target(),
+ Type: resolver.Backend,
+ }
+ sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
+ if err != nil {
+ grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
+ } else {
+ bw.mu.Lock()
+ bw.conns[a] = sc
+ bw.connSt[sc] = &scState{
+ addr: Address{Addr: bw.cc.Target()},
+ s: connectivity.Idle,
+ }
+ bw.mu.Unlock()
+ sc.Connect()
+ }
+ return
+ }
+
+ for addrs := range notifyCh {
+ grpclog.Infof("balancerWrapper: got update addr from Notify: %v\n", addrs)
+ if bw.pickfirst {
+ var (
+ oldA resolver.Address
+ oldSC balancer.SubConn
+ )
+ bw.mu.Lock()
+ for oldA, oldSC = range bw.conns {
+ break
+ }
+ bw.mu.Unlock()
+ if len(addrs) == 0 {
+ if oldSC != nil {
+ // Tear down the old sc.
+ bw.mu.Lock()
+ delete(bw.conns, oldA)
+ delete(bw.connSt, oldSC)
+ bw.mu.Unlock()
+ bw.cc.RemoveSubConn(oldSC)
+ }
+ continue
+ }
+
+ var newAddrs []resolver.Address
+ for _, a := range addrs {
+ newAddr := resolver.Address{
+ Addr: a.Addr,
+ Type: resolver.Backend, // All addresses from the balancer are backends.
+ ServerName: "",
+ Metadata: a.Metadata,
+ }
+ newAddrs = append(newAddrs, newAddr)
+ }
+ if oldSC == nil {
+ // Create new sc.
+ sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{})
+ if err != nil {
+ grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err)
+ } else {
+ bw.mu.Lock()
+ // For pickfirst, there should be only one SubConn, so the address
+ // key doesn't matter. All state updates (up and down) and picking
+ // happen on that single SubConn.
+ bw.conns[resolver.Address{}] = sc
+ bw.connSt[sc] = &scState{
+ addr: addrs[0], // Use the first address.
+ s: connectivity.Idle,
+ }
+ bw.mu.Unlock()
+ sc.Connect()
+ }
+ } else {
+ oldSC.UpdateAddresses(newAddrs)
+ bw.mu.Lock()
+ bw.connSt[oldSC].addr = addrs[0]
+ bw.mu.Unlock()
+ }
+ } else {
+ var (
+ add []resolver.Address // Addresses that need new connections.
+ del []balancer.SubConn // SubConns to be torn down.
+ )
+ resAddrs := make(map[resolver.Address]Address)
+ for _, a := range addrs {
+ resAddrs[resolver.Address{
+ Addr: a.Addr,
+ Type: resolver.Backend, // All addresses from the balancer are backends.
+ ServerName: "",
+ Metadata: a.Metadata,
+ }] = a
+ }
+ bw.mu.Lock()
+ for a := range resAddrs {
+ if _, ok := bw.conns[a]; !ok {
+ add = append(add, a)
+ }
+ }
+ for a, c := range bw.conns {
+ if _, ok := resAddrs[a]; !ok {
+ del = append(del, c)
+ delete(bw.conns, a)
+ // Keep the state of this sc in bw.connSt until its state becomes Shutdown.
+ }
+ }
+ bw.mu.Unlock()
+ for _, a := range add {
+ sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
+ if err != nil {
+ grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
+ } else {
+ bw.mu.Lock()
+ bw.conns[a] = sc
+ bw.connSt[sc] = &scState{
+ addr: resAddrs[a],
+ s: connectivity.Idle,
+ }
+ bw.mu.Unlock()
+ sc.Connect()
+ }
+ }
+ for _, c := range del {
+ bw.cc.RemoveSubConn(c)
+ }
+ }
+ }
+}
+
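+// HandleSubConnStateChange records the SubConn's new state, calls Up/down on
+// the v1 balancer as the SubConn enters or leaves Ready, and pushes the
+// re-aggregated connectivity state to the ClientConn.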
+func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
+ grpclog.Infof("balancerWrapper: handle subconn state change: %p, %v", sc, s)
+ bw.mu.Lock()
+ defer bw.mu.Unlock()
+ scSt, ok := bw.connSt[sc]
+ if !ok {
+ return
+ }
+ if s == connectivity.Idle {
+ sc.Connect()
+ }
+ oldS := scSt.s
+ scSt.s = s
+ if oldS != connectivity.Ready && s == connectivity.Ready {
+ scSt.down = bw.balancer.Up(scSt.addr)
+ } else if oldS == connectivity.Ready && s != connectivity.Ready {
+ if scSt.down != nil {
+ scSt.down(errConnClosing)
+ }
+ }
+ sa := bw.csEvltr.recordTransition(oldS, s)
+ if bw.state != sa {
+ bw.state = sa
+ }
+ bw.cc.UpdateBalancerState(bw.state, bw)
+ if s == connectivity.Shutdown {
+ // Remove state for this sc.
+ delete(bw.connSt, sc)
+ }
+ return
+}
+
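+// HandleResolvedAddrs only unblocks lbWatcher on the first call; the
+// addresses themselves are ignored because the v1 balancer embeds its own
+// resolver.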
+func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
+ bw.mu.Lock()
+ defer bw.mu.Unlock()
+ select {
+ case <-bw.startCh:
+ default:
+ close(bw.startCh)
+ }
+ // There should be a resolver inside the balancer.
+ // All updates here, if any, are ignored.
+ return
+}
+
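+// Close unblocks lbWatcher if it is still waiting for the first resolver
+// result, then closes the v1 balancer.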
+func (bw *balancerWrapper) Close() {
+ bw.mu.Lock()
+ defer bw.mu.Unlock()
+ select {
+ case <-bw.startCh:
+ default:
+ close(bw.startCh)
+ }
+ bw.balancer.Close()
+ return
+}
+
+// The picker is the balancerWrapper itself.
+// Pick should never return ErrNoSubConnAvailable.
+// It either blocks or returns an error, consistent with the v1 balancer's Get().
+func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
+ failfast := true // Default failfast is true.
+ if ss, ok := rpcInfoFromContext(ctx); ok {
+ failfast = ss.failfast
+ }
+ a, p, err := bw.balancer.Get(ctx, BalancerGetOptions{BlockingWait: !failfast})
+ if err != nil {
+ return nil, nil, err
+ }
+ var done func(balancer.DoneInfo)
+ if p != nil {
+ done = func(i balancer.DoneInfo) { p() }
+ }
+ var sc balancer.SubConn
+ bw.mu.Lock()
+ defer bw.mu.Unlock()
+ if bw.pickfirst {
+ // Get the first sc in conns.
+ for _, sc = range bw.conns {
+ break
+ }
+ } else {
+ var ok bool
+ sc, ok = bw.conns[resolver.Address{
+ Addr: a.Addr,
+ Type: resolver.Backend,
+ ServerName: "",
+ Metadata: a.Metadata,
+ }]
+ if !ok && failfast {
+ return nil, nil, Errorf(codes.Unavailable, "there is no connection available")
+ }
+ if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) {
+ // If the returned sc is not ready and the RPC is failfast,
+ // return an error so this RPC fails.
+ return nil, nil, Errorf(codes.Unavailable, "there is no connection available")
+ }
+ }
+
+ return sc, done, nil
+}
+
+// connectivityStateEvaluator gets updated by addrConns when their
+// states transition, and based on those transitions it evaluates the
+// aggregate state of the ClientConn.
+type connectivityStateEvaluator struct {
+ mu sync.Mutex
+ numReady uint64 // Number of addrConns in ready state.
+ numConnecting uint64 // Number of addrConns in connecting state.
+ numTransientFailure uint64 // Number of addrConns in transientFailure.
+}
+
+// recordTransition records a state change in a subConn and, based on that,
+// evaluates what the aggregated state should be.
+// It can only transition between Ready, Connecting and TransientFailure. The
+// other states, Idle and Shutdown, are transitioned into by the ClientConn:
+// before any subConn is created, the ClientConn is in the Idle state, and
+// when the ClientConn closes, it is in the Shutdown state.
+// TODO: Note that in later releases, a ClientConn with no activity will be put
+// into an Idle state.
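+// For example, with one subConn Ready and two Connecting, the aggregate is
+// Ready; if the Ready subConn then fails, numReady drops to zero and the
+// aggregate becomes Connecting.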
+func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
+ cse.mu.Lock()
+ defer cse.mu.Unlock()
+
+ // Update counters.
+ for idx, state := range []connectivity.State{oldState, newState} {
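+ // For idx 0 (oldState) the subtraction wraps around uint64; adding the
+ // wrapped value effectively decrements the counter by one.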
+ updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
+ switch state {
+ case connectivity.Ready:
+ cse.numReady += updateVal
+ case connectivity.Connecting:
+ cse.numConnecting += updateVal
+ case connectivity.TransientFailure:
+ cse.numTransientFailure += updateVal
+ }
+ }
+
+ // Evaluate.
+ if cse.numReady > 0 {
+ return connectivity.Ready
+ }
+ if cse.numConnecting > 0 {
+ return connectivity.Connecting
+ }
+ return connectivity.TransientFailure
+}
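
For context, a minimal sketch of how this wrapper is reached from user code, assuming a gRPC release of this era where the deprecated grpc.WithBalancer and grpc.RoundRobin APIs are still present (the dial target below is illustrative):

package main

import (
	"log"

	"google.golang.org/grpc"
)

func main() {
	// RoundRobin is a v1 Balancer. WithBalancer hands it to the ClientConn,
	// which wraps it in the balancerWrapperBuilder defined in this file.
	// With a nil naming.Resolver, the balancer's Notify() channel is nil,
	// so lbWatcher takes the direct-connect path to the dial target.
	conn, err := grpc.Dial(
		"localhost:50051", // illustrative target
		grpc.WithInsecure(),
		grpc.WithBalancer(grpc.RoundRobin(nil)),
	)
	if err != nil {
		log.Fatalf("grpc.Dial: %v", err)
	}
	defer conn.Close()
}

Because Notify() returns nil here, the wrapper exercises the branch at the top of lbWatcher that dials cc.Target() directly; with a real naming.Resolver, the Notify loop would add and remove SubConns as the resolved address set changes.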