diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java
index 302099906b45..3bed2223113f 100644
--- a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java
+++ b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java
@@ -697,6 +697,18 @@ public ListenableFuture<Void> setBytes(long bytes)
             return blocked;
         }
 
+        @Override
+        public ListenableFuture<Void> addBytes(long delta)
+        {
+            if (delta == 0) {
+                return NOT_BLOCKED;
+            }
+            ListenableFuture<Void> blocked = delegate.addBytes(delta);
+            updateMemoryFuture(blocked, memoryFuture);
+            allocationListener.run();
+            return blocked;
+        }
+
         @Override
         public boolean trySetBytes(long bytes)
         {
diff --git a/core/trino-main/src/test/java/io/trino/execution/buffer/TestSpoolingExchangeOutputBuffer.java b/core/trino-main/src/test/java/io/trino/execution/buffer/TestSpoolingExchangeOutputBuffer.java
index 2e00ff8a0f4e..250535e07f40 100644
--- a/core/trino-main/src/test/java/io/trino/execution/buffer/TestSpoolingExchangeOutputBuffer.java
+++ b/core/trino-main/src/test/java/io/trino/execution/buffer/TestSpoolingExchangeOutputBuffer.java
@@ -415,6 +415,12 @@ public ListenableFuture<Void> setBytes(long bytes)
             return immediateVoidFuture();
         }
 
+        @Override
+        public ListenableFuture<Void> addBytes(long delta)
+        {
+            return immediateVoidFuture();
+        }
+
         @Override
         public boolean trySetBytes(long bytes)
         {
diff --git a/lib/trino-filesystem-azure/pom.xml b/lib/trino-filesystem-azure/pom.xml
index c90a1742cd35..550f099da751 100644
--- a/lib/trino-filesystem-azure/pom.xml
+++ b/lib/trino-filesystem-azure/pom.xml
@@ -74,6 +74,11 @@
             <classifier>classes</classifier>
         </dependency>
 
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>concurrent</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>io.airlift</groupId>
             <artifactId>configuration</artifactId>
diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java
index 50c3cc3f8886..3ff1e8fb7748 100644
--- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java
+++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystem.java
@@ -60,6 +60,7 @@
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 import static com.azure.storage.common.implementation.Constants.HeaderConstants.ETAG_WILDCARD;
 import static com.google.common.base.Preconditions.checkArgument;
@@ -80,6 +81,7 @@ public class AzureFileSystem
         implements TrinoFileSystem
 {
     private final HttpClient httpClient;
+    private final ExecutorService uploadExecutor;
     private final TracingOptions tracingOptions;
     private final AzureAuth azureAuth;
     private final String endpoint;
@@ -87,18 +89,22 @@ public class AzureFileSystem
     private final long writeBlockSizeBytes;
     private final int maxWriteConcurrency;
     private final long maxSingleUploadSizeBytes;
+    private final boolean multipartWriteEnabled;
 
     public AzureFileSystem(
             HttpClient httpClient,
+            ExecutorService uploadExecutor,
             TracingOptions tracingOptions,
             AzureAuth azureAuth,
             String endpoint,
            DataSize readBlockSize,
            DataSize writeBlockSize,
            int maxWriteConcurrency,
-            DataSize maxSingleUploadSize)
+            DataSize maxSingleUploadSize,
+            boolean multipartWriteEnabled)
     {
         this.httpClient = requireNonNull(httpClient, "httpClient is null");
+        this.uploadExecutor = requireNonNull(uploadExecutor, "uploadExecutor is null");
         this.tracingOptions = requireNonNull(tracingOptions, "tracingOptions is null");
         this.azureAuth = requireNonNull(azureAuth, "azureAuth is null");
null"); this.endpoint = requireNonNull(endpoint, "endpoint is null"); @@ -107,6 +113,7 @@ public AzureFileSystem( checkArgument(maxWriteConcurrency >= 0, "maxWriteConcurrency is negative"); this.maxWriteConcurrency = maxWriteConcurrency; this.maxSingleUploadSizeBytes = maxSingleUploadSize.toBytes(); + this.multipartWriteEnabled = multipartWriteEnabled; } @Override @@ -162,7 +169,7 @@ public TrinoOutputFile newOutputFile(Location location) { AzureLocation azureLocation = new AzureLocation(location); BlobClient client = createBlobClient(azureLocation, Optional.empty()); - return new AzureOutputFile(azureLocation, client, writeBlockSizeBytes, maxWriteConcurrency, maxSingleUploadSizeBytes); + return new AzureOutputFile(azureLocation, client, uploadExecutor, writeBlockSizeBytes, maxWriteConcurrency, maxSingleUploadSizeBytes, multipartWriteEnabled); } @Override @@ -170,7 +177,7 @@ public TrinoOutputFile newEncryptedOutputFile(Location location, EncryptionKey k { AzureLocation azureLocation = new AzureLocation(location); BlobClient client = createBlobClient(azureLocation, Optional.of(key)); - return new AzureOutputFile(azureLocation, client, writeBlockSizeBytes, maxWriteConcurrency, maxSingleUploadSizeBytes); + return new AzureOutputFile(azureLocation, client, uploadExecutor, writeBlockSizeBytes, maxWriteConcurrency, maxSingleUploadSizeBytes, multipartWriteEnabled); } @Override diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemConfig.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemConfig.java index 95385d0005a8..2a132538a0ec 100644 --- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemConfig.java +++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemConfig.java @@ -39,6 +39,7 @@ public enum AuthType private DataSize maxSingleUploadSize = DataSize.of(4, Unit.MEGABYTE); private Integer maxHttpRequests = 2 * Runtime.getRuntime().availableProcessors(); private String applicationId = "Trino"; + private boolean multipartWriteEnabled; @NotNull public AuthType getAuthType() @@ -145,4 +146,17 @@ public AzureFileSystemConfig setApplicationId(String applicationId) this.applicationId = applicationId; return this; } + + @Config("azure.multipart-write-enabled") + @ConfigDescription("Enable multipart writes for large files") + public AzureFileSystemConfig setMultipartWriteEnabled(boolean multipartWriteEnabled) + { + this.multipartWriteEnabled = multipartWriteEnabled; + return this; + } + + public boolean isMultipartWriteEnabled() + { + return multipartWriteEnabled; + } } diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java index 34f138d6e531..cbd0e3b5b30b 100644 --- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java +++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureFileSystemFactory.java @@ -31,13 +31,18 @@ import reactor.netty.resources.ConnectionProvider; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.newCachedThreadPool; public class AzureFileSystemFactory implements 
TrinoFileSystemFactory { + private final ExecutorService uploadExecutor = newCachedThreadPool(daemonThreadsNamed("azure-upload-%s")); + private final AzureAuth auth; private final String endpoint; private final DataSize readBlockSize; @@ -48,6 +53,7 @@ public class AzureFileSystemFactory private final HttpClient httpClient; private final ConnectionProvider connectionProvider; private final EventLoopGroup eventLoopGroup; + private final boolean multipart; @Inject public AzureFileSystemFactory(OpenTelemetry openTelemetry, AzureAuth azureAuth, AzureFileSystemConfig config) @@ -60,7 +66,8 @@ public AzureFileSystemFactory(OpenTelemetry openTelemetry, AzureAuth azureAuth, config.getMaxWriteConcurrency(), config.getMaxSingleUploadSize(), config.getMaxHttpRequests(), - config.getApplicationId()); + config.getApplicationId(), + config.isMultipartWriteEnabled()); } public AzureFileSystemFactory( @@ -72,7 +79,8 @@ public AzureFileSystemFactory( int maxWriteConcurrency, DataSize maxSingleUploadSize, int maxHttpRequests, - String applicationId) + String applicationId, + boolean multipart) { this.auth = requireNonNull(azureAuth, "azureAuth is null"); this.endpoint = requireNonNull(endpoint, "endpoint is null"); @@ -88,11 +96,14 @@ public AzureFileSystemFactory( clientOptions.setTracingOptions(tracingOptions); clientOptions.setApplicationId(applicationId); httpClient = createAzureHttpClient(connectionProvider, eventLoopGroup, clientOptions); + this.multipart = multipart; } @PreDestroy public void destroy() { + uploadExecutor.shutdown(); + if (connectionProvider != null) { connectionProvider.dispose(); } @@ -113,7 +124,7 @@ public void destroy() @Override public TrinoFileSystem create(ConnectorIdentity identity) { - return new AzureFileSystem(httpClient, tracingOptions, auth, endpoint, readBlockSize, writeBlockSize, maxWriteConcurrency, maxSingleUploadSize); + return new AzureFileSystem(httpClient, uploadExecutor, tracingOptions, auth, endpoint, readBlockSize, writeBlockSize, maxWriteConcurrency, maxSingleUploadSize, multipart); } public static HttpClient createAzureHttpClient(ConnectionProvider connectionProvider, EventLoopGroup eventLoopGroup, HttpClientOptions clientOptions) diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureMultipartOutputStream.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureMultipartOutputStream.java new file mode 100644 index 000000000000..ae43f82d961b --- /dev/null +++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureMultipartOutputStream.java @@ -0,0 +1,252 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.trino.filesystem.azure;
+
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.specialized.BlobOutputStream;
+import com.azure.storage.blob.specialized.BlockBlobClient;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.trino.memory.context.AggregatedMemoryContext;
+import io.trino.memory.context.LocalMemoryContext;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.util.concurrent.Futures.submit;
+import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
+import static io.trino.filesystem.azure.AzureUtils.handleAzureException;
+import static java.lang.Math.clamp;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.lang.System.arraycopy;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+
+public class AzureMultipartOutputStream
+        extends OutputStream
+{
+    private final AzureLocation location;
+    private final BlockBlobClient blockClient;
+    private final LocalMemoryContext memoryContext;
+    private final Executor uploadExecutor;
+
+    private final int writeBlockSizeBytes;
+    private final boolean overwrite;
+    private boolean closed;
+    private boolean multiStagingStarted;
+    private final List<String> stagedBlocks = new ArrayList<>();
+    private final List<ListenableFuture<String>> stagedBlockFutures = new ArrayList<>();
+    private int blockNum;
+
+    private byte[] buffer = new byte[0];
+    private int bufferSize;
+    private int initialBufferSize = 64;
+
+    public AzureMultipartOutputStream(
+            AzureLocation location,
+            BlobClient blobClient,
+            ExecutorService uploadExecutor,
+            boolean overwrite,
+            AggregatedMemoryContext memoryContext,
+            int writeBlockSizeBytes)
+    {
+        requireNonNull(location, "location is null");
+        requireNonNull(blobClient, "blobClient is null");
+        checkArgument(writeBlockSizeBytes >= 0, "writeBlockSizeBytes is negative");
+
+        this.location = location;
+        this.writeBlockSizeBytes = writeBlockSizeBytes;
+        this.overwrite = overwrite;
+        this.memoryContext = memoryContext.newLocalMemoryContext(AzureMultipartOutputStream.class.getSimpleName());
+        this.blockClient = blobClient.getBlockBlobClient();
+        this.uploadExecutor = listeningDecorator(requireNonNull(uploadExecutor, "uploadExecutor is null"));
+    }
+
+    @Override
+    public void write(int b)
+            throws IOException
+    {
+        ensureOpen();
+        ensureCapacity(1);
+        buffer[bufferSize] = (byte) b;
+        bufferSize++;
+        flushBuffer(false);
+    }
+
+    @Override
+    public void write(byte[] bytes, int offset, int length)
+            throws IOException
+    {
+        ensureOpen();
+
+        while (length > 0) {
+            ensureCapacity(length);
+
+            int copied = min(buffer.length - bufferSize, length);
+            arraycopy(bytes, offset, buffer, bufferSize, copied);
+            bufferSize += copied;
+
+            flushBuffer(false);
+
+            offset += copied;
+            length -= copied;
+        }
+    }
+
+    @Override
+    public void flush()
+            throws IOException
+    {
+        ensureOpen();
+        flushBuffer(false);
+    }
+
+    private void ensureOpen()
+            throws IOException
+    {
+        if (closed) {
+            throw new IOException("Output stream closed: " + location);
+        }
+    }
+
+    private void ensureCapacity(int extra)
+    {
+        int currentCapacity = buffer.length;
+        int capacity = min(writeBlockSizeBytes, bufferSize + extra);
+        if (buffer.length < capacity) {
+            int target = max(buffer.length, initialBufferSize);
+            if (target < capacity) {
+                target += target / 2; // increase 50%
+                target = clamp(target, capacity, writeBlockSizeBytes);
+            }
+            buffer = Arrays.copyOf(buffer, target);
+            memoryContext.addBytes(target - currentCapacity);
+        }
+    }
+
+    private void flushBuffer(boolean finished)
+            throws IOException
+    {
+        // skip multipart upload if there would only be one staged block
+        if (finished && !multiStagingStarted) {
+            BlobOutputStream blobOutputStream = blockClient
+                    .getBlobOutputStream(overwrite);
+
+            blobOutputStream.write(buffer, 0, bufferSize);
+            blobOutputStream.close();
+            memoryContext.addBytes(-buffer.length);
+            buffer = new byte[0];
+            return;
+        }
+
+        if (bufferSize != writeBlockSizeBytes && (!finished || bufferSize == 0)) {
+            // If the buffer isn't full yet and we are not finished, do not stage the block
+            return;
+        }
+
+        multiStagingStarted = true;
+
+        byte[] data = buffer;
+        int length = bufferSize;
+
+        if (finished) {
+            this.buffer = null;
+        }
+        else {
+            this.buffer = new byte[0];
+            this.initialBufferSize = writeBlockSizeBytes;
+            bufferSize = 0;
+        }
+        String nextBlockId = nextBlockId();
+        stagedBlockFutures.add(submit(stageBlock(nextBlockId, data, length), uploadExecutor));
+    }
+
+    private Callable<String> stageBlock(String blockId, byte[] data, int length)
+    {
+        return () -> {
+            blockClient.stageBlock(blockId, new ByteArrayInputStream(data, 0, length), length);
+            memoryContext.addBytes(-length);
+            return blockId;
+        };
+    }
+
+    private synchronized String nextBlockId()
+    {
+        String blockId = Base64.getEncoder().encodeToString(String.format("%06d", blockNum).getBytes(UTF_8));
+        blockNum++;
+        stagedBlocks.add(blockId);
+        return blockId;
+    }
+
+    private void waitForUploadsToFinish()
+            throws IOException
+    {
+        if (stagedBlockFutures.isEmpty()) {
+            return;
+        }
+
+        try {
+            Futures.allAsList(stagedBlockFutures).get();
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException();
+        }
+        catch (ExecutionException e) {
+            throw handleAzureException(e.getCause(), "upload", location);
+        }
+    }
+
+    private void commitBlocksIfNeeded()
+    {
+        if (stagedBlocks.isEmpty()) {
+            return;
+        }
+
+        blockClient.commitBlockList(stagedBlocks, overwrite);
+    }
+
+    @Override
+    public void close()
+            throws IOException
+    {
+        if (closed) {
+            return;
+        }
+        closed = true;
+
+        try {
+            flushBuffer(true);
+            waitForUploadsToFinish();
+            commitBlocksIfNeeded();
+            memoryContext.setBytes(0);
+            memoryContext.close();
+        }
+        catch (IOException | RuntimeException e) {
+            throw handleAzureException(e, "upload", location);
+        }
+    }
+}
diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureOutputFile.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureOutputFile.java
index 52f17228945c..02a469f6fd01 100644
--- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureOutputFile.java
+++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureOutputFile.java
@@ -25,8 +25,10 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.file.FileAlreadyExistsException;
+import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.Math.toIntExact;
 import static java.util.Objects.requireNonNull;
 
 class AzureOutputFile
@@ -34,21 +36,25 @@ class AzureOutputFile
 {
     private final AzureLocation location;
     private final BlobClient blobClient;
+    private final ExecutorService executorService;
     private final long writeBlockSizeBytes;
     private final int maxWriteConcurrency;
     private final long maxSingleUploadSizeBytes;
+    private final boolean multipartWriteEnabled;
 
-    public AzureOutputFile(AzureLocation location, BlobClient blobClient, long writeBlockSizeBytes, int maxWriteConcurrency, long maxSingleUploadSizeBytes)
+    public AzureOutputFile(AzureLocation location, BlobClient blobClient, ExecutorService executorService, long writeBlockSizeBytes, int maxWriteConcurrency, long maxSingleUploadSizeBytes, boolean multipartWriteEnabled)
     {
         this.location = requireNonNull(location, "location is null");
         location.location().verifyValidFileLocation();
         this.blobClient = requireNonNull(blobClient, "blobClient is null");
+        this.executorService = requireNonNull(executorService, "executorService is null");
         checkArgument(writeBlockSizeBytes >= 0, "writeBlockSizeBytes is negative");
         this.writeBlockSizeBytes = writeBlockSizeBytes;
         checkArgument(maxWriteConcurrency >= 0, "maxWriteConcurrency is negative");
         this.maxWriteConcurrency = maxWriteConcurrency;
         checkArgument(maxSingleUploadSizeBytes >= 0, "maxSingleUploadSizeBytes is negative");
         this.maxSingleUploadSizeBytes = maxSingleUploadSizeBytes;
+        this.multipartWriteEnabled = multipartWriteEnabled;
     }
 
     public boolean exists()
@@ -92,9 +98,12 @@ public void createExclusive(byte[] data)
         }
     }
 
-    private AzureOutputStream createOutputStream(AggregatedMemoryContext memoryContext, boolean overwrite)
+    private OutputStream createOutputStream(AggregatedMemoryContext memoryContext, boolean overwrite)
             throws IOException
     {
+        if (multipartWriteEnabled) {
+            return new AzureMultipartOutputStream(location, blobClient, executorService, overwrite, memoryContext, toIntExact(writeBlockSizeBytes));
+        }
         return new AzureOutputStream(location, blobClient, overwrite, memoryContext, writeBlockSizeBytes, maxWriteConcurrency, maxSingleUploadSizeBytes);
     }
 
diff --git a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureUtils.java b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureUtils.java
index fed6317bbbf9..873eb97afdba 100644
--- a/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureUtils.java
+++ b/lib/trino-filesystem-azure/src/main/java/io/trino/filesystem/azure/AzureUtils.java
@@ -31,7 +31,7 @@ final class AzureUtils
 {
     private AzureUtils() {}
 
-    public static IOException handleAzureException(RuntimeException exception, String action, AzureLocation location)
+    public static IOException handleAzureException(Throwable exception, String action, AzureLocation location)
             throws IOException
     {
         if (isFileNotFoundException(exception)) {
@@ -43,7 +43,7 @@ public static IOException handleAzureException(RuntimeException exception, Strin
         throw new IOException("Error %s file: %s".formatted(action, location), exception);
     }
 
-    public static boolean isFileNotFoundException(RuntimeException exception)
+    public static boolean isFileNotFoundException(Throwable exception)
     {
         if (exception instanceof BlobStorageException blobStorageException) {
             return BlobErrorCode.BLOB_NOT_FOUND.equals(blobStorageException.getErrorCode());
diff --git a/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/AbstractTestAzureFileSystem.java b/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/AbstractTestAzureFileSystem.java
index 5d87efbc5623..46483e349b6d 100644
--- a/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/AbstractTestAzureFileSystem.java
+++ b/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/AbstractTestAzureFileSystem.java
@@ -70,17 +70,23 @@ protected enum AccountKind
     protected void initializeWithAccessKey(String account, String accountKey, AccountKind accountKind)
             throws IOException
     {
-        initialize(account, new AzureAuthAccessKey(accountKey), accountKind);
+        initialize(account, new AzureAuthAccessKey(accountKey), accountKind, false);
+    }
+
+    protected void initializeWithAccessKeyAndMultipartWrites(String account, String accountKey, AccountKind accountKind)
+            throws IOException
+    {
+        initialize(account, new AzureAuthAccessKey(accountKey), accountKind, true);
     }
 
     protected void initializeWithOAuth(String account, String tenantId, String clientId, String clientSecret, AccountKind accountKind)
             throws IOException
     {
         String clientEndpoint = "https://login.microsoftonline.com/%s/oauth2/v2.0/token".formatted(tenantId);
-        initialize(account, new AzureAuthOauth(clientEndpoint, tenantId, clientId, clientSecret), accountKind);
+        initialize(account, new AzureAuthOauth(clientEndpoint, tenantId, clientId, clientSecret), accountKind, false);
     }
 
-    private void initialize(String account, AzureAuth azureAuth, AccountKind accountKind)
+    private void initialize(String account, AzureAuth azureAuth, AccountKind accountKind, boolean multipartWriteEnabled)
             throws IOException
     {
         this.account = requireNonNull(account, "account is null");
@@ -107,7 +113,8 @@ private void initialize(String account, AzureAuth azureAuth, AccountKind account
         fileSystemFactory = new AzureFileSystemFactory(
                 OpenTelemetry.noop(),
                 azureAuth,
-                new AzureFileSystemConfig());
+                new AzureFileSystemConfig()
+                        .setMultipartWriteEnabled(multipartWriteEnabled));
         fileSystem = fileSystemFactory.create(ConnectorIdentity.ofUser("test"));
 
         cleanupFiles();
diff --git a/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemConfig.java b/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemConfig.java
index 28c09a9fcc4c..b098a5c03676 100644
--- a/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemConfig.java
+++ b/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemConfig.java
@@ -38,7 +38,8 @@ void testDefaults()
                 .setMaxWriteConcurrency(8)
                 .setMaxSingleUploadSize(DataSize.of(4, Unit.MEGABYTE))
                 .setMaxHttpRequests(2 * Runtime.getRuntime().availableProcessors())
-                .setApplicationId("Trino"));
+                .setApplicationId("Trino")
+                .setMultipartWriteEnabled(false));
     }
 
     @Test
@@ -53,6 +54,7 @@ public void testExplicitPropertyMappings()
                 .put("azure.max-single-upload-size", "7MB")
                 .put("azure.max-http-requests", "128")
                 .put("azure.application-id", "application id")
+                .put("azure.multipart-write-enabled", "true")
                 .buildOrThrow();
 
         AzureFileSystemConfig expected = new AzureFileSystemConfig()
@@ -63,7 +65,8 @@
                 .setMaxWriteConcurrency(7)
                 .setMaxSingleUploadSize(DataSize.of(7, Unit.MEGABYTE))
                 .setMaxHttpRequests(128)
-                .setApplicationId("application id");
+                .setApplicationId("application id")
+                .setMultipartWriteEnabled(true);
 
         assertFullMapping(properties, expected);
     }
diff --git a/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemGen2Flat.java b/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemGen2Flat.java
index def79fb20435..8e06e0e3609b 100644
--- a/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemGen2Flat.java
+++ b/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemGen2Flat.java
@@ -30,6 +30,6 @@ class TestAzureFileSystemGen2Flat
     void setup()
             throws IOException
     {
-        initializeWithAccessKey(requireEnv("ABFS_FLAT_ACCOUNT"), requireEnv("ABFS_FLAT_ACCESS_KEY"), FLAT);
+        initializeWithAccessKeyAndMultipartWrites(requireEnv("ABFS_FLAT_ACCOUNT"), requireEnv("ABFS_FLAT_ACCESS_KEY"), FLAT);
     }
 }
diff --git a/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemGen2Hierarchical.java b/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemGen2Hierarchical.java
index d8423e00c2b5..3f74c1ee06eb 100644
--- a/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemGen2Hierarchical.java
+++ b/lib/trino-filesystem-azure/src/test/java/io/trino/filesystem/azure/TestAzureFileSystemGen2Hierarchical.java
@@ -30,6 +30,6 @@ class TestAzureFileSystemGen2Hierarchical
     void setup()
             throws IOException
    {
-        initializeWithAccessKey(requireEnv("ABFS_HIERARCHICAL_ACCOUNT"), requireEnv("ABFS_HIERARCHICAL_ACCESS_KEY"), HIERARCHICAL);
+        initializeWithAccessKeyAndMultipartWrites(requireEnv("ABFS_HIERARCHICAL_ACCOUNT"), requireEnv("ABFS_HIERARCHICAL_ACCESS_KEY"), HIERARCHICAL);
     }
 }
diff --git a/lib/trino-memory-context/src/main/java/io/trino/memory/context/CoarseGrainLocalMemoryContext.java b/lib/trino-memory-context/src/main/java/io/trino/memory/context/CoarseGrainLocalMemoryContext.java
index 21b65bdf01c2..9bfadb7fcbac 100644
--- a/lib/trino-memory-context/src/main/java/io/trino/memory/context/CoarseGrainLocalMemoryContext.java
+++ b/lib/trino-memory-context/src/main/java/io/trino/memory/context/CoarseGrainLocalMemoryContext.java
@@ -20,6 +20,7 @@
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.Math.addExact;
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -69,6 +70,12 @@ public synchronized ListenableFuture<Void> setBytes(long bytes)
         return Futures.immediateVoidFuture();
     }
 
+    @Override
+    public synchronized ListenableFuture<Void> addBytes(long delta)
+    {
+        return setBytes(addExact(currentBytes, delta));
+    }
+
     @Override
     public synchronized boolean trySetBytes(long bytes)
     {
diff --git a/lib/trino-memory-context/src/main/java/io/trino/memory/context/LocalMemoryContext.java b/lib/trino-memory-context/src/main/java/io/trino/memory/context/LocalMemoryContext.java
index 422606572535..754446e64c6b 100644
--- a/lib/trino-memory-context/src/main/java/io/trino/memory/context/LocalMemoryContext.java
+++ b/lib/trino-memory-context/src/main/java/io/trino/memory/context/LocalMemoryContext.java
@@ -30,6 +30,17 @@ public interface LocalMemoryContext
      */
     ListenableFuture<Void> setBytes(long bytes);
 
+    /**
+     * When this method returns, the bytes tracked by this LocalMemoryContext has been updated by the provided delta.
+     * The returned future will tell the caller whether it should block before reserving more memory
+     * (which happens when the memory pools are low on memory).
+     * <p>
+     * Note: Canceling the returned future will complete it immediately even though the memory pools are low
+     * on memory, and callers blocked on this future will proceed to allocating more memory from the exhausted
+     * pools, which will violate the protocol of Trino MemoryPool implementation.
+     */
+    ListenableFuture<Void> addBytes(long delta);
+
     /**
      * This method can return false when there is not enough memory available to satisfy a positive delta allocation
      * ({@code bytes} is greater than the bytes tracked by this LocalMemoryContext).
diff --git a/lib/trino-memory-context/src/main/java/io/trino/memory/context/SimpleLocalMemoryContext.java b/lib/trino-memory-context/src/main/java/io/trino/memory/context/SimpleLocalMemoryContext.java
index be5b20688727..f7ef19d5f19a 100644
--- a/lib/trino-memory-context/src/main/java/io/trino/memory/context/SimpleLocalMemoryContext.java
+++ b/lib/trino-memory-context/src/main/java/io/trino/memory/context/SimpleLocalMemoryContext.java
@@ -67,6 +67,12 @@ public synchronized ListenableFuture<Void> setBytes(long bytes)
         return future;
     }
 
+    @Override
+    public synchronized ListenableFuture<Void> addBytes(long delta)
+    {
+        return setBytes(usedBytes + delta);
+    }
+
     @Override
     public synchronized boolean trySetBytes(long bytes)
     {
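To illustrate how the delta-based accounting introduced by LocalMemoryContext.addBytes (see the Javadoc added above) is meant to be consumed, here is a minimal sketch, not part of this patch, of a buffering writer that tracks its staging buffer the way AzureMultipartOutputStream does: grow the reservation when the buffer grows, release it once a block has been staged, and zero it on close. The MemoryTrackedBuffer class is hypothetical, and the sketch assumes the newSimpleAggregatedMemoryContext() factory from trino-memory-context; only newLocalMemoryContext, addBytes, setBytes, and close from the patched interface are exercised.

import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.LocalMemoryContext;

import java.util.Arrays;

import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;

// Hypothetical illustration of LocalMemoryContext.addBytes(delta) as introduced by this patch.
public class MemoryTrackedBuffer
{
    private final LocalMemoryContext memoryContext;
    private byte[] buffer = new byte[0];

    public MemoryTrackedBuffer(AggregatedMemoryContext aggregatedContext)
    {
        this.memoryContext = aggregatedContext.newLocalMemoryContext(MemoryTrackedBuffer.class.getSimpleName());
    }

    public void grow(int newSize)
    {
        int oldSize = buffer.length;
        buffer = Arrays.copyOf(buffer, newSize);
        // Register only the delta; the returned future signals whether the caller should
        // pause before reserving more memory (ignored here for brevity).
        memoryContext.addBytes(newSize - oldSize);
    }

    public void blockStaged(int stagedLength)
    {
        // Once a block has been uploaded, its bytes are no longer held on heap.
        memoryContext.addBytes(-stagedLength);
    }

    public void close()
    {
        buffer = new byte[0];
        memoryContext.setBytes(0);
        memoryContext.close();
    }

    public static void main(String[] args)
    {
        MemoryTrackedBuffer tracked = new MemoryTrackedBuffer(newSimpleAggregatedMemoryContext());
        tracked.grow(8 * 1024 * 1024);        // reservation grows by 8 MB
        tracked.blockStaged(8 * 1024 * 1024); // reservation shrinks by 8 MB after staging
        tracked.close();
    }
}

On the Azure side, the multipart path is only taken when azure.multipart-write-enabled=true is configured; it defaults to false, as the config test above asserts.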