From c9849d667ab55c23d343332a11afb3eb8ede3f2d Mon Sep 17 00:00:00 2001 From: Niall Sheridan Date: Sun, 17 Jul 2016 17:16:14 +0100 Subject: Update vendor libs --- vendor/google.golang.org/grpc/balancer.go | 175 +++++++++++++++++++----------- 1 file changed, 110 insertions(+), 65 deletions(-) (limited to 'vendor/google.golang.org/grpc/balancer.go') diff --git a/vendor/google.golang.org/grpc/balancer.go b/vendor/google.golang.org/grpc/balancer.go index 348bf97..419e214 100644 --- a/vendor/google.golang.org/grpc/balancer.go +++ b/vendor/google.golang.org/grpc/balancer.go @@ -40,7 +40,6 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/naming" - "google.golang.org/grpc/transport" ) // Address represents a server the client connects to. @@ -94,10 +93,10 @@ type Balancer interface { // instead of blocking. // // The function returns put which is called once the rpc has completed or failed. - // put can collect and report RPC stats to a remote load balancer. gRPC internals - // will try to call this again if err is non-nil (unless err is ErrClientConnClosing). + // put can collect and report RPC stats to a remote load balancer. // - // TODO: Add other non-recoverable errors? + // This function should only return the errors Balancer cannot recover by itself. + // gRPC internals will fail the RPC if an error is returned. Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) // Notify returns a channel that is used by gRPC internals to watch the addresses // gRPC needs to connect. The addresses might be from a name resolver or remote @@ -139,35 +138,40 @@ func RoundRobin(r naming.Resolver) Balancer { return &roundRobin{r: r} } +type addrInfo struct { + addr Address + connected bool +} + type roundRobin struct { - r naming.Resolver - w naming.Watcher - open []Address // all the addresses the client should potentially connect - mu sync.Mutex - addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to. - connected []Address // all the connected addresses - next int // index of the next address to return for Get() - waitCh chan struct{} // the channel to block when there is no connected address available - done bool // The Balancer is closed. + r naming.Resolver + w naming.Watcher + addrs []*addrInfo // all the addresses the client should potentially connect + mu sync.Mutex + addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to. + next int // index of the next address to return for Get() + waitCh chan struct{} // the channel to block when there is no connected address available + done bool // The Balancer is closed. } func (rr *roundRobin) watchAddrUpdates() error { updates, err := rr.w.Next() if err != nil { - grpclog.Println("grpc: the naming watcher stops working due to %v.", err) + grpclog.Printf("grpc: the naming watcher stops working due to %v.\n", err) return err } rr.mu.Lock() defer rr.mu.Unlock() for _, update := range updates { addr := Address{ - Addr: update.Addr, + Addr: update.Addr, + Metadata: update.Metadata, } switch update.Op { case naming.Add: var exist bool - for _, v := range rr.open { - if addr == v { + for _, v := range rr.addrs { + if addr == v.addr { exist = true grpclog.Println("grpc: The name resolver wanted to add an existing address: ", addr) break @@ -176,12 +180,12 @@ func (rr *roundRobin) watchAddrUpdates() error { if exist { continue } - rr.open = append(rr.open, addr) + rr.addrs = append(rr.addrs, &addrInfo{addr: addr}) case naming.Delete: - for i, v := range rr.open { - if v == addr { - copy(rr.open[i:], rr.open[i+1:]) - rr.open = rr.open[:len(rr.open)-1] + for i, v := range rr.addrs { + if addr == v.addr { + copy(rr.addrs[i:], rr.addrs[i+1:]) + rr.addrs = rr.addrs[:len(rr.addrs)-1] break } } @@ -189,9 +193,11 @@ func (rr *roundRobin) watchAddrUpdates() error { grpclog.Println("Unknown update.Op ", update.Op) } } - // Make a copy of rr.open and write it onto rr.addrCh so that gRPC internals gets notified. - open := make([]Address, len(rr.open), len(rr.open)) - copy(open, rr.open) + // Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified. + open := make([]Address, len(rr.addrs)) + for i, v := range rr.addrs { + open[i] = v.addr + } if rr.done { return ErrClientConnClosing } @@ -202,7 +208,9 @@ func (rr *roundRobin) watchAddrUpdates() error { func (rr *roundRobin) Start(target string) error { if rr.r == nil { // If there is no name resolver installed, it is not needed to - // do name resolution. In this case, rr.addrCh stays nil. + // do name resolution. In this case, target is added into rr.addrs + // as the only address available and rr.addrCh stays nil. + rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}}) return nil } w, err := rr.r.Resolve(target) @@ -221,38 +229,41 @@ func (rr *roundRobin) Start(target string) error { return nil } -// Up appends addr to the end of rr.connected and sends notification if there -// are pending Get() calls. +// Up sets the connected state of addr and sends notification if there are pending +// Get() calls. func (rr *roundRobin) Up(addr Address) func(error) { rr.mu.Lock() defer rr.mu.Unlock() - for _, a := range rr.connected { - if a == addr { - return nil + var cnt int + for _, a := range rr.addrs { + if a.addr == addr { + if a.connected { + return nil + } + a.connected = true } - } - rr.connected = append(rr.connected, addr) - if len(rr.connected) == 1 { - // addr is only one available. Notify the Get() callers who are blocking. - if rr.waitCh != nil { - close(rr.waitCh) - rr.waitCh = nil + if a.connected { + cnt++ } } + // addr is only one which is connected. Notify the Get() callers who are blocking. + if cnt == 1 && rr.waitCh != nil { + close(rr.waitCh) + rr.waitCh = nil + } return func(err error) { rr.down(addr, err) } } -// down removes addr from rr.connected and moves the remaining addrs forward. +// down unsets the connected state of addr. func (rr *roundRobin) down(addr Address, err error) { rr.mu.Lock() defer rr.mu.Unlock() - for i, a := range rr.connected { - if a == addr { - copy(rr.connected[i:], rr.connected[i+1:]) - rr.connected = rr.connected[:len(rr.connected)-1] - return + for _, a := range rr.addrs { + if addr == a.addr { + a.connected = false + break } } } @@ -266,17 +277,40 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad err = ErrClientConnClosing return } - if rr.next >= len(rr.connected) { - rr.next = 0 + + if len(rr.addrs) > 0 { + if rr.next >= len(rr.addrs) { + rr.next = 0 + } + next := rr.next + for { + a := rr.addrs[next] + next = (next + 1) % len(rr.addrs) + if a.connected { + addr = a.addr + rr.next = next + rr.mu.Unlock() + return + } + if next == rr.next { + // Has iterated all the possible address but none is connected. + break + } + } } - if len(rr.connected) > 0 { - addr = rr.connected[rr.next] + if !opts.BlockingWait { + if len(rr.addrs) == 0 { + rr.mu.Unlock() + err = fmt.Errorf("there is no address available") + return + } + // Returns the next addr on rr.addrs for failfast RPCs. + addr = rr.addrs[rr.next].addr rr.next++ rr.mu.Unlock() return } - // There is no address available. Wait on rr.waitCh. - // TODO(zhaoq): Handle the case when opts.BlockingWait is false. + // Wait on rr.waitCh for non-failfast RPCs. if rr.waitCh == nil { ch = make(chan struct{}) rr.waitCh = ch @@ -287,7 +321,7 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad for { select { case <-ctx.Done(): - err = transport.ContextErr(ctx.Err()) + err = ctx.Err() return case <-ch: rr.mu.Lock() @@ -296,24 +330,35 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad err = ErrClientConnClosing return } - if len(rr.connected) == 0 { - // The newly added addr got removed by Down() again. - if rr.waitCh == nil { - ch = make(chan struct{}) - rr.waitCh = ch - } else { - ch = rr.waitCh + + if len(rr.addrs) > 0 { + if rr.next >= len(rr.addrs) { + rr.next = 0 + } + next := rr.next + for { + a := rr.addrs[next] + next = (next + 1) % len(rr.addrs) + if a.connected { + addr = a.addr + rr.next = next + rr.mu.Unlock() + return + } + if next == rr.next { + // Has iterated all the possible address but none is connected. + break + } } - rr.mu.Unlock() - continue } - if rr.next >= len(rr.connected) { - rr.next = 0 + // The newly added addr got removed by Down() again. + if rr.waitCh == nil { + ch = make(chan struct{}) + rr.waitCh = ch + } else { + ch = rr.waitCh } - addr = rr.connected[rr.next] - rr.next++ rr.mu.Unlock() - return } } } -- cgit v1.2.3