From e0a1ccb64a637673195804513902cba6b1d4e97c Mon Sep 17 00:00:00 2001 From: Niall Sheridan Date: Mon, 31 Oct 2016 16:36:17 +0000 Subject: Update dependencies --- vendor/google.golang.org/grpc/server.go | 74 ++++++++++++++++++++++++++------- 1 file changed, 59 insertions(+), 15 deletions(-) (limited to 'vendor/google.golang.org/grpc/server.go') diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index debbd79..e0bb187 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -89,10 +89,12 @@ type service struct { type Server struct { opts options - mu sync.Mutex // guards following - lis map[net.Listener]bool - conns map[io.Closer]bool - drain bool + mu sync.Mutex // guards following + lis map[net.Listener]bool + conns map[io.Closer]bool + drain bool + ctx context.Context + cancel context.CancelFunc // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished // and all the transport goes away. cv *sync.Cond @@ -203,6 +205,7 @@ func NewServer(opt ...ServerOption) *Server { m: make(map[string]*service), } s.cv = sync.NewCond(&s.mu) + s.ctx, s.cancel = context.WithCancel(context.Background()) if EnableTracing { _, file, line, _ := runtime.Caller(1) s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) @@ -324,7 +327,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti // Serve accepts incoming connections on the listener lis, creating a new // ServerTransport and service goroutine for each. The service goroutines // read gRPC requests and then call the registered handlers to reply to them. -// Serve returns when lis.Accept fails. lis will be closed when +// Serve returns when lis.Accept fails with fatal errors. lis will be closed when // this method returns. func (s *Server) Serve(lis net.Listener) error { s.mu.Lock() @@ -344,14 +347,38 @@ func (s *Server) Serve(lis net.Listener) error { } s.mu.Unlock() }() + + var tempDelay time.Duration // how long to sleep on accept failure + for { rawConn, err := lis.Accept() if err != nil { + if ne, ok := err.(interface { + Temporary() bool + }); ok && ne.Temporary() { + if tempDelay == 0 { + tempDelay = 5 * time.Millisecond + } else { + tempDelay *= 2 + } + if max := 1 * time.Second; tempDelay > max { + tempDelay = max + } + s.mu.Lock() + s.printf("Accept error: %v; retrying in %v", err, tempDelay) + s.mu.Unlock() + select { + case <-time.After(tempDelay): + case <-s.ctx.Done(): + } + continue + } s.mu.Lock() s.printf("done serving; Accept = %v", err) s.mu.Unlock() return err } + tempDelay = 0 // Start a new goroutine to deal with rawConn // so we don't stall this Accept loop goroutine. go s.handleRawConn(rawConn) @@ -500,7 +527,7 @@ func (s *Server) removeConn(c io.Closer) { defer s.mu.Unlock() if s.conns != nil { delete(s.conns, c) - s.cv.Signal() + s.cv.Broadcast() } } @@ -801,7 +828,7 @@ func (s *Server) Stop() { st := s.conns s.conns = nil // interrupt GracefulStop if Stop and GracefulStop are called concurrently. - s.cv.Signal() + s.cv.Broadcast() s.mu.Unlock() for lis := range listeners { @@ -812,6 +839,7 @@ func (s *Server) Stop() { } s.mu.Lock() + s.cancel() if s.events != nil { s.events.Finish() s.events = nil @@ -824,16 +852,19 @@ func (s *Server) Stop() { func (s *Server) GracefulStop() { s.mu.Lock() defer s.mu.Unlock() - if s.drain == true || s.conns == nil { + if s.conns == nil { return } - s.drain = true for lis := range s.lis { lis.Close() } s.lis = nil - for c := range s.conns { - c.(transport.ServerTransport).Drain() + s.cancel() + if !s.drain { + for c := range s.conns { + c.(transport.ServerTransport).Drain() + } + s.drain = true } for len(s.conns) != 0 { s.cv.Wait() @@ -865,12 +896,26 @@ func (s *Server) testingCloseConns() { s.mu.Unlock() } -// SendHeader sends header metadata. It may be called at most once from a unary -// RPC handler. The ctx is the RPC handler's Context or one derived from it. -func SendHeader(ctx context.Context, md metadata.MD) error { +// SetHeader sets the header metadata. +// When called multiple times, all the provided metadata will be merged. +// All the metadata will be sent out when one of the following happens: +// - grpc.SendHeader() is called; +// - The first response is sent out; +// - An RPC status is sent out (error or success). +func SetHeader(ctx context.Context, md metadata.MD) error { if md.Len() == 0 { return nil } + stream, ok := transport.StreamFromContext(ctx) + if !ok { + return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) + } + return stream.SetHeader(md) +} + +// SendHeader sends header metadata. It may be called at most once. +// The provided md and headers set by SetHeader() will be sent. +func SendHeader(ctx context.Context, md metadata.MD) error { stream, ok := transport.StreamFromContext(ctx) if !ok { return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) @@ -887,7 +932,6 @@ func SendHeader(ctx context.Context, md metadata.MD) error { // SetTrailer sets the trailer metadata that will be sent when an RPC returns. // When called more than once, all the provided metadata will be merged. -// The ctx is the RPC handler's Context or one derived from it. func SetTrailer(ctx context.Context, md metadata.MD) error { if md.Len() == 0 { return nil -- cgit v1.2.3