Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search
ensureOpen();
SearcherScope scope;
if (get.realtime()) {
VersionValue versionValue = versionMap.getUnderLock(get.uid().bytes());
VersionValue versionValue = getVersionFromMap(get.uid().bytes());
if (versionValue != null) {
if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS;
Expand Down Expand Up @@ -600,7 +600,7 @@ enum OpVsLuceneDocStatus {
private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";
final OpVsLuceneDocStatus status;
final VersionValue versionValue = versionMap.getUnderLock(op.uid().bytes());
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
assert incrementVersionLookup();
if (versionValue != null) {
if (op.seqNo() > versionValue.seqNo ||
Expand Down Expand Up @@ -637,7 +637,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
/** resolves the current version of the document, returning null if not found */
private VersionValue resolveDocVersion(final Operation op) throws IOException {
assert incrementVersionLookup(); // used for asserting in tests
VersionValue versionValue = versionMap.getUnderLock(op.uid().bytes());
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
if (versionValue == null) {
assert incrementIndexVersionLookup(); // used for asserting in tests
final long currentVersion = loadCurrentVersionFromIndex(op.uid());
Expand All @@ -651,6 +651,21 @@ private VersionValue resolveDocVersion(final Operation op) throws IOException {
return versionValue;
}

/**
 * Looks up the version recorded for {@code id} in the version map, first switching the map
 * over to safe access if it is currently flagged as unsafe.
 *
 * @param id the document uid to look up
 * @return whatever {@code versionMap.getUnderLock(id)} returns for this uid
 */
private VersionValue getVersionFromMap(BytesRef id) {
    if (versionMap.isUnsafe() == false) {
        // nothing to repair, the map can be consulted directly
        return versionMap.getUnderLock(id);
    }
    synchronized (versionMap) {
        // We are moving from an unsafe map to a safe one. Several threads may get here at the
        // same time, but the switch only has to happen once: the last step of every operation
        // on an ID is the write to the version map, so once we are past this point lookups
        // against the version map can be relied upon.
        if (versionMap.isUnsafe()) {
            // re-checked while holding the monitor so that only one thread pays for the refresh
            refresh("unsafe_version_map", SearcherScope.INTERNAL);
        }
        versionMap.enforceSafeAccess();
    }
    return versionMap.getUnderLock(id);
}

private boolean canOptimizeAddDocument(Index index) {
if (index.getAutoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
assert index.getAutoGeneratedIdTimestamp() >= 0 : "autoGeneratedIdTimestamp must be positive but was: "
Expand Down Expand Up @@ -812,6 +827,7 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
} else {
versionMap.enforceSafeAccess();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an idea: shall we fold this into the IndexingStrategy? That way we could set this flag inside the static factory methods such as IndexingStrategy.overrideExistingAsIfNotThere. I think the logic would be easier to follow.

// drop out of order operations
assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
"resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]";
Expand Down Expand Up @@ -849,10 +865,12 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
if (canOptimizeAddDocument(index)) {
if (mayHaveBeenIndexedBefore(index)) {
plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L);
versionMap.enforceSafeAccess();
} else {
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index));
}
} else {
versionMap.enforceSafeAccess();
// resolves incoming version
final VersionValue versionValue = resolveDocVersion(index);
final long currentVersion;
Expand Down Expand Up @@ -898,7 +916,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
index(index.docs(), indexWriter);
}
versionMap.putUnderLock(index.uid().bytes(),
versionMap.maybePutUnderLock(index.uid().bytes(),
new VersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm()));
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
} catch (Exception ex) {
Expand Down Expand Up @@ -1018,7 +1036,9 @@ static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted,
* Asserts that the doc in the index operation really doesn't exist
*/
private boolean assertDocDoesNotExist(final Index index, final boolean allowDeleted) throws IOException {
final VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
// NOTE this uses direct access to the version map since we are in the assertion code where we maintain a secondary
// map in the version map such that we don't need to refresh if we are unsafe;
final VersionValue versionValue = versionMap.getVersionForAssert(index.uid().bytes());
if (versionValue != null) {
if (versionValue.isDelete() == false || allowDeleted == false) {
throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")");
Expand All @@ -1044,6 +1064,7 @@ private static void update(final Term uid, final List<ParseContext.Document> doc

@Override
public DeleteResult delete(Delete delete) throws IOException {
versionMap.enforceSafeAccess();
assert Objects.equals(delete.uid().field(), uidField) : delete.uid().field();
assert assertVersionType(delete);
assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo());
Expand Down Expand Up @@ -2114,6 +2135,15 @@ private boolean incrementIndexVersionLookup() {
return true;
}

/**
 * Returns the number of entries in the map handed out by {@code versionMap.getAllCurrent()}.
 */
int getVersionMapSize() {
    return this.versionMap.getAllCurrent().size();
}

/**
 * Reports whether the version map is in safe-access mode, as answered by
 * {@code versionMap.isSafeAccessRequired()}.
 */
boolean isSafeAccessRequired() {
    final boolean safeAccessRequired = versionMap.isSafeAccessRequired();
    return safeAccessRequired;
}


/**
* Returns <code>true</code> iff the index writer has any deletions either buffered in memory or
* in the index.
Expand Down
157 changes: 142 additions & 15 deletions core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.concurrent.atomic.AtomicLong;

/** Maps _uid value to its version information. */
class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {

/**
* Resets the internal map and adjusts its capacity as if there were no indexing operations.
Expand All @@ -46,29 +46,110 @@ void adjustMapSizeUnderLock() {
maps = new Maps();
}

private static class Maps {
/**
 * A uid-to-version map bundled with an "unsafe" marker that records whether an update for
 * this map was skipped (see {@link #markAsUnsafe()}).
 */
private static final class VersionLookup {

    /** Shared instance backed by {@link Collections#emptyMap()}. */
    private static final VersionLookup EMPTY = new VersionLookup(Collections.emptyMap());

    /** The wrapped map; every accessor below delegates straight to it. */
    private final Map<BytesRef, VersionValue> map;

    // Every version map is either safe or unsafe. The distinction enables an optimization for the
    // auto-generated ID use case: such documents cannot have duplicates, so the version map can be
    // skipped entirely, which significantly reduces memory pressure when a massive number of small
    // documents (metrics) is indexed.
    // While the map is in safe-access mode all versions are tracked in it. If a document arrives that
    // needs safe access while we are not in that mode, a refresh is forced and the map is marked as
    // requiring safe access; all subsequent operations respect that and fill the version map. This is
    // only really required for a single ID, and since the engine holds the ID lock while doing all
    // of this it is safe to do it globally without locking.
    // NOTE: this flag can be non-volatile since it is ok to read a stale value per doc ID. The engine
    // serializes changes so that the same document ID is never updated concurrently, hence we can
    // rely on the happens-before guarantee of the map reference itself.
    private boolean unsafe;

    private VersionLookup(Map<BytesRef, VersionValue> map) {
        this.map = map;
    }

    /** Returns the value stored for {@code uid}, as returned by the wrapped map's {@code get}. */
    VersionValue get(BytesRef uid) {
        return this.map.get(uid);
    }

    /** Stores {@code version} under {@code uid} and returns what the wrapped map's {@code put} returns. */
    VersionValue put(BytesRef uid, VersionValue version) {
        return this.map.put(uid, version);
    }

    /** Returns the number of entries in the wrapped map. */
    int size() {
        return this.map.size();
    }

    /** Returns {@code true} if the wrapped map holds no entries. */
    boolean isEmpty() {
        return this.map.isEmpty();
    }

    /** Flags this lookup as unsafe; the flag is never cleared by this class. */
    void markAsUnsafe() {
        this.unsafe = true;
    }

    /** Returns {@code true} once {@link #markAsUnsafe()} has been called on this lookup. */
    boolean isUnsafe() {
        return this.unsafe;
    }
}

private static final class Maps {

// All writes (adds and deletes) go into here:
final Map<BytesRef,VersionValue> current;
final VersionLookup current;

// Used while refresh is running, and to hold adds/deletes until refresh finishes. We read from both current and old on lookup:
final Map<BytesRef,VersionValue> old;
final VersionLookup old;

// this is not volatile since we don't need to maintain a happens-before relationship across doc IDs, so it's enough to
// have the volatile read of the Maps reference to make it visible even across threads.
boolean needsSafeAccess;
final boolean previousMapsNeededSafeAccess;

Maps(Map<BytesRef,VersionValue> current, Map<BytesRef,VersionValue> old) {
this.current = current;
this.old = old;
Maps(VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess) {
this.current = current;
this.old = old;
this.previousMapsNeededSafeAccess = previousMapsNeededSafeAccess;
}

Maps() {
this(ConcurrentCollections.<BytesRef,VersionValue>newConcurrentMapWithAggressiveConcurrency(),
Collections.emptyMap());
this(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()), VersionLookup.EMPTY, false);
}

/**
 * Returns {@code true} if safe access was requested on this instance or was already
 * in effect for the maps this instance was built from.
 */
boolean isSafeAccessMode() {
    if (needsSafeAccess) {
        return true;
    }
    return previousMapsNeededSafeAccess;
}

/**
 * Decides whether maps built from this instance should start out in safe-access mode.
 *
 * @return {@code true} if safe access was requested on this instance, or if the current
 *         lookup is empty and not marked unsafe while the previous maps needed safe access
 */
boolean shouldInheritSafeAccess() {
    final boolean untouched = current.isEmpty() && current.isUnsafe() == false;
    if (needsSafeAccess) {
        return true;
    }
    // no operation has reached this map, so keep whatever mode the previous maps were in
    return untouched && previousMapsNeededSafeAccess;
}

/**
 * Creates the maps used while a refresh is in flight; intended to be called from beforeRefresh().
 * The returned instance gets a new, empty current lookup (pre-sized to the present current lookup's
 * size) and keeps the present current lookup as its old lookup.
 */
Maps buildTransitionMap() {
    final VersionLookup freshCurrent =
        new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(current.size()));
    final boolean inheritSafeAccess = shouldInheritSafeAccess();
    return new Maps(freshCurrent, current, inheritSafeAccess);
}

/**
 * Creates maps that keep the current lookup but replace the old lookup with
 * {@link VersionLookup#EMPTY}; intended to be called from afterRefresh().
 * Only {@code previousMapsNeededSafeAccess} is handed on to the new instance.
 */
Maps invalidateOldMap() {
    final VersionLookup retained = current;
    return new Maps(retained, VersionLookup.EMPTY, previousMapsNeededSafeAccess);
}
}

// All deletes also go here, and delete "tombstones" are retained after refresh:
private final Map<BytesRef,DeleteVersionValue> tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

private volatile Maps maps = new Maps();
// we maintain a second map that only receives the updates that we skip on the actual map (unsafe ops)
// this map is only maintained if assertions are enabled
private volatile Maps unsafeKeysMap = new Maps();

/** Bytes consumed for each BytesRef UID:
* In this base value, we account for the {@link BytesRef} object itself as
Expand Down Expand Up @@ -113,8 +194,8 @@ public void beforeRefresh() throws IOException {
// map. While reopen is running, any lookup will first
// try this new map, then fallback to old, then to the
// current searcher:
maps = new Maps(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(maps.current.size()), maps.current);

maps = maps.buildTransitionMap();
assert (unsafeKeysMap = unsafeKeysMap.buildTransitionMap()) != null;
// This is not 100% correct, since concurrent indexing ops can change these counters in between our execution of the previous
// line and this one, but that should be minor, and the error won't accumulate over time:
ramBytesUsedCurrent.set(0);
Expand All @@ -128,13 +209,18 @@ public void afterRefresh(boolean didRefresh) throws IOException {
// case. This is because we assign new maps (in beforeRefresh) slightly before Lucene actually flushes any segments for the
// reopen, and so any concurrent indexing requests can still sneak in a few additions to that current map that are in fact reflected
// in the previous reader. We don't touch tombstones here: they expire on their own index.gc_deletes timeframe:
maps = new Maps(maps.current, Collections.emptyMap());

maps = maps.invalidateOldMap();
assert (unsafeKeysMap = unsafeKeysMap.invalidateOldMap()) != null;

}

/** Returns the live version (add or delete) for this uid. */
VersionValue getUnderLock(final BytesRef uid) {
Maps currentMaps = maps;
return getUnderLock(uid, maps);
}

private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) {
// First try to get the "live" value:
VersionValue value = currentMaps.current.get(uid);
if (value != null) {
Expand All @@ -149,11 +235,52 @@ VersionValue getUnderLock(final BytesRef uid) {
return tombstones.get(uid);
}

/**
 * Lookup used by assertion code: consults the regular maps first and, if they hold nothing
 * for {@code uid}, falls back to the map that receives the updates skipped on the regular maps.
 *
 * @param uid the document uid to look up
 * @return the first non-null result of the two lookups, or {@code null} if both miss
 */
VersionValue getVersionForAssert(final BytesRef uid) {
    final VersionValue fromRegularMaps = getUnderLock(uid, maps);
    if (fromRegularMaps != null) {
        return fromRegularMaps;
    }
    return getUnderLock(uid, unsafeKeysMap);
}

/**
 * Returns {@code true} if either the current or the old lookup has been marked as unsafe.
 *
 * The volatile {@code maps} reference is read exactly once so that both lookups are taken
 * from the same {@code Maps} instance; the reference is replaced in beforeRefresh() and
 * afterRefresh(), and reading it twice could pair the current lookup of one instance with
 * the old lookup of another.
 */
boolean isUnsafe() {
    final Maps snapshot = this.maps;
    return snapshot.current.isUnsafe() || snapshot.old.isUnsafe();
}

/**
 * Requests safe access by setting {@code needsSafeAccess} on the maps instance that is
 * referenced at the time of the call.
 */
void enforceSafeAccess() {
    final Maps active = maps;
    active.needsSafeAccess = true;
}

/**
 * Returns {@code true} if the currently referenced maps instance reports safe-access mode.
 */
boolean isSafeAccessRequired() {
    final Maps active = maps;
    return active.isSafeAccessMode();
}

/**
 * Records this uid/version in the pending adds map, but only when the map is in safe-access mode.
 * Otherwise the write is skipped and the current lookup is marked as unsafe instead; when
 * assertions are enabled the skipped entry is additionally recorded via putAssertionMap.
 */
void maybePutUnderLock(BytesRef uid, VersionValue version) {
    final Maps snapshot = this.maps;
    if (snapshot.isSafeAccessMode() == false) {
        snapshot.current.markAsUnsafe();
        // the call sits inside the assert so it only runs with assertions enabled
        assert putAssertionMap(uid, version);
        return;
    }
    putUnderLock(uid, version, snapshot);
}

/**
 * Stores the uid/version in {@code unsafeKeysMap}.
 *
 * @return always {@code true}, so the call can be wrapped in an {@code assert} statement
 */
private boolean putAssertionMap(BytesRef uid, VersionValue version) {
    final Maps assertionMaps = unsafeKeysMap;
    putUnderLock(uid, version, assertionMaps);
    return true;
}

/**
 * Unconditionally adds this uid/version to the pending adds map of the maps instance
 * referenced at the time of the call.
 */
void putUnderLock(BytesRef uid, VersionValue version) {
    putUnderLock(uid, version, this.maps);
}

/** Adds this uid/version to the pending adds map. */
private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) {
assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length;
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;

final VersionValue prev = maps.current.put(uid, version);
if (prev != null) {
// Deduct RAM for the version we just replaced:
Expand Down Expand Up @@ -264,5 +391,5 @@ public Collection<Accountable> getChildResources() {

/** Returns the current internal versions as a point in time snapshot*/
Map<BytesRef, VersionValue> getAllCurrent() {
return maps.current;
return maps.current.map;
}}
Loading