Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
*/
package org.elasticsearch.xpack.core.termsenum.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -26,22 +28,26 @@
*/
public class NodeTermsEnumRequest extends TransportRequest implements IndicesRequest {

private String field;
private String string;
private String searchAfter;
private long taskStartedTimeMillis;
private long nodeStartedTimeMillis;
private boolean caseInsensitive;
private int size;
private long timeout;
private final String field;
private final String string;
private final String searchAfter;
private final long taskStartedTimeMillis;
private final boolean caseInsensitive;
private final int size;
private final long timeout;
private final QueryBuilder indexFilter;
private Set<ShardId> shardIds;
private String nodeId;
private final Set<ShardId> shardIds;
private final String nodeId;
private final OriginalIndices originalIndices;

private long nodeStartedTimeMillis;

public NodeTermsEnumRequest(final String nodeId,
public NodeTermsEnumRequest(OriginalIndices originalIndices,
final String nodeId,
final Set<ShardId> shardIds,
TermsEnumRequest request,
long taskStartTimeMillis) {
this.originalIndices = originalIndices;
this.field = request.field();
this.string = request.string();
this.searchAfter = request.searchAfter();
Expand Down Expand Up @@ -70,6 +76,15 @@ public NodeTermsEnumRequest(StreamInput in) throws IOException {
for (int i = 0; i < numShards; i++) {
shardIds.add(new ShardId(in));
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
originalIndices = OriginalIndices.readOriginalIndices(in);
} else {
String[] indicesNames = shardIds.stream()
.map(ShardId::getIndexName)
.distinct()
.toArray(String[]::new);
this.originalIndices = new OriginalIndices(indicesNames, null);
}
}

@Override
Expand All @@ -92,6 +107,9 @@ public void writeTo(StreamOutput out) throws IOException {
for (ShardId shardId : shardIds) {
shardId.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
}

public String field() {
Expand Down Expand Up @@ -152,16 +170,12 @@ public QueryBuilder indexFilter() {

@Override
public String[] indices() {
HashSet<String> indicesNames = new HashSet<>();
for (ShardId shardId : shardIds) {
indicesNames.add(shardId.getIndexName());
}
return indicesNames.toArray(new String[0]);
return originalIndices.indices();
}

@Override
public IndicesOptions indicesOptions() {
return null;
return originalIndices.indicesOptions();
}

public boolean remove(ShardId shardId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public TransportTermsEnumAction(
this.scriptService = scriptService;
this.licenseState = licenseState;
this.settings = settings;
this.remoteClusterService = searchTransportService.getRemoteClusterService();;
this.remoteClusterService = searchTransportService.getRemoteClusterService();

transportService.registerRequestHandler(
transportShardAction,
Expand All @@ -140,7 +140,8 @@ protected void doExecute(Task task, TermsEnumRequest request, ActionListener<Ter
new AsyncBroadcastAction(task, request, listener).start();
}

protected NodeTermsEnumRequest newNodeRequest(final String nodeId,
protected NodeTermsEnumRequest newNodeRequest(final OriginalIndices originalIndices,
final String nodeId,
final Set<ShardId> shardIds,
TermsEnumRequest request,
long taskStartMillis) {
Expand All @@ -149,14 +150,14 @@ protected NodeTermsEnumRequest newNodeRequest(final String nodeId,
// final ClusterState clusterState = clusterService.state();
// final Set<String> indicesAndAliases = indexNameExpressionResolver.resolveExpressions(clusterState, request.indices());
// final AliasFilter aliasFilter = searchService.buildAliasFilter(clusterState, shard.getIndexName(), indicesAndAliases);
return new NodeTermsEnumRequest(nodeId, shardIds, request, taskStartMillis);
return new NodeTermsEnumRequest(originalIndices, nodeId, shardIds, request, taskStartMillis);
}

protected NodeTermsEnumResponse readShardResponse(StreamInput in) throws IOException {
return new NodeTermsEnumResponse(in);
}

protected Map<String, Set<ShardId>> getNodeBundles(ClusterState clusterState, TermsEnumRequest request, String[] concreteIndices) {
protected Map<String, Set<ShardId>> getNodeBundles(ClusterState clusterState, String[] concreteIndices) {
// Group targeted shards by nodeId
Map<String, Set<ShardId>> fastNodeBundles = new HashMap<>();
for (String indexName : concreteIndices) {
Expand All @@ -166,9 +167,7 @@ protected Map<String, Set<ShardId>> getNodeBundles(ClusterState clusterState, Te
GroupShardsIterator<ShardIterator> shards = clusterService.operationRouting()
.searchShards(clusterState, singleIndex, null, null);

Iterator<ShardIterator> shardsForIndex = shards.iterator();
while (shardsForIndex.hasNext()) {
ShardIterator copiesOfShard = shardsForIndex.next();
for (ShardIterator copiesOfShard : shards) {
ShardRouting selectedCopyOfShard = null;
for (ShardRouting copy : copiesOfShard) {
// Pick the first active node with a copy of the shard
Expand All @@ -181,7 +180,7 @@ protected Map<String, Set<ShardId>> getNodeBundles(ClusterState clusterState, Te
break;
}
String nodeId = selectedCopyOfShard.currentNodeId();
Set<ShardId> bundle = null;
final Set<ShardId> bundle;
if (fastNodeBundles.containsKey(nodeId)) {
bundle = fastNodeBundles.get(nodeId);
} else {
Expand Down Expand Up @@ -392,7 +391,7 @@ protected NodeTermsEnumResponse dataNodeOperation(NodeTermsEnumRequest request,
if (termsList.size() >= shard_size) {
break;
}
};
}

} catch (Exception e) {
error = ExceptionsHelper.stackTrace(e);
Expand All @@ -418,7 +417,7 @@ private boolean canAccess(

if (indexAccessControl != null) {
final boolean dls = indexAccessControl.getDocumentPermissions().hasDocumentLevelPermissions();
if ( dls && licenseChecker.get()) {
if (dls && licenseChecker.get()) {
// Check to see if any of the roles defined for the current user rewrite to match_all

SecurityContext securityContext = new SecurityContext(clusterService.getSettings(), threadContext);
Expand Down Expand Up @@ -469,28 +468,28 @@ protected class AsyncBroadcastAction {
private final Task task;
private final TermsEnumRequest request;
private ActionListener<TermsEnumResponse> listener;
private final ClusterState clusterState;
private final DiscoveryNodes nodes;
private final int expectedOps;
private final AtomicInteger counterOps = new AtomicInteger();
private final AtomicReferenceArray<Object> atomicResponses;
private final Map<String, Set<ShardId>> nodeBundles;
private final OriginalIndices localIndices;
private final Map<String, OriginalIndices> remoteClusterIndices;

protected AsyncBroadcastAction(Task task, TermsEnumRequest request, ActionListener<TermsEnumResponse> listener) {
this.task = task;
this.request = request;
this.listener = listener;

clusterState = clusterService.state();
ClusterState clusterState = clusterService.state();

ClusterBlockException blockException = checkGlobalBlock(clusterState, request);
if (blockException != null) {
throw blockException;
}

this.remoteClusterIndices = remoteClusterService.groupIndices(request.indicesOptions(), request.indices());
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
this.localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);

// update to concrete indices
String[] concreteIndices = localIndices == null ? new String[0] :
Expand All @@ -502,7 +501,7 @@ protected AsyncBroadcastAction(Task task, TermsEnumRequest request, ActionListen

nodes = clusterState.nodes();
logger.trace("resolving shards based on cluster state version [{}]", clusterState.version());
nodeBundles = getNodeBundles(clusterState, request, concreteIndices);
nodeBundles = getNodeBundles(clusterState, concreteIndices);
expectedOps = nodeBundles.size() + remoteClusterIndices.size();

atomicResponses = new AtomicReferenceArray<>(expectedOps);
Expand Down Expand Up @@ -557,7 +556,7 @@ protected void performOperation(final String nodeId, final Set<ShardId> shardIds
onNodeFailure(nodeId, opsIndex, null);
} else {
try {
final NodeTermsEnumRequest nodeRequest = newNodeRequest(nodeId, shardIds, request, task.getStartTime());
final NodeTermsEnumRequest nodeRequest = newNodeRequest(localIndices, nodeId, shardIds, request, task.getStartTime());
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
DiscoveryNode node = nodes.get(nodeId);
if (node == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ setup:
]
}

- do:
security.put_role:
name: "dls_alias_role"
body: >
{
"indices": [
{ "names": ["alias_security"], "privileges": ["read"], "query": "{\"term\": {\"ck\": \"const\"}}" }
]
}

- do:
security.put_role:
name: "dls_none_role"
Expand All @@ -57,6 +67,16 @@ setup:
"full_name" : "user with access to all docs in test_security index (using DLS)"
}

- do:
security.put_user:
username: "dls_alias_user"
body: >
{
"password" : "x-pack-test-password",
"roles" : [ "dls_alias_role" ],
"full_name" : "user with access to all docs in test_security index (using DLS)"
}

- do:
security.put_role:
name: "dls_some_role"
Expand Down Expand Up @@ -143,6 +163,8 @@ setup:
indices.create:
index: test_security
body:
aliases:
alias_security: {}
settings:
index:
number_of_shards: 1
Expand Down Expand Up @@ -198,6 +220,16 @@ teardown:
security.delete_role:
name: "dls_all_role"
ignore: 404

- do:
security.delete_user:
username: "dls_alias_user"
ignore: 404

- do:
security.delete_role:
name: "dls_alias_role"
ignore: 404
- do:
security.delete_role:
name: "dls_none_role"
Expand Down Expand Up @@ -289,6 +321,7 @@ teardown:
index: test_k
body: {"field": "foo"}
- length: {terms: 1}

---
"Test search after keyword field":
- do:
Expand Down Expand Up @@ -389,6 +422,7 @@ teardown:
terms_enum:
index: test_*
body: {"field": "foo", "string":"b", "timeout": "2m"}

---
"Test security":

Expand All @@ -406,6 +440,13 @@ teardown:
body: {"field": "foo", "string":"b"}
- length: {terms: 1}

- do:
headers: { Authorization: "Basic ZGxzX2FsaWFzX3VzZXI6eC1wYWNrLXRlc3QtcGFzc3dvcmQ=" } # dls_alias_user sees all docs through the alias
terms_enum:
index: alias_security
body: { "field": "foo", "string": "b" }
- length: { terms: 1 }

- do:
headers: { Authorization: "Basic ZGxzX3NvbWVfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" } # dls_some_user sees selected docs
terms_enum:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,20 @@ setup:
]
}

- do:
security.put_role:
name: "terms_enum_alias_role"
body: >
{
"cluster": ["all"],
"indices": [
{
"names": ["my_remote_cluster:terms_enum_alias"],
"privileges": ["read"]
}
]
}

- do:
security.put_user:
username: "joe_all"
Expand All @@ -30,6 +44,15 @@ setup:
"roles" : [ "terms_enum_all_role" ]
}

- do:
security.put_user:
username: "joe_alias"
body: >
{
"password": "s3krit-password",
"roles" : [ "terms_enum_alias_role" ]
}

- do:
security.put_role:
name: "terms_enum_none_role"
Expand Down Expand Up @@ -82,6 +105,10 @@ teardown:
security.delete_user:
username: "joe_all"
ignore: 404
- do:
security.delete_user:
username: "joe_alias"
ignore: 404
- do:
security.delete_user:
username: "joe_none"
Expand All @@ -94,6 +121,10 @@ teardown:
security.delete_role:
name: "terms_enum_all_role"
ignore: 404
- do:
security.delete_role:
name: "terms_enum_alias_role"
ignore: 404
- do:
security.delete_role:
name: "terms_enum_none_role"
Expand Down Expand Up @@ -123,6 +154,15 @@ teardown:
- match: { terms.0: "zar" }
- match: { complete: true }

- do:
headers: { Authorization: "Basic am9lX2FsaWFzOnMza3JpdC1wYXNzd29yZA==" } # joe_alias can see all docs through alias
terms_enum:
index: my_remote_cluster:terms_enum_alias
body: { "field": "foo", "search_after": "foobar" }
- length: { terms: 1 }
- match: { terms.0: "zar" }
- match: { complete: true }

- do:
headers: { Authorization: "Basic am9lX25vbmU6czNrcml0LXBhc3N3b3Jk" } # joe_none can't see docs
terms_enum:
Expand Down
Loading