Skip to content

Commit

Permalink
First attempt on implementing SSE, doesn't work without the socket yet.
Browse files Browse the repository at this point in the history
  • Loading branch information
cdr-chakotay committed Oct 19, 2023
1 parent fab1dab commit 217ec4d
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 105 deletions.
2 changes: 1 addition & 1 deletion .github/linters/sun_checks.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
<module name="FileLength"/>
<module name="LineLength">
<property name="fileExtensions" value="java"/>
<property name="max" value="120"/>
<property name="max" value="150"/>
</module>

<!-- Checks for whitespace -->
Expand Down
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ dependencies {
implementation 'org.jetbrains:annotations:23.0.0'
implementation 'io.tus.java.client:tus-java-client:0.4.5'
implementation 'joda-time:joda-time:2.12.2'
implementation 'com.squareup.okhttp3:okhttp:4.10.0'
implementation 'com.squareup.okhttp3:okhttp:4.11.0'
implementation 'org.json:json:20230227'
implementation 'commons-codec:commons-codec:1.15'
implementation 'io.socket:socket.io-client:2.1.0'
implementation 'com.launchdarkly:okhttp-eventsource:4.1.1'


testImplementation 'junit:junit:4.13.2'
testImplementation 'org.mock-server:mockserver-junit-rule:5.15.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public final class AsyncExample {
* @param args
*/
public static void main(String[] args) {
Transloadit transloadit = new Transloadit("TRANSLOADIT_KEY", "TRANSLOADIT_SECRET");
Transloadit transloadit = new Transloadit(System.getenv("TRANSLOADIT_KEY"), System.getenv("TRANSLOADIT_SECRET"));

Map<String, Object> stepOptions = new HashMap<String, Object>();
stepOptions.put("width", 75);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public final class AsyncPausePlayExample {
* @param args
*/
public static void main(String[] args) {
Transloadit transloadit = new Transloadit("TRANSLOADIT_KEY", "TRANSLOADIT_SECRET");
Transloadit transloadit = new Transloadit(System.getenv("TRANSLOADIT_KEY"), System.getenv("TRANSLOADIT_SECRET"));

Map<String, Object> stepOptions = new HashMap<String, Object>();
stepOptions.put("width", 75);
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Uncomment following line if you want to use the local java-sdk
// for the example instead of pulling the JARs from JCenter.
// This is useful for debugging and testing new features.
//include ':examples'
include ':examples'
rootProject.name = 'transloadit'
188 changes: 96 additions & 92 deletions src/main/java/com/transloadit/sdk/Assembly.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
package com.transloadit.sdk;

import com.launchdarkly.eventsource.ConnectStrategy;
import com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.eventsource.MessageEvent;
import com.launchdarkly.eventsource.background.BackgroundEventHandler;
import com.launchdarkly.eventsource.background.BackgroundEventSource;
import com.transloadit.sdk.exceptions.LocalOperationException;
import com.transloadit.sdk.exceptions.RequestException;
import com.transloadit.sdk.response.AssemblyResponse;
import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;
import io.socket.engineio.client.transports.WebSocket;
import io.tus.java.client.ProtocolException;
import io.tus.java.client.TusClient;
import io.tus.java.client.TusURLMemoryStore;
import io.tus.java.client.TusURLStore;
import io.tus.java.client.TusUpload;
import org.jetbrains.annotations.TestOnly;
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
// CHECKSTYLE:OFF
import io.tus.java.client.TusUploader;
// CHECKTYLE:ON
Expand All @@ -48,7 +50,7 @@ public class Assembly extends OptionsBuilder {
protected boolean shouldWaitForCompletion;
protected AssemblyListener assemblyListener;
protected AssemblyListener runnableAssemblyListener;
protected Socket socket;
protected BackgroundEventSource backgroundEventSource;


protected ArrayList<TusUploadRunnable> threadList;
Expand Down Expand Up @@ -252,8 +254,8 @@ public AssemblyResponse save(boolean isResumable)
throw new RequestException("Request to Assembly failed: " + response.json().getString("error"));
}

if (shouldWaitWithSocket()) {
listenToSocket(response);
if (shouldWaitWithSSE()) {
listenToServerSentEvents(response);
}

try {
Expand All @@ -265,12 +267,12 @@ public AssemblyResponse save(boolean isResumable)
}
} else {
response = new AssemblyResponse(request.post(obtainUploadUrlSuffix(), options, null, files, fileStreams));
if (shouldWaitWithSocket() && !response.isFinished()) {
listenToSocket(response);
if (shouldWaitWithSSE() && !response.isFinished()) {
listenToServerSentEvents(response);
}
}

return shouldWaitWithoutSocket() ? waitTillComplete(response) : response;
return shouldWaitWithoutSSE() ? waitTillComplete(response) : response;
}

/**
Expand Down Expand Up @@ -498,115 +500,117 @@ public void onAssemblyResultFinished(String stepName, JSONObject result) {
* <li>{@code false} if the client should not wait for completion by observing the HTTP - Response</li></ul>
* @see Assembly#save(boolean) Usage in Assembly.save()
*/
protected boolean shouldWaitWithoutSocket() {
protected boolean shouldWaitWithoutSSE() {
return this.shouldWaitForCompletion && this.assemblyListener == null;
}

/**
* Determines if the Client should wait until the Assembly execution is finished by observing a server socket. <p>
* Determines if the Client should wait until the Assembly execution is finished by observing a server sent events (SSE). <p>
* Can only be {@code true} if <code> {@link #shouldWaitForCompletion} = true</code> and an
* {@link AssemblyListener} has been specified.</p>
* @return <ul><li>{@code true} if the client should wait for Assembly completion by observing the socket</li>
* <li>{@code false} if the client should not wait for completion by observing the socket.</li></ul>
* @return <ul><li>{@code true} if the client should wait for Assembly completion by observing SSE</li>
* <li>{@code false} if the client should not wait for completion by observing the SSE.</li></ul>
* @see Assembly#save(boolean) Usage in Assembly.save()
*/
protected boolean shouldWaitWithSocket() {
protected boolean shouldWaitWithSSE() {
return this.shouldWaitForCompletion && this.assemblyListener != null;
}

/**
* Opens a Websocket to the provided URL in order to receive updates on the assembly's execution status.
* @param socketUrl target url to open the WebSocket at.
* @return {@link Socket}
* @throws LocalOperationException
*/
Socket getSocket(String socketUrl) throws LocalOperationException {
IO.Options options = new IO.Options();
options.transports = new String[] {WebSocket.NAME };
try {
URL url = new URL(socketUrl);
options.path = url.getPath();
String host = url.getProtocol() + "://" + url.getHost();
return IO.socket(host, options);
} catch (URISyntaxException | MalformedURLException e) {
throw new LocalOperationException(e);
}
}

/**
* Wait till the assembly is finished and then return the response of the complete state.
*
* @param response {@link AssemblyResponse}
* @throws LocalOperationException if something goes wrong while running non-http operations.
*/
private void listenToSocket(AssemblyResponse response) throws LocalOperationException {
final String assemblyUrl = response.getSslUrl();
final String assemblyId = response.getId();
private void listenToServerSentEvents(AssemblyResponse response) {
final URI sseUpdateStreamUrl = URI.create(response.getUpdateStreamUrl());


socket = getSocket(response.getWebsocketUrl());
Emitter.Listener onFinished = new Emitter.Listener() {
BackgroundEventHandler myHandler = new BackgroundEventHandler() {
@Override
public void call(Object... args) {
socket.disconnect();
try {
getAssemblyListener().onAssemblyFinished(transloadit.getAssemblyByUrl(assemblyUrl));
} catch (RequestException e) {
getAssemblyListener().onError(e);
} catch (LocalOperationException e) {
getAssemblyListener().onError(e);
}
public void onOpen() {
}
};

Emitter.Listener onConnect = new Emitter.Listener() {
@Override
public void call(Object... args) {
JSONObject obj = new JSONObject();
obj.put("id", assemblyId);
socket.emit("assembly_connect", obj);
public void onClosed() {
}
// Here the different SSE sent events are getting piped to the corresponding assembly event via the {@link AssemblyListener}
public void onMessage(String event, MessageEvent messageEvent) {
if (event != null && messageEvent != null) {
// In case of a message event, without additional payload.
if (event.equals("message")) {
String messageContent = messageEvent.getData();
if (messageContent.equals("assembly_finished")) {
try {
getAssemblyListener().onAssemblyFinished(transloadit.getAssemblyByUrl(response.getSslUrl()));
} catch (RequestException | LocalOperationException e) {
getAssemblyListener().onError(e);
} finally {
// Close the event source, as the assembly encoding process has finished.
getBackgroundEventSource().close();
}
}

if (messageContent.equals("assembly_uploading_finished")) {
getAssemblyListener().onAssemblyUploadFinished();
}

if (messageContent.equals("assembly_upload_meta_data_extracted")) {
getAssemblyListener().onMetadataExtracted();
}

// In case of regular events, which are coming with extra payloads from the server.
} else {
// Some events are not wrapped inside a plain JSON Object, but inside a JSON array.
JSONArray messageEventArray = new JSONArray(messageEvent.getData());

if (event.equals("assembly_result_finished")) {
// Unpack the two expected fields in the JSON array.
String stepName = messageEventArray.getString(0);
JSONObject messageEventJson = messageEventArray.getJSONObject(1);
getAssemblyListener().onAssemblyResultFinished(stepName, messageEventJson);
}

if (event.equals("assembly_error")) {
// Deliver error information to the user.
JSONObject messageEventJson = messageEventArray.getJSONObject(0);
String errorString = messageEventJson.getString("error") + "\n" + messageEventJson.toString(2);
getAssemblyListener().onError(new RequestException(errorString));
}

if (event.equals("assembly_upload_finished")) {
JSONObject messageEventJson = messageEventArray.getJSONObject(0);
getAssemblyListener().onFileUploadFinished(messageEventJson.getString("name"), messageEventJson);
}
}
}
}
};

Emitter.Listener onError = new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.disconnect();
getAssemblyListener().onError((Exception) args[0]);
public void onComment(String comment) {
}
};

Emitter.Listener onMetadataExtracted = args -> {
getAssemblyListener().onMetadataExtracted();
};
@Override
public void onError(Throwable t) {
}

Emitter.Listener onAssemblyResultFinished = args -> {
String stepName = (String) args[0];
JSONObject result = (JSONObject) args[1];
getAssemblyListener().onAssemblyResultFinished(stepName, result);
};
this.backgroundEventSource = new BackgroundEventSource.Builder(myHandler, new EventSource.Builder(
ConnectStrategy.http(sseUpdateStreamUrl)
.header("Accept", "text/event-stream")
.connectTimeout(5, TimeUnit.SECONDS)
)
).build();

//Hands over Filename of recently uploaded file to the callback in the AssemblyListener
Emitter.Listener onFileUploadFinished = args -> {
String name = ((JSONObject) args[0]).getString("name");
JSONObject uploadInformation = (JSONObject) args[0];
getAssemblyListener().onFileUploadFinished(name, uploadInformation);
};
this.backgroundEventSource.start();

// Triggers callback in the {@link Assembly#assemblyListener} if the Assembly instructions have been uploaded.
Emitter.Listener onAssemblyUploadFinished = args -> {
getAssemblyListener().onAssemblyUploadFinished();
};
}

socket
.on(Socket.EVENT_CONNECT, onConnect)
.on("assembly_finished", onFinished)
.on("assembly_uploading_finished", onAssemblyUploadFinished)
.on("assembly_upload_finished", onFileUploadFinished)
.on("assembly_upload_meta_data_extracted", onMetadataExtracted)
.on("assembly_result_finished", onAssemblyResultFinished)
.on("assembly_error", onFinished)
.on(Socket.EVENT_CONNECT_ERROR, onError);
socket.connect();
/**
* Returns the Event Source, which handles the Server Sent Event driven assembly status updates.
* @return BackgroundEventSource, which handles the assembly Status via SSE.
*/
public BackgroundEventSource getBackgroundEventSource() {
return backgroundEventSource;
}

/**
Expand Down Expand Up @@ -733,8 +737,8 @@ protected void abortUploads(Exception e) {
executor.shutdownNow();
}
runnableAssemblyListener.onError(e);
if (socket != null) {
socket.disconnect();
if (backgroundEventSource != null) {
backgroundEventSource.close();
}
}

Expand Down
10 changes: 4 additions & 6 deletions src/main/java/com/transloadit/sdk/async/AsyncAssembly.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ synchronized void setState(State state) {
* Returns always false to indicate to the {@link Assembly#save} method that it should never wait for the Assembly
* to be complete by observing the HTTP - Response.
* @return false
* @see Assembly#shouldWaitWithoutSocket()
* @see Assembly#shouldWaitWithoutSSE()
* @see Assembly#save(boolean)
*/
protected boolean shouldWaitWithoutSocket() {
protected boolean shouldWaitWithoutSSE() {
return false;
}

Expand Down Expand Up @@ -279,12 +279,10 @@ public void run() {

if (state == State.UPLOAD_COMPLETE) {
getUploadListener().onUploadFinished();
if (!shouldWaitWithSocket() && shouldWaitForCompletion && (getListener() != null)) {
if (!shouldWaitWithSSE() && shouldWaitForCompletion && (getListener() != null)) {
try {
getListener().onAssemblyFinished(watchStatus());
} catch (LocalOperationException e) {
getListener().onAssemblyStatusUpdateFailed(e);
} catch (RequestException e) {
} catch (LocalOperationException | RequestException e) {
getListener().onAssemblyStatusUpdateFailed(e);
} finally {
executor.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ public String getSslUrl() {
return this.json().getString("assembly_ssl_url");
}

/**
* Retruns the upstream url needed for Server Sent Events.
* @return upstream url
*/
public String getUpdateStreamUrl() {
return this.json().getString("update_stream_url");
}

/**
* Returns the URL of the websocket used in the Assembly execution.
* @return assembly websocket url
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/com/transloadit/sdk/AssemblyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void saveWithInputStream() throws Exception {
* setting {@link Assembly#shouldWaitForCompletion} = {@code true}.
* @throws Exception if communication with the server goes wrong, if building the request goes wrong or
* if Test resources "assembly_executing.json" or "resumable_assembly_complete.json" are missing.
* @see Assembly#shouldWaitWithoutSocket()
* @see Assembly#shouldWaitWithoutSSE()
*/
@Test
public void saveTillComplete() throws Exception {
Expand Down Expand Up @@ -303,7 +303,7 @@ public void onAssemblyResultFinished(String stepName, JSONObject result) {
* This Test verifies the functionality of {@link Assembly#save(boolean)}. It is identical to
* {@link AssemblyTest#saveWithTus()}, except it waits until the {@link Assembly} execution is finished.
* This is determined by by observing the {@link AssemblyResponse} status.
* @see Assembly#shouldWaitWithoutSocket()
* @see Assembly#shouldWaitWithoutSSE()
* @throws Exception if communication with the server goes wrong, if building the request goes wrong or
* if Test resources "resumable_assembly.json" or "resumable_assembly_complete.json" are missing.
*/
Expand Down
Loading

0 comments on commit 217ec4d

Please sign in to comment.