Make transaction log publish atomic in local file system#27904
chenjian2664 wants to merge 3 commits into trinodb:master
Conversation
@chenjian2664 note that this PR relates to #27484
This is not true.
@chenjian2664 could you please add a reference in the description to the recorded CI failures against object storage providers?
findinpath left a comment
The flakiness is generated by testing against the local file system.
See #27484 (comment)
If the same issue occurred against object storage, we would have a bug in the implementation.
@findinpath just considering the reading phase: when the queries arrive at the connector, it is possible they read the same version of the table, right?
yes, sure. This is an operation which reads from the same table it inserts into. When dealing with a non-blind append, we check whether other concurrent operations that are also non-blind inserts have inserted data, and if so, fail the operation. So even if two operations read the same version of the table, only one of them will actually succeed in performing the commit.
@findinpath Thanks for guiding me to the place that commits the insert: `checkForConcurrentTransactionConflicts`.
findinpath left a comment
Well done @chenjian2664 ❤️
Please add
@findinpath Can we re-enable the disabled tests in `TestDeltaLakeLocalConcurrentWritesTest` now?
@findinpath Nice! @chenjian2664 Could you re-enable those tests?
CI failures: https://github.com/trinodb/trino/actions/runs/21194534248/job/60967948329?pr=27904 - the rename is atomic but not exclusive :(
@findinpath @ebyhr I am afraid we can't enable the tests of #21725, the
@chenjian2664 yes, I remember now - let's handle this in a separate contribution.
Previously, transaction log publishing relied on file creation with `CREATE_NEW` to ensure that only one concurrent writer succeeds. While this prevents multiple writers from winning, it does not guarantee atomic visibility of the log contents. Readers may observe an empty or partially written file while the writer is still writing the transaction log. This change writes the transaction log to a temporary file and publishes it using an atomic rename, ensuring readers only observe a fully written log or no file at all.
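As a minimal sketch of the approach described in this commit message (assuming local-filesystem semantics; the class and method names here are illustrative, not the actual Trino API):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.UUID.randomUUID;

public class AtomicPublishSketch
{
    // Write the data to a sibling temp file first, then atomically rename it
    // into place. A reader either finds no file at the target path, or a file
    // that already contains the complete content.
    public static void publish(Path target, byte[] data)
            throws IOException
    {
        Path tmp = target.resolveSibling(target.getFileName() + "." + randomUUID() + ".tmp");
        Files.write(tmp, data, CREATE_NEW, WRITE);
        Files.move(tmp, target, ATOMIC_MOVE);
    }

    public static void main(String[] args)
            throws IOException
    {
        Path dir = Files.createTempDirectory("atomic-publish-");
        Path target = dir.resolve("00000000000000000001.json");
        publish(target, "{\"commitInfo\":{}}".getBytes());
        if (!new String(Files.readAllBytes(target)).equals("{\"commitInfo\":{}}")) {
            throw new AssertionError("published file is not fully visible");
        }
    }
}
```

Note this sketch addresses only visibility; as discussed below in this thread, the atomic rename alone does not make the publish exclusive.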
`testConcurrentInsertsSelectingFromTheSameTable`

```java
boolean lockCreated = false;
Path lockPath = path.resolveSibling(path.getFileName() + ".lock");
```
It looks like we have two mechanisms (file locks and atomic moves) to achieve the same goal (exclusion).
So, what is the lock needed for?
The file lock is for exclusion - only one thread should be able to create the target file at a time.
The move is for visibility (atomicity): readers should either not see the target file, or see it exist with its full content.
Please document this line of thinking with a comment.
@chenjian2664 lock file is "cooperative locking" - the parties involved need to agree on the "protocol" they are following (the lock file existence, its name, locked region).
rename seems to be sufficient for "cooperative exclusion"
i.e. if there is no lock file at all, what could possibly go wrong?
@findepi The reason is that `Files.move` is not guaranteed to be exclusive; it is only guaranteed to be atomic.
If there is no lock, multiple threads may each think they moved the file successfully, while in fact they overwrite each other - whoever commits last wins.
I believe that Files.move with ATOMIC_MOVE and without REPLACE_EXISTING is exclusive.
At least it's IMO documented as such.
Am I wrong?
I tested with the code below:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.WRITE;

public static void main(String[] args)
        throws Exception
{
    Path tempDirectory = Files.createTempDirectory("trino-test-");
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    CyclicBarrier barrier = new CyclicBarrier(10);
    Path targetFilePath = tempDirectory.resolve("target.test");
    for (int i = 0; i < 10; i++) {
        Path tmpFilePath = tempDirectory.resolve("file-" + i + ".test");
        executorService.execute(() -> {
            try {
                writeFile(tmpFilePath);
                barrier.await();
                Files.move(tmpFilePath, targetFilePath, ATOMIC_MOVE);
                System.out.println("Moved " + tmpFilePath.getFileName() + " to target.test");
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
    Thread.sleep(5000);
    executorService.shutdown();
}

private static void writeFile(Path tmpFilePath)
        throws IOException
{
    try (OutputStream out = Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE)) {
        out.write(("this is " + tmpFilePath.getFileName()).getBytes());
    }
}
```

Multiple tmp files think they were moved successfully. Could you please help to confirm, or am I using the move wrongly?
You're right. I have not read the lengthy `Files.move` javadoc carefully enough. Thanks for setting me straight.
We may get away from this by:
- calling some native rename / renameat / renameat2 function. The Linux kernel awesomely supports this basic operation of renaming a file without overwriting the target, should it exist. It's Java's fault it doesn't work
  - not great, as it's going to be some unpretty JNA / FFM call and may or may not work on macOS & Linux the same way
- working around the limitation with `Files.createLink`. It seems to deliver exclusive target creation (on macOS at least)
  - not great, as this function seems quite platform-dependent. It doesn't seem to guarantee exclusiveness
- reminding ourselves we're doing this for our tests only. While `LocalFileSystem` is a production facility which needs to be implemented well, the only call site we're really concerned about is `LocalTransactionLogSynchronizer` (which was supposed to be moved to the test dir). For internal test purposes we can simply synchronize in memory on a lock. BTW this is how `FileHiveMetastore` performs synchronization around filesystem operations.

Let's do this last option.
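A rough sketch of that last option (hypothetical; the actual `LocalTransactionLogSynchronizer` and `FileHiveMetastore` code differ), assuming all test writers run inside one JVM:

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class InMemoryLockedWriter
{
    // One process-wide lock is enough for tests, where every concurrent
    // writer lives in the same JVM.
    private static final Object LOCK = new Object();

    public static void writeExclusive(Path target, byte[] data)
            throws IOException
    {
        synchronized (LOCK) {
            // Inside the lock, exists() + write() is effectively atomic:
            // only one writer can create the target file.
            if (Files.exists(target)) {
                throw new FileAlreadyExistsException(target.toString());
            }
            Files.write(target, data);
        }
    }

    public static void main(String[] args)
            throws IOException
    {
        Path dir = Files.createTempDirectory("in-memory-lock-");
        Path target = dir.resolve("00000000000000000001.json");
        writeExclusive(target, "first".getBytes());
        try {
            writeExclusive(target, "second".getBytes());
            throw new AssertionError("second writer should have failed");
        }
        catch (FileAlreadyExistsException expected) {
            // only the first writer wins
        }
    }
}
```

This trades cross-process safety for simplicity, which is acceptable only because the call site is test-only.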
Sorry for the late reply.
I did consider an in-memory lock at first. My concern is about how we validate the log synchronizer design. We're currently relying entirely on the semantics of `createExclusive` and assuming the synchronizer logic is correct.
If we can't prove the local concurrency behavior is correct, how can we trust it in the cloud, even if the filesystem provides the same `createExclusive` guarantees? That's why I wanted this to work across multiple clusters. It's still not a strong guarantee, but at least it would give us some confidence that matching `createExclusive` semantics won't lead to corrupted Delta logs or to wrongly thinking a commit succeeded.
But looking at the `FileHiveMetastore` you mentioned, it seems I am overthinking?
Also, I can't find tests for concurrent writes in the cloud log synchronizers. Is that because they would be flaky, or am I missing the tests somewhere? What's the reason behind that?
```java
    throw new FileAlreadyExistsException(location.toString());
}

Closer closer = Closer.create();
Path lockPath = path.resolveSibling(path.getFileName() + ".lock");
FileChannel channel = FileChannel.open(lockPath, CREATE_NEW, WRITE);
closer.register(channel);
closer.register(() -> Files.deleteIfExists(lockPath));
```
The lock handling is quite problematic:
- what if the lock file was left behind by a previous attempt (process killed and didn't remove it)? The CREATE_NEW will trigger lock file creation failure, and the application won't recover
- assuming this is fixed, imagine 3 threads contending for the write. A locks, B fails and removes the lock file during its cleanup, C creates a new lock file and also locks. Now A and C both think they have the lock.

If locks are gone (https://github.com/trinodb/trino/pull/27904/changes#r2730875323), great.
If not, these problems need to be addressed somehow.
For the first question, I agree it is a problem, but note there will always be an issue if we use the `CREATE_NEW` strategy to create a file; i.e. in the previous implementation we used it to create the file and then wrote into it, and if the process is killed during writing we face the same issue.
For the second question, `FileChannel channel = FileChannel.open(lockPath, CREATE_NEW, WRITE);` guarantees that only one thread can reach line 83 and register the cleanup; a thread that fails to create the lock should never have registered the cleanup logic.
> For the first question, I agree it is a problem, but note there will always be an issue if we use the `CREATE_NEW` strategy to create a file; i.e. in the previous implementation we used it to create the file and then wrote into it, and if the process is killed during writing we face the same issue.

Except that we're writing to a temp file with a randomized name, so it's less likely to matter that we left a file behind.

> For the second question, `FileChannel channel = FileChannel.open(lockPath, CREATE_NEW, WRITE);` guarantees that only one thread can reach line 83 and register the cleanup

If line numbers match what GH is showing me, then the deletion in 83 is registered before the lock is taken.
I am sure we can make file locks work reasonably well, but let's first consider this one:

> if locks are gone (https://github.com/trinodb/trino/pull/27904/changes#r2730875323), great

i.e. maybe we don't need any file locks at all
I am not sure I understand your point correctly - if you are saying we could remove the `FileLock lock = channel.lock()` line, then I think it makes sense and I will do it, since only one thread can pass line 81 at a time.
I would like to discuss it further if you think we should remove the FileChannel at all :)
Refer to #27904 (comment) - the move itself doesn't guarantee exclusiveness.
> then, the deletion in 83 is registered before lock is taken.

Oh, I think there is a gap in our understanding of the "lock". Line 81, `FileChannel _ = FileChannel.open(lockPath, CREATE_NEW, WRITE)`, has the "lock" (exclusive) semantics: only one thread can pass here. The lock taken from the file channel that I put there was misleading, so I removed it now.
```java
Files.createDirectories(path.getParent());

// see if we can stop early without acquire locking
if (Files.exists(path)) {
    throw new FileAlreadyExistsException(location.toString());
}

Path lockPath = path.resolveSibling(path.getFileName() + ".lock");
Closer closer = Closer.create();
try {
    try (FileChannel _ = FileChannel.open(lockPath, CREATE_NEW, WRITE)) {
        closer.register(() -> Files.deleteIfExists(lockPath));
        if (Files.exists(path)) {
            throw new FileAlreadyExistsException(location.toString());
        }

        Path tmpFilePath = path.resolveSibling(path.getFileName() + "." + randomUUID() + ".tmp");
        try (OutputStream out = Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE)) {
            closer.register(() -> Files.deleteIfExists(tmpFilePath));
            out.write(data);
        }

        // Ensure that the file is only visible when fully written
        Files.move(tmpFilePath, path, ATOMIC_MOVE);
    }
}
catch (IOException e) {
    throw closer.rethrow(handleException(location, e));
}
finally {
    closer.close();
}
```
Suggested change:

```java
Files.createDirectories(path.getParent());

// Check target path before creating lock file
if (Files.exists(path)) {
    throw new FileAlreadyExistsException(location.toString());
}

Path lockPath = path.resolveSibling(path.getFileName() + ".lock");
Path tmpFilePath = path.resolveSibling(path.getFileName() + "." + randomUUID() + ".tmp");
try (Closer closer = Closer.create()) {
    Files.write(lockPath, new byte[0], CREATE_NEW);
    closer.register(() -> Files.delete(lockPath));
    Files.write(tmpFilePath, data, CREATE_NEW);
    closer.register(() -> Files.deleteIfExists(tmpFilePath));
    // Ensure that the file is only visible when fully written
    // Files.move with ATOMIC_MOVE is not guaranteed to be exclusive, hence the lock file
    Files.move(tmpFilePath, path, ATOMIC_MOVE);
}
```
|
superseded by #28092
Description
`testConcurrentInsertsSelectingFromTheSameVersionedTable` and `testConcurrentInsertsSelectingFromTheSameTemporalVersionedTable` in `TestDeltaLakeLocalConcurrentWritesTest` (#21725): The Delta Lake connector does not guarantee that queries are executed in a strict order. Queries may start concurrently, which makes count-based assertions in tests non-deterministic.
Additional context and related issues
Implementation refers to https://github.com/trinodb/trino/blob/master/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/AzureTransactionLogSynchronizer.java

The current Delta transaction commit flow for `INSERT` operations works as follows:
1. Read the current table version.
2. If the table version is greater than the current log version, this indicates a concurrent transaction is being committed. Before writing a new log entry, we perform conflict detection:
   a) Read all actions newly added to the table since the current transaction started.
   b) If any of those actions conflict (overlap) with the current transaction, the commit is considered unsafe and fails.
3. If no conflicts are detected, write the transaction log entry with `currentVersion + 1`.
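The steps above could be modeled roughly like this (a hypothetical in-memory sketch; the class, method names, and string "actions" are illustrative, not the actual connector code):

```java
import java.util.ArrayList;
import java.util.List;

public class CommitFlowSketch
{
    static class ConflictException extends RuntimeException {}

    // The "transaction log": one list of actions per version, starting at version 0.
    private final List<List<String>> log = new ArrayList<>();

    public synchronized long commit(long readVersion, List<String> actions)
    {
        long currentVersion = log.size() - 1;      // step 1: read the current table version
        if (currentVersion > readVersion) {        // step 2: concurrent commits happened since we read
            for (long v = readVersion + 1; v <= currentVersion; v++) {
                // step 2a/2b: any action added since our read conflicts with a non-blind insert
                if (!log.get((int) v).isEmpty()) {
                    throw new ConflictException();
                }
            }
        }
        log.add(actions);                          // step 3: write the entry as currentVersion + 1
        return currentVersion + 1;
    }

    public static void main(String[] args)
    {
        CommitFlowSketch table = new CommitFlowSketch();
        long v0 = table.commit(-1, List.of("ADD part-0"));
        if (v0 != 0) {
            throw new AssertionError("first commit should produce version 0");
        }
        try {
            // A second writer that read the same (empty) table must now conflict.
            table.commit(-1, List.of("ADD part-1"));
            throw new AssertionError("conflicting commit should have failed");
        }
        catch (ConflictException expected) {
        }
    }
}
```

The race described next arises exactly in step 2a: if the conflict check reads a partially written log entry, it behaves as if that version were empty and the commit wrongly passes.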
There is a subtle race condition in the retry path, since we retry when committing the log fails.
On a retry, it is possible (with very low probability) that when the commit logic reaches the conflict detection step (2.a), another concurrent writer has created the log file but has not finished writing its contents. In this case:
* The retry observes a partially written transaction log.
* The newly added actions appear empty or incomplete when the current commit reads from the log file.
As a result, the conflict detection passes trivially (no overlapping actions are observed). The retry then proceeds to successfully write its own log entry (step 3).
This leads to an incorrect outcome: the commit succeeds even though it should have detected a conflict with the concurrent transaction.
Update

The PR uses the workaround of implementing `createExclusive` for the `LocalOutputFile`.

Release notes
(x) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text: