Skip to content

Commit

Permalink
Merge pull request #157 from suhothayan/master
Browse files Browse the repository at this point in the history
Make http-call sink to have default on.error behavior & refactor code
  • Loading branch information
mohanvive committed Nov 7, 2019
2 parents e85e086 + 6f18fa1 commit 8b409ed
Show file tree
Hide file tree
Showing 20 changed files with 548 additions and 603 deletions.
2 changes: 1 addition & 1 deletion component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<artifactId>siddhi-io-http-parent</artifactId>
<groupId>io.siddhi.extension.io.http</groupId>
<version>2.1.3-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.siddhi.extension.io.http.sink;

import org.wso2.transport.http.netty.contract.HttpClientConnector;
import org.wso2.transport.http.netty.contract.HttpResponseFuture;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;

import java.util.Map;

/**
* Class to have the client connector and related properties
*/
public class ClientConnector {

private String publisherURL;
private Map<String, String> httpURLProperties;
private HttpClientConnector httpClientConnector;

public ClientConnector(String publisherURL, Map<String, String> httpURLProperties,
HttpClientConnector httpClientConnector) {
this.publisherURL = publisherURL;
this.httpURLProperties = httpURLProperties;
this.httpClientConnector = httpClientConnector;
}

public String getPublisherURL() {
return publisherURL;
}

public HttpResponseFuture send(HttpCarbonMessage cMessage) {
return httpClientConnector.send(cMessage);
}

public Map<String, String> getHttpURLProperties() {
return httpURLProperties;
}

}

Large diffs are not rendered by default.

408 changes: 186 additions & 222 deletions component/src/main/java/io/siddhi/extension/io/http/sink/HttpSink.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@

import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.http.sink.exception.HttpSinkAdaptorRuntimeException;
import io.siddhi.extension.io.http.util.HttpConstants;
import io.siddhi.extension.io.http.util.TrpPropertyTypes;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.Header;
import org.wso2.transport.http.netty.contract.Constants;
import org.wso2.transport.http.netty.contract.config.ProxyServerConfiguration;
import org.wso2.transport.http.netty.contract.config.SenderConfiguration;
import org.wso2.transport.http.netty.contractimpl.DefaultHttpWsConnectorFactory;
import org.wso2.transport.http.netty.contractimpl.sender.channel.pool.PoolConfiguration;

import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -42,6 +47,7 @@
import static io.siddhi.extension.io.http.util.HttpConstants.CLIENT_BOOTSTRAP_SENDBUFFERSIZE;
import static io.siddhi.extension.io.http.util.HttpConstants.CLIENT_BOOTSTRAP_SOCKET_REUSE;
import static io.siddhi.extension.io.http.util.HttpConstants.CLIENT_BOOTSTRAP_SOCKET_TIMEOUT;
import static io.siddhi.extension.io.http.util.HttpConstants.EMPTY_STRING;
import static io.siddhi.extension.io.http.util.HttpConstants.HTTP_TRACE_LOG_ENABLED;
import static io.siddhi.extension.io.http.util.HttpConstants.LOG_TRACE_ENABLE_DEFAULT_VALUE;
import static io.siddhi.extension.io.http.util.HttpConstants.PARAMETER_SEPARATOR;
Expand Down Expand Up @@ -283,4 +289,84 @@ private static Map<String, TrpPropertyTypes> trpPropertyTypeMap() {
trpPropertyTypes.put(CLIENT_BOOTSTRAP_SOCKET_TIMEOUT, TrpPropertyTypes.INTEGER);
return trpPropertyTypes;
}

public static PoolConfiguration createPoolConfigurations(OptionHolder optionHolder) {
int maxIdlePerPool = Integer.parseInt(optionHolder.validateAndGetStaticValue(
HttpConstants.MAX_IDLE_CONNECTIONS_PER_POOL, HttpConstants.DEFAULT_MAX_IDLE_CONNECTIONS_PER_POOL));
int minIdlePerPool = Integer.parseInt(optionHolder.validateAndGetStaticValue(
HttpConstants.MIN_IDLE_CONNECTIONS_PER_POOL, HttpConstants.DEFAULT_MIN_IDLE_CONNECTIONS_PER_POOL));
int maxActivePerPool = Integer.parseInt(optionHolder.validateAndGetStaticValue(
HttpConstants.MAX_ACTIVE_CONNECTIONS_PER_POOL, HttpConstants.DEFAULT_MAX_ACTIVE_CONNECTIONS_PER_POOL));
boolean testOnBorrow = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(HttpConstants.TEST_ON_BORROW,
HttpConstants.DEFAULT_TEST_ON_BORROW));
boolean testWhileIdle = Boolean.parseBoolean(
optionHolder.validateAndGetStaticValue(HttpConstants.TEST_WHILE_IDLE,
HttpConstants.DEFAULT_TEST_WHILE_IDLE));
long timeBetweenEvictionRuns = Long.parseLong(optionHolder.validateAndGetStaticValue(
HttpConstants.TIME_BETWEEN_EVICTION_RUNS, HttpConstants.DEFAULT_TIME_BETWEEN_EVICTION_RUNS));
long minEvictableIdleTime = Long.parseLong(optionHolder.validateAndGetStaticValue(
HttpConstants.MIN_EVICTABLE_IDLE_TIME, HttpConstants.DEFAULT_MIN_EVICTABLE_IDLE_TIME));
byte exhaustedAction = (byte) Integer.parseInt(optionHolder.validateAndGetStaticValue(
HttpConstants.EXHAUSTED_ACTION, HttpConstants.DEFAULT_EXHAUSTED_ACTION));
int maxWaitTime = Integer.parseInt(optionHolder.validateAndGetStaticValue(
HttpConstants.MAX_WAIT_TIME, HttpConstants.DEFAULT_MAX_WAIT_TIME));
PoolConfiguration connectionPoolConfiguration = new PoolConfiguration();
connectionPoolConfiguration.setMaxActivePerPool(maxActivePerPool);
connectionPoolConfiguration.setMinIdlePerPool(minIdlePerPool);
connectionPoolConfiguration.setMaxIdlePerPool(maxIdlePerPool);
connectionPoolConfiguration.setTestOnBorrow(testOnBorrow);
connectionPoolConfiguration.setTestWhileIdle(testWhileIdle);
connectionPoolConfiguration.setTimeBetweenEvictionRuns(timeBetweenEvictionRuns);
connectionPoolConfiguration.setMinEvictableIdleTime(minEvictableIdleTime);
connectionPoolConfiguration.setExhaustedAction(exhaustedAction);
connectionPoolConfiguration.setMaxWaitTime(maxWaitTime);
return connectionPoolConfiguration;
}

public static DefaultHttpWsConnectorFactory createConnectorFactory(ConfigReader configReader) {
//read trp globe configuration
String bootstrapWorker = configReader
.readConfig(HttpConstants.CLIENT_BOOTSTRAP_WORKER_GROUP_SIZE, EMPTY_STRING);
String bootstrapBoss = configReader.readConfig(HttpConstants.CLIENT_BOOTSTRAP_BOSS_GROUP_SIZE, EMPTY_STRING);
String bootstrapClient = configReader.readConfig(HttpConstants.CLIENT_BOOTSTRAP_CLIENT_GROUP_SIZE,
EMPTY_STRING);
//if bootstrap configurations are given then pass it if not let take default value of transport
if (!EMPTY_STRING.equals(bootstrapBoss) && !EMPTY_STRING.equals(bootstrapWorker)) {
if (!EMPTY_STRING.equals(bootstrapClient)) {
return new DefaultHttpWsConnectorFactory(Integer.parseInt(bootstrapBoss),
Integer.parseInt(bootstrapWorker), Integer.parseInt(bootstrapClient));
} else {
return new DefaultHttpWsConnectorFactory(Integer.parseInt(bootstrapBoss),
Integer.parseInt(bootstrapWorker), Integer.parseInt(bootstrapWorker));
}
}
return new DefaultHttpWsConnectorFactory();
}

public static ProxyServerConfiguration createProxyServerConfiguration(OptionHolder optionHolder, String streamID,
String appName) {
String proxyHost = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_HOST, EMPTY_STRING);
String proxyPort = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_PORT, EMPTY_STRING);
String proxyUsername = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_USERNAME,
EMPTY_STRING);
String proxyPassword = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_PASSWORD,
EMPTY_STRING);
//if proxy username and password not equal to null then create proxy configurations
if (!EMPTY_STRING.equals(proxyHost) && !EMPTY_STRING.equals(proxyPort)) {
try {
ProxyServerConfiguration proxyServerConfiguration = new ProxyServerConfiguration(proxyHost, Integer
.parseInt(proxyPort));
if (!EMPTY_STRING.equals(proxyPassword) && !EMPTY_STRING.equals
(proxyUsername)) {
proxyServerConfiguration.setProxyPassword(proxyPassword);
proxyServerConfiguration.setProxyUsername(proxyUsername);
}
} catch (UnknownHostException e) {
log.error("Proxy url of sink defined in '" + streamID + "' of Siddhi App '" +
appName + "' is invalid.", e);
}
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void onMessage(HttpCarbonMessage carbonMessage) {

@Override
public void onError(Throwable throwable) {
log.error("Error occurred during processing response for the request sent by http-request-sink with " +
log.error("Error occurred during processing response for the request sent by http-call sink with " +
"'sink.id' = " + sinkId + " in Siddhi app " + siddhiAppName + ".", throwable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,12 @@ public void connect(ConnectionCallback connectionCallback, State state) throws C
new HttpCallResponseConnectorListener(Integer.parseInt(workerThread), sourceEventListener,
shouldAllowStreamingResponses, sinkId, requestedTransportPropertyNames, siddhiAppName);
this.httpConnectorRegistry.registerSourceListener(httpCallResponseSourceListener, sinkId, httpStatusCode);

HTTPSourceRegistry.registerCallResponseSource(sinkId, httpStatusCode, this);
}

@Override
public void disconnect() {
this.httpConnectorRegistry.unregisterSourceListener(sinkId, httpStatusCode, siddhiAppName);
httpConnectorRegistry.unregisterSourceListener(sinkId, httpStatusCode, siddhiAppName);
HTTPSourceRegistry.removeCallResponseSource(sinkId, httpStatusCode);
}

Expand All @@ -210,4 +209,10 @@ public void resume() {
public HttpCallResponseConnectorListener getConnectorListener() {
return httpCallResponseSourceListener;
}

public boolean matches(String thatSinkId, String thatStatusCode) {
return (sinkId != null ? sinkId.equals(thatSinkId) : thatSinkId == null) &&
(httpStatusCode != null ?
thatStatusCode != null && thatStatusCode.matches(httpStatusCode) : thatStatusCode == null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@

package io.siddhi.extension.io.http.source;

import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.extension.io.http.sink.HttpSink;
import io.siddhi.extension.io.http.util.HTTPSourceRegistry;
import io.siddhi.extension.io.http.util.HttpConstants;
import io.siddhi.extension.io.http.util.ResponseSourceId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.transport.http.netty.contract.HttpConnectorListener;
import org.wso2.transport.http.netty.contract.exceptions.ClientConnectorException;
import org.wso2.transport.http.netty.contract.exceptions.ServerConnectorException;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;

import java.io.IOException;
Expand All @@ -40,28 +44,30 @@
public class HttpResponseMessageListener implements HttpConnectorListener {
private static final Logger log = LoggerFactory.getLogger(HttpResponseMessageListener.class);

private HttpCallResponseConnectorListener responseConnectorListener;
private Map<String, Object> trpProperties;
private boolean isDownloadEnabled;
private String sinkId;
private HttpCarbonMessage carbonMessages;
private CountDownLatch latch;
private int tryCount;
private String authType;
private boolean isBlockingIO;
private HttpSink sink;
private final Object payload;
private final DynamicOptions dynamicOptions;
private String siddhiAppName;
private String publisherURL;

public HttpResponseMessageListener(HttpSink sink, Map<String, Object> trpProperties, String sinkId,
boolean isDownloadEnabled, CountDownLatch latch, int tryCount, String authType,
boolean isBlockingIO) {
boolean isDownloadEnabled, CountDownLatch latch,
Object payload, DynamicOptions dynamicOptions,
String siddhiAppName, String publisherURL) {
this.trpProperties = trpProperties;
this.isDownloadEnabled = isDownloadEnabled;
this.sinkId = sinkId;
this.latch = latch;
this.tryCount = tryCount;
this.authType = authType;
this.isBlockingIO = isBlockingIO;
this.sink = sink;
this.payload = payload;
this.dynamicOptions = dynamicOptions;
this.siddhiAppName = siddhiAppName;
this.publisherURL = publisherURL;
}

@Override
Expand All @@ -71,67 +77,52 @@ public void onMessage(HttpCarbonMessage carbonMessage) {
});
carbonMessage.setProperty(HttpConstants.IS_DOWNLOADABLE_CONTENT, isDownloadEnabled);
this.carbonMessages = carbonMessage;
String statusCode = Integer.toString(carbonMessage.getNettyHttpResponse().status().code());
if (carbonMessage.getNettyHttpResponse().status().code() == (HttpConstants.SUCCESS_CODE) ||
HttpConstants.MAXIMUM_TRY_COUNT == tryCount) {
HttpCallResponseSource responseSource = findAndGetResponseSource(statusCode);
if (responseSource != null) {
responseConnectorListener = responseSource.getConnectorListener();
responseConnectorListener.onMessage(carbonMessage);
} else {
log.error("No source of type 'http-response' that matches with the status code '" + statusCode +
"' has been defined. Hence dropping the response message.");
}
}
if (isBlockingIO || HttpConstants.OAUTH.equals(authType)) {
if (latch != null) {
latch.countDown();
}
String statusCode = Integer.toString(carbonMessage.getNettyHttpResponse().status().code());
HttpCallResponseSource responseSource = HTTPSourceRegistry.findAndGetResponseSource(sinkId, statusCode);
if (responseSource != null) {
HttpCallResponseConnectorListener responseConnectorListener = responseSource.getConnectorListener();
responseConnectorListener.onMessage(carbonMessage);
} else {
log.error("No source of type 'http-call-response' with sink.id '" + sinkId +
"' for the status code '" + statusCode +
"' defined. Hence dropping the response message.");
}

}

@Override
public void onError(Throwable throwable) {
if (throwable instanceof IOException) {
sink.initClientConnector(null);
sink.createClientConnector(null);
}

HttpCallResponseSource source = HTTPSourceRegistry.getCallResponseSource(sinkId,
HttpConstants.DEFAULT_HTTP_ERROR_CODE);
if (source != null) {
responseConnectorListener = source.getConnectorListener();
} else {
log.error("No source of type 'http-response' for status code '500' has been " +
"defined. Hence dropping the response message.");
if (latch != null) {
latch.countDown();
}
if (responseConnectorListener != null) {
responseConnectorListener.onError(throwable);
if (throwable instanceof ClientConnectorException || throwable instanceof ServerConnectorException) {
sink.onError(payload, dynamicOptions, new ConnectionUnavailableException(
"HTTP call sink on stream '" + sink.getStreamDefinition().getId() +
"' of Siddhi App '" + siddhiAppName +
"' failed to publish events to endpoint '" + publisherURL + "'. " +
throwable.getMessage(), throwable));
} else {
if (log.isDebugEnabled()) {
log.debug("No connector listener for the response source with sink id '" + sinkId + "' and http " +
"status code 500 found.");
}
sink.onError(payload, dynamicOptions,
new SiddhiAppRuntimeException("HTTP call sink on stream '" +
sink.getStreamDefinition().getId() +
"' of Siddhi App '" + siddhiAppName +
"' failed to publish events to endpoint '" + publisherURL + "'. " +
throwable.getMessage(), throwable));
}
}

/**
* Disconnect pool execution.
*/
void disconnect() {
responseConnectorListener.disconnect();
}

private HttpCallResponseSource findAndGetResponseSource(String statusCode) {
ResponseSourceId id = new ResponseSourceId(sinkId, statusCode);
for (Map.Entry entry : HTTPSourceRegistry.getCallResponseSourceRegistry().entrySet()) {
ResponseSourceId key = (ResponseSourceId) entry.getKey();
if (id.equals(key)) {
return (HttpCallResponseSource) entry.getValue();
}
public int getHttpResponseStatusCode() {
if (carbonMessages != null) {
return carbonMessages.getNettyHttpResponse().status().code();
} else {
return HttpConstants.CLIENT_REQUEST_TIMEOUT;
}
return null;
}

public HttpCarbonMessage getHttpResponseMessage() {
return carbonMessages;
}

}
Loading

0 comments on commit 8b409ed

Please sign in to comment.