Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@
<dependency>
<groupId>io.prestosql.orc</groupId>
<artifactId>orc-protobuf</artifactId>
<version>9</version>
<version>10</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import java.util.Optional;

public interface HiveFileWriter
public interface FileWriter
{
long getWrittenBytes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public interface HiveFileWriterFactory
{
Optional<HiveFileWriter> createFileWriter(
Optional<FileWriter> createFileWriter(
Path path,
List<String> inputColumnNames,
StorageFormat storageFormat,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

public class HiveWriter
{
private final HiveFileWriter fileWriter;
private final FileWriter fileWriter;
private final Optional<String> partitionName;
private final UpdateMode updateMode;
private final String fileName;
Expand All @@ -38,7 +38,7 @@ public class HiveWriter
private long inputSizeInBytes;

public HiveWriter(
HiveFileWriter fileWriter,
FileWriter fileWriter,
Optional<String> partitionName,
UpdateMode updateMode,
String fileName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,9 +437,9 @@ else if (insertExistingPartitionsBehavior == InsertExistingPartitionsBehavior.ER

Path path = new Path(writeInfo.getWritePath(), fileNameWithExtension);

HiveFileWriter hiveFileWriter = null;
FileWriter hiveFileWriter = null;
for (HiveFileWriterFactory fileWriterFactory : fileWriterFactories) {
Optional<HiveFileWriter> fileWriter = fileWriterFactory.createFileWriter(
Optional<FileWriter> fileWriter = fileWriterFactory.createFileWriter(
path,
dataColumns.stream()
.map(DataColumn::getName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import static java.util.Objects.requireNonNull;

public class RcFileFileWriter
implements HiveFileWriter
implements FileWriter
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(RcFileFileWriter.class).instanceSize();
private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public RcFileFileWriterFactory(
}

@Override
public Optional<HiveFileWriter> createFileWriter(
public Optional<FileWriter> createFileWriter(
Path path,
List<String> inputColumnNames,
StorageFormat storageFormat,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardStructObjectInspector;

public class RecordFileWriter
implements HiveFileWriter
implements FileWriter
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(RecordFileWriter.class).instanceSize();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import static java.util.Objects.requireNonNull;

public class SortingFileWriter
implements HiveFileWriter
implements FileWriter
{
private static final Logger log = Logger.get(SortingFileWriter.class);

Expand All @@ -70,7 +70,7 @@ public class SortingFileWriter
private final List<Type> types;
private final List<Integer> sortFields;
private final List<SortOrder> sortOrders;
private final HiveFileWriter outputWriter;
private final FileWriter outputWriter;
private final SortBuffer sortBuffer;
private final TempFileSinkFactory tempFileSinkFactory;
private final Queue<TempFile> tempFiles = new PriorityQueue<>(comparing(TempFile::getSize));
Expand All @@ -79,7 +79,7 @@ public class SortingFileWriter
public SortingFileWriter(
FileSystem fileSystem,
Path tempFilePrefix,
HiveFileWriter outputWriter,
FileWriter outputWriter,
DataSize maxMemory,
int maxOpenTempFiles,
List<Type> types,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import io.prestosql.orc.OrcWriter;
import io.prestosql.orc.OrcWriterOptions;
import io.prestosql.orc.OrcWriterStats;
import io.prestosql.orc.metadata.ColumnMetadata;
import io.prestosql.orc.metadata.CompressionKind;
import io.prestosql.plugin.hive.HiveFileWriter;
import io.prestosql.orc.metadata.OrcType;
import io.prestosql.plugin.hive.FileWriter;
import io.prestosql.spi.Page;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.block.Block;
Expand All @@ -48,7 +50,7 @@
import static java.util.Objects.requireNonNull;

public class OrcFileWriter
implements HiveFileWriter
implements FileWriter
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(OrcFileWriter.class).instanceSize();
private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
Expand All @@ -66,6 +68,7 @@ public OrcFileWriter(
Callable<Void> rollbackAction,
List<String> columnNames,
List<Type> fileColumnTypes,
ColumnMetadata<OrcType> fileColumnOrcTypes,
CompressionKind compression,
OrcWriterOptions options,
boolean writeLegacyVersion,
Expand All @@ -82,6 +85,7 @@ public OrcFileWriter(
orcDataSink,
columnNames,
fileColumnTypes,
fileColumnOrcTypes,
compression,
options,
writeLegacyVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
import io.prestosql.orc.OrcWriterStats;
import io.prestosql.orc.OutputStreamOrcDataSink;
import io.prestosql.orc.metadata.CompressionKind;
import io.prestosql.orc.metadata.OrcType;
import io.prestosql.plugin.hive.FileFormatDataSourceStats;
import io.prestosql.plugin.hive.FileWriter;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveConfig;
import io.prestosql.plugin.hive.HiveFileWriter;
import io.prestosql.plugin.hive.HiveFileWriterFactory;
import io.prestosql.plugin.hive.HiveMetadata;
import io.prestosql.plugin.hive.HiveSessionProperties;
Expand Down Expand Up @@ -126,7 +127,7 @@ public OrcWriterStats getStats()
}

@Override
public Optional<HiveFileWriter> createFileWriter(
public Optional<FileWriter> createFileWriter(
Path path,
List<String> inputColumnNames,
StorageFormat storageFormat,
Expand Down Expand Up @@ -182,6 +183,7 @@ public Optional<HiveFileWriter> createFileWriter(
rollbackAction,
fileColumnNames,
fileColumnTypes,
OrcType.createRootOrcType(fileColumnNames, fileColumnTypes),
compression,
orcWriterOptions
.withStripeMinSize(getOrcOptimizedWriterMinStripeSize(session))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.prestosql.orc.OrcWriter;
import io.prestosql.orc.OrcWriterOptions;
import io.prestosql.orc.OrcWriterStats;
import io.prestosql.orc.metadata.OrcType;
import io.prestosql.spi.Page;
import io.prestosql.spi.type.Type;

Expand Down Expand Up @@ -77,6 +78,7 @@ private static OrcWriter createOrcFileWriter(OrcDataSink sink, List<Type> types)
sink,
columnNames,
types,
OrcType.createRootOrcType(columnNames, types),
LZ4,
new OrcWriterOptions()
.withMaxStringStatisticsLimit(new DataSize(0, BYTE))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ public static FileSplit createTestFile(
.map(TestColumn::getType)
.collect(Collectors.joining(",")));

Optional<HiveFileWriter> fileWriter = fileWriterFactory.createFileWriter(
Optional<FileWriter> fileWriter = fileWriterFactory.createFileWriter(
new Path(filePath),
testColumns.stream()
.map(TestColumn::getName)
Expand All @@ -545,7 +545,7 @@ public static FileSplit createTestFile(
jobConf,
session);

HiveFileWriter hiveFileWriter = fileWriter.orElseThrow(() -> new IllegalArgumentException("fileWriterFactory"));
FileWriter hiveFileWriter = fileWriter.orElseThrow(() -> new IllegalArgumentException("fileWriterFactory"));
hiveFileWriter.appendRows(page);
hiveFileWriter.commit();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.prestosql.orc.OrcWriterOptions;
import io.prestosql.orc.OrcWriterStats;
import io.prestosql.orc.OutputStreamOrcDataSink;
import io.prestosql.orc.metadata.OrcType;
import io.prestosql.plugin.hive.FileFormatDataSourceStats;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveColumnHandle;
Expand Down Expand Up @@ -450,6 +451,7 @@ public PrestoOrcFormatWriter(File targetFile, List<String> columnNames, List<Typ
new OutputStreamOrcDataSink(new FileOutputStream(targetFile)),
columnNames,
types,
OrcType.createRootOrcType(columnNames, types),
compressionCodec.getOrcCompressionKind(),
new OrcWriterOptions(),
false,
Expand Down
17 changes: 17 additions & 0 deletions presto-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
<artifactId>presto-hive</artifactId>
</dependency>

<dependency>
<groupId>io.prestosql</groupId>
<artifactId>presto-orc</artifactId>
</dependency>

<dependency>
<groupId>io.prestosql</groupId>
<artifactId>presto-parquet</artifactId>
Expand Down Expand Up @@ -164,6 +169,18 @@
<version>${dep.iceberg.version}</version>
</dependency>

<dependency>
<groupId>${dep.iceberg.groupId}</groupId>
<artifactId>iceberg-orc</artifactId>
<version>${dep.iceberg.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>${dep.iceberg.groupId}</groupId>
<artifactId>iceberg-parquet</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@
package io.prestosql.plugin.iceberg;

import io.airlift.configuration.Config;
import io.prestosql.plugin.hive.HiveCompressionCodec;
import org.apache.iceberg.FileFormat;

import javax.validation.constraints.Min;

import static io.prestosql.plugin.hive.HiveCompressionCodec.GZIP;
import static io.prestosql.plugin.iceberg.IcebergFileFormat.ORC;

public class IcebergConfig
{
private long metastoreTransactionCacheSize = 1000;
private IcebergFileFormat fileFormat = ORC;
private HiveCompressionCodec compressionCodec = GZIP;

@Min(1)
public long getMetastoreTransactionCacheSize()
Expand All @@ -33,4 +40,28 @@ public IcebergConfig setMetastoreTransactionCacheSize(long metastoreTransactionC
this.metastoreTransactionCacheSize = metastoreTransactionCacheSize;
return this;
}

public FileFormat getFileFormat()
{
return FileFormat.valueOf(fileFormat.name());
}

@Config("iceberg.file-format")
public IcebergConfig setFileFormat(IcebergFileFormat fileFormat)
{
this.fileFormat = fileFormat;
return this;
}

public HiveCompressionCodec getCompressionCodec()
{
return compressionCodec;
}

@Config("iceberg.compression-codec")
public IcebergConfig setCompressionCodec(HiveCompressionCodec compressionCodec)
{
this.compressionCodec = compressionCodec;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.prestosql.spi.ErrorType;

import static io.prestosql.spi.ErrorType.EXTERNAL;
import static io.prestosql.spi.ErrorType.INTERNAL_ERROR;
import static io.prestosql.spi.ErrorType.USER_ERROR;

public enum IcebergErrorCode
Expand All @@ -30,6 +31,10 @@ public enum IcebergErrorCode
ICEBERG_BAD_DATA(4, EXTERNAL),
ICEBERG_MISSING_DATA(5, EXTERNAL),
ICEBERG_CANNOT_OPEN_SPLIT(6, EXTERNAL),
ICEBERG_WRITER_OPEN_ERROR(7, EXTERNAL),
ICEBERG_FILESYSTEM_ERROR(8, EXTERNAL),
ICEBERG_CURSOR_ERROR(9, EXTERNAL),
ICEBERG_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR),
/**/;

private final ErrorCode errorCode;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.iceberg;

/**
 * Data file formats supported by the Iceberg connector.
 *
 * <p>Constant names intentionally mirror {@code org.apache.iceberg.FileFormat},
 * since configuration code maps between the two via
 * {@code FileFormat.valueOf(fileFormat.name())} — adding a constant here that
 * has no counterpart in the Iceberg enum would make that mapping throw.
 */
public enum IcebergFileFormat
{
    ORC,
    PARQUET,
}
Loading