Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemException;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoOutputFile;

Expand Down Expand Up @@ -165,7 +166,7 @@ private void deleteGen2Directory(AzureLocation location)
DataLakeDirectoryClient directoryClient = createDirectoryClient(fileSystemClient, location.path());
if (directoryClient.exists()) {
if (!directoryClient.getProperties().isDirectory()) {
throw new IOException("Location is not a directory: " + location);
throw new TrinoFileSystemException("Location is not a directory: " + location);
}
directoryClient.deleteIfExistsWithResponse(deleteRecursiveOptions, null, null);
}
Expand All @@ -191,10 +192,10 @@ public void renameFile(Location source, Location target)
AzureLocation sourceLocation = new AzureLocation(source);
AzureLocation targetLocation = new AzureLocation(target);
if (!sourceLocation.account().equals(targetLocation.account())) {
throw new IOException("Cannot rename across storage accounts");
throw new TrinoFileSystemException("Cannot rename across storage accounts");
}
if (!Objects.equals(sourceLocation.container(), targetLocation.container())) {
throw new IOException("Cannot rename across storage account containers");
throw new TrinoFileSystemException("Cannot rename across storage account containers");
}

// DFS rename file works with all storage types
Expand All @@ -208,7 +209,7 @@ private void renameGen2File(AzureLocation source, AzureLocation target)
DataLakeFileSystemClient fileSystemClient = createFileSystemClient(source);
DataLakeFileClient dataLakeFileClient = createFileClient(fileSystemClient, source.path());
if (dataLakeFileClient.getProperties().isDirectory()) {
throw new IOException("Rename file from %s to %s, source is a directory".formatted(source, target));
throw new TrinoFileSystemException("Rename file from %s to %s, source is a directory".formatted(source, target));
}

createDirectoryIfNotExists(fileSystemClient, target.location().parentDirectory().path());
Expand Down Expand Up @@ -255,7 +256,7 @@ private FileIterator listGen2Files(AzureLocation location)
return FileIterator.empty();
}
if (!directoryClient.getProperties().isDirectory()) {
throw new IOException("Location is not a directory: " + location);
throw new TrinoFileSystemException("Location is not a directory: " + location);
}
pathItems = directoryClient.listPaths(true, false, null, null);
}
Expand Down Expand Up @@ -315,7 +316,7 @@ public void createDirectory(Location location)
DataLakeFileSystemClient fileSystemClient = createFileSystemClient(azureLocation);
DataLakeDirectoryClient directoryClient = createDirectoryIfNotExists(fileSystemClient, azureLocation.path());
if (!directoryClient.getProperties().isDirectory()) {
throw new IOException("Location is not a directory: " + azureLocation);
throw new TrinoFileSystemException("Location is not a directory: " + azureLocation);
}
}
catch (RuntimeException e) {
Expand All @@ -330,26 +331,26 @@ public void renameDirectory(Location source, Location target)
AzureLocation sourceLocation = new AzureLocation(source);
AzureLocation targetLocation = new AzureLocation(target);
if (!sourceLocation.account().equals(targetLocation.account())) {
throw new IOException("Cannot rename across storage accounts");
throw new TrinoFileSystemException("Cannot rename across storage accounts");
}
if (!Objects.equals(sourceLocation.container(), targetLocation.container())) {
throw new IOException("Cannot rename across storage account containers");
throw new TrinoFileSystemException("Cannot rename across storage account containers");
}
if (!isHierarchicalNamespaceEnabled(sourceLocation)) {
throw new IOException("Azure non-hierarchical does not support directory renames");
throw new TrinoFileSystemException("Azure non-hierarchical does not support directory renames");
}
if (sourceLocation.path().isEmpty() || targetLocation.path().isEmpty()) {
throw new IOException("Cannot rename %s to %s".formatted(source, target));
throw new TrinoFileSystemException("Cannot rename %s to %s".formatted(source, target));
}

try {
DataLakeFileSystemClient fileSystemClient = createFileSystemClient(sourceLocation);
DataLakeDirectoryClient directoryClient = createDirectoryClient(fileSystemClient, sourceLocation.path());
if (!directoryClient.exists()) {
throw new IOException("Source directory does not exist: " + source);
throw new TrinoFileSystemException("Source directory does not exist: " + source);
}
if (!directoryClient.getProperties().isDirectory()) {
throw new IOException("Source is not a directory: " + source);
throw new TrinoFileSystemException("Source is not a directory: " + source);
}
directoryClient.rename(null, targetLocation.path());
}
Expand Down Expand Up @@ -416,7 +417,7 @@ private Set<Location> listGen2Directories(AzureLocation location)
return ImmutableSet.of();
}
if (!directoryClient.getProperties().isDirectory()) {
throw new IOException("Location is not a directory: " + location);
throw new TrinoFileSystemException("Location is not a directory: " + location);
}
pathItems = directoryClient.listPaths(false, false, null, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import io.trino.filesystem.TrinoFileSystemException;

import java.io.FileNotFoundException;
import java.io.IOException;
Expand All @@ -32,7 +33,7 @@ public static IOException handleAzureException(RuntimeException exception, Strin
throw withCause(new FileNotFoundException(location.toString()), exception);
}
if (exception instanceof AzureException) {
throw new IOException("Azure service error %s file: %s".formatted(action, location), exception);
throw new TrinoFileSystemException("Azure service error %s file: %s".formatted(action, location), exception);
}
throw new IOException("Error %s file: %s".formatted(action, location), exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemException;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoOutputFile;

Expand Down Expand Up @@ -150,7 +151,7 @@ public void deleteDirectory(Location location)
public void renameFile(Location source, Location target)
throws IOException
{
throw new IOException("GCS does not support renames");
throw new TrinoFileSystemException("GCS does not support renames");
}

@Override
Expand Down Expand Up @@ -241,7 +242,7 @@ public void createDirectory(Location location)
public void renameDirectory(Location source, Location target)
throws IOException
{
throw new IOException("GCS does not support directory renames");
throw new TrinoFileSystemException("GCS does not support directory renames");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
*/
package io.trino.filesystem.gcs;

import com.google.cloud.BaseServiceException;
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystemException;

import java.io.FileNotFoundException;
import java.io.IOException;
Expand All @@ -36,12 +38,18 @@ private GcsUtils() {}
public static IOException handleGcsException(RuntimeException exception, String action, GcsLocation location)
throws IOException
{
if (exception instanceof BaseServiceException) {
throw new TrinoFileSystemException("GCS service error %s: %s".formatted(action, location), exception);
}
throw new IOException("Error %s: %s".formatted(action, location), exception);
}

public static IOException handleGcsException(RuntimeException exception, String action, Collection<Location> locations)
throws IOException
{
if (exception instanceof BaseServiceException) {
throw new TrinoFileSystemException("GCS service error %s: %s".formatted(action, locations), exception);
}
throw new IOException("Error %s: %s".formatted(action, locations), exception);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.trino.filesystem.FileIterator;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemException;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoOutputFile;
import software.amazon.awssdk.core.exception.SdkException;
Expand Down Expand Up @@ -100,7 +101,7 @@ public void deleteFile(Location location)
client.deleteObject(request);
}
catch (SdkException e) {
throw new IOException("Failed to delete file: " + location, e);
throw new TrinoFileSystemException("Failed to delete file: " + location, e);
}
}

Expand Down Expand Up @@ -158,7 +159,7 @@ private void deleteObjects(Collection<Location> locations)
}
}
catch (SdkException e) {
throw new IOException("Error while batch deleting files", e);
throw new TrinoFileSystemException("Error while batch deleting files", e);
}
}
}
Expand Down Expand Up @@ -206,7 +207,7 @@ private FileIterator listObjects(Location location, boolean includeDirectoryObje
return new S3FileIterator(s3Location, s3ObjectStream.iterator());
}
catch (SdkException e) {
throw new IOException("Failed to list location: " + location, e);
throw new TrinoFileSystemException("Failed to list location: " + location, e);
}
}

Expand Down Expand Up @@ -262,7 +263,7 @@ public Set<Location> listDirectories(Location location)
.collect(toImmutableSet());
}
catch (SdkException e) {
throw new IOException("Failed to list location: " + location, e);
throw new TrinoFileSystemException("Failed to list location: " + location, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.filesystem.s3;

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystemException;
import io.trino.filesystem.TrinoInput;
import io.trino.filesystem.TrinoInputFile;
import io.trino.filesystem.TrinoInputStream;
Expand Down Expand Up @@ -130,7 +131,7 @@ private boolean headObject()
return false;
}
catch (SdkException e) {
throw new IOException("S3 HEAD request failed for file: " + location, e);
throw new TrinoFileSystemException("S3 HEAD request failed for file: " + location, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.filesystem.s3;

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystemException;
import io.trino.filesystem.TrinoInputStream;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.AbortedException;
Expand Down Expand Up @@ -208,7 +209,7 @@ private void seekStream()
throw ex;
}
catch (SdkException e) {
throw new IOException("Failed to open S3 file: " + location, e);
throw new TrinoFileSystemException("Failed to open S3 file: " + location, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
*/
package io.trino.filesystem;

import com.google.common.base.Throwables;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.Optional;
Expand Down Expand Up @@ -223,4 +226,20 @@ Set<Location> listDirectories(Location location)
*/
Optional<Location> createTemporaryDirectory(Location targetPath, String temporaryPrefix, String relativePrefix)
throws IOException;

/**
* Checks whether given exception is unrecoverable, so that further retries won't help
* <p>
* By default, all third party (AWS, Azure, GCP) SDKs will retry appropriate exceptions
* (either client side IO errors, or 500/503), so there is no need to retry those additionally.
* <p>
* If any custom retry behavior is needed, it is advised to change SDK's retry handlers,
* rather than introducing outer retry loop, which combined with SDKs default retries,
* could lead to prolonged, unnecessary retries
*/
static boolean isUnrecoverableException(Throwable throwable)
{
return Throwables.getCausalChain(throwable).stream()
.anyMatch(t -> t instanceof TrinoFileSystemException || t instanceof FileNotFoundException);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.filesystem;

import java.io.IOException;

/**
* Unrecoverable file system exception.
* This exception is thrown for fatal errors, or after retries have already been performed,
* so additional retries must not be performed when this is caught.
*/
public class TrinoFileSystemException
extends IOException
{
public TrinoFileSystemException(String message, Throwable cause)
{
super(message, cause);
}

public TrinoFileSystemException(String message)
{
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public ReadSession create(ConnectorSession session, TableId remoteTable, List<St
.withMaxRetries(maxCreateReadSessionRetries)
.withBackoff(10, 500, MILLIS)
.onRetry(event -> log.debug("Request failed, retrying: %s", event.getLastException()))
.abortOn(failure -> !BigQueryUtil.isRetryable(failure))
.handleIf(BigQueryUtil::isRetryable)
Comment thread
oskar-szwajkowski marked this conversation as resolved.
Outdated
.build())
.get(() -> {
try {
Expand Down
Loading