| Index: common/gcloud/gs/gs.go
|
| diff --git a/common/gcloud/gs/gs.go b/common/gcloud/gs/gs.go
|
| index 3c1f08170a03de9c70bfc55aaf843fb8da6dae79..be2bdee9a2844cedb787777cc1b284bde9b9a05c 100644
|
| --- a/common/gcloud/gs/gs.go
|
| +++ b/common/gcloud/gs/gs.go
|
| @@ -5,6 +5,7 @@
|
| package gs
|
|
|
| import (
|
| + "fmt"
|
| "io"
|
| "net/http"
|
| "time"
|
| @@ -36,12 +37,22 @@ type Client interface {
|
| io.Closer
|
|
|
| // NewReader instantiates a new Reader instance for the named bucket/path.
|
| - NewReader(bucket, relpath string, o Options) (io.ReadCloser, error)
|
| + NewReader(p Path, o Options) (io.ReadCloser, error)
|
| +
|
| // NewWriter instantiates a new Writer instance for the named bucket/path.
|
| - NewWriter(bucket, relpath string) (Writer, error)
|
| - // Delete deletes the named Google Storage object. If the object doesn't
|
| - // exist, a nil error will be returned.
|
| - Delete(bucket, relpath string) error
|
| + NewWriter(p Path) (Writer, error)
|
| +
|
| + // Delete deletes the object at the specified path.
|
| + //
|
| + // If the object does not exist, it is considered a success.
|
| + Delete(p Path) error
|
| +
|
| + // Rename renames an object from one path to another.
|
| + //
|
| + // NOTE: The object should be removed from its original path, but current
|
| + // implementation uses two operations (Copy + Delete), so it may
|
| + // occasionally fail.
|
| + Rename(src, dst Path) error
|
| }
|
|
|
| // Options are the set of extra options to apply to the Google Storage request.
|
| @@ -91,37 +102,98 @@ func (c *prodClient) Close() error {
|
| return c.baseClient.Close()
|
| }
|
|
|
| -func (c *prodClient) NewWriter(bucket, relpath string) (Writer, error) {
|
| +func (c *prodClient) NewWriter(p Path) (Writer, error) {
|
| + bucket, filename, err := splitPathErr(p)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| return &prodWriter{
|
| Context: c,
|
| client: c,
|
| bucket: bucket,
|
| - relpath: relpath,
|
| + relpath: filename,
|
| }, nil
|
| }
|
|
|
| -func (c *prodClient) NewReader(bucket, relpath string, o Options) (io.ReadCloser, error) {
|
| +func (c *prodClient) NewReader(p Path, o Options) (io.ReadCloser, error) {
|
| if o.From < 0 {
|
| o.From = 0
|
| }
|
| if o.To <= 0 {
|
| o.To = -1
|
| }
|
| - return c.baseClient.Bucket(bucket).Object(relpath).NewRangeReader(c, o.From, o.To)
|
| +
|
| + obj, err := c.handleForPath(p)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + return obj.NewRangeReader(c, o.From, o.To)
|
| }
|
|
|
| -func (c *prodClient) Delete(bucket, relpath string) error {
|
| - obj := c.baseClient.Bucket(bucket).Object(relpath)
|
| +func (c *prodClient) Rename(src, dst Path) error {
|
| + srcObj, err := c.handleForPath(src)
|
| + if err != nil {
|
| + return fmt.Errorf("invalid source path: %s", err)
|
| + }
|
| +
|
| + dstObj, err := c.handleForPath(dst)
|
| + if err != nil {
|
| + return fmt.Errorf("invalid destination path: %s", err)
|
| + }
|
| +
|
| + // First stage: CopyTo
|
| + err = retry.Retry(c, retry.TransientOnly(retry.Default), func() error {
|
| + if _, err := srcObj.CopyTo(c, dstObj, nil); err != nil {
|
| + // The storage library doesn't return gs.ErrObjectNotExist when Delete
|
| + // returns a 404. Catch that explicitly.
|
| + if isNotFoundError(err) {
|
| + return err
|
| + }
|
| +
|
| + // Assume all unexpected errors are transient.
|
| + return errors.WrapTransient(err)
|
| + }
|
| + return nil
|
| + }, func(err error, d time.Duration) {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "delay": d,
|
| + "src": src,
|
| + "dst": dst,
|
| + }.Warningf(c, "Transient error copying GS file. Retrying...")
|
| + })
|
| + if err != nil {
|
| + return err
|
| + }
|
| +
|
| + // Second stage: Delete. This is not fatal.
|
| + if err := c.deleteObject(srcObj); err != nil {
|
| + log.Fields{
|
| + log.ErrorKey: err,
|
| + "path": src,
|
| + }.Warningf(c, "(Non-fatal) Failed to delete source during rename.")
|
| + }
|
| + return nil
|
| +}
|
| +
|
| +func (c *prodClient) Delete(p Path) error {
|
| + dstObj, err := c.handleForPath(p)
|
| + if err != nil {
|
| + return fmt.Errorf("invalid path: %s", err)
|
| + }
|
| +
|
| + return c.deleteObject(dstObj)
|
| +}
|
| +
|
| +func (c *prodClient) deleteObject(o *gs.ObjectHandle) error {
|
| return retry.Retry(c, retry.TransientOnly(retry.Default), func() error {
|
| - if err := obj.Delete(c); err != nil {
|
| + if err := o.Delete(c); err != nil {
|
| // The storage library doesn't return gs.ErrObjectNotExist when Delete
|
| // returns a 404. Catch that explicitly.
|
| - if t, ok := err.(*googleapi.Error); ok {
|
| - switch t.Code {
|
| - case http.StatusNotFound:
|
| - // Delete failed because the object did not exist.
|
| - return nil
|
| - }
|
| + if isNotFoundError(err) {
|
| + // If the file wasn't found, then the delete "succeeded".
|
| + return nil
|
| }
|
|
|
| // Assume all unexpected errors are transient.
|
| @@ -132,9 +204,7 @@ func (c *prodClient) Delete(bucket, relpath string) error {
|
| log.Fields{
|
| log.ErrorKey: err,
|
| "delay": d,
|
| - "bucket": bucket,
|
| - "path": relpath,
|
| - }.Warningf(c, "Transient error deleting GS file. Retrying...")
|
| + }.Warningf(c, "Transient error deleting file. Retrying...")
|
| })
|
| }
|
|
|
| @@ -148,3 +218,34 @@ func (c *prodClient) newClient() (*gs.Client, error) {
|
| }
|
| return gs.NewClient(c, opts...)
|
| }
|
| +
|
| +func (c *prodClient) handleForPath(p Path) (*gs.ObjectHandle, error) {
|
| + bucket, filename, err := splitPathErr(p)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + return c.baseClient.Bucket(bucket).Object(filename), nil
|
| +}
|
| +
|
| +func splitPathErr(p Path) (bucket, filename string, err error) {
|
| + bucket, filename = p.Split()
|
| + switch {
|
| + case bucket == "":
|
| + err = errors.New("path has no bucket")
|
| + case filename == "":
|
| + err = errors.New("path has no filename")
|
| + }
|
| + return
|
| +}
|
| +
|
| +func isNotFoundError(err error) bool {
|
| + // The storage library doesn't return gs.ErrObjectNotExist when Delete
|
| + // returns a 404. Catch that explicitly.
|
| + if t, ok := err.(*googleapi.Error); ok {
|
| + switch t.Code {
|
| + case http.StatusNotFound:
|
| + return true
|
| + }
|
| + }
|
| + return false
|
| +}
|
|
|