diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/FlushMetadataCacheProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/FlushMetadataCacheProcedure.java index 43a997ac599b..22ff740a7e71 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/FlushMetadataCacheProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/FlushMetadataCacheProcedure.java @@ -23,8 +23,6 @@ import io.trino.spi.TrinoException; import io.trino.spi.classloader.ThreadContextClassLoader; import io.trino.spi.connector.ConnectorSession; -import io.trino.spi.connector.SchemaTableName; -import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.procedure.Procedure; import javax.inject.Inject; @@ -106,13 +104,16 @@ private void doFlushMetadataCache(ConnectorSession session, Optional sch } else if (schemaName.isPresent() && tableName.isPresent()) { HiveMetastore metastore = metastoreFactory.createMetastore(Optional.of(session.getIdentity())); - Table table = metastore.getTable(schemaName.get(), tableName.get()) - .orElseThrow(() -> new TableNotFoundException(new SchemaTableName(schemaName.get(), tableName.get()))); - verifyDeltaLakeTable(table); - cachingHiveMetastore.ifPresent(caching -> caching.invalidateTable(table.getDatabaseName(), table.getTableName())); - String tableLocation = getTableLocation(table); - transactionLogAccess.invalidateCaches(tableLocation); - extendedStatisticsAccess.invalidateCache(tableLocation); + // This may insert into a cache, but this will get invalidated below. TODO fix Delta so that flush_metadata_cache doesn't have to read from metastore + Optional tableBeforeFlush = metastore.getTable(schemaName.get(), tableName.get()); + cachingHiveMetastore.ifPresent(caching -> caching.invalidateTable(schemaName.get(), tableName.get())); + + Optional tableLocation = tableBeforeFlush.map(table -> { + verifyDeltaLakeTable(table); + return getTableLocation(table); + }); + tableLocation.ifPresent(transactionLogAccess::invalidateCaches); + tableLocation.ifPresent(extendedStatisticsAccess::invalidateCache); } else { throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, "Illegal parameter set passed"); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFlushMetadataCacheProcedure.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFlushMetadataCacheProcedure.java index 57bcb27e2cb6..7ff02d4b5723 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFlushMetadataCacheProcedure.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFlushMetadataCacheProcedure.java @@ -28,6 +28,8 @@ import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.createS3DeltaLakeQueryRunner; import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder; import static io.trino.plugin.hive.containers.HiveHadoop.HIVE3_IMAGE; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static org.assertj.core.api.Assertions.assertThat; public class TestDeltaLakeFlushMetadataCacheProcedure extends AbstractTestQueryFramework @@ -105,10 +107,34 @@ public void testFlushMetadataCache() } @Test - public void testFlushMetadataCacheTableNotFound() + public void testFlushMetadataCacheAfterTableCreated() { - assertQueryFails( - "CALL system.flush_metadata_cache(schema_name => 'test_not_existing_schema', table_name => 'test_not_existing_table')", - "Table 'test_not_existing_schema.test_not_existing_table' not found"); + String schema = getSession().getSchema().orElseThrow(); + String tableName = "flush_metadata_after_table_created"; + String intermediateTableName = "test_flush_intermediate_" + randomNameSuffix(); + + String location = "s3://%s/%s".formatted(BUCKET_NAME, intermediateTableName); + assertUpdate("CREATE TABLE " + intermediateTableName + " WITH (location = '" + location + "') AS TABLE tpch.tiny.region", 5); + + // This may cause the connector to cache the fact that the table does not exist + assertQueryFails("TABLE " + tableName, "\\Qline 1:1: Table 'delta_lake.default.flush_metadata_after_table_created' does not exist"); + + metastore.renameTable(schema, intermediateTableName, schema, tableName); + + // Verify cached state (we currently cache missing objects in CachingMetastore) + assertQueryFails("TABLE " + tableName, "\\Qline 1:1: Table 'delta_lake.default.flush_metadata_after_table_created' does not exist"); + + assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => '" + tableName + "')"); + assertThat(query("TABLE " + tableName)) + .skippingTypesCheck() // Delta has no parametric varchar + .matches("TABLE tpch.tiny.region"); + + assertUpdate("DROP TABLE flush_metadata_after_table_created"); + } + + @Test + public void testFlushMetadataCacheNonExistentTable() + { + assertUpdate("CALL system.flush_metadata_cache(schema_name => 'test_not_existing_schema', table_name => 'test_not_existing_table')"); } } diff --git a/testing/trino-testing/src/main/java/io/trino/testing/QueryAssertions.java b/testing/trino-testing/src/main/java/io/trino/testing/QueryAssertions.java index 5593921163f8..57ca5f2f72f2 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/QueryAssertions.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/QueryAssertions.java @@ -507,9 +507,9 @@ public static void copyTable(QueryRunner queryRunner, QualifiedObjectName table, long rows = (Long) queryRunner.execute(session, sql).getMaterializedRows().get(0).getField(0); log.info("Imported %s rows for %s in %s", rows, table.getObjectName(), nanosSince(start).convertToMostSuccinctTimeUnit()); - assertThat(queryRunner.execute(session, "SELECT count(*) FROM " + table).getOnlyValue()) - .as("Table is not loaded properly: %s", table) - .isEqualTo(queryRunner.execute(session, "SELECT count(*) FROM " + table.getObjectName()).getOnlyValue()); + assertThat(queryRunner.execute(session, "SELECT count(*) FROM " + table.getObjectName()).getOnlyValue()) + .as("Table is not loaded properly: %s", table.getObjectName()) + .isEqualTo(queryRunner.execute(session, "SELECT count(*) FROM " + table).getOnlyValue()); } public static RuntimeException getTrinoExceptionCause(Throwable e)