diff --git a/distribution/src/config/opensearch.yml b/distribution/src/config/opensearch.yml index ae0c62004b737..1b0fe139fbaab 100644 --- a/distribution/src/config/opensearch.yml +++ b/distribution/src/config/opensearch.yml @@ -125,3 +125,11 @@ ${path.logs} # # Limits the memory pool for datafusion which it uses for query execution. #datafusion.search.memory_pool: 1GB +#node.attr.remote_store.segment.repository: my-repo-1 +#node.attr.remote_store.translog.repository: my-repo-1 +#node.attr.remote_store.state.repository: my-repo-1 +#cluster.remote_store.state.enabled: true +#node.attr.remote_store.repository.my-repo-1.type: fs +#path.repo: /tmp/remote-store-repo +#node.attr.remote_store.repository.my-repo-1.settings.location: /tmp/remote-store-repo + diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java index 325283d928469..8e0f62f58a1a5 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java @@ -20,14 +20,18 @@ import org.opensearch.index.engine.exec.WriterFileSet; import org.opensearch.index.engine.exec.coord.CatalogSnapshot; import org.opensearch.index.engine.exec.coord.CompositeEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; import java.nio.file.Path; import java.util.Collection; import java.util.List; +import java.util.function.Supplier; public class DatafusionReaderManager implements EngineReaderManager, CatalogSnapshotAwareRefreshListener, FileDeletionListener { + private static final Logger log = LoggerFactory.getLogger(DatafusionReaderManager.class); private DatafusionReader current; private String path; private String dataFormat; @@ -72,9 +76,14 @@ public void beforeRefresh() throws IOException { } @Override - public void afterRefresh(boolean didRefresh, CompositeEngine.ReleasableRef catalogSnapshot) throws IOException { - if (didRefresh && catalogSnapshot != null) { + public void afterRefresh(boolean didRefresh, Supplier> catalogSnapshotSupplier) throws IOException { + if (didRefresh && catalogSnapshotSupplier != null) { DatafusionReader old = this.current; + final CompositeEngine.ReleasableRef catalogSnapshot = catalogSnapshotSupplier.get(); + if (catalogSnapshot == null) { + log.warn("Catalog snapshot is null, skipping post refresh actions for Datafusion reader"); + return; + } Collection newFiles = catalogSnapshot.getRef().getSearchableFiles(dataFormat); this.current = new DatafusionReader(this.path, catalogSnapshot, catalogSnapshot.getRef().getSearchableFiles(dataFormat)); if (old != null) { diff --git a/server/src/main/java/org/opensearch/index/engine/CatalogSnapshotAwareRefreshListener.java b/server/src/main/java/org/opensearch/index/engine/CatalogSnapshotAwareRefreshListener.java index b5bd4e445c3a9..a563b5d3e4ae3 100644 --- a/server/src/main/java/org/opensearch/index/engine/CatalogSnapshotAwareRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/engine/CatalogSnapshotAwareRefreshListener.java @@ -12,6 +12,7 @@ import org.opensearch.index.engine.exec.coord.CompositeEngine; import java.io.IOException; +import java.util.function.Supplier; public interface CatalogSnapshotAwareRefreshListener { /** @@ -24,5 +25,5 @@ public interface CatalogSnapshotAwareRefreshListener { * @param didRefresh whether refresh actually occurred * @param catalogSnapshot the current catalog snapshot with file information */ - void afterRefresh(boolean didRefresh, CompositeEngine.ReleasableRef catalogSnapshot) throws IOException; + void afterRefresh(boolean didRefresh, Supplier> catalogSnapshot) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index d63b644ae85a0..8c14e495a1dec 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -98,6 +98,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.IndexSettings; import org.opensearch.index.VersionType; +import org.opensearch.index.engine.exec.coord.LastRefreshedCheckpointListener; import org.opensearch.index.fieldvisitor.IdOnlyFieldVisitor; import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.ParseContext; @@ -337,7 +338,7 @@ public void onFailure(String reason, Exception ex) { for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) { this.internalReaderManager.addListener(listener); } - this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint()); + this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker); this.internalReaderManager.addListener(lastRefreshedCheckpointListener); maxSeqNoOfUpdatesOrDeletes = new AtomicLong( SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getMaxSeqNo()) @@ -2448,14 +2449,14 @@ public long tryDeleteDocument(IndexReader readerIn, int docID) { * Returned the last local checkpoint value has been refreshed internally. */ public final long lastRefreshedCheckpoint() { - return lastRefreshedCheckpointListener.refreshedCheckpoint.get(); + return lastRefreshedCheckpointListener.getRefreshedCheckpoint(); } /** * Returns the current local checkpoint getting refreshed internally. */ public final long currentOngoingRefreshCheckpoint() { - return lastRefreshedCheckpointListener.pendingCheckpoint.get(); + return lastRefreshedCheckpointListener.getPendingCheckpoint(); } private final Object refreshIfNeededMutex = new Object(); @@ -2473,38 +2474,6 @@ protected final void refreshIfNeeded(String source, long requestingSeqNo) { } } - private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener { - final AtomicLong refreshedCheckpoint; - volatile AtomicLong pendingCheckpoint; - - LastRefreshedCheckpointListener(long initialLocalCheckpoint) { - this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint); - this.pendingCheckpoint = new AtomicLong(initialLocalCheckpoint); - } - - @Override - public void beforeRefresh() { - // all changes until this point should be visible after refresh - pendingCheckpoint.updateAndGet(curr -> Math.max(curr, localCheckpointTracker.getProcessedCheckpoint())); - } - - @Override - public void afterRefresh(boolean didRefresh) { - if (didRefresh) { - updateRefreshedCheckpoint(pendingCheckpoint.get()); - } - } - - void updateRefreshedCheckpoint(long checkpoint) { - refreshedCheckpoint.updateAndGet(curr -> Math.max(curr, checkpoint)); - assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint; - // This shouldn't be required ideally, but we're also invoking this method from refresh as of now. - // This change is added as safety check to ensure that our checkpoint values are consistent at all times. - pendingCheckpoint.updateAndGet(curr -> Math.max(curr, checkpoint)); - - } - } - @Override public final long getMaxSeenAutoIdTimestamp() { return maxSeenAutoIdTimestamp.get(); diff --git a/server/src/main/java/org/opensearch/index/engine/exec/bridge/Indexer.java b/server/src/main/java/org/opensearch/index/engine/exec/bridge/Indexer.java index 932c1c9151d83..46e20f943e860 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/bridge/Indexer.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/bridge/Indexer.java @@ -156,6 +156,14 @@ public interface Indexer { */ int fillSeqNoGaps(long primaryTerm) throws IOException; + default long lastRefreshedCheckpoint() { + throw new UnsupportedOperationException(); + } + + default long currentOngoingRefreshCheckpoint() { + throw new UnsupportedOperationException(); + } + /** * Performs a force merge operation on this engine. */ diff --git a/server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshot.java b/server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshot.java index 0294e1067c9d0..52590d5e0c848 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshot.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshot.java @@ -239,10 +239,9 @@ public String toString() { return "CatalogSnapshot{" + "id=" + id + ", version=" + version + ", dfGroupedSearchableFiles=" + dfGroupedSearchableFiles + ", List of Segment= " + segmentList + ", userData=" + userData +'}'; } - @Override - public CatalogSnapshot clone() { - // TODO this doesn't call super.clone right now - return new CatalogSnapshot(getId(), getVersion(), getSegments(), catalogSnapshotMap, indexFileDeleterSupplier); + public CatalogSnapshot cloneNoAcquire() { + // Still using the clone call since Lucene call requires clone. This will allow a SegmentsInfos backed CatalogSnapshot to use the same method in calls. + return this; } public static class Segment implements Serializable, Writeable { diff --git a/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java b/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java index dcf487a23a970..c02e65c7a5fb4 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.IndexCommit; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.common.Nullable; @@ -44,7 +43,6 @@ import org.opensearch.index.engine.SafeCommitInfo; import org.opensearch.index.engine.SearchExecEngine; import org.opensearch.index.engine.Segment; -import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.engine.VersionValue; import org.opensearch.index.engine.exec.RefreshInput; import org.opensearch.index.engine.exec.RefreshResult; @@ -103,6 +101,7 @@ import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Supplier; import static org.opensearch.index.engine.Engine.HISTORY_UUID_KEY; import static org.opensearch.index.engine.Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID; @@ -126,7 +125,7 @@ public class CompositeEngine implements LifecycleAware, Closeable, Indexer, Chec throw new RuntimeException(e); } }; - private static final BiConsumer, CatalogSnapshotAwareRefreshListener> + private static final BiConsumer>, CatalogSnapshotAwareRefreshListener> POST_REFRESH_CATALOG_SNAPSHOT_AWARE_LISTENER_CONSUMER = (catalogSnapshot, catalogSnapshotAwareRefreshListener) -> { try { catalogSnapshotAwareRefreshListener.afterRefresh(true, catalogSnapshot); @@ -156,6 +155,7 @@ public class CompositeEngine implements LifecycleAware, Closeable, Indexer, Chec protected final String historyUUID; private final LocalCheckpointTracker localCheckpointTracker; + private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener; private final ReentrantLock failEngineLock = new ReentrantLock(); private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final ReleasableLock readLock = new ReleasableLock(rwl.readLock()); @@ -215,6 +215,9 @@ public CompositeEngine( } // initialize local checkpoint tracker and translog manager this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); + this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker); + refreshListeners.add(lastRefreshedCheckpointListener); + final Map userData = store.readLastCommittedSegmentsInfo().getUserData(); String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY)); TranslogEventListener internalTranslogEventListener = new TranslogEventListener() { @@ -326,7 +329,7 @@ public void onFailure(String reason, Exception ex) { } } catalogSnapshotAwareRefreshListeners.forEach(refreshListener -> POST_REFRESH_CATALOG_SNAPSHOT_AWARE_LISTENER_CONSUMER.accept( - acquireSnapshot(), + this::acquireSnapshot, refreshListener )); success = true; @@ -344,6 +347,7 @@ public void onFailure(String reason, Exception ex) { logger.trace("created new CompositeEngine"); initializeRefreshListeners(engineConfig); + } private LocalCheckpointTracker createLocalCheckpointTracker( @@ -410,7 +414,7 @@ LocalCheckpointTracker getLocalCheckpointTracker() { public void updateSearchEngine() throws IOException { catalogSnapshotAwareRefreshListeners.forEach(ref -> { try { - ref.afterRefresh(true, catalogSnapshotManager.acquireSnapshot()); + ref.afterRefresh(true, catalogSnapshotManager::acquireSnapshot); } catch (IOException e) { throw new RuntimeException(e); } @@ -695,7 +699,7 @@ public synchronized void refresh(String source) throws EngineException { } catalogSnapshotManager.applyRefreshResult(refreshResult); catalogSnapshotAwareRefreshListeners.forEach(refreshListener -> POST_REFRESH_CATALOG_SNAPSHOT_AWARE_LISTENER_CONSUMER.accept( - acquireSnapshot(), + this::acquireSnapshot, refreshListener )); @@ -797,6 +801,16 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException { return 0; } + @Override + public long lastRefreshedCheckpoint() { + return lastRefreshedCheckpointListener.getRefreshedCheckpoint(); + } + + @Override + public long currentOngoingRefreshCheckpoint() { + return lastRefreshedCheckpointListener.getPendingCheckpoint(); + } + @Override public void forceMerge( boolean flush, diff --git a/server/src/main/java/org/opensearch/index/engine/exec/coord/IndexFileDeleter.java b/server/src/main/java/org/opensearch/index/engine/exec/coord/IndexFileDeleter.java index c99c5093511b0..d365187b1e487 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/coord/IndexFileDeleter.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/coord/IndexFileDeleter.java @@ -132,4 +132,4 @@ public String toString() { public Map> getFileRefCounts() { return Map.copyOf(fileRefCounts); } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/opensearch/index/engine/exec/coord/LastRefreshedCheckpointListener.java b/server/src/main/java/org/opensearch/index/engine/exec/coord/LastRefreshedCheckpointListener.java new file mode 100644 index 0000000000000..73fcbe59bb4c8 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/exec/coord/LastRefreshedCheckpointListener.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine.exec.coord; + +import org.apache.lucene.search.ReferenceManager; +import org.opensearch.index.seqno.LocalCheckpointTracker; + +import java.util.concurrent.atomic.AtomicLong; + +public final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener { + private final AtomicLong refreshedCheckpoint; + private final AtomicLong pendingCheckpoint; + private final LocalCheckpointTracker localCheckpointTracker; + + public LastRefreshedCheckpointListener(LocalCheckpointTracker localCheckpointTracker) { + final long processedCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); + this.refreshedCheckpoint = new AtomicLong(processedCheckpoint); + this.pendingCheckpoint = new AtomicLong(processedCheckpoint); + this.localCheckpointTracker = localCheckpointTracker; + } + + public long getRefreshedCheckpoint() { + return refreshedCheckpoint.get(); + } + + public long getPendingCheckpoint() { + return pendingCheckpoint.get(); + } + + @Override + public void beforeRefresh() { + // all changes until this point should be visible after refresh + pendingCheckpoint.updateAndGet(curr -> Math.max(curr, localCheckpointTracker.getProcessedCheckpoint())); + } + + @Override + public void afterRefresh(boolean didRefresh) { + if (didRefresh) { + updateRefreshedCheckpoint(pendingCheckpoint.get()); + } + } + + public void updateRefreshedCheckpoint(long checkpoint) { + refreshedCheckpoint.updateAndGet(curr -> Math.max(curr, checkpoint)); + assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint; + // This shouldn't be required ideally, but we're also invoking this method from refresh as of now. + // This change is added as safety check to ensure that our checkpoint values are consistent at all times. + pendingCheckpoint.updateAndGet(curr -> Math.max(curr, checkpoint)); + } +} diff --git a/server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java index 0b4d9b8ffd51a..1628e2cfc567c 100644 --- a/server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; /** * RefreshListener that runs afterRefresh method if and only if there is a permit available. Once the {@code drainRefreshes()} @@ -62,7 +63,7 @@ public ReleasableRetryableRefreshListener(ThreadPool threadPool) { } @Override - public final void afterRefresh(boolean didRefresh, CompositeEngine.ReleasableRef catalogSnapshot) throws IOException { + public final void afterRefresh(boolean didRefresh, Supplier> catalogSnapshot) throws IOException { // TODO CompositeEngine filters CatalogSnapshotAwareListeners, keeping this for now afterRefresh(didRefresh); } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index bae430b6123c3..f9f9c83a16839 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -9,24 +9,20 @@ package org.opensearch.index.shard; import org.apache.logging.log4j.Logger; -import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.FilterDirectory; import org.opensearch.action.LatchedActionListener; import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.cluster.routing.RecoverySource; -import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.logging.Loggers; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.UploadListener; import org.opensearch.core.action.ActionListener; -import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.exec.FileMetadata; import org.opensearch.index.engine.exec.coord.CatalogSnapshot; import org.opensearch.index.engine.exec.coord.CompositeEngine; import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.seqno.SequenceNumbers; -import org.opensearch.index.store.CompositeRemoteSegmentStoreDirectory; import org.opensearch.index.store.CompositeStoreDirectory; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; @@ -43,7 +39,6 @@ import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -224,7 +219,7 @@ private boolean syncSegments() { // primaryMode to true. Due to this, the refresh that is triggered post replay of translog will not go through // if following condition does not exist. The segments created as part of translog replay will not be present // in the remote store. - return indexShard.state() != IndexShardState.STARTED || !(indexShard.getEngine() instanceof InternalEngine); + return indexShard.state() != IndexShardState.STARTED || !(indexShard.getIndexer() instanceof InternalEngine || indexShard.getIndexer() instanceof CompositeEngine); } beforeSegmentsSync(); long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs(); @@ -258,7 +253,7 @@ private boolean syncSegments() { } // Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can // move. - long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); + long lastRefreshedCheckpoint = indexShard.getIndexer().lastRefreshedCheckpoint(); Collection localFilesPostRefresh = catalogSnapshot.getFileMetadataList(); @@ -430,7 +425,7 @@ private void onSuccessfulSegmentsSync( // Reset the backoffDelayIterator for the future failures resetBackOffDelayIterator(); // Set the minimum sequence number for keeping translog - indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); + indexShard.getIndexer().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1); // Publishing the new checkpoint which is used for remote store + segrep indexes checkpointPublisher.publish(indexShard, checkpoint); logger.debug("onSuccessfulSegmentsSync lastRefreshedCheckpoint={} checkpoint={}", lastRefreshedCheckpoint, checkpoint); @@ -477,9 +472,9 @@ private boolean isRefreshAfterCommitSafe() { // ToDo:@Kamal Update MaxSeqNo void uploadMetadata(Collection localFilesPostRefresh, CatalogSnapshot catalogSnapshot, ReplicationCheckpoint replicationCheckpoint) throws IOException { - final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint(); + final long maxSeqNo = indexShard.getIndexer().currentOngoingRefreshCheckpoint(); - CatalogSnapshot catalogSnapshotCloned = catalogSnapshot.clone(); + CatalogSnapshot catalogSnapshotCloned = catalogSnapshot.cloneNoAcquire(); // Create mutable copy and update checkpoint fields while preserving ALL existing metadata catalogSnapshotCloned.getUserData().put(LOCAL_CHECKPOINT_KEY, String.valueOf(maxSeqNo)); @@ -491,7 +486,7 @@ void uploadMetadata(Collection localFilesPostRefresh, CatalogSnaps catalogSnapshotCloned.getUserData().get(org.opensearch.index.engine.Engine.HISTORY_UUID_KEY), catalogSnapshotCloned.getUserData().keySet()); - Translog.TranslogGeneration translogGeneration = indexShard.getEngine().translogManager().getTranslogGeneration(); + Translog.TranslogGeneration translogGeneration = indexShard.getIndexer().translogManager().getTranslogGeneration(); if (translogGeneration == null) { throw new UnsupportedOperationException("Encountered null TranslogGeneration while uploading metadata to remote segment store"); } else { @@ -631,8 +626,8 @@ private boolean isReadyForUpload() { if (indexShard.state() != null) { sb.append(" indexShardState=").append(indexShard.state()); } - if (indexShard.getEngineOrNull() != null) { - sb.append(" engineType=").append(indexShard.getEngine().getClass().getSimpleName()); + if (indexShard.getIndexerOrNull() != null) { + sb.append(" engineType=").append(indexShard.getIndexer().getClass().getSimpleName()); } if (indexShard.recoveryState() != null) { sb.append(" recoverySourceType=").append(indexShard.recoveryState().getRecoverySource().getType());