Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(334)

Unified Diff: appengine/logdog/coordinator/archivalPublisher.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/archivalPublisher.go
diff --git a/appengine/logdog/coordinator/archivalPublisher.go b/appengine/logdog/coordinator/archivalPublisher.go
new file mode 100644
index 0000000000000000000000000000000000000000..51c95d396485d010d313ec8926aeff2dc37ae084
--- /dev/null
+++ b/appengine/logdog/coordinator/archivalPublisher.go
@@ -0,0 +1,51 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package coordinator
+
+import (
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
+ log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/retry"
+ "golang.org/x/net/context"
+ gcps "google.golang.org/cloud/pubsub"
+)
+
+// ArchivalPublisher is capable of publishing archival requests.
+type ArchivalPublisher interface {
dnj 2016/04/11 17:20:04 This exists so we can mock it in tests and assert
iannucci 2016/04/19 00:55:28 Since it's just one function, maybe just make this
dnj 2016/04/20 18:15:10 ATM I'm not binding this to Pub/Sub, so some prope
+ // Publish publishes the supplied ArchiveTask.
+ Publish(context.Context, *logdog.ArchiveTask) error
+}
+
+type pubsubArchivalPublisher struct {
+ // topic is the authenticated Pub/Sub topic handle to publish to.
+ topic *gcps.TopicHandle
+}
+
+func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTask) error {
+ d, err := proto.Marshal(t)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to marshal task.")
+ return err
+ }
+
+ // TODO: Route this through some system (e.g., task queue, tumble) that can
+ // impose a dispatch delay for the settle period.
+ msg := gcps.Message{
+ Data: d,
+ }
+
+ return retry.Retry(c, retry.Default, func() error {
+ _, err := p.topic.Publish(c, &msg)
+ return err
+ }, func(err error, d time.Duration) {
+ log.Fields{
+ log.ErrorKey: err,
+ "delay": d,
+ }.Warningf(c, "Failed to publish task. Retrying...")
+ })
+}

Powered by Google App Engine
This is Rietveld 408576698