Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions spark/v3.5/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
integrationImplementation "org.scala-lang.modules:scala-collection-compat_${scalaVersion}:${libs.versions.scala.collection.compat.get()}"
integrationImplementation "org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark.hive35.get()}"
integrationImplementation libs.junit.vintage.engine
integrationImplementation libs.junit.jupiter
integrationImplementation libs.slf4j.simple
integrationImplementation libs.assertj.core
integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
Expand Down Expand Up @@ -288,6 +289,7 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
}

task integrationTest(type: Test) {
useJUnitPlatform()
description = "Test Spark3 Runtime Jar against Spark ${sparkMajorVersion}"
group = "verification"
jvmArgs += project.property('extraJvmArgs')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -74,8 +73,9 @@ public void testSetIdentifierFields() {
public void testSetInvalidIdentifierFields() {
sql("CREATE TABLE %s (id bigint NOT NULL, id2 bigint) USING iceberg", tableName);
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertTrue(
"Table should start without identifier", table.schema().identifierFieldIds().isEmpty());
assertThat(table.schema().identifierFieldIds())
.as("Table should start without identifier")
.isEmpty();
Assertions.assertThatThrownBy(
() -> sql("ALTER TABLE %s SET IDENTIFIER FIELDS unknown", tableName))
.isInstanceOf(IllegalArgumentException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
Expand All @@ -34,19 +37,14 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Test;
import org.junit.runners.Parameterized;

public class TestMetaColumnProjectionWithStageScan extends SparkExtensionsTestBase {
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

public TestMetaColumnProjectionWithStageScan(
String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}
@ExtendWith(ParameterizedTestExtension.class)
public class TestMetaColumnProjectionWithStageScan extends ExtensionsTestBase {

@Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
@Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
public static Object[][] parameters() {
return new Object[][] {
{
Expand All @@ -57,7 +55,7 @@ public static Object[][] parameters() {
};
}

@After
@AfterEach
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
}
Expand All @@ -68,7 +66,7 @@ private <T extends ScanTask> void stageTask(
taskSetManager.stageTasks(tab, fileSetID, Lists.newArrayList(tasks));
}

@Test
@TestTemplate
public void testReadStageTableMeta() throws Exception {
sql(
"CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
Expand Down Expand Up @@ -103,7 +101,7 @@ public void testReadStageTableMeta() throws Exception {
.option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.load(tableLocation);

Assertions.assertThat(scanDF2.columns().length).isEqualTo(2);
assertThat(scanDF2.columns()).hasSize(2);
}

try (CloseableIterable<ScanTask> tasks = table.newBatchScan().planFiles()) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@

import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
Expand All @@ -31,13 +34,12 @@
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.spark.sql.execution.SparkPlan;
import org.apache.spark.sql.internal.SQLConf;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runners.Parameterized;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

public class TestStoragePartitionedJoinsInRowLevelOperations extends SparkExtensionsTestBase {
@ExtendWith(ParameterizedTestExtension.class)
public class TestStoragePartitionedJoinsInRowLevelOperations extends ExtensionsTestBase {

private static final String OTHER_TABLE_NAME = "other_table";

Expand Down Expand Up @@ -68,7 +70,7 @@ public class TestStoragePartitionedJoinsInRowLevelOperations extends SparkExtens
SparkSQLProperties.PRESERVE_DATA_GROUPING,
"true");

@Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
@Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
public static Object[][] parameters() {
return new Object[][] {
{
Expand All @@ -79,23 +81,18 @@ public static Object[][] parameters() {
};
}

public TestStoragePartitionedJoinsInRowLevelOperations(
String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}

@After
@AfterEach
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
sql("DROP TABLE IF EXISTS %s", tableName(OTHER_TABLE_NAME));
}

@Test
@TestTemplate
public void testCopyOnWriteDeleteWithoutShuffles() {
checkDelete(COPY_ON_WRITE);
}

@Test
@TestTemplate
public void testMergeOnReadDeleteWithoutShuffles() {
checkDelete(MERGE_ON_READ);
}
Expand Down Expand Up @@ -139,10 +136,10 @@ private void checkDelete(RowLevelOperationMode mode) {
String planAsString = plan.toString();
if (mode == COPY_ON_WRITE) {
int actualNumShuffles = StringUtils.countMatches(planAsString, "Exchange");
Assert.assertEquals("Should be 1 shuffle with SPJ", 1, actualNumShuffles);
Assertions.assertThat(planAsString).contains("Exchange hashpartitioning(_file");
assertThat(actualNumShuffles).as("Should be 1 shuffle with SPJ").isEqualTo(1);
assertThat(planAsString).contains("Exchange hashpartitioning(_file");
} else {
Assertions.assertThat(planAsString).doesNotContain("Exchange");
assertThat(planAsString).doesNotContain("Exchange");
}
});

Expand All @@ -158,12 +155,12 @@ private void checkDelete(RowLevelOperationMode mode) {
sql("SELECT * FROM %s ORDER BY id, salary", tableName));
}

@Test
@TestTemplate
public void testCopyOnWriteUpdateWithoutShuffles() {
checkUpdate(COPY_ON_WRITE);
}

@Test
@TestTemplate
public void testMergeOnReadUpdateWithoutShuffles() {
checkUpdate(MERGE_ON_READ);
}
Expand Down Expand Up @@ -207,10 +204,10 @@ private void checkUpdate(RowLevelOperationMode mode) {
String planAsString = plan.toString();
if (mode == COPY_ON_WRITE) {
int actualNumShuffles = StringUtils.countMatches(planAsString, "Exchange");
Assert.assertEquals("Should be 1 shuffle with SPJ", 1, actualNumShuffles);
Assertions.assertThat(planAsString).contains("Exchange hashpartitioning(_file");
assertThat(actualNumShuffles).as("Should be 1 shuffle with SPJ").isEqualTo(1);
assertThat(planAsString).contains("Exchange hashpartitioning(_file");
} else {
Assertions.assertThat(planAsString).doesNotContain("Exchange");
assertThat(planAsString).doesNotContain("Exchange");
}
});

Expand All @@ -227,22 +224,22 @@ private void checkUpdate(RowLevelOperationMode mode) {
sql("SELECT * FROM %s ORDER BY id, salary", tableName));
}

@Test
@TestTemplate
public void testCopyOnWriteMergeWithoutShuffles() {
checkMerge(COPY_ON_WRITE, false /* with ON predicate */);
}

@Test
@TestTemplate
public void testCopyOnWriteMergeWithoutShufflesWithPredicate() {
checkMerge(COPY_ON_WRITE, true /* with ON predicate */);
}

@Test
@TestTemplate
public void testMergeOnReadMergeWithoutShuffles() {
checkMerge(MERGE_ON_READ, false /* with ON predicate */);
}

@Test
@TestTemplate
public void testMergeOnReadMergeWithoutShufflesWithPredicate() {
checkMerge(MERGE_ON_READ, true /* with ON predicate */);
}
Expand Down Expand Up @@ -294,10 +291,10 @@ private void checkMerge(RowLevelOperationMode mode, boolean withPredicate) {
String planAsString = plan.toString();
if (mode == COPY_ON_WRITE) {
int actualNumShuffles = StringUtils.countMatches(planAsString, "Exchange");
Assert.assertEquals("Should be 1 shuffle with SPJ", 1, actualNumShuffles);
Assertions.assertThat(planAsString).contains("Exchange hashpartitioning(_file");
assertThat(actualNumShuffles).as("Should be 1 shuffle with SPJ").isEqualTo(1);
assertThat(planAsString).contains("Exchange hashpartitioning(_file");
} else {
Assertions.assertThat(planAsString).doesNotContain("Exchange");
assertThat(planAsString).doesNotContain("Exchange");
}
});

Expand Down
Loading