Diffstat (limited to 'vendor/golang.org/x/net/http2/server.go')
-rw-r--r--  vendor/golang.org/x/net/http2/server.go | 919
1 file changed, 631 insertions(+), 288 deletions(-)
diff --git a/vendor/golang.org/x/net/http2/server.go b/vendor/golang.org/x/net/http2/server.go
index c986bc1..370e42e 100644
--- a/vendor/golang.org/x/net/http2/server.go
+++ b/vendor/golang.org/x/net/http2/server.go
@@ -2,17 +2,6 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-// TODO: replace all <-sc.doneServing with reads from the stream's cw
-// instead, and make sure that on close we close all open
-// streams. then remove doneServing?
-
-// TODO: re-audit GOAWAY support. Consider each incoming frame type and
-// whether it should be ignored during graceful shutdown.
-
-// TODO: disconnect idle clients. GFE seems to do 4 minutes. make
-// configurable? or maximum number of idle clients and remove the
-// oldest?
-
// TODO: turn off the serve goroutine when idle, so
// an idle conn only has the readFrames goroutine active. (which could
// also be optimized probably to pin less memory in crypto/tls). This
@@ -44,6 +33,7 @@ import (
"fmt"
"io"
"log"
+ "math"
"net"
"net/http"
"net/textproto"
@@ -114,6 +104,15 @@ type Server struct {
// PermitProhibitedCipherSuites, if true, permits the use of
// cipher suites prohibited by the HTTP/2 spec.
PermitProhibitedCipherSuites bool
+
+ // IdleTimeout specifies how long until idle clients should be
+ // closed with a GOAWAY frame. PING frames are not considered
+ // activity for the purposes of IdleTimeout.
+ IdleTimeout time.Duration
+
+ // NewWriteScheduler constructs a write scheduler for a connection.
+ // If nil, a default scheduler is chosen.
+ NewWriteScheduler func() WriteScheduler
}
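
The two fields added above are opt-in knobs on http2.Server. Below is a minimal sketch of how a caller might wire them up; it assumes the package-level NewPriorityWriteScheduler constructor that ships alongside this write-scheduler rework (the constructor itself is not part of this diff).

    package main

    import (
    	"log"
    	"net/http"
    	"time"

    	"golang.org/x/net/http2"
    )

    func main() {
    	srv := &http.Server{Addr: ":8443", Handler: http.DefaultServeMux}
    	h2 := &http2.Server{
    		// Send a GOAWAY to clients that keep the connection open with no
    		// active streams for 4 minutes (PINGs do not count as activity).
    		IdleTimeout: 4 * time.Minute,
    		// Use the RFC 7540 priority scheduler; assumed constructor, adjust
    		// to whatever WriteScheduler implementation your revision provides.
    		NewWriteScheduler: func() http2.WriteScheduler {
    			return http2.NewPriorityWriteScheduler(nil)
    		},
    	}
    	if err := http2.ConfigureServer(srv, h2); err != nil {
    		log.Fatal(err)
    	}
    	log.Fatal(srv.ListenAndServeTLS("cert.pem", "key.pem"))
    }

Leaving NewWriteScheduler nil falls back to NewRandomWriteScheduler, as the ServeConn hunk below shows.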
func (s *Server) maxReadFrameSize() uint32 {
@@ -130,15 +129,25 @@ func (s *Server) maxConcurrentStreams() uint32 {
return defaultMaxStreams
}
+// List of funcs for ConfigureServer to run. Both h1 and h2 are guaranteed
+// to be non-nil.
+var configServerFuncs []func(h1 *http.Server, h2 *Server) error
+
// ConfigureServer adds HTTP/2 support to a net/http Server.
//
// The configuration conf may be nil.
//
// ConfigureServer must be called before s begins serving.
func ConfigureServer(s *http.Server, conf *Server) error {
+ if s == nil {
+ panic("nil *http.Server")
+ }
if conf == nil {
conf = new(Server)
}
+ if err := configureServer18(s, conf); err != nil {
+ return err
+ }
if s.TLSConfig == nil {
s.TLSConfig = new(tls.Config)
@@ -204,6 +213,13 @@ func ConfigureServer(s *http.Server, conf *Server) error {
return nil
}
+// h1ServerShutdownChan if non-nil provides a func to return a channel
+// that will be closed when the provided *http.Server wants to shut
+// down. This is initialized via an init func in net/http (via its
+// mangled name from x/tools/cmd/bundle). This is only used when http2
+// is bundled into std for now.
+var h1ServerShutdownChan func(*http.Server) <-chan struct{}
+
// ServeConnOpts are options for the Server.ServeConn method.
type ServeConnOpts struct {
// BaseConfig optionally sets the base configuration
@@ -254,29 +270,35 @@ func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) {
defer cancel()
sc := &serverConn{
- srv: s,
- hs: opts.baseConfig(),
- conn: c,
- baseCtx: baseCtx,
- remoteAddrStr: c.RemoteAddr().String(),
- bw: newBufferedWriter(c),
- handler: opts.handler(),
- streams: make(map[uint32]*stream),
- readFrameCh: make(chan readFrameResult),
- wantWriteFrameCh: make(chan frameWriteMsg, 8),
- wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
- bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
- doneServing: make(chan struct{}),
- advMaxStreams: s.maxConcurrentStreams(),
- writeSched: writeScheduler{
- maxFrameSize: initialMaxFrameSize,
- },
+ srv: s,
+ hs: opts.baseConfig(),
+ conn: c,
+ baseCtx: baseCtx,
+ remoteAddrStr: c.RemoteAddr().String(),
+ bw: newBufferedWriter(c),
+ handler: opts.handler(),
+ streams: make(map[uint32]*stream),
+ readFrameCh: make(chan readFrameResult),
+ wantWriteFrameCh: make(chan FrameWriteRequest, 8),
+ wantStartPushCh: make(chan startPushRequest, 8),
+ wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
+ bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
+ doneServing: make(chan struct{}),
+ clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
+ advMaxStreams: s.maxConcurrentStreams(),
initialWindowSize: initialWindowSize,
+ maxFrameSize: initialMaxFrameSize,
headerTableSize: initialHeaderTableSize,
serveG: newGoroutineLock(),
pushEnabled: true,
}
+ if s.NewWriteScheduler != nil {
+ sc.writeSched = s.NewWriteScheduler()
+ } else {
+ sc.writeSched = NewRandomWriteScheduler()
+ }
+
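
After this change, serverConn drives frame output purely through the WriteScheduler chosen here. The interface itself is defined in writesched.go, which this diff does not touch; reconstructed from the calls made in this file (Push, Pop, OpenStream, CloseStream, AdjustStream), its shape is presumably:

    // Sketch of the contract server.go relies on; the authoritative
    // definition lives in writesched.go and may differ in wording.
    type WriteScheduler interface {
    	// OpenStream registers a new stream; OpenStreamOptions carries the
    	// pusher's stream ID for server-pushed streams.
    	OpenStream(streamID uint32, options OpenStreamOptions)
    	// CloseStream drops any state and queued frames for the stream.
    	CloseStream(streamID uint32)
    	// AdjustStream applies a PRIORITY update (weight, dependency, exclusive).
    	AdjustStream(streamID uint32, priority PriorityParam)
    	// Push queues a frame; Pop hands the serve loop the next frame to write.
    	Push(wr FrameWriteRequest)
    	Pop() (wr FrameWriteRequest, ok bool)
    }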
sc.flow.add(initialWindowSize)
sc.inflow.add(initialWindowSize)
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
@@ -356,16 +378,18 @@ type serverConn struct {
handler http.Handler
baseCtx contextContext
framer *Framer
- doneServing chan struct{} // closed when serverConn.serve ends
- readFrameCh chan readFrameResult // written by serverConn.readFrames
- wantWriteFrameCh chan frameWriteMsg // from handlers -> serve
- wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
- bodyReadCh chan bodyReadMsg // from handlers -> serve
- testHookCh chan func(int) // code to run on the serve loop
- flow flow // conn-wide (not stream-specific) outbound flow control
- inflow flow // conn-wide inbound flow control
- tlsState *tls.ConnectionState // shared by all handlers, like net/http
+ doneServing chan struct{} // closed when serverConn.serve ends
+ readFrameCh chan readFrameResult // written by serverConn.readFrames
+ wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve
+ wantStartPushCh chan startPushRequest // from handlers -> serve
+ wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
+ bodyReadCh chan bodyReadMsg // from handlers -> serve
+ testHookCh chan func(int) // code to run on the serve loop
+ flow flow // conn-wide (not stream-specific) outbound flow control
+ inflow flow // conn-wide inbound flow control
+ tlsState *tls.ConnectionState // shared by all handlers, like net/http
remoteAddrStr string
+ writeSched WriteScheduler
// Everything following is owned by the serve loop; use serveG.check():
serveG goroutineLock // used to verify funcs are on serve()
@@ -375,22 +399,27 @@ type serverConn struct {
unackedSettings int // how many SETTINGS have we sent without ACKs?
clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised to the client
- curOpenStreams uint32 // client's number of open streams
- maxStreamID uint32 // max ever seen
+ curClientStreams uint32 // number of open streams initiated by the client
+ curPushedStreams uint32 // number of open streams initiated by server push
+ maxStreamID uint32 // max ever seen from client
+ maxPushPromiseID uint32 // ID of the last push promise, or 0 if there have been no pushes
streams map[uint32]*stream
initialWindowSize int32
+ maxFrameSize int32
headerTableSize uint32
peerMaxHeaderListSize uint32 // zero means unknown (default)
canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
- writingFrame bool // started write goroutine but haven't heard back on wroteFrameCh
+ writingFrame bool // started writing a frame (on serve goroutine or separate)
+ writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
needsFrameFlush bool // last frame write wasn't a flush
- writeSched writeScheduler
- inGoAway bool // we've started to or sent GOAWAY
- needToSendGoAway bool // we need to schedule a GOAWAY frame write
+ inGoAway bool // we've started to or sent GOAWAY
+ inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
+ needToSendGoAway bool // we need to schedule a GOAWAY frame write
goAwayCode ErrCode
shutdownTimerCh <-chan time.Time // nil until used
shutdownTimer *time.Timer // nil until used
- freeRequestBodyBuf []byte // if non-nil, a free initialWindowSize buffer for getRequestBodyBuf
+ idleTimer *time.Timer // nil if unused
+ idleTimerCh <-chan time.Time // nil if unused
// Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer
@@ -434,11 +463,11 @@ type stream struct {
numTrailerValues int64
weight uint8
state streamState
- sentReset bool // only true once detached from streams map
- gotReset bool // only true once detached from streams map
- gotTrailerHeader bool // HEADER frame for trailers was seen
- wroteHeaders bool // whether we wrote headers (not status 100)
- reqBuf []byte
+ sentReset bool // only true once detached from streams map
+ gotReset bool // only true once detached from streams map
+ gotTrailerHeader bool // HEADER frame for trailers was seen
+ wroteHeaders bool // whether we wrote headers (not status 100)
+ reqBuf []byte // if non-nil, body pipe buffer to return later at EOF
trailer http.Header // accumulated trailers
reqTrailer http.Header // handler's Request.Trailer
@@ -453,7 +482,7 @@ func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) {
func (sc *serverConn) state(streamID uint32) (streamState, *stream) {
sc.serveG.check()
- // http://http2.github.io/http2-spec/#rfc.section.5.1
+ // http://tools.ietf.org/html/rfc7540#section-5.1
if st, ok := sc.streams[streamID]; ok {
return st.state, st
}
@@ -603,17 +632,17 @@ func (sc *serverConn) readFrames() {
// frameWriteResult is the message passed from writeFrameAsync to the serve goroutine.
type frameWriteResult struct {
- wm frameWriteMsg // what was written (or attempted)
- err error // result of the writeFrame call
+ wr FrameWriteRequest // what was written (or attempted)
+ err error // result of the writeFrame call
}
// writeFrameAsync runs in its own goroutine and writes a single frame
// and then reports when it's done.
// At most one goroutine can be running writeFrameAsync at a time per
// serverConn.
-func (sc *serverConn) writeFrameAsync(wm frameWriteMsg) {
- err := wm.write.writeFrame(sc)
- sc.wroteFrameCh <- frameWriteResult{wm, err}
+func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest) {
+ err := wr.write.writeFrame(sc)
+ sc.wroteFrameCh <- frameWriteResult{wr, err}
}
func (sc *serverConn) closeAllStreamsOnConnClose() {
@@ -657,7 +686,7 @@ func (sc *serverConn) serve() {
sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
}
- sc.writeFrame(frameWriteMsg{
+ sc.writeFrame(FrameWriteRequest{
write: writeSettings{
{SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
{SettingMaxConcurrentStreams, sc.advMaxStreams},
@@ -682,6 +711,17 @@ func (sc *serverConn) serve() {
sc.setConnState(http.StateActive)
sc.setConnState(http.StateIdle)
+ if sc.srv.IdleTimeout != 0 {
+ sc.idleTimer = time.NewTimer(sc.srv.IdleTimeout)
+ defer sc.idleTimer.Stop()
+ sc.idleTimerCh = sc.idleTimer.C
+ }
+
+ var gracefulShutdownCh <-chan struct{}
+ if sc.hs != nil && h1ServerShutdownChan != nil {
+ gracefulShutdownCh = h1ServerShutdownChan(sc.hs)
+ }
+
go sc.readFrames() // closed by defer sc.conn.Close above
settingsTimer := time.NewTimer(firstSettingsTimeout)
@@ -689,8 +729,10 @@ func (sc *serverConn) serve() {
for {
loopNum++
select {
- case wm := <-sc.wantWriteFrameCh:
- sc.writeFrame(wm)
+ case wr := <-sc.wantWriteFrameCh:
+ sc.writeFrame(wr)
+ case spr := <-sc.wantStartPushCh:
+ sc.startPush(spr)
case res := <-sc.wroteFrameCh:
sc.wroteFrame(res)
case res := <-sc.readFrameCh:
@@ -707,12 +749,22 @@ func (sc *serverConn) serve() {
case <-settingsTimer.C:
sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
return
+ case <-gracefulShutdownCh:
+ gracefulShutdownCh = nil
+ sc.goAwayIn(ErrCodeNo, 0)
case <-sc.shutdownTimerCh:
sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
return
+ case <-sc.idleTimerCh:
+ sc.vlogf("connection is idle")
+ sc.goAway(ErrCodeNo)
case fn := <-sc.testHookCh:
fn(loopNum)
}
+
+ if sc.inGoAway && sc.curClientStreams == 0 && !sc.needToSendGoAway && !sc.writingFrame {
+ return
+ }
}
}
@@ -760,7 +812,7 @@ func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStrea
ch := errChanPool.Get().(chan error)
writeArg := writeDataPool.Get().(*writeData)
*writeArg = writeData{stream.id, data, endStream}
- err := sc.writeFrameFromHandler(frameWriteMsg{
+ err := sc.writeFrameFromHandler(FrameWriteRequest{
write: writeArg,
stream: stream,
done: ch,
@@ -796,17 +848,17 @@ func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStrea
return err
}
-// writeFrameFromHandler sends wm to sc.wantWriteFrameCh, but aborts
+// writeFrameFromHandler sends wr to sc.wantWriteFrameCh, but aborts
// if the connection has gone away.
//
// This must not be run from the serve goroutine itself, else it might
// deadlock writing to sc.wantWriteFrameCh (which is only mildly
// buffered and is read by serve itself). If you're on the serve
// goroutine, call writeFrame instead.
-func (sc *serverConn) writeFrameFromHandler(wm frameWriteMsg) error {
+func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error {
sc.serveG.checkNotOn() // NOT
select {
- case sc.wantWriteFrameCh <- wm:
+ case sc.wantWriteFrameCh <- wr:
return nil
case <-sc.doneServing:
// Serve loop is gone.
@@ -823,38 +875,38 @@ func (sc *serverConn) writeFrameFromHandler(wm frameWriteMsg) error {
// make it onto the wire
//
// If you're not on the serve goroutine, use writeFrameFromHandler instead.
-func (sc *serverConn) writeFrame(wm frameWriteMsg) {
+func (sc *serverConn) writeFrame(wr FrameWriteRequest) {
sc.serveG.check()
var ignoreWrite bool
// Don't send a 100-continue response if we've already sent headers.
// See golang.org/issue/14030.
- switch wm.write.(type) {
+ switch wr.write.(type) {
case *writeResHeaders:
- wm.stream.wroteHeaders = true
+ wr.stream.wroteHeaders = true
case write100ContinueHeadersFrame:
- if wm.stream.wroteHeaders {
+ if wr.stream.wroteHeaders {
ignoreWrite = true
}
}
if !ignoreWrite {
- sc.writeSched.add(wm)
+ sc.writeSched.Push(wr)
}
sc.scheduleFrameWrite()
}
-// startFrameWrite starts a goroutine to write wm (in a separate
+// startFrameWrite starts a goroutine to write wr (in a separate
// goroutine since that might block on the network), and updates the
-// serve goroutine's state about the world, updated from info in wm.
-func (sc *serverConn) startFrameWrite(wm frameWriteMsg) {
+// serve goroutine's state about the world, updated from info in wr.
+func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) {
sc.serveG.check()
if sc.writingFrame {
panic("internal error: can only be writing one frame at a time")
}
- st := wm.stream
+ st := wr.stream
if st != nil {
switch st.state {
case stateHalfClosedLocal:
@@ -865,13 +917,31 @@ func (sc *serverConn) startFrameWrite(wm frameWriteMsg) {
sc.scheduleFrameWrite()
return
}
- panic(fmt.Sprintf("internal error: attempt to send a write %v on a closed stream", wm))
+ panic(fmt.Sprintf("internal error: attempt to send a write %v on a closed stream", wr))
+ }
+ }
+ if wpp, ok := wr.write.(*writePushPromise); ok {
+ var err error
+ wpp.promisedID, err = wpp.allocatePromisedID()
+ if err != nil {
+ sc.writingFrameAsync = false
+ if wr.done != nil {
+ wr.done <- err
+ }
+ return
}
}
sc.writingFrame = true
sc.needsFrameFlush = true
- go sc.writeFrameAsync(wm)
+ if wr.write.staysWithinBuffer(sc.bw.Available()) {
+ sc.writingFrameAsync = false
+ err := wr.write.writeFrame(sc)
+ sc.wroteFrame(frameWriteResult{wr, err})
+ } else {
+ sc.writingFrameAsync = true
+ go sc.writeFrameAsync(wr)
+ }
}
// errHandlerPanicked is the error given to any callers blocked in a read from
@@ -887,25 +957,26 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) {
panic("internal error: expected to be already writing a frame")
}
sc.writingFrame = false
+ sc.writingFrameAsync = false
- wm := res.wm
- st := wm.stream
+ wr := res.wr
+ st := wr.stream
- closeStream := endsStream(wm.write)
+ closeStream := endsStream(wr.write)
- if _, ok := wm.write.(handlerPanicRST); ok {
+ if _, ok := wr.write.(handlerPanicRST); ok {
sc.closeStream(st, errHandlerPanicked)
}
// Reply (if requested) to the blocked ServeHTTP goroutine.
- if ch := wm.done; ch != nil {
+ if ch := wr.done; ch != nil {
select {
case ch <- res.err:
default:
- panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wm.write))
+ panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))
}
}
- wm.write = nil // prevent use (assume it's tainted after wm.done send)
+ wr.write = nil // prevent use (assume it's tainted after wr.done send)
if closeStream {
if st == nil {
@@ -916,11 +987,11 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) {
// Here we would go to stateHalfClosedLocal in
// theory, but since our handler is done and
// the net/http package provides no mechanism
- // for finishing writing to a ResponseWriter
- // while still reading data (see possible TODO
- // at top of this file), we go into closed
- // state here anyway, after telling the peer
- // we're hanging up on them.
+ // for closing a ResponseWriter while still
+ // reading data (see possible TODO at top of
+ // this file), we go into closed state here
+ // anyway, after telling the peer we're
+ // hanging up on them.
st.state = stateHalfClosedLocal // won't last long, but necessary for closeStream via resetStream
errCancel := streamError(st.id, ErrCodeCancel)
sc.resetStream(errCancel)
@@ -946,47 +1017,61 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) {
// flush the write buffer.
func (sc *serverConn) scheduleFrameWrite() {
sc.serveG.check()
- if sc.writingFrame {
- return
- }
- if sc.needToSendGoAway {
- sc.needToSendGoAway = false
- sc.startFrameWrite(frameWriteMsg{
- write: &writeGoAway{
- maxStreamID: sc.maxStreamID,
- code: sc.goAwayCode,
- },
- })
- return
- }
- if sc.needToSendSettingsAck {
- sc.needToSendSettingsAck = false
- sc.startFrameWrite(frameWriteMsg{write: writeSettingsAck{}})
+ if sc.writingFrame || sc.inFrameScheduleLoop {
return
}
- if !sc.inGoAway {
- if wm, ok := sc.writeSched.take(); ok {
- sc.startFrameWrite(wm)
- return
+ sc.inFrameScheduleLoop = true
+ for !sc.writingFrameAsync {
+ if sc.needToSendGoAway {
+ sc.needToSendGoAway = false
+ sc.startFrameWrite(FrameWriteRequest{
+ write: &writeGoAway{
+ maxStreamID: sc.maxStreamID,
+ code: sc.goAwayCode,
+ },
+ })
+ continue
}
+ if sc.needToSendSettingsAck {
+ sc.needToSendSettingsAck = false
+ sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}})
+ continue
+ }
+ if !sc.inGoAway || sc.goAwayCode == ErrCodeNo {
+ if wr, ok := sc.writeSched.Pop(); ok {
+ sc.startFrameWrite(wr)
+ continue
+ }
+ }
+ if sc.needsFrameFlush {
+ sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}})
+ sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
+ continue
+ }
+ break
}
- if sc.needsFrameFlush {
- sc.startFrameWrite(frameWriteMsg{write: flushFrameWriter{}})
- sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
- return
- }
+ sc.inFrameScheduleLoop = false
}
func (sc *serverConn) goAway(code ErrCode) {
sc.serveG.check()
- if sc.inGoAway {
- return
- }
+ var forceCloseIn time.Duration
if code != ErrCodeNo {
- sc.shutDownIn(250 * time.Millisecond)
+ forceCloseIn = 250 * time.Millisecond
} else {
// TODO: configurable
- sc.shutDownIn(1 * time.Second)
+ forceCloseIn = 1 * time.Second
+ }
+ sc.goAwayIn(code, forceCloseIn)
+}
+
+func (sc *serverConn) goAwayIn(code ErrCode, forceCloseIn time.Duration) {
+ sc.serveG.check()
+ if sc.inGoAway {
+ return
+ }
+ if forceCloseIn != 0 {
+ sc.shutDownIn(forceCloseIn)
}
sc.inGoAway = true
sc.needToSendGoAway = true
@@ -1002,7 +1087,7 @@ func (sc *serverConn) shutDownIn(d time.Duration) {
func (sc *serverConn) resetStream(se StreamError) {
sc.serveG.check()
- sc.writeFrame(frameWriteMsg{write: se})
+ sc.writeFrame(FrameWriteRequest{write: se})
if st, ok := sc.streams[se.StreamID]; ok {
st.sentReset = true
sc.closeStream(st, se)
@@ -1115,15 +1200,28 @@ func (sc *serverConn) processPing(f *PingFrame) error {
// PROTOCOL_ERROR."
return ConnectionError(ErrCodeProtocol)
}
- sc.writeFrame(frameWriteMsg{write: writePingAck{f}})
+ if sc.inGoAway {
+ return nil
+ }
+ sc.writeFrame(FrameWriteRequest{write: writePingAck{f}})
return nil
}
func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
sc.serveG.check()
+ if sc.inGoAway {
+ return nil
+ }
switch {
case f.StreamID != 0: // stream-level flow control
- st := sc.streams[f.StreamID]
+ state, st := sc.state(f.StreamID)
+ if state == stateIdle {
+ // Section 5.1: "Receiving any frame other than HEADERS
+ // or PRIORITY on a stream in this state MUST be
+ // treated as a connection error (Section 5.4.1) of
+ // type PROTOCOL_ERROR."
+ return ConnectionError(ErrCodeProtocol)
+ }
if st == nil {
// "WINDOW_UPDATE can be sent by a peer that has sent a
// frame bearing the END_STREAM flag. This means that a
@@ -1146,6 +1244,9 @@ func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
sc.serveG.check()
+ if sc.inGoAway {
+ return nil
+ }
state, st := sc.state(f.StreamID)
if state == stateIdle {
@@ -1170,11 +1271,18 @@ func (sc *serverConn) closeStream(st *stream, err error) {
panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
}
st.state = stateClosed
- sc.curOpenStreams--
- if sc.curOpenStreams == 0 {
+ if st.isPushed() {
+ sc.curPushedStreams--
+ } else {
+ sc.curClientStreams--
+ }
+ if sc.curClientStreams+sc.curPushedStreams == 0 {
sc.setConnState(http.StateIdle)
}
delete(sc.streams, st.id)
+ if len(sc.streams) == 0 && sc.srv.IdleTimeout != 0 {
+ sc.idleTimer.Reset(sc.srv.IdleTimeout)
+ }
if p := st.body; p != nil {
// Return any buffered unread bytes worth of conn-level flow control.
// See golang.org/issue/16481
@@ -1183,19 +1291,7 @@ func (sc *serverConn) closeStream(st *stream, err error) {
p.CloseWithError(err)
}
st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
- sc.writeSched.forgetStream(st.id)
- if st.reqBuf != nil {
- // Stash this request body buffer (64k) away for reuse
- // by a future POST/PUT/etc.
- //
- // TODO(bradfitz): share on the server? sync.Pool?
- // Server requires locks and might hurt contention.
- // sync.Pool might work, or might be worse, depending
- // on goroutine CPU migrations. (get and put on
- // separate CPUs). Maybe a mix of strategies. But
- // this is an easy win for now.
- sc.freeRequestBodyBuf = st.reqBuf
- }
+ sc.writeSched.CloseStream(st.id)
}
func (sc *serverConn) processSettings(f *SettingsFrame) error {
@@ -1210,6 +1306,9 @@ func (sc *serverConn) processSettings(f *SettingsFrame) error {
}
return nil
}
+ if sc.inGoAway {
+ return nil
+ }
if err := f.ForeachSetting(sc.processSetting); err != nil {
return err
}
@@ -1237,7 +1336,7 @@ func (sc *serverConn) processSetting(s Setting) error {
case SettingInitialWindowSize:
return sc.processSettingInitialWindowSize(s.Val)
case SettingMaxFrameSize:
- sc.writeSched.maxFrameSize = s.Val
+ sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31
case SettingMaxHeaderListSize:
sc.peerMaxHeaderListSize = s.Val
default:
@@ -1281,14 +1380,24 @@ func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
func (sc *serverConn) processData(f *DataFrame) error {
sc.serveG.check()
+ if sc.inGoAway {
+ return nil
+ }
data := f.Data()
// "If a DATA frame is received whose stream is not in "open"
// or "half closed (local)" state, the recipient MUST respond
// with a stream error (Section 5.4.2) of type STREAM_CLOSED."
id := f.Header().StreamID
- st, ok := sc.streams[id]
- if !ok || st.state != stateOpen || st.gotTrailerHeader {
+ state, st := sc.state(id)
+ if id == 0 || state == stateIdle {
+ // Section 5.1: "Receiving any frame other than HEADERS
+ // or PRIORITY on a stream in this state MUST be
+ // treated as a connection error (Section 5.4.1) of
+ // type PROTOCOL_ERROR."
+ return ConnectionError(ErrCodeProtocol)
+ }
+ if st == nil || state != stateOpen || st.gotTrailerHeader {
// This includes sending a RST_STREAM if the stream is
// in stateHalfClosedLocal (which currently means that
// the http.Handler returned, so it's done reading &
@@ -1350,6 +1459,11 @@ func (sc *serverConn) processData(f *DataFrame) error {
return nil
}
+// isPushed reports whether the stream is server-initiated.
+func (st *stream) isPushed() bool {
+ return st.id%2 == 0
+}
+
// endStream closes a Request.Body's pipe. It is called when a DATA
// frame says a request body is over (or after trailers).
func (st *stream) endStream() {
@@ -1379,12 +1493,12 @@ func (st *stream) copyTrailersToHandlerRequest() {
func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
sc.serveG.check()
- id := f.Header().StreamID
+ id := f.StreamID
if sc.inGoAway {
// Ignore.
return nil
}
- // http://http2.github.io/http2-spec/#rfc.section.5.1.1
+ // http://tools.ietf.org/html/rfc7540#section-5.1.1
// Streams initiated by a client MUST use odd-numbered stream
// identifiers. [...] An endpoint that receives an unexpected
// stream identifier MUST respond with a connection error
@@ -1396,8 +1510,7 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
// send a trailer for an open one. If we already have a stream
// open, let it process its own HEADERS frame (trailers at this
// point, if it's valid).
- st := sc.streams[f.Header().StreamID]
- if st != nil {
+ if st := sc.streams[f.StreamID]; st != nil {
return st.processTrailerHeaders(f)
}
@@ -1411,49 +1524,40 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
}
sc.maxStreamID = id
- ctx, cancelCtx := contextWithCancel(sc.baseCtx)
- st = &stream{
- sc: sc,
- id: id,
- state: stateOpen,
- ctx: ctx,
- cancelCtx: cancelCtx,
+ if sc.idleTimer != nil {
+ sc.idleTimer.Stop()
}
- if f.StreamEnded() {
- st.state = stateHalfClosedRemote
- }
- st.cw.Init()
- st.flow.conn = &sc.flow // link to conn-level counter
- st.flow.add(sc.initialWindowSize)
- st.inflow.conn = &sc.inflow // link to conn-level counter
- st.inflow.add(initialWindowSize) // TODO: update this when we send a higher initial window size in the initial settings
-
- sc.streams[id] = st
- if f.HasPriority() {
- adjustStreamPriority(sc.streams, st.id, f.Priority)
- }
- sc.curOpenStreams++
- if sc.curOpenStreams == 1 {
- sc.setConnState(http.StateActive)
- }
- if sc.curOpenStreams > sc.advMaxStreams {
- // "Endpoints MUST NOT exceed the limit set by their
- // peer. An endpoint that receives a HEADERS frame
- // that causes their advertised concurrent stream
- // limit to be exceeded MUST treat this as a stream
- // error (Section 5.4.2) of type PROTOCOL_ERROR or
- // REFUSED_STREAM."
+ // http://tools.ietf.org/html/rfc7540#section-5.1.2
+ // [...] Endpoints MUST NOT exceed the limit set by their peer. An
+ // endpoint that receives a HEADERS frame that causes their
+ // advertised concurrent stream limit to be exceeded MUST treat
+ // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR
+ // or REFUSED_STREAM.
+ if sc.curClientStreams+1 > sc.advMaxStreams {
if sc.unackedSettings == 0 {
// They should know better.
- return streamError(st.id, ErrCodeProtocol)
+ return streamError(id, ErrCodeProtocol)
}
// Assume it's a network race, where they just haven't
// received our last SETTINGS update. But actually
// this can't happen yet, because we don't yet provide
// a way for users to adjust server parameters at
// runtime.
- return streamError(st.id, ErrCodeRefusedStream)
+ return streamError(id, ErrCodeRefusedStream)
+ }
+
+ initialState := stateOpen
+ if f.StreamEnded() {
+ initialState = stateHalfClosedRemote
+ }
+ st := sc.newStream(id, 0, initialState)
+
+ if f.HasPriority() {
+ if err := checkPriority(f.StreamID, f.Priority); err != nil {
+ return err
+ }
+ sc.writeSched.AdjustStream(st.id, f.Priority)
}
rw, req, err := sc.newWriterAndRequest(st, f)
@@ -1471,19 +1575,17 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
if f.Truncated {
// Their header list was too long. Send a 431 error.
handler = handleHeaderListTooLong
- } else if err := checkValidHTTP2Request(req); err != nil {
+ } else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil {
handler = new400Handler(err)
}
// The net/http package sets the read deadline from the
// http.Server.ReadTimeout during the TLS handshake, but then
// passes the connection off to us with the deadline already
- // set. Disarm it here after the request headers are read, similar
- // to how the http1 server works.
- // Unlike http1, though, we never re-arm it yet, though.
- // TODO(bradfitz): figure out golang.org/issue/14204
- // (IdleTimeout) and how this relates. Maybe the default
- // IdleTimeout is ReadTimeout.
+ // set. Disarm it here after the request headers are read,
+ // similar to how the http1 server works. Here it's
+ // technically more like the http1 Server's ReadHeaderTimeout
+ // (in Go 1.8), though. That's a more sane option anyway.
if sc.hs.ReadTimeout != 0 {
sc.conn.SetReadDeadline(time.Time{})
}
@@ -1522,62 +1624,78 @@ func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error {
return nil
}
+func checkPriority(streamID uint32, p PriorityParam) error {
+ if streamID == p.StreamDep {
+ // Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat
+ // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR."
+ // Section 5.3.3 says that a stream can depend on one of its dependencies,
+ // so it's only self-dependencies that are forbidden.
+ return streamError(streamID, ErrCodeProtocol)
+ }
+ return nil
+}
+
func (sc *serverConn) processPriority(f *PriorityFrame) error {
- adjustStreamPriority(sc.streams, f.StreamID, f.PriorityParam)
+ if sc.inGoAway {
+ return nil
+ }
+ if err := checkPriority(f.StreamID, f.PriorityParam); err != nil {
+ return err
+ }
+ sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam)
return nil
}
-func adjustStreamPriority(streams map[uint32]*stream, streamID uint32, priority PriorityParam) {
- st, ok := streams[streamID]
- if !ok {
- // TODO: not quite correct (this streamID might
- // already exist in the dep tree, but be closed), but
- // close enough for now.
- return
+func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream {
+ sc.serveG.check()
+ if id == 0 {
+ panic("internal error: cannot create stream with id 0")
}
- st.weight = priority.Weight
- parent := streams[priority.StreamDep] // might be nil
- if parent == st {
- // if client tries to set this stream to be the parent of itself
- // ignore and keep going
- return
+
+ ctx, cancelCtx := contextWithCancel(sc.baseCtx)
+ st := &stream{
+ sc: sc,
+ id: id,
+ state: state,
+ ctx: ctx,
+ cancelCtx: cancelCtx,
}
+ st.cw.Init()
+ st.flow.conn = &sc.flow // link to conn-level counter
+ st.flow.add(sc.initialWindowSize)
+ st.inflow.conn = &sc.inflow // link to conn-level counter
+ st.inflow.add(initialWindowSize) // TODO: update this when we send a higher initial window size in the initial settings
- // section 5.3.3: If a stream is made dependent on one of its
- // own dependencies, the formerly dependent stream is first
- // moved to be dependent on the reprioritized stream's previous
- // parent. The moved dependency retains its weight.
- for piter := parent; piter != nil; piter = piter.parent {
- if piter == st {
- parent.parent = st.parent
- break
- }
+ sc.streams[id] = st
+ sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID})
+ if st.isPushed() {
+ sc.curPushedStreams++
+ } else {
+ sc.curClientStreams++
}
- st.parent = parent
- if priority.Exclusive && (st.parent != nil || priority.StreamDep == 0) {
- for _, openStream := range streams {
- if openStream != st && openStream.parent == st.parent {
- openStream.parent = st
- }
- }
+ if sc.curClientStreams+sc.curPushedStreams == 1 {
+ sc.setConnState(http.StateActive)
}
+
+ return st
}
func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *http.Request, error) {
sc.serveG.check()
- method := f.PseudoValue("method")
- path := f.PseudoValue("path")
- scheme := f.PseudoValue("scheme")
- authority := f.PseudoValue("authority")
+ rp := requestParam{
+ method: f.PseudoValue("method"),
+ scheme: f.PseudoValue("scheme"),
+ authority: f.PseudoValue("authority"),
+ path: f.PseudoValue("path"),
+ }
- isConnect := method == "CONNECT"
+ isConnect := rp.method == "CONNECT"
if isConnect {
- if path != "" || scheme != "" || authority == "" {
+ if rp.path != "" || rp.scheme != "" || rp.authority == "" {
return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
}
- } else if method == "" || path == "" ||
- (scheme != "https" && scheme != "http") {
+ } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") {
// See 8.1.2.6 Malformed Requests and Responses:
//
// Malformed requests or responses that are detected
@@ -1592,36 +1710,64 @@ func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*res
}
bodyOpen := !f.StreamEnded()
- if method == "HEAD" && bodyOpen {
+ if rp.method == "HEAD" && bodyOpen {
// HEAD requests can't have bodies
return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
}
- var tlsState *tls.ConnectionState // nil if not scheme https
- if scheme == "https" {
- tlsState = sc.tlsState
+ rp.header = make(http.Header)
+ for _, hf := range f.RegularFields() {
+ rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value)
+ }
+ if rp.authority == "" {
+ rp.authority = rp.header.Get("Host")
}
- header := make(http.Header)
- for _, hf := range f.RegularFields() {
- header.Add(sc.canonicalHeader(hf.Name), hf.Value)
+ rw, req, err := sc.newWriterAndRequestNoBody(st, rp)
+ if err != nil {
+ return nil, nil, err
+ }
+ if bodyOpen {
+ st.reqBuf = getRequestBodyBuf()
+ req.Body.(*requestBody).pipe = &pipe{
+ b: &fixedBuffer{buf: st.reqBuf},
+ }
+
+ if vv, ok := rp.header["Content-Length"]; ok {
+ req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
+ } else {
+ req.ContentLength = -1
+ }
}
+ return rw, req, nil
+}
+
+type requestParam struct {
+ method string
+ scheme, authority, path string
+ header http.Header
+}
- if authority == "" {
- authority = header.Get("Host")
+func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*responseWriter, *http.Request, error) {
+ sc.serveG.check()
+
+ var tlsState *tls.ConnectionState // nil if not scheme https
+ if rp.scheme == "https" {
+ tlsState = sc.tlsState
}
- needsContinue := header.Get("Expect") == "100-continue"
+
+ needsContinue := rp.header.Get("Expect") == "100-continue"
if needsContinue {
- header.Del("Expect")
+ rp.header.Del("Expect")
}
// Merge Cookie headers into one "; "-delimited value.
- if cookies := header["Cookie"]; len(cookies) > 1 {
- header.Set("Cookie", strings.Join(cookies, "; "))
+ if cookies := rp.header["Cookie"]; len(cookies) > 1 {
+ rp.header.Set("Cookie", strings.Join(cookies, "; "))
}
// Setup Trailers
var trailer http.Header
- for _, v := range header["Trailer"] {
+ for _, v := range rp.header["Trailer"] {
for _, key := range strings.Split(v, ",") {
key = http.CanonicalHeaderKey(strings.TrimSpace(key))
switch key {
@@ -1636,57 +1782,42 @@ func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*res
}
}
}
- delete(header, "Trailer")
+ delete(rp.header, "Trailer")
- body := &requestBody{
- conn: sc,
- stream: st,
- needsContinue: needsContinue,
- }
var url_ *url.URL
var requestURI string
- if isConnect {
- url_ = &url.URL{Host: authority}
- requestURI = authority // mimic HTTP/1 server behavior
+ if rp.method == "CONNECT" {
+ url_ = &url.URL{Host: rp.authority}
+ requestURI = rp.authority // mimic HTTP/1 server behavior
} else {
var err error
- url_, err = url.ParseRequestURI(path)
+ url_, err = url.ParseRequestURI(rp.path)
if err != nil {
- return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
+ return nil, nil, streamError(st.id, ErrCodeProtocol)
}
- requestURI = path
+ requestURI = rp.path
+ }
+
+ body := &requestBody{
+ conn: sc,
+ stream: st,
+ needsContinue: needsContinue,
}
req := &http.Request{
- Method: method,
+ Method: rp.method,
URL: url_,
RemoteAddr: sc.remoteAddrStr,
- Header: header,
+ Header: rp.header,
RequestURI: requestURI,
Proto: "HTTP/2.0",
ProtoMajor: 2,
ProtoMinor: 0,
TLS: tlsState,
- Host: authority,
+ Host: rp.authority,
Body: body,
Trailer: trailer,
}
req = requestWithContext(req, st.ctx)
- if bodyOpen {
- // Disabled, per golang.org/issue/14960:
- // st.reqBuf = sc.getRequestBodyBuf()
- // TODO: remove this 64k of garbage per request (again, but without a data race):
- buf := make([]byte, initialWindowSize)
-
- body.pipe = &pipe{
- b: &fixedBuffer{buf: buf},
- }
-
- if vv, ok := header["Content-Length"]; ok {
- req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
- } else {
- req.ContentLength = -1
- }
- }
rws := responseWriterStatePool.Get().(*responseWriterState)
bwSave := rws.bw
@@ -1702,13 +1833,22 @@ func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*res
return rw, req, nil
}
-func (sc *serverConn) getRequestBodyBuf() []byte {
- sc.serveG.check()
- if buf := sc.freeRequestBodyBuf; buf != nil {
- sc.freeRequestBodyBuf = nil
- return buf
+var reqBodyCache = make(chan []byte, 8)
+
+func getRequestBodyBuf() []byte {
+ select {
+ case b := <-reqBodyCache:
+ return b
+ default:
+ return make([]byte, initialWindowSize)
+ }
+}
+
+func putRequestBodyBuf(b []byte) {
+ select {
+ case reqBodyCache <- b:
+ default:
}
- return make([]byte, initialWindowSize)
}
// Run on its own goroutine.
@@ -1722,7 +1862,7 @@ func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
- sc.writeFrameFromHandler(frameWriteMsg{
+ sc.writeFrameFromHandler(FrameWriteRequest{
write: handlerPanicRST{rw.rws.stream.id},
stream: rw.rws.stream,
})
@@ -1757,7 +1897,7 @@ func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) erro
// mutates it.
errc = errChanPool.Get().(chan error)
}
- if err := sc.writeFrameFromHandler(frameWriteMsg{
+ if err := sc.writeFrameFromHandler(FrameWriteRequest{
write: headerData,
stream: st,
done: errc,
@@ -1780,7 +1920,7 @@ func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) erro
// called from handler goroutines.
func (sc *serverConn) write100ContinueHeaders(st *stream) {
- sc.writeFrameFromHandler(frameWriteMsg{
+ sc.writeFrameFromHandler(FrameWriteRequest{
write: write100ContinueHeadersFrame{st.id},
stream: st,
})
@@ -1796,11 +1936,19 @@ type bodyReadMsg struct {
// called from handler goroutines.
// Notes that the handler for the given stream ID read n bytes of its body
// and schedules flow control tokens to be sent.
-func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int) {
+func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {
sc.serveG.checkNotOn() // NOT on
- select {
- case sc.bodyReadCh <- bodyReadMsg{st, n}:
- case <-sc.doneServing:
+ if n > 0 {
+ select {
+ case sc.bodyReadCh <- bodyReadMsg{st, n}:
+ case <-sc.doneServing:
+ }
+ }
+ if err == io.EOF {
+ if buf := st.reqBuf; buf != nil {
+ st.reqBuf = nil // shouldn't matter; field unused by others
+ putRequestBodyBuf(buf)
+ }
}
}
@@ -1843,7 +1991,7 @@ func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
if st != nil {
streamID = st.id
}
- sc.writeFrame(frameWriteMsg{
+ sc.writeFrame(FrameWriteRequest{
write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
stream: st,
})
@@ -1858,16 +2006,19 @@ func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
}
}
+// requestBody is the Handler's Request.Body type.
+// Read and Close may be called concurrently.
type requestBody struct {
stream *stream
conn *serverConn
- closed bool
+ closed bool // for use by Close only
+ sawEOF bool // for use by Read only
pipe *pipe // non-nil if we have a HTTP entity message body
needsContinue bool // need to send a 100-continue
}
func (b *requestBody) Close() error {
- if b.pipe != nil {
+ if b.pipe != nil && !b.closed {
b.pipe.BreakWithError(errClosedBody)
}
b.closed = true
@@ -1879,13 +2030,17 @@ func (b *requestBody) Read(p []byte) (n int, err error) {
b.needsContinue = false
b.conn.write100ContinueHeaders(b.stream)
}
- if b.pipe == nil {
+ if b.pipe == nil || b.sawEOF {
return 0, io.EOF
}
n, err = b.pipe.Read(p)
- if n > 0 {
- b.conn.noteBodyReadFromHandler(b.stream, n)
+ if err == io.EOF {
+ b.sawEOF = true
}
+ if b.conn == nil && inTests {
+ return
+ }
+ b.conn.noteBodyReadFromHandler(b.stream, n, err)
return
}
@@ -2220,6 +2375,194 @@ func (w *responseWriter) handlerDone() {
responseWriterStatePool.Put(rws)
}
+// Push errors.
+var (
+ ErrRecursivePush = errors.New("http2: recursive push not allowed")
+ ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS")
+)
+
+// pushOptions is the internal version of http.PushOptions, which we
+// cannot include here because it's only defined in Go 1.8 and later.
+type pushOptions struct {
+ Method string
+ Header http.Header
+}
+
+func (w *responseWriter) push(target string, opts pushOptions) error {
+ st := w.rws.stream
+ sc := st.sc
+ sc.serveG.checkNotOn()
+
+ // No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream."
+ // http://tools.ietf.org/html/rfc7540#section-6.6
+ if st.isPushed() {
+ return ErrRecursivePush
+ }
+
+ // Default options.
+ if opts.Method == "" {
+ opts.Method = "GET"
+ }
+ if opts.Header == nil {
+ opts.Header = http.Header{}
+ }
+ wantScheme := "http"
+ if w.rws.req.TLS != nil {
+ wantScheme = "https"
+ }
+
+ // Validate the request.
+ u, err := url.Parse(target)
+ if err != nil {
+ return err
+ }
+ if u.Scheme == "" {
+ if !strings.HasPrefix(target, "/") {
+ return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target)
+ }
+ u.Scheme = wantScheme
+ u.Host = w.rws.req.Host
+ } else {
+ if u.Scheme != wantScheme {
+ return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme)
+ }
+ if u.Host == "" {
+ return errors.New("URL must have a host")
+ }
+ }
+ for k := range opts.Header {
+ if strings.HasPrefix(k, ":") {
+ return fmt.Errorf("promised request headers cannot include psuedo header %q", k)
+ }
+ // These headers are meaningful only if the request has a body,
+ // but PUSH_PROMISE requests cannot have a body.
+ // http://tools.ietf.org/html/rfc7540#section-8.2
+ // Also disallow Host, since the promised URL must be absolute.
+ switch strings.ToLower(k) {
+ case "content-length", "content-encoding", "trailer", "te", "expect", "host":
+ return fmt.Errorf("promised request headers cannot include %q", k)
+ }
+ }
+ if err := checkValidHTTP2RequestHeaders(opts.Header); err != nil {
+ return err
+ }
+
+ // The RFC effectively limits promised requests to GET and HEAD:
+ // "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]"
+ // http://tools.ietf.org/html/rfc7540#section-8.2
+ if opts.Method != "GET" && opts.Method != "HEAD" {
+ return fmt.Errorf("method %q must be GET or HEAD", opts.Method)
+ }
+
+ msg := startPushRequest{
+ parent: st,
+ method: opts.Method,
+ url: u,
+ header: cloneHeader(opts.Header),
+ done: errChanPool.Get().(chan error),
+ }
+
+ select {
+ case <-sc.doneServing:
+ return errClientDisconnected
+ case <-st.cw:
+ return errStreamClosed
+ case sc.wantStartPushCh <- msg:
+ }
+
+ select {
+ case <-sc.doneServing:
+ return errClientDisconnected
+ case <-st.cw:
+ return errStreamClosed
+ case err := <-msg.done:
+ errChanPool.Put(msg.done)
+ return err
+ }
+}
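+
The push method above is presumably what a Go 1.8 handler reaches through net/http's Pusher interface when this package is bundled into the standard library, with http.PushOptions translated into the internal pushOptions. A hedged sketch of the handler side (the asset path and certificate filenames are placeholders):

    package main

    import (
    	"io"
    	"log"
    	"net/http"
    )

    // handler pushes a stylesheet before writing the page. http.Pusher and
    // http.PushOptions (Go 1.8+) are the public counterparts of the internal
    // pushOptions plumbing above.
    func handler(w http.ResponseWriter, r *http.Request) {
    	if p, ok := w.(http.Pusher); ok {
    		if err := p.Push("/static/app.css", &http.PushOptions{Method: "GET"}); err != nil {
    			// Errors such as ErrRecursivePush, ErrPushLimitReached, or
    			// http.ErrNotSupported are non-fatal: the main response can
    			// still be served.
    			log.Printf("push: %v", err)
    		}
    	}
    	io.WriteString(w, "<html><head><link rel=stylesheet href=/static/app.css></head></html>")
    }

    func main() {
    	http.HandleFunc("/", handler)
    	log.Fatal(http.ListenAndServeTLS(":8443", "cert.pem", "key.pem", nil))
    }

Push returns before the promised response is written; the promised request is served by a separate handler invocation started from startPush below.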
+
+type startPushRequest struct {
+ parent *stream
+ method string
+ url *url.URL
+ header http.Header
+ done chan error
+}
+
+func (sc *serverConn) startPush(msg startPushRequest) {
+ sc.serveG.check()
+
+ // http://tools.ietf.org/html/rfc7540#section-6.6.
+ // PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
+ // is in either the "open" or "half-closed (remote)" state.
+ if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote {
+ // responseWriter.Push checks that the stream is peer-initiated.
+ msg.done <- errStreamClosed
+ return
+ }
+
+ // http://tools.ietf.org/html/rfc7540#section-6.6.
+ if !sc.pushEnabled {
+ msg.done <- http.ErrNotSupported
+ return
+ }
+
+ // PUSH_PROMISE frames must be sent in increasing order by stream ID, so
+ // we allocate an ID for the promised stream lazily, when the PUSH_PROMISE
+ // is written. Once the ID is allocated, we start the request handler.
+ allocatePromisedID := func() (uint32, error) {
+ sc.serveG.check()
+
+ // Check this again, just in case. Technically, we might have received
+ // an updated SETTINGS by the time we got around to writing this frame.
+ if !sc.pushEnabled {
+ return 0, http.ErrNotSupported
+ }
+ // http://tools.ietf.org/html/rfc7540#section-6.5.2.
+ if sc.curPushedStreams+1 > sc.clientMaxStreams {
+ return 0, ErrPushLimitReached
+ }
+
+ // http://tools.ietf.org/html/rfc7540#section-5.1.1.
+ // Streams initiated by the server MUST use even-numbered identifiers.
+ sc.maxPushPromiseID += 2
+ promisedID := sc.maxPushPromiseID
+
+ // http://tools.ietf.org/html/rfc7540#section-8.2.
+ // Strictly speaking, the new stream should start in "reserved (local)", then
+ // transition to "half closed (remote)" after sending the initial HEADERS, but
+ // we start in "half closed (remote)" for simplicity.
+ // See further comments at the definition of stateHalfClosedRemote.
+ promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote)
+ rw, req, err := sc.newWriterAndRequestNoBody(promised, requestParam{
+ method: msg.method,
+ scheme: msg.url.Scheme,
+ authority: msg.url.Host,
+ path: msg.url.RequestURI(),
+ header: msg.header,
+ })
+ if err != nil {
+ // Should not happen, since we've already validated msg.url.
+ panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err))
+ }
+
+ go sc.runHandler(rw, req, sc.handler.ServeHTTP)
+ return promisedID, nil
+ }
+
+ sc.writeFrame(FrameWriteRequest{
+ write: &writePushPromise{
+ streamID: msg.parent.id,
+ method: msg.method,
+ url: msg.url,
+ h: msg.header,
+ allocatePromisedID: allocatePromisedID,
+ },
+ stream: msg.parent,
+ done: msg.done,
+ })
+}
+
// foreachHeaderElement splits v according to the "#rule" construction
// in RFC 2616 section 2.1 and calls fn for each non-empty element.
func foreachHeaderElement(v string, fn func(string)) {
@@ -2247,16 +2590,16 @@ var connHeaders = []string{
"Upgrade",
}
-// checkValidHTTP2Request checks whether req is a valid HTTP/2 request,
+// checkValidHTTP2RequestHeaders checks whether h is a valid HTTP/2 request,
// per RFC 7540 Section 8.1.2.2.
// The returned error is reported to users.
-func checkValidHTTP2Request(req *http.Request) error {
- for _, h := range connHeaders {
- if _, ok := req.Header[h]; ok {
- return fmt.Errorf("request header %q is not valid in HTTP/2", h)
+func checkValidHTTP2RequestHeaders(h http.Header) error {
+ for _, k := range connHeaders {
+ if _, ok := h[k]; ok {
+ return fmt.Errorf("request header %q is not valid in HTTP/2", k)
}
}
- te := req.Header["Te"]
+ te := h["Te"]
if len(te) > 0 && (len(te) > 1 || (te[0] != "trailers" && te[0] != "")) {
return errors.New(`request header "TE" may only be "trailers" in HTTP/2`)
}