Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,14 @@ require (
github.com/russellhaering/goxmldsig v1.4.0
github.com/sashabaranov/go-openai v1.10.1
github.com/schollz/progressbar/v3 v3.13.1
github.com/segmentio/parquet-go v0.0.0-20230622230624-510764ae9e80
github.com/sethvargo/go-diceware v0.3.0
github.com/sirupsen/logrus v1.9.3
github.com/snowflakedb/gosnowflake v1.6.22
github.com/spf13/cobra v1.7.0
github.com/stretchr/testify v1.8.3
github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb
github.com/vulcand/predicate v1.2.0 // replaced
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20230312005205-fbbcdea5f512
go.etcd.io/etcd/api/v3 v3.5.9
go.etcd.io/etcd/client/v3 v3.5.9
go.mongodb.org/mongo-driver v1.12.0
Expand Down Expand Up @@ -214,7 +213,6 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
github.com/apache/arrow/go/v12 v12.0.0 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 // indirect
Expand Down Expand Up @@ -315,6 +313,7 @@ require (
github.com/kr/text v0.2.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/kyroy/priority-queue v0.0.0-20180327160706-6e21825e7e0c // indirect
github.com/lib/pq v1.10.4 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
github.com/lithammer/dedent v1.1.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand All @@ -337,6 +336,7 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/nsf/termbox-go v1.1.1 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/patrickmn/go-cache v0.0.0-20180815053127-5633e0862627 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
Expand All @@ -355,6 +355,7 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 // indirect
github.com/sasha-s/go-deadlock v0.0.0-20180226215254-237a9547c8a5 // indirect
github.com/segmentio/encoding v0.3.5 // indirect
github.com/shabbyrobe/gocovmerge v0.0.0-20190829150210-3e036491d500 // indirect
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 // indirect
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 // indirect
Expand Down
255 changes: 9 additions & 246 deletions go.sum

Large diffs are not rendered by default.

35 changes: 12 additions & 23 deletions lib/events/athena/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,8 @@ import (
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/prometheus/client_golang/prometheus"
"github.com/segmentio/parquet-go"
log "github.com/sirupsen/logrus"
"github.com/xitongsys/parquet-go-source/s3v2"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/source"
"github.com/xitongsys/parquet-go/writer"
"golang.org/x/exp/slices"

"github.com/gravitational/teleport"
Expand Down Expand Up @@ -73,7 +70,7 @@ type consumer struct {

// perDateFileParquetWriter returns file writer per date.
// Added in config to allow testing.
perDateFileParquetWriter func(ctx context.Context, date string) (source.ParquetFile, error)
perDateFileParquetWriter func(ctx context.Context, date string) (io.WriteCloser, error)

collectConfig sqsCollectConfig

Expand Down Expand Up @@ -135,9 +132,9 @@ func newConsumer(cfg Config, cancelFn context.CancelFunc, metricConsumerBatchPro
collectConfig: collectCfg,
sqsDeleter: sqsClient,
queueURL: cfg.QueueURL,
perDateFileParquetWriter: func(ctx context.Context, date string) (source.ParquetFile, error) {
perDateFileParquetWriter: func(ctx context.Context, date string) (io.WriteCloser, error) {
key := fmt.Sprintf("%s/%s/%s.parquet", cfg.locationS3Prefix, date, uuid.NewString())
fw, err := s3v2.NewS3FileWriterWithClient(ctx, s3client, cfg.locationS3Bucket, key, nil /* uploader options */, func(poi *s3.PutObjectInput) {
fw, err := awsutils.NewS3V2FileWriter(ctx, s3client, cfg.locationS3Bucket, key, nil /* uploader options */, func(poi *s3.PutObjectInput) {
// ChecksumAlgorithm is required for putting objects when object lock is enabled.
poi.ChecksumAlgorithm = s3Types.ChecksumAlgorithmSha256
})
Expand Down Expand Up @@ -707,7 +704,7 @@ func (s *sqsMessagesCollector) downloadEventFromS3(ctx context.Context, payload
// to s3 bucket. It returns receiptHandles of elements to delete from queue.
// If error is returned, it means that messages won't be deleted from SQS,
// and events will be retried or go to dead-letter queue.
func (c *consumer) writeToS3(ctx context.Context, eventsCh <-chan eventAndAckID, newPerDateFileWriterFn func(ctx context.Context, date string) (source.ParquetFile, error)) ([]string, error) {
func (c *consumer) writeToS3(ctx context.Context, eventsCh <-chan eventAndAckID, newPerDateFileWriterFn func(ctx context.Context, date string) (io.WriteCloser, error)) ([]string, error) {
toDelete := make([]string, 0, c.batchMaxItems)
// TODO(tobiaszheller): later write in goroutine, so far it's not bottleneck.
perDateWriter := map[string]*parquetWriter{}
Expand All @@ -725,7 +722,7 @@ eventLoop:
c.logger.WithError(err).Error("Could not convert event to parquet format")
continue
}
date := pqtEvent.GetDate()
date := pqtEvent.EventTime.Format(time.DateOnly)
pw := perDateWriter[date]
if pw == nil {
fw, err := newPerDateFileWriterFn(ctx, date)
Expand Down Expand Up @@ -771,23 +768,14 @@ eventLoop:
// If we are not able to close the parquet file, it makes sense to return
// an error and retry the whole batch again from SQS.

// TODO(tobiaszheller): verify if broken files are removed from s3.
return nil, trace.Wrap(err)
}
}
return toDelete, nil
}

func newParquetWriter(ctx context.Context, fw source.ParquetFile) (*parquetWriter, error) {
// numberOfWorkersMarshalingParquet defines number how many goroutines
// will do marshaling of objects. I have followed example from xitongsys/parquet-go, where they use 4.
const numberOfWorkersMarshalingParquet = 4
pw, err := writer.NewParquetWriter(fw, new(eventParquet), numberOfWorkersMarshalingParquet)
if err != nil {
return nil, trace.Wrap(err)
}
pw.CompressionType = parquet.CompressionCodec_SNAPPY

func newParquetWriter(ctx context.Context, fw io.WriteCloser) (*parquetWriter, error) {
pw := parquet.NewGenericWriter[eventParquet](fw, parquet.Compression(&parquet.Snappy))
return &parquetWriter{
closer: fw,
writer: pw,
Expand All @@ -796,15 +784,16 @@ func newParquetWriter(ctx context.Context, fw source.ParquetFile) (*parquetWrite

type parquetWriter struct {
closer io.Closer
writer *writer.ParquetWriter
writer *parquet.GenericWriter[eventParquet]
}

func (pw *parquetWriter) Write(ctx context.Context, in eventParquet) error {
return trace.Wrap(pw.writer.Write(in))
_, err := pw.writer.Write([]eventParquet{in})
return trace.Wrap(err)
}

func (pw *parquetWriter) Close() error {
if err := pw.writer.WriteStop(); err != nil {
if err := pw.writer.Close(); err != nil {
return trace.NewAggregate(err, pw.closer.Close())
}
return trace.Wrap(pw.closer.Close())
Expand Down
66 changes: 48 additions & 18 deletions lib/events/athena/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"io"
"math/big"
"os"
"path/filepath"
Expand All @@ -37,9 +38,8 @@ import (
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/prometheus/client_golang/prometheus"
"github.com/segmentio/parquet-go"
"github.com/stretchr/testify/require"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/source"

apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/backend/memory"
Expand Down Expand Up @@ -600,12 +600,12 @@ func TestConsumerWriteToS3(t *testing.T) {
defer cancel()

tmp := t.TempDir()
localWriter := func(ctx context.Context, date string) (source.ParquetFile, error) {
localWriter := func(ctx context.Context, date string) (io.WriteCloser, error) {
err := os.MkdirAll(filepath.Join(tmp, date), 0o777)
if err != nil {
return nil, err
}
localW, err := local.NewLocalFileWriter(filepath.Join(tmp, date, "test.parquet"))
localW, err := os.Create(filepath.Join(tmp, date, "test.parquet"))
return localW, err
}

Expand All @@ -617,11 +617,15 @@ func TestConsumerWriteToS3(t *testing.T) {
return &apievents.AppCreate{Metadata: apievents.Metadata{Type: events.AppCreateEvent, Time: t}, AppMetadata: apievents.AppMetadata{AppName: name}}
}

eventR1 := makeAppCreateEventWithTime(april1st2023Afternoon, "app-1")
eventR2 := makeAppCreateEventWithTime(april1st2023Afternoon.Add(10*time.Second), "app-2")
// r3 date is next date, so it should be written as separate file.
eventR3 := makeAppCreateEventWithTime(april1st2023Afternoon.Add(18*time.Hour), "app3")

events := []eventAndAckID{
{receiptHandle: "r1", event: makeAppCreateEventWithTime(april1st2023Afternoon, "app-1")},
{receiptHandle: "r2", event: makeAppCreateEventWithTime(april1st2023Afternoon.Add(10*time.Second), "app-2")},
// r3 date is next date, so it should be written as separate file.
{receiptHandle: "r3", event: makeAppCreateEventWithTime(april1st2023Afternoon.Add(18*time.Hour), "app3")},
{receiptHandle: "r1", event: eventR1},
{receiptHandle: "r2", event: eventR2},
{receiptHandle: "r3", event: eventR3},
}

eventsC := make(chan eventAndAckID, 100)
Expand All @@ -638,28 +642,54 @@ func TestConsumerWriteToS3(t *testing.T) {
// Make sure that all events are marked to delete.
require.Equal(t, []string{"r1", "r2", "r3"}, gotHandlesToDelete)

// vefiry that both files for 2023-04-01 and 2023-04-02 were written and
// if they are equal to test data.
// verify that both files for 2023-04-01 and 2023-04-02 were written and
// if they contain audit events.
type wantGot struct {
wantFilepath string
gotFile string
name string
wantEvents []apievents.AuditEvent
gotFile string
}
toCheck := []wantGot{
{wantFilepath: filepath.Join("testdata/events_2023-04-01.parquet"), gotFile: filepath.Join(tmp, "2023-04-01", "test.parquet")},
{wantFilepath: filepath.Join("testdata/events_2023-04-02.parquet"), gotFile: filepath.Join(tmp, "2023-04-02", "test.parquet")},
Comment on lines 648 to 649
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have reworked the tests, because what we really want is to check that two parquet files with events are created and that the events are stored there.
Generating parquet files and comparing them to testdata is already done in the library.

{
name: "2023-04-01 should contain 2 events",
wantEvents: []apievents.AuditEvent{eventR1, eventR2},
gotFile: filepath.Join(tmp, "2023-04-01", "test.parquet"),
},
{
name: "2023-04-02 should contain 1 events",
wantEvents: []apievents.AuditEvent{eventR3},
gotFile: filepath.Join(tmp, "2023-04-02", "test.parquet"),
},
}

for _, v := range toCheck {
t.Run("Checking "+filepath.Base(v.wantFilepath), func(t *testing.T) {
got, err := os.ReadFile(v.gotFile)
t.Run("Checking "+v.name, func(t *testing.T) {
rows, err := parquet.ReadFile[eventParquet](v.gotFile)
require.NoError(t, err)
want, err := os.ReadFile(v.wantFilepath)
gotEvents, err := parquetRowsToAuditEvents(rows)
require.NoError(t, err)
require.Empty(t, cmp.Diff(got, want))

require.Empty(t, cmp.Diff(gotEvents, v.wantEvents))
})
}
}

// parquetRowsToAuditEvents turns parquet rows back into audit events, keeping
// the order of the input rows. For every row the EventData payload is
// unmarshaled into event fields, which are then converted to a typed audit
// event. The first row that fails either step aborts the conversion and its
// error is returned together with a nil slice.
func parquetRowsToAuditEvents(in []eventParquet) ([]apievents.AuditEvent, error) {
	converted := make([]apievents.AuditEvent, len(in))
	for i, row := range in {
		var eventFields events.EventFields
		if unmarshalErr := utils.FastUnmarshal([]byte(row.EventData), &eventFields); unmarshalErr != nil {
			return nil, trace.Wrap(unmarshalErr, "failed to unmarshal event, %s", row.EventData)
		}
		auditEvent, convErr := events.FromEventFields(eventFields)
		if convErr != nil {
			return nil, trace.Wrap(convErr)
		}
		converted[i] = auditEvent
	}
	return converted, nil
}

func TestDeleteMessagesFromQueue(t *testing.T) {
t.Parallel()
ctx := context.Background()
Expand Down
2 changes: 2 additions & 0 deletions lib/events/athena/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/gravitational/teleport/lib/backend/memory"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/events/test"
"github.com/gravitational/teleport/lib/observability/tracing"
)

type athenaContext struct {
Expand Down Expand Up @@ -431,6 +432,7 @@ func (ac *athenaContext) setupInfraWithCleanup(t *testing.T, ctx context.Context
querierConfig: querierConfig{
getQueryResultsInterval: 100 * time.Millisecond,
clock: ac.clock,
tracer: tracing.NoopTracer(teleport.ComponentAthena),
},
}
err = q.waitForSuccess(ctx, aws.ToString(startQueryExecResp.QueryExecutionId))
Expand Down
Binary file removed lib/events/athena/testdata/events_2023-04-01.parquet
Binary file not shown.
Binary file removed lib/events/athena/testdata/events_2023-04-02.parquet
Binary file not shown.
19 changes: 7 additions & 12 deletions lib/events/athena/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,12 @@ import (
)

type eventParquet struct {
EventType string `parquet:"name=event_type, type=BYTE_ARRAY, convertedtype=UTF8"`
// TODO(tobiaszheller): what precision of timestamp we want. AWS supports micros, maybe we can use it instead of mili?
EventTime int64 `parquet:"name=event_time, type=INT64, convertedtype=TIMESTAMP_MILLIS"`
UID string `parquet:"name=uid, type=BYTE_ARRAY, convertedtype=UTF8"`
SessionID string `parquet:"name=session_id, type=BYTE_ARRAY, convertedtype=UTF8"`
User string `parquet:"name=user, type=BYTE_ARRAY, convertedtype=UTF8"`
EventData string `parquet:"name=event_data, type=BYTE_ARRAY, convertedtype=UTF8"`
}

func (e eventParquet) GetDate() string {
return time.UnixMilli(e.EventTime).Format(time.DateOnly)
EventType string `parquet:"event_type"`
EventTime time.Time `parquet:"event_time,timestamp(millisecond)"`
UID string `parquet:"uid"`
SessionID string `parquet:"session_id"`
User string `parquet:"user"`
EventData string `parquet:"event_data"`
}

func auditEventToParquet(event apievents.AuditEvent) (*eventParquet, error) {
Expand All @@ -46,7 +41,7 @@ func auditEventToParquet(event apievents.AuditEvent) (*eventParquet, error) {

return &eventParquet{
EventType: event.GetType(),
EventTime: event.GetTime().UnixMilli(),
EventTime: event.GetTime().UTC(),
UID: event.GetID(),
SessionID: events.GetSessionID(event),
User: events.GetTeleportUser(event),
Expand Down
66 changes: 66 additions & 0 deletions lib/utils/aws/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
package aws

import (
"context"
"errors"
"io"

awsV2 "github.com/aws/aws-sdk-go-v2/aws"
managerV2 "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
s3v2 "github.com/aws/aws-sdk-go-v2/service/s3"
s3Types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
Expand Down Expand Up @@ -62,3 +67,64 @@ func ConvertS3Error(err error, args ...interface{}) error {
}
return err
}

// s3V2FileWriter exposes an S3 upload through the io.WriteCloser interface:
// data handed to Write travels through an in-memory pipe, and Close waits for
// the upload result.
type s3V2FileWriter struct {
	// pipeWriter is the write half of the pipe; Write forwards data to it.
	pipeWriter *io.PipeWriter
	// pipeReader is the read half of the pipe.
	pipeReader *io.PipeReader
	// uploadFinisherErrChan delivers the outcome of the upload. Receiving
	// from it both waits for the upload to complete and yields its error.
	uploadFinisherErrChan <-chan error
}

// NewS3V2FileWriter created s3V2FileWriter. Close method on writer should be called
// to make sure that reader has finished.
func NewS3V2FileWriter(ctx context.Context, s3Client managerV2.UploadAPIClient, bucket, key string, uploaderOptions []func(*managerV2.Uploader), putObjectInputOptions ...func(*s3v2.PutObjectInput)) (*s3V2FileWriter, error) {
uploader := managerV2.NewUploader(s3Client, uploaderOptions...)
pr, pw := io.Pipe()

uploadParams := &s3v2.PutObjectInput{
Bucket: awsV2.String(bucket),
Key: awsV2.String(key),
Body: pr,
}

for _, f := range putObjectInputOptions {
f(uploadParams)
}
uploadFinisherErrChan := make(chan error)
go func() {
defer close(uploadFinisherErrChan)
_, err := uploader.Upload(ctx, uploadParams)
if err != nil {
pr.CloseWithError(err)
}
uploadFinisherErrChan <- trace.Wrap(err)
Comment thread
tobiaszheller marked this conversation as resolved.
Outdated
}()

return &s3V2FileWriter{
uploadFinisherErrChan: uploadFinisherErrChan,
pipeWriter: pw,
pipeReader: pr,
}, nil
}

// Write forwards in to the write side of the pipe and reports how many bytes
// were accepted. When the pipe write fails, the pipe is closed with that same
// error before the error is returned to the caller.
func (s *s3V2FileWriter) Write(in []byte) (int, error) {
	n, err := s.pipeWriter.Write(in)
	if err == nil {
		return n, nil
	}
	s.pipeWriter.CloseWithError(err)
	return n, err
}

// Close signals write completion and cleans up any
// open streams. Will block until pending uploads are complete.
func (s *s3V2FileWriter) Close() error {
wCloseErr := s.pipeWriter.Close()
// wait for reader to finish, it will be triggered by writer.Close
readerErr := <-s.uploadFinisherErrChan
rCloseErr := s.pipeReader.Close()
return trace.Wrap(trace.NewAggregate(wCloseErr, readerErr, rCloseErr))
Comment thread
tobiaszheller marked this conversation as resolved.
Outdated
}
Loading