Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump http-tansport, msf4j and siddhi #19

Merged
merged 5 commits into from
Mar 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>siddhi-io-websocket-parent</artifactId>
<groupId>org.wso2.extension.siddhi.io.websocket</groupId>
<version>1.0.12-SNAPSHOT</version>
<version>1.1.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,18 @@
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
import org.wso2.siddhi.core.exception.SiddhiAppRuntimeException;
import org.wso2.siddhi.core.stream.output.sink.Sink;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.DynamicOptions;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.transport.http.netty.contract.HttpWsConnectorFactory;
import org.wso2.transport.http.netty.contract.websocket.HandshakeFuture;
import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeFuture;
import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnector;
import org.wso2.transport.http.netty.contract.websocket.WsClientConnectorConfig;
import org.wso2.transport.http.netty.contract.websocket.WebSocketClientConnectorConfig;
import org.wso2.transport.http.netty.contract.websocket.WebSocketConnection;
import org.wso2.transport.http.netty.contractimpl.DefaultHttpWsConnectorFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
Expand All @@ -52,7 +51,6 @@
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import javax.websocket.Session;

/**
* {@code WebsocketSink } Publishing the siddhi events to the WebSocket server.
Expand Down Expand Up @@ -92,6 +90,24 @@
type = DataType.INT,
optional = true,
defaultValue = "-1"
),
@Parameter(
name = "truststore.path",
description = "The file path to the location of the truststore. If a custom truststore is" +
" not specified, then the system uses the default truststore file - wso2carbon.jks " +
"in the `${carbon.home}/resources/security` directory.",
type = {DataType.STRING},
optional = true,
defaultValue = "${carbon.home}/resources/security/client-truststore.jks"
),
@Parameter(
name = "truststore.password",
description = "The password for the truststore. A custom password can be specified " +
"if required. If no custom password is specified, then the system uses " +
"`wso2carbon` as the default password.",
type = {DataType.STRING},
optional = true,
defaultValue = "wso2carbon"
)
},
examples = {
Expand All @@ -117,8 +133,11 @@ public class WebSocketSink extends Sink {
private String idleTimeoutString;
private int idleTimeout;
private WebSocketClientConnectorListener connectorListener;
private Session session = null;
private WebSocketConnection webSocketConnection = null;
private Semaphore semaphore = new Semaphore(0);
private boolean sslEnabled = false;
private String tlsstruststorePath;
private String tlsstruststorePass;

@Override
public Class[] getSupportedInputEventClasses() {
Expand Down Expand Up @@ -146,32 +165,43 @@ protected void init(StreamDefinition streamDefinition, OptionHolder optionHolder
idleTimeout = Integer.parseInt(idleTimeoutString);
if (idleTimeout < -1) {
throw new SiddhiAppCreationException("The idle timeout defined in '" + streamDefinition +
"' should be greater than 0.");
"' should be greater than 0.");
}
} catch (NumberFormatException e) {
throw new SiddhiAppCreationException("The idle timeout defined in '" + streamDefinition +
"' should be an Integer.");
"' should be an Integer.");
}
}
connectorListener = new WebSocketClientConnectorListener();
try {
String scheme = (new URI(url)).getScheme();
if (!Objects.equals("ws", scheme) && !Objects.equals("wss", scheme)) {
throw new SiddhiAppCreationException("Invalid scheme in " + WebSocketProperties.URL + " = " +
url + ". The scheme of the " + WebSocketProperties.URL +
" for the websocket server should be either `ws` or "
+ "`wss`.");
url + ". The scheme of the " + WebSocketProperties.URL +
" for the websocket server should be either `ws` or "
+ "`wss`.");
}
if (Objects.equals("wss", scheme)) {
this.sslEnabled = true;
this.tlsstruststorePath = optionHolder.validateAndGetStaticValue(
WebSocketProperties.TLS_TRUSTSTORE_PATH, configReader.readConfig
(WebSocketProperties.TLS_TRUSTSTORE_PATH,
WebSocketProperties
.DEFAULT_TRUSTSTORE_FILE_PATH));
this.tlsstruststorePass = optionHolder.validateAndGetStaticValue(
WebSocketProperties.TLS_TRUSTSTORE_PASS, configReader.readConfig(
WebSocketProperties.TLS_TRUSTSTORE_PASS, WebSocketProperties.DEFAULT_TRUSTSTORE_PASS));
}
} catch (URISyntaxException e) {
throw new SiddhiAppCreationException("There is an syntax error in the '" + url + "' of the websocket "
+ "server.", e);
+ "server.", e);
}
}

@Override
public void connect() throws ConnectionUnavailableException {
HttpWsConnectorFactory httpConnectorFactory = new DefaultHttpWsConnectorFactory();
WsClientConnectorConfig configuration = new WsClientConnectorConfig(url);
WebSocketClientConnectorConfig configuration = new WebSocketClientConnectorConfig(url);
niveathika marked this conversation as resolved.
Show resolved Hide resolved
if (subProtocol != null) {
String[] subProtocol1 = WebSocketUtil.getSubProtocol(subProtocol);
configuration.setSubProtocols(subProtocol1);
Expand All @@ -183,48 +213,45 @@ public void connect() throws ConnectionUnavailableException {
if (idleTimeoutString != null) {
configuration.setIdleTimeoutInMillis(idleTimeout);
}
if (sslEnabled) {
configuration.setTrustStoreFile(this.tlsstruststorePath);
configuration.setTrustStorePass(this.tlsstruststorePass);
}
configuration.setAutoRead(true);
WebSocketClientConnector clientConnector = httpConnectorFactory.createWsClientConnector(configuration);
HandshakeFuture handshakeFuture = clientConnector.connect(connectorListener);
ClientHandshakeFuture handshakeFuture = clientConnector.connect();
handshakeFuture.setWebSocketConnectorListener(connectorListener);
WebSocketSinkHandshakeListener handshakeListener = new WebSocketSinkHandshakeListener
(streamDefinition, semaphore);
try {
handshakeFuture.setHandshakeListener(handshakeListener);
handshakeFuture.setClientHandshakeListener(handshakeListener);
semaphore.acquire();
} catch (InterruptedException e) {
log.error("Error occurs while connecting with the server defined in " + streamDefinition, e);
}
AtomicReference<Session> sessionAtomicReference = handshakeListener.getSessionAtomicReference();
session = sessionAtomicReference.get();
AtomicReference<WebSocketConnection> sessionAtomicReference =
handshakeListener.getWebSocketConnectionAtomicReference();
webSocketConnection = sessionAtomicReference.get();
}

@Override
public void publish(Object payload, DynamicOptions dynamicOptions) throws ConnectionUnavailableException {
try {
if (session != null) {
if (payload instanceof ByteBuffer) {
byte[] byteMessage = ((ByteBuffer) payload).array();
ByteBuffer binaryMessage = ByteBuffer.wrap(byteMessage);
session.getBasicRemote().sendBinary(binaryMessage);
} else {
session.getBasicRemote().sendText(payload.toString());
}
if (webSocketConnection != null) {
if (payload instanceof ByteBuffer) {
byte[] byteMessage = ((ByteBuffer) payload).array();
ByteBuffer binaryMessage = ByteBuffer.wrap(byteMessage);
webSocketConnection.pushBinary(binaryMessage);
niveathika marked this conversation as resolved.
Show resolved Hide resolved
} else {
webSocketConnection.pushText(payload.toString());
}
} catch (IOException e) {
throw new SiddhiAppRuntimeException(
"Error while sending events to the '" + WebSocketProperties.URL + "' of the WebSocket "
+ "server defined in '" + streamDefinition + "'.", e);
}
}


@Override
public void disconnect() {
if (session != null) {
try {
session.close();
} catch (IOException e) {
log.error("Error while closing the WebSocket connection." + e);
}
if (webSocketConnection != null) {
webSocketConnection.terminateConnection();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@

import org.wso2.siddhi.core.exception.SiddhiAppRuntimeException;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.transport.http.netty.contract.websocket.HandshakeListener;
import org.wso2.transport.http.netty.contract.websocket.ClientHandshakeListener;
import org.wso2.transport.http.netty.contract.websocket.WebSocketConnection;
import org.wso2.transport.http.netty.message.HttpCarbonResponse;

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import javax.websocket.Session;

/**
* Future listener for WebSocket handshake.
*/

public class WebSocketSinkHandshakeListener implements HandshakeListener {
public class WebSocketSinkHandshakeListener implements ClientHandshakeListener {
private StreamDefinition streamDefinition;
private AtomicReference<Session> sessionAtomicReference = new AtomicReference<>();
private AtomicReference<WebSocketConnection> webSocketConnectionAtomicReference = new AtomicReference<>();
private Semaphore semaphore;

public WebSocketSinkHandshakeListener(StreamDefinition streamDefinition, Semaphore semaphore) {
Expand All @@ -43,18 +43,19 @@ public WebSocketSinkHandshakeListener(StreamDefinition streamDefinition, Semapho
}

@Override
public void onSuccess(WebSocketConnection webSocketConnection) {
sessionAtomicReference.set(webSocketConnection.getSession());
public void onSuccess(WebSocketConnection webSocketConnection, HttpCarbonResponse response) {
webSocketConnectionAtomicReference.set(webSocketConnection);
semaphore.release();
}

@Override public void onError(Throwable throwable) {
@Override
public void onError(Throwable t, HttpCarbonResponse response) {
semaphore.release();
throw new SiddhiAppRuntimeException("Error while connecting with the websocket server defined in '"
+ streamDefinition + "'.", throwable);
+ streamDefinition + "'.", t);
}

public AtomicReference<Session> getSessionAtomicReference() {
return sessionAtomicReference;
public AtomicReference<WebSocketConnection> getWebSocketConnectionAtomicReference() {
return webSocketConnectionAtomicReference;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
package org.wso2.extension.siddhi.io.websocket.sink.websocketserver;

import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.transport.http.netty.config.ListenerConfiguration;
import org.wso2.transport.http.netty.contract.HttpWsConnectorFactory;
import org.wso2.transport.http.netty.contract.ServerConnector;
import org.wso2.transport.http.netty.contract.ServerConnectorFuture;
import org.wso2.transport.http.netty.contract.config.ListenerConfiguration;
import org.wso2.transport.http.netty.contract.config.ServerBootstrapConfiguration;
import org.wso2.transport.http.netty.contractimpl.DefaultHttpWsConnectorFactory;
import org.wso2.transport.http.netty.listener.ServerBootstrapConfiguration;

/**
* {@code WebSocketServer } Handle the WebSocket server.
Expand Down Expand Up @@ -79,7 +79,7 @@ void start() throws InterruptedException {
serverConnector = httpConnectorFactory.createServerConnector(new ServerBootstrapConfiguration(null),
listenerConfiguration);
ServerConnectorFuture connectorFuture = serverConnector.start();
connectorFuture.setWSConnectorListener(serverSinkConnectorListener);
connectorFuture.setWebSocketConnectorListener(serverSinkConnectorListener);
connectorFuture.sync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,25 @@
package org.wso2.extension.siddhi.io.websocket.sink.websocketserver;

import org.wso2.siddhi.core.exception.SiddhiAppRuntimeException;
import org.wso2.transport.http.netty.contract.websocket.HandshakeListener;
import org.wso2.transport.http.netty.contract.websocket.ServerHandshakeListener;
import org.wso2.transport.http.netty.contract.websocket.WebSocketConnection;

import java.util.List;
import javax.websocket.Session;

/**
* Future listener for WebSocket handshake.
*/

public class WebSocketServerHandshakeListener implements HandshakeListener {
private List<Session> sessionList;
public class WebSocketServerHandshakeListener implements ServerHandshakeListener {
private List<WebSocketConnection> webSocketConnectionList;

WebSocketServerHandshakeListener(List<Session> sessionList) {
this.sessionList = sessionList;
WebSocketServerHandshakeListener(List<WebSocketConnection> webSocketConnectionList) {
this.webSocketConnectionList = webSocketConnectionList;
}

@Override
public void onSuccess(WebSocketConnection webSocketConnection) {
sessionList.add(webSocketConnection.getSession());
webSocketConnectionList.add(webSocketConnection);
webSocketConnection.startReadingFrames();
}

Expand Down
Loading