@@ -61,10 +61,11 @@ static List<DataFile> readDataFiles(ManifestFile manifestFile, FileIO io) throws
     }
   }
 
-  static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId,
-                                                           long attemptNumber) {
+  static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, String operatorUniqueId,
+                                                           int subTaskId, long attemptNumber) {
     TableOperations ops = ((HasTableOperations) table).operations();
-    return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber);
+    return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, operatorUniqueId,
+        subTaskId, attemptNumber);
   }
 
   static DeltaManifests writeCompletedFiles(WriteResult result,
@@ -129,7 +129,9 @@ public void initializeState(StateInitializationContext context) throws Exception
 
     int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
     int attemptId = getRuntimeContext().getAttemptNumber();
-    this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, subTaskId, attemptId);
+    String operatorUniqueId = getRuntimeContext().getOperatorUniqueID();
+    this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorUniqueId,
+        subTaskId, attemptId);
     this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
@@ -35,23 +35,25 @@ class ManifestOutputFileFactory {
   private final FileIO io;
   private final Map<String, String> props;
   private final String flinkJobId;
+  private final String operatorUniqueId;
   private final int subTaskId;
   private final long attemptNumber;
   private final AtomicInteger fileCount = new AtomicInteger(0);
 
   ManifestOutputFileFactory(TableOperations ops, FileIO io, Map<String, String> props,
-                            String flinkJobId, int subTaskId, long attemptNumber) {
+                            String flinkJobId, String operatorUniqueId, int subTaskId, long attemptNumber) {
     this.ops = ops;
     this.io = io;
     this.props = props;
     this.flinkJobId = flinkJobId;
+    this.operatorUniqueId = operatorUniqueId;
     this.subTaskId = subTaskId;
     this.attemptNumber = attemptNumber;
   }
 
   private String generatePath(long checkpointId) {
-    return FileFormat.AVRO.addExtension(String.format("%s-%05d-%d-%d-%05d", flinkJobId, subTaskId,
-        attemptNumber, checkpointId, fileCount.incrementAndGet()));
+    return FileFormat.AVRO.addExtension(String.format("%s-%s-%05d-%d-%d-%05d", flinkJobId, operatorUniqueId,
+        subTaskId, attemptNumber, checkpointId, fileCount.incrementAndGet()));
   }
 
   OutputFile create(long checkpointId) {
@@ -87,9 +87,10 @@ public void before() throws IOException {
   @Test
   public void testIO() throws IOException {
     String flinkJobId = newFlinkJobId();
+    String operatorId = newOperatorUniqueId();
     for (long checkpointId = 1; checkpointId <= 3; checkpointId++) {
       ManifestOutputFileFactory factory =
-          FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+          FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId, 1, 1);
       final long curCkpId = checkpointId;
 
       List<DataFile> dataFiles = generateDataFiles(10);
@@ -122,11 +123,12 @@ public void testIO() throws IOException {
   public void testUserProvidedManifestLocation() throws IOException {
     long checkpointId = 1;
     String flinkJobId = newFlinkJobId();
+    String operatorId = newOperatorUniqueId();
     File userProvidedFolder = tempFolder.newFolder();
     Map<String, String> props = ImmutableMap.of(FLINK_MANIFEST_LOCATION, userProvidedFolder.getAbsolutePath() + "///");
     ManifestOutputFileFactory factory = new ManifestOutputFileFactory(
         ((HasTableOperations) table).operations(), table.io(), props,
-        flinkJobId, 1, 1);
+        flinkJobId, operatorId, 1, 1);
 
     List<DataFile> dataFiles = generateDataFiles(5);
     DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(
@@ -156,7 +158,9 @@ public void testUserProvidedManifestLocation() throws IOException {
   public void testVersionedSerializer() throws IOException {
     long checkpointId = 1;
     String flinkJobId = newFlinkJobId();
-    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+    String operatorId = newOperatorUniqueId();
+    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId,
+        1, 1);
 
     List<DataFile> dataFiles = generateDataFiles(10);
     List<DeleteFile> eqDeleteFiles = generateEqDeleteFiles(10);
@@ -186,7 +190,9 @@ public void testCompatibility() throws IOException {
     // The v2 deserializer should be able to deserialize the v1 binary.
     long checkpointId = 1;
     String flinkJobId = newFlinkJobId();
-    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, 1, 1);
+    String operatorId = newOperatorUniqueId();
+    ManifestOutputFileFactory factory = FlinkManifestUtil.createOutputFileFactory(table, flinkJobId, operatorId,
+        1, 1);
 
     List<DataFile> dataFiles = generateDataFiles(10);
     ManifestFile manifest = FlinkManifestUtil.writeDataFiles(factory.create(checkpointId), table.spec(), dataFiles);
@@ -271,4 +277,8 @@ private List<DeleteFile> generatePosDeleteFiles(int fileNum) throws IOException
   private static String newFlinkJobId() {
     return UUID.randomUUID().toString();
   }
+
+  private static String newOperatorUniqueId() {
+    return UUID.randomUUID().toString();
+  }
 }
@@ -599,8 +599,10 @@ public void testFlinkManifests() throws Exception {
       harness.snapshot(checkpoint, ++timestamp);
       List<Path> manifestPaths = assertFlinkManifests(1);
       Path manifestPath = manifestPaths.get(0);
+      String operatorId = harness.getOneInputOperator().getOperatorID().toString();
       Assert.assertEquals("File name should have the expected pattern.",
-          String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString());
+          String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
+          manifestPath.getFileName().toString());
 
       // 2. Read the data files from manifests and assert.
       List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());
@@ -640,8 +642,10 @@ public void testDeleteFiles() throws Exception {
      harness.snapshot(checkpoint, ++timestamp);
      List<Path> manifestPaths = assertFlinkManifests(1);
      Path manifestPath = manifestPaths.get(0);
+     String operatorId = harness.getOneInputOperator().getOperatorID().toString();
      Assert.assertEquals("File name should have the expected pattern.",
-         String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString());
+         String.format("%s-%s-%05d-%d-%d-%05d.avro", jobId, operatorId, 0, 0, checkpoint, 1),
+         manifestPath.getFileName().toString());
 
      // 2. Read the data files from manifests and assert.
      List<DataFile> dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());
@@ -488,6 +488,10 @@ private Pair<Table, Long> load(Identifier ident) {
       return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
 
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      if (ident.namespace().length == 0) {
+        throw e;
+      }
+
       // if the original load didn't work, the identifier may be extended and include a snapshot selector
       TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
       Table table;
@@ -567,6 +571,8 @@ private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
   }
 
   private Identifier namespaceToIdentifier(String[] namespace) {
+    Preconditions.checkArgument(namespace.length > 0,
+        "Cannot convert empty namespace to identifier");
     String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
     String name = namespace[ns.length];
     return Identifier.of(ns, name);
@@ -67,6 +67,17 @@ public void testCreateTable() {
         table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
   }
 
+  @Test
+  public void testCreateTableInRootNamespace() {
+    Assume.assumeTrue("Hadoop has no default namespace configured", "testhadoop".equals(catalogName));
+
+    try {
+      sql("CREATE TABLE %s.table (id bigint) USING iceberg", catalogName);
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.table", catalogName);
+    }
+  }
+
   @Test
   public void testCreateTableUsingParquet() {
     Assume.assumeTrue(
@@ -488,6 +488,10 @@ private Pair<Table, Long> load(Identifier ident) {
       return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
 
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      if (ident.namespace().length == 0) {
+        throw e;
+      }
+
      // if the original load didn't work, the identifier may be extended and include a snapshot selector
      TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
      Table table;
@@ -567,6 +571,8 @@ private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
   }
 
   private Identifier namespaceToIdentifier(String[] namespace) {
+    Preconditions.checkArgument(namespace.length > 0,
+        "Cannot convert empty namespace to identifier");
     String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
     String name = namespace[ns.length];
     return Identifier.of(ns, name);
@@ -67,6 +67,17 @@ public void testCreateTable() {
         table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
   }
 
+  @Test
+  public void testCreateTableInRootNamespace() {
+    Assume.assumeTrue("Hadoop has no default namespace configured", "testhadoop".equals(catalogName));
+
+    try {
+      sql("CREATE TABLE %s.table (id bigint) USING iceberg", catalogName);
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.table", catalogName);
+    }
+  }
+
   @Test
   public void testCreateTableUsingParquet() {
     Assume.assumeTrue(
@@ -20,13 +20,17 @@
 package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.planning.RewrittenRowLevelCommand
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 object RowLevelCommandScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
@@ -39,16 +43,12 @@ object RowLevelCommandScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
       val table = relation.table.asRowLevelOperationTable
       val scanBuilder = table.newScanBuilder(relation.options)
 
-      val filters = command.condition.toSeq
-      val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output)
-      val (_, normalizedFiltersWithoutSubquery) =
-        normalizedFilters.partition(SubqueryExpression.hasSubquery)
-
-      val (pushedFilters, remainingFilters) = PushDownUtils.pushFilters(
-        scanBuilder, normalizedFiltersWithoutSubquery)
+      val (pushedFilters, remainingFilters) = command.condition match {
+        case Some(cond) => pushFilters(cond, scanBuilder, relation.output)
+        case None => (Nil, Nil)
+      }
 
-      val (scan, output) = PushDownUtils.pruneColumns(
-        scanBuilder, relation, relation.output, Seq.empty)
+      val (scan, output) = PushDownUtils.pruneColumns(scanBuilder, relation, relation.output, Nil)
 
       logInfo(
         s"""
@@ -68,6 +68,20 @@ object RowLevelCommandScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     command.withNewRewritePlan(newRewritePlan)
   }
 
+  private def pushFilters(
+      cond: Expression,
+      scanBuilder: ScanBuilder,
+      tableAttrs: Seq[AttributeReference]): (Seq[Filter], Seq[Expression]) = {
+
+    val tableAttrSet = AttributeSet(tableAttrs)
+    val filters = splitConjunctivePredicates(cond).filter(_.references.subsetOf(tableAttrSet))
+    val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, tableAttrs)
+    val (_, normalizedFiltersWithoutSubquery) =
+      normalizedFilters.partition(SubqueryExpression.hasSubquery)
+
+    PushDownUtils.pushFilters(scanBuilder, normalizedFiltersWithoutSubquery)
+  }
+
   private def toOutputAttrs(
       schema: StructType,
       relation: DataSourceV2Relation): Seq[AttributeReference] = {
@@ -32,6 +32,9 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -75,6 +78,52 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS source");
   }
 
+  @Test
+  public void testMergeWithStaticPredicatePushDown() {
+    createAndInitTable("id BIGINT, dep STRING");
+
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+    append(tableName,
+        "{ \"id\": 1, \"dep\": \"software\" }\n" +
+        "{ \"id\": 11, \"dep\": \"software\" }\n" +
+        "{ \"id\": 1, \"dep\": \"hr\" }");
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snapshot = table.currentSnapshot();
+    String dataFilesCount = snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP);
+    Assert.assertEquals("Must have 2 files before MERGE", "2", dataFilesCount);
+
+    createOrReplaceView("source",
+        "{ \"id\": 1, \"dep\": \"finance\" }\n" +
+        "{ \"id\": 2, \"dep\": \"hardware\" }");
+
+    // disable dynamic pruning and rely only on static predicate pushdown
+    withSQLConf(ImmutableMap.of(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED().key(), "false"), () -> {
+      sql("MERGE INTO %s t USING source " +
+          "ON t.id == source.id AND t.dep IN ('software') AND source.id < 10 " +
+          "WHEN MATCHED AND source.id = 1 THEN " +
+          " UPDATE SET dep = source.dep " +
+          "WHEN NOT MATCHED THEN " +
+          " INSERT (dep, id) VALUES (source.dep, source.id)", tableName);
+    });
+
+    table.refresh();
+
+    Snapshot mergeSnapshot = table.currentSnapshot();
+    String deletedDataFilesCount = mergeSnapshot.summary().get(SnapshotSummary.DELETED_FILES_PROP);
+    Assert.assertEquals("Must overwrite only 1 file", "1", deletedDataFilesCount);
+
+    ImmutableList<Object[]> expectedRows = ImmutableList.of(
+        row(1L, "finance"), // updated
+        row(1L, "hr"), // kept
+        row(2L, "hardware"), // new
+        row(11L, "software") // kept
+    );
+    assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id, dep", tableName));
+  }
+
   @Test
   public void testMergeIntoEmptyTargetInsertAllNonMatchingRows() {
     createAndInitTable("id INT, dep STRING");
@@ -488,6 +488,10 @@ private Pair<Table, Long> load(Identifier ident) {
       return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
 
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      if (ident.namespace().length == 0) {
+        throw e;
+      }
+
      // if the original load didn't work, the identifier may be extended and include a snapshot selector
      TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
      Table table;
@@ -567,6 +571,8 @@ private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
   }
 
   private Identifier namespaceToIdentifier(String[] namespace) {
+    Preconditions.checkArgument(namespace.length > 0,
+        "Cannot convert empty namespace to identifier");
     String[] ns = Arrays.copyOf(namespace, namespace.length - 1);
     String name = namespace[ns.length];
     return Identifier.of(ns, name);
@@ -67,6 +67,17 @@ public void testCreateTable() {
         table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
   }
 
+  @Test
+  public void testCreateTableInRootNamespace() {
+    Assume.assumeTrue("Hadoop has no default namespace configured", "testhadoop".equals(catalogName));
+
+    try {
+      sql("CREATE TABLE %s.table (id bigint) USING iceberg", catalogName);
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.table", catalogName);
+    }
+  }
+
   @Test
   public void testCreateTableUsingParquet() {
     Assume.assumeTrue(