Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .mvn/modernizer/violations.xml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@
<comment>Prefer ConfigurationInstantiator.newEmptyConfiguration() for two reasons: (1) loading default resources is unlikely desired and (2) ConfigurationInstantiator adds additional safety checks</comment>
</violation>

<violation>
<name>org/apache/hadoop/fs/FileSystem.close:()V</name>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this be caught when the code uses explicit type, eg. SyncingFileSystem?
I'd guess the compiled code might reference the actual implementation's method name.

Maybe the shared FileSystem instances should be wrapped with something that prevents inadvertent close?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as long as it's not cast to Closeable. We're getting rid of FileSystem usage so I'd rather not make an invasive change like that.

<version>1.1</version>
<comment>Hadoop FileSystem instances are shared and should not be closed</comment>
</violation>

<violation>
<name>org/apache/hadoop/conf/Configuration."&lt;init&gt;":(Z)V</name>
<version>1.1</version>
Expand Down
5 changes: 5 additions & 0 deletions lib/trino-hdfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@
<artifactId>validation-api</artifactId>
</dependency>

<dependency>
<groupId>org.gaul</groupId>
<artifactId>modernizer-maven-annotations</artifactId>
</dependency>

<dependency>
<groupId>org.weakref</groupId>
<artifactId>jmxutils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;

import javax.annotation.concurrent.GuardedBy;

Expand Down Expand Up @@ -167,7 +168,7 @@ private static FileSystem createFileSystem(URI uri, Configuration conf)
FilterFileSystem wrapper = new FileSystemWrapper(original);
FileSystemFinalizerService.getInstance().addFinalizer(wrapper, () -> {
try {
original.close();
closeFileSystem(original);
}
catch (IOException e) {
log.error(e, "Error occurred when finalizing file system");
Expand All @@ -188,11 +189,18 @@ public synchronized void closeAll()
throws IOException
{
for (FileSystemHolder fileSystemHolder : ImmutableList.copyOf(map.values())) {
fileSystemHolder.getFileSystem().close();
closeFileSystem(fileSystemHolder.getFileSystem());
}
map.clear();
}

// Modernizer bans org.apache.hadoop.fs.FileSystem.close() (see violations.xml in this
// change) because Hadoop FileSystem instances are shared and must not normally be closed.
// This helper is the one sanctioned call site: the cache created these instances itself,
// so it is responsible for closing them, and the suppression is scoped to this method
// rather than to the callers.
@SuppressModernizer
private static void closeFileSystem(FileSystem fileSystem)
throws IOException
{
fileSystem.close();
}

private static FileSystemKey createFileSystemKey(URI uri, UserGroupInformation userGroupInformation, long unique)
{
String scheme = nullToEmpty(uri.getScheme()).toLowerCase(ENGLISH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,10 +640,9 @@ public void dropSchema(ConnectorSession session, String schemaName)
// If we see files in the schema location, don't delete it.
// If we see no files or can't see the location at all, use fallback.
boolean deleteData = location.map(path -> {
HdfsContext context = new HdfsContext(session); // don't catch errors here

try (FileSystem fs = hdfsEnvironment.getFileSystem(context, path)) {
return !fs.listLocatedStatus(path).hasNext();
try {
return !hdfsEnvironment.getFileSystem(new HdfsContext(session), path)
.listLocatedStatus(path).hasNext();
}
catch (IOException | RuntimeException e) {
LOG.warn(e, "Could not check schema directory '%s'", path);
Expand Down
5 changes: 5 additions & 0 deletions plugin/trino-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@
<artifactId>libthrift</artifactId>
</dependency>

<dependency>
<groupId>org.gaul</groupId>
<artifactId>modernizer-maven-annotations</artifactId>
</dependency>

<dependency>
<groupId>org.weakref</groupId>
<artifactId>jmxutils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,9 @@ public synchronized void dropDatabase(ConnectorSession session, String schemaNam
// If we see no files, request deletion.
// If we fail to check the schema location, behave according to fallback.
boolean deleteData = location.map(path -> {
HdfsContext context = new HdfsContext(session);
try (FileSystem fs = hdfsEnvironment.getFileSystem(context, path)) {
return !fs.listLocatedStatus(path).hasNext();
try {
return !hdfsEnvironment.getFileSystem(new HdfsContext(session), path)
.listLocatedStatus(path).hasNext();
}
catch (IOException | RuntimeException e) {
log.warn(e, "Could not check schema directory '%s'", path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;

import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -341,7 +342,7 @@ public void close()
throws IOException
{
try (Closer closer = Closer.create()) {
closer.register(super::close);
closer.register(this::closeSuper);
if (credentialsProvider instanceof Closeable) {
closer.register((Closeable) credentialsProvider);
}
Expand All @@ -350,6 +351,13 @@ public void close()
}
}

// Wraps super.close() in a dedicated method so the Modernizer ban on
// FileSystem.close() can be suppressed here, narrowly, instead of on the
// public close() override above (which also closes other resources via Closer).
// Registered as a method reference with the Closer; rethrows any IOException
// from the superclass close.
@SuppressModernizer
private void closeSuper()
throws IOException
{
super.close();
}

@Override
public URI getUri()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.gaul.modernizer_maven_annotations.SuppressModernizer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -235,7 +236,7 @@ private FileSystem getCachingFileSystem(HdfsContext context, Path path)
public void tearDown()
throws IOException
{
nonCachingFileSystem.close();
closeFileSystem(nonCachingFileSystem);
}

@AfterMethod(alwaysRun = true)
Expand All @@ -251,7 +252,7 @@ public void closeRubix()
});
closer.register(() -> {
if (cachingFileSystem != null) {
cachingFileSystem.close();
closeFileSystem(cachingFileSystem);
cachingFileSystem = null;
}
});
Expand All @@ -274,6 +275,13 @@ public void closeRubix()
}
}

// Test-only helper: FileSystem.close() is banned by Modernizer because Hadoop
// FileSystem instances are shared in production code. This test creates and owns
// its own instances (non-caching and caching file systems set up in this class),
// so closing them during teardown is correct; the suppression is confined here.
@SuppressModernizer
private static void closeFileSystem(FileSystem fileSystem)
throws IOException
{
fileSystem.close();
}

@DataProvider
public static Object[][] readMode()
{
Expand Down Expand Up @@ -469,6 +477,7 @@ public void testLargeFile(ReadMode readMode)
});
}

@SuppressModernizer
@Test
public void testFileSystemBindings()
throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,38 +62,37 @@ private OrcFileRewriter() {}
public static OrcFileInfo rewrite(File input, File output, BitSet rowsToDelete)
throws IOException
{
try (FileSystem fileSystem = new SyncingFileSystem(CONFIGURATION)) {
Reader reader = createReader(fileSystem, path(input));
FileSystem fileSystem = new SyncingFileSystem(CONFIGURATION);
Reader reader = createReader(fileSystem, path(input));

if (reader.getNumberOfRows() < rowsToDelete.length()) {
throw new IOException("File has fewer rows than deletion vector");
}
int deleteRowCount = rowsToDelete.cardinality();
if (reader.getNumberOfRows() == deleteRowCount) {
return new OrcFileInfo(0, 0);
}
if (reader.getNumberOfRows() >= Integer.MAX_VALUE) {
throw new IOException("File has too many rows");
}
int inputRowCount = toIntExact(reader.getNumberOfRows());

WriterOptions writerOptions = OrcFile.writerOptions(CONFIGURATION)
.memory(new NullMemoryManager())
.fileSystem(fileSystem)
.compress(reader.getCompression())
.inspector(reader.getObjectInspector());

long start = System.nanoTime();
try (Closer<RecordReader, IOException> recordReader = closer(reader.rows(), RecordReader::close);
Closer<Writer, IOException> writer = closer(createWriter(path(output), writerOptions), Writer::close)) {
if (reader.hasMetadataValue(OrcFileMetadata.KEY)) {
ByteBuffer orcFileMetadata = reader.getMetadataValue(OrcFileMetadata.KEY);
writer.get().addUserMetadata(OrcFileMetadata.KEY, orcFileMetadata);
}
OrcFileInfo fileInfo = rewrite(recordReader.get(), writer.get(), rowsToDelete, inputRowCount);
log.debug("Rewrote file %s in %s (input rows: %s, output rows: %s)", input.getName(), nanosSince(start), inputRowCount, inputRowCount - deleteRowCount);
return fileInfo;
if (reader.getNumberOfRows() < rowsToDelete.length()) {
throw new IOException("File has fewer rows than deletion vector");
}
int deleteRowCount = rowsToDelete.cardinality();
if (reader.getNumberOfRows() == deleteRowCount) {
return new OrcFileInfo(0, 0);
}
if (reader.getNumberOfRows() >= Integer.MAX_VALUE) {
throw new IOException("File has too many rows");
}
int inputRowCount = toIntExact(reader.getNumberOfRows());

WriterOptions writerOptions = OrcFile.writerOptions(CONFIGURATION)
.memory(new NullMemoryManager())
.fileSystem(fileSystem)
.compress(reader.getCompression())
.inspector(reader.getObjectInspector());

long start = System.nanoTime();
try (Closer<RecordReader, IOException> recordReader = closer(reader.rows(), RecordReader::close);
Closer<Writer, IOException> writer = closer(createWriter(path(output), writerOptions), Writer::close)) {
if (reader.hasMetadataValue(OrcFileMetadata.KEY)) {
ByteBuffer orcFileMetadata = reader.getMetadataValue(OrcFileMetadata.KEY);
writer.get().addUserMetadata(OrcFileMetadata.KEY, orcFileMetadata);
}
OrcFileInfo fileInfo = rewrite(recordReader.get(), writer.get(), rowsToDelete, inputRowCount);
log.debug("Rewrote file %s in %s (input rows: %s, output rows: %s)", input.getName(), nanosSince(start), inputRowCount, inputRowCount - deleteRowCount);
return fileInfo;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
Expand Down Expand Up @@ -217,10 +216,10 @@ private static OrcSerde createSerializer(Properties properties)

private static RecordWriter createRecordWriter(Path target, List<Long> columnIds, List<Type> columnTypes, boolean writeMetadata)
{
try (FileSystem fileSystem = new SyncingFileSystem(CONFIGURATION)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to change the SyncingFileSystem usage? This actually seemed fine before. For example, if you disable this, then you break the deleteOnExit behavior if that is being used.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't necessary to close before, as we didn't use that behavior. It seems better to simply remove the close than to suppress the warning.

try {
OrcFile.WriterOptions options = OrcFile.writerOptions(CONFIGURATION)
.memory(new NullMemoryManager())
.fileSystem(fileSystem)
.fileSystem(new SyncingFileSystem(CONFIGURATION))
.compress(SNAPPY);

if (writeMetadata) {
Expand Down