diff options
author | Niall Sheridan <nsheridan@gmail.com> | 2017-10-18 13:15:14 +0100 |
---|---|---|
committer | Niall Sheridan <niall@intercom.io> | 2017-10-18 13:25:46 +0100 |
commit | 7b320119ba532fd409ec7dade7ad02011c309599 (patch) | |
tree | a39860f35b55e6cc499f8f5bfa969138c5dd6b73 /vendor/google.golang.org/grpc/call.go | |
parent | 7c99874c7a3e7a89716f3ee0cdf696532e35ae35 (diff) |
Update dependencies
Diffstat (limited to 'vendor/google.golang.org/grpc/call.go')
-rw-r--r-- | vendor/google.golang.org/grpc/call.go | 177 |
1 files changed, 97 insertions, 80 deletions
diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go index 13ca5b7..1ef2507 100644 --- a/vendor/google.golang.org/grpc/call.go +++ b/vendor/google.golang.org/grpc/call.go @@ -1,33 +1,18 @@ /* * - * Copyright 2014, Google Inc. - * All rights reserved. + * Copyright 2014 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ @@ -40,6 +25,7 @@ import ( "golang.org/x/net/context" "golang.org/x/net/trace" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/peer" "google.golang.org/grpc/stats" @@ -73,7 +59,10 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran } } for { - if err = recv(p, dopts.codec, stream, dopts.dc, reply, dopts.maxMsgSize, inPayload); err != nil { + if c.maxReceiveMessageSize == nil { + return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") + } + if err = recv(p, dopts.codec, stream, dopts.dc, reply, *c.maxReceiveMessageSize, inPayload); err != nil { if err == io.EOF { break } @@ -86,18 +75,11 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran dopts.copts.StatsHandler.HandleRPC(ctx, inPayload) } c.trailerMD = stream.Trailer() - if peer, ok := peer.FromContext(stream.Context()); ok { - c.peer = peer - } return nil } // sendRequest writes out various information of an RPC such as Context and Message. -func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) { - stream, err := t.NewStream(ctx, callHdr) - if err != nil { - return nil, err - } +func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, c *callInfo, callHdr *transport.CallHdr, stream *transport.Stream, t transport.ClientTransport, args interface{}, opts *transport.Options) (err error) { defer func() { if err != nil { // If err is connection error, t will be closed, no need to close stream here. @@ -118,11 +100,17 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, Client: true, } } - outBuf, err := encode(dopts.codec, args, compressor, cbuf, outPayload) + hdr, data, err := encode(dopts.codec, args, compressor, cbuf, outPayload) if err != nil { - return nil, Errorf(codes.Internal, "grpc: %v", err) + return err + } + if c.maxSendMessageSize == nil { + return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") } - err = t.Write(stream, outBuf, opts) + if len(data) > *c.maxSendMessageSize { + return Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), *c.maxSendMessageSize) + } + err = t.Write(stream, hdr, data, opts) if err == nil && outPayload != nil { outPayload.SentTime = time.Now() dopts.copts.StatsHandler.HandleRPC(ctx, outPayload) @@ -131,10 +119,10 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, // does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following // recvResponse to get the final status. if err != nil && err != io.EOF { - return nil, err + return err } // Sent successfully. - return stream, nil + return nil } // Invoke sends the RPC request on the wire and returns after response is received. @@ -148,25 +136,33 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) { - c := defaultCallInfo - if mc, ok := cc.getMethodConfig(method); ok { - c.failFast = !mc.WaitForReady - if mc.Timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, mc.Timeout) - defer cancel() - } + c := defaultCallInfo() + mc := cc.GetMethodConfig(method) + if mc.WaitForReady != nil { + c.failFast = !*mc.WaitForReady } + + if mc.Timeout != nil && *mc.Timeout >= 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) + defer cancel() + } + + opts = append(cc.dopts.callOptions, opts...) for _, o := range opts { - if err := o.before(&c); err != nil { + if err := o.before(c); err != nil { return toRPCErr(err) } } defer func() { for _, o := range opts { - o.after(&c) + o.after(c) } }() + + c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize) + c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) + if EnableTracing { c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) defer c.traceInfo.tr.Finish() @@ -183,26 +179,25 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } }() } + ctx = newContextWithRPCInfo(ctx, c.failFast) sh := cc.dopts.copts.StatsHandler if sh != nil { - ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method}) + ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) begin := &stats.Begin{ Client: true, BeginTime: time.Now(), FailFast: c.failFast, } sh.HandleRPC(ctx, begin) - } - defer func() { - if sh != nil { + defer func() { end := &stats.End{ Client: true, EndTime: time.Now(), Error: e, } sh.HandleRPC(ctx, end) - } - }() + }() + } topts := &transport.Options{ Last: true, Delay: false, @@ -212,9 +207,9 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli err error t transport.ClientTransport stream *transport.Stream - // Record the put handler from Balancer.Get(...). It is called once the + // Record the done handler from Balancer.Get(...). It is called once the // RPC has completed or failed. - put func() + done func(balancer.DoneInfo) ) // TODO(zhaoq): Need a formal spec of fail-fast. callHdr := &transport.CallHdr{ @@ -224,11 +219,11 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli if cc.dopts.cp != nil { callHdr.SendCompress = cc.dopts.cp.Type() } - - gopts := BalancerGetOptions{ - BlockingWait: !c.failFast, + if c.creds != nil { + callHdr.Creds = c.creds } - t, put, err = cc.getTransport(ctx, gopts) + + t, done, err = cc.getTransport(ctx, c.failFast) if err != nil { // TODO(zhaoq): Probably revisit the error handling. if _, ok := status.FromError(err); ok { @@ -246,33 +241,52 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli if c.traceInfo.tr != nil { c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) } - stream, err = sendRequest(ctx, cc.dopts, cc.dopts.cp, callHdr, t, args, topts) + stream, err = t.NewStream(ctx, callHdr) if err != nil { - if put != nil { - put() - put = nil + if done != nil { + if _, ok := err.(transport.ConnectionError); ok { + // If error is connection error, transport was sending data on wire, + // and we are not sure if anything has been sent on wire. + // If error is not connection error, we are sure nothing has been sent. + updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false}) + } + done(balancer.DoneInfo{Err: err}) + } + if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { + continue + } + return toRPCErr(err) + } + if peer, ok := peer.FromContext(stream.Context()); ok { + c.peer = peer + } + err = sendRequest(ctx, cc.dopts, cc.dopts.cp, c, callHdr, stream, t, args, topts) + if err != nil { + if done != nil { + updateRPCInfoInContext(ctx, rpcInfo{ + bytesSent: stream.BytesSent(), + bytesReceived: stream.BytesReceived(), + }) + done(balancer.DoneInfo{Err: err}) } // Retry a non-failfast RPC when // i) there is a connection error; or // ii) the server started to drain before this RPC was initiated. - if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain { - if c.failFast { - return toRPCErr(err) - } + if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { continue } return toRPCErr(err) } - err = recvResponse(ctx, cc.dopts, t, &c, stream, reply) + err = recvResponse(ctx, cc.dopts, t, c, stream, reply) if err != nil { - if put != nil { - put() - put = nil + if done != nil { + updateRPCInfoInContext(ctx, rpcInfo{ + bytesSent: stream.BytesSent(), + bytesReceived: stream.BytesReceived(), + }) + done(balancer.DoneInfo{Err: err}) } - if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain { - if c.failFast { - return toRPCErr(err) - } + if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { continue } return toRPCErr(err) @@ -281,9 +295,12 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true) } t.CloseStream(stream, nil) - if put != nil { - put() - put = nil + if done != nil { + updateRPCInfoInContext(ctx, rpcInfo{ + bytesSent: stream.BytesSent(), + bytesReceived: stream.BytesReceived(), + }) + done(balancer.DoneInfo{Err: err}) } return stream.Status().Err() } |