2020import java .sql .SQLException ;
2121import java .util .Collections ;
2222import java .util .HashSet ;
23- import java .util .List ;
2423import java .util .Properties ;
2524import java .util .Set ;
25+ import java .util .concurrent .locks .ReentrantLock ;
2626import java .util .function .Supplier ;
2727import java .util .logging .Logger ;
2828import org .checkerframework .checker .nullness .qual .NonNull ;
2929import software .amazon .jdbc .AwsWrapperProperty ;
30- import software .amazon .jdbc .HostRole ;
3130import software .amazon .jdbc .HostSpec ;
3231import software .amazon .jdbc .JdbcCallable ;
3332import software .amazon .jdbc .PluginService ;
3433import software .amazon .jdbc .PropertyDefinition ;
35- import software .amazon .jdbc .RoundRobinHostSelector ;
3634import software .amazon .jdbc .dialect .AuroraLimitlessDialect ;
3735import software .amazon .jdbc .dialect .Dialect ;
38- import software .amazon .jdbc .hostavailability .HostAvailability ;
3936import software .amazon .jdbc .plugin .AbstractConnectionPlugin ;
4037import software .amazon .jdbc .util .Messages ;
41- import software .amazon .jdbc .util .Utils ;
42- import software .amazon .jdbc .wrapper .HighestWeightHostSelector ;
4338
4439public class LimitlessConnectionPlugin extends AbstractConnectionPlugin {
4540 private static final Logger LOGGER = Logger .getLogger (LimitlessConnectionPlugin .class .getName ());
@@ -70,6 +65,7 @@ public class LimitlessConnectionPlugin extends AbstractConnectionPlugin {
7065 protected final Properties properties ;
7166 private final Supplier <LimitlessRouterService > limitlessRouterServiceSupplier ;
7267 private LimitlessRouterService limitlessRouterService ;
68+ private static final ReentrantLock lock = new ReentrantLock ();
7369 private static final Set <String > subscribedMethods =
7470 Collections .unmodifiableSet (new HashSet <String >() {
7571 {
@@ -101,6 +97,7 @@ public LimitlessConnectionPlugin(
10197 this .limitlessRouterServiceSupplier = limitlessRouterServiceSupplier ;
10298 }
10399
100+
104101 @ Override
105102 public Connection connect (
106103 final String driverProtocol ,
@@ -109,110 +106,18 @@ public Connection connect(
109106 final boolean isInitialConnection ,
110107 final JdbcCallable <Connection , SQLException > connectFunc )
111108 throws SQLException {
112- return connectInternal (driverProtocol , hostSpec , props , isInitialConnection , connectFunc );
113- }
114-
115- private Connection connectInternal (
116- final @ NonNull String driverProtocol ,
117- final @ NonNull HostSpec hostSpec ,
118- final @ NonNull Properties props ,
119- final boolean isInitialConnection ,
120- final JdbcCallable <Connection , SQLException > connectFunc ) throws SQLException {
121- final Dialect dialect = this .pluginService .getDialect ();
122- if (dialect instanceof AuroraLimitlessDialect ) {
123- return connectInternalWithDialect (driverProtocol , hostSpec , props , isInitialConnection , connectFunc );
124- } else {
125- return connectInternalWithoutDialect (driverProtocol , hostSpec , props , isInitialConnection , connectFunc );
126- }
127- }
128-
129- private Connection connectInternalWithDialect (
130- final @ NonNull String driverProtocol ,
131- final @ NonNull HostSpec hostSpec ,
132- final @ NonNull Properties props ,
133- final boolean isInitialConnection ,
134- final JdbcCallable <Connection , SQLException > connectFunc ) throws SQLException {
135-
136- initLimitlessRouterMonitorService ();
137- if (isInitialConnection ) {
138- this .limitlessRouterService
139- .startMonitoring (hostSpec , properties , INTERVAL_MILLIS .getInteger (properties ));
140- }
141-
142- List <HostSpec > limitlessRouters = this .limitlessRouterService .getLimitlessRouters (
143- this .pluginService .getHostListProvider ().getClusterId (), props );
144109
145110 Connection conn = null ;
146- if (Utils .isNullOrEmpty (limitlessRouters )) {
147- conn = connectFunc .call ();
148- LOGGER .finest (Messages .get ("LimitlessConnectionPlugin.limitlessRouterCacheEmpty" ));
149- final boolean waitForRouterInfo = WAIT_F0R_ROUTER_INFO .getBoolean (props );
150- if (waitForRouterInfo ) {
151- limitlessRouters = synchronouslyGetLimitlessRoutersWithRetry (conn , hostSpec .getPort (), props );
152- } else {
153- LOGGER .finest (Messages .get ("LimitlessConnectionPlugin.usingProvidedConnectUrl" ));
154- return conn ;
155- }
156- }
157-
158- if (limitlessRouters .contains (hostSpec )) {
159- LOGGER .finest (Messages .get ("LimitlessConnectionPlugin.connectWithHost" , new Object [] {hostSpec .getHost ()}));
160- if (conn == null || conn .isClosed ()) {
161- try {
162- conn = connectFunc .call ();
163- } catch (final SQLException e ) {
164- return retryConnectWithLeastLoadedRouters (limitlessRouters , props , null , hostSpec );
165- }
166- }
167- return conn ;
168- }
169-
170- RoundRobinHostSelector .setRoundRobinHostWeightPairsProperty (props , limitlessRouters );
171- HostSpec selectedHostSpec ;
172- try {
173- selectedHostSpec = this .pluginService .getHostSpecByStrategy (limitlessRouters ,
174- HostRole .WRITER , RoundRobinHostSelector .STRATEGY_ROUND_ROBIN );
175- LOGGER .fine (Messages .get (
176- "LimitlessConnectionPlugin.selectedHost" ,
177- new Object [] {selectedHostSpec != null ? selectedHostSpec .getHost () : "null" }));
178- } catch (SQLException e ) {
179- LOGGER .warning (Messages .get ("LimitlessConnectionPlugin.errorSelectingRouter" , new Object [] {e .getMessage ()}));
180- if (conn == null || conn .isClosed ()) {
181- conn = connectFunc .call ();
182- }
183- return retryConnectWithLeastLoadedRouters (limitlessRouters , props , conn , hostSpec );
184- }
185-
186- try {
187- return pluginService .connect (selectedHostSpec , props );
188- } catch (SQLException e ) {
189- if (selectedHostSpec != null ) {
190- LOGGER .fine (Messages .get (
191- "LimitlessConnectionPlugin.failedToConnectToHost" ,
192- new Object [] {selectedHostSpec .getHost ()}));
193- selectedHostSpec .setAvailability (HostAvailability .NOT_AVAILABLE );
194- }
195- if (conn == null || conn .isClosed ()) {
196- conn = connectFunc .call ();
197- }
198- // Retry connect prioritising healthiest router for best chance of connection over load-balancing with round-robin
199- return retryConnectWithLeastLoadedRouters (limitlessRouters , props , conn , hostSpec );
200- }
201- }
202-
203- private Connection connectInternalWithoutDialect (
204- final @ NonNull String driverProtocol ,
205- final @ NonNull HostSpec hostSpec ,
206- final @ NonNull Properties props ,
207- final boolean isInitialConnection ,
208- final JdbcCallable <Connection , SQLException > connectFunc ) throws SQLException {
209-
210- final Connection conn = connectFunc .call ();
211111
212112 final Dialect dialect = this .pluginService .getDialect ();
213113 if (!(dialect instanceof AuroraLimitlessDialect )) {
214- throw new UnsupportedOperationException (Messages .get ("LimitlessConnectionPlugin.unsupportedDialectOrDatabase" ,
215- new Object [] {dialect }));
114+ conn = connectFunc .call ();
115+ final Dialect refreshedDialect = this .pluginService .getDialect ();
116+ if (!(refreshedDialect instanceof AuroraLimitlessDialect )) {
117+ throw new UnsupportedOperationException (Messages .get (
118+ "LimitlessConnectionPlugin.unsupportedDialectOrDatabase" ,
119+ new Object [] {refreshedDialect }));
120+ }
216121 }
217122
218123 initLimitlessRouterMonitorService ();
@@ -221,106 +126,25 @@ private Connection connectInternalWithoutDialect(
221126 .startMonitoring (hostSpec , properties , INTERVAL_MILLIS .getInteger (properties ));
222127 }
223128
224- List <HostSpec > limitlessRouters = this .limitlessRouterService .getLimitlessRouters (
225- this .pluginService .getHostListProvider ().getClusterId (), props );
226- if (Utils .isNullOrEmpty (limitlessRouters )) {
227- LOGGER .finest (Messages .get ("LimitlessConnectionPlugin.limitlessRouterCacheEmpty" ));
228- final boolean waitForRouterInfo = WAIT_F0R_ROUTER_INFO .getBoolean (props );
229- if (waitForRouterInfo ) {
230- synchronouslyGetLimitlessRoutersWithRetry (conn , hostSpec .getPort (), props );
231- }
232- }
129+ final LimitlessConnectionContext context = new LimitlessConnectionContext (
130+ hostSpec ,
131+ props ,
132+ conn ,
133+ connectFunc ,
134+ null );
135+ this .limitlessRouterService .establishConnection (context );
233136
234- return conn ;
137+ if (context .getConnection () != null ) {
138+ return context .getConnection ();
139+ }
140+ throw new SQLException (Messages .get (
141+ "LimitlessConnectionPlugin.failedToConnectToHost" ,
142+ new Object [] {hostSpec .getHost ()}));
235143 }
236144
237145 private void initLimitlessRouterMonitorService () {
238146 if (limitlessRouterService == null ) {
239147 this .limitlessRouterService = this .limitlessRouterServiceSupplier .get ();
240148 }
241149 }
242-
243- private Connection retryConnectWithLeastLoadedRouters (
244- final List <HostSpec > limitlessRouters ,
245- final Properties props ,
246- final Connection conn ,
247- final HostSpec hostSpec ) throws SQLException {
248-
249- List <HostSpec > currentRouters = limitlessRouters ;
250- int retryCount = 0 ;
251- final int maxRetries = MAX_RETRIES .getInteger (props );
252-
253- while (retryCount ++ < maxRetries ) {
254- if (currentRouters .stream ().noneMatch (h -> h .getAvailability ().equals (HostAvailability .AVAILABLE ))) {
255- if (conn != null && !conn .isClosed ()) {
256- currentRouters = synchronouslyGetLimitlessRoutersWithRetry (conn , hostSpec .getPort (), props );
257- }
258-
259- if (currentRouters == null
260- || currentRouters .isEmpty ()
261- || currentRouters .stream ().noneMatch (h -> h .getAvailability ().equals (HostAvailability .AVAILABLE ))) {
262- LOGGER .warning (Messages .get ("LimitlessConnectionPlugin.noRoutersAvailableForRetry" ));
263- if (conn != null && !conn .isClosed ()) {
264- return conn ;
265- } else {
266- throw new SQLException (Messages .get ("LimitlessConnectionPlugin.noRoutersAvailable" ));
267- }
268- }
269- }
270-
271- HostSpec selectedHostSpec = hostSpec ;
272- try {
273- // Select healthiest router for best chance of connection over load-balancing with round-robin
274- selectedHostSpec = this .pluginService .getHostSpecByStrategy (limitlessRouters ,
275- HostRole .WRITER , HighestWeightHostSelector .STRATEGY_HIGHEST_WEIGHT );
276- LOGGER .finest (Messages .get (
277- "LimitlessConnectionPlugin.selectedHostForRetry" ,
278- new Object [] {selectedHostSpec .getHost ()}));
279- } catch (final UnsupportedOperationException e ) {
280- LOGGER .severe (Messages .get ("LimitlessConnectionPlugin.incorrectConfiguration" ));
281- throw e ;
282- } catch (final SQLException e ) {
283- // error from host selector
284- continue ;
285- }
286-
287- try {
288- return pluginService .connect (selectedHostSpec , props );
289- } catch (final SQLException e ) {
290- selectedHostSpec .setAvailability (HostAvailability .NOT_AVAILABLE );
291- LOGGER .finest (Messages .get (
292- "LimitlessConnectionPlugin.failedToConnectToHost" ,
293- new Object [] {selectedHostSpec .getHost ()}));
294- }
295- }
296- if (conn != null && !conn .isClosed ()) {
297- LOGGER .warning (Messages .get ("LimitlessConnectionPlugin.maxRetriesExceeded" ));
298- return conn ;
299- } else {
300- throw new SQLException (Messages .get ("LimitlessConnectionPlugin.maxRetriesExceeded" ));
301- }
302- }
303-
304- private List <HostSpec > synchronouslyGetLimitlessRoutersWithRetry (
305- final Connection conn , final int hostPort , final Properties props ) throws SQLException {
306- LOGGER .finest (Messages .get ("LimitlessConnectionPlugin.synchronouslyGetLimitlessRouters" ));
307- int retryCount = -1 ; // start at -1 since the first try is not a retry.
308- int maxRetries = GET_ROUTER_MAX_RETRIES .getInteger (props );
309- int retryIntervalMs = GET_ROUTER_RETRY_INTERVAL_MILLIS .getInteger (props );
310- do {
311- try {
312- List <HostSpec > newLimitlessRouters = this .limitlessRouterService .forceGetLimitlessRoutersWithConn (
313- conn , hostPort , props );
314- if (newLimitlessRouters != null && !newLimitlessRouters .isEmpty ()) {
315- return newLimitlessRouters ;
316- }
317- Thread .sleep (retryIntervalMs );
318- } catch (final InterruptedException e ) {
319- Thread .currentThread ().interrupt ();
320- } finally {
321- retryCount ++;
322- }
323- } while (retryCount < maxRetries );
324- throw new SQLException (Messages .get ("LimitlessConnectionPlugin.noRoutersAvailable" ));
325- }
326150}
0 commit comments