| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package main |
| 6 |
| 7 import ( |
| 8 "time" |
| 9 |
| 10 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 11 log "github.com/luci/luci-go/common/logging" |
| 12 "github.com/luci/luci-go/common/retry" |
| 13 "golang.org/x/net/context" |
| 14 gcps "google.golang.org/cloud/pubsub" |
| 15 ) |
| 16 |
| 17 // pubsubArchiveTask implements the archivist.Task interface for a ArchiveTask |
| 18 // Pub/Sub message. |
| 19 type pubSubArchivistTask struct { |
| 20 // Context is a cloud package authenticated Context that can be used for
raw |
| 21 // Pub/Sub interaction. This is necessary because ModifyAckDeadline is n
ot |
| 22 // available to the "new API" Client. |
| 23 context.Context |
| 24 |
| 25 // subscriptionName is the name of the subscription that this task was p
ulled |
| 26 // from. This is NOT the full subscription path. |
| 27 subscriptionName string |
| 28 // msg is the message that this task is bound to. |
| 29 msg *gcps.Message |
| 30 } |
| 31 |
| 32 func (t *pubSubArchivistTask) UniqueID() string { |
| 33 // The Message's AckID is guaranteed to be unique for a single lease. |
| 34 return t.msg.AckID |
| 35 } |
| 36 |
| 37 func (t *pubSubArchivistTask) Data() []byte { |
| 38 return t.msg.Data |
| 39 } |
| 40 |
| 41 func (t *pubSubArchivistTask) AssertLease(c context.Context) error { |
| 42 return retry.Retry(c, retry.Default, func() error { |
| 43 // Call ModifyAckDeadline directly, since we need immediate conf
irmation of |
| 44 // our continued ownership of the ACK. This will change the ACK'
s state |
| 45 // from that expected by the Message Iterator's keepalive system
; however, |
| 46 // since we're extending it to the maximum deadline, worst-case
the |
| 47 // keepalive will underestimate it and aggressively modify it. |
| 48 // |
| 49 // In practice, we tell the keepalive to use the maximum ACK dea
dline too, |
| 50 // so the disconnect will be minor at best. |
| 51 return gcps.ModifyAckDeadline(t, t.subscriptionName, t.msg.AckID
, pubsub.MaxACKDeadline) |
| 52 }, func(err error, d time.Duration) { |
| 53 log.Fields{ |
| 54 log.ErrorKey: err, |
| 55 "delay": d, |
| 56 }.Warningf(c, "Failed to modify ACK deadline. Retrying...") |
| 57 }) |
| 58 } |
| OLD | NEW |