DeltaLakeMetadata.java

@@ -2224,7 +2224,10 @@ private long commitInsertOperation(
long currentVersion = getMandatoryCurrentVersion(fileSystem, handle.location(), readVersion.get());

Reviewer (Member): Does this commit change logic in any way, or is it just a refactoring?

Author: Just refactoring.
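The shape of that refactoring: checkForConcurrentTransactionConflicts previously received the source table handles and derived the enforced partition constraints itself inside the WRITESERIALIZABLE branch; now callers derive the constraints up front and pass only those, which lets call sites that have a constraint but no source table handles (the OPTIMIZE and DELETE paths further down) reuse the same check. A minimal sketch of the before/after shape, using simplified stand-in types rather than the connector's actual classes:

```java
import java.util.List;

// Stand-ins for DeltaLakeTableHandle and TupleDomain<DeltaLakeColumnHandle>,
// only to show the shape of the change.
record Handle(String enforcedPartitionConstraint) {}

class ConflictCheckShape
{
    // Before: the check received handles and extracted the constraints internally.
    static void checkBefore(List<Handle> sourceHandles)
    {
        if (!sourceHandles.isEmpty()) {
            List<String> constraints = sourceHandles.stream()
                    .map(Handle::enforcedPartitionConstraint)
                    .toList();
            detectConflicts(constraints);
        }
    }

    // After: callers pass constraints directly, so the same method also serves
    // operations that have a constraint but no source table handles.
    static void checkAfter(List<String> enforcedSourcePartitionConstraints)
    {
        if (!enforcedSourcePartitionConstraints.isEmpty()) {
            detectConflicts(enforcedSourcePartitionConstraints);
        }
    }

    private static void detectConflicts(List<String> constraints)
    {
        // conflict detection itself is unchanged between the two variants
    }
}
```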

List<DeltaLakeTableHandle> sameAsTargetSourceTableHandles = getSameAsTargetSourceTableHandles(sourceTableHandles, handle.tableName());
- checkForConcurrentTransactionConflicts(session, fileSystem, sameAsTargetSourceTableHandles, isolationLevel, currentVersion, readVersion, handle.location(), attemptCount);
+ List<TupleDomain<DeltaLakeColumnHandle>> enforcedSourcePartitionConstraints = sameAsTargetSourceTableHandles.stream()
+         .map(DeltaLakeTableHandle::getEnforcedPartitionConstraint)
+         .collect(toImmutableList());
+ checkForConcurrentTransactionConflicts(session, fileSystem, enforcedSourcePartitionConstraints, isolationLevel, currentVersion, readVersion, handle.location(), attemptCount);
long commitVersion = currentVersion + 1;
writeTransactionLogForInsertOperation(session, handle, sameAsTargetSourceTableHandles.isEmpty(), isolationLevel, dataFileInfos, commitVersion, currentVersion);
return commitVersion;
@@ -2246,7 +2249,7 @@ private List<DeltaLakeTableHandle> getSameAsTargetSourceTableHandles(
private void checkForConcurrentTransactionConflicts(
ConnectorSession session,
TrinoFileSystem fileSystem,
- List<DeltaLakeTableHandle> sameAsTargetSourceTableHandles,
+ List<TupleDomain<DeltaLakeColumnHandle>> enforcedSourcePartitionConstraints,
IsolationLevel isolationLevel,
long currentVersion,
AtomicReference<Long> readVersion,
@@ -2272,10 +2275,7 @@ private void checkForConcurrentTransactionConflicts(

switch (isolationLevel) {
case WRITESERIALIZABLE -> {
- if (!sameAsTargetSourceTableHandles.isEmpty()) {
-     List<TupleDomain<DeltaLakeColumnHandle>> enforcedSourcePartitionConstraints = sameAsTargetSourceTableHandles.stream()
-             .map(DeltaLakeTableHandle::getEnforcedPartitionConstraint)
-             .collect(toImmutableList());
+ if (!enforcedSourcePartitionConstraints.isEmpty()) {
TupleDomain<DeltaLakeColumnHandle> enforcedSourcePartitionConstraintsUnion = TupleDomain.columnWiseUnion(enforcedSourcePartitionConstraints);

checkIfCommittedAddedFilesConflictWithCurrentOperation(enforcedSourcePartitionConstraintsUnion, commitSummary);
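Under WRITE SERIALIZABLE isolation the check unions the per-source enforced partition constraints column-wise and tests files added by concurrently committed transactions against that union. TupleDomain.columnWiseUnion yields a superset of the exact union, so the test can over-report conflicts but never miss one. A small self-contained illustration against the Trino SPI; the String column keys here stand in for DeltaLakeColumnHandle:

```java
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;

import java.util.List;
import java.util.Map;

import static io.trino.spi.type.BigintType.BIGINT;

public class ColumnWiseUnionExample
{
    public static void main(String[] args)
    {
        // Two sources, each enforced to a single partition: part = 1 and part = 2.
        TupleDomain<String> sourceA = TupleDomain.withColumnDomains(Map.of("part", Domain.singleValue(BIGINT, 1L)));
        TupleDomain<String> sourceB = TupleDomain.withColumnDomains(Map.of("part", Domain.singleValue(BIGINT, 2L)));

        // Column-wise union: part IN (1, 2), an over-approximation of the rows read.
        TupleDomain<String> union = TupleDomain.columnWiseUnion(List.of(sourceA, sourceB));

        // A file committed concurrently into part = 3 does not overlap the union,
        // so it cannot conflict with this transaction's reads.
        TupleDomain<String> addedFile = TupleDomain.withColumnDomains(Map.of("part", Domain.singleValue(BIGINT, 3L)));
        System.out.println(union.overlaps(addedFile)); // false
    }
}
```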
@@ -2554,9 +2554,12 @@ private long commitMergeOperation(
long createdTime = Instant.now().toEpochMilli();

List<DeltaLakeTableHandle> sameAsTargetSourceTableHandles = getSameAsTargetSourceTableHandles(sourceTableHandles, handle.getSchemaTableName());
+ List<TupleDomain<DeltaLakeColumnHandle>> enforcedSourcePartitionConstraints = sameAsTargetSourceTableHandles.stream()
+         .map(DeltaLakeTableHandle::getEnforcedPartitionConstraint)
+         .collect(toImmutableList());
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
long currentVersion = getMandatoryCurrentVersion(fileSystem, tableLocation, readVersion.get());
- checkForConcurrentTransactionConflicts(session, fileSystem, sameAsTargetSourceTableHandles, isolationLevel, currentVersion, readVersion, handle.getLocation(), attemptCount);
+ checkForConcurrentTransactionConflicts(session, fileSystem, enforcedSourcePartitionConstraints, isolationLevel, currentVersion, readVersion, handle.getLocation(), attemptCount);
long commitVersion = currentVersion + 1;

transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, isolationLevel, commitVersion, createdTime, MERGE_OPERATION, handle.getReadVersion(), sameAsTargetSourceTableHandles.isEmpty()));
@@ -2674,7 +2677,8 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(DeltaLakeTableHandle
tableHandle.getMetadataEntry().getOriginalPartitionColumns(),
maxScannedFileSize,
Optional.empty(),
- retryMode != NO_RETRIES),
+ retryMode != NO_RETRIES,
+ tableHandle.getEnforcedPartitionConstraint()),
tableHandle.getLocation()));
}

@@ -2728,7 +2732,10 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandle>
checkSupportedWriterVersion(table);

return new BeginTableExecuteResult<>(
- executeHandle.withProcedureHandle(optimizeHandle.withCurrentVersion(table.getReadVersion())),
+ executeHandle.withProcedureHandle(
+         optimizeHandle
+                 .withCurrentVersion(table.getReadVersion())
+                 .withEnforcedPartitionConstraint(table.getEnforcedPartitionConstraint())),
table.forOptimize(true, optimizeHandle.getMaxScannedFileSize()));
}

@@ -2747,11 +2754,10 @@ public void finishTableExecute(ConnectorSession session, ConnectorTableExecuteHandle
private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandle executeHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)
{
DeltaTableOptimizeHandle optimizeHandle = (DeltaTableOptimizeHandle) executeHandle.procedureHandle();
- long readVersion = optimizeHandle.getCurrentVersion().orElseThrow(() -> new IllegalArgumentException("currentVersion not set"));
String tableLocation = executeHandle.tableLocation();

// paths to be deleted
- Set<DeltaLakeScannedDataFile> scannnedDataFiles = splitSourceInfo.stream()
+ Set<DeltaLakeScannedDataFile> scannedDataFiles = splitSourceInfo.stream()
.map(DeltaLakeScannedDataFile.class::cast)
.collect(toImmutableSet());

@@ -2767,35 +2773,10 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandle executeHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)

boolean writeCommitted = false;
try {
- TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, tableLocation);
-
- long createdTime = Instant.now().toEpochMilli();
- long commitVersion = readVersion + 1;
- transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, IsolationLevel.WRITESERIALIZABLE, commitVersion, createdTime, OPTIMIZE_OPERATION, readVersion, false));
- // TODO: Delta writes another field "operationMetrics" that I haven't
- // seen before. It contains delete/update metrics. Investigate/include it.
-
- long writeTimestamp = Instant.now().toEpochMilli();
-
- for (DeltaLakeScannedDataFile scannedFile : scannnedDataFiles) {
-     String relativePath = relativePath(tableLocation, scannedFile.path());
-     Map<String, Optional<String>> canonicalPartitionValues = scannedFile.partitionKeys();
-     transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(
-             toUriFormat(relativePath),
-             createPartitionValuesMap(canonicalPartitionValues),
-             writeTimestamp,
-             false,
-             Optional.empty()));
- }
-
- // Note: during writes we want to preserve original case of partition columns
- List<String> partitionColumns = getPartitionColumns(
-         optimizeHandle.getMetadataEntry().getOriginalPartitionColumns(),
-         optimizeHandle.getTableColumns(),
-         getColumnMappingMode(optimizeHandle.getMetadataEntry(), optimizeHandle.getProtocolEntry()));
- appendAddFileEntries(transactionLogWriter, dataFileInfos, partitionColumns, getExactColumnNames(optimizeHandle.getMetadataEntry()), false);
-
- transactionLogWriter.flush();
+ IsolationLevel isolationLevel = getIsolationLevel(optimizeHandle.getMetadataEntry());
+ AtomicReference<Long> readVersion = new AtomicReference<>(optimizeHandle.getCurrentVersion().orElseThrow(() -> new IllegalArgumentException("currentVersion not set")));
+ long commitVersion = Failsafe.with(TRANSACTION_CONFLICT_RETRY_POLICY)
+         .get(context -> commitOptimizeOperation(session, optimizeHandle, isolationLevel, tableLocation, scannedDataFiles, dataFileInfos, readVersion, context.getAttemptCount()));
writeCommitted = true;
enqueueUpdateInfo(
session,
@@ -2822,6 +2803,59 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandle executeHandle, Collection<Slice> fragments, List<Object> splitSourceInfo)
}
}

+ private long commitOptimizeOperation(
+         ConnectorSession session,
+         DeltaTableOptimizeHandle optimizeHandle,
+         IsolationLevel isolationLevel,
+         String tableLocation,
+         Set<DeltaLakeScannedDataFile> scannedDataFiles,
+         List<DataFileInfo> dataFileInfos,
+         AtomicReference<Long> readVersion,
+         int attemptCount)
+         throws IOException
+ {
+     TransactionLogWriter transactionLogWriter = transactionLogWriterFactory.newWriter(session, tableLocation);
+
+     long createdTime = Instant.now().toEpochMilli();
+     TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+     long currentVersion = getMandatoryCurrentVersion(fileSystem, tableLocation, readVersion.get());
+     checkForConcurrentTransactionConflicts(session, fileSystem, ImmutableList.of(optimizeHandle.getEnforcedPartitionConstraint()), isolationLevel, currentVersion, readVersion, tableLocation, attemptCount);
+     long commitVersion = currentVersion + 1;
+     transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(
+             session,
+             isolationLevel,
+             commitVersion,
+             createdTime,
+             OPTIMIZE_OPERATION,
+             optimizeHandle.getCurrentVersion().orElseThrow(() -> new IllegalArgumentException("currentVersion not set")),
+             false));
+     // TODO: Delta writes another field "operationMetrics" that I haven't
+     // seen before. It contains delete/update metrics. Investigate/include it.
+
+     long writeTimestamp = Instant.now().toEpochMilli();
+
+     for (DeltaLakeScannedDataFile scannedFile : scannedDataFiles) {
+         String relativePath = relativePath(tableLocation, scannedFile.path());
+         Map<String, Optional<String>> canonicalPartitionValues = scannedFile.partitionKeys();
+         transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(
+                 toUriFormat(relativePath),
+                 createPartitionValuesMap(canonicalPartitionValues),
+                 writeTimestamp,
+                 false,
+                 Optional.empty()));
+     }
+
+     // Note: during writes we want to preserve original case of partition columns
+     List<String> partitionColumns = getPartitionColumns(
+             optimizeHandle.getMetadataEntry().getOriginalPartitionColumns(),
+             optimizeHandle.getTableColumns(),
+             getColumnMappingMode(optimizeHandle.getMetadataEntry(), optimizeHandle.getProtocolEntry()));
+     appendAddFileEntries(transactionLogWriter, dataFileInfos, partitionColumns, getExactColumnNames(optimizeHandle.getMetadataEntry()), false);
+
+     transactionLogWriter.flush();
+     return commitVersion;
+ }
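With this change, OPTIMIZE commits through the same pattern as INSERT, MERGE, and DELETE: Failsafe retries commitOptimizeOperation on transaction conflicts, and the AtomicReference<Long> readVersion carries the latest observed table version across attempts so each retry re-validates only the newly committed versions. A standalone sketch of that retry loop, assuming a Failsafe 3.x policy; the connector's actual TRANSACTION_CONFLICT_RETRY_POLICY configuration and exception type are not shown in this diff, so both are stand-ins:

```java
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;

import java.util.concurrent.atomic.AtomicReference;

public class CommitRetryExample
{
    // Hypothetical stand-in for the exception the conflict check throws.
    static class TransactionConflictException extends RuntimeException {}

    // Assumed policy shape; the real TRANSACTION_CONFLICT_RETRY_POLICY may differ.
    private static final RetryPolicy<Long> RETRY_POLICY = RetryPolicy.<Long>builder()
            .handle(TransactionConflictException.class)
            .withMaxRetries(5)
            .build();

    public static void main(String[] args)
    {
        AtomicReference<Long> readVersion = new AtomicReference<>(10L);
        long committed = Failsafe.with(RETRY_POLICY)
                .get(context -> commit(readVersion, context.getAttemptCount()));
        System.out.println("committed version " + committed);
    }

    private static long commit(AtomicReference<Long> readVersion, int attemptCount)
    {
        // In the connector this re-reads the current version from the transaction
        // log, runs the conflict check (which advances readVersion and throws on
        // a conflict), and writes the log entries for currentVersion + 1.
        long currentVersion = readVersion.get();
        return currentVersion + 1;
    }
}
```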

private void checkWriteAllowed(ConnectorSession session, DeltaLakeTableHandle table)
{
if (!allowWrite(session, table)) {
@@ -4158,7 +4192,7 @@ private CommitDeleteOperationResult commitDeleteOperation(
long writeTimestamp = Instant.now().toEpochMilli();
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
long currentVersion = getMandatoryCurrentVersion(fileSystem, tableLocation, readVersion.get());
- checkForConcurrentTransactionConflicts(session, fileSystem, ImmutableList.of(tableHandle), isolationLevel, currentVersion, readVersion, tableHandle.getLocation(), attemptCount);
+ checkForConcurrentTransactionConflicts(session, fileSystem, ImmutableList.of(tableHandle.getEnforcedPartitionConstraint()), isolationLevel, currentVersion, readVersion, tableHandle.getLocation(), attemptCount);
long commitVersion = currentVersion + 1;
transactionLogWriter.appendCommitInfoEntry(getCommitInfoEntry(session, isolationLevel, commitVersion, writeTimestamp, operation, tableHandle.getReadVersion(), false));

DeltaTableOptimizeHandle.java
@@ -20,6 +20,7 @@
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
+ import io.trino.spi.predicate.TupleDomain;

import java.util.List;
import java.util.Optional;
@@ -37,6 +38,7 @@ public class DeltaTableOptimizeHandle
private final DataSize maxScannedFileSize;
private final Optional<Long> currentVersion;
private final boolean retriesEnabled;
+ private final TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint;

@JsonCreator
public DeltaTableOptimizeHandle(
@@ -46,7 +48,8 @@ public DeltaTableOptimizeHandle(
List<String> originalPartitionColumns,
DataSize maxScannedFileSize,
Optional<Long> currentVersion,
- boolean retriesEnabled)
+ boolean retriesEnabled,
+ TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint)
{
this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
@@ -55,6 +58,7 @@ public DeltaTableOptimizeHandle(
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
this.currentVersion = requireNonNull(currentVersion, "currentVersion is null");
this.retriesEnabled = retriesEnabled;
+ this.enforcedPartitionConstraint = requireNonNull(enforcedPartitionConstraint, "enforcedPartitionConstraint is null");
}

public DeltaTableOptimizeHandle withCurrentVersion(long currentVersion)
@@ -67,7 +71,21 @@ public DeltaTableOptimizeHandle withCurrentVersion(long currentVersion)
originalPartitionColumns,
maxScannedFileSize,
Optional.of(currentVersion),
- retriesEnabled);
+ retriesEnabled,
+ enforcedPartitionConstraint);
}

+ public DeltaTableOptimizeHandle withEnforcedPartitionConstraint(TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint)
+ {
+     return new DeltaTableOptimizeHandle(
+             metadataEntry,
+             protocolEntry,
+             tableColumns,
+             originalPartitionColumns,
+             maxScannedFileSize,
+             currentVersion,
+             retriesEnabled,
+             requireNonNull(enforcedPartitionConstraint, "enforcedPartitionConstraint is null"));
+ }
+
@JsonProperty
@@ -114,4 +132,10 @@ public boolean isRetriesEnabled()
{
return retriesEnabled;
}

+ @JsonProperty
+ public TupleDomain<DeltaLakeColumnHandle> getEnforcedPartitionConstraint()
+ {
+     return enforcedPartitionConstraint;
+ }
}
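The handle remains a plain immutable Jackson bean: every field, including the new enforcedPartitionConstraint, rides through the @JsonCreator constructor and is exposed via a @JsonProperty getter so it survives the coordinator-to-worker JSON round trip, and each with* method returns a full copy. A minimal sketch of that pattern with stand-in types, not the real handle:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import static java.util.Objects.requireNonNull;

public class ExampleHandle
{
    private final long currentVersion;
    private final String enforcedConstraint; // stand-in for TupleDomain<DeltaLakeColumnHandle>

    @JsonCreator
    public ExampleHandle(
            @JsonProperty("currentVersion") long currentVersion,
            @JsonProperty("enforcedConstraint") String enforcedConstraint)
    {
        this.currentVersion = currentVersion;
        this.enforcedConstraint = requireNonNull(enforcedConstraint, "enforcedConstraint is null");
    }

    // "Wither": copy every field, replacing just one; the instance is never mutated.
    public ExampleHandle withEnforcedConstraint(String enforcedConstraint)
    {
        return new ExampleHandle(currentVersion, enforcedConstraint);
    }

    @JsonProperty
    public long getCurrentVersion()
    {
        return currentVersion;
    }

    @JsonProperty
    public String getEnforcedConstraint()
    {
        return enforcedConstraint;
    }
}
```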