aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/api/gensupport/resumable.go
blob: ad169439eefff58ef13a23728a406ed2bfc64068 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package gensupport

import (
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"

	"golang.org/x/net/context"
	"golang.org/x/net/context/ctxhttp"
)

const (
	// statusResumeIncomplete is the code returned by the Google uploader
	// when the transfer is not yet complete.
	statusResumeIncomplete = 308

	// statusTooManyRequests is returned by the storage API if the
	// per-project limits have been temporarily exceeded. The request
	// should be retried.
	// https://cloud.google.com/storage/docs/json_api/v1/status-codes#standardcodes
	statusTooManyRequests = 429
)

// ResumableUpload is used by the generated APIs to provide resumable uploads.
// It is not used by developers directly.
type ResumableUpload struct {
	Client *http.Client
	// URI is the resumable resource destination provided by the server after specifying "&uploadType=resumable".
	URI       string
	UserAgent string // User-Agent for header of the request
	// Media is the object being uploaded.
	Media *MediaBuffer
	// MediaType defines the media type, e.g. "image/jpeg".
	MediaType string

	mu       sync.Mutex // guards progress
	progress int64      // number of bytes uploaded so far

	// Callback is an optional function that will be periodically called with the cumulative number of bytes uploaded.
	Callback func(int64)

	// If not specified, a default exponential backoff strategy will be used.
	Backoff BackoffStrategy
}

// Progress returns the number of bytes uploaded at this point.
func (rx *ResumableUpload) Progress() int64 {
	rx.mu.Lock()
	defer rx.mu.Unlock()
	return rx.progress
}

// doUploadRequest performs a single HTTP request to upload data.
// off specifies the offset in rx.Media from which data is drawn.
// size is the number of bytes in data.
// final specifies whether data is the final chunk to be uploaded.
func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader, off, size int64, final bool) (*http.Response, error) {
	req, err := http.NewRequest("POST", rx.URI, data)
	if err != nil {
		return nil, err
	}

	req.ContentLength = size
	var contentRange string
	if final {
		if size == 0 {
			contentRange = fmt.Sprintf("bytes */%v", off)
		} else {
			contentRange = fmt.Sprintf("bytes %v-%v/%v", off, off+size-1, off+size)
		}
	} else {
		contentRange = fmt.Sprintf("bytes %v-%v/*", off, off+size-1)
	}
	req.Header.Set("Content-Range", contentRange)
	req.Header.Set("Content-Type", rx.MediaType)
	req.Header.Set("User-Agent", rx.UserAgent)
	return ctxhttp.Do(ctx, rx.Client, req)

}

// reportProgress calls a user-supplied callback to report upload progress.
// If old==updated, the callback is not called.
func (rx *ResumableUpload) reportProgress(old, updated int64) {
	if updated-old == 0 {
		return
	}
	rx.mu.Lock()
	rx.progress = updated
	rx.mu.Unlock()
	if rx.Callback != nil {
		rx.Callback(updated)
	}
}

// transferChunk performs a single HTTP request to upload a single chunk from rx.Media.
func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, error) {
	chunk, off, size, err := rx.Media.Chunk()

	done := err == io.EOF
	if !done && err != nil {
		return nil, err
	}

	res, err := rx.doUploadRequest(ctx, chunk, off, int64(size), done)
	if err != nil {
		return res, err
	}

	if res.StatusCode == statusResumeIncomplete || res.StatusCode == http.StatusOK {
		rx.reportProgress(off, off+int64(size))
	}

	if res.StatusCode == statusResumeIncomplete {
		rx.Media.Next()
	}
	return res, nil
}

func contextDone(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

// Upload starts the process of a resumable upload with a cancellable context.
// It retries using the provided back off strategy until cancelled or the
// strategy indicates to stop retrying.
// It is called from the auto-generated API code and is not visible to the user.
// rx is private to the auto-generated API code.
// Exactly one of resp or err will be nil.  If resp is non-nil, the caller must call resp.Body.Close.
func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err error) {
	var pause time.Duration
	backoff := rx.Backoff
	if backoff == nil {
		backoff = DefaultBackoffStrategy()
	}

	for {
		// Ensure that we return in the case of cancelled context, even if pause is 0.
		if contextDone(ctx) {
			return nil, ctx.Err()
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(pause):
		}

		resp, err = rx.transferChunk(ctx)

		var status int
		if resp != nil {
			status = resp.StatusCode
		}

		// Check if we should retry the request.
		if shouldRetry(status, err) {
			var retry bool
			pause, retry = backoff.Pause()
			if retry {
				if resp != nil && resp.Body != nil {
					resp.Body.Close()
				}
				continue
			}
		}

		// If the chunk was uploaded successfully, but there's still
		// more to go, upload the next chunk without any delay.
		if status == statusResumeIncomplete {
			pause = 0
			backoff.Reset()
			resp.Body.Close()
			continue
		}

		// It's possible for err and resp to both be non-nil here, but we expose a simpler
		// contract to our callers: exactly one of resp and err will be non-nil.  This means
		// that any response body must be closed here before returning a non-nil error.
		if err != nil {
			if resp != nil && resp.Body != nil {
				resp.Body.Close()
			}
			return nil, err
		}

		return resp, nil
	}
}