Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration/appaccess/appaccess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ func testAuditEvents(p *Pack, t *testing.T) {
expectedEvent,
event,
cmpopts.IgnoreTypes(apievents.ServerMetadata{}, apievents.SessionMetadata{}, apievents.UserMetadata{}, apievents.ConnectionMetadata{}),
cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "Time"),
cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "Time", "Index"),
cmpopts.IgnoreFields(apievents.AppSessionChunk{}, "SessionChunkID"),
))
})
Expand Down
5 changes: 3 additions & 2 deletions lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,9 @@ func (l *Log) createPutItem(sessionID string, in apievents.AuditEvent) (*dynamod
return nil, trace.Wrap(err)
}
return &dynamodb.PutItemInput{
Item: av,
TableName: aws.String(l.Tablename),
Item: av,
TableName: aws.String(l.Tablename),
ConditionExpression: aws.String("attribute_not_exists(SessionID) AND attribute_not_exists(EventIndex)"),
}, nil
}

Expand Down
38 changes: 38 additions & 0 deletions lib/events/dynamoevents/dynamoevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ import (
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/events/test"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"
)

Expand Down Expand Up @@ -434,6 +436,42 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
}
}

// TestEmitSessionEventsSameIndex given events that share the same session ID
// and index, the emit should fail, avoiding any event to get overwritten.
func TestEmitSessionEventsSameIndex(t *testing.T) {
ctx := context.Background()
tt := setupDynamoContext(t)
sessionID := session.NewID()

require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 0)))
require.NoError(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1)))
require.Error(t, tt.log.EmitAuditEvent(ctx, generateEvent(sessionID, 1)))
}

func generateEvent(sessionID session.ID, index int64) apievents.AuditEvent {
return &apievents.AppSessionChunk{
Metadata: apievents.Metadata{
Type: events.AppSessionChunkEvent,
Code: events.AppSessionChunkCode,
ClusterName: "root",
Index: index,
},
ServerMetadata: apievents.ServerMetadata{
ServerID: uuid.New().String(),
ServerNamespace: apidefaults.Namespace,
},
SessionMetadata: apievents.SessionMetadata{
SessionID: sessionID.String(),
},
AppMetadata: apievents.AppMetadata{
AppURI: "nginx",
AppPublicAddr: "https://nginx",
AppName: "nginx",
},
SessionChunkID: uuid.New().String(),
}
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func randStringAlpha(n int) string {
Expand Down
5 changes: 5 additions & 0 deletions lib/events/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ type Config struct {

// BackoffDuration is a duration of the backoff before the next try.
BackoffDuration time.Duration

// StartTime represents the time the recorder started. If not zero, this
// value is used to generate the events index.
StartTime time.Time
}

// New returns a [events.SessionPreparerRecorder]. If session recording is disabled,
Expand All @@ -112,6 +116,7 @@ func New(cfg Config) (events.SessionPreparerRecorder, error) {
Clock: cfg.Clock,
UID: cfg.UID,
ClusterName: cfg.ClusterName,
StartTime: cfg.StartTime,
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down
16 changes: 14 additions & 2 deletions lib/events/setter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -60,6 +61,10 @@ type PreparerConfig struct {

// ClusterName defines the name of this teleport cluster.
ClusterName string

// StartTime represents the time the recorder started. If not zero, this
// value is used to generate the events index.
StartTime time.Time
}

// CheckAndSetDefaults checks and sets defaults
Expand Down Expand Up @@ -110,8 +115,7 @@ func (c *Preparer) PrepareSessionEvent(event apievents.AuditEvent) (apievents.Pr
srv.SetServerID(c.cfg.ServerID)
}

// ensure index is incremented and loaded atomically
event.SetIndex(int64(c.eventIndex.Add(1) - 1))
event.SetIndex(c.nextIndex())

preparedEvent := preparedSessionEvent{
event: event,
Expand All @@ -135,6 +139,14 @@ func (c *Preparer) PrepareSessionEvent(event apievents.AuditEvent) (apievents.Pr
return preparedEvent, nil
}

func (c *Preparer) nextIndex() int64 {
if !c.cfg.StartTime.IsZero() {
return c.cfg.Clock.Since(c.cfg.StartTime).Nanoseconds()
}

return int64(c.eventIndex.Add(1) - 1)
}

type preparedSessionEvent struct {
event apievents.AuditEvent
}
Expand Down
127 changes: 127 additions & 0 deletions lib/events/setter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package events

import (
"testing"
"time"

"github.com/google/uuid"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"

apidefaults "github.com/gravitational/teleport/api/defaults"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/session"
)

func TestPreparerIncrementalIndex(t *testing.T) {
sessionID := session.NewID()
preparer, err := NewPreparer(PreparerConfig{
SessionID: sessionID,
ClusterName: "root",
})
require.NoError(t, err)

for i := 0; i < 10; i++ {
e, err := preparer.PrepareSessionEvent(generateEvent())
require.NoError(t, err)
require.Equal(t, int64(i), e.GetAuditEvent().GetIndex(), "unexpected event index")
}
}

func TestPreparerTimeBasedIndex(t *testing.T) {
clock := clockwork.NewFakeClock()
preparer, err := NewPreparer(PreparerConfig{
SessionID: session.NewID(),
ServerID: uuid.New().String(),
ClusterName: "root",
Clock: clock,
StartTime: clock.Now(),
})
require.NoError(t, err)

var lastIndex int64
for i := 0; i < 9; i++ {
clock.Advance(time.Second)
e, err := preparer.PrepareSessionEvent(generateEvent())
require.NoError(t, err)
require.Greater(t, e.GetAuditEvent().GetIndex(), lastIndex, "expected a larger index")
lastIndex = e.GetAuditEvent().GetIndex()
}
}

func TestPreparerTimeBasedIndexCollisions(t *testing.T) {
serverID := uuid.New().String()
sessionID := session.NewID()
clusterName := "root"
clock := clockwork.NewFakeClock()
loginTime := clock.Now()

preparerOne, err := NewPreparer(PreparerConfig{
SessionID: sessionID,
ServerID: serverID,
ClusterName: clusterName,
Clock: clock,
StartTime: loginTime,
})
require.NoError(t, err)

preparerTwo, err := NewPreparer(PreparerConfig{
SessionID: sessionID,
ServerID: serverID,
ClusterName: clusterName,
Clock: clock,
StartTime: loginTime,
})
require.NoError(t, err)

for i := 0; i < 9; i++ {
clock.Advance(time.Second)
evtOne, err := preparerOne.PrepareSessionEvent(generateEvent())
require.NoError(t, err)
idxOne := evtOne.GetAuditEvent().GetIndex()

clock.Advance(time.Second)
evtTwo, err := preparerTwo.PrepareSessionEvent(generateEvent())
require.NoError(t, err)
idxTwo := evtTwo.GetAuditEvent().GetIndex()

require.NotEqual(t, idxOne, idxTwo)
require.Greater(t, idxTwo, idxOne)
}
}

func generateEvent() apievents.AuditEvent {
return &apievents.AppSessionChunk{
Metadata: apievents.Metadata{
Type: AppSessionChunkEvent,
Code: AppSessionChunkCode,
ClusterName: "root",
},
ServerMetadata: apievents.ServerMetadata{
ServerID: uuid.NewString(),
ServerNamespace: apidefaults.Namespace,
},
AppMetadata: apievents.AppMetadata{
AppURI: "nginx",
AppPublicAddr: "https://nginx",
AppName: "nginx",
},
SessionChunkID: uuid.NewString(),
}
}
32 changes: 29 additions & 3 deletions lib/srv/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package app
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"net"
"net/http"
Expand Down Expand Up @@ -774,6 +775,10 @@ func (s *Server) handleConnection(conn net.Conn) (func(), error) {
}
}

// Add user certificate into the context after the monitor connection
// initialization to ensure value is present on the context.
ctx = authz.ContextWithUserCertificate(ctx, leafCertFromConn(tlsConn))

// Application access supports plain TCP connections which are handled
// differently than HTTP requests from web apps.
if app.IsTCP() {
Expand Down Expand Up @@ -912,7 +917,7 @@ func (s *Server) serveSession(w http.ResponseWriter, r *http.Request, identity *
// minutes. Used to stream session chunks to the Audit Log.
ttl := min(identity.Expires.Sub(s.c.Clock.Now()), 5*time.Minute)
session, err := utils.FnCacheGetWithTTL(r.Context(), s.cache, identity.RouteToApp.SessionID, ttl, func(ctx context.Context) (*sessionChunk, error) {
session, err := s.newSessionChunk(ctx, identity, app, opts...)
session, err := s.newSessionChunk(ctx, identity, app, s.sessionStartTime(r.Context()), opts...)
return session, trace.Wrap(err)
})
if err != nil {
Expand Down Expand Up @@ -1101,10 +1106,10 @@ func (s *Server) newHTTPServer(clusterName string) *http.Server {
// newTCPServer creates a server that proxies TCP applications.
func (s *Server) newTCPServer() (*tcpServer, error) {
return &tcpServer{
newAudit: func(sessionID string) (common.Audit, error) {
newAudit: func(ctx context.Context, sessionID string) (common.Audit, error) {
// Audit stream is using server context, not session context,
// to make sure that session is uploaded even after it is closed.
rec, err := s.newSessionRecorder(s.closeContext, sessionID)
rec, err := s.newSessionRecorder(s.closeContext, s.sessionStartTime(ctx), sessionID)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -1139,6 +1144,17 @@ func (s *Server) getProxyPort() string {
return port
}

// sessionStartTime fetches the session start time based on the the certificate
// valid date.
func (s *Server) sessionStartTime(ctx context.Context) time.Time {
if userCert, err := authz.UserCertificateFromContext(ctx); err == nil {
return userCert.NotBefore
}

s.log.Warn("Unable to retrieve session start time from certificate.")
return time.Time{}
}

// CopyAndConfigureTLS can be used to copy and modify an existing *tls.Config
// for Teleport application proxy servers.
func CopyAndConfigureTLS(log logrus.FieldLogger, client auth.AccessCache, config *tls.Config) *tls.Config {
Expand Down Expand Up @@ -1187,3 +1203,13 @@ func newGetConfigForClientFn(log logrus.FieldLogger, client auth.AccessCache, tl
return tlsCopy, nil
}
}

// leafCertFromConn returns the leaf certificate from the connection.
func leafCertFromConn(tlsConn *tls.Conn) *x509.Certificate {
state := tlsConn.ConnectionState()
if len(state.PeerCertificates) == 0 {
return nil
}

return state.PeerCertificates[0]
}
8 changes: 3 additions & 5 deletions lib/srv/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,6 @@ func TestRequestAuditEvents(t *testing.T) {
Type: events.AppSessionChunkEvent,
Code: events.AppSessionChunkCode,
ClusterName: "root.example.com",
Index: 0,
},
AppMetadata: apievents.AppMetadata{
AppURI: app.Spec.URI,
Expand All @@ -1023,7 +1022,7 @@ func TestRequestAuditEvents(t *testing.T) {
expectedEvent,
event,
cmpopts.IgnoreTypes(apievents.ServerMetadata{}, apievents.SessionMetadata{}, apievents.UserMetadata{}, apievents.ConnectionMetadata{}),
cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time"),
cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time", "Index"),
cmpopts.IgnoreFields(apievents.AppSessionChunk{}, "SessionChunkID"),
))
case events.AppSessionRequestEvent:
Expand All @@ -1033,7 +1032,6 @@ func TestRequestAuditEvents(t *testing.T) {
Type: events.AppSessionRequestEvent,
Code: events.AppSessionRequestCode,
ClusterName: "root.example.com",
Index: 1,
},
AppMetadata: apievents.AppMetadata{
AppURI: app.Spec.URI,
Expand All @@ -1048,7 +1046,7 @@ func TestRequestAuditEvents(t *testing.T) {
expectedEvent,
event,
cmpopts.IgnoreTypes(apievents.ServerMetadata{}, apievents.SessionMetadata{}, apievents.UserMetadata{}, apievents.ConnectionMetadata{}),
cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time"),
cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time", "Index"),
cmpopts.IgnoreFields(apievents.AppSessionChunk{}, "SessionChunkID"),
))
}
Expand Down Expand Up @@ -1101,7 +1099,7 @@ func TestRequestAuditEvents(t *testing.T) {
expectedEvent,
searchEvents[0],
cmpopts.IgnoreTypes(apievents.ServerMetadata{}, apievents.SessionMetadata{}, apievents.UserMetadata{}, apievents.ConnectionMetadata{}),
cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time"),
cmpopts.IgnoreFields(apievents.Metadata{}, "ID", "ClusterName", "Time", "Index"),
cmpopts.IgnoreFields(apievents.AppSessionChunk{}, "SessionChunkID"),
))
}
Expand Down
Loading