aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/rpc_util.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/rpc_util.go')
-rw-r--r--vendor/google.golang.org/grpc/rpc_util.go415
1 files changed, 415 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
new file mode 100644
index 0000000..06544ad
--- /dev/null
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -0,0 +1,415 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package grpc
+
+import (
+ "bytes"
+ "compress/gzip"
+ "encoding/binary"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "math"
+ "os"
+
+ "github.com/golang/protobuf/proto"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/transport"
+)
+
+// Codec defines the interface gRPC uses to encode and decode messages.
+type Codec interface {
+ // Marshal returns the wire format of v.
+ Marshal(v interface{}) ([]byte, error)
+ // Unmarshal parses the wire format into v.
+ Unmarshal(data []byte, v interface{}) error
+ // String returns the name of the Codec implementation. The returned
+ // string will be used as part of content type in transmission.
+ String() string
+}
+
+// protoCodec is a Codec implemetation with protobuf. It is the default codec for gRPC.
+type protoCodec struct{}
+
+func (protoCodec) Marshal(v interface{}) ([]byte, error) {
+ return proto.Marshal(v.(proto.Message))
+}
+
+func (protoCodec) Unmarshal(data []byte, v interface{}) error {
+ return proto.Unmarshal(data, v.(proto.Message))
+}
+
+func (protoCodec) String() string {
+ return "proto"
+}
+
+// Compressor defines the interface gRPC uses to compress a message.
+type Compressor interface {
+ // Do compresses p into w.
+ Do(w io.Writer, p []byte) error
+ // Type returns the compression algorithm the Compressor uses.
+ Type() string
+}
+
+// NewGZIPCompressor creates a Compressor based on GZIP.
+func NewGZIPCompressor() Compressor {
+ return &gzipCompressor{}
+}
+
+type gzipCompressor struct {
+}
+
+func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
+ z := gzip.NewWriter(w)
+ if _, err := z.Write(p); err != nil {
+ return err
+ }
+ return z.Close()
+}
+
+func (c *gzipCompressor) Type() string {
+ return "gzip"
+}
+
+// Decompressor defines the interface gRPC uses to decompress a message.
+type Decompressor interface {
+ // Do reads the data from r and uncompress them.
+ Do(r io.Reader) ([]byte, error)
+ // Type returns the compression algorithm the Decompressor uses.
+ Type() string
+}
+
+type gzipDecompressor struct {
+}
+
+// NewGZIPDecompressor creates a Decompressor based on GZIP.
+func NewGZIPDecompressor() Decompressor {
+ return &gzipDecompressor{}
+}
+
+func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) {
+ z, err := gzip.NewReader(r)
+ if err != nil {
+ return nil, err
+ }
+ defer z.Close()
+ return ioutil.ReadAll(z)
+}
+
+func (d *gzipDecompressor) Type() string {
+ return "gzip"
+}
+
+// callInfo contains all related configuration and information about an RPC.
+type callInfo struct {
+ failFast bool
+ headerMD metadata.MD
+ trailerMD metadata.MD
+ traceInfo traceInfo // in trace.go
+}
+
+// CallOption configures a Call before it starts or extracts information from
+// a Call after it completes.
+type CallOption interface {
+ // before is called before the call is sent to any server. If before
+ // returns a non-nil error, the RPC fails with that error.
+ before(*callInfo) error
+
+ // after is called after the call has completed. after cannot return an
+ // error, so any failures should be reported via output parameters.
+ after(*callInfo)
+}
+
+type beforeCall func(c *callInfo) error
+
+func (o beforeCall) before(c *callInfo) error { return o(c) }
+func (o beforeCall) after(c *callInfo) {}
+
+type afterCall func(c *callInfo)
+
+func (o afterCall) before(c *callInfo) error { return nil }
+func (o afterCall) after(c *callInfo) { o(c) }
+
+// Header returns a CallOptions that retrieves the header metadata
+// for a unary RPC.
+func Header(md *metadata.MD) CallOption {
+ return afterCall(func(c *callInfo) {
+ *md = c.headerMD
+ })
+}
+
+// Trailer returns a CallOptions that retrieves the trailer metadata
+// for a unary RPC.
+func Trailer(md *metadata.MD) CallOption {
+ return afterCall(func(c *callInfo) {
+ *md = c.trailerMD
+ })
+}
+
+// The format of the payload: compressed or not?
+type payloadFormat uint8
+
+const (
+ compressionNone payloadFormat = iota // no compression
+ compressionMade
+)
+
+// parser reads complelete gRPC messages from the underlying reader.
+type parser struct {
+ // r is the underlying reader.
+ // See the comment on recvMsg for the permissible
+ // error types.
+ r io.Reader
+
+ // The header of a gRPC message. Find more detail
+ // at http://www.grpc.io/docs/guides/wire.html.
+ header [5]byte
+}
+
+// recvMsg reads a complete gRPC message from the stream.
+//
+// It returns the message and its payload (compression/encoding)
+// format. The caller owns the returned msg memory.
+//
+// If there is an error, possible values are:
+// * io.EOF, when no messages remain
+// * io.ErrUnexpectedEOF
+// * of type transport.ConnectionError
+// * of type transport.StreamError
+// No other error values or types must be returned, which also means
+// that the underlying io.Reader must not return an incompatible
+// error.
+func (p *parser) recvMsg() (pf payloadFormat, msg []byte, err error) {
+ if _, err := io.ReadFull(p.r, p.header[:]); err != nil {
+ return 0, nil, err
+ }
+
+ pf = payloadFormat(p.header[0])
+ length := binary.BigEndian.Uint32(p.header[1:])
+
+ if length == 0 {
+ return pf, nil, nil
+ }
+ // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
+ // of making it for each message:
+ msg = make([]byte, int(length))
+ if _, err := io.ReadFull(p.r, msg); err != nil {
+ if err == io.EOF {
+ err = io.ErrUnexpectedEOF
+ }
+ return 0, nil, err
+ }
+ return pf, msg, nil
+}
+
+// encode serializes msg and prepends the message header. If msg is nil, it
+// generates the message header of 0 message length.
+func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte, error) {
+ var b []byte
+ var length uint
+ if msg != nil {
+ var err error
+ // TODO(zhaoq): optimize to reduce memory alloc and copying.
+ b, err = c.Marshal(msg)
+ if err != nil {
+ return nil, err
+ }
+ if cp != nil {
+ if err := cp.Do(cbuf, b); err != nil {
+ return nil, err
+ }
+ b = cbuf.Bytes()
+ }
+ length = uint(len(b))
+ }
+ if length > math.MaxUint32 {
+ return nil, Errorf(codes.InvalidArgument, "grpc: message too large (%d bytes)", length)
+ }
+
+ const (
+ payloadLen = 1
+ sizeLen = 4
+ )
+
+ var buf = make([]byte, payloadLen+sizeLen+len(b))
+
+ // Write payload format
+ if cp == nil {
+ buf[0] = byte(compressionNone)
+ } else {
+ buf[0] = byte(compressionMade)
+ }
+ // Write length of b into buf
+ binary.BigEndian.PutUint32(buf[1:], uint32(length))
+ // Copy encoded msg to buf
+ copy(buf[5:], b)
+
+ return buf, nil
+}
+
+func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) error {
+ switch pf {
+ case compressionNone:
+ case compressionMade:
+ if dc == nil || recvCompress != dc.Type() {
+ return transport.StreamErrorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
+ }
+ default:
+ return transport.StreamErrorf(codes.Internal, "grpc: received unexpected payload format %d", pf)
+ }
+ return nil
+}
+
+func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}) error {
+ pf, d, err := p.recvMsg()
+ if err != nil {
+ return err
+ }
+ if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
+ return err
+ }
+ if pf == compressionMade {
+ d, err = dc.Do(bytes.NewReader(d))
+ if err != nil {
+ return transport.StreamErrorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
+ }
+ }
+ if err := c.Unmarshal(d, m); err != nil {
+ return transport.StreamErrorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
+ }
+ return nil
+}
+
+// rpcError defines the status from an RPC.
+type rpcError struct {
+ code codes.Code
+ desc string
+}
+
+func (e rpcError) Error() string {
+ return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc)
+}
+
+// Code returns the error code for err if it was produced by the rpc system.
+// Otherwise, it returns codes.Unknown.
+func Code(err error) codes.Code {
+ if err == nil {
+ return codes.OK
+ }
+ if e, ok := err.(rpcError); ok {
+ return e.code
+ }
+ return codes.Unknown
+}
+
+// ErrorDesc returns the error description of err if it was produced by the rpc system.
+// Otherwise, it returns err.Error() or empty string when err is nil.
+func ErrorDesc(err error) string {
+ if err == nil {
+ return ""
+ }
+ if e, ok := err.(rpcError); ok {
+ return e.desc
+ }
+ return err.Error()
+}
+
+// Errorf returns an error containing an error code and a description;
+// Errorf returns nil if c is OK.
+func Errorf(c codes.Code, format string, a ...interface{}) error {
+ if c == codes.OK {
+ return nil
+ }
+ return rpcError{
+ code: c,
+ desc: fmt.Sprintf(format, a...),
+ }
+}
+
+// toRPCErr converts an error into a rpcError.
+func toRPCErr(err error) error {
+ switch e := err.(type) {
+ case rpcError:
+ return err
+ case transport.StreamError:
+ return rpcError{
+ code: e.Code,
+ desc: e.Desc,
+ }
+ case transport.ConnectionError:
+ return rpcError{
+ code: codes.Internal,
+ desc: e.Desc,
+ }
+ }
+ return Errorf(codes.Unknown, "%v", err)
+}
+
+// convertCode converts a standard Go error into its canonical code. Note that
+// this is only used to translate the error returned by the server applications.
+func convertCode(err error) codes.Code {
+ switch err {
+ case nil:
+ return codes.OK
+ case io.EOF:
+ return codes.OutOfRange
+ case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF:
+ return codes.FailedPrecondition
+ case os.ErrInvalid:
+ return codes.InvalidArgument
+ case context.Canceled:
+ return codes.Canceled
+ case context.DeadlineExceeded:
+ return codes.DeadlineExceeded
+ }
+ switch {
+ case os.IsExist(err):
+ return codes.AlreadyExists
+ case os.IsNotExist(err):
+ return codes.NotFound
+ case os.IsPermission(err):
+ return codes.PermissionDenied
+ }
+ return codes.Unknown
+}
+
+// SupportPackageIsVersion2 is referenced from generated protocol buffer files
+// to assert that that code is compatible with this version of the grpc package.
+//
+// This constant may be renamed in the future if a change in the generated code
+// requires a synchronised update of grpc-go and protoc-gen-go. This constant
+// should not be referenced from any other code.
+const SupportPackageIsVersion2 = true