Skip to content

Commit fc7ef77

Browse files
committed
Calculate feature counts using aggs, not hits
1 parent cca73d5 commit fc7ef77

File tree

4 files changed

+44
-25
lines changed

4 files changed

+44
-25
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ public final class TransformInternalIndexConstants {
3131
public static final String TRANSFORM_PREFIX_DEPRECATED = ".data-frame-";
3232

3333
// version is not a rollover pattern, however padded because sort is string based
34-
public static final Version INDEX_VERSION_LAST_CHANGED = Version.V_7_12_0;
35-
public static final String INDEX_VERSION = "006";
34+
public static final Version INDEX_VERSION_LAST_CHANGED = Version.V_7_13_0;
35+
public static final String INDEX_VERSION = "007";
3636
public static final String INDEX_PATTERN = TRANSFORM_PREFIX + "internal-";
3737
public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION;
3838
public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME;

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void testUsage() throws Exception {
5555
assertEquals(4, XContentMapValues.extractValue("transform.transforms.stopped", usageAsMap));
5656
assertEquals(3, XContentMapValues.extractValue("transform.feature_counts.pivot", usageAsMap));
5757
assertEquals(1, XContentMapValues.extractValue("transform.feature_counts.latest", usageAsMap));
58-
assertNull(XContentMapValues.extractValue("transform.feature_counts.retention_policy", usageAsMap));
58+
assertEquals(0, XContentMapValues.extractValue("transform.feature_counts.retention_policy", usageAsMap));
5959
assertEquals(1, XContentMapValues.extractValue("transform.feature_counts.sync", usageAsMap));
6060

6161
startAndWaitForTransform("test_usage", "pivot_reviews");
@@ -111,7 +111,7 @@ public void testUsage() throws Exception {
111111
assertEquals(1, XContentMapValues.extractValue("transform.transforms.started", statsMap));
112112
assertEquals(3, XContentMapValues.extractValue("transform.feature_counts.pivot", statsMap));
113113
assertEquals(1, XContentMapValues.extractValue("transform.feature_counts.latest", statsMap));
114-
assertNull(XContentMapValues.extractValue("transform.feature_counts.retention_policy", statsMap));
114+
assertEquals(0, XContentMapValues.extractValue("transform.feature_counts.retention_policy", statsMap));
115115
assertEquals(1, XContentMapValues.extractValue("transform.feature_counts.sync", statsMap));
116116
for (String statName : PROVIDED_STATS) {
117117
// the trigger count can be off: e.g. if the scheduler kicked in before usage has been called,
@@ -144,7 +144,7 @@ public void testUsage() throws Exception {
144144
assertEquals(4, XContentMapValues.extractValue("transform.transforms.stopped", usageAsMap));
145145
assertEquals(3, XContentMapValues.extractValue("transform.feature_counts.pivot", usageAsMap));
146146
assertEquals(1, XContentMapValues.extractValue("transform.feature_counts.latest", usageAsMap));
147-
assertNull(XContentMapValues.extractValue("transform.feature_counts.retention_policy", usageAsMap));
147+
assertEquals(0, XContentMapValues.extractValue("transform.feature_counts.retention_policy", usageAsMap));
148148
assertEquals(1, XContentMapValues.extractValue("transform.feature_counts.sync", usageAsMap));
149149
}
150150

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformUsageTransportAction.java

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import org.elasticsearch.index.query.QueryBuilders;
2424
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
2525
import org.elasticsearch.protocol.xpack.XPackUsageRequest;
26-
import org.elasticsearch.search.SearchHit;
27-
import org.elasticsearch.search.SearchHits;
26+
import org.elasticsearch.search.aggregations.AggregationBuilders;
27+
import org.elasticsearch.search.aggregations.Aggregations;
28+
import org.elasticsearch.search.aggregations.bucket.filter.Filters;
29+
import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregator;
2830
import org.elasticsearch.tasks.Task;
2931
import org.elasticsearch.threadpool.ThreadPool;
3032
import org.elasticsearch.transport.TransportService;
@@ -48,10 +50,14 @@
4850
import java.util.Map;
4951
import java.util.stream.Stream;
5052

53+
import static java.util.stream.Collectors.toMap;
54+
5155
public class TransformUsageTransportAction extends XPackUsageFeatureTransportAction {
5256

5357
private static final Logger logger = LogManager.getLogger(TransformUsageTransportAction.class);
5458

59+
private static final String FEATURE_COUNTS = "feature_counts";
60+
5561
/**
5662
* Features we want to measure the usage of.
5763
*
@@ -130,7 +136,7 @@ protected void masterOperation(
130136
return;
131137
}
132138
transformsCountByState.merge(TransformTaskState.STOPPED.value(), totalTransforms - taskCount, Long::sum);
133-
transformsCountByFeature.set(getFeatureCounts(transformCountSuccess.getHits()));
139+
transformsCountByFeature.set(getFeatureCounts(transformCountSuccess.getAggregations()));
134140
TransformInfoTransportAction.getStatisticSummations(client, totalStatsListener);
135141
}, transformCountFailure -> {
136142
if (transformCountFailure instanceof ResourceNotFoundException) {
@@ -145,14 +151,23 @@ protected void masterOperation(
145151
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
146152
)
147153
.setTrackTotalHits(true)
148-
// We don't need the whole configs but only fields related to features we want to measure the usage of.
149-
.setFetchSource(FEATURES, null)
154+
// We only need the total hits count and aggs.
155+
.setSize(0)
156+
.setFetchSource(false)
150157
.setQuery(
151158
QueryBuilders.constantScoreQuery(
152159
QueryBuilders.boolQuery()
153160
.filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformConfig.NAME))
154161
)
155162
)
163+
.addAggregation(
164+
AggregationBuilders.filters(
165+
FEATURE_COUNTS,
166+
Arrays.stream(FEATURES)
167+
.map(f -> new FiltersAggregator.KeyedFilter(f, QueryBuilders.existsQuery(f)))
168+
.toArray(FiltersAggregator.KeyedFilter[]::new)
169+
)
170+
)
156171
.request();
157172

158173
ClientHelper.executeAsyncWithOrigin(
@@ -167,23 +182,12 @@ protected void masterOperation(
167182
/**
168183
* Returns the feature usage map.
169184
* For each feature it counts the number of transforms using this feature.
170-
* If the feature is not used by any transform, there is no corresponding entry in the returned map.
171185
*
172-
* TODO: Should the counts be obtained using ES query/aggregation instead? Which fields need to have mappings in order to allow that?
173-
*
174-
* @param hits hits returned by the search
186+
* @param aggs aggs returned by the search
175187
* @return feature usage map
176188
*/
177-
private static Map<String, Long> getFeatureCounts(SearchHits hits) {
178-
Map<String, Long> transformsCountByFeature = new HashMap<>();
179-
for (SearchHit hit : hits) {
180-
Map<String, Object> source = hit.getSourceAsMap();
181-
for (String feature : FEATURES) {
182-
if (source.containsKey(feature)) {
183-
transformsCountByFeature.merge(feature, 1L, Long::sum);
184-
}
185-
}
186-
}
187-
return transformsCountByFeature;
189+
private static Map<String, Long> getFeatureCounts(Aggregations aggs) {
190+
Filters filters = aggs.get(FEATURE_COUNTS);
191+
return filters.getBuckets().stream().collect(toMap(Filters.Bucket::getKeyAsString, Filters.Bucket::getDocCount));
188192
}
189193
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
3535
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
3636
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
37+
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
3738
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
3839
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
3940
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
@@ -65,6 +66,7 @@ public final class TransformInternalIndex {
6566
* checkpoint::checkpoint
6667
* version 5 (7.7): stats::processing_time_in_ms, stats::processing_total
6768
* version 6 (7.12):stats::delete_time_in_ms, stats::documents_deleted
69+
* version 7 (7.13): add mappings for config::pivot, config::latest, config::retention_policy and config::sync
6870
*/
6971

7072
// constants for mappings
@@ -84,6 +86,7 @@ public final class TransformInternalIndex {
8486
public static final String LONG = "long";
8587
public static final String KEYWORD = "keyword";
8688
public static final String BOOLEAN = "boolean";
89+
public static final String FLATTENED = "flattened";
8790

8891
public static SystemIndexDescriptor getSystemIndexDescriptor() throws IOException {
8992
return SystemIndexDescriptor.builder()
@@ -319,6 +322,18 @@ public static XContentBuilder addTransformsConfigMappings(XContentBuilder builde
319322
.endObject()
320323
.startObject(TransformField.CREATE_TIME.getPreferredName())
321324
.field(TYPE, DATE)
325+
.endObject()
326+
.startObject(TransformConfig.Function.PIVOT.getParseField().getPreferredName())
327+
.field(TYPE, FLATTENED)
328+
.endObject()
329+
.startObject(TransformConfig.Function.LATEST.getParseField().getPreferredName())
330+
.field(TYPE, FLATTENED)
331+
.endObject()
332+
.startObject(TransformField.RETENTION_POLICY.getPreferredName())
333+
.field(TYPE, FLATTENED)
334+
.endObject()
335+
.startObject(TransformField.SYNC.getPreferredName())
336+
.field(TYPE, FLATTENED)
322337
.endObject();
323338
}
324339

0 commit comments

Comments
 (0)