diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index b00018b3b770..d20859377ffc 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -61,10 +61,11 @@ static List<DataFile> readDataFiles(ManifestFile manifestFile, FileIO io) throws
     }
   }
 
-  static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
-                                                           long attemptNumber) {
+  static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, String operatorUniqueId,
+                                                           int subTaskId, long attemptNumber) {
     TableOperations ops = ((HasTableOperations) table).operations();
-    return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
+    return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, operatorUniqueId,
+        subTaskId, attemptNumber);
   }
 
   static DeltaManifests writeCompletedFiles(WriteResult result,
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 8f8bdad6c852..beffff68fde8 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -129,7 +129,9 @@ public void initializeState(StateInitializationContext context) throws Exception
 
     int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
     int attemptId = getRuntimeContext().getAttemptNumber();
-    this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
+    String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
+    this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorUniqueId,
+        subTaskId, attemptId);
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
index fca86080b11a..b7d575bb446b 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java
@@ -35,23 +35,25 @@ class ManifestOutputFileFactory {
   private final FileIO io;
   private final Map<String, String> props;
   private final String flinkJobId;
+  private final String operatorUniqueId;
   private final int subTaskId;
   private final long attemptNumber;
   private final AtomicInteger fileCount = new AtomicInteger(0);
 
   ManifestOutputFileFactory(TableOperations ops, FileIO io, Map<String, String> props,
-                            String flinkJobId, int subTaskId, long attemptNumber) {
+                            String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) {
     this.ops = ops;
     this.io = io;
     this.props = props;
     this.flinkJobId = flinkJobId;
+    this.operatorUniqueId = operatorUniqueId;
     this.subTaskId = subTaskId;
     this.attemptNumber = attemptNumber;
   }
 
   private String generatePath(long checkpointId) {
-    return FileFormat.AVRO.addExtension(String.format("%s-%05d-%d-%d-%05d", flinkJobId, subTaskId,
-        attemptNumber, checkpointId, fileCount.incrementAndGet()));
+    return FileFormat.AVRO.addExtension(String.format("%s-%s-%05d-%d-%d-%05d", flinkJobId, operatorUniqueId,
+        subTaskId, attemptNumber, checkpointId, fileCount.incrementAndGet()));
   }
 
   OutputFile create(long checkpointId) {
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index c1538bcaff9d..4a47656e847d 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -87,9 +87,10 @@ public void before() throws IOException {
   @Test
   public void testIO() throws IOException {
     String flinkJobId = newFlinkJobId();
+    String operatorId = newOperatorUniqueId();
     for (long checkpointId = 1; checkpointId <= 3; checkpointId++) {
       ManifestOutputFileFactory factory =
-          FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+          FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, 1, 1);
       final long curCkpId = checkpointId;
 
       List<DataFile> dataFiles = generateDataFiles(10);
@@ -122,11 +123,12 @@ public void testIO() throws IOException {
   public void testUserProvidedManifestLocation() throws IOException {
     long checkpointId = 1;
     String flinkJobId = newFlinkJobId();
+    String operatorId = newOperatorUniqueId();
     File userProvidedFolder = tempFolder.newFolder();
     Map<String, String> props = ImmutableMap.of(FLINK_MANIFEST_LOCATION, userProvidedFolder.getAbsolutePath() + "///");
     ManifestOutputFileFactory factory = new ManifestOutputFileFactory(
         ((HasTableOperations) table).operations(), table.io(), props,
-        flinkJobId, 1, 1);
+        flinkJobId, operatorId, 1, 1);
 
     List<DataFile> dataFiles = generateDataFiles(5);
     DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(
@@ -156,7 +158,9 @@ public void testUserProvidedManifestLocation() throws IOException {
   public void testVersionedSerializer() throws IOException {
     long checkpointId = 1;
     String flinkJobId = newFlinkJobId();
-    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+    String operatorId = newOperatorUniqueId();
+    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId,
+        1, 1);
 
     List<DataFile> dataFiles = generateDataFiles(10);
     List<DeleteFile> eqDeleteFiles = generateEqDeleteFiles(10);
@@ -186,7 +190,9 @@ public void testCompatibility() throws IOException {
     // The v2 deserializer should be able to deserialize the v1 binary.
     long checkpointId = 1;
     String flinkJobId = newFlinkJobId();
-    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+    String operatorId = newOperatorUniqueId();
+    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId,
+        1, 1);
 
     List<DataFile> dataFiles = generateDataFiles(10);
     ManifestFile manifest = FlinkManifestUtil.writeDataFiles(factory.create(checkpointId), table.spec(), dataFiles);
@@ -271,4 +277,8 @@ private List<DeleteFile> generatePosDeleteFiles(int fileNum) throws IOException
   private static String newFlinkJobId() {
     return UUID.randomUUID().toString();
   }
+
+  private static String newOperatorUniqueId() {
+    return UUID.randomUUID().toString();
+  }
 }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 135fa84ee94a..9c23d8a0889f 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -599,8 +599,10 @@ public void testFlinkManifests() throws Exception {
       harness.snapshot(checkpoint, ++timestamp);
       List<Path> manifestPaths = assertFlinkManifests(1);
       Path manifestPath = manifestPaths.get(0);
+      String operatorId = harness.getOneInputOperator().getOperatorID().toString();
       Assert.assertEquals("File name should have the expected pattern.",
-          String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString());
+          String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
+          manifestPath.getFileName().toString());
 
       // 2. Read the data files from manifests and assert.
       List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());
@@ -640,8 +642,10 @@ public void testDeleteFiles() throws Exception {
       harness.snapshot(checkpoint, ++timestamp);
       List<Path> manifestPaths = assertFlinkManifests(1);
       Path manifestPath = manifestPaths.get(0);
+      String operatorId = harness.getOneInputOperator().getOperatorID().toString();
       Assert.assertEquals("File name should have the expected pattern.",
-          String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString());
+          String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
+          manifestPath.getFileName().toString());
 
       // 2. Read the data files from manifests and assert.
      List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index ae921c5200a8..d80b9bf5476d 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -488,6 +488,10 @@ private Pair<Table, Long> load(Identifier ident) {
       return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
 
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      if (ident.namespace().length == 0) {
+        throw e;
+      }
+
       // if the original load didn't work, the identifier may be extended and include a snapshot selector
       TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
       Table table;
@@ -567,6 +571,8 @@ private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
   }
 
   private Identifier namespaceToIdentifier(String[] namespace) {
+    Preconditions.checkArgument(namespace.length > 0,
+        "Cannot convert empty namespace to identifier");
     String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
     String name = namespace[ns.length];
     return Identifier.of(ns, name);
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index 303cbb5f932b..0a4c9368cb96 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -67,6 +67,17 @@ public void testCreateTable() {
         table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
   }
 
+  @Test
+  public void testCreateTableInRootNamespace() {
+    Assume.assumeTrue("Hadoop has no default namespace configured", "testhadoop".equals(catalogName));
+
+    try {
+      sql("CREATE TABLE %s.table (id bigint) USING iceberg", catalogName);
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.table", catalogName);
+    }
+  }
+
   @Test
   public void testCreateTableUsingParquet() {
     Assume.assumeTrue(
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index ae921c5200a8..d80b9bf5476d 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -488,6 +488,10 @@ private Pair<Table, Long> load(Identifier ident) {
       return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
 
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      if (ident.namespace().length == 0) {
+        throw e;
+      }
+
       // if the original load didn't work, the identifier may be extended and include a snapshot selector
       TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
       Table table;
@@ -567,6 +571,8 @@ private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
   }
 
   private Identifier namespaceToIdentifier(String[] namespace) {
+    Preconditions.checkArgument(namespace.length > 0,
+        "Cannot convert empty namespace to identifier");
     String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
     String name = namespace[ns.length];
     return Identifier.of(ns, name);
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index 303cbb5f932b..0a4c9368cb96 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -67,6 +67,17 @@ public void testCreateTable() {
         table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
   }
 
+  @Test
+  public void testCreateTableInRootNamespace() {
+    Assume.assumeTrue("Hadoop has no default namespace configured", "testhadoop".equals(catalogName));
+
+    try {
+      sql("CREATE TABLE %s.table (id bigint) USING iceberg", catalogName);
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.table", catalogName);
+    }
+  }
+
   @Test
   public void testCreateTableUsingParquet() {
     Assume.assumeTrue(
diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RowLevelCommandScanRelationPushDown.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RowLevelCommandScanRelationPushDown.scala
index 52b27d53e1a7..4e89b9a1c243 100644
--- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RowLevelCommandScanRelationPushDown.scala
+++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RowLevelCommandScanRelationPushDown.scala
@@ -20,13 +20,17 @@
 package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.planning.RewrittenRowLevelCommand
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 object RowLevelCommandScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
@@ -39,16 +43,12 @@ object RowLevelCommandScanRelationPushDown extends Rule[LogicalPlan] with Predic
       val table = relation.table.asRowLevelOperationTable
       val scanBuilder = table.newScanBuilder(relation.options)
 
-      val filters = command.condition.toSeq
-      val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output)
-      val (_, normalizedFiltersWithoutSubquery) =
-        normalizedFilters.partition(SubqueryExpression.hasSubquery)
-
-      val (pushedFilters, remainingFilters) = PushDownUtils.pushFilters(
-        scanBuilder, normalizedFiltersWithoutSubquery)
+      val (pushedFilters, remainingFilters) = command.condition match {
+        case Some(cond) => pushFilters(cond, scanBuilder, relation.output)
+        case None => (Nil, Nil)
+      }
 
-      val (scan, output) = PushDownUtils.pruneColumns(
-        scanBuilder, relation, relation.output, Seq.empty)
+      val (scan, output) = PushDownUtils.pruneColumns(scanBuilder, relation, relation.output, Nil)
 
       logInfo(
         s"""
@@ -68,6 +68,20 @@ object RowLevelCommandScanRelationPushDown extends Rule[LogicalPlan] with Predic
       command.withNewRewritePlan(newRewritePlan)
   }
 
+  private def pushFilters(
+      cond: Expression,
+      scanBuilder: ScanBuilder,
+      tableAttrs: Seq[AttributeReference]): (Seq[Filter], Seq[Expression]) = {
+
+    val tableAttrSet = AttributeSet(tableAttrs)
+    val filters = splitConjunctivePredicates(cond).filter(_.references.subsetOf(tableAttrSet))
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, tableAttrs)
+    val (_, normalizedFiltersWithoutSubquery) =
+      normalizedFilters.partition(SubqueryExpression.hasSubquery)
+
+    PushDownUtils.pushFilters(scanBuilder, normalizedFiltersWithoutSubquery)
+  }
+
   private def toOutputAttrs(
       schema: StructType,
       relation: DataSourceV2Relation): Seq[AttributeReference] = {
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 23ba7addf99b..6537e31c6757 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -32,6 +32,9 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -75,6 +78,52 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS source");
   }
 
+  @Test
+  public void testMergeWithStaticPredicatePushDown() {
+    createAndInitTable("id BIGINT, dep STRING");
+
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+    append(tableName,
+        "{ \"id\": 1, \"dep\": \"software\" }\n" +
+        "{ \"id\": 11, \"dep\": \"software\" }\n" +
+        "{ \"id\": 1, \"dep\": \"hr\" }");
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snapshot = table.currentSnapshot();
+    String dataFilesCount = snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP);
+    Assert.assertEquals("Must have 2 files before MERGE", "2", dataFilesCount);
+
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"dep\": \"finance\" }\n" +
+        "{ \"id\": 2, \"dep\": \"hardware\" }");
+
+    // disable dynamic pruning and rely only on static predicate pushdown
+    withSQLConf(ImmutableMap.of(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED().key(), "false"), () -> {
+      sql("MERGE INTO %s t USING source " +
+          "ON t.id == source.id AND t.dep IN ('software') AND source.id < 10 " +
+          "WHEN MATCHED AND source.id = 1 THEN " +
+          " UPDATE SET dep = source.dep " +
+          "WHEN NOT MATCHED THEN " +
+          " INSERT (dep, id) VALUES (source.dep, source.id)", tableName);
+    });
+
+    table.refresh();
+
+    Snapshot mergeSnapshot = table.currentSnapshot();
+    String deletedDataFilesCount = mergeSnapshot.summary().get(SnapshotSummary.DELETED_FILES_PROP);
+    Assert.assertEquals("Must overwrite only 1 file", "1", deletedDataFilesCount);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1L, "finance"), // updated
+        row(1L, "hr"), // kept
+        row(2L, "hardware"), // new
+        row(11L, "software") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id, dep", tableName));
+  }
+
   @Test
   public void testMergeIntoEmptyTargetInsertAllNonMatchingRows() {
     createAndInitTable("id INT, dep STRING");
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index ae921c5200a8..d80b9bf5476d 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -488,6 +488,10 @@ private Pair<Table, Long> load(Identifier ident) {
       return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
 
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      if (ident.namespace().length == 0) {
+        throw e;
+      }
+
       // if the original load didn't work, the identifier may be extended and include a snapshot selector
       TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
       Table table;
@@ -567,6 +571,8 @@ private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
   }
 
   private Identifier namespaceToIdentifier(String[] namespace) {
+    Preconditions.checkArgument(namespace.length > 0,
+        "Cannot convert empty namespace to identifier");
     String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
     String name = namespace[ns.length];
     return Identifier.of(ns, name);
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index 303cbb5f932b..0a4c9368cb96 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -67,6 +67,17 @@ public void testCreateTable() {
         table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
   }
 
+  @Test
+  public void testCreateTableInRootNamespace() {
+    Assume.assumeTrue("Hadoop has no default namespace configured", "testhadoop".equals(catalogName));
+
+    try {
+      sql("CREATE TABLE %s.table (id bigint) USING iceberg", catalogName);
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.table", catalogName);
+    }
+  }
+
   @Test
   public void testCreateTableUsingParquet() {
     Assume.assumeTrue(