Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.connector.ConnectorServicesProvider.PrunableState;
import io.trino.metadata.CatalogManager;
import io.trino.node.AllNodes;
import io.trino.node.InternalNode;
Expand Down Expand Up @@ -133,12 +134,14 @@ public void pruneCatalogs()
.filter(uri -> !uri.equals(currentNode.getInternalUri()))
.collect(toImmutableSet());

PrunableState prunableState = connectorServicesProvider.getPrunableState();

// send message to workers to trigger prune
Set<CatalogHandle> activeCatalogs = getActiveCatalogs();
pruneWorkerCatalogs(online, activeCatalogs);

// prune inactive catalogs locally
connectorServicesProvider.pruneCatalogs(activeCatalogs);
connectorServicesProvider.pruneCatalogs(prunableState, activeCatalogs);
}

void pruneWorkerCatalogs(Set<URI> online, Set<CatalogHandle> activeCatalogs)
Expand Down Expand Up @@ -172,10 +175,10 @@ public Object handle(Request request, Response response)
private Set<CatalogHandle> getActiveCatalogs()
{
ImmutableSet.Builder<CatalogHandle> activeCatalogs = ImmutableSet.builder();
// all catalogs in an active transaction
transactionManager.getAllTransactionInfos().forEach(info -> activeCatalogs.addAll(info.getActiveCatalogs()));
// all catalogs currently associated with a name
activeCatalogs.addAll(catalogManager.getActiveCatalogs());
activeCatalogs.addAll(catalogManager.getReachableDynamicCatalogs());
// all catalogs that still may be used by ongoing transactions
transactionManager.getAllTransactionInfos().forEach(info -> activeCatalogs.addAll(info.getRegisteredCatalogs()));
return activeCatalogs.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,26 @@ public interface ConnectorServicesProvider

void ensureCatalogsLoaded(List<CatalogProperties> catalogs);

void pruneCatalogs(Set<CatalogHandle> catalogsInUse);
/**
* Returns prunable state to be passed to {@link #pruneCatalogs}.
*/
PrunableState getPrunableState();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it feels too soon to add an abstraction like PrunableState if we only need a snapshot of all catalogs

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i added it not to confuse with Set<CatalogHandle> inUse parameter. Also, i will change the abstraction now a little bit


/**
* Prune catalogs that are no longer in use.
*
* @param prunableState obtained from {@link #getPrunableState}
* @param catalogsInUse catalogs in use observed between obtaining the prunable state and calling this method
*/
void pruneCatalogs(PrunableState prunableState, Set<CatalogHandle> catalogsInUse);

ConnectorServices getConnectorServices(CatalogHandle catalogHandle);

interface PrunableState
{
static PrunableState empty()
{
return new PrunableState() {};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.connector;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
Expand Down Expand Up @@ -47,6 +48,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.connector.CatalogHandle.createRootCatalogHandle;
import static io.trino.metadata.Catalog.failedCatalog;
Expand Down Expand Up @@ -82,7 +84,7 @@ private enum State
/**
* All catalogs including those that have been dropped.
*/
private final ConcurrentMap<CatalogHandle, CatalogConnector> allCatalogs = new ConcurrentHashMap<>();
private final ConcurrentMap<CatalogHandle, RegisteredCatalog> allCatalogs = new ConcurrentHashMap<>();

@GuardedBy("catalogsUpdateLock")
private State state = State.CREATED;
Expand All @@ -98,7 +100,7 @@ public CoordinatorDynamicCatalogManager(CatalogStore catalogStore, CatalogFactor
@PreDestroy
public void stop()
{
List<CatalogConnector> catalogs;
List<RegisteredCatalog> catalogs;

synchronized (catalogsUpdateLock) {
if (state == State.STOPPED) {
Expand All @@ -111,8 +113,8 @@ public void stop()
activeCatalogs.clear();
}

for (CatalogConnector connector : catalogs) {
connector.shutdown();
for (RegisteredCatalog connector : catalogs) {
connector.catalog().shutdown();
}
}

Expand All @@ -136,7 +138,7 @@ public void loadInitialCatalogs()
verify(catalog.name().equals(storedCatalog.name()), "Catalog name does not match catalog properties");
CatalogConnector newCatalog = catalogFactory.createCatalog(catalog);
activeCatalogs.put(storedCatalog.name(), newCatalog.getCatalog());
allCatalogs.put(newCatalog.getCatalogHandle(), newCatalog);
allCatalogs.put(newCatalog.getCatalogHandle(), new RegisteredCatalog(new RegistrationToken(), newCatalog));
log.debug("-- Added catalog %s using connector %s --", storedCatalog.name(), catalog.connectorName());
}
catch (Throwable e) {
Expand Down Expand Up @@ -164,7 +166,7 @@ public Optional<Catalog> getCatalog(CatalogName catalogName)
}

@Override
public Set<CatalogHandle> getActiveCatalogs()
public Set<CatalogHandle> getReachableDynamicCatalogs()
{
return activeCatalogs.values().stream()
.map(Catalog::getCatalogHandle)
Expand All @@ -184,26 +186,37 @@ public void ensureCatalogsLoaded(List<CatalogProperties> catalogs)
}

@Override
public void pruneCatalogs(Set<CatalogHandle> catalogsInUse)
public PrunableState getPrunableState()
{
return new PrunableStateImpl(allCatalogs.entrySet().stream()
.collect(toImmutableMap(entry -> entry.getKey(), entry -> entry.getValue().registrationToken())));
}

@Override
public void pruneCatalogs(PrunableState opaquePrunableState, Set<CatalogHandle> catalogsInUse)
{
PrunableStateImpl prunableState = (PrunableStateImpl) opaquePrunableState;
List<CatalogConnector> removedCatalogs = new ArrayList<>();
synchronized (catalogsUpdateLock) {
if (state == State.STOPPED) {
return;
}
Iterator<Entry<CatalogHandle, CatalogConnector>> iterator = allCatalogs.entrySet().iterator();
Iterator<Entry<CatalogHandle, RegisteredCatalog>> iterator = allCatalogs.entrySet().iterator();
while (iterator.hasNext()) {
Entry<CatalogHandle, CatalogConnector> entry = iterator.next();
Entry<CatalogHandle, RegisteredCatalog> entry = iterator.next();
CatalogHandle catalogHandle = entry.getKey();
RegistrationToken registrationToken = entry.getValue().registrationToken();

Catalog activeCatalog = activeCatalogs.get(entry.getKey().getCatalogName());
if (activeCatalog != null && activeCatalog.getCatalogHandle().equals(entry.getKey())) {
Catalog activeCatalog = activeCatalogs.get(catalogHandle.getCatalogName());
if (activeCatalog != null && activeCatalog.getCatalogHandle().equals(catalogHandle)) {
// catalog is registered with a name, and therefor is available for new queries, and should not be removed
Comment thread
findepi marked this conversation as resolved.
continue;
}

if (!catalogsInUse.contains(entry.getKey())) {
if (registrationToken.equals(prunableState.prunableCatalogs().get(catalogHandle))
&& !catalogsInUse.contains(catalogHandle)) {
iterator.remove();
removedCatalogs.add(entry.getValue());
removedCatalogs.add(entry.getValue().catalog());
}
}
}
Expand All @@ -228,15 +241,16 @@ public void pruneCatalogs(Set<CatalogHandle> catalogsInUse)
public Optional<CatalogProperties> getCatalogProperties(CatalogHandle catalogHandle)
{
return Optional.ofNullable(allCatalogs.get(catalogHandle.getRootCatalogHandle()))
.map(RegisteredCatalog::catalog)
.flatMap(CatalogConnector::getCatalogProperties);
}

@Override
public ConnectorServices getConnectorServices(CatalogHandle catalogHandle)
{
CatalogConnector catalogConnector = allCatalogs.get(catalogHandle.getRootCatalogHandle());
checkArgument(catalogConnector != null, "No catalog '%s'", catalogHandle.getCatalogName());
return catalogConnector.getMaterializedConnector(catalogHandle.getType());
RegisteredCatalog registeredCatalog = allCatalogs.get(catalogHandle.getRootCatalogHandle());
checkArgument(registeredCatalog != null, "No catalog '%s'", catalogHandle.getCatalogName());
return registeredCatalog.catalog().getMaterializedConnector(catalogHandle.getType());
}

@Override
Expand All @@ -259,9 +273,10 @@ public void createCatalog(CatalogName catalogName, ConnectorName connectorName,
CatalogProperties catalogProperties = catalogStore.createCatalogProperties(catalogName, connectorName, properties);

// get or create catalog for the handle
CatalogConnector catalog = allCatalogs.computeIfAbsent(
RegisteredCatalog registeredCatalog = allCatalogs.computeIfAbsent(
createRootCatalogHandle(catalogName, catalogProperties.version()),
_ -> catalogFactory.createCatalog(catalogProperties));
_ -> new RegisteredCatalog(new RegistrationToken(), catalogFactory.createCatalog(catalogProperties)));
CatalogConnector catalog = registeredCatalog.catalog();
catalogStore.addOrReplaceCatalog(catalogProperties);
activeCatalogs.put(catalogName, catalog.getCatalog());

Expand All @@ -282,7 +297,7 @@ public void registerGlobalSystemConnector(GlobalSystemConnector connector)
if (activeCatalogs.putIfAbsent(new CatalogName(GlobalSystemConnector.NAME), catalog.getCatalog()) != null) {
throw new IllegalStateException("Global system catalog already registered");
}
allCatalogs.put(GlobalSystemConnector.CATALOG_HANDLE, catalog);
allCatalogs.put(GlobalSystemConnector.CATALOG_HANDLE, new RegisteredCatalog(new RegistrationToken(), catalog));
}
}

Expand All @@ -305,4 +320,38 @@ public void dropCatalog(CatalogName catalogName, boolean exists)
// Do not shut down the catalog, because there may still be running queries using this catalog.
// Catalog shutdown logic will be added later.
}

record PrunableStateImpl(Map<CatalogHandle, RegistrationToken> prunableCatalogs)
implements PrunableState
{
public PrunableStateImpl
{
prunableCatalogs = ImmutableMap.copyOf(requireNonNull(prunableCatalogs, "prunableCatalogs is null"));
}
}

static class RegistrationToken
{
// identity-based equality
@Override
public boolean equals(Object obj)
{
return super.equals(obj);
}

@Override
public int hashCode()
{
return super.hashCode();
}
}

private record RegisteredCatalog(RegistrationToken registrationToken, CatalogConnector catalog)
{
RegisteredCatalog
{
requireNonNull(registrationToken, "registrationToken is null");
requireNonNull(catalog, "catalog is null");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,23 +190,28 @@ public void ensureCatalogsLoaded(List<CatalogProperties> catalogs)
}

@Override
public void pruneCatalogs(Set<CatalogHandle> catalogsInUse)
public PrunableState getPrunableState()
{
// static catalogs do not need management
return PrunableState.empty();
}

@Override
public Optional<CatalogProperties> getCatalogProperties(CatalogHandle catalogHandle)
public Set<CatalogHandle> getReachableDynamicCatalogs()
{
// static catalog manager does not propagate catalogs between machines
return Optional.empty();
return ImmutableSet.of();
}

@Override
public Set<CatalogHandle> getActiveCatalogs()
public void pruneCatalogs(PrunableState prunableState, Set<CatalogHandle> catalogsInUse)
{
// Static catalog manager does not differentiate between active and not. Nor does it need to prune
return ImmutableSet.of();
// static catalogs do not need management
}

@Override
public Optional<CatalogProperties> getCatalogProperties(CatalogHandle catalogHandle)
{
// static catalog manager does not propagate catalogs between machines
return Optional.empty();
}

@Override
Expand Down
Loading