Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions presto-base-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@
<artifactId>jackson-annotations</artifactId>
</dependency>

<dependency>
<groupId>org.weakref</groupId>
<artifactId>jmxutils</artifactId>
</dependency>

<!-- for testing -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@
public class JdbcMetadata
implements ConnectorMetadata
{
private final JdbcMetadataCache jdbcMetadataCache;
private final JdbcClient jdbcClient;
private final boolean allowDropTable;

private final AtomicReference<Runnable> rollbackAction = new AtomicReference<>();

public JdbcMetadata(JdbcClient jdbcClient, boolean allowDropTable)
public JdbcMetadata(JdbcMetadataCache jdbcMetadataCache, JdbcClient jdbcClient, boolean allowDropTable)
{
this.jdbcMetadataCache = requireNonNull(jdbcMetadataCache, "jdbcMetadataCache is null");
this.jdbcClient = requireNonNull(jdbcClient, "client is null");
this.allowDropTable = allowDropTable;
}
Expand All @@ -78,7 +80,7 @@ public List<String> listSchemaNames(ConnectorSession session)
@Override
public JdbcTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
return jdbcClient.getTableHandle(session, JdbcIdentity.from(session), tableName);
return jdbcMetadataCache.getTableHandle(session, tableName);
}

@Override
Expand All @@ -101,7 +103,7 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
JdbcTableHandle handle = (JdbcTableHandle) table;

ImmutableList.Builder<ColumnMetadata> columnMetadata = ImmutableList.builder();
for (JdbcColumnHandle column : jdbcClient.getColumns(session, handle)) {
for (JdbcColumnHandle column : jdbcMetadataCache.getColumns(session, handle)) {
columnMetadata.add(column.getColumnMetadata());
}
return new ConnectorTableMetadata(handle.getSchemaTableName(), columnMetadata.build());
Expand All @@ -119,7 +121,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
JdbcTableHandle jdbcTableHandle = (JdbcTableHandle) tableHandle;

ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
for (JdbcColumnHandle column : jdbcClient.getColumns(session, jdbcTableHandle)) {
for (JdbcColumnHandle column : jdbcMetadataCache.getColumns(session, jdbcTableHandle)) {
columnHandles.put(column.getColumnMetadata().getName(), column);
}
return columnHandles.build();
Expand All @@ -138,7 +140,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
}
for (SchemaTableName tableName : tables) {
try {
JdbcTableHandle tableHandle = jdbcClient.getTableHandle(session, JdbcIdentity.from(session), tableName);
JdbcTableHandle tableHandle = jdbcMetadataCache.getTableHandle(session, tableName);
if (tableHandle == null) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;

import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.UncheckedExecutionException;

import javax.inject.Inject;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.cache.CacheLoader.asyncReloading;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class JdbcMetadataCache
{
private final JdbcClient jdbcClient;

private final LoadingCache<KeyAndSession<SchemaTableName>, Optional<JdbcTableHandle>> tableHandleCache;
private final LoadingCache<KeyAndSession<JdbcTableHandle>, List<JdbcColumnHandle>> columnHandlesCache;

@Inject
public JdbcMetadataCache(JdbcClient jdbcClient, JdbcMetadataConfig config, JdbcMetadataCacheStats stats)
{
this(
newCachedThreadPool(daemonThreadsNamed("jdbc-metadata-cache" + "-%s")),
jdbcClient,
stats,
OptionalLong.of(config.getMetadataCacheTtl().toMillis()),
config.getMetadataCacheRefreshInterval().toMillis() >= config.getMetadataCacheTtl().toMillis() ? OptionalLong.empty() : OptionalLong.of(config.getMetadataCacheRefreshInterval().toMillis()),
config.getMetadataCacheMaximumSize());
}

public JdbcMetadataCache(
ExecutorService executor,
JdbcClient jdbcClient,
JdbcMetadataCacheStats stats,
OptionalLong cacheTtl,
OptionalLong refreshInterval,
long cacheMaximumSize)
{
this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");

this.tableHandleCache = newCacheBuilder(cacheTtl, refreshInterval, cacheMaximumSize)
.build(asyncReloading(CacheLoader.from(this::loadTableHandle), executor));
stats.setTableHandleCache(tableHandleCache);

this.columnHandlesCache = newCacheBuilder(cacheTtl, refreshInterval, cacheMaximumSize)
.build(asyncReloading(CacheLoader.from(this::loadColumnHandles), executor));
stats.setColumnHandlesCache(columnHandlesCache);
}

public JdbcTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
{
return get(tableHandleCache, new KeyAndSession<>(session, tableName)).orElse(null);
}

public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHandle jdbcTableHandle)
{
return get(columnHandlesCache, new KeyAndSession<>(session, jdbcTableHandle));
}

private Optional<JdbcTableHandle> loadTableHandle(KeyAndSession<SchemaTableName> tableName)
{
// The returned tableHandle can be null if it does not contain the table
return Optional.ofNullable(jdbcClient.getTableHandle(tableName.getSession(), JdbcIdentity.from(tableName.getSession()), tableName.getKey()));
}

private List<JdbcColumnHandle> loadColumnHandles(KeyAndSession<JdbcTableHandle> tableHandle)
{
return jdbcClient.getColumns(tableHandle.getSession(), tableHandle.getKey());
}

private static CacheBuilder<Object, Object> newCacheBuilder(OptionalLong expiresAfterWriteMillis, OptionalLong refreshMillis, long maximumSize)
{
CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
if (expiresAfterWriteMillis.isPresent()) {
cacheBuilder = cacheBuilder.expireAfterWrite(expiresAfterWriteMillis.getAsLong(), MILLISECONDS);
}
if (refreshMillis.isPresent() && (!expiresAfterWriteMillis.isPresent() || expiresAfterWriteMillis.getAsLong() > refreshMillis.getAsLong())) {
cacheBuilder = cacheBuilder.refreshAfterWrite(refreshMillis.getAsLong(), MILLISECONDS);
}
return cacheBuilder.maximumSize(maximumSize).recordStats();
}

private static <K, V> V get(LoadingCache<K, V> cache, K key)
{
try {
return cache.getUnchecked(key);
}
catch (UncheckedExecutionException e) {
throwIfInstanceOf(e.getCause(), PrestoException.class);
throw e;
}
}

private static class KeyAndSession<T>
{
private final ConnectorSession session;
private final T key;

public KeyAndSession(ConnectorSession session, T key)
{
this.session = requireNonNull(session, "session is null");
this.key = requireNonNull(key, "key is null");
}

public ConnectorSession getSession()
{
return session;
}

public T getKey()
{
return key;
}

// Session object changes for every query. For caching to be effective across multiple queries,
// we should NOT include session in equals() and hashCode() methods below.
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KeyAndSession<?> other = (KeyAndSession<?>) o;
return Objects.equals(key, other.key);
}

@Override
public int hashCode()
{
return Objects.hash(key);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("session", session)
.add("key", key)
.toString();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.plugin.jdbc;

import com.google.common.cache.LoadingCache;
import org.weakref.jmx.Managed;

public class JdbcMetadataCacheStats
{
private LoadingCache<?, ?> tableHandleCache;
private LoadingCache<?, ?> columnHandlesCache;

public void setTableHandleCache(LoadingCache<?, ?> tableHandleCache)
{
this.tableHandleCache = tableHandleCache;
}

public void setColumnHandlesCache(LoadingCache<?, ?> columnHandlesCache)
{
this.columnHandlesCache = columnHandlesCache;
}

@Managed
public long getTableHandleCacheHit()
{
return tableHandleCache.stats().hitCount();
}

@Managed
public long getTableHandleCacheMiss()
{
return tableHandleCache.stats().missCount();
}

@Managed
public long getTableHandleCacheEviction()
{
return tableHandleCache.stats().evictionCount();
}

@Managed
public long getTableHandleCacheSize()
{
return tableHandleCache.size();
}

@Managed
public long getColumnHandlesCacheHit()
{
return columnHandlesCache.stats().hitCount();
}

@Managed
public long getColumnHandlesCacheMiss()
{
return columnHandlesCache.stats().missCount();
}

@Managed
public long getColumnHandlesCacheEviction()
{
return columnHandlesCache.stats().evictionCount();
}

@Managed
public long getColumnHandlesCacheSize()
{
return columnHandlesCache.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,20 @@

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import java.util.concurrent.TimeUnit;

public class JdbcMetadataConfig
{
private boolean allowDropTable;
private Duration metadataCacheTtl = new Duration(0, TimeUnit.SECONDS);
private Duration metadataCacheRefreshInterval = new Duration(0, TimeUnit.SECONDS);
private long metadataCacheMaximumSize = 10000;

public boolean isAllowDropTable()
{
Expand All @@ -32,4 +42,45 @@ public JdbcMetadataConfig setAllowDropTable(boolean allowDropTable)
this.allowDropTable = allowDropTable;
return this;
}

@NotNull
public Duration getMetadataCacheTtl()
{
return metadataCacheTtl;
}

@MinDuration("0ms")
@Config("metadata-cache-ttl")
public JdbcMetadataConfig setMetadataCacheTtl(Duration metadataCacheTtl)
{
this.metadataCacheTtl = metadataCacheTtl;
return this;
}

@NotNull
public Duration getMetadataCacheRefreshInterval()
{
return metadataCacheRefreshInterval;
}

@MinDuration("1ms")
@Config("metadata-cache-refresh-interval")
public JdbcMetadataConfig setMetadataCacheRefreshInterval(Duration metadataCacheRefreshInterval)
{
this.metadataCacheRefreshInterval = metadataCacheRefreshInterval;
return this;
}

public long getMetadataCacheMaximumSize()
{
return metadataCacheMaximumSize;
}

@Min(1)
@Config("metadata-cache-maximum-size")
public JdbcMetadataConfig setMetadataCacheMaximumSize(long metadataCacheMaximumSize)
{
this.metadataCacheMaximumSize = metadataCacheMaximumSize;
return this;
}
}
Loading