Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public class HiveConfig
private boolean immutablePartitions;
private Optional<InsertExistingPartitionsBehavior> insertExistingPartitionsBehavior = Optional.empty();
private boolean createEmptyBucketFiles;
// This is meant to protect users who are misusing schema locations (by
// putting schemas in locations with extraneous files), so default to false
// to avoid deleting those files if Trino is unable to check.
private boolean deleteSchemaLocationsFallback;
private int maxPartitionsPerWriter = 100;
private int maxOpenSortFiles = 50;
private int writeValidationThreads = 16;
Expand Down Expand Up @@ -522,6 +526,19 @@ public HiveConfig setCreateEmptyBucketFiles(boolean createEmptyBucketFiles)
return this;
}

/**
 * Whether DROP SCHEMA should delete the schema location when Trino cannot
 * determine if the location contains extraneous (non-Trino) files.
 * Defaults to {@code false} to avoid deleting user data on check failure.
 */
public boolean isDeleteSchemaLocationsFallback()
{
return this.deleteSchemaLocationsFallback;
}

// Bound to the hive.delete-schema-locations-fallback catalog property.
// Returns this for the fluent style used by airlift config classes.
@Config("hive.delete-schema-locations-fallback")
@ConfigDescription("Whether schema locations should be deleted when Trino can't determine whether they contain external files.")
public HiveConfig setDeleteSchemaLocationsFallback(boolean deleteSchemaLocationsFallback)
{
this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback;
return this;
}

@Min(1)
public int getMaxPartitionsPerWriter()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ public void createSchema(ConnectorSession session, String schemaName, Map<String
@Override
public void dropSchema(ConnectorSession session, String schemaName)
{
    // The metastore decides whether to also delete the schema's storage
    // location, based on its contents and the configured fallback behavior.
    metastore.dropDatabase(session, schemaName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class HiveMetadataFactory
private final boolean skipTargetCleanupOnRollback;
private final boolean writesToNonManagedTablesEnabled;
private final boolean createsOfNonManagedTablesEnabled;
private final boolean deleteSchemaLocationsFallback;
private final boolean translateHiveViews;
private final boolean hideDeltaLakeTables;
private final long perTransactionCacheMaximumSize;
Expand Down Expand Up @@ -100,6 +101,7 @@ public HiveMetadataFactory(
hiveConfig.isSkipTargetCleanupOnRollback(),
hiveConfig.getWritesToNonManagedTablesEnabled(),
hiveConfig.getCreatesOfNonManagedTablesEnabled(),
hiveConfig.isDeleteSchemaLocationsFallback(),
hiveConfig.isTranslateHiveViews(),
hiveConfig.getPerTransactionMetastoreCacheMaximumSize(),
hiveConfig.getHiveTransactionHeartbeatInterval(),
Expand Down Expand Up @@ -130,6 +132,7 @@ public HiveMetadataFactory(
boolean skipTargetCleanupOnRollback,
boolean writesToNonManagedTablesEnabled,
boolean createsOfNonManagedTablesEnabled,
boolean deleteSchemaLocationsFallback,
boolean translateHiveViews,
long perTransactionCacheMaximumSize,
Optional<Duration> hiveTransactionHeartbeatInterval,
Expand All @@ -152,6 +155,7 @@ public HiveMetadataFactory(
this.skipTargetCleanupOnRollback = skipTargetCleanupOnRollback;
this.writesToNonManagedTablesEnabled = writesToNonManagedTablesEnabled;
this.createsOfNonManagedTablesEnabled = createsOfNonManagedTablesEnabled;
this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback;
this.translateHiveViews = translateHiveViews;
this.hideDeltaLakeTables = hideDeltaLakeTables;
this.perTransactionCacheMaximumSize = perTransactionCacheMaximumSize;
Expand Down Expand Up @@ -197,6 +201,7 @@ public TransactionalMetadata create(boolean autoCommit)
updateExecutor,
skipDeletionForAlter,
skipTargetCleanupOnRollback,
deleteSchemaLocationsFallback,
hiveTransactionHeartbeatInterval,
heartbeatService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ public void createDatabase(HiveIdentity identity, Database database)
delegate.createDatabase(identity, database);
}

public void dropDatabase(HiveIdentity identity, String databaseName)
public void dropDatabase(HiveIdentity identity, String databaseName, boolean deleteData)
{
delegate.dropDatabase(identity, databaseName);
delegate.dropDatabase(identity, databaseName, deleteData);
}

public void renameDatabase(HiveIdentity identity, String databaseName, String newDatabaseName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ default void updatePartitionStatistics(HiveIdentity identity, Table table, Strin

void createDatabase(HiveIdentity identity, Database database);

// deleteData controls whether the database's storage directory is removed
// along with its metadata.
void dropDatabase(HiveIdentity identity, String databaseName, boolean deleteData);

void renameDatabase(HiveIdentity identity, String databaseName, String newDatabaseName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,10 @@ public void createDatabase(HiveIdentity identity, Database database)
}

@Override
public void dropDatabase(HiveIdentity identity, String databaseName)
public void dropDatabase(HiveIdentity identity, String databaseName, boolean deleteData)
{
verifyRecordingMode();
delegate.dropDatabase(identity, databaseName);
delegate.dropDatabase(identity, databaseName, deleteData);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.trino.plugin.hive.security.SqlStandardAccessControlMetadataMetastore;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.TupleDomain;
Expand Down Expand Up @@ -133,6 +134,7 @@ public class SemiTransactionalHiveMetastore
private final Executor updateExecutor;
private final boolean skipDeletionForAlter;
private final boolean skipTargetCleanupOnRollback;
private final boolean deleteSchemaLocationsFallback;
private final ScheduledExecutorService heartbeatExecutor;
private final Optional<Duration> configuredTransactionHeartbeatInterval;

Expand Down Expand Up @@ -169,6 +171,7 @@ public SemiTransactionalHiveMetastore(
Executor updateExecutor,
boolean skipDeletionForAlter,
boolean skipTargetCleanupOnRollback,
boolean deleteSchemaLocationsFallback,
Optional<Duration> hiveTransactionHeartbeatInterval,
ScheduledExecutorService heartbeatService)
{
Expand All @@ -179,6 +182,7 @@ public SemiTransactionalHiveMetastore(
this.updateExecutor = requireNonNull(updateExecutor, "updateExecutor is null");
this.skipDeletionForAlter = skipDeletionForAlter;
this.skipTargetCleanupOnRollback = skipTargetCleanupOnRollback;
this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback;
this.heartbeatExecutor = heartbeatService;
this.configuredTransactionHeartbeatInterval = requireNonNull(hiveTransactionHeartbeatInterval, "hiveTransactionHeartbeatInterval is null");
}
Expand Down Expand Up @@ -365,9 +369,32 @@ public synchronized void createDatabase(HiveIdentity identity, Database database
setExclusive((delegate, hdfsEnvironment) -> delegate.createDatabase(identity, database));
}

public synchronized void dropDatabase(HiveIdentity identity, String schemaName)
public synchronized void dropDatabase(ConnectorSession session, String schemaName)
{
setExclusive((delegate, hdfsEnvironment) -> delegate.dropDatabase(identity, schemaName));
Optional<Path> location = delegate.getDatabase(schemaName)
.orElseThrow(() -> new SchemaNotFoundException(schemaName))
.getLocation()
.map(Path::new);

setExclusive((delegate, hdfsEnvironment) -> {
HiveIdentity identity = new HiveIdentity(session);

// If we see files in the schema location, don't delete it.
// If we see no files, request deletion.
// If we fail to check the schema location, behave according to fallback.
boolean deleteData = location.map(path -> {
HdfsContext context = new HdfsContext(session);
try (FileSystem fs = hdfsEnvironment.getFileSystem(context, path)) {
return !fs.listFiles(path, false).hasNext();
}
catch (IOException | RuntimeException e) {
log.warn(e, "Could not check schema directory '%s'", path);
return deleteSchemaLocationsFallback;
}
}).orElse(deleteSchemaLocationsFallback);

delegate.dropDatabase(identity, schemaName, deleteData);
});
}

public synchronized void renameDatabase(HiveIdentity identity, String source, String target)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public void createDatabase(HiveIdentity identity, Database database)
}

@Override
public void dropDatabase(HiveIdentity identity, String databaseName)
public void dropDatabase(HiveIdentity identity, String databaseName, boolean deleteData)
{
throw new TrinoException(NOT_SUPPORTED, "dropDatabase");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,11 +476,11 @@ public void createDatabase(HiveIdentity identity, Database database)
}

@Override
public void dropDatabase(HiveIdentity identity, String databaseName)
public void dropDatabase(HiveIdentity identity, String databaseName, boolean deleteData)
{
identity = updateIdentity(identity);
try {
delegate.dropDatabase(identity, databaseName);
delegate.dropDatabase(identity, databaseName, deleteData);
}
finally {
invalidateDatabase(databaseName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public class FileHiveMetastore
{
private static final String PUBLIC_ROLE_NAME = "public";
private static final String ADMIN_ROLE_NAME = "admin";
// Suffix (not full name) because database schema files are stored as a
// hidden sibling of the database directory: ".<dbName>.trinoSchema".
private static final String TRINO_SCHEMA_FILE_NAME_SUFFIX = ".trinoSchema";
private static final String TRINO_PERMISSIONS_DIRECTORY_NAME = ".trinoPermissions";
public static final String ROLES_FILE_NAME = ".roles";
public static final String ROLE_GRANTS_FILE_NAME = ".roleGrants";
Expand Down Expand Up @@ -204,10 +204,16 @@ public synchronized void createDatabase(HiveIdentity identity, Database database

Path databaseMetadataDirectory = getDatabaseMetadataDirectory(database.getDatabaseName());
writeSchemaFile(DATABASE, databaseMetadataDirectory, databaseCodec, new DatabaseMetadata(currentVersion, database), false);
try {
metadataFileSystem.mkdirs(databaseMetadataDirectory);
}
catch (IOException e) {
throw new TrinoException(HIVE_METASTORE_ERROR, "Could not write database", e);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add the database information in the exception?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would go in #10146, but this was just copied from the other "could not write" errors in the class, which also don't give more detailed messages.

}
}

@Override
public synchronized void dropDatabase(HiveIdentity identity, String databaseName)
public synchronized void dropDatabase(HiveIdentity identity, String databaseName, boolean deleteData)
{
requireNonNull(databaseName, "databaseName is null");

Expand All @@ -216,7 +222,13 @@ public synchronized void dropDatabase(HiveIdentity identity, String databaseName
throw new TrinoException(HIVE_METASTORE_ERROR, "Database " + databaseName + " is not empty");
}

deleteMetadataDirectory(getDatabaseMetadataDirectory(databaseName));
// Either delete the entire database directory or just its metadata files
if (deleteData) {
deleteDirectoryAndSchema(DATABASE, getDatabaseMetadataDirectory(databaseName));
}
else {
deleteSchemaFile(DATABASE, getDatabaseMetadataDirectory(databaseName));
}
}

@Override
Expand Down Expand Up @@ -280,7 +292,7 @@ private void verifyDatabaseNotExists(String databaseName)
@Override
public synchronized List<String> getAllDatabases()
{
List<String> databases = getChildSchemaDirectories(catalogDirectory).stream()
List<String> databases = getChildSchemaDirectories(DATABASE, catalogDirectory).stream()
.map(Path::getName)
.collect(toList());
return ImmutableList.copyOf(databases);
Expand Down Expand Up @@ -492,7 +504,7 @@ private List<String> listAllTables(String databaseName)
}

Path databaseMetadataDirectory = getDatabaseMetadataDirectory(databaseName);
List<String> tables = getChildSchemaDirectories(databaseMetadataDirectory).stream()
List<String> tables = getChildSchemaDirectories(TABLE, databaseMetadataDirectory).stream()
.map(Path::getName)
.collect(toImmutableList());
return tables;
Expand Down Expand Up @@ -522,10 +534,10 @@ public synchronized void dropTable(HiveIdentity identity, String databaseName, S

// It is safe to delete the whole meta directory for external tables and views
if (!table.getTableType().equals(MANAGED_TABLE.name()) || deleteData) {
deleteMetadataDirectory(tableMetadataDirectory);
deleteDirectoryAndSchema(TABLE, tableMetadataDirectory);
}
else {
// in this case we only wan to delete the metadata of a managed table
// in this case we only want to delete the metadata of a managed table
deleteSchemaFile(TABLE, tableMetadataDirectory);
deleteTablePrivileges(table);
}
Expand Down Expand Up @@ -576,7 +588,7 @@ public synchronized void renameTable(HiveIdentity identity, String databaseName,
throw new TrinoException(HIVE_METASTORE_ERROR, "Could not create new table directory");
}
// Iceberg metadata references files in old path, so these cannot be moved. Moving table description (metadata from metastore perspective) only.
if (!metadataFileSystem.rename(getSchemaPath(oldPath), getSchemaPath(newPath))) {
if (!metadataFileSystem.rename(getSchemaPath(TABLE, oldPath), getSchemaPath(TABLE, newPath))) {
throw new TrinoException(HIVE_METASTORE_ERROR, "Could not rename table schema file");
}
// TODO drop data files when table is being dropped
Expand Down Expand Up @@ -734,7 +746,7 @@ public synchronized void addPartitions(HiveIdentity identity, String databaseNam
Partition partition = partitionWithStatistics.getPartition();
verifiedPartition(table, partition);
Path partitionMetadataDirectory = getPartitionMetadataDirectory(table, partition.getValues());
Path schemaPath = getSchemaPath(partitionMetadataDirectory);
Path schemaPath = getSchemaPath(PARTITION, partitionMetadataDirectory);
if (metadataFileSystem.exists(schemaPath)) {
throw new TrinoException(HIVE_METASTORE_ERROR, "Partition already exists");
}
Expand Down Expand Up @@ -814,7 +826,7 @@ public synchronized void dropPartition(HiveIdentity identity, String databaseNam

Path partitionMetadataDirectory = getPartitionMetadataDirectory(table, partitionValues);
if (deleteData) {
deleteMetadataDirectory(partitionMetadataDirectory);
deleteDirectoryAndSchema(PARTITION, partitionMetadataDirectory);
}
else {
deleteSchemaFile(PARTITION, partitionMetadataDirectory);
Expand Down Expand Up @@ -1015,7 +1027,7 @@ private synchronized Optional<List<String>> getAllPartitionNames(HiveIdentity id
private boolean isValidPartition(Table table, String partitionName)
{
try {
return metadataFileSystem.exists(getSchemaPath(getPartitionMetadataDirectory(table, partitionName)));
return metadataFileSystem.exists(getSchemaPath(PARTITION, getPartitionMetadataDirectory(table, partitionName)));
}
catch (IOException e) {
return false;
Expand Down Expand Up @@ -1185,7 +1197,7 @@ private synchronized void deleteTablePrivileges(Table table)
}
}

private List<Path> getChildSchemaDirectories(Path metadataDirectory)
private List<Path> getChildSchemaDirectories(SchemaType type, Path metadataDirectory)
{
try {
if (!metadataFileSystem.isDirectory(metadataDirectory)) {
Expand All @@ -1201,7 +1213,7 @@ private List<Path> getChildSchemaDirectories(Path metadataDirectory)
if (childPath.getName().startsWith(".")) {
continue;
}
if (metadataFileSystem.isFile(getSchemaPath(childPath))) {
if (metadataFileSystem.isFile(getSchemaPath(type, childPath))) {
childSchemaDirectories.add(childPath);
}
}
Expand Down Expand Up @@ -1233,15 +1245,19 @@ private Set<HivePrivilegeInfo> readAllPermissions(Path permissionsDirectory)
}
}

private void deleteMetadataDirectory(Path metadataDirectory)
private void deleteDirectoryAndSchema(SchemaType type, Path metadataDirectory)
{
try {
Path schemaPath = getSchemaPath(metadataDirectory);
Path schemaPath = getSchemaPath(type, metadataDirectory);
if (!metadataFileSystem.isFile(schemaPath)) {
// if there is no schema file, assume this is not a database, partition or table
return;
}

// Delete the schema file first, so it can never exist after the directory is deleted.
// (For cases when the schema file isn't in the metadata directory.)
deleteSchemaFile(type, metadataDirectory);

if (!metadataFileSystem.delete(metadataDirectory, true)) {
throw new TrinoException(HIVE_METASTORE_ERROR, "Could not delete metadata directory");
}
Expand Down Expand Up @@ -1273,7 +1289,7 @@ private void checkVersion(Optional<String> writerVersion)

// Reads and decodes the schema file for the given entity type; empty if the
// file does not exist. The type determines where the schema file lives
// relative to the metadata directory.
private <T> Optional<T> readSchemaFile(SchemaType type, Path metadataDirectory, JsonCodec<T> codec)
{
    return readFile(type + " schema", getSchemaPath(type, metadataDirectory), codec);
}

private <T> Optional<T> readFile(String type, Path path, JsonCodec<T> codec)
Expand All @@ -1295,7 +1311,7 @@ private <T> Optional<T> readFile(String type, Path path, JsonCodec<T> codec)

// Encodes and writes the schema file for the given entity type. The type
// determines where the schema file lives relative to the directory.
private <T> void writeSchemaFile(SchemaType type, Path directory, JsonCodec<T> codec, T value, boolean overwrite)
{
    writeFile(type + " schema", getSchemaPath(type, directory), codec, value, overwrite);
}

private <T> void writeFile(String type, Path path, JsonCodec<T> codec, T value, boolean overwrite)
Expand Down Expand Up @@ -1324,7 +1340,7 @@ private <T> void writeFile(String type, Path path, JsonCodec<T> codec, T value,
private void deleteSchemaFile(SchemaType type, Path metadataDirectory)
{
try {
if (!metadataFileSystem.delete(getSchemaPath(metadataDirectory), false)) {
if (!metadataFileSystem.delete(getSchemaPath(type, metadataDirectory), false)) {
throw new TrinoException(HIVE_METASTORE_ERROR, "Could not delete " + type + " schema");
}
}
Expand Down Expand Up @@ -1380,9 +1396,14 @@ private Path getRoleGrantsFile()
return new Path(catalogDirectory, ROLE_GRANTS_FILE_NAME);
}

private static Path getSchemaPath(Path metadataDirectory)
private static Path getSchemaPath(SchemaType type, Path metadataDirectory)
{
return new Path(metadataDirectory, TRINO_SCHEMA_FILE_NAME);
if (type == DATABASE) {
return new Path(
requireNonNull(metadataDirectory.getParent(), "Can't use root directory as database path"),
format(".%s%s", metadataDirectory.getName(), TRINO_SCHEMA_FILE_NAME_SUFFIX));
}
return new Path(metadataDirectory, TRINO_SCHEMA_FILE_NAME_SUFFIX);
}

private static boolean isChildDirectory(Path parentDirectory, Path childDirectory)
Expand Down
Loading