diff --git a/modules/siddhi-core/pom.xml b/modules/siddhi-core/pom.xml
index b467fdf915..1ec315ca1e 100644
--- a/modules/siddhi-core/pom.xml
+++ b/modules/siddhi-core/pom.xml
@@ -44,11 +44,11 @@
siddhi-annotations
- org.apache.log4j.wso2
+ log4j
log4j
- org.wso2.orbit.com.lmax
+ com.lmax
disruptor
@@ -77,14 +77,6 @@
com.google.code.gson
gson
-
- org.eclipse.osgi
- org.eclipse.osgi.services
-
-
- org.osgi
- org.osgi.core
-
org.atteo.classindex
classindex
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiAppContext.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiAppContext.java
index f1976f4952..4a38270195 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiAppContext.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/config/SiddhiAppContext.java
@@ -38,6 +38,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
/**
* Holder object for context information of {@link org.wso2.siddhi.query.api.SiddhiApp}.
@@ -50,6 +51,7 @@ public class SiddhiAppContext {
private boolean enforceOrder;
private StatisticsManager statisticsManager = null;
+ private ThreadFactory executorThreadFactory;
private ExecutorService executorService;
private ScheduledExecutorService scheduledExecutorService;
private List eternalReferencedHolders;
@@ -139,6 +141,14 @@ public void setThreadBarrier(ThreadBarrier threadBarrier) {
this.threadBarrier = threadBarrier;
}
+ public ThreadFactory getExecutorThreadFactory() {
+ return this.executorThreadFactory;
+ }
+
+ public void setExecutorThreadFactory(ThreadFactory threadFactory) {
+ this.executorThreadFactory = threadFactory;
+ }
+
public ExecutorService getExecutorService() {
return executorService;
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionRuntime.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionRuntime.java
index 75515d5ddc..447ca48aea 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionRuntime.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionRuntime.java
@@ -135,7 +135,7 @@ public QueryRuntime addQuery(QueryRuntime metaQueryRuntime) {
if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(streamDefinition,
- siddhiAppContext.getExecutorService(),
+ siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(),
null, siddhiAppContext);
localStreamJunctionMap.putIfAbsent(id, outputStreamJunction);
@@ -148,7 +148,7 @@ public QueryRuntime addQuery(QueryRuntime metaQueryRuntime) {
if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(streamDefinition,
- siddhiAppContext.getExecutorService(),
+ siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(),
null, siddhiAppContext);
streamJunctionMap.putIfAbsent(id, outputStreamJunction);
@@ -166,7 +166,7 @@ public QueryRuntime addQuery(QueryRuntime metaQueryRuntime) {
if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(streamDefinition,
- siddhiAppContext.getExecutorService(),
+ siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(),
null, siddhiAppContext);
streamJunctionMap.putIfAbsent(id, outputStreamJunction);
@@ -291,7 +291,7 @@ private synchronized void clonePartition(String key) {
StreamJunction streamJunction = localStreamJunctionMap.get(streamId + key);
if (streamJunction == null) {
streamJunction = new StreamJunction(streamDefinition, siddhiAppContext
- .getExecutorService(),
+ .getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(),
null, siddhiAppContext);
localStreamJunctionMap.put(streamId + key, streamJunction);
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionStreamReceiver.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionStreamReceiver.java
index 586aff5f31..6ea43065ab 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionStreamReceiver.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/partition/PartitionStreamReceiver.java
@@ -310,7 +310,7 @@ public void addStreamJunction(String key, List queryRuntimeList) {
}
private StreamJunction createStreamJunction() {
- return new StreamJunction(streamDefinition, siddhiAppContext.getExecutorService(),
+ return new StreamJunction(streamDefinition, siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(), null, siddhiAppContext);
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/stream/StreamJunction.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/stream/StreamJunction.java
index 494a19a163..693f720835 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/stream/StreamJunction.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/stream/StreamJunction.java
@@ -52,7 +52,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
/**
* Stream Junction is the place where streams are collected and distributed. There will be an Stream Junction per
@@ -69,7 +69,7 @@ public class StreamJunction implements EventBufferHolder {
private int bufferSize;
private List receivers = new CopyOnWriteArrayList();
private List publishers = Collections.synchronizedList(new LinkedList<>());
- private ExecutorService executorService;
+ private ThreadFactory threadFactory;
private boolean async = false;
private Disruptor disruptor;
private RingBuffer ringBuffer;
@@ -80,12 +80,12 @@ public class StreamJunction implements EventBufferHolder {
private OnErrorAction onErrorAction = OnErrorAction.LOG;
private ExceptionListener exceptionListener;
- public StreamJunction(StreamDefinition streamDefinition, ExecutorService executorService, int bufferSize,
+ public StreamJunction(StreamDefinition streamDefinition, ThreadFactory threadFactory, int bufferSize,
StreamJunction faultStreamJunction, SiddhiAppContext siddhiAppContext) {
this.streamDefinition = streamDefinition;
this.bufferSize = bufferSize;
this.batchSize = bufferSize;
- this.executorService = executorService;
+ this.threadFactory = threadFactory;
this.siddhiAppContext = siddhiAppContext;
if (siddhiAppContext.getStatisticsManager() != null) {
this.throughputTracker = QueryParserHelper.createThroughputTracker(siddhiAppContext,
@@ -285,7 +285,7 @@ public synchronized void startProcessing() {
ProducerType producerType = ProducerType.MULTI;
disruptor = new Disruptor(
new EventExchangeHolderFactory(streamDefinition.getAttributeList().size()),
- bufferSize, executorService, producerType,
+ bufferSize, threadFactory, producerType,
new BlockingWaitStrategy());
disruptor.handleExceptionsWith(siddhiAppContext.getDisruptorExceptionHandler());
break;
@@ -294,7 +294,7 @@ public synchronized void startProcessing() {
if (disruptor == null) {
disruptor = new Disruptor(
new EventExchangeHolderFactory(streamDefinition.getAttributeList().size()),
- bufferSize, executorService);
+ bufferSize, threadFactory);
disruptor.handleExceptionsWith(siddhiAppContext.getDisruptorExceptionHandler());
}
if (workers > 0) {
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/ReferenceHolder.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/ReferenceHolder.java
deleted file mode 100644
index 0b62dbd031..0000000000
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/ReferenceHolder.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * WSO2 Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.wso2.siddhi.core.util;
-
-import org.osgi.framework.BundleContext;
-
-/**
- * Class to hold references need by siddhi bundle.
- */
-public class ReferenceHolder {
- private BundleContext bundleContext; //bundle context for siddhi bundle.
- private static final ReferenceHolder instance = new ReferenceHolder();
-
- private ReferenceHolder() {
- //empty constructor to facilitate singleton.
- }
-
- public static ReferenceHolder getInstance() {
- return instance;
- }
-
- public BundleContext getBundleContext() {
- return bundleContext;
- }
-
- public void setBundleContext(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
- }
-}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiAppRuntimeBuilder.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiAppRuntimeBuilder.java
index 9fc47d4de3..9f5e2dc152 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiAppRuntimeBuilder.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiAppRuntimeBuilder.java
@@ -201,7 +201,7 @@ public String addQuery(QueryRuntime queryRuntime) {
if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(streamDefinition,
- siddhiAppContext.getExecutorService(),
+ siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(), null, siddhiAppContext);
streamJunctionMap.putIfAbsent(streamDefinition.getId(), outputStreamJunction);
}
@@ -217,7 +217,7 @@ public String addQuery(QueryRuntime queryRuntime) {
if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(streamDefinition,
- siddhiAppContext.getExecutorService(),
+ siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(), null, siddhiAppContext);
streamJunctionMap.putIfAbsent(streamDefinition.getId(), outputStreamJunction);
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiExtensionLoader.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiExtensionLoader.java
index 1c02262243..208315fd3f 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiExtensionLoader.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiExtensionLoader.java
@@ -20,11 +20,6 @@
import org.apache.log4j.Logger;
import org.atteo.classindex.ClassIndex;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.BundleEvent;
-import org.osgi.framework.BundleListener;
-import org.osgi.framework.wiring.BundleWiring;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.core.executor.incremental.IncrementalAggregateBaseTimeFunctionExecutor;
import org.wso2.siddhi.core.executor.incremental.IncrementalShouldUpdateFunctionExecutor;
@@ -32,7 +27,6 @@
import org.wso2.siddhi.core.executor.incremental.IncrementalTimeGetTimeZone;
import org.wso2.siddhi.core.executor.incremental.IncrementalUnixTimeFunctionExecutor;
-import java.util.HashMap;
import java.util.Map;
/**
@@ -49,22 +43,6 @@ public class SiddhiExtensionLoader {
*/
public static void loadSiddhiExtensions(Map siddhiExtensionsMap) {
loadLocalExtensions(siddhiExtensionsMap);
- BundleContext bundleContext = ReferenceHolder.getInstance().getBundleContext();
- if (bundleContext != null) {
- loadExtensionOSGI(bundleContext, siddhiExtensionsMap);
- }
- }
-
- /**
- * Load Extensions in OSGi environment.
- *
- * @param bundleContext OSGi bundleContext
- * @param siddhiExtensionsMap reference map for the Siddhi extension
- */
- private static void loadExtensionOSGI(BundleContext bundleContext, Map siddhiExtensionsMap) {
- ExtensionBundleListener extensionBundleListener = new ExtensionBundleListener(siddhiExtensionsMap);
- bundleContext.addBundleListener(extensionBundleListener);
- extensionBundleListener.loadAllExtensions(bundleContext);
}
/**
@@ -151,52 +129,4 @@ private static void addExtensionToMap(String fqExtensionName, Class extensionCla
"loaded with the same namespace and name '" + fqExtensionName + "'");
}
}
-
- /**
- * Class to listen to Bundle changes to update available extensions.
- */
- private static class ExtensionBundleListener implements BundleListener {
-
- private Map bundleExtensions = new HashMap();
- private Map siddhiExtensionsMap;
-
- ExtensionBundleListener(Map siddhiExtensionsMap) {
- this.siddhiExtensionsMap = siddhiExtensionsMap;
- }
-
- @Override
- public void bundleChanged(BundleEvent bundleEvent) {
- if (bundleEvent.getType() == BundleEvent.STARTED) {
- addExtensions(bundleEvent.getBundle());
- } else {
- removeExtensions(bundleEvent.getBundle());
- }
- }
-
- private void addExtensions(Bundle bundle) {
- ClassLoader classLoader = bundle.adapt(BundleWiring.class).getClassLoader();
- Iterable> extensions = ClassIndex.getAnnotated(Extension.class, classLoader);
- for (Class extension : extensions) {
- addExtensionToMap(extension, siddhiExtensionsMap);
- bundleExtensions.put(extension, (int) bundle.getBundleId());
- }
- }
-
- private void removeExtensions(Bundle bundle) {
- bundleExtensions.entrySet().stream().filter(entry -> entry.getValue() ==
- bundle.getBundleId()).forEachOrdered(entry -> {
- siddhiExtensionsMap.remove(entry.getKey());
- });
- bundleExtensions.entrySet().removeIf(entry -> entry.getValue() ==
- bundle.getBundleId());
- }
-
- void loadAllExtensions(BundleContext bundleContext) {
- for (Bundle b : bundleContext.getBundles()) {
- if (b.getState() == Bundle.ACTIVE) {
- addExtensions(b);
- }
- }
- }
- }
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiManagerComponent.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiManagerComponent.java
deleted file mode 100644
index dee5756456..0000000000
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/SiddhiManagerComponent.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * WSO2 Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.wso2.siddhi.core.util;
-
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-
-import java.util.HashMap;
-
-/**
- * Siddhi Manager Service which is
- *
- * @since 4.0.0-M3-SNAPSHOT
- */
-@Component(
- immediate = true
-)
-public class SiddhiManagerComponent {
- private ServiceRegistration serviceRegistration;
-
- /**
- * This is the activation method of SiddhiManagerService. This will be initilize the Siddhi Manager and register the
- * ManagerService.
- *
- * @param bundleContext the bundle context instance of this bundle.
- * @throws Exception this will be thrown if an issue occurs while executing the activate method
- */
- @Activate
- protected void start(BundleContext bundleContext) throws Exception {
- ReferenceHolder.getInstance().setBundleContext(bundleContext);
- SiddhiExtensionLoader.loadSiddhiExtensions(new HashMap<>());
- serviceRegistration = bundleContext.registerService(SiddhiComponentActivator.class.getName(),
- new SiddhiComponentActivator(), null);
- }
-
- protected void stop() throws Exception {
- ReferenceHolder.getInstance().setBundleContext(null);
- serviceRegistration.unregister();
- }
-}
-
-
-
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/OutputParser.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/OutputParser.java
index b581a73382..efaaa2b678 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/OutputParser.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/OutputParser.java
@@ -250,7 +250,7 @@ public static OutputCallback constructOutputCallback(OutputStream outStream, Str
StreamJunction outputStreamJunction = streamJunctionMap.get(id + key);
if (outputStreamJunction == null) {
outputStreamJunction = new StreamJunction(outputStreamDefinition,
- siddhiAppContext.getExecutorService(),
+ siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(), null, siddhiAppContext);
streamJunctionMap.putIfAbsent(id + key, outputStreamJunction);
}
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/SiddhiAppParser.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/SiddhiAppParser.java
index 47ea4f0192..86031254b5 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/SiddhiAppParser.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/SiddhiAppParser.java
@@ -59,6 +59,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import static org.wso2.siddhi.core.util.parser.helper.AnnotationHelper.generateIncludedMetrics;
@@ -156,9 +157,12 @@ public static SiddhiAppRuntimeBuilder parse(SiddhiApp siddhiApp, String siddhiAp
siddhiAppContext.setThreadBarrier(new ThreadBarrier());
- siddhiAppContext.setExecutorService(Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("Siddhi-" + siddhiAppContext.getName() +
- "-executor-thread-%d").build()));
+ final ThreadFactory executorThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("Siddhi-" + siddhiAppContext.getName() + "-executor-thread-%d")
+ .build();
+
+ siddhiAppContext.setExecutorThreadFactory(executorThreadFactory);
+ siddhiAppContext.setExecutorService(Executors.newCachedThreadPool(executorThreadFactory));
siddhiAppContext.setScheduledExecutorService(Executors.newScheduledThreadPool(5,
new ThreadFactoryBuilder().setNameFormat("Siddhi-" +
diff --git a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/helper/DefinitionParserHelper.java b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/helper/DefinitionParserHelper.java
index 3b9caef55d..8a85c6d8bc 100644
--- a/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/helper/DefinitionParserHelper.java
+++ b/modules/siddhi-core/src/main/java/org/wso2/siddhi/core/util/parser/helper/DefinitionParserHelper.java
@@ -140,7 +140,7 @@ public static void addStreamJunction(StreamDefinition streamDefinition,
StreamJunction faultStreamJunction = streamJunctionMap.get(SiddhiConstants.FAULT_STREAM_PREFIX.
concat(streamDefinition.getId()));
StreamJunction streamJunction = new StreamJunction(streamDefinition,
- siddhiAppContext.getExecutorService(),
+ siddhiAppContext.getExecutorThreadFactory(),
siddhiAppContext.getBufferSize(), faultStreamJunction, siddhiAppContext);
streamJunctionMap.putIfAbsent(streamDefinition.getId(), streamJunction);
}
diff --git a/modules/siddhi-core/src/test/java/org/wso2/siddhi/core/stream/JunctionTestCase.java b/modules/siddhi-core/src/test/java/org/wso2/siddhi/core/stream/JunctionTestCase.java
index d880f4cca4..77237dfd2d 100644
--- a/modules/siddhi-core/src/test/java/org/wso2/siddhi/core/stream/JunctionTestCase.java
+++ b/modules/siddhi-core/src/test/java/org/wso2/siddhi/core/stream/JunctionTestCase.java
@@ -31,21 +31,21 @@
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
public class JunctionTestCase {
private static final Logger log = Logger.getLogger(JunctionTestCase.class);
private int count;
private boolean eventArrived;
- private ExecutorService executorService;
+ private ThreadFactory threadFactory;
private SiddhiAppContext siddhiAppContext;
@BeforeMethod
public void init() {
count = 0;
eventArrived = false;
- executorService = Executors.newCachedThreadPool();
+ threadFactory = Executors.defaultThreadFactory();
SiddhiContext siddhiContext = new SiddhiContext();
siddhiAppContext = new SiddhiAppContext();
siddhiAppContext.setSiddhiContext(siddhiContext);
@@ -59,7 +59,7 @@ public void junctionToReceiverTest() throws InterruptedException {
StreamDefinition streamA = StreamDefinition.id("streamA").attribute("symbol", Attribute.Type.STRING)
.attribute("price", Attribute.Type.INT).
annotation(Annotation.annotation("parallel"));
- StreamJunction streamJunctionA = new StreamJunction(streamA, executorService, 1024,
+ StreamJunction streamJunctionA = new StreamJunction(streamA, threadFactory, 1024,
null, siddhiAppContext);
StreamJunction.Publisher streamPublisherA = streamJunctionA.constructPublisher();
@@ -91,7 +91,7 @@ public void oneToOneTest() throws InterruptedException {
.attribute("price", Attribute.Type.INT).
annotation(Annotation.annotation("parallel"));
- StreamJunction streamJunctionA = new StreamJunction(streamA, executorService, 1024,
+ StreamJunction streamJunctionA = new StreamJunction(streamA, threadFactory, 1024,
null, siddhiAppContext);
StreamJunction.Publisher streamPublisherA = streamJunctionA.constructPublisher();
@@ -99,7 +99,7 @@ public void oneToOneTest() throws InterruptedException {
.attribute("price", Attribute.Type.INT).
annotation(Annotation.annotation("parallel"));
- StreamJunction streamJunctionB = new StreamJunction(streamB, executorService, 1024,
+ StreamJunction streamJunctionB = new StreamJunction(streamB, threadFactory, 1024,
null, siddhiAppContext);
final StreamJunction.Publisher streamPublisherB = streamJunctionB.constructPublisher();
@@ -162,14 +162,14 @@ public void multiThreadedTest1() throws InterruptedException {
StreamDefinition streamA = StreamDefinition.id("streamA").attribute("symbol", Attribute.Type.STRING)
.attribute("price", Attribute.Type.INT).
annotation(Annotation.annotation("async"));
- StreamJunction streamJunctionA = new StreamJunction(streamA, executorService, 1024,
+ StreamJunction streamJunctionA = new StreamJunction(streamA, threadFactory, 1024,
null, siddhiAppContext);
StreamJunction.Publisher streamPublisherA = streamJunctionA.constructPublisher();
StreamDefinition streamB = StreamDefinition.id("streamB").attribute("symbol", Attribute.Type.STRING)
.attribute("price", Attribute.Type.INT).
annotation(Annotation.annotation("async"));
- StreamJunction streamJunctionB = new StreamJunction(streamB, executorService, 1024,
+ StreamJunction streamJunctionB = new StreamJunction(streamB, threadFactory, 1024,
null, siddhiAppContext);
final StreamJunction.Publisher streamPublisherB1 = streamJunctionB.constructPublisher();
final StreamJunction.Publisher streamPublisherB2 = streamJunctionB.constructPublisher();
@@ -259,14 +259,14 @@ public void multiThreadedTest2() throws InterruptedException {
StreamDefinition streamA = StreamDefinition.id("streamA").attribute("symbol", Attribute.Type.STRING)
.attribute("price", Attribute.Type.INT).
annotation(Annotation.annotation("async"));
- StreamJunction streamJunctionA = new StreamJunction(streamA, executorService, 1024,
+ StreamJunction streamJunctionA = new StreamJunction(streamA, threadFactory, 1024,
null, siddhiAppContext);
StreamJunction.Publisher streamPublisherA = streamJunctionA.constructPublisher();
StreamDefinition streamB = StreamDefinition.id("streamB").attribute("symbol", Attribute.Type.STRING)
.attribute("price", Attribute.Type.INT).
annotation(Annotation.annotation("async"));
- StreamJunction streamJunctionB = new StreamJunction(streamB, executorService, 1024,
+ StreamJunction streamJunctionB = new StreamJunction(streamB, threadFactory, 1024,
null, siddhiAppContext);
final StreamJunction.Publisher streamPublisherB1 = streamJunctionB.constructPublisher();
final StreamJunction.Publisher streamPublisherB2 = streamJunctionB.constructPublisher();
@@ -275,7 +275,7 @@ public void multiThreadedTest2() throws InterruptedException {
StreamDefinition streamC = StreamDefinition.id("streamC").attribute("symbol", Attribute.Type.STRING)
.attribute("price", Attribute.Type.INT).
annotation(Annotation.annotation("async"));
- StreamJunction streamJunctionC = new StreamJunction(streamC, executorService, 1024,
+ StreamJunction streamJunctionC = new StreamJunction(streamC, threadFactory, 1024,
null, siddhiAppContext);
final StreamJunction.Publisher streamPublisherC1 = streamJunctionC.constructPublisher();
final StreamJunction.Publisher streamPublisherC2 = streamJunctionC.constructPublisher();
@@ -440,14 +440,14 @@ public void multiThreadedWithEventPoolTest() throws InterruptedException {
StreamDefinition streamA = StreamDefinition.id("streamA").attribute("symbol", Attribute.Type.STRING)
.attribute("price", Attribute.Type.INT).
annotation(Annotation.annotation("async"));
- StreamJunction streamJunctionA = new StreamJunction(streamA, executorService, 1024,
+ StreamJunction streamJunctionA = new StreamJunction(streamA, threadFactory, 1024,
null, siddhiAppContext);
StreamJunction.Publisher streamPublisherA = streamJunctionA.constructPublisher();
StreamDefinition streamB = StreamDefinition.id("streamB").attribute("symbol", Attribute.Type.STRING)
.attribute("price", Attribute.Type.INT).
annotation(Annotation.annotation("async"));
- StreamJunction streamJunctionB = new StreamJunction(streamB, executorService, 1024,
+ StreamJunction streamJunctionB = new StreamJunction(streamB, threadFactory, 1024,
null, siddhiAppContext);
final StreamJunction.Publisher streamPublisherB1 = streamJunctionB.constructPublisher();
final StreamJunction.Publisher streamPublisherB2 = streamJunctionB.constructPublisher();
@@ -456,7 +456,7 @@ public void multiThreadedWithEventPoolTest() throws InterruptedException {
StreamDefinition streamC = StreamDefinition.id("streamC").attribute("symbol", Attribute.Type.STRING)
.attribute("price", Attribute.Type.INT).
annotation(Annotation.annotation("async"));
- StreamJunction streamJunctionC = new StreamJunction(streamC, executorService, 1024,
+ StreamJunction streamJunctionC = new StreamJunction(streamC, threadFactory, 1024,
null, siddhiAppContext);
final StreamJunction.Publisher streamPublisherC1 = streamJunctionC.constructPublisher();
final StreamJunction.Publisher streamPublisherC2 = streamJunctionC.constructPublisher();
diff --git a/pom.xml b/pom.xml
index 3e8cfe9ace..c2501d306f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
- org.apache.log4j.wso2
+ log4j
log4j
${log4j.version}
@@ -81,12 +81,6 @@
${testng.version}
test
-
- org.webjars
- jquery
- 3.7.1
- test
-
junit
junit
@@ -99,7 +93,7 @@
${antlr.runtime.version}
- org.wso2.orbit.com.lmax
+ com.lmax
disruptor
${disruptor.version}
@@ -131,18 +125,6 @@
${metrics.version}
-
-
- org.osgi
- org.osgi.core
- ${org.osgi.core.version}
-
-
- org.eclipse.osgi
- org.eclipse.osgi.services
- ${version.equinox.osgi.services}
-
-
org.graylog.repackaged.siddhi
@@ -378,14 +360,12 @@
7.8.0
4.13.2
- 1.2.17.wso2v1
+ 1.2.17
4.13.1
- 3.4.2.wso2v1
+ 4.0.0
32.1.3-jre
2.10.1
3.13
- 6.0.0
- 3.3.100.v20130513-1956
4.2.22
@@ -396,60 +376,6 @@
v4.5.11
-
-
- wso2-nexus
- WSO2 internal Repository
- https://maven.wso2.org/nexus/content/groups/wso2-public/
-
- true
- daily
- ignore
-
-
-
-
- wso2.releases
- WSO2 internal Repository
- https://maven.wso2.org/nexus/content/repositories/releases/
-
- true
- daily
- ignore
-
-
-
-
- wso2.snapshots
- Apache Snapshot Repository
- https://maven.wso2.org/nexus/content/repositories/snapshots/
-
- true
- daily
-
-
- false
-
-
-
-
- central
- Maven Repository Switchboard
- default
- https://repo1.maven.org/maven2
-
- false
-
-
-
-
-
- sonatype.releases
- https://oss.sonatype.org/content/repositories/releases/
-
-
-
-
sonatype-nexus-snapshots
@@ -463,43 +389,4 @@
-
-
-
- wso2.releases
- WSO2 internal Repository
- https://maven.wso2.org/nexus/content/repositories/releases/
-
- true
- daily
- ignore
-
-
-
-
- wso2.snapshots
- Apache Snapshot Repository
- https://maven.wso2.org/nexus/content/repositories/snapshots/
-
- true
- daily
-
-
- false
-
-
-
-
- wso2-nexus
- WSO2 internal Repository
- https://maven.wso2.org/nexus/content/groups/wso2-public/
-
- true
- daily
- ignore
-
-
-
-
-