diff --git a/lib/backend/firestore/firestorebk.go b/lib/backend/firestore/firestorebk.go index 058e588a2e8a9..c16bceee45fba 100644 --- a/lib/backend/firestore/firestorebk.go +++ b/lib/backend/firestore/firestorebk.go @@ -24,7 +24,6 @@ import ( "cloud.google.com/go/firestore" apiv1 "cloud.google.com/go/firestore/apiv1/admin" - "github.com/gogo/protobuf/proto" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/utils" @@ -33,13 +32,14 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" log "github.com/sirupsen/logrus" ) -// FirestoreConfig structure represents Firestore configuration as appears in `storage` section of Teleport YAML +// Config structure represents Firestore configuration as appears in `storage` section of Teleport YAML type Config struct { // Credentials path for the Firestore client CredentialsPath string `json:"credentials_path,omitempty"` @@ -244,8 +244,8 @@ func New(ctx context.Context, params backend.Params) (*Backend, error) { if err != nil { return nil, trace.BadParameter("firestore: configuration is invalid: %v", err) } - l.Infof("Firestore: initializing backend.") - defer l.Debug("Firestore: backend created.") + l.Info("Initializing backend.") + defer l.Info("Backend created.") if err := cfg.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } @@ -569,13 +569,13 @@ func (b *Backend) keyToDocumentID(key []byte) string { func RetryingAsyncFunctionRunner(ctx context.Context, retryConfig utils.LinearConfig, logger *log.Logger, task func() error, taskName string) { retry, err := utils.NewLinear(retryConfig) if err != nil { - logger.Errorf("bad retry parameters: %v, returning and not running", err) + logger.WithError(err).Error("Bad retry parameters, returning and not running.") return } for { err := task() - if err != nil && (err != context.Canceled || status.Convert(err).Code() != codes.Canceled) { - logger.Errorf("%v returned with error: %v", taskName, err) + if err != nil && !isCanceled(err) { + logger.WithError(err).Errorf("Task %v has returned with error.", taskName) } logger.Debugf("Reloading %v for %s.", retry, taskName) select { @@ -588,6 +588,20 @@ func RetryingAsyncFunctionRunner(ctx context.Context, retryConfig utils.LinearCo } } +func isCanceled(err error) bool { + err = trace.Unwrap(err) + switch { + case err == nil: + return false + case err == context.Canceled: + return true + case status.Convert(err).Code() == codes.Canceled: + return true + default: + return false + } +} + // watchCollection watches a firestore collection for changes and pushes those changes, events into the buffer for watchers func (b *Backend) watchCollection() error { var snaps *firestore.QuerySnapshotIterator @@ -625,7 +639,6 @@ func (b *Backend) watchCollection() error { }, } } - b.Logger.Debugf("pushing event %v for key '%v'.", e.Type.String(), r.Key) b.buf.Push(e) } } @@ -724,10 +737,8 @@ func EnsureIndexes(ctx context.Context, adminSvc *apiv1.FirestoreAdminClient, tu }, }) if err != nil && status.Code(err) != codes.AlreadyExists { - l.Debug("non-already exists error, returning.") return ConvertGRPCError(err) } - // operation can be nil if error code is codes.AlreadyExists. if operation != nil { meta := adminpb.IndexOperationMetadata{} @@ -743,11 +754,16 @@ func EnsureIndexes(ctx context.Context, adminSvc *apiv1.FirestoreAdminClient, tu // check for statuses and block for len(tuplesToIndexNames) != 0 { - time.Sleep(timeInBetweenIndexCreationStatusChecks) + select { + case <-time.After(timeInBetweenIndexCreationStatusChecks): + case <-ctx.Done(): + return trace.ConnectionProblem(ctx.Err(), "context timed out or canceled") + } + for tuple, name := range tuplesToIndexNames { index, err := adminSvc.GetIndex(ctx, &adminpb.GetIndexRequest{Name: name}) if err != nil { - l.Warningf("error fetching index %q: %v", name, err) + l.WithError(err).Warningf("Failed to fetch index %q.", name) continue } l.Infof("Index for tuple %s-%s, %s, state is %s.", tuple.FirstField, tuple.SecondField, index.Name, index.State) diff --git a/lib/backend/firestore/firestorebk_test.go b/lib/backend/firestore/firestorebk_test.go index 83595fbc0d96d..67b5d1d4897a1 100644 --- a/lib/backend/firestore/firestorebk_test.go +++ b/lib/backend/firestore/firestorebk_test.go @@ -26,9 +26,27 @@ import ( "github.com/gravitational/teleport/lib/backend/test" "github.com/gravitational/teleport/lib/utils" + "github.com/stretchr/testify/assert" + adminpb "google.golang.org/genproto/googleapis/firestore/admin/v1" + "google.golang.org/protobuf/proto" + "gopkg.in/check.v1" ) +// TestMarshal tests index operation metadata marshal and unmarshal +// to verify backwards compatibility. Gogoproto is incompatible with ApiV2 protoc-gen-go code. +// +// Track the issue here: https://github.com/gogo/protobuf/issues/678 +// +func TestMarshal(t *testing.T) { + meta := adminpb.IndexOperationMetadata{} + data, err := proto.Marshal(&meta) + assert.NoError(t, err) + out := adminpb.IndexOperationMetadata{} + err = proto.Unmarshal(data, &out) + assert.NoError(t, err) +} + func TestFirestoreDB(t *testing.T) { check.TestingT(t) } type FirestoreSuite struct { @@ -42,7 +60,7 @@ func (s *FirestoreSuite) SetUpSuite(c *check.C) { utils.InitLoggerForTests(testing.Verbose()) if !emulatorRunning() { - c.Skip("firestore emulator not running, start it with: gcloud beta emulators firestore start --host-port=localhost:8618") + c.Skip("Firestore emulator is not running, start it with: gcloud beta emulators firestore start --host-port=localhost:8618") } newBackend := func() (backend.Backend, error) { diff --git a/lib/events/firestoreevents/firestoreevents.go b/lib/events/firestoreevents/firestoreevents.go index 0198bb8a3d193..832ed8db8f06f 100644 --- a/lib/events/firestoreevents/firestoreevents.go +++ b/lib/events/firestoreevents/firestoreevents.go @@ -302,6 +302,40 @@ func New(cfg EventsConfig) (*Log, error) { return b, nil } +// EmitAuditEvent emits audit event +func (l *Log) EmitAuditEvent(ctx context.Context, in events.AuditEvent) error { + data, err := utils.FastMarshal(in) + if err != nil { + return trace.Wrap(err) + } + + var sessionID string + getter, ok := in.(events.SessionMetadataGetter) + if ok && getter.GetSessionID() != "" { + sessionID = getter.GetSessionID() + } else { + // no session id - global event gets a random uuid to get a good partition + // key distribution + sessionID = uuid.New() + } + event := event{ + SessionID: sessionID, + EventIndex: in.GetIndex(), + EventType: in.GetType(), + EventNamespace: defaults.Namespace, + CreatedAt: in.GetTime().Unix(), + Fields: string(data), + } + start := time.Now() + _, err = l.svc.Collection(l.CollectionName).Doc(l.getDocIDForEvent(event)).Create(l.svcContext, event) + writeLatencies.Observe(time.Since(start).Seconds()) + writeRequests.Inc() + if err != nil { + return firestorebk.ConvertGRPCError(err) + } + return nil +} + // EmitAuditEventLegacy emits audit event func (l *Log) EmitAuditEventLegacy(ev events.Event, fields events.EventFields) error { sessionID := fields.GetString(events.SessionEventID) @@ -535,7 +569,7 @@ func (l *Log) Close() error { } func (l *Log) getDocIDForEvent(event event) string { - return event.SessionID + "-" + event.EventType + return uuid.New() } func (l *Log) purgeExpiredEvents() error { diff --git a/lib/events/firestoreevents/firestoreevents_test.go b/lib/events/firestoreevents/firestoreevents_test.go index 99c72021c0f37..8308329e4c62f 100644 --- a/lib/events/firestoreevents/firestoreevents_test.go +++ b/lib/events/firestoreevents/firestoreevents_test.go @@ -41,7 +41,7 @@ func (s *FirestoreeventsSuite) SetUpSuite(c *check.C) { utils.InitLoggerForTests() if !emulatorRunning() { - c.Skip("firestore emulator not running, start it with: gcloud beta emulators firestore start --host-port=localhost:8618") + c.Skip("Firestore emulator is not running, start it with: gcloud beta emulators firestore start --host-port=localhost:8618") } fakeClock := clockwork.NewFakeClock()