diff --git a/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java b/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java index 15df71da1500..a7fe64afa535 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java +++ b/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java @@ -24,6 +24,7 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; /** * Serializable loader to load an Iceberg {@link Catalog}. @@ -33,10 +34,59 @@ public interface CatalogLoader extends Serializable { Catalog loadCatalog(Configuration hadoopConf); static CatalogLoader hadoop(String name, String warehouseLocation) { - return conf -> new HadoopCatalog(name, conf, warehouseLocation); + return new HadoopCatalogLoader(name, warehouseLocation); } static CatalogLoader hive(String name, String uri, int clientPoolSize) { - return conf -> new HiveCatalog(name, uri, clientPoolSize, conf); + return new HiveCatalogLoader(name, uri, clientPoolSize); + } + + class HadoopCatalogLoader implements CatalogLoader { + private final String catalogName; + private final String warehouseLocation; + + private HadoopCatalogLoader(String catalogName, String warehouseLocation) { + this.catalogName = catalogName; + this.warehouseLocation = warehouseLocation; + } + + @Override + public Catalog loadCatalog(Configuration hadoopConf) { + return new HadoopCatalog(catalogName, hadoopConf, warehouseLocation); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("catalogName", catalogName) + .add("warehouseLocation", warehouseLocation) + .toString(); + } + } + + class HiveCatalogLoader implements CatalogLoader { + private final String catalogName; + private final String uri; + private final int clientPoolSize; + + private HiveCatalogLoader(String catalogName, String uri, int clientPoolSize) { + this.catalogName = catalogName; + this.uri = uri; + this.clientPoolSize = clientPoolSize; + } + + @Override + public Catalog loadCatalog(Configuration hadoopConf) { + return new HiveCatalog(catalogName, uri, clientPoolSize, hadoopConf); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("catalogName", catalogName) + .add("uri", uri) + .add("clientPoolSize", clientPoolSize) + .toString(); + } } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index e37db848f7a3..82b154df8f6a 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.apache.flink.table.api.TableSchema; @@ -48,6 +49,7 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.TableFactory; import org.apache.flink.util.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CachingCatalog; @@ -129,7 +131,7 @@ private Namespace toNamespace(String database) { return Namespace.of(namespace); } - private TableIdentifier toIdentifier(ObjectPath path) { + 
TableIdentifier toIdentifier(ObjectPath path) { return TableIdentifier.of(toNamespace(path.getDatabaseName()), path.getObjectName()); } @@ -292,7 +294,7 @@ public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException return toCatalogTable(table); } - private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException { + Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException { try { return icebergCatalog.loadTable(toIdentifier(tablePath)); } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { @@ -505,6 +507,19 @@ static CatalogTable toCatalogTable(Table table) { return new CatalogTableImpl(schema, partitionKeys, table.properties(), null); } + @Override + public Optional getTableFactory() { + return Optional.of(new FlinkTableFactory(this)); + } + + CatalogLoader getCatalogLoader() { + return catalogLoader; + } + + Configuration getHadoopConf() { + return this.hadoopConf; + } + // ------------------------------ Unsupported methods --------------------------------------------- @Override diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java new file mode 100644 index 000000000000..49a94f54833c --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink; + +import java.util.List; +import java.util.Map; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.StreamTableSinkFactory; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.utils.TableSchemaUtils; + +public class FlinkTableFactory implements StreamTableSinkFactory { + private final FlinkCatalog catalog; + + public FlinkTableFactory(FlinkCatalog catalog) { + this.catalog = catalog; + } + + @Override + public StreamTableSink createTableSink(Context context) { + ObjectPath objectPath = context.getObjectIdentifier().toObjectPath(); + TableLoader tableLoader = createTableLoader(objectPath); + TableSchema tableSchema = getPhysicalSchema(context); + return new IcebergTableSink(context.isBounded(), tableLoader, catalog.getHadoopConf(), tableSchema); + } + + @Override + public Map requiredContext() { + throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI"); + } + + @Override + public List supportedProperties() { + throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI"); + } + + private TableSchema getPhysicalSchema(Context context) { + return TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); + } + + private TableLoader createTableLoader(ObjectPath objectPath) { + return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath)); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java b/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java new file mode 100644 index 000000000000..7bd7871b8789 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink; + +import java.util.Map; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.sinks.AppendStreamTableSink; +import org.apache.flink.table.sinks.OverwritableTableSink; +import org.apache.flink.table.sinks.PartitionableTableSink; +import org.apache.flink.table.sinks.TableSink; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.flink.sink.FlinkSink; + +public class IcebergTableSink implements AppendStreamTableSink, OverwritableTableSink, PartitionableTableSink { + private final boolean isBounded; + private final TableLoader tableLoader; + private final Configuration hadoopConf; + private final TableSchema tableSchema; + + private boolean overwrite = false; + + public IcebergTableSink(boolean isBounded, TableLoader tableLoader, Configuration hadoopConf, + TableSchema tableSchema) { + this.isBounded = isBounded; + this.tableLoader = tableLoader; + this.hadoopConf = hadoopConf; + this.tableSchema = tableSchema; + } + + @Override + public DataStreamSink consumeDataStream(DataStream dataStream) { + Preconditions.checkState(!overwrite || isBounded, "Unbounded data stream doesn't support overwrite operation."); + + return FlinkSink.forRowData(dataStream) + .tableLoader(tableLoader) + .hadoopConf(hadoopConf) + .tableSchema(tableSchema) + .overwrite(overwrite) + .build(); + } + + @Override + public DataType getConsumedDataType() { + return tableSchema.toRowDataType().bridgedTo(RowData.class); + } + + @Override + public TableSchema getTableSchema() { + return this.tableSchema; + } + + @Override + public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { + // This method has been deprecated and it will be removed in future version, so left the empty implementation here. + return this; + } + + @Override + public void setOverwrite(boolean overwrite) { + this.overwrite = overwrite; + } + + @Override + public void setStaticPartition(Map partitions) { + // The flink's PartitionFanoutWriter will handle the static partition write policy automatically. + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java index 540cb5b77990..431f4af2eb63 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java +++ b/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java @@ -27,6 +27,7 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; /** * Serializable loader to load an Iceberg {@link Table}. 
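For context, a minimal sketch (not part of this patch) of how the new named loaders fit together: CatalogLoader.hive()/hadoop() now return the HiveCatalogLoader/HadoopCatalogLoader classes introduced earlier in this diff instead of lambdas, so they carry a readable toString() while staying serializable, and the SQL sink path builds a TableLoader from them. The metastore URI, client pool size, and table name below are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.flink.CatalogLoader;
    import org.apache.iceberg.flink.TableLoader;

    public class TableLoaderSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical Hive metastore URI and client pool size.
        CatalogLoader catalogLoader = CatalogLoader.hive("iceberg", "thrift://metastore:9083", 2);

        // Roughly what FlinkTableFactory#createTableLoader() does for the SQL sink path.
        TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, TableIdentifier.of("db", "sample"));

        // Opening and loading mirrors what FlinkSink.Builder#build() now does when no Table was passed in.
        tableLoader.open(new Configuration());
        try (TableLoader loader = tableLoader) {
          Table table = loader.loadTable();
          System.out.println(table.schema());
        }
      }
    }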
@@ -71,6 +72,13 @@ public Table loadTable() { @Override public void close() { } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("location", location) + .toString(); + } } class CatalogTableLoader implements TableLoader { @@ -103,5 +111,13 @@ public void close() throws IOException { ((Closeable) catalog).close(); } } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("tableIdentifier", identifier) + .add("catalogLoader", catalogLoader) + .toString(); + } } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 96ce0ba6d661..68c9e8c108f5 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -19,6 +19,8 @@ package org.apache.iceberg.flink.sink; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Locale; import java.util.Map; import org.apache.flink.api.common.functions.MapFunction; @@ -110,6 +112,7 @@ public static class Builder { private Configuration hadoopConf; private Table table; private TableSchema tableSchema; + private boolean overwrite = false; private Builder() { } @@ -155,16 +158,29 @@ public Builder tableSchema(TableSchema newTableSchema) { return this; } + public Builder overwrite(boolean newOverwrite) { + this.overwrite = newOverwrite; + return this; + } + @SuppressWarnings("unchecked") public DataStreamSink build() { Preconditions.checkArgument(rowDataInput != null, "Please use forRowData() to initialize the input DataStream."); - Preconditions.checkNotNull(table, "Table shouldn't be null"); Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null"); Preconditions.checkNotNull(hadoopConf, "Hadoop configuration shouldn't be null"); + if (table == null) { + tableLoader.open(hadoopConf); + try (TableLoader loader = tableLoader) { + this.table = loader.loadTable(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e); + } + } + IcebergStreamWriter streamWriter = createStreamWriter(table, tableSchema); - IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, hadoopConf); + IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, hadoopConf, overwrite); DataStream returnStream = rowDataInput .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter) diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index e0cfbafff765..c1bb2ecdef4e 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -39,7 +39,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; +import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.hadoop.SerializableConfiguration; @@ -70,6 +72,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator // TableLoader to load iceberg table lazily. 
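The new overwrite() flag and the lazy table loading in FlinkSink.Builder can also be used directly from the DataStream API. A minimal sketch, assuming a Hadoop-located table at a hypothetical HDFS path; the toy input rows exist only to keep the example self-contained and fall back to Flink's generic serialization:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.sink.FlinkSink;

    public class FlinkSinkOverwriteSketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<RowData> rows = env.fromElements(
            (RowData) GenericRowData.of(1, StringData.fromString("a")),
            GenericRowData.of(2, StringData.fromString("b")));

        // Hypothetical table location; since no Table is passed to the builder,
        // build() now opens the loader and loads the table itself.
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/sample");

        FlinkSink.forRowData(rows)
            .tableLoader(tableLoader)
            .hadoopConf(new Configuration())
            .overwrite(true) // intended for bounded input; commits use ReplacePartitions instead of AppendFiles
            .build();

        env.execute("iceberg-overwrite-sketch");
      }
    }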
private final TableLoader tableLoader; private final SerializableConfiguration hadoopConf; + private final boolean replacePartitions; // A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed // to iceberg table). We need a sorted map here because there's possible that few checkpoints snapshot failed, for @@ -92,9 +95,10 @@ class IcebergFilesCommitter extends AbstractStreamOperator private static final ListStateDescriptor>> STATE_DESCRIPTOR = buildStateDescriptor(); private transient ListState>> checkpointsState; - IcebergFilesCommitter(TableLoader tableLoader, Configuration hadoopConf) { + IcebergFilesCommitter(TableLoader tableLoader, Configuration hadoopConf, boolean replacePartitions) { this.tableLoader = tableLoader; this.hadoopConf = new SerializableConfiguration(hadoopConf); + this.replacePartitions = replacePartitions; } @Override @@ -164,16 +168,51 @@ private void commitUpToCheckpoint(long checkpointId) { pendingDataFiles.addAll(dataFiles); } - AppendFiles appendFiles = table.newAppend(); - pendingDataFiles.forEach(appendFiles::appendFile); - appendFiles.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId)); - appendFiles.set(FLINK_JOB_ID, flinkJobId); - appendFiles.commit(); + if (replacePartitions) { + replacePartitions(pendingDataFiles, checkpointId); + } else { + append(pendingDataFiles, checkpointId); + } // Clear the committed data files from dataFilesPerCheckpoint. pendingFileMap.clear(); } + private void replacePartitions(List dataFiles, long checkpointId) { + ReplacePartitions dynamicOverwrite = table.newReplacePartitions(); + + int numFiles = 0; + for (DataFile file : dataFiles) { + numFiles += 1; + dynamicOverwrite.addFile(file); + } + + commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite", checkpointId); + } + + private void append(List dataFiles, long checkpointId) { + AppendFiles appendFiles = table.newAppend(); + + int numFiles = 0; + for (DataFile file : dataFiles) { + numFiles += 1; + appendFiles.appendFile(file); + } + + commitOperation(appendFiles, numFiles, "append", checkpointId); + } + + private void commitOperation(SnapshotUpdate operation, int numFiles, String description, long checkpointId) { + LOG.info("Committing {} with {} files to table {}", description, numFiles, table); + operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId)); + operation.set(FLINK_JOB_ID, flinkJobId); + + long start = System.currentTimeMillis(); + operation.commit(); // abort is automatically called if this fails. 
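Behind the new replacePartitions flag, commitUpToCheckpoint() now picks between two Iceberg table operations and funnels both through commitOperation(); in streaming jobs these commits still fire only when a checkpoint completes. A stripped-down sketch of the two paths (the table and file list are assumed inputs, and the checkpoint-id/job-id snapshot properties are omitted):

    import org.apache.iceberg.AppendFiles;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.ReplacePartitions;
    import org.apache.iceberg.Table;

    public class CommitPathsSketch {
      static void commitPending(Table table, Iterable<DataFile> files, boolean replacePartitions) {
        if (replacePartitions) {
          // INSERT OVERWRITE: dynamically replace every partition touched by the new files.
          ReplacePartitions overwrite = table.newReplacePartitions();
          files.forEach(overwrite::addFile);
          overwrite.commit();
        } else {
          // Plain INSERT INTO: append the files as a new snapshot.
          AppendFiles append = table.newAppend();
          files.forEach(append::appendFile);
          append.commit();
        }
      }
    }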
+ long duration = System.currentTimeMillis() - start; + LOG.info("Committed in {} ms", duration); + } + @Override public void processElement(StreamRecord element) { this.dataFilesOfCurrentCheckpoint.add(element.getValue()); diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java index ab2230e364d7..b3d4e496f4f5 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java +++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkCatalogTestBase.java @@ -65,16 +65,15 @@ public static void dropWarehouse() { } @Parameterized.Parameters - public static Object[][] parameters() { - return new Object[][] { - new Object[] { "testhive", new String[0] }, - new Object[] { "testhadoop", new String[0] }, - new Object[] { "testhadoop_basenamespace", new String[] { "l0", "l1" }}, - }; + public static Iterable parameters() { + return Lists.newArrayList( + new Object[] {"testhive", new String[0]}, + new Object[] {"testhadoop", new String[0]}, + new Object[] {"testhadoop_basenamespace", new String[] {"l0", "l1"}} + ); } - protected final TableEnvironment tEnv = - TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()); + private volatile TableEnvironment tEnv = null; protected final String catalogName; protected final String[] baseNamespace; @@ -109,7 +108,7 @@ protected org.apache.flink.table.catalog.Catalog createCatalog( return super.createCatalog(name, properties, hiveConf); } }; - tEnv.registerCatalog( + getTableEnv().registerCatalog( catalogName, flinkCatalogs.computeIfAbsent(catalogName, k -> factory.createCatalog(k, config))); @@ -117,8 +116,22 @@ protected org.apache.flink.table.catalog.Catalog createCatalog( this.icebergNamespace = Namespace.of(ArrayUtils.concat(baseNamespace, new String[] { DATABASE })); } + protected TableEnvironment getTableEnv() { + if (tEnv == null) { + synchronized (this) { + if (tEnv == null) { + this.tEnv = TableEnvironment.create(EnvironmentSettings + .newInstance() + .useBlinkPlanner() + .inBatchMode().build()); + } + } + } + return tEnv; + } + public List sql(String query, Object... 
args) { - TableResult tableResult = tEnv.executeSql(String.format(query, args)); + TableResult tableResult = getTableEnv().executeSql(String.format(query, args)); tableResult.getJobClient().ifPresent(c -> { try { c.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get(); diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java index b377e54cde07..3eb7f1643320 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java +++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java @@ -126,12 +126,16 @@ public static void assertTableRows(String tablePath, List expected) thr assertTableRecords(tablePath, expectedRecords); } - public static void assertTableRecords(String tablePath, List expected) throws IOException { - Preconditions.checkArgument(expected != null, "expected records shouldn't be null"); - Table newTable = new HadoopTables().load(tablePath); - try (CloseableIterable iterable = IcebergGenerics.read(newTable).build()) { + public static void assertTableRecords(Table table, List expected) throws IOException { + table.refresh(); + try (CloseableIterable iterable = IcebergGenerics.read(table).build()) { Assert.assertEquals("Should produce the expected record", Sets.newHashSet(expected), Sets.newHashSet(iterable)); } } + + public static void assertTableRecords(String tablePath, List expected) throws IOException { + Preconditions.checkArgument(expected != null, "expected records shouldn't be null"); + assertTableRecords(new HadoopTables().load(tablePath), expected); + } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java index a763ad27d1e6..5b1a2ca26730 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java +++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogDatabase.java @@ -60,8 +60,9 @@ public void testCreateNamespace() { public void testDefaultDatabase() { sql("USE CATALOG %s", catalogName); - Assert.assertEquals("Should use the current catalog", tEnv.getCurrentCatalog(), catalogName); - Assert.assertEquals("Should use the configured default namespace", tEnv.getCurrentDatabase(), "default"); + Assert.assertEquals("Should use the current catalog", getTableEnv().getCurrentCatalog(), catalogName); + Assert.assertEquals("Should use the configured default namespace", + getTableEnv().getCurrentDatabase(), "default"); } @Test diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index ef8d72a4270b..934cbf8e8514 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -83,8 +83,8 @@ public void testGetTable() { Arrays.asList( TableColumn.of("id", DataTypes.BIGINT()), TableColumn.of("strV", DataTypes.STRING())), - tEnv.from("tl").getSchema().getTableColumns()); - Assert.assertTrue(tEnv.getCatalog(catalogName).get().tableExists(ObjectPath.fromString("db.tl"))); + getTableEnv().from("tl").getSchema().getTableColumns()); + Assert.assertTrue(getTableEnv().getCatalog(catalogName).get().tableExists(ObjectPath.fromString("db.tl"))); } @Test @@ -99,16 +99,16 @@ public void testRenameTable() { "Should fail if trying to get a nonexistent table", ValidationException.class, "Table `tl` was not found.", 
- () -> tEnv.from("tl") + () -> getTableEnv().from("tl") ); Assert.assertEquals( Collections.singletonList(TableColumn.of("id", DataTypes.BIGINT())), - tEnv.from("tl2").getSchema().getTableColumns()); + getTableEnv().from("tl2").getSchema().getTableColumns()); } @Test public void testCreateTable() throws TableNotExistException { - tEnv.executeSql("CREATE TABLE tl(id BIGINT)"); + sql("CREATE TABLE tl(id BIGINT)"); Table table = table("tl"); Assert.assertEquals( @@ -125,7 +125,7 @@ public void testCreateTable() throws TableNotExistException { public void testCreateTableLocation() { Assume.assumeFalse("HadoopCatalog does not support creating table with location", isHadoopCatalog); - tEnv.executeSql("CREATE TABLE tl(id BIGINT) WITH ('location'='/tmp/location')"); + sql("CREATE TABLE tl(id BIGINT) WITH ('location'='/tmp/location')"); Table table = table("tl"); Assert.assertEquals( @@ -137,7 +137,7 @@ public void testCreateTableLocation() { @Test public void testCreatePartitionTable() throws TableNotExistException { - tEnv.executeSql("CREATE TABLE tl(id BIGINT, dt STRING) PARTITIONED BY(dt)"); + sql("CREATE TABLE tl(id BIGINT, dt STRING) PARTITIONED BY(dt)"); Table table = table("tl"); Assert.assertEquals( @@ -173,24 +173,24 @@ public void testLoadTransformPartitionTable() throws TableNotExistException { @Test public void testAlterTable() throws TableNotExistException { - tEnv.executeSql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')"); + sql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')"); Map properties = Maps.newHashMap(); properties.put("oldK", "oldV"); // new - tEnv.executeSql("ALTER TABLE tl SET('newK'='newV')"); + sql("ALTER TABLE tl SET('newK'='newV')"); properties.put("newK", "newV"); Assert.assertEquals(properties, table("tl").properties()); // update old - tEnv.executeSql("ALTER TABLE tl SET('oldK'='oldV2')"); + sql("ALTER TABLE tl SET('oldK'='oldV2')"); properties.put("oldK", "oldV2"); Assert.assertEquals(properties, table("tl").properties()); // remove property CatalogTable catalogTable = catalogTable("tl"); properties.remove("oldK"); - tEnv.getCatalog(tEnv.getCurrentCatalog()).get().alterTable( + getTableEnv().getCatalog(getTableEnv().getCurrentCatalog()).get().alterTable( new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); Assert.assertEquals(properties, table("tl").properties()); } @@ -199,14 +199,14 @@ public void testAlterTable() throws TableNotExistException { public void testRelocateTable() { Assume.assumeFalse("HadoopCatalog does not support relocate table", isHadoopCatalog); - tEnv.executeSql("CREATE TABLE tl(id BIGINT)"); - tEnv.executeSql("ALTER TABLE tl SET('location'='/tmp/location')"); + sql("CREATE TABLE tl(id BIGINT)"); + sql("ALTER TABLE tl SET('location'='/tmp/location')"); Assert.assertEquals("/tmp/location", table("tl").location()); } @Test public void testSetCurrentAndCherryPickSnapshotId() { - tEnv.executeSql("CREATE TABLE tl(c1 INT, c2 STRING, c3 STRING) PARTITIONED BY (c1)"); + sql("CREATE TABLE tl(c1 INT, c2 STRING, c3 STRING) PARTITIONED BY (c1)"); Table table = table("tl"); @@ -249,11 +249,11 @@ public void testSetCurrentAndCherryPickSnapshotId() { .commit(); // test cherry pick - tEnv.executeSql(String.format("ALTER TABLE tl SET('cherry-pick-snapshot-id'='%s')", staged.snapshotId())); + sql("ALTER TABLE tl SET('cherry-pick-snapshot-id'='%s')", staged.snapshotId()); validateTableFiles(table, fileB, replacementFile); // test set current snapshot - tEnv.executeSql(String.format("ALTER TABLE tl SET('current-snapshot-id'='%s')", 
snapshotId)); + sql("ALTER TABLE tl SET('current-snapshot-id'='%s')", snapshotId); validateTableFiles(table, fileA); } @@ -271,6 +271,7 @@ private Table table(String name) { } private CatalogTable catalogTable(String name) throws TableNotExistException { - return (CatalogTable) tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTable(new ObjectPath(DATABASE, name)); + return (CatalogTable) getTableEnv().getCatalog(getTableEnv().getCurrentCatalog()).get() + .getTable(new ObjectPath(DATABASE, name)); } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java new file mode 100644 index 000000000000..0e211584e2a4 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import java.util.List; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Expressions; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestFlinkTableSink extends FlinkCatalogTestBase { + private static final String TABLE_NAME = "test_table"; + private TableEnvironment tEnv; + private Table icebergTable; + + private final FileFormat format; + private final boolean isStreamingJob; + + @Parameterized.Parameters(name = "{index}: catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}") + public static Iterable parameters() { + List parameters = Lists.newArrayList(); + for (FileFormat format : new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) { + for (Boolean isStreaming : new Boolean[] {true, false}) { + for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) { + String catalogName = (String) catalogParams[0]; + String[] baseNamespace = (String[]) catalogParams[1]; + parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming}); + } + } + } + return parameters; + } + + public TestFlinkTableSink(String catalogName, String[] baseNamespace, FileFormat format, Boolean isStreamingJob) { + super(catalogName, baseNamespace); + 
this.format = format; + this.isStreamingJob = isStreamingJob; + } + + @Override + protected TableEnvironment getTableEnv() { + if (tEnv == null) { + synchronized (this) { + EnvironmentSettings.Builder settingsBuilder = EnvironmentSettings + .newInstance() + .useBlinkPlanner(); + if (isStreamingJob) { + settingsBuilder.inStreamingMode(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + env.enableCheckpointing(400); + tEnv = StreamTableEnvironment.create(env, settingsBuilder.build()); + } else { + settingsBuilder.inBatchMode(); + tEnv = TableEnvironment.create(settingsBuilder.build()); + } + } + } + return tEnv; + } + + @Before + public void before() { + sql("CREATE DATABASE %s", flinkDatabase); + sql("USE CATALOG %s", catalogName); + sql("USE %s", DATABASE); + sql("CREATE TABLE %s (id int, data varchar) with ('write.format.default'='%s')", TABLE_NAME, format.name()); + icebergTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); + } + + @After + public void clean() { + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME); + sql("DROP DATABASE IF EXISTS %s", flinkDatabase); + } + + @Test + public void testInsertFromSourceTable() throws Exception { + // Register the rows into a temporary table. + getTableEnv().createTemporaryView("sourceTable", + getTableEnv().fromValues(SimpleDataUtil.FLINK_SCHEMA.toRowDataType(), + Expressions.row(1, "hello"), + Expressions.row(2, "world"), + Expressions.row(3, (String) null), + Expressions.row(null, "bar") + ) + ); + + // Redirect the records from source table to destination table. + sql("INSERT INTO %s SELECT id,data from sourceTable", TABLE_NAME); + + // Assert the table records as expected. 
+ SimpleDataUtil.assertTableRecords(icebergTable, Lists.newArrayList( + SimpleDataUtil.createRecord(1, "hello"), + SimpleDataUtil.createRecord(2, "world"), + SimpleDataUtil.createRecord(3, null), + SimpleDataUtil.createRecord(null, "bar") + )); + } + + @Test + public void testOverwriteTable() throws Exception { + Assume.assumeFalse("Flink unbounded streaming does not support overwrite operation", isStreamingJob); + + sql("INSERT INTO %s SELECT 1, 'a'", TABLE_NAME); + SimpleDataUtil.assertTableRecords(icebergTable, Lists.newArrayList( + SimpleDataUtil.createRecord(1, "a") + )); + + sql("INSERT OVERWRITE %s SELECT 2, 'b'", TABLE_NAME); + SimpleDataUtil.assertTableRecords(icebergTable, Lists.newArrayList( + SimpleDataUtil.createRecord(2, "b") + )); + } + + @Test + public void testReplacePartitions() throws Exception { + Assume.assumeFalse("Flink unbounded streaming does not support overwrite operation", isStreamingJob); + String tableName = "test_partition"; + + sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH ('write.format.default'='%s')", + tableName, format.name()); + + Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName)); + + sql("INSERT INTO %s SELECT 1, 'a'", tableName); + sql("INSERT INTO %s SELECT 2, 'b'", tableName); + sql("INSERT INTO %s SELECT 3, 'c'", tableName); + + SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList( + SimpleDataUtil.createRecord(1, "a"), + SimpleDataUtil.createRecord(2, "b"), + SimpleDataUtil.createRecord(3, "c") + )); + + sql("INSERT OVERWRITE %s SELECT 4, 'b'", tableName); + sql("INSERT OVERWRITE %s SELECT 5, 'a'", tableName); + + SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList( + SimpleDataUtil.createRecord(5, "a"), + SimpleDataUtil.createRecord(4, "b"), + SimpleDataUtil.createRecord(3, "c") + )); + + sql("INSERT OVERWRITE %s PARTITION (data='a') SELECT 6", tableName); + + SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList( + SimpleDataUtil.createRecord(6, "a"), + SimpleDataUtil.createRecord(4, "b"), + SimpleDataUtil.createRecord(3, "c") + )); + + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + } + + @Test + public void testInsertIntoPartition() throws Exception { + String tableName = "test_insert_into_partition"; + + sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH ('write.format.default'='%s')", + tableName, format.name()); + + Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName)); + + // Full partition. + sql("INSERT INTO %s PARTITION (data='a') SELECT 1", tableName); + sql("INSERT INTO %s PARTITION (data='a') SELECT 2", tableName); + sql("INSERT INTO %s PARTITION (data='b') SELECT 3", tableName); + + SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList( + SimpleDataUtil.createRecord(1, "a"), + SimpleDataUtil.createRecord(2, "a"), + SimpleDataUtil.createRecord(3, "b") + )); + + // Partial partition. 
+ sql("INSERT INTO %s SELECT 4, 'c'", tableName); + sql("INSERT INTO %s SELECT 5, 'd'", tableName); + + SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList( + SimpleDataUtil.createRecord(1, "a"), + SimpleDataUtil.createRecord(2, "a"), + SimpleDataUtil.createRecord(3, "b"), + SimpleDataUtil.createRecord(4, "c"), + SimpleDataUtil.createRecord(5, "d") + )); + + sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index e5a9a8d4766b..2c4ef32e42af 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -492,7 +492,7 @@ private static TestOperatorFactory of(String tablePath) { @Override @SuppressWarnings("unchecked") public > T createStreamOperator(StreamOperatorParameters param) { - IcebergFilesCommitter committer = new IcebergFilesCommitter(TableLoader.fromHadoopTable(tablePath), CONF); + IcebergFilesCommitter committer = new IcebergFilesCommitter(TableLoader.fromHadoopTable(tablePath), CONF, false); committer.setup(param.getContainingTask(), param.getStreamConfig(), param.getOutput()); return (T) committer; }