Diffstat (limited to 'vendor/google.golang.org/grpc/transport')
-rw-r--r--  vendor/google.golang.org/grpc/transport/control.go        |  34
-rw-r--r--  vendor/google.golang.org/grpc/transport/handler_server.go |   2
-rw-r--r--  vendor/google.golang.org/grpc/transport/http2_client.go   |  94
-rw-r--r--  vendor/google.golang.org/grpc/transport/http2_server.go   |  87
-rw-r--r--  vendor/google.golang.org/grpc/transport/transport.go      |  27
5 files changed, 171 insertions, 73 deletions
diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/control.go
index 4ef0830..2586cba 100644
--- a/vendor/google.golang.org/grpc/transport/control.go
+++ b/vendor/google.golang.org/grpc/transport/control.go
@@ -111,35 +111,9 @@ func newQuotaPool(q int) *quotaPool {
return qb
}
-// add adds n to the available quota and tries to send it on acquire.
-func (qb *quotaPool) add(n int) {
- qb.mu.Lock()
- defer qb.mu.Unlock()
- qb.quota += n
- if qb.quota <= 0 {
- return
- }
- select {
- case qb.c <- qb.quota:
- qb.quota = 0
- default:
- }
-}
-
-// cancel cancels the pending quota sent on acquire, if any.
-func (qb *quotaPool) cancel() {
- qb.mu.Lock()
- defer qb.mu.Unlock()
- select {
- case n := <-qb.c:
- qb.quota += n
- default:
- }
-}
-
-// reset cancels the pending quota sent on acquired, incremented by v and sends
+// add cancels the pending quota sent on acquire, if any, adds v, and sends
// it back on acquire.
-func (qb *quotaPool) reset(v int) {
+func (qb *quotaPool) add(v int) {
qb.mu.Lock()
defer qb.mu.Unlock()
select {
@@ -151,6 +125,10 @@ func (qb *quotaPool) reset(v int) {
if qb.quota <= 0 {
return
}
+ // After the pool has been created, this is the only place that sends on
+ // the channel. Since mu is held at this point and any quota that was sent
+ // on the channel has been retrieved, we know that this code will always
+ // place any positive quota value on the channel.
select {
case qb.c <- qb.quota:
qb.quota = 0
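For reference, the merged behaviour of the old add/cancel/reset trio can be read out of the hunks above. What follows is a minimal sketch of the resulting quotaPool: the struct, constructor and acquire are assumed from the unchanged parts of control.go, and the drain-then-refill middle of add is inferred from the removed cancel and reset bodies.

package sketch

import "sync"

// quotaPool mirrors the type in control.go: available quota is published on a
// buffered channel of capacity 1 and the running balance is guarded by mu.
type quotaPool struct {
	c chan int

	mu    sync.Mutex
	quota int
}

// newQuotaPool creates a pool holding q initial quota.
func newQuotaPool(q int) *quotaPool {
	qb := &quotaPool{c: make(chan int, 1)}
	if q > 0 {
		qb.c <- q
	} else {
		qb.quota = q
	}
	return qb
}

// acquire returns the channel on which callers wait for quota.
func (qb *quotaPool) acquire() <-chan int {
	return qb.c
}

// add folds the old add, cancel and reset into one operation: reclaim any
// quota parked in the channel, adjust the balance by v (which may be
// negative), and republish the balance if it ends up positive.
func (qb *quotaPool) add(v int) {
	qb.mu.Lock()
	defer qb.mu.Unlock()
	select {
	case n := <-qb.c:
		qb.quota += n
	default:
	}
	qb.quota += v
	if qb.quota <= 0 {
		return
	}
	// mu is held and the channel was just drained, so this is the only
	// sender and a positive balance is always placed on the channel.
	select {
	case qb.c <- qb.quota:
		qb.quota = 0
	default:
	}
}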
diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/transport/handler_server.go
index 114e349..10b6dc0 100644
--- a/vendor/google.golang.org/grpc/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/transport/handler_server.go
@@ -268,7 +268,7 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
})
}
-func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
+func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
// With this transport type there will be exactly 1 stream: this HTTP request.
var ctx context.Context
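The extra traceCtx argument moves trace attachment up to the caller of HandleStreams. Below is a hedged sketch of a caller that runs with tracing disabled, written against the ServerTransport interface shown further down in transport.go; serveStreams and its handle parameter are illustrative names.

package sketch

import (
	"golang.org/x/net/context"

	"google.golang.org/grpc/transport"
)

// serveStreams drives a ServerTransport with tracing disabled: the traceCtx
// callback returns the stream context unchanged, and handle is whatever the
// caller uses to process each incoming stream.
func serveStreams(st transport.ServerTransport, handle func(*transport.Stream)) {
	st.HandleStreams(handle, func(ctx context.Context, method string) context.Context {
		return ctx // no trace attached
	})
}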
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go
index 2b0f680..605b1e5 100644
--- a/vendor/google.golang.org/grpc/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/transport/http2_client.go
@@ -51,16 +51,20 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
+ "google.golang.org/grpc/stats"
)
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
- target string // server name/addr
- userAgent string
- md interface{}
- conn net.Conn // underlying communication channel
- authInfo credentials.AuthInfo // auth info about the connection
- nextID uint32 // the next stream ID to be used
+ ctx context.Context
+ target string // server name/addr
+ userAgent string
+ md interface{}
+ conn net.Conn // underlying communication channel
+ remoteAddr net.Addr
+ localAddr net.Addr
+ authInfo credentials.AuthInfo // auth info about the connection
+ nextID uint32 // the next stream ID to be used
// writableChan synchronizes write access to the transport.
// A writer acquires the write lock by sending a value on writableChan
@@ -150,6 +154,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
scheme := "http"
conn, err := dial(ctx, opts.Dialer, addr.Addr)
if err != nil {
+ if opts.FailOnNonTempDialError {
+ return nil, connectionErrorf(isTemporary(err), err, "transport: %v", err)
+ }
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Any further errors will close the underlying connection
@@ -175,11 +182,14 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
}
var buf bytes.Buffer
t := &http2Client{
- target: addr.Addr,
- userAgent: ua,
- md: addr.Metadata,
- conn: conn,
- authInfo: authInfo,
+ ctx: ctx,
+ target: addr.Addr,
+ userAgent: ua,
+ md: addr.Metadata,
+ conn: conn,
+ remoteAddr: conn.RemoteAddr(),
+ localAddr: conn.LocalAddr(),
+ authInfo: authInfo,
// The client initiated stream id is odd starting from 1.
nextID: 1,
writableChan: make(chan int, 1),
@@ -199,6 +209,16 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
maxStreams: math.MaxInt32,
streamSendQuota: defaultWindowSize,
}
+ if stats.On() {
+ t.ctx = stats.TagConn(t.ctx, &stats.ConnTagInfo{
+ RemoteAddr: t.remoteAddr,
+ LocalAddr: t.localAddr,
+ })
+ connBegin := &stats.ConnBegin{
+ Client: true,
+ }
+ stats.HandleConn(t.ctx, connBegin)
+ }
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
// dispatches the frame to the corresponding stream entity.
@@ -270,12 +290,13 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// streams.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
pr := &peer.Peer{
- Addr: t.conn.RemoteAddr(),
+ Addr: t.remoteAddr,
}
// Attach Auth info if there is any.
if t.authInfo != nil {
pr.AuthInfo = t.authInfo
}
+ userCtx := ctx
ctx = peer.NewContext(ctx, pr)
authData := make(map[string]string)
for _, c := range t.creds {
@@ -347,6 +368,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, ErrConnClosing
}
s := t.newStream(ctx, callHdr)
+ s.clientStatsCtx = userCtx
t.activeStreams[s.id] = s
// This stream is not counted when applySetings(...) initialize t.streamsQuota.
@@ -357,7 +379,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
t.mu.Unlock()
if reset {
- t.streamsQuota.reset(-1)
+ t.streamsQuota.add(-1)
}
// HPACK encodes various headers. Note that once WriteField(...) is
@@ -413,6 +435,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
}
first := true
+ bufLen := t.hBuf.Len()
// Sends the headers in a single batch even when they span multiple frames.
for !endHeaders {
size := t.hBuf.Len()
@@ -447,6 +470,17 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
+ if stats.On() {
+ outHeader := &stats.OutHeader{
+ Client: true,
+ WireLength: bufLen,
+ FullMethod: callHdr.Method,
+ RemoteAddr: t.remoteAddr,
+ LocalAddr: t.localAddr,
+ Compression: callHdr.SendCompress,
+ }
+ stats.HandleRPC(s.clientStatsCtx, outHeader)
+ }
t.writableChan <- 0
return s, nil
}
@@ -525,6 +559,12 @@ func (t *http2Client) Close() (err error) {
s.mu.Unlock()
s.write(recvMsg{err: ErrConnClosing})
}
+ if stats.On() {
+ connEnd := &stats.ConnEnd{
+ Client: true,
+ }
+ stats.HandleConn(t.ctx, connEnd)
+ }
return
}
@@ -582,19 +622,14 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
var p []byte
if r.Len() > 0 {
size := http2MaxFrameLen
- s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
- t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
- if _, ok := err.(StreamError); ok || err == io.EOF {
- t.sendQuotaPool.cancel()
- }
return err
}
if sq < size {
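With the reworked quotaPool, the send path no longer needs the add(0) flush before acquiring or the cancel() call on error; unused quota is simply handed back with add. A hedged sketch of that take-then-return pattern, living in the same package as the quotaPool sketched after the control.go hunk (the blocking receive stands in for the cancellable wait(...) the real code uses):

// takeQuota grabs whatever quota the pool currently offers, caps it at what
// the caller needs for one frame, and immediately returns the surplus.
func takeQuota(qp *quotaPool, need int) int {
	got := <-qp.acquire()
	if got > need {
		qp.add(got - need) // hand back what this frame will not use
		got = need
	}
	return got
}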
@@ -874,6 +909,24 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
endStream := frame.StreamEnded()
+ var isHeader bool
+ defer func() {
+ if stats.On() {
+ if isHeader {
+ inHeader := &stats.InHeader{
+ Client: true,
+ WireLength: int(frame.Header().Length),
+ }
+ stats.HandleRPC(s.clientStatsCtx, inHeader)
+ } else {
+ inTrailer := &stats.InTrailer{
+ Client: true,
+ WireLength: int(frame.Header().Length),
+ }
+ stats.HandleRPC(s.clientStatsCtx, inTrailer)
+ }
+ }
+ }()
s.mu.Lock()
if !endStream {
@@ -885,6 +938,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
close(s.headerChan)
s.headerDone = true
+ isHeader = true
}
if !endStream || s.state == streamDone {
s.mu.Unlock()
@@ -994,13 +1048,13 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
t.maxStreams = int(s.Val)
t.mu.Unlock()
if reset {
- t.streamsQuota.reset(int(s.Val) - ms)
+ t.streamsQuota.add(int(s.Val) - ms)
}
case http2.SettingInitialWindowSize:
t.mu.Lock()
for _, stream := range t.activeStreams {
// Adjust the sending quota for each stream.
- stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
+ stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota))
}
t.streamSendQuota = s.Val
t.mu.Unlock()
diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go
index a62fb7c..316188e 100644
--- a/vendor/google.golang.org/grpc/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/transport/http2_server.go
@@ -50,6 +50,8 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
+ "google.golang.org/grpc/stats"
+ "google.golang.org/grpc/tap"
)
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
@@ -58,9 +60,13 @@ var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHe
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
+ ctx context.Context
conn net.Conn
+ remoteAddr net.Addr
+ localAddr net.Addr
maxStreamID uint32 // max stream ID ever seen
authInfo credentials.AuthInfo // auth info about the connection
+ inTapHandle tap.ServerInHandle
// writableChan synchronizes write access to the transport.
// A writer acquires the write lock by receiving a value on writableChan
// and releases it by sending on writableChan.
@@ -91,12 +97,13 @@ type http2Server struct {
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
-func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) {
+func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
framer := newFramer(conn)
// Send initial settings as connection preface to client.
var settings []http2.Setting
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
+ maxStreams := config.MaxStreams
if maxStreams == 0 {
maxStreams = math.MaxUint32
} else {
@@ -121,12 +128,16 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
}
var buf bytes.Buffer
t := &http2Server{
+ ctx: context.Background(),
conn: conn,
- authInfo: authInfo,
+ remoteAddr: conn.RemoteAddr(),
+ localAddr: conn.LocalAddr(),
+ authInfo: config.AuthInfo,
framer: framer,
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
maxStreams: maxStreams,
+ inTapHandle: config.InTapHandle,
controlBuf: newRecvBuffer(),
fc: &inFlow{limit: initialConnWindowSize},
sendQuotaPool: newQuotaPool(defaultWindowSize),
@@ -136,13 +147,21 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
activeStreams: make(map[uint32]*Stream),
streamSendQuota: defaultWindowSize,
}
+ if stats.On() {
+ t.ctx = stats.TagConn(t.ctx, &stats.ConnTagInfo{
+ RemoteAddr: t.remoteAddr,
+ LocalAddr: t.localAddr,
+ })
+ connBegin := &stats.ConnBegin{}
+ stats.HandleConn(t.ctx, connBegin)
+ }
go t.controller()
t.writableChan <- 0
return t, nil
}
// operateHeader takes action on the decoded headers.
-func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
+func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
buf := newRecvBuffer()
s := &Stream{
id: frame.Header().StreamID,
@@ -168,12 +187,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
s.recvCompress = state.encoding
if state.timeoutSet {
- s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
+ s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
} else {
- s.ctx, s.cancel = context.WithCancel(context.TODO())
+ s.ctx, s.cancel = context.WithCancel(t.ctx)
}
pr := &peer.Peer{
- Addr: t.conn.RemoteAddr(),
+ Addr: t.remoteAddr,
}
// Attach Auth info if there is any.
if t.authInfo != nil {
@@ -195,6 +214,18 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
s.recvCompress = state.encoding
s.method = state.method
+ if t.inTapHandle != nil {
+ var err error
+ info := &tap.Info{
+ FullMethodName: state.method,
+ }
+ s.ctx, err = t.inTapHandle(s.ctx, info)
+ if err != nil {
+ // TODO: Log the real error.
+ t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
+ return
+ }
+ }
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
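The new inTapHandle runs early in operateHeaders, before the stream is handed to the server's stream handler, and returning an error makes the transport refuse the stream with RST_STREAM(REFUSED_STREAM). A hedged sketch of such a handle follows; the method name is illustrative.

package sketch

import (
	"errors"

	"golang.org/x/net/context"

	"google.golang.org/grpc/tap"
)

// denyDump refuses streams for one hypothetical method before any handler
// goroutine is started; every other stream passes through with its context
// untouched.
func denyDump(ctx context.Context, info *tap.Info) (context.Context, error) {
	if info.FullMethodName == "/example.Debug/Dump" {
		return nil, errors.New("refused by tap handle")
	}
	return ctx, nil
}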
@@ -218,13 +249,26 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
}
+ s.ctx = traceCtx(s.ctx, s.method)
+ if stats.On() {
+ s.ctx = stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
+ inHeader := &stats.InHeader{
+ FullMethod: s.method,
+ RemoteAddr: t.remoteAddr,
+ LocalAddr: t.localAddr,
+ Compression: s.recvCompress,
+ WireLength: int(frame.Header().Length),
+ }
+ stats.HandleRPC(s.ctx, inHeader)
+ }
handle(s)
return
}
// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
-func (t *http2Server) HandleStreams(handle func(*Stream)) {
+// traceCtx attaches trace to ctx and returns the new context.
+func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
// Check the validity of client preface.
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(t.conn, preface); err != nil {
@@ -279,7 +323,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
- if t.operateHeaders(frame, handle) {
+ if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
break
}
@@ -492,9 +536,16 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
}
}
+ bufLen := t.hBuf.Len()
if err := t.writeHeaders(s, t.hBuf, false); err != nil {
return err
}
+ if stats.On() {
+ outHeader := &stats.OutHeader{
+ WireLength: bufLen,
+ }
+ stats.HandleRPC(s.Context(), outHeader)
+ }
t.writableChan <- 0
return nil
}
@@ -547,10 +598,17 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
}
}
+ bufLen := t.hBuf.Len()
if err := t.writeHeaders(s, t.hBuf, true); err != nil {
t.Close()
return err
}
+ if stats.On() {
+ outTrailer := &stats.OutTrailer{
+ WireLength: bufLen,
+ }
+ stats.HandleRPC(s.Context(), outTrailer)
+ }
t.closeStream(s)
t.writableChan <- 0
return nil
@@ -579,19 +637,14 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
return nil
}
size := http2MaxFrameLen
- s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
- t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
- if _, ok := err.(StreamError); ok {
- t.sendQuotaPool.cancel()
- }
return err
}
if sq < size {
@@ -659,7 +712,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
t.mu.Lock()
defer t.mu.Unlock()
for _, stream := range t.activeStreams {
- stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
+ stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota))
}
t.streamSendQuota = s.Val
}
@@ -736,6 +789,10 @@ func (t *http2Server) Close() (err error) {
for _, s := range streams {
s.cancel()
}
+ if stats.On() {
+ connEnd := &stats.ConnEnd{}
+ stats.HandleConn(t.ctx, connEnd)
+ }
return
}
@@ -767,7 +824,7 @@ func (t *http2Server) closeStream(s *Stream) {
}
func (t *http2Server) RemoteAddr() net.Addr {
- return t.conn.RemoteAddr()
+ return t.remoteAddr
}
func (t *http2Server) Drain() {
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go
index 413f749..4726bb2 100644
--- a/vendor/google.golang.org/grpc/transport/transport.go
+++ b/vendor/google.golang.org/grpc/transport/transport.go
@@ -45,10 +45,10 @@ import (
"sync"
"golang.org/x/net/context"
- "golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/tap"
)
// recvMsg represents the received msg from the transport. All transport
@@ -167,6 +167,11 @@ type Stream struct {
id uint32
// nil for client side Stream.
st ServerTransport
+ // clientStatsCtx keeps the user context for stats handling.
+ // It's only valid on client side. Server side stats context is same as s.ctx.
+ // All client side stats collection should use the clientStatsCtx (instead of the stream context)
+ // so that all the generated stats for a particular RPC can be associated in the processing phase.
+ clientStatsCtx context.Context
// ctx is the associated context of the stream.
ctx context.Context
// cancel is always nil for client side Stream.
@@ -266,11 +271,6 @@ func (s *Stream) Context() context.Context {
return s.ctx
}
-// TraceContext recreates the context of s with a trace.Trace.
-func (s *Stream) TraceContext(tr trace.Trace) {
- s.ctx = trace.NewContext(s.ctx, tr)
-}
-
// Method returns the method for the stream.
func (s *Stream) Method() string {
return s.method
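With Stream.TraceContext removed, attaching a trace is now the job of the traceCtx callback handed to HandleStreams. A hedged sketch of such a callback is below; the trace family string is illustrative.

package sketch

import (
	"golang.org/x/net/context"
	"golang.org/x/net/trace"
)

// attachTrace is the kind of callback a server passes to HandleStreams in
// place of the removed Stream.TraceContext helper: it creates a trace.Trace
// for the method and stores it in the stream's context.
func attachTrace(ctx context.Context, method string) context.Context {
	tr := trace.New("grpc.Recv", method)
	return trace.NewContext(ctx, tr)
}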
@@ -355,10 +355,17 @@ const (
draining
)
+// ServerConfig consists of all the configurations to establish a server transport.
+type ServerConfig struct {
+ MaxStreams uint32
+ AuthInfo credentials.AuthInfo
+ InTapHandle tap.ServerInHandle
+}
+
// NewServerTransport creates a ServerTransport with conn or non-nil error
// if it fails.
-func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (ServerTransport, error) {
- return newHTTP2Server(conn, maxStreams, authInfo)
+func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
+ return newHTTP2Server(conn, config)
}
// ConnectOptions covers all relevant options for communicating with the server.
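A hedged sketch of building a server transport with the new ServerConfig: the conn comes from an accepted listener, the protocol string is passed through NewServerTransport (which, as the hunk above shows, forwards only conn and config), and the commented fields mark where AuthInfo and a tap handle would plug in.

package sketch

import (
	"net"

	"google.golang.org/grpc/transport"
)

// newServerTransport shows the new call shape: per-connection settings travel
// in a single ServerConfig instead of separate arguments. AuthInfo and
// InTapHandle are optional and left unset here.
func newServerTransport(conn net.Conn, maxStreams uint32) (transport.ServerTransport, error) {
	cfg := &transport.ServerConfig{
		MaxStreams: maxStreams,
		// AuthInfo:    credentials.AuthInfo from the TLS handshake, when used
		// InTapHandle: a tap.ServerInHandle such as denyDump above
	}
	return transport.NewServerTransport("http2", conn, cfg)
}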
@@ -367,6 +374,8 @@ type ConnectOptions struct {
UserAgent string
// Dialer specifies how to dial a network address.
Dialer func(context.Context, string) (net.Conn, error)
+ // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
+ FailOnNonTempDialError bool
// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
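A hedged sketch of ConnectOptions using the new flag: with FailOnNonTempDialError set, a permanent dial failure is surfaced as a non-temporary connection error instead of always being treated as retryable. The dialer and its timeout are illustrative, and a production dialer should honor ctx cancellation.

package sketch

import (
	"net"
	"time"

	"golang.org/x/net/context"

	"google.golang.org/grpc/transport"
)

// dialOpts builds ConnectOptions that surface permanent dial failures as
// non-temporary connection errors so the caller can stop retrying.
func dialOpts() transport.ConnectOptions {
	return transport.ConnectOptions{
		Dialer: func(ctx context.Context, addr string) (net.Conn, error) {
			// Minimal dialer; the real one should honor ctx cancellation.
			return net.DialTimeout("tcp", addr, 20*time.Second)
		},
		FailOnNonTempDialError: true,
	}
}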
@@ -466,7 +475,7 @@ type ClientTransport interface {
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
// HandleStreams receives incoming streams using the given handler.
- HandleStreams(func(*Stream))
+ HandleStreams(func(*Stream), func(context.Context, string) context.Context)
// WriteHeader sends the header metadata for the given stream.
// WriteHeader may not be called on all streams.