Skip to content

Commit

Permalink
Fix firestore events interface and init stage.
Browse files Browse the repository at this point in the history
This comit fixes #4508

Gogoproto is not compatible with APIv2 protoc-gen-go.
Track the issue here: gogo/protobuf#678
Meanwhile, this commit switches to google protobuf to unmarshal firebase struct.

Add a missing method EmitAuditEvent causing teleport to crash
with firestore events backend.
  • Loading branch information
klizhentas committed Oct 15, 2020
1 parent 1ab776a commit c9d8555
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 15 deletions.
40 changes: 28 additions & 12 deletions lib/backend/firestore/firestorebk.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"cloud.google.com/go/firestore"
apiv1 "cloud.google.com/go/firestore/apiv1/admin"
"github.com/gogo/protobuf/proto"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/utils"
Expand All @@ -33,13 +32,14 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
)

// FirestoreConfig structure represents Firestore configuration as appears in `storage` section of Teleport YAML
// Config structure represents Firestore configuration as appears in `storage` section of Teleport YAML
type Config struct {
// Credentials path for the Firestore client
CredentialsPath string `json:"credentials_path,omitempty"`
Expand Down Expand Up @@ -244,8 +244,8 @@ func New(ctx context.Context, params backend.Params) (*Backend, error) {
if err != nil {
return nil, trace.BadParameter("firestore: configuration is invalid: %v", err)
}
l.Infof("Firestore: initializing backend.")
defer l.Debug("Firestore: backend created.")
l.Info("Initializing backend.")
defer l.Info("Backend created.")
if err := cfg.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -569,13 +569,13 @@ func (b *Backend) keyToDocumentID(key []byte) string {
func RetryingAsyncFunctionRunner(ctx context.Context, retryConfig utils.LinearConfig, logger *log.Logger, task func() error, taskName string) {
retry, err := utils.NewLinear(retryConfig)
if err != nil {
logger.Errorf("bad retry parameters: %v, returning and not running", err)
logger.WithError(err).Error("Bad retry parameters, returning and not running.")
return
}
for {
err := task()
if err != nil && (err != context.Canceled || status.Convert(err).Code() != codes.Canceled) {
logger.Errorf("%v returned with error: %v", taskName, err)
if err != nil && !isCanceled(err) {
logger.WithError(err).Errorf("Task %v has returned with error.", taskName)
}
logger.Debugf("Reloading %v for %s.", retry, taskName)
select {
Expand All @@ -588,6 +588,20 @@ func RetryingAsyncFunctionRunner(ctx context.Context, retryConfig utils.LinearCo
}
}

func isCanceled(err error) bool {
err = trace.Unwrap(err)
switch {
case err == nil:
return false
case err == context.Canceled:
return true
case status.Convert(err).Code() == codes.Canceled:
return true
default:
return false
}
}

// watchCollection watches a firestore collection for changes and pushes those changes, events into the buffer for watchers
func (b *Backend) watchCollection() error {
var snaps *firestore.QuerySnapshotIterator
Expand Down Expand Up @@ -625,7 +639,6 @@ func (b *Backend) watchCollection() error {
},
}
}
b.Logger.Debugf("pushing event %v for key '%v'.", e.Type.String(), r.Key)
b.buf.Push(e)
}
}
Expand Down Expand Up @@ -724,10 +737,8 @@ func EnsureIndexes(ctx context.Context, adminSvc *apiv1.FirestoreAdminClient, tu
},
})
if err != nil && status.Code(err) != codes.AlreadyExists {
l.Debug("non-already exists error, returning.")
return ConvertGRPCError(err)
}

// operation can be nil if error code is codes.AlreadyExists.
if operation != nil {
meta := adminpb.IndexOperationMetadata{}
Expand All @@ -743,11 +754,16 @@ func EnsureIndexes(ctx context.Context, adminSvc *apiv1.FirestoreAdminClient, tu

// check for statuses and block
for len(tuplesToIndexNames) != 0 {
time.Sleep(timeInBetweenIndexCreationStatusChecks)
select {
case <-time.After(timeInBetweenIndexCreationStatusChecks):
case <-ctx.Done():
return trace.ConnectionProblem(ctx.Err(), "context timed out or canceled")
}

for tuple, name := range tuplesToIndexNames {
index, err := adminSvc.GetIndex(ctx, &adminpb.GetIndexRequest{Name: name})
if err != nil {
l.Warningf("error fetching index %q: %v", name, err)
l.WithError(err).Warningf("Failed to fetch index %q.", name)
continue
}
l.Infof("Index for tuple %s-%s, %s, state is %s.", tuple.FirstField, tuple.SecondField, index.Name, index.State)
Expand Down
20 changes: 19 additions & 1 deletion lib/backend/firestore/firestorebk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,27 @@ import (
"github.com/gravitational/teleport/lib/backend/test"
"github.com/gravitational/teleport/lib/utils"

"github.com/stretchr/testify/assert"
adminpb "google.golang.org/genproto/googleapis/firestore/admin/v1"
"google.golang.org/protobuf/proto"

"gopkg.in/check.v1"
)

// TestMarshal tests index operation metadata marshal and unmarshal
// to verify backwards compatibility. Gogoproto is incompatible with ApiV2 protoc-gen-go code.
//
// Track the issue here: https://github.com/gogo/protobuf/issues/678
//
func TestMarshal(t *testing.T) {
meta := adminpb.IndexOperationMetadata{}
data, err := proto.Marshal(&meta)
assert.NoError(t, err)
out := adminpb.IndexOperationMetadata{}
err = proto.Unmarshal(data, &out)
assert.NoError(t, err)
}

func TestFirestoreDB(t *testing.T) { check.TestingT(t) }

type FirestoreSuite struct {
Expand All @@ -42,7 +60,7 @@ func (s *FirestoreSuite) SetUpSuite(c *check.C) {
utils.InitLoggerForTests(testing.Verbose())

if !emulatorRunning() {
c.Skip("firestore emulator not running, start it with: gcloud beta emulators firestore start --host-port=localhost:8618")
c.Skip("Firestore emulator is not running, start it with: gcloud beta emulators firestore start --host-port=localhost:8618")
}

newBackend := func() (backend.Backend, error) {
Expand Down
36 changes: 35 additions & 1 deletion lib/events/firestoreevents/firestoreevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,40 @@ func New(cfg EventsConfig) (*Log, error) {
return b, nil
}

// EmitAuditEvent emits audit event
func (l *Log) EmitAuditEvent(ctx context.Context, in events.AuditEvent) error {
data, err := utils.FastMarshal(in)
if err != nil {
return trace.Wrap(err)
}

var sessionID string
getter, ok := in.(events.SessionMetadataGetter)
if ok && getter.GetSessionID() != "" {
sessionID = getter.GetSessionID()
} else {
// no session id - global event gets a random uuid to get a good partition
// key distribution
sessionID = uuid.New()
}
event := event{
SessionID: sessionID,
EventIndex: in.GetIndex(),
EventType: in.GetType(),
EventNamespace: defaults.Namespace,
CreatedAt: in.GetTime().Unix(),
Fields: string(data),
}
start := time.Now()
_, err = l.svc.Collection(l.CollectionName).Doc(l.getDocIDForEvent(event)).Create(l.svcContext, event)
writeLatencies.Observe(time.Since(start).Seconds())
writeRequests.Inc()
if err != nil {
return firestorebk.ConvertGRPCError(err)
}
return nil
}

// EmitAuditEventLegacy emits audit event
func (l *Log) EmitAuditEventLegacy(ev events.Event, fields events.EventFields) error {
sessionID := fields.GetString(events.SessionEventID)
Expand Down Expand Up @@ -535,7 +569,7 @@ func (l *Log) Close() error {
}

func (l *Log) getDocIDForEvent(event event) string {
return event.SessionID + "-" + event.EventType
return uuid.New()
}

func (l *Log) purgeExpiredEvents() error {
Expand Down
2 changes: 1 addition & 1 deletion lib/events/firestoreevents/firestoreevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *FirestoreeventsSuite) SetUpSuite(c *check.C) {
utils.InitLoggerForTests()

if !emulatorRunning() {
c.Skip("firestore emulator not running, start it with: gcloud beta emulators firestore start --host-port=localhost:8618")
c.Skip("Firestore emulator is not running, start it with: gcloud beta emulators firestore start --host-port=localhost:8618")
}

fakeClock := clockwork.NewFakeClock()
Expand Down

0 comments on commit c9d8555

Please sign in to comment.