-
Notifications
You must be signed in to change notification settings - Fork 25.7k
Associate translog with Lucene index commit for searchable snapshots shards #53459
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
406d4de
32df1c4
4ab94ce
557d924
f3fc163
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,8 @@ | |
| import org.apache.lucene.index.Term; | ||
| import org.apache.lucene.store.AlreadyClosedException; | ||
| import org.elasticsearch.Version; | ||
| import org.elasticsearch.common.Nullable; | ||
| import org.elasticsearch.common.Strings; | ||
| import org.elasticsearch.common.UUIDs; | ||
| import org.elasticsearch.common.bytes.BytesArray; | ||
| import org.elasticsearch.common.bytes.BytesReference; | ||
|
|
@@ -1838,20 +1840,41 @@ public static String createEmptyTranslog(final Path location, final long initial | |
|
|
||
| static String createEmptyTranslog(Path location, long initialGlobalCheckpoint, ShardId shardId, | ||
| ChannelFactory channelFactory, long primaryTerm) throws IOException { | ||
| return createEmptyTranslog(location, shardId, initialGlobalCheckpoint, primaryTerm, null, channelFactory); | ||
| } | ||
|
|
||
| public static String createEmptyTranslog(final Path location, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggest adding a big warning in a comment here saying how dangerous it is to specify the translog UUID and that it should only be used for shards that will see no indexing. I'm also idly wondering about how hard it would be to make this
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Sure, I added some doc in 557d924
I find the idea interesting but I'm not sure if it worths it; I'd prefer to not create translogs at all if they were not to be used. |
||
| final ShardId shardId, | ||
| final long initialGlobalCheckpoint, | ||
| final long primaryTerm, | ||
| @Nullable final String translogUUID, | ||
| @Nullable final ChannelFactory factory) throws IOException { | ||
| IOUtils.rm(location); | ||
| Files.createDirectories(location); | ||
| final Checkpoint checkpoint = | ||
| Checkpoint.emptyTranslogCheckpoint(0, 1, initialGlobalCheckpoint, 1); | ||
|
|
||
| final long generation = 1L; | ||
| final long minTranslogGeneration = 1L; | ||
| final ChannelFactory channelFactory = factory != null ? factory : FileChannel::open; | ||
| final String uuid = Strings.hasLength(translogUUID) ? translogUUID : UUIDs.randomBase64UUID(); | ||
| final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME); | ||
| final Path translogFile = location.resolve(getFilename(generation)); | ||
| final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, initialGlobalCheckpoint, minTranslogGeneration); | ||
|
|
||
| Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); | ||
| IOUtils.fsync(checkpointFile, false); | ||
| final String translogUUID = UUIDs.randomBase64UUID(); | ||
| TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1, | ||
| location.resolve(getFilename(1)), channelFactory, | ||
| new ByteSizeValue(10), 1, initialGlobalCheckpoint, | ||
| () -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, primaryTerm, | ||
| new TragicExceptionHolder(), seqNo -> { throw new UnsupportedOperationException(); }); | ||
| final TranslogWriter writer = TranslogWriter.create(shardId, uuid, generation, translogFile, channelFactory, | ||
| new ByteSizeValue(10), minTranslogGeneration, initialGlobalCheckpoint, | ||
| () -> { | ||
| throw new UnsupportedOperationException(); | ||
| }, () -> { | ||
| throw new UnsupportedOperationException(); | ||
| }, | ||
| primaryTerm, | ||
| new TragicExceptionHolder(), | ||
| seqNo -> { | ||
| throw new UnsupportedOperationException(); | ||
| }); | ||
| writer.close(); | ||
| return translogUUID; | ||
| return uuid; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License; | ||
| * you may not use this file except in compliance with the Elastic License. | ||
| */ | ||
| package org.elasticsearch.xpack.searchablesnapshots; | ||
|
|
||
| import org.apache.lucene.index.SegmentInfos; | ||
| import org.elasticsearch.index.IndexSettings; | ||
| import org.elasticsearch.index.seqno.SequenceNumbers; | ||
| import org.elasticsearch.index.shard.IndexEventListener; | ||
| import org.elasticsearch.index.shard.IndexShard; | ||
| import org.elasticsearch.index.shard.ShardId; | ||
| import org.elasticsearch.index.translog.Translog; | ||
| import org.elasticsearch.index.translog.TranslogException; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
|
|
||
| import java.nio.file.Path; | ||
|
|
||
| import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.isSearchableSnapshotStore; | ||
|
|
||
| public class SearchableSnapshotIndexEventListener implements IndexEventListener { | ||
|
|
||
| @Override | ||
| public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) { | ||
| assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC); | ||
| associateNewEmptyTranslogWithIndex(indexShard); | ||
| } | ||
|
|
||
| private static void associateNewEmptyTranslogWithIndex(IndexShard indexShard) { | ||
| final ShardId shardId = indexShard.shardId(); | ||
| assert isSearchableSnapshotStore(indexShard.indexSettings().getSettings()) : "Expected a searchable snapshot shard " + shardId; | ||
| try { | ||
| final SegmentInfos segmentInfos = indexShard.store().readLastCommittedSegmentsInfo(); | ||
| final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); | ||
| final long primaryTerm = indexShard.getPendingPrimaryTerm(); | ||
| final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY); | ||
| final Path translogLocation = indexShard.shardPath().resolveTranslog(); | ||
| Translog.createEmptyTranslog(translogLocation, shardId, localCheckpoint, primaryTerm, translogUUID, null); | ||
| } catch (Exception e) { | ||
| throw new TranslogException(shardId, "failed to associate a new translog", e); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More for my curiosity than something that needs changing: why can we not
assert state == IndexShardState.RECOVERINGhere? The lifecycle of anIndexShardis still a bit opaque to me.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it makes more sense to assert here, there's no reason to not do it.