aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc')
-rw-r--r--vendor/google.golang.org/grpc/README.md2
-rw-r--r--vendor/google.golang.org/grpc/backoff.go2
-rw-r--r--vendor/google.golang.org/grpc/balancer.go3
-rw-r--r--vendor/google.golang.org/grpc/call.go7
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go8
-rw-r--r--vendor/google.golang.org/grpc/server.go74
-rw-r--r--vendor/google.golang.org/grpc/stream.go20
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_client.go25
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_server.go43
-rw-r--r--vendor/google.golang.org/grpc/transport/transport.go28
10 files changed, 157 insertions, 55 deletions
diff --git a/vendor/google.golang.org/grpc/README.md b/vendor/google.golang.org/grpc/README.md
index 660658b..110a8cf 100644
--- a/vendor/google.golang.org/grpc/README.md
+++ b/vendor/google.golang.org/grpc/README.md
@@ -16,7 +16,7 @@ $ go get google.golang.org/grpc
Prerequisites
-------------
-This requires Go 1.5 or later .
+This requires Go 1.5 or later.
Constraints
-----------
diff --git a/vendor/google.golang.org/grpc/backoff.go b/vendor/google.golang.org/grpc/backoff.go
index 52f4f10..c99024e 100644
--- a/vendor/google.golang.org/grpc/backoff.go
+++ b/vendor/google.golang.org/grpc/backoff.go
@@ -58,7 +58,7 @@ func setDefaults(bc *BackoffConfig) {
}
}
-func (bc BackoffConfig) backoff(retries int) (t time.Duration) {
+func (bc BackoffConfig) backoff(retries int) time.Duration {
if retries == 0 {
return bc.baseDelay
}
diff --git a/vendor/google.golang.org/grpc/balancer.go b/vendor/google.golang.org/grpc/balancer.go
index e217a20..9d943fb 100644
--- a/vendor/google.golang.org/grpc/balancer.go
+++ b/vendor/google.golang.org/grpc/balancer.go
@@ -38,6 +38,7 @@ import (
"sync"
"golang.org/x/net/context"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/naming"
@@ -315,7 +316,7 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
if !opts.BlockingWait {
if len(rr.addrs) == 0 {
rr.mu.Unlock()
- err = fmt.Errorf("there is no address available")
+ err = Errorf(codes.Unavailable, "there is no address available")
return
}
// Returns the next addr on rr.addrs for failfast RPCs.
diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go
index 788b3d9..772c817 100644
--- a/vendor/google.golang.org/grpc/call.go
+++ b/vendor/google.golang.org/grpc/call.go
@@ -49,9 +49,8 @@ import (
// On error, it returns the error and indicates whether the call should be retried.
//
// TODO(zhaoq): Check whether the received message sequence is valid.
-func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) error {
+func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
// Try to acquire header metadata from the server if there is any.
- var err error
defer func() {
if err != nil {
if _, ok := err.(transport.ConnectionError); !ok {
@@ -61,7 +60,7 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s
}()
c.headerMD, err = stream.Header()
if err != nil {
- return err
+ return
}
p := &parser{r: stream}
for {
@@ -69,7 +68,7 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s
if err == io.EOF {
break
}
- return err
+ return
}
}
c.trailerMD = stream.Trailer()
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 11dce44..6167472 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -684,7 +684,11 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
}
ctx, cancel := context.WithTimeout(ac.ctx, timeout)
connectTime := time.Now()
- newTransport, err := transport.NewClientTransport(ctx, ac.addr.Addr, ac.dopts.copts)
+ sinfo := transport.TargetInfo{
+ Addr: ac.addr.Addr,
+ Metadata: ac.addr.Metadata,
+ }
+ newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
if err != nil {
cancel()
@@ -803,7 +807,7 @@ func (ac *addrConn) transportMonitor() {
}
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
-// iv) transport is in TransientFailure and there's no balancer/failfast is true.
+// iv) transport is in TransientFailure and there is a balancer/failfast is true.
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
for {
ac.mu.Lock()
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index debbd79..e0bb187 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -89,10 +89,12 @@ type service struct {
type Server struct {
opts options
- mu sync.Mutex // guards following
- lis map[net.Listener]bool
- conns map[io.Closer]bool
- drain bool
+ mu sync.Mutex // guards following
+ lis map[net.Listener]bool
+ conns map[io.Closer]bool
+ drain bool
+ ctx context.Context
+ cancel context.CancelFunc
// A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
// and all the transport goes away.
cv *sync.Cond
@@ -203,6 +205,7 @@ func NewServer(opt ...ServerOption) *Server {
m: make(map[string]*service),
}
s.cv = sync.NewCond(&s.mu)
+ s.ctx, s.cancel = context.WithCancel(context.Background())
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
@@ -324,7 +327,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
-// Serve returns when lis.Accept fails. lis will be closed when
+// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
@@ -344,14 +347,38 @@ func (s *Server) Serve(lis net.Listener) error {
}
s.mu.Unlock()
}()
+
+ var tempDelay time.Duration // how long to sleep on accept failure
+
for {
rawConn, err := lis.Accept()
if err != nil {
+ if ne, ok := err.(interface {
+ Temporary() bool
+ }); ok && ne.Temporary() {
+ if tempDelay == 0 {
+ tempDelay = 5 * time.Millisecond
+ } else {
+ tempDelay *= 2
+ }
+ if max := 1 * time.Second; tempDelay > max {
+ tempDelay = max
+ }
+ s.mu.Lock()
+ s.printf("Accept error: %v; retrying in %v", err, tempDelay)
+ s.mu.Unlock()
+ select {
+ case <-time.After(tempDelay):
+ case <-s.ctx.Done():
+ }
+ continue
+ }
s.mu.Lock()
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
return err
}
+ tempDelay = 0
// Start a new goroutine to deal with rawConn
// so we don't stall this Accept loop goroutine.
go s.handleRawConn(rawConn)
@@ -500,7 +527,7 @@ func (s *Server) removeConn(c io.Closer) {
defer s.mu.Unlock()
if s.conns != nil {
delete(s.conns, c)
- s.cv.Signal()
+ s.cv.Broadcast()
}
}
@@ -801,7 +828,7 @@ func (s *Server) Stop() {
st := s.conns
s.conns = nil
// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
- s.cv.Signal()
+ s.cv.Broadcast()
s.mu.Unlock()
for lis := range listeners {
@@ -812,6 +839,7 @@ func (s *Server) Stop() {
}
s.mu.Lock()
+ s.cancel()
if s.events != nil {
s.events.Finish()
s.events = nil
@@ -824,16 +852,19 @@ func (s *Server) Stop() {
func (s *Server) GracefulStop() {
s.mu.Lock()
defer s.mu.Unlock()
- if s.drain == true || s.conns == nil {
+ if s.conns == nil {
return
}
- s.drain = true
for lis := range s.lis {
lis.Close()
}
s.lis = nil
- for c := range s.conns {
- c.(transport.ServerTransport).Drain()
+ s.cancel()
+ if !s.drain {
+ for c := range s.conns {
+ c.(transport.ServerTransport).Drain()
+ }
+ s.drain = true
}
for len(s.conns) != 0 {
s.cv.Wait()
@@ -865,9 +896,13 @@ func (s *Server) testingCloseConns() {
s.mu.Unlock()
}
-// SendHeader sends header metadata. It may be called at most once from a unary
-// RPC handler. The ctx is the RPC handler's Context or one derived from it.
-func SendHeader(ctx context.Context, md metadata.MD) error {
+// SetHeader sets the header metadata.
+// When called multiple times, all the provided metadata will be merged.
+// All the metadata will be sent out when one of the following happens:
+// - grpc.SendHeader() is called;
+// - The first response is sent out;
+// - An RPC status is sent out (error or success).
+func SetHeader(ctx context.Context, md metadata.MD) error {
if md.Len() == 0 {
return nil
}
@@ -875,6 +910,16 @@ func SendHeader(ctx context.Context, md metadata.MD) error {
if !ok {
return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
+ return stream.SetHeader(md)
+}
+
+// SendHeader sends header metadata. It may be called at most once.
+// The provided md and headers set by SetHeader() will be sent.
+func SendHeader(ctx context.Context, md metadata.MD) error {
+ stream, ok := transport.StreamFromContext(ctx)
+ if !ok {
+ return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+ }
t := stream.ServerTransport()
if t == nil {
grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
@@ -887,7 +932,6 @@ func SendHeader(ctx context.Context, md metadata.MD) error {
// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
-// The ctx is the RPC handler's Context or one derived from it.
func SetTrailer(ctx context.Context, md metadata.MD) error {
if md.Len() == 0 {
return nil
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 68d777b..4681054 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -410,9 +410,16 @@ func (cs *clientStream) finish(err error) {
// ServerStream defines the interface a server stream has to satisfy.
type ServerStream interface {
- // SendHeader sends the header metadata. It should not be called
- // after SendProto. It fails if called multiple times or if
- // called after SendProto.
+ // SetHeader sets the header metadata. It may be called multiple times.
+ // When call multiple times, all the provided metadata will be merged.
+ // All the metadata will be sent out when one of the following happens:
+ // - ServerStream.SendHeader() is called;
+ // - The first response is sent out;
+ // - An RPC status is sent out (error or success).
+ SetHeader(metadata.MD) error
+ // SendHeader sends the header metadata.
+ // The provided md and headers set by SetHeader() will be sent.
+ // It fails if called multiple times.
SendHeader(metadata.MD) error
// SetTrailer sets the trailer metadata which will be sent with the RPC status.
// When called more than once, all the provided metadata will be merged.
@@ -441,6 +448,13 @@ func (ss *serverStream) Context() context.Context {
return ss.s.Context()
}
+func (ss *serverStream) SetHeader(md metadata.MD) error {
+ if md.Len() == 0 {
+ return nil
+ }
+ return ss.s.SetHeader(md)
+}
+
func (ss *serverStream) SendHeader(md metadata.MD) error {
return ss.t.WriteHeader(ss.s, md)
}
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go
index 3c18554..2b0f680 100644
--- a/vendor/google.golang.org/grpc/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/transport/http2_client.go
@@ -57,6 +57,7 @@ import (
type http2Client struct {
target string // server name/addr
userAgent string
+ md interface{}
conn net.Conn // underlying communication channel
authInfo credentials.AuthInfo // auth info about the connection
nextID uint32 // the next stream ID to be used
@@ -107,7 +108,7 @@ type http2Client struct {
prevGoAwayID uint32
}
-func dial(fn func(context.Context, string) (net.Conn, error), ctx context.Context, addr string) (net.Conn, error) {
+func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
if fn != nil {
return fn(ctx, addr)
}
@@ -145,9 +146,9 @@ func isTemporary(err error) bool {
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
-func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ ClientTransport, err error) {
+func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (_ ClientTransport, err error) {
scheme := "http"
- conn, err := dial(opts.Dialer, ctx, addr)
+ conn, err := dial(ctx, opts.Dialer, addr.Addr)
if err != nil {
return nil, connectionErrorf(true, err, "transport: %v", err)
}
@@ -160,7 +161,7 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
var authInfo credentials.AuthInfo
if creds := opts.TransportCredentials; creds != nil {
scheme = "https"
- conn, authInfo, err = creds.ClientHandshake(ctx, addr, conn)
+ conn, authInfo, err = creds.ClientHandshake(ctx, addr.Addr, conn)
if err != nil {
// Credentials handshake errors are typically considered permanent
// to avoid retrying on e.g. bad certificates.
@@ -174,8 +175,9 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
}
var buf bytes.Buffer
t := &http2Client{
- target: addr,
+ target: addr.Addr,
userAgent: ua,
+ md: addr.Metadata,
conn: conn,
authInfo: authInfo,
// The client initiated stream id is odd starting from 1.
@@ -400,6 +402,16 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
}
}
+ if md, ok := t.md.(*metadata.MD); ok {
+ for k, v := range *md {
+ if isReservedHeader(k) {
+ continue
+ }
+ for _, entry := range v {
+ t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
+ }
+ }
+ }
first := true
// Sends the headers in a single batch even when they span multiple frames.
for !endHeaders {
@@ -790,6 +802,9 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
}
func (t *http2Client) handlePing(f *http2.PingFrame) {
+ if f.IsAck() { // Do nothing.
+ return
+ }
pingAck := &ping{ack: true}
copy(pingAck.data[:], f.Data[:])
t.controlBuf.put(pingAck)
diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go
index f753c4f..a62fb7c 100644
--- a/vendor/google.golang.org/grpc/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/transport/http2_server.go
@@ -405,6 +405,9 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
}
func (t *http2Server) handlePing(f *http2.PingFrame) {
+ if f.IsAck() { // Do nothing.
+ return
+ }
pingAck := &ping{ack: true}
copy(pingAck.data[:], f.Data[:])
t.controlBuf.put(pingAck)
@@ -462,6 +465,14 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
return ErrIllegalHeaderWrite
}
s.headerOk = true
+ if md.Len() > 0 {
+ if s.header.Len() > 0 {
+ s.header = metadata.Join(s.header, md)
+ } else {
+ s.header = md
+ }
+ }
+ md = s.header
s.mu.Unlock()
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
@@ -493,7 +504,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
- var headersSent bool
+ var headersSent, hasHeader bool
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
@@ -502,7 +513,16 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
if s.headerOk {
headersSent = true
}
+ if s.header.Len() > 0 {
+ hasHeader = true
+ }
s.mu.Unlock()
+
+ if !headersSent && hasHeader {
+ t.WriteHeader(s, nil)
+ headersSent = true
+ }
+
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
}
@@ -548,29 +568,10 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
}
if !s.headerOk {
writeHeaderFrame = true
- s.headerOk = true
}
s.mu.Unlock()
if writeHeaderFrame {
- if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
- return err
- }
- t.hBuf.Reset()
- t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
- t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
- if s.sendCompress != "" {
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
- }
- p := http2.HeadersFrameParam{
- StreamID: s.id,
- BlockFragment: t.hBuf.Bytes(),
- EndHeaders: true,
- }
- if err := t.framer.writeHeaders(false, p); err != nil {
- t.Close()
- return connectionErrorf(true, err, "transport: %v", err)
- }
- t.writableChan <- 0
+ t.WriteHeader(s, nil)
}
r := bytes.NewBuffer(data)
for {
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go
index 3d6b6a6..413f749 100644
--- a/vendor/google.golang.org/grpc/transport/transport.go
+++ b/vendor/google.golang.org/grpc/transport/transport.go
@@ -286,9 +286,27 @@ func (s *Stream) StatusDesc() string {
return s.statusDesc
}
+// SetHeader sets the header metadata. This can be called multiple times.
+// Server side only.
+func (s *Stream) SetHeader(md metadata.MD) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.headerOk || s.state == streamDone {
+ return ErrIllegalHeaderWrite
+ }
+ if md.Len() == 0 {
+ return nil
+ }
+ s.header = metadata.Join(s.header, md)
+ return nil
+}
+
// SetTrailer sets the trailer metadata which will be sent with the RPC status
// by the server. This can be called multiple times. Server side only.
func (s *Stream) SetTrailer(md metadata.MD) error {
+ if md.Len() == 0 {
+ return nil
+ }
s.mu.Lock()
defer s.mu.Unlock()
s.trailer = metadata.Join(s.trailer, md)
@@ -343,7 +361,7 @@ func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authI
return newHTTP2Server(conn, maxStreams, authInfo)
}
-// ConnectOptions covers all relevant options for dialing a server.
+// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
// UserAgent is the application user agent.
UserAgent string
@@ -355,9 +373,15 @@ type ConnectOptions struct {
TransportCredentials credentials.TransportCredentials
}
+// TargetInfo contains the information of the target such as network address and metadata.
+type TargetInfo struct {
+ Addr string
+ Metadata interface{}
+}
+
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
-func NewClientTransport(ctx context.Context, target string, opts ConnectOptions) (ClientTransport, error) {
+func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions) (ClientTransport, error) {
return newHTTP2Client(ctx, target, opts)
}