aboutsummaryrefslogtreecommitdiff
path: root/vendor/golang.org/x/net/http2/writesched.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/golang.org/x/net/http2/writesched.go')
-rw-r--r--vendor/golang.org/x/net/http2/writesched.go283
1 files changed, 283 insertions, 0 deletions
diff --git a/vendor/golang.org/x/net/http2/writesched.go b/vendor/golang.org/x/net/http2/writesched.go
new file mode 100644
index 0000000..c24316c
--- /dev/null
+++ b/vendor/golang.org/x/net/http2/writesched.go
@@ -0,0 +1,283 @@
+// Copyright 2014 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package http2
+
+import "fmt"
+
+// frameWriteMsg is a request to write a frame.
+type frameWriteMsg struct {
+ // write is the interface value that does the writing, once the
+ // writeScheduler (below) has decided to select this frame
+ // to write. The write functions are all defined in write.go.
+ write writeFramer
+
+ stream *stream // used for prioritization. nil for non-stream frames.
+
+ // done, if non-nil, must be a buffered channel with space for
+ // 1 message and is sent the return value from write (or an
+ // earlier error) when the frame has been written.
+ done chan error
+}
+
+// for debugging only:
+func (wm frameWriteMsg) String() string {
+ var streamID uint32
+ if wm.stream != nil {
+ streamID = wm.stream.id
+ }
+ var des string
+ if s, ok := wm.write.(fmt.Stringer); ok {
+ des = s.String()
+ } else {
+ des = fmt.Sprintf("%T", wm.write)
+ }
+ return fmt.Sprintf("[frameWriteMsg stream=%d, ch=%v, type: %v]", streamID, wm.done != nil, des)
+}
+
+// writeScheduler tracks pending frames to write, priorities, and decides
+// the next one to use. It is not thread-safe.
+type writeScheduler struct {
+ // zero are frames not associated with a specific stream.
+ // They're sent before any stream-specific freams.
+ zero writeQueue
+
+ // maxFrameSize is the maximum size of a DATA frame
+ // we'll write. Must be non-zero and between 16K-16M.
+ maxFrameSize uint32
+
+ // sq contains the stream-specific queues, keyed by stream ID.
+ // when a stream is idle, it's deleted from the map.
+ sq map[uint32]*writeQueue
+
+ // canSend is a slice of memory that's reused between frame
+ // scheduling decisions to hold the list of writeQueues (from sq)
+ // which have enough flow control data to send. After canSend is
+ // built, the best is selected.
+ canSend []*writeQueue
+
+ // pool of empty queues for reuse.
+ queuePool []*writeQueue
+}
+
+func (ws *writeScheduler) putEmptyQueue(q *writeQueue) {
+ if len(q.s) != 0 {
+ panic("queue must be empty")
+ }
+ ws.queuePool = append(ws.queuePool, q)
+}
+
+func (ws *writeScheduler) getEmptyQueue() *writeQueue {
+ ln := len(ws.queuePool)
+ if ln == 0 {
+ return new(writeQueue)
+ }
+ q := ws.queuePool[ln-1]
+ ws.queuePool = ws.queuePool[:ln-1]
+ return q
+}
+
+func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 }
+
+func (ws *writeScheduler) add(wm frameWriteMsg) {
+ st := wm.stream
+ if st == nil {
+ ws.zero.push(wm)
+ } else {
+ ws.streamQueue(st.id).push(wm)
+ }
+}
+
+func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue {
+ if q, ok := ws.sq[streamID]; ok {
+ return q
+ }
+ if ws.sq == nil {
+ ws.sq = make(map[uint32]*writeQueue)
+ }
+ q := ws.getEmptyQueue()
+ ws.sq[streamID] = q
+ return q
+}
+
+// take returns the most important frame to write and removes it from the scheduler.
+// It is illegal to call this if the scheduler is empty or if there are no connection-level
+// flow control bytes available.
+func (ws *writeScheduler) take() (wm frameWriteMsg, ok bool) {
+ if ws.maxFrameSize == 0 {
+ panic("internal error: ws.maxFrameSize not initialized or invalid")
+ }
+
+ // If there any frames not associated with streams, prefer those first.
+ // These are usually SETTINGS, etc.
+ if !ws.zero.empty() {
+ return ws.zero.shift(), true
+ }
+ if len(ws.sq) == 0 {
+ return
+ }
+
+ // Next, prioritize frames on streams that aren't DATA frames (no cost).
+ for id, q := range ws.sq {
+ if q.firstIsNoCost() {
+ return ws.takeFrom(id, q)
+ }
+ }
+
+ // Now, all that remains are DATA frames with non-zero bytes to
+ // send. So pick the best one.
+ if len(ws.canSend) != 0 {
+ panic("should be empty")
+ }
+ for _, q := range ws.sq {
+ if n := ws.streamWritableBytes(q); n > 0 {
+ ws.canSend = append(ws.canSend, q)
+ }
+ }
+ if len(ws.canSend) == 0 {
+ return
+ }
+ defer ws.zeroCanSend()
+
+ // TODO: find the best queue
+ q := ws.canSend[0]
+
+ return ws.takeFrom(q.streamID(), q)
+}
+
+// zeroCanSend is defered from take.
+func (ws *writeScheduler) zeroCanSend() {
+ for i := range ws.canSend {
+ ws.canSend[i] = nil
+ }
+ ws.canSend = ws.canSend[:0]
+}
+
+// streamWritableBytes returns the number of DATA bytes we could write
+// from the given queue's stream, if this stream/queue were
+// selected. It is an error to call this if q's head isn't a
+// *writeData.
+func (ws *writeScheduler) streamWritableBytes(q *writeQueue) int32 {
+ wm := q.head()
+ ret := wm.stream.flow.available() // max we can write
+ if ret == 0 {
+ return 0
+ }
+ if int32(ws.maxFrameSize) < ret {
+ ret = int32(ws.maxFrameSize)
+ }
+ if ret == 0 {
+ panic("internal error: ws.maxFrameSize not initialized or invalid")
+ }
+ wd := wm.write.(*writeData)
+ if len(wd.p) < int(ret) {
+ ret = int32(len(wd.p))
+ }
+ return ret
+}
+
+func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg, ok bool) {
+ wm = q.head()
+ // If the first item in this queue costs flow control tokens
+ // and we don't have enough, write as much as we can.
+ if wd, ok := wm.write.(*writeData); ok && len(wd.p) > 0 {
+ allowed := wm.stream.flow.available() // max we can write
+ if allowed == 0 {
+ // No quota available. Caller can try the next stream.
+ return frameWriteMsg{}, false
+ }
+ if int32(ws.maxFrameSize) < allowed {
+ allowed = int32(ws.maxFrameSize)
+ }
+ // TODO: further restrict the allowed size, because even if
+ // the peer says it's okay to write 16MB data frames, we might
+ // want to write smaller ones to properly weight competing
+ // streams' priorities.
+
+ if len(wd.p) > int(allowed) {
+ wm.stream.flow.take(allowed)
+ chunk := wd.p[:allowed]
+ wd.p = wd.p[allowed:]
+ // Make up a new write message of a valid size, rather
+ // than shifting one off the queue.
+ return frameWriteMsg{
+ stream: wm.stream,
+ write: &writeData{
+ streamID: wd.streamID,
+ p: chunk,
+ // even if the original had endStream set, there
+ // arebytes remaining because len(wd.p) > allowed,
+ // so we know endStream is false:
+ endStream: false,
+ },
+ // our caller is blocking on the final DATA frame, not
+ // these intermediates, so no need to wait:
+ done: nil,
+ }, true
+ }
+ wm.stream.flow.take(int32(len(wd.p)))
+ }
+
+ q.shift()
+ if q.empty() {
+ ws.putEmptyQueue(q)
+ delete(ws.sq, id)
+ }
+ return wm, true
+}
+
+func (ws *writeScheduler) forgetStream(id uint32) {
+ q, ok := ws.sq[id]
+ if !ok {
+ return
+ }
+ delete(ws.sq, id)
+
+ // But keep it for others later.
+ for i := range q.s {
+ q.s[i] = frameWriteMsg{}
+ }
+ q.s = q.s[:0]
+ ws.putEmptyQueue(q)
+}
+
+type writeQueue struct {
+ s []frameWriteMsg
+}
+
+// streamID returns the stream ID for a non-empty stream-specific queue.
+func (q *writeQueue) streamID() uint32 { return q.s[0].stream.id }
+
+func (q *writeQueue) empty() bool { return len(q.s) == 0 }
+
+func (q *writeQueue) push(wm frameWriteMsg) {
+ q.s = append(q.s, wm)
+}
+
+// head returns the next item that would be removed by shift.
+func (q *writeQueue) head() frameWriteMsg {
+ if len(q.s) == 0 {
+ panic("invalid use of queue")
+ }
+ return q.s[0]
+}
+
+func (q *writeQueue) shift() frameWriteMsg {
+ if len(q.s) == 0 {
+ panic("invalid use of queue")
+ }
+ wm := q.s[0]
+ // TODO: less copy-happy queue.
+ copy(q.s, q.s[1:])
+ q.s[len(q.s)-1] = frameWriteMsg{}
+ q.s = q.s[:len(q.s)-1]
+ return wm
+}
+
+func (q *writeQueue) firstIsNoCost() bool {
+ if df, ok := q.s[0].write.(*writeData); ok {
+ return len(df.p) == 0
+ }
+ return true
+}