Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.catalog.CatalogProperties;
import io.trino.spi.catalog.CatalogStore;
import io.trino.spi.catalog.CatalogStore.StoredCatalog;
import io.trino.spi.connector.CatalogVersion;
import io.trino.spi.connector.ConnectorName;
import jakarta.annotation.PreDestroy;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -93,6 +95,7 @@ public CoordinatorDynamicCatalogManager(CatalogStore catalogStore, CatalogFactor
this.catalogStore = requireNonNull(catalogStore, "catalogStore is null");
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
this.executor = requireNonNull(executor, "executor is null");
catalogStore.setCatalogManager(this);
}

@PreDestroy
Expand Down Expand Up @@ -329,4 +332,70 @@ public void dropCatalog(CatalogName catalogName, boolean exists)
// Do not shut down the catalog, because there may still be running queries using this catalog.
// Catalog shutdown logic will be added later.
}

/**
* Refresh catalogs from the configured catalog store.
* This method is called by catalog stores when they detect external changes.
*/
@Override
public void refreshCatalogsFromStore()
{
catalogsUpdateLock.lock();
try {
if (state != State.INITIALIZED) {
return;
}

// Get desired state from the catalog store
Collection<StoredCatalog> desiredCatalogs = catalogStore.getCatalogs();
Set<CatalogName> desiredNames = desiredCatalogs.stream()
.map(StoredCatalog::name)
.collect(toImmutableSet());

Set<CatalogName> currentNames = ImmutableSet.copyOf(activeCatalogs.keySet());

// Remove catalogs that should no longer exist
for (CatalogName catalogName : currentNames) {
if (!desiredNames.contains(catalogName)) {
activeCatalogs.remove(catalogName);
// Note: CatalogPruneTask will handle actual connector shutdown
}
}

// Add or update catalogs that should exist
for (StoredCatalog desiredCatalog : desiredCatalogs) {
CatalogName desiredCatalogName = desiredCatalog.name();
try {
CatalogProperties catalogProperties = desiredCatalog.loadProperties();

// Check if catalog exists and if version has changed
Catalog existingCatalog = activeCatalogs.get(desiredCatalogName);
boolean needsReload = existingCatalog == null ||
!existingCatalog.getCatalogHandle().getVersion().equals(catalogProperties.version());

if (needsReload) {
log.debug("Loading/reloading catalog during refresh: %s", desiredCatalogName);

// Remove old version if it exists
if (existingCatalog != null) {
activeCatalogs.remove(desiredCatalogName);
}

// Create new catalog
CatalogConnector newCatalog = catalogFactory.createCatalog(catalogProperties);
activeCatalogs.put(desiredCatalogName, newCatalog.getCatalog());
allCatalogs.put(newCatalog.getCatalogHandle(), newCatalog);
}
}
catch (Throwable e) {
log.error(e, "Failed to load catalog %s during refresh", desiredCatalogName);
}
}

log.debug("Catalog refresh completed. Active catalogs: %s", activeCatalogs.keySet());
}
finally {
catalogsUpdateLock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,6 @@ public void dropCatalog(CatalogName catalogName, boolean exists)
void createCatalog(CatalogName catalogName, ConnectorName connectorName, Map<String, String> properties, boolean notExists);

void dropCatalog(CatalogName catalogName, boolean exists);

default void refreshCatalogsFromStore(){}
}
Loading
Loading