
Conversation

@fen-qin (Collaborator) commented Oct 29, 2025

Description

Goal:

To support more LLMs in Search Relevance Workbench:

  • Anthropic/Claude
  • DeepSeek
  • Cohere
  • OpenAI

Changes:

  • Introduce two new fields to LLM Judgment:

    • modelType: an enum identifying the LLM type
    • rateLimit: the rate limit at which the LLM host receives requests
  • Add a prompt formatter to handle differences across connectors (see the sketch after this list):

    • Request Body Format
      OpenAI/DeepSeek: {"messages": [{"role": "system", "content": "..."}, {"role": "user", "content": "..."}]}
      Claude: {"messages": [{"role": "user", "content": [{"type": "text", "text": "..."}]}]}
      Cohere: {"message": "combined prompt and user content"}
    • Response Format
      OpenAI/DeepSeek: choices[0].message.content
      Claude: content[0].text
      Cohere: text
  • Add dynamic rate limiting to handle differences across connectors, for example:

    • Claude 3.5 Haiku: 20 requests per minute, 40,000 tokens per minute
    • Cohere Command R: 8 requests per minute, 6,000 tokens per minute
  • Refactor advanced settings into metadata
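For illustration, a minimal sketch of the formatter idea above (the class and enum names are illustrative rather than the exact plugin API, and JSON is built naively here without escaping):

public class PromptFormatter {

    public enum LLMModelType { OPENAI, DEEPSEEK, CLAUDE, COHERE }

    /** Builds the connector-specific request body for a system prompt plus user content. */
    public static String formatRequestBody(LLMModelType modelType, String systemPrompt, String userContent) {
        switch (modelType) {
            case OPENAI:
            case DEEPSEEK:
                return String.format(
                    "{\"messages\": [{\"role\": \"system\", \"content\": \"%s\"}, {\"role\": \"user\", \"content\": \"%s\"}]}",
                    systemPrompt, userContent);
            case CLAUDE:
                return String.format(
                    "{\"messages\": [{\"role\": \"user\", \"content\": [{\"type\": \"text\", \"text\": \"%s\\n%s\"}]}]}",
                    systemPrompt, userContent);
            case COHERE:
                return String.format("{\"message\": \"%s\\n%s\"}", systemPrompt, userContent);
            default:
                throw new IllegalArgumentException("Unsupported model type: " + modelType);
        }
    }
}

The response side mirrors this: a model-type-specific extractor reads choices[0].message.content, content[0].text, or text, per the Response Format list above.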

Tests

  • Judgment generated with rating scores for 150 queries, with size 5:
PUT /_plugins/_search_relevance/judgments
{
  "name": "claude_10s_20241029",
  "type": "LLM_JUDGMENT",
  "querySetId": "cb34a720-71cb-4f78-b0a9-b3e3f87588d2",
  "searchConfigurationList": ["4897419d-24d2-41b1-91f8-3cb567f734f6"],
  "size": 5,
  "modelId": "TcWBMZoB86ITwl1Cs2nr",
  "tokenLimit": "1000",
  "rateLimit": 10000, // 10 seconds
  "modelType": "CLAUDE",
  "ignoreFailure": false
}
  • Configuration validated:

    • Per-chunk rate limiting: 3-second default (rateLimit: 0)
    • Token limit: 2000 (matching connector capacity)
    • Model: Claude 3.5 Haiku via Bedrock
    • Scale: 150 queries × ~5 docs each = ~750 total LLM calls
  • Final results:

    • ✅ Status: COMPLETED
    • ✅ Success rate: 98.7% (148/150 queries successful)
    • ✅ Total ratings generated: 736 individual document ratings
    • ✅ Processing time: ~43 minutes (approximately 21:46–22:29)
    • ✅ No rate limiting failures: zero 429 errors

Issues Resolved

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@fen-qin fen-qin marked this pull request as ready for review October 29, 2025 23:10
@fen-qin fen-qin changed the title support multiple LLM connectors with proper rate limit [FEATURE] Support multiple LLM connectors with proper rate limit Oct 29, 2025
@martin-gaievski (Member) left a comment:
This is a good addition to the system, thanks for this contribution.

A few high-level comments on top of what I posted in the code:

  • Can we create a single class for LLM params, e.g. LLMConfiguration, where we keep things like rateLimit, maxRetries, retryBackoff, maxTokens, etc.? Currently those params are spread across the code in different places. A separate class would help keep a logical structure and decouple logic from parameters (a sketch follows this list).
  • We need to collect metrics on retries; I think a good way to do it is the stats API that is part of the SRW repo. Check the PR where it was introduced, #63. We can collect the number of retry attempts and the delay.
  • In the future we need to think about a separate pool for retries. With the current implementation the system uses CompletableFuture.delayedExecutor, which is basically the common ForkJoinPool. Under high load with many retries this could exhaust the thread pool. This may be too much for this PR, more like a follow-up change.
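A sketch of the suggested holder, based only on the parameters named above (the class and field names follow the comment, not existing code):

public class LLMConfiguration {
    private final long rateLimitMillis;    // minimum delay between requests to the connector
    private final int maxRetries;          // retry attempts for retryable errors
    private final long retryBackoffMillis; // base backoff between retries
    private final int maxTokens;           // token limit passed to the model

    public LLMConfiguration(long rateLimitMillis, int maxRetries, long retryBackoffMillis, int maxTokens) {
        this.rateLimitMillis = rateLimitMillis;
        this.maxRetries = maxRetries;
        this.retryBackoffMillis = retryBackoffMillis;
        this.maxTokens = maxTokens;
    }

    public long getRateLimitMillis() { return rateLimitMillis; }
    public int getMaxRetries() { return maxRetries; }
    public long getRetryBackoffMillis() { return retryBackoffMillis; }
    public int getMaxTokens() { return maxTokens; }
}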

ignoreFailure
);
List<String> unprocessedDocIds = docIds;
// List<String> unprocessedDocIds = deduplicateFromCache(
@martin-gaievski (Member):
this is not needed in the final version

@fen-qin (Collaborator, Author):
yes. sure. this should be removed


private boolean isRetryableError(Exception e) {
    String message = e.getMessage();
    if (message == null) return true;
@martin-gaievski (Member):
Why is it retryable if there's no message? Also please fix the style, use curly braces.
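A possible restyle of the snippet above, per the comment (a sketch only; treating a null message as retryable simply mirrors the original line and is exactly what the question challenges):

private boolean isRetryableError(Exception e) {
    String message = e.getMessage();
    if (message == null) {
        return true; // questionable default, see the comment above
    }
    // ... inspect the message for throttling / timeout markers ...
    return false;
}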

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class BedrockRateLimiter {
@martin-gaievski (Member):
Not everyone will use Bedrock as a platform. We need a RateLimiter interface, and BedrockRateLimiter would be one possible implementation of it.
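A sketch of what that could look like (names are illustrative, not code from the PR):

public interface RateLimiter {
    /** Delays the caller as needed so the next request stays within the connector's quota. */
    void applyRateLimit(String modelType, long customRateLimitMillis);
}

// One implementation; others, e.g. an HTTP-endpoint limiter, can be added alongside it.
class BedrockRateLimiter implements RateLimiter {
    @Override
    public void applyRateLimit(String modelType, long customRateLimitMillis) {
        // Bedrock-specific pacing (per-model request and token quotas) goes here.
    }
}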

@fen-qin (Collaborator, Author):
Following the high-level suggestions and this thread:

Yes, we should introduce a more general class, MLConfiguration, that can hold all the common pieces:

  • rateLimit, maxRetries, retryBackoff, maxTokens, etc.
  • prompt formatter
  • response processor

to make it clearer for anyone who wants to add a new model with a customized input/output interface and settings.

ActionListener<String> listener
) {
// Apply rate limiting per chunk to handle multiple chunks per query
BedrockRateLimiter.applyRateLimit(connectorType, customRateLimit);
@martin-gaievski (Member):
We need to create the concrete implementation of the RateLimiter interface via some sort of factory, or a factory method; it probably makes sense to hold it as a class variable. Then call only the interface methods.
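One way to wire that in (illustrative names, reusing the RateLimiter interface sketched earlier; not code from the PR):

class LlmJudgmentProcessor {
    // Chosen once via a factory method and held as a class variable; callers only see the interface.
    private final RateLimiter rateLimiter;

    LlmJudgmentProcessor(String connectorType) {
        this.rateLimiter = createRateLimiter(connectorType);
    }

    private static RateLimiter createRateLimiter(String connectorType) {
        if ("BEDROCK".equals(connectorType)) {
            return new BedrockRateLimiter();
        }
        return (modelType, rateLimitMillis) -> { /* no-op for connectors without quotas */ };
    }

    void processChunk(String modelType, long customRateLimit) {
        rateLimiter.applyRateLimit(modelType, customRateLimit);
        // ... invoke the LLM connector for this chunk ...
    }
}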

this.rateLimit = rateLimitObj != null ? rateLimitObj : 0L;
}

public String getQuerySetId() {
@martin-gaievski (Member):
For a bunch of getters you can use the Lombok @Getter annotation.
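For example (class and field names are illustrative):

import lombok.Getter;

@Getter  // generates getQuerySetId(), getRateLimit(), etc. at compile time
public class LlmJudgmentRequest {
    private String querySetId;
    private long rateLimit;
}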

}
}

lastRequestTimes.put(key, System.currentTimeMillis());
@martin-gaievski (Member):
The map can grow indefinitely; I can't see where we delete from it.
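One possible fix, sketched here rather than taken from the PR: bound the map with an access-ordered LinkedHashMap that evicts its least recently used entry. A concurrent cache such as Caffeine would be another option, and thread safety still needs attention either way.

import java.util.LinkedHashMap;
import java.util.Map;

class RateLimitTracker {
    private static final int MAX_TRACKED_KEYS = 128;

    // Access-ordered map that drops the least recently used key once the cap is hit.
    private final Map<String, Long> lastRequestTimes = new LinkedHashMap<>(16, 0.75f, true) {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
            return size() > MAX_TRACKED_KEYS;
        }
    };

    void recordRequest(String key) {
        lastRequestTimes.put(key, System.currentTimeMillis());
    }
}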

long sleepTime = delayMs - timeSinceLastRequest;
try {
    log.debug("Rate limiting {}: sleeping for {} ms", modelType, sleepTime);
    Thread.sleep(sleepTime);
@martin-gaievski (Member):
It's inefficient to block threads with sleep. You have several options, for example:

  1. Use a ScheduledExecutorService; each time you need to delay, do
     scheduler.schedule(() -> { future.complete(null); }, waitTime, TimeUnit.MILLISECONDS)

or an even simpler approach, CompletableFuture with a delay:
CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS).execute(() -> {});
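A sketch of the second option, wired so that no thread blocks while waiting (names are illustrative; as noted in the earlier comment, delayedExecutor runs on the common ForkJoinPool by default):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class NonBlockingDelay {
    /** Completes the returned future after delayMs without blocking any caller thread. */
    static CompletableFuture<Void> delay(long delayMs) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS)
            .execute(() -> future.complete(null));
        return future;
    }
}

// Usage: NonBlockingDelay.delay(sleepTime).thenRun(() -> { /* call the LLM connector */ });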

@fen-qin (Collaborator, Author) commented Oct 30, 2025

Thanks @martin-gaievski for the review.
Adding @chloewqg @heemin32 to the thread.

This PR refactors the existing LLM processor to make it support models in a more general way:

  • AWS Bedrock models: Claude, Cohere, DeepSeek, OpenAI
  • HTTP endpoint models: OpenAI, DeepSeek

and the interface should be extensible enough to onboard any other model.

@chloewqg (Contributor) commented:

Thanks for cc'ing me. It does look like there will be a lot of code conflicts with PR #264. We should think about how to merge it, and about the effort needed to merge and resolve conflicts without breaking functionality.

@chloewqg (Contributor) commented:


From a comparison of the overlapping code between the two PRs, it is not trivial work to merge them and ensure functionality does not break.
