aboutsummaryrefslogtreecommitdiff
path: root/vendor/cloud.google.com/go/storage/reader.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/cloud.google.com/go/storage/reader.go')
-rw-r--r--vendor/cloud.google.com/go/storage/reader.go234
1 files changed, 222 insertions, 12 deletions
diff --git a/vendor/cloud.google.com/go/storage/reader.go b/vendor/cloud.google.com/go/storage/reader.go
index c96ca8a..c0f26bf 100644
--- a/vendor/cloud.google.com/go/storage/reader.go
+++ b/vendor/cloud.google.com/go/storage/reader.go
@@ -1,4 +1,4 @@
-// Copyright 2016 Google Inc. All Rights Reserved.
+// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,23 +15,196 @@
package storage
import (
+ "errors"
"fmt"
"hash/crc32"
"io"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "reflect"
+ "strconv"
+ "strings"
+
+ "cloud.google.com/go/internal/trace"
+ "golang.org/x/net/context"
+ "google.golang.org/api/googleapi"
)
var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
+// NewReader creates a new Reader to read the contents of the
+// object.
+// ErrObjectNotExist will be returned if the object is not found.
+//
+// The caller must call Close on the returned Reader when done reading.
+func (o *ObjectHandle) NewReader(ctx context.Context) (*Reader, error) {
+ return o.NewRangeReader(ctx, 0, -1)
+}
+
+// NewRangeReader reads part of an object, reading at most length bytes
+// starting at the given offset. If length is negative, the object is read
+// until the end.
+func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) (r *Reader, err error) {
+ ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.NewRangeReader")
+ defer func() { trace.EndSpan(ctx, err) }()
+
+ if err := o.validate(); err != nil {
+ return nil, err
+ }
+ if offset < 0 {
+ return nil, fmt.Errorf("storage: invalid offset %d < 0", offset)
+ }
+ if o.conds != nil {
+ if err := o.conds.validate("NewRangeReader"); err != nil {
+ return nil, err
+ }
+ }
+ u := &url.URL{
+ Scheme: "https",
+ Host: "storage.googleapis.com",
+ Path: fmt.Sprintf("/%s/%s", o.bucket, o.object),
+ RawQuery: conditionsQuery(o.gen, o.conds),
+ }
+ verb := "GET"
+ if length == 0 {
+ verb = "HEAD"
+ }
+ req, err := http.NewRequest(verb, u.String(), nil)
+ if err != nil {
+ return nil, err
+ }
+ req = withContext(req, ctx)
+ if o.userProject != "" {
+ req.Header.Set("X-Goog-User-Project", o.userProject)
+ }
+ if o.readCompressed {
+ req.Header.Set("Accept-Encoding", "gzip")
+ }
+ if err := setEncryptionHeaders(req.Header, o.encryptionKey, false); err != nil {
+ return nil, err
+ }
+
+ // Define a function that initiates a Read with offset and length, assuming we
+ // have already read seen bytes.
+ reopen := func(seen int64) (*http.Response, error) {
+ start := offset + seen
+ if length < 0 && start > 0 {
+ req.Header.Set("Range", fmt.Sprintf("bytes=%d-", start))
+ } else if length > 0 {
+ // The end character isn't affected by how many bytes we've seen.
+ req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, offset+length-1))
+ }
+ var res *http.Response
+ err = runWithRetry(ctx, func() error {
+ res, err = o.c.hc.Do(req)
+ if err != nil {
+ return err
+ }
+ if res.StatusCode == http.StatusNotFound {
+ res.Body.Close()
+ return ErrObjectNotExist
+ }
+ if res.StatusCode < 200 || res.StatusCode > 299 {
+ body, _ := ioutil.ReadAll(res.Body)
+ res.Body.Close()
+ return &googleapi.Error{
+ Code: res.StatusCode,
+ Header: res.Header,
+ Body: string(body),
+ }
+ }
+ if start > 0 && length != 0 && res.StatusCode != http.StatusPartialContent {
+ res.Body.Close()
+ return errors.New("storage: partial request not satisfied")
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ return res, nil
+ }
+
+ res, err := reopen(0)
+ if err != nil {
+ return nil, err
+ }
+ var size int64 // total size of object, even if a range was requested.
+ if res.StatusCode == http.StatusPartialContent {
+ cr := strings.TrimSpace(res.Header.Get("Content-Range"))
+ if !strings.HasPrefix(cr, "bytes ") || !strings.Contains(cr, "/") {
+
+ return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
+ }
+ size, err = strconv.ParseInt(cr[strings.LastIndex(cr, "/")+1:], 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
+ }
+ } else {
+ size = res.ContentLength
+ }
+
+ remain := res.ContentLength
+ body := res.Body
+ if length == 0 {
+ remain = 0
+ body.Close()
+ body = emptyBody
+ }
+ var (
+ checkCRC bool
+ crc uint32
+ )
+ // Even if there is a CRC header, we can't compute the hash on partial data.
+ if remain == size {
+ crc, checkCRC = parseCRC32c(res)
+ }
+ return &Reader{
+ body: body,
+ size: size,
+ remain: remain,
+ contentType: res.Header.Get("Content-Type"),
+ contentEncoding: res.Header.Get("Content-Encoding"),
+ cacheControl: res.Header.Get("Cache-Control"),
+ wantCRC: crc,
+ checkCRC: checkCRC,
+ reopen: reopen,
+ }, nil
+}
+
+func parseCRC32c(res *http.Response) (uint32, bool) {
+ const prefix = "crc32c="
+ for _, spec := range res.Header["X-Goog-Hash"] {
+ if strings.HasPrefix(spec, prefix) {
+ c, err := decodeUint32(spec[len(prefix):])
+ if err == nil {
+ return c, true
+ }
+ }
+ }
+ return 0, false
+}
+
+var emptyBody = ioutil.NopCloser(strings.NewReader(""))
+
// Reader reads a Cloud Storage object.
// It implements io.Reader.
+//
+// Typically, a Reader computes the CRC of the downloaded content and compares it to
+// the stored CRC, returning an error from Read if there is a mismatch. This integrity check
+// is skipped if transcoding occurs. See https://cloud.google.com/storage/docs/transcoding.
type Reader struct {
- body io.ReadCloser
- remain, size int64
- contentType string
- cacheControl string
- checkCRC bool // should we check the CRC?
- wantCRC uint32 // the CRC32c value the server sent in the header
- gotCRC uint32 // running crc
+ body io.ReadCloser
+ seen, remain, size int64
+ contentType string
+ contentEncoding string
+ cacheControl string
+ checkCRC bool // should we check the CRC?
+ wantCRC uint32 // the CRC32c value the server sent in the header
+ gotCRC uint32 // running crc
+ checkedCRC bool // did we check the CRC? (For tests.)
+ reopen func(seen int64) (*http.Response, error)
}
// Close closes the Reader. It must be called when done reading.
@@ -40,7 +213,7 @@ func (r *Reader) Close() error {
}
func (r *Reader) Read(p []byte) (int, error) {
- n, err := r.body.Read(p)
+ n, err := r.readWithRetry(p)
if r.remain != -1 {
r.remain -= int64(n)
}
@@ -49,14 +222,46 @@ func (r *Reader) Read(p []byte) (int, error) {
// Check CRC here. It would be natural to check it in Close, but
// everybody defers Close on the assumption that it doesn't return
// anything worth looking at.
- if r.remain == 0 && r.gotCRC != r.wantCRC {
- return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d",
- r.gotCRC, r.wantCRC)
+ if r.remain == 0 { // Only check if we have Content-Length.
+ r.checkedCRC = true
+ if r.gotCRC != r.wantCRC {
+ return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d",
+ r.gotCRC, r.wantCRC)
+ }
}
}
return n, err
}
+func (r *Reader) readWithRetry(p []byte) (int, error) {
+ n := 0
+ for len(p[n:]) > 0 {
+ m, err := r.body.Read(p[n:])
+ n += m
+ r.seen += int64(m)
+ if !shouldRetryRead(err) {
+ return n, err
+ }
+ // Read failed, but we will try again. Send a ranged read request that takes
+ // into account the number of bytes we've already seen.
+ res, err := r.reopen(r.seen)
+ if err != nil {
+ // reopen already retries
+ return n, err
+ }
+ r.body.Close()
+ r.body = res.Body
+ }
+ return n, nil
+}
+
+func shouldRetryRead(err error) bool {
+ if err == nil {
+ return false
+ }
+ return strings.HasSuffix(err.Error(), "INTERNAL_ERROR") && strings.Contains(reflect.TypeOf(err).String(), "http2")
+}
+
// Size returns the size of the object in bytes.
// The returned value is always the same and is not affected by
// calls to Read or Close.
@@ -74,6 +279,11 @@ func (r *Reader) ContentType() string {
return r.contentType
}
+// ContentEncoding returns the content encoding of the object.
+func (r *Reader) ContentEncoding() string {
+ return r.contentEncoding
+}
+
// CacheControl returns the cache control of the object.
func (r *Reader) CacheControl() string {
return r.cacheControl