Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ public enum IcebergErrorCode
ICEBERG_MISSING_DATA(5, EXTERNAL),
ICEBERG_CANNOT_OPEN_SPLIT(6, EXTERNAL),
ICEBERG_WRITER_OPEN_ERROR(7, EXTERNAL),
ICEBERG_FILESYSTEM_ERROR(8, EXTERNAL),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to change the existing error codes?

ICEBERG_CURSOR_ERROR(9, EXTERNAL),
ICEBERG_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR),
ICEBERG_INVALID_SNAPSHOT_ID(11, USER_ERROR),
ICEBERG_WRITER_CLOSE_ERROR(8, EXTERNAL),
ICEBERG_FILESYSTEM_ERROR(9, EXTERNAL),
ICEBERG_CURSOR_ERROR(10, EXTERNAL),
ICEBERG_WRITE_VALIDATION_FAILED(11, INTERNAL_ERROR),
ICEBERG_INVALID_SNAPSHOT_ID(12, USER_ERROR),
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ public enum IcebergFileFormat
{
ORC,
PARQUET,
AVRO
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: add trailing comma

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.prestosql.plugin.hive.NodeVersion;
import io.prestosql.plugin.hive.orc.HdfsOrcDataSource;
import io.prestosql.plugin.hive.orc.OrcWriterConfig;
import io.prestosql.plugin.iceberg.avro.IcebergAvroFileWriter;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.type.Type;
Expand Down Expand Up @@ -104,6 +105,8 @@ public OrcWriterStats getOrcWriterStats()
}

public IcebergFileWriter createFileWriter(
String schemaName,
String tableName,
Path outputPath,
Schema icebergSchema,
JobConf jobConf,
Expand All @@ -116,6 +119,8 @@ public IcebergFileWriter createFileWriter(
return createParquetWriter(outputPath, icebergSchema, jobConf, session, hdfsContext);
case ORC:
return createOrcWriter(outputPath, icebergSchema, jobConf, session);
case AVRO:
return createAvroWriter(schemaName, tableName, outputPath, icebergSchema, session);
}
throw new PrestoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat);
}
Expand Down Expand Up @@ -233,4 +238,24 @@ private IcebergFileWriter createOrcWriter(
throw new PrestoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating ORC file", e);
}
}

private IcebergFileWriter createAvroWriter(
String schemaName,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the requirement of providing a "tableName" in Avro.WriteBuilder#named() api feels a bit strange to me. However, I was also wondering if we could just pass hdfsContext as an argument here and use the table name from there.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. We already pass HdfsContext to createParquetWriter()

String tableName,
Path outputPath,
Schema icebergSchema,
ConnectorSession session)
{
List<Type> fileColumnTypes = icebergSchema.columns().stream()
.map(column -> toPrestoType(column.type(), typeManager))
.collect(toImmutableList());
return new IcebergAvroFileWriter(
tableName,
outputPath,
hdfsEnvironment,
new HdfsContext(session, schemaName, tableName),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should pass hdfsContext in directly

icebergSchema,
fileColumnTypes,
getCompressionCodec(session));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public class IcebergPageSink

@SuppressWarnings({"FieldCanBeLocal", "FieldMayBeStatic"})
private final int maxOpenWriters = 100; // TODO: make this configurable
private final String schemaName;
private final String tableName;
private final Schema outputSchema;
private final PartitionSpec partitionSpec;
private final String outputPath;
Expand All @@ -102,6 +104,8 @@ public class IcebergPageSink
private long validationCpuNanos;

public IcebergPageSink(
String schemaName,
String tableName,
Schema outputSchema,
PartitionSpec partitionSpec,
String outputPath,
Expand All @@ -114,6 +118,8 @@ public IcebergPageSink(
ConnectorSession session,
FileFormat fileFormat)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
requireNonNull(inputColumns, "inputColumns is null");
this.outputSchema = requireNonNull(outputSchema, "outputSchema is null");
this.partitionSpec = requireNonNull(partitionSpec, "partitionSpec is null");
Expand Down Expand Up @@ -307,6 +313,8 @@ private WriteContext createWriter(Optional<String> partitionPath, Optional<Parti
outputPath = new Path(fileFormat.addExtension(outputPath.toString()));

IcebergFileWriter writer = fileWriterFactory.createFileWriter(
schemaName,
tableName,
outputPath,
outputSchema,
jobConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab
Schema schema = SchemaParser.fromJson(tableHandle.getSchemaAsJson());
PartitionSpec partitionSpec = PartitionSpecParser.fromJson(schema, tableHandle.getPartitionSpecAsJson());
return new IcebergPageSink(
tableHandle.getSchemaName(),
tableHandle.getTableName(),
schema,
partitionSpec,
tableHandle.getOutputPath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.prestosql.plugin.hive.parquet.HdfsParquetDataSource;
import io.prestosql.plugin.hive.parquet.ParquetPageSource;
import io.prestosql.plugin.hive.parquet.ParquetReaderConfig;
import io.prestosql.plugin.iceberg.avro.IcebergAvroPageSource;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorPageSource;
Expand All @@ -55,6 +56,7 @@
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.type.StandardTypes;
import io.prestosql.spi.type.TimeZoneKey;
import io.prestosql.spi.type.Type;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
Expand All @@ -63,6 +65,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
Expand Down Expand Up @@ -167,8 +171,10 @@ public ConnectorPageSource createPageSource(
split.getStart(),
split.getLength(),
split.getFileFormat(),
SchemaParser.fromJson(split.getSchemaAsJson()),
regularColumns,
table.getPredicate());
table.getPredicate(),
session.getTimeZoneKey());

return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, session.getTimeZoneKey());
}
Expand All @@ -180,8 +186,10 @@ private ConnectorPageSource createDataPageSource(
long start,
long length,
FileFormat fileFormat,
Schema fileSchema,
List<IcebergColumnHandle> dataColumns,
TupleDomain<IcebergColumnHandle> predicate)
TupleDomain<IcebergColumnHandle> predicate,
TimeZoneKey timeZoneKey)
{
switch (fileFormat) {
case ORC:
Expand Down Expand Up @@ -227,6 +235,16 @@ private ConnectorPageSource createDataPageSource(
.withMaxReadBlockSize(getParquetMaxReadBlockSize(session)),
predicate,
fileFormatDataSourceStats);
case AVRO:
return new IcebergAvroPageSource(
path.toString(),
start,
length,
hdfsEnvironment,
hdfsContext,
fileSchema,
dataColumns,
timeZoneKey);
}
throw new PrestoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class IcebergSplit
private final long length;
private final FileFormat fileFormat;
private final List<HostAddress> addresses;
private final String schemaAsJson;
private final Map<Integer, String> partitionKeys;

@JsonCreator
Expand All @@ -45,13 +46,15 @@ public IcebergSplit(
@JsonProperty("length") long length,
@JsonProperty("fileFormat") FileFormat fileFormat,
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("schemaAsJson") String schemaAsJson,
@JsonProperty("partitionKeys") Map<Integer, String> partitionKeys)
{
this.path = requireNonNull(path, "path is null");
this.start = start;
this.length = length;
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.schemaAsJson = requireNonNull(schemaAsJson, "schemaAsJson is null");
this.partitionKeys = Collections.unmodifiableMap(requireNonNull(partitionKeys, "partitionKeys is null"));
}

Expand Down Expand Up @@ -92,6 +95,12 @@ public FileFormat getFileFormat()
return fileFormat;
}

@JsonProperty
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add @JsonRawValue here and name the serialized field schema, so that the serialized split will look more natural.

@JsonRawValue
@JsonProperty("schema")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The serialized schema can be large. I think we should make it optional and only pass it for Avro. No need to pay the serialization cost for other formats.

public String getSchemaAsJson()
{
return schemaAsJson;
}

@JsonProperty
public Map<Integer, String> getPartitionKeys()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type;
Expand Down Expand Up @@ -105,6 +106,7 @@ private ConnectorSplit toIcebergSplit(FileScanTask task)
task.length(),
task.file().format(),
ImmutableList.of(),
SchemaParser.toJson(task.spec().schema()),
getPartitionKeys(task));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,12 @@ private TableStatistics makeTableStatistics(ConnectorSession session, ConnectorT
summary.incrementFileCount();
summary.incrementRecordCount(dataFile.recordCount());
summary.incrementSize(dataFile.fileSizeInBytes());
updateSummaryMin(summary, partitionFields, toMap(idToTypeMapping, dataFile.lowerBounds()), dataFile.nullValueCounts(), dataFile.recordCount());
updateSummaryMax(summary, partitionFields, toMap(idToTypeMapping, dataFile.upperBounds()), dataFile.nullValueCounts(), dataFile.recordCount());
summary.updateNullCount(dataFile.nullValueCounts());
updateColumnSizes(summary, dataFile.columnSizes());
if (summary.hasValidColumnMetrics()) {
updateSummaryMin(summary, partitionFields, toMap(idToTypeMapping, dataFile.lowerBounds()), dataFile.nullValueCounts(), dataFile.recordCount());
updateSummaryMax(summary, partitionFields, toMap(idToTypeMapping, dataFile.upperBounds()), dataFile.nullValueCounts(), dataFile.recordCount());
summary.updateNullCount(dataFile.nullValueCounts());
updateColumnSizes(summary, dataFile.columnSizes());
}
}
}
}
Expand All @@ -173,25 +175,27 @@ private TableStatistics makeTableStatistics(ConnectorSession session, ConnectorT

ImmutableMap.Builder<ColumnHandle, ColumnStatistics> columnHandleBuilder = ImmutableMap.builder();
double recordCount = summary.getRecordCount();
for (IcebergColumnHandle columnHandle : idToColumnHandle.values()) {
int fieldId = columnHandle.getId();
ColumnStatistics.Builder columnBuilder = new ColumnStatistics.Builder();
Long nullCount = summary.getNullCounts().get(fieldId);
if (nullCount != null) {
columnBuilder.setNullsFraction(Estimate.of(nullCount / recordCount));
}
if (summary.getColumnSizes() != null) {
Long columnSize = summary.getColumnSizes().get(fieldId);
if (columnSize != null) {
columnBuilder.setDataSize(Estimate.of(columnSize));
if (summary.hasValidColumnMetrics()) {
for (IcebergColumnHandle columnHandle : idToColumnHandle.values()) {
int fieldId = columnHandle.getId();
ColumnStatistics.Builder columnBuilder = new ColumnStatistics.Builder();
Long nullCount = summary.getNullCounts().get(fieldId);
if (nullCount != null) {
columnBuilder.setNullsFraction(Estimate.of(nullCount / recordCount));
}
if (summary.getColumnSizes() != null) {
Long columnSize = summary.getColumnSizes().get(fieldId);
if (columnSize != null) {
columnBuilder.setDataSize(Estimate.of(columnSize));
}
}
Object min = summary.getMinValues().get(fieldId);
Object max = summary.getMaxValues().get(fieldId);
if (min instanceof Number && max instanceof Number) {
columnBuilder.setRange(Optional.of(new DoubleRange(((Number) min).doubleValue(), ((Number) max).doubleValue())));
}
columnHandleBuilder.put(columnHandle, columnBuilder.build());
}
Object min = summary.getMinValues().get(fieldId);
Object max = summary.getMaxValues().get(fieldId);
if (min instanceof Number && max instanceof Number) {
columnBuilder.setRange(Optional.of(new DoubleRange(((Number) min).doubleValue(), ((Number) max).doubleValue())));
}
columnHandleBuilder.put(columnHandle, columnBuilder.build());
}
return new TableStatistics(Estimate.of(recordCount), columnHandleBuilder.build());
}
Expand Down
Loading