|
19 | 19 |
|
20 | 20 | package org.elasticsearch.action.admin.cluster.snapshots.get; |
21 | 21 |
|
22 | | -import org.apache.logging.log4j.core.util.Throwables; |
23 | 22 | import org.apache.lucene.util.CollectionUtil; |
24 | 23 | import org.elasticsearch.ElasticsearchException; |
25 | 24 | import org.elasticsearch.action.ActionListener; |
|
28 | 27 | import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; |
29 | 28 | import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; |
30 | 29 | import org.elasticsearch.action.support.ActionFilters; |
| 30 | +import org.elasticsearch.action.support.GroupedActionListener; |
31 | 31 | import org.elasticsearch.action.support.master.TransportMasterNodeAction; |
32 | 32 | import org.elasticsearch.cluster.ClusterState; |
33 | 33 | import org.elasticsearch.cluster.block.ClusterBlockException; |
34 | 34 | import org.elasticsearch.cluster.block.ClusterBlockLevel; |
35 | 35 | import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; |
36 | 36 | import org.elasticsearch.cluster.metadata.RepositoryMetaData; |
37 | 37 | import org.elasticsearch.cluster.service.ClusterService; |
| 38 | +import org.elasticsearch.common.collect.Tuple; |
38 | 39 | import org.elasticsearch.common.inject.Inject; |
39 | 40 | import org.elasticsearch.common.regex.Regex; |
40 | 41 | import org.elasticsearch.repositories.IndexId; |
|
50 | 51 | import java.util.Collections; |
51 | 52 | import java.util.HashMap; |
52 | 53 | import java.util.HashSet; |
| 54 | +import java.util.Iterator; |
53 | 55 | import java.util.List; |
54 | 56 | import java.util.Map; |
55 | 57 | import java.util.Set; |
56 | | -import java.util.concurrent.ExecutionException; |
57 | | -import java.util.concurrent.Future; |
58 | 58 | import java.util.stream.Collectors; |
59 | 59 |
|
60 | 60 | /** |
@@ -106,34 +106,45 @@ protected void masterOperation(final GetSnapshotsRequest request, final ClusterS |
106 | 106 |
|
107 | 107 | private void getMultipleReposSnapshotInfo(List<RepositoryMetaData> repos, String[] snapshots, boolean ignoreUnavailable, |
108 | 108 | boolean verbose, ActionListener<GetSnapshotsResponse> listener) { |
109 | | - List<Future<List<SnapshotInfo>>> futures = new ArrayList<>(repos.size()); |
| 109 | + GroupedActionListener<Tuple<String, Tuple<List<SnapshotInfo>, ElasticsearchException>>> groupedActionListener = |
| 110 | + new GroupedActionListener<>( |
| 111 | + ActionListener.map(listener, responses -> { |
| 112 | + assert repos.size() == responses.size(); |
| 113 | + |
| 114 | + Map<String, List<SnapshotInfo>> successfulResponses = new HashMap<>(); |
| 115 | + Map<String, ElasticsearchException> failedResponses = new HashMap<>(); |
| 116 | + |
| 117 | + Iterator<Tuple<String, Tuple<List<SnapshotInfo>, ElasticsearchException>>> it = responses.iterator(); |
| 118 | + |
| 119 | + while (it.hasNext()) { |
| 120 | + Tuple<String, Tuple<List<SnapshotInfo>, ElasticsearchException>> response = it.next(); |
| 121 | + String repo = response.v1(); |
| 122 | + Tuple<List<SnapshotInfo>, ElasticsearchException> result = response.v2(); |
| 123 | + if (result.v1() != null) { |
| 124 | + assert result.v2() == null; |
| 125 | + successfulResponses.put(repo, result.v1()); |
| 126 | + } else { |
| 127 | + assert result.v2() != null; |
| 128 | + failedResponses.put(repo, result.v2()); |
| 129 | + } |
| 130 | + } |
| 131 | + |
| 132 | + return new GetSnapshotsResponse(successfulResponses, failedResponses); |
| 133 | + }), repos.size()); |
110 | 134 |
|
111 | 135 | // run concurrently for all repos on GENERIC thread pool |
112 | 136 | for (final RepositoryMetaData repo : repos) { |
113 | | - futures.add(threadPool.executor(ThreadPool.Names.GENERIC).submit( |
114 | | - () -> getSingleRepoSnapshotInfo(repo.name(), snapshots, ignoreUnavailable, verbose))); |
115 | | - } |
116 | | - assert repos.size() == futures.size(); |
117 | | - |
118 | | - Map<String, List<SnapshotInfo>> successfulResponses = new HashMap<>(); |
119 | | - Map<String, ElasticsearchException> failedResponses = new HashMap<>(); |
120 | | - |
121 | | - for (int i = 0; i < repos.size(); i++) { |
122 | | - final String repo = repos.get(i).name(); |
123 | | - try { |
124 | | - successfulResponses.put(repo, futures.get(i).get()); |
125 | | - } catch (InterruptedException e) { |
126 | | - Throwables.rethrow(e); |
127 | | - } catch (ExecutionException e) { |
128 | | - if (e.getCause() instanceof ElasticsearchException) { |
129 | | - failedResponses.put(repo, (ElasticsearchException) e.getCause()); |
130 | | - } else { |
131 | | - Throwables.rethrow(e); |
132 | | - } |
133 | | - } |
| 137 | + threadPool.executor(ThreadPool.Names.GENERIC).execute( |
| 138 | + () -> { |
| 139 | + // Unfortunately, there is no Either in Java, so we use Tuple with only one value set |
| 140 | + try { |
| 141 | + groupedActionListener.onResponse(Tuple.tuple(repo.name(), |
| 142 | + Tuple.tuple(getSingleRepoSnapshotInfo(repo.name(), snapshots, ignoreUnavailable, verbose), null))); |
| 143 | + } catch (ElasticsearchException e) { |
| 144 | + groupedActionListener.onResponse(Tuple.tuple(repo.name(), Tuple.tuple(null, e))); |
| 145 | + } |
| 146 | + }); |
134 | 147 | } |
135 | | - |
136 | | - listener.onResponse(new GetSnapshotsResponse(successfulResponses, failedResponses)); |
137 | 148 | } |
138 | 149 |
|
139 | 150 | private List<SnapshotInfo> getSingleRepoSnapshotInfo(String repo, String[] snapshots, boolean ignoreUnavailable, boolean verbose) { |
|
0 commit comments