aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/transport/http2_client.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/transport/http2_client.go')
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_client.go27
1 files changed, 14 insertions, 13 deletions
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go
index 4892faa..3c18554 100644
--- a/vendor/google.golang.org/grpc/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/transport/http2_client.go
@@ -252,8 +252,10 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
}
- // Make a stream be able to cancel the pending operations by itself.
- s.ctx, s.cancel = context.WithCancel(ctx)
+ // The client side stream context should have exactly the same life cycle with the user provided context.
+ // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
+ // So we use the original context here instead of creating a copy.
+ s.ctx = ctx
s.dec = &recvBufferReader{
ctx: s.ctx,
goAway: s.goAway,
@@ -265,16 +267,6 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// NewStream creates a stream and register it into the transport as "active"
// streams.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
- // Record the timeout value on the context.
- var timeout time.Duration
- if dl, ok := ctx.Deadline(); ok {
- timeout = dl.Sub(time.Now())
- }
- select {
- case <-ctx.Done():
- return nil, ContextErr(ctx.Err())
- default:
- }
pr := &peer.Peer{
Addr: t.conn.RemoteAddr(),
}
@@ -381,9 +373,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if callHdr.SendCompress != "" {
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
}
- if timeout > 0 {
+ if dl, ok := ctx.Deadline(); ok {
+ // Send out timeout regardless its value. The server can detect timeout context by itself.
+ timeout := dl.Sub(time.Now())
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
}
+
for k, v := range authData {
// Capital header names are illegal in HTTP/2.
k = strings.ToLower(k)
@@ -852,6 +847,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
state.processHeaderField(hf)
}
if state.err != nil {
+ s.mu.Lock()
+ if !s.headerDone {
+ close(s.headerChan)
+ s.headerDone = true
+ }
+ s.mu.Unlock()
s.write(recvMsg{err: state.err})
// Something wrong. Stops reading even when there is remaining.
return