Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2023,6 +2023,13 @@ public InlineElement getDescription() {
.defaultValue(false)
.withDescription("Format table file path only contain partition value.");

public static final ConfigOption<String> FORMAT_TABLE_FILE_COMPRESSION =
ConfigOptions.key("format-table.file.compression")
.stringType()
.noDefaultValue()
.withFallbackKeys(FILE_COMPRESSION.key())
.withDescription("Format table file compression.");

public static final ConfigOption<String> BLOB_FIELD =
key("blob-field")
.stringType()
Expand Down Expand Up @@ -2295,6 +2302,16 @@ public String fileCompression() {
return options.get(FILE_COMPRESSION);
}

public String formatTableFileImplementation() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge getFormatTableFileCompression.

if (options.containsKey(FILE_COMPRESSION.key())) {
return options.get(FILE_COMPRESSION);
} else if (options.containsKey(FORMAT_TABLE_FILE_COMPRESSION.key())) {
return options.get(FORMAT_TABLE_FILE_COMPRESSION);
} else {
return fileCompression();
}
}

public MemorySize fileReaderAsyncThreshold() {
return options.get(FILE_READER_ASYNC_THRESHOLD);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,17 @@
import java.util.function.Function;

import static org.apache.paimon.CoreOptions.AUTO_CREATE;
import static org.apache.paimon.CoreOptions.FILE_COMPRESSION;
import static org.apache.paimon.CoreOptions.FORMAT_TABLE_FILE_COMPRESSION;
import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
import static org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGACY_NAME;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.PRIMARY_KEY;
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.table.FormatTable.Format.CSV;
import static org.apache.paimon.table.FormatTable.Format.JSON;
import static org.apache.paimon.table.system.AllPartitionsTable.ALL_PARTITIONS;
import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
import static org.apache.paimon.table.system.AllTablesTable.ALL_TABLES;
Expand Down Expand Up @@ -273,6 +277,32 @@ public static Table loadTable(
return table;
}

/**
 * Determines the file compression for a format table from its option map.
 *
 * <p>An explicit {@code file.compression} takes precedence, then
 * {@code format-table.file.compression}; when neither is set, a per-format default is
 * chosen (parquet → snappy, orc → zstd, csv/json → none).
 *
 * @param options the table's raw option map
 * @return the compression codec name to store for the format table
 * @throws UnsupportedOperationException if the configured file format has no default
 */
public static String getFormatTableFileCompression(Map<String, String> options) {
    if (options.containsKey(FILE_COMPRESSION.key())) {
        return options.get(FILE_COMPRESSION.key());
    }
    if (options.containsKey(FORMAT_TABLE_FILE_COMPRESSION.key())) {
        return options.get(FORMAT_TABLE_FILE_COMPRESSION.key());
    }
    // No explicit setting: fall back to a sensible default for the table's file format.
    String formatName =
            options.getOrDefault(
                    CoreOptions.FILE_FORMAT.key(), CoreOptions.FILE_FORMAT.defaultValue());
    FormatTable.Format format = FormatTable.parseFormat(formatName);
    switch (format) {
        case PARQUET:
            return "snappy";
        case ORC:
            return "zstd";
        case CSV:
        case JSON:
            return "none";
        default:
            throw new UnsupportedOperationException(
                    String.format("Unsupported format: %s", format));
    }
}

private static Table createGlobalSystemTable(String tableName, Catalog catalog)
throws Catalog.TableNotExistException {
switch (tableName.toLowerCase()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.PagedList;
import org.apache.paimon.Snapshot;
import org.apache.paimon.TableType;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
Expand All @@ -33,6 +34,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.function.Function;
import org.apache.paimon.function.FunctionChange;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.partition.PartitionStatistics;
import org.apache.paimon.rest.exceptions.AlreadyExistsException;
Expand Down Expand Up @@ -76,10 +78,13 @@
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.BRANCH;
import static org.apache.paimon.CoreOptions.FORMAT_TABLE_FILE_COMPRESSION;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.getFormatTableFileCompression;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.catalog.CatalogUtils.validateCreateTable;
Expand Down Expand Up @@ -443,6 +448,10 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
checkNotSystemTable(identifier, "createTable");
validateCreateTable(schema);
createExternalTablePathIfNotExist(schema);
if (Options.fromMap(schema.options()).get(TYPE) == TableType.FORMAT_TABLE) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this change — this logic should not live here.

String fileCompression = getFormatTableFileCompression(schema.options());
schema.options().put(FORMAT_TABLE_FILE_COMPRESSION.key(), fileCompression);
}
api.createTable(identifier, schema);
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ private FormatTableRecordWriter createWriter(BinaryRow partition) {
pathFactory.createFormatTableDataFilePathFactory(
partition, options.formatTablePartitionOnlyValueInPath()),
writeRowType,
options.fileCompression());
options.formatTableFileImplementation());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,58 @@ public void testCreateTable() throws Exception {
.isInstanceOf(RuntimeException.class);
}

@Test
public void testFormatTableFileCompression() throws Exception {
    // Verifies that creating a format table persists the resolved compression under
    // 'format-table.file.compression': a built-in per-format default when nothing is
    // configured, or the explicitly configured value otherwise.
    if (!supportsFormatTable()) {
        return;
    }
    String dbName = "test_format_table_file_compression";
    catalog.createDatabase(dbName, true);
    Schema.Builder schemaBuilder = Schema.newBuilder();
    schemaBuilder.column("f1", DataTypes.INT());
    schemaBuilder.option("type", "format-table");
    // Expected built-in defaults when neither compression option is set.
    Pair[] format2ExpectDefaultFileCompression = {
        Pair.of("csv", "none"),
        Pair.of("parquet", "snappy"),
        Pair.of("json", "none"),
        Pair.of("orc", "zstd")
    };
    for (Pair<String, String> format2Compression : format2ExpectDefaultFileCompression) {
        Identifier identifier =
                Identifier.create(
                        dbName,
                        "partition_table_file_compression_" + format2Compression.getKey());
        schemaBuilder.option("file.format", format2Compression.getKey());
        catalog.createTable(identifier, schemaBuilder.build(), true);
        String fileCompression =
                catalog.getTable(identifier)
                        .options()
                        .get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key());
        // assertEquals takes (expected, actual) — expected value first.
        assertEquals(format2Compression.getValue(), fileCompression);
    }
    // An explicit 'file.compression' overrides the per-format default.
    String expectFileCompression = "none";
    schemaBuilder.option("file.format", "csv");
    schemaBuilder.option("file.compression", expectFileCompression);
    Identifier identifier = Identifier.create(dbName, "partition_table_file_compression_a");
    catalog.createTable(identifier, schemaBuilder.build(), true);
    String fileCompression =
            catalog.getTable(identifier)
                    .options()
                    .get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key());
    assertEquals(expectFileCompression, fileCompression);

    // An explicit 'format-table.file.compression' is honored as well.
    schemaBuilder.option("format-table.file.compression", expectFileCompression);
    identifier = Identifier.create(dbName, "partition_table_file_compression_b");
    catalog.createTable(identifier, schemaBuilder.build(), true);
    fileCompression =
            catalog.getTable(identifier)
                    .options()
                    .get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key());
    assertEquals(expectFileCompression, fileCompression);
}

@Test
public void testFormatTableOnlyPartitionValueRead() throws Exception {
if (!supportsFormatTable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.getFormatTableFileCompression;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.catalog.Identifier.DEFAULT_MAIN_BRANCH;
Expand Down Expand Up @@ -914,7 +915,8 @@ public void createFormatTable(Identifier identifier, Schema schema) {
List<String> primaryKeys = schema.primaryKeys();
Map<String, String> options = schema.options();
int highestFieldId = RowType.currentHighestFieldId(fields);

String fileCompression = getFormatTableFileCompression(schema.options());
options.put(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key(), fileCompression);
TableSchema newSchema =
new TableSchema(
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ public void testCreateFormatTable() throws IOException {
// test csv table

spark.sql(
"CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c STRING) USING csv OPTIONS ('csv.field-delimiter' ',') "
+ "TBLPROPERTIES ('file.compression'='none')");
"CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c STRING) USING csv OPTIONS ('csv.field-delimiter' ',')");
spark.sql("INSERT INTO table_csv VALUES (1, 1, '1'), (2, 2, '2')").collect();
String r = spark.sql("DESCRIBE FORMATTED table_csv").collectAsList().toString();
assertThat(r).contains("sep=,");
Expand All @@ -103,8 +102,7 @@ public void testCreateFormatTable() throws IOException {
// test json table

spark.sql(
"CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c STRING) USING json "
+ "TBLPROPERTIES ('file.compression'='none')");
"CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c STRING) USING json ");
spark.sql("INSERT INTO table_json VALUES(1, 1, '1'), (2, 2, '2')");
assertThat(
spark.sql("SELECT * FROM table_json").collectAsList().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {

test("Format table: csv with field-delimiter") {
withTable("t") {
sql(
s"CREATE TABLE t (f0 INT, f1 INT) USING CSV OPTIONS ('csv.field-delimiter' ';') TBLPROPERTIES ('file.compression'='none')")
sql(s"CREATE TABLE t (f0 INT, f1 INT) USING CSV OPTIONS ('csv.field-delimiter' ';')")
val table =
paimonCatalog.getTable(Identifier.create(hiveDbName, "t")).asInstanceOf[FormatTable]
val csvFile =
Expand Down Expand Up @@ -107,8 +106,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {

test("Format table: CTAS with partitioned table") {
withTable("t1", "t2") {
sql(
"CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv PARTITIONED BY (p1, p2) TBLPROPERTIES ('file.compression'='none')")
sql("CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv PARTITIONED BY (p1, p2)")
sql("INSERT INTO t1 VALUES (1, 2, 3)")

assertThrows[UnsupportedOperationException] {
Expand All @@ -125,8 +123,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
test("Format table: read compressed files") {
for (format <- Seq("csv", "json")) {
withTable("compress_t") {
sql(
s"CREATE TABLE compress_t (a INT, b INT, c INT) USING $format TBLPROPERTIES ('file.compression'='none')")
sql(s"CREATE TABLE compress_t (a INT, b INT, c INT) USING $format")
sql("INSERT INTO compress_t VALUES (1, 2, 3)")
val table =
paimonCatalog
Expand All @@ -149,8 +146,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {

test("Format table: field delimiter in HMS") {
withTable("t1") {
sql(
"CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv OPTIONS ('csv.field-delimiter' ';') TBLPROPERTIES ('file.compression'='none')")
sql("CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv OPTIONS ('csv.field-delimiter' ';')")
val row = sql("SHOW CREATE TABLE t1").collect()(0)
assert(row.toString().contains("'csv.field-delimiter' = ';'"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
test("PaimonFormatTableRead table: csv mode") {
val tableName = "paimon_format_test_csv_malformed"
withTable(tableName) {
sql(
s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV TBLPROPERTIES (" +
s"'file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
"'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)")
sql(s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV TBLPROPERTIES (" +
s"'seq'='|', 'lineSep'='\n', 'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)")
val table =
paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
val partition = 20250920
Expand All @@ -64,11 +62,10 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
test("PaimonFormatTable non partition table overwrite: csv") {
val tableName = "paimon_non_partiiton_overwrite_test"
withTable(tableName) {
spark.sql(
s"""
|CREATE TABLE $tableName (age INT, name STRING)
|USING CSV TBLPROPERTIES ('format-table.implementation'='paimon', 'file.compression'='none')
|""".stripMargin)
spark.sql(s"""
|CREATE TABLE $tableName (age INT, name STRING)
|USING CSV TBLPROPERTIES ('format-table.implementation'='paimon')
|""".stripMargin)
val table =
paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
table.fileIO().mkdirs(new Path(table.location()))
Expand All @@ -88,12 +85,11 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
test("PaimonFormatTable partition table overwrite: csv") {
val tableName = "paimon_overwrite_test"
withTable(tableName) {
spark.sql(
s"""
|CREATE TABLE $tableName (age INT, name STRING)
|USING CSV TBLPROPERTIES ('format-table.implementation'='paimon', 'file.compression'='none')
|PARTITIONED BY (id INT)
|""".stripMargin)
spark.sql(s"""
|CREATE TABLE $tableName (age INT, name STRING)
|USING CSV TBLPROPERTIES ('format-table.implementation'='paimon')
|PARTITIONED BY (id INT)
|""".stripMargin)
val table =
paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
table.fileIO().mkdirs(new Path(table.location()))
Expand All @@ -115,10 +111,8 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
test("PaimonFormatTableRead table: csv with field-delimiter") {
val tableName = "paimon_format_test_csv_options"
withTable(tableName) {
sql(
s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" +
s"'file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
"'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)")
sql(s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" +
s"'seq'='|', 'lineSep'='\n', 'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)")
val table =
paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
table.fileIO().mkdirs(new Path(table.location()))
Expand All @@ -135,10 +129,8 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
test("PaimonFormatTable: csv with partition path only value") {
val tableName = "paimon_format_test_partition_path_only_value"
withTable(tableName) {
sql(
s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" +
s"'file.compression'='none','format-table.implementation'='paimon'," +
"'format-table.partition-path-only-value'='true') PARTITIONED BY (`ds` bigint)")
sql(s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" +
s"'format-table.implementation'='paimon','format-table.partition-path-only-value'='true') PARTITIONED BY (`ds` bigint)")
val table =
paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
table.fileIO().mkdirs(new Path(table.location()))
Expand Down
Loading