Index: go/src/infra/tools/cipd/client.go |
diff --git a/go/src/infra/tools/cipd/client.go b/go/src/infra/tools/cipd/client.go |
index 546ad2be63a89ed01994a2f3e29f2e9f69c8f172..614ab8d6c3da8466523c50e97c498a9e60c12e28 100644 |
--- a/go/src/infra/tools/cipd/client.go |
+++ b/go/src/infra/tools/cipd/client.go |
@@ -41,13 +41,16 @@ import ( |
"io" |
"net/http" |
"os" |
+ "path/filepath" |
"sort" |
"strings" |
+ "sync" |
"time" |
"github.com/luci/luci-go/common/logging" |
"infra/tools/cipd/common" |
+ "infra/tools/cipd/internal" |
"infra/tools/cipd/local" |
) |
@@ -76,29 +79,29 @@ const ( |
var ( |
// ErrFinalizationTimeout is returned if CAS service can not finalize upload fast enough. |
- ErrFinalizationTimeout = errors.New("Timeout while waiting for CAS service to finalize the upload") |
+ ErrFinalizationTimeout = errors.New("timeout while waiting for CAS service to finalize the upload") |
// ErrBadUpload is returned when a package file is uploaded, but servers asks us to upload it again. |
- ErrBadUpload = errors.New("Package file is uploaded, but servers asks us to upload it again") |
+ ErrBadUpload = errors.New("package file is uploaded, but servers asks us to upload it again") |
// ErrBadUploadSession is returned by UploadToCAS if provided UploadSession is not valid. |
- ErrBadUploadSession = errors.New("UploadURL must be set if UploadSessionID is used") |
+ ErrBadUploadSession = errors.New("uploadURL must be set if UploadSessionID is used") |
// ErrUploadSessionDied is returned by UploadToCAS if upload session suddenly disappeared. |
- ErrUploadSessionDied = errors.New("Upload session is unexpectedly missing") |
+ ErrUploadSessionDied = errors.New("upload session is unexpectedly missing") |
// ErrNoUploadSessionID is returned by UploadToCAS if server didn't provide upload session ID. |
- ErrNoUploadSessionID = errors.New("Server didn't provide upload session ID") |
+ ErrNoUploadSessionID = errors.New("server didn't provide upload session ID") |
// ErrSetRefTimeout is returned when service refuses to move a ref for a long time. |
- ErrSetRefTimeout = errors.New("Timeout while moving a ref") |
+ ErrSetRefTimeout = errors.New("timeout while moving a ref") |
// ErrAttachTagsTimeout is returned when service refuses to accept tags for a long time. |
- ErrAttachTagsTimeout = errors.New("Timeout while attaching tags") |
+ ErrAttachTagsTimeout = errors.New("timeout while attaching tags") |
// ErrDownloadError is returned by FetchInstance on download errors. |
- ErrDownloadError = errors.New("Failed to download the package file after multiple attempts") |
+ ErrDownloadError = errors.New("failed to download the package file after multiple attempts") |
// ErrUploadError is returned by RegisterInstance and UploadToCAS on upload errors. |
- ErrUploadError = errors.New("Failed to upload the package file after multiple attempts") |
+ ErrUploadError = errors.New("failed to upload the package file after multiple attempts") |
// ErrAccessDenined is returned by calls talking to backend on 401 or 403 HTTP errors. |
- ErrAccessDenined = errors.New("Access denied (not authenticated or not enough permissions)") |
+ ErrAccessDenined = errors.New("access denied (not authenticated or not enough permissions)") |
// ErrBackendInaccessible is returned by calls talking to backed if it doesn't response. |
- ErrBackendInaccessible = errors.New("Request to the backend failed after multiple attempts") |
+ ErrBackendInaccessible = errors.New("request to the backend failed after multiple attempts") |
// ErrEnsurePackagesFailed is returned by EnsurePackages if something is not right. |
- ErrEnsurePackagesFailed = errors.New("Failed to update packages, see the log") |
+ ErrEnsurePackagesFailed = errors.New("failed to update packages, see the log") |
) |
// PackageACL is per package path per role access control list that is a part of |
@@ -135,7 +138,7 @@ type UploadSession struct { |
URL string |
} |
-// Client provides high-level CIPD client interface. |
+// Client provides high-level CIPD client interface. Thread safe. |
type Client interface { |
// FetchACL returns a list of PackageACL objects (parent paths first) that |
// together define the access control list for the given package subpath. |
@@ -195,6 +198,9 @@ type Client interface { |
// what packages (and versions) should be installed it will do all necessary |
// actions to bring the state of the site root to the desired one. |
EnsurePackages(pins []common.Pin) error |
+ |
+ // Close should be called to dump any cached state to disk. |
+ Close() |
} |
// HTTPClientFactory lazily creates http.Client to use for making requests. |
@@ -204,15 +210,25 @@ type HTTPClientFactory func() (*http.Client, error) |
type ClientOptions struct { |
// ServiceURL is root URL of the backend service. |
ServiceURL string |
- // Root is a site root directory (where packages will be installed). It can |
- // be empty string if client is not going to be used to deploy or remove local packages. |
+ |
+ // Root is a site root directory (a directory where packages will be |
+ // installed to). It also hosts .cipd/* directory that tracks internal state |
+ // of installed packages and keeps various cache files. 'Root' can be an empty |
+ // string if the client is not going to be used to deploy or remove local |
+ // packages. In that case caches are also disabled. |
Root string |
+ |
// Logger is a logger to use for logs (null-logger by default). |
Logger logging.Logger |
- // AuthenticatedClientFactory lazily creates http.Client to use for making RPC requests. |
+ |
+ // AuthenticatedClientFactory lazily creates http.Client to use for making |
+ // RPC requests. |
AuthenticatedClientFactory HTTPClientFactory |
- // AnonymousClientFactory lazily creates http.Client to use for making requests to storage. |
+ |
+ // AnonymousClientFactory lazily creates http.Client to use for making |
+ // requests to storage. |
AnonymousClientFactory HTTPClientFactory |
+ |
// UserAgent is put into User-Agent HTTP header with each request. |
UserAgent string |
} |
@@ -247,43 +263,110 @@ func NewClient(opts ClientOptions) Client { |
type clientImpl struct { |
ClientOptions |
- // clock provides current time and ability to sleep. |
+ // lock protects lazily initialized portions of the client. |
+ lock sync.Mutex |
+ |
+ // clock provides current time and ability to sleep. Thread safe. |
clock clock |
- // remote knows how to call backend REST API. |
+ |
+ // remote knows how to call backend REST API. Thread safe. |
remote remote |
+ |
// storage knows how to upload and download raw binaries using signed URLs. |
+ // Thread safe. |
storage storage |
- // deployer knows how to install packages to local file system. |
+ |
+ // deployer knows how to install packages to local file system. Thread safe. |
deployer local.Deployer |
- // authClient is a lazily created http.Client to use for authenticated requests. |
+ // tagCache is used to cache (pkgname, tag) -> instanceID mapping. |
+ // Thread safe, but lazily initialized under lock. |
+ tagCache *internal.TagCache |
+ |
+ // authClient is a lazily created http.Client to use for authenticated |
+ // requests. Thread safe, but lazily initialized under lock. |
authClient *http.Client |
+ |
// anonClient is a lazily created http.Client to use for anonymous requests. |
+ // Thread safe, but lazily initialized under lock. |
anonClient *http.Client |
} |
// doAuthenticatedHTTPRequest is used by remote implementation to make HTTP calls. |
func (client *clientImpl) doAuthenticatedHTTPRequest(req *http.Request) (*http.Response, error) { |
- if client.authClient == nil { |
- var err error |
- client.authClient, err = client.AuthenticatedClientFactory() |
- if err != nil { |
- return nil, err |
- } |
- } |
- return client.authClient.Do(req) |
+ return client.doRequest(req, &client.authClient, client.AuthenticatedClientFactory) |
} |
// doAnonymousHTTPRequest is used by storage implementation to make HTTP calls. |
func (client *clientImpl) doAnonymousHTTPRequest(req *http.Request) (*http.Response, error) { |
- if client.anonClient == nil { |
+ return client.doRequest(req, &client.anonClient, client.AnonymousClientFactory) |
+} |
+ |
+// doRequest lazy-initializes http.Client using provided factory and then |
+// executes the request. |
+func (client *clientImpl) doRequest(req *http.Request, c **http.Client, fac HTTPClientFactory) (*http.Response, error) { |
+ httpClient, err := func() (*http.Client, error) { |
+ client.lock.Lock() |
+ defer client.lock.Unlock() |
var err error |
- client.anonClient, err = client.AnonymousClientFactory() |
- if err != nil { |
- return nil, err |
+ if *c == nil { |
+ *c, err = fac() |
+ } |
+ return *c, err |
+ }() |
+ if err != nil { |
+ return nil, err |
+ } |
+ return httpClient.Do(req) |
+} |
+ |
+// tagCachePath returns path to a tag cache file or "" if no root dir. |
+func (client *clientImpl) tagCachePath() string { |
+ if client.Root == "" { |
+ return "" |
+ } |
+ return filepath.Join(client.Root, local.SiteServiceDir, "tagcache.db") |
+} |
+ |
+// getTagCache lazy-initializes tagCache instance and returns it. |
+func (client *clientImpl) getTagCache() *internal.TagCache { |
+ client.lock.Lock() |
+ defer client.lock.Unlock() |
+ if client.tagCache == nil { |
+ if path := client.tagCachePath(); path != "" { |
+ var err error |
+ client.tagCache, err = internal.LoadTagCacheFromFile(path) |
+ if err != nil { |
+ client.Logger.Warningf("cipd: failed to load tag cache - %s", err) |
+ } |
+ } |
+ if client.tagCache == nil { |
+ client.tagCache = &internal.TagCache{} |
} |
} |
- return client.anonClient.Do(req) |
+ return client.tagCache |
+} |
+ |
+// closeTagCache dumps any changes made to tag cache to disk, if necessary. |
+// Must be called under lock. |
+func (client *clientImpl) closeTagCache() { |
+ path := client.tagCachePath() |
+ if client.tagCache == nil || path == "" || !client.tagCache.Dirty() { |
+ client.tagCache = nil |
+ return |
+ } |
+ // It's tiny in size (and protobuf can't serialize to io.Reader anyway). Dump |
+ // it to disk via FileSystem object to deal with possible concurrent updates, |
+ // missing directories, etc. |
+ fs := local.NewFileSystem(filepath.Dir(path), client.Logger) |
+ out, err := client.tagCache.Save() |
+ if err == nil { |
+ err = fs.EnsureFile(path, out, 0666) |
+ } |
+ if err != nil { |
+ client.Logger.Warningf("cipd: failed to update tag cache - %s", err) |
+ } |
+ client.tagCache = nil |
} |
func (client *clientImpl) FetchACL(packagePath string) ([]PackageACL, error) { |
@@ -372,7 +455,23 @@ func (client *clientImpl) ResolveVersion(packageName, version string) (common.Pi |
if err := common.ValidateInstanceVersion(version); err != nil { |
return common.Pin{}, err |
} |
- return client.remote.resolveVersion(packageName, version) |
+ // Use local cache when resolving tags to avoid round trips to backend when |
+ // calling same 'cipd ensure' command again and again. |
+ isTag := common.ValidateInstanceTag(version) == nil |
+ if isTag { |
+ cached := client.getTagCache().ResolveTag(packageName, version) |
+ if cached.InstanceID != "" { |
+ return cached, nil |
+ } |
+ } |
+ pin, err := client.remote.resolveVersion(packageName, version) |
+ if err != nil { |
+ return pin, err |
+ } |
+ if isTag { |
+ client.getTagCache().AddTag(pin, version) |
+ } |
+ return pin, nil |
} |
func (client *clientImpl) RegisterInstance(instance local.PackageInstance) error { |
@@ -643,6 +742,14 @@ func (client *clientImpl) EnsurePackages(pins []common.Pin) error { |
return ErrEnsurePackagesFailed |
} |
+func (client *clientImpl) Close() { |
+ client.lock.Lock() |
+ defer client.lock.Unlock() |
+ client.closeTagCache() |
+ client.authClient = nil |
+ client.anonClient = nil |
+} |
+ |
//////////////////////////////////////////////////////////////////////////////// |
// Private structs and interfaces. |