From 0a2022a96c90c7c54f72ab671c8645998a71e1d7 Mon Sep 17 00:00:00 2001 From: Vikash Kumar Date: Wed, 14 Jun 2023 19:43:17 +0530 Subject: [PATCH] Don't retry table metadata read when fails with ValidationException --- .../trino/plugin/iceberg/IcebergMetadata.java | 5 ++- .../AbstractIcebergTableOperations.java | 7 +++- .../iceberg/BaseIcebergConnectorTest.java | 35 +++++++++++++++++++ 3 files changed, 45 insertions(+), 2 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index a989f00c434c..d86b65712e08 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -46,6 +46,7 @@ import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle; import io.trino.plugin.iceberg.procedure.IcebergTableProcedureId; import io.trino.plugin.iceberg.util.DataFileWithDeleteFiles; +import io.trino.spi.ErrorCode; import io.trino.spi.TrinoException; import io.trino.spi.block.Block; import io.trino.spi.connector.Assignment; @@ -374,7 +375,9 @@ public ConnectorTableHandle getTableHandle( return null; } catch (TrinoException e) { - if (e.getErrorCode().equals(ICEBERG_MISSING_METADATA.toErrorCode())) { + ErrorCode errorCode = e.getErrorCode(); + if (errorCode.equals(ICEBERG_MISSING_METADATA.toErrorCode()) + || errorCode.equals(ICEBERG_INVALID_METADATA.toErrorCode())) { return new CorruptedIcebergTableHandle(tableName, e); } throw e; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java index 84f4045c55d3..c6c43130fa69 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java @@ -24,6 +24,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.io.OutputFile; @@ -45,6 +46,7 @@ import static io.trino.plugin.hive.util.HiveClassNames.FILE_INPUT_FORMAT_CLASS; import static io.trino.plugin.hive.util.HiveClassNames.FILE_OUTPUT_FORMAT_CLASS; import static io.trino.plugin.hive.util.HiveClassNames.LAZY_SIMPLE_SERDE_CLASS; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_METADATA; import static io.trino.plugin.iceberg.IcebergUtil.METADATA_FOLDER_NAME; import static io.trino.plugin.iceberg.IcebergUtil.fixBrokenMetadataLocation; @@ -235,7 +237,7 @@ protected void refreshFromMetadataLocation(String newLocation) .withMaxRetries(20) .withBackoff(100, 5000, MILLIS, 4.0) .withMaxDuration(Duration.ofMinutes(10)) - .abortOn(AbstractIcebergTableOperations::isNotFoundException) + .abortOn(failure -> failure instanceof ValidationException || isNotFoundException(failure)) .build()) .get(() -> TableMetadataParser.read(fileIo, io().newInputFile(newLocation))); } @@ -243,6 +245,9 @@ protected void refreshFromMetadataLocation(String newLocation) if (isNotFoundException(failure)) { throw new TrinoException(ICEBERG_MISSING_METADATA, "Metadata not found in metadata location for table " + getSchemaTableName(), failure); } + if (failure instanceof ValidationException) { + throw new TrinoException(ICEBERG_INVALID_METADATA, "Invalid metadata file for table " + getSchemaTableName(), failure); + } throw failure; } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 7218d1a94eea..7a3243be4556 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -13,6 +13,10 @@ */ package io.trino.plugin.iceberg; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; @@ -58,6 +62,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.util.JsonUtil; import org.intellij.lang.annotations.Language; import org.testng.SkipException; import org.testng.annotations.BeforeClass; @@ -135,6 +140,7 @@ import static io.trino.transaction.TransactionBuilder.transaction; import static java.lang.String.format; import static java.lang.String.join; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.time.ZoneOffset.UTC; import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME; import static java.util.Collections.nCopies; @@ -6829,6 +6835,35 @@ public void testDropCorruptedTableWithHiveRedirection() assertFalse(fileSystem.listFiles(tableLocation).hasNext(), "Table location should not exist"); } + @Test(timeOut = 10_000) + public void testNoRetryWhenMetadataFileInvalid() + throws Exception + { + String tableName = "test_no_retry_when_metadata_file_invalid_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 id", 1); + + String tableLocation = getTableLocation(tableName); + String metadataFileLocation = getLatestMetadataLocation(fileSystem, tableLocation); + + ObjectMapper mapper = JsonUtil.mapper(); + JsonNode jsonNode = mapper.readValue(fileSystem.newInputFile(Location.of(metadataFileLocation)).newStream(), JsonNode.class); + ArrayNode fieldsNode = (ArrayNode) jsonNode.get("schemas").get(0).get("fields"); + ObjectNode newFieldNode = fieldsNode.get(0).deepCopy(); + // Add duplicate field to produce validation error while reading the metadata file + fieldsNode.add(newFieldNode); + + String modifiedJson = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode); + try (OutputStream outputStream = fileSystem.newOutputFile(Location.of(metadataFileLocation)).createOrOverwrite()) { + // Corrupt metadata file by overwriting the invalid metadata content + outputStream.write(modifiedJson.getBytes(UTF_8)); + } + assertThatThrownBy(() -> query("SELECT * FROM " + tableName)) + .hasMessage("Invalid metadata file for table tpch.%s".formatted(tableName)); + + assertUpdate("DROP TABLE " + tableName); + } + @Override protected void verifyTableNameLengthFailurePermissible(Throwable e) {