aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/rpc_util.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/rpc_util.go')
-rw-r--r--vendor/google.golang.org/grpc/rpc_util.go74
1 files changed, 68 insertions, 6 deletions
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
index 6b60095..2619d39 100644
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -42,11 +42,13 @@ import (
"io/ioutil"
"math"
"os"
+ "time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@@ -255,9 +257,11 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro
// encode serializes msg and prepends the message header. If msg is nil, it
// generates the message header of 0 message length.
-func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte, error) {
- var b []byte
- var length uint
+func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayload *stats.OutPayload) ([]byte, error) {
+ var (
+ b []byte
+ length uint
+ )
if msg != nil {
var err error
// TODO(zhaoq): optimize to reduce memory alloc and copying.
@@ -265,6 +269,12 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte
if err != nil {
return nil, err
}
+ if outPayload != nil {
+ outPayload.Payload = msg
+ // TODO truncate large payload.
+ outPayload.Data = b
+ outPayload.Length = len(b)
+ }
if cp != nil {
if err := cp.Do(cbuf, b); err != nil {
return nil, err
@@ -295,6 +305,10 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte
// Copy encoded msg to buf
copy(buf[5:], b)
+ if outPayload != nil {
+ outPayload.WireLength = len(buf)
+ }
+
return buf, nil
}
@@ -311,11 +325,14 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er
return nil
}
-func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int) error {
+func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error {
pf, d, err := p.recvMsg(maxMsgSize)
if err != nil {
return err
}
+ if inPayload != nil {
+ inPayload.WireLength = len(d)
+ }
if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
return err
}
@@ -333,6 +350,13 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
if err := c.Unmarshal(d, m); err != nil {
return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
}
+ if inPayload != nil {
+ inPayload.RecvTime = time.Now()
+ inPayload.Payload = m
+ // TODO truncate large payload.
+ inPayload.Data = d
+ inPayload.Length = len(d)
+ }
return nil
}
@@ -448,10 +472,48 @@ func convertCode(err error) codes.Code {
return codes.Unknown
}
-// SupportPackageIsVersion3 is referenced from generated protocol buffer files
+// MethodConfig defines the configuration recommended by the service providers for a
+// particular method.
+// This is EXPERIMENTAL and subject to change.
+type MethodConfig struct {
+ // WaitForReady indicates whether RPCs sent to this method should wait until
+ // the connection is ready by default (!failfast). The value specified via the
+ // gRPC client API will override the value set here.
+ WaitForReady bool
+ // Timeout is the default timeout for RPCs sent to this method. The actual
+ // deadline used will be the minimum of the value specified here and the value
+ // set by the application via the gRPC client API. If either one is not set,
+ // then the other will be used. If neither is set, then the RPC has no deadline.
+ Timeout time.Duration
+ // MaxReqSize is the maximum allowed payload size for an individual request in a
+ // stream (client->server) in bytes. The size which is measured is the serialized,
+ // uncompressed payload in bytes. The actual value used is the minumum of the value
+ // specified here and the value set by the application via the gRPC client API. If
+ // either one is not set, then the other will be used. If neither is set, then the
+ // built-in default is used.
+ // TODO: support this.
+ MaxReqSize uint64
+ // MaxRespSize is the maximum allowed payload size for an individual response in a
+ // stream (server->client) in bytes.
+ // TODO: support this.
+ MaxRespSize uint64
+}
+
+// ServiceConfig is provided by the service provider and contains parameters for how
+// clients that connect to the service should behave.
+// This is EXPERIMENTAL and subject to change.
+type ServiceConfig struct {
+ // LB is the load balancer the service providers recommends. The balancer specified
+ // via grpc.WithBalancer will override this.
+ LB Balancer
+ // Methods contains a map for the methods in this service.
+ Methods map[string]MethodConfig
+}
+
+// SupportPackageIsVersion4 is referenced from generated protocol buffer files
// to assert that that code is compatible with this version of the grpc package.
//
// This constant may be renamed in the future if a change in the generated code
// requires a synchronised update of grpc-go and protoc-gen-go. This constant
// should not be referenced from any other code.
-const SupportPackageIsVersion3 = true
+const SupportPackageIsVersion4 = true