diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveLocationService.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveLocationService.java index 6da35e45ba4f..a874b98f24fa 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveLocationService.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveLocationService.java @@ -14,7 +14,6 @@ package io.trino.plugin.hive; import com.google.inject.Inject; -import io.trino.filesystem.Location; import io.trino.hdfs.HdfsContext; import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.hive.LocationHandle.WriteMode; @@ -57,32 +56,32 @@ public HiveLocationService(HdfsEnvironment hdfsEnvironment, HiveConfig hiveConfi } @Override - public Location forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName) + public Path forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName) { HdfsContext context = new HdfsContext(session); - Location targetPath = getTableDefaultLocation(context, metastore, hdfsEnvironment, schemaName, tableName); + Path targetPath = getTableDefaultLocation(context, metastore, hdfsEnvironment, schemaName, tableName); // verify the target directory for table - if (pathExists(context, hdfsEnvironment, new Path(targetPath.toString()))) { + if (pathExists(context, hdfsEnvironment, targetPath)) { throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, format("Target directory for table '%s.%s' already exists: %s", schemaName, tableName, targetPath)); } return targetPath; } @Override - public LocationHandle forNewTableAsSelect(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, Optional externalLocation) + public LocationHandle forNewTableAsSelect(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, Optional externalLocation) { HdfsContext context = new HdfsContext(session); - Location targetPath = externalLocation.orElseGet(() -> getTableDefaultLocation(context, metastore, hdfsEnvironment, schemaName, tableName)); + Path targetPath = externalLocation.orElseGet(() -> getTableDefaultLocation(context, metastore, hdfsEnvironment, schemaName, tableName)); // verify the target directory for the table - if (pathExists(context, hdfsEnvironment, new Path(targetPath.toString()))) { + if (pathExists(context, hdfsEnvironment, targetPath)) { throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, format("Target directory for table '%s.%s' already exists: %s", schemaName, tableName, targetPath)); } // TODO detect when existing table's location is a on a different file system than the temporary directory if (shouldUseTemporaryDirectory(context, new Path(targetPath.toString()), externalLocation.isPresent())) { - Location writePath = createTemporaryPath(context, hdfsEnvironment, new Path(targetPath.toString()), temporaryStagingDirectoryPath); + Path writePath = createTemporaryPath(context, hdfsEnvironment, new Path(targetPath.toString()), temporaryStagingDirectoryPath); return new LocationHandle(targetPath, writePath, STAGE_AND_MOVE_TO_TARGET_DIRECTORY); } return new LocationHandle(targetPath, targetPath, DIRECT_TO_TARGET_NEW_DIRECTORY); @@ -92,10 +91,10 @@ public LocationHandle forNewTableAsSelect(SemiTransactionalHiveMetastore metasto public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table) { HdfsContext context = new HdfsContext(session); - Location targetPath = Location.of(table.getStorage().getLocation()); + Path targetPath = new Path(table.getStorage().getLocation()); if (shouldUseTemporaryDirectory(context, new Path(targetPath.toString()), false) && !isTransactionalTable(table.getParameters())) { - Location writePath = createTemporaryPath(context, hdfsEnvironment, new Path(targetPath.toString()), temporaryStagingDirectoryPath); + Path writePath = createTemporaryPath(context, hdfsEnvironment, targetPath, temporaryStagingDirectoryPath); return new LocationHandle(targetPath, writePath, STAGE_AND_MOVE_TO_TARGET_DIRECTORY); } return new LocationHandle(targetPath, targetPath, DIRECT_TO_TARGET_EXISTING_DIRECTORY); @@ -105,7 +104,7 @@ public LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore, public LocationHandle forOptimize(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table) { // For OPTIMIZE write result files directly to table directory; that is needed by the commit logic in HiveMetadata#finishTableExecute - Location targetPath = Location.of(table.getStorage().getLocation()); + Path targetPath = new Path(table.getStorage().getLocation()); return new LocationHandle(targetPath, targetPath, DIRECT_TO_TARGET_EXISTING_DIRECTORY); } @@ -141,23 +140,27 @@ public WriteInfo getPartitionWriteInfo(LocationHandle locationHandle, Optional

locationHandle.getWritePath().appendPath(partitionName); - case DIRECT_TO_TARGET_EXISTING_DIRECTORY -> targetPath; - case DIRECT_TO_TARGET_NEW_DIRECTORY -> throw new UnsupportedOperationException(format("inserting into existing partition is not supported for %s", writeMode)); - }; + switch (writeMode) { + case STAGE_AND_MOVE_TO_TARGET_DIRECTORY: + return new Path(locationHandle.getWritePath(), partitionName); + case DIRECT_TO_TARGET_EXISTING_DIRECTORY: + return targetPath; + case DIRECT_TO_TARGET_NEW_DIRECTORY: + throw new UnsupportedOperationException(format("inserting into existing partition is not supported for %s", writeMode)); + } + throw new UnsupportedOperationException("Unexpected write mode: " + writeMode); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index 564f128dd269..9cc9d230e1fc 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -996,7 +996,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe .collect(toImmutableList()); checkPartitionTypesSupported(partitionColumns); - Optional targetPath; + Optional targetPath; boolean external; String externalLocation = getExternalLocation(tableMetadata.getProperties()); if (externalLocation != null) { @@ -1005,8 +1005,8 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe } external = true; - targetPath = Optional.of(getValidatedExternalLocation(externalLocation)); - checkExternalPath(new HdfsContext(session), new Path(targetPath.get().toString())); + targetPath = Optional.of(getExternalLocationAsPath(externalLocation)); + checkExternalPath(new HdfsContext(session), targetPath.get()); } else { external = false; @@ -1273,10 +1273,11 @@ private static String validateAvroSchemaLiteral(String avroSchemaLiteral) } } - private static Location getValidatedExternalLocation(String location) + private static Path getExternalLocationAsPath(String location) { try { - return Location.of(location); + Location.of(location); // Calling just for validation + return new Path(location); } catch (IllegalArgumentException e) { throw new TrinoException(INVALID_TABLE_PROPERTY, "External location is not a valid file system URI: " + location, e); @@ -1315,7 +1316,7 @@ private static Table buildTableObject( List partitionedBy, Optional bucketProperty, Map additionalTableParameters, - Optional targetPath, + Optional targetPath, boolean external, String prestoVersion, boolean usingSystemSecurity) @@ -1606,8 +1607,8 @@ private static List canonicalizePartitionValues(String partitionName, Li @Override public HiveOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional layout, RetryMode retryMode) { - Optional externalLocation = Optional.ofNullable(getExternalLocation(tableMetadata.getProperties())) - .map(HiveMetadata::getValidatedExternalLocation); + Optional externalLocation = Optional.ofNullable(getExternalLocation(tableMetadata.getProperties())) + .map(HiveMetadata::getExternalLocationAsPath); if (!createsOfNonManagedTablesEnabled && externalLocation.isPresent()) { throw new TrinoException(NOT_SUPPORTED, "Creating non-managed Hive tables is disabled"); } @@ -1694,7 +1695,7 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto retryMode != NO_RETRIES); WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle); - metastore.declareIntentionToWrite(session, writeInfo.writeMode(), writeInfo.writePath(), schemaTableName); + metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), schemaTableName); return result; } @@ -1720,7 +1721,7 @@ public Optional finishCreateTable(ConnectorSession sess handle.getPartitionedBy(), handle.getBucketProperty(), handle.getAdditionalTableParameters(), - Optional.of(writeInfo.targetPath()), + Optional.of(writeInfo.getTargetPath()), handle.isExternal(), prestoVersion, accessControlMetadata.isUsingSystemSecurity()); @@ -1765,7 +1766,6 @@ public Optional finishCreateTable(ConnectorSession sess tableStatistics = new PartitionStatistics(createEmptyStatistics(), ImmutableMap.of()); } - Optional writePath = Optional.of(new Path(writeInfo.writePath().toString())); if (handle.getPartitionedBy().isEmpty()) { List fileNames; if (partitionUpdates.isEmpty()) { @@ -1775,10 +1775,10 @@ public Optional finishCreateTable(ConnectorSession sess else { fileNames = getOnlyElement(partitionUpdates).getFileNames(); } - metastore.createTable(session, table, principalPrivileges, writePath, Optional.of(fileNames), false, tableStatistics, handle.isRetriesEnabled()); + metastore.createTable(session, table, principalPrivileges, Optional.of(writeInfo.getWritePath()), Optional.of(fileNames), false, tableStatistics, handle.isRetriesEnabled()); } else { - metastore.createTable(session, table, principalPrivileges, writePath, Optional.empty(), false, tableStatistics, false); + metastore.createTable(session, table, principalPrivileges, Optional.of(writeInfo.getWritePath()), Optional.empty(), false, tableStatistics, false); } if (!handle.getPartitionedBy().isEmpty()) { @@ -2001,7 +2001,7 @@ public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle merg LocationHandle locationHandle = locationService.forExistingTable(metastore, session, table); WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle); - metastore.finishMerge(session, table.getDatabaseName(), table.getTableName(), writeInfo.writePath(), partitionMergeResults, partitions); + metastore.finishMerge(session, table.getDatabaseName(), table.getTableName(), writeInfo.getWritePath(), partitionMergeResults, partitions); } @Override @@ -2072,7 +2072,7 @@ else if (isTransactional) { WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle); if (getInsertExistingPartitionsBehavior(session) == InsertExistingPartitionsBehavior.OVERWRITE - && writeInfo.writeMode() == DIRECT_TO_TARGET_EXISTING_DIRECTORY) { + && writeInfo.getWriteMode() == DIRECT_TO_TARGET_EXISTING_DIRECTORY) { if (isTransactional) { throw new TrinoException(NOT_SUPPORTED, "Overwriting existing partition in transactional tables doesn't support DIRECT_TO_TARGET_EXISTING_DIRECTORY write mode"); } @@ -2082,7 +2082,7 @@ else if (isTransactional) { throw new TrinoException(NOT_SUPPORTED, "Overwriting existing partition in non auto commit context doesn't support DIRECT_TO_TARGET_EXISTING_DIRECTORY write mode"); } } - metastore.declareIntentionToWrite(session, writeInfo.writeMode(), writeInfo.writePath(), tableName); + metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), tableName); return result; } @@ -2453,7 +2453,7 @@ private BeginTableExecuteResult( hiveExecuteHandle diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java index 0402d29933f4..a3a1527f062b 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java @@ -256,17 +256,17 @@ public HiveWriterFactory( this.dataColumns = dataColumns.build(); this.isCreateTransactionalTable = isCreateTable && transaction.isTransactional(); - Location writePath; + Path writePath; if (isCreateTable) { this.table = null; WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle); - checkArgument(writeInfo.writeMode() != DIRECT_TO_TARGET_EXISTING_DIRECTORY, "CREATE TABLE write mode cannot be DIRECT_TO_TARGET_EXISTING_DIRECTORY"); - writePath = writeInfo.writePath(); + checkArgument(writeInfo.getWriteMode() != DIRECT_TO_TARGET_EXISTING_DIRECTORY, "CREATE TABLE write mode cannot be DIRECT_TO_TARGET_EXISTING_DIRECTORY"); + writePath = writeInfo.getWritePath(); } else { this.table = pageSinkMetadataProvider.getTable() .orElseThrow(() -> new TrinoException(HIVE_INVALID_METADATA, format("Table '%s.%s' was dropped during insert", schemaName, tableName))); - writePath = locationService.getQueryWriteInfo(locationHandle).writePath(); + writePath = locationService.getQueryWriteInfo(locationHandle).getWritePath(); } this.bucketCount = requireNonNull(bucketCount, "bucketCount is null"); @@ -289,12 +289,12 @@ public HiveWriterFactory( .filter(entry -> entry.getValue() != null) .collect(toImmutableMap(Entry::getKey, entry -> entry.getValue().toString())); - Configuration conf = hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(writePath.toString())); + Configuration conf = hdfsEnvironment.getConfiguration(new HdfsContext(session), writePath); this.conf = toJobConf(conf); // make sure the FileSystem is created with the correct Configuration object try { - hdfsEnvironment.getFileSystem(session.getIdentity(), new Path(writePath.toString()), conf); + hdfsEnvironment.getFileSystem(session.getIdentity(), writePath, conf); } catch (IOException e) { throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed getting FileSystem: " + writePath, e); @@ -357,10 +357,10 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt // a new partition in a new partitioned table writeInfo = locationService.getPartitionWriteInfo(locationHandle, partition, partitionName.get()); - if (!writeInfo.writeMode().isWritePathSameAsTargetPath()) { + if (!writeInfo.getWriteMode().isWritePathSameAsTargetPath()) { // When target path is different from write path, // verify that the target directory for the partition does not already exist - Location writeInfoTargetPath = writeInfo.targetPath(); + Location writeInfoTargetPath = Location.of(writeInfo.getTargetPath().toString()); try { if (fileSystem.directoryExists(writeInfoTargetPath).orElse(false)) { throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, format( @@ -368,7 +368,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt partitionName, schemaName, tableName, - writeInfo.targetPath())); + writeInfo.getTargetPath())); } } catch (IOException e) { @@ -482,7 +482,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt int bucketToUse = bucketNumber.isEmpty() ? 0 : bucketNumber.getAsInt(); - Location path = writeInfo.writePath(); + Location path = Location.of(writeInfo.getWritePath().toString()); if (transaction.isAcidTransactionRunning() && transaction.getOperation() != CREATE_TABLE) { String subdir = computeAcidSubdir(transaction); String nameFormat = table != null && isInsertOnlyTable(table.getParameters()) ? "%05d_0" : "bucket_%05d"; @@ -632,8 +632,8 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt partitionName, updateMode, path.fileName(), - writeInfo.writePath().toString(), - writeInfo.targetPath().toString(), + writeInfo.getWritePath().toString(), + writeInfo.getTargetPath().toString(), onCommit, hiveWriterStats); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationHandle.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationHandle.java index 95b51def1c06..22a6e072dab4 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationHandle.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationHandle.java @@ -15,18 +15,21 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import io.trino.filesystem.Location; +import org.apache.hadoop.fs.Path; import static java.lang.String.format; import static java.util.Objects.requireNonNull; public class LocationHandle { - private final Location targetPath; - private final Location writePath; + private final Path targetPath; + private final Path writePath; private final WriteMode writeMode; - public LocationHandle(Location targetPath, Location writePath, WriteMode writeMode) + public LocationHandle( + Path targetPath, + Path writePath, + WriteMode writeMode) { if (writeMode.isWritePathSameAsTargetPath() && !targetPath.equals(writePath)) { throw new IllegalArgumentException(format("targetPath is expected to be same as writePath for writeMode %s", writeMode)); @@ -43,19 +46,19 @@ public LocationHandle( @JsonProperty("writeMode") WriteMode writeMode) { this( - Location.of(requireNonNull(targetPath, "targetPath is null")), - Location.of(requireNonNull(writePath, "writePath is null")), + new Path(requireNonNull(targetPath, "targetPath is null")), + new Path(requireNonNull(writePath, "writePath is null")), writeMode); } // This method should only be called by LocationService - Location getTargetPath() + Path getTargetPath() { return targetPath; } // This method should only be called by LocationService - Location getWritePath() + Path getWritePath() { return writePath; } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationService.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationService.java index f57759d353ad..7e3e1aa20a5d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationService.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/LocationService.java @@ -13,12 +13,11 @@ */ package io.trino.plugin.hive; -import io.trino.filesystem.Location; -import io.trino.plugin.hive.LocationHandle.WriteMode; import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore; import io.trino.plugin.hive.metastore.Table; import io.trino.spi.connector.ConnectorSession; +import org.apache.hadoop.fs.Path; import java.util.Optional; @@ -26,9 +25,9 @@ public interface LocationService { - Location forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName); + Path forNewTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName); - LocationHandle forNewTableAsSelect(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, Optional externalLocation); + LocationHandle forNewTableAsSelect(SemiTransactionalHiveMetastore metastore, ConnectorSession session, String schemaName, String tableName, Optional externalLocation); LocationHandle forExistingTable(SemiTransactionalHiveMetastore metastore, ConnectorSession session, Table table); @@ -48,20 +47,23 @@ public interface LocationService */ WriteInfo getPartitionWriteInfo(LocationHandle locationHandle, Optional partition, String partitionName); - record WriteInfo(Location targetPath, Location writePath, WriteMode writeMode) + class WriteInfo { - public WriteInfo + private final Path targetPath; + private final Path writePath; + private final LocationHandle.WriteMode writeMode; + + public WriteInfo(Path targetPath, Path writePath, LocationHandle.WriteMode writeMode) { - requireNonNull(targetPath, "targetPath is null"); - requireNonNull(writePath, "writePath is null"); - requireNonNull(writeMode, "writeMode is null"); + this.targetPath = requireNonNull(targetPath, "targetPath is null"); + this.writePath = requireNonNull(writePath, "writePath is null"); + this.writeMode = requireNonNull(writeMode, "writeMode is null"); } /** * Target path for the partition, unpartitioned table, or the query. */ - @Override - public Location targetPath() + public Path getTargetPath() { return targetPath; } @@ -71,10 +73,14 @@ public Location targetPath() *

* It may be the same as {@code targetPath}. */ - @Override - public Location writePath() + public Path getWritePath() { return writePath; } + + public LocationHandle.WriteMode getWriteMode() + { + return writeMode; + } } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java index 3e453c80d738..8990eed5fa9b 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java @@ -24,7 +24,6 @@ import com.google.errorprone.annotations.FormatMethod; import io.airlift.log.Logger; import io.airlift.units.Duration; -import io.trino.filesystem.Location; import io.trino.hdfs.HdfsContext; import io.trino.hdfs.HdfsEnvironment; import io.trino.hive.thrift.metastore.DataOperationType; @@ -766,7 +765,7 @@ public synchronized void finishMerge( ConnectorSession session, String databaseName, String tableName, - Location currentLocation, + Path currentLocation, List partitionUpdateAndMergeResults, List partitions) { @@ -792,7 +791,7 @@ public synchronized void finishMerge( new TableAndMergeResults( table, Optional.of(principalPrivileges), - Optional.of(new Path(currentLocation.toString())), + Optional.of(currentLocation), partitionUpdateAndMergeResults, partitions), hdfsContext, @@ -1243,7 +1242,7 @@ public synchronized void revokeTablePrivileges(String databaseName, String table setExclusive((delegate, hdfsEnvironment) -> delegate.revokeTablePrivileges(databaseName, tableName, getRequiredTableOwner(databaseName, tableName), grantee, grantor, privileges, grantOption)); } - public synchronized String declareIntentionToWrite(ConnectorSession session, WriteMode writeMode, Location stagingPathRoot, SchemaTableName schemaTableName) + public synchronized String declareIntentionToWrite(ConnectorSession session, WriteMode writeMode, Path stagingPathRoot, SchemaTableName schemaTableName) { setShared(); if (writeMode == WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY) { @@ -1256,7 +1255,7 @@ public synchronized String declareIntentionToWrite(ConnectorSession session, Wri String queryId = session.getQueryId(); String declarationId = queryId + "_" + declaredIntentionsToWriteCounter; declaredIntentionsToWriteCounter++; - declaredIntentionsToWrite.add(new DeclaredIntentionToWrite(declarationId, writeMode, hdfsContext, queryId, new Path(stagingPathRoot.toString()), schemaTableName)); + declaredIntentionsToWrite.add(new DeclaredIntentionToWrite(declarationId, writeMode, hdfsContext, queryId, stagingPathRoot, schemaTableName)); return declarationId; } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/CreateEmptyPartitionProcedure.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/CreateEmptyPartitionProcedure.java index 6a1096222001..a7fbc1481102 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/CreateEmptyPartitionProcedure.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/CreateEmptyPartitionProcedure.java @@ -137,8 +137,8 @@ private void doCreateEmptyPartition(ConnectorSession session, ConnectorAccessCon new PartitionUpdate( partitionName, UpdateMode.NEW, - writeInfo.writePath().toString(), - writeInfo.targetPath().toString(), + writeInfo.getWritePath(), + writeInfo.getTargetPath(), ImmutableList.of(), 0, 0, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveWriteUtils.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveWriteUtils.java index b6c061ea4775..dd5473ddeb80 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveWriteUtils.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveWriteUtils.java @@ -430,7 +430,7 @@ private static void checkWritable( } } - public static Location getTableDefaultLocation(HdfsContext context, SemiTransactionalHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, String schemaName, String tableName) + public static Path getTableDefaultLocation(HdfsContext context, SemiTransactionalHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, String schemaName, String tableName) { Database database = metastore.getDatabase(schemaName) .orElseThrow(() -> new SchemaNotFoundException(schemaName)); @@ -438,7 +438,7 @@ public static Location getTableDefaultLocation(HdfsContext context, SemiTransact return getTableDefaultLocation(database, context, hdfsEnvironment, schemaName, tableName); } - public static Location getTableDefaultLocation(Database database, HdfsContext context, HdfsEnvironment hdfsEnvironment, String schemaName, String tableName) + public static Path getTableDefaultLocation(Database database, HdfsContext context, HdfsEnvironment hdfsEnvironment, String schemaName, String tableName) { String location = database.getLocation() .orElseThrow(() -> new TrinoException(HIVE_DATABASE_LOCATION_ERROR, format("Database '%s' location is not set", schemaName))); @@ -452,8 +452,9 @@ public static Location getTableDefaultLocation(Database database, HdfsContext co throw new TrinoException(HIVE_DATABASE_LOCATION_ERROR, format("Database '%s' location is not a directory: %s", schemaName, databasePath)); } } + Location.of(databasePath.toString()); // Calling just for validation - return Location.of(location).appendPath(escapeTableName(tableName)); + return new Path(databasePath, escapeTableName(tableName)); } public static boolean pathExists(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path path) @@ -516,7 +517,7 @@ public static boolean isFileCreatedByQuery(String fileName, String queryId) return fileName.startsWith(queryId) || fileName.endsWith(queryId); } - public static Location createTemporaryPath(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path targetPath, String temporaryStagingDirectoryPath) + public static Path createTemporaryPath(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path targetPath, String temporaryStagingDirectoryPath) { // use a per-user temporary directory to avoid permission problems String temporaryPrefix = temporaryStagingDirectoryPath.replace("${USER}", context.getIdentity().getUser()); @@ -536,7 +537,7 @@ public static Location createTemporaryPath(HdfsContext context, HdfsEnvironment setDirectoryOwner(context, hdfsEnvironment, temporaryPath, targetPath); } - return Location.of(temporaryPath.toString()); + return temporaryPath; } private static void setDirectoryOwner(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path path, Path targetPath) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java index 06cb66a91c2f..7de08b130503 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java @@ -2682,7 +2682,7 @@ public void testTableCreationRollback() { SchemaTableName temporaryCreateRollbackTable = temporaryTable("create_rollback"); try { - Location stagingPathRoot; + Path stagingPathRoot; try (Transaction transaction = newTransaction()) { ConnectorSession session = newSession(); ConnectorMetadata metadata = transaction.getMetadata(); @@ -2731,7 +2731,7 @@ public void testTableCreationIgnoreExisting() String schemaName = schemaTableName.getSchemaName(); String tableName = schemaTableName.getTableName(); PrincipalPrivileges privileges = testingPrincipalPrivilege(session); - Location targetPath; + Path targetPath; try { try (Transaction transaction = newTransaction()) { LocationService locationService = getLocationService(); @@ -2746,7 +2746,7 @@ public void testTableCreationIgnoreExisting() // try creating it again from another transaction with ignoreExisting=false try (Transaction transaction = newTransaction()) { - Table table = createSimpleTable(schemaTableName, columns, session, targetPath.appendSuffix("_2"), "q2"); + Table table = createSimpleTable(schemaTableName, columns, session, targetPath.suffix("_2"), "q2"); transaction.getMetastore() .createTable(session, table, privileges, Optional.empty(), Optional.empty(), false, ZERO_TABLE_STATISTICS, false); transaction.commit(); @@ -2758,7 +2758,7 @@ public void testTableCreationIgnoreExisting() // try creating it again from another transaction with ignoreExisting=true try (Transaction transaction = newTransaction()) { - Table table = createSimpleTable(schemaTableName, columns, session, targetPath.appendSuffix("_3"), "q3"); + Table table = createSimpleTable(schemaTableName, columns, session, targetPath.suffix("_3"), "q3"); transaction.getMetastore() .createTable(session, table, privileges, Optional.empty(), Optional.empty(), true, ZERO_TABLE_STATISTICS, false); transaction.commit(); @@ -2767,7 +2767,7 @@ public void testTableCreationIgnoreExisting() // at this point the table should exist, now try creating the table again with a different table definition columns = ImmutableList.of(new Column("new_column", HiveType.valueOf("string"), Optional.empty())); try (Transaction transaction = newTransaction()) { - Table table = createSimpleTable(schemaTableName, columns, session, targetPath.appendSuffix("_4"), "q4"); + Table table = createSimpleTable(schemaTableName, columns, session, targetPath.suffix("_4"), "q4"); transaction.getMetastore() .createTable(session, table, privileges, Optional.empty(), Optional.empty(), true, ZERO_TABLE_STATISTICS, false); transaction.commit(); @@ -2783,7 +2783,7 @@ public void testTableCreationIgnoreExisting() } } - private static Table createSimpleTable(SchemaTableName schemaTableName, List columns, ConnectorSession session, Location targetPath, String queryId) + private static Table createSimpleTable(SchemaTableName schemaTableName, List columns, ConnectorSession session, Path targetPath, String queryId) { String tableOwner = session.getUser(); String schemaName = schemaTableName.getSchemaName(); @@ -2871,9 +2871,9 @@ private void doTestBucketSortedTables(SchemaTableName table) HdfsContext context = new HdfsContext(session); HiveConfig config = getHiveConfig(); // verify we have enough temporary files per bucket to require multiple passes - Location stagingPathRoot; + Path stagingPathRoot; if (config.isTemporaryStagingDirectoryEnabled()) { - stagingPathRoot = Location.of(config.getTemporaryStagingDirectoryPath() + stagingPathRoot = new Path(config.getTemporaryStagingDirectoryPath() .replace("${USER}", context.getIdentity().getUser())); } else { @@ -3092,11 +3092,8 @@ public void testCreateEmptyTableShouldNotCreateStagingDirectory() String tableOwner = session.getUser(); String schemaName = temporaryCreateEmptyTable.getSchemaName(); String tableName = temporaryCreateEmptyTable.getTableName(); - HiveConfig hiveConfig = getHiveConfig() - .setTemporaryStagingDirectoryPath(temporaryStagingPrefix) - .setTemporaryStagingDirectoryEnabled(true); - LocationService locationService = new HiveLocationService(hdfsEnvironment, hiveConfig); - Location targetPath = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName); + LocationService locationService = getLocationService(); + Path targetPath = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName); Table.Builder tableBuilder = Table.builder() .setDatabaseName(schemaName) .setTableName(tableName) @@ -3121,7 +3118,7 @@ public void testCreateEmptyTableShouldNotCreateStagingDirectory() transaction.commit(); HdfsContext context = new HdfsContext(session); - Path temporaryRoot = new Path(targetPath.toString(), temporaryStagingPrefix); + Path temporaryRoot = new Path(targetPath, temporaryStagingPrefix); FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, temporaryRoot); assertFalse(fileSystem.exists(temporaryRoot), format("Temporary staging directory %s is created.", temporaryRoot)); } @@ -3483,7 +3480,7 @@ public void testIllegalStorageFormatDuringTableScan() String tableOwner = session.getUser(); String schemaName = schemaTableName.getSchemaName(); String tableName = schemaTableName.getTableName(); - Location targetPath = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName); + Path targetPath = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName); //create table whose storage format is null Table.Builder tableBuilder = Table.builder() .setDatabaseName(schemaName) @@ -3714,7 +3711,7 @@ protected String partitionTargetPath(SchemaTableName schemaTableName, String par LocationService locationService = getLocationService(); Table table = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName()).get(); LocationHandle handle = locationService.forExistingTable(metastore, session, table); - return locationService.getPartitionWriteInfo(handle, Optional.empty(), partitionName).targetPath().toString(); + return locationService.getPartitionWriteInfo(handle, Optional.empty(), partitionName).getTargetPath().toString(); } } @@ -4281,7 +4278,7 @@ private void doInsert(HiveStorageFormat storageFormat, SchemaTableName tableName assertFalse(existingFiles.isEmpty()); } - Location stagingPathRoot; + Path stagingPathRoot; try (Transaction transaction = newTransaction()) { ConnectorSession session = newSession(); ConnectorMetadata metadata = transaction.getMetadata(); @@ -4402,7 +4399,7 @@ private void doInsertOverwriteUnpartitioned(SchemaTableName tableName) assertFalse(existingFiles.isEmpty()); } - Location stagingPathRoot; + Path stagingPathRoot; try (Transaction transaction = newTransaction()) { ConnectorSession session = newSession(overwriteProperties); ConnectorMetadata metadata = transaction.getMetadata(); @@ -4471,31 +4468,32 @@ private void doInsertOverwriteUnpartitioned(SchemaTableName tableName) } } - private Location getStagingPathRoot(ConnectorInsertTableHandle insertTableHandle) + // These are protected so extensions to the hive connector can replace the handle classes + protected Path getStagingPathRoot(ConnectorInsertTableHandle insertTableHandle) { HiveInsertTableHandle handle = (HiveInsertTableHandle) insertTableHandle; WriteInfo writeInfo = getLocationService().getQueryWriteInfo(handle.getLocationHandle()); - if (writeInfo.writeMode() != STAGE_AND_MOVE_TO_TARGET_DIRECTORY) { + if (writeInfo.getWriteMode() != STAGE_AND_MOVE_TO_TARGET_DIRECTORY) { throw new AssertionError("writeMode is not STAGE_AND_MOVE_TO_TARGET_DIRECTORY"); } - return writeInfo.writePath(); + return writeInfo.getWritePath(); } - private Location getStagingPathRoot(ConnectorOutputTableHandle outputTableHandle) + protected Path getStagingPathRoot(ConnectorOutputTableHandle outputTableHandle) { HiveOutputTableHandle handle = (HiveOutputTableHandle) outputTableHandle; return getLocationService() .getQueryWriteInfo(handle.getLocationHandle()) - .writePath(); + .getWritePath(); } - private Location getTargetPathRoot(ConnectorInsertTableHandle insertTableHandle) + protected Path getTargetPathRoot(ConnectorInsertTableHandle insertTableHandle) { HiveInsertTableHandle hiveInsertTableHandle = (HiveInsertTableHandle) insertTableHandle; return getLocationService() .getQueryWriteInfo(hiveInsertTableHandle.getLocationHandle()) - .targetPath(); + .getTargetPath(); } protected Set listAllDataFiles(Transaction transaction, String schemaName, String tableName) @@ -4504,7 +4502,7 @@ protected Set listAllDataFiles(Transaction transaction, String schemaNam HdfsContext hdfsContext = new HdfsContext(newSession()); Set existingFiles = new HashSet<>(); for (String location : listAllDataPaths(transaction.getMetastore(), schemaName, tableName)) { - existingFiles.addAll(listAllDataFiles(hdfsContext, Location.of(location))); + existingFiles.addAll(listAllDataFiles(hdfsContext, new Path(location))); } return existingFiles; } @@ -4532,10 +4530,9 @@ public static List listAllDataPaths(SemiTransactionalHiveMetastore metas return locations.build(); } - protected Set listAllDataFiles(HdfsContext context, Location location) + protected Set listAllDataFiles(HdfsContext context, Path path) throws IOException { - Path path = new Path(location.toString()); Set result = new HashSet<>(); FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, path); if (fileSystem.exists(path)) { @@ -4547,7 +4544,7 @@ else if (fileStatus.isFile()) { result.add(fileStatus.getPath().toString()); } else if (fileStatus.isDirectory()) { - result.addAll(listAllDataFiles(context, Location.of(fileStatus.getPath().toString()))); + result.addAll(listAllDataFiles(context, fileStatus.getPath())); } } } @@ -4608,7 +4605,7 @@ private void doInsertIntoNewPartition(HiveStorageFormat storageFormat, SchemaTab } } - Location stagingPathRoot; + Path stagingPathRoot; try (Transaction transaction = newTransaction()) { ConnectorSession session = newSession(); ConnectorMetadata metadata = transaction.getMetadata(); @@ -4720,7 +4717,7 @@ private void doInsertIntoExistingPartition(HiveStorageFormat storageFormat, Sche // test rollback Set existingFiles; - Location stagingPathRoot; + Path stagingPathRoot; try (Transaction transaction = newTransaction()) { ConnectorMetadata metadata = transaction.getMetadata(); ConnectorSession session = newSession(); @@ -4867,8 +4864,8 @@ private String insertData(SchemaTableName tableName, MaterializedResult data) private String insertData(SchemaTableName tableName, MaterializedResult data, Map sessionProperties) throws Exception { - Location writePath; - Location targetPath; + Path writePath; + Path targetPath; String queryId; try (Transaction transaction = newTransaction()) { ConnectorMetadata metadata = transaction.getMetadata(); @@ -4893,8 +4890,8 @@ private String insertData(SchemaTableName tableName, MaterializedResult data, Ma // check that temporary files are removed if (!writePath.equals(targetPath)) { HdfsContext context = new HdfsContext(newSession()); - FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, new Path(writePath.toString())); - assertFalse(fileSystem.exists(new Path(writePath.toString()))); + FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, writePath); + assertFalse(fileSystem.exists(writePath)); } return queryId; @@ -5576,7 +5573,7 @@ protected void createEmptyTable( String tableName = schemaTableName.getTableName(); LocationService locationService = getLocationService(); - targetPath = new Path(locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName).toString()); + targetPath = locationService.forNewTable(transaction.getMetastore(), session, schemaName, tableName); ImmutableMap.Builder tableParamBuilder = ImmutableMap.builder() .put(PRESTO_VERSION_NAME, TEST_SERVER_VERSION) @@ -6029,8 +6026,8 @@ private void doTestTransactionDeleteInsert( Optional conflictTrigger) throws Exception { - Location writePath = null; - Location targetPath = null; + Path writePath = null; + Path targetPath = null; try (Transaction transaction = newTransaction()) { try { @@ -6094,8 +6091,8 @@ private void doTestTransactionDeleteInsert( // check that temporary files are removed if (writePath != null && !writePath.equals(targetPath)) { HdfsContext context = new HdfsContext(newSession()); - FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, new Path(writePath.toString())); - assertFalse(fileSystem.exists(new Path(writePath.toString()))); + FileSystem fileSystem = hdfsEnvironment.getFileSystem(context, writePath); + assertFalse(fileSystem.exists(writePath)); } try (Transaction transaction = newTransaction()) { @@ -6273,15 +6270,15 @@ protected class DirectoryRenameFailure @Override public void triggerConflict(ConnectorSession session, SchemaTableName tableName, ConnectorInsertTableHandle insertTableHandle, List partitionUpdates) { - Location writePath = getStagingPathRoot(insertTableHandle); - Location targetPath = getTargetPathRoot(insertTableHandle); + Path writePath = getStagingPathRoot(insertTableHandle); + Path targetPath = getTargetPathRoot(insertTableHandle); if (writePath.equals(targetPath)) { // This conflict does not apply. Trigger a rollback right away so that this test case passes. throw new TestingRollbackException(); } - path = new Path(targetPath.appendPath("pk1=b").appendPath("pk2=add2").toString()); + path = new Path(targetPath + "/pk1=b/pk2=add2"); context = new HdfsContext(session); - createDirectory(context, hdfsEnvironment, new Path(path.toString())); + createDirectory(context, hdfsEnvironment, path); } @Override diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java index 2d6011c376d4..0dcf076500f1 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java @@ -579,8 +579,10 @@ private void createTable(SchemaTableName tableName, HiveStorageFormat storageFor // table, which fails without explicit configuration for file system. // We work around that by using a dummy location when creating the // table and update it here to the correct location. - Location location = locationService.getTableWriteInfo(((HiveOutputTableHandle) outputHandle).getLocationHandle(), false).targetPath(); - metastoreClient.updateTableLocation(database, tableName.getTableName(), location.toString()); + metastoreClient.updateTableLocation( + database, + tableName.getTableName(), + locationService.getTableWriteInfo(((HiveOutputTableHandle) outputHandle).getLocationHandle(), false).getTargetPath().toString()); } try (Transaction transaction = newTransaction()) { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveLocationService.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveLocationService.java index 1fdc4355b738..94593fbd6282 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveLocationService.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveLocationService.java @@ -14,11 +14,11 @@ package io.trino.plugin.hive; import com.google.common.collect.ImmutableList; -import io.trino.filesystem.Location; import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.hive.LocationService.WriteInfo; import io.trino.plugin.hive.TestBackgroundHiveSplitLoader.TestingHdfsEnvironment; import io.trino.spi.TrinoException; +import org.apache.hadoop.fs.Path; import org.testng.annotations.Test; import static io.trino.plugin.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY; @@ -32,21 +32,21 @@ public class TestHiveLocationService public void testGetTableWriteInfoAppend() { assertThat(locationHandle(STAGE_AND_MOVE_TO_TARGET_DIRECTORY), false) - .producesWriteInfo(writeInfo( - "/target", - "/write", + .producesWriteInfo(new WriteInfo( + new Path("/target"), + new Path("/write"), STAGE_AND_MOVE_TO_TARGET_DIRECTORY)); assertThat(locationHandle(DIRECT_TO_TARGET_EXISTING_DIRECTORY, "/target", "/target"), false) - .producesWriteInfo(writeInfo( - "/target", - "/target", + .producesWriteInfo(new WriteInfo( + new Path("/target"), + new Path("/target"), DIRECT_TO_TARGET_EXISTING_DIRECTORY)); assertThat(locationHandle(DIRECT_TO_TARGET_NEW_DIRECTORY, "/target", "/target"), false) - .producesWriteInfo(writeInfo( - "/target", - "/target", + .producesWriteInfo(new WriteInfo( + new Path("/target"), + new Path("/target"), DIRECT_TO_TARGET_NEW_DIRECTORY)); } @@ -54,7 +54,10 @@ public void testGetTableWriteInfoAppend() public void testGetTableWriteInfoOverwriteSuccess() { assertThat(locationHandle(STAGE_AND_MOVE_TO_TARGET_DIRECTORY), true) - .producesWriteInfo(writeInfo("/target", "/write", STAGE_AND_MOVE_TO_TARGET_DIRECTORY)); + .producesWriteInfo(new WriteInfo( + new Path("/target"), + new Path("/write"), + STAGE_AND_MOVE_TO_TARGET_DIRECTORY)); } @Test(expectedExceptions = TrinoException.class, expectedExceptionsMessageRegExp = "Overwriting unpartitioned table not supported when writing directly to target directory") @@ -87,9 +90,9 @@ public Assertion(LocationHandle locationHandle, boolean overwrite) public void producesWriteInfo(WriteInfo expected) { - assertEquals(actual.writePath(), expected.writePath()); - assertEquals(actual.targetPath(), expected.targetPath()); - assertEquals(actual.writeMode(), expected.writeMode()); + assertEquals(actual.getWritePath(), expected.getWritePath()); + assertEquals(actual.getTargetPath(), expected.getTargetPath()); + assertEquals(actual.getWriteMode(), expected.getWriteMode()); } } @@ -100,11 +103,6 @@ private static LocationHandle locationHandle(LocationHandle.WriteMode writeMode) private static LocationHandle locationHandle(LocationHandle.WriteMode writeMode, String targetPath, String writePath) { - return new LocationHandle(Location.of(targetPath), Location.of(writePath), writeMode); - } - - private static WriteInfo writeInfo(String targetPath, String writePath, LocationHandle.WriteMode writeMode) - { - return new WriteInfo(Location.of(targetPath), Location.of(writePath), writeMode); + return new LocationHandle(new Path(targetPath), new Path(writePath), writeMode); } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java index e8ed3c4f988a..4d1293b5e22f 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java @@ -18,7 +18,6 @@ import com.google.common.collect.ImmutableMap; import io.airlift.json.JsonCodec; import io.airlift.slice.Slices; -import io.trino.filesystem.Location; import io.trino.operator.GroupByHashPageIndexerFactory; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.HiveMetastoreFactory; @@ -44,6 +43,7 @@ import io.trino.tpch.TpchColumnType; import io.trino.tpch.TpchColumnTypes; import io.trino.type.BlockTypeOperators; +import org.apache.hadoop.fs.Path; import org.testng.annotations.Test; import java.io.File; @@ -166,7 +166,7 @@ private static long writeTestFile(HiveConfig config, SortingFileWriterConfig sor { HiveTransactionHandle transaction = new HiveTransactionHandle(false); HiveWriterStats stats = new HiveWriterStats(); - ConnectorPageSink pageSink = createPageSink(transaction, config, sortingFileWriterConfig, metastore, Location.of("file:///" + outputPath), stats); + ConnectorPageSink pageSink = createPageSink(transaction, config, sortingFileWriterConfig, metastore, new Path("file:///" + outputPath), stats); List columns = getTestColumns(); List columnTypes = columns.stream() .map(LineItemColumn::getType) @@ -278,7 +278,7 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa return provider.createPageSource(transaction, getHiveSession(config), split, table, ImmutableList.copyOf(getColumnHandles()), DynamicFilter.EMPTY); } - private static ConnectorPageSink createPageSink(HiveTransactionHandle transaction, HiveConfig config, SortingFileWriterConfig sortingFileWriterConfig, HiveMetastore metastore, Location outputPath, HiveWriterStats stats) + private static ConnectorPageSink createPageSink(HiveTransactionHandle transaction, HiveConfig config, SortingFileWriterConfig sortingFileWriterConfig, HiveMetastore metastore, Path outputPath, HiveWriterStats stats) { LocationHandle locationHandle = new LocationHandle(outputPath, outputPath, DIRECT_TO_TARGET_NEW_DIRECTORY); HiveOutputTableHandle handle = new HiveOutputTableHandle( diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveS3AndGlueMetastoreTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveS3AndGlueMetastoreTest.java index e0f2b5d797af..b9631adefed8 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveS3AndGlueMetastoreTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveS3AndGlueMetastoreTest.java @@ -129,7 +129,8 @@ public void testBasicOperationsWithProvidedTableLocation(boolean partitioned, St assertQuery("SELECT * FROM " + tableName, "VALUES ('str1', 1), ('str2', 2), ('str3', 3)"); String actualTableLocation = getTableLocation(tableName); - assertThat(actualTableLocation).isEqualTo(location); + String expectedLocation = location.endsWith("/") ? location.substring(0, location.length() - 1) : location; + assertThat(actualTableLocation).isEqualTo(expectedLocation); assertUpdate("INSERT INTO " + tableName + " VALUES ('str4', 4)", 1); assertQuery("SELECT * FROM " + tableName, "VALUES ('str1', 1), ('str2', 2), ('str3', 3), ('str4', 4)");