diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index 64f4377a9a6a..a1b4e8870eba 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -183,6 +183,11 @@ implementation is used: creation of more data files, since it uses the append operation to insert the new records. - `true` +* - `iceberg.metadata-cache.enabled` + - Set to `false` to disable in-memory caching of metadata files on the + coordinator. This cache is not used when `fs.cache.enabled` is set to `true`. + - `true` + ::: (iceberg-fte-support)= diff --git a/lib/trino-filesystem-cache-alluxio/pom.xml b/lib/trino-filesystem-cache-alluxio/pom.xml index 2254892810b2..a969979152db 100644 --- a/lib/trino-filesystem-cache-alluxio/pom.xml +++ b/lib/trino-filesystem-cache-alluxio/pom.xml @@ -58,16 +58,6 @@ opentelemetry-api - - io.opentelemetry - opentelemetry-context - - - - io.opentelemetry.semconv - opentelemetry-semconv - - io.trino trino-filesystem diff --git a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemCache.java b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemCache.java index 590e21987f78..6bd51270f290 100644 --- a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemCache.java +++ b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemCache.java @@ -32,6 +32,7 @@ import jakarta.annotation.PreDestroy; import java.io.IOException; +import java.util.Collection; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; @@ -71,12 +72,25 @@ public TrinoInputStream cacheStream(TrinoInputFile delegate, String key) return new AlluxioInputStream(tracer, delegate, key, uriStatus(delegate, key), new TracingCacheManager(tracer, key, pageSize, cacheManager), config, statistics); } + @Override + public
long cacheLength(TrinoInputFile delegate, String key) + throws IOException + { + return delegate.length(); + } + @Override public void expire(Location source) throws IOException { } + @Override + public void expire(Collection locations) + throws IOException + { + } + @PreDestroy public void shutdown() throws Exception diff --git a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInput.java b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInput.java index 78fc3ee6427e..3c05cc4d4fec 100644 --- a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInput.java +++ b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInput.java @@ -24,11 +24,11 @@ import java.io.EOFException; import java.io.IOException; -import static io.trino.filesystem.alluxio.AlluxioTracing.withTracing; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_LOCATION; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_KEY; +import static io.trino.filesystem.tracing.Tracing.withTracing; import static java.lang.Math.min; import static java.util.Objects.checkFromIndexSize; import static java.util.Objects.requireNonNull; diff --git a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInputHelper.java b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInputHelper.java index 7015aacc873d..8c8a307471f9 100644 --- a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInputHelper.java +++ b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInputHelper.java @@ -27,13 +27,13 @@ import java.nio.ByteBuffer; import static 
com.google.common.base.Preconditions.checkArgument; -import static io.trino.filesystem.alluxio.AlluxioTracing.withTracing; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_LOCATION; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_POSITION; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_SIZE; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_KEY; +import static io.trino.filesystem.tracing.Tracing.withTracing; import static java.lang.Integer.max; import static java.lang.Math.addExact; import static java.lang.Math.min; diff --git a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInputStream.java b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInputStream.java index 5a9d9b301098..a28fb4ca05eb 100644 --- a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInputStream.java +++ b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioInputStream.java @@ -28,11 +28,11 @@ import static com.google.common.base.Verify.verify; import static com.google.common.primitives.Ints.saturatedCast; -import static io.trino.filesystem.alluxio.AlluxioTracing.withTracing; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_LOCATION; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_KEY; +import static io.trino.filesystem.tracing.Tracing.withTracing; import static java.lang.Integer.max; import static java.lang.Math.addExact; import static java.lang.Math.min; 
diff --git a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTracing.java b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTracing.java deleted file mode 100644 index aa2514d027b7..000000000000 --- a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTracing.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.filesystem.alluxio; - -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.semconv.ExceptionAttributes; - -public class AlluxioTracing -{ - private AlluxioTracing() {} - - public static T withTracing(Span span, CheckedSupplier supplier) - throws E - { - try (var _ = span.makeCurrent()) { - return supplier.get(); - } - catch (Throwable t) { - span.setStatus(StatusCode.ERROR, t.getMessage()); - span.recordException(t, Attributes.of(ExceptionAttributes.EXCEPTION_ESCAPED, true)); - throw t; - } - finally { - span.end(); - } - } - - public static void withTracing(Span span, CheckedRunnable supplier) - throws E - { - try (var _ = span.makeCurrent()) { - supplier.run(); - } - catch (Throwable t) { - span.setStatus(StatusCode.ERROR, t.getMessage()); - span.recordException(t, Attributes.of(ExceptionAttributes.EXCEPTION_ESCAPED, true)); - throw t; - } - finally { - span.end(); - } - } - - public 
interface CheckedRunnable - { - void run() - throws E; - } - - public interface CheckedSupplier - { - T get() - throws E; - } -} diff --git a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/TracingCacheManager.java b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/TracingCacheManager.java index 3a79a1942666..2dfec18f5cb7 100644 --- a/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/TracingCacheManager.java +++ b/lib/trino-filesystem-cache-alluxio/src/main/java/io/trino/filesystem/alluxio/TracingCacheManager.java @@ -31,12 +31,12 @@ import java.util.function.Predicate; import java.util.function.Supplier; -import static io.trino.filesystem.alluxio.AlluxioTracing.withTracing; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_POSITION; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_SIZE; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_KEY; +import static io.trino.filesystem.tracing.Tracing.withTracing; import static java.util.Objects.requireNonNull; public class TracingCacheManager diff --git a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java index 1836f735db87..0b09dd50fcd0 100644 --- a/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java +++ b/lib/trino-filesystem-manager/src/main/java/io/trino/filesystem/manager/FileSystemModule.java @@ -34,6 +34,8 @@ import io.trino.filesystem.cache.TrinoFileSystemCache; import io.trino.filesystem.gcs.GcsFileSystemFactory; import io.trino.filesystem.gcs.GcsFileSystemModule; +import 
io.trino.filesystem.memory.MemoryFileSystemCache; +import io.trino.filesystem.memory.MemoryFileSystemCacheModule; import io.trino.filesystem.s3.FileSystemS3; import io.trino.filesystem.s3.S3FileSystemModule; import io.trino.filesystem.switching.SwitchingFileSystemFactory; @@ -54,12 +56,14 @@ public class FileSystemModule private final String catalogName; private final NodeManager nodeManager; private final OpenTelemetry openTelemetry; + private final boolean coordinatorFileCaching; - public FileSystemModule(String catalogName, NodeManager nodeManager, OpenTelemetry openTelemetry) + public FileSystemModule(String catalogName, NodeManager nodeManager, OpenTelemetry openTelemetry, boolean coordinatorFileCaching) { this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null"); + this.coordinatorFileCaching = coordinatorFileCaching; } @Override @@ -107,9 +111,14 @@ protected void setup(Binder binder) newOptionalBinder(binder, CacheKeyProvider.class).setDefault().to(DefaultCacheKeyProvider.class).in(Scopes.SINGLETON); newOptionalBinder(binder, TrinoFileSystemCache.class); + newOptionalBinder(binder, MemoryFileSystemCache.class); + boolean isCoordinator = nodeManager.getCurrentNode().isCoordinator(); if (config.isCacheEnabled()) { - install(new AlluxioFileSystemCacheModule(nodeManager.getCurrentNode().isCoordinator())); + install(new AlluxioFileSystemCacheModule(isCoordinator)); + } + if (coordinatorFileCaching) { + install(new MemoryFileSystemCacheModule(isCoordinator)); } } @@ -119,6 +128,7 @@ static TrinoFileSystemFactory createFileSystemFactory( Optional hdfsFileSystemLoader, Map factories, Optional fileSystemCache, + Optional memoryFileSystemCache, Optional keyProvider, Tracer tracer) { @@ -133,6 +143,10 @@ static TrinoFileSystemFactory createFileSystemFactory( if (fileSystemCache.isPresent()) { delegate = new 
CacheFileSystemFactory(tracer, delegate, fileSystemCache.orElseThrow(), keyProvider.orElseThrow()); } + // use MemoryFileSystemCache only when no other TrinoFileSystemCache is configured + else if (memoryFileSystemCache.isPresent()) { + delegate = new CacheFileSystemFactory(tracer, delegate, memoryFileSystemCache.orElseThrow(), keyProvider.orElseThrow()); + } return new TracingFileSystemFactory(tracer, delegate); } } diff --git a/lib/trino-filesystem/pom.xml b/lib/trino-filesystem/pom.xml index 7eb04cc32e81..2ba2b7755089 100644 --- a/lib/trino-filesystem/pom.xml +++ b/lib/trino-filesystem/pom.xml @@ -53,6 +53,11 @@ slice + + io.airlift + units + + io.opentelemetry opentelemetry-api @@ -68,6 +73,11 @@ opentelemetry-semconv + + io.trino + trino-cache + + io.trino trino-memory-context @@ -83,6 +93,16 @@ jakarta.annotation-api + + jakarta.validation + jakarta.validation-api + + + + org.weakref + jmxutils + + org.jetbrains annotations @@ -107,6 +127,12 @@ test + + io.airlift + tracing + test + + io.opentelemetry opentelemetry-sdk-trace diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java index 6a5f0521e61a..b2888ceeddff 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheFileSystem.java @@ -23,6 +23,7 @@ import java.io.UncheckedIOException; import java.util.Collection; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import static java.util.Objects.requireNonNull; @@ -44,13 +45,13 @@ public CacheFileSystem(TrinoFileSystem delegate, TrinoFileSystemCache cache, Cac @Override public TrinoInputFile newInputFile(Location location) { - return new CacheInputFile(delegate.newInputFile(location), cache, keyProvider); + return new CacheInputFile(delegate.newInputFile(location), cache, keyProvider, 
OptionalLong.empty()); } @Override public TrinoInputFile newInputFile(Location location, long length) { - return new CacheInputFile(delegate.newInputFile(location, length), cache, keyProvider); + return new CacheInputFile(delegate.newInputFile(location, length), cache, keyProvider, OptionalLong.of(length)); } @Override @@ -79,6 +80,7 @@ public void deleteDirectory(Location location) throws IOException { delegate.deleteDirectory(location); + cache.expire(location); } @Override @@ -137,8 +139,6 @@ public void deleteFiles(Collection locations) throws IOException { delegate.deleteFiles(locations); - for (var location : locations) { - cache.expire(location); - } + cache.expire(locations); } } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheInputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheInputFile.java index 4ba17e972364..a2fc973fde17 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheInputFile.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/CacheInputFile.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.time.Instant; import java.util.Optional; +import java.util.OptionalLong; import static java.util.Objects.requireNonNull; @@ -30,12 +31,14 @@ public final class CacheInputFile private final TrinoInputFile delegate; private final TrinoFileSystemCache cache; private final CacheKeyProvider keyProvider; + private OptionalLong length; - public CacheInputFile(TrinoInputFile delegate, TrinoFileSystemCache cache, CacheKeyProvider keyProvider) + public CacheInputFile(TrinoInputFile delegate, TrinoFileSystemCache cache, CacheKeyProvider keyProvider, OptionalLong length) { this.delegate = requireNonNull(delegate, "delegate is null"); this.cache = requireNonNull(cache, "cache is null"); this.keyProvider = requireNonNull(keyProvider, "keyProvider is null"); + this.length = requireNonNull(length, "length is null"); } @Override @@ -64,7 +67,16 @@ public 
TrinoInputStream newStream() public long length() throws IOException { - return delegate.length(); + if (length.isEmpty()) { + Optional key = keyProvider.getCacheKey(delegate); + if (key.isPresent()) { + length = OptionalLong.of(cache.cacheLength(delegate, key.orElseThrow())); + } + else { + length = OptionalLong.of(delegate.length()); + } + } + return length.getAsLong(); } @Override diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/TrinoFileSystemCache.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/TrinoFileSystemCache.java index ba2d49eccdbb..6f4b3e588788 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/TrinoFileSystemCache.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/cache/TrinoFileSystemCache.java @@ -19,6 +19,7 @@ import io.trino.filesystem.TrinoInputStream; import java.io.IOException; +import java.util.Collection; public interface TrinoFileSystemCache { @@ -34,9 +35,21 @@ TrinoInput cacheInput(TrinoInputFile delegate, String key) TrinoInputStream cacheStream(TrinoInputFile delegate, String key) throws IOException; + /** + * Get the length of the TrinoInputFile, potentially using or updating the data cached at key. + */ + long cacheLength(TrinoInputFile delegate, String key) + throws IOException; + /** * Give a hint to the cache that the cache entry for location should be expired. */ void expire(Location location) throws IOException; + + /** + * Give a hint to the cache that the cache entry for locations should be expired. 
+     */
+    void expire(Collection<Location> locations)
+            throws IOException;
 }
diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystemCache.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystemCache.java
new file mode 100644
index 000000000000..09f9602e06a1
--- /dev/null
+++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystemCache.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.memory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.Weigher;
+import com.google.inject.Inject;
+import io.airlift.slice.Slice;
+import io.airlift.units.DataSize;
+import io.airlift.units.Duration;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.trino.cache.EvictableCacheBuilder;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoInput;
+import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.TrinoInputStream;
+import io.trino.filesystem.cache.TrinoFileSystemCache;
+import org.weakref.jmx.Managed;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.airlift.slice.SizeOf.estimatedSizeOf;
+import static io.airlift.slice.SizeOf.sizeOf;
+import static io.airlift.units.DataSize.Unit.GIGABYTE;
+import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_LOCATION;
+import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE;
+import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_KEY;
+import static io.trino.filesystem.tracing.Tracing.withTracing;
+import static java.lang.Math.toIntExact;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * In-memory {@link TrinoFileSystemCache} that stores the full content of each cached file as a {@link Slice}.
+ * Files longer than the configured maximum content length are not held in memory; reads for them are
+ * forwarded to the delegate {@link TrinoInputFile}.
+ */
+public final class MemoryFileSystemCache
+        implements TrinoFileSystemCache
+{
+    private final Tracer tracer;
+    private final Cache<String, Optional<Slice>> cache;
+    private final int maxContentLengthBytes;
+    private final AtomicLong largeFileSkippedCount = new AtomicLong();
+
+    @Inject
+    public MemoryFileSystemCache(Tracer tracer, MemoryFileSystemCacheConfig config)
+    {
+        this(tracer, config.getCacheTtl(), config.getMaxSize(), config.getMaxContentLength());
+    }
+
+    private MemoryFileSystemCache(Tracer tracer, Duration expireAfterWrite, DataSize maxSize, DataSize maxContentLength)
+    {
+        this.tracer = requireNonNull(tracer, "tracer is null");
+        // the limit is narrowed to an int below, and entry weights are computed with toIntExact
+        checkArgument(maxContentLength.compareTo(DataSize.of(1, GIGABYTE)) <= 0, "maxContentLength must be less than or equal to 1GB");
+        this.cache = EvictableCacheBuilder.newBuilder()
+                .maximumWeight(maxSize.toBytes())
+                .weigher((Weigher<String, Optional<Slice>>) (key, value) -> toIntExact(estimatedSizeOf(key) + sizeOf(value, Slice::getRetainedSize)))
+                .expireAfterWrite(expireAfterWrite.toMillis(), TimeUnit.MILLISECONDS)
+                .shareNothingWhenDisabled()
+                .recordStats()
+                .build();
+        this.maxContentLengthBytes = toIntExact(maxContentLength.toBytes());
+    }
+
+    @Override
+    public TrinoInput cacheInput(TrinoInputFile delegate, String key)
+            throws IOException
+    {
+        Optional<Slice> cachedEntry = getOrLoadFromCache(key, delegate);
+        if (cachedEntry.isEmpty()) {
+            // no content was cached because the file exceeds maxContentLengthBytes; read from the delegate
+            largeFileSkippedCount.incrementAndGet();
+            Span span = createSpan(key, delegate.location(), "delegateInput");
+            return withTracing(span, delegate::newInput);
+        }
+
+        return new MemoryInput(delegate.location(), cachedEntry.get());
+    }
+
+    @Override
+    public TrinoInputStream cacheStream(TrinoInputFile delegate, String key)
+            throws IOException
+    {
+        Optional<Slice> cachedEntry = getOrLoadFromCache(key, delegate);
+        if (cachedEntry.isEmpty()) {
+            largeFileSkippedCount.incrementAndGet();
+            Span span = createSpan(key, delegate.location(), "delegateStream");
+            return withTracing(span, delegate::newStream);
+        }
+
+        return new MemoryInputStream(delegate.location(), cachedEntry.get());
+    }
+
+    @Override
+    public long cacheLength(TrinoInputFile delegate, String key)
+            throws IOException
+    {
+        Optional<Slice> cachedEntry = getOrLoadFromCache(key, delegate);
+        if (cachedEntry.isEmpty()) {
+            largeFileSkippedCount.incrementAndGet();
+            Span span = createSpan(key, delegate.location(), "delegateLength");
+            return withTracing(span, delegate::length);
+        }
+
+        return cachedEntry.get().length();
+    }
+
+    @Override
+    public void expire(Location location)
+            throws IOException
+    {
+        // prefix match: assumes cache keys start with the location path (depends on the CacheKeyProvider) - TODO confirm
+        List<String> expired = cache.asMap().keySet().stream()
+                .filter(key -> key.startsWith(location.path()))
+                .collect(toImmutableList());
+        cache.invalidateAll(expired);
+    }
+
+    @Override
+    public void expire(Collection<Location> locations)
+            throws IOException
+    {
+        List<String> expired = cache.asMap().keySet().stream()
+                .filter(key -> locations.stream().map(Location::path).anyMatch(key::startsWith))
+                .collect(toImmutableList());
+        cache.invalidateAll(expired);
+    }
+
+    @Managed
+    public void flushCache()
+    {
+        cache.invalidateAll();
+    }
+
+    @Managed
+    public long getHitCount()
+    {
+        return cache.stats().hitCount();
+    }
+
+    @Managed
+    public long getRequestCount()
+    {
+        return cache.stats().requestCount();
+    }
+
+    @Managed
+    public long getLargeFileSkippedCount()
+    {
+        return largeFileSkippedCount.get();
+    }
+
+    @VisibleForTesting
+    boolean isCached(String key)
+    {
+        Optional<Slice> cachedEntry = cache.getIfPresent(key);
+        return cachedEntry != null && cachedEntry.isPresent();
+    }
+
+    /**
+     * Returns the content cached under key, loading it from the delegate on a miss. An empty result means
+     * the file was too large to cache; the loader's empty result is what gets associated with the key.
+     */
+    private Optional<Slice> getOrLoadFromCache(String key, TrinoInputFile delegate)
+            throws IOException
+    {
+        try {
+            return cache.get(key, () -> load(key, delegate));
+        }
+        catch (ExecutionException e) {
+            throw handleException(delegate.location(), e.getCause());
+        }
+    }
+
+    /**
+     * Reads the whole file into memory, or returns an empty Optional when it is longer than maxContentLengthBytes.
+     */
+    private Optional<Slice> load(String key, TrinoInputFile delegate)
+            throws IOException
+    {
+        Span span = createSpan(key, delegate.location(), "loadCache");
+        return withTracing(span, () -> {
+            long fileSize = delegate.length();
+            span.setAttribute(CACHE_FILE_READ_SIZE, fileSize);
+            if (fileSize > maxContentLengthBytes) {
+                return Optional.empty();
+            }
+            try (TrinoInput trinoInput = delegate.newInput()) {
+                return Optional.of(trinoInput.readTail(toIntExact(fileSize)));
+            }
+        });
+    }
+
+    private Span createSpan(String key, Location location, String name)
+    {
+        return tracer.spanBuilder("MemoryFileSystemCache." + name)
+                .setAttribute(CACHE_KEY, key)
+                .setAttribute(CACHE_FILE_LOCATION, location.toString())
+                .startSpan();
+    }
+
+    /**
+     * Translates the cause of a failed cache load into an IOException that names the location. Every path
+     * throws; the declared return type only lets callers write {@code throw handleException(...)}.
+     */
+    private static IOException handleException(Location location, Throwable cause)
+            throws IOException
+    {
+        if (cause instanceof FileNotFoundException || cause instanceof NoSuchFileException) {
+            throw withCause(new FileNotFoundException(location.toString()), cause);
+        }
+        if (cause instanceof FileAlreadyExistsException) {
+            throw withCause(new FileAlreadyExistsException(location.toString()), cause);
+        }
+        throw new IOException(cause.getMessage() + ": " + location, cause);
+    }
+
+    private static <T extends Throwable> T withCause(T throwable, Throwable cause)
+    {
+        throwable.initCause(cause);
+        return throwable;
+    }
+}
diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystemCacheConfig.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystemCacheConfig.java
new file mode 100644
index 000000000000..4849c716456a
--- /dev/null
+++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystemCacheConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.memory;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.units.DataSize;
+import io.airlift.units.Duration;
+import io.airlift.units.MaxDataSize;
+import jakarta.validation.constraints.NotNull;
+
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
+import static java.util.concurrent.TimeUnit.HOURS;
+
+/**
+ * Configuration of the in-memory file system cache ({@code fs.memory-cache.*} properties).
+ */
+public class MemoryFileSystemCacheConfig
+{
+    // Runtime.getRuntime().maxMemory() is not 100% stable and may return a slightly different value over the JVM lifetime.
+    // We use a constant so the default configuration for cache size is stable.
+    @VisibleForTesting
+    static final DataSize DEFAULT_CACHE_SIZE = DataSize.succinctBytes(Math.min(
+            Math.floorDiv(Runtime.getRuntime().maxMemory(), 20L),
+            DataSize.of(200, MEGABYTE).toBytes()));
+
+    private Duration ttl = new Duration(1, HOURS);
+    private DataSize maxSize = DEFAULT_CACHE_SIZE;
+    private DataSize maxContentLength = DataSize.of(8, MEGABYTE);
+
+    @NotNull
+    public Duration getCacheTtl()
+    {
+        return ttl;
+    }
+
+    @Config("fs.memory-cache.ttl")
+    @ConfigDescription("Duration to keep files in the cache prior to eviction")
+    public MemoryFileSystemCacheConfig setCacheTtl(Duration ttl)
+    {
+        this.ttl = ttl;
+        return this;
+    }
+
+    @NotNull
+    public DataSize getMaxSize()
+    {
+        return maxSize;
+    }
+
+    @Config("fs.memory-cache.max-size")
+    @ConfigDescription("Maximum total size of the cache")
+    public MemoryFileSystemCacheConfig setMaxSize(DataSize maxSize)
+    {
+        this.maxSize = maxSize;
+        return this;
+    }
+
+    @NotNull
+    // Avoids humongous allocations with the recommended G1HeapRegionSize of 32MB and prevents a few big files from hogging cache space
+    @MaxDataSize("15MB")
+    public DataSize getMaxContentLength()
+    {
+        return maxContentLength;
+    }
+
+    @Config("fs.memory-cache.max-content-length")
+    @ConfigDescription("Maximum size of file that can be cached")
+    public MemoryFileSystemCacheConfig setMaxContentLength(DataSize maxContentLength)
+    {
+        this.maxContentLength = maxContentLength;
+        return this;
+    }
+}
diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystemCacheModule.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystemCacheModule.java
new file mode 100644
index 000000000000..1b4bd9470dd9
--- /dev/null
+++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/memory/MemoryFileSystemCacheModule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.filesystem.memory;
+
+import com.google.inject.Binder;
+import com.google.inject.Provider;
+import io.airlift.configuration.AbstractConfigurationAwareModule;
+import io.trino.spi.catalog.CatalogName;
+
+import static com.google.inject.Scopes.SINGLETON;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static org.weakref.jmx.guice.ExportBinder.newExporter;
+
+/**
+ * Binds {@link MemoryFileSystemCacheConfig} unconditionally; binds {@link MemoryFileSystemCache} as a singleton
+ * and registers it with the exporter only when constructed with {@code isCoordinator} set to true.
+ */
+public class MemoryFileSystemCacheModule
+        extends AbstractConfigurationAwareModule
+{
+    private final boolean isCoordinator;
+
+    public MemoryFileSystemCacheModule(boolean isCoordinator)
+    {
+        this.isCoordinator = isCoordinator;
+    }
+
+    @Override
+    protected void setup(Binder binder)
+    {
+        configBinder(binder).bindConfig(MemoryFileSystemCacheConfig.class);
+        if (isCoordinator) {
+            binder.bind(MemoryFileSystemCache.class).in(SINGLETON);
+            Provider<CatalogName> catalogName = binder.getProvider(CatalogName.class);
+            newExporter(binder).export(MemoryFileSystemCache.class)
+                    .as(generator -> generator.generatedNameOf(MemoryFileSystemCache.class, catalogName.get().toString()));
+        }
+    }
+}
diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/CacheSystemAttributes.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/CacheSystemAttributes.java
index d9d8082aa4f2..09ee2ce4297f 100644
--- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/CacheSystemAttributes.java
+++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/CacheSystemAttributes.java
@@ -24,6 +24,7 @@ private CacheSystemAttributes() {}
 
     public static final AttributeKey<String> CACHE_KEY = stringKey("trino.cache.key");
     public static final AttributeKey<String> CACHE_FILE_LOCATION = stringKey("trino.cache.file.location");
+    public static final AttributeKey<Long> CACHE_FILE_LOCATION_COUNT = longKey("trino.cache.file.location_count");
     public static final AttributeKey<Long> CACHE_FILE_READ_SIZE = longKey("trino.cache.read_size");
     public static final AttributeKey<Long> CACHE_FILE_READ_POSITION =
longKey("trino.cache.read_position"); public static final AttributeKey CACHE_FILE_WRITE_SIZE = longKey("trino.cache.write_size"); diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/Tracing.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/Tracing.java index 78254fdb25d9..b160b3360f08 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/Tracing.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/Tracing.java @@ -21,7 +21,7 @@ import java.util.Optional; -final class Tracing +public final class Tracing { private Tracing() {} diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystemCache.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystemCache.java index cd06fa8a1449..9b61f5307850 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystemCache.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingFileSystemCache.java @@ -22,8 +22,10 @@ import io.trino.filesystem.cache.TrinoFileSystemCache; import java.io.IOException; +import java.util.Collection; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_LOCATION; +import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_LOCATION_COUNT; import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_KEY; import static io.trino.filesystem.tracing.Tracing.withTracing; import static java.util.Objects.requireNonNull; @@ -64,6 +66,18 @@ public TrinoInputStream cacheStream(TrinoInputFile delegate, String key) return withTracing(span, () -> this.delegate.cacheStream(delegate, key)); } + @Override + public long cacheLength(TrinoInputFile delegate, String key) + throws IOException + { + Span span = tracer.spanBuilder("FileSystemCache.cacheLength") + .setAttribute(CACHE_FILE_LOCATION, delegate.location().toString()) + .setAttribute(CACHE_KEY, key) + .startSpan(); + + 
return withTracing(span, () -> this.delegate.cacheLength(delegate, key)); + } + @Override public void expire(Location location) throws IOException @@ -74,4 +88,15 @@ public void expire(Location location) withTracing(span, () -> delegate.expire(location)); } + + @Override + public void expire(Collection locations) + throws IOException + { + Span span = tracer.spanBuilder("FileSystemCache.expire") + .setAttribute(CACHE_FILE_LOCATION_COUNT, (long) locations.size()) + .startSpan(); + + withTracing(span, () -> delegate.expire(locations)); + } } diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingInputFile.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingInputFile.java index c46c877ed491..313b1abf38c4 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingInputFile.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/tracing/TracingInputFile.java @@ -34,6 +34,7 @@ final class TracingInputFile private final Tracer tracer; private final TrinoInputFile delegate; private Optional length; + private boolean lastModifiedRequested; public TracingInputFile(Tracer tracer, TrinoInputFile delegate, Optional length) { @@ -85,10 +86,17 @@ public long length() public Instant lastModified() throws IOException { + // skip tracing if lastModified is cached, but delegate anyway + if (lastModifiedRequested) { + return delegate.lastModified(); + } + Span span = tracer.spanBuilder("InputFile.lastModified") .setAttribute(FileSystemAttributes.FILE_LOCATION, toString()) .startSpan(); - return withTracing(span, delegate::lastModified); + Instant fileLastModified = withTracing(span, delegate::lastModified); + lastModifiedRequested = true; + return fileLastModified; } @Override diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestCacheFileSystem.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestCacheFileSystem.java deleted file mode 100644 index 
4fd6c095bfa4..000000000000 --- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestCacheFileSystem.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.filesystem.cache; - -import io.trino.filesystem.AbstractTestTrinoFileSystem; -import io.trino.filesystem.Location; -import io.trino.filesystem.TrinoFileSystem; -import io.trino.filesystem.memory.MemoryFileSystem; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; - -import static org.assertj.core.api.Assertions.assertThat; - -public class TestCacheFileSystem - extends AbstractTestTrinoFileSystem -{ - private MemoryFileSystem delegate; - private CacheFileSystem fileSystem; - - @BeforeAll - void setUp() - { - delegate = new MemoryFileSystem(); - fileSystem = new CacheFileSystem(delegate, new TestingMemoryFileSystemCache(), new DefaultCacheKeyProvider()); - } - - @AfterAll - void tearDown() - { - delegate = null; - fileSystem = null; - } - - @Override - protected boolean isHierarchical() - { - return false; - } - - @Override - protected boolean supportsCreateExclusive() - { - return true; - } - - @Override - protected TrinoFileSystem getFileSystem() - { - return fileSystem; - } - - @Override - protected Location getRootLocation() - { - return Location.of("memory://"); - } - - @Override - protected void verifyFileSystemIsEmpty() - { - assertThat(delegate.isEmpty()).isTrue(); - } -} diff --git 
a/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestCacheFileSystemAccessOperations.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestCacheFileSystemAccessOperations.java index 9dce610521c8..d942c86e86c6 100644 --- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestCacheFileSystemAccessOperations.java +++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestCacheFileSystemAccessOperations.java @@ -17,11 +17,14 @@ import com.google.common.collect.ImmutableMultiset; import com.google.common.collect.Multiset; import io.airlift.slice.Slices; +import io.airlift.units.Duration; import io.opentelemetry.sdk.trace.data.SpanData; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInput; import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.memory.MemoryFileSystemCache; +import io.trino.filesystem.memory.MemoryFileSystemCacheConfig; import io.trino.filesystem.memory.MemoryFileSystemFactory; import io.trino.filesystem.tracing.TracingFileSystemFactory; import io.trino.spi.block.TestingSession; @@ -37,8 +40,10 @@ import java.nio.charset.StandardCharsets; import java.util.List; +import static io.airlift.tracing.Tracing.noopTracer; import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_LOCATION; import static io.trino.testing.MultisetAssertions.assertMultisetsEqual; +import static java.util.concurrent.TimeUnit.HOURS; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.TestInstance.Lifecycle; @@ -53,7 +58,9 @@ public class TestCacheFileSystemAccessOperations void setUp() { tracingFileSystemFactory = new TracingFileSystemFactory(telemetry.getTracer(), new MemoryFileSystemFactory()); - fileSystem = new CacheFileSystem(tracingFileSystemFactory.create(TestingSession.SESSION), new TestingMemoryFileSystemCache(), new DefaultCacheKeyProvider()); + MemoryFileSystemCacheConfig 
configuration = new MemoryFileSystemCacheConfig() + .setCacheTtl(new Duration(24, HOURS)); + fileSystem = new CacheFileSystem(tracingFileSystemFactory.create(TestingSession.SESSION), new MemoryFileSystemCache(noopTracer(), configuration), new DefaultCacheKeyProvider()); } @AfterAll @@ -76,12 +83,11 @@ void testCache() assertReadOperations(location, content, ImmutableMultiset.builder() .add(new FileOperation(location, "InputFile.length")) - .add(new FileOperation(location, "InputFile.newStream")) + .add(new FileOperation(location, "InputFile.newInput")) .add(new FileOperation(location, "InputFile.lastModified")) .build()); assertReadOperations(location, content, ImmutableMultiset.builder() - .add(new FileOperation(location, "InputFile.length")) .add(new FileOperation(location, "InputFile.lastModified")) .build()); @@ -91,7 +97,7 @@ void testCache() assertReadOperations(location, modifiedContent, ImmutableMultiset.builder() .add(new FileOperation(location, "InputFile.length")) - .add(new FileOperation(location, "InputFile.newStream")) + .add(new FileOperation(location, "InputFile.newInput")) .add(new FileOperation(location, "InputFile.lastModified")) .build()); } @@ -112,7 +118,7 @@ private void assertReadOperations(Location location, byte[] content, Multiset getOperations(List spans) diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestingMemoryFileSystemCache.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestingMemoryFileSystemCache.java deleted file mode 100644 index 52725fbc85e2..000000000000 --- a/lib/trino-filesystem/src/test/java/io/trino/filesystem/cache/TestingMemoryFileSystemCache.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.filesystem.cache; - -import io.trino.filesystem.Location; -import io.trino.filesystem.TrinoInput; -import io.trino.filesystem.TrinoInputFile; -import io.trino.filesystem.TrinoInputStream; -import io.trino.filesystem.memory.MemoryFileSystem; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.concurrent.atomic.AtomicInteger; - -public class TestingMemoryFileSystemCache - implements TrinoFileSystemCache -{ - private final MemoryFileSystem memoryCache = new MemoryFileSystem(); - private final AtomicInteger cacheGeneration = new AtomicInteger(0); - - @Override - public TrinoInput cacheInput(TrinoInputFile delegate, String key) - throws IOException - { - Location cacheLocation = Location.of("memory:///" + key.replace("memory:///", "") + key.hashCode() + cacheGeneration.get()); - TrinoInputFile cacheEntry = memoryCache.newInputFile(cacheLocation); - if (!cacheEntry.exists()) { - try (OutputStream output = memoryCache.newOutputFile(cacheLocation).create(); - InputStream input = delegate.newStream()) { - input.transferTo(output); - } - } - return cacheEntry.newInput(); - } - - @Override - public TrinoInputStream cacheStream(TrinoInputFile delegate, String key) - throws IOException - { - Location cacheLocation = Location.of("memory:///" + key.replace("memory:///", "") + key.hashCode() + cacheGeneration.get()); - TrinoInputFile cacheEntry = memoryCache.newInputFile(cacheLocation); - if (!cacheEntry.exists()) { - try (OutputStream output = 
memoryCache.newOutputFile(cacheLocation).create(); - InputStream input = delegate.newStream()) { - input.transferTo(output); - } - } - return cacheEntry.newStream(); - } - - @Override - public void expire(Location location) - throws IOException - { - // Expire the entire cache on a single invalidation - cacheGeneration.incrementAndGet(); - } -} diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/memory/TestMemoryFileSystemCache.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/memory/TestMemoryFileSystemCache.java new file mode 100644 index 000000000000..49ff0faaab12 --- /dev/null +++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/memory/TestMemoryFileSystemCache.java @@ -0,0 +1,131 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.filesystem.memory; + +import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import io.trino.filesystem.AbstractTestTrinoFileSystem; +import io.trino.filesystem.Location; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoInput; +import io.trino.filesystem.TrinoInputFile; +import io.trino.filesystem.cache.CacheFileSystem; +import io.trino.filesystem.cache.CacheKeyProvider; +import io.trino.filesystem.cache.DefaultCacheKeyProvider; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.UUID; + +import static io.airlift.tracing.Tracing.noopTracer; +import static java.util.concurrent.TimeUnit.HOURS; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestMemoryFileSystemCache + extends AbstractTestTrinoFileSystem +{ + private static final int MAX_CONTENT_LENGTH = 2 * 1024 * 1024; + + private MemoryFileSystem delegate; + private CacheFileSystem fileSystem; + private MemoryFileSystemCache cache; + private CacheKeyProvider cacheKeyProvider; + + @BeforeAll + void beforeAll() + { + MemoryFileSystemCacheConfig configuration = new MemoryFileSystemCacheConfig() + .setMaxContentLength(DataSize.ofBytes(MAX_CONTENT_LENGTH)) + .setCacheTtl(new Duration(8, HOURS)); + delegate = new MemoryFileSystem(); + cache = new MemoryFileSystemCache(noopTracer(), configuration); + cacheKeyProvider = new DefaultCacheKeyProvider(); + fileSystem = new CacheFileSystem(delegate, cache, cacheKeyProvider); + } + + @Override + protected boolean isHierarchical() + { + return false; + } + + @Override + protected boolean supportsCreateExclusive() + { + return true; + } + + @Override + protected TrinoFileSystem getFileSystem() + { + return fileSystem; + } + + @Override + protected Location getRootLocation() + { + return Location.of("memory://"); + } + + @Override + protected void 
verifyFileSystemIsEmpty() + { + assertThat(delegate.isEmpty()).isTrue(); + } + + @Test + public void testMaxContentLength() + throws IOException + { + int fileSize = MAX_CONTENT_LENGTH + 200; + Location location = writeFile(fileSize); + TrinoInputFile inputFile = getFileSystem().newInputFile(location); + long largeFileSkippedCount = cache.getLargeFileSkippedCount(); + try (TrinoInput input = inputFile.newInput()) { + input.readTail(fileSize); + } + assertThat(cache.getLargeFileSkippedCount()).isGreaterThan(largeFileSkippedCount); + assertThat(cache.isCached(cacheKeyProvider.getCacheKey(inputFile).orElseThrow())).isFalse(); + getFileSystem().deleteFile(location); + + fileSize = MAX_CONTENT_LENGTH - 200; + location = writeFile(fileSize); + inputFile = getFileSystem().newInputFile(location); + try (TrinoInput input = inputFile.newInput()) { + input.readTail(fileSize); + } + assertThat(cache.isCached(cacheKeyProvider.getCacheKey(inputFile).orElseThrow())).isTrue(); + getFileSystem().deleteFile(location); + } + + private Location writeFile(int fileSize) + throws IOException + { + Location location = getRootLocation().appendPath("testMaxContentLength-%s".formatted(UUID.randomUUID())); + getFileSystem().deleteFile(location); + try (OutputStream outputStream = getFileSystem().newOutputFile(location).create()) { + byte[] bytes = new byte[8192]; + Arrays.fill(bytes, (byte) 'a'); + int count = 0; + while (count < fileSize) { + outputStream.write(bytes, 0, Math.min(bytes.length, fileSize - count)); + count += bytes.length; + } + } + return location; + } +} diff --git a/lib/trino-filesystem/src/test/java/io/trino/filesystem/memory/TestMemoryFileSystemCacheConfig.java b/lib/trino-filesystem/src/test/java/io/trino/filesystem/memory/TestMemoryFileSystemCacheConfig.java new file mode 100644 index 000000000000..bf0648949ce1 --- /dev/null +++ b/lib/trino-filesystem/src/test/java/io/trino/filesystem/memory/TestMemoryFileSystemCacheConfig.java @@ -0,0 +1,59 @@ +/* + * Licensed under 
the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem.memory; + +import com.google.common.collect.ImmutableMap; +import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Map; + +import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; +import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static io.trino.filesystem.memory.MemoryFileSystemCacheConfig.DEFAULT_CACHE_SIZE; +import static java.util.concurrent.TimeUnit.HOURS; + +public class TestMemoryFileSystemCacheConfig +{ + @Test + void testDefaults() + { + assertRecordedDefaults(recordDefaults(MemoryFileSystemCacheConfig.class) + .setMaxSize(DEFAULT_CACHE_SIZE) + .setCacheTtl(new Duration(1, HOURS)) + .setMaxContentLength(DataSize.of(8, MEGABYTE))); + } + + @Test + public void testExplicitPropertyMappings() + throws IOException + { + Map properties = ImmutableMap.builder() + .put("fs.memory-cache.max-size", "10MB") + .put("fs.memory-cache.max-content-length", "1MB") + .put("fs.memory-cache.ttl", "8h") + .buildOrThrow(); + + MemoryFileSystemCacheConfig expected = new MemoryFileSystemCacheConfig() + .setMaxSize(DataSize.of(10, MEGABYTE)) + .setCacheTtl(new Duration(8, HOURS)) + 
.setMaxContentLength(DataSize.of(1, MEGABYTE)); + + assertFullMapping(properties, expected); + } +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnectorFactory.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnectorFactory.java index 4c577fe2627d..8dcde95ece24 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnectorFactory.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnectorFactory.java @@ -101,7 +101,7 @@ public static Connector createConnector( new DeltaLakeModule(), new DeltaLakeSecurityModule(), new DeltaLakeSynchronizerModule(), - new FileSystemModule(catalogName, context.getNodeManager(), context.getOpenTelemetry()), + new FileSystemModule(catalogName, context.getNodeManager(), context.getOpenTelemetry(), false), binder -> { binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry()); binder.bind(Tracer.class).toInstance(context.getTracer()); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeGlueMetastore.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeGlueMetastore.java index 437b49838c37..39b209875473 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeGlueMetastore.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeGlueMetastore.java @@ -128,7 +128,7 @@ public void setUp() new DeltaLakeMetastoreModule(), new DeltaLakeModule(), // test setup - new FileSystemModule("test", context.getNodeManager(), context.getOpenTelemetry())); + new FileSystemModule("test", context.getNodeManager(), context.getOpenTelemetry(), false)); Injector injector = app .doNotInitializeLogging() diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnectorFactory.java 
b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnectorFactory.java index 2954d81a989c..4fcc498fa561 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnectorFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConnectorFactory.java @@ -106,7 +106,7 @@ public static Connector createConnector( new HiveSecurityModule(), fileSystemFactory .map(factory -> (Module) binder -> binder.bind(TrinoFileSystemFactory.class).toInstance(factory)) - .orElseGet(() -> new FileSystemModule(catalogName, context.getNodeManager(), context.getOpenTelemetry())), + .orElseGet(() -> new FileSystemModule(catalogName, context.getNodeManager(), context.getOpenTelemetry(), false)), new HiveProcedureModule(), new MBeanServerModule(), binder -> { diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java index f4078bb2681d..dc98f38f4c2d 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnectorFactory.java @@ -80,7 +80,7 @@ public static Connector createConnector( new JsonModule(), new HudiModule(), new HiveMetastoreModule(Optional.empty()), - new FileSystemModule(catalogName, context.getNodeManager(), context.getOpenTelemetry()), + new FileSystemModule(catalogName, context.getNodeManager(), context.getOpenTelemetry(), false), new MBeanServerModule(), module.orElse(EMPTY_MODULE), binder -> { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index 366f4a3712c9..666b80a48cd5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -85,6 +85,7 @@ public class IcebergConfig private Set 
queryPartitionFilterRequiredSchemas = ImmutableSet.of(); private int splitManagerThreads = Runtime.getRuntime().availableProcessors() * 2; private boolean incrementalRefreshEnabled = true; + private boolean metadataCacheEnabled = true; public CatalogType getCatalogType() { @@ -469,4 +470,17 @@ public boolean isStorageSchemaSetWhenHidingIsEnabled() { return hideMaterializedViewStorageTable && materializedViewsStorageSchema.isPresent(); } + + public boolean isMetadataCacheEnabled() + { + return metadataCacheEnabled; + } + + @Config("iceberg.metadata-cache.enabled") + @ConfigDescription("Enables in-memory caching of metadata files on coordinator if fs.cache.enabled is not set to true") + public IcebergConfig setMetadataCacheEnabled(boolean metadataCacheEnabled) + { + this.metadataCacheEnabled = metadataCacheEnabled; + return this; + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnectorFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnectorFactory.java index 2e64a621b2af..b554bafbd16d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnectorFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConnectorFactory.java @@ -13,11 +13,13 @@ */ package io.trino.plugin.iceberg; +import com.google.inject.Binder; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; import io.airlift.bootstrap.Bootstrap; import io.airlift.bootstrap.LifeCycleManager; +import io.airlift.configuration.AbstractConfigurationAwareModule; import io.airlift.event.client.EventModule; import io.airlift.json.JsonModule; import io.opentelemetry.api.OpenTelemetry; @@ -61,6 +63,7 @@ import static com.google.common.base.Verify.verify; import static com.google.inject.util.Modules.EMPTY_MODULE; import static io.trino.plugin.base.Versions.checkStrictSpiVersionMatch; +import static java.util.Objects.requireNonNull; public class 
IcebergConnectorFactory implements ConnectorFactory @@ -96,7 +99,7 @@ public static Connector createConnector( new IcebergSecurityModule(), icebergCatalogModule.orElse(new IcebergCatalogModule()), new MBeanServerModule(), - new FileSystemModule(catalogName, context.getNodeManager(), context.getOpenTelemetry()), + new IcebergFileSystemModule(catalogName, context), binder -> { binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry()); binder.bind(Tracer.class).toInstance(context.getTracer()); @@ -153,4 +156,26 @@ public static Connector createConnector( functionProvider); } } + + private static class IcebergFileSystemModule + extends AbstractConfigurationAwareModule + { + private final String catalogName; + private final NodeManager nodeManager; + private final OpenTelemetry openTelemetry; + + public IcebergFileSystemModule(String catalogName, ConnectorContext context) + { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + this.nodeManager = context.getNodeManager(); + this.openTelemetry = context.getOpenTelemetry(); + } + + @Override + protected void setup(Binder binder) + { + boolean metadataCacheEnabled = buildConfigObject(IcebergConfig.class).isMetadataCacheEnabled(); + install(new FileSystemModule(catalogName, nodeManager, openTelemetry, metadataCacheEnabled)); + } + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAlluxioCacheFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAlluxioCacheFileOperations.java index 03371001b044..9a3363cf7804 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAlluxioCacheFileOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAlluxioCacheFileOperations.java @@ -63,6 +63,7 @@ protected DistributedQueryRunner createQueryRunner() .put("fs.cache.enabled", "true") .put("fs.cache.directories", cacheDirectory.toAbsolutePath().toString()) 
.put("fs.cache.max-sizes", "100MB") + .put("iceberg.metadata-cache.enabled", "false") .put("hive.metastore.catalog.dir", metastoreDirectory.toUri().toString()) .buildOrThrow(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index f77617882ae3..286622dda9b4 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -70,7 +70,8 @@ public void testDefaults() .setQueryPartitionFilterRequired(false) .setQueryPartitionFilterRequiredSchemas(ImmutableSet.of()) .setSplitManagerThreads(Runtime.getRuntime().availableProcessors() * 2) - .setIncrementalRefreshEnabled(true)); + .setIncrementalRefreshEnabled(true) + .setMetadataCacheEnabled(true)); } @Test @@ -104,6 +105,7 @@ public void testExplicitPropertyMappings() .put("iceberg.query-partition-filter-required-schemas", "bronze,silver") .put("iceberg.split-manager-threads", "42") .put("iceberg.incremental-refresh-enabled", "false") + .put("iceberg.metadata-cache.enabled", "false") .buildOrThrow(); IcebergConfig expected = new IcebergConfig() @@ -133,7 +135,8 @@ public void testExplicitPropertyMappings() .setQueryPartitionFilterRequired(true) .setQueryPartitionFilterRequiredSchemas(ImmutableSet.of("bronze", "silver")) .setSplitManagerThreads(42) - .setIncrementalRefreshEnabled(false); + .setIncrementalRefreshEnabled(false) + .setMetadataCacheEnabled(false); assertFullMapping(properties, expected); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMemoryCacheFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMemoryCacheFileOperations.java new file mode 100644 index 000000000000..fc8527938666 --- /dev/null +++ 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMemoryCacheFileOperations.java @@ -0,0 +1,197 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultiset; +import com.google.common.collect.Multiset; +import io.opentelemetry.api.common.Attributes; +import io.trino.plugin.iceberg.util.FileOperationUtils; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import org.intellij.lang.annotations.Language; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_LOCATION; +import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; +import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.DATA; +import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.MANIFEST; +import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.METADATA_JSON; +import static 
io.trino.plugin.iceberg.util.FileOperationUtils.FileType.SNAPSHOT;
+import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toCollection;
+
+/**
+ * Checks which file system cache operations are recorded, per Iceberg file type, when queries
+ * run against a catalog configured with {@code iceberg.metadata-cache.enabled=true}.
+ * <p>
+ * Operations are derived from tracing spans whose names start with {@code FileSystemCache.} or
+ * {@code MemoryFileSystemCache.}; each span is reduced to a (span name, file type) pair and the
+ * resulting multiset is compared with the expected one.
+ */
+@Execution(ExecutionMode.SAME_THREAD)
+public class TestIcebergMemoryCacheFileOperations
+        extends AbstractTestQueryFramework
+{
+    private static final String TEST_SCHEMA = "test_memory_schema";
+
+    /**
+     * Builds an Iceberg query runner with a worker count of 0, backed by a temporary metastore
+     * directory that is deleted after the test class finishes.
+     */
+    @Override
+    protected DistributedQueryRunner createQueryRunner()
+            throws Exception
+    {
+        Path metastoreDirectory = Files.createTempDirectory(ICEBERG_CATALOG);
+        closeAfterClass(() -> deleteRecursively(metastoreDirectory, ALLOW_INSECURE));
+
+        Map<String, String> icebergProperties = ImmutableMap.<String, String>builder()
+                .put("iceberg.metadata-cache.enabled", "true")
+                .put("hive.metastore.catalog.dir", metastoreDirectory.toUri().toString())
+                .buildOrThrow();
+
+        DistributedQueryRunner queryRunner = IcebergQueryRunner.builder()
+                .setSchemaInitializer(SchemaInitializer.builder()
+                        .withSchemaName(TEST_SCHEMA)
+                        .build())
+                .setIcebergProperties(icebergProperties)
+                .setWorkerCount(0)
+                .build();
+        queryRunner.execute("CREATE SCHEMA IF NOT EXISTS " + TEST_SCHEMA);
+        return queryRunner;
+    }
+
+    /**
+     * Runs the same full-table scan twice after each batch of inserts. The first run is expected
+     * to record {@code MemoryFileSystemCache.loadCache} spans; the immediate repeat is expected
+     * to record none.
+     */
+    @Test
+    public void testCacheFileOperations()
+    {
+        assertUpdate("DROP TABLE IF EXISTS test_cache_file_operations");
+        assertUpdate("CREATE TABLE test_cache_file_operations(key varchar, data varchar) with (partitioning=ARRAY['key'])");
+        assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p1', '1-abc')", 1);
+        assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p2', '2-xyz')", 1);
+        // First read after the two inserts: loadCache spans are expected.
+        assertFileSystemAccesses(
+                "SELECT * FROM test_cache_file_operations",
+                ImmutableMultiset.<CacheOperation>builder()
+                        .addCopies(new CacheOperation("MemoryFileSystemCache.loadCache", DATA), 2)
+                        .addCopies(new CacheOperation("FileSystemCache.cacheInput", DATA), 2)
+                        .add(new CacheOperation("MemoryFileSystemCache.loadCache", METADATA_JSON))
+                        .add(new CacheOperation("FileSystemCache.cacheStream", METADATA_JSON))
+                        .add(new CacheOperation("FileSystemCache.cacheLength", SNAPSHOT))
+                        .add(new CacheOperation("FileSystemCache.cacheStream", SNAPSHOT))
+                        .add(new CacheOperation("MemoryFileSystemCache.loadCache", MANIFEST))
+                        .addCopies(new CacheOperation("FileSystemCache.cacheStream", MANIFEST), 2)
+                        .build());
+
+        // Same query again: identical cache accesses, but no loadCache spans.
+        assertFileSystemAccesses(
+                "SELECT * FROM test_cache_file_operations",
+                ImmutableMultiset.<CacheOperation>builder()
+                        .addCopies(new CacheOperation("FileSystemCache.cacheInput", DATA), 2)
+                        .add(new CacheOperation("FileSystemCache.cacheStream", METADATA_JSON))
+                        .add(new CacheOperation("FileSystemCache.cacheLength", SNAPSHOT))
+                        .add(new CacheOperation("FileSystemCache.cacheStream", SNAPSHOT))
+                        .addCopies(new CacheOperation("FileSystemCache.cacheStream", MANIFEST), 2)
+                        .build());
+
+        assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p3', '3-xyz')", 1);
+        assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p4', '4-xyz')", 1);
+        assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p5', '5-xyz')", 1);
+
+        // Three new data files: three DATA loadCache spans, five DATA cacheInput spans in total.
+        assertFileSystemAccesses(
+                "SELECT * FROM test_cache_file_operations",
+                ImmutableMultiset.<CacheOperation>builder()
+                        .addCopies(new CacheOperation("MemoryFileSystemCache.loadCache", DATA), 3)
+                        .addCopies(new CacheOperation("FileSystemCache.cacheInput", DATA), 5)
+                        .add(new CacheOperation("MemoryFileSystemCache.loadCache", METADATA_JSON))
+                        .add(new CacheOperation("FileSystemCache.cacheStream", METADATA_JSON))
+                        .add(new CacheOperation("FileSystemCache.cacheLength", SNAPSHOT))
+                        .add(new CacheOperation("FileSystemCache.cacheStream", SNAPSHOT))
+                        .add(new CacheOperation("MemoryFileSystemCache.loadCache", MANIFEST))
+                        .addCopies(new CacheOperation("FileSystemCache.cacheStream", MANIFEST), 5)
+                        .build());
+        // Repeat once more: again no loadCache spans are expected.
+        assertFileSystemAccesses(
+                "SELECT * FROM test_cache_file_operations",
+                ImmutableMultiset.<CacheOperation>builder()
+                        .addCopies(new CacheOperation("FileSystemCache.cacheInput", DATA), 5)
+                        .add(new CacheOperation("FileSystemCache.cacheStream", METADATA_JSON))
+                        .add(new CacheOperation("FileSystemCache.cacheLength", SNAPSHOT))
+                        .add(new CacheOperation("FileSystemCache.cacheStream", SNAPSHOT))
+                        .addCopies(new CacheOperation("FileSystemCache.cacheStream", MANIFEST), 5)
+                        .build());
+    }
+
+    /**
+     * Expected cache operations for a filtered scan of a freshly created single-row table.
+     */
+    @Test
+    public void testSelectWithFilter()
+    {
+        assertUpdate("CREATE TABLE test_select_with_filter AS SELECT 1 col_name", 1);
+        assertFileSystemAccesses(
+                "SELECT * FROM test_select_with_filter WHERE col_name = 1",
+                ImmutableMultiset.<CacheOperation>builder()
+                        .add(new CacheOperation("MemoryFileSystemCache.loadCache", METADATA_JSON))
+                        .add(new CacheOperation("FileSystemCache.cacheStream", METADATA_JSON))
+                        .add(new CacheOperation("FileSystemCache.cacheStream", SNAPSHOT))
+                        .add(new CacheOperation("FileSystemCache.cacheLength", SNAPSHOT))
+                        .add(new CacheOperation("MemoryFileSystemCache.loadCache", MANIFEST))
+                        .add(new CacheOperation("FileSystemCache.cacheStream", MANIFEST))
+                        .add(new CacheOperation("MemoryFileSystemCache.loadCache", DATA))
+                        .add(new CacheOperation("FileSystemCache.cacheInput", DATA))
+                        .build());
+    }
+
+    /**
+     * Expected cache operations for a join of two freshly created single-row tables.
+     */
+    @Test
+    public void testJoin()
+    {
+        assertUpdate("CREATE TABLE test_join_t1 AS SELECT 2 AS age, 'id1' AS id", 1);
+        assertUpdate("CREATE TABLE test_join_t2 AS SELECT 'name1' AS name, 'id1' AS id", 1);
+
+        assertFileSystemAccesses("SELECT name, age FROM test_join_t1 JOIN test_join_t2 ON test_join_t2.id = test_join_t1.id",
+                ImmutableMultiset.<CacheOperation>builder()
+                        .addCopies(new CacheOperation("MemoryFileSystemCache.loadCache", METADATA_JSON), 2)
+                        .addCopies(new CacheOperation("FileSystemCache.cacheStream", METADATA_JSON), 2)
+                        .addCopies(new CacheOperation("FileSystemCache.cacheStream", SNAPSHOT), 2)
+                        .addCopies(new CacheOperation("FileSystemCache.cacheLength", SNAPSHOT), 2)
+                        .addCopies(new CacheOperation("MemoryFileSystemCache.loadCache", MANIFEST), 2)
+                        .addCopies(new CacheOperation("FileSystemCache.cacheStream", MANIFEST), 4)
+                        .addCopies(new CacheOperation("MemoryFileSystemCache.loadCache", DATA), 2)
+                        .addCopies(new CacheOperation("FileSystemCache.cacheInput", DATA), 2)
+                        .build());
+    }
+
+    /**
+     * Executes {@code query} with the default session and asserts that the cache operations
+     * collected from the query runner's spans equal {@code expectedCacheAccesses}.
+     *
+     * @param query SQL statement to execute
+     * @param expectedCacheAccesses expected multiset of cache operations
+     */
+    private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<CacheOperation> expectedCacheAccesses)
+    {
+        DistributedQueryRunner queryRunner = getDistributedQueryRunner();
+        queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
+        // Observed value first, expectation second, so that a mismatch report labels the
+        // two sides correctly (NOTE(review): confirm against MultisetAssertions' parameter order).
+        assertMultisetsEqual(getCacheOperations(), expectedCacheAccesses);
+    }
+
+    /**
+     * Collects the cache-related spans from the query runner as a multiset of
+     * {@link CacheOperation}s, ignoring spans whose file location is a {@code .trinoSchema}
+     * or {@code .trinoPermissions} path.
+     */
+    private Multiset<CacheOperation> getCacheOperations()
+    {
+        return getQueryRunner().getSpans().stream()
+                .filter(span -> span.getName().startsWith("FileSystemCache.") || span.getName().startsWith("MemoryFileSystemCache."))
+                .filter(span -> !isTrinoSchemaOrPermissions(requireNonNull(span.getAttributes().get(CACHE_FILE_LOCATION))))
+                .map(span -> CacheOperation.create(span.getName(), span.getAttributes()))
+                .collect(toCollection(HashMultiset::create));
+    }
+
+    /**
+     * A cache span reduced to its name and the Iceberg file type of the file it touched.
+     */
+    private record CacheOperation(String operationName, FileOperationUtils.FileType fileType)
+    {
+        /**
+         * Creates an operation from a span name and its attributes; the file type is derived
+         * from the {@code CACHE_FILE_LOCATION} attribute, which must be present.
+         */
+        public static CacheOperation create(String operationName, Attributes attributes)
+        {
+            String path = requireNonNull(attributes.get(CACHE_FILE_LOCATION));
+            return new CacheOperation(operationName, FileOperationUtils.FileType.fromFilePath(path));
+        }
+    }
+
+    /**
+     * Returns true when {@code path} ends with {@code .trinoSchema} or contains
+     * {@code .trinoPermissions}.
+     */
+    private static boolean isTrinoSchemaOrPermissions(String path)
+    {
+        return path.endsWith(".trinoSchema") || path.contains(".trinoPermissions");
+    }
+}