From 51766689d5a5b72f24f966afec0f2d107e21c380 Mon Sep 17 00:00:00 2001 From: superqtqt Date: Fri, 6 Aug 2021 17:00:35 +0800 Subject: [PATCH] Fix multithread writing for fragment result cache --- .../FileFragmentResultCacheConfig.java | 16 +++++++++ .../FileFragmentResultCacheManager.java | 21 +++++++----- .../TestFileFragmentResultCacheConfig.java | 8 +++-- .../TestFileFragmentResultCacheManager.java | 33 +++++++++++++++++++ 4 files changed, 67 insertions(+), 11 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/operator/FileFragmentResultCacheConfig.java b/presto-main/src/main/java/com/facebook/presto/operator/FileFragmentResultCacheConfig.java index 01e69a8cd5d82..c84207408d032 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/FileFragmentResultCacheConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/FileFragmentResultCacheConfig.java @@ -25,6 +25,7 @@ import java.net.URI; import static io.airlift.units.DataSize.Unit.GIGABYTE; +import static io.airlift.units.DataSize.Unit.MEGABYTE; import static java.util.concurrent.TimeUnit.DAYS; public class FileFragmentResultCacheConfig @@ -36,6 +37,7 @@ public class FileFragmentResultCacheConfig private int maxCachedEntries = 10_000; private Duration cacheTtl = new Duration(2, DAYS); private DataSize maxInFlightSize = new DataSize(1, GIGABYTE); + private DataSize maxSinglePagesSize = new DataSize(500, MEGABYTE); public boolean isCachingEnabled() { @@ -117,4 +119,18 @@ public FileFragmentResultCacheConfig setMaxInFlightSize(DataSize maxInFlightSize this.maxInFlightSize = maxInFlightSize; return this; } + + @MinDataSize("0B") + public DataSize getMaxSinglePagesSize() + { + return maxSinglePagesSize; + } + + @Config("fragment-result-cache.max-single-pages-size") + @ConfigDescription("Maximum size of pages write to flushed") + public FileFragmentResultCacheConfig setMaxSinglePagesSize(DataSize maxSinglePagesSize) + { + this.maxSinglePagesSize = maxSinglePagesSize; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/FileFragmentResultCacheManager.java b/presto-main/src/main/java/com/facebook/presto/operator/FileFragmentResultCacheManager.java index 3795a6b4ba5a0..e58269f9338f0 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/FileFragmentResultCacheManager.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/FileFragmentResultCacheManager.java @@ -20,7 +20,6 @@ import com.facebook.presto.metadata.Split; import com.facebook.presto.metadata.Split.SplitIdentifier; import com.facebook.presto.spi.PrestoException; -import com.facebook.presto.spi.page.PagesSerde; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; @@ -67,7 +66,8 @@ public class FileFragmentResultCacheManager private final Path baseDirectory; private final long maxInFlightBytes; - private final PagesSerde pagesSerde; + private final long maxSinglePagesBytes; + private final PagesSerdeFactory pagesSerdeFactory; private final FragmentCacheStats fragmentCacheStats; private final ExecutorService flushExecutor; private final ExecutorService removalExecutor; @@ -88,7 +88,9 @@ public FileFragmentResultCacheManager( this.baseDirectory = Paths.get(cacheConfig.getBaseDirectory()); this.maxInFlightBytes = cacheConfig.getMaxInFlightSize().toBytes(); - this.pagesSerde = new PagesSerdeFactory(blockEncodingSerde, cacheConfig.isBlockEncodingCompressionEnabled()).createPagesSerde(); + this.maxSinglePagesBytes = cacheConfig.getMaxSinglePagesSize().toBytes(); + // pagesSerde is not thread safe + this.pagesSerdeFactory = new PagesSerdeFactory(blockEncodingSerde, cacheConfig.isBlockEncodingCompressionEnabled()); this.fragmentCacheStats = requireNonNull(fragmentCacheStats, "fragmentCacheStats is null"); this.flushExecutor = requireNonNull(flushExecutor, "flushExecutor is null"); this.removalExecutor = requireNonNull(removalExecutor, "removalExecutor is null"); @@ -130,13 +132,13 @@ public Future put(String serializedPlan, Split split, List result) { CacheKey key = new CacheKey(serializedPlan, split.getSplitIdentifier()); long resultSize = getPagesSize(result); - if (fragmentCacheStats.getInFlightBytes() + resultSize > maxInFlightBytes || cache.getIfPresent(key) != null) { + if (fragmentCacheStats.getInFlightBytes() + resultSize > maxInFlightBytes || cache.getIfPresent(key) != null || resultSize > maxSinglePagesBytes) { return immediateFuture(null); } fragmentCacheStats.addInFlightBytes(resultSize); Path path = baseDirectory.resolve(randomUUID().toString().replaceAll("-", "_")); - return flushExecutor.submit(() -> cachePages(key, path, result)); + return flushExecutor.submit(() -> cachePages(key, path, result, resultSize)); } private static long getPagesSize(List pages) @@ -146,13 +148,13 @@ private static long getPagesSize(List pages) .sum(); } - private void cachePages(CacheKey key, Path path, List pages) + private void cachePages(CacheKey key, Path path, List pages, long resultSize) { // TODO: To support both memory and disk limit, we should check cache size before putting to cache and use written bytes as weight for cache try { Files.createFile(path); try (SliceOutput output = new OutputStreamSliceOutput(newOutputStream(path, APPEND))) { - writePages(pagesSerde, output, pages.iterator()); + writePages(pagesSerdeFactory.createPagesSerde(), output, pages.iterator()); cache.put(key, path); fragmentCacheStats.incrementCacheEntries(); } @@ -166,7 +168,7 @@ private void cachePages(CacheKey key, Path path, List pages) tryDeleteFile(path); } finally { - fragmentCacheStats.addInFlightBytes(-getPagesSize(pages)); + fragmentCacheStats.addInFlightBytes(-resultSize); } } @@ -195,11 +197,12 @@ public Optional> get(String serializedPlan, Split split) try { InputStream inputStream = newInputStream(path); - Iterator result = readPages(pagesSerde, new InputStreamSliceInput(inputStream)); + Iterator result = readPages(pagesSerdeFactory.createPagesSerde(), new InputStreamSliceInput(inputStream)); fragmentCacheStats.incrementCacheHit(); return Optional.of(closeWhenExhausted(result, inputStream)); } catch (UncheckedIOException | IOException e) { + log.error(e, "read path %s error", path); // there might be a chance the file has been deleted. We would return cache miss in this case. fragmentCacheStats.incrementCacheMiss(); return Optional.empty(); diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestFileFragmentResultCacheConfig.java b/presto-main/src/test/java/com/facebook/presto/operator/TestFileFragmentResultCacheConfig.java index 20117d9cc2218..fe51e902e3069 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestFileFragmentResultCacheConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestFileFragmentResultCacheConfig.java @@ -25,6 +25,7 @@ import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults; import static io.airlift.units.DataSize.Unit.GIGABYTE; +import static io.airlift.units.DataSize.Unit.MEGABYTE; import static java.util.concurrent.TimeUnit.DAYS; public class TestFileFragmentResultCacheConfig @@ -38,7 +39,8 @@ public void testDefaults() .setBlockEncodingCompressionEnabled(false) .setMaxCachedEntries(10_000) .setCacheTtl(new Duration(2, DAYS)) - .setMaxInFlightSize(new DataSize(1, GIGABYTE))); + .setMaxInFlightSize(new DataSize(1, GIGABYTE)) + .setMaxSinglePagesSize(new DataSize(500, MEGABYTE))); } @Test @@ -52,6 +54,7 @@ public void testExplicitPropertyMappings() .put("fragment-result-cache.max-cached-entries", "100000") .put("fragment-result-cache.cache-ttl", "1d") .put("fragment-result-cache.max-in-flight-size", "2GB") + .put("fragment-result-cache.max-single-pages-size", "200MB") .build(); FileFragmentResultCacheConfig expected = new FileFragmentResultCacheConfig() @@ -60,7 +63,8 @@ public void testExplicitPropertyMappings() .setBlockEncodingCompressionEnabled(true) .setMaxCachedEntries(100000) .setCacheTtl(new Duration(1, DAYS)) - .setMaxInFlightSize(new DataSize(2, GIGABYTE)); + .setMaxInFlightSize(new DataSize(2, GIGABYTE)) + .setMaxSinglePagesSize(new DataSize(200, MEGABYTE)); assertFullMapping(properties, expected); } diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestFileFragmentResultCacheManager.java b/presto-main/src/test/java/com/facebook/presto/operator/TestFileFragmentResultCacheManager.java index 0f2ed7050da0a..e13672fd587ed 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestFileFragmentResultCacheManager.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestFileFragmentResultCacheManager.java @@ -35,6 +35,8 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; import static com.facebook.presto.block.BlockAssertions.createStringsBlock; @@ -56,6 +58,7 @@ public class TestFileFragmentResultCacheManager private final ExecutorService writeExecutor = newScheduledThreadPool(5, daemonThreadsNamed("test-cache-flusher-%s")); private final ExecutorService removalExecutor = newScheduledThreadPool(5, daemonThreadsNamed("test-cache-remover-%s")); + private final ExecutorService multithreadingWriteExecutor = newScheduledThreadPool(10, daemonThreadsNamed("test-cache-multithreading-flusher-%s")); private URI cacheDirectory; @@ -72,6 +75,7 @@ public void close() { writeExecutor.shutdown(); removalExecutor.shutdown(); + multithreadingWriteExecutor.shutdown(); checkState(cacheDirectory != null); File[] files = new File(cacheDirectory).listFiles(); @@ -147,6 +151,35 @@ private static void assertPagesEqual(Iterator pages1, Iterator pages assertFalse(pages2.hasNext()); } + @Test(timeOut = 30_000) + public void testThreadWrite() throws Exception + { + String writeThreadNameFormat = "test write content,thread %s,%s"; + FragmentCacheStats stats = new FragmentCacheStats(); + FileFragmentResultCacheManager threadWriteCacheManager = fileFragmentResultCacheManager(stats); + ImmutableList.Builder> futures = ImmutableList.builder(); + for (int i = 0; i < 10; i++) { + Future future = multithreadingWriteExecutor.submit(() -> { + try { + String threadInfo = String.format(writeThreadNameFormat, Thread.currentThread().getName(), Thread.currentThread().getId()); + List pages = ImmutableList.of(new Page(createStringsBlock(threadInfo))); + threadWriteCacheManager.put(threadInfo, SPLIT_2, pages).get(); + Optional> result = threadWriteCacheManager.get(threadInfo, SPLIT_2); + assertTrue(result.isPresent()); + assertPagesEqual(result.get(), pages.iterator()); + return true; + } + catch (Exception e) { + return false; + } + }); + futures.add(future); + } + for (Future future : futures.build()) { + assertTrue(future.get(30, TimeUnit.SECONDS)); + } + } + private FileFragmentResultCacheManager fileFragmentResultCacheManager(FragmentCacheStats fragmentCacheStats) { FileFragmentResultCacheConfig cacheConfig = new FileFragmentResultCacheConfig();