Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public class HiveConfig
private boolean forceLocalScheduling;
private boolean recursiveDirWalkerEnabled;
private boolean ignoreAbsentPartitions;
private boolean ignoreSchemaLocationCleanupFailure;

private int maxConcurrentFileRenames = 20;
private int maxConcurrentMetastoreDrops = 20;
Expand Down Expand Up @@ -330,19 +329,6 @@ public HiveConfig setIgnoreAbsentPartitions(boolean ignoreAbsentPartitions)
return this;
}

/**
 * Returns whether file-system cleanup failures during DROP SCHEMA are ignored.
 */
public boolean isIgnoreSchemaLocationCleanupFailure()
{
    return this.ignoreSchemaLocationCleanupFailure;
}

// Bound to the hive.ignore-schema-location-cleanup-failure config property; returns
// this for the fluent style used by the other HiveConfig setters.
@Config("hive.ignore-schema-location-cleanup-failure")
@ConfigDescription("Allows to ignore failures related to file system cleanup during DROP SCHEMA for situations when schema location is misconfigured or no longer reachable")
public HiveConfig setIgnoreSchemaLocationCleanupFailure(boolean ignoreSchemaLocationCleanupFailure)
{
this.ignoreSchemaLocationCleanupFailure = ignoreSchemaLocationCleanupFailure;
return this;
}

@NotNull
public DataSize getMaxSplitSize()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ public void createSchema(ConnectorSession session, String schemaName, Map<String
@Override
public void dropSchema(ConnectorSession session, String schemaName)
{
metastore.dropDatabase(session, schemaName);
metastore.dropDatabase(new HiveIdentity(session), schemaName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ public final class HiveSessionProperties
private static final String TEMPORARY_STAGING_DIRECTORY_PATH = "temporary_staging_directory_path";
private static final String DELEGATE_TRANSACTIONAL_MANAGED_TABLE_LOCATION_TO_METASTORE = "delegate_transactional_managed_table_location_to_metastore";
private static final String IGNORE_ABSENT_PARTITIONS = "ignore_absent_partitions";
private static final String IGNORE_SCHEMA_LOCATION_CLEANUP_FAILURE = "ignore_schema_location_cleanup_failure";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
private static final String QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS = "query_partition_filter_required_schemas";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
Expand Down Expand Up @@ -411,11 +410,6 @@ public HiveSessionProperties(
"Ignore partitions when the file system location does not exist rather than failing the query.",
hiveConfig.isIgnoreAbsentPartitions(),
false),
booleanProperty(
IGNORE_SCHEMA_LOCATION_CLEANUP_FAILURE,
"Allows to ignore failures related to file system cleanup during DROP SCHEMA for situations when schema location is misconfigured or no longer reachable",
hiveConfig.isIgnoreSchemaLocationCleanupFailure(),
false),
booleanProperty(
QUERY_PARTITION_FILTER_REQUIRED,
"Require filter on partition column",
Expand Down Expand Up @@ -753,11 +747,6 @@ public static boolean isIgnoreAbsentPartitions(ConnectorSession session)
return session.getProperty(IGNORE_ABSENT_PARTITIONS, Boolean.class);
}

/**
 * Reads the {@code ignore_schema_location_cleanup_failure} session property.
 */
public static boolean isIgnoreSchemaLocationCleanupFailure(ConnectorSession session)
{
    Boolean ignoreCleanupFailure = session.getProperty(IGNORE_SCHEMA_LOCATION_CLEANUP_FAILURE, Boolean.class);
    return ignoreCleanupFailure;
}

public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
{
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@
import io.trino.plugin.hive.security.SqlStandardAccessControlMetadataMetastore;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.security.PrincipalType;
import io.trino.spi.security.RoleGrant;
import io.trino.spi.statistics.ColumnStatisticType;
Expand Down Expand Up @@ -95,7 +93,6 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_PATH_ALREADY_EXISTS;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_TABLE_DROPPED_DURING_QUERY;
import static io.trino.plugin.hive.HiveMetadata.PRESTO_QUERY_ID_NAME;
import static io.trino.plugin.hive.HiveSessionProperties.isIgnoreSchemaLocationCleanupFailure;
import static io.trino.plugin.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_NEW_DIRECTORY;
import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView;
import static io.trino.plugin.hive.acid.AcidTransaction.NO_ACID_TRANSACTION;
Expand Down Expand Up @@ -367,46 +364,9 @@ public synchronized void createDatabase(HiveIdentity identity, Database database
setExclusive((delegate, hdfsEnvironment) -> delegate.createDatabase(identity, database));
}

public synchronized void dropDatabase(ConnectorSession session, String schemaName)
public synchronized void dropDatabase(HiveIdentity identity, String schemaName)
{
HiveIdentity identity = new HiveIdentity(session);
HdfsContext context = new HdfsContext(
identity.getUsername()
.map(ConnectorIdentity::ofUser)
.orElseThrow(() -> new IllegalStateException("username is null")));

Optional<Path> location = delegate.getDatabase(schemaName)
.orElseThrow(() -> new SchemaNotFoundException(schemaName))
.getLocation()
.map(Path::new);

setExclusive((delegate, hdfsEnvironment) -> {
delegate.dropDatabase(identity, schemaName);

location.ifPresent(path -> {
try {
FileSystem fs = hdfsEnvironment.getFileSystem(context, path);
// If no files in schema directory, delete it
if (!fs.listFiles(path, false).hasNext()) {
log.debug("Deleting location of dropped schema (%s)", path);
fs.delete(path, true);
}
else {
log.info("Skipped deleting schema location with external files (%s)", path);
}
}
catch (IOException | RuntimeException e) {
if (isIgnoreSchemaLocationCleanupFailure(session)) {
log.warn(e, "Failure when checking or deleting schema directory '%s'", path);
}
else {
throw new TrinoException(
HIVE_FILESYSTEM_ERROR,
format("Error checking or deleting schema directory '%s'", path), e);
}
}
});
});
setExclusive((delegate, hdfsEnvironment) -> delegate.dropDatabase(identity, schemaName));
}

public synchronized void renameDatabase(HiveIdentity identity, String source, String target)
Expand Down Expand Up @@ -2253,7 +2213,7 @@ private void rollbackShared()
.map(Column::getName)
.collect(toImmutableList());
List<String> partitionNames = delegate.getPartitionNamesByFilter(
identity, schemaTableName.getSchemaName(), schemaTableName.getTableName(), partitionColumnNames, TupleDomain.all())
identity, schemaTableName.getSchemaName(), schemaTableName.getTableName(), partitionColumnNames, TupleDomain.all())
.orElse(ImmutableList.of());
for (List<String> partitionNameBatch : Iterables.partition(partitionNames, 10)) {
Collection<Optional<Partition>> partitions = delegate.getPartitionsByNames(identity, schemaTableName.getSchemaName(), schemaTableName.getTableName(), partitionNameBatch).values();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ public synchronized void dropDatabase(HiveIdentity identity, String databaseName
throw new TrinoException(HIVE_METASTORE_ERROR, "Database " + databaseName + " is not empty");
}

// Only delete the metadata of the database, not any other files
deleteSchemaFile(DATABASE, getDatabaseMetadataDirectory(databaseName));
deleteMetadataDirectory(getDatabaseMetadataDirectory(databaseName));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,7 @@ public void dropDatabase(HiveIdentity identity, String databaseName)
.stopOnIllegalExceptions()
.run("dropDatabase", stats.getDropDatabase().wrap(() -> {
try (ThriftMetastoreClient client = createMetastoreClient(identity)) {
client.dropDatabase(databaseName, false, false);
client.dropDatabase(databaseName, true, false);
}
return null;
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public void testDefaults()
.setMaxConcurrentMetastoreUpdates(20)
.setRecursiveDirWalkerEnabled(false)
.setIgnoreAbsentPartitions(false)
.setIgnoreSchemaLocationCleanupFailure(false)
.setHiveStorageFormat(HiveStorageFormat.ORC)
.setHiveCompressionCodec(HiveCompressionCodec.GZIP)
.setRespectTableFormat(true)
Expand Down Expand Up @@ -131,7 +130,6 @@ public void testExplicitPropertyMappings()
.put("hive.writer-sort-buffer-size", "13MB")
.put("hive.recursive-directories", "true")
.put("hive.ignore-absent-partitions", "true")
.put("hive.ignore-schema-location-cleanup-failure", "true")
.put("hive.storage-format", "SEQUENCEFILE")
.put("hive.compression-codec", "NONE")
.put("hive.respect-table-format", "false")
Expand Down Expand Up @@ -210,7 +208,6 @@ public void testExplicitPropertyMappings()
.setMaxConcurrentMetastoreUpdates(100)
.setRecursiveDirWalkerEnabled(true)
.setIgnoreAbsentPartitions(true)
.setIgnoreSchemaLocationCleanupFailure(true)
.setHiveStorageFormat(HiveStorageFormat.SEQUENCEFILE)
.setHiveCompressionCodec(HiveCompressionCodec.NONE)
.setRespectTableFormat(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@
import com.google.inject.name.Named;
import io.trino.tempto.ProductTest;
import io.trino.tempto.hadoop.hdfs.HdfsClient;
import io.trino.tempto.query.QueryExecutionException;
import org.testng.annotations.Test;

import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure;
import static io.trino.tempto.query.QueryExecutor.query;
import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix;
import static io.trino.tests.product.utils.QueryExecutors.onHive;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public class TestCreateDropSchema
extends ProductTest
Expand All @@ -42,159 +39,18 @@ public class TestCreateDropSchema
@Test
public void testCreateDropSchema()
{
String schemaName = "test_drop_schema";
String schemaDir = warehouseDirectory + "/test_drop_schema.db";
onHive().executeQuery("DROP DATABASE IF EXISTS test_drop_schema CASCADE");

ensureSchemaDoesNotExist(schemaName);

assertQuerySucceeds("CREATE SCHEMA test_drop_schema");
assertThat(hdfsClient.exist(schemaDir))
.as("Check if expected schema directory exists after creating schema")
.isTrue();
onTrino().executeQuery("CREATE SCHEMA test_drop_schema");
assertTrue(hdfsClient.exist(warehouseDirectory + "/test_drop_schema.db"));

onTrino().executeQuery("CREATE TABLE test_drop_schema.test_drop (col1 int)");

assertQueryFailure(() -> query("DROP SCHEMA test_drop_schema"))
.hasMessageContaining("line 1:1: Cannot drop non-empty schema 'test_drop_schema'");

onTrino().executeQuery("DROP TABLE test_drop_schema.test_drop");

assertQuerySucceeds("DROP SCHEMA test_drop_schema");
assertThat(hdfsClient.exist(schemaDir))
.as("Check if schema directory exists after dropping schema")
.isFalse();
}

// Dropping a schema whose explicit, empty location holds no files should remove the directory.
@Test
public void testDropSchemaWithEmptyLocation()
{
    String schema = schemaName("schema_with_empty_location");
    String location = warehouseDirectory + "/schema-with-empty-location/";

    createSchema(schema, location);
    assertFileExists(location, true, "schema directory exists after create schema");

    assertQuerySucceeds("DROP SCHEMA " + schema);
    assertFileExists(location, false, "schema directory exists after drop schema");
}

// Dropping a schema created without an explicit location should remove the
// metastore-assigned default directory (<warehouse>/<schema>.db/).
@Test
public void testDropSchemaFilesWithoutLocation()
{
    String schema = schemaName("schema_without_location");
    String location = format("%s/%s.db/", warehouseDirectory, schema);

    createSchema(schema);
    assertFileExists(location, true, "schema directory exists after create schema");

    assertQuerySucceeds("DROP SCHEMA " + schema);
    assertFileExists(location, false, "schema directory exists after drop schema");
}

// DROP SCHEMA must leave the location (and foreign files in it) intact when the
// directory contains data Trino does not own.
@Test
public void testDropSchemaFilesWithNonemptyLocation()
{
    String schema = schemaName("schema_with_nonempty_location");
    String location = warehouseDirectory + "/schema-with-nonempty-location/";
    String externalFile = location + "external-file";

    // Seed the directory with a file before the schema exists.
    hdfsClient.createDirectory(location);
    hdfsClient.saveFile(externalFile, "");

    createSchema(schema, location);
    assertFileExists(location, true, "schema directory exists after create schema");

    assertQuerySucceeds("DROP SCHEMA " + schema);
    assertFileExists(location, true, "schema directory exists after drop schema");

    assertFileExists(externalFile, true, "external file exists after drop schema");

    hdfsClient.delete(externalFile);
}

// Tests create/drop schema transactions with default schema location
@Test
public void testDropSchemaFilesTransactions()
{
    String schema = schemaName("schema_directory_transactions");
    String location = format("%s/%s.db/", warehouseDirectory, schema);

    createSchema(schema);
    assertFileExists(location, true, "schema directory exists after create schema");

    // A rolled-back DROP SCHEMA must leave the directory in place.
    onTrino().executeQuery("START TRANSACTION");
    assertQuerySucceeds("DROP SCHEMA " + schema);
    assertQuerySucceeds("ROLLBACK");
    assertFileExists(location, true, "schema directory exists after rollback");

    // Sanity check: schema is still working
    onTrino().executeQuery(format("CREATE TABLE %s.test_table (i integer)", schema));
    onTrino().executeQuery(format("DROP TABLE %s.test_table", schema));

    // A committed DROP SCHEMA removes the directory.
    onTrino().executeQuery("START TRANSACTION");
    assertQuerySucceeds("DROP SCHEMA " + schema);
    assertQuerySucceeds("COMMIT");
    assertFileExists(location, false, "schema directory exists after drop schema");
}

// Transactional DROP SCHEMA (rollback and commit) must never delete external files
// that live in the schema location but are not owned by Trino.
@Test
public void testDropSchemaFilesTransactionsWithExternalFiles()
{
    String schemaName = schemaName("schema_transactions_with_external_files");
    String schemaDir = warehouseDirectory + "/schema-transactions-with-external-files/";

    // Create file in schema directory before creating schema
    String externalFile = schemaDir + "external-file";
    hdfsClient.createDirectory(schemaDir);
    hdfsClient.saveFile(externalFile, "");

    createSchema(schemaName, schemaDir);

    onTrino().executeQuery("START TRANSACTION");
    assertQuerySucceeds("DROP SCHEMA " + schemaName);
    assertQuerySucceeds("ROLLBACK");
    assertFileExists(externalFile, true, "external file exists after rolling back drop schema");

    // Sanity check: schema is still working
    onTrino().executeQuery(format("CREATE TABLE %s.test_table (i integer)", schemaName));
    onTrino().executeQuery(format("DROP TABLE %s.test_table", schemaName));

    onTrino().executeQuery("START TRANSACTION");
    assertQuerySucceeds("DROP SCHEMA " + schemaName);
    assertQuerySucceeds("COMMIT");
    // Fixed assertion description: this checks the external file, not the schema directory.
    assertFileExists(externalFile, true, "external file exists after committing drop schema");
}

/**
 * Asserts that the given HDFS path does (or does not) exist, labelling the
 * assertion with the description and the path for readable failures.
 */
private void assertFileExists(String path, boolean exists, String description)
{
    boolean actual = hdfsClient.exist(path);
    assertThat(actual).as("%s (%s)", description, path).isEqualTo(exists);
}

/**
 * Runs the query on Trino and fails the test if it throws, preserving the
 * complete exception chain in the failure report.
 */
private static void assertQuerySucceeds(String query)
{
    try {
        onTrino().executeQuery(query);
    }
    catch (QueryExecutionException e) {
        // Pass the exception itself rather than e.getCause(): getCause() may be
        // null, and dropping the wrapper loses the executor's stack frames.
        fail(format("Expected query to succeed: %s", query), e);
    }
}

// Creates a schema at the metastore-default location.
private void createSchema(String name)
{
    onTrino().executeQuery("CREATE SCHEMA " + name);
}

// Creates a schema with an explicit file-system location.
private void createSchema(String name, String location)
{
    onTrino().executeQuery("CREATE SCHEMA " + name + " WITH (location = '" + location + "')");
}

// Appends a random suffix so concurrent test runs do not collide on schema names.
private static String schemaName(String name)
{
    return name + "_" + randomTableSuffix();
}

private static void ensureSchemaDoesNotExist(String schemaName)
{
onHive().executeQuery(format("DROP DATABASE IF EXISTS %s CASCADE", schemaName));
onTrino().executeQuery("DROP SCHEMA test_drop_schema");
assertFalse(hdfsClient.exist(warehouseDirectory + "/test_drop_schema.db"));
}
}