presto-docs/src/main/sphinx/connector/iceberg.rst (15 changes: 12 additions & 3 deletions)
@@ -29,9 +29,9 @@ Configuration Properties
 
 The following configuration properties are available:
 
-====================================== ===================================================
+====================================== ====================================================
 Property Name                          Description
-====================================== ===================================================
+====================================== ====================================================
 ``hive.metastore.uri``                 The URI(s) of the Hive metastore.
 
 ``iceberg.file-format``                The storage file format for Iceberg tables.
@@ -45,7 +45,9 @@ Property Name                          Description
 ``iceberg.catalog.cached-catalog-num`` The number of Iceberg catalogs to cache.
 
 ``iceberg.hadoop.config.resources``    The path(s) for Hadoop configuration resources.
-====================================== ===================================================
+
+``iceberg.max-partitions-per-writer``  The maximum number of partitions handled per writer.
+====================================== ====================================================
 
 ``hive.metastore.uri``
 ^^^^^^^^^^^^^^^^^^^^^^
@@ -105,6 +107,13 @@ The path(s) for Hadoop configuration resources. Example:
 This property is required if the ``iceberg.catalog.type`` is ``hadoop``.
 Otherwise, it will be ignored.
 
+``iceberg.max-partitions-per-writer``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The maximum number of partitions handled per writer.
+
+The default is 100.
+
 Schema Evolution
 ------------------------
 
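A minimal sketch of using the new property, assuming the catalog is defined in ``etc/catalog/iceberg.properties`` (the metastore URI is a placeholder):

    connector.name=iceberg
    hive.metastore.uri=thrift://example-metastore:9083
    # Hypothetical value; raises the per-writer partition cap above the default of 100.
    iceberg.max-partitions-per-writer=250
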
IcebergConfig.java
@@ -36,6 +36,7 @@ public class IcebergConfig
     private CatalogType catalogType = HIVE;
     private String catalogWarehouse;
     private int catalogCacheSize = 10;
+    private int maxPartitionsPerWriter = 100;
     private List<String> hadoopConfigResources = ImmutableList.of();
 
     @NotNull
@@ -119,4 +120,18 @@ public IcebergConfig setHadoopConfigResources(String files)
         }
         return this;
     }
+
+    @Min(1)
+    public int getMaxPartitionsPerWriter()
+    {
+        return maxPartitionsPerWriter;
+    }
+
+    @Config("iceberg.max-partitions-per-writer")
+    @ConfigDescription("Maximum number of partitions per writer")
+    public IcebergConfig setMaxPartitionsPerWriter(int maxPartitionsPerWriter)
+    {
+        this.maxPartitionsPerWriter = maxPartitionsPerWriter;
+        return this;
+    }
 }
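
For illustration only, not part of the PR: a hedged sketch of how Airlift config binding routes the raw property string into the new setter at startup. It assumes Airlift's ConfigurationFactory (which backs @Config-annotated classes) is available; the class name ConfigBindingSketch is invented here.

    import com.facebook.airlift.configuration.ConfigurationFactory;
    import com.google.common.collect.ImmutableMap;

    public final class ConfigBindingSketch
    {
        public static void main(String[] args)
        {
            // Binds the raw string "222" to setMaxPartitionsPerWriter via @Config.
            ConfigurationFactory factory = new ConfigurationFactory(
                    ImmutableMap.of("iceberg.max-partitions-per-writer", "222"));
            IcebergConfig config = factory.build(IcebergConfig.class);
            // Prints 222; values below the @Min(1) bound fail bean validation.
            System.out.println(config.getMaxPartitionsPerWriter());
        }
    }
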
IcebergPageSink.java
@@ -77,7 +77,7 @@ public class IcebergPageSink
     private static final int MAX_PAGE_POSITIONS = 4096;
 
     @SuppressWarnings({"FieldCanBeLocal", "FieldMayBeStatic"})
-    private final int maxOpenWriters = 100; // TODO: make this configurable
+    private final int maxOpenWriters;
     private final Schema outputSchema;
     private final PartitionSpec partitionSpec;
     private final LocationProvider locationProvider;
@@ -107,7 +107,8 @@ public IcebergPageSink(
             List<IcebergColumnHandle> inputColumns,
             JsonCodec<CommitTaskData> jsonCodec,
             ConnectorSession session,
-            FileFormat fileFormat)
+            FileFormat fileFormat,
+            int maxOpenWriters)
     {
         requireNonNull(inputColumns, "inputColumns is null");
         this.outputSchema = requireNonNull(outputSchema, "outputSchema is null");
@@ -120,6 +121,7 @@
         this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
         this.session = requireNonNull(session, "session is null");
         this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
+        this.maxOpenWriters = maxOpenWriters;
         this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(inputColumns, partitionSpec));
     }
 
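The change above removes the hard-coded cap without altering the enforcement pattern. A self-contained sketch of that pattern, not the actual IcebergPageSink internals (which track writers through PagePartitioner); all names here are illustrative:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    // Hedged sketch: a per-partition writer pool that refuses to open more
    // writers than the configured cap, mirroring what maxOpenWriters bounds.
    final class BoundedWriterPool<K, W>
    {
        private final Map<K, W> writers = new HashMap<>();
        private final int maxOpenWriters;

        BoundedWriterPool(int maxOpenWriters)
        {
            this.maxOpenWriters = maxOpenWriters;
        }

        W writerFor(K partitionKey, Function<K, W> factory)
        {
            W writer = writers.get(partitionKey);
            if (writer == null) {
                // Opening one writer per partition; fail fast once the cap is hit.
                if (writers.size() >= maxOpenWriters) {
                    throw new IllegalStateException(
                            "Exceeded limit of " + maxOpenWriters + " open writers for partitions");
                }
                writer = factory.apply(partitionKey);
                writers.put(partitionKey, writer);
            }
            return writer;
        }
    }
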
IcebergPageSinkProvider.java
@@ -43,18 +43,22 @@ public class IcebergPageSinkProvider
     private final JsonCodec<CommitTaskData> jsonCodec;
     private final IcebergFileWriterFactory fileWriterFactory;
     private final PageIndexerFactory pageIndexerFactory;
+    private final int maxOpenPartitions;
 
     @Inject
     public IcebergPageSinkProvider(
             HdfsEnvironment hdfsEnvironment,
             JsonCodec<CommitTaskData> jsonCodec,
             IcebergFileWriterFactory fileWriterFactory,
-            PageIndexerFactory pageIndexerFactory)
+            PageIndexerFactory pageIndexerFactory,
+            IcebergConfig icebergConfig)
     {
         this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
         this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
         this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
         this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
+        requireNonNull(icebergConfig, "icebergConfig is null");
+        this.maxOpenPartitions = icebergConfig.getMaxPartitionsPerWriter();
     }
 
     @Override
@@ -87,6 +91,7 @@ private ConnectorPageSink createPageSink(ConnectorSession session, IcebergWritab
                 tableHandle.getInputColumns(),
                 jsonCodec,
                 session,
-                tableHandle.getFileFormat());
+                tableHandle.getFileFormat(),
+                maxOpenPartitions);
     }
 }
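
Because the constructor is @Inject-annotated, the extra IcebergConfig parameter is satisfied by the connector's Guice module with no call-site changes. A hedged sketch of that wiring (the module class name here is invented; the real module in the codebase may differ):

    import com.google.inject.Binder;
    import com.google.inject.Module;
    import com.google.inject.Scopes;

    import static com.facebook.airlift.configuration.ConfigBinder.configBinder;

    public class IcebergWiringSketch
            implements Module
    {
        @Override
        public void configure(Binder binder)
        {
            // Populates IcebergConfig's @Config setters from catalog properties
            // and makes the bound instance injectable into the provider.
            configBinder(binder).bindConfig(IcebergConfig.class);
            binder.bind(IcebergPageSinkProvider.class).in(Scopes.SINGLETON);
        }
    }
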
TestIcebergConfig.java
@@ -39,7 +39,8 @@ public void testDefaults()
                 .setCatalogType(HIVE)
                 .setCatalogWarehouse(null)
                 .setCatalogCacheSize(10)
-                .setHadoopConfigResources(null));
+                .setHadoopConfigResources(null)
+                .setMaxPartitionsPerWriter(100));
     }
 
     @Test
@@ -52,6 +53,7 @@ public void testExplicitPropertyMappings()
                 .put("iceberg.catalog.warehouse", "path")
                 .put("iceberg.catalog.cached-catalog-num", "6")
                 .put("iceberg.hadoop.config.resources", "/etc/hadoop/conf/core-site.xml")
+                .put("iceberg.max-partitions-per-writer", "222")
                 .build();
 
         IcebergConfig expected = new IcebergConfig()
@@ -60,7 +62,8 @@
                 .setCatalogType(HADOOP)
                 .setCatalogWarehouse("path")
                 .setCatalogCacheSize(6)
-                .setHadoopConfigResources("/etc/hadoop/conf/core-site.xml");
+                .setHadoopConfigResources("/etc/hadoop/conf/core-site.xml")
+                .setMaxPartitionsPerWriter(222);
 
         assertFullMapping(properties, expected);
     }