Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions docs/operation.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,33 @@ taking a long time for garbage collection.
completed initialization and is ready to serve requests. This means the initial
connection to the database and the first round of health check on Trino clusters
are completed. Otherwise, status code 503 is returned.

## Database cache configuration

Trino Gateway can cache database queries to improve performance and reduce load
Comment thread
oneonestar marked this conversation as resolved.
on the backend database. This also allow gateway to continue routing queries
when the database is temporarily unavailable. Currently only the list of backend
Trino clusters used for query routing are being cached.
The cache can be configured using the `databaseCache` section in the config file.

```yaml
databaseCache:
enabled: true
expireAfterWrite: 60m
refreshAfterWrite: 5s
```

Configuration options:

* `enabled` - Enable or disable the database cache. Default is `false`.
* `expireAfterWrite` - The maximum time a cached entry is kept since it was last
loaded or refreshed. This ensures stale data is eventually removed.
If cache is not refreshed before expiration, requests will fail once the entry
Comment thread
oneonestar marked this conversation as resolved.
expires (i.e. cache miss will attempt to reload data, but if the database is unavailable,
the request fails because there is no stale value to fall back to after
expiration).
* `refreshAfterWrite` - Duration after which cache entries are eligible for
asynchronous refresh. When a refresh is triggered, the existing cached value
continues to be served while the refresh happens in the background.
This helps keep data fresh while serving slightly stale data to avoid blocking requests.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.gateway.ha.clustermonitor.ClusterMetricsStatsExporter;
import io.trino.gateway.ha.clustermonitor.ForMonitor;
import io.trino.gateway.ha.config.DataStoreConfiguration;
import io.trino.gateway.ha.config.DatabaseCacheConfiguration;
import io.trino.gateway.ha.config.HaGatewayConfiguration;
import io.trino.gateway.ha.config.MonitorConfiguration;
import io.trino.gateway.ha.config.RoutingConfiguration;
Expand Down Expand Up @@ -126,6 +127,7 @@ public void configure(Binder binder)
binder.bind(RoutingConfiguration.class).toInstance(configuration.getRouting());
binder.bind(DataStoreConfiguration.class).toInstance(configuration.getDataStore());
binder.bind(MonitorConfiguration.class).toInstance(configuration.getMonitor());
binder.bind(DatabaseCacheConfiguration.class).toInstance(configuration.getDatabaseCache());
registerAuthFilters(binder);
registerResources(binder);
registerProxyResources(binder);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.config;

import io.airlift.units.Duration;

import java.util.concurrent.TimeUnit;

public class DatabaseCacheConfiguration
{
private boolean enabled;
private Duration expireAfterWrite = Duration.succinctDuration(60, TimeUnit.MINUTES);
private Duration refreshAfterWrite = Duration.succinctDuration(5, TimeUnit.SECONDS);
Comment on lines +23 to +24
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private Duration expireAfterWrite = Duration.succinctDuration(60, TimeUnit.MINUTES);
private Duration refreshAfterWrite = Duration.succinctDuration(5, TimeUnit.SECONDS);
private Duration expireAfterWrite = Duration.succinctDuration(1, HOURS);
private Duration refreshAfterWrite = Duration.succinctDuration(5, SECONDS);


public boolean isEnabled()
{
return enabled;
}

public void setEnabled(boolean enabled)
{
this.enabled = enabled;
}

public Duration getExpireAfterWrite()
{
return expireAfterWrite;
}

public void setExpireAfterWrite(Duration expireAfterWrite)
{
this.expireAfterWrite = expireAfterWrite;
}

public Duration getRefreshAfterWrite()
{
return refreshAfterWrite;
}

public void setRefreshAfterWrite(Duration refreshAfterWrite)
{
this.refreshAfterWrite = refreshAfterWrite;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ public class HaGatewayConfiguration
private List<String> statementPaths = ImmutableList.of(V1_STATEMENT_PATH);
private boolean includeClusterHostInResponse;
private ProxyResponseConfiguration proxyResponseConfiguration = new ProxyResponseConfiguration();

private RequestAnalyzerConfig requestAnalyzerConfig = new RequestAnalyzerConfig();

private UIConfiguration uiConfiguration = new UIConfiguration();
private DatabaseCacheConfiguration databaseCache = new DatabaseCacheConfiguration();

// List of Modules with FQCN (Fully Qualified Class Name)
private List<String> modules;
Expand Down Expand Up @@ -267,6 +266,16 @@ public void setProxyResponseConfiguration(ProxyResponseConfiguration proxyRespon
this.proxyResponseConfiguration = proxyResponseConfiguration;
}

public DatabaseCacheConfiguration getDatabaseCache()
{
return databaseCache;
}

public void setDatabaseCache(DatabaseCacheConfiguration databaseCache)
{
this.databaseCache = databaseCache;
}

private void validateStatementPath(String statementPath, List<String> statementPaths)
{
if (statementPath.startsWith(V1_STATEMENT_PATH) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,6 @@ public interface GatewayBackendDao
@SqlQuery("SELECT * FROM gateway_backend")
List<GatewayBackend> findAll();

@SqlQuery("""
SELECT * FROM gateway_backend
WHERE active = true
""")
List<GatewayBackend> findActiveBackend();

@SqlQuery("""
SELECT * FROM gateway_backend
WHERE active = true AND routing_group = :routingGroup
""")
List<GatewayBackend> findActiveBackendByRoutingGroup(String routingGroup);

@SqlQuery("""
SELECT * FROM gateway_backend
WHERE name = :name
""")
List<GatewayBackend> findByName(String name);

@SqlQuery("""
SELECT * FROM gateway_backend
WHERE name = :name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@
*/
package io.trino.gateway.ha.router;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Ticker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
import io.trino.gateway.ha.config.DatabaseCacheConfiguration;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import io.trino.gateway.ha.config.RoutingConfiguration;
import io.trino.gateway.ha.persistence.dao.GatewayBackend;
Expand All @@ -28,34 +34,111 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class HaGatewayManager
implements GatewayBackendManager
{
private static final Logger log = Logger.get(HaGatewayManager.class);
private static final Object ALL_BACKEND_CACHE_KEY = new Object();

private final GatewayBackendDao dao;
private final String defaultRoutingGroup;
private final boolean cacheEnabled;
private final LoadingCache<Object, List<GatewayBackend>> backendCache;

private final CounterStat backendLookupSuccesses = new CounterStat();
private final CounterStat backendLookupFailures = new CounterStat();

@Inject
public HaGatewayManager(Jdbi jdbi, RoutingConfiguration routingConfiguration)
public HaGatewayManager(Jdbi jdbi, RoutingConfiguration routingConfiguration, DatabaseCacheConfiguration databaseCacheConfiguration)
{
this(jdbi, routingConfiguration, databaseCacheConfiguration, Ticker.systemTicker());
}

@VisibleForTesting
public HaGatewayManager(Jdbi jdbi, RoutingConfiguration routingConfiguration, DatabaseCacheConfiguration databaseCacheConfiguration, Ticker ticker)
{
dao = requireNonNull(jdbi, "jdbi is null").onDemand(GatewayBackendDao.class);
this.defaultRoutingGroup = routingConfiguration.getDefaultRoutingGroup();
defaultRoutingGroup = routingConfiguration.getDefaultRoutingGroup();
cacheEnabled = databaseCacheConfiguration.isEnabled();
if (cacheEnabled) {
Caffeine<Object, Object> caffeineBuilder = Caffeine.newBuilder()
.initialCapacity(1)
.ticker(ticker);
if (databaseCacheConfiguration.getExpireAfterWrite() != null) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When does this getter return null when cacheEnabled is true?

caffeineBuilder = caffeineBuilder.expireAfterWrite(databaseCacheConfiguration.getExpireAfterWrite().toJavaTime());
}
if (databaseCacheConfiguration.getRefreshAfterWrite() != null) {
caffeineBuilder = caffeineBuilder.refreshAfterWrite(databaseCacheConfiguration.getRefreshAfterWrite().toJavaTime());
}
backendCache = caffeineBuilder.build(this::fetchAllBackends);

// Load the data once during initialization. This ensures a fail-fast behavior in case of database misconfiguration.
try {
List<GatewayBackend> _ = backendCache.get(ALL_BACKEND_CACHE_KEY);
}
catch (Exception e) {
throw new RuntimeException("Failed to warm up backend cache", e);
}
}
else {
backendCache = null;
}
}

private List<GatewayBackend> fetchAllBackends(Object ignored)
{
try {
List<GatewayBackend> backends = dao.findAll();
backendLookupSuccesses.update(1);
return backends;
}
catch (Exception e) {
backendLookupFailures.update(1);
log.warn(e, "Failed to fetch backends");
throw e;
Comment thread
oneonestar marked this conversation as resolved.
}
}

private void invalidateBackendCache()
{
if (cacheEnabled) {
// Avoid using bulk invalidation like invalidateAll(), in order to invalidate in-flight loads properly.
// See https://github.com/trinodb/trino/issues/10512#issuecomment-1016398117
backendCache.invalidate(ALL_BACKEND_CACHE_KEY);
}
}

private List<GatewayBackend> getAllBackendsInternal()
{
if (cacheEnabled) {
try {
return backendCache.get(ALL_BACKEND_CACHE_KEY);
}
catch (Exception e) {
throw new RuntimeException("Failed to load backends from database to cache", e);
}
}
else {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove redundant else.

https://github.com/trinodb/trino/blob/master/.github/DEVELOPMENT.md#additional-ide-configuration

Java | Control flow issues | Redundant 'else' (including Report when there are no more statements after the 'if' statement option),

return fetchAllBackends(ALL_BACKEND_CACHE_KEY);
}
}

@Override
public List<ProxyBackendConfiguration> getAllBackends()
{
List<GatewayBackend> proxyBackendList = dao.findAll();
List<GatewayBackend> proxyBackendList = getAllBackendsInternal();
return upcast(proxyBackendList);
}

@Override
public List<ProxyBackendConfiguration> getAllActiveBackends()
{
List<GatewayBackend> proxyBackendList = dao.findActiveBackend();
List<GatewayBackend> proxyBackendList = getAllBackendsInternal().stream()
.filter(GatewayBackend::active)
.collect(toImmutableList());
return upcast(proxyBackendList);
}

Expand All @@ -74,14 +157,19 @@ public List<ProxyBackendConfiguration> getActiveDefaultBackends()
@Override
public List<ProxyBackendConfiguration> getActiveBackends(String routingGroup)
{
List<GatewayBackend> proxyBackendList = dao.findActiveBackendByRoutingGroup(routingGroup);
List<GatewayBackend> proxyBackendList = getAllBackendsInternal().stream()
.filter(GatewayBackend::active)
.filter(backend -> backend.routingGroup().equals(routingGroup))
.collect(toImmutableList());
return upcast(proxyBackendList);
}

@Override
public Optional<ProxyBackendConfiguration> getBackendByName(String name)
{
List<GatewayBackend> proxyBackendList = dao.findByName(name);
List<GatewayBackend> proxyBackendList = getAllBackendsInternal().stream()
.filter(backend -> backend.name().equals(name))
.collect(toImmutableList());
return upcast(proxyBackendList).stream().findAny();
}

Expand All @@ -105,6 +193,7 @@ private void updateClusterActivationStatus(String clusterName, boolean newStatus
boolean previousStatus = model.active();
changeActiveStatus.run();
logActivationStatusChange(clusterName, newStatus, previousStatus);
invalidateBackendCache();
}

private static void logActivationStatusChange(String clusterName, boolean newStatus, boolean previousStatus)
Expand All @@ -121,6 +210,7 @@ public ProxyBackendConfiguration addBackend(ProxyBackendConfiguration backend)
String backendProxyTo = removeTrailingSlash(backend.getProxyTo());
String backendExternalUrl = removeTrailingSlash(backend.getExternalUrl());
dao.create(backend.getName(), backend.getRoutingGroup(), backendProxyTo, backendExternalUrl, backend.isActive());
invalidateBackendCache();
return backend;
}

Expand All @@ -138,6 +228,7 @@ public ProxyBackendConfiguration updateBackend(ProxyBackendConfiguration backend
dao.update(backend.getName(), backend.getRoutingGroup(), backendProxyTo, backendExternalUrl, backend.isActive());
logActivationStatusChange(backend.getName(), backend.isActive(), model.active());
}
invalidateBackendCache();
return backend;
}

Expand All @@ -152,6 +243,7 @@ private static void validateBackendConfiguration(ProxyBackendConfiguration backe
public void deleteBackend(String name)
{
dao.deleteByName(name);
invalidateBackendCache();
}

private static List<ProxyBackendConfiguration> upcast(List<GatewayBackend> gatewayBackendList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.jdbi.v3.core.Jdbi;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public final class TestingJdbcConnectionManager
Expand All @@ -39,4 +41,25 @@ public static JdbcConnectionManager createTestingJdbcConnectionManager(DataStore
Jdbi jdbi = HaGatewayProviderModule.createJdbi(config);
return new JdbcConnectionManager(jdbi, config);
}

public static void destroyTestingDatabase(DataStoreConfiguration config)
{
String tempH2DbDirPath = config.getJdbcUrl().replace("jdbc:h2:", "").replace(";NON_KEYWORDS=NAME,VALUE", "");
File dbFile = Path.of(tempH2DbDirPath).toFile();
File parentDir = dbFile.getParentFile();

if (parentDir != null && parentDir.exists()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

destroyTestingDatabase is called from single place. Why do we need this condition? Could you leave a code comment?

File[] files = parentDir.listFiles((dir, name) -> name.startsWith(dbFile.getName()));
if (files != null) {
for (File file : files) {
try {
Files.deleteIfExists(file.toPath());
}
catch (IOException e) {
// Ignore deletion errors in test cleanup
}
}
}
}
}
}
Loading