Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 1 addition & 19 deletions lib/backend/etcdbk/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestEtcd(t *testing.T) {
// we can't fiddle with clocks inside the etcd client, so instead of creating
// and returning a fake clock, we wrap the real clock used by the etcd client
// in a FakeClock interface that sleeps instead of instantly advancing.
sleepingClock := blockingFakeClock{bk.clock}
sleepingClock := test.BlockingFakeClock{Clock: bk.clock}

return bk, sleepingClock, nil
}
Expand Down Expand Up @@ -255,21 +255,3 @@ func etcdTestEndpoint() string {
}
return "https://127.0.0.1:2379"
}

func (r blockingFakeClock) Advance(d time.Duration) {
if d < 0 {
panic("Invalid argument, negative duration")
}

// We cannot rewind time for etcd since it will not have any effect on the server
// so we actually sleep in this case
time.Sleep(d)
}

func (r blockingFakeClock) BlockUntil(int) {
panic("Not implemented")
}

type blockingFakeClock struct {
clockwork.Clock
}
135 changes: 101 additions & 34 deletions lib/backend/firestore/firestorebk.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ const (
timestampDocProperty = "timestamp"
// idDocProperty references the record's internal ID
idDocProperty = "id"
// valueDocProperty references the value of the record
valueDocProperty = "value"
// timeInBetweenIndexCreationStatusChecks
timeInBetweenIndexCreationStatusChecks = time.Second * 10
// commitLimit is the maximum number of writes per commit
Expand All @@ -211,16 +213,38 @@ func GetName() string {
}

// keep this here to test interface conformance
var _ backend.Backend = &Backend{}
var _ backend.Backend = (*Backend)(nil)

// ownerCredentials adds the needed authorization headers when
// interacting with the emulator to allow access to the
// batched write api. Without the header, the emulator returns
// the following error:
// rpc error: code = PermissionDenied desc = Batch writes require admin authentication
//
// See the following issues for more details:
// https://github.com/firebase/firebase-tools/issues/1363
// https://github.com/firebase/firebase-tools/issues/3833
type ownerCredentials struct{}

func (t ownerCredentials) GetRequestMetadata(context.Context, ...string) (map[string]string, error) {
return map[string]string{"Authorization": "Bearer owner"}, nil
}

func (t ownerCredentials) RequireTransportSecurity() bool { return false }

// CreateFirestoreClients creates a firestore admin and normal client given the supplied parameters
func CreateFirestoreClients(ctx context.Context, projectID string, endPoint string, credentialsFile string) (*apiv1.FirestoreAdminClient, *firestore.Client, error) {

var args []option.ClientOption

if len(endPoint) != 0 {
args = append(args, option.WithoutAuthentication(), option.WithEndpoint(endPoint), option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())))
} else if len(credentialsFile) != 0 {
if endPoint != "" {
args = append(args,
option.WithTelemetryDisabled(),
option.WithoutAuthentication(),
option.WithEndpoint(endPoint),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
option.WithGRPCDialOption(grpc.WithPerRPCCredentials(ownerCredentials{})),
)
} else if credentialsFile != "" {
args = append(args, option.WithCredentialsFile(credentialsFile))
}

Expand Down Expand Up @@ -339,14 +363,20 @@ func (b *Backend) Put(ctx context.Context, item backend.Item) (*backend.Lease, e
// Update updates value in the backend
func (b *Backend) Update(ctx context.Context, item backend.Item) (*backend.Lease, error) {
r := newRecord(item, b.clock)
_, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(item.Key)).Get(ctx)
if err != nil {
return nil, ConvertGRPCError(err)

updates := []firestore.Update{
{Path: keyDocProperty, Value: r.Key},
{Path: timestampDocProperty, Value: r.Timestamp},
{Path: expiresDocProperty, Value: r.Expires},
{Path: idDocProperty, Value: r.ID},
{Path: valueDocProperty, Value: r.Value},
}
_, err = b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(item.Key)).Set(ctx, r)

_, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(item.Key)).Update(ctx, updates)
if err != nil {
return nil, ConvertGRPCError(err)
}

return b.newLease(item), nil
}

Expand Down Expand Up @@ -398,8 +428,21 @@ func (b *Backend) GetRange(ctx context.Context, startKey []byte, endKey []byte,
}

if r.isExpired(b.clock.Now()) {
if _, err := docSnap.Ref.Delete(ctx); err != nil {
return nil, ConvertGRPCError(err)
if _, err := docSnap.Ref.Delete(ctx, firestore.LastUpdateTime(docSnap.UpdateTime)); err != nil && status.Code(err) == codes.FailedPrecondition {
// If the document has been updated, then attempt one additional get to see if the
// resource was updated and is no longer expired.
docSnap, err := b.svc.Collection(b.CollectionName).Doc(docSnap.Ref.ID).Get(ctx)
if err != nil {
return nil, ConvertGRPCError(err)
}
r, err := newRecordFromDoc(docSnap)
if err != nil {
return nil, trace.Wrap(err)
}

if !r.isExpired(b.clock.Now()) {
values = append(values, r.backendItem())
}
}
// Do not include this document in result.
continue
Expand All @@ -425,7 +468,10 @@ func (b *Backend) Get(ctx context.Context, key []byte) (*backend.Item, error) {
if len(key) == 0 {
return nil, trace.BadParameter("missing parameter key")
}
docSnap, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(key)).Get(ctx)

documentID := b.keyToDocumentID(key)

docSnap, err := b.svc.Collection(b.CollectionName).Doc(documentID).Get(ctx)
if err != nil {
return nil, ConvertGRPCError(err)
}
Expand All @@ -435,8 +481,22 @@ func (b *Backend) Get(ctx context.Context, key []byte) (*backend.Item, error) {
}

if r.isExpired(b.clock.Now()) {
if _, err := docSnap.Ref.Delete(ctx); err != nil {
return nil, trace.Wrap(err)
if _, err := docSnap.Ref.Delete(ctx, firestore.LastUpdateTime(docSnap.UpdateTime)); err != nil && status.Code(err) == codes.FailedPrecondition {
// If the document has been updated, then attempt one additional get to see if the
// resource was updated and is no longer expired.
docSnap, err := b.svc.Collection(b.CollectionName).Doc(documentID).Get(ctx)
if err != nil {
return nil, ConvertGRPCError(err)
}
r, err := newRecordFromDoc(docSnap)
if err != nil {
return nil, trace.Wrap(err)
}

if !r.isExpired(b.clock.Now()) {
bi := r.backendItem()
return &bi, nil
}
}
return nil, trace.NotFound("the supplied key: %q does not exist", string(key))
}
Expand All @@ -445,7 +505,6 @@ func (b *Backend) Get(ctx context.Context, key []byte) (*backend.Item, error) {
return &bi, nil
}

// CompareAndSwap compares and swap values in atomic operation
// CompareAndSwap compares item with existing item
// and replaces is with replaceWith item
func (b *Backend) CompareAndSwap(ctx context.Context, expected backend.Item, replaceWith backend.Item) (*backend.Lease, error) {
Expand All @@ -459,25 +518,32 @@ func (b *Backend) CompareAndSwap(ctx context.Context, expected backend.Item, rep
return nil, trace.BadParameter("expected and replaceWith keys should match")
}

expectedDocSnap, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(expected.Key)).Get(ctx)
if err != nil {
return nil, trace.CompareFailed("error or object not found, error: %v", ConvertGRPCError(err))
}
ref := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(expected.Key))
err := b.svc.RunTransaction(ctx, func(ctx context.Context, tx *firestore.Transaction) error {
expectedDocSnap, err := tx.Get(ref)
if err != nil {
return trace.CompareFailed("error or object not found, error: %v", ConvertGRPCError(err))
}

existingRecord, err := newRecordFromDoc(expectedDocSnap)
if err != nil {
return nil, trace.Wrap(err)
}
existingRecord, err := newRecordFromDoc(expectedDocSnap)
if err != nil {
return trace.Wrap(err)
}

if !bytes.Equal(existingRecord.Value, expected.Value) {
return nil, trace.CompareFailed("expected item value %v does not match actual item value %v", string(expected.Value), existingRecord.Value)
}
if !bytes.Equal(existingRecord.Value, expected.Value) {
return trace.CompareFailed("expected item value %v does not match actual item value %v", string(expected.Value), existingRecord.Value)
}

if err := tx.Set(ref, newRecord(replaceWith, b.clock)); err != nil {
return ConvertGRPCError(err)
}

r := newRecord(replaceWith, b.clock)
_, err = expectedDocSnap.Ref.Set(ctx, r)
return nil
})
if err != nil {
return nil, ConvertGRPCError(err)
return nil, trace.Wrap(err)
}

return b.newLease(replaceWith), nil
}

Expand Down Expand Up @@ -639,14 +705,15 @@ const driftTolerance = time.Millisecond * 2500

// watchCollection watches a firestore collection for changes and pushes those changes, events into the buffer for watchers
func (b *Backend) watchCollection() error {
var snaps *firestore.QuerySnapshotIterator

// Filter any documents that don't have a key. If the collection is shared between
// the cluster state and audit events, this filters out the event documents since they
// have a different schema, and it's a requirement for all resources to have a key.
query := b.svc.Collection(b.CollectionName).Where(keyDocProperty, "!=", "")
if b.LimitWatchQuery {
snaps = b.svc.Collection(b.CollectionName).Query.Where(timestampDocProperty, ">=", b.clock.Now().UTC().Add(-driftTolerance).Unix()).Snapshots(b.clientContext)
} else {
snaps = b.svc.Collection(b.CollectionName).Snapshots(b.clientContext)
query = query.Where(timestampDocProperty, ">=", b.clock.Now().UTC().Add(-driftTolerance).Unix())
}

snaps := query.Snapshots(b.clientContext)
b.buf.SetInit()
defer b.buf.Reset()
defer snaps.Stop()
Expand Down
41 changes: 33 additions & 8 deletions lib/backend/firestore/firestorebk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,27 @@ func firestoreParams() backend.Params {
// Creating the indices on - even an empty - live Firestore collection
// can take 5 minutes, so we re-use the same project and collection
// names for each test.
collection := "tp-cluster-data-test"
projectID := "tp-testproj"
endpoint := ""

if c := os.Getenv("TELEPORT_FIRESTORE_TEST_COLLECTION"); c != "" {
collection = c
}

if p := os.Getenv("TELEPORT_FIRESTORE_TEST_PROJECT"); p != "" {
projectID = p
}

if e := os.Getenv("TELEPORT_FIRESTORE_TEST_ENDPOINT"); e != "" {
endpoint = e
}

return map[string]interface{}{
"collection_name": "tp-cluster-data-test",
"project_id": "tp-testproj",
"endpoint": "localhost:8618",
"purgeExpiredDocumentsPollInterval": time.Second,
"collection_name": collection,
"project_id": projectID,
"endpoint": endpoint,
"purge_expired_documents_poll_interval": 300 * time.Millisecond,
}
}

Expand All @@ -84,7 +99,12 @@ func ensureTestsEnabled(t *testing.T) {
}

func ensureEmulatorRunning(t *testing.T, cfg map[string]interface{}) {
con, err := net.Dial("tcp", cfg["endpoint"].(string))
endpoint, _ := cfg["endpoint"].(string)
if endpoint == "" {
return
}

con, err := net.Dial("tcp", endpoint)
if err != nil {
t.Skip("Firestore emulator is not running, start it with: gcloud beta emulators firestore start --host-port=localhost:8618")
}
Expand All @@ -111,14 +131,19 @@ func TestFirestoreDB(t *testing.T) {
return nil, nil, test.ErrConcurrentAccessNotSupported
}

clock := clockwork.NewFakeClock()
clock := clockwork.NewRealClock()

// we can't fiddle with clocks inside the firestore client, so instead of creating
// and returning a fake clock, we wrap the real clock used by the client
// in a FakeClock interface that sleeps instead of instantly advancing.
sleepingClock := test.BlockingFakeClock{Clock: clock}

uut, err := New(context.Background(), cfg, Options{Clock: clock})
uut, err := New(context.Background(), cfg, Options{Clock: sleepingClock})
if err != nil {
return nil, nil, trace.Wrap(err)
}

return uut, clock, nil
return uut, sleepingClock, nil
}

test.RunBackendComplianceSuite(t, newBackend)
Expand Down
Loading