Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 1 addition & 19 deletions lib/backend/etcdbk/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestEtcd(t *testing.T) {
// we can't fiddle with clocks inside the etcd client, so instead of creating
// and returning a fake clock, we wrap the real clock used by the etcd client
// in a FakeClock interface that sleeps instead of instantly advancing.
sleepingClock := blockingFakeClock{bk.clock}
sleepingClock := test.BlockingFakeClock{Clock: bk.clock}

return bk, sleepingClock, nil
}
Expand Down Expand Up @@ -255,21 +255,3 @@ func etcdTestEndpoint() string {
}
return "https://127.0.0.1:2379"
}

// blockingFakeClock embeds a clockwork.Clock and adds Advance and BlockUntil
// methods on top of it. Advance really waits instead of manipulating time.
type blockingFakeClock struct {
	clockwork.Clock
}

// Advance blocks the caller for d by sleeping. It panics if d is negative.
func (c blockingFakeClock) Advance(d time.Duration) {
	if d >= 0 {
		// Adjusting a client-side clock would have no effect on the etcd
		// server, so actually wait for the requested duration to elapse.
		time.Sleep(d)
		return
	}
	panic("Invalid argument, negative duration")
}

// BlockUntil is not supported by this clock; it always panics.
func (c blockingFakeClock) BlockUntil(int) {
	panic("Not implemented")
}
135 changes: 101 additions & 34 deletions lib/backend/firestore/firestorebk.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ const (
timestampDocProperty = "timestamp"
// idDocProperty references the record's internal ID
idDocProperty = "id"
// valueDocProperty references the value of the record
valueDocProperty = "value"
// timeInBetweenIndexCreationStatusChecks
timeInBetweenIndexCreationStatusChecks = time.Second * 10
)
Expand All @@ -209,16 +211,38 @@ func GetName() string {
}

// keep this here to test interface conformance
var _ backend.Backend = &Backend{}
var _ backend.Backend = (*Backend)(nil)

// ownerCredentials supplies the authorization header that the emulator
// requires before it allows access to the batched write API. When the
// header is missing, the emulator rejects the request with:
//
//	rpc error: code = PermissionDenied desc = Batch writes require admin authentication
//
// Background on the emulator behavior:
//   - https://github.com/firebase/firebase-tools/issues/1363
//   - https://github.com/firebase/firebase-tools/issues/3833
type ownerCredentials struct{}

// GetRequestMetadata returns a single static "Authorization" entry and a nil
// error. The context and the string arguments are ignored.
func (ownerCredentials) GetRequestMetadata(context.Context, ...string) (map[string]string, error) {
	md := make(map[string]string, 1)
	md["Authorization"] = "Bearer owner"
	return md, nil
}

// RequireTransportSecurity always reports false.
func (ownerCredentials) RequireTransportSecurity() bool {
	return false
}

// CreateFirestoreClients creates a firestore admin and normal client given the supplied parameters
func CreateFirestoreClients(ctx context.Context, projectID string, endPoint string, credentialsFile string) (*apiv1.FirestoreAdminClient, *firestore.Client, error) {

var args []option.ClientOption

if len(endPoint) != 0 {
args = append(args, option.WithoutAuthentication(), option.WithEndpoint(endPoint), option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
} else if len(credentialsFile) != 0 {
if endPoint != "" {
args = append(args,
option.WithTelemetryDisabled(),
option.WithoutAuthentication(),
option.WithEndpoint(endPoint),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
option.WithGRPCDialOption(grpc.WithPerRPCCredentials(ownerCredentials{})),
)
} else if credentialsFile != "" {
args = append(args, option.WithCredentialsFile(credentialsFile))
}

Expand Down Expand Up @@ -337,14 +361,20 @@ func (b *Backend) Put(ctx context.Context, item backend.Item) (*backend.Lease, e
// Update updates value in the backend
func (b *Backend) Update(ctx context.Context, item backend.Item) (*backend.Lease, error) {
r := newRecord(item, b.clock)
_, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(item.Key)).Get(ctx)
if err != nil {
return nil, ConvertGRPCError(err)

updates := []firestore.Update{
{Path: keyDocProperty, Value: r.Key},
{Path: timestampDocProperty, Value: r.Timestamp},
{Path: expiresDocProperty, Value: r.Expires},
{Path: idDocProperty, Value: r.ID},
{Path: valueDocProperty, Value: r.Value},
}
_, err = b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(item.Key)).Set(ctx, r)

_, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(item.Key)).Update(ctx, updates)
if err != nil {
return nil, ConvertGRPCError(err)
}

return b.newLease(item), nil
}

Expand Down Expand Up @@ -396,8 +426,21 @@ func (b *Backend) GetRange(ctx context.Context, startKey []byte, endKey []byte,
}

if r.isExpired(b.clock.Now()) {
if _, err := docSnap.Ref.Delete(ctx); err != nil {
return nil, ConvertGRPCError(err)
if _, err := docSnap.Ref.Delete(ctx, firestore.LastUpdateTime(docSnap.UpdateTime)); err != nil && status.Code(err) == codes.FailedPrecondition {
Comment thread
espadolini marked this conversation as resolved.
// If the document has been updated, then attempt one additional get to see if the
// resource was updated and is no longer expired.
docSnap, err := b.svc.Collection(b.CollectionName).Doc(docSnap.Ref.ID).Get(ctx)
if err != nil {
return nil, ConvertGRPCError(err)
}
r, err := newRecordFromDoc(docSnap)
if err != nil {
return nil, trace.Wrap(err)
}

if !r.isExpired(b.clock.Now()) {
values = append(values, r.backendItem())
}
Comment on lines +429 to +443
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low-priority nit: this doesn't violate the backend model, but there may be cases where it would result in what I'd call "annoying" behavior:

Say that I have some range R, and the following sequence of operations happens:

  • An initial range request occurs across range R at time T1.
  • A write happens to key R1 at time T2.
  • A write happens to key R2 at time T3.
  • A delete of the old value of R2 is attempted by the original range request, falling back to a read of R2 at time T4.

In this sequence of events, the resulting range contains the value of R2 as set at T3, but omits the value of R1 as set at T2.

This is ultimately fine, and can happen for any number of legitimate reasons: many of our backends don't guarantee that writes to different keys are well-ordered in the event stream, and/or range requests are evaluated sequentially from the "latest" state, so values that appear later in the range may also be "newer".

However, all else being equal, fewer inconsistencies of this nature are probably better, and in this case I think we could have fewer network round-trips, simpler logic, and more consistency by just partially or completely omitting this logic.

We already treat expiry as a lazy/best-effort affair, and given the limitations of our backends we're going to keep doing that at least to some extent. I'd suggest either completely omitting optimistic cleanup, or simply returning the expired value if optimistic cleanup fails for any reason.

Additionally, it might be worth putting a per-request cap on the amount of optimistic cleanup we're willing to do... if a cluster was offline for a while, we might start up to a backend state with 20_000 expired nodes. We don't really want a GetRange call waiting on 20_000 API requests I think. Better to just let those 20_000 nodes get whittled down asynchronously.

Anyhow, like I said... not actually a violation of the backend model, just a potential annoyance, so take it or leave it.

}
// Do not include this document in result.
continue
Expand All @@ -423,7 +466,10 @@ func (b *Backend) Get(ctx context.Context, key []byte) (*backend.Item, error) {
if len(key) == 0 {
return nil, trace.BadParameter("missing parameter key")
}
docSnap, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(key)).Get(ctx)

documentID := b.keyToDocumentID(key)

docSnap, err := b.svc.Collection(b.CollectionName).Doc(documentID).Get(ctx)
if err != nil {
return nil, ConvertGRPCError(err)
}
Expand All @@ -433,8 +479,22 @@ func (b *Backend) Get(ctx context.Context, key []byte) (*backend.Item, error) {
}

if r.isExpired(b.clock.Now()) {
if _, err := docSnap.Ref.Delete(ctx); err != nil {
return nil, trace.Wrap(err)
if _, err := docSnap.Ref.Delete(ctx, firestore.LastUpdateTime(docSnap.UpdateTime)); err != nil && status.Code(err) == codes.FailedPrecondition {
// If the document has been updated, then attempt one additional get to see if the
// resource was updated and is no longer expired.
docSnap, err := b.svc.Collection(b.CollectionName).Doc(documentID).Get(ctx)
if err != nil {
return nil, ConvertGRPCError(err)
}
r, err := newRecordFromDoc(docSnap)
if err != nil {
return nil, trace.Wrap(err)
}

if !r.isExpired(b.clock.Now()) {
bi := r.backendItem()
return &bi, nil
}
}
return nil, trace.NotFound("the supplied key: %q does not exist", string(key))
}
Expand All @@ -443,7 +503,6 @@ func (b *Backend) Get(ctx context.Context, key []byte) (*backend.Item, error) {
return &bi, nil
}

// CompareAndSwap compares and swap values in atomic operation
// CompareAndSwap compares item with existing item
// and replaces it with the replaceWith item
func (b *Backend) CompareAndSwap(ctx context.Context, expected backend.Item, replaceWith backend.Item) (*backend.Lease, error) {
Expand All @@ -457,25 +516,32 @@ func (b *Backend) CompareAndSwap(ctx context.Context, expected backend.Item, rep
return nil, trace.BadParameter("expected and replaceWith keys should match")
}

expectedDocSnap, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(expected.Key)).Get(ctx)
if err != nil {
return nil, trace.CompareFailed("error or object not found, error: %v", ConvertGRPCError(err))
}
ref := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(expected.Key))
err := b.svc.RunTransaction(ctx, func(ctx context.Context, tx *firestore.Transaction) error {
expectedDocSnap, err := tx.Get(ref)
if err != nil {
return trace.CompareFailed("error or object not found, error: %v", ConvertGRPCError(err))
}

existingRecord, err := newRecordFromDoc(expectedDocSnap)
if err != nil {
return nil, trace.Wrap(err)
}
existingRecord, err := newRecordFromDoc(expectedDocSnap)
if err != nil {
return trace.Wrap(err)
}

if !bytes.Equal(existingRecord.Value, expected.Value) {
return nil, trace.CompareFailed("expected item value %v does not match actual item value %v", string(expected.Value), existingRecord.Value)
}
if !bytes.Equal(existingRecord.Value, expected.Value) {
return trace.CompareFailed("expected item value %v does not match actual item value %v", string(expected.Value), existingRecord.Value)
}

if err := tx.Set(ref, newRecord(replaceWith, b.clock)); err != nil {
return ConvertGRPCError(err)
}

r := newRecord(replaceWith, b.clock)
_, err = expectedDocSnap.Ref.Set(ctx, r)
return nil
})
if err != nil {
return nil, ConvertGRPCError(err)
return nil, trace.Wrap(err)
}

return b.newLease(replaceWith), nil
}

Expand Down Expand Up @@ -637,14 +703,15 @@ const driftTolerance = time.Millisecond * 2500

// watchCollection watches a firestore collection for changes and pushes those changes, as events, into the buffer for watchers
func (b *Backend) watchCollection() error {
var snaps *firestore.QuerySnapshotIterator

// Filter any documents that don't have a key. If the collection is shared between
// the cluster state and audit events, this filters out the event documents since they
// have a different schema, and it's a requirement for all resources to have a key.
query := b.svc.Collection(b.CollectionName).Where(keyDocProperty, "!=", "")
if b.LimitWatchQuery {
snaps = b.svc.Collection(b.CollectionName).Query.Where(timestampDocProperty, ">=", b.clock.Now().UTC().Add(-driftTolerance).Unix()).Snapshots(b.clientContext)
} else {
snaps = b.svc.Collection(b.CollectionName).Snapshots(b.clientContext)
query = query.Where(timestampDocProperty, ">=", b.clock.Now().UTC().Add(-driftTolerance).Unix())
}

snaps := query.Snapshots(b.clientContext)
b.buf.SetInit()
defer b.buf.Reset()
defer snaps.Stop()
Expand Down
41 changes: 33 additions & 8 deletions lib/backend/firestore/firestorebk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,27 @@ func firestoreParams() backend.Params {
// Creating the indices on - even an empty - live Firestore collection
// can take 5 minutes, so we re-use the same project and collection
// names for each test.
collection := "tp-cluster-data-test"
projectID := "tp-testproj"
endpoint := ""

if c := os.Getenv("TELEPORT_FIRESTORE_TEST_COLLECTION"); c != "" {
collection = c
}

if p := os.Getenv("TELEPORT_FIRESTORE_TEST_PROJECT"); p != "" {
projectID = p
}

if e := os.Getenv("TELEPORT_FIRESTORE_TEST_ENDPOINT"); e != "" {
endpoint = e
}

return map[string]interface{}{
"collection_name": "tp-cluster-data-test",
"project_id": "tp-testproj",
"endpoint": "localhost:8618",
"purgeExpiredDocumentsPollInterval": time.Second,
"collection_name": collection,
"project_id": projectID,
"endpoint": endpoint,
"purge_expired_documents_poll_interval": 300 * time.Millisecond,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment explaining why 300ms?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably adjust this a bit if needed, but I took the value from a similar setting in the dynamodb tests: https://github.com/gravitational/teleport/blob/master/lib/backend/dynamo/dynamodbbk_test.go#L52

}
}

Expand All @@ -87,7 +102,12 @@ func ensureTestsEnabled(t *testing.T) {
}

func ensureEmulatorRunning(t *testing.T, cfg map[string]interface{}) {
con, err := net.Dial("tcp", cfg["endpoint"].(string))
endpoint, _ := cfg["endpoint"].(string)
if endpoint == "" {
return
Comment thread
espadolini marked this conversation as resolved.
}

con, err := net.Dial("tcp", endpoint)
if err != nil {
t.Skip("Firestore emulator is not running, start it with: gcloud beta emulators firestore start --host-port=localhost:8618")
}
Expand All @@ -114,14 +134,19 @@ func TestFirestoreDB(t *testing.T) {
return nil, nil, test.ErrConcurrentAccessNotSupported
}

clock := clockwork.NewFakeClock()
clock := clockwork.NewRealClock()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😿


// we can't fiddle with clocks inside the firestore client, so instead of creating
// and returning a fake clock, we wrap the real clock used by the client
// in a FakeClock interface that sleeps instead of instantly advancing.
sleepingClock := test.BlockingFakeClock{Clock: clock}

uut, err := New(context.Background(), cfg, Options{Clock: clock})
uut, err := New(context.Background(), cfg, Options{Clock: sleepingClock})
if err != nil {
return nil, nil, trace.Wrap(err)
}

return uut, clock, nil
return uut, sleepingClock, nil
}

test.RunBackendComplianceSuite(t, newBackend)
Expand Down
Loading