From 7939f85b1b2bcd6f24d28d63e6a943f43de869a3 Mon Sep 17 00:00:00 2001 From: akshaydeo Date: Mon, 23 Mar 2026 05:32:15 +0530 Subject: [PATCH] logging in plugins --- core/bifrost.go | 99 ++++++++- core/schemas/bifrost.go | 23 +++ core/schemas/context.go | 158 +++++++++++++- core/schemas/context_test.go | 122 +++++++++++ core/schemas/trace.go | 36 ++++ core/schemas/tracer.go | 12 +- examples/plugins/hello-world/main.go | 23 ++- framework/logstore/migrations.go | 37 ++++ framework/logstore/rdb.go | 14 ++ framework/logstore/store.go | 1 + framework/logstore/tables.go | 3 +- framework/tracing/store.go | 14 +- framework/tracing/tracer.go | 16 +- plugins/logging/main.go | 124 ++++++++--- plugins/logging/operations_test.go | 68 ++++++ plugins/logging/writer.go | 21 +- .../bifrost-http/handlers/middlewares.go | 130 +++++++++--- transports/bifrost-http/lib/config.go | 14 +- transports/bifrost-http/server/server.go | 12 +- .../workspace/logs/sheets/logDetailsSheet.tsx | 195 +++++++++++------- .../workspace/logs/views/pluginLogsView.tsx | 75 +++++++ ui/lib/types/logs.ts | 8 + 22 files changed, 1042 insertions(+), 163 deletions(-) create mode 100644 ui/app/workspace/logs/views/pluginLogsView.tsx diff --git a/core/bifrost.go b/core/bifrost.go index 6e42fe25e5..bfb5ed57e1 100644 --- a/core/bifrost.go +++ b/core/bifrost.go @@ -152,6 +152,9 @@ type PluginPipeline struct { postHookTimings map[string]*pluginTimingAccumulator // keyed by plugin name postHookPluginOrder []string // order in which post-hooks ran (for nested span creation) chunkCount int + + // Plugin logging: cached scoped contexts for streaming post-hooks (reused across chunks) + streamScopedCtxs map[string]*schemas.BifrostContext } // pluginTimingAccumulator accumulates timing information for a plugin across streaming chunks @@ -3801,6 +3804,7 @@ func (bifrost *Bifrost) RunStreamPreHooks(ctx *schemas.BifrostContext, req *sche if shortCircuit != nil { if shortCircuit.Error != nil { _, bifrostErr := pipeline.RunPostLLMHooks(ctx, nil, shortCircuit.Error, preCount) + drainAndAttachPluginLogs(ctx) cleanup() if bifrostErr != nil { return nil, bifrostErr @@ -3809,6 +3813,7 @@ func (bifrost *Bifrost) RunStreamPreHooks(ctx *schemas.BifrostContext, req *sche } if shortCircuit.Response != nil { resp, bifrostErr := pipeline.RunPostLLMHooks(ctx, shortCircuit.Response, nil, preCount) + drainAndAttachPluginLogs(ctx) cleanup() if bifrostErr != nil { return nil, bifrostErr @@ -3821,7 +3826,11 @@ func (bifrost *Bifrost) RunStreamPreHooks(ctx *schemas.BifrostContext, req *sche } postHookRunner := func(ctx *schemas.BifrostContext, result *schemas.BifrostResponse, err *schemas.BifrostError) (*schemas.BifrostResponse, *schemas.BifrostError) { - return pipeline.RunPostLLMHooks(ctx, result, err, preCount) + resp, bifrostErr := pipeline.RunPostLLMHooks(ctx, result, err, preCount) + if IsFinalChunk(ctx) { + drainAndAttachPluginLogs(ctx) + } + return resp, bifrostErr } return &WSStreamHooks{ @@ -4294,6 +4303,7 @@ func (bifrost *Bifrost) tryRequest(ctx *schemas.BifrostContext, req *schemas.Bif // Handle short-circuit with response (success case) if shortCircuit.Response != nil { resp, bifrostErr := pipeline.RunPostLLMHooks(ctx, shortCircuit.Response, nil, preCount) + drainAndAttachPluginLogs(ctx) if bifrostErr != nil { return nil, bifrostErr } @@ -4302,6 +4312,7 @@ func (bifrost *Bifrost) tryRequest(ctx *schemas.BifrostContext, req *schemas.Bif // Handle short-circuit with error if shortCircuit.Error != nil { resp, bifrostErr := pipeline.RunPostLLMHooks(ctx, nil, shortCircuit.Error, preCount) + drainAndAttachPluginLogs(ctx) if bifrostErr != nil { return nil, bifrostErr } @@ -4409,6 +4420,7 @@ func (bifrost *Bifrost) tryRequest(ctx *schemas.BifrostContext, req *schemas.Bif select { case result = <-msg.Response: resp, bifrostErr := pipeline.RunPostLLMHooks(msg.Context, result, nil, pluginCount) + drainAndAttachPluginLogs(msg.Context) if bifrostErr != nil { bifrost.releaseChannelMessage(msg) return nil, bifrostErr @@ -4425,6 +4437,7 @@ func (bifrost *Bifrost) tryRequest(ctx *schemas.BifrostContext, req *schemas.Bif case bifrostErrVal := <-msg.Err: bifrostErrPtr := &bifrostErrVal resp, bifrostErrPtr = pipeline.RunPostLLMHooks(msg.Context, nil, bifrostErrPtr, pluginCount) + drainAndAttachPluginLogs(msg.Context) bifrost.releaseChannelMessage(msg) // Drop raw request/response on error path too if drop, ok := ctx.Value(schemas.BifrostContextKeyRawRequestResponseForLogging).(bool); ok && drop { @@ -4505,13 +4518,19 @@ func (bifrost *Bifrost) tryStreamRequest(ctx *schemas.BifrostContext, req *schem } pipeline := bifrost.getPluginPipeline() - defer bifrost.releasePluginPipeline(pipeline) + releasePipeline := true + defer func() { + if releasePipeline { + bifrost.releasePluginPipeline(pipeline) + } + }() preReq, shortCircuit, preCount := pipeline.RunLLMPreHooks(ctx, req) if shortCircuit != nil { // Handle short-circuit with response (success case) if shortCircuit.Response != nil { resp, bifrostErr := pipeline.RunPostLLMHooks(ctx, shortCircuit.Response, nil, preCount) + drainAndAttachPluginLogs(ctx) if bifrostErr != nil { return nil, bifrostErr } @@ -4520,13 +4539,23 @@ func (bifrost *Bifrost) tryStreamRequest(ctx *schemas.BifrostContext, req *schem // Handle short-circuit with stream if shortCircuit.Stream != nil { outputStream := make(chan *schemas.BifrostStreamChunk) + releasePipeline = false // pipeline is released inside the goroutine after stream drains // Create a post hook runner cause pipeline object is put back in the pool on defer pipelinePostHookRunner := func(ctx *schemas.BifrostContext, result *schemas.BifrostResponse, err *schemas.BifrostError) (*schemas.BifrostResponse, *schemas.BifrostError) { - return pipeline.RunPostLLMHooks(ctx, result, err, preCount) + resp, bifrostErr := pipeline.RunPostLLMHooks(ctx, result, err, preCount) + if IsFinalChunk(ctx) { + drainAndAttachPluginLogs(ctx) + } + return resp, bifrostErr } go func() { + defer func() { + drainAndAttachPluginLogs(ctx) // ensure logs are drained even if stream closes without a final chunk + pipeline.FinalizeStreamingPostHookSpans(ctx) + bifrost.releasePluginPipeline(pipeline) + }() defer close(outputStream) for streamMsg := range shortCircuit.Stream { @@ -4574,6 +4603,7 @@ func (bifrost *Bifrost) tryStreamRequest(ctx *schemas.BifrostContext, req *schem // Handle short-circuit with error if shortCircuit.Error != nil { resp, bifrostErr := pipeline.RunPostLLMHooks(ctx, nil, shortCircuit.Error, preCount) + drainAndAttachPluginLogs(ctx) if bifrostErr != nil { return nil, bifrostErr } @@ -4689,6 +4719,7 @@ func (bifrost *Bifrost) tryStreamRequest(ctx *schemas.BifrostContext, req *schem ctx.SetValue(schemas.BifrostContextKeyStreamEndIndicator, true) // On error we will complete post-hooks recoveredResp, recoveredErr := pipeline.RunPostLLMHooks(ctx, nil, &bifrostErrVal, len(*bifrost.llmPlugins.Load())) + drainAndAttachPluginLogs(ctx) bifrost.releaseChannelMessage(msg) if recoveredErr != nil { return nil, recoveredErr @@ -5022,6 +5053,9 @@ func (bifrost *Bifrost) requestWorker(provider schemas.Provider, config *schemas pipeline = bifrost.getPluginPipeline() postHookRunner = func(ctx *schemas.BifrostContext, result *schemas.BifrostResponse, err *schemas.BifrostError) (*schemas.BifrostResponse, *schemas.BifrostError) { resp, bifrostErr := pipeline.RunPostLLMHooks(ctx, result, err, len(*bifrost.llmPlugins.Load())) + if IsFinalChunk(ctx) { + drainAndAttachPluginLogs(ctx) + } if bifrostErr != nil { return nil, bifrostErr } @@ -5445,6 +5479,7 @@ func (bifrost *Bifrost) handleMCPToolExecution(ctx *schemas.BifrostContext, mcpR // Handle short-circuit with response (success case) if shortCircuit.Response != nil { finalMcpResp, bifrostErr := pipeline.RunMCPPostHooks(ctx, shortCircuit.Response, nil, preCount) + drainAndAttachPluginLogs(ctx) if bifrostErr != nil { return nil, bifrostErr } @@ -5454,6 +5489,7 @@ func (bifrost *Bifrost) handleMCPToolExecution(ctx *schemas.BifrostContext, mcpR if shortCircuit.Error != nil { // Capture post-hook results to respect transformations or recovery finalResp, finalErr := pipeline.RunMCPPostHooks(ctx, nil, shortCircuit.Error, preCount) + drainAndAttachPluginLogs(ctx) // Return post-hook error if present (post-hook may have transformed the error) if finalErr != nil { return nil, finalErr @@ -5513,6 +5549,7 @@ func (bifrost *Bifrost) handleMCPToolExecution(ctx *schemas.BifrostContext, mcpR // Run post-hooks finalResp, finalErr := pipeline.RunMCPPostHooks(ctx, mcpResp, bifrostErr, preCount) + drainAndAttachPluginLogs(ctx) if finalErr != nil { return nil, finalErr @@ -5577,7 +5614,9 @@ func (p *PluginPipeline) RunLLMPreHooks(ctx *schemas.BifrostContext, req *schema } } - req, shortCircuit, err = plugin.PreLLMHook(ctx, req) + pluginCtx := ctx.WithPluginScope(&pluginName) + req, shortCircuit, err = plugin.PreLLMHook(pluginCtx, req) + pluginCtx.ReleasePluginScope() // End span with appropriate status if err != nil { @@ -5628,8 +5667,17 @@ func (p *PluginPipeline) RunPostLLMHooks(ctx *schemas.BifrostContext, resp *sche p.logger.Debug("running post-hook for plugin %s", pluginName) if isStreaming { // For streaming: accumulate timing, don't create individual spans per chunk + // Lazily create cached scoped contexts on first chunk (reused across all chunks) + if p.streamScopedCtxs == nil { + p.streamScopedCtxs = make(map[string]*schemas.BifrostContext, len(p.llmPlugins)) + for _, pl := range p.llmPlugins { + name := pl.GetName() + p.streamScopedCtxs[name] = ctx.WithPluginScope(&name) + } + } + pluginCtx := p.streamScopedCtxs[pluginName] start := time.Now() - resp, bifrostErr, err = plugin.PostLLMHook(ctx, resp, bifrostErr) + resp, bifrostErr, err = plugin.PostLLMHook(pluginCtx, resp, bifrostErr) duration := time.Since(start) p.accumulatePluginTiming(pluginName, duration, err != nil) @@ -5646,7 +5694,9 @@ func (p *PluginPipeline) RunPostLLMHooks(ctx *schemas.BifrostContext, resp *sche ctx.SetValue(schemas.BifrostContextKeySpanID, spanID) } } - resp, bifrostErr, err = plugin.PostLLMHook(ctx, resp, bifrostErr) + pluginCtx := ctx.WithPluginScope(&pluginName) + resp, bifrostErr, err = plugin.PostLLMHook(pluginCtx, resp, bifrostErr) + pluginCtx.ReleasePluginScope() // End span with appropriate status if err != nil { p.tracer.SetAttribute(handle, "error", err.Error()) @@ -5700,7 +5750,9 @@ func (p *PluginPipeline) RunMCPPreHooks(ctx *schemas.BifrostContext, req *schema } } - req, shortCircuit, err = plugin.PreMCPHook(ctx, req) + pluginCtx := ctx.WithPluginScope(&pluginName) + req, shortCircuit, err = plugin.PreMCPHook(pluginCtx, req) + pluginCtx.ReleasePluginScope() // End span with appropriate status if err != nil { @@ -5755,7 +5807,9 @@ func (p *PluginPipeline) RunMCPPostHooks(ctx *schemas.BifrostContext, mcpResp *s } } - mcpResp, bifrostErr, err = plugin.PostMCPHook(ctx, mcpResp, bifrostErr) + pluginCtx := ctx.WithPluginScope(&pluginName) + mcpResp, bifrostErr, err = plugin.PostMCPHook(pluginCtx, mcpResp, bifrostErr) + pluginCtx.ReleasePluginScope() // End span with appropriate status if err != nil { @@ -5781,7 +5835,11 @@ func (p *PluginPipeline) RunMCPPostHooks(ctx *schemas.BifrostContext, mcpResp *s return mcpResp, nil } -// resetPluginPipeline resets a PluginPipeline instance for reuse +// resetPluginPipeline resets a PluginPipeline instance for reuse. +// IMPORTANT: drainAndAttachPluginLogs must be called on the root BifrostContext +// BEFORE this method, because it calls ReleasePluginScope on cached scoped contexts +// which nils out their pluginLogs pointer. The drain reads from the shared store +// on the root context, so it must happen while the store is still referenced. func (p *PluginPipeline) resetPluginPipeline() { p.executedPreHooks = 0 p.preHookErrors = p.preHookErrors[:0] @@ -5792,6 +5850,25 @@ func (p *PluginPipeline) resetPluginPipeline() { clear(p.postHookTimings) } p.postHookPluginOrder = p.postHookPluginOrder[:0] + // Release cached scoped contexts for streaming + for _, scopedCtx := range p.streamScopedCtxs { + scopedCtx.ReleasePluginScope() + } + p.streamScopedCtxs = nil +} + +// drainAndAttachPluginLogs drains accumulated plugin logs from the BifrostContext +// and attaches them to the trace for later retrieval by observability plugins. +func drainAndAttachPluginLogs(ctx *schemas.BifrostContext) { + tracer, traceID, err := GetTracerFromContext(ctx) + if err != nil || tracer == nil || traceID == "" { + return + } + logs := ctx.DrainPluginLogs() + if len(logs) == 0 { + return + } + tracer.AttachPluginLogs(traceID, logs) } // accumulatePluginTiming accumulates timing for a plugin during streaming @@ -5883,7 +5960,9 @@ func (bifrost *Bifrost) getPluginPipeline() *PluginPipeline { return pipeline } -// releasePluginPipeline returns a PluginPipeline to the pool +// releasePluginPipeline returns a PluginPipeline to the pool. +// Caller must ensure drainAndAttachPluginLogs has already been called on the +// associated BifrostContext before calling this method. func (bifrost *Bifrost) releasePluginPipeline(pipeline *PluginPipeline) { pipeline.resetPluginPipeline() bifrost.pluginPipelinePool.Put(pipeline) diff --git a/core/schemas/bifrost.go b/core/schemas/bifrost.go index 9279a21f81..4dc9dc7973 100644 --- a/core/schemas/bifrost.go +++ b/core/schemas/bifrost.go @@ -220,6 +220,8 @@ const ( BifrostContextKeyPassthroughExtraParams BifrostContextKey = "bifrost-passthrough-extra-params" // bool BifrostContextKeyRoutingEnginesUsed BifrostContextKey = "bifrost-routing-engines-used" // []string (set by bifrost - DO NOT SET THIS MANUALLY) - list of routing engines used ("routing-rule", "governance", "loadbalancing", etc.) BifrostContextKeyRoutingEngineLogs BifrostContextKey = "bifrost-routing-engine-logs" // []RoutingEngineLogEntry (set by bifrost - DO NOT SET THIS MANUALLY) - list of routing engine log entries + BifrostContextKeyTransportPluginLogs BifrostContextKey = "bifrost-transport-plugin-logs" // []PluginLogEntry (transport-layer plugin logs accumulated during HTTP transport hooks) + BifrostContextKeyTransportPostHookCompleter BifrostContextKey = "bifrost-transport-posthook-completer" // func() (callback to run HTTPTransportPostHook after streaming - set by transport interceptor middleware) BifrostContextKeySkipPluginPipeline BifrostContextKey = "bifrost-skip-plugin-pipeline" // bool - skip plugin pipeline for the request BifrostIsAsyncRequest BifrostContextKey = "bifrost-is-async-request" // bool (set by bifrost - DO NOT SET THIS MANUALLY)) - whether the request is an async request (only used in gateway) BifrostContextKeyRequestHeaders BifrostContextKey = "bifrost-request-headers" // map[string]string (all request headers with lowercased keys) @@ -276,6 +278,27 @@ type RoutingEngineLogEntry struct { Timestamp int64 // Unix milliseconds } +// PluginLogEntry represents a structured log entry emitted by a plugin via ctx.Log(). +type PluginLogEntry struct { + PluginName string `json:"plugin_name"` + Level LogLevel `json:"level"` + Message string `json:"message"` + Timestamp int64 `json:"timestamp"` // Unix milliseconds +} + +// GroupPluginLogsByName groups a flat slice of plugin log entries by plugin name. +// Returns nil if the input is empty. +func GroupPluginLogsByName(logs []PluginLogEntry) map[string][]PluginLogEntry { + if len(logs) == 0 { + return nil + } + grouped := make(map[string][]PluginLogEntry, min(len(logs), 4)) + for _, entry := range logs { + grouped[entry.PluginName] = append(grouped[entry.PluginName], entry) + } + return grouped +} + // NOTE: for custom plugin implementation dealing with streaming short circuit, // make sure to mark BifrostContextKeyStreamEndIndicator as true at the end of the stream. diff --git a/core/schemas/context.go b/core/schemas/context.go index 1ff4663eae..325f911762 100644 --- a/core/schemas/context.go +++ b/core/schemas/context.go @@ -26,6 +26,28 @@ var reservedKeys = []any{ BifrostContextKeyDeferTraceCompletion, } +// pluginLogStore holds plugin log entries accumulated during request processing. +// It is shared between the root BifrostContext and all scoped contexts derived from it. +// Uses a flat slice (not map) to minimize heap allocations. +type pluginLogStore struct { + mu sync.Mutex + logs []PluginLogEntry +} + +// pluginLogStorePool pools pluginLogStore instances to reduce per-request allocations. +var pluginLogStorePool = sync.Pool{ + New: func() any { + return &pluginLogStore{logs: make([]PluginLogEntry, 0, 8)} + }, +} + +// pluginScopePool pools BifrostContext instances used as scoped plugin contexts. +var pluginScopePool = sync.Pool{ + New: func() any { + return &BifrostContext{} + }, +} + // BifrostContext is a custom context.Context implementation that tracks user-set values. // It supports deadlines, can be derived from other contexts, and provides layered // value inheritance when derived from another BifrostContext. @@ -40,6 +62,11 @@ type BifrostContext struct { userValues map[any]any valuesMu sync.RWMutex blockRestrictedWrites atomic.Bool + + // Plugin scoping fields + pluginScope *string // Non-nil when this is a scoped plugin context + pluginLogs atomic.Pointer[pluginLogStore] // Shared log store; lazily initialized on root, shared by scoped contexts + valueDelegate *BifrostContext // For scoped contexts: delegate Value/SetValue to this root context } // NewBifrostContext creates a new BifrostContext with the given parent context and deadline. @@ -166,8 +193,12 @@ func (bc *BifrostContext) cancel(err error) { } // Deadline returns the deadline for this context. +// For scoped contexts, delegates to the root context. // If both this context and the parent have deadlines, the earlier one is returned. func (bc *BifrostContext) Deadline() (time.Time, bool) { + if bc.valueDelegate != nil { + return bc.valueDelegate.Deadline() + } parentDeadline, parentHasDeadline := bc.parent.Deadline() if !bc.hasDeadline && !parentHasDeadline { @@ -195,16 +226,24 @@ func (bc *BifrostContext) Done() <-chan struct{} { } // Err returns the error explaining why the context was cancelled. +// For scoped contexts, delegates to the root context. // Returns nil if the context has not been cancelled. func (bc *BifrostContext) Err() error { + if bc.valueDelegate != nil { + return bc.valueDelegate.Err() + } bc.errMu.RLock() defer bc.errMu.RUnlock() return bc.err } // Value returns the value associated with the key. -// It first checks the internal userValues map, then delegates to the parent context. +// For scoped contexts, delegates to the root context via valueDelegate. +// Otherwise checks the internal userValues map, then delegates to the parent context. func (bc *BifrostContext) Value(key any) any { + if bc.valueDelegate != nil { + return bc.valueDelegate.Value(key) + } bc.valuesMu.RLock() if val, ok := bc.userValues[key]; ok { bc.valuesMu.RUnlock() @@ -216,8 +255,13 @@ func (bc *BifrostContext) Value(key any) any { } // SetValue sets a value in the internal userValues map. +// For scoped contexts, delegates to the root context via valueDelegate. // This is thread-safe and can be called concurrently. func (bc *BifrostContext) SetValue(key, value any) { + if bc.valueDelegate != nil { + bc.valueDelegate.SetValue(key, value) + return + } // Check if the key is a reserved key if bc.blockRestrictedWrites.Load() && slices.Contains(reservedKeys, key) { // we silently drop writes for these reserved keys @@ -232,7 +276,12 @@ func (bc *BifrostContext) SetValue(key, value any) { } // ClearValue clears a value from the internal userValues map. +// For scoped contexts, delegates to the root context via valueDelegate. func (bc *BifrostContext) ClearValue(key any) { + if bc.valueDelegate != nil { + bc.valueDelegate.ClearValue(key) + return + } // Check if the key is a reserved key if bc.blockRestrictedWrites.Load() && slices.Contains(reservedKeys, key) { // we silently drop writes for these reserved keys @@ -245,8 +294,12 @@ func (bc *BifrostContext) ClearValue(key any) { } } -// GetAndSetValue gets a value from the internal userValues map and sets it +// GetAndSetValue gets a value from the internal userValues map and sets it. +// For scoped contexts, delegates to the root context via valueDelegate. func (bc *BifrostContext) GetAndSetValue(key any, value any) any { + if bc.valueDelegate != nil { + return bc.valueDelegate.GetAndSetValue(key, value) + } bc.valuesMu.Lock() defer bc.valuesMu.Unlock() // Check if the key is a reserved key @@ -340,3 +393,104 @@ func AppendToContextList[T any](ctx *BifrostContext, key BifrostContextKey, valu } ctx.SetValue(key, append(existingValues, value)) } + +// WithPluginScope returns a lightweight scoped BifrostContext from the pool. +// The scoped context shares the root's pluginLogs store and delegates all +// Value/SetValue operations to the root context. +// Call ReleasePluginScope() when done to return the scoped context to the pool. +func (bc *BifrostContext) WithPluginScope(name *string) *BifrostContext { + // Lazily initialize the plugin log store on the root context (CAS to avoid race) + if bc.pluginLogs.Load() == nil { + newStore := pluginLogStorePool.Get().(*pluginLogStore) + if !bc.pluginLogs.CompareAndSwap(nil, newStore) { + // Another goroutine initialized first — return unused store to pool + pluginLogStorePool.Put(newStore) + } + } + + scoped := pluginScopePool.Get().(*BifrostContext) + scoped.parent = bc.parent + scoped.done = bc.done + scoped.pluginScope = name + scoped.pluginLogs.Store(bc.pluginLogs.Load()) + scoped.valueDelegate = bc + return scoped +} + +// ReleasePluginScope returns a scoped context to the pool. +// Safe no-op if called on a non-scoped context. +// Do not use the scoped context after calling this method. +func (bc *BifrostContext) ReleasePluginScope() { + if bc.valueDelegate == nil { + return // not a scoped context + } + bc.parent = nil + bc.done = nil + bc.pluginScope = nil + bc.pluginLogs.Store(nil) + bc.valueDelegate = nil + pluginScopePool.Put(bc) +} + +// Log appends a structured log entry for the current plugin scope. +// No-op if the context is not scoped to a plugin or has no log store. +func (bc *BifrostContext) Log(level LogLevel, msg string) { + store := bc.pluginLogs.Load() + if bc.pluginScope == nil || store == nil { + return + } + store.mu.Lock() + store.logs = append(store.logs, PluginLogEntry{ + PluginName: *bc.pluginScope, + Level: level, + Message: msg, + Timestamp: time.Now().UnixMilli(), + }) + store.mu.Unlock() +} + +// GetPluginLogs returns a deep copy of all accumulated plugin log entries. +// Thread-safe. Returns nil if no logs have been recorded. +func (bc *BifrostContext) GetPluginLogs() []PluginLogEntry { + store := bc.pluginLogs.Load() + if store == nil { + return nil + } + store.mu.Lock() + defer store.mu.Unlock() + if len(store.logs) == 0 { + return nil + } + copied := make([]PluginLogEntry, len(store.logs)) + copy(copied, store.logs) + return copied +} + +// DrainPluginLogs transfers ownership of the plugin log slice to the caller. +// The internal log store is returned to the pool after draining. +// Returns nil if no logs have been recorded. +// This should be called once on the root context after all plugin hooks have completed. +func (bc *BifrostContext) DrainPluginLogs() []PluginLogEntry { + if bc.valueDelegate != nil { + return nil // scoped contexts must not drain the shared log store + } + store := bc.pluginLogs.Load() + if store == nil { + return nil + } + bc.pluginLogs.Store(nil) + + store.mu.Lock() + logs := store.logs + // Reset with fresh pre-allocated slice before returning to pool + store.logs = make([]PluginLogEntry, 0, 8) + store.mu.Unlock() + + // Return the store to the pool for reuse + pluginLogStorePool.Put(store) + + if len(logs) == 0 { + return nil + } + return logs +} diff --git a/core/schemas/context_test.go b/core/schemas/context_test.go index 75e52e2061..108da2ced0 100644 --- a/core/schemas/context_test.go +++ b/core/schemas/context_test.go @@ -207,3 +207,125 @@ func TestNewBifrostContext_NilParent(t *testing.T) { t.Errorf("Cancelled context should have Canceled error, got %v", ctx.Err()) } } + +// Plugin logging tests + +func TestPluginLog_NoScopeIsNoop(t *testing.T) { + ctx := NewBifrostContext(context.Background(), NoDeadline) + ctx.Log(LogLevelInfo, "should be ignored") + logs := ctx.GetPluginLogs() + if logs != nil { + t.Errorf("expected nil logs without plugin scope, got %v", logs) + } +} + +func TestPluginLog_SinglePlugin(t *testing.T) { + ctx := NewBifrostContext(context.Background(), NoDeadline) + name := "test-plugin" + scoped := ctx.WithPluginScope(&name) + scoped.Log(LogLevelInfo, "hello") + scoped.Log(LogLevelError, "oops") + scoped.ReleasePluginScope() + + logs := ctx.GetPluginLogs() + if len(logs) != 2 { + t.Fatalf("expected 2 logs, got %d", len(logs)) + } + if logs[0].PluginName != "test-plugin" || logs[0].Level != LogLevelInfo || logs[0].Message != "hello" { + t.Errorf("unexpected first log: %+v", logs[0]) + } + if logs[1].Level != LogLevelError || logs[1].Message != "oops" { + t.Errorf("unexpected second log: %+v", logs[1]) + } +} + +func TestPluginLog_MultiplePlugins(t *testing.T) { + ctx := NewBifrostContext(context.Background(), NoDeadline) + + name1 := "plugin-a" + s1 := ctx.WithPluginScope(&name1) + s1.Log(LogLevelDebug, "a-msg") + s1.ReleasePluginScope() + + name2 := "plugin-b" + s2 := ctx.WithPluginScope(&name2) + s2.Log(LogLevelWarn, "b-msg") + s2.ReleasePluginScope() + + logs := ctx.GetPluginLogs() + if len(logs) != 2 { + t.Fatalf("expected 2 logs, got %d", len(logs)) + } + if logs[0].PluginName != "plugin-a" { + t.Errorf("expected plugin-a, got %s", logs[0].PluginName) + } + if logs[1].PluginName != "plugin-b" { + t.Errorf("expected plugin-b, got %s", logs[1].PluginName) + } +} + +func TestPluginLog_DrainTransfersOwnership(t *testing.T) { + ctx := NewBifrostContext(context.Background(), NoDeadline) + name := "drain-test" + scoped := ctx.WithPluginScope(&name) + scoped.Log(LogLevelInfo, "msg1") + scoped.ReleasePluginScope() + + drained := ctx.DrainPluginLogs() + if len(drained) != 1 { + t.Fatalf("expected 1 drained log, got %d", len(drained)) + } + + // After drain, GetPluginLogs should return nil + after := ctx.GetPluginLogs() + if after != nil { + t.Errorf("expected nil after drain, got %v", after) + } + + // Second drain should return nil + second := ctx.DrainPluginLogs() + if second != nil { + t.Errorf("expected nil on second drain, got %v", second) + } +} + +func TestPluginLog_ScopedContextValueDelegation(t *testing.T) { + ctx := NewBifrostContext(context.Background(), NoDeadline) + ctx.SetValue(BifrostContextKeyTraceID, "trace-123") + + name := "delegate-test" + scoped := ctx.WithPluginScope(&name) + + // Scoped should read from root + val := scoped.Value(BifrostContextKeyTraceID) + if val != "trace-123" { + t.Errorf("expected trace-123, got %v", val) + } + + // Scoped should write to root + type testContextKey string + const customKey testContextKey = "custom-key" + scoped.SetValue(customKey, "custom-val") + if ctx.Value(customKey) != "custom-val" { + t.Errorf("SetValue on scoped did not delegate to root") + } + + scoped.ReleasePluginScope() +} + +func TestPluginLog_PoolReuse(t *testing.T) { + ctx := NewBifrostContext(context.Background(), NoDeadline) + + // Create and release multiple scoped contexts to exercise the pool + for i := 0; i < 100; i++ { + name := "pool-test" + scoped := ctx.WithPluginScope(&name) + scoped.Log(LogLevelInfo, "pooled") + scoped.ReleasePluginScope() + } + + logs := ctx.DrainPluginLogs() + if len(logs) != 100 { + t.Errorf("expected 100 logs from pool reuse, got %d", len(logs)) + } +} diff --git a/core/schemas/trace.go b/core/schemas/trace.go index 9a69980d3c..d6862d4d4e 100644 --- a/core/schemas/trace.go +++ b/core/schemas/trace.go @@ -8,6 +8,7 @@ import ( // Trace represents a distributed trace that captures the full lifecycle of a request type Trace struct { + RequestID string // Request ID for the trace TraceID string // Unique identifier for this trace ParentID string // Parent trace ID from incoming W3C traceparent header RootSpan *Span // The root span of this trace @@ -15,6 +16,7 @@ type Trace struct { StartTime time.Time // When the trace started EndTime time.Time // When the trace completed Attributes map[string]any // Additional attributes for the trace + PluginLogs []PluginLogEntry // Plugin log entries accumulated during request processing mu sync.Mutex // Mutex for thread-safe span operations } @@ -37,15 +39,49 @@ func (t *Trace) GetSpan(spanID string) *Span { return nil } +// GetRequestID retrieves the request ID from the trace +func (t *Trace) GetRequestID() string { + t.mu.Lock() + defer t.mu.Unlock() + return t.RequestID +} + +// SetRequestID sets the request ID for the trace +func (t *Trace) SetRequestID(requestID string) { + t.mu.Lock() + defer t.mu.Unlock() + t.RequestID = requestID +} + // Reset clears the trace for reuse from pool func (t *Trace) Reset() { + t.mu.Lock() + defer t.mu.Unlock() + t.RequestID = "" t.TraceID = "" t.ParentID = "" t.RootSpan = nil + for i := range t.Spans { + t.Spans[i] = nil + } t.Spans = t.Spans[:0] t.StartTime = time.Time{} t.EndTime = time.Time{} t.Attributes = nil + for i := range t.PluginLogs { + t.PluginLogs[i] = PluginLogEntry{} + } + t.PluginLogs = t.PluginLogs[:0] +} + +// AppendPluginLogs appends plugin log entries to the trace in a thread-safe manner. +func (t *Trace) AppendPluginLogs(logs []PluginLogEntry) { + if len(logs) == 0 { + return + } + t.mu.Lock() + t.PluginLogs = append(t.PluginLogs, logs...) + t.mu.Unlock() } // Span represents a single operation within a trace diff --git a/core/schemas/tracer.go b/core/schemas/tracer.go index 820d88c9dc..8ad1de9ffa 100644 --- a/core/schemas/tracer.go +++ b/core/schemas/tracer.go @@ -38,7 +38,8 @@ type StreamAccumulatorResult struct { type Tracer interface { // CreateTrace creates a new trace with optional parent ID and returns the trace ID. // The parentID can be extracted from W3C traceparent headers for distributed tracing. - CreateTrace(parentID string) string + // The requestID is optional and can be used to identify the request. + CreateTrace(parentID string, requestID ...string) string // EndTrace completes a trace and returns the trace data for observation/export. // After this call, the trace is removed from active tracking and returned for cleanup. @@ -111,6 +112,10 @@ type Tracer interface { // The ctx parameter must contain the stream end indicator for proper final chunk detection. ProcessStreamingChunk(traceID string, isFinalChunk bool, result *BifrostResponse, err *BifrostError) *StreamAccumulatorResult + // AttachPluginLogs appends plugin log entries to the trace identified by traceID. + // Thread-safe. Should be called after plugin hooks complete, before trace completion. + AttachPluginLogs(traceID string, logs []PluginLogEntry) + // Stop releases resources associated with the tracer. // Should be called during shutdown to stop background goroutines. Stop() @@ -121,7 +126,7 @@ type Tracer interface { type NoOpTracer struct{} // CreateTrace returns an empty string (no trace created). -func (n *NoOpTracer) CreateTrace(_ string) string { return "" } +func (n *NoOpTracer) CreateTrace(_ string, _ ...string) string { return "" } // EndTrace returns nil (no trace to end). func (n *NoOpTracer) EndTrace(_ string) *Trace { return nil } @@ -176,6 +181,9 @@ func (n *NoOpTracer) ProcessStreamingChunk(_ string, _ bool, _ *BifrostResponse, return nil } +// AttachPluginLogs does nothing. +func (n *NoOpTracer) AttachPluginLogs(_ string, _ []PluginLogEntry) {} + // Stop does nothing. func (n *NoOpTracer) Stop() {} diff --git a/examples/plugins/hello-world/main.go b/examples/plugins/hello-world/main.go index 4d6de8609c..2f464e2a7d 100644 --- a/examples/plugins/hello-world/main.go +++ b/examples/plugins/hello-world/main.go @@ -6,6 +6,12 @@ import ( "github.com/maximhq/bifrost/core/schemas" ) +const ( + transportPreHookKey schemas.BifrostContextKey = "hello-world-plugin-transport-pre-hook" + transportPostHookKey schemas.BifrostContextKey = "hello-world-plugin-transport-post-hook" + preHookKey schemas.BifrostContextKey = "hello-world-plugin-pre-hook" +) + func Init(config any) error { fmt.Println("Init called") return nil @@ -23,8 +29,9 @@ func HTTPTransportPreHook(ctx *schemas.BifrostContext, req *schemas.HTTPRequest) // Modify request in-place req.Headers["x-hello-world-plugin"] = "transport-pre-hook-value" // Store value in context for PreLLMHook/PostLLMHook - ctx.SetValue(schemas.BifrostContextKey("hello-world-plugin-transport-pre-hook"), "transport-pre-hook-value") + ctx.SetValue(transportPreHookKey, "transport-pre-hook-value") // Return nil to continue processing, or return &schemas.HTTPResponse{} to short-circuit + ctx.Log(schemas.LogLevelInfo, "HTTPTransportPreHook called") return nil, nil } @@ -33,7 +40,8 @@ func HTTPTransportPostHook(ctx *schemas.BifrostContext, req *schemas.HTTPRequest // Modify response in-place resp.Headers["x-hello-world-plugin"] = "transport-post-hook-value" // Store value in context - ctx.SetValue(schemas.BifrostContextKey("hello-world-plugin-transport-post-hook"), "transport-post-hook-value") + ctx.Log(schemas.LogLevelInfo, "HTTPTransportPostHook called") + ctx.SetValue(transportPostHookKey, "transport-post-hook-value") // Return nil to continue processing return nil } @@ -41,6 +49,7 @@ func HTTPTransportPostHook(ctx *schemas.BifrostContext, req *schemas.HTTPRequest func HTTPTransportStreamChunkHook(ctx *schemas.BifrostContext, req *schemas.HTTPRequest, chunk *schemas.BifrostStreamChunk) (*schemas.BifrostStreamChunk, error) { fmt.Println("HTTPTransportStreamChunkHook called") // Modify chunk in-place + ctx.Log(schemas.LogLevelInfo, "HTTPTransportStreamChunkHook called") if chunk.BifrostChatResponse != nil && chunk.BifrostChatResponse.Choices != nil && len(chunk.BifrostChatResponse.Choices) > 0 && chunk.BifrostChatResponse.Choices[0].ChatStreamResponseChoice != nil && chunk.BifrostChatResponse.Choices[0].ChatStreamResponseChoice.Delta != nil && chunk.BifrostChatResponse.Choices[0].ChatStreamResponseChoice.Delta.Content != nil { *chunk.BifrostChatResponse.Choices[0].ChatStreamResponseChoice.Delta.Content += " - modified by hello-world-plugin" } @@ -49,19 +58,21 @@ func HTTPTransportStreamChunkHook(ctx *schemas.BifrostContext, req *schemas.HTTP } func PreLLMHook(ctx *schemas.BifrostContext, req *schemas.BifrostRequest) (*schemas.BifrostRequest, *schemas.LLMPluginShortCircuit, error) { - value1 := ctx.Value(schemas.BifrostContextKey("hello-world-plugin-transport-pre-hook")) + value1 := ctx.Value(transportPreHookKey) fmt.Println("value1:", value1) - ctx.SetValue(schemas.BifrostContextKey("hello-world-plugin-pre-hook"), "pre-hook-value") + ctx.SetValue(preHookKey, "pre-hook-value") + ctx.Log(schemas.LogLevelInfo, "PreLLMHook called") fmt.Println("PreLLMHook called") return req, nil, nil } func PostLLMHook(ctx *schemas.BifrostContext, resp *schemas.BifrostResponse, bifrostErr *schemas.BifrostError) (*schemas.BifrostResponse, *schemas.BifrostError, error) { fmt.Println("PostLLMHook called") - value1 := ctx.Value(schemas.BifrostContextKey("hello-world-plugin-transport-pre-hook")) + value1 := ctx.Value(transportPreHookKey) fmt.Println("value1:", value1) - value2 := ctx.Value(schemas.BifrostContextKey("hello-world-plugin-pre-hook")) + value2 := ctx.Value(preHookKey) fmt.Println("value2:", value2) + ctx.Log(schemas.LogLevelInfo, "PostLLMHook called") return resp, bifrostErr, nil } diff --git a/framework/logstore/migrations.go b/framework/logstore/migrations.go index c8dc780c88..27c0865240 100644 --- a/framework/logstore/migrations.go +++ b/framework/logstore/migrations.go @@ -222,6 +222,9 @@ func triggerMigrations(ctx context.Context, db *gorm.DB) error { if err := migrationAddLogsAndDashboardPerformanceIndexes(ctx, db); err != nil { return err } + if err := migrationAddPluginLogsColumn(ctx, db); err != nil { + return err + } return nil } @@ -2182,3 +2185,37 @@ func ensurePerformanceIndexes(ctx context.Context, db *gorm.DB) error { return nil } + +// migrationAddPluginLogsColumn adds the plugin_logs column to the logs table. +func migrationAddPluginLogsColumn(ctx context.Context, db *gorm.DB) error { + opts := *migrator.DefaultOptions + opts.UseTransaction = true + m := migrator.New(db, &opts, []*migrator.Migration{{ + ID: "logs_add_plugin_logs_column", + Migrate: func(tx *gorm.DB) error { + tx = tx.WithContext(ctx) + migrator := tx.Migrator() + if !migrator.HasColumn(&Log{}, "plugin_logs") { + if err := migrator.AddColumn(&Log{}, "plugin_logs"); err != nil { + return err + } + } + return nil + }, + Rollback: func(tx *gorm.DB) error { + tx = tx.WithContext(ctx) + migrator := tx.Migrator() + if migrator.HasColumn(&Log{}, "plugin_logs") { + if err := migrator.DropColumn(&Log{}, "plugin_logs"); err != nil { + return err + } + } + return nil + }, + }}) + err := m.Migrate() + if err != nil { + return fmt.Errorf("error while adding plugin logs column: %s", err.Error()) + } + return nil +} diff --git a/framework/logstore/rdb.go b/framework/logstore/rdb.go index f77d5b4701..fbeb7e9eaa 100644 --- a/framework/logstore/rdb.go +++ b/framework/logstore/rdb.go @@ -1961,6 +1961,20 @@ func (s *RDBLogStore) FindByID(ctx context.Context, id string) (*Log, error) { return &log, nil } +// IsLogEntryPresent checks if a log entry is present in the database. +// Here we dont load entire log entry in memory - just check if it exists. +func (s *RDBLogStore) IsLogEntryPresent(ctx context.Context, id string) (bool, error) { + var log Log + err := s.db.WithContext(ctx).Select("id").Where("id = ?", id).First(&log).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return false, nil + } + return false, err + } + return true, nil +} + // FindFirst gets a log entry from the database. func (s *RDBLogStore) FindFirst(ctx context.Context, query any, fields ...string) (*Log, error) { var log Log diff --git a/framework/logstore/store.go b/framework/logstore/store.go index 6078ab3eea..5942d93db2 100644 --- a/framework/logstore/store.go +++ b/framework/logstore/store.go @@ -24,6 +24,7 @@ type LogStore interface { CreateIfNotExists(ctx context.Context, entry *Log) error BatchCreateIfNotExists(ctx context.Context, entries []*Log) error FindByID(ctx context.Context, id string) (*Log, error) + IsLogEntryPresent(ctx context.Context, id string) (bool, error) FindFirst(ctx context.Context, query any, fields ...string) (*Log, error) FindAll(ctx context.Context, query any, fields ...string) ([]*Log, error) FindAllDistinct(ctx context.Context, query any, fields ...string) ([]*Log, error) diff --git a/framework/logstore/tables.go b/framework/logstore/tables.go index 2da8c0ec0a..a26cec19ee 100644 --- a/framework/logstore/tables.go +++ b/framework/logstore/tables.go @@ -128,6 +128,7 @@ type Log struct { PassthroughRequestBody string `gorm:"type:text" json:"passthrough_request_body,omitempty"` // Raw body for passthrough requests (UTF-8) PassthroughResponseBody string `gorm:"type:text" json:"passthrough_response_body,omitempty"` // Raw body for passthrough responses (UTF-8) RoutingEngineLogs string `gorm:"type:text" json:"routing_engine_logs,omitempty"` // Formatted routing engine decision logs + PluginLogs string `gorm:"type:text" json:"plugin_logs,omitempty"` // JSON serialized plugin log entries grouped by plugin name Metadata *string `gorm:"type:text" json:"-"` // JSON serialized map[string]interface{} IsLargePayloadRequest bool `gorm:"default:false" json:"is_large_payload_request"` IsLargePayloadResponse bool `gorm:"default:false" json:"is_large_payload_response"` @@ -1169,7 +1170,7 @@ type MCPHistogramBucket struct { // MCPHistogramResult represents the MCP tool call volume histogram query result type MCPHistogramResult struct { Buckets []MCPHistogramBucket `json:"buckets"` - BucketSizeSeconds int64 `json:"bucket_size_seconds"` + BucketSizeSeconds int64 `json:"bucket_size_seconds"` } // MCPCostHistogramBucket represents a single time bucket for MCP cost data diff --git a/framework/tracing/store.go b/framework/tracing/store.go index d367c2aca8..0571f2605d 100644 --- a/framework/tracing/store.go +++ b/framework/tracing/store.go @@ -72,7 +72,7 @@ func NewTraceStore(ttl time.Duration, logger schemas.Logger) *TraceStore { // If empty, a new trace ID will be generated. // Note: The parent span ID (for linking to upstream spans) is handled separately // via context in StartSpan, not stored on the trace itself. -func (s *TraceStore) CreateTrace(inheritedTraceID string) string { +func (s *TraceStore) CreateTrace(inheritedTraceID string, requestID ...string) string { trace := s.tracePool.Get().(*schemas.Trace) // Reset and initialize the trace if inheritedTraceID != "" { @@ -84,6 +84,9 @@ func (s *TraceStore) CreateTrace(inheritedTraceID string) string { // Parent-child relationships are between spans, not traces. // The root span's ParentID is set in StartSpan from context. trace.ParentID = "" + if len(requestID) > 0 { + trace.RequestID = requestID[0] + } trace.StartTime = time.Now() trace.EndTime = time.Time{} trace.RootSpan = nil @@ -114,6 +117,15 @@ func (s *TraceStore) GetTrace(traceID string) *schemas.Trace { return nil } +// SetRequestID sets the request ID for the trace +func (s *TraceStore) SetRequestID(traceID string, requestID string) { + trace := s.GetTrace(traceID) + if trace == nil { + return + } + trace.SetRequestID(requestID) +} + // CompleteTrace marks the trace as complete, removes it from store, and returns it for flushing func (s *TraceStore) CompleteTrace(traceID string) *schemas.Trace { // Clear any deferred span for this trace diff --git a/framework/tracing/tracer.go b/framework/tracing/tracer.go index c5088b17ed..ce4374f597 100644 --- a/framework/tracing/tracer.go +++ b/framework/tracing/tracer.go @@ -32,8 +32,8 @@ func NewTracer(store *TraceStore, pricingManager *modelcatalog.ModelCatalog, log } // CreateTrace creates a new trace with optional parent ID and returns the trace ID. -func (t *Tracer) CreateTrace(parentID string) string { - return t.store.CreateTrace(parentID) +func (t *Tracer) CreateTrace(parentID string, requestID ...string) string { + return t.store.CreateTrace(parentID, requestID...) } // EndTrace completes a trace and returns the trace data for observation/export. @@ -329,6 +329,18 @@ func (t *Tracer) GetAccumulator() *streaming.Accumulator { return t.accumulator } +// AttachPluginLogs appends plugin log entries to the trace identified by traceID. +func (t *Tracer) AttachPluginLogs(traceID string, logs []schemas.PluginLogEntry) { + if len(logs) == 0 || traceID == "" { + return + } + trace := t.store.GetTrace(traceID) + if trace == nil { + return + } + trace.AppendPluginLogs(logs) +} + // Stop stops the tracer and releases its resources. // This stops the internal TraceStore's cleanup goroutine. func (t *Tracer) Stop() { diff --git a/plugins/logging/main.go b/plugins/logging/main.go index 160b67d064..a4141531ff 100644 --- a/plugins/logging/main.go +++ b/plugins/logging/main.go @@ -6,6 +6,7 @@ package logging import ( "context" "fmt" + "math" "strings" "sync" "sync/atomic" @@ -99,6 +100,7 @@ func applyLargePayloadPreviewsToEntry(ctx *schemas.BifrostContext, entry *logsto } } +// scheduleDeferredUsageUpdate schedules a deferred usage update for the request. func (p *LoggerPlugin) scheduleDeferredUsageUpdate(ctx *schemas.BifrostContext, requestID string, usageAlreadyPresent bool) { if usageAlreadyPresent || ctx == nil { return @@ -108,7 +110,6 @@ func (p *LoggerPlugin) scheduleDeferredUsageUpdate(ctx *schemas.BifrostContext, if !ok || deferredChan == nil { return } - p.wg.Add(1) go func() { defer p.wg.Done() @@ -127,7 +128,6 @@ func (p *LoggerPlugin) scheduleDeferredUsageUpdate(ctx *schemas.BifrostContext, p.logger.Warn("deferred usage update dropped for request %s: semaphore full", requestID) return } - usageUpdates := map[string]interface{}{ "prompt_tokens": deferredUsage.PromptTokens, "completion_tokens": deferredUsage.CompletionTokens, @@ -137,6 +137,27 @@ func (p *LoggerPlugin) scheduleDeferredUsageUpdate(ctx *schemas.BifrostContext, if serErr := tempEntry.SerializeFields(); serErr == nil { usageUpdates["token_usage"] = tempEntry.TokenUsage } + + // Check if log entry present in the store + // exponential backoff with jitter and 3 retries + // then fail + var found bool + var findErr error + for i := 0; i < 3; i++ { + found, findErr = p.store.IsLogEntryPresent(p.ctx, requestID) + if findErr != nil { + p.logger.Warn("failed to check if log entry is present for request %s: %v", requestID, findErr) + continue + } + if found { + break + } + time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second * 2) + } + if !found { + p.logger.Warn("log entry not found for request %s after 3 retries. failed to update deferred usage for large payload request", requestID) + return + } if updErr := p.store.Update(p.ctx, requestID, usageUpdates); updErr != nil { p.logger.Warn("failed to update deferred usage for request %s: %v", requestID, updErr) } @@ -223,7 +244,8 @@ type LoggerPlugin struct { cleanupTicker *time.Ticker // Ticker for cleaning up old processing logs logMsgPool sync.Pool // Pool for reusing LogMessage structs updateDataPool sync.Pool // Pool for reusing UpdateLogData structs - pendingLogs sync.Map // Maps requestID -> *PendingLogData (PreLLMHook input data awaiting PostLLMHook) + pendingLogsEntries sync.Map // Maps requestID -> *PendingLogData (PreLLMHook input data awaiting PostLLMHook) + pendingLogsToInject sync.Map // Maps traceID -> *pendingInjectEntries (log entries to inject, supports multiple per trace) writeQueue chan *writeQueueEntry // Buffered channel for batch write queue closed atomic.Bool // Set during cleanup to prevent sends on closed writeQueue deferredUsageSem chan struct{} // Limits concurrent deferred usage DB updates @@ -551,7 +573,7 @@ func (p *LoggerPlugin) PreLLMHook(ctx *schemas.BifrostContext, req *schemas.Bifr CreatedAt: time.Now(), Status: "processing", } - p.pendingLogs.Store(effectiveRequestID, pending) + p.pendingLogsEntries.Store(effectiveRequestID, pending) // Call callback synchronously for immediate UI feedback (WebSocket "processing" notification). // The entry does not exist in the DB yet - it will be written when PostLLMHook fires. p.mu.Lock() @@ -627,7 +649,7 @@ func (p *LoggerPlugin) PostLLMHook(ctx *schemas.BifrostContext, result *schemas. routingEngineLogs := formatRoutingEngineLogs(ctx.GetRoutingEngineLogs()) // Retrieve pending input data from PreLLMHook - pendingVal, hasPending := p.pendingLogs.LoadAndDelete(requestID) + pendingVal, hasPending := p.pendingLogsEntries.LoadAndDelete(requestID) if !hasPending { // If we have an error (e.g., cancellation/timeout), still write a minimal error entry // so the error is visible in logs. Without PreLLMHook's DB insert, silently returning @@ -648,7 +670,7 @@ func (p *LoggerPlugin) PostLLMHook(ctx *schemas.BifrostContext, result *schemas. } entry.ErrorDetailsParsed = bifrostErr applyLargePayloadPreviewsToEntry(ctx, entry) - p.enqueueLogEntry(entry, p.makePostWriteCallback(nil)) + p.storeOrEnqueueEntry(ctx, entry, p.makePostWriteCallback(nil)) } else { p.logger.Warn("no pending log data found for request %s, skipping log write", requestID) } @@ -699,7 +721,7 @@ func (p *LoggerPlugin) PostLLMHook(ctx *schemas.BifrostContext, result *schemas. } } applyLargePayloadPreviewsToEntry(ctx, entry) - p.enqueueLogEntry(entry, p.makePostWriteCallback(nil)) + p.storeOrEnqueueEntry(ctx, entry, p.makePostWriteCallback(nil)) p.scheduleDeferredUsageUpdate(ctx, requestID, entry.TokenUsageParsed != nil) return result, bifrostErr, nil } @@ -746,7 +768,7 @@ func (p *LoggerPlugin) PostLLMHook(ctx *schemas.BifrostContext, result *schemas. tracer.CleanupStreamAccumulator(traceID) } - p.enqueueLogEntry(entry, p.makePostWriteCallback(nil)) + p.storeOrEnqueueEntry(ctx, entry, p.makePostWriteCallback(nil)) p.scheduleDeferredUsageUpdate(ctx, requestID, entry.TokenUsageParsed != nil) return result, bifrostErr, nil } @@ -784,24 +806,24 @@ func (p *LoggerPlugin) PostLLMHook(ctx *schemas.BifrostContext, result *schemas. } } - p.enqueueLogEntry(entry, p.makePostWriteCallback(func(updatedEntry *logstore.Log) { - updatedEntry.SelectedKey = &schemas.Key{ - ID: updatedEntry.SelectedKeyID, - Name: updatedEntry.SelectedKeyName, - } - if updatedEntry.VirtualKeyID != nil && updatedEntry.VirtualKeyName != nil { - updatedEntry.VirtualKey = &tables.TableVirtualKey{ - ID: *updatedEntry.VirtualKeyID, - Name: *updatedEntry.VirtualKeyName, - } + // Pre-apply denormalized fields for WebSocket callback enrichment + entry.SelectedKey = &schemas.Key{ + ID: entry.SelectedKeyID, + Name: entry.SelectedKeyName, + } + if entry.VirtualKeyID != nil && entry.VirtualKeyName != nil { + entry.VirtualKey = &tables.TableVirtualKey{ + ID: *entry.VirtualKeyID, + Name: *entry.VirtualKeyName, } - if updatedEntry.RoutingRuleID != nil && updatedEntry.RoutingRuleName != nil { - updatedEntry.RoutingRule = &tables.TableRoutingRule{ - ID: *updatedEntry.RoutingRuleID, - Name: *updatedEntry.RoutingRuleName, - } + } + if entry.RoutingRuleID != nil && entry.RoutingRuleName != nil { + entry.RoutingRule = &tables.TableRoutingRule{ + ID: *entry.RoutingRuleID, + Name: *entry.RoutingRuleName, } - })) + } + p.storeOrEnqueueEntry(ctx, entry, p.makePostWriteCallback(nil)) p.scheduleDeferredUsageUpdate(ctx, requestID, entry.TokenUsageParsed != nil) return result, bifrostErr, nil } @@ -828,6 +850,60 @@ func (p *LoggerPlugin) Cleanup() error { return nil } +// storeOrEnqueueEntry stores a log entry in pendingLogs keyed by traceID for later +// retrieval by Inject(), or enqueues directly if no traceID is available (Go SDK path). +// Multiple entries per traceID are supported (e.g. fallback/retry attempts within the same trace). +func (p *LoggerPlugin) storeOrEnqueueEntry(ctx *schemas.BifrostContext, entry *logstore.Log, callback func(entry *logstore.Log)) { + traceID, _ := ctx.Value(schemas.BifrostContextKeyTraceID).(string) + if traceID != "" { + // Append to slice for Inject() to pick up — supports multiple attempts per trace + existing, loaded := p.pendingLogsToInject.LoadOrStore(traceID, &pendingInjectEntries{entries: []*logstore.Log{entry}, createdAt: time.Now()}) + if !loaded { + return + } + pending := existing.(*pendingInjectEntries) + pending.mu.Lock() + pending.entries = append(pending.entries, entry) + pending.mu.Unlock() + } else { + // Fallback: no tracing (Go SDK path), enqueue directly + p.enqueueLogEntry(entry, callback) + } +} + +// Inject receives a completed trace and writes the log entries with plugin logs to DB. +// This implements the ObservabilityPlugin interface. +func (p *LoggerPlugin) Inject(_ context.Context, trace *schemas.Trace) error { + if trace == nil { + return nil + } + // Retrieve pending log entries built by PostLLMHook (stored by traceID) + entryVal, ok := p.pendingLogsToInject.LoadAndDelete(trace.TraceID) + if !ok { + return nil + } + pending, ok := entryVal.(*pendingInjectEntries) + if !ok { + return nil + } + + // Serialize plugin logs once for all entries + var pluginLogsJSON string + if len(trace.PluginLogs) > 0 { + grouped := schemas.GroupPluginLogsByName(trace.PluginLogs) + if data, err := sonic.Marshal(grouped); err == nil { + pluginLogsJSON = string(data) + } + } + + // Enqueue each log entry (supports multiple attempts per trace) + for _, entry := range pending.entries { + entry.PluginLogs = pluginLogsJSON + p.enqueueLogEntry(entry, p.makePostWriteCallback(nil)) + } + return nil +} + // MCP Plugin Interface Implementation // SetMCPToolLogCallback sets a callback function that will be called for each MCP tool log entry diff --git a/plugins/logging/operations_test.go b/plugins/logging/operations_test.go index daa2ec91ff..27d4436818 100644 --- a/plugins/logging/operations_test.go +++ b/plugins/logging/operations_test.go @@ -243,3 +243,71 @@ func TestUpdateStreamingLogEntryPreservesResponsesInputContentSummary(t *testing t.Fatalf("expected content summary to avoid overwriting with streamed responses output-only data, got %q", logEntry.ContentSummary) } } + +func TestStoreOrEnqueueRetryPreservesAllEntries(t *testing.T) { + // Simulate fallback/retry scenario where multiple PostLLMHook calls + // store entries under the same traceID. All entries must be preserved. + plugin := &LoggerPlugin{ + logger: testLogger{}, + writeQueue: make(chan *writeQueueEntry, 10), + } + + traceID := "trace-retry-test" + ctx := schemas.NewBifrostContext(context.Background(), schemas.NoDeadline) + ctx.SetValue(schemas.BifrostContextKeyTraceID, traceID) + + // Simulate 3 retry attempts storing entries under the same traceID + entry1 := &logstore.Log{ID: "req-attempt-1", Model: "gpt-4o"} + entry2 := &logstore.Log{ID: "req-attempt-2", Model: "gpt-4o"} + entry3 := &logstore.Log{ID: "req-attempt-3", Model: "claude-3-5-sonnet"} + + plugin.storeOrEnqueueEntry(ctx, entry1, nil) + plugin.storeOrEnqueueEntry(ctx, entry2, nil) + plugin.storeOrEnqueueEntry(ctx, entry3, nil) + + // Verify all 3 entries are stored + val, ok := plugin.pendingLogsToInject.Load(traceID) + if !ok { + t.Fatal("expected pending entries for traceID, got none") + } + pending, ok := val.(*pendingInjectEntries) + if !ok { + t.Fatal("expected *pendingInjectEntries type") + } + if len(pending.entries) != 3 { + t.Fatalf("expected 3 entries, got %d", len(pending.entries)) + } + if pending.entries[0].ID != "req-attempt-1" || pending.entries[1].ID != "req-attempt-2" || pending.entries[2].ID != "req-attempt-3" { + t.Fatalf("entries not in expected order: %v, %v, %v", pending.entries[0].ID, pending.entries[1].ID, pending.entries[2].ID) + } + + // Now test Inject flushes all entries with plugin logs attached + trace := &schemas.Trace{ + TraceID: traceID, + PluginLogs: []schemas.PluginLogEntry{ + {PluginName: "hello-world", Level: schemas.LogLevelInfo, Message: "test log"}, + }, + } + + if err := plugin.Inject(context.Background(), trace); err != nil { + t.Fatalf("Inject() error = %v", err) + } + + // Verify all 3 entries were enqueued to writeQueue + if len(plugin.writeQueue) != 3 { + t.Fatalf("expected 3 entries in writeQueue, got %d", len(plugin.writeQueue)) + } + + // Verify plugin logs were attached to each entry + for i := 0; i < 3; i++ { + qe := <-plugin.writeQueue + if qe.log.PluginLogs == "" { + t.Fatalf("entry %d: expected PluginLogs to be set", i) + } + } + + // Verify pendingLogsToInject was cleaned up + if _, ok := plugin.pendingLogsToInject.Load(traceID); ok { + t.Fatal("expected pendingLogsToInject to be cleaned up after Inject") + } +} diff --git a/plugins/logging/writer.go b/plugins/logging/writer.go index e2a5861655..92c6d3ce98 100644 --- a/plugins/logging/writer.go +++ b/plugins/logging/writer.go @@ -1,6 +1,7 @@ package logging import ( + "sync" "time" "github.com/maximhq/bifrost/framework/logstore" @@ -36,6 +37,14 @@ type PendingLogData struct { CreatedAt time.Time // For cleanup of stale entries } +// pendingInjectEntries wraps a slice of log entries so it can be used with sync.Map. +// The mutex protects concurrent appends to the entries slice within the same traceID. +type pendingInjectEntries struct { + mu sync.Mutex + entries []*logstore.Log + createdAt time.Time +} + // writeQueueEntry is an entry pushed to the batch write queue. type writeQueueEntry struct { log *logstore.Log // Complete log entry ready for INSERT @@ -167,10 +176,18 @@ func (p *LoggerPlugin) processBatch(batch []*writeQueueEntry) { // never fires for a request (e.g., request was cancelled before reaching the provider). func (p *LoggerPlugin) cleanupStalePendingLogs() { cutoff := time.Now().Add(-pendingLogTTL) - p.pendingLogs.Range(func(key, value any) bool { + p.pendingLogsEntries.Range(func(key, value any) bool { if pending, ok := value.(*PendingLogData); ok { if pending.CreatedAt.Before(cutoff) { - p.pendingLogs.Delete(key) + p.pendingLogsEntries.Delete(key) + } + } + return true + }) + p.pendingLogsToInject.Range(func(key, value any) bool { + if pending, ok := value.(*pendingInjectEntries); ok { + if pending.createdAt.Before(cutoff) { + p.pendingLogsToInject.Delete(key) } } return true diff --git a/transports/bifrost-http/handlers/middlewares.go b/transports/bifrost-http/handlers/middlewares.go index 18d3ce9c57..2f0a12f8de 100644 --- a/transports/bifrost-http/handlers/middlewares.go +++ b/transports/bifrost-http/handlers/middlewares.go @@ -12,6 +12,7 @@ import ( "sync/atomic" "time" + "github.com/google/uuid" providerUtils "github.com/maximhq/bifrost/core/providers/utils" "github.com/maximhq/bifrost/core/schemas" "github.com/maximhq/bifrost/framework/configstore" @@ -90,15 +91,15 @@ func CorsMiddleware(config *lib.Config) schemas.BifrostHTTPMiddleware { ctx.Response.Header.Set("Access-Control-Allow-Origin", origin) ctx.Response.Header.Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, PATCH, OPTIONS, HEAD") ctx.Response.Header.Set("Access-Control-Allow-Headers", strings.Join(allowedHeaders, ", ")) - // Set Allow-Credentials for credentialed requests. Only skip when wildcard - // is configured AND the origin was matched by the wildcard (not by localhost rule - // or explicit listing). Localhost origins and explicitly listed origins always - // get credentials support since we return the specific origin. - if !slices.Contains(config.ClientConfig.AllowedOrigins, "*") || - isLocalhostOrigin(origin) || - slices.Contains(config.ClientConfig.AllowedOrigins, origin) { - ctx.Response.Header.Set("Access-Control-Allow-Credentials", "true") - } + // Set Allow-Credentials for credentialed requests. Only skip when wildcard + // is configured AND the origin was matched by the wildcard (not by localhost rule + // or explicit listing). Localhost origins and explicitly listed origins always + // get credentials support since we return the specific origin. + if !slices.Contains(config.ClientConfig.AllowedOrigins, "*") || + isLocalhostOrigin(origin) || + slices.Contains(config.ClientConfig.AllowedOrigins, origin) { + ctx.Response.Header.Set("Access-Control-Allow-Credentials", "true") + } ctx.Response.Header.Set("Access-Control-Max-Age", "86400") // Vary: Origin tells caches that the response varies based on the Origin // request header, preventing incorrect CORS headers from being served. @@ -302,20 +303,33 @@ func TransportInterceptorMiddleware(config *lib.Config) schemas.BifrostHTTPMiddl fasthttpToHTTPRequest(ctx, req) // Run plugin interceptors for _, plugin := range plugins { - resp, err := plugin.HTTPTransportPreHook(bifrostCtx, req) + pluginName := plugin.GetName() + pluginCtx := bifrostCtx.WithPluginScope(&pluginName) + resp, err := plugin.HTTPTransportPreHook(pluginCtx, req) + pluginCtx.ReleasePluginScope() if err != nil { - // Short-circuit with error + // Short-circuit with error — drain plugin logs before returning + if logs := bifrostCtx.DrainPluginLogs(); len(logs) > 0 { + ctx.SetUserValue(schemas.BifrostContextKeyTransportPluginLogs, logs) + } ctx.SetStatusCode(fasthttp.StatusInternalServerError) ctx.SetBodyString(err.Error()) return } if resp != nil { - // Short-circuit with response + // Short-circuit with response — drain plugin logs before returning + if logs := bifrostCtx.DrainPluginLogs(); len(logs) > 0 { + ctx.SetUserValue(schemas.BifrostContextKeyTransportPluginLogs, logs) + } applyHTTPResponseToCtx(ctx, resp) return } // If we got here, the plugin may have modified req in-place } + // Drain pre-hook plugin logs and store on fasthttp context for trace attachment + if preHookLogs := bifrostCtx.DrainPluginLogs(); len(preHookLogs) > 0 { + ctx.SetUserValue(schemas.BifrostContextKeyTransportPluginLogs, preHookLogs) + } // Apply modifications back to fasthttp context applyHTTPRequestToCtx(ctx, req) // Adding user values @@ -324,31 +338,70 @@ func TransportInterceptorMiddleware(config *lib.Config) schemas.BifrostHTTPMiddl } next(ctx) - // Skip HTTPTransportPostHook for streaming responses - // Streaming handlers set DeferTraceCompletion and use StreamChunkInterceptor for per-chunk hooks + // For streaming responses, store a callback to run post-hooks after the stream ends. + // The streaming handler calls this before traceCompleter. if deferred, ok := ctx.UserValue(schemas.BifrostContextKeyDeferTraceCompletion).(bool); ok && deferred { + ctx.SetUserValue(schemas.BifrostContextKeyTransportPostHookCompleter, func() { + runTransportPostHooks(ctx, plugins, bifrostCtx) + }) return } - // Acquire pooled response for post-hooks (non-streaming only) - httpResp := schemas.AcquireHTTPResponse() - defer schemas.ReleaseHTTPResponse(httpResp) - fasthttpResponseToHTTPResponse(ctx, httpResp) - // Run http post-hooks in reverse order - for i := len(plugins) - 1; i >= 0; i-- { - plugin := plugins[i] - err := plugin.HTTPTransportPostHook(bifrostCtx, req, httpResp) - if err != nil { - logger.Warn("error in HTTPTransportPostHook for plugin %s: %s", plugin.GetName(), err.Error()) - // Short-circuit with response - applyHTTPResponseToCtx(ctx, httpResp) - return + runTransportPostHooks(ctx, plugins, bifrostCtx) + } + } +} + +// runTransportPostHooks runs HTTPTransportPostHook for all plugins in reverse order, +// drains plugin logs, and applies the response back to the fasthttp context. +// Used for both non-streaming (inline) and streaming (deferred callback) paths. +// +// Transport-level plugin logs are stored in fasthttp UserValues (keyed by +// BifrostContextKeyTransportPluginLogs) rather than directly on BifrostContext, +// because transport hooks operate at the fasthttp layer before/after the core +// BifrostContext lifecycle. These logs are merged into the trace by the +// TracingMiddleware at trace completion, alongside core-level plugin logs +// which travel through BifrostContext → Trace → AttachPluginLogs. +func runTransportPostHooks(ctx *fasthttp.RequestCtx, plugins []schemas.HTTPTransportPlugin, bifrostCtx *schemas.BifrostContext) { + httpResp := schemas.AcquireHTTPResponse() + defer schemas.ReleaseHTTPResponse(httpResp) + fasthttpResponseToHTTPResponse(ctx, httpResp) + + // Build request from current fasthttp state (original pooled req may have been released) + req := schemas.AcquireHTTPRequest() + defer schemas.ReleaseHTTPRequest(req) + fasthttpToHTTPRequest(ctx, req) + + // Run http post-hooks in reverse order + for i := len(plugins) - 1; i >= 0; i-- { + plugin := plugins[i] + pluginName := plugin.GetName() + pluginCtx := bifrostCtx.WithPluginScope(&pluginName) + err := plugin.HTTPTransportPostHook(pluginCtx, req, httpResp) + pluginCtx.ReleasePluginScope() + if err != nil { + logger.Warn("error in HTTPTransportPostHook for plugin %s: %s", pluginName, err.Error()) + // Drain plugin logs before returning on error + if postHookLogs := bifrostCtx.DrainPluginLogs(); len(postHookLogs) > 0 { + if existing, ok := ctx.UserValue(schemas.BifrostContextKeyTransportPluginLogs).([]schemas.PluginLogEntry); ok { + ctx.SetUserValue(schemas.BifrostContextKeyTransportPluginLogs, append(existing, postHookLogs...)) + } else { + ctx.SetUserValue(schemas.BifrostContextKeyTransportPluginLogs, postHookLogs) } } - // Apply modifications back to fasthttp context applyHTTPResponseToCtx(ctx, httpResp) + return + } + } + // Drain post-hook plugin logs and merge with pre-hook logs + if postHookLogs := bifrostCtx.DrainPluginLogs(); len(postHookLogs) > 0 { + if existing, ok := ctx.UserValue(schemas.BifrostContextKeyTransportPluginLogs).([]schemas.PluginLogEntry); ok { + ctx.SetUserValue(schemas.BifrostContextKeyTransportPluginLogs, append(existing, postHookLogs...)) + } else { + ctx.SetUserValue(schemas.BifrostContextKeyTransportPluginLogs, postHookLogs) } } + applyHTTPResponseToCtx(ctx, httpResp) } // getBifrostContextFromFastHTTP gets or creates a BifrostContext from fasthttp context. @@ -766,14 +819,19 @@ func (m *TracingMiddleware) Middleware() schemas.BifrostHTTPMiddleware { next(ctx) return } + requestID := string(ctx.Request.Header.Peek("x-request-id")) + if requestID == "" { + requestID = uuid.New().String() + // Injecting this back to be picked up by the next middleware + ctx.Request.Header.Set("x-request-id", requestID) + } // Extract trace ID from W3C traceparent header (if present) // This is the 32-char trace ID that links all spans in a distributed trace inheritedTraceID := tracing.ExtractParentID(&ctx.Request.Header) // Create trace in store - only ID returned (trace data stays in store) - traceID := m.tracer.Load().CreateTrace(inheritedTraceID) + traceID := m.tracer.Load().CreateTrace(inheritedTraceID, requestID) // Only trace ID goes into context (lightweight, no bloat) ctx.SetUserValue(schemas.BifrostContextKeyTraceID, traceID) - // Extract parent span ID from W3C traceparent header (if present) // This is the 16-char span ID from the upstream service that should be // set as the ParentID of our root span for proper trace linking in Datadog/etc. @@ -784,6 +842,14 @@ func (m *TracingMiddleware) Middleware() schemas.BifrostHTTPMiddleware { // Store a trace completion callback for streaming handlers to use ctx.SetUserValue(schemas.BifrostContextKeyTraceCompleter, func() { + // Run deferred HTTPTransportPostHook for streaming responses + if postHookCompleter, ok := ctx.UserValue(schemas.BifrostContextKeyTransportPostHookCompleter).(func()); ok { + postHookCompleter() + } + // Attach transport plugin logs before completing the trace (streaming path) + if transportLogs, ok := ctx.UserValue(schemas.BifrostContextKeyTransportPluginLogs).([]schemas.PluginLogEntry); ok && len(transportLogs) > 0 { + m.tracer.Load().AttachPluginLogs(traceID, transportLogs) + } m.completeAndFlushTrace(traceID) }) // Create root span for the HTTP request @@ -812,6 +878,10 @@ func (m *TracingMiddleware) Middleware() schemas.BifrostHTTPMiddleware { if deferred, ok := ctx.UserValue(schemas.BifrostContextKeyDeferTraceCompletion).(bool); ok && deferred { return } + // Attach transport plugin logs to trace before completion + if transportLogs, ok := ctx.UserValue(schemas.BifrostContextKeyTransportPluginLogs).([]schemas.PluginLogEntry); ok && len(transportLogs) > 0 { + m.tracer.Load().AttachPluginLogs(traceID, transportLogs) + } // After response written - async flush m.completeAndFlushTrace(traceID) }() diff --git a/transports/bifrost-http/lib/config.go b/transports/bifrost-http/lib/config.go index 31bd656598..91bdc87629 100644 --- a/transports/bifrost-http/lib/config.go +++ b/transports/bifrost-http/lib/config.go @@ -2402,9 +2402,19 @@ type pluginChunkInterceptor struct { // Plugins are called in reverse order (same as PostHook) so modifications chain correctly. func (i *pluginChunkInterceptor) InterceptChunk(ctx *schemas.BifrostContext, req *schemas.HTTPRequest, stream *schemas.BifrostStreamChunk) (*schemas.BifrostStreamChunk, error) { for j := len(i.plugins) - 1; j >= 0; j-- { - modified, err := i.plugins[j].HTTPTransportStreamChunkHook(ctx, req, stream) + plugin := i.plugins[j] + pluginName := plugin.GetName() + var ( + modified *schemas.BifrostStreamChunk + err error + ) + func() { + pluginCtx := ctx.WithPluginScope(&pluginName) + defer pluginCtx.ReleasePluginScope() + modified, err = plugin.HTTPTransportStreamChunkHook(pluginCtx, req, stream) + }() if err != nil { - return modified, fmt.Errorf("failed to intercept chunk with plugin %s: %w", i.plugins[j].GetName(), err) + return modified, fmt.Errorf("failed to intercept chunk with plugin %s: %w", pluginName, err) } if modified == nil { return nil, nil // Plugin wants to skip this chunk diff --git a/transports/bifrost-http/server/server.go b/transports/bifrost-http/server/server.go index c0e973d6a3..b873c57092 100644 --- a/transports/bifrost-http/server/server.go +++ b/transports/bifrost-http/server/server.go @@ -1355,8 +1355,9 @@ func (s *BifrostHTTPServer) Bootstrap(ctx context.Context) error { if ctx.Value(schemas.BifrostContextKeyIsEnterprise) == nil && s.AuthMiddleware != nil { inferenceMiddlewares = append(inferenceMiddlewares, s.AuthMiddleware.InferenceMiddleware()) } - // Registering inference middlewares - inferenceMiddlewares = append([]schemas.BifrostHTTPMiddleware{handlers.TransportInterceptorMiddleware(s.Config)}, inferenceMiddlewares...) + // Once auth is done we will first add the Tracing middleware + // Always add tracing middleware when tracer is enabled - it creates traces and sets traceID in context + // The observability plugins are optional (can be empty if only logging is enabled) // Curating observability plugins observabilityPlugins := s.CollectObservabilityPlugins() // This enables the central streaming accumulator for both use cases @@ -1364,10 +1365,13 @@ func (s *BifrostHTTPServer) Bootstrap(ctx context.Context) error { traceStore := tracing.NewTraceStore(60*time.Minute, logger) tracer := tracing.NewTracer(traceStore, s.Config.ModelCatalog, logger) s.Client.SetTracer(tracer) - // Always add tracing middleware when tracer is enabled - it creates traces and sets traceID in context - // The observability plugins are optional (can be empty if only logging is enabled) s.TracingMiddleware = handlers.NewTracingMiddleware(tracer, observabilityPlugins) + // TransportInterceptor must be inside TracingMiddleware so that the tracing defer + // runs AFTER transport post-hooks (capturing HTTPTransportPostHook plugin logs). + // Order: Tracing.pre → TransportInterceptor.pre → handler → TransportInterceptor.post → Tracing.defer + inferenceMiddlewares = append([]schemas.BifrostHTTPMiddleware{handlers.TransportInterceptorMiddleware(s.Config)}, inferenceMiddlewares...) inferenceMiddlewares = append([]schemas.BifrostHTTPMiddleware{s.TracingMiddleware.Middleware()}, inferenceMiddlewares...) + err = s.RegisterInferenceRoutes(s.Ctx, inferenceMiddlewares...) if err != nil { if s.WSTicketStore != nil { diff --git a/ui/app/workspace/logs/sheets/logDetailsSheet.tsx b/ui/app/workspace/logs/sheets/logDetailsSheet.tsx index 9e07d6f491..ffa1bce83e 100644 --- a/ui/app/workspace/logs/sheets/logDetailsSheet.tsx +++ b/ui/app/workspace/logs/sheets/logDetailsSheet.tsx @@ -1,7 +1,5 @@ "use client"; -import { useEffect } from "react"; -import { useLazyGetLogByIdQuery } from "@/lib/store/apis/logsApi"; import { AlertDialog, AlertDialogAction, @@ -15,7 +13,14 @@ import { } from "@/components/ui/alertDialog"; import { Badge } from "@/components/ui/badge"; import { Button } from "@/components/ui/button"; -import { DropdownMenu, DropdownMenuContent, DropdownMenuItem, DropdownMenuSeparator, DropdownMenuTrigger } from "@/components/ui/dropdownMenu"; +import { CodeEditor } from "@/components/ui/codeEditor"; +import { + DropdownMenu, + DropdownMenuContent, + DropdownMenuItem, + DropdownMenuSeparator, + DropdownMenuTrigger, +} from "@/components/ui/dropdownMenu"; import { DottedSeparator } from "@/components/ui/separator"; import { Sheet, SheetContent, SheetHeader, SheetTitle } from "@/components/ui/sheet"; import { ProviderIconType, RenderProviderIcon, RoutingEngineUsedIcons } from "@/lib/constants/icons"; @@ -27,9 +32,11 @@ import { Status, StatusColors, } from "@/lib/constants/logs"; +import { useLazyGetLogByIdQuery } from "@/lib/store/apis/logsApi"; import { LogEntry } from "@/lib/types/logs"; import { Clipboard, Loader2, MoreVertical, Trash2 } from "lucide-react"; import moment from "moment"; +import { useEffect } from "react"; import { toast } from "sonner"; import BlockHeader from "../views/blockHeader"; import CollapsibleBox from "../views/collapsibleBox"; @@ -37,10 +44,10 @@ import ImageView from "../views/imageView"; import LogChatMessageView from "../views/logChatMessageView"; import LogEntryDetailsView from "../views/logEntryDetailsView"; import LogResponsesMessageView from "../views/logResponsesMessageView"; +import PluginLogsView from "../views/pluginLogsView"; import SpeechView from "../views/speechView"; import TranscriptionView from "../views/transcriptionView"; import VideoView from "../views/videoView"; -import { CodeEditor } from "@/components/ui/codeEditor"; const formatJsonSafe = (str: string | undefined): string => { try { @@ -58,8 +65,7 @@ interface LogDetailSheetProps { } // Helper to detect passthrough operations -const isPassthroughOperation = (object: string) => - object === "passthrough" || object === "passthrough_stream"; +const isPassthroughOperation = (object: string) => object === "passthrough" || object === "passthrough_stream"; // Helper to detect container operations (for hiding irrelevant fields like Model/Tokens) const isContainerOperation = (object: string) => { @@ -96,11 +102,11 @@ export function LogDetailSheet({ log, open, onOpenChange, handleDelete }: LogDet const isPassthrough = isPassthroughOperation(displayLog.object); const passthroughParams = isPassthrough ? (displayLog.params as { - method?: string; - path?: string; - raw_query?: string; - status_code?: number; - }) + method?: string; + path?: string; + raw_query?: string; + status_code?: number; + }) : null; // Taking out tool call @@ -108,7 +114,7 @@ export function LogDetailSheet({ log, open, onOpenChange, handleDelete }: LogDet if (displayLog.params?.tools) { try { toolsParameter = JSON.stringify(displayLog.params.tools, null, 2); - } catch (ignored) { } + } catch (ignored) {} } // Extract audio format from request params @@ -126,7 +132,7 @@ export function LogDetailSheet({ log, open, onOpenChange, handleDelete }: LogDet {!isFullDataReady ? (
- +
) : ( <> @@ -191,7 +197,9 @@ export function LogDetailSheet({ log, open, onOpenChange, handleDelete }: LogDet Are you sure you want to delete this log? - This action cannot be undone. This will permanently delete the log entry. + + This action cannot be undone. This will permanently delete the log entry. + Cancel @@ -250,19 +258,26 @@ export function LogDetailSheet({ log, open, onOpenChange, handleDelete }: LogDet label="Type" value={
{RequestTypeLabels[displayLog.object as keyof typeof RequestTypeLabels] ?? displayLog.object ?? "unknown"}
} /> - {displayLog.selected_key && } + {displayLog.selected_key && ( + + )} {displayLog.number_of_retries > 0 && ( )} - {displayLog.fallback_index > 0 && } - {displayLog.virtual_key && } + {displayLog.fallback_index > 0 && ( + + )} + {displayLog.virtual_key && ( + + )} {displayLog.routing_engines_used && displayLog.routing_engines_used.length > 0 && ( (
{RoutingEngineUsedIcons[engine as keyof typeof RoutingEngineUsedIcons]?.()} @@ -284,7 +301,9 @@ export function LogDetailSheet({ log, open, onOpenChange, handleDelete }: LogDet } /> )} - {displayLog.routing_rule && } + {displayLog.routing_rule && ( + + )} {/* Display audio params if present */} {(displayLog.params as any)?.audio && ( @@ -304,18 +323,12 @@ export function LogDetailSheet({ log, open, onOpenChange, handleDelete }: LogDet {passthroughParams.method && ( )} - {passthroughParams.path && ( - - )} + {passthroughParams.path && } {passthroughParams.raw_query && ( )} {(passthroughParams.status_code ?? 0) !== 0 && ( - + )} )} @@ -326,10 +339,7 @@ export function LogDetailSheet({ log, open, onOpenChange, handleDelete }: LogDet .filter(([key]) => { const passthroughKeys = ["method", "path", "raw_query", "status_code"]; return ( - key !== "tools" && - key !== "instructions" && - key !== "audio" && - !(isPassthrough && passthroughKeys.includes(key)) + key !== "tools" && key !== "instructions" && key !== "audio" && !(isPassthrough && passthroughKeys.includes(key)) ); }) .filter(([_, value]) => typeof value === "boolean" || typeof value === "number" || typeof value === "string") @@ -343,27 +353,31 @@ export function LogDetailSheet({ log, open, onOpenChange, handleDelete }: LogDet
- + - + {displayLog.token_usage?.prompt_tokens_details && ( <> - {(displayLog.token_usage.prompt_tokens_details.cached_read_tokens) && ( + {displayLog.token_usage.prompt_tokens_details.cached_read_tokens && ( )} - {(displayLog.token_usage.prompt_tokens_details.cached_write_tokens) && ( + {displayLog.token_usage.prompt_tokens_details.cached_write_tokens && ( )} {displayLog.token_usage.prompt_tokens_details.audio_tokens && ( @@ -454,7 +468,9 @@ export function LogDetailSheet({ log, open, onOpenChange, handleDelete }: LogDet } /> )} - {reasoning.max_tokens && } + {reasoning.max_tokens && ( + + )}
@@ -492,10 +508,18 @@ export function LogDetailSheet({ log, open, onOpenChange, handleDelete }: LogDet /> )} {displayLog.cache_debug.model_used && ( - + )} {displayLog.cache_debug.threshold && ( - + )} {displayLog.cache_debug.similarity && ( )} {displayLog.cache_debug.input_tokens && ( - + )} )} @@ -564,6 +592,7 @@ export function LogDetailSheet({ log, open, onOpenChange, handleDelete }: LogDet )} + {displayLog.plugin_logs && } {toolsParameter && ( toolsParameter}> + )} {(displayLog.video_generation_input || videoOutput || videoListOutput) && ( @@ -631,34 +664,39 @@ export function LogDetailSheet({ log, open, onOpenChange, handleDelete }: LogDet )} {/* Passthrough request body */} - {isPassthrough && passthroughRequestBody && (() => { - return ( - { - try { - return JSON.stringify(JSON.parse(passthroughRequestBody || ""), null, 2); - } catch { - return passthroughRequestBody || ""; - } - }}> - { + {isPassthrough && + passthroughRequestBody && + (() => { + return ( + { try { return JSON.stringify(JSON.parse(passthroughRequestBody || ""), null, 2); } catch { return passthroughRequestBody || ""; } - })()} - lang="json" - readonly={true} - options={{ scrollBeyondLastLine: false, lineNumbers: "off", alwaysConsumeMouseWheel: false }} - /> - - ); - })()} + }} + > + { + try { + return JSON.stringify(JSON.parse(passthroughRequestBody || ""), null, 2); + } catch { + return passthroughRequestBody || ""; + } + })()} + lang="json" + readonly={true} + options={{ scrollBeyondLastLine: false, lineNumbers: "off", alwaysConsumeMouseWheel: false }} + /> + + ); + })()} {/* Show conversation history for chat/text completions */} {displayLog.input_history && displayLog.input_history.length > 1 && ( @@ -693,12 +731,15 @@ export function LogDetailSheet({ log, open, onOpenChange, handleDelete }: LogDet )} - {displayLog.is_large_payload_response && !displayLog.output_message && !displayLog.responses_output?.length && displayLog.status !== "processing" && ( -
- Large payload response — response content was streamed directly to the client and is not available for display. - {displayLog.raw_response && " A truncated preview is available in the Raw Response section below."} -
- )} + {displayLog.is_large_payload_response && + !displayLog.output_message && + !displayLog.responses_output?.length && + displayLog.status !== "processing" && ( +
+ Large payload response — response content was streamed directly to the client and is not available for display. + {displayLog.raw_response && " A truncated preview is available in the Raw Response section below."} +
+ )} {displayLog.status !== "processing" && ( <> diff --git a/ui/app/workspace/logs/views/pluginLogsView.tsx b/ui/app/workspace/logs/views/pluginLogsView.tsx new file mode 100644 index 0000000000..b8648035f4 --- /dev/null +++ b/ui/app/workspace/logs/views/pluginLogsView.tsx @@ -0,0 +1,75 @@ +"use client"; + +import { PluginLogEntry } from "@/lib/types/logs"; +import { ChevronDown, ChevronRight } from "lucide-react"; +import moment from "moment"; +import { useState } from "react"; + +const levelColors: Record = { + debug: "bg-gray-100 text-gray-700 dark:bg-gray-800 dark:text-gray-300", + info: "bg-blue-100 text-blue-700 dark:bg-blue-900 dark:text-blue-300", + warn: "bg-amber-100 text-amber-700 dark:bg-amber-900 dark:text-amber-300", + error: "bg-red-100 text-red-700 dark:bg-red-900 dark:text-red-300", +}; + +interface PluginLogsViewProps { + pluginLogs: string; +} + +export default function PluginLogsView({ pluginLogs }: PluginLogsViewProps) { + let parsed: Record; + try { + const raw: unknown = JSON.parse(pluginLogs); + if (!raw || typeof raw !== "object" || Array.isArray(raw)) return null; + parsed = Object.fromEntries(Object.entries(raw as Record).filter(([, value]) => Array.isArray(value))) as Record< + string, + PluginLogEntry[] + >; + } catch { + return null; + } + + const pluginNames = Object.keys(parsed); + if (pluginNames.length === 0) return null; + + return ( +
+
Plugin Logs
+
+ {pluginNames.map((name) => ( + + ))} +
+
+ ); +} + +function PluginSection({ name, entries }: { name: string; entries: PluginLogEntry[] }) { + const [isOpen, setIsOpen] = useState(false); + const sorted = [...entries].sort((a, b) => a.timestamp - b.timestamp); + + return ( +
+ + {isOpen && ( +
+ {sorted.map((entry, idx) => ( +
+ {moment(entry.timestamp).format("HH:mm:ss.SSS")} + + {entry.level} + + {entry.message} +
+ ))} +
+ )} +
+ ); +} diff --git a/ui/lib/types/logs.ts b/ui/lib/types/logs.ts index 8192381465..69bd66b115 100644 --- a/ui/lib/types/logs.ts +++ b/ui/lib/types/logs.ts @@ -422,6 +422,13 @@ export interface Annotation { url_citation: Citation; } +export interface PluginLogEntry { + plugin_name: string; + level: "debug" | "info" | "warn" | "error"; + message: string; + timestamp: number; +} + // Main LogEntry interface matching backend export interface LogEntry { id: string; @@ -436,6 +443,7 @@ export interface LogEntry { routing_engines_used?: string[]; routing_rule_id?: string; routing_engine_logs?: string; // Human-readable routing decision logs + plugin_logs?: string; // JSON string of plugin execution logs grouped by plugin name selected_key?: DBKey; virtual_key?: VirtualKey; routing_rule?: RoutingRule;