From 6c6b5b24507e69d07272c195b8684dc4fdee223e Mon Sep 17 00:00:00 2001 From: Andrew Lytvynov <andrew@gravitational.com> Date: Tue, 26 May 2020 10:31:57 -0700 Subject: [PATCH 1/4] Cleanup of GCP backend implementation - close all io.Closers where missing - add error handling where missing - improve error messages --- lib/backend/firestore/doc.go | 16 ++ lib/backend/firestore/firestore.go | 17 -- lib/backend/firestore/firestorebk.go | 197 +++++++++--------- lib/events/firestoreevents/firestoreevents.go | 45 ++-- lib/events/gcssessions/gcshandler.go | 17 +- 5 files changed, 152 insertions(+), 140 deletions(-) delete mode 100644 lib/backend/firestore/firestore.go diff --git a/lib/backend/firestore/doc.go b/lib/backend/firestore/doc.go index b189961076862..bd70df1d06c9e 100644 --- a/lib/backend/firestore/doc.go +++ b/lib/backend/firestore/doc.go @@ -1,3 +1,19 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + /* Package firestoreFirestoreBackend implements Firestore storage backend for Teleport auth service, similar to DynamoDB backend. diff --git a/lib/backend/firestore/firestore.go b/lib/backend/firestore/firestore.go deleted file mode 100644 index 8b36559a5441c..0000000000000 --- a/lib/backend/firestore/firestore.go +++ /dev/null @@ -1,17 +0,0 @@ -/* - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -*/ - -package firestore diff --git a/lib/backend/firestore/firestorebk.go b/lib/backend/firestore/firestorebk.go index f459225d9a938..31cd93db62cea 100644 --- a/lib/backend/firestore/firestorebk.go +++ b/lib/backend/firestore/firestorebk.go @@ -19,11 +19,12 @@ package firestore import ( "bytes" "context" - "strings" + "encoding/base64" "time" "cloud.google.com/go/firestore" apiv1 "cloud.google.com/go/firestore/apiv1/admin" + "github.com/gogo/protobuf/proto" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/utils" @@ -124,6 +125,18 @@ func (r *record) isExpired() bool { return time.Now().UTC().After(expiryDateUTC) } +func (r *record) backendItem() backend.Item { + bi := backend.Item{ + Key: []byte(r.Key), + Value: []byte(r.Value), + ID: r.ID, + } + if r.Expires != 0 { + bi.Expires = time.Unix(r.Expires, 0) + } + return bi +} + const ( // BackendName is the name of this backend BackendName = "firestore" @@ -137,14 +150,6 @@ const ( timestampDocProperty = "timestamp" // timeInBetweenIndexCreationStatusChecks timeInBetweenIndexCreationStatusChecks = time.Second * 10 - // documentNameIllegalCharacter the character key search criteria for normal document replacement - documentNameIllegalCharacter = "/" - // documentNameReplacementCharacter the replacement path separator for firestore records - documentNameReplacementCharacter = "\\" - // documentNameLockIllegalCharacter the character key search criteria for lock replacement - documentNameLockIllegalCharacter = "." - // documentNameLockReplacementCharacter the replacement key prefix for lock values - documentNameLockReplacementCharacter = "" ) // GetName is a part of backend API and it returns Firestore backend type @@ -199,6 +204,10 @@ func New(ctx context.Context, params backend.Params) (*FirestoreBackend, error) cancel() return nil, trace.Wrap(err) } + // Admin client is only used for building indexes at startup. + // It won't be needed after New returns. + defer firestoreAdminClient.Close() + buf, err := backend.NewCircularBuffer(ctx, cfg.BufferSize) if err != nil { cancel() @@ -237,20 +246,20 @@ func New(ctx context.Context, params backend.Params) (*FirestoreBackend, error) // Create creates item if it does not exist func (b *FirestoreBackend) Create(ctx context.Context, item backend.Item) (*backend.Lease, error) { - var r record - r.Key = string(item.Key) - r.Value = string(item.Value) - r.Timestamp = b.clock.Now().UTC().Unix() - r.ID = b.clock.Now().UTC().UnixNano() + r := record{ + Key: string(item.Key), + Value: string(item.Value), + Timestamp: b.clock.Now().UTC().Unix(), + ID: b.clock.Now().UTC().UnixNano(), + } if !item.Expires.IsZero() { r.Expires = item.Expires.UTC().Unix() } - _, err := b.svc.Collection(b.CollectionName).Doc(b.convertKeyToSupportedDocumentID(item.Key)).Create(ctx, r) + _, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(item.Key)).Create(ctx, r) if err != nil { return nil, ConvertGRPCError(err) - } else { - return b.newLease(item), nil } + return b.newLease(item), nil } // Put puts value into backend (creates if it does not exists, updates it otherwise) @@ -263,7 +272,7 @@ func (b *FirestoreBackend) Put(ctx context.Context, item backend.Item) (*backend if !item.Expires.IsZero() { r.Expires = item.Expires.UTC().Unix() } - _, err := b.svc.Collection(b.CollectionName).Doc(b.convertKeyToSupportedDocumentID(item.Key)).Set(ctx, r) + _, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(item.Key)).Set(ctx, r) if err != nil { return nil, ConvertGRPCError(err) } else { @@ -281,11 +290,11 @@ func (b *FirestoreBackend) Update(ctx context.Context, item backend.Item) (*back if !item.Expires.IsZero() { r.Expires = item.Expires.UTC().Unix() } - _, err := b.svc.Collection(b.CollectionName).Doc(b.convertKeyToSupportedDocumentID(item.Key)).Get(ctx) + _, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(item.Key)).Get(ctx) if err != nil { return nil, ConvertGRPCError(err) } - _, err = b.svc.Collection(b.CollectionName).Doc(b.convertKeyToSupportedDocumentID(item.Key)).Set(ctx, r) + _, err = b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(item.Key)).Set(ctx, r) if err != nil { return nil, ConvertGRPCError(err) } else { @@ -303,10 +312,11 @@ func (b *FirestoreBackend) getRangeDocs(ctx context.Context, startKey []byte, en if limit <= 0 { limit = backend.DefaultLargeLimit } - docSnaps, _ := b.svc.Collection(b.CollectionName).Where(keyDocProperty, ">=", string(startKey)). - Where(keyDocProperty, "<=", string(endKey)).Limit(limit).Documents(ctx).GetAll() - - return docSnaps, nil + return b.svc.Collection(b.CollectionName). + Where(keyDocProperty, ">=", string(startKey)). + Where(keyDocProperty, "<=", string(endKey)). + Limit(limit). + Documents(ctx).GetAll() } // GetRange returns range of elements @@ -328,12 +338,11 @@ func (b *FirestoreBackend) GetRange(ctx context.Context, startKey []byte, endKey if err != nil { return nil, ConvertGRPCError(err) } + // Do not include this document in result. + continue } - values = append(values, backend.Item{ - Key: []byte(r.Key), - Value: []byte(r.Value), - }) + values = append(values, r.backendItem()) } return &backend.GetResult{Items: values}, nil } @@ -344,17 +353,17 @@ func (b *FirestoreBackend) DeleteRange(ctx context.Context, startKey, endKey []b if err != nil { return trace.Wrap(err) } + if len(docSnaps) == 0 { + // Nothing to delete. + return nil + } batch := b.svc.Batch() - numDeleted := 0 for _, docSnap := range docSnaps { batch.Delete(docSnap.Ref) - numDeleted++ } _, err = batch.Commit(ctx) - if numDeleted > 0 { - if err != nil { - return ConvertGRPCError(err) - } + if err != nil { + return ConvertGRPCError(err) } return nil } @@ -364,7 +373,7 @@ func (b *FirestoreBackend) Get(ctx context.Context, key []byte) (*backend.Item, if len(key) == 0 { return nil, trace.BadParameter("missing parameter key") } - docSnap, err := b.svc.Collection(b.CollectionName).Doc(b.convertKeyToSupportedDocumentID(key)).Get(ctx) + docSnap, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(key)).Get(ctx) if err != nil { return nil, ConvertGRPCError(err) } @@ -383,15 +392,8 @@ func (b *FirestoreBackend) Get(ctx context.Context, key []byte) (*backend.Item, } } - item := &backend.Item{ - Key: []byte(r.Key), - Value: []byte(r.Value), - ID: r.ID, - } - if r.Expires != 0 { - item.Expires = time.Unix(r.Expires, 0) - } - return item, nil + bi := r.backendItem() + return &bi, nil } // CompareAndSwap compares and swap values in atomic operation @@ -408,7 +410,7 @@ func (b *FirestoreBackend) CompareAndSwap(ctx context.Context, expected backend. return nil, trace.BadParameter("expected and replaceWith keys should match") } - expectedDocSnap, err := b.svc.Collection(b.CollectionName).Doc(b.convertKeyToSupportedDocumentID(expected.Key)).Get(ctx) + expectedDocSnap, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(expected.Key)).Get(ctx) if err != nil { return nil, trace.CompareFailed("error or object not found, error: %v", ConvertGRPCError(err)) } @@ -446,14 +448,16 @@ func (b *FirestoreBackend) Delete(ctx context.Context, key []byte) error { if len(key) == 0 { return trace.BadParameter("missing parameter key") } - docRef := b.svc.Collection(b.CollectionName).Doc(b.convertKeyToSupportedDocumentID(key)) - doc, _ := docRef.Get(ctx) + docRef := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(key)) + doc, err := docRef.Get(ctx) + if err != nil { + return ConvertGRPCError(err) + } if !doc.Exists() { return trace.NotFound("key %s does not exist", string(key)) } - _, err := docRef.Delete(ctx) - + _, err = docRef.Delete(ctx) if err != nil { return ConvertGRPCError(err) } @@ -478,7 +482,7 @@ func (b *FirestoreBackend) KeepAlive(ctx context.Context, lease backend.Lease, e if len(lease.Key) == 0 { return trace.BadParameter("lease is missing key") } - docSnap, err := b.svc.Collection(b.CollectionName).Doc(b.convertKeyToSupportedDocumentID(lease.Key)).Get(ctx) + docSnap, err := b.svc.Collection(b.CollectionName).Doc(b.keyToDocumentID(lease.Key)).Get(ctx) if err != nil { return ConvertGRPCError(err) } @@ -502,7 +506,7 @@ func (b *FirestoreBackend) KeepAlive(ctx context.Context, lease backend.Lease, e if err != nil { return ConvertGRPCError(err) } - return err + return nil } // Close closes the Firestore client contexts and releases associated resources @@ -526,18 +530,17 @@ func (b *FirestoreBackend) Clock() clockwork.Clock { } func (b *FirestoreBackend) newLease(item backend.Item) *backend.Lease { - var lease backend.Lease - if item.Expires.IsZero() { - return &lease - } - lease.Key = item.Key - return &lease + return &backend.Lease{Key: item.Key} } -// convertKeyToSupportedDocumentID converts the key for the stored member to one supported by Firestore -func (b *FirestoreBackend) convertKeyToSupportedDocumentID(key []byte) string { - return strings.Replace(strings.Replace(string(key), documentNameLockIllegalCharacter, documentNameLockReplacementCharacter, 1), - documentNameIllegalCharacter, documentNameReplacementCharacter, -1) +// keyToDocumentID converts key to a format supported by Firestore for document +// IDs. See +// https://firebase.google.com/docs/firestore/quotas#collections_documents_and_fields +// for Firestore limitations. +func (b *FirestoreBackend) keyToDocumentID(key []byte) string { + // URL-safe base64 will not have periods or forward slashes. + // This should satisfy the Firestore requirements. + return base64.URLEncoding.EncodeToString(key) } // RetryingAsyncFunctionRunner wraps a task target in retry logic @@ -586,23 +589,12 @@ func (b *FirestoreBackend) watchCollection() error { if err != nil { return ConvertGRPCError(err) } - var expires time.Time - if r.Expires != 0 { - expires = time.Unix(r.Expires, 0) - } var e backend.Event switch change.Kind { - case firestore.DocumentAdded: - fallthrough - case firestore.DocumentModified: + case firestore.DocumentAdded, firestore.DocumentModified: e = backend.Event{ Type: backend.OpPut, - Item: backend.Item{ - Key: []byte(r.Key), - Value: []byte(r.Value), - Expires: expires, - ID: r.ID, - }, + Item: r.backendItem(), } case firestore.DocumentRemoved: e = backend.Event{ @@ -673,14 +665,11 @@ func (b *FirestoreBackend) getIndexParent() string { } func (b *FirestoreBackend) ensureIndexes(adminSvc *apiv1.FirestoreAdminClient) error { - defer adminSvc.Close() - tuples := make([]*IndexTuple, 0) - tuples = append(tuples, &IndexTuple{ + tuples := []*IndexTuple{{ FirstField: keyDocProperty, SecondField: expiresDocProperty, - }) - err := EnsureIndexes(b.clientContext, adminSvc, tuples, b.getIndexParent()) - return err + }} + return EnsureIndexes(b.clientContext, adminSvc, tuples, b.getIndexParent()) } type IndexTuple struct { @@ -693,22 +682,23 @@ type IndexTuple struct { func EnsureIndexes(ctx context.Context, adminSvc *apiv1.FirestoreAdminClient, tuples []*IndexTuple, indexParent string) error { l := log.WithFields(log.Fields{trace.Component: BackendName}) - ascendingFieldOrder := adminpb.Index_IndexField_Order_{ + ascendingFieldOrder := &adminpb.Index_IndexField_Order_{ Order: adminpb.Index_IndexField_ASCENDING, } tuplesToIndexNames := make(map[*IndexTuple]string) // create the indexes for _, tuple := range tuples { - fields := make([]*adminpb.Index_IndexField, 0) - fields = append(fields, &adminpb.Index_IndexField{ - FieldPath: tuple.FirstField, - ValueMode: &ascendingFieldOrder, - }) - fields = append(fields, &adminpb.Index_IndexField{ - FieldPath: tuple.SecondField, - ValueMode: &ascendingFieldOrder, - }) + fields := []*adminpb.Index_IndexField{ + { + FieldPath: tuple.FirstField, + ValueMode: ascendingFieldOrder, + }, + { + FieldPath: tuple.SecondField, + ValueMode: ascendingFieldOrder, + }, + } operation, err := adminSvc.CreateIndex(ctx, &adminpb.CreateIndexRequest{ Parent: indexParent, Index: &adminpb.Index{ @@ -716,26 +706,31 @@ func EnsureIndexes(ctx context.Context, adminSvc *apiv1.FirestoreAdminClient, tu Fields: fields, }, }) - if err != nil && status.Convert(err).Code() != codes.AlreadyExists { + if err != nil && status.Code(err) != codes.AlreadyExists { l.Debug("non-already exists error, returning.") return ConvertGRPCError(err) } - if operation != nil { - meta := adminpb.IndexOperationMetadata{} - _ = meta.XXX_Unmarshal(operation.Metadata.Value) - tuplesToIndexNames[tuple] = meta.Index + + meta := adminpb.IndexOperationMetadata{} + if err := proto.Unmarshal(operation.Metadata.Value, &meta); err != nil { + return trace.Wrap(err) } + tuplesToIndexNames[tuple] = meta.Index } + // Instead of polling the Index state, we should wait for the Operation to + // finish. Also, there should ideally be a timeout on index creation. + // check for statuses and block - for { - if len(tuplesToIndexNames) == 0 { - break - } + for len(tuplesToIndexNames) != 0 { time.Sleep(timeInBetweenIndexCreationStatusChecks) for tuple, name := range tuplesToIndexNames { - index, _ := adminSvc.GetIndex(ctx, &adminpb.GetIndexRequest{Name: name}) - l.Infof("Index for tuple %s-%s, %s, state is %s.", tuple.FirstField, tuple.SecondField, index.Name, index.State.String()) + index, err := adminSvc.GetIndex(ctx, &adminpb.GetIndexRequest{Name: name}) + if err != nil { + l.Warningf("error fetching index %q: %v", name, err) + continue + } + l.Infof("Index for tuple %s-%s, %s, state is %s.", tuple.FirstField, tuple.SecondField, index.Name, index.State) if index.State == adminpb.Index_READY { delete(tuplesToIndexNames, tuple) } diff --git a/lib/events/firestoreevents/firestoreevents.go b/lib/events/firestoreevents/firestoreevents.go index f9d2581c357b7..442bb1e4dd790 100644 --- a/lib/events/firestoreevents/firestoreevents.go +++ b/lib/events/firestoreevents/firestoreevents.go @@ -158,7 +158,7 @@ type EventsConfig struct { func (cfg *EventsConfig) SetFromParams(params backend.Params) error { err := utils.ObjectToStruct(params, &cfg) if err != nil { - return trace.BadParameter("Firestore: configuration is invalid: %v", err) + return trace.BadParameter("firestore: configuration is invalid: %v", err) } else { return nil } @@ -173,7 +173,7 @@ func (cfg *EventsConfig) SetFromURL(url *url.URL) error { } else { disableExpiredDocumentPurge, err := strconv.ParseBool(disableExpiredDocumentPurgeParamString) if err != nil { - return trace.BadParameter("parameter %s with value '%s' is invalid", disableExpiredDocumentPurgePropertyKey, disableExpiredDocumentPurgeParamString) + return trace.BadParameter("parameter %s with value '%s' is invalid: %v", disableExpiredDocumentPurgePropertyKey, disableExpiredDocumentPurgeParamString, err) } cfg.DisableExpiredDocumentPurge = disableExpiredDocumentPurge } @@ -202,7 +202,7 @@ func (cfg *EventsConfig) SetFromURL(url *url.URL) error { } else { eventRetentionPeriod, err := time.ParseDuration(eventRetentionPeriodParamString) if err != nil { - return trace.BadParameter("parameter %s with value '%s' is invalid", eventRetentionPeriodPropertyKey, eventRetentionPeriodParamString) + return trace.BadParameter("parameter %s with value '%s' is invalid: %v", eventRetentionPeriodPropertyKey, eventRetentionPeriodParamString, err) } cfg.RetentionPeriod = eventRetentionPeriod } @@ -213,7 +213,7 @@ func (cfg *EventsConfig) SetFromURL(url *url.URL) error { } else { retryPeriodParamString, err := time.ParseDuration(retryPeriodParamString) if err != nil { - return trace.BadParameter("parameter %s with value '%s' is invalid", retryPeriodPropertyKey, retryPeriodParamString) + return trace.BadParameter("parameter %s with value '%s' is invalid: %v", retryPeriodPropertyKey, retryPeriodParamString, err) } cfg.RetryPeriod = retryPeriodParamString } @@ -224,7 +224,7 @@ func (cfg *EventsConfig) SetFromURL(url *url.URL) error { } else { purgeInterval, err := time.ParseDuration(purgeIntervalParamString) if err != nil { - return trace.BadParameter("parameter %s with value '%s' is invalid", purgeIntervalPropertyKey, purgeIntervalParamString) + return trace.BadParameter("parameter %s with value '%s' is invalid: %v", purgeIntervalPropertyKey, purgeIntervalParamString, err) } cfg.PurgeExpiredDocumentsPollInterval = purgeInterval } @@ -281,6 +281,7 @@ func New(cfg EventsConfig) (*Log, error) { cancel() return nil, trace.Wrap(err) } + defer firestoreAdminClient.Close() b := &Log{ svcContext: closeCtx, svcCancel: cancel, @@ -380,7 +381,7 @@ func (l *Log) PostSessionSlice(slice events.SessionSlice) error { } func (l *Log) UploadSessionRecording(events.SessionRecording) error { - return trace.BadParameter("not supported") + return trace.NotImplemented("UploadSessionRecording not implemented for firestore backend") } // GetSessionChunk returns a reader which can be used to read a byte stream @@ -389,7 +390,7 @@ func (l *Log) UploadSessionRecording(events.SessionRecording) error { // // If maxBytes > MaxChunkBytes, it gets rounded down to MaxChunkBytes func (l *Log) GetSessionChunk(namespace string, sid session.ID, offsetBytes, maxBytes int) ([]byte, error) { - return nil, nil + return nil, trace.NotImplemented("GetSessionChunk not implemented for firestore backend") } // Returns all events that happen during a session sorted by time @@ -402,10 +403,13 @@ func (l *Log) GetSessionChunk(namespace string, sid session.ID, offsetBytes, max func (l *Log) GetSessionEvents(namespace string, sid session.ID, after int, inlcudePrintEvents bool) ([]events.EventFields, error) { var values []events.EventFields start := time.Now() - docSnaps, _ := l.svc.Collection(l.CollectionName).Where(sessionIDDocProperty, "==", string(sid)). + docSnaps, err := l.svc.Collection(l.CollectionName).Where(sessionIDDocProperty, "==", string(sid)). Documents(l.svcContext).GetAll() batchReadLatencies.Observe(time.Since(start).Seconds()) batchReadRequests.Inc() + if err != nil { + return nil, firestorebk.ConvertGRPCError(err) + } for _, docSnap := range docSnaps { var e event err := docSnap.DataTo(&e) @@ -435,7 +439,7 @@ func (l *Log) SearchEvents(fromUTC, toUTC time.Time, filter string, limit int) ( g := l.WithFields(log.Fields{"From": fromUTC, "To": toUTC, "Filter": filter, "Limit": limit}) filterVals, err := url.ParseQuery(filter) if err != nil { - return nil, trace.BadParameter("missing parameter query") + return nil, trace.BadParameter("missing or invalid parameter query in %q: %v", filter, err) } eventFilter, ok := filterVals[events.EventType] if !ok && len(filterVals) > 0 { @@ -446,14 +450,20 @@ func (l *Log) SearchEvents(fromUTC, toUTC time.Time, filter string, limit int) ( var values []events.EventFields start := time.Now() - docSnaps, _ := l.svc.Collection(l.CollectionName).Where(eventNamespaceDocProperty, "==", defaults.Namespace). - Where(createdAtDocProperty, ">=", fromUTC.Unix()).Where(createdAtDocProperty, "<=", toUTC.Unix()). - OrderBy(createdAtDocProperty, firestore.Asc).Limit(limit).Documents(l.svcContext).GetAll() + docSnaps, err := l.svc.Collection(l.CollectionName). + Where(eventNamespaceDocProperty, "==", defaults.Namespace). + Where(createdAtDocProperty, ">=", fromUTC.Unix()). + Where(createdAtDocProperty, "<=", toUTC.Unix()). + OrderBy(createdAtDocProperty, firestore.Asc). + Limit(limit). + Documents(l.svcContext).GetAll() batchReadLatencies.Observe(time.Since(start).Seconds()) batchReadRequests.Inc() + if err != nil { + return nil, firestorebk.ConvertGRPCError(err) + } g.WithFields(log.Fields{"duration": time.Since(start)}).Debugf("Query completed.") - var total int for _, docSnap := range docSnaps { var e event @@ -476,8 +486,7 @@ func (l *Log) SearchEvents(fromUTC, toUTC time.Time, filter string, limit int) ( } if accepted || !doFilter { values = append(values, fields) - total += 1 - if limit > 0 && total >= limit { + if limit > 0 && len(values) >= limit { break } } @@ -509,7 +518,6 @@ func (l *Log) getIndexParent() string { } func (l *Log) ensureIndexes(adminSvc *apiv1.FirestoreAdminClient) error { - defer adminSvc.Close() tuples := make([]*firestorebk.IndexTuple, 0) tuples = append(tuples, &firestorebk.IndexTuple{ FirstField: eventNamespaceDocProperty, @@ -543,9 +551,12 @@ func (l *Log) purgeExpiredEvents() error { case <-t.C: expiryTime := l.Clock.Now().UTC().Add(-1 * l.RetentionPeriod) start := time.Now() - docSnaps, _ := l.svc.Collection(l.CollectionName).Where(createdAtDocProperty, "<=", expiryTime.Unix()).Documents(l.svcContext).GetAll() + docSnaps, err := l.svc.Collection(l.CollectionName).Where(createdAtDocProperty, "<=", expiryTime.Unix()).Documents(l.svcContext).GetAll() batchReadLatencies.Observe(time.Since(start).Seconds()) batchReadRequests.Inc() + if err != nil { + return firestorebk.ConvertGRPCError(err) + } numDeleted := 0 batch := l.svc.Batch() for _, docSnap := range docSnaps { diff --git a/lib/events/gcssessions/gcshandler.go b/lib/events/gcssessions/gcshandler.go index f9ba95cb864e2..c02300f760c20 100644 --- a/lib/events/gcssessions/gcshandler.go +++ b/lib/events/gcssessions/gcshandler.go @@ -214,8 +214,10 @@ func (h *Handler) Upload(ctx context.Context, sessionID session.ID, reader io.Re writer := h.gcsClient.Bucket(h.Config.Bucket).Object(path).NewWriter(ctx) start := time.Now() _, err := io.Copy(writer, reader) + // Always close the writer, even if upload failed. + closeErr := writer.Close() if err == nil { - err = writer.Close() + err = closeErr } uploadLatencies.Observe(time.Since(start).Seconds()) uploadRequests.Inc() @@ -227,22 +229,27 @@ func (h *Handler) Upload(ctx context.Context, sessionID session.ID, reader io.Re // Download downloads recorded session from GCS bucket and writes the results into writer // return trace.NotFound error is object is not found -func (h *Handler) Download(ctx context.Context, sessionID session.ID, writer io.WriterAt) error { +func (h *Handler) Download(ctx context.Context, sessionID session.ID, writerAt io.WriterAt) error { path := h.path(sessionID) h.Logger.Debugf("downloading %s", path) + writer, ok := writerAt.(io.Writer) + if !ok { + return trace.BadParameter("the provided writerAt is %T which does not implement io.Writer", writerAt) + } reader, err := h.gcsClient.Bucket(h.Config.Bucket).Object(path).NewReader(ctx) if err != nil { return convertGCSError(err) } + defer reader.Close() start := time.Now() - written, err := io.Copy(writer.(io.Writer), reader) + written, err := io.Copy(writer, reader) if err != nil { return convertGCSError(err) } downloadLatencies.Observe(time.Since(start).Seconds()) downloadRequests.Inc() if written == 0 { - return trace.NotFound("recording for %v is not found", sessionID) + return trace.NotFound("recording for %v is empty", sessionID) } return nil } @@ -290,6 +297,6 @@ func convertGCSError(err error, args ...interface{}) error { case storage.ErrBucketNotExist, storage.ErrObjectNotExist: return trace.NotFound(err.Error(), args...) default: - return trace.BadParameter(err.Error(), args...) + return trace.Wrap(err, args...) } } From db634da3aae1dc6012dea00bde87db2829b8da5f Mon Sep 17 00:00:00 2001 From: Andrew Lytvynov <andrew@gravitational.com> Date: Tue, 26 May 2020 15:28:44 -0700 Subject: [PATCH 2/4] gcssessions: don't overwrite sessions and protect auto-created buckets - before uploading a session, check that it doesn't already exist; we don't want to lose an existing session - when auto-creating a missing bucket at startup, set predefined ACLs to `projectPrivate`, which means "Project team members get access according to their roles." See https://cloud.google.com/storage/docs/json_api/v1/buckets/insert#parameters --- lib/events/gcssessions/gcshandler.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/lib/events/gcssessions/gcshandler.go b/lib/events/gcssessions/gcshandler.go index c02300f760c20..11ce76e76380c 100644 --- a/lib/events/gcssessions/gcshandler.go +++ b/lib/events/gcssessions/gcshandler.go @@ -211,9 +211,19 @@ func (h *Handler) Close() error { func (h *Handler) Upload(ctx context.Context, sessionID session.ID, reader io.Reader) (string, error) { path := h.path(sessionID) h.Logger.Debugf("uploading %s", path) + + // Make sure we don't overwrite an existing recording. + _, err := h.gcsClient.Bucket(h.Config.Bucket).Object(path).Attrs(ctx) + if err != storage.ErrObjectNotExist { + if err != nil { + return "", convertGCSError(err) + } + return "", trace.AlreadyExists("recording for session %q already exists in GCS", sessionID) + } + writer := h.gcsClient.Bucket(h.Config.Bucket).Object(path).NewWriter(ctx) start := time.Now() - _, err := io.Copy(writer, reader) + _, err = io.Copy(writer, reader) // Always close the writer, even if upload failed. closeErr := writer.Close() if err == nil { @@ -276,6 +286,9 @@ func (h *Handler) ensureBucket() error { err = h.gcsClient.Bucket(h.Config.Bucket).Create(h.clientContext, h.Config.ProjectID, &storage.BucketAttrs{ VersioningEnabled: true, Encryption: &storage.BucketEncryption{DefaultKMSKeyName: h.Config.KMSKeyName}, + // See https://cloud.google.com/storage/docs/json_api/v1/buckets/insert#parameters + PredefinedACL: "projectPrivate", + PredefinedDefaultObjectACL: "projectPrivate", }) err = convertGCSError(err) if err != nil { From 3b016460d39c616205b5c53ca500b2c3391e3e11 Mon Sep 17 00:00:00 2001 From: Andrew Lytvynov <andrew@gravitational.com> Date: Tue, 26 May 2020 15:40:11 -0700 Subject: [PATCH 3/4] gcssessions: remove build tag from tests Tests run against an in-memory mock GCS, no need to guard them with a build tag. --- lib/events/gcssessions/gcshandler_test.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/lib/events/gcssessions/gcshandler_test.go b/lib/events/gcssessions/gcshandler_test.go index c96e4016c5234..e40e124e0cf15 100644 --- a/lib/events/gcssessions/gcshandler_test.go +++ b/lib/events/gcssessions/gcshandler_test.go @@ -1,5 +1,3 @@ -// +build gcs - /* Licensed under the Apache License, Version 2.0 (the "License"); @@ -26,7 +24,6 @@ import ( "github.com/fsouza/fake-gcs-server/fakestorage" "github.com/gravitational/teleport/lib/events/test" "github.com/gravitational/teleport/lib/utils" - "github.com/gravitational/trace" "github.com/pborman/uuid" "gopkg.in/check.v1" ) @@ -71,8 +68,6 @@ func (s *GCSSuite) TearDownSuite(c *check.C) { s.gcsServer.Stop() } if s.handler != nil { - if err := s.handler.deleteBucket(); err != nil { - c.Fatalf("Failed to delete bucket: %#v", trace.DebugReport(err)) - } + s.handler.Close() } } From 668b22b38520ae0e48a838548b8b058a2c8867e4 Mon Sep 17 00:00:00 2001 From: Andrew Lytvynov <andrew@gravitational.com> Date: Tue, 26 May 2020 16:07:52 -0700 Subject: [PATCH 4/4] Fix tests for firestore backends Reimplement cleanup code in the test itself, since the helper method is gone. --- lib/backend/firestore/firestorebk_test.go | 18 ++++++++++--- .../firestoreevents/firestoreevents_test.go | 25 ++++++++++++++----- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/lib/backend/firestore/firestorebk_test.go b/lib/backend/firestore/firestorebk_test.go index e362667e90584..cf3847716f812 100644 --- a/lib/backend/firestore/firestorebk_test.go +++ b/lib/backend/firestore/firestorebk_test.go @@ -59,12 +59,24 @@ func (s *FirestoreSuite) SetUpSuite(c *check.C) { s.suite.NewBackend = newBackend } -func (s *FirestoreSuite) SetUpTest(c *check.C) { - s.bk.deleteAllItems() +func (s *FirestoreSuite) TearDownTest(c *check.C) { + // Delete all documents. + ctx := context.Background() + docSnaps, err := s.bk.svc.Collection(s.bk.CollectionName).Documents(ctx).GetAll() + c.Assert(err, check.IsNil) + if len(docSnaps) == 0 { + return + } + batch := s.bk.svc.Batch() + for _, docSnap := range docSnaps { + batch.Delete(docSnap.Ref) + } + _, err = batch.Commit(ctx) + c.Assert(err, check.IsNil) } func (s *FirestoreSuite) TearDownSuite(c *check.C) { - s.bk.deleteAllItems() + s.bk.Close() } func (s *FirestoreSuite) TestCRUD(c *check.C) { diff --git a/lib/events/firestoreevents/firestoreevents_test.go b/lib/events/firestoreevents/firestoreevents_test.go index 19f1765e7c7ed..09e748e314868 100644 --- a/lib/events/firestoreevents/firestoreevents_test.go +++ b/lib/events/firestoreevents/firestoreevents_test.go @@ -19,6 +19,7 @@ limitations under the License. package firestoreevents import ( + "context" "testing" "time" @@ -61,14 +62,26 @@ func (s *FirestoreeventsSuite) SetUpSuite(c *check.C) { s.EventsSuite.QueryDelay = time.Second } -func (s *FirestoreeventsSuite) SetUpTest(c *check.C) { - s.log.deleteAllItems() +func (s *FirestoreeventsSuite) TearDownSuite(c *check.C) { + s.log.Close() } -func (s *FirestoreeventsSuite) TestSessionEventsCRUD(c *check.C) { - s.SessionEventsCRUD(c) +func (s *FirestoreeventsSuite) TearDownTest(c *check.C) { + // Delete all documents. + ctx := context.Background() + docSnaps, err := s.log.svc.Collection(s.log.CollectionName).Documents(ctx).GetAll() + c.Assert(err, check.IsNil) + if len(docSnaps) == 0 { + return + } + batch := s.log.svc.Batch() + for _, docSnap := range docSnaps { + batch.Delete(docSnap.Ref) + } + _, err = batch.Commit(ctx) + c.Assert(err, check.IsNil) } -func (s *FirestoreeventsSuite) TearDownSuite(c *check.C) { - s.log.deleteAllItems() +func (s *FirestoreeventsSuite) TestSessionEventsCRUD(c *check.C) { + s.SessionEventsCRUD(c) }