diff --git a/component/pom.xml b/component/pom.xml index 01fb478d..930174de 100644 --- a/component/pom.xml +++ b/component/pom.xml @@ -18,7 +18,7 @@ siddhi-io-http-parent io.siddhi.extension.io.http - 2.1.3-SNAPSHOT + 2.2.0-SNAPSHOT ../pom.xml 4.0.0 diff --git a/component/src/main/java/io/siddhi/extension/io/http/sink/ClientConnector.java b/component/src/main/java/io/siddhi/extension/io/http/sink/ClientConnector.java new file mode 100644 index 00000000..d6687576 --- /dev/null +++ b/component/src/main/java/io/siddhi/extension/io/http/sink/ClientConnector.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.siddhi.extension.io.http.sink; + +import org.wso2.transport.http.netty.contract.HttpClientConnector; +import org.wso2.transport.http.netty.contract.HttpResponseFuture; +import org.wso2.transport.http.netty.message.HttpCarbonMessage; + +import java.util.Map; + +/** + * Class to have the client connector and related properties + */ +public class ClientConnector { + + private String publisherURL; + private Map httpURLProperties; + private HttpClientConnector httpClientConnector; + + public ClientConnector(String publisherURL, Map httpURLProperties, + HttpClientConnector httpClientConnector) { + this.publisherURL = publisherURL; + this.httpURLProperties = httpURLProperties; + this.httpClientConnector = httpClientConnector; + } + + public String getPublisherURL() { + return publisherURL; + } + + public HttpResponseFuture send(HttpCarbonMessage cMessage) { + return httpClientConnector.send(cMessage); + } + + public Map getHttpURLProperties() { + return httpURLProperties; + } + +} diff --git a/component/src/main/java/io/siddhi/extension/io/http/sink/HttpCallSink.java b/component/src/main/java/io/siddhi/extension/io/http/sink/HttpCallSink.java index d2d234e4..942d43fb 100644 --- a/component/src/main/java/io/siddhi/extension/io/http/sink/HttpCallSink.java +++ b/component/src/main/java/io/siddhi/extension/io/http/sink/HttpCallSink.java @@ -17,9 +17,7 @@ */ package io.siddhi.extension.io.http.sink; -import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.handler.codec.base64.Base64; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.HttpMethod; @@ -33,13 +31,10 @@ import io.siddhi.core.event.Event; import io.siddhi.core.exception.ConnectionUnavailableException; import io.siddhi.core.util.config.ConfigReader; -import io.siddhi.core.util.snapshot.state.State; import io.siddhi.core.util.snapshot.state.StateFactory; import io.siddhi.core.util.transport.DynamicOptions; import io.siddhi.core.util.transport.Option; import io.siddhi.core.util.transport.OptionHolder; -import io.siddhi.extension.io.http.sink.exception.HttpSinkAdaptorRuntimeException; -import io.siddhi.extension.io.http.sink.updatetoken.AccessTokenCache; import io.siddhi.extension.io.http.sink.util.HttpSinkUtil; import io.siddhi.extension.io.http.source.HttpResponseMessageListener; import io.siddhi.extension.io.http.util.HttpConstants; @@ -52,7 +47,6 @@ import org.wso2.transport.http.netty.message.HttpCarbonMessage; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -444,17 +438,9 @@ public class HttpCallSink extends HttpSink { private static final Logger log = Logger.getLogger(HttpCallSink.class); private String sinkId; private boolean isDownloadEnabled; - private StreamDefinition outputStreamDefinition; private Option downloadPath; - private Option publisherURLOption; - private String consumerKey; - private String consumerSecret; - private String authType; - private AccessTokenCache accessTokenCache = AccessTokenCache.getInstance(); - private String publisherURL; - private String tokenURL; private boolean isBlockingIO; - private CountDownLatch responseLatch; + private StreamDefinition outputStreamDefinition; @Override protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder, @@ -467,198 +453,17 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolde if (isDownloadEnabled) { this.downloadPath = optionHolder.validateAndGetOption(HttpConstants.DOWNLOAD_PATH); } - this.tokenURL = optionHolder.validateAndGetStaticValue(HttpConstants.TOKEN_URL, EMPTY_STRING); - String userName = optionHolder.validateAndGetStaticValue(HttpConstants.RECEIVER_USERNAME, EMPTY_STRING); - String password = optionHolder.validateAndGetStaticValue(HttpConstants.RECEIVER_PASSWORD, EMPTY_STRING); - this.consumerKey = optionHolder.validateAndGetStaticValue(HttpConstants.CONSUMER_KEY, EMPTY_STRING); - this.consumerSecret = optionHolder.validateAndGetStaticValue(HttpConstants.CONSUMER_SECRET, EMPTY_STRING); - String oauthUsername = optionHolder.validateAndGetStaticValue(HttpConstants.RECEIVER_OAUTH_USERNAME, - EMPTY_STRING); - this.publisherURLOption = optionHolder.validateAndGetOption(HttpConstants.PUBLISHER_URL); - String oauthUserPassword = optionHolder.validateAndGetStaticValue(HttpConstants.RECEIVER_OAUTH_PASSWORD, - EMPTY_STRING); - if (!HttpConstants.EMPTY_STRING.equals(userName) && !HttpConstants.EMPTY_STRING.equals(password)) { - authType = HttpConstants.BASIC_AUTH; - } else if ((!HttpConstants.EMPTY_STRING.equals(consumerKey) - && !HttpConstants.EMPTY_STRING.equals(consumerSecret)) || - (!HttpConstants.EMPTY_STRING.equals(oauthUsername) - && !HttpConstants.EMPTY_STRING.equals(oauthUserPassword))) { - authType = HttpConstants.OAUTH; - } else { - authType = HttpConstants.NO_AUTH; - } isBlockingIO = Boolean.parseBoolean( optionHolder.validateAndGetStaticValue(HttpConstants.BLOCKING_IO, HttpConstants.FALSE)); - if (isBlockingIO) { - responseLatch = new CountDownLatch(1); - } return stateFactory; } - /** - * This method will be called when events need to be published via this sink - * - * @param payload payload of the event based on the supported event class exported by the extensions - * @param dynamicOptions holds the dynamic options of this sink and Use this object to obtain dynamic options. - * @param state current state of the sink - * @throws ConnectionUnavailableException throw when connections are unavailable. - */ @Override - public void publish(Object payload, DynamicOptions dynamicOptions, State state) - throws ConnectionUnavailableException { - //get the dynamic parameter - String headers = httpHeaderOption.getValue(dynamicOptions); - List
headersList = HttpSinkUtil.getHeaders(headers); - if (publisherURLOption.isStatic()) { - publisherURL = publisherURLOption.getValue(); - } else { - publisherURL = publisherURLOption.getValue(dynamicOptions); - } - - if (authType.equals(HttpConstants.BASIC_AUTH) || authType.equals(HttpConstants.NO_AUTH)) { - sendRequest(payload, dynamicOptions, headersList, HttpConstants.MAXIMUM_TRY_COUNT); - } else { - sendOauthRequest(payload, dynamicOptions, headersList); - } - } - - private void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, List
headersList) - throws ConnectionUnavailableException { - //generate encoded base64 auth for getting refresh token - String consumerKeyValue = consumerKey + ":" + consumerSecret; - String encodedAuth = "Basic " + encodeBase64(consumerKeyValue); - //check the availability of access token in the header - setAccessToken(encodedAuth, dynamicOptions, headersList); - //send a request to API and get the response - int response = sendRequest(payload, dynamicOptions, headersList, HttpConstants.MINIMUM_TRY_COUNT); - //if authentication fails then get the new access token - if (response == HttpConstants.AUTHENTICATION_FAIL_CODE) { - handleOAuthFailure(payload, dynamicOptions, headersList, encodedAuth); - } else if (response == HttpConstants.SUCCESS_CODE) { - log.debug("Request sent successfully to " + publisherURL); - } else if (response == HttpConstants.INTERNAL_SERVER_FAIL_CODE) { - log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + - response + "- Internal server error. Message dropped."); - throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint: " + - publisherURL + "', with response code: " + response + "- Internal server error. Message dropped."); - } else { - log.error("Error at sending oauth request to API endpoint: " + - publisherURL + "', with response code: " + response + ". Message dropped. Message dropped."); - throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint: " + - publisherURL + "', with response code: " + response + ". Message dropped. Message dropped."); - } - } - - private void handleOAuthFailure(Object payload, DynamicOptions dynamicOptions, List
headersList, - String encodedAuth) throws ConnectionUnavailableException { - - Boolean checkFromCache = accessTokenCache.checkAvailableKey(encodedAuth); - if (checkFromCache) { - getNewAccessTokenWithCache(payload, dynamicOptions, headersList, encodedAuth); - } else { - requestForNewAccessToken(payload, dynamicOptions, headersList, encodedAuth); - } - } - - private void getNewAccessTokenWithCache(Object payload, DynamicOptions dynamicOptions, List
headersList, - String encodedAuth) throws ConnectionUnavailableException { - String accessToken = accessTokenCache.getAccessToken(encodedAuth); - for (Header header : headersList) { - if (header.getName().equals(HttpConstants.AUTHORIZATION_HEADER)) { - header.setValue(accessToken); - break; - } - } - //send a request to API with a new access token - int response = sendRequest(payload, dynamicOptions, headersList, HttpConstants.MINIMUM_TRY_COUNT); - if (response == HttpConstants.SUCCESS_CODE) { - log.debug("Request sent successfully to " + publisherURL); - } else if (response == HttpConstants.AUTHENTICATION_FAIL_CODE) { - requestForNewAccessToken(payload, dynamicOptions, headersList, encodedAuth); - } else if (response == HttpConstants.INTERNAL_SERVER_FAIL_CODE) { - log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + - response + "- Internal server error. Message dropped."); - throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " + - publisherURL + "', with response code: " + response + "- Internal server error. Message dropped."); - } else { - log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + - response + ". Message dropped. "); - throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + - publisherURL + "', with response code: " + response + ". Message dropped."); - } - } - - private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOptions, List
headersList, - String encodedAuth) throws ConnectionUnavailableException { - Boolean checkRefreshToken = accessTokenCache.checkRefreshAvailableKey(encodedAuth); - if (checkRefreshToken) { - for (Header header : headersList) { - if (header.getName().equals(HttpConstants.RECEIVER_REFRESH_TOKEN)) { - if (accessTokenCache.getRefreshtoken(encodedAuth) != null) { - header.setValue(accessTokenCache.getRefreshtoken(encodedAuth)); - } - break; - } - } - } - getAccessToken(dynamicOptions, encodedAuth, tokenURL); - if (accessTokenCache.getResponseCode(encodedAuth) == HttpConstants.SUCCESS_CODE) { - String newAccessToken = accessTokenCache.getAccessToken(encodedAuth); - accessTokenCache.setAccessToken(encodedAuth, newAccessToken); - if (accessTokenCache.getRefreshtoken(encodedAuth) != null) { - accessTokenCache.setRefreshtoken(encodedAuth, accessTokenCache.getRefreshtoken(encodedAuth)); - } - for (Header header : headersList) { - if (header.getName().equals(HttpConstants.AUTHORIZATION_HEADER)) { - header.setValue(newAccessToken); - break; - } - } - //send a request to API with a new access token - int response = sendRequest(payload, dynamicOptions, headersList, HttpConstants.MAXIMUM_TRY_COUNT); - if (response == HttpConstants.SUCCESS_CODE) { - log.debug("Request sent successfully to " + publisherURL); - } else if (response == HttpConstants.AUTHENTICATION_FAIL_CODE) { - log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + - response + "- Authentication Failure. Please provide a valid Consumer key, Consumer secret" + - " and token endpoint URL . Message dropped"); - throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " + - publisherURL + "', with response code: " + response + "- Authentication Failure. Please" + - " provide a valid Consumer key, Consumer secret and token endpoint URL . Message dropped"); - } else if (response == HttpConstants.INTERNAL_SERVER_FAIL_CODE) { - log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + - response + "- Internal server error. Message dropped."); - throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " + - publisherURL + "', with response code:" + response + - "- Internal server error. Message dropped."); - } else { - log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + - response + ". Message dropped."); - throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + - publisherURL + "', with response code: " + response + ". Message dropped."); - } - - } else if (accessTokenCache.getResponseCode(encodedAuth) == HttpConstants.AUTHENTICATION_FAIL_CODE) { - log.error("Failed to generate new access token for the expired access token to " + publisherURL + "', " + - accessTokenCache.getResponseCode(encodedAuth) + ": Authentication Failure. Please provide" + - " a valid Consumer key, Consumer secret and token endpoint URL . Message dropped"); - throw new HttpSinkAdaptorRuntimeException("Failed to generate new access token for the expired access " + - "token to " + publisherURL + "', " + accessTokenCache.getResponseCode(encodedAuth) + - ": Authentication Failure.Please provide a valid Consumer key, Consumer secret" + - " and token endpoint URL . Message dropped"); - } else { - log.error("Failed to generate new access token for the expired access token. Error code: " + - accessTokenCache.getResponseCode(encodedAuth) + ". Message dropped."); - throw new ConnectionUnavailableException("Failed to generate new access token for the expired" + - " access token. Error code: " + accessTokenCache.getResponseCode(encodedAuth) + - ". Message dropped."); - } - } - - private int sendRequest(Object payload, DynamicOptions dynamicOptions, List
headersList, int tryCount) + protected int sendRequest(Object payload, DynamicOptions dynamicOptions, List
headersList, + ClientConnector clientConnector) throws ConnectionUnavailableException { if (!publisherURLOption.isStatic()) { - super.initClientConnector(dynamicOptions); + super.createClientConnector(dynamicOptions); } if (mapType == null) { @@ -672,39 +477,44 @@ private int sendRequest(Object payload, DynamicOptions dynamicOptions, List getTrpProperties(DynamicOptions dynamicOptions) { return trpProperties; } - private String encodeBase64(String consumerKeyValue) { - ByteBuf byteBuf = Unpooled.wrappedBuffer(consumerKeyValue.getBytes(StandardCharsets.UTF_8)); - ByteBuf encodedByteBuf = Base64.encode(byteBuf); - return encodedByteBuf.toString(StandardCharsets.UTF_8); - } } diff --git a/component/src/main/java/io/siddhi/extension/io/http/sink/HttpSink.java b/component/src/main/java/io/siddhi/extension/io/http/sink/HttpSink.java index c9869aef..4646ca29 100644 --- a/component/src/main/java/io/siddhi/extension/io/http/sink/HttpSink.java +++ b/component/src/main/java/io/siddhi/extension/io/http/sink/HttpSink.java @@ -54,7 +54,6 @@ import org.apache.log4j.Logger; import org.wso2.carbon.messaging.Header; import org.wso2.transport.http.netty.contract.Constants; -import org.wso2.transport.http.netty.contract.HttpClientConnector; import org.wso2.transport.http.netty.contract.HttpConnectorListener; import org.wso2.transport.http.netty.contract.HttpResponseFuture; import org.wso2.transport.http.netty.contract.config.ChunkConfig; @@ -66,7 +65,6 @@ import java.io.UnsupportedEncodingException; import java.net.URLEncoder; -import java.net.UnknownHostException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.HashMap; @@ -75,6 +73,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static io.siddhi.extension.io.http.sink.util.HttpSinkUtil.createConnectorFactory; +import static io.siddhi.extension.io.http.sink.util.HttpSinkUtil.createPoolConfigurations; +import static io.siddhi.extension.io.http.sink.util.HttpSinkUtil.createProxyServerConfiguration; import static io.siddhi.extension.io.http.util.HttpConstants.EMPTY_STRING; import static io.siddhi.extension.io.http.util.HttpConstants.FALSE; import static io.siddhi.extension.io.http.util.HttpConstants.PORT_HOST_SEPARATOR; @@ -421,19 +422,17 @@ ) public class HttpSink extends Sink { private static final Logger log = Logger.getLogger(HttpSink.class); - HttpClientConnector clientConnector; String mapType; Option httpHeaderOption; Option httpMethodOption; - private String streamID; - private Map httpURLProperties; - private String consumerKey; - private String consumerSecret; + protected String streamID; + protected String consumerKey; + protected String consumerSecret; private String authorizationHeader; - private String userName; - private String userPassword; - private String publisherURL; - private Option publisherURLOption; + protected String userName; + protected String userPassword; + protected ClientConnector staticClientConnector; + protected Option publisherURLOption; private String clientStoreFile; private String clientStorePass; private int socketIdleTimeout; @@ -441,35 +440,21 @@ public class HttpSink extends Sink { private String tlsStoreType; private String chunkDisabled; private String parametersList; - private String proxyHost; - private String proxyPort; - private String proxyUsername; - private String proxyPassword; private String clientBootstrapConfiguration; - private String bootstrapWorker; - private String bootstrapBoss; - private String bootstrapClient; private ConfigReader configReader; - private SiddhiAppContext siddhiAppContext; - private String oauthUsername; - private String oauthUserPassword; + protected SiddhiAppContext siddhiAppContext; + protected String oauthUsername; + protected String oauthUserPassword; private Option refreshToken; - private String authType; + protected String authType; private AccessTokenCache accessTokenCache = AccessTokenCache.getInstance(); - private String tokenURL; - private int maxActivePerPool; - private int minIdlePerPool; - private int maxIdlePerPool; - private boolean testOnBorrow; - private boolean testWhileIdle; - private long timeBetweenEvictionRuns; - private long minEvictableIdleTime; - private byte exhaustedAction; - private long maxWaitTime; + protected String tokenURL; private String hostnameVerificationEnabled; private String sslVerificationDisabled; private DefaultHttpWsConnectorFactory httpConnectorFactory; + private ProxyServerConfiguration proxyServerConfiguration; + private PoolConfiguration connectionPoolConfiguration; /** * Returns the list of classes which this sink can consume. @@ -519,8 +504,6 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolde this.httpMethodOption = optionHolder.getOrCreateOption(HttpConstants.METHOD, HttpConstants.DEFAULT_METHOD); this.consumerKey = optionHolder.validateAndGetStaticValue(HttpConstants.CONSUMER_KEY, EMPTY_STRING); this.consumerSecret = optionHolder.validateAndGetStaticValue(HttpConstants.CONSUMER_SECRET, EMPTY_STRING); - this.userName = optionHolder.validateAndGetStaticValue(HttpConstants.RECEIVER_USERNAME, EMPTY_STRING); - this.userPassword = optionHolder.validateAndGetStaticValue(HttpConstants.RECEIVER_PASSWORD, EMPTY_STRING); this.oauthUsername = optionHolder.validateAndGetStaticValue(HttpConstants.RECEIVER_OAUTH_USERNAME, EMPTY_STRING); this.oauthUserPassword = optionHolder.validateAndGetStaticValue(HttpConstants.RECEIVER_OAUTH_PASSWORD, @@ -538,44 +521,20 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolde chunkDisabled = optionHolder.validateAndGetStaticValue(HttpConstants.CLIENT_CHUNK_DISABLED, EMPTY_STRING); //pool configurations - maxIdlePerPool = Integer.parseInt(optionHolder.validateAndGetStaticValue( - HttpConstants.MAX_IDLE_CONNECTIONS_PER_POOL, HttpConstants.DEFAULT_MAX_IDLE_CONNECTIONS_PER_POOL)); - minIdlePerPool = Integer.parseInt(optionHolder.validateAndGetStaticValue( - HttpConstants.MIN_IDLE_CONNECTIONS_PER_POOL, HttpConstants.DEFAULT_MIN_IDLE_CONNECTIONS_PER_POOL)); - maxActivePerPool = Integer.parseInt(optionHolder.validateAndGetStaticValue( - HttpConstants.MAX_ACTIVE_CONNECTIONS_PER_POOL, HttpConstants.DEFAULT_MAX_ACTIVE_CONNECTIONS_PER_POOL)); - testOnBorrow = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(HttpConstants.TEST_ON_BORROW, - HttpConstants.DEFAULT_TEST_ON_BORROW)); - testWhileIdle = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(HttpConstants.TEST_WHILE_IDLE, - HttpConstants.DEFAULT_TEST_WHILE_IDLE)); - timeBetweenEvictionRuns = Long.parseLong(optionHolder.validateAndGetStaticValue( - HttpConstants.TIME_BETWEEN_EVICTION_RUNS, HttpConstants.DEFAULT_TIME_BETWEEN_EVICTION_RUNS)); - minEvictableIdleTime = Long.parseLong(optionHolder.validateAndGetStaticValue( - HttpConstants.MIN_EVICTABLE_IDLE_TIME, HttpConstants.DEFAULT_MIN_EVICTABLE_IDLE_TIME)); - exhaustedAction = (byte) Integer.parseInt(optionHolder.validateAndGetStaticValue( - HttpConstants.EXHAUSTED_ACTION, HttpConstants.DEFAULT_EXHAUSTED_ACTION)); - maxWaitTime = Integer.parseInt(optionHolder.validateAndGetStaticValue( - HttpConstants.MAX_WAIT_TIME, HttpConstants.DEFAULT_MAX_WAIT_TIME)); + connectionPoolConfiguration = createPoolConfigurations(optionHolder); parametersList = optionHolder.validateAndGetStaticValue(HttpConstants.SINK_PARAMETERS, EMPTY_STRING); - proxyHost = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_HOST, EMPTY_STRING); - proxyPort = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_PORT, EMPTY_STRING); - proxyUsername = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_USERNAME, - EMPTY_STRING); - proxyPassword = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_PASSWORD, - EMPTY_STRING); + clientBootstrapConfiguration = optionHolder .validateAndGetStaticValue(HttpConstants.CLIENT_BOOTSTRAP_CONFIGURATION, EMPTY_STRING); - //read trp globe configuration - bootstrapWorker = configReader - .readConfig(HttpConstants.CLIENT_BOOTSTRAP_WORKER_GROUP_SIZE, EMPTY_STRING); - bootstrapBoss = configReader.readConfig(HttpConstants.CLIENT_BOOTSTRAP_BOSS_GROUP_SIZE, EMPTY_STRING); - bootstrapClient = configReader.readConfig(HttpConstants.CLIENT_BOOTSTRAP_CLIENT_GROUP_SIZE, - EMPTY_STRING); hostnameVerificationEnabled = optionHolder.validateAndGetStaticValue( HttpConstants.HOSTNAME_VERIFICATION_ENABLED, TRUE); sslVerificationDisabled = optionHolder.validateAndGetStaticValue(HttpConstants.SSL_VERIFICATION_DISABLED, FALSE); + + userName = optionHolder.validateAndGetStaticValue(HttpConstants.RECEIVER_USERNAME, EMPTY_STRING); + userPassword = optionHolder.validateAndGetStaticValue(HttpConstants.RECEIVER_PASSWORD, EMPTY_STRING); + if (!HttpConstants.EMPTY_STRING.equals(userName) && !HttpConstants.EMPTY_STRING.equals(userPassword)) { authType = HttpConstants.BASIC_AUTH; } else if ((!HttpConstants.EMPTY_STRING.equals(consumerKey) @@ -587,13 +546,30 @@ protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolde authType = HttpConstants.NO_AUTH; } - initConnectorFactory(); + //if username and password both not equal to null consider as basic auth enabled if only one is null take it + // as exception + if ((EMPTY_STRING.equals(userName) ^ + EMPTY_STRING.equals(userPassword))) { + throw new SiddhiAppCreationException("Please provide user name and password in " + + HttpConstants.HTTP_SINK_ID + " with the stream " + streamID + " in Siddhi app " + + siddhiAppContext.getName()); + } else if (!(EMPTY_STRING.equals(userName))) { + byte[] val = (userName + HttpConstants.AUTH_USERNAME_PASSWORD_SEPARATOR + userPassword).getBytes(Charset + .defaultCharset()); + this.authorizationHeader = HttpConstants.AUTHORIZATION_METHOD + Base64.encode + (Unpooled.copiedBuffer(val)); + } + + proxyServerConfiguration = createProxyServerConfiguration(optionHolder, streamID, siddhiAppContext.getName()); + + httpConnectorFactory = createConnectorFactory(configReader); if (publisherURLOption.isStatic()) { - initClientConnector(null); + staticClientConnector = createClientConnector(null); } return null; } + @Override protected ServiceDeploymentInfo exposeServiceDeploymentInfo() { return null; @@ -614,53 +590,118 @@ public void publish(Object payload, DynamicOptions dynamicOptions, State state) String headers = httpHeaderOption.getValue(dynamicOptions); List
headersList = HttpSinkUtil.getHeaders(headers); + ClientConnector clientConnector; + if (staticClientConnector != null) { + clientConnector = staticClientConnector; + } else { + clientConnector = createClientConnector(dynamicOptions); + } + + if (mapType == null) { + mapType = getMapper().getType(); + } + if (authType.equals(HttpConstants.BASIC_AUTH) || authType.equals(HttpConstants.NO_AUTH)) { - sendRequest(payload, dynamicOptions, headersList); + sendRequest(payload, dynamicOptions, headersList, clientConnector); } else { - sendOauthRequest(payload, dynamicOptions, headersList); + sendOauthRequest(payload, dynamicOptions, headersList, clientConnector); } } - private void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, List
headersList) + protected int sendRequest(Object payload, DynamicOptions dynamicOptions, List
headersList, + ClientConnector clientConnector) + throws ConnectionUnavailableException { + + String httpMethod = EMPTY_STRING.equals(httpMethodOption.getValue(dynamicOptions)) ? + HttpConstants.METHOD_DEFAULT : httpMethodOption.getValue(dynamicOptions); + String contentType = HttpSinkUtil.getContentType(mapType, headersList); + String messageBody = getMessageBody(payload); + HttpMethod httpReqMethod = new HttpMethod(httpMethod); + HttpCarbonMessage cMessage = new HttpCarbonMessage( + new DefaultHttpRequest(HttpVersion.HTTP_1_1, httpReqMethod, EMPTY_STRING)); + cMessage = generateCarbonMessage(headersList, contentType, httpMethod, cMessage, + clientConnector.getHttpURLProperties()); + if (!Constants.HTTP_GET_METHOD.equals(httpMethod)) { + cMessage.addHttpContent(new DefaultLastHttpContent(Unpooled.wrappedBuffer(messageBody + .getBytes(Charset.defaultCharset())))); + } + cMessage.completeMessage(); + if (HttpConstants.OAUTH.equals(authType)) { + CountDownLatch latch = new CountDownLatch(1); + DefaultListener listener = new DefaultListener(latch, authType); + HttpResponseFuture responseFuture = clientConnector.send(cMessage); + responseFuture.setHttpConnectorListener(listener); + try { + boolean latchCount = latch.await(30, TimeUnit.SECONDS); + if (!latchCount) { + log.debug("Time out due to getting getting response from " + clientConnector.getPublisherURL() + + ". Message dropped."); + throw new ConnectionUnavailableException("Time out due to getting getting response from " + + clientConnector.getPublisherURL() + ". Message dropped."); + } + } catch (InterruptedException e) { + log.debug("Failed to get a response from " + clientConnector.getPublisherURL() + "," + e + + ". Message dropped."); + throw new ConnectionUnavailableException("Failed to get a response from " + + clientConnector.getPublisherURL() + ", " + e + ". Message dropped."); + } + HttpCarbonMessage response = listener.getHttpResponseMessage(); + return response.getNettyHttpResponse().status().code(); + } else { + HttpResponseFuture responseFuture = clientConnector.send(cMessage); + HTTPResponseListener responseListener = new HTTPResponseListener(payload, dynamicOptions, this, + clientConnector.getPublisherURL()); + responseFuture.setHttpConnectorListener(responseListener); + return HttpConstants.SUCCESS_CODE; + } + } + + + protected void sendOauthRequest(Object payload, DynamicOptions dynamicOptions, List
headersList, + ClientConnector clientConnector) throws ConnectionUnavailableException { //generate encoded base64 auth for getting refresh token String consumerKeyValue = consumerKey + ":" + consumerSecret; String encodedAuth = "Basic " + encodeBase64(consumerKeyValue) .replaceAll(HttpConstants.NEW_LINE, HttpConstants.EMPTY_STRING); //check the availability of access token in the header - setAccessToken(encodedAuth, dynamicOptions, headersList); + setAccessToken(encodedAuth, dynamicOptions, headersList, clientConnector.getPublisherURL()); //send a request to API and get the response - int response = sendRequest(payload, dynamicOptions, headersList); + int response = sendRequest(payload, dynamicOptions, headersList, clientConnector); //if authentication fails then get the new access token if (response == HttpConstants.AUTHENTICATION_FAIL_CODE) { - handleOAuthFailure(payload, dynamicOptions, headersList, encodedAuth); + handleOAuthFailure(payload, dynamicOptions, headersList, encodedAuth, clientConnector); } else if (response == HttpConstants.SUCCESS_CODE) { - log.info("Request sent successfully to " + publisherURL); + log.info("Request sent successfully to " + clientConnector.getPublisherURL()); } else if (response == HttpConstants.INTERNAL_SERVER_FAIL_CODE) { - log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + + log.error("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + + "', with response code: " + response + "- Internal server error. Message dropped"); throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint, " + - publisherURL + "', with response code: " + response + "- Internal server error. Message dropped."); + clientConnector.getPublisherURL() + "', with response code: " + response + + "- Internal server error. Message dropped."); } else { log.error("Error at sending oauth request to API endpoint " + - publisherURL + "', with response code: " + response + ". Message dropped."); + clientConnector.getPublisherURL() + "', with response code: " + response + ". Message dropped."); throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + - publisherURL + "', and response code: " + response + ". Message dropped."); + clientConnector.getPublisherURL() + "', and response code: " + response + ". Message dropped."); } } private void handleOAuthFailure(Object payload, DynamicOptions dynamicOptions, List
headersList, - String encodedAuth) throws ConnectionUnavailableException { + String encodedAuth, ClientConnector clientConnector) + throws ConnectionUnavailableException { boolean checkFromCache = accessTokenCache.checkAvailableKey(encodedAuth); if (checkFromCache) { - getNewAccessTokenWithCache(payload, dynamicOptions, headersList, encodedAuth); + getNewAccessTokenWithCache(payload, dynamicOptions, headersList, encodedAuth, clientConnector); } else { - requestForNewAccessToken(payload, dynamicOptions, headersList, encodedAuth); + requestForNewAccessToken(payload, dynamicOptions, headersList, encodedAuth, clientConnector); } } private void getNewAccessTokenWithCache(Object payload, DynamicOptions dynamicOptions, List
headersList, - String encodedAuth) throws ConnectionUnavailableException { + String encodedAuth, ClientConnector clientConnector) + throws ConnectionUnavailableException { String accessToken = accessTokenCache.getAccessToken(encodedAuth); for (Header header : headersList) { if (header.getName().equals(HttpConstants.AUTHORIZATION_HEADER)) { @@ -669,26 +710,32 @@ private void getNewAccessTokenWithCache(Object payload, DynamicOptions dynamicOp } } //send a request to API with a new access token - int response = sendRequest(payload, dynamicOptions, headersList); + int response = sendRequest(payload, dynamicOptions, headersList, clientConnector + ); if (response == HttpConstants.SUCCESS_CODE) { - log.info("Request sent successfully to " + publisherURL); + log.info("Request sent successfully to " + clientConnector.getPublisherURL()); } else if (response == HttpConstants.AUTHENTICATION_FAIL_CODE) { - requestForNewAccessToken(payload, dynamicOptions, headersList, encodedAuth); + requestForNewAccessToken(payload, dynamicOptions, headersList, encodedAuth, clientConnector); } else if (response == HttpConstants.INTERNAL_SERVER_FAIL_CODE) { - log.error("Error at sending oauth request to API endpoint, " + publisherURL + "', with response code: " + + log.error("Error at sending oauth request to API endpoint, " + clientConnector.getPublisherURL() + + "', with response code: " + response + "- Internal server error. Message dropped"); throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint, " + - publisherURL + "', with response code: " + response + "- Internal server error. Message dropped"); + clientConnector.getPublisherURL() + "', with response code: " + response + + "- Internal server error. Message dropped"); } else { - log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + + log.error("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + + "', with response code: " + response + ". Message dropped."); - throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + publisherURL + + throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + + clientConnector.getPublisherURL() + "', with response code: " + response + ". Message dropped."); } } private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOptions, List
headersList, - String encodedAuth) throws ConnectionUnavailableException { + String encodedAuth, ClientConnector clientConnector) + throws ConnectionUnavailableException { Boolean checkRefreshToken = accessTokenCache.checkRefreshAvailableKey(encodedAuth); if (checkRefreshToken) { for (Header header : headersList) { @@ -714,35 +761,43 @@ private void requestForNewAccessToken(Object payload, DynamicOptions dynamicOpti } } //send a request to API with a new access token - int response = sendRequest(payload, dynamicOptions, headersList); + int response = sendRequest(payload, dynamicOptions, headersList, clientConnector + ); if (response == HttpConstants.SUCCESS_CODE) { - log.info("Request sent successfully to " + publisherURL); + log.info("Request sent successfully to " + clientConnector.getPublisherURL()); } else if (response == HttpConstants.AUTHENTICATION_FAIL_CODE) { - log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + + log.error("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + + "', with response code: " + response + "- Authentication Failure. Please provide a valid Consumer key, Consumer secret" + " and token endpoint URL . Message dropped"); throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " + - publisherURL + "', with response code: " + response + "- Authentication Failure." + + clientConnector.getPublisherURL() + "', with response code: " + response + + "- Authentication Failure." + " Please provide a valid Consumer key, Consumer secret and token endpoint URL." + " Message dropped"); } else if (response == HttpConstants.INTERNAL_SERVER_FAIL_CODE) { - log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + + log.error("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + + "', with response code: " + response + "- Internal server error. Message dropped"); throw new HttpSinkAdaptorRuntimeException("Error at sending oauth request to API endpoint " - + publisherURL + "', with response code: " + response + + + clientConnector.getPublisherURL() + "', with response code: " + response + "- Internal server error. Message dropped"); } else { - log.error("Error at sending oauth request to API endpoint " + publisherURL + "', with response code: " + + log.error("Error at sending oauth request to API endpoint " + clientConnector.getPublisherURL() + + "', with response code: " + response + ". Message dropped."); throw new ConnectionUnavailableException("Error at sending oauth request to API endpoint " + - publisherURL + "', with response code: " + response + ". Message dropped."); + clientConnector.getPublisherURL() + "', with response code: " + response + + ". Message dropped."); } } else if (accessTokenCache.getResponseCode(encodedAuth) == HttpConstants.AUTHENTICATION_FAIL_CODE) { - log.error("Failed to generate new access token for the expired access token to " + publisherURL + "', " + + log.error("Failed to generate new access token for the expired access token to " + + clientConnector.getPublisherURL() + "', " + accessTokenCache.getResponseCode(encodedAuth) + ": Authentication Failure.cPlease provide a " + "valid Consumer key, Consumer secret and token endpoint URL . Message dropped"); throw new HttpSinkAdaptorRuntimeException("Failed to generate new access token for the expired access " + - "token to " + publisherURL + "', " + accessTokenCache.getResponseCode(encodedAuth) + + "token to " + clientConnector.getPublisherURL() + "', " + + accessTokenCache.getResponseCode(encodedAuth) + ": Authentication Failure.Please provide a valid Consumer key, Consumer secret" + " and token endpoint URL . Message dropped"); } else { @@ -772,7 +827,8 @@ void getAccessToken(DynamicOptions dynamicOptions, String encodedAuth, String to } void setAccessToken(String encodedAuth, DynamicOptions dynamicOptions, - List
headersList) throws ConnectionUnavailableException { + List
headersList, String publisherURL) + throws ConnectionUnavailableException { //check the availability of the authorization String accessToken; boolean authAvailability = false; @@ -831,56 +887,6 @@ void setAccessToken(String encodedAuth, DynamicOptions dynamicOptions, } } - private int sendRequest(Object payload, DynamicOptions dynamicOptions, List
headersList) - throws ConnectionUnavailableException { - if (!publisherURLOption.isStatic()) { - initClientConnector(dynamicOptions); - } - - if (mapType == null) { - mapType = getMapper().getType(); - } - - String httpMethod = EMPTY_STRING.equals(httpMethodOption.getValue(dynamicOptions)) ? - HttpConstants.METHOD_DEFAULT : httpMethodOption.getValue(dynamicOptions); - String contentType = HttpSinkUtil.getContentType(mapType, headersList); - String messageBody = getMessageBody(payload); - HttpMethod httpReqMethod = new HttpMethod(httpMethod); - HttpCarbonMessage cMessage = new HttpCarbonMessage( - new DefaultHttpRequest(HttpVersion.HTTP_1_1, httpReqMethod, EMPTY_STRING)); - cMessage = generateCarbonMessage(headersList, contentType, httpMethod, cMessage); - if (!Constants.HTTP_GET_METHOD.equals(httpMethod)) { - cMessage.addHttpContent(new DefaultLastHttpContent(Unpooled.wrappedBuffer(messageBody - .getBytes(Charset.defaultCharset())))); - } - cMessage.completeMessage(); - if (HttpConstants.OAUTH.equals(authType)) { - CountDownLatch latch = new CountDownLatch(1); - DefaultListener listener = new DefaultListener(latch, authType); - HttpResponseFuture responseFuture = clientConnector.send(cMessage); - responseFuture.setHttpConnectorListener(listener); - try { - boolean latchCount = latch.await(30, TimeUnit.SECONDS); - if (!latchCount) { - log.debug("Time out due to getting getting response from " + publisherURL + ". Message dropped."); - throw new ConnectionUnavailableException("Time out due to getting getting response from " - + publisherURL + ". Message dropped."); - } - } catch (InterruptedException e) { - log.debug("Failed to get a response from " + publisherURL + "," + e + ". Message dropped."); - throw new ConnectionUnavailableException("Failed to get a response from " + - publisherURL + ", " + e + ". Message dropped."); - } - HttpCarbonMessage response = listener.getHttpResponseMessage(); - return response.getNettyHttpResponse().status().code(); - } else { - HttpResponseFuture responseFuture = clientConnector.send(cMessage); - HTTPResponseListener responseListener = new HTTPResponseListener(payload, dynamicOptions, this); - responseFuture.setHttpConnectorListener(responseListener); - return HttpConstants.SUCCESS_CODE; - } - } - /** * This method will be called before the processing method. * Intention to establish connection to publish event. @@ -896,9 +902,10 @@ public void connect() { */ @Override public void disconnect() { - if (clientConnector != null) { - clientConnector = null; - log.info("Server connector for url " + publisherURL + " disconnected."); + if (staticClientConnector != null) { + String publisherURL = staticClientConnector.getPublisherURL(); + staticClientConnector = null; + log.debug("Server connector for url " + publisherURL + " disconnected."); } if (httpConnectorFactory != null) { @@ -913,23 +920,26 @@ public void disconnect() { */ @Override public void destroy() { - if (clientConnector != null) { - clientConnector = null; - log.info("Server connector for url " + publisherURL + " disconnected."); + if (staticClientConnector != null) { + String publisherURL = staticClientConnector.getPublisherURL(); + staticClientConnector = null; + log.debug("Server connector for url " + publisherURL + " disconnected."); } } /** * The method is responsible of generating carbon message to send. * - * @param headers the headers set. - * @param contentType the content type. Value is if user has to given it as a header or if not it is map type. - * @param httpMethod http method type. - * @param cMessage carbon message to be send to the endpoint. + * @param headers the headers set. + * @param contentType the content type. Value is if user has to given it as a header or if not it is map type. + * @param httpMethod http method type. + * @param cMessage carbon message to be send to the endpoint. + * @param httpURLProperties * @return generated carbon message. */ HttpCarbonMessage generateCarbonMessage(List
headers, String contentType, - String httpMethod, HttpCarbonMessage cMessage) { + String httpMethod, HttpCarbonMessage cMessage, + Map httpURLProperties) { // Set protocol type http or https cMessage.setProperty(Constants.PROTOCOL, httpURLProperties.get(Constants.PROTOCOL)); // Set uri @@ -981,22 +991,8 @@ String getMessageBody(Object payload) { } } - private void initConnectorFactory() { - //if bootstrap configurations are given then pass it if not let take default value of transport - if (!EMPTY_STRING.equals(bootstrapBoss) && !EMPTY_STRING.equals(bootstrapWorker)) { - if (!EMPTY_STRING.equals(bootstrapClient)) { - httpConnectorFactory = new DefaultHttpWsConnectorFactory(Integer.parseInt(bootstrapBoss), - Integer.parseInt(bootstrapWorker), Integer.parseInt(bootstrapClient)); - } else { - httpConnectorFactory = new DefaultHttpWsConnectorFactory(Integer.parseInt(bootstrapBoss), - Integer.parseInt(bootstrapWorker), Integer.parseInt(bootstrapWorker)); - } - } else { - httpConnectorFactory = new DefaultHttpWsConnectorFactory(); - } - } - - public void initClientConnector(DynamicOptions dynamicOptions) { + public ClientConnector createClientConnector(DynamicOptions dynamicOptions) { + String publisherURL; if (publisherURLOption.isStatic()) { publisherURL = publisherURLOption.getValue(); } else { @@ -1014,7 +1010,7 @@ public void initClientConnector(DynamicOptions dynamicOptions) { } } String scheme = HttpSinkUtil.getScheme(publisherURL); - this.httpURLProperties = HttpSinkUtil.getURLProperties(publisherURL); + Map httpURLProperties = HttpSinkUtil.getURLProperties(publisherURL); //Generate basic sender configurations SenderConfiguration senderConfig = HttpSinkUtil .getSenderConfigurations(httpURLProperties, clientStoreFile, clientStorePass, configReader); @@ -1027,47 +1023,10 @@ public void initClientConnector(DynamicOptions dynamicOptions) { "default scheme is 'https'. Please provide client " + "trustStore file path and password in " + streamID); } - //if username and password both not equal to null consider as basic auth enabled if only one is null take it - // as exception - if ((EMPTY_STRING.equals(userName) ^ - EMPTY_STRING.equals(userPassword))) { - throw new SiddhiAppCreationException("Please provide user name and password in " + - HttpConstants.HTTP_SINK_ID + " with the stream " + streamID + " in Siddhi app " + - siddhiAppContext.getName()); - } else if (!(EMPTY_STRING.equals(userName))) { - byte[] val = (userName + HttpConstants.AUTH_USERNAME_PASSWORD_SEPARATOR + userPassword).getBytes(Charset - .defaultCharset()); - this.authorizationHeader = HttpConstants.AUTHORIZATION_METHOD + Base64.encode - (Unpooled.copiedBuffer(val)); - } - //if proxy username and password not equal to null then create proxy configurations - if (!EMPTY_STRING.equals(proxyHost) && !EMPTY_STRING.equals(proxyPort)) { - try { - ProxyServerConfiguration proxyServerConfiguration = new ProxyServerConfiguration(proxyHost, Integer - .parseInt(proxyPort)); - if (!EMPTY_STRING.equals(proxyPassword) && !EMPTY_STRING.equals - (proxyUsername)) { - proxyServerConfiguration.setProxyPassword(proxyPassword); - proxyServerConfiguration.setProxyUsername(proxyUsername); - } - senderConfig.setProxyServerConfiguration(proxyServerConfiguration); - } catch (UnknownHostException e) { - log.error("Proxy url and password is invalid in sink " + streamID + " of siddhi app " + - siddhiAppContext.getName(), e); - } + if (proxyServerConfiguration != null) { + senderConfig.setProxyServerConfiguration(proxyServerConfiguration); } - - PoolConfiguration poolConfiguration = new PoolConfiguration(); - poolConfiguration.setMaxActivePerPool(maxActivePerPool); - poolConfiguration.setMinIdlePerPool(minIdlePerPool); - poolConfiguration.setMaxIdlePerPool(maxIdlePerPool); - poolConfiguration.setTestOnBorrow(testOnBorrow); - poolConfiguration.setTestWhileIdle(testWhileIdle); - poolConfiguration.setTimeBetweenEvictionRuns(timeBetweenEvictionRuns); - poolConfiguration.setMinEvictableIdleTime(minEvictableIdleTime); - poolConfiguration.setExhaustedAction(exhaustedAction); - poolConfiguration.setMaxWaitTime(maxWaitTime); - senderConfig.setPoolConfiguration(poolConfiguration); + senderConfig.setPoolConfiguration(connectionPoolConfiguration); //add advanced sender configurations if (socketIdleTimeout != -1) { @@ -1102,7 +1061,8 @@ public void initClientConnector(DynamicOptions dynamicOptions) { //overwrite default transport configuration Map bootStrapProperties = HttpSinkUtil .populateTransportConfiguration(clientBootstrapConfiguration); - clientConnector = httpConnectorFactory.createHttpClientConnector(bootStrapProperties, senderConfig); + return new ClientConnector(publisherURL, httpURLProperties, + httpConnectorFactory.createHttpClientConnector(bootStrapProperties, senderConfig)); } private String encodeMessage(Object s) { @@ -1124,11 +1084,13 @@ static class HTTPResponseListener implements HttpConnectorListener { Object payload; DynamicOptions dynamicOptions; HttpSink httpSink; + private String publisherURL; - HTTPResponseListener(Object payload, DynamicOptions dynamicOptions, HttpSink httpSink) { + HTTPResponseListener(Object payload, DynamicOptions dynamicOptions, HttpSink httpSink, String publisherURL) { this.payload = payload; this.dynamicOptions = dynamicOptions; this.httpSink = httpSink; + this.publisherURL = publisherURL; } @Override @@ -1139,8 +1101,10 @@ public void onMessage(HttpCarbonMessage httpCarbonMessage) { @Override public void onError(Throwable throwable) { httpSink.onError(payload, dynamicOptions, - new ConnectionUnavailableException("Siddhi App " + httpSink.siddhiAppContext.getName() + - " failed to publish events to HTTP endpoint: " + httpSink.publisherURL, throwable)); + new ConnectionUnavailableException("HTTP sink on stream " + httpSink.streamID + + " of Siddhi App '" + httpSink.siddhiAppContext.getName() + + "' failed to publish events to endpoint '" + publisherURL + "'. " + + throwable.getMessage(), throwable)); } } } diff --git a/component/src/main/java/io/siddhi/extension/io/http/sink/util/HttpSinkUtil.java b/component/src/main/java/io/siddhi/extension/io/http/sink/util/HttpSinkUtil.java index 7e9828ec..2f7a639d 100644 --- a/component/src/main/java/io/siddhi/extension/io/http/sink/util/HttpSinkUtil.java +++ b/component/src/main/java/io/siddhi/extension/io/http/sink/util/HttpSinkUtil.java @@ -20,16 +20,21 @@ import io.siddhi.core.exception.SiddhiAppCreationException; import io.siddhi.core.util.config.ConfigReader; +import io.siddhi.core.util.transport.OptionHolder; import io.siddhi.extension.io.http.sink.exception.HttpSinkAdaptorRuntimeException; import io.siddhi.extension.io.http.util.HttpConstants; import io.siddhi.extension.io.http.util.TrpPropertyTypes; import org.apache.log4j.Logger; import org.wso2.carbon.messaging.Header; import org.wso2.transport.http.netty.contract.Constants; +import org.wso2.transport.http.netty.contract.config.ProxyServerConfiguration; import org.wso2.transport.http.netty.contract.config.SenderConfiguration; +import org.wso2.transport.http.netty.contractimpl.DefaultHttpWsConnectorFactory; +import org.wso2.transport.http.netty.contractimpl.sender.channel.pool.PoolConfiguration; import java.net.MalformedURLException; import java.net.URL; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -42,6 +47,7 @@ import static io.siddhi.extension.io.http.util.HttpConstants.CLIENT_BOOTSTRAP_SENDBUFFERSIZE; import static io.siddhi.extension.io.http.util.HttpConstants.CLIENT_BOOTSTRAP_SOCKET_REUSE; import static io.siddhi.extension.io.http.util.HttpConstants.CLIENT_BOOTSTRAP_SOCKET_TIMEOUT; +import static io.siddhi.extension.io.http.util.HttpConstants.EMPTY_STRING; import static io.siddhi.extension.io.http.util.HttpConstants.HTTP_TRACE_LOG_ENABLED; import static io.siddhi.extension.io.http.util.HttpConstants.LOG_TRACE_ENABLE_DEFAULT_VALUE; import static io.siddhi.extension.io.http.util.HttpConstants.PARAMETER_SEPARATOR; @@ -283,4 +289,84 @@ private static Map trpPropertyTypeMap() { trpPropertyTypes.put(CLIENT_BOOTSTRAP_SOCKET_TIMEOUT, TrpPropertyTypes.INTEGER); return trpPropertyTypes; } + + public static PoolConfiguration createPoolConfigurations(OptionHolder optionHolder) { + int maxIdlePerPool = Integer.parseInt(optionHolder.validateAndGetStaticValue( + HttpConstants.MAX_IDLE_CONNECTIONS_PER_POOL, HttpConstants.DEFAULT_MAX_IDLE_CONNECTIONS_PER_POOL)); + int minIdlePerPool = Integer.parseInt(optionHolder.validateAndGetStaticValue( + HttpConstants.MIN_IDLE_CONNECTIONS_PER_POOL, HttpConstants.DEFAULT_MIN_IDLE_CONNECTIONS_PER_POOL)); + int maxActivePerPool = Integer.parseInt(optionHolder.validateAndGetStaticValue( + HttpConstants.MAX_ACTIVE_CONNECTIONS_PER_POOL, HttpConstants.DEFAULT_MAX_ACTIVE_CONNECTIONS_PER_POOL)); + boolean testOnBorrow = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(HttpConstants.TEST_ON_BORROW, + HttpConstants.DEFAULT_TEST_ON_BORROW)); + boolean testWhileIdle = Boolean.parseBoolean( + optionHolder.validateAndGetStaticValue(HttpConstants.TEST_WHILE_IDLE, + HttpConstants.DEFAULT_TEST_WHILE_IDLE)); + long timeBetweenEvictionRuns = Long.parseLong(optionHolder.validateAndGetStaticValue( + HttpConstants.TIME_BETWEEN_EVICTION_RUNS, HttpConstants.DEFAULT_TIME_BETWEEN_EVICTION_RUNS)); + long minEvictableIdleTime = Long.parseLong(optionHolder.validateAndGetStaticValue( + HttpConstants.MIN_EVICTABLE_IDLE_TIME, HttpConstants.DEFAULT_MIN_EVICTABLE_IDLE_TIME)); + byte exhaustedAction = (byte) Integer.parseInt(optionHolder.validateAndGetStaticValue( + HttpConstants.EXHAUSTED_ACTION, HttpConstants.DEFAULT_EXHAUSTED_ACTION)); + int maxWaitTime = Integer.parseInt(optionHolder.validateAndGetStaticValue( + HttpConstants.MAX_WAIT_TIME, HttpConstants.DEFAULT_MAX_WAIT_TIME)); + PoolConfiguration connectionPoolConfiguration = new PoolConfiguration(); + connectionPoolConfiguration.setMaxActivePerPool(maxActivePerPool); + connectionPoolConfiguration.setMinIdlePerPool(minIdlePerPool); + connectionPoolConfiguration.setMaxIdlePerPool(maxIdlePerPool); + connectionPoolConfiguration.setTestOnBorrow(testOnBorrow); + connectionPoolConfiguration.setTestWhileIdle(testWhileIdle); + connectionPoolConfiguration.setTimeBetweenEvictionRuns(timeBetweenEvictionRuns); + connectionPoolConfiguration.setMinEvictableIdleTime(minEvictableIdleTime); + connectionPoolConfiguration.setExhaustedAction(exhaustedAction); + connectionPoolConfiguration.setMaxWaitTime(maxWaitTime); + return connectionPoolConfiguration; + } + + public static DefaultHttpWsConnectorFactory createConnectorFactory(ConfigReader configReader) { + //read trp globe configuration + String bootstrapWorker = configReader + .readConfig(HttpConstants.CLIENT_BOOTSTRAP_WORKER_GROUP_SIZE, EMPTY_STRING); + String bootstrapBoss = configReader.readConfig(HttpConstants.CLIENT_BOOTSTRAP_BOSS_GROUP_SIZE, EMPTY_STRING); + String bootstrapClient = configReader.readConfig(HttpConstants.CLIENT_BOOTSTRAP_CLIENT_GROUP_SIZE, + EMPTY_STRING); + //if bootstrap configurations are given then pass it if not let take default value of transport + if (!EMPTY_STRING.equals(bootstrapBoss) && !EMPTY_STRING.equals(bootstrapWorker)) { + if (!EMPTY_STRING.equals(bootstrapClient)) { + return new DefaultHttpWsConnectorFactory(Integer.parseInt(bootstrapBoss), + Integer.parseInt(bootstrapWorker), Integer.parseInt(bootstrapClient)); + } else { + return new DefaultHttpWsConnectorFactory(Integer.parseInt(bootstrapBoss), + Integer.parseInt(bootstrapWorker), Integer.parseInt(bootstrapWorker)); + } + } + return new DefaultHttpWsConnectorFactory(); + } + + public static ProxyServerConfiguration createProxyServerConfiguration(OptionHolder optionHolder, String streamID, + String appName) { + String proxyHost = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_HOST, EMPTY_STRING); + String proxyPort = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_PORT, EMPTY_STRING); + String proxyUsername = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_USERNAME, + EMPTY_STRING); + String proxyPassword = optionHolder.validateAndGetStaticValue(HttpConstants.PROXY_PASSWORD, + EMPTY_STRING); + //if proxy username and password not equal to null then create proxy configurations + if (!EMPTY_STRING.equals(proxyHost) && !EMPTY_STRING.equals(proxyPort)) { + try { + ProxyServerConfiguration proxyServerConfiguration = new ProxyServerConfiguration(proxyHost, Integer + .parseInt(proxyPort)); + if (!EMPTY_STRING.equals(proxyPassword) && !EMPTY_STRING.equals + (proxyUsername)) { + proxyServerConfiguration.setProxyPassword(proxyPassword); + proxyServerConfiguration.setProxyUsername(proxyUsername); + } + } catch (UnknownHostException e) { + log.error("Proxy url of sink defined in '" + streamID + "' of Siddhi App '" + + appName + "' is invalid.", e); + } + } + return null; + } + } diff --git a/component/src/main/java/io/siddhi/extension/io/http/source/HttpCallResponseConnectorListener.java b/component/src/main/java/io/siddhi/extension/io/http/source/HttpCallResponseConnectorListener.java index 3e36edd5..5c02cc50 100644 --- a/component/src/main/java/io/siddhi/extension/io/http/source/HttpCallResponseConnectorListener.java +++ b/component/src/main/java/io/siddhi/extension/io/http/source/HttpCallResponseConnectorListener.java @@ -68,7 +68,7 @@ public void onMessage(HttpCarbonMessage carbonMessage) { @Override public void onError(Throwable throwable) { - log.error("Error occurred during processing response for the request sent by http-request-sink with " + + log.error("Error occurred during processing response for the request sent by http-call sink with " + "'sink.id' = " + sinkId + " in Siddhi app " + siddhiAppName + ".", throwable); } diff --git a/component/src/main/java/io/siddhi/extension/io/http/source/HttpCallResponseSource.java b/component/src/main/java/io/siddhi/extension/io/http/source/HttpCallResponseSource.java index df770881..c3d8bfa8 100644 --- a/component/src/main/java/io/siddhi/extension/io/http/source/HttpCallResponseSource.java +++ b/component/src/main/java/io/siddhi/extension/io/http/source/HttpCallResponseSource.java @@ -183,13 +183,12 @@ public void connect(ConnectionCallback connectionCallback, State state) throws C new HttpCallResponseConnectorListener(Integer.parseInt(workerThread), sourceEventListener, shouldAllowStreamingResponses, sinkId, requestedTransportPropertyNames, siddhiAppName); this.httpConnectorRegistry.registerSourceListener(httpCallResponseSourceListener, sinkId, httpStatusCode); - HTTPSourceRegistry.registerCallResponseSource(sinkId, httpStatusCode, this); } @Override public void disconnect() { - this.httpConnectorRegistry.unregisterSourceListener(sinkId, httpStatusCode, siddhiAppName); + httpConnectorRegistry.unregisterSourceListener(sinkId, httpStatusCode, siddhiAppName); HTTPSourceRegistry.removeCallResponseSource(sinkId, httpStatusCode); } @@ -210,4 +209,10 @@ public void resume() { public HttpCallResponseConnectorListener getConnectorListener() { return httpCallResponseSourceListener; } + + public boolean matches(String thatSinkId, String thatStatusCode) { + return (sinkId != null ? sinkId.equals(thatSinkId) : thatSinkId == null) && + (httpStatusCode != null ? + thatStatusCode != null && thatStatusCode.matches(httpStatusCode) : thatStatusCode == null); + } } diff --git a/component/src/main/java/io/siddhi/extension/io/http/source/HttpResponseMessageListener.java b/component/src/main/java/io/siddhi/extension/io/http/source/HttpResponseMessageListener.java index 71b98b03..a4e25efe 100644 --- a/component/src/main/java/io/siddhi/extension/io/http/source/HttpResponseMessageListener.java +++ b/component/src/main/java/io/siddhi/extension/io/http/source/HttpResponseMessageListener.java @@ -19,13 +19,17 @@ package io.siddhi.extension.io.http.source; +import io.siddhi.core.exception.ConnectionUnavailableException; +import io.siddhi.core.exception.SiddhiAppRuntimeException; +import io.siddhi.core.util.transport.DynamicOptions; import io.siddhi.extension.io.http.sink.HttpSink; import io.siddhi.extension.io.http.util.HTTPSourceRegistry; import io.siddhi.extension.io.http.util.HttpConstants; -import io.siddhi.extension.io.http.util.ResponseSourceId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.wso2.transport.http.netty.contract.HttpConnectorListener; +import org.wso2.transport.http.netty.contract.exceptions.ClientConnectorException; +import org.wso2.transport.http.netty.contract.exceptions.ServerConnectorException; import org.wso2.transport.http.netty.message.HttpCarbonMessage; import java.io.IOException; @@ -40,28 +44,30 @@ public class HttpResponseMessageListener implements HttpConnectorListener { private static final Logger log = LoggerFactory.getLogger(HttpResponseMessageListener.class); - private HttpCallResponseConnectorListener responseConnectorListener; private Map trpProperties; private boolean isDownloadEnabled; private String sinkId; private HttpCarbonMessage carbonMessages; private CountDownLatch latch; - private int tryCount; - private String authType; - private boolean isBlockingIO; private HttpSink sink; + private final Object payload; + private final DynamicOptions dynamicOptions; + private String siddhiAppName; + private String publisherURL; public HttpResponseMessageListener(HttpSink sink, Map trpProperties, String sinkId, - boolean isDownloadEnabled, CountDownLatch latch, int tryCount, String authType, - boolean isBlockingIO) { + boolean isDownloadEnabled, CountDownLatch latch, + Object payload, DynamicOptions dynamicOptions, + String siddhiAppName, String publisherURL) { this.trpProperties = trpProperties; this.isDownloadEnabled = isDownloadEnabled; this.sinkId = sinkId; this.latch = latch; - this.tryCount = tryCount; - this.authType = authType; - this.isBlockingIO = isBlockingIO; this.sink = sink; + this.payload = payload; + this.dynamicOptions = dynamicOptions; + this.siddhiAppName = siddhiAppName; + this.publisherURL = publisherURL; } @Override @@ -71,67 +77,52 @@ public void onMessage(HttpCarbonMessage carbonMessage) { }); carbonMessage.setProperty(HttpConstants.IS_DOWNLOADABLE_CONTENT, isDownloadEnabled); this.carbonMessages = carbonMessage; - String statusCode = Integer.toString(carbonMessage.getNettyHttpResponse().status().code()); - if (carbonMessage.getNettyHttpResponse().status().code() == (HttpConstants.SUCCESS_CODE) || - HttpConstants.MAXIMUM_TRY_COUNT == tryCount) { - HttpCallResponseSource responseSource = findAndGetResponseSource(statusCode); - if (responseSource != null) { - responseConnectorListener = responseSource.getConnectorListener(); - responseConnectorListener.onMessage(carbonMessage); - } else { - log.error("No source of type 'http-response' that matches with the status code '" + statusCode + - "' has been defined. Hence dropping the response message."); - } - } - if (isBlockingIO || HttpConstants.OAUTH.equals(authType)) { + if (latch != null) { latch.countDown(); } + String statusCode = Integer.toString(carbonMessage.getNettyHttpResponse().status().code()); + HttpCallResponseSource responseSource = HTTPSourceRegistry.findAndGetResponseSource(sinkId, statusCode); + if (responseSource != null) { + HttpCallResponseConnectorListener responseConnectorListener = responseSource.getConnectorListener(); + responseConnectorListener.onMessage(carbonMessage); + } else { + log.error("No source of type 'http-call-response' with sink.id '" + sinkId + + "' for the status code '" + statusCode + + "' defined. Hence dropping the response message."); + } + } @Override public void onError(Throwable throwable) { if (throwable instanceof IOException) { - sink.initClientConnector(null); + sink.createClientConnector(null); } - - HttpCallResponseSource source = HTTPSourceRegistry.getCallResponseSource(sinkId, - HttpConstants.DEFAULT_HTTP_ERROR_CODE); - if (source != null) { - responseConnectorListener = source.getConnectorListener(); - } else { - log.error("No source of type 'http-response' for status code '500' has been " + - "defined. Hence dropping the response message."); + if (latch != null) { + latch.countDown(); } - if (responseConnectorListener != null) { - responseConnectorListener.onError(throwable); + if (throwable instanceof ClientConnectorException || throwable instanceof ServerConnectorException) { + sink.onError(payload, dynamicOptions, new ConnectionUnavailableException( + "HTTP call sink on stream '" + sink.getStreamDefinition().getId() + + "' of Siddhi App '" + siddhiAppName + + "' failed to publish events to endpoint '" + publisherURL + "'. " + + throwable.getMessage(), throwable)); } else { - if (log.isDebugEnabled()) { - log.debug("No connector listener for the response source with sink id '" + sinkId + "' and http " + - "status code 500 found."); - } + sink.onError(payload, dynamicOptions, + new SiddhiAppRuntimeException("HTTP call sink on stream '" + + sink.getStreamDefinition().getId() + + "' of Siddhi App '" + siddhiAppName + + "' failed to publish events to endpoint '" + publisherURL + "'. " + + throwable.getMessage(), throwable)); } } - /** - * Disconnect pool execution. - */ - void disconnect() { - responseConnectorListener.disconnect(); - } - - private HttpCallResponseSource findAndGetResponseSource(String statusCode) { - ResponseSourceId id = new ResponseSourceId(sinkId, statusCode); - for (Map.Entry entry : HTTPSourceRegistry.getCallResponseSourceRegistry().entrySet()) { - ResponseSourceId key = (ResponseSourceId) entry.getKey(); - if (id.equals(key)) { - return (HttpCallResponseSource) entry.getValue(); - } + public int getHttpResponseStatusCode() { + if (carbonMessages != null) { + return carbonMessages.getNettyHttpResponse().status().code(); + } else { + return HttpConstants.CLIENT_REQUEST_TIMEOUT; } - return null; - } - - public HttpCarbonMessage getHttpResponseMessage() { - return carbonMessages; } } diff --git a/component/src/main/java/io/siddhi/extension/io/http/util/HTTPSourceRegistry.java b/component/src/main/java/io/siddhi/extension/io/http/util/HTTPSourceRegistry.java index d15e585d..24e3fb0f 100644 --- a/component/src/main/java/io/siddhi/extension/io/http/util/HTTPSourceRegistry.java +++ b/component/src/main/java/io/siddhi/extension/io/http/util/HTTPSourceRegistry.java @@ -30,7 +30,7 @@ public class HTTPSourceRegistry { private static Map serviceSourceRegistry = new ConcurrentHashMap<>(); - private static Map callResponseSourceRegistry = new ConcurrentHashMap<>(); + private static Map callResponseSourceRegistry = new ConcurrentHashMap<>(); // handle service sources public static HttpServiceSource getServiceSource(String sourceId) { @@ -45,20 +45,20 @@ public static void removeServiceSource(String sourceId) { serviceSourceRegistry.remove(sourceId); } - // handle response sources - public static HttpCallResponseSource getCallResponseSource(String sinkId, String statusCode) { - return callResponseSourceRegistry.get(new ResponseSourceId(sinkId, statusCode)); - } - public static void registerCallResponseSource(String sinkId, String statusCode, HttpCallResponseSource source) { - callResponseSourceRegistry.put(new ResponseSourceId(sinkId, statusCode), source); + callResponseSourceRegistry.put(sinkId + statusCode, source); } public static void removeCallResponseSource(String sinkId, String statusCode) { - callResponseSourceRegistry.remove(new ResponseSourceId(sinkId, statusCode)); + callResponseSourceRegistry.remove(sinkId + statusCode); } - public static Map getCallResponseSourceRegistry() { - return callResponseSourceRegistry; + public static HttpCallResponseSource findAndGetResponseSource(String sinkId, String statusCode) { + for (HttpCallResponseSource responseSource : callResponseSourceRegistry.values()) { + if (responseSource.matches(sinkId, statusCode)) { + return responseSource; + } + } + return null; } } diff --git a/component/src/main/java/io/siddhi/extension/io/http/util/HttpConstants.java b/component/src/main/java/io/siddhi/extension/io/http/util/HttpConstants.java index e12b5ac9..bf15b56a 100644 --- a/component/src/main/java/io/siddhi/extension/io/http/util/HttpConstants.java +++ b/component/src/main/java/io/siddhi/extension/io/http/util/HttpConstants.java @@ -187,7 +187,6 @@ public class HttpConstants { // HTTP codes for response source public static final String HTTP_STATUS_CODE = "http.status.code"; public static final String DEFAULT_HTTP_SUCCESS_CODE = "200"; - public static final String DEFAULT_HTTP_ERROR_CODE = "500"; // HTTP Default ports if the port is not present in the given URL public static final int DEFAULT_HTTP_PORT = 80; public static final int DEFAULT_HTTPS_PORT = 443; @@ -203,6 +202,7 @@ public class HttpConstants { public static final String USERNAME = "username"; public static final String PASSWORD = "password"; public static final int SUCCESS_CODE = 200; + public static final int CLIENT_REQUEST_TIMEOUT = 408; public static final int AUTHENTICATION_FAIL_CODE = 401; public static final int PERSISTENT_ACCESS_FAIL_CODE = 400; public static final int INTERNAL_SERVER_FAIL_CODE = 500; diff --git a/component/src/main/java/io/siddhi/extension/io/http/util/ResponseSourceId.java b/component/src/main/java/io/siddhi/extension/io/http/util/ResponseSourceId.java deleted file mode 100644 index c0c84861..00000000 --- a/component/src/main/java/io/siddhi/extension/io/http/util/ResponseSourceId.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) 2018 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 Inc. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package io.siddhi.extension.io.http.util; - -/** - * Class for holding sinkid and the status code for a response source - */ -public class ResponseSourceId { - private String sinkId; - private String httpCode; - - public ResponseSourceId(String sinkId, String httpCode) { - this.sinkId = sinkId; - this.httpCode = httpCode; - } - - public String getSinkId() { - return sinkId; - } - - public String getHttpCode() { - return httpCode; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - ResponseSourceId that = (ResponseSourceId) o; - - if (sinkId != null ? !sinkId.equals(that.sinkId) : that.sinkId != null) { - return false; - } - return httpCode != null ? httpCode.matches(that.httpCode) : that.httpCode == null; - } - - @Override - public int hashCode() { - int result = sinkId != null ? sinkId.hashCode() : 0; - result = 31 * result + (httpCode != null ? httpCode.hashCode() : 0); - return result; - } -} diff --git a/component/src/test/java/io/siddhi/extension/io/http/sink/HttpCallResponseTestCase.java b/component/src/test/java/io/siddhi/extension/io/http/sink/HttpCallResponseTestCase.java index 7b807856..f82a99fb 100644 --- a/component/src/test/java/io/siddhi/extension/io/http/sink/HttpCallResponseTestCase.java +++ b/component/src/test/java/io/siddhi/extension/io/http/sink/HttpCallResponseTestCase.java @@ -29,6 +29,7 @@ import org.apache.log4j.Logger; import org.testng.Assert; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.io.File; @@ -41,7 +42,6 @@ public class HttpCallResponseTestCase { private String downloadPath; private String rootPath; - @BeforeClass public void init() { ClassLoader classLoader = getClass().getClassLoader(); @@ -49,6 +49,11 @@ public void init() { downloadPath = rootPath + "/downloads"; } + @BeforeMethod + public void reset() { + eventCount.set(0); + } + @Test public void testHTTPRequestResponse1() throws Exception { log.info("Send a POST request with a json body message and receive the response"); @@ -107,7 +112,7 @@ public void receive(Event[] events) { httpServerListenerHandler.shutdown(); } - @Test + @Test(dependsOnMethods = "testHTTPRequestResponse1") public void testHTTPRequestResponse2() throws Exception { log.info("Send a POST request with a json body message and receive the response along with attributes in the " + "request"); @@ -168,7 +173,7 @@ public void receive(Event[] events) { httpServerListenerHandler.shutdown(); } - @Test + @Test(dependsOnMethods = "testHTTPRequestResponse2") public void testHTTPRequestResponse3() throws Exception { log.info("Download a file"); SiddhiManager siddhiManager = new SiddhiManager(); @@ -223,16 +228,16 @@ public void receive(Event[] events) { SiddhiTestHelper.waitForEvents(1000, 1, eventCount, 1000); File file = new File(downloadPath); - Assert.assertTrue(file != null); Assert.assertTrue(file.isFile()); Assert.assertEquals(file.getName(), "downloadedFile.txt"); + file.delete(); Assert.assertEquals(eventCount.get(), 1); siddhiAppRuntime.shutdown(); httpFileServerListenerHandler.shutdown(); } - @Test + @Test(dependsOnMethods = "testHTTPRequestResponse3") public void testHTTPRequestResponse4() throws Exception { log.info("Try to download a file that not exists."); SiddhiManager siddhiManager = new SiddhiManager(); @@ -241,7 +246,7 @@ public void testHTTPRequestResponse4() throws Exception { "@sink(type='http-call'," + "downloading.enabled='true'," + "download.path='{{downloadPath}}'," + - "publisher.url='http://localhost:8005/files2', " + + "publisher.url='http://localhost:8005/files', " + "method='GET'," + "headers='{{headers}}',sink.id='source-1'," + "@map(type='json')) " + @@ -300,12 +305,111 @@ public void receive(Event[] events) { SiddhiTestHelper.waitForEvents(1000, 1, eventCount, 1000); File file = new File(downloadPath); - Assert.assertTrue(file != null); - Assert.assertTrue(file.isFile()); - Assert.assertEquals(file.getName(), "downloadedFile.txt"); + Assert.assertFalse(file.isFile()); Assert.assertEquals(eventCount.get(), 1); siddhiAppRuntime.shutdown(); httpFileServerListenerHandler.shutdown(); } + + @Test(dependsOnMethods = "testHTTPRequestResponse4") + public void testHTTPRequestResponse5() throws Exception { + log.info("Send a POST request with a json body message and receive the response"); + SiddhiManager siddhiManager = new SiddhiManager(); + String inStreamDefinition = "" + + "define stream FooStream (message String,headers String);" + + "" + + "@sink(type='http-call', publisher.url='http://localhost:8005/abc'," + + " method='POST', headers='{{headers}}', sink.id='source-1'," + + "@map(type='json', @payload('{{message}}'))) " + + "define stream BarStream (message String, headers String);" + + "" + + "@source(type='http-call-response', sink.id='source-1', " + + "@map(type='json', @attributes(name='name', id='id')))" + + "define stream responseStream(name String, id int);"; + String query = "" + + "@info(name = 'query') " + + "from FooStream " + + "select message,headers " + + "insert into BarStream;"; + + String payload = "{\"name\":\"wso2\", \"id\":\"1234\"}"; + String headers = "'comapny:wso2', country:sl'"; + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(inStreamDefinition + query); + InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream"); + StreamCallback streamCallback = new StreamCallback() { + @Override + public void receive(Event[] events) { + Assert.fail(); + } + }; + + siddhiAppRuntime.addCallback("responseStream", streamCallback); + siddhiAppRuntime.start(); + + fooStream.send(new Object[]{payload, headers}); + SiddhiTestHelper.waitForEvents(1000, 1, eventCount, 1000); + + Assert.assertEquals(eventCount.get(), 0); + siddhiAppRuntime.shutdown(); + } + + @Test(dependsOnMethods = "testHTTPRequestResponse5") + public void testHTTPRequestResponse6() throws Exception { + log.info("Send a POST request with a json body message and receive the response"); + SiddhiManager siddhiManager = new SiddhiManager(); + String inStreamDefinition = "" + + "define stream FooStream (message String,headers String);" + + "" + + "@sink(type='http-call', publisher.url='http://localhost:8005/abc'," + + " method='POST', headers='{{headers}}',sink.id='source-1', on.error='wait', " + + "@map(type='json', @payload('{{message}}'))) " + + "define stream BarStream (message String, headers String);" + + "" + + "@source(type='http-call-response', sink.id='source-1', " + + "@map(type='json', @attributes(name='name', id='id')))" + + "define stream responseStream(name String, id int);"; + String query = "" + + "@info(name = 'query') " + + "from FooStream " + + "select message,headers " + + "insert into BarStream;"; + + String payload = "{\"name\":\"wso2\", \"id\":\"1234\"}"; + String headers = "'comapny:wso2', country:sl'"; + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(inStreamDefinition + query); + InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream"); + StreamCallback streamCallback = new StreamCallback() { + @Override + public void receive(Event[] events) { + for (int i = 0; i < events.length; i++) { + eventCount.incrementAndGet(); + switch (i) { + case 0: + Assert.assertEquals("wso2", (String) events[i].getData()[0]); + Assert.assertEquals(1234, events[i].getData()[1]); + break; + + default: + Assert.fail(); + } + } + } + }; + + siddhiAppRuntime.addCallback("responseStream", streamCallback); + siddhiAppRuntime.start(); + + fooStream.send(new Object[]{payload, headers}); + Thread.sleep(1000); + HttpServerListenerHandler httpServerListenerHandler = new HttpServerListenerHandler(8005); + httpServerListenerHandler.run(); + Thread.sleep(5000); + + SiddhiTestHelper.waitForEvents(1000, 1, eventCount, 1000); + + Assert.assertEquals(eventCount.get(), 1); + siddhiAppRuntime.shutdown(); + httpServerListenerHandler.shutdown(); + } } diff --git a/component/src/test/java/io/siddhi/extension/io/http/sink/HttpRequestResponseTestCase.java b/component/src/test/java/io/siddhi/extension/io/http/sink/HttpRequestResponseTestCase.java index 0184d41f..98470ff7 100644 --- a/component/src/test/java/io/siddhi/extension/io/http/sink/HttpRequestResponseTestCase.java +++ b/component/src/test/java/io/siddhi/extension/io/http/sink/HttpRequestResponseTestCase.java @@ -43,7 +43,6 @@ public class HttpRequestResponseTestCase { private String downloadPath; private String rootPath; - @BeforeClass public void init() { ClassLoader classLoader = getClass().getClassLoader(); @@ -52,7 +51,7 @@ public void init() { } @BeforeMethod - public void initMethod() { + public void reset() { eventCount.set(0); } @@ -114,7 +113,7 @@ public void receive(Event[] events) { httpServerListenerHandler.shutdown(); } - @Test + @Test(dependsOnMethods = "testHTTPRequestResponse1") public void testHTTPRequestResponse2() throws Exception { log.info("Send a POST request with a json body message and receive the response along with attributes in the " + "request"); @@ -175,9 +174,10 @@ public void receive(Event[] events) { httpServerListenerHandler.shutdown(); } - @Test + @Test(dependsOnMethods = "testHTTPRequestResponse2") public void testHTTPRequestResponse3() throws Exception { log.info("Download a file"); + SiddhiManager siddhiManager = new SiddhiManager(); String inStreamDefinition = "" + "define stream FooStream (name String, id int, headers String, downloadPath string);" @@ -203,6 +203,7 @@ public void testHTTPRequestResponse3() throws Exception { String downloadPath = rootPath + File.separator + "downloadedFile.txt"; SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(inStreamDefinition + query); + InputHandler fooStream = siddhiAppRuntime.getInputHandler("FooStream"); StreamCallback streamCallback = new StreamCallback() { @Override @@ -224,22 +225,20 @@ public void receive(Event[] events) { HttpFileServerListenerHandler httpFileServerListenerHandler = new HttpFileServerListenerHandler(8007); httpFileServerListenerHandler.run(); siddhiAppRuntime.start(); - fooStream.send(new Object[]{"wso2", 100, "'country:sri-lanka'", downloadPath}); SiddhiTestHelper.waitForEvents(1000, 1, eventCount, 1000); File file = new File(downloadPath); - Assert.assertTrue(file != null); Assert.assertTrue(file.isFile()); Assert.assertEquals(file.getName(), "downloadedFile.txt"); - + file.delete(); Assert.assertEquals(eventCount.get(), 1); siddhiAppRuntime.shutdown(); httpFileServerListenerHandler.shutdown(); } - @Test + @Test(dependsOnMethods = "testHTTPRequestResponse3") public void testHTTPRequestResponse4() throws Exception { log.info("Try to download a file that not exists."); SiddhiManager siddhiManager = new SiddhiManager(); @@ -307,9 +306,7 @@ public void receive(Event[] events) { SiddhiTestHelper.waitForEvents(1000, 1, eventCount, 1000); File file = new File(downloadPath); - Assert.assertTrue(file != null); - Assert.assertTrue(file.isFile()); - Assert.assertEquals(file.getName(), "downloadedFile.txt"); + Assert.assertFalse(file.isFile()); Assert.assertEquals(eventCount.get(), 1); siddhiAppRuntime.shutdown(); diff --git a/component/src/test/java/io/siddhi/extension/io/http/sink/util/HttpFileServerListenerHandler.java b/component/src/test/java/io/siddhi/extension/io/http/sink/util/HttpFileServerListenerHandler.java index 1867f735..5a621f67 100644 --- a/component/src/test/java/io/siddhi/extension/io/http/sink/util/HttpFileServerListenerHandler.java +++ b/component/src/test/java/io/siddhi/extension/io/http/sink/util/HttpFileServerListenerHandler.java @@ -48,6 +48,7 @@ public void run() { try { server = HttpServer.create(new InetSocketAddress(port), 5); server.createContext("/files", fileServerListener); + logger.info("Start server on port '" + port + "'"); server.start(); } catch (IOException e) { logger.error("Error in creating test server.", e); @@ -56,7 +57,7 @@ public void run() { public void shutdown() { if (server != null) { - logger.info("Shutting down"); + logger.info("Shutting down server on port '" + port + "'"); server.stop(1); } } diff --git a/component/src/test/java/io/siddhi/extension/io/http/sink/util/HttpServerListenerHandler.java b/component/src/test/java/io/siddhi/extension/io/http/sink/util/HttpServerListenerHandler.java index b981d083..53a055ae 100644 --- a/component/src/test/java/io/siddhi/extension/io/http/sink/util/HttpServerListenerHandler.java +++ b/component/src/test/java/io/siddhi/extension/io/http/sink/util/HttpServerListenerHandler.java @@ -43,6 +43,7 @@ public void run() { try { server = HttpServer.create(new InetSocketAddress(port), 5); server.createContext("/abc", serverListener); + logger.info("Start server on port '" + port + "'"); server.start(); } catch (IOException e) { logger.error("Error in creating test server.", e); @@ -51,7 +52,7 @@ public void run() { public void shutdown() { if (server != null) { - logger.info("Shutting down"); + logger.info("Shutting down server on port '" + port + "'"); server.stop(1); } } diff --git a/coverage-report/pom.xml b/coverage-report/pom.xml index 13af5c85..f0e76ccd 100644 --- a/coverage-report/pom.xml +++ b/coverage-report/pom.xml @@ -21,7 +21,7 @@ siddhi-io-http-parent io.siddhi.extension.io.http - 2.1.3-SNAPSHOT + 2.2.0-SNAPSHOT ../pom.xml 4.0.0 diff --git a/pom.xml b/pom.xml index 6c4bd697..2de59f77 100644 --- a/pom.xml +++ b/pom.xml @@ -36,7 +36,7 @@ io.siddhi.extension.io.http siddhi-io-http-parent - 2.1.3-SNAPSHOT + 2.2.0-SNAPSHOT Siddhi IO Extension - HTTP IO Aggregator http://wso2.org diff --git a/tests/distribution/pom.xml b/tests/distribution/pom.xml index 81487a70..feec950e 100644 --- a/tests/distribution/pom.xml +++ b/tests/distribution/pom.xml @@ -18,7 +18,7 @@ siddhi-io-http-tests io.siddhi.extension.io.http - 2.1.3-SNAPSHOT + 2.2.0-SNAPSHOT ../pom.xml diff --git a/tests/osgi-tests/pom.xml b/tests/osgi-tests/pom.xml index ddd52c5a..2fd83434 100644 --- a/tests/osgi-tests/pom.xml +++ b/tests/osgi-tests/pom.xml @@ -19,7 +19,7 @@ siddhi-io-http-tests io.siddhi.extension.io.http - 2.1.3-SNAPSHOT + 2.2.0-SNAPSHOT ../pom.xml diff --git a/tests/pom.xml b/tests/pom.xml index f283a1a4..5e47f5db 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -18,7 +18,7 @@ siddhi-io-http-parent io.siddhi.extension.io.http - 2.1.3-SNAPSHOT + 2.2.0-SNAPSHOT ../pom.xml