aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/transport
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/transport')
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_client.go25
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_server.go43
-rw-r--r--vendor/google.golang.org/grpc/transport/transport.go28
3 files changed, 68 insertions, 28 deletions
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go
index 3c18554..2b0f680 100644
--- a/vendor/google.golang.org/grpc/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/transport/http2_client.go
@@ -57,6 +57,7 @@ import (
type http2Client struct {
target string // server name/addr
userAgent string
+ md interface{}
conn net.Conn // underlying communication channel
authInfo credentials.AuthInfo // auth info about the connection
nextID uint32 // the next stream ID to be used
@@ -107,7 +108,7 @@ type http2Client struct {
prevGoAwayID uint32
}
-func dial(fn func(context.Context, string) (net.Conn, error), ctx context.Context, addr string) (net.Conn, error) {
+func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
if fn != nil {
return fn(ctx, addr)
}
@@ -145,9 +146,9 @@ func isTemporary(err error) bool {
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
-func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ ClientTransport, err error) {
+func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (_ ClientTransport, err error) {
scheme := "http"
- conn, err := dial(opts.Dialer, ctx, addr)
+ conn, err := dial(ctx, opts.Dialer, addr.Addr)
if err != nil {
return nil, connectionErrorf(true, err, "transport: %v", err)
}
@@ -160,7 +161,7 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
var authInfo credentials.AuthInfo
if creds := opts.TransportCredentials; creds != nil {
scheme = "https"
- conn, authInfo, err = creds.ClientHandshake(ctx, addr, conn)
+ conn, authInfo, err = creds.ClientHandshake(ctx, addr.Addr, conn)
if err != nil {
// Credentials handshake errors are typically considered permanent
// to avoid retrying on e.g. bad certificates.
@@ -174,8 +175,9 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
}
var buf bytes.Buffer
t := &http2Client{
- target: addr,
+ target: addr.Addr,
userAgent: ua,
+ md: addr.Metadata,
conn: conn,
authInfo: authInfo,
// The client initiated stream id is odd starting from 1.
@@ -400,6 +402,16 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
}
}
+ if md, ok := t.md.(*metadata.MD); ok {
+ for k, v := range *md {
+ if isReservedHeader(k) {
+ continue
+ }
+ for _, entry := range v {
+ t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
+ }
+ }
+ }
first := true
// Sends the headers in a single batch even when they span multiple frames.
for !endHeaders {
@@ -790,6 +802,9 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
}
func (t *http2Client) handlePing(f *http2.PingFrame) {
+ if f.IsAck() { // Do nothing.
+ return
+ }
pingAck := &ping{ack: true}
copy(pingAck.data[:], f.Data[:])
t.controlBuf.put(pingAck)
diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go
index f753c4f..a62fb7c 100644
--- a/vendor/google.golang.org/grpc/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/transport/http2_server.go
@@ -405,6 +405,9 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
}
func (t *http2Server) handlePing(f *http2.PingFrame) {
+ if f.IsAck() { // Do nothing.
+ return
+ }
pingAck := &ping{ack: true}
copy(pingAck.data[:], f.Data[:])
t.controlBuf.put(pingAck)
@@ -462,6 +465,14 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
return ErrIllegalHeaderWrite
}
s.headerOk = true
+ if md.Len() > 0 {
+ if s.header.Len() > 0 {
+ s.header = metadata.Join(s.header, md)
+ } else {
+ s.header = md
+ }
+ }
+ md = s.header
s.mu.Unlock()
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
@@ -493,7 +504,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
- var headersSent bool
+ var headersSent, hasHeader bool
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
@@ -502,7 +513,16 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
if s.headerOk {
headersSent = true
}
+ if s.header.Len() > 0 {
+ hasHeader = true
+ }
s.mu.Unlock()
+
+ if !headersSent && hasHeader {
+ t.WriteHeader(s, nil)
+ headersSent = true
+ }
+
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
}
@@ -548,29 +568,10 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
}
if !s.headerOk {
writeHeaderFrame = true
- s.headerOk = true
}
s.mu.Unlock()
if writeHeaderFrame {
- if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
- return err
- }
- t.hBuf.Reset()
- t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
- t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
- if s.sendCompress != "" {
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
- }
- p := http2.HeadersFrameParam{
- StreamID: s.id,
- BlockFragment: t.hBuf.Bytes(),
- EndHeaders: true,
- }
- if err := t.framer.writeHeaders(false, p); err != nil {
- t.Close()
- return connectionErrorf(true, err, "transport: %v", err)
- }
- t.writableChan <- 0
+ t.WriteHeader(s, nil)
}
r := bytes.NewBuffer(data)
for {
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go
index 3d6b6a6..413f749 100644
--- a/vendor/google.golang.org/grpc/transport/transport.go
+++ b/vendor/google.golang.org/grpc/transport/transport.go
@@ -286,9 +286,27 @@ func (s *Stream) StatusDesc() string {
return s.statusDesc
}
+// SetHeader sets the header metadata. This can be called multiple times.
+// Server side only.
+func (s *Stream) SetHeader(md metadata.MD) error {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.headerOk || s.state == streamDone {
+ return ErrIllegalHeaderWrite
+ }
+ if md.Len() == 0 {
+ return nil
+ }
+ s.header = metadata.Join(s.header, md)
+ return nil
+}
+
// SetTrailer sets the trailer metadata which will be sent with the RPC status
// by the server. This can be called multiple times. Server side only.
func (s *Stream) SetTrailer(md metadata.MD) error {
+ if md.Len() == 0 {
+ return nil
+ }
s.mu.Lock()
defer s.mu.Unlock()
s.trailer = metadata.Join(s.trailer, md)
@@ -343,7 +361,7 @@ func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authI
return newHTTP2Server(conn, maxStreams, authInfo)
}
-// ConnectOptions covers all relevant options for dialing a server.
+// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
// UserAgent is the application user agent.
UserAgent string
@@ -355,9 +373,15 @@ type ConnectOptions struct {
TransportCredentials credentials.TransportCredentials
}
+// TargetInfo contains the information of the target such as network address and metadata.
+type TargetInfo struct {
+ Addr string
+ Metadata interface{}
+}
+
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
-func NewClientTransport(ctx context.Context, target string, opts ConnectOptions) (ClientTransport, error) {
+func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions) (ClientTransport, error) {
return newHTTP2Client(ctx, target, opts)
}