From c1369e04fad8a752775c4b34e1424484711da969 Mon Sep 17 00:00:00 2001 From: Youngwb Date: Sun, 19 Mar 2023 22:01:44 +0800 Subject: [PATCH] [Enhancement] Support refresh hms table file meta in background (#19654) * [Enhancement] Support refresh hms table file meta in background Signed-off-by: Youngwb (cherry picked from commit 5170dbce812468fe5c4d7fa6313723f20baf5102) --- .../src/main/java/com/starrocks/common/Config.java | 10 ++++++++-- .../connector/hive/CacheUpdateProcessor.java | 14 +++++++++++--- .../hive/ConnectorTableMetadataProcessor.java | 11 +++++++++-- .../starrocks/connector/hive/HiveConnector.java | 3 ++- .../connector/hive/HiveConnectorInternalMgr.java | 9 +++++++++ .../starrocks/connector/hudi/HudiConnector.java | 3 ++- .../connector/hudi/HudiConnectorInternalMgr.java | 9 +++++++++ .../java/com/starrocks/server/GlobalStateMgr.java | 4 +++- 8 files changed, 53 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index 2a7f5b5d3cbf1..a5103ef4d8a20 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -1594,10 +1594,16 @@ public class Config extends ConfigBase { public static long hive_max_split_size = 64L * 1024L * 1024L; /** - * Enable background refresh all hive external tables all partitions metadata on internal catalog. + * Enable background refresh all external tables all partitions metadata on internal catalog. */ @ConfField - public static boolean enable_background_refresh_hive_metadata = false; + public static boolean enable_background_refresh_connector_metadata = true; + + /** + * Number of threads to refresh remote file's metadata concurrency. + */ + @ConfField + public static int background_refresh_file_metadata_concurrency = 4; /** * Background refresh hive external table metadata interval in milliseconds. diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hive/CacheUpdateProcessor.java b/fe/fe-core/src/main/java/com/starrocks/connector/hive/CacheUpdateProcessor.java index 7eb89e113c837..7dd3ec46e0197 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/hive/CacheUpdateProcessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/hive/CacheUpdateProcessor.java @@ -72,9 +72,11 @@ public void refreshTable(String dbName, Table table, boolean onlyCachedPartition } } - public void refreshTableMetaStoreInfo(String dbName, Table table, boolean onlyCachedPartitions) { + public void refreshTableWithExecutor(Table table, boolean onlyCachedPartitions, ExecutorService executor) { HiveMetaStoreTable hmsTbl = (HiveMetaStoreTable) table; metastore.refreshTable(hmsTbl.getDbName(), hmsTbl.getTableName(), onlyCachedPartitions); + refreshRemoteFiles(hmsTbl.getTableLocation(), Operator.UPDATE, getExistPaths(hmsTbl), onlyCachedPartitions, + executor); } public Set getCachedTableNames() { @@ -142,6 +144,12 @@ private void processSchemaChange(String srDbName, HiveTable hiveTable) { private void refreshRemoteFiles(String tableLocation, Operator operator, List existPaths, boolean onlyCachedPartitions) { + refreshRemoteFiles(tableLocation, operator, existPaths, onlyCachedPartitions, executor); + } + + + private void refreshRemoteFiles(String tableLocation, Operator operator, List existPaths, + boolean onlyCachedPartitions, ExecutorService refreshExecutor) { if (remoteFileIO.isPresent()) { List presentPathKey; if (onlyCachedPartitions) { @@ -155,9 +163,9 @@ private void refreshRemoteFiles(String tableLocation, Operator operator, List { String pathWithSlash = pathKey.getPath().endsWith("/") ? pathKey.getPath() : pathKey.getPath() + "/"; if (operator == Operator.UPDATE && existPaths.contains(pathWithSlash)) { - futures.add(executor.submit(() -> remoteFileIO.get().updateRemoteFiles(pathKey))); + futures.add(refreshExecutor.submit(() -> remoteFileIO.get().updateRemoteFiles(pathKey))); } else { - futures.add(executor.submit(() -> remoteFileIO.get().invalidatePartition(pathKey))); + futures.add(refreshExecutor.submit(() -> remoteFileIO.get().invalidatePartition(pathKey))); } }); diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java b/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java index 2c2a442e95359..9965ac6af1a87 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java @@ -16,6 +16,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.starrocks.catalog.BaseTableInfo; import com.starrocks.catalog.Database; import com.starrocks.catalog.HiveTable; @@ -32,6 +33,8 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.stream.Collectors; public class ConnectorTableMetadataProcessor extends LeaderDaemon { @@ -41,6 +44,8 @@ public class ConnectorTableMetadataProcessor extends LeaderDaemon { private final Map cacheUpdateProcessors = new ConcurrentHashMap<>(); + private final ExecutorService refreshRemoteFileExecutor; + public void registerTableInfo(BaseTableInfo tableInfo) { registeredTableInfos.add(tableInfo); } @@ -57,11 +62,13 @@ public void unRegisterCacheUpdateProcessor(String catalogName) { public ConnectorTableMetadataProcessor() { super(ConnectorTableMetadataProcessor.class.getName(), Config.background_refresh_metadata_interval_millis); + refreshRemoteFileExecutor = Executors.newFixedThreadPool(Config.background_refresh_file_metadata_concurrency, + new ThreadFactoryBuilder().setNameFormat("background-refresh-remote-files-%d").build()); } @Override protected void runAfterCatalogReady() { - if (!Config.enable_hms_events_incremental_sync && Config.enable_background_refresh_hive_metadata) { + if (!Config.enable_hms_events_incremental_sync) { refreshResourceHiveTable(); } @@ -94,7 +101,7 @@ private void refreshCatalogTable() { continue; } try { - updateProcessor.refreshTableMetaStoreInfo(dbName, table, true); + updateProcessor.refreshTableWithExecutor(table, true, refreshRemoteFileExecutor); } catch (Exception e) { LOG.warn("refresh {}.{}.{} meta store info failed, msg : {}", catalogName, dbName, tableName, e); diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnector.java b/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnector.java index 968d55d96dde9..f15e95ef4e9e0 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnector.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnector.java @@ -82,7 +82,8 @@ public void onCreate() { updateProcessor.ifPresent(processor -> GlobalStateMgr.getCurrentState().getMetastoreEventsProcessor() .registerCacheUpdateProcessor(catalogName, updateProcessor.get())); } else { - if (!CatalogMgr.ResourceMappingCatalog.isResourceMappingCatalog(catalogName)) { + if (!CatalogMgr.ResourceMappingCatalog.isResourceMappingCatalog(catalogName) && + internalMgr.isEnableBackgroundRefreshHiveMetadata()) { updateProcessor .ifPresent(processor -> GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor() .registerCacheUpdateProcessor(catalogName, updateProcessor.get())); diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnectorInternalMgr.java b/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnectorInternalMgr.java index 3c26b534bcabf..273c86f17c99a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnectorInternalMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnectorInternalMgr.java @@ -34,6 +34,8 @@ public class HiveConnectorInternalMgr { private final int loadRemoteFileMetadataThreadNum; private final boolean enableHmsEventsIncrementalSync; + private final boolean enableBackgroundRefreshHiveMetadata; + public HiveConnectorInternalMgr(String catalogName, Map properties, HdfsEnvironment hdfsEnvironment) { this.catalogName = catalogName; this.properties = properties; @@ -49,6 +51,9 @@ public HiveConnectorInternalMgr(String catalogName, Map properti String.valueOf(Config.remote_file_metadata_load_concurrency))); this.enableHmsEventsIncrementalSync = Boolean.parseBoolean(properties.getOrDefault("enable_hms_events_incremental_sync", String.valueOf(Config.enable_hms_events_incremental_sync))); + + this.enableBackgroundRefreshHiveMetadata = Boolean.parseBoolean(properties.getOrDefault( + "enable_background_refresh_connector_metadata", "true")); } public void shutdown() { @@ -135,4 +140,8 @@ public boolean enableHmsEventsIncrementalSync() { public HdfsEnvironment getHdfsEnvironment() { return hdfsEnvironment; } + + public boolean isEnableBackgroundRefreshHiveMetadata() { + return enableBackgroundRefreshHiveMetadata; + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnector.java b/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnector.java index e0552c4321fcb..66e8b0d37a38d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnector.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnector.java @@ -64,7 +64,8 @@ private HudiMetadataFactory createMetadataFactory() { } public void onCreate() { - if (!CatalogMgr.ResourceMappingCatalog.isResourceMappingCatalog(catalogName)) { + if (!CatalogMgr.ResourceMappingCatalog.isResourceMappingCatalog(catalogName) && + internalMgr.isEnableBackgroundRefreshHudiMetadata()) { Optional updateProcessor = metadataFactory.getCacheUpdateProcessor(); updateProcessor.ifPresent(processor -> GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor() .registerCacheUpdateProcessor(catalogName, updateProcessor.get())); diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnectorInternalMgr.java b/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnectorInternalMgr.java index 73480c64d6843..4f6abb3c86715 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnectorInternalMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnectorInternalMgr.java @@ -36,6 +36,8 @@ public class HudiConnectorInternalMgr { private final boolean isRecursive; private final int loadRemoteFileMetadataThreadNum; + private final boolean enableBackgroundRefreshHudiMetadata; + public HudiConnectorInternalMgr(String catalogName, Map properties, HdfsEnvironment hdfsEnvironment) { this.catalogName = catalogName; this.properties = properties; @@ -49,6 +51,9 @@ public HudiConnectorInternalMgr(String catalogName, Map properti this.isRecursive = Boolean.parseBoolean(properties.getOrDefault("enable_recursive_listing", "false")); this.loadRemoteFileMetadataThreadNum = Integer.parseInt(properties.getOrDefault("remote_file_load_thread_num", String.valueOf(Config.remote_file_metadata_load_concurrency))); + + this.enableBackgroundRefreshHudiMetadata = Boolean.parseBoolean(properties.getOrDefault( + "enable_background_refresh_connector_metadata", "true")); } public void shutdown() { @@ -126,4 +131,8 @@ public CachingHiveMetastoreConf getHiveMetastoreConf() { public CachingRemoteFileConf getRemoteFileConf() { return remoteFileConf; } + + public boolean isEnableBackgroundRefreshHudiMetadata() { + return enableBackgroundRefreshHudiMetadata; + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java index 53f5d6f91eabb..ce8732633e348 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java @@ -1207,7 +1207,9 @@ private void startNonLeaderDaemonThreads() { metastoreEventsProcessor.start(); } - connectorTableMetadataProcessor.start(); + if (Config.enable_background_refresh_connector_metadata) { + connectorTableMetadataProcessor.start(); + } // domain resolver domainResolver.start();