11package redis .clients .jedis ;
22
3+ import org .slf4j .Logger ;
4+ import org .slf4j .LoggerFactory ;
5+
36import redis .clients .jedis .exceptions .JedisAskDataException ;
47import redis .clients .jedis .exceptions .JedisClusterMaxAttemptsException ;
58import redis .clients .jedis .exceptions .JedisClusterOperationException ;
1114
1215public abstract class JedisClusterCommand <T > {
1316
17+ private static final Logger LOG = LoggerFactory .getLogger (JedisClusterCommand .class );
18+
1419 private final JedisClusterConnectionHandler connectionHandler ;
1520 private final int maxAttempts ;
1621
@@ -22,7 +27,7 @@ public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int
2227 public abstract T execute (Jedis connection );
2328
2429 public T run (String key ) {
25- return runWithRetries (JedisClusterCRC16 .getSlot (key ), this . maxAttempts , false , null );
30+ return runWithRetries (JedisClusterCRC16 .getSlot (key ));
2631 }
2732
2833 public T run (int keyCount , String ... keys ) {
@@ -42,11 +47,11 @@ public T run(int keyCount, String... keys) {
4247 }
4348 }
4449
45- return runWithRetries (slot , this . maxAttempts , false , null );
50+ return runWithRetries (slot );
4651 }
4752
4853 public T runBinary (byte [] key ) {
49- return runWithRetries (JedisClusterCRC16 .getSlot (key ), this . maxAttempts , false , null );
54+ return runWithRetries (JedisClusterCRC16 .getSlot (key ));
5055 }
5156
5257 public T runBinary (int keyCount , byte []... keys ) {
@@ -66,7 +71,7 @@ public T runBinary(int keyCount, byte[]... keys) {
6671 }
6772 }
6873
69- return runWithRetries (slot , this . maxAttempts , false , null );
74+ return runWithRetries (slot );
7075 }
7176
7277 public T runWithAnyNode () {
@@ -79,61 +84,62 @@ public T runWithAnyNode() {
7984 }
8085 }
8186
82- private T runWithRetries (final int slot , int attempts , boolean tryRandomNode , JedisRedirectionException redirect ) {
83- if (attempts <= 0 ) {
84- throw new JedisClusterMaxAttemptsException ("No more cluster attempts left." );
85- }
86-
87- Jedis connection = null ;
88- try {
89-
90- if (redirect != null ) {
91- connection = this .connectionHandler .getConnectionFromNode (redirect .getTargetNode ());
92- if (redirect instanceof JedisAskDataException ) {
93- // TODO: Pipeline asking with the original command to make it faster....
94- connection .asking ();
95- }
96- } else {
97- if (tryRandomNode ) {
98- connection = connectionHandler .getConnection ();
87+ private T runWithRetries (final int slot ) {
88+ JedisRedirectionException redirect = null ;
89+ Exception lastException = null ;
90+ for (int attemptsLeft = this .maxAttempts ; attemptsLeft > 0 ; attemptsLeft --) {
91+ Jedis connection = null ;
92+ try {
93+ if (redirect != null ) {
94+ connection = connectionHandler .getConnectionFromNode (redirect .getTargetNode ());
95+ if (redirect instanceof JedisAskDataException ) {
96+ // TODO: Pipeline asking with the original command to make it faster....
97+ connection .asking ();
98+ }
9999 } else {
100100 connection = connectionHandler .getConnectionFromSlot (slot );
101101 }
102- }
103-
104- return execute (connection );
105102
106- } catch (JedisNoReachableClusterNodeException jnrcne ) {
107- throw jnrcne ;
108- } catch (JedisConnectionException jce ) {
109- // release current connection before recursion
110- releaseConnection (connection );
111- connection = null ;
112-
113- if (attempts <= 1 ) {
114- //We need this because if node is not reachable anymore - we need to finally initiate slots
115- //renewing, or we can stuck with cluster state without one node in opposite case.
116- //But now if maxAttempts = [1 or 2] we will do it too often.
117- //TODO make tracking of successful/unsuccessful operations for node - do renewing only
118- //if there were no successful responses from this node last few seconds
119- this .connectionHandler .renewSlotCache ();
120- }
121-
122- return runWithRetries (slot , attempts - 1 , tryRandomNode , redirect );
123- } catch (JedisRedirectionException jre ) {
124- // if MOVED redirection occurred,
125- if (jre instanceof JedisMovedDataException ) {
126- // it rebuilds cluster's slot cache recommended by Redis cluster specification
127- this .connectionHandler .renewSlotCache (connection );
103+ return execute (connection );
104+
105+ } catch (JedisNoReachableClusterNodeException jnrcne ) {
106+ throw jnrcne ;
107+ } catch (JedisConnectionException jce ) {
108+ lastException = jce ;
109+ LOG .debug ("Failed connecting to Redis: {}" , connection , jce );
110+ // "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet
111+ handleConnectionProblem (attemptsLeft - 1 );
112+ } catch (JedisRedirectionException jre ) {
113+ // avoid updating lastException if it is a connection exception
114+ if (lastException == null || lastException instanceof JedisRedirectionException ) {
115+ lastException = jre ;
116+ }
117+ LOG .debug ("Redirected by server to {}" , jre .getTargetNode ());
118+ redirect = jre ;
119+ // if MOVED redirection occurred,
120+ if (jre instanceof JedisMovedDataException ) {
121+ // it rebuilds cluster's slot cache recommended by Redis cluster specification
122+ this .connectionHandler .renewSlotCache (connection );
123+ }
124+ } finally {
125+ releaseConnection (connection );
128126 }
127+ }
129128
130- // release current connection before recursion
131- releaseConnection (connection );
132- connection = null ;
129+ JedisClusterMaxAttemptsException maxAttemptsException
130+ = new JedisClusterMaxAttemptsException ("No more cluster attempts left." );
131+ maxAttemptsException .addSuppressed (lastException );
132+ throw maxAttemptsException ;
133+ }
133134
134- return runWithRetries (slot , attempts - 1 , false , jre );
135- } finally {
136- releaseConnection (connection );
135+ private void handleConnectionProblem (int attemptsLeft ) {
136+ if (attemptsLeft <= 1 ) {
137+ //We need this because if node is not reachable anymore - we need to finally initiate slots
138+ //renewing, or we can stuck with cluster state without one node in opposite case.
139+ //But now if maxAttempts = [1 or 2] we will do it too often.
140+ //TODO make tracking of successful/unsuccessful operations for node - do renewing only
141+ //if there were no successful responses from this node last few seconds
142+ this .connectionHandler .renewSlotCache ();
137143 }
138144 }
139145
0 commit comments