Skip to content

Commit 9f75699

Browse files
committed
fix: hanging mysql and failing pg integration tests
1 parent 74ae5de commit 9f75699

File tree

13 files changed

+106
-44
lines changed

13 files changed

+106
-44
lines changed

.github/workflows/integration_tests.yml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ on:
55
push:
66
branches:
77
- main
8+
- test/hanging-efm2-integration-test
89

910
permissions:
1011
id-token: write # This is required for requesting the JWT

aws_advanced_python_wrapper/states/session_state.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -92,3 +92,6 @@ def copy(self):
9292
new_session_state.readonly = self.readonly.copy()
9393

9494
return new_session_state
95+
96+
# Render a human-readable summary of this session state.
# Only the auto_commit and readonly fields are included in the text.
def __str__(self):
97+
return f"autocommit: {self.auto_commit}, readonly: {self.readonly}"

aws_advanced_python_wrapper/utils/pg_exception_handler.py

Lines changed: 20 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -27,9 +27,19 @@ class PgExceptionHandler(ExceptionHandler):
2727
_PAM_AUTHENTICATION_FAILED_MSG = "PAM authentication failed"
2828
_CONNECTION_FAILED = "connection failed"
2929
_CONSUMING_INPUT_FAILED = "consuming input failed"
30+
_CONNECTION_SOCKET_CLOSED = "connection socket closed"
3031

31-
_NETWORK_ERRORS: List[str]
32-
_ACCESS_ERRORS: List[str]
32+
_NETWORK_ERROR_MESSAGES: List[str] = [
33+
_CONNECTION_FAILED,
34+
_CONSUMING_INPUT_FAILED,
35+
_CONNECTION_SOCKET_CLOSED
36+
]
37+
_ACCESS_ERROR_MESSAGES: List[str] = [
38+
_PASSWORD_AUTHENTICATION_FAILED_MSG,
39+
_PAM_AUTHENTICATION_FAILED_MSG
40+
]
41+
_NETWORK_ERROR_CODES: List[str]
42+
_ACCESS_ERROR_CODES: List[str]
3343

3444
def is_network_exception(self, error: Optional[Exception] = None, sql_state: Optional[str] = None) -> bool:
3545
if isinstance(error, QueryTimeoutError) or isinstance(error, ConnectionTimeout):
@@ -43,15 +53,15 @@ def is_network_exception(self, error: Optional[Exception] = None, sql_state: Opt
4353
# getattr may throw an AttributeError if the error does not have a `sqlstate` attribute
4454
pass
4555

46-
if sql_state is not None and sql_state in self._NETWORK_ERRORS:
56+
if sql_state is not None and sql_state in self._NETWORK_ERROR_CODES:
4757
return True
4858

4959
if isinstance(error, OperationalError):
5060
if len(error.args) == 0:
5161
return False
5262
# Check the error message if this is a generic error
5363
error_msg: str = error.args[0]
54-
return self._CONNECTION_FAILED in error_msg or self._CONSUMING_INPUT_FAILED in error_msg
64+
return any(msg in error_msg for msg in self._NETWORK_ERROR_MESSAGES)
5565

5666
return False
5767

@@ -63,7 +73,7 @@ def is_login_exception(self, error: Optional[Exception] = None, sql_state: Optio
6373
if sql_state is None and hasattr(error, "sqlstate") and error.sqlstate is not None:
6474
sql_state = error.sqlstate
6575

66-
if sql_state is not None and sql_state in self._ACCESS_ERRORS:
76+
if sql_state is not None and sql_state in self._ACCESS_ERROR_CODES:
6777
return True
6878

6979
if isinstance(error, OperationalError):
@@ -72,15 +82,14 @@ def is_login_exception(self, error: Optional[Exception] = None, sql_state: Optio
7282

7383
# Check the error message if this is a generic error
7484
error_msg: str = error.args[0]
75-
if self._PASSWORD_AUTHENTICATION_FAILED_MSG in error_msg \
76-
or self._PAM_AUTHENTICATION_FAILED_MSG in error_msg:
85+
if any(msg in error_msg for msg in self._ACCESS_ERROR_MESSAGES):
7786
return True
7887

7988
return False
8089

8190

8291
class SingleAzPgExceptionHandler(PgExceptionHandler):
83-
_NETWORK_ERRORS: List[str] = [
92+
_NETWORK_ERROR_CODES: List[str] = [
8493
"53", # insufficient resources
8594
"57P01", # admin shutdown
8695
"57P02", # crash shutdown
@@ -92,14 +101,14 @@ class SingleAzPgExceptionHandler(PgExceptionHandler):
92101
"XX" # internal error(backend)
93102
]
94103

95-
_ACCESS_ERRORS: List[str] = [
104+
_ACCESS_ERROR_CODES: List[str] = [
96105
"28000", # PAM authentication errors
97106
"28P01"
98107
]
99108

100109

101110
class MultiAzPgExceptionHandler(PgExceptionHandler):
102-
_NETWORK_ERRORS: List[str] = [
111+
_NETWORK_ERROR_CODES: List[str] = [
103112
"28000", # access denied during reboot, this should be considered a temporary failure
104113
"53", # insufficient resources
105114
"57P01", # admin shutdown
@@ -112,4 +121,4 @@ class MultiAzPgExceptionHandler(PgExceptionHandler):
112121
"XX" # internal error(backend)
113122
]
114123

115-
_ACCESS_ERRORS: List[str] = ["28P01"]
124+
_ACCESS_ERROR_CODES: List[str] = ["28P01"]

aws_advanced_python_wrapper/utils/telemetry/xray_telemetry.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -78,7 +78,7 @@ def post_copy(context: XRayTelemetryContext, trace_level: TelemetryTraceLevel):
7878
return
7979

8080
if trace_level in [TelemetryTraceLevel.FORCE_TOP_LEVEL, TelemetryTraceLevel.TOP_LEVEL]:
81-
with ThreadPoolExecutor() as executor:
81+
with ThreadPoolExecutor(thread_name_prefix=context.get_name()) as executor:
8282
future = executor.submit(_clone_and_close_context, context, trace_level)
8383
future.result()
8484
else:

aws_advanced_python_wrapper/writer_failover_handler.py

Lines changed: 7 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -120,9 +120,9 @@ def get_result_from_future(self, current_topology: Tuple[HostInfo, ...]) -> Writ
120120

121121
executor = ThreadPoolExecutor(thread_name_prefix="WriterFailoverHandlerExecutor")
122122
try:
123+
futures = [executor.submit(self.reconnect_to_writer, writer_host),
124+
executor.submit(self.wait_for_new_writer, current_topology, writer_host)]
123125
try:
124-
futures = [executor.submit(self.reconnect_to_writer, writer_host),
125-
executor.submit(self.wait_for_new_writer, current_topology, writer_host)]
126126
for future in as_completed(futures, timeout=self._max_failover_timeout_sec):
127127
result = future.result()
128128
if result.is_connected:
@@ -132,9 +132,10 @@ def get_result_from_future(self, current_topology: Tuple[HostInfo, ...]) -> Writ
132132
return result
133133
except TimeoutError:
134134
self._timeout_event.set()
135-
finally:
136-
self._timeout_event.set()
135+
for future in futures:
136+
future.cancel()
137137
finally:
138+
self._timeout_event.set()
138139
executor.shutdown(wait=False)
139140

140141
return WriterFailoverHandlerImpl.failed_writer_failover_result
@@ -184,9 +185,9 @@ def reconnect_to_writer(self, initial_writer_host: HostInfo):
184185

185186
if latest_topology is None or len(latest_topology) == 0:
186187
sleep(self._reconnect_writer_interval_sec)
187-
else:
188-
success = self.is_current_host_writer(latest_topology, initial_writer_host)
189188

189+
success = self.is_current_host_writer(latest_topology, initial_writer_host)
190+
logger.debug("[TaskA] success: " + str(success))
190191
self._plugin_service.set_availability(initial_writer_host.as_aliases(), HostAvailability.AVAILABLE)
191192
return WriterFailoverResult(success, False, latest_topology, conn if success else None, "TaskA", None)
192193

tests/integration/container/conftest.py

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,8 @@ def pytest_runtest_setup(item):
6767
else:
6868
TestEnvironment.get_current().set_current_driver(None)
6969

70+
logger.info("Starting test preparation for: " + test_name)
71+
7072
segment: Optional[Segment] = None
7173
if TestEnvironmentFeatures.TELEMETRY_TRACES_ENABLED in TestEnvironment.get_current().get_features():
7274
segment = xray_recorder.begin_segment("test: setup")
@@ -92,7 +94,7 @@ def pytest_runtest_setup(item):
9294
# Wait up to 5min
9395
instances: List[str] = list()
9496
start_time = timeit.default_timer()
95-
while (len(instances) != request.get_num_of_instances()
97+
while (len(instances) < request.get_num_of_instances()
9698
or len(instances) == 0
9799
or not rds_utility.is_db_instance_writer(instances[0])) and (
98100
timeit.default_timer() - start_time) < 300: # 5 min

tests/integration/container/test_aurora_failover.py

Lines changed: 19 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,8 @@
3939
from .utils.test_environment import TestEnvironment
4040
from .utils.test_environment_features import TestEnvironmentFeatures
4141

42+
logger = Logger(__name__)
43+
4244

4345
@enable_on_num_instances(min_instances=2)
4446
@enable_on_deployments([DatabaseEngineDeployment.AURORA, DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER])
@@ -49,20 +51,34 @@ class TestAuroraFailover:
4951
IDLE_CONNECTIONS_NUM: int = 5
5052
logger = Logger(__name__)
5153

54+
# Autouse fixture: applied to every test in this class without being requested.
# Logs the test's node name before the test body runs; the statement after
# `yield` runs during fixture teardown and logs the same name again.
# NOTE(review): `setup_method` is also the name pytest uses for xunit-style
# per-method setup — confirm the overlap is intentional or rename the fixture.
@pytest.fixture(autouse=True)
55+
def setup_method(self, request):
56+
self.logger.info(f"Starting test: {request.node.name}")
57+
yield
58+
self.logger.info(f"Ending test: {request.node.name}")
59+
5260
# Class-scoped fixture: created once per test class and shared by its tests.
# Builds an RdsTestUtility for the region reported by the current
# TestEnvironment's info object.
@pytest.fixture(scope='class')
5361
def aurora_utility(self):
5462
region: str = TestEnvironment.get_current().get_info().get_region()
5563
return RdsTestUtility(region)
5664

5765
@pytest.fixture(scope='class')
5866
def props(self):
59-
p: Properties = Properties({"plugins": "failover", "connect_timeout": 60, "topology_refresh_ms": 10, "autocommit": True})
67+
p: Properties = Properties({
68+
"plugins": "failover",
69+
"socket_timeout": 30,
70+
"connect_timeout": 10,
71+
"monitoring-connect_timeout": 5,
72+
"monitoring-socket_timeout": 5,
73+
"topology_refresh_ms": 10,
74+
"autocommit": True
75+
})
6076

6177
features = TestEnvironment.get_current().get_features()
6278
if TestEnvironmentFeatures.TELEMETRY_TRACES_ENABLED in features \
6379
or TestEnvironmentFeatures.TELEMETRY_METRICS_ENABLED in features:
64-
WrapperProperties.ENABLE_TELEMETRY.set(p, True)
65-
WrapperProperties.TELEMETRY_SUBMIT_TOPLEVEL.set(p, True)
80+
WrapperProperties.ENABLE_TELEMETRY.set(p, False)
81+
WrapperProperties.TELEMETRY_SUBMIT_TOPLEVEL.set(p, False)
6682
if TestEnvironmentFeatures.TELEMETRY_TRACES_ENABLED in features:
6783
WrapperProperties.TELEMETRY_TRACES_BACKEND.set(p, "XRAY")
6884
if TestEnvironmentFeatures.TELEMETRY_METRICS_ENABLED in features:

tests/integration/container/test_basic_connectivity.py

Lines changed: 8 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -131,11 +131,15 @@ def test_proxied_wrapper_connection_failed(
131131
@enable_on_deployments([DatabaseEngineDeployment.AURORA, DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER])
132132
@enable_on_features([TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
133133
def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: TestDriver, conn_utils, plugins):
134+
props: Properties = Properties({
135+
WrapperProperties.PLUGINS.name: plugins,
136+
"socket_timeout": 5,
137+
"connect_timeout": 5,
138+
"monitoring-connect_timeout": 3,
139+
"monitoring-socket_timeout": 3,
140+
"autocommit": True})
134141
target_driver_connect = DriverHelper.get_connect_func(test_driver)
135-
conn = AwsWrapperConnection.connect(
136-
target_driver_connect,
137-
**conn_utils.get_connect_params(conn_utils.reader_cluster_host),
138-
plugins=plugins, connect_timeout=10)
142+
conn = AwsWrapperConnection.connect(target_driver_connect, **conn_utils.get_connect_params(conn_utils.reader_cluster_host), **props)
139143
cursor = conn.cursor()
140144
cursor.execute("SELECT 1")
141145
result = cursor.fetchone()

tests/integration/container/test_host_monitoring_v2.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,8 @@
2222
from aws_advanced_python_wrapper.utils.properties import (Properties,
2323
WrapperProperties)
2424
from tests.integration.container.utils.conditions import (
25-
disable_on_features, enable_on_deployments)
25+
disable_on_engines, disable_on_features, enable_on_deployments)
26+
from tests.integration.container.utils.database_engine import DatabaseEngine
2627
from tests.integration.container.utils.database_engine_deployment import \
2728
DatabaseEngineDeployment
2829
from tests.integration.container.utils.driver_helper import DriverHelper
@@ -44,6 +45,7 @@
4445
@disable_on_features([TestEnvironmentFeatures.PERFORMANCE,
4546
TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY,
4647
TestEnvironmentFeatures.BLUE_GREEN_DEPLOYMENT])
48+
@disable_on_engines([DatabaseEngine.MYSQL])
4749
class TestHostMonitoringV2:
4850
@pytest.fixture(scope='class')
4951
def rds_utils(self):
@@ -55,6 +57,8 @@ def props(self):
5557
p: Properties = Properties({"plugins": "host_monitoring_v2",
5658
"socket_timeout": 30,
5759
"connect_timeout": 10,
60+
"monitoring-connect_timeout": 5,
61+
"monitoring-socket_timeout": 5,
5862
"failure_detection_time_ms": 5_000,
5963
"failure_detection_interval_ms": 5_000,
6064
"failure_detection_count": 1,

tests/integration/container/test_read_write_splitting.py

Lines changed: 8 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -82,7 +82,11 @@ def props(self):
8282
@pytest.fixture(scope='class')
8383
def failover_props(self):
8484
return {
85-
"plugins": "read_write_splitting,failover", "connect_timeout": 10, "autocommit": True}
85+
"plugins": "read_write_splitting,failover",
86+
"socket_timeout": 10,
87+
"connect_timeout": 10,
88+
"autocommit": True
89+
}
8690

8791
@pytest.fixture(scope='class')
8892
def proxied_props(self, props, conn_utils):
@@ -317,9 +321,6 @@ def test_failover_to_new_writer__switch_read_only(
317321
target_driver_connect = DriverHelper.get_connect_func(test_driver)
318322
connect_params = conn_utils.get_proxy_connect_params()
319323

320-
# To prevent endless waiting while executing SQL queries
321-
WrapperProperties.SOCKET_TIMEOUT_SEC.set(connect_params, 10)
322-
323324
with AwsWrapperConnection.connect(target_driver_connect, **connect_params, **proxied_failover_props) as conn:
324325
original_writer_id = rds_utils.query_instance_id(conn)
325326

@@ -349,15 +350,13 @@ def test_failover_to_new_writer__switch_read_only(
349350
current_id = rds_utils.query_instance_id(conn)
350351
assert new_writer_id == current_id
351352

352-
@pytest.mark.parametrize("plugins", ["read_write_splitting,failover,host_monitoring_v2"])
353353
@enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
354354
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
355355
@enable_on_num_instances(min_instances=3)
356356
@disable_on_engines([DatabaseEngine.MYSQL])
357357
def test_failover_to_new_reader__switch_read_only(
358358
self, test_environment: TestEnvironment, test_driver: TestDriver,
359-
proxied_failover_props, conn_utils, rds_utils, plugins):
360-
WrapperProperties.PLUGINS.set(proxied_failover_props, plugins)
359+
proxied_failover_props, conn_utils, rds_utils):
361360
WrapperProperties.FAILOVER_MODE.set(proxied_failover_props, "reader-or-writer")
362361

363362
target_driver_connect = DriverHelper.get_connect_func(test_driver)
@@ -398,16 +397,13 @@ def test_failover_to_new_reader__switch_read_only(
398397
current_id = rds_utils.query_instance_id(conn)
399398
assert other_reader_id == current_id
400399

401-
@pytest.mark.parametrize("plugins", ["read_write_splitting,failover,host_monitoring",
402-
"read_write_splitting,failover,host_monitoring_v2"])
403400
@enable_on_features([TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
404401
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
405402
@enable_on_num_instances(min_instances=3)
406403
@disable_on_engines([DatabaseEngine.MYSQL])
407404
def test_failover_reader_to_writer__switch_read_only(
408405
self, test_environment: TestEnvironment, test_driver: TestDriver,
409-
proxied_failover_props, conn_utils, rds_utils, plugins):
410-
WrapperProperties.PLUGINS.set(proxied_failover_props, plugins)
406+
proxied_failover_props, conn_utils, rds_utils):
411407
target_driver_connect = DriverHelper.get_connect_func(test_driver)
412408
with AwsWrapperConnection.connect(
413409
target_driver_connect, **conn_utils.get_proxy_connect_params(), **proxied_failover_props) as conn:
@@ -519,19 +515,16 @@ def test_pooled_connection__cluster_url_failover(
519515
new_driver_conn = conn.target_connection
520516
assert initial_driver_conn is not new_driver_conn
521517

522-
@pytest.mark.parametrize("plugins", ["read_write_splitting,failover,host_monitoring",
523-
"read_write_splitting,failover,host_monitoring_v2"])
524518
@enable_on_features([TestEnvironmentFeatures.FAILOVER_SUPPORTED, TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED,
525519
TestEnvironmentFeatures.ABORT_CONNECTION_SUPPORTED])
526520
@disable_on_engines([DatabaseEngine.MYSQL])
527521
def test_pooled_connection__failover_failed(
528522
self, test_environment: TestEnvironment, test_driver: TestDriver,
529-
rds_utils, conn_utils, proxied_failover_props, plugins):
523+
rds_utils, conn_utils, proxied_failover_props):
530524
writer_host = test_environment.get_writer().get_host()
531525
provider = SqlAlchemyPooledConnectionProvider(lambda _, __: {"pool_size": 1}, None, lambda host_info, props: writer_host in host_info.host)
532526
ConnectionProviderManager.set_connection_provider(provider)
533527

534-
WrapperProperties.PLUGINS.set(proxied_failover_props, plugins)
535528
WrapperProperties.FAILOVER_TIMEOUT_SEC.set(proxied_failover_props, "1")
536529
WrapperProperties.FAILURE_DETECTION_TIME_MS.set(proxied_failover_props, "1000")
537530
WrapperProperties.FAILURE_DETECTION_COUNT.set(proxied_failover_props, "1")

0 commit comments

Comments
 (0)