Chromium Code Reviews| Index: appengine/logdog/coordinator/archivalPublisher.go |
| diff --git a/appengine/logdog/coordinator/archivalPublisher.go b/appengine/logdog/coordinator/archivalPublisher.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..51c95d396485d010d313ec8926aeff2dc37ae084 |
| --- /dev/null |
| +++ b/appengine/logdog/coordinator/archivalPublisher.go |
| @@ -0,0 +1,51 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package coordinator |
| + |
| +import ( |
| + "time" |
| + |
| + "github.com/golang/protobuf/proto" |
| + "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| + log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/retry" |
| + "golang.org/x/net/context" |
| + gcps "google.golang.org/cloud/pubsub" |
| +) |
| + |
| +// ArchivalPublisher is capable of publishing archival requests. |
| +type ArchivalPublisher interface { |
|
dnj
2016/04/11 17:20:04
This exists so we can mock it in tests and assert
iannucci
2016/04/19 00:55:28
Since it's just one function, maybe just make this
dnj
2016/04/20 18:15:10
ATM I'm not binding this to Pub/Sub, so some prope
|
| + // Publish publishes the supplied ArchiveTask. |
| + Publish(context.Context, *logdog.ArchiveTask) error |
| +} |
| + |
| +type pubsubArchivalPublisher struct { |
| + // topic is the authenticated Pub/Sub topic handle to publish to. |
| + topic *gcps.TopicHandle |
| +} |
| + |
| +func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTask) error { |
| + d, err := proto.Marshal(t) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to marshal task.") |
| + return err |
| + } |
| + |
| + // TODO: Route this through some system (e.g., task queue, tumble) that can |
| + // impose a dispatch delay for the settle period. |
| + msg := gcps.Message{ |
| + Data: d, |
| + } |
| + |
| + return retry.Retry(c, retry.Default, func() error { |
| + _, err := p.topic.Publish(c, &msg) |
| + return err |
| + }, func(err error, d time.Duration) { |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "delay": d, |
| + }.Warningf(c, "Failed to publish task. Retrying...") |
| + }) |
| +} |