From d5e93aa6b362287957b903ad7529ae7025a8a478 Mon Sep 17 00:00:00 2001 From: Vikash Kumar Date: Wed, 18 Jan 2023 18:57:27 +0530 Subject: [PATCH] Use Failsafe to retry instead iceberg's Tasks in iceberg connector --- .../AbstractIcebergTableOperations.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java index e6e9a2fb15b0..81577ba63a0d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java @@ -17,6 +17,8 @@ import io.trino.plugin.hive.metastore.StorageFormat; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; +import net.jodah.failsafe.Failsafe; +import net.jodah.failsafe.RetryPolicy; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.exceptions.CommitFailedException; @@ -25,16 +27,15 @@ import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.types.Types.NestedField; -import org.apache.iceberg.util.Tasks; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; +import java.time.Duration; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; -import java.util.concurrent.atomic.AtomicReference; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -47,6 +48,7 @@ import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider; import static io.trino.plugin.iceberg.IcebergUtil.parseVersion; import static java.lang.String.format; +import static java.time.temporal.ChronoUnit.MILLIS; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; import static org.apache.iceberg.TableMetadataParser.getFileExtension; @@ -213,21 +215,20 @@ protected void refreshFromMetadataLocation(String newLocation) return; } - AtomicReference newMetadata = new AtomicReference<>(); - Tasks.foreach(newLocation) - .retry(20) - .exponentialBackoff(100, 5000, 600000, 4.0) - .stopRetryOn(org.apache.iceberg.exceptions.NotFoundException.class) // qualified name, as this is NOT the io.trino.spi.connector.NotFoundException - .run(metadataLocation -> newMetadata.set( - TableMetadataParser.read(fileIo, io().newInputFile(metadataLocation)))); + TableMetadata newMetadata = Failsafe.with(new RetryPolicy<>() + .withMaxRetries(20) + .withBackoff(100, 5000, MILLIS, 4.0) + .withMaxDuration(Duration.ofMinutes(10)) + .abortOn(org.apache.iceberg.exceptions.NotFoundException.class)) // qualified name, as this is NOT the io.trino.spi.connector.NotFoundException + .get(() -> TableMetadataParser.read(fileIo, io().newInputFile(newLocation))); - String newUUID = newMetadata.get().uuid(); + String newUUID = newMetadata.uuid(); if (currentMetadata != null) { checkState(newUUID == null || newUUID.equals(currentMetadata.uuid()), "Table UUID does not match: current=%s != refreshed=%s", currentMetadata.uuid(), newUUID); } - currentMetadata = newMetadata.get(); + currentMetadata = newMetadata; currentMetadataLocation = newLocation; version = parseVersion(newLocation); shouldRefresh = false;