diff --git a/aws-advanced-jdbc-wrapper-bundle/build.gradle.kts b/aws-advanced-jdbc-wrapper-bundle/build.gradle.kts index 0169be9c3..af5dc4275 100644 --- a/aws-advanced-jdbc-wrapper-bundle/build.gradle.kts +++ b/aws-advanced-jdbc-wrapper-bundle/build.gradle.kts @@ -25,8 +25,8 @@ repositories { dependencies { implementation("org.apache.httpcomponents:httpclient:4.5.14") - implementation("software.amazon.awssdk:rds:2.31.78") - implementation("software.amazon.awssdk:sts:2.31.78") + implementation("software.amazon.awssdk:rds:2.32.21") + implementation("software.amazon.awssdk:sts:2.32.21") implementation(project(":aws-advanced-jdbc-wrapper")) } diff --git a/examples/AWSDriverExample/build.gradle.kts b/examples/AWSDriverExample/build.gradle.kts index 4aef0d218..fd8a5495c 100644 --- a/examples/AWSDriverExample/build.gradle.kts +++ b/examples/AWSDriverExample/build.gradle.kts @@ -18,9 +18,9 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-jdbc:2.7.13") // 2.7.13 is the last version compatible with Java 8 implementation("org.postgresql:postgresql:42.7.7") implementation("com.mysql:mysql-connector-j:9.3.0") - implementation("software.amazon.awssdk:rds:2.31.78") - implementation("software.amazon.awssdk:secretsmanager:2.31.12") - implementation("software.amazon.awssdk:sts:2.31.78") + implementation("software.amazon.awssdk:rds:2.32.21") + implementation("software.amazon.awssdk:secretsmanager:2.32.21") + implementation("software.amazon.awssdk:sts:2.32.21") implementation("com.fasterxml.jackson.core:jackson-databind:2.19.0") implementation(project(":aws-advanced-jdbc-wrapper")) implementation("io.opentelemetry:opentelemetry-api:1.52.0") diff --git a/examples/DBCPExample/build.gradle.kts b/examples/DBCPExample/build.gradle.kts index 3edc2033c..abd54cd60 100644 --- a/examples/DBCPExample/build.gradle.kts +++ b/examples/DBCPExample/build.gradle.kts @@ -19,5 +19,5 @@ dependencies { implementation("com.mysql:mysql-connector-j:9.3.0") implementation(project(":aws-advanced-jdbc-wrapper")) implementation("org.apache.commons:commons-dbcp2:2.13.0") - implementation("software.amazon.awssdk:rds:2.31.50") + implementation("software.amazon.awssdk:rds:2.32.21") } diff --git a/examples/SpringHibernateBalancedReaderOneDataSourceExample/build.gradle.kts b/examples/SpringHibernateBalancedReaderOneDataSourceExample/build.gradle.kts index 0e4a9890d..0fdd7a991 100644 --- a/examples/SpringHibernateBalancedReaderOneDataSourceExample/build.gradle.kts +++ b/examples/SpringHibernateBalancedReaderOneDataSourceExample/build.gradle.kts @@ -23,6 +23,6 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-data-jpa") implementation("org.springframework.retry:spring-retry") implementation("org.postgresql:postgresql:42.7.7") - implementation("software.amazon.awssdk:rds:2.31.50") + implementation("software.amazon.awssdk:rds:2.32.21") implementation(project(":aws-advanced-jdbc-wrapper")) } diff --git a/examples/SpringHibernateBalancedReaderTwoDataSourceExample/build.gradle.kts b/examples/SpringHibernateBalancedReaderTwoDataSourceExample/build.gradle.kts index 0e4a9890d..0fdd7a991 100644 --- a/examples/SpringHibernateBalancedReaderTwoDataSourceExample/build.gradle.kts +++ b/examples/SpringHibernateBalancedReaderTwoDataSourceExample/build.gradle.kts @@ -23,6 +23,6 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-data-jpa") implementation("org.springframework.retry:spring-retry") implementation("org.postgresql:postgresql:42.7.7") - implementation("software.amazon.awssdk:rds:2.31.50") + implementation("software.amazon.awssdk:rds:2.32.21") implementation(project(":aws-advanced-jdbc-wrapper")) } diff --git a/examples/SpringHibernateExample/build.gradle.kts b/examples/SpringHibernateExample/build.gradle.kts index b7c96d490..90ca464a3 100644 --- a/examples/SpringHibernateExample/build.gradle.kts +++ b/examples/SpringHibernateExample/build.gradle.kts @@ -23,6 +23,6 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-data-jpa") implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.postgresql:postgresql:42.7.7") - implementation("software.amazon.awssdk:rds:2.31.50") + implementation("software.amazon.awssdk:rds:2.32.21") implementation(project(":aws-advanced-jdbc-wrapper")) } diff --git a/examples/SpringTxFailoverExample/README.md b/examples/SpringTxFailoverExample/README.md index 33351a887..a83b97330 100644 --- a/examples/SpringTxFailoverExample/README.md +++ b/examples/SpringTxFailoverExample/README.md @@ -102,7 +102,7 @@ dependencies { implementation("org.springframework.retry:spring-retry:1.3.4") implementation("org.springframework:spring-aspects:5.3.29") implementation("org.postgresql:postgresql:42.5.4") - implementation("software.amazon.awssdk:rds:2.29.23") + implementation("software.amazon.awssdk:rds:2.32.21") implementation("software.amazon.jdbc:aws-advanced-jdbc-wrapper:latest") } ``` diff --git a/examples/SpringWildflyExample/spring/build.gradle.kts b/examples/SpringWildflyExample/spring/build.gradle.kts index 6f5712580..25ae9059d 100644 --- a/examples/SpringWildflyExample/spring/build.gradle.kts +++ b/examples/SpringWildflyExample/spring/build.gradle.kts @@ -24,6 +24,6 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-web") runtimeOnly("org.springframework.boot:spring-boot-devtools") implementation("org.postgresql:postgresql:42.7.7") - implementation("software.amazon.awssdk:rds:2.31.50") + implementation("software.amazon.awssdk:rds:2.32.21") implementation(project(":aws-advanced-jdbc-wrapper")) } diff --git a/wrapper/build.gradle.kts b/wrapper/build.gradle.kts index 4294ddbab..1fe142d8b 100644 --- a/wrapper/build.gradle.kts +++ b/wrapper/build.gradle.kts @@ -34,13 +34,13 @@ if (useJacoco) { dependencies { implementation("org.checkerframework:checker-qual:3.49.5") compileOnly("org.apache.httpcomponents:httpclient:4.5.14") - compileOnly("software.amazon.awssdk:rds:2.32.15") - compileOnly("software.amazon.awssdk:auth:2.31.45") // Required for IAM (light implementation) - compileOnly("software.amazon.awssdk:http-client-spi:2.32.11") // Required for IAM (light implementation) - compileOnly("software.amazon.awssdk:sts:2.32.15") + compileOnly("software.amazon.awssdk:rds:2.32.21") + compileOnly("software.amazon.awssdk:auth:2.32.21") // Required for IAM (light implementation) + compileOnly("software.amazon.awssdk:http-client-spi:2.32.21") // Required for IAM (light implementation) + compileOnly("software.amazon.awssdk:sts:2.32.21") compileOnly("com.zaxxer:HikariCP:4.0.3") // Version 4.+ is compatible with Java 8 compileOnly("com.mchange:c3p0:0.11.0") - compileOnly("software.amazon.awssdk:secretsmanager:2.31.12") + compileOnly("software.amazon.awssdk:secretsmanager:2.32.21") compileOnly("com.fasterxml.jackson.core:jackson-databind:2.19.0") compileOnly("com.mysql:mysql-connector-j:9.3.0") compileOnly("org.postgresql:postgresql:42.7.7") @@ -70,12 +70,12 @@ dependencies { testImplementation("com.mchange:c3p0:0.11.0") testImplementation("org.springframework.boot:spring-boot-starter-jdbc:2.7.13") // 2.7.13 is the last version compatible with Java 8 testImplementation("org.mockito:mockito-inline:4.11.0") // 4.11.0 is the last version compatible with Java 8 - testImplementation("software.amazon.awssdk:rds:2.32.15") - testImplementation("software.amazon.awssdk:auth:2.31.45") // Required for IAM (light implementation) - testImplementation("software.amazon.awssdk:http-client-spi:2.32.11") // Required for IAM (light implementation) - testImplementation("software.amazon.awssdk:ec2:2.32.15") - testImplementation("software.amazon.awssdk:secretsmanager:2.31.12") - testImplementation("software.amazon.awssdk:sts:2.32.15") + testImplementation("software.amazon.awssdk:rds:2.32.21") + testImplementation("software.amazon.awssdk:auth:2.32.21") // Required for IAM (light implementation) + testImplementation("software.amazon.awssdk:http-client-spi:2.32.21") // Required for IAM (light implementation) + testImplementation("software.amazon.awssdk:ec2:2.32.21") + testImplementation("software.amazon.awssdk:secretsmanager:2.32.21") + testImplementation("software.amazon.awssdk:sts:2.32.21") // Note: all org.testcontainers dependencies should have the same version testImplementation("org.testcontainers:testcontainers:1.21.2") testImplementation("org.testcontainers:mysql:1.21.2") diff --git a/wrapper/src/main/java/software/amazon/jdbc/Driver.java b/wrapper/src/main/java/software/amazon/jdbc/Driver.java index f89cdf5ad..e4f5a14af 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/Driver.java +++ b/wrapper/src/main/java/software/amazon/jdbc/Driver.java @@ -44,13 +44,11 @@ import software.amazon.jdbc.plugin.OpenedConnectionTracker; import software.amazon.jdbc.plugin.customendpoint.CustomEndpointMonitorImpl; import software.amazon.jdbc.plugin.efm.HostMonitorThreadContainer; -import software.amazon.jdbc.plugin.efm2.HostMonitorServiceImpl; import software.amazon.jdbc.plugin.federatedauth.FederatedAuthCacheHolder; import software.amazon.jdbc.plugin.federatedauth.OktaAuthCacheHolder; import software.amazon.jdbc.plugin.iam.IamAuthCacheHolder; import software.amazon.jdbc.plugin.limitless.LimitlessRouterServiceImpl; import software.amazon.jdbc.plugin.strategy.fastestresponse.FastestResponseStrategyPlugin; -import software.amazon.jdbc.plugin.strategy.fastestresponse.HostResponseTimeServiceImpl; import software.amazon.jdbc.profile.ConfigurationProfile; import software.amazon.jdbc.profile.DriverConfigurationProfiles; import software.amazon.jdbc.states.ResetSessionStateOnCloseCallable; @@ -440,11 +438,9 @@ public static void clearCaches() { public static void releaseResources() { CoreServicesContainer.getInstance().getMonitorService().stopAndRemoveAll(); - HostMonitorServiceImpl.closeAllMonitors(); HostMonitorThreadContainer.releaseInstance(); ConnectionProviderManager.releaseResources(); HikariPoolsHolder.closeAllPools(); - HostResponseTimeServiceImpl.closeAllMonitors(); clearCaches(); } diff --git a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/RdsMultiAzDbClusterListProvider.java b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/RdsMultiAzDbClusterListProvider.java index f6b5106a6..a63323176 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/RdsMultiAzDbClusterListProvider.java +++ b/wrapper/src/main/java/software/amazon/jdbc/hostlistprovider/RdsMultiAzDbClusterListProvider.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Properties; import java.util.logging.Logger; -import software.amazon.jdbc.HostListProviderService; import software.amazon.jdbc.HostRole; import software.amazon.jdbc.HostSpec; import software.amazon.jdbc.hostavailability.HostAvailability; diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/HostMonitorThreadContainer.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/HostMonitorThreadContainer.java index 266e64630..e65b21116 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/HostMonitorThreadContainer.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm/HostMonitorThreadContainer.java @@ -36,7 +36,6 @@ public class HostMonitorThreadContainer { private static HostMonitorThreadContainer singleton = null; private final Map> tasksMap = new ConcurrentHashMap<>(); - // TODO: remove monitorMap and threadPool and submit monitors to MonitorService instead private final Map monitorMap = new ConcurrentHashMap<>(); private final ExecutorService threadPool; private static final ReentrantLock LOCK_OBJECT = new ReentrantLock(); diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitorImpl.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitorImpl.java index 14baa6669..5a858b856 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitorImpl.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitorImpl.java @@ -27,17 +27,16 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; import org.checkerframework.checker.nullness.qual.NonNull; import software.amazon.jdbc.HostSpec; -import software.amazon.jdbc.PluginService; import software.amazon.jdbc.util.ExecutorFactory; import software.amazon.jdbc.util.Messages; import software.amazon.jdbc.util.PropertyUtils; +import software.amazon.jdbc.util.connection.ConnectionService; +import software.amazon.jdbc.util.monitoring.AbstractMonitor; import software.amazon.jdbc.util.telemetry.TelemetryContext; import software.amazon.jdbc.util.telemetry.TelemetryCounter; import software.amazon.jdbc.util.telemetry.TelemetryFactory; @@ -47,10 +46,11 @@ * This class uses a background thread to monitor a particular server with one or more active {@link * Connection}. */ -public class HostMonitorImpl implements HostMonitor { +public class HostMonitorImpl extends AbstractMonitor implements HostMonitor { private static final Logger LOGGER = Logger.getLogger(HostMonitorImpl.class.getName()); private static final long THREAD_SLEEP_NANO = TimeUnit.MILLISECONDS.toNanos(100); + private static final long TERMINATION_TIMEOUT_SEC = 30; private static final String MONITORING_PROPERTY_PREFIX = "monitoring-"; protected static final Executor ABORT_EXECUTOR = @@ -59,15 +59,11 @@ public class HostMonitorImpl implements HostMonitor { private final Queue> activeContexts = new ConcurrentLinkedQueue<>(); private final Map>> newContexts = new ConcurrentHashMap<>(); - private final PluginService pluginService; + private final ConnectionService connectionService; private final TelemetryFactory telemetryFactory; private final Properties properties; private final HostSpec hostSpec; - private final AtomicBoolean stopped = new AtomicBoolean(false); private Connection monitoringConn = null; - // TODO: remove and submit monitors to MonitorService instead - private final ExecutorService threadPool = - ExecutorFactory.newFixedThreadPool(2, "threadPool"); private final long failureDetectionTimeNano; private final long failureDetectionIntervalNano; @@ -82,7 +78,8 @@ public class HostMonitorImpl implements HostMonitor { /** * Store the monitoring configuration for a connection. * - * @param pluginService A service for creating new connections. + * @param connectionService The service to use to create the monitoring connection. + * @param telemetryFactory The telemetry factory to use to create telemetry data. * @param hostSpec The {@link HostSpec} of the server this {@link HostMonitorImpl} * instance is monitoring. * @param properties The {@link Properties} containing additional monitoring @@ -93,26 +90,24 @@ public class HostMonitorImpl implements HostMonitor { * @param abortedConnectionsCounter Aborted connection telemetry counter. */ public HostMonitorImpl( - final @NonNull PluginService pluginService, + final @NonNull ConnectionService connectionService, + final @NonNull TelemetryFactory telemetryFactory, final @NonNull HostSpec hostSpec, final @NonNull Properties properties, final int failureDetectionTimeMillis, final int failureDetectionIntervalMillis, final int failureDetectionCount, final TelemetryCounter abortedConnectionsCounter) { + super(TERMINATION_TIMEOUT_SEC, ExecutorFactory.newFixedThreadPool(2, "efm2-monitor")); - this.pluginService = pluginService; - this.telemetryFactory = pluginService.getTelemetryFactory(); + this.connectionService = connectionService; + this.telemetryFactory = telemetryFactory; this.hostSpec = hostSpec; this.properties = properties; this.failureDetectionTimeNano = TimeUnit.MILLISECONDS.toNanos(failureDetectionTimeMillis); this.failureDetectionIntervalNano = TimeUnit.MILLISECONDS.toNanos(failureDetectionIntervalMillis); this.failureDetectionCount = failureDetectionCount; this.abortedConnectionsCounter = abortedConnectionsCounter; - - this.threadPool.submit(this::newContextRun); // task to handle new contexts - this.threadPool.submit(this); // task to handle active monitoring contexts - this.threadPool.shutdown(); // No more tasks are accepted by pool. } @Override @@ -121,21 +116,15 @@ public boolean canDispose() { } @Override - public void close() throws Exception { - this.stopped.set(true); - - // Waiting for 30s gives a thread enough time to exit monitoring loop and close database connection. - if (!this.threadPool.awaitTermination(30, TimeUnit.SECONDS)) { - this.threadPool.shutdownNow(); - } - LOGGER.finest(() -> Messages.get( - "HostMonitorImpl.stopped", - new Object[] {this.hostSpec.getHost()})); + public void start() { + this.monitorExecutor.submit(this::newContextRun); // task to handle new contexts + this.monitorExecutor.submit(this); // task to handle active monitoring contexts + this.monitorExecutor.shutdown(); // No more tasks are accepted by pool. } @Override public void startMonitoring(final HostMonitorConnectionContext context) { - if (this.stopped.get()) { + if (this.stop.get()) { LOGGER.warning(() -> Messages.get("HostMonitorImpl.monitorIsStopped", new Object[] {this.hostSpec.getHost()})); } @@ -166,9 +155,9 @@ public void newContextRun() { new Object[] {this.hostSpec.getHost()})); try { - while (!this.stopped.get()) { - + while (!this.stop.get()) { final long currentTimeNano = this.getCurrentTimeNano(); + this.lastActivityTimestampNanos.set(currentTimeNano); final ArrayList processedKeys = new ArrayList<>(); this.newContexts.entrySet().stream() @@ -212,14 +201,14 @@ public void newContextRun() { } @Override - public void run() { + public void monitor() { LOGGER.finest(() -> Messages.get( "HostMonitorImpl.startMonitoringThread", new Object[] {this.hostSpec.getHost()})); try { - while (!this.stopped.get()) { + while (!this.stop.get()) { if (this.activeContexts.isEmpty() && !this.nodeUnhealthy) { TimeUnit.NANOSECONDS.sleep(THREAD_SLEEP_NANO); @@ -236,7 +225,7 @@ public void run() { WeakReference monitorContextWeakRef; while ((monitorContextWeakRef = this.activeContexts.poll()) != null) { - if (this.stopped.get()) { + if (this.stop.get()) { break; } @@ -284,7 +273,7 @@ public void run() { ex); // We want to print full trace stack of the exception. } } finally { - this.stopped.set(true); + this.stop.set(true); if (this.monitoringConn != null) { try { this.monitoringConn.close(); @@ -328,8 +317,7 @@ boolean checkConnectionStatus() { }); LOGGER.finest(() -> "Opening a monitoring connection to " + this.hostSpec.getUrl()); - // TODO: replace with ConnectionService#open - this.monitoringConn = this.pluginService.forceConnect(this.hostSpec, monitoringConnProperties); + this.monitoringConn = this.connectionService.open(this.hostSpec, monitoringConnProperties); LOGGER.finest(() -> "Opened monitoring connection: " + this.monitoringConn); return true; } @@ -402,4 +390,14 @@ private void abortConnection(final @NonNull Connection connectionToAbort) { } } + @Override + public void close() { + if (this.monitoringConn != null) { + try { + this.monitoringConn.close(); + } catch (SQLException e) { + // ignore + } + } + } } diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitorService.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitorService.java index 35a934057..f2cb5b2cd 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitorService.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitorService.java @@ -17,6 +17,7 @@ package software.amazon.jdbc.plugin.efm2; import java.sql.Connection; +import java.sql.SQLException; import java.util.Properties; import software.amazon.jdbc.HostSpec; @@ -32,7 +33,7 @@ HostMonitorConnectionContext startMonitoring( Properties properties, int failureDetectionTimeMillis, int failureDetectionIntervalMillis, - int failureDetectionCount); + int failureDetectionCount) throws SQLException; /** * Stop monitoring for a connection represented by the given {@link HostMonitorConnectionContext}. diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitorServiceImpl.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitorServiceImpl.java index 8b6b92f22..9970b4f39 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitorServiceImpl.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitorServiceImpl.java @@ -26,9 +26,11 @@ import software.amazon.jdbc.AwsWrapperProperty; import software.amazon.jdbc.HostSpec; import software.amazon.jdbc.PluginService; +import software.amazon.jdbc.util.CoreServicesContainer; import software.amazon.jdbc.util.ExecutorFactory; +import software.amazon.jdbc.util.FullServicesContainer; import software.amazon.jdbc.util.Messages; -import software.amazon.jdbc.util.storage.SlidingExpirationCacheWithCleanupThread; +import software.amazon.jdbc.util.monitoring.MonitorService; import software.amazon.jdbc.util.telemetry.TelemetryCounter; import software.amazon.jdbc.util.telemetry.TelemetryFactory; @@ -44,66 +46,32 @@ public class HostMonitorServiceImpl implements HostMonitorService { "monitorDisposalTime", "600000", // 10min "Interval in milliseconds for a monitor to be considered inactive and to be disposed."); - - protected static final long CACHE_CLEANUP_NANO = TimeUnit.MINUTES.toNanos(1); - protected static final Executor ABORT_EXECUTOR = ExecutorFactory.newSingleThreadExecutor("abort"); - // TODO: remove and submit monitors to MonitorService instead - protected static final SlidingExpirationCacheWithCleanupThread monitors = - new SlidingExpirationCacheWithCleanupThread<>( - HostMonitor::canDispose, - (monitor) -> { - try { - monitor.close(); - } catch (Exception ex) { - // ignore - } - }, - CACHE_CLEANUP_NANO); + protected final FullServicesContainer serviceContainer; protected final PluginService pluginService; - protected final HostMonitorInitializer monitorInitializer; + protected final MonitorService coreMonitorService; protected final TelemetryFactory telemetryFactory; protected final TelemetryCounter abortedConnectionsCounter; - public HostMonitorServiceImpl(final @NonNull PluginService pluginService) { - this( - pluginService, - (hostSpec, - properties, - failureDetectionTimeMillis, - failureDetectionIntervalMillis, - failureDetectionCount, - abortedConnectionsCounter) -> - new HostMonitorImpl( - pluginService, - hostSpec, - properties, - failureDetectionTimeMillis, - failureDetectionIntervalMillis, - failureDetectionCount, - abortedConnectionsCounter)); - } - - HostMonitorServiceImpl( - final @NonNull PluginService pluginService, - final @NonNull HostMonitorInitializer monitorInitializer) { - this.pluginService = pluginService; - this.telemetryFactory = pluginService.getTelemetryFactory(); + public HostMonitorServiceImpl(final @NonNull FullServicesContainer serviceContainer, Properties props) { + this.serviceContainer = serviceContainer; + this.coreMonitorService = serviceContainer.getMonitorService(); + this.pluginService = serviceContainer.getPluginService(); + this.telemetryFactory = serviceContainer.getTelemetryFactory(); this.abortedConnectionsCounter = telemetryFactory.createCounter("efm2.connections.aborted"); - this.monitorInitializer = monitorInitializer; + + this.coreMonitorService.registerMonitorTypeIfAbsent( + HostMonitorImpl.class, + TimeUnit.MILLISECONDS.toNanos(MONITOR_DISPOSAL_TIME_MS.getLong(props)), + TimeUnit.MINUTES.toNanos(3), + null, + null); } public static void closeAllMonitors() { - monitors.getEntries().values().forEach(monitor -> { - try { - monitor.close(); - } catch (Exception ex) { - // ignore - } - }); - monitors.clear(); + CoreServicesContainer.getInstance().getMonitorService().stopAndRemoveMonitors(HostMonitorImpl.class); } @Override @@ -113,7 +81,7 @@ public HostMonitorConnectionContext startMonitoring( final Properties properties, final int failureDetectionTimeMillis, final int failureDetectionIntervalMillis, - final int failureDetectionCount) { + final int failureDetectionCount) throws SQLException { final HostMonitor monitor = this.getMonitor( hostSpec, @@ -173,7 +141,7 @@ protected HostMonitor getMonitor( final Properties properties, final int failureDetectionTimeMillis, final int failureDetectionIntervalMillis, - final int failureDetectionCount) { + final int failureDetectionCount) throws SQLException { final String monitorKey = String.format("%d:%d:%d:%s", failureDetectionTimeMillis, @@ -181,18 +149,24 @@ protected HostMonitor getMonitor( failureDetectionCount, hostSpec.getUrl()); - final long cacheExpirationNano = TimeUnit.MILLISECONDS.toNanos( - MONITOR_DISPOSAL_TIME_MS.getLong(properties)); - - return monitors.computeIfAbsent( + return this.coreMonitorService.runIfAbsent( + HostMonitorImpl.class, monitorKey, - (key) -> monitorInitializer.createMonitor( + this.serviceContainer.getStorageService(), + this.telemetryFactory, + this.pluginService.getOriginalUrl(), + this.pluginService.getDriverProtocol(), + this.pluginService.getTargetDriverDialect(), + this.pluginService.getDialect(), + this.pluginService.getProperties(), + (connectionService, pluginService) -> new HostMonitorImpl( + connectionService, + pluginService.getTelemetryFactory(), hostSpec, properties, failureDetectionTimeMillis, failureDetectionIntervalMillis, failureDetectionCount, - this.abortedConnectionsCounter), - cacheExpirationNano); + this.abortedConnectionsCounter)); } } diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitoringConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitoringConnectionPlugin.java index 69da581b9..0e7472939 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitoringConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitoringConnectionPlugin.java @@ -36,9 +36,11 @@ import software.amazon.jdbc.PropertyDefinition; import software.amazon.jdbc.cleanup.CanReleaseResources; import software.amazon.jdbc.plugin.AbstractConnectionPlugin; +import software.amazon.jdbc.util.FullServicesContainer; import software.amazon.jdbc.util.Messages; import software.amazon.jdbc.util.RdsUrlType; import software.amazon.jdbc.util.RdsUtils; +import software.amazon.jdbc.util.WrapperUtils; /** * Monitor the server while the connection is executing methods for more sophisticated failure @@ -92,21 +94,24 @@ public class HostMonitoringConnectionPlugin extends AbstractConnectionPlugin /** * Initialize the node monitoring plugin. * - * @param pluginService A service allowing the plugin to retrieve the current active connection - * and its connection settings. - * @param properties The property set used to initialize the active connection. + * @param servicesContainer The service container for the services required by this class. + * @param properties The property set used to initialize the active connection. */ public HostMonitoringConnectionPlugin( - final @NonNull PluginService pluginService, final @NonNull Properties properties) { - this(pluginService, properties, () -> new HostMonitorServiceImpl(pluginService), new RdsUtils()); + final @NonNull FullServicesContainer servicesContainer, final @NonNull Properties properties) { + this( + servicesContainer, + properties, + () -> new HostMonitorServiceImpl(servicesContainer, properties), + new RdsUtils()); } HostMonitoringConnectionPlugin( - final @NonNull PluginService pluginService, + final @NonNull FullServicesContainer serviceContainer, final @NonNull Properties properties, final @NonNull Supplier monitorServiceSupplier, final RdsUtils rdsHelper) { - this.pluginService = pluginService; + this.pluginService = serviceContainer.getPluginService(); this.properties = properties; this.monitorServiceSupplier = monitorServiceSupplier; this.rdsHelper = rdsHelper; @@ -161,14 +166,18 @@ public T execute( final HostSpec monitoringHostSpec = this.getMonitoringHostSpec(); - monitorContext = - this.monitorService.startMonitoring( - this.pluginService.getCurrentConnection(), // abort this connection if needed - monitoringHostSpec, - this.properties, - failureDetectionTimeMillis, - failureDetectionIntervalMillis, - failureDetectionCount); + try { + monitorContext = + this.monitorService.startMonitoring( + this.pluginService.getCurrentConnection(), // abort this connection if needed + monitoringHostSpec, + this.properties, + failureDetectionTimeMillis, + failureDetectionIntervalMillis, + failureDetectionCount); + } catch (SQLException e) { + throw WrapperUtils.wrapExceptionIfNeeded(exceptionClass, e); + } result = jdbcMethodFunc.call(); @@ -192,7 +201,9 @@ private void initMonitorService() { } } - /** Call this plugin's monitor service to release all resources associated with this plugin. */ + /** + * Call this plugin's monitor service to release all resources associated with this plugin. + */ @Override public void releaseResources() { if (this.monitorService != null) { diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitoringConnectionPluginFactory.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitoringConnectionPluginFactory.java index 0dfdb79ca..b671c90b9 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitoringConnectionPluginFactory.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/efm2/HostMonitoringConnectionPluginFactory.java @@ -18,13 +18,23 @@ import java.util.Properties; import software.amazon.jdbc.ConnectionPlugin; -import software.amazon.jdbc.ConnectionPluginFactory; import software.amazon.jdbc.PluginService; +import software.amazon.jdbc.ServicesContainerPluginFactory; +import software.amazon.jdbc.util.FullServicesContainer; +import software.amazon.jdbc.util.Messages; /** Class initializing a {@link HostMonitoringConnectionPlugin}. */ -public class HostMonitoringConnectionPluginFactory implements ConnectionPluginFactory { +public class HostMonitoringConnectionPluginFactory implements ServicesContainerPluginFactory { @Override public ConnectionPlugin getInstance(final PluginService pluginService, final Properties props) { - return new HostMonitoringConnectionPlugin(pluginService, props); + throw new UnsupportedOperationException( + Messages.get( + "ServiceContainerPluginFactory.serviceContainerRequired", + new Object[] {"efm2.HostMonitoringConnectionPlugin"})); + } + + @Override + public ConnectionPlugin getInstance(final FullServicesContainer servicesContainer, final Properties props) { + return new HostMonitoringConnectionPlugin(servicesContainer, props); } } diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessConnectionPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessConnectionPlugin.java index 23a1aa8be..2bff9680c 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessConnectionPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessConnectionPlugin.java @@ -33,6 +33,7 @@ import software.amazon.jdbc.dialect.AuroraLimitlessDialect; import software.amazon.jdbc.dialect.Dialect; import software.amazon.jdbc.plugin.AbstractConnectionPlugin; +import software.amazon.jdbc.util.FullServicesContainer; import software.amazon.jdbc.util.Messages; public class LimitlessConnectionPlugin extends AbstractConnectionPlugin { @@ -82,10 +83,12 @@ public Set getSubscribedMethods() { return subscribedMethods; } - public LimitlessConnectionPlugin(final PluginService pluginService, final @NonNull Properties properties) { - this(pluginService, + public LimitlessConnectionPlugin( + final FullServicesContainer servicesContainer, + final @NonNull Properties properties) { + this(servicesContainer.getPluginService(), properties, - () -> new LimitlessRouterServiceImpl(pluginService)); + () -> new LimitlessRouterServiceImpl(servicesContainer, properties)); } public LimitlessConnectionPlugin( diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessConnectionPluginFactory.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessConnectionPluginFactory.java index 27542f9ed..d0bc8ea1f 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessConnectionPluginFactory.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessConnectionPluginFactory.java @@ -18,12 +18,21 @@ import java.util.Properties; import software.amazon.jdbc.ConnectionPlugin; -import software.amazon.jdbc.ConnectionPluginFactory; import software.amazon.jdbc.PluginService; +import software.amazon.jdbc.ServicesContainerPluginFactory; +import software.amazon.jdbc.util.FullServicesContainer; +import software.amazon.jdbc.util.Messages; -public class LimitlessConnectionPluginFactory implements ConnectionPluginFactory { +public class LimitlessConnectionPluginFactory implements ServicesContainerPluginFactory { @Override public ConnectionPlugin getInstance(final PluginService pluginService, final Properties props) { - return new LimitlessConnectionPlugin(pluginService, props); + throw new UnsupportedOperationException( + Messages.get( + "ServicesContainerPluginFactory.servicesContainerRequired", new Object[] {"LimitlessConnectionPlugin"})); + } + + @Override + public ConnectionPlugin getInstance(final FullServicesContainer servicesContainer, final Properties props) { + return new LimitlessConnectionPlugin(servicesContainer, props); } } diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterMonitor.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterMonitor.java index 9a18b9132..cd983896a 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterMonitor.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterMonitor.java @@ -20,54 +20,53 @@ import java.sql.SQLException; import java.util.List; import java.util.Properties; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; import org.checkerframework.checker.nullness.qual.NonNull; import software.amazon.jdbc.HostSpec; import software.amazon.jdbc.PluginService; -import software.amazon.jdbc.util.ExecutorFactory; import software.amazon.jdbc.util.Messages; import software.amazon.jdbc.util.PropertyUtils; import software.amazon.jdbc.util.Utils; -import software.amazon.jdbc.util.storage.SlidingExpirationCacheWithCleanupThread; +import software.amazon.jdbc.util.connection.ConnectionService; +import software.amazon.jdbc.util.monitoring.AbstractMonitor; +import software.amazon.jdbc.util.storage.StorageService; import software.amazon.jdbc.util.telemetry.TelemetryContext; import software.amazon.jdbc.util.telemetry.TelemetryFactory; import software.amazon.jdbc.util.telemetry.TelemetryTraceLevel; -public class LimitlessRouterMonitor implements AutoCloseable, Runnable { +public class LimitlessRouterMonitor extends AbstractMonitor { private static final Logger LOGGER = Logger.getLogger(LimitlessRouterMonitor.class.getName()); protected static final String MONITORING_PROPERTY_PREFIX = "limitless-router-monitor-"; + protected static final long TERMINATION_TIMEOUT_SEC = 5; protected final int intervalMs; protected final @NonNull HostSpec hostSpec; - protected final SlidingExpirationCacheWithCleanupThread> limitlessRouterCache; - protected final String limitlessRouterCacheKey; + protected final @NonNull StorageService storageService; + protected final @NonNull String limitlessRouterCacheKey; protected final @NonNull Properties props; - protected final @NonNull PluginService pluginService; - protected final LimitlessQueryHelper queryHelper; - protected final TelemetryFactory telemetryFactory; + protected final @NonNull ConnectionService connectionService; + protected final @NonNull LimitlessQueryHelper queryHelper; + protected final @NonNull TelemetryFactory telemetryFactory; protected Connection monitoringConn = null; - // TODO: remove and submit monitors to MonitorService instead - private final ExecutorService threadPool = ExecutorFactory.newFixedThreadPool(1, "threadPool"); - - private final AtomicBoolean stopped = new AtomicBoolean(false); - public LimitlessRouterMonitor( final @NonNull PluginService pluginService, + final @NonNull ConnectionService connectionService, + final @NonNull TelemetryFactory telemetryFactory, final @NonNull HostSpec hostSpec, - final @NonNull SlidingExpirationCacheWithCleanupThread> limitlessRouterCache, + final @NonNull StorageService storageService, final @NonNull String limitlessRouterCacheKey, final @NonNull Properties props, final int intervalMs) { - this.pluginService = pluginService; + super(TERMINATION_TIMEOUT_SEC); + this.connectionService = connectionService; + this.storageService = storageService; + this.telemetryFactory = telemetryFactory; this.hostSpec = hostSpec; - this.limitlessRouterCache = limitlessRouterCache; this.limitlessRouterCacheKey = limitlessRouterCacheKey; this.props = PropertyUtils.copyProperties(props); props.stringPropertyNames().stream() @@ -82,24 +81,11 @@ public LimitlessRouterMonitor( this.props.setProperty(LimitlessConnectionPlugin.WAIT_FOR_ROUTER_INFO.name, "false"); this.intervalMs = intervalMs; - this.telemetryFactory = this.pluginService.getTelemetryFactory(); - this.queryHelper = new LimitlessQueryHelper(this.pluginService); - this.threadPool.submit(this); - this.threadPool.shutdown(); // No more task are accepted by pool. - } - - public List getLimitlessRouters() { - return this.limitlessRouterCache.get(this.limitlessRouterCacheKey, - TimeUnit.MILLISECONDS.toNanos(LimitlessRouterServiceImpl.MONITOR_DISPOSAL_TIME_MS.getLong(props))); - } - - public AtomicBoolean isStopped() { - return this.stopped; + this.queryHelper = new LimitlessQueryHelper(pluginService); } @Override - public void close() throws Exception { - this.stopped.set(true); + public void close() { try { if (this.monitoringConn != null && !this.monitoringConn.isClosed()) { this.monitoringConn.close(); @@ -109,42 +95,31 @@ public void close() throws Exception { } this.monitoringConn = null; - - // Waiting for 5s gives a thread enough time to exit monitoring loop and close database connection. - if (!this.threadPool.awaitTermination(5, TimeUnit.SECONDS)) { - this.threadPool.shutdownNow(); - } - LOGGER.finest(() -> Messages.get( - "LimitlessRouterMonitor.stopped", - new Object[] {this.hostSpec.getHost()})); } @Override - public void run() { + public void monitor() { LOGGER.finest(() -> Messages.get( "LimitlessRouterMonitor.running", new Object[] {this.hostSpec.getHost()})); try { - while (!this.stopped.get()) { + while (!this.stop.get()) { TelemetryContext telemetryContext = this.telemetryFactory.openTelemetryContext( "limitless router monitor thread", TelemetryTraceLevel.TOP_LEVEL); if (telemetryContext != null) { telemetryContext.setAttribute("url", hostSpec.getUrl()); } + try { this.openConnection(); if (this.monitoringConn == null || this.monitoringConn.isClosed()) { continue; } - List newLimitlessRouters = queryHelper.queryForLimitlessRouters(this.monitoringConn, - this.hostSpec.getPort()); - - limitlessRouterCache.put( - this.limitlessRouterCacheKey, - newLimitlessRouters, - TimeUnit.MILLISECONDS.toNanos(LimitlessRouterServiceImpl.MONITOR_DISPOSAL_TIME_MS.getLong(props))); + List newLimitlessRouters = + queryHelper.queryForLimitlessRouters(this.monitoringConn, this.hostSpec.getPort()); + this.storageService.set(this.limitlessRouterCacheKey, new LimitlessRouters(newLimitlessRouters)); LOGGER.finest(Utils.logTopology(newLimitlessRouters, "[limitlessRouterMonitor] Topology:")); TimeUnit.MILLISECONDS.sleep(this.intervalMs); // do not include this in the telemetry } catch (final Exception ex) { @@ -175,7 +150,7 @@ public void run() { ex); // We want to print full trace stack of the exception. } } finally { - this.stopped.set(true); + this.stop.set(true); try { if (this.monitoringConn != null && !this.monitoringConn.isClosed()) { this.monitoringConn.close(); @@ -195,8 +170,7 @@ private void openConnection() throws SQLException { LOGGER.finest(() -> Messages.get( "LimitlessRouterMonitor.openingConnection", new Object[] {this.hostSpec.getUrl()})); - // TODO: replace with ConnectionService#open - this.monitoringConn = this.pluginService.forceConnect(this.hostSpec, this.props); + this.monitoringConn = this.connectionService.open(this.hostSpec, this.props); LOGGER.finest(() -> Messages.get( "LimitlessRouterMonitor.openedConnection", new Object[] {this.monitoringConn})); diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImpl.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImpl.java index 1c5feee1c..44f5b8a97 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImpl.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImpl.java @@ -17,9 +17,12 @@ package software.amazon.jdbc.plugin.limitless; import java.sql.SQLException; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -33,9 +36,10 @@ import software.amazon.jdbc.PropertyDefinition; import software.amazon.jdbc.RoundRobinHostSelector; import software.amazon.jdbc.hostavailability.HostAvailability; +import software.amazon.jdbc.util.FullServicesContainer; import software.amazon.jdbc.util.Messages; import software.amazon.jdbc.util.Utils; -import software.amazon.jdbc.util.storage.SlidingExpirationCacheWithCleanupThread; +import software.amazon.jdbc.util.monitoring.MonitorErrorResponse; public class LimitlessRouterServiceImpl implements LimitlessRouterService { private static final Logger LOGGER = @@ -45,69 +49,51 @@ public class LimitlessRouterServiceImpl implements LimitlessRouterService { "limitlessTransactionRouterMonitorDisposalTimeMs", "600000", // 10min "Interval in milliseconds for an Limitless router monitor to be considered inactive and to be disposed."); - protected static final long CACHE_CLEANUP_NANO = TimeUnit.MINUTES.toNanos(1); protected static final Map forceGetLimitlessRoutersLockMap = new ConcurrentHashMap<>(); + protected static final Set monitorErrorResponses = + new HashSet<>(Collections.singletonList(MonitorErrorResponse.RECREATE)); + protected final FullServicesContainer servicesContainer; protected final PluginService pluginService; protected final LimitlessQueryHelper queryHelper; - protected final LimitlessRouterMonitorInitializer limitlessRouterMonitorInitializer; - // TODO: remove and submit monitors to MonitorService instead - protected static final SlidingExpirationCacheWithCleanupThread - limitlessRouterMonitors = new SlidingExpirationCacheWithCleanupThread<>( - limitlessRouterMonitor -> true, - limitlessRouterMonitor -> { - try { - limitlessRouterMonitor.close(); - } catch (Exception e) { - LOGGER.warning(Messages.get("LimitlessRouterServiceImpl.errorClosingMonitor", - new Object[]{e.getMessage()})); - } - }, - CACHE_CLEANUP_NANO - ); - - protected static final SlidingExpirationCacheWithCleanupThread> - limitlessRouterCache = - new SlidingExpirationCacheWithCleanupThread<>( - x -> true, - x -> {}, - CACHE_CLEANUP_NANO - ); static { PropertyDefinition.registerPluginProperties(LimitlessRouterServiceImpl.class); } - public LimitlessRouterServiceImpl(final @NonNull PluginService pluginService) { - this( - pluginService, - (hostSpec, - routerCache, - routerCacheKey, - props, - intervalMs) -> - new LimitlessRouterMonitor( - pluginService, - hostSpec, - routerCache, - routerCacheKey, - props, - intervalMs), - new LimitlessQueryHelper(pluginService)); + public LimitlessRouterServiceImpl( + final @NonNull FullServicesContainer servicesContainer, + final @NonNull Properties props) { + this(servicesContainer, new LimitlessQueryHelper(servicesContainer.getPluginService()), props); } public LimitlessRouterServiceImpl( - final @NonNull PluginService pluginService, - final LimitlessRouterMonitorInitializer limitlessRouterMonitorInitializer, - final LimitlessQueryHelper queryHelper) { - this.pluginService = pluginService; - this.limitlessRouterMonitorInitializer = limitlessRouterMonitorInitializer; + final @NonNull FullServicesContainer servicesContainer, + final @NonNull LimitlessQueryHelper queryHelper, + final @NonNull Properties props) { + this.servicesContainer = servicesContainer; + this.pluginService = servicesContainer.getPluginService(); this.queryHelper = queryHelper; + + this.servicesContainer.getStorageService().registerItemClassIfAbsent( + LimitlessRouters.class, + true, + TimeUnit.MILLISECONDS.toNanos(MONITOR_DISPOSAL_TIME_MS.getLong(props)), + null, + null + ); + + this.servicesContainer.getMonitorService().registerMonitorTypeIfAbsent( + LimitlessRouterMonitor.class, + TimeUnit.MILLISECONDS.toNanos(MONITOR_DISPOSAL_TIME_MS.getLong(props)), + TimeUnit.MINUTES.toNanos(3), + monitorErrorResponses, + LimitlessRouters.class + ); } @Override public void establishConnection(final LimitlessConnectionContext context) throws SQLException { - context.setLimitlessRouters(getLimitlessRouters( - this.pluginService.getHostListProvider().getClusterId(), context.getProps())); + context.setLimitlessRouters(getLimitlessRouters(this.pluginService.getHostListProvider().getClusterId())); if (Utils.isNullOrEmpty(context.getLimitlessRouters())) { LOGGER.finest(Messages.get("LimitlessRouterServiceImpl.limitlessRouterCacheEmpty")); @@ -183,10 +169,9 @@ public void establishConnection(final LimitlessConnectionContext context) throws } } - protected List getLimitlessRouters(final String clusterId, final Properties props) { - final long cacheExpirationNano = TimeUnit.MILLISECONDS.toNanos( - MONITOR_DISPOSAL_TIME_MS.getLong(props)); - return limitlessRouterCache.get(clusterId, cacheExpirationNano); + protected List getLimitlessRouters(final String clusterId) { + LimitlessRouters routers = this.servicesContainer.getStorageService().get(LimitlessRouters.class, clusterId); + return routers == null ? null : routers.getHosts(); } private void retryConnectWithLeastLoadedRouters( @@ -292,9 +277,6 @@ protected void synchronouslyGetLimitlessRoutersWithRetry(final LimitlessConnecti protected void synchronouslyGetLimitlessRouters(final LimitlessConnectionContext context) throws SQLException { - final long cacheExpirationNano = TimeUnit.MILLISECONDS.toNanos( - MONITOR_DISPOSAL_TIME_MS.getLong(context.getProps())); - final ReentrantLock lock = forceGetLimitlessRoutersLockMap.computeIfAbsent( this.pluginService.getHostListProvider().getClusterId(), key -> new ReentrantLock() @@ -302,7 +284,7 @@ protected void synchronouslyGetLimitlessRouters(final LimitlessConnectionContext lock.lock(); try { final List limitlessRouters = - limitlessRouterCache.get(this.pluginService.getHostListProvider().getClusterId(), cacheExpirationNano); + getLimitlessRouters(this.pluginService.getHostListProvider().getClusterId()); if (!Utils.isNullOrEmpty(limitlessRouters)) { context.setLimitlessRouters(limitlessRouters); return; @@ -311,16 +293,15 @@ protected void synchronouslyGetLimitlessRouters(final LimitlessConnectionContext if (context.getConnection() == null || context.getConnection().isClosed()) { context.setConnection(context.getConnectFunc().call()); } - final List newLimitlessRouters = + final List newRouterList = this.queryHelper.queryForLimitlessRouters(context.getConnection(), context.getHostSpec().getPort()); - if (!Utils.isNullOrEmpty(newLimitlessRouters)) { - context.setLimitlessRouters(newLimitlessRouters); - limitlessRouterCache.put( + if (!Utils.isNullOrEmpty(newRouterList)) { + context.setLimitlessRouters(newRouterList); + LimitlessRouters newRouters = new LimitlessRouters(newRouterList); + this.servicesContainer.getStorageService().set( this.pluginService.getHostListProvider().getClusterId(), - newLimitlessRouters, - TimeUnit.MILLISECONDS.toNanos( - LimitlessRouterServiceImpl.MONITOR_DISPOSAL_TIME_MS.getLong(context.getProps()))); + newRouters); } else { throw new SQLException(Messages.get("LimitlessRouterServiceImpl.fetchedEmptyRouterList")); } @@ -340,18 +321,25 @@ public void startMonitoring(final @NonNull HostSpec hostSpec, try { final String limitlessRouterMonitorKey = pluginService.getHostListProvider().getClusterId(); - final long cacheExpirationNano = TimeUnit.MILLISECONDS.toNanos(MONITOR_DISPOSAL_TIME_MS.getLong(props)); - - limitlessRouterMonitors.computeIfAbsent( + this.servicesContainer.getMonitorService().runIfAbsent( + LimitlessRouterMonitor.class, limitlessRouterMonitorKey, - key -> this.limitlessRouterMonitorInitializer - .createLimitlessRouterMonitor( + this.servicesContainer.getStorageService(), + this.servicesContainer.getTelemetryFactory(), + this.pluginService.getOriginalUrl(), + this.pluginService.getDriverProtocol(), + this.pluginService.getTargetDriverDialect(), + this.pluginService.getDialect(), + props, + (connectionService, pluginService) -> new LimitlessRouterMonitor( + pluginService, + connectionService, + this.servicesContainer.getTelemetryFactory(), hostSpec, - limitlessRouterCache, + this.servicesContainer.getStorageService(), limitlessRouterMonitorKey, props, - intervalMs), - cacheExpirationNano); + intervalMs)); } catch (SQLException e) { LOGGER.warning(Messages.get("LimitlessRouterServiceImpl.errorStartingMonitor", new Object[]{e})); throw new RuntimeException(e); @@ -360,6 +348,5 @@ public void startMonitoring(final @NonNull HostSpec hostSpec, public static void clearCache() { forceGetLimitlessRoutersLockMap.clear(); - limitlessRouterCache.clear(); } } diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouters.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouters.java new file mode 100644 index 000000000..0793dbcff --- /dev/null +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/limitless/LimitlessRouters.java @@ -0,0 +1,54 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.jdbc.plugin.limitless; + +import java.util.List; +import java.util.Objects; +import org.checkerframework.checker.nullness.qual.NonNull; +import software.amazon.jdbc.HostSpec; +import software.amazon.jdbc.hostlistprovider.Topology; + +public class LimitlessRouters { + private final @NonNull List hosts; + + public LimitlessRouters(@NonNull List hosts) { + this.hosts = hosts; + } + + public @NonNull List getHosts() { + return hosts; + } + + @Override + public int hashCode() { + return Objects.hash(hosts); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + LimitlessRouters other = (LimitlessRouters) obj; + return Objects.equals(hosts, other.hosts); + } +} diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/FastestResponseStrategyPlugin.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/FastestResponseStrategyPlugin.java index 6e7f39312..9c5377a79 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/FastestResponseStrategyPlugin.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/FastestResponseStrategyPlugin.java @@ -39,6 +39,7 @@ import software.amazon.jdbc.PropertyDefinition; import software.amazon.jdbc.RandomHostSelector; import software.amazon.jdbc.plugin.AbstractConnectionPlugin; +import software.amazon.jdbc.util.FullServicesContainer; import software.amazon.jdbc.util.storage.CacheMap; public class FastestResponseStrategyPlugin extends AbstractConnectionPlugin { @@ -80,17 +81,19 @@ public class FastestResponseStrategyPlugin extends AbstractConnectionPlugin { PropertyDefinition.registerPluginProperties("frt-"); } - public FastestResponseStrategyPlugin(final PluginService pluginService, final @NonNull Properties properties) { - this(pluginService, + public FastestResponseStrategyPlugin( + final FullServicesContainer servicesContainer, + final @NonNull Properties properties) { + this(servicesContainer.getPluginService(), properties, new HostResponseTimeServiceImpl( - pluginService, + servicesContainer, properties, RESPONSE_MEASUREMENT_INTERVAL_MILLIS.getInteger(properties))); } public FastestResponseStrategyPlugin( - final PluginService pluginService, + final @NonNull PluginService pluginService, final @NonNull Properties properties, final @NonNull HostResponseTimeService hostResponseTimeService) { diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/FastestResponseStrategyPluginFactory.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/FastestResponseStrategyPluginFactory.java index 87a1d766b..dd2e54e17 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/FastestResponseStrategyPluginFactory.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/FastestResponseStrategyPluginFactory.java @@ -18,13 +18,22 @@ import java.util.Properties; import software.amazon.jdbc.ConnectionPlugin; -import software.amazon.jdbc.ConnectionPluginFactory; import software.amazon.jdbc.PluginService; +import software.amazon.jdbc.ServicesContainerPluginFactory; +import software.amazon.jdbc.util.FullServicesContainer; +import software.amazon.jdbc.util.Messages; -public class FastestResponseStrategyPluginFactory implements ConnectionPluginFactory { - +public class FastestResponseStrategyPluginFactory implements ServicesContainerPluginFactory { @Override public ConnectionPlugin getInstance(final PluginService pluginService, final Properties props) { - return new FastestResponseStrategyPlugin(pluginService, props); + throw new UnsupportedOperationException( + Messages.get( + "ServicesContainerPluginFactory.servicesContainerRequired", + new Object[] {"FastestResponseStrategyPlugin"})); + } + + @Override + public ConnectionPlugin getInstance(final FullServicesContainer servicesContainer, final Properties props) { + return new FastestResponseStrategyPlugin(servicesContainer, props); } } diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/HostResponseTimeServiceImpl.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/HostResponseTimeServiceImpl.java index 4a7567c81..623e9b2e0 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/HostResponseTimeServiceImpl.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/HostResponseTimeServiceImpl.java @@ -16,72 +16,47 @@ package software.amazon.jdbc.plugin.strategy.fastestresponse; +import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Logger; import java.util.stream.Collectors; import org.checkerframework.checker.nullness.qual.NonNull; import software.amazon.jdbc.HostSpec; import software.amazon.jdbc.PluginService; +import software.amazon.jdbc.util.FullServicesContainer; +import software.amazon.jdbc.util.Messages; import software.amazon.jdbc.util.storage.SlidingExpirationCacheWithCleanupThread; -import software.amazon.jdbc.util.telemetry.TelemetryFactory; -import software.amazon.jdbc.util.telemetry.TelemetryGauge; public class HostResponseTimeServiceImpl implements HostResponseTimeService { private static final Logger LOGGER = Logger.getLogger(HostResponseTimeServiceImpl.class.getName()); - protected static final long CACHE_EXPIRATION_NANO = TimeUnit.MINUTES.toNanos(10); - protected static final long CACHE_CLEANUP_NANO = TimeUnit.MINUTES.toNanos(1); - - // TODO: remove and submit monitors to MonitorService instead - protected static final SlidingExpirationCacheWithCleanupThread monitoringNodes - = new SlidingExpirationCacheWithCleanupThread<>( - (monitor) -> true, - (monitor) -> { - try { - monitor.close(); - } catch (Exception ex) { - // ignore - } - }, - CACHE_CLEANUP_NANO); - protected static final ReentrantLock cacheLock = new ReentrantLock(); - protected int intervalMs; - protected List hosts = new ArrayList<>(); + protected final @NonNull FullServicesContainer servicesContainer; protected final @NonNull PluginService pluginService; protected final @NonNull Properties props; - protected final TelemetryFactory telemetryFactory; - private final TelemetryGauge nodeCountGauge; - public HostResponseTimeServiceImpl( - final @NonNull PluginService pluginService, + final @NonNull FullServicesContainer servicesContainer, final @NonNull Properties props, int intervalMs) { - - this.pluginService = pluginService; + this.servicesContainer = servicesContainer; + this.pluginService = servicesContainer.getPluginService(); this.props = props; this.intervalMs = intervalMs; - this.telemetryFactory = this.pluginService.getTelemetryFactory(); - this.nodeCountGauge = this.telemetryFactory.createGauge("frt.nodes.count", - () -> (long) monitoringNodes.size()); - - monitoringNodes.setCleanupIntervalNanos(CACHE_CLEANUP_NANO); } @Override public int getResponseTime(HostSpec hostSpec) { - final NodeResponseTimeMonitor monitor = monitoringNodes.get(hostSpec.getUrl(), CACHE_EXPIRATION_NANO); + final NodeResponseTimeMonitor monitor = + this.servicesContainer.getMonitorService().get(NodeResponseTimeMonitor.class, hostSpec.getUrl()); if (monitor == null) { return Integer.MAX_VALUE; } @@ -99,26 +74,24 @@ public void setHosts(final @NonNull List hosts) { // hostSpec is not in the set of hosts that already being monitored .filter(hostSpec -> !oldHosts.contains(hostSpec.getUrl())) .forEach(hostSpec -> { - cacheLock.lock(); try { - monitoringNodes.computeIfAbsent( + this.servicesContainer.getMonitorService().runIfAbsent( + NodeResponseTimeMonitor.class, hostSpec.getUrl(), - (key) -> new NodeResponseTimeMonitor(this.pluginService, hostSpec, this.props, this.intervalMs), - CACHE_EXPIRATION_NANO); - } finally { - cacheLock.unlock(); + servicesContainer.getStorageService(), + servicesContainer.getTelemetryFactory(), + this.pluginService.getOriginalUrl(), + this.pluginService.getDriverProtocol(), + this.pluginService.getTargetDriverDialect(), + this.pluginService.getDialect(), + this.props, + (connectionService, pluginService) -> + new NodeResponseTimeMonitor(pluginService, connectionService, hostSpec, this.props, + this.intervalMs)); + } catch (SQLException e) { + LOGGER.warning( + Messages.get("HostResponseTimeServiceImpl.errorStartingMonitor", new Object[] {hostSpec.getUrl(), e})); } }); } - - public static void closeAllMonitors() { - monitoringNodes.getEntries().values().forEach(monitor -> { - try { - monitor.close(); - } catch (Exception ex) { - // ignore - } - }); - monitoringNodes.clear(); - } } diff --git a/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/NodeResponseTimeMonitor.java b/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/NodeResponseTimeMonitor.java index a1758f7f0..1b985c03f 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/NodeResponseTimeMonitor.java +++ b/wrapper/src/main/java/software/amazon/jdbc/plugin/strategy/fastestresponse/NodeResponseTimeMonitor.java @@ -19,8 +19,6 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -30,21 +28,23 @@ import org.checkerframework.checker.nullness.qual.NonNull; import software.amazon.jdbc.HostSpec; import software.amazon.jdbc.PluginService; -import software.amazon.jdbc.util.ExecutorFactory; import software.amazon.jdbc.util.Messages; import software.amazon.jdbc.util.PropertyUtils; import software.amazon.jdbc.util.StringUtils; +import software.amazon.jdbc.util.connection.ConnectionService; +import software.amazon.jdbc.util.monitoring.AbstractMonitor; import software.amazon.jdbc.util.telemetry.TelemetryContext; import software.amazon.jdbc.util.telemetry.TelemetryFactory; import software.amazon.jdbc.util.telemetry.TelemetryGauge; import software.amazon.jdbc.util.telemetry.TelemetryTraceLevel; -public class NodeResponseTimeMonitor implements AutoCloseable, Runnable { +public class NodeResponseTimeMonitor extends AbstractMonitor { private static final Logger LOGGER = Logger.getLogger(NodeResponseTimeMonitor.class.getName()); private static final String MONITORING_PROPERTY_PREFIX = "frt-"; + private static final int TERMINATION_TIMEOUT_SEC = 5; private static final int NUM_OF_MEASURES = 5; private final int intervalMs; @@ -56,24 +56,23 @@ public class NodeResponseTimeMonitor implements AutoCloseable, Runnable { private final @NonNull Properties props; private final @NonNull PluginService pluginService; + private final @NonNull ConnectionService connectionService; private final TelemetryFactory telemetryFactory; private final TelemetryGauge responseTimeMsGauge; - private Connection monitoringConn = null; - // TODO: remove and submit monitors to MonitorService instead - private final ExecutorService threadPool = - ExecutorFactory.newFixedThreadPool(1, "threadPool"); - public NodeResponseTimeMonitor( final @NonNull PluginService pluginService, + final @NonNull ConnectionService connectionService, final @NonNull HostSpec hostSpec, final @NonNull Properties props, int intervalMs) { + super(TERMINATION_TIMEOUT_SEC); this.pluginService = pluginService; + this.connectionService = connectionService; this.hostSpec = hostSpec; this.props = props; this.intervalMs = intervalMs; @@ -85,12 +84,9 @@ public NodeResponseTimeMonitor( // Report current response time (in milliseconds) to telemetry engine. // Report -1 if response time couldn't be measured. - this.responseTimeMsGauge = this.telemetryFactory.createGauge( + this.responseTimeMsGauge = telemetryFactory.createGauge( String.format("frt.response.time.%s", nodeId), () -> this.responseTime.get() == Integer.MAX_VALUE ? -1 : (long) this.responseTime.get()); - - this.threadPool.submit(this); - this.threadPool.shutdown(); // No more task are accepted by pool. } // Return node response time in milliseconds. @@ -98,35 +94,19 @@ public int getResponseTime() { return this.responseTime.get(); } - public long getCheckTimestamp() { - return this.checkTimestamp.get(); - } - + @NonNull public HostSpec getHostSpec() { return this.hostSpec; } - @Override - public void close() throws Exception { - this.stopped.set(true); - - // Waiting for 5s gives a thread enough time to exit monitoring loop and close database connection. - if (!this.threadPool.awaitTermination(5, TimeUnit.SECONDS)) { - this.threadPool.shutdownNow(); - } - LOGGER.finest(() -> Messages.get( - "NodeResponseTimeMonitor.stopped", - new Object[] {this.hostSpec.getHost()})); - } - // The method is for testing purposes. protected long getCurrentTime() { return System.nanoTime(); } @Override - public void run() { - TelemetryContext telemetryContext = this.telemetryFactory.openTelemetryContext( + public void monitor() { + TelemetryContext telemetryContext = telemetryFactory.openTelemetryContext( "node response time thread", TelemetryTraceLevel.TOP_LEVEL); if (telemetryContext != null) { telemetryContext.setAttribute("url", hostSpec.getUrl()); @@ -134,6 +114,7 @@ public void run() { try { while (!this.stopped.get()) { + this.lastActivityTimestampNanos.set(System.nanoTime()); this.openConnection(); if (this.monitoringConn != null) { @@ -216,8 +197,7 @@ private void openConnection() { LOGGER.finest(() -> Messages.get( "NodeResponseTimeMonitor.openingConnection", new Object[] {this.hostSpec.getUrl()})); - // TODO: replace with ConnectionService#open - this.monitoringConn = this.pluginService.forceConnect(this.hostSpec, monitoringConnProperties); + this.monitoringConn = this.connectionService.open(this.hostSpec, monitoringConnProperties); LOGGER.finest(() -> Messages.get( "NodeResponseTimeMonitor.openedConnection", new Object[] {this.monitoringConn})); @@ -233,4 +213,15 @@ private void openConnection() { } } } + + @Override + public void close() { + if (this.monitoringConn != null) { + try { + this.monitoringConn.close(); + } catch (SQLException e) { + // ignore + } + } + } } diff --git a/wrapper/src/main/java/software/amazon/jdbc/util/monitoring/MonitorServiceImpl.java b/wrapper/src/main/java/software/amazon/jdbc/util/monitoring/MonitorServiceImpl.java index b6804f9a1..227b29587 100644 --- a/wrapper/src/main/java/software/amazon/jdbc/util/monitoring/MonitorServiceImpl.java +++ b/wrapper/src/main/java/software/amazon/jdbc/util/monitoring/MonitorServiceImpl.java @@ -37,6 +37,7 @@ import software.amazon.jdbc.hostlistprovider.Topology; import software.amazon.jdbc.hostlistprovider.monitoring.ClusterTopologyMonitorImpl; import software.amazon.jdbc.hostlistprovider.monitoring.MultiAzClusterTopologyMonitorImpl; +import software.amazon.jdbc.plugin.strategy.fastestresponse.NodeResponseTimeMonitor; import software.amazon.jdbc.targetdriverdialect.TargetDriverDialect; import software.amazon.jdbc.util.ExecutorFactory; import software.amazon.jdbc.util.Messages; @@ -64,6 +65,7 @@ public class MonitorServiceImpl implements MonitorService, EventSubscriber { suppliers.put(ClusterTopologyMonitorImpl.class, () -> new CacheContainer(defaultSettings, Topology.class)); suppliers.put(MultiAzClusterTopologyMonitorImpl.class, () -> new CacheContainer(defaultSettings, Topology.class)); + suppliers.put(NodeResponseTimeMonitor.class, () -> new CacheContainer(defaultSettings, null)); defaultSuppliers = Collections.unmodifiableMap(suppliers); } diff --git a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties index f9cd14d48..6bb12413d 100644 --- a/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties +++ b/wrapper/src/main/resources/aws_advanced_jdbc_wrapper_messages.properties @@ -213,10 +213,8 @@ LimitlessRouterMonitor.getNetworkTimeoutError=An error occurred while getting th LimitlessRouterMonitor.openingConnection=Opening Limitless Router Monitor connection to ''{0}''. LimitlessRouterMonitor.openedConnection=Opened Limitless Router Monitor connection: {0}. LimitlessRouterMonitor.running=Limitless Router Monitor thread running on node {0}. -LimitlessRouterMonitor.stopped=Limitless Router Monitor thread stopped on node {0}. LimitlessRouterServiceImpl.connectWithHost=Connecting to host {0}. -LimitlessRouterServiceImpl.errorClosingMonitor=An error occurred while closing Limitless Router Monitor: {0} LimitlessRouterServiceImpl.errorStartingMonitor=An error occurred while starting Limitless Router Monitor: {0} LimitlessRouterServiceImpl.failedToConnectToHost=Failed to connect to host {0}. LimitlessRouterServiceImpl.fetchedEmptyRouterList=Empty router list was fetched. @@ -255,6 +253,8 @@ HostMonitorImpl.stopMonitoringThread=Stop monitoring thread for {0}. HostMonitorServiceImpl.emptyAliasSet=Empty alias set passed for ''{0}''. Set should not be empty. +HostResponseTimeServiceImpl.errorStartingMonitor=An error occurred while starting a response time monitor for ''{0}'': {1} + MonitorServiceImpl.checkingMonitors=Checking monitors for errors... MonitorServiceImpl.monitorClassMismatch=The monitor stored at ''{0}'' did not have the expected type. The expected type was ''{1}'', but the monitor ''{2}'' had a type of ''{3}''. MonitorServiceImpl.monitorStuck=Monitor ''{0}'' has not been updated within the inactive timeout of {1} milliseconds. The monitor will be stopped. diff --git a/wrapper/src/test/build.gradle.kts b/wrapper/src/test/build.gradle.kts index 8db494043..c1cb327a7 100644 --- a/wrapper/src/test/build.gradle.kts +++ b/wrapper/src/test/build.gradle.kts @@ -41,9 +41,9 @@ dependencies { testImplementation("com.zaxxer:HikariCP:4.0.3") // Version 4.+ is compatible with Java 8 testImplementation("org.springframework.boot:spring-boot-starter-jdbc:2.7.13") // 2.7.13 is the last version compatible with Java 8 testImplementation("org.mockito:mockito-inline:4.11.0") // 4.11.0 is the last version compatible with Java 8 - testImplementation("software.amazon.awssdk:ec2:2.29.34") - testImplementation("software.amazon.awssdk:rds:2.29.34") - testImplementation("software.amazon.awssdk:sts:2.29.34") + testImplementation("software.amazon.awssdk:ec2:2.32.21") + testImplementation("software.amazon.awssdk:rds:2.32.21") + testImplementation("software.amazon.awssdk:sts:2.32.21") // Note: all org.testcontainers dependencies should have the same version testImplementation("org.testcontainers:testcontainers:1.20.4") testImplementation("org.testcontainers:mysql:1.20.4") diff --git a/wrapper/src/test/java/integration/container/ProxyHelper.java b/wrapper/src/test/java/integration/container/ProxyHelper.java index 76fbd3e68..212ed1a9d 100644 --- a/wrapper/src/test/java/integration/container/ProxyHelper.java +++ b/wrapper/src/test/java/integration/container/ProxyHelper.java @@ -96,4 +96,41 @@ private static void enableConnectivity(Proxy proxy) { } LOGGER.finest("Enabled connectivity to " + proxy.getName()); } + + public static void setLatency(String instanceName, int latencyMs) { + Proxy proxy = TestEnvironment.getCurrent().getProxy(instanceName); + if (proxy == null) { + throw new RuntimeException("Proxy for instance " + instanceName + " not found."); + } + + try { + proxy.toxics().latency("latency", ToxicDirection.DOWNSTREAM, latencyMs); + } catch (IOException ex) { + LOGGER.finest("Error setting latency for '" + instanceName + "': " + ex.getMessage()); + } + LOGGER.finest("Set latency for '" + instanceName + "' to " + latencyMs); + } + + public static void clearAllLatencies() { + for (Proxy proxy : TestEnvironment.getCurrent().getProxies()) { + clearLatencies(proxy); + } + } + + public static void clearLatencies(Proxy proxy) { + try { + proxy.toxics().getAll().stream() + .filter(t -> "latency".equals(t.getName())) + .forEach(toxic -> { + try { + toxic.remove(); + } catch (IOException e) { + // ignore + } + }); + } catch (IOException ex) { + LOGGER.finest("Error clearing latencies for '" + proxy.getName() + "': " + ex.getMessage()); + } + LOGGER.finest("Cleared latencies for '" + proxy.getName() + "'."); + } } diff --git a/wrapper/src/test/java/integration/container/TestDriverProvider.java b/wrapper/src/test/java/integration/container/TestDriverProvider.java index 8428ab7c6..817d0a1b0 100644 --- a/wrapper/src/test/java/integration/container/TestDriverProvider.java +++ b/wrapper/src/test/java/integration/container/TestDriverProvider.java @@ -131,6 +131,7 @@ public void beforeEach(ExtensionContext context) throws Exception { .contains(TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED)) { // Enable all proxies ProxyHelper.enableAllConnectivity(); + ProxyHelper.clearAllLatencies(); } boolean makeSureFirstInstanceWriter = diff --git a/wrapper/src/test/java/integration/container/tests/AutoscalingTests.java b/wrapper/src/test/java/integration/container/tests/AutoscalingTests.java index b39a030a7..307e00bb9 100644 --- a/wrapper/src/test/java/integration/container/tests/AutoscalingTests.java +++ b/wrapper/src/test/java/integration/container/tests/AutoscalingTests.java @@ -30,7 +30,6 @@ import integration.container.ConnectionStringHelper; import integration.container.TestDriverProvider; import integration.container.TestEnvironment; -import integration.container.condition.DisableOnTestFeature; import integration.container.condition.EnableOnDatabaseEngineDeployment; import integration.container.condition.EnableOnNumOfInstances; import integration.container.condition.EnableOnTestFeature; @@ -63,7 +62,7 @@ @EnableOnDatabaseEngineDeployment({DatabaseEngineDeployment.AURORA}) @EnableOnNumOfInstances(min = 5) @MakeSureFirstInstanceWriter -@Order(17) +@Order(19) public class AutoscalingTests { protected static final AuroraTestUtility auroraUtil = AuroraTestUtility.getUtility(); diff --git a/wrapper/src/test/java/integration/container/tests/BlueGreenDeploymentTests.java b/wrapper/src/test/java/integration/container/tests/BlueGreenDeploymentTests.java index 2f917a91f..0f3911158 100644 --- a/wrapper/src/test/java/integration/container/tests/BlueGreenDeploymentTests.java +++ b/wrapper/src/test/java/integration/container/tests/BlueGreenDeploymentTests.java @@ -93,7 +93,7 @@ @DisableOnTestFeature(TestEnvironmentFeatures.RUN_DB_METRICS_ONLY) @EnableOnDatabaseEngineDeployment({DatabaseEngineDeployment.RDS_MULTI_AZ_INSTANCE, DatabaseEngineDeployment.AURORA}) @EnableOnDatabaseEngine({DatabaseEngine.MYSQL, DatabaseEngine.PG}) -@Order(18) +@Order(20) public class BlueGreenDeploymentTests { private static final Logger LOGGER = Logger.getLogger(BlueGreenDeploymentTests.class.getName()); diff --git a/wrapper/src/test/java/integration/container/tests/EFM2Test.java b/wrapper/src/test/java/integration/container/tests/EFM2Test.java new file mode 100644 index 000000000..87999c28f --- /dev/null +++ b/wrapper/src/test/java/integration/container/tests/EFM2Test.java @@ -0,0 +1,147 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package integration.container.tests; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import integration.DatabaseEngine; +import integration.DatabaseEngineDeployment; +import integration.TestEnvironmentFeatures; +import integration.container.ConnectionStringHelper; +import integration.container.TestDriver; +import integration.container.TestDriverProvider; +import integration.container.TestEnvironment; +import integration.container.condition.DisableOnTestDriver; +import integration.container.condition.DisableOnTestFeature; +import integration.container.condition.EnableOnDatabaseEngineDeployment; +import integration.container.condition.EnableOnTestFeature; +import integration.util.AuroraTestUtility; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import software.amazon.jdbc.PropertyDefinition; +import software.amazon.jdbc.plugin.efm.HostMonitoringConnectionPlugin; + +@TestMethodOrder(MethodOrderer.MethodName.class) +@ExtendWith(TestDriverProvider.class) +@EnableOnDatabaseEngineDeployment({ + DatabaseEngineDeployment.AURORA, + DatabaseEngineDeployment.RDS, + DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER, + DatabaseEngineDeployment.RDS_MULTI_AZ_INSTANCE, +}) +@DisableOnTestFeature({ + TestEnvironmentFeatures.PERFORMANCE, + TestEnvironmentFeatures.RUN_HIBERNATE_TESTS_ONLY, + TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY, + TestEnvironmentFeatures.BLUE_GREEN_DEPLOYMENT, + TestEnvironmentFeatures.RUN_DB_METRICS_ONLY}) +@Order(17) +public class EFM2Test { + private static final Logger LOGGER = Logger.getLogger(ReadWriteSplittingTests.class.getName()); + protected static final AuroraTestUtility auroraUtil = AuroraTestUtility.getUtility(); + protected ExecutorService executor = Executors.newFixedThreadPool(1, r -> { + final Thread thread = new Thread(r); + thread.setDaemon(true); + return thread; + }); + + @BeforeEach + public void setUpEach() { + this.executor = Executors.newFixedThreadPool(1, r -> { + final Thread thread = new Thread(r); + thread.setDaemon(true); + return thread; + }); + } + + @AfterEach + public void afterEach() { + this.executor.shutdownNow(); + } + + @TestTemplate + @ExtendWith(TestDriverProvider.class) + // TODO: test fails because EFM monitor's isValid call is freezing for MARIADB, investigate why + @DisableOnTestDriver(TestDriver.MARIADB) + @EnableOnTestFeature(TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED) + public void test_efmNetworkFailureDetection() throws SQLException { + int failureDelayMs = 10000; + int maxDurationMs = 30000; + + final Properties props = ConnectionStringHelper.getDefaultProperties(); + props.setProperty(PropertyDefinition.CONNECT_TIMEOUT.name, "10000"); + props.setProperty(PropertyDefinition.SOCKET_TIMEOUT.name, String.valueOf(maxDurationMs)); + props.setProperty(PropertyDefinition.PLUGINS.name, "efm2"); + props.setProperty(HostMonitoringConnectionPlugin.FAILURE_DETECTION_TIME.name, "5000"); + props.setProperty(HostMonitoringConnectionPlugin.FAILURE_DETECTION_COUNT.name, "1"); + + String url = ConnectionStringHelper.getProxyWrapperUrl(); + String instanceId = TestEnvironment.getCurrent() + .getInfo() + .getProxyDatabaseInfo() + .getInstances() + .get(0) + .getInstanceId(); + try (final Connection conn = DriverManager.getConnection(url, props)) { + Statement stmt = conn.createStatement(); + + // Simulate network failure in the middle of the query. The simulated failure occurs after a small delay to allow + // time for the statement to be sent and the monitoring connection to be opened. + auroraUtil.simulateTemporaryFailure(executor, instanceId, failureDelayMs, maxDurationMs); + long startNs = System.nanoTime(); + try { + stmt.executeQuery(getSleepSql(TimeUnit.MILLISECONDS.toSeconds(maxDurationMs))); + fail("Sleep query should have failed"); + } catch (SQLException e) { + long endNs = System.nanoTime(); + long durationMs = TimeUnit.NANOSECONDS.toMillis(endNs - startNs); + // EFM should detect network failure and abort the connection ~5-10 seconds after the query is sent + assertTrue(durationMs > failureDelayMs && durationMs < maxDurationMs, + String.format("Time before failure was not between %d and %d seconds, actual duration was %d seconds.", + failureDelayMs, maxDurationMs, durationMs)); + } + } + } + + private String getSleepSql(final long seconds) { + final DatabaseEngine databaseEngine = TestEnvironment.getCurrent().getInfo().getRequest().getDatabaseEngine(); + switch (databaseEngine) { + case PG: + return String.format("SELECT pg_sleep(%d)", seconds); + case MYSQL: + case MARIADB: + return String.format("SELECT sleep(%d)", seconds); + default: + throw new UnsupportedOperationException(databaseEngine.name()); + } + } +} diff --git a/wrapper/src/test/java/integration/container/tests/FailoverTest.java b/wrapper/src/test/java/integration/container/tests/FailoverTest.java index 400f35c41..1c9359fd5 100644 --- a/wrapper/src/test/java/integration/container/tests/FailoverTest.java +++ b/wrapper/src/test/java/integration/container/tests/FailoverTest.java @@ -70,6 +70,9 @@ @TestMethodOrder(MethodOrderer.MethodName.class) @ExtendWith(TestDriverProvider.class) @EnableOnTestFeature(TestEnvironmentFeatures.FAILOVER_SUPPORTED) +@EnableOnDatabaseEngineDeployment({ + DatabaseEngineDeployment.AURORA, + DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER}) @DisableOnTestFeature({ TestEnvironmentFeatures.PERFORMANCE, TestEnvironmentFeatures.RUN_HIBERNATE_TESTS_ONLY, @@ -567,6 +570,7 @@ public void test_failFromWriterWhereKeepSessionStateOnFailoverIsTrue() throws SQ @TestTemplate @EnableOnTestFeature(TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED) + @EnableOnNumOfInstances(min = 2) // Multi-AZ tests already simulate this in other tests instead of sending server failover requests. @EnableOnDatabaseEngineDeployment(DatabaseEngineDeployment.AURORA) public void test_writerFailover_writerReelected() throws SQLException { @@ -647,6 +651,7 @@ public void test_readerFailover_strictReader() throws SQLException { } @TestTemplate + @EnableOnNumOfInstances(min = 2) @EnableOnTestFeature(TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED) public void test_readerFailover_writerReelected() throws SQLException { final String initialWriterId = this.currentWriter; diff --git a/wrapper/src/test/java/integration/container/tests/FastestResponseStrategyTest.java b/wrapper/src/test/java/integration/container/tests/FastestResponseStrategyTest.java new file mode 100644 index 000000000..5efb9f07c --- /dev/null +++ b/wrapper/src/test/java/integration/container/tests/FastestResponseStrategyTest.java @@ -0,0 +1,105 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package integration.container.tests; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import integration.DatabaseEngineDeployment; +import integration.TestEnvironmentFeatures; +import integration.TestInstanceInfo; +import integration.container.ConnectionStringHelper; +import integration.container.ProxyHelper; +import integration.container.TestDriverProvider; +import integration.container.TestEnvironment; +import integration.container.condition.DisableOnTestFeature; +import integration.container.condition.EnableOnDatabaseEngineDeployment; +import integration.container.condition.EnableOnNumOfInstances; +import integration.container.condition.EnableOnTestFeature; +import integration.container.condition.MakeSureFirstInstanceWriter; +import integration.util.AuroraTestUtility; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import software.amazon.jdbc.PropertyDefinition; +import software.amazon.jdbc.hostlistprovider.RdsHostListProvider; +import software.amazon.jdbc.plugin.readwritesplitting.ReadWriteSplittingPlugin; + +@TestMethodOrder(MethodOrderer.MethodName.class) +@ExtendWith(TestDriverProvider.class) +@MakeSureFirstInstanceWriter +@EnableOnDatabaseEngineDeployment({ + DatabaseEngineDeployment.AURORA, + DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER}) +@DisableOnTestFeature({ + TestEnvironmentFeatures.PERFORMANCE, + TestEnvironmentFeatures.RUN_HIBERNATE_TESTS_ONLY, + TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY, + TestEnvironmentFeatures.BLUE_GREEN_DEPLOYMENT, + TestEnvironmentFeatures.RUN_DB_METRICS_ONLY}) +@Order(18) +public class FastestResponseStrategyTest { + protected static final AuroraTestUtility auroraUtil = AuroraTestUtility.getUtility(); + + @TestTemplate + @ExtendWith(TestDriverProvider.class) + @EnableOnNumOfInstances(min = 3) + @EnableOnTestFeature(TestEnvironmentFeatures.NETWORK_OUTAGES_ENABLED) + public void test_fastestHostAlwaysSelected() throws SQLException, InterruptedException { + final Properties props = ConnectionStringHelper.getDefaultProperties(); + props.setProperty(PropertyDefinition.CONNECT_TIMEOUT.name, "10000"); + props.setProperty(PropertyDefinition.SOCKET_TIMEOUT.name, "10000"); + RdsHostListProvider.CLUSTER_INSTANCE_HOST_PATTERN.set(props, + "?." + TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo().getInstanceEndpointSuffix() + + ":" + TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo().getInstanceEndpointPort()); + props.setProperty(PropertyDefinition.PLUGINS.name, "readWriteSplitting,fastestResponseStrategy"); + props.setProperty(ReadWriteSplittingPlugin.READER_HOST_SELECTOR_STRATEGY.name, "fastestResponse"); + + String url = ConnectionStringHelper.getProxyWrapperUrl(); + List instances = TestEnvironment.getCurrent().getInfo().getProxyDatabaseInfo().getInstances(); + assertTrue(instances.size() >= 3); + // The writer is stored at index 0, so we'll choose the reader at index 1 to be the fast one. + TestInstanceInfo fastestReader = instances.get(1); + // Add latencies to all readers except for the first reader in the list. + for (int i = 2; i < instances.size(); i++) { + ProxyHelper.setLatency(instances.get(i).getInstanceId(), 500); + } + + // Verify that the fastest reader is always selected. + for (int i = 0; i < 10; i++) { + try (final Connection conn = DriverManager.getConnection(url, props)) { + if (i == 0) { + // Give the monitors some time to determine the response time of each host. + TimeUnit.SECONDS.sleep(20); + } + + conn.setReadOnly(true); + String instanceId = auroraUtil.queryInstanceId(conn); + assertEquals(fastestReader.getInstanceId(), instanceId); + } + } + } +} + diff --git a/wrapper/src/test/java/integration/container/tests/hibernate/HibernateTests.java b/wrapper/src/test/java/integration/container/tests/hibernate/HibernateTests.java index 03969d80e..01d263f6b 100644 --- a/wrapper/src/test/java/integration/container/tests/hibernate/HibernateTests.java +++ b/wrapper/src/test/java/integration/container/tests/hibernate/HibernateTests.java @@ -60,7 +60,7 @@ TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY, TestEnvironmentFeatures.RUN_DB_METRICS_ONLY}) @MakeSureFirstInstanceWriter -@Order(19) +@Order(21) public class HibernateTests { private static final Logger LOGGER = Logger.getLogger(HibernateTests.class.getName()); diff --git a/wrapper/src/test/java/integration/container/tests/metrics/DatabasePerformanceMetricTest.java b/wrapper/src/test/java/integration/container/tests/metrics/DatabasePerformanceMetricTest.java index d9d2dc70f..c698834de 100644 --- a/wrapper/src/test/java/integration/container/tests/metrics/DatabasePerformanceMetricTest.java +++ b/wrapper/src/test/java/integration/container/tests/metrics/DatabasePerformanceMetricTest.java @@ -96,7 +96,7 @@ @EnableOnDatabaseEngineDeployment({RDS_MULTI_AZ_CLUSTER, AURORA}) @EnableOnDatabaseEngine({DatabaseEngine.MYSQL, DatabaseEngine.PG}) @EnableOnNumOfInstances(min = 2) -@Order(19) +@Order(22) public class DatabasePerformanceMetricTest { private static final Logger LOGGER = Logger.getLogger(DatabasePerformanceMetricTest.class.getName()); diff --git a/wrapper/src/test/java/integration/util/AuroraTestUtility.java b/wrapper/src/test/java/integration/util/AuroraTestUtility.java index e43a3debb..b7c1a5172 100644 --- a/wrapper/src/test/java/integration/util/AuroraTestUtility.java +++ b/wrapper/src/test/java/integration/util/AuroraTestUtility.java @@ -40,7 +40,6 @@ import java.net.URL; import java.net.UnknownHostException; import java.sql.Connection; -import java.sql.DatabaseMetaData; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; @@ -1456,10 +1455,19 @@ public void crashInstance(ExecutorService executor, String instanceId) { } public void simulateTemporaryFailure(ExecutorService executor, String instanceName) { + simulateTemporaryFailure(executor, instanceName, 0, 5000); + } + + public void simulateTemporaryFailure( + ExecutorService executor, String instanceName, int delayMs, int failureDurationMs) { executor.submit(() -> { try { + if (delayMs > 0) { + TimeUnit.MILLISECONDS.sleep(delayMs); + } + ProxyHelper.disableConnectivity(instanceName); - TimeUnit.SECONDS.sleep(5); + TimeUnit.MILLISECONDS.sleep(failureDurationMs); ProxyHelper.enableConnectivity(instanceName); } catch (InterruptedException e) { fail("The disable connectivity thread was unexpectedly interrupted."); @@ -1797,7 +1805,6 @@ protected String getInstanceIdSql(DatabaseEngine databaseEngine, DatabaseEngineD throw new UnsupportedOperationException(databaseEngine.toString()); } case RDS_MULTI_AZ_CLUSTER: - case RDS_MULTI_AZ_INSTANCE: switch (databaseEngine) { case MYSQL: return "SELECT SUBSTRING_INDEX(endpoint, '.', 1) as id FROM mysql.rds_topology WHERE id=@@server_id"; diff --git a/wrapper/src/test/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImplTest.java b/wrapper/src/test/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImplTest.java index 12e761087..4412e2368 100644 --- a/wrapper/src/test/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImplTest.java +++ b/wrapper/src/test/java/software/amazon/jdbc/plugin/limitless/LimitlessRouterServiceImplTest.java @@ -16,8 +16,8 @@ package software.amazon.jdbc.plugin.limitless; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; @@ -31,10 +31,9 @@ import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.concurrent.TimeUnit; -import org.junit.Ignore; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -48,20 +47,31 @@ import software.amazon.jdbc.RoundRobinHostSelector; import software.amazon.jdbc.hostavailability.HostAvailability; import software.amazon.jdbc.hostavailability.SimpleHostAvailabilityStrategy; +import software.amazon.jdbc.util.FullServicesContainer; +import software.amazon.jdbc.util.FullServicesContainerImpl; +import software.amazon.jdbc.util.events.EventPublisher; +import software.amazon.jdbc.util.monitoring.MonitorService; +import software.amazon.jdbc.util.storage.StorageService; +import software.amazon.jdbc.util.storage.StorageServiceImpl; +import software.amazon.jdbc.util.telemetry.TelemetryFactory; -public class LimitlessRouterServiceImplTest { +@SuppressWarnings("resource") +class LimitlessRouterServiceImplTest { private static final String CLUSTER_ID = "someClusterId"; + @Mock private EventPublisher mockEventPublisher; + @Mock private MonitorService mockMonitorService; + @Mock private TelemetryFactory mockTelemetryFactory; @Mock private PluginService mockPluginService; @Mock private HostListProvider mockHostListProvider; - @Mock private LimitlessRouterMonitor mockLimitlessRouterMonitor; @Mock private LimitlessQueryHelper mockQueryHelper; @Mock JdbcCallable mockConnectFuncLambda; @Mock private Connection mockConnection; private static final HostSpec hostSpec = new HostSpecBuilder(new SimpleHostAvailabilityStrategy()) .host("some-instance").role(HostRole.WRITER).build(); - private static final long someExpirationNano = TimeUnit.MILLISECONDS.toNanos(60000); private static Properties props; + private FullServicesContainer servicesContainer; + private StorageService storageService; private AutoCloseable closeable; @BeforeEach @@ -72,12 +82,16 @@ public void init() throws SQLException { when(mockPluginService.getHostListProvider()).thenReturn(mockHostListProvider); when(mockPluginService.getProperties()).thenReturn(props); when(mockHostListProvider.getClusterId()).thenReturn(CLUSTER_ID); + + this.storageService = new StorageServiceImpl(mockEventPublisher); + servicesContainer = new FullServicesContainerImpl(this.storageService, mockMonitorService, mockTelemetryFactory); + servicesContainer.setPluginService(mockPluginService); } @AfterEach public void cleanup() throws Exception { closeable.close(); - LimitlessRouterServiceImpl.limitlessRouterCache.clear(); + this.storageService.clearAll(); } @Test @@ -93,9 +107,7 @@ void testEstablishConnection_GivenGetEmptyRouterListAndWaitForRouterInfo_ThenThr null ); final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( - mockPluginService, - (a, b, c, d, e) -> mockLimitlessRouterMonitor, - mockQueryHelper); + this.servicesContainer, mockQueryHelper, props); assertThrows(SQLException.class, () -> limitlessRouterService.establishConnection(inputContext)); } @@ -112,9 +124,7 @@ void testEstablishConnection_GivenGetEmptyRouterListAndNoWaitForRouterInfo_ThenC null ); final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( - mockPluginService, - (a, b, c, d, e) -> mockLimitlessRouterMonitor, - mockQueryHelper); + this.servicesContainer, mockQueryHelper, props); limitlessRouterService.establishConnection(inputContext); @@ -131,7 +141,10 @@ void testEstablishConnection_GivenHostSpecInRouterCache_ThenCallConnectFunc() th .build(), new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("instance-3").role(HostRole.WRITER).weight(100) .build()); - LimitlessRouterServiceImpl.limitlessRouterCache.put(CLUSTER_ID, routerList, someExpirationNano); + final LimitlessRouters routers = new LimitlessRouters(routerList); + final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( + this.servicesContainer, mockQueryHelper, props); + this.storageService.set(CLUSTER_ID, routers); final LimitlessConnectionContext inputContext = new LimitlessConnectionContext( routerList.get(1), @@ -141,10 +154,6 @@ void testEstablishConnection_GivenHostSpecInRouterCache_ThenCallConnectFunc() th null, null ); - final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( - mockPluginService, - (a, b, c, d, e) -> mockLimitlessRouterMonitor, - mockQueryHelper); limitlessRouterService.establishConnection(inputContext); @@ -174,14 +183,13 @@ void testEstablishConnection_GivenFetchRouterListAndHostSpecInRouterList_ThenCal null ); final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( - mockPluginService, - (a, b, c, d, e) -> mockLimitlessRouterMonitor, - mockQueryHelper); + this.servicesContainer, mockQueryHelper, props); limitlessRouterService.establishConnection(inputContext); assertEquals(mockConnection, inputContext.getConnection()); - assertEquals(routerList, LimitlessRouterServiceImpl.limitlessRouterCache.get(CLUSTER_ID, someExpirationNano)); + final LimitlessRouters routers = new LimitlessRouters(routerList); + assertEquals(routers, this.storageService.get(LimitlessRouters.class, CLUSTER_ID)); verify(mockQueryHelper, times(1)) .queryForLimitlessRouters(inputContext.getConnection(), inputContext.getHostSpec().getPort()); verify(mockConnectFuncLambda, times(1)).call(); @@ -197,7 +205,10 @@ void testEstablishConnection_GivenRouterCache_ThenSelectsHost() throws SQLExcept new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("instance-3").role(HostRole.WRITER).weight(100) .build()); final HostSpec selectedRouter = routerList.get(2); - LimitlessRouterServiceImpl.limitlessRouterCache.put(CLUSTER_ID, routerList, someExpirationNano); + final LimitlessRouters routers = new LimitlessRouters(routerList); + final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( + this.servicesContainer, mockQueryHelper, props); + this.storageService.set(CLUSTER_ID, routers); when(mockPluginService.getHostSpecByStrategy(any(), any(), any())).thenReturn(selectedRouter); when(mockPluginService.connect(any(), any(), any())).thenReturn(mockConnection); @@ -210,10 +221,6 @@ void testEstablishConnection_GivenRouterCache_ThenSelectsHost() throws SQLExcept null, null ); - final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( - mockPluginService, - (a, b, c, d, e) -> mockLimitlessRouterMonitor, - mockQueryHelper); limitlessRouterService.establishConnection(inputContext); @@ -247,14 +254,13 @@ void testEstablishConnection_GivenFetchRouterList_ThenSelectsHost() throws SQLEx null ); final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( - mockPluginService, - (a, b, c, d, e) -> mockLimitlessRouterMonitor, - mockQueryHelper); + this.servicesContainer, mockQueryHelper, props); limitlessRouterService.establishConnection(inputContext); assertEquals(mockConnection, inputContext.getConnection()); - assertEquals(routerList, LimitlessRouterServiceImpl.limitlessRouterCache.get(CLUSTER_ID, someExpirationNano)); + final LimitlessRouters routers = new LimitlessRouters(routerList); + assertEquals(routers, this.storageService.get(LimitlessRouters.class, CLUSTER_ID)); verify(mockQueryHelper, times(1)) .queryForLimitlessRouters(inputContext.getConnection(), inputContext.getHostSpec().getPort()); verify(mockConnectFuncLambda, times(1)).call(); @@ -270,7 +276,10 @@ void testEstablishConnection_GivenHostSpecInRouterCacheAndCallConnectFuncThrows_ .availability(HostAvailability.AVAILABLE).build(), new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("instance-3").role(HostRole.WRITER).weight(100) .availability(HostAvailability.AVAILABLE).build()); - LimitlessRouterServiceImpl.limitlessRouterCache.put(CLUSTER_ID, routerList, someExpirationNano); + final LimitlessRouters routers = new LimitlessRouters(routerList); + final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( + this.servicesContainer, mockQueryHelper, props); + this.storageService.set(CLUSTER_ID, routers); final HostSpec selectedRouter = routerList.get(2); final LimitlessConnectionContext inputContext = new LimitlessConnectionContext( routerList.get(1), @@ -285,22 +294,17 @@ void testEstablishConnection_GivenHostSpecInRouterCacheAndCallConnectFuncThrows_ when(mockPluginService.getHostSpecByStrategy(any(), any(), any())).thenReturn(selectedRouter); when(mockPluginService.connect(any(), any(), any())).thenReturn(mockConnection); - final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( - mockPluginService, - (a, b, c, d, e) -> mockLimitlessRouterMonitor, - mockQueryHelper); - limitlessRouterService.establishConnection(inputContext); assertEquals(mockConnection, inputContext.getConnection()); - assertEquals(routerList, LimitlessRouterServiceImpl.limitlessRouterCache.get(CLUSTER_ID, someExpirationNano)); + assertEquals(routers, this.storageService.get(LimitlessRouters.class, CLUSTER_ID)); verify(mockPluginService, times(1)) .getHostSpecByStrategy(routerList, HostRole.WRITER, HighestWeightHostSelector.STRATEGY_HIGHEST_WEIGHT); verify(mockPluginService, times(1)).connect(selectedRouter, inputContext.getProps(), null); verify(mockConnectFuncLambda, times(1)).call(); } - @Ignore + @Disabled @Test void testEstablishConnection_GivenSelectsHostThrows_ThenRetry() throws SQLException { final List routerList = Arrays.asList( @@ -311,7 +315,8 @@ void testEstablishConnection_GivenSelectsHostThrows_ThenRetry() throws SQLExcept new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("some-instance-3").role(HostRole.WRITER) .build()); final HostSpec selectedRouter = routerList.get(2); - LimitlessRouterServiceImpl.limitlessRouterCache.put(CLUSTER_ID, routerList, someExpirationNano); + final LimitlessRouters routers = new LimitlessRouters(routerList); + this.storageService.set(CLUSTER_ID, routers); when(mockPluginService.getHostSpecByStrategy(any(), any(), any())) .thenThrow(new SQLException()) .thenReturn(selectedRouter); @@ -326,14 +331,12 @@ void testEstablishConnection_GivenSelectsHostThrows_ThenRetry() throws SQLExcept null ); final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( - mockPluginService, - (a, b, c, d, e) -> mockLimitlessRouterMonitor, - mockQueryHelper); + this.servicesContainer, mockQueryHelper, props); limitlessRouterService.establishConnection(inputContext); assertEquals(mockConnection, inputContext.getConnection()); - assertEquals(routerList, LimitlessRouterServiceImpl.limitlessRouterCache.get(CLUSTER_ID, someExpirationNano)); + assertEquals(routers, this.storageService.get(LimitlessRouters.class, CLUSTER_ID)); verify(mockPluginService, times(2)).getHostSpecByStrategy(any(), any(), any()); verify(mockPluginService, times(1)) .getHostSpecByStrategy(routerList, HostRole.WRITER, RoundRobinHostSelector.STRATEGY_ROUND_ROBIN); @@ -351,10 +354,12 @@ void testEstablishConnection_GivenSelectsHostNull_ThenRetry() throws SQLExceptio .build(), new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("some-instance-3").role(HostRole.WRITER) .build()); + final LimitlessRouters routers = new LimitlessRouters(routerList); + final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( + this.servicesContainer, mockQueryHelper, props); + this.storageService.set(CLUSTER_ID, routers); final HostSpec selectedRouter = routerList.get(2); - LimitlessRouterServiceImpl.limitlessRouterCache.put(CLUSTER_ID, routerList, someExpirationNano); - when(mockPluginService.getHostSpecByStrategy(any(), any(), any())) - .thenReturn(null, selectedRouter); + when(mockPluginService.getHostSpecByStrategy(any(), any(), any())).thenReturn(null, selectedRouter); when(mockPluginService.connect(any(), any(), any())).thenReturn(mockConnection); final LimitlessConnectionContext inputContext = new LimitlessConnectionContext( @@ -365,15 +370,11 @@ void testEstablishConnection_GivenSelectsHostNull_ThenRetry() throws SQLExceptio null, null ); - final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( - mockPluginService, - (a, b, c, d, e) -> mockLimitlessRouterMonitor, - mockQueryHelper); limitlessRouterService.establishConnection(inputContext); assertEquals(mockConnection, inputContext.getConnection()); - assertEquals(routerList, LimitlessRouterServiceImpl.limitlessRouterCache.get(CLUSTER_ID, someExpirationNano)); + assertEquals(routers, this.storageService.get(LimitlessRouters.class, CLUSTER_ID)); verify(mockPluginService, times(2)).getHostSpecByStrategy(any(), any(), any()); verify(mockPluginService, times(1)) .getHostSpecByStrategy(routerList, HostRole.WRITER, RoundRobinHostSelector.STRATEGY_ROUND_ROBIN); @@ -391,9 +392,12 @@ void testEstablishConnection_GivenPluginServiceConnectThrows_ThenRetry() throws .build(), new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("some-instance-3").role(HostRole.WRITER) .build()); + final LimitlessRouters routers = new LimitlessRouters(routerList); + final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( + this.servicesContainer, mockQueryHelper, props); + this.storageService.set(CLUSTER_ID, routers); final HostSpec selectedRouter = routerList.get(1); final HostSpec selectedRouterForRetry = routerList.get(2); - LimitlessRouterServiceImpl.limitlessRouterCache.put(CLUSTER_ID, routerList, someExpirationNano); when(mockPluginService.getHostSpecByStrategy(any(), any(), any())) .thenReturn(selectedRouter, selectedRouterForRetry); when(mockPluginService.connect(any(), any(), any())) @@ -408,15 +412,11 @@ void testEstablishConnection_GivenPluginServiceConnectThrows_ThenRetry() throws null, null ); - final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( - mockPluginService, - (a, b, c, d, e) -> mockLimitlessRouterMonitor, - mockQueryHelper); limitlessRouterService.establishConnection(inputContext); assertEquals(mockConnection, inputContext.getConnection()); - assertEquals(routerList, LimitlessRouterServiceImpl.limitlessRouterCache.get(CLUSTER_ID, someExpirationNano)); + assertEquals(routers, this.storageService.get(LimitlessRouters.class, CLUSTER_ID)); verify(mockPluginService, times(2)).getHostSpecByStrategy(any(), any(), any()); verify(mockPluginService, times(1)) .getHostSpecByStrategy(routerList, HostRole.WRITER, RoundRobinHostSelector.STRATEGY_ROUND_ROBIN); @@ -436,7 +436,10 @@ void testEstablishConnection_GivenRetryAndMaxRetriesExceeded_thenThrowSqlExcepti .availability(HostAvailability.AVAILABLE).build(), new HostSpecBuilder(new SimpleHostAvailabilityStrategy()).host("instance-3").role(HostRole.WRITER).weight(100) .availability(HostAvailability.AVAILABLE).build()); - LimitlessRouterServiceImpl.limitlessRouterCache.put(CLUSTER_ID, routerList, someExpirationNano); + final LimitlessRouters routers = new LimitlessRouters(routerList); + final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( + this.servicesContainer, mockQueryHelper, props); + this.storageService.set(CLUSTER_ID, routers); final LimitlessConnectionContext inputContext = new LimitlessConnectionContext( routerList.get(0), @@ -451,11 +454,6 @@ void testEstablishConnection_GivenRetryAndMaxRetriesExceeded_thenThrowSqlExcepti when(mockPluginService.getHostSpecByStrategy(any(), any(), any())).thenReturn(routerList.get(0)); when(mockPluginService.connect(any(), any(), any())).thenThrow(new SQLException()); - final LimitlessRouterService limitlessRouterService = new LimitlessRouterServiceImpl( - mockPluginService, - (a, b, c, d, e) -> mockLimitlessRouterMonitor, - mockQueryHelper); - assertThrows(SQLException.class, () -> limitlessRouterService.establishConnection(inputContext)); verify(mockPluginService, times(LimitlessConnectionPlugin.MAX_RETRIES.getInteger(props)))