Skip to content

Commit

Permalink
Add ping task (#13)
Browse files Browse the repository at this point in the history
* Add ping task

* code review feedback - surround sendPing with try-catch

Co-authored-by: Hemraj <hemrajsinh@benzinga.com>
gharia and Hemraj authored Dec 10, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 212def1 commit df41c2a
Showing 1 changed file with 30 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft;
@@ -46,8 +47,12 @@ public class SquawkWSClient extends WebSocketClient {
private static final long CONNECTION_RETRY_INTERVAL = 20L;
// For what period of time the client should keep retrying at CONNECTION_RETRY_INTERVAL
private static final long RETRY_PERIOD = 60L * 15L;
// Interval for sending ping requests
private static final long PING_INTERVAL_SECONDS = 25L;

ScheduledExecutorService scheduledExecutorService;
private ScheduledExecutorService reconnectionService;
private ScheduledExecutorService pingSchedulerService = Executors.newScheduledThreadPool(1);
private ScheduledFuture<?> pingTask;

private boolean shouldRetryConnection = true;

@@ -74,11 +79,11 @@ public SquawkWSClient( URI serverUri, Map<String, String> httpHeaders ) {
@Override
public void onOpen( ServerHandshake handshakedata ) {
log.info("WebSocket connection opened");
if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) {
scheduledExecutorService.shutdownNow();
if (null != reconnectionService && !reconnectionService.isShutdown()) {
reconnectionService.shutdownNow();
// To start retrying when next time it disconnect
shouldRetryConnection = true;
}
}
sendAuthMessage();
}

@@ -91,7 +96,18 @@ public void onMessage( String message ) {
if (msg.has("error")) {
log.error("Authentication failed {}", msg.get("error").getAsString());
this.closeWS();
} else {
} else {
pingTask = pingSchedulerService.scheduleAtFixedRate(
() -> {
try {
this.sendPing();
} catch (Exception e) {
log.error("Error sending ping request", e);
}
},
PING_INTERVAL_SECONDS,
PING_INTERVAL_SECONDS,
TimeUnit.SECONDS);
log.info("Authentication successful. Joining Room.");
this.sendJoinRoomMessage();
}
@@ -127,20 +143,23 @@ public void onMessage( String message ) {
@Override
public void onClose( int code, String reason, boolean remote ) {
log.info( "Connection closed by " + ( remote ? "remote peer" : "us" ) + " Code: " + code + " Reason: " + reason );
if (!pingSchedulerService.isShutdown() && null != pingTask) {
pingTask.cancel(true);
}
if (this.shouldRetryConnection) {
scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(
reconnectionService = Executors.newScheduledThreadPool(1);
reconnectionService.scheduleAtFixedRate(
() -> {
log.info("Retrying connecting the Sqauwk");
this.reconnect();
},
CONNECTION_RETRY_INTERVAL,
CONNECTION_RETRY_INTERVAL,
TimeUnit.SECONDS);
scheduledExecutorService.schedule(
reconnectionService.schedule(
() -> {
System.out.println("Not able to re-connect to Squawk. Stopping retries." );
scheduledExecutorService.shutdown();
reconnectionService.shutdown();
},
RETRY_PERIOD,TimeUnit.SECONDS);
// To stop starting any new retry scheduler as the one scheduler is started
@@ -165,6 +184,7 @@ public void sendPing() {
private void closeWS() {
log.info("Closing WebSocket connection.");
this.shouldRetryConnection= false;
this.pingSchedulerService.shutdown();
this.close();
}

@@ -273,7 +293,7 @@ public void run()
if (c.isOpen()) {
log.info("Shutting down WebSocket Client!");
c.sendLogoutMessage();
c.close();
c.closeWS();
}
}
});

0 comments on commit df41c2a

Please sign in to comment.