From d21b2ab4e69cd27cfe4cb9929b44cc36026bc8d0 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 8 Nov 2018 11:47:03 +0100 Subject: [PATCH 1/2] [CCR] Rename leaderClient variables and parameters to remoteClient --- .../xpack/ccr/CcrLicenseChecker.java | 30 +++++++++---------- .../ccr/action/ShardFollowTasksExecutor.java | 12 ++++---- .../TransportPutAutoFollowPatternAction.java | 6 ++-- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index e056e312819f9..ada9832d8dbaf 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -128,10 +128,10 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( return; } - final Client leaderClient = client.getRemoteClusterClient(clusterAlias); - hasPrivilegesToFollowIndices(leaderClient, new String[] {leaderIndex}, e -> { + final Client remoteCluster = client.getRemoteClusterClient(clusterAlias); + hasPrivilegesToFollowIndices(remoteCluster, new String[] {leaderIndex}, e -> { if (e == null) { - fetchLeaderHistoryUUIDs(leaderClient, leaderIndexMetaData, onFailure, historyUUIDs -> + fetchLeaderHistoryUUIDs(remoteCluster, leaderIndexMetaData, onFailure, historyUUIDs -> consumer.accept(historyUUIDs, leaderIndexMetaData)); } else { onFailure.accept(e); @@ -179,7 +179,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState( * * @param client the client * @param clusterAlias the remote cluster alias - * @param leaderClient the leader client to use to execute cluster state API + * @param remoteClient the remote client to use to execute cluster state API * @param request the cluster state request * @param onFailure the failure consumer * @param leaderClusterStateConsumer the leader cluster state consumer @@ -189,7 +189,7 @@ public void checkRemoteClusterLicenseAndFetchClusterState( private void checkRemoteClusterLicenseAndFetchClusterState( final Client client, final String clusterAlias, - final Client leaderClient, + final Client remoteClient, final ClusterStateRequest request, final Consumer onFailure, final Consumer leaderClusterStateConsumer, @@ -206,7 +206,7 @@ public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseChe final ActionListener clusterStateListener = ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure); // following an index in remote cluster, so use remote client to fetch leader index metadata - leaderClient.admin().cluster().state(request, clusterStateListener); + remoteClient.admin().cluster().state(request, clusterStateListener); } else { onFailure.accept(nonCompliantLicense.apply(licenseCheck)); } @@ -221,9 +221,9 @@ public void onFailure(final Exception e) { } /** - * Fetches the history UUIDs for leader index on per shard basis using the specified leaderClient. + * Fetches the history UUIDs for leader index on per shard basis using the specified remoteClient. * - * @param leaderClient the leader client + * @param remoteClient the remote client * @param leaderIndexMetaData the leader index metadata * @param onFailure the failure consumer * @param historyUUIDConsumer the leader index history uuid and consumer @@ -231,7 +231,7 @@ public void onFailure(final Exception e) { // NOTE: Placed this method here; in order to avoid duplication of logic for fetching history UUIDs // in case of following a local or a remote cluster. public void fetchLeaderHistoryUUIDs( - final Client leaderClient, + final Client remoteClient, final IndexMetaData leaderIndexMetaData, final Consumer onFailure, final Consumer historyUUIDConsumer) { @@ -274,7 +274,7 @@ public void fetchLeaderHistoryUUIDs( IndicesStatsRequest request = new IndicesStatsRequest(); request.clear(); request.indices(leaderIndex); - leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure)); + remoteClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure)); } /** @@ -282,12 +282,12 @@ public void fetchLeaderHistoryUUIDs( * client. The specified callback will be invoked with null if the user has the necessary privileges to follow the specified indices, * otherwise the callback will be invoked with an exception outlining the authorization error. * - * @param leaderClient the leader client + * @param remoteClient the remote client * @param indices the indices * @param handler the callback */ - public void hasPrivilegesToFollowIndices(final Client leaderClient, final String[] indices, final Consumer handler) { - Objects.requireNonNull(leaderClient, "leaderClient"); + public void hasPrivilegesToFollowIndices(final Client remoteClient, final String[] indices, final Consumer handler) { + Objects.requireNonNull(remoteClient, "remoteClient"); Objects.requireNonNull(indices, "indices"); if (indices.length == 0) { throw new IllegalArgumentException("indices must not be empty"); @@ -298,7 +298,7 @@ public void hasPrivilegesToFollowIndices(final Client leaderClient, final String return; } - ThreadContext threadContext = leaderClient.threadPool().getThreadContext(); + ThreadContext threadContext = remoteClient.threadPool().getThreadContext(); SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); String username = securityContext.getUser().principal(); @@ -332,7 +332,7 @@ public void hasPrivilegesToFollowIndices(final Client leaderClient, final String handler.accept(Exceptions.authorizationError(message.toString())); } }; - leaderClient.execute(HasPrivilegesAction.INSTANCE, request, ActionListener.wrap(responseHandler, handler)); + remoteClient.execute(HasPrivilegesAction.INSTANCE, request, ActionListener.wrap(responseHandler, handler)); } public static Client wrapClient(Client client, Map headers) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 79de49c7a28f2..797c08cc973ee 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -91,11 +91,11 @@ protected AllocatedPersistentTask createTask(long id, String type, String action PersistentTasksCustomMetaData.PersistentTask taskInProgress, Map headers) { ShardFollowTask params = taskInProgress.getParams(); - final Client leaderClient; + final Client remoteClient; if (params.getRemoteCluster() != null) { - leaderClient = wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders()); + remoteClient = wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders()); } else { - leaderClient = wrapClient(client, params.getHeaders()); + remoteClient = wrapClient(client, params.getHeaders()); } Client followerClient = wrapClient(client, params.getHeaders()); BiConsumer scheduler = (delay, command) -> { @@ -124,7 +124,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro clusterStateRequest.metaData(true); clusterStateRequest.indices(leaderIndex.getName()); - leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { + remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); if (indexMetaData.getMappings().isEmpty()) { assert indexMetaData.getMappingVersion() == 1; @@ -186,7 +186,7 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum } } }; - leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); + remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); } private void closeIndexUpdateSettingsAndOpenIndex(String followIndex, @@ -240,7 +240,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co request.setMaxOperationCount(maxOperationCount); request.setMaxBatchSize(params.getMaxReadRequestSize()); request.setPollTimeout(params.getReadPollTimeout()); - leaderClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); + remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); } }; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index 73c69bb7c48a3..3d8439fb536dc 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -74,7 +74,7 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request, listener.onFailure(LicenseUtils.newComplianceException("ccr")); return; } - final Client leaderClient = client.getRemoteClusterClient(request.getRemoteCluster()); + final Client remoteClient = client.getRemoteClusterClient(request.getRemoteCluster()); final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); clusterStateRequest.clear(); clusterStateRequest.metaData(true); @@ -84,9 +84,9 @@ protected void masterOperation(PutAutoFollowPatternAction.Request request, .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]); - ccrLicenseChecker.hasPrivilegesToFollowIndices(leaderClient, indices, e -> { + ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> { if (e == null) { - leaderClient.admin().cluster().state( + remoteClient.admin().cluster().state( clusterStateRequest, ActionListener.wrap( clusterStateResponse -> { From c048d576e6256ffe6c17e4bb4d9e44c21492b596 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 8 Nov 2018 13:06:37 +0100 Subject: [PATCH 2/2] whoops --- .../java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index ada9832d8dbaf..3985b90a71b23 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -128,10 +128,10 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( return; } - final Client remoteCluster = client.getRemoteClusterClient(clusterAlias); - hasPrivilegesToFollowIndices(remoteCluster, new String[] {leaderIndex}, e -> { + final Client remoteClient = client.getRemoteClusterClient(clusterAlias); + hasPrivilegesToFollowIndices(remoteClient, new String[] {leaderIndex}, e -> { if (e == null) { - fetchLeaderHistoryUUIDs(remoteCluster, leaderIndexMetaData, onFailure, historyUUIDs -> + fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, onFailure, historyUUIDs -> consumer.accept(historyUUIDs, leaderIndexMetaData)); } else { onFailure.accept(e);