diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java index b19dc39f2200..c5b9bd0ed5f9 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSinkProvider.java @@ -20,6 +20,7 @@ import io.airlift.event.client.EventClient; import io.airlift.json.JsonCodec; import io.airlift.units.DataSize; +import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.hive.metastore.HiveMetastoreFactory; import io.trino.plugin.hive.metastore.HivePageSinkMetadataProvider; @@ -58,6 +59,7 @@ public class HivePageSinkProvider implements ConnectorPageSinkProvider { private final Set fileWriterFactories; + private final TrinoFileSystemFactory fileSystemFactory; private final HdfsEnvironment hdfsEnvironment; private final PageSorter pageSorter; private final HiveMetastoreFactory metastoreFactory; @@ -79,6 +81,7 @@ public class HivePageSinkProvider @Inject public HivePageSinkProvider( Set fileWriterFactories, + TrinoFileSystemFactory fileSystemFactory, HdfsEnvironment hdfsEnvironment, PageSorter pageSorter, HiveMetastoreFactory metastoreFactory, @@ -93,6 +96,7 @@ public HivePageSinkProvider( HiveWriterStats hiveWriterStats) { this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null")); + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.pageSorter = requireNonNull(pageSorter, "pageSorter is null"); this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null"); @@ -154,6 +158,7 @@ private HivePageSink createPageSink(HiveWritableTableHandle handle, boolean isCr HiveWriterFactory writerFactory = new HiveWriterFactory( fileWriterFactories, + fileSystemFactory, handle.getSchemaName(), handle.getTableName(), isCreateTable, diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java index e41ca8276a23..8a3b39d35b1b 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.hive; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -20,6 +21,8 @@ import com.google.common.collect.Sets; import io.airlift.event.client.EventClient; import io.airlift.units.DataSize; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.hdfs.HdfsContext; import io.trino.hdfs.HdfsEnvironment; import io.trino.plugin.hive.HiveSessionProperties.InsertExistingPartitionsBehavior; @@ -44,7 +47,6 @@ import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; @@ -86,7 +88,6 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_PATH_ALREADY_EXISTS; import static io.trino.plugin.hive.HiveErrorCode.HIVE_TABLE_READ_ONLY; import static io.trino.plugin.hive.HiveErrorCode.HIVE_UNSUPPORTED_FORMAT; -import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_OPEN_ERROR; import static io.trino.plugin.hive.HiveSessionProperties.getInsertExistingPartitionsBehavior; import static io.trino.plugin.hive.HiveSessionProperties.getTemporaryStagingDirectoryPath; import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision; @@ -112,7 +113,6 @@ import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT; import static org.apache.hadoop.hive.ql.io.AcidUtils.deleteDeltaSubdir; import static org.apache.hadoop.hive.ql.io.AcidUtils.deltaSubdir; @@ -126,6 +126,7 @@ public class HiveWriterFactory private static final Pattern BUCKET_FROM_FILENAME_PATTERN = Pattern.compile("(0[0-9]+)_.*"); private final Set fileWriterFactories; + private final TrinoFileSystemFactory fileSystemFactory; private final String schemaName; private final String tableName; private final AcidTransaction transaction; @@ -146,7 +147,6 @@ public class HiveWriterFactory private final HivePageSinkMetadataProvider pageSinkMetadataProvider; private final TypeManager typeManager; - private final HdfsEnvironment hdfsEnvironment; private final PageSorter pageSorter; private final JobConf conf; @@ -172,6 +172,7 @@ public class HiveWriterFactory public HiveWriterFactory( Set fileWriterFactories, + TrinoFileSystemFactory fileSystemFactory, String schemaName, String tableName, boolean isCreateTable, @@ -199,6 +200,7 @@ public HiveWriterFactory( HiveWriterStats hiveWriterStats) { this.fileWriterFactories = ImmutableSet.copyOf(requireNonNull(fileWriterFactories, "fileWriterFactories is null")); + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); this.transaction = requireNonNull(transaction, "transaction is null"); @@ -214,7 +216,6 @@ public HiveWriterFactory( this.typeManager = requireNonNull(typeManager, "typeManager is null"); - this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.pageSorter = requireNonNull(pageSorter, "pageSorter is null"); this.sortBufferSize = requireNonNull(sortBufferSize, "sortBufferSize is null"); this.maxOpenSortFiles = maxOpenSortFiles; @@ -358,13 +359,19 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt if (!writeInfo.getWriteMode().isWritePathSameAsTargetPath()) { // When target path is different from write path, // verify that the target directory for the partition does not already exist - if (HiveWriteUtils.pathExists(new HdfsContext(session), hdfsEnvironment, writeInfo.getTargetPath())) { - throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, format( - "Target directory for new partition '%s' of table '%s.%s' already exists: %s", - partitionName, - schemaName, - tableName, - writeInfo.getTargetPath())); + String writeInfoTargetPath = writeInfo.getTargetPath().toString(); + try { + if (fileSystemFactory.create(session).newInputFile(writeInfoTargetPath).exists()) { + throw new TrinoException(HIVE_PATH_ALREADY_EXISTS, format( + "Target directory for new partition '%s' of table '%s.%s' already exists: %s", + partitionName, + schemaName, + tableName, + writeInfo.getTargetPath())); + } + } + catch (IOException e) { + throw new TrinoException(HIVE_FILESYSTEM_ERROR, format("Error while accessing: %s", writeInfoTargetPath), e); } } } @@ -570,26 +577,19 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt }; if (!sortedBy.isEmpty()) { - FileSystem fileSystem; + TrinoFileSystem fileSystem; Path tempFilePath; if (sortedWritingTempStagingPathEnabled) { String tempPrefix = sortedWritingTempStagingPath.replace( "${USER}", new HdfsContext(session).getIdentity().getUser()); + tempPrefix = setSchemeToFileIfAbsent(tempPrefix); tempFilePath = new Path(tempPrefix, ".tmp-sort." + path.getParent().getName() + "." + path.getName()); } else { tempFilePath = new Path(path.getParent(), ".tmp-sort." + path.getName()); } - try { - Configuration configuration = new Configuration(outputConf); - // Explicitly set the default FS to local file system to avoid getting HDFS when sortedWritingTempStagingPath specifies no scheme - configuration.set(FS_DEFAULT_NAME_KEY, "file:///"); - fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), tempFilePath, configuration); - } - catch (IOException e) { - throw new TrinoException(HIVE_WRITER_OPEN_ERROR, e); - } + fileSystem = fileSystemFactory.create(session); List types = dataColumns.stream() .map(column -> column.getHiveType().getType(typeManager, getTimestampPrecision(session))) @@ -643,17 +643,9 @@ public interface RowIdSortingFileWriterMaker public SortingFileWriter makeRowIdSortingWriter(FileWriter deleteFileWriter, Path path) { - FileSystem fileSystem; - Path tempFilePath = new Path(path.getParent(), ".tmp-sort." + path.getName()); - try { - Configuration configuration = new Configuration(conf); - // Explicitly set the default FS to local file system to avoid getting HDFS when sortedWritingTempStagingPath specifies no scheme - configuration.set(FS_DEFAULT_NAME_KEY, "file:///"); - fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), tempFilePath, configuration); - } - catch (IOException e) { - throw new TrinoException(HIVE_WRITER_OPEN_ERROR, e); - } + String parentPath = setSchemeToFileIfAbsent(path.getParent().toString()); + Path tempFilePath = new Path(parentPath, ".tmp-sort." + path.getName()); + TrinoFileSystem fileSystem = fileSystemFactory.create(session); // The ORC columns are: operation, originalTransaction, bucket, rowId, row // The deleted rows should be sorted by originalTransaction, then by rowId List sortFields = ImmutableList.of(1, 3); @@ -816,6 +808,17 @@ public static String getFileExtension(JobConf conf, StorageFormat storageFormat) } } + @VisibleForTesting + static String setSchemeToFileIfAbsent(String pathString) + { + Path path = new Path(pathString); + String scheme = path.toUri().getScheme(); + if (scheme == null || scheme.equals("")) { + return "file:///" + pathString; + } + return pathString; + } + private static class DataColumn { private final String name; diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java index 40795c42744c..daae0d42dd86 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/SortingFileWriter.java @@ -17,6 +17,8 @@ import com.google.common.io.Closer; import io.airlift.log.Logger; import io.airlift.units.DataSize; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoInputFile; import io.trino.orc.OrcDataSink; import io.trino.orc.OrcDataSource; import io.trino.orc.OrcDataSourceId; @@ -32,7 +34,6 @@ import io.trino.spi.connector.SortOrder; import io.trino.spi.type.Type; import io.trino.spi.type.TypeOperators; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.openjdk.jol.info.ClassLayout; @@ -66,7 +67,7 @@ public class SortingFileWriter private static final int INSTANCE_SIZE = toIntExact(ClassLayout.parseClass(SortingFileWriter.class).instanceSize()); - private final FileSystem fileSystem; + private final TrinoFileSystem fileSystem; private final Path tempFilePrefix; private final int maxOpenTempFiles; private final List types; @@ -80,7 +81,7 @@ public class SortingFileWriter private final TypeOperators typeOperators; public SortingFileWriter( - FileSystem fileSystem, + TrinoFileSystem fileSystem, Path tempFilePrefix, FileWriter outputWriter, DataSize maxMemory, @@ -212,12 +213,13 @@ private void mergeFiles(Iterable files, Consumer consumer) Collection> iterators = new ArrayList<>(); for (TempFile tempFile : files) { - Path file = tempFile.getPath(); + String file = tempFile.getPath(); + TrinoInputFile inputFile = fileSystem.newInputFile(file); OrcDataSource dataSource = new HdfsOrcDataSource( - new OrcDataSourceId(file.toString()), - fileSystem.getFileStatus(file).getLen(), + new OrcDataSourceId(file), + inputFile.length(), new OrcReaderOptions(), - fileSystem.open(file), + inputFile, new FileFormatDataSourceStats()); closer.register(dataSource); iterators.add(new TempFileReader(types, dataSource)); @@ -227,10 +229,7 @@ private void mergeFiles(Iterable files, Consumer consumer) .forEachRemaining(consumer); for (TempFile tempFile : files) { - Path file = tempFile.getPath(); - if (!fileSystem.delete(file, false)) { - throw new IOException("Failed to delete temporary file: " + file); - } + fileSystem.deleteFile(tempFile.getPath()); } } catch (IOException e) { @@ -240,7 +239,7 @@ private void mergeFiles(Iterable files, Consumer consumer) private void writeTempFile(Consumer consumer) { - Path tempFile = getTempFileName(); + String tempFile = getTempFileName(); try (TempFileWriter writer = new TempFileWriter(types, tempFileSinkFactory.createSink(fileSystem, tempFile))) { consumer.accept(writer); @@ -253,36 +252,34 @@ private void writeTempFile(Consumer consumer) } } - private void cleanupFile(Path file) + private void cleanupFile(String file) { try { - if (!fileSystem.delete(file, false)) { - throw new IOException("Delete failed"); - } + fileSystem.deleteFile(file); } catch (IOException e) { log.warn(e, "Failed to delete temporary file: %s", file); } } - private Path getTempFileName() + private String getTempFileName() { - return new Path(tempFilePrefix + "." + nextFileId.getAndIncrement()); + return tempFilePrefix + "." + nextFileId.getAndIncrement(); } private static class TempFile { - private final Path path; + private final String path; private final long size; - public TempFile(Path path, long size) + public TempFile(String path, long size) { checkArgument(size >= 0, "size is negative"); this.path = requireNonNull(path, "path is null"); this.size = size; } - public Path getPath() + public String getPath() { return path; } @@ -304,7 +301,7 @@ public String toString() public interface TempFileSinkFactory { - OrcDataSink createSink(FileSystem fileSystem, Path path) + OrcDataSink createSink(TrinoFileSystem fileSystem, String path) throws IOException; } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/HdfsOrcDataSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/HdfsOrcDataSource.java index 75c40078d5f8..f54fdfd6133f 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/HdfsOrcDataSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/HdfsOrcDataSource.java @@ -14,13 +14,13 @@ package io.trino.plugin.hive.orc; import io.airlift.slice.Slice; -import io.trino.hdfs.FSDataInputStreamTail; +import io.trino.filesystem.TrinoInput; +import io.trino.filesystem.TrinoInputFile; import io.trino.orc.AbstractOrcDataSource; import io.trino.orc.OrcDataSourceId; import io.trino.orc.OrcReaderOptions; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.spi.TrinoException; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hdfs.BlockMissingException; import java.io.IOException; @@ -34,18 +34,19 @@ public class HdfsOrcDataSource extends AbstractOrcDataSource { - private final FSDataInputStream inputStream; + private final TrinoInput input; private final FileFormatDataSourceStats stats; public HdfsOrcDataSource( OrcDataSourceId id, long size, OrcReaderOptions options, - FSDataInputStream inputStream, + TrinoInputFile inputFile, FileFormatDataSourceStats stats) + throws IOException { super(id, size, options); - this.inputStream = requireNonNull(inputStream, "inputStream is null"); + this.input = requireNonNull(inputFile, "inputFile is null").newInput(); this.stats = requireNonNull(stats, "stats is null"); } @@ -53,7 +54,7 @@ public HdfsOrcDataSource( public void close() throws IOException { - inputStream.close(); + input.close(); } @Override @@ -62,8 +63,7 @@ protected Slice readTailInternal(int length) { // Handle potentially imprecise file lengths by reading the footer long readStart = System.nanoTime(); - FSDataInputStreamTail fileTail = FSDataInputStreamTail.readTail(getId().toString(), getEstimatedSize(), inputStream, length); - Slice tailSlice = fileTail.getTailSlice(); + Slice tailSlice = input.readTail(length); stats.readDataBytesPerSecond(tailSlice.length(), System.nanoTime() - readStart); return tailSlice; } @@ -73,7 +73,7 @@ protected void readInternal(long position, byte[] buffer, int bufferOffset, int { try { long readStart = System.nanoTime(); - inputStream.readFully(position, buffer, bufferOffset, bufferLength); + input.readFully(position, buffer, bufferOffset, bufferLength); stats.readDataBytesPerSecond(bufferLength, System.nanoTime() - readStart); } catch (TrinoException e) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java index 6d5ab8a4ca46..0d76d0385261 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSource.java @@ -14,7 +14,9 @@ package io.trino.plugin.hive.orc; import com.google.common.collect.ImmutableList; -import io.trino.hdfs.HdfsEnvironment; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.TrinoInputFile; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.orc.NameBasedFieldMapper; import io.trino.orc.OrcColumn; @@ -30,9 +32,6 @@ import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.security.ConnectorIdentity; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockMissingException; @@ -76,23 +75,22 @@ public class OrcDeleteDeltaPageSource private boolean closed; public static Optional createOrcDeleteDeltaPageSource( - Path path, + String path, long fileSize, OrcReaderOptions options, ConnectorIdentity identity, - Configuration configuration, - HdfsEnvironment hdfsEnvironment, - FileFormatDataSourceStats stats) + FileFormatDataSourceStats stats, + TrinoFileSystemFactory fileSystemFactory) { OrcDataSource orcDataSource; try { - FileSystem fileSystem = hdfsEnvironment.getFileSystem(identity, path, configuration); - FSDataInputStream inputStream = hdfsEnvironment.doAs(identity, () -> fileSystem.open(path)); + TrinoFileSystem fileSystem = fileSystemFactory.create(identity); + TrinoInputFile inputFile = fileSystem.newInputFile(path); orcDataSource = new HdfsOrcDataSource( - new OrcDataSourceId(path.toString()), + new OrcDataSourceId(path), fileSize, options, - inputStream, + inputFile, stats); } catch (Exception e) { @@ -129,7 +127,7 @@ public static Optional createOrcDeleteDeltaPageSource( } private OrcDeleteDeltaPageSource( - Path path, + String path, long fileSize, OrcReader reader, OrcDataSource orcDataSource, @@ -139,7 +137,7 @@ private OrcDeleteDeltaPageSource( this.stats = requireNonNull(stats, "stats is null"); this.orcDataSource = requireNonNull(orcDataSource, "orcDataSource is null"); - verifyAcidSchema(reader, path); + verifyAcidSchema(reader, new Path(path)); Map acidColumns = uniqueIndex( reader.getRootColumn().getNestedColumns(), orcColumn -> orcColumn.getColumnName().toLowerCase(ENGLISH)); @@ -228,7 +226,7 @@ public long getMemoryUsage() return memoryContext.getBytes(); } - private static String openError(Throwable t, Path path) + private static String openError(Throwable t, String path) { return format("Error opening Hive delete delta file %s: %s", path, t.getMessage()); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSourceFactory.java index 9411d6048e0b..3431cf269e62 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeleteDeltaPageSourceFactory.java @@ -13,13 +13,11 @@ */ package io.trino.plugin.hive.orc; -import io.trino.hdfs.HdfsEnvironment; +import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.orc.OrcReaderOptions; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.security.ConnectorIdentity; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import java.util.Optional; @@ -30,33 +28,29 @@ public class OrcDeleteDeltaPageSourceFactory { private final OrcReaderOptions options; private final ConnectorIdentity identity; - private final Configuration configuration; - private final HdfsEnvironment hdfsEnvironment; private final FileFormatDataSourceStats stats; + private final TrinoFileSystemFactory fileSystemFactory; public OrcDeleteDeltaPageSourceFactory( OrcReaderOptions options, ConnectorIdentity identity, - Configuration configuration, - HdfsEnvironment hdfsEnvironment, + TrinoFileSystemFactory fileSystemFactory, FileFormatDataSourceStats stats) { this.options = requireNonNull(options, "options is null"); this.identity = requireNonNull(identity, "identity is null"); - this.configuration = requireNonNull(configuration, "configuration is null"); - this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.stats = requireNonNull(stats, "stats is null"); } - public Optional createPageSource(Path path, long fileSize) + public Optional createPageSource(String path, long fileSize) { return createOrcDeleteDeltaPageSource( path, fileSize, options, identity, - configuration, - hdfsEnvironment, - stats); + stats, + fileSystemFactory); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeletedRows.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeletedRows.java index 4724e193c907..b642049f5391 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeletedRows.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcDeletedRows.java @@ -326,7 +326,7 @@ public Optional> loadOrYield() currentPath = createPath(acidInfo, deleteDeltaInfo, sourceFileName); FileSystem fileSystem = hdfsEnvironment.getFileSystem(identity, currentPath, configuration); FileStatus fileStatus = hdfsEnvironment.doAs(identity, () -> fileSystem.getFileStatus(currentPath)); - currentPageSource = pageSourceFactory.createPageSource(fileStatus.getPath(), fileStatus.getLen()).orElseGet(() -> new EmptyPageSource()); + currentPageSource = pageSourceFactory.createPageSource(fileStatus.getPath().toString(), fileStatus.getLen()).orElseGet(() -> new EmptyPageSource()); } while (!currentPageSource.isFinished() || currentPage != null) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriterFactory.java index 7abefae60126..37fbbfbd7178 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriterFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcFileWriterFactory.java @@ -14,7 +14,9 @@ package io.trino.plugin.hive.orc; import com.google.common.collect.ImmutableMap; -import io.trino.hdfs.HdfsEnvironment; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.TrinoInputFile; import io.trino.hive.orc.OrcConf; import io.trino.orc.OrcDataSink; import io.trino.orc.OrcDataSource; @@ -36,7 +38,6 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; import org.apache.hadoop.mapred.JobConf; @@ -81,7 +82,7 @@ public class OrcFileWriterFactory implements HiveFileWriterFactory { - private final HdfsEnvironment hdfsEnvironment; + private final TrinoFileSystemFactory fileSystemFactory; private final TypeManager typeManager; private final NodeVersion nodeVersion; private final FileFormatDataSourceStats readStats; @@ -90,32 +91,32 @@ public class OrcFileWriterFactory @Inject public OrcFileWriterFactory( - HdfsEnvironment hdfsEnvironment, + TrinoFileSystemFactory fileSystemFactory, TypeManager typeManager, NodeVersion nodeVersion, FileFormatDataSourceStats readStats, OrcWriterConfig config) { this( - hdfsEnvironment, typeManager, nodeVersion, readStats, - config.toOrcWriterOptions()); + config.toOrcWriterOptions(), + fileSystemFactory); } public OrcFileWriterFactory( - HdfsEnvironment hdfsEnvironment, TypeManager typeManager, NodeVersion nodeVersion, FileFormatDataSourceStats readStats, - OrcWriterOptions orcWriterOptions) + OrcWriterOptions orcWriterOptions, + TrinoFileSystemFactory fileSystemFactory) { - this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null"); this.readStats = requireNonNull(readStats, "readStats is null"); this.orcWriterOptions = requireNonNull(orcWriterOptions, "orcWriterOptions is null"); + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); } @Managed @@ -160,18 +161,20 @@ public Optional createFileWriter( } try { - FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), path, configuration); - OrcDataSink orcDataSink = createOrcDataSink(fileSystem, path); + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + String stringPath = path.toString(); + OrcDataSink orcDataSink = createOrcDataSink(fileSystem, stringPath); Optional> validationInputFactory = Optional.empty(); if (isOrcOptimizedWriterValidate(session)) { validationInputFactory = Optional.of(() -> { try { + TrinoInputFile inputFile = fileSystem.newInputFile(stringPath); return new HdfsOrcDataSource( - new OrcDataSourceId(path.toString()), - fileSystem.getFileStatus(path).getLen(), + new OrcDataSourceId(stringPath), + inputFile.length(), new OrcReaderOptions(), - fileSystem.open(path), + inputFile, readStats); } catch (IOException e) { @@ -181,7 +184,7 @@ public Optional createFileWriter( } Callable rollbackAction = () -> { - fileSystem.delete(path, false); + fileSystem.deleteFile(stringPath); return null; }; @@ -237,10 +240,10 @@ public static HiveType createHiveRowType(Properties schema, TypeManager typeMana return toHiveType(acidRowType); } - public static OrcDataSink createOrcDataSink(FileSystem fileSystem, Path path) + public static OrcDataSink createOrcDataSink(TrinoFileSystem fileSystem, String path) throws IOException { - return new OutputStreamOrcDataSink(fileSystem.create(path, false)); + return new OutputStreamOrcDataSink(fileSystem.newOutputFile(path).create()); } private static CompressionKind getCompression(Properties schema, JobConf configuration) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java index b7e0003c778e..933704ca0d44 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OrcPageSourceFactory.java @@ -17,6 +17,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import io.airlift.slice.Slice; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.TrinoInputFile; import io.trino.hdfs.HdfsEnvironment; import io.trino.memory.context.AggregatedMemoryContext; import io.trino.orc.NameBasedFieldMapper; @@ -50,8 +53,6 @@ import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.type.Type; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockMissingException; import org.apache.hadoop.hive.ql.io.orc.OrcSerde; @@ -123,16 +124,23 @@ public class OrcPageSourceFactory private static final Pattern DEFAULT_HIVE_COLUMN_NAME_PATTERN = Pattern.compile("_col\\d+"); private final OrcReaderOptions orcReaderOptions; private final HdfsEnvironment hdfsEnvironment; + private final TrinoFileSystemFactory fileSystemFactory; private final FileFormatDataSourceStats stats; private final DateTimeZone legacyTimeZone; private final int domainCompactionThreshold; @Inject - public OrcPageSourceFactory(OrcReaderConfig config, HdfsEnvironment hdfsEnvironment, FileFormatDataSourceStats stats, HiveConfig hiveConfig) + public OrcPageSourceFactory( + OrcReaderConfig config, + HdfsEnvironment hdfsEnvironment, + TrinoFileSystemFactory fileSystemFactory, + FileFormatDataSourceStats stats, + HiveConfig hiveConfig) { this( config.toOrcReaderOptions(), hdfsEnvironment, + fileSystemFactory, stats, hiveConfig.getOrcLegacyDateTimeZone(), hiveConfig.getDomainCompactionThreshold()); @@ -141,15 +149,17 @@ public OrcPageSourceFactory(OrcReaderConfig config, HdfsEnvironment hdfsEnvironm public OrcPageSourceFactory( OrcReaderOptions orcReaderOptions, HdfsEnvironment hdfsEnvironment, + TrinoFileSystemFactory fileSystemFactory, FileFormatDataSourceStats stats, DateTimeZone legacyTimeZone) { - this(orcReaderOptions, hdfsEnvironment, stats, legacyTimeZone, 0); + this(orcReaderOptions, hdfsEnvironment, fileSystemFactory, stats, legacyTimeZone, 0); } public OrcPageSourceFactory( OrcReaderOptions orcReaderOptions, HdfsEnvironment hdfsEnvironment, + TrinoFileSystemFactory fileSystemFactory, FileFormatDataSourceStats stats, DateTimeZone legacyTimeZone, int domainCompactionThreshold) @@ -159,6 +169,7 @@ public OrcPageSourceFactory( this.stats = requireNonNull(stats, "stats is null"); this.legacyTimeZone = legacyTimeZone; this.domainCompactionThreshold = domainCompactionThreshold; + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); } @Override @@ -252,13 +263,13 @@ private ConnectorPageSource createOrcPageSource( boolean originalFilesPresent = acidInfo.isPresent() && !acidInfo.get().getOriginalFiles().isEmpty(); try { - FileSystem fileSystem = hdfsEnvironment.getFileSystem(identity, path, configuration); - FSDataInputStream inputStream = hdfsEnvironment.doAs(identity, () -> fileSystem.open(path)); + TrinoFileSystem fileSystem = fileSystemFactory.create(identity); + TrinoInputFile inputFile = fileSystem.newInputFile(path.toString()); orcDataSource = new HdfsOrcDataSource( new OrcDataSourceId(path.toString()), estimatedFileSize, options, - inputStream, + inputFile, stats); } catch (Exception e) { @@ -397,7 +408,7 @@ else if (column.getBaseHiveColumnIndex() < fileColumns.size()) { Optional deletedRows = acidInfo.map(info -> new OrcDeletedRows( path.getName(), - new OrcDeleteDeltaPageSourceFactory(options, identity, configuration, hdfsEnvironment, stats), + new OrcDeleteDeltaPageSourceFactory(options, identity, fileSystemFactory, stats), identity, configuration, hdfsEnvironment, @@ -411,10 +422,9 @@ else if (column.getBaseHiveColumnIndex() < fileColumns.size()) { .map(info -> OriginalFilesUtils.getPrecedingRowCount( acidInfo.get().getOriginalFiles(), path, - hdfsEnvironment, + fileSystemFactory, identity, options, - configuration, stats)); if (transaction.isDelete()) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OriginalFilesUtils.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OriginalFilesUtils.java index 573c902fb9fd..b20eb1b1e002 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OriginalFilesUtils.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/orc/OriginalFilesUtils.java @@ -13,7 +13,9 @@ */ package io.trino.plugin.hive.orc; -import io.trino.hdfs.HdfsEnvironment; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.filesystem.TrinoInputFile; import io.trino.orc.OrcDataSource; import io.trino.orc.OrcDataSourceId; import io.trino.orc.OrcReader; @@ -21,9 +23,6 @@ import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.spi.TrinoException; import io.trino.spi.security.ConnectorIdentity; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.util.Collection; @@ -48,17 +47,16 @@ private OriginalFilesUtils() {} public static long getPrecedingRowCount( Collection originalFileInfos, Path splitPath, - HdfsEnvironment hdfsEnvironment, + TrinoFileSystemFactory fileSystemFactory, ConnectorIdentity identity, OrcReaderOptions options, - Configuration configuration, FileFormatDataSourceStats stats) { long rowCount = 0; for (OriginalFileInfo originalFileInfo : originalFileInfos) { Path path = new Path(splitPath.getParent() + "/" + originalFileInfo.getName()); if (path.compareTo(splitPath) < 0) { - rowCount += getRowsInFile(path, hdfsEnvironment, identity, options, configuration, stats, originalFileInfo.getFileSize()); + rowCount += getRowsInFile(path.toString(), fileSystemFactory, identity, options, stats, originalFileInfo.getFileSize()); } } @@ -69,22 +67,21 @@ public static long getPrecedingRowCount( * Returns number of rows present in the file, based on the ORC footer. */ private static Long getRowsInFile( - Path splitPath, - HdfsEnvironment hdfsEnvironment, + String splitPath, + TrinoFileSystemFactory fileSystemFactory, ConnectorIdentity identity, OrcReaderOptions options, - Configuration configuration, FileFormatDataSourceStats stats, long fileSize) { try { - FileSystem fileSystem = hdfsEnvironment.getFileSystem(identity, splitPath, configuration); - FSDataInputStream inputStream = hdfsEnvironment.doAs(identity, () -> fileSystem.open(splitPath)); + TrinoFileSystem fileSystem = fileSystemFactory.create(identity); + TrinoInputFile inputFile = fileSystem.newInputFile(splitPath); try (OrcDataSource orcDataSource = new HdfsOrcDataSource( - new OrcDataSourceId(splitPath.toString()), + new OrcDataSourceId(splitPath), fileSize, options, - inputStream, + inputFile, stats)) { OrcReader reader = createOrcReader(orcDataSource, options) .orElseThrow(() -> new TrinoException(HIVE_CANNOT_OPEN_SPLIT, "Could not read ORC footer from empty file: " + splitPath)); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java index 01fc4a664143..a3ddb5c9400e 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHive.java @@ -24,6 +24,7 @@ import io.airlift.stats.CounterStat; import io.airlift.units.DataSize; import io.airlift.units.Duration; +import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.hdfs.HdfsConfig; import io.trino.hdfs.HdfsContext; import io.trino.hdfs.HdfsEnvironment; @@ -894,6 +895,7 @@ public Optional getMaterializedView(Connect hiveConfig.getMaxPartitionsPerScan()); pageSinkProvider = new HivePageSinkProvider( getDefaultHiveFileWriterFactories(hiveConfig, hdfsEnvironment), + new HdfsFileSystemFactory(hdfsEnvironment), hdfsEnvironment, PAGE_SORTER, HiveMetastoreFactory.ofInstance(metastoreClient), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java index 708c4d435880..fa64e214ec3e 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java @@ -23,6 +23,7 @@ import io.airlift.json.JsonCodec; import io.airlift.slice.Slice; import io.airlift.stats.CounterStat; +import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.hdfs.HdfsConfig; import io.trino.hdfs.HdfsConfiguration; import io.trino.hdfs.HdfsContext; @@ -238,6 +239,7 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec BlockTypeOperators blockTypeOperators = new BlockTypeOperators(typeOperators); pageSinkProvider = new HivePageSinkProvider( getDefaultHiveFileWriterFactories(config, hdfsEnvironment), + new HdfsFileSystemFactory(hdfsEnvironment), hdfsEnvironment, PAGE_SORTER, HiveMetastoreFactory.ofInstance(metastoreClient), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java index 82e6051ce8e2..bcba537b80e8 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java @@ -98,6 +98,8 @@ private HiveTestUtils() {} new HdfsConfig(), new NoHdfsAuthentication()); + public static final HdfsFileSystemFactory HDFS_FILE_SYSTEM_FACTORY = new HdfsFileSystemFactory(HDFS_ENVIRONMENT); + public static final PageSorter PAGE_SORTER = new PagesIndexPageSorter(new PagesIndex.TestingFactory(false)); public static ConnectorSession getHiveSession(HiveConfig hiveConfig) @@ -155,7 +157,7 @@ public static Set getDefaultHivePageSourceFactories(HdfsE FileFormatDataSourceStats stats = new FileFormatDataSourceStats(); return ImmutableSet.builder() .add(new RcFilePageSourceFactory(TESTING_TYPE_MANAGER, hdfsEnvironment, stats, hiveConfig)) - .add(new OrcPageSourceFactory(new OrcReaderConfig(), hdfsEnvironment, stats, hiveConfig)) + .add(new OrcPageSourceFactory(new OrcReaderConfig(), hdfsEnvironment, fileSystemFactory, stats, hiveConfig)) .add(new ParquetPageSourceFactory(fileSystemFactory, stats, new ParquetReaderConfig(), hiveConfig)) .build(); } @@ -178,7 +180,7 @@ public static Set getDefaultHiveFileWriterFactories(HiveC private static OrcFileWriterFactory getDefaultOrcFileWriterFactory(HdfsEnvironment hdfsEnvironment) { return new OrcFileWriterFactory( - hdfsEnvironment, + new HdfsFileSystemFactory(hdfsEnvironment), TESTING_TYPE_MANAGER, new NodeVersion("test_version"), new FileFormatDataSourceStats(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java index 8fdaf598cf1b..74aca945af84 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java @@ -83,6 +83,7 @@ import static io.trino.plugin.hive.HiveStorageFormat.SEQUENCEFILE; import static io.trino.plugin.hive.HiveStorageFormat.TEXTFILE; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; import static io.trino.plugin.hive.HiveTestUtils.SESSION; import static io.trino.plugin.hive.HiveTestUtils.createGenericHiveRecordCursorProvider; import static io.trino.plugin.hive.HiveTestUtils.getHiveSession; @@ -302,7 +303,7 @@ public void testOrc(int rowCount, long fileSizePadding) .withColumns(TEST_COLUMNS) .withRowsCount(rowCount) .withFileSizePadding(fileSizePadding) - .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, STATS, UTC)); + .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_FACTORY, STATS, UTC)); } @Test(dataProvider = "validRowAndFileSizePadding") @@ -330,9 +331,9 @@ public void testOrcOptimizedWriter(int rowCount, long fileSizePadding) .withRowsCount(rowCount) .withSession(session) .withFileSizePadding(fileSizePadding) - .withFileWriterFactory(new OrcFileWriterFactory(HDFS_ENVIRONMENT, TESTING_TYPE_MANAGER, new NodeVersion("test"), STATS, new OrcWriterOptions())) + .withFileWriterFactory(new OrcFileWriterFactory(TESTING_TYPE_MANAGER, new NodeVersion("test"), STATS, new OrcWriterOptions(), HDFS_FILE_SYSTEM_FACTORY)) .isReadableByRecordCursor(createGenericHiveRecordCursorProvider(HDFS_ENVIRONMENT)) - .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, STATS, UTC)); + .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_FACTORY, STATS, UTC)); } @Test(dataProvider = "rowCount") @@ -351,7 +352,7 @@ public void testOrcUseColumnNames(int rowCount) .withRowsCount(rowCount) .withReadColumns(Lists.reverse(testColumns)) .withSession(session) - .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, STATS, UTC)); + .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_FACTORY, STATS, UTC)); } @Test(dataProvider = "rowCount") @@ -368,7 +369,7 @@ public void testOrcUseColumnNameLowerCaseConversion(int rowCount) .withRowsCount(rowCount) .withReadColumns(TEST_COLUMNS) .withSession(session) - .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, STATS, UTC)); + .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_FACTORY, STATS, UTC)); } @Test(dataProvider = "validRowAndFileSizePadding") @@ -530,7 +531,7 @@ public void testTruncateVarcharColumn() assertThatFileFormat(ORC) .withWriteColumns(ImmutableList.of(writeColumn)) .withReadColumns(ImmutableList.of(readColumn)) - .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, STATS, UTC)); + .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_FACTORY, STATS, UTC)); assertThatFileFormat(PARQUET) .withWriteColumns(ImmutableList.of(writeColumn)) @@ -630,13 +631,13 @@ public void testORCProjectedColumns(int rowCount) .withReadColumns(readColumns) .withRowsCount(rowCount) .withSession(session) - .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, STATS, UTC)); + .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_FACTORY, STATS, UTC)); assertThatFileFormat(ORC) .withWriteColumns(writeColumns) .withReadColumns(readColumns) .withRowsCount(rowCount) - .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, STATS, UTC)); + .isReadableByPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_FACTORY, STATS, UTC)); } @Test(dataProvider = "rowCount") @@ -828,7 +829,7 @@ public void testFailForLongVarcharPartitionColumn() assertThatFileFormat(ORC) .withColumns(columns) - .isFailingForPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, STATS, UTC), expectedErrorCode, expectedMessage); + .isFailingForPageSource(new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_FACTORY, STATS, UTC), expectedErrorCode, expectedMessage); assertThatFileFormat(PARQUET) .withColumns(columns) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java index eb0afc9313e5..c6afaf2d7430 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHivePageSink.java @@ -66,6 +66,7 @@ import static io.trino.plugin.hive.HiveCompressionOption.LZ4; import static io.trino.plugin.hive.HiveCompressionOption.NONE; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; import static io.trino.plugin.hive.HiveTestUtils.PAGE_SORTER; import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveFileWriterFactories; import static io.trino.plugin.hive.HiveTestUtils.getDefaultHivePageSourceFactories; @@ -298,6 +299,7 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transactio BlockTypeOperators blockTypeOperators = new BlockTypeOperators(typeOperators); HivePageSinkProvider provider = new HivePageSinkProvider( getDefaultHiveFileWriterFactories(config, HDFS_ENVIRONMENT), + HDFS_FILE_SYSTEM_FACTORY, HDFS_ENVIRONMENT, PAGE_SORTER, HiveMetastoreFactory.ofInstance(metastore), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveWriterFactory.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveWriterFactory.java index e588ad1eaeb4..ed13328cb5f6 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveWriterFactory.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveWriterFactory.java @@ -13,10 +13,14 @@ */ package io.trino.plugin.hive; +import org.apache.hadoop.fs.Path; import org.testng.annotations.Test; +import java.net.URI; + import static io.trino.plugin.hive.HiveWriterFactory.computeNonTransactionalBucketedFilename; import static io.trino.plugin.hive.HiveWriterFactory.computeTransactionalBucketedFilename; +import static io.trino.plugin.hive.HiveWriterFactory.setSchemeToFileIfAbsent; import static org.apache.hadoop.hive.ql.exec.Utilities.getBucketIdFromFile; import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; @@ -34,4 +38,36 @@ public void testComputeBucketedFileName() assertEquals(name, "001234_0"); assertEquals(getBucketIdFromFile(name), 1234); } + + @Test + public void testSetsSchemeToFile() + { + String pathWithoutScheme = "/simple/file/path"; + String result = setSchemeToFileIfAbsent(pathWithoutScheme); + assertThat(result).isEqualTo("file:////simple/file/path"); + URI resultUri = new Path(result).toUri(); + assertThat(resultUri.getScheme()).isEqualTo("file"); + assertThat(resultUri.getPath()).isEqualTo("/simple/file/path"); + + String pathWithScheme = "s3://simple/file/path"; + result = setSchemeToFileIfAbsent(pathWithScheme); + assertThat(result).isEqualTo(pathWithScheme); + resultUri = new Path(result).toUri(); + assertThat(resultUri.getScheme()).isEqualTo("s3"); + assertThat(resultUri.getPath()).isEqualTo("/file/path"); + + String pathWithEmptySpaces = "/simple/file 1/path"; + result = setSchemeToFileIfAbsent(pathWithEmptySpaces); + assertThat(result).isEqualTo("file:////simple/file 1/path"); + resultUri = new Path(result).toUri(); + assertThat(resultUri.getScheme()).isEqualTo("file"); + assertThat(resultUri.getPath()).isEqualTo("/simple/file 1/path"); + + String pathWithEmptySpacesAndScheme = "s3://simple/file 1/path"; + result = setSchemeToFileIfAbsent(pathWithEmptySpacesAndScheme); + assertThat(result).isEqualTo(pathWithEmptySpacesAndScheme); + resultUri = new Path(result).toUri(); + assertThat(resultUri.getScheme()).isEqualTo("s3"); + assertThat(resultUri.getPath()).isEqualTo("/file 1/path"); + } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java index 12ec5006012a..5aeb660e6e2f 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOrcPageSourceMemoryTracking.java @@ -103,6 +103,7 @@ import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn; import static io.trino.plugin.hive.HivePageSourceProvider.ColumnMapping.buildColumnMappings; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; import static io.trino.plugin.hive.HiveTestUtils.SESSION; import static io.trino.plugin.hive.acid.AcidTransaction.NO_ACID_TRANSACTION; import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; @@ -551,7 +552,7 @@ public ConnectorPageSource newPageSource() public ConnectorPageSource newPageSource(FileFormatDataSourceStats stats, ConnectorSession session) { - OrcPageSourceFactory orcPageSourceFactory = new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, stats, UTC); + OrcPageSourceFactory orcPageSourceFactory = new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_FACTORY, stats, UTC); List columnMappings = buildColumnMappings( partitionName, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOriginalFilesUtils.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOriginalFilesUtils.java index f60a0aaf2cb6..278711998c1d 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOriginalFilesUtils.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestOriginalFilesUtils.java @@ -16,9 +16,7 @@ import com.google.common.io.Resources; import io.trino.orc.OrcReaderOptions; import io.trino.plugin.hive.orc.OriginalFilesUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -26,23 +24,20 @@ import java.util.ArrayList; import java.util.List; -import static io.trino.hadoop.ConfigurationInstantiator.newEmptyConfiguration; import static io.trino.plugin.hive.AcidInfo.OriginalFileInfo; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; import static io.trino.testing.TestingConnectorSession.SESSION; import static org.testng.Assert.assertEquals; public class TestOriginalFilesUtils { private String tablePath; - private Configuration config; @BeforeClass public void setup() throws Exception { tablePath = new File(Resources.getResource(("dummy_id_data_orc")).toURI()).getPath(); - config = new JobConf(newEmptyConfiguration()); } @Test @@ -54,10 +49,9 @@ public void testGetPrecedingRowCountSingleFile() long rowCountResult = OriginalFilesUtils.getPrecedingRowCount( originalFileInfoList, new Path(tablePath + "/000001_0"), - HDFS_ENVIRONMENT, + HDFS_FILE_SYSTEM_FACTORY, SESSION.getIdentity(), new OrcReaderOptions(), - config, new FileFormatDataSourceStats()); assertEquals(rowCountResult, 0, "Original file should have 0 as the starting row count"); } @@ -74,10 +68,9 @@ public void testGetPrecedingRowCount() long rowCountResult = OriginalFilesUtils.getPrecedingRowCount( originalFileInfos, new Path(tablePath + "/000002_0_copy_2"), - HDFS_ENVIRONMENT, + HDFS_FILE_SYSTEM_FACTORY, SESSION.getIdentity(), new OrcReaderOptions(), - config, new FileFormatDataSourceStats()); // Bucket-2 has original files: 000002_0, 000002_0_copy_1. Each file original file has 4 rows. // So, starting row ID of 000002_0_copy_2 = row count of original files in Bucket-2 before it in lexicographic order. diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/StandardFileFormats.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/StandardFileFormats.java index a0ccde2547b2..8dc4b0c5b743 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/StandardFileFormats.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/benchmark/StandardFileFormats.java @@ -59,6 +59,7 @@ import static io.trino.orc.OrcWriteValidation.OrcWriteValidationMode.BOTH; import static io.trino.parquet.writer.ParquetSchemaConverter.HIVE_PARQUET_USE_INT96_TIMESTAMP_ENCODING; import static io.trino.parquet.writer.ParquetSchemaConverter.HIVE_PARQUET_USE_LEGACY_DECIMAL_ENCODING; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; import static io.trino.plugin.hive.HiveTestUtils.createGenericHiveRecordCursorProvider; import static io.trino.plugin.hive.benchmark.AbstractFileFormat.createSchema; import static io.trino.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat; @@ -143,7 +144,7 @@ public HiveStorageFormat getFormat() @Override public Optional getHivePageSourceFactory(HdfsEnvironment hdfsEnvironment) { - return Optional.of(new OrcPageSourceFactory(new OrcReaderOptions(), hdfsEnvironment, new FileFormatDataSourceStats(), UTC)); + return Optional.of(new OrcPageSourceFactory(new OrcReaderOptions(), hdfsEnvironment, HDFS_FILE_SYSTEM_FACTORY, new FileFormatDataSourceStats(), UTC)); } @Override diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcDeleteDeltaPageSource.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcDeleteDeltaPageSource.java index ee634110da62..3c6189277314 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcDeleteDeltaPageSource.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcDeleteDeltaPageSource.java @@ -21,16 +21,14 @@ import io.trino.spi.security.ConnectorIdentity; import io.trino.testing.MaterializedResult; import io.trino.testing.MaterializedRow; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.BucketCodec; -import org.apache.hadoop.mapred.JobConf; import org.testng.annotations.Test; import java.io.File; import static io.trino.hadoop.ConfigurationInstantiator.newEmptyConfiguration; -import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; import static io.trino.plugin.hive.HiveTestUtils.SESSION; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.IntegerType.INTEGER; @@ -46,11 +44,10 @@ public void testReadingDeletedRows() OrcDeleteDeltaPageSourceFactory pageSourceFactory = new OrcDeleteDeltaPageSourceFactory( new OrcReaderOptions(), ConnectorIdentity.ofUser("test"), - new JobConf(newEmptyConfiguration()), - HDFS_ENVIRONMENT, + HDFS_FILE_SYSTEM_FACTORY, new FileFormatDataSourceStats()); - ConnectorPageSource pageSource = pageSourceFactory.createPageSource(new Path(deleteDeltaFile.toURI()), deleteDeltaFile.length()).orElseThrow(); + ConnectorPageSource pageSource = pageSourceFactory.createPageSource(deleteDeltaFile.toURI().toString(), deleteDeltaFile.length()).orElseThrow(); MaterializedResult materializedRows = MaterializedResult.materializeSourceDataStream(SESSION, pageSource, ImmutableList.of(BIGINT, INTEGER, BIGINT)); assertEquals(materializedRows.getRowCount(), 1); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcDeletedRows.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcDeletedRows.java index 715f219449af..3622c1d7b7d9 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcDeletedRows.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcDeletedRows.java @@ -35,6 +35,7 @@ import static io.trino.hadoop.ConfigurationInstantiator.newEmptyConfiguration; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; import static io.trino.plugin.hive.HiveTestUtils.SESSION; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.IntegerType.INTEGER; @@ -157,8 +158,7 @@ private static OrcDeletedRows createOrcDeletedRows(AcidInfo acidInfo, String sou OrcDeleteDeltaPageSourceFactory pageSourceFactory = new OrcDeleteDeltaPageSourceFactory( new OrcReaderOptions(), ConnectorIdentity.ofUser("test"), - configuration, - HDFS_ENVIRONMENT, + HDFS_FILE_SYSTEM_FACTORY, new FileFormatDataSourceStats()); OrcDeletedRows deletedRows = new OrcDeletedRows( diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java index 40e928fdad51..aafc77002936 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPageSourceFactory.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.trino.filesystem.hdfs.HdfsFileSystemFactory; import io.trino.plugin.hive.AcidInfo; import io.trino.plugin.hive.FileFormatDataSourceStats; import io.trino.plugin.hive.HiveColumnHandle; @@ -78,6 +79,7 @@ public class TestOrcPageSourceFactory private static final HivePageSourceFactory PAGE_SOURCE_FACTORY = new OrcPageSourceFactory( new OrcReaderConfig(), HDFS_ENVIRONMENT, + new HdfsFileSystemFactory(HDFS_ENVIRONMENT), new FileFormatDataSourceStats(), new HiveConfig()); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPredicates.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPredicates.java index a56e31c132b8..9ec50a2cb643 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPredicates.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/orc/TestOrcPredicates.java @@ -51,6 +51,7 @@ import static io.trino.plugin.hive.HivePageSourceProvider.ColumnMapping.buildColumnMappings; import static io.trino.plugin.hive.HiveStorageFormat.ORC; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY; import static io.trino.plugin.hive.HiveTestUtils.getHiveSession; import static io.trino.plugin.hive.acid.AcidTransaction.NO_ACID_TRANSACTION; import static io.trino.spi.type.BigintType.BIGINT; @@ -99,7 +100,7 @@ private void testOrcPredicates(ConnectorSession session) file.delete(); try { // Write data - OrcFileWriterFactory writerFactory = new OrcFileWriterFactory(HDFS_ENVIRONMENT, TESTING_TYPE_MANAGER, new NodeVersion("test"), STATS, new OrcWriterOptions()); + OrcFileWriterFactory writerFactory = new OrcFileWriterFactory(TESTING_TYPE_MANAGER, new NodeVersion("test"), STATS, new OrcWriterOptions(), HDFS_FILE_SYSTEM_FACTORY); FileSplit split = createTestFileTrino(file.getAbsolutePath(), ORC, HiveCompressionCodec.NONE, columnsToWrite, session, NUM_ROWS, writerFactory); TupleDomain testingPredicate; @@ -163,7 +164,7 @@ private ConnectorPageSource createPageSource( ConnectorSession session, FileSplit split) { - OrcPageSourceFactory readerFactory = new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, STATS, UTC); + OrcPageSourceFactory readerFactory = new OrcPageSourceFactory(new OrcReaderOptions(), HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_FACTORY, STATS, UTC); Properties splitProperties = new Properties(); splitProperties.setProperty(FILE_INPUT_FORMAT, ORC.getInputFormat());