Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/mcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ func (m *MCPManager) extractTextFromMCPResponse(toolResponse *mcp.CallToolResult
// Handle typed content
switch content := contentBlock.(type) {
case mcp.TextContent:
result.WriteString(fmt.Sprintf("[Text Response: %s]\n", content.Text))
result.WriteString(content.Text)
case mcp.ImageContent:
result.WriteString(fmt.Sprintf("[Image Response: %s, MIME: %s]\n", content.Data, content.MIMEType))
case mcp.AudioContent:
Expand Down
158 changes: 158 additions & 0 deletions transports/bifrost-http/handlers/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Package handlers provides HTTP request handlers for the Bifrost HTTP transport.
// This file contains logging-related handlers for log search, stats, and management.
package handlers

import (
"fmt"
"strconv"
"strings"
"time"

"github.com/fasthttp/router"
"github.com/maximhq/bifrost/core/schemas"
"github.com/maximhq/bifrost/transports/bifrost-http/plugins/logging"
"github.com/valyala/fasthttp"
)

// LoggingHandler manages HTTP requests for logging operations
type LoggingHandler struct {
logManager logging.LogManager
logger schemas.Logger
}

// NewLoggingHandler creates a new logging handler instance
func NewLoggingHandler(logManager logging.LogManager, logger schemas.Logger) *LoggingHandler {
return &LoggingHandler{
logManager: logManager,
logger: logger,
}
}

// RegisterRoutes registers all logging-related routes
func (h *LoggingHandler) RegisterRoutes(r *router.Router) {
// Log retrieval with filtering, search, and pagination
r.GET("/v1/logs", h.GetLogs)
}

// GetLogs handles GET /v1/logs - Get logs with filtering, search, and pagination via query parameters
func (h *LoggingHandler) GetLogs(ctx *fasthttp.RequestCtx) {
// Parse query parameters into filters
filters := &logging.SearchFilters{}
pagination := &logging.PaginationOptions{}

// Extract filters from query parameters
if providers := string(ctx.QueryArgs().Peek("providers")); providers != "" {
filters.Providers = parseCommaSeparated(providers)
}
if models := string(ctx.QueryArgs().Peek("models")); models != "" {
filters.Models = parseCommaSeparated(models)
}
if statuses := string(ctx.QueryArgs().Peek("status")); statuses != "" {
filters.Status = parseCommaSeparated(statuses)
}
if objects := string(ctx.QueryArgs().Peek("objects")); objects != "" {
filters.Objects = parseCommaSeparated(objects)
}
if startTime := string(ctx.QueryArgs().Peek("start_time")); startTime != "" {
if t, err := time.Parse(time.RFC3339, startTime); err == nil {
filters.StartTime = &t
}
}
if endTime := string(ctx.QueryArgs().Peek("end_time")); endTime != "" {
if t, err := time.Parse(time.RFC3339, endTime); err == nil {
filters.EndTime = &t
}
}
if minLatency := string(ctx.QueryArgs().Peek("min_latency")); minLatency != "" {
if f, err := strconv.ParseFloat(minLatency, 64); err == nil {
filters.MinLatency = &f
}
}
if maxLatency := string(ctx.QueryArgs().Peek("max_latency")); maxLatency != "" {
if val, err := strconv.ParseFloat(maxLatency, 64); err == nil {
filters.MaxLatency = &val
}
}
if minTokens := string(ctx.QueryArgs().Peek("min_tokens")); minTokens != "" {
if val, err := strconv.Atoi(minTokens); err == nil {
filters.MinTokens = &val
}
}
if maxTokens := string(ctx.QueryArgs().Peek("max_tokens")); maxTokens != "" {
if val, err := strconv.Atoi(maxTokens); err == nil {
filters.MaxTokens = &val
}
}
if contentSearch := string(ctx.QueryArgs().Peek("content_search")); contentSearch != "" {
filters.ContentSearch = contentSearch
}

// Extract pagination parameters
pagination.Limit = 50 // Default limit
if limit := string(ctx.QueryArgs().Peek("limit")); limit != "" {
if i, err := strconv.Atoi(limit); err == nil {
if i <= 0 {
SendError(ctx, fasthttp.StatusBadRequest, "limit must be greater than 0", h.logger)
return
}
if i > 1000 {
SendError(ctx, fasthttp.StatusBadRequest, "limit cannot exceed 1000", h.logger)
return
}
pagination.Limit = i
}
}

pagination.Offset = 0 // Default offset
if offset := string(ctx.QueryArgs().Peek("offset")); offset != "" {
if i, err := strconv.Atoi(offset); err == nil {
if i < 0 {
SendError(ctx, fasthttp.StatusBadRequest, "offset cannot be negative", h.logger)
return
}
pagination.Offset = i
}
}

// Sort parameters
pagination.SortBy = "timestamp" // Default sort field
if sortBy := string(ctx.QueryArgs().Peek("sort_by")); sortBy != "" {
if sortBy == "timestamp" || sortBy == "latency" || sortBy == "tokens" {
pagination.SortBy = sortBy
}
}

pagination.Order = "desc" // Default sort order
if order := string(ctx.QueryArgs().Peek("order")); order != "" {
if order == "asc" || order == "desc" {
pagination.Order = order
}
}

result, err := h.logManager.Search(filters, pagination)
if err != nil {
h.logger.Error(fmt.Errorf("failed to search logs: %w", err))
SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Search failed: %v", err), h.logger)
return
}

SendJSON(ctx, result, h.logger)
}

// Helper functions

// parseCommaSeparated splits a comma-separated string into a slice
func parseCommaSeparated(s string) []string {
if s == "" {
return nil
}

var result []string
for _, item := range strings.Split(s, ",") {
if trimmed := strings.TrimSpace(item); trimmed != "" {
result = append(result, trimmed)
}
}

return result
}
117 changes: 62 additions & 55 deletions transports/bifrost-http/handlers/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,26 @@ type AddProviderRequest struct {
NetworkConfig *schemas.NetworkConfig `json:"network_config,omitempty"` // Network-related settings
MetaConfig *map[string]interface{} `json:"meta_config,omitempty"` // Provider-specific metadata
ConcurrencyAndBufferSize *schemas.ConcurrencyAndBufferSize `json:"concurrency_and_buffer_size,omitempty"` // Concurrency settings
ProxyConfig *schemas.ProxyConfig `json:"proxy_config,omitempty"` // Proxy configuration
}

// UpdateProviderRequest represents the request body for updating a provider
type UpdateProviderRequest struct {
Keys *[]schemas.Key `json:"keys,omitempty"` // API keys for the provider
NetworkConfig *schemas.NetworkConfig `json:"network_config,omitempty"` // Network-related settings
MetaConfig *map[string]interface{} `json:"meta_config,omitempty"` // Provider-specific metadata
ConcurrencyAndBufferSize *schemas.ConcurrencyAndBufferSize `json:"concurrency_and_buffer_size,omitempty"` // Concurrency settings
Keys []schemas.Key `json:"keys"` // API keys for the provider
NetworkConfig schemas.NetworkConfig `json:"network_config"` // Network-related settings
MetaConfig *map[string]interface{} `json:"meta_config,omitempty"` // Provider-specific metadata
ConcurrencyAndBufferSize schemas.ConcurrencyAndBufferSize `json:"concurrency_and_buffer_size"` // Concurrency settings
ProxyConfig *schemas.ProxyConfig `json:"proxy_config,omitempty"` // Proxy configuration
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// ProviderResponse represents the response for provider operations
type ProviderResponse struct {
Provider schemas.ModelProvider `json:"provider"`
Config *lib.ProviderConfig `json:"config,omitempty"`
Status string `json:"status"`
Message string `json:"message,omitempty"`
Name schemas.ModelProvider `json:"name"`
Keys []schemas.Key `json:"keys"` // API keys for the provider
NetworkConfig schemas.NetworkConfig `json:"network_config"` // Network-related settings
MetaConfig *schemas.MetaConfig `json:"meta_config"` // Provider-specific metadata
ConcurrencyAndBufferSize schemas.ConcurrencyAndBufferSize `json:"concurrency_and_buffer_size"` // Concurrency settings
ProxyConfig *schemas.ProxyConfig `json:"proxy_config"` // Proxy configuration
}

// ListProvidersResponse represents the response for listing all providers
Expand Down Expand Up @@ -95,18 +99,12 @@ func (h *ProviderHandler) ListProviders(ctx *fasthttp.RequestCtx) {
h.logger.Warn(fmt.Sprintf("Failed to get config for provider %s: %v", provider, err))
// Include provider even if config fetch fails
providerResponses = append(providerResponses, ProviderResponse{
Provider: provider,
Status: "error",
Message: fmt.Sprintf("Failed to get config: %v", err),
Name: provider,
})
continue
}

providerResponses = append(providerResponses, ProviderResponse{
Provider: provider,
Config: config,
Status: "active",
})
providerResponses = append(providerResponses, h.getProviderResponseFromConfig(provider, *config))
}

response := ListProvidersResponse{
Expand All @@ -131,11 +129,7 @@ func (h *ProviderHandler) GetProvider(ctx *fasthttp.RequestCtx) {
return
}

response := ProviderResponse{
Provider: provider,
Config: config,
Status: "active",
}
response := h.getProviderResponseFromConfig(provider, *config)

SendJSON(ctx, response, h.logger)
}
Expand All @@ -155,7 +149,7 @@ func (h *ProviderHandler) AddProvider(ctx *fasthttp.RequestCtx) {
}

// Validate required keys
if len(req.Keys) == 0 {
if len(req.Keys) == 0 && req.Provider != schemas.Vertex && req.Provider != schemas.Ollama {
SendError(ctx, fasthttp.StatusBadRequest, "At least one API key is required", h.logger)
return
}
Expand Down Expand Up @@ -204,12 +198,7 @@ func (h *ProviderHandler) AddProvider(ctx *fasthttp.RequestCtx) {

h.logger.Info(fmt.Sprintf("Provider %s added successfully", req.Provider))

response := ProviderResponse{
Provider: req.Provider,
Config: &config,
Status: "added",
Message: fmt.Sprintf("Provider %s added successfully", req.Provider),
}
response := h.getProviderResponseFromConfig(req.Provider, config)

SendJSON(ctx, response, h.logger)
}
Expand Down Expand Up @@ -247,11 +236,11 @@ func (h *ProviderHandler) UpdateProvider(ctx *fasthttp.RequestCtx) {

// Validate required keys (at least one key must be provided)
if req.Keys != nil {
if len(*req.Keys) == 0 {
if len(req.Keys) == 0 && provider != schemas.Vertex && provider != schemas.Ollama {
SendError(ctx, fasthttp.StatusBadRequest, "At least one API key is required", h.logger)
return
}
config.Keys = *req.Keys
config.Keys = req.Keys
}

// Handle meta config if provided
Expand All @@ -265,17 +254,23 @@ func (h *ProviderHandler) UpdateProvider(ctx *fasthttp.RequestCtx) {
config.MetaConfig = metaConfig
}

if req.ConcurrencyAndBufferSize != nil {
if req.ConcurrencyAndBufferSize.Concurrency == 0 {
SendError(ctx, fasthttp.StatusBadRequest, "Concurrency must be greater than 0", h.logger)
return
}
if req.ConcurrencyAndBufferSize.BufferSize == 0 {
SendError(ctx, fasthttp.StatusBadRequest, "Buffer size must be greater than 0", h.logger)
return
}
config.ConcurrencyAndBufferSize = req.ConcurrencyAndBufferSize
if req.ConcurrencyAndBufferSize.Concurrency == 0 {
SendError(ctx, fasthttp.StatusBadRequest, "Concurrency must be greater than 0", h.logger)
return
}
if req.ConcurrencyAndBufferSize.BufferSize == 0 {
SendError(ctx, fasthttp.StatusBadRequest, "Buffer size must be greater than 0", h.logger)
return
}

if req.ConcurrencyAndBufferSize.Concurrency > req.ConcurrencyAndBufferSize.BufferSize {
SendError(ctx, fasthttp.StatusBadRequest, "Concurrency must be less than or equal to buffer size", h.logger)
return
}

config.ConcurrencyAndBufferSize = &req.ConcurrencyAndBufferSize
config.NetworkConfig = &req.NetworkConfig
config.ProxyConfig = req.ProxyConfig

// Update provider config in store (env vars will be processed by store)
if err := h.store.UpdateProviderConfig(provider, config); err != nil {
Expand All @@ -284,20 +279,16 @@ func (h *ProviderHandler) UpdateProvider(ctx *fasthttp.RequestCtx) {
return
}

// Update concurrency and queue configuration in Bifrost
if err := h.client.UpdateProviderConcurrency(provider); err != nil {
// Note: Store update succeeded, continue but log the concurrency update failure
h.logger.Warn(fmt.Sprintf("Failed to update concurrency for provider %s: %v", provider, err))
if config.ConcurrencyAndBufferSize.Concurrency != oldConfig.ConcurrencyAndBufferSize.Concurrency ||
config.ConcurrencyAndBufferSize.BufferSize != oldConfig.ConcurrencyAndBufferSize.BufferSize {
// Update concurrency and queue configuration in Bifrost
if err := h.client.UpdateProviderConcurrency(provider); err != nil {
// Note: Store update succeeded, continue but log the concurrency update failure
h.logger.Warn(fmt.Sprintf("Failed to update concurrency for provider %s: %v", provider, err))
}
}

h.logger.Info(fmt.Sprintf("Provider %s updated successfully", provider))

response := ProviderResponse{
Provider: provider,
Config: &config,
Status: "updated",
Message: fmt.Sprintf("Provider %s updated successfully", provider),
}
response := h.getProviderResponseFromConfig(provider, config)

SendJSON(ctx, response, h.logger)
}
Expand Down Expand Up @@ -326,9 +317,7 @@ func (h *ProviderHandler) DeleteProvider(ctx *fasthttp.RequestCtx) {
h.logger.Info(fmt.Sprintf("Provider %s removed successfully", provider))

response := ProviderResponse{
Provider: provider,
Status: "removed",
Message: fmt.Sprintf("Provider %s removed successfully", provider),
Name: provider,
}

SendJSON(ctx, response, h.logger)
Expand Down Expand Up @@ -396,6 +385,24 @@ func (h *ProviderHandler) convertToProviderMetaConfig(provider schemas.ModelProv
}
}

func (h *ProviderHandler) getProviderResponseFromConfig(provider schemas.ModelProvider, config lib.ProviderConfig) ProviderResponse {
if config.NetworkConfig == nil {
config.NetworkConfig = &schemas.DefaultNetworkConfig
}
if config.ConcurrencyAndBufferSize == nil {
config.ConcurrencyAndBufferSize = &schemas.DefaultConcurrencyAndBufferSize
}

return ProviderResponse{
Name: provider,
Keys: config.Keys,
NetworkConfig: *config.NetworkConfig,
MetaConfig: config.MetaConfig,
ConcurrencyAndBufferSize: *config.ConcurrencyAndBufferSize,
ProxyConfig: config.ProxyConfig,
}
}

func getProviderFromCtx(ctx *fasthttp.RequestCtx) (schemas.ModelProvider, error) {
providerValue := ctx.UserValue("provider")
if providerValue == nil {
Expand Down
Loading