-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Support updating Iceberg table partitioning #12259
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -250,13 +250,22 @@ The following table properties can be updated after a table is created: | |
|
|
||
| * ``format`` | ||
| * ``format_version`` | ||
| * ``partitioning`` | ||
|
|
||
| For example, to update a table from v1 of the Iceberg specification to v2: | ||
|
|
||
| .. code-block:: sql | ||
|
|
||
| ALTER TABLE table_name SET PROPERTIES format_version = 2; | ||
|
|
||
| Or to set the column ``my_new_partition_column`` as a partition column on a table: | ||
|
|
||
| .. code-block:: sql | ||
|
|
||
| ALTER TABLE table_name SET PROPERTIES partitioning = ARRAY[<existing partition columns>, 'my_new_partition_column']; | ||
|
|
||
| The current values of a table's properties can be shown using :doc:`SHOW CREATE TABLE </sql/show-create-table>`. | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @mosabua please review doc changes |
||
| .. _iceberg-type-mapping: | ||
|
|
||
| Type mapping | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -103,8 +103,11 @@ | |
| import org.apache.iceberg.TableProperties; | ||
| import org.apache.iceberg.TableScan; | ||
| import org.apache.iceberg.Transaction; | ||
| import org.apache.iceberg.UpdatePartitionSpec; | ||
| import org.apache.iceberg.UpdateProperties; | ||
| import org.apache.iceberg.exceptions.ValidationException; | ||
| import org.apache.iceberg.expressions.Expressions; | ||
| import org.apache.iceberg.expressions.Term; | ||
| import org.apache.iceberg.io.CloseableIterable; | ||
| import org.apache.iceberg.types.Type; | ||
| import org.apache.iceberg.types.Types; | ||
|
|
@@ -207,6 +210,7 @@ public class IcebergMetadata | |
| private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 1; | ||
| private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2; | ||
| private static final String RETENTION_THRESHOLD = "retention_threshold"; | ||
| public static final Set<String> UPDATABLE_TABLE_PROPERTIES = ImmutableSet.of(FILE_FORMAT_PROPERTY, FORMAT_VERSION_PROPERTY, PARTITIONING_PROPERTY); | ||
|
|
||
| private final TypeManager typeManager; | ||
| private final JsonCodec<CommitTaskData> commitTaskCodec; | ||
|
|
@@ -1161,44 +1165,78 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta | |
| IcebergTableHandle table = (IcebergTableHandle) tableHandle; | ||
| BaseTable icebergTable = (BaseTable) catalog.loadTable(session, table.getSchemaTableName()); | ||
|
|
||
| Set<String> unsupportedProperties = Sets.difference(properties.keySet(), UPDATABLE_TABLE_PROPERTIES); | ||
| if (!unsupportedProperties.isEmpty()) { | ||
| throw new TrinoException(NOT_SUPPORTED, "The following properties cannot be updated: " + String.join(", ", unsupportedProperties)); | ||
| } | ||
|
|
||
| transaction = icebergTable.newTransaction(); | ||
| UpdateProperties updateProperties = transaction.updateProperties(); | ||
|
|
||
| for (Map.Entry<String, Optional<Object>> propertyEntry : properties.entrySet()) { | ||
| String trinoPropertyName = propertyEntry.getKey(); | ||
| Optional<Object> propertyValue = propertyEntry.getValue(); | ||
| if (properties.containsKey(FILE_FORMAT_PROPERTY)) { | ||
| IcebergFileFormat fileFormat = (IcebergFileFormat) properties.get(FILE_FORMAT_PROPERTY) | ||
| .orElseThrow(() -> new IllegalArgumentException("The format property cannot be empty")); | ||
| updateProperties.defaultFormat(fileFormat.toIceberg()); | ||
| } | ||
|
|
||
| switch (trinoPropertyName) { | ||
| case FILE_FORMAT_PROPERTY: | ||
| updateProperties.defaultFormat(((IcebergFileFormat) propertyValue.orElseThrow()).toIceberg()); | ||
| break; | ||
| case FORMAT_VERSION_PROPERTY: | ||
| // UpdateProperties#commit will trigger any necessary metadata updates required for the new spec version | ||
| updateProperty(updateProperties, FORMAT_VERSION, propertyValue, formatVersion -> Integer.toString((int) formatVersion)); | ||
| break; | ||
| default: | ||
| // TODO: Support updating partitioning https://github.com/trinodb/trino/issues/12174 | ||
| throw new TrinoException(NOT_SUPPORTED, "Updating the " + trinoPropertyName + " property is not supported"); | ||
| } | ||
| if (properties.containsKey(FORMAT_VERSION_PROPERTY)) { | ||
| // UpdateProperties#commit will trigger any necessary metadata updates required for the new spec version | ||
| int formatVersion = (int) properties.get(FORMAT_VERSION_PROPERTY) | ||
| .orElseThrow(() -> new IllegalArgumentException("The format_version property cannot be empty")); | ||
| updateProperties.set(FORMAT_VERSION, Integer.toString((int) formatVersion)); | ||
| } | ||
|
|
||
| try { | ||
| updateProperties.commit(); | ||
|
alexjo2144 marked this conversation as resolved.
Outdated
|
||
| } | ||
| catch (RuntimeException e) { | ||
| throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to set new property values", e); | ||
| } | ||
|
|
||
| if (properties.containsKey(PARTITIONING_PROPERTY)) { | ||
| @SuppressWarnings("unchecked") | ||
| List<String> partitionColumns = (List<String>) properties.get(PARTITIONING_PROPERTY) | ||
| .orElseThrow(() -> new IllegalArgumentException("The partitioning property cannot be empty")); | ||
| updatePartitioning(icebergTable, transaction, partitionColumns); | ||
| } | ||
|
|
||
| try { | ||
| transaction.commitTransaction(); | ||
| } | ||
| catch (RuntimeException e) { | ||
| throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit new table properties", e); | ||
| } | ||
| } | ||
|
|
||
| private static void updateProperty(UpdateProperties updateProperties, String icebergPropertyName, Optional<Object> value, Function<Object, String> toIcebergString) | ||
| private static void updatePartitioning(Table icebergTable, Transaction transaction, List<String> partitionColumns) | ||
| { | ||
| if (value.isPresent()) { | ||
| updateProperties.set(icebergPropertyName, toIcebergString.apply(value.get())); | ||
| UpdatePartitionSpec updatePartitionSpec = transaction.updateSpec(); | ||
| Set<PartitionField> existingPartitionFields = icebergTable.spec().fields().stream().collect(toImmutableSet()); | ||
| Schema schema = icebergTable.schema(); | ||
| if (partitionColumns.isEmpty()) { | ||
| existingPartitionFields.stream() | ||
| .map(partitionField -> toIcebergTerm(schema, partitionField)) | ||
| .forEach(updatePartitionSpec::removeField); | ||
|
alexjo2144 marked this conversation as resolved.
Outdated
|
||
| } | ||
| else { | ||
| updateProperties.remove(icebergPropertyName); | ||
| Set<PartitionField> partitionFields = ImmutableSet.copyOf(parsePartitionFields(schema, partitionColumns).fields()); | ||
| Sets.difference(existingPartitionFields, partitionFields).forEach(partitionField -> updatePartitionSpec.removeField(partitionField.name())); | ||
| Sets.difference(partitionFields, existingPartitionFields).stream() | ||
| .map(partitionField -> toIcebergTerm(schema, partitionField)) | ||
| .forEach(updatePartitionSpec::addField); | ||
| } | ||
|
|
||
| try { | ||
| updatePartitionSpec.commit(); | ||
| } | ||
| catch (RuntimeException e) { | ||
| throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to set new partitioning value", e); | ||
| } | ||
| } | ||
|
|
||
|     /** | ||
|      * Converts a partition field into an Iceberg {@link Term}: the field's transform applied to | ||
|      * the source column, whose name is looked up in {@code schema} by the field's source id. | ||
|      */ | ||
|     private static Term toIcebergTerm(Schema schema, PartitionField partitionField) | ||
|     { | ||
|         String sourceColumnName = schema.findColumnName(partitionField.sourceId()); | ||
|         return Expressions.transform(sourceColumnName, partitionField.transform()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's kind of weird that Do you think it's something we could change in Iceberg API? (that would be a follow-up for this PR, no change requested here) |
||
|     } | ||
|
|
||
| @Override | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,157 @@ | ||
| /* | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package io.trino.plugin.iceberg; | ||
|
|
||
| import com.google.common.collect.ImmutableList; | ||
| import io.trino.testing.AbstractTestQueryFramework; | ||
| import io.trino.testing.MaterializedRow; | ||
| import io.trino.testing.QueryRunner; | ||
| import io.trino.tpch.TpchTable; | ||
| import org.testng.annotations.Test; | ||
|
|
||
| import java.util.List; | ||
|
|
||
| import static com.google.common.collect.ImmutableList.toImmutableList; | ||
| import static io.trino.testing.sql.TestTable.randomTableSuffix; | ||
| import static java.lang.Math.toIntExact; | ||
| import static org.assertj.core.api.Assertions.assertThat; | ||
| import static org.testng.Assert.assertEquals; | ||
|
|
||
| public class TestIcebergPartitionEvolution | ||
|
alexjo2144 marked this conversation as resolved.
Outdated
|
||
| extends AbstractTestQueryFramework | ||
| { | ||
|     /** | ||
|      * Builds the Iceberg query runner for this test class, requesting the TPC-H | ||
|      * {@code nation} table as its only initial table. | ||
|      */ | ||
|     @Override | ||
|     protected QueryRunner createQueryRunner() | ||
|             throws Exception | ||
|     { | ||
|         // Every test in this class copies its rows out of the nation table | ||
|         List<TpchTable<?>> initialTables = ImmutableList.of(TpchTable.NATION); | ||
|         return IcebergQueryRunner.builder() | ||
|                 .setInitialTables(initialTables) | ||
|                 .build(); | ||
|     } | ||
|
|
||
|     /** | ||
|      * Creates a table partitioned by {@code regionkey} and {@code truncate(name, 1)}, clears the | ||
|      * {@code partitioning} property, inserts more rows, and checks that the files written before | ||
|      * the change are partitioned while the rows written afterwards land in one unpartitioned file. | ||
|      */ | ||
|     @Test | ||
|     public void testRemovePartitioning() | ||
|
alexjo2144 marked this conversation as resolved.
Outdated
|
||
|     { | ||
|         String table = "test_remove_partition_" + randomTableSuffix(); | ||
|         assertUpdate("CREATE TABLE " + table + " WITH (partitioning = ARRAY['regionkey', 'truncate(name, 1)']) AS SELECT * FROM nation WHERE nationkey < 10", 10); | ||
|         assertUpdate("ALTER TABLE " + table + " SET PROPERTIES partitioning = ARRAY[]"); | ||
|         assertUpdate("INSERT INTO " + table + " SELECT * FROM nation WHERE nationkey >= 10", 15); | ||
|
|
||
|         // Split the data files by whether their path contains a regionkey partition segment | ||
|         ImmutableList.Builder<MaterializedRow> withPartitionPath = ImmutableList.builder(); | ||
|         ImmutableList.Builder<MaterializedRow> withoutPartitionPath = ImmutableList.builder(); | ||
|         for (MaterializedRow file : computeActual("SELECT file_path, record_count FROM \"" + table + "$files\"").getMaterializedRows()) { | ||
|             String path = (String) file.getField(0); | ||
|             if (path.contains("regionkey=")) { | ||
|                 withPartitionPath.add(file); | ||
|             } | ||
|             else { | ||
|                 withoutPartitionPath.add(file); | ||
|             } | ||
|         } | ||
|         List<MaterializedRow> partitionedFiles = withPartitionPath.build(); | ||
|         List<MaterializedRow> unpartitionedFiles = withoutPartitionPath.build(); | ||
|
|
||
|         // One file is expected per distinct (regionkey, first letter of name) pair among the first 10 rows | ||
|         int distinctPartitions = computeActual("SELECT DISTINCT regionkey, substring(name, 1, 1) FROM nation WHERE nationkey < 10").getRowCount(); | ||
|         assertThat(partitionedFiles).hasSize(distinctPartitions); | ||
|         long partitionedRecords = 0; | ||
|         for (MaterializedRow file : partitionedFiles) { | ||
|             partitionedRecords += (long) file.getField(1); | ||
|         } | ||
|         assertEquals(partitionedRecords, 10L); | ||
|
|
||
|         assertThat(unpartitionedFiles).hasSize(1); | ||
|         long unpartitionedRecords = (long) unpartitionedFiles.get(0).getField(1); | ||
|         assertEquals(unpartitionedRecords, 15); | ||
|
|
||
|         assertQuery("SELECT * FROM " + table, "SELECT * FROM nation"); | ||
|         // Eight partitions hold one record each, regionkey=2 / trunc_name=I holds two, and the 15 later rows are unpartitioned | ||
|         assertQuery("SELECT record_count, count(*) FROM \"" + table + "$partitions\" GROUP BY record_count", "VALUES (1, 8), (2, 1), (15, 1)"); | ||
|         assertUpdate("DROP TABLE " + table); | ||
|     } | ||
|
|
||
| @Test | ||
| public void testAddPartitionColumn() | ||
| { | ||
| String tableName = "test_add_partition_column_" + randomTableSuffix(); | ||
| assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM nation WHERE nationkey < 10", 10); | ||
| assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['regionkey', 'truncate(name, 1)']"); | ||
| assertUpdate("INSERT INTO " + tableName + " SELECT * FROM nation WHERE nationkey >= 10", 15); | ||
| assertThat((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()).contains("partitioning = ARRAY['regionkey','truncate(name, 1)']"); | ||
|
|
||
| List<MaterializedRow> files = computeActual("SELECT file_path, record_count FROM \"" + tableName + "$files\"").getMaterializedRows(); | ||
| List<MaterializedRow> initialFiles = files.stream() | ||
| .filter(file -> !((String) file.getField(0)).contains("name_trunc")) | ||
| .collect(toImmutableList()); | ||
|
|
||
| List<MaterializedRow> partitionedFiles = files.stream() | ||
| .filter(file -> ((String) file.getField(0)).contains("name_trunc")) | ||
| .collect(toImmutableList()); | ||
|
|
||
| int expectedInitialFiles = toIntExact((long) computeActual("SELECT count(distinct regionkey) FROM nation WHERE nationkey < 10").getOnlyValue()); | ||
| assertThat(initialFiles).hasSize(expectedInitialFiles); | ||
| assertEquals(initialFiles.stream().mapToLong(row -> (long) row.getField(1)).sum(), 10L); | ||
|
|
||
| int expectedFinalFileCount = computeActual("SELECT DISTINCT regionkey, substring(name, 1, 1) FROM nation WHERE nationkey >= 10").getRowCount(); | ||
| assertThat(partitionedFiles).hasSize(expectedFinalFileCount); | ||
| assertEquals(partitionedFiles.stream().mapToLong(row -> (long) row.getField(1)).sum(), 15L); | ||
|
|
||
| assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation"); | ||
| assertUpdate("DROP TABLE " + tableName); | ||
|
|
||
| assertUpdate("CREATE TABLE " + tableName + " WITH (partitioning = ARRAY['truncate(name, 1)']) AS SELECT * FROM nation WHERE nationkey < 10", 10); | ||
| assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES partitioning = ARRAY['truncate(name, 1)', 'regionkey']"); | ||
| assertUpdate("INSERT INTO " + tableName + " SELECT * FROM nation WHERE nationkey >= 10", 15); | ||
| assertThat((String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()).contains("partitioning = ARRAY['truncate(name, 1)','regionkey']"); | ||
|
|
||
| files = computeActual("SELECT file_path, record_count FROM \"" + tableName + "$files\"").getMaterializedRows(); | ||
| initialFiles = files.stream() | ||
| .filter(file -> !((String) file.getField(0)).contains("regionkey=")) | ||
| .collect(toImmutableList()); | ||
|
|
||
| partitionedFiles = files.stream() | ||
| .filter(file -> ((String) file.getField(0)).contains("regionkey=")) | ||
| .collect(toImmutableList()); | ||
|
|
||
| expectedInitialFiles = toIntExact((long) computeActual("SELECT DISTINCT substring(name, 1, 1) FROM nation WHERE nationkey < 10").getRowCount()); | ||
| assertThat(initialFiles).hasSize(expectedInitialFiles); | ||
| assertEquals(initialFiles.stream().mapToLong(row -> (long) row.getField(1)).sum(), 10L); | ||
|
|
||
| expectedFinalFileCount = computeActual("SELECT DISTINCT regionkey, substring(name, 1, 1) FROM nation WHERE nationkey >= 10").getRowCount(); | ||
| assertThat(partitionedFiles).hasSize(expectedFinalFileCount); | ||
| assertEquals(partitionedFiles.stream().mapToLong(row -> (long) row.getField(1)).sum(), 15L); | ||
|
|
||
| assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation"); | ||
| assertUpdate("DROP TABLE " + tableName); | ||
| } | ||
|
|
||
|     /** | ||
|      * Creates a table partitioned by {@code year(ts)}, switches the partitioning to | ||
|      * {@code month(ts)}, inserts more rows, and checks that the three original files carry only | ||
|      * the year partition segment while the two new files carry only the month segment. | ||
|      */ | ||
|     @Test | ||
|     public void testChangePartitionTransform() | ||
|     { | ||
|         String table = "test_change_partition_transform_" + randomTableSuffix(); | ||
|         assertUpdate("CREATE TABLE " + table + " (ts, a) WITH (partitioning = ARRAY['year(ts)']) " + | ||
|                 "AS VALUES (TIMESTAMP '2021-01-01 01:01:01.111111', 1), (TIMESTAMP '2022-02-02 02:02:02.222222', 2), (TIMESTAMP '2023-03-03 03:03:03.333333', 3)", 3); | ||
|         assertUpdate("ALTER TABLE " + table + " SET PROPERTIES partitioning = ARRAY['month(ts)']"); | ||
|         assertUpdate("INSERT INTO " + table + " VALUES (TIMESTAMP '2024-04-04 04:04:04.444444', 4), (TIMESTAMP '2025-05-05 05:05:05.555555', 5)", 2); | ||
|         assertThat((String) computeActual("SHOW CREATE TABLE " + table).getOnlyValue()).contains("partitioning = ARRAY['month(ts)']"); | ||
|
|
||
|         // Classify each data file by which partition segment appears in its path; | ||
|         // a path with both or neither segment is counted in neither group | ||
|         ImmutableList.Builder<MaterializedRow> yearOnly = ImmutableList.builder(); | ||
|         ImmutableList.Builder<MaterializedRow> monthOnly = ImmutableList.builder(); | ||
|         for (MaterializedRow file : computeActual("SELECT file_path, record_count FROM \"" + table + "$files\"").getMaterializedRows()) { | ||
|             String path = (String) file.getField(0); | ||
|             boolean hasYearSegment = path.contains("ts_year"); | ||
|             boolean hasMonthSegment = path.contains("ts_month"); | ||
|             if (hasYearSegment && !hasMonthSegment) { | ||
|                 yearOnly.add(file); | ||
|             } | ||
|             else if (hasMonthSegment && !hasYearSegment) { | ||
|                 monthOnly.add(file); | ||
|             } | ||
|         } | ||
|         List<MaterializedRow> yearPartitionedFiles = yearOnly.build(); | ||
|         List<MaterializedRow> monthPartitionedFiles = monthOnly.build(); | ||
|
|
||
|         assertThat(yearPartitionedFiles).hasSize(3); | ||
|         assertThat(monthPartitionedFiles).hasSize(2); | ||
|         assertUpdate("DROP TABLE " + table); | ||
|     } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.