Skip to content

Make MaxPollCount Configurable. #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.dao.QueueDAO;

import io.orkes.conductor.mq.redis.QueueMonitorProperties;
import io.orkes.conductor.queue.dao.ClusteredRedisQueueDAO;
import io.orkes.conductor.queue.dao.RedisQueueDAO;

Expand All @@ -50,9 +51,11 @@ public QueueDAO getQueueDAOStandalone(
JedisPool jedisPool,
MeterRegistry registry,
QueueRedisProperties queueRedisProperties,
ConductorProperties properties) {
ConductorProperties properties,
QueueMonitorProperties queueMonitorProperties) {
log.info("getQueueDAOStandalone init");
return new RedisQueueDAO(registry, jedisPool, queueRedisProperties, properties);
return new RedisQueueDAO(
registry, jedisPool, queueRedisProperties, properties, queueMonitorProperties);
}

@Bean
Expand All @@ -62,8 +65,14 @@ public QueueDAO getQueueDAOSentinel(
JedisSentinelPool jedisSentinelPool,
MeterRegistry registry,
QueueRedisProperties queueRedisProperties,
ConductorProperties properties) {
return new RedisQueueDAO(registry, jedisSentinelPool, queueRedisProperties, properties);
ConductorProperties properties,
QueueMonitorProperties queueMonitorProperties) {
return new RedisQueueDAO(
registry,
jedisSentinelPool,
queueRedisProperties,
properties,
queueMonitorProperties);
}

@Bean
Expand All @@ -73,8 +82,10 @@ public QueueDAO getQueueDAOCluster(
JedisCluster jedisCluster,
MeterRegistry registry,
QueueRedisProperties queueRedisProperties,
ConductorProperties properties) {
return new ClusteredRedisQueueDAO(registry, jedisCluster, queueRedisProperties, properties);
ConductorProperties properties,
QueueMonitorProperties queueMonitorProperties) {
return new ClusteredRedisQueueDAO(
registry, jedisCluster, queueRedisProperties, properties, queueMonitorProperties);
}

@Bean
Expand All @@ -90,7 +101,7 @@ protected JedisPool getJedisPoolStandalone(QueueRedisProperties redisProperties)
redisProperties.isSsl());
Host host = hostSupplier.getHosts().get(0);

if (redisProperties.getUsername() != null && host.getPassword() != null) {
if (redisProperties.getUsername() != null && host.getPassword() != null) {
log.info("Connecting to Redis Standalone with ACL user AUTH");
return new JedisPool(
config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.netflix.conductor.dao.QueueDAO;

import io.orkes.conductor.mq.ConductorQueue;
import io.orkes.conductor.mq.redis.QueueMonitorProperties;
import io.orkes.conductor.mq.redis.cluster.ConductorRedisClusterQueue;
import io.orkes.conductor.queue.config.QueueRedisProperties;

Expand All @@ -30,21 +31,26 @@ public class ClusteredRedisQueueDAO extends BaseRedisQueueDAO implements QueueDA

private final MeterRegistry registry;

private final QueueMonitorProperties queueMonitorProperties;

public ClusteredRedisQueueDAO(
MeterRegistry registry,
JedisCluster jedisCluster,
QueueRedisProperties queueRedisProperties,
ConductorProperties conductorProperties) {
ConductorProperties conductorProperties,
QueueMonitorProperties queueMonitorProperties) {

super(queueRedisProperties, conductorProperties);
this.registry = registry;
this.jedisCluster = jedisCluster;
this.queueMonitorProperties = queueMonitorProperties;
log.info("Queues initialized using {}", ClusteredRedisQueueDAO.class.getName());
}

@Override
protected ConductorQueue getConductorQueue(String queueKey) {
ConductorRedisClusterQueue queue = new ConductorRedisClusterQueue(queueKey, jedisCluster);
ConductorRedisClusterQueue queue =
new ConductorRedisClusterQueue(queueKey, jedisCluster, queueMonitorProperties);
return queue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.netflix.conductor.dao.QueueDAO;

import io.orkes.conductor.mq.ConductorQueue;
import io.orkes.conductor.mq.redis.QueueMonitorProperties;
import io.orkes.conductor.mq.redis.single.ConductorRedisQueue;
import io.orkes.conductor.queue.config.QueueRedisProperties;

Expand All @@ -30,20 +31,24 @@ public class RedisQueueDAO extends BaseRedisQueueDAO implements QueueDAO {

private final MeterRegistry registry;

private final QueueMonitorProperties queueMonitorProperties;

public RedisQueueDAO(
MeterRegistry registry,
JedisPoolAbstract jedisPool,
QueueRedisProperties queueRedisProperties,
ConductorProperties conductorProperties) {
ConductorProperties conductorProperties,
QueueMonitorProperties queueMonitorProperties) {

super(queueRedisProperties, conductorProperties);
this.registry = registry;
this.jedisPool = jedisPool;
this.queueMonitorProperties = queueMonitorProperties;
log.info("Queues initialized using {}", RedisQueueDAO.class.getName());
}

@Override
protected ConductorQueue getConductorQueue(String queueKey) {
return new ConductorRedisQueue(queueKey, jedisPool);
return new ConductorRedisQueue(queueKey, jedisPool, queueMonitorProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.netflix.conductor.core.config.ConductorProperties;

import io.orkes.conductor.mq.redis.QueueMonitorProperties;
import io.orkes.conductor.queue.config.QueueRedisProperties;

import com.google.common.util.concurrent.Uninterruptibles;
Expand Down Expand Up @@ -60,8 +61,13 @@ public static void setUp() {
JedisCluster jedisCluster = new JedisCluster(hostAndPorts);
ConductorProperties properties = new ConductorProperties();
QueueRedisProperties queueRedisProperties = new QueueRedisProperties(properties);
QueueMonitorProperties queueMonitorProperties = new QueueMonitorProperties();
redisQueue =
new ClusteredRedisQueueDAO(
registry, jedisCluster, queueRedisProperties, properties);
registry,
jedisCluster,
queueRedisProperties,
properties,
queueMonitorProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.netflix.conductor.core.config.ConductorProperties;

import io.orkes.conductor.mq.redis.QueueMonitorProperties;
import io.orkes.conductor.queue.config.QueueRedisProperties;

import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
Expand Down Expand Up @@ -50,8 +51,13 @@ public static void setUp() {
SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
ConductorProperties conductorProperties = new ConductorProperties();
QueueRedisProperties queueRedisProperties = new QueueRedisProperties(conductorProperties);
QueueMonitorProperties queueMonitorProperties = new QueueMonitorProperties();
redisQueue =
new RedisQueueDAO(
meterRegistry, jedisPool, queueRedisProperties, conductorProperties);
meterRegistry,
jedisPool,
queueRedisProperties,
conductorProperties,
queueMonitorProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.events.queue.Message;

import io.orkes.conductor.mq.redis.QueueMonitorProperties;
import io.orkes.conductor.queue.config.QueueRedisProperties;
import io.orkes.conductor.queue.dao.BaseRedisQueueDAO;
import io.orkes.conductor.queue.dao.ClusteredRedisQueueDAO;
Expand Down Expand Up @@ -146,9 +147,14 @@ private static void benchmarkRedisCluster() {
JedisCluster jedisCluster = new JedisCluster(hostAndPorts);
ConductorProperties properties = new ConductorProperties();
QueueRedisProperties queueRedisProperties = new QueueRedisProperties(properties);
QueueMonitorProperties queueMonitorProperties = new QueueMonitorProperties();
ClusteredRedisQueueDAO clusteredRedisQueue =
new ClusteredRedisQueueDAO(
registry, jedisCluster, queueRedisProperties, properties);
registry,
jedisCluster,
queueRedisProperties,
properties,
queueMonitorProperties);

RedisQueueDAOBenchmark benchmark = new RedisQueueDAOBenchmark(clusteredRedisQueue);
benchmark.runBenchmark();
Expand All @@ -166,14 +172,16 @@ private static void benchmarkRedisStandalone() {
config.setMaxTotal(20);
JedisPool jedisPool = new JedisPool(config, "localhost", 6379);
ConductorProperties properties = new ConductorProperties();
QueueMonitorProperties queueMonitorProperties = new QueueMonitorProperties();

RedisQueueDAOBenchmark benchmark =
new RedisQueueDAOBenchmark(
new RedisQueueDAO(
registry,
jedisPool,
new QueueRedisProperties(properties),
properties));
properties,
queueMonitorProperties));

benchmark.runBenchmark();

Expand Down
1 change: 1 addition & 0 deletions orkes-redis-queues/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
dependencies {

implementation "redis.clients:jedis:${versions.revJedis}"
implementation 'org.springframework.boot:spring-boot-starter'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would like to avoid bringing this in here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I know why?

This dependency is required to initialise QueueMonitorProperties


//Guava
implementation "com.google.guava:guava:${versions.revGuava}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,21 @@ public abstract class QueueMonitor {

private int maxPollCount = 100;

public void setMaxPollCount(int maxPollCount) {
this.maxPollCount = maxPollCount;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ArrayBlockingQueue is being initalized in the constructor with maxPollCount. If you wish to make use of this, please propagate it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching that. Done.

}

public int getMaxPollCount() {
return maxPollCount;
}

public QueueMonitor(String queueName) {
this.queueName = queueName;
this.clock = Clock.systemDefaultZone();
this.peekedMessages = new LinkedBlockingQueue<>();
this.executorService =
new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(maxPollCount));
1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(getMaxPollCount()));
}

public List<QueueMessage> pop(int count, int waitTime, TimeUnit timeUnit) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Orkes, Inc.
* <p>
* Licensed under the Orkes Community License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* https://github.com/orkes-io/licenses/blob/main/community/LICENSE.txt
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.orkes.conductor.mq.redis;

import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@ConfigurationProperties("conductor.queue")
@Component
@AutoConfiguration
@EnableAutoConfiguration
public class QueueMonitorProperties {
private int maxPollCount = 100;

public int getMaxPollCount() {
return maxPollCount;
}

public void setMaxPollCount(int maxPollCount) {
this.maxPollCount = maxPollCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;

import io.orkes.conductor.mq.redis.QueueMonitor;
import io.orkes.conductor.mq.redis.QueueMonitorProperties;

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.JedisCluster;
Expand All @@ -32,11 +33,15 @@ public class ClusteredQueueMonitor extends QueueMonitor {

private final String queueName;

public ClusteredQueueMonitor(JedisCluster jedisCluster, String queueName) {
public ClusteredQueueMonitor(
JedisCluster jedisCluster,
String queueName,
QueueMonitorProperties queueMonitorProperties) {
super(queueName);
this.queueName = queueName;
this.jedisCluster = jedisCluster;
this.scriptSha = loadScript();
this.setMaxPollCount(queueMonitorProperties.getMaxPollCount());
}

private String loadScript() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import io.orkes.conductor.mq.ConductorQueue;
import io.orkes.conductor.mq.QueueMessage;
import io.orkes.conductor.mq.redis.QueueMonitorProperties;

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.JedisCluster;
Expand All @@ -45,12 +46,16 @@ public class ConductorRedisClusterQueue implements ConductorQueue {

private final ClusteredQueueMonitor queueMonitor;

public ConductorRedisClusterQueue(String queueName, JedisCluster jedisCluster) {
public ConductorRedisClusterQueue(
String queueName,
JedisCluster jedisCluster,
QueueMonitorProperties queueMonitorProperties) {
this.jedis = jedisCluster;
this.clock = Clock.systemDefaultZone();
this.queueName = queueName;
this.payloadKey = queueName + "_payload";
this.queueMonitor = new ClusteredQueueMonitor(jedisCluster, queueName);
this.queueMonitor =
new ClusteredQueueMonitor(jedisCluster, queueName, queueMonitorProperties);

log.info("ConductorRedisClusterQueue started serving {}", queueName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.orkes.conductor.mq.ConductorQueue;
import io.orkes.conductor.mq.QueueMessage;
import io.orkes.conductor.mq.redis.QueueMonitor;
import io.orkes.conductor.mq.redis.QueueMonitorProperties;

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;
Expand All @@ -42,12 +43,15 @@ public class ConductorRedisQueue implements ConductorQueue {

private final QueueMonitor queueMonitor;

public ConductorRedisQueue(String queueName, JedisPoolAbstract jedisPool) {
public ConductorRedisQueue(
String queueName,
JedisPoolAbstract jedisPool,
QueueMonitorProperties queueMonitorProperties) {
this.jedisPool = jedisPool;
this.clock = Clock.systemDefaultZone();
this.queueName = queueName;
this.payloadKey = queueName + "_payload";
this.queueMonitor = new RedisQueueMonitor(jedisPool, queueName);
this.queueMonitor = new RedisQueueMonitor(jedisPool, queueName, queueMonitorProperties);
log.info("ConductorRedisQueue started serving {}", queueName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.List;

import io.orkes.conductor.mq.redis.QueueMonitor;
import io.orkes.conductor.mq.redis.QueueMonitorProperties;

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;
Expand All @@ -32,11 +33,15 @@ public class RedisQueueMonitor extends QueueMonitor {

private final String scriptSha;

public RedisQueueMonitor(JedisPoolAbstract jedisPool, String queueName) {
public RedisQueueMonitor(
JedisPoolAbstract jedisPool,
String queueName,
QueueMonitorProperties queueMonitorProperties) {
super(queueName);
this.jedisPool = jedisPool;
this.queueName = queueName;
this.scriptSha = loadScript();
this.setMaxPollCount(queueMonitorProperties.getMaxPollCount());
}

@Override
Expand Down
Loading
Loading