
### Lucene Locking

# Engine & Store

[Engine]:https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/engine/Engine.java

[InternalEngine]:https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

[IndexShard]:https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

[Translog]:https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/translog/Translog.java

### IndexShard

The [IndexShard] class is the single entry point for all shard-level operations: indexing, deletion, real-time GET,
refresh, flush, recovery, and snapshot. There is exactly one `IndexShard` object per allocated shard.

An `IndexShard` holds references to:

- The shard's [Store], which wraps a Lucene `Directory` and provides access to the shard's Lucene index files on disk.
- The shard's [Engine], which manages all indexing and search operations for this shard, writing to both the
[Translog] and the Lucene files managed by the `Store`.

The lifecycle of these objects (creation, recovery, and teardown) is controlled by the [IndicesClusterStateService],
which reacts to cluster state changes and updates local state accordingly (see
the [IndicesClusterStateService](#indicesclusterstateservice) section).

### Store

[Store]:https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/store/Store.java

[RefCounted]:https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/core/RefCounted.java

[StoreFileMetadata]:https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/store/StoreFileMetadata.java

[FsDirectoryFactory]:https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java

The [Store] is the lowest-level Elasticsearch persistence abstraction for a shard. Each shard has a single
dedicated `Store` that wraps a
Lucene [Directory](https://lucene.apache.org/core/10_3_2/core/org/apache/lucene/store/Directory.html), which is Lucene's
own file-system abstraction used to read and write index files on disk.
Lucene's `Directory` is a pure I/O abstraction: callers open
an [IndexInput](https://lucene.apache.org/core/10_3_2/core/org/apache/lucene/store/IndexInput.html) to read a named file
and create an [IndexOutput](https://lucene.apache.org/core/10_3_2/core/org/apache/lucene/store/IndexOutput.html) to
write one. The `Store` builds on the Lucene `Directory` capabilities by adding reference counting and corruption
detection, exposing committed file metadata, and enforcing integrity invariants.

#### Reference Counting and Lifecycle

The `Store` implements [RefCounted]. Callers call `store.incRef()` before using it and `store.decRef()` in
a `finally` block when done. Once the reference count drops to zero the store is closed and the underlying Lucene
directory is cleaned up. The `Store` also receives a [ShardLock] at construction time and only releases it
once closed, allowing other threads waiting to acquire the lock for this shard to proceed.
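
The incRef/decRef discipline can be pictured with a minimal sketch. This is an illustration of the pattern, not the actual [RefCounted] implementation: the count starts at 1 for the creating owner, each use is bracketed by `incRef()`/`decRef()`, and the resource is torn down when the count reaches zero.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the reference-counting pattern the Store follows.
class RefCountedStore {
    private final AtomicInteger refCount = new AtomicInteger(1); // owner holds the initial reference
    private volatile boolean closed = false;

    void incRef() {
        int count;
        do {
            count = refCount.get();
            if (count <= 0) {
                // a closed store must never be revived
                throw new IllegalStateException("store is already closed");
            }
        } while (!refCount.compareAndSet(count, count + 1));
    }

    void decRef() {
        if (refCount.decrementAndGet() == 0) {
            // the real Store closes the Directory and releases the ShardLock here
            closed = true;
        }
    }

    boolean isClosed() {
        return closed;
    }
}
```

A caller would do `store.incRef()` before touching the directory and `store.decRef()` in a `finally` block; the owner's final `decRef()` is what actually closes the store.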

#### Backing Directory

The Lucene `Directory` used by a `Store` is created by
an [IndexStorePlugin.DirectoryFactory](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/plugins/IndexStorePlugin.java#L40).
The default built-in implementation is [FsDirectoryFactory], which
supports [several store types](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java#L107)
selectable via the
`index.store.type` [setting](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/IndexModule.java#L111):

- `hybridfs` (default):
a [HybridDirectory](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java#L180)
that delegates
to a [MMapDirectory](https://lucene.apache.org/core/10_3_2/core/org/apache/lucene/store/MMapDirectory.html) for
performance-sensitive file types (postings, term vectors index, norms, vectors, etc.)
and [NIOFSDirectory](https://lucene.apache.org/core/10_3_2/core/org/apache/lucene/store/NIOFSDirectory.html) for files
where sequential access is preferred (e.g. stored fields). Direct I/O is also enabled for vector index files when
supported by the OS. The `HybridDirectory` selects between mmap and NIO on a per-file basis
by [checking](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java#L252)
the file's Lucene extension and I/O context.
- `mmapfs`: uses `MMapDirectory` for all files.
- `niofs`: uses `NIOFSDirectory` for all files.
- `fs`: automatically selects the best type for the underlying file system.
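
The per-file delegation performed by `HybridDirectory` can be sketched roughly as follows. This is illustrative only: the extension set below is a plausible subset chosen for the example, not the authoritative list the real factory consults.

```java
import java.util.Set;

// Illustrative sketch (not the actual HybridDirectory code) of per-file backend
// selection: extensions that benefit from random access are mmapped, the rest
// go through NIO.
class HybridDelegationSketch {
    // Assumed subset of extensions for illustration purposes.
    private static final Set<String> MMAP_EXTENSIONS =
        Set.of("doc", "pos", "tip", "nvd", "vec", "vex", "dvd", "tmd");

    static String backendFor(String fileName) {
        int dot = fileName.lastIndexOf('.');
        String ext = dot < 0 ? "" : fileName.substring(dot + 1);
        return MMAP_EXTENSIONS.contains(ext) ? "mmap" : "nio";
    }
}
```

So a postings file like `_0.doc` would be mmapped, while a stored-fields or `segments_N` file would be read through NIO.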

#### MetadataSnapshot

The `Store`
exposes [MetadataSnapshots](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/store/Store.java#L820)
for each index commit, read and constructed from the Lucene `segments_N` files. A `MetadataSnapshot` is a point-in-time
map from filename to [StoreFileMetadata] for the committed files belonging to a Lucene index commit, produced by an
Elasticsearch flush.
Each [StoreFileMetadata] includes the file's name, on-disk length, CRC32 checksum (from the Lucene file footer), the
Lucene version that wrote it, and a `writerUuid` that uniquely identifies the writer.

`MetadataSnapshot`s are leveraged by several Elasticsearch workflows, including [peer recovery](#peer-recovery) and
replica shard allocation (via `TransportNodesListShardStoreMetadata`). They are used to compare the on-disk state of two
distinct shards and calculate how much data needs to be transferred to bring them into sync.
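
As a hedged sketch of that comparison (the field and method names below are illustrative, not the real `StoreFileMetadata` / `MetadataSnapshot` API): files whose name, length, and checksum all match on both sides can be reused, and everything else must be copied from the source shard.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical per-file metadata; the real StoreFileMetadata carries more fields.
record FileMeta(long length, String checksum, String writerUuid) {}

class SnapshotDiff {
    // Returns the files the target is missing, or holds with different content.
    static List<String> filesToRecover(Map<String, FileMeta> source, Map<String, FileMeta> target) {
        List<String> toCopy = new ArrayList<>();
        for (Map.Entry<String, FileMeta> e : source.entrySet()) {
            FileMeta local = target.get(e.getKey());
            if (!Objects.equals(local, e.getValue())) {
                toCopy.add(e.getKey()); // missing or different: must be transferred
            }
        }
        return toCopy;
    }
}
```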

#### Concurrency

[NodeEnvironment]:https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java

[ShardLock]:https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/env/ShardLock.java

Access to a shard's on-disk data is protected by three layers of locking, each scoped to a different level.

The [ShardLock] is a node-wide, coarse-grained lock managed by [NodeEnvironment]. It is backed by a
`Semaphore` and guarantees that at most one owner at a time has write access to a given shard directory
within a JVM process. The
`Store` [is given](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/store/Store.java#L169)
a `ShardLock` at creation time. It holds this lock for its entire lifetime, ensuring that write operations (e.g.
creating an `IndexWriter`, deleting shard files, or recovering from another shard) have exclusive access to the shard
directory.
Callers that need to access the directory without a live `Store` (e.g. `TransportNodesListShardStoreMetadata` reading
metadata for allocation
decisions) [acquire a temporary](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java#L182)
`ShardLock` for the duration of the read.
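
A minimal sketch of the node-wide lock, assuming a simple Semaphore-per-shard scheme (the real [NodeEnvironment] logic adds diagnostics, lock reuse, and error reporting that are omitted here):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Illustrative node-wide shard locking: at most one owner per shard within the JVM.
class ShardLocks {
    private final Map<String, Semaphore> locks = new ConcurrentHashMap<>();

    // Returns true if the lock was acquired within the timeout.
    boolean tryLock(String shardId, long timeoutMillis) {
        Semaphore s = locks.computeIfAbsent(shardId, k -> new Semaphore(1));
        try {
            return s.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void unlock(String shardId) {
        locks.get(shardId).release();
    }
}
```

A second caller attempting to lock the same shard blocks (or fails after the timeout) until the current owner releases the lock, which is exactly what a long-lived `Store` does only when it closes.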

The [metadataLock](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/store/Store.java#L168)
is an in-process `ReentrantReadWriteLock` inside the `Store`. It guards access to the
directory's file listing and segment metadata. Operations that only read metadata (e.g. `getMetadata` with an
existing
commit) [take the read lock](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/store/Store.java#L300),
allowing concurrent readers. Operations that structurally modify the
directory (e.g. renaming temporary recovery files via `renameTempFilesSafe`, cleaning up stale files
via `cleanupAndVerify`, or running `CheckIndex`) take the write lock, which excludes all readers and other
writers until the operation completes.
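
The read/write split can be sketched as follows; the file listing here is a stand-in for the real directory state, and the method names only echo the `Store` operations they are inspired by:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative metadataLock pattern: metadata reads share a read lock, while
// structural changes (renames, cleanup) take the exclusive write lock.
class MetadataGuard {
    private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock();
    private final List<String> files = new CopyOnWriteArrayList<>();

    List<String> listFiles() {
        metadataLock.readLock().lock(); // concurrent readers are allowed
        try {
            return List.copyOf(files);
        } finally {
            metadataLock.readLock().unlock();
        }
    }

    void addFile(String name) {
        metadataLock.writeLock().lock();
        try {
            files.add(name);
        } finally {
            metadataLock.writeLock().unlock();
        }
    }

    void renameTempFile(String from, String to) {
        metadataLock.writeLock().lock(); // excludes all readers and other writers
        try {
            files.remove(from);
            files.add(to);
        } finally {
            metadataLock.writeLock().unlock();
        }
    }
}
```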

Lucene's [IndexWriter](https://lucene.apache.org/core/10_3_2/core/org/apache/lucene/index/IndexWriter.html) write lock
is a native file lock (`write.lock`) that Lucene places in the index
directory. It prevents two `IndexWriter` instances from ever opening the same directory simultaneously, even
across processes. When the `Store` needs to read metadata from a directory that has no active engine (e.g.
during `getMetadata(commit=null, lockDirectory=true)`), it acquires the Lucene write lock together with the
`metadataLock` write lock, ensuring no writer can interfere. In normal operation, the running `InternalEngine`
already holds this
lock [through its IndexWriter](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java#L149).

To summarize, `ShardLock` is a JVM-level lock enforced across the entire node, `metadataLock`
coordinates in-process readers and writers within the `Store`, and the Lucene write lock guards the raw
directory at the file-system level.

#### Corruption

The `Store` maintains
corruption [markers](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/store/Store.java#L142)
as special files prefixed with `corrupted_` written into the Lucene directory.
When an unrecoverable I/O error or checksum mismatch
is [detected](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/store/Store.java#L1372),
a marker file is written containing the serialized exception. Subsequent calls to `failIfCorrupted()` scan for these
marker files and throw a `CorruptIndexException` if any are found, preventing any further operations on a known-bad
shard. Corruption markers are removed only after the shard has been successfully recovered from another source (e.g. a
primary or a snapshot).
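
The marker scheme can be sketched like this. The file naming and exception types below are simplified assumptions; the real `Store` serializes the full exception and uses its own error types.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative corruption-marker scheme: persist the failure as a
// `corrupted_*` file, and refuse to operate on a shard that has one.
class CorruptionMarkers {
    static final String PREFIX = "corrupted_";

    static void markCorrupted(Path shardDir, Exception cause) {
        try {
            Path marker = shardDir.resolve(PREFIX + System.nanoTime());
            Files.writeString(marker, cause.toString()); // real Store serializes the full exception
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static void failIfCorrupted(Path shardDir) {
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(shardDir, PREFIX + "*")) {
            for (Path marker : stream) {
                throw new IllegalStateException("shard is corrupted: " + Files.readString(marker));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```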

### Engine

The [Engine] abstract class is the Elasticsearch abstraction that manages and coordinates operations on the running
shard index. Where the `Store` manages files on disk, the `Engine` owns the write
path (indexing, deletion, no-ops), the read path (searcher acquisition, real-time GET), and Lucene lifecycle
operations (refresh, flush, merge). It also controls the translog, ensuring that every acknowledged write is
durably recorded before a response is sent.

The main implementation is [InternalEngine], used for all read-write shards (primary and replica). Other
implementations serve more specialized roles. For example,
the [ReadOnlyEngine](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java)
gives read-only access to a frozen shard, and
the [NoOpEngine](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java)
acts as a placeholder for shards belonging to a closed index. It exists to allow shards of closed indices to be
correctly replicated in case of a node failure.

#### Segments, Refresh, and Flush

Lucene organizes index data on disk into immutable files called *segments*. They are created whenever the in-memory
write buffer is flushed or when merges combine existing segments into larger ones.
The [IndexWriter](https://lucene.apache.org/core/10_3_2/core/org/apache/lucene/index/IndexWriter.html)
accumulates writes in memory and periodically flushes new segments. The `InternalEngine` wraps an `IndexWriter` (for
writes) and a pair of internal and
external [reader managers](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java#L151)
that wrap
Lucene [DirectoryReader](https://lucene.apache.org/core/10_3_2/core/org/apache/lucene/index/DirectoryReader.html)s and
map Elasticsearch concepts onto Lucene's:

- An Elasticsearch `refresh` triggers a Lucene NRT reader reopen (via `DirectoryReader.openIfChanged()`), making
recently indexed documents searchable without writing a full commit to disk. Refreshes do not call
`IndexWriter.commit()` and do not persist data durably.
- An Elasticsearch `flush` calls `IndexWriter.commit()`, writing a durable commit point to disk. A flush is
what allows the translog to be safely truncated up to that commit.

This distinction is a core piece of Elasticsearch's durability model. Documents are searchable after a refresh but
only durably stored after a flush. In between, the translog bridges the gap. Every write is appended to the
translog on disk before being acknowledged, so that it can be replayed in the event of a crash.
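
The durability contract can be modeled with a toy sketch (this is not Elasticsearch code; the lists stand in for the index buffer, the open reader, the Lucene commit, and the translog):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the refresh/flush/translog durability contract: documents become
// searchable only after refresh(), survive a crash only after flush(), and in
// between are protected by translog replay.
class ToyEngine {
    private final List<String> indexBuffer = new ArrayList<>(); // in memory, not yet searchable
    private final List<String> searchable = new ArrayList<>();  // what an open reader sees
    private final List<String> committed = new ArrayList<>();   // durable Lucene commit point
    private final List<String> translog = new ArrayList<>();    // durable operation log

    void index(String doc) {
        translog.add(doc);    // appended (and fsynced) before the write is acknowledged
        indexBuffer.add(doc);
    }

    void refresh() {          // NRT reader reopen: searchable, but not durable
        searchable.addAll(indexBuffer);
        indexBuffer.clear();
    }

    void flush() {            // IndexWriter.commit(): durable; translog can be trimmed
        refresh();
        committed.clear();
        committed.addAll(searchable);
        translog.clear();
    }

    boolean isSearchable(String doc) { return searchable.contains(doc); }

    // What would survive a crash: the last commit plus a translog replay.
    List<String> recoverAfterCrash() {
        List<String> recovered = new ArrayList<>(committed);
        recovered.addAll(translog);
        return recovered;
    }
}
```

Note that a document indexed but not yet refreshed would still survive a crash in this model, because it already sits in the translog.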

The detailed mechanics of how a write flows through the engine (including translog interaction, sequence number
assignment, and version map updates) are covered in the [Translog](#translog) and [Indexing / CRUD](#indexing--crud)
sections.

### IndicesClusterStateService

[ClusterStateApplier]:https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/cluster/ClusterStateApplier.java

[ShardStateAction]:https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java

[RecoverySource]:https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java

[ShardRouting]:https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java

The [IndicesClusterStateService] is a high-priority [ClusterStateApplier] of the [ClusterApplierService]
(see [Cluster State Application](#cluster-state-application)).
Any time a new cluster state is published, [IndicesClusterStateService] checks whether the state of indices
and shards has changed and updates the local node's shards to match accordingly.

Its `doApplyClusterState` method goes through the following operations in order each time a new cluster state
is applied:

1. [Delete indices](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java#L353)
that no longer exist in the new cluster state: closes each `IndexShard` (which closes its
`Engine`, flushing first), releases the `Store` reference, and deletes the on-disk shard directory.
2. [Remove shards](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java#L355)
that are no longer assigned to this node: closes the `Engine` (without flushing; unflushed data is already in the
translog and therefore safe), releases the `Store` reference, but leaves the on-disk files intact for later cleanup by
`IndicesStore`.
3. [Update index metadata](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java#L357):
if index settings changed, propagates them in-memory to each `IndexShard` and then to its `Engine` (e.g. merge
scheduler config, GC deletes policy, soft-delete retention). If mappings changed, updates the `MapperService`. No
disk writes happen here.
4. [Create or update shards](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java#L359):
for each [ShardRouting] targeting this node in `INITIALIZING` state, creates a new `Store` and `IndexShard` and kicks
off recovery. For already-active shards, updates their routing metadata in-place.
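
The ordering of these steps amounts to a reconciliation loop, sketched schematically below. This is an illustration of the shape of `doApplyClusterState`, not the real `IndicesClusterStateService` logic:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Schematic reconciliation of local shards against the shards the new cluster
// state assigns to this node.
class ShardReconciler {
    final Map<String, String> localShards = new HashMap<>(); // shardId -> state

    void applyClusterState(Set<String> assignedToThisNode) {
        // Steps 1-2: remove shards (and their Store/Engine) no longer assigned here.
        localShards.keySet().removeIf(shardId -> !assignedToThisNode.contains(shardId));
        // Step 3: settings/mapping updates for surviving shards would happen here.
        // Step 4: create missing shards and kick off recovery.
        for (String shardId : assignedToThisNode) {
            localShards.putIfAbsent(shardId, "RECOVERING");
        }
    }
}
```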

The diagram below illustrates the end-to-end flow from a master publishing a new cluster state containing a
newly assigned shard down to the `Engine` becoming active.

```mermaid
sequenceDiagram
participant M as Master Node
participant CA as ClusterApplierService
participant ICSS as IndicesClusterStateService
participant IS as IndexShard
participant S as Store
participant E as InternalEngine

rect rgb(255, 248, 240)
Note over M,CA: Cluster State Committed
M->>CA: ApplyCommitRequest (new ClusterState)
CA->>ICSS: applyClusterState(ClusterChangedEvent)
end

rect rgb(240, 248, 255)
Note over ICSS,S: Shard Creation
ICSS->>ICSS: doApplyClusterState()<br/>deleteIndices()<br/>removeIndicesAndShards()<br/>updateIndices()
ICSS->>S: new Store(shardDirectory)
ICSS->>IS: new IndexShard(store, shardRouting, ...)
ICSS->>IS: startRecovery(recoverySource)
end

rect rgb(240, 255, 240)
Note over IS,E: Recovery and Engine Start
IS->>S: validate / prepare directory
IS->>E: new InternalEngine(engineConfig)
E->>S: IndexWriter.open(store.directory)
Note right of E: Engine open, shard STARTED
end

rect rgb(255, 240, 240)
Note over IS,M: Report Back to Master
IS->>M: ShardStateAction.shardStarted()
Note left of M: Master updates ClusterState<br/>marks shard as STARTED
end
```

For a newly assigned shard, `createShard`
first [acquires](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java#L754)
a `ShardLock` (preventing concurrent access to the same shard directory from another request), or throws a
`ShardLockObtainFailedException` if it fails to do so. It
then [creates](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/IndexService.java#L560)
a `Store` referencing the shard's on-disk directory,
and [instantiates](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/index/IndexService.java#L569)
an `IndexShard`. Recovery is
then [triggered](https://github.com/elastic/elasticsearch/blob/v9.3.0/server/src/main/java/org/elasticsearch/indices/IndicesService.java#L993)
based on the [RecoverySource] in the [ShardRouting].
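
The lock-first shape of this flow can be sketched as follows; the class and exception names mirror the real ones, but the bodies are stubs for illustration only:

```java
import java.util.concurrent.Semaphore;

// Illustrative createShard() flow: acquire the ShardLock first, fail fast with
// a dedicated exception if another owner holds it, then create the Store and
// start recovery.
class CreateShardSketch {
    static class ShardLockObtainFailedException extends RuntimeException {
        ShardLockObtainFailedException(String msg) { super(msg); }
    }

    final Semaphore shardLock = new Semaphore(1);
    boolean storeCreated = false;
    boolean recoveryStarted = false;

    void createShard(String shardId) {
        if (!shardLock.tryAcquire()) {
            throw new ShardLockObtainFailedException("could not lock " + shardId);
        }
        storeCreated = true;    // stands in for: new Store(directory, shardLock, ...)
        recoveryStarted = true; // stands in for: indexShard.startRecovery(recoverySource)
    }
}
```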

The details of recovery, including how the `Engine` is created and started, are covered in the
[Peer Recovery](#peer-recovery), [Snapshot Recovery](#snapshot-recovery), and [Local Shards Recovery](#local-shards-recovery)
sections.

### Translog

[`Location`]:https://github.com/elastic/elasticsearch/blob/693f3bfe30271d77a6b3147e4519b4915cbb395d/server/src/main/java/org/elasticsearch/index/translog/Translog.java#L977
[`AsyncIOProcessor`]:https://github.com/elastic/elasticsearch/blob/main/server/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java

A bulk request will ultimately make repeated calls to Engine methods such as [`index()` or `delete()`], which add
operations to the Translog.
Finally, the AfterWrite action of the [`TransportWriteAction`] calls [`indexShard.syncAfterWrite()`], which puts the last written translog [`Location`] of the bulk request into an [`AsyncIOProcessor`] responsible for gradually fsync'ing the Translog and notifying any waiters.
Once the translog has been fsync'ed past the requested location, the waiting bulk request is notified and can be acknowledged.
Multiple writes typically reach the translog before the next fsync(), so the cost of each fsync() is amortized across all of them.
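
The amortization idea behind the `AsyncIOProcessor` can be sketched with a simplified, single-threaded model (the real class coordinates concurrent writers and runs the drain on I/O threads, which is omitted here):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of batched fsync: writers enqueue the translog location
// they need made durable, and a single fsync() covers every location written
// so far, amortizing its cost across all queued writers.
class BatchedFsync {
    private final List<Long> pending = new ArrayList<>(); // locations awaiting fsync
    private long syncedUpTo = -1;
    int fsyncCalls = 0;

    void put(long location) {
        pending.add(location);
    }

    // One fsync() makes every pending location durable and notifies waiters.
    void drainAndSync() {
        if (pending.isEmpty()) return;
        long max = pending.stream().mapToLong(Long::longValue).max().getAsLong();
        fsyncCalls++; // the single fsync() for this whole batch
        syncedUpTo = Math.max(syncedUpTo, max);
        pending.clear();
    }

    boolean isDurable(long location) {
        return location <= syncedUpTo;
    }
}
```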