Skip to content

Commit 8c97c47

Browse files
committed
Merge branch 'main' into rw/add-search-role
Signed-off-by: Vinay Krishna Pudyodu <[email protected]>
2 parents 972d73a + e3d3a17 commit 8c97c47

File tree

11 files changed

+274
-33
lines changed

11 files changed

+274
-33
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased 2.x]
77
### Added
8+
- Change priority for scheduling reroute during timeout ([#16445](https://github.com/opensearch-project/OpenSearch/pull/16445))
89
- Renaming the node role search to warm ([#17573](https://github.com/opensearch-project/OpenSearch/pull/17573))
910
- Introduce a new search node role to hold search only shards ([#17620](https://github.com/opensearch-project/OpenSearch/pull/17620))
1011

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import org.testcontainers.utility.DockerImageName;
3535

3636
/**
37-
* Base test class for Kafka ingestion tests
37+
* Base test class for Kafka ingestion tests.
3838
*/
3939
@ThreadLeakFilters(filters = TestContainerThreadLeakFilter.class)
4040
public class KafkaIngestionBaseIT extends OpenSearchIntegTestCase {
@@ -135,6 +135,9 @@ protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
135135
.put("ingestion_source.param.topic", topicName)
136136
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
137137
.put("index.replication.type", "SEGMENT")
138+
// set custom kafka consumer properties
139+
.put("ingestion_source.param.fetch.min.bytes", 30000)
140+
.put("ingestion_source.param.enable.auto.commit", false)
138141
.build(),
139142
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
140143
);

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ public void testSegmentReplicationWithRemoteStore() throws Exception {
4848
internalCluster().startClusterManagerOnlyNode();
4949
final String nodeA = internalCluster().startDataOnlyNode();
5050
createIndexWithDefaultSettings(1, 1);
51-
5251
ensureYellowAndNoInitializingShards(indexName);
5352
final String nodeB = internalCluster().startDataOnlyNode();
5453
ensureGreen(indexName);

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.opensearch.plugin.kafka;
1010

1111
import org.apache.kafka.clients.consumer.Consumer;
12-
import org.apache.kafka.clients.consumer.ConsumerConfig;
1312
import org.apache.kafka.clients.consumer.ConsumerRecord;
1413
import org.apache.kafka.clients.consumer.ConsumerRecords;
1514
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -99,9 +98,10 @@ protected static Consumer<byte[], byte[]> createConsumer(String clientId, KafkaS
9998
Properties consumerProp = new Properties();
10099
consumerProp.put("bootstrap.servers", config.getBootstrapServers());
101100
consumerProp.put("client.id", clientId);
102-
if (config.getAutoOffsetResetConfig() != null && !config.getAutoOffsetResetConfig().isEmpty()) {
103-
consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getAutoOffsetResetConfig());
104-
}
101+
102+
logger.info("Kafka consumer properties for topic {}: {}", config.getTopic(), config.getConsumerConfigurations());
103+
consumerProp.putAll(config.getConsumerConfigurations());
104+
105105
// TODO: why Class org.apache.kafka.common.serialization.StringDeserializer could not be found if set the deserializer as prop?
106106
// consumerProp.put("key.deserializer",
107107
// "org.apache.kafka.common.serialization.StringDeserializer");

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.opensearch.core.util.ConfigurationUtils;
1212

13+
import java.util.HashMap;
1314
import java.util.Map;
1415

1516
/**
@@ -18,21 +19,27 @@
1819
public class KafkaSourceConfig {
1920
private final String PROP_TOPIC = "topic";
2021
private final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers";
21-
// TODO: support pass any generic kafka configs
2222
private final String PROP_AUTO_OFFSET_RESET = "auto.offset.reset";
2323

2424
private final String topic;
2525
private final String bootstrapServers;
2626
private final String autoOffsetResetConfig;
2727

28+
private final Map<String, Object> consumerConfigsMap;
29+
2830
/**
29-
* Constructor
31+
* Extracts the required and optional Kafka consumer configurations from the given parameters.
3032
* @param params the configuration parameters
3133
*/
3234
public KafkaSourceConfig(Map<String, Object> params) {
3335
this.topic = ConfigurationUtils.readStringProperty(params, PROP_TOPIC);
3436
this.bootstrapServers = ConfigurationUtils.readStringProperty(params, PROP_BOOTSTRAP_SERVERS);
3537
this.autoOffsetResetConfig = ConfigurationUtils.readOptionalStringProperty(params, PROP_AUTO_OFFSET_RESET);
38+
this.consumerConfigsMap = new HashMap<>(params);
39+
40+
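// remove the ingestion-level keys (topic, bootstrap_servers); the remaining entries are passed through as Kafka consumer properties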
41+
consumerConfigsMap.remove(PROP_TOPIC);
42+
consumerConfigsMap.remove(PROP_BOOTSTRAP_SERVERS);
3643
}
3744

3845
/**
@@ -60,4 +67,8 @@ public String getBootstrapServers() {
6067
public String getAutoOffsetResetConfig() {
6168
return autoOffsetResetConfig;
6269
}
70+
71+
public Map<String, Object> getConsumerConfigurations() {
72+
return consumerConfigsMap;
73+
}
6374
}

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSourceConfigTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616

1717
public class KafkaSourceConfigTests extends OpenSearchTestCase {
1818

19-
public void testConstructorAndGetters() {
19+
public void testKafkaSourceConfig() {
2020
Map<String, Object> params = new HashMap<>();
2121
params.put("topic", "topic");
2222
params.put("bootstrap_servers", "bootstrap");
23+
params.put("fetch.min.bytes", 30000);
24+
params.put("enable.auto.commit", false);
2325

2426
KafkaSourceConfig config = new KafkaSourceConfig(params);
2527

@@ -29,5 +31,7 @@ public void testConstructorAndGetters() {
2931
"bootstrap",
3032
config.getBootstrapServers()
3133
);
34+
Assert.assertEquals("Incorrect fetch.min.bytes", 30000, config.getConsumerConfigurations().get("fetch.min.bytes"));
35+
Assert.assertEquals("Incorrect enable.auto.commit", false, config.getConsumerConfigurations().get("enable.auto.commit"));
3236
}
3337
}

server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,11 @@
6262
import java.util.HashMap;
6363
import java.util.HashSet;
6464
import java.util.Iterator;
65+
import java.util.Locale;
6566
import java.util.Map;
6667
import java.util.Set;
6768

69+
import static org.opensearch.cluster.action.shard.ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING;
6870
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
6971
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID;
7072
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
@@ -199,6 +201,32 @@ public class BalancedShardsAllocator implements ShardsAllocator {
199201
Setting.Property.Dynamic
200202
);
201203

204+
/**
205+
* Adjusts the priority of the follow-up reroute task when the current round times out. NORMAL is right for reasonable clusters,
206+
* but for a cluster in a messed up state which is starving NORMAL priority tasks, it might be necessary to raise this higher
207+
* to allocate shards.
208+
*/
209+
public static final Setting<Priority> FOLLOW_UP_REROUTE_PRIORITY_SETTING = new Setting<>(
210+
"cluster.routing.allocation.balanced_shards_allocator.schedule_reroute.priority",
211+
Priority.NORMAL.toString(),
212+
BalancedShardsAllocator::parseReroutePriority,
213+
Setting.Property.NodeScope,
214+
Setting.Property.Dynamic
215+
);
216+
217+
private static Priority parseReroutePriority(String priorityString) {
218+
final Priority priority = Priority.valueOf(priorityString.toUpperCase(Locale.ROOT));
219+
switch (priority) {
220+
case NORMAL:
221+
case HIGH:
222+
case URGENT:
223+
return priority;
224+
}
225+
throw new IllegalArgumentException(
226+
"priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]"
227+
);
228+
}
229+
202230
private volatile boolean movePrimaryFirst;
203231
private volatile ShardMovementStrategy shardMovementStrategy;
204232

@@ -213,6 +241,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
213241

214242
private volatile boolean ignoreThrottleInRestore;
215243
private volatile TimeValue allocatorTimeout;
244+
private volatile Priority followUpRerouteTaskPriority;
216245
private long startTime;
217246
private RerouteService rerouteService;
218247

@@ -233,6 +262,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
233262
setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings));
234263
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
235264
setAllocatorTimeout(ALLOCATOR_TIMEOUT_SETTING.get(settings));
265+
setFollowUpRerouteTaskPriority(FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(settings));
236266
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
237267
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
238268
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
@@ -244,6 +274,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
244274
clusterSettings.addSettingsUpdateConsumer(PRIMARY_CONSTRAINT_THRESHOLD_SETTING, this::setPrimaryConstraintThresholdSetting);
245275
clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore);
246276
clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout);
277+
clusterSettings.addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING, this::setFollowUpRerouteTaskPriority);
247278
}
248279

249280
@Override
@@ -342,6 +373,10 @@ private void setAllocatorTimeout(TimeValue allocatorTimeout) {
342373
this.allocatorTimeout = allocatorTimeout;
343374
}
344375

376+
private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) {
377+
this.followUpRerouteTaskPriority = followUpRerouteTaskPriority;
378+
}
379+
345380
protected boolean allocatorTimedOut() {
346381
if (allocatorTimeout.equals(TimeValue.MINUS_ONE)) {
347382
if (logger.isTraceEnabled()) {
@@ -438,10 +473,13 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) {
438473

439474
private void scheduleRerouteIfAllocatorTimedOut() {
440475
if (allocatorTimedOut()) {
441-
assert rerouteService != null : "RerouteService not set to schedule reroute after allocator time out";
476+
if (rerouteService == null) {
477+
logger.info("RerouteService not set to schedule reroute after allocator time out");
478+
return;
479+
}
442480
rerouteService.reroute(
443481
"reroute after balanced shards allocator timed out",
444-
Priority.HIGH,
482+
followUpRerouteTaskPriority,
445483
ActionListener.wrap(
446484
r -> logger.trace("reroute after balanced shards allocator timed out completed"),
447485
e -> logger.debug("reroute after balanced shards allocator timed out failed", e)

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ public void apply(Settings value, Settings current, Settings previous) {
277277
BalancedShardsAllocator.THRESHOLD_SETTING,
278278
BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE,
279279
BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING,
280+
BalancedShardsAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING,
280281
BalancedShardsAllocator.PRIMARY_CONSTRAINT_THRESHOLD_SETTING,
281282
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
282283
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
@@ -351,6 +352,7 @@ public void apply(Settings value, Settings current, Settings previous) {
351352
ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE,
352353
ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING,
353354
ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING,
355+
ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING,
354356
PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD,
355357
NetworkModule.HTTP_DEFAULT_TYPE_SETTING,
356358
NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING,

server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.HashSet;
5454
import java.util.Iterator;
5555
import java.util.List;
56+
import java.util.Locale;
5657
import java.util.Map;
5758
import java.util.Objects;
5859
import java.util.Set;
@@ -82,6 +83,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
8283

8384
private TimeValue primaryShardsBatchGatewayAllocatorTimeout;
8485
private TimeValue replicaShardsBatchGatewayAllocatorTimeout;
86+
private volatile Priority followUpRerouteTaskPriority;
8587
public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20);
8688
private final ClusterManagerMetrics clusterManagerMetrics;
8789

@@ -145,6 +147,32 @@ public void validate(TimeValue timeValue) {
145147
Setting.Property.Dynamic
146148
);
147149

150+
/**
151+
* Adjusts the priority of the follow-up reroute task when the current round times out. NORMAL is right for reasonable clusters,
152+
* but for a cluster in a messed up state which is starving NORMAL priority tasks, it might be necessary to raise this higher
153+
* to allocate existing shards.
154+
*/
155+
public static final Setting<Priority> FOLLOW_UP_REROUTE_PRIORITY_SETTING = new Setting<>(
156+
"cluster.routing.allocation.shards_batch_gateway_allocator.schedule_reroute.priority",
157+
Priority.NORMAL.toString(),
158+
ShardsBatchGatewayAllocator::parseReroutePriority,
159+
Setting.Property.NodeScope,
160+
Setting.Property.Dynamic
161+
);
162+
163+
private static Priority parseReroutePriority(String priorityString) {
164+
final Priority priority = Priority.valueOf(priorityString.toUpperCase(Locale.ROOT));
165+
switch (priority) {
166+
case NORMAL:
167+
case HIGH:
168+
case URGENT:
169+
return priority;
170+
}
171+
throw new IllegalArgumentException(
172+
"priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]"
173+
);
174+
}
175+
148176
private final RerouteService rerouteService;
149177
private final PrimaryShardBatchAllocator primaryShardBatchAllocator;
150178
private final ReplicaShardBatchAllocator replicaShardBatchAllocator;
@@ -179,6 +207,8 @@ public ShardsBatchGatewayAllocator(
179207
this.replicaShardsBatchGatewayAllocatorTimeout = REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings);
180208
clusterSettings.addSettingsUpdateConsumer(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING, this::setReplicaBatchAllocatorTimeout);
181209
this.clusterManagerMetrics = clusterManagerMetrics;
210+
setFollowUpRerouteTaskPriority(FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(settings));
211+
clusterSettings.addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING, this::setFollowUpRerouteTaskPriority);
182212
}
183213

184214
@Override
@@ -308,8 +338,8 @@ public void onComplete() {
308338
logger.trace("scheduling reroute after existing shards allocator timed out for primary shards");
309339
assert rerouteService != null;
310340
rerouteService.reroute(
311-
"reroute after existing shards allocator timed out",
312-
Priority.HIGH,
341+
"reroute after existing shards allocator [P] timed out",
342+
followUpRerouteTaskPriority,
313343
ActionListener.wrap(
314344
r -> logger.trace("reroute after existing shards allocator timed out completed"),
315345
e -> logger.debug("reroute after existing shards allocator timed out failed", e)
@@ -343,8 +373,8 @@ public void onComplete() {
343373
logger.trace("scheduling reroute after existing shards allocator timed out for replica shards");
344374
assert rerouteService != null;
345375
rerouteService.reroute(
346-
"reroute after existing shards allocator timed out",
347-
Priority.HIGH,
376+
"reroute after existing shards allocator [R] timed out",
377+
followUpRerouteTaskPriority,
348378
ActionListener.wrap(
349379
r -> logger.trace("reroute after existing shards allocator timed out completed"),
350380
e -> logger.debug("reroute after existing shards allocator timed out failed", e)
@@ -920,4 +950,8 @@ protected void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatew
920950
protected void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatewayAllocatorTimeout) {
921951
this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout;
922952
}
953+
954+
protected void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) {
955+
this.followUpRerouteTaskPriority = followUpRerouteTaskPriority;
956+
}
923957
}

0 commit comments

Comments
 (0)