Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.trino.orc.OrcDataSink;
import io.trino.orc.OrcDataSource;
import io.trino.orc.OrcDataSourceId;
Expand Down Expand Up @@ -131,10 +132,10 @@ public IcebergFileWriter createDataFileWriter(
{
switch (fileFormat) {
case PARQUET:
// TODO use metricsConfig
return createParquetWriter(outputPath, icebergSchema, jobConf, session, hdfsContext);
// TODO use metricsConfig https://github.com/trinodb/trino/issues/9791
Comment thread
findepi marked this conversation as resolved.
return createParquetWriter(MetricsConfig.getDefault(), outputPath, icebergSchema, jobConf, session, hdfsContext);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a GH Issue to support metrics mode for Parquet? If not, can you make one?

Either way, link it here.

case ORC:
return createOrcWriter(metricsConfig, outputPath, icebergSchema, jobConf, session, storageProperties);
return createOrcWriter(metricsConfig, outputPath, icebergSchema, jobConf, session, storageProperties, getOrcStringStatisticsLimit(session));
default:
throw new TrinoException(NOT_SUPPORTED, "File format not supported: " + fileFormat);
}
Expand All @@ -150,15 +151,16 @@ public IcebergFileWriter createPositionDeleteWriter(
{
switch (fileFormat) {
case PARQUET:
return createParquetWriter(outputPath, POSITION_DELETE_SCHEMA, jobConf, session, hdfsContext);
return createParquetWriter(FULL_METRICS_CONFIG, outputPath, POSITION_DELETE_SCHEMA, jobConf, session, hdfsContext);
case ORC:
return createOrcWriter(FULL_METRICS_CONFIG, outputPath, POSITION_DELETE_SCHEMA, jobConf, session, storageProperties);
return createOrcWriter(FULL_METRICS_CONFIG, outputPath, POSITION_DELETE_SCHEMA, jobConf, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
default:
throw new TrinoException(NOT_SUPPORTED, "File format not supported: " + fileFormat);
}
}

private IcebergFileWriter createParquetWriter(
MetricsConfig metricsConfig,
Path outputPath,
Schema icebergSchema,
JobConf jobConf,
Expand Down Expand Up @@ -187,6 +189,7 @@ private IcebergFileWriter createParquetWriter(
.build();

return new IcebergParquetFileWriter(
metricsConfig,
hdfsEnvironment.doAs(session.getIdentity(), () -> fileSystem.create(outputPath)),
rollbackAction,
fileColumnTypes,
Expand All @@ -211,7 +214,8 @@ private IcebergFileWriter createOrcWriter(
Schema icebergSchema,
JobConf jobConf,
ConnectorSession session,
Map<String, String> storageProperties)
Map<String, String> storageProperties,
DataSize stringStatisticsLimit)
{
try {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), outputPath, jobConf);
Expand Down Expand Up @@ -261,7 +265,7 @@ private IcebergFileWriter createOrcWriter(
.withStripeMaxSize(getOrcWriterMaxStripeSize(session))
.withStripeMaxRowCount(getOrcWriterMaxStripeRows(session))
.withDictionaryMaxMemory(getOrcWriterMaxDictionaryMemory(session))
.withMaxStringStatisticsLimit(getOrcStringStatisticsLimit(session)),
.withMaxStringStatisticsLimit(stringStatisticsLimit),
IntStream.range(0, fileColumnNames.size()).toArray(),
ImmutableMap.<String, String>builder()
.put(PRESTO_VERSION_NAME, nodeVersion.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ public class IcebergParquetFileWriter
extends ParquetFileWriter
implements IcebergFileWriter
{
private final MetricsConfig metricsConfig;
private final Path outputPath;
private final HdfsEnvironment hdfsEnvironment;
private final HdfsContext hdfsContext;

public IcebergParquetFileWriter(
MetricsConfig metricsConfig,
OutputStream outputStream,
Callable<Void> rollbackAction,
List<Type> fileColumnTypes,
Expand All @@ -63,6 +65,7 @@ public IcebergParquetFileWriter(
fileInputColumnIndexes,
compressionCodecName,
trinoVersion);
this.metricsConfig = requireNonNull(metricsConfig, "metricsConfig is null");
this.outputPath = requireNonNull(outputPath, "outputPath is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.hdfsContext = requireNonNull(hdfsContext, "hdfsContext is null");
Expand All @@ -71,6 +74,6 @@ public IcebergParquetFileWriter(
@Override
public Metrics getMetrics()
{
return hdfsEnvironment.doAs(hdfsContext.getIdentity(), () -> ParquetUtil.fileMetrics(new HdfsInputFile(outputPath, hdfsEnvironment, hdfsContext), MetricsConfig.getDefault()));
return hdfsEnvironment.doAs(hdfsContext.getIdentity(), () -> ParquetUtil.fileMetrics(new HdfsInputFile(outputPath, hdfsEnvironment, hdfsContext), metricsConfig));
}
}