diff --git a/pom.xml b/pom.xml
index 4f8b16dfcbe0..26fbafc519a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -456,7 +456,7 @@
                <groupId>io.prestosql.orc</groupId>
                <artifactId>orc-protobuf</artifactId>
-                <version>9</version>
+                <version>10</version>
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveFileWriter.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/FileWriter.java
similarity index 96%
rename from presto-hive/src/main/java/io/prestosql/plugin/hive/HiveFileWriter.java
rename to presto-hive/src/main/java/io/prestosql/plugin/hive/FileWriter.java
index dfca109f00b7..673bc8501faf 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveFileWriter.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/FileWriter.java
@@ -17,7 +17,7 @@
import java.util.Optional;
-public interface HiveFileWriter
+public interface FileWriter
{
long getWrittenBytes();
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveFileWriterFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveFileWriterFactory.java
index 115875e3af1e..516ed11bb908 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveFileWriterFactory.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveFileWriterFactory.java
@@ -24,7 +24,7 @@
public interface HiveFileWriterFactory
{
-    Optional<HiveFileWriter> createFileWriter(
+    Optional<FileWriter> createFileWriter(
            Path path,
            List<String> inputColumnNames,
            StorageFormat storageFormat,
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriter.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriter.java
index 2ebeeb658e9f..bb1b44004973 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriter.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriter.java
@@ -25,7 +25,7 @@
public class HiveWriter
{
- private final HiveFileWriter fileWriter;
+ private final FileWriter fileWriter;
    private final Optional<String> partitionName;
private final UpdateMode updateMode;
private final String fileName;
@@ -38,7 +38,7 @@ public class HiveWriter
private long inputSizeInBytes;
public HiveWriter(
- HiveFileWriter fileWriter,
+ FileWriter fileWriter,
            Optional<String> partitionName,
UpdateMode updateMode,
String fileName,
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriterFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriterFactory.java
index 3547b8df83ea..82816c027ca6 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriterFactory.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriterFactory.java
@@ -437,9 +437,9 @@ else if (insertExistingPartitionsBehavior == InsertExistingPartitionsBehavior.ER
Path path = new Path(writeInfo.getWritePath(), fileNameWithExtension);
- HiveFileWriter hiveFileWriter = null;
+ FileWriter hiveFileWriter = null;
for (HiveFileWriterFactory fileWriterFactory : fileWriterFactories) {
-            Optional<HiveFileWriter> fileWriter = fileWriterFactory.createFileWriter(
+            Optional<FileWriter> fileWriter = fileWriterFactory.createFileWriter(
path,
dataColumns.stream()
.map(DataColumn::getName)
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/RcFileFileWriter.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/RcFileFileWriter.java
index 3696cd9afbba..a8b01487bab1 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/RcFileFileWriter.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/RcFileFileWriter.java
@@ -47,7 +47,7 @@
import static java.util.Objects.requireNonNull;
public class RcFileFileWriter
- implements HiveFileWriter
+ implements FileWriter
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(RcFileFileWriter.class).instanceSize();
private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/RcFileFileWriterFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/RcFileFileWriterFactory.java
index 70624bb24f62..8e6132812dc9 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/RcFileFileWriterFactory.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/RcFileFileWriterFactory.java
@@ -85,7 +85,7 @@ public RcFileFileWriterFactory(
}
@Override
-    public Optional<HiveFileWriter> createFileWriter(
+    public Optional<FileWriter> createFileWriter(
            Path path,
            List<String> inputColumnNames,
            StorageFormat storageFormat,
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/RecordFileWriter.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/RecordFileWriter.java
index 1e4b9c79ec57..9b4f8a934066 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/RecordFileWriter.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/RecordFileWriter.java
@@ -53,7 +53,7 @@
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardStructObjectInspector;
public class RecordFileWriter
- implements HiveFileWriter
+ implements FileWriter
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(RecordFileWriter.class).instanceSize();
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/SortingFileWriter.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/SortingFileWriter.java
index 8bc90d8cc3a7..f07f9f5eb83f 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/SortingFileWriter.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/SortingFileWriter.java
@@ -58,7 +58,7 @@
import static java.util.Objects.requireNonNull;
public class SortingFileWriter
- implements HiveFileWriter
+ implements FileWriter
{
private static final Logger log = Logger.get(SortingFileWriter.class);
@@ -70,7 +70,7 @@ public class SortingFileWriter
    private final List<Type> types;
    private final List<Integer> sortFields;
    private final List<SortOrder> sortOrders;
- private final HiveFileWriter outputWriter;
+ private final FileWriter outputWriter;
private final SortBuffer sortBuffer;
private final TempFileSinkFactory tempFileSinkFactory;
    private final Queue<TempFile> tempFiles = new PriorityQueue<>(comparing(TempFile::getSize));
@@ -79,7 +79,7 @@ public class SortingFileWriter
public SortingFileWriter(
FileSystem fileSystem,
Path tempFilePrefix,
- HiveFileWriter outputWriter,
+ FileWriter outputWriter,
DataSize maxMemory,
int maxOpenTempFiles,
List types,
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcFileWriter.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcFileWriter.java
index 34683f695f91..a1d9d4738c3f 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcFileWriter.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcFileWriter.java
@@ -20,8 +20,10 @@
import io.prestosql.orc.OrcWriter;
import io.prestosql.orc.OrcWriterOptions;
import io.prestosql.orc.OrcWriterStats;
+import io.prestosql.orc.metadata.ColumnMetadata;
import io.prestosql.orc.metadata.CompressionKind;
-import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.orc.metadata.OrcType;
+import io.prestosql.plugin.hive.FileWriter;
import io.prestosql.spi.Page;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.block.Block;
@@ -48,7 +50,7 @@
import static java.util.Objects.requireNonNull;
public class OrcFileWriter
- implements HiveFileWriter
+ implements FileWriter
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(OrcFileWriter.class).instanceSize();
private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
@@ -66,6 +68,7 @@ public OrcFileWriter(
            Callable<Void> rollbackAction,
            List<String> columnNames,
            List<Type> fileColumnTypes,
+            ColumnMetadata<OrcType> fileColumnOrcTypes,
CompressionKind compression,
OrcWriterOptions options,
boolean writeLegacyVersion,
@@ -82,6 +85,7 @@ public OrcFileWriter(
orcDataSink,
columnNames,
fileColumnTypes,
+ fileColumnOrcTypes,
compression,
options,
writeLegacyVersion,
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcFileWriterFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcFileWriterFactory.java
index 5b3bebc9f0c6..ae02f54b7e4b 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcFileWriterFactory.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/orc/OrcFileWriterFactory.java
@@ -22,10 +22,11 @@
import io.prestosql.orc.OrcWriterStats;
import io.prestosql.orc.OutputStreamOrcDataSink;
import io.prestosql.orc.metadata.CompressionKind;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.plugin.hive.FileFormatDataSourceStats;
+import io.prestosql.plugin.hive.FileWriter;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveConfig;
-import io.prestosql.plugin.hive.HiveFileWriter;
import io.prestosql.plugin.hive.HiveFileWriterFactory;
import io.prestosql.plugin.hive.HiveMetadata;
import io.prestosql.plugin.hive.HiveSessionProperties;
@@ -126,7 +127,7 @@ public OrcWriterStats getStats()
}
@Override
-    public Optional<HiveFileWriter> createFileWriter(
+    public Optional<FileWriter> createFileWriter(
            Path path,
            List<String> inputColumnNames,
            StorageFormat storageFormat,
@@ -182,6 +183,7 @@ public Optional<FileWriter> createFileWriter(
rollbackAction,
fileColumnNames,
fileColumnTypes,
+ OrcType.createRootOrcType(fileColumnNames, fileColumnTypes),
compression,
orcWriterOptions
.withStripeMinSize(getOrcOptimizedWriterMinStripeSize(session))
diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/util/TempFileWriter.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/util/TempFileWriter.java
index df45b9e9fc47..e4399f33a434 100644
--- a/presto-hive/src/main/java/io/prestosql/plugin/hive/util/TempFileWriter.java
+++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/util/TempFileWriter.java
@@ -20,6 +20,7 @@
import io.prestosql.orc.OrcWriter;
import io.prestosql.orc.OrcWriterOptions;
import io.prestosql.orc.OrcWriterStats;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.spi.Page;
import io.prestosql.spi.type.Type;
@@ -77,6 +78,7 @@ private static OrcWriter createOrcFileWriter(OrcDataSink sink, List<Type> types)
sink,
columnNames,
types,
+ OrcType.createRootOrcType(columnNames, types),
LZ4,
new OrcWriterOptions()
.withMaxStringStatisticsLimit(new DataSize(0, BYTE))
diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHiveFileFormats.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHiveFileFormats.java
index 9a0568ca644d..d559cad4b5e6 100644
--- a/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHiveFileFormats.java
+++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/AbstractTestHiveFileFormats.java
@@ -535,7 +535,7 @@ public static FileSplit createTestFile(
.map(TestColumn::getType)
.collect(Collectors.joining(",")));
-        Optional<HiveFileWriter> fileWriter = fileWriterFactory.createFileWriter(
+        Optional<FileWriter> fileWriter = fileWriterFactory.createFileWriter(
                new Path(filePath),
                testColumns.stream()
                        .map(TestColumn::getName)
@@ -545,7 +545,7 @@ public static FileSplit createTestFile(
jobConf,
session);
- HiveFileWriter hiveFileWriter = fileWriter.orElseThrow(() -> new IllegalArgumentException("fileWriterFactory"));
+ FileWriter hiveFileWriter = fileWriter.orElseThrow(() -> new IllegalArgumentException("fileWriterFactory"));
hiveFileWriter.appendRows(page);
hiveFileWriter.commit();
diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java
index 86efc644a437..3863002d18f8 100644
--- a/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java
+++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/benchmark/FileFormat.java
@@ -20,6 +20,7 @@
import io.prestosql.orc.OrcWriterOptions;
import io.prestosql.orc.OrcWriterStats;
import io.prestosql.orc.OutputStreamOrcDataSink;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.plugin.hive.FileFormatDataSourceStats;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveColumnHandle;
@@ -450,6 +451,7 @@ public PrestoOrcFormatWriter(File targetFile, List<String> columnNames, List<Type> columnTypes)
diff --git a/presto-iceberg/pom.xml b/presto-iceberg/pom.xml
--- a/presto-iceberg/pom.xml
+++ b/presto-iceberg/pom.xml
            <artifactId>presto-hive</artifactId>
+        <dependency>
+            <groupId>io.prestosql</groupId>
+            <artifactId>presto-orc</artifactId>
+        </dependency>
+
            <groupId>io.prestosql</groupId>
            <artifactId>presto-parquet</artifactId>
@@ -164,6 +169,18 @@
${dep.iceberg.version}
+        <dependency>
+            <groupId>${dep.iceberg.groupId}</groupId>
+            <artifactId>iceberg-orc</artifactId>
+            <version>${dep.iceberg.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
${dep.iceberg.groupId}
iceberg-parquet
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergConfig.java
index b5e08ecd45bd..074b922b7262 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergConfig.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergConfig.java
@@ -14,12 +14,19 @@
package io.prestosql.plugin.iceberg;
import io.airlift.configuration.Config;
+import io.prestosql.plugin.hive.HiveCompressionCodec;
+import org.apache.iceberg.FileFormat;
import javax.validation.constraints.Min;
+import static io.prestosql.plugin.hive.HiveCompressionCodec.GZIP;
+import static io.prestosql.plugin.iceberg.IcebergFileFormat.ORC;
+
public class IcebergConfig
{
private long metastoreTransactionCacheSize = 1000;
+ private IcebergFileFormat fileFormat = ORC;
+ private HiveCompressionCodec compressionCodec = GZIP;
@Min(1)
public long getMetastoreTransactionCacheSize()
@@ -33,4 +40,28 @@ public IcebergConfig setMetastoreTransactionCacheSize(long metastoreTransactionC
this.metastoreTransactionCacheSize = metastoreTransactionCacheSize;
return this;
}
+
+ public FileFormat getFileFormat()
+ {
+ return FileFormat.valueOf(fileFormat.name());
+ }
+
+ @Config("iceberg.file-format")
+ public IcebergConfig setFileFormat(IcebergFileFormat fileFormat)
+ {
+ this.fileFormat = fileFormat;
+ return this;
+ }
+
+ public HiveCompressionCodec getCompressionCodec()
+ {
+ return compressionCodec;
+ }
+
+ @Config("iceberg.compression-codec")
+ public IcebergConfig setCompressionCodec(HiveCompressionCodec compressionCodec)
+ {
+ this.compressionCodec = compressionCodec;
+ return this;
+ }
}
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergErrorCode.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergErrorCode.java
index 47625623ac12..7428ec034efb 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergErrorCode.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergErrorCode.java
@@ -18,6 +18,7 @@
import io.prestosql.spi.ErrorType;
import static io.prestosql.spi.ErrorType.EXTERNAL;
+import static io.prestosql.spi.ErrorType.INTERNAL_ERROR;
import static io.prestosql.spi.ErrorType.USER_ERROR;
public enum IcebergErrorCode
@@ -30,6 +31,10 @@ public enum IcebergErrorCode
ICEBERG_BAD_DATA(4, EXTERNAL),
ICEBERG_MISSING_DATA(5, EXTERNAL),
ICEBERG_CANNOT_OPEN_SPLIT(6, EXTERNAL),
+ ICEBERG_WRITER_OPEN_ERROR(7, EXTERNAL),
+ ICEBERG_FILESYSTEM_ERROR(8, EXTERNAL),
+ ICEBERG_CURSOR_ERROR(9, EXTERNAL),
+ ICEBERG_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR),
/**/;
private final ErrorCode errorCode;
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileFormat.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileFormat.java
new file mode 100644
index 000000000000..bf83eb9b10f6
--- /dev/null
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileFormat.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.iceberg;
+
+public enum IcebergFileFormat
+{
+ ORC,
+ PARQUET,
+}
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriterFactory.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriterFactory.java
new file mode 100644
index 000000000000..5d31504c5954
--- /dev/null
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergFileWriterFactory.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.iceberg;
+
+import com.google.common.collect.ImmutableMap;
+import io.prestosql.orc.OrcDataSink;
+import io.prestosql.orc.OrcDataSource;
+import io.prestosql.orc.OrcDataSourceId;
+import io.prestosql.orc.OrcReaderOptions;
+import io.prestosql.orc.OrcWriterOptions;
+import io.prestosql.orc.OrcWriterStats;
+import io.prestosql.orc.OutputStreamOrcDataSink;
+import io.prestosql.plugin.hive.FileFormatDataSourceStats;
+import io.prestosql.plugin.hive.FileWriter;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveStorageFormat;
+import io.prestosql.plugin.hive.NodeVersion;
+import io.prestosql.plugin.hive.RecordFileWriter;
+import io.prestosql.plugin.hive.orc.HdfsOrcDataSource;
+import io.prestosql.plugin.hive.orc.OrcFileWriter;
+import io.prestosql.plugin.hive.orc.OrcWriterConfig;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.weakref.jmx.Flatten;
+import org.weakref.jmx.Managed;
+
+import javax.inject.Inject;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveMetadata.PRESTO_QUERY_ID_NAME;
+import static io.prestosql.plugin.hive.HiveMetadata.PRESTO_VERSION_NAME;
+import static io.prestosql.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat;
+import static io.prestosql.plugin.hive.util.ParquetRecordWriterUtil.setParquetSchema;
+import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR;
+import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITE_VALIDATION_FAILED;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getCompressionCodec;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcStringStatisticsLimit;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcWriterMaxDictionaryMemory;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcWriterMaxStripeRows;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcWriterMaxStripeSize;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcWriterMinStripeSize;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcWriterValidateMode;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.isOrcWriterValidate;
+import static io.prestosql.plugin.iceberg.TypeConverter.toHiveType;
+import static io.prestosql.plugin.iceberg.TypeConverter.toOrcType;
+import static io.prestosql.plugin.iceberg.TypeConverter.toPrestoType;
+import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.joining;
+import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;
+import static org.joda.time.DateTimeZone.UTC;
+
+public class IcebergFileWriterFactory
+{
+ private final HdfsEnvironment hdfsEnvironment;
+ private final TypeManager typeManager;
+ private final NodeVersion nodeVersion;
+ private final FileFormatDataSourceStats readStats;
+ private final OrcWriterStats orcWriterStats = new OrcWriterStats();
+ private final OrcWriterOptions orcWriterOptions;
+
+ @Inject
+ public IcebergFileWriterFactory(
+ HdfsEnvironment hdfsEnvironment,
+ TypeManager typeManager,
+ NodeVersion nodeVersion,
+ FileFormatDataSourceStats readStats,
+ OrcWriterConfig orcWriterConfig)
+ {
+ checkArgument(!requireNonNull(orcWriterConfig, "orcWriterConfig is null").isUseLegacyVersion(), "the ORC writer shouldn't be configured to use a legacy version");
+ this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
+ this.readStats = requireNonNull(readStats, "readStats is null");
+ this.orcWriterOptions = orcWriterConfig.toOrcWriterOptions();
+ }
+
+ @Managed
+ @Flatten
+ public OrcWriterStats getOrcWriterStats()
+ {
+ return orcWriterStats;
+ }
+
+    public FileWriter createFileWriter(
+            Path outputPath,
+            Schema icebergSchema,
+            List<IcebergColumnHandle> columns,
+            JobConf jobConf,
+            ConnectorSession session,
+            FileFormat fileFormat)
+ {
+ switch (fileFormat) {
+ case PARQUET:
+ return createParquetWriter(outputPath, icebergSchema, columns, jobConf, session);
+ case ORC:
+ return createOrcWriter(outputPath, icebergSchema, jobConf, session);
+ }
+ throw new PrestoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat);
+ }
+
+    private FileWriter createParquetWriter(
+            Path outputPath,
+            Schema icebergSchema,
+            List<IcebergColumnHandle> columns,
+            JobConf jobConf,
+            ConnectorSession session)
+ {
+ Properties properties = new Properties();
+ properties.setProperty(IOConstants.COLUMNS, columns.stream()
+ .map(IcebergColumnHandle::getName)
+ .collect(joining(",")));
+ properties.setProperty(IOConstants.COLUMNS_TYPES, columns.stream()
+ .map(column -> toHiveType(column.getType()).getHiveTypeName().toString())
+ .collect(joining(":")));
+
+ setParquetSchema(jobConf, convert(icebergSchema, "table"));
+ jobConf.set(ParquetOutputFormat.COMPRESSION, getCompressionCodec(session).getParquetCompressionCodec().name());
+
+ return new RecordFileWriter(
+ outputPath,
+ columns.stream()
+ .map(IcebergColumnHandle::getName)
+ .collect(toImmutableList()),
+ fromHiveStorageFormat(HiveStorageFormat.PARQUET),
+ properties,
+ HiveStorageFormat.PARQUET.getEstimatedWriterSystemMemoryUsage(),
+ jobConf,
+ typeManager,
+ session);
+ }
+
+ private FileWriter createOrcWriter(
+ Path outputPath,
+ Schema icebergSchema,
+ JobConf jobConf,
+ ConnectorSession session)
+ {
+ try {
+ FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getUser(), outputPath, jobConf);
+ OrcDataSink orcDataSink = new OutputStreamOrcDataSink(fileSystem.create(outputPath));
+            Callable<Void> rollbackAction = () -> {
+ fileSystem.delete(outputPath, false);
+ return null;
+ };
+
+            List<Types.NestedField> columnFields = icebergSchema.columns();
+            List<String> fileColumnNames = columnFields.stream()
+                    .map(Types.NestedField::name)
+                    .collect(toImmutableList());
+            List<Type> fileColumnTypes = columnFields.stream()
+                    .map(Types.NestedField::type)
+                    .map(type -> toPrestoType(type, typeManager))
+                    .collect(toImmutableList());
+
+            Optional<Supplier<OrcDataSource>> validationInputFactory = Optional.empty();
+ if (isOrcWriterValidate(session)) {
+ validationInputFactory = Optional.of(() -> {
+ try {
+ return new HdfsOrcDataSource(
+ new OrcDataSourceId(outputPath.toString()),
+ fileSystem.getFileStatus(outputPath).getLen(),
+ new OrcReaderOptions(),
+ fileSystem.open(outputPath),
+ readStats);
+ }
+ catch (IOException e) {
+ throw new PrestoException(ICEBERG_WRITE_VALIDATION_FAILED, e);
+ }
+ });
+ }
+
+ return new OrcFileWriter(
+ orcDataSink,
+ rollbackAction,
+ fileColumnNames,
+ fileColumnTypes,
+ toOrcType(icebergSchema),
+ getCompressionCodec(session).getOrcCompressionKind(),
+ orcWriterOptions
+ .withStripeMinSize(getOrcWriterMinStripeSize(session))
+ .withStripeMaxSize(getOrcWriterMaxStripeSize(session))
+ .withStripeMaxRowCount(getOrcWriterMaxStripeRows(session))
+ .withDictionaryMaxMemory(getOrcWriterMaxDictionaryMemory(session))
+ .withMaxStringStatisticsLimit(getOrcStringStatisticsLimit(session)),
+ false,
+ IntStream.range(0, fileColumnNames.size()).toArray(),
+                    ImmutableMap.<String, String>builder()
+ .put(PRESTO_VERSION_NAME, nodeVersion.toString())
+ .put(PRESTO_QUERY_ID_NAME, session.getQueryId())
+ .build(),
+ UTC,
+ validationInputFactory,
+ getOrcWriterValidateMode(session),
+ orcWriterStats);
+ }
+ catch (IOException e) {
+ throw new PrestoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating ORC file", e);
+ }
+ }
+}
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java
index 7de46efc2c98..7bfe300c5fbb 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergMetadata.java
@@ -55,6 +55,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
@@ -108,6 +109,7 @@
import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
import static org.apache.iceberg.TableMetadata.newTableMetadata;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.Transactions.createTableTransaction;
public class IcebergMetadata
@@ -314,7 +316,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
throw new TableAlreadyExistsException(schemaTableName);
}
- TableMetadata metadata = newTableMetadata(operations, schema, partitionSpec, targetPath);
+ FileFormat fileFormat = (FileFormat) tableMetadata.getProperties().get(FILE_FORMAT_PROPERTY);
+ TableMetadata metadata = newTableMetadata(operations, schema, partitionSpec, targetPath, ImmutableMap.of(DEFAULT_FILE_FORMAT, fileFormat.toString()));
transaction = createTableTransaction(operations, metadata);
@@ -325,7 +328,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
PartitionSpecParser.toJson(metadata.spec()),
getColumns(metadata.schema(), typeManager),
targetPath,
- getFileFormat(tableMetadata.getProperties()));
+ fileFormat);
}
@Override
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergModule.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergModule.java
index cbc125981ff4..86376bfa931d 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergModule.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergModule.java
@@ -28,6 +28,8 @@
import io.prestosql.plugin.hive.HiveTransactionManager;
import io.prestosql.plugin.hive.LocationService;
import io.prestosql.plugin.hive.NamenodeStats;
+import io.prestosql.plugin.hive.orc.OrcReaderConfig;
+import io.prestosql.plugin.hive.orc.OrcWriterConfig;
import io.prestosql.plugin.hive.parquet.ParquetReaderConfig;
import io.prestosql.plugin.hive.parquet.ParquetWriterConfig;
import io.prestosql.spi.connector.ConnectorNodePartitioningProvider;
@@ -67,6 +69,9 @@ public void configure(Binder binder)
binder.bind(HdfsConfigurationInitializer.class).in(Scopes.SINGLETON);
newSetBinder(binder, DynamicConfigurationProvider.class);
+ configBinder(binder).bindConfig(OrcReaderConfig.class);
+ configBinder(binder).bindConfig(OrcWriterConfig.class);
+
configBinder(binder).bindConfig(ParquetReaderConfig.class);
configBinder(binder).bindConfig(ParquetWriterConfig.class);
@@ -78,5 +83,8 @@ public void configure(Binder binder)
binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(FileFormatDataSourceStats.class).withGeneratedName();
+
+ binder.bind(IcebergFileWriterFactory.class).in(Scopes.SINGLETON);
+ newExporter(binder).export(IcebergFileWriterFactory.class).withGeneratedName();
}
}
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java
index d6a3acda03fa..a3e462773223 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSink.java
@@ -16,11 +16,9 @@
import com.google.common.collect.ImmutableList;
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
+import io.prestosql.plugin.hive.FileWriter;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
-import io.prestosql.plugin.hive.HiveFileWriter;
-import io.prestosql.plugin.hive.HiveStorageFormat;
-import io.prestosql.plugin.hive.RecordFileWriter;
import io.prestosql.plugin.iceberg.PartitionTransforms.ColumnTransform;
import io.prestosql.spi.Page;
import io.prestosql.spi.PageIndexer;
@@ -41,11 +39,9 @@
import io.prestosql.spi.type.TimestampWithTimeZoneType;
import io.prestosql.spi.type.TinyintType;
import io.prestosql.spi.type.Type;
-import io.prestosql.spi.type.TypeManager;
import io.prestosql.spi.type.VarbinaryType;
import io.prestosql.spi.type.VarcharType;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
@@ -54,6 +50,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.orc.OrcMetrics;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.transforms.Transform;
@@ -63,7 +60,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@@ -71,12 +67,9 @@
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.Slices.wrappedBuffer;
-import static io.prestosql.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static io.prestosql.plugin.hive.util.ConfigurationUtils.toJobConf;
-import static io.prestosql.plugin.hive.util.ParquetRecordWriterUtil.setParquetSchema;
import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_TOO_MANY_OPEN_PARTITIONS;
import static io.prestosql.plugin.iceberg.PartitionTransforms.getColumnTransform;
-import static io.prestosql.plugin.iceberg.TypeConverter.toHiveType;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.prestosql.spi.type.DateTimeEncoding.unpackMillisUtc;
import static io.prestosql.spi.type.Decimals.readBigDecimal;
@@ -87,8 +80,6 @@
import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.stream.Collectors.joining;
-import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;
public class IcebergPageSink
implements ConnectorPageSink
@@ -100,12 +91,12 @@ public class IcebergPageSink
private final Schema outputSchema;
private final PartitionSpec partitionSpec;
private final String outputPath;
+ private final IcebergFileWriterFactory fileWriterFactory;
private final HdfsEnvironment hdfsEnvironment;
private final JobConf jobConf;
private final List inputColumns;
private final JsonCodec jsonCodec;
private final ConnectorSession session;
- private final TypeManager typeManager;
private final FileFormat fileFormat;
private final PagePartitioner pagePartitioner;
@@ -119,11 +110,11 @@ public IcebergPageSink(
Schema outputSchema,
PartitionSpec partitionSpec,
String outputPath,
+ IcebergFileWriterFactory fileWriterFactory,
PageIndexerFactory pageIndexerFactory,
HdfsEnvironment hdfsEnvironment,
HdfsContext hdfsContext,
List inputColumns,
- TypeManager typeManager,
JsonCodec jsonCodec,
ConnectorSession session,
FileFormat fileFormat)
@@ -132,12 +123,12 @@ public IcebergPageSink(
this.outputSchema = requireNonNull(outputSchema, "outputSchema is null");
this.partitionSpec = requireNonNull(partitionSpec, "partitionSpec is null");
this.outputPath = requireNonNull(outputPath, "outputPath is null");
+ this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
requireNonNull(hdfsContext, "hdfsContext is null");
this.jobConf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, new Path(outputPath)));
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
this.session = requireNonNull(session, "session is null");
- this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.inputColumns = ImmutableList.copyOf(inputColumns);
this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec));
@@ -267,7 +258,7 @@ private void writePage(Page page)
pageForWriter = pageForWriter.getPositions(positions, 0, positions.length);
}
- HiveFileWriter writer = writers.get(index).getWriter();
+ FileWriter writer = writers.get(index).getWriter();
long currentWritten = writer.getWrittenBytes();
long currentMemory = writer.getSystemMemoryUsage();
@@ -321,44 +312,15 @@ private WriteContext createWriter(Optional partitionPath, Optional toHiveType(column.getType()).getHiveTypeName().toString())
- .collect(joining(":")));
-
- setParquetSchema(jobConf, convert(outputSchema, "table"));
-
- return new RecordFileWriter(
+ FileWriter writer = fileWriterFactory.createFileWriter(
outputPath,
- inputColumns.stream()
- .map(IcebergColumnHandle::getName)
- .collect(toImmutableList()),
- fromHiveStorageFormat(HiveStorageFormat.PARQUET),
- properties,
- HiveStorageFormat.PARQUET.getEstimatedWriterSystemMemoryUsage(),
+ outputSchema,
+ inputColumns,
jobConf,
- typeManager,
- session);
+ session,
+ fileFormat);
+
+ return new WriteContext(writer, outputPath, partitionData);
}
@SuppressWarnings("SwitchStatementWithTooFewBranches")
@@ -367,6 +329,9 @@ private Metrics readMetrics(Path path)
switch (fileFormat) {
case PARQUET:
return ParquetUtil.fileMetrics(HadoopInputFile.fromPath(path, jobConf), MetricsConfig.getDefault());
+ case ORC:
+ // TODO: update Iceberg version after OrcMetrics is completed
+ return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(path, jobConf), jobConf);
}
throw new PrestoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat);
}
@@ -452,18 +417,18 @@ private static List toPartitionColumns(List partitionData;
- public WriteContext(HiveFileWriter writer, Path path, Optional partitionData)
+ public WriteContext(FileWriter writer, Path path, Optional partitionData)
{
this.writer = requireNonNull(writer, "writer is null");
this.path = requireNonNull(path, "path is null");
this.partitionData = requireNonNull(partitionData, "partitionData is null");
}
- public HiveFileWriter getWriter()
+ public FileWriter getWriter()
{
return writer;
}
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSinkProvider.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSinkProvider.java
index 6556e9091589..d99b44fa4981 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSinkProvider.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSinkProvider.java
@@ -23,7 +23,6 @@
import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
-import io.prestosql.spi.type.TypeManager;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
@@ -38,19 +37,19 @@ public class IcebergPageSinkProvider
{
private final HdfsEnvironment hdfsEnvironment;
private final JsonCodec jsonCodec;
- private final TypeManager typeManager;
+ private final IcebergFileWriterFactory fileWriterFactory;
private final PageIndexerFactory pageIndexerFactory;
@Inject
public IcebergPageSinkProvider(
HdfsEnvironment hdfsEnvironment,
JsonCodec jsonCodec,
- TypeManager typeManager,
+ IcebergFileWriterFactory fileWriterFactory,
PageIndexerFactory pageIndexerFactory)
{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
- this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
}
@@ -75,11 +74,11 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab
schema,
partitionSpec,
tableHandle.getOutputPath(),
+ fileWriterFactory,
pageIndexerFactory,
hdfsEnvironment,
hdfsContext,
tableHandle.getInputColumns(),
- typeManager,
jsonCodec,
session,
tableHandle.getFileFormat());
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java
index bdff866971e1..6a6f33118499 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergPageSourceProvider.java
@@ -16,6 +16,15 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.prestosql.memory.context.AggregatedMemoryContext;
+import io.prestosql.orc.OrcColumn;
+import io.prestosql.orc.OrcCorruptionException;
+import io.prestosql.orc.OrcDataSource;
+import io.prestosql.orc.OrcDataSourceId;
+import io.prestosql.orc.OrcReader;
+import io.prestosql.orc.OrcReaderOptions;
+import io.prestosql.orc.OrcRecordReader;
+import io.prestosql.orc.TupleDomainOrcPredicate;
+import io.prestosql.orc.TupleDomainOrcPredicate.TupleDomainOrcPredicateBuilder;
import io.prestosql.parquet.Field;
import io.prestosql.parquet.ParquetCorruptionException;
import io.prestosql.parquet.ParquetDataSource;
@@ -27,6 +36,10 @@
import io.prestosql.plugin.hive.FileFormatDataSourceStats;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HdfsEnvironment.HdfsContext;
+import io.prestosql.plugin.hive.orc.HdfsOrcDataSource;
+import io.prestosql.plugin.hive.orc.OrcPageSource;
+import io.prestosql.plugin.hive.orc.OrcPageSource.ColumnAdaptation;
+import io.prestosql.plugin.hive.orc.OrcReaderConfig;
import io.prestosql.plugin.hive.parquet.ParquetPageSource;
import io.prestosql.plugin.hive.parquet.ParquetReaderConfig;
import io.prestosql.spi.PrestoException;
@@ -47,6 +60,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.iceberg.FileFormat;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.FileMetaData;
@@ -66,7 +80,9 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static com.google.common.collect.Maps.uniqueIndex;
import static io.prestosql.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
+import static io.prestosql.orc.OrcReader.INITIAL_BATCH_SIZE;
import static io.prestosql.parquet.ParquetTypeUtils.getColumnIO;
import static io.prestosql.parquet.ParquetTypeUtils.getDescriptors;
import static io.prestosql.parquet.ParquetTypeUtils.getParquetTypeByName;
@@ -76,30 +92,47 @@
import static io.prestosql.plugin.hive.parquet.ParquetColumnIOConverter.constructField;
import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT;
+import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_CURSOR_ERROR;
+import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.prestosql.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_DATA;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcLazyReadSmallRanges;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcMaxBufferSize;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcMaxMergeDistance;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcMaxReadBlockSize;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcStreamBufferSize;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getOrcTinyStripeThreshold;
import static io.prestosql.plugin.iceberg.IcebergSessionProperties.getParquetMaxReadBlockSize;
import static io.prestosql.plugin.iceberg.IcebergSessionProperties.isFailOnCorruptedParquetStatistics;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.isOrcBloomFiltersEnabled;
+import static io.prestosql.plugin.iceberg.IcebergSessionProperties.isOrcNestedLazy;
+import static io.prestosql.plugin.iceberg.TypeConverter.ORC_ICEBERG_ID_KEY;
+import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
+import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
+import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
+import static org.joda.time.DateTimeZone.UTC;
public class IcebergPageSourceProvider
implements ConnectorPageSourceProvider
{
private final HdfsEnvironment hdfsEnvironment;
private final FileFormatDataSourceStats fileFormatDataSourceStats;
- private final ParquetReaderOptions options;
+ private final OrcReaderOptions orcReaderOptions;
+ private final ParquetReaderOptions parquetReaderOptions;
@Inject
public IcebergPageSourceProvider(
HdfsEnvironment hdfsEnvironment,
FileFormatDataSourceStats fileFormatDataSourceStats,
- ParquetReaderConfig config)
+ OrcReaderConfig orcReaderConfig,
+ ParquetReaderConfig parquetReaderConfig)
{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.fileFormatDataSourceStats = requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null");
- requireNonNull(config, "config is null");
- this.options = config.toParquetReaderOptions();
+ this.orcReaderOptions = requireNonNull(orcReaderConfig, "orcReaderConfig is null").toOrcReaderOptions();
+ this.parquetReaderOptions = requireNonNull(parquetReaderConfig, "parquetReaderConfig is null").toParquetReaderOptions();
}
@Override
@@ -119,25 +152,185 @@ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transacti
.filter(column -> !partitionKeys.containsKey(column.getId()))
.collect(toImmutableList());
- Path path = new Path(split.getPath());
- long start = split.getStart();
- long length = split.getLength();
HdfsContext hdfsContext = new HdfsContext(session, table.getSchemaName(), table.getTableName());
- ConnectorPageSource parquetPageSource = createParquetPageSource(
- hdfsEnvironment,
- session.getUser(),
- hdfsEnvironment.getConfiguration(hdfsContext, path),
- path,
- start,
- length,
+ ConnectorPageSource dataPageSource = createDataPageSource(
+ session,
+ hdfsContext,
+ new Path(split.getPath()),
+ split.getStart(),
+ split.getLength(),
+ split.getFileFormat(),
regularColumns,
- options
- .withFailOnCorruptedStatistics(isFailOnCorruptedParquetStatistics(session))
- .withMaxReadBlockSize(getParquetMaxReadBlockSize(session)),
- split.getPredicate(),
- fileFormatDataSourceStats);
+ split.getPredicate());
- return new IcebergPageSource(icebergColumns, partitionKeys, parquetPageSource, session.getTimeZoneKey());
+ return new IcebergPageSource(icebergColumns, partitionKeys, dataPageSource, session.getTimeZoneKey());
+ }
+
+ private ConnectorPageSource createDataPageSource(
+ ConnectorSession session,
+ HdfsContext hdfsContext,
+ Path path,
+ long start,
+ long length,
+ FileFormat fileFormat,
+ List dataColumns,
+ TupleDomain predicate)
+ {
+ switch (fileFormat) {
+ case ORC:
+ FileSystem fileSystem = null;
+ FileStatus fileStatus = null;
+ try {
+ fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, path);
+ fileStatus = fileSystem.getFileStatus(path);
+ }
+ catch (IOException e) {
+ throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, e);
+ }
+ long fileSize = fileStatus.getLen();
+ return createOrcPageSource(
+ hdfsEnvironment,
+ session.getUser(),
+ hdfsEnvironment.getConfiguration(hdfsContext, path),
+ path,
+ start,
+ length,
+ fileSize,
+ dataColumns,
+ predicate,
+ orcReaderOptions
+ .withMaxMergeDistance(getOrcMaxMergeDistance(session))
+ .withMaxBufferSize(getOrcMaxBufferSize(session))
+ .withStreamBufferSize(getOrcStreamBufferSize(session))
+ .withTinyStripeThreshold(getOrcTinyStripeThreshold(session))
+ .withMaxReadBlockSize(getOrcMaxReadBlockSize(session))
+ .withLazyReadSmallRanges(getOrcLazyReadSmallRanges(session))
+ .withNestedLazy(isOrcNestedLazy(session))
+ .withBloomFiltersEnabled(isOrcBloomFiltersEnabled(session)),
+ fileFormatDataSourceStats);
+ case PARQUET:
+ return createParquetPageSource(
+ hdfsEnvironment,
+ session.getUser(),
+ hdfsEnvironment.getConfiguration(hdfsContext, path),
+ path,
+ start,
+ length,
+ dataColumns,
+ parquetReaderOptions
+ .withFailOnCorruptedStatistics(isFailOnCorruptedParquetStatistics(session))
+ .withMaxReadBlockSize(getParquetMaxReadBlockSize(session)),
+ predicate,
+ fileFormatDataSourceStats);
+ }
+ throw new PrestoException(NOT_SUPPORTED, "File format not supported for Iceberg: " + fileFormat);
+ }
+
+ private static ConnectorPageSource createOrcPageSource(
+ HdfsEnvironment hdfsEnvironment,
+ String user,
+ Configuration configuration,
+ Path path,
+ long start,
+ long length,
+ long fileSize,
+ List columns,
+ TupleDomain effectivePredicate,
+ OrcReaderOptions options,
+ FileFormatDataSourceStats stats)
+ {
+ OrcDataSource orcDataSource = null;
+ try {
+ FileSystem fileSystem = hdfsEnvironment.getFileSystem(user, path, configuration);
+ FSDataInputStream inputStream = hdfsEnvironment.doAs(user, () -> fileSystem.open(path));
+ orcDataSource = new HdfsOrcDataSource(
+ new OrcDataSourceId(path.toString()),
+ fileSize,
+ options,
+ inputStream,
+ stats);
+
+ OrcReader reader = new OrcReader(orcDataSource, options);
+ List fileColumns = reader.getRootColumn().getNestedColumns();
+ Map fileColumnsByIcebergId = fileColumns.stream()
+ .filter(orcColumn -> orcColumn.getAttributes().containsKey(ORC_ICEBERG_ID_KEY))
+ .collect(toImmutableMap(
+ orcColumn -> Integer.valueOf(orcColumn.getAttributes().get(ORC_ICEBERG_ID_KEY)),
+ identity()));
+ Map fileColumnsByName = null;
+ if (fileColumnsByIcebergId.isEmpty()) {
+ fileColumnsByName = uniqueIndex(fileColumns, orcColumn -> orcColumn.getColumnName().toLowerCase(ENGLISH));
+ }
+
+ TupleDomainOrcPredicateBuilder predicateBuilder = TupleDomainOrcPredicate.builder()
+ .setBloomFiltersEnabled(options.isBloomFiltersEnabled());
+ Map effectivePredicateDomains = effectivePredicate.getDomains()
+ .orElseThrow(() -> new IllegalArgumentException("Effective predicate is none"));
+ List fileReadColumns = new ArrayList<>(columns.size());
+ List fileReadTypes = new ArrayList<>(columns.size());
+ List columnAdaptations = new ArrayList<>(columns.size());
+ for (IcebergColumnHandle column : columns) {
+ OrcColumn orcColumn;
+ if (fileColumnsByIcebergId.isEmpty()) {
+ orcColumn = fileColumnsByName.get(column.getName().toLowerCase(ENGLISH));
+ }
+ else {
+ orcColumn = fileColumnsByIcebergId.get(column.getId());
+ }
+ Type readType = column.getType();
+ if (orcColumn != null) {
+ int sourceIndex = fileReadColumns.size();
+ columnAdaptations.add(ColumnAdaptation.sourceColumn(sourceIndex));
+ fileReadColumns.add(orcColumn);
+ fileReadTypes.add(readType);
+
+ Domain domain = effectivePredicateDomains.get(column);
+ if (domain != null) {
+ predicateBuilder.addColumn(orcColumn.getColumnId(), domain);
+ }
+ }
+ else {
+ columnAdaptations.add(ColumnAdaptation.nullColumn(readType));
+ }
+ }
+
+ AggregatedMemoryContext systemMemoryUsage = newSimpleAggregatedMemoryContext();
+ OrcDataSourceId orcDataSourceId = orcDataSource.getId();
+ OrcRecordReader recordReader = reader.createRecordReader(
+ fileReadColumns,
+ fileReadTypes,
+ predicateBuilder.build(),
+ start,
+ length,
+ UTC,
+ systemMemoryUsage,
+ INITIAL_BATCH_SIZE,
+ exception -> handleException(orcDataSourceId, exception));
+
+ return new OrcPageSource(
+ recordReader,
+ columnAdaptations,
+ orcDataSource,
+ systemMemoryUsage,
+ stats);
+ }
+ catch (Exception e) {
+ if (orcDataSource != null) {
+ try {
+ orcDataSource.close();
+ }
+ catch (IOException ignored) {
+ }
+ }
+ if (e instanceof PrestoException) {
+ throw (PrestoException) e;
+ }
+ String message = format("Error opening Iceberg split %s (offset=%s, length=%s): %s", path, start, length, e.getMessage());
+ if (e instanceof BlockMissingException) {
+ throw new PrestoException(ICEBERG_MISSING_DATA, message, e);
+ }
+ throw new PrestoException(ICEBERG_CANNOT_OPEN_SPLIT, message, e);
+ }
}
private static ConnectorPageSource createParquetPageSource(
@@ -266,4 +459,15 @@ private static TupleDomain getParquetTupleDomain(Map> sessionProperties;
@Inject
public IcebergSessionProperties(
+ IcebergConfig icebergConfig,
+ OrcReaderConfig orcReaderConfig,
+ OrcWriterConfig orcWriterConfig,
ParquetReaderConfig parquetReaderConfig,
ParquetWriterConfig parquetWriterConfig)
{
sessionProperties = ImmutableList.>builder()
+ .add(enumProperty(
+ COMPRESSION_CODEC,
+ "Compression codec to use when writing files",
+ HiveCompressionCodec.class,
+ icebergConfig.getCompressionCodec(),
+ false))
+ .add(booleanProperty(
+ ORC_BLOOM_FILTERS_ENABLED,
+ "ORC: Enable bloom filters for predicate pushdown",
+ orcReaderConfig.isBloomFiltersEnabled(),
+ false))
+ .add(dataSizeProperty(
+ ORC_MAX_MERGE_DISTANCE,
+ "ORC: Maximum size of gap between two reads to merge into a single read",
+ orcReaderConfig.getMaxMergeDistance(),
+ false))
+ .add(dataSizeProperty(
+ ORC_MAX_BUFFER_SIZE,
+ "ORC: Maximum size of a single read",
+ orcReaderConfig.getMaxBufferSize(),
+ false))
+ .add(dataSizeProperty(
+ ORC_STREAM_BUFFER_SIZE,
+ "ORC: Size of buffer for streaming reads",
+ orcReaderConfig.getStreamBufferSize(),
+ false))
+ .add(dataSizeProperty(
+ ORC_TINY_STRIPE_THRESHOLD,
+ "ORC: Threshold below which an ORC stripe or file will read in its entirety",
+ orcReaderConfig.getTinyStripeThreshold(),
+ false))
+ .add(dataSizeProperty(
+ ORC_MAX_READ_BLOCK_SIZE,
+ "ORC: Soft max size of Presto blocks produced by ORC reader",
+ orcReaderConfig.getMaxBlockSize(),
+ false))
+ .add(booleanProperty(
+ ORC_LAZY_READ_SMALL_RANGES,
+ "Experimental: ORC: Read small file segments lazily",
+ orcReaderConfig.isLazyReadSmallRanges(),
+ false))
+ .add(booleanProperty(
+ ORC_NESTED_LAZY_ENABLED,
+ "Experimental: ORC: Lazily read nested data",
+ orcReaderConfig.isNestedLazy(),
+ false))
+ .add(dataSizeProperty(
+ ORC_STRING_STATISTICS_LIMIT,
+ "ORC: Maximum size of string statistics; drop if exceeding",
+ orcWriterConfig.getStringStatisticsLimit(),
+ false))
+ .add(new PropertyMetadata<>(
+ ORC_WRITER_VALIDATE_PERCENTAGE,
+ "ORC: Percentage of written files to validate by re-reading them",
+ DOUBLE,
+ Double.class,
+ orcWriterConfig.getValidationPercentage(),
+ false,
+ value -> {
+ double doubleValue = ((Number) value).doubleValue();
+ if (doubleValue < 0.0 || doubleValue > 100.0) {
+ throw new PrestoException(INVALID_SESSION_PROPERTY, format(
+ "%s must be between 0.0 and 100.0 inclusive: %s",
+ ORC_WRITER_VALIDATE_PERCENTAGE,
+ doubleValue));
+ }
+ return doubleValue;
+ },
+ value -> value))
+ .add(enumProperty(
+ ORC_WRITER_VALIDATE_MODE,
+ "ORC: Level of detail in ORC validation",
+ OrcWriteValidationMode.class,
+ orcWriterConfig.getValidationMode(),
+ false))
+ .add(dataSizeProperty(
+ ORC_WRITER_MIN_STRIPE_SIZE,
+ "ORC: Min stripe size",
+ orcWriterConfig.getStripeMinSize(),
+ false))
+ .add(dataSizeProperty(
+ ORC_WRITER_MAX_STRIPE_SIZE,
+ "ORC: Max stripe size",
+ orcWriterConfig.getStripeMaxSize(),
+ false))
+ .add(integerProperty(
+ ORC_WRITER_MAX_STRIPE_ROWS,
+ "ORC: Max stripe row count",
+ orcWriterConfig.getStripeMaxRowCount(),
+ false))
+ .add(dataSizeProperty(
+ ORC_WRITER_MAX_DICTIONARY_MEMORY,
+ "ORC: Max dictionary memory",
+ orcWriterConfig.getDictionaryMaxMemory(),
+ false))
.add(booleanProperty(
PARQUET_FAIL_WITH_CORRUPTED_STATISTICS,
"Parquet: Fail when scanning Parquet files with corrupted statistics",
@@ -70,6 +195,93 @@ public List> getSessionProperties()
return sessionProperties;
}
+ public static boolean isOrcBloomFiltersEnabled(ConnectorSession session)
+ {
+ return session.getProperty(ORC_BLOOM_FILTERS_ENABLED, Boolean.class);
+ }
+
+ public static DataSize getOrcMaxMergeDistance(ConnectorSession session)
+ {
+ return session.getProperty(ORC_MAX_MERGE_DISTANCE, DataSize.class);
+ }
+
+ public static DataSize getOrcMaxBufferSize(ConnectorSession session)
+ {
+ return session.getProperty(ORC_MAX_BUFFER_SIZE, DataSize.class);
+ }
+
+ public static DataSize getOrcStreamBufferSize(ConnectorSession session)
+ {
+ return session.getProperty(ORC_STREAM_BUFFER_SIZE, DataSize.class);
+ }
+
+ public static DataSize getOrcTinyStripeThreshold(ConnectorSession session)
+ {
+ return session.getProperty(ORC_TINY_STRIPE_THRESHOLD, DataSize.class);
+ }
+
+ public static DataSize getOrcMaxReadBlockSize(ConnectorSession session)
+ {
+ return session.getProperty(ORC_MAX_READ_BLOCK_SIZE, DataSize.class);
+ }
+
+ public static boolean getOrcLazyReadSmallRanges(ConnectorSession session)
+ {
+ return session.getProperty(ORC_LAZY_READ_SMALL_RANGES, Boolean.class);
+ }
+
+ public static boolean isOrcNestedLazy(ConnectorSession session)
+ {
+ return session.getProperty(ORC_NESTED_LAZY_ENABLED, Boolean.class);
+ }
+
+ public static DataSize getOrcStringStatisticsLimit(ConnectorSession session)
+ {
+ return session.getProperty(ORC_STRING_STATISTICS_LIMIT, DataSize.class);
+ }
+
+ public static boolean isOrcWriterValidate(ConnectorSession session)
+ {
+ double percentage = session.getProperty(ORC_WRITER_VALIDATE_PERCENTAGE, Double.class);
+ if (percentage == 0.0) {
+ return false;
+ }
+
+ checkArgument(percentage > 0.0 && percentage <= 100.0);
+
+ return ThreadLocalRandom.current().nextDouble(100) < percentage;
+ }
+
+ public static OrcWriteValidationMode getOrcWriterValidateMode(ConnectorSession session)
+ {
+ return session.getProperty(ORC_WRITER_VALIDATE_MODE, OrcWriteValidationMode.class);
+ }
+
+ public static DataSize getOrcWriterMinStripeSize(ConnectorSession session)
+ {
+ return session.getProperty(ORC_WRITER_MIN_STRIPE_SIZE, DataSize.class);
+ }
+
+ public static DataSize getOrcWriterMaxStripeSize(ConnectorSession session)
+ {
+ return session.getProperty(ORC_WRITER_MAX_STRIPE_SIZE, DataSize.class);
+ }
+
+ public static int getOrcWriterMaxStripeRows(ConnectorSession session)
+ {
+ return session.getProperty(ORC_WRITER_MAX_STRIPE_ROWS, Integer.class);
+ }
+
+ public static DataSize getOrcWriterMaxDictionaryMemory(ConnectorSession session)
+ {
+ return session.getProperty(ORC_WRITER_MAX_DICTIONARY_MEMORY, DataSize.class);
+ }
+
+ public static HiveCompressionCodec getCompressionCodec(ConnectorSession session)
+ {
+ return session.getProperty(COMPRESSION_CODEC, HiveCompressionCodec.class);
+ }
+
public static boolean isFailOnCorruptedParquetStatistics(ConnectorSession session)
{
return session.getProperty(PARQUET_FAIL_WITH_CORRUPTED_STATISTICS, Boolean.class);
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplit.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplit.java
index a01a8fd3687a..0a9ea251d75e 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplit.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplit.java
@@ -20,6 +20,7 @@
import io.prestosql.spi.HostAddress;
import io.prestosql.spi.connector.ConnectorSplit;
import io.prestosql.spi.predicate.TupleDomain;
+import org.apache.iceberg.FileFormat;
import java.util.List;
import java.util.Map;
@@ -33,6 +34,7 @@ public class IcebergSplit
private final String path;
private final long start;
private final long length;
+ private final FileFormat fileFormat;
private final List addresses;
private final TupleDomain predicate;
private final Map partitionKeys;
@@ -42,6 +44,7 @@ public IcebergSplit(
@JsonProperty("path") String path,
@JsonProperty("start") long start,
@JsonProperty("length") long length,
+ @JsonProperty("fileFormat") FileFormat fileFormat,
@JsonProperty("addresses") List addresses,
@JsonProperty("predicate") TupleDomain predicate,
@JsonProperty("partitionKeys") Map partitionKeys)
@@ -49,6 +52,7 @@ public IcebergSplit(
this.path = requireNonNull(path, "path is null");
this.start = start;
this.length = length;
+ this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.addresses = ImmutableList.copyOf(requireNonNull(addresses, "addresses is null"));
this.predicate = requireNonNull(predicate, "predicate is null");
this.partitionKeys = ImmutableMap.copyOf(requireNonNull(partitionKeys, "partitionKeys is null"));
@@ -85,6 +89,12 @@ public long getLength()
return length;
}
+ @JsonProperty
+ public FileFormat getFileFormat()
+ {
+ return fileFormat;
+ }
+
@JsonProperty
public TupleDomain getPredicate()
{
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitSource.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitSource.java
index 148d4a660a39..164d1c7f21a2 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitSource.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergSplitSource.java
@@ -112,6 +112,7 @@ private ConnectorSplit toIcebergSplit(TupleDomain predicate
task.file().path().toString(),
task.start(),
task.length(),
+ task.file().format(),
ImmutableList.of(),
predicate,
getPartitionKeys(task));
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableProperties.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableProperties.java
index a3db5e5e2e3e..97f44f261fc1 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableProperties.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/IcebergTableProperties.java
@@ -37,14 +37,14 @@ public class IcebergTableProperties
private final List> tableProperties;
@Inject
- public IcebergTableProperties()
+ public IcebergTableProperties(IcebergConfig icebergConfig)
{
tableProperties = ImmutableList.>builder()
.add(enumProperty(
FILE_FORMAT_PROPERTY,
"File format for the table",
FileFormat.class,
- FileFormat.PARQUET,
+ icebergConfig.getFileFormat(),
false))
.add(new PropertyMetadata<>(
PARTITIONING_PROPERTY,
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionTable.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionTable.java
index 3d002e32b98c..81d7c54c47fd 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionTable.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/PartitionTable.java
@@ -50,7 +50,6 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -60,13 +59,13 @@
import java.util.stream.Collectors;
import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.prestosql.plugin.iceberg.IcebergUtil.getIdentityPartitions;
import static io.prestosql.plugin.iceberg.IcebergUtil.getTableScan;
import static io.prestosql.plugin.iceberg.TypeConverter.toPrestoType;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.DateTimeEncoding.packDateTimeWithZone;
import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.toCollection;
import static java.util.stream.Collectors.toSet;
public class PartitionTable
@@ -227,6 +226,10 @@ private RecordCursor buildRecordCursor(Map partiti
// add column level metrics
for (int i = 0; i < columnMetricTypes.size(); i++) {
+ if (!partition.hasValidColumnMetrics()) {
+ row.add(null);
+ continue;
+ }
Integer fieldId = nonPartitionPrimitiveColumns.get(i).fieldId();
Type.PrimitiveType type = idToTypeMapping.get(fieldId);
Object min = convert(partition.getMinValues().get(fieldId), type);
@@ -267,6 +270,9 @@ private static Block getColumnMetricBlock(RowType columnMetricType, Object min,
private Map toMap(Map idToMetricMap)
{
+ if (idToMetricMap == null) {
+ return null;
+ }
ImmutableMap.Builder map = ImmutableMap.builder();
idToMetricMap.forEach((id, value) -> {
Type.PrimitiveType type = idToTypeMapping.get(id);
@@ -311,6 +317,7 @@ private class Partition
private final Map maxValues;
private final Map nullCounts;
private final Set corruptedStats;
+ private boolean hasValidColumnMetrics;
public Partition(
StructLike values,
@@ -324,14 +331,23 @@ public Partition(
this.recordCount = recordCount;
this.fileCount = 1;
this.size = size;
- this.minValues = new HashMap<>(requireNonNull(minValues, "minValues is null"));
- this.maxValues = new HashMap<>(requireNonNull(maxValues, "maxValues is null"));
- // we are assuming if minValues is not present, max will be not be present either.
- this.corruptedStats = nonPartitionPrimitiveColumns.stream()
- .map(Types.NestedField::fieldId)
- .filter(id -> !minValues.containsKey(id) && (!nullCounts.containsKey(id) || nullCounts.get(id) != recordCount))
- .collect(toCollection(HashSet::new));
- this.nullCounts = new HashMap<>(nullCounts);
+ if (minValues == null || maxValues == null || nullCounts == null) {
+ this.minValues = null;
+ this.maxValues = null;
+ this.nullCounts = null;
+ corruptedStats = null;
+ }
+ else {
+ this.minValues = new HashMap<>(minValues);
+ this.maxValues = new HashMap<>(maxValues);
+ // we are assuming if minValues is not present, max will be not be present either.
+ this.corruptedStats = nonPartitionPrimitiveColumns.stream()
+ .map(Types.NestedField::fieldId)
+ .filter(id -> !minValues.containsKey(id) && (!nullCounts.containsKey(id) || nullCounts.get(id) != recordCount))
+ .collect(toImmutableSet());
+ this.nullCounts = new HashMap<>(nullCounts);
+ hasValidColumnMetrics = true;
+ }
}
public StructLike getValues()
@@ -369,6 +385,11 @@ public Map getNullCounts()
return nullCounts;
}
+ public boolean hasValidColumnMetrics()
+ {
+ return hasValidColumnMetrics;
+ }
+
public void incrementRecordCount(long count)
{
this.recordCount += count;
@@ -403,6 +424,13 @@ public void updateMax(Map upperBounds, Map nullC
private void updateStats(Map current, Map newStat, Map nullCounts, long recordCount, Predicate predicate)
{
+ if (!hasValidColumnMetrics) {
+ return;
+ }
+ if (newStat == null || nullCounts == null) {
+ hasValidColumnMetrics = false;
+ return;
+ }
for (Types.NestedField column : nonPartitionPrimitiveColumns) {
int id = column.fieldId();
@@ -433,6 +461,13 @@ private void updateStats(Map current, Map newS
public void updateNullCount(Map nullCounts)
{
+ if (!hasValidColumnMetrics) {
+ return;
+ }
+ if (nullCounts == null) {
+ hasValidColumnMetrics = false;
+ return;
+ }
nullCounts.forEach((key, counts) ->
this.nullCounts.merge(key, counts, Long::sum));
}
diff --git a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TypeConverter.java b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TypeConverter.java
index 83feed34fbde..a8c0389066be 100644
--- a/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TypeConverter.java
+++ b/presto-iceberg/src/main/java/io/prestosql/plugin/iceberg/TypeConverter.java
@@ -15,6 +15,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import io.prestosql.orc.metadata.ColumnMetadata;
+import io.prestosql.orc.metadata.OrcColumnId;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.plugin.hive.HiveType;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.type.ArrayType;
@@ -43,6 +46,7 @@
import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import java.util.ArrayList;
@@ -87,6 +91,9 @@
public final class TypeConverter
{
+ public static final String ORC_ICEBERG_ID_KEY = "iceberg.id";
+ public static final String ORC_ICEBERG_REQUIRED_KEY = "iceberg.required";
+
private static final Map, org.apache.iceberg.types.Type> PRESTO_TO_ICEBERG = ImmutableMap., org.apache.iceberg.types.Type>builder()
.put(BooleanType.class, Types.BooleanType.get())
.put(VarbinaryType.class, Types.BinaryType.get())
@@ -292,4 +299,134 @@ private static TypeInfo toHiveTypeInfo(Type type)
}
throw new PrestoException(NOT_SUPPORTED, format("Unsupported Hive type: %s", type));
}
+
+ public static ColumnMetadata toOrcType(Schema schema)
+ {
+ return new ColumnMetadata<>(toOrcStructType(0, schema.asStruct(), ImmutableMap.of()));
+ }
+
+ private static List toOrcType(int nextFieldTypeIndex, org.apache.iceberg.types.Type type, Map attributes)
+ {
+ switch (type.typeId()) {
+ case BOOLEAN:
+ return ImmutableList.of(new OrcType(OrcType.OrcTypeKind.BOOLEAN, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+ case INTEGER:
+ return ImmutableList.of(new OrcType(OrcType.OrcTypeKind.INT, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+ case LONG:
+ return ImmutableList.of(new OrcType(OrcType.OrcTypeKind.LONG, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+ case FLOAT:
+ return ImmutableList.of(new OrcType(OrcType.OrcTypeKind.FLOAT, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+ case DOUBLE:
+ return ImmutableList.of(new OrcType(OrcType.OrcTypeKind.DOUBLE, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+ case DATE:
+ return ImmutableList.of(new OrcType(OrcType.OrcTypeKind.DATE, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+ case TIME:
+ return ImmutableList.of(new OrcType(OrcType.OrcTypeKind.INT, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+ case TIMESTAMP:
+ return ImmutableList.of(new OrcType(OrcType.OrcTypeKind.TIMESTAMP, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+ case STRING:
+ return ImmutableList.of(new OrcType(OrcType.OrcTypeKind.STRING, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+ case UUID:
+ return ImmutableList.of(new OrcType(OrcType.OrcTypeKind.BINARY, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+ case FIXED:
+ return ImmutableList.of(new OrcType(OrcType.OrcTypeKind.BINARY, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+ case BINARY:
+ return ImmutableList.of(new OrcType(OrcType.OrcTypeKind.BINARY, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), attributes));
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) type;
+ return ImmutableList.of(new OrcType(OrcType.OrcTypeKind.DECIMAL, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.of(decimalType.getPrecision()), Optional.of(decimalType.getScale()), attributes));
+ case STRUCT:
+ return toOrcStructType(nextFieldTypeIndex, (Types.StructType) type, attributes);
+ case LIST:
+ return toOrcListType(nextFieldTypeIndex, (Types.ListType) type, attributes);
+ case MAP:
+ return toOrcMapType(nextFieldTypeIndex, (Types.MapType) type, attributes);
+ default:
+ throw new PrestoException(NOT_SUPPORTED, "Unsupported Iceberg type: " + type);
+ }
+ }
+
+ private static List toOrcStructType(int nextFieldTypeIndex, Types.StructType structType, Map attributes)
+ {
+ nextFieldTypeIndex++;
+ List fieldTypeIndexes = new ArrayList<>();
+ List fieldNames = new ArrayList<>();
+ List> fieldTypesList = new ArrayList<>();
+ for (Types.NestedField field : structType.fields()) {
+ fieldTypeIndexes.add(new OrcColumnId(nextFieldTypeIndex));
+ fieldNames.add(field.name());
+ Map fieldAttributes = ImmutableMap.builder()
+ .put(ORC_ICEBERG_ID_KEY, Integer.toString(field.fieldId()))
+ .put(ORC_ICEBERG_REQUIRED_KEY, Boolean.toString(field.isRequired()))
+ .build();
+ List fieldOrcTypes = toOrcType(nextFieldTypeIndex, field.type(), fieldAttributes);
+ fieldTypesList.add(fieldOrcTypes);
+ nextFieldTypeIndex += fieldOrcTypes.size();
+ }
+
+ ImmutableList.Builder orcTypes = ImmutableList.builder();
+ orcTypes.add(new OrcType(
+ OrcType.OrcTypeKind.STRUCT,
+ fieldTypeIndexes,
+ fieldNames,
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ attributes));
+ fieldTypesList.forEach(orcTypes::addAll);
+
+ return orcTypes.build();
+ }
+
+ private static List toOrcListType(int nextFieldTypeIndex, Types.ListType listType, Map attributes)
+ {
+ nextFieldTypeIndex++;
+ Map elementAttributes = ImmutableMap.builder()
+ .put(ORC_ICEBERG_ID_KEY, Integer.toString(listType.elementId()))
+ .put(ORC_ICEBERG_REQUIRED_KEY, Boolean.toString(listType.isElementRequired()))
+ .build();
+ List itemTypes = toOrcType(nextFieldTypeIndex, listType.elementType(), elementAttributes);
+
+ List orcTypes = new ArrayList<>();
+ orcTypes.add(new OrcType(
+ OrcType.OrcTypeKind.LIST,
+ ImmutableList.of(new OrcColumnId(nextFieldTypeIndex)),
+ ImmutableList.of("item"),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ attributes));
+
+ orcTypes.addAll(itemTypes);
+ return orcTypes;
+ }
+
+ private static List toOrcMapType(int nextFieldTypeIndex, Types.MapType mapType, Map attributes)
+ {
+ nextFieldTypeIndex++;
+ Map keyAttributes = ImmutableMap.builder()
+ .put(ORC_ICEBERG_ID_KEY, Integer.toString(mapType.keyId()))
+ .put(ORC_ICEBERG_REQUIRED_KEY, Boolean.toString(true))
+ .build();
+ List keyTypes = toOrcType(nextFieldTypeIndex, mapType.keyType(), keyAttributes);
+ Map valueAttributes = ImmutableMap.builder()
+ .put(ORC_ICEBERG_ID_KEY, Integer.toString(mapType.valueId()))
+ .put(ORC_ICEBERG_REQUIRED_KEY, Boolean.toString(mapType.isValueRequired()))
+ .build();
+ List valueTypes = toOrcType(nextFieldTypeIndex + keyTypes.size(), mapType.valueType(), valueAttributes);
+
+ List orcTypes = new ArrayList<>();
+ orcTypes.add(new OrcType(
+ OrcType.OrcTypeKind.MAP,
+ ImmutableList.of(new OrcColumnId(nextFieldTypeIndex), new OrcColumnId(nextFieldTypeIndex + keyTypes.size())),
+ ImmutableList.of("key", "value"),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ attributes));
+
+ orcTypes.addAll(keyTypes);
+ orcTypes.addAll(valueTypes);
+ return orcTypes;
+ }
}
diff --git a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergConfig.java b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergConfig.java
index 50493921b5d4..0d7c374ae9d6 100644
--- a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergConfig.java
+++ b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergConfig.java
@@ -14,6 +14,7 @@
package io.prestosql.plugin.iceberg;
import com.google.common.collect.ImmutableMap;
+import io.prestosql.plugin.hive.HiveCompressionCodec;
import org.testng.annotations.Test;
import java.util.Map;
@@ -21,6 +22,9 @@
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
+import static io.prestosql.plugin.hive.HiveCompressionCodec.GZIP;
+import static io.prestosql.plugin.iceberg.IcebergFileFormat.ORC;
+import static io.prestosql.plugin.iceberg.IcebergFileFormat.PARQUET;
public class TestIcebergConfig
{
@@ -28,7 +32,9 @@ public class TestIcebergConfig
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(IcebergConfig.class)
- .setMetastoreTransactionCacheSize(1000));
+ .setMetastoreTransactionCacheSize(1000)
+ .setFileFormat(ORC)
+ .setCompressionCodec(GZIP));
}
@Test
@@ -36,10 +42,14 @@ public void testExplicitPropertyMappings()
{
Map properties = new ImmutableMap.Builder()
.put("iceberg.metastore.transaction-cache.size", "999")
+ .put("iceberg.file-format", "Parquet")
+ .put("iceberg.compression-codec", "NONE")
.build();
IcebergConfig expected = new IcebergConfig()
- .setMetastoreTransactionCacheSize(999);
+ .setMetastoreTransactionCacheSize(999)
+ .setFileFormat(PARQUET)
+ .setCompressionCodec(HiveCompressionCodec.NONE);
assertFullMapping(properties, expected);
}
diff --git a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java
index 6807f1db7027..de90ded9ae45 100644
--- a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java
+++ b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSmoke.java
@@ -183,7 +183,7 @@ private void testCreatePartitionedTableAs(Session session, FileFormat fileFormat
" order_status varchar\n" +
")\n" +
"WITH (\n" +
- " format = 'PARQUET',\n" +
+ " format = '" + fileFormat + "',\n" +
" partitioning = ARRAY['order_status','ship_priority','bucket(order_key, 9)']\n" +
")",
getSession().getCatalog().get(),
@@ -212,9 +212,40 @@ public void testColumnComments()
dropTable(session, "test_column_comments");
}
+ @Test
+ public void testSchemaEvolution()
+ {
+ // Schema evolution should be id based
+ testWithAllFileFormats(this::testSchemaEvolution);
+ }
+
+ private void testSchemaEvolution(Session session, FileFormat fileFormat)
+ {
+ assertUpdate(session, "CREATE TABLE test_schema_evolution_drop_end (col0 INTEGER, col1 INTEGER, col2 INTEGER) WITH (format = '" + fileFormat + "')");
+ assertUpdate(session, "INSERT INTO test_schema_evolution_drop_end VALUES (0, 1, 2)", 1);
+ assertQuery(session, "SELECT * FROM test_schema_evolution_drop_end", "VALUES(0, 1, 2)");
+ assertUpdate(session, "ALTER TABLE test_schema_evolution_drop_end DROP COLUMN col2");
+ assertQuery(session, "SELECT * FROM test_schema_evolution_drop_end", "VALUES(0, 1)");
+ assertUpdate(session, "ALTER TABLE test_schema_evolution_drop_end ADD COLUMN col2 INTEGER");
+ assertUpdate(session, "INSERT INTO test_schema_evolution_drop_end VALUES (3, 4, 5)", 1);
+ assertQuery(session, "SELECT * FROM test_schema_evolution_drop_end", "VALUES(0, 1, NULL), (3, 4, 5)");
+ dropTable(session, "test_schema_evolution_drop_end");
+
+ assertUpdate(session, "CREATE TABLE test_schema_evolution_drop_middle (col0 INTEGER, col1 INTEGER, col2 INTEGER) WITH (format = '" + fileFormat + "')");
+ assertUpdate(session, "INSERT INTO test_schema_evolution_drop_middle VALUES (0, 1, 2)", 1);
+ assertQuery(session, "SELECT * FROM test_schema_evolution_drop_middle", "VALUES(0, 1, 2)");
+ assertUpdate(session, "ALTER TABLE test_schema_evolution_drop_middle DROP COLUMN col1");
+ assertQuery(session, "SELECT * FROM test_schema_evolution_drop_middle", "VALUES(0, 2)");
+ assertUpdate(session, "ALTER TABLE test_schema_evolution_drop_middle ADD COLUMN col1 INTEGER");
+ assertUpdate(session, "INSERT INTO test_schema_evolution_drop_middle VALUES (3, 4, 5)", 1);
+ assertQuery(session, "SELECT * FROM test_schema_evolution_drop_middle", "VALUES(0, 2, NULL), (3, 4, 5)");
+ dropTable(session, "test_schema_evolution_drop_middle");
+ }
+
private void testWithAllFileFormats(BiConsumer test)
{
test.accept(getSession(), FileFormat.PARQUET);
+ test.accept(getSession(), FileFormat.ORC);
}
private void dropTable(Session session, String table)
diff --git a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSystemTables.java b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSystemTables.java
index 21442b3f0c59..9d31056d4b43 100644
--- a/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSystemTables.java
+++ b/presto-iceberg/src/test/java/io/prestosql/plugin/iceberg/TestIcebergSystemTables.java
@@ -71,7 +71,8 @@ protected DistributedQueryRunner createQueryRunner()
public void setUp()
{
assertUpdate("CREATE SCHEMA test_schema");
- assertUpdate("CREATE TABLE test_schema.test_table (_bigint BIGINT, _date DATE) WITH (partitioning = ARRAY['_date'])");
+        // "$partitions" tables with the ORC file format are not fully supported yet, so we use Parquet here for testing
+ assertUpdate("CREATE TABLE test_schema.test_table (_bigint BIGINT, _date DATE) WITH (partitioning = ARRAY['_date'], format = 'Parquet')");
assertUpdate("INSERT INTO test_schema.test_table VALUES (0, CAST('2019-09-08' AS DATE)), (1, CAST('2019-09-09' AS DATE)), (2, CAST('2019-09-09' AS DATE))", 3);
assertUpdate("INSERT INTO test_schema.test_table VALUES (3, CAST('2019-09-09' AS DATE)), (4, CAST('2019-09-10' AS DATE)), (5, CAST('2019-09-10' AS DATE))", 3);
assertQuery("SELECT count(*) FROM test_schema.test_table", "VALUES 6");
diff --git a/presto-orc/src/main/java/io/prestosql/orc/OrcColumn.java b/presto-orc/src/main/java/io/prestosql/orc/OrcColumn.java
index 26562fef51ed..2242caef3e5a 100644
--- a/presto-orc/src/main/java/io/prestosql/orc/OrcColumn.java
+++ b/presto-orc/src/main/java/io/prestosql/orc/OrcColumn.java
@@ -14,10 +14,12 @@
package io.prestosql.orc;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import io.prestosql.orc.metadata.OrcColumnId;
import io.prestosql.orc.metadata.OrcType.OrcTypeKind;
import java.util.List;
+import java.util.Map;
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
@@ -30,6 +32,7 @@ public final class OrcColumn
private final String columnName;
private final OrcDataSourceId orcDataSourceId;
private final List nestedColumns;
+ private final Map attributes;
public OrcColumn(
String path,
@@ -37,7 +40,8 @@ public OrcColumn(
String columnName,
OrcTypeKind columnType,
OrcDataSourceId orcDataSourceId,
- List nestedColumns)
+ List nestedColumns,
+ Map attributes)
{
this.path = requireNonNull(path, "path is null");
this.columnId = requireNonNull(columnId, "columnId is null");
@@ -45,6 +49,7 @@ public OrcColumn(
this.columnType = requireNonNull(columnType, "columnType is null");
this.orcDataSourceId = requireNonNull(orcDataSourceId, "orcDataSourceId is null");
this.nestedColumns = ImmutableList.copyOf(requireNonNull(nestedColumns, "nestedColumns is null"));
+ this.attributes = ImmutableMap.copyOf(requireNonNull(attributes, "attributes is null"));
}
public String getPath()
@@ -77,6 +82,11 @@ public List getNestedColumns()
return nestedColumns;
}
+ public Map getAttributes()
+ {
+ return attributes;
+ }
+
@Override
public String toString()
{
diff --git a/presto-orc/src/main/java/io/prestosql/orc/OrcReader.java b/presto-orc/src/main/java/io/prestosql/orc/OrcReader.java
index d6434c5434ea..b9dbaa93b4e9 100644
--- a/presto-orc/src/main/java/io/prestosql/orc/OrcReader.java
+++ b/presto-orc/src/main/java/io/prestosql/orc/OrcReader.java
@@ -320,7 +320,7 @@ else if (orcType.getOrcTypeKind() == OrcTypeKind.MAP) {
createOrcColumn(path, "key", orcType.getFieldTypeIndex(0), types, orcDataSourceId),
createOrcColumn(path, "value", orcType.getFieldTypeIndex(1), types, orcDataSourceId));
}
- return new OrcColumn(path, columnId, fieldName, orcType.getOrcTypeKind(), orcDataSourceId, nestedColumns);
+ return new OrcColumn(path, columnId, fieldName, orcType.getOrcTypeKind(), orcDataSourceId, nestedColumns, orcType.getAttributes());
}
/**
diff --git a/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java b/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java
index c7348ec62f31..4dfedc6fbccb 100644
--- a/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java
+++ b/presto-orc/src/main/java/io/prestosql/orc/OrcWriter.java
@@ -126,6 +126,7 @@ public OrcWriter(
OrcDataSink orcDataSink,
List columnNames,
List types,
+ ColumnMetadata orcTypes,
CompressionKind compression,
OrcWriterOptions options,
boolean writeLegacyVersion,
@@ -163,7 +164,7 @@ public OrcWriter(
this.stats = requireNonNull(stats, "stats is null");
requireNonNull(columnNames, "columnNames is null");
- this.orcTypes = OrcType.createRootOrcType(columnNames, types);
+ this.orcTypes = requireNonNull(orcTypes, "orcTypes is null");
recordValidation(validation -> validation.setColumnNames(columnNames));
// create column writers
diff --git a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java
index 1629ce4b4423..0e0871810575 100644
--- a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java
+++ b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataReader.java
@@ -492,7 +492,7 @@ private static OrcType toType(OrcProto.Type type)
precision = Optional.of(type.getPrecision());
scale = Optional.of(type.getScale());
}
- return new OrcType(toTypeKind(type.getKind()), toOrcColumnId(type.getSubtypesList()), type.getFieldNamesList(), length, precision, scale);
+ return new OrcType(toTypeKind(type.getKind()), toOrcColumnId(type.getSubtypesList()), type.getFieldNamesList(), length, precision, scale, toMap(type.getAttributesList()));
}
private static List toOrcColumnId(List columnIds)
@@ -553,6 +553,20 @@ private static OrcTypeKind toTypeKind(OrcProto.Type.Kind typeKind)
}
}
+    // This method assumes type attributes have no duplicate keys
+ private static Map toMap(List attributes)
+ {
+ ImmutableMap.Builder results = new ImmutableMap.Builder<>();
+ if (attributes != null) {
+ for (OrcProto.StringPair attribute : attributes) {
+ if (attribute.hasKey() && attribute.hasValue()) {
+ results.put(attribute.getKey(), attribute.getValue());
+ }
+ }
+ }
+ return results.build();
+ }
+
private static StreamKind toStreamKind(OrcProto.Stream.Kind streamKind)
{
switch (streamKind) {
diff --git a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java
index d6f26a2d2acc..56f28f399a5d 100644
--- a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java
+++ b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcMetadataWriter.java
@@ -34,10 +34,12 @@
import java.io.OutputStream;
import java.time.ZoneId;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.TimeZone;
+import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.prestosql.orc.metadata.PostScript.MAGIC;
import static java.lang.Math.toIntExact;
import static java.util.stream.Collectors.toList;
@@ -151,7 +153,8 @@ private static Type toType(OrcType type)
.addAllSubtypes(type.getFieldTypeIndexes().stream()
.map(OrcColumnId::getId)
.collect(toList()))
- .addAllFieldNames(type.getFieldNames());
+ .addAllFieldNames(type.getFieldNames())
+ .addAllAttributes(toStringPairList(type.getAttributes()));
if (type.getLength().isPresent()) {
builder.setMaximumLength(type.getLength().get());
@@ -208,6 +211,16 @@ private static OrcProto.Type.Kind toTypeKind(OrcTypeKind orcTypeKind)
throw new IllegalArgumentException("Unsupported type: " + orcTypeKind);
}
+ private static List toStringPairList(Map attributes)
+ {
+ return attributes.entrySet().stream()
+ .map(entry -> OrcProto.StringPair.newBuilder()
+ .setKey(entry.getKey())
+ .setValue(entry.getValue())
+ .build())
+ .collect(toImmutableList());
+ }
+
private static OrcProto.ColumnStatistics toColumnStatistics(ColumnStatistics columnStatistics)
{
OrcProto.ColumnStatistics.Builder builder = OrcProto.ColumnStatistics.newBuilder();
diff --git a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcType.java b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcType.java
index 997da40cc99c..689751eb55e8 100644
--- a/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcType.java
+++ b/presto-orc/src/main/java/io/prestosql/orc/metadata/OrcType.java
@@ -14,6 +14,7 @@
package io.prestosql.orc.metadata;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.CharType;
@@ -26,6 +27,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import static com.google.common.base.MoreObjects.toStringHelper;
@@ -80,28 +82,29 @@ public enum OrcTypeKind
private final Optional length;
private final Optional precision;
private final Optional scale;
+ private final Map attributes;
private OrcType(OrcTypeKind orcTypeKind)
{
- this(orcTypeKind, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty());
+ this(orcTypeKind, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.empty(), Optional.empty(), ImmutableMap.of());
}
private OrcType(OrcTypeKind orcTypeKind, int length)
{
- this(orcTypeKind, ImmutableList.of(), ImmutableList.of(), Optional.of(length), Optional.empty(), Optional.empty());
+ this(orcTypeKind, ImmutableList.of(), ImmutableList.of(), Optional.of(length), Optional.empty(), Optional.empty(), ImmutableMap.of());
}
private OrcType(OrcTypeKind orcTypeKind, int precision, int scale)
{
- this(orcTypeKind, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.of(precision), Optional.of(scale));
+ this(orcTypeKind, ImmutableList.of(), ImmutableList.of(), Optional.empty(), Optional.of(precision), Optional.of(scale), ImmutableMap.of());
}
private OrcType(OrcTypeKind orcTypeKind, List fieldTypeIndexes, List fieldNames)
{
- this(orcTypeKind, fieldTypeIndexes, fieldNames, Optional.empty(), Optional.empty(), Optional.empty());
+ this(orcTypeKind, fieldTypeIndexes, fieldNames, Optional.empty(), Optional.empty(), Optional.empty(), ImmutableMap.of());
}
- public OrcType(OrcTypeKind orcTypeKind, List fieldTypeIndexes, List fieldNames, Optional length, Optional precision, Optional scale)
+ public OrcType(OrcTypeKind orcTypeKind, List fieldTypeIndexes, List fieldNames, Optional length, Optional precision, Optional scale, Map attributes)
{
this.orcTypeKind = requireNonNull(orcTypeKind, "typeKind is null");
this.fieldTypeIndexes = ImmutableList.copyOf(requireNonNull(fieldTypeIndexes, "fieldTypeIndexes is null"));
@@ -115,6 +118,7 @@ public OrcType(OrcTypeKind orcTypeKind, List fieldTypeIndexes, List
this.length = requireNonNull(length, "length is null");
this.precision = requireNonNull(precision, "precision is null");
this.scale = requireNonNull(scale, "scale cannot be null");
+ this.attributes = ImmutableMap.copyOf(requireNonNull(attributes, "attributes is null"));
}
public OrcTypeKind getOrcTypeKind()
@@ -162,6 +166,11 @@ public Optional getScale()
return scale;
}
+ public Map getAttributes()
+ {
+ return attributes;
+ }
+
@Override
public String toString()
{
diff --git a/presto-orc/src/test/java/io/prestosql/orc/OrcTester.java b/presto-orc/src/test/java/io/prestosql/orc/OrcTester.java
index 458d12dd6c4c..529ff1a3d316 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/OrcTester.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/OrcTester.java
@@ -24,6 +24,7 @@
import io.airlift.units.DataSize;
import io.prestosql.metadata.Metadata;
import io.prestosql.orc.metadata.CompressionKind;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.spi.Page;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
@@ -591,10 +592,14 @@ public static void writeOrcColumnPresto(File outputFile, CompressionKind compres
metadata.put("columns", "test");
metadata.put("columns.types", createSettableStructObjectInspector("test", type).getTypeName());
+ List columnNames = ImmutableList.of("test");
+ List types = ImmutableList.of(type);
+
OrcWriter writer = new OrcWriter(
new OutputStreamOrcDataSink(new FileOutputStream(outputFile)),
ImmutableList.of("test"),
- ImmutableList.of(type),
+ types,
+ OrcType.createRootOrcType(columnNames, types),
compression,
new OrcWriterOptions(),
false,
diff --git a/presto-orc/src/test/java/io/prestosql/orc/TestOrcWriter.java b/presto-orc/src/test/java/io/prestosql/orc/TestOrcWriter.java
index fbb20504a100..c073b9fd25b1 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/TestOrcWriter.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/TestOrcWriter.java
@@ -21,6 +21,7 @@
import io.prestosql.orc.OrcWriteValidation.OrcWriteValidationMode;
import io.prestosql.orc.metadata.Footer;
import io.prestosql.orc.metadata.OrcMetadataReader;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.orc.metadata.Stream;
import io.prestosql.orc.metadata.StripeFooter;
import io.prestosql.orc.metadata.StripeInformation;
@@ -29,11 +30,13 @@
import io.prestosql.spi.Page;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
+import io.prestosql.spi.type.Type;
import org.testng.annotations.Test;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.List;
import java.util.Optional;
import static io.airlift.testing.Assertions.assertGreaterThanOrEqual;
@@ -58,10 +61,15 @@ public void testWriteOutputStreamsInOrder()
{
for (OrcWriteValidationMode validationMode : OrcWriteValidationMode.values()) {
TempFile tempFile = new TempFile();
+
+ List columnNames = ImmutableList.of("test1", "test2", "test3", "test4", "test5");
+ List types = ImmutableList.of(VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR);
+
OrcWriter writer = new OrcWriter(
new OutputStreamOrcDataSink(new FileOutputStream(tempFile.getFile())),
ImmutableList.of("test1", "test2", "test3", "test4", "test5"),
- ImmutableList.of(VARCHAR, VARCHAR, VARCHAR, VARCHAR, VARCHAR),
+ types,
+ OrcType.createRootOrcType(columnNames, types),
NONE,
new OrcWriterOptions()
.withStripeMinSize(new DataSize(0, MEGABYTE))
diff --git a/presto-orc/src/test/java/io/prestosql/orc/TestStructColumnReader.java b/presto-orc/src/test/java/io/prestosql/orc/TestStructColumnReader.java
index 4296d9147372..f389cc5af9d7 100644
--- a/presto-orc/src/test/java/io/prestosql/orc/TestStructColumnReader.java
+++ b/presto-orc/src/test/java/io/prestosql/orc/TestStructColumnReader.java
@@ -19,6 +19,7 @@
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.prestosql.metadata.Metadata;
+import io.prestosql.orc.metadata.OrcType;
import io.prestosql.spi.Page;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
@@ -217,10 +218,13 @@ public void testExtraFieldsInWriter()
private void write(TempFile tempFile, Type writerType, List data)
throws IOException
{
+ List columnNames = ImmutableList.of(STRUCT_COL_NAME);
+ List types = ImmutableList.of(writerType);
OrcWriter writer = new OrcWriter(
new OutputStreamOrcDataSink(new FileOutputStream(tempFile.getFile())),
- ImmutableList.of(STRUCT_COL_NAME),
- ImmutableList.of(writerType),
+ columnNames,
+ types,
+ OrcType.createRootOrcType(columnNames, types),
NONE,
new OrcWriterOptions()
.withStripeMinSize(new DataSize(0, MEGABYTE))