| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package backend | 5 package backend |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "net/http" | 9 "net/http" |
| 10 "net/http/httptest" | 10 "net/http/httptest" |
| 11 "testing" | 11 "testing" |
| 12 "time" | 12 "time" |
| 13 | 13 |
| 14 "github.com/julienschmidt/httprouter" | 14 "github.com/julienschmidt/httprouter" |
| 15 "github.com/luci/gae/filter/featureBreaker" | |
| 16 ds "github.com/luci/gae/service/datastore" | 15 ds "github.com/luci/gae/service/datastore" |
| 17 tq "github.com/luci/gae/service/taskqueue" | |
| 18 "github.com/luci/luci-go/appengine/gaetesting" | 16 "github.com/luci/luci-go/appengine/gaetesting" |
| 19 "github.com/luci/luci-go/appengine/logdog/coordinator" | 17 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 20 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" | 18 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" |
| 21 "github.com/luci/luci-go/common/clock" | 19 "github.com/luci/luci-go/common/clock" |
| 22 "github.com/luci/luci-go/common/clock/testclock" | 20 "github.com/luci/luci-go/common/clock/testclock" |
| 23 "github.com/luci/luci-go/common/errors" | 21 "github.com/luci/luci-go/common/errors" |
| 24 "github.com/luci/luci-go/common/proto/google" | 22 "github.com/luci/luci-go/common/proto/google" |
| 25 "github.com/luci/luci-go/common/proto/logdog/svcconfig" | |
| 26 | 23 |
| 27 . "github.com/smartystreets/goconvey/convey" | 24 . "github.com/smartystreets/goconvey/convey" |
| 28 ) | 25 ) |
| 29 | 26 |
| 30 func TestHandleArchiveCron(t *testing.T) { | 27 func TestHandleArchiveCron(t *testing.T) { |
| 31 t.Parallel() | 28 t.Parallel() |
| 32 | 29 |
| 33 Convey(`A testing environment`, t, func() { | 30 Convey(`A testing environment`, t, func() { |
| 34 c := gaetesting.TestingContext() | 31 c := gaetesting.TestingContext() |
| 35 c, _ = testclock.UseTime(c, testclock.TestTimeUTC) | 32 c, _ = testclock.UseTime(c, testclock.TestTimeUTC) |
| 36 | 33 |
| 37 // Add the archival index from "index.yaml". | 34 // Add the archival index from "index.yaml". |
| 38 ds.Get(c).Testable().AddIndexes( | 35 ds.Get(c).Testable().AddIndexes( |
| 39 &ds.IndexDefinition{ | 36 &ds.IndexDefinition{ |
| 40 Kind: "LogStream", | 37 Kind: "LogStream", |
| 41 SortBy: []ds.IndexColumn{ | 38 SortBy: []ds.IndexColumn{ |
| 42 {Property: "State"}, | 39 {Property: "State"}, |
| 43 » » » » » {Property: "Updated"}, | 40 » » » » » {Property: "Created", Descending: true}, |
| 44 }, | 41 }, |
| 45 }, | 42 }, |
| 46 ) | 43 ) |
| 47 | 44 |
| 48 // Install a base set of streams. | 45 // Install a base set of streams. |
| 46 // |
| 47 // If the stream was created >= 10min ago, it will be candidate
for |
| 48 // archival. |
| 49 now := clock.Now(c) |
| 49 for _, v := range []struct { | 50 for _, v := range []struct { |
| 50 » » » name string | 51 » » » name string |
| 51 » » » d time.Duration | 52 » » » d time.Duration // Time before "now" for this stream
s Created. |
| 52 » » » term bool | 53 » » » state coordinator.LogStreamState |
| 53 }{ | 54 }{ |
| 54 » » » {"foo", 0, true}, // Not candidate for a
rchival (too soon). | 55 » » » {"foo", 0, coordinator.LSStreaming},
// Not candidate for archival (too soon). |
| 55 » » » {"bar", 10 * time.Minute, true}, // Candidate for archi
val. | 56 » » » {"bar", 10 * time.Minute, coordinator.LSStreaming},
// Candidate for archival. |
| 56 » » » {"baz", 10 * time.Minute, false}, // Not candidate for a
rchival (not terminal). | 57 » » » {"baz", 10 * time.Minute, coordinator.LSArchiveTasked},
// Not candidate for archival (archive tasked). |
| 57 » » » {"qux", 24 * time.Hour, false}, // Candidate for non-t
erminal archival. | 58 » » » {"qux", 10 * time.Hour, coordinator.LSArchived},
// Not candidate for archival (archived). |
| 59 » » » {"quux", 10 * time.Hour, coordinator.LSStreaming},
// Candidate for archival. |
| 58 } { | 60 } { |
| 59 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c,
v.name)) | 61 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c,
v.name)) |
| 60 | 62 |
| 61 // The entry was created a week ago. | 63 // The entry was created a week ago. |
| 62 » » » ls.Created = clock.Now(c).Add(-(7 * 24 * time.Hour)) | 64 » » » ls.Created = now.Add(-v.d) |
| 63 » » » ls.Updated = clock.Now(c).Add(-v.d) | 65 » » » ls.State = v.state |
| 64 » » » if v.term { | 66 » » » ls.TerminatedTime = now // Doesn't matter, but required
to Put the stream. |
| 65 » » » » ls.State = coordinator.LSTerminated | 67 » » » ls.ArchivedTime = now // Doesn't matter, but required
to Put the stream. |
| 66 » » » } | 68 » » » if err := ds.Get(c).Put(ls); err != nil { |
| 67 » » » if err := ls.Put(ds.Get(c)); err != nil { | |
| 68 panic(err) | 69 panic(err) |
| 69 } | 70 } |
| 70 } | 71 } |
| 71 ds.Get(c).Testable().CatchupIndexes() | 72 ds.Get(c).Testable().CatchupIndexes() |
| 72 | 73 |
| 73 // This allows us to update our Context in test setup. | 74 // This allows us to update our Context in test setup. |
| 75 tap := ct.ArchivalPublisher{} |
| 76 svcStub := ct.Services{ |
| 77 AP: func() (coordinator.ArchivalPublisher, error) { |
| 78 return &tap, nil |
| 79 }, |
| 80 } |
| 74 b := Backend{ | 81 b := Backend{ |
| 75 » » » multiTaskBatchSize: 5, | 82 » » » ServiceBase: coordinator.ServiceBase{&svcStub}, |
| 76 } | 83 } |
| 77 | 84 |
| 78 tb := testBase{Context: c} | 85 tb := testBase{Context: c} |
| 79 r := httprouter.New() | 86 r := httprouter.New() |
| 80 b.InstallHandlers(r, tb.base) | 87 b.InstallHandlers(r, tb.base) |
| 81 | 88 |
| 82 s := httptest.NewServer(r) | 89 s := httptest.NewServer(r) |
| 83 defer s.Close() | 90 defer s.Close() |
| 84 | 91 |
| 85 » » Convey(`With no configuration loaded`, func() { | 92 » » endpoint := fmt.Sprintf("%s/archive/cron", s.URL) |
| 86 » » » for _, ep := range []string{"terminal", "nonterminal"} { | 93 |
| 87 » » » » Convey(fmt.Sprintf(`A %q endpoint hit will fail.
`, ep), func() { | 94 » » Convey(`With no configuration loaded, the endpoint will fail.`,
func() { |
| 88 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar
chive/cron/%s", s.URL, ep)) | 95 » » » resp, err := http.Get(endpoint) |
| 89 » » » » » So(err, ShouldBeNil) | 96 » » » So(err, ShouldBeNil) |
| 90 » » » » » So(resp.StatusCode, ShouldEqual, http.St
atusInternalServerError) | 97 » » » So(resp.StatusCode, ShouldEqual, http.StatusInternalServ
erError) |
| 91 » » » » }) | 98 » » }) |
| 92 » » » } | 99 |
| 100 » » Convey(`With no task topic configured, the endpoint will fail.`,
func() { |
| 101 » » » svcStub.InitConfig() |
| 102 |
| 103 » » » resp, err := http.Get(endpoint) |
| 104 » » » So(err, ShouldBeNil) |
| 105 » » » So(resp.StatusCode, ShouldEqual, http.StatusInternalServ
erError) |
| 93 }) | 106 }) |
| 94 | 107 |
| 95 Convey(`With a configuration loaded`, func() { | 108 Convey(`With a configuration loaded`, func() { |
| 96 » » » qName := "archive-test-queue" | 109 » » » svcStub.InitConfig() |
| 97 » » » c = ct.UseConfig(c, &svcconfig.Coordinator{ | 110 » » » svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projec
ts/test/topics/archive-test-topic" |
| 98 » » » » ArchiveDelay: google.NewDuration(10 * time.M
inute), | 111 » » » svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay = g
oogle.NewDuration(10 * time.Second) |
| 99 » » » » ArchiveDelayMax: google.NewDuration(24 * time.H
our), | 112 » » » svcStub.ServiceConfig.Coordinator.ArchiveDelayMax = goog
le.NewDuration(10 * time.Minute) |
| 100 » » » » ArchiveTaskQueue: qName, | |
| 101 » » » }) | |
| 102 » » » tb.Context = c | |
| 103 | 113 |
| 104 » » » Convey(`With no task queue configured`, func() { | 114 » » » Convey(`A request to the endpoint will be successful.`,
func() { |
| 105 » » » » for _, ep := range []string{"terminal", "nonterm
inal", "purge"} { | 115 » » » » resp, err := http.Get(endpoint) |
| 106 » » » » » Convey(fmt.Sprintf(`A %q endpoint hit wi
ll fail.`, ep), func() { | 116 » » » » So(err, ShouldBeNil) |
| 107 » » » » » » resp, err := http.Get(fmt.Sprint
f("%s/archive/cron/%s", s.URL, ep)) | 117 » » » » So(resp.StatusCode, ShouldEqual, http.StatusOK) |
| 108 » » » » » » So(err, ShouldBeNil) | 118 |
| 109 » » » » » » So(resp.StatusCode, ShouldEqual,
http.StatusInternalServerError) | 119 » » » » // Candidate tasks should be scheduled. |
| 110 » » » » » }) | 120 » » » » So(tap.StreamNames(), ShouldResemble, []string{"
bar", "quux"}) |
| 111 » » » » } | 121 |
| 122 » » » » // Hit the endpoint again, no additional tasks s
hould be scheduled. |
| 123 » » » » Convey(`A subsequent endpoint hit will not sched
ule any additional tasks.`, func() { |
| 124 » » » » » resp, err = http.Get(endpoint) |
| 125 » » » » » So(err, ShouldBeNil) |
| 126 » » » » » So(resp.StatusCode, ShouldEqual, http.St
atusOK) |
| 127 » » » » » So(tap.StreamNames(), ShouldResemble, []
string{"bar", "quux"}) |
| 128 » » » » }) |
| 112 }) | 129 }) |
| 113 | 130 |
| 114 » » » Convey(`With a task queue configured`, func() { | 131 » » » Convey(`When scheduling multiple tasks`, func() { |
| 115 » » » » tq.Get(c).Testable().CreateQueue(qName) | 132 » » » » b.multiTaskBatchSize = 5 |
| 116 | 133 |
| 117 » » » » Convey(`A terminal endpoint hit will be successf
ul and idempotent.`, func() { | 134 » » » » // Create a lot of archival candidate tasks to s
chedule. |
| 118 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar
chive/cron/terminal", s.URL)) | 135 » » » » // |
| 136 » » » » // Note that since task queue names are returned
sorted, we want to |
| 137 » » » » // name these streams greater than our stock str
eam names. |
| 138 » » » » names := []string{"bar", "quux"} |
| 139 » » » » for i := 0; i < 11; i++ { |
| 140 » » » » » ls := ct.TestLogStream(c, ct.TestLogStre
amDescriptor(c, fmt.Sprintf("stream-%02d", i))) |
| 141 |
| 142 » » » » » ls.Created = now.Add(-(10 * time.Minute)
) |
| 143 » » » » » ls.State = coordinator.LSStreaming |
| 144 » » » » » ls.TerminalIndex = 1337 |
| 145 » » » » » ls.TerminatedTime = now |
| 146 » » » » » if err := ds.Get(c).Put(ls); err != nil
{ |
| 147 » » » » » » panic(err) |
| 148 » » » » » } |
| 149 |
| 150 » » » » » names = append(names, ls.Name) |
| 151 » » » » } |
| 152 » » » » ds.Get(c).Testable().CatchupIndexes() |
| 153 |
| 154 » » » » Convey(`Will schedule all pages properly.`, func
() { |
| 155 » » » » » // Ensure that all of these tasks get ad
ded to the task queue. |
| 156 » » » » » resp, err := http.Get(endpoint) |
| 119 So(err, ShouldBeNil) | 157 So(err, ShouldBeNil) |
| 120 So(resp.StatusCode, ShouldEqual, http.St
atusOK) | 158 So(resp.StatusCode, ShouldEqual, http.St
atusOK) |
| 159 So(tap.StreamNames(), ShouldResemble, na
mes) |
| 121 | 160 |
| 122 » » » » » // Candidate tasks should be scheduled. | 161 » » » » » // Use this opportunity to assert that n
one of the scheduled streams |
| 123 » » » » » tasks := tq.Get(c).Testable().GetSchedul
edTasks()[qName] | 162 » » » » » // have any settle or completion delay. |
| 124 » » » » » So(tasks, shouldHaveTasks, archiveTaskNa
me("testing/+/bar")) | 163 » » » » » for _, at := range tap.Tasks() { |
| 164 » » » » » » So(at.SettleDelay.Duration(), Sh
ouldEqual, 0) |
| 165 » » » » » » So(at.CompletePeriod.Duration(),
ShouldEqual, 0) |
| 166 » » » » » } |
| 125 | 167 |
| 126 » » » » » // Hit the endpoint again, the same task
s should be scheduled. | 168 » » » » » Convey(`Will not schedule additional tas
ks on the next run.`, func() { |
| 127 » » » » » resp, err = http.Get(fmt.Sprintf("%s/arc
hive/cron/terminal", s.URL)) | 169 » » » » » » resp, err := http.Get(endpoint) |
| 128 » » » » » So(err, ShouldBeNil) | 170 » » » » » » So(err, ShouldBeNil) |
| 129 » » » » » So(resp.StatusCode, ShouldEqual, http.St
atusOK) | 171 » » » » » » So(resp.StatusCode, ShouldEqual,
http.StatusOK) |
| 130 | 172 » » » » » » So(tap.StreamNames(), ShouldRese
mble, names) |
| 131 » » » » » tasks2 := tq.Get(c).Testable().GetSchedu
ledTasks()[qName] | 173 » » » » » }) |
| 132 » » » » » So(tasks2, ShouldResemble, tasks) | |
| 133 }) | 174 }) |
| 134 | 175 |
| 135 » » » » Convey(`A non-terminal endpoint hit will be succ
essful and idempotent.`, func() { | 176 » » » » Convey(`Will not schedule tasks if task scheduli
ng fails.`, func() { |
| 136 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar
chive/cron/nonterminal", s.URL)) | 177 » » » » » tap.Err = errors.New("test error") |
| 178 |
| 179 » » » » » // Ensure that all of these tasks get ad
ded to the task queue. |
| 180 » » » » » resp, err := http.Get(endpoint) |
| 137 So(err, ShouldBeNil) | 181 So(err, ShouldBeNil) |
| 138 » » » » » So(resp.StatusCode, ShouldEqual, http.St
atusOK) | 182 » » » » » So(resp.StatusCode, ShouldEqual, http.St
atusInternalServerError) |
| 183 » » » » » So(tap.StreamNames(), ShouldResemble, []
string{}) |
| 139 | 184 |
| 140 » » » » » // Candidate tasks should be scheduled. | 185 » » » » » Convey(`And will schedule streams next r
un.`, func() { |
| 141 » » » » » tasks := tq.Get(c).Testable().GetSchedul
edTasks()[qName] | 186 » » » » » » tap.Err = nil |
| 142 » » » » » So(tasks, shouldHaveTasks, archiveTaskNa
me("testing/+/qux")) | |
| 143 | 187 |
| 144 » » » » » // Hit the endpoint again, the same task
s should be scheduled. | 188 » » » » » » resp, err := http.Get(endpoint) |
| 145 » » » » » resp, err = http.Get(fmt.Sprintf("%s/arc
hive/cron/nonterminal", s.URL)) | |
| 146 » » » » » So(err, ShouldBeNil) | |
| 147 » » » » » So(resp.StatusCode, ShouldEqual, http.St
atusOK) | |
| 148 | |
| 149 » » » » » tasks2 := tq.Get(c).Testable().GetSchedu
ledTasks()[qName] | |
| 150 » » » » » So(tasks2, ShouldResemble, tasks) | |
| 151 » » » » }) | |
| 152 | |
| 153 » » » » Convey(`A terminal endpoint hit followed by a no
n-terminal endpoint hit will be successful.`, func() { | |
| 154 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar
chive/cron/terminal", s.URL)) | |
| 155 » » » » » So(err, ShouldBeNil) | |
| 156 » » » » » So(resp.StatusCode, ShouldEqual, http.St
atusOK) | |
| 157 » » » » » So(tq.Get(c).Testable().GetScheduledTask
s()[qName], shouldHaveTasks, archiveTaskName("testing/+/bar")) | |
| 158 | |
| 159 » » » » » resp, err = http.Get(fmt.Sprintf("%s/arc
hive/cron/nonterminal", s.URL)) | |
| 160 » » » » » So(err, ShouldBeNil) | |
| 161 » » » » » So(resp.StatusCode, ShouldEqual, http.St
atusOK) | |
| 162 » » » » » So(tq.Get(c).Testable().GetScheduledTask
s()[qName], shouldHaveTasks, | |
| 163 » » » » » » archiveTaskName("testing/+/bar")
, archiveTaskName("testing/+/qux")) | |
| 164 » » » » }) | |
| 165 | |
| 166 » » » » Convey(`When scheduling multiple tasks`, func()
{ | |
| 167 » » » » » // Create a lot of archival candidate ta
sks to schedule. | |
| 168 » » » » » var names []string | |
| 169 » » » » » for i := 0; i < 11; i++ { | |
| 170 » » » » » » ls := ct.TestLogStream(c, ct.Tes
tLogStreamDescriptor(c, fmt.Sprintf("stream-%d", i))) | |
| 171 | |
| 172 » » » » » » ls.Created = clock.Now(c).Add(-(
10 * time.Minute)) | |
| 173 » » » » » » ls.Updated = ls.Created | |
| 174 » » » » » » ls.State = coordinator.LSTermina
ted | |
| 175 » » » » » » ls.TerminalIndex = 1337 | |
| 176 » » » » » » if err := ls.Put(ds.Get(c)); err
!= nil { | |
| 177 » » » » » » » panic(err) | |
| 178 » » » » » » } | |
| 179 | |
| 180 » » » » » » names = append(names, ls.Name) | |
| 181 » » » » » } | |
| 182 » » » » » names = append(names, "bar") | |
| 183 » » » » » ds.Get(c).Testable().CatchupIndexes() | |
| 184 | |
| 185 » » » » » Convey(`Will schedule all pages properly
.`, func() { | |
| 186 » » » » » » taskNames := make([]interface{},
len(names)) | |
| 187 » » » » » » for i, n := range names { | |
| 188 » » » » » » » taskNames[i] = archiveTa
skName(fmt.Sprintf("testing/+/%s", n)) | |
| 189 » » » » » » } | |
| 190 | |
| 191 » » » » » » // Ensure that all of these task
s get added to the task queue. | |
| 192 » » » » » » resp, err := http.Get(fmt.Sprint
f("%s/archive/cron/terminal", s.URL)) | |
| 193 So(err, ShouldBeNil) | 189 So(err, ShouldBeNil) |
| 194 So(resp.StatusCode, ShouldEqual,
http.StatusOK) | 190 So(resp.StatusCode, ShouldEqual,
http.StatusOK) |
| 195 » » » » » » So(tq.Get(c).Testable().GetSched
uledTasks()[qName], shouldHaveTasks, taskNames...) | 191 » » » » » » So(tap.StreamNames(), ShouldRese
mble, names) |
| 196 | |
| 197 » » » » » » Convey(`Will be successful when
rescheduling the same tasks.`, func() { | |
| 198 » » » » » » » resp, err := http.Get(fm
t.Sprintf("%s/archive/cron/terminal", s.URL)) | |
| 199 » » » » » » » So(err, ShouldBeNil) | |
| 200 » » » » » » » So(resp.StatusCode, Shou
ldEqual, http.StatusOK) | |
| 201 » » » » » » » So(tq.Get(c).Testable().
GetScheduledTasks()[qName], shouldHaveTasks, taskNames...) | |
| 202 » » » » » » }) | |
| 203 » » » » » }) | |
| 204 | |
| 205 » » » » » Convey(`Will return an error if task sch
eduling fails.`, func() { | |
| 206 » » » » » » c, fb := featureBreaker.FilterTQ
(c, nil) | |
| 207 » » » » » » tb.Context = c | |
| 208 » » » » » » fb.BreakFeatures(errors.New("tes
t error"), "AddMulti") | |
| 209 | |
| 210 » » » » » » // Ensure that all of these task
s get added to the task queue. | |
| 211 » » » » » » resp, err := http.Get(fmt.Sprint
f("%s/archive/cron/terminal", s.URL)) | |
| 212 » » » » » » So(err, ShouldBeNil) | |
| 213 » » » » » » So(resp.StatusCode, ShouldEqual,
http.StatusInternalServerError) | |
| 214 » » » » » }) | |
| 215 | |
| 216 » » » » » Convey(`Will return an error if a single
task scheduling fails.`, func() { | |
| 217 » » » » » » merr := make(errors.MultiError,
len(names)) | |
| 218 » » » » » » merr[0] = errors.New("test error
") | |
| 219 | |
| 220 » » » » » » c, fb := featureBreaker.FilterTQ
(c, nil) | |
| 221 » » » » » » tb.Context = c | |
| 222 » » » » » » fb.BreakFeatures(merr, "AddMulti
") | |
| 223 | |
| 224 » » » » » » // Ensure that all of these task
s get added to the task queue. | |
| 225 » » » » » » resp, err := http.Get(fmt.Sprint
f("%s/archive/cron/terminal", s.URL)) | |
| 226 » » » » » » So(err, ShouldBeNil) | |
| 227 » » » » » » So(resp.StatusCode, ShouldEqual,
http.StatusInternalServerError) | |
| 228 » » » » » }) | |
| 229 | |
| 230 » » » » » Convey(`And a purge endpoint hit will pu
rge the tasks.`, func() { | |
| 231 » » » » » » resp, err := http.Get(fmt.Sprint
f("%s/archive/cron/purge", s.URL)) | |
| 232 » » » » » » So(err, ShouldBeNil) | |
| 233 » » » » » » So(resp.StatusCode, ShouldEqual,
http.StatusOK) | |
| 234 » » » » » » So(tq.Get(c).Testable().GetSched
uledTasks()[qName], shouldHaveTasks) | |
| 235 }) | 192 }) |
| 236 }) | 193 }) |
| 237 }) | 194 }) |
| 238 }) | 195 }) |
| 239 }) | 196 }) |
| 240 } | 197 } |
| OLD | NEW |