-
Notifications
You must be signed in to change notification settings - Fork 25.8k
Optimize version map for append-only indexing #27752
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
46206a1
2f9fdde
cef82c0
c2ade56
5181730
f7efcdb
f331092
5816308
28d8766
c4f97dc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -562,7 +562,7 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search | |
| ensureOpen(); | ||
| SearcherScope scope; | ||
| if (get.realtime()) { | ||
| VersionValue versionValue = versionMap.getUnderLock(get.uid().bytes()); | ||
| VersionValue versionValue = getVersionFromMap(get.uid().bytes()); | ||
| if (versionValue != null) { | ||
| if (versionValue.isDelete()) { | ||
| return GetResult.NOT_EXISTS; | ||
|
|
@@ -600,7 +600,7 @@ enum OpVsLuceneDocStatus { | |
| private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { | ||
| assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; | ||
| final OpVsLuceneDocStatus status; | ||
| final VersionValue versionValue = versionMap.getUnderLock(op.uid().bytes()); | ||
| VersionValue versionValue = getVersionFromMap(op.uid().bytes()); | ||
| assert incrementVersionLookup(); | ||
| if (versionValue != null) { | ||
| if (op.seqNo() > versionValue.seqNo || | ||
|
|
@@ -637,7 +637,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) | |
| /** resolves the current version of the document, returning null if not found */ | ||
| private VersionValue resolveDocVersion(final Operation op) throws IOException { | ||
| assert incrementVersionLookup(); // used for asserting in tests | ||
| VersionValue versionValue = versionMap.getUnderLock(op.uid().bytes()); | ||
| VersionValue versionValue = getVersionFromMap(op.uid().bytes()); | ||
| if (versionValue == null) { | ||
| assert incrementIndexVersionLookup(); // used for asserting in tests | ||
| final long currentVersion = loadCurrentVersionFromIndex(op.uid()); | ||
|
|
@@ -651,6 +651,21 @@ private VersionValue resolveDocVersion(final Operation op) throws IOException { | |
| return versionValue; | ||
| } | ||
|
|
||
| private VersionValue getVersionFromMap(BytesRef id) { | ||
| if (versionMap.isUnsafe()) { | ||
| synchronized (versionMap) { | ||
| // we are switching from an unsafe map to a safe map. This might happen concurrently | ||
| // but we only need to do this once since the last operation per ID is to add to the version | ||
| // map so once we pass this point we can safely lookup from the version map. | ||
| if (versionMap.isUnsafe()) { | ||
| refresh("unsafe_version_map", SearcherScope.INTERNAL); | ||
| } | ||
| versionMap.enforceSafeAccess(); | ||
| } | ||
| } | ||
| return versionMap.getUnderLock(id); | ||
| } | ||
|
|
||
| private boolean canOptimizeAddDocument(Index index) { | ||
| if (index.getAutoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) { | ||
| assert index.getAutoGeneratedIdTimestamp() >= 0 : "autoGeneratedIdTimestamp must be positive but was: " | ||
|
|
@@ -812,6 +827,7 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio | |
| assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]"; | ||
| plan = IndexingStrategy.optimizedAppendOnly(index.seqNo()); | ||
| } else { | ||
| versionMap.enforceSafeAccess(); | ||
| // drop out of order operations | ||
| assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() : | ||
| "resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]"; | ||
|
|
@@ -849,10 +865,12 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { | |
| if (canOptimizeAddDocument(index)) { | ||
| if (mayHaveBeenIndexedBefore(index)) { | ||
| plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L); | ||
| versionMap.enforceSafeAccess(); | ||
| } else { | ||
| plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index)); | ||
| } | ||
| } else { | ||
| versionMap.enforceSafeAccess(); | ||
| // resolves incoming version | ||
| final VersionValue versionValue = resolveDocVersion(index); | ||
| final long currentVersion; | ||
|
|
@@ -898,7 +916,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) | |
| assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); | ||
| index(index.docs(), indexWriter); | ||
| } | ||
| versionMap.putUnderLock(index.uid().bytes(), | ||
| versionMap.maybePutUnderLock(index.uid().bytes(), | ||
| new VersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm())); | ||
| return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); | ||
| } catch (Exception ex) { | ||
|
|
@@ -1018,7 +1036,8 @@ static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, | |
| * Asserts that the doc in the index operation really doesn't exist | ||
| */ | ||
| private boolean assertDocDoesNotExist(final Index index, final boolean allowDeleted) throws IOException { | ||
| final VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes()); | ||
| final VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes()); // this uses direct access to the version map - | ||
| // no refresh needed here | ||
|
||
| if (versionValue != null) { | ||
| if (versionValue.isDelete() == false || allowDeleted == false) { | ||
| throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")"); | ||
|
|
@@ -1044,6 +1063,7 @@ private static void update(final Term uid, final List<ParseContext.Document> doc | |
|
|
||
| @Override | ||
| public DeleteResult delete(Delete delete) throws IOException { | ||
| versionMap.enforceSafeAccess(); | ||
| assert Objects.equals(delete.uid().field(), uidField) : delete.uid().field(); | ||
| assert assertVersionType(delete); | ||
| assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo()); | ||
|
|
@@ -2114,6 +2134,15 @@ private boolean incrementIndexVersionLookup() { | |
| return true; | ||
| } | ||
|
|
||
| int getVersionMapSize() { | ||
| return versionMap.getAllCurrent().size(); | ||
| } | ||
|
|
||
| boolean isSafeAccessRequired() { | ||
| return versionMap.isSafeAccessRequired(); | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * Returns <code>true</code> iff the index writer has any deletions either buffered in memory or | ||
| * in the index. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,10 +29,11 @@ | |
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| /** Maps _uid value to its version information. */ | ||
| class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { | ||
| final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { | ||
|
|
||
| /** | ||
| * Resets the internal map and adjusts its capacity as if there were no indexing operations. | ||
|
|
@@ -46,22 +47,83 @@ void adjustMapSizeUnderLock() { | |
| maps = new Maps(); | ||
| } | ||
|
|
||
| private static final class VersionLookup { | ||
|
|
||
| private static final VersionLookup EMPTY = new VersionLookup(Collections.emptyMap()); | ||
| private final Map<BytesRef,VersionValue> map; | ||
|
|
||
| // each version map has a notion of safe / unsafe which allows us to apply certain optimizations in the auto-generated ID use case | ||
| // where we know that documents can't have any duplicates so we can skip the version map entirely. This reduces | ||
| // the memory pressure significantly for this use case where we often get a massive amount of small documents (metrics). | ||
| // If the version map is in safeAccess mode we track all versions in the version map. Yet if a document comes in that needs | ||
| // safe access but we are not in this mode we force a refresh and mark the map as safe access required. All subsequent ops will | ||
| // respect that and fill the version map. The nice part here is that we are only really requiring this for a single ID and since | ||
| // we hold the ID lock in the engine while we do all this it's safe to do it globally unlocked. | ||
| // NOTE: these values can both be non-volatile since it's ok to read a stale value per doc ID. We serialize changes in the engine | ||
| // that will prevent concurrent updates to the same document ID and therefore we can rely on the happens-before guarantee of the | ||
| // map reference itself. | ||
| private boolean unsafe; | ||
| boolean safeAccessRequested = false; | ||
|
||
|
|
||
| private VersionLookup(Map<BytesRef, VersionValue> map) { | ||
| this.map = map; | ||
| } | ||
|
|
||
| VersionValue get(BytesRef key) { | ||
| return map.get(key); | ||
| } | ||
|
|
||
| VersionValue put(BytesRef key, VersionValue value) { | ||
| return map.put(key, value); | ||
| } | ||
|
|
||
| boolean isEmpty() { | ||
| return map.isEmpty(); | ||
| } | ||
|
|
||
|
|
||
| int size() { | ||
| return map.size(); | ||
| } | ||
|
|
||
| boolean isUnsafe() { | ||
| return unsafe; | ||
| } | ||
|
|
||
| void markAsUnsafe() { | ||
| unsafe = true; | ||
| } | ||
| } | ||
|
|
||
| private static class Maps { | ||
|
|
||
| // All writes (adds and deletes) go into here: | ||
| final Map<BytesRef,VersionValue> current; | ||
| final VersionLookup current; | ||
|
|
||
| // Used while refresh is running, and to hold adds/deletes until refresh finishes. We read from both current and old on lookup: | ||
| final Map<BytesRef,VersionValue> old; | ||
| final VersionLookup old; | ||
|
|
||
| boolean needsSafeAccess; | ||
| final boolean previousMapsNeededSafeAccess; | ||
|
|
||
| Maps(Map<BytesRef,VersionValue> current, Map<BytesRef,VersionValue> old) { | ||
| this.current = current; | ||
| this.old = old; | ||
| Maps(VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess) { | ||
| this.current = current; | ||
| this.old = old; | ||
| this.previousMapsNeededSafeAccess = previousMapsNeededSafeAccess; | ||
| } | ||
|
|
||
| Maps() { | ||
| this(ConcurrentCollections.<BytesRef,VersionValue>newConcurrentMapWithAggressiveConcurrency(), | ||
| Collections.emptyMap()); | ||
| this(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()), VersionLookup.EMPTY, false); | ||
| } | ||
|
|
||
| boolean isSafeAccessMode() { | ||
| return needsSafeAccess || previousMapsNeededSafeAccess; | ||
| } | ||
|
|
||
| boolean shouldInheritSafeAccess() { | ||
| return needsSafeAccess | ||
| // previous map was empty and not unsafe but the map before needed it so we maintain it | ||
|
||
| || (current.size() == 0 && current.isUnsafe() == false && previousMapsNeededSafeAccess); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -113,8 +175,8 @@ public void beforeRefresh() throws IOException { | |
| // map. While reopen is running, any lookup will first | ||
| // try this new map, then fallback to old, then to the | ||
| // current searcher: | ||
| maps = new Maps(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(maps.current.size()), maps.current); | ||
|
|
||
| maps = new Maps(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(maps.current.size())), | ||
| maps.current, maps.shouldInheritSafeAccess()); | ||
| // This is not 100% correct, since concurrent indexing ops can change these counters in between our execution of the previous | ||
| // line and this one, but that should be minor, and the error won't accumulate over time: | ||
| ramBytesUsedCurrent.set(0); | ||
|
|
@@ -128,13 +190,13 @@ public void afterRefresh(boolean didRefresh) throws IOException { | |
| // case. This is because we assign new maps (in beforeRefresh) slightly before Lucene actually flushes any segments for the | ||
| // reopen, and so any concurrent indexing requests can still sneak in a few additions to that current map that are in fact reflected | ||
| // in the previous reader. We don't touch tombstones here: they expire on their own index.gc_deletes timeframe: | ||
| maps = new Maps(maps.current, Collections.emptyMap()); | ||
|
|
||
| maps = new Maps(maps.current, VersionLookup.EMPTY, maps.previousMapsNeededSafeAccess); | ||
| } | ||
|
|
||
| /** Returns the live version (add or delete) for this uid. */ | ||
| VersionValue getUnderLock(final BytesRef uid) { | ||
| Maps currentMaps = maps; | ||
|
|
||
| // First try to get the "live" value: | ||
| VersionValue value = currentMaps.current.get(uid); | ||
| if (value != null) { | ||
|
|
@@ -149,11 +211,38 @@ VersionValue getUnderLock(final BytesRef uid) { | |
| return tombstones.get(uid); | ||
| } | ||
|
|
||
| boolean isUnsafe() { | ||
| return maps.current.isUnsafe() || maps.old.isUnsafe(); | ||
| } | ||
|
|
||
| void enforceSafeAccess() { | ||
| maps.needsSafeAccess = true; | ||
| } | ||
|
|
||
| boolean isSafeAccessRequired() { | ||
| return maps.isSafeAccessMode(); | ||
| } | ||
|
|
||
| /** Adds this uid/version to the pending adds map iff the map needs safe access. */ | ||
| void maybePutUnderLock(BytesRef uid, VersionValue version) { | ||
| Maps maps = this.maps; | ||
| if (maps.isSafeAccessMode()) { | ||
| putUnderLock(uid, version, maps); | ||
| } else { | ||
| maps.current.markAsUnsafe(); | ||
| } | ||
| } | ||
|
|
||
| /** Adds this uid/version to the pending adds map. */ | ||
| void putUnderLock(BytesRef uid, VersionValue version) { | ||
| Maps maps = this.maps; | ||
| putUnderLock(uid, version, maps); | ||
| } | ||
|
|
||
| /** Adds this uid/version to the pending adds map. */ | ||
| private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) { | ||
| assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length; | ||
| long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; | ||
|
|
||
| final VersionValue prev = maps.current.put(uid, version); | ||
| if (prev != null) { | ||
| // Deduct RAM for the version we just replaced: | ||
|
|
@@ -264,5 +353,5 @@ public Collection<Accountable> getChildResources() { | |
|
|
||
| /** Returns the current internal versions as a point in time snapshot*/ | ||
| Map<BytesRef, VersionValue> getAllCurrent() { | ||
| return maps.current; | ||
| return maps.current.map; | ||
| }} | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just an idea: shall we fold this into the IndexingStrategy? That would mean we can set this flag in the static methods such as
IndexingStrategy.overrideExistingAsIfNotThere. I think it would make the logic easier to follow.