Skip to content

Commit

Permalink
[Enhancement] Support refresh hms table file meta in background (#19654)
Browse files Browse the repository at this point in the history
* [Enhancement] Support refresh hms table file meta in background

Signed-off-by: Youngwb <[email protected]>
(cherry picked from commit 5170dbc)
  • Loading branch information
Youngwb authored and wanpengfei-git committed Mar 20, 2023
1 parent 68c89e2 commit c1369e0
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 10 deletions.
10 changes: 8 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1594,10 +1594,16 @@ public class Config extends ConfigBase {
public static long hive_max_split_size = 64L * 1024L * 1024L;

/**
* Enable background refresh all hive external tables all partitions metadata on internal catalog.
* Enable background refresh of all partitions' metadata for all external tables on internal catalog.
*/
@ConfField
public static boolean enable_background_refresh_hive_metadata = false;
public static boolean enable_background_refresh_connector_metadata = true;

/**
* Number of threads used to refresh remote files' metadata concurrently in the background.
*/
@ConfField
public static int background_refresh_file_metadata_concurrency = 4;

/**
* Background refresh hive external table metadata interval in milliseconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ public void refreshTable(String dbName, Table table, boolean onlyCachedPartition
}
}

public void refreshTableMetaStoreInfo(String dbName, Table table, boolean onlyCachedPartitions) {
public void refreshTableWithExecutor(Table table, boolean onlyCachedPartitions, ExecutorService executor) {
HiveMetaStoreTable hmsTbl = (HiveMetaStoreTable) table;
metastore.refreshTable(hmsTbl.getDbName(), hmsTbl.getTableName(), onlyCachedPartitions);
refreshRemoteFiles(hmsTbl.getTableLocation(), Operator.UPDATE, getExistPaths(hmsTbl), onlyCachedPartitions,
executor);
}

public Set<HiveTableName> getCachedTableNames() {
Expand Down Expand Up @@ -142,6 +144,12 @@ private void processSchemaChange(String srDbName, HiveTable hiveTable) {

/**
 * Refreshes remote file metadata using this instance's own {@code executor}.
 *
 * <p>Convenience overload: every argument is forwarded unchanged to the variant that accepts an
 * explicit {@link ExecutorService}.
 */
private void refreshRemoteFiles(String tableLocation, Operator operator, List<String> existPaths,
                                boolean onlyCachedPartitions) {
    this.refreshRemoteFiles(tableLocation, operator, existPaths, onlyCachedPartitions, this.executor);
}


private void refreshRemoteFiles(String tableLocation, Operator operator, List<String> existPaths,
boolean onlyCachedPartitions, ExecutorService refreshExecutor) {
if (remoteFileIO.isPresent()) {
List<RemotePathKey> presentPathKey;
if (onlyCachedPartitions) {
Expand All @@ -155,9 +163,9 @@ private void refreshRemoteFiles(String tableLocation, Operator operator, List<St
presentPathKey.forEach(pathKey -> {
String pathWithSlash = pathKey.getPath().endsWith("/") ? pathKey.getPath() : pathKey.getPath() + "/";
if (operator == Operator.UPDATE && existPaths.contains(pathWithSlash)) {
futures.add(executor.submit(() -> remoteFileIO.get().updateRemoteFiles(pathKey)));
futures.add(refreshExecutor.submit(() -> remoteFileIO.get().updateRemoteFiles(pathKey)));
} else {
futures.add(executor.submit(() -> remoteFileIO.get().invalidatePartition(pathKey)));
futures.add(refreshExecutor.submit(() -> remoteFileIO.get().invalidatePartition(pathKey)));
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.starrocks.catalog.BaseTableInfo;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.HiveTable;
Expand All @@ -32,6 +33,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ConnectorTableMetadataProcessor extends LeaderDaemon {
Expand All @@ -41,6 +44,8 @@ public class ConnectorTableMetadataProcessor extends LeaderDaemon {

private final Map<String, CacheUpdateProcessor> cacheUpdateProcessors = new ConcurrentHashMap<>();

private final ExecutorService refreshRemoteFileExecutor;

/**
 * Records {@code tableInfo} in this processor's collection of registered table infos.
 *
 * @param tableInfo the table descriptor to register
 */
public void registerTableInfo(BaseTableInfo tableInfo) {
    this.registeredTableInfos.add(tableInfo);
}
Expand All @@ -57,11 +62,13 @@ public void unRegisterCacheUpdateProcessor(String catalogName) {

public ConnectorTableMetadataProcessor() {
super(ConnectorTableMetadataProcessor.class.getName(), Config.background_refresh_metadata_interval_millis);
refreshRemoteFileExecutor = Executors.newFixedThreadPool(Config.background_refresh_file_metadata_concurrency,
new ThreadFactoryBuilder().setNameFormat("background-refresh-remote-files-%d").build());
}

@Override
protected void runAfterCatalogReady() {
if (!Config.enable_hms_events_incremental_sync && Config.enable_background_refresh_hive_metadata) {
if (!Config.enable_hms_events_incremental_sync) {
refreshResourceHiveTable();
}

Expand Down Expand Up @@ -94,7 +101,7 @@ private void refreshCatalogTable() {
continue;
}
try {
updateProcessor.refreshTableMetaStoreInfo(dbName, table, true);
updateProcessor.refreshTableWithExecutor(table, true, refreshRemoteFileExecutor);
} catch (Exception e) {
LOG.warn("refresh {}.{}.{} meta store info failed, msg : {}", catalogName, dbName,
tableName, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public void onCreate() {
updateProcessor.ifPresent(processor -> GlobalStateMgr.getCurrentState().getMetastoreEventsProcessor()
.registerCacheUpdateProcessor(catalogName, updateProcessor.get()));
} else {
if (!CatalogMgr.ResourceMappingCatalog.isResourceMappingCatalog(catalogName)) {
if (!CatalogMgr.ResourceMappingCatalog.isResourceMappingCatalog(catalogName) &&
internalMgr.isEnableBackgroundRefreshHiveMetadata()) {
updateProcessor
.ifPresent(processor -> GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor()
.registerCacheUpdateProcessor(catalogName, updateProcessor.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class HiveConnectorInternalMgr {
private final int loadRemoteFileMetadataThreadNum;
private final boolean enableHmsEventsIncrementalSync;

private final boolean enableBackgroundRefreshHiveMetadata;

public HiveConnectorInternalMgr(String catalogName, Map<String, String> properties, HdfsEnvironment hdfsEnvironment) {
this.catalogName = catalogName;
this.properties = properties;
Expand All @@ -49,6 +51,9 @@ public HiveConnectorInternalMgr(String catalogName, Map<String, String> properti
String.valueOf(Config.remote_file_metadata_load_concurrency)));
this.enableHmsEventsIncrementalSync = Boolean.parseBoolean(properties.getOrDefault("enable_hms_events_incremental_sync",
String.valueOf(Config.enable_hms_events_incremental_sync)));

this.enableBackgroundRefreshHiveMetadata = Boolean.parseBoolean(properties.getOrDefault(
"enable_background_refresh_connector_metadata", "true"));
}

public void shutdown() {
Expand Down Expand Up @@ -135,4 +140,8 @@ public boolean enableHmsEventsIncrementalSync() {
/**
 * @return the {@link HdfsEnvironment} held by this manager
 */
public HdfsEnvironment getHdfsEnvironment() {
    return this.hdfsEnvironment;
}

/**
 * Reports whether background metadata refresh is enabled for this catalog.
 *
 * <p>The flag is parsed in the constructor from the catalog property
 * {@code enable_background_refresh_connector_metadata}, which defaults to {@code "true"}.
 *
 * @return {@code true} if background refresh is enabled
 */
public boolean isEnableBackgroundRefreshHiveMetadata() {
    return this.enableBackgroundRefreshHiveMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ private HudiMetadataFactory createMetadataFactory() {
}

public void onCreate() {
if (!CatalogMgr.ResourceMappingCatalog.isResourceMappingCatalog(catalogName)) {
if (!CatalogMgr.ResourceMappingCatalog.isResourceMappingCatalog(catalogName) &&
internalMgr.isEnableBackgroundRefreshHudiMetadata()) {
Optional<CacheUpdateProcessor> updateProcessor = metadataFactory.getCacheUpdateProcessor();
updateProcessor.ifPresent(processor -> GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor()
.registerCacheUpdateProcessor(catalogName, updateProcessor.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class HudiConnectorInternalMgr {
private final boolean isRecursive;
private final int loadRemoteFileMetadataThreadNum;

private final boolean enableBackgroundRefreshHudiMetadata;

public HudiConnectorInternalMgr(String catalogName, Map<String, String> properties, HdfsEnvironment hdfsEnvironment) {
this.catalogName = catalogName;
this.properties = properties;
Expand All @@ -49,6 +51,9 @@ public HudiConnectorInternalMgr(String catalogName, Map<String, String> properti
this.isRecursive = Boolean.parseBoolean(properties.getOrDefault("enable_recursive_listing", "false"));
this.loadRemoteFileMetadataThreadNum = Integer.parseInt(properties.getOrDefault("remote_file_load_thread_num",
String.valueOf(Config.remote_file_metadata_load_concurrency)));

this.enableBackgroundRefreshHudiMetadata = Boolean.parseBoolean(properties.getOrDefault(
"enable_background_refresh_connector_metadata", "true"));
}

public void shutdown() {
Expand Down Expand Up @@ -126,4 +131,8 @@ public CachingHiveMetastoreConf getHiveMetastoreConf() {
/**
 * @return the {@link CachingRemoteFileConf} held by this manager
 */
public CachingRemoteFileConf getRemoteFileConf() {
    return this.remoteFileConf;
}

/**
 * Reports whether background metadata refresh is enabled for this catalog.
 *
 * <p>The flag is parsed in the constructor from the catalog property
 * {@code enable_background_refresh_connector_metadata}, which defaults to {@code "true"}.
 *
 * @return {@code true} if background refresh is enabled
 */
public boolean isEnableBackgroundRefreshHudiMetadata() {
    return this.enableBackgroundRefreshHudiMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1207,7 +1207,9 @@ private void startNonLeaderDaemonThreads() {
metastoreEventsProcessor.start();
}

connectorTableMetadataProcessor.start();
if (Config.enable_background_refresh_connector_metadata) {
connectorTableMetadataProcessor.start();
}

// domain resolver
domainResolver.start();
Expand Down

0 comments on commit c1369e0

Please sign in to comment.