Delta lake S3 exclusive write reconciliation #23145

Merged: 5 commits, Sep 4, 2024
Changes from 4 commits
5 changes: 5 additions & 0 deletions plugin/trino-delta-lake/pom.xml
@@ -111,6 +111,11 @@
<artifactId>trino-filesystem-manager</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem-s3</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hive</artifactId>
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSynchronizerModule.java
@@ -13,22 +13,27 @@
*/
package io.trino.plugin.deltalake;

import com.google.inject.AbstractModule;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.filesystem.s3.S3FileSystemConfig;
import io.trino.plugin.deltalake.transactionlog.writer.AzureTransactionLogSynchronizer;
import io.trino.plugin.deltalake.transactionlog.writer.GcsTransactionLogSynchronizer;
import io.trino.plugin.deltalake.transactionlog.writer.S3NativeTransactionLogSynchronizer;
import io.trino.plugin.deltalake.transactionlog.writer.S3ConditionalWriteLogSynchronizer;
import io.trino.plugin.deltalake.transactionlog.writer.S3LockBasedTransactionLogSynchronizer;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogSynchronizer;

import static com.google.inject.multibindings.MapBinder.newMapBinder;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;

public class DeltaLakeSynchronizerModule
implements Module
extends AbstractConfigurationAwareModule
{
@Override
public void configure(Binder binder)
protected void setup(Binder binder)
{
var synchronizerBinder = newMapBinder(binder, String.class, TransactionLogSynchronizer.class);

@@ -40,9 +45,27 @@ public void configure(Binder binder)
synchronizerBinder.addBinding("gs").to(GcsTransactionLogSynchronizer.class).in(Scopes.SINGLETON);

// S3
jsonCodecBinder(binder).bindJsonCodec(S3NativeTransactionLogSynchronizer.LockFileContents.class);
synchronizerBinder.addBinding("s3").to(S3NativeTransactionLogSynchronizer.class).in(Scopes.SINGLETON);
synchronizerBinder.addBinding("s3a").to(S3NativeTransactionLogSynchronizer.class).in(Scopes.SINGLETON);
synchronizerBinder.addBinding("s3n").to(S3NativeTransactionLogSynchronizer.class).in(Scopes.SINGLETON);
jsonCodecBinder(binder).bindJsonCodec(S3LockBasedTransactionLogSynchronizer.LockFileContents.class);
binder.bind(S3LockBasedTransactionLogSynchronizer.class).in(Scopes.SINGLETON);
binder.bind(S3ConditionalWriteLogSynchronizer.class).in(Scopes.SINGLETON);

install(conditionalModule(S3FileSystemConfig.class, S3FileSystemConfig::isSupportsExclusiveCreate,
s3SynchronizerModule(S3ConditionalWriteLogSynchronizer.class),
s3SynchronizerModule(S3LockBasedTransactionLogSynchronizer.class)));
}

private static Module s3SynchronizerModule(Class<? extends TransactionLogSynchronizer> synchronizerClass)
{
return new AbstractModule()
{
@Override
protected void configure()
{
var synchronizerBinder = newMapBinder(binder(), String.class, TransactionLogSynchronizer.class);
synchronizerBinder.addBinding("s3").to(synchronizerClass).in(Scopes.SINGLETON);
synchronizerBinder.addBinding("s3a").to(synchronizerClass).in(Scopes.SINGLETON);
synchronizerBinder.addBinding("s3n").to(synchronizerClass).in(Scopes.SINGLETON);
}
};
}
}
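
The net effect of the MapBinder contributions above is a Map<String, TransactionLogSynchronizer> keyed by URI scheme, with the "s3", "s3a" and "s3n" entries pointing at either the conditional-write or the lock-based implementation, depending on S3FileSystemConfig::isSupportsExclusiveCreate. A minimal consumer sketch follows; class and method names here are illustrative only and are not taken from the PR.

import io.trino.filesystem.Location;
import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogSynchronizer;

import java.util.Map;

// Illustrative only: shows how a component injected with the MapBinder-produced map
// could resolve the synchronizer for a table location by its URI scheme.
class SynchronizerLookupSketch
{
    private final Map<String, TransactionLogSynchronizer> synchronizersByScheme;

    SynchronizerLookupSketch(Map<String, TransactionLogSynchronizer> synchronizersByScheme)
    {
        this.synchronizersByScheme = synchronizersByScheme;
    }

    TransactionLogSynchronizer forLocation(Location location)
    {
        String scheme = location.scheme()
                .orElseThrow(() -> new IllegalArgumentException("Location has no scheme: " + location));
        TransactionLogSynchronizer synchronizer = synchronizersByScheme.get(scheme);
        if (synchronizer == null) {
            throw new IllegalArgumentException("No transaction log synchronizer registered for scheme: " + scheme);
        }
        return synchronizer;
    }
}
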
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/S3ConditionalWriteLogSynchronizer.java
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.transactionlog.writer;

import com.google.inject.Inject;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.spi.connector.ConnectorSession;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;

import static java.util.Objects.requireNonNull;

public class S3ConditionalWriteLogSynchronizer
implements TransactionLogSynchronizer
{
private final TrinoFileSystemFactory fileSystemFactory;

@Inject
S3ConditionalWriteLogSynchronizer(TrinoFileSystemFactory fileSystemFactory)
{
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
}

@Override
public void write(ConnectorSession session, String clusterId, Location newLogEntryPath, byte[] entryContents)
{
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
try {
fileSystem.newOutputFile(newLogEntryPath).createExclusive(entryContents);
}
catch (FileAlreadyExistsException e) {
throw new TransactionConflictException("Conflict detected while writing Transaction Log entry " + newLogEntryPath + " to S3", e);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public boolean isUnsafe()
{
return false;
}
}
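
For context on what createExclusive relies on: Amazon S3 conditional writes let a PUT succeed only when no object already exists at the key, using the If-None-Match: * precondition, and fail with HTTP 412 otherwise. A rough sketch of that mechanism with the AWS SDK for Java v2 follows; it assumes an SDK version that exposes PutObjectRequest#ifNoneMatch, and bucket and key names are placeholders, not values from the PR.

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

// Sketch of the underlying S3 mechanism the file system's createExclusive is expected to use:
// a conditional PUT with "If-None-Match: *" that only succeeds if the key does not exist yet.
class ConditionalPutSketch
{
    static void putIfAbsent(S3Client s3, String bucket, String key, byte[] contents)
    {
        PutObjectRequest request = PutObjectRequest.builder()
                .bucket(bucket)
                .key(key)
                .ifNoneMatch("*") // only succeed if no object exists at this key
                .build();
        try {
            s3.putObject(request, RequestBody.fromBytes(contents));
        }
        catch (S3Exception e) {
            if (e.statusCode() == 412) {
                // precondition failed: another writer created the log entry first
                throw new IllegalStateException("Transaction log entry already exists: " + key, e);
            }
            throw e;
        }
    }
}
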
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/writer/S3LockBasedTransactionLogSynchronizer.java (renamed from S3NativeTransactionLogSynchronizer.java)
@@ -13,8 +13,6 @@
*/
package io.trino.plugin.deltalake.transactionlog.writer;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.json.JsonCodec;
@@ -46,12 +44,12 @@
import static java.util.Objects.requireNonNull;

/**
* The S3 Native synhcornizer is a {@link TransactionLogSynchronizer} for S3 that requires no other dependencies.
* The S3 lock-based synchronizer is a {@link TransactionLogSynchronizer} for S3-compatible storage that doesn't support conditional writes
*/
public class S3NativeTransactionLogSynchronizer
public class S3LockBasedTransactionLogSynchronizer
implements TransactionLogSynchronizer
{
public static final Logger LOG = Logger.get(S3NativeTransactionLogSynchronizer.class);
public static final Logger LOG = Logger.get(S3LockBasedTransactionLogSynchronizer.class);

// TODO: add refreshing of log expiration time (https://github.com/trinodb/trino/issues/12008)
private static final Duration EXPIRATION_DURATION = Duration.of(5, MINUTES);
@@ -63,10 +61,10 @@ public class S3NativeTransactionLogSynchronizer
private final JsonCodec<LockFileContents> lockFileContentsJsonCodec;

@Inject
public S3NativeTransactionLogSynchronizer(TrinoFileSystemFactory fileSystemFactory, JsonCodec<LockFileContents> lockFileContentesCodec)
S3LockBasedTransactionLogSynchronizer(TrinoFileSystemFactory fileSystemFactory, JsonCodec<LockFileContents> lockFileContentsJsonCodec)
{
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.lockFileContentsJsonCodec = requireNonNull(lockFileContentesCodec, "lockFileContentesCodec is null");
this.lockFileContentsJsonCodec = requireNonNull(lockFileContentsJsonCodec, "lockFileContentsJsonCodec is null");
}

@Override
@@ -254,53 +252,26 @@ public String getEntryFilename()

public String getClusterId()
{
return contents.getClusterId();
return contents.clusterId();
}

public String getOwningQuery()
{
return contents.getOwningQuery();
return contents.owningQuery();
}

public Instant getExpirationTime()
{
return Instant.ofEpochMilli(contents.getExpirationEpochMillis());
return Instant.ofEpochMilli(contents.expirationEpochMillis());
}
}

public static class LockFileContents
public record LockFileContents(String clusterId, String owningQuery, long expirationEpochMillis)
{
private final String clusterId;
private final String owningQuery;
private final long expirationEpochMillis;

@JsonCreator
public LockFileContents(
@JsonProperty("clusterId") String clusterId,
@JsonProperty("owningQuery") String owningQuery,
@JsonProperty("expirationEpochMillis") long expirationEpochMillis)
public LockFileContents
{
this.clusterId = requireNonNull(clusterId, "clusterId is null");
this.owningQuery = requireNonNull(owningQuery, "owningQuery is null");
this.expirationEpochMillis = expirationEpochMillis;
}

@JsonProperty
public String getClusterId()
{
return clusterId;
}

@JsonProperty
public String getOwningQuery()
{
return owningQuery;
}

@JsonProperty
public long getExpirationEpochMillis()
{
return expirationEpochMillis;
requireNonNull(clusterId, "clusterId is null");
requireNonNull(owningQuery, "owningQuery is null");
}
}
}
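
Converting LockFileContents to a record lets the explicit @JsonCreator/@JsonProperty boilerplate be dropped, because Jackson maps record components to JSON fields by name. A small round-trip sketch with airlift's JsonCodec follows; it is standalone and not taken from the PR, and the sample values are made up.

import io.airlift.json.JsonCodec;

// Illustrative only: a record with the same components as LockFileContents
// round-trips through JsonCodec without any Jackson annotations.
class LockFileContentsCodecSketch
{
    record LockFileContents(String clusterId, String owningQuery, long expirationEpochMillis) {}

    public static void main(String[] args)
    {
        JsonCodec<LockFileContents> codec = JsonCodec.jsonCodec(LockFileContents.class);
        LockFileContents original = new LockFileContents("cluster-1", "20240904_000000_00000_abcde", 1725408000000L);
        String json = codec.toJson(original);
        LockFileContents roundTripped = codec.fromJson(json);
        System.out.println(json);
        System.out.println(original.equals(roundTripped)); // true
    }
}
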
@@ -223,7 +223,8 @@ private static String transactionConflictErrors()
"|Target file already exists: .*/_delta_log/\\d+.json" +
"|Conflicting concurrent writes found\\..*" +
"|Multiple live locks found for:.*" +
"|Target file was created during locking: .*";
"|Target file was created during locking: .*" +
"|Conflict detected while writing Transaction Log .* to S3";
}

@Override
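
A quick illustrative check, not part of the PR, that the new regex alternative matches the message thrown by S3ConditionalWriteLogSynchronizer when a conditional write loses the race; the log entry path is a placeholder.

import java.util.regex.Pattern;

// Illustrative only: the added alternative fully matches the synchronizer's conflict message.
class ConflictPatternSketch
{
    public static void main(String[] args)
    {
        String pattern = "Conflict detected while writing Transaction Log .* to S3";
        String message = "Conflict detected while writing Transaction Log entry "
                + "s3://bucket/table/_delta_log/00000000000000000007.json to S3";
        System.out.println(Pattern.matches(pattern, message)); // true
    }
}
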