aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/transport
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/transport')
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_client.go27
-rw-r--r--vendor/google.golang.org/grpc/transport/http_util.go5
-rw-r--r--vendor/google.golang.org/grpc/transport/transport.go15
3 files changed, 22 insertions, 25 deletions
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go
index 4892faa..3c18554 100644
--- a/vendor/google.golang.org/grpc/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/transport/http2_client.go
@@ -252,8 +252,10 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
}
- // Make a stream be able to cancel the pending operations by itself.
- s.ctx, s.cancel = context.WithCancel(ctx)
+ // The client side stream context should have exactly the same life cycle with the user provided context.
+ // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
+ // So we use the original context here instead of creating a copy.
+ s.ctx = ctx
s.dec = &recvBufferReader{
ctx: s.ctx,
goAway: s.goAway,
@@ -265,16 +267,6 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// NewStream creates a stream and register it into the transport as "active"
// streams.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
- // Record the timeout value on the context.
- var timeout time.Duration
- if dl, ok := ctx.Deadline(); ok {
- timeout = dl.Sub(time.Now())
- }
- select {
- case <-ctx.Done():
- return nil, ContextErr(ctx.Err())
- default:
- }
pr := &peer.Peer{
Addr: t.conn.RemoteAddr(),
}
@@ -381,9 +373,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if callHdr.SendCompress != "" {
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
}
- if timeout > 0 {
+ if dl, ok := ctx.Deadline(); ok {
+ // Send out timeout regardless its value. The server can detect timeout context by itself.
+ timeout := dl.Sub(time.Now())
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
}
+
for k, v := range authData {
// Capital header names are illegal in HTTP/2.
k = strings.ToLower(k)
@@ -852,6 +847,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
state.processHeaderField(hf)
}
if state.err != nil {
+ s.mu.Lock()
+ if !s.headerDone {
+ close(s.headerChan)
+ s.headerDone = true
+ }
+ s.mu.Unlock()
s.write(recvMsg{err: state.err})
// Something wrong. Stops reading even when there is remaining.
return
diff --git a/vendor/google.golang.org/grpc/transport/http_util.go b/vendor/google.golang.org/grpc/transport/http_util.go
index b024594..a3c68d4 100644
--- a/vendor/google.golang.org/grpc/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/transport/http_util.go
@@ -253,6 +253,9 @@ func div(d, r time.Duration) int64 {
// TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it.
func encodeTimeout(t time.Duration) string {
+ if t <= 0 {
+ return "0n"
+ }
if d := div(t, time.Nanosecond); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "n"
}
@@ -349,7 +352,7 @@ func decodeGrpcMessageUnchecked(msg string) string {
for i := 0; i < lenMsg; i++ {
c := msg[i]
if c == percentByte && i+2 < lenMsg {
- parsed, err := strconv.ParseInt(msg[i+1:i+3], 16, 8)
+ parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
if err != nil {
buf.WriteByte(c)
} else {
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go
index f4d8daf..3d6b6a6 100644
--- a/vendor/google.golang.org/grpc/transport/transport.go
+++ b/vendor/google.golang.org/grpc/transport/transport.go
@@ -39,7 +39,6 @@ package transport // import "google.golang.org/grpc/transport"
import (
"bytes"
- "errors"
"fmt"
"io"
"net"
@@ -169,7 +168,8 @@ type Stream struct {
// nil for client side Stream.
st ServerTransport
// ctx is the associated context of the stream.
- ctx context.Context
+ ctx context.Context
+ // cancel is always nil for client side Stream.
cancel context.CancelFunc
// done is closed when the final status arrives.
done chan struct{}
@@ -286,19 +286,12 @@ func (s *Stream) StatusDesc() string {
return s.statusDesc
}
-// ErrIllegalTrailerSet indicates that the trailer has already been set or it
-// is too late to do so.
-var ErrIllegalTrailerSet = errors.New("transport: trailer has been set")
-
// SetTrailer sets the trailer metadata which will be sent with the RPC status
-// by the server. This can only be called at most once. Server side only.
+// by the server. This can be called multiple times. Server side only.
func (s *Stream) SetTrailer(md metadata.MD) error {
s.mu.Lock()
defer s.mu.Unlock()
- if s.trailer != nil {
- return ErrIllegalTrailerSet
- }
- s.trailer = md.Copy()
+ s.trailer = metadata.Join(s.trailer, md)
return nil
}