Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/bifrost.go
Original file line number Diff line number Diff line change
Expand Up @@ -4123,6 +4123,9 @@ func (bifrost *Bifrost) RunStreamPreHooks(ctx *schemas.BifrostContext, req *sche
resp, bifrostErr := pipeline.RunPostLLMHooks(ctx, result, err, preCount)
if IsFinalChunk(ctx) {
drainAndAttachPluginLogs(ctx)
if traceID, ok := ctx.Value(schemas.BifrostContextKeyTraceID).(string); ok && strings.TrimSpace(traceID) != "" {
tracer.CompleteAndFlushTrace(strings.TrimSpace(traceID))
}
}
if bifrostErr != nil {
bifrostErr.PopulateExtraFields(req.RequestType, wsProvider, wsModel, wsModel)
Expand Down
129 changes: 129 additions & 0 deletions core/bifrost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2640,3 +2640,132 @@ func TestPluginPipelineStreamingRace(t *testing.T) {

wg.Wait()
}

// recordingTracer wraps NoOpTracer and counts CompleteAndFlushTrace calls.
type recordingTracer struct {
schemas.NoOpTracer
mu sync.Mutex
flushed []string
createOnce sync.Once
traceID string
}

func (r *recordingTracer) CreateTrace(_ string, _ ...string) string {
r.mu.Lock()
defer r.mu.Unlock()
r.traceID = "test-trace-id"
return r.traceID
}

func (r *recordingTracer) CompleteAndFlushTrace(traceID string) {
r.mu.Lock()
defer r.mu.Unlock()
r.flushed = append(r.flushed, traceID)
}

func (r *recordingTracer) flushedIDs() []string {
r.mu.Lock()
defer r.mu.Unlock()
out := make([]string, len(r.flushed))
copy(out, r.flushed)
return out
}

// minimalBifrost returns a Bifrost instance with no providers and no plugins,
// suitable for testing RunStreamPreHooks without network calls.
func minimalBifrost(t *testing.T, tracer schemas.Tracer) *Bifrost {
t.Helper()
account := NewMockAccount()
cfg := schemas.BifrostConfig{
Account: account,
Logger: NewDefaultLogger(schemas.LogLevelError),
Tracer: tracer,
}
bf, err := Init(context.Background(), cfg)
if err != nil {
t.Fatalf("Init failed: %v", err)
}
return bf
}

// TestRunStreamPreHooks_PostHookRunner_FlushesTracerOnFinalChunk asserts that
// the postHookRunner returned by RunStreamPreHooks calls CompleteAndFlushTrace
// when the context carries BifrostContextKeyStreamEndIndicator = true.
//
// This is the regression test for the bug described in issue #2999: the
// happy-path WS postHookRunner was missing the CompleteAndFlushTrace call,
// which left audit log rows stuck in memory and never written to the store.
func TestRunStreamPreHooks_PostHookRunner_FlushesTracerOnFinalChunk(t *testing.T) {
rec := &recordingTracer{}
bf := minimalBifrost(t, rec)

ctx := schemas.NewBifrostContext(context.Background(), schemas.NoDeadline)

req := &schemas.BifrostRequest{
RequestType: schemas.WebSocketResponsesRequest,
ResponsesRequest: &schemas.BifrostResponsesRequest{
Provider: schemas.OpenAI,
Model: "gpt-4o",
},
}

hooks, bifrostErr := bf.RunStreamPreHooks(ctx, req)
if bifrostErr != nil {
t.Fatalf("RunStreamPreHooks returned unexpected error: %+v", bifrostErr)
}
if hooks == nil {
t.Fatal("RunStreamPreHooks returned nil hooks")
}
defer hooks.Cleanup()

// Verify no flush has happened yet (stream is not finished).
if ids := rec.flushedIDs(); len(ids) != 0 {
t.Fatalf("expected no flush before final chunk, got %v", ids)
}

// Mark context as final chunk and invoke postHookRunner.
ctx.SetValue(schemas.BifrostContextKeyStreamEndIndicator, true)
hooks.PostHookRunner(ctx, nil, nil)

// The tracer must have been flushed exactly once with the correct traceID.
ids := rec.flushedIDs()
if len(ids) != 1 {
t.Fatalf("expected exactly 1 CompleteAndFlushTrace call, got %d: %v", len(ids), ids)
}
if ids[0] != "test-trace-id" {
t.Errorf("expected traceID %q, got %q", "test-trace-id", ids[0])
}
}

// TestRunStreamPreHooks_PostHookRunner_NoFlushOnNonFinalChunk asserts that
// the postHookRunner does NOT call CompleteAndFlushTrace for non-final chunks.
func TestRunStreamPreHooks_PostHookRunner_NoFlushOnNonFinalChunk(t *testing.T) {
rec := &recordingTracer{}
bf := minimalBifrost(t, rec)

ctx := schemas.NewBifrostContext(context.Background(), schemas.NoDeadline)

req := &schemas.BifrostRequest{
RequestType: schemas.WebSocketResponsesRequest,
ResponsesRequest: &schemas.BifrostResponsesRequest{
Provider: schemas.OpenAI,
Model: "gpt-4o",
},
}

hooks, bifrostErr := bf.RunStreamPreHooks(ctx, req)
if bifrostErr != nil {
t.Fatalf("RunStreamPreHooks returned unexpected error: %+v", bifrostErr)
}
if hooks == nil {
t.Fatal("RunStreamPreHooks returned nil hooks")
}
defer hooks.Cleanup()

// Invoke without setting the stream end indicator — this is a mid-stream chunk.
hooks.PostHookRunner(ctx, nil, nil)

if ids := rec.flushedIDs(); len(ids) != 0 {
t.Fatalf("expected no CompleteAndFlushTrace call on non-final chunk, got %v", ids)
}
}