Skip to content

Commit

Permalink
feat(reconnect): handle reconnect message type in java websocket sdk (#…
Browse files Browse the repository at this point in the history
…83)

Co-authored-by: bwilloughby <[email protected]>
  • Loading branch information
byblakeorriver and byblakeorriver authored Nov 24, 2020
1 parent 97aff9a commit 633fe9c
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 2 deletions.
2 changes: 1 addition & 1 deletion data-api/java-websocket/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>io.coinapi.websocket</groupId>
<artifactId>v1</artifactId>
<version>1.2</version>
<version>1.3</version>

<organization>
<name>COINAPI LTD</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ public interface CoinAPIWebSocket {
void setOHLCVInvoke(InvokeFunction function);
void setVolumeInvoke(InvokeFunction function);
void setErrorInvoke(InvokeFunction function);
void setReconnectInvoke(InvokeFunction function);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class CoinAPIWebSocketImpl implements CoinAPIWebSocket {
private InvokeFunction ohlcvInvoke;
private InvokeFunction volumeInvoke;
private InvokeFunction errorInvoke;
private InvokeFunction reconnectInvoke;

/**
*
Expand Down Expand Up @@ -80,6 +81,9 @@ public CoinAPIWebSocketImpl(Boolean isSandbox) {
case error:
handle(message, Error.class, errorInvoke);
break;
case reconnect:
handle(message, Reconnect.class, reconnectInvoke);
break;
}
} catch (Exception e) {
e.printStackTrace();
Expand Down Expand Up @@ -208,6 +212,13 @@ public void setVolumeInvoke(InvokeFunction function) {
@Override
public void setErrorInvoke(InvokeFunction function) { this.errorInvoke = function; }

/**
*
* @param function
*/
@Override
public void setReconnectInvoke(InvokeFunction function) { this.reconnectInvoke = function; }

private void handle(String message, Class deserializeClass, InvokeFunction invokeFunction) throws IOException, NotImplementedException {

Object deserialize = json.deserialize(deserializeClass, new ByteArrayInputStream(message.getBytes()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ public enum MessageTypeEnum {
ohlcv,
volume,
hearbeat,
error
error,
reconnect
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.coinapi.websocket.model;

import com.dslplatform.json.CompiledJson;
import com.dslplatform.json.JsonAttribute;

import java.time.OffsetDateTime;

@CompiledJson
public class Reconnect extends MessageBase {

private Integer withinSeconds;
private OffsetDateTime beforeTime;

public Integer getWithinSeconds() { return withinSeconds; }

@JsonAttribute(name = "within_seconds")
public void setWithinSeconds(Integer withinSeconds) { this.withinSeconds = withinSeconds; }

public OffsetDateTime getBeforeTime() { return beforeTime; }

@JsonAttribute(name = "before_time")
public void setBeforeTime(OffsetDateTime beforeTime) { this.beforeTime = beforeTime; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import io.coinapi.websocket.CoinAPIWebSocketImpl;
import io.coinapi.websocket.model.Hello;
import org.junit.Before;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;

public abstract class MessageInjectableCoinAPISDKTest {
protected String apiKey;
protected CoinAPIWebSocketImpl coinAPIWebSocket;
protected Queue messagesQueue;

@Before
public void configuration() throws IOException, NoSuchFieldException, IllegalAccessException {
Config config = new Config();
apiKey = config.getPropValues("coinapi_key");

//make private messagesQueue variable accessible for injecting messages during testing
Field messagesQueueField = CoinAPIWebSocketImpl.class.getDeclaredField("messagesQueue");
messagesQueueField.setAccessible(true);

coinAPIWebSocket = new CoinAPIWebSocketImpl(true);

//inject this public queue into the class
Queue messagesQueueToSet = new LinkedBlockingDeque();
messagesQueueField.set(coinAPIWebSocket, messagesQueueToSet);

messagesQueue = messagesQueueToSet;
}

public Hello createHello(String type) {
Hello hello = new Hello();
hello.setApiKey(apiKey);
hello.setSubscribeDataType(new String[]{type});
return hello;
}
}
39 changes: 39 additions & 0 deletions data-api/java-websocket/src/test/java/ReconnectTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

public class ReconnectTest extends MessageInjectableCoinAPISDKTest {

@Test
public void getReconnectMessages() throws IOException, InterruptedException {

AtomicReference<Integer> msgCount = new AtomicReference<>(0);

coinAPIWebSocket.setOHLCVInvoke(message -> {
//stub invoke to allow connection
});

coinAPIWebSocket.setReconnectInvoke(message -> {
msgCount.getAndSet(msgCount.get() + 1);
});

coinAPIWebSocket.sendHelloMessage(createHello("ohlcv"));

String message = "{" +
"\"type\": \"reconnect\"," +
"\"within_seconds\": 10," +
"\"before_time\": \"2020-08-06T19:19:09.7035429Z\"" +
"}";

//inject reconnect message into exposed queue
messagesQueue.add(message);

Thread.sleep(10000);
System.out.println("processing " + msgCount.get() + " reconnect messages");

coinAPIWebSocket.closeConnect();
Assert.assertEquals(1, msgCount.get().intValue());
}
}

0 comments on commit 633fe9c

Please sign in to comment.