forked from Petersoj/alpaca-java
-
Notifications
You must be signed in to change notification settings - Fork 0
/
AlpacaWebsocket.java
309 lines (264 loc) · 9.64 KB
/
AlpacaWebsocket.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
package net.jacobpeterson.alpaca.websocket;
import net.jacobpeterson.alpaca.util.okhttp.WebsocketStateListener;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import static com.google.common.base.Preconditions.checkNotNull;
/**
 * {@link AlpacaWebsocket} represents an abstract websocket for Alpaca.
 * <p>
 * It is an OkHttp {@link WebSocketListener} that tracks connection/authentication state, forwards state changes to an
 * optional {@link WebsocketStateListener}, and re-opens the websocket after an unintentional closure or failure (up to
 * {@link #MAX_RECONNECT_ATTEMPTS} times, waiting {@link #RECONNECT_SLEEP_INTERVAL} milliseconds before each attempt).
 * <p>
 * Note: the state fields of this class are plain fields and no locking is performed, even though connection callbacks
 * and reconnection attempts are dispatched onto {@link ForkJoinPool#commonPool()}.
 *
 * @param <T> the 'message type' type parameter
 * @param <M> the 'message' type parameter
 * @param <L> the {@link AlpacaWebsocketMessageListener} type parameter
 */
public abstract class AlpacaWebsocket<T, M, L extends AlpacaWebsocketMessageListener<T, M>> extends WebSocketListener
        implements AlpacaWebsocketInterface<L> {

    /**
     * Defines a websocket normal closure code.
     *
     * @see WebSocket#close(int, String)
     */
    public static final int WEBSOCKET_NORMAL_CLOSURE_CODE = 1000;

    /**
     * Defines a websocket normal closure message.
     *
     * @see WebSocket#close(int, String)
     */
    public static final String WEBSOCKET_NORMAL_CLOSURE_MESSAGE = "Normal closure";

    /** Defines the maximum number of reconnection attempts to be made by an {@link AlpacaWebsocket}. */
    public static int MAX_RECONNECT_ATTEMPTS = 5;

    /**
     * Defines the millisecond sleep interval between reconnection attempts made by an {@link AlpacaWebsocket}.
     */
    public static int RECONNECT_SLEEP_INTERVAL = 1000;

    private static final Logger LOGGER = LoggerFactory.getLogger(AlpacaWebsocket.class);

    /** The {@link OkHttpClient} used to open the websocket. */
    protected final OkHttpClient okHttpClient;
    /** The {@link HttpUrl} the websocket connects to. */
    protected final HttpUrl websocketURL;
    /** A name for this websocket that is used in log messages. */
    protected final String websocketName;
    /** The key ID (may be <code>null</code>; it is not validated by this class). */
    protected final String keyID;
    /** The secret key (may be <code>null</code>; it is not validated by this class). */
    protected final String secretKey;
    /** The OAuth token (may be <code>null</code>; it is not validated by this class). */
    protected final String oAuthToken;
    /** <code>true</code> when a non-<code>null</code> {@link #oAuthToken} was given to the constructor. */
    protected final boolean useOAuth;

    /** The message listener invoked by {@link #callListener(Object, Object)}. */
    protected L listener;
    /** Optional listener that is notified of websocket open, closed, and failure events. */
    protected WebsocketStateListener websocketStateListener;
    /** The current {@link WebSocket}, or <code>null</code> if none has been created or state was cleaned up. */
    protected WebSocket websocket;
    /** Set in {@link #onOpen(WebSocket, Response)} and cleared on closure, failure, or cleanup. */
    protected boolean connected;
    /** Authentication flag; this class only ever resets it (in {@link #cleanupState()}). */
    protected boolean authenticated;
    /** Lazily created by {@link #getAuthorizationFuture()}; completed with <code>false</code> on cleanup if pending. */
    protected CompletableFuture<Boolean> authenticationMessageFuture;
    /** <code>true</code> between a {@link #disconnect()} close request and the resulting state cleanup. */
    protected boolean intentionalClose;
    /** The number of reconnection attempts made since the last successful (re)connection or cleanup. */
    protected int reconnectAttempts;
    /** Whether an unintentional closure or failure triggers a reconnection attempt. Defaults to <code>true</code>. */
    protected boolean automaticallyReconnect;

    /**
     * Instantiates a {@link AlpacaWebsocket}.
     *
     * @param okHttpClient  the {@link OkHttpClient} (must not be <code>null</code>)
     * @param websocketURL  the websocket {@link HttpUrl} (must not be <code>null</code>)
     * @param websocketName the websocket name (must not be <code>null</code>)
     * @param keyID         the key ID
     * @param secretKey     the secret key
     * @param oAuthToken    the OAuth token; when non-<code>null</code>, {@link #useOAuth} is <code>true</code>
     *
     * @throws NullPointerException if <code>okHttpClient</code>, <code>websocketURL</code>, or
     *                              <code>websocketName</code> is <code>null</code>
     */
    public AlpacaWebsocket(OkHttpClient okHttpClient, HttpUrl websocketURL, String websocketName,
            String keyID, String secretKey, String oAuthToken) {
        checkNotNull(okHttpClient);
        checkNotNull(websocketURL);
        checkNotNull(websocketName);

        this.okHttpClient = okHttpClient;
        this.websocketURL = websocketURL;
        this.websocketName = websocketName;
        this.keyID = keyID;
        this.secretKey = secretKey;
        this.oAuthToken = oAuthToken;
        useOAuth = oAuthToken != null;

        automaticallyReconnect = true;
    }

    /**
     * Opens a new websocket to {@link #websocketURL} with this instance as its listener. Does nothing if
     * {@link #isConnected()} is already <code>true</code>.
     */
    @Override
    public void connect() {
        if (isConnected()) {
            return;
        }

        Request websocketRequest = new Request.Builder()
                .url(websocketURL)
                .get()
                .build();
        websocket = okHttpClient.newWebSocket(websocketRequest, this);
    }

    /**
     * Requests a normal closure of the websocket if it is connected. Otherwise, the state of this instance is simply
     * reset via {@link #cleanupState()}.
     */
    @Override
    public void disconnect() {
        if (websocket != null && isConnected()) {
            // Mark the close as intentional so that 'onClosed'/'onFailure' do not attempt to reconnect
            intentionalClose = true;
            websocket.close(WEBSOCKET_NORMAL_CLOSURE_CODE, WEBSOCKET_NORMAL_CLOSURE_MESSAGE);
        } else {
            cleanupState();
        }
    }

    @Override
    public boolean isConnected() {
        return connected;
    }

    @Override
    public boolean isAuthenticated() {
        return authenticated;
    }

    /**
     * Marks this websocket as connected, asynchronously calls {@link #onConnection()} (or {@link #onReconnection()}
     * if this open follows a reconnection attempt), and then notifies the {@link #websocketStateListener}, if any.
     */
    @Override
    public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
        connected = true;

        LOGGER.info("{} websocket opened.", websocketName);
        LOGGER.debug("{} websocket response: {}", websocketName, response);

        // Call 'onConnection' or 'onReconnection' async to avoid any potential dead-locking since this is called
        // in sync with 'onMessage' in OkHttp's 'WebSocketListener'
        ForkJoinPool.commonPool().execute(() -> {
            if (reconnectAttempts > 0) {
                reconnectAttempts = 0;
                onReconnection();
            } else {
                onConnection();
            }
        });

        if (websocketStateListener != null) {
            websocketStateListener.onOpen(response);
        }
    }

    /**
     * Handles websocket closure: an intentional close resets the state of this instance, an unintentional close
     * triggers {@link #handleReconnectionAttempt()}. The {@link #websocketStateListener}, if any, is notified last.
     */
    @Override
    public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
        connected = false;

        if (intentionalClose) {
            LOGGER.info("{} websocket closed.", websocketName);
            LOGGER.debug("Close code: {}, Reason: {}", code, reason);
            cleanupState();
        } else {
            LOGGER.error("{} websocket closed unintentionally! Code: {}, Reason: {}", websocketName, code, reason);
            handleReconnectionAttempt();
        }

        if (websocketStateListener != null) {
            websocketStateListener.onClosed(code, reason);
        }
    }

    /**
     * Handles a websocket failure: unless a {@link #disconnect()} is in progress, a reconnection is attempted via
     * {@link #handleReconnectionAttempt()}. The {@link #websocketStateListener}, if any, is notified last.
     */
    @Override
    public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable cause, @Nullable Response response) {
        LOGGER.error("{} websocket failure!", websocketName, cause);

        connected = false;

        if (intentionalClose) {
            // 'onFailure' is a terminal callback, so 'onClosed' will not follow it. If the failure happened while
            // an intentional close was in progress, finish the disconnect here instead of reconnecting. This also
            // resets 'intentionalClose' so that a later unintentional closure is not mistaken for an intentional one.
            cleanupState();
        } else {
            // A websocket failure occurs when either there is a connection failure or when the client throws
            // an Exception when receiving a message. In either case, OkHttp will silently close the websocket
            // connection, so try to reopen it.
            handleReconnectionAttempt();
        }

        if (websocketStateListener != null) {
            websocketStateListener.onFailure(cause);
        }
    }

    /**
     * Attempts to reconnect the disconnected {@link #websocket} asynchronously.
     * <p>
     * If automatic reconnection is disabled or {@link #MAX_RECONNECT_ATTEMPTS} has been reached, no reconnection is
     * attempted and the state of this instance is reset via {@link #cleanupState()} instead.
     */
    private void handleReconnectionAttempt() {
        if (!automaticallyReconnect) {
            // No reconnection will follow, so don't leave stale connection/authentication state behind (e.g. an
            // 'authenticated' flag that is still 'true' or an authentication future that would never complete).
            cleanupState();
            return;
        }

        if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) {
            LOGGER.error("Exhausted {} reconnection attempts. Not attempting to reconnect.", MAX_RECONNECT_ATTEMPTS);
            cleanupState();
            return;
        }

        LOGGER.info("Attempting to reconnect {} websocket in {} milliseconds...",
                websocketName, RECONNECT_SLEEP_INTERVAL);
        reconnectAttempts++;

        ForkJoinPool.commonPool().execute(() -> {
            try {
                Thread.sleep(RECONNECT_SLEEP_INTERVAL);
            } catch (InterruptedException interruptedException) {
                // Restore the interrupt flag for the executing thread and abandon this reconnection attempt
                Thread.currentThread().interrupt();
                return;
            }
            connect();
        });
    }

    /**
     * Cleans up this instance's state variables. A pending {@link #authenticationMessageFuture} is completed with
     * <code>false</code> before being discarded.
     */
    protected void cleanupState() {
        websocket = null;
        connected = false;
        authenticated = false;
        if (authenticationMessageFuture != null && !authenticationMessageFuture.isDone()) {
            authenticationMessageFuture.complete(false);
        }
        authenticationMessageFuture = null;
        intentionalClose = false;
        reconnectAttempts = 0;
    }

    /**
     * Called asynchronously when a websocket connection is made.
     */
    protected abstract void onConnection();

    /**
     * Called asynchronously when a websocket reconnection is made after unintentional disconnection.
     */
    protected abstract void onReconnection();

    /**
     * Sends an authentication message to authenticate this websocket stream.
     */
    protected abstract void sendAuthenticationMessage();

    /**
     * Gets the {@link #authenticationMessageFuture}, creating it first if it does not exist yet.
     *
     * @return the authentication {@link Future}
     */
    @Override
    public Future<Boolean> getAuthorizationFuture() {
        if (authenticationMessageFuture == null) {
            authenticationMessageFuture = new CompletableFuture<>();
        }
        return authenticationMessageFuture;
    }

    /**
     * Calls the {@link AlpacaWebsocketMessageListener}. Any {@link Exception} thrown by the listener is logged and
     * not propagated. If no listener is set, the message is dropped and a warning is logged.
     *
     * @param messageType the message type
     * @param message     the message
     */
    protected void callListener(T messageType, M message) {
        // Read the field once so the null check and the call below operate on the same reference
        final L currentListener = listener;
        if (currentListener == null) {
            LOGGER.warn("{} websocket has no listener set. Dropping message.", websocketName);
            return;
        }

        try {
            currentListener.onMessage(messageType, message);
        } catch (Exception exception) {
            LOGGER.error("{} listener threw exception!", websocketName, exception);
        }
    }

    @Override
    public void setListener(L listener) {
        this.listener = listener;
    }

    /**
     * Gets {@link #websocketStateListener}.
     *
     * @return the {@link WebsocketStateListener}
     */
    public WebsocketStateListener getWebsocketStateListener() {
        return websocketStateListener;
    }

    /**
     * Sets {@link #websocketStateListener}.
     *
     * @param websocketStateListener an {@link WebsocketStateListener}
     */
    public void setWebsocketStateListener(WebsocketStateListener websocketStateListener) {
        this.websocketStateListener = websocketStateListener;
    }

    /**
     * Gets {@link #automaticallyReconnect}.
     *
     * @return a boolean
     */
    public boolean doesAutomaticallyReconnect() {
        return automaticallyReconnect;
    }

    /**
     * Sets {@link #automaticallyReconnect}.
     *
     * @param automaticallyReconnect the boolean
     */
    public void setAutomaticallyReconnect(boolean automaticallyReconnect) {
        this.automaticallyReconnect = automaticallyReconnect;
    }
}