Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<dep.lucene.version>9.12.0</dep.lucene.version>
<dep.assertj-core.version>3.8.0</dep.assertj-core.version>
<dep.parquet.version>1.16.0</dep.parquet.version>
<dep.iceberg.version>1.10.0</dep.iceberg.version>
<dep.iceberg.version>1.10.1</dep.iceberg.version>
<dep.asm.version>9.7.1</dep.asm.version>
<dep.gcs.version>1.9.17</dep.gcs.version>
<dep.alluxio.version>313</dep.alluxio.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,29 @@ default void dropTableFromMetastore(MetastoreContext metastoreContext, String da

MetastoreOperationResult persistTable(MetastoreContext metastoreContext, String databaseName, String tableName, Table newTable, PrincipalPrivileges principalPrivileges, Supplier<PartitionStatistics> update, Map<String, String> additionalParameters);

/**
 * Swap the metadata location of an unpartitioned Iceberg table atomically.
 *
 * <p>Implementations back this with the metastore {@code commit_table_data}
 * call, which performs a compare-and-swap on {@code sd.location}: the new
 * location is written only when {@code previousLocation} still matches the
 * value currently stored in the metastore. Nothing else on the table
 * (schema, parameters, owner) is modified, which makes this a safer commit
 * primitive for Iceberg metadata pointers than the {@code alter_table}-based
 * {@link #persistTable} path.
 *
 * @param metastoreContext context used to contact the metastore
 * @param databaseName database containing the table
 * @param tableName table whose metadata pointer is being swapped
 * @param newLocation metadata file location to install as {@code sd.location}
 * @param previousLocation expected current {@code sd.location}; the commit must fail if it differs
 * @return result of the metastore operation
 * @throws UnsupportedOperationException when the underlying metastore lacks the {@code commit_table_data} API
 */
default MetastoreOperationResult commitTableData(
        MetastoreContext metastoreContext,
        String databaseName,
        String tableName,
        String newLocation,
        String previousLocation)
{
    throw new UnsupportedOperationException("commitTableData is not supported by this metastore implementation");
}

MetastoreOperationResult renameTable(MetastoreContext metastoreContext, String databaseName, String tableName, String newDatabaseName, String newTableName);

MetastoreOperationResult addColumn(MetastoreContext metastoreContext, String databaseName, String tableName, String columnName, HiveType columnType, String columnComment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Optional;
import java.util.OptionalLong;

import static java.util.Objects.requireNonNull;

Expand All @@ -30,6 +31,9 @@ public class CommitTaskData
private final FileFormat fileFormat;
private final Optional<String> referencedDataFile;
private final FileContent content;
private final OptionalLong contentOffset;
private final OptionalLong contentSizeInBytes;
private final OptionalLong recordCount;

@JsonCreator
public CommitTaskData(
Expand All @@ -40,7 +44,10 @@ public CommitTaskData(
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
@JsonProperty("fileFormat") FileFormat fileFormat,
@JsonProperty("referencedDataFile") String referencedDataFile,
@JsonProperty("content") FileContent content)
@JsonProperty("content") FileContent content,
@JsonProperty("contentOffset") OptionalLong contentOffset,
@JsonProperty("contentSizeInBytes") OptionalLong contentSizeInBytes,
@JsonProperty("recordCount") OptionalLong recordCount)
{
this.path = requireNonNull(path, "path is null");
this.fileSizeInBytes = fileSizeInBytes;
Expand All @@ -50,6 +57,24 @@ public CommitTaskData(
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.referencedDataFile = Optional.ofNullable(referencedDataFile);
this.content = requireNonNull(content, "content is null");
this.contentOffset = contentOffset != null ? contentOffset : OptionalLong.empty();
this.contentSizeInBytes = contentSizeInBytes != null ? contentSizeInBytes : OptionalLong.empty();
this.recordCount = recordCount != null ? recordCount : OptionalLong.empty();
}

/**
 * Backward-compatible convenience constructor for commit tasks that carry
 * no deletion-vector metadata: {@code contentOffset},
 * {@code contentSizeInBytes} and {@code recordCount} are all left empty.
 * Delegates to the full {@code @JsonCreator} constructor.
 */
public CommitTaskData(
        String path,
        long fileSizeInBytes,
        MetricsWrapper metrics,
        int partitionSpecId,
        Optional<String> partitionDataJson,
        FileFormat fileFormat,
        String referencedDataFile,
        FileContent content)
{
    this(path, fileSizeInBytes, metrics, partitionSpecId, partitionDataJson,
            fileFormat, referencedDataFile, content,
            OptionalLong.empty(), OptionalLong.empty(), OptionalLong.empty());
}

@JsonProperty
Expand Down Expand Up @@ -99,4 +124,22 @@ public FileContent getContent()
{
return content;
}

/**
 * Offset of this delete file's content blob inside its container file.
 * Populated for PUFFIN deletion-vector delete files; empty otherwise.
 */
@JsonProperty
public OptionalLong getContentOffset()
{
    return contentOffset;
}

/**
 * Size in bytes of this delete file's content blob inside its container file.
 * Populated for PUFFIN deletion-vector delete files; empty otherwise.
 */
@JsonProperty
public OptionalLong getContentSizeInBytes()
{
    return contentSizeInBytes;
}

/**
 * Record count for this delete file's content.
 * Populated for PUFFIN deletion-vector delete files; empty otherwise.
 */
@JsonProperty
public OptionalLong getRecordCount()
{
    return recordCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,16 @@ private static Object getIcebergLiteralValue(Type type, Marker marker)
return toIntExact(((Long) marker.getValue()));
}

if (type instanceof TimestampType || type instanceof TimeType) {
if (type instanceof TimestampType) {
TimestampType tsType = (TimestampType) type;
long value = (Long) marker.getValue();
if (tsType.getPrecision() == MILLISECONDS) {
return MILLISECONDS.toMicros(value);
}
return value;
}

if (type instanceof TimeType) {
return MILLISECONDS.toMicros((Long) marker.getValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ public enum FileFormat
ORC("orc", true),
PARQUET("parquet", true),
AVRO("avro", true),
METADATA("metadata.json", false);
METADATA("metadata.json", false),
PUFFIN("puffin", false),
DWRF("dwrf", true);

private final String ext;
private final boolean splittable;
Expand Down Expand Up @@ -61,6 +63,9 @@ public static FileFormat fromIcebergFileFormat(org.apache.iceberg.FileFormat for
case METADATA:
prestoFileFormat = METADATA;
break;
case PUFFIN:
prestoFileFormat = PUFFIN;
break;
default:
throw new PrestoException(NOT_SUPPORTED, "Unsupported file format: " + format);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,19 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata)
if (base == null) {
metastore.createTable(metastoreContext, table, privileges, emptyList());
}
else if (config.getCommitTableDataEnabled()) {
// Use commit_table_data CAS API for atomic metadata location swap.
// This is more reliable than alter_table for Iceberg metadata commits
// because it provides true compare-and-swap semantics on sd.location.
try {
metastore.commitTableData(metastoreContext, database, tableName, newMetadataLocation, currentMetadataLocation);
}
catch (UnsupportedOperationException e) {
log.warn("commitTableData not supported by metastore, falling back to persistTable for %s.%s", database, tableName);
PartitionStatistics tableStats = metastore.getTableStatistics(metastoreContext, database, tableName);
metastore.persistTable(metastoreContext, database, tableName, table, privileges, () -> tableStats, useHMSLock ? ImmutableMap.of() : hmsEnvContext(base.metadataFileLocation()));
}
}
else {
PartitionStatistics tableStats = metastore.getTableStatistics(metastoreContext, database, tableName);
metastore.persistTable(metastoreContext, database, tableName, table, privileges, () -> tableStats, useHMSLock ? ImmutableMap.of() : hmsEnvContext(base.metadataFileLocation()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,9 @@ protected static void validateTableForPresto(BaseTable table, Optional<Long> tab
schema = metadata.schema();
}

// Reject schema default values (initial-default / write-default)
for (Types.NestedField field : schema.columns()) {
if (field.initialDefault() != null || field.writeDefault() != null) {
throw new PrestoException(NOT_SUPPORTED, "Iceberg v3 column default values are not supported");
}
}
// Iceberg v3 column default values (initial-default / write-default) are supported.
// The Iceberg library handles applying defaults when reading files that were written
// before a column with a default was added via schema evolution.

// Reject Iceberg table encryption
if (!metadata.encryptionKeys().isEmpty() || snapshot.keyId() != null || metadata.properties().containsKey("encryption.key-id")) {
Expand Down Expand Up @@ -837,6 +834,16 @@ private void handleFinishPositionDeletes(CommitTaskData task, PartitionSpec part
deleteBuilder.withPartition(PartitionData.fromJson(partitionDataJson, partitionColumnTypes));
}

// For PUFFIN deletion vectors: set content offset, content size, record count,
// and referenced data file path. These fields enable the Iceberg library to
// correctly read the DV blob from the PUFFIN container file.
if (task.getFileFormat() == FileFormat.PUFFIN) {
task.getContentOffset().ifPresent(deleteBuilder::withContentOffset);
task.getContentSizeInBytes().ifPresent(deleteBuilder::withContentSizeInBytes);
task.getRecordCount().ifPresent(deleteBuilder::withRecordCount);
task.getReferencedDataFile().ifPresent(deleteBuilder::withReferencedDataFile);
}

rowDelta.addDeletes(deleteBuilder.build());
writtenFiles.add(task.getPath());
task.getReferencedDataFile().ifPresent(referencedDataFiles::add);
Expand Down Expand Up @@ -903,12 +910,19 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
format("Iceberg table updates for format version %s are not supported yet", formatVersion));
}

if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE ||
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
throw new PrestoException(ICEBERG_INVALID_FORMAT_VERSION,
"Iceberg table updates require at least format version 2");
}

// V3+ tables use deletion vectors natively (inherently merge-on-read).
// V2 tables require explicit merge-on-read mode configuration.
if (formatVersion < 3 &&
!Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE))
.map(mode -> mode.equals(MERGE_ON_READ.modeName()))
.orElse(false)) {
throw new PrestoException(ICEBERG_INVALID_FORMAT_VERSION,
"Iceberg table updates require at least format version 2 and update mode must be merge-on-read");
"Iceberg V2 table updates require update mode to be merge-on-read");
}
validateTableMode(session, icebergTable);

Expand All @@ -927,7 +941,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT

Map<Integer, PrestoIcebergPartitionSpec> partitionSpecs = transformValues(icebergTable.specs(), partitionSpec -> toPrestoPartitionSpec(partitionSpec, typeManager));

return new IcebergMergeTableHandle(icebergTableHandle, insertHandle, partitionSpecs);
return new IcebergMergeTableHandle(icebergTableHandle, insertHandle, partitionSpecs, formatVersion);
}

@Override
Expand Down Expand Up @@ -1492,7 +1506,8 @@ public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, Connecto
throw new PrestoException(NOT_SUPPORTED,
format("Iceberg table updates for format version %s are not supported yet", formatVersion));
}
if (getDeleteMode(icebergTable) == RowLevelOperationMode.COPY_ON_WRITE) {
// V3+ tables use deletion vectors natively; V2 requires explicit merge-on-read mode.
if (formatVersion < 3 && getDeleteMode(icebergTable) == RowLevelOperationMode.COPY_ON_WRITE) {
throw new PrestoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely. Configure write.delete.mode table property to allow row level deletions.");
}
validateTableMode(session, icebergTable);
Expand Down Expand Up @@ -1524,8 +1539,23 @@ public Optional<ConnectorOutputMetadata> finishDeleteWithOutput(ConnectorSession
.ofPositionDeletes()
.withPath(task.getPath())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withFormat(FileFormat.fromString(task.getFileFormat().name()))
.withMetrics(task.getMetrics().metrics());
.withFormat(FileFormat.fromString(task.getFileFormat().name()));

if (task.getFileFormat() == com.facebook.presto.iceberg.FileFormat.PUFFIN) {
builder.withRecordCount(task.getRecordCount().orElseThrow(() ->
new VerifyException("recordCount required for deletion vector")));
builder.withContentOffset(task.getContentOffset().orElseThrow(() ->
new VerifyException("contentOffset required for deletion vector")));
builder.withContentSizeInBytes(task.getContentSizeInBytes().orElseThrow(() ->
new VerifyException("contentSizeInBytes required for deletion vector")));
}
else {
builder.withMetrics(task.getMetrics().metrics());
}

if (task.getReferencedDataFile().isPresent()) {
builder.withReferencedDataFile(task.getReferencedDataFile().get());
}

if (!spec.fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
Expand Down Expand Up @@ -1758,11 +1788,17 @@ public ConnectorTableHandle beginUpdate(ConnectorSession session, ConnectorTable
format("Iceberg table updates for format version %s are not supported yet", formatVersion));
}

if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE ||
// V3+ tables use deletion vectors natively (inherently merge-on-read).
// V2 tables require explicit merge-on-read mode configuration.
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
throw new RuntimeException("Iceberg table updates require at least format version 2");
}

if (formatVersion < 3 &&
!Optional.ofNullable(icebergTable.properties().get(TableProperties.UPDATE_MODE))
.map(mode -> mode.equals(MERGE_ON_READ.modeName()))
.orElse(false)) {
throw new RuntimeException("Iceberg table updates require at least format version 2 and update mode must be merge-on-read");
throw new RuntimeException("Iceberg V2 table updates require update mode to be merge-on-read");
}
validateTableMode(session, icebergTable);
return handle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.facebook.presto.iceberg.procedure.RegisterTableProcedure;
import com.facebook.presto.iceberg.procedure.RemoveOrphanFiles;
import com.facebook.presto.iceberg.procedure.RewriteDataFilesProcedure;
import com.facebook.presto.iceberg.procedure.RewriteDeleteFilesProcedure;
import com.facebook.presto.iceberg.procedure.RewriteManifestsProcedure;
import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure;
import com.facebook.presto.iceberg.procedure.RollbackToTimestampProcedure;
Expand Down Expand Up @@ -195,6 +196,7 @@ protected void setup(Binder binder)
procedures.addBinding().toProvider(StatisticsFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(ManifestFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RewriteDeleteFilesProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RewriteManifestsProcedure.class).in(Scopes.SINGLETON);

// for orc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.airlift.bootstrap.LifeCycleManager;
import com.facebook.presto.hive.HiveTransactionHandle;
import com.facebook.presto.iceberg.function.IcebergBucketFunction;
import com.facebook.presto.iceberg.function.VariantFunctions;
import com.facebook.presto.iceberg.function.changelog.ApplyChangelogFunction;
import com.facebook.presto.iceberg.transaction.IcebergTransactionManager;
import com.facebook.presto.iceberg.transaction.IcebergTransactionMetadata;
Expand Down Expand Up @@ -256,6 +257,7 @@ public Set<Class<?>> getSystemFunctions()
.add(ApplyChangelogFunction.class)
.add(IcebergBucketFunction.class)
.add(IcebergBucketFunction.Bucket.class)
.add(VariantFunctions.class)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public enum IcebergErrorCode
ICEBERG_INVALID_MATERIALIZED_VIEW(18, EXTERNAL),
ICEBERG_INVALID_SPEC_ID(19, EXTERNAL),
ICEBERG_TRANSACTION_CONFLICT_ERROR(20, EXTERNAL),
ICEBERG_WRITER_CLOSE_ERROR(21, EXTERNAL),
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,20 @@ public boolean getLockingEnabled()
{
return lockingEnabled;
}

// Feature flag: when true, Iceberg metadata commits use the metastore's
// commit_table_data CAS API instead of the alter_table path.
// No initializer, so this defaults to false (Java boolean default).
private boolean commitTableDataEnabled;

/**
 * Whether Iceberg metadata commits should use the commit_table_data CAS API.
 */
public boolean getCommitTableDataEnabled()
{
    return commitTableDataEnabled;
}

@Config("iceberg.hive.commit-table-data-enabled")
@ConfigDescription("Use commit_table_data CAS API instead of alter_table for Iceberg metadata commits. " +
        "Requires metastore support for the commit_table_data API.")
public IcebergHiveTableOperationsConfig setCommitTableDataEnabled(boolean commitTableDataEnabled)
{
    this.commitTableDataEnabled = commitTableDataEnabled;
    return this;
}
}
Loading
Loading