aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/transport
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/transport')
-rw-r--r--vendor/google.golang.org/grpc/transport/bdp_estimator.go9
-rw-r--r--vendor/google.golang.org/grpc/transport/controlbuf.go796
-rw-r--r--vendor/google.golang.org/grpc/transport/flowcontrol.go (renamed from vendor/google.golang.org/grpc/transport/control.go)253
-rw-r--r--vendor/google.golang.org/grpc/transport/go16.go51
-rw-r--r--vendor/google.golang.org/grpc/transport/go17.go52
-rw-r--r--vendor/google.golang.org/grpc/transport/handler_server.go109
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_client.go1140
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_server.go848
-rw-r--r--vendor/google.golang.org/grpc/transport/http_util.go193
-rw-r--r--vendor/google.golang.org/grpc/transport/transport.go438
10 files changed, 2356 insertions, 1533 deletions
diff --git a/vendor/google.golang.org/grpc/transport/bdp_estimator.go b/vendor/google.golang.org/grpc/transport/bdp_estimator.go
index 8dd2ed4..63cd262 100644
--- a/vendor/google.golang.org/grpc/transport/bdp_estimator.go
+++ b/vendor/google.golang.org/grpc/transport/bdp_estimator.go
@@ -41,12 +41,9 @@ const (
gamma = 2
)
-var (
- // Adding arbitrary data to ping so that its ack can be
- // identified.
- // Easter-egg: what does the ping message say?
- bdpPing = &ping{data: [8]byte{2, 4, 16, 16, 9, 14, 7, 7}}
-)
+// Adding arbitrary data to ping so that its ack can be identified.
+// Easter-egg: what does the ping message say?
+var bdpPing = &ping{data: [8]byte{2, 4, 16, 16, 9, 14, 7, 7}}
type bdpEstimator struct {
// sentAt is the time when the ping was sent.
diff --git a/vendor/google.golang.org/grpc/transport/controlbuf.go b/vendor/google.golang.org/grpc/transport/controlbuf.go
new file mode 100644
index 0000000..5c5891a
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/controlbuf.go
@@ -0,0 +1,796 @@
+/*
+ *
+ * Copyright 2014 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package transport
+
+import (
+ "bytes"
+ "fmt"
+ "runtime"
+ "sync"
+
+ "golang.org/x/net/http2"
+ "golang.org/x/net/http2/hpack"
+)
+
+var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
+ e.SetMaxDynamicTableSizeLimit(v)
+}
+
+type itemNode struct {
+ it interface{}
+ next *itemNode
+}
+
+type itemList struct {
+ head *itemNode
+ tail *itemNode
+}
+
+func (il *itemList) enqueue(i interface{}) {
+ n := &itemNode{it: i}
+ if il.tail == nil {
+ il.head, il.tail = n, n
+ return
+ }
+ il.tail.next = n
+ il.tail = n
+}
+
+// peek returns the first item in the list without removing it from the
+// list.
+func (il *itemList) peek() interface{} {
+ return il.head.it
+}
+
+func (il *itemList) dequeue() interface{} {
+ if il.head == nil {
+ return nil
+ }
+ i := il.head.it
+ il.head = il.head.next
+ if il.head == nil {
+ il.tail = nil
+ }
+ return i
+}
+
+func (il *itemList) dequeueAll() *itemNode {
+ h := il.head
+ il.head, il.tail = nil, nil
+ return h
+}
+
+func (il *itemList) isEmpty() bool {
+ return il.head == nil
+}
+
+// The following defines various control items which could flow through
+// the control buffer of transport. They represent different aspects of
+// control tasks, e.g., flow control, settings, streaming resetting, etc.
+
+// registerStream is used to register an incoming stream with loopy writer.
+type registerStream struct {
+ streamID uint32
+ wq *writeQuota
+}
+
+// headerFrame is also used to register stream on the client-side.
+type headerFrame struct {
+ streamID uint32
+ hf []hpack.HeaderField
+ endStream bool // Valid on server side.
+ initStream func(uint32) (bool, error) // Used only on the client side.
+ onWrite func()
+ wq *writeQuota // write quota for the stream created.
+ cleanup *cleanupStream // Valid on the server side.
+ onOrphaned func(error) // Valid on client-side
+}
+
+type cleanupStream struct {
+ streamID uint32
+ idPtr *uint32
+ rst bool
+ rstCode http2.ErrCode
+ onWrite func()
+}
+
+type dataFrame struct {
+ streamID uint32
+ endStream bool
+ h []byte
+ d []byte
+ // onEachWrite is called every time
+ // a part of d is written out.
+ onEachWrite func()
+}
+
+type incomingWindowUpdate struct {
+ streamID uint32
+ increment uint32
+}
+
+type outgoingWindowUpdate struct {
+ streamID uint32
+ increment uint32
+}
+
+type incomingSettings struct {
+ ss []http2.Setting
+}
+
+type outgoingSettings struct {
+ ss []http2.Setting
+}
+
+type settingsAck struct {
+}
+
+type incomingGoAway struct {
+}
+
+type goAway struct {
+ code http2.ErrCode
+ debugData []byte
+ headsUp bool
+ closeConn bool
+}
+
+type ping struct {
+ ack bool
+ data [8]byte
+}
+
+type outFlowControlSizeRequest struct {
+ resp chan uint32
+}
+
+type outStreamState int
+
+const (
+ active outStreamState = iota
+ empty
+ waitingOnStreamQuota
+)
+
+type outStream struct {
+ id uint32
+ state outStreamState
+ itl *itemList
+ bytesOutStanding int
+ wq *writeQuota
+
+ next *outStream
+ prev *outStream
+}
+
+func (s *outStream) deleteSelf() {
+ if s.prev != nil {
+ s.prev.next = s.next
+ }
+ if s.next != nil {
+ s.next.prev = s.prev
+ }
+ s.next, s.prev = nil, nil
+}
+
+type outStreamList struct {
+ // Following are sentinel objects that mark the
+ // beginning and end of the list. They do not
+ // contain any item lists. All valid objects are
+ // inserted in between them.
+ // This is needed so that an outStream object can
+ // deleteSelf() in O(1) time without knowing which
+ // list it belongs to.
+ head *outStream
+ tail *outStream
+}
+
+func newOutStreamList() *outStreamList {
+ head, tail := new(outStream), new(outStream)
+ head.next = tail
+ tail.prev = head
+ return &outStreamList{
+ head: head,
+ tail: tail,
+ }
+}
+
+func (l *outStreamList) enqueue(s *outStream) {
+ e := l.tail.prev
+ e.next = s
+ s.prev = e
+ s.next = l.tail
+ l.tail.prev = s
+}
+
+// remove from the beginning of the list.
+func (l *outStreamList) dequeue() *outStream {
+ b := l.head.next
+ if b == l.tail {
+ return nil
+ }
+ b.deleteSelf()
+ return b
+}
+
+type controlBuffer struct {
+ ch chan struct{}
+ done <-chan struct{}
+ mu sync.Mutex
+ consumerWaiting bool
+ list *itemList
+ err error
+}
+
+func newControlBuffer(done <-chan struct{}) *controlBuffer {
+ return &controlBuffer{
+ ch: make(chan struct{}, 1),
+ list: &itemList{},
+ done: done,
+ }
+}
+
+func (c *controlBuffer) put(it interface{}) error {
+ _, err := c.executeAndPut(nil, it)
+ return err
+}
+
+func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) {
+ var wakeUp bool
+ c.mu.Lock()
+ if c.err != nil {
+ c.mu.Unlock()
+ return false, c.err
+ }
+ if f != nil {
+ if !f(it) { // f wasn't successful
+ c.mu.Unlock()
+ return false, nil
+ }
+ }
+ if c.consumerWaiting {
+ wakeUp = true
+ c.consumerWaiting = false
+ }
+ c.list.enqueue(it)
+ c.mu.Unlock()
+ if wakeUp {
+ select {
+ case c.ch <- struct{}{}:
+ default:
+ }
+ }
+ return true, nil
+}
+
+func (c *controlBuffer) get(block bool) (interface{}, error) {
+ for {
+ c.mu.Lock()
+ if c.err != nil {
+ c.mu.Unlock()
+ return nil, c.err
+ }
+ if !c.list.isEmpty() {
+ h := c.list.dequeue()
+ c.mu.Unlock()
+ return h, nil
+ }
+ if !block {
+ c.mu.Unlock()
+ return nil, nil
+ }
+ c.consumerWaiting = true
+ c.mu.Unlock()
+ select {
+ case <-c.ch:
+ case <-c.done:
+ c.finish()
+ return nil, ErrConnClosing
+ }
+ }
+}
+
+func (c *controlBuffer) finish() {
+ c.mu.Lock()
+ if c.err != nil {
+ c.mu.Unlock()
+ return
+ }
+ c.err = ErrConnClosing
+ // There may be headers for streams in the control buffer.
+ // These streams need to be cleaned out since the transport
+ // is still not aware of these yet.
+ for head := c.list.dequeueAll(); head != nil; head = head.next {
+ hdr, ok := head.it.(*headerFrame)
+ if !ok {
+ continue
+ }
+ if hdr.onOrphaned != nil { // It will be nil on the server-side.
+ hdr.onOrphaned(ErrConnClosing)
+ }
+ }
+ c.mu.Unlock()
+}
+
+type side int
+
+const (
+ clientSide side = iota
+ serverSide
+)
+
+type loopyWriter struct {
+ side side
+ cbuf *controlBuffer
+ sendQuota uint32
+ oiws uint32 // outbound initial window size.
+ estdStreams map[uint32]*outStream // Established streams.
+ activeStreams *outStreamList // Streams that are sending data.
+ framer *framer
+ hBuf *bytes.Buffer // The buffer for HPACK encoding.
+ hEnc *hpack.Encoder // HPACK encoder.
+ bdpEst *bdpEstimator
+ draining bool
+
+ // Side-specific handlers
+ ssGoAwayHandler func(*goAway) (bool, error)
+}
+
+func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
+ var buf bytes.Buffer
+ l := &loopyWriter{
+ side: s,
+ cbuf: cbuf,
+ sendQuota: defaultWindowSize,
+ oiws: defaultWindowSize,
+ estdStreams: make(map[uint32]*outStream),
+ activeStreams: newOutStreamList(),
+ framer: fr,
+ hBuf: &buf,
+ hEnc: hpack.NewEncoder(&buf),
+ bdpEst: bdpEst,
+ }
+ return l
+}
+
+const minBatchSize = 1000
+
+// run should be run in a separate goroutine.
+func (l *loopyWriter) run() (err error) {
+ defer func() {
+ if err == ErrConnClosing {
+ // Don't log ErrConnClosing as error since it happens
+ // 1. When the connection is closed by some other known issue.
+ // 2. User closed the connection.
+ // 3. A graceful close of connection.
+ infof("transport: loopyWriter.run returning. %v", err)
+ err = nil
+ }
+ }()
+ for {
+ it, err := l.cbuf.get(true)
+ if err != nil {
+ return err
+ }
+ if err = l.handle(it); err != nil {
+ return err
+ }
+ if _, err = l.processData(); err != nil {
+ return err
+ }
+ gosched := true
+ hasdata:
+ for {
+ it, err := l.cbuf.get(false)
+ if err != nil {
+ return err
+ }
+ if it != nil {
+ if err = l.handle(it); err != nil {
+ return err
+ }
+ if _, err = l.processData(); err != nil {
+ return err
+ }
+ continue hasdata
+ }
+ isEmpty, err := l.processData()
+ if err != nil {
+ return err
+ }
+ if !isEmpty {
+ continue hasdata
+ }
+ if gosched {
+ gosched = false
+ if l.framer.writer.offset < minBatchSize {
+ runtime.Gosched()
+ continue hasdata
+ }
+ }
+ l.framer.writer.Flush()
+ break hasdata
+
+ }
+ }
+}
+
+func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
+ return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
+}
+
+func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
+ // Otherwise update the quota.
+ if w.streamID == 0 {
+ l.sendQuota += w.increment
+ return nil
+ }
+ // Find the stream and update it.
+ if str, ok := l.estdStreams[w.streamID]; ok {
+ str.bytesOutStanding -= int(w.increment)
+ if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
+ str.state = active
+ l.activeStreams.enqueue(str)
+ return nil
+ }
+ }
+ return nil
+}
+
+func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
+ return l.framer.fr.WriteSettings(s.ss...)
+}
+
+func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
+ if err := l.applySettings(s.ss); err != nil {
+ return err
+ }
+ return l.framer.fr.WriteSettingsAck()
+}
+
+func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
+ str := &outStream{
+ id: h.streamID,
+ state: empty,
+ itl: &itemList{},
+ wq: h.wq,
+ }
+ l.estdStreams[h.streamID] = str
+ return nil
+}
+
+func (l *loopyWriter) headerHandler(h *headerFrame) error {
+ if l.side == serverSide {
+ str, ok := l.estdStreams[h.streamID]
+ if !ok {
+ warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
+ return nil
+ }
+ // Case 1.A: Server is responding back with headers.
+ if !h.endStream {
+ return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
+ }
+ // else: Case 1.B: Server wants to close stream.
+
+ if str.state != empty { // either active or waiting on stream quota.
+ // add it str's list of items.
+ str.itl.enqueue(h)
+ return nil
+ }
+ if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
+ return err
+ }
+ return l.cleanupStreamHandler(h.cleanup)
+ }
+ // Case 2: Client wants to originate stream.
+ str := &outStream{
+ id: h.streamID,
+ state: empty,
+ itl: &itemList{},
+ wq: h.wq,
+ }
+ str.itl.enqueue(h)
+ return l.originateStream(str)
+}
+
+func (l *loopyWriter) originateStream(str *outStream) error {
+ hdr := str.itl.dequeue().(*headerFrame)
+ sendPing, err := hdr.initStream(str.id)
+ if err != nil {
+ if err == ErrConnClosing {
+ return err
+ }
+ // Other errors(errStreamDrain) need not close transport.
+ return nil
+ }
+ if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
+ return err
+ }
+ l.estdStreams[str.id] = str
+ if sendPing {
+ return l.pingHandler(&ping{data: [8]byte{}})
+ }
+ return nil
+}
+
+func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
+ if onWrite != nil {
+ onWrite()
+ }
+ l.hBuf.Reset()
+ for _, f := range hf {
+ if err := l.hEnc.WriteField(f); err != nil {
+ warningf("transport: loopyWriter.writeHeader encountered error while encoding headers:", err)
+ }
+ }
+ var (
+ err error
+ endHeaders, first bool
+ )
+ first = true
+ for !endHeaders {
+ size := l.hBuf.Len()
+ if size > http2MaxFrameLen {
+ size = http2MaxFrameLen
+ } else {
+ endHeaders = true
+ }
+ if first {
+ first = false
+ err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
+ StreamID: streamID,
+ BlockFragment: l.hBuf.Next(size),
+ EndStream: endStream,
+ EndHeaders: endHeaders,
+ })
+ } else {
+ err = l.framer.fr.WriteContinuation(
+ streamID,
+ endHeaders,
+ l.hBuf.Next(size),
+ )
+ }
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (l *loopyWriter) preprocessData(df *dataFrame) error {
+ str, ok := l.estdStreams[df.streamID]
+ if !ok {
+ return nil
+ }
+ // If we got data for a stream it means that
+ // stream was originated and the headers were sent out.
+ str.itl.enqueue(df)
+ if str.state == empty {
+ str.state = active
+ l.activeStreams.enqueue(str)
+ }
+ return nil
+}
+
+func (l *loopyWriter) pingHandler(p *ping) error {
+ if !p.ack {
+ l.bdpEst.timesnap(p.data)
+ }
+ return l.framer.fr.WritePing(p.ack, p.data)
+
+}
+
+func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
+ o.resp <- l.sendQuota
+ return nil
+}
+
+func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
+ c.onWrite()
+ if str, ok := l.estdStreams[c.streamID]; ok {
+ // On the server side it could be a trailers-only response or
+ // a RST_STREAM before stream initialization thus the stream might
+ // not be established yet.
+ delete(l.estdStreams, c.streamID)
+ str.deleteSelf()
+ }
+ if c.rst { // If RST_STREAM needs to be sent.
+ if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
+ return err
+ }
+ }
+ if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
+ return ErrConnClosing
+ }
+ return nil
+}
+
+func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
+ if l.side == clientSide {
+ l.draining = true
+ if len(l.estdStreams) == 0 {
+ return ErrConnClosing
+ }
+ }
+ return nil
+}
+
+func (l *loopyWriter) goAwayHandler(g *goAway) error {
+ // Handling of outgoing GoAway is very specific to side.
+ if l.ssGoAwayHandler != nil {
+ draining, err := l.ssGoAwayHandler(g)
+ if err != nil {
+ return err
+ }
+ l.draining = draining
+ }
+ return nil
+}
+
+func (l *loopyWriter) handle(i interface{}) error {
+ switch i := i.(type) {
+ case *incomingWindowUpdate:
+ return l.incomingWindowUpdateHandler(i)
+ case *outgoingWindowUpdate:
+ return l.outgoingWindowUpdateHandler(i)
+ case *incomingSettings:
+ return l.incomingSettingsHandler(i)
+ case *outgoingSettings:
+ return l.outgoingSettingsHandler(i)
+ case *headerFrame:
+ return l.headerHandler(i)
+ case *registerStream:
+ return l.registerStreamHandler(i)
+ case *cleanupStream:
+ return l.cleanupStreamHandler(i)
+ case *incomingGoAway:
+ return l.incomingGoAwayHandler(i)
+ case *dataFrame:
+ return l.preprocessData(i)
+ case *ping:
+ return l.pingHandler(i)
+ case *goAway:
+ return l.goAwayHandler(i)
+ case *outFlowControlSizeRequest:
+ return l.outFlowControlSizeRequestHandler(i)
+ default:
+ return fmt.Errorf("transport: unknown control message type %T", i)
+ }
+}
+
+func (l *loopyWriter) applySettings(ss []http2.Setting) error {
+ for _, s := range ss {
+ switch s.ID {
+ case http2.SettingInitialWindowSize:
+ o := l.oiws
+ l.oiws = s.Val
+ if o < l.oiws {
+ // If the new limit is greater make all depleted streams active.
+ for _, stream := range l.estdStreams {
+ if stream.state == waitingOnStreamQuota {
+ stream.state = active
+ l.activeStreams.enqueue(stream)
+ }
+ }
+ }
+ case http2.SettingHeaderTableSize:
+ updateHeaderTblSize(l.hEnc, s.Val)
+ }
+ }
+ return nil
+}
+
+func (l *loopyWriter) processData() (bool, error) {
+ if l.sendQuota == 0 {
+ return true, nil
+ }
+ str := l.activeStreams.dequeue()
+ if str == nil {
+ return true, nil
+ }
+ dataItem := str.itl.peek().(*dataFrame)
+ if len(dataItem.h) == 0 && len(dataItem.d) == 0 {
+ // Client sends out empty data frame with endStream = true
+ if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
+ return false, err
+ }
+ str.itl.dequeue()
+ if str.itl.isEmpty() {
+ str.state = empty
+ } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
+ if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
+ return false, err
+ }
+ if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
+ return false, nil
+ }
+ } else {
+ l.activeStreams.enqueue(str)
+ }
+ return false, nil
+ }
+ var (
+ idx int
+ buf []byte
+ )
+ if len(dataItem.h) != 0 { // data header has not been written out yet.
+ buf = dataItem.h
+ } else {
+ idx = 1
+ buf = dataItem.d
+ }
+ size := http2MaxFrameLen
+ if len(buf) < size {
+ size = len(buf)
+ }
+ if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 {
+ str.state = waitingOnStreamQuota
+ return false, nil
+ } else if strQuota < size {
+ size = strQuota
+ }
+
+ if l.sendQuota < uint32(size) {
+ size = int(l.sendQuota)
+ }
+ // Now that outgoing flow controls are checked we can replenish str's write quota
+ str.wq.replenish(size)
+ var endStream bool
+ // This last data message on this stream and all
+ // of it can be written in this go.
+ if dataItem.endStream && size == len(buf) {
+ // buf contains either data or it contains header but data is empty.
+ if idx == 1 || len(dataItem.d) == 0 {
+ endStream = true
+ }
+ }
+ if dataItem.onEachWrite != nil {
+ dataItem.onEachWrite()
+ }
+ if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
+ return false, err
+ }
+ buf = buf[size:]
+ str.bytesOutStanding += size
+ l.sendQuota -= uint32(size)
+ if idx == 0 {
+ dataItem.h = buf
+ } else {
+ dataItem.d = buf
+ }
+
+ if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
+ str.itl.dequeue()
+ }
+ if str.itl.isEmpty() {
+ str.state = empty
+ } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
+ if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
+ return false, err
+ }
+ if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
+ return false, err
+ }
+ } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
+ str.state = waitingOnStreamQuota
+ } else { // Otherwise add it back to the list of active streams.
+ l.activeStreams.enqueue(str)
+ }
+ return false, nil
+}
diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/flowcontrol.go
index dd1a8d4..bbf98b6 100644
--- a/vendor/google.golang.org/grpc/transport/control.go
+++ b/vendor/google.golang.org/grpc/transport/flowcontrol.go
@@ -24,9 +24,6 @@ import (
"sync"
"sync/atomic"
"time"
-
- "golang.org/x/net/http2"
- "golang.org/x/net/http2/hpack"
)
const (
@@ -36,179 +33,115 @@ const (
initialWindowSize = defaultWindowSize // for an RPC
infinity = time.Duration(math.MaxInt64)
defaultClientKeepaliveTime = infinity
- defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
+ defaultClientKeepaliveTimeout = 20 * time.Second
defaultMaxStreamsClient = 100
defaultMaxConnectionIdle = infinity
defaultMaxConnectionAge = infinity
defaultMaxConnectionAgeGrace = infinity
- defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
- defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
- defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
+ defaultServerKeepaliveTime = 2 * time.Hour
+ defaultServerKeepaliveTimeout = 20 * time.Second
+ defaultKeepalivePolicyMinTime = 5 * time.Minute
// max window limit set by HTTP2 Specs.
maxWindowSize = math.MaxInt32
- // defaultLocalSendQuota sets is default value for number of data
+ // defaultWriteQuota is the default value for number of data
// bytes that each stream can schedule before some of it being
// flushed out.
- defaultLocalSendQuota = 64 * 1024
+ defaultWriteQuota = 64 * 1024
)
-// The following defines various control items which could flow through
-// the control buffer of transport. They represent different aspects of
-// control tasks, e.g., flow control, settings, streaming resetting, etc.
-
-type headerFrame struct {
- streamID uint32
- hf []hpack.HeaderField
- endStream bool
-}
-
-func (*headerFrame) item() {}
-
-type continuationFrame struct {
- streamID uint32
- endHeaders bool
- headerBlockFragment []byte
-}
-
-type dataFrame struct {
- streamID uint32
- endStream bool
- d []byte
- f func()
-}
-
-func (*dataFrame) item() {}
-
-func (*continuationFrame) item() {}
-
-type windowUpdate struct {
- streamID uint32
- increment uint32
-}
-
-func (*windowUpdate) item() {}
-
-type settings struct {
- ack bool
- ss []http2.Setting
-}
-
-func (*settings) item() {}
-
-type resetStream struct {
- streamID uint32
- code http2.ErrCode
-}
-
-func (*resetStream) item() {}
-
-type goAway struct {
- code http2.ErrCode
- debugData []byte
- headsUp bool
- closeConn bool
-}
-
-func (*goAway) item() {}
-
-type flushIO struct {
-}
-
-func (*flushIO) item() {}
-
-type ping struct {
- ack bool
- data [8]byte
+// writeQuota is a soft limit on the amount of data a stream can
+// schedule before some of it is written out.
+type writeQuota struct {
+ quota int32
+ // get waits on read from when quota goes less than or equal to zero.
+ // replenish writes on it when quota goes positive again.
+ ch chan struct{}
+ // done is triggered in error case.
+ done <-chan struct{}
+ // replenish is called by loopyWriter to give quota back to.
+ // It is implemented as a field so that it can be updated
+ // by tests.
+ replenish func(n int)
+}
+
+func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
+ w := &writeQuota{
+ quota: sz,
+ ch: make(chan struct{}, 1),
+ done: done,
+ }
+ w.replenish = w.realReplenish
+ return w
}
-func (*ping) item() {}
-
-// quotaPool is a pool which accumulates the quota and sends it to acquire()
-// when it is available.
-type quotaPool struct {
- c chan int
-
- mu sync.Mutex
- version uint32
- quota int
+func (w *writeQuota) get(sz int32) error {
+ for {
+ if atomic.LoadInt32(&w.quota) > 0 {
+ atomic.AddInt32(&w.quota, -sz)
+ return nil
+ }
+ select {
+ case <-w.ch:
+ continue
+ case <-w.done:
+ return errStreamDone
+ }
+ }
}
-// newQuotaPool creates a quotaPool which has quota q available to consume.
-func newQuotaPool(q int) *quotaPool {
- qb := &quotaPool{
- c: make(chan int, 1),
- }
- if q > 0 {
- qb.c <- q
- } else {
- qb.quota = q
+func (w *writeQuota) realReplenish(n int) {
+ sz := int32(n)
+ a := atomic.AddInt32(&w.quota, sz)
+ b := a - sz
+ if b <= 0 && a > 0 {
+ select {
+ case w.ch <- struct{}{}:
+ default:
+ }
}
- return qb
}
-// add cancels the pending quota sent on acquired, incremented by v and sends
-// it back on acquire.
-func (qb *quotaPool) add(v int) {
- qb.mu.Lock()
- defer qb.mu.Unlock()
- qb.lockedAdd(v)
+type trInFlow struct {
+ limit uint32
+ unacked uint32
+ effectiveWindowSize uint32
}
-func (qb *quotaPool) lockedAdd(v int) {
- select {
- case n := <-qb.c:
- qb.quota += n
- default:
- }
- qb.quota += v
- if qb.quota <= 0 {
- return
- }
- // After the pool has been created, this is the only place that sends on
- // the channel. Since mu is held at this point and any quota that was sent
- // on the channel has been retrieved, we know that this code will always
- // place any positive quota value on the channel.
- select {
- case qb.c <- qb.quota:
- qb.quota = 0
- default:
- }
+func (f *trInFlow) newLimit(n uint32) uint32 {
+ d := n - f.limit
+ f.limit = n
+ f.updateEffectiveWindowSize()
+ return d
}
-func (qb *quotaPool) addAndUpdate(v int) {
- qb.mu.Lock()
- defer qb.mu.Unlock()
- qb.lockedAdd(v)
- // Update the version only after having added to the quota
- // so that if acquireWithVesrion sees the new vesrion it is
- // guaranteed to have seen the updated quota.
- // Also, still keep this inside of the lock, so that when
- // compareAndExecute is processing, this function doesn't
- // get executed partially (quota gets updated but the version
- // doesn't).
- atomic.AddUint32(&(qb.version), 1)
+func (f *trInFlow) onData(n uint32) uint32 {
+ f.unacked += n
+ if f.unacked >= f.limit/4 {
+ w := f.unacked
+ f.unacked = 0
+ f.updateEffectiveWindowSize()
+ return w
+ }
+ f.updateEffectiveWindowSize()
+ return 0
}
-func (qb *quotaPool) acquireWithVersion() (<-chan int, uint32) {
- return qb.c, atomic.LoadUint32(&(qb.version))
+func (f *trInFlow) reset() uint32 {
+ w := f.unacked
+ f.unacked = 0
+ f.updateEffectiveWindowSize()
+ return w
}
-func (qb *quotaPool) compareAndExecute(version uint32, success, failure func()) bool {
- qb.mu.Lock()
- defer qb.mu.Unlock()
- if version == atomic.LoadUint32(&(qb.version)) {
- success()
- return true
- }
- failure()
- return false
+func (f *trInFlow) updateEffectiveWindowSize() {
+ atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked)
}
-// acquire returns the channel on which available quota amounts are sent.
-func (qb *quotaPool) acquire() <-chan int {
- return qb.c
+func (f *trInFlow) getSize() uint32 {
+ return atomic.LoadUint32(&f.effectiveWindowSize)
}
+// TODO(mmukhi): Simplify this code.
// inFlow deals with inbound flow control
type inFlow struct {
mu sync.Mutex
@@ -229,9 +162,9 @@ type inFlow struct {
// It assumes that n is always greater than the old limit.
func (f *inFlow) newLimit(n uint32) uint32 {
f.mu.Lock()
- defer f.mu.Unlock()
d := n - f.limit
f.limit = n
+ f.mu.Unlock()
return d
}
@@ -240,7 +173,6 @@ func (f *inFlow) maybeAdjust(n uint32) uint32 {
n = uint32(math.MaxInt32)
}
f.mu.Lock()
- defer f.mu.Unlock()
// estSenderQuota is the receiver's view of the maximum number of bytes the sender
// can send without a window update.
estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
@@ -252,7 +184,7 @@ func (f *inFlow) maybeAdjust(n uint32) uint32 {
// for this message. Therefore we must send an update over the limit since there's an active read
// request from the application.
if estUntransmittedData > estSenderQuota {
- // Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec.
+ // Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec.
if f.limit+n > maxWindowSize {
f.delta = maxWindowSize - f.limit
} else {
@@ -261,19 +193,24 @@ func (f *inFlow) maybeAdjust(n uint32) uint32 {
// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
f.delta = n
}
+ f.mu.Unlock()
return f.delta
}
+ f.mu.Unlock()
return 0
}
// onData is invoked when some data frame is received. It updates pendingData.
func (f *inFlow) onData(n uint32) error {
f.mu.Lock()
- defer f.mu.Unlock()
f.pendingData += n
if f.pendingData+f.pendingUpdate > f.limit+f.delta {
- return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
+ limit := f.limit
+ rcvd := f.pendingData + f.pendingUpdate
+ f.mu.Unlock()
+ return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
}
+ f.mu.Unlock()
return nil
}
@@ -281,8 +218,8 @@ func (f *inFlow) onData(n uint32) error {
// to be sent to the peer.
func (f *inFlow) onRead(n uint32) uint32 {
f.mu.Lock()
- defer f.mu.Unlock()
if f.pendingData == 0 {
+ f.mu.Unlock()
return 0
}
f.pendingData -= n
@@ -297,15 +234,9 @@ func (f *inFlow) onRead(n uint32) uint32 {
if f.pendingUpdate >= f.limit/4 {
wu := f.pendingUpdate
f.pendingUpdate = 0
+ f.mu.Unlock()
return wu
}
+ f.mu.Unlock()
return 0
}
-
-func (f *inFlow) resetPendingUpdate() uint32 {
- f.mu.Lock()
- defer f.mu.Unlock()
- n := f.pendingUpdate
- f.pendingUpdate = 0
- return n
-}
diff --git a/vendor/google.golang.org/grpc/transport/go16.go b/vendor/google.golang.org/grpc/transport/go16.go
new file mode 100644
index 0000000..5babcf9
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/go16.go
@@ -0,0 +1,51 @@
+// +build go1.6,!go1.7
+
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package transport
+
+import (
+ "net"
+ "net/http"
+
+ "google.golang.org/grpc/codes"
+
+ "golang.org/x/net/context"
+)
+
+// dialContext connects to the address on the named network.
+func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
+ return (&net.Dialer{Cancel: ctx.Done()}).Dial(network, address)
+}
+
+// ContextErr converts the error from context package into a StreamError.
+func ContextErr(err error) StreamError {
+ switch err {
+ case context.DeadlineExceeded:
+ return streamErrorf(codes.DeadlineExceeded, "%v", err)
+ case context.Canceled:
+ return streamErrorf(codes.Canceled, "%v", err)
+ }
+ return streamErrorf(codes.Internal, "Unexpected error from context packet: %v", err)
+}
+
+// contextFromRequest returns a background context.
+func contextFromRequest(r *http.Request) context.Context {
+ return context.Background()
+}
diff --git a/vendor/google.golang.org/grpc/transport/go17.go b/vendor/google.golang.org/grpc/transport/go17.go
new file mode 100644
index 0000000..b7fa6bd
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/go17.go
@@ -0,0 +1,52 @@
+// +build go1.7
+
+/*
+ *
+ * Copyright 2016 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package transport
+
+import (
+ "context"
+ "net"
+ "net/http"
+
+ "google.golang.org/grpc/codes"
+
+ netctx "golang.org/x/net/context"
+)
+
+// dialContext connects to the address on the named network.
+func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
+ return (&net.Dialer{}).DialContext(ctx, network, address)
+}
+
+// ContextErr converts the error from context package into a StreamError.
+func ContextErr(err error) StreamError {
+ switch err {
+ case context.DeadlineExceeded, netctx.DeadlineExceeded:
+ return streamErrorf(codes.DeadlineExceeded, "%v", err)
+ case context.Canceled, netctx.Canceled:
+ return streamErrorf(codes.Canceled, "%v", err)
+ }
+ return streamErrorf(codes.Internal, "Unexpected error from context packet: %v", err)
+}
+
+// contextFromRequest returns a context from the HTTP Request.
+func contextFromRequest(r *http.Request) context.Context {
+ return r.Context()
+}
diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/transport/handler_server.go
index f1f6caf..f71b748 100644
--- a/vendor/google.golang.org/grpc/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/transport/handler_server.go
@@ -40,20 +40,24 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
+ "google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)
// NewServerHandlerTransport returns a ServerTransport handling gRPC
// from inside an http.Handler. It requires that the http Server
// supports HTTP/2.
-func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTransport, error) {
+func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) {
if r.ProtoMajor != 2 {
return nil, errors.New("gRPC requires HTTP/2")
}
if r.Method != "POST" {
return nil, errors.New("invalid gRPC request method")
}
- if !validContentType(r.Header.Get("Content-Type")) {
+ contentType := r.Header.Get("Content-Type")
+ // TODO: do we assume contentType is lowercase? we did before
+ contentSubtype, validContentType := contentSubtype(contentType)
+ if !validContentType {
return nil, errors.New("invalid gRPC request content-type")
}
if _, ok := w.(http.Flusher); !ok {
@@ -64,10 +68,13 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTr
}
st := &serverHandlerTransport{
- rw: w,
- req: r,
- closedCh: make(chan struct{}),
- writes: make(chan func()),
+ rw: w,
+ req: r,
+ closedCh: make(chan struct{}),
+ writes: make(chan func()),
+ contentType: contentType,
+ contentSubtype: contentSubtype,
+ stats: stats,
}
if v := r.Header.Get("grpc-timeout"); v != "" {
@@ -79,19 +86,19 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTr
st.timeout = to
}
- var metakv []string
+ metakv := []string{"content-type", contentType}
if r.Host != "" {
metakv = append(metakv, ":authority", r.Host)
}
for k, vv := range r.Header {
k = strings.ToLower(k)
- if isReservedHeader(k) && !isWhitelistedPseudoHeader(k) {
+ if isReservedHeader(k) && !isWhitelistedHeader(k) {
continue
}
for _, v := range vv {
v, err := decodeMetadataHeader(k, v)
if err != nil {
- return nil, streamErrorf(codes.InvalidArgument, "malformed binary metadata: %v", err)
+ return nil, streamErrorf(codes.Internal, "malformed binary metadata: %v", err)
}
metakv = append(metakv, k, v)
}
@@ -123,10 +130,17 @@ type serverHandlerTransport struct {
// when WriteStatus is called.
writes chan func()
- mu sync.Mutex
- // streamDone indicates whether WriteStatus has been called and writes channel
- // has been closed.
- streamDone bool
+ // block concurrent WriteStatus calls
+ // e.g. grpc/(*serverStream).SendMsg/RecvMsg
+ writeStatusMu sync.Mutex
+
+ // we just mirror the request content-type
+ contentType string
+ // we store both contentType and contentSubtype so we don't keep recreating them
+ // TODO make sure this is consistent across handler_server and http2_server
+ contentSubtype string
+
+ stats stats.Handler
}
func (ht *serverHandlerTransport) Close() error {
@@ -177,13 +191,9 @@ func (ht *serverHandlerTransport) do(fn func()) error {
}
func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
- ht.mu.Lock()
- if ht.streamDone {
- ht.mu.Unlock()
- return nil
- }
- ht.streamDone = true
- ht.mu.Unlock()
+ ht.writeStatusMu.Lock()
+ defer ht.writeStatusMu.Unlock()
+
err := ht.do(func() {
ht.writeCommonHeaders(s)
@@ -222,7 +232,14 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
}
}
})
- close(ht.writes)
+
+ if err == nil { // transport has not been closed
+ if ht.stats != nil {
+ ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
+ }
+ ht.Close()
+ close(ht.writes)
+ }
return err
}
@@ -236,7 +253,7 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
h := ht.rw.Header()
h["Date"] = nil // suppress Date to make tests happy; TODO: restore
- h.Set("Content-Type", "application/grpc")
+ h.Set("Content-Type", ht.contentType)
// Predeclare trailers we'll set later in WriteStatus (after the body).
// This is a SHOULD in the HTTP RFC, and the way you add (known)
@@ -264,7 +281,7 @@ func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts
}
func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
- return ht.do(func() {
+ err := ht.do(func() {
ht.writeCommonHeaders(s)
h := ht.rw.Header()
for k, vv := range md {
@@ -280,17 +297,24 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
ht.rw.WriteHeader(200)
ht.rw.(http.Flusher).Flush()
})
+
+ if err == nil {
+ if ht.stats != nil {
+ ht.stats.HandleRPC(s.Context(), &stats.OutHeader{})
+ }
+ }
+ return err
}
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
// With this transport type there will be exactly 1 stream: this HTTP request.
- var ctx context.Context
+ ctx := contextFromRequest(ht.req)
var cancel context.CancelFunc
if ht.timeoutSet {
- ctx, cancel = context.WithTimeout(context.Background(), ht.timeout)
+ ctx, cancel = context.WithTimeout(ctx, ht.timeout)
} else {
- ctx, cancel = context.WithCancel(context.Background())
+ ctx, cancel = context.WithCancel(ctx)
}
// requestOver is closed when either the request's context is done
@@ -314,13 +338,14 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
req := ht.req
s := &Stream{
- id: 0, // irrelevant
- requestRead: func(int) {},
- cancel: cancel,
- buf: newRecvBuffer(),
- st: ht,
- method: req.URL.Path,
- recvCompress: req.Header.Get("grpc-encoding"),
+ id: 0, // irrelevant
+ requestRead: func(int) {},
+ cancel: cancel,
+ buf: newRecvBuffer(),
+ st: ht,
+ method: req.URL.Path,
+ recvCompress: req.Header.Get("grpc-encoding"),
+ contentSubtype: ht.contentSubtype,
}
pr := &peer.Peer{
Addr: ht.RemoteAddr(),
@@ -329,10 +354,18 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
}
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
- ctx = peer.NewContext(ctx, pr)
- s.ctx = newContextWithStream(ctx, s)
+ s.ctx = peer.NewContext(ctx, pr)
+ if ht.stats != nil {
+ s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
+ inHeader := &stats.InHeader{
+ FullMethod: s.method,
+ RemoteAddr: ht.RemoteAddr(),
+ Compression: s.recvCompress,
+ }
+ ht.stats.HandleRPC(s.ctx, inHeader)
+ }
s.trReader = &transportReader{
- reader: &recvBufferReader{ctx: s.ctx, recv: s.buf},
+ reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf},
windowHandler: func(int) {},
}
@@ -387,6 +420,10 @@ func (ht *serverHandlerTransport) runStream() {
}
}
+func (ht *serverHandlerTransport) IncrMsgSent() {}
+
+func (ht *serverHandlerTransport) IncrMsgRecv() {}
+
func (ht *serverHandlerTransport) Drain() {
panic("Drain() is not implemented")
}
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go
index 6ca6cc6..eaf007e 100644
--- a/vendor/google.golang.org/grpc/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/transport/http2_client.go
@@ -19,7 +19,6 @@
package transport
import (
- "bytes"
"io"
"math"
"net"
@@ -31,8 +30,10 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
+
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -44,15 +45,17 @@ import (
type http2Client struct {
ctx context.Context
cancel context.CancelFunc
- target string // server name/addr
+ ctxDone <-chan struct{} // Cache the ctx.Done() chan.
userAgent string
md interface{}
conn net.Conn // underlying communication channel
+ loopy *loopyWriter
remoteAddr net.Addr
localAddr net.Addr
authInfo credentials.AuthInfo // auth info about the connection
- nextID uint32 // the next stream ID to be used
+ readerDone chan struct{} // sync point to enable testing.
+ writerDone chan struct{} // sync point to enable testing.
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
@@ -60,18 +63,10 @@ type http2Client struct {
awakenKeepalive chan struct{}
framer *framer
- hBuf *bytes.Buffer // the buffer for HPACK encoding
- hEnc *hpack.Encoder // HPACK encoder
-
// controlBuf delivers all the control related tasks (e.g., window
// updates, reset streams, and various settings) to the controller.
controlBuf *controlBuffer
- fc *inFlow
- // sendQuotaPool provides flow control to outbound message.
- sendQuotaPool *quotaPool
- // streamsQuota limits the max number of concurrent streams.
- streamsQuota *quotaPool
-
+ fc *trInFlow
// The scheme used: https if TLS is on, http otherwise.
scheme string
@@ -81,50 +76,60 @@ type http2Client struct {
// Boolean to keep track of reading activity on transport.
// 1 is true and 0 is false.
- activity uint32 // Accessed atomically.
- kp keepalive.ClientParameters
+ activity uint32 // Accessed atomically.
+ kp keepalive.ClientParameters
+ keepaliveEnabled bool
statsHandler stats.Handler
initialWindowSize int32
- bdpEst *bdpEstimator
- outQuotaVersion uint32
+ bdpEst *bdpEstimator
+ // onSuccess is a callback that client transport calls upon
+ // receiving server preface to signal that a succefull HTTP2
+ // connection was established.
+ onSuccess func()
- mu sync.Mutex // guard the following variables
- state transportState // the state of underlying connection
+ maxConcurrentStreams uint32
+ streamQuota int64
+ streamsQuotaAvailable chan struct{}
+ waitingStreams uint32
+ nextID uint32
+
+ mu sync.Mutex // guard the following variables
+ state transportState
activeStreams map[uint32]*Stream
- // The max number of concurrent streams
- maxStreams int
- // the per-stream outbound flow control window size set by the peer.
- streamSendQuota uint32
// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
prevGoAwayID uint32
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
+
+ // Fields below are for channelz metric collection.
+ channelzID int64 // channelz unique identification number
+ czmu sync.RWMutex
+ kpCount int64
+ // The number of streams that have started, including already finished ones.
+ streamsStarted int64
+ // The number of streams that have ended successfully by receiving EoS bit set
+ // frame from server.
+ streamsSucceeded int64
+ streamsFailed int64
+ lastStreamCreated time.Time
+ msgSent int64
+ msgRecv int64
+ lastMsgSent time.Time
+ lastMsgRecv time.Time
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
if fn != nil {
return fn(ctx, addr)
}
- return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
+ return dialContext(ctx, "tcp", addr)
}
func isTemporary(err error) bool {
- switch err {
- case io.EOF:
- // Connection closures may be resolved upon retry, and are thus
- // treated as temporary.
- return true
- case context.DeadlineExceeded:
- // In Go 1.7, context.DeadlineExceeded implements Timeout(), and this
- // special case is not needed. Until then, we need to keep this
- // clause.
- return true
- }
-
switch err := err.(type) {
case interface {
Temporary() bool
@@ -137,18 +142,16 @@ func isTemporary(err error) bool {
// temporary.
return err.Timeout()
}
- return false
+ return true
}
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
-func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, timeout time.Duration) (_ ClientTransport, err error) {
+func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ ClientTransport, err error) {
scheme := "http"
ctx, cancel := context.WithCancel(ctx)
- connectCtx, connectCancel := context.WithTimeout(ctx, timeout)
defer func() {
- connectCancel()
if err != nil {
cancel()
}
@@ -173,12 +176,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, t
)
if creds := opts.TransportCredentials; creds != nil {
scheme = "https"
- conn, authInfo, err = creds.ClientHandshake(connectCtx, addr.Addr, conn)
+ conn, authInfo, err = creds.ClientHandshake(connectCtx, addr.Authority, conn)
if err != nil {
- // Credentials handshake errors are typically considered permanent
- // to avoid retrying on e.g. bad certificates.
- temp := isTemporary(err)
- return nil, connectionErrorf(temp, err, "transport: authentication handshake failed: %v", err)
+ return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
}
isSecure = true
}
@@ -196,7 +196,6 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, t
icwz = opts.InitialConnWindowSize
dynamicWindow = false
}
- var buf bytes.Buffer
writeBufSize := defaultWriteBufSize
if opts.WriteBufferSize > 0 {
writeBufSize = opts.WriteBufferSize
@@ -206,37 +205,35 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, t
readBufSize = opts.ReadBufferSize
}
t := &http2Client{
- ctx: ctx,
- cancel: cancel,
- target: addr.Addr,
- userAgent: opts.UserAgent,
- md: addr.Metadata,
- conn: conn,
- remoteAddr: conn.RemoteAddr(),
- localAddr: conn.LocalAddr(),
- authInfo: authInfo,
- // The client initiated stream id is odd starting from 1.
- nextID: 1,
- goAway: make(chan struct{}),
- awakenKeepalive: make(chan struct{}, 1),
- hBuf: &buf,
- hEnc: hpack.NewEncoder(&buf),
- framer: newFramer(conn, writeBufSize, readBufSize),
- controlBuf: newControlBuffer(),
- fc: &inFlow{limit: uint32(icwz)},
- sendQuotaPool: newQuotaPool(defaultWindowSize),
- scheme: scheme,
- state: reachable,
- activeStreams: make(map[uint32]*Stream),
- isSecure: isSecure,
- creds: opts.PerRPCCredentials,
- maxStreams: defaultMaxStreamsClient,
- streamsQuota: newQuotaPool(defaultMaxStreamsClient),
- streamSendQuota: defaultWindowSize,
- kp: kp,
- statsHandler: opts.StatsHandler,
- initialWindowSize: initialWindowSize,
- }
+ ctx: ctx,
+ ctxDone: ctx.Done(), // Cache Done chan.
+ cancel: cancel,
+ userAgent: opts.UserAgent,
+ md: addr.Metadata,
+ conn: conn,
+ remoteAddr: conn.RemoteAddr(),
+ localAddr: conn.LocalAddr(),
+ authInfo: authInfo,
+ readerDone: make(chan struct{}),
+ writerDone: make(chan struct{}),
+ goAway: make(chan struct{}),
+ awakenKeepalive: make(chan struct{}, 1),
+ framer: newFramer(conn, writeBufSize, readBufSize),
+ fc: &trInFlow{limit: uint32(icwz)},
+ scheme: scheme,
+ activeStreams: make(map[uint32]*Stream),
+ isSecure: isSecure,
+ creds: opts.PerRPCCredentials,
+ kp: kp,
+ statsHandler: opts.StatsHandler,
+ initialWindowSize: initialWindowSize,
+ onSuccess: onSuccess,
+ nextID: 1,
+ maxConcurrentStreams: defaultMaxStreamsClient,
+ streamQuota: defaultMaxStreamsClient,
+ streamsQuotaAvailable: make(chan struct{}, 1),
+ }
+ t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
t.initialWindowSize = opts.InitialWindowSize
dynamicWindow = false
@@ -260,6 +257,13 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, t
}
t.statsHandler.HandleConn(t.ctx, connBegin)
}
+ if channelz.IsOn() {
+ t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, "")
+ }
+ if t.kp.Time != infinity {
+ t.keepaliveEnabled = true
+ go t.keepalive()
+ }
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
// dispatches the frame to the corresponding stream entity.
@@ -295,30 +299,32 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, t
}
t.framer.writer.Flush()
go func() {
- loopyWriter(t.ctx, t.controlBuf, t.itemHandler)
- t.Close()
+ t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
+ err := t.loopy.run()
+ if err != nil {
+ errorf("transport: loopyWriter.run returning. Err: %v", err)
+ }
+ // If it's a connection error, let reader goroutine handle it
+ // since there might be data in the buffers.
+ if _, ok := err.(net.Error); !ok {
+ t.conn.Close()
+ }
+ close(t.writerDone)
}()
- if t.kp.Time != infinity {
- go t.keepalive()
- }
return t, nil
}
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
s := &Stream{
- id: t.nextID,
done: make(chan struct{}),
- goAway: make(chan struct{}),
method: callHdr.Method,
sendCompress: callHdr.SendCompress,
buf: newRecvBuffer(),
- fc: &inFlow{limit: uint32(t.initialWindowSize)},
- sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
- localSendQuota: newQuotaPool(defaultLocalSendQuota),
headerChan: make(chan struct{}),
+ contentSubtype: callHdr.ContentSubtype,
}
- t.nextID += 2
+ s.wq = newWriteQuota(defaultWriteQuota, s.done)
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
}
@@ -328,21 +334,18 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
s.ctx = ctx
s.trReader = &transportReader{
reader: &recvBufferReader{
- ctx: s.ctx,
- goAway: s.goAway,
- recv: s.buf,
+ ctx: s.ctx,
+ ctxDone: s.ctx.Done(),
+ recv: s.buf,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
},
}
-
return s
}
-// NewStream creates a stream and registers it into the transport as "active"
-// streams.
-func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
+func (t *http2Client) getPeer() *peer.Peer {
pr := &peer.Peer{
Addr: t.remoteAddr,
}
@@ -350,74 +353,20 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if t.authInfo != nil {
pr.AuthInfo = t.authInfo
}
- ctx = peer.NewContext(ctx, pr)
- var (
- authData = make(map[string]string)
- audience string
- )
- // Create an audience string only if needed.
- if len(t.creds) > 0 || callHdr.Creds != nil {
- // Construct URI required to get auth request metadata.
- // Omit port if it is the default one.
- host := strings.TrimSuffix(callHdr.Host, ":443")
- pos := strings.LastIndex(callHdr.Method, "/")
- if pos == -1 {
- pos = len(callHdr.Method)
- }
- audience = "https://" + host + callHdr.Method[:pos]
- }
- for _, c := range t.creds {
- data, err := c.GetRequestMetadata(ctx, audience)
- if err != nil {
- return nil, streamErrorf(codes.Internal, "transport: %v", err)
- }
- for k, v := range data {
- // Capital header names are illegal in HTTP/2.
- k = strings.ToLower(k)
- authData[k] = v
- }
- }
- callAuthData := map[string]string{}
- // Check if credentials.PerRPCCredentials were provided via call options.
- // Note: if these credentials are provided both via dial options and call
- // options, then both sets of credentials will be applied.
- if callCreds := callHdr.Creds; callCreds != nil {
- if !t.isSecure && callCreds.RequireTransportSecurity() {
- return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
- }
- data, err := callCreds.GetRequestMetadata(ctx, audience)
- if err != nil {
- return nil, streamErrorf(codes.Internal, "transport: %v", err)
- }
- for k, v := range data {
- // Capital header names are illegal in HTTP/2
- k = strings.ToLower(k)
- callAuthData[k] = v
- }
- }
- t.mu.Lock()
- if t.activeStreams == nil {
- t.mu.Unlock()
- return nil, ErrConnClosing
- }
- if t.state == draining {
- t.mu.Unlock()
- return nil, ErrStreamDrain
- }
- if t.state != reachable {
- t.mu.Unlock()
- return nil, ErrConnClosing
- }
- t.mu.Unlock()
- sq, err := wait(ctx, t.ctx, nil, nil, t.streamsQuota.acquire())
+ return pr
+}
+
+func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
+ aud := t.createAudience(callHdr)
+ authData, err := t.getTrAuthData(ctx, aud)
if err != nil {
return nil, err
}
- // Returns the quota balance back.
- if sq > 1 {
- t.streamsQuota.add(sq - 1)
+ callAuthData, err := t.getCallAuthData(ctx, aud, callHdr)
+ if err != nil {
+ return nil, err
}
- // TODO(mmukhi): Benchmark if the perfomance gets better if count the metadata and other header fields
+ // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
// Make the slice of certain predictable size to reduce allocations made by append.
hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
@@ -427,7 +376,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
- headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
+ headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
@@ -452,7 +401,22 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if b := stats.OutgoingTrace(ctx); b != nil {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
}
- if md, ok := metadata.FromOutgoingContext(ctx); ok {
+
+ if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
+ var k string
+ for _, vv := range added {
+ for i, v := range vv {
+ if i%2 == 0 {
+ k = v
+ continue
+ }
+ // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
+ if isReservedHeader(k) {
+ continue
+ }
+ headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
+ }
+ }
for k, vv := range md {
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
if isReservedHeader(k) {
@@ -473,42 +437,178 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
}
}
- t.mu.Lock()
- if t.state == draining {
- t.mu.Unlock()
- t.streamsQuota.add(1)
- return nil, ErrStreamDrain
+ return headerFields, nil
+}
+
+func (t *http2Client) createAudience(callHdr *CallHdr) string {
+ // Create an audience string only if needed.
+ if len(t.creds) == 0 && callHdr.Creds == nil {
+ return ""
}
- if t.state != reachable {
- t.mu.Unlock()
- return nil, ErrConnClosing
+ // Construct URI required to get auth request metadata.
+ // Omit port if it is the default one.
+ host := strings.TrimSuffix(callHdr.Host, ":443")
+ pos := strings.LastIndex(callHdr.Method, "/")
+ if pos == -1 {
+ pos = len(callHdr.Method)
+ }
+ return "https://" + host + callHdr.Method[:pos]
+}
+
+func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
+ authData := map[string]string{}
+ for _, c := range t.creds {
+ data, err := c.GetRequestMetadata(ctx, audience)
+ if err != nil {
+ if _, ok := status.FromError(err); ok {
+ return nil, err
+ }
+
+ return nil, streamErrorf(codes.Unauthenticated, "transport: %v", err)
+ }
+ for k, v := range data {
+ // Capital header names are illegal in HTTP/2.
+ k = strings.ToLower(k)
+ authData[k] = v
+ }
+ }
+ return authData, nil
+}
+
+func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
+ callAuthData := map[string]string{}
+ // Check if credentials.PerRPCCredentials were provided via call options.
+ // Note: if these credentials are provided both via dial options and call
+ // options, then both sets of credentials will be applied.
+ if callCreds := callHdr.Creds; callCreds != nil {
+ if !t.isSecure && callCreds.RequireTransportSecurity() {
+ return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
+ }
+ data, err := callCreds.GetRequestMetadata(ctx, audience)
+ if err != nil {
+ return nil, streamErrorf(codes.Internal, "transport: %v", err)
+ }
+ for k, v := range data {
+ // Capital header names are illegal in HTTP/2
+ k = strings.ToLower(k)
+ callAuthData[k] = v
+ }
+ }
+ return callAuthData, nil
+}
+
+// NewStream creates a stream and registers it into the transport as "active"
+// streams.
+func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
+ ctx = peer.NewContext(ctx, t.getPeer())
+ headerFields, err := t.createHeaderFields(ctx, callHdr)
+ if err != nil {
+ return nil, err
}
s := t.newStream(ctx, callHdr)
- t.activeStreams[s.id] = s
- // If the number of active streams change from 0 to 1, then check if keepalive
- // has gone dormant. If so, wake it up.
- if len(t.activeStreams) == 1 {
- select {
- case t.awakenKeepalive <- struct{}{}:
- t.controlBuf.put(&ping{data: [8]byte{}})
- // Fill the awakenKeepalive channel again as this channel must be
- // kept non-writable except at the point that the keepalive()
- // goroutine is waiting either to be awaken or shutdown.
- t.awakenKeepalive <- struct{}{}
- default:
+ cleanup := func(err error) {
+ if s.swapState(streamDone) == streamDone {
+ // If it was already done, return.
+ return
+ }
+ // The stream was unprocessed by the server.
+ atomic.StoreUint32(&s.unprocessed, 1)
+ s.write(recvMsg{err: err})
+ close(s.done)
+ // If headerChan isn't closed, then close it.
+ if atomic.SwapUint32(&s.headerDone, 1) == 0 {
+ close(s.headerChan)
}
+
}
- t.controlBuf.put(&headerFrame{
- streamID: s.id,
+ hdr := &headerFrame{
hf: headerFields,
endStream: false,
- })
- t.mu.Unlock()
-
- s.mu.Lock()
- s.bytesSent = true
- s.mu.Unlock()
-
+ initStream: func(id uint32) (bool, error) {
+ t.mu.Lock()
+ if state := t.state; state != reachable {
+ t.mu.Unlock()
+ // Do a quick cleanup.
+ err := error(errStreamDrain)
+ if state == closing {
+ err = ErrConnClosing
+ }
+ cleanup(err)
+ return false, err
+ }
+ t.activeStreams[id] = s
+ if channelz.IsOn() {
+ t.czmu.Lock()
+ t.streamsStarted++
+ t.lastStreamCreated = time.Now()
+ t.czmu.Unlock()
+ }
+ var sendPing bool
+ // If the number of active streams change from 0 to 1, then check if keepalive
+ // has gone dormant. If so, wake it up.
+ if len(t.activeStreams) == 1 && t.keepaliveEnabled {
+ select {
+ case t.awakenKeepalive <- struct{}{}:
+ sendPing = true
+ // Fill the awakenKeepalive channel again as this channel must be
+ // kept non-writable except at the point that the keepalive()
+ // goroutine is waiting either to be awaken or shutdown.
+ t.awakenKeepalive <- struct{}{}
+ default:
+ }
+ }
+ t.mu.Unlock()
+ return sendPing, nil
+ },
+ onOrphaned: cleanup,
+ wq: s.wq,
+ }
+ firstTry := true
+ var ch chan struct{}
+ checkForStreamQuota := func(it interface{}) bool {
+ if t.streamQuota <= 0 { // Can go negative if server decreases it.
+ if firstTry {
+ t.waitingStreams++
+ }
+ ch = t.streamsQuotaAvailable
+ return false
+ }
+ if !firstTry {
+ t.waitingStreams--
+ }
+ t.streamQuota--
+ h := it.(*headerFrame)
+ h.streamID = t.nextID
+ t.nextID += 2
+ s.id = h.streamID
+ s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
+ if t.streamQuota > 0 && t.waitingStreams > 0 {
+ select {
+ case t.streamsQuotaAvailable <- struct{}{}:
+ default:
+ }
+ }
+ return true
+ }
+ for {
+ success, err := t.controlBuf.executeAndPut(checkForStreamQuota, hdr)
+ if err != nil {
+ return nil, err
+ }
+ if success {
+ break
+ }
+ firstTry = false
+ select {
+ case <-ch:
+ case <-s.ctx.Done():
+ return nil, ContextErr(s.ctx.Err())
+ case <-t.goAway:
+ return nil, errStreamDrain
+ case <-t.ctx.Done():
+ return nil, ErrConnClosing
+ }
+ }
if t.statsHandler != nil {
outHeader := &stats.OutHeader{
Client: true,
@@ -525,86 +625,97 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
// CloseStream clears the footprint of a stream when the stream is not needed any more.
// This must not be executed in reader's goroutine.
func (t *http2Client) CloseStream(s *Stream, err error) {
- t.mu.Lock()
- if t.activeStreams == nil {
- t.mu.Unlock()
- return
- }
+ var (
+ rst bool
+ rstCode http2.ErrCode
+ )
if err != nil {
- // notify in-flight streams, before the deletion
- s.write(recvMsg{err: err})
+ rst = true
+ rstCode = http2.ErrCodeCancel
}
- delete(t.activeStreams, s.id)
- if t.state == draining && len(t.activeStreams) == 0 {
- // The transport is draining and s is the last live stream on t.
- t.mu.Unlock()
- t.Close()
+ t.closeStream(s, err, rst, rstCode, nil, nil, false)
+}
+
+func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
+ // Set stream status to done.
+ if s.swapState(streamDone) == streamDone {
+ // If it was already done, return.
return
}
- t.mu.Unlock()
- // rstStream is true in case the stream is being closed at the client-side
- // and the server needs to be intimated about it by sending a RST_STREAM
- // frame.
- // To make sure this frame is written to the wire before the headers of the
- // next stream waiting for streamsQuota, we add to streamsQuota pool only
- // after having acquired the writableChan to send RST_STREAM out (look at
- // the controller() routine).
- var rstStream bool
- var rstError http2.ErrCode
- defer func() {
- // In case, the client doesn't have to send RST_STREAM to server
- // we can safely add back to streamsQuota pool now.
- if !rstStream {
- t.streamsQuota.add(1)
- return
- }
- t.controlBuf.put(&resetStream{s.id, rstError})
- }()
- s.mu.Lock()
- rstStream = s.rstStream
- rstError = s.rstError
- if s.state == streamDone {
- s.mu.Unlock()
- return
+ // status and trailers can be updated here without any synchronization because the stream goroutine will
+ // only read it after it sees an io.EOF error from read or write and we'll write those errors
+ // only after updating this.
+ s.status = st
+ if len(mdata) > 0 {
+ s.trailer = mdata
}
- if !s.headerDone {
+ if err != nil {
+ // This will unblock reads eventually.
+ s.write(recvMsg{err: err})
+ }
+ // This will unblock write.
+ close(s.done)
+ // If headerChan isn't closed, then close it.
+ if atomic.SwapUint32(&s.headerDone, 1) == 0 {
close(s.headerChan)
- s.headerDone = true
}
- s.state = streamDone
- s.mu.Unlock()
- if _, ok := err.(StreamError); ok {
- rstStream = true
- rstError = http2.ErrCodeCancel
+ cleanup := &cleanupStream{
+ streamID: s.id,
+ onWrite: func() {
+ t.mu.Lock()
+ if t.activeStreams != nil {
+ delete(t.activeStreams, s.id)
+ }
+ t.mu.Unlock()
+ if channelz.IsOn() {
+ t.czmu.Lock()
+ if eosReceived {
+ t.streamsSucceeded++
+ } else {
+ t.streamsFailed++
+ }
+ t.czmu.Unlock()
+ }
+ },
+ rst: rst,
+ rstCode: rstCode,
+ }
+ addBackStreamQuota := func(interface{}) bool {
+ t.streamQuota++
+ if t.streamQuota > 0 && t.waitingStreams > 0 {
+ select {
+ case t.streamsQuotaAvailable <- struct{}{}:
+ default:
+ }
+ }
+ return true
}
+ t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
}
// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
-func (t *http2Client) Close() (err error) {
+func (t *http2Client) Close() error {
t.mu.Lock()
+ // Make sure we only Close once.
if t.state == closing {
t.mu.Unlock()
- return
+ return nil
}
t.state = closing
- t.mu.Unlock()
- t.cancel()
- err = t.conn.Close()
- t.mu.Lock()
streams := t.activeStreams
t.activeStreams = nil
t.mu.Unlock()
+ t.controlBuf.finish()
+ t.cancel()
+ err := t.conn.Close()
+ if channelz.IsOn() {
+ channelz.RemoveEntry(t.channelzID)
+ }
// Notify all active streams.
for _, s := range streams {
- s.mu.Lock()
- if !s.headerDone {
- close(s.headerChan)
- s.headerDone = true
- }
- s.mu.Unlock()
- s.write(recvMsg{err: ErrConnClosing})
+ t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, nil, nil, false)
}
if t.statsHandler != nil {
connEnd := &stats.ConnEnd{
@@ -622,8 +733,8 @@ func (t *http2Client) Close() (err error) {
// closing.
func (t *http2Client) GracefulClose() error {
t.mu.Lock()
- switch t.state {
- case closing, draining:
+ // Make sure we move to draining only from active.
+ if t.state == draining || t.state == closing {
t.mu.Unlock()
return nil
}
@@ -633,108 +744,41 @@ func (t *http2Client) GracefulClose() error {
if active == 0 {
return t.Close()
}
+ t.controlBuf.put(&incomingGoAway{})
return nil
}
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
- select {
- case <-s.ctx.Done():
- return ContextErr(s.ctx.Err())
- case <-t.ctx.Done():
- return ErrConnClosing
- default:
- }
-
- if hdr == nil && data == nil && opts.Last {
- // stream.CloseSend uses this to send an empty frame with endStream=True
- t.controlBuf.put(&dataFrame{streamID: s.id, endStream: true, f: func() {}})
- return nil
- }
- // Add data to header frame so that we can equally distribute data across frames.
- emptyLen := http2MaxFrameLen - len(hdr)
- if emptyLen > len(data) {
- emptyLen = len(data)
- }
- hdr = append(hdr, data[:emptyLen]...)
- data = data[emptyLen:]
- for idx, r := range [][]byte{hdr, data} {
- for len(r) > 0 {
- size := http2MaxFrameLen
- // Wait until the stream has some quota to send the data.
- quotaChan, quotaVer := s.sendQuotaPool.acquireWithVersion()
- sq, err := wait(s.ctx, t.ctx, s.done, s.goAway, quotaChan)
- if err != nil {
- return err
- }
- // Wait until the transport has some quota to send the data.
- tq, err := wait(s.ctx, t.ctx, s.done, s.goAway, t.sendQuotaPool.acquire())
- if err != nil {
- return err
- }
- if sq < size {
- size = sq
- }
- if tq < size {
- size = tq
- }
- if size > len(r) {
- size = len(r)
- }
- p := r[:size]
- ps := len(p)
- if ps < tq {
- // Overbooked transport quota. Return it back.
- t.sendQuotaPool.add(tq - ps)
- }
- // Acquire local send quota to be able to write to the controlBuf.
- ltq, err := wait(s.ctx, t.ctx, s.done, s.goAway, s.localSendQuota.acquire())
- if err != nil {
- if _, ok := err.(ConnectionError); !ok {
- t.sendQuotaPool.add(ps)
- }
- return err
- }
- s.localSendQuota.add(ltq - ps) // It's ok if we make it negative.
- var endStream bool
- // See if this is the last frame to be written.
- if opts.Last {
- if len(r)-size == 0 { // No more data in r after this iteration.
- if idx == 0 { // We're writing data header.
- if len(data) == 0 { // There's no data to follow.
- endStream = true
- }
- } else { // We're writing data.
- endStream = true
- }
- }
- }
- success := func() {
- t.controlBuf.put(&dataFrame{streamID: s.id, endStream: endStream, d: p, f: func() { s.localSendQuota.add(ps) }})
- if ps < sq {
- s.sendQuotaPool.lockedAdd(sq - ps)
- }
- r = r[ps:]
- }
- failure := func() {
- s.sendQuotaPool.lockedAdd(sq)
- }
- if !s.sendQuotaPool.compareAndExecute(quotaVer, success, failure) {
- t.sendQuotaPool.add(ps)
- s.localSendQuota.add(ps)
- }
+ if opts.Last {
+ // If it's the last message, update stream state.
+ if !s.compareAndSwapState(streamActive, streamWriteDone) {
+ return errStreamDone
}
+ } else if s.getState() != streamActive {
+ return errStreamDone
}
- if !opts.Last {
- return nil
- }
- s.mu.Lock()
- if s.state != streamDone {
- s.state = streamWriteDone
+ df := &dataFrame{
+ streamID: s.id,
+ endStream: opts.Last,
+ }
+ if hdr != nil || data != nil { // If it's not an empty data frame.
+ // Add some data to grpc message header so that we can equally
+ // distribute bytes across frames.
+ emptyLen := http2MaxFrameLen - len(hdr)
+ if emptyLen > len(data) {
+ emptyLen = len(data)
+ }
+ hdr = append(hdr, data[:emptyLen]...)
+ data = data[emptyLen:]
+ df.h, df.d = hdr, data
+ // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
+ if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
+ return err
+ }
}
- s.mu.Unlock()
- return nil
+ return t.controlBuf.put(df)
}
func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
@@ -748,34 +792,17 @@ func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
// of stream if the application is requesting data larger in size than
// the window.
func (t *http2Client) adjustWindow(s *Stream, n uint32) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.state == streamDone {
- return
- }
if w := s.fc.maybeAdjust(n); w > 0 {
- // Piggyback connection's window update along.
- if cw := t.fc.resetPendingUpdate(); cw > 0 {
- t.controlBuf.put(&windowUpdate{0, cw})
- }
- t.controlBuf.put(&windowUpdate{s.id, w})
+ t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
}
}
-// updateWindow adjusts the inbound quota for the stream and the transport.
-// Window updates will deliver to the controller for sending when
-// the cumulative quota exceeds the corresponding threshold.
+// updateWindow adjusts the inbound quota for the stream.
+// Window updates will be sent out when the cumulative quota
+// exceeds the corresponding threshold.
func (t *http2Client) updateWindow(s *Stream, n uint32) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.state == streamDone {
- return
- }
if w := s.fc.onRead(n); w > 0 {
- if cw := t.fc.resetPendingUpdate(); cw > 0 {
- t.controlBuf.put(&windowUpdate{0, cw})
- }
- t.controlBuf.put(&windowUpdate{s.id, w})
+ t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
}
}
@@ -787,15 +814,17 @@ func (t *http2Client) updateFlowControl(n uint32) {
for _, s := range t.activeStreams {
s.fc.newLimit(n)
}
- t.initialWindowSize = int32(n)
t.mu.Unlock()
- t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n)})
- t.controlBuf.put(&settings{
- ack: false,
+ updateIWS := func(interface{}) bool {
+ t.initialWindowSize = int32(n)
+ return true
+ }
+ t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
+ t.controlBuf.put(&outgoingSettings{
ss: []http2.Setting{
{
ID: http2.SettingInitialWindowSize,
- Val: uint32(n),
+ Val: n,
},
},
})
@@ -805,7 +834,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
size := f.Header().Length
var sendBDPPing bool
if t.bdpEst != nil {
- sendBDPPing = t.bdpEst.add(uint32(size))
+ sendBDPPing = t.bdpEst.add(size)
}
// Decouple connection's flow control from application's read.
// An update on connection's flow control should not depend on
@@ -816,21 +845,24 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
// active(fast) streams from starving in presence of slow or
// inactive streams.
//
- // Furthermore, if a bdpPing is being sent out we can piggyback
- // connection's window update for the bytes we just received.
+ if w := t.fc.onData(size); w > 0 {
+ t.controlBuf.put(&outgoingWindowUpdate{
+ streamID: 0,
+ increment: w,
+ })
+ }
if sendBDPPing {
- if size != 0 { // Could've been an empty data frame.
- t.controlBuf.put(&windowUpdate{0, uint32(size)})
+ // Avoid excessive ping detection (e.g. in an L7 proxy)
+ // by sending a window update prior to the BDP ping.
+
+ if w := t.fc.reset(); w > 0 {
+ t.controlBuf.put(&outgoingWindowUpdate{
+ streamID: 0,
+ increment: w,
+ })
}
+
t.controlBuf.put(bdpPing)
- } else {
- if err := t.fc.onData(uint32(size)); err != nil {
- t.Close()
- return
- }
- if w := t.fc.onRead(uint32(size)); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
}
// Select the right stream to dispatch.
s, ok := t.getStream(f)
@@ -838,25 +870,15 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
return
}
if size > 0 {
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- return
- }
- if err := s.fc.onData(uint32(size)); err != nil {
- s.rstStream = true
- s.rstError = http2.ErrCodeFlowControl
- s.finish(status.New(codes.Internal, err.Error()))
- s.mu.Unlock()
- s.write(recvMsg{err: io.EOF})
+ if err := s.fc.onData(size); err != nil {
+ t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
- if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
- t.controlBuf.put(&windowUpdate{s.id, w})
+ if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
+ t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
}
}
- s.mu.Unlock()
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
@@ -869,14 +891,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
// The server has closed the stream without sending trailers. Record that
// the read direction is closed, and set the status appropriately.
if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- return
- }
- s.finish(status.New(codes.Internal, "server closed the stream without sending trailers"))
- s.mu.Unlock()
- s.write(recvMsg{err: io.EOF})
+ t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
}
}
@@ -885,36 +900,55 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
if !ok {
return
}
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- return
- }
- if !s.headerDone {
- close(s.headerChan)
- s.headerDone = true
+ if f.ErrCode == http2.ErrCodeRefusedStream {
+ // The stream was unprocessed by the server.
+ atomic.StoreUint32(&s.unprocessed, 1)
}
- statusCode, ok := http2ErrConvTab[http2.ErrCode(f.ErrCode)]
+ statusCode, ok := http2ErrConvTab[f.ErrCode]
if !ok {
warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
statusCode = codes.Unknown
}
- s.finish(status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode))
- s.mu.Unlock()
- s.write(recvMsg{err: io.EOF})
+ t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
}
-func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
+func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
if f.IsAck() {
return
}
+ var maxStreams *uint32
var ss []http2.Setting
f.ForeachSetting(func(s http2.Setting) error {
+ if s.ID == http2.SettingMaxConcurrentStreams {
+ maxStreams = new(uint32)
+ *maxStreams = s.Val
+ return nil
+ }
ss = append(ss, s)
return nil
})
- // The settings will be applied once the ack is sent.
- t.controlBuf.put(&settings{ack: true, ss: ss})
+ if isFirst && maxStreams == nil {
+ maxStreams = new(uint32)
+ *maxStreams = math.MaxUint32
+ }
+ sf := &incomingSettings{
+ ss: ss,
+ }
+ if maxStreams == nil {
+ t.controlBuf.put(sf)
+ return
+ }
+ updateStreamQuota := func(interface{}) bool {
+ delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
+ t.maxConcurrentStreams = *maxStreams
+ t.streamQuota += delta
+ if delta > 0 && t.waitingStreams > 0 {
+ close(t.streamsQuotaAvailable) // wake all of them up.
+ t.streamsQuotaAvailable = make(chan struct{}, 1)
+ }
+ return true
+ }
+ t.controlBuf.executeAndPut(updateStreamQuota, sf)
}
func (t *http2Client) handlePing(f *http2.PingFrame) {
@@ -932,7 +966,7 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
t.mu.Lock()
- if t.state != reachable && t.state != draining {
+ if t.state == closing {
t.mu.Unlock()
return
}
@@ -945,12 +979,16 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
t.Close()
return
}
- // A client can receive multiple GoAways from server (look at https://github.com/grpc/grpc-go/issues/1387).
- // The idea is that the first GoAway will be sent with an ID of MaxInt32 and the second GoAway will be sent after an RTT delay
- // with the ID of the last stream the server will process.
- // Therefore, when we get the first GoAway we don't really close any streams. While in case of second GoAway we
- // close all streams created after the second GoAwayId. This way streams that were in-flight while the GoAway from server
- // was being sent don't get killed.
+ // A client can receive multiple GoAways from the server (see
+ // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
+ // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
+ // sent after an RTT delay with the ID of the last stream the server will
+ // process.
+ //
+ // Therefore, when we get the first GoAway we don't necessarily close any
+ // streams. While in case of second GoAway we close all streams created after
+ // the GoAwayId. This way streams that were in-flight while the GoAway from
+ // server was being sent don't get killed.
select {
case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
@@ -963,6 +1001,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
t.setGoAwayReason(f)
close(t.goAway)
t.state = draining
+ t.controlBuf.put(&incomingGoAway{})
}
// All streams with IDs greater than the GoAwayId
// and smaller than the previous GoAway ID should be killed.
@@ -972,7 +1011,9 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
}
for streamID, stream := range t.activeStreams {
if streamID > id && streamID <= upperLimit {
- close(stream.goAway)
+ // The stream was unprocessed by the server.
+ atomic.StoreUint32(&stream.unprocessed, 1)
+ t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
}
}
t.prevGoAwayID = id
@@ -988,11 +1029,11 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// It expects a lock on transport's mutext to be held by
// the caller.
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
- t.goAwayReason = NoReason
+ t.goAwayReason = GoAwayNoReason
switch f.ErrCode {
case http2.ErrCodeEnhanceYourCalm:
if string(f.DebugData()) == "too_many_pings" {
- t.goAwayReason = TooManyPings
+ t.goAwayReason = GoAwayTooManyPings
}
}
}
@@ -1004,15 +1045,10 @@ func (t *http2Client) GetGoAwayReason() GoAwayReason {
}
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
- id := f.Header().StreamID
- incr := f.Increment
- if id == 0 {
- t.sendQuotaPool.add(int(incr))
- return
- }
- if s, ok := t.getStream(f); ok {
- s.sendQuotaPool.add(int(incr))
- }
+ t.controlBuf.put(&incomingWindowUpdate{
+ streamID: f.Header().StreamID,
+ increment: f.Increment,
+ })
}
// operateHeaders takes action on the decoded headers.
@@ -1021,18 +1057,10 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
if !ok {
return
}
- s.mu.Lock()
- s.bytesReceived = true
- s.mu.Unlock()
+ atomic.StoreUint32(&s.bytesReceived, 1)
var state decodeState
if err := state.decodeResponseHeader(frame); err != nil {
- s.mu.Lock()
- if !s.headerDone {
- close(s.headerChan)
- s.headerDone = true
- }
- s.mu.Unlock()
- s.write(recvMsg{err: err})
+ t.closeStream(s, err, true, http2.ErrCodeProtocol, nil, nil, false)
// Something wrong. Stops reading even when there is remaining.
return
}
@@ -1056,40 +1084,25 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
}
}()
-
- s.mu.Lock()
- if !endStream {
- s.recvCompress = state.encoding
- }
- if !s.headerDone {
- if !endStream && len(state.mdata) > 0 {
- s.header = state.mdata
+ // If headers haven't been received yet.
+ if atomic.SwapUint32(&s.headerDone, 1) == 0 {
+ if !endStream {
+ // Headers frame is not actually a trailers-only frame.
+ isHeader = true
+ // These values can be set without any synchronization because
+ // stream goroutine will read it only after seeing a closed
+ // headerChan which we'll close after setting this.
+ s.recvCompress = state.encoding
+ if len(state.mdata) > 0 {
+ s.header = state.mdata
+ }
}
close(s.headerChan)
- s.headerDone = true
- isHeader = true
}
- if !endStream || s.state == streamDone {
- s.mu.Unlock()
+ if !endStream {
return
}
-
- if len(state.mdata) > 0 {
- s.trailer = state.mdata
- }
- s.finish(state.status())
- s.mu.Unlock()
- s.write(recvMsg{err: io.EOF})
-}
-
-func handleMalformedHTTP2(s *Stream, err error) {
- s.mu.Lock()
- if !s.headerDone {
- close(s.headerChan)
- s.headerDone = true
- }
- s.mu.Unlock()
- s.write(recvMsg{err: err})
+ t.closeStream(s, io.EOF, false, http2.ErrCodeNo, state.status(), state.mdata, true)
}
// reader runs as a separate goroutine in charge of reading data from network
@@ -1099,24 +1112,30 @@ func handleMalformedHTTP2(s *Stream, err error) {
// optimal.
// TODO(zhaoq): Check the validity of the incoming frame sequence.
func (t *http2Client) reader() {
+ defer close(t.readerDone)
// Check the validity of server preface.
frame, err := t.framer.fr.ReadFrame()
if err != nil {
t.Close()
return
}
- atomic.CompareAndSwapUint32(&t.activity, 0, 1)
+ if t.keepaliveEnabled {
+ atomic.CompareAndSwapUint32(&t.activity, 0, 1)
+ }
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
t.Close()
return
}
- t.handleSettings(sf)
+ t.onSuccess()
+ t.handleSettings(sf, true)
// loop to keep reading incoming messages on this transport.
for {
frame, err := t.framer.fr.ReadFrame()
- atomic.CompareAndSwapUint32(&t.activity, 0, 1)
+ if t.keepaliveEnabled {
+ atomic.CompareAndSwapUint32(&t.activity, 0, 1)
+ }
if err != nil {
// Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response
@@ -1127,7 +1146,7 @@ func (t *http2Client) reader() {
t.mu.Unlock()
if s != nil {
// use error detail to provide better err message
- handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.fr.ErrorDetail()))
+ t.closeStream(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.fr.ErrorDetail()), true, http2.ErrCodeProtocol, nil, nil, false)
}
continue
} else {
@@ -1144,7 +1163,7 @@ func (t *http2Client) reader() {
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
case *http2.SettingsFrame:
- t.handleSettings(frame)
+ t.handleSettings(frame, false)
case *http2.PingFrame:
t.handlePing(frame)
case *http2.GoAwayFrame:
@@ -1157,107 +1176,6 @@ func (t *http2Client) reader() {
}
}
-func (t *http2Client) applySettings(ss []http2.Setting) {
- for _, s := range ss {
- switch s.ID {
- case http2.SettingMaxConcurrentStreams:
- // TODO(zhaoq): This is a hack to avoid significant refactoring of the
- // code to deal with the unrealistic int32 overflow. Probably will try
- // to find a better way to handle this later.
- if s.Val > math.MaxInt32 {
- s.Val = math.MaxInt32
- }
- t.mu.Lock()
- ms := t.maxStreams
- t.maxStreams = int(s.Val)
- t.mu.Unlock()
- t.streamsQuota.add(int(s.Val) - ms)
- case http2.SettingInitialWindowSize:
- t.mu.Lock()
- for _, stream := range t.activeStreams {
- // Adjust the sending quota for each stream.
- stream.sendQuotaPool.addAndUpdate(int(s.Val) - int(t.streamSendQuota))
- }
- t.streamSendQuota = s.Val
- t.mu.Unlock()
- }
- }
-}
-
-// TODO(mmukhi): A lot of this code(and code in other places in the tranpsort layer)
-// is duplicated between the client and the server.
-// The transport layer needs to be refactored to take care of this.
-func (t *http2Client) itemHandler(i item) error {
- var err error
- switch i := i.(type) {
- case *dataFrame:
- err = t.framer.fr.WriteData(i.streamID, i.endStream, i.d)
- if err == nil {
- i.f()
- }
- case *headerFrame:
- t.hBuf.Reset()
- for _, f := range i.hf {
- t.hEnc.WriteField(f)
- }
- endHeaders := false
- first := true
- for !endHeaders {
- size := t.hBuf.Len()
- if size > http2MaxFrameLen {
- size = http2MaxFrameLen
- } else {
- endHeaders = true
- }
- if first {
- first = false
- err = t.framer.fr.WriteHeaders(http2.HeadersFrameParam{
- StreamID: i.streamID,
- BlockFragment: t.hBuf.Next(size),
- EndStream: i.endStream,
- EndHeaders: endHeaders,
- })
- } else {
- err = t.framer.fr.WriteContinuation(
- i.streamID,
- endHeaders,
- t.hBuf.Next(size),
- )
- }
- if err != nil {
- return err
- }
- }
- case *windowUpdate:
- err = t.framer.fr.WriteWindowUpdate(i.streamID, i.increment)
- case *settings:
- if i.ack {
- t.applySettings(i.ss)
- err = t.framer.fr.WriteSettingsAck()
- } else {
- err = t.framer.fr.WriteSettings(i.ss...)
- }
- case *resetStream:
- // If the server needs to be to intimated about stream closing,
- // then we need to make sure the RST_STREAM frame is written to
- // the wire before the headers of the next stream waiting on
- // streamQuota. We ensure this by adding to the streamsQuota pool
- // only after having acquired the writableChan to send RST_STREAM.
- err = t.framer.fr.WriteRSTStream(i.streamID, i.code)
- t.streamsQuota.add(1)
- case *flushIO:
- err = t.framer.writer.Flush()
- case *ping:
- if !i.ack {
- t.bdpEst.timesnap(i.data)
- }
- err = t.framer.fr.WritePing(i.ack, i.data)
- default:
- errorf("transport: http2Client.controller got unexpected item type %v", i)
- }
- return err
-}
-
// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
p := &ping{data: [8]byte{}}
@@ -1284,6 +1202,11 @@ func (t *http2Client) keepalive() {
}
} else {
t.mu.Unlock()
+ if channelz.IsOn() {
+ t.czmu.Lock()
+ t.kpCount++
+ t.czmu.Unlock()
+ }
// Send ping.
t.controlBuf.put(p)
}
@@ -1320,3 +1243,56 @@ func (t *http2Client) Error() <-chan struct{} {
func (t *http2Client) GoAway() <-chan struct{} {
return t.goAway
}
+
+func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
+ t.czmu.RLock()
+ s := channelz.SocketInternalMetric{
+ StreamsStarted: t.streamsStarted,
+ StreamsSucceeded: t.streamsSucceeded,
+ StreamsFailed: t.streamsFailed,
+ MessagesSent: t.msgSent,
+ MessagesReceived: t.msgRecv,
+ KeepAlivesSent: t.kpCount,
+ LastLocalStreamCreatedTimestamp: t.lastStreamCreated,
+ LastMessageSentTimestamp: t.lastMsgSent,
+ LastMessageReceivedTimestamp: t.lastMsgRecv,
+ LocalFlowControlWindow: int64(t.fc.getSize()),
+ //socket options
+ LocalAddr: t.localAddr,
+ RemoteAddr: t.remoteAddr,
+ // Security
+ // RemoteName :
+ }
+ t.czmu.RUnlock()
+ s.RemoteFlowControlWindow = t.getOutFlowWindow()
+ return &s
+}
+
+func (t *http2Client) IncrMsgSent() {
+ t.czmu.Lock()
+ t.msgSent++
+ t.lastMsgSent = time.Now()
+ t.czmu.Unlock()
+}
+
+func (t *http2Client) IncrMsgRecv() {
+ t.czmu.Lock()
+ t.msgRecv++
+ t.lastMsgRecv = time.Now()
+ t.czmu.Unlock()
+}
+
+func (t *http2Client) getOutFlowWindow() int64 {
+ resp := make(chan uint32, 1)
+ timer := time.NewTimer(time.Second)
+ defer timer.Stop()
+ t.controlBuf.put(&outFlowControlSizeRequest{resp})
+ select {
+ case sz := <-resp:
+ return int64(sz)
+ case <-t.ctxDone:
+ return -1
+ case <-timer.C:
+ return -2
+ }
+}
diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go
index bad29b8..19acedb 100644
--- a/vendor/google.golang.org/grpc/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/transport/http2_server.go
@@ -24,7 +24,6 @@ import (
"fmt"
"io"
"math"
- "math/rand"
"net"
"strconv"
"sync"
@@ -35,8 +34,12 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
+
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -52,25 +55,25 @@ var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHe
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
ctx context.Context
+ ctxDone <-chan struct{} // Cache the context.Done() chan
cancel context.CancelFunc
conn net.Conn
+ loopy *loopyWriter
+ readerDone chan struct{} // sync point to enable testing.
+ writerDone chan struct{} // sync point to enable testing.
remoteAddr net.Addr
localAddr net.Addr
maxStreamID uint32 // max stream ID ever seen
authInfo credentials.AuthInfo // auth info about the connection
inTapHandle tap.ServerInHandle
framer *framer
- hBuf *bytes.Buffer // the buffer for HPACK encoding
- hEnc *hpack.Encoder // HPACK encoder
// The max number of concurrent streams.
maxStreams uint32
// controlBuf delivers all the control related tasks (e.g., window
// updates, reset streams, and various settings) to the controller.
controlBuf *controlBuffer
- fc *inFlow
- // sendQuotaPool provides flow control to outbound message.
- sendQuotaPool *quotaPool
- stats stats.Handler
+ fc *trInFlow
+ stats stats.Handler
// Flag to keep track of reading activity on transport.
// 1 is true and 0 is false.
activity uint32 // Accessed atomically.
@@ -101,13 +104,27 @@ type http2Server struct {
drainChan chan struct{}
state transportState
activeStreams map[uint32]*Stream
- // the per-stream outbound flow control window size set by the peer.
- streamSendQuota uint32
// idle is the time instant when the connection went idle.
// This is either the beginning of the connection or when the number of
// RPCs go down to 0.
// When the connection is busy, this value is set to 0.
idle time.Time
+
+ // Fields below are for channelz metric collection.
+ channelzID int64 // channelz unique identification number
+ czmu sync.RWMutex
+ kpCount int64
+ // The number of streams that have started, including already finished ones.
+ streamsStarted int64
+ // The number of streams that have ended successfully by sending frame with
+ // EoS bit set.
+ streamsSucceeded int64
+ streamsFailed int64
+ lastStreamCreated time.Time
+ msgSent int64
+ msgRecv int64
+ lastMsgSent time.Time
+ lastMsgRecv time.Time
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@@ -152,12 +169,12 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
Val: uint32(iwz)})
}
if err := framer.fr.WriteSettings(isettings...); err != nil {
- return nil, connectionErrorf(true, err, "transport: %v", err)
+ return nil, connectionErrorf(false, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
- return nil, connectionErrorf(true, err, "transport: %v", err)
+ return nil, connectionErrorf(false, err, "transport: %v", err)
}
}
kp := config.KeepaliveParams
@@ -182,32 +199,30 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime
}
- var buf bytes.Buffer
ctx, cancel := context.WithCancel(context.Background())
t := &http2Server{
ctx: ctx,
cancel: cancel,
+ ctxDone: ctx.Done(),
conn: conn,
remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(),
authInfo: config.AuthInfo,
framer: framer,
- hBuf: &buf,
- hEnc: hpack.NewEncoder(&buf),
+ readerDone: make(chan struct{}),
+ writerDone: make(chan struct{}),
maxStreams: maxStreams,
inTapHandle: config.InTapHandle,
- controlBuf: newControlBuffer(),
- fc: &inFlow{limit: uint32(icwz)},
- sendQuotaPool: newQuotaPool(defaultWindowSize),
+ fc: &trInFlow{limit: uint32(icwz)},
state: reachable,
activeStreams: make(map[uint32]*Stream),
- streamSendQuota: defaultWindowSize,
stats: config.StatsHandler,
kp: kp,
idle: time.Now(),
kep: kep,
initialWindowSize: iwz,
}
+ t.controlBuf = newControlBuffer(t.ctxDone)
if dynamicWindow {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
@@ -222,10 +237,48 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
connBegin := &stats.ConnBegin{}
t.stats.HandleConn(t.ctx, connBegin)
}
+ if channelz.IsOn() {
+ t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, "")
+ }
t.framer.writer.Flush()
+
+ defer func() {
+ if err != nil {
+ t.Close()
+ }
+ }()
+
+ // Check the validity of client preface.
+ preface := make([]byte, len(clientPreface))
+ if _, err := io.ReadFull(t.conn, preface); err != nil {
+ return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
+ }
+ if !bytes.Equal(preface, clientPreface) {
+ return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
+ }
+
+ frame, err := t.framer.fr.ReadFrame()
+ if err == io.EOF || err == io.ErrUnexpectedEOF {
+ return nil, err
+ }
+ if err != nil {
+ return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
+ }
+ atomic.StoreUint32(&t.activity, 1)
+ sf, ok := frame.(*http2.SettingsFrame)
+ if !ok {
+ return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
+ }
+ t.handleSettings(sf)
+
go func() {
- loopyWriter(t.ctx, t.controlBuf, t.itemHandler)
- t.Close()
+ t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
+ t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
+ if err := t.loopy.run(); err != nil {
+ errorf("transport: loopyWriter.run returning. Err: %v", err)
+ }
+ t.conn.Close()
+ close(t.writerDone)
}()
go t.keepalive()
return t, nil
@@ -234,12 +287,16 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
streamID := frame.Header().StreamID
-
var state decodeState
for _, hf := range frame.Fields {
if err := state.processHeaderField(hf); err != nil {
if se, ok := err.(StreamError); ok {
- t.controlBuf.put(&resetStream{streamID, statusCodeConvTab[se.Code]})
+ t.controlBuf.put(&cleanupStream{
+ streamID: streamID,
+ rst: true,
+ rstCode: statusCodeConvTab[se.Code],
+ onWrite: func() {},
+ })
}
return
}
@@ -247,14 +304,14 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
buf := newRecvBuffer()
s := &Stream{
- id: streamID,
- st: t,
- buf: buf,
- fc: &inFlow{limit: uint32(t.initialWindowSize)},
- recvCompress: state.encoding,
- method: state.method,
+ id: streamID,
+ st: t,
+ buf: buf,
+ fc: &inFlow{limit: uint32(t.initialWindowSize)},
+ recvCompress: state.encoding,
+ method: state.method,
+ contentSubtype: state.contentSubtype,
}
-
if frame.StreamEnded() {
// s is just created by the caller. No lock needed.
s.state = streamReadDone
@@ -272,10 +329,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
pr.AuthInfo = t.authInfo
}
s.ctx = peer.NewContext(s.ctx, pr)
- // Cache the current stream to the context so that the server application
- // can find out. Required when the server wants to send some metadata
- // back to the client (unary call only).
- s.ctx = newContextWithStream(s.ctx, s)
// Attach the received metadata to the context.
if len(state.mdata) > 0 {
s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
@@ -294,7 +347,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.ctx, err = t.inTapHandle(s.ctx, info)
if err != nil {
warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
- t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
+ t.controlBuf.put(&cleanupStream{
+ streamID: s.id,
+ rst: true,
+ rstCode: http2.ErrCodeRefusedStream,
+ onWrite: func() {},
+ })
return
}
}
@@ -305,7 +363,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
t.mu.Unlock()
- t.controlBuf.put(&resetStream{streamID, http2.ErrCodeRefusedStream})
+ t.controlBuf.put(&cleanupStream{
+ streamID: streamID,
+ rst: true,
+ rstCode: http2.ErrCodeRefusedStream,
+ onWrite: func() {},
+ })
return
}
if streamID%2 != 1 || streamID <= t.maxStreamID {
@@ -315,13 +378,17 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
return true
}
t.maxStreamID = streamID
- s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
- s.localSendQuota = newQuotaPool(defaultLocalSendQuota)
t.activeStreams[streamID] = s
if len(t.activeStreams) == 1 {
t.idle = time.Time{}
}
t.mu.Unlock()
+ if channelz.IsOn() {
+ t.czmu.Lock()
+ t.streamsStarted++
+ t.lastStreamCreated = time.Now()
+ t.czmu.Unlock()
+ }
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
}
@@ -337,15 +404,23 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
t.stats.HandleRPC(s.ctx, inHeader)
}
+ s.ctxDone = s.ctx.Done()
+ s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
reader: &recvBufferReader{
- ctx: s.ctx,
- recv: s.buf,
+ ctx: s.ctx,
+ ctxDone: s.ctxDone,
+ recv: s.buf,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
},
}
+ // Register the stream with loopy.
+ t.controlBuf.put(&registerStream{
+ streamID: s.id,
+ wq: s.wq,
+ })
handle(s)
return
}
@@ -354,53 +429,26 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
- // Check the validity of client preface.
- preface := make([]byte, len(clientPreface))
- if _, err := io.ReadFull(t.conn, preface); err != nil {
- // Only log if it isn't a simple tcp accept check (ie: tcp balancer doing open/close socket)
- if err != io.EOF {
- errorf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
- }
- t.Close()
- return
- }
- if !bytes.Equal(preface, clientPreface) {
- errorf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
- t.Close()
- return
- }
-
- frame, err := t.framer.fr.ReadFrame()
- if err == io.EOF || err == io.ErrUnexpectedEOF {
- t.Close()
- return
- }
- if err != nil {
- errorf("transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
- t.Close()
- return
- }
- atomic.StoreUint32(&t.activity, 1)
- sf, ok := frame.(*http2.SettingsFrame)
- if !ok {
- errorf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
- t.Close()
- return
- }
- t.handleSettings(sf)
-
+ defer close(t.readerDone)
for {
frame, err := t.framer.fr.ReadFrame()
atomic.StoreUint32(&t.activity, 1)
if err != nil {
if se, ok := err.(http2.StreamError); ok {
+ warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
t.mu.Lock()
s := t.activeStreams[se.StreamID]
t.mu.Unlock()
if s != nil {
- t.closeStream(s)
+ t.closeStream(s, true, se.Code, nil, false)
+ } else {
+ t.controlBuf.put(&cleanupStream{
+ streamID: se.StreamID,
+ rst: true,
+ rstCode: se.Code,
+ onWrite: func() {},
+ })
}
- t.controlBuf.put(&resetStream{se.StreamID, se.Code})
continue
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
@@ -454,33 +502,20 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
// of stream if the application is requesting data larger in size than
// the window.
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.state == streamDone {
- return
- }
if w := s.fc.maybeAdjust(n); w > 0 {
- if cw := t.fc.resetPendingUpdate(); cw > 0 {
- t.controlBuf.put(&windowUpdate{0, cw})
- }
- t.controlBuf.put(&windowUpdate{s.id, w})
+ t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
}
+
}
// updateWindow adjusts the inbound quota for the stream and the transport.
// Window updates will deliver to the controller for sending when
// the cumulative quota exceeds the corresponding threshold.
func (t *http2Server) updateWindow(s *Stream, n uint32) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.state == streamDone {
- return
- }
if w := s.fc.onRead(n); w > 0 {
- if cw := t.fc.resetPendingUpdate(); cw > 0 {
- t.controlBuf.put(&windowUpdate{0, cw})
- }
- t.controlBuf.put(&windowUpdate{s.id, w})
+ t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
+ increment: w,
+ })
}
}
@@ -494,13 +529,15 @@ func (t *http2Server) updateFlowControl(n uint32) {
}
t.initialWindowSize = int32(n)
t.mu.Unlock()
- t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n)})
- t.controlBuf.put(&settings{
- ack: false,
+ t.controlBuf.put(&outgoingWindowUpdate{
+ streamID: 0,
+ increment: t.fc.newLimit(n),
+ })
+ t.controlBuf.put(&outgoingSettings{
ss: []http2.Setting{
{
ID: http2.SettingInitialWindowSize,
- Val: uint32(n),
+ Val: n,
},
},
})
@@ -511,7 +548,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
size := f.Header().Length
var sendBDPPing bool
if t.bdpEst != nil {
- sendBDPPing = t.bdpEst.add(uint32(size))
+ sendBDPPing = t.bdpEst.add(size)
}
// Decouple connection's flow control from application's read.
// An update on connection's flow control should not depend on
@@ -521,23 +558,22 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
// Decoupling the connection flow control will prevent other
// active(fast) streams from starving in presence of slow or
// inactive streams.
- //
- // Furthermore, if a bdpPing is being sent out we can piggyback
- // connection's window update for the bytes we just received.
+ if w := t.fc.onData(size); w > 0 {
+ t.controlBuf.put(&outgoingWindowUpdate{
+ streamID: 0,
+ increment: w,
+ })
+ }
if sendBDPPing {
- if size != 0 { // Could be an empty frame.
- t.controlBuf.put(&windowUpdate{0, uint32(size)})
+ // Avoid excessive ping detection (e.g. in an L7 proxy)
+ // by sending a window update prior to the BDP ping.
+ if w := t.fc.reset(); w > 0 {
+ t.controlBuf.put(&outgoingWindowUpdate{
+ streamID: 0,
+ increment: w,
+ })
}
t.controlBuf.put(bdpPing)
- } else {
- if err := t.fc.onData(uint32(size)); err != nil {
- errorf("transport: http2Server %v", err)
- t.Close()
- return
- }
- if w := t.fc.onRead(uint32(size)); w > 0 {
- t.controlBuf.put(&windowUpdate{0, w})
- }
}
// Select the right stream to dispatch.
s, ok := t.getStream(f)
@@ -545,23 +581,15 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
return
}
if size > 0 {
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- return
- }
- if err := s.fc.onData(uint32(size)); err != nil {
- s.mu.Unlock()
- t.closeStream(s)
- t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
+ if err := s.fc.onData(size); err != nil {
+ t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
- if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
- t.controlBuf.put(&windowUpdate{s.id, w})
+ if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
+ t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
}
}
- s.mu.Unlock()
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
@@ -573,11 +601,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
// Received the end of stream from the client.
- s.mu.Lock()
- if s.state != streamDone {
- s.state = streamReadDone
- }
- s.mu.Unlock()
+ s.compareAndSwapState(streamActive, streamReadDone)
s.write(recvMsg{err: io.EOF})
}
}
@@ -587,7 +611,7 @@ func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
if !ok {
return
}
- t.closeStream(s)
+ t.closeStream(s, false, 0, nil, false)
}
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
@@ -599,21 +623,9 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
ss = append(ss, s)
return nil
})
- t.controlBuf.put(&settings{ack: true, ss: ss})
-}
-
-func (t *http2Server) applySettings(ss []http2.Setting) {
- for _, s := range ss {
- if s.ID == http2.SettingInitialWindowSize {
- t.mu.Lock()
- for _, stream := range t.activeStreams {
- stream.sendQuotaPool.addAndUpdate(int(s.Val) - int(t.streamSendQuota))
- }
- t.streamSendQuota = s.Val
- t.mu.Unlock()
- }
-
- }
+ t.controlBuf.put(&incomingSettings{
+ ss: ss,
+ })
}
const (
@@ -666,39 +678,37 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection.
- errorf("transport: Got to too many pings from the client, closing the connection.")
+ errorf("transport: Got too many pings from the client, closing the connection.")
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
}
}
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
- id := f.Header().StreamID
- incr := f.Increment
- if id == 0 {
- t.sendQuotaPool.add(int(incr))
- return
- }
- if s, ok := t.getStream(f); ok {
- s.sendQuotaPool.add(int(incr))
+ t.controlBuf.put(&incomingWindowUpdate{
+ streamID: f.Header().StreamID,
+ increment: f.Increment,
+ })
+}
+
+func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
+ for k, vv := range md {
+ if isReservedHeader(k) {
+ // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
+ continue
+ }
+ for _, v := range vv {
+ headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
+ }
}
+ return headerFields
}
// WriteHeader sends the header metedata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
- select {
- case <-s.ctx.Done():
- return ContextErr(s.ctx.Err())
- case <-t.ctx.Done():
- return ErrConnClosing
- default:
- }
-
- s.mu.Lock()
- if s.headerOk || s.state == streamDone {
- s.mu.Unlock()
+ if s.updateHeaderSent() || s.getState() == streamDone {
return ErrIllegalHeaderWrite
}
- s.headerOk = true
+ s.hdrMu.Lock()
if md.Len() > 0 {
if s.header.Len() > 0 {
s.header = metadata.Join(s.header, md)
@@ -706,37 +716,35 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
s.header = md
}
}
- md = s.header
- s.mu.Unlock()
- // TODO(mmukhi): Benchmark if the perfomance gets better if count the metadata and other header fields
+ t.writeHeaderLocked(s)
+ s.hdrMu.Unlock()
+ return nil
+}
+
+func (t *http2Server) writeHeaderLocked(s *Stream) {
+ // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
- headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
+ headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
if s.sendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
}
- for k, vv := range md {
- if isReservedHeader(k) {
- // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
- continue
- }
- for _, v := range vv {
- headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
- }
- }
+ headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
t.controlBuf.put(&headerFrame{
streamID: s.id,
hf: headerFields,
endStream: false,
+ onWrite: func() {
+ atomic.StoreUint32(&t.resetPingStrikes, 1)
+ },
})
if t.stats != nil {
- outHeader := &stats.OutHeader{
- //WireLength: // TODO(mmukhi): Revisit this later, if needed.
- }
+ // Note: WireLength is not set in outHeader.
+ // TODO(mmukhi): Revisit this later, if needed.
+ outHeader := &stats.OutHeader{}
t.stats.HandleRPC(s.Context(), outHeader)
}
- return nil
}
// WriteStatus sends stream status to the client and terminates the stream.
@@ -744,37 +752,20 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
- select {
- case <-t.ctx.Done():
- return ErrConnClosing
- default:
- }
-
- var headersSent, hasHeader bool
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
+ if s.getState() == streamDone {
return nil
}
- if s.headerOk {
- headersSent = true
- }
- if s.header.Len() > 0 {
- hasHeader = true
- }
- s.mu.Unlock()
-
- if !headersSent && hasHeader {
- t.WriteHeader(s, nil)
- headersSent = true
- }
-
- // TODO(mmukhi): Benchmark if the perfomance gets better if count the metadata and other header fields
+ s.hdrMu.Lock()
+ // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
// first and create a slice of that exact size.
headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
- if !headersSent {
- headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
- headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
+ if !s.updateHeaderSent() { // No headers have been sent.
+ if len(s.header) > 0 { // Send a separate header frame.
+ t.writeHeaderLocked(s)
+ } else { // Send a trailer only response.
+ headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
+ headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
+ }
}
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
@@ -783,126 +774,75 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
- panic(err)
+ grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
+ } else {
+ headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
}
-
- headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
}
// Attach the trailer metadata.
- for k, vv := range s.trailer {
- // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
- if isReservedHeader(k) {
- continue
- }
- for _, v := range vv {
- headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
- }
- }
- t.controlBuf.put(&headerFrame{
+ headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
+ trailingHeader := &headerFrame{
streamID: s.id,
hf: headerFields,
endStream: true,
- })
+ onWrite: func() {
+ atomic.StoreUint32(&t.resetPingStrikes, 1)
+ },
+ }
+ s.hdrMu.Unlock()
+ t.closeStream(s, false, 0, trailingHeader, true)
if t.stats != nil {
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
}
- t.closeStream(s)
return nil
}
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
-func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) (err error) {
- select {
- case <-s.ctx.Done():
- return ContextErr(s.ctx.Err())
- case <-t.ctx.Done():
- return ErrConnClosing
- default:
- }
-
- var writeHeaderFrame bool
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- return streamErrorf(codes.Unknown, "the stream has been done")
- }
- if !s.headerOk {
- writeHeaderFrame = true
- }
- s.mu.Unlock()
- if writeHeaderFrame {
- t.WriteHeader(s, nil)
+func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
+ if !s.isHeaderSent() { // Headers haven't been written yet.
+ if err := t.WriteHeader(s, nil); err != nil {
+ // TODO(mmukhi, dfawley): Make sure this is the right code to return.
+ return streamErrorf(codes.Internal, "transport: %v", err)
+ }
+ } else {
+ // Writing headers checks for this condition.
+ if s.getState() == streamDone {
+ // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
+ s.cancel()
+ select {
+ case <-t.ctx.Done():
+ return ErrConnClosing
+ default:
+ }
+ return ContextErr(s.ctx.Err())
+ }
}
- // Add data to header frame so that we can equally distribute data across frames.
+ // Add some data to header frame so that we can equally distribute bytes across frames.
emptyLen := http2MaxFrameLen - len(hdr)
if emptyLen > len(data) {
emptyLen = len(data)
}
hdr = append(hdr, data[:emptyLen]...)
data = data[emptyLen:]
- for _, r := range [][]byte{hdr, data} {
- for len(r) > 0 {
- size := http2MaxFrameLen
- // Wait until the stream has some quota to send the data.
- quotaChan, quotaVer := s.sendQuotaPool.acquireWithVersion()
- sq, err := wait(s.ctx, t.ctx, nil, nil, quotaChan)
- if err != nil {
- return err
- }
- // Wait until the transport has some quota to send the data.
- tq, err := wait(s.ctx, t.ctx, nil, nil, t.sendQuotaPool.acquire())
- if err != nil {
- return err
- }
- if sq < size {
- size = sq
- }
- if tq < size {
- size = tq
- }
- if size > len(r) {
- size = len(r)
- }
- p := r[:size]
- ps := len(p)
- if ps < tq {
- // Overbooked transport quota. Return it back.
- t.sendQuotaPool.add(tq - ps)
- }
- // Acquire local send quota to be able to write to the controlBuf.
- ltq, err := wait(s.ctx, t.ctx, nil, nil, s.localSendQuota.acquire())
- if err != nil {
- if _, ok := err.(ConnectionError); !ok {
- t.sendQuotaPool.add(ps)
- }
- return err
- }
- s.localSendQuota.add(ltq - ps) // It's ok we make this negative.
- // Reset ping strikes when sending data since this might cause
- // the peer to send ping.
+ df := &dataFrame{
+ streamID: s.id,
+ h: hdr,
+ d: data,
+ onEachWrite: func() {
atomic.StoreUint32(&t.resetPingStrikes, 1)
- success := func() {
- t.controlBuf.put(&dataFrame{streamID: s.id, endStream: false, d: p, f: func() {
- s.localSendQuota.add(ps)
- }})
- if ps < sq {
- // Overbooked stream quota. Return it back.
- s.sendQuotaPool.lockedAdd(sq - ps)
- }
- r = r[ps:]
- }
- failure := func() {
- s.sendQuotaPool.lockedAdd(sq)
- }
- if !s.sendQuotaPool.compareAndExecute(quotaVer, success, failure) {
- t.sendQuotaPool.add(ps)
- s.localSendQuota.add(ps)
- }
+ },
+ }
+ if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
+ select {
+ case <-t.ctx.Done():
+ return ErrConnClosing
+ default:
}
+ return ContextErr(s.ctx.Err())
}
- return nil
+ return t.controlBuf.put(df)
}
// keepalive running in a separate goroutine does the following:
@@ -947,7 +887,7 @@ func (t *http2Server) keepalive() {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection.
t.drain(http2.ErrCodeNo, []byte{})
- // Reseting the timer so that the clean-up doesn't deadlock.
+ // Resetting the timer so that the clean-up doesn't deadlock.
maxIdle.Reset(infinity)
return
}
@@ -959,7 +899,7 @@ func (t *http2Server) keepalive() {
case <-maxAge.C:
// Close the connection after grace period.
t.Close()
- // Reseting the timer so that the clean-up doesn't deadlock.
+ // Resetting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity)
case <-t.ctx.Done():
}
@@ -972,11 +912,16 @@ func (t *http2Server) keepalive() {
}
if pingSent {
t.Close()
- // Reseting the timer so that the clean-up doesn't deadlock.
+ // Resetting the timer so that the clean-up doesn't deadlock.
keepalive.Reset(infinity)
return
}
pingSent = true
+ if channelz.IsOn() {
+ t.czmu.Lock()
+ t.kpCount++
+ t.czmu.Unlock()
+ }
t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout)
case <-t.ctx.Done():
@@ -985,127 +930,6 @@ func (t *http2Server) keepalive() {
}
}
-var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
-
-// TODO(mmukhi): A lot of this code(and code in other places in the tranpsort layer)
-// is duplicated between the client and the server.
-// The transport layer needs to be refactored to take care of this.
-func (t *http2Server) itemHandler(i item) error {
- switch i := i.(type) {
- case *dataFrame:
- if err := t.framer.fr.WriteData(i.streamID, i.endStream, i.d); err != nil {
- return err
- }
- i.f()
- return nil
- case *headerFrame:
- t.hBuf.Reset()
- for _, f := range i.hf {
- t.hEnc.WriteField(f)
- }
- first := true
- endHeaders := false
- for !endHeaders {
- size := t.hBuf.Len()
- if size > http2MaxFrameLen {
- size = http2MaxFrameLen
- } else {
- endHeaders = true
- }
- var err error
- if first {
- first = false
- err = t.framer.fr.WriteHeaders(http2.HeadersFrameParam{
- StreamID: i.streamID,
- BlockFragment: t.hBuf.Next(size),
- EndStream: i.endStream,
- EndHeaders: endHeaders,
- })
- } else {
- err = t.framer.fr.WriteContinuation(
- i.streamID,
- endHeaders,
- t.hBuf.Next(size),
- )
- }
- if err != nil {
- return err
- }
- }
- atomic.StoreUint32(&t.resetPingStrikes, 1)
- return nil
- case *windowUpdate:
- return t.framer.fr.WriteWindowUpdate(i.streamID, i.increment)
- case *settings:
- if i.ack {
- t.applySettings(i.ss)
- return t.framer.fr.WriteSettingsAck()
- }
- return t.framer.fr.WriteSettings(i.ss...)
- case *resetStream:
- return t.framer.fr.WriteRSTStream(i.streamID, i.code)
- case *goAway:
- t.mu.Lock()
- if t.state == closing {
- t.mu.Unlock()
- // The transport is closing.
- return fmt.Errorf("transport: Connection closing")
- }
- sid := t.maxStreamID
- if !i.headsUp {
- // Stop accepting more streams now.
- t.state = draining
- t.mu.Unlock()
- if err := t.framer.fr.WriteGoAway(sid, i.code, i.debugData); err != nil {
- return err
- }
- if i.closeConn {
- // Abruptly close the connection following the GoAway (via
- // loopywriter). But flush out what's inside the buffer first.
- t.framer.writer.Flush()
- return fmt.Errorf("transport: Connection closing")
- }
- return nil
- }
- t.mu.Unlock()
- // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
- // Follow that with a ping and wait for the ack to come back or a timer
- // to expire. During this time accept new streams since they might have
- // originated before the GoAway reaches the client.
- // After getting the ack or timer expiration send out another GoAway this
- // time with an ID of the max stream server intends to process.
- if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
- return err
- }
- if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
- return err
- }
- go func() {
- timer := time.NewTimer(time.Minute)
- defer timer.Stop()
- select {
- case <-t.drainChan:
- case <-timer.C:
- case <-t.ctx.Done():
- return
- }
- t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData})
- }()
- return nil
- case *flushIO:
- return t.framer.writer.Flush()
- case *ping:
- if !i.ack {
- t.bdpEst.timesnap(i.data)
- }
- return t.framer.fr.WritePing(i.ack, i.data)
- default:
- err := status.Errorf(codes.Internal, "transport: http2Server.controller got unexpected item type %t", i)
- errorf("%v", err)
- return err
- }
-}
-
// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
@@ -1119,8 +943,12 @@ func (t *http2Server) Close() error {
streams := t.activeStreams
t.activeStreams = nil
t.mu.Unlock()
+ t.controlBuf.finish()
t.cancel()
err := t.conn.Close()
+ if channelz.IsOn() {
+ channelz.RemoveEntry(t.channelzID)
+ }
// Cancel all active streams.
for _, s := range streams {
s.cancel()
@@ -1134,27 +962,45 @@ func (t *http2Server) Close() error {
// closeStream clears the footprint of a stream when the stream is not needed
// any more.
-func (t *http2Server) closeStream(s *Stream) {
- t.mu.Lock()
- delete(t.activeStreams, s.id)
- if len(t.activeStreams) == 0 {
- t.idle = time.Now()
- }
- if t.state == draining && len(t.activeStreams) == 0 {
- defer t.Close()
+func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
+ if s.swapState(streamDone) == streamDone {
+ // If the stream was already done, return.
+ return
}
- t.mu.Unlock()
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
// called to interrupt the potential blocking on other goroutines.
s.cancel()
- s.mu.Lock()
- if s.state == streamDone {
- s.mu.Unlock()
- return
+ cleanup := &cleanupStream{
+ streamID: s.id,
+ rst: rst,
+ rstCode: rstCode,
+ onWrite: func() {
+ t.mu.Lock()
+ if t.activeStreams != nil {
+ delete(t.activeStreams, s.id)
+ if len(t.activeStreams) == 0 {
+ t.idle = time.Now()
+ }
+ }
+ t.mu.Unlock()
+ if channelz.IsOn() {
+ t.czmu.Lock()
+ if eosReceived {
+ t.streamsSucceeded++
+ } else {
+ t.streamsFailed++
+ }
+ t.czmu.Unlock()
+ }
+ },
+ }
+ if hdr != nil {
+ hdr.cleanup = cleanup
+ t.controlBuf.put(hdr)
+ } else {
+ t.controlBuf.put(cleanup)
}
- s.state = streamDone
- s.mu.Unlock()
}
func (t *http2Server) RemoteAddr() net.Addr {
@@ -1175,7 +1021,115 @@ func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
}
-var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
+var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
+
+// Handles outgoing GoAway and returns true if loopy needs to put itself
+// in draining mode.
+func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
+ t.mu.Lock()
+ if t.state == closing { // TODO(mmukhi): This seems unnecessary.
+ t.mu.Unlock()
+ // The transport is closing.
+ return false, ErrConnClosing
+ }
+ sid := t.maxStreamID
+ if !g.headsUp {
+ // Stop accepting more streams now.
+ t.state = draining
+ if len(t.activeStreams) == 0 {
+ g.closeConn = true
+ }
+ t.mu.Unlock()
+ if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
+ return false, err
+ }
+ if g.closeConn {
+ // Abruptly close the connection following the GoAway (via
+ // loopywriter). But flush out what's inside the buffer first.
+ t.framer.writer.Flush()
+ return false, fmt.Errorf("transport: Connection closing")
+ }
+ return true, nil
+ }
+ t.mu.Unlock()
+ // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
+ // Follow that with a ping and wait for the ack to come back or a timer
+ // to expire. During this time accept new streams since they might have
+ // originated before the GoAway reaches the client.
+ // After getting the ack or timer expiration send out another GoAway this
+ // time with an ID of the max stream server intends to process.
+ if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
+ return false, err
+ }
+ if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
+ return false, err
+ }
+ go func() {
+ timer := time.NewTimer(time.Minute)
+ defer timer.Stop()
+ select {
+ case <-t.drainChan:
+ case <-timer.C:
+ case <-t.ctx.Done():
+ return
+ }
+ t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
+ }()
+ return false, nil
+}
+
+func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
+ t.czmu.RLock()
+ s := channelz.SocketInternalMetric{
+ StreamsStarted: t.streamsStarted,
+ StreamsSucceeded: t.streamsSucceeded,
+ StreamsFailed: t.streamsFailed,
+ MessagesSent: t.msgSent,
+ MessagesReceived: t.msgRecv,
+ KeepAlivesSent: t.kpCount,
+ LastRemoteStreamCreatedTimestamp: t.lastStreamCreated,
+ LastMessageSentTimestamp: t.lastMsgSent,
+ LastMessageReceivedTimestamp: t.lastMsgRecv,
+ LocalFlowControlWindow: int64(t.fc.getSize()),
+ //socket options
+ LocalAddr: t.localAddr,
+ RemoteAddr: t.remoteAddr,
+ // Security
+ // RemoteName :
+ }
+ t.czmu.RUnlock()
+ s.RemoteFlowControlWindow = t.getOutFlowWindow()
+ return &s
+}
+
+func (t *http2Server) IncrMsgSent() {
+ t.czmu.Lock()
+ t.msgSent++
+ t.lastMsgSent = time.Now()
+ t.czmu.Unlock()
+}
+
+func (t *http2Server) IncrMsgRecv() {
+ t.czmu.Lock()
+ t.msgRecv++
+ t.lastMsgRecv = time.Now()
+ t.czmu.Unlock()
+}
+
+func (t *http2Server) getOutFlowWindow() int64 {
+ resp := make(chan uint32)
+ timer := time.NewTimer(time.Second)
+ defer timer.Stop()
+ t.controlBuf.put(&outFlowControlSizeRequest{resp})
+ select {
+ case sz := <-resp:
+ return int64(sz)
+ case <-t.ctxDone:
+ return -1
+ case <-timer.C:
+ return -2
+ }
+}
func getJitter(v time.Duration) time.Duration {
if v == infinity {
@@ -1183,6 +1137,6 @@ func getJitter(v time.Duration) time.Duration {
}
// Generate a jitter between +/- 10% of the value.
r := int64(v / 10)
- j := rgen.Int63n(2*r) - r
+ j := grpcrand.Int63n(2*r) - r
return time.Duration(j)
}
diff --git a/vendor/google.golang.org/grpc/transport/http_util.go b/vendor/google.golang.org/grpc/transport/http_util.go
index 39f878c..7d15c7d 100644
--- a/vendor/google.golang.org/grpc/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/transport/http_util.go
@@ -23,12 +23,12 @@ import (
"bytes"
"encoding/base64"
"fmt"
- "io"
"net"
"net/http"
"strconv"
"strings"
"time"
+ "unicode/utf8"
"github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
@@ -46,6 +46,12 @@ const (
// http2IOBufSize specifies the buffer size for sending frames.
defaultWriteBufSize = 32 * 1024
defaultReadBufSize = 32 * 1024
+ // baseContentType is the base content-type for gRPC. This is a valid
+ // content-type on it's own, but can also include a content-subtype such as
+ // "proto" as a suffix after "+" or ";". See
+ // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
+ // for more details.
+ baseContentType = "application/grpc"
)
var (
@@ -64,7 +70,7 @@ var (
http2.ErrCodeConnect: codes.Internal,
http2.ErrCodeEnhanceYourCalm: codes.ResourceExhausted,
http2.ErrCodeInadequateSecurity: codes.PermissionDenied,
- http2.ErrCodeHTTP11Required: codes.FailedPrecondition,
+ http2.ErrCodeHTTP11Required: codes.Internal,
}
statusCodeConvTab = map[codes.Code]http2.ErrCode{
codes.Internal: http2.ErrCodeInternal,
@@ -111,9 +117,10 @@ type decodeState struct {
timeout time.Duration
method string
// key-value metadata map from the peer.
- mdata map[string][]string
- statsTags []byte
- statsTrace []byte
+ mdata map[string][]string
+ statsTags []byte
+ statsTrace []byte
+ contentSubtype string
}
// isReservedHeader checks whether hdr belongs to HTTP2 headers
@@ -125,6 +132,7 @@ func isReservedHeader(hdr string) bool {
}
switch hdr {
case "content-type",
+ "user-agent",
"grpc-message-type",
"grpc-encoding",
"grpc-message",
@@ -138,28 +146,55 @@ func isReservedHeader(hdr string) bool {
}
}
-// isWhitelistedPseudoHeader checks whether hdr belongs to HTTP2 pseudoheaders
-// that should be propagated into metadata visible to users.
-func isWhitelistedPseudoHeader(hdr string) bool {
+// isWhitelistedHeader checks whether hdr should be propagated
+// into metadata visible to users.
+func isWhitelistedHeader(hdr string) bool {
switch hdr {
- case ":authority":
+ case ":authority", "user-agent":
return true
default:
return false
}
}
-func validContentType(t string) bool {
- e := "application/grpc"
- if !strings.HasPrefix(t, e) {
- return false
+// contentSubtype returns the content-subtype for the given content-type. The
+// given content-type must be a valid content-type that starts with
+// "application/grpc". A content-subtype will follow "application/grpc" after a
+// "+" or ";". See
+// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
+// more details.
+//
+// If contentType is not a valid content-type for gRPC, the boolean
+// will be false, otherwise true. If content-type == "application/grpc",
+// "application/grpc+", or "application/grpc;", the boolean will be true,
+// but no content-subtype will be returned.
+//
+// contentType is assumed to be lowercase already.
+func contentSubtype(contentType string) (string, bool) {
+ if contentType == baseContentType {
+ return "", true
+ }
+ if !strings.HasPrefix(contentType, baseContentType) {
+ return "", false
+ }
+ // guaranteed since != baseContentType and has baseContentType prefix
+ switch contentType[len(baseContentType)] {
+ case '+', ';':
+ // this will return true for "application/grpc+" or "application/grpc;"
+ // which the previous validContentType function tested to be valid, so we
+ // just say that no content-subtype is specified in this case
+ return contentType[len(baseContentType)+1:], true
+ default:
+ return "", false
}
- // Support variations on the content-type
- // (e.g. "application/grpc+blah", "application/grpc;blah").
- if len(t) > len(e) && t[len(e)] != '+' && t[len(e)] != ';' {
- return false
+}
+
+// contentSubtype is assumed to be lowercase
+func contentType(contentSubtype string) string {
+ if contentSubtype == "" {
+ return baseContentType
}
- return true
+ return baseContentType + "+" + contentSubtype
}
func (d *decodeState) status() *status.Status {
@@ -228,9 +263,9 @@ func (d *decodeState) decodeResponseHeader(frame *http2.MetaHeadersFrame) error
// gRPC status doesn't exist and http status is OK.
// Set rawStatusCode to be unknown and return nil error.
// So that, if the stream has ended this Unknown status
- // will be propogated to the user.
+ // will be propagated to the user.
// Otherwise, it will be ignored. In which case, status from
- // a later trailer, that has StreamEnded flag set, is propogated.
+ // a later trailer, that has StreamEnded flag set, is propagated.
code := int(codes.Unknown)
d.rawStatusCode = &code
return nil
@@ -247,9 +282,16 @@ func (d *decodeState) addMetadata(k, v string) {
func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
switch f.Name {
case "content-type":
- if !validContentType(f.Value) {
- return streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value)
+ contentSubtype, validContentType := contentSubtype(f.Value)
+ if !validContentType {
+ return streamErrorf(codes.Internal, "transport: received the unexpected content-type %q", f.Value)
}
+ d.contentSubtype = contentSubtype
+ // TODO: do we want to propagate the whole content-type in the metadata,
+ // or come up with a way to just propagate the content-subtype if it was set?
+ // ie {"content-type": "application/grpc+proto"} or {"content-subtype": "proto"}
+ // in the metadata?
+ d.addMetadata(f.Name, f.Value)
case "grpc-encoding":
d.encoding = f.Value
case "grpc-status":
@@ -299,7 +341,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
d.statsTrace = v
d.addMetadata(f.Name, string(v))
default:
- if isReservedHeader(f.Name) && !isWhitelistedPseudoHeader(f.Name) {
+ if isReservedHeader(f.Name) && !isWhitelistedHeader(f.Name) {
break
}
v, err := decodeMetadataHeader(f.Name, f.Value)
@@ -307,7 +349,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err)
return nil
}
- d.addMetadata(f.Name, string(v))
+ d.addMetadata(f.Name, v)
}
return nil
}
@@ -396,16 +438,17 @@ func decodeTimeout(s string) (time.Duration, error) {
const (
spaceByte = ' '
- tildaByte = '~'
+ tildeByte = '~'
percentByte = '%'
)
// encodeGrpcMessage is used to encode status code in header field
-// "grpc-message".
-// It checks to see if each individual byte in msg is an
-// allowable byte, and then either percent encoding or passing it through.
-// When percent encoding, the byte is converted into hexadecimal notation
-// with a '%' prepended.
+// "grpc-message". It does percent encoding and also replaces invalid utf-8
+// characters with Unicode replacement character.
+//
+// It checks to see if each individual byte in msg is an allowable byte, and
+// then either percent encoding or passing it through. When percent encoding,
+// the byte is converted into hexadecimal notation with a '%' prepended.
func encodeGrpcMessage(msg string) string {
if msg == "" {
return ""
@@ -413,7 +456,7 @@ func encodeGrpcMessage(msg string) string {
lenMsg := len(msg)
for i := 0; i < lenMsg; i++ {
c := msg[i]
- if !(c >= spaceByte && c < tildaByte && c != percentByte) {
+ if !(c >= spaceByte && c <= tildeByte && c != percentByte) {
return encodeGrpcMessageUnchecked(msg)
}
}
@@ -422,14 +465,26 @@ func encodeGrpcMessage(msg string) string {
func encodeGrpcMessageUnchecked(msg string) string {
var buf bytes.Buffer
- lenMsg := len(msg)
- for i := 0; i < lenMsg; i++ {
- c := msg[i]
- if c >= spaceByte && c < tildaByte && c != percentByte {
- buf.WriteByte(c)
- } else {
- buf.WriteString(fmt.Sprintf("%%%02X", c))
+ for len(msg) > 0 {
+ r, size := utf8.DecodeRuneInString(msg)
+ for _, b := range []byte(string(r)) {
+ if size > 1 {
+ // If size > 1, r is not ascii. Always do percent encoding.
+ buf.WriteString(fmt.Sprintf("%%%02X", b))
+ continue
+ }
+
+ // The for loop is necessary even if size == 1. r could be
+ // utf8.RuneError.
+ //
+ // fmt.Sprintf("%%%02X", utf8.RuneError) gives "%FFFD".
+ if b >= spaceByte && b <= tildeByte && b != percentByte {
+ buf.WriteByte(b)
+ } else {
+ buf.WriteString(fmt.Sprintf("%%%02X", b))
+ }
}
+ msg = msg[size:]
}
return buf.String()
}
@@ -468,19 +523,67 @@ func decodeGrpcMessageUnchecked(msg string) string {
return buf.String()
}
+type bufWriter struct {
+ buf []byte
+ offset int
+ batchSize int
+ conn net.Conn
+ err error
+
+ onFlush func()
+}
+
+func newBufWriter(conn net.Conn, batchSize int) *bufWriter {
+ return &bufWriter{
+ buf: make([]byte, batchSize*2),
+ batchSize: batchSize,
+ conn: conn,
+ }
+}
+
+func (w *bufWriter) Write(b []byte) (n int, err error) {
+ if w.err != nil {
+ return 0, w.err
+ }
+ for len(b) > 0 {
+ nn := copy(w.buf[w.offset:], b)
+ b = b[nn:]
+ w.offset += nn
+ n += nn
+ if w.offset >= w.batchSize {
+ err = w.Flush()
+ }
+ }
+ return n, err
+}
+
+func (w *bufWriter) Flush() error {
+ if w.err != nil {
+ return w.err
+ }
+ if w.offset == 0 {
+ return nil
+ }
+ if w.onFlush != nil {
+ w.onFlush()
+ }
+ _, w.err = w.conn.Write(w.buf[:w.offset])
+ w.offset = 0
+ return w.err
+}
+
type framer struct {
- numWriters int32
- reader io.Reader
- writer *bufio.Writer
- fr *http2.Framer
+ writer *bufWriter
+ fr *http2.Framer
}
func newFramer(conn net.Conn, writeBufferSize, readBufferSize int) *framer {
+ r := bufio.NewReaderSize(conn, readBufferSize)
+ w := newBufWriter(conn, writeBufferSize)
f := &framer{
- reader: bufio.NewReaderSize(conn, readBufferSize),
- writer: bufio.NewWriterSize(conn, writeBufferSize),
+ writer: w,
+ fr: http2.NewFramer(w, r),
}
- f.fr = http2.NewFramer(f.writer, f.reader)
// Opt-in to Frame reuse API on framer to reduce garbage.
// Frames aren't safe to read from after a subsequent call to ReadFrame.
f.fr.SetReuseFrames()
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go
index 2cf9bd3..f51f878 100644
--- a/vendor/google.golang.org/grpc/transport/transport.go
+++ b/vendor/google.golang.org/grpc/transport/transport.go
@@ -17,19 +17,19 @@
*/
// Package transport defines and implements message oriented communication
-// channel to complete various transactions (e.g., an RPC).
-package transport // import "google.golang.org/grpc/transport"
+// channel to complete various transactions (e.g., an RPC). It is meant for
+// grpc-internal usage and is not intended to be imported directly by users.
+package transport // externally used as import "google.golang.org/grpc/transport"
import (
- stdctx "context"
+ "errors"
"fmt"
"io"
"net"
"sync"
- "time"
+ "sync/atomic"
"golang.org/x/net/context"
- "golang.org/x/net/http2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
@@ -58,6 +58,7 @@ type recvBuffer struct {
c chan recvMsg
mu sync.Mutex
backlog []recvMsg
+ err error
}
func newRecvBuffer() *recvBuffer {
@@ -69,6 +70,13 @@ func newRecvBuffer() *recvBuffer {
func (b *recvBuffer) put(r recvMsg) {
b.mu.Lock()
+ if b.err != nil {
+ b.mu.Unlock()
+ // An error had occurred earlier, don't accept more
+ // data or errors.
+ return
+ }
+ b.err = r.err
if len(b.backlog) == 0 {
select {
case b.c <- r:
@@ -102,14 +110,15 @@ func (b *recvBuffer) get() <-chan recvMsg {
return b.c
}
+//
// recvBufferReader implements io.Reader interface to read the data from
// recvBuffer.
type recvBufferReader struct {
- ctx context.Context
- goAway chan struct{}
- recv *recvBuffer
- last []byte // Stores the remaining data in the previous calls.
- err error
+ ctx context.Context
+ ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
+ recv *recvBuffer
+ last []byte // Stores the remaining data in the previous calls.
+ err error
}
// Read reads the next len(p) bytes from last. If last is drained, it tries to
@@ -131,10 +140,8 @@ func (r *recvBufferReader) read(p []byte) (n int, err error) {
return copied, nil
}
select {
- case <-r.ctx.Done():
+ case <-r.ctxDone:
return 0, ContextErr(r.ctx.Err())
- case <-r.goAway:
- return 0, ErrStreamDrain
case m := <-r.recv.get():
r.recv.load()
if m.err != nil {
@@ -146,61 +153,7 @@ func (r *recvBufferReader) read(p []byte) (n int, err error) {
}
}
-// All items in an out of a controlBuffer should be the same type.
-type item interface {
- item()
-}
-
-// controlBuffer is an unbounded channel of item.
-type controlBuffer struct {
- c chan item
- mu sync.Mutex
- backlog []item
-}
-
-func newControlBuffer() *controlBuffer {
- b := &controlBuffer{
- c: make(chan item, 1),
- }
- return b
-}
-
-func (b *controlBuffer) put(r item) {
- b.mu.Lock()
- if len(b.backlog) == 0 {
- select {
- case b.c <- r:
- b.mu.Unlock()
- return
- default:
- }
- }
- b.backlog = append(b.backlog, r)
- b.mu.Unlock()
-}
-
-func (b *controlBuffer) load() {
- b.mu.Lock()
- if len(b.backlog) > 0 {
- select {
- case b.c <- b.backlog[0]:
- b.backlog[0] = nil
- b.backlog = b.backlog[1:]
- default:
- }
- }
- b.mu.Unlock()
-}
-
-// get returns the channel that receives an item in the buffer.
-//
-// Upon receipt of an item, the caller should call load to send another
-// item onto the channel if there is any.
-func (b *controlBuffer) get() <-chan item {
- return b.c
-}
-
-type streamState uint8
+type streamState uint32
const (
streamActive streamState = iota
@@ -211,66 +164,93 @@ const (
// Stream represents an RPC in the transport layer.
type Stream struct {
- id uint32
- // nil for client side Stream.
- st ServerTransport
- // ctx is the associated context of the stream.
- ctx context.Context
- // cancel is always nil for client side Stream.
- cancel context.CancelFunc
- // done is closed when the final status arrives.
- done chan struct{}
- // goAway is closed when the server sent GoAways signal before this stream was initiated.
- goAway chan struct{}
- // method records the associated RPC method of the stream.
- method string
+ id uint32
+ st ServerTransport // nil for client side Stream
+ ctx context.Context // the associated context of the stream
+ cancel context.CancelFunc // always nil for client side Stream
+ done chan struct{} // closed at the end of stream to unblock writers. On the client side.
+ ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
+ method string // the associated RPC method of the stream
recvCompress string
sendCompress string
buf *recvBuffer
trReader io.Reader
fc *inFlow
recvQuota uint32
-
- // TODO: Remote this unused variable.
- // The accumulated inbound quota pending for window update.
- updateQuota uint32
+ wq *writeQuota
// Callback to state application's intentions to read data. This
- // is used to adjust flow control, if need be.
+ // is used to adjust flow control, if needed.
requestRead func(int)
- sendQuotaPool *quotaPool
- localSendQuota *quotaPool
- // Close headerChan to indicate the end of reception of header metadata.
- headerChan chan struct{}
- // header caches the received header metadata.
- header metadata.MD
- // The key-value map of trailer metadata.
- trailer metadata.MD
-
- mu sync.RWMutex // guard the following
- // headerOK becomes true from the first header is about to send.
- headerOk bool
- state streamState
- // true iff headerChan is closed. Used to avoid closing headerChan
- // multiple times.
- headerDone bool
- // the status error received from the server.
+ headerChan chan struct{} // closed to indicate the end of header metadata.
+ headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
+
+ // hdrMu protects header and trailer metadata on the server-side.
+ hdrMu sync.Mutex
+ header metadata.MD // the received header metadata.
+ trailer metadata.MD // the key-value map of trailer metadata.
+
+ // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
+ headerSent uint32
+
+ state streamState
+
+ // On client-side it is the status error received from the server.
+ // On server-side it is unused.
status *status.Status
- // rstStream indicates whether a RST_STREAM frame needs to be sent
- // to the server to signify that this stream is closing.
- rstStream bool
- // rstError is the error that needs to be sent along with the RST_STREAM frame.
- rstError http2.ErrCode
- // bytesSent and bytesReceived indicates whether any bytes have been sent or
- // received on this stream.
- bytesSent bool
- bytesReceived bool
+
+ bytesReceived uint32 // indicates whether any bytes have been received on this stream
+ unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
+
+ // contentSubtype is the content-subtype for requests.
+ // this must be lowercase or the behavior is undefined.
+ contentSubtype string
+}
+
+// isHeaderSent is only valid on the server-side.
+func (s *Stream) isHeaderSent() bool {
+ return atomic.LoadUint32(&s.headerSent) == 1
+}
+
+// updateHeaderSent updates headerSent and returns true
+// if it was alreay set. It is valid only on server-side.
+func (s *Stream) updateHeaderSent() bool {
+ return atomic.SwapUint32(&s.headerSent, 1) == 1
+}
+
+func (s *Stream) swapState(st streamState) streamState {
+ return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
+}
+
+func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
+ return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
+}
+
+func (s *Stream) getState() streamState {
+ return streamState(atomic.LoadUint32((*uint32)(&s.state)))
+}
+
+func (s *Stream) waitOnHeader() error {
+ if s.headerChan == nil {
+ // On the server headerChan is always nil since a stream originates
+ // only after having received headers.
+ return nil
+ }
+ select {
+ case <-s.ctx.Done():
+ return ContextErr(s.ctx.Err())
+ case <-s.headerChan:
+ return nil
+ }
}
// RecvCompress returns the compression algorithm applied to the inbound
// message. It is empty string if there is no compression applied.
func (s *Stream) RecvCompress() string {
+ if err := s.waitOnHeader(); err != nil {
+ return ""
+ }
return s.recvCompress
}
@@ -285,28 +265,17 @@ func (s *Stream) Done() <-chan struct{} {
return s.done
}
-// GoAway returns a channel which is closed when the server sent GoAways signal
-// before this stream was initiated.
-func (s *Stream) GoAway() <-chan struct{} {
- return s.goAway
-}
-
// Header acquires the key-value pairs of header metadata once it
// is available. It blocks until i) the metadata is ready or ii) there is no
// header metadata or iii) the stream is canceled/expired.
func (s *Stream) Header() (metadata.MD, error) {
- var err error
- select {
- case <-s.ctx.Done():
- err = ContextErr(s.ctx.Err())
- case <-s.goAway:
- err = ErrStreamDrain
- case <-s.headerChan:
- return s.header.Copy(), nil
- }
+ err := s.waitOnHeader()
// Even if the stream is closed, header is returned if available.
select {
case <-s.headerChan:
+ if s.header == nil {
+ return nil, nil
+ }
return s.header.Copy(), nil
default:
}
@@ -316,10 +285,10 @@ func (s *Stream) Header() (metadata.MD, error) {
// Trailer returns the cached trailer metedata. Note that if it is not called
// after the entire stream is done, it could return an empty MD. Client
// side only.
+// It can be safely read only after stream has ended that is either read
+// or write have returned io.EOF.
func (s *Stream) Trailer() metadata.MD {
- s.mu.RLock()
c := s.trailer.Copy()
- s.mu.RUnlock()
return c
}
@@ -329,6 +298,15 @@ func (s *Stream) ServerTransport() ServerTransport {
return s.st
}
+// ContentSubtype returns the content-subtype for a request. For example, a
+// content-subtype of "proto" will result in a content-type of
+// "application/grpc+proto". This will always be lowercase. See
+// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
+// more details.
+func (s *Stream) ContentSubtype() string {
+ return s.contentSubtype
+}
+
// Context returns the context of the stream.
func (s *Stream) Context() context.Context {
return s.ctx
@@ -340,36 +318,49 @@ func (s *Stream) Method() string {
}
// Status returns the status received from the server.
+// Status can be read safely only after the stream has ended,
+// that is, read or write has returned io.EOF.
func (s *Stream) Status() *status.Status {
return s.status
}
// SetHeader sets the header metadata. This can be called multiple times.
// Server side only.
+// This should not be called in parallel to other data writes.
func (s *Stream) SetHeader(md metadata.MD) error {
- s.mu.Lock()
- if s.headerOk || s.state == streamDone {
- s.mu.Unlock()
- return ErrIllegalHeaderWrite
- }
if md.Len() == 0 {
- s.mu.Unlock()
return nil
}
+ if s.isHeaderSent() || s.getState() == streamDone {
+ return ErrIllegalHeaderWrite
+ }
+ s.hdrMu.Lock()
s.header = metadata.Join(s.header, md)
- s.mu.Unlock()
+ s.hdrMu.Unlock()
return nil
}
+// SendHeader sends the given header metadata. The given metadata is
+// combined with any metadata set by previous calls to SetHeader and
+// then written to the transport stream.
+func (s *Stream) SendHeader(md metadata.MD) error {
+ t := s.ServerTransport()
+ return t.WriteHeader(s, md)
+}
+
// SetTrailer sets the trailer metadata which will be sent with the RPC status
// by the server. This can be called multiple times. Server side only.
+// This should not be called parallel to other data writes.
func (s *Stream) SetTrailer(md metadata.MD) error {
if md.Len() == 0 {
return nil
}
- s.mu.Lock()
+ if s.getState() == streamDone {
+ return ErrIllegalHeaderWrite
+ }
+ s.hdrMu.Lock()
s.trailer = metadata.Join(s.trailer, md)
- s.mu.Unlock()
+ s.hdrMu.Unlock()
return nil
}
@@ -409,28 +400,15 @@ func (t *transportReader) Read(p []byte) (n int, err error) {
return
}
-// finish sets the stream's state and status, and closes the done channel.
-// s.mu must be held by the caller. st must always be non-nil.
-func (s *Stream) finish(st *status.Status) {
- s.status = st
- s.state = streamDone
- close(s.done)
-}
-
-// BytesSent indicates whether any bytes have been sent on this stream.
-func (s *Stream) BytesSent() bool {
- s.mu.Lock()
- bs := s.bytesSent
- s.mu.Unlock()
- return bs
-}
-
// BytesReceived indicates whether any bytes have been received on this stream.
func (s *Stream) BytesReceived() bool {
- s.mu.Lock()
- br := s.bytesReceived
- s.mu.Unlock()
- return br
+ return atomic.LoadUint32(&s.bytesReceived) == 1
+}
+
+// Unprocessed indicates whether the server did not process this stream --
+// i.e. it sent a refused stream or GOAWAY including this stream ID.
+func (s *Stream) Unprocessed() bool {
+ return atomic.LoadUint32(&s.unprocessed) == 1
}
// GoString is implemented by Stream so context.String() won't
@@ -439,21 +417,6 @@ func (s *Stream) GoString() string {
return fmt.Sprintf("<stream: %p, %v>", s, s.method)
}
-// The key to save transport.Stream in the context.
-type streamKey struct{}
-
-// newContextWithStream creates a new context from ctx and attaches stream
-// to it.
-func newContextWithStream(ctx context.Context, stream *Stream) context.Context {
- return context.WithValue(ctx, streamKey{}, stream)
-}
-
-// StreamFromContext returns the stream saved in ctx.
-func StreamFromContext(ctx context.Context) (s *Stream, ok bool) {
- s, ok = ctx.Value(streamKey{}).(*Stream)
- return
-}
-
// state of transport
type transportState int
@@ -475,6 +438,7 @@ type ServerConfig struct {
InitialConnWindowSize int32
WriteBufferSize int
ReadBufferSize int
+ ChannelzParentID int64
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
@@ -510,18 +474,21 @@ type ConnectOptions struct {
WriteBufferSize int
// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
ReadBufferSize int
+ // ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
+ ChannelzParentID int64
}
// TargetInfo contains the information of the target such as network address and metadata.
type TargetInfo struct {
- Addr string
- Metadata interface{}
+ Addr string
+ Metadata interface{}
+ Authority string
}
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
-func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions, timeout time.Duration) (ClientTransport, error) {
- return newHTTP2Client(ctx, target, opts, timeout)
+func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) {
+ return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess)
}
// Options provides additional hints and information for message
@@ -545,10 +512,6 @@ type CallHdr struct {
// Method specifies the operation to perform.
Method string
- // RecvCompress specifies the compression algorithm applied on
- // inbound messages.
- RecvCompress string
-
// SendCompress specifies the compression algorithm applied on
// outbound message.
SendCompress string
@@ -563,6 +526,14 @@ type CallHdr struct {
// for performance purposes.
// If it's false, new stream will never be flushed.
Flush bool
+
+ // ContentSubtype specifies the content-subtype for a request. For example, a
+ // content-subtype of "proto" will result in a content-type of
+ // "application/grpc+proto". The value of ContentSubtype must be all
+ // lowercase, otherwise the behavior is undefined. See
+ // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
+ // for more details.
+ ContentSubtype string
}
// ClientTransport is the common interface for all gRPC client-side transport
@@ -604,6 +575,12 @@ type ClientTransport interface {
// GetGoAwayReason returns the reason why GoAway frame was received.
GetGoAwayReason() GoAwayReason
+
+ // IncrMsgSent increments the number of message sent through this transport.
+ IncrMsgSent()
+
+ // IncrMsgRecv increments the number of message received through this transport.
+ IncrMsgRecv()
}
// ServerTransport is the common interface for all gRPC server-side transport
@@ -637,6 +614,12 @@ type ServerTransport interface {
// Drain notifies the client this ServerTransport stops accepting new RPCs.
Drain()
+
+ // IncrMsgSent increments the number of message sent through this transport.
+ IncrMsgSent()
+
+ // IncrMsgRecv increments the number of message received through this transport.
+ IncrMsgRecv()
}
// streamErrorf creates an StreamError with the specified error code and description.
@@ -686,9 +669,16 @@ func (e ConnectionError) Origin() error {
var (
// ErrConnClosing indicates that the transport is closing.
ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
- // ErrStreamDrain indicates that the stream is rejected by the server because
- // the server stops accepting new RPCs.
- ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
+ // errStreamDrain indicates that the stream is rejected because the
+ // connection is draining. This could be caused by goaway or balancer
+ // removing the address.
+ errStreamDrain = streamErrorf(codes.Unavailable, "the connection is draining")
+ // errStreamDone is returned from write at the client side to indiacte application
+ // layer of an error.
+ errStreamDone = errors.New("the stream is done")
+ // StatusGoAway indicates that the server sent a GOAWAY that included this
+ // stream's ID in unprocessed RPCs.
+ statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
)
// TODO: See if we can replace StreamError with status package errors.
@@ -703,80 +693,16 @@ func (e StreamError) Error() string {
return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
}
-// wait blocks until it can receive from one of the provided contexts or
-// channels. ctx is the context of the RPC, tctx is the context of the
-// transport, done is a channel closed to indicate the end of the RPC, goAway
-// is a channel closed to indicate a GOAWAY was received, and proceed is a
-// quota channel, whose received value is returned from this function if none
-// of the other signals occur first.
-func wait(ctx, tctx context.Context, done, goAway <-chan struct{}, proceed <-chan int) (int, error) {
- select {
- case <-ctx.Done():
- return 0, ContextErr(ctx.Err())
- case <-done:
- return 0, io.EOF
- case <-goAway:
- return 0, ErrStreamDrain
- case <-tctx.Done():
- return 0, ErrConnClosing
- case i := <-proceed:
- return i, nil
- }
-}
-
-// ContextErr converts the error from context package into a StreamError.
-func ContextErr(err error) StreamError {
- switch err {
- case context.DeadlineExceeded, stdctx.DeadlineExceeded:
- return streamErrorf(codes.DeadlineExceeded, "%v", err)
- case context.Canceled, stdctx.Canceled:
- return streamErrorf(codes.Canceled, "%v", err)
- }
- return streamErrorf(codes.Internal, "Unexpected error from context packet: %v", err)
-}
-
// GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8
const (
- // Invalid indicates that no GoAway frame is received.
- Invalid GoAwayReason = 0
- // NoReason is the default value when GoAway frame is received.
- NoReason GoAwayReason = 1
- // TooManyPings indicates that a GoAway frame with ErrCodeEnhanceYourCalm
- // was received and that the debug data said "too_many_pings".
- TooManyPings GoAwayReason = 2
+ // GoAwayInvalid indicates that no GoAway frame is received.
+ GoAwayInvalid GoAwayReason = 0
+ // GoAwayNoReason is the default value when GoAway frame is received.
+ GoAwayNoReason GoAwayReason = 1
+ // GoAwayTooManyPings indicates that a GoAway frame with
+ // ErrCodeEnhanceYourCalm was received and that the debug data said
+ // "too_many_pings".
+ GoAwayTooManyPings GoAwayReason = 2
)
-
-// loopyWriter is run in a separate go routine. It is the single code path that will
-// write data on wire.
-func loopyWriter(ctx context.Context, cbuf *controlBuffer, handler func(item) error) {
- for {
- select {
- case i := <-cbuf.get():
- cbuf.load()
- if err := handler(i); err != nil {
- return
- }
- case <-ctx.Done():
- return
- }
- hasData:
- for {
- select {
- case i := <-cbuf.get():
- cbuf.load()
- if err := handler(i); err != nil {
- return
- }
- case <-ctx.Done():
- return
- default:
- if err := handler(&flushIO{}); err != nil {
- return
- }
- break hasData
- }
- }
- }
-}