aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r--vendor/google.golang.org/grpc/server.go74
1 files changed, 59 insertions, 15 deletions
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index debbd79..e0bb187 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -89,10 +89,12 @@ type service struct {
type Server struct {
opts options
- mu sync.Mutex // guards following
- lis map[net.Listener]bool
- conns map[io.Closer]bool
- drain bool
+ mu sync.Mutex // guards following
+ lis map[net.Listener]bool
+ conns map[io.Closer]bool
+ drain bool
+ ctx context.Context
+ cancel context.CancelFunc
// A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
// and all the transport goes away.
cv *sync.Cond
@@ -203,6 +205,7 @@ func NewServer(opt ...ServerOption) *Server {
m: make(map[string]*service),
}
s.cv = sync.NewCond(&s.mu)
+ s.ctx, s.cancel = context.WithCancel(context.Background())
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
@@ -324,7 +327,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
-// Serve returns when lis.Accept fails. lis will be closed when
+// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
@@ -344,14 +347,38 @@ func (s *Server) Serve(lis net.Listener) error {
}
s.mu.Unlock()
}()
+
+ var tempDelay time.Duration // how long to sleep on accept failure
+
for {
rawConn, err := lis.Accept()
if err != nil {
+ if ne, ok := err.(interface {
+ Temporary() bool
+ }); ok && ne.Temporary() {
+ if tempDelay == 0 {
+ tempDelay = 5 * time.Millisecond
+ } else {
+ tempDelay *= 2
+ }
+ if max := 1 * time.Second; tempDelay > max {
+ tempDelay = max
+ }
+ s.mu.Lock()
+ s.printf("Accept error: %v; retrying in %v", err, tempDelay)
+ s.mu.Unlock()
+ select {
+ case <-time.After(tempDelay):
+ case <-s.ctx.Done():
+ }
+ continue
+ }
s.mu.Lock()
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
return err
}
+ tempDelay = 0
// Start a new goroutine to deal with rawConn
// so we don't stall this Accept loop goroutine.
go s.handleRawConn(rawConn)
@@ -500,7 +527,7 @@ func (s *Server) removeConn(c io.Closer) {
defer s.mu.Unlock()
if s.conns != nil {
delete(s.conns, c)
- s.cv.Signal()
+ s.cv.Broadcast()
}
}
@@ -801,7 +828,7 @@ func (s *Server) Stop() {
st := s.conns
s.conns = nil
// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
- s.cv.Signal()
+ s.cv.Broadcast()
s.mu.Unlock()
for lis := range listeners {
@@ -812,6 +839,7 @@ func (s *Server) Stop() {
}
s.mu.Lock()
+ s.cancel()
if s.events != nil {
s.events.Finish()
s.events = nil
@@ -824,16 +852,19 @@ func (s *Server) Stop() {
func (s *Server) GracefulStop() {
s.mu.Lock()
defer s.mu.Unlock()
- if s.drain == true || s.conns == nil {
+ if s.conns == nil {
return
}
- s.drain = true
for lis := range s.lis {
lis.Close()
}
s.lis = nil
- for c := range s.conns {
- c.(transport.ServerTransport).Drain()
+ s.cancel()
+ if !s.drain {
+ for c := range s.conns {
+ c.(transport.ServerTransport).Drain()
+ }
+ s.drain = true
}
for len(s.conns) != 0 {
s.cv.Wait()
@@ -865,9 +896,13 @@ func (s *Server) testingCloseConns() {
s.mu.Unlock()
}
-// SendHeader sends header metadata. It may be called at most once from a unary
-// RPC handler. The ctx is the RPC handler's Context or one derived from it.
-func SendHeader(ctx context.Context, md metadata.MD) error {
+// SetHeader sets the header metadata.
+// When called multiple times, all the provided metadata will be merged.
+// All the metadata will be sent out when one of the following happens:
+// - grpc.SendHeader() is called;
+// - The first response is sent out;
+// - An RPC status is sent out (error or success).
+func SetHeader(ctx context.Context, md metadata.MD) error {
if md.Len() == 0 {
return nil
}
@@ -875,6 +910,16 @@ func SendHeader(ctx context.Context, md metadata.MD) error {
if !ok {
return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
+ return stream.SetHeader(md)
+}
+
+// SendHeader sends header metadata. It may be called at most once.
+// The provided md and headers set by SetHeader() will be sent.
+func SendHeader(ctx context.Context, md metadata.MD) error {
+ stream, ok := transport.StreamFromContext(ctx)
+ if !ok {
+ return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+ }
t := stream.ServerTransport()
if t == nil {
grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
@@ -887,7 +932,6 @@ func SendHeader(ctx context.Context, md metadata.MD) error {
// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
-// The ctx is the RPC handler's Context or one derived from it.
func SetTrailer(ctx context.Context, md metadata.MD) error {
if md.Len() == 0 {
return nil