Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ jobs:
!:trino-faulttolerant-tests,
!:trino-filesystem,
!:trino-filesystem-alluxio,
!:trino-filesystem-cache-alluxio,
!:trino-blob-cache-alluxio,
!:trino-blob-cache-memory,
!:trino-filesystem-azure,
!:trino-filesystem-gcs,
!:trino-filesystem-manager,
Expand Down Expand Up @@ -466,12 +467,13 @@ jobs:
- lib/trino-filesystem
- lib/trino-filesystem-azure
- lib/trino-filesystem-alluxio
- lib/trino-filesystem-cache-alluxio
- lib/trino-filesystem-gcs
- lib/trino-filesystem-manager
- lib/trino-filesystem-s3
- lib/trino-hdfs
- lib/trino-hive-formats
- plugin/trino-blob-cache-alluxio
- plugin/trino-blob-cache-memory
- { modules: core/trino-main }
- { modules: lib/trino-filesystem-azure, profile: cloud-tests }
- { modules: lib/trino-filesystem-gcs, profile: cloud-tests }
Expand Down
3 changes: 3 additions & 0 deletions core/docker/default/etc/cache-manager-memory.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
cache-manager.name=memory
fs.memory-cache.max-size=2GB
fs.memory-cache.ttl=24h
1 change: 1 addition & 0 deletions core/docker/default/etc/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ node-scheduler.include-coordinator=true
http-server.http.port=8080
discovery.uri=http://localhost:8080
catalog.management=${ENV:CATALOG_MANAGEMENT}
cache-manager.config-files=etc/cache-manager-memory.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.cache;

import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;
import io.airlift.configuration.validation.FileExists;

import java.io.File;
import java.util.List;
import java.util.stream.Stream;

import static com.google.common.collect.ImmutableList.toImmutableList;

public class CacheManagerConfig
{
private List<File> cacheManagerConfigFiles = ImmutableList.of();

public List<@FileExists File> getCacheManagerConfigFiles()
{
return cacheManagerConfigFiles;
}

@Config("cache-manager.config-files")
public CacheManagerConfig setCacheManagerConfigFiles(String cacheManagerConfigFiles)
{
this.cacheManagerConfigFiles = Stream.of(cacheManagerConfigFiles.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.map(File::new)
.collect(toImmutableList());
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.cache;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.spi.cache.CacheManagerContext;

import static java.util.Objects.requireNonNull;

public class CacheManagerContextInstance
implements CacheManagerContext
{
private final OpenTelemetry openTelemetry;
private final Tracer tracer;

public CacheManagerContextInstance(OpenTelemetry openTelemetry, Tracer tracer)
{
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.tracer = requireNonNull(tracer, "tracer is null");
}

@Override
public OpenTelemetry getOpenTelemetry()
{
return openTelemetry;
}

@Override
public Tracer getTracer()
{
return tracer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.cache;

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;

import static io.airlift.configuration.ConfigBinder.configBinder;

public class CacheManagerModule
implements Module
{
@Override
public void configure(Binder binder)
{
configBinder(binder).bindConfig(CacheManagerConfig.class);
binder.bind(CacheManagerRegistry.class).in(Scopes.SINGLETON);
}
}
175 changes: 175 additions & 0 deletions core/trino-main/src/main/java/io/trino/cache/CacheManagerRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.cache;

import com.google.inject.Inject;
import io.airlift.configuration.secrets.SecretsResolver;
import io.airlift.log.Logger;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.spi.cache.Blob;
import io.trino.spi.cache.BlobCache;
import io.trino.spi.cache.BlobCacheManager;
import io.trino.spi.cache.BlobCacheManagerFactory;
import io.trino.spi.cache.BlobSource;
import io.trino.spi.cache.CacheKey;
import io.trino.spi.cache.CacheTier;
import io.trino.spi.cache.ConnectorCacheFactory;
import io.trino.spi.cache.PassThroughBlob;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.classloader.ThreadContextClassLoader;
import jakarta.annotation.PreDestroy;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static io.airlift.configuration.ConfigurationLoader.loadPropertiesFrom;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class CacheManagerRegistry
{
private static final Logger log = Logger.get(CacheManagerRegistry.class);

private static final String CACHE_MANAGER_NAME_PROPERTY = "cache-manager.name";

private final OpenTelemetry openTelemetry;
private final Tracer tracer;
private final SecretsResolver secretsResolver;
private final List<File> configFiles;

private final Map<String, BlobCacheManagerFactory> blobCacheFactories = new ConcurrentHashMap<>();
private final Map<CacheTier, BlobCacheManager> blobCacheManagers = new ConcurrentHashMap<>();

@Inject
public CacheManagerRegistry(
OpenTelemetry openTelemetry,
Tracer tracer,
SecretsResolver secretsResolver,
CacheManagerConfig config)
{
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.tracer = requireNonNull(tracer, "tracer is null");
this.secretsResolver = requireNonNull(secretsResolver, "secretsResolver is null");
this.configFiles = List.copyOf(config.getCacheManagerConfigFiles());
}

public void addBlobCacheManagerFactory(BlobCacheManagerFactory factory)
{
requireNonNull(factory, "factory is null");
if (blobCacheFactories.putIfAbsent(factory.getName(), factory) != null) {
throw new IllegalArgumentException(format("Blob cache manager factory '%s' is already registered", factory.getName()));
}
}

public synchronized void loadCacheManagers()
{
for (File configFile : configFiles) {
Map<String, String> properties = loadProperties(configFile);
String name = properties.remove(CACHE_MANAGER_NAME_PROPERTY);
checkArgument(!isNullOrEmpty(name), "Cache manager configuration %s does not contain %s", configFile, CACHE_MANAGER_NAME_PROPERTY);
loadBlobCacheManager(name, properties);
}
}

public synchronized void loadBlobCacheManager(String name, Map<String, String> properties)
{
log.info("-- Loading blob cache manager %s --", name);

BlobCacheManagerFactory factory = blobCacheFactories.get(name);
checkArgument(factory != null, "Blob cache manager factory '%s' is not registered. Available factories: %s", name, blobCacheFactories.keySet());

CacheTier tier = factory.cacheTier();
if (blobCacheManagers.containsKey(tier)) {
throw new IllegalStateException(format("Blob cache manager for tier %s is already loaded", tier));
}

BlobCacheManager manager;
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(factory.getClass().getClassLoader())) {
manager = factory.create(secretsResolver.getResolvedConfiguration(properties), new CacheManagerContextInstance(openTelemetry, tracer));
}

blobCacheManagers.put(tier, manager);
log.info("-- Loaded blob cache manager %s for tier %s --", name, tier);
}

public ConnectorCacheFactory createConnectorCacheFactory(CatalogName catalog)
{
requireNonNull(catalog, "catalog is null");
return tier -> {
BlobCacheManager manager = blobCacheManagers.get(tier);
if (manager == null) {
log.warn("Catalog %s requested blob cache manager tier %s but none registered, using noop", catalog, tier);
return new NoopBlobCache();
}
log.debug("Created new blob cache on tier %s for catalog %s", tier, catalog);
return manager.createBlobCache(catalog);
};
}

public void drop(CatalogName catalog)
{
for (Map.Entry<CacheTier, BlobCacheManager> entry : blobCacheManagers.entrySet()) {
log.info("Dropping blob cache on tier %s for catalog %s", entry.getKey(), catalog);
entry.getValue().drop(catalog);
}
}

@PreDestroy
public void shutdown()
{
for (Map.Entry<CacheTier, BlobCacheManager> entry : blobCacheManagers.entrySet()) {
try {
entry.getValue().shutdown();
}
catch (Throwable t) {
log.error(t, "Error shutting down blob cache manager for tier %s", entry.getKey());
}
}
}

private static Map<String, String> loadProperties(File configFile)
{
try {
return new HashMap<>(loadPropertiesFrom(configFile.getPath()));
}
catch (IOException e) {
throw new UncheckedIOException("Failed to read configuration file: " + configFile, e);
}
}

private static class NoopBlobCache
implements BlobCache
{
@Override
public Blob get(CacheKey key, BlobSource source)
{
return new PassThroughBlob(source);
}

@Override
public void invalidate(CacheKey key) {}

@Override
public void invalidate(Collection<CacheKey> keys) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.spi.PageIndexerFactory;
import io.trino.spi.PageSorter;
import io.trino.spi.VersionEmbedder;
import io.trino.spi.cache.ConnectorCacheFactory;
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.connector.MetadataProvider;
import io.trino.spi.function.FunctionBundleFactory;
Expand All @@ -38,6 +39,7 @@ public class ConnectorContextInstance
private final PageSorter pageSorter;
private final PageIndexerFactory pageIndexerFactory;
private final FunctionBundleFactory functionBundleFactory;
private final ConnectorCacheFactory cacheFactory;

public ConnectorContextInstance(
OpenTelemetry openTelemetry,
Expand All @@ -48,7 +50,8 @@ public ConnectorContextInstance(
MetadataProvider metadataProvider,
PageSorter pageSorter,
PageIndexerFactory pageIndexerFactory,
FunctionBundleFactory functionBundleFactory)
FunctionBundleFactory functionBundleFactory,
ConnectorCacheFactory cacheFactory)
{
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
this.tracer = requireNonNull(tracer, "tracer is null");
Expand All @@ -59,6 +62,7 @@ public ConnectorContextInstance(
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
this.functionBundleFactory = requireNonNull(functionBundleFactory, "functionBundleFactory is null");
this.cacheFactory = requireNonNull(cacheFactory, "cacheFactory is null");
}

@Override
Expand Down Expand Up @@ -114,4 +118,10 @@ public FunctionBundleFactory getFunctionBundleFactory()
{
return functionBundleFactory;
}

@Override
public ConnectorCacheFactory getCacheFactory()
{
return cacheFactory;
}
}
Loading
Loading