Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/rds v1.43.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.32.0
github.com/aws/aws-sdk-go-v2/service/sns v1.20.8
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.6
github.com/aws/aws-sdk-go-v2/service/sts v1.18.9
github.com/aws/aws-sigv4-auth-cassandra-gocql-driver-plugin v0.0.0-20220331165046-e4d000c0d6a6
github.com/beevik/etree v1.1.0
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ github.com/aws/aws-sdk-go v1.44.244 h1:QzBWLD5HjZHdRZyTMTOWtD9Pobzf1n8/CeTJB4giX
github.com/aws/aws-sdk-go v1.44.244/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/aws/aws-sdk-go-v2 v1.17.3/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2 v1.17.7/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2 v1.17.8 h1:GMupCNNI7FARX27L7GjCJM8NgivWbRgpjNI/hOQjFS8=
github.com/aws/aws-sdk-go-v2 v1.17.8/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 h1:dK82zF6kkPeCo8J1e+tGx4JdvDIQzj7ygIoLg8WMuGs=
Expand All @@ -179,9 +180,11 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.2/go.mod h1:cDh1p6XkSGSwSRIA
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.62 h1:LhVbe/UDWvBT/jp5LYAweFVH8s+DNtT07Qp2riWEovU=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.62/go.mod h1:4xCuu1TSwhW5UH6WOdtS4/x/9UfMr2XplzKc86Ffj78=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.27/go.mod h1:a1/UpzeyBBerajpnP5nGZa9mGzsBn5cOKxm6NWQsvoI=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.31/go.mod h1:QT0BqUvX1Bh2ABdTGnjqEjvjzrCfIniM9Sc8zn9Yndo=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.32 h1:dpbVNUjczQ8Ae3QKHbpHBpfvaVkRdesxpTOe9pTouhU=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.32/go.mod h1:RudqOgadTWdcS3t/erPQo24pcVEoYyqj/kKW5Vya21I=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.21/go.mod h1:+Gxn8jYn5k9ebfHEqlhrMirFjSW0v0C9fI+KN5vk2kE=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.25/go.mod h1:zBHOPwhBc3FlQjQJE/D3IfPWiWaQmT06Vq9aNukDo0k=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.26 h1:QH2kOS3Ht7x+u0gHCh06CXL/h6G8LQJFpZfFBYBNboo=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.26/go.mod h1:vq86l7956VgFr0/FWQ2BWnK07QC3WYsepKzy33qqY5U=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28/go.mod h1:yRZVr/iT0AqyHeep00SZ4YfBAKojXz08w3XMBscdi0c=
Expand Down Expand Up @@ -209,6 +212,8 @@ github.com/aws/aws-sdk-go-v2/service/s3 v1.32.0 h1:NAc8WQsVQ3+kz3rU619mlz8NcbpZI
github.com/aws/aws-sdk-go-v2/service/s3 v1.32.0/go.mod h1:aSl9/LJltSz1cVusiR/Mu8tvI4Sv/5w/WWrJmmkNii0=
github.com/aws/aws-sdk-go-v2/service/sns v1.20.8 h1:wy1jYAot40/Odzpzeq9S3OfSddJJ5RmpaKujvj5Hz7k=
github.com/aws/aws-sdk-go-v2/service/sns v1.20.8/go.mod h1:HmCFGnmh0Tx4Onh9xUklrVhNcCsBTeDx4n53WGhp+oY=
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.6 h1:4P/vyx7zCI5yBhlDZ2kwhoLjMJi0X7iR3cxqjNfbego=
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.6/go.mod h1:HQHh1eChX10zDnGmD53WLYk8nPhUKO/JkAUUzDZ530Y=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.0/go.mod h1:wo/B7uUm/7zw/dWhBJ4FXuw1sySU5lyIhVg1Bu2yL9A=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.8 h1:5cb3D6xb006bPTqEfCNaEA6PPEfBXxxy4NNeX/44kGk=
github.com/aws/aws-sdk-go-v2/service/sso v1.12.8/go.mod h1:GNIveDnP+aE3jujyUSH5aZ/rktsTM5EvtKnCqBZawdw=
Expand Down
57 changes: 42 additions & 15 deletions lib/events/athena/athena.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ import (
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/utils"
)

const (
// TODO(tobiaszheller): move to batcher.go in other PR.
// maxWaitTimeOnReceiveMessageFromSQS defines how long single
// receiveFromQueue will wait if there is no max events (10).
maxWaitTimeOnReceiveMessageFromSQS = 5 * time.Second
// defaultBatchItems defines default value for batch items count.
// 20000 items, per average 500KB event size = 10MB
defaultBatchItems = 20000
// defaultBatchInterval defines default batch interval.
defaultBatchInterval = 1 * time.Minute
)

// Config structure represents Athena configuration.
Expand Down Expand Up @@ -67,7 +69,10 @@ type Config struct {
TableName string
// LocationS3 is location on S3 where Parquet files partitioned by date are
// stored (required).
LocationS3 string
LocationS3 string
locationS3Bucket string
locationS3Prefix string

// QueryResultsS3 is location on S3 where Athena stored query results (optional).
// Default results path can be defined by in workgroup settings.
QueryResultsS3 string
Expand Down Expand Up @@ -102,6 +107,8 @@ type Config struct {
// using aws-sdk-go-v2.
AWSConfig *aws.Config

Backend backend.Backend

// TODO(tobiaszheller): add FIPS config in later phase.
}

Expand Down Expand Up @@ -139,9 +146,15 @@ func (cfg *Config) CheckAndSetDefaults(ctx context.Context) error {
if cfg.LocationS3 == "" {
return trace.BadParameter("LocationS3 is not specified")
}
if scheme, ok := isValidUrlWithScheme(cfg.LocationS3); !ok || scheme != "s3" {
return trace.BadParameter("LocationS3 must be valid url and start with s3")
locationS3URL, err := url.Parse(cfg.LocationS3)
if err != nil {
return trace.BadParameter("LocationS3 must be valid url")
}
if locationS3URL.Scheme != "s3" {
return trace.BadParameter("LocationS3 must starts with s3://")
}
cfg.locationS3Bucket = locationS3URL.Host
cfg.locationS3Prefix = strings.TrimSuffix(strings.TrimPrefix(locationS3URL.Path, "/"), "/")

if cfg.LargeEventsS3 == "" {
return trace.BadParameter("LargeEventsS3 is not specified")
Expand Down Expand Up @@ -169,12 +182,11 @@ func (cfg *Config) CheckAndSetDefaults(ctx context.Context) error {
}

if cfg.BatchMaxItems == 0 {
// 20000 items, per average 500KB event size = 10MB
cfg.BatchMaxItems = 20000
cfg.BatchMaxItems = defaultBatchItems
}

if cfg.BatchMaxInterval == 0 {
cfg.BatchMaxInterval = 1 * time.Minute
cfg.BatchMaxInterval = defaultBatchInterval
}

if cfg.BatchMaxInterval < maxWaitTimeOnReceiveMessageFromSQS {
Expand Down Expand Up @@ -227,6 +239,10 @@ func (cfg *Config) CheckAndSetDefaults(ctx context.Context) error {
cfg.AWSConfig = &awsCfg
}

if cfg.Backend == nil {
return trace.BadParameter("Backend cannot be nil")
}

return nil
}

Expand Down Expand Up @@ -316,8 +332,9 @@ func (cfg *Config) SetFromURL(url *url.URL) error {
// Parquet and send it to S3 for long term storage.
// Athena is used for quering Parquet files on S3.
type Log struct {
publisher *publisher
querier *querier
publisher *publisher
querier *querier
consumerStop context.CancelFunc
}

// New creates an instance of an Athena based audit log.
Expand All @@ -326,12 +343,14 @@ func New(ctx context.Context, cfg Config) (*Log, error) {
if err != nil {
return nil, trace.Wrap(err)
}

consumerCtx, consumerCancel := context.WithCancel(ctx)

l := &Log{
publisher: newPublisher(cfg),
publisher: newPublisher(cfg),
consumerStop: consumerCancel,
}

// TODO(tobiaszheller): initialize batcher

l.querier, err = newQuerier(querierConfig{
tablename: cfg.TableName,
database: cfg.Database,
Expand All @@ -346,6 +365,13 @@ func New(ctx context.Context, cfg Config) (*Log, error) {
return nil, trace.Wrap(err)
}

consumer, err := newConsumer(cfg)
if err != nil {
return nil, trace.Wrap(err)
}

go consumer.run(consumerCtx)

return l, nil
}

Expand All @@ -362,6 +388,7 @@ func (l *Log) SearchSessionEvents(fromUTC, toUTC time.Time, limit int, order typ
}

func (l *Log) Close() error {
l.consumerStop()
Comment thread
rosstimothy marked this conversation as resolved.
return nil
}

Expand Down
135 changes: 134 additions & 1 deletion lib/events/athena/athena_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,34 @@ package athena

import (
"context"
"errors"
"io"
"net/url"
"os"
"sort"
"strings"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"github.com/stretchr/testify/require"

apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/utils"
)

func TestMain(m *testing.M) {
utils.InitLoggerForTests()
os.Exit(m.Run())
}

func TestConfig_SetFromURL(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -104,6 +122,10 @@ func TestConfig_SetFromURL(t *testing.T) {
}

func TestConfig_CheckAndSetDefaults(t *testing.T) {
type mockBackend struct {
backend.Backend
}

validConfig := Config{
Database: "db",
TableName: "tbl",
Expand All @@ -112,6 +134,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
LocationS3: "s3://events-bucket",
QueueURL: "https://queue-url",
AWSConfig: &aws.Config{},
Backend: mockBackend{},
}
tests := []struct {
name string
Expand All @@ -131,11 +154,13 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
LargeEventsS3: "s3://large-payloads-bucket",
largeEventsBucket: "large-payloads-bucket",
LocationS3: "s3://events-bucket",
locationS3Bucket: "events-bucket",
QueueURL: "https://queue-url",
GetQueryResultsInterval: 100 * time.Millisecond,
BatchMaxItems: 20000,
BatchMaxInterval: 1 * time.Minute,
AWSConfig: &aws.Config{},
Backend: mockBackend{},
},
},
{
Expand Down Expand Up @@ -181,7 +206,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
cfg.LocationS3 = "https://abc"
return cfg
},
wantErr: "LocationS3 must be valid url and start with s3",
wantErr: "LocationS3 must starts with s3://",
},
{
name: "missing QueueURL",
Expand Down Expand Up @@ -235,3 +260,111 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
})
}
}

func TestPublisherConsumer(t *testing.T) {
fS3 := newFakeS3manager()
fq := newFakeQueue()
p := &publisher{
snsPublisher: fq,
uploader: fS3,
}

smallEvent := &apievents.AppCreate{
Metadata: apievents.Metadata{
ID: uuid.NewString(),
Time: time.Now().UTC(),
Type: events.AppCreateEvent,
},
AppMetadata: apievents.AppMetadata{
AppName: "app-small",
},
}

largeEvent := &apievents.AppCreate{
Metadata: apievents.Metadata{
ID: uuid.NewString(),
Time: time.Now().UTC(),
Type: events.AppCreateEvent,
Code: strings.Repeat("d", 2*maxSNSMessageSize),
},
AppMetadata: apievents.AppMetadata{
AppName: "app-large",
},
}

cfg := validCollectCfgForTests(t)
cfg.sqsReceiver = fq
cfg.payloadDownloader = fS3
cfg.batchMaxItems = 2
require.NoError(t, cfg.CheckAndSetDefaults())
c := newSqsMessagesCollector(cfg)

eventsChan := c.getEventsChan()

ctx := context.Background()
readSQSCtx, readCancel := context.WithCancel(ctx)
defer readCancel()

go c.fromSQS(readSQSCtx)

// receiver is used to read messages from eventsChan.
r := &receiver{}
go r.Do(eventsChan)

err := p.EmitAuditEvent(ctx, smallEvent)
require.NoError(t, err)
err = p.EmitAuditEvent(ctx, largeEvent)
require.NoError(t, err)
require.Eventually(t, func() bool {
return len(r.GetMsgs()) == 2
}, 200*time.Millisecond, 1*time.Millisecond, "missing events, got %d", len(r.GetMsgs()))

requireEventsEqualInAnyOrder(t, []apievents.AuditEvent{smallEvent, largeEvent}, eventAndAckIDToAuditEvents(r.GetMsgs()))
// S3 for uplodad should be called only once.
require.Equal(t, 1, fS3.uploadCount)
}

// requireEventsEqualInAnyOrder compares slices of auditevents ignoring order.
// It's useful in tests because consumer does not guarantee order.
func requireEventsEqualInAnyOrder(t *testing.T, want, got []apievents.AuditEvent) {
sort.Slice(want, func(i, j int) bool {
return want[i].GetID() < want[j].GetID()
})
sort.Slice(got, func(i, j int) bool {
return got[i].GetID() < got[j].GetID()
})
require.Empty(t, cmp.Diff(want, got))
}

type fakeS3manager struct {
objects map[string][]byte
uploadCount int
}

func newFakeS3manager() *fakeS3manager {
return &fakeS3manager{
objects: map[string][]byte{},
}
}

func (f *fakeS3manager) Upload(ctx context.Context, input *s3.PutObjectInput, opts ...func(*manager.Uploader)) (*manager.UploadOutput, error) {
data, err := io.ReadAll(input.Body)
if err != nil {
return nil, err
}
f.objects[*input.Key] = data
f.uploadCount++
return &manager.UploadOutput{Key: input.Key}, nil
}

func (f *fakeS3manager) Download(ctx context.Context, w io.WriterAt, input *s3.GetObjectInput, options ...func(*manager.Downloader)) (int64, error) {
data, ok := f.objects[*input.Key]
if !ok {
return 0, errors.New("object not found")
}
n, err := w.WriteAt(data, 0)
if err != nil {
return 0, err
}
return int64(n), nil
}
Loading