Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(90)

Side by Side Diff: common/gcloud/gs/gs.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package gs 5 package gs
6 6
7 import ( 7 import (
8 "fmt"
8 "io" 9 "io"
9 "net/http" 10 "net/http"
10 "time" 11 "time"
11 12
12 "github.com/luci/luci-go/common/errors" 13 "github.com/luci/luci-go/common/errors"
13 log "github.com/luci/luci-go/common/logging" 14 log "github.com/luci/luci-go/common/logging"
14 "github.com/luci/luci-go/common/retry" 15 "github.com/luci/luci-go/common/retry"
15 "golang.org/x/net/context" 16 "golang.org/x/net/context"
16 "google.golang.org/api/googleapi" 17 "google.golang.org/api/googleapi"
17 "google.golang.org/cloud" 18 "google.golang.org/cloud"
(...skipping 11 matching lines...) Expand all
29 ) 30 )
30 31
31 // Client abstracts funcitonality to connect with and use Google Storage from 32 // Client abstracts funcitonality to connect with and use Google Storage from
32 // the actual Google Storage client. 33 // the actual Google Storage client.
33 // 34 //
34 // Non-production implementations are used primarily for testing. 35 // Non-production implementations are used primarily for testing.
35 type Client interface { 36 type Client interface {
36 io.Closer 37 io.Closer
37 38
38 // NewReader instantiates a new Reader instance for the named bucket/pat h. 39 // NewReader instantiates a new Reader instance for the named bucket/pat h.
39 » NewReader(bucket, relpath string, o Options) (io.ReadCloser, error) 40 » NewReader(p Path, o Options) (io.ReadCloser, error)
41
40 // NewWriter instantiates a new Writer instance for the named bucket/pat h. 42 // NewWriter instantiates a new Writer instance for the named bucket/pat h.
41 » NewWriter(bucket, relpath string) (Writer, error) 43 » NewWriter(p Path) (Writer, error)
dnj 2016/04/11 17:20:04 This API was originally created before I formally
42 » // Delete deletes the named Google Storage object. If the object doesn't 44
43 » // exist, a nil error will be returned. 45 » // Delete deletes the object at the specified path.
44 » Delete(bucket, relpath string) error 46 » //
47 » // If the object does not exist, it is considered a success.
48 » Delete(p Path) error
49
50 » // Rename renames an object from one path to another.
51 » //
52 » // NOTE: The object should be removed from its original path, but curren t
53 » // implementation uses two operations (Copy + Delete), so it may
54 » // occasionally fail.
55 » Rename(src, dst Path) error
45 } 56 }
46 57
47 // Options are the set of extra options to apply to the Google Storage request. 58 // Options are the set of extra options to apply to the Google Storage request.
48 type Options struct { 59 type Options struct {
49 // From is the range request starting index. If >0, the beginning of the 60 // From is the range request starting index. If >0, the beginning of the
50 // range request will be set. 61 // range request will be set.
51 From int64 62 From int64
52 // To is the range request ending index. If >0, the end of the 63 // To is the range request ending index. If >0, the end of the
53 // range request will be set. 64 // range request will be set.
54 // 65 //
(...skipping 29 matching lines...) Expand all
84 if err != nil { 95 if err != nil {
85 return nil, err 96 return nil, err
86 } 97 }
87 return &c, nil 98 return &c, nil
88 } 99 }
89 100
90 func (c *prodClient) Close() error { 101 func (c *prodClient) Close() error {
91 return c.baseClient.Close() 102 return c.baseClient.Close()
92 } 103 }
93 104
94 func (c *prodClient) NewWriter(bucket, relpath string) (Writer, error) { 105 func (c *prodClient) NewWriter(p Path) (Writer, error) {
106 » bucket, filename, err := splitPathErr(p)
107 » if err != nil {
108 » » return nil, err
109 » }
110
95 return &prodWriter{ 111 return &prodWriter{
96 Context: c, 112 Context: c,
97 client: c, 113 client: c,
98 bucket: bucket, 114 bucket: bucket,
99 » » relpath: relpath, 115 » » relpath: filename,
100 }, nil 116 }, nil
101 } 117 }
102 118
103 func (c *prodClient) NewReader(bucket, relpath string, o Options) (io.ReadCloser , error) { 119 func (c *prodClient) NewReader(p Path, o Options) (io.ReadCloser, error) {
104 if o.From < 0 { 120 if o.From < 0 {
105 o.From = 0 121 o.From = 0
106 } 122 }
107 if o.To <= 0 { 123 if o.To <= 0 {
108 o.To = -1 124 o.To = -1
109 } 125 }
110 » return c.baseClient.Bucket(bucket).Object(relpath).NewRangeReader(c, o.F rom, o.To) 126
127 » obj, err := c.handleForPath(p)
128 » if err != nil {
129 » » return nil, err
130 » }
131 » return obj.NewRangeReader(c, o.From, o.To)
111 } 132 }
112 133
113 func (c *prodClient) Delete(bucket, relpath string) error { 134 func (c *prodClient) Rename(src, dst Path) error {
dnj 2016/04/11 17:20:04 This is a new API.
114 » obj := c.baseClient.Bucket(bucket).Object(relpath) 135 » srcObj, err := c.handleForPath(src)
115 » return retry.Retry(c, retry.TransientOnly(retry.Default), func() error { 136 » if err != nil {
116 » » if err := obj.Delete(c); err != nil { 137 » » return fmt.Errorf("invalid source path: %s", err)
138 » }
139
140 » dstObj, err := c.handleForPath(dst)
141 » if err != nil {
142 » » return fmt.Errorf("invalid destination path: %s", err)
143 » }
144
145 » // First stage: CopyTo
146 » err = retry.Retry(c, retry.TransientOnly(retry.Default), func() error {
147 » » if _, err := srcObj.CopyTo(c, dstObj, nil); err != nil {
117 // The storage library doesn't return gs.ErrObjectNotExi st when Delete 148 // The storage library doesn't return gs.ErrObjectNotExi st when Delete
118 // returns a 404. Catch that explicitly. 149 // returns a 404. Catch that explicitly.
119 » » » if t, ok := err.(*googleapi.Error); ok { 150 » » » if isNotFoundError(err) {
120 » » » » switch t.Code { 151 » » » » return err
121 » » » » case http.StatusNotFound:
122 » » » » » // Delete failed because the object did not exist.
123 » » » » » return nil
124 » » » » }
125 } 152 }
126 153
127 // Assume all unexpected errors are transient. 154 // Assume all unexpected errors are transient.
155 return errors.WrapTransient(err)
156 }
157 return nil
158 }, func(err error, d time.Duration) {
159 log.Fields{
160 log.ErrorKey: err,
161 "delay": d,
162 "src": src,
163 "dst": dst,
164 }.Warningf(c, "Transient error copying GS file. Retrying...")
165 })
166 if err != nil {
167 return err
168 }
169
170 // Second stage: Delete. This is not fatal.
171 if err := c.deleteObject(srcObj); err != nil {
172 log.Fields{
173 log.ErrorKey: err,
174 "path": src,
175 }.Warningf(c, "(Non-fatal) Failed to delete source during rename .")
176 }
177 return nil
178 }
179
180 func (c *prodClient) Delete(p Path) error {
181 dstObj, err := c.handleForPath(p)
182 if err != nil {
183 return fmt.Errorf("invalid path: %s", err)
184 }
185
186 return c.deleteObject(dstObj)
187 }
188
189 func (c *prodClient) deleteObject(o *gs.ObjectHandle) error {
190 return retry.Retry(c, retry.TransientOnly(retry.Default), func() error {
191 if err := o.Delete(c); err != nil {
192 // The storage library doesn't return gs.ErrObjectNotExi st when Delete
193 // returns a 404. Catch that explicitly.
194 if isNotFoundError(err) {
195 // If the file wasn't found, then the delete "su cceeded".
196 return nil
197 }
198
199 // Assume all unexpected errors are transient.
128 return errors.WrapTransient(err) 200 return errors.WrapTransient(err)
129 } 201 }
130 return nil 202 return nil
131 }, func(err error, d time.Duration) { 203 }, func(err error, d time.Duration) {
132 log.Fields{ 204 log.Fields{
133 log.ErrorKey: err, 205 log.ErrorKey: err,
134 "delay": d, 206 "delay": d,
135 » » » "bucket": bucket, 207 » » }.Warningf(c, "Transient error deleting file. Retrying...")
136 » » » "path": relpath,
137 » » }.Warningf(c, "Transient error deleting GS file. Retrying...")
138 }) 208 })
139 } 209 }
140 210
141 func (c *prodClient) newClient() (*gs.Client, error) { 211 func (c *prodClient) newClient() (*gs.Client, error) {
142 var optsArray [1]cloud.ClientOption 212 var optsArray [1]cloud.ClientOption
143 opts := optsArray[:0] 213 opts := optsArray[:0]
144 if c.rt != nil { 214 if c.rt != nil {
145 opts = append(opts, cloud.WithBaseHTTP(&http.Client{ 215 opts = append(opts, cloud.WithBaseHTTP(&http.Client{
146 Transport: c.rt, 216 Transport: c.rt,
147 })) 217 }))
148 } 218 }
149 return gs.NewClient(c, opts...) 219 return gs.NewClient(c, opts...)
150 } 220 }
221
222 func (c *prodClient) handleForPath(p Path) (*gs.ObjectHandle, error) {
223 bucket, filename, err := splitPathErr(p)
224 if err != nil {
225 return nil, err
226 }
227 return c.baseClient.Bucket(bucket).Object(filename), nil
228 }
229
230 func splitPathErr(p Path) (bucket, filename string, err error) {
231 bucket, filename = p.Split()
232 switch {
233 case bucket == "":
234 err = errors.New("path has no bucket")
235 case filename == "":
236 err = errors.New("path has no filename")
237 }
238 return
239 }
240
241 func isNotFoundError(err error) bool {
242 // The storage library doesn't return gs.ErrObjectNotExist when Delete
243 // returns a 404. Catch that explicitly.
244 if t, ok := err.(*googleapi.Error); ok {
245 switch t.Code {
246 case http.StatusNotFound:
247 return true
248 }
249 }
250 return false
251 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698