7 changes: 5 additions & 2 deletions go.mod
@@ -128,6 +128,8 @@ require (
github.com/stretchr/testify v1.8.2
github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb
github.com/vulcand/predicate v1.2.0 // replaced
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20230312005205-fbbcdea5f512
go.etcd.io/etcd/api/v3 v3.5.7
go.etcd.io/etcd/client/v3 v3.5.7
go.mongodb.org/mongo-driver v1.10.6
@@ -203,6 +205,7 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
github.com/apache/arrow/go/v10 v10.0.1 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
@@ -294,7 +297,7 @@ require (
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.15.13 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/klauspost/cpuid/v2 v2.1.0 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
@@ -357,7 +360,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/tools v0.7.0 // indirect
255 changes: 250 additions & 5 deletions go.sum

Large diffs are not rendered by default.

129 changes: 118 additions & 11 deletions lib/events/athena/consumer.go
@@ -18,6 +18,7 @@ import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"sync"
"time"
@@ -27,8 +28,13 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sqs"
sqsTypes "github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/google/uuid"
"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"
"github.com/xitongsys/parquet-go-source/s3v2"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/source"
"github.com/xitongsys/parquet-go/writer"
"golang.org/x/exp/slices"

"github.com/gravitational/teleport"
@@ -61,6 +67,10 @@ type consumer struct {
batchMaxItems int
batchMaxInterval time.Duration

// perDateFileParquetWriter returns a file writer for the given date.
// It is set in the config to allow overriding it in tests.
perDateFileParquetWriter func(ctx context.Context, date string) (source.ParquetFile, error)

collectConfig sqsCollectConfig
}

@@ -100,6 +110,16 @@ func newConsumer(cfg Config) (*consumer, error) {
batchMaxItems: cfg.BatchMaxItems,
batchMaxInterval: cfg.BatchMaxInterval,
collectConfig: collectCfg,
perDateFileParquetWriter: func(ctx context.Context, date string) (source.ParquetFile, error) {
key := fmt.Sprintf("%s/%s/%s.parquet", cfg.locationS3Prefix, date, uuid.NewString())

// TODO(tobiaszheller): verify ACL, KMS customer keys, object lock etc. later.
fw, err := s3v2.NewS3FileWriterWithClient(ctx, s3client, cfg.locationS3Bucket, key, nil)
if err != nil {
return nil, trace.Wrap(err)
}
return fw, nil
},
}, nil
}

@@ -188,11 +208,11 @@ func (c *consumer) processBatchOfEvents(ctx context.Context) (reachedMaxSize boo
go func() {
msgsCollector.fromSQS(readSQSCtx)
}()
var err error
size, err = c.writeToS3(ctx, msgsCollector.getEventsChan())
toDelete, err := c.writeToS3(ctx, msgsCollector.getEventsChan(), c.perDateFileParquetWriter)
if err != nil {
return false, trace.Wrap(err)
}
size = len(toDelete)
return size >= c.batchMaxItems, nil
// TODO(tobiaszheller): delete messages from queue in next PR.
}
@@ -513,20 +533,107 @@ func (s *sqsMessagesCollector) downloadEventFromS3(ctx context.Context, payload
return buf.Bytes(), nil
}

// writeToS3 is not doing anything then just receiving from channel and printing
// for now. It will be changed in next PRs to actually write to S3 via parquet writer.
func (c *consumer) writeToS3(ctx context.Context, eventsChan <-chan eventAndAckID) (int, error) {
var size int
// writeToS3 reads events from eventsCh and writes them via a parquet writer
// to an S3 bucket. It returns the receipt handles of messages to delete from the queue.
// If an error is returned, the messages won't be deleted from SQS,
// and the events will be retried or go to the dead-letter queue.
func (c *consumer) writeToS3(ctx context.Context, eventsCh <-chan eventAndAckID, newPerDateFileWriterFn func(ctx context.Context, date string) (source.ParquetFile, error)) ([]string, error) {
toDelete := make([]string, 0, c.batchMaxItems)
// TODO(tobiaszheller): later write in a goroutine; so far it's not a bottleneck.
perDateWriter := map[string]*parquetWriter{}
eventLoop:
for {
select {
case <-ctx.Done():
return size, trace.Wrap(ctx.Err())
case eventAndAckID, ok := <-eventsChan:
return nil, trace.Wrap(ctx.Err())
case eventAndAckID, ok := <-eventsCh:
if !ok {
return size, nil
break eventLoop
}
pqtEvent, err := auditEventToParquet(eventAndAckID.event)
if err != nil {
// TODO(tobiaszheller): come back and add some metrics here.
c.logger.WithError(err).Error("Could not convert event to parquet format")
continue
}
date := pqtEvent.GetDate()
pw := perDateWriter[date]
if pw == nil {
fw, err := newPerDateFileWriterFn(ctx, date)
if err != nil {
// The S3 file writer implementation does not return an error
// when the file writer is created.
return nil, trace.Wrap(err)
}
pw, err = newParquetWriter(ctx, fw)
if err != nil {
// An error here most likely means something is wrong with the
// parquet schema, so returning an error from the function makes sense.
return nil, trace.Wrap(err)
}
perDateWriter[date] = pw
}
size++
c.logger.Debugf("Received event: %s %s", eventAndAckID.event.GetID(), eventAndAckID.event.GetType())
if err := pw.Write(ctx, *pqtEvent); err != nil {
// pw.Write returns an error only on a flush operation, which does
// not happen on every write, so there is no easy way to tell which
// event caused the problem and skip it.
// That means one bad entry can fail the write of the whole batch.
// It should not happen often because messages are validated earlier;
// if it does happen, the whole batch will go to the dead-letter queue.
// TODO(tobiaszheller): check how other parquet libs are handling it.
// or maybe use Flush explicitly. Need to check performance.
return nil, trace.Wrap(err)
}

// Receipt handles are only collected into the slice here. Acknowledgement
// happens only if the whole writeToS3 call succeeds.
toDelete = append(toDelete, eventAndAckID.receiptHandle)
}
}

for _, pw := range perDateWriter {
if err := pw.Close(); err != nil {
// Typically there will be data for just one date.
// If we are not able to close the parquet file, it makes sense to return
// an error and retry the whole batch from SQS.

// TODO(tobiaszheller): verify if broken files are removed from s3.
return nil, trace.Wrap(err)
}
}
return toDelete, nil
}

func newParquetWriter(ctx context.Context, fw source.ParquetFile) (*parquetWriter, error) {
// numberOfWorkersMarshalingParquet defines how many goroutines
// marshal objects. It follows the example from xitongsys/parquet-go, which uses 4.
const numberOfWorkersMarshalingParquet = 4
pw, err := writer.NewParquetWriter(fw, new(eventParquet), numberOfWorkersMarshalingParquet)
if err != nil {
return nil, trace.Wrap(err)
}
pw.CompressionType = parquet.CompressionCodec_SNAPPY

return &parquetWriter{
closer: fw,
writer: pw,
}, nil
}

type parquetWriter struct {
closer io.Closer
writer *writer.ParquetWriter
}

func (pw *parquetWriter) Write(ctx context.Context, in eventParquet) error {
return trace.Wrap(pw.writer.Write(in))
}

func (pw *parquetWriter) Close() error {
if err := pw.writer.WriteStop(); err != nil {
return trace.NewAggregate(err, pw.closer.Close())
}
return trace.Wrap(pw.closer.Close())
}
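
The per-date buffering above keeps one parquet writer per event date and only reports receipt handles back once every file has been flushed and closed. Below is a minimal, self-contained sketch of that pattern; it reuses the same xitongsys/parquet-go calls that appear in this PR, but with a local file writer instead of S3, and the record type and output paths are illustrative only.

```go
package main

import (
	"fmt"
	"time"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/parquet"
	"github.com/xitongsys/parquet-go/source"
	"github.com/xitongsys/parquet-go/writer"
)

// record is an illustrative row type; the real schema is eventParquet in types.go.
type record struct {
	EventType string `parquet:"name=event_type, type=BYTE_ARRAY, convertedtype=UTF8"`
	EventTime int64  `parquet:"name=event_time, type=INT64, convertedtype=TIMESTAMP_MILLIS"`
}

func main() {
	now := time.Now()
	events := []record{
		{EventType: "app.create", EventTime: now.UnixMilli()},
		{EventType: "app.create", EventTime: now.Add(24 * time.Hour).UnixMilli()},
	}

	// One writer per date, mirroring the perDateWriter map in writeToS3.
	writers := map[string]*writer.ParquetWriter{}
	files := map[string]source.ParquetFile{}

	for _, ev := range events {
		date := time.UnixMilli(ev.EventTime).Format(time.DateOnly)
		pw, ok := writers[date]
		if !ok {
			fw, err := local.NewLocalFileWriter(date + ".parquet")
			if err != nil {
				panic(err)
			}
			pw, err = writer.NewParquetWriter(fw, new(record), 4)
			if err != nil {
				panic(err)
			}
			pw.CompressionType = parquet.CompressionCodec_SNAPPY
			writers[date], files[date] = pw, fw
		}
		if err := pw.Write(ev); err != nil {
			panic(err)
		}
	}

	// Flush and close every file; only after this point would the real
	// consumer acknowledge the corresponding SQS messages.
	for date, pw := range writers {
		if err := pw.WriteStop(); err != nil {
			panic(err)
		}
		if err := files[date].Close(); err != nil {
			panic(err)
		}
		fmt.Println("wrote", date+".parquet")
	}
}
```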
73 changes: 73 additions & 0 deletions lib/events/athena/consumer_test.go
Original file line number Diff line number Diff line change
@@ -22,6 +22,8 @@ import (
"errors"
"fmt"
"math/big"
"os"
"path/filepath"
"strings"
"sync"
"testing"
@@ -36,6 +38,8 @@ import (
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/source"

apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
@@ -500,3 +504,72 @@ func TestErrHandlingFnFromSQS(t *testing.T) {
require.Contains(t, buf.String(), "printed only first")
})
}

// TestConsumerWriteToS3 verifies that writing parquet files per date works.
// It receives events from different dates, makes sure that multiple
// files are created, and compares them against the files in testdata.
// Testdata files should be verified with "parquet tools" cli after changing.
func TestConsumerWriteToS3(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

tmp := t.TempDir()
localWriter := func(ctx context.Context, date string) (source.ParquetFile, error) {
err := os.MkdirAll(filepath.Join(tmp, date), 0o777)
if err != nil {
return nil, err
}
localW, err := local.NewLocalFileWriter(filepath.Join(tmp, date, "test.parquet"))
return localW, err
}

april1st2023AfternoonStr := "2023-04-01T16:20:50.52Z"
april1st2023Afternoon, err := time.Parse(time.RFC3339, april1st2023AfternoonStr)
require.NoError(t, err)

makeAppCreateEventWithTime := func(t time.Time, name string) apievents.AuditEvent {
return &apievents.AppCreate{Metadata: apievents.Metadata{Type: events.AppCreateEvent, Time: t}, AppMetadata: apievents.AppMetadata{AppName: name}}
}

events := []eventAndAckID{
{receiptHandle: "r1", event: makeAppCreateEventWithTime(april1st2023Afternoon, "app-1")},
{receiptHandle: "r2", event: makeAppCreateEventWithTime(april1st2023Afternoon.Add(10*time.Second), "app-2")},
// r3 falls on the next date, so it should be written to a separate file.
{receiptHandle: "r3", event: makeAppCreateEventWithTime(april1st2023Afternoon.Add(18*time.Hour), "app3")},
}

eventsC := make(chan eventAndAckID, 100)
go func() {
for _, e := range events {
eventsC <- e
}
close(eventsC)
}()

c := &consumer{}
gotHandlesToDelete, err := c.writeToS3(ctx, eventsC, localWriter)
require.NoError(t, err)
// Make sure that all events are marked to delete.
require.Equal(t, []string{"r1", "r2", "r3"}, gotHandlesToDelete)

// verify that both files for 2023-04-01 and 2023-04-02 were written and
// that they are equal to the test data.
type wantGot struct {
wantFilepath string
gotFile string
}
toCheck := []wantGot{
{wantFilepath: filepath.Join("testdata/events_2023-04-01.parquet"), gotFile: filepath.Join(tmp, "2023-04-01", "test.parquet")},
{wantFilepath: filepath.Join("testdata/events_2023-04-02.parquet"), gotFile: filepath.Join(tmp, "2023-04-02", "test.parquet")},
}

for _, v := range toCheck {
t.Run("Checking "+filepath.Base(v.wantFilepath), func(t *testing.T) {
got, err := os.ReadFile(v.gotFile)
require.NoError(t, err)
want, err := os.ReadFile(v.wantFilepath)
require.NoError(t, err)
require.Empty(t, cmp.Diff(got, want))
})
}
}
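
The committed testdata parquet files are binary, so reviews rely on the "parquet tools" CLI mentioned in the test comment. As an alternative, a small throwaway program along the lines below can dump a file's rows; it uses the reader side of the same xitongsys/parquet-go library, which is not part of this PR, so treat it as an assumed but conventional usage rather than project code.

```go
package main

import (
	"fmt"
	"os"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/reader"
)

// row mirrors the eventParquet columns defined in types.go.
type row struct {
	EventType string `parquet:"name=event_type, type=BYTE_ARRAY, convertedtype=UTF8"`
	EventTime int64  `parquet:"name=event_time, type=INT64, convertedtype=TIMESTAMP_MILLIS"`
	UID       string `parquet:"name=uid, type=BYTE_ARRAY, convertedtype=UTF8"`
	SessionID string `parquet:"name=session_id, type=BYTE_ARRAY, convertedtype=UTF8"`
	EventData string `parquet:"name=event_data, type=BYTE_ARRAY, convertedtype=UTF8"`
}

// Usage: go run main.go lib/events/athena/testdata/events_2023-04-01.parquet
func main() {
	fr, err := local.NewLocalFileReader(os.Args[1])
	if err != nil {
		panic(err)
	}
	defer fr.Close()

	pr, err := reader.NewParquetReader(fr, new(row), 4)
	if err != nil {
		panic(err)
	}
	defer pr.ReadStop()

	rows := make([]row, pr.GetNumRows())
	if err := pr.Read(&rows); err != nil {
		panic(err)
	}
	for _, r := range rows {
		fmt.Printf("%s %d %s %s %s\n", r.EventType, r.EventTime, r.UID, r.SessionID, r.EventData)
	}
}
```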
lib/events/athena/testdata/events_2023-04-01.parquet
Binary file not shown.
lib/events/athena/testdata/events_2023-04-02.parquet
Binary file not shown.
54 changes: 54 additions & 0 deletions lib/events/athena/types.go
@@ -0,0 +1,54 @@
// Copyright 2023 Gravitational, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package athena

import (
"time"

"github.com/gravitational/trace"

apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/utils"
)

// TODO(tobiaszheller): pass user at some point.
type eventParquet struct {
EventType string `parquet:"name=event_type, type=BYTE_ARRAY, convertedtype=UTF8"`
// TODO(tobiaszheller): decide what timestamp precision we want. AWS supports micros, maybe we can use it instead of millis?
EventTime int64 `parquet:"name=event_time, type=INT64, convertedtype=TIMESTAMP_MILLIS"`
UID string `parquet:"name=uid, type=BYTE_ARRAY, convertedtype=UTF8"`
SessionID string `parquet:"name=session_id, type=BYTE_ARRAY, convertedtype=UTF8"`
EventData string `parquet:"name=event_data, type=BYTE_ARRAY, convertedtype=UTF8"`
}

func (e eventParquet) GetDate() string {
return time.UnixMilli(e.EventTime).Format(time.DateOnly)
}

func auditEventToParquet(event apievents.AuditEvent) (*eventParquet, error) {
jsonBlob, err := utils.FastMarshal(event)
if err != nil {
return nil, trace.Wrap(err)
}

return &eventParquet{
EventType: event.GetType(),
EventTime: event.GetTime().UnixMilli(),
UID: event.GetID(),
SessionID: events.GetSessionID(event),
EventData: string(jsonBlob),
}, nil
}
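
To make the mapping concrete, here is a short sketch of how a single audit event would populate these columns and which partition GetDate selects. encoding/json stands in for utils.FastMarshal, the stand-in event type only carries the fields used here, and all values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// appCreate stands in for the apievents.AuditEvent fields used by auditEventToParquet.
type appCreate struct {
	Type string    `json:"type"`
	UID  string    `json:"uid"`
	Time time.Time `json:"time"`
	App  string    `json:"app_name"`
}

func main() {
	ev := appCreate{
		Type: "app.create",
		UID:  "11111111-2222-3333-4444-555555555555",
		Time: time.Date(2023, 4, 1, 16, 20, 50, 0, time.UTC),
		App:  "app-1",
	}

	// The event is stored twice: flattened columns for querying plus the
	// full JSON blob in event_data (utils.FastMarshal in the real code).
	blob, err := json.Marshal(ev)
	if err != nil {
		panic(err)
	}

	eventTimeMillis := ev.Time.UnixMilli()
	// GetDate derives the per-date S3 prefix / partition from event_time.
	date := time.UnixMilli(eventTimeMillis).Format(time.DateOnly)

	fmt.Println("event_type:", ev.Type)         // app.create
	fmt.Println("event_time:", eventTimeMillis) // 1680366050000
	fmt.Println("event_data:", string(blob))
	fmt.Println("date partition:", date) // 2023-04-01
}
```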