Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(300)

Side by Side Diff: server/cmd/logdog_collector/main.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « server/cmd/logdog_archivist/task.go ('k') | server/internal/logdog/archivist/archivist.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package main 5 package main
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "io"
9 "time" 10 "time"
10 11
11 "github.com/luci/luci-go/common/auth" 12 "github.com/luci/luci-go/common/auth"
12 "github.com/luci/luci-go/common/clock" 13 "github.com/luci/luci-go/common/clock"
13 "github.com/luci/luci-go/common/errors" 14 "github.com/luci/luci-go/common/errors"
14 gcps "github.com/luci/luci-go/common/gcloud/pubsub" 15 gcps "github.com/luci/luci-go/common/gcloud/pubsub"
15 log "github.com/luci/luci-go/common/logging" 16 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/parallel" 17 "github.com/luci/luci-go/common/parallel"
17 "github.com/luci/luci-go/server/internal/logdog/collector" 18 "github.com/luci/luci-go/server/internal/logdog/collector"
18 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator" 19 "github.com/luci/luci-go/server/internal/logdog/collector/coordinator"
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
90 log.Fields{ 91 log.Fields{
91 "subscription": sub, 92 "subscription": sub,
92 }.Infof(c, "Successfully validated Pub/Sub subscription.") 93 }.Infof(c, "Successfully validated Pub/Sub subscription.")
93 94
94 st, err := a.IntermediateStorage(c) 95 st, err := a.IntermediateStorage(c)
95 if err != nil { 96 if err != nil {
96 return err 97 return err
97 } 98 }
98 defer st.Close() 99 defer st.Close()
99 100
100 // Application shutdown will now operate by cancelling the Collector's
101 // shutdown Context.
102 shutdownCtx, shutdownFunc := context.WithCancel(c)
103 a.SetShutdownFunc(shutdownFunc)
104
105 // Initialize our Collector service object using a caching Coordinator 101 // Initialize our Collector service object using a caching Coordinator
106 // interface. 102 // interface.
107 coord := coordinator.NewCoordinator(a.Coordinator()) 103 coord := coordinator.NewCoordinator(a.Coordinator())
108 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.State CacheExpiration.Duration()) 104 coord = coordinator.NewCache(coord, int(ccfg.StateCacheSize), ccfg.State CacheExpiration.Duration())
109 105
110 coll := collector.Collector{ 106 coll := collector.Collector{
111 Coordinator: coord, 107 Coordinator: coord,
112 Storage: st, 108 Storage: st,
113 MaxMessageWorkers: int(ccfg.MaxMessageWorkers), 109 MaxMessageWorkers: int(ccfg.MaxMessageWorkers),
114 } 110 }
115 defer coll.Close() 111 defer coll.Close()
116 112
117 // Execute our main subscription pull loop. It will run until the suppli ed 113 // Execute our main subscription pull loop. It will run until the suppli ed
118 // Context is cancelled. 114 // Context is cancelled.
119 psIterator, err := psSub.Pull(c) 115 psIterator, err := psSub.Pull(c)
120 if err != nil { 116 if err != nil {
121 log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator. ") 117 log.WithError(err).Errorf(c, "Failed to create Pub/Sub iterator. ")
122 return err 118 return err
123 } 119 }
124 » defer func() { 120 » defer psIterator.Stop()
125 » » log.Debugf(c, "Waiting for Pub/Sub subscription iterator to stop ...") 121
126 » » psIterator.Stop() 122 » // Application shutdown will now operate by cancelling the Collector's
127 » » log.Debugf(c, "Pub/Sub subscription iterator has stopped.") 123 » // shutdown Context.
128 » }() 124 » a.SetShutdownFunc(psIterator.Stop)
129 125
130 parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC chan<- func() error) { 126 parallel.Ignore(parallel.Run(int(ccfg.MaxConcurrentMessages), func(taskC chan<- func() error) {
131 // Loop until shut down. 127 // Loop until shut down.
132 » » for shutdownCtx.Err() == nil { 128 » » for {
133 msg, err := psIterator.Next() 129 msg, err := psIterator.Next()
134 » » » if err != nil { 130 » » » switch err {
131 » » » case nil:
132 » » » » taskC <- func() error {
133 » » » » » c := log.SetField(c, "messageID", msg.ID )
134 » » » » » msg.Done(a.processMessage(c, &coll, msg) )
135 » » » » » return nil
136 » » » » }
137
138 » » » case io.EOF, context.Canceled, context.DeadlineExceeded:
139 » » » » return
140
141 » » » default:
135 log.Fields{ 142 log.Fields{
136 log.ErrorKey: err, 143 log.ErrorKey: err,
137 "delay": pubsubPullErrorDelay, 144 "delay": pubsubPullErrorDelay,
138 }.Errorf(c, "Failed to fetch Pub/Sub message, re try after delay...") 145 }.Errorf(c, "Failed to fetch Pub/Sub message, re try after delay...")
139 clock.Sleep(c, pubsubPullErrorDelay) 146 clock.Sleep(c, pubsubPullErrorDelay)
140 continue
141 }
142
143 taskC <- func() error {
144 c := log.SetField(c, "messageID", msg.ID)
145 msg.Done(a.processMessage(c, &coll, msg))
146 return nil
147 } 147 }
148 } 148 }
149 })) 149 }))
150 150
151 log.Debugf(c, "Collector finished.") 151 log.Debugf(c, "Collector finished.")
152 return nil 152 return nil
153 } 153 }
154 154
155 // processMessage returns true if the message should be ACK'd (deleted from 155 // processMessage returns true if the message should be ACK'd (deleted from
156 // Pub/Sub) or false if the message should not be ACK'd. 156 // Pub/Sub) or false if the message should not be ACK'd.
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
190 }.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKi ng.") 190 }.Errorf(c, "Non-transient error ingesting Pub/Sub message; ACKi ng.")
191 return true 191 return true
192 } 192 }
193 } 193 }
194 194
195 // Entry point. 195 // Entry point.
196 func main() { 196 func main() {
197 a := application{} 197 a := application{}
198 a.Run(context.Background(), a.runCollector) 198 a.Run(context.Background(), a.runCollector)
199 } 199 }
OLDNEW
« no previous file with comments | « server/cmd/logdog_archivist/task.go ('k') | server/internal/logdog/archivist/archivist.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698