Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -633,9 +633,9 @@ public static List<Partition> convertNameToMetastorePartition(org.apache.hadoop.
return partitions;
}

public static TableFetcher getTableFetcher(IMetaStoreClient msc, String catalogName, String dbPattern,
public static TableFetcher getTableFetcher(IMetaStoreClient msc, String catalogPattern, String dbPattern,
String tablePattern) {
return new TableFetcher.Builder(msc, catalogName, dbPattern, tablePattern).tableTypes(
return new TableFetcher.Builder(msc, catalogPattern, dbPattern, tablePattern).tableTypes(
"EXTERNAL_TABLE")
.tableCondition(
hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "table_type like \"ICEBERG\" ")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompac
private Iterable<Table> getTables(Set<String> skipDBs, Set<String> skipTables) {
try {
int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
return IcebergTableUtil.getTableFetcher(client, null, "*", null).getTables(skipDBs, skipTables, maxBatchSize);
return IcebergTableUtil.getTableFetcher(client, null, "*", null)
.getTables(null, skipDBs, skipTables, maxBatchSize);
} catch (Exception e) {
throw new RuntimeMetaException(e, "Error getting tables");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,24 @@ public long runFrequency(TimeUnit unit) {
public void run() {
LOG.debug("Running IcebergHouseKeeperService...");

String catalogName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_CATALOG_NAME);
String catalogPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_CATALOG_PATTERN);
String dbPattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_DATABASE_PATTERN);
String tablePattern = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_TABLE_PATTERN);

TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex();

try (AutoCloseable closeable = mutex.acquireLock(TxnStore.MUTEX_KEY.IcebergHouseKeeper.name())) {
expireTables(catalogName, dbPattern, tablePattern);
expireTables(catalogPattern, dbPattern, tablePattern);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void expireTables(String catalogName, String dbPattern, String tablePattern) {
private void expireTables(String catalogPattern, String dbPattern, String tablePattern) {
try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) {
int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
Iterable<org.apache.hadoop.hive.metastore.api.Table> tables =
IcebergTableUtil.getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables(maxBatchSize);
IcebergTableUtil.getTableFetcher(msc, catalogPattern, dbPattern, tablePattern).getTables(maxBatchSize);
// TODO : HIVE-29163 - Create client with cache in metastore package and then use it in TableFetcher
// and HiveTableOperations to reduce the number of msc calls and fetch it from cache
for (org.apache.hadoop.hive.metastore.api.Table table : tables) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static void afterClass() {
public void testIcebergTableFetched() throws Exception {
createIcebergTable("iceberg_table");

TableFetcher tableFetcher = IcebergTableUtil.getTableFetcher(db.getMSC(), null, "default", "*");
TableFetcher tableFetcher = IcebergTableUtil.getTableFetcher(db.getMSC(), "*", "default", "*");

int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
Iterator<org.apache.hadoop.hive.metastore.api.Table> tables = tableFetcher.getTables(maxBatchSize).iterator();
Expand All @@ -82,7 +82,7 @@ public void testIcebergTableFetched() throws Exception {
public void testExpireSnapshotsByServiceRun() throws Exception {
String tableName = "iceberg_table_snapshot_expiry_e2e_test";
createIcebergTable(tableName);
IcebergHouseKeeperService service = getServiceForTable("default", tableName);
IcebergHouseKeeperService service = getServiceForTable("hive", "default", tableName);

GetTableRequest request = new GetTableRequest("default", tableName);
org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, db.getMSC().getTable(request));
Expand Down Expand Up @@ -128,9 +128,10 @@ private void createIcebergTable(String name) throws Exception {
* @param tableName to be cleaned up
* @return IcebergHouseKeeperService
*/
private IcebergHouseKeeperService getServiceForTable(String dbName, String tableName) {
private IcebergHouseKeeperService getServiceForTable(String catalogName, String dbName, String tableName) {
IcebergHouseKeeperService service = new IcebergHouseKeeperService();
HiveConf serviceConf = new HiveConf(conf);
serviceConf.set("hive.metastore.iceberg.table.expiry.catalog.pattern", catalogName);
serviceConf.set("hive.metastore.iceberg.table.expiry.database.pattern", dbName);
serviceConf.set("hive.metastore.iceberg.table.expiry.table.pattern", tableName);
service.setConf(serviceConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,15 @@ void alterCatalog(String catalogName, Catalog newCatalog)
*/
List<String> getCatalogs() throws MetaException, TException;

/**
 * Get the names of the catalogs whose name matches the given pattern.
 * @param catalogPattern pattern for the catalog name to match.
 *     NOTE(review): the no-argument variant in the base client passes {@code null} here, so
 *     {@code null} presumably means "no filtering" - confirm against the server side.
 * @return list of catalog names.
 *     NOTE(review): the Thrift-backed client returns {@code null} (not an empty list) when the
 *     response carries no names - confirm whether callers must null-check before iterating.
 * @throws MetaException something went wrong, usually in the database.
 * @throws TException general thrift exception.
 */
List<String> getCatalogs(String catalogPattern) throws MetaException, TException;

/**
* Drop a catalog. Catalogs must be empty to be dropped, there is no cascade for dropping a
* catalog.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public final void dropCatalog(String catName)
dropCatalog(catName, true);
}

/**
 * Lists catalog names without supplying a pattern: delegates to
 * {@link #getCatalogs(String)} with a {@code null} pattern argument.
 *
 * @return whatever the pattern-based overload returns for a {@code null} pattern
 * @throws MetaException propagated from the pattern-based overload
 * @throws TException propagated from the pattern-based overload
 */
@Override
public final List<String> getCatalogs() throws MetaException, TException {
  // Naming the argument makes it explicit that no catalog pattern is being passed.
  final String noPattern = null;
  return getCatalogs(noPattern);
}

@Override
public final List<String> getDatabases(String databasePattern) throws MetaException, TException {
return getDatabases(getDefaultCatalog(conf), databasePattern);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ public Catalog getCatalog(String catName) throws NoSuchObjectException, MetaExce
}

@Override
public List<String> getCatalogs() throws MetaException, TException {
return delegate.getCatalogs();
public List<String> getCatalogs(String catalogPattern) throws MetaException, TException {
return delegate.getCatalogs(catalogPattern);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -966,8 +966,8 @@ public Catalog getCatalog(String catName) throws TException {
}

@Override
public List<String> getCatalogs() throws TException {
GetCatalogsResponse rsp = client.get_catalogs();
public List<String> getCatalogs(String catalogPattern) throws TException {
GetCatalogsResponse rsp = client.get_catalogs(new GetCatalogRequest(catalogPattern));
if (rsp == null || rsp.getNames() == null) {
return null;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableIterable;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
Expand All @@ -42,8 +41,8 @@ public class TableFetcher {

// mandatory client passed to this fetcher, has to be closed from caller
private final IMetaStoreClient client;
// mandatory catalogName
private final String catalogName;
// mandatory catalogPattern: use "*" to fetch all, empty to fetch none
private final String catalogPattern;
// mandatory dbPattern: use "*" to fetch all, empty to fetch none
private final String dbPattern;
// optional tableTypes: comma separated table types to fetch, fetcher result is empty list if this is empty
Expand All @@ -56,10 +55,7 @@ public class TableFetcher {

private TableFetcher(Builder builder) {
this.client = builder.client;
if ("*".equalsIgnoreCase(builder.catalogName)) {
LOG.warn("Invalid wildcard '*' parameter for catalogName, exact catalog name is expected instead of regexp");
}
this.catalogName = Optional.ofNullable(builder.catalogName).orElse(Warehouse.DEFAULT_CATALOG_NAME);
this.catalogPattern = Optional.ofNullable(builder.catalogPattern).orElse("");
this.dbPattern = Optional.ofNullable(builder.dbPattern).orElse("");
String tablePattern = Optional.ofNullable(builder.tablePattern).orElse("");
String stringTableTypes = Optional.ofNullable(builder.tableTypes).orElse("");
Expand Down Expand Up @@ -103,42 +99,57 @@ public List<TableName> getTableNames() throws Exception {
return candidates;
}

List<String> databases = client.getDatabases(catalogName, dbPattern);
List<String> catalogs = client.getCatalogs(catalogPattern);

for (String db : databases) {
List<String> tablesNames = getTableNamesForDatabase(catalogName, db);
tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db)));
for (String catalogName : catalogs) {
List<String> databases = client.getDatabases(catalogName, dbPattern);
for (String db : databases) {
List<String> tablesNames = getTableNamesForDatabase(catalogName, db);
tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db)));
}
}
return candidates;
}

public Iterable<Table> getTables(Set<String> skipDBs, Set<String> skipTables, int maxBatchSize) throws Exception {
public Iterable<Table> getTables(Set<String> skipCatalogs, Set<String> skipDBs, Set<String> skipTables, int maxBatchSize) throws Exception {
// if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
if (tableTypes.isEmpty()) {
LOG.info("Table fetcher returns empty list as no table types specified");
return Collections.emptyList();
}

List<String> databases = client.getDatabases(catalogName, dbPattern).stream()
.filter(dbName -> skipDBs == null || !skipDBs.contains(dbName))
List<String> catalogs = client.getCatalogs(catalogPattern).stream()
.filter(catalogName -> skipCatalogs == null || !skipCatalogs.contains(catalogName))
.toList();

return () -> Iterators.concat(
Iterators.transform(databases.iterator(), db -> {
Iterators.transform(catalogs.iterator(), catalogName -> {
try {
List<String> tableNames = getTableNamesForDatabase(catalogName, db).stream()
.filter(tableName -> skipTables == null || !skipTables.contains(TableName.getDbTable(db, tableName)))
List<String> databases = client.getDatabases(catalogName, dbPattern).stream()
.filter(dbName -> skipDBs == null || !skipDBs.contains(dbName))
.toList();
return new TableIterable(client, db, tableNames, maxBatchSize).iterator();
return Iterators.concat(
Iterators.transform(databases.iterator(), dbName -> {
try {
List<String> tableNames = getTableNamesForDatabase(catalogName, dbName).stream()
.filter(tableName -> skipTables == null ||
!skipTables.contains(TableName.getDbTable(dbName, tableName)))
.toList();
return new TableIterable(client, dbName, tableNames, maxBatchSize).iterator();
} catch (Exception e) {
throw new RuntimeException("Failed to fetch tables for db: " + dbName, e);
}
})
);
} catch (Exception e) {
throw new RuntimeException("Failed to fetch tables for db: " + db, e);
throw new RuntimeException("Failed to fetch database for catalog: " + catalogName, e);
}
})
);
}

public Iterable<Table> getTables(int maxBatchSize) throws Exception {
return getTables(null, null, maxBatchSize);
return getTables(null, null, null, maxBatchSize);
}

private List<String> getTableNamesForDatabase(String catalogName, String dbName) throws Exception {
Expand All @@ -158,15 +169,15 @@ private List<String> getTableNamesForDatabase(String catalogName, String dbName)

public static class Builder {
private final IMetaStoreClient client;
private final String catalogName;
private final String catalogPattern;
private final String dbPattern;
private final String tablePattern;
private final List<String> tableConditions = new ArrayList<>();
private String tableTypes;

public Builder(IMetaStoreClient client, String catalogName, String dbPattern, String tablePattern) {
public Builder(IMetaStoreClient client, String catalogPattern, String dbPattern, String tablePattern) {
this.client = client;
this.catalogName = catalogName;
this.catalogPattern = catalogPattern;
this.dbPattern = dbPattern;
this.tablePattern = tablePattern;
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading