| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package main |
| 6 |
| 7 import ( |
| 8 "time" |
| 9 |
| 10 "github.com/golang/protobuf/proto" |
| 11 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 12 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 13 log "github.com/luci/luci-go/common/logging" |
| 14 "github.com/luci/luci-go/common/retry" |
| 15 "golang.org/x/net/context" |
| 16 gcps "google.golang.org/cloud/pubsub" |
| 17 ) |
| 18 |
| 19 // pubsubArchiveTask implements the archivist.Task interface for a ArchiveTask |
| 20 // Pub/Sub message. |
| 21 type pubSubArchivistTask struct { |
| 22 // Context is a cloud package authenticated Context that can be used for
raw |
| 23 // Pub/Sub interaction. This is necessary because ModifyAckDeadline is n
ot |
| 24 // available to the "new API" Client. |
| 25 context.Context |
| 26 |
| 27 // subscriptionName is the name of the subscription that this task was p
ulled |
| 28 // from. This is NOT the full subscription path. |
| 29 subscriptionName string |
| 30 // msg is the message that this task is bound to. |
| 31 msg *gcps.Message |
| 32 |
| 33 // at is the unmarshalled ArchiveTask from msg. |
| 34 at logdog.ArchiveTask |
| 35 } |
| 36 |
| 37 func makePubSubArchivistTask(c context.Context, s string, msg *gcps.Message) (*p
ubSubArchivistTask, error) { |
| 38 // If we can't decode the archival task, we can't decide whether or not
to |
| 39 // delete it, so we will leave it in the queue. |
| 40 t := pubSubArchivistTask{ |
| 41 Context: c, |
| 42 subscriptionName: s, |
| 43 msg: msg, |
| 44 } |
| 45 |
| 46 if err := proto.Unmarshal(msg.Data, &t.at); err != nil { |
| 47 return nil, err |
| 48 } |
| 49 return &t, nil |
| 50 } |
| 51 |
| 52 func (t *pubSubArchivistTask) UniqueID() string { |
| 53 // The Message's AckID is guaranteed to be unique for a single lease. |
| 54 return t.msg.AckID |
| 55 } |
| 56 |
| 57 func (t *pubSubArchivistTask) Task() *logdog.ArchiveTask { |
| 58 return &t.at |
| 59 } |
| 60 |
| 61 func (t *pubSubArchivistTask) AssertLease(c context.Context) error { |
| 62 return retry.Retry(c, retry.Default, func() error { |
| 63 // Call ModifyAckDeadline directly, since we need immediate conf
irmation of |
| 64 // our continued ownership of the ACK. This will change the ACK'
s state |
| 65 // from that expected by the Message Iterator's keepalive system
; however, |
| 66 // since we're extending it to the maximum deadline, worst-case
the |
| 67 // keepalive will underestimate it and aggressively modify it. |
| 68 // |
| 69 // In practice, we tell the keepalive to use the maximum ACK dea
dline too, |
| 70 // so the disconnect will be minor at best. |
| 71 return gcps.ModifyAckDeadline(t, t.subscriptionName, t.msg.AckID
, pubsub.MaxACKDeadline) |
| 72 }, func(err error, d time.Duration) { |
| 73 log.Fields{ |
| 74 log.ErrorKey: err, |
| 75 "delay": d, |
| 76 }.Warningf(c, "Failed to modify ACK deadline. Retrying...") |
| 77 }) |
| 78 } |
| OLD | NEW |