Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix node bootstrap error when enable stream transport and remote cluster state ([#19948](https://github.com/opensearch-project/OpenSearch/pull/19948))
- Keep track and release Reactor Netty 4 Transport accepted Http Channels during the Node shutdown ([#20106](https://github.com/opensearch-project/OpenSearch/pull/20106))
- Fix deletion failure/error of unused index template; case when an index template matches a data stream but has a lower priority. ([#20102](https://github.com/opensearch-project/OpenSearch/pull/20102))
- Fixed version incompatibility in remote state entities using bytestream for ser/de ([#20080](https://github.com/opensearch-project/OpenSearch/pull/20080))
- Fix toBuilder method in EngineConfig to include mergedSegmentTransferTracker([#20105](https://github.com/opensearch-project/OpenSearch/pull/20105))
- Fixed handling of property index in BulkRequest during deserialization ([#20132](https://github.com/opensearch-project/OpenSearch/pull/20132))
- Fix negative CPU usage values in node stats ([#19120](https://github.com/opensearch-project/OpenSearch/issues/19120))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.routing.IndexRoutingTable;
Expand Down Expand Up @@ -182,15 +183,16 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
public void getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
LatchedActionListener<IndexRoutingTable> latchedActionListener
LatchedActionListener<IndexRoutingTable> latchedActionListener,
Version version
) {

ActionListener<IndexRoutingTable> actionListener = ActionListener.wrap(
latchedActionListener::onResponse,
latchedActionListener::onFailure
);

RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor);
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor, version);

remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener);
}
Expand All @@ -199,14 +201,15 @@ public void getAsyncIndexRoutingReadAction(
public void getAsyncIndexRoutingTableDiffReadAction(
String clusterUUID,
String uploadedFilename,
LatchedActionListener<Diff<RoutingTable>> latchedActionListener
LatchedActionListener<Diff<RoutingTable>> latchedActionListener,
Version version
) {
ActionListener<Diff<RoutingTable>> actionListener = ActionListener.wrap(
latchedActionListener::onResponse,
latchedActionListener::onFailure
);

RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff(uploadedFilename, clusterUUID, compressor);
RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff(uploadedFilename, clusterUUID, compressor, version);
remoteRoutingTableDiffStore.readAsync(remoteRoutingTableDiff, actionListener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.cluster.routing.remote;

import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.routing.IndexRoutingTable;
Expand Down Expand Up @@ -71,7 +72,8 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
public void getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
LatchedActionListener<IndexRoutingTable> latchedActionListener
LatchedActionListener<IndexRoutingTable> latchedActionListener,
Version version
) {
// noop
}
Expand All @@ -80,7 +82,8 @@ public void getAsyncIndexRoutingReadAction(
public void getAsyncIndexRoutingTableDiffReadAction(
String clusterUUID,
String uploadedFilename,
LatchedActionListener<Diff<RoutingTable>> latchedActionListener
LatchedActionListener<Diff<RoutingTable>> latchedActionListener,
Version version
) {
// noop
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.cluster.routing.remote;

import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.routing.IndexRoutingTable;
Expand All @@ -31,13 +32,15 @@ public interface RemoteRoutingTableService extends LifecycleComponent {
void getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
LatchedActionListener<IndexRoutingTable> latchedActionListener
LatchedActionListener<IndexRoutingTable> latchedActionListener,
Version version
);

void getAsyncIndexRoutingTableDiffReadAction(
String clusterUUID,
String uploadedFilename,
LatchedActionListener<Diff<RoutingTable>> latchedActionListener
LatchedActionListener<Diff<RoutingTable>> latchedActionListener,
Version version
);

List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -1260,6 +1261,14 @@ ClusterState readClusterStateInParallel(
AtomicReference<Diff<RoutingTable>> readIndexRoutingTableDiffResults = new AtomicReference<>();
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks));

if (manifest.getOpensearchVersion() != Version.CURRENT) {
logger.info(
"Reading cluster state on version {} from manifest uploaded with version {}",
Version.CURRENT,
manifest.getOpensearchVersion()
);
}

LatchedActionListener<RemoteReadResult> listener = new LatchedActionListener<>(ActionListener.wrap(response -> {
logger.debug("Successfully read cluster state component from remote");
readResults.add(response);
Expand Down Expand Up @@ -1296,7 +1305,8 @@ ClusterState readClusterStateInParallel(
remoteRoutingTableService.getAsyncIndexRoutingReadAction(
clusterUUID,
indexRouting.getUploadedFilename(),
routingTableLatchedActionListener
routingTableLatchedActionListener,
manifest.getOpensearchVersion()
);
}

Expand All @@ -1315,7 +1325,8 @@ ClusterState readClusterStateInParallel(
remoteRoutingTableService.getAsyncIndexRoutingTableDiffReadAction(
clusterUUID,
manifest.getDiffManifest().getIndicesRoutingDiffPath(),
routingTableDiffLatchedActionListener
routingTableDiffLatchedActionListener,
manifest.getOpensearchVersion()
);
}

Expand Down Expand Up @@ -1392,7 +1403,8 @@ ClusterState readClusterStateInParallel(
new RemoteDiscoveryNodes(
manifest.getDiscoveryNodesMetadata().getUploadedFilename(),
clusterUUID,
blobStoreRepository.getCompressor()
blobStoreRepository.getCompressor(),
manifest.getOpensearchVersion()
),
listener
);
Expand All @@ -1404,7 +1416,8 @@ ClusterState readClusterStateInParallel(
new RemoteClusterBlocks(
manifest.getClusterBlocksMetadata().getUploadedFilename(),
clusterUUID,
blobStoreRepository.getCompressor()
blobStoreRepository.getCompressor(),
manifest.getOpensearchVersion()
),
listener
);
Expand All @@ -1416,7 +1429,8 @@ ClusterState readClusterStateInParallel(
new RemoteHashesOfConsistentSettings(
manifest.getHashesOfConsistentSettings().getUploadedFilename(),
clusterUUID,
blobStoreRepository.getCompressor()
blobStoreRepository.getCompressor(),
manifest.getOpensearchVersion()
),
listener
);
Expand All @@ -1431,7 +1445,8 @@ ClusterState readClusterStateInParallel(
entry.getValue().getAttributeName(),
clusterUUID,
blobStoreRepository.getCompressor(),
namedWriteableRegistry
namedWriteableRegistry,
manifest.getOpensearchVersion()
),
listener
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.gateway.remote.model;

import org.opensearch.Version;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity;
Expand All @@ -32,10 +33,7 @@
public class RemoteClusterBlocks extends AbstractClusterMetadataWriteableBlobEntity<ClusterBlocks> {

public static final String CLUSTER_BLOCKS = "blocks";
public static final ChecksumWritableBlobStoreFormat<ClusterBlocks> CLUSTER_BLOCKS_FORMAT = new ChecksumWritableBlobStoreFormat<>(
"blocks",
ClusterBlocks::readFrom
);
public final ChecksumWritableBlobStoreFormat<ClusterBlocks> clusterBlocksFormat;

private ClusterBlocks clusterBlocks;
private long stateVersion;
Expand All @@ -44,11 +42,13 @@ public RemoteClusterBlocks(final ClusterBlocks clusterBlocks, long stateVersion,
super(clusterUUID, compressor, null);
this.clusterBlocks = clusterBlocks;
this.stateVersion = stateVersion;
this.clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>("blocks", ClusterBlocks::readFrom);
}

public RemoteClusterBlocks(final String blobName, final String clusterUUID, final Compressor compressor) {
public RemoteClusterBlocks(final String blobName, final String clusterUUID, final Compressor compressor, final Version version) {
super(clusterUUID, compressor, null);
this.blobName = blobName;
this.clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>("blocks", ClusterBlocks::readFrom, version);
}

@Override
Expand Down Expand Up @@ -83,11 +83,11 @@ public UploadedMetadata getUploadedMetadata() {

@Override
public InputStream serialize() throws IOException {
return CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, generateBlobFileName(), getCompressor()).streamInput();
return this.clusterBlocksFormat.serialize(clusterBlocks, generateBlobFileName(), getCompressor()).streamInput();
}

@Override
public ClusterBlocks deserialize(final InputStream inputStream) throws IOException {
return CLUSTER_BLOCKS_FORMAT.deserialize(blobName, Streams.readFully(inputStream));
return this.clusterBlocksFormat.deserialize(blobName, Streams.readFully(inputStream));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.gateway.remote.model;

import org.opensearch.Version;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterState.Custom;
import org.opensearch.common.io.Streams;
Expand Down Expand Up @@ -65,15 +66,17 @@ public RemoteClusterStateCustoms(
final String customType,
final String clusterUUID,
final Compressor compressor,
final NamedWriteableRegistry namedWriteableRegistry
final NamedWriteableRegistry namedWriteableRegistry,
final Version version
) {
super(clusterUUID, compressor, null);
this.blobName = blobName;
this.customType = customType;
this.namedWriteableRegistry = namedWriteableRegistry;
this.clusterStateCustomsFormat = new ChecksumWritableBlobStoreFormat<>(
"cluster-state-custom",
is -> readFrom(is, namedWriteableRegistry, customType)
is -> readFrom(is, namedWriteableRegistry, customType),
version
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ public RemoteCustomMetadata(
this.blobName = blobName;
this.customType = customType;
this.namedWriteableRegistry = namedWriteableRegistry;
this.customBlobStoreFormat = new ChecksumWritableBlobStoreFormat<>("custom", is -> {
is.setVersion(version);
return readFrom(is, namedWriteableRegistry, customType);
});
this.customBlobStoreFormat = new ChecksumWritableBlobStoreFormat<>(
"custom",
is -> readFrom(is, namedWriteableRegistry, customType),
version
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.gateway.remote.model;

import org.opensearch.Version;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity;
Expand All @@ -32,10 +33,7 @@
public class RemoteDiscoveryNodes extends AbstractClusterMetadataWriteableBlobEntity<DiscoveryNodes> {

public static final String DISCOVERY_NODES = "nodes";
public static final ChecksumWritableBlobStoreFormat<DiscoveryNodes> DISCOVERY_NODES_FORMAT = new ChecksumWritableBlobStoreFormat<>(
"nodes",
is -> DiscoveryNodes.readFrom(is, null)
);
public final ChecksumWritableBlobStoreFormat<DiscoveryNodes> discoveryNodesFormat;

private DiscoveryNodes discoveryNodes;
private long stateVersion;
Expand All @@ -49,11 +47,13 @@ public RemoteDiscoveryNodes(
super(clusterUUID, compressor, null);
this.discoveryNodes = discoveryNodes;
this.stateVersion = stateVersion;
this.discoveryNodesFormat = new ChecksumWritableBlobStoreFormat<>("nodes", is -> DiscoveryNodes.readFrom(is, null));
}

public RemoteDiscoveryNodes(final String blobName, final String clusterUUID, final Compressor compressor) {
public RemoteDiscoveryNodes(final String blobName, final String clusterUUID, final Compressor compressor, final Version version) {
super(clusterUUID, compressor, null);
this.blobName = blobName;
this.discoveryNodesFormat = new ChecksumWritableBlobStoreFormat<>("nodes", is -> DiscoveryNodes.readFrom(is, null), version);
}

@Override
Expand Down Expand Up @@ -88,7 +88,7 @@ public UploadedMetadata getUploadedMetadata() {

@Override
public InputStream serialize() throws IOException {
return DISCOVERY_NODES_FORMAT.serialize(
return discoveryNodesFormat.serialize(
(out, discoveryNode) -> discoveryNode.writeToWithAttribute(out),
discoveryNodes,
generateBlobFileName(),
Expand All @@ -98,6 +98,6 @@ public InputStream serialize() throws IOException {

@Override
public DiscoveryNodes deserialize(final InputStream inputStream) throws IOException {
return DISCOVERY_NODES_FORMAT.deserialize(blobName, Streams.readFully(inputStream));
return discoveryNodesFormat.deserialize(blobName, Streams.readFully(inputStream));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.gateway.remote.model;

import org.opensearch.Version;
import org.opensearch.cluster.metadata.DiffableStringMap;
import org.opensearch.common.io.Streams;
import org.opensearch.common.remote.AbstractClusterMetadataWriteableBlobEntity;
Expand All @@ -30,8 +31,7 @@
*/
public class RemoteHashesOfConsistentSettings extends AbstractClusterMetadataWriteableBlobEntity<DiffableStringMap> {
public static final String HASHES_OF_CONSISTENT_SETTINGS = "hashes-of-consistent-settings";
public static final ChecksumWritableBlobStoreFormat<DiffableStringMap> HASHES_OF_CONSISTENT_SETTINGS_FORMAT =
new ChecksumWritableBlobStoreFormat<>("hashes-of-consistent-settings", DiffableStringMap::readFrom);
public final ChecksumWritableBlobStoreFormat<DiffableStringMap> hashesOfConsistentSettingsFormat;

private DiffableStringMap hashesOfConsistentSettings;
private long metadataVersion;
Expand All @@ -45,11 +45,25 @@ public RemoteHashesOfConsistentSettings(
super(clusterUUID, compressor, null);
this.metadataVersion = metadataVersion;
this.hashesOfConsistentSettings = hashesOfConsistentSettings;
this.hashesOfConsistentSettingsFormat = new ChecksumWritableBlobStoreFormat<>(
"hashes-of-consistent-settings",
DiffableStringMap::readFrom
);
}

public RemoteHashesOfConsistentSettings(final String blobName, final String clusterUUID, final Compressor compressor) {
public RemoteHashesOfConsistentSettings(
final String blobName,
final String clusterUUID,
final Compressor compressor,
final Version version
) {
super(clusterUUID, compressor, null);
this.blobName = blobName;
this.hashesOfConsistentSettingsFormat = new ChecksumWritableBlobStoreFormat<>(
"hashes-of-consistent-settings",
DiffableStringMap::readFrom,
version
);
}

@Override
Expand Down Expand Up @@ -83,12 +97,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() {

@Override
public InputStream serialize() throws IOException {
return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, generateBlobFileName(), getCompressor())
return hashesOfConsistentSettingsFormat.serialize(hashesOfConsistentSettings, generateBlobFileName(), getCompressor())
.streamInput();
}

@Override
public DiffableStringMap deserialize(final InputStream inputStream) throws IOException {
return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.deserialize(blobName, Streams.readFully(inputStream));
return hashesOfConsistentSettingsFormat.deserialize(blobName, Streams.readFully(inputStream));
}
}
Loading
Loading