Skip to content

Commit

Permalink
Merge pull request #18 from hchepey-clari/MaxPollCountConfigurable
Browse files Browse the repository at this point in the history
Make MaxPollCount Configurable.
  • Loading branch information
v1r3n authored Aug 15, 2024
2 parents 9665771 + bbd168b commit 76a829b
Show file tree
Hide file tree
Showing 14 changed files with 133 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.dao.QueueDAO;

import io.orkes.conductor.mq.redis.QueueMonitorProperties;
import io.orkes.conductor.queue.dao.ClusteredRedisQueueDAO;
import io.orkes.conductor.queue.dao.RedisQueueDAO;

Expand All @@ -50,9 +51,11 @@ public QueueDAO getQueueDAOStandalone(
JedisPool jedisPool,
MeterRegistry registry,
QueueRedisProperties queueRedisProperties,
ConductorProperties properties) {
ConductorProperties properties,
QueueMonitorProperties queueMonitorProperties) {
log.info("getQueueDAOStandalone init");
return new RedisQueueDAO(registry, jedisPool, queueRedisProperties, properties);
return new RedisQueueDAO(
registry, jedisPool, queueRedisProperties, properties, queueMonitorProperties);
}

@Bean
Expand All @@ -62,8 +65,14 @@ public QueueDAO getQueueDAOSentinel(
JedisSentinelPool jedisSentinelPool,
MeterRegistry registry,
QueueRedisProperties queueRedisProperties,
ConductorProperties properties) {
return new RedisQueueDAO(registry, jedisSentinelPool, queueRedisProperties, properties);
ConductorProperties properties,
QueueMonitorProperties queueMonitorProperties) {
return new RedisQueueDAO(
registry,
jedisSentinelPool,
queueRedisProperties,
properties,
queueMonitorProperties);
}

@Bean
Expand All @@ -73,8 +82,10 @@ public QueueDAO getQueueDAOCluster(
JedisCluster jedisCluster,
MeterRegistry registry,
QueueRedisProperties queueRedisProperties,
ConductorProperties properties) {
return new ClusteredRedisQueueDAO(registry, jedisCluster, queueRedisProperties, properties);
ConductorProperties properties,
QueueMonitorProperties queueMonitorProperties) {
return new ClusteredRedisQueueDAO(
registry, jedisCluster, queueRedisProperties, properties, queueMonitorProperties);
}

@Bean
Expand All @@ -90,7 +101,7 @@ protected JedisPool getJedisPoolStandalone(QueueRedisProperties redisProperties)
redisProperties.isSsl());
Host host = hostSupplier.getHosts().get(0);

if (redisProperties.getUsername() != null && host.getPassword() != null) {
if (redisProperties.getUsername() != null && host.getPassword() != null) {
log.info("Connecting to Redis Standalone with ACL user AUTH");
return new JedisPool(
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.netflix.conductor.dao.QueueDAO;

import io.orkes.conductor.mq.ConductorQueue;
import io.orkes.conductor.mq.redis.QueueMonitorProperties;
import io.orkes.conductor.mq.redis.cluster.ConductorRedisClusterQueue;
import io.orkes.conductor.queue.config.QueueRedisProperties;

Expand All @@ -30,21 +31,26 @@ public class ClusteredRedisQueueDAO extends BaseRedisQueueDAO implements QueueDA

private final MeterRegistry registry;

private final QueueMonitorProperties queueMonitorProperties;

public ClusteredRedisQueueDAO(
MeterRegistry registry,
JedisCluster jedisCluster,
QueueRedisProperties queueRedisProperties,
ConductorProperties conductorProperties) {
ConductorProperties conductorProperties,
QueueMonitorProperties queueMonitorProperties) {

super(queueRedisProperties, conductorProperties);
this.registry = registry;
this.jedisCluster = jedisCluster;
this.queueMonitorProperties = queueMonitorProperties;
log.info("Queues initialized using {}", ClusteredRedisQueueDAO.class.getName());
}

@Override
protected ConductorQueue getConductorQueue(String queueKey) {
ConductorRedisClusterQueue queue = new ConductorRedisClusterQueue(queueKey, jedisCluster);
ConductorRedisClusterQueue queue =
new ConductorRedisClusterQueue(queueKey, jedisCluster, queueMonitorProperties);
return queue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.netflix.conductor.dao.QueueDAO;

import io.orkes.conductor.mq.ConductorQueue;
import io.orkes.conductor.mq.redis.QueueMonitorProperties;
import io.orkes.conductor.mq.redis.single.ConductorRedisQueue;
import io.orkes.conductor.queue.config.QueueRedisProperties;

Expand All @@ -30,20 +31,24 @@ public class RedisQueueDAO extends BaseRedisQueueDAO implements QueueDAO {

private final MeterRegistry registry;

private final QueueMonitorProperties queueMonitorProperties;

public RedisQueueDAO(
MeterRegistry registry,
JedisPoolAbstract jedisPool,
QueueRedisProperties queueRedisProperties,
ConductorProperties conductorProperties) {
ConductorProperties conductorProperties,
QueueMonitorProperties queueMonitorProperties) {

super(queueRedisProperties, conductorProperties);
this.registry = registry;
this.jedisPool = jedisPool;
this.queueMonitorProperties = queueMonitorProperties;
log.info("Queues initialized using {}", RedisQueueDAO.class.getName());
}

@Override
protected ConductorQueue getConductorQueue(String queueKey) {
return new ConductorRedisQueue(queueKey, jedisPool);
return new ConductorRedisQueue(queueKey, jedisPool, queueMonitorProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.netflix.conductor.core.config.ConductorProperties;

import io.orkes.conductor.mq.redis.QueueMonitorProperties;
import io.orkes.conductor.queue.config.QueueRedisProperties;

import com.google.common.util.concurrent.Uninterruptibles;
Expand Down Expand Up @@ -60,8 +61,13 @@ public static void setUp() {
JedisCluster jedisCluster = new JedisCluster(hostAndPorts);
ConductorProperties properties = new ConductorProperties();
QueueRedisProperties queueRedisProperties = new QueueRedisProperties(properties);
QueueMonitorProperties queueMonitorProperties = new QueueMonitorProperties();
redisQueue =
new ClusteredRedisQueueDAO(
registry, jedisCluster, queueRedisProperties, properties);
registry,
jedisCluster,
queueRedisProperties,
properties,
queueMonitorProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.netflix.conductor.core.config.ConductorProperties;

import io.orkes.conductor.mq.redis.QueueMonitorProperties;
import io.orkes.conductor.queue.config.QueueRedisProperties;

import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
Expand Down Expand Up @@ -50,8 +51,13 @@ public static void setUp() {
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
ConductorProperties conductorProperties = new ConductorProperties();
QueueRedisProperties queueRedisProperties = new QueueRedisProperties(conductorProperties);
QueueMonitorProperties queueMonitorProperties = new QueueMonitorProperties();
redisQueue =
new RedisQueueDAO(
meterRegistry, jedisPool, queueRedisProperties, conductorProperties);
meterRegistry,
jedisPool,
queueRedisProperties,
conductorProperties,
queueMonitorProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.queue.Message;

import io.orkes.conductor.mq.redis.QueueMonitorProperties;
import io.orkes.conductor.queue.config.QueueRedisProperties;
import io.orkes.conductor.queue.dao.BaseRedisQueueDAO;
import io.orkes.conductor.queue.dao.ClusteredRedisQueueDAO;
Expand Down Expand Up @@ -146,9 +147,14 @@ private static void benchmarkRedisCluster() {
JedisCluster jedisCluster = new JedisCluster(hostAndPorts);
ConductorProperties properties = new ConductorProperties();
QueueRedisProperties queueRedisProperties = new QueueRedisProperties(properties);
QueueMonitorProperties queueMonitorProperties = new QueueMonitorProperties();
ClusteredRedisQueueDAO clusteredRedisQueue =
new ClusteredRedisQueueDAO(
registry, jedisCluster, queueRedisProperties, properties);
registry,
jedisCluster,
queueRedisProperties,
properties,
queueMonitorProperties);

RedisQueueDAOBenchmark benchmark = new RedisQueueDAOBenchmark(clusteredRedisQueue);
benchmark.runBenchmark();
Expand All @@ -166,14 +172,16 @@ private static void benchmarkRedisStandalone() {
config.setMaxTotal(20);
JedisPool jedisPool = new JedisPool(config, "localhost", 6379);
ConductorProperties properties = new ConductorProperties();
QueueMonitorProperties queueMonitorProperties = new QueueMonitorProperties();

RedisQueueDAOBenchmark benchmark =
new RedisQueueDAOBenchmark(
new RedisQueueDAO(
registry,
jedisPool,
new QueueRedisProperties(properties),
properties));
properties,
queueMonitorProperties));

benchmark.runBenchmark();

Expand Down
1 change: 1 addition & 0 deletions orkes-redis-queues/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
dependencies {

implementation "redis.clients:jedis:${versions.revJedis}"
implementation 'org.springframework.boot:spring-boot-starter'

//Guava
implementation "com.google.guava:guava:${versions.revGuava}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,21 @@ public abstract class QueueMonitor {

private int maxPollCount = 100;

public void setMaxPollCount(int maxPollCount) {
this.maxPollCount = maxPollCount;
}

public int getMaxPollCount() {
return maxPollCount;
}

public QueueMonitor(String queueName) {
this.queueName = queueName;
this.clock = Clock.systemDefaultZone();
this.peekedMessages = new LinkedBlockingQueue<>();
this.executorService =
new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(maxPollCount));
1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(getMaxPollCount()));
}

public List<QueueMessage> pop(int count, int waitTime, TimeUnit timeUnit) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Orkes, Inc.
* <p>
* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.orkes.conductor.mq.redis;

import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@ConfigurationProperties("conductor.queue")
@Component
@AutoConfiguration
@EnableAutoConfiguration
public class QueueMonitorProperties {
private int maxPollCount = 100;

public int getMaxPollCount() {
return maxPollCount;
}

public void setMaxPollCount(int maxPollCount) {
this.maxPollCount = maxPollCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;

import io.orkes.conductor.mq.redis.QueueMonitor;
import io.orkes.conductor.mq.redis.QueueMonitorProperties;

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.JedisCluster;
Expand All @@ -32,11 +33,15 @@ public class ClusteredQueueMonitor extends QueueMonitor {

private final String queueName;

public ClusteredQueueMonitor(JedisCluster jedisCluster, String queueName) {
public ClusteredQueueMonitor(
JedisCluster jedisCluster,
String queueName,
QueueMonitorProperties queueMonitorProperties) {
super(queueName);
this.queueName = queueName;
this.jedisCluster = jedisCluster;
this.scriptSha = loadScript();
this.setMaxPollCount(queueMonitorProperties.getMaxPollCount());
}

private String loadScript() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import io.orkes.conductor.mq.ConductorQueue;
import io.orkes.conductor.mq.QueueMessage;
import io.orkes.conductor.mq.redis.QueueMonitorProperties;

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.JedisCluster;
Expand All @@ -45,12 +46,16 @@ public class ConductorRedisClusterQueue implements ConductorQueue {

private final ClusteredQueueMonitor queueMonitor;

public ConductorRedisClusterQueue(String queueName, JedisCluster jedisCluster) {
public ConductorRedisClusterQueue(
String queueName,
JedisCluster jedisCluster,
QueueMonitorProperties queueMonitorProperties) {
this.jedis = jedisCluster;
this.clock = Clock.systemDefaultZone();
this.queueName = queueName;
this.payloadKey = queueName + "_payload";
this.queueMonitor = new ClusteredQueueMonitor(jedisCluster, queueName);
this.queueMonitor =
new ClusteredQueueMonitor(jedisCluster, queueName, queueMonitorProperties);

log.info("ConductorRedisClusterQueue started serving {}", queueName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.orkes.conductor.mq.ConductorQueue;
import io.orkes.conductor.mq.QueueMessage;
import io.orkes.conductor.mq.redis.QueueMonitor;
import io.orkes.conductor.mq.redis.QueueMonitorProperties;

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;
Expand All @@ -42,12 +43,15 @@ public class ConductorRedisQueue implements ConductorQueue {

private final QueueMonitor queueMonitor;

public ConductorRedisQueue(String queueName, JedisPoolAbstract jedisPool) {
public ConductorRedisQueue(
String queueName,
JedisPoolAbstract jedisPool,
QueueMonitorProperties queueMonitorProperties) {
this.jedisPool = jedisPool;
this.clock = Clock.systemDefaultZone();
this.queueName = queueName;
this.payloadKey = queueName + "_payload";
this.queueMonitor = new RedisQueueMonitor(jedisPool, queueName);
this.queueMonitor = new RedisQueueMonitor(jedisPool, queueName, queueMonitorProperties);
log.info("ConductorRedisQueue started serving {}", queueName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.List;

import io.orkes.conductor.mq.redis.QueueMonitor;
import io.orkes.conductor.mq.redis.QueueMonitorProperties;

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;
Expand All @@ -32,11 +33,15 @@ public class RedisQueueMonitor extends QueueMonitor {

private final String scriptSha;

public RedisQueueMonitor(JedisPoolAbstract jedisPool, String queueName) {
public RedisQueueMonitor(
JedisPoolAbstract jedisPool,
String queueName,
QueueMonitorProperties queueMonitorProperties) {
super(queueName);
this.jedisPool = jedisPool;
this.queueName = queueName;
this.scriptSha = loadScript();
this.setMaxPollCount(queueMonitorProperties.getMaxPollCount());
}

@Override
Expand Down
Loading

0 comments on commit 76a829b

Please sign in to comment.