Skip to content

Commit d720f0a

Browse files
benwtrentGurkan Kaymak
authored andcommitted
[ML] add multi node integ tests for data frames (elastic#41508)
* [ML] adding native-multi-node-integTests for data frames' * addressing streaming issues * formatting fixes * Addressing PR comments
1 parent bee3d49 commit d720f0a

File tree

10 files changed

+531
-53
lines changed

10 files changed

+531
-53
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@
4444
import org.elasticsearch.xpack.core.ccr.CCRFeatureSet;
4545
import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage;
4646
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
47+
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
48+
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
49+
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
50+
import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction;
51+
import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction;
52+
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
53+
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
54+
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
4755
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
4856
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
4957
import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
@@ -363,7 +371,16 @@ public List<Action<? extends ActionResponse>> getClientActions() {
363371
RemoveIndexLifecyclePolicyAction.INSTANCE,
364372
MoveToStepAction.INSTANCE,
365373
RetryAction.INSTANCE,
366-
TransportFreezeIndexAction.FreezeIndexAction.INSTANCE
374+
TransportFreezeIndexAction.FreezeIndexAction.INSTANCE,
375+
// Data Frame
376+
PutDataFrameTransformAction.INSTANCE,
377+
StartDataFrameTransformAction.INSTANCE,
378+
StartDataFrameTransformTaskAction.INSTANCE,
379+
StopDataFrameTransformAction.INSTANCE,
380+
DeleteDataFrameTransformAction.INSTANCE,
381+
GetDataFrameTransformsAction.INSTANCE,
382+
GetDataFrameTransformsStatsAction.INSTANCE,
383+
PreviewDataFrameTransformAction.INSTANCE
367384
);
368385
}
369386

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsStatsAction.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,21 +56,22 @@ public static class Request extends BaseTasksRequest<Request> {
5656

5757
public static final int MAX_SIZE_RETURN = 1000;
5858
// used internally to expand the queried id expression
59-
private List<String> expandedIds = Collections.emptyList();
59+
private List<String> expandedIds;
6060

6161
public Request(String id) {
6262
if (Strings.isNullOrEmpty(id) || id.equals("*")) {
6363
this.id = MetaData.ALL;
6464
} else {
6565
this.id = id;
6666
}
67+
this.expandedIds = Collections.singletonList(id);
6768
}
6869

6970
public Request(StreamInput in) throws IOException {
7071
super(in);
7172
id = in.readString();
72-
expandedIds = in.readList(StreamInput::readString);
73-
pageParams = in.readOptionalWriteable(PageParams::new);
73+
expandedIds = Collections.unmodifiableList(in.readStringList());
74+
pageParams = new PageParams(in);
7475
}
7576

7677
@Override
@@ -93,7 +94,7 @@ public void setExpandedIds(List<String> expandedIds) {
9394
}
9495

9596
public final void setPageParams(PageParams pageParams) {
96-
this.pageParams = pageParams;
97+
this.pageParams = Objects.requireNonNull(pageParams);
9798
}
9899

99100
public final PageParams getPageParams() {
@@ -105,7 +106,7 @@ public void writeTo(StreamOutput out) throws IOException {
105106
super.writeTo(out);
106107
out.writeString(id);
107108
out.writeStringCollection(expandedIds);
108-
out.writeOptionalWriteable(pageParams);
109+
pageParams.writeTo(out);
109110
}
110111

111112
@Override
@@ -136,7 +137,7 @@ public boolean equals(Object obj) {
136137
}
137138
}
138139

139-
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
140+
public static class Response extends BaseTasksResponse implements ToXContentObject {
140141
private List<DataFrameTransformStateAndStats> transformsStateAndStats;
141142

142143
public Response(List<DataFrameTransformStateAndStats> transformsStateAndStats) {
@@ -165,6 +166,11 @@ public void writeTo(StreamOutput out) throws IOException {
165166
out.writeList(transformsStateAndStats);
166167
}
167168

169+
@Override
170+
public void readFrom(StreamInput in) {
171+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
172+
}
173+
168174
@Override
169175
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
170176
builder.startObject();

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public boolean equals(Object obj) {
9595
}
9696
}
9797

98-
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
98+
public static class Response extends BaseTasksResponse implements ToXContentObject {
9999
private final boolean started;
100100

101101
public Response(StreamInput in) throws IOException {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public boolean equals(Object obj) {
9393
}
9494
}
9595

96-
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
96+
public static class Response extends BaseTasksResponse implements ToXContentObject {
9797
private final boolean started;
9898

9999
public Response(StreamInput in) throws IOException {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsStatsActionResponseTests.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66

77
package org.elasticsearch.xpack.core.dataframe.action;
88

9+
import org.elasticsearch.ElasticsearchException;
10+
import org.elasticsearch.action.FailedNodeException;
11+
import org.elasticsearch.action.TaskOperationFailure;
912
import org.elasticsearch.common.io.stream.Writeable.Reader;
1013
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
1114
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
@@ -18,11 +21,18 @@ public class GetDataFrameTransformsStatsActionResponseTests extends AbstractWire
1821
@Override
1922
protected Response createTestInstance() {
2023
List<DataFrameTransformStateAndStats> stats = new ArrayList<>();
21-
for (int i = 0; i < randomInt(10); ++i) {
24+
int totalStats = randomInt(10);
25+
for (int i = 0; i < totalStats; ++i) {
2226
stats.add(DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats());
2327
}
24-
25-
return new Response(stats);
28+
int totalErrors = randomInt(10);
29+
List<TaskOperationFailure> taskFailures = new ArrayList<>(totalErrors);
30+
List<ElasticsearchException> nodeFailures = new ArrayList<>(totalErrors);
31+
for (int i = 0; i < totalErrors; i++) {
32+
taskFailures.add(new TaskOperationFailure("node1", randomLongBetween(1, 10), new Exception("error")));
33+
nodeFailures.add(new FailedNodeException("node1", "message", new Exception("error")));
34+
}
35+
return new Response(stats, taskFailures, nodeFailures);
2636
}
2737

2838
@Override
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
apply plugin: 'elasticsearch.standalone-rest-test'
2+
apply plugin: 'elasticsearch.rest-test'
3+
4+
dependencies {
5+
testCompile project(path: xpackModule('core'), configuration: 'default')
6+
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
7+
testCompile project(path: xpackModule('data-frame'), configuration: 'runtime')
8+
}
9+
10+
// location for keys and certificates
11+
File keystoreDir = new File(project.buildDir, 'keystore')
12+
File nodeKey = file("$keystoreDir/testnode.pem")
13+
File nodeCert = file("$keystoreDir/testnode.crt")
14+
// Add key and certs to test classpath: it expects it there
15+
task copyKeyCerts(type: Copy) {
16+
from(project(':x-pack:plugin:core').file('src/test/resources/org/elasticsearch/xpack/security/transport/ssl/certs/simple/')) {
17+
include 'testnode.crt', 'testnode.pem'
18+
}
19+
into keystoreDir
20+
}
21+
// Add keys and cets to test classpath: it expects it there
22+
sourceSets.test.resources.srcDir(keystoreDir)
23+
processTestResources.dependsOn(copyKeyCerts)
24+
25+
integTestCluster {
26+
dependsOn copyKeyCerts
27+
setting 'xpack.security.enabled', 'true'
28+
setting 'xpack.license.self_generated.type', 'trial'
29+
setting 'xpack.monitoring.enabled', 'false'
30+
setting 'xpack.security.authc.token.enabled', 'true'
31+
setting 'xpack.security.transport.ssl.enabled', 'true'
32+
setting 'xpack.security.transport.ssl.key', nodeKey.name
33+
setting 'xpack.security.transport.ssl.certificate', nodeCert.name
34+
setting 'xpack.security.transport.ssl.verification_mode', 'certificate'
35+
setting 'xpack.security.audit.enabled', 'false'
36+
setting 'xpack.license.self_generated.type', 'trial'
37+
keystoreSetting 'bootstrap.password', 'x-pack-test-password'
38+
keystoreSetting 'xpack.security.transport.ssl.secure_key_passphrase', 'testnode'
39+
setupCommand 'setupDummyUser',
40+
'bin/elasticsearch-users', 'useradd', 'x_pack_rest_user', '-p', 'x-pack-test-password', '-r', 'superuser'
41+
42+
numNodes = 3
43+
extraConfigFile nodeKey.name, nodeKey
44+
extraConfigFile nodeCert.name, nodeCert
45+
waitCondition = { node, ant ->
46+
File tmpFile = new File(node.cwd, 'wait.success')
47+
ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow",
48+
dest: tmpFile.toString(),
49+
username: 'x_pack_rest_user',
50+
password: 'x-pack-test-password',
51+
ignoreerrors: true,
52+
retries: 10)
53+
return tmpFile.exists()
54+
}
55+
}

0 commit comments

Comments
 (0)