Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import io.airlift.log.Logger;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.node.InternalNode;
import io.trino.server.ServerConfig;
import io.trino.spi.Node;
import io.trino.spi.classloader.ThreadContextClassLoader;
import io.trino.spi.spool.SpoolingManager;
import io.trino.spi.spool.SpoolingManagerContext;
Expand Down Expand Up @@ -54,11 +56,13 @@ public class SpoolingManagerRegistry
private final boolean coordinator;
private final OpenTelemetry openTelemetry;
private final Tracer tracer;
private final Node currentNode;
private volatile SpoolingManager spoolingManager;

@Inject
public SpoolingManagerRegistry(ServerConfig serverConfig, SpoolingEnabledConfig config, OpenTelemetry openTelemetry, Tracer tracer)
public SpoolingManagerRegistry(InternalNode currentNode, ServerConfig serverConfig, SpoolingEnabledConfig config, OpenTelemetry openTelemetry, Tracer tracer)
{
this.currentNode = requireNonNull(currentNode, "currentNode is null");
this.enabled = config.isEnabled();
this.coordinator = serverConfig.isCoordinator();
this.openTelemetry = requireNonNull(openTelemetry, "openTelemetry is null");
Expand Down Expand Up @@ -126,6 +130,12 @@ public boolean isCoordinator()
{
return coordinator;
}

@Override
public Node getCurrentNode()
{
return currentNode;
}
};

SpoolingManager spoolingManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import io.trino.metadata.TablePropertyManager;
import io.trino.metadata.TypeRegistry;
import io.trino.metadata.ViewPropertyManager;
import io.trino.node.InternalNode;
import io.trino.node.InternalNodeManager;
import io.trino.node.TestingInternalNodeManager;
import io.trino.operator.Driver;
Expand Down Expand Up @@ -213,6 +214,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -473,7 +475,12 @@ private PlanTester(Session defaultSession, int nodeCountForStats)
ImmutableSet.of(new ExcludeColumnsFunction()));

exchangeManagerRegistry = new ExchangeManagerRegistry(noop(), noopTracer(), secretsResolver);
spoolingManagerRegistry = new SpoolingManagerRegistry(new ServerConfig(), new SpoolingEnabledConfig(), noop(), noopTracer());
spoolingManagerRegistry = new SpoolingManagerRegistry(
new InternalNode("nodeId", URI.create("http://localhost:8080"), NodeVersion.UNKNOWN, false),
new ServerConfig(),
new SpoolingEnabledConfig(),
noop(),
noopTracer());
this.pluginManager = new PluginManager(
(loader, createClassLoader) -> {},
Optional.empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.spi.Node;

public interface SpoolingManagerContext
{
Expand All @@ -32,4 +33,9 @@ default boolean isCoordinator()
{
throw new UnsupportedOperationException();
}

default Node getCurrentNode()
{
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.spooling.filesystem;

import com.google.common.annotations.VisibleForTesting;
import io.azam.ulidj.ULID;
import io.trino.filesystem.encryption.EncryptionKey;
import io.trino.spi.spool.SpooledSegmentHandle;
Expand All @@ -29,28 +30,17 @@
public record FileSystemSpooledSegmentHandle(
@Override String encoding,
byte[] uuid,
String nodeIdentifier,
Optional<EncryptionKey> encryptionKey)
implements SpooledSegmentHandle
{
public FileSystemSpooledSegmentHandle
{
requireNonNull(encryptionKey, "encryptionKey is null");
requireNonNull(nodeIdentifier, "nodeIdentifier is null");
verify(uuid.length == 16, "uuid must be 128 bits");
}

public static FileSystemSpooledSegmentHandle random(Random random, SpoolingContext context, Instant expireAt)
{
return random(random, context, expireAt, Optional.empty());
}

public static FileSystemSpooledSegmentHandle random(Random random, SpoolingContext context, Instant expireAt, Optional<EncryptionKey> encryptionKey)
{
return new FileSystemSpooledSegmentHandle(
context.encoding(),
ULID.generateBinary(expireAt.toEpochMilli(), entropy(random)),
encryptionKey);
}

@Override
public Instant expirationTime()
{
Expand All @@ -72,6 +62,22 @@ public String identifier()
return ULID.fromBinary(uuid);
}

@VisibleForTesting
static FileSystemSpooledSegmentHandle random(Random random, String nodeIdentifier, SpoolingContext context, Instant expireAt)
{
return random(random, nodeIdentifier, context, expireAt, Optional.empty());
}

@VisibleForTesting
static FileSystemSpooledSegmentHandle random(Random random, String nodeIdentifier, SpoolingContext context, Instant expireAt, Optional<EncryptionKey> encryptionKey)
{
return new FileSystemSpooledSegmentHandle(
context.encoding(),
ULID.generateBinary(expireAt.toEpochMilli(), entropy(random)),
nodeIdentifier,
encryptionKey);
}

private static byte[] entropy(Random random)
{
byte[] entropy = new byte[10];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import io.airlift.slice.SliceOutput;
import io.airlift.slice.Slices;
import io.airlift.units.Duration;
import io.azam.ulidj.MonotonicULID;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.filesystem.encryption.EncryptionKey;
import io.trino.spi.Node;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.spool.SpooledLocation;
import io.trino.spi.spool.SpooledLocation.DirectLocation;
Expand All @@ -38,12 +40,11 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.SecureRandom;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

import static io.airlift.slice.SizeOf.SIZE_OF_BYTE;
import static io.airlift.slice.SizeOf.SIZE_OF_SHORT;
Expand All @@ -62,22 +63,23 @@ public class FileSystemSpoolingManager
private final EncryptionHeadersTranslator encryptionHeadersTranslator;
private final TrinoFileSystem fileSystem;
private final FileSystemLayout fileSystemLayout;
private final Duration ttl;
private final String nodeIdentifier;
private final MonotonicULID uuidGenerator;
private final Duration directAccessTtl;
private final boolean encryptionEnabled;
private final boolean explicitAckEnabled;
private final Random random = ThreadLocalRandom.current();

@Inject
public FileSystemSpoolingManager(FileSystemSpoolingConfig config, TrinoFileSystemFactory fileSystemFactory, FileSystemLayout fileSystemLayout)
public FileSystemSpoolingManager(FileSystemSpoolingConfig config, TrinoFileSystemFactory fileSystemFactory, FileSystemLayout fileSystemLayout, Node currentNode)
{
requireNonNull(config, "config is null");
this.location = Location.of(config.getLocation());
this.fileSystem = requireNonNull(fileSystemFactory, "fileSystemFactory is null")
.create(ConnectorIdentity.ofUser("ignored"));
this.fileSystemLayout = requireNonNull(fileSystemLayout, "fileSystemLayout is null");
this.nodeIdentifier = requireNonNull(currentNode, "currentNode is null").getNodeIdentifier();
this.encryptionHeadersTranslator = encryptionHeadersTranslator(location);
this.ttl = config.getTtl();
this.uuidGenerator = new MonotonicULID(new TimeToLiveClock(config.getTtl()), new SecureRandom(nodeIdentifier.getBytes(UTF_8)));
this.directAccessTtl = config.getDirectAccessTtl();
this.encryptionEnabled = config.isEncryptionEnabled();
this.explicitAckEnabled = config.isExplicitAckEnabled();
Expand Down Expand Up @@ -105,11 +107,11 @@ public OutputStream createOutputStream(SpooledSegmentHandle handle)
@Override
public FileSystemSpooledSegmentHandle create(SpoolingContext context)
{
Instant expireAt = Instant.now().plusMillis(ttl.toMillis());
if (encryptionEnabled) {
return FileSystemSpooledSegmentHandle.random(random, context, expireAt, Optional.of(randomAes256()));
}
return FileSystemSpooledSegmentHandle.random(random, context, expireAt);
return new FileSystemSpooledSegmentHandle(
context.encoding(),
uuidGenerator.generateBinary(),
nodeIdentifier,
encryptionEnabled ? Optional.of(randomAes256()) : Optional.empty());
}

@Override
Expand Down Expand Up @@ -184,15 +186,19 @@ private static Slice serialize(FileSystemSpooledSegmentHandle fileHandle)
// ulid: byte[16]
// encodingLength: short
// encoding: byte[encodingLength]
// nodeIdentifierLength: short
// nodeIdentifier: byte[nodeIdentifierLength]
// isEncrypted: boolean

byte[] encoding = fileHandle.encoding().getBytes(UTF_8);
Slice slice = Slices.allocate(16 + SIZE_OF_SHORT + encoding.length + SIZE_OF_BYTE);
byte[] nodeIdentifier = fileHandle.nodeIdentifier().getBytes(UTF_8);
Slice slice = Slices.allocate(16 + 2 * SIZE_OF_SHORT + encoding.length + nodeIdentifier.length + SIZE_OF_BYTE);

SliceOutput output = slice.getOutput();
output.writeBytes(fileHandle.uuid());
output.writeShort(fileHandle.encoding().length());
output.writeShort(encoding.length);
output.writeBytes(encoding);
output.writeShort(nodeIdentifier.length);
output.writeBytes(nodeIdentifier);
output.writeBoolean(fileHandle.encryptionKey().isPresent());
return output.slice();
}
Expand All @@ -213,10 +219,13 @@ public SpooledSegmentHandle handle(Slice identifier, Map<String, List<String>> h
short encodingLength = input.readShort();

String encoding = input.readSlice(encodingLength).toStringUtf8();
short nodeIdentifierLength = input.readShort();
String nodeIdentifier = input.readSlice(nodeIdentifierLength).toStringUtf8();

if (!input.readBoolean()) {
return new FileSystemSpooledSegmentHandle(encoding, uuid, Optional.empty());
return new FileSystemSpooledSegmentHandle(encoding, uuid, nodeIdentifier, Optional.empty());
}
return new FileSystemSpooledSegmentHandle(encoding, uuid, Optional.of(encryptionHeadersTranslator.extractKey(headers)));
return new FileSystemSpooledSegmentHandle(encoding, uuid, nodeIdentifier, Optional.of(encryptionHeadersTranslator.extractKey(headers)));
}

private Duration remainingTtl(Instant expiresAt, Duration accessTtl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airlift.bootstrap.Bootstrap;
import io.opentelemetry.api.OpenTelemetry;
import io.trino.plugin.base.jmx.MBeanServerModule;
import io.trino.spi.Node;
import io.trino.spi.spool.SpoolingManager;
import io.trino.spi.spool.SpoolingManagerContext;
import io.trino.spi.spool.SpoolingManagerFactory;
Expand Down Expand Up @@ -46,6 +47,7 @@ public SpoolingManager create(Map<String, String> config, SpoolingManagerContext
binder -> {
binder.bind(SpoolingManagerContext.class).toInstance(context);
binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry());
binder.bind(Node.class).toInstance(context.getCurrentNode());
});

Injector injector = app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,28 @@
*/
package io.trino.spooling.filesystem;

import com.google.common.hash.HashFunction;
import io.azam.ulidj.ULID;
import io.trino.filesystem.Location;

import java.time.Instant;
import java.util.List;
import java.util.Optional;

import static com.google.common.hash.Hashing.farmHashFingerprint64;
import static io.azam.ulidj.ULID.ULID_LENGTH;
import static java.util.Locale.ENGLISH;

public class SimpleFileSystemLayout
implements FileSystemLayout
{
// Node identifier is hashed to avoid both long file names and leaking information about the node
private static final HashFunction HASH = farmHashFingerprint64();

@Override
public Location location(Location rootLocation, FileSystemSpooledSegmentHandle segmentHandle)
{
return rootLocation.appendPath(segmentHandle.identifier() + "." + segmentHandle.encoding());
return rootLocation.appendPath(segmentHandle.identifier() + hashNodeIdentifier(segmentHandle.nodeIdentifier()) + '.' + segmentHandle.encoding());
}

@Override
Expand All @@ -39,15 +47,19 @@ public List<Location> searchPaths(Location rootLocation)
public Optional<Instant> getExpiration(Location location)
{
String filename = location.fileName();
int index = filename.indexOf(".");
if (index == -1) {
return Optional.empty(); // Not a segment
if (filename.length() < ULID_LENGTH) {
return Optional.empty(); // Definitely not a segment
}

String uuid = filename.substring(0, index);
String uuid = filename.substring(0, ULID_LENGTH);
if (!ULID.isValid(uuid)) {
return Optional.empty();
}
return Optional.of(Instant.ofEpochMilli(ULID.getTimestamp(uuid)));
}

private static String hashNodeIdentifier(String nodeIdentifier)
{
return HASH.hashUnencodedChars(nodeIdentifier).toString().toUpperCase(ENGLISH);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.spooling.filesystem;

import com.google.common.annotations.VisibleForTesting;
import io.airlift.units.Duration;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;

import static com.google.common.base.Verify.verify;
import static java.time.ZoneId.systemDefault;
import static java.util.Objects.requireNonNull;

public class TimeToLiveClock
extends Clock
{
private final long ttlMillis;
private final Clock delegate;

public TimeToLiveClock(Duration ttl)
{
this(ttl.toMillis(), tickMillis(systemDefault()));
}

@VisibleForTesting
TimeToLiveClock(long ttlMillis, Clock delegate)
{
verify(ttlMillis > 0, "ttlMillis must be positive");
this.ttlMillis = ttlMillis;
this.delegate = requireNonNull(delegate, "delegate is null");
}

@Override
public ZoneId getZone()
{
return delegate.getZone();
}

@Override
public Clock withZone(ZoneId zone)
{
return new TimeToLiveClock(ttlMillis, delegate.withZone(zone));
}

@Override
public Instant instant()
{
return delegate.instant().plusMillis(ttlMillis);
}
}
Loading
Loading