Skip to content

Commit ba91b0c

Browse files
committed
Reset LiveVersionMap on sync commit (#27534)
Today we carry on the size of the live version map to ensure that we minimze rehashing. Yet, once we are idle or we can issue a sync-commit we can resize it to defaults to free up memory. Relates to #27516
1 parent a0427c8 commit ba91b0c

File tree

5 files changed

+227
-11
lines changed

5 files changed

+227
-11
lines changed

core/src/main/java/org/elasticsearch/index/engine/DeleteVersionValue.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,24 @@ public long ramBytesUsed() {
4444
return BASE_RAM_BYTES_USED;
4545
}
4646

47+
@Override
48+
public boolean equals(Object o) {
49+
if (this == o) return true;
50+
if (o == null || getClass() != o.getClass()) return false;
51+
if (!super.equals(o)) return false;
52+
53+
DeleteVersionValue that = (DeleteVersionValue) o;
54+
55+
return time == that.time;
56+
}
57+
58+
@Override
59+
public int hashCode() {
60+
int result = super.hashCode();
61+
result = 31 * result + (int) (time ^ (time >>> 32));
62+
return result;
63+
}
64+
4765
@Override
4866
public String toString() {
4967
return "DeleteVersionValue{" +

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search
574574
ensureOpen();
575575
SearcherScope scope;
576576
if (get.realtime()) {
577-
VersionValue versionValue = versionMap.getUnderLock(get.uid());
577+
VersionValue versionValue = versionMap.getUnderLock(get.uid().bytes());
578578
if (versionValue != null) {
579579
if (versionValue.isDelete()) {
580580
return GetResult.NOT_EXISTS;
@@ -612,7 +612,7 @@ enum OpVsLuceneDocStatus {
612612
private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {
613613
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";
614614
final OpVsLuceneDocStatus status;
615-
final VersionValue versionValue = versionMap.getUnderLock(op.uid());
615+
final VersionValue versionValue = versionMap.getUnderLock(op.uid().bytes());
616616
assert incrementVersionLookup();
617617
if (versionValue != null) {
618618
if (op.seqNo() > versionValue.seqNo ||
@@ -649,7 +649,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
649649
/** resolves the current version of the document, returning null if not found */
650650
private VersionValue resolveDocVersion(final Operation op) throws IOException {
651651
assert incrementVersionLookup(); // used for asserting in tests
652-
VersionValue versionValue = versionMap.getUnderLock(op.uid());
652+
VersionValue versionValue = versionMap.getUnderLock(op.uid().bytes());
653653
if (versionValue == null) {
654654
assert incrementIndexVersionLookup(); // used for asserting in tests
655655
final long currentVersion = loadCurrentVersionFromIndex(op.uid());
@@ -1062,7 +1062,7 @@ static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted,
10621062
* Asserts that the doc in the index operation really doesn't exist
10631063
*/
10641064
private boolean assertDocDoesNotExist(final Index index, final boolean allowDeleted) throws IOException {
1065-
final VersionValue versionValue = versionMap.getUnderLock(index.uid());
1065+
final VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
10661066
if (versionValue != null) {
10671067
if (versionValue.isDelete() == false || allowDeleted == false) {
10681068
throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")");
@@ -1390,6 +1390,8 @@ public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) thr
13901390
commitIndexWriter(indexWriter, translog, syncId);
13911391
logger.debug("successfully sync committed. sync id [{}].", syncId);
13921392
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
1393+
// we are guaranteed to have no operations in the version map here!
1394+
versionMap.adjustMapSizeUnderLock();
13931395
return SyncedFlushResult.SUCCESS;
13941396
} catch (IOException ex) {
13951397
maybeFailEngine("sync commit", ex);

core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.index.engine;
2121

22-
import org.apache.lucene.index.Term;
2322
import org.apache.lucene.search.ReferenceManager;
2423
import org.apache.lucene.util.Accountable;
2524
import org.apache.lucene.util.BytesRef;
@@ -35,6 +34,18 @@
3534
/** Maps _uid value to its version information. */
3635
class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
3736

37+
/**
38+
* Resets the internal map and adjusts it's capacity as if there were no indexing operations.
39+
* This must be called under write lock in the engine
40+
*/
41+
void adjustMapSizeUnderLock() {
42+
if (maps.current.isEmpty() == false || maps.old.isEmpty() == false) {
43+
assert false : "map must be empty"; // fail hard if not empty and fail with assertion in tests to ensure we never swallow it
44+
throw new IllegalStateException("map must be empty");
45+
}
46+
maps = new Maps();
47+
}
48+
3849
private static class Maps {
3950

4051
// All writes (adds and deletes) go into here:
@@ -50,7 +61,7 @@ private static class Maps {
5061

5162
Maps() {
5263
this(ConcurrentCollections.<BytesRef,VersionValue>newConcurrentMapWithAggressiveConcurrency(),
53-
ConcurrentCollections.<BytesRef,VersionValue>newConcurrentMapWithAggressiveConcurrency());
64+
Collections.emptyMap());
5465
}
5566
}
5667

@@ -121,21 +132,21 @@ public void afterRefresh(boolean didRefresh) throws IOException {
121132
}
122133

123134
/** Returns the live version (add or delete) for this uid. */
124-
VersionValue getUnderLock(final Term uid) {
135+
VersionValue getUnderLock(final BytesRef uid) {
125136
Maps currentMaps = maps;
126137

127138
// First try to get the "live" value:
128-
VersionValue value = currentMaps.current.get(uid.bytes());
139+
VersionValue value = currentMaps.current.get(uid);
129140
if (value != null) {
130141
return value;
131142
}
132143

133-
value = currentMaps.old.get(uid.bytes());
144+
value = currentMaps.old.get(uid);
134145
if (value != null) {
135146
return value;
136147
}
137148

138-
return tombstones.get(uid.bytes());
149+
return tombstones.get(uid);
139150
}
140151

141152
/** Adds this uid/version to the pending adds map. */
@@ -250,4 +261,8 @@ public Collection<Accountable> getChildResources() {
250261
// TODO: useful to break down RAM usage here?
251262
return Collections.emptyList();
252263
}
253-
}
264+
265+
/** Returns the current internal versions as a point in time snapshot*/
266+
Map<BytesRef, VersionValue> getAllCurrent() {
267+
return maps.current;
268+
}}

core/src/main/java/org/elasticsearch/index/engine/VersionValue.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,31 @@ public Collection<Accountable> getChildResources() {
5757
return Collections.emptyList();
5858
}
5959

60+
@Override
61+
public boolean equals(Object o) {
62+
if (this == o) return true;
63+
if (o == null || getClass() != o.getClass()) return false;
64+
65+
VersionValue that = (VersionValue) o;
66+
67+
if (version != that.version) return false;
68+
if (seqNo != that.seqNo) return false;
69+
return term == that.term;
70+
}
71+
72+
@Override
73+
public int hashCode() {
74+
int result = (int) (version ^ (version >>> 32));
75+
result = 31 * result + (int) (seqNo ^ (seqNo >>> 32));
76+
result = 31 * result + (int) (term ^ (term >>> 32));
77+
return result;
78+
}
79+
6080
@Override
6181
public String toString() {
6282
return "VersionValue{" +
6383
"version=" + version +
84+
6485
", seqNo=" + seqNo +
6586
", term=" + term +
6687
'}';

core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,25 @@
1919

2020
package org.elasticsearch.index.engine;
2121

22+
import org.apache.lucene.util.BytesRef;
2223
import org.apache.lucene.util.BytesRefBuilder;
2324
import org.apache.lucene.util.RamUsageTester;
2425
import org.apache.lucene.util.TestUtil;
26+
import org.elasticsearch.Assertions;
2527
import org.elasticsearch.bootstrap.JavaVersion;
28+
import org.elasticsearch.common.lease.Releasable;
29+
import org.elasticsearch.common.util.concurrent.KeyedLock;
2630
import org.elasticsearch.test.ESTestCase;
2731

32+
import java.io.IOException;
33+
import java.util.ArrayList;
34+
import java.util.HashMap;
35+
import java.util.HashSet;
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.concurrent.ConcurrentHashMap;
39+
import java.util.concurrent.CountDownLatch;
40+
2841
public class LiveVersionMapTests extends ESTestCase {
2942

3043
public void testRamBytesUsed() throws Exception {
@@ -57,4 +70,151 @@ public void testRamBytesUsed() throws Exception {
5770
assertEquals(actualRamBytesUsed, estimatedRamBytesUsed, actualRamBytesUsed / 4);
5871
}
5972

73+
private BytesRef uid(String string) {
74+
BytesRefBuilder builder = new BytesRefBuilder();
75+
builder.copyChars(string);
76+
// length of the array must be the same as the len of the ref... there is an assertion in LiveVersionMap#putUnderLock
77+
return BytesRef.deepCopyOf(builder.get());
78+
}
79+
80+
public void testBasics() throws IOException {
81+
LiveVersionMap map = new LiveVersionMap();
82+
map.putUnderLock(uid("test"), new VersionValue(1,1,1));
83+
assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
84+
map.beforeRefresh();
85+
assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
86+
map.afterRefresh(randomBoolean());
87+
assertNull(map.getUnderLock(uid("test")));
88+
89+
90+
map.putUnderLock(uid("test"), new DeleteVersionValue(1,1,1, Long.MAX_VALUE));
91+
assertEquals(new DeleteVersionValue(1,1,1, Long.MAX_VALUE), map.getUnderLock(uid("test")));
92+
map.beforeRefresh();
93+
assertEquals(new DeleteVersionValue(1,1,1, Long.MAX_VALUE), map.getUnderLock(uid("test")));
94+
map.afterRefresh(randomBoolean());
95+
assertEquals(new DeleteVersionValue(1,1,1, Long.MAX_VALUE), map.getUnderLock(uid("test")));
96+
map.removeTombstoneUnderLock(uid("test"));
97+
assertNull(map.getUnderLock(uid("test")));
98+
}
99+
100+
101+
public void testAdjustMapSizeUnderLock() throws IOException {
102+
LiveVersionMap map = new LiveVersionMap();
103+
map.putUnderLock(uid("test"), new VersionValue(1,1,1));
104+
boolean withinRefresh = randomBoolean();
105+
if (withinRefresh) {
106+
map.beforeRefresh();
107+
}
108+
assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
109+
final String msg;
110+
if (Assertions.ENABLED) {
111+
msg = expectThrows(AssertionError.class, map::adjustMapSizeUnderLock).getMessage();
112+
} else {
113+
msg = expectThrows(IllegalStateException.class, map::adjustMapSizeUnderLock).getMessage();
114+
}
115+
assertEquals("map must be empty", msg);
116+
assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test")));
117+
if (withinRefresh == false) {
118+
map.beforeRefresh();
119+
}
120+
map.afterRefresh(randomBoolean());
121+
Map<BytesRef, VersionValue> allCurrent = map.getAllCurrent();
122+
map.adjustMapSizeUnderLock();
123+
assertNotSame(allCurrent, map.getAllCurrent());
124+
}
125+
126+
public void testConcurrently() throws IOException, InterruptedException {
127+
HashSet<BytesRef> keySet = new HashSet<>();
128+
int numKeys = randomIntBetween(50, 200);
129+
for (int i = 0; i < numKeys; i++) {
130+
keySet.add(uid(TestUtil.randomSimpleString(random(), 10, 20)));
131+
}
132+
List<BytesRef> keyList = new ArrayList<>(keySet);
133+
ConcurrentHashMap<BytesRef, VersionValue> values = new ConcurrentHashMap<>();
134+
KeyedLock<BytesRef> keyedLock = new KeyedLock<>();
135+
LiveVersionMap map = new LiveVersionMap();
136+
int numThreads = randomIntBetween(2, 5);
137+
138+
Thread[] threads = new Thread[numThreads];
139+
CountDownLatch startGun = new CountDownLatch(numThreads);
140+
CountDownLatch done = new CountDownLatch(numThreads);
141+
int randomValuesPerThread = randomIntBetween(5000, 20000);
142+
for (int j = 0; j < threads.length; j++) {
143+
threads[j] = new Thread(() -> {
144+
startGun.countDown();
145+
try {
146+
startGun.await();
147+
} catch (InterruptedException e) {
148+
done.countDown();
149+
throw new AssertionError(e);
150+
}
151+
try {
152+
for (int i = 0; i < randomValuesPerThread; ++i) {
153+
BytesRef bytesRef = randomFrom(random(), keyList);
154+
try (Releasable r = keyedLock.acquire(bytesRef)) {
155+
VersionValue versionValue = values.computeIfAbsent(bytesRef,
156+
v -> new VersionValue(randomLong(), randomLong(), randomLong()));
157+
boolean isDelete = versionValue instanceof DeleteVersionValue;
158+
if (isDelete) {
159+
map.removeTombstoneUnderLock(bytesRef);
160+
}
161+
if (isDelete == false && rarely()) {
162+
versionValue = new DeleteVersionValue(versionValue.version + 1, versionValue.seqNo + 1,
163+
versionValue.term, Long.MAX_VALUE);
164+
} else {
165+
versionValue = new VersionValue(versionValue.version + 1, versionValue.seqNo + 1, versionValue.term);
166+
}
167+
values.put(bytesRef, versionValue);
168+
map.putUnderLock(bytesRef, versionValue);
169+
}
170+
}
171+
} finally {
172+
done.countDown();
173+
}
174+
});
175+
threads[j].start();
176+
177+
178+
}
179+
do {
180+
Map<BytesRef, VersionValue> valueMap = new HashMap<>(map.getAllCurrent());
181+
map.beforeRefresh();
182+
valueMap.forEach((k, v) -> {
183+
VersionValue actualValue = map.getUnderLock(k);
184+
assertNotNull(actualValue);
185+
assertTrue(v.version <= actualValue.version);
186+
});
187+
map.afterRefresh(randomBoolean());
188+
valueMap.forEach((k, v) -> {
189+
VersionValue actualValue = map.getUnderLock(k);
190+
if (actualValue != null) {
191+
if (actualValue instanceof DeleteVersionValue) {
192+
assertTrue(v.version <= actualValue.version); // deletes can be the same version
193+
} else {
194+
assertTrue(v.version < actualValue.version);
195+
}
196+
197+
}
198+
});
199+
if (randomBoolean()) {
200+
Thread.yield();
201+
}
202+
} while (done.getCount() != 0);
203+
204+
for (int j = 0; j < threads.length; j++) {
205+
threads[j].join();
206+
}
207+
map.getAllCurrent().forEach((k, v) -> {
208+
VersionValue versionValue = values.get(k);
209+
assertNotNull(versionValue);
210+
assertEquals(v, versionValue);
211+
});
212+
213+
map.getAllTombstones().forEach(e -> {
214+
VersionValue versionValue = values.get(e.getKey());
215+
assertNotNull(versionValue);
216+
assertEquals(e.getValue(), versionValue);
217+
assertTrue(versionValue instanceof DeleteVersionValue);
218+
});
219+
}
60220
}

0 commit comments

Comments
 (0)