Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
return;
}

final Client leaderClient = client.getRemoteClusterClient(clusterAlias);
hasPrivilegesToFollowIndices(leaderClient, new String[] {leaderIndex}, e -> {
final Client remoteCluster = client.getRemoteClusterClient(clusterAlias);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one I think should be remoteClient?

hasPrivilegesToFollowIndices(remoteCluster, new String[] {leaderIndex}, e -> {
if (e == null) {
fetchLeaderHistoryUUIDs(leaderClient, leaderIndexMetaData, onFailure, historyUUIDs ->
fetchLeaderHistoryUUIDs(remoteCluster, leaderIndexMetaData, onFailure, historyUUIDs ->
consumer.accept(historyUUIDs, leaderIndexMetaData));
} else {
onFailure.accept(e);
Expand Down Expand Up @@ -179,7 +179,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState(
*
* @param client the client
* @param clusterAlias the remote cluster alias
* @param leaderClient the leader client to use to execute cluster state API
* @param remoteClient the remote client to use to execute cluster state API
* @param request the cluster state request
* @param onFailure the failure consumer
* @param leaderClusterStateConsumer the leader cluster state consumer
Expand All @@ -189,7 +189,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState(
private void checkRemoteClusterLicenseAndFetchClusterState(
final Client client,
final String clusterAlias,
final Client leaderClient,
final Client remoteClient,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterState> leaderClusterStateConsumer,
Expand All @@ -206,7 +206,7 @@ public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseChe
final ActionListener<ClusterStateResponse> clusterStateListener =
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
// following an index in remote cluster, so use remote client to fetch leader index metadata
leaderClient.admin().cluster().state(request, clusterStateListener);
remoteClient.admin().cluster().state(request, clusterStateListener);
} else {
onFailure.accept(nonCompliantLicense.apply(licenseCheck));
}
Expand All @@ -221,17 +221,17 @@ public void onFailure(final Exception e) {
}

/**
* Fetches the history UUIDs for leader index on per shard basis using the specified leaderClient.
* Fetches the history UUIDs for leader index on per shard basis using the specified remoteClient.
*
* @param leaderClient the leader client
* @param remoteClient the remote client
* @param leaderIndexMetaData the leader index metadata
* @param onFailure the failure consumer
* @param historyUUIDConsumer the leader index history uuid and consumer
*/
// NOTE: Placed this method here; in order to avoid duplication of logic for fetching history UUIDs
// in case of following a local or a remote cluster.
public void fetchLeaderHistoryUUIDs(
final Client leaderClient,
final Client remoteClient,
final IndexMetaData leaderIndexMetaData,
final Consumer<Exception> onFailure,
final Consumer<String[]> historyUUIDConsumer) {
Expand Down Expand Up @@ -274,20 +274,20 @@ public void fetchLeaderHistoryUUIDs(
IndicesStatsRequest request = new IndicesStatsRequest();
request.clear();
request.indices(leaderIndex);
leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
remoteClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
}

/**
* Check if the user executing the current action has privileges to follow the specified indices on the cluster specified by the leader
* client. The specified callback will be invoked with null if the user has the necessary privileges to follow the specified indices,
* otherwise the callback will be invoked with an exception outlining the authorization error.
*
* @param leaderClient the leader client
* @param remoteClient the remote client
* @param indices the indices
* @param handler the callback
*/
public void hasPrivilegesToFollowIndices(final Client leaderClient, final String[] indices, final Consumer<Exception> handler) {
Objects.requireNonNull(leaderClient, "leaderClient");
public void hasPrivilegesToFollowIndices(final Client remoteClient, final String[] indices, final Consumer<Exception> handler) {
Objects.requireNonNull(remoteClient, "remoteClient");
Objects.requireNonNull(indices, "indices");
if (indices.length == 0) {
throw new IllegalArgumentException("indices must not be empty");
Expand All @@ -298,7 +298,7 @@ public void hasPrivilegesToFollowIndices(final Client leaderClient, final String
return;
}

ThreadContext threadContext = leaderClient.threadPool().getThreadContext();
ThreadContext threadContext = remoteClient.threadPool().getThreadContext();
SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext);
String username = securityContext.getUser().principal();

Expand Down Expand Up @@ -332,7 +332,7 @@ public void hasPrivilegesToFollowIndices(final Client leaderClient, final String
handler.accept(Exceptions.authorizationError(message.toString()));
}
};
leaderClient.execute(HasPrivilegesAction.INSTANCE, request, ActionListener.wrap(responseHandler, handler));
remoteClient.execute(HasPrivilegesAction.INSTANCE, request, ActionListener.wrap(responseHandler, handler));
}

public static Client wrapClient(Client client, Map<String, String> headers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> taskInProgress,
Map<String, String> headers) {
ShardFollowTask params = taskInProgress.getParams();
final Client leaderClient;
final Client remoteClient;
if (params.getRemoteCluster() != null) {
leaderClient = wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders());
remoteClient = wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders());
} else {
leaderClient = wrapClient(client, params.getHeaders());
remoteClient = wrapClient(client, params.getHeaders());
}
Client followerClient = wrapClient(client, params.getHeaders());
BiConsumer<TimeValue, Runnable> scheduler = (delay, command) -> {
Expand Down Expand Up @@ -124,7 +124,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex.getName());

leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
if (indexMetaData.getMappings().isEmpty()) {
assert indexMetaData.getMappingVersion() == 1;
Expand Down Expand Up @@ -186,7 +186,7 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum
}
}
};
leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
}

private void closeIndexUpdateSettingsAndOpenIndex(String followIndex,
Expand Down Expand Up @@ -240,7 +240,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
request.setMaxOperationCount(maxOperationCount);
request.setMaxBatchSize(params.getMaxReadRequestSize());
request.setPollTimeout(params.getReadPollTimeout());
leaderClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request,
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
return;
}
final Client leaderClient = client.getRemoteClusterClient(request.getRemoteCluster());
final Client remoteClient = client.getRemoteClusterClient(request.getRemoteCluster());
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
Expand All @@ -84,9 +84,9 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request,
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]);
ccrLicenseChecker.hasPrivilegesToFollowIndices(leaderClient, indices, e -> {
ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> {
if (e == null) {
leaderClient.admin().cluster().state(
remoteClient.admin().cluster().state(
clusterStateRequest,
ActionListener.wrap(
clusterStateResponse -> {
Expand Down