Task: Add test for simulating OOM error during merge equality deletes #2

Open · wants to merge 2 commits into main
@@ -271,6 +271,118 @@ public void testEqualityDeleteWithFilter() throws IOException {
assertThat(actual).as("Table should contain no rows").hasSize(0);
}

@TestTemplate
public void testMergeEqualityDeletesOOM() throws IOException {
String tableName = table.name().substring(table.name().lastIndexOf(".") + 1);
Schema deleteRowSchema = table.schema().select("data");
Record dataDelete = GenericRecord.create(deleteRowSchema);

// Create an extremely large dataset
List<Record> dataDeletes = Lists.newArrayList();
for (int i = 0; i < 1_000_000_000; i++) { // Adjust this count upward if the heap is not exhausted
// Create a large string for each record so the delete set consumes more memory
dataDeletes.add(dataDelete.copy("data", "item" + i + "data".repeat(100)));
}

DeleteFile eqDeletes =
FileHelpers.writeDeleteFile(
table,
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
TestHelpers.Row.of(0), // not partition specific because key = 0
dataDeletes,
deleteRowSchema);

table.newRowDelta().addDeletes(eqDeletes).commit();

Types.StructType projection = table.schema().select("*").asStruct();
Dataset<Row> df =
spark
.read()
.format("iceberg")
.load(TableIdentifier.of("default", tableName).toString())
.filter("data = 'item0" + "data".repeat(100) + "'") // filtering a specific deleted row
.selectExpr("*");

StructLikeSet actual = StructLikeSet.create(projection);
df.collectAsList()
.forEach(
row -> {
SparkStructLike rowWrapper = new SparkStructLike(projection);
actual.add(rowWrapper.wrap(row));
});
assertThat(actual).as("Table should contain no rows").hasSize(0);
}

@TestTemplate
public void testMergeEqualityDeletesOOM_v2() throws IOException {
// Set up table name and delete schema
String tableName = table.name().substring(table.name().lastIndexOf(".") + 1);
Schema deleteRowSchema = table.schema().select("data");
Record dataDelete = GenericRecord.create(deleteRowSchema);

// Create and commit multiple equality delete files to simulate memory pressure,
// i.e. several smaller equality delete files instead of one very large one
int fileCount = 10; // Number of smaller delete files
int recordsPerFile = 1_000_000; // Sized for an estimated ~120 MB per delete file

for (int j = 0; j < fileCount; j++) {
List<Record> dataDeletes = Lists.newArrayList();

for (int i = 0; i < recordsPerFile; i++) {
dataDeletes.add(dataDelete.copy("data", "item" + i + "data".repeat(100)));
}

// Write each equality delete file separately
File deleteFile = File.createTempFile("junit", null, temp.toFile());
DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
table,
Files.localOutput(deleteFile),
TestHelpers.Row.of(0),
dataDeletes,
deleteRowSchema
);
table.newRowDelta().addDeletes(eqDeletes).commit();

// Log each delete file's size: deleteFile.length() returns bytes,
// divided by (1024 * 1024) to convert to megabytes
long fileSizeInBytes = deleteFile.length();
System.out.println("Equality delete file " + j + " size: " + (fileSizeInBytes / (1024 * 1024)) + " MB");
System.out.println("Delete file path: " + deleteFile + ", metadata: " + eqDeletes);
}

// Prepare for the SELECT * query (basic read)
Types.StructType projection = table.schema().select("*").asStruct();
Dataset<Row> df = spark.read()
.format("iceberg")
.load(TableIdentifier.of("default", tableName).toString())
.filter("data = 'item0" + "data".repeat(100) + "'") // Filter to verify delete is applied
.selectExpr("*");

// Track memory usage before read operation
long initialMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();

// Execute and process SELECT * result, ensuring all equality deletes are applied
StructLikeSet actual = StructLikeSet.create(projection);
df.collectAsList().forEach(row -> {
SparkStructLike rowWrapper = new SparkStructLike(projection);
actual.add(rowWrapper.wrap(row));
});

// Track memory usage after read operation
long finalMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();

// Assertions
assertThat(actual).as("Table should contain no rows").hasSize(0); // Equality-deleted rows should not appear
System.out.println("Memory consumed during read operation: " + (finalMemory - initialMemory)
+ " bytes (initial: " + initialMemory + " bytes, final: " + finalMemory + " bytes)");
}
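// Editorial sketch, not part of this PR: both tests above rely on the test JVM
// running out of heap implicitly while merging the equality deletes. If the intent
// is to assert the failure mode explicitly, the read could be wrapped with AssertJ's
// assertThatThrownBy (AssertJ is already used for assertThat in these tests), assuming
// the test JVM heap is capped low enough (e.g. a small -Xmx, or maxHeapSize on the
// Gradle test task) for the merge to fail. Note that depending on where the allocation
// fails, Spark may surface the error wrapped in a SparkException, so inspecting the
// root cause may be necessary:
//
//   assertThatThrownBy(df::collectAsList)
//       .as("Merging the equality deletes should exhaust the heap")
//       .isInstanceOf(OutOfMemoryError.class);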


@TestTemplate
public void testReadEqualityDeleteRows() throws IOException {
Schema deleteSchema1 = table.schema().select("data");