Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,11 @@ ${path.logs}
#
# Limits the memory pool for datafusion which it uses for query execution.
#datafusion.search.memory_pool: 1GB
#node.attr.remote_store.segment.repository: my-repo-1
#node.attr.remote_store.translog.repository: my-repo-1
#node.attr.remote_store.state.repository: my-repo-1
#cluster.remote_store.state.enabled: true
#node.attr.remote_store.repository.my-repo-1.type: fs
#path.repo: /tmp/remote-store-repo
#node.attr.remote_store.repository.my-repo-1.settings.location: /tmp/remote-store-repo

Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@
import org.opensearch.index.engine.exec.WriterFileSet;
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
import org.opensearch.index.engine.exec.coord.CompositeEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

public class DatafusionReaderManager implements EngineReaderManager<DatafusionReader>, CatalogSnapshotAwareRefreshListener, FileDeletionListener {
private static final Logger log = LoggerFactory.getLogger(DatafusionReaderManager.class);
private DatafusionReader current;
private String path;
private String dataFormat;
Expand Down Expand Up @@ -72,9 +76,14 @@ public void beforeRefresh() throws IOException {
}

@Override
public void afterRefresh(boolean didRefresh, CompositeEngine.ReleasableRef<CatalogSnapshot> catalogSnapshot) throws IOException {
if (didRefresh && catalogSnapshot != null) {
public void afterRefresh(boolean didRefresh, Supplier<CompositeEngine.ReleasableRef<CatalogSnapshot>> catalogSnapshotSupplier) throws IOException {
if (didRefresh && catalogSnapshotSupplier != null) {
DatafusionReader old = this.current;
final CompositeEngine.ReleasableRef<CatalogSnapshot> catalogSnapshot = catalogSnapshotSupplier.get();
if (catalogSnapshot == null) {
log.warn("Catalog snapshot is null, skipping post refresh actions for Datafusion reader");
return;
}
Collection<WriterFileSet> newFiles = catalogSnapshot.getRef().getSearchableFiles(dataFormat);
this.current = new DatafusionReader(this.path, catalogSnapshot, catalogSnapshot.getRef().getSearchableFiles(dataFormat));
if (old != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.index.engine.exec.coord.CompositeEngine;

import java.io.IOException;
import java.util.function.Supplier;

public interface CatalogSnapshotAwareRefreshListener {
/**
Expand All @@ -24,5 +25,5 @@ public interface CatalogSnapshotAwareRefreshListener {
* @param didRefresh whether refresh actually occurred
* @param catalogSnapshot the current catalog snapshot with file information
*/
void afterRefresh(boolean didRefresh, CompositeEngine.ReleasableRef<CatalogSnapshot> catalogSnapshot) throws IOException;
void afterRefresh(boolean didRefresh, Supplier<CompositeEngine.ReleasableRef<CatalogSnapshot>> catalogSnapshot) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.VersionType;
import org.opensearch.index.engine.exec.coord.LastRefreshedCheckpointListener;
import org.opensearch.index.fieldvisitor.IdOnlyFieldVisitor;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.ParseContext;
Expand Down Expand Up @@ -337,7 +338,7 @@ public void onFailure(String reason, Exception ex) {
for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
this.internalReaderManager.addListener(listener);
}
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint());
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker);
this.internalReaderManager.addListener(lastRefreshedCheckpointListener);
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(
SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getMaxSeqNo())
Expand Down Expand Up @@ -2448,14 +2449,14 @@ public long tryDeleteDocument(IndexReader readerIn, int docID) {
* Returned the last local checkpoint value has been refreshed internally.
*/
public final long lastRefreshedCheckpoint() {
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
return lastRefreshedCheckpointListener.getRefreshedCheckpoint();
}

/**
* Returns the current local checkpoint getting refreshed internally.
*/
public final long currentOngoingRefreshCheckpoint() {
return lastRefreshedCheckpointListener.pendingCheckpoint.get();
return lastRefreshedCheckpointListener.getPendingCheckpoint();
}

private final Object refreshIfNeededMutex = new Object();
Expand All @@ -2473,38 +2474,6 @@ protected final void refreshIfNeeded(String source, long requestingSeqNo) {
}
}

private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
final AtomicLong refreshedCheckpoint;
volatile AtomicLong pendingCheckpoint;

LastRefreshedCheckpointListener(long initialLocalCheckpoint) {
this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint);
this.pendingCheckpoint = new AtomicLong(initialLocalCheckpoint);
}

@Override
public void beforeRefresh() {
// all changes until this point should be visible after refresh
pendingCheckpoint.updateAndGet(curr -> Math.max(curr, localCheckpointTracker.getProcessedCheckpoint()));
}

@Override
public void afterRefresh(boolean didRefresh) {
if (didRefresh) {
updateRefreshedCheckpoint(pendingCheckpoint.get());
}
}

void updateRefreshedCheckpoint(long checkpoint) {
refreshedCheckpoint.updateAndGet(curr -> Math.max(curr, checkpoint));
assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint;
// This shouldn't be required ideally, but we're also invoking this method from refresh as of now.
// This change is added as safety check to ensure that our checkpoint values are consistent at all times.
pendingCheckpoint.updateAndGet(curr -> Math.max(curr, checkpoint));

}
}

@Override
public final long getMaxSeenAutoIdTimestamp() {
return maxSeenAutoIdTimestamp.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ public interface Indexer {
*/
int fillSeqNoGaps(long primaryTerm) throws IOException;

default long lastRefreshedCheckpoint() {
throw new UnsupportedOperationException();
}

default long currentOngoingRefreshCheckpoint() {
throw new UnsupportedOperationException();
}

/**
* Performs a force merge operation on this engine.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,9 @@ public String toString() {
return "CatalogSnapshot{" + "id=" + id + ", version=" + version + ", dfGroupedSearchableFiles=" + dfGroupedSearchableFiles + ", List of Segment= " + segmentList + ", userData=" + userData +'}';
}

@Override
public CatalogSnapshot clone() {
// TODO this doesn't call super.clone right now
return new CatalogSnapshot(getId(), getVersion(), getSegments(), catalogSnapshotMap, indexFileDeleterSupplier);
public CatalogSnapshot cloneNoAcquire() {
// Still using the clone call since Lucene call requires clone. This will allow a SegmentsInfos backed CatalogSnapshot to use the same method in calls.
return this;
}
Comment on lines +242 to 245
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this isn't clone right?

Copy link
Member Author

@mgodwan mgodwan Dec 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.
We need a clone for SegmentsInfo but it is not required for default CatalogSnaphot. Once we have a SegmentInfos backed CatalogSnapshot for Lucene changes to work with same code, this would make more sense.
Hence, kept it like this for now.


public static class Segment implements Serializable, Writeable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexCommit;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -44,7 +43,6 @@
import org.opensearch.index.engine.SafeCommitInfo;
import org.opensearch.index.engine.SearchExecEngine;
import org.opensearch.index.engine.Segment;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.VersionValue;
import org.opensearch.index.engine.exec.RefreshInput;
import org.opensearch.index.engine.exec.RefreshResult;
Expand Down Expand Up @@ -103,6 +101,7 @@
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.opensearch.index.engine.Engine.HISTORY_UUID_KEY;
import static org.opensearch.index.engine.Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID;
Expand All @@ -126,7 +125,7 @@ public class CompositeEngine implements LifecycleAware, Closeable, Indexer, Chec
throw new RuntimeException(e);
}
};
private static final BiConsumer<ReleasableRef<CatalogSnapshot>, CatalogSnapshotAwareRefreshListener>
private static final BiConsumer<Supplier<ReleasableRef<CatalogSnapshot>>, CatalogSnapshotAwareRefreshListener>
POST_REFRESH_CATALOG_SNAPSHOT_AWARE_LISTENER_CONSUMER = (catalogSnapshot, catalogSnapshotAwareRefreshListener) -> {
try {
catalogSnapshotAwareRefreshListener.afterRefresh(true, catalogSnapshot);
Expand Down Expand Up @@ -156,6 +155,7 @@ public class CompositeEngine implements LifecycleAware, Closeable, Indexer, Chec
protected final String historyUUID;

private final LocalCheckpointTracker localCheckpointTracker;
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
private final ReentrantLock failEngineLock = new ReentrantLock();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
Expand Down Expand Up @@ -215,6 +215,9 @@ public CompositeEngine(
}
// initialize local checkpoint tracker and translog manager
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker);
refreshListeners.add(lastRefreshedCheckpointListener);

final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
TranslogEventListener internalTranslogEventListener = new TranslogEventListener() {
Expand Down Expand Up @@ -326,7 +329,7 @@ public void onFailure(String reason, Exception ex) {
}
}
catalogSnapshotAwareRefreshListeners.forEach(refreshListener -> POST_REFRESH_CATALOG_SNAPSHOT_AWARE_LISTENER_CONSUMER.accept(
acquireSnapshot(),
this::acquireSnapshot,
refreshListener
));
success = true;
Expand All @@ -344,6 +347,7 @@ public void onFailure(String reason, Exception ex) {
logger.trace("created new CompositeEngine");

initializeRefreshListeners(engineConfig);

}

private LocalCheckpointTracker createLocalCheckpointTracker(
Expand Down Expand Up @@ -410,7 +414,7 @@ LocalCheckpointTracker getLocalCheckpointTracker() {
public void updateSearchEngine() throws IOException {
catalogSnapshotAwareRefreshListeners.forEach(ref -> {
try {
ref.afterRefresh(true, catalogSnapshotManager.acquireSnapshot());
ref.afterRefresh(true, catalogSnapshotManager::acquireSnapshot);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -695,7 +699,7 @@ public synchronized void refresh(String source) throws EngineException {
}
catalogSnapshotManager.applyRefreshResult(refreshResult);
catalogSnapshotAwareRefreshListeners.forEach(refreshListener -> POST_REFRESH_CATALOG_SNAPSHOT_AWARE_LISTENER_CONSUMER.accept(
acquireSnapshot(),
this::acquireSnapshot,
refreshListener
));

Expand Down Expand Up @@ -797,6 +801,16 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
return 0;
}

@Override
public long lastRefreshedCheckpoint() {
return lastRefreshedCheckpointListener.getRefreshedCheckpoint();
}

@Override
public long currentOngoingRefreshCheckpoint() {
return lastRefreshedCheckpointListener.getPendingCheckpoint();
}

@Override
public void forceMerge(
boolean flush,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,4 @@ public String toString() {
public Map<String, Map<String, AtomicInteger>> getFileRefCounts() {
return Map.copyOf(fileRefCounts);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.engine.exec.coord;

import org.apache.lucene.search.ReferenceManager;
import org.opensearch.index.seqno.LocalCheckpointTracker;

import java.util.concurrent.atomic.AtomicLong;

public final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
private final AtomicLong refreshedCheckpoint;
private final AtomicLong pendingCheckpoint;
private final LocalCheckpointTracker localCheckpointTracker;

public LastRefreshedCheckpointListener(LocalCheckpointTracker localCheckpointTracker) {
final long processedCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
this.refreshedCheckpoint = new AtomicLong(processedCheckpoint);
this.pendingCheckpoint = new AtomicLong(processedCheckpoint);
this.localCheckpointTracker = localCheckpointTracker;
}

public long getRefreshedCheckpoint() {
return refreshedCheckpoint.get();
}

public long getPendingCheckpoint() {
return pendingCheckpoint.get();
}

@Override
public void beforeRefresh() {
// all changes until this point should be visible after refresh
pendingCheckpoint.updateAndGet(curr -> Math.max(curr, localCheckpointTracker.getProcessedCheckpoint()));
}

@Override
public void afterRefresh(boolean didRefresh) {
if (didRefresh) {
updateRefreshedCheckpoint(pendingCheckpoint.get());
}
}

public void updateRefreshedCheckpoint(long checkpoint) {
refreshedCheckpoint.updateAndGet(curr -> Math.max(curr, checkpoint));
assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint;
// This shouldn't be required ideally, but we're also invoking this method from refresh as of now.
// This change is added as safety check to ensure that our checkpoint values are consistent at all times.
pendingCheckpoint.updateAndGet(curr -> Math.max(curr, checkpoint));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
* RefreshListener that runs afterRefresh method if and only if there is a permit available. Once the {@code drainRefreshes()}
Expand Down Expand Up @@ -62,7 +63,7 @@ public ReleasableRetryableRefreshListener(ThreadPool threadPool) {
}

@Override
public final void afterRefresh(boolean didRefresh, CompositeEngine.ReleasableRef<CatalogSnapshot> catalogSnapshot) throws IOException {
public final void afterRefresh(boolean didRefresh, Supplier<CompositeEngine.ReleasableRef<CatalogSnapshot>> catalogSnapshot) throws IOException {
// TODO CompositeEngine filters CatalogSnapshotAwareListeners, keeping this for now
afterRefresh(didRefresh);
}
Expand Down
Loading
Loading