Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ public record CorruptedDeltaLakeTableHandle(
boolean catalogOwned,
boolean managed,
String location,
TrinoException originalException)
TrinoException originalException,
Optional<String> extendedStatsFile)
implements LocatedTableHandle
{
public CorruptedDeltaLakeTableHandle
{
requireNonNull(schemaTableName, "schemaTableName is null");
requireNonNull(location, "location is null");
requireNonNull(originalException, "originalException is null");
requireNonNull(extendedStatsFile, "extendedStatsFile is null");
}

public TrinoException createException()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.spi.connector.SchemaTableName;

import java.util.List;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

Expand All @@ -32,7 +33,8 @@ public record DeltaLakeInsertTableHandle(
List<DeltaLakeColumnHandle> inputColumns,
long readVersion,
boolean retriesEnabled,
VendedCredentialsHandle credentialsHandle)
VendedCredentialsHandle credentialsHandle,
Optional<String> extendedStatisticsFile)
implements ConnectorInsertTableHandle
{
public DeltaLakeInsertTableHandle
Expand All @@ -43,6 +45,7 @@ public record DeltaLakeInsertTableHandle(
inputColumns = ImmutableList.copyOf(requireNonNull(inputColumns, "inputColumns is null"));
requireNonNull(location, "location is null");
requireNonNull(credentialsHandle, "credentialsHandle is null");
requireNonNull(extendedStatisticsFile, "extendedStatisticsFile is null");
}

@Override
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.plugin.deltalake.metastore.DeltaMetastoreTable;
import io.trino.plugin.deltalake.metastore.VendedCredentialsHandle;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataAndProtocolEntries;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
Expand Down Expand Up @@ -104,8 +105,9 @@ public DeltaLakePartitionsTable(
}

TrinoFileSystem fileSystem = fileSystemFactory.create(session, table);
this.metadataEntry = transactionLogAccess.getMetadataEntry(session, fileSystem, tableSnapshot);
this.protocolEntry = transactionLogAccess.getProtocolEntry(session, fileSystem, tableSnapshot);
MetadataAndProtocolEntries metadataAndProtocolEntries = transactionLogAccess.getMetadataAndProtocolEntry(session, fileSystem, tableSnapshot);
this.metadataEntry = metadataAndProtocolEntries.metadata().orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + table.schemaTableName()));
this.protocolEntry = metadataAndProtocolEntries.protocol().orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + table.schemaTableName()));
this.schema = extractSchema(metadataEntry, protocolEntry, typeManager);

this.partitionColumns = getPartitionColumns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableSet;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.deltalake.metastore.DeltaMetastoreTable;
import io.trino.plugin.deltalake.transactionlog.MetadataAndProtocolEntries;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
Expand All @@ -37,6 +38,7 @@
import java.util.List;
import java.util.Optional;

import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -86,11 +88,14 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
TrinoFileSystem fileSystem = fileSystemFactory.create(session, table);
try {
TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(session, table, Optional.empty());
metadataEntry = transactionLogAccess.getMetadataEntry(session, fileSystem, tableSnapshot);
protocolEntry = transactionLogAccess.getProtocolEntry(session, fileSystem, tableSnapshot);
MetadataAndProtocolEntries metadataAndProtocolEntries = transactionLogAccess.getMetadataAndProtocolEntry(session, fileSystem, tableSnapshot);
metadataEntry = metadataAndProtocolEntries.metadata()
.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + table.schemaTableName()));
protocolEntry = metadataAndProtocolEntries.protocol()
.orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + table.schemaTableName()));
}
catch (IOException e) {
throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Unable to load table metadata from location: " + table.location(), e);
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Unable to load table metadata from location: " + table.location(), e);
}

return new FixedPageSource(buildPages(metadataEntry, protocolEntry));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class DeltaLakeTableHandle
// Used only for validation when config property delta.query-partition-filter-required is enabled.
private final Set<DeltaLakeColumnHandle> constraintColumns;

// For extended stats
private final Optional<String> extendedStatsFile;

@JsonCreator
public DeltaLakeTableHandle(
@JsonProperty("schemaName") String schemaName,
Expand All @@ -71,7 +74,8 @@ public DeltaLakeTableHandle(
@JsonProperty("projectedColumns") Optional<Set<DeltaLakeColumnHandle>> projectedColumns,
@JsonProperty("analyzeHandle") Optional<AnalyzeHandle> analyzeHandle,
@JsonProperty("readVersion") long readVersion,
@JsonProperty("timeTravel") boolean timeTravel)
@JsonProperty("timeTravel") boolean timeTravel,
@JsonProperty("extendedStatsFile") Optional<String> extendedStatsFile)
{
this(
schemaName,
Expand All @@ -90,7 +94,8 @@ public DeltaLakeTableHandle(
false,
Optional.empty(),
readVersion,
timeTravel);
timeTravel,
extendedStatsFile);
}

public DeltaLakeTableHandle(
Expand All @@ -110,7 +115,8 @@ public DeltaLakeTableHandle(
boolean isOptimize,
Optional<DataSize> maxScannedFileSize,
long readVersion,
boolean timeTravel)
boolean timeTravel,
Optional<String> extendedStatsFile)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
Expand All @@ -129,6 +135,7 @@ public DeltaLakeTableHandle(
this.readVersion = readVersion;
this.timeTravel = timeTravel;
this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, "constraintColumns is null"));
this.extendedStatsFile = requireNonNull(extendedStatsFile, "extendedStatsFile is null");
}

public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> projectedColumns)
Expand All @@ -150,7 +157,8 @@ public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> proj
isOptimize,
maxScannedFileSize,
readVersion,
timeTravel);
timeTravel,
extendedStatsFile);
}

public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize)
Expand All @@ -172,7 +180,8 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max
true,
Optional.of(maxScannedFileSize),
readVersion,
timeTravel);
timeTravel,
extendedStatsFile);
}

public DeltaLakeTableHandle forMerge()
Expand All @@ -194,7 +203,8 @@ public DeltaLakeTableHandle forMerge()
isOptimize,
maxScannedFileSize,
readVersion,
timeTravel);
timeTravel,
extendedStatsFile);
}

@Override
Expand Down Expand Up @@ -329,6 +339,13 @@ public boolean isTimeTravel()
return timeTravel;
}

@Override
@JsonProperty("extendedStatsFile")
public Optional<String> extendedStatsFile()
{
return extendedStatsFile;
}

@Override
public String toString()
{
Expand Down Expand Up @@ -361,7 +378,8 @@ public boolean equals(Object o)
isOptimize == that.isOptimize &&
Objects.equals(maxScannedFileSize, that.maxScannedFileSize) &&
readVersion == that.readVersion &&
timeTravel == that.timeTravel;
timeTravel == that.timeTravel &&
Objects.equals(extendedStatsFile, that.extendedStatsFile);
}

@Override
Expand All @@ -383,6 +401,7 @@ public int hashCode()
isOptimize,
maxScannedFileSize,
readVersion,
timeTravel);
timeTravel,
extendedStatsFile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;

import java.util.Optional;

public interface LocatedTableHandle
extends ConnectorTableHandle
{
Expand All @@ -27,4 +29,6 @@ public interface LocatedTableHandle
String location();

VendedCredentialsHandle toCredentialsHandle();

Optional<String> extendedStatsFile();
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class DeltaTableOptimizeHandle
private final boolean retriesEnabled;
private final TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint;
private final VendedCredentialsHandle credentialsHandle;
private final Optional<String> extendedStatsFile;

@JsonCreator
public DeltaTableOptimizeHandle(
Expand All @@ -52,7 +53,8 @@ public DeltaTableOptimizeHandle(
Optional<Long> currentVersion,
boolean retriesEnabled,
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
VendedCredentialsHandle credentialsHandle)
VendedCredentialsHandle credentialsHandle,
Optional<String> extendedStatsFile)
{
this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
Expand All @@ -63,6 +65,7 @@ public DeltaTableOptimizeHandle(
this.retriesEnabled = retriesEnabled;
this.enforcedPartitionConstraint = requireNonNull(enforcedPartitionConstraint, "enforcedPartitionConstraint is null");
this.credentialsHandle = requireNonNull(credentialsHandle, "credentialsHandle is null");
this.extendedStatsFile = requireNonNull(extendedStatsFile, "extendedStatsFile is null");
}

public DeltaTableOptimizeHandle withCurrentVersion(long currentVersion)
Expand All @@ -77,7 +80,8 @@ public DeltaTableOptimizeHandle withCurrentVersion(long currentVersion)
Optional.of(currentVersion),
retriesEnabled,
enforcedPartitionConstraint,
credentialsHandle);
credentialsHandle,
extendedStatsFile);
}

public DeltaTableOptimizeHandle withEnforcedPartitionConstraint(TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint)
Expand All @@ -91,7 +95,8 @@ public DeltaTableOptimizeHandle withEnforcedPartitionConstraint(TupleDomain<Delt
currentVersion,
retriesEnabled,
requireNonNull(enforcedPartitionConstraint, "enforcedPartitionConstraint is null"),
credentialsHandle);
credentialsHandle,
extendedStatsFile);
}

@JsonProperty
Expand Down Expand Up @@ -150,4 +155,10 @@ public VendedCredentialsHandle getCredentialsHandle()
{
return credentialsHandle;
}

@JsonProperty
public Optional<String> getExtendedStatsFile()
{
return extendedStatsFile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Optional;

import static io.trino.plugin.base.util.Procedures.checkProcedureArgument;
import static io.trino.plugin.deltalake.statistics.MetaDirStatisticsAccess.STATISTICS_FILE;
import static io.trino.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
Expand Down Expand Up @@ -88,7 +89,7 @@ public void dropStats(ConnectorSession session, ConnectorAccessControl accessCon
throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", name));
}
accessControl.checkCanInsertIntoTable(null, name);
statsAccess.deleteExtendedStatistics(session, name, tableHandle.location(), tableHandle.toCredentialsHandle());
statsAccess.deleteExtendedStatistics(session, name, tableHandle.location(), tableHandle.extendedStatsFile().orElse(STATISTICS_FILE), tableHandle.toCredentialsHandle());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,15 @@ public CachingExtendedStatisticsAccess(@ForCachingExtendedStatisticsAccess Exten
}

@Override
public Optional<ExtendedStatistics> readExtendedStatistics(ConnectorSession session, SchemaTableName schemaTableName, String tableLocation, VendedCredentialsHandle credentialsHandle)
public Optional<ExtendedStatistics> readExtendedStatistics(
ConnectorSession session,
SchemaTableName schemaTableName,
String tableLocation,
String extendedStatsFile,
VendedCredentialsHandle credentialsHandle)
{
try {
return uncheckedCacheGet(cache, new CacheKey(schemaTableName, tableLocation), () -> delegate.readExtendedStatistics(session, schemaTableName, tableLocation, credentialsHandle));
return uncheckedCacheGet(cache, new CacheKey(schemaTableName, tableLocation, extendedStatsFile), () -> delegate.readExtendedStatistics(session, schemaTableName, tableLocation, extendedStatsFile, credentialsHandle));
}
catch (UncheckedExecutionException e) {
throwIfInstanceOf(e.getCause(), TrinoException.class);
Expand All @@ -70,17 +75,24 @@ public Optional<ExtendedStatistics> readExtendedStatistics(ConnectorSession sess
}

@Override
public void updateExtendedStatistics(ConnectorSession session, SchemaTableName schemaTableName, String tableLocation, VendedCredentialsHandle credentialsHandle, ExtendedStatistics statistics)
public String writeExtendedStatistics(
ConnectorSession session,
SchemaTableName schemaTableName,
String tableLocation,
Optional<String> previousExtendedStatsFile,
VendedCredentialsHandle credentialsHandle,
ExtendedStatistics statistics)
{
delegate.updateExtendedStatistics(session, schemaTableName, tableLocation, credentialsHandle, statistics);
cache.invalidate(new CacheKey(schemaTableName, tableLocation));
String extendedStatsFile = delegate.writeExtendedStatistics(session, schemaTableName, tableLocation, previousExtendedStatsFile, credentialsHandle, statistics);
cache.invalidate(new CacheKey(schemaTableName, tableLocation, extendedStatsFile));
return extendedStatsFile;
}

@Override
public void deleteExtendedStatistics(ConnectorSession session, SchemaTableName schemaTableName, String tableLocation, VendedCredentialsHandle credentialsHandle)
public void deleteExtendedStatistics(ConnectorSession session, SchemaTableName schemaTableName, String tableLocation, String extendedStatsFile, VendedCredentialsHandle credentialsHandle)
{
delegate.deleteExtendedStatistics(session, schemaTableName, tableLocation, credentialsHandle);
cache.invalidate(new CacheKey(schemaTableName, tableLocation));
delegate.deleteExtendedStatistics(session, schemaTableName, tableLocation, extendedStatsFile, credentialsHandle);
cache.invalidate(new CacheKey(schemaTableName, tableLocation, extendedStatsFile));
}

public void invalidateCache()
Expand All @@ -102,12 +114,13 @@ public void invalidateCache(SchemaTableName schemaTableName, Optional<String> ta
@BindingAnnotation
public @interface ForCachingExtendedStatisticsAccess {}

private record CacheKey(SchemaTableName tableName, String location)
private record CacheKey(SchemaTableName tableName, String location, String extendedStatsFile)
{
CacheKey
{
requireNonNull(tableName, "tableName is null");
requireNonNull(location, "location is null");
requireNonNull(extendedStatsFile, "extendedStatsFile is null");
}
}
}
Loading