Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions integrations/event-handler/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ You may specify configuration options via command line arguments, environment va
| teleport-refresh-interval | How often to load the identity file from disk when teleport-refresh-enabled is specified. Default: 1m | FDFWD_TELEPORT_REFRESH_INTERVAL |
| fluentd-url | Fluentd URL | FDFWD_FLUENTD_URL |
| fluentd-session-url | Fluentd session URL | FDFWD_FLUENTD_SESSION_URL |
| fluentd-ca | fluentd TLS CA file | FDFWD_FLUENTD_CA |
| fluentd-ca | Fluentd TLS CA file | FDFWD_FLUENTD_CA |
| fluentd-cert | Fluentd TLS certificate file | FDFWD_FLUENTD_CERT |
| fluentd-key | Fluentd TLS key file | FDFWD_FLUENTD_KEY |
| storage | Storage directory | FDFWD_STORAGE |
Expand All @@ -38,7 +38,6 @@ You may specify configuration options via command line arguments, environment va
| skip-session-types | Comma-separated list of session event types to skip | FDFWD_SKIP_SESSION_TYPES |
| start-time | Minimum event time (RFC3339 format) | FDFWD_START_TIME |
| timeout | Polling timeout | FDFWD_TIMEOUT |
| cursor | Start cursor value | FDFWD_CURSOR |
| debug | Debug logging | FDFWD_DEBUG |

TOML configuration keys are the same as CLI args. Teleport and Fluentd variables can be grouped into sections. See [example TOML](example/config.toml). You can specify TOML file location using `--config` CLI flag.
Expand Down
8 changes: 6 additions & 2 deletions integrations/event-handler/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,11 @@ type IngestConfig struct {
// BatchSize is a fetch batch size
BatchSize int `help:"Fetch batch size" default:"20" env:"FDFWD_BATCH" name:"batch"`

// Types are event types to log
Types []string `help:"Comma-separated list of event types to forward" env:"FDFWD_TYPES"`
// TypesRaw are event types to log
TypesRaw []string `name:"types" help:"Comma-separated list of event types to forward" env:"FDFWD_TYPES"`

// Types is a map generated from TypesRaw
Types map[string]struct{} `kong:"-"`

// SkipEventTypesRaw are event types to skip
SkipEventTypesRaw []string `name:"skip-event-types" help:"Comma-separated list of audit log event types to skip" env:"FDFWD_SKIP_EVENT_TYPES"`
Expand Down Expand Up @@ -239,6 +242,7 @@ func (c *StartCmdConfig) Validate() error {
if err := c.TeleportConfig.Check(); err != nil {
return trace.Wrap(err)
}
c.Types = lib.SliceToAnonymousMap(c.TypesRaw)
c.SkipSessionTypes = lib.SliceToAnonymousMap(c.SkipSessionTypesRaw)
c.SkipEventTypes = lib.SliceToAnonymousMap(c.SkipEventTypesRaw)

Expand Down
3 changes: 3 additions & 0 deletions integrations/event-handler/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func TestStartCmdConfig(t *testing.T) {
IngestConfig: IngestConfig{
StorageDir: "./storage",
BatchSize: 20,
Types: map[string]struct{}{},
SkipEventTypes: map[string]struct{}{},
SkipSessionTypesRaw: []string{"print", "desktop.recording"},
SkipSessionTypes: map[string]struct{}{
Expand Down Expand Up @@ -101,6 +102,7 @@ func TestStartCmdConfig(t *testing.T) {
IngestConfig: IngestConfig{
StorageDir: "./storage",
BatchSize: 20,
Types: map[string]struct{}{},
SkipEventTypes: map[string]struct{}{},
SkipSessionTypesRaw: []string{"print", "desktop.recording"},
SkipSessionTypes: map[string]struct{}{
Expand Down Expand Up @@ -141,6 +143,7 @@ func TestStartCmdConfig(t *testing.T) {
IngestConfig: IngestConfig{
StorageDir: "./storage",
BatchSize: 20,
Types: map[string]struct{}{},
SkipEventTypes: map[string]struct{}{},
SkipSessionTypesRaw: []string{"print", "desktop.recording"},
SkipSessionTypes: map[string]struct{}{
Expand Down
7 changes: 6 additions & 1 deletion integrations/event-handler/events_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (j *EventsJob) runPolling(ctx context.Context) error {
if err := stream.Drain(chunks); err != nil {
if trace.IsNotImplemented(err) {
// fallback to legacy behavior
j.app.log.DebugContext(ctx, "Bulk event export API is not implemented on this server, reverting to legacy watcher")
return j.runLegacyPolling(ctx)
}
return trace.Wrap(err)
Expand Down Expand Up @@ -264,10 +265,13 @@ func (j *EventsJob) runLegacyPolling(ctx context.Context) error {

// handleEventV2 processes an event from the newer export API.
func (j *EventsJob) handleEventV2(ctx context.Context, evt *auditlogpb.ExportEventUnstructured) error {

// filter out unwanted event types (in the v1 event export logic this was an internal behavior
// of the event processing helper since it would perform conversion prior to storing the event
// in its internal buffer).
if _, ok := j.app.Config.Types[evt.Event.Type]; len(j.app.Config.Types) > 0 && !ok {
return nil
}

if _, ok := j.app.Config.SkipEventTypes[evt.Event.Type]; ok {
return nil
}
Expand All @@ -292,6 +296,7 @@ func (j *EventsJob) handleEvent(ctx context.Context, evt *TeleportEvent) error {

// Start session ingestion if needed
if evt.IsSessionEnd {
j.app.log.DebugContext(ctx, "Registering session", "sid", evt.SessionID)
j.app.RegisterSession(ctx, evt)
}

Expand Down
120 changes: 120 additions & 0 deletions integrations/event-handler/events_job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Teleport
// Copyright (C) 2025 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package main

import (
"bytes"
"log/slog"
"regexp"
"testing"

"github.com/peterbourgon/diskv/v3"
"github.com/stretchr/testify/require"

auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
)

func TestEventHandlerFilters(t *testing.T) {
tests := []struct {
name string
ingestConfig IngestConfig
events []apievents.AuditEvent
}{
{
name: "types filter out role.created",
ingestConfig: IngestConfig{
Types: map[string]struct{}{"join_token.create": {}},
SkipSessionTypes: map[string]struct{}{"print": {}, "desktop.recording": {}},
DryRun: true,
},
events: []apievents.AuditEvent{
&apievents.RoleCreate{
Metadata: apievents.Metadata{
Type: events.RoleCreatedEvent,
Code: events.RoleCreatedCode,
},
},
&apievents.ProvisionTokenCreate{
Metadata: apievents.Metadata{
Type: events.ProvisionTokenCreateEvent,
Code: events.ProvisionTokenCreateCode,
},
},
},
},
{
name: "skip-event-types filter out role.created",
ingestConfig: IngestConfig{
SkipEventTypes: map[string]struct{}{"role.created": {}},
SkipSessionTypes: map[string]struct{}{"print": {}, "desktop.recording": {}},
DryRun: true,
},
events: []apievents.AuditEvent{
&apievents.RoleCreate{
Metadata: apievents.Metadata{
Type: events.RoleCreatedEvent,
Code: events.RoleCreatedCode,
},
},
&apievents.ProvisionTokenCreate{
Metadata: apievents.Metadata{
Type: events.ProvisionTokenCreateEvent,
Code: events.ProvisionTokenCreateCode,
},
},
},
},
}

skipEvent := regexp.MustCompile("\"Event sent\".*type=role.created")
checkEvent := regexp.MustCompile("\"Event sent\".*type=join_token.create")

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out := &bytes.Buffer{}
log := slog.New(slog.NewTextHandler(out, &slog.HandlerOptions{Level: slog.LevelDebug}))

job := NewEventsJob(&App{
Config: &StartCmdConfig{
IngestConfig: tt.ingestConfig},
State: &State{
dv: diskv.New(diskv.Options{
BasePath: t.TempDir(),
}),
},
client: &mockClient{},
log: log,
})

generateEvents, err := eventsToProto(tt.events)
require.NoError(t, err)

for _, event := range generateEvents {
exportEvent := &auditlogpb.ExportEventUnstructured{Event: event}

err := job.handleEventV2(t.Context(), exportEvent)
require.NoError(t, err)

}

require.NotRegexp(t, skipEvent, out.String())
require.Regexp(t, checkEvent, out.String())
})
}
}
2 changes: 1 addition & 1 deletion integrations/event-handler/legacy_events_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (t *LegacyEventsWatcher) getEventsInWindow(ctx context.Context, from, to ti
from,
to,
"default",
t.config.Types,
t.config.TypesRaw,
t.config.BatchSize,
types.EventOrderAscending,
t.cursor,
Expand Down
16 changes: 8 additions & 8 deletions integrations/event-handler/teleport_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,18 @@ func NewTeleportEvent(e *auditlogpb.EventUnstructured) (*TeleportEvent, error) {
return nil, trace.Wrap(err)
}
evt := &TeleportEvent{
Event: payload,
ID: e.Id,
Type: e.GetType(),
Time: e.Time.AsTime(),
Index: e.GetIndex(),
ID: e.Id,
Event: payload,
}

switch e.GetType() {
switch evt.Type {
case sessionEndType:
err = evt.setSessionID(e)
err = evt.setSessionID()
case loginType:
err = evt.setLoginData(e)
err = evt.setLoginData()
}
if err != nil {
return nil, trace.Wrap(err)
Expand All @@ -111,7 +111,7 @@ func NewLegacyTeleportEvent(e *auditlogpb.EventUnstructured, cursor string, wind
}

// setSessionID sets session id for session end event
func (e *TeleportEvent) setSessionID(evt *auditlogpb.EventUnstructured) error {
func (e *TeleportEvent) setSessionID() error {
sessionUploadEvt := &events.SessionUpload{}
if err := json.Unmarshal(e.Event, sessionUploadEvt); err != nil {
return trace.Wrap(err)
Expand All @@ -124,8 +124,8 @@ func (e *TeleportEvent) setSessionID(evt *auditlogpb.EventUnstructured) error {
return nil
}

// setLoginValues sets values related to login event
func (e *TeleportEvent) setLoginData(evt *auditlogpb.EventUnstructured) error {
// setLoginData sets values related to login event
func (e *TeleportEvent) setLoginData() error {
loginEvent := &events.UserLogin{}
if err := json.Unmarshal(e.Event, loginEvent); err != nil {
return trace.Wrap(err)
Expand Down
2 changes: 1 addition & 1 deletion lib/events/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ const (
)

// SessionRecordingEvents is a list of events that are related to session
// recorings.
// recordings.
var SessionRecordingEvents = []string{
SessionEndEvent,
WindowsDesktopSessionEndEvent,
Expand Down
Loading