| Index: appengine/logdog/coordinator/endpoints/logs/get_test.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/logs/get_test.go b/appengine/logdog/coordinator/endpoints/logs/get_test.go
|
| index 657f568d945fdb0443dc14f69db3222b3f22e485..3fa8c80b62eba3048ccb7befc2d5ffbd86f573df 100644
|
| --- a/appengine/logdog/coordinator/endpoints/logs/get_test.go
|
| +++ b/appengine/logdog/coordinator/endpoints/logs/get_test.go
|
| @@ -24,7 +24,6 @@ import (
|
| "github.com/luci/luci-go/common/iotools"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| "github.com/luci/luci-go/common/proto/logdog/logpb"
|
| - "github.com/luci/luci-go/common/proto/logdog/svcconfig"
|
| "github.com/luci/luci-go/common/recordio"
|
| "github.com/luci/luci-go/server/auth"
|
| "github.com/luci/luci-go/server/auth/authtest"
|
| @@ -48,32 +47,29 @@ func (s *staticArchiveSource) NextLogEntry() (le *logpb.LogEntry, err error) {
|
| return
|
| }
|
|
|
| -type testGSClient map[string][]byte
|
| +type testGSClient map[gs.Path][]byte
|
|
|
| -func (c testGSClient) key(bucket, relpath string) string {
|
| - return fmt.Sprintf("%s/%s", bucket, relpath)
|
| +func (c testGSClient) put(path gs.Path, d []byte) {
|
| + c[path] = d
|
| }
|
|
|
| -func (c testGSClient) put(bucket, relpath string, d []byte) {
|
| - c[c.key(bucket, relpath)] = d
|
| -}
|
| -
|
| -func (c testGSClient) get(bucket, relpath string) []byte {
|
| - return c[c.key(bucket, relpath)]
|
| +func (c testGSClient) get(path gs.Path) []byte {
|
| + return c[path]
|
| }
|
|
|
| func (c testGSClient) Close() error { return nil }
|
| -func (c testGSClient) NewWriter(string, string) (gs.Writer, error) {
|
| +func (c testGSClient) NewWriter(gs.Path) (gs.Writer, error) {
|
| return nil, errors.New("not implemented")
|
| }
|
| -func (c testGSClient) Delete(string, string) error { return errors.New("not implemented") }
|
| +func (c testGSClient) Rename(gs.Path, gs.Path) error { return errors.New("not implemented") }
|
| +func (c testGSClient) Delete(gs.Path) error { return errors.New("not implemented") }
|
|
|
| -func (c testGSClient) NewReader(bucket, relpath string, o gs.Options) (io.ReadCloser, error) {
|
| +func (c testGSClient) NewReader(path gs.Path, o gs.Options) (io.ReadCloser, error) {
|
| if d, ok := c["error"]; ok {
|
| return nil, errors.New(string(d))
|
| }
|
|
|
| - d, ok := c[c.key(bucket, relpath)]
|
| + d, ok := c[path]
|
| if !ok {
|
| return nil, errors.New("does not exist")
|
| }
|
| @@ -155,14 +151,27 @@ func TestGet(t *testing.T) {
|
| fs := authtest.FakeState{}
|
| c = auth.WithState(c, &fs)
|
|
|
| - c = ct.UseConfig(c, &svcconfig.Coordinator{
|
| - AdminAuthGroup: "test-administrators",
|
| - })
|
| + ms := memoryStorage.Storage{}
|
| + gsc := testGSClient{}
|
| + svcStub := ct.Services{
|
| + IS: func() (storage.Storage, error) {
|
| + return &ms, nil
|
| + },
|
| + GS: func() (gs.Client, error) {
|
| + return gsc, nil
|
| + },
|
| + }
|
| + svcStub.InitConfig()
|
| + svcStub.ServiceConfig.Coordinator.AdminAuthGroup = "test-administrators"
|
| +
|
| + s := Server{
|
| + ServiceBase: coordinator.ServiceBase{&svcStub},
|
| + }
|
|
|
| // Generate our test stream.
|
| desc := ct.TestLogStreamDescriptor(c, "foo/bar")
|
| ls := ct.TestLogStream(c, desc)
|
| - if err := ls.Put(ds.Get(c)); err != nil {
|
| + if err := ds.Get(c).Put(ls); err != nil {
|
| panic(err)
|
| }
|
|
|
| @@ -204,18 +213,6 @@ func TestGet(t *testing.T) {
|
| protobufs[uint64(v)] = d
|
| }
|
|
|
| - // Define and populate our Storage.
|
| - s := Server{}
|
| -
|
| - ms := memoryStorage.Storage{}
|
| - gsc := testGSClient{}
|
| - s.StorageFunc = func(context.Context) (storage.Storage, error) {
|
| - return &ms, nil
|
| - }
|
| - s.GSClientFunc = func(context.Context) (gs.Client, error) {
|
| - return gsc, nil
|
| - }
|
| -
|
| Convey(`Testing Get requests (no logs)`, func() {
|
| req := logdog.GetRequest{
|
| Path: string(ls.Path()),
|
| @@ -280,13 +277,17 @@ func TestGet(t *testing.T) {
|
| panic(err)
|
| }
|
|
|
| - gsc.put("testbucket", "stream", lbuf.Bytes())
|
| - gsc.put("testbucket", "index", ibuf.Bytes())
|
| + now := tc.Now().UTC()
|
| +
|
| + gsc.put("gs://testbucket/stream", lbuf.Bytes())
|
| + gsc.put("gs://testbucket/index", ibuf.Bytes())
|
| ls.State = coordinator.LSArchived
|
| + ls.TerminatedTime = now
|
| + ls.ArchivedTime = now
|
| ls.ArchiveStreamURL = "gs://testbucket/stream"
|
| ls.ArchiveIndexURL = "gs://testbucket/index"
|
| }
|
| - if err := ls.Put(ds.Get(c)); err != nil {
|
| + if err := ds.Get(c).Put(ls); err != nil {
|
| panic(err)
|
| }
|
|
|
| @@ -297,7 +298,7 @@ func TestGet(t *testing.T) {
|
|
|
| Convey(`When the log stream is purged`, func() {
|
| ls.Purged = true
|
| - if err := ls.Put(ds.Get(c)); err != nil {
|
| + if err := ds.Get(c).Put(ls); err != nil {
|
| panic(err)
|
| }
|
|
|
| @@ -404,7 +405,7 @@ func TestGet(t *testing.T) {
|
| })
|
|
|
| Convey(`Will successfully retrieve a stream path hash.`, func() {
|
| - req.Path = ls.HashID()
|
| + req.Path = ls.HashID
|
| resp, err := s.Get(c, &req)
|
| So(err, ShouldBeRPCOK)
|
| So(resp, shouldHaveLogs, 0, 1, 2)
|
| @@ -422,7 +423,7 @@ func TestGet(t *testing.T) {
|
| })
|
|
|
| Convey(`Will return Internal if the protobuf descriptor data is corrupt.`, func() {
|
| - // We can't use "ls.Put" here because it validates the protobuf!
|
| + ls.SetDSValidate(false)
|
| ls.Descriptor = []byte{0x00} // Invalid protobuf, zero tag.
|
| if err := ds.Get(c).Put(ls); err != nil {
|
| panic(err)
|
| @@ -436,7 +437,7 @@ func TestGet(t *testing.T) {
|
| Convey(`Will return Internal if the protobuf log entry data is corrupt.`, func() {
|
| if v {
|
| // Corrupt the archive datastream.
|
| - stream := gsc.get("testbucket", "stream")
|
| + stream := gsc.get("gs://testbucket/stream")
|
| zeroRecords(stream)
|
| } else {
|
| // Add corrupted entry to Storage. Create a new entry here, since
|
| @@ -563,7 +564,7 @@ func TestGet(t *testing.T) {
|
| })
|
|
|
| Convey(`Will successfully retrieve a stream path hash and state.`, func() {
|
| - req.Path = ls.HashID()
|
| + req.Path = ls.HashID
|
| req.State = true
|
|
|
| resp, err := s.Tail(c, &req)
|
|
|