Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.extensions;

import static org.apache.iceberg.SnapshotSummary.ADDED_FILE_SIZE_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_FILE_SIZE_PROP;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.Encoders;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTestBase {

public TestRewritePositionDeleteFilesProcedure(
String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}

private void createTable() throws Exception {
sql(
"CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+ "('format-version'='2', 'write.delete.mode'='merge-on-read')",
tableName);

List<SimpleRecord> records =
Lists.newArrayList(
new SimpleRecord(1, "a"),
new SimpleRecord(2, "b"),
new SimpleRecord(3, "c"),
new SimpleRecord(4, "d"),
new SimpleRecord(5, "e"),
new SimpleRecord(6, "f"));
spark
.createDataset(records, Encoders.bean(SimpleRecord.class))
.coalesce(1)
.writeTo(tableName)
.append();
}

@After
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
}

@Test
public void testExpireDeleteFilesAll() throws Exception {
createTable();

sql("DELETE FROM %s WHERE id=1", tableName);
sql("DELETE FROM %s WHERE id=2", tableName);

Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals(2, TestHelpers.deleteFiles(table).size());

List<Object[]> output =
sql(
"CALL %s.system.rewrite_position_delete_files("
+ "table => '%s',"
+ "options => map("
+ "'rewrite-all','true'))",
catalogName, tableIdent);
table.refresh();

Map<String, String> snapshotSummary = snapshotSummary();
assertEquals(
"Should delete 2 delete files and add 1",
ImmutableList.of(
row(
2,
1,
Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
output);

Assert.assertEquals(1, TestHelpers.deleteFiles(table).size());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we also validate the contents of the new delete file? (whether it really has all the rewritten files' contents?)

Copy link
Member Author

@szehon-ho szehon-ho May 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted this test to be about the procedure code, as I already have added delete file content check in the test of of Action itself in TestRewritePositionDeleteFilesAction

}

@Test
public void testExpireDeleteFilesNoOption() throws Exception {
createTable();

sql("DELETE FROM %s WHERE id=1", tableName);
sql("DELETE FROM %s WHERE id=2", tableName);
sql("DELETE FROM %s WHERE id=3", tableName);
sql("DELETE FROM %s WHERE id=4", tableName);
sql("DELETE FROM %s WHERE id=5", tableName);

Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals(5, TestHelpers.deleteFiles(table).size());

List<Object[]> output =
sql(
"CALL %s.system.rewrite_position_delete_files(" + "table => '%s')",
catalogName, tableIdent);
table.refresh();

Map<String, String> snapshotSummary = snapshotSummary();
assertEquals(
"Should replace 5 delete files with 1",
ImmutableList.of(
row(
5,
1,
Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
output);
}

@Test
public void testInvalidOption() throws Exception {
createTable();

Assert.assertThrows(
"Cannot use options [foo], they are not supported by the action or the rewriter BIN-PACK",
IllegalArgumentException.class,
() ->
sql(
"CALL %s.system.rewrite_position_delete_files("
+ "table => '%s',"
+ "options => map("
+ "'foo', 'bar'))",
catalogName, tableIdent));
}

private Map<String, String> snapshotSummary() {
return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also add a test case with dangling deletes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, added already tests of dangling deletes on : TestRewritePositionDeleteFilesAction, and was thinking this to be a quicker test just to validate the procedure code only

Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.procedures;

import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
* A procedure that rewrites position delete files in a table.
*
* @see org.apache.iceberg.spark.actions.SparkActions#rewritePositionDeletes(Table)
*/
public class RewritePositionDeleteFilesProcedure extends BaseProcedure {

private static final ProcedureParameter TABLE_PARAM =
ProcedureParameter.required("table", DataTypes.StringType);
private static final ProcedureParameter OPTIONS_PARAM =
ProcedureParameter.optional("options", STRING_MAP);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};

private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
new StructField(
"rewritten_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
new StructField(
"added_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()),
new StructField("added_bytes_count", DataTypes.LongType, false, Metadata.empty())
});

public static SparkProcedures.ProcedureBuilder builder() {
return new Builder<RewritePositionDeleteFilesProcedure>() {
@Override
protected RewritePositionDeleteFilesProcedure doBuild() {
return new RewritePositionDeleteFilesProcedure(tableCatalog());
}
};
}

private RewritePositionDeleteFilesProcedure(TableCatalog tableCatalog) {
super(tableCatalog);
}

@Override
public ProcedureParameter[] parameters() {
return PARAMETERS;
}

@Override
public StructType outputType() {
return OUTPUT_TYPE;
}

@Override
public InternalRow[] call(InternalRow args) {
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
Identifier tableIdent = input.ident(TABLE_PARAM);
Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());

return modifyIcebergTable(
tableIdent,
table -> {
Result result = actions().rewritePositionDeletes(table).options(options).execute();
return new InternalRow[] {toOutputRow(result)};
});
}

private InternalRow toOutputRow(Result result) {
return newInternalRow(
result.rewrittenDeleteFilesCount(),
result.addedDeleteFilesCount(),
result.rewrittenBytesCount(),
result.addedBytesCount());
}

@Override
public String description() {
return "RewritePositionDeleteFilesProcedure";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
mapBuilder.put("register_table", RegisterTableProcedure::builder);
mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
mapBuilder.put("create_changelog_view", CreateChangelogViewProcedure::builder);
mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe shorten the name by just calling it as rewrite_position_deletes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does everyone think? I use RewritePositionDeletes and PositionDeletes in the code a lot for shortness, but was not sure as the procedure names all indicate files

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally I'd go for shorter names, but I see here all the keys in the map already have files suffix so I think it's fine as it is currently

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea thats the primary reason why i kept 'files' though i do think it is long, @aokolnychyi what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong opinion. I also like shorter names but it seems we use files suffix in other procedures so I'll be inclined to keep the name as is.

return mapBuilder.build();
}

Expand Down