diff options
author | Niall Sheridan <nsheridan@gmail.com> | 2017-04-10 21:18:42 +0100 |
---|---|---|
committer | Niall Sheridan <nsheridan@gmail.com> | 2017-04-10 21:38:33 +0100 |
commit | 30802e07b2d84fbc213b490d3402707dffe60096 (patch) | |
tree | 934aecb8f3582325dfd1aa6652193adac87d00db /vendor/google.golang.org/grpc/clientconn.go | |
parent | da7638dc112c4c106e8929601b642d2ca4596cba (diff) |
update dependencies
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r-- | vendor/google.golang.org/grpc/clientconn.go | 131 |
1 files changed, 77 insertions, 54 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index 459ce0b..0879ef0 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -36,8 +36,8 @@ package grpc import ( "errors" "fmt" + "math" "net" - "strings" "sync" "time" @@ -45,6 +45,7 @@ import ( "golang.org/x/net/trace" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/stats" "google.golang.org/grpc/transport" ) @@ -78,7 +79,6 @@ var ( errConnClosing = errors.New("grpc: the connection is closing") // errConnUnavailable indicates that the connection is unavailable. errConnUnavailable = errors.New("grpc: the connection is unavailable") - errNoAddr = errors.New("grpc: there is no address available to dial") // minimum time to give a connection to complete minConnectTimeout = 20 * time.Second ) @@ -86,23 +86,33 @@ var ( // dialOptions configure a Dial call. dialOptions are set by the DialOption // values passed to Dial. type dialOptions struct { - unaryInt UnaryClientInterceptor - streamInt StreamClientInterceptor - codec Codec - cp Compressor - dc Decompressor - bs backoffStrategy - balancer Balancer - block bool - insecure bool - timeout time.Duration - scChan <-chan ServiceConfig - copts transport.ConnectOptions -} + unaryInt UnaryClientInterceptor + streamInt StreamClientInterceptor + codec Codec + cp Compressor + dc Decompressor + bs backoffStrategy + balancer Balancer + block bool + insecure bool + timeout time.Duration + scChan <-chan ServiceConfig + copts transport.ConnectOptions + maxMsgSize int +} + +const defaultClientMaxMsgSize = math.MaxInt32 // DialOption configures how we set up the connection. type DialOption func(*dialOptions) +// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. +func WithMaxMsgSize(s int) DialOption { + return func(o *dialOptions) { + o.maxMsgSize = s + } +} + // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling. func WithCodec(c Codec) DialOption { return func(o *dialOptions) { @@ -249,6 +259,13 @@ func WithUserAgent(s string) DialOption { } } +// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport. +func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption { + return func(o *dialOptions) { + o.copts.KeepaliveParams = kp + } +} + // WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption { return func(o *dialOptions) { @@ -288,9 +305,18 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * conns: make(map[Address]*addrConn), } cc.ctx, cc.cancel = context.WithCancel(context.Background()) + cc.dopts.maxMsgSize = defaultClientMaxMsgSize for _, opt := range opts { opt(&cc.dopts) } + + grpcUA := "grpc-go/" + Version + if cc.dopts.copts.UserAgent != "" { + cc.dopts.copts.UserAgent += " " + grpcUA + } else { + cc.dopts.copts.UserAgent = grpcUA + } + if cc.dopts.timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) @@ -333,23 +359,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" { cc.authority = cc.dopts.copts.Authority } else { - colonPos := strings.LastIndex(target, ":") - if colonPos == -1 { - colonPos = len(target) - } - cc.authority = target[:colonPos] + cc.authority = target } - var ok bool waitC := make(chan error, 1) go func() { - var addrs []Address + defer close(waitC) if cc.dopts.balancer == nil && cc.sc.LB != nil { cc.dopts.balancer = cc.sc.LB } - if cc.dopts.balancer == nil { - // Connect to target directly if balancer is nil. - addrs = append(addrs, Address{Addr: target}) - } else { + if cc.dopts.balancer != nil { var credsClone credentials.TransportCredentials if creds != nil { credsClone = creds.Clone() @@ -362,24 +380,22 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * return } ch := cc.dopts.balancer.Notify() - if ch == nil { - // There is no name resolver installed. - addrs = append(addrs, Address{Addr: target}) - } else { - addrs, ok = <-ch - if !ok || len(addrs) == 0 { - waitC <- errNoAddr - return + if ch != nil { + if cc.dopts.block { + doneChan := make(chan struct{}) + go cc.lbWatcher(doneChan) + <-doneChan + } else { + go cc.lbWatcher(nil) } - } - } - for _, a := range addrs { - if err := cc.resetAddrConn(a, false, nil); err != nil { - waitC <- err return } } - close(waitC) + // No balancer, or no resolver within the balancer. Connect directly. + if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil { + waitC <- err + return + } }() select { case <-ctx.Done(): @@ -390,15 +406,10 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } } - // If balancer is nil or balancer.Notify() is nil, ok will be false here. - // The lbWatcher goroutine will not be created. - if ok { - go cc.lbWatcher() - } - if cc.dopts.scChan != nil { go cc.scWatcher() } + return cc, nil } @@ -449,7 +460,10 @@ type ClientConn struct { conns map[Address]*addrConn } -func (cc *ClientConn) lbWatcher() { +// lbWatcher watches the Notify channel of the balancer in cc and manages +// connections accordingly. If doneChan is not nil, it is closed after the +// first successfull connection is made. +func (cc *ClientConn) lbWatcher(doneChan chan struct{}) { for addrs := range cc.dopts.balancer.Notify() { var ( add []Address // Addresses need to setup connections. @@ -476,7 +490,15 @@ func (cc *ClientConn) lbWatcher() { } cc.mu.Unlock() for _, a := range add { - cc.resetAddrConn(a, true, nil) + if doneChan != nil { + err := cc.resetAddrConn(a, true, nil) + if err == nil { + close(doneChan) + doneChan = nil + } + } else { + cc.resetAddrConn(a, false, nil) + } } for _, c := range del { c.tearDown(errConnDrain) @@ -505,7 +527,7 @@ func (cc *ClientConn) scWatcher() { // resetAddrConn creates an addrConn for addr and adds it to cc.conns. // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason. // If tearDownErr is nil, errConnDrain will be used instead. -func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error { +func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error { ac := &addrConn{ cc: cc, addr: addr, @@ -555,8 +577,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err stale.tearDown(tearDownErr) } } - // skipWait may overwrite the decision in ac.dopts.block. - if ac.dopts.block && !skipWait { + if block { if err := ac.resetTransport(false); err != nil { if err != errConnClosing { // Tear down ac and delete it from cc.conns. @@ -777,6 +798,8 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { Metadata: ac.addr.Metadata, } newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts) + // Don't call cancel in success path due to a race in Go 1.6: + // https://github.com/golang/go/issues/15078. if err != nil { cancel() @@ -855,9 +878,9 @@ func (ac *addrConn) transportMonitor() { // In both cases, a new ac is created. select { case <-t.Error(): - ac.cc.resetAddrConn(ac.addr, true, errNetworkIO) + ac.cc.resetAddrConn(ac.addr, false, errNetworkIO) default: - ac.cc.resetAddrConn(ac.addr, true, errConnDrain) + ac.cc.resetAddrConn(ac.addr, false, errConnDrain) } return case <-t.Error(): @@ -866,7 +889,7 @@ func (ac *addrConn) transportMonitor() { t.Close() return case <-t.GoAway(): - ac.cc.resetAddrConn(ac.addr, true, errNetworkIO) + ac.cc.resetAddrConn(ac.addr, false, errNetworkIO) return default: } |