Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ implementation is used:
creation of more data files, since it uses the append operation to insert
the new records.
- `true`
* - `iceberg.metadata-cache.enabled`
- Set to `false` to disable in-memory caching of metadata files on the
coordinator. This cache is not used when `fs.cache.enabled` is set to `true`.
- `true`

:::

(iceberg-fte-support)=
Expand Down
10 changes: 0 additions & 10 deletions lib/trino-filesystem-cache-alluxio/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,6 @@
<artifactId>opentelemetry-api</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import jakarta.annotation.PreDestroy;

import java.io.IOException;
import java.util.Collection;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -71,12 +72,25 @@ public TrinoInputStream cacheStream(TrinoInputFile delegate, String key)
return new AlluxioInputStream(tracer, delegate, key, uriStatus(delegate, key), new TracingCacheManager(tracer, key, pageSize, cacheManager), config, statistics);
}

@Override
public long cacheLength(TrinoInputFile delegate, String key)
throws IOException
{
return delegate.length();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How frequently is `cacheLength` called?

It seems like the length could be important to cache. @jkylling, could Alluxio provide an implementation of this method?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the context of Iceberg metadata files, we need it only because SNAPSHOT file lengths are not known in advance and the Iceberg library uses the file length to open its Avro reader.
Having it in the cache helps for subsequent queries.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I can tell, the `CacheManager` interface we use from Alluxio does not have a way to get the file length.

}

/**
 * Expiration hook for a single location. The body is empty, so this implementation
 * evicts nothing for {@code source}.
 * NOTE(review): presumably stale entries are left to the Alluxio cache's own eviction — confirm.
 */
@Override
public void expire(Location source)
throws IOException
{
// no-op: nothing is invalidated here
}

/**
 * Bulk variant of the expiration hook. The body is empty, so this implementation
 * evicts nothing for any of the given {@code locations}.
 * NOTE(review): presumably stale entries are left to the Alluxio cache's own eviction — confirm.
 */
@Override
public void expire(Collection<Location> locations)
throws IOException
{
// no-op: nothing is invalidated here
}

@PreDestroy
public void shutdown()
throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import java.io.EOFException;
import java.io.IOException;

import static io.trino.filesystem.alluxio.AlluxioTracing.withTracing;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_LOCATION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_KEY;
import static io.trino.filesystem.tracing.Tracing.withTracing;
import static java.lang.Math.min;
import static java.util.Objects.checkFromIndexSize;
import static java.util.Objects.requireNonNull;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
import java.nio.ByteBuffer;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.filesystem.alluxio.AlluxioTracing.withTracing;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_LOCATION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_SIZE;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_KEY;
import static io.trino.filesystem.tracing.Tracing.withTracing;
import static java.lang.Integer.max;
import static java.lang.Math.addExact;
import static java.lang.Math.min;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@

import static com.google.common.base.Verify.verify;
import static com.google.common.primitives.Ints.saturatedCast;
import static io.trino.filesystem.alluxio.AlluxioTracing.withTracing;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_LOCATION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_KEY;
import static io.trino.filesystem.tracing.Tracing.withTracing;
import static java.lang.Integer.max;
import static java.lang.Math.addExact;
import static java.lang.Math.min;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

import static io.trino.filesystem.alluxio.AlluxioTracing.withTracing;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_READ_SIZE;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_POSITION;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_FILE_WRITE_SIZE;
import static io.trino.filesystem.tracing.CacheSystemAttributes.CACHE_KEY;
import static io.trino.filesystem.tracing.Tracing.withTracing;
import static java.util.Objects.requireNonNull;

public class TracingCacheManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import io.trino.filesystem.cache.TrinoFileSystemCache;
import io.trino.filesystem.gcs.GcsFileSystemFactory;
import io.trino.filesystem.gcs.GcsFileSystemModule;
import io.trino.filesystem.memory.MemoryFileSystemCache;
import io.trino.filesystem.memory.MemoryFileSystemCacheModule;
import io.trino.filesystem.s3.FileSystemS3;
import io.trino.filesystem.s3.S3FileSystemModule;
import io.trino.filesystem.switching.SwitchingFileSystemFactory;
Expand All @@ -54,12 +56,14 @@ public class FileSystemModule
private final String catalogName;
private final NodeManager nodeManager;
private final OpenTelemetry openTelemetry;
private final boolean coordinatorFileCaching;

public FileSystemModule(String catalogName, NodeManager nodeManager, OpenTelemetry openTelemetry)
public FileSystemModule(String catalogName, NodeManager nodeManager, OpenTelemetry openTelemetry, boolean coordinatorFileCaching)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.coordinatorFileCaching = coordinatorFileCaching;
}

@Override
Expand Down Expand Up @@ -107,9 +111,14 @@ protected void setup(Binder binder)
newOptionalBinder(binder, CacheKeyProvider.class).setDefault().to(DefaultCacheKeyProvider.class).in(Scopes.SINGLETON);

newOptionalBinder(binder, TrinoFileSystemCache.class);
newOptionalBinder(binder, MemoryFileSystemCache.class);

boolean isCoordinator = nodeManager.getCurrentNode().isCoordinator();
if (config.isCacheEnabled()) {
install(new AlluxioFileSystemCacheModule(nodeManager.getCurrentNode().isCoordinator()));
install(new AlluxioFileSystemCacheModule(isCoordinator));
}
if (coordinatorFileCaching) {
install(new MemoryFileSystemCacheModule(isCoordinator));
}
}

Expand All @@ -119,6 +128,7 @@ static TrinoFileSystemFactory createFileSystemFactory(
Optional<HdfsFileSystemLoader> hdfsFileSystemLoader,
Map<String, TrinoFileSystemFactory> factories,
Optional<TrinoFileSystemCache> fileSystemCache,
Optional<MemoryFileSystemCache> memoryFileSystemCache,
Optional<CacheKeyProvider> keyProvider,
Tracer tracer)
{
Expand All @@ -133,6 +143,10 @@ static TrinoFileSystemFactory createFileSystemFactory(
if (fileSystemCache.isPresent()) {
delegate = new CacheFileSystemFactory(tracer, delegate, fileSystemCache.orElseThrow(), keyProvider.orElseThrow());
}
// use MemoryFileSystemCache only when no other TrinoFileSystemCache is configured
else if (memoryFileSystemCache.isPresent()) {
delegate = new CacheFileSystemFactory(tracer, delegate, memoryFileSystemCache.orElseThrow(), keyProvider.orElseThrow());
}
return new TracingFileSystemFactory(tracer, delegate);
}
}
26 changes: 26 additions & 0 deletions lib/trino-filesystem/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
<artifactId>slice</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
Expand All @@ -68,6 +73,11 @@
<artifactId>opentelemetry-semconv</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-cache</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-memory-context</artifactId>
Expand All @@ -83,6 +93,16 @@
<artifactId>jakarta.annotation-api</artifactId>
</dependency>

<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
</dependency>

<dependency>
<groupId>org.weakref</groupId>
<artifactId>jmxutils</artifactId>
</dependency>

<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
Expand All @@ -107,6 +127,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>tracing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;

import static java.util.Objects.requireNonNull;
Expand All @@ -44,13 +45,13 @@ public CacheFileSystem(TrinoFileSystem delegate, TrinoFileSystemCache cache, Cac
@Override
public TrinoInputFile newInputFile(Location location)
{
return new CacheInputFile(delegate.newInputFile(location), cache, keyProvider);
return new CacheInputFile(delegate.newInputFile(location), cache, keyProvider, OptionalLong.empty());
}

@Override
public TrinoInputFile newInputFile(Location location, long length)
{
return new CacheInputFile(delegate.newInputFile(location, length), cache, keyProvider);
return new CacheInputFile(delegate.newInputFile(location, length), cache, keyProvider, OptionalLong.of(length));
}

@Override
Expand Down Expand Up @@ -79,6 +80,7 @@ public void deleteDirectory(Location location)
throws IOException
{
delegate.deleteDirectory(location);
cache.expire(location);
}

@Override
Expand Down Expand Up @@ -137,8 +139,6 @@ public void deleteFiles(Collection<Location> locations)
throws IOException
{
delegate.deleteFiles(locations);
for (var location : locations) {
cache.expire(location);
}
cache.expire(locations);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.time.Instant;
import java.util.Optional;
import java.util.OptionalLong;

import static java.util.Objects.requireNonNull;

Expand All @@ -30,12 +31,14 @@ public final class CacheInputFile
private final TrinoInputFile delegate;
private final TrinoFileSystemCache cache;
private final CacheKeyProvider keyProvider;
private OptionalLong length;

public CacheInputFile(TrinoInputFile delegate, TrinoFileSystemCache cache, CacheKeyProvider keyProvider)
public CacheInputFile(TrinoInputFile delegate, TrinoFileSystemCache cache, CacheKeyProvider keyProvider, OptionalLong length)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.cache = requireNonNull(cache, "cache is null");
this.keyProvider = requireNonNull(keyProvider, "keyProvider is null");
this.length = requireNonNull(length, "length is null");
}

@Override
Expand Down Expand Up @@ -64,7 +67,16 @@ public TrinoInputStream newStream()
public long length()
throws IOException
{
return delegate.length();
if (length.isEmpty()) {
Optional<String> key = keyProvider.getCacheKey(delegate);
if (key.isPresent()) {
length = OptionalLong.of(cache.cacheLength(delegate, key.orElseThrow()));
}
else {
length = OptionalLong.of(delegate.length());
}
}
return length.getAsLong();
}

@Override
Expand Down
Loading