diff --git a/docs/reference/search/terms-enum.asciidoc b/docs/reference/search/terms-enum.asciidoc new file mode 100644 index 0000000000000..93b2a9b356b7d --- /dev/null +++ b/docs/reference/search/terms-enum.asciidoc @@ -0,0 +1,97 @@ +[[search-terms-enum]] +=== Terms enum API + +The terms enum API can be used to discover terms in the index that match +a partial string. This is used for auto-complete: + +[source,console] +-------------------------------------------------- +POST stackoverflow/_terms_enum +{ + "field" : "tags", + "string" : "kiba" +} +-------------------------------------------------- +// TEST[setup:stackoverflow] + + +The API returns the following response: + +[source,console-result] +-------------------------------------------------- +{ + "_shards": { + "total": 1, + "successful": 1, + "failed": 0 + }, + "terms": [ + "kibana" + ], + "complete" : true +} +-------------------------------------------------- + +The `complete` flag is false if time or space limits were reached and the +set of terms examined was not the full set of available values. + +[[search-terms-enum-api-request]] +==== {api-request-title} + +`GET /<target>/_terms_enum` + + +[[search-terms-enum-api-desc]] +==== {api-description-title} + +The terms enum API can be used to discover terms in the index that begin with the provided +string. It is designed for low-latency look-ups used in auto-complete scenarios. + + +[[search-terms-enum-api-path-params]] +==== {api-path-parms-title} + +`<target>`:: +(Mandatory, string) +Comma-separated list of data streams, indices, and index aliases to search. +Wildcard (`*`) expressions are supported. ++ +To search all data streams or indices in a cluster, omit this parameter or use +`_all` or `*`. 
+ +[[search-terms-enum-api-request-body]] +==== {api-request-body-title} + +[[terms-enum-field-param]] +`field`:: +(Mandatory, string) +Which field to match + +[[terms-enum-string-param]] +`string`:: +(Mandatory, string) +The string to match at the start of indexed terms + +[[terms-enum-size-param]] +`size`:: +(Optional, integer) +How many matching terms to return. Defaults to 10 + +[[terms-enum-timeout-param]] +`timeout`:: +(Optional, <<time-units,time value>>) +The maximum length of time to spend collecting results. Defaults to "1s" (one second). +If the timeout is exceeded the `complete` flag is set to false in the response and the results may +be partial or empty. + +[[terms-enum-case_insensitive-param]] +`case_insensitive`:: +(Optional, boolean) +When true the provided search string is matched against index terms without case sensitivity. +Defaults to false. + +[[terms-enum-index_filter-param]] +`index_filter`:: +(Optional, <<query-dsl,query object>>) Allows an index shard to be filtered out if the provided +query rewrites to `match_none`. + diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/termsenum.json b/rest-api-spec/src/main/resources/rest-api-spec/api/termsenum.json new file mode 100644 index 0000000000000..ea99cdec0ad19 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/termsenum.json @@ -0,0 +1,35 @@ +{ + "termsenum":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/terms-enum.html", + "description": "The terms enum API can be used to discover terms in the index that begin with the provided string. It is designed for low-latency look-ups used in auto-complete scenarios." 
+ }, + "stability":"beta", + "visibility":"public", + "headers":{ + "accept": [ "application/json"], + "content_type": ["application/json"] + }, + "url":{ + "paths":[ + { + "path": "/{index}/_terms_enum", + "methods": [ + "GET", + "POST" + ], + "parts": { + "index": { + "type": "list", + "description": "A comma-separated list of index names to search; use `_all` or empty string to perform the operation on all indices" + } + } + } + ] + }, + "params":{}, + "body":{ + "description":"field name, string which is the prefix expected in matching terms, timeout and size for max number of results" + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java index 1842d5afc0387..746c9700a8125 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/KeywordFieldMapper.java @@ -17,8 +17,18 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.MultiTermQuery; import org.apache.lucene.search.Query; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.MultiTerms; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.automaton.Automata; +import org.apache.lucene.util.automaton.Automaton; +import org.apache.lucene.util.automaton.CompiledAutomaton; +import org.apache.lucene.util.automaton.MinimizationOperations; +import org.apache.lucene.util.automaton.Operations; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.search.AutomatonQueries; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.analysis.IndexAnalyzers; import org.elasticsearch.index.analysis.NamedAnalyzer; @@ -248,6 +258,25 @@ public KeywordFieldType(String name, NamedAnalyzer analyzer) { 
this.scriptValues = null; } + @Override + public TermsEnum getTerms(boolean caseInsensitive, String string, SearchExecutionContext queryShardContext) throws IOException { + IndexReader reader = queryShardContext.searcher().getTopReaderContext().reader(); + + Terms terms = MultiTerms.getTerms(reader, name()); + if (terms == null) { + // Field does not exist on this shard. + return null; + } + Automaton a = caseInsensitive + ? AutomatonQueries.caseInsensitivePrefix(string) + : Automata.makeString(string); + a = Operations.concatenate(a, Automata.makeAnyString()); + a = MinimizationOperations.minimize(a, Integer.MAX_VALUE); + + CompiledAutomaton automaton = new CompiledAutomaton(a); + return automaton.getTermsEnum(terms); + } + @Override public String typeName() { return CONTENT_TYPE; @@ -470,4 +499,6 @@ protected String contentType() { public FieldMapper.Builder getMergeBuilder() { return new Builder(simpleName(), indexAnalyzers, scriptCompiler).init(this); } + + } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java b/server/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java index e7bee7de8aac6..861fd0ac24971 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/MappedFieldType.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.PrefixCodedTerms; import org.apache.lucene.index.PrefixCodedTerms.TermIterator; import org.apache.lucene.index.Term; +import org.apache.lucene.index.TermsEnum; import org.apache.lucene.queries.intervals.IntervalsSource; import org.apache.lucene.search.BooleanClause.Occur; import org.apache.lucene.search.BooleanQuery; @@ -429,4 +430,20 @@ public enum CollapseType { KEYWORD, NUMERIC } + + /** + * This method is used to support auto-complete services and implementations + * are expected to find terms beginning with the provided string very quickly. 
+ * If fields cannot look up matching terms quickly they should return null. + * The returned TermsEnum should implement next(), term() and docFreq() methods + * but postings etc are not required. + * @param caseInsensitive if matches should be case insensitive + * @param string the partially complete word the user has typed (can be empty) + * @param queryShardContext the shard context + * @return null or an enumeration of matching terms and their doc frequencies + * @throws IOException Errors accessing data + */ + public TermsEnum getTerms(boolean caseInsensitive, String string, SearchExecutionContext queryShardContext) throws IOException { + return null; + } } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/flattened/FlattenedFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/flattened/FlattenedFieldMapper.java index 5c132b2eaae09..26b4af9300e3f 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/flattened/FlattenedFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/flattened/FlattenedFieldMapper.java @@ -9,14 +9,27 @@ package org.elasticsearch.index.mapper.flattened; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.ImpactsEnum; +import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.MultiTerms; import org.apache.lucene.index.OrdinalMap; +import org.apache.lucene.index.PostingsEnum; import org.apache.lucene.index.Term; +import org.apache.lucene.index.TermState; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.MultiTermQuery; import org.apache.lucene.search.PrefixQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.SortField; +import org.apache.lucene.util.AttributeSource; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.automaton.Automata; +import 
org.apache.lucene.util.automaton.Automaton; +import org.apache.lucene.util.automaton.CompiledAutomaton; +import org.apache.lucene.util.automaton.MinimizationOperations; +import org.apache.lucene.util.automaton.Operations; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.search.AutomatonQueries; import org.elasticsearch.common.unit.Fuzziness; @@ -241,6 +254,29 @@ public Query wildcardQuery(String value, public Query termQueryCaseInsensitive(Object value, SearchExecutionContext context) { return AutomatonQueries.caseInsensitiveTermQuery(new Term(name(), indexedValueForSearch(value))); } + + @Override + public TermsEnum getTerms(boolean caseInsensitive, String string, SearchExecutionContext queryShardContext) throws IOException { + IndexReader reader = queryShardContext.searcher().getTopReaderContext().reader(); + Terms terms = MultiTerms.getTerms(reader, name()); + if (terms == null) { + // Field does not exist on this shard. + return null; + } + + Automaton a = Automata.makeString(key + FlattenedFieldParser.SEPARATOR); + if (caseInsensitive) { + a = Operations.concatenate(a, AutomatonQueries.caseInsensitivePrefix(string)); + } else { + a = Operations.concatenate(a, Automata.makeString(string)); + a = Operations.concatenate(a, Automata.makeAnyString()); + } + a = MinimizationOperations.minimize(a, Integer.MAX_VALUE); + + CompiledAutomaton automaton = new CompiledAutomaton(a); + // Wrap result in a class that strips field names from discovered terms + return new TranslatingTermsEnum(automaton.getTermsEnum(terms)); + } @Override public BytesRef indexedValueForSearch(Object value) { @@ -270,6 +306,95 @@ public ValueFetcher valueFetcher(SearchExecutionContext context, String format) return SourceValueFetcher.identity(rootName + "." 
+ key, context, format); } } + + + // Wraps a raw Lucene TermsEnum to strip values of fieldnames + static class TranslatingTermsEnum extends TermsEnum { + TermsEnum delegate; + + TranslatingTermsEnum(TermsEnum delegate) { + this.delegate = delegate; + } + + @Override + public BytesRef next() throws IOException { + // Strip the term of the fieldname value + BytesRef result = delegate.next(); + if (result != null) { + result = FlattenedFieldParser.extractValue(result); + } + return result; + } + + @Override + public BytesRef term() throws IOException { + // Strip the term of the fieldname value + BytesRef result = delegate.term(); + if (result != null) { + result = FlattenedFieldParser.extractValue(result); + } + return result; + } + + + @Override + public int docFreq() throws IOException { + return delegate.docFreq(); + } + + //=============== All other TermsEnum methods not supported ================= + + @Override + public AttributeSource attributes() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean seekExact(BytesRef text) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public SeekStatus seekCeil(BytesRef text) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void seekExact(long ord) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void seekExact(BytesRef term, TermState state) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long ord() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long totalTermFreq() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ImpactsEnum impacts(int flags) throws IOException { + throw new 
UnsupportedOperationException(); + } + + @Override + public TermState termState() throws IOException { + throw new UnsupportedOperationException(); + } + + } /** * A field data implementation that gives access to the values associated with diff --git a/server/src/main/java/org/elasticsearch/index/mapper/flattened/FlattenedFieldParser.java b/server/src/main/java/org/elasticsearch/index/mapper/flattened/FlattenedFieldParser.java index 922d45ca7cf88..e9aaf7cc9c91b 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/flattened/FlattenedFieldParser.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/flattened/FlattenedFieldParser.java @@ -166,4 +166,25 @@ static BytesRef extractKey(BytesRef keyedValue) { } return new BytesRef(keyedValue.bytes, keyedValue.offset, length); } + + /** + * Returns the value part of a keyed term, i.e. the bytes after the first separator byte. + * The result shares the backing array of {@code keyedValue}; nothing is copied. + * NOTE(review): a term containing no separator byte produces a negative length here; + * confirm that callers only ever pass keyed terms. + * + * @param keyedValue term to split; its {@code offset} may be non-zero + * @return the bytes following the first separator byte + */ + static BytesRef extractValue(BytesRef keyedValue) { + int length; + for (length = 0; length < keyedValue.length; length++){ + if (keyedValue.bytes[keyedValue.offset + length] == SEPARATOR_BYTE) { + break; + } + } + // length counts bytes from keyedValue.offset, so the value length must not subtract the offset + int valueStart = keyedValue.offset + length + 1; + return new BytesRef(keyedValue.bytes, valueStart, keyedValue.length - length - 1); + } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 4c6a2302e3b7d..bdc011de774bd 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -62,6 +62,7 @@ public static class Names { public static final String ANALYZE = "analyze"; public static final String WRITE = "write"; public static final String SEARCH = "search"; + public static final String AUTO_COMPLETE = "auto_complete"; public static final String SEARCH_THROTTLED = "search_throttled"; public static final String MANAGEMENT = "management"; public static final String FLUSH = "flush"; @@ -177,6 +178,10 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... 
customBui builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000, false)); builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16, false)); builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(allocatedProcessors), 1000, true)); + builders.put( + Names.AUTO_COMPLETE, + new FixedExecutorBuilder(settings, Names.AUTO_COMPLETE, Math.max(allocatedProcessors / 4, 1), 100, true) + ); builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, true)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, boundedBy(allocatedProcessors, 1, 5), TimeValue.timeValueMinutes(5))); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index 9aa531a70d136..4d3712f324df0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -200,6 +200,7 @@ import org.elasticsearch.xpack.core.spatial.SpatialFeatureSetUsage; import org.elasticsearch.xpack.core.sql.SqlFeatureSetUsage; import org.elasticsearch.xpack.core.ssl.action.GetCertificateInfoAction; +import org.elasticsearch.xpack.core.termsenum.action.TermsEnumAction; import org.elasticsearch.xpack.core.textstructure.action.FindStructureAction; import org.elasticsearch.xpack.core.transform.TransformFeatureSetUsage; import org.elasticsearch.xpack.core.transform.TransformField; @@ -413,7 +414,9 @@ public List> getClientActions() { GetAsyncSearchAction.INSTANCE, DeleteAsyncResultAction.INSTANCE, // Text Structure - FindStructureAction.INSTANCE + FindStructureAction.INSTANCE, + // Terms enum API + TermsEnumAction.INSTANCE )); // rollupV2 diff --git 
a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java index ee1132c524bef..e990b29845b18 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java @@ -83,6 +83,9 @@ import org.elasticsearch.xpack.core.ssl.SSLConfigurationReloader; import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.core.transform.TransformMetadata; +import org.elasticsearch.xpack.core.termsenum.action.TermsEnumAction; +import org.elasticsearch.xpack.core.termsenum.action.TransportTermsEnumAction; +import org.elasticsearch.xpack.core.termsenum.rest.RestTermsEnumAction; import org.elasticsearch.xpack.core.watcher.WatcherMetadata; import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants; @@ -291,6 +294,7 @@ public Collection createComponents(Client client, ClusterService cluster actions.add(new ActionHandler<>(XPackUsageAction.INSTANCE, getUsageAction())); actions.addAll(licensing.getActions()); actions.add(new ActionHandler<>(ReloadAnalyzerAction.INSTANCE, TransportReloadAnalyzersAction.class)); + actions.add(new ActionHandler<>(TermsEnumAction.INSTANCE, TransportTermsEnumAction.class)); actions.add(new ActionHandler<>(DeleteAsyncResultAction.INSTANCE, TransportDeleteAsyncResultAction.class)); actions.add(new ActionHandler<>(XPackInfoFeatureAction.DATA_TIERS, DataTiersInfoTransportAction.class)); actions.add(new ActionHandler<>(XPackUsageFeatureAction.DATA_TIERS, DataTiersUsageTransportAction.class)); @@ -330,6 +334,7 @@ public List getRestHandlers(Settings settings, RestController restC handlers.add(new RestXPackInfoAction()); handlers.add(new RestXPackUsageAction()); handlers.add(new RestReloadAnalyzersAction()); + handlers.add(new RestTermsEnumAction()); handlers.addAll(licensing.getRestHandlers(settings, restController, 
clusterSettings, indexScopedSettings, settingsFilter, indexNameExpressionResolver, nodesInCluster)); return handlers; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/MultiShardTermsEnum.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/MultiShardTermsEnum.java new file mode 100644 index 0000000000000..2a7794df124e6 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/MultiShardTermsEnum.java @@ -0,0 +1,158 @@ +/* @notice + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.elasticsearch.xpack.core.termsenum.action; + +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.PriorityQueue; + +import java.io.IOException; + +/** + * Merges terms and stats from multiple TermEnum classes + * This does a merge sort, by term text. + * Adapted from Lucene's MultiTermsEnum and differs in that: + * 1) Only next(), term() and docFreq() methods are supported + * 2) Doc counts are longs not ints. 
+ * + */ +public final class MultiShardTermsEnum { + + private final TermMergeQueue queue; + private final TermsEnumWithCurrent[] top; + + private int numTop; + private BytesRef current; + + /** Sole constructor. + * @param enums TermsEnums from shards which we should merge + * @throws IOException Errors accessing data + **/ + public MultiShardTermsEnum(TermsEnum[] enums) throws IOException { + queue = new TermMergeQueue(enums.length); + top = new TermsEnumWithCurrent[enums.length]; + numTop = 0; + queue.clear(); + for (int i = 0; i < enums.length; i++) { + final TermsEnum termsEnum = enums[i]; + final BytesRef term = termsEnum.next(); + if (term != null) { + final TermsEnumWithCurrent entry = new TermsEnumWithCurrent(); + entry.current = term; + entry.terms = termsEnum; + queue.add(entry); + } else { + // field has no terms + } + } + } + + public BytesRef term() { + return current; + } + + private void pullTop() { + assert numTop == 0; + numTop = queue.fillTop(top); + current = top[0].current; + } + + private void pushTop() throws IOException { + // call next() on each top, and reorder queue + for (int i = 0; i < numTop; i++) { + TermsEnumWithCurrent top = queue.top(); + top.current = top.terms.next(); + if (top.current == null) { + queue.pop(); + } else { + queue.updateTop(); + } + } + numTop = 0; + } + + public BytesRef next() throws IOException { + pushTop(); + // gather equal top fields + if (queue.size() > 0) { + // TODO: we could maybe defer this somewhat costly operation until one of the APIs that + // needs to see the top is invoked (docFreq, postings, etc.) 
+ pullTop(); + } else { + current = null; + } + + return current; + } + + public long docFreq() throws IOException { + long sum = 0; + for (int i = 0; i < numTop; i++) { + sum += top[i].terms.docFreq(); + } + return sum; + } + + static final class TermsEnumWithCurrent { + TermsEnum terms; + public BytesRef current; + } + + private static final class TermMergeQueue extends PriorityQueue { + final int[] stack; + + TermMergeQueue(int size) { + super(size); + this.stack = new int[size]; + } + + @Override + protected boolean lessThan(TermsEnumWithCurrent termsA, TermsEnumWithCurrent termsB) { + return termsA.current.compareTo(termsB.current) < 0; + } + + /** Add the {@link #top()} slice as well as all slices that are positioned + * on the same term to {@code tops} and return how many of them there are. */ + int fillTop(TermsEnumWithCurrent[] tops) { + final int size = size(); + if (size == 0) { + return 0; + } + tops[0] = top(); + int numTop = 1; + stack[0] = 1; + int stackLen = 1; + + while (stackLen != 0) { + final int index = stack[--stackLen]; + final int leftChild = index << 1; + for (int child = leftChild, end = Math.min(size, leftChild + 1); child <= end; ++child) { + TermsEnumWithCurrent te = get(child); + if (te.current.equals(tops[0].current)) { + tops[numTop++] = te; + stack[stackLen++] = child; + } + } + } + return numTop; + } + + private TermsEnumWithCurrent get(int i) { + return (TermsEnumWithCurrent) getHeapArray()[i]; + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumRequest.java new file mode 100644 index 0000000000000..2dd1b9776ac00 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumRequest.java @@ -0,0 +1,160 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.core.termsenum.action; + +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.transport.TransportRequest; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Internal terms enum request executed directly against a specific node, querying potentially many + * shards in one request + */ +public class NodeTermsEnumRequest extends TransportRequest implements IndicesRequest { + + private String field; + private String string; + private long taskStartedTimeMillis; + private long nodeStartedTimeMillis; + private boolean caseInsensitive; + private int size; + private long timeout; + private final QueryBuilder indexFilter; + private Set shardIds; + private String nodeId; + + + public NodeTermsEnumRequest(StreamInput in) throws IOException { + super(in); + field = in.readString(); + string = in.readString(); + caseInsensitive = in.readBoolean(); + size = in.readVInt(); + timeout = in.readVLong(); + taskStartedTimeMillis = in.readVLong(); + indexFilter = in.readOptionalNamedWriteable(QueryBuilder.class); + nodeId = in.readString(); + int numShards = in.readVInt(); + shardIds = new HashSet<>(numShards); + for (int i = 0; i < numShards; i++) { + shardIds.add(new ShardId(in)); + } + } + + public NodeTermsEnumRequest(final String nodeId, final Set shardIds, TermsEnumRequest request) { + this.field = request.field(); + this.string = request.string(); + this.caseInsensitive = request.caseInsensitive(); + this.size 
= request.size(); + this.timeout = request.timeout().getMillis(); + this.taskStartedTimeMillis = request.taskStartTimeMillis; + this.indexFilter = request.indexFilter(); + this.nodeId = nodeId; + this.shardIds = shardIds; + + // TODO serialize shard ids + } + + public String field() { + return field; + } + + public String string() { + return string; + } + + public long taskStartedTimeMillis() { + return this.taskStartedTimeMillis; + } + + /** + * The time this request was materialized on a node + */ + long nodeStartedTimeMillis() { + // In case startTimerOnDataNode has not been called (should never happen in normal circumstances?) + if (nodeStartedTimeMillis == 0) { + nodeStartedTimeMillis = System.currentTimeMillis(); + } + return this.nodeStartedTimeMillis; + } + + public void startTimerOnDataNode() { + nodeStartedTimeMillis = System.currentTimeMillis(); + } + + public Set shardIds() { + return Collections.unmodifiableSet(shardIds); + } + + public boolean caseInsensitive() { + return caseInsensitive; + } + + public int size() { + return size; + } + + public long timeout() { + return timeout; + } + public String nodeId() { + return nodeId; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(field); + out.writeString(string); + out.writeBoolean(caseInsensitive); + out.writeVInt(size); + // Adjust the amount of permitted time the shard has remaining to gather terms. + long timeSpentSoFarInCoordinatingNode = System.currentTimeMillis() - taskStartedTimeMillis; + long remainingTimeForShardToUse = (timeout - timeSpentSoFarInCoordinatingNode); + // TODO - if already timed out can we shortcut the trip somehow? Throw exception if remaining time < 0? 
+ out.writeVLong(remainingTimeForShardToUse); + out.writeVLong(taskStartedTimeMillis); + out.writeOptionalNamedWriteable(indexFilter); + out.writeString(nodeId); + out.writeVInt(shardIds.size()); + for (ShardId shardId : shardIds) { + shardId.writeTo(out); + } + } + + public QueryBuilder indexFilter() { + return indexFilter; + } + + @Override + public String[] indices() { + HashSet indicesNames = new HashSet<>(); + for (ShardId shardId : shardIds) { + indicesNames.add(shardId.getIndexName()); + } + return indicesNames.toArray(new String[0]); + } + + @Override + public IndicesOptions indicesOptions() { + return null; + } + + public boolean remove(ShardId shardId) { + return shardIds.remove(shardId); + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumResponse.java new file mode 100644 index 0000000000000..728a6504c0eb3 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumResponse.java @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.core.termsenum.action; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.List; + +/** + * Internal response of a terms enum request executed directly against a specific shard. 
+ * + * + */ +class NodeTermsEnumResponse extends TransportResponse { + + private String error; + private boolean complete; + + private List terms; + private String nodeId; + + NodeTermsEnumResponse(StreamInput in) throws IOException { + super(in); + terms = in.readList(TermCount::new); + error = in.readOptionalString(); + complete = in.readBoolean(); + nodeId = in.readString(); + } + + NodeTermsEnumResponse(String nodeId, List terms, String error, boolean complete) { + this.nodeId = nodeId; + this.terms = terms; + this.error = error; + this.complete = complete; + } + + public List terms() { + return this.terms; + } + + public String getError() { + return error; + } + + public String getNodeId() { + return nodeId; + } + + public boolean getComplete() { + return complete; + } + + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeCollection(terms); + out.writeOptionalString(error); + out.writeBoolean(complete); + out.writeString(nodeId); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/SimpleTermCountEnum.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/SimpleTermCountEnum.java new file mode 100644 index 0000000000000..0f75dd39a0a1e --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/SimpleTermCountEnum.java @@ -0,0 +1,121 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ +package org.elasticsearch.xpack.core.termsenum.action; + +import org.apache.lucene.index.ImpactsEnum; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.TermState; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.AttributeSource; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.index.mapper.MappedFieldType; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; + +/** + * A utility class for fields that need to support autocomplete via + * {@link MappedFieldType#getTerms(boolean, String, org.elasticsearch.index.query.SearchExecutionContext)} + * but can't return a raw Lucene TermsEnum. + */ +public class SimpleTermCountEnum extends TermsEnum { + int index =-1; + TermCount[] sortedTerms; + TermCount current = null; + + public SimpleTermCountEnum(TermCount[] terms) { + sortedTerms = Arrays.copyOf(terms, terms.length); + Arrays.sort(sortedTerms, Comparator.comparing(TermCount::getTerm)); + } + + public SimpleTermCountEnum(TermCount termCount) { + sortedTerms = new TermCount[1]; + sortedTerms[0] = termCount; + } + + @Override + public BytesRef term() throws IOException { + if (current == null) { + return null; + } + return new BytesRef(current.getTerm()); + } + + @Override + public BytesRef next() throws IOException { + index++; + if (index >= sortedTerms.length) { + current = null; + } else { + current = sortedTerms[index]; + } + return term(); + } + + @Override + public int docFreq() throws IOException { + if (current == null) { + return 0; + } + return (int) current.getDocCount(); + } + + + //=============== All other TermsEnum methods not supported ================= + + @Override + public AttributeSource attributes() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean seekExact(BytesRef text) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public SeekStatus seekCeil(BytesRef text) throws 
IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void seekExact(long ord) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void seekExact(BytesRef term, TermState state) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long ord() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long totalTermFreq() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ImpactsEnum impacts(int flags) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public TermState termState() throws IOException { + throw new UnsupportedOperationException(); + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermCount.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermCount.java new file mode 100644 index 0000000000000..9e16d3b8fa8d1 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermCount.java @@ -0,0 +1,99 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ +package org.elasticsearch.xpack.core.termsenum.action; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; + +public class TermCount implements Writeable, ToXContentFragment { + + public static final String TERM_FIELD = "term"; + public static final String DOC_COUNT_FIELD = "doc_count"; + + static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "term_count", + true, + a -> { return new TermCount((String) a[0], (long) a[1]); } + ); + static { + PARSER.declareString(constructorArg(), new ParseField(TERM_FIELD)); + PARSER.declareLong(constructorArg(), new ParseField(DOC_COUNT_FIELD)); + } + + private final String term; + + private long docCount; + + public TermCount(StreamInput in) throws IOException { + term = in.readString(); + docCount = in.readLong(); + } + + public TermCount(String term, long count) { + this.term = term; + this.docCount = count; + } + + public String getTerm() { + return this.term; + } + + public long getDocCount() { + return this.docCount; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(term); + out.writeLong(docCount); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(TERM_FIELD, getTerm()); + builder.field(DOC_COUNT_FIELD, getDocCount()); + return builder; + } + + public static TermCount fromXContent(XContentParser parser) { + return 
PARSER.apply(parser, null); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TermCount other = (TermCount) o; + return Objects.equals(getTerm(), other.getTerm()) && Objects.equals(getDocCount(), other.getDocCount()); + } + + @Override + public int hashCode() { + return Objects.hash(getTerm(), getDocCount()); + } + + void addToDocCount(long extra) { + docCount += extra; + } + + @Override + public String toString() { + return term + ":" + docCount; + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumAction.java new file mode 100644 index 0000000000000..acfd46676fb51 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumAction.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ +package org.elasticsearch.xpack.core.termsenum.action; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; + +import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; + +public class TermsEnumAction extends ActionType { + + public static final TermsEnumAction INSTANCE = new TermsEnumAction(); + public static final String NAME = "indices:data/read/xpack/termsenum/list"; + + + static final ParseField INDEX_FILTER = new ParseField("index_filter"); + static final ParseField TIMEOUT = new ParseField("timeout"); + + private TermsEnumAction() { + super(NAME, TermsEnumResponse::new); + } + + public static TermsEnumRequest fromXContent(XContentParser parser, String... indices) throws IOException { + TermsEnumRequest request = new TermsEnumRequest(indices); + PARSER.parse(parser, request, null); + return request; + } + + private static final ObjectParser PARSER = new ObjectParser<>("terms_enum_request"); + static { + PARSER.declareString(TermsEnumRequest::field, new ParseField("field")); + PARSER.declareString(TermsEnumRequest::string, new ParseField("string")); + PARSER.declareInt(TermsEnumRequest::size, new ParseField("size")); + PARSER.declareBoolean(TermsEnumRequest::caseInsensitive, new ParseField("case_insensitive")); + PARSER.declareField(TermsEnumRequest::timeout, + (p, c) -> TimeValue.parseTimeValue(p.text(), TIMEOUT.getPreferredName()), + TIMEOUT, ObjectParser.ValueType.STRING); + PARSER.declareObject(TermsEnumRequest::indexFilter, (p, context) -> parseInnerQueryBuilder(p),INDEX_FILTER); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumRequest.java 
b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumRequest.java new file mode 100644 index 0000000000000..159277349a946 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumRequest.java @@ -0,0 +1,176 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.core.termsenum.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.broadcast.BroadcastRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryBuilder; + +import java.io.IOException; +import java.util.Arrays; + +/** + * A request to gather terms for a given field matching a string prefix + */ +public class TermsEnumRequest extends BroadcastRequest implements ToXContentObject { + + public static int DEFAULT_SIZE = 10; + public static TimeValue DEFAULT_TIMEOUT = new TimeValue(1000); + + private String field; + private String string; + private int size = DEFAULT_SIZE; + private boolean caseInsensitive; + long taskStartTimeMillis; + private QueryBuilder indexFilter; + + public TermsEnumRequest() { + this(Strings.EMPTY_ARRAY); + } + + public TermsEnumRequest(StreamInput in) throws IOException { + super(in); + field = in.readString(); + string = in.readString(); + caseInsensitive = in.readBoolean(); + size = 
in.readVInt(); + indexFilter = in.readOptionalNamedWriteable(QueryBuilder.class); + } + + /** + * Constructs a new term enum request against the provided indices. No indices provided means it will + * run against all indices. + */ + public TermsEnumRequest(String... indices) { + super(indices); + indicesOptions(IndicesOptions.fromOptions(false, false, true, false)); + timeout(DEFAULT_TIMEOUT); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = super.validate(); + if (field == null) { + validationException = ValidateActions.addValidationError("field cannot be null", validationException); + } + if (timeout() == null) { + validationException = ValidateActions.addValidationError("Timeout cannot be null", validationException); + } else { + if (timeout().getSeconds() > 60) { + validationException = ValidateActions.addValidationError("Timeout cannot be > 1 minute", + validationException); + } + } + return validationException; + } + + /** + * The field to look inside for values + */ + public void field(String field) { + this.field = field; + } + + /** + * Indicates if detailed information about query is requested + */ + public String field() { + return field; + } + + /** + * The string required in matching field values + */ + public void string(String string) { + this.string = string; + } + + /** + * The string required in matching field values + */ + public String string() { + return string; + } + + /** + * The number of terms to return + */ + public int size() { + return size; + } + + /** + * The number of terms to return + */ + public void size(int size) { + this.size = size; + } + + /** + * If case insensitive matching is required + */ + public void caseInsensitive(boolean caseInsensitive) { + this.caseInsensitive = caseInsensitive; + } + + /** + * If case insensitive matching is required + */ + public boolean caseInsensitive() { + return caseInsensitive; + } + + /** + * Allows to filter 
shards if the provided {@link QueryBuilder} rewrites to `match_none`. + */ + public void indexFilter(QueryBuilder indexFilter) { + this.indexFilter = indexFilter; + } + + public QueryBuilder indexFilter() { + return indexFilter; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(field); + out.writeString(string); + out.writeBoolean(caseInsensitive); + out.writeVInt(size); + out.writeOptionalNamedWriteable(indexFilter); + } + + @Override + public String toString() { + return "[" + Arrays.toString(indices) + "] field[" + field + "], string[" + string + "] " + " size=" + size + " timeout=" + + timeout().getMillis() + " case_insensitive=" + + caseInsensitive + " indexFilter = "+ indexFilter; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("field", field); + builder.field("string", string); + builder.field("size", size); + builder.field("timeout", timeout().getMillis()); + builder.field("case_insensitive", caseInsensitive); + if (indexFilter != null) { + builder.field("index_filter", indexFilter); + } + return builder.endObject(); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumRequestBuilder.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumRequestBuilder.java new file mode 100644 index 0000000000000..98c0baf1a5ea8 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumRequestBuilder.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ +package org.elasticsearch.xpack.core.termsenum.action; + +import org.elasticsearch.action.support.broadcast.BroadcastOperationRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +public class TermsEnumRequestBuilder extends BroadcastOperationRequestBuilder< + TermsEnumRequest, + TermsEnumResponse, + TermsEnumRequestBuilder> { + + public TermsEnumRequestBuilder(ElasticsearchClient client, TermsEnumAction action) { + super(client, action, new TermsEnumRequest()); + } + + public TermsEnumRequestBuilder setField(String field) { + request.field(field); + return this; + } + + public TermsEnumRequestBuilder setString(String string) { + request.string(string); + return this; + } + + public TermsEnumRequestBuilder setSize(int size) { + request.size(size); + return this; + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumResponse.java new file mode 100644 index 0000000000000..cb7de89ccaea6 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TermsEnumResponse.java @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ +package org.elasticsearch.xpack.core.termsenum.action; + +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; + +/** + * The response of the _terms_enum action. + */ +public class TermsEnumResponse extends BroadcastResponse { + + public static final String TERMS_FIELD = "terms"; + public static final String COMPLETE_FIELD = "complete"; + + @SuppressWarnings("unchecked") + static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "term_enum_results", + true, + arg -> { + BroadcastResponse response = (BroadcastResponse) arg[0]; + return new TermsEnumResponse( + (List) arg[1], + response.getTotalShards(), + response.getSuccessfulShards(), + response.getFailedShards(), + Arrays.asList(response.getShardFailures()), + (Boolean) arg[2] + ); + } + ); + static { + declareBroadcastFields(PARSER); + PARSER.declareStringArray(optionalConstructorArg(), new ParseField(TERMS_FIELD)); + PARSER.declareBoolean(optionalConstructorArg(), new ParseField(COMPLETE_FIELD)); + } + + private final List terms; + + private boolean complete; + + TermsEnumResponse(StreamInput in) throws IOException { + super(in); + terms = in.readStringList(); + complete = in.readBoolean(); + } + + public TermsEnumResponse( + List terms, + int totalShards, + int successfulShards, + int failedShards, + List shardFailures, boolean complete + ) { 
+ super(totalShards, successfulShards, failedShards, shardFailures); + this.terms = terms == null ? Collections.emptyList() : terms; + this.complete = complete; + } + + /** + * The list of terms. + */ + public List getTerms() { + return terms; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringCollection(terms); + out.writeBoolean(complete); + } + + @Override + protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException { + builder.startArray(TERMS_FIELD); + if (getTerms() != null && getTerms().isEmpty() == false) { + for (String term : getTerms()) { + builder.value(term); + } + } + builder.endArray(); + builder.field(COMPLETE_FIELD, complete); + } + + public static TermsEnumResponse fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java new file mode 100644 index 0000000000000..d176a49b512bd --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/TransportTermsEnumAction.java @@ -0,0 +1,607 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ +package org.elasticsearch.xpack.core.termsenum.action; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.MemoizedSupplier; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.Rewriteable; +import org.elasticsearch.index.query.SearchExecutionContext; 
+import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.license.XPackLicenseState.Feature; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.ShardSearchRequest; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.security.SecurityContext; +import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField; +import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl; +import org.elasticsearch.xpack.core.security.authz.support.DLSRoleQueryValidator; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; + +public class TransportTermsEnumAction extends HandledTransportAction { + + protected final ClusterService clusterService; + protected final TransportService transportService; + private final SearchService searchService; + private final IndicesService indicesService; + private final ScriptService scriptService; + protected final 
IndexNameExpressionResolver indexNameExpressionResolver; + + final String transportShardAction; + private final String shardExecutor; + private final XPackLicenseState licenseState; + + @Inject + public TransportTermsEnumAction( + // NodeClient client, + ClusterService clusterService, + SearchService searchService, + TransportService transportService, + IndicesService indicesService, + ScriptService scriptService, + ActionFilters actionFilters, + XPackLicenseState licenseState, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super(TermsEnumAction.NAME, transportService, actionFilters, TermsEnumRequest::new); + + this.clusterService = clusterService; + this.searchService = searchService; + this.transportService = transportService; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.transportShardAction = actionName + "[s]"; + this.shardExecutor = ThreadPool.Names.AUTO_COMPLETE; + this.indicesService = indicesService; + this.scriptService = scriptService; + this.licenseState = licenseState; + + transportService.registerRequestHandler( + transportShardAction, + ThreadPool.Names.SAME, + NodeTermsEnumRequest::new, + new NodeTransportHandler() + ); + + } + + @Override + protected void doExecute(Task task, TermsEnumRequest request, ActionListener listener) { + request.taskStartTimeMillis = task.getStartTime(); + new AsyncBroadcastAction(task, request, listener).start(); + } + + protected NodeTermsEnumRequest newNodeRequest(final String nodeId, final Set shardIds, TermsEnumRequest request) { + // Given we look terms up in the terms dictionary alias filters is another aspect of search (like DLS) that we + // currently do not support. 
+ // final ClusterState clusterState = clusterService.state(); + // final Set indicesAndAliases = indexNameExpressionResolver.resolveExpressions(clusterState, request.indices()); + // final AliasFilter aliasFilter = searchService.buildAliasFilter(clusterState, shard.getIndexName(), indicesAndAliases); + return new NodeTermsEnumRequest(nodeId, shardIds, request); + } + + protected NodeTermsEnumResponse readShardResponse(StreamInput in) throws IOException { + return new NodeTermsEnumResponse(in); + } + + protected Map> getNodeBundles(ClusterState clusterState, TermsEnumRequest request, String[] concreteIndices) { + // Group targeted shards by nodeId + Map> fastNodeBundles = new HashMap<>(); + for (String indexName : concreteIndices) { + + String[] singleIndex = { indexName }; + + GroupShardsIterator shards = clusterService.operationRouting() + .searchShards(clusterState, singleIndex, null, null); + + Iterator shardsForIndex = shards.iterator(); + while (shardsForIndex.hasNext()) { + ShardIterator copiesOfShard = shardsForIndex.next(); + ShardRouting selectedCopyOfShard = null; + for (ShardRouting copy : copiesOfShard) { + // Pick the first active node with a copy of the shard + if (copy.active() && copy.assignedToNode()) { + selectedCopyOfShard = copy; + break; + } + } + if (selectedCopyOfShard == null) { + break; + } + String nodeId = selectedCopyOfShard.currentNodeId(); + Set bundle = null; + if (fastNodeBundles.containsKey(nodeId)) { + bundle = fastNodeBundles.get(nodeId); + } else { + bundle = new HashSet(); + fastNodeBundles.put(nodeId, bundle); + } + if (bundle != null) { + bundle.add(selectedCopyOfShard.shardId()); + } + } + } + return fastNodeBundles; + } + + protected ClusterBlockException checkGlobalBlock(ClusterState state, TermsEnumRequest request) { + return state.blocks().globalBlockedException(ClusterBlockLevel.READ); + } + + protected ClusterBlockException checkRequestBlock(ClusterState state, TermsEnumRequest countRequest, String[] concreteIndices) { 
+ return state.blocks().indicesBlockedException(ClusterBlockLevel.READ, concreteIndices); + } + + protected TermsEnumResponse newResponse( + TermsEnumRequest request, + AtomicReferenceArray nodesResponses, + boolean complete, + Map> nodeBundles + ) { + int successfulShards = 0; + int failedShards = 0; + List shardFailures = null; + Map combinedResults = new HashMap(); + for (int i = 0; i < nodesResponses.length(); i++) { + Object nodeResponse = nodesResponses.get(i); + if (nodeResponse == null) { + // simply ignore non active shards + } else if (nodeResponse instanceof BroadcastShardOperationFailedException) { + complete = false; + failedShards++; + if (shardFailures == null) { + shardFailures = new ArrayList<>(); + } + shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) nodeResponse)); + } else { + NodeTermsEnumResponse str = (NodeTermsEnumResponse) nodeResponse; + // Only one node response has to be incomplete for the entire result to be labelled incomplete. + if (str.getComplete() == false) { + complete = false; + } + + Set shards = nodeBundles.get(str.getNodeId()); + if (str.getError() != null) { + complete = false; + // A single reported error is assumed to be for all shards queried on that node. + // When reading we read from multiple Lucene indices in one unified view so any error is + // assumed to be all shards on that node. 
+ failedShards += shards.size(); + if (shardFailures == null) { + shardFailures = new ArrayList<>(); + } + for (ShardId failedShard : shards) { + shardFailures.add( + new DefaultShardOperationFailedException( + new BroadcastShardOperationFailedException(failedShard, str.getError()) + ) + ); + } + } else { + successfulShards += shards.size(); + } + for (TermCount term : str.terms()) { + TermCount existingTc = combinedResults.get(term.getTerm()); + if (existingTc == null) { + combinedResults.put(term.getTerm(), term); + } else { + // add counts + existingTc.addToDocCount(term.getDocCount()); + } + } + } + } + int size = Math.min(request.size(), combinedResults.size()); + List terms = new ArrayList<>(size); + TermCount[] sortedCombinedResults = combinedResults.values().toArray(new TermCount[0]); + // Sort alphabetically + Arrays.sort(sortedCombinedResults, new Comparator() { + public int compare(TermCount t1, TermCount t2) { + return t1.getTerm().compareTo(t2.getTerm()); + } + }); + + for (TermCount term : sortedCombinedResults) { + terms.add(term.getTerm()); + if (terms.size() == size) { + break; + } + } + return new TermsEnumResponse(terms, (failedShards + successfulShards), successfulShards, failedShards, shardFailures, complete); + } + + protected NodeTermsEnumResponse dataNodeOperation(NodeTermsEnumRequest request, Task task) throws IOException { + List termsList = new ArrayList<>(); + String error = null; + + long timeout_millis = request.timeout(); + long scheduledEnd = request.nodeStartedTimeMillis() + timeout_millis; + + ArrayList shardTermsEnums = new ArrayList<>(); + ArrayList openedResources = new ArrayList<>(); + try { + for (ShardId shardId : request.shardIds()) { + // Check we haven't just arrived on a node and time is up already. 
+ if (System.currentTimeMillis() > scheduledEnd) { + return new NodeTermsEnumResponse(request.nodeId(), termsList, error, false); + } + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + final IndexShard indexShard = indexService.getShard(shardId.getId()); + + Engine.Searcher searcher = indexShard.acquireSearcher(Engine.SEARCH_SOURCE); + openedResources.add(searcher); + final SearchExecutionContext queryShardContext = indexService.newSearchExecutionContext( + shardId.id(), + 0, + searcher, + request::nodeStartedTimeMillis, + null, + Collections.emptyMap() + ); + final MappedFieldType mappedFieldType = indexShard.mapperService().fieldType(request.field()); + if (mappedFieldType != null) { + TermsEnum terms = mappedFieldType.getTerms(request.caseInsensitive(), request.string(), queryShardContext); + if (terms != null) { + shardTermsEnums.add(terms); + } + } + } + MultiShardTermsEnum te = new MultiShardTermsEnum(shardTermsEnums.toArray(new TermsEnum[0])); + + int shard_size = request.size(); + // All the above prep might take a while - do a timer check now before we continue further. 
+ if (System.currentTimeMillis() > scheduledEnd) { + return new NodeTermsEnumResponse(request.nodeId(), termsList, error, false); + } + + int numTermsBetweenClockChecks = 100; + int termCount = 0; + // Collect in alphabetical order + while (te.next() != null) { + termCount++; + if (termCount > numTermsBetweenClockChecks) { + if (System.currentTimeMillis() > scheduledEnd) { + boolean complete = te.next() == null; + return new NodeTermsEnumResponse(request.nodeId(), termsList, error, complete); + } + termCount = 0; + } + long df = te.docFreq(); + BytesRef bytes = te.term(); + termsList.add(new TermCount(bytes.utf8ToString(), df)); + if (termsList.size() >= shard_size) { + break; + } + } + + } catch (Exception e) { + error = ExceptionsHelper.stackTrace(e); + } finally { + IOUtils.close(openedResources); + } + return new NodeTermsEnumResponse(request.nodeId(), termsList, error, true); + } + + // TODO remove this so we can shift code to server module - write a separate Interceptor class to + // rewrite requests according to security rules + private boolean canAccess( + ShardId shardId, + NodeTermsEnumRequest request, + XPackLicenseState frozenLicenseState, + ThreadContext threadContext + ) throws IOException { + if (frozenLicenseState.isSecurityEnabled()) { + var licenseChecker = new MemoizedSupplier<>(() -> frozenLicenseState.checkFeature(Feature.SECURITY_DLS_FLS)); + IndicesAccessControl indicesAccessControl = threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY); + IndicesAccessControl.IndexAccessControl indexAccessControl = indicesAccessControl.getIndexPermissions(shardId.getIndexName()); + + + if (indexAccessControl != null) { + final boolean dls = indexAccessControl.getDocumentPermissions().hasDocumentLevelPermissions(); + if ( dls && licenseChecker.get()) { + // Check to see if any of the roles defined for the current user rewrite to match_all + + SecurityContext securityContext = new SecurityContext(clusterService.getSettings(), 
threadContext); + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + final SearchExecutionContext queryShardContext = indexService.newSearchExecutionContext( + shardId.id(), + 0, + null, + request::nodeStartedTimeMillis, + null, + Collections.emptyMap() + ); + + // Current user has potentially many roles and therefore potentially many queries + // defining sets of docs accessible + Set queries = indexAccessControl.getDocumentPermissions().getQueries(); + for (BytesReference querySource : queries) { + QueryBuilder queryBuilder = DLSRoleQueryValidator.evaluateAndVerifyRoleQuery( + querySource, + scriptService, + queryShardContext.getXContentRegistry(), + securityContext.getUser() + ); + QueryBuilder rewrittenQueryBuilder = Rewriteable.rewrite(queryBuilder, queryShardContext); + if (rewrittenQueryBuilder instanceof MatchAllQueryBuilder) { + // One of the roles assigned has "all" permissions - allow unfettered access to termsDict + return true; + } + } + return false; + } + } + } + return true; + } + + private boolean canMatchShard(ShardId shardId, NodeTermsEnumRequest req) throws IOException { + if (req.indexFilter() == null || req.indexFilter() instanceof MatchAllQueryBuilder) { + return true; + } + ShardSearchRequest searchRequest = new ShardSearchRequest(shardId, req.nodeStartedTimeMillis(), AliasFilter.EMPTY); + searchRequest.source(new SearchSourceBuilder().query(req.indexFilter())); + return searchService.canMatch(searchRequest).canMatch(); + } + + protected class AsyncBroadcastAction { + + private final Task task; + private final TermsEnumRequest request; + private ActionListener listener; + private final ClusterState clusterState; + private final DiscoveryNodes nodes; + private final int expectedOps; + private final AtomicInteger counterOps = new AtomicInteger(); + private final AtomicReferenceArray nodesResponses; + private Map> nodeBundles; + + protected AsyncBroadcastAction(Task task, TermsEnumRequest request, 
ActionListener listener) { + this.task = task; + this.request = request; + this.listener = listener; + + clusterState = clusterService.state(); + + ClusterBlockException blockException = checkGlobalBlock(clusterState, request); + if (blockException != null) { + throw blockException; + } + // update to concrete indices + String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, request); + blockException = checkRequestBlock(clusterState, request, concreteIndices); + if (blockException != null) { + throw blockException; + } + + nodes = clusterState.nodes(); + logger.trace("resolving shards based on cluster state version [{}]", clusterState.version()); + nodeBundles = getNodeBundles(clusterState, request, concreteIndices); + expectedOps = nodeBundles.size(); + + nodesResponses = new AtomicReferenceArray<>(expectedOps); + } + + public void start() { + if (nodeBundles.size() == 0) { + // no shards + try { + listener.onResponse(newResponse(request, new AtomicReferenceArray<>(0), true, nodeBundles)); + } catch (Exception e) { + listener.onFailure(e); + } + // TODO or remove above try and instead just call finishHim() here? Helps keep return logic consistent + return; + } + // count the local operations, and perform the non local ones + int nodeIndex = -1; + for (final String nodeId : nodeBundles.keySet()) { + if (checkForEarlyFinish()) { + return; + } + nodeIndex++; + Set shardIds = nodeBundles.get(nodeId); + if (shardIds.size() > 0) { + performOperation(nodeId, shardIds, nodeIndex); + } else { + // really, no shards active in this group + onNoOperation(nodeId); + } + } + } + + // Returns true if we exited with a response to the caller. 
+        boolean checkForEarlyFinish() {
+            // Elapsed time is measured from the task's start time and compared with the
+            // timeout carried by the request.
+            long now = System.currentTimeMillis();
+            if ((now - task.getStartTime()) > request.timeout().getMillis()) {
+                // Out of time: answer the caller with whatever has been gathered, flagged as incomplete.
+                finishHim(false);
+                return true;
+            }
+            return false;
+        }
+
+        /**
+         * Sends the node-level request for the given shards to {@code nodeId} and wires the reply
+         * back into {@link #onOperation} / {@link #onNoOperation}.
+         *
+         * @param nodeId    id of the target node, looked up in {@code nodes}
+         * @param shardIds  shards hosted on that node that should be examined
+         * @param nodeIndex slot in {@code nodesResponses} where this node's response is stored
+         */
+        protected void performOperation(final String nodeId, final Set<ShardId> shardIds, final int nodeIndex) {
+            if (shardIds.isEmpty()) {
+                // no more active shards... (we should not really get here, just safety)
+                onNoOperation(nodeId);
+            } else {
+                try {
+                    final NodeTermsEnumRequest nodeRequest = newNodeRequest(nodeId, shardIds, request);
+                    nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
+                    DiscoveryNode node = nodes.get(nodeId);
+                    if (node == null) {
+                        // no node connected, act as failure
+                        onNoOperation(nodeId);
+                    } else if (checkForEarlyFinish() == false) {
+                        // Only dispatch if the overall request has not already timed out.
+                        transportService.sendRequest(
+                            node,
+                            transportShardAction,
+                            nodeRequest,
+                            new TransportResponseHandler<NodeTermsEnumResponse>() {
+                                @Override
+                                public NodeTermsEnumResponse read(StreamInput in) throws IOException {
+                                    return readShardResponse(in);
+                                }
+
+                                @Override
+                                public void handleResponse(NodeTermsEnumResponse response) {
+                                    onOperation(nodeId, nodeIndex, response);
+                                }
+
+                                @Override
+                                public void handleException(TransportException e) {
+                                    // onNoOperation only bumps the counter, so log the cause here
+                                    // rather than dropping it.
+                                    logger.debug(() -> new ParameterizedMessage("terms enum request to node [{}] failed", nodeId), e);
+                                    onNoOperation(nodeId);
+                                }
+                            }
+                        );
+                    }
+                } catch (Exception e) {
+                    // Building or sending the request failed; still count the node so the
+                    // broadcast can complete, but keep a record of why.
+                    logger.debug(() -> new ParameterizedMessage("failed to send terms enum request to node [{}]", nodeId), e);
+                    onNoOperation(nodeId);
+                }
+            }
+        }
+
+        /**
+         * Records a node's response and finishes the request once every expected node has
+         * reported; otherwise re-checks the timeout.
+         */
+        protected void onOperation(String nodeId, int nodeIndex, NodeTermsEnumResponse response) {
+            logger.trace("received response for node {}", nodeId);
+            nodesResponses.set(nodeIndex, response);
+            if (expectedOps == counterOps.incrementAndGet()) {
+                finishHim(true);
+            } else {
+                checkForEarlyFinish();
+            }
+        }
+
+        /**
+         * Counts a node that produced no response (no shards, unknown node, or a failure) and
+         * finishes the request if it was the last one outstanding.
+         */
+        void onNoOperation(String nodeId) {
+            if (expectedOps == counterOps.incrementAndGet()) {
+                finishHim(true);
+            }
+        }
+
+        // Can be called multiple times - either for early time-outs or for fully-completed collections.
+ protected synchronized void finishHim(boolean complete) { + if (listener == null) { + return; + } + try { + listener.onResponse(newResponse(request, nodesResponses, complete, nodeBundles)); + } catch (Exception e) { + listener.onFailure(e); + } finally { + listener = null; + } + } + } + + class NodeTransportHandler implements TransportRequestHandler { + + @Override + public void messageReceived(NodeTermsEnumRequest request, TransportChannel channel, Task task) throws Exception { + asyncNodeOperation(request, task, ActionListener.wrap(channel::sendResponse, e -> { + try { + channel.sendResponse(e); + } catch (Exception e1) { + logger.warn( + () -> new ParameterizedMessage( + "Failed to send error response for action [{}] and request [{}]", + actionName, + request + ), + e1 + ); + } + })); + } + } + + private void asyncNodeOperation(NodeTermsEnumRequest request, Task task, ActionListener listener) + throws IOException { + // Start the clock ticking on the data node using the data node's local current time. 
+ request.startTimerOnDataNode(); + + // DLS/FLS check copied from ResizeRequestInterceptor - check permissions and + // any index_filter canMatch checks on network thread before allocating work + ThreadContext threadContext = transportService.getThreadPool().getThreadContext(); + final XPackLicenseState frozenLicenseState = licenseState.copyCurrentLicenseState(); + for (ShardId shardId : request.shardIds().toArray(new ShardId[0])) { + if (canAccess(shardId, request, frozenLicenseState, threadContext) == false || canMatchShard( + shardId, + request + ) == false) { + // Permission denied or can't match, remove shardID from request + request.remove(shardId); + } + } + if (request.shardIds().size() == 0) { + listener.onResponse(new NodeTermsEnumResponse(request.nodeId(), Collections.emptyList(), null, true)); + } else { + // Use the search threadpool if its queue is empty + assert transportService.getThreadPool() + .executor( + ThreadPool.Names.SEARCH + ) instanceof EsThreadPoolExecutor : "SEARCH threadpool must be an instance of ThreadPoolExecutor"; + EsThreadPoolExecutor ex = (EsThreadPoolExecutor) transportService.getThreadPool().executor(ThreadPool.Names.SEARCH); + final String executorName = ex.getQueue().size() == 0 ? ThreadPool.Names.SEARCH : shardExecutor; + transportService.getThreadPool() + .executor(executorName) + .execute(ActionRunnable.supply(listener, () -> dataNodeOperation(request, task))); + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/package-info.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/package-info.java new file mode 100644 index 0000000000000..4042ef981827f --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/package-info.java @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +/** + * Enumerate a field's terms action. + */ +package org.elasticsearch.xpack.core.termsenum.action; \ No newline at end of file diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/rest/RestTermsEnumAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/rest/RestTermsEnumAction.java new file mode 100644 index 0000000000000..4c771995d085e --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/rest/RestTermsEnumAction.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.core.termsenum.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.termsenum.action.TermsEnumAction; +import org.elasticsearch.xpack.core.termsenum.action.TermsEnumRequest; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.GET; +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class RestTermsEnumAction extends BaseRestHandler { + + @Override + public List routes() { + return List.of( + new Route(GET, "/{index}/_terms_enum"), + new Route(POST, "/{index}/_terms_enum")); + } + + @Override + public String getName() { + return "term_enum_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final 
NodeClient client) throws IOException { + try (XContentParser parser = request.contentOrSourceParamParser()) { + TermsEnumRequest termEnumRequest = TermsEnumAction.fromXContent(parser, + Strings.splitStringByCommaToArray(request.param("index"))); + return channel -> + client.execute(TermsEnumAction.INSTANCE, termEnumRequest, new RestToXContentListener<>(channel)); + } + } + +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/MultiShardTermsEnumTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/MultiShardTermsEnumTests.java new file mode 100644 index 0000000000000..248bc102c4e64 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/MultiShardTermsEnumTests.java @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ +package org.elasticsearch.xpack.core.termsenum; + +import org.apache.lucene.analysis.MockAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.MultiTerms; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.store.ByteBuffersDirectory; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.automaton.Automata; +import org.apache.lucene.util.automaton.Automaton; +import org.apache.lucene.util.automaton.CompiledAutomaton; +import org.apache.lucene.util.automaton.MinimizationOperations; +import org.apache.lucene.util.automaton.Operations; +import org.elasticsearch.common.lucene.search.AutomatonQueries; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.termsenum.action.MultiShardTermsEnum; +import org.elasticsearch.xpack.core.termsenum.action.SimpleTermCountEnum; +import org.elasticsearch.xpack.core.termsenum.action.TermCount; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; + +public class MultiShardTermsEnumTests extends ESTestCase { + + public void testRandomIndexFusion() throws Exception { + String fieldName = "foo"; + Map globalTermCounts = new HashMap<>(); + + int numShards = randomIntBetween(2, 15); + + ArrayList closeables = new ArrayList<>(); + ArrayList readers = new ArrayList<>(); + + try { + for (int s = 0; s < numShards; s++) { + Directory directory = new ByteBuffersDirectory(); + IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(new MockAnalyzer(random()))); + + int numDocs = randomIntBetween(10,200); + 
for (int i = 0; i < numDocs; i++) { + Document document = new Document(); + String term = randomAlphaOfLengthBetween(1,3).toLowerCase(Locale.ROOT); + document.add(new StringField(fieldName, term, Field.Store.YES)); + writer.addDocument(document); + int count = 0; + if (globalTermCounts.containsKey(term)) { + count = globalTermCounts.get(term); + } + count++; + globalTermCounts.put(term, count); + + } + DirectoryReader reader = DirectoryReader.open(writer); + readers.add(reader); + writer.close(); + closeables.add(reader); + closeables.add(directory); + } + + int numSearches = 100; + for (int q = 0; q < numSearches; q++) { + String searchPrefix = randomAlphaOfLengthBetween(0, 3).toLowerCase(Locale.ROOT); + Automaton a = AutomatonQueries.caseInsensitivePrefix(searchPrefix); + a = Operations.concatenate(a, Automata.makeAnyString()); + a = MinimizationOperations.minimize(a, Integer.MAX_VALUE); + CompiledAutomaton automaton = new CompiledAutomaton(a); + + ArrayList termsEnums = new ArrayList<>(); + for (DirectoryReader reader : readers) { + Terms terms = MultiTerms.getTerms(reader, fieldName); + TermsEnum te = automaton.getTermsEnum(terms); + if (randomBoolean()) { + // Simulate fields like constant-keyword which use a SimpleTermCountEnum to present results + // rather than the raw TermsEnum from Lucene. 
+ ArrayList termCounts = new ArrayList<>(); + while (te.next() != null) { + termCounts.add(new TermCount(te.term().utf8ToString(), te.docFreq())); + } + SimpleTermCountEnum simpleEnum = new SimpleTermCountEnum(termCounts.toArray(new TermCount[0])); + termsEnums.add(simpleEnum); + } else { + termsEnums.add(te); + } + } + MultiShardTermsEnum mte = new MultiShardTermsEnum(termsEnums.toArray(new TermsEnum[0])); + HashMap expecteds = new HashMap<>(); + + for (Entry termCount : globalTermCounts.entrySet()) { + if (termCount.getKey().startsWith(searchPrefix)) { + expecteds.put(termCount.getKey(), termCount.getValue()); + } + } + + while (mte.next() != null) { + String teString = mte.term().utf8ToString(); + long actual = mte.docFreq(); + assertTrue(expecteds.containsKey(teString)); + long expected = expecteds.get(teString); + expecteds.remove(teString); + assertEquals(mte.term().utf8ToString() + " string count wrong", expected, actual); + } + assertEquals("Expected results not found", 0, expecteds.size()); + + } + } finally { + IOUtils.close(closeables.toArray(new Closeable[0])); + } + } + +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/TermCountTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/TermCountTests.java new file mode 100644 index 0000000000000..c6368db221e29 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/TermCountTests.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ +package org.elasticsearch.xpack.core.termsenum; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.termsenum.action.TermCount; + +import java.io.IOException; + +public class TermCountTests extends AbstractSerializingTestCase { + + static TermCount createRandomQueryExplanation(boolean isValid) { + int docCount = randomInt(100); + String term = randomAlphaOfLength(randomIntBetween(10, 100)); + return new TermCount(term, docCount); + } + + static TermCount createRandomQueryExplanation() { + return createRandomQueryExplanation(randomBoolean()); + } + + @Override + protected TermCount doParseInstance(XContentParser parser) throws IOException { + return TermCount.fromXContent(parser); + } + + @Override + protected TermCount createTestInstance() { + return createRandomQueryExplanation(); + } + + @Override + protected Writeable.Reader instanceReader() { + return TermCount::new; + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/TermsEnumResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/TermsEnumResponseTests.java new file mode 100644 index 0000000000000..e166599f591c0 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/TermsEnumResponseTests.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ +package org.elasticsearch.xpack.core.termsenum; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractBroadcastResponseTestCase; +import org.elasticsearch.xpack.core.termsenum.action.TermsEnumResponse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class TermsEnumResponseTests extends AbstractBroadcastResponseTestCase { + + protected static List getRandomTerms() { + int termCount = randomIntBetween(0, 100); + Set uniqueTerms = new HashSet<>(termCount); + while (uniqueTerms.size() < termCount) { + String s = randomAlphaOfLengthBetween(1, 10); + uniqueTerms.add(s); + } + List terms = new ArrayList<>(uniqueTerms); + return terms; + } + + private static TermsEnumResponse createRandomTermEnumResponse() { + int totalShards = randomIntBetween(1, 10); + int successfulShards = randomIntBetween(0, totalShards); + int failedShards = totalShards - successfulShards; + List shardFailures = new ArrayList<>(failedShards); + for (int i=0; i failures) { + return new TermsEnumResponse(getRandomTerms(), totalShards, successfulShards, failedShards, failures, randomBoolean()); + + } + + @Override + public void testToXContent() { + String s = randomAlphaOfLengthBetween(1, 10); + List terms = new ArrayList<>(); + terms.add(s); + TermsEnumResponse response = new TermsEnumResponse(terms, 10, 10, 0, new ArrayList<>(), true); + + String output = Strings.toString(response); + assertEquals("{\"_shards\":{\"total\":10,\"successful\":10,\"failed\":0},\"terms\":[" + + "\""+ s +"\""+ + "],\"complete\":true}", output); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/TransportTermsEnumActionTests.java 
b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/TransportTermsEnumActionTests.java new file mode 100644 index 0000000000000..ca9c46e9f6e7d --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/TransportTermsEnumActionTests.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.core.termsenum; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xpack.core.termsenum.action.TermsEnumAction; +import org.elasticsearch.xpack.core.termsenum.action.TermsEnumRequest; +import org.elasticsearch.xpack.core.termsenum.action.TermsEnumResponse; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.equalTo; + +public class TransportTermsEnumActionTests extends ESSingleNodeTestCase { + + /* + * Copy of test that tripped up similarly broadcast ValidateQuery + */ + public void testListenerOnlyInvokedOnceWhenIndexDoesNotExist() { + final AtomicBoolean invoked = new AtomicBoolean(); + final ActionListener listener = new ActionListener<>() { + + @Override + public void onResponse(final TermsEnumResponse validateQueryResponse) { + fail("onResponse should not be invoked in this failure case"); + } + + @Override + public void onFailure(final Exception e) { + if (invoked.compareAndSet(false, true) == false) { + fail("onFailure invoked more than once"); + } + } + + }; + client().execute(TermsEnumAction.INSTANCE, new TermsEnumRequest("non-existent-index"),listener); + assertThat(invoked.get(), equalTo(true)); // ensure that onFailure was invoked + } + +} diff --git 
a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/action/RestTermsEnumActionTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/action/RestTermsEnumActionTests.java new file mode 100644 index 0000000000000..93e15f141634c --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/termsenum/action/RestTermsEnumActionTests.java @@ -0,0 +1,150 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.core.termsenum.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +//import org.elasticsearch.rest.CompatibleVersion; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.FakeRestChannel; +import org.elasticsearch.test.rest.FakeRestRequest; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; +import 
org.elasticsearch.usage.UsageService; +import org.elasticsearch.xpack.core.termsenum.action.TermsEnumAction; +import org.elasticsearch.xpack.core.termsenum.rest.RestTermsEnumAction; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; + +public class RestTermsEnumActionTests extends ESTestCase { + + private static ThreadPool threadPool = new TestThreadPool(RestTermsEnumActionTests.class.getName()); + private static NodeClient client = new NodeClient(Settings.EMPTY, threadPool); + + private static UsageService usageService = new UsageService(); + private static RestController controller = new RestController(emptySet(), null, client, + new NoneCircuitBreakerService(), usageService); + private static RestTermsEnumAction action = new RestTermsEnumAction(); + + /** + * Configures {@link NodeClient} to stub {@link TermsEnumAction} transport action. + *

+ * This lower level of execution is out of the scope of this test. + */ + @BeforeClass + @SuppressWarnings("rawtypes") + public static void stubTermEnumAction() { + final TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); + + final TransportAction transportAction = new TransportAction(TermsEnumAction.NAME, + new ActionFilters(Collections.emptySet()), taskManager) { + @Override + protected void doExecute(Task task, ActionRequest request, ActionListener listener) { + } + }; + + final Map actions = new HashMap<>(); + actions.put(TermsEnumAction.INSTANCE, transportAction); + + client.initialize(actions, taskManager, () -> "local", + mock(Transport.Connection.class), null, new NamedWriteableRegistry(List.of())); + controller.registerHandler(action); + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList()); + return new NamedXContentRegistry(searchModule.getNamedXContents()); + } + + @AfterClass + public static void terminateThreadPool() { + terminate(threadPool); + + threadPool = null; + client = null; + + usageService = null; + controller = null; + action = null; + } + + public void testRestTermEnumAction() throws Exception { + // GIVEN a valid query + final String content = "{" + + "\"field\":\"a\", " + + "\"string\":\"foo\", " + + "\"index_filter\":{\"bool\":{\"must\":{\"term\":{\"user\":\"kimchy\"}}}}}"; + + final RestRequest request = createRestRequest(content); + final FakeRestChannel channel = new FakeRestChannel(request, true, 0); + + // WHEN + action.handleRequest(request, channel, client); + + // THEN request is parsed OK + assertThat(channel.responses().get(), equalTo(0)); + assertThat(channel.errors().get(), equalTo(0)); + assertNull(channel.capturedResponse()); + } + + public void testRestTermEnumActionMissingField() throws Exception { + // GIVEN an invalid query + final String content = "{" +// + 
"\"field\":\"a\", " + + "\"string\":\"foo\", " + + "\"index_filter\":{\"bool\":{\"must\":{\"term\":{\"user\":\"kimchy\"}}}}}"; + + final RestRequest request = createRestRequest(content); + final FakeRestChannel channel = new FakeRestChannel(request, true, 0); + + // WHEN + action.handleRequest(request, channel, client); + + // THEN request is invalid - missing mandatory "field" parameter. + assertThat(channel.responses().get(), equalTo(0)); + assertThat(channel.errors().get(), equalTo(1)); + assertThat(channel.capturedResponse().content().utf8ToString(), containsString("field cannot be null")); + } + + + private RestRequest createRestRequest(String content) { + return new FakeRestRequest.Builder(xContentRegistry()) + .withPath("index1/_terms_enum") + .withParams(emptyMap()) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + } + +} diff --git a/x-pack/plugin/mapper-constant-keyword/src/main/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapper.java b/x-pack/plugin/mapper-constant-keyword/src/main/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapper.java index 973ab6d2c0744..a309fec91a663 100644 --- a/x-pack/plugin/mapper-constant-keyword/src/main/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapper.java +++ b/x-pack/plugin/mapper-constant-keyword/src/main/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapper.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.constantkeyword.mapper; +import org.apache.lucene.index.TermsEnum; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.MultiTermQuery; @@ -38,11 +39,14 @@ import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; import org.elasticsearch.search.lookup.SearchLookup; +import 
org.elasticsearch.xpack.core.termsenum.action.SimpleTermCountEnum; +import org.elasticsearch.xpack.core.termsenum.action.TermCount; import java.io.IOException; import java.time.ZoneId; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.function.Supplier; @@ -140,6 +144,20 @@ public ValueFetcher valueFetcher(SearchExecutionContext context, String format) ? lookup -> List.of() : lookup -> List.of(value); } + + + + @Override + public TermsEnum getTerms(boolean caseInsensitive, String string, SearchExecutionContext queryShardContext) throws IOException { + boolean matches = caseInsensitive ? + value.toLowerCase(Locale.ROOT).startsWith(string.toLowerCase(Locale.ROOT)) : + value.startsWith(string); + if (matches == false) { + return null; + } + int docCount = queryShardContext.searcher().getIndexReader().maxDoc(); + return new SimpleTermCountEnum(new TermCount(value, docCount)); + } @Override protected boolean matches(String pattern, boolean caseInsensitive, SearchExecutionContext context) { diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 86cfc55bd4558..6eabe470b1931 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -416,6 +416,7 @@ public class Constants { "indices:data/read/xpack/graph/explore", "indices:data/read/xpack/rollup/get/index/caps", "indices:data/read/xpack/rollup/search", + "indices:data/read/xpack/termsenum/list", "indices:data/write/bulk", "indices:data/write/bulk[s]", 
"indices:data/write/bulk_shard_operations[s]",
diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/terms_enum/10_basic.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/terms_enum/10_basic.yml
new file mode 100644
index 0000000000000..27ca0a82a203a
--- /dev/null
+++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/terms_enum/10_basic.yml
@@ -0,0 +1,332 @@
+---
+setup: # Creates the roles, users, indices and documents shared by every test in this file.
+  - skip:
+      features: headers
+  - do:
+      cluster.health:
+        wait_for_status: yellow
+
+  - do: # Role with all privileges on every index and no document/field restrictions.
+      security.put_role:
+        name: "test_admin_role"
+        body: >
+          {
+            "indices": [
+              { "names": ["*"], "privileges": ["all"] }
+            ]
+          }
+
+  - do:
+      security.put_user:
+        username: "test_admin"
+        body: >
+          {
+            "password" : "x-pack-test-password",
+            "roles" : [ "test_admin_role" ],
+            "full_name" : "user with full privileges to multiple indices"
+          }
+
+  - do: # DLS query on ck=const, the constant_keyword value mapped for test_security below.
+      security.put_role:
+        name: "dls_all_role"
+        body: >
+          {
+            "indices": [
+              { "names": ["test_security"], "privileges": ["read"], "query": "{\"term\": {\"ck\": \"const\"}}" }
+            ]
+          }
+
+  - do: # DLS query on a foo value that no document indexed in this setup carries.
+      security.put_role:
+        name: "dls_none_role"
+        body: >
+          {
+            "indices": [
+              { "names": ["test_security"], "privileges": ["read"], "query": "{\"term\": {\"foo\": \"does_not_exist\"}}" }
+            ]
+          }
+  - do: # dls_all_user is granted both dls_none_role and dls_all_role.
+      security.put_user:
+        username: "dls_all_user"
+        body: >
+          {
+            "password" : "x-pack-test-password",
+            "roles" : [ "dls_none_role", "dls_all_role" ],
+            "full_name" : "user with access to all docs in test_security index (using DLS)"
+          }
+
+  - do: # DLS query on foo=bar_dls, the value of the single test_security document.
+      security.put_role:
+        name: "dls_some_role"
+        body: >
+          {
+            "indices": [
+              { "names": ["test_security"], "privileges": ["read"], "query": "{\"term\": {\"foo\": \"bar_dls\"}}" }
+            ]
+          }
+  - do:
+      security.put_user:
+        username: "dls_some_user"
+        body: >
+          {
+            "password" : "x-pack-test-password",
+            "roles" : [ "dls_some_role" ],
+            "full_name" : "user with access to selected docs in index"
+          }
+  - do: # FLS role: every field is granted except foo.
+      security.put_role:
+        name: "fls_role"
+        body: >
+          {
+            "indices": [
+              { "names": ["test_security"], "privileges": ["read"], "field_security" : {"grant" : [ "*"],"except": [ "foo" ]} }
+            ]
+          }
+
+  - do:
+      security.put_user:
+        username: "fls_user"
+        body: >
+          {
+            "password" : "x-pack-test-password",
+            "roles" : [ "fls_role" ],
+            "full_name" : "user with access to selected fields in index"
+          }
+  - do: # test_k: foo mapped as keyword.
+      indices.create:
+        index: test_k
+        body:
+          settings:
+            index:
+              number_of_shards: 1
+              number_of_replicas: 0
+          mappings:
+            properties:
+              foo:
+                type : keyword
+              timestamp:
+                type : date
+  - do: # test_ck: foo mapped as constant_keyword with the fixed value bar_ck.
+      indices.create:
+        index: test_ck
+        body:
+          settings:
+            index:
+              number_of_shards: 1
+              number_of_replicas: 0
+          mappings:
+            properties:
+              foo:
+                type : constant_keyword
+                value: bar_ck
+              other:
+                type : text
+              timestamp:
+                type : date
+  - do: # test_f: foo mapped as flattened.
+      indices.create:
+        index: test_f
+        body:
+          settings:
+            index:
+              number_of_shards: 1
+              number_of_replicas: 0
+          mappings:
+            properties:
+              foo:
+                type : flattened
+              timestamp:
+                type : date
+  - do: # test_security: target index of the DLS/FLS roles defined above.
+      indices.create:
+        index: test_security
+        body:
+          settings:
+            index:
+              number_of_shards: 1
+              number_of_replicas: 0
+          mappings:
+            properties:
+              ck:
+                type : constant_keyword
+                value: const
+              foo:
+                type : keyword
+
+  - do:
+      index:
+        index: test_k
+        id: 1
+        body: { foo: "bar_k", "timestamp":"2021-01-01T01:01:01.000Z" }
+
+  - do: # Document does not set foo; the test_ck mapping declares its constant value bar_ck.
+      index:
+        index: test_ck
+        id: 2
+        body: { other: "foo", "timestamp":"2020-01-01T01:01:01.000Z" }
+
+  - do:
+      index:
+        index: test_f
+        id: 3
+        body: { foo: { bar: "bar_f" }, "timestamp":"2019-01-01T01:01:01.000Z" }
+  - do:
+      index:
+        index: test_security
+        id: 4
+        body: { foo: "bar_dls"}
+
+  - do: #superuser
+      headers: { Authorization: "Basic dGVzdF9hZG1pbjp4LXBhY2stdGVzdC1wYXNzd29yZA==" } # admin
+      indices.refresh: {}
+
+  - do: #superuser
+      cluster.health:
+        index: test_f
+        wait_for_status: green
+
+---
+teardown: # Removes every user and role created in setup; 404 is ignored for each delete.
+  - do:
+      security.delete_user:
+        username: "dls_all_user"
+        ignore: 404
+
+  - do:
+      security.delete_role:
+        name: "dls_all_role"
+        ignore: 404
+  - do:
+      security.delete_role:
+        name: "dls_none_role"
+        ignore: 404
+
+  - do:
+      security.delete_user:
+        username: "dls_some_user"
+        ignore: 404
+
+  - do:
+      security.delete_role:
+        name: "dls_some_role"
+        ignore: 404
+  - do:
+      security.delete_user:
+        username: "fls_user"
+        ignore: 404
+
+  - do:
+      security.delete_role:
+        name: "fls_role"
+        ignore: 404
+  - do:
+      security.delete_user:
+        username: "test_admin"
+        ignore: 404
+
+  - do:
+      security.delete_role:
+        name: "test_admin_role"
+        ignore: 404
+
+---
+"Test basic term enumeration": # Prefix "b" across all test_* indices, then on the flattened sub-key foo.bar.
+  - do:
+      termsenum:
+        index: test_*
+        body: {"field": "foo", "string":"b"}
+  - length: {terms: 3}
+
+
+  - do:
+      termsenum:
+        index: test_*
+        body: {"field": "foo.bar", "string":"b"}
+  - length: {terms: 1}
+
+---
+"Test case insensitivity": # Upper-case "B" is expected to match only when case_insensitive is true.
+  - do:
+      termsenum:
+        index: test_k
+        body: {"field": "foo", "string":"B"}
+  - length: {terms: 0}
+
+  - do:
+      termsenum:
+        index: test_k
+        body: {"field": "foo", "string":"B", "case_insensitive": true}
+  - length: {terms: 1}
+
+  - do:
+      termsenum:
+        index: test_f
+        body: {"field": "foo.bar", "string":"B"}
+  - length: {terms: 0}
+
+  - do:
+      termsenum:
+        index: test_f
+        body: {"field": "foo.bar", "string":"B", "case_insensitive": true}
+  - length: {terms: 1}
+
+  - do: # Field name differs in case (foo.Bar vs foo.bar); no terms are expected even with case_insensitive.
+      termsenum:
+        index: test_f
+        body: {"field": "foo.Bar", "string":"B", "case_insensitive": true}
+  - length: {terms: 0}
+
+
+---
+"Test index filtering": # Only the test_k document (timestamp 2021-01-01T01:01:01.000Z) satisfies the range.
+  - do:
+      termsenum:
+        index: test_*
+        body: {"field": "foo", "string":"b", "index_filter":{"range":{"timestamp":{"gte":"2021-01-01T01:01:01.000Z"}}}}
+  - length: {terms: 1}
+---
+"Test legal timeout": # An explicit 1s timeout is accepted; same expectation as the basic test.
+  - do:
+      termsenum:
+        index: test_*
+        body: {"field": "foo", "string":"b", "timeout": "1s"}
+  - length: {terms: 3}
+---
+"Test illegal timeout": # A 2m timeout must be rejected with the error matched by the catch regex.
+  - do:
+      catch: /Timeout cannot be > 1 minute/
+      termsenum:
+        index: test_*
+        body: {"field": "foo", "string":"b", "timeout": "2m"}
+---
+"Test security": # Same request against test_security issued as each user created in setup.
+
+  - do:
+      headers: { Authorization: "Basic dGVzdF9hZG1pbjp4LXBhY2stdGVzdC1wYXNzd29yZA==" } # test_admin sees all docs
+      termsenum:
+        index: test_security
+        body: {"field": "foo", "string":"b"}
+  - length: {terms: 1}
+
+  - do:
+      headers: { Authorization: "Basic ZGxzX2FsbF91c2VyOngtcGFjay10ZXN0LXBhc3N3b3Jk" } # dls_all_user sees all docs
+      termsenum:
+        index: test_security
+        body: {"field": "foo", "string":"b"}
+  - length: {terms: 1}
+
+  - do:
+      headers: { Authorization: "Basic ZGxzX3NvbWVfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" } # dls_some_user has a restricting DLS query; no terms are expected
+      termsenum:
+        index: test_security
+        body: {"field": "foo", "string":"b"}
+  - length: {terms: 0}
+
+  - do:
+      headers: { Authorization: "Basic ZmxzX3VzZXI6eC1wYWNrLXRlc3QtcGFzc3dvcmQ=" } # fls_user can't see field
+      termsenum:
+        index: test_security
+        body: {"field": "foo", "string":"b"}
+  - length: {terms: 0}
+
+