aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/transport/handler_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/transport/handler_server.go')
-rw-r--r--vendor/google.golang.org/grpc/transport/handler_server.go109
1 files changed, 73 insertions, 36 deletions
diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/transport/handler_server.go
index f1f6caf..f71b748 100644
--- a/vendor/google.golang.org/grpc/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/transport/handler_server.go
@@ -40,20 +40,24 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
+ "google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)
// NewServerHandlerTransport returns a ServerTransport handling gRPC
// from inside an http.Handler. It requires that the http Server
// supports HTTP/2.
-func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTransport, error) {
+func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) {
if r.ProtoMajor != 2 {
return nil, errors.New("gRPC requires HTTP/2")
}
if r.Method != "POST" {
return nil, errors.New("invalid gRPC request method")
}
- if !validContentType(r.Header.Get("Content-Type")) {
+ contentType := r.Header.Get("Content-Type")
+ // TODO: do we assume contentType is lowercase? we did before
+ contentSubtype, validContentType := contentSubtype(contentType)
+ if !validContentType {
return nil, errors.New("invalid gRPC request content-type")
}
if _, ok := w.(http.Flusher); !ok {
@@ -64,10 +68,13 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTr
}
st := &serverHandlerTransport{
- rw: w,
- req: r,
- closedCh: make(chan struct{}),
- writes: make(chan func()),
+ rw: w,
+ req: r,
+ closedCh: make(chan struct{}),
+ writes: make(chan func()),
+ contentType: contentType,
+ contentSubtype: contentSubtype,
+ stats: stats,
}
if v := r.Header.Get("grpc-timeout"); v != "" {
@@ -79,19 +86,19 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTr
st.timeout = to
}
- var metakv []string
+ metakv := []string{"content-type", contentType}
if r.Host != "" {
metakv = append(metakv, ":authority", r.Host)
}
for k, vv := range r.Header {
k = strings.ToLower(k)
- if isReservedHeader(k) && !isWhitelistedPseudoHeader(k) {
+ if isReservedHeader(k) && !isWhitelistedHeader(k) {
continue
}
for _, v := range vv {
v, err := decodeMetadataHeader(k, v)
if err != nil {
- return nil, streamErrorf(codes.InvalidArgument, "malformed binary metadata: %v", err)
+ return nil, streamErrorf(codes.Internal, "malformed binary metadata: %v", err)
}
metakv = append(metakv, k, v)
}
@@ -123,10 +130,17 @@ type serverHandlerTransport struct {
// when WriteStatus is called.
writes chan func()
- mu sync.Mutex
- // streamDone indicates whether WriteStatus has been called and writes channel
- // has been closed.
- streamDone bool
+ // block concurrent WriteStatus calls
+ // e.g. grpc/(*serverStream).SendMsg/RecvMsg
+ writeStatusMu sync.Mutex
+
+ // we just mirror the request content-type
+ contentType string
+ // we store both contentType and contentSubtype so we don't keep recreating them
+ // TODO make sure this is consistent across handler_server and http2_server
+ contentSubtype string
+
+ stats stats.Handler
}
func (ht *serverHandlerTransport) Close() error {
@@ -177,13 +191,9 @@ func (ht *serverHandlerTransport) do(fn func()) error {
}
func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
- ht.mu.Lock()
- if ht.streamDone {
- ht.mu.Unlock()
- return nil
- }
- ht.streamDone = true
- ht.mu.Unlock()
+ ht.writeStatusMu.Lock()
+ defer ht.writeStatusMu.Unlock()
+
err := ht.do(func() {
ht.writeCommonHeaders(s)
@@ -222,7 +232,14 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
}
}
})
- close(ht.writes)
+
+ if err == nil { // transport has not been closed
+ if ht.stats != nil {
+ ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
+ }
+ ht.Close()
+ close(ht.writes)
+ }
return err
}
@@ -236,7 +253,7 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
h := ht.rw.Header()
h["Date"] = nil // suppress Date to make tests happy; TODO: restore
- h.Set("Content-Type", "application/grpc")
+ h.Set("Content-Type", ht.contentType)
// Predeclare trailers we'll set later in WriteStatus (after the body).
// This is a SHOULD in the HTTP RFC, and the way you add (known)
@@ -264,7 +281,7 @@ func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts
}
func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
- return ht.do(func() {
+ err := ht.do(func() {
ht.writeCommonHeaders(s)
h := ht.rw.Header()
for k, vv := range md {
@@ -280,17 +297,24 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
ht.rw.WriteHeader(200)
ht.rw.(http.Flusher).Flush()
})
+
+ if err == nil {
+ if ht.stats != nil {
+ ht.stats.HandleRPC(s.Context(), &stats.OutHeader{})
+ }
+ }
+ return err
}
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
// With this transport type there will be exactly 1 stream: this HTTP request.
- var ctx context.Context
+ ctx := contextFromRequest(ht.req)
var cancel context.CancelFunc
if ht.timeoutSet {
- ctx, cancel = context.WithTimeout(context.Background(), ht.timeout)
+ ctx, cancel = context.WithTimeout(ctx, ht.timeout)
} else {
- ctx, cancel = context.WithCancel(context.Background())
+ ctx, cancel = context.WithCancel(ctx)
}
// requestOver is closed when either the request's context is done
@@ -314,13 +338,14 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
req := ht.req
s := &Stream{
- id: 0, // irrelevant
- requestRead: func(int) {},
- cancel: cancel,
- buf: newRecvBuffer(),
- st: ht,
- method: req.URL.Path,
- recvCompress: req.Header.Get("grpc-encoding"),
+ id: 0, // irrelevant
+ requestRead: func(int) {},
+ cancel: cancel,
+ buf: newRecvBuffer(),
+ st: ht,
+ method: req.URL.Path,
+ recvCompress: req.Header.Get("grpc-encoding"),
+ contentSubtype: ht.contentSubtype,
}
pr := &peer.Peer{
Addr: ht.RemoteAddr(),
@@ -329,10 +354,18 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
}
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
- ctx = peer.NewContext(ctx, pr)
- s.ctx = newContextWithStream(ctx, s)
+ s.ctx = peer.NewContext(ctx, pr)
+ if ht.stats != nil {
+ s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
+ inHeader := &stats.InHeader{
+ FullMethod: s.method,
+ RemoteAddr: ht.RemoteAddr(),
+ Compression: s.recvCompress,
+ }
+ ht.stats.HandleRPC(s.ctx, inHeader)
+ }
s.trReader = &transportReader{
- reader: &recvBufferReader{ctx: s.ctx, recv: s.buf},
+ reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf},
windowHandler: func(int) {},
}
@@ -387,6 +420,10 @@ func (ht *serverHandlerTransport) runStream() {
}
}
+func (ht *serverHandlerTransport) IncrMsgSent() {}
+
+func (ht *serverHandlerTransport) IncrMsgRecv() {}
+
func (ht *serverHandlerTransport) Drain() {
panic("Drain() is not implemented")
}