Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<dep.lucene.version>9.12.0</dep.lucene.version>
<dep.assertj-core.version>3.8.0</dep.assertj-core.version>
<dep.parquet.version>1.16.0</dep.parquet.version>
<dep.iceberg.version>1.10.0</dep.iceberg.version>
<dep.iceberg.version>1.10.1</dep.iceberg.version>
<dep.asm.version>9.9.1</dep.asm.version>
<dep.gcs.version>1.9.17</dep.gcs.version>
<dep.alluxio.version>313</dep.alluxio.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Optional;
import java.util.OptionalLong;

import static java.util.Objects.requireNonNull;

Expand All @@ -30,6 +31,9 @@ public class CommitTaskData
private final FileFormat fileFormat;
private final Optional<String> referencedDataFile;
private final FileContent content;
private final OptionalLong contentOffset;
private final OptionalLong contentSizeInBytes;
private final OptionalLong recordCount;

@JsonCreator
public CommitTaskData(
Expand All @@ -40,7 +44,10 @@ public CommitTaskData(
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
@JsonProperty("fileFormat") FileFormat fileFormat,
@JsonProperty("referencedDataFile") String referencedDataFile,
@JsonProperty("content") FileContent content)
@JsonProperty("content") FileContent content,
@JsonProperty("contentOffset") OptionalLong contentOffset,
@JsonProperty("contentSizeInBytes") OptionalLong contentSizeInBytes,
@JsonProperty("recordCount") OptionalLong recordCount)
{
this.path = requireNonNull(path, "path is null");
this.fileSizeInBytes = fileSizeInBytes;
Expand All @@ -50,6 +57,24 @@ public CommitTaskData(
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.referencedDataFile = Optional.ofNullable(referencedDataFile);
this.content = requireNonNull(content, "content is null");
this.contentOffset = contentOffset != null ? contentOffset : OptionalLong.empty();
this.contentSizeInBytes = contentSizeInBytes != null ? contentSizeInBytes : OptionalLong.empty();
this.recordCount = recordCount != null ? recordCount : OptionalLong.empty();
}

/**
 * Convenience constructor for commit tasks that carry no content offset,
 * content size, or record count.
 *
 * Delegates to the full constructor, passing {@code OptionalLong.empty()} for
 * {@code contentOffset}, {@code contentSizeInBytes} and {@code recordCount}.
 *
 * @param referencedDataFile may be null; the full constructor wraps it with
 *        {@code Optional.ofNullable}
 */
public CommitTaskData(
String path,
long fileSizeInBytes,
MetricsWrapper metrics,
int partitionSpecId,
Optional<String> partitionDataJson,
FileFormat fileFormat,
String referencedDataFile,
FileContent content)
{
// The three trailing arguments are contentOffset, contentSizeInBytes and recordCount, in that order.
this(path, fileSizeInBytes, metrics, partitionSpecId, partitionDataJson,
fileFormat, referencedDataFile, content,
OptionalLong.empty(), OptionalLong.empty(), OptionalLong.empty());
}

@JsonProperty
Expand Down Expand Up @@ -99,4 +124,22 @@ public FileContent getContent()
{
return content;
}

/**
 * Returns the {@code contentOffset} supplied at construction, or an empty
 * {@code OptionalLong} when none (or null) was given.
 *
 * NOTE(review): appears to be populated only for PUFFIN deletion-vector
 * commits - confirm against the writer that produces these tasks.
 */
@JsonProperty
public OptionalLong getContentOffset()
{
return contentOffset;
}

/**
 * Returns the {@code contentSizeInBytes} supplied at construction, or an empty
 * {@code OptionalLong} when none (or null) was given.
 *
 * NOTE(review): appears to be populated only for PUFFIN deletion-vector
 * commits - confirm against the writer that produces these tasks.
 */
@JsonProperty
public OptionalLong getContentSizeInBytes()
{
return contentSizeInBytes;
}

/**
 * Returns the {@code recordCount} supplied at construction, or an empty
 * {@code OptionalLong} when none (or null) was given.
 *
 * NOTE(review): appears to be populated only for PUFFIN deletion-vector
 * commits - confirm against the writer that produces these tasks.
 */
@JsonProperty
public OptionalLong getRecordCount()
{
return recordCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,16 @@ private static Object getIcebergLiteralValue(Type type, Marker marker)
return toIntExact(((Long) marker.getValue()));
}

if (type instanceof TimestampType || type instanceof TimeType) {
if (type instanceof TimestampType) {
TimestampType tsType = (TimestampType) type;
long value = (Long) marker.getValue();
if (tsType.getPrecision() == MILLISECONDS) {
return MILLISECONDS.toMicros(value);
}
return value;
}

if (type instanceof TimeType) {
return MILLISECONDS.toMicros((Long) marker.getValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public enum FileFormat
ORC("orc", true),
PARQUET("parquet", true),
AVRO("avro", true),
METADATA("metadata.json", false);
METADATA("metadata.json", false),
PUFFIN("puffin", false);

private final String ext;
private final boolean splittable;
Expand Down Expand Up @@ -61,6 +62,9 @@ public static FileFormat fromIcebergFileFormat(org.apache.iceberg.FileFormat for
case METADATA:
prestoFileFormat = METADATA;
break;
case PUFFIN:
prestoFileFormat = PUFFIN;
break;
default:
throw new PrestoException(NOT_SUPPORTED, "Unsupported file format: " + format);
}
Expand All @@ -81,6 +85,12 @@ public org.apache.iceberg.FileFormat toIceberg()
case AVRO:
fileFormat = org.apache.iceberg.FileFormat.AVRO;
break;
case METADATA:
fileFormat = org.apache.iceberg.FileFormat.METADATA;
break;
case PUFFIN:
fileFormat = org.apache.iceberg.FileFormat.PUFFIN;
break;
default:
throw new PrestoException(NOT_SUPPORTED, "Unsupported file format: " + this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,9 @@ protected static void validateTableForPresto(BaseTable table, Optional<Long> tab
schema = metadata.schema();
}

// Reject schema default values (initial-default / write-default)
for (Types.NestedField field : schema.columns()) {
if (field.initialDefault() != null || field.writeDefault() != null) {
throw new PrestoException(NOT_SUPPORTED, "Iceberg v3 column default values are not supported");
}
}
// Iceberg v3 column default values (initial-default / write-default) are supported.
// The Iceberg library handles applying defaults when reading files that were written
// before a column with a default was added via schema evolution.

// Reject Iceberg table encryption
if (!metadata.encryptionKeys().isEmpty() || snapshot.keyId() != null || metadata.properties().containsKey("encryption.key-id")) {
Expand Down Expand Up @@ -1524,8 +1521,23 @@ public Optional<ConnectorOutputMetadata> finishDeleteWithOutput(ConnectorSession
.ofPositionDeletes()
.withPath(task.getPath())
.withFileSizeInBytes(task.getFileSizeInBytes())
.withFormat(FileFormat.fromString(task.getFileFormat().name()))
.withMetrics(task.getMetrics().metrics());
.withFormat(FileFormat.fromString(task.getFileFormat().name()));

if (task.getFileFormat() == com.facebook.presto.iceberg.FileFormat.PUFFIN) {
builder.withRecordCount(task.getRecordCount().orElseThrow(() ->
new VerifyException("recordCount required for deletion vector")));
builder.withContentOffset(task.getContentOffset().orElseThrow(() ->
new VerifyException("contentOffset required for deletion vector")));
builder.withContentSizeInBytes(task.getContentSizeInBytes().orElseThrow(() ->
new VerifyException("contentSizeInBytes required for deletion vector")));
}
else {
builder.withMetrics(task.getMetrics().metrics());
}

if (task.getReferencedDataFile().isPresent()) {
builder.withReferencedDataFile(task.getReferencedDataFile().get());
}

if (!spec.fields().isEmpty()) {
String partitionDataJson = task.getPartitionDataJson()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.facebook.presto.iceberg.procedure.RegisterTableProcedure;
import com.facebook.presto.iceberg.procedure.RemoveOrphanFiles;
import com.facebook.presto.iceberg.procedure.RewriteDataFilesProcedure;
import com.facebook.presto.iceberg.procedure.RewriteDeleteFilesProcedure;
import com.facebook.presto.iceberg.procedure.RewriteManifestsProcedure;
import com.facebook.presto.iceberg.procedure.RollbackToSnapshotProcedure;
import com.facebook.presto.iceberg.procedure.RollbackToTimestampProcedure;
Expand Down Expand Up @@ -195,6 +196,7 @@ protected void setup(Binder binder)
procedures.addBinding().toProvider(StatisticsFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(ManifestFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RewriteDeleteFilesProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(RewriteManifestsProcedure.class).in(Scopes.SINGLETON);

// for orc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.airlift.bootstrap.LifeCycleManager;
import com.facebook.presto.hive.HiveTransactionHandle;
import com.facebook.presto.iceberg.function.IcebergBucketFunction;
import com.facebook.presto.iceberg.function.VariantFunctions;
import com.facebook.presto.iceberg.function.changelog.ApplyChangelogFunction;
import com.facebook.presto.iceberg.transaction.IcebergTransactionManager;
import com.facebook.presto.iceberg.transaction.IcebergTransactionMetadata;
Expand Down Expand Up @@ -256,6 +257,7 @@ public Set<Class<?>> getSystemFunctions()
.add(ApplyChangelogFunction.class)
.add(IcebergBucketFunction.class)
.add(IcebergBucketFunction.Bucket.class)
.add(VariantFunctions.class)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public enum IcebergErrorCode
ICEBERG_INVALID_MATERIALIZED_VIEW(18, EXTERNAL),
ICEBERG_INVALID_SPEC_ID(19, EXTERNAL),
ICEBERG_TRANSACTION_CONFLICT_ERROR(20, EXTERNAL),
ICEBERG_WRITER_CLOSE_ERROR(21, EXTERNAL),
/**/;

private final ErrorCode errorCode;
Expand Down
Loading
Loading