@@ -81,18 +81,18 @@ private HadoopCatalogLoader(
this.properties = Maps.newHashMap(properties);
}

@Override
@SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
public CatalogLoader clone() {
return new HadoopCatalogLoader(catalogName, new Configuration(hadoopConf.get()), properties);
}

@Override
public Catalog loadCatalog() {
return CatalogUtil.loadCatalog(
HadoopCatalog.class.getName(), catalogName, properties, hadoopConf.get());
}

@Override
@SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
public CatalogLoader clone() {
return new HadoopCatalogLoader(catalogName, new Configuration(hadoopConf.get()), properties);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
@@ -22,7 +22,6 @@
import org.apache.iceberg.flink.util.FlinkPackage;

class FlinkEnvironmentContext {

private FlinkEnvironmentContext() {}

public static void init() {
@@ -121,19 +121,19 @@ public Table loadTable() {
return catalog.loadTable(TableIdentifier.parse(identifier));
}

@Override
@SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
public TableLoader clone() {
return new CatalogTableLoader(catalogLoader.clone(), TableIdentifier.parse(identifier));
}

@Override
public void close() throws IOException {
if (catalog instanceof Closeable) {
((Closeable) catalog).close();
}
}

@Override
@SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
public TableLoader clone() {
return new CatalogTableLoader(catalogLoader.clone(), TableIdentifier.parse(identifier));
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
@@ -233,6 +233,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
// the files,
// Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
if (checkpointId > maxCommittedCheckpointId) {
LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
Contributor:
I remember that we merged this one: 5a4761c

What happened with it?

Contributor Author:
looks like a later backport unintentionally reverted it. I missed it during review. #6949

commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
this.maxCommittedCheckpointId = checkpointId;
} else {
@@ -288,6 +289,8 @@ private void commitPendingResult(
commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
}
continuousEmptyCheckpoints = 0;
} else {
LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId);
}
}

@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.source;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
@@ -61,11 +60,6 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
private final boolean caseSensitive;
private final FlinkSourceFilter rowFilter;

public RowDataFileScanTaskReader(
Schema tableSchema, Schema projectedSchema, String nameMapping, boolean caseSensitive) {
this(tableSchema, projectedSchema, nameMapping, caseSensitive, Collections.emptyList());
}

public RowDataFileScanTaskReader(
Schema tableSchema,
Schema projectedSchema,
@@ -76,6 +70,7 @@ public RowDataFileScanTaskReader(
this.projectedSchema = projectedSchema;
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;

if (filters != null && !filters.isEmpty()) {
Expression combinedExpression =
filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
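Since the no-filter convenience constructor is removed, call sites move to the five-argument constructor and pass the filter list explicitly. A minimal call-site sketch; the helper name newTaskReader is hypothetical, and it assumes the usual imports (java.util.Collections, org.apache.iceberg.Schema, org.apache.flink.table.data.RowData):

// Hypothetical helper showing the call-site migration; tableSchema, projectedSchema,
// nameMapping, and caseSensitive are supplied by the caller exactly as before.
private FileScanTaskReader<RowData> newTaskReader(
    Schema tableSchema, Schema projectedSchema, String nameMapping, boolean caseSensitive) {
  // Before (removed constructor):
  //   new RowDataFileScanTaskReader(tableSchema, projectedSchema, nameMapping, caseSensitive);
  // After: the filter list is explicit; an empty list keeps the old no-filter behavior.
  return new RowDataFileScanTaskReader(
      tableSchema, projectedSchema, nameMapping, caseSensitive, Collections.emptyList());
}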
@@ -21,6 +21,7 @@
import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.RichMapFunction;
@@ -125,7 +126,8 @@ public RewriteMap(
this.encryptionManager = encryptionManager;
this.taskWriterFactory = taskWriterFactory;
this.rowDataReader =
new RowDataFileScanTaskReader(schema, schema, nameMapping, caseSensitive);
new RowDataFileScanTaskReader(
schema, schema, nameMapping, caseSensitive, Collections.emptyList());
}

@Override
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.source.reader;

import java.util.Collections;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.configuration.Configuration;
@@ -57,28 +56,8 @@ public static AvroGenericRecordReaderFunction fromTable(Table table) {
null,
false,
table.io(),
table.encryption());
}

public AvroGenericRecordReaderFunction(
String name,
Configuration config,
Schema schema,
Schema projectedSchema,
String nameMapping,
boolean caseSensitive,
FileIO io,
EncryptionManager encryption) {
this(
name,
config,
schema,
projectedSchema,
nameMapping,
caseSensitive,
io,
encryption,
Collections.emptyList());
table.encryption(),
null);
}

public AvroGenericRecordReaderFunction(
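With the convenience constructor removed, fromTable now passes null for the filter list; as the RowDataFileScanTaskReader change above shows, a null or empty list simply means no residual filtering. A minimal usage sketch, with table standing in for an already-loaded Iceberg Table:

// null filters (as passed by fromTable) and an empty list are treated the same:
// the underlying reader only builds a filter expression for a non-null, non-empty list.
AvroGenericRecordReaderFunction readerFunction = AvroGenericRecordReaderFunction.fromTable(table);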
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.flink.source.reader;

import java.util.Collections;
import java.util.List;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
@@ -48,33 +47,15 @@ public RowDataReaderFunction(
String nameMapping,
boolean caseSensitive,
FileIO io,
EncryptionManager encryption) {
this(
config,
tableSchema,
projectedSchema,
nameMapping,
caseSensitive,
io,
encryption,
Collections.emptyList());
}

public RowDataReaderFunction(
ReadableConfig config,
Schema schema,
Schema project,
String nameMapping,
boolean caseSensitive,
FileIO io,
EncryptionManager encryption,
List<Expression> filters) {
super(
new ArrayPoolDataIteratorBatcher<>(
config,
new RowDataRecordFactory(FlinkSchemaUtil.convert(readSchema(schema, project)))));
this.tableSchema = schema;
this.readSchema = readSchema(schema, project);
new RowDataRecordFactory(
FlinkSchemaUtil.convert(readSchema(tableSchema, projectedSchema)))));
this.tableSchema = tableSchema;
this.readSchema = readSchema(tableSchema, projectedSchema);
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
this.io = io;
@@ -65,7 +65,7 @@ public void before() {

@After
public void clean() {
sql("DROP CATALOG IF EXISTS %s", catalogName);
dropCatalog(catalogName, true);
Contributor:
This one is not strictly needed for 1.15, as the change which made it necessary was introduced in Flink 1.16. But it is ok to have it here too, to keep the code less diverged.

Contributor Author:
yeah. it is a little better to have less diverged code

}

@Parameterized.Parameters(name = "catalogName = {0} baseNamespace = {1}")
@@ -113,4 +113,17 @@ protected void assertSameElements(String message, Iterable<Row> expected, Iterab
.as(message)
.containsExactlyInAnyOrderElementsOf(expected);
}

/**
* We cannot drop the currently used catalog after FLINK-29677, so we have to make sure that we
* are not using the catalog we are about to drop. This method switches to the 'default_catalog'
* and drops the requested one.
*
* @param catalogName The catalog to drop
* @param ifExists Whether to use 'IF EXISTS' when dropping the catalog
*/
protected void dropCatalog(String catalogName, boolean ifExists) {
sql("USE CATALOG default_catalog");
sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
}
}
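For illustration, a subclass cleanup calls this helper instead of issuing DROP CATALOG directly; a minimal sketch with a placeholder catalog name (my_catalog):

@After
public void clean() {
  // Effectively runs:
  //   USE CATALOG default_catalog
  //   DROP CATALOG IF EXISTS my_catalog
  dropCatalog("my_catalog", true);
}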
@@ -99,7 +99,7 @@ public void before() {
public void clean() {
sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
sql("DROP CATALOG IF EXISTS %s", CATALOG_NAME);
dropCatalog(CATALOG_NAME, true);
BoundedTableFactory.clearDataSets();
}

@@ -100,6 +100,6 @@ private void checkSQLQuery(Map<String, String> catalogProperties, File warehouse

sql("DROP TABLE test_table");
sql("DROP DATABASE test_db");
sql("DROP CATALOG test_catalog");
dropCatalog("test_catalog", false);
}
}
@@ -37,11 +37,12 @@
import org.junit.rules.TemporaryFolder;

public class TestFlinkMergingMetrics extends TestMergingMetrics<RowData> {
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

@ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

@Rule
public final HadoopTableResource tableResource =
new HadoopTableResource(TEMPORARY_FOLDER, "test_db", "test_table", SCHEMA);
new HadoopTableResource(TEMP_FOLDER, "test_db", "test_table", SCHEMA);

public TestFlinkMergingMetrics(FileFormat fileFormat) {
super(fileFormat);
@@ -20,6 +20,7 @@

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.BaseCombinedScanTask;
@@ -83,7 +84,8 @@ public static FileScanTask createFileTask(

public static DataIterator<RowData> createDataIterator(CombinedScanTask combinedTask) {
return new DataIterator<>(
new RowDataFileScanTaskReader(TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true),
new RowDataFileScanTaskReader(
TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true, Collections.emptyList()),
combinedTask,
new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
new PlaintextEncryptionManager());
@@ -19,6 +19,7 @@
package org.apache.iceberg.flink.source.reader;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
@@ -106,7 +107,8 @@ private IcebergSourceReader createReader(
null,
true,
new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
new PlaintextEncryptionManager());
new PlaintextEncryptionManager(),
Collections.emptyList());
return new IcebergSourceReader<>(readerMetrics, readerFunction, readerContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.source.reader;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
@@ -55,7 +56,8 @@ protected ReaderFunction<RowData> readerFunction() {
null,
true,
new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
new PlaintextEncryptionManager());
new PlaintextEncryptionManager(),
Collections.emptyList());
}

@Override