From 68f14f31cbd9e4878ff999cdc2e8cf8470b4f400 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Fri, 27 Jun 2025 11:25:48 +0200 Subject: [PATCH 1/2] Detect and close I/O leaks in the native FS implementations It is enable by default in the CI to report detected leaks --- .../filesystem/manager/FileSystemConfig.java | 15 ++ .../filesystem/manager/FileSystemModule.java | 10 + .../manager/TestFileSystemConfig.java | 7 +- .../tracking/TrackingFileSystem.java | 177 ++++++++++++++++++ .../tracking/TrackingFileSystemFactory.java | 52 +++++ .../filesystem/tracking/TrackingInput.java | 73 ++++++++ .../tracking/TrackingInputFile.java | 79 ++++++++ .../tracking/TrackingInputStream.java | 148 +++++++++++++++ .../tracking/TrackingOutputFile.java | 71 +++++++ .../tracking/TrackingOutputStream.java | 72 +++++++ .../filesystem/tracking/TrackingState.java | 78 ++++++++ 11 files changed, 780 insertions(+), 2 deletions(-) create mode 100644 lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingFileSystem.java create mode 100644 lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingFileSystemFactory.java create mode 100644 lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingInput.java create mode 100644 lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingInputFile.java create mode 100644 lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingInputStream.java create mode 100644 lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingOutputFile.java create mode 100644 lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingOutputStream.java create mode 100644 lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingState.java diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java index 3b7266e38d45..58719bf3d4d4 100644 --- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemConfig.java @@ -14,6 +14,7 @@ package io.trino.filesystem.manager; import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; public class FileSystemConfig { @@ -24,6 +25,7 @@ public class FileSystemConfig private boolean nativeGcsEnabled; private boolean nativeLocalEnabled; private boolean cacheEnabled; + private boolean trackingEnabled; public boolean isHadoopEnabled() { @@ -108,4 +110,17 @@ public FileSystemConfig setCacheEnabled(boolean enabled) this.cacheEnabled = enabled; return this; } + + public boolean isTrackingEnabled() + { + return trackingEnabled; + } + + @ConfigDescription("Enable input/output stream tracking to detect resource leaks") + @Config("fs.tracking.enabled") + public FileSystemConfig setTrackingEnabled(boolean trackingEnabled) + { + this.trackingEnabled = trackingEnabled; + return this; + } } diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java index 5a397a5e085d..bf8fcfec2d5e 100644 --- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java @@ -45,6 +45,7 @@ import io.trino.filesystem.s3.S3FileSystemModule; import io.trino.filesystem.switching.SwitchingFileSystemFactory; import io.trino.filesystem.tracing.TracingFileSystemFactory; +import io.trino.filesystem.tracking.TrackingFileSystemFactory; import io.trino.spi.NodeManager; import java.util.Map; @@ -54,6 +55,7 @@ import static com.google.inject.multibindings.MapBinder.newMapBinder; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static io.airlift.configuration.ConfigBinder.configBinder; +import static java.lang.System.getenv; import static java.util.Objects.requireNonNull; public class FileSystemModule @@ -146,6 +148,7 @@ protected void setup(Binder binder) @Provides @Singleton static TrinoFileSystemFactory createFileSystemFactory( + FileSystemConfig config, Optional hdfsFileSystemLoader, Map factories, Optional fileSystemCache, @@ -162,6 +165,13 @@ static TrinoFileSystemFactory createFileSystemFactory( TrinoFileSystemFactory delegate = new SwitchingFileSystemFactory(loader); delegate = new TracingFileSystemFactory(tracer, delegate); + + // Enable leak detection if configured or if running in a CI environment + boolean trackingDetectionEnabled = getenv("CONTINUOUS_INTEGRATION") != null; + if (config.isTrackingEnabled() || trackingDetectionEnabled) { + delegate = new TrackingFileSystemFactory(delegate); + } + if (fileSystemCache.isPresent()) { return new CacheFileSystemFactory(tracer, delegate, fileSystemCache.orElseThrow(), keyProvider.orElseThrow()); } diff --git a/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java b/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java index f7ee83b37534..75005ed4b5c6 100644 --- a/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java +++ b/lib/trino-filesystem-manager/src/test/java/io/trino/filesystem/manager/TestFileSystemConfig.java @@ -34,7 +34,8 @@ public void testDefaults() .setNativeS3Enabled(false) .setNativeGcsEnabled(false) .setNativeLocalEnabled(false) - .setCacheEnabled(false)); + .setCacheEnabled(false) + .setTrackingEnabled(false)); } @Test @@ -48,6 +49,7 @@ public void testExplicitPropertyMappings() .put("fs.native-gcs.enabled", "true") .put("fs.native-local.enabled", "true") .put("fs.cache.enabled", "true") + .put("fs.tracking.enabled", "true") .buildOrThrow(); FileSystemConfig expected = new FileSystemConfig() @@ -57,7 +59,8 @@ public void testExplicitPropertyMappings() .setNativeS3Enabled(true) .setNativeGcsEnabled(true) .setNativeLocalEnabled(true) - .setCacheEnabled(true); + .setCacheEnabled(true) + .setTrackingEnabled(true); assertFullMapping(properties, expected); } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingFileSystem.java new file mode 100644 index 000000000000..dd8fb734ccd1 --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingFileSystem.java @@ -0,0 +1,177 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.tracking; + +import io.airlift.units.Duration; +import io.trino.filesystem.FileIterator; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoOutputFile; +import io.trino.filesystem.UriLocation; +import io.trino.filesystem.encryption.EncryptionKey; + +import java.io.IOException; +import java.lang.ref.Cleaner; +import java.time.Instant; +import java.util.Collection; +import java.util.Optional; +import java.util.Set; + +import static java.util.Objects.requireNonNull; + +public class TrackingFileSystem + implements TrinoFileSystem +{ + private final TrinoFileSystem delegate; + private final Cleaner cleaner; + + public TrackingFileSystem(TrinoFileSystem delegate, Cleaner cleaner) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.cleaner = requireNonNull(cleaner, "cleaner is null"); + } + + @Override + public TrinoInputFile newInputFile(Location location) + { + return new TrackingInputFile(delegate.newInputFile(location), cleaner); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length) + { + return new TrackingInputFile(delegate.newInputFile(location, length), cleaner); + } + + @Override + public TrinoInputFile newInputFile(Location location, long length, Instant lastModified) + { + return new TrackingInputFile(delegate.newInputFile(location, length, lastModified), cleaner); + } + + @Override + public TrinoOutputFile newOutputFile(Location location) + { + return new TrackingOutputFile(delegate.newOutputFile(location), cleaner); + } + + @Override + public void deleteFile(Location location) + throws IOException + { + delegate.deleteFile(location); + } + + @Override + public void deleteDirectory(Location location) + throws IOException + { + delegate.deleteDirectory(location); + } + + @Override + public void renameFile(Location source, Location target) + throws IOException + { + delegate.renameFile(source, target); + } + + @Override + public FileIterator listFiles(Location location) + throws IOException + { + return delegate.listFiles(location); + } + + @Override + public Optional directoryExists(Location location) + throws IOException + { + return delegate.directoryExists(location); + } + + @Override + public void createDirectory(Location location) + throws IOException + { + delegate.createDirectory(location); + } + + @Override + public void renameDirectory(Location source, Location target) + throws IOException + { + delegate.renameDirectory(source, target); + } + + @Override + public Set listDirectories(Location location) + throws IOException + { + return delegate.listDirectories(location); + } + + @Override + public Optional createTemporaryDirectory(Location targetPath, String temporaryPrefix, String relativePrefix) + throws IOException + { + return delegate.createTemporaryDirectory(targetPath, temporaryPrefix, relativePrefix); + } + + @Override + public TrinoInputFile newEncryptedInputFile(Location location, EncryptionKey key) + { + return new TrackingInputFile(delegate.newEncryptedInputFile(location, key), cleaner); + } + + @Override + public TrinoInputFile newEncryptedInputFile(Location location, long length, EncryptionKey key) + { + return new TrackingInputFile(delegate.newEncryptedInputFile(location, length, key), cleaner); + } + + @Override + public TrinoInputFile newEncryptedInputFile(Location location, long length, Instant lastModified, EncryptionKey key) + { + return new TrackingInputFile(delegate.newEncryptedInputFile(location, length, lastModified, key), cleaner); + } + + @Override + public TrinoOutputFile newEncryptedOutputFile(Location location, EncryptionKey key) + { + return new TrackingOutputFile(delegate.newEncryptedOutputFile(location, key), cleaner); + } + + @Override + public void deleteFiles(Collection locations) + throws IOException + { + delegate.deleteFiles(locations); + } + + @Override + public Optional preSignedUri(Location location, Duration ttl) + throws IOException + { + return delegate.preSignedUri(location, ttl); + } + + @Override + public Optional encryptedPreSignedUri(Location location, Duration ttl, EncryptionKey key) + throws IOException + { + return delegate.encryptedPreSignedUri(location, ttl, key); + } +} diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingFileSystemFactory.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingFileSystemFactory.java new file mode 100644 index 000000000000..a58c90ef51aa --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingFileSystemFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.tracking; + +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.security.ConnectorIdentity; + +import java.lang.ref.Cleaner; + +import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static java.util.Objects.requireNonNull; + +public class TrackingFileSystemFactory + implements TrinoFileSystemFactory +{ + private final TrinoFileSystemFactory delegate; + + public TrackingFileSystemFactory(TrinoFileSystemFactory delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public TrinoFileSystem create(ConnectorIdentity identity) + { + return new TrackingFileSystem(delegate.create(identity), createCleaner()); + } + + @Override + public TrinoFileSystem create(ConnectorSession session) + { + return new TrackingFileSystem(delegate.create(session), createCleaner()); + } + + private static Cleaner createCleaner() + { + return Cleaner.create(daemonThreadsNamed("fs-leak-detector-%s")); + } +} diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingInput.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingInput.java new file mode 100644 index 000000000000..8aa87eb32119 --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingInput.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.tracking; + +import io.airlift.slice.Slice; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoInput; + +import java.io.IOException; +import java.lang.ref.Cleaner; + +import static java.util.Objects.requireNonNull; + +public class TrackingInput + implements TrinoInput +{ + private final TrinoInput delegate; + private final TrackingState state; + + public TrackingInput(TrinoInput delegate, Location location, Cleaner cleaner) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.state = new TrackingState(delegate, location); + cleaner.register(this, state); + } + + @Override + public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength) + throws IOException + { + delegate.readFully(position, buffer, bufferOffset, bufferLength); + } + + @Override + public int readTail(byte[] buffer, int bufferOffset, int bufferLength) + throws IOException + { + return delegate.readTail(buffer, bufferOffset, bufferLength); + } + + @Override + public Slice readFully(long position, int length) + throws IOException + { + return delegate.readFully(position, length); + } + + @Override + public Slice readTail(int length) + throws IOException + { + return delegate.readTail(length); + } + + @Override + public void close() + throws IOException + { + state.markClosed(); + delegate.close(); + } +} diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingInputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingInputFile.java new file mode 100644 index 000000000000..1727560fee2d --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingInputFile.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.tracking; + +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoInput; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.TrinoInputStream; + +import java.io.IOException; +import java.lang.ref.Cleaner; +import java.time.Instant; + +import static java.util.Objects.requireNonNull; + +public class TrackingInputFile + implements TrinoInputFile +{ + private final TrinoInputFile delegate; + private final Cleaner cleaner; + + public TrackingInputFile(TrinoInputFile delegate, Cleaner cleaner) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.cleaner = requireNonNull(cleaner, "cleaner is null"); + } + + @Override + public TrinoInput newInput() + throws IOException + { + return new TrackingInput(delegate.newInput(), delegate.location(), cleaner); + } + + @Override + public TrinoInputStream newStream() + throws IOException + { + return new TrackingInputStream(delegate.newStream(), delegate.location(), cleaner); + } + + @Override + public long length() + throws IOException + { + return delegate.length(); + } + + @Override + public Instant lastModified() + throws IOException + { + return delegate.lastModified(); + } + + @Override + public boolean exists() + throws IOException + { + return delegate.exists(); + } + + @Override + public Location location() + { + return delegate.location(); + } +} diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingInputStream.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingInputStream.java new file mode 100644 index 000000000000..a75158b64c84 --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingInputStream.java @@ -0,0 +1,148 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.tracking; + +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoInputStream; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.ref.Cleaner; + +import static java.util.Objects.requireNonNull; + +public class TrackingInputStream + extends TrinoInputStream +{ + private final TrinoInputStream delegate; + private final TrackingState state; + + public TrackingInputStream(TrinoInputStream delegate, Location location, Cleaner cleaner) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.state = new TrackingState(delegate, location); + cleaner.register(this, state); + } + + @Override + public long getPosition() + throws IOException + { + return delegate.getPosition(); + } + + @Override + public void seek(long position) + throws IOException + { + delegate.seek(position); + } + + @Override + public int read() + throws IOException + { + return delegate.read(); + } + + @Override + public int read(byte[] buffer) + throws IOException + { + return delegate.read(buffer); + } + + @Override + public int read(byte[] buffer, int offset, int length) + throws IOException + { + return delegate.read(buffer, offset, length); + } + + @Override + public byte[] readAllBytes() + throws IOException + { + return delegate.readAllBytes(); + } + + @Override + public byte[] readNBytes(int length) + throws IOException + { + return delegate.readNBytes(length); + } + + @Override + public int readNBytes(byte[] buffer, int offset, int length) + throws IOException + { + return delegate.readNBytes(buffer, offset, length); + } + + @Override + public long skip(long bytes) + throws IOException + { + return delegate.skip(bytes); + } + + @Override + public void skipNBytes(long bytes) + throws IOException + { + delegate.skipNBytes(bytes); + } + + @Override + public int available() + throws IOException + { + return delegate.available(); + } + + @Override + public void close() + throws IOException + { + state.markClosed(); + delegate.close(); + } + + @Override + public void mark(int readlimit) + { + delegate.mark(readlimit); + } + + @Override + public void reset() + throws IOException + { + delegate.reset(); + } + + @Override + public boolean markSupported() + { + return delegate.markSupported(); + } + + @Override + public long transferTo(OutputStream out) + throws IOException + { + return delegate.transferTo(out); + } +} diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingOutputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingOutputFile.java new file mode 100644 index 000000000000..a1166088cbf8 --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingOutputFile.java @@ -0,0 +1,71 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.tracking; + +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoOutputFile; +import io.trino.memory.context.AggregatedMemoryContext; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.ref.Cleaner; + +import static java.util.Objects.requireNonNull; + +public class TrackingOutputFile + implements TrinoOutputFile +{ + private final TrinoOutputFile delegate; + private final Cleaner cleaner; + + public TrackingOutputFile(TrinoOutputFile delegate, Cleaner cleaner) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.cleaner = requireNonNull(cleaner, "cleaner is null"); + } + + @Override + public OutputStream create() + throws IOException + { + return new TrackingOutputStream(delegate.create(), delegate.location(), cleaner); + } + + @Override + public void createOrOverwrite(byte[] data) + throws IOException + { + delegate.createOrOverwrite(data); + } + + @Override + public void createExclusive(byte[] data) + throws IOException + { + delegate.createExclusive(data); + } + + @Override + public OutputStream create(AggregatedMemoryContext memoryContext) + throws IOException + { + return new TrackingOutputStream(delegate.create(memoryContext), delegate.location(), cleaner); + } + + @Override + public Location location() + { + return delegate.location(); + } +} diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingOutputStream.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingOutputStream.java new file mode 100644 index 000000000000..e5e65fb3dbe6 --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingOutputStream.java @@ -0,0 +1,72 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.tracking; + +import io.trino.filesystem.Location; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.ref.Cleaner; + +import static java.util.Objects.requireNonNull; + +public class TrackingOutputStream + extends OutputStream +{ + private final OutputStream delegate; + private final TrackingState state; + + public TrackingOutputStream(OutputStream delegate, Location location, Cleaner cleaner) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + this.state = new TrackingState(delegate, location); + cleaner.register(this, state); + } + + @Override + public void write(byte[] buffer) + throws IOException + { + delegate.write(buffer); + } + + @Override + public void write(byte[] buffer, int offset, int length) + throws IOException + { + delegate.write(buffer, offset, length); + } + + @Override + public void flush() + throws IOException + { + delegate.flush(); + } + + @Override + public void close() + throws IOException + { + state.markClosed(); + delegate.close(); + } + + @Override + public void write(int b) + throws IOException + { + delegate.write(b); + } +} diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingState.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingState.java new file mode 100644 index 000000000000..fd14fa3a39bd --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracking/TrackingState.java @@ -0,0 +1,78 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.tracking; + +import com.google.common.base.Joiner; +import io.airlift.log.Logger; +import io.trino.filesystem.Location; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Objects.requireNonNull; + +public class TrackingState + implements Runnable +{ + private static final Logger LOG = Logger.get(TrackingState.class); + private static final Joiner JOINER = Joiner.on("\n\t\t"); + + private final Location location; + private final Closeable closeable; + private final List stacktrace; + private boolean closed; + + public TrackingState(Closeable closeable, Location location) + { + this.location = requireNonNull(location, "location is null"); + this.closeable = requireNonNull(closeable, "closeable is null"); + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + this.stacktrace = Arrays.stream(stackTrace) + .filter(TrackingState::isNotTrackingFrame) + .collect(toImmutableList()); + } + + public void markClosed() + { + this.closed = true; + } + + @Override + public void run() + { + // This is invoked by cleaner when associated closeable is phantom reachable. + // If markClosed was not called prior to the invocation, resource is considered leaked. + if (!closed) { + LOG.error("%s with location '%s' was not closed properly and leaked. Created by: %s", closeable.getClass().getSimpleName(), location, JOINER.join(stacktrace)); + + try { + closeable.close(); + } + catch (IOException e) { + LOG.error(e, "Failed to close %s with location '%s'", closeable.getClass().getSimpleName(), location); + } + } + } + + private static boolean isNotTrackingFrame(StackTraceElement stackTraceElement) + { + if (stackTraceElement.getClassName().startsWith(TrackingState.class.getPackageName())) { + return false; + } + return !stackTraceElement.getClassName().contains("java.lang.Thread"); + } +} From 6dfa435b0cbf5ec31cb299527b751a7bf621e58b Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Fri, 27 Jun 2025 17:25:55 +0200 Subject: [PATCH 2/2] Pass CONTINUOUS_INTEGRATION to product tests containers --- .../io/trino/tests/product/launcher/cli/TestRun.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/cli/TestRun.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/cli/TestRun.java index 7ed1d66f698b..1fbe795a7338 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/cli/TestRun.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/cli/TestRun.java @@ -328,6 +328,11 @@ private Environment getEnvironment() .setLogsBaseDir(logsDirBase) .setIpv6(ipv6); + if (isEnvSet("CONTINUOUS_INTEGRATION")) { + builder.configureContainers(container -> + container.withEnv("CONTINUOUS_INTEGRATION", "true")); + } + builder.configureContainer(TESTS, this::mountReportsDir); builder.configureContainer(TESTS, container -> { List temptoJavaOptions = Splitter.on(" ").omitEmptyStrings().splitToList( @@ -339,10 +344,6 @@ private Environment getEnvironment() unsafelyExposePort(container, 5007); // debug port } - if (isEnvSet("CONTINUOUS_INTEGRATION")) { - container.withEnv("CONTINUOUS_INTEGRATION", "true"); - } - // Install Java distribution if necessary jdkProvider.applyTo(container) // the test jar is hundreds MB and file system bind is much more efficient