diff --git a/integration/appaccess/appaccess_test.go b/integration/appaccess/appaccess_test.go index a86156280023b..7ab0e685fb267 100644 --- a/integration/appaccess/appaccess_test.go +++ b/integration/appaccess/appaccess_test.go @@ -582,7 +582,7 @@ func testAuditEvents(p *Pack, t *testing.T) { expectedEvent, event, cmpopts.IgnoreTypes(apievents.ServerMetadata{}, apievents.SessionMetadata{}, apievents.UserMetadata{}, apievents.ConnectionMetadata{}), - cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "Time"), + cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "Time", "Index"), cmpopts.IgnoreFields(apievents.AppSessionChunk{}, "SessionChunkID"), )) }) diff --git a/lib/events/dynamoevents/dynamoevents.go b/lib/events/dynamoevents/dynamoevents.go index bc50860062c15..63f0826cde9a3 100644 --- a/lib/events/dynamoevents/dynamoevents.go +++ b/lib/events/dynamoevents/dynamoevents.go @@ -446,8 +446,9 @@ func (l *Log) createPutItem(sessionID string, in apievents.AuditEvent) (*dynamod return nil, trace.Wrap(err) } return &dynamodb.PutItemInput{ - Item: av, - TableName: aws.String(l.Tablename), + Item: av, + TableName: aws.String(l.Tablename), + ConditionExpression: aws.String("attribute_not_exists(SessionID) AND attribute_not_exists(EventIndex)"), }, nil } diff --git a/lib/events/dynamoevents/dynamoevents_test.go b/lib/events/dynamoevents/dynamoevents_test.go index d9a701ed62b40..452c9c97c7a0a 100644 --- a/lib/events/dynamoevents/dynamoevents_test.go +++ b/lib/events/dynamoevents/dynamoevents_test.go @@ -35,10 +35,12 @@ import ( "github.com/stretchr/testify/require" "github.com/gravitational/teleport" + apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/events" "github.com/gravitational/teleport/lib/events/test" + "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/utils" ) @@ -434,6 +436,42 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { } } +// TestEmitSessionEventsSameIndex given events that share the same session ID +// and index, the emit should fail, avoiding any event to get overwritten. +func TestEmitSessionEventsSameIndex(t *testing.T) { + ctx := context.Background() + tt := setupDynamoContext(t) + sessionID := session.NewID() + + require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 0))) + require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1))) + require.Error(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1))) +} + +func generateEvent(sessionID session.ID, index int64) apievents.AuditEvent { + return &apievents.AppSessionChunk{ + Metadata: apievents.Metadata{ + Type: events.AppSessionChunkEvent, + Code: events.AppSessionChunkCode, + ClusterName: "root", + Index: index, + }, + ServerMetadata: apievents.ServerMetadata{ + ServerID: uuid.New().String(), + ServerNamespace: apidefaults.Namespace, + }, + SessionMetadata: apievents.SessionMetadata{ + SessionID: sessionID.String(), + }, + AppMetadata: apievents.AppMetadata{ + AppURI: "nginx", + AppPublicAddr: "https://nginx", + AppName: "nginx", + }, + SessionChunkID: uuid.New().String(), + } +} + var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") func randStringAlpha(n int) string { diff --git a/lib/events/recorder/recorder.go b/lib/events/recorder/recorder.go index a053b740d0f7f..b2bf1be6ecbf9 100644 --- a/lib/events/recorder/recorder.go +++ b/lib/events/recorder/recorder.go @@ -87,6 +87,10 @@ type Config struct { // BackoffDuration is a duration of the backoff before the next try. BackoffDuration time.Duration + + // StartTime represents the time the recorder started. If not zero, this + // value is used to generate the events index. + StartTime time.Time } // New returns a [events.SessionPreparerRecorder]. If session recording is disabled, @@ -112,6 +116,7 @@ func New(cfg Config) (events.SessionPreparerRecorder, error) { Clock: cfg.Clock, UID: cfg.UID, ClusterName: cfg.ClusterName, + StartTime: cfg.StartTime, }) if err != nil { return nil, trace.Wrap(err) diff --git a/lib/events/setter.go b/lib/events/setter.go index 309ea07e5744e..a2e4d60b6ccea 100644 --- a/lib/events/setter.go +++ b/lib/events/setter.go @@ -22,6 +22,7 @@ import ( "context" "sync" "sync/atomic" + "time" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" @@ -60,6 +61,10 @@ type PreparerConfig struct { // ClusterName defines the name of this teleport cluster. ClusterName string + + // StartTime represents the time the recorder started. If not zero, this + // value is used to generate the events index. + StartTime time.Time } // CheckAndSetDefaults checks and sets defaults @@ -110,8 +115,7 @@ func (c *Preparer) PrepareSessionEvent(event apievents.AuditEvent) (apievents.Pr srv.SetServerID(c.cfg.ServerID) } - // ensure index is incremented and loaded atomically - event.SetIndex(int64(c.eventIndex.Add(1) - 1)) + event.SetIndex(c.nextIndex()) preparedEvent := preparedSessionEvent{ event: event, @@ -135,6 +139,14 @@ func (c *Preparer) PrepareSessionEvent(event apievents.AuditEvent) (apievents.Pr return preparedEvent, nil } +func (c *Preparer) nextIndex() int64 { + if !c.cfg.StartTime.IsZero() { + return c.cfg.Clock.Since(c.cfg.StartTime).Nanoseconds() + } + + return int64(c.eventIndex.Add(1) - 1) +} + type preparedSessionEvent struct { event apievents.AuditEvent } diff --git a/lib/events/setter_test.go b/lib/events/setter_test.go new file mode 100644 index 0000000000000..8ad8a396f927f --- /dev/null +++ b/lib/events/setter_test.go @@ -0,0 +1,127 @@ +// Teleport +// Copyright (C) 2024 Gravitational, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package events + +import ( + "testing" + "time" + + "github.com/google/uuid" + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/require" + + apidefaults "github.com/gravitational/teleport/api/defaults" + apievents "github.com/gravitational/teleport/api/types/events" + "github.com/gravitational/teleport/lib/session" +) + +func TestPreparerIncrementalIndex(t *testing.T) { + sessionID := session.NewID() + preparer, err := NewPreparer(PreparerConfig{ + SessionID: sessionID, + ClusterName: "root", + }) + require.NoError(t, err) + + for i := 0; i < 10; i++ { + e, err := preparer.PrepareSessionEvent(generateEvent()) + require.NoError(t, err) + require.Equal(t, int64(i), e.GetAuditEvent().GetIndex(), "unexpected event index") + } +} + +func TestPreparerTimeBasedIndex(t *testing.T) { + clock := clockwork.NewFakeClock() + preparer, err := NewPreparer(PreparerConfig{ + SessionID: session.NewID(), + ServerID: uuid.New().String(), + ClusterName: "root", + Clock: clock, + StartTime: clock.Now(), + }) + require.NoError(t, err) + + var lastIndex int64 + for i := 0; i < 9; i++ { + clock.Advance(time.Second) + e, err := preparer.PrepareSessionEvent(generateEvent()) + require.NoError(t, err) + require.Greater(t, e.GetAuditEvent().GetIndex(), lastIndex, "expected a larger index") + lastIndex = e.GetAuditEvent().GetIndex() + } +} + +func TestPreparerTimeBasedIndexCollisions(t *testing.T) { + serverID := uuid.New().String() + sessionID := session.NewID() + clusterName := "root" + clock := clockwork.NewFakeClock() + loginTime := clock.Now() + + preparerOne, err := NewPreparer(PreparerConfig{ + SessionID: sessionID, + ServerID: serverID, + ClusterName: clusterName, + Clock: clock, + StartTime: loginTime, + }) + require.NoError(t, err) + + preparerTwo, err := NewPreparer(PreparerConfig{ + SessionID: sessionID, + ServerID: serverID, + ClusterName: clusterName, + Clock: clock, + StartTime: loginTime, + }) + require.NoError(t, err) + + for i := 0; i < 9; i++ { + clock.Advance(time.Second) + evtOne, err := preparerOne.PrepareSessionEvent(generateEvent()) + require.NoError(t, err) + idxOne := evtOne.GetAuditEvent().GetIndex() + + clock.Advance(time.Second) + evtTwo, err := preparerTwo.PrepareSessionEvent(generateEvent()) + require.NoError(t, err) + idxTwo := evtTwo.GetAuditEvent().GetIndex() + + require.NotEqual(t, idxOne, idxTwo) + require.Greater(t, idxTwo, idxOne) + } +} + +func generateEvent() apievents.AuditEvent { + return &apievents.AppSessionChunk{ + Metadata: apievents.Metadata{ + Type: AppSessionChunkEvent, + Code: AppSessionChunkCode, + ClusterName: "root", + }, + ServerMetadata: apievents.ServerMetadata{ + ServerID: uuid.NewString(), + ServerNamespace: apidefaults.Namespace, + }, + AppMetadata: apievents.AppMetadata{ + AppURI: "nginx", + AppPublicAddr: "https://nginx", + AppName: "nginx", + }, + SessionChunkID: uuid.NewString(), + } +} diff --git a/lib/srv/app/server.go b/lib/srv/app/server.go index 0be9bf40caa08..0e546410ab094 100644 --- a/lib/srv/app/server.go +++ b/lib/srv/app/server.go @@ -24,6 +24,7 @@ package app import ( "context" "crypto/tls" + "crypto/x509" "errors" "net" "net/http" @@ -774,6 +775,10 @@ func (s *Server) handleConnection(conn net.Conn) (func(), error) { } } + // Add user certificate into the context after the monitor connection + // initialization to ensure value is present on the context. + ctx = authz.ContextWithUserCertificate(ctx, leafCertFromConn(tlsConn)) + // Application access supports plain TCP connections which are handled // differently than HTTP requests from web apps. if app.IsTCP() { @@ -912,7 +917,7 @@ func (s *Server) serveSession(w http.ResponseWriter, r *http.Request, identity * // minutes. Used to stream session chunks to the Audit Log. ttl := min(identity.Expires.Sub(s.c.Clock.Now()), 5*time.Minute) session, err := utils.FnCacheGetWithTTL(r.Context(), s.cache, identity.RouteToApp.SessionID, ttl, func(ctx context.Context) (*sessionChunk, error) { - session, err := s.newSessionChunk(ctx, identity, app, opts...) + session, err := s.newSessionChunk(ctx, identity, app, s.sessionStartTime(r.Context()), opts...) return session, trace.Wrap(err) }) if err != nil { @@ -1101,10 +1106,10 @@ func (s *Server) newHTTPServer(clusterName string) *http.Server { // newTCPServer creates a server that proxies TCP applications. func (s *Server) newTCPServer() (*tcpServer, error) { return &tcpServer{ - newAudit: func(sessionID string) (common.Audit, error) { + newAudit: func(ctx context.Context, sessionID string) (common.Audit, error) { // Audit stream is using server context, not session context, // to make sure that session is uploaded even after it is closed. - rec, err := s.newSessionRecorder(s.closeContext, sessionID) + rec, err := s.newSessionRecorder(s.closeContext, s.sessionStartTime(ctx), sessionID) if err != nil { return nil, trace.Wrap(err) } @@ -1139,6 +1144,17 @@ func (s *Server) getProxyPort() string { return port } +// sessionStartTime fetches the session start time based on the the certificate +// valid date. +func (s *Server) sessionStartTime(ctx context.Context) time.Time { + if userCert, err := authz.UserCertificateFromContext(ctx); err == nil { + return userCert.NotBefore + } + + s.log.Warn("Unable to retrieve session start time from certificate.") + return time.Time{} +} + // CopyAndConfigureTLS can be used to copy and modify an existing *tls.Config // for Teleport application proxy servers. func CopyAndConfigureTLS(log logrus.FieldLogger, client auth.AccessCache, config *tls.Config) *tls.Config { @@ -1187,3 +1203,13 @@ func newGetConfigForClientFn(log logrus.FieldLogger, client auth.AccessCache, tl return tlsCopy, nil } } + +// leafCertFromConn returns the leaf certificate from the connection. +func leafCertFromConn(tlsConn *tls.Conn) *x509.Certificate { + state := tlsConn.ConnectionState() + if len(state.PeerCertificates) == 0 { + return nil + } + + return state.PeerCertificates[0] +} diff --git a/lib/srv/app/server_test.go b/lib/srv/app/server_test.go index e493f06eae415..5fa1a45905a78 100644 --- a/lib/srv/app/server_test.go +++ b/lib/srv/app/server_test.go @@ -1011,7 +1011,6 @@ func TestRequestAuditEvents(t *testing.T) { Type: events.AppSessionChunkEvent, Code: events.AppSessionChunkCode, ClusterName: "root.example.com", - Index: 0, }, AppMetadata: apievents.AppMetadata{ AppURI: app.Spec.URI, @@ -1023,7 +1022,7 @@ func TestRequestAuditEvents(t *testing.T) { expectedEvent, event, cmpopts.IgnoreTypes(apievents.ServerMetadata{}, apievents.SessionMetadata{}, apievents.UserMetadata{}, apievents.ConnectionMetadata{}), - cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time"), + cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time", "Index"), cmpopts.IgnoreFields(apievents.AppSessionChunk{}, "SessionChunkID"), )) case events.AppSessionRequestEvent: @@ -1033,7 +1032,6 @@ func TestRequestAuditEvents(t *testing.T) { Type: events.AppSessionRequestEvent, Code: events.AppSessionRequestCode, ClusterName: "root.example.com", - Index: 1, }, AppMetadata: apievents.AppMetadata{ AppURI: app.Spec.URI, @@ -1048,7 +1046,7 @@ func TestRequestAuditEvents(t *testing.T) { expectedEvent, event, cmpopts.IgnoreTypes(apievents.ServerMetadata{}, apievents.SessionMetadata{}, apievents.UserMetadata{}, apievents.ConnectionMetadata{}), - cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time"), + cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time", "Index"), cmpopts.IgnoreFields(apievents.AppSessionChunk{}, "SessionChunkID"), )) } @@ -1101,7 +1099,7 @@ func TestRequestAuditEvents(t *testing.T) { expectedEvent, searchEvents[0], cmpopts.IgnoreTypes(apievents.ServerMetadata{}, apievents.SessionMetadata{}, apievents.UserMetadata{}, apievents.ConnectionMetadata{}), - cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time"), + cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time", "Index"), cmpopts.IgnoreFields(apievents.AppSessionChunk{}, "SessionChunkID"), )) } diff --git a/lib/srv/app/session.go b/lib/srv/app/session.go index 7bbedc96d0896..29c935180c155 100644 --- a/lib/srv/app/session.go +++ b/lib/srv/app/session.go @@ -92,7 +92,7 @@ type sessionOpt func(context.Context, *sessionChunk, *tlsca.Identity, types.Appl // The session chunk is created with inflight=1, // and as such expects `release()` to eventually be called // by the caller of this function. -func (s *Server) newSessionChunk(ctx context.Context, identity *tlsca.Identity, app types.Application, opts ...sessionOpt) (*sessionChunk, error) { +func (s *Server) newSessionChunk(ctx context.Context, identity *tlsca.Identity, app types.Application, startTime time.Time, opts ...sessionOpt) (*sessionChunk, error) { sess := &sessionChunk{ id: uuid.New().String(), closeC: make(chan struct{}), @@ -112,7 +112,7 @@ func (s *Server) newSessionChunk(ctx context.Context, identity *tlsca.Identity, // Create the stream writer that will write this chunk to the audit log. // Audit stream is using server context, not session context, // to make sure that session is uploaded even after it is closed. - rec, err := s.newSessionRecorder(s.closeContext, sess.id) + rec, err := s.newSessionRecorder(s.closeContext, startTime, sess.id) if err != nil { return nil, trace.Wrap(err) } @@ -295,7 +295,7 @@ func (s *Server) onSessionExpired(ctx context.Context, key, expired any) { // newSessionRecorder creates a session stream that will be used to record // requests that occur within this session chunk and upload the recording // to the Auth server. -func (s *Server) newSessionRecorder(ctx context.Context, chunkID string) (events.SessionPreparerRecorder, error) { +func (s *Server) newSessionRecorder(ctx context.Context, startTime time.Time, chunkID string) (events.SessionPreparerRecorder, error) { recConfig, err := s.c.AccessPoint.GetSessionRecordingConfig(ctx) if err != nil { return nil, trace.Wrap(err) @@ -317,6 +317,7 @@ func (s *Server) newSessionRecorder(ctx context.Context, chunkID string) (events DataDir: s.c.DataDir, Component: teleport.Component(teleport.ComponentSession, teleport.ComponentApp), Context: ctx, + StartTime: startTime, }) if err != nil { return nil, trace.Wrap(err) diff --git a/lib/srv/app/tcpserver.go b/lib/srv/app/tcpserver.go index 1ecb316a7f042..b90e1410f9357 100644 --- a/lib/srv/app/tcpserver.go +++ b/lib/srv/app/tcpserver.go @@ -33,7 +33,7 @@ import ( ) type tcpServer struct { - newAudit func(sessionID string) (common.Audit, error) + newAudit func(ctx context.Context, sessionID string) (common.Audit, error) hostID string log logrus.FieldLogger } @@ -55,7 +55,7 @@ func (s *tcpServer) handleConnection(ctx context.Context, clientConn net.Conn, i return trace.Wrap(err) } - audit, err := s.newAudit(identity.RouteToApp.SessionID) + audit, err := s.newAudit(ctx, identity.RouteToApp.SessionID) if err != nil { return trace.Wrap(err) }