Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import io.trino.spi.TrinoException;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.procedure.Procedure;

import javax.inject.Inject;
Expand Down Expand Up @@ -106,13 +104,16 @@ private void doFlushMetadataCache(ConnectorSession session, Optional<String> sch
}
else if (schemaName.isPresent() && tableName.isPresent()) {
HiveMetastore metastore = metastoreFactory.createMetastore(Optional.of(session.getIdentity()));
Table table = metastore.getTable(schemaName.get(), tableName.get())
.orElseThrow(() -> new TableNotFoundException(new SchemaTableName(schemaName.get(), tableName.get())));
verifyDeltaLakeTable(table);
cachingHiveMetastore.ifPresent(caching -> caching.invalidateTable(table.getDatabaseName(), table.getTableName()));
String tableLocation = getTableLocation(table);
transactionLogAccess.invalidateCaches(tableLocation);
extendedStatisticsAccess.invalidateCache(tableLocation);
// This may insert into a cache, but this will get invalidated below. TODO fix Delta so that flush_metadata_cache doesn't have to read from metastore
Optional<Table> tableBeforeFlush = metastore.getTable(schemaName.get(), tableName.get());
cachingHiveMetastore.ifPresent(caching -> caching.invalidateTable(schemaName.get(), tableName.get()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does invalidateTable throw something if you call it with a schema/table that isn't cached? Seems like we should be able to just call it blindly without calling getTable first.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does invalidateTable throw something if you call it with a schema/table that isn't cached?

it does not

Seems like we should be able to just call it blindly without calling getTable first.

We call getTable to learn the table location (pre-existing behavior).

We invalidate after that call, so that the cache is empty in the end state.


Optional<String> tableLocation = tableBeforeFlush.map(table -> {
verifyDeltaLakeTable(table);
return getTableLocation(table);
});
tableLocation.ifPresent(transactionLogAccess::invalidateCaches);
tableLocation.ifPresent(extendedStatisticsAccess::invalidateCache);
Comment on lines +115 to +116
Copy link
Member

@alexjo2144 alexjo2144 Apr 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the cache key for these should be a (tableName, tableLocation) tuple? That way you could invalidate them by name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, i think so

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added "TODO fix Delta so that flush_metadata_cache doesn't have to read from metastore" for now.
I would prefer to revisit this after #17092.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-> #17214

}
else {
throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, "Illegal parameter set passed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.createS3DeltaLakeQueryRunner;
import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder;
import static io.trino.plugin.hive.containers.HiveHadoop.HIVE3_IMAGE;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static org.assertj.core.api.Assertions.assertThat;

public class TestDeltaLakeFlushMetadataCacheProcedure
extends AbstractTestQueryFramework
Expand Down Expand Up @@ -105,10 +107,34 @@ public void testFlushMetadataCache()
}

@Test
public void testFlushMetadataCacheTableNotFound()
public void testFlushMetadataCacheAfterTableCreated()
{
assertQueryFails(
"CALL system.flush_metadata_cache(schema_name => 'test_not_existing_schema', table_name => 'test_not_existing_table')",
"Table 'test_not_existing_schema.test_not_existing_table' not found");
String schema = getSession().getSchema().orElseThrow();
String tableName = "flush_metadata_after_table_created";
String intermediateTableName = "test_flush_intermediate_" + randomNameSuffix();

String location = "s3://%s/%s".formatted(BUCKET_NAME, intermediateTableName);
assertUpdate("CREATE TABLE " + intermediateTableName + " WITH (location = '" + location + "') AS TABLE tpch.tiny.region", 5);

// This may cause the connector to cache the fact that the table does not exist
assertQueryFails("TABLE " + tableName, "\\Qline 1:1: Table 'delta_lake.default.flush_metadata_after_table_created' does not exist");

metastore.renameTable(schema, intermediateTableName, schema, tableName);

// Verify cached state (we currently cache missing objects in CachingMetastore)
assertQueryFails("TABLE " + tableName, "\\Qline 1:1: Table 'delta_lake.default.flush_metadata_after_table_created' does not exist");

assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => '" + tableName + "')");
assertThat(query("TABLE " + tableName))
.skippingTypesCheck() // Delta has no parametric varchar
.matches("TABLE tpch.tiny.region");

assertUpdate("DROP TABLE flush_metadata_after_table_created");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
assertUpdate("DROP TABLE flush_metadata_after_table_created");
assertUpdate("DROP TABLE " + tableName)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed this one; will resolve in #17214.

}

@Test
public void testFlushMetadataCacheNonExistentTable()
{
    // Flushing the cache for a schema/table pair that does not exist is expected to complete
    // successfully rather than fail the procedure call.
    String flushMissingTable = "CALL system.flush_metadata_cache(schema_name => 'test_not_existing_schema', table_name => 'test_not_existing_table')";
    assertUpdate(flushMissingTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,9 @@ public static void copyTable(QueryRunner queryRunner, QualifiedObjectName table,
long rows = (Long) queryRunner.execute(session, sql).getMaterializedRows().get(0).getField(0);
log.info("Imported %s rows for %s in %s", rows, table.getObjectName(), nanosSince(start).convertToMostSuccinctTimeUnit());

assertThat(queryRunner.execute(session, "SELECT count(*) FROM " + table).getOnlyValue())
.as("Table is not loaded properly: %s", table)
.isEqualTo(queryRunner.execute(session, "SELECT count(*) FROM " + table.getObjectName()).getOnlyValue());
assertThat(queryRunner.execute(session, "SELECT count(*) FROM " + table.getObjectName()).getOnlyValue())
.as("Table is not loaded properly: %s", table.getObjectName())
.isEqualTo(queryRunner.execute(session, "SELECT count(*) FROM " + table).getOnlyValue());
}

public static RuntimeException getTrinoExceptionCause(Throwable e)
Expand Down