// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package tumble

import (
	"bytes"
	"fmt"
	"math"
	"net/http"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/julienschmidt/httprouter"
	"github.com/luci/gae/filter/txnBuf"
	"github.com/luci/gae/service/datastore"
	"github.com/luci/gae/service/datastore/serialize"
	"github.com/luci/gae/service/memcache"
	"github.com/luci/luci-go/appengine/memlock"
	"github.com/luci/luci-go/common/clock"
	"github.com/luci/luci-go/common/errors"
	"github.com/luci/luci-go/common/logging"
	"github.com/luci/luci-go/common/parallel"
	"github.com/luci/luci-go/common/stringset"
	"golang.org/x/net/context"
)

// expandedShardBounds returns the boundary of the expandedShard query that
// currently corresponds to this shard number. If shard >= NumShards (the
// currently configured number of shards), this will return low > high.
// Otherwise low < high.
func expandedShardBounds(c context.Context, shard uint64) (low, high int64) {
	cfg := GetConfig(c)

	if shard >= cfg.NumShards {
		logging.Warningf(c, "Invalid shard: %d", shard)
		// return inverted bounds
		return 0, -1
	}

	expandedShardsPerShard := int64(math.MaxUint64 / cfg.NumShards)
	low = math.MinInt64 + (int64(shard) * expandedShardsPerShard)
	if shard == cfg.NumShards-1 {
		high = math.MaxInt64
	} else {
		high = low + expandedShardsPerShard
	}
	return
}
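
// Worked example (illustrative only, not part of the original file): with
// cfg.NumShards == 4, expandedShardsPerShard is int64(math.MaxUint64 / 4), and
//
//	shard 0: low = MinInt64                              high = low + expandedShardsPerShard
//	shard 1: low = MinInt64 + 1*expandedShardsPerShard   high = low + expandedShardsPerShard
//	shard 2: low = MinInt64 + 2*expandedShardsPerShard   high = low + expandedShardsPerShard
//	shard 3: low = MinInt64 + 3*expandedShardsPerShard   high = MaxInt64
//
// The final shard is pinned to MaxInt64 so that integer truncation in the
// division never leaves the top of the range uncovered.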

// dustSettleTimeout is how long ProcessShard pauses between batch iterations
// before querying the shard for more work.
var dustSettleTimeout = 2 * time.Second

// ProcessShardHandler is an http handler suitable for installation into
// an httprouter. It expects `logging` and `luci/gae` services to be installed
// into the context.
//
// ProcessShardHandler verifies that it's being run as a taskqueue task and that
// the following parameters exist and are well-formed:
//   * timestamp: decimal-encoded UNIX/UTC timestamp in seconds.
//   * shard_id: decimal-encoded shard identifier.
//
// ProcessShardHandler then invokes ProcessShard with the parsed parameters.
func ProcessShardHandler(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
	tstampStr := p.ByName("timestamp")
	sidStr := p.ByName("shard_id")

	tstamp, err := strconv.ParseInt(tstampStr, 10, 64)
	if err != nil {
		logging.Errorf(c, "bad timestamp %q", tstampStr)
		rw.WriteHeader(http.StatusNotFound)
		fmt.Fprintf(rw, "bad timestamp")
		return
	}

	sid, err := strconv.ParseUint(sidStr, 10, 64)
	if err != nil {
		logging.Errorf(c, "bad shardID %q", sidStr)
		rw.WriteHeader(http.StatusNotFound)
		fmt.Fprintf(rw, "bad shardID")
		return
	}

	err = ProcessShard(c, time.Unix(tstamp, 0).UTC(), sid)
	if err != nil {
		rw.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(rw, "error: %s", err)
	} else {
		rw.Write([]byte("ok"))
	}
}
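
// The sketch below (illustrative only, not part of the original file) shows
// one way ProcessShardHandler could be mounted on an httprouter. The route
// path and the baseContext helper are hypothetical; any middleware that
// installs the logging and luci/gae services into the context would do:
//
//	router := httprouter.New()
//	router.GET("/internal/tumble/process_shard/:shard_id/at/:timestamp",
//		func(rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
//			ProcessShardHandler(baseContext(r), rw, r, p)
//		})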

// ProcessShard is the tumble backend endpoint. This accepts a shard number
// which is expected to be < GlobalConfig.NumShards.
func ProcessShard(c context.Context, timestamp time.Time, shard uint64) error {
	low, high := expandedShardBounds(c, shard)
	if low > high {
		return nil
	}

	l := logging.Get(logging.SetField(c, "shard", shard))

	cfg := GetConfig(c)

	lockKey := fmt.Sprintf("%s.%d.lock", cfg.Name, shard)
	clientID := fmt.Sprintf("%d_%d", timestamp.Unix(), shard)

	// this last key allows buffered tasks to early exit if some other shard
	// processor has already processed past this task's target timestamp.
	lastKey := fmt.Sprintf("%s.%d.last", cfg.Name, shard)
	mc := memcache.Get(c)
	lastItm, err := mc.Get(lastKey)
	if err != nil {
		if err != memcache.ErrCacheMiss {
			l.Warningf("couldn't obtain last timestamp: %s", err)
		}
	} else {
		val := lastItm.Value()
		last, err := serialize.ReadTime(bytes.NewBuffer(val))
		if err != nil {
			l.Warningf("could not decode timestamp %v: %s", val, err)
		} else {
			last = last.Add(cfg.TemporalRoundFactor)
			if last.After(timestamp) {
				l.Infof("early exit, %s > %s", last, timestamp)
				return nil
			}
		}
	}
	err = nil

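	// Query for the distinct set of mutation roots ("TargetRoot") whose
	// ExpandedShard value falls inside this shard's range; each root found
	// below is handed to processRoot as its own batch.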
	q := datastore.NewQuery("tumble.Mutation").
		Gte("ExpandedShard", low).Lte("ExpandedShard", high).
		Project("TargetRoot").Distinct(true).
		Limit(cfg.ProcessMaxBatchSize)

	banSets := map[string]stringset.Set{}

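	// limitSemaphore bounds the number of processRoot goroutines running
	// concurrently to cfg.NumGoroutines.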
	limitSemaphore := make(chan struct{}, cfg.NumGoroutines)

	for try := 0; try < 2; try++ {
		err = memlock.TryWithLock(c, lockKey, clientID, func(c context.Context) error {
			l.Infof("Got lock (try %d)", try)

			for {
				processCounters := []*int64{}
				err := parallel.FanOutIn(func(ch chan<- func() error) {
					err := datastore.Get(c).Run(q, func(pm datastore.PropertyMap, _ datastore.CursorCB) bool {
						root := pm["TargetRoot"][0].Value().(*datastore.Key)
						encRoot := root.Encode()

						// TODO(riannucci): make banSets remove keys from the banSet which
						// weren't hit. Once they stop showing up, they'll never show up
						// again.

						bs := banSets[encRoot]
						if bs == nil {
							bs = stringset.New(0)
							banSets[encRoot] = bs
						}
						counter := new(int64)
						processCounters = append(processCounters, counter)

						ch <- func() error {
							limitSemaphore <- struct{}{}
							defer func() {
								<-limitSemaphore
							}()
							return processRoot(c, root, bs, counter)
						}

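						// Stop the query early (return false) if the
						// memlock-scoped context has been cancelled, which
						// means this process no longer holds the shard lock.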
						select {
						case <-c.Done():
							l.Warningf("Lost lock!")
							return false
						default:
							return true
						}
					})
					if err != nil {
						l.Errorf("Failure to query: %s", err)
						ch <- func() error {
							return err
						}
					}
				})
				if err != nil {
					return err
				}
				numProcessed := int64(0)
				for _, n := range processCounters {
					numProcessed += *n
				}
				l.Infof("cumulatively processed %d items", numProcessed)
				if numProcessed == 0 {
					break
				}

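				// Record that this shard has now been processed up to the
				// current time, so queued tasks targeting earlier timestamps
				// can early-exit via the lastKey check above.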
				err = mc.Set(mc.NewItem(lastKey).SetValue(serialize.ToBytes(clock.Now(c))))
				if err != nil {
					l.Warningf("could not update last process memcache key %s: %s", lastKey, err)
				}

				clock.Sleep(c, dustSettleTimeout)
			}
			return nil
		})
		if err != memlock.ErrFailedToLock {
			break
		}
		l.Infof("Couldn't obtain lock (try %d) (sleeping 2s)", try+1)
		clock.Sleep(c, time.Second*2)
	}
	if err == memlock.ErrFailedToLock {
		l.Infof("Couldn't obtain lock (giving up): %s", err)
		err = nil
	}
	return err
}

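// getBatchByRoot returns up to cfg.ProcessMaxBatchSize skeleton realMutation
// entries (ID and Parent only) under the given TargetRoot, skipping any whose
// encoded keys already appear in banSet.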
func getBatchByRoot(c context.Context, root *datastore.Key, banSet stringset.Set) ([]*realMutation, error) {
	cfg := GetConfig(c)
	ds := datastore.Get(c)
	q := datastore.NewQuery("tumble.Mutation").Eq("TargetRoot", root)
	toFetch := make([]*realMutation, 0, cfg.ProcessMaxBatchSize)
	err := ds.Run(q, func(k *datastore.Key, _ datastore.CursorCB) bool {
		if !banSet.Has(k.Encode()) {
			toFetch = append(toFetch, &realMutation{
				ID:     k.StringID(),
				Parent: k.Parent(),
			})
		}
		return len(toFetch) < cap(toFetch)
	})
	return toFetch, err
}

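// loadFilteredMutations fetches the full entities for rms, decodes each one
// into its user-supplied Mutation, and returns the decoded Mutations along
// with their datastore keys. Entries that have vanished (ErrNoSuchEntity) or
// that fail to decode are skipped; any other fetch error aborts the load.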
func loadFilteredMutations(c context.Context, rms []*realMutation) ([]*datastore.Key, []Mutation, error) {
	ds := datastore.Get(c)

	mutKeys := make([]*datastore.Key, 0, len(rms))
	muts := make([]Mutation, 0, len(rms))
	err := ds.GetMulti(rms)
	me, ok := err.(errors.MultiError)
	if !ok && err != nil {
		return nil, nil, err
	}

	for i, rm := range rms {
		err = nil
		if me != nil {
			err = me[i]
		}
		if err == nil {
			if rm.Version != getAppVersion(c) {
				logging.Fields{
					"mut_version": rm.Version,
					"cur_version": getAppVersion(c),
				}.Warningf(c, "loading mutation with different code version")
			}
			m, err := rm.GetMutation()
			if err != nil {
				logging.Errorf(c, "couldn't load mutation: %s", err)
				continue
			}
			muts = append(muts, m)
			mutKeys = append(mutKeys, ds.KeyForObj(rm))
		} else if err != datastore.ErrNoSuchEntity {
			return nil, nil, me
		}
	}

	return mutKeys, muts, nil
}

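// processRoot handles one batch of mutations sharing a single TargetRoot: it
// loads them, rolls them forward inside one buffered transaction, fires tasks
// for any shards touched by newly created mutations, stores the number of
// completed mutations in counter, and finally deletes the completed mutation
// entities (recording them in banSet so they are not picked up again).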
func processRoot(c context.Context, root *datastore.Key, banSet stringset.Set, counter *int64) error {
	l := logging.Get(c)

	toFetch, err := getBatchByRoot(c, root, banSet)
	if err != nil || len(toFetch) == 0 {
		return err
	}

	mutKeys, muts, err := loadFilteredMutations(c, toFetch)
	if err != nil {
		return err
	}

	select {
	case <-c.Done():
		l.Warningf("Lost lock during processRoot")
		return nil
	default:
	}

	allShards := map[uint64]struct{}{}

	toDel := make([]*datastore.Key, 0, len(muts))
	numMuts := uint64(0)
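	// Roll the batch's mutations forward in a single buffered transaction,
	// collecting the keys of completed mutations (toDel) and the set of
	// shards that received newly created mutations.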
	err = datastore.Get(txnBuf.FilterRDS(c)).RunInTransaction(func(c context.Context) error {
		toDel = toDel[:0]
		numMuts = 0

		for i, m := range muts {
			shards, numNewMuts, err := enterTransactionInternal(c, root, m.RollForward)
			if err != nil {
				l.Errorf("Executing decoded gob(%T) failed: %q: %+v", m, err, m)
				continue
			}
			toDel = append(toDel, mutKeys[i])
			numMuts += uint64(numNewMuts)
			for shard := range shards {
				allShards[shard] = struct{}{}
			}
		}

		return nil
	}, nil)
	if err != nil {
		l.Errorf("failed running transaction: %s", err)
		return err
	}
	fireTasks(c, allShards)
	l.Infof("successfully processed %d mutations, adding %d more", len(toDel), numMuts)

	if len(toDel) > 0 {
		atomic.StoreInt64(counter, int64(len(toDel)))

		for _, k := range toDel {
			banSet.Add(k.Encode())
		}
		if err := datastore.Get(c).DeleteMulti(toDel); err != nil {
			l.Warningf("error deleting finished mutations: %s", err)
		}
	}

	return nil
}