|
5 | 5 | */ |
6 | 6 | package org.elasticsearch.xpack.ml.integration; |
7 | 7 |
|
8 | | -import org.elasticsearch.action.DocWriteRequest; |
9 | | -import org.elasticsearch.action.bulk.BulkItemResponse; |
10 | 8 | import org.elasticsearch.action.bulk.BulkRequestBuilder; |
11 | 9 | import org.elasticsearch.action.bulk.BulkResponse; |
12 | 10 | import org.elasticsearch.action.index.IndexRequest; |
13 | 11 | import org.elasticsearch.action.support.WriteRequest; |
14 | | -import org.elasticsearch.common.Strings; |
15 | 12 | import org.elasticsearch.common.unit.TimeValue; |
16 | 13 | import org.elasticsearch.search.aggregations.AggregationBuilders; |
17 | 14 | import org.elasticsearch.search.aggregations.AggregatorFactories; |
|
25 | 22 | import org.elasticsearch.xpack.core.ml.job.config.DataDescription; |
26 | 23 | import org.elasticsearch.xpack.core.ml.job.config.Detector; |
27 | 24 | import org.elasticsearch.xpack.core.ml.job.config.Job; |
28 | | -import org.elasticsearch.xpack.core.ml.job.config.JobState; |
29 | | -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; |
30 | 25 | import org.elasticsearch.xpack.core.ml.job.results.Bucket; |
31 | 26 | import org.junit.After; |
32 | 27 |
|
33 | | -import java.time.ZonedDateTime; |
34 | | -import java.time.temporal.ChronoUnit; |
35 | 28 | import java.util.Collections; |
36 | | -import java.util.HashMap; |
37 | 29 | import java.util.List; |
38 | | -import java.util.Map; |
39 | 30 | import java.util.concurrent.TimeUnit; |
40 | 31 |
|
41 | | -import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts; |
42 | 32 | import static org.hamcrest.Matchers.equalTo; |
43 | 33 | import static org.hamcrest.Matchers.greaterThan; |
44 | 34 | import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
@@ -148,112 +138,4 @@ public void testRealtime() throws Exception { |
148 | 138 | } |
149 | 139 | } |
150 | 140 | } |
151 | | - |
152 | | - public void testLookbackOnly_WithRuntimeMapping() throws Exception { |
153 | | - String indexName = "df-data"; |
154 | | - client().admin().indices().prepareCreate(indexName) |
155 | | - .setMapping("time", "type=date", "metric", "type=double") |
156 | | - .get(); |
157 | | - |
158 | | - ZonedDateTime startOfDay = ZonedDateTime.now().toLocalDate().atStartOfDay(ZonedDateTime.now().getZone()); |
159 | | - long endTime = startOfDay.toEpochSecond() * 1000L; |
160 | | - long startTime = startOfDay.minus(2, ChronoUnit.DAYS).toEpochSecond() * 1000L; |
161 | | - long bucketSize = 3600000; |
162 | | - long numDocsPerBucket = 2L; |
163 | | - indexDocs(indexName, numDocsPerBucket, startTime, endTime, bucketSize); |
164 | | - |
165 | | - DataDescription.Builder dataDescription = new DataDescription.Builder(); |
166 | | - dataDescription.setTimeFormat("yyyy-MM-dd HH:mm:ss"); |
167 | | - |
168 | | - Detector.Builder d = new Detector.Builder("sum", "metric_sum"); |
169 | | - AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); |
170 | | - analysisConfig.setBucketSpan(TimeValue.timeValueHours(1)); |
171 | | - analysisConfig.setSummaryCountFieldName("doc_count"); |
172 | | - |
173 | | - Job.Builder jobBuilder = new Job.Builder(); |
174 | | - jobBuilder.setId("lookback-job-with-rt-fields-agg"); |
175 | | - jobBuilder.setAnalysisConfig(analysisConfig); |
176 | | - jobBuilder.setDataDescription(dataDescription); |
177 | | - |
178 | | - registerJob(jobBuilder); |
179 | | - putJob(jobBuilder); |
180 | | - openJob(jobBuilder.getId()); |
181 | | - assertBusy(() -> assertEquals(JobState.OPENED, getJobStats(jobBuilder.getId()).get(0).getState())); |
182 | | - |
183 | | - DatafeedConfig.Builder dfBuilder = new DatafeedConfig.Builder(jobBuilder.getId() + "-datafeed", jobBuilder.getId()); |
184 | | - dfBuilder.setIndices(Collections.singletonList(indexName)); |
185 | | - |
186 | | - // aggregate on a runtime field |
187 | | - AggregatorFactories.Builder aggs = new AggregatorFactories.Builder(); |
188 | | - aggs.addAggregator(AggregationBuilders.dateHistogram("time").field("time") |
189 | | - .fixedInterval(new DateHistogramInterval("1h")) |
190 | | - .subAggregation(AggregationBuilders.max("time").field("time")) |
191 | | - .subAggregation(AggregationBuilders.sum("metric_sum").field("metric_percent")) |
192 | | - ); |
193 | | - |
194 | | - dfBuilder.setParsedAggregations(aggs); |
195 | | - |
196 | | - Map<String, Object> properties = new HashMap<>(); |
197 | | - properties.put("type", "double"); |
198 | | - properties.put("script", "emit(doc['metric'].value * 100.0)"); |
199 | | - Map<String, Object> fields = new HashMap<>(); |
200 | | - fields.put("metric_percent", properties); |
201 | | - dfBuilder.setRuntimeMappings(fields); |
202 | | - |
203 | | - DatafeedConfig datafeedConfig = dfBuilder.build(); |
204 | | - |
205 | | - registerDatafeed(datafeedConfig); |
206 | | - putDatafeed(datafeedConfig); |
207 | | - |
208 | | - startDatafeed(datafeedConfig.getId(), 0L, endTime); |
209 | | - long expectedNumberOfHistoBuckets = (endTime - startTime) / bucketSize; |
210 | | - assertBusy(() -> { |
211 | | - GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId()); |
212 | | - GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet(); |
213 | | - assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED)); |
214 | | - |
215 | | - DataCounts dataCounts = getDataCounts(jobBuilder.getId()); |
216 | | - logger.info("Data counts [{}]", Strings.toString(dataCounts)); |
217 | | - assertThat(dataCounts.getProcessedRecordCount(), equalTo(expectedNumberOfHistoBuckets)); |
218 | | - assertThat(dataCounts.getBucketCount(), equalTo(expectedNumberOfHistoBuckets - 1)); |
219 | | - assertThat(dataCounts.getInputFieldCount(), equalTo(expectedNumberOfHistoBuckets * 2)); |
220 | | - assertThat(dataCounts.getMissingFieldCount(), equalTo(0L)); |
221 | | - assertThat(dataCounts.getEmptyBucketCount(), equalTo(0L)); |
222 | | - }, 60, TimeUnit.SECONDS); |
223 | | - |
224 | | - waitUntilJobIsClosed(jobBuilder.getId()); |
225 | | - } |
226 | | - |
227 | | - private void indexDocs(String index, long numDocs, long start, long end, long bucketSize) { |
228 | | - BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); |
229 | | - long numBuckets = (end - start) / bucketSize; |
230 | | - for (long i = 0; i < numBuckets; i++) { |
231 | | - for (long j = 0; j < numDocs; j++) { |
232 | | - IndexRequest indexRequest = new IndexRequest(index); |
233 | | - long timestamp = start + randomLongBetween(1, bucketSize - 1); |
234 | | - double value = randomDoubleBetween(0.0, 1.0, true); |
235 | | - indexRequest.source("time", timestamp, "metric", value).opType(DocWriteRequest.OpType.CREATE); |
236 | | - bulkRequestBuilder.add(indexRequest); |
237 | | - } |
238 | | - |
239 | | - start += bucketSize; |
240 | | - } |
241 | | - assertThat(start, equalTo(end)); |
242 | | - |
243 | | - BulkResponse bulkResponse = bulkRequestBuilder |
244 | | - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) |
245 | | - .get(); |
246 | | - |
247 | | - if (bulkResponse.hasFailures()) { |
248 | | - int failures = 0; |
249 | | - for (BulkItemResponse itemResponse : bulkResponse) { |
250 | | - if (itemResponse.isFailed()) { |
251 | | - failures++; |
252 | | - logger.error("Item response failure [{}]", itemResponse.getFailureMessage()); |
253 | | - } |
254 | | - } |
255 | | - fail("Bulk response contained " + failures + " failures"); |
256 | | - } |
257 | | - logger.info("Indexed [{}] documents", bulkRequestBuilder.numberOfActions()); |
258 | | - } |
259 | 141 | } |