Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion docs/src/main/sphinx/connector/delta-lake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -219,23 +219,35 @@ connector.
- A decimal value in the range (0, 1] used as a minimum for weights assigned to each split. A low value may improve performance
on tables with small files. A higher value may improve performance for queries with highly skewed aggregations or joins.
- 0.05
* - ``parquet.optimized-writer.enabled``
- Whether the optimized writer should be used when writing Parquet files.
The equivalent catalog session property is
``parquet_optimized_writer_enabled``.
- ``true``

The following table describes :ref:`catalog session properties
<session-properties-definition>` supported by the Delta Lake connector to
configure processing of Parquet files.

.. list-table:: Parquet catalog session properties
:widths: 40, 60
:widths: 40, 60, 20
:header-rows: 1

* - Property name
- Description
- Default
* - ``parquet_optimized_writer_enabled``
- Whether the optimized writer should be used when writing Parquet files.
- ``true``
* - ``parquet_max_read_block_size``
- The maximum block size used when reading Parquet files.
- ``16MB``
* - ``parquet_writer_block_size``
- The maximum block size created by the Parquet writer.
- ``128MB``
* - ``parquet_writer_page_size``
- The maximum page size created by the Parquet writer.
- ``1MB``

.. _delta-lake-authorization:

Expand Down
8 changes: 7 additions & 1 deletion docs/src/main/sphinx/connector/hive.rst
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,15 @@ with Parquet files performed by the Hive connector.
definition. The equivalent catalog session property is
``parquet_use_column_names``.
- ``true``
* - ``parquet.optimized-writer.enabled``
- Whether the optimized writer should be used when writing Parquet files.
Set this property to ``true`` to use the optimized Parquet writer by
default. The equivalent catalog session property is
``parquet_optimized_writer_enabled``.
- ``false``
* - ``parquet.optimized-writer.validation-percentage``
- Percentage of Parquet files to validate after write by re-reading the whole file
when ``parquet.experimental-optimized-writer.enabled`` is set to ``true``.
when ``parquet.optimized-writer.enabled`` is set to ``true``.
The equivalent catalog session property is ``parquet_optimized_writer_validation_percentage``.
Validation can be turned off by setting this property to ``0``.
- ``5``
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public final class DeltaLakeSessionProperties
private static final String PARQUET_WRITER_BLOCK_SIZE = "parquet_writer_block_size";
private static final String PARQUET_WRITER_PAGE_SIZE = "parquet_writer_page_size";
private static final String TARGET_MAX_FILE_SIZE = "target_max_file_size";
private static final String PARQUET_OPTIMIZED_WRITER_ENABLED = "experimental_parquet_optimized_writer_enabled"; // = HiveSessionProperties#PARQUET_OPTIMIZED_WRITER_ENABLED
private static final String PARQUET_OPTIMIZED_WRITER_ENABLED = "parquet_optimized_writer_enabled"; // = HiveSessionProperties#PARQUET_OPTIMIZED_WRITER_ENABLED
private static final String COMPRESSION_CODEC = "compression_codec";
// This property is not supported by Delta Lake and exists solely for technical reasons.
@Deprecated
Expand Down Expand Up @@ -115,7 +115,7 @@ public DeltaLakeSessionProperties(
false),
booleanProperty(
PARQUET_OPTIMIZED_WRITER_ENABLED,
"Experimental: Enable optimized writer",
"Enable optimized writer",
parquetWriterConfig.isParquetOptimizedWriterEnabled(),
false),
enumProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ public void testTargetMaxFileSize()
.setSystemProperty("task_writer_count", "1")
// task scale writers should be disabled since we want to write with a single task writer
.setSystemProperty("task_scale_writers_enabled", "false")
.setCatalogSessionProperty("delta_lake", "experimental_parquet_optimized_writer_enabled", "true")
.setCatalogSessionProperty("delta_lake", "parquet_optimized_writer_enabled", "true")
.build();
assertUpdate(session, createTableSql, 100000);
Set<String> initialFiles = getActiveFiles(tableName);
Expand All @@ -522,7 +522,7 @@ public void testTargetMaxFileSize()
.setSystemProperty("task_writer_count", "1")
// task scale writers should be disabled since we want to write with a single task writer
.setSystemProperty("task_scale_writers_enabled", "false")
.setCatalogSessionProperty("delta_lake", "experimental_parquet_optimized_writer_enabled", "true")
.setCatalogSessionProperty("delta_lake", "parquet_optimized_writer_enabled", "true")
.setCatalogSessionProperty("delta_lake", "target_max_file_size", maxSize.toString())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class TestDeltaLakeCreateTableStatisticsLegacyWriter
@Override
Map<String, String> additionalProperties()
{
return ImmutableMap.of("parquet.experimental-optimized-writer.enabled", "false");
return ImmutableMap.of("parquet.optimized-writer.enabled", "false");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ protected QueryRunner createDeltaLakeQueryRunner(Map<String, String> connectorPr
SCHEMA,
ImmutableMap.<String, String>builder()
.putAll(connectorProperties)
.put("parquet.experimental-optimized-writer.enabled", "false")
.put("parquet.optimized-writer.enabled", "false")
.put("delta.enable-non-concurrent-writes", "true")
.put("hive.s3.max-connections", "2")
.buildOrThrow(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public final class HiveSessionProperties
private static final String QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS = "query_partition_filter_required_schemas";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String TIMESTAMP_PRECISION = "timestamp_precision";
private static final String PARQUET_OPTIMIZED_WRITER_ENABLED = "experimental_parquet_optimized_writer_enabled";
private static final String PARQUET_OPTIMIZED_WRITER_ENABLED = "parquet_optimized_writer_enabled";
private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout";
private static final String OPTIMIZE_SYMLINK_LISTING = "optimize_symlink_listing";
private static final String HIVE_VIEWS_LEGACY_TRANSLATION = "hive_views_legacy_translation";
Expand Down Expand Up @@ -466,7 +466,7 @@ public HiveSessionProperties(
false),
booleanProperty(
PARQUET_OPTIMIZED_WRITER_ENABLED,
"Experimental: Enable optimized writer",
"Enable optimized writer",
parquetWriterConfig.isParquetOptimizedWriterEnabled(),
false),
durationProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ public boolean isParquetOptimizedWriterEnabled()
return parquetOptimizedWriterEnabled;
}

@Config("parquet.experimental-optimized-writer.enabled")
@LegacyConfig("hive.parquet.optimized-writer.enabled")
@ConfigDescription("Experimental: Enable optimized Parquet writer")
@Config("parquet.optimized-writer.enabled")
@LegacyConfig({"hive.parquet.optimized-writer.enabled", "parquet.experimental-optimized-writer.enabled"})
@ConfigDescription("Enable optimized Parquet writer")
public ParquetWriterConfig setParquetOptimizedWriterEnabled(boolean parquetOptimizedWriterEnabled)
{
this.parquetOptimizedWriterEnabled = parquetOptimizedWriterEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4941,7 +4941,7 @@ public void testParquetTimestampPredicatePushdown(HiveTimestampPrecision timesta
public void testParquetTimestampPredicatePushdownOptimizedWriter(HiveTimestampPrecision timestampPrecision, LocalDateTime value)
{
Session session = Session.builder(getSession())
.setCatalogSessionProperty("hive", "experimental_parquet_optimized_writer_enabled", "true")
.setCatalogSessionProperty("hive", "parquet_optimized_writer_enabled", "true")
.build();
doTestParquetTimestampPredicatePushdown(session, timestampPrecision, value);
}
Expand Down Expand Up @@ -5054,7 +5054,7 @@ public void testParquetDictionaryPredicatePushdownWithOptimizedWriter()
{
testParquetDictionaryPredicatePushdown(
Session.builder(getSession())
.setCatalogSessionProperty("hive", "experimental_parquet_optimized_writer_enabled", "true")
.setCatalogSessionProperty("hive", "parquet_optimized_writer_enabled", "true")
.build());
}

Expand Down Expand Up @@ -8354,7 +8354,7 @@ private static void testWithStorageFormat(TestingHiveStorageFormat storageFormat
private boolean isNativeParquetWriter(Session session, HiveStorageFormat storageFormat)
{
return storageFormat == HiveStorageFormat.PARQUET &&
"true".equals(session.getCatalogProperties("hive").get("experimental_parquet_optimized_writer_enabled"));
"true".equals(session.getCatalogProperties("hive").get("parquet_optimized_writer_enabled"));
}

private List<TestingHiveStorageFormat> getAllTestingHiveStorageFormat()
Expand All @@ -8370,12 +8370,12 @@ private List<TestingHiveStorageFormat> getAllTestingHiveStorageFormat()
if (hiveStorageFormat == HiveStorageFormat.PARQUET) {
formats.add(new TestingHiveStorageFormat(
Session.builder(session)
.setCatalogSessionProperty(catalog, "experimental_parquet_optimized_writer_enabled", "false")
.setCatalogSessionProperty(catalog, "parquet_optimized_writer_enabled", "false")
.build(),
hiveStorageFormat));
formats.add(new TestingHiveStorageFormat(
Session.builder(session)
.setCatalogSessionProperty(catalog, "experimental_parquet_optimized_writer_enabled", "true")
.setCatalogSessionProperty(catalog, "parquet_optimized_writer_enabled", "true")
.build(),
hiveStorageFormat));
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,13 @@ public void testLegacyProperties()
assertDeprecatedEquivalence(
ParquetWriterConfig.class,
Map.of(
"parquet.experimental-optimized-writer.enabled", "true",
"parquet.optimized-writer.enabled", "true",
"parquet.writer.block-size", "2PB",
"parquet.writer.page-size", "3PB"),
Map.of(
"parquet.experimental-optimized-writer.enabled", "true",
"hive.parquet.writer.block-size", "2PB",
"hive.parquet.writer.page-size", "3PB"),
Map.of(
"hive.parquet.optimized-writer.enabled", "true",
"hive.parquet.writer.block-size", "2PB",
Expand All @@ -58,7 +62,7 @@ public void testLegacyProperties()
public void testExplicitPropertyMappings()
{
Map<String, String> properties = Map.of(
"parquet.experimental-optimized-writer.enabled", "true",
"parquet.optimized-writer.enabled", "true",
"parquet.writer.block-size", "234MB",
"parquet.writer.page-size", "11MB",
"parquet.writer.batch-size", "100",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ private void testCompression(boolean optimizedWriter, String compressionCodec)
"AS TABLE tpch.tiny.nation WITH NO DATA");

try {
onTrino().executeQuery("SET SESSION delta.experimental_parquet_optimized_writer_enabled = " + optimizedWriter);
onTrino().executeQuery("SET SESSION delta.parquet_optimized_writer_enabled = " + optimizedWriter);
onTrino().executeQuery("SET SESSION delta.compression_codec = '" + compressionCodec + "'");

if (optimizedWriter && "LZ4".equals(compressionCodec)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void testTimestampFieldWrittenByOptimizedParquetWriterCanBeReadByHive()
{
// only admin user is allowed to change session properties
setAdminRole(onTrino().getConnection());
setSessionProperty(onTrino().getConnection(), "hive.experimental_parquet_optimized_writer_enabled", "true");
setSessionProperty(onTrino().getConnection(), "hive.parquet_optimized_writer_enabled", "true");

String tableName = "parquet_table_timestamp_created_in_trino";
onTrino().executeQuery("DROP TABLE IF EXISTS " + tableName);
Expand Down Expand Up @@ -201,7 +201,7 @@ public void testSmallDecimalFieldWrittenByOptimizedParquetWriterCanBeReadByHive(
{
// only admin user is allowed to change session properties
setAdminRole(onTrino().getConnection());
setSessionProperty(onTrino().getConnection(), "hive.experimental_parquet_optimized_writer_enabled", "true");
setSessionProperty(onTrino().getConnection(), "hive.parquet_optimized_writer_enabled", "true");

String tableName = "parquet_table_small_decimal_created_in_trino";
onTrino().executeQuery("DROP TABLE IF EXISTS " + tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private void testSnappyCompressedParquetTableCreatedInTrino(boolean optimizedPar

String catalog = (String) getOnlyElement(getOnlyElement(onTrino().executeQuery("SELECT CURRENT_CATALOG").rows()));
onTrino().executeQuery("SET SESSION " + catalog + ".compression_codec = 'SNAPPY'");
onTrino().executeQuery("SET SESSION " + catalog + ".experimental_parquet_optimized_writer_enabled = " + optimizedParquetWriter);
onTrino().executeQuery("SET SESSION " + catalog + ".parquet_optimized_writer_enabled = " + optimizedParquetWriter);
onTrino().executeQuery(format("INSERT INTO %s VALUES(1, 'test data')", tableName));

assertThat(onTrino().executeQuery("SELECT * FROM " + tableName)).containsExactlyInOrder(row(1, "test data"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public void testReadTrinoCreatedParquetTable()
@Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS})
public void testReadTrinoCreatedParquetTableWithNativeWriter()
{
onTrino().executeQuery("SET SESSION " + TRINO_CATALOG + ".experimental_parquet_optimized_writer_enabled = true");
onTrino().executeQuery("SET SESSION " + TRINO_CATALOG + ".parquet_optimized_writer_enabled = true");
testReadTrinoCreatedTable("using_native_parquet", "PARQUET");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public static StorageFormat[] storageFormatsWithConfiguration()
return new StorageFormat[] {
storageFormat("ORC", ImmutableMap.of("hive.orc_optimized_writer_validate", "true")),
storageFormat("PARQUET"),
storageFormat("PARQUET", ImmutableMap.of("hive.experimental_parquet_optimized_writer_enabled", "true")),
storageFormat("PARQUET", ImmutableMap.of("hive.parquet_optimized_writer_enabled", "true")),
storageFormat("RCBINARY", ImmutableMap.of("hive.rcfile_optimized_writer_validate", "true")),
storageFormat("RCTEXT", ImmutableMap.of("hive.rcfile_optimized_writer_validate", "true")),
storageFormat("SEQUENCEFILE"),
Expand Down Expand Up @@ -792,7 +792,7 @@ public void testLargeParquetInsertWithNativeWriter()
runLargeInsert(storageFormat(
"PARQUET",
ImmutableMap.of(
"hive.experimental_parquet_optimized_writer_enabled", "true",
"hive.parquet_optimized_writer_enabled", "true",
"hive.parquet_writer_page_size", reducedRowGroupSize.toBytesValueString(),
"task_scale_writers_enabled", "false",
"task_writer_count", "1")));
Expand Down