aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/picker_wrapper.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/picker_wrapper.go')
-rw-r--r--vendor/google.golang.org/grpc/picker_wrapper.go203
1 files changed, 199 insertions, 4 deletions
diff --git a/vendor/google.golang.org/grpc/picker_wrapper.go b/vendor/google.golang.org/grpc/picker_wrapper.go
index 9085dbc..019e658 100644
--- a/vendor/google.golang.org/grpc/picker_wrapper.go
+++ b/vendor/google.golang.org/grpc/picker_wrapper.go
@@ -19,12 +19,17 @@
package grpc
import (
+ "io"
"sync"
+ "sync/atomic"
"golang.org/x/net/context"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)
@@ -36,13 +41,57 @@ type pickerWrapper struct {
done bool
blockingCh chan struct{}
picker balancer.Picker
+
+ // The latest connection happened.
+ connErrMu sync.Mutex
+ connErr error
+
+ stickinessMDKey atomic.Value
+ stickiness *stickyStore
}
func newPickerWrapper() *pickerWrapper {
- bp := &pickerWrapper{blockingCh: make(chan struct{})}
+ bp := &pickerWrapper{
+ blockingCh: make(chan struct{}),
+ stickiness: newStickyStore(),
+ }
return bp
}
+func (bp *pickerWrapper) updateConnectionError(err error) {
+ bp.connErrMu.Lock()
+ bp.connErr = err
+ bp.connErrMu.Unlock()
+}
+
+func (bp *pickerWrapper) connectionError() error {
+ bp.connErrMu.Lock()
+ err := bp.connErr
+ bp.connErrMu.Unlock()
+ return err
+}
+
+func (bp *pickerWrapper) updateStickinessMDKey(newKey string) {
+ // No need to check ok because mdKey == "" if ok == false.
+ if oldKey, _ := bp.stickinessMDKey.Load().(string); oldKey != newKey {
+ bp.stickinessMDKey.Store(newKey)
+ bp.stickiness.reset(newKey)
+ }
+}
+
+func (bp *pickerWrapper) getStickinessMDKey() string {
+ // No need to check ok because mdKey == "" if ok == false.
+ mdKey, _ := bp.stickinessMDKey.Load().(string)
+ return mdKey
+}
+
+func (bp *pickerWrapper) clearStickinessState() {
+ if oldKey := bp.getStickinessMDKey(); oldKey != "" {
+ // There's no need to reset store if mdKey was "".
+ bp.stickiness.reset(oldKey)
+ }
+}
+
// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
bp.mu.Lock()
@@ -57,6 +106,23 @@ func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
bp.mu.Unlock()
}
+func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
+ acw.mu.Lock()
+ ac := acw.ac
+ acw.mu.Unlock()
+ ac.incrCallsStarted()
+ return func(b balancer.DoneInfo) {
+ if b.Err != nil && b.Err != io.EOF {
+ ac.incrCallsFailed()
+ } else {
+ ac.incrCallsSucceeded()
+ }
+ if done != nil {
+ done(b)
+ }
+ }
+}
+
// pick returns the transport that will be used for the RPC.
// It may block in the following cases:
// - there's no picker
@@ -65,6 +131,27 @@ func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
+
+ mdKey := bp.getStickinessMDKey()
+ stickyKey, isSticky := stickyKeyFromContext(ctx, mdKey)
+
+ // Potential race here: if stickinessMDKey is updated after the above two
+ // lines, and this pick is a sticky pick, the following put could add an
+ // entry to sticky store with an outdated sticky key.
+ //
+ // The solution: keep the current md key in sticky store, and at the
+ // beginning of each get/put, check the mdkey against store.curMDKey.
+ // - Cons: one more string comparing for each get/put.
+ // - Pros: the string matching happens inside get/put, so the overhead for
+ // non-sticky RPCs will be minimal.
+
+ if isSticky {
+ if t, ok := bp.stickiness.get(mdKey, stickyKey); ok {
+ // Done function returned is always nil.
+ return t, nil, nil
+ }
+ }
+
var (
p balancer.Picker
ch chan struct{}
@@ -97,7 +184,7 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.
p = bp.picker
bp.mu.Unlock()
- subConn, put, err := p.Pick(ctx, opts)
+ subConn, done, err := p.Pick(ctx, opts)
if err != nil {
switch err {
@@ -107,7 +194,7 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.
if !failfast {
continue
}
- return nil, nil, status.Errorf(codes.Unavailable, "%v", err)
+ return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError())
default:
// err is some other error.
return nil, nil, toRPCErr(err)
@@ -120,7 +207,13 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.
continue
}
if t, ok := acw.getAddrConn().getReadyTransport(); ok {
- return t, put, nil
+ if isSticky {
+ bp.stickiness.put(mdKey, stickyKey, acw)
+ }
+ if channelz.IsOn() {
+ return t, doneChannelzWrapper(acw, done), nil
+ }
+ return t, done, nil
}
grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
// If ok == false, ac.state is not READY.
@@ -139,3 +232,105 @@ func (bp *pickerWrapper) close() {
bp.done = true
close(bp.blockingCh)
}
+
+const stickinessKeyCountLimit = 1000
+
+type stickyStoreEntry struct {
+ acw *acBalancerWrapper
+ addr resolver.Address
+}
+
+type stickyStore struct {
+ mu sync.Mutex
+ // curMDKey is check before every get/put to avoid races. The operation will
+ // abort immediately when the given mdKey is different from the curMDKey.
+ curMDKey string
+ store *linkedMap
+}
+
+func newStickyStore() *stickyStore {
+ return &stickyStore{
+ store: newLinkedMap(),
+ }
+}
+
+// reset clears the map in stickyStore, and set the currentMDKey to newMDKey.
+func (ss *stickyStore) reset(newMDKey string) {
+ ss.mu.Lock()
+ ss.curMDKey = newMDKey
+ ss.store.clear()
+ ss.mu.Unlock()
+}
+
+// stickyKey is the key to look up in store. mdKey will be checked against
+// curMDKey to avoid races.
+func (ss *stickyStore) put(mdKey, stickyKey string, acw *acBalancerWrapper) {
+ ss.mu.Lock()
+ defer ss.mu.Unlock()
+ if mdKey != ss.curMDKey {
+ return
+ }
+ // TODO(stickiness): limit the total number of entries.
+ ss.store.put(stickyKey, &stickyStoreEntry{
+ acw: acw,
+ addr: acw.getAddrConn().getCurAddr(),
+ })
+ if ss.store.len() > stickinessKeyCountLimit {
+ ss.store.removeOldest()
+ }
+}
+
+// stickyKey is the key to look up in store. mdKey will be checked against
+// curMDKey to avoid races.
+func (ss *stickyStore) get(mdKey, stickyKey string) (transport.ClientTransport, bool) {
+ ss.mu.Lock()
+ defer ss.mu.Unlock()
+ if mdKey != ss.curMDKey {
+ return nil, false
+ }
+ entry, ok := ss.store.get(stickyKey)
+ if !ok {
+ return nil, false
+ }
+ ac := entry.acw.getAddrConn()
+ if ac.getCurAddr() != entry.addr {
+ ss.store.remove(stickyKey)
+ return nil, false
+ }
+ t, ok := ac.getReadyTransport()
+ if !ok {
+ ss.store.remove(stickyKey)
+ return nil, false
+ }
+ return t, true
+}
+
+// Get one value from metadata in ctx with key stickinessMDKey.
+//
+// It returns "", false if stickinessMDKey is an empty string.
+func stickyKeyFromContext(ctx context.Context, stickinessMDKey string) (string, bool) {
+ if stickinessMDKey == "" {
+ return "", false
+ }
+
+ md, added, ok := metadata.FromOutgoingContextRaw(ctx)
+ if !ok {
+ return "", false
+ }
+
+ if vv, ok := md[stickinessMDKey]; ok {
+ if len(vv) > 0 {
+ return vv[0], true
+ }
+ }
+
+ for _, ss := range added {
+ for i := 0; i < len(ss)-1; i += 2 {
+ if ss[i] == stickinessMDKey {
+ return ss[i+1], true
+ }
+ }
+ }
+
+ return "", false
+}