| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package distributor | 5 package distributor |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "github.com/luci/gae/filter/txnBuf" |
| 8 "github.com/luci/gae/service/datastore" | 9 "github.com/luci/gae/service/datastore" |
| 10 "github.com/luci/luci-go/common/errors" |
| 9 "github.com/luci/luci-go/common/logging" | 11 "github.com/luci/luci-go/common/logging" |
| 10 "github.com/luci/luci-go/dm/appengine/model" | 12 "github.com/luci/luci-go/dm/appengine/model" |
| 11 "github.com/luci/luci-go/tumble" | 13 "github.com/luci/luci-go/tumble" |
| 12 "golang.org/x/net/context" | 14 "golang.org/x/net/context" |
| 13 ) | 15 ) |
| 14 | 16 |
| 15 // NotifyExecution is used to finish an execution. Specifically it allows the | 17 // NotifyExecution is used to finish an execution. Specifically it allows the |
| 16 // appropriate distributor to HandleNotification, and then when that concludes, | 18 // appropriate distributor to HandleNotification, and then when that concludes, |
| 17 // invokes DM's FinishExecution (see mutate.FinishExecution). | 19 // invokes DM's FinishExecution (see mutate.FinishExecution). |
| 18 type NotifyExecution struct { | 20 type NotifyExecution struct { |
| (...skipping 10 matching lines...) Expand all Loading... |
| 29 func (f *NotifyExecution) RollForward(c context.Context) (muts []tumble.Mutation
, err error) { | 31 func (f *NotifyExecution) RollForward(c context.Context) (muts []tumble.Mutation
, err error) { |
| 30 reg := GetRegistry(c) | 32 reg := GetRegistry(c) |
| 31 dist, _, err := reg.MakeDistributor(c, f.CfgName) | 33 dist, _, err := reg.MakeDistributor(c, f.CfgName) |
| 32 if err != nil { | 34 if err != nil { |
| 33 logging.Fields{ | 35 logging.Fields{ |
| 34 logging.ErrorKey: err, | 36 logging.ErrorKey: err, |
| 35 "cfg": f.CfgName, | 37 "cfg": f.CfgName, |
| 36 }.Errorf(c, "Failed to make distributor") | 38 }.Errorf(c, "Failed to make distributor") |
| 37 return | 39 return |
| 38 } | 40 } |
| 39 » rslt, err := dist.HandleNotification(f.Notification) | 41 » dsNoTx := txnBuf.GetNoTxn(c) |
| 42 » q := &model.Quest{ID: f.Notification.ID.Quest} |
| 43 » if err := dsNoTx.Get(q); err != nil { |
| 44 » » return nil, errors.Annotate(err).Reason("getting Quest").Err() |
| 45 » } |
| 46 » rslt, err := dist.HandleNotification(&q.Desc, f.Notification) |
| 40 if err != nil { | 47 if err != nil { |
| 41 // TODO(riannucci): check for transient/non-transient | 48 // TODO(riannucci): check for transient/non-transient |
| 42 logging.Fields{ | 49 logging.Fields{ |
| 43 logging.ErrorKey: err, | 50 logging.ErrorKey: err, |
| 44 "cfg": f.CfgName, | 51 "cfg": f.CfgName, |
| 45 }.Errorf(c, "Failed to handle notification") | 52 }.Errorf(c, "Failed to handle notification") |
| 46 return | 53 return |
| 47 } | 54 } |
| 48 if rslt != nil { | 55 if rslt != nil { |
| 49 return reg.FinishExecution(c, f.Notification.ID, rslt) | 56 return reg.FinishExecution(c, f.Notification.ID, rslt) |
| 50 } | 57 } |
| 51 return | 58 return |
| 52 } | 59 } |
| 53 | 60 |
| 54 func init() { | 61 func init() { |
| 55 tumble.Register((*NotifyExecution)(nil)) | 62 tumble.Register((*NotifyExecution)(nil)) |
| 56 } | 63 } |
| OLD | NEW |