diff --git a/core/bifrost.go b/core/bifrost.go index 34eadfeac2..dd1b7cf84c 100644 --- a/core/bifrost.go +++ b/core/bifrost.go @@ -4123,6 +4123,9 @@ func (bifrost *Bifrost) RunStreamPreHooks(ctx *schemas.BifrostContext, req *sche resp, bifrostErr := pipeline.RunPostLLMHooks(ctx, result, err, preCount) if IsFinalChunk(ctx) { drainAndAttachPluginLogs(ctx) + if traceID, ok := ctx.Value(schemas.BifrostContextKeyTraceID).(string); ok && strings.TrimSpace(traceID) != "" { + tracer.CompleteAndFlushTrace(strings.TrimSpace(traceID)) + } } if bifrostErr != nil { bifrostErr.PopulateExtraFields(req.RequestType, wsProvider, wsModel, wsModel) diff --git a/core/bifrost_test.go b/core/bifrost_test.go index 3dc68bfbec..87ab32b30a 100644 --- a/core/bifrost_test.go +++ b/core/bifrost_test.go @@ -2640,3 +2640,132 @@ func TestPluginPipelineStreamingRace(t *testing.T) { wg.Wait() } + +// recordingTracer wraps NoOpTracer and counts CompleteAndFlushTrace calls. +type recordingTracer struct { + schemas.NoOpTracer + mu sync.Mutex + flushed []string + createOnce sync.Once + traceID string +} + +func (r *recordingTracer) CreateTrace(_ string, _ ...string) string { + r.mu.Lock() + defer r.mu.Unlock() + r.traceID = "test-trace-id" + return r.traceID +} + +func (r *recordingTracer) CompleteAndFlushTrace(traceID string) { + r.mu.Lock() + defer r.mu.Unlock() + r.flushed = append(r.flushed, traceID) +} + +func (r *recordingTracer) flushedIDs() []string { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]string, len(r.flushed)) + copy(out, r.flushed) + return out +} + +// minimalBifrost returns a Bifrost instance with no providers and no plugins, +// suitable for testing RunStreamPreHooks without network calls. +func minimalBifrost(t *testing.T, tracer schemas.Tracer) *Bifrost { + t.Helper() + account := NewMockAccount() + cfg := schemas.BifrostConfig{ + Account: account, + Logger: NewDefaultLogger(schemas.LogLevelError), + Tracer: tracer, + } + bf, err := Init(context.Background(), cfg) + if err != nil { + t.Fatalf("Init failed: %v", err) + } + return bf +} + +// TestRunStreamPreHooks_PostHookRunner_FlushesTracerOnFinalChunk asserts that +// the postHookRunner returned by RunStreamPreHooks calls CompleteAndFlushTrace +// when the context carries BifrostContextKeyStreamEndIndicator = true. +// +// This is the regression test for the bug described in issue #2999: the +// happy-path WS postHookRunner was missing the CompleteAndFlushTrace call, +// which left audit log rows stuck in memory and never written to the store. +func TestRunStreamPreHooks_PostHookRunner_FlushesTracerOnFinalChunk(t *testing.T) { + rec := &recordingTracer{} + bf := minimalBifrost(t, rec) + + ctx := schemas.NewBifrostContext(context.Background(), schemas.NoDeadline) + + req := &schemas.BifrostRequest{ + RequestType: schemas.WebSocketResponsesRequest, + ResponsesRequest: &schemas.BifrostResponsesRequest{ + Provider: schemas.OpenAI, + Model: "gpt-4o", + }, + } + + hooks, bifrostErr := bf.RunStreamPreHooks(ctx, req) + if bifrostErr != nil { + t.Fatalf("RunStreamPreHooks returned unexpected error: %+v", bifrostErr) + } + if hooks == nil { + t.Fatal("RunStreamPreHooks returned nil hooks") + } + defer hooks.Cleanup() + + // Verify no flush has happened yet (stream is not finished). + if ids := rec.flushedIDs(); len(ids) != 0 { + t.Fatalf("expected no flush before final chunk, got %v", ids) + } + + // Mark context as final chunk and invoke postHookRunner. + ctx.SetValue(schemas.BifrostContextKeyStreamEndIndicator, true) + hooks.PostHookRunner(ctx, nil, nil) + + // The tracer must have been flushed exactly once with the correct traceID. + ids := rec.flushedIDs() + if len(ids) != 1 { + t.Fatalf("expected exactly 1 CompleteAndFlushTrace call, got %d: %v", len(ids), ids) + } + if ids[0] != "test-trace-id" { + t.Errorf("expected traceID %q, got %q", "test-trace-id", ids[0]) + } +} + +// TestRunStreamPreHooks_PostHookRunner_NoFlushOnNonFinalChunk asserts that +// the postHookRunner does NOT call CompleteAndFlushTrace for non-final chunks. +func TestRunStreamPreHooks_PostHookRunner_NoFlushOnNonFinalChunk(t *testing.T) { + rec := &recordingTracer{} + bf := minimalBifrost(t, rec) + + ctx := schemas.NewBifrostContext(context.Background(), schemas.NoDeadline) + + req := &schemas.BifrostRequest{ + RequestType: schemas.WebSocketResponsesRequest, + ResponsesRequest: &schemas.BifrostResponsesRequest{ + Provider: schemas.OpenAI, + Model: "gpt-4o", + }, + } + + hooks, bifrostErr := bf.RunStreamPreHooks(ctx, req) + if bifrostErr != nil { + t.Fatalf("RunStreamPreHooks returned unexpected error: %+v", bifrostErr) + } + if hooks == nil { + t.Fatal("RunStreamPreHooks returned nil hooks") + } + defer hooks.Cleanup() + + // Invoke without setting the stream end indicator — this is a mid-stream chunk. + hooks.PostHookRunner(ctx, nil, nil) + + if ids := rec.flushedIDs(); len(ids) != 0 { + t.Fatalf("expected no CompleteAndFlushTrace call on non-final chunk, got %v", ids) + } +}