Skip to content

Commit ad2e69c

Browse files
committed
Detect and close I/O leaks in the native FS implementations
It is enable by default in the CI to report detected leaks
1 parent cd77f93 commit ad2e69c

File tree

11 files changed

+778
-2
lines changed

11 files changed

+778
-2
lines changed

lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class FileSystemConfig
2424
private boolean nativeGcsEnabled;
2525
private boolean nativeLocalEnabled;
2626
private boolean cacheEnabled;
27+
private boolean leakDetectionEnabled;
2728

2829
public boolean isHadoopEnabled()
2930
{
@@ -108,4 +109,16 @@ public FileSystemConfig setCacheEnabled(boolean enabled)
108109
this.cacheEnabled = enabled;
109110
return this;
110111
}
112+
113+
public boolean isLeakDetectionEnabled()
114+
{
115+
return leakDetectionEnabled;
116+
}
117+
118+
@Config("fs.leak-detection.enabled")
119+
public FileSystemConfig setLeakDetectionEnabled(boolean leakDetectionEnabled)
120+
{
121+
this.leakDetectionEnabled = leakDetectionEnabled;
122+
return this;
123+
}
111124
}

lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import io.trino.filesystem.cache.TrinoFileSystemCache;
3838
import io.trino.filesystem.gcs.GcsFileSystemFactory;
3939
import io.trino.filesystem.gcs.GcsFileSystemModule;
40+
import io.trino.filesystem.leakdetection.LeakDetectingFileSystemFactory;
4041
import io.trino.filesystem.local.LocalFileSystemConfig;
4142
import io.trino.filesystem.local.LocalFileSystemFactory;
4243
import io.trino.filesystem.memory.MemoryFileSystemCache;
@@ -54,6 +55,7 @@
5455
import static com.google.inject.multibindings.MapBinder.newMapBinder;
5556
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
5657
import static io.airlift.configuration.ConfigBinder.configBinder;
58+
import static java.lang.System.getenv;
5759
import static java.util.Objects.requireNonNull;
5860

5961
public class FileSystemModule
@@ -146,6 +148,7 @@ protected void setup(Binder binder)
146148
@Provides
147149
@Singleton
148150
static TrinoFileSystemFactory createFileSystemFactory(
151+
FileSystemConfig config,
149152
Optional<HdfsFileSystemLoader> hdfsFileSystemLoader,
150153
Map<String, TrinoFileSystemFactory> factories,
151154
Optional<TrinoFileSystemCache> fileSystemCache,
@@ -162,6 +165,13 @@ static TrinoFileSystemFactory createFileSystemFactory(
162165

163166
TrinoFileSystemFactory delegate = new SwitchingFileSystemFactory(loader);
164167
delegate = new TracingFileSystemFactory(tracer, delegate);
168+
169+
// Enable leak detection if configured or if running in a CI environment
170+
boolean leakDetectionEnabled = getenv("CONTINUOUS_INTEGRATION") != null;
171+
if (config.isLeakDetectionEnabled() || leakDetectionEnabled) {
172+
delegate = new LeakDetectingFileSystemFactory(delegate);
173+
}
174+
165175
if (fileSystemCache.isPresent()) {
166176
return new CacheFileSystemFactory(tracer, delegate, fileSystemCache.orElseThrow(), keyProvider.orElseThrow());
167177
}

lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public void testDefaults()
3434
.setNativeS3Enabled(false)
3535
.setNativeGcsEnabled(false)
3636
.setNativeLocalEnabled(false)
37-
.setCacheEnabled(false));
37+
.setCacheEnabled(false)
38+
.setLeakDetectionEnabled(false));
3839
}
3940

4041
@Test
@@ -48,6 +49,7 @@ public void testExplicitPropertyMappings()
4849
.put("fs.native-gcs.enabled", "true")
4950
.put("fs.native-local.enabled", "true")
5051
.put("fs.cache.enabled", "true")
52+
.put("fs.leak-detection.enabled", "true")
5153
.buildOrThrow();
5254

5355
FileSystemConfig expected = new FileSystemConfig()
@@ -57,7 +59,8 @@ public void testExplicitPropertyMappings()
5759
.setNativeS3Enabled(true)
5860
.setNativeGcsEnabled(true)
5961
.setNativeLocalEnabled(true)
60-
.setCacheEnabled(true);
62+
.setCacheEnabled(true)
63+
.setLeakDetectionEnabled(true);
6164

6265
assertFullMapping(properties, expected);
6366
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.filesystem.leakdetection;
15+
16+
import io.airlift.units.Duration;
17+
import io.trino.filesystem.FileIterator;
18+
import io.trino.filesystem.Location;
19+
import io.trino.filesystem.TrinoFileSystem;
20+
import io.trino.filesystem.TrinoInputFile;
21+
import io.trino.filesystem.TrinoOutputFile;
22+
import io.trino.filesystem.UriLocation;
23+
import io.trino.filesystem.encryption.EncryptionKey;
24+
25+
import java.io.IOException;
26+
import java.lang.ref.Cleaner;
27+
import java.time.Instant;
28+
import java.util.Collection;
29+
import java.util.Optional;
30+
import java.util.Set;
31+
32+
import static java.util.Objects.requireNonNull;
33+
34+
public class LeakDetectingFileSystem
35+
implements TrinoFileSystem
36+
{
37+
private final TrinoFileSystem delegate;
38+
private final Cleaner cleaner;
39+
40+
public LeakDetectingFileSystem(TrinoFileSystem delegate, Cleaner cleaner)
41+
{
42+
this.delegate = requireNonNull(delegate, "delegate is null");
43+
this.cleaner = requireNonNull(cleaner, "cleaner is null");
44+
}
45+
46+
@Override
47+
public TrinoInputFile newInputFile(Location location)
48+
{
49+
return new LeakDetectingInputFile(delegate.newInputFile(location), cleaner);
50+
}
51+
52+
@Override
53+
public TrinoInputFile newInputFile(Location location, long length)
54+
{
55+
return new LeakDetectingInputFile(delegate.newInputFile(location, length), cleaner);
56+
}
57+
58+
@Override
59+
public TrinoInputFile newInputFile(Location location, long length, Instant lastModified)
60+
{
61+
return new LeakDetectingInputFile(delegate.newInputFile(location, length, lastModified), cleaner);
62+
}
63+
64+
@Override
65+
public TrinoOutputFile newOutputFile(Location location)
66+
{
67+
return new LeakDetectingOutputFile(delegate.newOutputFile(location), cleaner);
68+
}
69+
70+
@Override
71+
public void deleteFile(Location location)
72+
throws IOException
73+
{
74+
delegate.deleteFile(location);
75+
}
76+
77+
@Override
78+
public void deleteDirectory(Location location)
79+
throws IOException
80+
{
81+
delegate.deleteDirectory(location);
82+
}
83+
84+
@Override
85+
public void renameFile(Location source, Location target)
86+
throws IOException
87+
{
88+
delegate.renameFile(source, target);
89+
}
90+
91+
@Override
92+
public FileIterator listFiles(Location location)
93+
throws IOException
94+
{
95+
return delegate.listFiles(location);
96+
}
97+
98+
@Override
99+
public Optional<Boolean> directoryExists(Location location)
100+
throws IOException
101+
{
102+
return delegate.directoryExists(location);
103+
}
104+
105+
@Override
106+
public void createDirectory(Location location)
107+
throws IOException
108+
{
109+
delegate.createDirectory(location);
110+
}
111+
112+
@Override
113+
public void renameDirectory(Location source, Location target)
114+
throws IOException
115+
{
116+
delegate.renameDirectory(source, target);
117+
}
118+
119+
@Override
120+
public Set<Location> listDirectories(Location location)
121+
throws IOException
122+
{
123+
return delegate.listDirectories(location);
124+
}
125+
126+
@Override
127+
public Optional<Location> createTemporaryDirectory(Location targetPath, String temporaryPrefix, String relativePrefix)
128+
throws IOException
129+
{
130+
return delegate.createTemporaryDirectory(targetPath, temporaryPrefix, relativePrefix);
131+
}
132+
133+
@Override
134+
public TrinoInputFile newEncryptedInputFile(Location location, EncryptionKey key)
135+
{
136+
return new LeakDetectingInputFile(delegate.newEncryptedInputFile(location, key), cleaner);
137+
}
138+
139+
@Override
140+
public TrinoInputFile newEncryptedInputFile(Location location, long length, EncryptionKey key)
141+
{
142+
return new LeakDetectingInputFile(delegate.newEncryptedInputFile(location, length, key), cleaner);
143+
}
144+
145+
@Override
146+
public TrinoInputFile newEncryptedInputFile(Location location, long length, Instant lastModified, EncryptionKey key)
147+
{
148+
return new LeakDetectingInputFile(delegate.newEncryptedInputFile(location, length, lastModified, key), cleaner);
149+
}
150+
151+
@Override
152+
public TrinoOutputFile newEncryptedOutputFile(Location location, EncryptionKey key)
153+
{
154+
return new LeakDetectingOutputFile(delegate.newEncryptedOutputFile(location, key), cleaner);
155+
}
156+
157+
@Override
158+
public void deleteFiles(Collection<Location> locations)
159+
throws IOException
160+
{
161+
delegate.deleteFiles(locations);
162+
}
163+
164+
@Override
165+
public Optional<UriLocation> preSignedUri(Location location, Duration ttl)
166+
throws IOException
167+
{
168+
return delegate.preSignedUri(location, ttl);
169+
}
170+
171+
@Override
172+
public Optional<UriLocation> encryptedPreSignedUri(Location location, Duration ttl, EncryptionKey key)
173+
throws IOException
174+
{
175+
return delegate.encryptedPreSignedUri(location, ttl, key);
176+
}
177+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.filesystem.leakdetection;
15+
16+
import io.trino.filesystem.TrinoFileSystem;
17+
import io.trino.filesystem.TrinoFileSystemFactory;
18+
import io.trino.spi.connector.ConnectorSession;
19+
import io.trino.spi.security.ConnectorIdentity;
20+
21+
import java.lang.ref.Cleaner;
22+
23+
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
24+
import static java.util.Objects.requireNonNull;
25+
26+
public class LeakDetectingFileSystemFactory
27+
implements TrinoFileSystemFactory
28+
{
29+
private final TrinoFileSystemFactory delegate;
30+
31+
public LeakDetectingFileSystemFactory(TrinoFileSystemFactory delegate)
32+
{
33+
this.delegate = requireNonNull(delegate, "delegate is null");
34+
}
35+
36+
@Override
37+
public TrinoFileSystem create(ConnectorIdentity identity)
38+
{
39+
return new LeakDetectingFileSystem(delegate.create(identity), createCleaner());
40+
}
41+
42+
@Override
43+
public TrinoFileSystem create(ConnectorSession session)
44+
{
45+
return new LeakDetectingFileSystem(delegate.create(session), createCleaner());
46+
}
47+
48+
private static Cleaner createCleaner()
49+
{
50+
return Cleaner.create(daemonThreadsNamed("fs-leak-detector-%s"));
51+
}
52+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.filesystem.leakdetection;
15+
16+
import io.airlift.slice.Slice;
17+
import io.trino.filesystem.Location;
18+
import io.trino.filesystem.TrinoInput;
19+
20+
import java.io.IOException;
21+
import java.lang.ref.Cleaner;
22+
23+
import static java.util.Objects.requireNonNull;
24+
25+
public class LeakDetectingInput
26+
implements TrinoInput
27+
{
28+
private final TrinoInput delegate;
29+
private final TrackedCloseableState trackedState;
30+
31+
public LeakDetectingInput(TrinoInput delegate, Location location, Cleaner cleaner)
32+
{
33+
this.delegate = requireNonNull(delegate, "delegate is null");
34+
this.trackedState = new TrackedCloseableState(delegate, location);
35+
cleaner.register(this, trackedState);
36+
}
37+
38+
@Override
39+
public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
40+
throws IOException
41+
{
42+
delegate.readFully(position, buffer, bufferOffset, bufferLength);
43+
}
44+
45+
@Override
46+
public int readTail(byte[] buffer, int bufferOffset, int bufferLength)
47+
throws IOException
48+
{
49+
return delegate.readTail(buffer, bufferOffset, bufferLength);
50+
}
51+
52+
@Override
53+
public Slice readFully(long position, int length)
54+
throws IOException
55+
{
56+
return delegate.readFully(position, length);
57+
}
58+
59+
@Override
60+
public Slice readTail(int length)
61+
throws IOException
62+
{
63+
return delegate.readTail(length);
64+
}
65+
66+
@Override
67+
public void close()
68+
throws IOException
69+
{
70+
trackedState.markClosed();
71+
delegate.close();
72+
}
73+
}

0 commit comments

Comments
 (0)