Skip to content

Commit a730b44

Browse files
authored
[ML][Data Frame] Moving destination creation to _start (#41416)
* [ML][Data Frame] Moving destination creation to _start * slight refactor of DataFrameAuditor constructor
1 parent 3b0dd4a commit a730b44

File tree

14 files changed

+173
-117
lines changed

14 files changed

+173
-117
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ public class DataFrameMessages {
2222
public static final String REST_PUT_DATA_FRAME_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist data frame configuration";
2323
public static final String REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS = "Failed to deduce dest mappings";
2424
public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_DEST_INDEX = "Failed to create dest index";
25-
public static final String REST_PUT_DATA_FRAME_DEST_INDEX_ALREADY_EXISTS = "dest index [{0}] already exists";
2625
public static final String REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING = "Source index [{0}] does not exist";
2726
public static final String REST_PUT_DATA_FRAME_INCONSISTENT_ID =
2827
"Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument";

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameMetaDataIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@ public void createIndexes() throws IOException {
3737
indicesCreated = true;
3838
}
3939

40-
public void testMetaData() throws IOException {
40+
public void testMetaData() throws Exception {
4141
long testStarted = System.currentTimeMillis();
4242
createPivotReviewsTransform("test_meta", "pivot_reviews", null);
43+
startAndWaitForTransform("test_meta", "pivot_reviews");
4344

4445
Response mappingResponse = client().performRequest(new Request("GET", "pivot_reviews/_mapping"));
4546

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,9 @@ public void testHistogramPivot() throws Exception {
115115
createDataframeTransformRequest.setJsonEntity(config);
116116
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
117117
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
118-
assertTrue(indexExists(dataFrameIndex));
119118

120119
startAndWaitForTransform(transformId, dataFrameIndex);
120+
assertTrue(indexExists(dataFrameIndex));
121121

122122
// we expect 3 documents as there shall be 5 unique star values and we are bucketing every 2 starting at 0
123123
Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");
@@ -174,9 +174,9 @@ public void testBiggerPivot() throws Exception {
174174
createDataframeTransformRequest.setJsonEntity(config);
175175
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
176176
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
177-
assertTrue(indexExists(dataFrameIndex));
178177

179178
startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
179+
assertTrue(indexExists(dataFrameIndex));
180180

181181
// we expect 27 documents as there shall be 27 user_id's
182182
Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");
@@ -228,9 +228,9 @@ public void testDateHistogramPivot() throws Exception {
228228
createDataframeTransformRequest.setJsonEntity(config);
229229
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
230230
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
231-
assertTrue(indexExists(dataFrameIndex));
232231

233232
startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
233+
assertTrue(indexExists(dataFrameIndex));
234234

235235
// we expect 21 documents as there shall be 21 days worth of docs
236236
Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");
@@ -301,9 +301,9 @@ public void testPivotWithMaxOnDateField() throws Exception {
301301
createDataframeTransformRequest.setJsonEntity(config);
302302
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
303303
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
304-
assertTrue(indexExists(dataFrameIndex));
305304

306305
startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
306+
assertTrue(indexExists(dataFrameIndex));
307307

308308
// we expect 21 documents as there shall be 21 days worth of docs
309309
Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");
@@ -351,9 +351,9 @@ public void testPivotWithScriptedMetricAgg() throws Exception {
351351
createDataframeTransformRequest.setJsonEntity(config);
352352
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
353353
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
354-
assertTrue(indexExists(dataFrameIndex));
355354

356355
startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
356+
assertTrue(indexExists(dataFrameIndex));
357357

358358
// we expect 27 documents as there shall be 27 user_id's
359359
Map<String, Object> indexStats = getAsMap(dataFrameIndex + "/_stats");

x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,6 @@ protected void createPivotReviewsTransform(String transformId, String dataFrameI
164164

165165
Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
166166
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
167-
assertTrue(indexExists(dataFrameIndex));
168167
}
169168

170169
protected void startDataframeTransform(String transformId, boolean force) throws IOException {
@@ -195,6 +194,7 @@ protected void startAndWaitForTransform(String transformId, String dataFrameInde
195194
protected void startAndWaitForTransform(String transformId, String dataFrameIndex, String authHeader) throws Exception {
196195
// start the transform
197196
startDataframeTransform(transformId, false, authHeader);
197+
assertTrue(indexExists(dataFrameIndex));
198198
// wait until the dataframe has been created and all data is available
199199
waitForDataFrameCheckpoint(transformId);
200200
refreshIndex(dataFrameIndex);

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.elasticsearch.watcher.ResourceWatcherService;
4141
import org.elasticsearch.xpack.core.XPackPlugin;
4242
import org.elasticsearch.xpack.core.XPackSettings;
43-
import org.elasticsearch.xpack.core.common.notifications.Auditor;
4443
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
4544
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
4645
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
@@ -49,7 +48,6 @@
4948
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
5049
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
5150
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
52-
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
5351
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
5452
import org.elasticsearch.xpack.dataframe.action.TransportDeleteDataFrameTransformAction;
5553
import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsAction;
@@ -60,6 +58,7 @@
6058
import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformTaskAction;
6159
import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction;
6260
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
61+
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
6362
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
6463
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
6564
import org.elasticsearch.xpack.dataframe.rest.action.RestDeleteDataFrameTransformAction;
@@ -85,7 +84,6 @@
8584
import java.util.function.UnaryOperator;
8685

8786
import static java.util.Collections.emptyList;
88-
import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
8987

9088
public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlugin {
9189

@@ -102,7 +100,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
102100
private final Settings settings;
103101
private final boolean transportClientMode;
104102
private final SetOnce<DataFrameTransformsConfigManager> dataFrameTransformsConfigManager = new SetOnce<>();
105-
private final SetOnce<Auditor<DataFrameAuditMessage>> dataFrameAuditor = new SetOnce<>();
103+
private final SetOnce<DataFrameAuditor> dataFrameAuditor = new SetOnce<>();
106104
private final SetOnce<DataFrameTransformsCheckpointService> dataFrameTransformsCheckpointService = new SetOnce<>();
107105
private final SetOnce<SchedulerEngine> schedulerEngine = new SetOnce<>();
108106

@@ -184,11 +182,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
184182
if (enabled == false || transportClientMode) {
185183
return emptyList();
186184
}
187-
dataFrameAuditor.set(new Auditor<>(client,
188-
clusterService.getNodeName(),
189-
DataFrameInternalIndex.AUDIT_INDEX,
190-
DATA_FRAME_ORIGIN,
191-
DataFrameAuditMessage.builder()));
185+
dataFrameAuditor.set(new DataFrameAuditor(client, clusterService.getNodeName()));
192186
dataFrameTransformsConfigManager.set(new DataFrameTransformsConfigManager(client, xContentRegistry));
193187
dataFrameTransformsCheckpointService.set(new DataFrameTransformsCheckpointService(client, dataFrameTransformsConfigManager.get()));
194188

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java

Lines changed: 13 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
import org.elasticsearch.ElasticsearchStatusException;
1212
import org.elasticsearch.ResourceAlreadyExistsException;
1313
import org.elasticsearch.action.ActionListener;
14-
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
15-
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
1614
import org.elasticsearch.action.support.ActionFilters;
1715
import org.elasticsearch.action.support.IndicesOptions;
1816
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@@ -50,7 +48,6 @@
5048
import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges;
5149
import org.elasticsearch.xpack.core.security.support.Exceptions;
5250
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
53-
import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex;
5451
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
5552

5653
import java.io.IOException;
@@ -117,18 +114,6 @@ protected void masterOperation(Request request, ClusterState clusterState, Actio
117114
return;
118115
}
119116

120-
final String[] dest = indexNameExpressionResolver.concreteIndexNames(clusterState,
121-
IndicesOptions.lenientExpandOpen(),
122-
config.getDestination().getIndex());
123-
124-
if (dest.length > 0) {
125-
listener.onFailure(new ElasticsearchStatusException(
126-
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_INDEX_ALREADY_EXISTS,
127-
config.getDestination().getIndex()),
128-
RestStatus.BAD_REQUEST));
129-
return;
130-
}
131-
132117
for(String src : config.getSource().getIndex()) {
133118
if (indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src).length == 0) {
134119
listener.onFailure(new ElasticsearchStatusException(
@@ -145,9 +130,19 @@ protected void masterOperation(Request request, ClusterState clusterState, Actio
145130
.indices(config.getSource().getIndex())
146131
.privileges("read")
147132
.build();
133+
String[] destPrivileges = new String[3];
134+
destPrivileges[0] = "read";
135+
destPrivileges[1] = "index";
136+
// If the destination index does not exist, we can assume that we may have to create it on start.
137+
// We should check that the creating user has the privileges to create the index.
138+
if (indexNameExpressionResolver.concreteIndexNames(clusterState,
139+
IndicesOptions.lenientExpandOpen(),
140+
config.getDestination().getIndex()).length == 0) {
141+
destPrivileges[2] = "create_index";
142+
}
148143
RoleDescriptor.IndicesPrivileges destIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
149144
.indices(config.getDestination().getIndex())
150-
.privileges("read", "index", "create_index")
145+
.privileges(destPrivileges)
151146
.build();
152147

153148
HasPrivilegesRequest privRequest = new HasPrivilegesRequest();
@@ -202,41 +197,12 @@ private void putDataFrame(DataFrameTransformConfig config, ActionListener<Respon
202197
// <5> Return the listener, or clean up destination index on failure.
203198
ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap(
204199
putTransformConfigurationResult -> listener.onResponse(new Response(true)),
205-
putTransformConfigurationException ->
206-
ClientHelper.executeAsyncWithOrigin(client,
207-
ClientHelper.DATA_FRAME_ORIGIN,
208-
DeleteIndexAction.INSTANCE,
209-
new DeleteIndexRequest(config.getDestination().getIndex()), ActionListener.wrap(
210-
deleteIndexResponse -> listener.onFailure(putTransformConfigurationException),
211-
deleteIndexException -> {
212-
String msg = "Failed to delete destination index after creating transform [" + config.getId() + "] failed";
213-
listener.onFailure(
214-
new ElasticsearchStatusException(msg,
215-
RestStatus.INTERNAL_SERVER_ERROR,
216-
putTransformConfigurationException));
217-
})
218-
)
200+
listener::onFailure
219201
);
220202

221203
// <4> Put our transform
222-
ActionListener<Boolean> createDestinationIndexListener = ActionListener.wrap(
223-
createIndexResult -> dataFrameTransformsConfigManager.putTransformConfiguration(config, putTransformConfigurationListener),
224-
createDestinationIndexException -> listener.onFailure(
225-
new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_CREATE_DEST_INDEX,
226-
createDestinationIndexException))
227-
);
228-
229-
// <3> Create the destination index
230-
ActionListener<Map<String, String>> deduceMappingsListener = ActionListener.wrap(
231-
mappings -> DataframeIndex.createDestinationIndex(client, config, mappings, createDestinationIndexListener),
232-
deduceTargetMappingsException -> listener.onFailure(
233-
new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS,
234-
deduceTargetMappingsException))
235-
);
236-
237-
// <2> Deduce our mappings for the destination index
238204
ActionListener<Boolean> pivotValidationListener = ActionListener.wrap(
239-
validationResult -> pivot.deduceMappings(client, deduceMappingsListener),
205+
validationResult -> dataFrameTransformsConfigManager.putTransformConfiguration(config, putTransformConfigurationListener),
240206
validationException -> listener.onFailure(
241207
new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION,
242208
validationException))

0 commit comments

Comments
 (0)