Skip to content

Commit b3859b7

Browse files
Add rest, transport, service layer changes for Tiering
Signed-off-by: Neetika Singhal <[email protected]>
1 parent fbe048f commit b3859b7

15 files changed

+1995
-1
lines changed

server/src/main/java/org/opensearch/action/ActionModule.java

+5
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,9 @@
288288
import org.opensearch.action.termvectors.TransportMultiTermVectorsAction;
289289
import org.opensearch.action.termvectors.TransportShardMultiTermsVectorAction;
290290
import org.opensearch.action.termvectors.TransportTermVectorsAction;
291+
import org.opensearch.action.tiering.RestWarmTieringAction;
292+
import org.opensearch.action.tiering.TransportWarmTieringAction;
293+
import org.opensearch.action.tiering.WarmTieringAction;
291294
import org.opensearch.action.update.TransportUpdateAction;
292295
import org.opensearch.action.update.UpdateAction;
293296
import org.opensearch.client.node.NodeClient;
@@ -633,6 +636,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
633636
actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
634637
actions.register(CloneSnapshotAction.INSTANCE, TransportCloneSnapshotAction.class);
635638
actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
639+
actions.register(WarmTieringAction.INSTANCE, TransportWarmTieringAction.class);
636640
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);
637641

638642
actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class);
@@ -964,6 +968,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
964968
registerHandler.accept(new RestNodeAttrsAction());
965969
registerHandler.accept(new RestRepositoriesAction());
966970
registerHandler.accept(new RestSnapshotAction());
971+
registerHandler.accept(new RestWarmTieringAction());
967972
registerHandler.accept(new RestTemplatesAction());
968973

969974
// Point in time API
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.tiering;
10+
11+
import org.opensearch.client.node.NodeClient;
12+
import org.opensearch.rest.BaseRestHandler;
13+
import org.opensearch.rest.RestHandler;
14+
import org.opensearch.rest.RestRequest;
15+
import org.opensearch.rest.action.RestToXContentListener;
16+
17+
import java.util.List;
18+
19+
import static java.util.Collections.singletonList;
20+
import static org.opensearch.rest.RestRequest.Method.POST;
21+
22+
/**
23+
* Rest Tiering API class to move index from hot to warm
24+
*
25+
* @opensearch.experimental
26+
*/
27+
public class RestWarmTieringAction extends BaseRestHandler {
28+
29+
@Override
30+
public List<RestHandler.Route> routes() {
31+
return singletonList(new RestHandler.Route(POST, "/{index}/_tier/{tier}"));
32+
}
33+
34+
@Override
35+
public String getName() {
36+
return "tiering_warm";
37+
}
38+
39+
@Override
40+
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
41+
final TieringIndexRequest tieringIndexRequest = new TieringIndexRequest(request.param("tier"), request.param("index"));
42+
tieringIndexRequest.timeout(request.paramAsTime("timeout", tieringIndexRequest.timeout()));
43+
tieringIndexRequest.clusterManagerNodeTimeout(
44+
request.paramAsTime("cluster_manager_timeout", tieringIndexRequest.clusterManagerNodeTimeout())
45+
);
46+
tieringIndexRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false));
47+
return channel -> client.admin().cluster().execute(WarmTieringAction.INSTANCE, tieringIndexRequest, new RestToXContentListener<>(channel));
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.tiering;
10+
11+
import org.opensearch.action.ActionRequestValidationException;
12+
import org.opensearch.action.IndicesRequest;
13+
import org.opensearch.action.support.IndicesOptions;
14+
import org.opensearch.action.support.master.AcknowledgedRequest;
15+
import org.opensearch.common.annotation.ExperimentalApi;
16+
import org.opensearch.core.common.io.stream.StreamInput;
17+
import org.opensearch.core.common.io.stream.StreamOutput;
18+
19+
import java.io.IOException;
20+
import java.util.Arrays;
21+
import java.util.Locale;
22+
import java.util.Objects;
23+
24+
import static org.opensearch.action.ValidateActions.addValidationError;
25+
26+
/**
27+
* Represents the tiering request for indices
28+
* to move to a different tier
29+
*
30+
* @opensearch.experimental
31+
*/
32+
@ExperimentalApi
33+
public class TieringIndexRequest extends AcknowledgedRequest<TieringIndexRequest> implements IndicesRequest.Replaceable {
34+
35+
private String[] indices;
36+
private TieringType tier;
37+
private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, true);
38+
private boolean waitForCompletion;
39+
public TieringIndexRequest() {
40+
}
41+
42+
@Override
43+
public ActionRequestValidationException validate() {
44+
ActionRequestValidationException validationException = null;
45+
if (indices == null || indices.length == 0) {
46+
validationException = addValidationError("Mandatory parameter - indices is missing from the request", validationException);
47+
}
48+
if (tier == null) {
49+
validationException = addValidationError("Mandatory parameter - tier is missing from the request", validationException);
50+
}
51+
if (TieringType.HOT.equals(tier)) {
52+
validationException = addValidationError("The specified tiering to hot is not supported yet", validationException);
53+
}
54+
return validationException;
55+
}
56+
57+
public TieringIndexRequest(String tier, String... indices) {
58+
this.tier = TieringType.fromString(tier);
59+
this.indices = indices;
60+
}
61+
62+
public TieringIndexRequest(StreamInput in) throws IOException {
63+
super(in);
64+
indices = in.readStringArray();
65+
tier = TieringType.fromString(in.readString());
66+
indicesOptions = IndicesOptions.readIndicesOptions(in);
67+
waitForCompletion = in.readBoolean();
68+
}
69+
70+
@Override
71+
public void writeTo(StreamOutput out) throws IOException {
72+
super.writeTo(out);
73+
out.writeStringArray(indices);
74+
out.writeString(tier.value());
75+
indicesOptions.writeIndicesOptions(out);
76+
out.writeBoolean(waitForCompletion);
77+
}
78+
79+
@Override
80+
public String[] indices() {
81+
return indices;
82+
}
83+
84+
@Override
85+
public IndicesOptions indicesOptions() {
86+
return indicesOptions;
87+
}
88+
89+
@Override
90+
public TieringIndexRequest indices(String... indices) {
91+
this.indices = indices;
92+
return this;
93+
}
94+
95+
public TieringIndexRequest indicesOptions(IndicesOptions indicesOptions) {
96+
this.indicesOptions = indicesOptions;
97+
return this;
98+
}
99+
100+
/**
101+
* If this parameter is set to true the operation will wait for completion of tiering process before returning.
102+
*
103+
* @param waitForCompletion if true the operation will wait for completion
104+
* @return this request
105+
*/
106+
public TieringIndexRequest waitForCompletion(boolean waitForCompletion) {
107+
this.waitForCompletion = waitForCompletion;
108+
return this;
109+
}
110+
111+
/**
112+
* Returns wait for completion setting
113+
*
114+
* @return true if the operation will wait for completion
115+
*/
116+
public boolean waitForCompletion() {
117+
return waitForCompletion;
118+
}
119+
120+
public TieringType tier() {
121+
return tier;
122+
}
123+
124+
public TieringIndexRequest tier(TieringType tier) {
125+
this.tier = tier;
126+
return this;
127+
}
128+
129+
@Override
130+
public boolean equals(Object o) {
131+
if (this == o) {
132+
return true;
133+
}
134+
if (o == null || getClass() != o.getClass()) {
135+
return false;
136+
}
137+
TieringIndexRequest that = (TieringIndexRequest) o;
138+
return clusterManagerNodeTimeout.equals(that.clusterManagerNodeTimeout)
139+
&& timeout.equals(that.timeout)
140+
&& Objects.equals(indicesOptions, that.indicesOptions)
141+
&& Arrays.equals(indices, that.indices)
142+
&& tier.equals(that.tier)
143+
&& waitForCompletion == that.waitForCompletion;
144+
}
145+
146+
@Override
147+
public int hashCode() {
148+
return Objects.hash(clusterManagerNodeTimeout, timeout, indicesOptions, waitForCompletion, Arrays.hashCode(indices));
149+
}
150+
151+
@ExperimentalApi
152+
public enum TieringType {
153+
HOT,
154+
WARM;
155+
156+
public static TieringType fromString(String name) {
157+
String upperCase = name.trim().toUpperCase(Locale.ROOT);
158+
if (HOT.name().equals(upperCase)) {
159+
return HOT;
160+
}
161+
if (WARM.name().equals(upperCase)) {
162+
return WARM;
163+
}
164+
throw new IllegalArgumentException("Tiering type [" + name + "] is not supported. Supported types are " + HOT + " and " + WARM);
165+
}
166+
167+
public String value() {
168+
return name().toLowerCase(Locale.ROOT);
169+
}
170+
}
171+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.tiering;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
14+
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
15+
import org.opensearch.action.support.ActionFilters;
16+
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
17+
import org.opensearch.action.support.master.AcknowledgedResponse;
18+
import org.opensearch.client.Client;
19+
import org.opensearch.cluster.ClusterState;
20+
import org.opensearch.cluster.block.ClusterBlockException;
21+
import org.opensearch.cluster.block.ClusterBlockLevel;
22+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
23+
import org.opensearch.cluster.service.ClusterService;
24+
import org.opensearch.common.inject.Inject;
25+
import org.opensearch.core.action.ActionListener;
26+
import org.opensearch.core.common.io.stream.StreamInput;
27+
import org.opensearch.threadpool.ThreadPool;
28+
import org.opensearch.tiering.TieringClusterStateListener;
29+
import org.opensearch.tiering.TieringService;
30+
import org.opensearch.transport.TransportService;
31+
32+
import java.io.IOException;
33+
34+
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
35+
36+
/**
37+
* Transport Tiering API class to move index from hot to warm
38+
*
39+
* @opensearch.experimental
40+
*/
41+
public class TransportWarmTieringAction extends TransportClusterManagerNodeAction<TieringIndexRequest, AcknowledgedResponse> {
42+
43+
private static final Logger logger = LogManager.getLogger(TransportWarmTieringAction.class);
44+
private final TieringService tieringService;
45+
private final Client client;
46+
@Inject
47+
public TransportWarmTieringAction(TransportService transportService, ClusterService clusterService,
48+
ThreadPool threadPool, TieringService tieringService,
49+
ActionFilters actionFilters,
50+
IndexNameExpressionResolver indexNameExpressionResolver, Client client) {
51+
super(WarmTieringAction.NAME, transportService, clusterService, threadPool, actionFilters,
52+
TieringIndexRequest::new, indexNameExpressionResolver);
53+
this.client = client;
54+
this.tieringService = tieringService;
55+
}
56+
57+
@Override
58+
protected String executor() {
59+
return ThreadPool.Names.SAME;
60+
}
61+
62+
@Override
63+
protected AcknowledgedResponse read(StreamInput in) throws IOException {
64+
return new AcknowledgedResponse(in);
65+
}
66+
67+
@Override
68+
protected ClusterBlockException checkBlock(TieringIndexRequest request, ClusterState state) {
69+
ClusterBlockException blockException =
70+
state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
71+
if (blockException == null) {
72+
// Check indices level block
73+
blockException = state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE,
74+
indexNameExpressionResolver.concreteIndexNames(clusterService.state(), request));
75+
}
76+
return blockException;
77+
}
78+
79+
@Override
80+
protected void clusterManagerOperation(TieringIndexRequest request, ClusterState state,
81+
ActionListener<AcknowledgedResponse> listener) throws Exception {
82+
// Collect node stats to get node level filesystem info. The response will be used for performing some
83+
// validations.
84+
client.admin().cluster().prepareNodesStats().clear().addMetric(FS.metricName()).execute(
85+
new ActionListener<NodesStatsResponse>() {
86+
@Override
87+
public void onResponse(NodesStatsResponse nodesStatsResponse) {
88+
// Collect index level stats. This response is also used for validations.
89+
client.admin().indices().prepareStats().clear().setStore(true).setIndices(request.indices()).execute(
90+
new ActionListener<IndicesStatsResponse>() {
91+
@Override
92+
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
93+
tieringService.tier(request, nodesStatsResponse, indicesStatsResponse,
94+
ActionListener.delegateFailure(listener, (delegatedListener, acknowledgedResponse) -> {
95+
if (request.waitForCompletion()) {
96+
TieringClusterStateListener.createAndRegisterListener(
97+
clusterService,
98+
new AcknowledgedResponse(acknowledgedResponse.isAcknowledged()),
99+
delegatedListener
100+
);
101+
} else {
102+
delegatedListener.onResponse(new AcknowledgedResponse(true));
103+
}
104+
}));
105+
}
106+
@Override
107+
public void onFailure(Exception e) {
108+
logger.debug("Indices stats call failed with exception", e);
109+
listener.onFailure(e);
110+
}
111+
});
112+
}
113+
@Override
114+
public void onFailure(Exception e) {
115+
logger.debug("Node stats call failed with exception", e);
116+
listener.onFailure(e);
117+
}
118+
});
119+
}
120+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.tiering;
10+
11+
import org.opensearch.action.ActionType;
12+
import org.opensearch.action.support.master.AcknowledgedResponse;
13+
14+
/**
15+
* Tiering action class to move index from hot to warm
16+
*
17+
* @opensearch.experimental
18+
*/
19+
public class WarmTieringAction extends ActionType<AcknowledgedResponse> {
20+
21+
public static final WarmTieringAction INSTANCE = new WarmTieringAction();
22+
public static final String NAME = "indices:admin/tiering/warm";
23+
24+
public WarmTieringAction() {
25+
super(NAME, AcknowledgedResponse::new);
26+
}
27+
}

0 commit comments

Comments
 (0)