aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org')
-rw-r--r--vendor/google.golang.org/api/gensupport/resumable.go32
-rw-r--r--vendor/google.golang.org/api/gensupport/retry.go8
-rw-r--r--vendor/google.golang.org/api/googleapi/transport/apikey.go38
-rw-r--r--vendor/google.golang.org/api/googleapi/types.go20
-rw-r--r--vendor/google.golang.org/api/internal/settings.go1
-rw-r--r--vendor/google.golang.org/api/oauth2/v2/oauth2-api.json8
-rw-r--r--vendor/google.golang.org/api/oauth2/v2/oauth2-gen.go52
-rw-r--r--vendor/google.golang.org/api/option/option.go10
-rw-r--r--vendor/google.golang.org/api/storage/v1/storage-api.json9
-rw-r--r--vendor/google.golang.org/api/storage/v1/storage-gen.go460
-rw-r--r--vendor/google.golang.org/api/transport/dial.go10
-rw-r--r--vendor/google.golang.org/appengine/internal/api.go3
-rw-r--r--vendor/google.golang.org/grpc/README.md25
-rw-r--r--vendor/google.golang.org/grpc/call.go69
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go86
-rwxr-xr-xvendor/google.golang.org/grpc/codegen.sh2
-rwxr-xr-xvendor/google.golang.org/grpc/coverage.sh3
-rw-r--r--vendor/google.golang.org/grpc/credentials/oauth/oauth.go2
-rw-r--r--vendor/google.golang.org/grpc/metadata/metadata.go2
-rw-r--r--vendor/google.golang.org/grpc/rpc_util.go74
-rw-r--r--vendor/google.golang.org/grpc/server.go154
-rw-r--r--vendor/google.golang.org/grpc/stats/handlers.go146
-rw-r--r--vendor/google.golang.org/grpc/stats/stats.go223
-rw-r--r--vendor/google.golang.org/grpc/stream.go146
-rw-r--r--vendor/google.golang.org/grpc/tap/tap.go54
-rw-r--r--vendor/google.golang.org/grpc/transport/control.go34
-rw-r--r--vendor/google.golang.org/grpc/transport/handler_server.go2
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_client.go94
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_server.go87
-rw-r--r--vendor/google.golang.org/grpc/transport/transport.go27
30 files changed, 1708 insertions, 173 deletions
diff --git a/vendor/google.golang.org/api/gensupport/resumable.go b/vendor/google.golang.org/api/gensupport/resumable.go
index 695e365..dcd591f 100644
--- a/vendor/google.golang.org/api/gensupport/resumable.go
+++ b/vendor/google.golang.org/api/gensupport/resumable.go
@@ -5,6 +5,7 @@
package gensupport
import (
+ "errors"
"fmt"
"io"
"net/http"
@@ -15,10 +16,6 @@ import (
)
const (
- // statusResumeIncomplete is the code returned by the Google uploader
- // when the transfer is not yet complete.
- statusResumeIncomplete = 308
-
// statusTooManyRequests is returned by the storage API if the
// per-project limits have been temporarily exceeded. The request
// should be retried.
@@ -79,8 +76,23 @@ func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader,
req.Header.Set("Content-Range", contentRange)
req.Header.Set("Content-Type", rx.MediaType)
req.Header.Set("User-Agent", rx.UserAgent)
+
+ // Google's upload endpoint uses status code 308 for a
+ // different purpose than the "308 Permanent Redirect"
+ // since-standardized in RFC 7238. Because of the conflict in
+ // semantics, Google added this new request header which
+ // causes it to not use "308" and instead reply with 200 OK
+ // and sets the upload-specific "X-HTTP-Status-Code-Override:
+ // 308" response header.
+ req.Header.Set("X-GUploader-No-308", "yes")
+
return SendRequest(ctx, rx.Client, req)
+}
+func statusResumeIncomplete(resp *http.Response) bool {
+ // This is how the server signals "status resume incomplete"
+ // when X-GUploader-No-308 is set to "yes":
+ return resp != nil && resp.Header.Get("X-Http-Status-Code-Override") == "308"
}
// reportProgress calls a user-supplied callback to report upload progress.
@@ -111,11 +123,17 @@ func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, e
return res, err
}
- if res.StatusCode == statusResumeIncomplete || res.StatusCode == http.StatusOK {
+ // We sent "X-GUploader-No-308: yes" (see comment elsewhere in
+ // this file), so we don't expect to get a 308.
+ if res.StatusCode == 308 {
+ return nil, errors.New("unexpected 308 response status code")
+ }
+
+ if res.StatusCode == http.StatusOK {
rx.reportProgress(off, off+int64(size))
}
- if res.StatusCode == statusResumeIncomplete {
+ if statusResumeIncomplete(res) {
rx.Media.Next()
}
return res, nil
@@ -177,7 +195,7 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err
// If the chunk was uploaded successfully, but there's still
// more to go, upload the next chunk without any delay.
- if status == statusResumeIncomplete {
+ if statusResumeIncomplete(resp) {
pause = 0
backoff.Reset()
resp.Body.Close()
diff --git a/vendor/google.golang.org/api/gensupport/retry.go b/vendor/google.golang.org/api/gensupport/retry.go
index 7f83d1d..9023368 100644
--- a/vendor/google.golang.org/api/gensupport/retry.go
+++ b/vendor/google.golang.org/api/gensupport/retry.go
@@ -55,23 +55,17 @@ func DefaultBackoffStrategy() BackoffStrategy {
// shouldRetry returns true if the HTTP response / error indicates that the
// request should be attempted again.
func shouldRetry(status int, err error) bool {
- // Retry for 5xx response codes.
- if 500 <= status && status < 600 {
+ if 500 <= status && status <= 599 {
return true
}
-
- // Retry on statusTooManyRequests{
if status == statusTooManyRequests {
return true
}
-
- // Retry on unexpected EOFs and temporary network errors.
if err == io.ErrUnexpectedEOF {
return true
}
if err, ok := err.(net.Error); ok {
return err.Temporary()
}
-
return false
}
diff --git a/vendor/google.golang.org/api/googleapi/transport/apikey.go b/vendor/google.golang.org/api/googleapi/transport/apikey.go
new file mode 100644
index 0000000..eca1ea2
--- /dev/null
+++ b/vendor/google.golang.org/api/googleapi/transport/apikey.go
@@ -0,0 +1,38 @@
+// Copyright 2012 Google Inc. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package transport contains HTTP transports used to make
+// authenticated API requests.
+package transport
+
+import (
+ "errors"
+ "net/http"
+)
+
+// APIKey is an HTTP Transport which wraps an underlying transport and
+// appends an API Key "key" parameter to the URL of outgoing requests.
+type APIKey struct {
+ // Key is the API Key to set on requests.
+ Key string
+
+ // Transport is the underlying HTTP transport.
+ // If nil, http.DefaultTransport is used.
+ Transport http.RoundTripper
+}
+
+func (t *APIKey) RoundTrip(req *http.Request) (*http.Response, error) {
+ rt := t.Transport
+ if rt == nil {
+ rt = http.DefaultTransport
+ if rt == nil {
+ return nil, errors.New("googleapi/transport: no Transport specified or available")
+ }
+ }
+ newReq := *req
+ args := newReq.URL.Query()
+ args.Set("key", t.Key)
+ newReq.URL.RawQuery = args.Encode()
+ return rt.RoundTrip(&newReq)
+}
diff --git a/vendor/google.golang.org/api/googleapi/types.go b/vendor/google.golang.org/api/googleapi/types.go
index a02b4b0..c8fdd54 100644
--- a/vendor/google.golang.org/api/googleapi/types.go
+++ b/vendor/google.golang.org/api/googleapi/types.go
@@ -6,6 +6,7 @@ package googleapi
import (
"encoding/json"
+ "errors"
"strconv"
)
@@ -149,6 +150,25 @@ func (s Float64s) MarshalJSON() ([]byte, error) {
})
}
+// RawMessage is a raw encoded JSON value.
+// It is identical to json.RawMessage, except it does not suffer from
+// https://golang.org/issue/14493.
+type RawMessage []byte
+
+// MarshalJSON returns m.
+func (m RawMessage) MarshalJSON() ([]byte, error) {
+ return m, nil
+}
+
+// UnmarshalJSON sets *m to a copy of data.
+func (m *RawMessage) UnmarshalJSON(data []byte) error {
+ if m == nil {
+ return errors.New("googleapi.RawMessage: UnmarshalJSON on nil pointer")
+ }
+ *m = append((*m)[:0], data...)
+ return nil
+}
+
/*
* Helper routines for simplifying the creation of optional fields of basic type.
*/
diff --git a/vendor/google.golang.org/api/internal/settings.go b/vendor/google.golang.org/api/internal/settings.go
index 976280b..6e60e48 100644
--- a/vendor/google.golang.org/api/internal/settings.go
+++ b/vendor/google.golang.org/api/internal/settings.go
@@ -16,6 +16,7 @@ type DialSettings struct {
ServiceAccountJSONFilename string // if set, TokenSource is ignored.
TokenSource oauth2.TokenSource
UserAgent string
+ APIKey string
HTTPClient *http.Client
GRPCDialOpts []grpc.DialOption
GRPCConn *grpc.ClientConn
diff --git a/vendor/google.golang.org/api/oauth2/v2/oauth2-api.json b/vendor/google.golang.org/api/oauth2/v2/oauth2-api.json
index fb63f08..f1f8beb 100644
--- a/vendor/google.golang.org/api/oauth2/v2/oauth2-api.json
+++ b/vendor/google.golang.org/api/oauth2/v2/oauth2-api.json
@@ -1,18 +1,18 @@
{
"kind": "discovery#restDescription",
- "etag": "\"jQLIOHBVnDZie4rQHGH1WJF-INE/VY5CxwtmcPmH-ruiSL2amW9TN0Q\"",
+ "etag": "\"tbys6C40o18GZwyMen5GMkdK-3s/DLGEtypjIuIXh77Iqrmfcan50ew\"",
"discoveryVersion": "v1",
"id": "oauth2:v2",
"name": "oauth2",
"version": "v2",
- "revision": "20160330",
+ "revision": "20161103",
"title": "Google OAuth2 API",
"description": "Obtains end-user authorization grants for use with other Google APIs.",
"ownerDomain": "google.com",
"ownerName": "Google",
"icons": {
- "x16": "http://www.google.com/images/icons/product/search-16.gif",
- "x32": "http://www.google.com/images/icons/product/search-32.gif"
+ "x16": "https://www.gstatic.com/images/branding/product/1x/googleg_16dp.png",
+ "x32": "https://www.gstatic.com/images/branding/product/1x/googleg_32dp.png"
},
"documentationLink": "https://developers.google.com/accounts/docs/OAuth2",
"protocol": "rest",
diff --git a/vendor/google.golang.org/api/oauth2/v2/oauth2-gen.go b/vendor/google.golang.org/api/oauth2/v2/oauth2-gen.go
index b1a7694..7440468 100644
--- a/vendor/google.golang.org/api/oauth2/v2/oauth2-gen.go
+++ b/vendor/google.golang.org/api/oauth2/v2/oauth2-gen.go
@@ -317,6 +317,7 @@ type GetCertForOpenIdConnectCall struct {
urlParams_ gensupport.URLParams
ifNoneMatch_ string
ctx_ context.Context
+ header_ http.Header
}
// GetCertForOpenIdConnect:
@@ -351,8 +352,20 @@ func (c *GetCertForOpenIdConnectCall) Context(ctx context.Context) *GetCertForOp
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *GetCertForOpenIdConnectCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *GetCertForOpenIdConnectCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
if c.ifNoneMatch_ != "" {
reqHeaders.Set("If-None-Match", c.ifNoneMatch_)
@@ -420,6 +433,7 @@ type TokeninfoCall struct {
s *Service
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Tokeninfo:
@@ -462,8 +476,20 @@ func (c *TokeninfoCall) Context(ctx context.Context) *TokeninfoCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *TokeninfoCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *TokeninfoCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
c.urlParams_.Set("alt", alt)
@@ -543,6 +569,7 @@ type UserinfoGetCall struct {
urlParams_ gensupport.URLParams
ifNoneMatch_ string
ctx_ context.Context
+ header_ http.Header
}
// Get:
@@ -577,8 +604,20 @@ func (c *UserinfoGetCall) Context(ctx context.Context) *UserinfoGetCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *UserinfoGetCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *UserinfoGetCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
if c.ifNoneMatch_ != "" {
reqHeaders.Set("If-None-Match", c.ifNoneMatch_)
@@ -653,6 +692,7 @@ type UserinfoV2MeGetCall struct {
urlParams_ gensupport.URLParams
ifNoneMatch_ string
ctx_ context.Context
+ header_ http.Header
}
// Get:
@@ -687,8 +727,20 @@ func (c *UserinfoV2MeGetCall) Context(ctx context.Context) *UserinfoV2MeGetCall
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *UserinfoV2MeGetCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *UserinfoV2MeGetCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
if c.ifNoneMatch_ != "" {
reqHeaders.Set("If-None-Match", c.ifNoneMatch_)
diff --git a/vendor/google.golang.org/api/option/option.go b/vendor/google.golang.org/api/option/option.go
index b935e6d..f266919 100644
--- a/vendor/google.golang.org/api/option/option.go
+++ b/vendor/google.golang.org/api/option/option.go
@@ -130,3 +130,13 @@ func (w withGRPCConnectionPool) Apply(o *internal.DialSettings) {
balancer := grpc.RoundRobin(internal.NewPoolResolver(int(w), o))
o.GRPCDialOpts = append(o.GRPCDialOpts, grpc.WithBalancer(balancer))
}
+
+// WithAPIKey returns a ClientOption that specifies an API key to be used
+// as the basis for authentication.
+func WithAPIKey(apiKey string) ClientOption {
+ return withAPIKey(apiKey)
+}
+
+type withAPIKey string
+
+func (w withAPIKey) Apply(o *internal.DialSettings) { o.APIKey = string(w) }
diff --git a/vendor/google.golang.org/api/storage/v1/storage-api.json b/vendor/google.golang.org/api/storage/v1/storage-api.json
index 598d1a5..67fe1a2 100644
--- a/vendor/google.golang.org/api/storage/v1/storage-api.json
+++ b/vendor/google.golang.org/api/storage/v1/storage-api.json
@@ -1,11 +1,11 @@
{
"kind": "discovery#restDescription",
- "etag": "\"C5oy1hgQsABtYOYIOXWcR3BgYqU/G3kZz5Dv92Y-2NZwaNrcr5jwm4A\"",
+ "etag": "\"tbys6C40o18GZwyMen5GMkdK-3s/sMgjc4eoIFjgub4daTU-MGW0WMA\"",
"discoveryVersion": "v1",
"id": "storage:v1",
"name": "storage",
"version": "v1",
- "revision": "20161019",
+ "revision": "20161109",
"title": "Cloud Storage JSON API",
"description": "Stores and retrieves potentially large, immutable data objects.",
"ownerDomain": "google.com",
@@ -685,6 +685,11 @@
"description": "The deletion time of the object in RFC 3339 format. Will be returned if and only if this version of the object has been deleted.",
"format": "date-time"
},
+ "timeStorageClassUpdated": {
+ "type": "string",
+ "description": "The time at which the object's storage class was last changed. When the object is initially created, it will be set to timeCreated.",
+ "format": "date-time"
+ },
"updated": {
"type": "string",
"description": "The modification time of the object metadata in RFC 3339 format.",
diff --git a/vendor/google.golang.org/api/storage/v1/storage-gen.go b/vendor/google.golang.org/api/storage/v1/storage-gen.go
index 4c458af..0f1d094 100644
--- a/vendor/google.golang.org/api/storage/v1/storage-gen.go
+++ b/vendor/google.golang.org/api/storage/v1/storage-gen.go
@@ -1034,6 +1034,11 @@ type Object struct {
// deleted.
TimeDeleted string `json:"timeDeleted,omitempty"`
+ // TimeStorageClassUpdated: The time at which the object's storage class
+ // was last changed. When the object is initially created, it will be
+ // set to timeCreated.
+ TimeStorageClassUpdated string `json:"timeStorageClassUpdated,omitempty"`
+
// Updated: The modification time of the object metadata in RFC 3339
// format.
Updated string `json:"updated,omitempty"`
@@ -1390,6 +1395,7 @@ type BucketAccessControlsDeleteCall struct {
entity string
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Delete: Permanently deletes the ACL entry for the specified entity on
@@ -1417,8 +1423,20 @@ func (c *BucketAccessControlsDeleteCall) Context(ctx context.Context) *BucketAcc
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *BucketAccessControlsDeleteCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *BucketAccessControlsDeleteCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
c.urlParams_.Set("alt", alt)
@@ -1485,6 +1503,7 @@ type BucketAccessControlsGetCall struct {
urlParams_ gensupport.URLParams
ifNoneMatch_ string
ctx_ context.Context
+ header_ http.Header
}
// Get: Returns the ACL entry for the specified entity on the specified
@@ -1522,8 +1541,20 @@ func (c *BucketAccessControlsGetCall) Context(ctx context.Context) *BucketAccess
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *BucketAccessControlsGetCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *BucketAccessControlsGetCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
if c.ifNoneMatch_ != "" {
reqHeaders.Set("If-None-Match", c.ifNoneMatch_)
@@ -1620,6 +1651,7 @@ type BucketAccessControlsInsertCall struct {
bucketaccesscontrol *BucketAccessControl
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Insert: Creates a new ACL entry on the specified bucket.
@@ -1646,8 +1678,20 @@ func (c *BucketAccessControlsInsertCall) Context(ctx context.Context) *BucketAcc
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *BucketAccessControlsInsertCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *BucketAccessControlsInsertCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.bucketaccesscontrol)
@@ -1741,6 +1785,7 @@ type BucketAccessControlsListCall struct {
urlParams_ gensupport.URLParams
ifNoneMatch_ string
ctx_ context.Context
+ header_ http.Header
}
// List: Retrieves ACL entries on the specified bucket.
@@ -1776,8 +1821,20 @@ func (c *BucketAccessControlsListCall) Context(ctx context.Context) *BucketAcces
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *BucketAccessControlsListCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *BucketAccessControlsListCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
if c.ifNoneMatch_ != "" {
reqHeaders.Set("If-None-Match", c.ifNoneMatch_)
@@ -1867,6 +1924,7 @@ type BucketAccessControlsPatchCall struct {
bucketaccesscontrol *BucketAccessControl
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Patch: Updates an ACL entry on the specified bucket. This method
@@ -1895,8 +1953,20 @@ func (c *BucketAccessControlsPatchCall) Context(ctx context.Context) *BucketAcce
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *BucketAccessControlsPatchCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *BucketAccessControlsPatchCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.bucketaccesscontrol)
@@ -1999,6 +2069,7 @@ type BucketAccessControlsUpdateCall struct {
bucketaccesscontrol *BucketAccessControl
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Update: Updates an ACL entry on the specified bucket.
@@ -2026,8 +2097,20 @@ func (c *BucketAccessControlsUpdateCall) Context(ctx context.Context) *BucketAcc
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *BucketAccessControlsUpdateCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *BucketAccessControlsUpdateCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.bucketaccesscontrol)
@@ -2128,6 +2211,7 @@ type BucketsDeleteCall struct {
bucket string
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Delete: Permanently deletes an empty bucket.
@@ -2169,8 +2253,20 @@ func (c *BucketsDeleteCall) Context(ctx context.Context) *BucketsDeleteCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *BucketsDeleteCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *BucketsDeleteCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
c.urlParams_.Set("alt", alt)
@@ -2241,6 +2337,7 @@ type BucketsGetCall struct {
urlParams_ gensupport.URLParams
ifNoneMatch_ string
ctx_ context.Context
+ header_ http.Header
}
// Get: Returns metadata for the specified bucket.
@@ -2305,8 +2402,20 @@ func (c *BucketsGetCall) Context(ctx context.Context) *BucketsGetCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *BucketsGetCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *BucketsGetCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
if c.ifNoneMatch_ != "" {
reqHeaders.Set("If-None-Match", c.ifNoneMatch_)
@@ -2422,6 +2531,7 @@ type BucketsInsertCall struct {
bucket *Bucket
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Insert: Creates a new bucket.
@@ -2500,8 +2610,20 @@ func (c *BucketsInsertCall) Context(ctx context.Context) *BucketsInsertCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *BucketsInsertCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *BucketsInsertCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.bucket)
@@ -2645,6 +2767,7 @@ type BucketsListCall struct {
urlParams_ gensupport.URLParams
ifNoneMatch_ string
ctx_ context.Context
+ header_ http.Header
}
// List: Retrieves a list of buckets for a given project.
@@ -2713,8 +2836,20 @@ func (c *BucketsListCall) Context(ctx context.Context) *BucketsListCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *BucketsListCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *BucketsListCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
if c.ifNoneMatch_ != "" {
reqHeaders.Set("If-None-Match", c.ifNoneMatch_)
@@ -2854,6 +2989,7 @@ type BucketsPatchCall struct {
bucket2 *Bucket
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Patch: Updates a bucket. Changes to the bucket will be readable
@@ -2950,8 +3086,20 @@ func (c *BucketsPatchCall) Context(ctx context.Context) *BucketsPatchCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *BucketsPatchCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *BucketsPatchCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.bucket2)
@@ -3110,6 +3258,7 @@ type BucketsUpdateCall struct {
bucket2 *Bucket
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Update: Updates a bucket. Changes to the bucket will be readable
@@ -3206,8 +3355,20 @@ func (c *BucketsUpdateCall) Context(ctx context.Context) *BucketsUpdateCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *BucketsUpdateCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *BucketsUpdateCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.bucket2)
@@ -3365,6 +3526,7 @@ type ChannelsStopCall struct {
channel *Channel
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Stop: Stop watching resources through this channel
@@ -3390,8 +3552,20 @@ func (c *ChannelsStopCall) Context(ctx context.Context) *ChannelsStopCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ChannelsStopCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ChannelsStopCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.channel)
@@ -3447,6 +3621,7 @@ type DefaultObjectAccessControlsDeleteCall struct {
entity string
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Delete: Permanently deletes the default object ACL entry for the
@@ -3474,8 +3649,20 @@ func (c *DefaultObjectAccessControlsDeleteCall) Context(ctx context.Context) *De
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *DefaultObjectAccessControlsDeleteCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *DefaultObjectAccessControlsDeleteCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
c.urlParams_.Set("alt", alt)
@@ -3542,6 +3729,7 @@ type DefaultObjectAccessControlsGetCall struct {
urlParams_ gensupport.URLParams
ifNoneMatch_ string
ctx_ context.Context
+ header_ http.Header
}
// Get: Returns the default object ACL entry for the specified entity on
@@ -3579,8 +3767,20 @@ func (c *DefaultObjectAccessControlsGetCall) Context(ctx context.Context) *Defau
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *DefaultObjectAccessControlsGetCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *DefaultObjectAccessControlsGetCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
if c.ifNoneMatch_ != "" {
reqHeaders.Set("If-None-Match", c.ifNoneMatch_)
@@ -3677,6 +3877,7 @@ type DefaultObjectAccessControlsInsertCall struct {
objectaccesscontrol *ObjectAccessControl
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Insert: Creates a new default object ACL entry on the specified
@@ -3704,8 +3905,20 @@ func (c *DefaultObjectAccessControlsInsertCall) Context(ctx context.Context) *De
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *DefaultObjectAccessControlsInsertCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *DefaultObjectAccessControlsInsertCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.objectaccesscontrol)
@@ -3799,6 +4012,7 @@ type DefaultObjectAccessControlsListCall struct {
urlParams_ gensupport.URLParams
ifNoneMatch_ string
ctx_ context.Context
+ header_ http.Header
}
// List: Retrieves default object ACL entries on the specified bucket.
@@ -3851,8 +4065,20 @@ func (c *DefaultObjectAccessControlsListCall) Context(ctx context.Context) *Defa
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *DefaultObjectAccessControlsListCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *DefaultObjectAccessControlsListCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
if c.ifNoneMatch_ != "" {
reqHeaders.Set("If-None-Match", c.ifNoneMatch_)
@@ -3954,6 +4180,7 @@ type DefaultObjectAccessControlsPatchCall struct {
objectaccesscontrol *ObjectAccessControl
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Patch: Updates a default object ACL entry on the specified bucket.
@@ -3982,8 +4209,20 @@ func (c *DefaultObjectAccessControlsPatchCall) Context(ctx context.Context) *Def
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *DefaultObjectAccessControlsPatchCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *DefaultObjectAccessControlsPatchCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.objectaccesscontrol)
@@ -4086,6 +4325,7 @@ type DefaultObjectAccessControlsUpdateCall struct {
objectaccesscontrol *ObjectAccessControl
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Update: Updates a default object ACL entry on the specified bucket.
@@ -4113,8 +4353,20 @@ func (c *DefaultObjectAccessControlsUpdateCall) Context(ctx context.Context) *De
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *DefaultObjectAccessControlsUpdateCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *DefaultObjectAccessControlsUpdateCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.objectaccesscontrol)
@@ -4217,6 +4469,7 @@ type ObjectAccessControlsDeleteCall struct {
entity string
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Delete: Permanently deletes the ACL entry for the specified entity on
@@ -4253,8 +4506,20 @@ func (c *ObjectAccessControlsDeleteCall) Context(ctx context.Context) *ObjectAcc
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectAccessControlsDeleteCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectAccessControlsDeleteCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
c.urlParams_.Set("alt", alt)
@@ -4336,6 +4601,7 @@ type ObjectAccessControlsGetCall struct {
urlParams_ gensupport.URLParams
ifNoneMatch_ string
ctx_ context.Context
+ header_ http.Header
}
// Get: Returns the ACL entry for the specified entity on the specified
@@ -4382,8 +4648,20 @@ func (c *ObjectAccessControlsGetCall) Context(ctx context.Context) *ObjectAccess
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectAccessControlsGetCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectAccessControlsGetCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
if c.ifNoneMatch_ != "" {
reqHeaders.Set("If-None-Match", c.ifNoneMatch_)
@@ -4495,6 +4773,7 @@ type ObjectAccessControlsInsertCall struct {
objectaccesscontrol *ObjectAccessControl
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Insert: Creates a new ACL entry on the specified object.
@@ -4530,8 +4809,20 @@ func (c *ObjectAccessControlsInsertCall) Context(ctx context.Context) *ObjectAcc
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectAccessControlsInsertCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectAccessControlsInsertCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.objectaccesscontrol)
@@ -4640,6 +4931,7 @@ type ObjectAccessControlsListCall struct {
urlParams_ gensupport.URLParams
ifNoneMatch_ string
ctx_ context.Context
+ header_ http.Header
}
// List: Retrieves ACL entries on the specified object.
@@ -4684,8 +4976,20 @@ func (c *ObjectAccessControlsListCall) Context(ctx context.Context) *ObjectAcces
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectAccessControlsListCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectAccessControlsListCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
if c.ifNoneMatch_ != "" {
reqHeaders.Set("If-None-Match", c.ifNoneMatch_)
@@ -4790,6 +5094,7 @@ type ObjectAccessControlsPatchCall struct {
objectaccesscontrol *ObjectAccessControl
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Patch: Updates an ACL entry on the specified object. This method
@@ -4827,8 +5132,20 @@ func (c *ObjectAccessControlsPatchCall) Context(ctx context.Context) *ObjectAcce
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectAccessControlsPatchCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectAccessControlsPatchCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.objectaccesscontrol)
@@ -4946,6 +5263,7 @@ type ObjectAccessControlsUpdateCall struct {
objectaccesscontrol *ObjectAccessControl
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Update: Updates an ACL entry on the specified object.
@@ -4982,8 +5300,20 @@ func (c *ObjectAccessControlsUpdateCall) Context(ctx context.Context) *ObjectAcc
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectAccessControlsUpdateCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectAccessControlsUpdateCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.objectaccesscontrol)
@@ -5100,6 +5430,7 @@ type ObjectsComposeCall struct {
composerequest *ComposeRequest
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Compose: Concatenates a list of existing objects into a new object in
@@ -5165,8 +5496,20 @@ func (c *ObjectsComposeCall) Context(ctx context.Context) *ObjectsComposeCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectsComposeCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectsComposeCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.composerequest)
@@ -5323,6 +5666,7 @@ type ObjectsCopyCall struct {
object *Object
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Copy: Copies a source object to a destination object. Optionally
@@ -5464,8 +5808,20 @@ func (c *ObjectsCopyCall) Context(ctx context.Context) *ObjectsCopyCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectsCopyCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectsCopyCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.object)
@@ -5690,6 +6046,7 @@ type ObjectsDeleteCall struct {
object string
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Delete: Deletes an object and its metadata. Deletions are permanent
@@ -5759,8 +6116,20 @@ func (c *ObjectsDeleteCall) Context(ctx context.Context) *ObjectsDeleteCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectsDeleteCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectsDeleteCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
c.urlParams_.Set("alt", alt)
@@ -5858,6 +6227,7 @@ type ObjectsGetCall struct {
urlParams_ gensupport.URLParams
ifNoneMatch_ string
ctx_ context.Context
+ header_ http.Header
}
// Get: Retrieves an object or its metadata.
@@ -5946,8 +6316,20 @@ func (c *ObjectsGetCall) Context(ctx context.Context) *ObjectsGetCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectsGetCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectsGetCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
if c.ifNoneMatch_ != "" {
reqHeaders.Set("If-None-Match", c.ifNoneMatch_)
@@ -6113,6 +6495,7 @@ type ObjectsInsertCall struct {
mediaSize_ int64 // mediaSize, if known. Used only for calls to progressUpdater_.
progressUpdater_ googleapi.ProgressUpdater
ctx_ context.Context
+ header_ http.Header
}
// Insert: Stores a new object and metadata.
@@ -6275,8 +6658,20 @@ func (c *ObjectsInsertCall) Context(ctx context.Context) *ObjectsInsertCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectsInsertCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectsInsertCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.object)
@@ -6505,6 +6900,7 @@ type ObjectsListCall struct {
urlParams_ gensupport.URLParams
ifNoneMatch_ string
ctx_ context.Context
+ header_ http.Header
}
// List: Retrieves a list of objects matching the criteria.
@@ -6594,8 +6990,20 @@ func (c *ObjectsListCall) Context(ctx context.Context) *ObjectsListCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectsListCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectsListCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
if c.ifNoneMatch_ != "" {
reqHeaders.Set("If-None-Match", c.ifNoneMatch_)
@@ -6750,6 +7158,7 @@ type ObjectsPatchCall struct {
object2 *Object
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Patch: Updates an object's metadata. This method supports patch
@@ -6850,8 +7259,20 @@ func (c *ObjectsPatchCall) Context(ctx context.Context) *ObjectsPatchCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectsPatchCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectsPatchCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.object2)
@@ -7020,6 +7441,7 @@ type ObjectsRewriteCall struct {
object *Object
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Rewrite: Rewrites a source object to a destination object. Optionally
@@ -7186,8 +7608,20 @@ func (c *ObjectsRewriteCall) Context(ctx context.Context) *ObjectsRewriteCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectsRewriteCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectsRewriteCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.object)
@@ -7406,6 +7840,7 @@ type ObjectsUpdateCall struct {
object2 *Object
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// Update: Updates an object's metadata.
@@ -7505,8 +7940,20 @@ func (c *ObjectsUpdateCall) Context(ctx context.Context) *ObjectsUpdateCall {
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectsUpdateCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectsUpdateCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.object2)
@@ -7690,6 +8137,7 @@ type ObjectsWatchAllCall struct {
channel *Channel
urlParams_ gensupport.URLParams
ctx_ context.Context
+ header_ http.Header
}
// WatchAll: Watch for changes on all objects in a bucket.
@@ -7770,8 +8218,20 @@ func (c *ObjectsWatchAllCall) Context(ctx context.Context) *ObjectsWatchAllCall
return c
}
+// Header returns an http.Header that can be modified by the caller to
+// add HTTP headers to the request.
+func (c *ObjectsWatchAllCall) Header() http.Header {
+ if c.header_ == nil {
+ c.header_ = make(http.Header)
+ }
+ return c.header_
+}
+
func (c *ObjectsWatchAllCall) doRequest(alt string) (*http.Response, error) {
reqHeaders := make(http.Header)
+ for k, v := range c.header_ {
+ reqHeaders[k] = v
+ }
reqHeaders.Set("User-Agent", c.s.userAgent())
var body io.Reader = nil
body, err := googleapi.WithoutDataWrapper.JSONReader(c.channel)
diff --git a/vendor/google.golang.org/api/transport/dial.go b/vendor/google.golang.org/api/transport/dial.go
index c054460..9971eb8 100644
--- a/vendor/google.golang.org/api/transport/dial.go
+++ b/vendor/google.golang.org/api/transport/dial.go
@@ -30,6 +30,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/oauth"
+ gtransport "google.golang.org/api/googleapi/transport"
"google.golang.org/api/internal"
"google.golang.org/api/option"
)
@@ -49,6 +50,15 @@ func NewHTTPClient(ctx context.Context, opts ...option.ClientOption) (*http.Clie
if o.HTTPClient != nil {
return o.HTTPClient, o.Endpoint, nil
}
+ if o.APIKey != "" {
+ hc := &http.Client{
+ Transport: &gtransport.APIKey{
+ Key: o.APIKey,
+ Transport: http.DefaultTransport,
+ },
+ }
+ return hc, o.Endpoint, nil
+ }
if o.ServiceAccountJSONFilename != "" {
ts, err := serviceAcctTokenSource(ctx, o.ServiceAccountJSONFilename, o.Scopes...)
if err != nil {
diff --git a/vendor/google.golang.org/appengine/internal/api.go b/vendor/google.golang.org/appengine/internal/api.go
index 09562c4..aba0f83 100644
--- a/vendor/google.golang.org/appengine/internal/api.go
+++ b/vendor/google.golang.org/appengine/internal/api.go
@@ -576,6 +576,9 @@ var logLevelName = map[int64]string{
}
func logf(c *context, level int64, format string, args ...interface{}) {
+ if c == nil {
+ panic("not an App Engine context")
+ }
s := fmt.Sprintf(format, args...)
s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
c.addLogLine(&logpb.UserAppLogLine{
diff --git a/vendor/google.golang.org/grpc/README.md b/vendor/google.golang.org/grpc/README.md
index 110a8cf..39120c2 100644
--- a/vendor/google.golang.org/grpc/README.md
+++ b/vendor/google.golang.org/grpc/README.md
@@ -18,6 +18,22 @@ Prerequisites
This requires Go 1.5 or later.
+A note on the version used: significant performance improvements in benchmarks
+of grpc-go have been seen by upgrading the go version from 1.5 to the latest
+1.7.1.
+
+From https://golang.org/doc/install, one way to install the latest version of go is:
+```
+$ GO_VERSION=1.7.1
+$ OS=linux
+$ ARCH=amd64
+$ curl -O https://storage.googleapis.com/golang/go${GO_VERSION}.${OS}-${ARCH}.tar.gz
+$ sudo tar -C /usr/local -xzf go$GO_VERSION.$OS-$ARCH.tar.gz
+$ # Put go on the PATH, keep the usual installation dir
+$ sudo ln -s /usr/local/go/bin/go /usr/bin/go
+$ rm go$GO_VERSION.$OS-$ARCH.tar.gz
+```
+
Constraints
-----------
The grpc package should only depend on standard Go packages and a small number of exceptions. If your contribution introduces new dependencies which are NOT in the [list](http://godoc.org/google.golang.org/grpc?imports), you need a discussion with gRPC-Go authors and consultants.
@@ -30,3 +46,12 @@ Status
------
GA
+FAQ
+---
+
+#### Compiling error, undefined: grpc.SupportPackageIsVersion
+
+Please update proto package, gRPC package and rebuild the proto files:
+ - `go get -u github.com/golang/protobuf/{proto,protoc-gen-go}`
+ - `go get -u google.golang.org/grpc`
+ - `protoc --go_out=plugins=grpc:. *.proto`
diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go
index 772c817..4d8023d 100644
--- a/vendor/google.golang.org/grpc/call.go
+++ b/vendor/google.golang.org/grpc/call.go
@@ -42,6 +42,7 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@@ -49,7 +50,8 @@ import (
// On error, it returns the error and indicates whether the call should be retried.
//
// TODO(zhaoq): Check whether the received message sequence is valid.
-func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
+// TODO ctx is used for stats collection and processing. It is the context passed from the application.
+func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
// Try to acquire header metadata from the server if there is any.
defer func() {
if err != nil {
@@ -63,14 +65,25 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s
return
}
p := &parser{r: stream}
+ var inPayload *stats.InPayload
+ if stats.On() {
+ inPayload = &stats.InPayload{
+ Client: true,
+ }
+ }
for {
- if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32); err != nil {
+ if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inPayload); err != nil {
if err == io.EOF {
break
}
return
}
}
+ if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK {
+ // TODO in the current implementation, inTrailer may be handled before inPayload in some cases.
+ // Fix the order if necessary.
+ stats.HandleRPC(ctx, inPayload)
+ }
c.trailerMD = stream.Trailer()
return nil
}
@@ -89,15 +102,27 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
}
}
}()
- var cbuf *bytes.Buffer
+ var (
+ cbuf *bytes.Buffer
+ outPayload *stats.OutPayload
+ )
if compressor != nil {
cbuf = new(bytes.Buffer)
}
- outBuf, err := encode(codec, args, compressor, cbuf)
+ if stats.On() {
+ outPayload = &stats.OutPayload{
+ Client: true,
+ }
+ }
+ outBuf, err := encode(codec, args, compressor, cbuf, outPayload)
if err != nil {
return nil, Errorf(codes.Internal, "grpc: %v", err)
}
err = t.Write(stream, outBuf, opts)
+ if err == nil && outPayload != nil {
+ outPayload.SentTime = time.Now()
+ stats.HandleRPC(ctx, outPayload)
+ }
// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
// recvResponse to get the final status.
@@ -118,8 +143,16 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
return invoke(ctx, method, args, reply, cc, opts...)
}
-func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
+func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
c := defaultCallInfo
+ if mc, ok := cc.getMethodConfig(method); ok {
+ c.failFast = !mc.WaitForReady
+ if mc.Timeout > 0 {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
+ defer cancel()
+ }
+ }
for _, o := range opts {
if err := o.before(&c); err != nil {
return toRPCErr(err)
@@ -140,12 +173,31 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)
// TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set.
defer func() {
- if err != nil {
- c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
+ if e != nil {
+ c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true)
c.traceInfo.tr.SetError()
}
}()
}
+ if stats.On() {
+ ctx = stats.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
+ begin := &stats.Begin{
+ Client: true,
+ BeginTime: time.Now(),
+ FailFast: c.failFast,
+ }
+ stats.HandleRPC(ctx, begin)
+ }
+ defer func() {
+ if stats.On() {
+ end := &stats.End{
+ Client: true,
+ EndTime: time.Now(),
+ Error: e,
+ }
+ stats.HandleRPC(ctx, end)
+ }
+ }()
topts := &transport.Options{
Last: true,
Delay: false,
@@ -167,6 +219,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
}
+
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
}
@@ -205,7 +258,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
return toRPCErr(err)
}
- err = recvResponse(cc.dopts, t, &c, stream, reply)
+ err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
if err != nil {
if put != nil {
put()
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 6167472..aa6b63d 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -54,6 +54,8 @@ var (
ErrClientConnClosing = errors.New("grpc: the client connection is closing")
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
+ // DEPRECATED: Please use context.DeadlineExceeded instead. This error will be
+ // removed in Q1 2017.
ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
// errNoTransportSecurity indicates that there is no transport security
@@ -93,6 +95,7 @@ type dialOptions struct {
block bool
insecure bool
timeout time.Duration
+ scChan <-chan ServiceConfig
copts transport.ConnectOptions
}
@@ -129,6 +132,13 @@ func WithBalancer(b Balancer) DialOption {
}
}
+// WithServiceConfig returns a DialOption which has a channel to read the service configuration.
+func WithServiceConfig(c <-chan ServiceConfig) DialOption {
+ return func(o *dialOptions) {
+ o.scChan = c
+ }
+}
+
// WithBackoffMaxDelay configures the dialer to use the provided maximum delay
// when backing off after failed connection attempts.
func WithBackoffMaxDelay(md time.Duration) DialOption {
@@ -199,6 +209,8 @@ func WithTimeout(d time.Duration) DialOption {
}
// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
+// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
+// Temporary() method to decide if it should try to reconnect to the network address.
func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
return func(o *dialOptions) {
o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
@@ -210,6 +222,17 @@ func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
}
}
+// FailOnNonTempDialError returns a DialOption that specified if gRPC fails on non-temporary dial errors.
+// If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
+// address and won't try to reconnect.
+// The default value of FailOnNonTempDialError is false.
+// This is an EXPERIMENTAL API.
+func FailOnNonTempDialError(f bool) DialOption {
+ return func(o *dialOptions) {
+ o.copts.FailOnNonTempDialError = f
+ }
+}
+
// WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
func WithUserAgent(s string) DialOption {
return func(o *dialOptions) {
@@ -247,6 +270,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
conns: make(map[Address]*addrConn),
}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
+ for _, opt := range opts {
+ opt(&cc.dopts)
+ }
+ if cc.dopts.timeout > 0 {
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
+ defer cancel()
+ }
+
defer func() {
select {
case <-ctx.Done():
@@ -259,10 +291,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}()
- for _, opt := range opts {
- opt(&cc.dopts)
+ if cc.dopts.scChan != nil {
+ // Wait for the initial service config.
+ select {
+ case sc, ok := <-cc.dopts.scChan:
+ if ok {
+ cc.sc = sc
+ }
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
}
-
// Set defaults.
if cc.dopts.codec == nil {
cc.dopts.codec = protoCodec{}
@@ -284,6 +323,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
waitC := make(chan error, 1)
go func() {
var addrs []Address
+ if cc.dopts.balancer == nil && cc.sc.LB != nil {
+ cc.dopts.balancer = cc.sc.LB
+ }
if cc.dopts.balancer == nil {
// Connect to target directly if balancer is nil.
addrs = append(addrs, Address{Addr: target})
@@ -319,10 +361,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
close(waitC)
}()
- var timeoutCh <-chan time.Time
- if cc.dopts.timeout > 0 {
- timeoutCh = time.After(cc.dopts.timeout)
- }
select {
case <-ctx.Done():
return nil, ctx.Err()
@@ -330,14 +368,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if err != nil {
return nil, err
}
- case <-timeoutCh:
- return nil, ErrClientConnTimeout
}
+
// If balancer is nil or balancer.Notify() is nil, ok will be false here.
// The lbWatcher goroutine will not be created.
if ok {
go cc.lbWatcher()
}
+
+ if cc.dopts.scChan != nil {
+ go cc.scWatcher()
+ }
return cc, nil
}
@@ -384,6 +425,7 @@ type ClientConn struct {
dopts dialOptions
mu sync.RWMutex
+ sc ServiceConfig
conns map[Address]*addrConn
}
@@ -422,6 +464,24 @@ func (cc *ClientConn) lbWatcher() {
}
}
+func (cc *ClientConn) scWatcher() {
+ for {
+ select {
+ case sc, ok := <-cc.dopts.scChan:
+ if !ok {
+ return
+ }
+ cc.mu.Lock()
+ // TODO: load balance policy runtime change is ignored.
+ // We may revist this decision in the future.
+ cc.sc = sc
+ cc.mu.Unlock()
+ case <-cc.ctx.Done():
+ return
+ }
+ }
+}
+
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
@@ -509,6 +569,14 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err
return nil
}
+// TODO: Avoid the locking here.
+func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) {
+ cc.mu.RLock()
+ defer cc.mu.RUnlock()
+ m, ok = cc.sc.Methods[method]
+ return
+}
+
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
var (
ac *addrConn
diff --git a/vendor/google.golang.org/grpc/codegen.sh b/vendor/google.golang.org/grpc/codegen.sh
index b009488..4cdc6ba 100755
--- a/vendor/google.golang.org/grpc/codegen.sh
+++ b/vendor/google.golang.org/grpc/codegen.sh
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/usr/bin/env bash
# This script serves as an example to demonstrate how to generate the gRPC-Go
# interface and the related messages from .proto file.
diff --git a/vendor/google.golang.org/grpc/coverage.sh b/vendor/google.golang.org/grpc/coverage.sh
index 1202353..b85f918 100755
--- a/vendor/google.golang.org/grpc/coverage.sh
+++ b/vendor/google.golang.org/grpc/coverage.sh
@@ -1,4 +1,5 @@
-#!/bin/bash
+#!/usr/bin/env bash
+
set -e
diff --git a/vendor/google.golang.org/grpc/credentials/oauth/oauth.go b/vendor/google.golang.org/grpc/credentials/oauth/oauth.go
index 8e68c4d..25393cc 100644
--- a/vendor/google.golang.org/grpc/credentials/oauth/oauth.go
+++ b/vendor/google.golang.org/grpc/credentials/oauth/oauth.go
@@ -61,7 +61,7 @@ func (ts TokenSource) GetRequestMetadata(ctx context.Context, uri ...string) (ma
}, nil
}
-// RequireTransportSecurity indicates whether the credentails requires transport security.
+// RequireTransportSecurity indicates whether the credentials requires transport security.
func (ts TokenSource) RequireTransportSecurity() bool {
return true
}
diff --git a/vendor/google.golang.org/grpc/metadata/metadata.go b/vendor/google.golang.org/grpc/metadata/metadata.go
index 3c0ca7a..65dc5af 100644
--- a/vendor/google.golang.org/grpc/metadata/metadata.go
+++ b/vendor/google.golang.org/grpc/metadata/metadata.go
@@ -141,6 +141,8 @@ func NewContext(ctx context.Context, md MD) context.Context {
}
// FromContext returns the MD in ctx if it exists.
+// The returned md should be immutable, writing to it may cause races.
+// Modification should be made to the copies of the returned md.
func FromContext(ctx context.Context) (md MD, ok bool) {
md, ok = ctx.Value(mdKey{}).(MD)
return
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
index 6b60095..2619d39 100644
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -42,11 +42,13 @@ import (
"io/ioutil"
"math"
"os"
+ "time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@@ -255,9 +257,11 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro
// encode serializes msg and prepends the message header. If msg is nil, it
// generates the message header of 0 message length.
-func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte, error) {
- var b []byte
- var length uint
+func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayload *stats.OutPayload) ([]byte, error) {
+ var (
+ b []byte
+ length uint
+ )
if msg != nil {
var err error
// TODO(zhaoq): optimize to reduce memory alloc and copying.
@@ -265,6 +269,12 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte
if err != nil {
return nil, err
}
+ if outPayload != nil {
+ outPayload.Payload = msg
+ // TODO truncate large payload.
+ outPayload.Data = b
+ outPayload.Length = len(b)
+ }
if cp != nil {
if err := cp.Do(cbuf, b); err != nil {
return nil, err
@@ -295,6 +305,10 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer) ([]byte
// Copy encoded msg to buf
copy(buf[5:], b)
+ if outPayload != nil {
+ outPayload.WireLength = len(buf)
+ }
+
return buf, nil
}
@@ -311,11 +325,14 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er
return nil
}
-func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int) error {
+func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error {
pf, d, err := p.recvMsg(maxMsgSize)
if err != nil {
return err
}
+ if inPayload != nil {
+ inPayload.WireLength = len(d)
+ }
if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
return err
}
@@ -333,6 +350,13 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
if err := c.Unmarshal(d, m); err != nil {
return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
}
+ if inPayload != nil {
+ inPayload.RecvTime = time.Now()
+ inPayload.Payload = m
+ // TODO truncate large payload.
+ inPayload.Data = d
+ inPayload.Length = len(d)
+ }
return nil
}
@@ -448,10 +472,48 @@ func convertCode(err error) codes.Code {
return codes.Unknown
}
-// SupportPackageIsVersion3 is referenced from generated protocol buffer files
+// MethodConfig defines the configuration recommended by the service providers for a
+// particular method.
+// This is EXPERIMENTAL and subject to change.
+type MethodConfig struct {
+ // WaitForReady indicates whether RPCs sent to this method should wait until
+ // the connection is ready by default (!failfast). The value specified via the
+ // gRPC client API will override the value set here.
+ WaitForReady bool
+ // Timeout is the default timeout for RPCs sent to this method. The actual
+ // deadline used will be the minimum of the value specified here and the value
+ // set by the application via the gRPC client API. If either one is not set,
+ // then the other will be used. If neither is set, then the RPC has no deadline.
+ Timeout time.Duration
+ // MaxReqSize is the maximum allowed payload size for an individual request in a
+ // stream (client->server) in bytes. The size which is measured is the serialized,
+ // uncompressed payload in bytes. The actual value used is the minumum of the value
+ // specified here and the value set by the application via the gRPC client API. If
+ // either one is not set, then the other will be used. If neither is set, then the
+ // built-in default is used.
+ // TODO: support this.
+ MaxReqSize uint64
+ // MaxRespSize is the maximum allowed payload size for an individual response in a
+ // stream (server->client) in bytes.
+ // TODO: support this.
+ MaxRespSize uint64
+}
+
+// ServiceConfig is provided by the service provider and contains parameters for how
+// clients that connect to the service should behave.
+// This is EXPERIMENTAL and subject to change.
+type ServiceConfig struct {
+ // LB is the load balancer the service providers recommends. The balancer specified
+ // via grpc.WithBalancer will override this.
+ LB Balancer
+ // Methods contains a map for the methods in this service.
+ Methods map[string]MethodConfig
+}
+
+// SupportPackageIsVersion4 is referenced from generated protocol buffer files
// to assert that that code is compatible with this version of the grpc package.
//
// This constant may be renamed in the future if a change in the generated code
// requires a synchronised update of grpc-go and protoc-gen-go. This constant
// should not be referenced from any other code.
-const SupportPackageIsVersion3 = true
+const SupportPackageIsVersion4 = true
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index e0bb187..b52a563 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -54,6 +54,8 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/stats"
+ "google.golang.org/grpc/tap"
"google.golang.org/grpc/transport"
)
@@ -110,6 +112,7 @@ type options struct {
maxMsgSize int
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
+ inTapHandle tap.ServerInHandle
maxConcurrentStreams uint32
useHandlerImpl bool // use http.Handler-based server
}
@@ -186,6 +189,17 @@ func StreamInterceptor(i StreamServerInterceptor) ServerOption {
}
}
+// InTapHandle returns a ServerOption that sets the tap handle for all the server
+// transport to be created. Only one can be installed.
+func InTapHandle(h tap.ServerInHandle) ServerOption {
+ return func(o *options) {
+ if o.inTapHandle != nil {
+ panic("The tap handle has been set.")
+ }
+ o.inTapHandle = h
+ }
+}
+
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
@@ -329,6 +343,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
+// Serve always returns non-nil error.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("serving")
@@ -412,17 +427,22 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
if s.opts.useHandlerImpl {
s.serveUsingHandler(conn)
} else {
- s.serveNewHTTP2Transport(conn, authInfo)
+ s.serveHTTP2Transport(conn, authInfo)
}
}
-// serveNewHTTP2Transport sets up a new http/2 transport (using the
+// serveHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go) and
// serves streams on it.
// This is run in its own goroutine (it does network I/O in
// transport.NewServerTransport).
-func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
- st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
+func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
+ config := &transport.ServerConfig{
+ MaxStreams: s.opts.maxConcurrentStreams,
+ AuthInfo: authInfo,
+ InTapHandle: s.opts.inTapHandle,
+ }
+ st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
s.mu.Lock()
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
@@ -448,6 +468,12 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
+ }, func(ctx context.Context, method string) context.Context {
+ if !EnableTracing {
+ return ctx
+ }
+ tr := trace.New("grpc.Recv."+methodFamily(method), method)
+ return trace.NewContext(ctx, tr)
})
wg.Wait()
}
@@ -497,15 +523,17 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
// If tracing is not enabled, it returns nil.
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
- if !EnableTracing {
+ tr, ok := trace.FromContext(stream.Context())
+ if !ok {
return nil
}
+
trInfo = &traceInfo{
- tr: trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()),
+ tr: tr,
}
trInfo.firstLine.client = false
trInfo.firstLine.remoteAddr = st.RemoteAddr()
- stream.TraceContext(trInfo.tr)
+
if dl, ok := stream.Context().Deadline(); ok {
trInfo.firstLine.deadline = dl.Sub(time.Now())
}
@@ -532,11 +560,17 @@ func (s *Server) removeConn(c io.Closer) {
}
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
- var cbuf *bytes.Buffer
+ var (
+ cbuf *bytes.Buffer
+ outPayload *stats.OutPayload
+ )
if cp != nil {
cbuf = new(bytes.Buffer)
}
- p, err := encode(s.opts.codec, msg, cp, cbuf)
+ if stats.On() {
+ outPayload = &stats.OutPayload{}
+ }
+ p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
if err != nil {
// This typically indicates a fatal issue (e.g., memory
// corruption or hardware faults) the application program
@@ -547,10 +581,32 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
// the optimal option.
grpclog.Fatalf("grpc: Server failed to encode response %v", err)
}
- return t.Write(stream, p, opts)
+ err = t.Write(stream, p, opts)
+ if err == nil && outPayload != nil {
+ outPayload.SentTime = time.Now()
+ stats.HandleRPC(stream.Context(), outPayload)
+ }
+ return err
}
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
+ if stats.On() {
+ begin := &stats.Begin{
+ BeginTime: time.Now(),
+ }
+ stats.HandleRPC(stream.Context(), begin)
+ }
+ defer func() {
+ if stats.On() {
+ end := &stats.End{
+ EndTime: time.Now(),
+ }
+ if err != nil && err != io.EOF {
+ end.Error = toRPCErr(err)
+ }
+ stats.HandleRPC(stream.Context(), end)
+ }
+ }()
if trInfo != nil {
defer trInfo.tr.Finish()
trInfo.firstLine.client = false
@@ -579,14 +635,14 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err != nil {
switch err := err.(type) {
case *rpcError:
- if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
+ if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
case transport.ConnectionError:
// Nothing to do here.
case transport.StreamError:
- if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
+ if e := t.WriteStatus(stream, err.Code, err.Desc); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
default:
panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
@@ -597,20 +653,29 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
switch err := err.(type) {
case *rpcError:
- if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
+ if e := t.WriteStatus(stream, err.code, err.desc); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
+ return err
default:
- if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
+ if e := t.WriteStatus(stream, codes.Internal, err.Error()); e != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
-
+ // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
+ }
+ }
+ var inPayload *stats.InPayload
+ if stats.On() {
+ inPayload = &stats.InPayload{
+ RecvTime: time.Now(),
}
- return err
}
statusCode := codes.OK
statusDesc := ""
df := func(v interface{}) error {
+ if inPayload != nil {
+ inPayload.WireLength = len(req)
+ }
if pf == compressionMade {
var err error
req, err = s.opts.dc.Do(bytes.NewReader(req))
@@ -618,7 +683,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := t.WriteStatus(stream, codes.Internal, err.Error()); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
}
- return err
+ return Errorf(codes.Internal, err.Error())
}
}
if len(req) > s.opts.maxMsgSize {
@@ -630,6 +695,12 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := s.opts.codec.Unmarshal(req, v); err != nil {
return err
}
+ if inPayload != nil {
+ inPayload.Payload = v
+ inPayload.Data = req
+ inPayload.Length = len(req)
+ stats.HandleRPC(stream.Context(), inPayload)
+ }
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
}
@@ -650,9 +721,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
- return err
}
- return nil
+ return Errorf(statusCode, statusDesc)
}
if trInfo != nil {
trInfo.tr.LazyLog(stringer("OK"), false)
@@ -677,11 +747,32 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
}
- return t.WriteStatus(stream, statusCode, statusDesc)
+ errWrite := t.WriteStatus(stream, statusCode, statusDesc)
+ if statusCode != codes.OK {
+ return Errorf(statusCode, statusDesc)
+ }
+ return errWrite
}
}
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
+ if stats.On() {
+ begin := &stats.Begin{
+ BeginTime: time.Now(),
+ }
+ stats.HandleRPC(stream.Context(), begin)
+ }
+ defer func() {
+ if stats.On() {
+ end := &stats.End{
+ EndTime: time.Now(),
+ }
+ if err != nil && err != io.EOF {
+ end.Error = toRPCErr(err)
+ }
+ stats.HandleRPC(stream.Context(), end)
+ }
+ }()
if s.opts.cp != nil {
stream.SetSendCompress(s.opts.cp.Type())
}
@@ -744,7 +835,11 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
}
ss.mu.Unlock()
}
- return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
+ errWrite := t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
+ if ss.statusCode != codes.OK {
+ return Errorf(ss.statusCode, ss.statusDesc)
+ }
+ return errWrite
}
@@ -759,7 +854,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
trInfo.tr.SetError()
}
- if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil {
+ errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
+ if err := t.WriteStatus(stream, codes.InvalidArgument, errDesc); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
@@ -779,7 +875,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
trInfo.tr.SetError()
}
- if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil {
+ errDesc := fmt.Sprintf("unknown service %v", service)
+ if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
@@ -804,7 +901,8 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
trInfo.tr.SetError()
}
- if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {
+ errDesc := fmt.Sprintf("unknown method %v", method)
+ if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
diff --git a/vendor/google.golang.org/grpc/stats/handlers.go b/vendor/google.golang.org/grpc/stats/handlers.go
new file mode 100644
index 0000000..ce47786
--- /dev/null
+++ b/vendor/google.golang.org/grpc/stats/handlers.go
@@ -0,0 +1,146 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package stats
+
+import (
+ "net"
+ "sync/atomic"
+
+ "golang.org/x/net/context"
+ "google.golang.org/grpc/grpclog"
+)
+
+// ConnTagInfo defines the relevant information needed by connection context tagger.
+type ConnTagInfo struct {
+ // RemoteAddr is the remote address of the corresponding connection.
+ RemoteAddr net.Addr
+ // LocalAddr is the local address of the corresponding connection.
+ LocalAddr net.Addr
+ // TODO add QOS related fields.
+}
+
+// RPCTagInfo defines the relevant information needed by RPC context tagger.
+type RPCTagInfo struct {
+ // FullMethodName is the RPC method in the format of /package.service/method.
+ FullMethodName string
+}
+
+var (
+ on = new(int32)
+ rpcHandler func(context.Context, RPCStats)
+ connHandler func(context.Context, ConnStats)
+ connTagger func(context.Context, *ConnTagInfo) context.Context
+ rpcTagger func(context.Context, *RPCTagInfo) context.Context
+)
+
+// HandleRPC processes the RPC stats using the rpc handler registered by the user.
+func HandleRPC(ctx context.Context, s RPCStats) {
+ if rpcHandler == nil {
+ return
+ }
+ rpcHandler(ctx, s)
+}
+
+// RegisterRPCHandler registers the user handler function for RPC stats processing.
+// It should be called only once. The later call will overwrite the former value if it is called multiple times.
+// This handler function will be called to process the rpc stats.
+func RegisterRPCHandler(f func(context.Context, RPCStats)) {
+ rpcHandler = f
+}
+
+// HandleConn processes the stats using the call back function registered by user.
+func HandleConn(ctx context.Context, s ConnStats) {
+ if connHandler == nil {
+ return
+ }
+ connHandler(ctx, s)
+}
+
+// RegisterConnHandler registers the user handler function for conn stats.
+// It should be called only once. The later call will overwrite the former value if it is called multiple times.
+// This handler function will be called to process the conn stats.
+func RegisterConnHandler(f func(context.Context, ConnStats)) {
+ connHandler = f
+}
+
+// TagConn calls user registered connection context tagger.
+func TagConn(ctx context.Context, info *ConnTagInfo) context.Context {
+ if connTagger == nil {
+ return ctx
+ }
+ return connTagger(ctx, info)
+}
+
+// RegisterConnTagger registers the user connection context tagger function.
+// The connection context tagger can attach some information to the given context.
+// The returned context will be used for stats handling.
+// For conn stats handling, the context used in connHandler for this
+// connection will be derived from the context returned.
+// For RPC stats handling,
+// - On server side, the context used in rpcHandler for all RPCs on this
+// connection will be derived from the context returned.
+// - On client side, the context is not derived from the context returned.
+func RegisterConnTagger(t func(context.Context, *ConnTagInfo) context.Context) {
+ connTagger = t
+}
+
+// TagRPC calls the user registered RPC context tagger.
+func TagRPC(ctx context.Context, info *RPCTagInfo) context.Context {
+ if rpcTagger == nil {
+ return ctx
+ }
+ return rpcTagger(ctx, info)
+}
+
+// RegisterRPCTagger registers the user RPC context tagger function.
+// The RPC context tagger can attach some information to the given context.
+// The context used in stats rpcHandler for this RPC will be derived from the
+// context returned.
+func RegisterRPCTagger(t func(context.Context, *RPCTagInfo) context.Context) {
+ rpcTagger = t
+}
+
+// Start starts the stats collection and processing if there is a registered stats handle.
+func Start() {
+ if rpcHandler == nil && connHandler == nil {
+ grpclog.Println("rpcHandler and connHandler are both nil when starting stats. Stats is not started")
+ return
+ }
+ atomic.StoreInt32(on, 1)
+}
+
+// On indicates whether the stats collection and processing is on.
+func On() bool {
+ return atomic.CompareAndSwapInt32(on, 1, 1)
+}
diff --git a/vendor/google.golang.org/grpc/stats/stats.go b/vendor/google.golang.org/grpc/stats/stats.go
new file mode 100644
index 0000000..a82448a
--- /dev/null
+++ b/vendor/google.golang.org/grpc/stats/stats.go
@@ -0,0 +1,223 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+// Package stats is for collecting and reporting various network and RPC stats.
+// This package is for monitoring purpose only. All fields are read-only.
+// All APIs are experimental.
+package stats // import "google.golang.org/grpc/stats"
+
+import (
+ "net"
+ "time"
+)
+
+// RPCStats contains stats information about RPCs.
+type RPCStats interface {
+ isRPCStats()
+ // IsClient returns true if this RPCStats is from client side.
+ IsClient() bool
+}
+
+// Begin contains stats when an RPC begins.
+// FailFast are only valid if Client is true.
+type Begin struct {
+ // Client is true if this Begin is from client side.
+ Client bool
+ // BeginTime is the time when the RPC begins.
+ BeginTime time.Time
+ // FailFast indicates if this RPC is failfast.
+ FailFast bool
+}
+
+// IsClient indicates if this is from client side.
+func (s *Begin) IsClient() bool { return s.Client }
+
+func (s *Begin) isRPCStats() {}
+
+// InPayload contains the information for an incoming payload.
+type InPayload struct {
+ // Client is true if this InPayload is from client side.
+ Client bool
+ // Payload is the payload with original type.
+ Payload interface{}
+ // Data is the serialized message payload.
+ Data []byte
+ // Length is the length of uncompressed data.
+ Length int
+ // WireLength is the length of data on wire (compressed, signed, encrypted).
+ WireLength int
+ // RecvTime is the time when the payload is received.
+ RecvTime time.Time
+}
+
+// IsClient indicates if this is from client side.
+func (s *InPayload) IsClient() bool { return s.Client }
+
+func (s *InPayload) isRPCStats() {}
+
+// InHeader contains stats when a header is received.
+// FullMethod, addresses and Compression are only valid if Client is false.
+type InHeader struct {
+ // Client is true if this InHeader is from client side.
+ Client bool
+ // WireLength is the wire length of header.
+ WireLength int
+
+ // FullMethod is the full RPC method string, i.e., /package.service/method.
+ FullMethod string
+ // RemoteAddr is the remote address of the corresponding connection.
+ RemoteAddr net.Addr
+ // LocalAddr is the local address of the corresponding connection.
+ LocalAddr net.Addr
+ // Compression is the compression algorithm used for the RPC.
+ Compression string
+}
+
+// IsClient indicates if this is from client side.
+func (s *InHeader) IsClient() bool { return s.Client }
+
+func (s *InHeader) isRPCStats() {}
+
+// InTrailer contains stats when a trailer is received.
+type InTrailer struct {
+ // Client is true if this InTrailer is from client side.
+ Client bool
+ // WireLength is the wire length of trailer.
+ WireLength int
+}
+
+// IsClient indicates if this is from client side.
+func (s *InTrailer) IsClient() bool { return s.Client }
+
+func (s *InTrailer) isRPCStats() {}
+
+// OutPayload contains the information for an outgoing payload.
+type OutPayload struct {
+ // Client is true if this OutPayload is from client side.
+ Client bool
+ // Payload is the payload with original type.
+ Payload interface{}
+ // Data is the serialized message payload.
+ Data []byte
+ // Length is the length of uncompressed data.
+ Length int
+ // WireLength is the length of data on wire (compressed, signed, encrypted).
+ WireLength int
+ // SentTime is the time when the payload is sent.
+ SentTime time.Time
+}
+
+// IsClient indicates if this is from client side.
+func (s *OutPayload) IsClient() bool { return s.Client }
+
+func (s *OutPayload) isRPCStats() {}
+
+// OutHeader contains stats when a header is sent.
+// FullMethod, addresses and Compression are only valid if Client is true.
+type OutHeader struct {
+ // Client is true if this OutHeader is from client side.
+ Client bool
+ // WireLength is the wire length of header.
+ WireLength int
+
+ // FullMethod is the full RPC method string, i.e., /package.service/method.
+ FullMethod string
+ // RemoteAddr is the remote address of the corresponding connection.
+ RemoteAddr net.Addr
+ // LocalAddr is the local address of the corresponding connection.
+ LocalAddr net.Addr
+ // Compression is the compression algorithm used for the RPC.
+ Compression string
+}
+
+// IsClient indicates if this is from client side.
+func (s *OutHeader) IsClient() bool { return s.Client }
+
+func (s *OutHeader) isRPCStats() {}
+
+// OutTrailer contains stats when a trailer is sent.
+type OutTrailer struct {
+ // Client is true if this OutTrailer is from client side.
+ Client bool
+ // WireLength is the wire length of trailer.
+ WireLength int
+}
+
+// IsClient indicates if this is from client side.
+func (s *OutTrailer) IsClient() bool { return s.Client }
+
+func (s *OutTrailer) isRPCStats() {}
+
+// End contains stats when an RPC ends.
+type End struct {
+ // Client is true if this End is from client side.
+ Client bool
+ // EndTime is the time when the RPC ends.
+ EndTime time.Time
+ // Error is the error just happened. Its type is gRPC error.
+ Error error
+}
+
+// IsClient indicates if this is from client side.
+func (s *End) IsClient() bool { return s.Client }
+
+func (s *End) isRPCStats() {}
+
+// ConnStats contains stats information about connections.
+type ConnStats interface {
+ isConnStats()
+ // IsClient returns true if this ConnStats is from client side.
+ IsClient() bool
+}
+
+// ConnBegin contains the stats of a connection when it is established.
+type ConnBegin struct {
+ // Client is true if this ConnBegin is from client side.
+ Client bool
+}
+
+// IsClient indicates if this is from client side.
+func (s *ConnBegin) IsClient() bool { return s.Client }
+
+func (s *ConnBegin) isConnStats() {}
+
+// ConnEnd contains the stats of a connection when it ends.
+type ConnEnd struct {
+ // Client is true if this ConnEnd is from client side.
+ Client bool
+}
+
+// IsClient indicates if this is from client side.
+func (s *ConnEnd) IsClient() bool { return s.Client }
+
+func (s *ConnEnd) isConnStats() {}
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 4681054..d3a4deb 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -45,6 +45,7 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@@ -97,7 +98,7 @@ type ClientStream interface {
// NewClientStream creates a new Stream for the client side. This is called
// by generated code.
-func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
+func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
if cc.dopts.streamInt != nil {
return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
}
@@ -106,11 +107,18 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
var (
- t transport.ClientTransport
- s *transport.Stream
- put func()
+ t transport.ClientTransport
+ s *transport.Stream
+ put func()
+ cancel context.CancelFunc
)
c := defaultCallInfo
+ if mc, ok := cc.getMethodConfig(method); ok {
+ c.failFast = !mc.WaitForReady
+ if mc.Timeout > 0 {
+ ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
+ }
+ }
for _, o := range opts {
if err := o.before(&c); err != nil {
return nil, toRPCErr(err)
@@ -143,6 +151,25 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
}()
}
+ if stats.On() {
+ ctx = stats.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
+ begin := &stats.Begin{
+ Client: true,
+ BeginTime: time.Now(),
+ FailFast: c.failFast,
+ }
+ stats.HandleRPC(ctx, begin)
+ }
+ defer func() {
+ if err != nil && stats.On() {
+ // Only handle end stats if err != nil.
+ end := &stats.End{
+ Client: true,
+ Error: err,
+ }
+ stats.HandleRPC(ctx, end)
+ }
+ }()
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
}
@@ -180,12 +207,13 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
break
}
cs := &clientStream{
- opts: opts,
- c: c,
- desc: desc,
- codec: cc.dopts.codec,
- cp: cc.dopts.cp,
- dc: cc.dopts.dc,
+ opts: opts,
+ c: c,
+ desc: desc,
+ codec: cc.dopts.codec,
+ cp: cc.dopts.cp,
+ dc: cc.dopts.dc,
+ cancel: cancel,
put: put,
t: t,
@@ -194,6 +222,8 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
tracing: EnableTracing,
trInfo: trInfo,
+
+ statsCtx: ctx,
}
if cc.dopts.cp != nil {
cs.cbuf = new(bytes.Buffer)
@@ -227,16 +257,17 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
// clientStream implements a client side Stream.
type clientStream struct {
- opts []CallOption
- c callInfo
- t transport.ClientTransport
- s *transport.Stream
- p *parser
- desc *StreamDesc
- codec Codec
- cp Compressor
- cbuf *bytes.Buffer
- dc Decompressor
+ opts []CallOption
+ c callInfo
+ t transport.ClientTransport
+ s *transport.Stream
+ p *parser
+ desc *StreamDesc
+ codec Codec
+ cp Compressor
+ cbuf *bytes.Buffer
+ dc Decompressor
+ cancel context.CancelFunc
tracing bool // set to EnableTracing when the clientStream is created.
@@ -246,6 +277,11 @@ type clientStream struct {
// trInfo.tr is set when the clientStream is created (if EnableTracing is true),
// and is set to nil when the clientStream's finish method is called.
trInfo traceInfo
+
+ // statsCtx keeps the user context for stats handling.
+ // All stats collection should use the statsCtx (instead of the stream context)
+ // so that all the generated stats for a particular RPC can be associated in the processing phase.
+ statsCtx context.Context
}
func (cs *clientStream) Context() context.Context {
@@ -274,6 +310,8 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
cs.mu.Unlock()
}
+ // TODO Investigate how to signal the stats handling party.
+ // generate error stats if err != nil && err != io.EOF?
defer func() {
if err != nil {
cs.finish(err)
@@ -296,7 +334,13 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
err = toRPCErr(err)
}()
- out, err := encode(cs.codec, m, cs.cp, cs.cbuf)
+ var outPayload *stats.OutPayload
+ if stats.On() {
+ outPayload = &stats.OutPayload{
+ Client: true,
+ }
+ }
+ out, err := encode(cs.codec, m, cs.cp, cs.cbuf, outPayload)
defer func() {
if cs.cbuf != nil {
cs.cbuf.Reset()
@@ -305,11 +349,37 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if err != nil {
return Errorf(codes.Internal, "grpc: %v", err)
}
- return cs.t.Write(cs.s, out, &transport.Options{Last: false})
+ err = cs.t.Write(cs.s, out, &transport.Options{Last: false})
+ if err == nil && outPayload != nil {
+ outPayload.SentTime = time.Now()
+ stats.HandleRPC(cs.statsCtx, outPayload)
+ }
+ return err
}
func (cs *clientStream) RecvMsg(m interface{}) (err error) {
- err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
+ defer func() {
+ if err != nil && stats.On() {
+ // Only generate End if err != nil.
+ // If err == nil, it's not the last RecvMsg.
+ // The last RecvMsg gets either an RPC error or io.EOF.
+ end := &stats.End{
+ Client: true,
+ EndTime: time.Now(),
+ }
+ if err != io.EOF {
+ end.Error = toRPCErr(err)
+ }
+ stats.HandleRPC(cs.statsCtx, end)
+ }
+ }()
+ var inPayload *stats.InPayload
+ if stats.On() {
+ inPayload = &stats.InPayload{
+ Client: true,
+ }
+ }
+ err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, inPayload)
defer func() {
// err != nil indicates the termination of the stream.
if err != nil {
@@ -324,11 +394,15 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
}
cs.mu.Unlock()
}
+ if inPayload != nil {
+ stats.HandleRPC(cs.statsCtx, inPayload)
+ }
if !cs.desc.ClientStreams || cs.desc.ServerStreams {
return
}
// Special handling for client streaming rpc.
- err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
+ // This recv expects EOF or errors, so we don't collect inPayload.
+ err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32, nil)
cs.closeTransportStream(err)
if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
@@ -384,6 +458,11 @@ func (cs *clientStream) closeTransportStream(err error) {
}
func (cs *clientStream) finish(err error) {
+ defer func() {
+ if cs.cancel != nil {
+ cs.cancel()
+ }
+ }()
cs.mu.Lock()
defer cs.mu.Unlock()
for _, o := range cs.opts {
@@ -482,7 +561,11 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
ss.mu.Unlock()
}
}()
- out, err := encode(ss.codec, m, ss.cp, ss.cbuf)
+ var outPayload *stats.OutPayload
+ if stats.On() {
+ outPayload = &stats.OutPayload{}
+ }
+ out, err := encode(ss.codec, m, ss.cp, ss.cbuf, outPayload)
defer func() {
if ss.cbuf != nil {
ss.cbuf.Reset()
@@ -495,6 +578,10 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
+ if outPayload != nil {
+ outPayload.SentTime = time.Now()
+ stats.HandleRPC(ss.s.Context(), outPayload)
+ }
return nil
}
@@ -513,7 +600,11 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
ss.mu.Unlock()
}
}()
- if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize); err != nil {
+ var inPayload *stats.InPayload
+ if stats.On() {
+ inPayload = &stats.InPayload{}
+ }
+ if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize, inPayload); err != nil {
if err == io.EOF {
return err
}
@@ -522,5 +613,8 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}
return toRPCErr(err)
}
+ if inPayload != nil {
+ stats.HandleRPC(ss.s.Context(), inPayload)
+ }
return nil
}
diff --git a/vendor/google.golang.org/grpc/tap/tap.go b/vendor/google.golang.org/grpc/tap/tap.go
new file mode 100644
index 0000000..0f36647
--- /dev/null
+++ b/vendor/google.golang.org/grpc/tap/tap.go
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+// Package tap defines the function handles which are executed on the transport
+// layer of gRPC-Go and related information. Everything here is EXPERIMENTAL.
+package tap
+
+import (
+ "golang.org/x/net/context"
+)
+
+// Info defines the relevant information needed by the handles.
+type Info struct {
+ // FullMethodName is the string of grpc method (in the format of
+ // /package.service/method).
+ FullMethodName string
+ // TODO: More to be added.
+}
+
+// ServerInHandle defines the function which runs when a new stream is created
+// on the server side. Note that it is executed in the per-connection I/O goroutine(s) instead
+// of per-RPC goroutine. Therefore, users should NOT have any blocking/time-consuming
+// work in this handle. Otherwise all the RPCs would slow down.
+type ServerInHandle func(ctx context.Context, info *Info) (context.Context, error)
diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/control.go
index 4ef0830..2586cba 100644
--- a/vendor/google.golang.org/grpc/transport/control.go
+++ b/vendor/google.golang.org/grpc/transport/control.go
@@ -111,35 +111,9 @@ func newQuotaPool(q int) *quotaPool {
return qb
}
-// add adds n to the available quota and tries to send it on acquire.
-func (qb *quotaPool) add(n int) {
- qb.mu.Lock()
- defer qb.mu.Unlock()
- qb.quota += n
- if qb.quota <= 0 {
- return
- }
- select {
- case qb.c <- qb.quota:
- qb.quota = 0
- default:
- }
-}
-
-// cancel cancels the pending quota sent on acquire, if any.
-func (qb *quotaPool) cancel() {
- qb.mu.Lock()
- defer qb.mu.Unlock()
- select {
- case n := <-qb.c:
- qb.quota += n
- default:
- }
-}
-
-// reset cancels the pending quota sent on acquired, incremented by v and sends
+// add cancels the pending quota sent on acquired, incremented by v and sends
// it back on acquire.
-func (qb *quotaPool) reset(v int) {
+func (qb *quotaPool) add(v int) {
qb.mu.Lock()
defer qb.mu.Unlock()
select {
@@ -151,6 +125,10 @@ func (qb *quotaPool) reset(v int) {
if qb.quota <= 0 {
return
}
+ // After the pool has been created, this is the only place that sends on
+ // the channel. Since mu is held at this point and any quota that was sent
+ // on the channel has been retrieved, we know that this code will always
+ // place any positive quota value on the channel.
select {
case qb.c <- qb.quota:
qb.quota = 0
diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/transport/handler_server.go
index 114e349..10b6dc0 100644
--- a/vendor/google.golang.org/grpc/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/transport/handler_server.go
@@ -268,7 +268,7 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
})
}
-func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
+func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
// With this transport type there will be exactly 1 stream: this HTTP request.
var ctx context.Context
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go
index 2b0f680..605b1e5 100644
--- a/vendor/google.golang.org/grpc/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/transport/http2_client.go
@@ -51,16 +51,20 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
+ "google.golang.org/grpc/stats"
)
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
- target string // server name/addr
- userAgent string
- md interface{}
- conn net.Conn // underlying communication channel
- authInfo credentials.AuthInfo // auth info about the connection
- nextID uint32 // the next stream ID to be used
+ ctx context.Context
+ target string // server name/addr
+ userAgent string
+ md interface{}
+ conn net.Conn // underlying communication channel
+ remoteAddr net.Addr
+ localAddr net.Addr
+ authInfo credentials.AuthInfo // auth info about the connection
+ nextID uint32 // the next stream ID to be used
// writableChan synchronizes write access to the transport.
// A writer acquires the write lock by sending a value on writableChan
@@ -150,6 +154,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
scheme := "http"
conn, err := dial(ctx, opts.Dialer, addr.Addr)
if err != nil {
+ if opts.FailOnNonTempDialError {
+ return nil, connectionErrorf(isTemporary(err), err, "transport: %v", err)
+ }
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Any further errors will close the underlying connection
@@ -175,11 +182,14 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
}
var buf bytes.Buffer
t := &http2Client{
- target: addr.Addr,
- userAgent: ua,
- md: addr.Metadata,
- conn: conn,
- authInfo: authInfo,
+ ctx: ctx,
+ target: addr.Addr,
+ userAgent: ua,
+ md: addr.Metadata,
+ conn: conn,
+ remoteAddr: conn.RemoteAddr(),
+ localAddr: conn.LocalAddr(),
+ authInfo: authInfo,
// The client initiated stream id is odd starting from 1.
nextID: 1,
writableChan: make(chan int, 1),
@@ -199,6 +209,16 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
maxStreams: math.MaxInt32,
streamSendQuota: defaultWindowSize,
}
+ if stats.On() {
+ t.ctx = stats.TagConn(t.ctx, &stats.ConnTagInfo{
+ RemoteAddr: t.remoteAddr,
+ LocalAddr: t.localAddr,
+ })
+ connBegin := &stats.ConnBegin{
+ Client: true,
+ }
+ stats.HandleConn(t.ctx, connBegin)
+ }
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
// dispatches the frame to the corresponding stream entity.
@@ -270,12 +290,13 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// streams.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
pr := &peer.Peer{
- Addr: t.conn.RemoteAddr(),
+ Addr: t.remoteAddr,
}
// Attach Auth info if there is any.
if t.authInfo != nil {
pr.AuthInfo = t.authInfo
}
+ userCtx := ctx
ctx = peer.NewContext(ctx, pr)
authData := make(map[string]string)
for _, c := range t.creds {
@@ -347,6 +368,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, ErrConnClosing
}
s := t.newStream(ctx, callHdr)
+ s.clientStatsCtx = userCtx
t.activeStreams[s.id] = s
// This stream is not counted when applySetings(...) initialize t.streamsQuota.
@@ -357,7 +379,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
t.mu.Unlock()
if reset {
- t.streamsQuota.reset(-1)
+ t.streamsQuota.add(-1)
}
// HPACK encodes various headers. Note that once WriteField(...) is
@@ -413,6 +435,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
}
first := true
+ bufLen := t.hBuf.Len()
// Sends the headers in a single batch even when they span multiple frames.
for !endHeaders {
size := t.hBuf.Len()
@@ -447,6 +470,17 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
+ if stats.On() {
+ outHeader := &stats.OutHeader{
+ Client: true,
+ WireLength: bufLen,
+ FullMethod: callHdr.Method,
+ RemoteAddr: t.remoteAddr,
+ LocalAddr: t.localAddr,
+ Compression: callHdr.SendCompress,
+ }
+ stats.HandleRPC(s.clientStatsCtx, outHeader)
+ }
t.writableChan <- 0
return s, nil
}
@@ -525,6 +559,12 @@ func (t *http2Client) Close() (err error) {
s.mu.Unlock()
s.write(recvMsg{err: ErrConnClosing})
}
+ if stats.On() {
+ connEnd := &stats.ConnEnd{
+ Client: true,
+ }
+ stats.HandleConn(t.ctx, connEnd)
+ }
return
}
@@ -582,19 +622,14 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
var p []byte
if r.Len() > 0 {
size := http2MaxFrameLen
- s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
- t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
- if _, ok := err.(StreamError); ok || err == io.EOF {
- t.sendQuotaPool.cancel()
- }
return err
}
if sq < size {
@@ -874,6 +909,24 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
endStream := frame.StreamEnded()
+ var isHeader bool
+ defer func() {
+ if stats.On() {
+ if isHeader {
+ inHeader := &stats.InHeader{
+ Client: true,
+ WireLength: int(frame.Header().Length),
+ }
+ stats.HandleRPC(s.clientStatsCtx, inHeader)
+ } else {
+ inTrailer := &stats.InTrailer{
+ Client: true,
+ WireLength: int(frame.Header().Length),
+ }
+ stats.HandleRPC(s.clientStatsCtx, inTrailer)
+ }
+ }
+ }()
s.mu.Lock()
if !endStream {
@@ -885,6 +938,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
close(s.headerChan)
s.headerDone = true
+ isHeader = true
}
if !endStream || s.state == streamDone {
s.mu.Unlock()
@@ -994,13 +1048,13 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
t.maxStreams = int(s.Val)
t.mu.Unlock()
if reset {
- t.streamsQuota.reset(int(s.Val) - ms)
+ t.streamsQuota.add(int(s.Val) - ms)
}
case http2.SettingInitialWindowSize:
t.mu.Lock()
for _, stream := range t.activeStreams {
// Adjust the sending quota for each stream.
- stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
+ stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota))
}
t.streamSendQuota = s.Val
t.mu.Unlock()
diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go
index a62fb7c..316188e 100644
--- a/vendor/google.golang.org/grpc/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/transport/http2_server.go
@@ -50,6 +50,8 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
+ "google.golang.org/grpc/stats"
+ "google.golang.org/grpc/tap"
)
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
@@ -58,9 +60,13 @@ var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHe
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
+ ctx context.Context
conn net.Conn
+ remoteAddr net.Addr
+ localAddr net.Addr
maxStreamID uint32 // max stream ID ever seen
authInfo credentials.AuthInfo // auth info about the connection
+ inTapHandle tap.ServerInHandle
// writableChan synchronizes write access to the transport.
// A writer acquires the write lock by receiving a value on writableChan
// and releases it by sending on writableChan.
@@ -91,12 +97,13 @@ type http2Server struct {
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
-func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) {
+func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
framer := newFramer(conn)
// Send initial settings as connection preface to client.
var settings []http2.Setting
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
// permitted in the HTTP2 spec.
+ maxStreams := config.MaxStreams
if maxStreams == 0 {
maxStreams = math.MaxUint32
} else {
@@ -121,12 +128,16 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
}
var buf bytes.Buffer
t := &http2Server{
+ ctx: context.Background(),
conn: conn,
- authInfo: authInfo,
+ remoteAddr: conn.RemoteAddr(),
+ localAddr: conn.LocalAddr(),
+ authInfo: config.AuthInfo,
framer: framer,
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
maxStreams: maxStreams,
+ inTapHandle: config.InTapHandle,
controlBuf: newRecvBuffer(),
fc: &inFlow{limit: initialConnWindowSize},
sendQuotaPool: newQuotaPool(defaultWindowSize),
@@ -136,13 +147,21 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
activeStreams: make(map[uint32]*Stream),
streamSendQuota: defaultWindowSize,
}
+ if stats.On() {
+ t.ctx = stats.TagConn(t.ctx, &stats.ConnTagInfo{
+ RemoteAddr: t.remoteAddr,
+ LocalAddr: t.localAddr,
+ })
+ connBegin := &stats.ConnBegin{}
+ stats.HandleConn(t.ctx, connBegin)
+ }
go t.controller()
t.writableChan <- 0
return t, nil
}
// operateHeader takes action on the decoded headers.
-func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
+func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
buf := newRecvBuffer()
s := &Stream{
id: frame.Header().StreamID,
@@ -168,12 +187,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
s.recvCompress = state.encoding
if state.timeoutSet {
- s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
+ s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
} else {
- s.ctx, s.cancel = context.WithCancel(context.TODO())
+ s.ctx, s.cancel = context.WithCancel(t.ctx)
}
pr := &peer.Peer{
- Addr: t.conn.RemoteAddr(),
+ Addr: t.remoteAddr,
}
// Attach Auth info if there is any.
if t.authInfo != nil {
@@ -195,6 +214,18 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
s.recvCompress = state.encoding
s.method = state.method
+ if t.inTapHandle != nil {
+ var err error
+ info := &tap.Info{
+ FullMethodName: state.method,
+ }
+ s.ctx, err = t.inTapHandle(s.ctx, info)
+ if err != nil {
+ // TODO: Log the real error.
+ t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
+ return
+ }
+ }
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
@@ -218,13 +249,26 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
}
+ s.ctx = traceCtx(s.ctx, s.method)
+ if stats.On() {
+ s.ctx = stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
+ inHeader := &stats.InHeader{
+ FullMethod: s.method,
+ RemoteAddr: t.remoteAddr,
+ LocalAddr: t.localAddr,
+ Compression: s.recvCompress,
+ WireLength: int(frame.Header().Length),
+ }
+ stats.HandleRPC(s.ctx, inHeader)
+ }
handle(s)
return
}
// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
-func (t *http2Server) HandleStreams(handle func(*Stream)) {
+// traceCtx attaches trace to ctx and returns the new context.
+func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
// Check the validity of client preface.
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(t.conn, preface); err != nil {
@@ -279,7 +323,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
- if t.operateHeaders(frame, handle) {
+ if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
break
}
@@ -492,9 +536,16 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
}
}
+ bufLen := t.hBuf.Len()
if err := t.writeHeaders(s, t.hBuf, false); err != nil {
return err
}
+ if stats.On() {
+ outHeader := &stats.OutHeader{
+ WireLength: bufLen,
+ }
+ stats.HandleRPC(s.Context(), outHeader)
+ }
t.writableChan <- 0
return nil
}
@@ -547,10 +598,17 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
}
}
+ bufLen := t.hBuf.Len()
if err := t.writeHeaders(s, t.hBuf, true); err != nil {
t.Close()
return err
}
+ if stats.On() {
+ outTrailer := &stats.OutTrailer{
+ WireLength: bufLen,
+ }
+ stats.HandleRPC(s.Context(), outTrailer)
+ }
t.closeStream(s)
t.writableChan <- 0
return nil
@@ -579,19 +637,14 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
return nil
}
size := http2MaxFrameLen
- s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
- t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
- if _, ok := err.(StreamError); ok {
- t.sendQuotaPool.cancel()
- }
return err
}
if sq < size {
@@ -659,7 +712,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
t.mu.Lock()
defer t.mu.Unlock()
for _, stream := range t.activeStreams {
- stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
+ stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota))
}
t.streamSendQuota = s.Val
}
@@ -736,6 +789,10 @@ func (t *http2Server) Close() (err error) {
for _, s := range streams {
s.cancel()
}
+ if stats.On() {
+ connEnd := &stats.ConnEnd{}
+ stats.HandleConn(t.ctx, connEnd)
+ }
return
}
@@ -767,7 +824,7 @@ func (t *http2Server) closeStream(s *Stream) {
}
func (t *http2Server) RemoteAddr() net.Addr {
- return t.conn.RemoteAddr()
+ return t.remoteAddr
}
func (t *http2Server) Drain() {
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go
index 413f749..4726bb2 100644
--- a/vendor/google.golang.org/grpc/transport/transport.go
+++ b/vendor/google.golang.org/grpc/transport/transport.go
@@ -45,10 +45,10 @@ import (
"sync"
"golang.org/x/net/context"
- "golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/tap"
)
// recvMsg represents the received msg from the transport. All transport
@@ -167,6 +167,11 @@ type Stream struct {
id uint32
// nil for client side Stream.
st ServerTransport
+ // clientStatsCtx keeps the user context for stats handling.
+ // It's only valid on client side. Server side stats context is same as s.ctx.
+ // All client side stats collection should use the clientStatsCtx (instead of the stream context)
+ // so that all the generated stats for a particular RPC can be associated in the processing phase.
+ clientStatsCtx context.Context
// ctx is the associated context of the stream.
ctx context.Context
// cancel is always nil for client side Stream.
@@ -266,11 +271,6 @@ func (s *Stream) Context() context.Context {
return s.ctx
}
-// TraceContext recreates the context of s with a trace.Trace.
-func (s *Stream) TraceContext(tr trace.Trace) {
- s.ctx = trace.NewContext(s.ctx, tr)
-}
-
// Method returns the method for the stream.
func (s *Stream) Method() string {
return s.method
@@ -355,10 +355,17 @@ const (
draining
)
+// ServerConfig consists of all the configurations to establish a server transport.
+type ServerConfig struct {
+ MaxStreams uint32
+ AuthInfo credentials.AuthInfo
+ InTapHandle tap.ServerInHandle
+}
+
// NewServerTransport creates a ServerTransport with conn or non-nil error
// if it fails.
-func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (ServerTransport, error) {
- return newHTTP2Server(conn, maxStreams, authInfo)
+func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
+ return newHTTP2Server(conn, config)
}
// ConnectOptions covers all relevant options for communicating with the server.
@@ -367,6 +374,8 @@ type ConnectOptions struct {
UserAgent string
// Dialer specifies how to dial a network address.
Dialer func(context.Context, string) (net.Conn, error)
+ // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
+ FailOnNonTempDialError bool
// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
@@ -466,7 +475,7 @@ type ClientTransport interface {
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
// HandleStreams receives incoming streams using the given handler.
- HandleStreams(func(*Stream))
+ HandleStreams(func(*Stream), func(context.Context, string) context.Context)
// WriteHeader sends the header metadata for the given stream.
// WriteHeader may not be called on all streams.