aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/transport/transport.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/transport/transport.go')
-rw-r--r--vendor/google.golang.org/grpc/transport/transport.go384
1 files changed, 267 insertions, 117 deletions
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go
index b62f702..2cf9bd3 100644
--- a/vendor/google.golang.org/grpc/transport/transport.go
+++ b/vendor/google.golang.org/grpc/transport/transport.go
@@ -1,48 +1,32 @@
/*
*
- * Copyright 2014, Google Inc.
- * All rights reserved.
+ * Copyright 2014 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
-/*
-Package transport defines and implements message oriented communication channel
-to complete various transactions (e.g., an RPC).
-*/
+// Package transport defines and implements message oriented communication
+// channel to complete various transactions (e.g., an RPC).
package transport // import "google.golang.org/grpc/transport"
import (
- "bytes"
+ stdctx "context"
"fmt"
"io"
"net"
"sync"
+ "time"
"golang.org/x/net/context"
"golang.org/x/net/http2"
@@ -65,57 +49,56 @@ type recvMsg struct {
err error
}
-func (*recvMsg) item() {}
-
-// All items in an out of a recvBuffer should be the same type.
-type item interface {
- item()
-}
-
-// recvBuffer is an unbounded channel of item.
+// recvBuffer is an unbounded channel of recvMsg structs.
+// Note recvBuffer differs from controlBuffer only in that recvBuffer
+// holds a channel of only recvMsg structs instead of objects implementing "item" interface.
+// recvBuffer is written to much more often than
+// controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
type recvBuffer struct {
- c chan item
+ c chan recvMsg
mu sync.Mutex
- backlog []item
+ backlog []recvMsg
}
func newRecvBuffer() *recvBuffer {
b := &recvBuffer{
- c: make(chan item, 1),
+ c: make(chan recvMsg, 1),
}
return b
}
-func (b *recvBuffer) put(r item) {
+func (b *recvBuffer) put(r recvMsg) {
b.mu.Lock()
- defer b.mu.Unlock()
if len(b.backlog) == 0 {
select {
case b.c <- r:
+ b.mu.Unlock()
return
default:
}
}
b.backlog = append(b.backlog, r)
+ b.mu.Unlock()
}
func (b *recvBuffer) load() {
b.mu.Lock()
- defer b.mu.Unlock()
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
+ b.backlog[0] = recvMsg{}
b.backlog = b.backlog[1:]
default:
}
}
+ b.mu.Unlock()
}
-// get returns the channel that receives an item in the buffer.
+// get returns the channel that receives a recvMsg in the buffer.
//
-// Upon receipt of an item, the caller should call load to send another
-// item onto the channel if there is any.
-func (b *recvBuffer) get() <-chan item {
+// Upon receipt of a recvMsg, the caller should call load to send another
+// recvMsg onto the channel if there is any.
+func (b *recvBuffer) get() <-chan recvMsg {
return b.c
}
@@ -125,7 +108,7 @@ type recvBufferReader struct {
ctx context.Context
goAway chan struct{}
recv *recvBuffer
- last *bytes.Reader // Stores the remaining data in the previous calls.
+ last []byte // Stores the remaining data in the previous calls.
err error
}
@@ -136,27 +119,87 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) {
if r.err != nil {
return 0, r.err
}
- defer func() { r.err = err }()
- if r.last != nil && r.last.Len() > 0 {
+ n, r.err = r.read(p)
+ return n, r.err
+}
+
+func (r *recvBufferReader) read(p []byte) (n int, err error) {
+ if r.last != nil && len(r.last) > 0 {
// Read remaining data left in last call.
- return r.last.Read(p)
+ copied := copy(p, r.last)
+ r.last = r.last[copied:]
+ return copied, nil
}
select {
case <-r.ctx.Done():
return 0, ContextErr(r.ctx.Err())
case <-r.goAway:
return 0, ErrStreamDrain
- case i := <-r.recv.get():
+ case m := <-r.recv.get():
r.recv.load()
- m := i.(*recvMsg)
if m.err != nil {
return 0, m.err
}
- r.last = bytes.NewReader(m.data)
- return r.last.Read(p)
+ copied := copy(p, m.data)
+ r.last = m.data[copied:]
+ return copied, nil
}
}
+// All items in an out of a controlBuffer should be the same type.
+type item interface {
+ item()
+}
+
+// controlBuffer is an unbounded channel of item.
+type controlBuffer struct {
+ c chan item
+ mu sync.Mutex
+ backlog []item
+}
+
+func newControlBuffer() *controlBuffer {
+ b := &controlBuffer{
+ c: make(chan item, 1),
+ }
+ return b
+}
+
+func (b *controlBuffer) put(r item) {
+ b.mu.Lock()
+ if len(b.backlog) == 0 {
+ select {
+ case b.c <- r:
+ b.mu.Unlock()
+ return
+ default:
+ }
+ }
+ b.backlog = append(b.backlog, r)
+ b.mu.Unlock()
+}
+
+func (b *controlBuffer) load() {
+ b.mu.Lock()
+ if len(b.backlog) > 0 {
+ select {
+ case b.c <- b.backlog[0]:
+ b.backlog[0] = nil
+ b.backlog = b.backlog[1:]
+ default:
+ }
+ }
+ b.mu.Unlock()
+}
+
+// get returns the channel that receives an item in the buffer.
+//
+// Upon receipt of an item, the caller should call load to send another
+// item onto the channel if there is any.
+func (b *controlBuffer) get() <-chan item {
+ return b.c
+}
+
type streamState uint8
const (
@@ -171,11 +214,6 @@ type Stream struct {
id uint32
// nil for client side Stream.
st ServerTransport
- // clientStatsCtx keeps the user context for stats handling.
- // It's only valid on client side. Server side stats context is same as s.ctx.
- // All client side stats collection should use the clientStatsCtx (instead of the stream context)
- // so that all the generated stats for a particular RPC can be associated in the processing phase.
- clientStatsCtx context.Context
// ctx is the associated context of the stream.
ctx context.Context
// cancel is always nil for client side Stream.
@@ -189,16 +227,20 @@ type Stream struct {
recvCompress string
sendCompress string
buf *recvBuffer
- dec io.Reader
+ trReader io.Reader
fc *inFlow
recvQuota uint32
+
+ // TODO: Remote this unused variable.
// The accumulated inbound quota pending for window update.
updateQuota uint32
- // The handler to control the window update procedure for both this
- // particular stream and the associated transport.
- windowHandler func(int)
- sendQuotaPool *quotaPool
+ // Callback to state application's intentions to read data. This
+ // is used to adjust flow control, if need be.
+ requestRead func(int)
+
+ sendQuotaPool *quotaPool
+ localSendQuota *quotaPool
// Close headerChan to indicate the end of reception of header metadata.
headerChan chan struct{}
// header caches the received header metadata.
@@ -220,6 +262,10 @@ type Stream struct {
rstStream bool
// rstError is the error that needs to be sent along with the RST_STREAM frame.
rstError http2.ErrCode
+ // bytesSent and bytesReceived indicates whether any bytes have been sent or
+ // received on this stream.
+ bytesSent bool
+ bytesReceived bool
}
// RecvCompress returns the compression algorithm applied to the inbound
@@ -247,16 +293,24 @@ func (s *Stream) GoAway() <-chan struct{} {
// Header acquires the key-value pairs of header metadata once it
// is available. It blocks until i) the metadata is ready or ii) there is no
-// header metadata or iii) the stream is cancelled/expired.
+// header metadata or iii) the stream is canceled/expired.
func (s *Stream) Header() (metadata.MD, error) {
+ var err error
select {
case <-s.ctx.Done():
- return nil, ContextErr(s.ctx.Err())
+ err = ContextErr(s.ctx.Err())
case <-s.goAway:
- return nil, ErrStreamDrain
+ err = ErrStreamDrain
+ case <-s.headerChan:
+ return s.header.Copy(), nil
+ }
+ // Even if the stream is closed, header is returned if available.
+ select {
case <-s.headerChan:
return s.header.Copy(), nil
+ default:
}
+ return nil, err
}
// Trailer returns the cached trailer metedata. Note that if it is not called
@@ -264,8 +318,9 @@ func (s *Stream) Header() (metadata.MD, error) {
// side only.
func (s *Stream) Trailer() metadata.MD {
s.mu.RLock()
- defer s.mu.RUnlock()
- return s.trailer.Copy()
+ c := s.trailer.Copy()
+ s.mu.RUnlock()
+ return c
}
// ServerTransport returns the underlying ServerTransport for the stream.
@@ -293,14 +348,16 @@ func (s *Stream) Status() *status.Status {
// Server side only.
func (s *Stream) SetHeader(md metadata.MD) error {
s.mu.Lock()
- defer s.mu.Unlock()
if s.headerOk || s.state == streamDone {
+ s.mu.Unlock()
return ErrIllegalHeaderWrite
}
if md.Len() == 0 {
+ s.mu.Unlock()
return nil
}
s.header = metadata.Join(s.header, md)
+ s.mu.Unlock()
return nil
}
@@ -311,25 +368,44 @@ func (s *Stream) SetTrailer(md metadata.MD) error {
return nil
}
s.mu.Lock()
- defer s.mu.Unlock()
s.trailer = metadata.Join(s.trailer, md)
+ s.mu.Unlock()
return nil
}
func (s *Stream) write(m recvMsg) {
- s.buf.put(&m)
+ s.buf.put(m)
}
-// Read reads all the data available for this Stream from the transport and
+// Read reads all p bytes from the wire for this stream.
+func (s *Stream) Read(p []byte) (n int, err error) {
+ // Don't request a read if there was an error earlier
+ if er := s.trReader.(*transportReader).er; er != nil {
+ return 0, er
+ }
+ s.requestRead(len(p))
+ return io.ReadFull(s.trReader, p)
+}
+
+// tranportReader reads all the data available for this Stream from the transport and
// passes them into the decoder, which converts them into a gRPC message stream.
// The error is io.EOF when the stream is done or another non-nil error if
// the stream broke.
-func (s *Stream) Read(p []byte) (n int, err error) {
- n, err = s.dec.Read(p)
+type transportReader struct {
+ reader io.Reader
+ // The handler to control the window update procedure for both this
+ // particular stream and the associated transport.
+ windowHandler func(int)
+ er error
+}
+
+func (t *transportReader) Read(p []byte) (n int, err error) {
+ n, err = t.reader.Read(p)
if err != nil {
+ t.er = err
return
}
- s.windowHandler(n)
+ t.windowHandler(n)
return
}
@@ -341,6 +417,22 @@ func (s *Stream) finish(st *status.Status) {
close(s.done)
}
+// BytesSent indicates whether any bytes have been sent on this stream.
+func (s *Stream) BytesSent() bool {
+ s.mu.Lock()
+ bs := s.bytesSent
+ s.mu.Unlock()
+ return bs
+}
+
+// BytesReceived indicates whether any bytes have been received on this stream.
+func (s *Stream) BytesReceived() bool {
+ s.mu.Lock()
+ br := s.bytesReceived
+ s.mu.Unlock()
+ return br
+}
+
// GoString is implemented by Stream so context.String() won't
// race when printing %#v.
func (s *Stream) GoString() string {
@@ -367,19 +459,22 @@ type transportState int
const (
reachable transportState = iota
- unreachable
closing
draining
)
// ServerConfig consists of all the configurations to establish a server transport.
type ServerConfig struct {
- MaxStreams uint32
- AuthInfo credentials.AuthInfo
- InTapHandle tap.ServerInHandle
- StatsHandler stats.Handler
- KeepaliveParams keepalive.ServerParameters
- KeepalivePolicy keepalive.EnforcementPolicy
+ MaxStreams uint32
+ AuthInfo credentials.AuthInfo
+ InTapHandle tap.ServerInHandle
+ StatsHandler stats.Handler
+ KeepaliveParams keepalive.ServerParameters
+ KeepalivePolicy keepalive.EnforcementPolicy
+ InitialWindowSize int32
+ InitialConnWindowSize int32
+ WriteBufferSize int
+ ReadBufferSize int
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
@@ -407,6 +502,14 @@ type ConnectOptions struct {
KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
+ // InitialWindowSize sets the initial window size for a stream.
+ InitialWindowSize int32
+ // InitialConnWindowSize sets the initial window size for a connection.
+ InitialConnWindowSize int32
+ // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
+ WriteBufferSize int
+ // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
+ ReadBufferSize int
}
// TargetInfo contains the information of the target such as network address and metadata.
@@ -417,8 +520,8 @@ type TargetInfo struct {
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
-func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions) (ClientTransport, error) {
- return newHTTP2Client(ctx, target, opts)
+func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions, timeout time.Duration) (ClientTransport, error) {
+ return newHTTP2Client(ctx, target, opts, timeout)
}
// Options provides additional hints and information for message
@@ -430,7 +533,7 @@ type Options struct {
// Delay is a hint to the transport implementation for whether
// the data could be buffered for a batching write. The
- // Transport implementation may ignore the hint.
+ // transport implementation may ignore the hint.
Delay bool
}
@@ -450,10 +553,15 @@ type CallHdr struct {
// outbound message.
SendCompress string
+ // Creds specifies credentials.PerRPCCredentials for a call.
+ Creds credentials.PerRPCCredentials
+
// Flush indicates whether a new stream command should be sent
// to the peer without waiting for the first data. This is
- // only a hint. The transport may modify the flush decision
+ // only a hint.
+ // If it's true, the transport may modify the flush decision
// for performance purposes.
+ // If it's false, new stream will never be flushed.
Flush bool
}
@@ -471,7 +579,7 @@ type ClientTransport interface {
// Write sends the data for the given stream. A nil stream indicates
// the write is to be performed on the transport as a whole.
- Write(s *Stream, data []byte, opts *Options) error
+ Write(s *Stream, hdr []byte, data []byte, opts *Options) error
// NewStream creates a Stream for an RPC.
NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
@@ -489,10 +597,13 @@ type ClientTransport interface {
// once the transport is initiated.
Error() <-chan struct{}
- // GoAway returns a channel that is closed when ClientTranspor
+ // GoAway returns a channel that is closed when ClientTransport
// receives the draining signal from the server (e.g., GOAWAY frame in
// HTTP/2).
GoAway() <-chan struct{}
+
+ // GetGoAwayReason returns the reason why GoAway frame was received.
+ GetGoAwayReason() GoAwayReason
}
// ServerTransport is the common interface for all gRPC server-side transport
@@ -510,7 +621,7 @@ type ServerTransport interface {
// Write sends the data for the given stream.
// Write may not be called on all streams.
- Write(s *Stream, data []byte, opts *Options) error
+ Write(s *Stream, hdr []byte, data []byte, opts *Options) error
// WriteStatus sends the status of a stream to the client. WriteStatus is
// the final call made on a stream and always occurs.
@@ -592,41 +703,80 @@ func (e StreamError) Error() string {
return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
}
+// wait blocks until it can receive from one of the provided contexts or
+// channels. ctx is the context of the RPC, tctx is the context of the
+// transport, done is a channel closed to indicate the end of the RPC, goAway
+// is a channel closed to indicate a GOAWAY was received, and proceed is a
+// quota channel, whose received value is returned from this function if none
+// of the other signals occur first.
+func wait(ctx, tctx context.Context, done, goAway <-chan struct{}, proceed <-chan int) (int, error) {
+ select {
+ case <-ctx.Done():
+ return 0, ContextErr(ctx.Err())
+ case <-done:
+ return 0, io.EOF
+ case <-goAway:
+ return 0, ErrStreamDrain
+ case <-tctx.Done():
+ return 0, ErrConnClosing
+ case i := <-proceed:
+ return i, nil
+ }
+}
+
// ContextErr converts the error from context package into a StreamError.
func ContextErr(err error) StreamError {
switch err {
- case context.DeadlineExceeded:
+ case context.DeadlineExceeded, stdctx.DeadlineExceeded:
return streamErrorf(codes.DeadlineExceeded, "%v", err)
- case context.Canceled:
+ case context.Canceled, stdctx.Canceled:
return streamErrorf(codes.Canceled, "%v", err)
}
- panic(fmt.Sprintf("Unexpected error from context packet: %v", err))
+ return streamErrorf(codes.Internal, "Unexpected error from context packet: %v", err)
}
-// wait blocks until it can receive from ctx.Done, closing, or proceed.
-// If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err.
-// If it receives from done, it returns 0, io.EOF if ctx is not done; otherwise
-// it return the StreamError for ctx.Err.
-// If it receives from goAway, it returns 0, ErrStreamDrain.
-// If it receives from closing, it returns 0, ErrConnClosing.
-// If it receives from proceed, it returns the received integer, nil.
-func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-chan int) (int, error) {
- select {
- case <-ctx.Done():
- return 0, ContextErr(ctx.Err())
- case <-done:
- // User cancellation has precedence.
+// GoAwayReason contains the reason for the GoAway frame received.
+type GoAwayReason uint8
+
+const (
+ // Invalid indicates that no GoAway frame is received.
+ Invalid GoAwayReason = 0
+ // NoReason is the default value when GoAway frame is received.
+ NoReason GoAwayReason = 1
+ // TooManyPings indicates that a GoAway frame with ErrCodeEnhanceYourCalm
+ // was received and that the debug data said "too_many_pings".
+ TooManyPings GoAwayReason = 2
+)
+
+// loopyWriter is run in a separate go routine. It is the single code path that will
+// write data on wire.
+func loopyWriter(ctx context.Context, cbuf *controlBuffer, handler func(item) error) {
+ for {
select {
+ case i := <-cbuf.get():
+ cbuf.load()
+ if err := handler(i); err != nil {
+ return
+ }
case <-ctx.Done():
- return 0, ContextErr(ctx.Err())
- default:
+ return
+ }
+ hasData:
+ for {
+ select {
+ case i := <-cbuf.get():
+ cbuf.load()
+ if err := handler(i); err != nil {
+ return
+ }
+ case <-ctx.Done():
+ return
+ default:
+ if err := handler(&flushIO{}); err != nil {
+ return
+ }
+ break hasData
+ }
}
- return 0, io.EOF
- case <-goAway:
- return 0, ErrStreamDrain
- case <-closing:
- return 0, ErrConnClosing
- case i := <-proceed:
- return i, nil
}
}