TestCompressionSettings.java (org.apache.iceberg.spark.source)
@@ -18,6 +18,9 @@
*/
package org.apache.iceberg.spark.source;

+import static org.apache.iceberg.FileFormat.AVRO;
+import static org.apache.iceberg.FileFormat.ORC;
+import static org.apache.iceberg.FileFormat.PARQUET;
import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
@@ -50,6 +53,9 @@
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.SizeBasedFileRewritePlanner;
@@ -58,8 +64,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.SparkCatalogConfig;
-import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.orc.OrcFile;
@@ -69,73 +75,98 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestCompressionSettings extends SparkCatalogTestBase {
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestCompressionSettings extends CatalogTestBase {

private static final Configuration CONF = new Configuration();
private static final String TABLE_NAME = "testWriteData";

private static SparkSession spark = null;

-private final FileFormat format;
-private final ImmutableMap<String, String> properties;
+@Parameter(index = 3)
+private FileFormat format;
+
+@Parameter(index = 4)
+private Map<String, String> properties;

-@Rule public TemporaryFolder temp = new TemporaryFolder();
+@TempDir private java.nio.file.Path temp;

-@Parameterized.Parameters(name = "format = {0}, properties = {1}")
+@Parameters(
+    name =
+        "catalogName = {0}, implementation = {1}, config = {2}, format = {3}, properties = {4}")
public static Object[][] parameters() {
return new Object[][] {
{"parquet", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")},
{"parquet", ImmutableMap.of(COMPRESSION_CODEC, "gzip")},
{"orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "speed")},
{"orc", ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "compression")},
{"avro", ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL, "3")}
+  {
+    SparkCatalogConfig.SPARK.catalogName(),
+    SparkCatalogConfig.SPARK.implementation(),
+    SparkCatalogConfig.SPARK.properties(),
+    PARQUET,
+    ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_LEVEL, "1")
+  },
+  {
+    SparkCatalogConfig.SPARK.catalogName(),
+    SparkCatalogConfig.SPARK.implementation(),
+    SparkCatalogConfig.SPARK.properties(),
+    PARQUET,
+    ImmutableMap.of(COMPRESSION_CODEC, "gzip")
+  },
+  {
+    SparkCatalogConfig.SPARK.catalogName(),
+    SparkCatalogConfig.SPARK.implementation(),
+    SparkCatalogConfig.SPARK.properties(),
+    ORC,
+    ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "speed")
+  },
+  {
+    SparkCatalogConfig.SPARK.catalogName(),
+    SparkCatalogConfig.SPARK.implementation(),
+    SparkCatalogConfig.SPARK.properties(),
+    ORC,
+    ImmutableMap.of(COMPRESSION_CODEC, "zstd", COMPRESSION_STRATEGY, "compression")
+  },
+  {
+    SparkCatalogConfig.SPARK.catalogName(),
+    SparkCatalogConfig.SPARK.implementation(),
+    SparkCatalogConfig.SPARK.properties(),
+    AVRO,
+    ImmutableMap.of(COMPRESSION_CODEC, "snappy", COMPRESSION_LEVEL, "3")
+  }
};
}

-@BeforeClass
+@BeforeAll
public static void startSpark() {
TestCompressionSettings.spark = SparkSession.builder().master("local[2]").getOrCreate();
}

-@Before
+@BeforeEach
public void resetSpecificConfigurations() {
spark.conf().unset(COMPRESSION_CODEC);
spark.conf().unset(COMPRESSION_LEVEL);
spark.conf().unset(COMPRESSION_STRATEGY);
}

-@Parameterized.AfterParam
-public static void clearSourceCache() {
+@AfterEach
+public void afterEach() {
spark.sql(String.format("DROP TABLE IF EXISTS %s", TABLE_NAME));
}

-@AfterClass
+@AfterAll
public static void stopSpark() {
SparkSession currentSpark = TestCompressionSettings.spark;
TestCompressionSettings.spark = null;
currentSpark.stop();
}

-public TestCompressionSettings(String format, ImmutableMap properties) {
-  super(
-      SparkCatalogConfig.SPARK.catalogName(),
-      SparkCatalogConfig.SPARK.implementation(),
-      SparkCatalogConfig.SPARK.properties());
-  this.format = FileFormat.fromString(format);
-  this.properties = properties;
-}
-
-@Test
+@TestTemplate
public void testWriteDataWithDifferentSetting() throws Exception {
sql("CREATE TABLE %s (id int, data string) USING iceberg", TABLE_NAME);
Map<String, String> tableProperties = Maps.newHashMap();
@@ -168,6 +199,8 @@ public void testWriteDataWithDifferentSetting() throws Exception {
spark.conf().set(entry.getKey(), entry.getValue());
}

+assertSparkConf();
+
df.select("id", "data")
.writeTo(TABLE_NAME)
.option(SparkWriteOptions.WRITE_FORMAT, format.toString())
@@ -230,4 +263,13 @@ private String getCompressionType(InputFile inputFile) throws Exception {
return fileReader.getMetaString(DataFileConstants.CODEC);
}
}

+private void assertSparkConf() {
+  String[] propertiesToCheck = {COMPRESSION_CODEC, COMPRESSION_LEVEL, COMPRESSION_STRATEGY};
+  for (String prop : propertiesToCheck) {
+    String expected = properties.getOrDefault(prop, null);
+    String actual = spark.conf().get(prop, null);
+    assertThat(actual).isEqualToIgnoringCase(expected);
+  }
+}
}
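For context, both files move from JUnit 4's Parameterized runner to Iceberg's JUnit 5 ParameterizedTestExtension. Below is a minimal sketch of the target pattern, assuming only the annotations visible in this diff; the class name, test method, and parameter values are hypothetical illustrations, not part of the PR.

import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

// Hypothetical example class, not from the PR.
@ExtendWith(ParameterizedTestExtension.class)
public class ExampleParameterizedTest {

  // Each row becomes one invocation of every @TestTemplate method.
  @Parameters(name = "format = {0}, properties = {1}")
  public static Object[][] parameters() {
    return new Object[][] {
      {FileFormat.PARQUET, Map.of("compression-codec", "zstd")}, // illustrative values
      {FileFormat.AVRO, Map.of("compression-codec", "snappy")}
    };
  }

  // Row values are injected into these fields by index before each invocation.
  @Parameter(index = 0)
  private FileFormat format;

  @Parameter(index = 1)
  private Map<String, String> properties;

  @TestTemplate
  public void testSomething() {
    // runs once per parameter row
  }
}

Under JUnit 4 these values arrived through the test constructor; the extension instead populates the @Parameter fields, which is why both diffs delete the constructors.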
TestRequiredDistributionAndOrdering.java
@@ -21,31 +21,28 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.List;
-import java.util.Map;
+import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.junit.After;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;

-public class TestRequiredDistributionAndOrdering extends SparkCatalogTestBase {
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestRequiredDistributionAndOrdering extends CatalogTestBase {

-public TestRequiredDistributionAndOrdering(
-    String catalogName, String implementation, Map<String, String> config) {
-  super(catalogName, implementation, config);
-}
-
-@After
+@AfterEach
public void dropTestTable() {
sql("DROP TABLE IF EXISTS %s", tableName);
}

-@Test
+@TestTemplate
public void testDefaultLocalSort() throws NoSuchTableException {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -74,7 +71,7 @@ public void testDefaultLocalSort() throws NoSuchTableException {
sql("SELECT count(*) FROM %s", tableName));
}

-@Test
+@TestTemplate
public void testPartitionColumnsArePrependedForRangeDistribution() throws NoSuchTableException {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -110,7 +107,7 @@ public void testPartitionColumnsArePrependedForRangeDistribution() throws NoSuchTableException {
sql("SELECT count(*) FROM %s", tableName));
}

-@Test
+@TestTemplate
public void testSortOrderIncludesPartitionColumns() throws NoSuchTableException {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -142,7 +139,7 @@ public void testSortOrderIncludesPartitionColumns() throws NoSuchTableException {
sql("SELECT count(*) FROM %s", tableName));
}

-@Test
+@TestTemplate
public void testDisabledDistributionAndOrdering() {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -176,7 +173,7 @@ public void testDisabledDistributionAndOrdering() {
+ "and by partition within each spec. Either cluster the incoming records or switch to fanout writers.");
}

-@Test
+@TestTemplate
public void testHashDistribution() throws NoSuchTableException {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -212,7 +209,7 @@ public void testHashDistribution() throws NoSuchTableException {
sql("SELECT count(*) FROM %s", tableName));
}

-@Test
+@TestTemplate
public void testSortBucketTransformsWithoutExtensions() throws NoSuchTableException {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
@@ -238,7 +235,7 @@ public void testSortBucketTransformsWithoutExtensions() throws NoSuchTableException {
assertEquals("Rows must match", expected, sql("SELECT * FROM %s ORDER BY c1", tableName));
}

-@Test
+@TestTemplate
public void testRangeDistributionWithQuotedColumnsNames() throws NoSuchTableException {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, `c.3` STRING) "
@@ -274,7 +271,7 @@ public void testRangeDistributionWithQuotedColumnsNames() throws NoSuchTableException {
sql("SELECT count(*) FROM %s", tableName));
}

-@Test
+@TestTemplate
public void testHashDistributionWithQuotedColumnsNames() throws NoSuchTableException {
sql(
"CREATE TABLE %s (c1 INT, c2 STRING, `c``3` STRING) "
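Taken together, the hunks above apply one consistent mapping across both files (summarized here for reference; every pair appears in the diff itself):

// JUnit 4                              -> JUnit 5 / Iceberg extension
// @RunWith(Parameterized.class)        -> @ExtendWith(ParameterizedTestExtension.class)
// @Parameterized.Parameters            -> @Parameters (org.apache.iceberg)
// constructor injection                -> @Parameter(index = ...) fields
// @Test                                -> @TestTemplate
// @Before / @BeforeClass               -> @BeforeEach / @BeforeAll
// @After / @AfterClass                 -> @AfterEach / @AfterAll
// @Rule TemporaryFolder                -> @TempDir java.nio.file.Path
// @Parameterized.AfterParam            -> @AfterEach (cleanup now runs per test, not per parameter set)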