aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/transport/http2_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/transport/http2_server.go')
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_server.go87
1 files changed, 72 insertions, 15 deletions
diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go
index a62fb7c..316188e 100644
--- a/vendor/google.golang.org/grpc/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/transport/http2_server.go
@@ -50,6 +50,8 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
+ "google.golang.org/grpc/stats"
+ "google.golang.org/grpc/tap"
)
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
@@ -58,9 +60,13 @@ var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHe
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
+ ctx context.Context
conn net.Conn
+ remoteAddr net.Addr
+ localAddr net.Addr
maxStreamID uint32 // max stream ID ever seen
authInfo credentials.AuthInfo // auth info about the connection
+ inTapHandle tap.ServerInHandle
// writableChan synchronizes write access to the transport.
// A writer acquires the write lock by receiving a value on writableChan
// and releases it by sending on writableChan.
@@ -91,12 +97,13 @@ type http2Server struct {
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
-func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) {
+func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
framer := newFramer(conn)
// Send initial settings as connection preface to client.
var settings []http2.Setting
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
+ maxStreams := config.MaxStreams
if maxStreams == 0 {
maxStreams = math.MaxUint32
} else {
@@ -121,12 +128,16 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
}
var buf bytes.Buffer
t := &http2Server{
+ ctx: context.Background(),
conn: conn,
- authInfo: authInfo,
+ remoteAddr: conn.RemoteAddr(),
+ localAddr: conn.LocalAddr(),
+ authInfo: config.AuthInfo,
framer: framer,
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
maxStreams: maxStreams,
+ inTapHandle: config.InTapHandle,
controlBuf: newRecvBuffer(),
fc: &inFlow{limit: initialConnWindowSize},
sendQuotaPool: newQuotaPool(defaultWindowSize),
@@ -136,13 +147,21 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
activeStreams: make(map[uint32]*Stream),
streamSendQuota: defaultWindowSize,
}
+ if stats.On() {
+ t.ctx = stats.TagConn(t.ctx, &stats.ConnTagInfo{
+ RemoteAddr: t.remoteAddr,
+ LocalAddr: t.localAddr,
+ })
+ connBegin := &stats.ConnBegin{}
+ stats.HandleConn(t.ctx, connBegin)
+ }
go t.controller()
t.writableChan <- 0
return t, nil
}
// operateHeader takes action on the decoded headers.
-func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
+func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
buf := newRecvBuffer()
s := &Stream{
id: frame.Header().StreamID,
@@ -168,12 +187,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
s.recvCompress = state.encoding
if state.timeoutSet {
- s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
+ s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
} else {
- s.ctx, s.cancel = context.WithCancel(context.TODO())
+ s.ctx, s.cancel = context.WithCancel(t.ctx)
}
pr := &peer.Peer{
- Addr: t.conn.RemoteAddr(),
+ Addr: t.remoteAddr,
}
// Attach Auth info if there is any.
if t.authInfo != nil {
@@ -195,6 +214,18 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
s.recvCompress = state.encoding
s.method = state.method
+ if t.inTapHandle != nil {
+ var err error
+ info := &tap.Info{
+ FullMethodName: state.method,
+ }
+ s.ctx, err = t.inTapHandle(s.ctx, info)
+ if err != nil {
+ // TODO: Log the real error.
+ t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
+ return
+ }
+ }
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
@@ -218,13 +249,26 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
}
+ s.ctx = traceCtx(s.ctx, s.method)
+ if stats.On() {
+ s.ctx = stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
+ inHeader := &stats.InHeader{
+ FullMethod: s.method,
+ RemoteAddr: t.remoteAddr,
+ LocalAddr: t.localAddr,
+ Compression: s.recvCompress,
+ WireLength: int(frame.Header().Length),
+ }
+ stats.HandleRPC(s.ctx, inHeader)
+ }
handle(s)
return
}
// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
-func (t *http2Server) HandleStreams(handle func(*Stream)) {
+// traceCtx attaches trace to ctx and returns the new context.
+func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
// Check the validity of client preface.
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(t.conn, preface); err != nil {
@@ -279,7 +323,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
- if t.operateHeaders(frame, handle) {
+ if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
break
}
@@ -492,9 +536,16 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
}
}
+ bufLen := t.hBuf.Len()
if err := t.writeHeaders(s, t.hBuf, false); err != nil {
return err
}
+ if stats.On() {
+ outHeader := &stats.OutHeader{
+ WireLength: bufLen,
+ }
+ stats.HandleRPC(s.Context(), outHeader)
+ }
t.writableChan <- 0
return nil
}
@@ -547,10 +598,17 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
}
}
+ bufLen := t.hBuf.Len()
if err := t.writeHeaders(s, t.hBuf, true); err != nil {
t.Close()
return err
}
+ if stats.On() {
+ outTrailer := &stats.OutTrailer{
+ WireLength: bufLen,
+ }
+ stats.HandleRPC(s.Context(), outTrailer)
+ }
t.closeStream(s)
t.writableChan <- 0
return nil
@@ -579,19 +637,14 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
return nil
}
size := http2MaxFrameLen
- s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
- t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
- if _, ok := err.(StreamError); ok {
- t.sendQuotaPool.cancel()
- }
return err
}
if sq < size {
@@ -659,7 +712,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
t.mu.Lock()
defer t.mu.Unlock()
for _, stream := range t.activeStreams {
- stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
+ stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota))
}
t.streamSendQuota = s.Val
}
@@ -736,6 +789,10 @@ func (t *http2Server) Close() (err error) {
for _, s := range streams {
s.cancel()
}
+ if stats.On() {
+ connEnd := &stats.ConnEnd{}
+ stats.HandleConn(t.ctx, connEnd)
+ }
return
}
@@ -767,7 +824,7 @@ func (t *http2Server) closeStream(s *Stream) {
}
func (t *http2Server) RemoteAddr() net.Addr {
- return t.conn.RemoteAddr()
+ return t.remoteAddr
}
func (t *http2Server) Drain() {