Skip to content

Commit 67d72db

Browse files
committed
Merge remote-tracking branch 'origin/main' into dev/fix-PemTrustConfigTests-testTrustConfigReloadsFileContents
2 parents 37f824e + e61b6ab commit 67d72db

File tree

31 files changed

+816
-191
lines changed

31 files changed

+816
-191
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased 3.x]
77
### Added
8+
- Add multi-threaded writer support in pull-based ingestion ([#17912](https://github.com/opensearch-project/OpenSearch/pull/17912))
89

910
### Changed
1011

build.gradle

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ plugins {
5555
id 'opensearch.docker-support'
5656
id 'opensearch.global-build-info'
5757
id "com.diffplug.spotless" version "6.25.0" apply false
58+
id "org.gradle.test-retry" version "1.6.2" apply false
5859
id "test-report-aggregation"
5960
id 'jacoco-report-aggregation'
6061
}
@@ -70,13 +71,6 @@ apply from: 'gradle/run.gradle'
7071
apply from: 'gradle/missing-javadoc.gradle'
7172
apply from: 'gradle/code-coverage.gradle'
7273

73-
// Disable unconditional publishing of build scans
74-
develocity {
75-
buildScan {
76-
publishing.onlyIf { false }
77-
}
78-
}
79-
8074
// common maven publishing configuration
8175
allprojects {
8276
group = 'org.opensearch'
@@ -475,8 +469,9 @@ gradle.projectsEvaluated {
475469

476470
// test retry configuration
477471
subprojects {
472+
apply plugin: "org.gradle.test-retry"
478473
tasks.withType(Test).configureEach {
479-
develocity.testRetry {
474+
retry {
480475
if (BuildParams.isCi()) {
481476
maxRetries = 3
482477
maxFailures = 10

gradle.properties

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@ org.gradle.jvmargs=-Xmx3g -XX:+HeapDumpOnOutOfMemoryError -Xss2m \
2121
--add-exports jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
2222
options.forkOptions.memoryMaximumSize=3g
2323

24-
# Disable Gradle Enterprise Gradle plugin's test retry
25-
systemProp.develocity.testretry.enabled.enabled=false
26-
2724
# Disable duplicate project id detection
2825
# See https://docs.gradle.org/current/userguide/upgrading_version_6.html#duplicate_project_names_may_cause_publication_to_fail
2926
systemProp.org.gradle.dependency.duplicate.project.detection=false

libs/agent-sm/agent/src/main/java/org/opensearch/javaagent/Agent.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.opensearch.javaagent.bootstrap.AgentPolicy;
1212

1313
import java.lang.instrument.Instrumentation;
14+
import java.net.Socket;
1415
import java.nio.channels.FileChannel;
1516
import java.nio.channels.SocketChannel;
1617
import java.nio.file.Files;
@@ -71,8 +72,9 @@ public static void agentmain(String agentArguments, Instrumentation instrumentat
7172
initAgent(instrumentation);
7273
}
7374

74-
private static AgentBuilder createAgentBuilder(Instrumentation inst) throws Exception {
75-
final Junction<TypeDescription> systemType = ElementMatchers.isSubTypeOf(SocketChannel.class);
75+
private static AgentBuilder createAgentBuilder() throws Exception {
76+
final Junction<TypeDescription> socketType = ElementMatchers.isSubTypeOf(SocketChannel.class)
77+
.or(ElementMatchers.isSubTypeOf(Socket.class));
7678
final Junction<TypeDescription> pathType = ElementMatchers.isSubTypeOf(Files.class);
7779
final Junction<TypeDescription> fileChannelType = ElementMatchers.isSubTypeOf(FileChannel.class);
7880

@@ -98,11 +100,11 @@ private static AgentBuilder createAgentBuilder(Instrumentation inst) throws Exce
98100
);
99101

100102
final ByteBuddy byteBuddy = new ByteBuddy().with(Implementation.Context.Disabled.Factory.INSTANCE);
101-
final AgentBuilder agentBuilder = new AgentBuilder.Default(byteBuddy).with(AgentBuilder.InitializationStrategy.NoOp.INSTANCE)
103+
return new AgentBuilder.Default(byteBuddy).with(AgentBuilder.InitializationStrategy.NoOp.INSTANCE)
102104
.with(AgentBuilder.RedefinitionStrategy.REDEFINITION)
103105
.with(AgentBuilder.TypeStrategy.Default.REDEFINE)
104106
.ignore(ElementMatchers.nameContains("$MockitoMock$")) /* ingore all Mockito mocks */
105-
.type(systemType)
107+
.type(socketType)
106108
.transform(socketTransformer)
107109
.type(pathType.or(fileChannelType))
108110
.transform(fileTransformer)
@@ -118,12 +120,10 @@ private static AgentBuilder createAgentBuilder(Instrumentation inst) throws Exce
118120
Advice.to(RuntimeHaltInterceptor.class).on(ElementMatchers.named("halt"))
119121
)
120122
);
121-
122-
return agentBuilder;
123123
}
124124

125125
private static void initAgent(Instrumentation instrumentation) throws Exception {
126-
AgentBuilder agentBuilder = createAgentBuilder(instrumentation);
126+
AgentBuilder agentBuilder = createAgentBuilder();
127127
agentBuilder.installOn(instrumentation);
128128
}
129129
}

libs/agent-sm/agent/src/test/java/org/opensearch/javaagent/SocketChannelInterceptorTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.io.IOException;
1414
import java.net.InetAddress;
1515
import java.net.InetSocketAddress;
16+
import java.net.Socket;
1617
import java.net.UnixDomainSocketAddress;
1718
import java.nio.channels.SocketChannel;
1819

@@ -28,6 +29,8 @@ public void testConnections() throws IOException {
2829

2930
assertThrows(SecurityException.class, () -> channel.connect(new InetSocketAddress("opensearch.org", 80)));
3031
}
32+
33+
assertThrows(SecurityException.class, () -> new Socket("localhost", 9200));
3134
}
3235

3336
@Test
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
grant {
10+
permission java.net.SocketPermission "*", "connect";
11+
};
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
grant {
10+
permission java.net.SocketPermission "*", "connect";
11+
};

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,4 +200,31 @@ public void testUpdateWithoutIDField() throws Exception {
200200
return 30 == (Integer) response.getHits().getHits()[0].getSourceAsMap().get("age");
201201
});
202202
}
203+
204+
public void testMultiThreadedWrites() throws Exception {
205+
// create index with 5 writer threads
206+
createIndexWithDefaultSettings(indexName, 1, 0, 5);
207+
ensureGreen(indexName);
208+
209+
// Step 1: Produce messages
210+
for (int i = 0; i < 1000; i++) {
211+
produceData(Integer.toString(i), "name" + i, "25");
212+
}
213+
214+
waitForState(() -> {
215+
SearchResponse searchableDocsResponse = client().prepareSearch(indexName).setSize(2000).setPreference("_only_local").get();
216+
return searchableDocsResponse.getHits().getTotalHits().value() == 1000;
217+
});
218+
219+
// Step 2: Produce an update message and validate
220+
for (int i = 0; i < 1000; i++) {
221+
produceData(Integer.toString(i), "name" + i, "30");
222+
}
223+
224+
waitForState(() -> {
225+
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(28);
226+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
227+
return response.getHits().getTotalHits().value() == 1000;
228+
});
229+
}
203230
}

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,10 @@ protected ResumeIngestionResponse resumeIngestion(String indexName) throws Execu
177177
}
178178

179179
protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
180-
createIndexWithDefaultSettings(indexName, numShards, numReplicas);
180+
createIndexWithDefaultSettings(indexName, numShards, numReplicas, 1);
181181
}
182182

183-
protected void createIndexWithDefaultSettings(String indexName, int numShards, int numReplicas) {
183+
protected void createIndexWithDefaultSettings(String indexName, int numShards, int numReplicas, int numProcessorThreads) {
184184
createIndex(
185185
indexName,
186186
Settings.builder()
@@ -191,6 +191,7 @@ protected void createIndexWithDefaultSettings(String indexName, int numShards, i
191191
.put("ingestion_source.param.topic", topicName)
192192
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
193193
.put("index.replication.type", "SEGMENT")
194+
.put("ingestion_source.num_processor_threads", numProcessorThreads)
194195
// set custom kafka consumer properties
195196
.put("ingestion_source.param.fetch.min.bytes", 30000)
196197
.put("ingestion_source.param.enable.auto.commit", false)

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ public void testErrorStrategy() throws Exception {
164164
.setSettings(Settings.builder().put("ingestion_source.error_strategy", "drop"))
165165
.get();
166166
waitForState(() -> "drop".equalsIgnoreCase(getSettings(indexName, "index.ingestion_source.error_strategy")));
167+
resumeIngestion(indexName);
167168
waitForSearchableDocs(2, Arrays.asList(node));
168169
}
169170

@@ -248,8 +249,8 @@ public void testPaginatedGetIngestionState() throws ExecutionException, Interrup
248249
internalCluster().startClusterManagerOnlyNode();
249250
internalCluster().startDataOnlyNode();
250251
internalCluster().startDataOnlyNode();
251-
createIndexWithDefaultSettings("index1", 5, 0);
252-
createIndexWithDefaultSettings("index2", 5, 0);
252+
createIndexWithDefaultSettings("index1", 5, 0, 1);
253+
createIndexWithDefaultSettings("index2", 5, 0, 1);
253254
ensureGreen("index1");
254255
ensureGreen("index2");
255256

0 commit comments

Comments
 (0)