aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/balancer.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/balancer.go')
-rw-r--r--vendor/google.golang.org/grpc/balancer.go175
1 files changed, 110 insertions, 65 deletions
diff --git a/vendor/google.golang.org/grpc/balancer.go b/vendor/google.golang.org/grpc/balancer.go
index 348bf97..419e214 100644
--- a/vendor/google.golang.org/grpc/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer.go
@@ -40,7 +40,6 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/naming"
- "google.golang.org/grpc/transport"
)
// Address represents a server the client connects to.
@@ -94,10 +93,10 @@ type Balancer interface {
// instead of blocking.
//
// The function returns put which is called once the rpc has completed or failed.
- // put can collect and report RPC stats to a remote load balancer. gRPC internals
- // will try to call this again if err is non-nil (unless err is ErrClientConnClosing).
+ // put can collect and report RPC stats to a remote load balancer.
//
- // TODO: Add other non-recoverable errors?
+ // This function should only return the errors Balancer cannot recover by itself.
+ // gRPC internals will fail the RPC if an error is returned.
Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)
// Notify returns a channel that is used by gRPC internals to watch the addresses
// gRPC needs to connect. The addresses might be from a name resolver or remote
@@ -139,35 +138,40 @@ func RoundRobin(r naming.Resolver) Balancer {
return &roundRobin{r: r}
}
+type addrInfo struct {
+ addr Address
+ connected bool
+}
+
type roundRobin struct {
- r naming.Resolver
- w naming.Watcher
- open []Address // all the addresses the client should potentially connect
- mu sync.Mutex
- addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to.
- connected []Address // all the connected addresses
- next int // index of the next address to return for Get()
- waitCh chan struct{} // the channel to block when there is no connected address available
- done bool // The Balancer is closed.
+ r naming.Resolver
+ w naming.Watcher
+ addrs []*addrInfo // all the addresses the client should potentially connect
+ mu sync.Mutex
+ addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to.
+ next int // index of the next address to return for Get()
+ waitCh chan struct{} // the channel to block when there is no connected address available
+ done bool // The Balancer is closed.
}
func (rr *roundRobin) watchAddrUpdates() error {
updates, err := rr.w.Next()
if err != nil {
- grpclog.Println("grpc: the naming watcher stops working due to %v.", err)
+ grpclog.Printf("grpc: the naming watcher stops working due to %v.\n", err)
return err
}
rr.mu.Lock()
defer rr.mu.Unlock()
for _, update := range updates {
addr := Address{
- Addr: update.Addr,
+ Addr: update.Addr,
+ Metadata: update.Metadata,
}
switch update.Op {
case naming.Add:
var exist bool
- for _, v := range rr.open {
- if addr == v {
+ for _, v := range rr.addrs {
+ if addr == v.addr {
exist = true
grpclog.Println("grpc: The name resolver wanted to add an existing address: ", addr)
break
@@ -176,12 +180,12 @@ func (rr *roundRobin) watchAddrUpdates() error {
if exist {
continue
}
- rr.open = append(rr.open, addr)
+ rr.addrs = append(rr.addrs, &addrInfo{addr: addr})
case naming.Delete:
- for i, v := range rr.open {
- if v == addr {
- copy(rr.open[i:], rr.open[i+1:])
- rr.open = rr.open[:len(rr.open)-1]
+ for i, v := range rr.addrs {
+ if addr == v.addr {
+ copy(rr.addrs[i:], rr.addrs[i+1:])
+ rr.addrs = rr.addrs[:len(rr.addrs)-1]
break
}
}
@@ -189,9 +193,11 @@ func (rr *roundRobin) watchAddrUpdates() error {
grpclog.Println("Unknown update.Op ", update.Op)
}
}
- // Make a copy of rr.open and write it onto rr.addrCh so that gRPC internals gets notified.
- open := make([]Address, len(rr.open), len(rr.open))
- copy(open, rr.open)
+ // Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified.
+ open := make([]Address, len(rr.addrs))
+ for i, v := range rr.addrs {
+ open[i] = v.addr
+ }
if rr.done {
return ErrClientConnClosing
}
@@ -202,7 +208,9 @@ func (rr *roundRobin) watchAddrUpdates() error {
func (rr *roundRobin) Start(target string) error {
if rr.r == nil {
// If there is no name resolver installed, it is not needed to
- // do name resolution. In this case, rr.addrCh stays nil.
+ // do name resolution. In this case, target is added into rr.addrs
+ // as the only address available and rr.addrCh stays nil.
+ rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})
return nil
}
w, err := rr.r.Resolve(target)
@@ -221,38 +229,41 @@ func (rr *roundRobin) Start(target string) error {
return nil
}
-// Up appends addr to the end of rr.connected and sends notification if there
-// are pending Get() calls.
+// Up sets the connected state of addr and sends notification if there are pending
+// Get() calls.
func (rr *roundRobin) Up(addr Address) func(error) {
rr.mu.Lock()
defer rr.mu.Unlock()
- for _, a := range rr.connected {
- if a == addr {
- return nil
+ var cnt int
+ for _, a := range rr.addrs {
+ if a.addr == addr {
+ if a.connected {
+ return nil
+ }
+ a.connected = true
}
- }
- rr.connected = append(rr.connected, addr)
- if len(rr.connected) == 1 {
- // addr is only one available. Notify the Get() callers who are blocking.
- if rr.waitCh != nil {
- close(rr.waitCh)
- rr.waitCh = nil
+ if a.connected {
+ cnt++
}
}
+ // addr is only one which is connected. Notify the Get() callers who are blocking.
+ if cnt == 1 && rr.waitCh != nil {
+ close(rr.waitCh)
+ rr.waitCh = nil
+ }
return func(err error) {
rr.down(addr, err)
}
}
-// down removes addr from rr.connected and moves the remaining addrs forward.
+// down unsets the connected state of addr.
func (rr *roundRobin) down(addr Address, err error) {
rr.mu.Lock()
defer rr.mu.Unlock()
- for i, a := range rr.connected {
- if a == addr {
- copy(rr.connected[i:], rr.connected[i+1:])
- rr.connected = rr.connected[:len(rr.connected)-1]
- return
+ for _, a := range rr.addrs {
+ if addr == a.addr {
+ a.connected = false
+ break
}
}
}
@@ -266,17 +277,40 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
err = ErrClientConnClosing
return
}
- if rr.next >= len(rr.connected) {
- rr.next = 0
+
+ if len(rr.addrs) > 0 {
+ if rr.next >= len(rr.addrs) {
+ rr.next = 0
+ }
+ next := rr.next
+ for {
+ a := rr.addrs[next]
+ next = (next + 1) % len(rr.addrs)
+ if a.connected {
+ addr = a.addr
+ rr.next = next
+ rr.mu.Unlock()
+ return
+ }
+ if next == rr.next {
+ // Has iterated all the possible address but none is connected.
+ break
+ }
+ }
}
- if len(rr.connected) > 0 {
- addr = rr.connected[rr.next]
+ if !opts.BlockingWait {
+ if len(rr.addrs) == 0 {
+ rr.mu.Unlock()
+ err = fmt.Errorf("there is no address available")
+ return
+ }
+ // Returns the next addr on rr.addrs for failfast RPCs.
+ addr = rr.addrs[rr.next].addr
rr.next++
rr.mu.Unlock()
return
}
- // There is no address available. Wait on rr.waitCh.
- // TODO(zhaoq): Handle the case when opts.BlockingWait is false.
+ // Wait on rr.waitCh for non-failfast RPCs.
if rr.waitCh == nil {
ch = make(chan struct{})
rr.waitCh = ch
@@ -287,7 +321,7 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
for {
select {
case <-ctx.Done():
- err = transport.ContextErr(ctx.Err())
+ err = ctx.Err()
return
case <-ch:
rr.mu.Lock()
@@ -296,24 +330,35 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
err = ErrClientConnClosing
return
}
- if len(rr.connected) == 0 {
- // The newly added addr got removed by Down() again.
- if rr.waitCh == nil {
- ch = make(chan struct{})
- rr.waitCh = ch
- } else {
- ch = rr.waitCh
+
+ if len(rr.addrs) > 0 {
+ if rr.next >= len(rr.addrs) {
+ rr.next = 0
+ }
+ next := rr.next
+ for {
+ a := rr.addrs[next]
+ next = (next + 1) % len(rr.addrs)
+ if a.connected {
+ addr = a.addr
+ rr.next = next
+ rr.mu.Unlock()
+ return
+ }
+ if next == rr.next {
+ // Has iterated all the possible address but none is connected.
+ break
+ }
}
- rr.mu.Unlock()
- continue
}
- if rr.next >= len(rr.connected) {
- rr.next = 0
+ // The newly added addr got removed by Down() again.
+ if rr.waitCh == nil {
+ ch = make(chan struct{})
+ rr.waitCh = ch
+ } else {
+ ch = rr.waitCh
}
- addr = rr.connected[rr.next]
- rr.next++
rr.mu.Unlock()
- return
}
}
}