From c30b34a4b8a789a34237f374df38ad3bcbda024b Mon Sep 17 00:00:00 2001 From: bufapiqi Date: Fri, 12 Apr 2024 17:20:11 +0800 Subject: [PATCH] fix(interactive): Support concurrent compact operation (#3706) --- .../groot/common/config/StoreConfig.java | 3 ++ .../graphscope/groot/store/StoreService.java | 54 +++++++++++++------ 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java index 952b7e2f7880..c3107944ad27 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java @@ -49,4 +49,7 @@ public class StoreConfig { public static final Config STORE_WAL_DIR = Config.stringConfig("store.rocksdb.wal.dir", ""); + + public static final Config STORE_COMPACT_THREAD_NUM = + Config.intConfig("store.compact.thread.num", 1); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java index 10de7d54cbe0..7df33b8ec7dd 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java @@ -63,6 +63,7 @@ public class StoreService implements MetricsAgent { private final Configs storeConfigs; private final int storeId; private final int writeThreadCount; + private final int compactThreadCount; private final MetaService metaService; private Map idToPartition; private ExecutorService writeExecutor; @@ -84,6 +85,7 @@ public StoreService( this.storeId = CommonConfig.NODE_IDX.get(storeConfigs); this.enableGc = StoreConfig.STORE_GC_ENABLE.get(storeConfigs); this.writeThreadCount = StoreConfig.STORE_WRITE_THREAD_COUNT.get(storeConfigs); + this.compactThreadCount = StoreConfig.STORE_COMPACT_THREAD_NUM.get(storeConfigs); this.metaService = metaService; this.isSecondary = CommonConfig.SECONDARY_INSTANCE_ENABLED.get(storeConfigs); metricsCollector.register(this, this::updateMetrics); @@ -121,13 +123,18 @@ public void start() throws IOException { new LinkedBlockingQueue<>(1), ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( "store-ingest", logger)); + int partitionCount = partitionIds.size(); + int compactQueueLength = + partitionCount - this.compactThreadCount <= 0 + ? 1 + : partitionCount - this.compactThreadCount; this.compactExecutor = new ThreadPoolExecutor( 1, - 1, - 0L, - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(1), + this.compactThreadCount, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(compactQueueLength), ThreadFactoryUtils.daemonThreadFactoryWithLogExceptionHandler( "store-compact", logger)); this.garbageCollectExecutor = @@ -407,21 +414,36 @@ public void compactDB(CompletionCallback callback) { callback.onCompleted(null); return; } - this.compactExecutor.execute( - () -> { - logger.info("compact DB"); - try { - for (GraphPartition partition : this.idToPartition.values()) { + int partitionCount = this.idToPartition.values().size(); + CountDownLatch compactCountDownLatch = new CountDownLatch(partitionCount); + AtomicInteger successCompactJobCount = new AtomicInteger(partitionCount); + logger.info("compact DB"); + for (GraphPartition partition : this.idToPartition.values()) { + this.compactExecutor.execute( + () -> { + try { partition.compact(); logger.info("Compaction {} partition finished", partition.getId()); + successCompactJobCount.decrementAndGet(); + } catch (Exception e) { + logger.error("compact DB failed", e); + } finally { + compactCountDownLatch.countDown(); } - logger.info("compact DB finished"); - callback.onCompleted(null); - } catch (Exception e) { - logger.error("compact DB failed", e); - callback.onError(e); - } - }); + }); + } + + try { + compactCountDownLatch.await(); + } catch (InterruptedException e) { + logger.error("compact DB has been InterruptedException", e); + } + + if (successCompactJobCount.get() > 0) { + callback.onError(new Exception("not all partition compact success. please check log.")); + } else { + callback.onCompleted(null); + } } public void tryCatchUpWithPrimary() throws IOException {