aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org
diff options
context:
space:
mode:
authorNiall Sheridan <nsheridan@gmail.com>2016-08-27 01:32:30 +0100
committerNiall Sheridan <nsheridan@gmail.com>2016-08-27 01:32:30 +0100
commit921818bca208f0c70e85ec670074cb3905cbbc82 (patch)
tree4aa67ad2bb2083bd486db3f99680d6d08a2c36b3 /vendor/google.golang.org
parent7f1c9358805302344a89c1fed4eab1342931b061 (diff)
Update dependencies
Diffstat (limited to 'vendor/google.golang.org')
-rw-r--r--vendor/google.golang.org/api/gensupport/resumable.go5
-rw-r--r--vendor/google.golang.org/api/gensupport/send.go55
-rw-r--r--vendor/google.golang.org/api/internal/pool.go59
-rw-r--r--vendor/google.golang.org/api/oauth2/v2/oauth2-gen.go20
-rw-r--r--vendor/google.golang.org/api/option/option.go14
-rw-r--r--vendor/google.golang.org/api/storage/v1/storage-api.json42
-rw-r--r--vendor/google.golang.org/api/storage/v1/storage-gen.go265
-rw-r--r--vendor/google.golang.org/appengine/appengine.go36
-rw-r--r--vendor/google.golang.org/appengine/appengine_vm.go36
-rw-r--r--vendor/google.golang.org/appengine/internal/internal.go34
-rw-r--r--vendor/google.golang.org/appengine/internal/main.go15
-rw-r--r--vendor/google.golang.org/appengine/internal/main_vm.go44
-rw-r--r--vendor/google.golang.org/cloud/CONTRIBUTORS1
-rw-r--r--vendor/google.golang.org/cloud/README.md21
-rw-r--r--vendor/google.golang.org/cloud/compute/metadata/metadata.go3
-rw-r--r--vendor/google.golang.org/cloud/option.go7
-rw-r--r--vendor/google.golang.org/cloud/storage/storage.go251
-rw-r--r--vendor/google.golang.org/grpc/README.md2
-rw-r--r--vendor/google.golang.org/grpc/call.go29
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go359
-rw-r--r--vendor/google.golang.org/grpc/credentials/credentials.go60
-rw-r--r--vendor/google.golang.org/grpc/credentials/credentials_util_go17.go76
-rw-r--r--vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go74
-rw-r--r--vendor/google.golang.org/grpc/metadata/metadata.go14
-rw-r--r--vendor/google.golang.org/grpc/rpc_util.go18
-rw-r--r--vendor/google.golang.org/grpc/server.go100
-rw-r--r--vendor/google.golang.org/grpc/stream.go115
-rw-r--r--vendor/google.golang.org/grpc/transport/control.go5
-rw-r--r--vendor/google.golang.org/grpc/transport/go16.go46
-rw-r--r--vendor/google.golang.org/grpc/transport/go17.go46
-rw-r--r--vendor/google.golang.org/grpc/transport/handler_server.go10
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_client.go237
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_server.go76
-rw-r--r--vendor/google.golang.org/grpc/transport/http_util.go85
-rw-r--r--vendor/google.golang.org/grpc/transport/pre_go16.go51
-rw-r--r--vendor/google.golang.org/grpc/transport/transport.go93
36 files changed, 1767 insertions, 637 deletions
diff --git a/vendor/google.golang.org/api/gensupport/resumable.go b/vendor/google.golang.org/api/gensupport/resumable.go
index ad16943..695e365 100644
--- a/vendor/google.golang.org/api/gensupport/resumable.go
+++ b/vendor/google.golang.org/api/gensupport/resumable.go
@@ -12,7 +12,6 @@ import (
"time"
"golang.org/x/net/context"
- "golang.org/x/net/context/ctxhttp"
)
const (
@@ -80,7 +79,7 @@ func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader,
req.Header.Set("Content-Range", contentRange)
req.Header.Set("Content-Type", rx.MediaType)
req.Header.Set("User-Agent", rx.UserAgent)
- return ctxhttp.Do(ctx, rx.Client, req)
+ return SendRequest(ctx, rx.Client, req)
}
@@ -135,6 +134,8 @@ func contextDone(ctx context.Context) bool {
// It retries using the provided back off strategy until cancelled or the
// strategy indicates to stop retrying.
// It is called from the auto-generated API code and is not visible to the user.
+// Before sending an HTTP request, Upload calls any registered hook functions,
+// and calls the returned functions after the request returns (see send.go).
// rx is private to the auto-generated API code.
// Exactly one of resp or err will be nil. If resp is non-nil, the caller must call resp.Body.Close.
func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err error) {
diff --git a/vendor/google.golang.org/api/gensupport/send.go b/vendor/google.golang.org/api/gensupport/send.go
new file mode 100644
index 0000000..3d22f63
--- /dev/null
+++ b/vendor/google.golang.org/api/gensupport/send.go
@@ -0,0 +1,55 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package gensupport
+
+import (
+ "net/http"
+
+ "golang.org/x/net/context"
+ "golang.org/x/net/context/ctxhttp"
+)
+
+// Hook is the type of a function that is called once before each HTTP request
+// that is sent by a generated API. It returns a function that is called after
+// the request returns.
+// Hooks are not called if the context is nil.
+type Hook func(ctx context.Context, req *http.Request) func(resp *http.Response)
+
+var hooks []Hook
+
+// RegisterHook registers a Hook to be called before each HTTP request by a
+// generated API. Hooks are called in the order they are registered. Each
+// hook can return a function; if it is non-nil, it is called after the HTTP
+// request returns. These functions are called in the reverse order.
+// RegisterHook should not be called concurrently with itself or SendRequest.
+func RegisterHook(h Hook) {
+ hooks = append(hooks, h)
+}
+
+// SendRequest sends a single HTTP request using the given client.
+// If ctx is non-nil, it calls all hooks, then sends the request with
+// ctxhttp.Do, then calls any functions returned by the hooks in reverse order.
+func SendRequest(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
+ if ctx == nil {
+ return client.Do(req)
+ }
+ // Call hooks in order of registration, store returned funcs.
+ post := make([]func(resp *http.Response), len(hooks))
+ for i, h := range hooks {
+ fn := h(ctx, req)
+ post[i] = fn
+ }
+
+ // Send request.
+ resp, err := ctxhttp.Do(ctx, client, req)
+
+ // Call returned funcs in reverse order.
+ for i := len(post) - 1; i >= 0; i-- {
+ if fn := post[i]; fn != nil {
+ fn(resp)
+ }
+ }
+ return resp, err
+}
diff --git a/vendor/google.golang.org/api/internal/pool.go b/vendor/google.golang.org/api/internal/pool.go
new file mode 100644
index 0000000..4150feb
--- /dev/null
+++ b/vendor/google.golang.org/api/internal/pool.go
@@ -0,0 +1,59 @@
+// Copyright 2016 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "errors"
+ "google.golang.org/grpc/naming"
+)
+
+// PoolResolver provides a fixed list of addresses to load balance between
+// and does not provide further updates.
+type PoolResolver struct {
+ poolSize int
+ dialOpt *DialSettings
+ ch chan []*naming.Update
+}
+
+// NewPoolResolver returns a PoolResolver
+// This is an EXPERIMENTAL API and may be changed or removed in the future.
+func NewPoolResolver(size int, o *DialSettings) *PoolResolver {
+ return &PoolResolver{poolSize: size, dialOpt: o}
+}
+
+// Resolve returns a Watcher for the endpoint defined by the DialSettings
+// provided to NewPoolResolver.
+func (r *PoolResolver) Resolve(target string) (naming.Watcher, error) {
+ if r.dialOpt.Endpoint == "" {
+ return nil, errors.New("No endpoint configured")
+ }
+ addrs := make([]*naming.Update, 0, r.poolSize)
+ for i := 0; i < r.poolSize; i++ {
+ addrs = append(addrs, &naming.Update{Op: naming.Add, Addr: r.dialOpt.Endpoint, Metadata: i})
+ }
+ r.ch = make(chan []*naming.Update, 1)
+ r.ch <- addrs
+ return r, nil
+}
+
+// Next returns a static list of updates on the first call,
+// and blocks indefinitely until Close is called on subsequent calls.
+func (r *PoolResolver) Next() ([]*naming.Update, error) {
+ return <-r.ch, nil
+}
+
+func (r *PoolResolver) Close() {
+ close(r.ch)
+}
diff --git a/vendor/google.golang.org/api/oauth2/v2/oauth2-gen.go b/vendor/google.golang.org/api/oauth2/v2/oauth2-gen.go
index 268592b..dd23316 100644
--- a/vendor/google.golang.org/api/oauth2/v2/oauth2-gen.go
+++ b/vendor/google.golang.org/api/oauth2/v2/oauth2-gen.go
@@ -332,10 +332,7 @@ func (c *GetCertForOpenIdConnectCall) doRequest(alt string) (*http.Response, err
req, _ := http.NewRequest("GET", urls, body)
req.Header = reqHeaders
googleapi.SetOpaque(req.URL)
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "oauth2.getCertForOpenIdConnect" call.
@@ -444,10 +441,7 @@ func (c *TokeninfoCall) doRequest(alt string) (*http.Response, error) {
req, _ := http.NewRequest("POST", urls, body)
req.Header = reqHeaders
googleapi.SetOpaque(req.URL)
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "oauth2.tokeninfo" call.
@@ -566,10 +560,7 @@ func (c *UserinfoGetCall) doRequest(alt string) (*http.Response, error) {
req, _ := http.NewRequest("GET", urls, body)
req.Header = reqHeaders
googleapi.SetOpaque(req.URL)
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "oauth2.userinfo.get" call.
@@ -680,10 +671,7 @@ func (c *UserinfoV2MeGetCall) doRequest(alt string) (*http.Response, error) {
req, _ := http.NewRequest("GET", urls, body)
req.Header = reqHeaders
googleapi.SetOpaque(req.URL)
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "oauth2.userinfo.v2.me.get" call.
diff --git a/vendor/google.golang.org/api/option/option.go b/vendor/google.golang.org/api/option/option.go
index 0be53c1..556a6df 100644
--- a/vendor/google.golang.org/api/option/option.go
+++ b/vendor/google.golang.org/api/option/option.go
@@ -101,3 +101,17 @@ type withGRPCDialOption struct{ opt grpc.DialOption }
func (w withGRPCDialOption) Apply(o *internal.DialSettings) {
o.GRPCDialOpts = append(o.GRPCDialOpts, w.opt)
}
+
+// WithGRPCConnectionPool returns a ClientOption that creates a pool of gRPC
+// connections that requests will be balanced between.
+// This is an EXPERIMENTAL API and may be changed or removed in the future.
+func WithGRPCConnectionPool(size int) ClientOption {
+ return withGRPCConnectionPool(size)
+}
+
+type withGRPCConnectionPool int
+
+func (w withGRPCConnectionPool) Apply(o *internal.DialSettings) {
+ balancer := grpc.RoundRobin(internal.NewPoolResolver(int(w), o))
+ o.GRPCDialOpts = append(o.GRPCDialOpts, grpc.WithBalancer(balancer))
+}
diff --git a/vendor/google.golang.org/api/storage/v1/storage-api.json b/vendor/google.golang.org/api/storage/v1/storage-api.json
index 52811fd..8d9118b 100644
--- a/vendor/google.golang.org/api/storage/v1/storage-api.json
+++ b/vendor/google.golang.org/api/storage/v1/storage-api.json
@@ -1,11 +1,11 @@
{
"kind": "discovery#restDescription",
- "etag": "\"C5oy1hgQsABtYOYIOXWcR3BgYqU/cPnwg2U9hg8m8Y6wHWcvqIF8qSM\"",
+ "etag": "\"C5oy1hgQsABtYOYIOXWcR3BgYqU/gtcWNCypj7VTDxrk1rvvuniNHZo\"",
"discoveryVersion": "v1",
"id": "storage:v1",
"name": "storage",
"version": "v1",
- "revision": "20160609",
+ "revision": "20160802",
"title": "Cloud Storage JSON API",
"description": "Stores and retrieves potentially large, immutable data objects.",
"ownerDomain": "google.com",
@@ -150,6 +150,15 @@
"$ref": "ObjectAccessControl"
}
},
+ "encryption": {
+ "type": "object",
+ "description": "Encryption configuration used by default for newly inserted objects, when no encryption config is specified.",
+ "properties": {
+ "default_kms_key_name": {
+ "type": "string"
+ }
+ }
+ },
"etag": {
"type": "string",
"description": "HTTP 1.1 Entity tag for the bucket."
@@ -361,13 +370,13 @@
},
"team": {
"type": "string",
- "description": "The team. Can be owners, editors, or viewers."
+ "description": "The team."
}
}
},
"role": {
"type": "string",
- "description": "The access permission for the entity. Can be READER, WRITER, or OWNER.",
+ "description": "The access permission for the entity.",
"annotations": {
"required": [
"storage.bucketAccessControls.insert"
@@ -553,7 +562,7 @@
},
"cacheControl": {
"type": "string",
- "description": "Cache-Control directive for the object data."
+ "description": "Cache-Control directive for the object data. If omitted, and the object is accessible to all anonymous users, the default will be public, max-age=3600."
},
"componentCount": {
"type": "integer",
@@ -612,6 +621,10 @@
"description": "The kind of item this is. For objects, this is always storage#object.",
"default": "storage#object"
},
+ "kmsKeyName": {
+ "type": "string",
+ "description": "Cloud KMS Key used to encrypt this object, if the object is encrypted by such a key."
+ },
"md5Hash": {
"type": "string",
"description": "MD5 hash of the data; encoded using base64. For more information about using the MD5 hash, see Hashes and ETags: Best Practices."
@@ -738,13 +751,13 @@
},
"team": {
"type": "string",
- "description": "The team. Can be owners, editors, or viewers."
+ "description": "The team."
}
}
},
"role": {
"type": "string",
- "description": "The access permission for the entity. Can be READER or OWNER."
+ "description": "The access permission for the entity."
},
"selfLink": {
"type": "string",
@@ -1958,6 +1971,11 @@
"description": "Makes the operation conditional on whether the object's current metageneration matches the given value.",
"format": "int64",
"location": "query"
+ },
+ "kmsKeyName": {
+ "type": "string",
+ "description": "Resource name of the Cloud KMS key, of the form projects/my-project/locations/global/keyRings/my-kr/cryptoKeys/my-key, that will be used to encrypt the object. Overrides the object metadata's kms_key_name value, if any.",
+ "location": "query"
}
},
"parameterOrder": [
@@ -2296,6 +2314,11 @@
"format": "int64",
"location": "query"
},
+ "kmsKeyName": {
+ "type": "string",
+ "description": "Resource name of the Cloud KMS key, of the form projects/my-project/locations/global/keyRings/my-kr/cryptoKeys/my-key, that will be used to encrypt the object. Overrides the object metadata's kms_key_name value, if any.",
+ "location": "query"
+ },
"name": {
"type": "string",
"description": "Name of the object. Required when the object metadata is not otherwise provided. Overrides the object metadata's name value, if any. For information about how to URL encode object names to be path safe, see Encoding URI Path Parts.",
@@ -2547,6 +2570,11 @@
"required": true,
"location": "path"
},
+ "destinationKmsKeyName": {
+ "type": "string",
+ "description": "Resource name of the Cloud KMS key, of the form projects/my-project/locations/global/keyRings/my-kr/cryptoKeys/my-key, that will be used to encrypt the object. Overrides the object metadata's kms_key_name value, if any.",
+ "location": "query"
+ },
"destinationObject": {
"type": "string",
"description": "Name of the new object. Required when the object metadata is not otherwise provided. Overrides the object metadata's name value, if any. For information about how to URL encode object names to be path safe, see Encoding URI Path Parts.",
diff --git a/vendor/google.golang.org/api/storage/v1/storage-gen.go b/vendor/google.golang.org/api/storage/v1/storage-gen.go
index f7e422c..346751b 100644
--- a/vendor/google.golang.org/api/storage/v1/storage-gen.go
+++ b/vendor/google.golang.org/api/storage/v1/storage-gen.go
@@ -169,6 +169,10 @@ type Bucket struct {
// when no ACL is provided.
DefaultObjectAcl []*ObjectAccessControl `json:"defaultObjectAcl,omitempty"`
+ // Encryption: Encryption configuration used by default for newly
+ // inserted objects, when no encryption config is specified.
+ Encryption *BucketEncryption `json:"encryption,omitempty"`
+
// Etag: HTTP 1.1 Entity tag for the bucket.
Etag string `json:"etag,omitempty"`
@@ -284,6 +288,26 @@ func (s *BucketCors) MarshalJSON() ([]byte, error) {
return gensupport.MarshalJSON(raw, s.ForceSendFields)
}
+// BucketEncryption: Encryption configuration used by default for newly
+// inserted objects, when no encryption config is specified.
+type BucketEncryption struct {
+ DefaultKmsKeyName string `json:"default_kms_key_name,omitempty"`
+
+ // ForceSendFields is a list of field names (e.g. "DefaultKmsKeyName")
+ // to unconditionally include in API requests. By default, fields with
+ // empty values are omitted from API requests. However, any non-pointer,
+ // non-interface field appearing in ForceSendFields will be sent to the
+ // server regardless of whether the field is empty or not. This may be
+ // used to include empty fields in Patch requests.
+ ForceSendFields []string `json:"-"`
+}
+
+func (s *BucketEncryption) MarshalJSON() ([]byte, error) {
+ type noMethod BucketEncryption
+ raw := noMethod(*s)
+ return gensupport.MarshalJSON(raw, s.ForceSendFields)
+}
+
// BucketLifecycle: The bucket's lifecycle configuration. See lifecycle
// management for more information.
type BucketLifecycle struct {
@@ -531,8 +555,7 @@ type BucketAccessControl struct {
// ProjectTeam: The project team associated with the entity, if any.
ProjectTeam *BucketAccessControlProjectTeam `json:"projectTeam,omitempty"`
- // Role: The access permission for the entity. Can be READER, WRITER, or
- // OWNER.
+ // Role: The access permission for the entity.
Role string `json:"role,omitempty"`
// SelfLink: The link to this access-control entry.
@@ -563,7 +586,7 @@ type BucketAccessControlProjectTeam struct {
// ProjectNumber: The project number.
ProjectNumber string `json:"projectNumber,omitempty"`
- // Team: The team. Can be owners, editors, or viewers.
+ // Team: The team.
Team string `json:"team,omitempty"`
// ForceSendFields is a list of field names (e.g. "ProjectNumber") to
@@ -786,7 +809,9 @@ type Object struct {
// Bucket: The name of the bucket containing this object.
Bucket string `json:"bucket,omitempty"`
- // CacheControl: Cache-Control directive for the object data.
+ // CacheControl: Cache-Control directive for the object data. If
+ // omitted, and the object is accessible to all anonymous users, the
+ // default will be public, max-age=3600.
CacheControl string `json:"cacheControl,omitempty"`
// ComponentCount: Number of underlying components that make up this
@@ -831,6 +856,10 @@ type Object struct {
// storage#object.
Kind string `json:"kind,omitempty"`
+ // KmsKeyName: Cloud KMS Key used to encrypt this object, if the object
+ // is encrypted by such a key.
+ KmsKeyName string `json:"kmsKeyName,omitempty"`
+
// Md5Hash: MD5 hash of the data; encoded using base64. For more
// information about using the MD5 hash, see Hashes and ETags: Best
// Practices.
@@ -994,7 +1023,7 @@ type ObjectAccessControl struct {
// ProjectTeam: The project team associated with the entity, if any.
ProjectTeam *ObjectAccessControlProjectTeam `json:"projectTeam,omitempty"`
- // Role: The access permission for the entity. Can be READER or OWNER.
+ // Role: The access permission for the entity.
Role string `json:"role,omitempty"`
// SelfLink: The link to this access-control entry.
@@ -1025,7 +1054,7 @@ type ObjectAccessControlProjectTeam struct {
// ProjectNumber: The project number.
ProjectNumber string `json:"projectNumber,omitempty"`
- // Team: The team. Can be owners, editors, or viewers.
+ // Team: The team.
Team string `json:"team,omitempty"`
// ForceSendFields is a list of field names (e.g. "ProjectNumber") to
@@ -1203,10 +1232,7 @@ func (c *BucketAccessControlsDeleteCall) doRequest(alt string) (*http.Response,
"bucket": c.bucket,
"entity": c.entity,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.bucketAccessControls.delete" call.
@@ -1314,10 +1340,7 @@ func (c *BucketAccessControlsGetCall) doRequest(alt string) (*http.Response, err
"bucket": c.bucket,
"entity": c.entity,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.bucketAccessControls.get" call.
@@ -1442,10 +1465,7 @@ func (c *BucketAccessControlsInsertCall) doRequest(alt string) (*http.Response,
googleapi.Expand(req.URL, map[string]string{
"bucket": c.bucket,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.bucketAccessControls.insert" call.
@@ -1573,10 +1593,7 @@ func (c *BucketAccessControlsListCall) doRequest(alt string) (*http.Response, er
googleapi.Expand(req.URL, map[string]string{
"bucket": c.bucket,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.bucketAccessControls.list" call.
@@ -1698,10 +1715,7 @@ func (c *BucketAccessControlsPatchCall) doRequest(alt string) (*http.Response, e
"bucket": c.bucket,
"entity": c.entity,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.bucketAccessControls.patch" call.
@@ -1832,10 +1846,7 @@ func (c *BucketAccessControlsUpdateCall) doRequest(alt string) (*http.Response,
"bucket": c.bucket,
"entity": c.entity,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.bucketAccessControls.update" call.
@@ -1972,10 +1983,7 @@ func (c *BucketsDeleteCall) doRequest(alt string) (*http.Response, error) {
googleapi.Expand(req.URL, map[string]string{
"bucket": c.bucket,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.buckets.delete" call.
@@ -2114,10 +2122,7 @@ func (c *BucketsGetCall) doRequest(alt string) (*http.Response, error) {
googleapi.Expand(req.URL, map[string]string{
"bucket": c.bucket,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.buckets.get" call.
@@ -2312,10 +2317,7 @@ func (c *BucketsInsertCall) doRequest(alt string) (*http.Response, error) {
req, _ := http.NewRequest("POST", urls, body)
req.Header = reqHeaders
googleapi.SetOpaque(req.URL)
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.buckets.insert" call.
@@ -2527,10 +2529,7 @@ func (c *BucketsListCall) doRequest(alt string) (*http.Response, error) {
req, _ := http.NewRequest("GET", urls, body)
req.Header = reqHeaders
googleapi.SetOpaque(req.URL)
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.buckets.list" call.
@@ -2770,10 +2769,7 @@ func (c *BucketsPatchCall) doRequest(alt string) (*http.Response, error) {
googleapi.Expand(req.URL, map[string]string{
"bucket": c.bucket,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.buckets.patch" call.
@@ -3027,10 +3023,7 @@ func (c *BucketsUpdateCall) doRequest(alt string) (*http.Response, error) {
googleapi.Expand(req.URL, map[string]string{
"bucket": c.bucket,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.buckets.update" call.
@@ -3212,10 +3205,7 @@ func (c *ChannelsStopCall) doRequest(alt string) (*http.Response, error) {
req, _ := http.NewRequest("POST", urls, body)
req.Header = reqHeaders
googleapi.SetOpaque(req.URL)
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.channels.stop" call.
@@ -3298,10 +3288,7 @@ func (c *DefaultObjectAccessControlsDeleteCall) doRequest(alt string) (*http.Res
"bucket": c.bucket,
"entity": c.entity,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.defaultObjectAccessControls.delete" call.
@@ -3409,10 +3396,7 @@ func (c *DefaultObjectAccessControlsGetCall) doRequest(alt string) (*http.Respon
"bucket": c.bucket,
"entity": c.entity,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.defaultObjectAccessControls.get" call.
@@ -3538,10 +3522,7 @@ func (c *DefaultObjectAccessControlsInsertCall) doRequest(alt string) (*http.Res
googleapi.Expand(req.URL, map[string]string{
"bucket": c.bucket,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.defaultObjectAccessControls.insert" call.
@@ -3686,10 +3667,7 @@ func (c *DefaultObjectAccessControlsListCall) doRequest(alt string) (*http.Respo
googleapi.Expand(req.URL, map[string]string{
"bucket": c.bucket,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.defaultObjectAccessControls.list" call.
@@ -3823,10 +3801,7 @@ func (c *DefaultObjectAccessControlsPatchCall) doRequest(alt string) (*http.Resp
"bucket": c.bucket,
"entity": c.entity,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.defaultObjectAccessControls.patch" call.
@@ -3957,10 +3932,7 @@ func (c *DefaultObjectAccessControlsUpdateCall) doRequest(alt string) (*http.Res
"bucket": c.bucket,
"entity": c.entity,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.defaultObjectAccessControls.update" call.
@@ -4096,10 +4068,7 @@ func (c *ObjectAccessControlsDeleteCall) doRequest(alt string) (*http.Response,
"object": c.object,
"entity": c.entity,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.objectAccessControls.delete" call.
@@ -4231,10 +4200,7 @@ func (c *ObjectAccessControlsGetCall) doRequest(alt string) (*http.Response, err
"object": c.object,
"entity": c.entity,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.objectAccessControls.get" call.
@@ -4383,10 +4349,7 @@ func (c *ObjectAccessControlsInsertCall) doRequest(alt string) (*http.Response,
"bucket": c.bucket,
"object": c.object,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.objectAccessControls.insert" call.
@@ -4538,10 +4501,7 @@ func (c *ObjectAccessControlsListCall) doRequest(alt string) (*http.Response, er
"bucket": c.bucket,
"object": c.object,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.objectAccessControls.list" call.
@@ -4687,10 +4647,7 @@ func (c *ObjectAccessControlsPatchCall) doRequest(alt string) (*http.Response, e
"object": c.object,
"entity": c.entity,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.objectAccessControls.patch" call.
@@ -4845,10 +4802,7 @@ func (c *ObjectAccessControlsUpdateCall) doRequest(alt string) (*http.Response,
"object": c.object,
"entity": c.entity,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.objectAccessControls.update" call.
@@ -4996,6 +4950,16 @@ func (c *ObjectsComposeCall) IfMetagenerationMatch(ifMetagenerationMatch int64)
return c
}
+// KmsKeyName sets the optional parameter "kmsKeyName": Resource name of
+// the Cloud KMS key, of the form
+// projects/my-project/locations/global/keyRings/my-kr/cryptoKeys/my-key,
+// that will be used to encrypt the object. Overrides the object
+// metadata's kms_key_name value, if any.
+func (c *ObjectsComposeCall) KmsKeyName(kmsKeyName string) *ObjectsComposeCall {
+ c.urlParams_.Set("kmsKeyName", kmsKeyName)
+ return c
+}
+
// Fields allows partial responses to be retrieved. See
// https://developers.google.com/gdata/docs/2.0/basics#PartialResponse
// for more information.
@@ -5030,10 +4994,7 @@ func (c *ObjectsComposeCall) doRequest(alt string) (*http.Response, error) {
"destinationBucket": c.destinationBucket,
"destinationObject": c.destinationObject,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Download fetches the API endpoint's "media" value, instead of the normal
@@ -5142,6 +5103,11 @@ func (c *ObjectsComposeCall) Do(opts ...googleapi.CallOption) (*Object, error) {
// "format": "int64",
// "location": "query",
// "type": "string"
+ // },
+ // "kmsKeyName": {
+ // "description": "Resource name of the Cloud KMS key, of the form projects/my-project/locations/global/keyRings/my-kr/cryptoKeys/my-key, that will be used to encrypt the object. Overrides the object metadata's kms_key_name value, if any.",
+ // "location": "query",
+ // "type": "string"
// }
// },
// "path": "b/{destinationBucket}/o/{destinationObject}/compose",
@@ -5334,10 +5300,7 @@ func (c *ObjectsCopyCall) doRequest(alt string) (*http.Response, error) {
"destinationBucket": c.destinationBucket,
"destinationObject": c.destinationObject,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Download fetches the API endpoint's "media" value, instead of the normal
@@ -5625,10 +5588,7 @@ func (c *ObjectsDeleteCall) doRequest(alt string) (*http.Response, error) {
"bucket": c.bucket,
"object": c.object,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.objects.delete" call.
@@ -5818,10 +5778,7 @@ func (c *ObjectsGetCall) doRequest(alt string) (*http.Response, error) {
"bucket": c.bucket,
"object": c.object,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Download fetches the API endpoint's "media" value, instead of the normal
@@ -6026,6 +5983,16 @@ func (c *ObjectsInsertCall) IfMetagenerationNotMatch(ifMetagenerationNotMatch in
return c
}
+// KmsKeyName sets the optional parameter "kmsKeyName": Resource name of
+// the Cloud KMS key, of the form
+// projects/my-project/locations/global/keyRings/my-kr/cryptoKeys/my-key,
+// that will be used to encrypt the object. Overrides the object
+// metadata's kms_key_name value, if any.
+func (c *ObjectsInsertCall) KmsKeyName(kmsKeyName string) *ObjectsInsertCall {
+ c.urlParams_.Set("kmsKeyName", kmsKeyName)
+ return c
+}
+
// Name sets the optional parameter "name": Name of the object. Required
// when the object metadata is not otherwise provided. Overrides the
// object metadata's name value, if any. For information about how to
@@ -6076,6 +6043,9 @@ func (c *ObjectsInsertCall) Projection(projection string) *ObjectsInsertCall {
// supplied.
// At most one of Media and ResumableMedia may be set.
func (c *ObjectsInsertCall) Media(r io.Reader, options ...googleapi.MediaOption) *ObjectsInsertCall {
+ if ct := c.object.ContentType; ct != "" {
+ options = append([]googleapi.MediaOption{googleapi.ContentType(ct)}, options...)
+ }
opts := googleapi.ProcessMediaOptions(options)
chunkSize := opts.ChunkSize
if !opts.ForceEmptyContentType {
@@ -6169,10 +6139,7 @@ func (c *ObjectsInsertCall) doRequest(alt string) (*http.Response, error) {
googleapi.Expand(req.URL, map[string]string{
"bucket": c.bucket,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.objects.insert" call.
@@ -6297,6 +6264,11 @@ func (c *ObjectsInsertCall) Do(opts ...googleapi.CallOption) (*Object, error) {
// "location": "query",
// "type": "string"
// },
+ // "kmsKeyName": {
+ // "description": "Resource name of the Cloud KMS key, of the form projects/my-project/locations/global/keyRings/my-kr/cryptoKeys/my-key, that will be used to encrypt the object. Overrides the object metadata's kms_key_name value, if any.",
+ // "location": "query",
+ // "type": "string"
+ // },
// "name": {
// "description": "Name of the object. Required when the object metadata is not otherwise provided. Overrides the object metadata's name value, if any. For information about how to URL encode object names to be path safe, see Encoding URI Path Parts.",
// "location": "query",
@@ -6468,10 +6440,7 @@ func (c *ObjectsListCall) doRequest(alt string) (*http.Response, error) {
googleapi.Expand(req.URL, map[string]string{
"bucket": c.bucket,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.objects.list" call.
@@ -6730,10 +6699,7 @@ func (c *ObjectsPatchCall) doRequest(alt string) (*http.Response, error) {
"bucket": c.bucket,
"object": c.object,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.objects.patch" call.
@@ -6899,6 +6865,17 @@ func (r *ObjectsService) Rewrite(sourceBucket string, sourceObject string, desti
return c
}
+// DestinationKmsKeyName sets the optional parameter
+// "destinationKmsKeyName": Resource name of the Cloud KMS key, of the
+// form
+// projects/my-project/locations/global/keyRings/my-kr/cryptoKeys/my-key,
+// that will be used to encrypt the object. Overrides the object
+// metadata's kms_key_name value, if any.
+func (c *ObjectsRewriteCall) DestinationKmsKeyName(destinationKmsKeyName string) *ObjectsRewriteCall {
+ c.urlParams_.Set("destinationKmsKeyName", destinationKmsKeyName)
+ return c
+}
+
// DestinationPredefinedAcl sets the optional parameter
// "destinationPredefinedAcl": Apply a predefined set of access controls
// to the destination object.
@@ -7071,10 +7048,7 @@ func (c *ObjectsRewriteCall) doRequest(alt string) (*http.Response, error) {
"destinationBucket": c.destinationBucket,
"destinationObject": c.destinationObject,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.objects.rewrite" call.
@@ -7131,6 +7105,11 @@ func (c *ObjectsRewriteCall) Do(opts ...googleapi.CallOption) (*RewriteResponse,
// "required": true,
// "type": "string"
// },
+ // "destinationKmsKeyName": {
+ // "description": "Resource name of the Cloud KMS key, of the form projects/my-project/locations/global/keyRings/my-kr/cryptoKeys/my-key, that will be used to encrypt the object. Overrides the object metadata's kms_key_name value, if any.",
+ // "location": "query",
+ // "type": "string"
+ // },
// "destinationObject": {
// "description": "Name of the new object. Required when the object metadata is not otherwise provided. Overrides the object metadata's name value, if any. For information about how to URL encode object names to be path safe, see Encoding URI Path Parts.",
// "location": "path",
@@ -7391,10 +7370,7 @@ func (c *ObjectsUpdateCall) doRequest(alt string) (*http.Response, error) {
"bucket": c.bucket,
"object": c.object,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Download fetches the API endpoint's "media" value, instead of the normal
@@ -7658,10 +7634,7 @@ func (c *ObjectsWatchAllCall) doRequest(alt string) (*http.Response, error) {
googleapi.Expand(req.URL, map[string]string{
"bucket": c.bucket,
})
- if c.ctx_ != nil {
- return ctxhttp.Do(c.ctx_, c.s.client, req)
- }
- return c.s.client.Do(req)
+ return gensupport.SendRequest(c.ctx_, c.s.client, req)
}
// Do executes the "storage.objects.watchAll" call.
diff --git a/vendor/google.golang.org/appengine/appengine.go b/vendor/google.golang.org/appengine/appengine.go
index aa23a7a..475cf2e 100644
--- a/vendor/google.golang.org/appengine/appengine.go
+++ b/vendor/google.golang.org/appengine/appengine.go
@@ -17,6 +17,42 @@ import (
"google.golang.org/appengine/internal"
)
+// The gophers party all night; the rabbits provide the beats.
+
+// Main is the principal entry point for an app running in App Engine.
+//
+// On App Engine Flexible it installs a trivial health checker if one isn't
+// already registered, and starts listening on port 8080 (overridden by the
+// $PORT environment variable).
+//
+// See https://cloud.google.com/appengine/docs/flexible/custom-runtimes#health_check_requests
+// for details on how to do your own health checking.
+//
+// Main is not yet supported on App Engine Standard.
+//
+// Main never returns.
+//
+// Main is designed so that the app's main package looks like this:
+//
+// package main
+//
+// import (
+// "google.golang.org/appengine"
+//
+// _ "myapp/package0"
+// _ "myapp/package1"
+// )
+//
+// func main() {
+// appengine.Main()
+// }
+//
+// The "myapp/packageX" packages are expected to register HTTP handlers
+// in their init functions.
+func Main() {
+ internal.Main()
+}
+
// IsDevAppServer reports whether the App Engine app is running in the
// development App Server.
func IsDevAppServer() bool {
diff --git a/vendor/google.golang.org/appengine/appengine_vm.go b/vendor/google.golang.org/appengine/appengine_vm.go
index e4399ed..f4b645a 100644
--- a/vendor/google.golang.org/appengine/appengine_vm.go
+++ b/vendor/google.golang.org/appengine/appengine_vm.go
@@ -12,42 +12,6 @@ import (
"google.golang.org/appengine/internal"
)
-// The comment below must not be changed.
-// It is used by go-app-builder to recognise that this package has
-// the Main function to use in the synthetic main.
-// The gophers party all night; the rabbits provide the beats.
-
-// Main is the principal entry point for an app running in App Engine "flexible environment".
-// It installs a trivial health checker if one isn't already registered,
-// and starts listening on port 8080 (overridden by the $PORT environment
-// variable).
-//
-// See https://cloud.google.com/appengine/docs/flexible/custom-runtimes#health_check_requests
-// for details on how to do your own health checking.
-//
-// Main never returns.
-//
-// Main is designed so that the app's main package looks like this:
-//
-// package main
-//
-// import (
-// "google.golang.org/appengine"
-//
-// _ "myapp/package0"
-// _ "myapp/package1"
-// )
-//
-// func main() {
-// appengine.Main()
-// }
-//
-// The "myapp/packageX" packages are expected to register HTTP handlers
-// in their init functions.
-func Main() {
- internal.Main()
-}
-
// BackgroundContext returns a context not associated with a request.
// This should only be used when not servicing a request.
// This only works in App Engine "flexible environment".
diff --git a/vendor/google.golang.org/appengine/internal/internal.go b/vendor/google.golang.org/appengine/internal/internal.go
index 66e8d76..051ea39 100644
--- a/vendor/google.golang.org/appengine/internal/internal.go
+++ b/vendor/google.golang.org/appengine/internal/internal.go
@@ -10,11 +10,6 @@ package internal
import (
"fmt"
- "io"
- "log"
- "net/http"
- "net/url"
- "os"
"github.com/golang/protobuf/proto"
@@ -109,35 +104,6 @@ func (e *CallError) IsTimeout() bool {
return e.Timeout
}
-func Main() {
- installHealthChecker(http.DefaultServeMux)
-
- port := "8080"
- if s := os.Getenv("PORT"); s != "" {
- port = s
- }
-
- if err := http.ListenAndServe(":"+port, http.HandlerFunc(handleHTTP)); err != nil {
- log.Fatalf("http.ListenAndServe: %v", err)
- }
-}
-
-func installHealthChecker(mux *http.ServeMux) {
- // If no health check handler has been installed by this point, add a trivial one.
- const healthPath = "/_ah/health"
- hreq := &http.Request{
- Method: "GET",
- URL: &url.URL{
- Path: healthPath,
- },
- }
- if _, pat := mux.Handler(hreq); pat != healthPath {
- mux.HandleFunc(healthPath, func(w http.ResponseWriter, r *http.Request) {
- io.WriteString(w, "ok")
- })
- }
-}
-
// NamespaceMods is a map from API service to a function that will mutate an RPC request to attach a namespace.
// The function should be prepared to be called on the same message more than once; it should only modify the
// RPC request the first time.
diff --git a/vendor/google.golang.org/appengine/internal/main.go b/vendor/google.golang.org/appengine/internal/main.go
new file mode 100644
index 0000000..4903616
--- /dev/null
+++ b/vendor/google.golang.org/appengine/internal/main.go
@@ -0,0 +1,15 @@
+// Copyright 2011 Google Inc. All rights reserved.
+// Use of this source code is governed by the Apache 2.0
+// license that can be found in the LICENSE file.
+
+// +build appengine
+
+package internal
+
+import (
+ "appengine_internal"
+)
+
+func Main() {
+ appengine_internal.Main()
+}
diff --git a/vendor/google.golang.org/appengine/internal/main_vm.go b/vendor/google.golang.org/appengine/internal/main_vm.go
new file mode 100644
index 0000000..57331ad
--- /dev/null
+++ b/vendor/google.golang.org/appengine/internal/main_vm.go
@@ -0,0 +1,44 @@
+// Copyright 2011 Google Inc. All rights reserved.
+// Use of this source code is governed by the Apache 2.0
+// license that can be found in the LICENSE file.
+
+// +build !appengine
+
+package internal
+
+import (
+ "io"
+ "log"
+ "net/http"
+ "net/url"
+ "os"
+)
+
+func Main() {
+ installHealthChecker(http.DefaultServeMux)
+
+ port := "8080"
+ if s := os.Getenv("PORT"); s != "" {
+ port = s
+ }
+
+ if err := http.ListenAndServe(":"+port, http.HandlerFunc(handleHTTP)); err != nil {
+ log.Fatalf("http.ListenAndServe: %v", err)
+ }
+}
+
+func installHealthChecker(mux *http.ServeMux) {
+ // If no health check handler has been installed by this point, add a trivial one.
+ const healthPath = "/_ah/health"
+ hreq := &http.Request{
+ Method: "GET",
+ URL: &url.URL{
+ Path: healthPath,
+ },
+ }
+ if _, pat := mux.Handler(hreq); pat != healthPath {
+ mux.HandleFunc(healthPath, func(w http.ResponseWriter, r *http.Request) {
+ io.WriteString(w, "ok")
+ })
+ }
+}
diff --git a/vendor/google.golang.org/cloud/CONTRIBUTORS b/vendor/google.golang.org/cloud/CONTRIBUTORS
index 6e1e7f1..cfae12c 100644
--- a/vendor/google.golang.org/cloud/CONTRIBUTORS
+++ b/vendor/google.golang.org/cloud/CONTRIBUTORS
@@ -11,6 +11,7 @@
# Keep the list alphabetically sorted.
+Andreas Litt <andreas.litt@gmail.com>
Andrew Gerrand <adg@golang.org>
Brad Fitzpatrick <bradfitz@golang.org>
Burcu Dogan <jbd@google.com>
diff --git a/vendor/google.golang.org/cloud/README.md b/vendor/google.golang.org/cloud/README.md
index 75b94eb..de29fb9 100644
--- a/vendor/google.golang.org/cloud/README.md
+++ b/vendor/google.golang.org/cloud/README.md
@@ -1,5 +1,15 @@
# Google Cloud for Go
+**NOTE:** This repo exists solely to transition to the new import paths at
+ cloud.google.com/go. It will be removed on September 12, 2016. Only emergency
+ pull requests will be accepted.
+
+To migrate off this repo, change import paths beginning
+`google.golang.org/cloud` to `cloud.google.com/go`, except for the options in
+the `google.golang.org/cloud` package itself, which are now at
+`google.golang.org/api/option`. See details [here](https://groups.google.com/forum/#!topic/google-api-go-announce/nXY-DYZGqz8).
+
+
[![Build Status](https://travis-ci.org/GoogleCloudPlatform/gcloud-golang.svg?branch=master)](https://travis-ci.org/GoogleCloudPlatform/gcloud-golang)
[![GoDoc](https://godoc.org/google.golang.org/cloud?status.svg)](https://godoc.org/google.golang.org/cloud)
@@ -61,7 +71,7 @@ function for the relevant API using a
[`cloud.WithTokenSource`](https://godoc.org/google.golang.org/cloud#WithTokenSource)
option.
-## Google Cloud Datastore
+## Google Cloud Datastore [![GoDoc](https://godoc.org/google.golang.org/cloud/datastore?status.svg)](https://godoc.org/google.golang.org/cloud/datastore)
[Google Cloud Datastore][cloud-datastore] ([docs][cloud-datastore-docs]) is a fully-
managed, schemaless database for storing non-relational data. Cloud Datastore
@@ -72,8 +82,6 @@ consistency for all other queries.
Follow the [activation instructions][cloud-datastore-activation] to use the Google
Cloud Datastore API with your project.
-https://godoc.org/google.golang.org/cloud/datastore
-
First create a `datastore.Client` to use throughout your application:
```go
@@ -104,7 +112,7 @@ if _, err := client.PutMulti(ctx, keys, posts); err != nil {
}
```
-## Google Cloud Storage
+## Google Cloud Storage [![GoDoc](https://godoc.org/google.golang.org/cloud/storage?status.svg)](https://godoc.org/google.golang.org/cloud/storage)
[Google Cloud Storage][cloud-storage] ([docs][cloud-storage-docs]) allows you to store
data on Google infrastructure with very high reliability, performance and availability,
@@ -134,16 +142,13 @@ if err != nil {
}
```
-## Google Cloud Pub/Sub
+## Google Cloud Pub/Sub [![GoDoc](https://godoc.org/google.golang.org/cloud/pubsub?status.svg)](https://godoc.org/google.golang.org/cloud/pubsub)
[Google Cloud Pub/Sub][cloud-pubsub] ([docs][cloud-pubsub-docs]) allows you to connect
your services with reliable, many-to-many, asynchronous messaging hosted on Google's
infrastructure. Cloud Pub/Sub automatically scales as you need it and provides a foundation
for building your own robust, global services.
-https://godoc.org/google.golang.org/cloud/pubsub
-
-
```go
// Publish "hello world" on topic1.
msgIDs, err := pubsub.Publish(ctx, "topic1", &pubsub.Message{
diff --git a/vendor/google.golang.org/cloud/compute/metadata/metadata.go b/vendor/google.golang.org/cloud/compute/metadata/metadata.go
index 9821843..b791790 100644
--- a/vendor/google.golang.org/cloud/compute/metadata/metadata.go
+++ b/vendor/google.golang.org/cloud/compute/metadata/metadata.go
@@ -263,7 +263,8 @@ func systemInfoSuggestsGCE() bool {
return false
}
slurp, _ := ioutil.ReadFile("/sys/class/dmi/id/product_name")
- return strings.TrimSpace(string(slurp)) == "Google"
+ name := strings.TrimSpace(string(slurp))
+ return name == "Google" || name == "Google Compute Engine"
}
// Subscribe subscribes to a value from the metadata service.
diff --git a/vendor/google.golang.org/cloud/option.go b/vendor/google.golang.org/cloud/option.go
index 4788c67..c012c73 100644
--- a/vendor/google.golang.org/cloud/option.go
+++ b/vendor/google.golang.org/cloud/option.go
@@ -79,3 +79,10 @@ func WithBaseGRPC(conn *grpc.ClientConn) ClientOption {
func WithGRPCDialOption(o grpc.DialOption) ClientOption {
return wrapOpt{option.WithGRPCDialOption(o)}
}
+
+// WithGRPCConnectionPool returns a ClientOption that creates a pool of gRPC
+// connections that requests will be balanced between.
+// This is an EXPERIMENTAL API and may be changed or removed in the future.
+func WithGRPCConnectionPool(size int) ClientOption {
+ return wrapOpt{option.WithGRPCConnectionPool(size)}
+}
diff --git a/vendor/google.golang.org/cloud/storage/storage.go b/vendor/google.golang.org/cloud/storage/storage.go
index 1f667c1..85dca80 100644
--- a/vendor/google.golang.org/cloud/storage/storage.go
+++ b/vendor/google.golang.org/cloud/storage/storage.go
@@ -18,6 +18,7 @@
package storage // import "google.golang.org/cloud/storage"
import (
+ "bytes"
"crypto"
"crypto/rand"
"crypto/rsa"
@@ -48,6 +49,9 @@ import (
var (
ErrBucketNotExist = errors.New("storage: bucket doesn't exist")
ErrObjectNotExist = errors.New("storage: object doesn't exist")
+
+ // Done is returned by iterators in this package when they have no more items.
+ Done = errors.New("storage: no more results")
)
const userAgent = "gcloud-golang-storage/20151204"
@@ -248,43 +252,162 @@ func (b *BucketHandle) Attrs(ctx context.Context) (*BucketAttrs, error) {
// List lists objects from the bucket. You can specify a query
// to filter the results. If q is nil, no filtering is applied.
+//
+// Deprecated. Use BucketHandle.Objects instead.
func (b *BucketHandle) List(ctx context.Context, q *Query) (*ObjectList, error) {
- req := b.c.raw.Objects.List(b.name)
- req.Projection("full")
- if q != nil {
- req.Delimiter(q.Delimiter)
- req.Prefix(q.Prefix)
- req.Versions(q.Versions)
- req.PageToken(q.Cursor)
- if q.MaxResults > 0 {
- req.MaxResults(int64(q.MaxResults))
- }
- }
- resp, err := req.Context(ctx).Do()
- if err != nil {
+ it := b.Objects(ctx, q)
+ attrs, pres, err := it.NextPage()
+ if err != nil && err != Done {
return nil, err
}
objects := &ObjectList{
- Results: make([]*ObjectAttrs, len(resp.Items)),
- Prefixes: make([]string, len(resp.Prefixes)),
+ Results: attrs,
+ Prefixes: pres,
+ }
+ if it.NextPageToken() != "" {
+ objects.Next = &it.query
}
- for i, item := range resp.Items {
- objects.Results[i] = newObject(item)
+ return objects, nil
+}
+
+func (b *BucketHandle) Objects(ctx context.Context, q *Query) *ObjectIterator {
+ it := &ObjectIterator{
+ ctx: ctx,
+ bucket: b,
}
- for i, prefix := range resp.Prefixes {
- objects.Prefixes[i] = prefix
+ if q != nil {
+ it.query = *q
}
- if resp.NextPageToken != "" {
- next := Query{}
- if q != nil {
- // keep the other filtering
- // criteria if there is a query
- next = *q
+ return it
+}
+
+type ObjectIterator struct {
+ ctx context.Context
+ bucket *BucketHandle
+ query Query
+ pageSize int32
+ objs []*ObjectAttrs
+ prefixes []string
+ err error
+}
+
+// Next returns the next result. Its second return value is Done if there are
+// no more results. Once Next returns Done, all subsequent calls will return
+// Done.
+//
+// Internally, Next retrieves results in bulk. You can call SetPageSize as a
+// performance hint to affect how many results are retrieved in a single RPC.
+//
+// SetPageToken should not be called when using Next.
+//
+// Next and NextPage should not be used with the same iterator.
+//
+// If Query.Delimiter is non-empty, Next returns an error. Use NextPage when using delimiters.
+func (it *ObjectIterator) Next() (*ObjectAttrs, error) {
+ if it.query.Delimiter != "" {
+ return nil, errors.New("cannot use ObjectIterator.Next with a delimiter")
+ }
+ for len(it.objs) == 0 { // "for", not "if", to handle empty pages
+ if it.err != nil {
+ return nil, it.err
+ }
+ it.nextPage()
+ if it.err != nil {
+ it.objs = nil
+ return nil, it.err
+ }
+ if it.query.Cursor == "" {
+ it.err = Done
}
- next.Cursor = resp.NextPageToken
- objects.Next = &next
}
- return objects, nil
+ o := it.objs[0]
+ it.objs = it.objs[1:]
+ return o, nil
+}
+
+const DefaultPageSize = 1000
+
+// NextPage returns the next page of results, both objects (as *ObjectAttrs)
+// and prefixes. Prefixes will be nil if query.Delimiter is empty.
+//
+// NextPage will return exactly the number of results (the total of objects and
+// prefixes) specified by the last call to SetPageSize, unless there are not
+// enough results available. If no page size was specified, it uses
+// DefaultPageSize.
+//
+// NextPage may return a second return value of Done along with the last page
+// of results.
+//
+// After NextPage returns Done, all subsequent calls to NextPage will return
+// (nil, Done).
+//
+// Next and NextPage should not be used with the same iterator.
+func (it *ObjectIterator) NextPage() (objs []*ObjectAttrs, prefixes []string, err error) {
+ defer it.SetPageSize(it.pageSize) // restore value at entry
+ if it.pageSize <= 0 {
+ it.pageSize = DefaultPageSize
+ }
+ for len(objs)+len(prefixes) < int(it.pageSize) {
+ it.pageSize -= int32(len(objs) + len(prefixes))
+ it.nextPage()
+ if it.err != nil {
+ return nil, nil, it.err
+ }
+ objs = append(objs, it.objs...)
+ prefixes = append(prefixes, it.prefixes...)
+ if it.query.Cursor == "" {
+ it.err = Done
+ return objs, prefixes, it.err
+ }
+ }
+ return objs, prefixes, it.err
+}
+
+// nextPage gets the next page of results by making a single call to the underlying method.
+// It sets it.objs, it.prefixes, it.query.Cursor, and it.err. It never sets it.err to Done.
+func (it *ObjectIterator) nextPage() {
+ if it.err != nil {
+ return
+ }
+ req := it.bucket.c.raw.Objects.List(it.bucket.name)
+ req.Projection("full")
+ req.Delimiter(it.query.Delimiter)
+ req.Prefix(it.query.Prefix)
+ req.Versions(it.query.Versions)
+ req.PageToken(it.query.Cursor)
+ if it.pageSize > 0 {
+ req.MaxResults(int64(it.pageSize))
+ }
+ resp, err := req.Context(it.ctx).Do()
+ if err != nil {
+ it.err = err
+ return
+ }
+ it.query.Cursor = resp.NextPageToken
+ it.objs = nil
+ for _, item := range resp.Items {
+ it.objs = append(it.objs, newObject(item))
+ }
+ it.prefixes = resp.Prefixes
+}
+
+// SetPageSize sets the page size for all subsequent calls to NextPage.
+// NextPage will return exactly this many items if they are present.
+func (it *ObjectIterator) SetPageSize(pageSize int32) {
+ it.pageSize = pageSize
+}
+
+// SetPageToken sets the page token for the next call to NextPage, to resume
+// the iteration from a previous point.
+func (it *ObjectIterator) SetPageToken(t string) {
+ it.query.Cursor = t
+}
+
+// NextPageToken returns a page token that can be used with SetPageToken to
+// resume iteration from the next page. It returns the empty string if there
+// are no more pages. For an example, see SetPageToken.
+func (it *ObjectIterator) NextPageToken() string {
+ return it.query.Cursor
}
// SignedURLOptions allows you to restrict the access to the signed URL.
@@ -306,9 +429,25 @@ type SignedURLOptions struct {
// $ openssl pkcs12 -in key.p12 -passin pass:notasecret -out key.pem -nodes
//
// Provide the contents of the PEM file as a byte slice.
- // Required.
+ // Exactly one of PrivateKey or SignBytes must be non-nil.
PrivateKey []byte
+ // SignBytes is a function for implementing custom signing.
+ // If your application is running on Google App Engine, you can use appengine's internal signing function:
+ // ctx := appengine.NewContext(request)
+ // acc, _ := appengine.ServiceAccount(ctx)
+ // url, err := SignedURL("bucket", "object", &SignedURLOptions{
+ // GoogleAccessID: acc,
+ // SignBytes: func(b []byte) ([]byte, error) {
+ // _, signedBytes, err := appengine.SignBytes(ctx, b)
+ // return signedBytes, err
+ // },
+ // // etc.
+ // })
+ //
+ // Exactly one of PrivateKey or SignBytes must be non-nil.
+ SignBytes func([]byte) ([]byte, error)
+
// Method is the HTTP method to be used with the signed URL.
// Signed URLs can be used with GET, HEAD, PUT, and DELETE requests.
// Required.
@@ -344,8 +483,11 @@ func SignedURL(bucket, name string, opts *SignedURLOptions) (string, error) {
if opts == nil {
return "", errors.New("storage: missing required SignedURLOptions")
}
- if opts.GoogleAccessID == "" || opts.PrivateKey == nil {
- return "", errors.New("storage: missing required credentials to generate a signed URL")
+ if opts.GoogleAccessID == "" {
+ return "", errors.New("storage: missing required GoogleAccessID")
+ }
+ if (opts.PrivateKey == nil) == (opts.SignBytes == nil) {
+ return "", errors.New("storage: exactly one of PrivateKey or SignedBytes must be set")
}
if opts.Method == "" {
return "", errors.New("storage: missing required method option")
@@ -353,26 +495,39 @@ func SignedURL(bucket, name string, opts *SignedURLOptions) (string, error) {
if opts.Expires.IsZero() {
return "", errors.New("storage: missing required expires option")
}
- key, err := parseKey(opts.PrivateKey)
- if err != nil {
- return "", err
+
+ signBytes := opts.SignBytes
+ if opts.PrivateKey != nil {
+ key, err := parseKey(opts.PrivateKey)
+ if err != nil {
+ return "", err
+ }
+ signBytes = func(b []byte) ([]byte, error) {
+ sum := sha256.Sum256(b)
+ return rsa.SignPKCS1v15(
+ rand.Reader,
+ key,
+ crypto.SHA256,
+ sum[:],
+ )
+ }
+ } else {
+ signBytes = opts.SignBytes
}
+
u := &url.URL{
Path: fmt.Sprintf("/%s/%s", bucket, name),
}
- h := sha256.New()
- fmt.Fprintf(h, "%s\n", opts.Method)
- fmt.Fprintf(h, "%s\n", opts.MD5)
- fmt.Fprintf(h, "%s\n", opts.ContentType)
- fmt.Fprintf(h, "%d\n", opts.Expires.Unix())
- fmt.Fprintf(h, "%s", strings.Join(opts.Headers, "\n"))
- fmt.Fprintf(h, "%s", u.String())
- b, err := rsa.SignPKCS1v15(
- rand.Reader,
- key,
- crypto.SHA256,
- h.Sum(nil),
- )
+
+ buf := &bytes.Buffer{}
+ fmt.Fprintf(buf, "%s\n", opts.Method)
+ fmt.Fprintf(buf, "%s\n", opts.MD5)
+ fmt.Fprintf(buf, "%s\n", opts.ContentType)
+ fmt.Fprintf(buf, "%d\n", opts.Expires.Unix())
+ fmt.Fprintf(buf, "%s", strings.Join(opts.Headers, "\n"))
+ fmt.Fprintf(buf, "%s", u.String())
+
+ b, err := signBytes(buf.Bytes())
if err != nil {
return "", err
}
@@ -915,6 +1070,8 @@ type Query struct {
// to return. As duplicate prefixes are omitted,
// fewer total results may be returned than requested.
// The default page limit is used if it is negative or zero.
+ //
+ // Deprecated. Use ObjectIterator.SetPageSize.
MaxResults int
}
diff --git a/vendor/google.golang.org/grpc/README.md b/vendor/google.golang.org/grpc/README.md
index 90e9453..660658b 100644
--- a/vendor/google.golang.org/grpc/README.md
+++ b/vendor/google.golang.org/grpc/README.md
@@ -28,5 +28,5 @@ See [API documentation](https://godoc.org/google.golang.org/grpc) for package an
Status
------
-Beta release
+GA
diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go
index 84ac178..fea0799 100644
--- a/vendor/google.golang.org/grpc/call.go
+++ b/vendor/google.golang.org/grpc/call.go
@@ -36,6 +36,7 @@ package grpc
import (
"bytes"
"io"
+ "math"
"time"
"golang.org/x/net/context"
@@ -51,13 +52,20 @@ import (
func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) error {
// Try to acquire header metadata from the server if there is any.
var err error
+ defer func() {
+ if err != nil {
+ if _, ok := err.(transport.ConnectionError); !ok {
+ t.CloseStream(stream, err)
+ }
+ }
+ }()
c.headerMD, err = stream.Header()
if err != nil {
return err
}
p := &parser{r: stream}
for {
- if err = recv(p, dopts.codec, stream, dopts.dc, reply); err != nil {
+ if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32); err != nil {
if err == io.EOF {
break
}
@@ -76,6 +84,7 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
}
defer func() {
if err != nil {
+ // If err is connection error, t will be closed, no need to close stream here.
if _, ok := err.(transport.ConnectionError); !ok {
t.CloseStream(stream, err)
}
@@ -90,7 +99,10 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
return nil, transport.StreamErrorf(codes.Internal, "grpc: %v", err)
}
err = t.Write(stream, outBuf, opts)
- if err != nil {
+ // t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
+ // does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
+ // recvResponse to get the final status.
+ if err != nil && err != io.EOF {
return nil, err
}
// Sent successfully.
@@ -158,9 +170,9 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if _, ok := err.(*rpcError); ok {
return err
}
- if err == errConnClosing {
+ if err == errConnClosing || err == errConnUnavailable {
if c.failFast {
- return Errorf(codes.Unavailable, "%v", errConnClosing)
+ return Errorf(codes.Unavailable, "%v", err)
}
continue
}
@@ -176,7 +188,10 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
put()
put = nil
}
- if _, ok := err.(transport.ConnectionError); ok {
+ // Retry a non-failfast RPC when
+ // i) there is a connection error; or
+ // ii) the server started to drain before this RPC was initiated.
+ if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
}
@@ -184,20 +199,18 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
return toRPCErr(err)
}
- // Receive the response
err = recvResponse(cc.dopts, t, &c, stream, reply)
if err != nil {
if put != nil {
put()
put = nil
}
- if _, ok := err.(transport.ConnectionError); ok {
+ if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
}
continue
}
- t.CloseStream(stream, err)
return toRPCErr(err)
}
if c.traceInfo.tr != nil {
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index c3c7691..1d3b46c 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -43,7 +43,6 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/trace"
- "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/transport"
@@ -68,13 +67,15 @@ var (
// errCredentialsConflict indicates that grpc.WithTransportCredentials()
// and grpc.WithInsecure() are both called for a connection.
errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
- // errNetworkIP indicates that the connection is down due to some network I/O error.
+ // errNetworkIO indicates that the connection is down due to some network I/O error.
errNetworkIO = errors.New("grpc: failed with network I/O error")
// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
errConnClosing = errors.New("grpc: the connection is closing")
- errNoAddr = errors.New("grpc: there is no address available to dial")
+ // errConnUnavailable indicates that the connection is unavailable.
+ errConnUnavailable = errors.New("grpc: the connection is unavailable")
+ errNoAddr = errors.New("grpc: there is no address available to dial")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
@@ -196,9 +197,14 @@ func WithTimeout(d time.Duration) DialOption {
}
// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
-func WithDialer(f func(addr string, timeout time.Duration) (net.Conn, error)) DialOption {
+func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
return func(o *dialOptions) {
- o.copts.Dialer = f
+ o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
+ if deadline, ok := ctx.Deadline(); ok {
+ return f(addr, deadline.Sub(time.Now()))
+ }
+ return f(addr, 0)
+ }
}
}
@@ -209,12 +215,34 @@ func WithUserAgent(s string) DialOption {
}
}
-// Dial creates a client connection the given target.
+// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
+ return DialContext(context.Background(), target, opts...)
+}
+
+// DialContext creates a client connection to the given target. ctx can be used to
+// cancel or expire the pending connecting. Once this function returns, the
+// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
+// to terminate all the pending operations after this function returns.
+// This is the EXPERIMENTAL API.
+func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
conns: make(map[Address]*addrConn),
}
+ cc.ctx, cc.cancel = context.WithCancel(context.Background())
+ defer func() {
+ select {
+ case <-ctx.Done():
+ conn, err = nil, ctx.Err()
+ default:
+ }
+
+ if err != nil {
+ cc.Close()
+ }
+ }()
+
for _, opt := range opts {
opt(&cc.dopts)
}
@@ -226,31 +254,33 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
if cc.dopts.bs == nil {
cc.dopts.bs = DefaultBackoffConfig
}
- if cc.dopts.balancer == nil {
- cc.dopts.balancer = RoundRobin(nil)
- }
- if err := cc.dopts.balancer.Start(target); err != nil {
- return nil, err
- }
var (
ok bool
addrs []Address
)
- ch := cc.dopts.balancer.Notify()
- if ch == nil {
- // There is no name resolver installed.
+ if cc.dopts.balancer == nil {
+ // Connect to target directly if balancer is nil.
addrs = append(addrs, Address{Addr: target})
} else {
- addrs, ok = <-ch
- if !ok || len(addrs) == 0 {
- return nil, errNoAddr
+ if err := cc.dopts.balancer.Start(target); err != nil {
+ return nil, err
+ }
+ ch := cc.dopts.balancer.Notify()
+ if ch == nil {
+ // There is no name resolver installed.
+ addrs = append(addrs, Address{Addr: target})
+ } else {
+ addrs, ok = <-ch
+ if !ok || len(addrs) == 0 {
+ return nil, errNoAddr
+ }
}
}
waitC := make(chan error, 1)
go func() {
for _, a := range addrs {
- if err := cc.newAddrConn(a, false); err != nil {
+ if err := cc.resetAddrConn(a, false, nil); err != nil {
waitC <- err
return
}
@@ -262,15 +292,17 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
timeoutCh = time.After(cc.dopts.timeout)
}
select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
case err := <-waitC:
if err != nil {
- cc.Close()
return nil, err
}
case <-timeoutCh:
- cc.Close()
return nil, ErrClientConnTimeout
}
+ // If balancer is nil or balancer.Notify() is nil, ok will be false here.
+ // The lbWatcher goroutine will not be created.
if ok {
go cc.lbWatcher()
}
@@ -317,6 +349,9 @@ func (s ConnectivityState) String() string {
// ClientConn represents a client connection to an RPC server.
type ClientConn struct {
+ ctx context.Context
+ cancel context.CancelFunc
+
target string
authority string
dopts dialOptions
@@ -347,11 +382,12 @@ func (cc *ClientConn) lbWatcher() {
}
if !keep {
del = append(del, c)
+ delete(cc.conns, c.addr)
}
}
cc.mu.Unlock()
for _, a := range add {
- cc.newAddrConn(a, true)
+ cc.resetAddrConn(a, true, nil)
}
for _, c := range del {
c.tearDown(errConnDrain)
@@ -359,13 +395,17 @@ func (cc *ClientConn) lbWatcher() {
}
}
-func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
+// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
+// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
+// If tearDownErr is nil, errConnDrain will be used instead.
+func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
ac := &addrConn{
- cc: cc,
- addr: addr,
- dopts: cc.dopts,
- shutdownChan: make(chan struct{}),
+ cc: cc,
+ addr: addr,
+ dopts: cc.dopts,
}
+ ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
+ ac.stateCV = sync.NewCond(&ac.mu)
if EnableTracing {
ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
}
@@ -383,26 +423,44 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
}
}
}
- // Insert ac into ac.cc.conns. This needs to be done before any getTransport(...) is called.
- ac.cc.mu.Lock()
- if ac.cc.conns == nil {
- ac.cc.mu.Unlock()
+ // Track ac in cc. This needs to be done before any getTransport(...) is called.
+ cc.mu.Lock()
+ if cc.conns == nil {
+ cc.mu.Unlock()
return ErrClientConnClosing
}
- stale := ac.cc.conns[ac.addr]
- ac.cc.conns[ac.addr] = ac
- ac.cc.mu.Unlock()
+ stale := cc.conns[ac.addr]
+ cc.conns[ac.addr] = ac
+ cc.mu.Unlock()
if stale != nil {
// There is an addrConn alive on ac.addr already. This could be due to
- // i) stale's Close is undergoing;
- // ii) a buggy Balancer notifies duplicated Addresses.
- stale.tearDown(errConnDrain)
+ // 1) a buggy Balancer notifies duplicated Addresses;
+ // 2) goaway was received, a new ac will replace the old ac.
+ // The old ac should be deleted from cc.conns, but the
+ // underlying transport should drain rather than close.
+ if tearDownErr == nil {
+ // tearDownErr is nil if resetAddrConn is called by
+ // 1) Dial
+ // 2) lbWatcher
+ // In both cases, the stale ac should drain, not close.
+ stale.tearDown(errConnDrain)
+ } else {
+ stale.tearDown(tearDownErr)
+ }
}
- ac.stateCV = sync.NewCond(&ac.mu)
// skipWait may overwrite the decision in ac.dopts.block.
if ac.dopts.block && !skipWait {
if err := ac.resetTransport(false); err != nil {
- ac.tearDown(err)
+ if err != errConnClosing {
+ // Tear down ac and delete it from cc.conns.
+ cc.mu.Lock()
+ delete(cc.conns, ac.addr)
+ cc.mu.Unlock()
+ ac.tearDown(err)
+ }
+ if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
+ return e.Origin()
+ }
return err
}
// Start to monitor the error status of transport.
@@ -412,7 +470,10 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
go func() {
if err := ac.resetTransport(false); err != nil {
grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
- ac.tearDown(err)
+ if err != errConnClosing {
+ // Keep this ac in cc.conns, to get the reason it's torn down.
+ ac.tearDown(err)
+ }
return
}
ac.transportMonitor()
@@ -422,24 +483,48 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
}
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
- addr, put, err := cc.dopts.balancer.Get(ctx, opts)
- if err != nil {
- return nil, nil, toRPCErr(err)
- }
- cc.mu.RLock()
- if cc.conns == nil {
+ var (
+ ac *addrConn
+ ok bool
+ put func()
+ )
+ if cc.dopts.balancer == nil {
+ // If balancer is nil, there should be only one addrConn available.
+ cc.mu.RLock()
+ if cc.conns == nil {
+ cc.mu.RUnlock()
+ return nil, nil, toRPCErr(ErrClientConnClosing)
+ }
+ for _, ac = range cc.conns {
+ // Break after the first iteration to get the first addrConn.
+ ok = true
+ break
+ }
+ cc.mu.RUnlock()
+ } else {
+ var (
+ addr Address
+ err error
+ )
+ addr, put, err = cc.dopts.balancer.Get(ctx, opts)
+ if err != nil {
+ return nil, nil, toRPCErr(err)
+ }
+ cc.mu.RLock()
+ if cc.conns == nil {
+ cc.mu.RUnlock()
+ return nil, nil, toRPCErr(ErrClientConnClosing)
+ }
+ ac, ok = cc.conns[addr]
cc.mu.RUnlock()
- return nil, nil, toRPCErr(ErrClientConnClosing)
}
- ac, ok := cc.conns[addr]
- cc.mu.RUnlock()
if !ok {
if put != nil {
put()
}
- return nil, nil, Errorf(codes.Internal, "grpc: failed to find the transport to send the rpc")
+ return nil, nil, errConnClosing
}
- t, err := ac.wait(ctx, !opts.BlockingWait)
+ t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
if err != nil {
if put != nil {
put()
@@ -451,6 +536,8 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
// Close tears down the ClientConn and all underlying connections.
func (cc *ClientConn) Close() error {
+ cc.cancel()
+
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
@@ -459,7 +546,9 @@ func (cc *ClientConn) Close() error {
conns := cc.conns
cc.conns = nil
cc.mu.Unlock()
- cc.dopts.balancer.Close()
+ if cc.dopts.balancer != nil {
+ cc.dopts.balancer.Close()
+ }
for _, ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
@@ -468,11 +557,13 @@ func (cc *ClientConn) Close() error {
// addrConn is a network connection to a given address.
type addrConn struct {
- cc *ClientConn
- addr Address
- dopts dialOptions
- shutdownChan chan struct{}
- events trace.EventLog
+ ctx context.Context
+ cancel context.CancelFunc
+
+ cc *ClientConn
+ addr Address
+ dopts dialOptions
+ events trace.EventLog
mu sync.Mutex
state ConnectivityState
@@ -482,6 +573,9 @@ type addrConn struct {
// due to timeout.
ready chan struct{}
transport transport.ClientTransport
+
+ // The reason this addrConn is torn down.
+ tearDownErr error
}
// printf records an event in ac's event log, unless ac has been closed.
@@ -537,8 +631,7 @@ func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState Connecti
}
func (ac *addrConn) resetTransport(closeTransport bool) error {
- var retries int
- for {
+ for retries := 0; ; retries++ {
ac.mu.Lock()
ac.printf("connecting")
if ac.state == Shutdown {
@@ -558,13 +651,20 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
t.Close()
}
sleepTime := ac.dopts.bs.backoff(retries)
- ac.dopts.copts.Timeout = sleepTime
- if sleepTime < minConnectTimeout {
- ac.dopts.copts.Timeout = minConnectTimeout
+ timeout := minConnectTimeout
+ if timeout < sleepTime {
+ timeout = sleepTime
}
+ ctx, cancel := context.WithTimeout(ac.ctx, timeout)
connectTime := time.Now()
- newTransport, err := transport.NewClientTransport(ac.addr.Addr, &ac.dopts.copts)
+ newTransport, err := transport.NewClientTransport(ctx, ac.addr.Addr, ac.dopts.copts)
if err != nil {
+ cancel()
+
+ if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
+ return err
+ }
+ grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr)
ac.mu.Lock()
if ac.state == Shutdown {
// ac.tearDown(...) has been invoked.
@@ -579,17 +679,12 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
ac.ready = nil
}
ac.mu.Unlock()
- sleepTime -= time.Since(connectTime)
- if sleepTime < 0 {
- sleepTime = 0
- }
closeTransport = false
select {
- case <-time.After(sleepTime):
- case <-ac.shutdownChan:
+ case <-time.After(sleepTime - time.Since(connectTime)):
+ case <-ac.ctx.Done():
+ return ac.ctx.Err()
}
- retries++
- grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr)
continue
}
ac.mu.Lock()
@@ -607,7 +702,9 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
close(ac.ready)
ac.ready = nil
}
- ac.down = ac.cc.dopts.balancer.Up(ac.addr)
+ if ac.cc.dopts.balancer != nil {
+ ac.down = ac.cc.dopts.balancer.Up(ac.addr)
+ }
ac.mu.Unlock()
return nil
}
@@ -621,14 +718,42 @@ func (ac *addrConn) transportMonitor() {
t := ac.transport
ac.mu.Unlock()
select {
- // shutdownChan is needed to detect the teardown when
+ // This is needed to detect the teardown when
// the addrConn is idle (i.e., no RPC in flight).
- case <-ac.shutdownChan:
+ case <-ac.ctx.Done():
+ select {
+ case <-t.Error():
+ t.Close()
+ default:
+ }
+ return
+ case <-t.GoAway():
+ // If GoAway happens without any network I/O error, ac is closed without shutting down the
+ // underlying transport (the transport will be closed when all the pending RPCs finished or
+ // failed.).
+ // If GoAway and some network I/O error happen concurrently, ac and its underlying transport
+ // are closed.
+ // In both cases, a new ac is created.
+ select {
+ case <-t.Error():
+ ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
+ default:
+ ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
+ }
return
case <-t.Error():
+ select {
+ case <-ac.ctx.Done():
+ t.Close()
+ return
+ case <-t.GoAway():
+ ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
+ return
+ default:
+ }
ac.mu.Lock()
if ac.state == Shutdown {
- // ac.tearDown(...) has been invoked.
+ // ac has been shutdown.
ac.mu.Unlock()
return
}
@@ -640,6 +765,10 @@ func (ac *addrConn) transportMonitor() {
ac.printf("transport exiting: %v", err)
ac.mu.Unlock()
grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
+ if err != errConnClosing {
+ // Keep this ac in cc.conns, to get the reason it's torn down.
+ ac.tearDown(err)
+ }
return
}
}
@@ -647,35 +776,42 @@ func (ac *addrConn) transportMonitor() {
}
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
-// iv) transport is in TransientFailure and the RPC is fail-fast.
-func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTransport, error) {
+// iv) transport is in TransientFailure and there's no balancer/failfast is true.
+func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
for {
ac.mu.Lock()
switch {
case ac.state == Shutdown:
+ if failfast || !hasBalancer {
+ // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
+ err := ac.tearDownErr
+ ac.mu.Unlock()
+ return nil, err
+ }
ac.mu.Unlock()
return nil, errConnClosing
case ac.state == Ready:
ct := ac.transport
ac.mu.Unlock()
return ct, nil
- case ac.state == TransientFailure && failFast:
- ac.mu.Unlock()
- return nil, Errorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure")
- default:
- ready := ac.ready
- if ready == nil {
- ready = make(chan struct{})
- ac.ready = ready
- }
- ac.mu.Unlock()
- select {
- case <-ctx.Done():
- return nil, toRPCErr(ctx.Err())
- // Wait until the new transport is ready or failed.
- case <-ready:
+ case ac.state == TransientFailure:
+ if failfast || hasBalancer {
+ ac.mu.Unlock()
+ return nil, errConnUnavailable
}
}
+ ready := ac.ready
+ if ready == nil {
+ ready = make(chan struct{})
+ ac.ready = ready
+ }
+ ac.mu.Unlock()
+ select {
+ case <-ctx.Done():
+ return nil, toRPCErr(ctx.Err())
+ // Wait until the new transport is ready or failed.
+ case <-ready:
+ }
}
}
@@ -683,24 +819,28 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop.
+// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
+ ac.cancel()
+
ac.mu.Lock()
- defer func() {
- ac.mu.Unlock()
- ac.cc.mu.Lock()
- if ac.cc.conns != nil {
- delete(ac.cc.conns, ac.addr)
- }
- ac.cc.mu.Unlock()
- }()
- if ac.state == Shutdown {
- return
- }
- ac.state = Shutdown
+ defer ac.mu.Unlock()
if ac.down != nil {
ac.down(downErrorf(false, false, "%v", err))
ac.down = nil
}
+ if err == errConnDrain && ac.transport != nil {
+ // GracefulClose(...) may be executed multiple times when
+ // i) receiving multiple GoAway frames from the server; or
+ // ii) there are concurrent name resolver/Balancer triggered
+ // address removal and GoAway.
+ ac.transport.GracefulClose()
+ }
+ if ac.state == Shutdown {
+ return
+ }
+ ac.state = Shutdown
+ ac.tearDownErr = err
ac.stateCV.Broadcast()
if ac.events != nil {
ac.events.Finish()
@@ -710,15 +850,8 @@ func (ac *addrConn) tearDown(err error) {
close(ac.ready)
ac.ready = nil
}
- if ac.transport != nil {
- if err == errConnDrain {
- ac.transport.GracefulClose()
- } else {
- ac.transport.Close()
- }
- }
- if ac.shutdownChan != nil {
- close(ac.shutdownChan)
+ if ac.transport != nil && err != errConnDrain {
+ ac.transport.Close()
}
return
}
diff --git a/vendor/google.golang.org/grpc/credentials/credentials.go b/vendor/google.golang.org/grpc/credentials/credentials.go
index 8d4c57c..13be457 100644
--- a/vendor/google.golang.org/grpc/credentials/credentials.go
+++ b/vendor/google.golang.org/grpc/credentials/credentials.go
@@ -40,11 +40,11 @@ package credentials // import "google.golang.org/grpc/credentials"
import (
"crypto/tls"
"crypto/x509"
+ "errors"
"fmt"
"io/ioutil"
"net"
"strings"
- "time"
"golang.org/x/net/context"
)
@@ -87,17 +87,24 @@ type AuthInfo interface {
AuthType() string
}
+var (
+ // ErrConnDispatched indicates that rawConn has been dispatched out of gRPC
+ // and the caller should not close rawConn.
+ ErrConnDispatched = errors.New("credentials: rawConn is dispatched out of gRPC")
+)
+
// TransportCredentials defines the common interface for all the live gRPC wire
// protocols and supported transport security protocols (e.g., TLS, SSL).
type TransportCredentials interface {
// ClientHandshake does the authentication handshake specified by the corresponding
// authentication protocol on rawConn for clients. It returns the authenticated
// connection and the corresponding auth information about the connection.
- ClientHandshake(addr string, rawConn net.Conn, timeout time.Duration) (net.Conn, AuthInfo, error)
+ // Implementations must use the provided context to implement timely cancellation.
+ ClientHandshake(context.Context, string, net.Conn) (net.Conn, AuthInfo, error)
// ServerHandshake does the authentication handshake for servers. It returns
// the authenticated connection and the corresponding auth information about
// the connection.
- ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error)
+ ServerHandshake(net.Conn) (net.Conn, AuthInfo, error)
// Info provides the ProtocolInfo of this TransportCredentials.
Info() ProtocolInfo
}
@@ -136,42 +143,28 @@ func (c *tlsCreds) RequireTransportSecurity() bool {
return true
}
-type timeoutError struct{}
-
-func (timeoutError) Error() string { return "credentials: Dial timed out" }
-func (timeoutError) Timeout() bool { return true }
-func (timeoutError) Temporary() bool { return true }
-
-func (c *tlsCreds) ClientHandshake(addr string, rawConn net.Conn, timeout time.Duration) (_ net.Conn, _ AuthInfo, err error) {
- // borrow some code from tls.DialWithDialer
- var errChannel chan error
- if timeout != 0 {
- errChannel = make(chan error, 2)
- time.AfterFunc(timeout, func() {
- errChannel <- timeoutError{}
- })
- }
+func (c *tlsCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (_ net.Conn, _ AuthInfo, err error) {
// use local cfg to avoid clobbering ServerName if using multiple endpoints
- cfg := *c.config
- if c.config.ServerName == "" {
+ cfg := cloneTLSConfig(c.config)
+ if cfg.ServerName == "" {
colonPos := strings.LastIndex(addr, ":")
if colonPos == -1 {
colonPos = len(addr)
}
cfg.ServerName = addr[:colonPos]
}
- conn := tls.Client(rawConn, &cfg)
- if timeout == 0 {
- err = conn.Handshake()
- } else {
- go func() {
- errChannel <- conn.Handshake()
- }()
- err = <-errChannel
- }
- if err != nil {
- rawConn.Close()
- return nil, nil, err
+ conn := tls.Client(rawConn, cfg)
+ errChannel := make(chan error, 1)
+ go func() {
+ errChannel <- conn.Handshake()
+ }()
+ select {
+ case err := <-errChannel:
+ if err != nil {
+ return nil, nil, err
+ }
+ case <-ctx.Done():
+ return nil, nil, ctx.Err()
}
// TODO(zhaoq): Omit the auth info for client now. It is more for
// information than anything else.
@@ -181,7 +174,6 @@ func (c *tlsCreds) ClientHandshake(addr string, rawConn net.Conn, timeout time.D
func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error) {
conn := tls.Server(rawConn, c.config)
if err := conn.Handshake(); err != nil {
- rawConn.Close()
return nil, nil, err
}
return conn, TLSInfo{conn.ConnectionState()}, nil
@@ -189,7 +181,7 @@ func (c *tlsCreds) ServerHandshake(rawConn net.Conn) (net.Conn, AuthInfo, error)
// NewTLS uses c to construct a TransportCredentials based on TLS.
func NewTLS(c *tls.Config) TransportCredentials {
- tc := &tlsCreds{c}
+ tc := &tlsCreds{cloneTLSConfig(c)}
tc.config.NextProtos = alpnProtoStr
return tc
}
diff --git a/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go b/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go
new file mode 100644
index 0000000..9647b9e
--- /dev/null
+++ b/vendor/google.golang.org/grpc/credentials/credentials_util_go17.go
@@ -0,0 +1,76 @@
+// +build go1.7
+
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package credentials
+
+import (
+ "crypto/tls"
+)
+
+// cloneTLSConfig returns a shallow clone of the exported
+// fields of cfg, ignoring the unexported sync.Once, which
+// contains a mutex and must not be copied.
+//
+// If cfg is nil, a new zero tls.Config is returned.
+//
+// TODO replace this function with official clone function.
+func cloneTLSConfig(cfg *tls.Config) *tls.Config {
+ if cfg == nil {
+ return &tls.Config{}
+ }
+ return &tls.Config{
+ Rand: cfg.Rand,
+ Time: cfg.Time,
+ Certificates: cfg.Certificates,
+ NameToCertificate: cfg.NameToCertificate,
+ GetCertificate: cfg.GetCertificate,
+ RootCAs: cfg.RootCAs,
+ NextProtos: cfg.NextProtos,
+ ServerName: cfg.ServerName,
+ ClientAuth: cfg.ClientAuth,
+ ClientCAs: cfg.ClientCAs,
+ InsecureSkipVerify: cfg.InsecureSkipVerify,
+ CipherSuites: cfg.CipherSuites,
+ PreferServerCipherSuites: cfg.PreferServerCipherSuites,
+ SessionTicketsDisabled: cfg.SessionTicketsDisabled,
+ SessionTicketKey: cfg.SessionTicketKey,
+ ClientSessionCache: cfg.ClientSessionCache,
+ MinVersion: cfg.MinVersion,
+ MaxVersion: cfg.MaxVersion,
+ CurvePreferences: cfg.CurvePreferences,
+ DynamicRecordSizingDisabled: cfg.DynamicRecordSizingDisabled,
+ Renegotiation: cfg.Renegotiation,
+ }
+}
diff --git a/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go b/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go
new file mode 100644
index 0000000..09b8d12
--- /dev/null
+++ b/vendor/google.golang.org/grpc/credentials/credentials_util_pre_go17.go
@@ -0,0 +1,74 @@
+// +build !go1.7
+
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package credentials
+
+import (
+ "crypto/tls"
+)
+
+// cloneTLSConfig returns a shallow clone of the exported
+// fields of cfg, ignoring the unexported sync.Once, which
+// contains a mutex and must not be copied.
+//
+// If cfg is nil, a new zero tls.Config is returned.
+//
+// TODO replace this function with official clone function.
+func cloneTLSConfig(cfg *tls.Config) *tls.Config {
+ if cfg == nil {
+ return &tls.Config{}
+ }
+ return &tls.Config{
+ Rand: cfg.Rand,
+ Time: cfg.Time,
+ Certificates: cfg.Certificates,
+ NameToCertificate: cfg.NameToCertificate,
+ GetCertificate: cfg.GetCertificate,
+ RootCAs: cfg.RootCAs,
+ NextProtos: cfg.NextProtos,
+ ServerName: cfg.ServerName,
+ ClientAuth: cfg.ClientAuth,
+ ClientCAs: cfg.ClientCAs,
+ InsecureSkipVerify: cfg.InsecureSkipVerify,
+ CipherSuites: cfg.CipherSuites,
+ PreferServerCipherSuites: cfg.PreferServerCipherSuites,
+ SessionTicketsDisabled: cfg.SessionTicketsDisabled,
+ SessionTicketKey: cfg.SessionTicketKey,
+ ClientSessionCache: cfg.ClientSessionCache,
+ MinVersion: cfg.MinVersion,
+ MaxVersion: cfg.MaxVersion,
+ CurvePreferences: cfg.CurvePreferences,
+ }
+}
diff --git a/vendor/google.golang.org/grpc/metadata/metadata.go b/vendor/google.golang.org/grpc/metadata/metadata.go
index 52070db..954c0f7 100644
--- a/vendor/google.golang.org/grpc/metadata/metadata.go
+++ b/vendor/google.golang.org/grpc/metadata/metadata.go
@@ -60,15 +60,21 @@ func encodeKeyValue(k, v string) (string, string) {
// DecodeKeyValue returns the original key and value corresponding to the
// encoded data in k, v.
+// If k is a binary header and v contains comma, v is split on comma before decoded,
+// and the decoded v will be joined with comma before returned.
func DecodeKeyValue(k, v string) (string, string, error) {
if !strings.HasSuffix(k, binHdrSuffix) {
return k, v, nil
}
- val, err := base64.StdEncoding.DecodeString(v)
- if err != nil {
- return "", "", err
+ vvs := strings.Split(v, ",")
+ for i, vv := range vvs {
+ val, err := base64.StdEncoding.DecodeString(vv)
+ if err != nil {
+ return "", "", err
+ }
+ vvs[i] = string(val)
}
- return k, string(val), nil
+ return k, strings.Join(vvs, ","), nil
}
// MD is a mapping from metadata keys to values. Users should use the following
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
index d628717..35ac9cc 100644
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -227,7 +227,7 @@ type parser struct {
// No other error values or types must be returned, which also means
// that the underlying io.Reader must not return an incompatible
// error.
-func (p *parser) recvMsg() (pf payloadFormat, msg []byte, err error) {
+func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err error) {
if _, err := io.ReadFull(p.r, p.header[:]); err != nil {
return 0, nil, err
}
@@ -238,6 +238,9 @@ func (p *parser) recvMsg() (pf payloadFormat, msg []byte, err error) {
if length == 0 {
return pf, nil, nil
}
+ if length > uint32(maxMsgSize) {
+ return 0, nil, Errorf(codes.Internal, "grpc: received message length %d exceeding the max size %d", length, maxMsgSize)
+ }
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message:
msg = make([]byte, int(length))
@@ -308,8 +311,8 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er
return nil
}
-func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}) error {
- pf, d, err := p.recvMsg()
+func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int) error {
+ pf, d, err := p.recvMsg(maxMsgSize)
if err != nil {
return err
}
@@ -319,11 +322,16 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
if pf == compressionMade {
d, err = dc.Do(bytes.NewReader(d))
if err != nil {
- return transport.StreamErrorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
+ return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
}
+ if len(d) > maxMsgSize {
+ // TODO: Revisit the error code. Currently keep it consistent with java
+ // implementation.
+ return Errorf(codes.Internal, "grpc: received a message of %d bytes exceeding %d limit", len(d), maxMsgSize)
+ }
if err := c.Unmarshal(d, m); err != nil {
- return transport.StreamErrorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
+ return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
}
return nil
}
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index a2b2b94..b2a825a 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -89,9 +89,13 @@ type service struct {
type Server struct {
opts options
- mu sync.Mutex // guards following
- lis map[net.Listener]bool
- conns map[io.Closer]bool
+ mu sync.Mutex // guards following
+ lis map[net.Listener]bool
+ conns map[io.Closer]bool
+ drain bool
+ // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
+ // and all the transport goes away.
+ cv *sync.Cond
m map[string]*service // service name -> service info
events trace.EventLog
}
@@ -101,12 +105,15 @@ type options struct {
codec Codec
cp Compressor
dc Decompressor
+ maxMsgSize int
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
maxConcurrentStreams uint32
useHandlerImpl bool // use http.Handler-based server
}
+var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
+
// A ServerOption sets options.
type ServerOption func(*options)
@@ -117,20 +124,28 @@ func CustomCodec(codec Codec) ServerOption {
}
}
-// RPCCompressor returns a ServerOption that sets a compressor for outbound message.
+// RPCCompressor returns a ServerOption that sets a compressor for outbound messages.
func RPCCompressor(cp Compressor) ServerOption {
return func(o *options) {
o.cp = cp
}
}
-// RPCDecompressor returns a ServerOption that sets a decompressor for inbound message.
+// RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages.
func RPCDecompressor(dc Decompressor) ServerOption {
return func(o *options) {
o.dc = dc
}
}
+// MaxMsgSize returns a ServerOption to set the max message size in bytes for inbound mesages.
+// If this is not set, gRPC uses the default 4MB.
+func MaxMsgSize(m int) ServerOption {
+ return func(o *options) {
+ o.maxMsgSize = m
+ }
+}
+
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
// of concurrent streams to each ServerTransport.
func MaxConcurrentStreams(n uint32) ServerOption {
@@ -173,6 +188,7 @@ func StreamInterceptor(i StreamServerInterceptor) ServerOption {
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
var opts options
+ opts.maxMsgSize = defaultMaxMsgSize
for _, o := range opt {
o(&opts)
}
@@ -186,6 +202,7 @@ func NewServer(opt ...ServerOption) *Server {
conns: make(map[io.Closer]bool),
m: make(map[string]*service),
}
+ s.cv = sync.NewCond(&s.mu)
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
@@ -264,8 +281,8 @@ type ServiceInfo struct {
// GetServiceInfo returns a map from service names to ServiceInfo.
// Service names include the package names, in the form of <package>.<service>.
-func (s *Server) GetServiceInfo() map[string]*ServiceInfo {
- ret := make(map[string]*ServiceInfo)
+func (s *Server) GetServiceInfo() map[string]ServiceInfo {
+ ret := make(map[string]ServiceInfo)
for n, srv := range s.m {
methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
for m := range srv.md {
@@ -283,7 +300,7 @@ func (s *Server) GetServiceInfo() map[string]*ServiceInfo {
})
}
- ret[n] = &ServiceInfo{
+ ret[n] = ServiceInfo{
Methods: methods,
Metadata: srv.mdata,
}
@@ -350,7 +367,10 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
- rawConn.Close()
+ // If serverHandShake returns ErrConnDispatched, keep rawConn open.
+ if err != credentials.ErrConnDispatched {
+ rawConn.Close()
+ }
return
}
@@ -468,7 +488,7 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
func (s *Server) addConn(c io.Closer) bool {
s.mu.Lock()
defer s.mu.Unlock()
- if s.conns == nil {
+ if s.conns == nil || s.drain {
return false
}
s.conns[c] = true
@@ -480,6 +500,7 @@ func (s *Server) removeConn(c io.Closer) {
defer s.mu.Unlock()
if s.conns != nil {
delete(s.conns, c)
+ s.cv.Signal()
}
}
@@ -520,7 +541,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
p := &parser{r: stream}
for {
- pf, req, err := p.recvMsg()
+ pf, req, err := p.recvMsg(s.opts.maxMsgSize)
if err == io.EOF {
// The entire stream is done (for unary RPC only).
return err
@@ -530,6 +551,10 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
if err != nil {
switch err := err.(type) {
+ case *rpcError:
+ if err := t.WriteStatus(stream, err.code, err.desc); err != nil {
+ grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", err)
+ }
case transport.ConnectionError:
// Nothing to do here.
case transport.StreamError:
@@ -569,6 +594,12 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
return err
}
}
+ if len(req) > s.opts.maxMsgSize {
+ // TODO: Revisit the error code. Currently keep it consistent with
+ // java implementation.
+ statusCode = codes.Internal
+ statusDesc = fmt.Sprintf("grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
+ }
if err := s.opts.codec.Unmarshal(req, v); err != nil {
return err
}
@@ -628,13 +659,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
stream.SetSendCompress(s.opts.cp.Type())
}
ss := &serverStream{
- t: t,
- s: stream,
- p: &parser{r: stream},
- codec: s.opts.codec,
- cp: s.opts.cp,
- dc: s.opts.dc,
- trInfo: trInfo,
+ t: t,
+ s: stream,
+ p: &parser{r: stream},
+ codec: s.opts.codec,
+ cp: s.opts.cp,
+ dc: s.opts.dc,
+ maxMsgSize: s.opts.maxMsgSize,
+ trInfo: trInfo,
}
if ss.cp != nil {
ss.cbuf = new(bytes.Buffer)
@@ -766,14 +798,16 @@ func (s *Server) Stop() {
s.mu.Lock()
listeners := s.lis
s.lis = nil
- cs := s.conns
+ st := s.conns
s.conns = nil
+ // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
+ s.cv.Signal()
s.mu.Unlock()
for lis := range listeners {
lis.Close()
}
- for c := range cs {
+ for c := range st {
c.Close()
}
@@ -785,6 +819,32 @@ func (s *Server) Stop() {
s.mu.Unlock()
}
+// GracefulStop stops the gRPC server gracefully. It stops the server to accept new
+// connections and RPCs and blocks until all the pending RPCs are finished.
+func (s *Server) GracefulStop() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.drain == true || s.conns == nil {
+ return
+ }
+ s.drain = true
+ for lis := range s.lis {
+ lis.Close()
+ }
+ s.lis = nil
+ for c := range s.conns {
+ c.(transport.ServerTransport).Drain()
+ }
+ for len(s.conns) != 0 {
+ s.cv.Wait()
+ }
+ s.conns = nil
+ if s.events != nil {
+ s.events.Finish()
+ s.events = nil
+ }
+}
+
func init() {
internal.TestingCloseConns = func(arg interface{}) {
arg.(*Server).testingCloseConns()
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 7a3bef5..51df3f0 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -37,6 +37,7 @@ import (
"bytes"
"errors"
"io"
+ "math"
"sync"
"time"
@@ -84,12 +85,9 @@ type ClientStream interface {
// Header returns the header metadata received from the server if there
// is any. It blocks if the metadata is not ready to read.
Header() (metadata.MD, error)
- // Trailer returns the trailer metadata from the server. It must be called
- // after stream.Recv() returns non-nil error (including io.EOF) for
- // bi-directional streaming and server streaming or stream.CloseAndRecv()
- // returns for client streaming in order to receive trailer metadata if
- // present. Otherwise, it could returns an empty MD even though trailer
- // is present.
+ // Trailer returns the trailer metadata from the server, if there is any.
+ // It must only be called after stream.CloseAndRecv has returned, or
+ // stream.Recv has returned a non-nil error (including io.EOF).
Trailer() metadata.MD
// CloseSend closes the send direction of the stream. It closes the stream
// when non-nil error is met.
@@ -99,11 +97,10 @@ type ClientStream interface {
// NewClientStream creates a new Stream for the client side. This is called
// by generated code.
-func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
+func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
var (
t transport.ClientTransport
s *transport.Stream
- err error
put func()
)
c := defaultCallInfo
@@ -120,27 +117,24 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
}
- cs := &clientStream{
- opts: opts,
- c: c,
- desc: desc,
- codec: cc.dopts.codec,
- cp: cc.dopts.cp,
- dc: cc.dopts.dc,
- tracing: EnableTracing,
- }
- if cc.dopts.cp != nil {
- callHdr.SendCompress = cc.dopts.cp.Type()
- cs.cbuf = new(bytes.Buffer)
- }
- if cs.tracing {
- cs.trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
- cs.trInfo.firstLine.client = true
+ var trInfo traceInfo
+ if EnableTracing {
+ trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
+ trInfo.firstLine.client = true
if deadline, ok := ctx.Deadline(); ok {
- cs.trInfo.firstLine.deadline = deadline.Sub(time.Now())
+ trInfo.firstLine.deadline = deadline.Sub(time.Now())
}
- cs.trInfo.tr.LazyLog(&cs.trInfo.firstLine, false)
- ctx = trace.NewContext(ctx, cs.trInfo.tr)
+ trInfo.tr.LazyLog(&trInfo.firstLine, false)
+ ctx = trace.NewContext(ctx, trInfo.tr)
+ defer func() {
+ if err != nil {
+ // Need to call tr.finish() if error is returned.
+ // Because tr will not be returned to caller.
+ trInfo.tr.LazyPrintf("RPC: [%v]", err)
+ trInfo.tr.SetError()
+ trInfo.tr.Finish()
+ }
+ }()
}
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
@@ -152,9 +146,9 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
if _, ok := err.(*rpcError); ok {
return nil, err
}
- if err == errConnClosing {
+ if err == errConnClosing || err == errConnUnavailable {
if c.failFast {
- return nil, Errorf(codes.Unavailable, "%v", errConnClosing)
+ return nil, Errorf(codes.Unavailable, "%v", err)
}
continue
}
@@ -168,9 +162,8 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
put()
put = nil
}
- if _, ok := err.(transport.ConnectionError); ok {
+ if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
- cs.finish(err)
return nil, toRPCErr(err)
}
continue
@@ -179,16 +172,43 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
break
}
- cs.put = put
- cs.t = t
- cs.s = s
- cs.p = &parser{r: s}
- // Listen on ctx.Done() to detect cancellation when there is no pending
- // I/O operations on this stream.
+ cs := &clientStream{
+ opts: opts,
+ c: c,
+ desc: desc,
+ codec: cc.dopts.codec,
+ cp: cc.dopts.cp,
+ dc: cc.dopts.dc,
+
+ put: put,
+ t: t,
+ s: s,
+ p: &parser{r: s},
+
+ tracing: EnableTracing,
+ trInfo: trInfo,
+ }
+ if cc.dopts.cp != nil {
+ cs.cbuf = new(bytes.Buffer)
+ }
+ // Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination
+ // when there is no pending I/O operations on this stream.
go func() {
select {
case <-t.Error():
// Incur transport error, simply exit.
+ case <-s.Done():
+ // TODO: The trace of the RPC is terminated here when there is no pending
+ // I/O, which is probably not the optimal solution.
+ if s.StatusCode() == codes.OK {
+ cs.finish(nil)
+ } else {
+ cs.finish(Errorf(s.StatusCode(), "%s", s.StatusDesc()))
+ }
+ cs.closeTransportStream(nil)
+ case <-s.GoAway():
+ cs.finish(errConnDrain)
+ cs.closeTransportStream(errConnDrain)
case <-s.Context().Done():
err := s.Context().Err()
cs.finish(err)
@@ -251,7 +271,17 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if err != nil {
cs.finish(err)
}
- if err == nil || err == io.EOF {
+ if err == nil {
+ return
+ }
+ if err == io.EOF {
+ // Specialize the process for server streaming. SendMesg is only called
+ // once when creating the stream object. io.EOF needs to be skipped when
+ // the rpc is early finished (before the stream object is created.).
+ // TODO: It is probably better to move this into the generated code.
+ if !cs.desc.ClientStreams && cs.desc.ServerStreams {
+ err = nil
+ }
return
}
if _, ok := err.(transport.ConnectionError); !ok {
@@ -272,7 +302,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
func (cs *clientStream) RecvMsg(m interface{}) (err error) {
- err = recv(cs.p, cs.codec, cs.s, cs.dc, m)
+ err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
defer func() {
// err != nil indicates the termination of the stream.
if err != nil {
@@ -291,7 +321,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
return
}
// Special handling for client streaming rpc.
- err = recv(cs.p, cs.codec, cs.s, cs.dc, m)
+ err = recv(cs.p, cs.codec, cs.s, cs.dc, m, math.MaxInt32)
cs.closeTransportStream(err)
if err == nil {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
@@ -326,7 +356,7 @@ func (cs *clientStream) CloseSend() (err error) {
}
}()
if err == nil || err == io.EOF {
- return
+ return nil
}
if _, ok := err.(transport.ConnectionError); !ok {
cs.closeTransportStream(err)
@@ -392,6 +422,7 @@ type serverStream struct {
cp Compressor
dc Decompressor
cbuf *bytes.Buffer
+ maxMsgSize int
statusCode codes.Code
statusDesc string
trInfo *traceInfo
@@ -458,5 +489,5 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
ss.mu.Unlock()
}
}()
- return recv(ss.p, ss.codec, ss.s, ss.dc, m)
+ return recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxMsgSize)
}
diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/control.go
index 7e9bdf3..4ef0830 100644
--- a/vendor/google.golang.org/grpc/transport/control.go
+++ b/vendor/google.golang.org/grpc/transport/control.go
@@ -72,6 +72,11 @@ type resetStream struct {
func (*resetStream) item() {}
+type goAway struct {
+}
+
+func (*goAway) item() {}
+
type flushIO struct {
}
diff --git a/vendor/google.golang.org/grpc/transport/go16.go b/vendor/google.golang.org/grpc/transport/go16.go
new file mode 100644
index 0000000..ee1c46b
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/go16.go
@@ -0,0 +1,46 @@
+// +build go1.6,!go1.7
+
+/*
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package transport
+
+import (
+ "net"
+
+ "golang.org/x/net/context"
+)
+
+// dialContext connects to the address on the named network.
+func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
+ return (&net.Dialer{Cancel: ctx.Done()}).Dial(network, address)
+}
diff --git a/vendor/google.golang.org/grpc/transport/go17.go b/vendor/google.golang.org/grpc/transport/go17.go
new file mode 100644
index 0000000..356f13f
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/go17.go
@@ -0,0 +1,46 @@
+// +build go1.7
+
+/*
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package transport
+
+import (
+ "net"
+
+ "golang.org/x/net/context"
+)
+
+// dialContext connects to the address on the named network.
+func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
+ return (&net.Dialer{}).DialContext(ctx, network, address)
+}
diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/transport/handler_server.go
index 4b0d525..f23b2da 100644
--- a/vendor/google.golang.org/grpc/transport/handler_server.go
+++ b/vendor/google.golang.org/grpc/transport/handler_server.go
@@ -83,7 +83,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTr
}
if v := r.Header.Get("grpc-timeout"); v != "" {
- to, err := timeoutDecode(v)
+ to, err := decodeTimeout(v)
if err != nil {
return nil, StreamErrorf(codes.Internal, "malformed time-out: %v", err)
}
@@ -194,7 +194,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, statusCode codes.Code,
h := ht.rw.Header()
h.Set("Grpc-Status", fmt.Sprintf("%d", statusCode))
if statusDesc != "" {
- h.Set("Grpc-Message", statusDesc)
+ h.Set("Grpc-Message", encodeGrpcMessage(statusDesc))
}
if md := s.Trailer(); len(md) > 0 {
for k, vv := range md {
@@ -370,6 +370,10 @@ func (ht *serverHandlerTransport) runStream() {
}
}
+func (ht *serverHandlerTransport) Drain() {
+ panic("Drain() is not implemented")
+}
+
// mapRecvMsgError returns the non-nil err into the appropriate
// error value as expected by callers of *grpc.parser.recvMsg.
// In particular, in can only be:
@@ -389,5 +393,5 @@ func mapRecvMsgError(err error) error {
}
}
}
- return ConnectionError{Desc: err.Error()}
+ return ConnectionErrorf(true, err, err.Error())
}
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go
index f66435f..afbba45 100644
--- a/vendor/google.golang.org/grpc/transport/http2_client.go
+++ b/vendor/google.golang.org/grpc/transport/http2_client.go
@@ -35,6 +35,7 @@ package transport
import (
"bytes"
+ "fmt"
"io"
"math"
"net"
@@ -71,6 +72,9 @@ type http2Client struct {
shutdownChan chan struct{}
// errorChan is closed to notify the I/O error to the caller.
errorChan chan struct{}
+ // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
+ // that the server sent GoAway on this transport.
+ goAway chan struct{}
framer *framer
hBuf *bytes.Buffer // the buffer for HPACK encoding
@@ -97,41 +101,73 @@ type http2Client struct {
maxStreams int
// the per-stream outbound flow control window size set by the peer.
streamSendQuota uint32
+ // goAwayID records the Last-Stream-ID in the GoAway frame from the server.
+ goAwayID uint32
+ // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
+ prevGoAwayID uint32
+}
+
+func dial(fn func(context.Context, string) (net.Conn, error), ctx context.Context, addr string) (net.Conn, error) {
+ if fn != nil {
+ return fn(ctx, addr)
+ }
+ return dialContext(ctx, "tcp", addr)
+}
+
+func isTemporary(err error) bool {
+ switch err {
+ case io.EOF:
+ // Connection closures may be resolved upon retry, and are thus
+ // treated as temporary.
+ return true
+ case context.DeadlineExceeded:
+ // In Go 1.7, context.DeadlineExceeded implements Timeout(), and this
+ // special case is not needed. Until then, we need to keep this
+ // clause.
+ return true
+ }
+
+ switch err := err.(type) {
+ case interface {
+ Temporary() bool
+ }:
+ return err.Temporary()
+ case interface {
+ Timeout() bool
+ }:
+ // Timeouts may be resolved upon retry, and are thus treated as
+ // temporary.
+ return err.Timeout()
+ }
+ return false
}
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
-func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err error) {
- if opts.Dialer == nil {
- // Set the default Dialer.
- opts.Dialer = func(addr string, timeout time.Duration) (net.Conn, error) {
- return net.DialTimeout("tcp", addr, timeout)
- }
- }
+func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ ClientTransport, err error) {
scheme := "http"
- startT := time.Now()
- timeout := opts.Timeout
- conn, connErr := opts.Dialer(addr, timeout)
- if connErr != nil {
- return nil, ConnectionErrorf("transport: %v", connErr)
+ conn, err := dial(opts.Dialer, ctx, addr)
+ if err != nil {
+ return nil, ConnectionErrorf(true, err, "transport: %v", err)
}
+ // Any further errors will close the underlying connection
+ defer func(conn net.Conn) {
+ if err != nil {
+ conn.Close()
+ }
+ }(conn)
var authInfo credentials.AuthInfo
- if opts.TransportCredentials != nil {
+ if creds := opts.TransportCredentials; creds != nil {
scheme = "https"
- if timeout > 0 {
- timeout -= time.Since(startT)
- }
- conn, authInfo, connErr = opts.TransportCredentials.ClientHandshake(addr, conn, timeout)
- }
- if connErr != nil {
- return nil, ConnectionErrorf("transport: %v", connErr)
- }
- defer func() {
+ conn, authInfo, err = creds.ClientHandshake(ctx, addr, conn)
if err != nil {
- conn.Close()
+ // Credentials handshake errors are typically considered permanent
+ // to avoid retrying on e.g. bad certificates.
+ temp := isTemporary(err)
+ return nil, ConnectionErrorf(temp, err, "transport: %v", err)
}
- }()
+ }
ua := primaryUA
if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua
@@ -147,6 +183,7 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
writableChan: make(chan int, 1),
shutdownChan: make(chan struct{}),
errorChan: make(chan struct{}),
+ goAway: make(chan struct{}),
framer: newFramer(conn),
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
@@ -168,11 +205,11 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
n, err := t.conn.Write(clientPreface)
if err != nil {
t.Close()
- return nil, ConnectionErrorf("transport: %v", err)
+ return nil, ConnectionErrorf(true, err, "transport: %v", err)
}
if n != len(clientPreface) {
t.Close()
- return nil, ConnectionErrorf("transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
+ return nil, ConnectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
}
if initialWindowSize != defaultWindowSize {
err = t.framer.writeSettings(true, http2.Setting{
@@ -184,13 +221,13 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
}
if err != nil {
t.Close()
- return nil, ConnectionErrorf("transport: %v", err)
+ return nil, ConnectionErrorf(true, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
t.Close()
- return nil, ConnectionErrorf("transport: %v", err)
+ return nil, ConnectionErrorf(true, err, "transport: %v", err)
}
}
go t.controller()
@@ -202,6 +239,8 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
s := &Stream{
id: t.nextID,
+ done: make(chan struct{}),
+ goAway: make(chan struct{}),
method: callHdr.Method,
sendCompress: callHdr.SendCompress,
buf: newRecvBuffer(),
@@ -216,8 +255,9 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
// Make a stream be able to cancel the pending operations by itself.
s.ctx, s.cancel = context.WithCancel(ctx)
s.dec = &recvBufferReader{
- ctx: s.ctx,
- recv: s.buf,
+ ctx: s.ctx,
+ goAway: s.goAway,
+ recv: s.buf,
}
return s
}
@@ -271,6 +311,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.mu.Unlock()
return nil, ErrConnClosing
}
+ if t.state == draining {
+ t.mu.Unlock()
+ return nil, ErrStreamDrain
+ }
if t.state != reachable {
t.mu.Unlock()
return nil, ErrConnClosing
@@ -278,7 +322,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
checkStreamsQuota := t.streamsQuota != nil
t.mu.Unlock()
if checkStreamsQuota {
- sq, err := wait(ctx, t.shutdownChan, t.streamsQuota.acquire())
+ sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
if err != nil {
return nil, err
}
@@ -287,7 +331,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.streamsQuota.add(sq - 1)
}
}
- if _, err := wait(ctx, t.shutdownChan, t.writableChan); err != nil {
+ if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
// Return the quota back now because there is no stream returned to the caller.
if _, ok := err.(StreamError); ok && checkStreamsQuota {
t.streamsQuota.add(1)
@@ -295,6 +339,15 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
return nil, err
}
t.mu.Lock()
+ if t.state == draining {
+ t.mu.Unlock()
+ if checkStreamsQuota {
+ t.streamsQuota.add(1)
+ }
+ // Need to make t writable again so that the rpc in flight can still proceed.
+ t.writableChan <- 0
+ return nil, ErrStreamDrain
+ }
if t.state != reachable {
t.mu.Unlock()
return nil, ErrConnClosing
@@ -329,7 +382,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
}
if timeout > 0 {
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: timeoutEncode(timeout)})
+ t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
}
for k, v := range authData {
// Capital header names are illegal in HTTP/2.
@@ -384,7 +437,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
if err != nil {
t.notifyError(err)
- return nil, ConnectionErrorf("transport: %v", err)
+ return nil, ConnectionErrorf(true, err, "transport: %v", err)
}
}
t.writableChan <- 0
@@ -403,22 +456,17 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
if t.streamsQuota != nil {
updateStreams = true
}
- if t.state == draining && len(t.activeStreams) == 1 {
+ delete(t.activeStreams, s.id)
+ if t.state == draining && len(t.activeStreams) == 0 {
// The transport is draining and s is the last live stream on t.
t.mu.Unlock()
t.Close()
return
}
- delete(t.activeStreams, s.id)
t.mu.Unlock()
if updateStreams {
t.streamsQuota.add(1)
}
- // In case stream sending and receiving are invoked in separate
- // goroutines (e.g., bi-directional streaming), the caller needs
- // to call cancel on the stream to interrupt the blocking on
- // other goroutines.
- s.cancel()
s.mu.Lock()
if q := s.fc.resetPendingData(); q > 0 {
if n := t.fc.onRead(q); n > 0 {
@@ -445,13 +493,13 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
// accessed any more.
func (t *http2Client) Close() (err error) {
t.mu.Lock()
- if t.state == reachable {
- close(t.errorChan)
- }
if t.state == closing {
t.mu.Unlock()
return
}
+ if t.state == reachable || t.state == draining {
+ close(t.errorChan)
+ }
t.state = closing
t.mu.Unlock()
close(t.shutdownChan)
@@ -475,10 +523,35 @@ func (t *http2Client) Close() (err error) {
func (t *http2Client) GracefulClose() error {
t.mu.Lock()
- if t.state == closing {
+ switch t.state {
+ case unreachable:
+ // The server may close the connection concurrently. t is not available for
+ // any streams. Close it now.
+ t.mu.Unlock()
+ t.Close()
+ return nil
+ case closing:
t.mu.Unlock()
return nil
}
+ // Notify the streams which were initiated after the server sent GOAWAY.
+ select {
+ case <-t.goAway:
+ n := t.prevGoAwayID
+ if n == 0 && t.nextID > 1 {
+ n = t.nextID - 2
+ }
+ m := t.goAwayID + 2
+ if m == 2 {
+ m = 1
+ }
+ for i := m; i <= n; i += 2 {
+ if s, ok := t.activeStreams[i]; ok {
+ close(s.goAway)
+ }
+ }
+ default:
+ }
if t.state == draining {
t.mu.Unlock()
return nil
@@ -504,15 +577,15 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
size := http2MaxFrameLen
s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
- sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire())
+ sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
- tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire())
+ tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
- if _, ok := err.(StreamError); ok {
+ if _, ok := err.(StreamError); ok || err == io.EOF {
t.sendQuotaPool.cancel()
}
return err
@@ -544,8 +617,8 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
// Indicate there is a writer who is about to write a data frame.
t.framer.adjustNumWriters(1)
// Got some quota. Try to acquire writing privilege on the transport.
- if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
- if _, ok := err.(StreamError); ok {
+ if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil {
+ if _, ok := err.(StreamError); ok || err == io.EOF {
// Return the connection quota back.
t.sendQuotaPool.add(len(p))
}
@@ -578,7 +651,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
// invoked.
if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
t.notifyError(err)
- return ConnectionErrorf("transport: %v", err)
+ return ConnectionErrorf(true, err, "transport: %v", err)
}
if t.framer.adjustNumWriters(-1) == 0 {
t.framer.flushWrite()
@@ -593,11 +666,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
}
s.mu.Lock()
if s.state != streamDone {
- if s.state == streamReadDone {
- s.state = streamDone
- } else {
- s.state = streamWriteDone
- }
+ s.state = streamWriteDone
}
s.mu.Unlock()
return nil
@@ -630,7 +699,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
func (t *http2Client) handleData(f *http2.DataFrame) {
size := len(f.Data())
if err := t.fc.onData(uint32(size)); err != nil {
- t.notifyError(ConnectionErrorf("%v", err))
+ t.notifyError(ConnectionErrorf(true, err, "%v", err))
return
}
// Select the right stream to dispatch.
@@ -655,6 +724,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
s.state = streamDone
s.statusCode = codes.Internal
s.statusDesc = err.Error()
+ close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
@@ -672,13 +742,14 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
// the read direction is closed, and set the status appropriately.
if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
s.mu.Lock()
- if s.state == streamWriteDone {
- s.state = streamDone
- } else {
- s.state = streamReadDone
+ if s.state == streamDone {
+ s.mu.Unlock()
+ return
}
+ s.state = streamDone
s.statusCode = codes.Internal
s.statusDesc = "server closed the stream without sending trailers"
+ close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
}
@@ -704,6 +775,8 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode)
s.statusCode = codes.Unknown
}
+ s.statusDesc = fmt.Sprintf("stream terminated by RST_STREAM with error code: %d", f.ErrCode)
+ close(s.done)
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
}
@@ -728,7 +801,32 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
}
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
- // TODO(zhaoq): GoAwayFrame handler to be implemented
+ t.mu.Lock()
+ if t.state == reachable || t.state == draining {
+ if f.LastStreamID > 0 && f.LastStreamID%2 != 1 {
+ t.mu.Unlock()
+ t.notifyError(ConnectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
+ return
+ }
+ select {
+ case <-t.goAway:
+ id := t.goAwayID
+ // t.goAway has been closed (i.e.,multiple GoAways).
+ if id < f.LastStreamID {
+ t.mu.Unlock()
+ t.notifyError(ConnectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
+ return
+ }
+ t.prevGoAwayID = id
+ t.goAwayID = f.LastStreamID
+ t.mu.Unlock()
+ return
+ default:
+ }
+ t.goAwayID = f.LastStreamID
+ close(t.goAway)
+ }
+ t.mu.Unlock()
}
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
@@ -780,11 +878,11 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
if len(state.mdata) > 0 {
s.trailer = state.mdata
}
- s.state = streamDone
s.statusCode = state.statusCode
s.statusDesc = state.statusDesc
+ close(s.done)
+ s.state = streamDone
s.mu.Unlock()
-
s.write(recvMsg{err: io.EOF})
}
@@ -937,13 +1035,22 @@ func (t *http2Client) Error() <-chan struct{} {
return t.errorChan
}
+func (t *http2Client) GoAway() <-chan struct{} {
+ return t.goAway
+}
+
func (t *http2Client) notifyError(err error) {
t.mu.Lock()
- defer t.mu.Unlock()
// make sure t.errorChan is closed only once.
+ if t.state == draining {
+ t.mu.Unlock()
+ t.Close()
+ return
+ }
if t.state == reachable {
t.state = unreachable
close(t.errorChan)
grpclog.Printf("transport: http2Client.notifyError got notified that the client transport was broken %v.", err)
}
+ t.mu.Unlock()
}
diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go
index cee1542..16010d5 100644
--- a/vendor/google.golang.org/grpc/transport/http2_server.go
+++ b/vendor/google.golang.org/grpc/transport/http2_server.go
@@ -111,12 +111,12 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
Val: uint32(initialWindowSize)})
}
if err := framer.writeSettings(true, settings...); err != nil {
- return nil, ConnectionErrorf("transport: %v", err)
+ return nil, ConnectionErrorf(true, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
- return nil, ConnectionErrorf("transport: %v", err)
+ return nil, ConnectionErrorf(true, err, "transport: %v", err)
}
}
var buf bytes.Buffer
@@ -142,7 +142,7 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
}
// operateHeader takes action on the decoded headers.
-func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) {
+func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
buf := newRecvBuffer()
s := &Stream{
id: frame.Header().StreamID,
@@ -205,6 +205,13 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
return
}
+ if s.id%2 != 1 || s.id <= t.maxStreamID {
+ t.mu.Unlock()
+ // illegal gRPC stream id.
+ grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", s.id)
+ return true
+ }
+ t.maxStreamID = s.id
s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
t.activeStreams[s.id] = s
t.mu.Unlock()
@@ -212,6 +219,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
t.updateWindow(s, uint32(n))
}
handle(s)
+ return
}
// HandleStreams receives incoming streams using the given handler. This is
@@ -231,6 +239,10 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
}
frame, err := t.framer.readFrame()
+ if err == io.EOF || err == io.ErrUnexpectedEOF {
+ t.Close()
+ return
+ }
if err != nil {
grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
t.Close()
@@ -257,20 +269,20 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
t.controlBuf.put(&resetStream{se.StreamID, se.Code})
continue
}
+ if err == io.EOF || err == io.ErrUnexpectedEOF {
+ t.Close()
+ return
+ }
+ grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
t.Close()
return
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
- id := frame.Header().StreamID
- if id%2 != 1 || id <= t.maxStreamID {
- // illegal gRPC stream id.
- grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", id)
+ if t.operateHeaders(frame, handle) {
t.Close()
break
}
- t.maxStreamID = id
- t.operateHeaders(frame, handle)
case *http2.DataFrame:
t.handleData(frame)
case *http2.RSTStreamFrame:
@@ -282,7 +294,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
case *http2.GoAwayFrame:
- break
+ // TODO: Handle GoAway from the client appropriately.
default:
grpclog.Printf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
}
@@ -364,11 +376,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
// Received the end of stream from the client.
s.mu.Lock()
if s.state != streamDone {
- if s.state == streamWriteDone {
- s.state = streamDone
- } else {
- s.state = streamReadDone
- }
+ s.state = streamReadDone
}
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
@@ -440,7 +448,7 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e
}
if err != nil {
t.Close()
- return ConnectionErrorf("transport: %v", err)
+ return ConnectionErrorf(true, err, "transport: %v", err)
}
}
return nil
@@ -455,7 +463,7 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
}
s.headerOk = true
s.mu.Unlock()
- if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
+ if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
}
t.hBuf.Reset()
@@ -495,7 +503,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
headersSent = true
}
s.mu.Unlock()
- if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
+ if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
}
t.hBuf.Reset()
@@ -508,7 +516,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
Name: "grpc-status",
Value: strconv.Itoa(int(statusCode)),
})
- t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: statusDesc})
+ t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(statusDesc)})
// Attach the trailer metadata.
for k, v := range s.trailer {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
@@ -544,7 +552,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
}
s.mu.Unlock()
if writeHeaderFrame {
- if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
+ if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
return err
}
t.hBuf.Reset()
@@ -560,7 +568,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
}
if err := t.framer.writeHeaders(false, p); err != nil {
t.Close()
- return ConnectionErrorf("transport: %v", err)
+ return ConnectionErrorf(true, err, "transport: %v", err)
}
t.writableChan <- 0
}
@@ -572,13 +580,13 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
size := http2MaxFrameLen
s.sendQuotaPool.add(0)
// Wait until the stream has some quota to send the data.
- sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire())
+ sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
if err != nil {
return err
}
t.sendQuotaPool.add(0)
// Wait until the transport has some quota to send the data.
- tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire())
+ tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
if err != nil {
if _, ok := err.(StreamError); ok {
t.sendQuotaPool.cancel()
@@ -604,7 +612,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
t.framer.adjustNumWriters(1)
// Got some quota. Try to acquire writing privilege on the
// transport.
- if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
+ if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
if _, ok := err.(StreamError); ok {
// Return the connection quota back.
t.sendQuotaPool.add(ps)
@@ -634,7 +642,7 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
}
if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
t.Close()
- return ConnectionErrorf("transport: %v", err)
+ return ConnectionErrorf(true, err, "transport: %v", err)
}
if t.framer.adjustNumWriters(-1) == 0 {
t.framer.flushWrite()
@@ -679,6 +687,17 @@ func (t *http2Server) controller() {
}
case *resetStream:
t.framer.writeRSTStream(true, i.streamID, i.code)
+ case *goAway:
+ t.mu.Lock()
+ if t.state == closing {
+ t.mu.Unlock()
+ // The transport is closing.
+ return
+ }
+ sid := t.maxStreamID
+ t.state = draining
+ t.mu.Unlock()
+ t.framer.writeGoAway(true, sid, http2.ErrCodeNo, nil)
case *flushIO:
t.framer.flushWrite()
case *ping:
@@ -724,6 +743,9 @@ func (t *http2Server) Close() (err error) {
func (t *http2Server) closeStream(s *Stream) {
t.mu.Lock()
delete(t.activeStreams, s.id)
+ if t.state == draining && len(t.activeStreams) == 0 {
+ defer t.Close()
+ }
t.mu.Unlock()
// In case stream sending and receiving are invoked in separate
// goroutines (e.g., bi-directional streaming), cancel needs to be
@@ -746,3 +768,7 @@ func (t *http2Server) closeStream(s *Stream) {
func (t *http2Server) RemoteAddr() net.Addr {
return t.conn.RemoteAddr()
}
+
+func (t *http2Server) Drain() {
+ t.controlBuf.put(&goAway{})
+}
diff --git a/vendor/google.golang.org/grpc/transport/http_util.go b/vendor/google.golang.org/grpc/transport/http_util.go
index f2e23dc..79da512 100644
--- a/vendor/google.golang.org/grpc/transport/http_util.go
+++ b/vendor/google.golang.org/grpc/transport/http_util.go
@@ -35,6 +35,7 @@ package transport
import (
"bufio"
+ "bytes"
"fmt"
"io"
"net"
@@ -52,7 +53,7 @@ import (
const (
// The primary user agent
- primaryUA = "grpc-go/0.11"
+ primaryUA = "grpc-go/1.0"
// http2MaxFrameLen specifies the max length of a HTTP2 frame.
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues
@@ -174,11 +175,11 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
}
d.statusCode = codes.Code(code)
case "grpc-message":
- d.statusDesc = f.Value
+ d.statusDesc = decodeGrpcMessage(f.Value)
case "grpc-timeout":
d.timeoutSet = true
var err error
- d.timeout, err = timeoutDecode(f.Value)
+ d.timeout, err = decodeTimeout(f.Value)
if err != nil {
d.setErr(StreamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
return
@@ -251,7 +252,7 @@ func div(d, r time.Duration) int64 {
}
// TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it.
-func timeoutEncode(t time.Duration) string {
+func encodeTimeout(t time.Duration) string {
if d := div(t, time.Nanosecond); d <= maxTimeoutValue {
return strconv.FormatInt(d, 10) + "n"
}
@@ -271,7 +272,7 @@ func timeoutEncode(t time.Duration) string {
return strconv.FormatInt(div(t, time.Hour), 10) + "H"
}
-func timeoutDecode(s string) (time.Duration, error) {
+func decodeTimeout(s string) (time.Duration, error) {
size := len(s)
if size < 2 {
return 0, fmt.Errorf("transport: timeout string is too short: %q", s)
@@ -288,6 +289,80 @@ func timeoutDecode(s string) (time.Duration, error) {
return d * time.Duration(t), nil
}
+const (
+ spaceByte = ' '
+ tildaByte = '~'
+ percentByte = '%'
+)
+
+// encodeGrpcMessage is used to encode status code in header field
+// "grpc-message".
+// It checks to see if each individual byte in msg is an
+// allowable byte, and then either percent encoding or passing it through.
+// When percent encoding, the byte is converted into hexadecimal notation
+// with a '%' prepended.
+func encodeGrpcMessage(msg string) string {
+ if msg == "" {
+ return ""
+ }
+ lenMsg := len(msg)
+ for i := 0; i < lenMsg; i++ {
+ c := msg[i]
+ if !(c >= spaceByte && c < tildaByte && c != percentByte) {
+ return encodeGrpcMessageUnchecked(msg)
+ }
+ }
+ return msg
+}
+
+func encodeGrpcMessageUnchecked(msg string) string {
+ var buf bytes.Buffer
+ lenMsg := len(msg)
+ for i := 0; i < lenMsg; i++ {
+ c := msg[i]
+ if c >= spaceByte && c < tildaByte && c != percentByte {
+ buf.WriteByte(c)
+ } else {
+ buf.WriteString(fmt.Sprintf("%%%02X", c))
+ }
+ }
+ return buf.String()
+}
+
+// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
+func decodeGrpcMessage(msg string) string {
+ if msg == "" {
+ return ""
+ }
+ lenMsg := len(msg)
+ for i := 0; i < lenMsg; i++ {
+ if msg[i] == percentByte && i+2 < lenMsg {
+ return decodeGrpcMessageUnchecked(msg)
+ }
+ }
+ return msg
+}
+
+func decodeGrpcMessageUnchecked(msg string) string {
+ var buf bytes.Buffer
+ lenMsg := len(msg)
+ for i := 0; i < lenMsg; i++ {
+ c := msg[i]
+ if c == percentByte && i+2 < lenMsg {
+ parsed, err := strconv.ParseInt(msg[i+1:i+3], 16, 8)
+ if err != nil {
+ buf.WriteByte(c)
+ } else {
+ buf.WriteByte(byte(parsed))
+ i += 2
+ }
+ } else {
+ buf.WriteByte(c)
+ }
+ }
+ return buf.String()
+}
+
type framer struct {
numWriters int32
reader io.Reader
diff --git a/vendor/google.golang.org/grpc/transport/pre_go16.go b/vendor/google.golang.org/grpc/transport/pre_go16.go
new file mode 100644
index 0000000..33d91c1
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/pre_go16.go
@@ -0,0 +1,51 @@
+// +build !go1.6
+
+/*
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package transport
+
+import (
+ "net"
+ "time"
+
+ "golang.org/x/net/context"
+)
+
+// dialContext connects to the address on the named network.
+func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
+ var dialer net.Dialer
+ if deadline, ok := ctx.Deadline(); ok {
+ dialer.Timeout = deadline.Sub(time.Now())
+ }
+ return dialer.Dial(network, address)
+}
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go
index d4c220a..b31769a 100644
--- a/vendor/google.golang.org/grpc/transport/transport.go
+++ b/vendor/google.golang.org/grpc/transport/transport.go
@@ -44,7 +44,6 @@ import (
"io"
"net"
"sync"
- "time"
"golang.org/x/net/context"
"golang.org/x/net/trace"
@@ -120,10 +119,11 @@ func (b *recvBuffer) get() <-chan item {
// recvBufferReader implements io.Reader interface to read the data from
// recvBuffer.
type recvBufferReader struct {
- ctx context.Context
- recv *recvBuffer
- last *bytes.Reader // Stores the remaining data in the previous calls.
- err error
+ ctx context.Context
+ goAway chan struct{}
+ recv *recvBuffer
+ last *bytes.Reader // Stores the remaining data in the previous calls.
+ err error
}
// Read reads the next len(p) bytes from last. If last is drained, it tries to
@@ -141,6 +141,8 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) {
select {
case <-r.ctx.Done():
return 0, ContextErr(r.ctx.Err())
+ case <-r.goAway:
+ return 0, ErrStreamDrain
case i := <-r.recv.get():
r.recv.load()
m := i.(*recvMsg)
@@ -158,7 +160,7 @@ const (
streamActive streamState = iota
streamWriteDone // EndStream sent
streamReadDone // EndStream received
- streamDone // sendDone and recvDone or RSTStreamFrame is sent or received.
+ streamDone // the entire stream is finished.
)
// Stream represents an RPC in the transport layer.
@@ -169,6 +171,10 @@ type Stream struct {
// ctx is the associated context of the stream.
ctx context.Context
cancel context.CancelFunc
+ // done is closed when the final status arrives.
+ done chan struct{}
+ // goAway is closed when the server sent GoAways signal before this stream was initiated.
+ goAway chan struct{}
// method records the associated RPC method of the stream.
method string
recvCompress string
@@ -214,6 +220,18 @@ func (s *Stream) SetSendCompress(str string) {
s.sendCompress = str
}
+// Done returns a chanel which is closed when it receives the final status
+// from the server.
+func (s *Stream) Done() <-chan struct{} {
+ return s.done
+}
+
+// GoAway returns a channel which is closed when the server sent GoAways signal
+// before this stream was initiated.
+func (s *Stream) GoAway() <-chan struct{} {
+ return s.goAway
+}
+
// Header acquires the key-value pairs of header metadata once it
// is available. It blocks until i) the metadata is ready or ii) there is no
// header metadata or iii) the stream is cancelled/expired.
@@ -221,6 +239,8 @@ func (s *Stream) Header() (metadata.MD, error) {
select {
case <-s.ctx.Done():
return nil, ContextErr(s.ctx.Err())
+ case <-s.goAway:
+ return nil, ErrStreamDrain
case <-s.headerChan:
return s.header.Copy(), nil
}
@@ -335,19 +355,17 @@ type ConnectOptions struct {
// UserAgent is the application user agent.
UserAgent string
// Dialer specifies how to dial a network address.
- Dialer func(string, time.Duration) (net.Conn, error)
+ Dialer func(context.Context, string) (net.Conn, error)
// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
PerRPCCredentials []credentials.PerRPCCredentials
// TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials
- // Timeout specifies the timeout for dialing a ClientTransport.
- Timeout time.Duration
}
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
-func NewClientTransport(target string, opts *ConnectOptions) (ClientTransport, error) {
- return newHTTP2Client(target, opts)
+func NewClientTransport(ctx context.Context, target string, opts ConnectOptions) (ClientTransport, error) {
+ return newHTTP2Client(ctx, target, opts)
}
// Options provides additional hints and information for message
@@ -417,6 +435,11 @@ type ClientTransport interface {
// and create a new one) in error case. It should not return nil
// once the transport is initiated.
Error() <-chan struct{}
+
+ // GoAway returns a channel that is closed when ClientTranspor
+ // receives the draining signal from the server (e.g., GOAWAY frame in
+ // HTTP/2).
+ GoAway() <-chan struct{}
}
// ServerTransport is the common interface for all gRPC server-side transport
@@ -448,6 +471,9 @@ type ServerTransport interface {
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
+
+ // Drain notifies the client this ServerTransport stops accepting new RPCs.
+ Drain()
}
// StreamErrorf creates an StreamError with the specified error code and description.
@@ -459,9 +485,11 @@ func StreamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
}
// ConnectionErrorf creates an ConnectionError with the specified error description.
-func ConnectionErrorf(format string, a ...interface{}) ConnectionError {
+func ConnectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
return ConnectionError{
Desc: fmt.Sprintf(format, a...),
+ temp: temp,
+ err: e,
}
}
@@ -469,14 +497,36 @@ func ConnectionErrorf(format string, a ...interface{}) ConnectionError {
// entire connection and the retry of all the active streams.
type ConnectionError struct {
Desc string
+ temp bool
+ err error
}
func (e ConnectionError) Error() string {
return fmt.Sprintf("connection error: desc = %q", e.Desc)
}
-// ErrConnClosing indicates that the transport is closing.
-var ErrConnClosing = ConnectionError{Desc: "transport is closing"}
+// Temporary indicates if this connection error is temporary or fatal.
+func (e ConnectionError) Temporary() bool {
+ return e.temp
+}
+
+// Origin returns the original error of this connection error.
+func (e ConnectionError) Origin() error {
+ // Never return nil error here.
+ // If the original error is nil, return itself.
+ if e.err == nil {
+ return e
+ }
+ return e.err
+}
+
+var (
+ // ErrConnClosing indicates that the transport is closing.
+ ErrConnClosing = ConnectionErrorf(true, nil, "transport is closing")
+ // ErrStreamDrain indicates that the stream is rejected by the server because
+ // the server stops accepting new RPCs.
+ ErrStreamDrain = StreamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
+)
// StreamError is an error that only affects one stream within a connection.
type StreamError struct {
@@ -501,12 +551,25 @@ func ContextErr(err error) StreamError {
// wait blocks until it can receive from ctx.Done, closing, or proceed.
// If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err.
+// If it receives from done, it returns 0, io.EOF if ctx is not done; otherwise
+// it return the StreamError for ctx.Err.
+// If it receives from goAway, it returns 0, ErrStreamDrain.
// If it receives from closing, it returns 0, ErrConnClosing.
// If it receives from proceed, it returns the received integer, nil.
-func wait(ctx context.Context, closing <-chan struct{}, proceed <-chan int) (int, error) {
+func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-chan int) (int, error) {
select {
case <-ctx.Done():
return 0, ContextErr(ctx.Err())
+ case <-done:
+ // User cancellation has precedence.
+ select {
+ case <-ctx.Done():
+ return 0, ContextErr(ctx.Err())
+ default:
+ }
+ return 0, io.EOF
+ case <-goAway:
+ return 0, ErrStreamDrain
case <-closing:
return 0, ErrConnClosing
case i := <-proceed: