Closed
@@ -13,19 +13,25 @@
*/
package io.trino.filesystem.local;

import com.google.common.io.Closer;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.memory.context.AggregatedMemoryContext;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

import static io.trino.filesystem.local.LocalUtils.handleException;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardOpenOption.CREATE_NEW;
import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;

public class LocalOutputFile
implements TrinoOutputFile
@@ -50,14 +56,52 @@ public OutputStream create(AggregatedMemoryContext memoryContext)
{
try {
Files.createDirectories(path.getParent());
OutputStream stream = Files.newOutputStream(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
OutputStream stream = Files.newOutputStream(path, CREATE_NEW, WRITE);
return new LocalOutputStream(location, stream);
}
catch (IOException e) {
throw handleException(location, e);
}
}

@Override
public void createExclusive(byte[] data)
throws IOException
{
Files.createDirectories(path.getParent());
Comment thread
findepi marked this conversation as resolved.

// see if we can stop early without acquire locking
if (Files.exists(path)) {
throw new FileAlreadyExistsException(location.toString());
}

Path lockPath = path.resolveSibling(path.getFileName() + ".lock");
Member

It looks like we have two mechanisms (file locks and atomic moves) to achieve the same goal (exclusion).
So, what is the lock needed for?

Contributor Author

@chenjian2664 chenjian2664 Jan 27, 2026

The file lock is for exclusivity: only one thread should be able to create the target file at a time.
The move is for atomic visibility: readers should either not see the target file at all, or see it with its full content.

Contributor

Please document this line of thinking in a code comment.

Member

@chenjian2664 a lock file is "cooperative locking": the parties involved need to agree on the "protocol" they are following (the lock file's existence, its name, the locked region).
A rename seems to be sufficient for "cooperative exclusion".

I.e., if there is no lock file at all, what could possibly go wrong?
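
To make the "protocol" point concrete, here is a minimal sketch of a cooperative lock file, assuming the `<target>.lock` naming convention used in this PR. The class `CooperativeLockFile` is hypothetical and not part of Trino. It only excludes parties that follow the same convention; a writer that ignores the lock file is not blocked.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import static java.nio.file.StandardOpenOption.CREATE_NEW;

// Hypothetical helper illustrating a cooperative lock-file protocol
final class CooperativeLockFile
        implements AutoCloseable
{
    private final Path lockPath;

    private CooperativeLockFile(Path lockPath)
    {
        this.lockPath = lockPath;
    }

    // Agreed convention (an assumption of this sketch): "<target>.lock" next to the target
    static CooperativeLockFile acquire(Path target)
            throws IOException
    {
        Path lockPath = target.resolveSibling(target.getFileName() + ".lock");
        // CREATE_NEW checks for existence and creates the file atomically, so a second
        // caller gets FileAlreadyExistsException while the lock file is present
        Files.write(lockPath, new byte[0], CREATE_NEW);
        return new CooperativeLockFile(lockPath);
    }

    // Releases the lock by deleting the lock file
    @Override
    public void close()
            throws IOException
    {
        Files.delete(lockPath);
    }
}
```

Used with try-with-resources, the lock file is removed when the block exits. Note that a crashed holder would leave the lock file behind, which this sketch does not handle.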

Contributor Author

@findepi The reason is that Files.move is not guaranteed to be exclusive; it is only guaranteed to be atomic.
Without the lock, multiple threads may each believe their move succeeded, while in fact each one overwrites the previous result, so whoever commits last wins.

Member

I believe that Files.move with ATOMIC_MOVE and without REPLACE_EXISTING is exclusive.
At least, IMO, it's documented as such.

Am I wrong?

Contributor Author

@chenjian2664 chenjian2664 Jan 29, 2026

I tested with the code below:


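    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
    import static java.nio.file.StandardOpenOption.CREATE_NEW;
    import static java.nio.file.StandardOpenOption.WRITE;

    static void main()
            throws Exception
    {
        Path tempDirectory = Files.createTempDirectory("trino-test-");
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        CyclicBarrier barrier = new CyclicBarrier(10);
        Path targetFilePath = tempDirectory.resolve("target.test");
        for (int i = 0; i < 10; i++) {
            Path tmpFilePath = tempDirectory.resolve("file-" + i + ".test");
            executorService.execute(() -> {
                try {
                    writeFile(tmpFilePath);
                    // Release all threads at once so the moves race on the same target
                    barrier.await();
                    // With ATOMIC_MOVE, replacing an existing target is implementation specific
                    Files.move(tmpFilePath, targetFilePath, ATOMIC_MOVE);
                    System.out.println("Moved " + tmpFilePath.getFileName() + " to target.test");
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }

        // Wait for the submitted tasks to finish instead of sleeping for a fixed time
        executorService.shutdown();
        executorService.awaitTermination(30, TimeUnit.SECONDS);
    }

    private static void writeFile(Path tmpFilePath)
            throws IOException
    {
        try (OutputStream out = Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE)) {
            out.write(("this is " + tmpFilePath.getFileName()).getBytes());
        }
    }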

Multiple threads report that their temporary file was moved successfully. Could you please confirm this, or am I using Files.move incorrectly?

Member

You're right. I had not read the lengthy Files.move javadoc carefully enough. Thanks for correcting me.

We could work around this in one of the following ways:

  • Call a native rename / renameat / renameat2 function. The Linux kernel awesomely supports this basic operation of renaming a file without overwriting the target, should it exist. It's Java's fault that it doesn't work.
    • Not great, as it would be an unpretty JNA / FFM call and may or may not work the same way on macOS and Linux.
  • Work around the limitation with Files.createLink. It seems to deliver exclusive target creation (on macOS at least).
    • Not great, as this function seems quite platform-dependent and doesn't seem to guarantee exclusiveness.
  • Remind ourselves that we're doing this for our tests only. While LocalFileSystem is a production facility which needs to be implemented well, the only call site we're really concerned about is LocalTransactionLogSynchronizer (which was supposed to be moved to the test dir). For internal test purposes we can simply synchronize in memory on a lock. BTW, this is how FileHiveMetastore performs synchronization around filesystem operations.

Let's go with the last option.
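
A minimal sketch of what the in-memory option could look like, assuming all writers live in the same JVM and go through the same method. The class name `InMemoryLockedExclusiveCreate` and the single static lock are illustrative assumptions, not the code that was merged. Exclusion comes from the monitor; the temporary file plus atomic move is kept only so that readers never observe a partially written file.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardOpenOption.CREATE_NEW;

// Hypothetical helper: exclusive only among callers in this JVM that use this class
final class InMemoryLockedExclusiveCreate
{
    // One process-wide lock; coarse, but adequate for test-only usage
    private static final Object LOCK = new Object();

    private InMemoryLockedExclusiveCreate() {}

    static void createExclusive(Path path, byte[] data)
            throws IOException
    {
        Files.createDirectories(path.getParent());
        synchronized (LOCK) {
            // Check-then-act is safe here because no other caller can run in between
            if (Files.exists(path)) {
                throw new FileAlreadyExistsException(path.toString());
            }
            Path tmp = path.resolveSibling(path.getFileName() + "." + UUID.randomUUID() + ".tmp");
            try {
                Files.write(tmp, data, CREATE_NEW);
                // Readers see either no target file or the complete target file
                Files.move(tmp, path, ATOMIC_MOVE);
            }
            finally {
                // No-op after a successful move; cleans up after a failure
                Files.deleteIfExists(tmp);
            }
        }
    }
}
```

The trade-off is the one raised in this thread: the guarantee does not extend to other processes or clusters writing to the same directory.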

Contributor Author

Sorry for the late reply.
I did consider an in-memory lock at first. My concern is about how we validate the log synchronizer design. We're currently relying entirely on the semantics of createExclusive and assuming the synchronizer logic is correct.

If we can't prove the local concurrency behavior is correct, how can we trust it in the cloud, even if the filesystem provides the same createExclusive guarantees? That's why I wanted this to work across multiple clusters. It's still not a strong guarantee, but at least it would give us some confidence that matching createExclusive semantics won't lead to corrupted Delta logs or to wrongly believing that a commit succeeded.

But having looked at the FileHiveMetastore you mentioned, it seems I am overthinking this?

Also, I couldn't find tests for concurrent writes through the cloud log synchronizers. Is that because they would be flaky, or am I missing tests somewhere? What's the reason behind that?
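
For reference, a concurrency check of the kind asked about here could be sketched as below. `ExclusiveCreateRace` and its `ExclusiveCreate` interface are hypothetical names for illustration, not existing Trino test utilities. The harness races several writers on one target and counts how many report success; an exclusive create should yield exactly one winner.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ExclusiveCreateRace
{
    // Hypothetical stand-in for the operation under test
    interface ExclusiveCreate
    {
        void create(Path path, byte[] data)
                throws IOException;
    }

    private ExclusiveCreateRace() {}

    // Races `threads` writers on one target and returns how many reported success
    static int countWinners(ExclusiveCreate operation, Path target, int threads)
            throws Exception
    {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        CyclicBarrier barrier = new CyclicBarrier(threads);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                byte[] data = ("writer-" + i).getBytes(StandardCharsets.UTF_8);
                results.add(executor.submit(() -> {
                    // Release all writers at once to maximize contention
                    barrier.await();
                    try {
                        operation.create(target, data);
                        return true;
                    }
                    catch (FileAlreadyExistsException e) {
                        // Lost the race, which is the expected outcome for all but one writer
                        return false;
                    }
                }));
            }
            int winners = 0;
            for (Future<Boolean> result : results) {
                if (result.get()) {
                    winners++;
                }
            }
            return winners;
        }
        finally {
            executor.shutdownNow();
        }
    }
}
```

As a baseline, passing `(path, data) -> Files.write(path, data, CREATE_NEW)` should give one winner, since CREATE_NEW is documented to check for existence and create the file atomically. A passing run of such a test only adds confidence; it does not prove the absence of races.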

Closer closer = Closer.create();
try {
try (FileChannel _ = FileChannel.open(lockPath, CREATE_NEW, WRITE)) {
closer.register(() -> Files.deleteIfExists(lockPath));
if (Files.exists(path)) {
throw new FileAlreadyExistsException(location.toString());
}

Path tmpFilePath = path.resolveSibling(path.getFileName() + "." + randomUUID() + ".tmp");
try (OutputStream out = Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE)) {
closer.register(() -> Files.deleteIfExists(tmpFilePath));
out.write(data);
}

// Ensure that the file is only visible when fully written
Files.move(tmpFilePath, path, ATOMIC_MOVE);
}
}
catch (IOException e) {
throw closer.rethrow(handleException(location, e));
}
finally {
closer.close();
}
Comment on lines +71 to +102
Member

Suggested change
Files.createDirectories(path.getParent());
// see if we can stop early without acquire locking
if (Files.exists(path)) {
throw new FileAlreadyExistsException(location.toString());
}
Path lockPath = path.resolveSibling(path.getFileName() + ".lock");
Closer closer = Closer.create();
try {
try (FileChannel _ = FileChannel.open(lockPath, CREATE_NEW, WRITE)) {
closer.register(() -> Files.deleteIfExists(lockPath));
if (Files.exists(path)) {
throw new FileAlreadyExistsException(location.toString());
}
Path tmpFilePath = path.resolveSibling(path.getFileName() + "." + randomUUID() + ".tmp");
try (OutputStream out = Files.newOutputStream(tmpFilePath, CREATE_NEW, WRITE)) {
closer.register(() -> Files.deleteIfExists(tmpFilePath));
out.write(data);
}
// Ensure that the file is only visible when fully written
Files.move(tmpFilePath, path, ATOMIC_MOVE);
}
}
catch (IOException e) {
throw closer.rethrow(handleException(location, e));
}
finally {
closer.close();
}
Files.createDirectories(path.getParent());
// Check target path before creating lock file
if (Files.exists(path)) {
throw new FileAlreadyExistsException(location.toString());
}
Path lockPath = path.resolveSibling(path.getFileName() + ".lock");
Path tmpFilePath = path.resolveSibling(path.getFileName() + "." + randomUUID() + ".tmp");
try (Closer closer = Closer.create()) {
Files.write(lockPath, new byte[0], CREATE_NEW);
closer.register(() -> Files.delete(lockPath));
Files.write(tmpFilePath, data, CREATE_NEW);
closer.register(() -> Files.deleteIfExists(tmpFilePath));
// Ensure that the file is only visible when fully written
// Files.move with ATOMIC_MOVE is not guaranteed to be exclusive, hence the lock file
Files.move(tmpFilePath, path, ATOMIC_MOVE);
}

}

@Override
public void createOrOverwrite(byte[] data)
throws IOException
@@ -47,12 +47,6 @@ void beforeAll()
fileSystem = new LocalFileSystem(tempDirectory);
}

@Override
protected boolean supportsCreateExclusive()
{
return false;
}

@AfterEach
void afterEach()
throws IOException
@@ -21,7 +21,6 @@
import io.trino.spi.connector.ConnectorSession;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;

@@ -42,8 +41,8 @@ public LocalTransactionLogSynchronizer(DeltaLakeFileSystemFactory fileSystemFact
public void write(ConnectorSession session, VendedCredentialsHandle credentialsHandle, String clusterId, Location newLogEntryPath, byte[] entryContents)
Comment thread
chenjian2664 marked this conversation as resolved.
{
TrinoFileSystem fileSystem = fileSystemFactory.create(session, credentialsHandle);
try (OutputStream outputStream = fileSystem.newOutputFile(newLogEntryPath).create()) {
outputStream.write(entryContents);
try {
fileSystem.newOutputFile(newLogEntryPath).createExclusive(entryContents);
}
catch (FileAlreadyExistsException e) {
throw new TransactionConflictException("Conflict detected while writing Transaction Log entry " + newLogEntryPath, e);