From 2d51c62a019b1a9de360ca6d0acce744f6347860 Mon Sep 17 00:00:00 2001 From: Gabriel Corado Date: Tue, 20 Feb 2024 21:53:52 -0300 Subject: [PATCH 1/6] feat(events): add event indexer based on time --- lib/events/dynamoevents/dynamoevents.go | 5 +- lib/events/dynamoevents/dynamoevents_test.go | 40 ++++++ lib/events/recorder/recorder.go | 5 + lib/events/setter.go | 16 ++- lib/events/setter_test.go | 125 +++++++++++++++++++ lib/srv/app/server.go | 35 +++++- lib/srv/app/server_test.go | 8 +- lib/srv/app/session.go | 7 +- lib/srv/app/tcpserver.go | 4 +- 9 files changed, 228 insertions(+), 17 deletions(-) create mode 100644 lib/events/setter_test.go diff --git a/lib/events/dynamoevents/dynamoevents.go b/lib/events/dynamoevents/dynamoevents.go index 770cf367eb6a2..57a2c7b77a67f 100644 --- a/lib/events/dynamoevents/dynamoevents.go +++ b/lib/events/dynamoevents/dynamoevents.go @@ -446,8 +446,9 @@ func (l *Log) createPutItem(sessionID string, in apievents.AuditEvent) (*dynamod return nil, trace.Wrap(err) } return &dynamodb.PutItemInput{ - Item: av, - TableName: aws.String(l.Tablename), + Item: av, + TableName: aws.String(l.Tablename), + ConditionExpression: aws.String("attribute_not_exists(SessionID) AND attribute_not_exists(EventIndex)"), }, nil } diff --git a/lib/events/dynamoevents/dynamoevents_test.go b/lib/events/dynamoevents/dynamoevents_test.go index d9a701ed62b40..7f86ef23aa662 100644 --- a/lib/events/dynamoevents/dynamoevents_test.go +++ b/lib/events/dynamoevents/dynamoevents_test.go @@ -35,10 +35,12 @@ import ( "github.com/stretchr/testify/require" "github.com/gravitational/teleport" + apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/events" "github.com/gravitational/teleport/lib/events/test" + "github.com/gravitational/teleport/lib/session" "github.com/gravitational/teleport/lib/utils" ) @@ -434,6 +436,44 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { } } +// TestEmitSessionEventsSameIndex given events that share the same session ID +// and index, the emit should fail, avoiding any event to get overwriteen. +func TestEmitSessionEventsSameIndex(t *testing.T) { + ctx := context.Background() + tt := setupDynamoContext(t) + sessionID := session.NewID() + + require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(t, sessionID, 0))) + require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(t, sessionID, 1))) + require.Error(t, tt.log.EmitAuditEvent(ctx, generateEvent(t, sessionID, 1))) +} + +func generateEvent(t *testing.T, sessionID session.ID, index int64) apievents.AuditEvent { + t.Helper() + + return &apievents.AppSessionChunk{ + Metadata: apievents.Metadata{ + Type: events.AppSessionChunkEvent, + Code: events.AppSessionChunkCode, + ClusterName: "root", + Index: index, + }, + ServerMetadata: apievents.ServerMetadata{ + ServerID: uuid.New().String(), + ServerNamespace: apidefaults.Namespace, + }, + SessionMetadata: apievents.SessionMetadata{ + SessionID: sessionID.String(), + }, + AppMetadata: apievents.AppMetadata{ + AppURI: "nginx", + AppPublicAddr: "https://nginx", + AppName: "nginx", + }, + SessionChunkID: uuid.New().String(), + } +} + var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") func randStringAlpha(n int) string { diff --git a/lib/events/recorder/recorder.go b/lib/events/recorder/recorder.go index a053b740d0f7f..3ec510b71fe71 100644 --- a/lib/events/recorder/recorder.go +++ b/lib/events/recorder/recorder.go @@ -87,6 +87,10 @@ type Config struct { // BackoffDuration is a duration of the backoff before the next try. BackoffDuration time.Duration + + // StartTime represents the time the recorder statarted. If not zero, this + // value is used to generate the events index. + StartTime time.Time } // New returns a [events.SessionPreparerRecorder]. If session recording is disabled, @@ -112,6 +116,7 @@ func New(cfg Config) (events.SessionPreparerRecorder, error) { Clock: cfg.Clock, UID: cfg.UID, ClusterName: cfg.ClusterName, + StartTime: cfg.StartTime, }) if err != nil { return nil, trace.Wrap(err) diff --git a/lib/events/setter.go b/lib/events/setter.go index 309ea07e5744e..2a042360a5ea0 100644 --- a/lib/events/setter.go +++ b/lib/events/setter.go @@ -22,6 +22,7 @@ import ( "context" "sync" "sync/atomic" + "time" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" @@ -60,6 +61,10 @@ type PreparerConfig struct { // ClusterName defines the name of this teleport cluster. ClusterName string + + // StartTime represents the time the recorder statarted. If not zero, this + // value is used to generate the events index. + StartTime time.Time } // CheckAndSetDefaults checks and sets defaults @@ -110,8 +115,7 @@ func (c *Preparer) PrepareSessionEvent(event apievents.AuditEvent) (apievents.Pr srv.SetServerID(c.cfg.ServerID) } - // ensure index is incremented and loaded atomically - event.SetIndex(int64(c.eventIndex.Add(1) - 1)) + event.SetIndex(c.nextIndex()) preparedEvent := preparedSessionEvent{ event: event, @@ -135,6 +139,14 @@ func (c *Preparer) PrepareSessionEvent(event apievents.AuditEvent) (apievents.Pr return preparedEvent, nil } +func (c *Preparer) nextIndex() int64 { + if !c.cfg.StartTime.IsZero() { + return c.cfg.Clock.Now().Sub(c.cfg.StartTime).Nanoseconds() + } + + return int64(c.eventIndex.Add(1) - 1) +} + type preparedSessionEvent struct { event apievents.AuditEvent } diff --git a/lib/events/setter_test.go b/lib/events/setter_test.go new file mode 100644 index 0000000000000..6dc013392ca53 --- /dev/null +++ b/lib/events/setter_test.go @@ -0,0 +1,125 @@ +// Teleport +// Copyright (C) 2024 Gravitational, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package events + +import ( + "testing" + + "github.com/google/uuid" + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/require" + + apidefaults "github.com/gravitational/teleport/api/defaults" + apievents "github.com/gravitational/teleport/api/types/events" + "github.com/gravitational/teleport/lib/session" +) + +func TestPreparerIncrementalIndex(t *testing.T) { + sessionID := session.NewID() + preparer, err := NewPreparer(PreparerConfig{ + SessionID: sessionID, + ClusterName: "root", + }) + require.NoError(t, err) + + for i := 0; i < 10; i++ { + e, err := preparer.PrepareSessionEvent(generateEvent(t)) + require.NoError(t, err) + require.Equal(t, int64(i), e.GetAuditEvent().GetIndex(), "unexpected event index") + } +} + +func TestPreparerTimeBasedIndex(t *testing.T) { + clock := clockwork.NewRealClock() + preparer, err := NewPreparer(PreparerConfig{ + SessionID: session.NewID(), + ServerID: uuid.New().String(), + ClusterName: "root", + Clock: clock, + StartTime: clock.Now(), + }) + require.NoError(t, err) + + var lastIndex int64 + for i := 0; i < 9; i++ { + e, err := preparer.PrepareSessionEvent(generateEvent(t)) + require.NoError(t, err) + require.Greater(t, e.GetAuditEvent().GetIndex(), lastIndex, "expected a larger index") + lastIndex = e.GetAuditEvent().GetIndex() + } +} + +func TestPreparerTimeBasedIndexCollisions(t *testing.T) { + serverID := uuid.New().String() + sessionID := session.NewID() + clusterName := "root" + clock := clockwork.NewRealClock() + loginTime := clock.Now() + + preparerOne, err := NewPreparer(PreparerConfig{ + SessionID: sessionID, + ServerID: serverID, + ClusterName: clusterName, + Clock: clock, + StartTime: loginTime, + }) + require.NoError(t, err) + + preparerTwo, err := NewPreparer(PreparerConfig{ + SessionID: sessionID, + ServerID: serverID, + ClusterName: clusterName, + Clock: clock, + StartTime: loginTime, + }) + require.NoError(t, err) + + for i := 0; i < 9; i++ { + evtOne, err := preparerOne.PrepareSessionEvent(generateEvent(t)) + require.NoError(t, err) + idxOne := evtOne.GetAuditEvent().GetIndex() + + evtTwo, err := preparerTwo.PrepareSessionEvent(generateEvent(t)) + require.NoError(t, err) + idxTwo := evtTwo.GetAuditEvent().GetIndex() + + require.NotEqual(t, idxOne, idxTwo) + require.Greater(t, idxTwo, idxOne) + } +} + +func generateEvent(t *testing.T) apievents.AuditEvent { + t.Helper() + + return &apievents.AppSessionChunk{ + Metadata: apievents.Metadata{ + Type: AppSessionChunkEvent, + Code: AppSessionChunkCode, + ClusterName: "root", + }, + ServerMetadata: apievents.ServerMetadata{ + ServerID: uuid.New().String(), + ServerNamespace: apidefaults.Namespace, + }, + AppMetadata: apievents.AppMetadata{ + AppURI: "nginx", + AppPublicAddr: "https://nginx", + AppName: "nginx", + }, + SessionChunkID: uuid.New().String(), + } +} diff --git a/lib/srv/app/server.go b/lib/srv/app/server.go index 0be9bf40caa08..26664e95af993 100644 --- a/lib/srv/app/server.go +++ b/lib/srv/app/server.go @@ -24,6 +24,7 @@ package app import ( "context" "crypto/tls" + "crypto/x509" "errors" "net" "net/http" @@ -774,6 +775,10 @@ func (s *Server) handleConnection(conn net.Conn) (func(), error) { } } + // Add user certificate into the context after the monitor connection + // initializion to avoid it from being lost. + ctx = authz.ContextWithUserCertificate(ctx, certFromConn(tlsConn)) + // Application access supports plain TCP connections which are handled // differently than HTTP requests from web apps. if app.IsTCP() { @@ -912,7 +917,7 @@ func (s *Server) serveSession(w http.ResponseWriter, r *http.Request, identity * // minutes. Used to stream session chunks to the Audit Log. ttl := min(identity.Expires.Sub(s.c.Clock.Now()), 5*time.Minute) session, err := utils.FnCacheGetWithTTL(r.Context(), s.cache, identity.RouteToApp.SessionID, ttl, func(ctx context.Context) (*sessionChunk, error) { - session, err := s.newSessionChunk(ctx, identity, app, opts...) + session, err := s.newSessionChunk(ctx, identity, app, s.sessionStartTime(r.Context()), opts...) return session, trace.Wrap(err) }) if err != nil { @@ -1101,10 +1106,10 @@ func (s *Server) newHTTPServer(clusterName string) *http.Server { // newTCPServer creates a server that proxies TCP applications. func (s *Server) newTCPServer() (*tcpServer, error) { return &tcpServer{ - newAudit: func(sessionID string) (common.Audit, error) { + newAudit: func(ctx context.Context, sessionID string) (common.Audit, error) { // Audit stream is using server context, not session context, // to make sure that session is uploaded even after it is closed. - rec, err := s.newSessionRecorder(s.closeContext, sessionID) + rec, err := s.newSessionRecorder(s.closeContext, s.sessionStartTime(ctx), sessionID) if err != nil { return nil, trace.Wrap(err) } @@ -1139,6 +1144,20 @@ func (s *Server) getProxyPort() string { return port } +// sessionStartTime fetches the session start time based on the the certificate +// valid date. If the certificate cannot be retrieve from the context, return +// zero start time and emit a warning log. +func (s *Server) sessionStartTime(ctx context.Context) time.Time { + var startTime time.Time + if userCert, err := authz.UserCertificateFromContext(ctx); err == nil { + startTime = userCert.NotBefore + } else { + s.log.Warn("Unable to retrieve session start time from certificate.") + } + + return startTime +} + // CopyAndConfigureTLS can be used to copy and modify an existing *tls.Config // for Teleport application proxy servers. func CopyAndConfigureTLS(log logrus.FieldLogger, client auth.AccessCache, config *tls.Config) *tls.Config { @@ -1187,3 +1206,13 @@ func newGetConfigForClientFn(log logrus.FieldLogger, client auth.AccessCache, tl return tlsCopy, nil } } + +// certFromConnState returns the connection certificate from the connection. +func certFromConn(tlsConn *tls.Conn) *x509.Certificate { + state := tlsConn.ConnectionState() + if len(state.PeerCertificates) != 1 { + return nil + } + + return state.PeerCertificates[0] +} diff --git a/lib/srv/app/server_test.go b/lib/srv/app/server_test.go index e493f06eae415..5fa1a45905a78 100644 --- a/lib/srv/app/server_test.go +++ b/lib/srv/app/server_test.go @@ -1011,7 +1011,6 @@ func TestRequestAuditEvents(t *testing.T) { Type: events.AppSessionChunkEvent, Code: events.AppSessionChunkCode, ClusterName: "root.example.com", - Index: 0, }, AppMetadata: apievents.AppMetadata{ AppURI: app.Spec.URI, @@ -1023,7 +1022,7 @@ func TestRequestAuditEvents(t *testing.T) { expectedEvent, event, cmpopts.IgnoreTypes(apievents.ServerMetadata{}, apievents.SessionMetadata{}, apievents.UserMetadata{}, apievents.ConnectionMetadata{}), - cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time"), + cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time", "Index"), cmpopts.IgnoreFields(apievents.AppSessionChunk{}, "SessionChunkID"), )) case events.AppSessionRequestEvent: @@ -1033,7 +1032,6 @@ func TestRequestAuditEvents(t *testing.T) { Type: events.AppSessionRequestEvent, Code: events.AppSessionRequestCode, ClusterName: "root.example.com", - Index: 1, }, AppMetadata: apievents.AppMetadata{ AppURI: app.Spec.URI, @@ -1048,7 +1046,7 @@ func TestRequestAuditEvents(t *testing.T) { expectedEvent, event, cmpopts.IgnoreTypes(apievents.ServerMetadata{}, apievents.SessionMetadata{}, apievents.UserMetadata{}, apievents.ConnectionMetadata{}), - cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time"), + cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time", "Index"), cmpopts.IgnoreFields(apievents.AppSessionChunk{}, "SessionChunkID"), )) } @@ -1101,7 +1099,7 @@ func TestRequestAuditEvents(t *testing.T) { expectedEvent, searchEvents[0], cmpopts.IgnoreTypes(apievents.ServerMetadata{}, apievents.SessionMetadata{}, apievents.UserMetadata{}, apievents.ConnectionMetadata{}), - cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time"), + cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time", "Index"), cmpopts.IgnoreFields(apievents.AppSessionChunk{}, "SessionChunkID"), )) } diff --git a/lib/srv/app/session.go b/lib/srv/app/session.go index 7bbedc96d0896..29c935180c155 100644 --- a/lib/srv/app/session.go +++ b/lib/srv/app/session.go @@ -92,7 +92,7 @@ type sessionOpt func(context.Context, *sessionChunk, *tlsca.Identity, types.Appl // The session chunk is created with inflight=1, // and as such expects `release()` to eventually be called // by the caller of this function. -func (s *Server) newSessionChunk(ctx context.Context, identity *tlsca.Identity, app types.Application, opts ...sessionOpt) (*sessionChunk, error) { +func (s *Server) newSessionChunk(ctx context.Context, identity *tlsca.Identity, app types.Application, startTime time.Time, opts ...sessionOpt) (*sessionChunk, error) { sess := &sessionChunk{ id: uuid.New().String(), closeC: make(chan struct{}), @@ -112,7 +112,7 @@ func (s *Server) newSessionChunk(ctx context.Context, identity *tlsca.Identity, // Create the stream writer that will write this chunk to the audit log. // Audit stream is using server context, not session context, // to make sure that session is uploaded even after it is closed. - rec, err := s.newSessionRecorder(s.closeContext, sess.id) + rec, err := s.newSessionRecorder(s.closeContext, startTime, sess.id) if err != nil { return nil, trace.Wrap(err) } @@ -295,7 +295,7 @@ func (s *Server) onSessionExpired(ctx context.Context, key, expired any) { // newSessionRecorder creates a session stream that will be used to record // requests that occur within this session chunk and upload the recording // to the Auth server. -func (s *Server) newSessionRecorder(ctx context.Context, chunkID string) (events.SessionPreparerRecorder, error) { +func (s *Server) newSessionRecorder(ctx context.Context, startTime time.Time, chunkID string) (events.SessionPreparerRecorder, error) { recConfig, err := s.c.AccessPoint.GetSessionRecordingConfig(ctx) if err != nil { return nil, trace.Wrap(err) @@ -317,6 +317,7 @@ func (s *Server) newSessionRecorder(ctx context.Context, chunkID string) (events DataDir: s.c.DataDir, Component: teleport.Component(teleport.ComponentSession, teleport.ComponentApp), Context: ctx, + StartTime: startTime, }) if err != nil { return nil, trace.Wrap(err) diff --git a/lib/srv/app/tcpserver.go b/lib/srv/app/tcpserver.go index 1ecb316a7f042..b90e1410f9357 100644 --- a/lib/srv/app/tcpserver.go +++ b/lib/srv/app/tcpserver.go @@ -33,7 +33,7 @@ import ( ) type tcpServer struct { - newAudit func(sessionID string) (common.Audit, error) + newAudit func(ctx context.Context, sessionID string) (common.Audit, error) hostID string log logrus.FieldLogger } @@ -55,7 +55,7 @@ func (s *tcpServer) handleConnection(ctx context.Context, clientConn net.Conn, i return trace.Wrap(err) } - audit, err := s.newAudit(identity.RouteToApp.SessionID) + audit, err := s.newAudit(ctx, identity.RouteToApp.SessionID) if err != nil { return trace.Wrap(err) } From 74e148face621d2b433a12fcbd017730c3ca4346 Mon Sep 17 00:00:00 2001 From: Gabriel Corado Date: Wed, 21 Feb 2024 14:04:27 -0300 Subject: [PATCH 2/6] chore(events): code review changes --- lib/events/dynamoevents/dynamoevents_test.go | 10 ++++------ lib/events/setter.go | 2 +- lib/events/setter_test.go | 12 +++++------- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/lib/events/dynamoevents/dynamoevents_test.go b/lib/events/dynamoevents/dynamoevents_test.go index 7f86ef23aa662..7c6a9276563d8 100644 --- a/lib/events/dynamoevents/dynamoevents_test.go +++ b/lib/events/dynamoevents/dynamoevents_test.go @@ -443,14 +443,12 @@ func TestEmitSessionEventsSameIndex(t *testing.T) { tt := setupDynamoContext(t) sessionID := session.NewID() - require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(t, sessionID, 0))) - require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(t, sessionID, 1))) - require.Error(t, tt.log.EmitAuditEvent(ctx, generateEvent(t, sessionID, 1))) + require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 0))) + require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1))) + require.Error(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1))) } -func generateEvent(t *testing.T, sessionID session.ID, index int64) apievents.AuditEvent { - t.Helper() - +func generateEvent(sessionID session.ID, index int64) apievents.AuditEvent { return &apievents.AppSessionChunk{ Metadata: apievents.Metadata{ Type: events.AppSessionChunkEvent, diff --git a/lib/events/setter.go b/lib/events/setter.go index 2a042360a5ea0..ab2fd948faf1d 100644 --- a/lib/events/setter.go +++ b/lib/events/setter.go @@ -141,7 +141,7 @@ func (c *Preparer) PrepareSessionEvent(event apievents.AuditEvent) (apievents.Pr func (c *Preparer) nextIndex() int64 { if !c.cfg.StartTime.IsZero() { - return c.cfg.Clock.Now().Sub(c.cfg.StartTime).Nanoseconds() + return c.cfg.Clock.Since(c.cfg.StartTime).Nanoseconds() } return int64(c.eventIndex.Add(1) - 1) diff --git a/lib/events/setter_test.go b/lib/events/setter_test.go index 6dc013392ca53..8b8e0f010b4db 100644 --- a/lib/events/setter_test.go +++ b/lib/events/setter_test.go @@ -37,7 +37,7 @@ func TestPreparerIncrementalIndex(t *testing.T) { require.NoError(t, err) for i := 0; i < 10; i++ { - e, err := preparer.PrepareSessionEvent(generateEvent(t)) + e, err := preparer.PrepareSessionEvent(generateEvent()) require.NoError(t, err) require.Equal(t, int64(i), e.GetAuditEvent().GetIndex(), "unexpected event index") } @@ -56,7 +56,7 @@ func TestPreparerTimeBasedIndex(t *testing.T) { var lastIndex int64 for i := 0; i < 9; i++ { - e, err := preparer.PrepareSessionEvent(generateEvent(t)) + e, err := preparer.PrepareSessionEvent(generateEvent()) require.NoError(t, err) require.Greater(t, e.GetAuditEvent().GetIndex(), lastIndex, "expected a larger index") lastIndex = e.GetAuditEvent().GetIndex() @@ -89,11 +89,11 @@ func TestPreparerTimeBasedIndexCollisions(t *testing.T) { require.NoError(t, err) for i := 0; i < 9; i++ { - evtOne, err := preparerOne.PrepareSessionEvent(generateEvent(t)) + evtOne, err := preparerOne.PrepareSessionEvent(generateEvent()) require.NoError(t, err) idxOne := evtOne.GetAuditEvent().GetIndex() - evtTwo, err := preparerTwo.PrepareSessionEvent(generateEvent(t)) + evtTwo, err := preparerTwo.PrepareSessionEvent(generateEvent()) require.NoError(t, err) idxTwo := evtTwo.GetAuditEvent().GetIndex() @@ -102,9 +102,7 @@ func TestPreparerTimeBasedIndexCollisions(t *testing.T) { } } -func generateEvent(t *testing.T) apievents.AuditEvent { - t.Helper() - +func generateEvent() apievents.AuditEvent { return &apievents.AppSessionChunk{ Metadata: apievents.Metadata{ Type: AppSessionChunkEvent, From 5a2d8ac8cbcaca5c242706f7061491c720ba98d1 Mon Sep 17 00:00:00 2001 From: Gabriel Corado Date: Wed, 21 Feb 2024 14:13:59 -0300 Subject: [PATCH 3/6] chore: typos --- lib/events/dynamoevents/dynamoevents_test.go | 2 +- lib/srv/app/server.go | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/lib/events/dynamoevents/dynamoevents_test.go b/lib/events/dynamoevents/dynamoevents_test.go index 7c6a9276563d8..452c9c97c7a0a 100644 --- a/lib/events/dynamoevents/dynamoevents_test.go +++ b/lib/events/dynamoevents/dynamoevents_test.go @@ -437,7 +437,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { } // TestEmitSessionEventsSameIndex given events that share the same session ID -// and index, the emit should fail, avoiding any event to get overwriteen. +// and index, the emit should fail, avoiding any event to get overwritten. func TestEmitSessionEventsSameIndex(t *testing.T) { ctx := context.Background() tt := setupDynamoContext(t) diff --git a/lib/srv/app/server.go b/lib/srv/app/server.go index 26664e95af993..97c8f6a0b6e6f 100644 --- a/lib/srv/app/server.go +++ b/lib/srv/app/server.go @@ -776,7 +776,7 @@ func (s *Server) handleConnection(conn net.Conn) (func(), error) { } // Add user certificate into the context after the monitor connection - // initializion to avoid it from being lost. + // initialization to ensure value is present on the context. ctx = authz.ContextWithUserCertificate(ctx, certFromConn(tlsConn)) // Application access supports plain TCP connections which are handled @@ -1145,8 +1145,7 @@ func (s *Server) getProxyPort() string { } // sessionStartTime fetches the session start time based on the the certificate -// valid date. If the certificate cannot be retrieve from the context, return -// zero start time and emit a warning log. +// valid date. func (s *Server) sessionStartTime(ctx context.Context) time.Time { var startTime time.Time if userCert, err := authz.UserCertificateFromContext(ctx); err == nil { @@ -1207,7 +1206,7 @@ func newGetConfigForClientFn(log logrus.FieldLogger, client auth.AccessCache, tl } } -// certFromConnState returns the connection certificate from the connection. +// certFromConnState returns certificate from the connection. func certFromConn(tlsConn *tls.Conn) *x509.Certificate { state := tlsConn.ConnectionState() if len(state.PeerCertificates) != 1 { From 600690a3c037216ab335e285e695e6a267789e04 Mon Sep 17 00:00:00 2001 From: Gabriel Corado Date: Fri, 23 Feb 2024 15:05:45 -0300 Subject: [PATCH 4/6] Apply suggestions from code review Co-authored-by: Alan Parra --- lib/events/recorder/recorder.go | 2 +- lib/events/setter.go | 2 +- lib/events/setter_test.go | 4 ++-- lib/srv/app/server.go | 12 +++++------- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/lib/events/recorder/recorder.go b/lib/events/recorder/recorder.go index 3ec510b71fe71..b2bf1be6ecbf9 100644 --- a/lib/events/recorder/recorder.go +++ b/lib/events/recorder/recorder.go @@ -88,7 +88,7 @@ type Config struct { // BackoffDuration is a duration of the backoff before the next try. BackoffDuration time.Duration - // StartTime represents the time the recorder statarted. If not zero, this + // StartTime represents the time the recorder started. If not zero, this // value is used to generate the events index. StartTime time.Time } diff --git a/lib/events/setter.go b/lib/events/setter.go index ab2fd948faf1d..a2e4d60b6ccea 100644 --- a/lib/events/setter.go +++ b/lib/events/setter.go @@ -62,7 +62,7 @@ type PreparerConfig struct { // ClusterName defines the name of this teleport cluster. ClusterName string - // StartTime represents the time the recorder statarted. If not zero, this + // StartTime represents the time the recorder started. If not zero, this // value is used to generate the events index. StartTime time.Time } diff --git a/lib/events/setter_test.go b/lib/events/setter_test.go index 8b8e0f010b4db..b193c896fd544 100644 --- a/lib/events/setter_test.go +++ b/lib/events/setter_test.go @@ -110,7 +110,7 @@ func generateEvent() apievents.AuditEvent { ClusterName: "root", }, ServerMetadata: apievents.ServerMetadata{ - ServerID: uuid.New().String(), + ServerID: uuid.NewString(), ServerNamespace: apidefaults.Namespace, }, AppMetadata: apievents.AppMetadata{ @@ -118,6 +118,6 @@ func generateEvent() apievents.AuditEvent { AppPublicAddr: "https://nginx", AppName: "nginx", }, - SessionChunkID: uuid.New().String(), + SessionChunkID: uuid.NewString(), } } diff --git a/lib/srv/app/server.go b/lib/srv/app/server.go index 97c8f6a0b6e6f..1346403004325 100644 --- a/lib/srv/app/server.go +++ b/lib/srv/app/server.go @@ -1147,14 +1147,12 @@ func (s *Server) getProxyPort() string { // sessionStartTime fetches the session start time based on the the certificate // valid date. func (s *Server) sessionStartTime(ctx context.Context) time.Time { - var startTime time.Time if userCert, err := authz.UserCertificateFromContext(ctx); err == nil { - startTime = userCert.NotBefore - } else { - s.log.Warn("Unable to retrieve session start time from certificate.") + return userCert.NotBefore } - return startTime + s.log.Warn("Unable to retrieve session start time from certificate.") + return time.Time{} } // CopyAndConfigureTLS can be used to copy and modify an existing *tls.Config @@ -1206,10 +1204,10 @@ func newGetConfigForClientFn(log logrus.FieldLogger, client auth.AccessCache, tl } } -// certFromConnState returns certificate from the connection. +// certFromConnState returns the leaf certificate from the connection. func certFromConn(tlsConn *tls.Conn) *x509.Certificate { state := tlsConn.ConnectionState() - if len(state.PeerCertificates) != 1 { + if len(state.PeerCertificates) == 0 { return nil } From 5d1abe6986580362c75b9ad7900a4ceb77113b74 Mon Sep 17 00:00:00 2001 From: Gabriel Corado Date: Mon, 26 Feb 2024 18:58:56 -0300 Subject: [PATCH 5/6] chore: code review changes --- lib/events/setter_test.go | 8 ++++++-- lib/srv/app/server.go | 6 +++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/lib/events/setter_test.go b/lib/events/setter_test.go index b193c896fd544..8ad8a396f927f 100644 --- a/lib/events/setter_test.go +++ b/lib/events/setter_test.go @@ -18,6 +18,7 @@ package events import ( "testing" + "time" "github.com/google/uuid" "github.com/jonboulle/clockwork" @@ -44,7 +45,7 @@ func TestPreparerIncrementalIndex(t *testing.T) { } func TestPreparerTimeBasedIndex(t *testing.T) { - clock := clockwork.NewRealClock() + clock := clockwork.NewFakeClock() preparer, err := NewPreparer(PreparerConfig{ SessionID: session.NewID(), ServerID: uuid.New().String(), @@ -56,6 +57,7 @@ func TestPreparerTimeBasedIndex(t *testing.T) { var lastIndex int64 for i := 0; i < 9; i++ { + clock.Advance(time.Second) e, err := preparer.PrepareSessionEvent(generateEvent()) require.NoError(t, err) require.Greater(t, e.GetAuditEvent().GetIndex(), lastIndex, "expected a larger index") @@ -67,7 +69,7 @@ func TestPreparerTimeBasedIndexCollisions(t *testing.T) { serverID := uuid.New().String() sessionID := session.NewID() clusterName := "root" - clock := clockwork.NewRealClock() + clock := clockwork.NewFakeClock() loginTime := clock.Now() preparerOne, err := NewPreparer(PreparerConfig{ @@ -89,10 +91,12 @@ func TestPreparerTimeBasedIndexCollisions(t *testing.T) { require.NoError(t, err) for i := 0; i < 9; i++ { + clock.Advance(time.Second) evtOne, err := preparerOne.PrepareSessionEvent(generateEvent()) require.NoError(t, err) idxOne := evtOne.GetAuditEvent().GetIndex() + clock.Advance(time.Second) evtTwo, err := preparerTwo.PrepareSessionEvent(generateEvent()) require.NoError(t, err) idxTwo := evtTwo.GetAuditEvent().GetIndex() diff --git a/lib/srv/app/server.go b/lib/srv/app/server.go index 1346403004325..0e546410ab094 100644 --- a/lib/srv/app/server.go +++ b/lib/srv/app/server.go @@ -777,7 +777,7 @@ func (s *Server) handleConnection(conn net.Conn) (func(), error) { // Add user certificate into the context after the monitor connection // initialization to ensure value is present on the context. - ctx = authz.ContextWithUserCertificate(ctx, certFromConn(tlsConn)) + ctx = authz.ContextWithUserCertificate(ctx, leafCertFromConn(tlsConn)) // Application access supports plain TCP connections which are handled // differently than HTTP requests from web apps. @@ -1204,8 +1204,8 @@ func newGetConfigForClientFn(log logrus.FieldLogger, client auth.AccessCache, tl } } -// certFromConnState returns the leaf certificate from the connection. -func certFromConn(tlsConn *tls.Conn) *x509.Certificate { +// leafCertFromConn returns the leaf certificate from the connection. +func leafCertFromConn(tlsConn *tls.Conn) *x509.Certificate { state := tlsConn.ConnectionState() if len(state.PeerCertificates) == 0 { return nil From c7551ebaa07b4ea91cd6e7177ea3fcb3448cf697 Mon Sep 17 00:00:00 2001 From: Gabriel Corado Date: Wed, 28 Feb 2024 23:57:44 -0300 Subject: [PATCH 6/6] test(integration): ignore index field on audit events --- integration/appaccess/appaccess_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/appaccess/appaccess_test.go b/integration/appaccess/appaccess_test.go index a86156280023b..7ab0e685fb267 100644 --- a/integration/appaccess/appaccess_test.go +++ b/integration/appaccess/appaccess_test.go @@ -582,7 +582,7 @@ func testAuditEvents(p *Pack, t *testing.T) { expectedEvent, event, cmpopts.IgnoreTypes(apievents.ServerMetadata{}, apievents.SessionMetadata{}, apievents.UserMetadata{}, apievents.ConnectionMetadata{}), - cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "Time"), + cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "Time", "Index"), cmpopts.IgnoreFields(apievents.AppSessionChunk{}, "SessionChunkID"), )) })