diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
index f4acb030b48e..18473bf4f190 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
@@ -81,18 +81,18 @@ private HadoopCatalogLoader(
     this.properties = Maps.newHashMap(properties);
   }
 
-  @Override
-  @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
-  public CatalogLoader clone() {
-    return new HadoopCatalogLoader(catalogName, new Configuration(hadoopConf.get()), properties);
-  }
-
   @Override
   public Catalog loadCatalog() {
     return CatalogUtil.loadCatalog(
         HadoopCatalog.class.getName(), catalogName, properties, hadoopConf.get());
   }
 
+  @Override
+  @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
+  public CatalogLoader clone() {
+    return new HadoopCatalogLoader(catalogName, new Configuration(hadoopConf.get()), properties);
+  }
+
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkEnvironmentContext.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkEnvironmentContext.java
index 49602eea45c7..f35bb577fbba 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkEnvironmentContext.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkEnvironmentContext.java
@@ -22,7 +22,6 @@
 import org.apache.iceberg.flink.util.FlinkPackage;
 
 class FlinkEnvironmentContext {
-
   private FlinkEnvironmentContext() {}
 
   public static void init() {
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
index 6646338b629b..f18c5ccda1f6 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
@@ -121,12 +121,6 @@ public Table loadTable() {
     return catalog.loadTable(TableIdentifier.parse(identifier));
   }
 
-  @Override
-  @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
-  public TableLoader clone() {
-    return new CatalogTableLoader(catalogLoader.clone(), TableIdentifier.parse(identifier));
-  }
-
   @Override
   public void close() throws IOException {
     if (catalog instanceof Closeable) {
@@ -134,6 +128,12 @@ public void close() throws IOException {
     }
   }
 
+  @Override
+  @SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
+  public TableLoader clone() {
+    return new CatalogTableLoader(catalogLoader.clone(), TableIdentifier.parse(identifier));
+  }
+
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 22b4dc9d21bf..e11975b3efa9 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -233,6 +233,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     // the files,
     // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
     if (checkpointId > maxCommittedCheckpointId) {
+      LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
Attempting commit.", checkpointId); commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId); this.maxCommittedCheckpointId = checkpointId; } else { @@ -288,6 +289,8 @@ private void commitPendingResult( commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId); } continuousEmptyCheckpoints = 0; + } else { + LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId); } } diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java index c2c587267c09..88364f4e87b1 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.flink.source; -import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.flink.annotation.Internal; @@ -61,11 +60,6 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader { private final boolean caseSensitive; private final FlinkSourceFilter rowFilter; - public RowDataFileScanTaskReader( - Schema tableSchema, Schema projectedSchema, String nameMapping, boolean caseSensitive) { - this(tableSchema, projectedSchema, nameMapping, caseSensitive, Collections.emptyList()); - } - public RowDataFileScanTaskReader( Schema tableSchema, Schema projectedSchema, @@ -76,6 +70,7 @@ public RowDataFileScanTaskReader( this.projectedSchema = projectedSchema; this.nameMapping = nameMapping; this.caseSensitive = caseSensitive; + if (filters != null && !filters.isEmpty()) { Expression combinedExpression = filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and); diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java index 23665b7c9f0f..c958604c004a 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java @@ -21,6 +21,7 @@ import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import org.apache.flink.api.common.functions.RichMapFunction; @@ -125,7 +126,8 @@ public RewriteMap( this.encryptionManager = encryptionManager; this.taskWriterFactory = taskWriterFactory; this.rowDataReader = - new RowDataFileScanTaskReader(schema, schema, nameMapping, caseSensitive); + new RowDataFileScanTaskReader( + schema, schema, nameMapping, caseSensitive, Collections.emptyList()); } @Override diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java index b1ce16674817..66e59633fff2 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.flink.source.reader; -import java.util.Collections; import java.util.List; import 
 import org.apache.flink.configuration.Configuration;
@@ -57,28 +56,8 @@ public static AvroGenericRecordReaderFunction fromTable(Table table) {
         null,
         false,
         table.io(),
-        table.encryption());
-  }
-
-  public AvroGenericRecordReaderFunction(
-      String name,
-      Configuration config,
-      Schema schema,
-      Schema projectedSchema,
-      String nameMapping,
-      boolean caseSensitive,
-      FileIO io,
-      EncryptionManager encryption) {
-    this(
-        name,
-        config,
-        schema,
-        projectedSchema,
-        nameMapping,
-        caseSensitive,
-        io,
-        encryption,
-        Collections.emptyList());
+        table.encryption(),
+        null);
   }
 
   public AvroGenericRecordReaderFunction(
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
index dcf84305f8c0..5d0a00954e7a 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iceberg.flink.source.reader;
 
-import java.util.Collections;
 import java.util.List;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.data.RowData;
@@ -48,33 +47,15 @@ public RowDataReaderFunction(
       String nameMapping,
       boolean caseSensitive,
       FileIO io,
-      EncryptionManager encryption) {
-    this(
-        config,
-        tableSchema,
-        projectedSchema,
-        nameMapping,
-        caseSensitive,
-        io,
-        encryption,
-        Collections.emptyList());
-  }
-
-  public RowDataReaderFunction(
-      ReadableConfig config,
-      Schema schema,
-      Schema project,
-      String nameMapping,
-      boolean caseSensitive,
-      FileIO io,
       EncryptionManager encryption,
       List<Expression> filters) {
     super(
         new ArrayPoolDataIteratorBatcher<>(
             config,
-            new RowDataRecordFactory(FlinkSchemaUtil.convert(readSchema(schema, project)))));
-    this.tableSchema = schema;
-    this.readSchema = readSchema(schema, project);
+            new RowDataRecordFactory(
+                FlinkSchemaUtil.convert(readSchema(tableSchema, projectedSchema)))));
+    this.tableSchema = tableSchema;
+    this.readSchema = readSchema(tableSchema, projectedSchema);
     this.nameMapping = nameMapping;
     this.caseSensitive = caseSensitive;
     this.io = io;
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
index d4da736dcd83..74c5d343e996 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java
@@ -65,7 +65,7 @@ public void before() {
 
   @After
   public void clean() {
-    sql("DROP CATALOG IF EXISTS %s", catalogName);
+    dropCatalog(catalogName, true);
   }
 
   @Parameterized.Parameters(name = "catalogName = {0} baseNamespace = {1}")
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index 95471ac88257..8076e0ec76f8 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -113,4 +113,17 @@ protected <T> void assertSameElements(String message, Iterable<T> expected, Iterable<T> actual) {
         .as(message)
         .containsExactlyInAnyOrderElementsOf(expected);
   }
+
+  /**
+   * We cannot drop the currently used catalog after FLINK-29677, so we have to make sure that we
+   * do not use the current catalog before dropping it. This method switches to the
+   * 'default_catalog' and drops the one requested.
+   *
+   * @param catalogName The catalog to drop
+   * @param ifExists If we should use the 'IF EXISTS' when dropping the catalog
+   */
+  protected void dropCatalog(String catalogName, boolean ifExists) {
+    sql("USE CATALOG default_catalog");
+    sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
+  }
 }
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
index 975d77cb3565..8e9066e391c9 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
@@ -99,7 +99,7 @@ public void before() {
   public void clean() {
     sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
     sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
-    sql("DROP CATALOG IF EXISTS %s", CATALOG_NAME);
+    dropCatalog(CATALOG_NAME, true);
     BoundedTableFactory.clearDataSets();
   }
 
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
index 64746356636b..8f238587d30d 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkHiveCatalog.java
@@ -100,6 +100,6 @@ private void checkSQLQuery(Map<String, String> catalogProperties, File warehouseDir)
 
     sql("DROP TABLE test_table");
     sql("DROP DATABASE test_db");
-    sql("DROP CATALOG test_catalog");
+    dropCatalog("test_catalog", false);
   }
 }
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
index 30ed8a9742e1..1d52acb2fe7b 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMergingMetrics.java
@@ -37,11 +37,12 @@
 import org.junit.rules.TemporaryFolder;
 
 public class TestFlinkMergingMetrics extends TestMergingMetrics<RowData> {
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
 
   @Rule
   public final HadoopTableResource tableResource =
-      new HadoopTableResource(TEMPORARY_FOLDER, "test_db", "test_table", SCHEMA);
+      new HadoopTableResource(TEMP_FOLDER, "test_db", "test_table", SCHEMA);
 
   public TestFlinkMergingMetrics(FileFormat fileFormat) {
     super(fileFormat);
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
index f2e89428a9ff..f9ceaf842263 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.BaseCombinedScanTask;
@@ -83,7 +84,8 @@ public static FileScanTask createFileTask(
 
   public static DataIterator<RowData> createDataIterator(CombinedScanTask combinedTask) {
     return new DataIterator<>(
-        new RowDataFileScanTaskReader(TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true),
+        new RowDataFileScanTaskReader(
+            TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true, Collections.emptyList()),
         combinedTask,
         new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
         new PlaintextEncryptionManager());
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
index ddc144be883c..56af0caf1298 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceReader.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.source.reader;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
@@ -106,7 +107,8 @@ private IcebergSourceReader<RowData> createReader(
         null,
         true,
         new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
-        new PlaintextEncryptionManager());
+        new PlaintextEncryptionManager(),
+        Collections.emptyList());
     return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext);
   }
 }
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java
index aee271a3a7b8..d063ad7f4a80 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestRowDataReaderFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.source.reader;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.flink.configuration.Configuration;
@@ -55,7 +56,8 @@ protected ReaderFunction<RowData> readerFunction() {
         null,
         true,
         new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
-        new PlaintextEncryptionManager());
+        new PlaintextEncryptionManager(),
+        Collections.emptyList());
   }
 
   @Override
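
Reviewer note: with the filter-less convenience constructors removed, any caller of RowDataFileScanTaskReader (and RowDataReaderFunction) must now pass the List<Expression> filters argument explicitly, as the updated call sites above do. Below is a minimal caller sketch: the constructor signature is taken from this diff, while the class name, SCHEMA definition, and filter values are illustrative assumptions only, not part of this change.

    import java.util.Collections;
    import java.util.List;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
    import org.apache.iceberg.types.Types;

    public class ReaderConstructionSketch {
      // Hypothetical schema for illustration; any table schema works here.
      private static final Schema SCHEMA =
          new Schema(
              Types.NestedField.required(1, "id", Types.LongType.get()),
              Types.NestedField.optional(2, "data", Types.StringType.get()));

      public static void main(String[] args) {
        // No pushed-down filters: pass an explicit empty list where the removed
        // four-argument constructor previously defaulted it.
        RowDataFileScanTaskReader unfiltered =
            new RowDataFileScanTaskReader(SCHEMA, SCHEMA, null, true, Collections.emptyList());

        // With filters: the reader ANDs the expressions together into its row filter
        // (see the filters.stream().reduce(...) call in the constructor above).
        List<Expression> filters = Collections.singletonList(Expressions.greaterThan("id", 100L));
        RowDataFileScanTaskReader filtered =
            new RowDataFileScanTaskReader(SCHEMA, SCHEMA, null, true, filters);
      }
    }

Passing null for the filters, as the updated AvroGenericRecordReaderFunction.fromTable does, is also accepted: the constructor guards with filters != null before building the row filter.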