From d141fb516a8e16baab580cb7721f39ea81c4e861 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Tue, 12 Aug 2025 12:15:10 +0200 Subject: [PATCH 1/3] Add node identifier to spooled segments This prevents collisions when spooling on large number of nodes --- .../spooling/SpoolingManagerRegistry.java | 12 +++- .../java/io/trino/testing/PlanTester.java | 9 ++- .../spi/spool/SpoolingManagerContext.java | 6 ++ .../FileSystemSpooledSegmentHandle.java | 9 ++- .../filesystem/FileSystemSpoolingManager.java | 32 ++++++---- .../FileSystemSpoolingManagerFactory.java | 2 + .../filesystem/SimpleFileSystemLayout.java | 22 +++++-- .../AbstractFileSystemSegmentPrunerTest.java | 2 +- ...AbstractFileSystemSpoolingManagerTest.java | 2 +- .../TestFileSystemSpooledSegmentHandle.java | 8 +-- ...stFileSystemSpoolingManagerLocalStack.java | 2 +- .../TestFileSystemSpoolingManagerMinio.java | 2 +- .../TestPartitionedFileSystemLayout.java | 6 +- .../TestSimpleFileSystemLayout.java | 6 +- .../spooling/filesystem/TestingNode.java | 60 +++++++++++++++++++ 15 files changed, 146 insertions(+), 34 deletions(-) create mode 100644 plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestingNode.java diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingManagerRegistry.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingManagerRegistry.java index 85fc6381d8c1..9cc01f52f152 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingManagerRegistry.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/SpoolingManagerRegistry.java @@ -19,7 +19,9 @@ import io.airlift.log.Logger; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Tracer; +import io.trino.node.InternalNode; import io.trino.server.ServerConfig; +import io.trino.spi.Node; import io.trino.spi.classloader.ThreadContextClassLoader; import io.trino.spi.spool.SpoolingManager; import io.trino.spi.spool.SpoolingManagerContext; @@ -54,11 +56,13 @@ public class SpoolingManagerRegistry private final boolean coordinator; private final OpenTelemetry openTelemetry; private final Tracer tracer; + private final Node currentNode; private volatile SpoolingManager spoolingManager; @Inject - public SpoolingManagerRegistry(ServerConfig serverConfig, SpoolingEnabledConfig config, OpenTelemetry openTelemetry, Tracer tracer) + public SpoolingManagerRegistry(InternalNode currentNode, ServerConfig serverConfig, SpoolingEnabledConfig config, OpenTelemetry openTelemetry, Tracer tracer) { + this.currentNode = requireNonNull(currentNode, "currentNode is null"); this.enabled = config.isEnabled(); this.coordinator = serverConfig.isCoordinator(); this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null"); @@ -126,6 +130,12 @@ public boolean isCoordinator() { return coordinator; } + + @Override + public Node getCurrentNode() + { + return currentNode; + } }; SpoolingManager spoolingManager; diff --git a/core/trino-main/src/main/java/io/trino/testing/PlanTester.java b/core/trino-main/src/main/java/io/trino/testing/PlanTester.java index 57cbbc0b8a6a..d06c19a181c1 100644 --- a/core/trino-main/src/main/java/io/trino/testing/PlanTester.java +++ b/core/trino-main/src/main/java/io/trino/testing/PlanTester.java @@ -106,6 +106,7 @@ import io.trino.metadata.TablePropertyManager; import io.trino.metadata.TypeRegistry; import io.trino.metadata.ViewPropertyManager; +import io.trino.node.InternalNode; import io.trino.node.InternalNodeManager; import io.trino.node.TestingInternalNodeManager; import io.trino.operator.Driver; @@ -213,6 +214,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; +import java.net.URI; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -473,7 +475,12 @@ private PlanTester(Session defaultSession, int nodeCountForStats) ImmutableSet.of(new ExcludeColumnsFunction())); exchangeManagerRegistry = new ExchangeManagerRegistry(noop(), noopTracer(), secretsResolver); - spoolingManagerRegistry = new SpoolingManagerRegistry(new ServerConfig(), new SpoolingEnabledConfig(), noop(), noopTracer()); + spoolingManagerRegistry = new SpoolingManagerRegistry( + new InternalNode("nodeId", URI.create("http://localhost:8080"), NodeVersion.UNKNOWN, false), + new ServerConfig(), + new SpoolingEnabledConfig(), + noop(), + noopTracer()); this.pluginManager = new PluginManager( (loader, createClassLoader) -> {}, Optional.empty(), diff --git a/core/trino-spi/src/main/java/io/trino/spi/spool/SpoolingManagerContext.java b/core/trino-spi/src/main/java/io/trino/spi/spool/SpoolingManagerContext.java index 715891db469d..3cf039ea640e 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/spool/SpoolingManagerContext.java +++ b/core/trino-spi/src/main/java/io/trino/spi/spool/SpoolingManagerContext.java @@ -15,6 +15,7 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Tracer; +import io.trino.spi.Node; public interface SpoolingManagerContext { @@ -32,4 +33,9 @@ default boolean isCoordinator() { throw new UnsupportedOperationException(); } + + default Node getCurrentNode() + { + throw new UnsupportedOperationException(); + } } diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java index ab3348272435..4fb8ea9bada5 100644 --- a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java @@ -29,25 +29,28 @@ public record FileSystemSpooledSegmentHandle( @Override String encoding, byte[] uuid, + String nodeIdentifier, Optional encryptionKey) implements SpooledSegmentHandle { public FileSystemSpooledSegmentHandle { requireNonNull(encryptionKey, "encryptionKey is null"); + requireNonNull(nodeIdentifier, "nodeIdentifier is null"); verify(uuid.length == 16, "uuid must be 128 bits"); } - public static FileSystemSpooledSegmentHandle random(Random random, SpoolingContext context, Instant expireAt) + public static FileSystemSpooledSegmentHandle random(Random random, String nodeIdentifier, SpoolingContext context, Instant expireAt) { - return random(random, context, expireAt, Optional.empty()); + return random(random, nodeIdentifier, context, expireAt, Optional.empty()); } - public static FileSystemSpooledSegmentHandle random(Random random, SpoolingContext context, Instant expireAt, Optional encryptionKey) + public static FileSystemSpooledSegmentHandle random(Random random, String nodeIdentifier, SpoolingContext context, Instant expireAt, Optional encryptionKey) { return new FileSystemSpooledSegmentHandle( context.encoding(), ULID.generateBinary(expireAt.toEpochMilli(), entropy(random)), + nodeIdentifier, encryptionKey); } diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManager.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManager.java index 5ebcfd35fd2e..b580c2450b4c 100644 --- a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManager.java +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManager.java @@ -26,6 +26,7 @@ import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoOutputFile; import io.trino.filesystem.encryption.EncryptionKey; +import io.trino.spi.Node; import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.spool.SpooledLocation; import io.trino.spi.spool.SpooledLocation.DirectLocation; @@ -38,12 +39,12 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.security.SecureRandom; import java.time.Instant; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; import static io.airlift.slice.SizeOf.SIZE_OF_BYTE; import static io.airlift.slice.SizeOf.SIZE_OF_SHORT; @@ -62,25 +63,29 @@ public class FileSystemSpoolingManager private final EncryptionHeadersTranslator encryptionHeadersTranslator; private final TrinoFileSystem fileSystem; private final FileSystemLayout fileSystemLayout; + private final String nodeIdentifier; private final Duration ttl; private final Duration directAccessTtl; private final boolean encryptionEnabled; private final boolean explicitAckEnabled; - private final Random random = ThreadLocalRandom.current(); + + private final Random random; @Inject - public FileSystemSpoolingManager(FileSystemSpoolingConfig config, TrinoFileSystemFactory fileSystemFactory, FileSystemLayout fileSystemLayout) + public FileSystemSpoolingManager(FileSystemSpoolingConfig config, TrinoFileSystemFactory fileSystemFactory, FileSystemLayout fileSystemLayout, Node currentNode) { requireNonNull(config, "config is null"); this.location = Location.of(config.getLocation()); this.fileSystem = requireNonNull(fileSystemFactory, "fileSystemFactory is null") .create(ConnectorIdentity.ofUser("ignored")); this.fileSystemLayout = requireNonNull(fileSystemLayout, "fileSystemLayout is null"); + this.nodeIdentifier = requireNonNull(currentNode, "currentNode is null").getNodeIdentifier(); this.encryptionHeadersTranslator = encryptionHeadersTranslator(location); this.ttl = config.getTtl(); this.directAccessTtl = config.getDirectAccessTtl(); this.encryptionEnabled = config.isEncryptionEnabled(); this.explicitAckEnabled = config.isExplicitAckEnabled(); + this.random = new SecureRandom(nodeIdentifier.getBytes(UTF_8)); } @Override @@ -107,9 +112,9 @@ public FileSystemSpooledSegmentHandle create(SpoolingContext context) { Instant expireAt = Instant.now().plusMillis(ttl.toMillis()); if (encryptionEnabled) { - return FileSystemSpooledSegmentHandle.random(random, context, expireAt, Optional.of(randomAes256())); + return FileSystemSpooledSegmentHandle.random(random, nodeIdentifier, context, expireAt, Optional.of(randomAes256())); } - return FileSystemSpooledSegmentHandle.random(random, context, expireAt); + return FileSystemSpooledSegmentHandle.random(random, nodeIdentifier, context, expireAt); } @Override @@ -184,15 +189,19 @@ private static Slice serialize(FileSystemSpooledSegmentHandle fileHandle) // ulid: byte[16] // encodingLength: short // encoding: byte[encodingLength] + // nodeIdentifierLength: short + // nodeIdentifier: byte[nodeIdentifierLength] // isEncrypted: boolean - byte[] encoding = fileHandle.encoding().getBytes(UTF_8); - Slice slice = Slices.allocate(16 + SIZE_OF_SHORT + encoding.length + SIZE_OF_BYTE); + byte[] nodeIdentifier = fileHandle.nodeIdentifier().getBytes(UTF_8); + Slice slice = Slices.allocate(16 + 2 * SIZE_OF_SHORT + encoding.length + nodeIdentifier.length + SIZE_OF_BYTE); SliceOutput output = slice.getOutput(); output.writeBytes(fileHandle.uuid()); - output.writeShort(fileHandle.encoding().length()); + output.writeShort(encoding.length); output.writeBytes(encoding); + output.writeShort(nodeIdentifier.length); + output.writeBytes(nodeIdentifier); output.writeBoolean(fileHandle.encryptionKey().isPresent()); return output.slice(); } @@ -213,10 +222,13 @@ public SpooledSegmentHandle handle(Slice identifier, Map> h short encodingLength = input.readShort(); String encoding = input.readSlice(encodingLength).toStringUtf8(); + short nodeIdentifierLength = input.readShort(); + String nodeIdentifier = input.readSlice(nodeIdentifierLength).toStringUtf8(); + if (!input.readBoolean()) { - return new FileSystemSpooledSegmentHandle(encoding, uuid, Optional.empty()); + return new FileSystemSpooledSegmentHandle(encoding, uuid, nodeIdentifier, Optional.empty()); } - return new FileSystemSpooledSegmentHandle(encoding, uuid, Optional.of(encryptionHeadersTranslator.extractKey(headers))); + return new FileSystemSpooledSegmentHandle(encoding, uuid, nodeIdentifier, Optional.of(encryptionHeadersTranslator.extractKey(headers))); } private Duration remainingTtl(Instant expiresAt, Duration accessTtl) diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManagerFactory.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManagerFactory.java index 9d3290fdcb13..f83a3544561b 100644 --- a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManagerFactory.java +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManagerFactory.java @@ -17,6 +17,7 @@ import io.airlift.bootstrap.Bootstrap; import io.opentelemetry.api.OpenTelemetry; import io.trino.plugin.base.jmx.MBeanServerModule; +import io.trino.spi.Node; import io.trino.spi.spool.SpoolingManager; import io.trino.spi.spool.SpoolingManagerContext; import io.trino.spi.spool.SpoolingManagerFactory; @@ -46,6 +47,7 @@ public SpoolingManager create(Map config, SpoolingManagerContext binder -> { binder.bind(SpoolingManagerContext.class).toInstance(context); binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry()); + binder.bind(Node.class).toInstance(context.getCurrentNode()); }); Injector injector = app diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/SimpleFileSystemLayout.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/SimpleFileSystemLayout.java index 4578e82fd9ea..ad7c8035df19 100644 --- a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/SimpleFileSystemLayout.java +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/SimpleFileSystemLayout.java @@ -13,6 +13,7 @@ */ package io.trino.spooling.filesystem; +import com.google.common.hash.HashFunction; import io.azam.ulidj.ULID; import io.trino.filesystem.Location; @@ -20,13 +21,20 @@ import java.util.List; import java.util.Optional; +import static com.google.common.hash.Hashing.farmHashFingerprint64; +import static io.azam.ulidj.ULID.ULID_LENGTH; +import static java.util.Locale.ENGLISH; + public class SimpleFileSystemLayout implements FileSystemLayout { + // Node identifier is hashed to avoid both long file names and leaking information about the node + private static final HashFunction HASH = farmHashFingerprint64(); + @Override public Location location(Location rootLocation, FileSystemSpooledSegmentHandle segmentHandle) { - return rootLocation.appendPath(segmentHandle.identifier() + "." + segmentHandle.encoding()); + return rootLocation.appendPath(segmentHandle.identifier() + hashNodeIdentifier(segmentHandle.nodeIdentifier()) + '.' + segmentHandle.encoding()); } @Override @@ -39,15 +47,19 @@ public List searchPaths(Location rootLocation) public Optional getExpiration(Location location) { String filename = location.fileName(); - int index = filename.indexOf("."); - if (index == -1) { - return Optional.empty(); // Not a segment + if (filename.length() < ULID_LENGTH) { + return Optional.empty(); // Definitely not a segment } - String uuid = filename.substring(0, index); + String uuid = filename.substring(0, ULID_LENGTH); if (!ULID.isValid(uuid)) { return Optional.empty(); } return Optional.of(Instant.ofEpochMilli(ULID.getTimestamp(uuid))); } + + private static String hashNodeIdentifier(String nodeIdentifier) + { + return HASH.hashUnencodedChars(nodeIdentifier).toString().toUpperCase(ENGLISH); + } } diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/AbstractFileSystemSegmentPrunerTest.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/AbstractFileSystemSegmentPrunerTest.java index cfe036d353fd..e45c61eca8f9 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/AbstractFileSystemSegmentPrunerTest.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/AbstractFileSystemSegmentPrunerTest.java @@ -146,7 +146,7 @@ public void shouldNotPruneSegmentsIfNotStrictlyBeforeExpiration() private Location writeDataSegment(TrinoFileSystem fileSystem, QueryId queryId, Instant ttl) { SpoolingContext context = new SpoolingContext("encoding", queryId, 100, 1000); - FileSystemSpooledSegmentHandle handle = FileSystemSpooledSegmentHandle.random(ThreadLocalRandom.current(), context, ttl); + FileSystemSpooledSegmentHandle handle = FileSystemSpooledSegmentHandle.random(ThreadLocalRandom.current(), "nodeId", context, ttl); Location location = layout().location(TEST_LOCATION, handle); try (OutputStream stream = fileSystem.newOutputFile(location).create()) { stream.write(queryId.toString().getBytes(UTF_8)); diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/AbstractFileSystemSpoolingManagerTest.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/AbstractFileSystemSpoolingManagerTest.java index b5b628738355..0ac102c546a1 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/AbstractFileSystemSpoolingManagerTest.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/AbstractFileSystemSpoolingManagerTest.java @@ -83,7 +83,7 @@ public void testHandleRoundTrip() throws IOException { EncryptionKey key = randomAes256(); - FileSystemSpooledSegmentHandle handle = new FileSystemSpooledSegmentHandle("json", ULID.randomBinary(), Optional.of(key)); + FileSystemSpooledSegmentHandle handle = new FileSystemSpooledSegmentHandle("json", ULID.randomBinary(), "nodeId", Optional.of(key)); SpooledLocation location = getSpoolingManager().location(handle); FileSystemSpooledSegmentHandle handle2 = (FileSystemSpooledSegmentHandle) getSpoolingManager().handle(location.identifier(), location.headers()); diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpooledSegmentHandle.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpooledSegmentHandle.java index 9dfd596b56c7..1820b0df8396 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpooledSegmentHandle.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpooledSegmentHandle.java @@ -37,7 +37,7 @@ class TestFileSystemSpooledSegmentHandle public void testStorageIdentifierStability() { Instant expireAt = Instant.ofEpochMilli(90000); - FileSystemSpooledSegmentHandle handle = FileSystemSpooledSegmentHandle.random(new NotARandomAtAll(), context, expireAt); + FileSystemSpooledSegmentHandle handle = FileSystemSpooledSegmentHandle.random(new NotARandomAtAll(), "nodeId", context, expireAt); assertThat(handle.identifier()) .isEqualTo("0000002QWG0G2081040G208104"); } @@ -45,9 +45,9 @@ public void testStorageIdentifierStability() @Test public void testLexicalOrdering() { - FileSystemSpooledSegmentHandle handle1 = FileSystemSpooledSegmentHandle.random(random, context, now.plusMillis(1)); - FileSystemSpooledSegmentHandle handle2 = FileSystemSpooledSegmentHandle.random(random, context, now.plusMillis(3)); - FileSystemSpooledSegmentHandle handle3 = FileSystemSpooledSegmentHandle.random(random, context, now.plusMillis(2)); + FileSystemSpooledSegmentHandle handle1 = FileSystemSpooledSegmentHandle.random(random, "nodeId", context, now.plusMillis(1)); + FileSystemSpooledSegmentHandle handle2 = FileSystemSpooledSegmentHandle.random(random, "nodeId", context, now.plusMillis(3)); + FileSystemSpooledSegmentHandle handle3 = FileSystemSpooledSegmentHandle.random(random, "nodeId", context, now.plusMillis(2)); assertThat(handle2.identifier()) .isGreaterThan(handle1.identifier()); diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerLocalStack.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerLocalStack.java index baf04b892026..5cc58e57db99 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerLocalStack.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerLocalStack.java @@ -62,7 +62,7 @@ protected SpoolingManager getSpoolingManager() .setAwsAccessKey(LOCALSTACK.getAccessKey()) .setAwsSecretKey(LOCALSTACK.getSecretKey()) .setStreamingPartSize(DataSize.valueOf("5.5MB")); - return new FileSystemSpoolingManager(spoolingConfig, new S3FileSystemFactory(noop(), filesystemConfig, new S3FileSystemStats()), new SimpleFileSystemLayout()); + return new FileSystemSpoolingManager(spoolingConfig, new S3FileSystemFactory(noop(), filesystemConfig, new S3FileSystemStats()), new SimpleFileSystemLayout(), new TestingNode("nodeId")); } protected S3Client createS3Client() diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerMinio.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerMinio.java index ca20255e7977..221eac1a01f2 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerMinio.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestFileSystemSpoolingManagerMinio.java @@ -63,6 +63,6 @@ protected SpoolingManager getSpoolingManager() .setAwsAccessKey(Minio.MINIO_ACCESS_KEY) .setAwsSecretKey(Minio.MINIO_SECRET_KEY) .setStreamingPartSize(DataSize.valueOf("5.5MB")); - return new FileSystemSpoolingManager(spoolingConfig, new S3FileSystemFactory(noop(), filesystemConfig, new S3FileSystemStats()), new SimpleFileSystemLayout()); + return new FileSystemSpoolingManager(spoolingConfig, new S3FileSystemFactory(noop(), filesystemConfig, new S3FileSystemStats()), new SimpleFileSystemLayout(), new TestingNode("nodeId")); } } diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestPartitionedFileSystemLayout.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestPartitionedFileSystemLayout.java index 610ccbf16632..f6ad879da297 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestPartitionedFileSystemLayout.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestPartitionedFileSystemLayout.java @@ -32,7 +32,7 @@ class TestPartitionedFileSystemLayout @Test public void testStorageLocation() { - FileSystemSpooledSegmentHandle handle = new FileSystemSpooledSegmentHandle("json", ULID.generateBinary(213700331, STATIC_ENTROPY), Optional.empty()); + FileSystemSpooledSegmentHandle handle = new FileSystemSpooledSegmentHandle("json", ULID.generateBinary(213700331, STATIC_ENTROPY), "nodeId", Optional.empty()); assertThat(handle.identifier()).isEqualTo("00006BSKQBDSQQ8RBJC5Q68VVD"); @@ -40,9 +40,9 @@ public void testStorageLocation() assertThat(segmentLocation).isEqualTo(ROOT_LOCATION .appendPath("200-spooled") - .appendPath("00006BSKQBDSQQ8RBJC5Q68VVD.json")); + .appendPath("00006BSKQBDSQQ8RBJC5Q68VVDA3C995C2DAF8B753.json")); - assertThat(segmentLocation.fileName()).isEqualTo("00006BSKQBDSQQ8RBJC5Q68VVD.json"); + assertThat(segmentLocation.fileName()).isEqualTo("00006BSKQBDSQQ8RBJC5Q68VVDA3C995C2DAF8B753.json"); assertThat(LAYOUT.getExpiration(segmentLocation)).hasValue(Instant.ofEpochMilli(213700331)); } diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestSimpleFileSystemLayout.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestSimpleFileSystemLayout.java index b370029138f6..16d9b9ca6209 100644 --- a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestSimpleFileSystemLayout.java +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestSimpleFileSystemLayout.java @@ -32,14 +32,14 @@ class TestSimpleFileSystemLayout @Test public void testStorageLocation() { - FileSystemSpooledSegmentHandle handle = new FileSystemSpooledSegmentHandle("json", ULID.generateBinary(21370000, STATIC_ENTROPY), Optional.empty()); + FileSystemSpooledSegmentHandle handle = new FileSystemSpooledSegmentHandle("json", ULID.generateBinary(21370000, STATIC_ENTROPY), "nodeId", Optional.empty()); assertThat(handle.identifier()).isEqualTo("00000MC54GDSQQ8RBJC5Q68VVD"); Location segmentLocation = LAYOUT.location(ROOT_LOCATION, handle); - assertThat(segmentLocation).isEqualTo(ROOT_LOCATION.appendPath("00000MC54GDSQQ8RBJC5Q68VVD.json")); - assertThat(segmentLocation.fileName()).isEqualTo("00000MC54GDSQQ8RBJC5Q68VVD.json"); + assertThat(segmentLocation).isEqualTo(ROOT_LOCATION.appendPath("00000MC54GDSQQ8RBJC5Q68VVDA3C995C2DAF8B753.json")); + assertThat(segmentLocation.fileName()).isEqualTo("00000MC54GDSQQ8RBJC5Q68VVDA3C995C2DAF8B753.json"); assertThat(LAYOUT.getExpiration(segmentLocation)).hasValue(Instant.ofEpochMilli(21370000)); } diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestingNode.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestingNode.java new file mode 100644 index 000000000000..3017c4087d76 --- /dev/null +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestingNode.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spooling.filesystem; + +import io.trino.spi.HostAddress; +import io.trino.spi.Node; + +import static java.util.Objects.requireNonNull; + +class TestingNode + implements Node +{ + private final String nodeIdentifier; + + public TestingNode(String nodeIdentifier) + { + this.nodeIdentifier = requireNonNull(nodeIdentifier, "nodeIdentifier is null"); + } + + @Override + public String getHost() + { + return "localhost"; + } + + @Override + public HostAddress getHostAndPort() + { + return HostAddress.fromParts("localhost", 8080); + } + + @Override + public String getNodeIdentifier() + { + return nodeIdentifier; + } + + @Override + public String getVersion() + { + return "1.0.0"; + } + + @Override + public boolean isCoordinator() + { + return false; + } +} From 42a58531279e9fa6b170c161d22d6d1d345a0c2f Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Tue, 12 Aug 2025 12:32:42 +0200 Subject: [PATCH 2/3] Use monotonic ULID generator --- .../FileSystemSpooledSegmentHandle.java | 31 ++++----- .../filesystem/FileSystemSpoolingManager.java | 19 +++--- .../spooling/filesystem/TimeToLiveClock.java | 63 +++++++++++++++++++ .../filesystem/TestTimeToLiveClock.java | 61 ++++++++++++++++++ 4 files changed, 149 insertions(+), 25 deletions(-) create mode 100644 plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/TimeToLiveClock.java create mode 100644 plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestTimeToLiveClock.java diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java index 4fb8ea9bada5..050b8dd2a43d 100644 --- a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpooledSegmentHandle.java @@ -13,6 +13,7 @@ */ package io.trino.spooling.filesystem; +import com.google.common.annotations.VisibleForTesting; import io.azam.ulidj.ULID; import io.trino.filesystem.encryption.EncryptionKey; import io.trino.spi.spool.SpooledSegmentHandle; @@ -40,20 +41,6 @@ public record FileSystemSpooledSegmentHandle( verify(uuid.length == 16, "uuid must be 128 bits"); } - public static FileSystemSpooledSegmentHandle random(Random random, String nodeIdentifier, SpoolingContext context, Instant expireAt) - { - return random(random, nodeIdentifier, context, expireAt, Optional.empty()); - } - - public static FileSystemSpooledSegmentHandle random(Random random, String nodeIdentifier, SpoolingContext context, Instant expireAt, Optional encryptionKey) - { - return new FileSystemSpooledSegmentHandle( - context.encoding(), - ULID.generateBinary(expireAt.toEpochMilli(), entropy(random)), - nodeIdentifier, - encryptionKey); - } - @Override public Instant expirationTime() { @@ -75,6 +62,22 @@ public String identifier() return ULID.fromBinary(uuid); } + @VisibleForTesting + static FileSystemSpooledSegmentHandle random(Random random, String nodeIdentifier, SpoolingContext context, Instant expireAt) + { + return random(random, nodeIdentifier, context, expireAt, Optional.empty()); + } + + @VisibleForTesting + static FileSystemSpooledSegmentHandle random(Random random, String nodeIdentifier, SpoolingContext context, Instant expireAt, Optional encryptionKey) + { + return new FileSystemSpooledSegmentHandle( + context.encoding(), + ULID.generateBinary(expireAt.toEpochMilli(), entropy(random)), + nodeIdentifier, + encryptionKey); + } + private static byte[] entropy(Random random) { byte[] entropy = new byte[10]; diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManager.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManager.java index b580c2450b4c..7e5b80566bea 100644 --- a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManager.java +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/FileSystemSpoolingManager.java @@ -20,6 +20,7 @@ import io.airlift.slice.SliceOutput; import io.airlift.slice.Slices; import io.airlift.units.Duration; +import io.azam.ulidj.MonotonicULID; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; @@ -44,7 +45,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Random; import static io.airlift.slice.SizeOf.SIZE_OF_BYTE; import static io.airlift.slice.SizeOf.SIZE_OF_SHORT; @@ -64,13 +64,11 @@ public class FileSystemSpoolingManager private final TrinoFileSystem fileSystem; private final FileSystemLayout fileSystemLayout; private final String nodeIdentifier; - private final Duration ttl; + private final MonotonicULID uuidGenerator; private final Duration directAccessTtl; private final boolean encryptionEnabled; private final boolean explicitAckEnabled; - private final Random random; - @Inject public FileSystemSpoolingManager(FileSystemSpoolingConfig config, TrinoFileSystemFactory fileSystemFactory, FileSystemLayout fileSystemLayout, Node currentNode) { @@ -81,11 +79,10 @@ public FileSystemSpoolingManager(FileSystemSpoolingConfig config, TrinoFileSyste this.fileSystemLayout = requireNonNull(fileSystemLayout, "fileSystemLayout is null"); this.nodeIdentifier = requireNonNull(currentNode, "currentNode is null").getNodeIdentifier(); this.encryptionHeadersTranslator = encryptionHeadersTranslator(location); - this.ttl = config.getTtl(); + this.uuidGenerator = new MonotonicULID(new TimeToLiveClock(config.getTtl()), new SecureRandom(nodeIdentifier.getBytes(UTF_8))); this.directAccessTtl = config.getDirectAccessTtl(); this.encryptionEnabled = config.isEncryptionEnabled(); this.explicitAckEnabled = config.isExplicitAckEnabled(); - this.random = new SecureRandom(nodeIdentifier.getBytes(UTF_8)); } @Override @@ -110,11 +107,11 @@ public OutputStream createOutputStream(SpooledSegmentHandle handle) @Override public FileSystemSpooledSegmentHandle create(SpoolingContext context) { - Instant expireAt = Instant.now().plusMillis(ttl.toMillis()); - if (encryptionEnabled) { - return FileSystemSpooledSegmentHandle.random(random, nodeIdentifier, context, expireAt, Optional.of(randomAes256())); - } - return FileSystemSpooledSegmentHandle.random(random, nodeIdentifier, context, expireAt); + return new FileSystemSpooledSegmentHandle( + context.encoding(), + uuidGenerator.generateBinary(), + nodeIdentifier, + encryptionEnabled ? Optional.of(randomAes256()) : Optional.empty()); } @Override diff --git a/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/TimeToLiveClock.java b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/TimeToLiveClock.java new file mode 100644 index 000000000000..0712b229a2fe --- /dev/null +++ b/plugin/trino-spooling-filesystem/src/main/java/io/trino/spooling/filesystem/TimeToLiveClock.java @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spooling.filesystem; + +import com.google.common.annotations.VisibleForTesting; +import io.airlift.units.Duration; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; + +import static com.google.common.base.Verify.verify; +import static java.time.ZoneId.systemDefault; +import static java.util.Objects.requireNonNull; + +public class TimeToLiveClock + extends Clock +{ + private final long ttlMillis; + private final Clock delegate; + + public TimeToLiveClock(Duration ttl) + { + this(ttl.toMillis(), tickMillis(systemDefault())); + } + + @VisibleForTesting + TimeToLiveClock(long ttlMillis, Clock delegate) + { + verify(ttlMillis > 0, "ttlMillis must be positive"); + this.ttlMillis = ttlMillis; + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public ZoneId getZone() + { + return delegate.getZone(); + } + + @Override + public Clock withZone(ZoneId zone) + { + return new TimeToLiveClock(ttlMillis, delegate.withZone(zone)); + } + + @Override + public Instant instant() + { + return delegate.instant().plusMillis(ttlMillis); + } +} diff --git a/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestTimeToLiveClock.java b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestTimeToLiveClock.java new file mode 100644 index 000000000000..7ffd693c16c1 --- /dev/null +++ b/plugin/trino-spooling-filesystem/src/test/java/io/trino/spooling/filesystem/TestTimeToLiveClock.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.spooling.filesystem; + +import org.junit.jupiter.api.Test; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; + +import static java.time.ZoneOffset.MAX; +import static java.time.ZoneOffset.MIN; +import static java.time.ZoneOffset.UTC; +import static java.time.temporal.ChronoUnit.HOURS; +import static org.assertj.core.api.Assertions.assertThat; + +class TestTimeToLiveClock +{ + @Test + void testTimeToLiveClock() + { + testTimeToLiveClock(MIN); + testTimeToLiveClock(MAX); + testTimeToLiveClock(UTC); + testTimeToLiveClock(ZoneId.of("Europe/Warsaw")); + testTimeToLiveClock(ZoneId.of("America/New_York")); + testTimeToLiveClock(ZoneId.of("Asia/Tokyo")); + } + + void testTimeToLiveClock(ZoneId zoneId) + { + Duration ttl = Duration.of(1, HOURS); + Instant now = Instant.now(); + + Clock fixedClock = Clock.fixed(now, zoneId); + TimeToLiveClock ttlClock = new TimeToLiveClock(ttl.toMillis(), fixedClock); + + assertThat(ttlClock.getZone()) + .isEqualTo(zoneId); + + assertThat(ttlClock.instant()) + .isEqualTo(fixedClock.instant().plus(ttl)) + .isEqualTo(now.plus(ttl)); + + assertThat(ttlClock.withZone(zoneId).instant()) + .isEqualTo(fixedClock.withZone(zoneId).instant().plus(ttl)) + .isEqualTo(now.plus(ttl)); + } +} From bb7a5dbf8db8206ada5ce95edd9e4c397673504b Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Tue, 12 Aug 2025 12:39:57 +0200 Subject: [PATCH 3/3] Always spool data while running tests --- .../src/main/java/io/trino/testing/DistributedQueryRunner.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java b/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java index ef46859e8438..4f15657209cd 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/DistributedQueryRunner.java @@ -938,6 +938,8 @@ public DistributedQueryRunner build() // create smaller number of segments addExtraProperty("protocol.spooling.initial-segment-size", "16MB"); addExtraProperty("protocol.spooling.max-segment-size", "32MB"); + // Disable inlining to test spooling + addExtraProperty("protocol.spooling.inlining.enabled", "false"); addExtraProperty("protocol.spooling.shared-secret-key", randomAESKey()); // LocalSpoolingManager doesn't support direct storage access addExtraProperty("protocol.spooling.retrieval-mode", "coordinator_proxy");