diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
| -rw-r--r-- | vendor/google.golang.org/grpc/server.go | 74 | 
1 file changed, 59 insertions, 15 deletions
| diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index debbd79..e0bb187 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -89,10 +89,12 @@ type service struct {  type Server struct {  	opts options -	mu    sync.Mutex // guards following -	lis   map[net.Listener]bool -	conns map[io.Closer]bool -	drain bool +	mu     sync.Mutex // guards following +	lis    map[net.Listener]bool +	conns  map[io.Closer]bool +	drain  bool +	ctx    context.Context +	cancel context.CancelFunc  	// A CondVar to let GracefulStop() blocks until all the pending RPCs are finished  	// and all the transport goes away.  	cv     *sync.Cond @@ -203,6 +205,7 @@ func NewServer(opt ...ServerOption) *Server {  		m:     make(map[string]*service),  	}  	s.cv = sync.NewCond(&s.mu) +	s.ctx, s.cancel = context.WithCancel(context.Background())  	if EnableTracing {  		_, file, line, _ := runtime.Caller(1)  		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) @@ -324,7 +327,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti  // Serve accepts incoming connections on the listener lis, creating a new  // ServerTransport and service goroutine for each. The service goroutines  // read gRPC requests and then call the registered handlers to reply to them. -// Serve returns when lis.Accept fails. lis will be closed when +// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when  // this method returns.  
func (s *Server) Serve(lis net.Listener) error {  	s.mu.Lock() @@ -344,14 +347,38 @@ func (s *Server) Serve(lis net.Listener) error {  		}  		s.mu.Unlock()  	}() + +	var tempDelay time.Duration // how long to sleep on accept failure +  	for {  		rawConn, err := lis.Accept()  		if err != nil { +			if ne, ok := err.(interface { +				Temporary() bool +			}); ok && ne.Temporary() { +				if tempDelay == 0 { +					tempDelay = 5 * time.Millisecond +				} else { +					tempDelay *= 2 +				} +				if max := 1 * time.Second; tempDelay > max { +					tempDelay = max +				} +				s.mu.Lock() +				s.printf("Accept error: %v; retrying in %v", err, tempDelay) +				s.mu.Unlock() +				select { +				case <-time.After(tempDelay): +				case <-s.ctx.Done(): +				} +				continue +			}  			s.mu.Lock()  			s.printf("done serving; Accept = %v", err)  			s.mu.Unlock()  			return err  		} +		tempDelay = 0  		// Start a new goroutine to deal with rawConn  		// so we don't stall this Accept loop goroutine.  		go s.handleRawConn(rawConn) @@ -500,7 +527,7 @@ func (s *Server) removeConn(c io.Closer) {  	defer s.mu.Unlock()  	if s.conns != nil {  		delete(s.conns, c) -		s.cv.Signal() +		s.cv.Broadcast()  	}  } @@ -801,7 +828,7 @@ func (s *Server) Stop() {  	st := s.conns  	s.conns = nil  	// interrupt GracefulStop if Stop and GracefulStop are called concurrently. 
-	s.cv.Signal() +	s.cv.Broadcast()  	s.mu.Unlock()  	for lis := range listeners { @@ -812,6 +839,7 @@ func (s *Server) Stop() {  	}  	s.mu.Lock() +	s.cancel()  	if s.events != nil {  		s.events.Finish()  		s.events = nil @@ -824,16 +852,19 @@ func (s *Server) Stop() {  func (s *Server) GracefulStop() {  	s.mu.Lock()  	defer s.mu.Unlock() -	if s.drain == true || s.conns == nil { +	if s.conns == nil {  		return  	} -	s.drain = true  	for lis := range s.lis {  		lis.Close()  	}  	s.lis = nil -	for c := range s.conns { -		c.(transport.ServerTransport).Drain() +	s.cancel() +	if !s.drain { +		for c := range s.conns { +			c.(transport.ServerTransport).Drain() +		} +		s.drain = true  	}  	for len(s.conns) != 0 {  		s.cv.Wait() @@ -865,9 +896,13 @@ func (s *Server) testingCloseConns() {  	s.mu.Unlock()  } -// SendHeader sends header metadata. It may be called at most once from a unary -// RPC handler. The ctx is the RPC handler's Context or one derived from it. -func SendHeader(ctx context.Context, md metadata.MD) error { +// SetHeader sets the header metadata. +// When called multiple times, all the provided metadata will be merged. +// All the metadata will be sent out when one of the following happens: +//  - grpc.SendHeader() is called; +//  - The first response is sent out; +//  - An RPC status is sent out (error or success). +func SetHeader(ctx context.Context, md metadata.MD) error {  	if md.Len() == 0 {  		return nil  	} @@ -875,6 +910,16 @@ func SendHeader(ctx context.Context, md metadata.MD) error {  	if !ok {  		return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)  	} +	return stream.SetHeader(md) +} + +// SendHeader sends header metadata. It may be called at most once. +// The provided md and headers set by SetHeader() will be sent. 
+func SendHeader(ctx context.Context, md metadata.MD) error { +	stream, ok := transport.StreamFromContext(ctx) +	if !ok { +		return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) +	}  	t := stream.ServerTransport()  	if t == nil {  		grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream) @@ -887,7 +932,6 @@ func SendHeader(ctx context.Context, md metadata.MD) error {  // SetTrailer sets the trailer metadata that will be sent when an RPC returns.  // When called more than once, all the provided metadata will be merged. -// The ctx is the RPC handler's Context or one derived from it.  func SetTrailer(ctx context.Context, md metadata.MD) error {  	if md.Len() == 0 {  		return nil | 
