Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.filesystem.manager;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;

public class FileSystemConfig
{
Expand All @@ -24,6 +25,7 @@ public class FileSystemConfig
private boolean nativeGcsEnabled;
private boolean nativeLocalEnabled;
private boolean cacheEnabled;
private boolean trackingEnabled;

public boolean isHadoopEnabled()
{
Expand Down Expand Up @@ -108,4 +110,17 @@ public FileSystemConfig setCacheEnabled(boolean enabled)
this.cacheEnabled = enabled;
return this;
}

public boolean isTrackingEnabled()
{
return trackingEnabled;
}

@ConfigDescription("Enable input/output stream tracking to detect resource leaks")
@Config("fs.tracking.enabled")
public FileSystemConfig setTrackingEnabled(boolean trackingEnabled)
{
this.trackingEnabled = trackingEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.trino.filesystem.s3.S3FileSystemModule;
import io.trino.filesystem.switching.SwitchingFileSystemFactory;
import io.trino.filesystem.tracing.TracingFileSystemFactory;
import io.trino.filesystem.tracking.TrackingFileSystemFactory;
import io.trino.spi.NodeManager;

import java.util.Map;
Expand All @@ -54,6 +55,7 @@
import static com.google.inject.multibindings.MapBinder.newMapBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static java.lang.System.getenv;
import static java.util.Objects.requireNonNull;

public class FileSystemModule
Expand Down Expand Up @@ -146,6 +148,7 @@ protected void setup(Binder binder)
@Provides
@Singleton
static TrinoFileSystemFactory createFileSystemFactory(
FileSystemConfig config,
Optional<HdfsFileSystemLoader> hdfsFileSystemLoader,
Map<String, TrinoFileSystemFactory> factories,
Optional<TrinoFileSystemCache> fileSystemCache,
Expand All @@ -162,6 +165,13 @@ static TrinoFileSystemFactory createFileSystemFactory(

TrinoFileSystemFactory delegate = new SwitchingFileSystemFactory(loader);
delegate = new TracingFileSystemFactory(tracer, delegate);

// Enable leak detection if configured or if running in a CI environment
boolean trackingDetectionEnabled = getenv("CONTINUOUS_INTEGRATION") != null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can make this the default for the config, allowing it to be overridden in either case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will follow up

if (config.isTrackingEnabled() || trackingDetectionEnabled) {
delegate = new TrackingFileSystemFactory(delegate);
}

if (fileSystemCache.isPresent()) {
return new CacheFileSystemFactory(tracer, delegate, fileSystemCache.orElseThrow(), keyProvider.orElseThrow());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public void testDefaults()
.setNativeS3Enabled(false)
.setNativeGcsEnabled(false)
.setNativeLocalEnabled(false)
.setCacheEnabled(false));
.setCacheEnabled(false)
.setTrackingEnabled(false));
}

@Test
Expand All @@ -48,6 +49,7 @@ public void testExplicitPropertyMappings()
.put("fs.native-gcs.enabled", "true")
.put("fs.native-local.enabled", "true")
.put("fs.cache.enabled", "true")
.put("fs.tracking.enabled", "true")
.buildOrThrow();

FileSystemConfig expected = new FileSystemConfig()
Expand All @@ -57,7 +59,8 @@ public void testExplicitPropertyMappings()
.setNativeS3Enabled(true)
.setNativeGcsEnabled(true)
.setNativeLocalEnabled(true)
.setCacheEnabled(true);
.setCacheEnabled(true)
.setTrackingEnabled(true);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.tracking;

import io.airlift.units.Duration;
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.filesystem.UriLocation;
import io.trino.filesystem.encryption.EncryptionKey;

import java.io.IOException;
import java.lang.ref.Cleaner;
import java.time.Instant;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;

import static java.util.Objects.requireNonNull;

public class TrackingFileSystem
implements TrinoFileSystem
{
private final TrinoFileSystem delegate;
private final Cleaner cleaner;

public TrackingFileSystem(TrinoFileSystem delegate, Cleaner cleaner)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.cleaner = requireNonNull(cleaner, "cleaner is null");
}

@Override
public TrinoInputFile newInputFile(Location location)
{
return new TrackingInputFile(delegate.newInputFile(location), cleaner);
}

@Override
public TrinoInputFile newInputFile(Location location, long length)
{
return new TrackingInputFile(delegate.newInputFile(location, length), cleaner);
}

@Override
public TrinoInputFile newInputFile(Location location, long length, Instant lastModified)
{
return new TrackingInputFile(delegate.newInputFile(location, length, lastModified), cleaner);
}

@Override
public TrinoOutputFile newOutputFile(Location location)
{
return new TrackingOutputFile(delegate.newOutputFile(location), cleaner);
}

@Override
public void deleteFile(Location location)
throws IOException
{
delegate.deleteFile(location);
}

@Override
public void deleteDirectory(Location location)
throws IOException
{
delegate.deleteDirectory(location);
}

@Override
public void renameFile(Location source, Location target)
throws IOException
{
delegate.renameFile(source, target);
}

@Override
public FileIterator listFiles(Location location)
throws IOException
{
return delegate.listFiles(location);
}

@Override
public Optional<Boolean> directoryExists(Location location)
throws IOException
{
return delegate.directoryExists(location);
}

@Override
public void createDirectory(Location location)
throws IOException
{
delegate.createDirectory(location);
}

@Override
public void renameDirectory(Location source, Location target)
throws IOException
{
delegate.renameDirectory(source, target);
}

@Override
public Set<Location> listDirectories(Location location)
throws IOException
{
return delegate.listDirectories(location);
}

@Override
public Optional<Location> createTemporaryDirectory(Location targetPath, String temporaryPrefix, String relativePrefix)
throws IOException
{
return delegate.createTemporaryDirectory(targetPath, temporaryPrefix, relativePrefix);
}

@Override
public TrinoInputFile newEncryptedInputFile(Location location, EncryptionKey key)
{
return new TrackingInputFile(delegate.newEncryptedInputFile(location, key), cleaner);
}

@Override
public TrinoInputFile newEncryptedInputFile(Location location, long length, EncryptionKey key)
{
return new TrackingInputFile(delegate.newEncryptedInputFile(location, length, key), cleaner);
}

@Override
public TrinoInputFile newEncryptedInputFile(Location location, long length, Instant lastModified, EncryptionKey key)
{
return new TrackingInputFile(delegate.newEncryptedInputFile(location, length, lastModified, key), cleaner);
}

@Override
public TrinoOutputFile newEncryptedOutputFile(Location location, EncryptionKey key)
{
return new TrackingOutputFile(delegate.newEncryptedOutputFile(location, key), cleaner);
}

@Override
public void deleteFiles(Collection<Location> locations)
throws IOException
{
delegate.deleteFiles(locations);
}

@Override
public Optional<UriLocation> preSignedUri(Location location, Duration ttl)
throws IOException
{
return delegate.preSignedUri(location, ttl);
}

@Override
public Optional<UriLocation> encryptedPreSignedUri(Location location, Duration ttl, EncryptionKey key)
throws IOException
{
return delegate.encryptedPreSignedUri(location, ttl, key);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem.tracking;

import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.security.ConnectorIdentity;

import java.lang.ref.Cleaner;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static java.util.Objects.requireNonNull;

public class TrackingFileSystemFactory
implements TrinoFileSystemFactory
{
private final TrinoFileSystemFactory delegate;

public TrackingFileSystemFactory(TrinoFileSystemFactory delegate)
{
this.delegate = requireNonNull(delegate, "delegate is null");
}

@Override
public TrinoFileSystem create(ConnectorIdentity identity)
{
return new TrackingFileSystem(delegate.create(identity), createCleaner());
}

@Override
public TrinoFileSystem create(ConnectorSession session)
{
return new TrackingFileSystem(delegate.create(session), createCleaner());
}

private static Cleaner createCleaner()
{
return Cleaner.create(daemonThreadsNamed("fs-leak-detector-%s"));
}
}
Loading