aboutsummaryrefslogtreecommitdiff
path: root/vendor/golang.org/x/net
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/golang.org/x/net')
-rw-r--r--vendor/golang.org/x/net/http2/client_conn_pool.go7
-rw-r--r--vendor/golang.org/x/net/http2/errors.go8
-rw-r--r--vendor/golang.org/x/net/http2/frame.go44
-rw-r--r--vendor/golang.org/x/net/http2/http2.go13
-rw-r--r--vendor/golang.org/x/net/http2/server.go79
-rw-r--r--vendor/golang.org/x/net/http2/transport.go374
6 files changed, 363 insertions, 162 deletions
diff --git a/vendor/golang.org/x/net/http2/client_conn_pool.go b/vendor/golang.org/x/net/http2/client_conn_pool.go
index cb34cc2..b139412 100644
--- a/vendor/golang.org/x/net/http2/client_conn_pool.go
+++ b/vendor/golang.org/x/net/http2/client_conn_pool.go
@@ -55,11 +55,11 @@ const (
func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
if isConnectionCloseRequest(req) && dialOnMiss {
// It gets its own connection.
- cc, err := p.t.dialClientConn(addr)
+ const singleUse = true
+ cc, err := p.t.dialClientConn(addr, singleUse)
if err != nil {
return nil, err
}
- cc.singleUse = true
return cc, nil
}
p.mu.Lock()
@@ -104,7 +104,8 @@ func (p *clientConnPool) getStartDialLocked(addr string) *dialCall {
// run in its own goroutine.
func (c *dialCall) dial(addr string) {
- c.res, c.err = c.p.t.dialClientConn(addr)
+ const singleUse = false // shared conn
+ c.res, c.err = c.p.t.dialClientConn(addr, singleUse)
close(c.done)
c.p.mu.Lock()
diff --git a/vendor/golang.org/x/net/http2/errors.go b/vendor/golang.org/x/net/http2/errors.go
index 71a4e29..20fd762 100644
--- a/vendor/golang.org/x/net/http2/errors.go
+++ b/vendor/golang.org/x/net/http2/errors.go
@@ -64,9 +64,17 @@ func (e ConnectionError) Error() string { return fmt.Sprintf("connection error:
type StreamError struct {
StreamID uint32
Code ErrCode
+ Cause error // optional additional detail
+}
+
+func streamError(id uint32, code ErrCode) StreamError {
+ return StreamError{StreamID: id, Code: code}
}
func (e StreamError) Error() string {
+ if e.Cause != nil {
+ return fmt.Sprintf("stream error: stream ID %d; %v; %v", e.StreamID, e.Code, e.Cause)
+ }
return fmt.Sprintf("stream error: stream ID %d; %v", e.StreamID, e.Code)
}
diff --git a/vendor/golang.org/x/net/http2/frame.go b/vendor/golang.org/x/net/http2/frame.go
index 981d407..c9b09bb 100644
--- a/vendor/golang.org/x/net/http2/frame.go
+++ b/vendor/golang.org/x/net/http2/frame.go
@@ -594,6 +594,7 @@ func parseDataFrame(fh FrameHeader, payload []byte) (Frame, error) {
var (
errStreamID = errors.New("invalid stream ID")
errDepStreamID = errors.New("invalid dependent stream ID")
+ errPadLength = errors.New("pad length too large")
)
func validStreamIDOrZero(streamID uint32) bool {
@@ -607,18 +608,40 @@ func validStreamID(streamID uint32) bool {
// WriteData writes a DATA frame.
//
// It will perform exactly one Write to the underlying Writer.
-// It is the caller's responsibility to not call other Write methods concurrently.
+// It is the caller's responsibility not to violate the maximum frame size
+// and to not call other Write methods concurrently.
func (f *Framer) WriteData(streamID uint32, endStream bool, data []byte) error {
- // TODO: ignoring padding for now. will add when somebody cares.
+ return f.WriteDataPadded(streamID, endStream, data, nil)
+}
+
+// WriteData writes a DATA frame with optional padding.
+//
+// If pad is nil, the padding bit is not sent.
+// The length of pad must not exceed 255 bytes.
+//
+// It will perform exactly one Write to the underlying Writer.
+// It is the caller's responsibility not to violate the maximum frame size
+// and to not call other Write methods concurrently.
+func (f *Framer) WriteDataPadded(streamID uint32, endStream bool, data, pad []byte) error {
if !validStreamID(streamID) && !f.AllowIllegalWrites {
return errStreamID
}
+ if len(pad) > 255 {
+ return errPadLength
+ }
var flags Flags
if endStream {
flags |= FlagDataEndStream
}
+ if pad != nil {
+ flags |= FlagDataPadded
+ }
f.startWrite(FrameData, flags, streamID)
+ if pad != nil {
+ f.wbuf = append(f.wbuf, byte(len(pad)))
+ }
f.wbuf = append(f.wbuf, data...)
+ f.wbuf = append(f.wbuf, pad...)
return f.endWrite()
}
@@ -840,7 +863,7 @@ func parseWindowUpdateFrame(fh FrameHeader, p []byte) (Frame, error) {
if fh.StreamID == 0 {
return nil, ConnectionError(ErrCodeProtocol)
}
- return nil, StreamError{fh.StreamID, ErrCodeProtocol}
+ return nil, streamError(fh.StreamID, ErrCodeProtocol)
}
return &WindowUpdateFrame{
FrameHeader: fh,
@@ -921,7 +944,7 @@ func parseHeadersFrame(fh FrameHeader, p []byte) (_ Frame, err error) {
}
}
if len(p)-int(padLength) <= 0 {
- return nil, StreamError{fh.StreamID, ErrCodeProtocol}
+ return nil, streamError(fh.StreamID, ErrCodeProtocol)
}
hf.headerFragBuf = p[:len(p)-int(padLength)]
return hf, nil
@@ -1396,6 +1419,9 @@ func (fr *Framer) readMetaFrame(hf *HeadersFrame) (*MetaHeadersFrame, error) {
hdec.SetEmitEnabled(true)
hdec.SetMaxStringLength(fr.maxHeaderStringLen())
hdec.SetEmitFunc(func(hf hpack.HeaderField) {
+ if VerboseLogs && logFrameReads {
+ log.Printf("http2: decoded hpack field %+v", hf)
+ }
if !httplex.ValidHeaderFieldValue(hf.Value) {
invalid = headerFieldValueError(hf.Value)
}
@@ -1454,11 +1480,17 @@ func (fr *Framer) readMetaFrame(hf *HeadersFrame) (*MetaHeadersFrame, error) {
}
if invalid != nil {
fr.errDetail = invalid
- return nil, StreamError{mh.StreamID, ErrCodeProtocol}
+ if VerboseLogs {
+ log.Printf("http2: invalid header: %v", invalid)
+ }
+ return nil, StreamError{mh.StreamID, ErrCodeProtocol, invalid}
}
if err := mh.checkPseudos(); err != nil {
fr.errDetail = err
- return nil, StreamError{mh.StreamID, ErrCodeProtocol}
+ if VerboseLogs {
+ log.Printf("http2: invalid pseudo headers: %v", err)
+ }
+ return nil, StreamError{mh.StreamID, ErrCodeProtocol, err}
}
return mh, nil
}
diff --git a/vendor/golang.org/x/net/http2/http2.go b/vendor/golang.org/x/net/http2/http2.go
index 0173aed..401923b 100644
--- a/vendor/golang.org/x/net/http2/http2.go
+++ b/vendor/golang.org/x/net/http2/http2.go
@@ -13,7 +13,8 @@
// See https://http2.github.io/ for more information on HTTP/2.
//
// See https://http2.golang.org/ for a test server running this code.
-package http2
+//
+package http2 // import "golang.org/x/net/http2"
import (
"bufio"
@@ -349,3 +350,13 @@ func (s *sorter) SortStrings(ss []string) {
sort.Sort(s)
s.v = save
}
+
+// validPseudoPath reports whether v is a valid :path pseudo-header
+// value. It must be a non-empty string starting with '/', and not
+// start with two slashes.
+// For now this is only used a quick check for deciding when to clean
+// up Opaque URLs before sending requests from the Transport.
+// See golang.org/issue/16847
+func validPseudoPath(v string) bool {
+ return len(v) > 0 && v[0] == '/' && (len(v) == 1 || v[1] != '/')
+}
diff --git a/vendor/golang.org/x/net/http2/server.go b/vendor/golang.org/x/net/http2/server.go
index f368738..8206fa7 100644
--- a/vendor/golang.org/x/net/http2/server.go
+++ b/vendor/golang.org/x/net/http2/server.go
@@ -922,7 +922,7 @@ func (sc *serverConn) wroteFrame(res frameWriteResult) {
// state here anyway, after telling the peer
// we're hanging up on them.
st.state = stateHalfClosedLocal // won't last long, but necessary for closeStream via resetStream
- errCancel := StreamError{st.id, ErrCodeCancel}
+ errCancel := streamError(st.id, ErrCodeCancel)
sc.resetStream(errCancel)
case stateHalfClosedRemote:
sc.closeStream(st, errHandlerComplete)
@@ -1133,7 +1133,7 @@ func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
return nil
}
if !st.flow.add(int32(f.Increment)) {
- return StreamError{f.StreamID, ErrCodeFlowControl}
+ return streamError(f.StreamID, ErrCodeFlowControl)
}
default: // connection-level flow control
if !sc.flow.add(int32(f.Increment)) {
@@ -1159,7 +1159,7 @@ func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
if st != nil {
st.gotReset = true
st.cancelCtx()
- sc.closeStream(st, StreamError{f.StreamID, f.ErrCode})
+ sc.closeStream(st, streamError(f.StreamID, f.ErrCode))
}
return nil
}
@@ -1176,6 +1176,10 @@ func (sc *serverConn) closeStream(st *stream, err error) {
}
delete(sc.streams, st.id)
if p := st.body; p != nil {
+ // Return any buffered unread bytes worth of conn-level flow control.
+ // See golang.org/issue/16481
+ sc.sendWindowUpdate(nil, p.Len())
+
p.CloseWithError(err)
}
st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
@@ -1277,6 +1281,8 @@ func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
func (sc *serverConn) processData(f *DataFrame) error {
sc.serveG.check()
+ data := f.Data()
+
// "If a DATA frame is received whose stream is not in "open"
// or "half closed (local)" state, the recipient MUST respond
// with a stream error (Section 5.4.2) of type STREAM_CLOSED."
@@ -1288,32 +1294,55 @@ func (sc *serverConn) processData(f *DataFrame) error {
// the http.Handler returned, so it's done reading &
// done writing). Try to stop the client from sending
// more DATA.
- return StreamError{id, ErrCodeStreamClosed}
+
+ // But still enforce their connection-level flow control,
+ // and return any flow control bytes since we're not going
+ // to consume them.
+ if sc.inflow.available() < int32(f.Length) {
+ return streamError(id, ErrCodeFlowControl)
+ }
+ // Deduct the flow control from inflow, since we're
+ // going to immediately add it back in
+ // sendWindowUpdate, which also schedules sending the
+ // frames.
+ sc.inflow.take(int32(f.Length))
+ sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
+
+ return streamError(id, ErrCodeStreamClosed)
}
if st.body == nil {
panic("internal error: should have a body in this state")
}
- data := f.Data()
// Sender sending more than they'd declared?
if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
- return StreamError{id, ErrCodeStreamClosed}
+ return streamError(id, ErrCodeStreamClosed)
}
- if len(data) > 0 {
+ if f.Length > 0 {
// Check whether the client has flow control quota.
- if int(st.inflow.available()) < len(data) {
- return StreamError{id, ErrCodeFlowControl}
+ if st.inflow.available() < int32(f.Length) {
+ return streamError(id, ErrCodeFlowControl)
}
- st.inflow.take(int32(len(data)))
- wrote, err := st.body.Write(data)
- if err != nil {
- return StreamError{id, ErrCodeStreamClosed}
+ st.inflow.take(int32(f.Length))
+
+ if len(data) > 0 {
+ wrote, err := st.body.Write(data)
+ if err != nil {
+ return streamError(id, ErrCodeStreamClosed)
+ }
+ if wrote != len(data) {
+ panic("internal error: bad Writer")
+ }
+ st.bodyBytes += int64(len(data))
}
- if wrote != len(data) {
- panic("internal error: bad Writer")
+
+ // Return any padded flow control now, since we won't
+ // refund it later on body reads.
+ if pad := int32(f.Length) - int32(len(data)); pad > 0 {
+ sc.sendWindowUpdate32(nil, pad)
+ sc.sendWindowUpdate32(st, pad)
}
- st.bodyBytes += int64(len(data))
}
if f.StreamEnded() {
st.endStream()
@@ -1417,14 +1446,14 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
// REFUSED_STREAM."
if sc.unackedSettings == 0 {
// They should know better.
- return StreamError{st.id, ErrCodeProtocol}
+ return streamError(st.id, ErrCodeProtocol)
}
// Assume it's a network race, where they just haven't
// received our last SETTINGS update. But actually
// this can't happen yet, because we don't yet provide
// a way for users to adjust server parameters at
// runtime.
- return StreamError{st.id, ErrCodeRefusedStream}
+ return streamError(st.id, ErrCodeRefusedStream)
}
rw, req, err := sc.newWriterAndRequest(st, f)
@@ -1458,11 +1487,11 @@ func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error {
}
st.gotTrailerHeader = true
if !f.StreamEnded() {
- return StreamError{st.id, ErrCodeProtocol}
+ return streamError(st.id, ErrCodeProtocol)
}
if len(f.PseudoFields()) > 0 {
- return StreamError{st.id, ErrCodeProtocol}
+ return streamError(st.id, ErrCodeProtocol)
}
if st.trailer != nil {
for _, hf := range f.RegularFields() {
@@ -1471,7 +1500,7 @@ func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error {
// TODO: send more details to the peer somehow. But http2 has
// no way to send debug data at a stream level. Discuss with
// HTTP folk.
- return StreamError{st.id, ErrCodeProtocol}
+ return streamError(st.id, ErrCodeProtocol)
}
st.trailer[key] = append(st.trailer[key], hf.Value)
}
@@ -1532,7 +1561,7 @@ func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*res
isConnect := method == "CONNECT"
if isConnect {
if path != "" || scheme != "" || authority == "" {
- return nil, nil, StreamError{f.StreamID, ErrCodeProtocol}
+ return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
}
} else if method == "" || path == "" ||
(scheme != "https" && scheme != "http") {
@@ -1546,13 +1575,13 @@ func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*res
// "All HTTP/2 requests MUST include exactly one valid
// value for the :method, :scheme, and :path
// pseudo-header fields"
- return nil, nil, StreamError{f.StreamID, ErrCodeProtocol}
+ return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
}
bodyOpen := !f.StreamEnded()
if method == "HEAD" && bodyOpen {
// HEAD requests can't have bodies
- return nil, nil, StreamError{f.StreamID, ErrCodeProtocol}
+ return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
}
var tlsState *tls.ConnectionState // nil if not scheme https
@@ -1610,7 +1639,7 @@ func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*res
var err error
url_, err = url.ParseRequestURI(path)
if err != nil {
- return nil, nil, StreamError{f.StreamID, ErrCodeProtocol}
+ return nil, nil, streamError(f.StreamID, ErrCodeProtocol)
}
requestURI = path
}
diff --git a/vendor/golang.org/x/net/http2/transport.go b/vendor/golang.org/x/net/http2/transport.go
index de3f5fe..bcd27ae 100644
--- a/vendor/golang.org/x/net/http2/transport.go
+++ b/vendor/golang.org/x/net/http2/transport.go
@@ -16,6 +16,7 @@ import (
"io"
"io/ioutil"
"log"
+ "math"
"net"
"net/http"
"sort"
@@ -148,27 +149,28 @@ type ClientConn struct {
readerDone chan struct{} // closed on error
readerErr error // set before readerDone is closed
- mu sync.Mutex // guards following
- cond *sync.Cond // hold mu; broadcast on flow/closed changes
- flow flow // our conn-level flow control quota (cs.flow is per stream)
- inflow flow // peer's conn-level flow control
- closed bool
- goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
- goAwayDebug string // goAway frame's debug data, retained as a string
- streams map[uint32]*clientStream // client-initiated
- nextStreamID uint32
- bw *bufio.Writer
- br *bufio.Reader
- fr *Framer
- lastActive time.Time
-
- // Settings from peer:
+ mu sync.Mutex // guards following
+ cond *sync.Cond // hold mu; broadcast on flow/closed changes
+ flow flow // our conn-level flow control quota (cs.flow is per stream)
+ inflow flow // peer's conn-level flow control
+ closed bool
+ wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
+ goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
+ goAwayDebug string // goAway frame's debug data, retained as a string
+ streams map[uint32]*clientStream // client-initiated
+ nextStreamID uint32
+ bw *bufio.Writer
+ br *bufio.Reader
+ fr *Framer
+ lastActive time.Time
+ // Settings from peer: (also guarded by mu)
maxFrameSize uint32
maxConcurrentStreams uint32
initialWindowSize uint32
- hbuf bytes.Buffer // HPACK encoder writes into this
- henc *hpack.Encoder
- freeBuf [][]byte
+
+ hbuf bytes.Buffer // HPACK encoder writes into this
+ henc *hpack.Encoder
+ freeBuf [][]byte
wmu sync.Mutex // held while writing; acquire AFTER mu if holding both
werr error // first write error that has occurred
@@ -339,7 +341,7 @@ func shouldRetryRequest(req *http.Request, err error) bool {
return err == errClientConnUnusable
}
-func (t *Transport) dialClientConn(addr string) (*ClientConn, error) {
+func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) {
host, _, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
@@ -348,7 +350,7 @@ func (t *Transport) dialClientConn(addr string) (*ClientConn, error) {
if err != nil {
return nil, err
}
- return t.NewClientConn(tconn)
+ return t.newClientConn(tconn, singleUse)
}
func (t *Transport) newTLSConfig(host string) *tls.Config {
@@ -409,14 +411,10 @@ func (t *Transport) expectContinueTimeout() time.Duration {
}
func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
- if VerboseLogs {
- t.vlogf("http2: Transport creating client conn to %v", c.RemoteAddr())
- }
- if _, err := c.Write(clientPreface); err != nil {
- t.vlogf("client preface write error: %v", err)
- return nil, err
- }
+ return t.newClientConn(c, false)
+}
+func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
cc := &ClientConn{
t: t,
tconn: c,
@@ -426,7 +424,13 @@ func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
initialWindowSize: 65535, // spec default
maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
streams: make(map[uint32]*clientStream),
+ singleUse: singleUse,
+ wantSettingsAck: true,
+ }
+ if VerboseLogs {
+ t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
}
+
cc.cond = sync.NewCond(&cc.mu)
cc.flow.add(int32(initialWindowSize))
@@ -454,6 +458,8 @@ func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
if max := t.maxHeaderListSize(); max != 0 {
initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
}
+
+ cc.bw.Write(clientPreface)
cc.fr.WriteSettings(initialSettings...)
cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow)
cc.inflow.add(transportDefaultConnFlow + initialWindowSize)
@@ -462,33 +468,6 @@ func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
return nil, cc.werr
}
- // Read the obligatory SETTINGS frame
- f, err := cc.fr.ReadFrame()
- if err != nil {
- return nil, err
- }
- sf, ok := f.(*SettingsFrame)
- if !ok {
- return nil, fmt.Errorf("expected settings frame, got: %T", f)
- }
- cc.fr.WriteSettingsAck()
- cc.bw.Flush()
-
- sf.ForeachSetting(func(s Setting) error {
- switch s.ID {
- case SettingMaxFrameSize:
- cc.maxFrameSize = s.Val
- case SettingMaxConcurrentStreams:
- cc.maxConcurrentStreams = s.Val
- case SettingInitialWindowSize:
- cc.initialWindowSize = s.Val
- default:
- // TODO(bradfitz): handle more; at least SETTINGS_HEADER_TABLE_SIZE?
- t.vlogf("Unhandled Setting: %v", s)
- }
- return nil
- })
-
go cc.readLoop()
return cc, nil
}
@@ -521,7 +500,7 @@ func (cc *ClientConn) canTakeNewRequestLocked() bool {
}
return cc.goAway == nil && !cc.closed &&
int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
- cc.nextStreamID < 2147483647
+ cc.nextStreamID < math.MaxInt32
}
func (cc *ClientConn) closeIfIdle() {
@@ -531,9 +510,13 @@ func (cc *ClientConn) closeIfIdle() {
return
}
cc.closed = true
+ nextID := cc.nextStreamID
// TODO: do clients send GOAWAY too? maybe? Just Close:
cc.mu.Unlock()
+ if VerboseLogs {
+ cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
+ }
cc.tconn.Close()
}
@@ -639,15 +622,18 @@ func bodyAndLength(req *http.Request) (body io.Reader, contentLen int64) {
// We have a body but a zero content length. Test to see if
// it's actually zero or just unset.
var buf [1]byte
- n, rerr := io.ReadFull(body, buf[:])
+ n, rerr := body.Read(buf[:])
if rerr != nil && rerr != io.EOF {
return errorReader{rerr}, -1
}
if n == 1 {
// Oh, guess there is data in this Body Reader after all.
// The ContentLength field just wasn't set.
- // Stich the Body back together again, re-attaching our
+ // Stitch the Body back together again, re-attaching our
// consumed byte.
+ if rerr == io.EOF {
+ return bytes.NewReader(buf[:]), 1
+ }
return io.MultiReader(bytes.NewReader(buf[:]), body), -1
}
// Body is actually zero bytes.
@@ -747,30 +733,34 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
bodyWritten := false
ctx := reqContext(req)
+ handleReadLoopResponse := func(re resAndError) (*http.Response, error) {
+ res := re.res
+ if re.err != nil || res.StatusCode > 299 {
+ // On error or status code 3xx, 4xx, 5xx, etc abort any
+ // ongoing write, assuming that the server doesn't care
+ // about our request body. If the server replied with 1xx or
+ // 2xx, however, then assume the server DOES potentially
+ // want our body (e.g. full-duplex streaming:
+ // golang.org/issue/13444). If it turns out the server
+ // doesn't, they'll RST_STREAM us soon enough. This is a
+ // heuristic to avoid adding knobs to Transport. Hopefully
+ // we can keep it.
+ bodyWriter.cancel()
+ cs.abortRequestBodyWrite(errStopReqBodyWrite)
+ }
+ if re.err != nil {
+ cc.forgetStreamID(cs.ID)
+ return nil, re.err
+ }
+ res.Request = req
+ res.TLS = cc.tlsState
+ return res, nil
+ }
+
for {
select {
case re := <-readLoopResCh:
- res := re.res
- if re.err != nil || res.StatusCode > 299 {
- // On error or status code 3xx, 4xx, 5xx, etc abort any
- // ongoing write, assuming that the server doesn't care
- // about our request body. If the server replied with 1xx or
- // 2xx, however, then assume the server DOES potentially
- // want our body (e.g. full-duplex streaming:
- // golang.org/issue/13444). If it turns out the server
- // doesn't, they'll RST_STREAM us soon enough. This is a
- // heuristic to avoid adding knobs to Transport. Hopefully
- // we can keep it.
- bodyWriter.cancel()
- cs.abortRequestBodyWrite(errStopReqBodyWrite)
- }
- if re.err != nil {
- cc.forgetStreamID(cs.ID)
- return nil, re.err
- }
- res.Request = req
- res.TLS = cc.tlsState
- return res, nil
+ return handleReadLoopResponse(re)
case <-respHeaderTimer:
cc.forgetStreamID(cs.ID)
if !hasBody || bodyWritten {
@@ -804,6 +794,12 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
// forgetStreamID.
return nil, cs.resetErr
case err := <-bodyWriter.resc:
+ // Prefer the read loop's response, if available. Issue 16102.
+ select {
+ case re := <-readLoopResCh:
+ return handleReadLoopResponse(re)
+ default:
+ }
if err != nil {
return nil, err
}
@@ -908,10 +904,11 @@ func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (
err = cc.fr.WriteData(cs.ID, sentEnd, data)
if err == nil {
// TODO(bradfitz): this flush is for latency, not bandwidth.
- // Most requests won't need this. Make this opt-in or opt-out?
- // Use some heuristic on the body type? Nagel-like timers?
- // Based on 'n'? Only last chunk of this for loop, unless flow control
- // tokens are low? For now, always:
+ // Most requests won't need this. Make this opt-in or
+ // opt-out? Use some heuristic on the body type? Nagel-like
+ // timers? Based on 'n'? Only last chunk of this for loop,
+ // unless flow control tokens are low? For now, always.
+ // If we change this, see comment below.
err = cc.bw.Flush()
}
cc.wmu.Unlock()
@@ -921,28 +918,33 @@ func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (
}
}
+ if sentEnd {
+ // Already sent END_STREAM (which implies we have no
+ // trailers) and flushed, because currently all
+ // WriteData frames above get a flush. So we're done.
+ return nil
+ }
+
+ var trls []byte
+ if hasTrailers {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ trls = cc.encodeTrailers(req)
+ }
+
cc.wmu.Lock()
- if !sentEnd {
- var trls []byte
- if hasTrailers {
- cc.mu.Lock()
- trls = cc.encodeTrailers(req)
- cc.mu.Unlock()
- }
+ defer cc.wmu.Unlock()
- // Avoid forgetting to send an END_STREAM if the encoded
- // trailers are 0 bytes. Both results produce and END_STREAM.
- if len(trls) > 0 {
- err = cc.writeHeaders(cs.ID, true, trls)
- } else {
- err = cc.fr.WriteData(cs.ID, true, nil)
- }
+ // Two ways to send END_STREAM: either with trailers, or
+ // with an empty DATA frame.
+ if len(trls) > 0 {
+ err = cc.writeHeaders(cs.ID, true, trls)
+ } else {
+ err = cc.fr.WriteData(cs.ID, true, nil)
}
if ferr := cc.bw.Flush(); ferr != nil && err == nil {
err = ferr
}
- cc.wmu.Unlock()
-
return err
}
@@ -996,6 +998,22 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
host = req.URL.Host
}
+ var path string
+ if req.Method != "CONNECT" {
+ path = req.URL.RequestURI()
+ if !validPseudoPath(path) {
+ orig := path
+ path = strings.TrimPrefix(path, req.URL.Scheme+"://"+host)
+ if !validPseudoPath(path) {
+ if req.URL.Opaque != "" {
+ return nil, fmt.Errorf("invalid request :path %q from URL.Opaque = %q", orig, req.URL.Opaque)
+ } else {
+ return nil, fmt.Errorf("invalid request :path %q", orig)
+ }
+ }
+ }
+ }
+
// Check for any invalid headers and return an error before we
// potentially pollute our hpack state. (We want to be able to
// continue to reuse the hpack encoder for future requests)
@@ -1018,7 +1036,7 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
cc.writeHeader(":authority", host)
cc.writeHeader(":method", req.Method)
if req.Method != "CONNECT" {
- cc.writeHeader(":path", req.URL.RequestURI())
+ cc.writeHeader(":path", path)
cc.writeHeader(":scheme", "https")
}
if trailers != "" {
@@ -1188,6 +1206,14 @@ func (e GoAwayError) Error() string {
e.LastStreamID, e.ErrCode, e.DebugData)
}
+func isEOFOrNetReadError(err error) bool {
+ if err == io.EOF {
+ return true
+ }
+ ne, ok := err.(*net.OpError)
+ return ok && ne.Op == "read"
+}
+
func (rl *clientConnReadLoop) cleanup() {
cc := rl.cc
defer cc.tconn.Close()
@@ -1199,16 +1225,14 @@ func (rl *clientConnReadLoop) cleanup() {
// gotten a response yet.
err := cc.readerErr
cc.mu.Lock()
- if err == io.EOF {
- if cc.goAway != nil {
- err = GoAwayError{
- LastStreamID: cc.goAway.LastStreamID,
- ErrCode: cc.goAway.ErrCode,
- DebugData: cc.goAwayDebug,
- }
- } else {
- err = io.ErrUnexpectedEOF
+ if cc.goAway != nil && isEOFOrNetReadError(err) {
+ err = GoAwayError{
+ LastStreamID: cc.goAway.LastStreamID,
+ ErrCode: cc.goAway.ErrCode,
+ DebugData: cc.goAwayDebug,
}
+ } else if err == io.EOF {
+ err = io.ErrUnexpectedEOF
}
for _, cs := range rl.activeRes {
cs.bufPipe.CloseWithError(err)
@@ -1228,15 +1252,20 @@ func (rl *clientConnReadLoop) cleanup() {
func (rl *clientConnReadLoop) run() error {
cc := rl.cc
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
- gotReply := false // ever saw a reply
+ gotReply := false // ever saw a HEADERS reply
+ gotSettings := false
for {
f, err := cc.fr.ReadFrame()
if err != nil {
- cc.vlogf("Transport readFrame error: (%T) %v", err, err)
+ cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
}
if se, ok := err.(StreamError); ok {
if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil {
- rl.endStreamError(cs, cc.fr.errDetail)
+ cs.cc.writeStreamReset(cs.ID, se.Code, err)
+ if se.Cause == nil {
+ se.Cause = cc.fr.errDetail
+ }
+ rl.endStreamError(cs, se)
}
continue
} else if err != nil {
@@ -1245,6 +1274,13 @@ func (rl *clientConnReadLoop) run() error {
if VerboseLogs {
cc.vlogf("http2: Transport received %s", summarizeFrame(f))
}
+ if !gotSettings {
+ if _, ok := f.(*SettingsFrame); !ok {
+ cc.logf("protocol error: received %T before a SETTINGS frame", f)
+ return ConnectionError(ErrCodeProtocol)
+ }
+ gotSettings = true
+ }
maybeIdle := false // whether frame might transition us to idle
switch f := f.(type) {
@@ -1273,6 +1309,9 @@ func (rl *clientConnReadLoop) run() error {
cc.logf("Transport: unhandled response frame type %T", f)
}
if err != nil {
+ if VerboseLogs {
+ cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
+ }
return err
}
if rl.closeWhenIdle && gotReply && maybeIdle && len(rl.activeRes) == 0 {
@@ -1522,10 +1561,27 @@ var errClosedResponseBody = errors.New("http2: response body closed")
func (b transportResponseBody) Close() error {
cs := b.cs
- if cs.bufPipe.Err() != io.EOF {
- // TODO: write test for this
- cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
+ cc := cs.cc
+
+ serverSentStreamEnd := cs.bufPipe.Err() == io.EOF
+ unread := cs.bufPipe.Len()
+
+ if unread > 0 || !serverSentStreamEnd {
+ cc.mu.Lock()
+ cc.wmu.Lock()
+ if !serverSentStreamEnd {
+ cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel)
+ }
+ // Return connection-level flow control.
+ if unread > 0 {
+ cc.inflow.add(int32(unread))
+ cc.fr.WriteWindowUpdate(0, uint32(unread))
+ }
+ cc.bw.Flush()
+ cc.wmu.Unlock()
+ cc.mu.Unlock()
}
+
cs.bufPipe.BreakWithError(errClosedResponseBody)
return nil
}
@@ -1533,6 +1589,7 @@ func (b transportResponseBody) Close() error {
func (rl *clientConnReadLoop) processData(f *DataFrame) error {
cc := rl.cc
cs := cc.streamByID(f.StreamID, f.StreamEnded())
+ data := f.Data()
if cs == nil {
cc.mu.Lock()
neverSent := cc.nextStreamID
@@ -1546,10 +1603,22 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
// TODO: be stricter here? only silently ignore things which
// we canceled, but not things which were closed normally
// by the peer? Tough without accumulating too much state.
+
+ // But at least return their flow control:
+ if f.Length > 0 {
+ cc.mu.Lock()
+ cc.inflow.add(int32(f.Length))
+ cc.mu.Unlock()
+
+ cc.wmu.Lock()
+ cc.fr.WriteWindowUpdate(0, uint32(f.Length))
+ cc.bw.Flush()
+ cc.wmu.Unlock()
+ }
return nil
}
- if data := f.Data(); len(data) > 0 {
- if cs.bufPipe.b == nil {
+ if f.Length > 0 {
+ if len(data) > 0 && cs.bufPipe.b == nil {
// Data frame after it's already closed?
cc.logf("http2: Transport received DATA frame for closed stream; closing connection")
return ConnectionError(ErrCodeProtocol)
@@ -1557,17 +1626,30 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
// Check connection-level flow control.
cc.mu.Lock()
- if cs.inflow.available() >= int32(len(data)) {
- cs.inflow.take(int32(len(data)))
+ if cs.inflow.available() >= int32(f.Length) {
+ cs.inflow.take(int32(f.Length))
} else {
cc.mu.Unlock()
return ConnectionError(ErrCodeFlowControl)
}
+ // Return any padded flow control now, since we won't
+ // refund it later on body reads.
+ if pad := int32(f.Length) - int32(len(data)); pad > 0 {
+ cs.inflow.add(pad)
+ cc.inflow.add(pad)
+ cc.wmu.Lock()
+ cc.fr.WriteWindowUpdate(0, uint32(pad))
+ cc.fr.WriteWindowUpdate(cs.ID, uint32(pad))
+ cc.bw.Flush()
+ cc.wmu.Unlock()
+ }
cc.mu.Unlock()
- if _, err := cs.bufPipe.Write(data); err != nil {
- rl.endStreamError(cs, err)
- return err
+ if len(data) > 0 {
+ if _, err := cs.bufPipe.Write(data); err != nil {
+ rl.endStreamError(cs, err)
+ return err
+ }
}
}
@@ -1596,6 +1678,11 @@ func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
if isConnectionCloseRequest(cs.req) {
rl.closeWhenIdle = true
}
+
+ select {
+ case cs.resc <- resAndError{err: err}:
+ default:
+ }
}
func (cs *clientStream) copyTrailers() {
@@ -1623,18 +1710,39 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
cc := rl.cc
cc.mu.Lock()
defer cc.mu.Unlock()
- return f.ForeachSetting(func(s Setting) error {
+
+ if f.IsAck() {
+ if cc.wantSettingsAck {
+ cc.wantSettingsAck = false
+ return nil
+ }
+ return ConnectionError(ErrCodeProtocol)
+ }
+
+ err := f.ForeachSetting(func(s Setting) error {
switch s.ID {
case SettingMaxFrameSize:
cc.maxFrameSize = s.Val
case SettingMaxConcurrentStreams:
cc.maxConcurrentStreams = s.Val
case SettingInitialWindowSize:
- // TODO: error if this is too large.
+ // Values above the maximum flow-control
+ // window size of 2^31-1 MUST be treated as a
+ // connection error (Section 5.4.1) of type
+ // FLOW_CONTROL_ERROR.
+ if s.Val > math.MaxInt32 {
+ return ConnectionError(ErrCodeFlowControl)
+ }
- // TODO: adjust flow control of still-open
+ // Adjust flow control of currently-open
// frames by the difference of the old initial
// window size and this one.
+ delta := int32(s.Val) - int32(cc.initialWindowSize)
+ for _, cs := range cc.streams {
+ cs.flow.add(delta)
+ }
+ cc.cond.Broadcast()
+
cc.initialWindowSize = s.Val
default:
// TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
@@ -1642,6 +1750,16 @@ func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
}
return nil
})
+ if err != nil {
+ return err
+ }
+
+ cc.wmu.Lock()
+ defer cc.wmu.Unlock()
+
+ cc.fr.WriteSettingsAck()
+ cc.bw.Flush()
+ return cc.werr
}
func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
@@ -1678,7 +1796,7 @@ func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
// which closes this, so there
// isn't a race.
default:
- err := StreamError{cs.ID, f.ErrCode}
+ err := streamError(cs.ID, f.ErrCode)
cs.resetErr = err
close(cs.peerReset)
cs.bufPipe.CloseWithError(err)
@@ -1715,8 +1833,10 @@ func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
}
func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
- // TODO: do something with err? send it as a debug frame to the peer?
- // But that's only in GOAWAY. Invent a new frame type? Is there one already?
+ // TODO: map err to more interesting error codes, once the
+ // HTTP community comes up with some. But currently for
+ // RST_STREAM there's no equivalent to GOAWAY frame's debug
+ // data, and the error codes are all pretty vague ("cancel").
cc.wmu.Lock()
cc.fr.WriteRSTStream(streamID, code)
cc.bw.Flush()