==== changelog view procedure tests (org.apache.iceberg.spark.extensions) ====
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.junit.Assert.assertThrows;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.ChangelogOperation;
@@ -45,13 +47,13 @@ public void removeTable() {
sql("DROP TABLE IF EXISTS %s", tableName);
}

-public void createTableWith2Columns() {
+public void createTableWithTwoColumns() {
sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, 1);
sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
}

-private void createTableWith3Columns() {
+private void createTableWithThreeColumns() {
sql("CREATE TABLE %s (id INT, data STRING, age INT) USING iceberg", tableName);
sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, 1);
sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
@@ -65,7 +67,7 @@ private void createTableWithIdentifierField() {

@Test
public void testCustomizedViewName() {
-createTableWith2Columns();
+createTableWithTwoColumns();
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
sql("INSERT INTO %s VALUES (2, 'b')", tableName);

@@ -98,7 +100,7 @@ public void testCustomizedViewName() {

@Test
public void testNoSnapshotIdInput() {
-createTableWith2Columns();
+createTableWithTwoColumns();
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Snapshot snap0 = table.currentSnapshot();
@@ -129,7 +131,7 @@ public void testNoSnapshotIdInput() {

@Test
public void testTimestampsBasedQuery() {
-createTableWith2Columns();
+createTableWithTwoColumns();
long beginning = System.currentTimeMillis();

sql("INSERT INTO %s VALUES (1, 'a')", tableName);
@@ -189,7 +191,7 @@ public void testTimestampsBasedQuery() {

@Test
public void testWithCarryovers() {
-createTableWith2Columns();
+createTableWithTwoColumns();
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Snapshot snap0 = table.currentSnapshot();
@@ -224,7 +226,7 @@ public void testWithCarryovers() {

@Test
public void testUpdate() {
-createTableWith2Columns();
+createTableWithTwoColumns();
sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);

@@ -283,7 +285,7 @@ public void testUpdateWithIdentifierField() {

@Test
public void testUpdateWithFilter() {
-createTableWith2Columns();
+createTableWithTwoColumns();
sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);

@@ -315,7 +317,7 @@ public void testUpdateWithFilter() {

@Test
public void testUpdateWithMultipleIdentifierColumns() {
-createTableWith3Columns();
+createTableWithThreeColumns();

sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11)", tableName);
Table table = validationCatalog.loadTable(tableIdent);
@@ -347,7 +349,7 @@ public void testUpdateWithMultipleIdentifierColumns() {

@Test
public void testRemoveCarryOvers() {
-createTableWith3Columns();
+createTableWithThreeColumns();

sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
Table table = validationCatalog.loadTable(tableIdent);
@@ -381,7 +383,7 @@ public void testRemoveCarryOvers() {

@Test
public void testRemoveCarryOversWithoutUpdatedRows() {
-createTableWith3Columns();
+createTableWithThreeColumns();

sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
Table table = validationCatalog.loadTable(tableIdent);
@@ -411,9 +413,74 @@ public void testRemoveCarryOversWithoutUpdatedRows() {
sql("select * from %s order by _change_ordinal, id, data", viewName));
}

@Test
public void testNetChangesWithRemoveCarryOvers() {
// partitioned by id
createTableWithThreeColumns();

// insert rows: (1, 'a', 12) (2, 'b', 11) (2, 'e', 12)
sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Snapshot snap1 = table.currentSnapshot();

// delete rows: (2, 'b', 11) (2, 'e', 12)
// insert rows: (3, 'c', 13) (2, 'd', 11) (2, 'e', 12)
sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
table.refresh();
Snapshot snap2 = table.currentSnapshot();

// delete rows: (2, 'd', 11) (2, 'e', 12) (3, 'c', 13)
// insert rows: (3, 'c', 15) (2, 'e', 12)
sql("INSERT OVERWRITE %s VALUES (3, 'c', 15), (2, 'e', 12)", tableName);
table.refresh();
Snapshot snap3 = table.currentSnapshot();

// test with all snapshots
List<Object[]> returns =
sql(
"CALL %s.system.create_changelog_view(table => '%s', net_changes => true)",
catalogName, tableName);

String viewName = (String) returns.get(0)[0];

assertEquals(
"Rows should match",
ImmutableList.of(
row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
row(3, "c", 15, INSERT, 2, snap3.snapshotId()),
row(2, "e", 12, INSERT, 2, snap3.snapshotId())),
sql("select * from %s order by _change_ordinal, data", viewName));

// test with snap2 and snap3
sql(
"CALL %s.system.create_changelog_view(table => '%s', "
+ "options => map('start-snapshot-id','%s'), "
+ "net_changes => true)",
catalogName, tableName, snap1.snapshotId());

assertEquals(
"Rows should match",
ImmutableList.of(
row(2, "b", 11, DELETE, 0, snap2.snapshotId()),
row(3, "c", 15, INSERT, 1, snap3.snapshotId())),
sql("select * from %s order by _change_ordinal, data", viewName));
}

@Test
public void testNetChangesWithComputeUpdates() {
createTableWithTwoColumns();
assertThrows(
"Should fail because net_changes is not supported with computing updates",
IllegalArgumentException.class,
() ->
sql(
"CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'), net_changes => true)",
catalogName, tableName));
}

@Test
public void testNotRemoveCarryOvers() {
-createTableWith3Columns();
+createTableWithThreeColumns();

sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
Table table = validationCatalog.loadTable(tableIdent);
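Taken together, the new tests pin down the `net_changes` semantics: DELETE/INSERT pairs for the same row cancel out across the snapshot range, a row that every overwrite deletes and re-inserts (such as (2, 'e', 12)) surfaces exactly once and is attributed to the last snapshot that touched it, and combining `net_changes` with `identifier_columns` is rejected as unsupported. A minimal caller-side sketch of the new option, assuming a SparkSession with the Iceberg extensions enabled; the catalog name `cat` and table `db.tbl` are placeholders:

import org.apache.spark.sql.SparkSession;

public class NetChangesExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // The procedure returns the generated view name in its first column.
    String viewName =
        spark
            .sql(
                "CALL cat.system.create_changelog_view("
                    + "table => 'db.tbl', net_changes => true)")
            .first()
            .getString(0);

    // Only the net effect over the snapshot range remains: rows inserted
    // and later deleted inside the range do not appear at all.
    spark.sql("SELECT * FROM " + viewName + " ORDER BY _change_ordinal").show();
  }
}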
==== ChangelogIterator ====
@@ -20,8 +20,10 @@

import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import org.apache.iceberg.ChangelogOperation;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
@@ -35,16 +37,28 @@ public abstract class ChangelogIterator implements Iterator<Row> {

private final Iterator<Row> rowIterator;
private final int changeTypeIndex;
private final StructType rowType;

protected ChangelogIterator(Iterator<Row> rowIterator, StructType rowType) {
this.rowIterator = rowIterator;
this.rowType = rowType;
this.changeTypeIndex = rowType.fieldIndex(MetadataColumns.CHANGE_TYPE.name());
}

protected int changeTypeIndex() {
return changeTypeIndex;
}

protected StructType rowType() {
return rowType;
}

protected String changeType(Row row) {
String changeType = row.getString(changeTypeIndex());
Preconditions.checkNotNull(changeType, "Change type should not be null");
return changeType;
}

protected Iterator<Row> rowIterator() {
return rowIterator;
}
@@ -79,7 +93,35 @@ public static Iterator<Row> removeCarryovers(Iterator<Row> rowIterator, StructType
return Iterators.filter(changelogIterator, Objects::nonNull);
}

public static Iterator<Row> removeNetCarryovers(Iterator<Row> rowIterator, StructType rowType) {
ChangelogIterator changelogIterator = new RemoveNetCarryoverIterator(rowIterator, rowType);
return Iterators.filter(changelogIterator, Objects::nonNull);
}

protected boolean isSameRecord(Row currentRow, Row nextRow, int[] indicesToIdentifySameRow) {
for (int idx : indicesToIdentifySameRow) {
if (isDifferentValue(currentRow, nextRow, idx)) {
return false;
}
}

return true;
}

protected boolean isDifferentValue(Row currentRow, Row nextRow, int idx) {
return !Objects.equals(nextRow.get(idx), currentRow.get(idx));
}

protected static int[] generateIndicesToIdentifySameRow(
int totalColumnCount, Set<Integer> metadataColumnIndices) {
int[] indices = new int[totalColumnCount - metadataColumnIndices.size()];

for (int i = 0, j = 0; i < indices.length; i++) {
if (!metadataColumnIndices.contains(i)) {
indices[j] = i;
j++;
}
}
return indices;
}
}
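One detail of the shared index helper above is easy to miss: the loop walks `i` only up to the result length, so the helper relies on the metadata columns sitting at the tail of the row, which holds for the changelog metadata columns appended after the data columns. A small self-contained illustration; the column layout and class name are hypothetical:

import java.util.Arrays;
import java.util.Set;

public class IndexHelperDemo {
  // Local copy of the helper above, for illustration only.
  static int[] generateIndicesToIdentifySameRow(
      int totalColumnCount, Set<Integer> metadataColumnIndices) {
    int[] indices = new int[totalColumnCount - metadataColumnIndices.size()];
    for (int i = 0, j = 0; i < indices.length; i++) {
      if (!metadataColumnIndices.contains(i)) {
        indices[j] = i;
        j++;
      }
    }
    return indices;
  }

  public static void main(String[] args) {
    // Row layout: id, data, age, _change_type. The single metadata column
    // (_change_type, index 3) is excluded, so rows are compared on [0, 1, 2].
    System.out.println(
        Arrays.toString(generateIndicesToIdentifySameRow(4, Set.of(3))));
  }
}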
==== update-computing changelog iterator ====
@@ -81,15 +81,13 @@ public Row next() {
// either a cached record which is not an UPDATE or the next record in the iterator.
Row currentRow = currentRow();

-if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) {
+if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) {
Row nextRow = rowIterator().next();
cachedRow = nextRow;

if (sameLogicalRow(currentRow, nextRow)) {
-String nextRowChangeType = nextRow.getString(changeTypeIndex());
-
Preconditions.checkState(
-nextRowChangeType.equals(INSERT),
+changeType(nextRow).equals(INSERT),
"Cannot compute updates because there are multiple rows with the same identifier"
+ " fields([%s]). Please make sure the rows are unique.",
String.join(",", identifierFields));
@@ -118,7 +116,7 @@ private Row modify(Row row, int valueIndex, Object value) {
}

private boolean cachedUpdateRecord() {
-return cachedRow != null && cachedRow.getString(changeTypeIndex()).equals(UPDATE_AFTER);
+return cachedRow != null && changeType(cachedRow).equals(UPDATE_AFTER);
}

private Row currentRow() {
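For context, the transformation this iterator performs is unchanged here; the refactor only routes change-type reads through the null-checked `changeType` helper. Sketched on hypothetical rows, with rows sharing identifier values arriving adjacently, DELETE first:

// Illustration only, not part of the diff:
//
//   input:  (1, 'a', DELETE), (1, 'b', INSERT), (2, 'c', DELETE)
//   output: (1, 'a', UPDATE_BEFORE), (1, 'b', UPDATE_AFTER), (2, 'c', DELETE)
//
// A DELETE immediately followed by an INSERT with the same identifier values
// becomes an update pair; a lone DELETE passes through untouched. If the row
// matching a DELETE is anything other than an INSERT, the Preconditions check
// above fails, since duplicate identifier values make the update ambiguous.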
==== RemoveCarryoverIterator ====
@@ -19,6 +19,8 @@
package org.apache.iceberg.spark;

import java.util.Iterator;
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

@@ -55,7 +57,7 @@ class RemoveCarryoverIterator extends ChangelogIterator {

RemoveCarryoverIterator(Iterator<Row> rowIterator, StructType rowType) {
super(rowIterator, rowType);
-this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow(rowType.size());
+this.indicesToIdentifySameRow = generateIndicesToIdentifySameRow();
}

@Override
@@ -88,7 +90,7 @@ public Row next() {
}

// If the current row is a delete row, drain all identical delete rows
-if (currentRow.getString(changeTypeIndex()).equals(DELETE) && rowIterator().hasNext()) {
+if (changeType(currentRow).equals(DELETE) && rowIterator().hasNext()) {
cachedDeletedRow = currentRow;
deletedRowCount = 1;

@@ -98,8 +100,8 @@
// row is the same record
while (nextRow != null
&& cachedDeletedRow != null
-&& isSameRecord(cachedDeletedRow, nextRow)) {
-if (nextRow.getString(changeTypeIndex()).equals(INSERT)) {
+&& isSameRecord(cachedDeletedRow, nextRow, indicesToIdentifySameRow)) {
+if (changeType(nextRow).equals(INSERT)) {
deletedRowCount--;
if (deletedRowCount == 0) {
cachedDeletedRow = null;
@@ -139,25 +141,8 @@ private boolean hasCachedDeleteRow() {
return cachedDeletedRow != null;
}

-private int[] generateIndicesToIdentifySameRow(int columnSize) {
-int[] indices = new int[columnSize - 1];
-for (int i = 0; i < indices.length; i++) {
-if (i < changeTypeIndex()) {
-indices[i] = i;
-} else {
-indices[i] = i + 1;
-}
-}
-return indices;
-}
-
-private boolean isSameRecord(Row currentRow, Row nextRow) {
-for (int idx : indicesToIdentifySameRow) {
-if (isDifferentValue(currentRow, nextRow, idx)) {
-return false;
-}
-}
-
-return true;
+private int[] generateIndicesToIdentifySameRow() {
+Set<Integer> metadataColumnIndices = Sets.newHashSet(changeTypeIndex());
+return generateIndicesToIdentifySameRow(rowType().size(), metadataColumnIndices);
}
}
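Finally, the behavior these helpers feed: carryovers are the DELETE/INSERT echoes that copy-on-write produces for rows an overwrite did not actually change, and the iterator drops them pairwise. A worked example on hypothetical rows; per the indices above, every column except _change_type participates in the comparison:

// Illustration only, not part of the diff:
//
//   input:  (2, 'e', 12, DELETE), (2, 'e', 12, INSERT), (2, 'b', 11, DELETE)
//   output: (2, 'b', 11, DELETE)
//
// The identical DELETE/INSERT pair for (2, 'e', 12) is a carryover, so both
// rows are dropped; the unmatched DELETE for (2, 'b', 11) passes through.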