diff --git a/integrations/event-handler/README.md b/integrations/event-handler/README.md index d1ef8d4ad4c7b..2baf9bc3ec8ec 100644 --- a/integrations/event-handler/README.md +++ b/integrations/event-handler/README.md @@ -28,7 +28,7 @@ You may specify configuration options via command line arguments, environment va | teleport-refresh-interval | How often to load the identity file from disk when teleport-refresh-enabled is specified. Default: 1m | FDFWD_TELEPORT_REFRESH_INTERVAL | | fluentd-url | Fluentd URL | FDFWD_FLUENTD_URL | | fluentd-session-url | Fluentd session URL | FDFWD_FLUENTD_SESSION_URL | -| fluentd-ca | fluentd TLS CA file | FDFWD_FLUENTD_CA | +| fluentd-ca | Fluentd TLS CA file | FDFWD_FLUENTD_CA | | fluentd-cert | Fluentd TLS certificate file | FDFWD_FLUENTD_CERT | | fluentd-key | Fluentd TLS key file | FDFWD_FLUENTD_KEY | | storage | Storage directory | FDFWD_STORAGE | @@ -38,7 +38,6 @@ You may specify configuration options via command line arguments, environment va | skip-session-types | Comma-separated list of session event types to skip | FDFWD_SKIP_SESSION_TYPES | | start-time | Minimum event time (RFC3339 format) | FDFWD_START_TIME | | timeout | Polling timeout | FDFWD_TIMEOUT | -| cursor | Start cursor value | FDFWD_CURSOR | | debug | Debug logging | FDFWD_DEBUG | TOML configuration keys are the same as CLI args. Teleport and Fluentd variables can be grouped into sections. See [example TOML](example/config.toml). You can specify TOML file location using `--config` CLI flag. diff --git a/integrations/event-handler/cli.go b/integrations/event-handler/cli.go index 76d629896fda3..633b338a78430 100644 --- a/integrations/event-handler/cli.go +++ b/integrations/event-handler/cli.go @@ -125,8 +125,11 @@ type IngestConfig struct { // BatchSize is a fetch batch size BatchSize int `help:"Fetch batch size" default:"20" env:"FDFWD_BATCH" name:"batch"` - // Types are event types to log - Types []string `help:"Comma-separated list of event types to forward" env:"FDFWD_TYPES"` + // TypesRaw are event types to log + TypesRaw []string `name:"types" help:"Comma-separated list of event types to forward" env:"FDFWD_TYPES"` + + // Types is a map generated from TypesRaw + Types map[string]struct{} `kong:"-"` // SkipEventTypesRaw are event types to skip SkipEventTypesRaw []string `name:"skip-event-types" help:"Comma-separated list of audit log event types to skip" env:"FDFWD_SKIP_EVENT_TYPES"` @@ -239,6 +242,7 @@ func (c *StartCmdConfig) Validate() error { if err := c.TeleportConfig.Check(); err != nil { return trace.Wrap(err) } + c.Types = lib.SliceToAnonymousMap(c.TypesRaw) c.SkipSessionTypes = lib.SliceToAnonymousMap(c.SkipSessionTypesRaw) c.SkipEventTypes = lib.SliceToAnonymousMap(c.SkipEventTypesRaw) diff --git a/integrations/event-handler/cli_test.go b/integrations/event-handler/cli_test.go index 168d72245b737..a7e909d439959 100644 --- a/integrations/event-handler/cli_test.go +++ b/integrations/event-handler/cli_test.go @@ -61,6 +61,7 @@ func TestStartCmdConfig(t *testing.T) { IngestConfig: IngestConfig{ StorageDir: "./storage", BatchSize: 20, + Types: map[string]struct{}{}, SkipEventTypes: map[string]struct{}{}, SkipSessionTypesRaw: []string{"print", "desktop.recording"}, SkipSessionTypes: map[string]struct{}{ @@ -101,6 +102,7 @@ func TestStartCmdConfig(t *testing.T) { IngestConfig: IngestConfig{ StorageDir: "./storage", BatchSize: 20, + Types: map[string]struct{}{}, SkipEventTypes: map[string]struct{}{}, SkipSessionTypesRaw: []string{"print", "desktop.recording"}, SkipSessionTypes: map[string]struct{}{ @@ -141,6 +143,7 @@ func TestStartCmdConfig(t *testing.T) { IngestConfig: IngestConfig{ StorageDir: "./storage", BatchSize: 20, + Types: map[string]struct{}{}, SkipEventTypes: map[string]struct{}{}, SkipSessionTypesRaw: []string{"print", "desktop.recording"}, SkipSessionTypes: map[string]struct{}{ diff --git a/integrations/event-handler/events_job.go b/integrations/event-handler/events_job.go index 4a26c3cc4e531..8c0ca4000e687 100644 --- a/integrations/event-handler/events_job.go +++ b/integrations/event-handler/events_job.go @@ -122,6 +122,7 @@ func (j *EventsJob) runPolling(ctx context.Context) error { if err := stream.Drain(chunks); err != nil { if trace.IsNotImplemented(err) { // fallback to legacy behavior + j.app.log.DebugContext(ctx, "Bulk event export API is not implemented on this server, reverting to legacy watcher") return j.runLegacyPolling(ctx) } return trace.Wrap(err) @@ -264,10 +265,13 @@ func (j *EventsJob) runLegacyPolling(ctx context.Context) error { // handleEventV2 processes an event from the newer export API. func (j *EventsJob) handleEventV2(ctx context.Context, evt *auditlogpb.ExportEventUnstructured) error { - // filter out unwanted event types (in the v1 event export logic this was an internal behavior // of the event processing helper since it would perform conversion prior to storing the event // in its internal buffer). + if _, ok := j.app.Config.Types[evt.Event.Type]; len(j.app.Config.Types) > 0 && !ok { + return nil + } + if _, ok := j.app.Config.SkipEventTypes[evt.Event.Type]; ok { return nil } @@ -292,6 +296,7 @@ func (j *EventsJob) handleEvent(ctx context.Context, evt *TeleportEvent) error { // Start session ingestion if needed if evt.IsSessionEnd { + j.app.log.DebugContext(ctx, "Registering session", "sid", evt.SessionID) j.app.RegisterSession(ctx, evt) } diff --git a/integrations/event-handler/events_job_test.go b/integrations/event-handler/events_job_test.go new file mode 100644 index 0000000000000..d196db50bb552 --- /dev/null +++ b/integrations/event-handler/events_job_test.go @@ -0,0 +1,120 @@ +// Teleport +// Copyright (C) 2025 Gravitational, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "bytes" + "log/slog" + "regexp" + "testing" + + "github.com/peterbourgon/diskv/v3" + "github.com/stretchr/testify/require" + + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + apievents "github.com/gravitational/teleport/api/types/events" + "github.com/gravitational/teleport/lib/events" +) + +func TestEventHandlerFilters(t *testing.T) { + tests := []struct { + name string + ingestConfig IngestConfig + events []apievents.AuditEvent + }{ + { + name: "types filter out role.created", + ingestConfig: IngestConfig{ + Types: map[string]struct{}{"join_token.create": {}}, + SkipSessionTypes: map[string]struct{}{"print": {}, "desktop.recording": {}}, + DryRun: true, + }, + events: []apievents.AuditEvent{ + &apievents.RoleCreate{ + Metadata: apievents.Metadata{ + Type: events.RoleCreatedEvent, + Code: events.RoleCreatedCode, + }, + }, + &apievents.ProvisionTokenCreate{ + Metadata: apievents.Metadata{ + Type: events.ProvisionTokenCreateEvent, + Code: events.ProvisionTokenCreateCode, + }, + }, + }, + }, + { + name: "skip-event-types filter out role.created", + ingestConfig: IngestConfig{ + SkipEventTypes: map[string]struct{}{"role.created": {}}, + SkipSessionTypes: map[string]struct{}{"print": {}, "desktop.recording": {}}, + DryRun: true, + }, + events: []apievents.AuditEvent{ + &apievents.RoleCreate{ + Metadata: apievents.Metadata{ + Type: events.RoleCreatedEvent, + Code: events.RoleCreatedCode, + }, + }, + &apievents.ProvisionTokenCreate{ + Metadata: apievents.Metadata{ + Type: events.ProvisionTokenCreateEvent, + Code: events.ProvisionTokenCreateCode, + }, + }, + }, + }, + } + + skipEvent := regexp.MustCompile("\"Event sent\".*type=role.created") + checkEvent := regexp.MustCompile("\"Event sent\".*type=join_token.create") + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + out := &bytes.Buffer{} + log := slog.New(slog.NewTextHandler(out, &slog.HandlerOptions{Level: slog.LevelDebug})) + + job := NewEventsJob(&App{ + Config: &StartCmdConfig{ + IngestConfig: tt.ingestConfig}, + State: &State{ + dv: diskv.New(diskv.Options{ + BasePath: t.TempDir(), + }), + }, + client: &mockClient{}, + log: log, + }) + + generateEvents, err := eventsToProto(tt.events) + require.NoError(t, err) + + for _, event := range generateEvents { + exportEvent := &auditlogpb.ExportEventUnstructured{Event: event} + + err := job.handleEventV2(t.Context(), exportEvent) + require.NoError(t, err) + + } + + require.NotRegexp(t, skipEvent, out.String()) + require.Regexp(t, checkEvent, out.String()) + }) + } +} diff --git a/integrations/event-handler/legacy_events_watcher.go b/integrations/event-handler/legacy_events_watcher.go index 0bc48f13f43e6..98182fb66155e 100644 --- a/integrations/event-handler/legacy_events_watcher.go +++ b/integrations/event-handler/legacy_events_watcher.go @@ -278,7 +278,7 @@ func (t *LegacyEventsWatcher) getEventsInWindow(ctx context.Context, from, to ti from, to, "default", - t.config.Types, + t.config.TypesRaw, t.config.BatchSize, types.EventOrderAscending, t.cursor, diff --git a/integrations/event-handler/teleport_event.go b/integrations/event-handler/teleport_event.go index bb094eb0fb716..6c613c3097753 100644 --- a/integrations/event-handler/teleport_event.go +++ b/integrations/event-handler/teleport_event.go @@ -77,18 +77,18 @@ func NewTeleportEvent(e *auditlogpb.EventUnstructured) (*TeleportEvent, error) { return nil, trace.Wrap(err) } evt := &TeleportEvent{ + Event: payload, + ID: e.Id, Type: e.GetType(), Time: e.Time.AsTime(), Index: e.GetIndex(), - ID: e.Id, - Event: payload, } - switch e.GetType() { + switch evt.Type { case sessionEndType: - err = evt.setSessionID(e) + err = evt.setSessionID() case loginType: - err = evt.setLoginData(e) + err = evt.setLoginData() } if err != nil { return nil, trace.Wrap(err) @@ -111,7 +111,7 @@ func NewLegacyTeleportEvent(e *auditlogpb.EventUnstructured, cursor string, wind } // setSessionID sets session id for session end event -func (e *TeleportEvent) setSessionID(evt *auditlogpb.EventUnstructured) error { +func (e *TeleportEvent) setSessionID() error { sessionUploadEvt := &events.SessionUpload{} if err := json.Unmarshal(e.Event, sessionUploadEvt); err != nil { return trace.Wrap(err) @@ -124,8 +124,8 @@ func (e *TeleportEvent) setSessionID(evt *auditlogpb.EventUnstructured) error { return nil } -// setLoginValues sets values related to login event -func (e *TeleportEvent) setLoginData(evt *auditlogpb.EventUnstructured) error { +// setLoginData sets values related to login event +func (e *TeleportEvent) setLoginData() error { loginEvent := &events.UserLogin{} if err := json.Unmarshal(e.Event, loginEvent); err != nil { return trace.Wrap(err) diff --git a/lib/events/api.go b/lib/events/api.go index 3c36a4e4dca7b..da3f3f87bdd55 100644 --- a/lib/events/api.go +++ b/lib/events/api.go @@ -984,7 +984,7 @@ const ( ) // SessionRecordingEvents is a list of events that are related to session -// recorings. +// recordings. var SessionRecordingEvents = []string{ SessionEndEvent, WindowsDesktopSessionEndEvent,