aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/transport/http2_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/transport/http2_server.go')
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_server.go848
1 files changed, 401 insertions, 447 deletions
diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go
index bad29b8..19acedb 100644
--- a/vendor/google.golang.org/grpc/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/transport/http2_server.go
@@ -24,7 +24,6 @@ import (
"fmt"
"io"
"math"
- "math/rand"
"net"
"strconv"
"sync"
@@ -35,8 +34,12 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
+
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -52,25 +55,25 @@ var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHe
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
ctx context.Context
+ ctxDone <-chan struct{} // Cache the context.Done() chan
cancel context.CancelFunc
conn net.Conn
+ loopy *loopyWriter
+ readerDone chan struct{} // sync point to enable testing.
+ writerDone chan struct{} // sync point to enable testing.
remoteAddr net.Addr
localAddr net.Addr
maxStreamID uint32 // max stream ID ever seen
authInfo credentials.AuthInfo // auth info about the connection
inTapHandle tap.ServerInHandle
framer *framer
- hBuf *bytes.Buffer // the buffer for HPACK encoding
- hEnc *hpack.Encoder // HPACK encoder
// The max number of concurrent streams.
maxStreams uint32
// controlBuf delivers all the control related tasks (e.g., window
// updates, reset streams, and various settings) to the controller.
controlBuf *controlBuffer
- fc *inFlow
- // sendQuotaPool provides flow control to outbound message.
- sendQuotaPool *quotaPool
- stats stats.Handler
+ fc *trInFlow
+ stats stats.Handler
// Flag to keep track of reading activity on transport.
// 1 is true and 0 is false.
activity uint32 // Accessed atomically.
@@ -101,13 +104,27 @@ type http2Server struct {
drainChan chan struct{}
state transportState
activeStreams map[uint32]*Stream
- // the per-stream outbound flow control window size set by the peer.
- streamSendQuota uint32
// idle is the time instant when the connection went idle.
// This is either the beginning of the connection or when the number of
// RPCs go down to 0.
// When the connection is busy, this value is set to 0.
idle time.Time
+
+ // Fields below are for channelz metric collection.
+ channelzID int64 // channelz unique identification number
+ czmu sync.RWMutex
+ kpCount int64
+ // The number of streams that have started, including already finished ones.
+ streamsStarted int64
+ // The number of streams that have ended successfully by sending frame with
+ // EoS bit set.
+ streamsSucceeded int64
+ streamsFailed int64
+ lastStreamCreated time.Time
+ msgSent int64
+ msgRecv int64
+ lastMsgSent time.Time
+ lastMsgRecv time.Time
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -152,12 +169,12 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
Val: uint32(iwz)})
}
if err := framer.fr.WriteSettings(isettings...); err != nil {
- return nil, connectionErrorf(true, err, "transport: %v", err)
+ return nil, connectionErrorf(false, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
- return nil, connectionErrorf(true, err, "transport: %v", err)
+ return nil, connectionErrorf(false, err, "transport: %v", err)
}
}
kp := config.KeepaliveParams
@@ -182,32 +199,30 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
- var buf bytes.Buffer
ctx, cancel := context.WithCancel(context.Background())
t := &http2Server{
ctx: ctx,
cancel: cancel,
+ ctxDone: ctx.Done(),
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
authInfo: config.AuthInfo,
framer: framer,
- hBuf: &buf,
- hEnc: hpack.NewEncoder(&buf),
+ readerDone: make(chan struct{}),
+ writerDone: make(chan struct{}),
maxStreams: maxStreams,
inTapHandle: config.InTapHandle,
- controlBuf: newControlBuffer(),
- fc: &inFlow{limit: uint32(icwz)},
- sendQuotaPool: newQuotaPool(defaultWindowSize),
+ fc: &trInFlow{limit: uint32(icwz)},
state: reachable,
activeStreams: make(map[uint32]*Stream),
- streamSendQuota: defaultWindowSize,
stats: config.StatsHandler,
kp: kp,
idle: time.Now(),
kep: kep,
initialWindowSize: iwz,
}
+ t.controlBuf = newControlBuffer(t.ctxDone)
if dynamicWindow {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
@@ -222,10 +237,48 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
connBegin := &stats.ConnBegin{}
t.stats.HandleConn(t.ctx, connBegin)
}
+ if channelz.IsOn() {
+ t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, "")
+ }
t.framer.writer.Flush()
+
+ defer func() {
+ if err != nil {
+ t.Close()
+ }
+ }()
+
+ // Check the validity of client preface.
+ preface := make([]byte, len(clientPreface))
+ if _, err := io.ReadFull(t.conn, preface); err != nil {
+ return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
+ }
+ if !bytes.Equal(preface, clientPreface) {
+ return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
+ }
+
+ frame, err := t.framer.fr.ReadFrame()
+ if err == io.EOF || err == io.ErrUnexpectedEOF {
+ return nil, err
+ }
+ if err != nil {
+ return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
+ }
+ atomic.StoreUint32(&t.activity, 1)
+ sf, ok := frame.(*http2.SettingsFrame)
+ if !ok {
+ return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
+ }
+ t.handleSettings(sf)
+
go func() {
- loopyWriter(t.ctx, t.controlBuf, t.itemHandler)
- t.Close()
+ t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
+ t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
+ if err := t.loopy.run(); err != nil {
+ errorf("transport: loopyWriter.run returning. Err: %v", err)
+ }
+ t.conn.Close()
+ close(t.writerDone)
}()
go t.keepalive()
return t, nil
@@ -234,12 +287,16 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
streamID := frame.Header().StreamID
-
var state decodeState
for _, hf := range frame.Fields {
if err := state.processHeaderField(hf); err != nil {
if se, ok := err.(StreamError); ok {
- t.controlBuf.put(&resetStream{streamID, statusCodeConvTab[se.Code]})
+ t.controlBuf.put(&cleanupStream{
+ streamID: streamID,
+ rst: true,
+ rstCode: statusCodeConvTab[se.Code],
+ onWrite: func() {},
+ })
}
return
}
@@ -247,14 +304,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
buf := newRecvBuffer()
s := &Stream{
- id: streamID,
- st: t,
- buf: buf,
- fc: &inFlow{limit: uint32(t.initialWindowSize)},
- recvCompress: state.encoding,
- method: state.method,
+ id: streamID,
+ st: t,
+ buf: buf,
+ fc: &inFlow{limit: uint32(t.initialWindowSize)},
+ recvCompress: state.encoding,
+ method: state.method,
+ contentSubtype: state.contentSubtype,
}
-
if frame.StreamEnded() {
// s is just created by the caller. No lock needed.
s.state = streamReadDone
@@ -272,10 +329,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
pr.AuthInfo = t.authInfo
}
s.ctx = peer.NewContext(s.ctx, pr)
- // Cache the current stream to the context so that the server application
- // can find out. Required when the server wants to send some metadata
- // back to the client (unary call only).
- s.ctx = newContextWithStream(s.ctx, s)
// Attach the received metadata to the context.
if len(state.mdata) > 0 {
s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
@@ -294,7 +347,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.ctx, err = t.inTapHandle(s.ctx, info)
if err != nil {
warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
- t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
+ t.controlBuf.put(&cleanupStream{
+ streamID: s.id,
+ rst: true,
+ rstCode: http2.ErrCodeRefusedStream,
+ onWrite: func() {},
+ })
return
}
}
@@ -305,7 +363,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
t.mu.Unlock()
- t.controlBuf.put(&resetStream{streamID, http2.ErrCodeRefusedStream})
+ t.controlBuf.put(&cleanupStream{
+ streamID: streamID,
+ rst: true,
+ rstCode: http2.ErrCodeRefusedStream,
+ onWrite: func() {},
+ })
return
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
@@ -315,13 +378,17 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
return true
}
t.maxStreamID = streamID
- s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
- s.localSendQuota = newQuotaPool(defaultLocalSendQuota)
t.activeStreams[streamID] = s
if len(t.activeStreams) == 1 {
t.idle = time.Time{}
}
t.mu.Unlock()
+ if channelz.IsOn() {
+ t.czmu.Lock()
+ t.streamsStarted++
+ t.lastStreamCreated = time.Now()
+ t.czmu.Unlock()
+ }
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
}
@@ -337,15 +404,23 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
t.stats.HandleRPC(s.ctx, inHeader)
}
+ s.ctxDone = s.ctx.Done()
+ s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
reader: &recvBufferReader{
- ctx: s.ctx,
- recv: s.buf,
+ ctx: s.ctx,
+ ctxDone: s.ctxDone,
+ recv: s.buf,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
},
}
+ // Register the stream with loopy.
+ t.controlBuf.put(&registerStream{
+ streamID: s.id,
+ wq: s.wq,
+ })
handle(s)
return
}
@@ -354,53 +429,26 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
- // Check the validity of client preface.
- preface := make([]byte, len(clientPreface))
- if _, err := io.ReadFull(t.conn, preface); err != nil {
- // Only log if it isn't a simple tcp accept check (ie: tcp balancer doing open/close socket)
- if err != io.EOF {
- errorf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
- }
- t.Close()
- return
- }
- if !bytes.Equal(preface, clientPreface) {
- errorf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
- t.Close()
- return
- }
-
- frame, err := t.framer.fr.ReadFrame()
- if err == io.EOF || err == io.ErrUnexpectedEOF {
- t.Close()
- return
- }
- if err != nil {
- errorf("transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
- t.Close()
- return
- }
- atomic.StoreUint32(&t.activity, 1)
- sf, ok := frame.(*http2.SettingsFrame)
- if !ok {
- errorf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
- t.Close()
- return
- }
- t.handleSettings(sf)
-
+ defer close(t.readerDone)
for {
frame, err := t.framer.fr.ReadFrame()
atomic.StoreUint32(&t.activity, 1)
if err != nil {
if se, ok := err.(http2.StreamError); ok {
+ warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
t.mu.Lock()
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
if s != nil {
- t.closeStream(s)
+ t.closeStream(s, true, se.Code, nil, false)
+ } else {
+ t.controlBuf.put(&cleanupStream{
+ streamID: se.StreamID,
+ rst: true,
+ rstCode: se.Code,
+ onWrite: func() {},
+ })
}
- t.controlBuf.put(&resetStream{se.StreamID, se.Code})
continue
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
@@ -454,33 +502,20 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
// of stream if the application is requesting data larger in size than
// the window.
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.state == streamDone {
- return
- }
if w := s.fc.maybeAdjust(n); w > 0 {
- if cw := t.fc.resetPendingUpdate(); cw > 0 {
- t.controlBuf.put(&windowUpdate{0, cw})
- }
- t.controlBuf.put(&windowUpdate{s.id, w})
+ t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
}
+
}
// updateWindow adjusts the inbound quota for the stream and the transport.
// Window updates will deliver to the controller for sending when
// the cumulative quota exceeds the corresponding threshold.
func (t *http2Server) updateWindow(s *Stream, n uint32) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.state == streamDone {
- return
- }
if w := s.fc.onRead(n); w > 0 {
- if cw := t.fc.resetPendingUpdate(); cw > 0 {
- t.controlBuf.put(&windowUpdate{0, cw})
- }
- t.controlBuf.put(&windowUpdate{s.id, w})
+ t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
+ increment: w,
+ })
}
}
@@ -494,13 +529,15 @@ func (t *http2Server) updateFlowControl(n uint32) {
}
t.initialWindowSize = int32(n)
t.mu.Unlock()
- t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n)})
- t.controlBuf.put(&settings{
- ack: false,
+ t.controlBuf.put(&outgoingWindowUpdate{
+ streamID: 0,
+ increment: t.fc.newLimit(n),
+ })
+ t.controlBuf.put(&outgoingSettings{
ss: []http2.Setting{
{
ID: http2.SettingInitialWindowSize,
- Val: uint32(n),
+ Val: n,
},
},
})
@@ -511,7 +548,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
size := f.Header().Length
var sendBDPPing bool
if t.bdpEst != nil {
- sendBDPPing = t.bdpEst.add(uint32(size))
+ sendBDPPing = t.bdpEst.add(size)
}
// Decouple connection's flow control from application's read.
// An update on connection's flow control should not depend on
@@ -521,23 +558,22 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
// Decoupling the connection flow control will prevent other
// active(fast) streams from starving in presence of slow or
// inactive streams.
- //
- // Furthermore, if a bdpPing is being sent out we can piggyback
- // connection's window update for the bytes we just received.
+ if w := t.fc.onData(size); w > 0 {
+ t.controlBuf.put(&outgoingWindowUpdate{
+ streamID: 0,
+ increment: w,
+ })
+ }
if sendBDPPing {
- if size != 0 { // Could be an empty frame.
- t.controlBuf.put(&windowUpdate{0, uint32(size)})
+ // Avoid excessive ping detection (e.g. in an L7 proxy)
+ // by sending a window update prior to the BDP ping.
+ if w := t.fc.reset(); w > 0 {
+ t.controlBuf.put(&outgoingWindowUpdate{
+ streamID: 0,
+ increment: w,
+ })
}
t.controlBuf.put(bdpPing)
- } else {
- if err := t.fc.onData(uint32(size)); err != nil {
- errorf("transport: http2Server %v", err)
- t.Close()
- return
- }
- if w := t.fc.onRead(uint32(size)); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
}
// Select the right stream to dispatch.
s, ok := t.getStream(f)
@@ -545,23 +581,15 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
return
}
if size > 0 {
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- return
- }
- if err := s.fc.onData(uint32(size)); err != nil {
- s.mu.Unlock()
- t.closeStream(s)
- t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
+ if err := s.fc.onData(size); err != nil {
+ t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
- if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
- t.controlBuf.put(&windowUpdate{s.id, w})
+ if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
+ t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
}
}
- s.mu.Unlock()
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
@@ -573,11 +601,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
// Received the end of stream from the client.
- s.mu.Lock()
- if s.state != streamDone {
- s.state = streamReadDone
- }
- s.mu.Unlock()
+ s.compareAndSwapState(streamActive, streamReadDone)
s.write(recvMsg{err: io.EOF})
}
}
@@ -587,7 +611,7 @@ func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
if !ok {
return
}
- t.closeStream(s)
+ t.closeStream(s, false, 0, nil, false)
}
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
@@ -599,21 +623,9 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
ss = append(ss, s)
return nil
})
- t.controlBuf.put(&settings{ack: true, ss: ss})
-}
-
-func (t *http2Server) applySettings(ss []http2.Setting) {
- for _, s := range ss {
- if s.ID == http2.SettingInitialWindowSize {
- t.mu.Lock()
- for _, stream := range t.activeStreams {
- stream.sendQuotaPool.addAndUpdate(int(s.Val) - int(t.streamSendQuota))
- }
- t.streamSendQuota = s.Val
- t.mu.Unlock()
- }
-
- }
+ t.controlBuf.put(&incomingSettings{
+ ss: ss,
+ })
}
const (
@@ -666,39 +678,37 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
- errorf("transport: Got to too many pings from the client, closing the connection.")
+ errorf("transport: Got too many pings from the client, closing the connection.")
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
}
}
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
- id := f.Header().StreamID
- incr := f.Increment
- if id == 0 {
- t.sendQuotaPool.add(int(incr))
- return
- }
- if s, ok := t.getStream(f); ok {
- s.sendQuotaPool.add(int(incr))
+ t.controlBuf.put(&incomingWindowUpdate{
+ streamID: f.Header().StreamID,
+ increment: f.Increment,
+ })
+}
+
+func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
+ for k, vv := range md {
+ if isReservedHeader(k) {
+ // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
+ continue
+ }
+ for _, v := range vv {
+ headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
+ }
}
+ return headerFields
}
// WriteHeader sends the header metedata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
- select {
- case <-s.ctx.Done():
- return ContextErr(s.ctx.Err())
- case <-t.ctx.Done():
- return ErrConnClosing
- default:
- }
-
- s.mu.Lock()
- if s.headerOk || s.state == streamDone {
- s.mu.Unlock()
+ if s.updateHeaderSent() || s.getState() == streamDone {
return ErrIllegalHeaderWrite
}
- s.headerOk = true
+ s.hdrMu.Lock()
if md.Len() > 0 {
if s.header.Len() > 0 {
s.header = metadata.Join(s.header, md)
@@ -706,37 +716,35 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
s.header = md
}
}
- md = s.header
- s.mu.Unlock()
- // TODO(mmukhi): Benchmark if the perfomance gets better if count the metadata and other header fields
+ t.writeHeaderLocked(s)
+ s.hdrMu.Unlock()
+ return nil
+}
+
+func (t *http2Server) writeHeaderLocked(s *Stream) {
+ // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
- headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
+ headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
if s.sendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
}
- for k, vv := range md {
- if isReservedHeader(k) {
- // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
- continue
- }
- for _, v := range vv {
- headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
- }
- }
+ headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
t.controlBuf.put(&headerFrame{
streamID: s.id,
hf: headerFields,
endStream: false,
+ onWrite: func() {
+ atomic.StoreUint32(&t.resetPingStrikes, 1)
+ },
})
if t.stats != nil {
- outHeader := &stats.OutHeader{
- //WireLength: // TODO(mmukhi): Revisit this later, if needed.
- }
+ // Note: WireLength is not set in outHeader.
+ // TODO(mmukhi): Revisit this later, if needed.
+ outHeader := &stats.OutHeader{}
t.stats.HandleRPC(s.Context(), outHeader)
}
- return nil
}
// WriteStatus sends stream status to the client and terminates the stream.
@@ -744,37 +752,20 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
- select {
- case <-t.ctx.Done():
- return ErrConnClosing
- default:
- }
-
- var headersSent, hasHeader bool
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
+ if s.getState() == streamDone {
return nil
}
- if s.headerOk {
- headersSent = true
- }
- if s.header.Len() > 0 {
- hasHeader = true
- }
- s.mu.Unlock()
-
- if !headersSent && hasHeader {
- t.WriteHeader(s, nil)
- headersSent = true
- }
-
- // TODO(mmukhi): Benchmark if the perfomance gets better if count the metadata and other header fields
+ s.hdrMu.Lock()
+ // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
- if !headersSent {
- headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
- headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
+ if !s.updateHeaderSent() { // No headers have been sent.
+ if len(s.header) > 0 { // Send a separate header frame.
+ t.writeHeaderLocked(s)
+ } else { // Send a trailer only response.
+ headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
+ headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
+ }
}
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
@@ -783,126 +774,75 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
- panic(err)
+ grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
+ } else {
+ headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
}
-
- headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
}
// Attach the trailer metadata.
- for k, vv := range s.trailer {
- // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
- if isReservedHeader(k) {
- continue
- }
- for _, v := range vv {
- headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
- }
- }
- t.controlBuf.put(&headerFrame{
+ headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
+ trailingHeader := &headerFrame{
streamID: s.id,
hf: headerFields,
endStream: true,
- })
+ onWrite: func() {
+ atomic.StoreUint32(&t.resetPingStrikes, 1)
+ },
+ }
+ s.hdrMu.Unlock()
+ t.closeStream(s, false, 0, trailingHeader, true)
if t.stats != nil {
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
}
- t.closeStream(s)
return nil
}
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
-func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) (err error) {
- select {
- case <-s.ctx.Done():
- return ContextErr(s.ctx.Err())
- case <-t.ctx.Done():
- return ErrConnClosing
- default:
- }
-
- var writeHeaderFrame bool
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- return streamErrorf(codes.Unknown, "the stream has been done")
- }
- if !s.headerOk {
- writeHeaderFrame = true
- }
- s.mu.Unlock()
- if writeHeaderFrame {
- t.WriteHeader(s, nil)
+func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
+ if !s.isHeaderSent() { // Headers haven't been written yet.
+ if err := t.WriteHeader(s, nil); err != nil {
+ // TODO(mmukhi, dfawley): Make sure this is the right code to return.
+ return streamErrorf(codes.Internal, "transport: %v", err)
+ }
+ } else {
+ // Writing headers checks for this condition.
+ if s.getState() == streamDone {
+ // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
+ s.cancel()
+ select {
+ case <-t.ctx.Done():
+ return ErrConnClosing
+ default:
+ }
+ return ContextErr(s.ctx.Err())
+ }
}
- // Add data to header frame so that we can equally distribute data across frames.
+ // Add some data to header frame so that we can equally distribute bytes across frames.
emptyLen := http2MaxFrameLen - len(hdr)
if emptyLen > len(data) {
emptyLen = len(data)
}
hdr = append(hdr, data[:emptyLen]...)
data = data[emptyLen:]
- for _, r := range [][]byte{hdr, data} {
- for len(r) > 0 {
- size := http2MaxFrameLen
- // Wait until the stream has some quota to send the data.
- quotaChan, quotaVer := s.sendQuotaPool.acquireWithVersion()
- sq, err := wait(s.ctx, t.ctx, nil, nil, quotaChan)
- if err != nil {
- return err
- }
- // Wait until the transport has some quota to send the data.
- tq, err := wait(s.ctx, t.ctx, nil, nil, t.sendQuotaPool.acquire())
- if err != nil {
- return err
- }
- if sq < size {
- size = sq
- }
- if tq < size {
- size = tq
- }
- if size > len(r) {
- size = len(r)
- }
- p := r[:size]
- ps := len(p)
- if ps < tq {
- // Overbooked transport quota. Return it back.
- t.sendQuotaPool.add(tq - ps)
- }
- // Acquire local send quota to be able to write to the controlBuf.
- ltq, err := wait(s.ctx, t.ctx, nil, nil, s.localSendQuota.acquire())
- if err != nil {
- if _, ok := err.(ConnectionError); !ok {
- t.sendQuotaPool.add(ps)
- }
- return err
- }
- s.localSendQuota.add(ltq - ps) // It's ok we make this negative.
- // Reset ping strikes when sending data since this might cause
- // the peer to send ping.
+ df := &dataFrame{
+ streamID: s.id,
+ h: hdr,
+ d: data,
+ onEachWrite: func() {
atomic.StoreUint32(&t.resetPingStrikes, 1)
- success := func() {
- t.controlBuf.put(&dataFrame{streamID: s.id, endStream: false, d: p, f: func() {
- s.localSendQuota.add(ps)
- }})
- if ps < sq {
- // Overbooked stream quota. Return it back.
- s.sendQuotaPool.lockedAdd(sq - ps)
- }
- r = r[ps:]
- }
- failure := func() {
- s.sendQuotaPool.lockedAdd(sq)
- }
- if !s.sendQuotaPool.compareAndExecute(quotaVer, success, failure) {
- t.sendQuotaPool.add(ps)
- s.localSendQuota.add(ps)
- }
+ },
+ }
+ if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
+ select {
+ case <-t.ctx.Done():
+ return ErrConnClosing
+ default:
}
+ return ContextErr(s.ctx.Err())
}
- return nil
+ return t.controlBuf.put(df)
}
// keepalive running in a separate goroutine does the following:
@@ -947,7 +887,7 @@ func (t *http2Server) keepalive() {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
t.drain(http2.ErrCodeNo, []byte{})
- // Reseting the timer so that the clean-up doesn't deadlock.
+ // Resetting the timer so that the clean-up doesn't deadlock.
maxIdle.Reset(infinity)
return
}
@@ -959,7 +899,7 @@ func (t *http2Server) keepalive() {
case <-maxAge.C:
// Close the connection after grace period.
t.Close()
- // Reseting the timer so that the clean-up doesn't deadlock.
+ // Resetting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity)
case <-t.ctx.Done():
}
@@ -972,11 +912,16 @@ func (t *http2Server) keepalive() {
}
if pingSent {
t.Close()
- // Reseting the timer so that the clean-up doesn't deadlock.
+ // Resetting the timer so that the clean-up doesn't deadlock.
keepalive.Reset(infinity)
return
}
pingSent = true
+ if channelz.IsOn() {
+ t.czmu.Lock()
+ t.kpCount++
+ t.czmu.Unlock()
+ }
t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout)
case <-t.ctx.Done():
@@ -985,127 +930,6 @@ func (t *http2Server) keepalive() {
}
}
-var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
-
-// TODO(mmukhi): A lot of this code(and code in other places in the tranpsort layer)
-// is duplicated between the client and the server.
-// The transport layer needs to be refactored to take care of this.
-func (t *http2Server) itemHandler(i item) error {
- switch i := i.(type) {
- case *dataFrame:
- if err := t.framer.fr.WriteData(i.streamID, i.endStream, i.d); err != nil {
- return err
- }
- i.f()
- return nil
- case *headerFrame:
- t.hBuf.Reset()
- for _, f := range i.hf {
- t.hEnc.WriteField(f)
- }
- first := true
- endHeaders := false
- for !endHeaders {
- size := t.hBuf.Len()
- if size > http2MaxFrameLen {
- size = http2MaxFrameLen
- } else {
- endHeaders = true
- }
- var err error
- if first {
- first = false
- err = t.framer.fr.WriteHeaders(http2.HeadersFrameParam{
- StreamID: i.streamID,
- BlockFragment: t.hBuf.Next(size),
- EndStream: i.endStream,
- EndHeaders: endHeaders,
- })
- } else {
- err = t.framer.fr.WriteContinuation(
- i.streamID,
- endHeaders,
- t.hBuf.Next(size),
- )
- }
- if err != nil {
- return err
- }
- }
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- return nil
- case *windowUpdate:
- return t.framer.fr.WriteWindowUpdate(i.streamID, i.increment)
- case *settings:
- if i.ack {
- t.applySettings(i.ss)
- return t.framer.fr.WriteSettingsAck()
- }
- return t.framer.fr.WriteSettings(i.ss...)
- case *resetStream:
- return t.framer.fr.WriteRSTStream(i.streamID, i.code)
- case *goAway:
- t.mu.Lock()
- if t.state == closing {
- t.mu.Unlock()
- // The transport is closing.
- return fmt.Errorf("transport: Connection closing")
- }
- sid := t.maxStreamID
- if !i.headsUp {
- // Stop accepting more streams now.
- t.state = draining
- t.mu.Unlock()
- if err := t.framer.fr.WriteGoAway(sid, i.code, i.debugData); err != nil {
- return err
- }
- if i.closeConn {
- // Abruptly close the connection following the GoAway (via
- // loopywriter). But flush out what's inside the buffer first.
- t.framer.writer.Flush()
- return fmt.Errorf("transport: Connection closing")
- }
- return nil
- }
- t.mu.Unlock()
- // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
- // Follow that with a ping and wait for the ack to come back or a timer
- // to expire. During this time accept new streams since they might have
- // originated before the GoAway reaches the client.
- // After getting the ack or timer expiration send out another GoAway this
- // time with an ID of the max stream server intends to process.
- if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
- return err
- }
- if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
- return err
- }
- go func() {
- timer := time.NewTimer(time.Minute)
- defer timer.Stop()
- select {
- case <-t.drainChan:
- case <-timer.C:
- case <-t.ctx.Done():
- return
- }
- t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData})
- }()
- return nil
- case *flushIO:
- return t.framer.writer.Flush()
- case *ping:
- if !i.ack {
- t.bdpEst.timesnap(i.data)
- }
- return t.framer.fr.WritePing(i.ack, i.data)
- default:
- err := status.Errorf(codes.Internal, "transport: http2Server.controller got unexpected item type %t", i)
- errorf("%v", err)
- return err
- }
-}
-
// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
@@ -1119,8 +943,12 @@ func (t *http2Server) Close() error {
streams := t.activeStreams
t.activeStreams = nil
t.mu.Unlock()
+ t.controlBuf.finish()
t.cancel()
err := t.conn.Close()
+ if channelz.IsOn() {
+ channelz.RemoveEntry(t.channelzID)
+ }
// Cancel all active streams.
for _, s := range streams {
s.cancel()
@@ -1134,27 +962,45 @@ func (t *http2Server) Close() error {
// closeStream clears the footprint of a stream when the stream is not needed
// any more.
-func (t *http2Server) closeStream(s *Stream) {
- t.mu.Lock()
- delete(t.activeStreams, s.id)
- if len(t.activeStreams) == 0 {
- t.idle = time.Now()
- }
- if t.state == draining && len(t.activeStreams) == 0 {
- defer t.Close()
+func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
+ if s.swapState(streamDone) == streamDone {
+ // If the stream was already done, return.
+ return
}
- t.mu.Unlock()
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
s.cancel()
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- return
+ cleanup := &cleanupStream{
+ streamID: s.id,
+ rst: rst,
+ rstCode: rstCode,
+ onWrite: func() {
+ t.mu.Lock()
+ if t.activeStreams != nil {
+ delete(t.activeStreams, s.id)
+ if len(t.activeStreams) == 0 {
+ t.idle = time.Now()
+ }
+ }
+ t.mu.Unlock()
+ if channelz.IsOn() {
+ t.czmu.Lock()
+ if eosReceived {
+ t.streamsSucceeded++
+ } else {
+ t.streamsFailed++
+ }
+ t.czmu.Unlock()
+ }
+ },
+ }
+ if hdr != nil {
+ hdr.cleanup = cleanup
+ t.controlBuf.put(hdr)
+ } else {
+ t.controlBuf.put(cleanup)
}
- s.state = streamDone
- s.mu.Unlock()
}
func (t *http2Server) RemoteAddr() net.Addr {
@@ -1175,7 +1021,115 @@ func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
}
-var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
+var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
+
+// Handles outgoing GoAway and returns true if loopy needs to put itself
+// in draining mode.
+func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
+ t.mu.Lock()
+ if t.state == closing { // TODO(mmukhi): This seems unnecessary.
+ t.mu.Unlock()
+ // The transport is closing.
+ return false, ErrConnClosing
+ }
+ sid := t.maxStreamID
+ if !g.headsUp {
+ // Stop accepting more streams now.
+ t.state = draining
+ if len(t.activeStreams) == 0 {
+ g.closeConn = true
+ }
+ t.mu.Unlock()
+ if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
+ return false, err
+ }
+ if g.closeConn {
+ // Abruptly close the connection following the GoAway (via
+ // loopywriter). But flush out what's inside the buffer first.
+ t.framer.writer.Flush()
+ return false, fmt.Errorf("transport: Connection closing")
+ }
+ return true, nil
+ }
+ t.mu.Unlock()
+ // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
+ // Follow that with a ping and wait for the ack to come back or a timer
+ // to expire. During this time accept new streams since they might have
+ // originated before the GoAway reaches the client.
+ // After getting the ack or timer expiration send out another GoAway this
+ // time with an ID of the max stream server intends to process.
+ if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
+ return false, err
+ }
+ if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
+ return false, err
+ }
+ go func() {
+ timer := time.NewTimer(time.Minute)
+ defer timer.Stop()
+ select {
+ case <-t.drainChan:
+ case <-timer.C:
+ case <-t.ctx.Done():
+ return
+ }
+ t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
+ }()
+ return false, nil
+}
+
+func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
+ t.czmu.RLock()
+ s := channelz.SocketInternalMetric{
+ StreamsStarted: t.streamsStarted,
+ StreamsSucceeded: t.streamsSucceeded,
+ StreamsFailed: t.streamsFailed,
+ MessagesSent: t.msgSent,
+ MessagesReceived: t.msgRecv,
+ KeepAlivesSent: t.kpCount,
+ LastRemoteStreamCreatedTimestamp: t.lastStreamCreated,
+ LastMessageSentTimestamp: t.lastMsgSent,
+ LastMessageReceivedTimestamp: t.lastMsgRecv,
+ LocalFlowControlWindow: int64(t.fc.getSize()),
+ //socket options
+ LocalAddr: t.localAddr,
+ RemoteAddr: t.remoteAddr,
+ // Security
+ // RemoteName :
+ }
+ t.czmu.RUnlock()
+ s.RemoteFlowControlWindow = t.getOutFlowWindow()
+ return &s
+}
+
+func (t *http2Server) IncrMsgSent() {
+ t.czmu.Lock()
+ t.msgSent++
+ t.lastMsgSent = time.Now()
+ t.czmu.Unlock()
+}
+
+func (t *http2Server) IncrMsgRecv() {
+ t.czmu.Lock()
+ t.msgRecv++
+ t.lastMsgRecv = time.Now()
+ t.czmu.Unlock()
+}
+
+func (t *http2Server) getOutFlowWindow() int64 {
+ resp := make(chan uint32)
+ timer := time.NewTimer(time.Second)
+ defer timer.Stop()
+ t.controlBuf.put(&outFlowControlSizeRequest{resp})
+ select {
+ case sz := <-resp:
+ return int64(sz)
+ case <-t.ctxDone:
+ return -1
+ case <-timer.C:
+ return -2
+ }
+}
func getJitter(v time.Duration) time.Duration {
if v == infinity {
@@ -1183,6 +1137,6 @@ func getJitter(v time.Duration) time.Duration {
}
// Generate a jitter between +/- 10% of the value.
r := int64(v / 10)
- j := rgen.Int63n(2*r) - r
+ j := grpcrand.Int63n(2*r) - r
return time.Duration(j)
}