diff --git a/pom.xml b/pom.xml
index f22289540e7d..6e584d8f1e3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -438,7 +438,7 @@
io.prestosql.orc
orc-protobuf
- 9
+ 10
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/OrcFileWriter.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/OrcFileWriter.java
index 9b8a20412d2c..cc3d82337361 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/OrcFileWriter.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/OrcFileWriter.java
@@ -21,6 +21,7 @@
import io.prestosql.orc.OrcWriterOptions;
import io.prestosql.orc.OrcWriterStats;
import io.prestosql.orc.metadata.CompressionKind;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.spi.Page;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.block.Block;
@@ -65,6 +66,7 @@ public OrcFileWriter(
Callable rollbackAction,
List columnNames,
List fileColumnTypes,
+ List flattenedOrcTypes,
CompressionKind compression,
OrcWriterOptions options,
boolean writeLegacyVersion,
@@ -81,6 +83,7 @@ public OrcFileWriter(
orcDataSink,
columnNames,
fileColumnTypes,
+ flattenedOrcTypes,
compression,
options,
writeLegacyVersion,
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/OrcFileWriterFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/OrcFileWriterFactory.java
index 2d51437653a4..6eea94bbcdba 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/OrcFileWriterFactory.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/OrcFileWriterFactory.java
@@ -21,6 +21,7 @@
import io.prestosql.orc.OrcWriterStats;
import io.prestosql.orc.OutputStreamOrcDataSink;
import io.prestosql.orc.metadata.CompressionKind;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.plugin.hive.metastore.StorageFormat;
import io.prestosql.plugin.hive.orc.HdfsOrcDataSource;
import io.prestosql.spi.PrestoException;
@@ -140,6 +141,7 @@ public Optional createFileWriter(
List fileColumnTypes = getColumnTypes(schema).stream()
.map(hiveType -> hiveType.getType(typeManager))
.collect(toList());
+ List flattenedOrcTypes = OrcType.createOrcRowType(0, fileColumnNames, fileColumnTypes);
int[] fileInputColumnIndexes = fileColumnNames.stream()
.mapToInt(inputColumnNames::indexOf)
@@ -179,6 +181,7 @@ public Optional createFileWriter(
rollbackAction,
fileColumnNames,
fileColumnTypes,
+ flattenedOrcTypes,
compression,
orcWriterOptions
.withStripeMinSize(getOrcOptimizedWriterMinStripeSize(session))
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/util/TempFileWriter.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/util/TempFileWriter.java
index df45b9e9fc47..e8ae9d2e2466 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/util/TempFileWriter.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/util/TempFileWriter.java
@@ -20,6 +20,7 @@
import io.prestosql.orc.OrcWriter;
import io.prestosql.orc.OrcWriterOptions;
import io.prestosql.orc.OrcWriterStats;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.spi.Page;
import io.prestosql.spi.type.Type;
@@ -77,6 +78,7 @@ private static OrcWriter createOrcFileWriter(OrcDataSink sink, List types)
sink,
columnNames,
types,
+ OrcType.createOrcRowType(0, columnNames, types),
LZ4,
new OrcWriterOptions()
.withMaxStringStatisticsLimit(new DataSize(0, BYTE))
diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java
index 3d58b4e514ff..8e252d528b65 100644
--- a/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java
+++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java
@@ -19,6 +19,7 @@
import io.prestosql.orc.OrcWriterOptions;
import io.prestosql.orc.OrcWriterStats;
import io.prestosql.orc.OutputStreamOrcDataSink;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.plugin.hive.FileFormatDataSourceStats;
import io.prestosql.plugin.hive.GenericHiveRecordCursorProvider;
import io.prestosql.plugin.hive.HdfsEnvironment;
@@ -447,6 +448,7 @@ public PrestoOrcFormatWriter(File targetFile, List columnNames, Listpresto-parquet
+
+ io.prestosql
+ presto-orc
+
+
com.google.guava
guava
@@ -166,6 +171,18 @@
+
+ ${dep.iceberg.groupId}
+ iceberg-orc
+ ${dep.iceberg.version}
+
+
+ org.apache.orc
+ orc-core
+
+
+
+
io.prestosql
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java
index 2d407bb25387..4b129d6ae5cc 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java
@@ -51,6 +51,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
@@ -87,8 +88,8 @@
import static io.prestosql.plugin.iceberg.IcebergUtil.isIcebergTable;
import static io.prestosql.plugin.iceberg.PartitionFields.parsePartitionFields;
import static io.prestosql.plugin.iceberg.PartitionFields.toPartitionFields;
-import static io.prestosql.plugin.iceberg.TypeConveter.toIcebergType;
-import static io.prestosql.plugin.iceberg.TypeConveter.toPrestoType;
+import static io.prestosql.plugin.iceberg.TypeConverter.toIcebergType;
+import static io.prestosql.plugin.iceberg.TypeConverter.toPrestoType;
import static io.prestosql.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.prestosql.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
@@ -99,6 +100,7 @@
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.iceberg.TableMetadata.newTableMetadata;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.Transactions.createTableTransaction;
public class IcebergMetadata
@@ -290,7 +292,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
throw new TableAlreadyExistsException(schemaTableName);
}
- TableMetadata metadata = newTableMetadata(operations, schema, partitionSpec, targetPath.toString());
+ FileFormat fileFormat = (FileFormat) tableMetadata.getProperties().get(FILE_FORMAT_PROPERTY);
+ TableMetadata metadata = newTableMetadata(operations, schema, partitionSpec, targetPath.toString(), ImmutableMap.of(DEFAULT_FILE_FORMAT, fileFormat.toString()));
transaction = createTableTransaction(operations, metadata);
@@ -301,7 +304,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
PartitionSpecParser.toJson(metadata.spec()),
getColumns(metadata.schema(), metadata.spec(), typeManager),
targetPath.toString(),
- getFileFormat(tableMetadata.getProperties()));
+ fileFormat);
}
@Override
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java
index 1f28a8cb9bea..ad2061237ad7 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java
@@ -14,13 +14,27 @@
package io.prestosql.plugin.iceberg;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
+import io.prestosql.orc.OrcDataSink;
+import io.prestosql.orc.OrcDataSource;
+import io.prestosql.orc.OrcWriteValidation;
+import io.prestosql.orc.OrcWriterOptions;
+import io.prestosql.orc.OrcWriterStats;
+import io.prestosql.orc.OutputStreamOrcDataSink;
+import io.prestosql.orc.metadata.CompressionKind;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.plugin.hive.HiveConfig;
import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveMetadata;
import io.prestosql.plugin.hive.HiveStorageFormat;
+import io.prestosql.plugin.hive.NodeVersion;
+import io.prestosql.plugin.hive.OrcFileWriter;
+import io.prestosql.plugin.hive.OrcFileWriterConfig;
import io.prestosql.plugin.hive.RecordFileWriter;
import io.prestosql.plugin.iceberg.PartitionTransforms.ColumnTransform;
import io.prestosql.spi.Page;
@@ -34,6 +48,7 @@
import io.prestosql.spi.type.StandardTypes;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.TypeManager;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.mapred.JobConf;
@@ -44,9 +59,14 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.orc.OrcMetrics;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.OrcConf;
+import org.joda.time.DateTimeZone;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -54,24 +74,38 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.Slices.wrappedBuffer;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcOptimizedWriterMaxDictionaryMemory;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcOptimizedWriterMaxStripeRows;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcOptimizedWriterMaxStripeSize;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcOptimizedWriterMinStripeSize;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcOptimizedWriterValidateMode;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcStringStatisticsLimit;
import static io.prestosql.plugin.hive.ParquetRecordWriterUtil.setParquetSchema;
import static io.prestosql.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static io.prestosql.plugin.hive.util.ConfigurationUtils.toJobConf;
import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_TOO_MANY_OPEN_PARTITIONS;
import static io.prestosql.plugin.iceberg.PartitionTransforms.getColumnTransform;
+import static io.prestosql.plugin.iceberg.TypeConverter.toOrcStructType;
+import static io.prestosql.plugin.iceberg.TypeConverter.toPrestoType;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.prestosql.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.prestosql.spi.type.Decimals.readBigDecimal;
import static java.lang.Float.intBitsToFloat;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
+import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -104,6 +138,10 @@ public class IcebergPageSink
private long systemMemoryUsage;
private long validationCpuNanos;
+ private final HiveConfig hiveConfig;
+ private final OrcFileWriterConfig orcFileWriterConfig;
+ private final NodeVersion nodeVersion;
+
public IcebergPageSink(
Schema outputSchema,
PartitionSpec partitionSpec,
@@ -115,7 +153,10 @@ public IcebergPageSink(
TypeManager typeManager,
JsonCodec jsonCodec,
ConnectorSession session,
- FileFormat fileFormat)
+ FileFormat fileFormat,
+ HiveConfig hiveConfig,
+ OrcFileWriterConfig orcFileWriterConfig,
+ NodeVersion nodeVersion)
{
requireNonNull(inputColumns, "inputColumns is null");
this.outputSchema = requireNonNull(outputSchema, "outputSchema is null");
@@ -130,6 +171,9 @@ public IcebergPageSink(
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.inputColumns = ImmutableList.copyOf(inputColumns);
this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(typeManager, inputColumns, partitionSpec));
+ this.hiveConfig = requireNonNull(hiveConfig, "hiveConfig is null");
+ this.orcFileWriterConfig = requireNonNull(orcFileWriterConfig, "orcFileWriterConfig is null");
+ this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
}
@Override
@@ -321,6 +365,8 @@ private HiveFileWriter createWriter(Path outputPath)
switch (fileFormat) {
case PARQUET:
return createParquetWriter(outputPath);
+ case ORC:
+ return createOrcWriter(outputPath);
}
throw new PrestoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat);
}
@@ -356,6 +402,9 @@ private Metrics readMetrics(Path path)
switch (fileFormat) {
case PARQUET:
return ParquetUtil.fileMetrics(HadoopInputFile.fromPath(path, jobConf), MetricsConfig.getDefault());
+ case ORC:
+ // TODO(review): confirm OrcMetrics.fromInputFile yields metrics equivalent to the Parquet path (row counts, null counts, bounds) before relying on them for pruning
+ return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(path, jobConf), jobConf);
}
throw new PrestoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat);
}
@@ -543,4 +592,71 @@ public Function getBlockTransform()
return blockTransform;
}
}
+
+ // Creates an ORC file writer for an Iceberg data file at the given path, wiring
+ // Presto's native ORC writer (io.prestosql.orc) with Hive session/config settings.
+ // Column names/types come from the Iceberg output schema; the flattened ORC type
+ // list is derived from the schema's struct so field IDs survive into the footer.
+ // Throws PrestoException(HIVE_WRITER_OPEN_ERROR) if the file cannot be created.
+ private HiveFileWriter createOrcWriter(Path path)
+ {
+ try {
+ FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getUser(), path, jobConf);
+ OrcDataSink orcDataSink = new OutputStreamOrcDataSink(fileSystem.create(path));
+ // Rollback deletes the partially-written file (non-recursive) on abort.
+ Callable rollbackAction = () -> {
+ fileSystem.delete(path, false);
+ return null;
+ };
+ List columnFields = outputSchema.columns();
+ List columnNames = columnFields.stream().map(Types.NestedField::name).collect(toImmutableList());
+ List columnTypes = columnFields.stream().map(Types.NestedField::type).map(type -> toPrestoType(type, typeManager)).collect(toImmutableList());
+ // Flatten the Iceberg struct into the ORC type tree rooted at column 0;
+ // NOTE(review): the -1 argument's meaning is defined by TypeConverter.toOrcStructType — confirm it denotes "no parent field id".
+ List flattenedOrcTypes = toOrcStructType(0, outputSchema.asStruct(), -1);
+ CompressionKind compression = getCompression(jobConf);
+ // Session properties override the static writer config for stripe/dictionary limits.
+ OrcWriterOptions orcWriterOptions = orcFileWriterConfig.toOrcWriterOptions()
+ .withStripeMinSize(getOrcOptimizedWriterMinStripeSize(session))
+ .withStripeMaxSize(getOrcOptimizedWriterMaxStripeSize(session))
+ .withStripeMaxRowCount(getOrcOptimizedWriterMaxStripeRows(session))
+ .withDictionaryMaxMemory(getOrcOptimizedWriterMaxDictionaryMemory(session))
+ .withMaxStringStatisticsLimit(getOrcStringStatisticsLimit(session));
+ boolean writeLegacyVersion = hiveConfig.isOrcWriteLegacyVersion();
+ // Identity mapping: every schema column is written, in schema order.
+ int[] fileInputColumnIndexes = IntStream.range(0, columnNames.size()).toArray();
+ // Stamp the file footer with the writing Presto version and query id.
+ Map metadata = ImmutableMap.builder()
+ .put(HiveMetadata.PRESTO_VERSION_NAME, nodeVersion.toString())
+ .put(HiveMetadata.PRESTO_QUERY_ID_NAME, session.getQueryId())
+ .build();
+ DateTimeZone hiveStorageTimeZone = hiveConfig.getDateTimeZone();
+ // Write validation is disabled here (empty input factory); only the mode is taken from the session.
+ Optional> validationInputFactory = Optional.empty();
+ OrcWriteValidation.OrcWriteValidationMode validationMode = getOrcOptimizedWriterValidateMode(session);
+ OrcWriterStats stats = new OrcWriterStats();
+ return new OrcFileWriter(
+ orcDataSink,
+ rollbackAction,
+ columnNames,
+ columnTypes,
+ flattenedOrcTypes,
+ compression,
+ orcWriterOptions,
+ writeLegacyVersion,
+ fileInputColumnIndexes,
+ metadata,
+ hiveStorageTimeZone,
+ validationInputFactory,
+ validationMode,
+ stats);
+ }
+ catch (IOException e) {
+ throw new PrestoException(HIVE_WRITER_OPEN_ERROR, "Error creating ORC file", e);
+ }
+ }
+
+ // Resolves the ORC compression codec from the job configuration (orc.compress).
+ // Defaults to ZLIB when the property is unset; throws
+ // PrestoException(HIVE_UNSUPPORTED_FORMAT) for an unrecognized codec name.
+ private static CompressionKind getCompression(JobConf configuration)
+ {
+ String compressionName = OrcConf.COMPRESS.getString(configuration);
+ if (compressionName == null) {
+ return CompressionKind.ZLIB;
+ }
+ CompressionKind compression;
+ try {
+ // Locale-independent uppercasing so the match does not vary with the JVM default locale.
+ compression = CompressionKind.valueOf(compressionName.toUpperCase(ENGLISH));
+ }
+ catch (IllegalArgumentException e) {
+ throw new PrestoException(HIVE_UNSUPPORTED_FORMAT, "Unknown ORC compression type " + compressionName);
+ }
+ return compression;
+ }
}
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSinkProvider.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSinkProvider.java
index 6556e9091589..46e2caf5e32b 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSinkProvider.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSinkProvider.java
@@ -16,6 +16,9 @@
import io.airlift.json.JsonCodec;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
+import io.prestosql.plugin.hive.HiveConfig;
+import io.prestosql.plugin.hive.NodeVersion;
+import io.prestosql.plugin.hive.OrcFileWriterConfig;
import io.prestosql.spi.PageIndexerFactory;
import io.prestosql.spi.connector.ConnectorInsertTableHandle;
import io.prestosql.spi.connector.ConnectorOutputTableHandle;
@@ -40,18 +43,27 @@ public class IcebergPageSinkProvider
private final JsonCodec jsonCodec;
private final TypeManager typeManager;
private final PageIndexerFactory pageIndexerFactory;
+ private final HiveConfig hiveConfig;
+ private final OrcFileWriterConfig orcFileWriterConfig;
+ private final NodeVersion nodeVersion;
@Inject
public IcebergPageSinkProvider(
HdfsEnvironment hdfsEnvironment,
JsonCodec jsonCodec,
TypeManager typeManager,
- PageIndexerFactory pageIndexerFactory)
+ PageIndexerFactory pageIndexerFactory,
+ HiveConfig hiveConfig,
+ OrcFileWriterConfig orcFileWriterConfig,
+ NodeVersion nodeVersion)
{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
+ this.hiveConfig = requireNonNull(hiveConfig, "hiveConfig is null");
+ this.orcFileWriterConfig = requireNonNull(orcFileWriterConfig, "orcFileWriterConfig is null");
+ this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
}
@Override
@@ -82,6 +94,9 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab
typeManager,
jsonCodec,
session,
- tableHandle.getFileFormat());
+ tableHandle.getFileFormat(),
+ hiveConfig,
+ orcFileWriterConfig,
+ nodeVersion);
}
}
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java
index ec94532953af..749a894fdb98 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java
@@ -17,6 +17,13 @@
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import io.prestosql.memory.context.AggregatedMemoryContext;
+import io.prestosql.orc.OrcDataSource;
+import io.prestosql.orc.OrcDataSourceId;
+import io.prestosql.orc.OrcPredicate;
+import io.prestosql.orc.OrcReader;
+import io.prestosql.orc.OrcRecordReader;
+import io.prestosql.orc.TupleDomainOrcPredicate;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.parquet.ParquetCorruptionException;
import io.prestosql.parquet.ParquetDataSource;
import io.prestosql.parquet.RichColumnDescriptor;
@@ -27,9 +34,12 @@
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.plugin.hive.HiveConfig;
import io.prestosql.plugin.hive.HivePageSource;
import io.prestosql.plugin.hive.HivePageSourceProvider.ColumnMapping;
import io.prestosql.plugin.hive.HivePartitionKey;
+import io.prestosql.plugin.hive.orc.HdfsOrcDataSource;
+import io.prestosql.plugin.hive.orc.OrcPageSource;
import io.prestosql.plugin.hive.parquet.ParquetPageSource;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ColumnHandle;
@@ -43,6 +53,7 @@
import io.prestosql.spi.type.TypeManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockMissingException;
@@ -57,6 +68,7 @@
import javax.inject.Inject;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -67,8 +79,10 @@
import java.util.Properties;
import java.util.stream.Collectors;
+import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.prestosql.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
+import static io.prestosql.orc.OrcReader.INITIAL_BATCH_SIZE;
import static io.prestosql.parquet.ParquetTypeUtils.getColumnIO;
import static io.prestosql.parquet.ParquetTypeUtils.getDescriptors;
import static io.prestosql.parquet.predicate.PredicateUtils.buildPredicate;
@@ -76,13 +90,23 @@
import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_BAD_DATA;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_MISSING_DATA;
import static io.prestosql.plugin.hive.HivePageSourceProvider.ColumnMapping.buildColumnMappings;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcLazyReadSmallRanges;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcMaxBufferSize;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcMaxMergeDistance;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcMaxReadBlockSize;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcStreamBufferSize;
+import static io.prestosql.plugin.hive.HiveSessionProperties.getOrcTinyStripeThreshold;
+import static io.prestosql.plugin.hive.HiveSessionProperties.isOrcBloomFiltersEnabled;
import static io.prestosql.plugin.hive.parquet.HdfsParquetDataSource.buildHdfsParquetDataSource;
import static io.prestosql.plugin.hive.parquet.ParquetPageSourceFactory.getParquetTupleDomain;
import static io.prestosql.plugin.hive.parquet.ParquetPageSourceFactory.getParquetType;
import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getParquetMaxReadBlockSize;
import static io.prestosql.plugin.iceberg.IcebergSessionProperties.isFailOnCorruptedParquetStatistics;
+import static io.prestosql.plugin.iceberg.IcebergUtil.ICEBERG_FIELD_ID_KEY;
+import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
@@ -94,15 +118,19 @@ public class IcebergPageSourceProvider
private final TypeManager typeManager;
private final FileFormatDataSourceStats fileFormatDataSourceStats;
+ private final HiveConfig hiveConfig;
+
@Inject
public IcebergPageSourceProvider(
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
- FileFormatDataSourceStats fileFormatDataSourceStats)
+ FileFormatDataSourceStats fileFormatDataSourceStats,
+ HiveConfig hiveConfig)
{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null");
+ this.hiveConfig = requireNonNull(hiveConfig, "hiveConfig is null");
}
@Override
@@ -118,22 +146,57 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti
.map(HiveColumnHandle.class::cast)
.collect(toList());
HdfsContext hdfsContext = new HdfsContext(session, table.getSchemaName(), table.getTableName());
- return createParquetPageSource(
- hdfsEnvironment,
- session.getUser(),
- hdfsEnvironment.getConfiguration(hdfsContext, path),
- path,
- start,
- length,
- hiveColumns,
- split.getNameToId(),
- false,
- typeManager,
- getParquetMaxReadBlockSize(session),
- isFailOnCorruptedParquetStatistics(session),
- split.getPredicate(),
- split.getPartitionKeys(),
- fileFormatDataSourceStats);
+ switch (split.getFileFormat()) {
+ case PARQUET:
+ return createParquetPageSource(
+ hdfsEnvironment,
+ session.getUser(),
+ hdfsEnvironment.getConfiguration(hdfsContext, path),
+ path,
+ start,
+ length,
+ hiveColumns,
+ split.getNameToId(),
+ false,
+ typeManager,
+ getParquetMaxReadBlockSize(session),
+ isFailOnCorruptedParquetStatistics(session),
+ split.getPredicate(),
+ split.getPartitionKeys(),
+ fileFormatDataSourceStats);
+ case ORC:
+ try {
+ FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, path);
+ FileStatus fileStatus = fileSystem.getFileStatus(path);
+ long fileSize = fileStatus.getLen();
+ return createOrcPageSource(
+ hdfsEnvironment,
+ session.getUser(),
+ hdfsEnvironment.getConfiguration(hdfsContext, path),
+ path,
+ start,
+ length,
+ fileSize,
+ hiveColumns,
+ split.getNameToId(),
+ split.getPredicate(),
+ hiveConfig.getDateTimeZone(),
+ typeManager,
+ getOrcMaxMergeDistance(session),
+ getOrcMaxBufferSize(session),
+ getOrcStreamBufferSize(session),
+ getOrcTinyStripeThreshold(session),
+ getOrcMaxReadBlockSize(session),
+ getOrcLazyReadSmallRanges(session),
+ isOrcBloomFiltersEnabled(session),
+ split.getPartitionKeys(),
+ fileFormatDataSourceStats);
+ }
+ catch (IOException e) {
+ throw new PrestoException(HIVE_FILESYSTEM_ERROR, e);
+ }
+ }
+ throw new PrestoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + split.getFileFormat());
}
// TODO: move column rename handling out and reuse ParquetPageSourceFactory.createPageSource()
@@ -310,4 +373,155 @@ private static Map convertToParquetNames(List columns,
+ Map icebergNameToId,
+ TupleDomain effectivePredicate,
+ DateTimeZone hiveStorageTimeZone,
+ TypeManager typeManager,
+ DataSize maxMergeDistance,
+ DataSize maxBufferSize,
+ DataSize streamBufferSize,
+ DataSize tinyStripeThreshold,
+ DataSize maxReadBlockSize,
+ boolean lazyReadSmallRanges,
+ boolean orcBloomFiltersEnabled,
+ List partitionKeys,
+ FileFormatDataSourceStats stats)
+ {
+ OrcDataSource orcDataSource;
+ try {
+ FileSystem fileSystem = hdfsEnvironment.getFileSystem(user, path, configuration);
+ FSDataInputStream inputStream = hdfsEnvironment.doAs(user, () -> fileSystem.open(path));
+ orcDataSource = new HdfsOrcDataSource(
+ new OrcDataSourceId(path.toString()),
+ fileSize,
+ maxMergeDistance,
+ maxBufferSize,
+ streamBufferSize,
+ lazyReadSmallRanges,
+ inputStream,
+ stats);
+ }
+ catch (Exception e) {
+ if (nullToEmpty(e.getMessage()).trim().equals("Filesystem closed") ||
+ e instanceof FileNotFoundException) {
+ throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, e);
+ }
+ throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, format("Error opening Hive split %s (offset=%s, length=%s): %s", path, start, length, e.getMessage()));
+ }
+
+ AggregatedMemoryContext systemMemoryUsage = newSimpleAggregatedMemoryContext();
+ OrcPageSource orcPageSource;
+ try {
+ OrcReader reader = new OrcReader(orcDataSource, maxMergeDistance, tinyStripeThreshold, maxReadBlockSize);
+
+ List flattenedOrcTypes = reader.getFooter().getTypes();
+ OrcType rootType = flattenedOrcTypes.get(0);
+ ImmutableMap.Builder builder = ImmutableMap.builder();
+ for (int physicalOrdinal = 0; physicalOrdinal < rootType.getFieldCount(); physicalOrdinal++) {
+ int index = rootType.getFieldTypeIndex(physicalOrdinal);
+ Map attributes = flattenedOrcTypes.get(index).getAttributes();
+ if (attributes.containsKey(ICEBERG_FIELD_ID_KEY)) {
+ builder.put(Integer.valueOf(attributes.get(ICEBERG_FIELD_ID_KEY)), physicalOrdinal);
+ }
+ }
+ ImmutableMap icebergIdToPhysicalOrdinal = builder.build();
+
+ int nextMissingColumnIndex = rootType.getFieldCount();
+
+ List physicalColumns;
+ if (icebergIdToPhysicalOrdinal.isEmpty()) {
+ physicalColumns = columns.stream().filter(column -> column.getColumnType() == REGULAR).collect(toImmutableList());
+ }
+ else {
+ ImmutableList.Builder columnsBuilder = ImmutableList.builder();
+ for (HiveColumnHandle column : columns) {
+ if (column.getColumnType() == REGULAR) {
+ int physicalOrdinal;
+ String name = column.getName();
+ Integer id = icebergNameToId.get(name);
+ if (icebergIdToPhysicalOrdinal.containsKey(id)) {
+ physicalOrdinal = icebergIdToPhysicalOrdinal.get(id);
+ }
+ else {
+ physicalOrdinal = nextMissingColumnIndex;
+ nextMissingColumnIndex++;
+ }
+ columnsBuilder.add(new HiveColumnHandle(column.getName(), column.getHiveType(), column.getTypeSignature(), physicalOrdinal, column.getColumnType(), column.getComment()));
+ }
+ }
+ physicalColumns = columnsBuilder.build();
+ }
+
+ ImmutableMap.Builder includedColumns = ImmutableMap.builder();
+ ImmutableList.Builder> columnReferences = ImmutableList.builder();
+ for (HiveColumnHandle column : physicalColumns) {
+ if (column.getColumnType() == REGULAR) {
+ io.prestosql.spi.type.Type type = typeManager.getType(column.getTypeSignature());
+ includedColumns.put(column.getHiveColumnIndex(), type);
+ columnReferences.add(new TupleDomainOrcPredicate.ColumnReference<>(column, column.getHiveColumnIndex(), type));
+ }
+ }
+
+ OrcPredicate predicate = new TupleDomainOrcPredicate<>(effectivePredicate, columnReferences.build(), orcBloomFiltersEnabled);
+
+ OrcRecordReader recordReader = reader.createRecordReader(
+ includedColumns.build(),
+ predicate,
+ start,
+ length,
+ hiveStorageTimeZone,
+ systemMemoryUsage,
+ INITIAL_BATCH_SIZE);
+
+ orcPageSource = new OrcPageSource(
+ recordReader,
+ orcDataSource,
+ physicalColumns,
+ typeManager,
+ systemMemoryUsage,
+ stats);
+ }
+ catch (Exception e) {
+ try {
+ orcDataSource.close();
+ }
+ catch (IOException ignored) {
+ }
+ if (e instanceof PrestoException) {
+ throw (PrestoException) e;
+ }
+ String message = format("Error opening Iceberg split %s (offset=%s, length=%s): %s", path, start, length, e.getMessage());
+ if (e instanceof BlockMissingException) {
+ throw new PrestoException(HIVE_MISSING_DATA, message, e);
+ }
+ throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, message, e);
+ }
+
+ List columnMappings = buildColumnMappings(
+ partitionKeys,
+ columns.stream()
+ .filter(column -> !column.isHidden())
+ .collect(toImmutableList()),
+ ImmutableList.of(),
+ ImmutableMap.of(),
+ path,
+ OptionalInt.empty());
+
+ return new HivePageSource(
+ columnMappings,
+ Optional.empty(),
+ hiveStorageTimeZone,
+ typeManager,
+ orcPageSource);
+ }
}
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSessionProperties.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSessionProperties.java
index 0df1b982199d..422d6cb16242 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSessionProperties.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSessionProperties.java
@@ -27,6 +27,8 @@
import static io.prestosql.spi.session.PropertyMetadata.booleanProperty;
import static io.prestosql.spi.session.PropertyMetadata.dataSizeProperty;
+import static io.prestosql.spi.session.PropertyMetadata.integerProperty;
+import static io.prestosql.spi.session.PropertyMetadata.stringProperty;
public final class IcebergSessionProperties
{
@@ -34,6 +36,19 @@ public final class IcebergSessionProperties
private static final String PARQUET_MAX_READ_BLOCK_SIZE = "parquet_max_read_block_size";
private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
+ private static final String ORC_OPTIMIZED_WRITER_MIN_STRIPE_SIZE = "orc_optimized_writer_min_stripe_size";
+ private static final String ORC_OPTIMIZED_WRITER_MAX_STRIPE_SIZE = "orc_optimized_writer_max_stripe_size";
+ private static final String ORC_OPTIMIZED_WRITER_MAX_STRIPE_ROWS = "orc_optimized_writer_max_stripe_rows";
+ private static final String ORC_OPTIMIZED_WRITER_MAX_DICTIONARY_MEMORY = "orc_optimized_writer_max_dictionary_memory";
+ private static final String ORC_STRING_STATISTICS_LIMIT = "orc_string_statistics_limit";
+ private static final String ORC_OPTIMIZED_WRITER_VALIDATE_MODE = "orc_optimized_writer_validate_mode";
+ private static final String ORC_MAX_MERGE_DISTANCE = "orc_max_merge_distance";
+ private static final String ORC_MAX_BUFFER_SIZE = "orc_max_buffer_size";
+ private static final String ORC_STREAM_BUFFER_SIZE = "orc_stream_buffer_size";
+ private static final String ORC_TINY_STRIPE_THRESHOLD = "orc_tiny_stripe_threshold";
+ private static final String ORC_MAX_READ_BLOCK_SIZE = "orc_max_read_block_size";
+ private static final String ORC_LAZY_READ_SMALL_RANGES = "orc_lazy_read_small_ranges";
+ private static final String ORC_BLOOM_FILTERS_ENABLED = "orc_bloom_filters_enabled";
private final List> sessionProperties;
@@ -61,6 +76,71 @@ public IcebergSessionProperties(HiveConfig hiveConfig, OrcFileWriterConfig orcFi
"Parquet: Writer page size",
parquetFileWriterConfig.getPageSize(),
false))
+ .add(dataSizeProperty(
+ ORC_OPTIMIZED_WRITER_MIN_STRIPE_SIZE,
+ "Experimental: ORC: Min stripe size",
+ orcFileWriterConfig.getStripeMinSize(),
+ false))
+ .add(dataSizeProperty(
+ ORC_OPTIMIZED_WRITER_MAX_STRIPE_SIZE,
+ "Experimental: ORC: Max stripe size",
+ orcFileWriterConfig.getStripeMaxSize(),
+ false))
+ .add(integerProperty(
+ ORC_OPTIMIZED_WRITER_MAX_STRIPE_ROWS,
+ "Experimental: ORC: Max stripe row count",
+ orcFileWriterConfig.getStripeMaxRowCount(),
+ false))
+ .add(dataSizeProperty(
+ ORC_OPTIMIZED_WRITER_MAX_DICTIONARY_MEMORY,
+ "Experimental: ORC: Max dictionary memory",
+ orcFileWriterConfig.getDictionaryMaxMemory(),
+ false))
+ .add(dataSizeProperty(
+ ORC_STRING_STATISTICS_LIMIT,
+ "ORC: Maximum size of string statistics; drop if exceeding",
+ orcFileWriterConfig.getStringStatisticsLimit(),
+ false))
+ .add(stringProperty(
+ ORC_OPTIMIZED_WRITER_VALIDATE_MODE,
+ "Experimental: ORC: Level of detail in ORC validation",
+ hiveConfig.getOrcWriterValidationMode().toString(),
+ false))
+ .add(dataSizeProperty(
+ ORC_MAX_MERGE_DISTANCE,
+ "ORC: Maximum size of gap between two reads to merge into a single read",
+ hiveConfig.getOrcMaxMergeDistance(),
+ false))
+ .add(dataSizeProperty(
+ ORC_MAX_BUFFER_SIZE,
+ "ORC: Maximum size of a single read",
+ hiveConfig.getOrcMaxBufferSize(),
+ false))
+ .add(dataSizeProperty(
+ ORC_STREAM_BUFFER_SIZE,
+ "ORC: Size of buffer for streaming reads",
+ hiveConfig.getOrcStreamBufferSize(),
+ false))
+ .add(dataSizeProperty(
+ ORC_TINY_STRIPE_THRESHOLD,
+ "ORC: Threshold below which an ORC stripe or file will read in its entirety",
+ hiveConfig.getOrcTinyStripeThreshold(),
+ false))
+ .add(dataSizeProperty(
+ ORC_MAX_READ_BLOCK_SIZE,
+ "ORC: Soft max size of Presto blocks produced by ORC reader",
+ hiveConfig.getOrcMaxReadBlockSize(),
+ false))
+ .add(booleanProperty(
+ ORC_LAZY_READ_SMALL_RANGES,
+ "Experimental: ORC: Read small file segments lazily",
+ hiveConfig.isOrcLazyReadSmallRanges(),
+ false))
+ .add(booleanProperty(
+ ORC_BLOOM_FILTERS_ENABLED,
+ "ORC: Enable bloom filters for predicate pushdown",
+ hiveConfig.isOrcBloomFiltersEnabled(),
+ false))
.build();
}
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplit.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplit.java
index 8e72a50bd828..d08dc7e84569 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplit.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplit.java
@@ -22,6 +22,7 @@
import io.prestosql.spi.HostAddress;
import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.iceberg.FileFormat;
import java.util.List;
import java.util.Map;
@@ -39,6 +40,7 @@ public class IcebergSplit
private final Map nameToId;
private final TupleDomain predicate;
private final List partitionKeys;
+ private final FileFormat fileFormat;
@JsonCreator
public IcebergSplit(
@@ -48,7 +50,8 @@ public IcebergSplit(
@JsonProperty("addresses") List addresses,
@JsonProperty("nameToId") Map nameToId,
@JsonProperty("predicate") TupleDomain predicate,
- @JsonProperty("partitionKeys") List partitionKeys)
+ @JsonProperty("partitionKeys") List partitionKeys,
+ @JsonProperty("fileFormat") FileFormat fileFormat)
{
this.path = requireNonNull(path, "path is null");
this.start = start;
@@ -57,6 +60,7 @@ public IcebergSplit(
this.nameToId = ImmutableMap.copyOf(requireNonNull(nameToId, "nameToId is null"));
this.predicate = requireNonNull(predicate, "predicate is null");
this.partitionKeys = ImmutableList.copyOf(requireNonNull(partitionKeys, "partitionKeys is null"));
+ this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
}
@Override
@@ -108,6 +112,12 @@ public List getPartitionKeys()
return partitionKeys;
}
+ @JsonProperty
+ public FileFormat getFileFormat()
+ {
+ return fileFormat;
+ }
+
@Override
public Object getInfo()
{
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitManager.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitManager.java
index f520b93469b4..3c5060971bc8 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitManager.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitManager.java
@@ -24,11 +24,13 @@
import io.prestosql.spi.connector.classloader.ClassLoaderSafeConnectorSplitSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import javax.inject.Inject;
+import static io.prestosql.plugin.iceberg.IcebergUtil.getFileFormat;
import static io.prestosql.plugin.iceberg.IcebergUtil.getIcebergTable;
import static io.prestosql.plugin.iceberg.IcebergUtil.getTableScan;
import static java.util.Objects.requireNonNull;
@@ -57,9 +59,11 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co
TableScan tableScan = getTableScan(session, table.getPredicate(), table.getSnapshotId(), icebergTable);
+ FileFormat fileFormat = getFileFormat(icebergTable);
+
// TODO Use residual. Right now there is no way to propagate residual to presto but at least we can
// propagate it at split level so the parquet pushdown can leverage it.
- IcebergSplitSource splitSource = new IcebergSplitSource(tableScan.planTasks(), table.getPredicate(), icebergTable.schema());
+ IcebergSplitSource splitSource = new IcebergSplitSource(tableScan.planTasks(), table.getPredicate(), icebergTable.schema(), fileFormat);
return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader());
}
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitSource.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitSource.java
index e6055dc7b3b1..bfe7a8b83e19 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitSource.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitSource.java
@@ -22,6 +22,7 @@
import io.prestosql.spi.connector.ConnectorSplitSource;
import io.prestosql.spi.predicate.TupleDomain;
import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
@@ -68,10 +69,13 @@ public class IcebergSplitSource
private final Map nameToId;
private final Iterator fileScanIterator;
+ private final FileFormat fileFormat;
+
public IcebergSplitSource(
CloseableIterable combinedScanIterable,
TupleDomain predicate,
- Schema schema)
+ Schema schema,
+ FileFormat fileFormat)
{
this.combinedScanIterable = requireNonNull(combinedScanIterable, "combinedScanIterable is null");
this.predicate = requireNonNull(predicate, "predicate is null");
@@ -83,6 +87,8 @@ public IcebergSplitSource(
.map(CombinedScanTask::files)
.flatMap(Collection::stream)
.iterator();
+
+ this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
}
@Override
@@ -130,7 +136,8 @@ private ConnectorSplit toIcebergSplit(TupleDomain predicate, F
ImmutableList.of(),
nameToId,
predicate,
- getPartitionKeys(task));
+ getPartitionKeys(task),
+ fileFormat);
}
private static List getPartitionKeys(FileScanTask scanTask)
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergUtil.java
index 917d3205cc80..0229d9f18029 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergUtil.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergUtil.java
@@ -47,7 +47,7 @@
import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
import static io.prestosql.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
import static io.prestosql.plugin.hive.HiveType.toHiveType;
-import static io.prestosql.plugin.iceberg.TypeConveter.toPrestoType;
+import static io.prestosql.plugin.iceberg.TypeConverter.toPrestoType;
import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
@@ -57,6 +57,7 @@
final class IcebergUtil
{
+ public static final String ICEBERG_FIELD_ID_KEY = "ICEBERG_FIELD_ID_KEY";
private static final TypeTranslator TYPE_TRANSLATOR = new HiveTypeTranslator();
private IcebergUtil() {}
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionTable.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionTable.java
index e5c2eb00af98..b2c0ee315d38 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionTable.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionTable.java
@@ -63,7 +63,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.prestosql.plugin.iceberg.IcebergUtil.getIdentityPartitions;
-import static io.prestosql.plugin.iceberg.TypeConveter.toPrestoType;
+import static io.prestosql.plugin.iceberg.TypeConverter.toPrestoType;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static java.util.Objects.requireNonNull;
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TypeConverter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TypeConverter.java
new file mode 100644
index 000000000000..577d3356d3ed
--- /dev/null
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TypeConverter.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.iceberg;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.prestosql.orc.metadata.OrcType;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.type.ArrayType;
+import io.prestosql.spi.type.BigintType;
+import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.DateType;
+import io.prestosql.spi.type.DecimalType;
+import io.prestosql.spi.type.DoubleType;
+import io.prestosql.spi.type.IntegerType;
+import io.prestosql.spi.type.MapType;
+import io.prestosql.spi.type.RealType;
+import io.prestosql.spi.type.RowType;
+import io.prestosql.spi.type.StandardTypes;
+import io.prestosql.spi.type.TimeType;
+import io.prestosql.spi.type.TimestampType;
+import io.prestosql.spi.type.TimestampWithTimeZoneType;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import io.prestosql.spi.type.TypeSignature;
+import io.prestosql.spi.type.TypeSignatureParameter;
+import io.prestosql.spi.type.VarbinaryType;
+import io.prestosql.spi.type.VarcharType;
+import org.apache.iceberg.types.Types;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.iceberg.IcebergUtil.ICEBERG_FIELD_ID_KEY;
+import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
+import static java.lang.String.format;
+
+public final class TypeConverter
+{
+ private static final Map, org.apache.iceberg.types.Type> PRESTO_TO_ICEBERG = ImmutableMap., org.apache.iceberg.types.Type>builder()
+ .put(BooleanType.class, Types.BooleanType.get())
+ .put(VarbinaryType.class, Types.BinaryType.get())
+ .put(DateType.class, Types.DateType.get())
+ .put(DoubleType.class, Types.DoubleType.get())
+ .put(BigintType.class, Types.LongType.get())
+ .put(RealType.class, Types.FloatType.get())
+ .put(IntegerType.class, Types.IntegerType.get())
+ .put(TimeType.class, Types.TimeType.get())
+ .put(TimestampType.class, Types.TimestampType.withoutZone())
+ .put(TimestampWithTimeZoneType.class, Types.TimestampType.withZone())
+ .put(VarcharType.class, Types.StringType.get())
+ .build();
+
+ private TypeConverter() {}
+
+ /**
+ * Converts an Iceberg type to the corresponding Presto type.
+ *
+ * UUID and FIXED are surfaced as VARBINARY (note: the deleted TypeConveter
+ * mapped UUID to varchar; this rendition deliberately changes that).
+ * TIMESTAMP maps to the zoned Presto variant only when the Iceberg type
+ * adjusts to UTC.
+ *
+ * @throws UnsupportedOperationException for Iceberg type ids with no Presto equivalent
+ */
+ public static Type toPrestoType(org.apache.iceberg.types.Type type, TypeManager typeManager)
+ {
+ switch (type.typeId()) {
+ case BOOLEAN:
+ return BooleanType.BOOLEAN;
+ case UUID:
+ case BINARY:
+ case FIXED:
+ // all raw-byte representations collapse to VARBINARY
+ return VarbinaryType.VARBINARY;
+ case DATE:
+ return DateType.DATE;
+ case DECIMAL:
+ Types.DecimalType decimalType = (Types.DecimalType) type;
+ return DecimalType.createDecimalType(decimalType.precision(), decimalType.scale());
+ case DOUBLE:
+ return DoubleType.DOUBLE;
+ case LONG:
+ return BigintType.BIGINT;
+ case FLOAT:
+ return RealType.REAL;
+ case INTEGER:
+ return IntegerType.INTEGER;
+ case TIME:
+ return TimeType.TIME;
+ case TIMESTAMP:
+ Types.TimestampType timestampType = (Types.TimestampType) type;
+ if (timestampType.shouldAdjustToUTC()) {
+ return TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
+ }
+ return TimestampType.TIMESTAMP;
+ case STRING:
+ return VarcharType.createUnboundedVarcharType();
+ case LIST:
+ // element type converted recursively
+ Types.ListType listType = (Types.ListType) type;
+ return new ArrayType(toPrestoType(listType.elementType(), typeManager));
+ case MAP:
+ // map types must be materialized through the TypeManager so key equality/hash methods are wired in
+ Types.MapType mapType = (Types.MapType) type;
+ TypeSignature keyType = toPrestoType(mapType.keyType(), typeManager).getTypeSignature();
+ TypeSignature valueType = toPrestoType(mapType.valueType(), typeManager).getTypeSignature();
+ return typeManager.getParameterizedType(StandardTypes.MAP, ImmutableList.of(TypeSignatureParameter.of(keyType), TypeSignatureParameter.of(valueType)));
+ case STRUCT:
+ // struct field names are preserved on the resulting row type
+ List fields = ((Types.StructType) type).fields();
+ return RowType.from(fields.stream()
+ .map(field -> new RowType.Field(Optional.of(field.name()), toPrestoType(field.type(), typeManager)))
+ .collect(toImmutableList()));
+ default:
+ throw new UnsupportedOperationException(format("Cannot convert from Iceberg type '%s' (%s) to Presto type", type, type.typeId()));
+ }
+ }
+
+ /**
+ * Converts a Presto type to the corresponding Iceberg type.
+ *
+ * @throws PrestoException with NOT_SUPPORTED when no Iceberg equivalent exists
+ */
+ public static org.apache.iceberg.types.Type toIcebergType(Type type)
+ {
+ // non-parameterized types are resolved by class lookup; parameterized ones below
+ if (PRESTO_TO_ICEBERG.containsKey(type.getClass())) {
+ return PRESTO_TO_ICEBERG.get(type.getClass());
+ }
+ if (type instanceof DecimalType) {
+ return fromDecimal((DecimalType) type);
+ }
+ if (type instanceof RowType) {
+ return fromRow((RowType) type);
+ }
+ if (type instanceof ArrayType) {
+ return fromArray((ArrayType) type);
+ }
+ if (type instanceof MapType) {
+ return fromMap((MapType) type);
+ }
+ throw new PrestoException(NOT_SUPPORTED, "Type not supported for Iceberg: " + type.getDisplayName());
+ }
+
+ // Presto decimal(p, s) -> Iceberg decimal with the same precision and scale.
+ private static org.apache.iceberg.types.Type fromDecimal(DecimalType type)
+ {
+ return Types.DecimalType.of(type.getPrecision(), type.getScale());
+ }
+
+ // Presto row -> Iceberg struct. Every field must be named; anonymous row fields are rejected.
+ // NOTE(review): field ids are assigned 1..n and restart for each nested struct (and
+ // fromArray/fromMap below hard-code ids 1/2), so ids are not unique across a whole
+ // schema -- confirm callers reassign fresh ids before persisting this schema.
+ private static org.apache.iceberg.types.Type fromRow(RowType type)
+ {
+ List fields = new ArrayList<>();
+ for (RowType.Field field : type.getFields()) {
+ String name = field.getName().orElseThrow(() ->
+ new PrestoException(NOT_SUPPORTED, "Row type field does not have a name: " + type.getDisplayName()));
+ fields.add(Types.NestedField.required(fields.size() + 1, name, toIcebergType(field.getType())));
+ }
+ return Types.StructType.of(fields);
+ }
+
+ // Presto array -> Iceberg list with an optional (nullable) element; element id hard-coded to 1.
+ private static org.apache.iceberg.types.Type fromArray(ArrayType type)
+ {
+ return Types.ListType.ofOptional(1, toIcebergType(type.getElementType()));
+ }
+
+ // Presto map -> Iceberg map with optional values; key/value ids hard-coded to 1/2.
+ private static org.apache.iceberg.types.Type fromMap(MapType type)
+ {
+ return Types.MapType.ofOptional(1, 2, toIcebergType(type.getKeyType()), toIcebergType(type.getValueType()));
+ }
+
+ /**
+ * Converts an Iceberg type to its flattened ORC representation: the ORC type for
+ * {@code type} itself first, followed (for nested types) by the ORC types of its
+ * children in depth-first order. Each ORC type carries the Iceberg field id in its
+ * attributes under ICEBERG_FIELD_ID_KEY so readers can match columns by id rather
+ * than by name.
+ *
+ * @param nextFieldTypeIndex position in the flattened list at which this type's
+ * first child will be placed (only meaningful for STRUCT/LIST/MAP)
+ * @throws PrestoException with NOT_SUPPORTED for Iceberg type ids with no ORC equivalent
+ */
+ private static List<OrcType> toOrcType(int nextFieldTypeIndex, org.apache.iceberg.types.Type type, int fieldId)
+ {
+ switch (type.typeId()) {
+ case BOOLEAN:
+ return primitiveOrcType(OrcType.OrcTypeKind.BOOLEAN, fieldId);
+ case INTEGER:
+ return primitiveOrcType(OrcType.OrcTypeKind.INT, fieldId);
+ case LONG:
+ return primitiveOrcType(OrcType.OrcTypeKind.LONG, fieldId);
+ case FLOAT:
+ return primitiveOrcType(OrcType.OrcTypeKind.FLOAT, fieldId);
+ case DOUBLE:
+ return primitiveOrcType(OrcType.OrcTypeKind.DOUBLE, fieldId);
+ case DATE:
+ return primitiveOrcType(OrcType.OrcTypeKind.DATE, fieldId);
+ case TIME:
+ // ORC has no TIME kind; represented as INT -- NOTE(review): confirm the reader decodes this identically
+ return primitiveOrcType(OrcType.OrcTypeKind.INT, fieldId);
+ case TIMESTAMP:
+ return primitiveOrcType(OrcType.OrcTypeKind.TIMESTAMP, fieldId);
+ case STRING:
+ return primitiveOrcType(OrcType.OrcTypeKind.STRING, fieldId);
+ case UUID:
+ case FIXED:
+ case BINARY:
+ // all raw-byte representations collapse to ORC BINARY
+ return primitiveOrcType(OrcType.OrcTypeKind.BINARY, fieldId);
+ case DECIMAL:
+ // Fix: 'type' is an Iceberg type -- the previous cast to Presto's
+ // io.prestosql.spi.type.DecimalType compiled but always failed with
+ // ClassCastException at runtime; cast to Iceberg's Types.DecimalType instead.
+ Types.DecimalType decimalType = (Types.DecimalType) type;
+ return ImmutableList.of(new OrcType(OrcType.OrcTypeKind.DECIMAL, ImmutableList.of(), ImmutableList.of(), ImmutableMap.of(ICEBERG_FIELD_ID_KEY, Integer.toString(fieldId)), Optional.empty(), Optional.of(decimalType.precision()), Optional.of(decimalType.scale())));
+ case STRUCT:
+ return toOrcStructType(nextFieldTypeIndex, (Types.StructType) type, fieldId);
+ case LIST:
+ return toOrcListType(nextFieldTypeIndex, (Types.ListType) type, fieldId);
+ case MAP:
+ return toOrcMapType(nextFieldTypeIndex, (Types.MapType) type, fieldId);
+ default:
+ throw new PrestoException(NOT_SUPPORTED, format("Unsupported Iceberg type: %s", type));
+ }
+ }
+
+ // Single-element flattened list for a leaf ORC type, tagging the Iceberg field id in the attributes.
+ private static List<OrcType> primitiveOrcType(OrcType.OrcTypeKind kind, int fieldId)
+ {
+ return ImmutableList.of(new OrcType(kind, ImmutableList.of(), ImmutableList.of(), ImmutableMap.of(ICEBERG_FIELD_ID_KEY, Integer.toString(fieldId)), Optional.empty(), Optional.empty(), Optional.empty()));
+ }
+
+ /**
+ * Flattens an Iceberg struct into ORC types: the STRUCT node first, then each
+ * field's subtree in order. The STRUCT node's subtype indexes point at the
+ * position of each field's root within the overall flattened type list.
+ *
+ * @param nextFieldTypeIndex index of this STRUCT node in the flattened list;
+ * children start at the following index
+ */
+ public static List toOrcStructType(int nextFieldTypeIndex, Types.StructType structType, int fieldId)
+ {
+ // skip past this STRUCT node itself; children start at the next slot
+ nextFieldTypeIndex++;
+ List fieldTypeIndexes = new ArrayList<>();
+ List fieldNames = new ArrayList<>();
+ List> fieldTypesList = new ArrayList<>();
+ for (Types.NestedField field : structType.fields()) {
+ fieldTypeIndexes.add(nextFieldTypeIndex);
+ fieldNames.add(field.name());
+ List fieldOrcTypes = toOrcType(nextFieldTypeIndex, field.type(), field.fieldId());
+ fieldTypesList.add(fieldOrcTypes);
+ // advance past this field's entire subtree so the next field's index is correct
+ nextFieldTypeIndex += fieldOrcTypes.size();
+ }
+
+ ImmutableList.Builder orcTypes = ImmutableList.builder();
+ orcTypes.add(new OrcType(
+ OrcType.OrcTypeKind.STRUCT,
+ fieldTypeIndexes,
+ fieldNames,
+ ImmutableMap.of(ICEBERG_FIELD_ID_KEY, Integer.toString(fieldId)),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty()));
+ fieldTypesList.forEach(orcTypes::addAll);
+
+ return orcTypes.build();
+ }
+
+ /**
+ * Flattens an Iceberg list into ORC types: the LIST node first, then the element
+ * subtree. The LIST node's single subtype index points at the element's root.
+ */
+ private static List toOrcListType(int nextFieldTypeIndex, Types.ListType listType, int fieldId)
+ {
+ // skip past this LIST node itself; the element starts at the next slot
+ nextFieldTypeIndex++;
+ List itemTypes = toOrcType(nextFieldTypeIndex, listType.elementType(), listType.elementId());
+
+ List orcTypes = new ArrayList<>();
+ orcTypes.add(new OrcType(
+ OrcType.OrcTypeKind.LIST,
+ ImmutableList.of(nextFieldTypeIndex),
+ ImmutableList.of("item"),
+ ImmutableMap.of(ICEBERG_FIELD_ID_KEY, Integer.toString(fieldId)),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty()));
+
+ orcTypes.addAll(itemTypes);
+ return orcTypes;
+ }
+
+ /**
+ * Flattens an Iceberg map into ORC types: the MAP node first, then the key
+ * subtree, then the value subtree. The MAP node's two subtype indexes point at
+ * the key's root and the value's root respectively.
+ */
+ private static List toOrcMapType(int nextFieldTypeIndex, Types.MapType mapType, int fieldId)
+ {
+ // skip past this MAP node itself; the key starts at the next slot,
+ // the value immediately after the key's whole subtree
+ nextFieldTypeIndex++;
+ List keyTypes = toOrcType(nextFieldTypeIndex, mapType.keyType(), mapType.keyId());
+ List valueTypes = toOrcType(nextFieldTypeIndex + keyTypes.size(), mapType.valueType(), mapType.valueId());
+
+ List orcTypes = new ArrayList<>();
+ orcTypes.add(new OrcType(
+ OrcType.OrcTypeKind.MAP,
+ ImmutableList.of(nextFieldTypeIndex, nextFieldTypeIndex + keyTypes.size()),
+ ImmutableList.of("key", "value"),
+ ImmutableMap.of(ICEBERG_FIELD_ID_KEY, Integer.toString(fieldId)),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty()));
+
+ orcTypes.addAll(keyTypes);
+ orcTypes.addAll(valueTypes);
+ return orcTypes;
+ }
+}
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TypeConveter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TypeConveter.java
deleted file mode 100644
index b730a63cdee0..000000000000
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TypeConveter.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.prestosql.plugin.iceberg;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import io.prestosql.spi.PrestoException;
-import io.prestosql.spi.type.ArrayType;
-import io.prestosql.spi.type.BigintType;
-import io.prestosql.spi.type.BooleanType;
-import io.prestosql.spi.type.DateType;
-import io.prestosql.spi.type.DecimalType;
-import io.prestosql.spi.type.DoubleType;
-import io.prestosql.spi.type.IntegerType;
-import io.prestosql.spi.type.MapType;
-import io.prestosql.spi.type.RealType;
-import io.prestosql.spi.type.RowType;
-import io.prestosql.spi.type.StandardTypes;
-import io.prestosql.spi.type.TimeType;
-import io.prestosql.spi.type.TimestampType;
-import io.prestosql.spi.type.TimestampWithTimeZoneType;
-import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.TypeManager;
-import io.prestosql.spi.type.TypeSignature;
-import io.prestosql.spi.type.TypeSignatureParameter;
-import io.prestosql.spi.type.VarbinaryType;
-import io.prestosql.spi.type.VarcharType;
-import org.apache.iceberg.types.Types;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
-import static java.lang.String.format;
-
-public final class TypeConveter
-{
- private static final Map, org.apache.iceberg.types.Type> PRESTO_TO_ICEBERG = ImmutableMap., org.apache.iceberg.types.Type>builder()
- .put(BooleanType.class, Types.BooleanType.get())
- .put(VarbinaryType.class, Types.BinaryType.get())
- .put(DateType.class, Types.DateType.get())
- .put(DoubleType.class, Types.DoubleType.get())
- .put(BigintType.class, Types.LongType.get())
- .put(RealType.class, Types.FloatType.get())
- .put(IntegerType.class, Types.IntegerType.get())
- .put(TimeType.class, Types.TimeType.get())
- .put(TimestampType.class, Types.TimestampType.withoutZone())
- .put(TimestampWithTimeZoneType.class, Types.TimestampType.withZone())
- .put(VarcharType.class, Types.StringType.get())
- .build();
-
- private TypeConveter() {}
-
- public static Type toPrestoType(org.apache.iceberg.types.Type type, TypeManager typeManager)
- {
- switch (type.typeId()) {
- case BOOLEAN:
- return BooleanType.BOOLEAN;
- case BINARY:
- case FIXED:
- return VarbinaryType.VARBINARY;
- case DATE:
- return DateType.DATE;
- case DECIMAL:
- Types.DecimalType decimalType = (Types.DecimalType) type;
- return DecimalType.createDecimalType(decimalType.precision(), decimalType.scale());
- case DOUBLE:
- return DoubleType.DOUBLE;
- case LONG:
- return BigintType.BIGINT;
- case FLOAT:
- return RealType.REAL;
- case INTEGER:
- return IntegerType.INTEGER;
- case TIME:
- return TimeType.TIME;
- case TIMESTAMP:
- Types.TimestampType timestampType = (Types.TimestampType) type;
- if (timestampType.shouldAdjustToUTC()) {
- return TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
- }
- return TimestampType.TIMESTAMP;
- case UUID:
- case STRING:
- return VarcharType.createUnboundedVarcharType();
- case LIST:
- Types.ListType listType = (Types.ListType) type;
- return new ArrayType(toPrestoType(listType.elementType(), typeManager));
- case MAP:
- Types.MapType mapType = (Types.MapType) type;
- TypeSignature keyType = toPrestoType(mapType.keyType(), typeManager).getTypeSignature();
- TypeSignature valueType = toPrestoType(mapType.valueType(), typeManager).getTypeSignature();
- return typeManager.getParameterizedType(StandardTypes.MAP, ImmutableList.of(TypeSignatureParameter.of(keyType), TypeSignatureParameter.of(valueType)));
- case STRUCT:
- List fields = ((Types.StructType) type).fields();
- return RowType.from(fields.stream()
- .map(field -> new RowType.Field(Optional.of(field.name()), toPrestoType(field.type(), typeManager)))
- .collect(toImmutableList()));
- default:
- throw new UnsupportedOperationException(format("Cannot convert from Iceberg type '%s' (%s) to Presto type", type, type.typeId()));
- }
- }
-
- public static org.apache.iceberg.types.Type toIcebergType(Type type)
- {
- if (PRESTO_TO_ICEBERG.containsKey(type.getClass())) {
- return PRESTO_TO_ICEBERG.get(type.getClass());
- }
- if (type instanceof DecimalType) {
- return fromDecimal((DecimalType) type);
- }
- if (type instanceof RowType) {
- return fromRow((RowType) type);
- }
- if (type instanceof ArrayType) {
- return fromArray((ArrayType) type);
- }
- if (type instanceof MapType) {
- return fromMap((MapType) type);
- }
- throw new PrestoException(NOT_SUPPORTED, "Type not supported for Iceberg: " + type.getDisplayName());
- }
-
- private static org.apache.iceberg.types.Type fromDecimal(DecimalType type)
- {
- return Types.DecimalType.of(type.getPrecision(), type.getScale());
- }
-
- private static org.apache.iceberg.types.Type fromRow(RowType type)
- {
- List fields = new ArrayList<>();
- for (RowType.Field field : type.getFields()) {
- String name = field.getName().orElseThrow(() ->
- new PrestoException(NOT_SUPPORTED, "Row type field does not have a name: " + type.getDisplayName()));
- fields.add(Types.NestedField.required(fields.size() + 1, name, toIcebergType(field.getType())));
- }
- return Types.StructType.of(fields);
- }
-
- private static org.apache.iceberg.types.Type fromArray(ArrayType type)
- {
- return Types.ListType.ofOptional(1, toIcebergType(type.getElementType()));
- }
-
- private static org.apache.iceberg.types.Type fromMap(MapType type)
- {
- return Types.MapType.ofOptional(1, 2, toIcebergType(type.getKeyType()), toIcebergType(type.getValueType()));
- }
-}
diff --git a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java
index 34fe60606c40..db1e0146afdb 100644
--- a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java
+++ b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java
@@ -162,7 +162,7 @@ private void testCreatePartitionedTableAs(Session session, FileFormat fileFormat
" order_status varchar\n" +
")\n" +
"WITH (\n" +
- " format = 'PARQUET',\n" +
+ " format = '" + fileFormat + "',\n" +
" partitioning = ARRAY['order_status','ship_priority','bucket(order_key, 9)']\n" +
")",
getSession().getCatalog().get(),
@@ -182,6 +182,7 @@ private void testCreatePartitionedTableAs(Session session, FileFormat fileFormat
private void testWithAllFileFormats(BiConsumer test)
{
test.accept(getSession(), FileFormat.PARQUET);
+ test.accept(getSession(), FileFormat.ORC);
}
private void dropTable(Session session, String table)
diff --git a/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java b/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java
index 9298059ae007..6d4fcadcfd30 100644
--- a/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java
+++ b/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java
@@ -123,6 +123,7 @@ public OrcWriter(
OrcDataSink orcDataSink,
List columnNames,
List types,
+ List flattenedOrcTypes,
CompressionKind compression,
OrcWriterOptions options,
boolean writeLegacyVersion,
@@ -160,7 +161,7 @@ public OrcWriter(
this.stats = requireNonNull(stats, "stats is null");
requireNonNull(columnNames, "columnNames is null");
- this.orcTypes = OrcType.createOrcRowType(0, columnNames, types);
+ this.orcTypes = requireNonNull(flattenedOrcTypes, "flattenedOrcTypes is null");
recordValidation(validation -> validation.setColumnNames(columnNames));
// create column writers
diff --git a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java
index 4d6783564639..140e7baa941f 100644
--- a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java
+++ b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java
@@ -491,7 +491,7 @@ private static OrcType toType(OrcProto.Type type)
precision = Optional.of(type.getPrecision());
scale = Optional.of(type.getScale());
}
- return new OrcType(toTypeKind(type.getKind()), type.getSubtypesList(), type.getFieldNamesList(), length, precision, scale);
+ return new OrcType(toTypeKind(type.getKind()), type.getSubtypesList(), type.getFieldNamesList(), toMap(type.getAttributesList()), length, precision, scale);
}
private static List toType(List types)
@@ -545,6 +545,17 @@ private static OrcTypeKind toTypeKind(OrcProto.Type.Kind typeKind)
}
}
+ private static Map toMap(List attributes)
+ {
+ ImmutableMap.Builder results = new ImmutableMap.Builder<>();
+ for (OrcProto.StringPair attribute : attributes) {
+ if (attribute.hasKey() && attribute.hasValue()) {
+ results.put(attribute.getKey(), attribute.getValue());
+ }
+ }
+ return results.build();
+ }
+
private static StreamKind toStreamKind(OrcProto.Stream.Kind streamKind)
{
switch (streamKind) {
diff --git a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java
index 03a7abf747bb..d69626e24fca 100644
--- a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java
+++ b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java
@@ -34,6 +34,7 @@
import java.io.OutputStream;
import java.time.ZoneId;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.TimeZone;
@@ -145,7 +146,8 @@ private static Type toType(OrcType type)
Builder builder = Type.newBuilder()
.setKind(toTypeKind(type.getOrcTypeKind()))
.addAllSubtypes(type.getFieldTypeIndexes())
- .addAllFieldNames(type.getFieldNames());
+ .addAllFieldNames(type.getFieldNames())
+ .addAllAttributes(toStringPairList(type.getAttributes()));
if (type.getLength().isPresent()) {
builder.setMaximumLength(type.getLength().get());
@@ -202,6 +204,18 @@ private static OrcProto.Type.Kind toTypeKind(OrcTypeKind orcTypeKind)
throw new IllegalArgumentException("Unsupported type: " + orcTypeKind);
}
+ private static List toStringPairList(Map attributes)
+ {
+ ImmutableList.Builder results = new ImmutableList.Builder<>();
+ for (Entry attribute : attributes.entrySet()) {
+ OrcProto.StringPair.Builder builder = OrcProto.StringPair.newBuilder();
+ builder.setKey(attribute.getKey());
+ builder.setValue(attribute.getValue());
+ results.add(builder.build());
+ }
+ return results.build();
+ }
+
private static OrcProto.ColumnStatistics toColumnStatistics(ColumnStatistics columnStatistics)
{
OrcProto.ColumnStatistics.Builder builder = OrcProto.ColumnStatistics.newBuilder();
diff --git a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcType.java b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcType.java
index 69238d25cfa8..4e25975173b8 100644
--- a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcType.java
+++ b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcType.java
@@ -14,6 +14,7 @@
package io.prestosql.orc.metadata;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.type.CharType;
import io.prestosql.spi.type.DecimalType;
@@ -23,6 +24,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import static com.google.common.base.MoreObjects.toStringHelper;
@@ -38,6 +40,7 @@
import static io.prestosql.spi.type.StandardTypes.ARRAY;
import static io.prestosql.spi.type.StandardTypes.MAP;
import static io.prestosql.spi.type.StandardTypes.ROW;
+import static io.prestosql.spi.type.TimeType.TIME;
import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
import static io.prestosql.spi.type.TinyintType.TINYINT;
import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
@@ -77,31 +80,37 @@ public enum OrcTypeKind
private final OrcTypeKind orcTypeKind;
private final List fieldTypeIndexes;
private final List fieldNames;
+ private final Map attributes;
private final Optional length;
private final Optional precision;
private final Optional scale;
private OrcType(OrcTypeKind orcTypeKind)
{
- this(orcTypeKind, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty());
+ this(orcTypeKind, ImmutableList.of(), ImmutableList.of(), ImmutableMap.of(), Optional.empty(), Optional.empty(), Optional.empty());
}
private OrcType(OrcTypeKind orcTypeKind, int length)
{
- this(orcTypeKind, ImmutableList.of(), ImmutableList.of(), Optional.of(length), Optional.empty(), Optional.empty());
+ this(orcTypeKind, ImmutableList.of(), ImmutableList.of(), ImmutableMap.of(), Optional.of(length), Optional.empty(), Optional.empty());
}
private OrcType(OrcTypeKind orcTypeKind, int precision, int scale)
{
- this(orcTypeKind, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.of(precision), Optional.of(scale));
+ this(orcTypeKind, ImmutableList.of(), ImmutableList.of(), ImmutableMap.of(), Optional.empty(), Optional.of(precision), Optional.of(scale));
}
private OrcType(OrcTypeKind orcTypeKind, List fieldTypeIndexes, List fieldNames)
{
- this(orcTypeKind, fieldTypeIndexes, fieldNames, Optional.empty(), Optional.empty(), Optional.empty());
+ this(orcTypeKind, fieldTypeIndexes, fieldNames, ImmutableMap.of(), Optional.empty(), Optional.empty(), Optional.empty());
}
- public OrcType(OrcTypeKind orcTypeKind, List fieldTypeIndexes, List fieldNames, Optional length, Optional precision, Optional scale)
+ private OrcType(OrcTypeKind orcTypeKind, List fieldTypeIndexes, List fieldNames, Map attributes)
+ {
+ this(orcTypeKind, fieldTypeIndexes, fieldNames, attributes, Optional.empty(), Optional.empty(), Optional.empty());
+ }
+
+ public OrcType(OrcTypeKind orcTypeKind, List fieldTypeIndexes, List fieldNames, Map attributes, Optional length, Optional precision, Optional scale)
{
this.orcTypeKind = requireNonNull(orcTypeKind, "typeKind is null");
this.fieldTypeIndexes = ImmutableList.copyOf(requireNonNull(fieldTypeIndexes, "fieldTypeIndexes is null"));
@@ -112,6 +121,7 @@ public OrcType(OrcTypeKind orcTypeKind, List fieldTypeIndexes, List getFieldNames()
return fieldNames;
}
+ public Map getAttributes()
+ {
+ return attributes;
+ }
+
public Optional getLength()
{
return length;
@@ -169,6 +184,7 @@ public String toString()
.add("orcTypeKind", orcTypeKind)
.add("fieldTypeIndexes", fieldTypeIndexes)
.add("fieldNames", fieldNames)
+ .add("attributes", attributes)
.toString();
}
@@ -211,6 +227,9 @@ private static List toOrcType(int nextFieldTypeIndex, Type type)
if (DATE.equals(type)) {
return ImmutableList.of(new OrcType(OrcTypeKind.DATE));
}
+ if (TIME.equals(type)) {
+ return ImmutableList.of(new OrcType(OrcTypeKind.INT));
+ }
if (TIMESTAMP.equals(type)) {
return ImmutableList.of(new OrcType(OrcTypeKind.TIMESTAMP));
}
diff --git a/presto-orc/src/test/java/io/prestosql/orc/OrcTester.java b/presto-orc/src/test/java/io/prestosql/orc/OrcTester.java
index 07532101d0e4..80ec2f00dd19 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/OrcTester.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/OrcTester.java
@@ -24,6 +24,7 @@
import io.airlift.units.DataSize;
import io.prestosql.metadata.Metadata;
import io.prestosql.orc.metadata.CompressionKind;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.spi.Page;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
@@ -575,10 +576,14 @@ public static void writeOrcColumnPresto(File outputFile, CompressionKind compres
metadata.put("columns", "test");
metadata.put("columns.types", createSettableStructObjectInspector("test", type).getTypeName());
+ List columnNames = ImmutableList.of("test");
+ List types = ImmutableList.of(type);
+
OrcWriter writer = new OrcWriter(
new OutputStreamOrcDataSink(new FileOutputStream(outputFile)),
- ImmutableList.of("test"),
- ImmutableList.of(type),
+ columnNames,
+ types,
+ OrcType.createOrcRowType(0, columnNames, types),
compression,
new OrcWriterOptions(),
false,
diff --git a/presto-orc/src/test/java/io/prestosql/orc/TestOrcWriter.java b/presto-orc/src/test/java/io/prestosql/orc/TestOrcWriter.java
index f817cc05a6db..77a24c44b00b 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/TestOrcWriter.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/TestOrcWriter.java
@@ -21,6 +21,7 @@
import io.prestosql.orc.OrcWriteValidation.OrcWriteValidationMode;
import io.prestosql.orc.metadata.Footer;
import io.prestosql.orc.metadata.OrcMetadataReader;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.orc.metadata.Stream;
import io.prestosql.orc.metadata.StripeFooter;
import io.prestosql.orc.metadata.StripeInformation;
@@ -29,11 +30,13 @@
import io.prestosql.spi.Page;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
+import io.prestosql.spi.type.Type;
import org.testng.annotations.Test;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.List;
import java.util.Optional;
import static io.airlift.testing.Assertions.assertGreaterThanOrEqual;
@@ -56,10 +59,15 @@ public void testWriteOutputStreamsInOrder()
{
for (OrcWriteValidationMode validationMode : OrcWriteValidationMode.values()) {
TempFile tempFile = new TempFile();
+
+ List columnNames = ImmutableList.of("test1", "test2", "test3", "test4", "test5");
+ List types = ImmutableList.of(VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR);
+
OrcWriter writer = new OrcWriter(
new OutputStreamOrcDataSink(new FileOutputStream(tempFile.getFile())),
- ImmutableList.of("test1", "test2", "test3", "test4", "test5"),
- ImmutableList.of(VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR),
+ columnNames,
+ types,
+ OrcType.createOrcRowType(0, columnNames, types),
NONE,
new OrcWriterOptions()
.withStripeMinSize(new DataSize(0, MEGABYTE))
diff --git a/presto-orc/src/test/java/io/prestosql/orc/TestStructStreamReader.java b/presto-orc/src/test/java/io/prestosql/orc/TestStructStreamReader.java
index 773ae393da75..24981f29fb52 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/TestStructStreamReader.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/TestStructStreamReader.java
@@ -19,6 +19,7 @@
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.prestosql.metadata.Metadata;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.spi.Page;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
@@ -217,10 +218,14 @@ public void testExtraFieldsInWriter()
private void write(TempFile tempFile, Type writerType, List data)
throws IOException
{
+ List columnNames = ImmutableList.of(STRUCT_COL_NAME);
+ List types = ImmutableList.of(writerType);
+
OrcWriter writer = new OrcWriter(
new OutputStreamOrcDataSink(new FileOutputStream(tempFile.getFile())),
- ImmutableList.of(STRUCT_COL_NAME),
- ImmutableList.of(writerType),
+ columnNames,
+ types,
+ OrcType.createOrcRowType(0, columnNames, types),
NONE,
new OrcWriterOptions()
.withStripeMinSize(new DataSize(0, MEGABYTE))