Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,25 @@ otherwise the procedure will fail with similar message:
``Retention specified (1.00d) is shorter than the minimum retention configured in the system (7.00d)``.
The default value for this property is ``7d``.

.. _iceberg-alter-table-set-properties:
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mosabua mind taking a look at the doc update?


ALTER TABLE SET PROPERTIES
^^^^^^^^^^^^^^^^^^^^^^^^^^

The connector supports modifying the properties on existing tables using
:ref:`ALTER TABLE SET PROPERTIES <alter-table-set-properties>`.

The following table properties can be updated after a table is created:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SELECT * FROM system.metadata.table_properties where catalog_name = 'iceberg';

Do we / should we have documented somewhere this statement used to retrieve the "updatable" table properties?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a separate doc page for system tables or information_schema? I couldn't find one, but I feel like that's where it should go


* ``format``
* ``format_version``

For example, to update a table from v1 of the Iceberg specification to v2:

.. code-block:: sql

ALTER TABLE table_name SET PROPERTIES format_version = 2;

.. _iceberg-type-mapping:

Type mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type;

Expand Down Expand Up @@ -146,6 +147,7 @@
import static io.trino.plugin.iceberg.ExpressionConverter.toIcebergExpression;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnHandle;
import static io.trino.plugin.iceberg.IcebergColumnHandle.pathColumnMetadata;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH;
Expand Down Expand Up @@ -190,6 +192,7 @@
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;

public class IcebergMetadata
Expand Down Expand Up @@ -1146,6 +1149,52 @@ public void renameTable(ConnectorSession session, ConnectorTableHandle tableHand
catalog.renameTable(session, ((IcebergTableHandle) tableHandle).getSchemaTableName(), newTable);
}

@Override
public void setTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Optional<Object>> properties)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
BaseTable icebergTable = (BaseTable) catalog.loadTable(session, table.getSchemaTableName());

transaction = icebergTable.newTransaction();
UpdateProperties updateProperties = transaction.updateProperties();

for (Map.Entry<String, Optional<Object>> propertyEntry : properties.entrySet()) {
String trinoPropertyName = propertyEntry.getKey();
Optional<Object> propertyValue = propertyEntry.getValue();

switch (trinoPropertyName) {
case FILE_FORMAT_PROPERTY:
updateProperties.defaultFormat(((IcebergFileFormat) propertyValue.orElseThrow()).toIceberg());
break;
case FORMAT_VERSION_PROPERTY:
// UpdateProperties#commit will trigger any necessary metadata updates required for the new spec version
updateProperty(updateProperties, FORMAT_VERSION, propertyValue, formatVersion -> Integer.toString((int) formatVersion));
Comment thread
findepi marked this conversation as resolved.
Outdated
break;
default:
// TODO: Support updating partitioning https://github.com/trinodb/trino/issues/12174
throw new TrinoException(NOT_SUPPORTED, "Updating the " + trinoPropertyName + " property is not supported");
}
}

try {
updateProperties.commit();
transaction.commitTransaction();
}
catch (RuntimeException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, "Failed to commit new table properties", e);
}
}

private static void updateProperty(UpdateProperties updateProperties, String icebergPropertyName, Optional<Object> value, Function<Object, String> toIcebergString)
{
if (value.isPresent()) {
updateProperties.set(icebergPropertyName, toIcebergString.apply(value.get()));
}
else {
updateProperties.remove(icebergPropertyName);
}
}

@Override
public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnMetadata column)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.BROADCAST;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder;
import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_DELETE;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static io.trino.testing.assertions.Assert.assertEquals;
import static io.trino.testing.assertions.Assert.assertEventually;
Expand Down Expand Up @@ -3439,6 +3438,35 @@ public void testIfDeletesReturnsNumberOfRemovedRows()
assertUpdate("DELETE FROM " + tableName + " WHERE key = 'two'", 2);
}

@Test
Comment thread
alexjo2144 marked this conversation as resolved.
Outdated
public void testUpdatingFileFormat()
{
String tableName = "test_updating_file_format_" + randomTableSuffix();

assertUpdate("CREATE TABLE " + tableName + " WITH (format = 'orc') AS SELECT * FROM nation WHERE nationkey < 10", "SELECT count(*) FROM nation WHERE nationkey < 10");
Comment thread
alexjo2144 marked this conversation as resolved.
Outdated
assertQuery("SELECT value FROM \"" + tableName + "$properties\" WHERE key = 'write.format.default'", "VALUES 'ORC'");

assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format = 'parquet'");
assertQuery("SELECT value FROM \"" + tableName + "$properties\" WHERE key = 'write.format.default'", "VALUES 'PARQUET'");
assertUpdate("INSERT INTO " + tableName + " SELECT * FROM nation WHERE nationkey >= 10", "SELECT count(*) FROM nation WHERE nationkey >= 10");

assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
assertQuery("SELECT count(*) FROM \"" + tableName + "$files\" WHERE file_path LIKE '%.orc'", "VALUES 1");
assertQuery("SELECT count(*) FROM \"" + tableName + "$files\" WHERE file_path LIKE '%.parquet'", "VALUES 1");

assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testUpdatingInvalidTableProperty()
{
String tableName = "test_updating_invalid_table_property_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " (a INT, b INT)");
assertThatThrownBy(() -> query("ALTER TABLE " + tableName + " SET PROPERTIES not_a_valid_table_property = 'a value'"))
.hasMessage("Catalog 'iceberg' table property 'not_a_valid_table_property' does not exist");
assertUpdate("DROP TABLE " + tableName);
}

private Session prepareCleanUpSession()
{
return Session.builder(getSession())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static io.trino.tpch.TpchTable.NATION;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;

public class TestIcebergV2
extends AbstractTestQueryFramework
Expand Down Expand Up @@ -172,6 +174,39 @@ public void testV2TableWithEqualityDelete()
assertQuery("SELECT nationkey FROM " + tableName, "SELECT nationkey FROM nation WHERE regionkey != 1");
}

@Test
public void testUpgradeTableToV2FromTrino()
{
String tableName = "test_upgrade_table_to_v2_from_trino_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 1) AS SELECT * FROM tpch.tiny.nation", 25);
assertEquals(loadTable(tableName).operations().current().formatVersion(), 1);
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 2");
assertEquals(loadTable(tableName).operations().current().formatVersion(), 2);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please verify via

assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");

that the change of the format_version has no negative outcome for the end user?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

And no, I don't think it does

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just thinking that it would be good to have it in case of eventual regressions.

assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
}

@Test
Comment thread
alexjo2144 marked this conversation as resolved.
Outdated
public void testDowngradingV2TableToV1Fails()
{
String tableName = "test_downgrading_v2_table_to_v1_fails_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25);
assertEquals(loadTable(tableName).operations().current().formatVersion(), 2);
assertThatThrownBy(() -> query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 1"))
.hasMessage("Failed to commit new table properties")
.getRootCause()
.hasMessage("Cannot downgrade v2 table to v1");
}

@Test
public void testUpgradingToInvalidVersionFails()
{
String tableName = "test_upgrading_to_invalid_version_fails_" + randomTableSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25);
assertEquals(loadTable(tableName).operations().current().formatVersion(), 2);
assertThatThrownBy(() -> query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 42"))
.hasMessage("Unable to set catalog 'iceberg' table property 'format_version' to [42]: format_version must be between 1 and 2");
}

private void writeEqualityDeleteToNationTable(Table icebergTable)
throws Exception
{
Expand Down