Determine the session type based on the events on session events streaming#49361
Determine the session type based on the events on session events streaming#49361gabrielcorado wants to merge 3 commits intomasterfrom
Conversation
| var sessionKind types.SessionKind | ||
| select { | ||
| case <-ctx.Done(): | ||
| case <-firstErrCh: |
There was a problem hiding this comment.
Not sure why we need to peek if the error handling is ignored
The comments says
Also peek the error channel to avoid blocking
but the implementation will resent the error to errCh so I'm but consufed how this help to avoid blocking.
There was a problem hiding this comment.
If the StreamSessionEvents call makes an error, we never receive an event on the regular events channel (only one error on the error channel). If we don't peek at the error channel, we will only be unblocked when the context is finished.
As for peeking, we only don't forward the error channel if the emit audit event fails. If we peek, the caller will only be sent a message on the events channel that will never be sent instead of a channel receiving the error.
To clarify, we ignored the error because the audit log does not specify whether the session recording access was successful.
Flow:
sequenceDiagram
participant C as Caller
participant S as Server
participant A as Audit log
C->>S: StreamSessionEvents
S->>A: StreamSessionEvents
A->>S: (chan AuditEvent, chan error)
alt Receives event (on channel)
S->>S: Determine session type
else Receives error (on channel)
S->>S: Does nothing
else Context done
S->>S: Does nothing
end
S->>S: EmitAuditEvent
alt Emit fails
S->>C: return new error channel<br/>and empty events channel
else
S->>C: peeked channels
end
| DatabaseSessionKind SessionKind = "db" | ||
| AppSessionKind SessionKind = "app" | ||
| WindowsDesktopSessionKind SessionKind = "desktop" | ||
| UnknownSessionKind SessionKind = "" |
There was a problem hiding this comment.
Could you add a comment explaining the purpose of UnknownSessionKind and describing how it might occur?
| return c, e | ||
| } | ||
|
|
||
| eventsCh, errCh := a.alog.StreamSessionEvents(ctx, sessionID, startIndex) |
There was a problem hiding this comment.
we will miss the first event when startIndex is non-zero. How complex would it be if we had to change the StreamSessionEvents interface?
And curious how often/when the startIndex is non-zero
There was a problem hiding this comment.
The events won't be lost. The only difference is that we won't be able to set the session type (because the event type differs from the ones we're checking). In this case, the value would be unknown (empty string).
|
This peek implementation looks a little too complicated. I haven't looked in detail yet, but can we do something like this? |
|
@zmb3 We need to peek at the first event and return a channel containing it so that it does not affect the callers. We have thought about some alternatives to this, but those require updating the interface (which is defined in different places and has mocks so that the changes would be more significant):
Do you think those alternatives will work better? |
|
@gabrielcorado how about this diff? (I haven't removed the emitting from the current place yet..) If we move where we emit the recording access event from the auth grpc server to the actual audit log implementation we should be able to do this without changing any interface anywhere. The other nice property of this implementation is it works even if you specify a start event > 0. diff --git a/lib/events/auditlog.go b/lib/events/auditlog.go
index 51180746cb..0c6c1dab60 100644
--- a/lib/events/auditlog.go
+++ b/lib/events/auditlog.go
@@ -39,6 +39,7 @@ import (
"github.com/gravitational/teleport/api/internalutils/stream"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/defaults"
+ "github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/observability/metrics"
"github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"
@@ -545,6 +546,22 @@ func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID
"session_id", string(sessionID),
)
+ recordingFormat := make(chan string, 1)
+ go func() {
+ var format string
+ for {
+ select {
+ case f := <-recordingFormat:
+ format = f
+ case <-l.Clock.After(5 * time.Second):
+ l.EmitAuditEvent(ctx, apievents.SessionRecordingAccess{
+ // TODO: other fields
+ Format: format,
+ })
+ return
+ }
+ }
+ }()
+
go func() {
defer rawSession.Close()
// this shouldn't be necessary as the position should be already 0 (Download
@@ -572,6 +589,14 @@ func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID
return
}
+ switch event.GetType() {
+ case events.SessionStartEvent:
+ recordingFormat <- "ssh"
+ case events.WindowsDesktopSessionStartEvent:
+ recordingFormat <- "desktop"
+ // TODO: other cases
+ }
+
if event.GetIndex() >= startIndex {
select {
case c <- event:
|
|
@zmb3 I've tried your proposed solution, and after discussing it with @greedy52, it seems that it raises a few concerns, given we're calling it directly:
Given this, we proposed a new solution that uses a callback (provided to the AuditLog through context so we don't change the function signature) called when the first event is available. Please take a look and see what you think. I've placed it in this draft PR. |
|
Closing this in favor of #50395 |
Related to #49164
Brief context: After the changes are made at #47309, when a request for session recording events (playback) is made, we query to find the last event to set the
SessionTypefield on the audit log. Depending on the cluster's backend and the volume of audit events, this can cause increased latency.This PR updates the logic of determining the session type by consuming (peeking) the first event of the stream.
changelog: Improve session playback initial delay caused by an additional events query.