diff --git a/lib/backend/etcdbk/etcd_test.go b/lib/backend/etcdbk/etcd_test.go index 79939b4636c50..301761bbb3431 100644 --- a/lib/backend/etcdbk/etcd_test.go +++ b/lib/backend/etcdbk/etcd_test.go @@ -82,7 +82,7 @@ func TestEtcd(t *testing.T) { // we can't fiddle with clocks inside the etcd client, so instead of creating // and returning a fake clock, we wrap the real clock used by the etcd client // in a FakeClock interface that sleeps instead of instantly advancing. - sleepingClock := blockingFakeClock{bk.clock} + sleepingClock := test.BlockingFakeClock{Clock: bk.clock} return bk, sleepingClock, nil } @@ -255,21 +255,3 @@ func etcdTestEndpoint() string { } return "https://127.0.0.1:2379" } - -func (r blockingFakeClock) Advance(d time.Duration) { - if d < 0 { - panic("Invalid argument, negative duration") - } - - // We cannot rewind time for etcd since it will not have any effect on the server - // so we actually sleep in this case - time.Sleep(d) -} - -func (r blockingFakeClock) BlockUntil(int) { - panic("Not implemented") -} - -type blockingFakeClock struct { - clockwork.Clock -} diff --git a/lib/backend/firestore/firestorebk.go b/lib/backend/firestore/firestorebk.go index 1d2859ecfb9c7..f9b87b65229d7 100644 --- a/lib/backend/firestore/firestorebk.go +++ b/lib/backend/firestore/firestorebk.go @@ -198,6 +198,8 @@ const ( timestampDocProperty = "timestamp" // idDocProperty references the record's internal ID idDocProperty = "id" + // valueDocProperty references the value of the record + valueDocProperty = "value" // timeInBetweenIndexCreationStatusChecks timeInBetweenIndexCreationStatusChecks = time.Second * 10 // commitLimit is the maximum number of writes per commit @@ -211,16 +213,38 @@ func GetName() string { } // keep this here to test interface conformance -var _ backend.Backend = &Backend{} +var _ backend.Backend = (*Backend)(nil) + +// ownerCredentials adds the needed authorization headers when +// interacting with the emulator to allow access to the +// batched write api. Without the header, the emulator returns +// the following error: +// rpc error: code = PermissionDenied desc = Batch writes require admin authentication +// +// See the following issues for more details: +// https://github.com/firebase/firebase-tools/issues/1363 +// https://github.com/firebase/firebase-tools/issues/3833 +type ownerCredentials struct{} + +func (t ownerCredentials) GetRequestMetadata(context.Context, ...string) (map[string]string, error) { + return map[string]string{"Authorization": "Bearer owner"}, nil +} + +func (t ownerCredentials) RequireTransportSecurity() bool { return false } // CreateFirestoreClients creates a firestore admin and normal client given the supplied parameters func CreateFirestoreClients(ctx context.Context, projectID string, endPoint string, credentialsFile string) (*apiv1.FirestoreAdminClient, *firestore.Client, error) { - var args []option.ClientOption - if len(endPoint) != 0 { - args = append(args, option.WithoutAuthentication(), option.WithEndpoint(endPoint), option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials()))) - } else if len(credentialsFile) != 0 { + if endPoint != "" { + args = append(args, + option.WithTelemetryDisabled(), + option.WithoutAuthentication(), + option.WithEndpoint(endPoint), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + option.WithGRPCDialOption(grpc.WithPerRPCCredentials(ownerCredentials{})), + ) + } else if credentialsFile != "" { args = append(args, option.WithCredentialsFile(credentialsFile)) } @@ -339,14 +363,20 @@ func (b *Backend) Put(ctx context.Context, item backend.Item) (*backend.Lease, e // Update updates value in the backend func (b *Backend) Update(ctx context.Context, item backend.Item) (*backend.Lease, error) { r := newRecord(item, b.clock) - _, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(item.Key)).Get(ctx) - if err != nil { - return nil, ConvertGRPCError(err) + + updates := []firestore.Update{ + {Path: keyDocProperty, Value: r.Key}, + {Path: timestampDocProperty, Value: r.Timestamp}, + {Path: expiresDocProperty, Value: r.Expires}, + {Path: idDocProperty, Value: r.ID}, + {Path: valueDocProperty, Value: r.Value}, } - _, err = b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(item.Key)).Set(ctx, r) + + _, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(item.Key)).Update(ctx, updates) if err != nil { return nil, ConvertGRPCError(err) } + return b.newLease(item), nil } @@ -398,8 +428,21 @@ func (b *Backend) GetRange(ctx context.Context, startKey []byte, endKey []byte, } if r.isExpired(b.clock.Now()) { - if _, err := docSnap.Ref.Delete(ctx); err != nil { - return nil, ConvertGRPCError(err) + if _, err := docSnap.Ref.Delete(ctx, firestore.LastUpdateTime(docSnap.UpdateTime)); err != nil && status.Code(err) == codes.FailedPrecondition { + // If the document has been updated, then attempt one additional get to see if the + // resource was updated and is no longer expired. + docSnap, err := b.svc.Collection(b.CollectionName).Doc(docSnap.Ref.ID).Get(ctx) + if err != nil { + return nil, ConvertGRPCError(err) + } + r, err := newRecordFromDoc(docSnap) + if err != nil { + return nil, trace.Wrap(err) + } + + if !r.isExpired(b.clock.Now()) { + values = append(values, r.backendItem()) + } } // Do not include this document in result. continue @@ -425,7 +468,10 @@ func (b *Backend) Get(ctx context.Context, key []byte) (*backend.Item, error) { if len(key) == 0 { return nil, trace.BadParameter("missing parameter key") } - docSnap, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(key)).Get(ctx) + + documentID := b.keyToDocumentID(key) + + docSnap, err := b.svc.Collection(b.CollectionName).Doc(documentID).Get(ctx) if err != nil { return nil, ConvertGRPCError(err) } @@ -435,8 +481,22 @@ func (b *Backend) Get(ctx context.Context, key []byte) (*backend.Item, error) { } if r.isExpired(b.clock.Now()) { - if _, err := docSnap.Ref.Delete(ctx); err != nil { - return nil, trace.Wrap(err) + if _, err := docSnap.Ref.Delete(ctx, firestore.LastUpdateTime(docSnap.UpdateTime)); err != nil && status.Code(err) == codes.FailedPrecondition { + // If the document has been updated, then attempt one additional get to see if the + // resource was updated and is no longer expired. + docSnap, err := b.svc.Collection(b.CollectionName).Doc(documentID).Get(ctx) + if err != nil { + return nil, ConvertGRPCError(err) + } + r, err := newRecordFromDoc(docSnap) + if err != nil { + return nil, trace.Wrap(err) + } + + if !r.isExpired(b.clock.Now()) { + bi := r.backendItem() + return &bi, nil + } } return nil, trace.NotFound("the supplied key: %q does not exist", string(key)) } @@ -445,7 +505,6 @@ func (b *Backend) Get(ctx context.Context, key []byte) (*backend.Item, error) { return &bi, nil } -// CompareAndSwap compares and swap values in atomic operation // CompareAndSwap compares item with existing item // and replaces is with replaceWith item func (b *Backend) CompareAndSwap(ctx context.Context, expected backend.Item, replaceWith backend.Item) (*backend.Lease, error) { @@ -459,25 +518,32 @@ func (b *Backend) CompareAndSwap(ctx context.Context, expected backend.Item, rep return nil, trace.BadParameter("expected and replaceWith keys should match") } - expectedDocSnap, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(expected.Key)).Get(ctx) - if err != nil { - return nil, trace.CompareFailed("error or object not found, error: %v", ConvertGRPCError(err)) - } + ref := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(expected.Key)) + err := b.svc.RunTransaction(ctx, func(ctx context.Context, tx *firestore.Transaction) error { + expectedDocSnap, err := tx.Get(ref) + if err != nil { + return trace.CompareFailed("error or object not found, error: %v", ConvertGRPCError(err)) + } - existingRecord, err := newRecordFromDoc(expectedDocSnap) - if err != nil { - return nil, trace.Wrap(err) - } + existingRecord, err := newRecordFromDoc(expectedDocSnap) + if err != nil { + return trace.Wrap(err) + } - if !bytes.Equal(existingRecord.Value, expected.Value) { - return nil, trace.CompareFailed("expected item value %v does not match actual item value %v", string(expected.Value), existingRecord.Value) - } + if !bytes.Equal(existingRecord.Value, expected.Value) { + return trace.CompareFailed("expected item value %v does not match actual item value %v", string(expected.Value), existingRecord.Value) + } + + if err := tx.Set(ref, newRecord(replaceWith, b.clock)); err != nil { + return ConvertGRPCError(err) + } - r := newRecord(replaceWith, b.clock) - _, err = expectedDocSnap.Ref.Set(ctx, r) + return nil + }) if err != nil { - return nil, ConvertGRPCError(err) + return nil, trace.Wrap(err) } + return b.newLease(replaceWith), nil } @@ -639,14 +705,15 @@ const driftTolerance = time.Millisecond * 2500 // watchCollection watches a firestore collection for changes and pushes those changes, events into the buffer for watchers func (b *Backend) watchCollection() error { - var snaps *firestore.QuerySnapshotIterator - + // Filter any documents that don't have a key. If the collection is shared between + // the cluster state and audit events, this filters out the event documents since they + // have a different schema, and it's a requirement for all resources to have a key. + query := b.svc.Collection(b.CollectionName).Where(keyDocProperty, "!=", "") if b.LimitWatchQuery { - snaps = b.svc.Collection(b.CollectionName).Query.Where(timestampDocProperty, ">=", b.clock.Now().UTC().Add(-driftTolerance).Unix()).Snapshots(b.clientContext) - } else { - snaps = b.svc.Collection(b.CollectionName).Snapshots(b.clientContext) + query = query.Where(timestampDocProperty, ">=", b.clock.Now().UTC().Add(-driftTolerance).Unix()) } + snaps := query.Snapshots(b.clientContext) b.buf.SetInit() defer b.buf.Reset() defer snaps.Stop() diff --git a/lib/backend/firestore/firestorebk_test.go b/lib/backend/firestore/firestorebk_test.go index c9a118764214e..1b1c90a1fc637 100644 --- a/lib/backend/firestore/firestorebk_test.go +++ b/lib/backend/firestore/firestorebk_test.go @@ -67,12 +67,27 @@ func firestoreParams() backend.Params { // Creating the indices on - even an empty - live Firestore collection // can take 5 minutes, so we re-use the same project and collection // names for each test. + collection := "tp-cluster-data-test" + projectID := "tp-testproj" + endpoint := "" + + if c := os.Getenv("TELEPORT_FIRESTORE_TEST_COLLECTION"); c != "" { + collection = c + } + + if p := os.Getenv("TELEPORT_FIRESTORE_TEST_PROJECT"); p != "" { + projectID = p + } + + if e := os.Getenv("TELEPORT_FIRESTORE_TEST_ENDPOINT"); e != "" { + endpoint = e + } return map[string]interface{}{ - "collection_name": "tp-cluster-data-test", - "project_id": "tp-testproj", - "endpoint": "localhost:8618", - "purgeExpiredDocumentsPollInterval": time.Second, + "collection_name": collection, + "project_id": projectID, + "endpoint": endpoint, + "purge_expired_documents_poll_interval": 300 * time.Millisecond, } } @@ -84,7 +99,12 @@ func ensureTestsEnabled(t *testing.T) { } func ensureEmulatorRunning(t *testing.T, cfg map[string]interface{}) { - con, err := net.Dial("tcp", cfg["endpoint"].(string)) + endpoint, _ := cfg["endpoint"].(string) + if endpoint == "" { + return + } + + con, err := net.Dial("tcp", endpoint) if err != nil { t.Skip("Firestore emulator is not running, start it with: gcloud beta emulators firestore start --host-port=localhost:8618") } @@ -111,14 +131,19 @@ func TestFirestoreDB(t *testing.T) { return nil, nil, test.ErrConcurrentAccessNotSupported } - clock := clockwork.NewFakeClock() + clock := clockwork.NewRealClock() + + // we can't fiddle with clocks inside the firestore client, so instead of creating + // and returning a fake clock, we wrap the real clock used by the client + // in a FakeClock interface that sleeps instead of instantly advancing. + sleepingClock := test.BlockingFakeClock{Clock: clock} - uut, err := New(context.Background(), cfg, Options{Clock: clock}) + uut, err := New(context.Background(), cfg, Options{Clock: sleepingClock}) if err != nil { return nil, nil, trace.Wrap(err) } - return uut, clock, nil + return uut, sleepingClock, nil } test.RunBackendComplianceSuite(t, newBackend) diff --git a/lib/backend/test/suite.go b/lib/backend/test/suite.go index 6a4ff3111d94c..bd70e00f70a18 100644 --- a/lib/backend/test/suite.go +++ b/lib/backend/test/suite.go @@ -88,7 +88,7 @@ func WithMirrorMode(mirror bool) ConstructionOption { } } -// WithConcurrentBackend() asks the constructor to create a +// WithConcurrentBackend asks the constructor to create a func WithConcurrentBackend(target backend.Backend) ConstructionOption { return func(opts *ConstructionOptions) error { opts.ConcurrentBackend = target @@ -96,6 +96,26 @@ func WithConcurrentBackend(target backend.Backend) ConstructionOption { } } +// BlockingFakeClock simulates a fake clock by +// sleeping instead of advancing an actual fake clock. +// This is required for backend clients which cannot +// time travel via a fake clock. +type BlockingFakeClock struct { + clockwork.Clock +} + +func (r BlockingFakeClock) Advance(d time.Duration) { + if d < 0 { + panic("Invalid argument, negative duration") + } + + time.Sleep(d) +} + +func (r BlockingFakeClock) BlockUntil(int) { + panic("Not implemented") +} + // Constructor describes a function for constructing new instances of a // backend, with various options as required by a given test. Note that // it's the caller's responsibility to close it when the test is finished. @@ -110,57 +130,71 @@ type Constructor func(options ...ConstructionOption) (backend.Backend, clockwork // backend under test. func RunBackendComplianceSuite(t *testing.T, newBackend Constructor) { t.Run("CRUD", func(t *testing.T) { + t.Parallel() testCRUD(t, newBackend) }) t.Run("QueryRange", func(t *testing.T) { + t.Parallel() testQueryRange(t, newBackend) }) t.Run("DeleteRange", func(t *testing.T) { + t.Parallel() testDeleteRange(t, newBackend) }) t.Run("PutRange", func(t *testing.T) { + t.Parallel() testPutRange(t, newBackend) }) t.Run("CompareAndSwap", func(t *testing.T) { + t.Parallel() testCompareAndSwap(t, newBackend) }) t.Run("Expiration", func(t *testing.T) { + t.Parallel() testExpiration(t, newBackend) }) t.Run("KeepAlive", func(t *testing.T) { + t.Parallel() testKeepAlive(t, newBackend) }) t.Run("Events", func(t *testing.T) { + t.Parallel() testEvents(t, newBackend) }) t.Run("WatchersClose", func(t *testing.T) { + t.Parallel() testWatchersClose(t, newBackend) }) t.Run("Locking", func(t *testing.T) { + t.Parallel() testLocking(t, newBackend) }) t.Run("ConcurrentOperations", func(t *testing.T) { + t.Parallel() testConcurrentOperations(t, newBackend) }) t.Run("Mirror", func(t *testing.T) { + t.Parallel() testMirror(t, newBackend) }) t.Run("FetchLimit", func(t *testing.T) { + t.Parallel() testFetchLimit(t, newBackend) }) t.Run("Limit", func(t *testing.T) { + t.Parallel() testLimit(t, newBackend) }) } @@ -168,15 +202,15 @@ func RunBackendComplianceSuite(t *testing.T, newBackend Constructor) { // RequireItems asserts that the supplied `actual` items collection matches // the `expected` collection, in size, ordering and the key/value pairs of // each entry. -func RequireItems(t *testing.T, actual, expected []backend.Item) { +func RequireItems(t *testing.T, expected, actual []backend.Item) { require.Len(t, actual, len(expected)) for i := range expected { - require.Equal(t, actual[i].Key, expected[i].Key) - require.Equal(t, actual[i].Value, expected[i].Value) + require.Equal(t, expected[i].Key, actual[i].Key) + require.Equal(t, expected[i].Value, actual[i].Value) } } -// CRUD tests create read update scenarios +// testCRUD tests create read update scenarios func testCRUD(t *testing.T, newBackend Constructor) { uut, _, err := newBackend() require.NoError(t, err) @@ -201,13 +235,13 @@ func testCRUD(t *testing.T, newBackend Constructor) { // get succeeds out, err := uut.Get(ctx, item.Key) require.NoError(t, err) - require.Equal(t, out.Value, item.Value) + require.Equal(t, item.Value, out.Value) // get range succeeds res, err := uut.GetRange(ctx, item.Key, backend.RangeEnd(item.Key), backend.NoLimit) require.NoError(t, err) require.Len(t, res.Items, 1) - RequireItems(t, res.Items, []backend.Item{item}) + RequireItems(t, []backend.Item{item}, res.Items) // update succeeds updated := backend.Item{Key: prefix("/hello"), Value: []byte("world 2")} @@ -216,7 +250,7 @@ func testCRUD(t *testing.T, newBackend Constructor) { out, err = uut.Get(ctx, item.Key) require.NoError(t, err) - require.Equal(t, out.Value, updated.Value) + require.Equal(t, updated.Value, out.Value) // delete succeeds require.NoError(t, uut.Delete(ctx, item.Key)) @@ -234,7 +268,7 @@ func testCRUD(t *testing.T, newBackend Constructor) { out, err = uut.Get(ctx, item.Key) require.NoError(t, err) - require.Equal(t, out.Value, item.Value) + require.Equal(t, item.Value, out.Value) // put with large key and binary value succeeds. // NB: DynamoDB has a maximum overall key length of 1024 bytes, so @@ -244,18 +278,20 @@ func testCRUD(t *testing.T, newBackend Constructor) { // (485 bytes * 2 (for hex encoding)) + 33 = 1003 // which gives us a little bit of room to spare keyBytes := make([]byte, 485) - rand.Read(keyBytes) + _, err = rand.Read(keyBytes) + require.NoError(t, err) key := hex.EncodeToString(keyBytes) data := make([]byte, 1024) - rand.Read(data) + _, err = rand.Read(data) + require.NoError(t, err) item = backend.Item{Key: prefix(key), Value: data} _, err = uut.Put(ctx, item) require.NoError(t, err) out, err = uut.Get(ctx, item.Key) require.NoError(t, err) - require.Equal(t, out.Value, item.Value) + require.Equal(t, item.Value, out.Value) } func testQueryRange(t *testing.T, newBackend Constructor) { @@ -280,31 +316,31 @@ func testQueryRange(t *testing.T, newBackend Constructor) { // prefix range fetch result, err := uut.GetRange(ctx, prefix("/prefix"), backend.RangeEnd(prefix("/prefix")), backend.NoLimit) require.NoError(t, err) - RequireItems(t, result.Items, []backend.Item{a, b, c1, c2}) + RequireItems(t, []backend.Item{a, b, c1, c2}, result.Items) // sub prefix range fetch result, err = uut.GetRange(ctx, prefix("/prefix/c"), backend.RangeEnd(prefix("/prefix/c")), backend.NoLimit) require.NoError(t, err) - RequireItems(t, result.Items, []backend.Item{c1, c2}) + RequireItems(t, []backend.Item{c1, c2}, result.Items) // range match result, err = uut.GetRange(ctx, prefix("/prefix/c/c1"), backend.RangeEnd(prefix("/prefix/c/cz")), backend.NoLimit) require.NoError(t, err) - RequireItems(t, result.Items, []backend.Item{c1, c2}) + RequireItems(t, []backend.Item{c1, c2}, result.Items) // pagination result, err = uut.GetRange(ctx, prefix("/prefix"), backend.RangeEnd(prefix("/prefix")), 2) require.NoError(t, err) // expect two first records - RequireItems(t, result.Items, []backend.Item{a, b}) + RequireItems(t, []backend.Item{a, b}, result.Items) // fetch next two items result, err = uut.GetRange(ctx, backend.RangeEnd(prefix("/prefix/b")), backend.RangeEnd(prefix("/prefix")), 2) require.NoError(t, err) // expect two last records - RequireItems(t, result.Items, []backend.Item{c1, c2}) + RequireItems(t, []backend.Item{c1, c2}, result.Items) // next fetch is empty result, err = uut.GetRange(ctx, backend.RangeEnd(prefix("/prefix/c/c2")), backend.RangeEnd(prefix("/prefix")), 2) @@ -337,7 +373,7 @@ func testDeleteRange(t *testing.T, newBackend Constructor) { // make sure items with "/prefix/c" are gone result, err := uut.GetRange(ctx, prefix("/prefix"), backend.RangeEnd(prefix("/prefix")), backend.NoLimit) require.NoError(t, err) - RequireItems(t, result.Items, []backend.Item{a, b}) + RequireItems(t, []backend.Item{a, b}, result.Items) } // testPutRange tests scenarios with put range @@ -363,7 +399,7 @@ func testPutRange(t *testing.T, newBackend Constructor) { // prefix range fetch result, err := uut.GetRange(ctx, prefix("/prefix"), backend.RangeEnd(prefix("/prefix")), backend.NoLimit) require.NoError(t, err) - RequireItems(t, result.Items, []backend.Item{a, b}) + RequireItems(t, []backend.Item{a, b}, result.Items) } // testCompareAndSwap tests compare and swap functionality @@ -375,33 +411,72 @@ func testCompareAndSwap(t *testing.T, newBackend Constructor) { prefix := MakePrefix() ctx := context.Background() + key := prefix("one") + // compare and swap on non existing value will fail - _, err = uut.CompareAndSwap(ctx, backend.Item{Key: prefix("one"), Value: []byte("1")}, backend.Item{Key: prefix("one"), Value: []byte("2")}) + _, err = uut.CompareAndSwap(ctx, backend.Item{Key: key, Value: []byte("1")}, backend.Item{Key: key, Value: []byte("2")}) require.True(t, trace.IsCompareFailed(err)) // create value and try again... - _, err = uut.Create(ctx, backend.Item{Key: prefix("one"), Value: []byte("1")}) + _, err = uut.Create(ctx, backend.Item{Key: key, Value: []byte("1")}) require.NoError(t, err) // success CAS! - _, err = uut.CompareAndSwap(ctx, backend.Item{Key: prefix("one"), Value: []byte("1")}, backend.Item{Key: prefix("one"), Value: []byte("2")}) + _, err = uut.CompareAndSwap(ctx, backend.Item{Key: key, Value: []byte("1")}, backend.Item{Key: key, Value: []byte("2")}) require.NoError(t, err) - out, err := uut.Get(ctx, prefix("one")) + out, err := uut.Get(ctx, key) require.NoError(t, err) require.Equal(t, []byte("2"), out.Value) // value has been updated - not '1' any more - _, err = uut.CompareAndSwap(ctx, backend.Item{Key: prefix("one"), Value: []byte("1")}, backend.Item{Key: prefix("one"), Value: []byte("3")}) + _, err = uut.CompareAndSwap(ctx, backend.Item{Key: key, Value: []byte("1")}, backend.Item{Key: key, Value: []byte("3")}) require.True(t, trace.IsCompareFailed(err)) // existing value has not been changed by the failed CAS operation - out, err = uut.Get(ctx, prefix("one")) + out, err = uut.Get(ctx, key) require.NoError(t, err) require.Equal(t, []byte("2"), out.Value) + + for i := 0; i < 10; i++ { + i := i + var wg sync.WaitGroup + wg.Add(1) + errs := make(chan error, 2) + go func(value byte) { + defer wg.Done() + _, err := uut.CompareAndSwap(ctx, backend.Item{Key: key, Value: out.Value}, backend.Item{Key: key, Value: []byte{value}}) + errs <- err + }(byte(i + 10)) + + wg.Add(1) + go func(value byte) { + defer wg.Done() + _, err := uut.CompareAndSwap(ctx, backend.Item{Key: key, Value: out.Value}, backend.Item{Key: key, Value: []byte{value}}) + errs <- err + }(byte(i + 100)) + + // validate that only a single failure occurred + var failed int + for i := 0; i < 2; i++ { + err := <-errs + if err != nil { + failed++ + } + } + require.Equal(t, 1, failed) + + // validate that the value for the key was updated - we + // don't care which CAS above won only that one of them + // succeeded. + item, err := uut.Get(ctx, key) + require.NoError(t, err) + require.NotEqual(t, out.Value, item.Value) + out = item + } } -// Expiration tests scenario with expiring values +// testExpiration tests scenario with expiring values func testExpiration(t *testing.T, newBackend Constructor) { uut, clock, err := newBackend() require.NoError(t, err) @@ -422,7 +497,7 @@ func testExpiration(t *testing.T, newBackend Constructor) { res, err := uut.GetRange(ctx, prefix(""), backend.RangeEnd(prefix("")), backend.NoLimit) require.NoError(t, err) - RequireItems(t, res.Items, []backend.Item{itemA}) + RequireItems(t, []backend.Item{itemA}, res.Items) } // addSeconds adds seconds with a seconds precision @@ -432,7 +507,7 @@ func addSeconds(t time.Time, seconds int64) time.Time { return time.Unix(t.UTC().Unix()+seconds+1, 0) } -// KeepAlive tests keep alive API +// testKeepAlive tests keep alive API func testKeepAlive(t *testing.T, newBackend Constructor) { uut, clock, err := newBackend() require.NoError(t, err) @@ -454,8 +529,8 @@ func testKeepAlive(t *testing.T, newBackend Constructor) { {Type: types.OpInit, Item: backend.Item{}}, }) - // When I create an item that expires in 2 seconds and add it to the DB - expiresAt := addSeconds(clock.Now(), 2) + // When I create an item that expires in 10 seconds and add it to the DB + expiresAt := addSeconds(clock.Now(), 10) item, lease := AddItem(ctx, t, uut, prefix("key"), "val1", expiresAt) events := collectEvents(ctx, t, watcher, 1) @@ -465,7 +540,7 @@ func testKeepAlive(t *testing.T, newBackend Constructor) { // move the current slightly forward, but still *before* the item's // expiry time - clock.Advance(1 * time.Second) + clock.Advance(2 * time.Second) // Move the item's expiration further in the future using a KeepAlive updatedAt := addSeconds(clock.Now(), 60) @@ -507,9 +582,9 @@ func collectEvents(ctx context.Context, t *testing.T, watcher backend.Watcher, c return events } -// Events tests scenarios with event watches +// testEvents tests scenarios with event watches func testEvents(t *testing.T, newBackend Constructor) { - eventTimeout := 2 * time.Second + eventTimeout := 10 * time.Second uut, clock, err := newBackend() require.NoError(t, err) @@ -617,7 +692,7 @@ func testLimit(t *testing.T, newBackend Constructor) { item := &backend.Item{ Key: prefix("/db/database_tail_item"), Value: []byte("data"), - Expires: clock.Now().Add(time.Minute), + Expires: clock.Now().Add(10 * time.Minute), } _, err = uut.Put(ctx, *item) require.NoError(t, err) @@ -625,17 +700,17 @@ func testLimit(t *testing.T, newBackend Constructor) { item := &backend.Item{ Key: prefix(fmt.Sprintf("/db/database%d", i)), Value: []byte("data"), - Expires: clock.Now().Add(time.Second * 10), + Expires: clock.Now().Add(time.Second * 3), } _, err = uut.Put(ctx, *item) require.NoError(t, err) } - clock.Advance(time.Second * 20) + clock.Advance(time.Second * 5) item = &backend.Item{ Key: prefix("/db/database_head_item"), Value: []byte("data"), - Expires: clock.Now().Add(time.Minute), + Expires: clock.Now().Add(10 * time.Minute), } _, err = uut.Put(ctx, *item) require.NoError(t, err) diff --git a/lib/events/firestoreevents/firestoreevents.go b/lib/events/firestoreevents/firestoreevents.go index ed597f6230c5f..bf16d846ef149 100644 --- a/lib/events/firestoreevents/firestoreevents.go +++ b/lib/events/firestoreevents/firestoreevents.go @@ -424,6 +424,7 @@ func (l *Log) searchEventsWithFilter(fromUTC, toUTC time.Time, namespace string, } query := l.svc.Collection(l.CollectionName). + Where(sessionIDDocProperty, "!=", ""). // exclude any non audit event documents in the collection Where(eventNamespaceDocProperty, "==", apidefaults.Namespace). Where(createdAtDocProperty, ">=", fromUTC.Unix()). Where(createdAtDocProperty, "<=", toUTC.Unix())