Commit 1113994: Remove usages of Hadoop Path for Hive TrinoFileStatus

1 parent ee7f913

File tree: 9 files changed (+53 -54 lines)

plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java

Lines changed: 4 additions & 3 deletions
@@ -25,6 +25,7 @@
 import io.airlift.units.Duration;
 import io.trino.filesystem.FileEntry;
 import io.trino.filesystem.Location;
+import io.trino.filesystem.Locations;
 import io.trino.filesystem.TrinoFileSystem;
 import io.trino.filesystem.TrinoFileSystemFactory;
 import io.trino.hdfs.HdfsContext;
@@ -433,7 +434,7 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
         boolean s3SelectPushdownEnabled = S3SelectPushdown.shouldEnablePushdownForTable(session, table, path.toString(), partition.getPartition());
         // S3 Select pushdown works at the granularity of individual S3 objects for compressed files
         // and finer granularity for uncompressed files using scan range feature.
-        boolean shouldEnableSplits = S3SelectPushdown.isSplittable(s3SelectPushdownEnabled, schema, inputFormat, path);
+        boolean shouldEnableSplits = S3SelectPushdown.isSplittable(s3SelectPushdownEnabled, schema, inputFormat, path.toString());
         // Skip header / footer lines are not splittable except for a special case when skip.header.line.count=1
         boolean splittable = shouldEnableSplits && getFooterCount(schema) == 0 && getHeaderCount(schema) <= 1;
@@ -641,7 +642,7 @@ Optional<Iterator<InternalHiveSplit>> buildManifestFileIterator(

         Map<Path, TrinoFileStatus> fileStatuses = new HashMap<>();
         HiveFileIterator fileStatusIterator = new HiveFileIterator(table, parent, targetFilesystem, directoryLister, namenodeStats, IGNORED, false);
-        fileStatusIterator.forEachRemaining(status -> fileStatuses.put(getPathWithoutSchemeAndAuthority(status.getPath()), status));
+        fileStatusIterator.forEachRemaining(status -> fileStatuses.put(getPathWithoutSchemeAndAuthority(new Path(status.getPath())), status));

         List<TrinoFileStatus> locatedFileStatuses = new ArrayList<>();
         for (Path path : paths) {
@@ -836,7 +837,7 @@ private List<InternalHiveSplit> getBucketedSplits(
         // build mapping of file name to bucket
         ListMultimap<Integer, TrinoFileStatus> bucketFiles = ArrayListMultimap.create();
         for (TrinoFileStatus file : files) {
-            String fileName = file.getPath().getName();
+            String fileName = Locations.getFileName(file.getPath());
             OptionalInt bucket = getBucketNumber(fileName);
             if (bucket.isPresent()) {
                 bucketFiles.put(bucket.getAsInt(), file);
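Note on the getBucketedSplits hunk above: Hadoop's Path.getName() is replaced by the string-based Locations.getFileName helper imported at the top of the file. A minimal sketch of the substitution, assuming getFileName returns the segment after the last slash (the class name and sample path below are illustrative only):

// Sketch only, not part of the commit: the Path.getName() -> Locations.getFileName switch.
import io.trino.filesystem.Locations;

public class FileNameExample
{
    public static void main(String[] args)
    {
        String path = "s3://bucket/warehouse/orders/000123_0";
        // Before: String fileName = new org.apache.hadoop.fs.Path(path).getName();
        String fileName = Locations.getFileName(path); // assumption: returns "000123_0", the last segment
        System.out.println(fileName);
    }
}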

plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/HiveFileIterator.java

Lines changed: 5 additions & 5 deletions
@@ -82,11 +82,11 @@ protected TrinoFileStatus computeNext()
             // Ignore hidden files and directories
             if (nestedDirectoryPolicy == RECURSE) {
                 // Search the full sub-path under the listed prefix for hidden directories
-                if (isHiddenOrWithinHiddenParentDirectory(status.getPath(), pathPrefix)) {
+                if (isHiddenOrWithinHiddenParentDirectory(new Path(status.getPath()), pathPrefix)) {
                     continue;
                 }
             }
-            else if (isHiddenFileOrDirectory(status.getPath())) {
+            else if (isHiddenFileOrDirectory(new Path(status.getPath()))) {
                 continue;
             }

@@ -230,15 +230,15 @@ private TrinoException processException(IOException exception)
     public static class NestedDirectoryNotAllowedException
             extends RuntimeException
     {
-        private final Path nestedDirectoryPath;
+        private final String nestedDirectoryPath;

-        public NestedDirectoryNotAllowedException(Path nestedDirectoryPath)
+        public NestedDirectoryNotAllowedException(String nestedDirectoryPath)
         {
             super("Nested sub-directories are not allowed: " + nestedDirectoryPath);
             this.nestedDirectoryPath = requireNonNull(nestedDirectoryPath, "nestedDirectoryPath is null");
         }

-        public Path getNestedDirectoryPath()
+        public String getNestedDirectoryPath()
         {
             return nestedDirectoryPath;
         }
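NestedDirectoryNotAllowedException now stores and exposes the offending path as a plain String. A hedged sketch of a consumer after this change; the class and helper method are hypothetical, not taken from the commit:

// Hypothetical caller, not in the commit: the accessor now returns String.
import io.trino.plugin.hive.fs.HiveFileIterator.NestedDirectoryNotAllowedException;

class NestedDirectoryExample
{
    static String describe(NestedDirectoryNotAllowedException e)
    {
        // Before this commit the accessor returned org.apache.hadoop.fs.Path;
        // callers that still need a Path must wrap the String themselves.
        return "Rejected nested directory: " + e.getNestedDirectoryPath();
    }
}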

plugin/trino-hive/src/main/java/io/trino/plugin/hive/fs/TrinoFileStatus.java

Lines changed: 5 additions & 6 deletions
@@ -17,7 +17,6 @@
 import io.trino.filesystem.FileEntry;
 import io.trino.filesystem.FileEntry.Block;
 import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;

 import java.util.List;
 import java.util.Objects;
@@ -30,7 +29,7 @@ public class TrinoFileStatus
         implements Comparable<TrinoFileStatus>
 {
     private final List<BlockLocation> blockLocations;
-    private final Path path;
+    private final String path;
     private final boolean isDirectory;
     private final long length;
     private final long modificationTime;
@@ -42,7 +41,7 @@ public TrinoFileStatus(FileEntry entry)
                 .stream()
                 .map(BlockLocation::new)
                 .collect(toImmutableList()),
-                new Path(entry.location().toString()),
+                entry.location().toString(),
                 false,
                 entry.length(),
                 entry.lastModified().toEpochMilli());
@@ -51,13 +50,13 @@ public TrinoFileStatus(FileEntry entry)
     public TrinoFileStatus(LocatedFileStatus fileStatus)
     {
         this(BlockLocation.fromHiveBlockLocations(fileStatus.getBlockLocations()),
-                fileStatus.getPath(),
+                fileStatus.getPath().toString(),
                 fileStatus.isDirectory(),
                 fileStatus.getLen(),
                 fileStatus.getModificationTime());
     }

-    public TrinoFileStatus(List<BlockLocation> blockLocations, Path path, boolean isDirectory, long length, long modificationTime)
+    public TrinoFileStatus(List<BlockLocation> blockLocations, String path, boolean isDirectory, long length, long modificationTime)
     {
         this.blockLocations = ImmutableList.copyOf(requireNonNull(blockLocations, "blockLocations is null"));
         this.path = requireNonNull(path, "path is null");
@@ -71,7 +70,7 @@ public List<BlockLocation> getBlockLocations()
         return blockLocations;
     }

-    public Path getPath()
+    public String getPath()
     {
         return path;
     }
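With the field retyped, TrinoFileStatus is constructed and queried with string locations throughout; getPath() returns the String as given. A small sketch of the five-argument constructor after this change (the location, size, and timestamp are made-up values):

// Sketch, not from the commit: constructing TrinoFileStatus after the String migration.
import com.google.common.collect.ImmutableList;
import io.trino.plugin.hive.fs.TrinoFileStatus;

class TrinoFileStatusExample
{
    static TrinoFileStatus example()
    {
        return new TrinoFileStatus(
                ImmutableList.of(),             // blockLocations (empty for the sketch)
                "s3://bucket/table/part-00000", // path, formerly org.apache.hadoop.fs.Path
                false,                          // isDirectory
                1024,                           // length in bytes
                1_700_000_000_000L);            // modificationTime, epoch millis
    }
}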

plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3select/S3SelectPushdown.java

Lines changed: 3 additions & 7 deletions
@@ -19,7 +19,6 @@
 import io.trino.plugin.hive.metastore.Table;
 import io.trino.plugin.hive.type.DecimalTypeInfo;
 import io.trino.spi.connector.ConnectorSession;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.InputFormat;
@@ -95,7 +94,7 @@ private static boolean isInputFormatSupported(Properties schema)
         return false;
     }

-    public static boolean isCompressionCodecSupported(InputFormat<?, ?> inputFormat, Path path)
+    public static boolean isCompressionCodecSupported(InputFormat<?, ?> inputFormat, String path)
     {
         if (inputFormat instanceof TextInputFormat textInputFormat) {
             // S3 Select supports the following formats: uncompressed, GZIP and BZIP2.
@@ -107,10 +106,7 @@ public static boolean isCompressionCodecSupported(InputFormat<?, ?> inputFormat,
         return false;
     }

-    public static boolean isSplittable(boolean s3SelectPushdownEnabled,
-            Properties schema,
-            InputFormat<?, ?> inputFormat,
-            Path path)
+    public static boolean isSplittable(boolean s3SelectPushdownEnabled, Properties schema, InputFormat<?, ?> inputFormat, String path)
     {
         if (!s3SelectPushdownEnabled) {
             return true;
@@ -123,7 +119,7 @@ public static boolean isSplittable(boolean s3SelectPushdownEnabled,
         return false;
     }

-    private static boolean isUncompressed(InputFormat<?, ?> inputFormat, Path path)
+    private static boolean isUncompressed(InputFormat<?, ?> inputFormat, String path)
     {
         if (inputFormat instanceof TextInputFormat textInputFormat) {
             // S3 Select supports splitting uncompressed files
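Condensed from this commit's own TestS3SelectPushdown (shown in full further down), the public entry points now take plain string paths. In this fragment, inputFormat and schema are the TextInputFormat and Properties fixtures from that test:

// Expected results per the updated tests in this commit:
isCompressionCodecSupported(inputFormat, "s3://fakeBucket/fakeObject.gz");   // true: GZIP is supported
isCompressionCodecSupported(inputFormat, "s3://fakeBucket/fakeObject.lz4");  // false: LZ4 is not
isSplittable(true, schema, inputFormat, "s3://fakeBucket/fakeObject.csv");   // true: uncompressed CSV splits
isSplittable(true, schema, inputFormat, "s3://fakeBucket/fakeObject.gz");    // false: compressed objects do not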

plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveUtil.java

Lines changed: 2 additions & 2 deletions
@@ -326,7 +326,7 @@ private static void configureCompressionCodecs(JobConf jobConf)
         jobConf.set("io.compression.codecs", String.join(",", codecs));
     }

-    public static Optional<CompressionCodec> getCompressionCodec(TextInputFormat inputFormat, Path file)
+    public static Optional<CompressionCodec> getCompressionCodec(TextInputFormat inputFormat, String file)
     {
         CompressionCodecFactory compressionCodecFactory;

@@ -341,7 +341,7 @@ public static Optional<CompressionCodec> getCompressionCodec(TextInputFormat inp
             return Optional.empty();
         }

-        return Optional.ofNullable(compressionCodecFactory.getCodec(file));
+        return Optional.ofNullable(compressionCodecFactory.getCodec(new Path(file)));
     }

     public static InputFormat<?, ?> getInputFormat(Configuration configuration, Properties schema, boolean symlinkTarget)
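The String-to-Path conversion moves inside getCompressionCodec, so callers such as S3SelectPushdown stay Hadoop-Path-free. A hedged sketch of the caller's side; the wrapper class and method are illustrative, not from the commit:

// Sketch: callers now pass a String; the helper constructs the Hadoop Path internally (new Path(file) above).
import io.trino.plugin.hive.util.HiveUtil;
import java.util.Optional;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.TextInputFormat;

class CompressionCodecExample
{
    static boolean isCompressed(TextInputFormat inputFormat, String file)
    {
        Optional<CompressionCodec> codec = HiveUtil.getCompressionCodec(inputFormat, file);
        return codec.isPresent(); // a codec resolved from the file extension means compressed input
    }
}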

plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/InternalHiveSplitFactory.java

Lines changed: 6 additions & 7 deletions
@@ -125,7 +125,7 @@ public Optional<InternalHiveSplit> createInternalHiveSplit(TrinoFileStatus statu
     {
         splittable = splittable &&
                 status.getLength() > minimumTargetSplitSizeInBytes &&
-                isSplittable(inputFormat, fileSystem, status.getPath());
+                isSplittable(inputFormat, fileSystem, new Path(status.getPath()));
         return createInternalHiveSplit(
                 status.getPath(),
                 status.getBlockLocations(),
@@ -144,7 +144,7 @@ public Optional<InternalHiveSplit> createInternalHiveSplit(FileSplit split)
     {
         FileStatus file = fileSystem.getFileStatus(split.getPath());
         return createInternalHiveSplit(
-                split.getPath(),
+                split.getPath().toString(),
                 BlockLocation.fromHiveBlockLocations(fileSystem.getFileBlockLocations(file, split.getStart(), split.getLength())),
                 split.getStart(),
                 split.getLength(),
@@ -157,7 +157,7 @@ public Optional<InternalHiveSplit> createInternalHiveSplit(FileSplit split)
     }

     private Optional<InternalHiveSplit> createInternalHiveSplit(
-            Path path,
+            String path,
             List<BlockLocation> blockLocations,
             long start,
             long length,
@@ -169,8 +169,7 @@ private Optional<InternalHiveSplit> createInternalHiveSplit(
             boolean splittable,
             Optional<AcidInfo> acidInfo)
     {
-        String pathString = path.toString();
-        if (!pathMatchesPredicate(pathDomain, pathString)) {
+        if (!pathMatchesPredicate(pathDomain, path)) {
             return Optional.empty();
         }

@@ -215,7 +214,7 @@ private Optional<InternalHiveSplit> createInternalHiveSplit(
         int bucketNumberIndex = readBucketNumber.orElse(0);
         return Optional.of(new InternalHiveSplit(
                 partitionName,
-                pathString,
+                path,
                 start,
                 start + length,
                 estimatedFileSize,
@@ -236,7 +235,7 @@ private Optional<InternalHiveSplit> createInternalHiveSplit(
                 partitionMatchSupplier));
     }

-    private static void checkBlocks(Path path, List<InternalHiveBlock> blocks, long start, long length)
+    private static void checkBlocks(String path, List<InternalHiveBlock> blocks, long start, long length)
     {
         checkArgument(start >= 0, "Split (%s) has negative start (%s)", path, start);
         checkArgument(length >= 0, "Split (%s) has negative length (%s)", path, length);
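InternalHiveSplitFactory shows the migration pattern in miniature: the split path stays a String end to end, and a Hadoop Path is constructed only where a Hadoop API remains. An illustrative fragment of that boundary rule, with names mirroring the diff above:

// Illustrative fragment, not a complete method: keep String inside Trino code,
// wrap in org.apache.hadoop.fs.Path only where Hadoop still requires it.
String path = status.getPath(); // String after this commit
boolean canSplit = isSplittable(inputFormat, fileSystem, new org.apache.hadoop.fs.Path(path)); // Hadoop boundary
// Trino-side consumers (pathMatchesPredicate, InternalHiveSplit) take the String directly.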

plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java

Lines changed: 9 additions & 4 deletions
@@ -16,8 +16,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Streams;
 import com.google.common.net.HostAndPort;
 import io.airlift.concurrent.BoundedExecutor;
 import io.airlift.json.JsonCodec;
@@ -477,7 +476,10 @@ public void testFileIteratorListing()
                 HiveFileIterator.NestedDirectoryPolicy.RECURSE,
                 false); // ignoreAbsentPartitions

-        List<Path> recursiveListing = Lists.newArrayList(Iterators.transform(recursiveIterator, TrinoFileStatus::getPath));
+        List<Path> recursiveListing = Streams.stream(recursiveIterator)
+                .map(TrinoFileStatus::getPath)
+                .map(Path::new)
+                .toList();
         // Should not include directories, or files underneath hidden directories
         assertEqualsIgnoreOrder(recursiveListing, ImmutableList.of(nestedFile, baseFile));

@@ -489,7 +491,10 @@ public void testFileIteratorListing()
                 new NamenodeStats(),
                 HiveFileIterator.NestedDirectoryPolicy.IGNORED,
                 false); // ignoreAbsentPartitions
-        List<Path> shallowListing = Lists.newArrayList(Iterators.transform(shallowIterator, TrinoFileStatus::getPath));
+        List<Path> shallowListing = Streams.stream(shallowIterator)
+                .map(TrinoFileStatus::getPath)
+                .map(Path::new)
+                .toList();
         // Should not include any hidden files, folders, or nested files
         assertEqualsIgnoreOrder(shallowListing, ImmutableList.of(baseFile));
     }
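The test swaps Iterators.transform plus Lists.newArrayList for Guava's Streams.stream over the iterator, re-wrapping the String paths in Hadoop Paths only for the assertion. A self-contained sketch of the idiom (class name and sample data are illustrative):

// Sketch of the Streams.stream idiom adopted above.
import com.google.common.collect.Streams;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.Path;

class StreamsIdiomExample
{
    static List<Path> toPaths(Iterator<String> locations)
    {
        return Streams.stream(locations)  // adapts an Iterator to a java.util.stream.Stream
                .map(Path::new)           // re-wrap only because the assertions still compare Hadoop Paths
                .toList();                // Stream.toList() requires Java 16+
    }
}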

plugin/trino-hive/src/test/java/io/trino/plugin/hive/fs/TestTransactionScopeCachingDirectoryLister.java

Lines changed: 10 additions & 10 deletions
@@ -84,12 +84,12 @@ protected boolean isCached(TransactionScopeCachingDirectoryLister directoryListe
     public void testConcurrentDirectoryListing()
             throws IOException
     {
-        TrinoFileStatus firstFile = new TrinoFileStatus(ImmutableList.of(), new org.apache.hadoop.fs.Path("x"), false, 1, 1);
-        TrinoFileStatus secondFile = new TrinoFileStatus(ImmutableList.of(), new org.apache.hadoop.fs.Path("y"), false, 1, 1);
-        TrinoFileStatus thirdFile = new TrinoFileStatus(ImmutableList.of(), new org.apache.hadoop.fs.Path("z"), false, 1, 1);
+        TrinoFileStatus firstFile = new TrinoFileStatus(ImmutableList.of(), "x", false, 1, 1);
+        TrinoFileStatus secondFile = new TrinoFileStatus(ImmutableList.of(), "y", false, 1, 1);
+        TrinoFileStatus thirdFile = new TrinoFileStatus(ImmutableList.of(), "z", false, 1, 1);

-        org.apache.hadoop.fs.Path path1 = new org.apache.hadoop.fs.Path("x");
-        org.apache.hadoop.fs.Path path2 = new org.apache.hadoop.fs.Path("y");
+        Path path1 = new Path("x");
+        Path path2 = new Path("y");

         CountingDirectoryLister countingLister = new CountingDirectoryLister(
                 ImmutableMap.of(
@@ -130,8 +130,8 @@ public void testConcurrentDirectoryListing()
     public void testConcurrentDirectoryListingException()
             throws IOException
     {
-        TrinoFileStatus file = new TrinoFileStatus(ImmutableList.of(), new org.apache.hadoop.fs.Path("x"), false, 1, 1);
-        org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path("x");
+        TrinoFileStatus file = new TrinoFileStatus(ImmutableList.of(), "x", false, 1, 1);
+        Path path = new Path("x");

         CountingDirectoryLister countingLister = new CountingDirectoryLister(ImmutableMap.of(path, ImmutableList.of(file)));
         DirectoryLister cachingLister = new TransactionScopeCachingDirectoryLister(countingLister, 1);
@@ -167,17 +167,17 @@ private void assertFiles(RemoteIterator<TrinoFileStatus> iterator, List<TrinoFil
     private static class CountingDirectoryLister
             implements DirectoryLister
     {
-        private final Map<org.apache.hadoop.fs.Path, List<TrinoFileStatus>> fileStatuses;
+        private final Map<Path, List<TrinoFileStatus>> fileStatuses;
         private int listCount;
         private boolean throwException;

-        public CountingDirectoryLister(Map<org.apache.hadoop.fs.Path, List<TrinoFileStatus>> fileStatuses)
+        public CountingDirectoryLister(Map<Path, List<TrinoFileStatus>> fileStatuses)
        {
             this.fileStatuses = requireNonNull(fileStatuses, "fileStatuses is null");
         }

         @Override
-        public RemoteIterator<TrinoFileStatus> list(FileSystem fs, Table table, org.apache.hadoop.fs.Path path)
+        public RemoteIterator<TrinoFileStatus> list(FileSystem fs, Table table, Path path)
                 throws IOException
         {
             listCount++;

plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3select/TestS3SelectPushdown.java

Lines changed: 9 additions & 10 deletions
@@ -20,7 +20,6 @@
 import io.trino.plugin.hive.metastore.Table;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.testing.TestingConnectorSession;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TextInputFormat;
@@ -116,11 +115,11 @@ public void setUp()
     @Test
     public void testIsCompressionCodecSupported()
     {
-        assertTrue(isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject.gz")));
-        assertTrue(isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject")));
-        assertFalse(isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject.lz4")));
-        assertFalse(isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject.snappy")));
-        assertTrue(isCompressionCodecSupported(inputFormat, new Path("s3://fakeBucket/fakeObject.bz2")));
+        assertTrue(isCompressionCodecSupported(inputFormat, "s3://fakeBucket/fakeObject.gz"));
+        assertTrue(isCompressionCodecSupported(inputFormat, "s3://fakeBucket/fakeObject"));
+        assertFalse(isCompressionCodecSupported(inputFormat, "s3://fakeBucket/fakeObject.lz4"));
+        assertFalse(isCompressionCodecSupported(inputFormat, "s3://fakeBucket/fakeObject.snappy"));
+        assertTrue(isCompressionCodecSupported(inputFormat, "s3://fakeBucket/fakeObject.bz2"));
     }

     @Test
@@ -273,20 +272,20 @@ public void testShouldNotEnableSelectPushdownWhenColumnTypesAreNotSupported()
     public void testShouldEnableSplits()
     {
         // Uncompressed CSV
-        assertTrue(isSplittable(true, schema, inputFormat, new Path("s3://fakeBucket/fakeObject.csv")));
+        assertTrue(isSplittable(true, schema, inputFormat, "s3://fakeBucket/fakeObject.csv"));
         // Pushdown disabled
-        assertTrue(isSplittable(false, schema, inputFormat, new Path("s3://fakeBucket/fakeObject.csv")));
+        assertTrue(isSplittable(false, schema, inputFormat, "s3://fakeBucket/fakeObject.csv"));
         // JSON
         Properties jsonSchema = new Properties();
         jsonSchema.setProperty(SERIALIZATION_LIB, JsonSerDe.class.getName());
-        assertTrue(isSplittable(true, jsonSchema, inputFormat, new Path("s3://fakeBucket/fakeObject.json")));
+        assertTrue(isSplittable(true, jsonSchema, inputFormat, "s3://fakeBucket/fakeObject.json"));
     }

     @Test
     public void testShouldNotEnableSplits()
     {
         // Compressed file
-        assertFalse(isSplittable(true, schema, inputFormat, new Path("s3://fakeBucket/fakeObject.gz")));
+        assertFalse(isSplittable(true, schema, inputFormat, "s3://fakeBucket/fakeObject.gz"));
     }

     @AfterClass(alwaysRun = true)
