Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
import io.trino.plugin.hive.security.SqlStandardAccessControlMetadataMetastore;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.security.PrincipalType;
import io.trino.spi.security.RoleGrant;
import io.trino.spi.statistics.ColumnStatisticType;
Expand Down Expand Up @@ -366,7 +368,38 @@ public synchronized void createDatabase(HiveIdentity identity, Database database

public synchronized void dropDatabase(HiveIdentity identity, String schemaName)
{
setExclusive((delegate, hdfsEnvironment) -> delegate.dropDatabase(identity, schemaName));
HdfsContext context = new HdfsContext(
identity.getUsername()
.map(ConnectorIdentity::ofUser)
.orElseThrow(() -> new IllegalStateException("username is null")));

Optional<Path> location = delegate.getDatabase(schemaName)
.orElseThrow(() -> new SchemaNotFoundException(schemaName))
.getLocation()
.map(Path::new);

setExclusive((delegate, hdfsEnvironment) -> {
delegate.dropDatabase(identity, schemaName);

location.ifPresent(path -> {
try {
FileSystem fs = hdfsEnvironment.getFileSystem(context, path);
// If no files in schema directory, delete it
if (!fs.listFiles(path, false).hasNext()) {
log.debug("Deleting location of dropped schema (%s)", path);
fs.delete(path, true);
}
else {
log.info("Skipped deleting schema location with external files (%s)", path);
}
}
catch (IOException e) {
throw new TrinoException(
HIVE_FILESYSTEM_ERROR,
format("Error checking or deleting schema directory '%s'", path), e);
}
});
});
}

public synchronized void renameDatabase(HiveIdentity identity, String source, String target)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ public synchronized void dropDatabase(HiveIdentity identity, String databaseName
throw new TrinoException(HIVE_METASTORE_ERROR, "Database " + databaseName + " is not empty");
}

deleteMetadataDirectory(getDatabaseMetadataDirectory(databaseName));
// Only delete the metadata of the database, not any other files
deleteSchemaFile(DATABASE, getDatabaseMetadataDirectory(databaseName));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,7 @@ public void dropDatabase(HiveIdentity identity, String databaseName)
.stopOnIllegalExceptions()
.run("dropDatabase", stats.getDropDatabase().wrap(() -> {
try (ThriftMetastoreClient client = createMetastoreClient(identity)) {
client.dropDatabase(databaseName, true, false);
client.dropDatabase(databaseName, false, false);
}
return null;
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
import com.google.inject.name.Named;
import io.trino.tempto.ProductTest;
import io.trino.tempto.hadoop.hdfs.HdfsClient;
import io.trino.tempto.query.QueryExecutionException;
import org.testng.annotations.Test;

import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure;
import static io.trino.tempto.query.QueryExecutor.query;
import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix;
import static io.trino.tests.product.utils.QueryExecutors.onHive;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

public class TestCreateDropSchema
extends ProductTest
Expand All @@ -39,18 +42,159 @@ public class TestCreateDropSchema
@Test
public void testCreateDropSchema()
{
onHive().executeQuery("DROP DATABASE IF EXISTS test_drop_schema CASCADE");
String schemaName = "test_drop_schema";
String schemaDir = warehouseDirectory + "/test_drop_schema.db";

onTrino().executeQuery("CREATE SCHEMA test_drop_schema");
assertTrue(hdfsClient.exist(warehouseDirectory + "/test_drop_schema.db"));
ensureSchemaDoesNotExist(schemaName);

assertQuerySucceeds("CREATE SCHEMA test_drop_schema");
assertThat(hdfsClient.exist(schemaDir))
.as("Check if expected schema directory exists after creating schema")
.isTrue();

onTrino().executeQuery("CREATE TABLE test_drop_schema.test_drop (col1 int)");

assertQueryFailure(() -> query("DROP SCHEMA test_drop_schema"))
.hasMessageContaining("line 1:1: Cannot drop non-empty schema 'test_drop_schema'");

onTrino().executeQuery("DROP TABLE test_drop_schema.test_drop");

onTrino().executeQuery("DROP SCHEMA test_drop_schema");
assertFalse(hdfsClient.exist(warehouseDirectory + "/test_drop_schema.db"));
assertQuerySucceeds("DROP SCHEMA test_drop_schema");
assertThat(hdfsClient.exist(schemaDir))
.as("Check if schema directory exists after dropping schema")
.isFalse();
}

@Test
public void testDropSchemaWithEmptyLocation()
{
String schemaName = schemaName("schema_with_empty_location");
String schemaDir = warehouseDirectory + "/schema-with-empty-location/";

createSchema(schemaName, schemaDir);
assertFileExists(schemaDir, true, "schema directory exists after create schema");
assertQuerySucceeds("DROP SCHEMA " + schemaName);
assertFileExists(schemaDir, false, "schema directory exists after drop schema");
}

@Test
public void testDropSchemaFilesWithoutLocation()
{
String schemaName = schemaName("schema_without_location");
String schemaDir = format("%s/%s.db/", warehouseDirectory, schemaName);

createSchema(schemaName);
assertFileExists(schemaDir, true, "schema directory exists after create schema");
assertQuerySucceeds("DROP SCHEMA " + schemaName);
assertFileExists(schemaDir, false, "schema directory exists after drop schema");
}

@Test
public void testDropSchemaFilesWithNonemptyLocation()
{
String schemaName = schemaName("schema_with_nonempty_location");
String schemaDir = warehouseDirectory + "/schema-with-nonempty-location/";

// Create file in schema directory before creating schema
String externalFile = schemaDir + "external-file";
hdfsClient.createDirectory(schemaDir);
hdfsClient.saveFile(externalFile, "");

createSchema(schemaName, schemaDir);
assertFileExists(schemaDir, true, "schema directory exists after create schema");
assertQuerySucceeds("DROP SCHEMA " + schemaName);
assertFileExists(schemaDir, true, "schema directory exists after drop schema");

assertFileExists(externalFile, true, "external file exists after drop schema");

hdfsClient.delete(externalFile);
}

// Tests create/drop schema transactions with default schema location
@Test
public void testDropSchemaFilesTransactions()
{
String schemaName = schemaName("schema_directory_transactions");
String schemaDir = format("%s/%s.db/", warehouseDirectory, schemaName);

createSchema(schemaName);
assertFileExists(schemaDir, true, "schema directory exists after create schema");

onTrino().executeQuery("START TRANSACTION");
assertQuerySucceeds("DROP SCHEMA " + schemaName);
assertQuerySucceeds("ROLLBACK");
assertFileExists(schemaDir, true, "schema directory exists after rollback");

// Sanity check: schema is still working
onTrino().executeQuery(format("CREATE TABLE %s.test_table (i integer)", schemaName));
onTrino().executeQuery(format("DROP TABLE %s.test_table", schemaName));

onTrino().executeQuery("START TRANSACTION");
assertQuerySucceeds("DROP SCHEMA " + schemaName);
assertQuerySucceeds("COMMIT");
assertFileExists(schemaDir, false, "schema directory exists after drop schema");
}

@Test
public void testDropSchemaFilesTransactionsWithExternalFiles()
{
String schemaName = schemaName("schema_transactions_with_external_files");
String schemaDir = warehouseDirectory + "/schema-transactions-with-external-files/";

// Create file in schema directory before creating schema
String externalFile = schemaDir + "external-file";
hdfsClient.createDirectory(schemaDir);
hdfsClient.saveFile(externalFile, "");

createSchema(schemaName, schemaDir);

onTrino().executeQuery("START TRANSACTION");
assertQuerySucceeds("DROP SCHEMA " + schemaName);
assertQuerySucceeds("ROLLBACK");
assertFileExists(externalFile, true, "external file exists after rolling back drop schema");

// Sanity check: schema is still working
onTrino().executeQuery(format("CREATE TABLE %s.test_table (i integer)", schemaName));
onTrino().executeQuery(format("DROP TABLE %s.test_table", schemaName));

onTrino().executeQuery("START TRANSACTION");
assertQuerySucceeds("DROP SCHEMA " + schemaName);
assertQuerySucceeds("COMMIT");
assertFileExists(externalFile, true, "schema directory exists after committing drop schema");
}

private void assertFileExists(String path, boolean exists, String description)
{
assertThat(hdfsClient.exist(path)).as("%s (%s)", description, path).isEqualTo(exists);
}

private static void assertQuerySucceeds(String query)
{
try {
onTrino().executeQuery(query);
}
catch (QueryExecutionException e) {
fail(format("Expected query to succeed: %s", query), e.getCause());
}
}

private void createSchema(String name)
{
onTrino().executeQuery(format("CREATE SCHEMA %s", name));
}

private void createSchema(String name, String location)
{
onTrino().executeQuery(format("CREATE SCHEMA %s WITH (location = '%s')", name, location));
}

private static String schemaName(String name)
{
return format("%s_%s", name, randomTableSuffix());
}

private static void ensureSchemaDoesNotExist(String schemaName)
{
onHive().executeQuery(format("DROP DATABASE IF EXISTS %s CASCADE", schemaName));
}
}