Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/bifrost.go
Original file line number Diff line number Diff line change
Expand Up @@ -6156,7 +6156,7 @@ func (bifrost *Bifrost) selectKeyFromProviderForModel(ctx *schemas.BifrostContex
}
// Check if no keys found
if len(keys) == 0 {
return schemas.Key{}, fmt.Errorf("no keys found for provider: %v and model: %s", providerKey, model)
return schemas.Key{}, fmt.Errorf("no keys found for provider: %v", providerKey)
}

// For batch API operations, filter keys to only include those with UseForBatchAPI enabled
Expand Down
241 changes: 236 additions & 5 deletions core/providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"mime/multipart"
"net/http"
"net/url"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -3102,11 +3103,241 @@ func (provider *AzureProvider) ContainerFileDelete(_ *schemas.BifrostContext, _
return nil, providerUtils.NewUnsupportedOperationError(schemas.ContainerFileDeleteRequest, provider.GetProviderKey())
}

// Passthrough is not supported by the Azure provider.
func (provider *AzureProvider) Passthrough(_ *schemas.BifrostContext, _ schemas.Key, _ *schemas.BifrostPassthroughRequest) (*schemas.BifrostPassthroughResponse, *schemas.BifrostError) {
return nil, providerUtils.NewUnsupportedOperationError(schemas.PassthroughRequest, provider.GetProviderKey())
// Passthrough forwards a request directly to Azure's API without transformation.
// The caller's method, path, query string, body, and safe headers are relayed
// verbatim; Azure auth headers are applied after the client-supplied headers so
// credentials cannot be overridden by the caller. The response body is decoded
// (decompressed) before being returned, so Content-Encoding and Content-Length
// headers are stripped to keep the header set consistent with the decoded body.
func (provider *AzureProvider) Passthrough(
	ctx *schemas.BifrostContext,
	key schemas.Key,
	req *schemas.BifrostPassthroughRequest,
) (*schemas.BifrostPassthroughResponse, *schemas.BifrostError) {
	if key.AzureKeyConfig == nil {
		return nil, providerUtils.NewConfigurationError("azure key config not set", provider.GetProviderKey())
	}

	endpoint := key.AzureKeyConfig.Endpoint.GetValue()
	if endpoint == "" {
		return nil, providerUtils.NewConfigurationError("endpoint not set", provider.GetProviderKey())
	}

	// Named requestURL (not "url") to avoid shadowing the imported net/url package.
	// NOTE(review): assumes endpoint has no trailing slash and req.Path carries a
	// leading one — confirm against the upstream router's path normalization.
	requestURL := endpoint + req.Path
	if req.RawQuery != "" {
		requestURL += "?" + req.RawQuery
	}

	fasthttpReq := fasthttp.AcquireRequest()
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(resp)
	defer fasthttp.ReleaseRequest(fasthttpReq)

	fasthttpReq.Header.SetMethod(req.Method)
	fasthttpReq.SetRequestURI(requestURL)

	providerUtils.SetExtraHeaders(ctx, fasthttpReq, provider.networkConfig.ExtraHeaders, nil)

	// Client headers first, then auth headers, so auth always wins on conflict.
	for k, v := range req.SafeHeaders {
		fasthttpReq.Header.Set(k, v)
	}

	authHeaders, bifrostErr := provider.getAzureAuthHeaders(ctx, key, schemas.IsAnthropicModel(req.Model))
	if bifrostErr != nil {
		return nil, bifrostErr
	}
	for k, v := range authHeaders {
		fasthttpReq.Header.Set(k, v)
	}

	fasthttpReq.SetBody(req.Body)

	latency, bifrostErr := providerUtils.MakeRequestWithContext(ctx, provider.client, fasthttpReq, resp)
	if bifrostErr != nil {
		return nil, bifrostErr
	}

	headers := providerUtils.ExtractProviderResponseHeaders(resp)

	body, err := providerUtils.CheckAndDecodeBody(resp)
	if err != nil {
		return nil, providerUtils.NewBifrostOperationError("failed to decode response body", err, provider.GetProviderKey())
	}

	// The body was decoded above, so these headers no longer describe it.
	// (Deleting the current key while ranging a map is safe in Go.)
	for k := range headers {
		if strings.EqualFold(k, "Content-Encoding") || strings.EqualFold(k, "Content-Length") {
			delete(headers, k)
		}
	}

	bifrostResponse := &schemas.BifrostPassthroughResponse{
		StatusCode: resp.StatusCode(),
		Headers:    headers,
		Body:       body,
	}

	bifrostResponse.ExtraFields.Provider = provider.GetProviderKey()
	bifrostResponse.ExtraFields.ModelRequested = req.Model
	bifrostResponse.ExtraFields.RequestType = schemas.PassthroughRequest
	bifrostResponse.ExtraFields.Latency = latency.Milliseconds()

	if providerUtils.ShouldSendBackRawRequest(ctx, provider.sendBackRawRequest) {
		providerUtils.ParseAndSetRawRequestIfJSON(fasthttpReq, &bifrostResponse.ExtraFields)
	}

	return bifrostResponse, nil
}

func (provider *AzureProvider) PassthroughStream(_ *schemas.BifrostContext, _ schemas.PostHookRunner, _ schemas.Key, _ *schemas.BifrostPassthroughRequest) (chan *schemas.BifrostStreamChunk, *schemas.BifrostError) {
return nil, providerUtils.NewUnsupportedOperationError(schemas.PassthroughStreamRequest, provider.GetProviderKey())
// PassthroughStream forwards a streaming request directly to Azure's API
// without transformation. The upstream response body is read in fixed-size
// chunks and relayed on the returned channel, with the upstream status code
// and headers attached to every chunk. On EOF a terminating BifrostResponse
// (status/headers, no body) is pushed through the post-hook runner; read
// errors and context cancellation are reported via the shared streaming
// helpers. The streaming fasthttp response is released by the reader
// goroutine (or on early error), never by the caller.
func (provider *AzureProvider) PassthroughStream(
	ctx *schemas.BifrostContext,
	postHookRunner schemas.PostHookRunner,
	key schemas.Key,
	req *schemas.BifrostPassthroughRequest,
) (chan *schemas.BifrostStreamChunk, *schemas.BifrostError) {
	if key.AzureKeyConfig == nil {
		return nil, providerUtils.NewConfigurationError("azure key config not set", provider.GetProviderKey())
	}

	endpoint := key.AzureKeyConfig.Endpoint.GetValue()
	if endpoint == "" {
		return nil, providerUtils.NewConfigurationError("endpoint not set", provider.GetProviderKey())
	}

	// Named requestURL (not "url") to avoid shadowing the imported net/url package.
	requestURL := endpoint + req.Path
	if req.RawQuery != "" {
		requestURL += "?" + req.RawQuery
	}

	fasthttpReq := fasthttp.AcquireRequest()
	resp := fasthttp.AcquireResponse()
	// Stream the body instead of buffering it fully in memory.
	resp.StreamBody = true
	defer fasthttp.ReleaseRequest(fasthttpReq)

	fasthttpReq.Header.SetMethod(req.Method)
	fasthttpReq.SetRequestURI(requestURL)

	providerUtils.SetExtraHeaders(ctx, fasthttpReq, provider.networkConfig.ExtraHeaders, nil)

	// Client headers first, then auth headers, so auth always wins on conflict.
	for k, v := range req.SafeHeaders {
		fasthttpReq.Header.Set(k, v)
	}

	fasthttpReq.Header.Set("Connection", "close")

	authHeaders, bifrostErr := provider.getAzureAuthHeaders(ctx, key, schemas.IsAnthropicModel(req.Model))
	if bifrostErr != nil {
		return nil, bifrostErr
	}
	for k, v := range authHeaders {
		fasthttpReq.Header.Set(k, v)
	}

	fasthttpReq.SetBody(req.Body)

	activeClient := providerUtils.PrepareResponseStreaming(ctx, provider.client, resp)

	startTime := time.Now()

	if err := activeClient.Do(fasthttpReq, resp); err != nil {
		providerUtils.ReleaseStreamingResponse(resp)
		if errors.Is(err, context.Canceled) {
			return nil, &schemas.BifrostError{
				IsBifrostError: false,
				Error: &schemas.ErrorField{
					Type:    schemas.Ptr(schemas.RequestCancelled),
					Message: schemas.ErrRequestCancelled,
					Error:   err,
				},
			}
		}
		if errors.Is(err, fasthttp.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
			return nil, providerUtils.NewBifrostOperationError(schemas.ErrProviderRequestTimedOut, err, provider.GetProviderKey())
		}
		return nil, providerUtils.NewBifrostOperationError(schemas.ErrProviderDoRequest, err, provider.GetProviderKey())
	}

	// Snapshot upstream headers once; the same map is shared by every chunk.
	headers := make(map[string]string)
	resp.Header.All()(func(k, v []byte) bool {
		headers[string(k)] = string(v)
		return true
	})

	bodyStream := resp.BodyStream()
	if bodyStream == nil {
		providerUtils.ReleaseStreamingResponse(resp)
		// errors.New, not fmt.Errorf: the message has no format verbs (staticcheck S1039).
		return nil, providerUtils.NewBifrostOperationError(
			"provider returned an empty stream body",
			errors.New("provider returned an empty stream body"),
			provider.GetProviderKey(),
		)
	}

	stopCancellation := providerUtils.SetupStreamCancellation(ctx, bodyStream, provider.logger)

	extraFields := schemas.BifrostResponseExtraFields{
		Provider:       provider.GetProviderKey(),
		ModelRequested: req.Model,
		RequestType:    schemas.PassthroughStreamRequest,
	}
	if providerUtils.ShouldSendBackRawRequest(ctx, provider.sendBackRawRequest) {
		providerUtils.ParseAndSetRawRequestIfJSON(fasthttpReq, &extraFields)
	}
	statusCode := resp.StatusCode()

	ch := make(chan *schemas.BifrostStreamChunk, schemas.DefaultStreamBufferSize)
	go func() {
		// Deferred LIFO order: stopCancellation, release resp, then report
		// cancellation/timeout (if any) and close the channel.
		defer func() {
			if ctx.Err() == context.Canceled {
				providerUtils.HandleStreamCancellation(ctx, postHookRunner, ch, provider.GetProviderKey(), req.Model, schemas.PassthroughStreamRequest, provider.logger)
			} else if ctx.Err() == context.DeadlineExceeded {
				providerUtils.HandleStreamTimeout(ctx, postHookRunner, ch, provider.GetProviderKey(), req.Model, schemas.PassthroughStreamRequest, provider.logger)
			}
			close(ch)
		}()
		defer providerUtils.ReleaseStreamingResponse(resp)
		defer stopCancellation()

		buf := make([]byte, 4096)
		for {
			n, readErr := bodyStream.Read(buf)
			if n > 0 {
				// Copy out of the shared read buffer before handing the chunk off.
				chunk := make([]byte, n)
				copy(chunk, buf[:n])
				select {
				case ch <- &schemas.BifrostStreamChunk{
					BifrostPassthroughResponse: &schemas.BifrostPassthroughResponse{
						StatusCode:  statusCode,
						Headers:     headers,
						Body:        chunk,
						ExtraFields: extraFields,
					},
				}:
				case <-ctx.Done():
					return
				}
			}
			if readErr == io.EOF {
				// Normal end of stream: emit the terminating response through the post hooks.
				ctx.SetValue(schemas.BifrostContextKeyStreamEndIndicator, true)
				extraFields.Latency = time.Since(startTime).Milliseconds()
				finalResp := &schemas.BifrostResponse{
					PassthroughResponse: &schemas.BifrostPassthroughResponse{
						StatusCode:  statusCode,
						Headers:     headers,
						ExtraFields: extraFields,
					},
				}
				postHookRunner(ctx, finalResp, nil)
				if finalizer, ok := ctx.Value(schemas.BifrostContextKeyPostHookSpanFinalizer).(func(context.Context)); ok && finalizer != nil {
					finalizer(ctx)
				}
				return
			}
			if readErr != nil {
				// Context errors are reported by the deferred handlers above.
				if ctx.Err() != nil {
					return
				}
				ctx.SetValue(schemas.BifrostContextKeyStreamEndIndicator, true)
				extraFields.Latency = time.Since(startTime).Milliseconds()
				providerUtils.ProcessAndSendError(ctx, postHookRunner, readErr, ch, schemas.PassthroughStreamRequest, provider.GetProviderKey(), req.Model, provider.logger)
				return
			}
		}
	}()
	return ch, nil
}
6 changes: 0 additions & 6 deletions framework/configstore/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2710,9 +2710,6 @@ func (s *RDBConfigStore) CreateRoutingRule(ctx context.Context, rule *tables.Tab
return s.parseGormError(err)
}
if count > 0 {
if rule.ScopeID != nil {
return fmt.Errorf("routing rule with priority %d already exists for scope '%s' with scopeID '%v'", rule.Priority, rule.Scope, rule.ScopeID)
}
return fmt.Errorf("routing rule with priority %d already exists for scope '%s'", rule.Priority, rule.Scope)
}

Expand Down Expand Up @@ -2759,9 +2756,6 @@ func (s *RDBConfigStore) UpdateRoutingRule(ctx context.Context, rule *tables.Tab
return s.parseGormError(err)
}
if count > 0 {
if rule.ScopeID != nil {
return fmt.Errorf("routing rule with priority %d already exists for scope '%s' with scopeID '%v'", rule.Priority, rule.Scope, rule.ScopeID)
}
return fmt.Errorf("routing rule with priority %d already exists for scope '%s'", rule.Priority, rule.Scope)
}

Expand Down
1 change: 1 addition & 0 deletions transports/bifrost-http/handlers/integrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func NewIntegrationHandler(client *bifrost.Bifrost, handlerStore lib.HandlerStor
integrations.NewGenAIPassthroughRouter(client, handlerStore, logger),
integrations.NewOpenAIPassthroughRouter(client, handlerStore, logger),
integrations.NewAnthropicPassthroughRouter(client, handlerStore, logger),
integrations.NewAzurePassthroughRouter(client, handlerStore, logger),
integrations.NewCursorRouter(client, handlerStore, logger),
}

Expand Down
10 changes: 10 additions & 0 deletions transports/bifrost-http/integrations/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ func NewOpenAIPassthroughRouter(client *bifrost.Bifrost, handlerStore lib.Handle
})
}

// NewAzurePassthroughRouter builds the passthrough router that serves the
// /azure_passthrough prefix, forwarding requests to the Azure provider.
func NewAzurePassthroughRouter(client *bifrost.Bifrost, handlerStore lib.HandlerStore, logger schemas.Logger) *PassthroughRouter {
	cfg := &PassthroughConfig{
		Provider:    schemas.Azure,
		StripPrefix: []string{"/azure_passthrough"},
	}
	return NewPassthroughRouter(client, handlerStore, logger, cfg)
}

// NewGenAIPassthroughRouter creates a passthrough router for /genai_passthrough.
func NewGenAIPassthroughRouter(client *bifrost.Bifrost, handlerStore lib.HandlerStore, logger schemas.Logger) *PassthroughRouter {
return NewPassthroughRouter(client, handlerStore, logger, &PassthroughConfig{
Expand Down