Merged

38 commits
09bb5d0
Add support for gRPC/protobuf transport operations.
finnegancarroll Sep 10, 2025
353ac7f
Refactor request timings.
finnegancarroll Sep 11, 2025
80c0233
Move timing back into runners.
finnegancarroll Sep 11, 2025
8d2c92a
Bump opensearch-protobufs -> 0.13.0.
finnegancarroll Sep 19, 2025
4a61073
Remove pylint comment.
finnegancarroll Sep 23, 2025
386ca9d
Actually call cleanup channel for unified client.
finnegancarroll Sep 23, 2025
fdddc75
Fix response parsing in proto runners - ResponseBody is no longer nes…
finnegancarroll Sep 23, 2025
a865e1b
Remove change to non-async client.
finnegancarroll Oct 6, 2025
d50d00e
Bump proto definitions for 0.23.0.
finnegancarroll Oct 6, 2025
d966a14
Simplify create_grpc_stubs slightly.
finnegancarroll Oct 6, 2025
be2ee2e
Clean up grpc channel options. Move to GrpcClientFactory constructor.
finnegancarroll Oct 6, 2025
6a166b0
Throw exceptions inside unified client when misconfigured.
finnegancarroll Oct 6, 2025
923849a
Comment for stubs channel reference.
finnegancarroll Oct 6, 2025
4d2f3e3
Proto bulk conversion helper unit tests.
finnegancarroll Oct 6, 2025
77fb1ef
Response parsing in KNN helper is redundant.
finnegancarroll Oct 9, 2025
2664f33
Revert bulk helper + tests to 0.19.0.
finnegancarroll Oct 9, 2025
ca9675a
Query helper unit tests.
finnegancarroll Oct 9, 2025
fe569e3
STASH
finnegancarroll Oct 9, 2025
0a1d6d7
Move knn into query tests.
finnegancarroll Oct 10, 2025
3f24d53
Clean up document parsing.
finnegancarroll Oct 10, 2025
64ab70f
Comments, variable names, and snake case.
finnegancarroll Oct 10, 2025
07a934b
Revert context.py.
finnegancarroll Oct 10, 2025
dfe93c4
extend().
finnegancarroll Oct 10, 2025
fc32af7
Throw exceptions if multiple clusters or hosts provided.
finnegancarroll Oct 13, 2025
ec3e243
Replace if chain with match.
finnegancarroll Oct 13, 2025
933d72c
Unnecessary cast.
finnegancarroll Oct 13, 2025
6381fc3
opensearch-protobufs 0.19.0.
finnegancarroll Oct 14, 2025
905e45b
Linter errors.
finnegancarroll Oct 15, 2025
61007e3
Fix bug in proto knn query runner - Needs VectorSearchParamSource.
finnegancarroll Oct 15, 2025
e64213d
Fix knn query structure per VectorSearchParamSource.
finnegancarroll Oct 16, 2025
0a37275
Use query body k - Slightly simpler.
finnegancarroll Oct 16, 2025
e45ce83
Unit tests for parse knn query from params.
finnegancarroll Oct 16, 2025
184cdc9
Fix bug in knn query where source config evaluates to true.
finnegancarroll Oct 16, 2025
2c39d16
Fix linter errors.
finnegancarroll Oct 16, 2025
b3e2a83
Fix docvalue_fields not set knn query.
finnegancarroll Oct 17, 2025
5e589f6
Stored fields must be explicitly _none_.
finnegancarroll Oct 17, 2025
e9e8e86
Update stored field default in unit tests.
finnegancarroll Oct 17, 2025
dda8c03
Linter errors.
finnegancarroll Oct 17, 2025
9 changes: 9 additions & 0 deletions osbenchmark/benchmark.py
@@ -587,6 +587,11 @@ def add_workload_source(subparser):
"--worker-ips",
help="Define a comma-separated list of hosts which should generate load (default: localhost).",
default="localhost")
test_run_parser.add_argument(
"--grpc-target-hosts",
help="Define a comma-separated list of host:port pairs for gRPC endpoints "
"(default: localhost:9400).",
default="")
test_run_parser.add_argument(
"--client-options",
"-c",
@@ -1070,6 +1075,10 @@ def configure_connection_params(arg_parser, args, cfg):
cfg.add(config.Scope.applicationOverride, "client", "hosts", target_hosts)
client_options = opts.ClientOptions(args.client_options, target_hosts=target_hosts)
cfg.add(config.Scope.applicationOverride, "client", "options", client_options)

# Configure gRPC target hosts
grpc_target_hosts = opts.TargetHosts(args.grpc_target_hosts) if args.grpc_target_hosts else None
cfg.add(config.Scope.applicationOverride, "client", "grpc_hosts", grpc_target_hosts)
if "timeout" not in client_options.default:
console.info("You did not provide an explicit timeout in the client options. Assuming default of 10 seconds.")
if list(target_hosts.all_hosts) != list(client_options.all_client_options):
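A minimal sketch of how the new flag reaches the client layer, assuming `opts.TargetHosts` parses a comma-separated host:port list into the `{"default": [{"host": ..., "port": ...}]}` shape that `create_grpc_stubs()` in client.py expects (flag value and host are illustrative):

from osbenchmark import opts

# e.g. passed on the command line as --grpc-target-hosts=localhost:9400
grpc_target_hosts = opts.TargetHosts("localhost:9400")
host = grpc_target_hosts.all_hosts["default"][0]
grpc_addr = f"{host['host']}:{host['port']}"  # "localhost:9400"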
133 changes: 128 additions & 5 deletions osbenchmark/client.py
@@ -30,13 +30,16 @@
import urllib3
from urllib3.util.ssl_ import is_ipaddress

import grpc
from opensearch.protobufs.services.document_service_pb2_grpc import DocumentServiceStub
from opensearch.protobufs.services.search_service_pb2_grpc import SearchServiceStub

from osbenchmark.kafka_client import KafkaMessageProducer
from osbenchmark import exceptions, doc_link, async_connection
from osbenchmark.context import RequestContextHolder
from osbenchmark.utils import console, convert
from osbenchmark.cloud_provider import CloudProviderFactory


class OsClientFactory:
"""
Abstracts how the OpenSearch client is created. Intended for testing.
@@ -175,9 +178,11 @@ def create_async(self):
# pylint: disable=import-outside-toplevel
import io
import aiohttp

from opensearchpy.serializer import JSONSerializer

class BenchmarkAsyncOpenSearch(opensearchpy.AsyncOpenSearch, RequestContextHolder):
pass

class LazyJSONSerializer(JSONSerializer):
def loads(self, s):
meta = BenchmarkAsyncOpenSearch.request_context.get()
@@ -202,9 +207,6 @@ async def on_request_end(session, trace_config_ctx, params):
self.client_options["serializer"] = LazyJSONSerializer()
self.client_options["trace_config"] = trace_config

class BenchmarkAsyncOpenSearch(opensearchpy.AsyncOpenSearch, RequestContextHolder):
pass

if self.provider:
self.logger.info("Creating OpenSearch Async Client with provider %s", self.provider)
return self.provider.create_client(self.hosts, self.client_options,
@@ -278,3 +280,124 @@ async def create(params):
return await KafkaMessageProducer.create(params)
else:
raise ValueError(f"Unsupported ingestion source type: {producer_type}")


class GrpcClientFactory:
"""
Factory for creating gRPC client stubs.
Note: gRPC channels created here set `use_local_subchannel_pool` to true. Subchannels manage the underlying connection with the server. When the global subchannel pool is used, gRPC reuses subchannels and their underlying connections, which does not appropriately reflect a multi-client scenario.
"""

Review comment (Collaborator), on lines +289 to +290:
Probably could be clearer: using local subchannels permits additional connections to be created, each with its own pool, which can improve performance.
def __init__(self, grpc_hosts):
self.grpc_hosts = grpc_hosts
self.logger = logging.getLogger(__name__)
self.grpc_channel_options = [
('grpc.use_local_subchannel_pool', 1),
('grpc.max_send_message_length', 10 * 1024 * 1024), # 10 MB
('grpc.max_receive_message_length', 10 * 1024 * 1024) # 10 MB
]

def create_grpc_stubs(self):
"""
Create gRPC service stubs.
Returns a dict of {cluster_name: {service_name: stub}} structure.
"""
stubs = {}

if len(self.grpc_hosts.all_hosts) > 1:
raise NotImplementedError("Only one gRPC cluster is supported.")

if len(self.grpc_hosts.all_hosts["default"]) > 1:
raise NotImplementedError("Only one gRPC host is supported.")

host = self.grpc_hosts.all_hosts["default"][0]
grpc_addr = f"{host['host']}:{host['port']}"

self.logger.info("Creating gRPC channel for cluster default cluster at %s", grpc_addr)
channel = grpc.aio.insecure_channel(
target=grpc_addr,
options=self.grpc_channel_options,
compression=None
)

# Retain a reference to the underlying channel in our stubs dictionary for graceful shutdown.
stubs["default"] = {
'document_service': DocumentServiceStub(channel),
'search_service': SearchServiceStub(channel),
'_channel': channel
}

return stubs
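A minimal usage sketch for the factory above, assuming `grpc_hosts` is shaped as described; the stub and channel keys come from this diff:

factory = GrpcClientFactory(grpc_hosts)
stubs = factory.create_grpc_stubs()
# Both stubs share a single channel per cluster; '_channel' is retained for shutdown.
document_stub = stubs["default"]["document_service"]
search_stub = stubs["default"]["search_service"]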


class UnifiedClient:
"""
Unified client that wraps both OpenSearch REST client and gRPC stubs.
This provides a single interface for runners to access both protocols.
Acts as a transparent proxy to the OpenSearch client while adding gRPC capabilities.
"""
def __init__(self, opensearch_client, grpc_stubs=None):
self._opensearch = opensearch_client
self._grpc_stubs = grpc_stubs
self._logger = logging.getLogger(__name__)

def __getattr__(self, name):
"""Delegate all unknown attributes to the underlying OpenSearch client."""
return getattr(self._opensearch, name)

def document_service(self, cluster_name="default"):
"""Get the gRPC DocumentService stub for the specified cluster."""
if self._grpc_stubs and cluster_name in self._grpc_stubs:
return self._grpc_stubs[cluster_name].get('document_service')
else:
raise exceptions.SystemSetupError(
"gRPC DocumentService not available. Please configure --grpc-target-hosts.")

def search_service(self, cluster_name="default"):
"""Get the gRPC SearchService stub for the specified cluster."""
if self._grpc_stubs and cluster_name in self._grpc_stubs:
return self._grpc_stubs[cluster_name].get('search_service')
else:
raise exceptions.SystemSetupError(
"gRPC SearchService not available. Please configure --grpc-target-hosts.")

def __del__(self):
    """Close all gRPC channels and the underlying REST client."""
    # Guard: _grpc_stubs defaults to None when no gRPC hosts are configured.
    if self._grpc_stubs:
        for cluster_stubs in self._grpc_stubs.values():
            if '_channel' in cluster_stubs:
                try:
                    cluster_stubs['_channel'].close()
                except Exception as e:
                    self._logger.warning("Error closing gRPC channel: %s", e)
    self._opensearch.close()

@property
def opensearch(self):
"""Provide access to the underlying OpenSearch client for explicit access."""
return self._opensearch
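A sketch of the proxy behavior, assuming an async REST client underneath (`info()` is a standard opensearchpy call; everything else comes from this class):

async def example(client: UnifiedClient):
    info = await client.info()      # unknown attribute, delegated to the REST client via __getattr__
    stub = client.search_service()  # explicit gRPC accessor; raises SystemSetupError if unconfigured
    return info, stub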


class UnifiedClientFactory:
"""
Factory that creates UnifiedClient instances with both REST and gRPC support.
"""
def __init__(self, rest_client_factory, grpc_hosts=None):
self.rest_client_factory = rest_client_factory
self.grpc_hosts = grpc_hosts
self.logger = logging.getLogger(__name__)

def create(self):
"""Non async client is deprecated."""
raise NotImplementedError()

def create_async(self):
"""Create a UnifiedClient with async REST client."""
opensearch_client = self.rest_client_factory.create_async()
grpc_stubs = None

if self.grpc_hosts:
grpc_factory = GrpcClientFactory(self.grpc_hosts)
grpc_stubs = grpc_factory.create_grpc_stubs()

return UnifiedClient(opensearch_client, grpc_stubs)
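A minimal end-to-end sketch, assuming `OsClientFactory(hosts, client_options)` is the REST factory's constructor signature and `grpc_target_hosts` is the TargetHosts value wired up in benchmark.py above:

rest_factory = OsClientFactory(hosts, client_options)  # hosts/client_options assumed configured elsewhere
unified_factory = UnifiedClientFactory(rest_factory, grpc_hosts=grpc_target_hosts)
client = unified_factory.create_async()  # gRPC stubs are created only if grpc_hosts was provided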
71 changes: 71 additions & 0 deletions osbenchmark/worker_coordinator/proto_helpers/ProtoBulkHelper.py
@@ -0,0 +1,71 @@
from opensearch.protobufs.schemas import document_pb2

def _parse_docs_from_body(body):
    # Bulk NDJSON alternates action lines and document lines; keep every second line (the documents).
    index_op_lines = body.decode('utf-8').split('\n')
    return index_op_lines[1::2]

class ProtoBulkHelper:
# Build a protobuf BulkRequest.
# Consumed from params dictionary:
# * ``body``: JSON body of bulk ingest request
# * ``index``: index name
@staticmethod
def build_proto_request(params):
index = params.get("index")
body = params.get("body")
doc_list = _parse_docs_from_body(body)
request = document_pb2.BulkRequest()
request.index = index
# All bulk requests here are index ops
op_container = document_pb2.OperationContainer()
op_container.index.CopyFrom(document_pb2.IndexOperation())
for doc in doc_list:
request_body = document_pb2.BulkRequestBody()
request_body.object = doc.encode('utf-8')
request_body.operation_container.CopyFrom(op_container)
request.request_body.append(request_body)
return request
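A worked example of the body format this helper consumes: bulk NDJSON alternates action lines and document lines, and only the documents end up in the BulkRequest (index name and documents are illustrative):

body = (
    b'{"index": {}}\n'
    b'{"title": "doc one"}\n'
    b'{"index": {}}\n'
    b'{"title": "doc two"}\n'
)
request = ProtoBulkHelper.build_proto_request({"index": "test-index", "body": body})
# request.index == "test-index"; request.request_body holds two entries, each carrying
# one document in `object` plus an IndexOperation container.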

# Parse stats from protobuf response.
# Consumed from params dictionary:
# ``index``: index name
# ``bulk-size``: documents per bulk request
# ``unit``: in the case of bulk always 'ops'
# ``detailed-results``: gRPC/Protobuf does not support detailed results at this time.
@staticmethod
def build_stats(response: document_pb2.BulkResponse, params):
if params.get("detailed-results"):
raise Exception("Detailed results not supported for gRPC bulk requests")

took = None
error_count = 0
success_count = 0
if response.errors:
error_count = params.get("bulk-size")
else:
took = response.took
for item in response.items:
# The status field mirrors HTTP status code conventions:
# https://github.com/opensearch-project/opensearch-protobufs/blob/b6f889416da83b7dc4a0408347965e7820bd61d0/protos/schemas/document.proto#L217-L219
if item.index.status > 299:
error_count += 1
else:
success_count += 1

meta_data = {
"index": params.get("index"),
"weight": params.get("bulk-size"),
"unit": params.get("unit"),
"took": took,
"success": error_count == 0,
"success-count": success_count,
"error-count": error_count,
}

if error_count > 0:
meta_data["error-type"] = "bulk"

return meta_data
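A sketch of the stats path, constructing a response by hand via standard protobuf keyword arguments (field values are illustrative):

response = document_pb2.BulkResponse(errors=False, took=5)
stats = ProtoBulkHelper.build_stats(response, {"index": "test-index", "bulk-size": 2, "unit": "ops"})
# With no per-item entries in response.items, the counters stay at zero:
# {"index": "test-index", "weight": 2, "unit": "ops", "took": 5,
#  "success": True, "success-count": 0, "error-count": 0}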