diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
index f3537f76d6ea..85534a42f07e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
@@ -16,6 +16,7 @@
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import io.airlift.units.DataSize;
 import io.trino.orc.OrcDataSink;
 import io.trino.orc.OrcDataSource;
 import io.trino.orc.OrcDataSourceId;
@@ -131,10 +132,10 @@ public IcebergFileWriter createDataFileWriter(
     {
         switch (fileFormat) {
             case PARQUET:
-                // TODO use metricsConfig
-                return createParquetWriter(outputPath, icebergSchema, jobConf, session, hdfsContext);
+                // TODO use metricsConfig https://github.com/trinodb/trino/issues/9791
+                return createParquetWriter(MetricsConfig.getDefault(), outputPath, icebergSchema, jobConf, session, hdfsContext);
             case ORC:
-                return createOrcWriter(metricsConfig, outputPath, icebergSchema, jobConf, session, storageProperties);
+                return createOrcWriter(metricsConfig, outputPath, icebergSchema, jobConf, session, storageProperties, getOrcStringStatisticsLimit(session));
             default:
                 throw new TrinoException(NOT_SUPPORTED, "File format not supported: " + fileFormat);
         }
     }
@@ -150,15 +151,16 @@ public IcebergFileWriter createPositionDeleteWriter(
     {
         switch (fileFormat) {
             case PARQUET:
-                return createParquetWriter(outputPath, POSITION_DELETE_SCHEMA, jobConf, session, hdfsContext);
+                return createParquetWriter(FULL_METRICS_CONFIG, outputPath, POSITION_DELETE_SCHEMA, jobConf, session, hdfsContext);
             case ORC:
-                return createOrcWriter(FULL_METRICS_CONFIG, outputPath, POSITION_DELETE_SCHEMA, jobConf, session, storageProperties);
+                return createOrcWriter(FULL_METRICS_CONFIG, outputPath, POSITION_DELETE_SCHEMA, jobConf, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
             default:
                 throw new TrinoException(NOT_SUPPORTED, "File format not supported: " + fileFormat);
         }
     }
 
     private IcebergFileWriter createParquetWriter(
+            MetricsConfig metricsConfig,
             Path outputPath,
             Schema icebergSchema,
             JobConf jobConf,
@@ -187,6 +189,7 @@ private IcebergFileWriter createParquetWriter(
                     .build();
 
             return new IcebergParquetFileWriter(
+                    metricsConfig,
                     hdfsEnvironment.doAs(session.getIdentity(), () -> fileSystem.create(outputPath)),
                     rollbackAction,
                     fileColumnTypes,
@@ -211,7 +214,8 @@ private IcebergFileWriter createOrcWriter(
             Schema icebergSchema,
             JobConf jobConf,
             ConnectorSession session,
-            Map<String, String> storageProperties)
+            Map<String, String> storageProperties,
+            DataSize stringStatisticsLimit)
     {
         try {
             FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), outputPath, jobConf);
@@ -261,7 +265,7 @@ private IcebergFileWriter createOrcWriter(
                             .withStripeMaxSize(getOrcWriterMaxStripeSize(session))
                             .withStripeMaxRowCount(getOrcWriterMaxStripeRows(session))
                             .withDictionaryMaxMemory(getOrcWriterMaxDictionaryMemory(session))
-                            .withMaxStringStatisticsLimit(getOrcStringStatisticsLimit(session)),
+                            .withMaxStringStatisticsLimit(stringStatisticsLimit),
                     IntStream.range(0, fileColumnNames.size()).toArray(),
                     ImmutableMap.<String, String>builder()
                             .put(PRESTO_VERSION_NAME, nodeVersion.toString())
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
index b4204c928601..dff38204f37a 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
@@ -36,11 +36,13 @@ public class IcebergParquetFileWriter
         extends ParquetFileWriter
         implements IcebergFileWriter
 {
+    private final MetricsConfig metricsConfig;
     private final Path outputPath;
     private final HdfsEnvironment hdfsEnvironment;
     private final HdfsContext hdfsContext;
 
     public IcebergParquetFileWriter(
+            MetricsConfig metricsConfig,
             OutputStream outputStream,
             Callable<Void> rollbackAction,
             List<Type> fileColumnTypes,
@@ -63,6 +65,7 @@ public IcebergParquetFileWriter(
                 fileInputColumnIndexes,
                 compressionCodecName,
                 trinoVersion);
+        this.metricsConfig = requireNonNull(metricsConfig, "metricsConfig is null");
         this.outputPath = requireNonNull(outputPath, "outputPath is null");
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.hdfsContext = requireNonNull(hdfsContext, "hdfsContext is null");
@@ -71,6 +74,6 @@ public IcebergParquetFileWriter(
     @Override
     public Metrics getMetrics()
     {
-        return hdfsEnvironment.doAs(hdfsContext.getIdentity(), () -> ParquetUtil.fileMetrics(new HdfsInputFile(outputPath, hdfsEnvironment, hdfsContext), MetricsConfig.getDefault()));
+        return hdfsEnvironment.doAs(hdfsContext.getIdentity(), () -> ParquetUtil.fileMetrics(new HdfsInputFile(outputPath, hdfsEnvironment, hdfsContext), metricsConfig));
     }
 }