Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE;
import static org.apache.iceberg.io.DeleteSchemaUtil.pathPosSchema;
import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;

public class IcebergFileWriterFactory
{
private static final Schema POSITION_DELETE_SCHEMA = pathPosSchema();
private static final MetricsConfig FULL_METRICS_CONFIG = MetricsConfig.fromProperties(ImmutableMap.of(DEFAULT_WRITE_METRICS_MODE, "full"));

private final HdfsEnvironment hdfsEnvironment;
Expand Down Expand Up @@ -129,17 +131,16 @@ public IcebergFileWriter createDataFileWriter(

public IcebergFileWriter createPositionDeleteWriter(
Path outputPath,
Schema icebergSchema,
JobConf jobConf,
ConnectorSession session,
HdfsContext hdfsContext,
IcebergFileFormat fileFormat)
{
switch (fileFormat) {
case PARQUET:
return createParquetWriter(outputPath, icebergSchema, jobConf, session, hdfsContext);
return createParquetWriter(outputPath, POSITION_DELETE_SCHEMA, jobConf, session, hdfsContext);
case ORC:
return createOrcWriter(FULL_METRICS_CONFIG, outputPath, icebergSchema, jobConf, session);
return createOrcWriter(FULL_METRICS_CONFIG, outputPath, POSITION_DELETE_SCHEMA, jobConf, session);
default:
throw new TrinoException(NOT_SUPPORTED, "File format not supported: " + fileFormat);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.iceberg.FileContent;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.LocationProvider;

import java.util.ArrayList;
Expand All @@ -50,13 +49,10 @@
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.iceberg.io.DeleteSchemaUtil.pathPosSchema;

public class IcebergPositionDeletePageSink
implements ConnectorPageSink
{
private static final Schema POSITION_DELETE_SCHEMA = pathPosSchema();

private final String dataFilePath;
private final PartitionSpec partitionSpec;
private final Optional<PartitionData> partition;
Expand Down Expand Up @@ -92,7 +88,7 @@ public IcebergPositionDeletePageSink(
.map(partitionData -> locationProvider.newDataLocation(partitionSpec, partitionData, fileName))
.orElseGet(() -> locationProvider.newDataLocation(fileName));
JobConf jobConf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, new Path(outputPath)));
this.writer = fileWriterFactory.createPositionDeleteWriter(new Path(outputPath), POSITION_DELETE_SCHEMA, jobConf, session, hdfsContext, fileFormat);
this.writer = fileWriterFactory.createPositionDeleteWriter(new Path(outputPath), jobConf, session, hdfsContext, fileFormat);
}

@Override
Expand Down