diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java index 3f504b63a255..150278774a3e 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java @@ -16,6 +16,7 @@ import com.google.common.annotations.VisibleForTesting; import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; +import io.airlift.configuration.DefunctConfig; import io.airlift.configuration.LegacyConfig; import io.airlift.units.DataSize; import io.airlift.units.Duration; @@ -36,6 +37,7 @@ import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.SECONDS; +@DefunctConfig("delta.experimental.ignore-checkpoint-write-failures") public class DeltaLakeConfig { public static final String VACUUM_MIN_RETENTION = "delta.vacuum.min-retention"; @@ -59,7 +61,6 @@ public class DeltaLakeConfig private boolean unsafeWritesEnabled; private boolean checkpointRowStatisticsWritingEnabled = true; private long defaultCheckpointWritingInterval = 10; - private boolean ignoreCheckpointWriteFailures; private Duration vacuumMinRetention = new Duration(7, DAYS); private Optional hiveCatalogName = Optional.empty(); private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS); @@ -249,18 +250,6 @@ public long getDefaultCheckpointWritingInterval() return defaultCheckpointWritingInterval; } - @Config("delta.experimental.ignore-checkpoint-write-failures") - public DeltaLakeConfig setIgnoreCheckpointWriteFailures(boolean ignoreCheckpointWriteFailures) - { - this.ignoreCheckpointWriteFailures = ignoreCheckpointWriteFailures; - return this; - } - - public boolean isIgnoreCheckpointWriteFailures() - { - return ignoreCheckpointWriteFailures; - } - @NotNull public Duration getVacuumMinRetention() { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 04792d1dfc5a..5a93adce5ba6 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -270,7 +270,6 @@ public class DeltaLakeMetadata private final TypeManager typeManager; private final CheckpointWriterManager checkpointWriterManager; private final long defaultCheckpointInterval; - private final boolean ignoreCheckpointWriteFailures; private final int domainCompactionThreshold; private final boolean unsafeWritesEnabled; private final JsonCodec dataFileInfoCodec; @@ -300,7 +299,6 @@ public DeltaLakeMetadata( NodeManager nodeManager, CheckpointWriterManager checkpointWriterManager, long defaultCheckpointInterval, - boolean ignoreCheckpointWriteFailures, boolean deleteSchemaLocationsFallback, DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider, ExtendedStatisticsAccess statisticsAccess, @@ -321,7 +319,6 @@ public DeltaLakeMetadata( this.nodeId = nodeManager.getCurrentNode().getNodeIdentifier(); this.checkpointWriterManager = requireNonNull(checkpointWriterManager, "checkpointWriterManager is null"); this.defaultCheckpointInterval = defaultCheckpointInterval; - this.ignoreCheckpointWriteFailures = ignoreCheckpointWriteFailures; this.deltaLakeRedirectionsProvider = requireNonNull(deltaLakeRedirectionsProvider, "deltaLakeRedirectionsProvider is null"); this.statisticsAccess = requireNonNull(statisticsAccess, "statisticsAccess is null"); this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback; @@ -1935,12 +1932,9 @@ private void writeCheckpointIfNeeded(ConnectorSession session, SchemaTableName t checkpointWriterManager.writeCheckpoint(session, snapshot); } catch (Exception e) { - if (ignoreCheckpointWriteFailures) { - LOG.warn(e, "Failed to write checkpoint for table %s for version %s", table, newVersion); - } - else { - throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Failed to write checkpoint for table %s for version %s", table, newVersion), e); - } + // We can't fail here as transaction was already committed, in case of INSERT this could result + // in inserting data twice if client saw an error and decided to retry + LOG.error(e, "Failed to write checkpoint for table %s for version %s", table, newVersion); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java index dcdb4ad1895e..293bcf970e3c 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadataFactory.java @@ -53,7 +53,6 @@ public class DeltaLakeMetadataFactory private final int domainCompactionThreshold; private final boolean unsafeWritesEnabled; private final long checkpointWritingInterval; - private final boolean ignoreCheckpointWriteFailures; private final long perTransactionMetastoreCacheMaximumSize; private final boolean deleteSchemaLocationsFallback; private final boolean useUniqueTableLocation; @@ -94,7 +93,6 @@ public DeltaLakeMetadataFactory( this.domainCompactionThreshold = deltaLakeConfig.getDomainCompactionThreshold(); this.unsafeWritesEnabled = deltaLakeConfig.getUnsafeWritesEnabled(); this.checkpointWritingInterval = deltaLakeConfig.getDefaultCheckpointWritingInterval(); - this.ignoreCheckpointWriteFailures = deltaLakeConfig.isIgnoreCheckpointWriteFailures(); this.perTransactionMetastoreCacheMaximumSize = deltaLakeConfig.getPerTransactionMetastoreCacheMaximumSize(); this.deleteSchemaLocationsFallback = deltaLakeConfig.isDeleteSchemaLocationsFallback(); this.useUniqueTableLocation = deltaLakeConfig.isUniqueTableLocation(); @@ -128,7 +126,6 @@ public DeltaLakeMetadata create(ConnectorIdentity identity) nodeManager, checkpointWriterManager, checkpointWritingInterval, - ignoreCheckpointWriteFailures, deleteSchemaLocationsFallback, deltaLakeRedirectionsProvider, statisticsAccess, diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java index 2a091f3676d3..30b6adc67d4c 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java @@ -52,7 +52,6 @@ public void testDefaults() .setMaxPartitionsPerWriter(100) .setUnsafeWritesEnabled(false) .setDefaultCheckpointWritingInterval(10) - .setIgnoreCheckpointWriteFailures(false) .setCheckpointRowStatisticsWritingEnabled(true) .setVacuumMinRetention(new Duration(7, DAYS)) .setHiveCatalogName(null) @@ -84,7 +83,6 @@ public void testExplicitPropertyMappings() .put("delta.max-partitions-per-writer", "200") .put("delta.enable-non-concurrent-writes", "true") .put("delta.default-checkpoint-writing-interval", "15") - .put("delta.experimental.ignore-checkpoint-write-failures", "true") .put("delta.checkpoint-row-statistics-writing.enabled", "false") .put("delta.vacuum.min-retention", "13h") .put("delta.hive-catalog-name", "hive") @@ -113,7 +111,6 @@ public void testExplicitPropertyMappings() .setMaxPartitionsPerWriter(200) .setUnsafeWritesEnabled(true) .setDefaultCheckpointWritingInterval(15) - .setIgnoreCheckpointWriteFailures(true) .setCheckpointRowStatisticsWritingEnabled(false) .setVacuumMinRetention(new Duration(13, HOURS)) .setHiveCatalogName("hive")