Skip to content

Commit 1269cdc

Browse files
authored
feat(AsyncOpenSearch): consistent pool_maxsize setting (#845)
Signed-off-by: samypr100 <[email protected]>
1 parent 6fa54a7 commit 1269cdc

File tree

6 files changed

+92
-3
lines changed

6 files changed

+92
-3
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
44
## [Unreleased]
55
### Added
66
- Added `AsyncSearch#collapse` ([827](https://github.com/opensearch-project/opensearch-py/pull/827))
7+
- Support `pool_maxsize` in `AsyncOpenSearch` ([845](https://github.com/opensearch-project/opensearch-py/pull/845))
78
### Changed
89
### Deprecated
910
### Removed

opensearchpy/_async/helpers/test.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@
66
#
77
# Modifications Copyright OpenSearch Contributors. See
88
# GitHub history for details.
9-
9+
import asyncio
1010
import os
11-
import time
1211
from typing import Any
1312
from unittest import SkipTest
1413

@@ -37,7 +36,7 @@ async def get_test_client(nowait: bool = False, **kwargs: Any) -> Any:
3736
await client.cluster.health(wait_for_status="yellow")
3837
return client
3938
except ConnectionError:
40-
time.sleep(0.1)
39+
await asyncio.sleep(0.1)
4140
else:
4241
# timeout
4342
raise SkipTest("OpenSearch failed to start.")

opensearchpy/_async/http_aiohttp.py

+5
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ def __init__(
137137
url_prefix=url_prefix,
138138
timeout=timeout,
139139
use_ssl=use_ssl,
140+
maxsize=maxsize,
140141
headers=headers,
141142
http_compress=http_compress,
142143
opaque_id=opaque_id,
@@ -219,6 +220,10 @@ def __init__(
219220
self.loop = loop
220221
self.session = None
221222

223+
# Align with Sync Interface
224+
if "pool_maxsize" in kwargs:
225+
maxsize = kwargs.pop("pool_maxsize")
226+
222227
# Parameters for creating an aiohttp.ClientSession later.
223228
self._limit = maxsize
224229
self._http_auth = http_auth

opensearchpy/_async/transport.py

+4
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ def __init__(
7474
serializers: Any = None,
7575
default_mimetype: str = "application/json",
7676
max_retries: int = 3,
77+
pool_maxsize: Optional[int] = None,
7778
retry_on_status: Any = (502, 503, 504),
7879
retry_on_timeout: bool = False,
7980
send_get_body_as: str = "GET",
@@ -102,6 +103,8 @@ def __init__(
102103
:arg default_mimetype: when no mimetype is specified by the server
103104
response assume this mimetype, defaults to `'application/json'`
104105
:arg max_retries: maximum number of retries before an exception is propagated
106+
:arg pool_maxsize: Maximum connection pool size used by pool-manager
107+
For custom connection-pooling on current session
105108
:arg retry_on_status: set of HTTP status codes on which we should retry
106109
on a different node. defaults to ``(502, 503, 504)``
107110
:arg retry_on_timeout: should timeout trigger a retry on different
@@ -134,6 +137,7 @@ def __init__(
134137
serializers=serializers,
135138
default_mimetype=default_mimetype,
136139
max_retries=max_retries,
140+
pool_maxsize=pool_maxsize,
137141
retry_on_status=retry_on_status,
138142
retry_on_timeout=retry_on_timeout,
139143
send_get_body_as=send_get_body_as,

opensearchpy/connection/http_async.py

+4
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ def __init__(
142142
self.loop = loop
143143
self.session = None
144144

145+
# Align with Sync Interface
146+
if "pool_maxsize" in kwargs:
147+
maxsize = kwargs.pop("pool_maxsize")
148+
145149
# Parameters for creating an aiohttp.ClientSession later.
146150
self._limit = maxsize
147151
self._http_auth = http_auth
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
#
3+
# The OpenSearch Contributors require contributions made to
4+
# this file be licensed under the Apache-2.0 license or a
5+
# compatible open source license.
6+
#
7+
# Modifications Copyright OpenSearch Contributors. See
8+
# GitHub history for details.
9+
import os
10+
from typing import Type
11+
12+
import pytest
13+
from pytest import MarkDecorator
14+
15+
from opensearchpy import (
16+
AIOHttpConnection,
17+
AsyncConnection,
18+
AsyncHttpConnection,
19+
AsyncOpenSearch,
20+
)
21+
from opensearchpy._async.helpers.test import get_test_client
22+
23+
pytestmark: MarkDecorator = pytest.mark.asyncio
24+
25+
26+
class TestAIOHttp:
27+
28+
def test_default(self) -> None:
29+
client = AsyncOpenSearch()
30+
assert client.transport.connection_class == AIOHttpConnection
31+
assert client.transport.pool_maxsize is None
32+
33+
def test_connection_class(self) -> None:
34+
client = AsyncOpenSearch(connection_class=AsyncHttpConnection)
35+
assert client.transport.connection_class == AsyncHttpConnection
36+
assert client.transport.pool_maxsize is None
37+
38+
def test_pool_maxsize(self) -> None:
39+
client = AsyncOpenSearch(connection_class=AsyncHttpConnection, pool_maxsize=42)
40+
assert client.transport.connection_class == AsyncHttpConnection
41+
assert client.transport.pool_maxsize == 42
42+
43+
@pytest.mark.parametrize( # type: ignore[misc]
44+
"connection_class", [AIOHttpConnection, AsyncHttpConnection]
45+
)
46+
async def test_default_limit(self, connection_class: Type[AsyncConnection]) -> None:
47+
client = await get_test_client(
48+
connection_class=connection_class,
49+
verify_certs=False,
50+
http_auth=("admin", os.getenv("OPENSEARCH_PASSWORD", "admin")),
51+
)
52+
assert isinstance(
53+
client.transport.connection_pool.connections[0], connection_class
54+
)
55+
assert (
56+
client.transport.connection_pool.connections[0].session.connector.limit # type: ignore[attr-defined]
57+
== 10
58+
)
59+
60+
@pytest.mark.parametrize( # type: ignore[misc]
61+
"connection_class", [AIOHttpConnection, AsyncHttpConnection]
62+
)
63+
async def test_custom_limit(self, connection_class: Type[AsyncConnection]) -> None:
64+
client = await get_test_client(
65+
connection_class=connection_class,
66+
verify_certs=False,
67+
pool_maxsize=42,
68+
http_auth=("admin", os.getenv("OPENSEARCH_PASSWORD", "admin")),
69+
)
70+
assert isinstance(
71+
client.transport.connection_pool.connections[0], connection_class
72+
)
73+
assert (
74+
client.transport.connection_pool.connections[0].session.connector.limit # type: ignore[attr-defined]
75+
== 42
76+
)

0 commit comments

Comments
 (0)