Skip to content

Commit 2241673

Browse files
DaveCTurner authored and probakowski committed
Use static empty store files metadata (elastic#84034)
In a large cluster we expect most nodes not to have a copy of most shards, but today, during replica shard allocation, we create a new (and nontrivial) object for each node that has no copy of a shard. With this commit we check at deserialization time whether the response is empty and, if so, return a shared static empty instance instead of instantiating a new object. Relates elastic#77466.
1 parent 1fa4427 commit 2241673

File tree

8 files changed

+96
-66
lines changed

8 files changed

+96
-66
lines changed

docs/changelog/84034.yaml

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,5 @@
1+
pr: 84034
2+
summary: Use static empty store files metadata
3+
area: Allocation
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 16 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
5353
import org.elasticsearch.common.settings.Setting;
5454
import org.elasticsearch.common.settings.Setting.Property;
55+
import org.elasticsearch.common.util.Maps;
5556
import org.elasticsearch.core.AbstractRefCounted;
5657
import org.elasticsearch.core.Nullable;
5758
import org.elasticsearch.core.RefCounted;
@@ -759,25 +760,17 @@ public String toString() {
759760
*/
760761
public static final class MetadataSnapshot implements Iterable<StoreFileMetadata>, Writeable {
761762
private final Map<String, StoreFileMetadata> metadata;
762-
763-
public static final MetadataSnapshot EMPTY = new MetadataSnapshot();
764-
765763
private final Map<String, String> commitUserData;
766-
767764
private final long numDocs;
768765

766+
public static final MetadataSnapshot EMPTY = new MetadataSnapshot(emptyMap(), emptyMap(), 0L);
767+
769768
public MetadataSnapshot(Map<String, StoreFileMetadata> metadata, Map<String, String> commitUserData, long numDocs) {
770769
this.metadata = metadata;
771770
this.commitUserData = commitUserData;
772771
this.numDocs = numDocs;
773772
}
774773

775-
MetadataSnapshot() {
776-
metadata = emptyMap();
777-
commitUserData = emptyMap();
778-
numDocs = 0;
779-
}
780-
781774
MetadataSnapshot(IndexCommit commit, Directory directory, Logger logger) throws IOException {
782775
LoadedMetadata loadedMetadata = loadMetadata(commit, directory, logger);
783776
metadata = loadedMetadata.fileMetadata;
@@ -786,26 +779,21 @@ public MetadataSnapshot(Map<String, StoreFileMetadata> metadata, Map<String, Str
786779
assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles();
787780
}
788781

789-
/**
790-
* Read from a stream.
791-
*/
792-
public MetadataSnapshot(StreamInput in) throws IOException {
793-
final int size = in.readVInt();
794-
Map<String, StoreFileMetadata> metadata = new HashMap<>();
795-
for (int i = 0; i < size; i++) {
796-
StoreFileMetadata meta = new StoreFileMetadata(in);
797-
metadata.put(meta.name(), meta);
798-
}
799-
Map<String, String> commitUserData = new HashMap<>();
800-
int num = in.readVInt();
801-
for (int i = num; i > 0; i--) {
802-
commitUserData.put(in.readString(), in.readString());
782+
public static MetadataSnapshot readFrom(StreamInput in) throws IOException {
783+
final int metadataSize = in.readVInt();
784+
final Map<String, StoreFileMetadata> metadata = metadataSize == 0 ? emptyMap() : Maps.newMapWithExpectedSize(metadataSize);
785+
for (int i = 0; i < metadataSize; i++) {
786+
final var storeFileMetadata = new StoreFileMetadata(in);
787+
metadata.put(storeFileMetadata.name(), storeFileMetadata);
803788
}
789+
final var commitUserData = in.readMap(StreamInput::readString, StreamInput::readString);
790+
final var numDocs = in.readLong();
804791

805-
this.metadata = unmodifiableMap(metadata);
806-
this.commitUserData = unmodifiableMap(commitUserData);
807-
this.numDocs = in.readLong();
808-
assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles();
792+
if (metadataSize == 0 && commitUserData.size() == 0 && numDocs == 0) {
793+
return MetadataSnapshot.EMPTY;
794+
} else {
795+
return new MetadataSnapshot(metadata, commitUserData, numDocs);
796+
}
809797
}
810798

811799
@Override

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryCleanFilesRequest.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,7 @@ public RecoveryCleanFilesRequest(
4343
super(in);
4444
recoveryId = in.readLong();
4545
shardId = new ShardId(in);
46-
snapshotFiles = new Store.MetadataSnapshot(in);
46+
snapshotFiles = Store.MetadataSnapshot.readFrom(in);
4747
totalTranslogOps = in.readVInt();
4848
globalCheckpoint = in.readZLong();
4949
}

server/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@ public StartRecoveryRequest(StreamInput in) throws IOException {
4040
targetAllocationId = in.readString();
4141
sourceNode = new DiscoveryNode(in);
4242
targetNode = new DiscoveryNode(in);
43-
metadataSnapshot = new Store.MetadataSnapshot(in);
43+
metadataSnapshot = Store.MetadataSnapshot.readFrom(in);
4444
primaryRelocation = in.readBoolean();
4545
startingSeqNo = in.readLong();
4646
if (in.getVersion().onOrAfter(RecoverySettings.SNAPSHOT_FILE_DOWNLOAD_THROTTLING_SUPPORTED_VERSION)) {

server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java

Lines changed: 30 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@
1010

1111
import org.apache.logging.log4j.message.ParameterizedMessage;
1212
import org.elasticsearch.ElasticsearchException;
13+
import org.elasticsearch.Version;
1314
import org.elasticsearch.action.ActionType;
1415
import org.elasticsearch.action.FailedNodeException;
1516
import org.elasticsearch.action.support.ActionFilters;
@@ -45,12 +46,13 @@
4546
import org.elasticsearch.transport.TransportService;
4647

4748
import java.io.IOException;
48-
import java.util.Collections;
4949
import java.util.Iterator;
5050
import java.util.List;
5151
import java.util.Objects;
5252
import java.util.concurrent.TimeUnit;
5353

54+
import static java.util.Collections.emptyList;
55+
5456
public class TransportNodesListShardStoreMetadata extends TransportNodesAction<
5557
TransportNodesListShardStoreMetadata.Request,
5658
TransportNodesListShardStoreMetadata.NodesStoreFilesMetadata,
@@ -132,18 +134,17 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
132134
if (indexShard != null) {
133135
try {
134136
final StoreFilesMetadata storeFilesMetadata = new StoreFilesMetadata(
135-
shardId,
136137
indexShard.snapshotStoreMetadata(),
137138
indexShard.getPeerRecoveryRetentionLeases()
138139
);
139140
exists = true;
140141
return storeFilesMetadata;
141142
} catch (org.apache.lucene.index.IndexNotFoundException e) {
142143
logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e);
143-
return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
144+
return StoreFilesMetadata.EMPTY;
144145
} catch (IOException e) {
145146
logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e);
146-
return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
147+
return StoreFilesMetadata.EMPTY;
147148
}
148149
}
149150
}
@@ -166,7 +167,7 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
166167
}
167168
final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath);
168169
if (shardPath == null) {
169-
return new StoreFilesMetadata(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
170+
return StoreFilesMetadata.EMPTY;
170171
}
171172
// note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means:
172173
// 1) a shard is being constructed, which means the master will not use a copy of this replica
@@ -180,7 +181,7 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
180181
);
181182
// We use peer recovery retention leases from the primary for allocating replicas. We should always have retention leases when
182183
// we refresh shard info after the primary has started. Hence, we can ignore retention leases if there is no active shard.
183-
return new StoreFilesMetadata(shardId, metadataSnapshot, Collections.emptyList());
184+
return new StoreFilesMetadata(metadataSnapshot, emptyList());
184185
} finally {
185186
TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
186187
if (exists) {
@@ -192,37 +193,43 @@ private StoreFilesMetadata listStoreMetadata(NodeRequest request) throws IOExcep
192193
}
193194

194195
public static class StoreFilesMetadata implements Iterable<StoreFileMetadata>, Writeable {
195-
private final ShardId shardId;
196196
private final Store.MetadataSnapshot metadataSnapshot;
197197
private final List<RetentionLease> peerRecoveryRetentionLeases;
198198

199-
public StoreFilesMetadata(
200-
ShardId shardId,
201-
Store.MetadataSnapshot metadataSnapshot,
202-
List<RetentionLease> peerRecoveryRetentionLeases
203-
) {
204-
this.shardId = shardId;
199+
private static final ShardId FAKE_SHARD_ID = new ShardId("_na_", "_na_", 0);
200+
public static final StoreFilesMetadata EMPTY = new StoreFilesMetadata(Store.MetadataSnapshot.EMPTY, emptyList());
201+
202+
public StoreFilesMetadata(Store.MetadataSnapshot metadataSnapshot, List<RetentionLease> peerRecoveryRetentionLeases) {
205203
this.metadataSnapshot = metadataSnapshot;
206204
this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases;
207205
}
208206

209-
public StoreFilesMetadata(StreamInput in) throws IOException {
210-
this.shardId = new ShardId(in);
211-
this.metadataSnapshot = new Store.MetadataSnapshot(in);
212-
this.peerRecoveryRetentionLeases = in.readList(RetentionLease::new);
207+
public static StoreFilesMetadata readFrom(StreamInput in) throws IOException {
208+
if (in.getVersion().before(Version.V_8_2_0)) {
209+
new ShardId(in);
210+
}
211+
final var metadataSnapshot = Store.MetadataSnapshot.readFrom(in);
212+
final var peerRecoveryRetentionLeases = in.readList(RetentionLease::new);
213+
if (metadataSnapshot == Store.MetadataSnapshot.EMPTY && peerRecoveryRetentionLeases.isEmpty()) {
214+
return EMPTY;
215+
} else {
216+
return new StoreFilesMetadata(metadataSnapshot, peerRecoveryRetentionLeases);
217+
}
213218
}
214219

215220
@Override
216221
public void writeTo(StreamOutput out) throws IOException {
217-
shardId.writeTo(out);
222+
if (out.getVersion().before(Version.V_8_2_0)) {
223+
// no compatible version cares about the shard ID, we can just make one up
224+
FAKE_SHARD_ID.writeTo(out);
225+
226+
// NB only checked this for versions back to 7.17.0, we are assuming that we don't use this with earlier versions:
227+
assert out.getVersion().onOrAfter(Version.V_7_17_0) : out.getVersion();
228+
}
218229
metadataSnapshot.writeTo(out);
219230
out.writeList(peerRecoveryRetentionLeases);
220231
}
221232

222-
public ShardId shardId() {
223-
return this.shardId;
224-
}
225-
226233
public boolean isEmpty() {
227234
return metadataSnapshot.size() == 0;
228235
}
@@ -267,8 +274,6 @@ public String syncId() {
267274
@Override
268275
public String toString() {
269276
return "StoreFilesMetadata{"
270-
+ ", shardId="
271-
+ shardId
272277
+ ", metadataSnapshot{size="
273278
+ metadataSnapshot.size()
274279
+ ", syncId="
@@ -385,7 +390,7 @@ public static class NodeStoreFilesMetadata extends BaseNodeResponse {
385390

386391
public NodeStoreFilesMetadata(StreamInput in, DiscoveryNode node) throws IOException {
387392
super(in, node);
388-
storeFilesMetadata = new StoreFilesMetadata(in);
393+
storeFilesMetadata = StoreFilesMetadata.readFrom(in);
389394
}
390395

391396
public NodeStoreFilesMetadata(DiscoveryNode node, StoreFilesMetadata storeFilesMetadata) {

server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -684,7 +684,6 @@ TestAllocator addData(
684684
data.put(
685685
node,
686686
new TransportNodesListShardStoreMetadata.StoreFilesMetadata(
687-
shardId,
688687
new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()),
689688
peerRecoveryRetentionLeases
690689
)

server/src/test/java/org/elasticsearch/index/store/StoreTests.java

Lines changed: 42 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -80,7 +80,10 @@
8080
import java.util.concurrent.atomic.AtomicInteger;
8181
import java.util.function.LongUnaryOperator;
8282

83+
import static java.util.Collections.emptyList;
84+
import static java.util.Collections.emptyMap;
8385
import static java.util.Collections.unmodifiableMap;
86+
import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion;
8487
import static org.elasticsearch.test.VersionUtils.randomVersion;
8588
import static org.hamcrest.Matchers.anyOf;
8689
import static org.hamcrest.Matchers.containsString;
@@ -93,6 +96,7 @@
9396
import static org.hamcrest.Matchers.is;
9497
import static org.hamcrest.Matchers.not;
9598
import static org.hamcrest.Matchers.notNullValue;
99+
import static org.hamcrest.Matchers.sameInstance;
96100

97101
public class StoreTests extends ESTestCase {
98102

@@ -918,7 +922,7 @@ public void testMetadataSnapshotStreaming() throws Exception {
918922
ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
919923
InputStreamStreamInput in = new InputStreamStreamInput(inBuffer);
920924
in.setVersion(targetNodeVersion);
921-
Store.MetadataSnapshot inMetadataSnapshot = new Store.MetadataSnapshot(in);
925+
Store.MetadataSnapshot inMetadataSnapshot = Store.MetadataSnapshot.readFrom(in);
922926
Map<String, StoreFileMetadata> origEntries = new HashMap<>();
923927
origEntries.putAll(outMetadataSnapshot.asMap());
924928
for (Map.Entry<String, StoreFileMetadata> entry : inMetadataSnapshot.asMap().entrySet()) {
@@ -928,6 +932,21 @@ public void testMetadataSnapshotStreaming() throws Exception {
928932
assertThat(inMetadataSnapshot.getCommitUserData(), equalTo(outMetadataSnapshot.getCommitUserData()));
929933
}
930934

935+
public void testEmptyMetadataSnapshotStreaming() throws Exception {
936+
var outMetadataSnapshot = randomBoolean() ? Store.MetadataSnapshot.EMPTY : new Store.MetadataSnapshot(emptyMap(), emptyMap(), 0L);
937+
var targetNodeVersion = randomCompatibleVersion(random(), org.elasticsearch.Version.CURRENT);
938+
939+
var outBuffer = new ByteArrayOutputStream();
940+
var out = new OutputStreamStreamOutput(outBuffer);
941+
out.setVersion(targetNodeVersion);
942+
outMetadataSnapshot.writeTo(out);
943+
944+
var inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
945+
var in = new InputStreamStreamInput(inBuffer);
946+
in.setVersion(targetNodeVersion);
947+
assertThat(Store.MetadataSnapshot.readFrom(in), sameInstance(Store.MetadataSnapshot.EMPTY));
948+
}
949+
931950
protected Store.MetadataSnapshot createMetadataSnapshot() {
932951
StoreFileMetadata storeFileMetadata1 = new StoreFileMetadata("segments", 1, "666", MIN_SUPPORTED_LUCENE_VERSION.toString());
933952
StoreFileMetadata storeFileMetadata2 = new StoreFileMetadata("no_segments", 1, "666", MIN_SUPPORTED_LUCENE_VERSION.toString());
@@ -978,21 +997,16 @@ public void testStreamStoreFilesMetadata() throws Exception {
978997
);
979998
}
980999
TransportNodesListShardStoreMetadata.StoreFilesMetadata outStoreFileMetadata =
981-
new TransportNodesListShardStoreMetadata.StoreFilesMetadata(
982-
new ShardId("test", "_na_", 0),
983-
metadataSnapshot,
984-
peerRecoveryRetentionLeases
985-
);
1000+
new TransportNodesListShardStoreMetadata.StoreFilesMetadata(metadataSnapshot, peerRecoveryRetentionLeases);
9861001
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
9871002
OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer);
988-
org.elasticsearch.Version targetNodeVersion = randomVersion(random());
1003+
org.elasticsearch.Version targetNodeVersion = randomCompatibleVersion(random(), org.elasticsearch.Version.CURRENT);
9891004
out.setVersion(targetNodeVersion);
9901005
outStoreFileMetadata.writeTo(out);
9911006
ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
9921007
InputStreamStreamInput in = new InputStreamStreamInput(inBuffer);
9931008
in.setVersion(targetNodeVersion);
994-
TransportNodesListShardStoreMetadata.StoreFilesMetadata inStoreFileMetadata =
995-
new TransportNodesListShardStoreMetadata.StoreFilesMetadata(in);
1009+
var inStoreFileMetadata = TransportNodesListShardStoreMetadata.StoreFilesMetadata.readFrom(in);
9961010
Iterator<StoreFileMetadata> outFiles = outStoreFileMetadata.iterator();
9971011
for (StoreFileMetadata inFile : inStoreFileMetadata) {
9981012
assertThat(inFile.name(), equalTo(outFiles.next().name()));
@@ -1001,6 +1015,25 @@ public void testStreamStoreFilesMetadata() throws Exception {
10011015
assertThat(outStoreFileMetadata.peerRecoveryRetentionLeases(), equalTo(peerRecoveryRetentionLeases));
10021016
}
10031017

1018+
public void testStreamEmptyStoreFilesMetadata() throws Exception {
1019+
var outStoreFileMetadata = randomBoolean()
1020+
? TransportNodesListShardStoreMetadata.StoreFilesMetadata.EMPTY
1021+
: new TransportNodesListShardStoreMetadata.StoreFilesMetadata(Store.MetadataSnapshot.EMPTY, emptyList());
1022+
var outBuffer = new ByteArrayOutputStream();
1023+
var out = new OutputStreamStreamOutput(outBuffer);
1024+
var targetNodeVersion = randomCompatibleVersion(random(), org.elasticsearch.Version.CURRENT);
1025+
out.setVersion(targetNodeVersion);
1026+
outStoreFileMetadata.writeTo(out);
1027+
1028+
var inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
1029+
var in = new InputStreamStreamInput(inBuffer);
1030+
in.setVersion(targetNodeVersion);
1031+
assertThat(
1032+
TransportNodesListShardStoreMetadata.StoreFilesMetadata.readFrom(in),
1033+
sameInstance(TransportNodesListShardStoreMetadata.StoreFilesMetadata.EMPTY)
1034+
);
1035+
}
1036+
10041037
public void testMarkCorruptedOnTruncatedSegmentsFile() throws IOException {
10051038
IndexWriterConfig iwc = newIndexWriterConfig();
10061039
final ShardId shardId = new ShardId("index", "_na_", 1);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -114,7 +114,7 @@ public static class PutCcrRestoreSessionResponse extends ActionResponse {
114114
PutCcrRestoreSessionResponse(StreamInput in) throws IOException {
115115
super(in);
116116
node = new DiscoveryNode(in);
117-
storeFileMetadata = new Store.MetadataSnapshot(in);
117+
storeFileMetadata = Store.MetadataSnapshot.readFrom(in);
118118
mappingVersion = in.readVLong();
119119
}
120120

0 commit comments

Comments
 (0)