Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.trino.plugin.iceberg.IcebergMaterializedViewDefinition;
import io.trino.plugin.iceberg.IcebergUtil;
import io.trino.plugin.iceberg.PartitionTransforms.ColumnTransform;
import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
import io.trino.plugin.iceberg.fileio.ForwardingOutputFile;
import io.trino.spi.TrinoException;
import io.trino.spi.catalog.CatalogName;
Expand Down Expand Up @@ -336,6 +337,14 @@ protected Location createMaterializedViewStorage(
return metadataFileLocation;
}

protected void dropMaterializedViewStorage(TrinoFileSystem fileSystem, String storageMetadataLocation)
throws IOException
{
TableMetadata metadata = TableMetadataParser.read(new ForwardingFileIo(fileSystem), storageMetadataLocation);
String storageLocation = metadata.location();
fileSystem.deleteDirectory(Location.of(storageLocation));
}

protected SchemaTableName createMaterializedViewStorageTable(
ConnectorSession session,
SchemaTableName viewName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.FileIO;

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -1129,12 +1130,28 @@ public void createMaterializedView(
encodeMaterializedViewData(fromConnectorMaterializedViewDefinition(definition)),
isUsingSystemSecurity ? null : session.getUser(),
createMaterializedViewProperties(session, storageMetadataLocation));
if (existing.isPresent()) {
updateTable(viewName.getSchemaName(), materializedViewTableInput);
try {
if (existing.isPresent()) {
updateTable(viewName.getSchemaName(), materializedViewTableInput);
}
else {
createTable(viewName.getSchemaName(), materializedViewTableInput);
}
}
else {
createTable(viewName.getSchemaName(), materializedViewTableInput);
catch (RuntimeException e) {
try {
dropMaterializedViewStorage(fileSystemFactory.create(session), storageMetadataLocation.toString());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry if this is a silly question but why dropping storage table when update failed? I understand doing it when create failed, but for update im not so sure.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the case here is when you do a CREATE OR REPLACE MV and there is a MV with that name already. In that case we create a new storage table. If the update fails, the new storage table is cleaned up here because it is not referenced by the MV.

There is something missing here though, when you do a replace the old storage table is not cleaned up on a successful operation. Should do that here.

}
catch (Exception suppressed) {
LOG.warn(suppressed, "Failed to clean up metadata '%s' for materialized view '%s'", storageMetadataLocation, viewName);
if (e != suppressed) {
e.addSuppressed(suppressed);
}
}
throw e;
}

existing.ifPresent(existingView -> dropMaterializedViewStorage(session, existingView));
}
else {
createMaterializedViewWithStorageTable(session, viewName, definition, materializedViewProperties, existing);
Expand Down Expand Up @@ -1173,7 +1190,7 @@ private void createMaterializedViewWithStorageTable(
}
}
}
dropStorageTable(session, existing.get());
dropMaterializedViewStorage(session, existing.get());
}
else {
createTable(viewName.getSchemaName(), materializedViewTableInput);
Expand Down Expand Up @@ -1227,11 +1244,11 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName viewN
throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + view.getDatabaseName() + "." + view.getName());
}
materializedViewCache.invalidate(viewName);
dropStorageTable(session, view);
dropMaterializedViewStorage(session, view);
deleteTable(view.getDatabaseName(), view.getName());
}

private void dropStorageTable(ConnectorSession session, com.amazonaws.services.glue.model.Table view)
private void dropMaterializedViewStorage(ConnectorSession session, com.amazonaws.services.glue.model.Table view)
{
Map<String, String> parameters = getTableParameters(view);
String storageTableName = parameters.get(STORAGE_TABLE);
Expand All @@ -1245,6 +1262,16 @@ private void dropStorageTable(ConnectorSession session, com.amazonaws.services.g
LOG.warn(e, "Failed to drop storage table '%s.%s' for materialized view '%s'", storageSchema, storageTableName, view.getName());
}
}
else {
String storageMetadataLocation = parameters.get(METADATA_LOCATION_PROP);
checkState(storageMetadataLocation != null, "Storage location missing in definition of materialized view " + view.getName());
try {
dropMaterializedViewStorage(fileSystemFactory.create(session), storageMetadataLocation);
}
catch (IOException e) {
LOG.warn(e, "Failed to delete storage table metadata '%s' for materialized view '%s'", storageMetadataLocation, view.getName());
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,12 +566,28 @@ public void createMaterializedView(
io.trino.plugin.hive.metastore.Table table = tableBuilder.build();
PrincipalPrivileges principalPrivileges = isUsingSystemSecurity ? NO_PRIVILEGES : buildInitialPrivilegeSet(session.getUser());

if (existing.isPresent()) {
metastore.replaceTable(viewName.getSchemaName(), viewName.getTableName(), table, principalPrivileges);
try {
if (existing.isPresent()) {
metastore.replaceTable(viewName.getSchemaName(), viewName.getTableName(), table, principalPrivileges);
}
else {
metastore.createTable(table, principalPrivileges);
}
}
else {
metastore.createTable(table, principalPrivileges);
catch (RuntimeException e) {
try {
dropMaterializedViewStorage(fileSystemFactory.create(session), storageMetadataLocation.toString());
}
catch (Exception suppressed) {
log.warn(suppressed, "Failed to clean up metadata '%s' for materialized view '%s'", storageMetadataLocation, viewName);
if (e != suppressed) {
e.addSuppressed(suppressed);
}
}
throw e;
}

existing.ifPresent(existingView -> dropMaterializedViewStorage(session, existingView));
}
else {
createMaterializedViewWithStorageTable(session, viewName, definition, materializedViewProperties, existing);
Expand Down Expand Up @@ -673,6 +689,13 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName viewN
throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + viewName);
}

dropMaterializedViewStorage(session, view);
metastore.dropTable(viewName.getSchemaName(), viewName.getTableName(), true);
}

private void dropMaterializedViewStorage(ConnectorSession session, io.trino.plugin.hive.metastore.Table view)
{
SchemaTableName viewName = view.getSchemaTableName();
String storageTableName = view.getParameters().get(STORAGE_TABLE);
if (storageTableName != null) {
String storageSchema = Optional.ofNullable(view.getParameters().get(STORAGE_SCHEMA))
Expand All @@ -684,21 +707,16 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName viewN
log.warn(e, "Failed to drop storage table '%s.%s' for materialized view '%s'", storageSchema, storageTableName, viewName);
}
}

String storageMetadataLocation = view.getParameters().get(METADATA_LOCATION_PROP);
checkState(storageMetadataLocation != null, "Storage location missing in definition of materialized view " + viewName);

TrinoFileSystem fileSystem = fileSystemFactory.create(session);
TableMetadata metadata = TableMetadataParser.read(new ForwardingFileIo(fileSystem), storageMetadataLocation);
String storageLocation = metadata.location();
try {
fileSystem.deleteDirectory(Location.of(storageLocation));
}
catch (IOException e) {
log.warn(e, "Failed to delete storage location '%s' for materialized view '%s'", storageLocation, viewName);
else {
String storageMetadataLocation = view.getParameters().get(METADATA_LOCATION_PROP);
checkState(storageMetadataLocation != null, "Storage location missing in definition of materialized view " + viewName);
try {
dropMaterializedViewStorage(fileSystemFactory.create(session), storageMetadataLocation);
}
catch (IOException e) {
log.warn(e, "Failed to delete storage table metadata '%s' for materialized view '%s'", storageMetadataLocation, viewName);
}
}

metastore.dropTable(viewName.getSchemaName(), viewName.getTableName(), true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.SystemSessionProperties.LEGACY_MATERIALIZED_VIEW_GRACE_PERIOD;
import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static io.trino.testing.MaterializedResult.DEFAULT_PRECISION;
import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.DROP_MATERIALIZED_VIEW;
import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.REFRESH_MATERIALIZED_VIEW;
Expand Down Expand Up @@ -111,14 +112,14 @@ public void testMaterializedViewsMetadata()
// test freshness update
assertQuery(
// TODO (https://github.com/trinodb/trino/issues/9039) remove redundant schema_name filter
format("SELECT freshness FROM system.metadata.materialized_views WHERE schema_name = '%s' AND name = '%s'", schemaName, materializedViewName),
format("SELECT freshness FROM system.metadata.materialized_views WHERE catalog_name = '%s' AND schema_name = '%s' AND name = '%s'", ICEBERG_CATALOG, schemaName, materializedViewName),
"VALUES 'STALE'");

computeActual(format("REFRESH MATERIALIZED VIEW %s", materializedViewName));

assertQuery(
// TODO (https://github.com/trinodb/trino/issues/9039) remove redundant schema_name filter
format("SELECT freshness FROM system.metadata.materialized_views WHERE schema_name = '%s' AND name = '%s'", schemaName, materializedViewName),
format("SELECT freshness FROM system.metadata.materialized_views WHERE catalog_name = '%s' AND schema_name = '%s' AND name = '%s'", ICEBERG_CATALOG, schemaName, materializedViewName),
"VALUES 'FRESH'");

assertUpdate("DROP TABLE small_region");
Expand Down Expand Up @@ -774,6 +775,33 @@ tswtz_9 timestamp(9) with time zone
assertThat(query(format("SELECT * FROM %s WHERE tswtz_9 < current_timestamp", materializedViewName))).succeeds();
}

@Test
public void testDropLegacyMaterializedView()
{
String schemaName = getSession().getSchema().orElseThrow();
String materializedViewName = "test_drop_legacy_materialized_view" + randomNameSuffix();
String sourceTableName = "test_source_table_for_mat_view" + randomNameSuffix();
assertUpdate(format("CREATE TABLE %s (a bigint, b bigint)", sourceTableName));
assertUpdate(format("CREATE MATERIALIZED VIEW iceberg_legacy_mv.%s.%s AS SELECT * FROM %s", schemaName, materializedViewName, sourceTableName));

try {
// Refresh with legacy enabled
assertUpdate(format("INSERT INTO %s VALUES (1, 1), (1, 4), (2, 2)", sourceTableName), 3);
assertUpdate(format("REFRESH MATERIALIZED VIEW iceberg_legacy_mv.%s.%s", schemaName, materializedViewName), 3);

// Refresh with legacy disabled
assertUpdate(format("INSERT INTO %s VALUES (10, 10), (10, 40), (20, 20)", sourceTableName), 3);
assertUpdate("REFRESH MATERIALIZED VIEW " + materializedViewName, 6);

assertQuery("SELECT * FROM " + materializedViewName, "VALUES (1, 1), (1, 4), (2, 2), (10, 10), (10, 40), (20, 20)");
assertUpdate("DROP MATERIALIZED VIEW " + materializedViewName);
}
finally {
assertUpdate("DROP TABLE " + sourceTableName);
assertUpdate(format("DROP MATERIALIZED VIEW IF EXISTS iceberg_legacy_mv.%s.%s", schemaName, materializedViewName));
}
}

protected String getColumnComment(String tableName, String columnName)
{
return (String) computeScalar("SELECT comment FROM information_schema.columns WHERE table_schema = '" + getSession().getSchema().orElseThrow() + "' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ protected QueryRunner createQueryRunner()
.setCatalog("iceberg2")
.build();

queryRunner.createCatalog("iceberg_legacy_mv", "iceberg", Map.of(
"iceberg.catalog.type", "TESTING_FILE_METASTORE",
"hive.metastore.catalog.dir", queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data").toString(),
"iceberg.hive-catalog-name", "hive",
"iceberg.materialized-views.hide-storage-table", "false"));

queryRunner.installPlugin(new MockConnectorPlugin(MockConnectorFactory.builder()
.withTableFunctions(ImmutableSet.of(new SequenceTableFunction()))
.withFunctionProvider(Optional.of(new FunctionProvider()
Expand Down Expand Up @@ -143,17 +149,17 @@ public void testMaterializedViewCreatedFromTableFunction()
assertUpdate("CREATE MATERIALIZED VIEW " + viewName + " AS SELECT * FROM TABLE(mock.system.sequence_function())");

assertFreshness(viewName, "STALE");
assertThat(computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE name = '" + viewName + "'").getOnlyValue()).isNull();
assertThat(computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE catalog_name = '" + ICEBERG_CATALOG + "' AND name = '" + viewName + "'").getOnlyValue()).isNull();
int result1 = (int) computeActual("SELECT * FROM " + viewName).getOnlyValue();

int result2 = (int) computeActual("SELECT * FROM " + viewName).getOnlyValue();
assertThat(result2).isNotEqualTo(result1); // differs because PTF sequence_function is called directly as mv is considered stale
assertFreshness(viewName, "STALE");
assertThat(computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE name = '" + viewName + "'").getOnlyValue()).isNull();
assertThat(computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE catalog_name = '" + ICEBERG_CATALOG + "' AND name = '" + viewName + "'").getOnlyValue()).isNull();

assertUpdate("REFRESH MATERIALIZED VIEW " + viewName, 1);
assertFreshness(viewName, "UNKNOWN");
ZonedDateTime lastFreshTime = (ZonedDateTime) computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE name = '" + viewName + "'").getOnlyValue();
ZonedDateTime lastFreshTime = (ZonedDateTime) computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE catalog_name = '" + ICEBERG_CATALOG + "' AND name = '" + viewName + "'").getOnlyValue();
assertThat(lastFreshTime).isNotNull();
int result3 = (int) computeActual("SELECT * FROM " + viewName).getOnlyValue();
assertThat(result3).isNotEqualTo(result2); // mv is not stale anymore so all selects until next refresh returns same result
Expand All @@ -163,7 +169,7 @@ public void testMaterializedViewCreatedFromTableFunction()
assertThat(result4).isEqualTo(result5);

assertUpdate("REFRESH MATERIALIZED VIEW " + viewName, 1);
assertThat((ZonedDateTime) computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE name = '" + viewName + "'").getOnlyValue()).isAfter(lastFreshTime);
assertThat((ZonedDateTime) computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE catalog_name = '" + ICEBERG_CATALOG + "' AND name = '" + viewName + "'").getOnlyValue()).isAfter(lastFreshTime);
assertFreshness(viewName, "UNKNOWN");
int result6 = (int) computeActual("SELECT * FROM " + viewName).getOnlyValue();
assertThat(result6).isNotEqualTo(result5);
Expand All @@ -183,19 +189,19 @@ public void testMaterializedViewCreatedFromTableFunctionAndTable()
assertThat(materializedRows.get(0).getField(1)).isEqualTo(2);
int valueFromPtf1 = (int) materializedRows.get(0).getField(0);
assertFreshness(viewName, "STALE");
assertThat(computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE name = '" + viewName + "'").getOnlyValue()).isNull();
assertThat(computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE catalog_name = '" + ICEBERG_CATALOG + "' AND name = '" + viewName + "'").getOnlyValue()).isNull();

materializedRows = computeActual("SELECT * FROM " + viewName).getMaterializedRows();
assertThat(materializedRows.size()).isEqualTo(1);
assertThat(materializedRows.get(0).getField(1)).isEqualTo(2);
int valueFromPtf2 = (int) materializedRows.get(0).getField(0);
assertThat(valueFromPtf2).isNotEqualTo(valueFromPtf1); // differs because PTF sequence_function is called directly as mv is considered stale
assertFreshness(viewName, "STALE");
assertThat(computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE name = '" + viewName + "'").getOnlyValue()).isNull();
assertThat(computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE catalog_name = '" + ICEBERG_CATALOG + "' AND name = '" + viewName + "'").getOnlyValue()).isNull();

assertUpdate("REFRESH MATERIALIZED VIEW " + viewName, 1);
assertFreshness(viewName, "UNKNOWN");
ZonedDateTime lastFreshTime = (ZonedDateTime) computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE name = '" + viewName + "'").getOnlyValue();
ZonedDateTime lastFreshTime = (ZonedDateTime) computeActual("SELECT last_fresh_time FROM system.metadata.materialized_views WHERE catalog_name = '" + ICEBERG_CATALOG + "' AND name = '" + viewName + "'").getOnlyValue();
assertThat(lastFreshTime).isNotNull();
materializedRows = computeActual("SELECT * FROM " + viewName).getMaterializedRows();
assertThat(materializedRows.size()).isEqualTo(1);
Expand Down Expand Up @@ -284,7 +290,7 @@ AS SELECT sum(value) AS s FROM iceberg2.tpch.common_base_table

private void assertFreshness(String viewName, String expected)
{
assertThat((String) computeScalar("SELECT freshness FROM system.metadata.materialized_views WHERE name = '" + viewName + "'")).isEqualTo(expected);
assertThat((String) computeScalar("SELECT freshness FROM system.metadata.materialized_views WHERE catalog_name = '" + ICEBERG_CATALOG + "' AND name = '" + viewName + "'")).isEqualTo(expected);
}

public static class SequenceTableFunction
Expand Down
Loading