Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(166)

Side by Side Diff: appengine/logdog/coordinator/backend/archiveCron_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package backend 5 package backend
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "net/http" 9 "net/http"
10 "net/http/httptest" 10 "net/http/httptest"
11 "testing" 11 "testing"
12 "time" 12 "time"
13 13
14 "github.com/julienschmidt/httprouter" 14 "github.com/julienschmidt/httprouter"
15 "github.com/luci/gae/filter/featureBreaker"
16 ds "github.com/luci/gae/service/datastore" 15 ds "github.com/luci/gae/service/datastore"
17 tq "github.com/luci/gae/service/taskqueue"
18 "github.com/luci/luci-go/appengine/gaetesting" 16 "github.com/luci/luci-go/appengine/gaetesting"
19 "github.com/luci/luci-go/appengine/logdog/coordinator" 17 "github.com/luci/luci-go/appengine/logdog/coordinator"
20 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest " 18 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest "
21 "github.com/luci/luci-go/common/clock" 19 "github.com/luci/luci-go/common/clock"
22 "github.com/luci/luci-go/common/clock/testclock" 20 "github.com/luci/luci-go/common/clock/testclock"
23 "github.com/luci/luci-go/common/errors" 21 "github.com/luci/luci-go/common/errors"
24 "github.com/luci/luci-go/common/proto/google" 22 "github.com/luci/luci-go/common/proto/google"
25 "github.com/luci/luci-go/common/proto/logdog/svcconfig"
26 23
27 . "github.com/smartystreets/goconvey/convey" 24 . "github.com/smartystreets/goconvey/convey"
28 ) 25 )
29 26
30 func TestHandleArchiveCron(t *testing.T) { 27 func TestHandleArchiveCron(t *testing.T) {
31 t.Parallel() 28 t.Parallel()
32 29
33 Convey(`A testing environment`, t, func() { 30 Convey(`A testing environment`, t, func() {
34 c := gaetesting.TestingContext() 31 c := gaetesting.TestingContext()
35 c, _ = testclock.UseTime(c, testclock.TestTimeUTC) 32 c, _ = testclock.UseTime(c, testclock.TestTimeUTC)
36 33
37 // Add the archival index from "index.yaml". 34 // Add the archival index from "index.yaml".
38 ds.Get(c).Testable().AddIndexes( 35 ds.Get(c).Testable().AddIndexes(
39 &ds.IndexDefinition{ 36 &ds.IndexDefinition{
40 Kind: "LogStream", 37 Kind: "LogStream",
41 SortBy: []ds.IndexColumn{ 38 SortBy: []ds.IndexColumn{
42 {Property: "State"}, 39 {Property: "State"},
43 » » » » » {Property: "Updated"}, 40 » » » » » {Property: "Created", Descending: true},
44 }, 41 },
45 }, 42 },
46 ) 43 )
47 44
48 // Install a base set of streams. 45 // Install a base set of streams.
46 //
47 // If the stream was created >= 10min ago, it will be candidate for
48 // archival.
49 now := clock.Now(c)
49 for _, v := range []struct { 50 for _, v := range []struct {
50 » » » name string 51 » » » name string
51 » » » d time.Duration 52 » » » d time.Duration // Time before "now" for this stream s Created.
52 » » » term bool 53 » » » state coordinator.LogStreamState
53 }{ 54 }{
54 » » » {"foo", 0, true}, // Not candidate for a rchival (too soon). 55 » » » {"foo", 0, coordinator.LSStreaming}, // Not candidate for archival (too soon).
55 » » » {"bar", 10 * time.Minute, true}, // Candidate for archi val. 56 » » » {"bar", 10 * time.Minute, coordinator.LSStreaming}, // Candidate for archival.
56 » » » {"baz", 10 * time.Minute, false}, // Not candidate for a rchival (not terminal). 57 » » » {"baz", 10 * time.Minute, coordinator.LSArchiveTasked}, // Not candidate for archival (archive tasked).
57 » » » {"qux", 24 * time.Hour, false}, // Candidate for non-t erminal archival. 58 » » » {"qux", 10 * time.Hour, coordinator.LSArchived}, // Not candidate for archival (archived).
59 » » » {"quux", 10 * time.Hour, coordinator.LSStreaming}, // Candidate for archival.
58 } { 60 } {
59 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, v.name)) 61 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, v.name))
60 62
61 // The entry was created a week ago. 63 // The entry was created a week ago.
62 » » » ls.Created = clock.Now(c).Add(-(7 * 24 * time.Hour)) 64 » » » ls.Created = now.Add(-v.d)
63 » » » ls.Updated = clock.Now(c).Add(-v.d) 65 » » » ls.State = v.state
64 » » » if v.term { 66 » » » ls.TerminatedTime = now // Doesn't matter, but required to Put the stream.
65 » » » » ls.State = coordinator.LSTerminated 67 » » » ls.ArchivedTime = now // Doesn't matter, but required to Put the stream.
66 » » » } 68 » » » if err := ds.Get(c).Put(ls); err != nil {
67 » » » if err := ls.Put(ds.Get(c)); err != nil {
68 panic(err) 69 panic(err)
69 } 70 }
70 } 71 }
71 ds.Get(c).Testable().CatchupIndexes() 72 ds.Get(c).Testable().CatchupIndexes()
72 73
73 // This allows us to update our Context in test setup. 74 // This allows us to update our Context in test setup.
75 tap := ct.ArchivalPublisher{}
76 svcStub := ct.Services{
77 AP: func() (coordinator.ArchivalPublisher, error) {
78 return &tap, nil
79 },
80 }
74 b := Backend{ 81 b := Backend{
75 » » » multiTaskBatchSize: 5, 82 » » » ServiceBase: coordinator.ServiceBase{&svcStub},
76 } 83 }
77 84
78 tb := testBase{Context: c} 85 tb := testBase{Context: c}
79 r := httprouter.New() 86 r := httprouter.New()
80 b.InstallHandlers(r, tb.base) 87 b.InstallHandlers(r, tb.base)
81 88
82 s := httptest.NewServer(r) 89 s := httptest.NewServer(r)
83 defer s.Close() 90 defer s.Close()
84 91
85 » » Convey(`With no configuration loaded`, func() { 92 » » endpoint := fmt.Sprintf("%s/archive/cron", s.URL)
86 » » » for _, ep := range []string{"terminal", "nonterminal"} { 93
87 » » » » Convey(fmt.Sprintf(`A %q endpoint hit will fail. `, ep), func() { 94 » » Convey(`With no configuration loaded, the endpoint will fail.`, func() {
88 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar chive/cron/%s", s.URL, ep)) 95 » » » resp, err := http.Get(endpoint)
89 » » » » » So(err, ShouldBeNil) 96 » » » So(err, ShouldBeNil)
90 » » » » » So(resp.StatusCode, ShouldEqual, http.St atusInternalServerError) 97 » » » So(resp.StatusCode, ShouldEqual, http.StatusInternalServ erError)
91 » » » » }) 98 » » })
92 » » » } 99
100 » » Convey(`With no task topic configured, the endpoint will fail.`, func() {
101 » » » svcStub.InitConfig()
102
103 » » » resp, err := http.Get(endpoint)
104 » » » So(err, ShouldBeNil)
105 » » » So(resp.StatusCode, ShouldEqual, http.StatusInternalServ erError)
93 }) 106 })
94 107
95 Convey(`With a configuration loaded`, func() { 108 Convey(`With a configuration loaded`, func() {
96 » » » qName := "archive-test-queue" 109 » » » svcStub.InitConfig()
97 » » » c = ct.UseConfig(c, &svcconfig.Coordinator{ 110 » » » svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projec ts/test/topics/archive-test-topic"
98 » » » » ArchiveDelay: google.NewDuration(10 * time.M inute), 111 » » » svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay = g oogle.NewDuration(10 * time.Second)
99 » » » » ArchiveDelayMax: google.NewDuration(24 * time.H our), 112 » » » svcStub.ServiceConfig.Coordinator.ArchiveDelayMax = goog le.NewDuration(10 * time.Minute)
100 » » » » ArchiveTaskQueue: qName,
101 » » » })
102 » » » tb.Context = c
103 113
104 » » » Convey(`With no task queue configured`, func() { 114 » » » Convey(`A request to the endpoint will be successful.`, func() {
105 » » » » for _, ep := range []string{"terminal", "nonterm inal", "purge"} { 115 » » » » resp, err := http.Get(endpoint)
106 » » » » » Convey(fmt.Sprintf(`A %q endpoint hit wi ll fail.`, ep), func() { 116 » » » » So(err, ShouldBeNil)
107 » » » » » » resp, err := http.Get(fmt.Sprint f("%s/archive/cron/%s", s.URL, ep)) 117 » » » » So(resp.StatusCode, ShouldEqual, http.StatusOK)
108 » » » » » » So(err, ShouldBeNil) 118
109 » » » » » » So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError) 119 » » » » // Candidate tasks should be scheduled.
110 » » » » » }) 120 » » » » So(tap.StreamNames(), ShouldResemble, []string{" bar", "quux"})
111 » » » » } 121
122 » » » » // Hit the endpoint again, no additional tasks s hould be scheduled.
123 » » » » Convey(`A subsequent endpoint hit will not sched ule any additional tasks.`, func() {
124 » » » » » resp, err = http.Get(endpoint)
125 » » » » » So(err, ShouldBeNil)
126 » » » » » So(resp.StatusCode, ShouldEqual, http.St atusOK)
127 » » » » » So(tap.StreamNames(), ShouldResemble, [] string{"bar", "quux"})
128 » » » » })
112 }) 129 })
113 130
114 » » » Convey(`With a task queue configured`, func() { 131 » » » Convey(`When scheduling multiple tasks`, func() {
115 » » » » tq.Get(c).Testable().CreateQueue(qName) 132 » » » » b.multiTaskBatchSize = 5
116 133
117 » » » » Convey(`A terminal endpoint hit will be successf ul and idempotent.`, func() { 134 » » » » // Create a lot of archival candidate tasks to s chedule.
118 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar chive/cron/terminal", s.URL)) 135 » » » » //
136 » » » » // Note that since task queue names are returned sorted, we want to
137 » » » » // name these streams greater than our stock str eam names.
138 » » » » names := []string{"bar", "quux"}
139 » » » » for i := 0; i < 11; i++ {
140 » » » » » ls := ct.TestLogStream(c, ct.TestLogStre amDescriptor(c, fmt.Sprintf("stream-%02d", i)))
141
142 » » » » » ls.Created = now.Add(-(10 * time.Minute) )
143 » » » » » ls.State = coordinator.LSStreaming
144 » » » » » ls.TerminalIndex = 1337
145 » » » » » ls.TerminatedTime = now
146 » » » » » if err := ds.Get(c).Put(ls); err != nil {
147 » » » » » » panic(err)
148 » » » » » }
149
150 » » » » » names = append(names, ls.Name)
151 » » » » }
152 » » » » ds.Get(c).Testable().CatchupIndexes()
153
154 » » » » Convey(`Will schedule all pages properly.`, func () {
155 » » » » » // Ensure that all of these tasks get ad ded to the task queue.
156 » » » » » resp, err := http.Get(endpoint)
119 So(err, ShouldBeNil) 157 So(err, ShouldBeNil)
120 So(resp.StatusCode, ShouldEqual, http.St atusOK) 158 So(resp.StatusCode, ShouldEqual, http.St atusOK)
159 So(tap.StreamNames(), ShouldResemble, na mes)
121 160
122 » » » » » // Candidate tasks should be scheduled. 161 » » » » » // Use this opportunity to assert that n one of the scheduled streams
123 » » » » » tasks := tq.Get(c).Testable().GetSchedul edTasks()[qName] 162 » » » » » // have any settle or completion delay.
124 » » » » » So(tasks, shouldHaveTasks, archiveTaskNa me("testing/+/bar")) 163 » » » » » for _, at := range tap.Tasks() {
164 » » » » » » So(at.SettleDelay.Duration(), Sh ouldEqual, 0)
165 » » » » » » So(at.CompletePeriod.Duration(), ShouldEqual, 0)
166 » » » » » }
125 167
126 » » » » » // Hit the endpoint again, the same task s should be scheduled. 168 » » » » » Convey(`Will not schedule additional tas ks on the next run.`, func() {
127 » » » » » resp, err = http.Get(fmt.Sprintf("%s/arc hive/cron/terminal", s.URL)) 169 » » » » » » resp, err := http.Get(endpoint)
128 » » » » » So(err, ShouldBeNil) 170 » » » » » » So(err, ShouldBeNil)
129 » » » » » So(resp.StatusCode, ShouldEqual, http.St atusOK) 171 » » » » » » So(resp.StatusCode, ShouldEqual, http.StatusOK)
130 172 » » » » » » So(tap.StreamNames(), ShouldRese mble, names)
131 » » » » » tasks2 := tq.Get(c).Testable().GetSchedu ledTasks()[qName] 173 » » » » » })
132 » » » » » So(tasks2, ShouldResemble, tasks)
133 }) 174 })
134 175
135 » » » » Convey(`A non-terminal endpoint hit will be succ essful and idempotent.`, func() { 176 » » » » Convey(`Will not schedule tasks if task scheduli ng fails.`, func() {
136 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar chive/cron/nonterminal", s.URL)) 177 » » » » » tap.Err = errors.New("test error")
178
179 » » » » » // Ensure that all of these tasks get ad ded to the task queue.
180 » » » » » resp, err := http.Get(endpoint)
137 So(err, ShouldBeNil) 181 So(err, ShouldBeNil)
138 » » » » » So(resp.StatusCode, ShouldEqual, http.St atusOK) 182 » » » » » So(resp.StatusCode, ShouldEqual, http.St atusInternalServerError)
183 » » » » » So(tap.StreamNames(), ShouldResemble, [] string{})
139 184
140 » » » » » // Candidate tasks should be scheduled. 185 » » » » » Convey(`And will schedule streams next r un.`, func() {
141 » » » » » tasks := tq.Get(c).Testable().GetSchedul edTasks()[qName] 186 » » » » » » tap.Err = nil
142 » » » » » So(tasks, shouldHaveTasks, archiveTaskNa me("testing/+/qux"))
143 187
144 » » » » » // Hit the endpoint again, the same task s should be scheduled. 188 » » » » » » resp, err := http.Get(endpoint)
145 » » » » » resp, err = http.Get(fmt.Sprintf("%s/arc hive/cron/nonterminal", s.URL))
146 » » » » » So(err, ShouldBeNil)
147 » » » » » So(resp.StatusCode, ShouldEqual, http.St atusOK)
148
149 » » » » » tasks2 := tq.Get(c).Testable().GetSchedu ledTasks()[qName]
150 » » » » » So(tasks2, ShouldResemble, tasks)
151 » » » » })
152
153 » » » » Convey(`A terminal endpoint hit followed by a no n-terminal endpoint hit will be successful.`, func() {
154 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar chive/cron/terminal", s.URL))
155 » » » » » So(err, ShouldBeNil)
156 » » » » » So(resp.StatusCode, ShouldEqual, http.St atusOK)
157 » » » » » So(tq.Get(c).Testable().GetScheduledTask s()[qName], shouldHaveTasks, archiveTaskName("testing/+/bar"))
158
159 » » » » » resp, err = http.Get(fmt.Sprintf("%s/arc hive/cron/nonterminal", s.URL))
160 » » » » » So(err, ShouldBeNil)
161 » » » » » So(resp.StatusCode, ShouldEqual, http.St atusOK)
162 » » » » » So(tq.Get(c).Testable().GetScheduledTask s()[qName], shouldHaveTasks,
163 » » » » » » archiveTaskName("testing/+/bar") , archiveTaskName("testing/+/qux"))
164 » » » » })
165
166 » » » » Convey(`When scheduling multiple tasks`, func() {
167 » » » » » // Create a lot of archival candidate ta sks to schedule.
168 » » » » » var names []string
169 » » » » » for i := 0; i < 11; i++ {
170 » » » » » » ls := ct.TestLogStream(c, ct.Tes tLogStreamDescriptor(c, fmt.Sprintf("stream-%d", i)))
171
172 » » » » » » ls.Created = clock.Now(c).Add(-( 10 * time.Minute))
173 » » » » » » ls.Updated = ls.Created
174 » » » » » » ls.State = coordinator.LSTermina ted
175 » » » » » » ls.TerminalIndex = 1337
176 » » » » » » if err := ls.Put(ds.Get(c)); err != nil {
177 » » » » » » » panic(err)
178 » » » » » » }
179
180 » » » » » » names = append(names, ls.Name)
181 » » » » » }
182 » » » » » names = append(names, "bar")
183 » » » » » ds.Get(c).Testable().CatchupIndexes()
184
185 » » » » » Convey(`Will schedule all pages properly .`, func() {
186 » » » » » » taskNames := make([]interface{}, len(names))
187 » » » » » » for i, n := range names {
188 » » » » » » » taskNames[i] = archiveTa skName(fmt.Sprintf("testing/+/%s", n))
189 » » » » » » }
190
191 » » » » » » // Ensure that all of these task s get added to the task queue.
192 » » » » » » resp, err := http.Get(fmt.Sprint f("%s/archive/cron/terminal", s.URL))
193 So(err, ShouldBeNil) 189 So(err, ShouldBeNil)
194 So(resp.StatusCode, ShouldEqual, http.StatusOK) 190 So(resp.StatusCode, ShouldEqual, http.StatusOK)
195 » » » » » » So(tq.Get(c).Testable().GetSched uledTasks()[qName], shouldHaveTasks, taskNames...) 191 » » » » » » So(tap.StreamNames(), ShouldRese mble, names)
196
197 » » » » » » Convey(`Will be successful when rescheduling the same tasks.`, func() {
198 » » » » » » » resp, err := http.Get(fm t.Sprintf("%s/archive/cron/terminal", s.URL))
199 » » » » » » » So(err, ShouldBeNil)
200 » » » » » » » So(resp.StatusCode, Shou ldEqual, http.StatusOK)
201 » » » » » » » So(tq.Get(c).Testable(). GetScheduledTasks()[qName], shouldHaveTasks, taskNames...)
202 » » » » » » })
203 » » » » » })
204
205 » » » » » Convey(`Will return an error if task sch eduling fails.`, func() {
206 » » » » » » c, fb := featureBreaker.FilterTQ (c, nil)
207 » » » » » » tb.Context = c
208 » » » » » » fb.BreakFeatures(errors.New("tes t error"), "AddMulti")
209
210 » » » » » » // Ensure that all of these task s get added to the task queue.
211 » » » » » » resp, err := http.Get(fmt.Sprint f("%s/archive/cron/terminal", s.URL))
212 » » » » » » So(err, ShouldBeNil)
213 » » » » » » So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
214 » » » » » })
215
216 » » » » » Convey(`Will return an error if a single task scheduling fails.`, func() {
217 » » » » » » merr := make(errors.MultiError, len(names))
218 » » » » » » merr[0] = errors.New("test error ")
219
220 » » » » » » c, fb := featureBreaker.FilterTQ (c, nil)
221 » » » » » » tb.Context = c
222 » » » » » » fb.BreakFeatures(merr, "AddMulti ")
223
224 » » » » » » // Ensure that all of these task s get added to the task queue.
225 » » » » » » resp, err := http.Get(fmt.Sprint f("%s/archive/cron/terminal", s.URL))
226 » » » » » » So(err, ShouldBeNil)
227 » » » » » » So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
228 » » » » » })
229
230 » » » » » Convey(`And a purge endpoint hit will pu rge the tasks.`, func() {
231 » » » » » » resp, err := http.Get(fmt.Sprint f("%s/archive/cron/purge", s.URL))
232 » » » » » » So(err, ShouldBeNil)
233 » » » » » » So(resp.StatusCode, ShouldEqual, http.StatusOK)
234 » » » » » » So(tq.Get(c).Testable().GetSched uledTasks()[qName], shouldHaveTasks)
235 }) 192 })
236 }) 193 })
237 }) 194 })
238 }) 195 })
239 }) 196 })
240 } 197 }
OLDNEW
« no previous file with comments | « appengine/logdog/coordinator/backend/archiveCron.go ('k') | appengine/logdog/coordinator/backend/backend.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698