Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ require (
go.etcd.io/etcd/client/v3 v3.5.9
go.mongodb.org/mongo-driver v1.10.6
go.mozilla.org/pkcs7 v0.0.0-20210826202110-33d05740a352
go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-sdk-go-v2/otelaws v0.42.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0
go.opentelemetry.io/otel v1.16.0
Expand Down Expand Up @@ -212,8 +213,10 @@ require (
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.25 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.19.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.28 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.27 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.14.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.10 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.25 h1:AzwRi5OKKwo4QNqPf7TjeO+tK8A
github.com/aws/aws-sdk-go-v2/internal/v4a v1.0.25/go.mod h1:SUbB4wcbSEyCvqBxv/O/IBf93RbEze7U7OnoTlpPB+g=
github.com/aws/aws-sdk-go-v2/service/athena v1.28.0 h1:fG77b1smde0usA1F7hM6VX3wK+NagrFWdbFOUhS63RQ=
github.com/aws/aws-sdk-go-v2/service/athena v1.28.0/go.mod h1:97btS9UBEnajlbXXJkaCAFIu1j3vfJKdQCnIhs853xY=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.19.7 h1:yb2o8oh3Y+Gg2g+wlzrWS3pB89+dHrXayT/d9cs8McU=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.19.7/go.mod h1:1MNss6sqoIsFGisX92do/5doiUCBrN7EjhZCS/8DUjI=
github.com/aws/aws-sdk-go-v2/service/ec2 v1.98.0 h1:WblDV33AG9dhv0zFEPEmGtD5UECSNpKMxtdENULfR8M=
github.com/aws/aws-sdk-go-v2/service/ec2 v1.98.0/go.mod h1:L3ZT0N/vBsw77mOAawXmRnREpEjcHd2v5Hzf7AkIH8M=
github.com/aws/aws-sdk-go-v2/service/glue v1.48.0 h1:XM5b9GQr3y72FZwPJFuQ0Eir2HtvUpqR2YbUid2AJp8=
Expand All @@ -344,6 +346,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.11/go.mod h1:
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.3/go.mod h1:Seb8KNmD6kVTjwRjVEgOT5hPin6sq+v4C2ycJQDwuH8=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.28 h1:vGWm5vTpMr39tEZfQeDiDAMgk+5qsnvRny3FjLpnH5w=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.28/go.mod h1:spfrICMD6wCAhjhzHuy6DOZZ+LAIY10UxhUmLzpJTTs=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.27 h1:QmyPCRZNMR1pFbiOi9kBZWZuKrKB9LD4cxltxQk4tNE=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.7.27/go.mod h1:DfuVY36ixXnsG+uTqnoLWunXAKJ4qjccoFrXUPpj+hs=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.3/go.mod h1:wlY6SVjuwvh3TVRpTqdy4I1JpBFLX4UGeKZdWntaocw=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.21/go.mod h1:lRToEJsn+DRA9lW4O9L9+/3hjTkUzlzyzHqn8MTds5k=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.27 h1:0iKliEXAcCa2qVtRs7Ot5hItA2MsufrphbRFlz1Owxo=
Expand Down Expand Up @@ -1686,6 +1690,8 @@ go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib v0.20.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUzl5H4LY0Kc=
go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-sdk-go-v2/otelaws v0.42.0 h1:qbwkDPy5GlvujCZypVtj9hUs6MuYc7CcGutZrJvOBCo=
go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-sdk-go-v2/otelaws v0.42.0/go.mod h1:PBzurQk0YP4VsskpIqA3hty1HyDhLywyGMhjUKdXfds=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0/go.mod h1:oVGt1LRbBOBq1A5BQLlUg9UaU/54aiHw8cgjV3aWZ/E=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0 h1:ZOLJc06r4CB42laIXg/7udr0pbZyuAihN10A/XuiQRY=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0/go.mod h1:5z+/ZWJQKXa9YT34fQNx5K8Hd1EoIhvtUygUQPqEOgQ=
Expand Down
12 changes: 12 additions & 0 deletions lib/events/athena/athena.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ import (
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-sdk-go-v2/otelaws"
oteltrace "go.opentelemetry.io/otel/trace"

"github.com/gravitational/teleport"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/observability/tracing"
"github.com/gravitational/teleport/lib/utils"
)

Expand Down Expand Up @@ -114,6 +117,9 @@ type Config struct {

Backend backend.Backend

// Tracer is used to create spans
Tracer oteltrace.Tracer

// TODO(tobiaszheller): add FIPS config in later phase.
}

Expand Down Expand Up @@ -245,13 +251,18 @@ func (cfg *Config) CheckAndSetDefaults(ctx context.Context) error {
if cfg.Region != "" {
awsCfg.Region = cfg.Region
}
otelaws.AppendMiddlewares(&awsCfg.APIOptions)
cfg.AWSConfig = &awsCfg
}

if cfg.Backend == nil {
return trace.BadParameter("Backend cannot be nil")
}

if cfg.Tracer == nil {
cfg.Tracer = tracing.NoopTracer(teleport.ComponentAthena)
}

return nil
}

Expand Down Expand Up @@ -370,6 +381,7 @@ func New(ctx context.Context, cfg Config) (*Log, error) {
awsCfg: cfg.AWSConfig,
logger: cfg.LogEntry,
clock: cfg.Clock,
tracer: cfg.Tracer,
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down
2 changes: 1 addition & 1 deletion lib/events/athena/athena_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
err := cfg.CheckAndSetDefaults(context.Background())
if tt.wantErr == "" {
require.NoError(t, err, "CheckAndSetDefaults return unexpected err")
require.Empty(t, cmp.Diff(tt.want, cfg, cmpopts.EquateApprox(0, 0.0001), cmpopts.IgnoreFields(Config{}, "Clock", "UIDGenerator", "LogEntry"), cmp.AllowUnexported(Config{})))
require.Empty(t, cmp.Diff(tt.want, cfg, cmpopts.EquateApprox(0, 0.0001), cmpopts.IgnoreFields(Config{}, "Clock", "UIDGenerator", "LogEntry", "Tracer"), cmp.AllowUnexported(Config{})))
} else {
require.ErrorContains(t, err, tt.wantErr)
}
Expand Down
48 changes: 48 additions & 0 deletions lib/events/athena/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ import (
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
oteltrace "go.opentelemetry.io/otel/trace"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/observability/tracing"
"github.com/gravitational/teleport/lib/utils"
)

Expand Down Expand Up @@ -72,6 +75,9 @@ type querierConfig struct {
clock clockwork.Clock
awsCfg *aws.Config
logger log.FieldLogger

// tracer is used to create spans
tracer oteltrace.Tracer
}

func (cfg *querierConfig) CheckAndSetDefaults() error {
Expand Down Expand Up @@ -99,6 +105,10 @@ func (cfg *querierConfig) CheckAndSetDefaults() error {
cfg.clock = clockwork.NewRealClock()
}

if cfg.tracer == nil {
cfg.tracer = tracing.NoopTracer(teleport.ComponentAthena)
}

return nil
}

Expand All @@ -114,6 +124,16 @@ func newQuerier(cfg querierConfig) (*querier, error) {
}

func (q *querier) SearchEvents(ctx context.Context, req events.SearchEventsRequest) ([]apievents.AuditEvent, string, error) {
ctx, span := q.tracer.Start(
ctx,
"audit/SearchEvents",
Comment thread
tobiaszheller marked this conversation as resolved.
oteltrace.WithAttributes(
attribute.Int("limit", req.Limit),
attribute.String("from", req.From.Format(time.RFC3339)),
attribute.String("to", req.To.Format(time.RFC3339)),
),
)
defer span.End()
filter := searchEventsFilter{eventTypes: req.EventTypes}
events, keyset, err := q.searchEvents(ctx, searchEventsRequest{
fromUTC: req.From.UTC(),
Expand All @@ -128,6 +148,16 @@ func (q *querier) SearchEvents(ctx context.Context, req events.SearchEventsReque
}

func (q *querier) SearchSessionEvents(ctx context.Context, req events.SearchSessionEventsRequest) ([]apievents.AuditEvent, string, error) {
ctx, span := q.tracer.Start(
ctx,
"audit/SearchSessionEvents",
oteltrace.WithAttributes(
attribute.Int("limit", req.Limit),
attribute.String("from", req.From.Format(time.RFC3339)),
attribute.String("to", req.To.Format(time.RFC3339)),
),
)
defer span.End()
// TODO(tobiaszheller): maybe if fromUTC is 0000-00-00, ask first last 30days and fallback to -inf - now-30
// for sessionID != "". This kind of call is done on RBAC to check if user can access that session.
filter := searchEventsFilter{eventTypes: []string{events.SessionEndEvent, events.WindowsDesktopSessionEndEvent}}
Expand Down Expand Up @@ -309,6 +339,8 @@ func prepareQuery(params searchParams) (query string, execParams []string) {
}

func (q *querier) startQueryExecution(ctx context.Context, query string, params []string) (string, error) {
ctx, span := q.tracer.Start(ctx, "athena/startQueryExecution")
defer span.End()
startQueryInput := &athena.StartQueryExecutionInput{
QueryExecutionContext: &athenaTypes.QueryExecutionContext{
Database: aws.String(q.database),
Expand All @@ -334,6 +366,14 @@ func (q *querier) startQueryExecution(ctx context.Context, query string, params
}

func (q *querier) waitForSuccess(ctx context.Context, queryId string) error {
ctx, span := q.tracer.Start(
ctx,
"athena/waitForSuccess",
oteltrace.WithAttributes(
attribute.String("queryId", queryId),
),
)
defer span.End()
ctx, cancel := context.WithTimeout(ctx, getQueryResultsMaxTime)
defer cancel()

Expand Down Expand Up @@ -373,6 +413,14 @@ func (q *querier) waitForSuccess(ctx context.Context, queryId string) error {
// Athena API allows only fetch 1000 results, so if client asks for more, multiple
// calls to GetQueryResults will be necessary.
func (q *querier) fetchResults(ctx context.Context, queryId string, limit int, condition utils.FieldsCondition) ([]apievents.AuditEvent, string, error) {
ctx, span := q.tracer.Start(
ctx,
"athena/fetchResults",
oteltrace.WithAttributes(
attribute.String("queryId", queryId),
),
)
defer span.End()
rb := &responseBuilder{}
// nextToken is used as offset to next calls for GetQueryResults.
var nextToken string
Expand Down
3 changes: 3 additions & 0 deletions lib/events/athena/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ import (
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/observability/tracing"
"github.com/gravitational/teleport/lib/utils"
)

Expand Down Expand Up @@ -294,6 +296,7 @@ func Test_querier_fetchResults(t *testing.T) {
querierConfig: querierConfig{
tablename: tableName,
logger: utils.NewLoggerForTests(),
tracer: tracing.NoopTracer(teleport.ComponentAthena),
},
athenaClient: &fakeAthenaResultsGetter{
resp: tt.fakeResp,
Expand Down
7 changes: 5 additions & 2 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ func initAuthUploadHandler(ctx context.Context, auditConfig types.ClusterAuditCo
}

// initAuthExternalAuditLog initializes the auth server's audit log.
func initAuthExternalAuditLog(ctx context.Context, auditConfig types.ClusterAuditConfig, backend backend.Backend) (events.AuditLogger, error) {
func initAuthExternalAuditLog(ctx context.Context, auditConfig types.ClusterAuditConfig, backend backend.Backend, tracingProvider *tracing.Provider) (events.AuditLogger, error) {
var hasNonFileLog bool
var loggers []events.AuditLogger
for _, eventsURI := range auditConfig.AuditEventsURIs() {
Expand Down Expand Up @@ -1424,6 +1424,9 @@ func initAuthExternalAuditLog(ctx context.Context, auditConfig types.ClusterAudi
Region: auditConfig.Region(),
Backend: backend,
}
if tracingProvider != nil {
cfg.Tracer = tracingProvider.Tracer(teleport.ComponentAthena)
}
err = cfg.SetFromURL(uri)
if err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -1545,7 +1548,7 @@ func (process *TeleportProcess) initAuthService() error {
}
// initialize external loggers. may return (nil, nil) if no
// external loggers have been defined.
externalLog, err := initAuthExternalAuditLog(process.ExitContext(), cfg.Auth.AuditConfig, process.backend)
externalLog, err := initAuthExternalAuditLog(process.ExitContext(), cfg.Auth.AuditConfig, process.backend, process.TracingProvider)
if err != nil {
if !trace.IsNotFound(err) {
return trace.Wrap(err)
Expand Down
4 changes: 2 additions & 2 deletions lib/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func TestServiceInitExternalLog(t *testing.T) {
AuditEventsURI: tt.events,
})
require.NoError(t, err)
loggers, err := initAuthExternalAuditLog(context.Background(), auditConfig, backend)
loggers, err := initAuthExternalAuditLog(context.Background(), auditConfig, backend, nil /* tracingProvider */)
if tt.isErr {
require.Error(t, err)
} else {
Expand Down Expand Up @@ -427,7 +427,7 @@ func TestAthenaAuditLogSetup(t *testing.T) {
AuditSessionsURI: "s3://testbucket/sessions-rec",
})
require.NoError(t, err)
log, err := initAuthExternalAuditLog(context.Background(), auditConfig, backend)
log, err := initAuthExternalAuditLog(context.Background(), auditConfig, backend, nil /* tracingProvider */)
tt.wantFn(t, log, err)
})
}
Expand Down