Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(304)

Side by Side Diff: appengine/logdog/coordinator/archivalPublisher.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package coordinator
6
7 import (
8 "time"
9
10 "github.com/golang/protobuf/proto"
11 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
12 log "github.com/luci/luci-go/common/logging"
13 "github.com/luci/luci-go/common/retry"
14 "golang.org/x/net/context"
15 gcps "google.golang.org/cloud/pubsub"
16 )
17
18 // ArchivalPublisher is capable of publishing archival requests.
19 type ArchivalPublisher interface {
dnj 2016/04/11 17:20:04 This exists so we can mock it in tests and assert
iannucci 2016/04/19 00:55:28 Since it's just one function, maybe just make this
dnj 2016/04/20 18:15:10 ATM I'm not binding this to Pub/Sub, so some prope
20 // Publish publishes the supplied ArchiveTask.
21 Publish(context.Context, *logdog.ArchiveTask) error
22 }
23
24 type pubsubArchivalPublisher struct {
25 // topic is the authenticated Pub/Sub topic handle to publish to.
26 topic *gcps.TopicHandle
27 }
28
29 func (p *pubsubArchivalPublisher) Publish(c context.Context, t *logdog.ArchiveTa sk) error {
30 d, err := proto.Marshal(t)
31 if err != nil {
32 log.WithError(err).Errorf(c, "Failed to marshal task.")
33 return err
34 }
35
36 // TODO: Route this through some system (e.g., task queue, tumble) that can
37 // impose a dispatch delay for the settle period.
38 msg := gcps.Message{
39 Data: d,
40 }
41
42 return retry.Retry(c, retry.Default, func() error {
43 _, err := p.topic.Publish(c, &msg)
44 return err
45 }, func(err error, d time.Duration) {
46 log.Fields{
47 log.ErrorKey: err,
48 "delay": d,
49 }.Warningf(c, "Failed to publish task. Retrying...")
50 })
51 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698