Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make http-call sink to have default on.error behavior & refactor code #157

Merged
merged 2 commits into from
Nov 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<artifactId>siddhi-io-http-parent</artifactId>
<groupId>io.siddhi.extension.io.http</groupId>
<version>2.1.3-SNAPSHOT</version>
<version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.siddhi.extension.io.http.sink;

import org.wso2.transport.http.netty.contract.HttpClientConnector;
import org.wso2.transport.http.netty.contract.HttpResponseFuture;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;

import java.util.Map;

/**
* Class to have the client connector and related properties
*/
public class ClientConnector {

private String publisherURL;
private Map<String, String> httpURLProperties;
private HttpClientConnector httpClientConnector;

public ClientConnector(String publisherURL, Map<String, String> httpURLProperties,
HttpClientConnector httpClientConnector) {
this.publisherURL = publisherURL;
this.httpURLProperties = httpURLProperties;
this.httpClientConnector = httpClientConnector;
}

public String getPublisherURL() {
return publisherURL;
}

public HttpResponseFuture send(HttpCarbonMessage cMessage) {
return httpClientConnector.send(cMessage);
}

public Map<String, String> getHttpURLProperties() {
return httpURLProperties;
}

}

Large diffs are not rendered by default.

408 changes: 186 additions & 222 deletions component/src/main/java/io/siddhi/extension/io/http/sink/HttpSink.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@

import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.http.sink.exception.HttpSinkAdaptorRuntimeException;
import io.siddhi.extension.io.http.util.HttpConstants;
import io.siddhi.extension.io.http.util.TrpPropertyTypes;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.Header;
import org.wso2.transport.http.netty.contract.Constants;
import org.wso2.transport.http.netty.contract.config.ProxyServerConfiguration;
import org.wso2.transport.http.netty.contract.config.SenderConfiguration;
import org.wso2.transport.http.netty.contractimpl.DefaultHttpWsConnectorFactory;
import org.wso2.transport.http.netty.contractimpl.sender.channel.pool.PoolConfiguration;

import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -42,6 +47,7 @@
import static io.siddhi.extension.io.http.util.HttpConstants.CLIENT_BOOTSTRAP_SENDBUFFERSIZE;
import static io.siddhi.extension.io.http.util.HttpConstants.CLIENT_BOOTSTRAP_SOCKET_REUSE;
import static io.siddhi.extension.io.http.util.HttpConstants.CLIENT_BOOTSTRAP_SOCKET_TIMEOUT;
import static io.siddhi.extension.io.http.util.HttpConstants.EMPTY_STRING;
import static io.siddhi.extension.io.http.util.HttpConstants.HTTP_TRACE_LOG_ENABLED;
import static io.siddhi.extension.io.http.util.HttpConstants.LOG_TRACE_ENABLE_DEFAULT_VALUE;
import static io.siddhi.extension.io.http.util.HttpConstants.PARAMETER_SEPARATOR;
Expand Down Expand Up @@ -283,4 +289,84 @@ private static Map<String, TrpPropertyTypes> trpPropertyTypeMap() {
trpPropertyTypes.put(CLIENT_BOOTSTRAP_SOCKET_TIMEOUT, TrpPropertyTypes.INTEGER);
return trpPropertyTypes;
}

public static PoolConfiguration createPoolConfigurations(OptionHolder optionHolder) {
int maxIdlePerPool = Integer.parseInt(optionHolder.validateAndGetStaticValue(
HttpConstants.MAX_IDLE_CONNECTIONS_PER_POOL, HttpConstants.DEFAULT_MAX_IDLE_CONNECTIONS_PER_POOL));
int minIdlePerPool = Integer.parseInt(optionHolder.validateAndGetStaticValue(
HttpConstants.MIN_IDLE_CONNECTIONS_PER_POOL, HttpConstants.DEFAULT_MIN_IDLE_CONNECTIONS_PER_POOL));
int maxActivePerPool = Integer.parseInt(optionHolder.validateAndGetStaticValue(
HttpConstants.MAX_ACTIVE_CONNECTIONS_PER_POOL, HttpConstants.DEFAULT_MAX_ACTIVE_CONNECTIONS_PER_POOL));
boolean testOnBorrow = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(HttpConstants.TEST_ON_BORROW,
HttpConstants.DEFAULT_TEST_ON_BORROW));
boolean testWhileIdle = Boolean.parseBoolean(
optionHolder.validateAndGetStaticValue(HttpConstants.TEST_WHILE_IDLE,
HttpConstants.DEFAULT_TEST_WHILE_IDLE));
long timeBetweenEvictionRuns = Long.parseLong(optionHolder.validateAndGetStaticValue(
HttpConstants.TIME_BETWEEN_EVICTION_RUNS, HttpConstants.DEFAULT_TIME_BETWEEN_EVICTION_RUNS));
long minEvictableIdleTime = Long.parseLong(optionHolder.validateAndGetStaticValue(
HttpConstants.MIN_EVICTABLE_IDLE_TIME, HttpConstants.DEFAULT_MIN_EVICTABLE_IDLE_TIME));
byte exhaustedAction = (byte) Integer.parseInt(optionHolder.validateAndGetStaticValue(
HttpConstants.EXHAUSTED_ACTION, HttpConstants.DEFAULT_EXHAUSTED_ACTION));
int maxWaitTime = Integer.parseInt(optionHolder.validateAndGetStaticValue(
HttpConstants.MAX_WAIT_TIME, HttpConstants.DEFAULT_MAX_WAIT_TIME));
PoolConfiguration connectionPoolConfiguration = new PoolConfiguration();
connectionPoolConfiguration.setMaxActivePerPool(maxActivePerPool);
connectionPoolConfiguration.setMinIdlePerPool(minIdlePerPool);
connectionPoolConfiguration.setMaxIdlePerPool(maxIdlePerPool);
connectionPoolConfiguration.setTestOnBorrow(testOnBorrow);
connectionPoolConfiguration.setTestWhileIdle(testWhileIdle);
connectionPoolConfiguration.setTimeBetweenEvictionRuns(timeBetweenEvictionRuns);
connectionPoolConfiguration.setMinEvictableIdleTime(minEvictableIdleTime);
connectionPoolConfiguration.setExhaustedAction(exhaustedAction);
connectionPoolConfiguration.setMaxWaitTime(maxWaitTime);
return connectionPoolConfiguration;
}

public static DefaultHttpWsConnectorFactory createConnectorFactory(ConfigReader configReader) {
//read trp globe configuration
String bootstrapWorker = configReader
.readConfig(HttpConstants.CLIENT_BOOTSTRAP_WORKER_GROUP_SIZE, EMPTY_STRING);
String bootstrapBoss = configReader.readConfig(HttpConstants.CLIENT_BOOTSTRAP_BOSS_GROUP_SIZE, EMPTY_STRING);
String bootstrapClient = configReader.readConfig(HttpConstants.CLIENT_BOOTSTRAP_CLIENT_GROUP_SIZE,
EMPTY_STRING);
//if bootstrap configurations are given then pass it if not let take default value of transport
if (!EMPTY_STRING.equals(bootstrapBoss) && !EMPTY_STRING.equals(bootstrapWorker)) {
if (!EMPTY_STRING.equals(bootstrapClient)) {
return new DefaultHttpWsConnectorFactory(Integer.parseInt(bootstrapBoss),
Integer.parseInt(bootstrapWorker), Integer.parseInt(bootstrapClient));
} else {
return new DefaultHttpWsConnectorFactory(Integer.parseInt(bootstrapBoss),
Integer.parseInt(bootstrapWorker), Integer.parseInt(bootstrapWorker));
}
}
return new DefaultHttpWsConnectorFactory();
}

public static ProxyServerConfiguration createProxyServerConfiguration(OptionHolder optionHolder, String streamID,
String appName) {
String proxyHost = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_HOST, EMPTY_STRING);
String proxyPort = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_PORT, EMPTY_STRING);
String proxyUsername = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_USERNAME,
EMPTY_STRING);
String proxyPassword = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_PASSWORD,
EMPTY_STRING);
//if proxy username and password not equal to null then create proxy configurations
if (!EMPTY_STRING.equals(proxyHost) && !EMPTY_STRING.equals(proxyPort)) {
try {
ProxyServerConfiguration proxyServerConfiguration = new ProxyServerConfiguration(proxyHost, Integer
.parseInt(proxyPort));
if (!EMPTY_STRING.equals(proxyPassword) && !EMPTY_STRING.equals
(proxyUsername)) {
proxyServerConfiguration.setProxyPassword(proxyPassword);
proxyServerConfiguration.setProxyUsername(proxyUsername);
}
} catch (UnknownHostException e) {
log.error("Proxy url of sink defined in '" + streamID + "' of Siddhi App '" +
appName + "' is invalid.", e);
}
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void onMessage(HttpCarbonMessage carbonMessage) {

@Override
public void onError(Throwable throwable) {
log.error("Error occurred during processing response for the request sent by http-request-sink with " +
log.error("Error occurred during processing response for the request sent by http-call sink with " +
"'sink.id' = " + sinkId + " in Siddhi app " + siddhiAppName + ".", throwable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,12 @@ public void connect(ConnectionCallback connectionCallback, State state) throws C
new HttpCallResponseConnectorListener(Integer.parseInt(workerThread), sourceEventListener,
shouldAllowStreamingResponses, sinkId, requestedTransportPropertyNames, siddhiAppName);
this.httpConnectorRegistry.registerSourceListener(httpCallResponseSourceListener, sinkId, httpStatusCode);

HTTPSourceRegistry.registerCallResponseSource(sinkId, httpStatusCode, this);
}

@Override
public void disconnect() {
this.httpConnectorRegistry.unregisterSourceListener(sinkId, httpStatusCode, siddhiAppName);
httpConnectorRegistry.unregisterSourceListener(sinkId, httpStatusCode, siddhiAppName);
HTTPSourceRegistry.removeCallResponseSource(sinkId, httpStatusCode);
}

Expand All @@ -210,4 +209,10 @@ public void resume() {
public HttpCallResponseConnectorListener getConnectorListener() {
return httpCallResponseSourceListener;
}

public boolean matches(String thatSinkId, String thatStatusCode) {
return (sinkId != null ? sinkId.equals(thatSinkId) : thatSinkId == null) &&
(httpStatusCode != null ?
thatStatusCode != null && thatStatusCode.matches(httpStatusCode) : thatStatusCode == null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@

package io.siddhi.extension.io.http.source;

import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.extension.io.http.sink.HttpSink;
import io.siddhi.extension.io.http.util.HTTPSourceRegistry;
import io.siddhi.extension.io.http.util.HttpConstants;
import io.siddhi.extension.io.http.util.ResponseSourceId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.transport.http.netty.contract.HttpConnectorListener;
import org.wso2.transport.http.netty.contract.exceptions.ClientConnectorException;
import org.wso2.transport.http.netty.contract.exceptions.ServerConnectorException;
import org.wso2.transport.http.netty.message.HttpCarbonMessage;

import java.io.IOException;
Expand All @@ -40,28 +44,30 @@
public class HttpResponseMessageListener implements HttpConnectorListener {
private static final Logger log = LoggerFactory.getLogger(HttpResponseMessageListener.class);

private HttpCallResponseConnectorListener responseConnectorListener;
private Map<String, Object> trpProperties;
private boolean isDownloadEnabled;
private String sinkId;
private HttpCarbonMessage carbonMessages;
private CountDownLatch latch;
private int tryCount;
private String authType;
private boolean isBlockingIO;
private HttpSink sink;
private final Object payload;
private final DynamicOptions dynamicOptions;
private String siddhiAppName;
private String publisherURL;

public HttpResponseMessageListener(HttpSink sink, Map<String, Object> trpProperties, String sinkId,
boolean isDownloadEnabled, CountDownLatch latch, int tryCount, String authType,
boolean isBlockingIO) {
boolean isDownloadEnabled, CountDownLatch latch,
Object payload, DynamicOptions dynamicOptions,
String siddhiAppName, String publisherURL) {
this.trpProperties = trpProperties;
this.isDownloadEnabled = isDownloadEnabled;
this.sinkId = sinkId;
this.latch = latch;
this.tryCount = tryCount;
this.authType = authType;
this.isBlockingIO = isBlockingIO;
this.sink = sink;
this.payload = payload;
this.dynamicOptions = dynamicOptions;
this.siddhiAppName = siddhiAppName;
this.publisherURL = publisherURL;
}

@Override
Expand All @@ -71,67 +77,52 @@ public void onMessage(HttpCarbonMessage carbonMessage) {
});
carbonMessage.setProperty(HttpConstants.IS_DOWNLOADABLE_CONTENT, isDownloadEnabled);
this.carbonMessages = carbonMessage;
String statusCode = Integer.toString(carbonMessage.getNettyHttpResponse().status().code());
if (carbonMessage.getNettyHttpResponse().status().code() == (HttpConstants.SUCCESS_CODE) ||
HttpConstants.MAXIMUM_TRY_COUNT == tryCount) {
HttpCallResponseSource responseSource = findAndGetResponseSource(statusCode);
if (responseSource != null) {
responseConnectorListener = responseSource.getConnectorListener();
responseConnectorListener.onMessage(carbonMessage);
} else {
log.error("No source of type 'http-response' that matches with the status code '" + statusCode +
"' has been defined. Hence dropping the response message.");
}
}
if (isBlockingIO || HttpConstants.OAUTH.equals(authType)) {
if (latch != null) {
latch.countDown();
}
String statusCode = Integer.toString(carbonMessage.getNettyHttpResponse().status().code());
HttpCallResponseSource responseSource = HTTPSourceRegistry.findAndGetResponseSource(sinkId, statusCode);
if (responseSource != null) {
HttpCallResponseConnectorListener responseConnectorListener = responseSource.getConnectorListener();
responseConnectorListener.onMessage(carbonMessage);
} else {
log.error("No source of type 'http-call-response' with sink.id '" + sinkId +
"' for the status code '" + statusCode +
"' defined. Hence dropping the response message.");
}

}

@Override
public void onError(Throwable throwable) {
if (throwable instanceof IOException) {
sink.initClientConnector(null);
sink.createClientConnector(null);
}

HttpCallResponseSource source = HTTPSourceRegistry.getCallResponseSource(sinkId,
HttpConstants.DEFAULT_HTTP_ERROR_CODE);
if (source != null) {
responseConnectorListener = source.getConnectorListener();
} else {
log.error("No source of type 'http-response' for status code '500' has been " +
"defined. Hence dropping the response message.");
if (latch != null) {
latch.countDown();
}
if (responseConnectorListener != null) {
responseConnectorListener.onError(throwable);
if (throwable instanceof ClientConnectorException || throwable instanceof ServerConnectorException) {
sink.onError(payload, dynamicOptions, new ConnectionUnavailableException(
"HTTP call sink on stream '" + sink.getStreamDefinition().getId() +
"' of Siddhi App '" + siddhiAppName +
"' failed to publish events to endpoint '" + publisherURL + "'. " +
throwable.getMessage(), throwable));
} else {
if (log.isDebugEnabled()) {
log.debug("No connector listener for the response source with sink id '" + sinkId + "' and http " +
"status code 500 found.");
}
sink.onError(payload, dynamicOptions,
new SiddhiAppRuntimeException("HTTP call sink on stream '" +
sink.getStreamDefinition().getId() +
"' of Siddhi App '" + siddhiAppName +
"' failed to publish events to endpoint '" + publisherURL + "'. " +
throwable.getMessage(), throwable));
}
}

/**
* Disconnect pool execution.
*/
void disconnect() {
responseConnectorListener.disconnect();
}

private HttpCallResponseSource findAndGetResponseSource(String statusCode) {
ResponseSourceId id = new ResponseSourceId(sinkId, statusCode);
for (Map.Entry entry : HTTPSourceRegistry.getCallResponseSourceRegistry().entrySet()) {
ResponseSourceId key = (ResponseSourceId) entry.getKey();
if (id.equals(key)) {
return (HttpCallResponseSource) entry.getValue();
}
public int getHttpResponseStatusCode() {
if (carbonMessages != null) {
return carbonMessages.getNettyHttpResponse().status().code();
} else {
return HttpConstants.CLIENT_REQUEST_TIMEOUT;
}
return null;
}

public HttpCarbonMessage getHttpResponseMessage() {
return carbonMessages;
}

}
Loading