Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.net.URI;

import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.DAYS;

public class FileFragmentResultCacheConfig
Expand All @@ -36,6 +37,7 @@ public class FileFragmentResultCacheConfig
private int maxCachedEntries = 10_000;
private Duration cacheTtl = new Duration(2, DAYS);
private DataSize maxInFlightSize = new DataSize(1, GIGABYTE);
private DataSize maxSinglePagesSize = new DataSize(500, MEGABYTE);

public boolean isCachingEnabled()
{
Expand Down Expand Up @@ -117,4 +119,18 @@ public FileFragmentResultCacheConfig setMaxInFlightSize(DataSize maxInFlightSize
this.maxInFlightSize = maxInFlightSize;
return this;
}

// Upper bound on the total serialized size of pages cached for a single
// fragment result; results larger than this are not written to the cache.
// @MinDataSize validates that the configured value is non-negative.
@MinDataSize("0B")
public DataSize getMaxSinglePagesSize()
{
return maxSinglePagesSize;
}

// Config-bound setter for the per-entry size cap. The previous description
// ("Maximum size of pages write to flushed") was ungrammatical; fixed here.
@Config("fragment-result-cache.max-single-pages-size")
@ConfigDescription("Maximum total size of pages that may be flushed to a single cache entry")
public FileFragmentResultCacheConfig setMaxSinglePagesSize(DataSize maxSinglePagesSize)
{
this.maxSinglePagesSize = maxSinglePagesSize;
// Return this to support the fluent builder style used by the other setters.
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.facebook.presto.metadata.Split;
import com.facebook.presto.metadata.Split.SplitIdentifier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.page.PagesSerde;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
Expand Down Expand Up @@ -67,7 +66,8 @@ public class FileFragmentResultCacheManager

private final Path baseDirectory;
private final long maxInFlightBytes;
private final PagesSerde pagesSerde;
private final long maxSinglePagesBytes;
private final PagesSerdeFactory pagesSerdeFactory;
private final FragmentCacheStats fragmentCacheStats;
private final ExecutorService flushExecutor;
private final ExecutorService removalExecutor;
Expand All @@ -88,7 +88,9 @@ public FileFragmentResultCacheManager(

this.baseDirectory = Paths.get(cacheConfig.getBaseDirectory());
this.maxInFlightBytes = cacheConfig.getMaxInFlightSize().toBytes();
this.pagesSerde = new PagesSerdeFactory(blockEncodingSerde, cacheConfig.isBlockEncodingCompressionEnabled()).createPagesSerde();
this.maxSinglePagesBytes = cacheConfig.getMaxSinglePagesSize().toBytes();
// pagesSerde is not thread safe
this.pagesSerdeFactory = new PagesSerdeFactory(blockEncodingSerde, cacheConfig.isBlockEncodingCompressionEnabled());
this.fragmentCacheStats = requireNonNull(fragmentCacheStats, "fragmentCacheStats is null");
this.flushExecutor = requireNonNull(flushExecutor, "flushExecutor is null");
this.removalExecutor = requireNonNull(removalExecutor, "removalExecutor is null");
Expand Down Expand Up @@ -130,13 +132,13 @@ public Future<?> put(String serializedPlan, Split split, List<Page> result)
{
CacheKey key = new CacheKey(serializedPlan, split.getSplitIdentifier());
long resultSize = getPagesSize(result);
if (fragmentCacheStats.getInFlightBytes() + resultSize > maxInFlightBytes || cache.getIfPresent(key) != null) {
if (fragmentCacheStats.getInFlightBytes() + resultSize > maxInFlightBytes || cache.getIfPresent(key) != null || resultSize > maxSinglePagesBytes) {
return immediateFuture(null);
}

fragmentCacheStats.addInFlightBytes(resultSize);
Path path = baseDirectory.resolve(randomUUID().toString().replaceAll("-", "_"));
return flushExecutor.submit(() -> cachePages(key, path, result));
return flushExecutor.submit(() -> cachePages(key, path, result, resultSize));
}

private static long getPagesSize(List<Page> pages)
Expand All @@ -146,13 +148,13 @@ private static long getPagesSize(List<Page> pages)
.sum();
}

private void cachePages(CacheKey key, Path path, List<Page> pages)
private void cachePages(CacheKey key, Path path, List<Page> pages, long resultSize)
{
// TODO: To support both memory and disk limit, we should check cache size before putting to cache and use written bytes as weight for cache
try {
Files.createFile(path);
try (SliceOutput output = new OutputStreamSliceOutput(newOutputStream(path, APPEND))) {
writePages(pagesSerde, output, pages.iterator());
writePages(pagesSerdeFactory.createPagesSerde(), output, pages.iterator());
cache.put(key, path);
fragmentCacheStats.incrementCacheEntries();
}
Expand All @@ -166,7 +168,7 @@ private void cachePages(CacheKey key, Path path, List<Page> pages)
tryDeleteFile(path);
}
finally {
fragmentCacheStats.addInFlightBytes(-getPagesSize(pages));
fragmentCacheStats.addInFlightBytes(-resultSize);
}
}

Expand Down Expand Up @@ -195,11 +197,12 @@ public Optional<Iterator<Page>> get(String serializedPlan, Split split)

try {
InputStream inputStream = newInputStream(path);
Iterator<Page> result = readPages(pagesSerde, new InputStreamSliceInput(inputStream));
Iterator<Page> result = readPages(pagesSerdeFactory.createPagesSerde(), new InputStreamSliceInput(inputStream));
fragmentCacheStats.incrementCacheHit();
return Optional.of(closeWhenExhausted(result, inputStream));
}
catch (UncheckedIOException | IOException e) {
log.error(e, "read path %s error", path);
Copy link
Copy Markdown
Contributor

@pettyjamesm pettyjamesm Aug 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don’t want to catch Throwable here- since “should be fatal” members of the Error subheirarchy like OutOfMemoryError are not what you want to catch here. Seems like catching Exception would be sufficient instead.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I lacked consideration. I have fixed it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider only extending it to RuntimeException? I don't think we should cover checked exception here.

Also, does it make sense to catch this separately from IOException and UncheckedIOException? There are legit reasons for them to happen, but maybe not for other exceptions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you.

// there might be a chance the file has been deleted. We would return cache miss in this case.
fragmentCacheStats.incrementCacheMiss();
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.concurrent.TimeUnit.DAYS;

public class TestFileFragmentResultCacheConfig
Expand All @@ -38,7 +39,8 @@ public void testDefaults()
.setBlockEncodingCompressionEnabled(false)
.setMaxCachedEntries(10_000)
.setCacheTtl(new Duration(2, DAYS))
.setMaxInFlightSize(new DataSize(1, GIGABYTE)));
.setMaxInFlightSize(new DataSize(1, GIGABYTE))
.setMaxSinglePagesSize(new DataSize(500, MEGABYTE)));
}

@Test
Expand All @@ -52,6 +54,7 @@ public void testExplicitPropertyMappings()
.put("fragment-result-cache.max-cached-entries", "100000")
.put("fragment-result-cache.cache-ttl", "1d")
.put("fragment-result-cache.max-in-flight-size", "2GB")
.put("fragment-result-cache.max-single-pages-size", "200MB")
.build();

FileFragmentResultCacheConfig expected = new FileFragmentResultCacheConfig()
Expand All @@ -60,7 +63,8 @@ public void testExplicitPropertyMappings()
.setBlockEncodingCompressionEnabled(true)
.setMaxCachedEntries(100000)
.setCacheTtl(new Duration(1, DAYS))
.setMaxInFlightSize(new DataSize(2, GIGABYTE));
.setMaxInFlightSize(new DataSize(2, GIGABYTE))
.setMaxSinglePagesSize(new DataSize(200, MEGABYTE));

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.block.BlockAssertions.createStringsBlock;
Expand All @@ -56,6 +58,7 @@ public class TestFileFragmentResultCacheManager

private final ExecutorService writeExecutor = newScheduledThreadPool(5, daemonThreadsNamed("test-cache-flusher-%s"));
private final ExecutorService removalExecutor = newScheduledThreadPool(5, daemonThreadsNamed("test-cache-remover-%s"));
private final ExecutorService multithreadingWriteExecutor = newScheduledThreadPool(10, daemonThreadsNamed("test-cache-multithreading-flusher-%s"));

private URI cacheDirectory;

Expand All @@ -72,6 +75,7 @@ public void close()
{
writeExecutor.shutdown();
removalExecutor.shutdown();
multithreadingWriteExecutor.shutdown();

checkState(cacheDirectory != null);
File[] files = new File(cacheDirectory).listFiles();
Expand Down Expand Up @@ -147,6 +151,35 @@ private static void assertPagesEqual(Iterator<Page> pages1, Iterator<Page> pages
assertFalse(pages2.hasNext());
}

// Verifies that concurrent writers can safely share one cache manager:
// ten tasks each cache a unique single-page result under its own key and
// immediately read it back, checking the round-tripped page contents.
@Test(timeOut = 30_000)
public void testThreadWrite() throws Exception
{
    FragmentCacheStats cacheStats = new FragmentCacheStats();
    FileFragmentResultCacheManager manager = fileFragmentResultCacheManager(cacheStats);
    ImmutableList.Builder<Future<Boolean>> pendingWrites = ImmutableList.builder();
    for (int task = 0; task < 10; task++) {
        pendingWrites.add(multithreadingWriteExecutor.submit(() -> {
            try {
                Thread current = Thread.currentThread();
                String threadInfo = String.format("test write content,thread %s,%s", current.getName(), current.getId());
                List<Page> pages = ImmutableList.of(new Page(createStringsBlock(threadInfo)));
                // Block until the flush completes, then read the entry back.
                manager.put(threadInfo, SPLIT_2, pages).get();
                Optional<Iterator<Page>> cached = manager.get(threadInfo, SPLIT_2);
                assertTrue(cached.isPresent());
                assertPagesEqual(cached.get(), pages.iterator());
                return true;
            }
            catch (Exception e) {
                // Any worker failure surfaces as a false result asserted below.
                return false;
            }
        }));
    }
    for (Future<Boolean> pending : pendingWrites.build()) {
        assertTrue(pending.get(30, TimeUnit.SECONDS));
    }
}

private FileFragmentResultCacheManager fileFragmentResultCacheManager(FragmentCacheStats fragmentCacheStats)
{
FileFragmentResultCacheConfig cacheConfig = new FileFragmentResultCacheConfig();
Expand Down