From aaaf2d0f7b21639a8a101a5e6f68d6ba9502ff7c Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Wed, 2 Oct 2024 23:01:18 +0200 Subject: [PATCH 1/5] Use dash as a separator for storage object name --- .../spooling/filesystem/FileSystemSpooledSegmentHandle.java | 2 +- .../spooling/filesystem/TestFileSystemSpooledSegmentHandle.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java index c3c47374c02ec..d86bbad502f41 100644 --- a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java @@ -35,7 +35,7 @@ public record FileSystemSpooledSegmentHandle( Optional encryptionKey) implements SpooledSegmentHandle { - private static final String OBJECT_NAME_SEPARATOR = "::"; + private static final String OBJECT_NAME_SEPARATOR = "-"; public FileSystemSpooledSegmentHandle { diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpooledSegmentHandle.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpooledSegmentHandle.java index b4ae0dfa3cf6b..594724f0086f1 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpooledSegmentHandle.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpooledSegmentHandle.java @@ -39,7 +39,7 @@ public void testStorageObjectNameStability() Instant expireAt = Instant.ofEpochMilli(90000); FileSystemSpooledSegmentHandle handle = FileSystemSpooledSegmentHandle.random(new NotARandomAtAll(), context, expireAt); assertThat(handle.storageObjectName()) - .isEqualTo("0000002QWG0G2081040G208104::query_id"); + .isEqualTo("0000002QWG0G2081040G208104-query_id"); } @Test From 361cec49949c1a36d041934e5e838614f672b4c9 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Wed, 2 Oct 2024 23:01:58 +0200 Subject: [PATCH 2/5] Migrate segment pruner tests to memory file system --- .../TestFileSystemSegmentPruner.java | 99 +++++-------------- 1 file changed, 26 insertions(+), 73 deletions(-) diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java index 5c5ce908aa81f..b91a7d7782683 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java @@ -14,21 +14,13 @@ package io.trino.spooling.filesystem; import com.google.common.collect.ImmutableList; -import io.airlift.units.DataSize; import io.trino.filesystem.FileEntry; import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; -import io.trino.filesystem.TrinoFileSystemFactory; -import io.trino.filesystem.s3.S3FileSystemConfig; -import io.trino.filesystem.s3.S3FileSystemFactory; -import io.trino.filesystem.s3.S3FileSystemStats; +import io.trino.filesystem.memory.MemoryFileSystem; import io.trino.spi.QueryId; import io.trino.spi.protocol.SpoolingContext; -import io.trino.spi.security.ConnectorIdentity; -import io.trino.testing.containers.Minio; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -37,13 +29,10 @@ import java.io.UncheckedIOException; import java.time.Instant; import java.util.List; -import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; -import static io.opentelemetry.api.OpenTelemetry.noop; -import static io.trino.testing.containers.Minio.MINIO_REGION; import static java.nio.charset.StandardCharsets.UTF_8; import static java.time.temporal.ChronoUnit.MILLIS; import static org.assertj.core.api.Assertions.assertThat; @@ -52,40 +41,27 @@ @TestInstance(PER_CLASS) class TestFileSystemSegmentPruner { - private Minio minio; - private static final String BUCKET_NAME = "segments" + UUID.randomUUID().toString().replace("-", ""); - private static final Location LOCATION = Location.of("s3://" + BUCKET_NAME + "/"); + private static final String TEST_LOCATION = "memory://"; - @BeforeAll - public void setup() - { - minio = Minio.builder().build(); - minio.start(); - minio.createBucket(BUCKET_NAME); - } - - @AfterAll - public void teardown() - { - minio.stop(); - } + private static final FileSystemSpoolingConfig SPOOLING_CONFIG = new FileSystemSpoolingConfig() + .setLocation(TEST_LOCATION); @Test public void shouldPruneExpiredSegments() { + MemoryFileSystem fileSystem = new MemoryFileSystem(); try (ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor()) { - TrinoFileSystemFactory fileSystemFactory = getFileSystemFactory(); - FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(getSpoolingConfig(), fileSystemFactory, executorService); + FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> fileSystem, executorService); Instant now = Instant.now(); QueryId queryId = QueryId.valueOf("prune_expired"); - Location _ = writeNewDummySegment(fileSystemFactory, queryId, now.minusSeconds(1)); - Location nonExpiredSegment = writeNewDummySegment(fileSystemFactory, queryId, now.plusSeconds(1)); + Location _ = createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); + Location nonExpiredSegment = createNewDummySegment(fileSystem, queryId, now.plusSeconds(1)); pruner.pruneExpiredBefore(now.truncatedTo(MILLIS)); - List files = listFiles(fileSystemFactory, queryId); + List files = listFiles(fileSystem, queryId); assertThat(files) .hasSize(1) .containsOnly(nonExpiredSegment); @@ -95,20 +71,20 @@ public void shouldPruneExpiredSegments() @Test public void shouldNotPruneLiveSegments() { + MemoryFileSystem fileSystem = new MemoryFileSystem(); try (ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor()) { - TrinoFileSystemFactory fileSystemFactory = getFileSystemFactory(); - FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(getSpoolingConfig(), fileSystemFactory, executorService); + FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> fileSystem, executorService); Instant now = Instant.now(); QueryId queryId = QueryId.valueOf("prune_live"); - Location _ = writeNewDummySegment(fileSystemFactory, queryId, now.plusSeconds(1)); - Location _ = writeNewDummySegment(fileSystemFactory, queryId, now.plusSeconds(2)); + Location _ = createNewDummySegment(fileSystem, queryId, now.plusSeconds(1)); + Location _ = createNewDummySegment(fileSystem, queryId, now.plusSeconds(2)); pruner.pruneExpiredBefore(now.truncatedTo(MILLIS)); - List files = listFiles(fileSystemFactory, queryId); + List files = listFiles(fileSystem, queryId); assertThat(files) .hasSize(2); } @@ -117,57 +93,46 @@ public void shouldNotPruneLiveSegments() @Test public void shouldNotPruneSegmentsIfNotStrictlyBeforeExpiration() { + TrinoFileSystem memoryFileSystem = new MemoryFileSystem(); try (ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor()) { - TrinoFileSystemFactory fileSystemFactory = getFileSystemFactory(); - FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(getSpoolingConfig(), fileSystemFactory, executorService); + FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> memoryFileSystem, executorService); Instant now = Instant.now(); QueryId queryId = QueryId.valueOf("prune_now"); - Location firstSegment = writeNewDummySegment(fileSystemFactory, queryId, now); - Location secondSegment = writeNewDummySegment(fileSystemFactory, queryId, now); + Location firstSegment = createNewDummySegment(memoryFileSystem, queryId, now); + Location secondSegment = createNewDummySegment(memoryFileSystem, queryId, now); pruner.pruneExpiredBefore(now.truncatedTo(MILLIS)); - List files = listFiles(fileSystemFactory, queryId); + List files = listFiles(memoryFileSystem, queryId); assertThat(files) .hasSize(2) .containsOnly(firstSegment, secondSegment); } } - private TrinoFileSystemFactory getFileSystemFactory() - { - S3FileSystemConfig filesystemConfig = new S3FileSystemConfig() - .setEndpoint(minio.getMinioAddress()) - .setRegion(MINIO_REGION) - .setPathStyleAccess(true) - .setAwsAccessKey(Minio.MINIO_ACCESS_KEY) - .setAwsSecretKey(Minio.MINIO_SECRET_KEY) - .setStreamingPartSize(DataSize.valueOf("5.5MB")); - return new S3FileSystemFactory(noop(), filesystemConfig, new S3FileSystemStats()); - } - - private Location writeNewDummySegment(TrinoFileSystemFactory fileSystemFactory, QueryId queryId, Instant ttl) + private Location createNewDummySegment(TrinoFileSystem fileSystem, QueryId queryId, Instant ttl) { SpoolingContext context = new SpoolingContext("encoding", queryId, 100, 1000); FileSystemSpooledSegmentHandle handle = FileSystemSpooledSegmentHandle.random(ThreadLocalRandom.current(), context, ttl); - try (OutputStream segment = createFileSystem(fileSystemFactory).newOutputFile(LOCATION.appendPath(handle.storageObjectName())).create()) { - segment.write("dummy".getBytes(UTF_8)); - return LOCATION.appendPath(handle.storageObjectName()); + Location location = Location.of(TEST_LOCATION).appendPath(handle.storageObjectName()); + try (OutputStream stream = fileSystem.newOutputFile(location).create()) { + stream.write("dummy".getBytes(UTF_8)); + return location; } catch (IOException e) { throw new UncheckedIOException(e); } } - private List listFiles(TrinoFileSystemFactory fileSystemFactory, QueryId queryId) + private List listFiles(TrinoFileSystem fileSystem, QueryId queryId) { ImmutableList.Builder files = ImmutableList.builder(); try { - FileIterator iterator = createFileSystem(fileSystemFactory).listFiles(LOCATION); + FileIterator iterator = fileSystem.listFiles(Location.of(TEST_LOCATION)); while (iterator.hasNext()) { FileEntry entry = iterator.next(); if (entry.location().fileName().endsWith(queryId.toString())) { @@ -180,16 +145,4 @@ private List listFiles(TrinoFileSystemFactory fileSystemFactory, Query throw new UncheckedIOException(e); } } - - private TrinoFileSystem createFileSystem(TrinoFileSystemFactory fileSystemFactory) - { - return fileSystemFactory.create(ConnectorIdentity.ofUser("ignored")); - } - - private FileSystemSpoolingConfig getSpoolingConfig() - { - return new FileSystemSpoolingConfig() - .setS3Enabled(true) - .setLocation(LOCATION.toString()); - } } From 3876752a327adbfa0f4053e023e8c39ed1df0718 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Wed, 2 Oct 2024 23:09:48 +0200 Subject: [PATCH 3/5] Clear pruned segments after pruning Previously list was ever growing --- .../filesystem/FileSystemSegmentPruner.java | 12 ++++++-- .../TestFileSystemSegmentPruner.java | 29 ++++++++++++++++++- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSegmentPruner.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSegmentPruner.java index 99978c2a0a6c2..3595bc9ee5bea 100644 --- a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSegmentPruner.java +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSegmentPruner.java @@ -90,11 +90,13 @@ private void prune() } @VisibleForTesting - void pruneExpiredBefore(Instant expiredBefore) + long pruneExpiredBefore(Instant expiredBefore) { if (closed) { - return; + return 0; } + long pruned = 0; + try { List expiredSegments = new ArrayList<>(); FileIterator iterator = orderDetectingIterator(fileSystem.listFiles(location)); @@ -109,20 +111,24 @@ void pruneExpiredBefore(Instant expiredBefore) if (handle.get().isBefore(expiredBefore)) { expiredSegments.add(file.location()); if (expiredSegments.size() >= batchSize) { + pruned += expiredSegments.size(); pruneExpiredSegments(expiredBefore, expiredSegments); + expiredSegments.clear(); } } else if (filesAreOrdered) { // First non expired segment was found, no need to check the rest // since we know that files are lexicographically ordered. pruneExpiredSegments(expiredBefore, expiredSegments); - return; + return pruned + expiredSegments.size(); } } pruneExpiredSegments(expiredBefore, expiredSegments); + return pruned + expiredSegments.size(); } catch (IOException e) { log.error(e, "Failed to prune segments"); + return pruned; } } diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java index b91a7d7782683..b3e4038fb8f3a 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java @@ -44,7 +44,8 @@ class TestFileSystemSegmentPruner private static final String TEST_LOCATION = "memory://"; private static final FileSystemSpoolingConfig SPOOLING_CONFIG = new FileSystemSpoolingConfig() - .setLocation(TEST_LOCATION); + .setLocation(TEST_LOCATION) + .setPruningBatchSize(1); @Test public void shouldPruneExpiredSegments() @@ -68,6 +69,32 @@ public void shouldPruneExpiredSegments() } } + @Test + public void shouldPruneExpiredSegmentsOnceAndClear() + { + MemoryFileSystem fileSystem = new MemoryFileSystem(); + try (ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor()) { + FileSystemSegmentPruner pruner = new FileSystemSegmentPruner(SPOOLING_CONFIG, _ -> fileSystem, executorService); + + Instant now = Instant.now(); + QueryId queryId = QueryId.valueOf("prune_expired"); + + Location _ = createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); + Location _ = createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); + Location _ = createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); + + Location nonExpiredSegment = createNewDummySegment(fileSystem, queryId, now.plusSeconds(1)); + + assertThat(pruner.pruneExpiredBefore(now.truncatedTo(MILLIS))) + .isEqualTo(3); + + List files = listFiles(fileSystem, queryId); + assertThat(files) + .hasSize(1) + .containsOnly(nonExpiredSegment); + } + } + @Test public void shouldNotPruneLiveSegments() { From 283028dad9c5a6faff6b7da4661aed0e8e45b407 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 3 Oct 2024 12:19:19 +0200 Subject: [PATCH 4/5] Remove redundant assignments --- .../filesystem/TestFileSystemSegmentPruner.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java index b3e4038fb8f3a..95c0f669b1a0f 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java @@ -57,7 +57,7 @@ public void shouldPruneExpiredSegments() Instant now = Instant.now(); QueryId queryId = QueryId.valueOf("prune_expired"); - Location _ = createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); + createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); Location nonExpiredSegment = createNewDummySegment(fileSystem, queryId, now.plusSeconds(1)); pruner.pruneExpiredBefore(now.truncatedTo(MILLIS)); @@ -79,9 +79,9 @@ public void shouldPruneExpiredSegmentsOnceAndClear() Instant now = Instant.now(); QueryId queryId = QueryId.valueOf("prune_expired"); - Location _ = createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); - Location _ = createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); - Location _ = createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); + createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); + createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); + createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); Location nonExpiredSegment = createNewDummySegment(fileSystem, queryId, now.plusSeconds(1)); @@ -106,8 +106,8 @@ public void shouldNotPruneLiveSegments() QueryId queryId = QueryId.valueOf("prune_live"); - Location _ = createNewDummySegment(fileSystem, queryId, now.plusSeconds(1)); - Location _ = createNewDummySegment(fileSystem, queryId, now.plusSeconds(2)); + createNewDummySegment(fileSystem, queryId, now.plusSeconds(1)); + createNewDummySegment(fileSystem, queryId, now.plusSeconds(2)); pruner.pruneExpiredBefore(now.truncatedTo(MILLIS)); From 1720e78f401e97b65146eafcb2cb6e34703c5cde Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 3 Oct 2024 12:19:52 +0200 Subject: [PATCH 5/5] Rename method to more meaningful one --- .../TestFileSystemSegmentPruner.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java index 95c0f669b1a0f..7b92b07d430b9 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSegmentPruner.java @@ -57,8 +57,8 @@ public void shouldPruneExpiredSegments() Instant now = Instant.now(); QueryId queryId = QueryId.valueOf("prune_expired"); - createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); - Location nonExpiredSegment = createNewDummySegment(fileSystem, queryId, now.plusSeconds(1)); + writeDataSegment(fileSystem, queryId, now.minusSeconds(1)); + Location nonExpiredSegment = writeDataSegment(fileSystem, queryId, now.plusSeconds(1)); pruner.pruneExpiredBefore(now.truncatedTo(MILLIS)); @@ -79,11 +79,11 @@ public void shouldPruneExpiredSegmentsOnceAndClear() Instant now = Instant.now(); QueryId queryId = QueryId.valueOf("prune_expired"); - createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); - createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); - createNewDummySegment(fileSystem, queryId, now.minusSeconds(1)); + writeDataSegment(fileSystem, queryId, now.minusSeconds(1)); + writeDataSegment(fileSystem, queryId, now.minusSeconds(1)); + writeDataSegment(fileSystem, queryId, now.minusSeconds(1)); - Location nonExpiredSegment = createNewDummySegment(fileSystem, queryId, now.plusSeconds(1)); + Location nonExpiredSegment = writeDataSegment(fileSystem, queryId, now.plusSeconds(1)); assertThat(pruner.pruneExpiredBefore(now.truncatedTo(MILLIS))) .isEqualTo(3); @@ -106,8 +106,8 @@ public void shouldNotPruneLiveSegments() QueryId queryId = QueryId.valueOf("prune_live"); - createNewDummySegment(fileSystem, queryId, now.plusSeconds(1)); - createNewDummySegment(fileSystem, queryId, now.plusSeconds(2)); + writeDataSegment(fileSystem, queryId, now.plusSeconds(1)); + writeDataSegment(fileSystem, queryId, now.plusSeconds(2)); pruner.pruneExpiredBefore(now.truncatedTo(MILLIS)); @@ -128,8 +128,8 @@ public void shouldNotPruneSegmentsIfNotStrictlyBeforeExpiration() QueryId queryId = QueryId.valueOf("prune_now"); - Location firstSegment = createNewDummySegment(memoryFileSystem, queryId, now); - Location secondSegment = createNewDummySegment(memoryFileSystem, queryId, now); + Location firstSegment = writeDataSegment(memoryFileSystem, queryId, now); + Location secondSegment = writeDataSegment(memoryFileSystem, queryId, now); pruner.pruneExpiredBefore(now.truncatedTo(MILLIS)); @@ -140,7 +140,7 @@ public void shouldNotPruneSegmentsIfNotStrictlyBeforeExpiration() } } - private Location createNewDummySegment(TrinoFileSystem fileSystem, QueryId queryId, Instant ttl) + private Location writeDataSegment(TrinoFileSystem fileSystem, QueryId queryId, Instant ttl) { SpoolingContext context = new SpoolingContext("encoding", queryId, 100, 1000); FileSystemSpooledSegmentHandle handle = FileSystemSpooledSegmentHandle.random(ThreadLocalRandom.current(), context, ttl);