diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java
index ae3ba579bc8c..38abf499425b 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java
@@ -171,7 +171,11 @@ public static HiveSessionProperties getHiveSessionProperties(HiveConfig hiveConf
 
     public static Set<HivePageSourceFactory> getDefaultHivePageSourceFactories(HdfsEnvironment hdfsEnvironment, HiveConfig hiveConfig)
     {
-        TrinoFileSystemFactory fileSystemFactory = new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS);
+        return getDefaultHivePageSourceFactories(new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS), hiveConfig);
+    }
+
+    public static Set<HivePageSourceFactory> getDefaultHivePageSourceFactories(TrinoFileSystemFactory fileSystemFactory, HiveConfig hiveConfig)
+    {
         FileFormatDataSourceStats stats = new FileFormatDataSourceStats();
         return ImmutableSet.<HivePageSourceFactory>builder()
                 .add(new CsvPageSourceFactory(fileSystemFactory, hiveConfig))
@@ -189,7 +193,11 @@ public static Set getDefaultHivePageSourceFactories(HdfsE
 
     public static Set<HiveFileWriterFactory> getDefaultHiveFileWriterFactories(HiveConfig hiveConfig, HdfsEnvironment hdfsEnvironment)
     {
-        TrinoFileSystemFactory fileSystemFactory = new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS);
+        return getDefaultHiveFileWriterFactories(hiveConfig, new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS));
+    }
+
+    public static Set<HiveFileWriterFactory> getDefaultHiveFileWriterFactories(HiveConfig hiveConfig, TrinoFileSystemFactory fileSystemFactory)
+    {
         NodeVersion nodeVersion = new NodeVersion("test_version");
         return ImmutableSet.<HiveFileWriterFactory>builder()
                 .add(new CsvFileWriterFactory(fileSystemFactory, TESTING_TYPE_MANAGER))
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java
index a791e93b8ae2..0d4191f21fc9 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java
@@ -16,19 +16,19 @@
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ListMultimap;
+import com.google.common.io.Resources;
 import io.airlift.stats.CounterStat;
 import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
+import io.trino.filesystem.FileEntry;
+import io.trino.filesystem.FileIterator;
 import io.trino.filesystem.Location;
-import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
-import io.trino.hdfs.DynamicHdfsConfiguration;
-import io.trino.hdfs.HdfsConfig;
-import io.trino.hdfs.HdfsConfigurationInitializer;
-import io.trino.hdfs.HdfsEnvironment;
-import io.trino.hdfs.authentication.NoHdfsAuthentication;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.TrinoOutputFile;
+import io.trino.filesystem.memory.MemoryFileSystemFactory;
 import io.trino.plugin.hive.HiveColumnHandle.ColumnType;
 import io.trino.plugin.hive.fs.CachingDirectoryLister;
 import io.trino.plugin.hive.fs.DirectoryLister;
@@ -48,30 +48,17 @@
 import io.trino.spi.predicate.Domain;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.security.ConnectorIdentity;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
-import org.apache.hadoop.util.Progressable;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.parallel.Execution;
 
-import java.io.File;
 import java.io.IOException;
-import java.net.URI;
-import java.nio.file.Files;
-import java.nio.file.Paths;
+import java.io.OutputStream;
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -88,9 +75,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static com.google.common.base.Throwables.throwIfUnchecked;
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static com.google.common.io.MoreFiles.deleteRecursively;
-import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
 import static com.google.common.io.Resources.getResource;
 import static io.airlift.concurrent.MoreFutures.unmodifiableFuture;
 import static io.airlift.concurrent.Threads.daemonThreadsNamed;
@@ -107,8 +91,6 @@
 import static io.trino.plugin.hive.HiveStorageFormat.AVRO;
 import static io.trino.plugin.hive.HiveStorageFormat.CSV;
 import static io.trino.plugin.hive.HiveStorageFormat.ORC;
-import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
-import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
 import static io.trino.plugin.hive.HiveTestUtils.SESSION;
 import static io.trino.plugin.hive.HiveTestUtils.getHiveSession;
 import static io.trino.plugin.hive.HiveTimestampPrecision.DEFAULT_PRECISION;
@@ -116,6 +98,7 @@
 import static io.trino.plugin.hive.HiveType.HIVE_STRING;
 import static io.trino.plugin.hive.TableType.MANAGED_TABLE;
 import static io.trino.plugin.hive.util.HiveBucketing.BucketingVersion.BUCKETING_V1;
+import static io.trino.plugin.hive.util.HiveClassNames.SYMLINK_TEXT_INPUT_FORMAT_CLASS;
 import static io.trino.plugin.hive.util.HiveUtil.getRegularColumnHandles;
 import static io.trino.plugin.hive.util.SerdeConstants.SERIALIZATION_LIB;
 import static io.trino.spi.predicate.TupleDomain.withColumnDomains;
@@ -142,31 +125,19 @@ public class TestBackgroundHiveSplitLoader
 {
     private static final int BUCKET_COUNT = 2;
 
-    private static final String SAMPLE_PATH = "hdfs://VOL1:9000/db_name/table_name/000000_0";
-    private static final String SAMPLE_PATH_FILTERED = "hdfs://VOL1:9000/db_name/table_name/000000_1";
+    private static final Location LOCATION = Location.of("memory:///db_name/table_name/000000_0");
+    private static final Location FILTERED_LOCATION = Location.of("memory:///db_name/table_name/000000_1");
 
-    private static final Path RETURNED_PATH = new Path(SAMPLE_PATH);
-    private static final Path FILTERED_PATH = new Path(SAMPLE_PATH_FILTERED);
+    private static final TupleDomain<HiveColumnHandle> LOCATION_DOMAIN = withColumnDomains(Map.of(pathColumnHandle(),
Domain.singleValue(VARCHAR, utf8Slice(LOCATION.toString())))); - private static final TupleDomain RETURNED_PATH_DOMAIN = withColumnDomains( - ImmutableMap.of( - pathColumnHandle(), - Domain.singleValue(VARCHAR, utf8Slice(RETURNED_PATH.toString())))); + private static final List TEST_LOCATIONS = List.of(LOCATION, FILTERED_LOCATION); - private static final List TEST_FILES = ImmutableList.of( - locatedFileStatus(RETURNED_PATH), - locatedFileStatus(FILTERED_PATH)); + private static final List PARTITION_COLUMNS = List.of(new Column("partitionColumn", HIVE_INT, Optional.empty(), Map.of())); + private static final List BUCKET_COLUMN_HANDLES = List.of(createBaseColumn("col1", 0, HIVE_INT, INTEGER, ColumnType.REGULAR, Optional.empty())); - private static final List PARTITION_COLUMNS = ImmutableList.of( - new Column("partitionColumn", HIVE_INT, Optional.empty())); - private static final List BUCKET_COLUMN_HANDLES = ImmutableList.of( - createBaseColumn("col1", 0, HIVE_INT, INTEGER, ColumnType.REGULAR, Optional.empty())); - - private static final Optional BUCKET_PROPERTY = Optional.of( - new HiveBucketProperty(ImmutableList.of("col1"), BUCKETING_V1, BUCKET_COUNT, ImmutableList.of())); - - private static final Table SIMPLE_TABLE = table(ImmutableList.of(), Optional.empty(), ImmutableMap.of()); - private static final Table PARTITIONED_TABLE = table(PARTITION_COLUMNS, BUCKET_PROPERTY, ImmutableMap.of()); + private static final String TABLE_PATH = "memory:///db_name/table_name"; + private static final Table SIMPLE_TABLE = table(TABLE_PATH, List.of(), Optional.empty(), Map.of()); + private static final Table PARTITIONED_TABLE = table(TABLE_PATH, PARTITION_COLUMNS, Optional.of(new HiveBucketProperty(List.of("col1"), BUCKETING_V1, BUCKET_COUNT, List.of())), Map.of()); private final ExecutorService executor = newCachedThreadPool(daemonThreadsNamed(getClass().getSimpleName() + "-%s")); @@ -180,9 +151,7 @@ public void tearDown() public void testNoPathFilter() throws Exception { - BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - TEST_FILES, - TupleDomain.none()); + BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(TEST_LOCATIONS, TupleDomain.none()); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); backgroundHiveSplitLoader.start(hiveSplitSource); @@ -194,28 +163,31 @@ public void testNoPathFilter() public void testCsv() throws Exception { - DataSize fileSize = DataSize.of(2, GIGABYTE); - assertSplitCount(CSV, ImmutableMap.of(), fileSize, 33); - assertSplitCount(CSV, ImmutableMap.of("skip.header.line.count", "1"), fileSize, 33); - assertSplitCount(CSV, ImmutableMap.of("skip.header.line.count", "2"), fileSize, 1); - assertSplitCount(CSV, ImmutableMap.of("skip.footer.line.count", "1"), fileSize, 1); - assertSplitCount(CSV, ImmutableMap.of("skip.header.line.count", "1", "skip.footer.line.count", "1"), fileSize, 1); + FileEntry file = new FileEntry(LOCATION, DataSize.of(2, GIGABYTE).toBytes(), Instant.now(), Optional.empty()); + assertCsvSplitCount(file, Map.of(), 33); + assertCsvSplitCount(file, Map.of("skip.header.line.count", "1"), 33); + assertCsvSplitCount(file, Map.of("skip.header.line.count", "2"), 1); + assertCsvSplitCount(file, Map.of("skip.footer.line.count", "1"), 1); + assertCsvSplitCount(file, Map.of("skip.header.line.count", "1", "skip.footer.line.count", "1"), 1); } - private void assertSplitCount(HiveStorageFormat storageFormat, Map tableProperties, DataSize fileSize, int expectedSplitCount) + private void 
assertCsvSplitCount(FileEntry file, Map tableProperties, int expectedSplitCount) throws Exception { Table table = table( - ImmutableList.of(), + TABLE_PATH, + List.of(), Optional.empty(), - ImmutableMap.copyOf(tableProperties), - StorageFormat.fromHiveStorageFormat(storageFormat)); + Map.copyOf(tableProperties), + StorageFormat.fromHiveStorageFormat(CSV)); + TrinoFileSystemFactory fileSystemFactory = new ListSingleFileFileSystemFactory(file); BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), fileSize.toBytes())), + fileSystemFactory, TupleDomain.all(), Optional.empty(), table, + Optional.empty(), Optional.empty()); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); @@ -229,14 +201,14 @@ public void testPathFilter() throws Exception { BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - TEST_FILES, - RETURNED_PATH_DOMAIN); + TEST_LOCATIONS, + LOCATION_DOMAIN); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); backgroundHiveSplitLoader.start(hiveSplitSource); List paths = drain(hiveSplitSource); assertEquals(paths.size(), 1); - assertEquals(paths.get(0), RETURNED_PATH.toString()); + assertEquals(paths.get(0), LOCATION.toString()); } @Test @@ -244,17 +216,17 @@ public void testPathFilterOneBucketMatchPartitionedTable() throws Exception { BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - TEST_FILES, - RETURNED_PATH_DOMAIN, - Optional.of(new HiveBucketFilter(ImmutableSet.of(0, 1))), + TEST_LOCATIONS, + LOCATION_DOMAIN, + Optional.of(new HiveBucketFilter(Set.of(0, 1))), PARTITIONED_TABLE, - Optional.of(new HiveBucketHandle(BUCKET_COLUMN_HANDLES, BUCKETING_V1, BUCKET_COUNT, BUCKET_COUNT, ImmutableList.of()))); + Optional.of(new HiveBucketHandle(BUCKET_COLUMN_HANDLES, BUCKETING_V1, BUCKET_COUNT, BUCKET_COUNT, List.of()))); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); backgroundHiveSplitLoader.start(hiveSplitSource); List paths = drain(hiveSplitSource); assertEquals(paths.size(), 1); - assertEquals(paths.get(0), RETURNED_PATH.toString()); + assertEquals(paths.get(0), LOCATION.toString()); } @Test @@ -262,8 +234,8 @@ public void testPathFilterBucketedPartitionedTable() throws Exception { BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - TEST_FILES, - RETURNED_PATH_DOMAIN, + TEST_LOCATIONS, + LOCATION_DOMAIN, Optional.empty(), PARTITIONED_TABLE, Optional.of( @@ -272,22 +244,30 @@ public void testPathFilterBucketedPartitionedTable() BUCKETING_V1, BUCKET_COUNT, BUCKET_COUNT, - ImmutableList.of()))); + List.of()))); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); backgroundHiveSplitLoader.start(hiveSplitSource); List paths = drain(hiveSplitSource); assertEquals(paths.size(), 1); - assertEquals(paths.get(0), RETURNED_PATH.toString()); + assertEquals(paths.get(0), LOCATION.toString()); } @Test public void testEmptyFileWithNoBlocks() throws Exception { + TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + // create an empty file + fileSystemFactory.create(ConnectorIdentity.ofUser("test")).newOutputFile(LOCATION).create().close(); + BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - ImmutableList.of(locatedFileStatusWithNoBlocks(RETURNED_PATH)), - TupleDomain.none()); + fileSystemFactory, + TupleDomain.none(), + Optional.empty(), + SIMPLE_TABLE, + 
Optional.empty(), + Optional.empty()); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); backgroundHiveSplitLoader.start(hiveSplitSource); @@ -298,6 +278,7 @@ public void testEmptyFileWithNoBlocks() @Test public void testNoHangIfPartitionIsOffline() + throws IOException { BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoaderOfflinePartitions(); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); @@ -322,7 +303,7 @@ public void testIncompleteDynamicFilterTimeout() @Override public Set getColumnsCovered() { - return ImmutableSet.of(); + return Set.of(); } @Override @@ -368,7 +349,7 @@ public TupleDomain getCurrentPredicate() public void testCachedDirectoryLister() throws Exception { - CachingDirectoryLister cachingDirectoryLister = new CachingDirectoryLister(new Duration(5, TimeUnit.MINUTES), DataSize.of(100, KILOBYTE), ImmutableList.of("test_dbname.test_table")); + CachingDirectoryLister cachingDirectoryLister = new CachingDirectoryLister(new Duration(5, TimeUnit.MINUTES), DataSize.of(100, KILOBYTE), List.of("test_dbname.test_table")); assertEquals(cachingDirectoryLister.getRequestCount(), 0); int totalCount = 100; @@ -376,7 +357,7 @@ public void testCachedDirectoryLister() List>> futures = new ArrayList<>(); futures.add(executor.submit(() -> { - BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(TEST_FILES, cachingDirectoryLister); + BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(TEST_LOCATIONS, cachingDirectoryLister); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); backgroundHiveSplitLoader.start(hiveSplitSource); try { @@ -390,7 +371,7 @@ public void testCachedDirectoryLister() for (int i = 0; i < totalCount - 1; i++) { futures.add(executor.submit(() -> { firstVisit.await(); - BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(TEST_FILES, cachingDirectoryLister); + BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(TEST_LOCATIONS, cachingDirectoryLister); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); backgroundHiveSplitLoader.start(hiveSplitSource); return drainSplits(hiveSplitSource); @@ -398,7 +379,7 @@ public void testCachedDirectoryLister() } for (Future> future : futures) { - assertEquals(future.get().size(), TEST_FILES.size()); + assertEquals(future.get().size(), TEST_LOCATIONS.size()); } assertEquals(cachingDirectoryLister.getRequestCount(), totalCount); assertEquals(cachingDirectoryLister.getHitCount(), totalCount - 1); @@ -448,6 +429,7 @@ public void testGetAttemptId() @Test @Timeout(60) public void testPropagateException() + throws IOException { testPropagateException(false, 1); testPropagateException(true, 1); @@ -458,10 +440,11 @@ public void testPropagateException() } private void testPropagateException(boolean error, int threads) + throws IOException { AtomicBoolean iteratorUsedAfterException = new AtomicBoolean(); - HdfsEnvironment hdfsEnvironment = new TestingHdfsEnvironment(TEST_FILES); + TrinoFileSystemFactory fileSystemFactory = createTestingFileSystem(TEST_LOCATIONS); BackgroundHiveSplitLoader backgroundHiveSplitLoader = new BackgroundHiveSplitLoader( SIMPLE_TABLE, new Iterator<>() @@ -492,7 +475,7 @@ public HivePartitionMetadata next() TESTING_TYPE_MANAGER, createBucketSplitInfo(Optional.empty(), Optional.empty()), SESSION, - new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS), + 
fileSystemFactory, new CachingDirectoryLister(new HiveConfig()), executor, threads, @@ -520,12 +503,14 @@ public HivePartitionMetadata next() public void testMultipleSplitsPerBucket() throws Exception { + TrinoFileSystemFactory fileSystemFactory = new ListSingleFileFileSystemFactory(new FileEntry(LOCATION, DataSize.of(1, GIGABYTE).toBytes(), Instant.now(), Optional.empty())); BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - ImmutableList.of(locatedFileStatus(new Path(SAMPLE_PATH), DataSize.of(1, GIGABYTE).toBytes())), + fileSystemFactory, TupleDomain.all(), Optional.empty(), SIMPLE_TABLE, - Optional.of(new HiveBucketHandle(BUCKET_COLUMN_HANDLES, BUCKETING_V1, BUCKET_COUNT, BUCKET_COUNT, ImmutableList.of()))); + Optional.of(new HiveBucketHandle(BUCKET_COLUMN_HANDLES, BUCKETING_V1, BUCKET_COUNT, BUCKET_COUNT, List.of())), + Optional.empty()); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); backgroundHiveSplitLoader.start(hiveSplitSource); @@ -537,27 +522,28 @@ public void testMultipleSplitsPerBucket() public void testSplitsGenerationWithAbortedTransactions() throws Exception { - java.nio.file.Path tablePath = Files.createTempDirectory("TestBackgroundHiveSplitLoader"); + TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + TrinoFileSystem fileSystem = fileSystemFactory.create(ConnectorIdentity.ofUser("test")); + Location tableLocation = Location.of("memory:///my_table"); + Table table = table( - tablePath.toString(), - ImmutableList.of(), + tableLocation.toString(), + List.of(), Optional.empty(), - ImmutableMap.of( + Map.of( "transactional", "true", "transactional_properties", "insert_only")); - List filePaths = ImmutableList.of( - tablePath + "/delta_0000001_0000001_0000/_orc_acid_version", - tablePath + "/delta_0000001_0000001_0000/bucket_00000", - tablePath + "/delta_0000002_0000002_0000/_orc_acid_version", - tablePath + "/delta_0000002_0000002_0000/bucket_00000", - tablePath + "/delta_0000003_0000003_0000/_orc_acid_version", - tablePath + "/delta_0000003_0000003_0000/bucket_00000"); - - for (String path : filePaths) { - File file = new File(path); - assertTrue(file.getParentFile().exists() || file.getParentFile().mkdirs(), "Failed creating directory " + file.getParentFile()); - createOrcAcidFile(file); + List fileLocations = List.of( + tableLocation.appendPath("delta_0000001_0000001_0000/_orc_acid_version"), + tableLocation.appendPath("delta_0000001_0000001_0000/bucket_00000"), + tableLocation.appendPath("delta_0000002_0000002_0000/_orc_acid_version"), + tableLocation.appendPath("delta_0000002_0000002_0000/bucket_00000"), + tableLocation.appendPath("delta_0000003_0000003_0000/_orc_acid_version"), + tableLocation.appendPath("delta_0000003_0000003_0000/bucket_00000")); + + for (Location fileLocation : fileLocations) { + createOrcAcidFile(fileSystem, fileLocation); } // ValidWriteIdList is of format $.:::: @@ -565,7 +551,7 @@ public void testSplitsGenerationWithAbortedTransactions() String validWriteIdsList = format("4$%s.%s:3:9223372036854775807::2", table.getDatabaseName(), table.getTableName()); BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - HDFS_ENVIRONMENT, + fileSystemFactory, TupleDomain.none(), Optional.empty(), table, @@ -575,41 +561,41 @@ public void testSplitsGenerationWithAbortedTransactions() HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); backgroundHiveSplitLoader.start(hiveSplitSource); List splits = drain(hiveSplitSource); - 
assertTrue(splits.stream().anyMatch(p -> p.contains(filePaths.get(1))), format("%s not found in splits %s", filePaths.get(1), splits)); - assertTrue(splits.stream().anyMatch(p -> p.contains(filePaths.get(5))), format("%s not found in splits %s", filePaths.get(5), splits)); - - deleteRecursively(tablePath, ALLOW_INSECURE); + assertThat(splits).contains(fileLocations.get(1).toString()); + assertThat(splits).contains(fileLocations.get(5).toString()); } @Test public void testFullAcidTableWithOriginalFiles() throws Exception { - java.nio.file.Path tablePath = Files.createTempDirectory("TestBackgroundHiveSplitLoader"); + TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + TrinoFileSystem fileSystem = fileSystemFactory.create(ConnectorIdentity.ofUser("test")); + Location tableLocation = Location.of("memory:///my_table"); + Table table = table( - tablePath.toString(), - ImmutableList.of(), + tableLocation.toString(), + List.of(), Optional.empty(), - ImmutableMap.of("transactional", "true")); - - String originalFile = tablePath + "/000000_1"; - List filePaths = ImmutableList.of( - tablePath + "/delta_0000002_0000002_0000/_orc_acid_version", - tablePath + "/delta_0000002_0000002_0000/bucket_00000"); + Map.of("transactional", "true")); - for (String path : filePaths) { - File file = new File(path); - assertTrue(file.getParentFile().exists() || file.getParentFile().mkdirs(), "Failed creating directory " + file.getParentFile()); - createOrcAcidFile(file); + Location originalFile = tableLocation.appendPath("000000_1"); + try (OutputStream outputStream = fileSystem.newOutputFile(originalFile).create()) { + outputStream.write("test".getBytes(UTF_8)); + } + List fileLocations = List.of( + tableLocation.appendPath("delta_0000002_0000002_0000/_orc_acid_version"), + tableLocation.appendPath("delta_0000002_0000002_0000/bucket_00000")); + for (Location fileLocation : fileLocations) { + createOrcAcidFile(fileSystem, fileLocation); } - Files.write(Paths.get(originalFile), "test".getBytes(UTF_8)); // ValidWriteIdsList is of format $.
:::: // This writeId list has high watermark transaction=3 ValidWriteIdList validWriteIdsList = new ValidWriteIdList(format("4$%s.%s:3:9223372036854775807::", table.getDatabaseName(), table.getTableName())); BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - HDFS_ENVIRONMENT, + fileSystemFactory, TupleDomain.all(), Optional.empty(), table, @@ -618,30 +604,31 @@ public void testFullAcidTableWithOriginalFiles() HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); backgroundHiveSplitLoader.start(hiveSplitSource); List splits = drain(hiveSplitSource); - assertTrue(splits.stream().anyMatch(p -> p.contains(originalFile)), format("%s not found in splits %s", filePaths.get(0), splits)); - assertTrue(splits.stream().anyMatch(p -> p.contains(filePaths.get(1))), format("%s not found in splits %s", filePaths.get(1), splits)); + assertThat(splits).contains(originalFile.toString()); + assertThat(splits).contains(fileLocations.get(1).toString()); } @Test public void testVersionValidationNoOrcAcidVersionFile() throws Exception { - java.nio.file.Path tablePath = Files.createTempDirectory("TestBackgroundHiveSplitLoader"); + TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + TrinoFileSystem fileSystem = fileSystemFactory.create(ConnectorIdentity.ofUser("test")); + Location tableLocation = Location.of("memory:///my_table"); + Table table = table( - tablePath.toString(), - ImmutableList.of(), + tableLocation.toString(), + List.of(), Optional.empty(), - ImmutableMap.of("transactional", "true")); + Map.of("transactional", "true")); - List filePaths = ImmutableList.of( - tablePath + "/000000_1", + List fileLocations = List.of( + tableLocation.appendPath("000000_1"), // no /delta_0000002_0000002_0000/_orc_acid_version file - tablePath + "/delta_0000002_0000002_0000/bucket_00000"); + tableLocation.appendPath("delta_0000002_0000002_0000/bucket_00000")); - for (String path : filePaths) { - File file = new File(path); - assertTrue(file.getParentFile().exists() || file.getParentFile().mkdirs(), "Failed creating directory " + file.getParentFile()); - createOrcAcidFile(file); + for (Location fileLocation : fileLocations) { + createOrcAcidFile(fileSystem, fileLocation); } // ValidWriteIdsList is of format $.
:::: @@ -649,7 +636,7 @@ public void testVersionValidationNoOrcAcidVersionFile() ValidWriteIdList validWriteIdsList = new ValidWriteIdList(format("4$%s.%s:3:9223372036854775807::", table.getDatabaseName(), table.getTableName())); BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - HDFS_ENVIRONMENT, + fileSystemFactory, TupleDomain.all(), Optional.empty(), table, @@ -664,30 +651,29 @@ public void testVersionValidationNoOrcAcidVersionFile() .allMatch(Optional::isPresent) .extracting(Optional::get) .noneMatch(AcidInfo::isOrcAcidVersionValidated); - - deleteRecursively(tablePath, ALLOW_INSECURE); } @Test public void testVersionValidationOrcAcidVersionFileHasVersion2() throws Exception { - java.nio.file.Path tablePath = Files.createTempDirectory("TestBackgroundHiveSplitLoader"); + TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + TrinoFileSystem fileSystem = fileSystemFactory.create(ConnectorIdentity.ofUser("test")); + Location tableLocation = Location.of("memory:///my_table"); + Table table = table( - tablePath.toString(), - ImmutableList.of(), + tableLocation.toString(), + List.of(), Optional.empty(), - ImmutableMap.of("transactional", "true")); + Map.of("transactional", "true")); - List filePaths = ImmutableList.of( - tablePath + "/000000_1", // _orc_acid_version does not exist so it's assumed to be "ORC ACID version 0" - tablePath + "/delta_0000002_0000002_0000/_orc_acid_version", - tablePath + "/delta_0000002_0000002_0000/bucket_00000"); + List fileLocations = List.of( + tableLocation.appendPath("000000_1"), // _orc_acid_version does not exist, so it's assumed to be "ORC ACID version 0" + tableLocation.appendPath("delta_0000002_0000002_0000/_orc_acid_version"), + tableLocation.appendPath("delta_0000002_0000002_0000/bucket_00000")); - for (String path : filePaths) { - File file = new File(path); - assertTrue(file.getParentFile().exists() || file.getParentFile().mkdirs(), "Failed creating directory " + file.getParentFile()); - createOrcAcidFile(file, 2); + for (Location fileLocation : fileLocations) { + createOrcAcidFile(fileSystem, fileLocation, 2); } // ValidWriteIdsList is of format $.
:::: @@ -695,7 +681,7 @@ public void testVersionValidationOrcAcidVersionFileHasVersion2() ValidWriteIdList validWriteIdsList = new ValidWriteIdList(format("4$%s.%s:3:9223372036854775807::", table.getDatabaseName(), table.getTableName())); BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - HDFS_ENVIRONMENT, + fileSystemFactory, TupleDomain.all(), Optional.empty(), table, @@ -708,31 +694,30 @@ public void testVersionValidationOrcAcidVersionFileHasVersion2() // We should have it marked in all splits that NO further ORC ACID validation is required assertThat(drainSplits(hiveSplitSource)).extracting(HiveSplit::getAcidInfo) .allMatch(acidInfo -> acidInfo.isEmpty() || acidInfo.get().isOrcAcidVersionValidated()); - - deleteRecursively(tablePath, ALLOW_INSECURE); } @Test public void testVersionValidationOrcAcidVersionFileHasVersion1() throws Exception { - java.nio.file.Path tablePath = Files.createTempDirectory("TestBackgroundHiveSplitLoader"); + TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + TrinoFileSystem fileSystem = fileSystemFactory.create(ConnectorIdentity.ofUser("test")); + Location tableLocation = Location.of("memory:///my_table"); + Table table = table( - tablePath.toString(), - ImmutableList.of(), + tableLocation.toString(), + List.of(), Optional.empty(), - ImmutableMap.of("transactional", "true")); + Map.of("transactional", "true")); - List filePaths = ImmutableList.of( - tablePath + "/000000_1", - tablePath + "/delta_0000002_0000002_0000/_orc_acid_version", - tablePath + "/delta_0000002_0000002_0000/bucket_00000"); + List fileLocations = List.of( + tableLocation.appendPath("000000_1"), + tableLocation.appendPath("delta_0000002_0000002_0000/_orc_acid_version"), + tableLocation.appendPath("delta_0000002_0000002_0000/bucket_00000")); - for (String path : filePaths) { - File file = new File(path); - assertTrue(file.getParentFile().exists() || file.getParentFile().mkdirs(), "Failed creating directory " + file.getParentFile()); + for (Location fileLocation : fileLocations) { // _orc_acid_version_exists but has version 1 - createOrcAcidFile(file, 1); + createOrcAcidFile(fileSystem, fileLocation, 1); } // ValidWriteIdsList is of format $.
:::: @@ -740,7 +725,7 @@ public void testVersionValidationOrcAcidVersionFileHasVersion1() ValidWriteIdList validWriteIdsList = new ValidWriteIdList(format("4$%s.%s:3:9223372036854775807::", table.getDatabaseName(), table.getTableName())); BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - HDFS_ENVIRONMENT, + fileSystemFactory, TupleDomain.all(), Optional.empty(), table, @@ -755,8 +740,6 @@ public void testVersionValidationOrcAcidVersionFileHasVersion1() .allMatch(Optional::isPresent) .extracting(Optional::get) .noneMatch(AcidInfo::isOrcAcidVersionValidated); - - deleteRecursively(tablePath, ALLOW_INSECURE); } @Test @@ -791,24 +774,22 @@ public void testValidateFileBuckets() @Test public void testBuildManifestFileIterator() + throws IOException { - CachingDirectoryLister directoryLister = new CachingDirectoryLister(new Duration(0, TimeUnit.MINUTES), DataSize.ofBytes(0), ImmutableList.of()); + CachingDirectoryLister directoryLister = new CachingDirectoryLister(new Duration(0, TimeUnit.MINUTES), DataSize.ofBytes(0), List.of()); Properties schema = new Properties(); - schema.setProperty(FILE_INPUT_FORMAT, SymlinkTextInputFormat.class.getName()); + schema.setProperty(FILE_INPUT_FORMAT, SYMLINK_TEXT_INPUT_FORMAT_CLASS); schema.setProperty(SERIALIZATION_LIB, AVRO.getSerde()); - Location firstFilePath = Location.of("hdfs://VOL1:9000/db_name/table_name/file1"); - Location secondFilePath = Location.of("hdfs://VOL1:9000/db_name/table_name/file2"); - List locations = ImmutableList.of(firstFilePath, secondFilePath); - List files = locations.stream() - .map(TestBackgroundHiveSplitLoader::locatedFileStatus) - .collect(toImmutableList()); + Location firstFilePath = Location.of("memory:///db_name/table_name/file1"); + Location secondFilePath = Location.of("memory:///db_name/table_name/file2"); + List locations = List.of(firstFilePath, secondFilePath); InternalHiveSplitFactory splitFactory = new InternalHiveSplitFactory( "partition", AVRO, schema, - ImmutableList.of(), + List.of(), TupleDomain.all(), () -> true, TableToPartitionMapping.empty(), @@ -818,11 +799,11 @@ public void testBuildManifestFileIterator() false, Optional.empty()); BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - files, + locations, directoryLister); Iterator splitIterator = backgroundHiveSplitLoader.buildManifestFileIterator( splitFactory, - Location.of("hdfs://VOL1:9000/db_name/table_name"), + Location.of(TABLE_PATH), locations, true); List splits = ImmutableList.copyOf(splitIterator); @@ -833,24 +814,22 @@ public void testBuildManifestFileIterator() @Test public void testBuildManifestFileIteratorNestedDirectory() + throws IOException { - CachingDirectoryLister directoryLister = new CachingDirectoryLister(new Duration(5, TimeUnit.MINUTES), DataSize.of(100, KILOBYTE), ImmutableList.of()); + CachingDirectoryLister directoryLister = new CachingDirectoryLister(new Duration(5, TimeUnit.MINUTES), DataSize.of(100, KILOBYTE), List.of()); Properties schema = new Properties(); - schema.setProperty(FILE_INPUT_FORMAT, SymlinkTextInputFormat.class.getName()); + schema.setProperty("file.inputformat", SYMLINK_TEXT_INPUT_FORMAT_CLASS); schema.setProperty(SERIALIZATION_LIB, AVRO.getSerde()); - Location filePath = Location.of("hdfs://VOL1:9000/db_name/table_name/file1"); - Location directoryPath = Location.of("hdfs://VOL1:9000/db_name/table_name/dir/file2"); - List locations = ImmutableList.of(filePath, directoryPath); - List files = ImmutableList.of( - locatedFileStatus(filePath), - 
locatedFileStatus(directoryPath)); + Location filePath = Location.of("memory:///db_name/table_name/file1"); + Location directoryPath = Location.of("memory:///db_name/table_name/dir/file2"); + List locations = List.of(filePath, directoryPath); InternalHiveSplitFactory splitFactory = new InternalHiveSplitFactory( "partition", AVRO, schema, - ImmutableList.of(), + List.of(), TupleDomain.all(), () -> true, TableToPartitionMapping.empty(), @@ -861,11 +840,11 @@ public void testBuildManifestFileIteratorNestedDirectory() Optional.empty()); BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - files, + locations, directoryLister); Iterator splitIterator = backgroundHiveSplitLoader.buildManifestFileIterator( splitFactory, - Location.of("hdfs://VOL1:9000/db_name/table_name"), + Location.of(TABLE_PATH), locations, false); List splits = ImmutableList.copyOf(splitIterator); @@ -878,12 +857,12 @@ public void testBuildManifestFileIteratorNestedDirectory() public void testMaxPartitions() throws Exception { - CachingDirectoryLister directoryLister = new CachingDirectoryLister(new Duration(0, TimeUnit.MINUTES), DataSize.ofBytes(0), ImmutableList.of()); + CachingDirectoryLister directoryLister = new CachingDirectoryLister(new Duration(0, TimeUnit.MINUTES), DataSize.ofBytes(0), List.of()); // zero partitions { BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - ImmutableList.of(), - ImmutableList.of(), + List.of(), + List.of(), directoryLister, 0); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); @@ -894,21 +873,21 @@ public void testMaxPartitions() // single partition, not crossing the limit { BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - ImmutableList.of(createPartitionMetadata()), - TEST_FILES, + List.of(createPartitionMetadata()), + TEST_LOCATIONS, directoryLister, 1); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); backgroundHiveSplitLoader.start(hiveSplitSource); - assertThat(drainSplits(hiveSplitSource)).hasSize(TEST_FILES.size()); + assertThat(drainSplits(hiveSplitSource)).hasSize(TEST_LOCATIONS.size()); } // single partition, crossing the limit { int partitionLimit = 0; BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - ImmutableList.of(createPartitionMetadata()), - TEST_FILES, + List.of(createPartitionMetadata()), + TEST_LOCATIONS, directoryLister, partitionLimit); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); @@ -924,23 +903,23 @@ public void testMaxPartitions() // multiple partitions, not crossing the limit { int partitionLimit = 3; - List partitions = ImmutableList.of(createPartitionMetadata(), createPartitionMetadata(), createPartitionMetadata()); + List partitions = List.of(createPartitionMetadata(), createPartitionMetadata(), createPartitionMetadata()); BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( partitions, - TEST_FILES, + TEST_LOCATIONS, directoryLister, partitionLimit); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); backgroundHiveSplitLoader.start(hiveSplitSource); - assertThat(drainSplits(hiveSplitSource)).hasSize(TEST_FILES.size() * partitions.size()); + assertThat(drainSplits(hiveSplitSource)).hasSize(TEST_LOCATIONS.size() * partitions.size()); } // multiple partitions, crossing the limit { int partitionLimit = 3; BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader( - 
ImmutableList.of(createPartitionMetadata(), createPartitionMetadata(), createPartitionMetadata(), createPartitionMetadata()), - TEST_FILES, + List.of(createPartitionMetadata(), createPartitionMetadata(), createPartitionMetadata(), createPartitionMetadata()), + TEST_LOCATIONS, directoryLister, partitionLimit); HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader); @@ -962,20 +941,22 @@ private static HivePartitionMetadata createPartitionMetadata() TableToPartitionMapping.empty()); } - private static void createOrcAcidFile(File file) + private static void createOrcAcidFile(TrinoFileSystem fileSystem, Location location) throws IOException { - createOrcAcidFile(file, 2); + createOrcAcidFile(fileSystem, location, 2); } - private static void createOrcAcidFile(File file, int orcAcidVersion) + private static void createOrcAcidFile(TrinoFileSystem fileSystem, Location location, int orcAcidVersion) throws IOException { - if (file.getName().equals("_orc_acid_version")) { - Files.write(file.toPath(), String.valueOf(orcAcidVersion).getBytes(UTF_8)); - return; + try (OutputStream outputStream = fileSystem.newOutputFile(location).create()) { + if (location.fileName().equals("_orc_acid_version")) { + outputStream.write(String.valueOf(orcAcidVersion).getBytes(UTF_8)); + return; + } + Resources.copy(getResource("fullacidNationTableWithOriginalFiles/000000_0"), outputStream); } - Files.copy(getResource("fullacidNationTableWithOriginalFiles/000000_0").openStream(), file.toPath()); } private static List drain(HiveSplitSource source) @@ -983,7 +964,7 @@ private static List drain(HiveSplitSource source) { return drainSplits(source).stream() .map(HiveSplit::getPath) - .collect(toImmutableList()); + .toList(); } private static List drainSplits(HiveSplitSource source) @@ -1009,9 +990,11 @@ private static List drainSplits(HiveSplitSource source) private BackgroundHiveSplitLoader backgroundHiveSplitLoader( DynamicFilter dynamicFilter, Duration dynamicFilteringProbeBlockingTimeoutMillis) + throws IOException { + TrinoFileSystemFactory fileSystemFactory = createTestingFileSystem(TEST_LOCATIONS); return backgroundHiveSplitLoader( - new TestingHdfsEnvironment(TEST_FILES), + fileSystemFactory, TupleDomain.all(), dynamicFilter, dynamicFilteringProbeBlockingTimeoutMillis, @@ -1021,12 +1004,26 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader( Optional.empty()); } + private static TrinoFileSystemFactory createTestingFileSystem(Collection locations) + throws IOException + { + TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + TrinoFileSystem fileSystem = fileSystemFactory.create(ConnectorIdentity.ofUser("test")); + for (Location location : locations) { + try (OutputStream outputStream = fileSystem.newOutputFile(location).create()) { + outputStream.write(new byte[10]); + } + } + return fileSystemFactory; + } + private BackgroundHiveSplitLoader backgroundHiveSplitLoader( - List files, + List locations, TupleDomain tupleDomain) + throws IOException { return backgroundHiveSplitLoader( - files, + locations, tupleDomain, Optional.empty(), SIMPLE_TABLE, @@ -1034,14 +1031,15 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader( } private BackgroundHiveSplitLoader backgroundHiveSplitLoader( - List files, + List locations, TupleDomain compactEffectivePredicate, Optional hiveBucketFilter, Table table, Optional bucketHandle) + throws IOException { return backgroundHiveSplitLoader( - files, + locations, compactEffectivePredicate, hiveBucketFilter, table, @@ -1050,15 
+1048,17 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader( } private BackgroundHiveSplitLoader backgroundHiveSplitLoader( - List files, + List locations, TupleDomain compactEffectivePredicate, Optional hiveBucketFilter, Table table, Optional bucketHandle, Optional validWriteIds) + throws IOException { + TrinoFileSystemFactory fileSystemFactory = createTestingFileSystem(locations); return backgroundHiveSplitLoader( - new TestingHdfsEnvironment(files), + fileSystemFactory, compactEffectivePredicate, hiveBucketFilter, table, @@ -1067,7 +1067,7 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader( } private BackgroundHiveSplitLoader backgroundHiveSplitLoader( - HdfsEnvironment hdfsEnvironment, + TrinoFileSystemFactory fileSystemFactory, TupleDomain compactEffectivePredicate, Optional hiveBucketFilter, Table table, @@ -1075,7 +1075,7 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader( Optional validWriteIds) { return backgroundHiveSplitLoader( - hdfsEnvironment, + fileSystemFactory, compactEffectivePredicate, DynamicFilter.EMPTY, new Duration(0, SECONDS), @@ -1086,7 +1086,7 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader( } private BackgroundHiveSplitLoader backgroundHiveSplitLoader( - HdfsEnvironment hdfsEnvironment, + TrinoFileSystemFactory fileSystemFactory, TupleDomain compactEffectivePredicate, DynamicFilter dynamicFilter, Duration dynamicFilteringProbeBlockingTimeout, @@ -1096,7 +1096,7 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader( Optional validWriteIds) { List hivePartitionMetadatas = - ImmutableList.of( + List.of( new HivePartitionMetadata( new HivePartition(new SchemaTableName("testSchema", "table_name")), Optional.empty(), @@ -1111,7 +1111,7 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader( TESTING_TYPE_MANAGER, createBucketSplitInfo(bucketHandle, hiveBucketFilter), SESSION, - new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS), + fileSystemFactory, new CachingDirectoryLister(new HiveConfig()), executor, 2, @@ -1123,27 +1123,29 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader( } private BackgroundHiveSplitLoader backgroundHiveSplitLoader( - List files, + List locations, DirectoryLister directoryLister) + throws IOException { - List partitions = ImmutableList.of( + List partitions = List.of( new HivePartitionMetadata( new HivePartition(new SchemaTableName("testSchema", "table_name")), Optional.empty(), TableToPartitionMapping.empty())); - return backgroundHiveSplitLoader(partitions, files, directoryLister, 100); + return backgroundHiveSplitLoader(partitions, locations, directoryLister, 100); } private BackgroundHiveSplitLoader backgroundHiveSplitLoader( List partitions, - List files, + List locations, DirectoryLister directoryLister, int maxPartitions) + throws IOException { ConnectorSession connectorSession = getHiveSession(new HiveConfig() .setMaxSplitSize(DataSize.of(1, GIGABYTE))); - HdfsEnvironment hdfsEnvironment = new TestingHdfsEnvironment(files); + TrinoFileSystemFactory fileSystemFactory = createTestingFileSystem(locations); return new BackgroundHiveSplitLoader( SIMPLE_TABLE, partitions.iterator(), @@ -1153,7 +1155,7 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader( TESTING_TYPE_MANAGER, Optional.empty(), connectorSession, - new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS), + fileSystemFactory, directoryLister, executor, 2, @@ -1165,11 +1167,12 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader( } private 
BackgroundHiveSplitLoader backgroundHiveSplitLoaderOfflinePartitions() + throws IOException { ConnectorSession connectorSession = getHiveSession(new HiveConfig() .setMaxSplitSize(DataSize.of(1, GIGABYTE))); - HdfsEnvironment hdfsEnvironment = new TestingHdfsEnvironment(TEST_FILES); + TrinoFileSystemFactory fileSystemFactory = createTestingFileSystem(TEST_LOCATIONS); return new BackgroundHiveSplitLoader( SIMPLE_TABLE, createPartitionMetadataWithOfflinePartitions(), @@ -1179,7 +1182,7 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoaderOfflinePartitions() TESTING_TYPE_MANAGER, createBucketSplitInfo(Optional.empty(), Optional.empty()), connectorSession, - new HdfsFileSystemFactory(hdfsEnvironment, HDFS_FILE_SYSTEM_STATS), + fileSystemFactory, new CachingDirectoryLister(new HiveConfig()), executor, 2, @@ -1203,17 +1206,11 @@ private static Iterator createPartitionMetadataWithOfflin protected HivePartitionMetadata computeNext() { position++; - switch (position) { - case 0: - return new HivePartitionMetadata( - new HivePartition(new SchemaTableName("testSchema", "table_name")), - Optional.empty(), - TableToPartitionMapping.empty()); - case 1: - throw new RuntimeException("OFFLINE"); - default: - return endOfData(); - } + return switch (position) { + case 0 -> new HivePartitionMetadata(new HivePartition(new SchemaTableName("testSchema", "table_name")), Optional.empty(), TableToPartitionMapping.empty()); + case 1 -> throw new RuntimeException("OFFLINE"); + default -> endOfData(); + }; } }; } @@ -1234,41 +1231,17 @@ private HiveSplitSource hiveSplitSource(HiveSplitLoader hiveSplitLoader) false); } - private static Table table( - List partitionColumns, - Optional bucketProperty, - ImmutableMap tableParameters) - { - return table(partitionColumns, - bucketProperty, - tableParameters, - StorageFormat.create(ORC.getSerde(), ORC.getInputFormat(), ORC.getOutputFormat())); - } - private static Table table( String location, List partitionColumns, Optional bucketProperty, - ImmutableMap tableParameters) + Map tableParameters) { return table(location, partitionColumns, bucketProperty, tableParameters, - StorageFormat.create(ORC.getSerde(), ORC.getInputFormat(), ORC.getOutputFormat())); - } - - private static Table table( - List partitionColumns, - Optional bucketProperty, - Map tableParameters, - StorageFormat storageFormat) - { - return table("hdfs://VOL1:9000/db_name/table_name", - partitionColumns, - bucketProperty, - tableParameters, - storageFormat); + StorageFormat.fromHiveStorageFormat(ORC)); } private static Table table( @@ -1290,192 +1263,106 @@ private static Table table( .setOwner(Optional.of("testOwner")) .setTableName("test_table") .setTableType(MANAGED_TABLE.name()) - .setDataColumns(ImmutableList.of(new Column("col1", HIVE_STRING, Optional.empty()))) + .setDataColumns(List.of(new Column("col1", HIVE_STRING, Optional.empty(), Map.of()))) .setParameters(tableParameters) .setPartitionColumns(partitionColumns) .build(); } - private static LocatedFileStatus locatedFileStatus(Location location) - { - return locatedFileStatus(new Path(location.toString()), 10); - } - - private static LocatedFileStatus locatedFileStatus(Path path) - { - return locatedFileStatus(path, 10); - } - - private static LocatedFileStatus locatedFileStatus(Path path, long fileLength) - { - return new LocatedFileStatus( - fileLength, - false, - 0, - 0L, - 0L, - 0L, - null, - null, - null, - null, - path, - new BlockLocation[] {new BlockLocation(new String[1], new String[] {"localhost"}, 0, fileLength)}); - } - - 
private static LocatedFileStatus locatedFileStatusWithNoBlocks(Path path) - { - return new LocatedFileStatus( - 0L, - false, - 0, - 0L, - 0L, - 0L, - null, - null, - null, - null, - path, - new BlockLocation[] {}); - } - - public static class TestingHdfsEnvironment - extends HdfsEnvironment - { - private final List files; - - public TestingHdfsEnvironment(List files) - { - super( - new DynamicHdfsConfiguration( - new HdfsConfigurationInitializer(new HdfsConfig()), - ImmutableSet.of()), - new HdfsConfig(), - new NoHdfsAuthentication()); - this.files = ImmutableList.copyOf(files); - } - - @Override - public FileSystem getFileSystem(ConnectorIdentity identity, Path path, Configuration configuration) - { - return new TestingHdfsFileSystem(files); - } - } - - private static class TestingHdfsFileSystem - extends FileSystem + private record ListSingleFileFileSystemFactory(FileEntry fileEntry) + implements TrinoFileSystemFactory { - private final List files; - - public TestingHdfsFileSystem(List files) - { - this.files = ImmutableList.copyOf(files); - } - - @Override - public boolean delete(Path f, boolean recursive) - { - throw new UnsupportedOperationException(); - } - @Override - public boolean rename(Path src, Path dst) + public TrinoFileSystem create(ConnectorIdentity identity) { - throw new UnsupportedOperationException(); - } + return new TrinoFileSystem() + { + @Override + public Optional directoryExists(Location location) + { + return Optional.empty(); + } - @Override - public void setWorkingDirectory(Path dir) - { - throw new UnsupportedOperationException(); - } + @Override + public FileIterator listFiles(Location location) + { + Iterator iterator = List.of(fileEntry).iterator(); + return new FileIterator() + { + @Override + public boolean hasNext() + { + return iterator.hasNext(); + } - @Override - public FileStatus[] listStatus(Path f) - { - FileStatus[] fileStatuses = new FileStatus[files.size()]; - for (int i = 0; i < files.size(); i++) { - LocatedFileStatus locatedFileStatus = files.get(i); - fileStatuses[i] = new FileStatus( - locatedFileStatus.getLen(), - locatedFileStatus.isDirectory(), - locatedFileStatus.getReplication(), - locatedFileStatus.getBlockSize(), - locatedFileStatus.getModificationTime(), - locatedFileStatus.getPath()); - } - return fileStatuses; - } + @Override + public FileEntry next() + { + return iterator.next(); + } + }; + } - @Override - public RemoteIterator listLocatedStatus(Path f) - { - return new RemoteIterator<>() - { - private final Iterator iterator = files.iterator(); + @Override + public TrinoInputFile newInputFile(Location location) + { + throw new UnsupportedOperationException(); + } @Override - public boolean hasNext() + public TrinoInputFile newInputFile(Location location, long length) { - return iterator.hasNext(); + throw new UnsupportedOperationException(); } @Override - public LocatedFileStatus next() + public TrinoOutputFile newOutputFile(Location location) { - return iterator.next(); + throw new UnsupportedOperationException(); } - }; - } - @Override - public FSDataOutputStream create( - Path f, - FsPermission permission, - boolean overwrite, - int bufferSize, - short replication, - long blockSize, - Progressable progress) - { - throw new UnsupportedOperationException(); - } + @Override + public void deleteFile(Location location) + { + throw new UnsupportedOperationException(); + } - @Override - public boolean mkdirs(Path f, FsPermission permission) - { - throw new UnsupportedOperationException(); - } + @Override + public void 
deleteDirectory(Location location) + { + throw new UnsupportedOperationException(); + } - @Override - public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) - { - throw new UnsupportedOperationException(); - } + @Override + public void renameFile(Location source, Location target) + { + throw new UnsupportedOperationException(); + } - @Override - public FSDataInputStream open(Path f, int bufferSize) - { - throw new UnsupportedOperationException(); - } + @Override + public void createDirectory(Location location) + { + throw new UnsupportedOperationException(); + } - @Override - public FileStatus getFileStatus(Path f) - { - throw new UnsupportedOperationException(); - } + @Override + public void renameDirectory(Location source, Location target) + { + throw new UnsupportedOperationException(); + } - @Override - public Path getWorkingDirectory() - { - return new Path(getUri()); - } + @Override + public Set listDirectories(Location location) + { + throw new UnsupportedOperationException(); + } - @Override - public URI getUri() - { - return URI.create("hdfs://VOL1:9000/"); + @Override + public Optional createTemporaryDirectory(Location targetPath, String temporaryPrefix, String relativePrefix) + { + throw new UnsupportedOperationException(); + } + }; } } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java index d48733a3237f..2c4d60a3da93 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java @@ -18,7 +18,11 @@ import com.google.common.collect.ImmutableMap; import io.airlift.json.JsonCodec; import io.airlift.slice.Slices; +import io.trino.filesystem.FileEntry; +import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.memory.MemoryFileSystemFactory; import io.trino.operator.GroupByHashPageIndexerFactory; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.HiveMetastoreFactory; @@ -33,6 +37,7 @@ import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.type.Type; import io.trino.spi.type.TypeOperators; import io.trino.sql.gen.JoinCompiler; @@ -45,8 +50,7 @@ import io.trino.tpch.TpchColumnTypes; import org.junit.jupiter.api.Test; -import java.io.File; -import java.nio.file.Files; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -55,27 +59,19 @@ import java.util.stream.Stream; import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.collect.Iterables.getOnlyElement; -import static com.google.common.io.MoreFiles.deleteRecursively; -import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.testing.Assertions.assertGreaterThan; +import static io.trino.hive.thrift.metastore.hive_metastoreConstants.FILE_INPUT_FORMAT; import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn; import static io.trino.plugin.hive.HiveCompressionOption.LZ4; import static 
io.trino.plugin.hive.HiveCompressionOption.NONE; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; import static io.trino.plugin.hive.HiveTestUtils.PAGE_SORTER; import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveFileWriterFactories; import static io.trino.plugin.hive.HiveTestUtils.getDefaultHivePageSourceFactories; import static io.trino.plugin.hive.HiveTestUtils.getHiveSession; import static io.trino.plugin.hive.HiveTestUtils.getHiveSessionProperties; -import static io.trino.plugin.hive.HiveType.HIVE_DATE; -import static io.trino.plugin.hive.HiveType.HIVE_DOUBLE; -import static io.trino.plugin.hive.HiveType.HIVE_INT; -import static io.trino.plugin.hive.HiveType.HIVE_LONG; -import static io.trino.plugin.hive.HiveType.HIVE_STRING; import static io.trino.plugin.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_NEW_DIRECTORY; import static io.trino.plugin.hive.acid.AcidTransaction.NO_ACID_TRANSACTION; import static io.trino.plugin.hive.metastore.file.TestingFileHiveMetastore.createTestingFileHiveMetastore; @@ -84,13 +80,13 @@ import static io.trino.spi.type.DateType.DATE; import static io.trino.spi.type.DoubleType.DOUBLE; import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; import static io.trino.testing.TestingPageSinkId.TESTING_PAGE_SINK_ID; import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER; import static java.lang.Math.round; import static java.lang.String.format; import static java.util.stream.Collectors.toList; -import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; @@ -103,55 +99,51 @@ public class TestHivePageSink private static final String TABLE_NAME = "test"; @Test - public void testAllFormats() + void testAllFormats() throws Exception { HiveConfig config = new HiveConfig(); SortingFileWriterConfig sortingFileWriterConfig = new SortingFileWriterConfig(); - File tempDir = Files.createTempDirectory(null).toFile(); - try { - HiveMetastore metastore = createTestingFileHiveMetastore(new File(tempDir, "metastore")); - for (HiveStorageFormat format : HiveStorageFormat.values()) { - if (format == HiveStorageFormat.CSV) { - // CSV supports only unbounded VARCHAR type, which is not provided by lineitem + + TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + HiveMetastore metastore = createTestingFileHiveMetastore(fileSystemFactory, Location.of("memory:///metastore")); + for (HiveStorageFormat format : HiveStorageFormat.values()) { + if (format == HiveStorageFormat.CSV) { + // CSV supports only the unbounded VARCHAR type, which is not provided by lineitem + continue; + } + if (format == HiveStorageFormat.REGEX) { + // REGEX format is readonly + continue; + } + config.setHiveStorageFormat(format); + config.setHiveCompressionCodec(NONE); + long uncompressedLength = writeTestFile(fileSystemFactory, config, sortingFileWriterConfig, metastore, makeFileName(config)); + assertGreaterThan(uncompressedLength, 0L); + + for (HiveCompressionOption codec : HiveCompressionOption.values()) { + if (codec == NONE) { continue; } - if (format == HiveStorageFormat.REGEX) { - // REGEX format is readonly + if ((format == 
HiveStorageFormat.PARQUET) && (codec == LZ4)) { + // TODO (https://github.com/trinodb/trino/issues/9142) LZ4 is not supported with native Parquet writer continue; } - config.setHiveStorageFormat(format); - config.setHiveCompressionCodec(NONE); - long uncompressedLength = writeTestFile(config, sortingFileWriterConfig, metastore, makeFileName(tempDir, config)); - assertGreaterThan(uncompressedLength, 0L); + config.setHiveCompressionCodec(codec); - for (HiveCompressionOption codec : HiveCompressionOption.values()) { - if (codec == NONE) { - continue; - } - if ((format == HiveStorageFormat.PARQUET) && (codec == LZ4)) { - // TODO (https://github.com/trinodb/trino/issues/9142) LZ4 is not supported with native Parquet writer - continue; - } - config.setHiveCompressionCodec(codec); - - if (!isSupportedCodec(format, codec)) { - assertThatThrownBy(() -> writeTestFile(config, sortingFileWriterConfig, metastore, makeFileName(tempDir, config))) - .hasMessage("Compression codec " + codec + " not supported for " + format); - continue; - } - - long length = writeTestFile(config, sortingFileWriterConfig, metastore, makeFileName(tempDir, config)); - assertTrue(uncompressedLength > length, format("%s with %s compressed to %s which is not less than %s", format, codec, length, uncompressedLength)); + if (!isSupportedCodec(format, codec)) { + assertThatThrownBy(() -> writeTestFile(fileSystemFactory, config, sortingFileWriterConfig, metastore, makeFileName(config))) + .hasMessage("Compression codec " + codec + " not supported for " + format); + continue; } + + long length = writeTestFile(fileSystemFactory, config, sortingFileWriterConfig, metastore, makeFileName(config)); + assertTrue(uncompressedLength > length, format("%s with %s compressed to %s which is not less than %s", format, codec, length, uncompressedLength)); } } - finally { - deleteRecursively(tempDir.toPath(), ALLOW_INSECURE); - } } - private boolean isSupportedCodec(HiveStorageFormat storageFormat, HiveCompressionOption compressionOption) + private static boolean isSupportedCodec(HiveStorageFormat storageFormat, HiveCompressionOption compressionOption) { if (storageFormat == HiveStorageFormat.AVRO && compressionOption == LZ4) { return false; @@ -159,20 +151,21 @@ private boolean isSupportedCodec(HiveStorageFormat storageFormat, HiveCompressio return true; } - private static String makeFileName(File tempDir, HiveConfig config) + private static Location makeFileName(HiveConfig config) { - return tempDir.getAbsolutePath() + "/" + config.getHiveStorageFormat().name() + "." + config.getHiveCompressionCodec().name(); + return Location.of("memory:///" + config.getHiveStorageFormat().name() + "." 
+ config.getHiveCompressionCodec().name()); } - private static long writeTestFile(HiveConfig config, SortingFileWriterConfig sortingFileWriterConfig, HiveMetastore metastore, String outputPath) + private static long writeTestFile(TrinoFileSystemFactory fileSystemFactory, HiveConfig config, SortingFileWriterConfig sortingFileWriterConfig, HiveMetastore metastore, Location location) + throws IOException { HiveTransactionHandle transaction = new HiveTransactionHandle(false); HiveWriterStats stats = new HiveWriterStats(); - ConnectorPageSink pageSink = createPageSink(transaction, config, sortingFileWriterConfig, metastore, Location.of("file:///" + outputPath), stats); + ConnectorPageSink pageSink = createPageSink(fileSystemFactory, transaction, config, sortingFileWriterConfig, metastore, location, stats); List columns = getTestColumns(); List columnTypes = columns.stream() .map(LineItemColumn::getType) - .map(TestHivePageSink::getHiveType) + .map(TestHivePageSink::getType) .map(hiveType -> TESTING_TYPE_MANAGER.getType(hiveType.getTypeSignature())) .collect(toList()); @@ -212,28 +205,28 @@ private static long writeTestFile(HiveConfig config, SortingFileWriterConfig sor pageSink.appendPage(page); getFutureValue(pageSink.finish()); - File outputDir = new File(outputPath); - List files = ImmutableList.copyOf(outputDir.listFiles((dir, name) -> !name.endsWith(".crc"))); - File outputFile = getOnlyElement(files); - long length = outputFile.length(); - - ConnectorPageSource pageSource = createPageSource(transaction, config, outputFile); + FileIterator fileIterator = fileSystemFactory.create(ConnectorIdentity.ofUser("test")).listFiles(location); + FileEntry fileEntry = fileIterator.next(); + assertThat(fileIterator.hasNext()).isFalse(); List pages = new ArrayList<>(); - while (!pageSource.isFinished()) { - Page nextPage = pageSource.getNextPage(); - if (nextPage != null) { - pages.add(nextPage.getLoadedPage()); + try (ConnectorPageSource pageSource = createPageSource(fileSystemFactory, transaction, config, fileEntry.location())) { + while (!pageSource.isFinished()) { + Page nextPage = pageSource.getNextPage(); + if (nextPage != null) { + pages.add(nextPage.getLoadedPage()); + } } } + MaterializedResult expectedResults = toMaterializedResult(getHiveSession(config), columnTypes, ImmutableList.of(page)); MaterializedResult results = toMaterializedResult(getHiveSession(config), columnTypes, pages); assertThat(results).containsExactlyElementsOf(expectedResults); assertEquals(round(stats.getInputPageSizeInBytes().getAllTime().getMax()), page.getRetainedSizeInBytes()); - return length; + return fileEntry.length(); } - public static MaterializedResult toMaterializedResult(ConnectorSession session, List types, List pages) + static MaterializedResult toMaterializedResult(ConnectorSession session, List types, List pages) { // materialize pages MaterializedResult.Builder resultBuilder = MaterializedResult.resultBuilder(session, types); @@ -243,8 +236,10 @@ public static MaterializedResult toMaterializedResult(ConnectorSession session, return resultBuilder.build(); } - private static ConnectorPageSource createPageSource(HiveTransactionHandle transaction, HiveConfig config, File outputFile) + private static ConnectorPageSource createPageSource(TrinoFileSystemFactory fileSystemFactory, HiveTransactionHandle transaction, HiveConfig config, Location location) + throws IOException { + long length = fileSystemFactory.create(ConnectorIdentity.ofUser("test")).newInputFile(location).length(); Properties splitProperties = 
new Properties(); splitProperties.setProperty(FILE_INPUT_FORMAT, config.getHiveStorageFormat().getInputFormat()); splitProperties.setProperty(SERIALIZATION_LIB, config.getHiveStorageFormat().getSerde()); @@ -252,11 +247,11 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa splitProperties.setProperty("columns.types", Joiner.on(',').join(getColumnHandles().stream().map(HiveColumnHandle::getHiveType).map(hiveType -> hiveType.getHiveTypeName().toString()).collect(toImmutableList()))); HiveSplit split = new HiveSplit( "", - "file:///" + outputFile.getAbsolutePath(), + location.toString(), + 0, + length, + length, 0, - outputFile.length(), - outputFile.length(), - outputFile.lastModified(), splitProperties, ImmutableList.of(), ImmutableList.of(), @@ -272,13 +267,13 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa HivePageSourceProvider provider = new HivePageSourceProvider( TESTING_TYPE_MANAGER, config, - getDefaultHivePageSourceFactories(HDFS_ENVIRONMENT, config)); + getDefaultHivePageSourceFactories(fileSystemFactory, config)); return provider.createPageSource(transaction, getHiveSession(config), split, table, ImmutableList.copyOf(getColumnHandles()), DynamicFilter.EMPTY); } - private static ConnectorPageSink createPageSink(HiveTransactionHandle transaction, HiveConfig config, SortingFileWriterConfig sortingFileWriterConfig, HiveMetastore metastore, Location outputPath, HiveWriterStats stats) + private static ConnectorPageSink createPageSink(TrinoFileSystemFactory fileSystemFactory, HiveTransactionHandle transaction, HiveConfig config, SortingFileWriterConfig sortingFileWriterConfig, HiveMetastore metastore, Location location, HiveWriterStats stats) { - LocationHandle locationHandle = new LocationHandle(outputPath, outputPath, DIRECT_TO_TARGET_NEW_DIRECTORY); + LocationHandle locationHandle = new LocationHandle(location, location, DIRECT_TO_TARGET_NEW_DIRECTORY); HiveOutputTableHandle handle = new HiveOutputTableHandle( SCHEMA_NAME, TABLE_NAME, @@ -296,7 +291,7 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transactio false); JsonCodec partitionUpdateCodec = JsonCodec.jsonCodec(PartitionUpdate.class); HivePageSinkProvider provider = new HivePageSinkProvider( - getDefaultHiveFileWriterFactories(config, HDFS_ENVIRONMENT), + getDefaultHiveFileWriterFactories(config, fileSystemFactory), HDFS_FILE_SYSTEM_FACTORY, PAGE_SORTER, HiveMetastoreFactory.ofInstance(metastore), @@ -319,8 +314,8 @@ private static List getColumnHandles() List columns = getTestColumns(); for (int i = 0; i < columns.size(); i++) { LineItemColumn column = columns.get(i); - HiveType hiveType = getHiveType(column.getType()); - handles.add(createBaseColumn(column.getColumnName(), i, hiveType, TESTING_TYPE_MANAGER.getType(hiveType.getTypeSignature()), REGULAR, Optional.empty())); + Type type = getType(column.getType()); + handles.add(createBaseColumn(column.getColumnName(), i, HiveType.toHiveType(type), type, REGULAR, Optional.empty())); } return handles.build(); } @@ -333,20 +328,14 @@ private static List getTestColumns() .collect(toList()); } - private static HiveType getHiveType(TpchColumnType type) + private static Type getType(TpchColumnType type) { - switch (type.getBase()) { - case IDENTIFIER: - return HIVE_LONG; - case INTEGER: - return HIVE_INT; - case DATE: - return HIVE_DATE; - case DOUBLE: - return HIVE_DOUBLE; - case VARCHAR: - return HIVE_STRING; - } - throw new UnsupportedOperationException(); + return switch 
(type.getBase()) { + case IDENTIFIER -> BIGINT; + case INTEGER -> INTEGER; + case DATE -> DATE; + case DOUBLE -> DOUBLE; + case VARCHAR -> VARCHAR; + }; } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNodeLocalDynamicSplitPruning.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNodeLocalDynamicSplitPruning.java index b13bd0a4b9c9..1122c6ccbff2 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNodeLocalDynamicSplitPruning.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestNodeLocalDynamicSplitPruning.java @@ -16,7 +16,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import io.airlift.testing.TempFile; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.memory.MemoryFileSystemFactory; import io.trino.metadata.TableHandle; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.orc.OrcReaderConfig; @@ -30,10 +32,10 @@ import io.trino.spi.connector.EmptyPageSource; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.security.ConnectorIdentity; import io.trino.testing.TestingConnectorSession; import org.junit.jupiter.api.Test; -import java.io.File; import java.io.IOException; import java.util.Map; import java.util.Optional; @@ -44,7 +46,6 @@ import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.PARTITION_KEY; import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.plugin.hive.HiveTestUtils.getDefaultHivePageSourceFactories; import static io.trino.plugin.hive.HiveType.HIVE_INT; import static io.trino.plugin.hive.util.HiveBucketing.BucketingVersion.BUCKETING_V1; @@ -56,17 +57,17 @@ import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT; import static org.testng.Assert.assertEquals; -public class TestNodeLocalDynamicSplitPruning +class TestNodeLocalDynamicSplitPruning { private static final String SCHEMA_NAME = "test"; private static final String TABLE_NAME = "test"; - private static final Column BUCKET_COLUMN = new Column("l_orderkey", HIVE_INT, Optional.empty()); - private static final Column PARTITION_COLUMN = new Column("l_partkey", HIVE_INT, Optional.empty()); + private static final Column BUCKET_COLUMN = new Column("l_orderkey", HIVE_INT, Optional.empty(), ImmutableMap.of()); + private static final Column PARTITION_COLUMN = new Column("l_partkey", HIVE_INT, Optional.empty(), ImmutableMap.of()); private static final HiveColumnHandle BUCKET_HIVE_COLUMN_HANDLE = new HiveColumnHandle( BUCKET_COLUMN.getName(), 0, BUCKET_COLUMN.getType(), - TESTING_TYPE_MANAGER.getType(BUCKET_COLUMN.getType().getTypeSignature()), + INTEGER, Optional.empty(), REGULAR, Optional.empty()); @@ -74,57 +75,59 @@ public class TestNodeLocalDynamicSplitPruning PARTITION_COLUMN.getName(), 0, PARTITION_COLUMN.getType(), - TESTING_TYPE_MANAGER.getType(PARTITION_COLUMN.getType().getTypeSignature()), + INTEGER, Optional.empty(), PARTITION_KEY, Optional.empty()); @Test - public void testDynamicBucketPruning() + void testDynamicBucketPruning() throws IOException { HiveConfig config = new HiveConfig(); HiveTransactionHandle transaction = new HiveTransactionHandle(false); - try (TempFile tempFile = new TempFile()) { - try (ConnectorPageSource emptyPageSource = 
createTestingPageSource(transaction, config, tempFile.file(), getDynamicFilter(getTupleDomainForBucketSplitPruning()))) { - assertEquals(emptyPageSource.getClass(), EmptyPageSource.class); - } + try (ConnectorPageSource emptyPageSource = createTestingPageSource(transaction, config, getDynamicFilter(getTupleDomainForBucketSplitPruning()))) { + assertEquals(emptyPageSource.getClass(), EmptyPageSource.class); + } - try (ConnectorPageSource nonEmptyPageSource = createTestingPageSource(transaction, config, tempFile.file(), getDynamicFilter(getNonSelectiveBucketTupleDomain()))) { - assertEquals(nonEmptyPageSource.getClass(), HivePageSource.class); - } + try (ConnectorPageSource nonEmptyPageSource = createTestingPageSource(transaction, config, getDynamicFilter(getNonSelectiveBucketTupleDomain()))) { + assertEquals(nonEmptyPageSource.getClass(), HivePageSource.class); } } @Test - public void testDynamicPartitionPruning() + void testDynamicPartitionPruning() throws IOException { HiveConfig config = new HiveConfig(); HiveTransactionHandle transaction = new HiveTransactionHandle(false); - try (TempFile tempFile = new TempFile()) { - try (ConnectorPageSource emptyPageSource = createTestingPageSource(transaction, config, tempFile.file(), getDynamicFilter(getTupleDomainForPartitionSplitPruning()))) { - assertEquals(emptyPageSource.getClass(), EmptyPageSource.class); - } - try (ConnectorPageSource nonEmptyPageSource = createTestingPageSource(transaction, config, tempFile.file(), getDynamicFilter(getNonSelectivePartitionTupleDomain()))) { - assertEquals(nonEmptyPageSource.getClass(), HivePageSource.class); - } + try (ConnectorPageSource emptyPageSource = createTestingPageSource(transaction, config, getDynamicFilter(getTupleDomainForPartitionSplitPruning()))) { + assertEquals(emptyPageSource.getClass(), EmptyPageSource.class); + } + + try (ConnectorPageSource nonEmptyPageSource = createTestingPageSource(transaction, config, getDynamicFilter(getNonSelectivePartitionTupleDomain()))) { + assertEquals(nonEmptyPageSource.getClass(), HivePageSource.class); } } - private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle transaction, HiveConfig hiveConfig, File outputFile, DynamicFilter dynamicFilter) + private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle transaction, HiveConfig hiveConfig, DynamicFilter dynamicFilter) + throws IOException { + Location location = Location.of("memory:///file"); + TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + fileSystemFactory.create(ConnectorIdentity.ofUser("test")).newOutputFile(location).create().close(); + Properties splitProperties = new Properties(); splitProperties.setProperty(FILE_INPUT_FORMAT, hiveConfig.getHiveStorageFormat().getInputFormat()); splitProperties.setProperty(SERIALIZATION_LIB, hiveConfig.getHiveStorageFormat().getSerde()); HiveSplit split = new HiveSplit( "", - "file:///" + outputFile.getAbsolutePath(), + location.toString(), + 0, + 0, + 0, 0, - outputFile.length(), - outputFile.length(), - outputFile.lastModified(), splitProperties, ImmutableList.of(new HivePartitionKey(PARTITION_COLUMN.getName(), "42")), ImmutableList.of(), @@ -156,7 +159,7 @@ private static ConnectorPageSource createTestingPageSource(HiveTransactionHandle HivePageSourceProvider provider = new HivePageSourceProvider( TESTING_TYPE_MANAGER, hiveConfig, - getDefaultHivePageSourceFactories(HDFS_ENVIRONMENT, hiveConfig)); + getDefaultHivePageSourceFactories(fileSystemFactory, hiveConfig)); return 
provider.createPageSource( transaction, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/file/TestingFileHiveMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/file/TestingFileHiveMetastore.java index 140e1adf9c1e..e97982ff7484 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/file/TestingFileHiveMetastore.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/file/TestingFileHiveMetastore.java @@ -13,6 +13,8 @@ */ package io.trino.plugin.hive.metastore.file; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.hive.NodeVersion; import io.trino.plugin.hive.metastore.HiveMetastoreConfig; @@ -25,13 +27,18 @@ public final class TestingFileHiveMetastore private TestingFileHiveMetastore() {} public static FileHiveMetastore createTestingFileHiveMetastore(File catalogDirectory) + { + return createTestingFileHiveMetastore(HDFS_FILE_SYSTEM_FACTORY, Location.of(catalogDirectory.toURI().toString())); + } + + public static FileHiveMetastore createTestingFileHiveMetastore(TrinoFileSystemFactory fileSystemFactory, Location catalogDirectory) { return new FileHiveMetastore( new NodeVersion("testversion"), - HDFS_FILE_SYSTEM_FACTORY, + fileSystemFactory, new HiveMetastoreConfig().isHideDeltaLakeTables(), new FileHiveMetastoreConfig() - .setCatalogDirectory(catalogDirectory.toURI().toString()) + .setCatalogDirectory(catalogDirectory.toString()) .setMetastoreUser("test")); } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java index cf367a0ec76b..3ad44049fd8c 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java @@ -15,8 +15,11 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.io.Resources; import io.trino.filesystem.Location; -import io.trino.filesystem.hdfs.HdfsFileSystemFactory; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.memory.MemoryFileSystemFactory; import io.trino.plugin.hive.AcidInfo; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.HiveColumnHandle; @@ -27,14 +30,15 @@ import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.type.Type; import io.trino.tpch.Nation; import io.trino.tpch.NationColumn; import io.trino.tpch.NationGenerator; import org.junit.jupiter.api.Test; -import java.io.File; -import java.net.URISyntaxException; +import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -51,8 +55,7 @@ import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn; import static io.trino.plugin.hive.HiveStorageFormat.ORC; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS; +import static io.trino.plugin.hive.HiveTableProperties.TRANSACTIONAL; import static io.trino.plugin.hive.HiveTestUtils.SESSION; 
import static io.trino.plugin.hive.HiveType.toHiveType; import static io.trino.plugin.hive.acid.AcidTransaction.NO_ACID_TRANSACTION; @@ -66,8 +69,6 @@ import static io.trino.tpch.NationColumn.REGION_KEY; import static java.util.Collections.nCopies; import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT; -import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.TABLE_IS_TRANSACTIONAL; -import static org.apache.hadoop.hive.ql.io.AcidUtils.deleteDeltaSubdir; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -75,20 +76,17 @@ public class TestOrcPageSourceFactory { private static final Map ALL_COLUMNS = ImmutableMap.of(NATION_KEY, 0, NAME, 1, REGION_KEY, 2, COMMENT, 3); - private static final HivePageSourceFactory PAGE_SOURCE_FACTORY = new OrcPageSourceFactory( - new OrcReaderConfig(), - new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS), - new FileFormatDataSourceStats(), - new HiveConfig()); @Test public void testFullFileRead() + throws IOException { assertRead(ImmutableMap.of(NATION_KEY, 0, NAME, 1, REGION_KEY, 2, COMMENT, 3), OptionalLong.empty(), Optional.empty(), nationKey -> false); } @Test public void testSingleColumnRead() + throws IOException { assertRead(ImmutableMap.of(REGION_KEY, ALL_COLUMNS.get(REGION_KEY)), OptionalLong.empty(), Optional.empty(), nationKey -> false); } @@ -98,6 +96,7 @@ public void testSingleColumnRead() */ @Test public void testFullFileSkipped() + throws IOException { assertRead(ALL_COLUMNS, OptionalLong.of(100L), Optional.empty(), nationKey -> false); } @@ -107,34 +106,44 @@ public void testFullFileSkipped() */ @Test public void testSomeStripesAndRowGroupRead() + throws IOException { assertRead(ALL_COLUMNS, OptionalLong.of(5L), Optional.empty(), nationKey -> false); } @Test public void testDeletedRows() + throws IOException { - Location partitionLocation = Location.of(getResource("nation_delete_deltas").toString()); - Optional acidInfo = AcidInfo.builder(partitionLocation) - .addDeleteDelta(partitionLocation.appendPath(deleteDeltaSubdir(3L, 3L, 0))) - .addDeleteDelta(partitionLocation.appendPath(deleteDeltaSubdir(4L, 4L, 0))) + TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + Location fileLocation = copyResource(fileSystemFactory, "nationFile25kRowsSortedOnNationKey/bucket_00000"); + long fileLength = fileSystemFactory.create(ConnectorIdentity.ofUser("test")).newInputFile(fileLocation).length(); + + Location deleteFile3 = copyResource(fileSystemFactory, "nation_delete_deltas/delete_delta_0000003_0000003_0000/bucket_00000"); + Location deleteFile4 = copyResource(fileSystemFactory, "nation_delete_deltas/delete_delta_0000004_0000004_0000/bucket_00000"); + Optional acidInfo = AcidInfo.builder(deleteFile3.parentDirectory().parentDirectory()) + .addDeleteDelta(deleteFile3.parentDirectory()) + .addDeleteDelta(deleteFile4.parentDirectory()) .build(); - assertRead(ALL_COLUMNS, OptionalLong.empty(), acidInfo, nationKey -> nationKey == 5 || nationKey == 19); + List actual = readFile(fileSystemFactory, ALL_COLUMNS, OptionalLong.empty(), acidInfo, fileLocation, fileLength); + + List expected = expectedResult(OptionalLong.empty(), nationKey -> nationKey == 5 || nationKey == 19, 1000); + assertEqualsByColumns(ALL_COLUMNS.keySet(), actual, expected); } @Test public void testReadWithAcidVersionValidationHive3() throws Exception { - File tableFile = new 
File(getResource("acid_version_validation/acid_version_hive_3/00000_0").toURI()); - Location tablePath = Location.of(tableFile.getParentFile().toURI().toString()); + TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + Location fileLocation = copyResource(fileSystemFactory, "acid_version_validation/acid_version_hive_3/00000_0"); - Optional acidInfo = AcidInfo.builder(tablePath) + Optional acidInfo = AcidInfo.builder(fileLocation.parentDirectory()) .setOrcAcidVersionValidated(false) .build(); - List result = readFile(Map.of(), OptionalLong.empty(), acidInfo, tableFile.getPath(), 625); + List result = readFile(fileSystemFactory, Map.of(), OptionalLong.empty(), acidInfo, fileLocation, 625); assertEquals(result.size(), 1); } @@ -142,14 +151,14 @@ public void testReadWithAcidVersionValidationHive3() public void testReadWithAcidVersionValidationNoVersionInMetadata() throws Exception { - File tableFile = new File(getResource("acid_version_validation/no_orc_acid_version_in_metadata/00000_0").toURI()); - Location tablePath = Location.of(tableFile.getParentFile().toURI().toString()); + TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + Location fileLocation = copyResource(fileSystemFactory, "acid_version_validation/no_orc_acid_version_in_metadata/00000_0"); - Optional acidInfo = AcidInfo.builder(tablePath) + Optional acidInfo = AcidInfo.builder(fileLocation.parentDirectory()) .setOrcAcidVersionValidated(false) .build(); - assertThatThrownBy(() -> readFile(Map.of(), OptionalLong.empty(), acidInfo, tableFile.getPath(), 730)) + assertThatThrownBy(() -> readFile(fileSystemFactory, Map.of(), OptionalLong.empty(), acidInfo, fileLocation, 730)) .hasMessageMatching("Hive transactional tables are supported since Hive 3.0. Expected `hive.acid.version` in ORC metadata" + " in .*/acid_version_validation/no_orc_acid_version_in_metadata/00000_0 to be >=2 but was ." 
+ " If you have upgraded from an older version of Hive, make sure a major compaction has been run at least once after the upgrade."); @@ -159,17 +168,19 @@ public void testReadWithAcidVersionValidationNoVersionInMetadata() public void testFullFileReadOriginalFilesTable() throws Exception { - File tableFile = new File(getResource("fullacidNationTableWithOriginalFiles/000000_0").toURI()); - Location tablePath = Location.of(tableFile.toURI().toString()).parentDirectory(); + TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + Location fileLocation = copyResource(fileSystemFactory, "fullacidNationTableWithOriginalFiles/000000_0"); + Location deleteDeltaLocation = copyResource(fileSystemFactory, "fullacidNationTableWithOriginalFiles/delete_delta_10000001_10000001_0000/bucket_00000"); + Location tablePath = fileLocation.parentDirectory(); AcidInfo acidInfo = AcidInfo.builder(tablePath) - .addDeleteDelta(tablePath.appendPath(deleteDeltaSubdir(10000001, 10000001, 0))) - .addOriginalFile(tablePath.appendPath("000000_0"), 1780, 0) + .addDeleteDelta(deleteDeltaLocation.parentDirectory()) + .addOriginalFile(fileLocation, 1780, 0) .setOrcAcidVersionValidated(true) .buildWithRequiredOriginalFiles(0); List expected = expectedResult(OptionalLong.empty(), nationKey -> nationKey == 24, 1); - List result = readFile(ALL_COLUMNS, OptionalLong.empty(), Optional.of(acidInfo), tablePath + "/000000_0", 1780); + List result = readFile(fileSystemFactory, ALL_COLUMNS, OptionalLong.empty(), Optional.of(acidInfo), fileLocation, 1780); assertEquals(result.size(), expected.size()); int deletedRowKey = 24; @@ -179,6 +190,7 @@ public void testFullFileReadOriginalFilesTable() } private static void assertRead(Map columns, OptionalLong nationKeyPredicate, Optional acidInfo, LongPredicate deletedRows) + throws IOException { List actual = readFile(columns, nationKeyPredicate, acidInfo); @@ -203,18 +215,22 @@ private static List expectedResult(OptionalLong nationKeyPredicate, Long } private static List readFile(Map columns, OptionalLong nationKeyPredicate, Optional acidInfo) + throws IOException { // This file has the contains the TPC-H nation table which each row repeated 1000 times - try { - File testFile = new File(getResource("nationFile25kRowsSortedOnNationKey/bucket_00000").toURI()); - return readFile(columns, nationKeyPredicate, acidInfo, testFile.toURI().getPath(), testFile.length()); - } - catch (URISyntaxException e) { - throw new RuntimeException(e); - } + TrinoFileSystemFactory fileSystemFactory = new MemoryFileSystemFactory(); + Location fileLocation = copyResource(fileSystemFactory, "nationFile25kRowsSortedOnNationKey/bucket_00000"); + long fileLength = fileSystemFactory.create(ConnectorIdentity.ofUser("test")).newInputFile(fileLocation).length(); + return readFile(fileSystemFactory, columns, nationKeyPredicate, acidInfo, fileLocation, fileLength); } - private static List readFile(Map columns, OptionalLong nationKeyPredicate, Optional acidInfo, String filePath, long fileSize) + private static List readFile( + TrinoFileSystemFactory fileSystemFactory, + Map columns, + OptionalLong nationKeyPredicate, + Optional acidInfo, + Location location, + long fileSize) { TupleDomain tupleDomain = TupleDomain.all(); if (nationKeyPredicate.isPresent()) { @@ -229,9 +245,15 @@ private static List readFile(Map columns, Optiona .map(HiveColumnHandle::getName) .collect(toImmutableList()); - Optional pageSourceWithProjections = PAGE_SOURCE_FACTORY.createPageSource( + HivePageSourceFactory pageSourceFactory = new 
OrcPageSourceFactory( + new OrcReaderConfig(), + fileSystemFactory, + new FileFormatDataSourceStats(), + new HiveConfig()); + + Optional pageSourceWithProjections = pageSourceFactory.createPageSource( SESSION, - Location.of(filePath), + location, 0, fileSize, fileSize, @@ -291,17 +313,11 @@ private static List readFile(Map columns, Optiona private static HiveColumnHandle toHiveColumnHandle(NationColumn nationColumn, int hiveColumnIndex) { - Type trinoType; - switch (nationColumn.getType().getBase()) { - case IDENTIFIER: - trinoType = BIGINT; - break; - case VARCHAR: - trinoType = VARCHAR; - break; - default: - throw new IllegalStateException("Unexpected value: " + nationColumn.getType().getBase()); - } + Type trinoType = switch (nationColumn.getType().getBase()) { + case IDENTIFIER -> BIGINT; + case VARCHAR -> VARCHAR; + default -> throw new IllegalStateException("Unexpected value: " + nationColumn.getType().getBase()); + }; return createBaseColumn( nationColumn.getColumnName(), @@ -317,7 +333,7 @@ private static Properties createSchema() Properties schema = new Properties(); schema.setProperty(SERIALIZATION_LIB, ORC.getSerde()); schema.setProperty(FILE_INPUT_FORMAT, ORC.getInputFormat()); - schema.setProperty(TABLE_IS_TRANSACTIONAL, "true"); + schema.setProperty(TRANSACTIONAL, "true"); return schema; } @@ -333,4 +349,15 @@ private static void assertEqualsByColumns(Set columns, List"); } } + + private static Location copyResource(TrinoFileSystemFactory fileSystemFactory, String resourceName) + throws IOException + { + Location location = Location.of("memory:///" + resourceName); + TrinoFileSystem fileSystem = fileSystemFactory.create(ConnectorIdentity.ofUser("test")); + try (OutputStream outputStream = fileSystem.newOutputFile(location).create()) { + Resources.copy(getResource(resourceName), outputStream); + } + return location; + } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestAcidTables.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestAcidTables.java index bc78bccbcc82..f59df73221a9 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestAcidTables.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestAcidTables.java @@ -33,6 +33,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; +/** + * See TestHiveAcidUtils for equivalent tests against Hive AcidUtils from the Hive codebase. + */ public class TestAcidTables { private static final byte[] FAKE_DATA = {65, 66, 67}; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestHiveAcidUtils.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestHiveAcidUtils.java index da4e89ea6cd7..23395f8c219e 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestHiveAcidUtils.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/util/TestHiveAcidUtils.java @@ -31,6 +31,9 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; +/** + * Test the expected behavior of the Hive AcidUtils class from the Hive codebase. + */ public class TestHiveAcidUtils { @Test