Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,23 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.remote.RemoteClusterNodesAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchShardsAction;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction;
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;
import org.elasticsearch.xpack.security.Security;
import org.elasticsearch.xpack.security.audit.AuditUtil;
import org.elasticsearch.xpack.security.authc.CrossClusterAccessAuthenticationService;
import org.elasticsearch.xpack.security.authz.AuthorizationService;

import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;
import static org.elasticsearch.xpack.core.security.authc.CrossClusterAccessSubjectInfo.CROSS_CLUSTER_ACCESS_SUBJECT_INFO_HEADER_KEY;
import static org.elasticsearch.xpack.security.authc.CrossClusterAccessHeaders.CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY;

Expand All @@ -63,56 +45,6 @@ final class CrossClusterAccessServerTransportFilter extends ServerTransportFilte
ALLOWED_TRANSPORT_HEADERS = Set.copyOf(allowedHeaders);
}

// package private for testing
static final Set<String> CROSS_CLUSTER_ACCESS_ACTION_ALLOWLIST;
static {
CROSS_CLUSTER_ACCESS_ACTION_ALLOWLIST = Stream.concat(
// These actions have proxy equivalents, so we need to allow-list the action name and the action name with the proxy action
// prefix
Stream.of(
SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME,
SearchTransportService.FREE_CONTEXT_ACTION_NAME,
SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
SearchTransportService.DFS_ACTION_NAME,
SearchTransportService.QUERY_ACTION_NAME,
SearchTransportService.QUERY_ID_ACTION_NAME,
SearchTransportService.QUERY_SCROLL_ACTION_NAME,
SearchTransportService.QUERY_FETCH_SCROLL_ACTION_NAME,
SearchTransportService.FETCH_ID_SCROLL_ACTION_NAME,
SearchTransportService.FETCH_ID_ACTION_NAME,
SearchTransportService.QUERY_CAN_MATCH_NAME,
SearchTransportService.QUERY_CAN_MATCH_NODE_NAME,
TransportOpenPointInTimeAction.OPEN_SHARD_READER_CONTEXT_NAME,
// CCR actions
"indices:data/read/xpack/ccr/shard_changes",
"indices:internal/admin/ccr/restore/session/clear",
"indices:internal/admin/ccr/restore/file_chunk/get"
).flatMap(name -> Stream.of(name, TransportActionProxy.getProxyAction(name))),
// These actions don't have proxy equivalents
Stream.of(
REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME,
RemoteClusterNodesAction.NAME,
SearchAction.NAME,
ClusterSearchShardsAction.NAME,
SearchShardsAction.NAME,
ResolveIndexAction.NAME,
FieldCapabilitiesAction.NAME,
FieldCapabilitiesAction.NAME + "[n]",
"indices:data/read/eql",
XPackInfoAction.NAME,
GetCheckpointAction.NAME,
// CCR actions
ClusterStateAction.NAME,
HasPrivilegesAction.NAME,
IndicesStatsAction.NAME,
RetentionLeaseActions.Add.ACTION_NAME,
RetentionLeaseActions.Remove.ACTION_NAME,
RetentionLeaseActions.Renew.ACTION_NAME,
"indices:internal/admin/ccr/restore/session/put"
)
).collect(Collectors.toUnmodifiableSet());
}

private final CrossClusterAccessAuthenticationService crossClusterAccessAuthcService;
private final XPackLicenseState licenseState;

Expand Down Expand Up @@ -150,17 +82,6 @@ protected void authenticate(
authenticationListener,
LicenseUtils.newComplianceException(Security.ADVANCED_REMOTE_CLUSTER_SECURITY_FEATURE.getName())
);
} else if (false == CROSS_CLUSTER_ACCESS_ACTION_ALLOWLIST.contains(securityAction)) {
onFailureWithDebugLog(
securityAction,
request,
authenticationListener,
new IllegalArgumentException(
"action ["
+ securityAction
+ "] is not allowed as a cross cluster operation on the dedicated remote cluster server port"
)
);
} else {
try {
validateHeaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import static org.elasticsearch.xpack.core.security.support.Exceptions.authenticationError;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We gotta fix up testInboundDestructiveOperations as well.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch 👍 I totally missed it

import static org.elasticsearch.xpack.core.security.support.Exceptions.authorizationError;
import static org.elasticsearch.xpack.security.authc.CrossClusterAccessHeaders.CROSS_CLUSTER_ACCESS_CREDENTIALS_HEADER_KEY;
import static org.elasticsearch.xpack.security.transport.CrossClusterAccessServerTransportFilter.CROSS_CLUSTER_ACCESS_ACTION_ALLOWLIST;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -109,66 +108,42 @@ public void testInbound() {
public void testCrossClusterAccessInbound() {
TransportRequest request = mock(TransportRequest.class);
Authentication authentication = AuthenticationTestHelper.builder().build();
boolean allowlisted = randomBoolean();
String action = allowlisted ? randomFrom(CROSS_CLUSTER_ACCESS_ACTION_ALLOWLIST) : "_action";
String action = randomAlphaOfLengthBetween(10, 20);
doAnswer(getAnswer(authentication)).when(authcService).authenticate(eq(action), eq(request), eq(true), anyActionListener());
doAnswer(getAnswer(authentication, true)).when(crossClusterAccessAuthcService)
.authenticate(eq(action), eq(request), anyActionListener());
ServerTransportFilter filter = getNodeCrossClusterAccessFilter();
PlainActionFuture<Void> listener = spy(new PlainActionFuture<>());
filter.inbound(action, request, channel, listener);
if (allowlisted) {
verify(authzService).authorize(eq(authentication), eq(action), eq(request), anyActionListener());
verify(crossClusterAccessAuthcService).authenticate(anyString(), any(), anyActionListener());
verify(authcService, never()).authenticate(anyString(), any(), anyBoolean(), anyActionListener());
} else {
var actual = expectThrows(IllegalArgumentException.class, listener::actionGet);
assertThat(
actual.getMessage(),
equalTo("action [" + action + "] is not allowed as a cross cluster operation on the dedicated remote cluster server port")
);
verify(authcService, never()).authenticate(anyString(), any(), anyBoolean(), anyActionListener());
verify(crossClusterAccessAuthcService, never()).authenticate(anyString(), any(), anyActionListener());
verifyNoMoreInteractions(authzService);
}
verify(authzService).authorize(eq(authentication), eq(action), eq(request), anyActionListener());
verify(crossClusterAccessAuthcService).authenticate(anyString(), any(), anyActionListener());
verify(authcService, never()).authenticate(anyString(), any(), anyBoolean(), anyActionListener());
}

public void testCrossClusterAccessInboundInvalidHeadersFail() {
TransportRequest request = mock(TransportRequest.class);
Authentication authentication = AuthenticationTestHelper.builder().build();
boolean allowlisted = randomBoolean();
String action = allowlisted ? randomFrom(CROSS_CLUSTER_ACCESS_ACTION_ALLOWLIST) : "_action";
String action = randomAlphaOfLengthBetween(10, 20);
doAnswer(getAnswer(authentication)).when(authcService).authenticate(eq(action), eq(request), eq(true), anyActionListener());
doAnswer(getAnswer(authentication, true)).when(crossClusterAccessAuthcService)
.authenticate(eq(action), eq(request), anyActionListener());
ServerTransportFilter filter = getNodeCrossClusterAccessFilter(Set.copyOf(randomNonEmptySubsetOf(SECURITY_HEADER_FILTERS)));
PlainActionFuture<Void> listener = new PlainActionFuture<>();
filter.inbound(action, request, channel, listener);
var actual = expectThrows(IllegalArgumentException.class, listener::actionGet);
if (allowlisted) {
verifyNoMoreInteractions(authcService);
verifyNoMoreInteractions(authzService);
assertThat(
actual.getMessage(),
containsString("is not allowed for cross cluster requests through the dedicated remote cluster server port")
);
} else {
verify(authcService, never()).authenticate(anyString(), any(), anyBoolean(), anyActionListener());
verify(crossClusterAccessAuthcService, never()).authenticate(anyString(), any(), anyActionListener());
verifyNoMoreInteractions(authzService);
assertThat(
actual.getMessage(),
equalTo("action [" + action + "] is not allowed as a cross cluster operation on the dedicated remote cluster server port")
);
}
verifyNoMoreInteractions(authcService);
verifyNoMoreInteractions(authzService);
assertThat(
actual.getMessage(),
containsString("is not allowed for cross cluster requests through the dedicated remote cluster server port")
);
verify(crossClusterAccessAuthcService, never()).authenticate(anyString(), any(), anyActionListener());
}

public void testCrossClusterAccessInboundMissingHeadersFail() {
TransportRequest request = mock(TransportRequest.class);
Authentication authentication = AuthenticationTestHelper.builder().build();
boolean allowlisted = randomBoolean();
String action = allowlisted ? randomFrom(CROSS_CLUSTER_ACCESS_ACTION_ALLOWLIST) : "_action";
String action = randomAlphaOfLengthBetween(10, 20);
doAnswer(getAnswer(authentication)).when(authcService).authenticate(eq(action), eq(request), eq(true), anyActionListener());
doAnswer(getAnswer(authentication, true)).when(crossClusterAccessAuthcService)
.authenticate(eq(action), eq(request), anyActionListener());
Expand Down Expand Up @@ -198,27 +173,17 @@ public void testCrossClusterAccessInboundMissingHeadersFail() {
filter.inbound(action, request, channel, listener);
var actual = expectThrows(IllegalArgumentException.class, listener::actionGet);

if (allowlisted) {
verifyNoMoreInteractions(authcService);
verifyNoMoreInteractions(authzService);
assertThat(
actual.getMessage(),
equalTo(
"Cross cluster requests through the dedicated remote cluster server port require transport header ["
+ firstMissingHeader
+ "] but none found. "
+ "Please ensure you have configured remote cluster credentials on the cluster originating the request."
)
);
} else {
verify(authcService, never()).authenticate(anyString(), any(), anyBoolean(), anyActionListener());
verify(crossClusterAccessAuthcService, never()).authenticate(anyString(), any(), anyActionListener());
verifyNoMoreInteractions(authzService);
assertThat(
actual.getMessage(),
equalTo("action [" + action + "] is not allowed as a cross cluster operation on the dedicated remote cluster server port")
);
}
verifyNoMoreInteractions(authcService);
verifyNoMoreInteractions(authzService);
assertThat(
actual.getMessage(),
equalTo(
"Cross cluster requests through the dedicated remote cluster server port require transport header ["
+ firstMissingHeader
+ "] but none found. "
+ "Please ensure you have configured remote cluster credentials on the cluster originating the request."
)
);
verify(crossClusterAccessAuthcService, never()).authenticate(anyString(), any(), anyActionListener());
}

Expand All @@ -230,26 +195,14 @@ public void testInboundDestructiveOperations() {
);
Authentication authentication = AuthenticationTestHelper.builder().build();
doAnswer(getAnswer(authentication)).when(authcService).authenticate(eq(action), eq(request), eq(true), anyActionListener());
boolean crossClusterAccess = randomBoolean();
ServerTransportFilter filter = crossClusterAccess ? getNodeCrossClusterAccessFilter() : getNodeFilter();
ServerTransportFilter filter = getNodeFilter();
PlainActionFuture<Void> listener = spy(new PlainActionFuture<>());
filter.inbound(action, request, channel, listener);
if (failDestructiveOperations) {
expectThrows(IllegalArgumentException.class, listener::actionGet);
verifyNoMoreInteractions(authzService);
} else {
if (crossClusterAccess) {
var actual = expectThrows(IllegalArgumentException.class, listener::actionGet);
assertThat(
actual.getMessage(),
equalTo(
"action [" + action + "] is not allowed as a cross cluster operation on the dedicated remote cluster server port"
)
);
verifyNoMoreInteractions(authzService);
} else {
verify(authzService).authorize(eq(authentication), eq(action), eq(request), anyActionListener());
}
verify(authzService).authorize(eq(authentication), eq(action), eq(request), anyActionListener());
}
}

Expand Down Expand Up @@ -279,9 +232,7 @@ public void testInboundAuthenticationException() {
public void testCrossClusterAccessInboundAuthenticationException() {
TransportRequest request = mock(TransportRequest.class);
Exception authE = authenticationError("authc failed");
// Only pick allowlisted action -- it does not make sense to pick one that isn't because we will never get to authenticate in that
// case
String action = randomFrom(CROSS_CLUSTER_ACCESS_ACTION_ALLOWLIST);
String action = randomAlphaOfLengthBetween(10, 20);
doAnswer(i -> {
final Object[] args = i.getArguments();
assertThat(args, arrayWithSize(3));
Expand Down Expand Up @@ -344,7 +295,7 @@ public void testCrossClusterAccessInboundFailsWithUnsupportedLicense() {

ServerTransportFilter crossClusterAccessFilter = getNodeCrossClusterAccessFilter(unsupportedLicenseState);
PlainActionFuture<Void> listener = new PlainActionFuture<>();
String action = randomBoolean() ? randomFrom(CROSS_CLUSTER_ACCESS_ACTION_ALLOWLIST) : "_action";
String action = randomAlphaOfLengthBetween(10, 20);
crossClusterAccessFilter.inbound(action, mock(TransportRequest.class), channel, listener);

ElasticsearchSecurityException actualException = expectThrows(ElasticsearchSecurityException.class, listener::actionGet);
Expand Down