From 3d88a0970fedb8e8abaa1c826344e56cfd1091de Mon Sep 17 00:00:00 2001 From: yantian Date: Fri, 31 Oct 2025 17:27:40 +0800 Subject: [PATCH 1/2] [core] format table: support file compression and custom default for diff format --- .../java/org/apache/paimon/CoreOptions.java | 17 ++++++ .../apache/paimon/catalog/CatalogUtils.java | 30 +++++++++++ .../org/apache/paimon/rest/RESTCatalog.java | 9 ++++ .../table/format/FormatTableFileWriter.java | 2 +- .../paimon/catalog/CatalogTestBase.java | 52 +++++++++++++++++++ .../org/apache/paimon/hive/HiveCatalog.java | 4 +- .../spark/SparkCatalogWithHiveTest.java | 6 +-- .../spark/sql/FormatTableTestBase.scala | 12 ++--- .../spark/table/PaimonFormatTableTest.scala | 38 ++++++-------- 9 files changed, 133 insertions(+), 37 deletions(-) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index f5c90b41969d..155c36b762d0 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2023,6 +2023,13 @@ public InlineElement getDescription() { .defaultValue(false) .withDescription("Format table file path only contain partition value."); + public static final ConfigOption FORMAT_TABLE_FILE_COMPRESSION = + ConfigOptions.key("format-table.file.compression") + .stringType() + .noDefaultValue() + .withFallbackKeys(FILE_COMPRESSION.key()) + .withDescription("Format table file compression."); + public static final ConfigOption BLOB_FIELD = key("blob-field") .stringType() @@ -2295,6 +2302,16 @@ public String fileCompression() { return options.get(FILE_COMPRESSION); } + public String formatTableFileImplementation() { + if (options.containsKey(FILE_COMPRESSION.key())) { + return options.get(FILE_COMPRESSION); + } else if (options.containsKey(FORMAT_TABLE_FILE_COMPRESSION.key())) { + return options.get(FORMAT_TABLE_FILE_COMPRESSION); + } else { + return fileCompression(); + } + } + public MemorySize fileReaderAsyncThreshold() { return options.get(FILE_READER_ASYNC_THRESHOLD); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index e9510e8d552d..a724c42882f7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -58,6 +58,8 @@ import java.util.function.Function; import static org.apache.paimon.CoreOptions.AUTO_CREATE; +import static org.apache.paimon.CoreOptions.FILE_COMPRESSION; +import static org.apache.paimon.CoreOptions.FORMAT_TABLE_FILE_COMPRESSION; import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME; import static org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGACY_NAME; import static org.apache.paimon.CoreOptions.PATH; @@ -65,6 +67,8 @@ import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX; import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; +import static org.apache.paimon.table.FormatTable.Format.CSV; +import static org.apache.paimon.table.FormatTable.Format.JSON; import static org.apache.paimon.table.system.AllPartitionsTable.ALL_PARTITIONS; import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS; import static org.apache.paimon.table.system.AllTablesTable.ALL_TABLES; @@ -273,6 +277,32 @@ public static Table loadTable( return table; } + public static String getFormatTableFileCompression(Map options) { + if (options.containsKey(FILE_COMPRESSION.key())) { + return options.get(FILE_COMPRESSION.key()); + } else if (options.containsKey(FORMAT_TABLE_FILE_COMPRESSION.key())) { + return options.get(FORMAT_TABLE_FILE_COMPRESSION.key()); + } else { + FormatTable.Format format = + FormatTable.parseFormat( + options.getOrDefault( + CoreOptions.FILE_FORMAT.key(), + CoreOptions.FILE_FORMAT.defaultValue())); + switch (format) { + case PARQUET: + return "snappy"; + case ORC: + return "zstd"; + case CSV: + case JSON: + return "none"; + default: + throw new UnsupportedOperationException( + String.format("Unsupported format: %s", format)); + } + } + } + private static Table createGlobalSystemTable(String tableName, Catalog catalog) throws Catalog.TableNotExistException { switch (tableName.toLowerCase()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 0d34e17bc8e2..870bccf9fc53 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.PagedList; import org.apache.paimon.Snapshot; +import org.apache.paimon.TableType; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; @@ -33,6 +34,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.function.Function; import org.apache.paimon.function.FunctionChange; +import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.partition.PartitionStatistics; import org.apache.paimon.rest.exceptions.AlreadyExistsException; @@ -76,10 +78,13 @@ import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.BRANCH; +import static org.apache.paimon.CoreOptions.FORMAT_TABLE_FILE_COMPRESSION; import static org.apache.paimon.CoreOptions.PATH; +import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; +import static org.apache.paimon.catalog.CatalogUtils.getFormatTableFileCompression; import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem; import static org.apache.paimon.catalog.CatalogUtils.validateCreateTable; @@ -443,6 +448,10 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx checkNotSystemTable(identifier, "createTable"); validateCreateTable(schema); createExternalTablePathIfNotExist(schema); + if (Options.fromMap(schema.options()).get(TYPE) == TableType.FORMAT_TABLE) { + String fileCompression = getFormatTableFileCompression(schema.options()); + schema.options().put(FORMAT_TABLE_FILE_COMPRESSION.key(), fileCompression); + } api.createTable(identifier, schema); } catch (AlreadyExistsException e) { if (!ignoreIfExists) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java index 3d6af1b227ae..31e380e248cc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java @@ -113,6 +113,6 @@ private FormatTableRecordWriter createWriter(BinaryRow partition) { pathFactory.createFormatTableDataFilePathFactory( partition, options.formatTablePartitionOnlyValueInPath()), writeRowType, - options.fileCompression()); + options.formatTableFileImplementation()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index ea3e2ebd8646..e9ba51dd6857 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -575,6 +575,58 @@ public void testCreateTable() throws Exception { .isInstanceOf(RuntimeException.class); } + @Test + public void testFormatTableFileCompression() throws Exception { + if (!supportsFormatTable()) { + return; + } + String dbName = "test_format_table_file_compression"; + catalog.createDatabase(dbName, true); + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f1", DataTypes.INT()); + schemaBuilder.option("type", "format-table"); + Pair[] format2ExpectDefaultFileCompression = { + Pair.of("csv", "none"), + Pair.of("parquet", "snappy"), + Pair.of("json", "none"), + Pair.of("orc", "zstd") + }; + for (Pair format2Compression : format2ExpectDefaultFileCompression) { + Identifier identifier = + Identifier.create( + dbName, + "partition_table_file_compression_" + format2Compression.getKey()); + schemaBuilder.option("file.format", format2Compression.getKey()); + catalog.createTable(identifier, schemaBuilder.build(), true); + String fileCompression = + catalog.getTable(identifier) + .options() + .get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key()); + assertEquals(fileCompression, format2Compression.getValue()); + } + // table has option file.compression + String expectFileCompression = "none"; + schemaBuilder.option("file.format", "csv"); + schemaBuilder.option("file.compression", expectFileCompression); + Identifier identifier = Identifier.create(dbName, "partition_table_file_compression_a"); + catalog.createTable(identifier, schemaBuilder.build(), true); + String fileCompression = + catalog.getTable(identifier) + .options() + .get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key()); + assertEquals(fileCompression, expectFileCompression); + + // table has option format-table.file.compression + schemaBuilder.option("format-table.file.compression", expectFileCompression); + identifier = Identifier.create(dbName, "partition_table_file_compression_b"); + catalog.createTable(identifier, schemaBuilder.build(), true); + fileCompression = + catalog.getTable(identifier) + .options() + .get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key()); + assertEquals(fileCompression, expectFileCompression); + } + @Test public void testFormatTableOnlyPartitionValueRead() throws Exception { if (!supportsFormatTable()) { diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 7cb38c361719..fd876685e85d 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -111,6 +111,7 @@ import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; +import static org.apache.paimon.catalog.CatalogUtils.getFormatTableFileCompression; import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem; import static org.apache.paimon.catalog.Identifier.DEFAULT_MAIN_BRANCH; @@ -914,7 +915,8 @@ public void createFormatTable(Identifier identifier, Schema schema) { List primaryKeys = schema.primaryKeys(); Map options = schema.options(); int highestFieldId = RowType.currentHighestFieldId(fields); - + String fileCompression = getFormatTableFileCompression(schema.options()); + options.put(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key(), fileCompression); TableSchema newSchema = new TableSchema( 0, diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java index bcd69cdeb7da..583522822b9e 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java @@ -89,8 +89,7 @@ public void testCreateFormatTable() throws IOException { // test csv table spark.sql( - "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c STRING) USING csv OPTIONS ('csv.field-delimiter' ',') " - + "TBLPROPERTIES ('file.compression'='none')"); + "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c STRING) USING csv OPTIONS ('csv.field-delimiter' ',')"); spark.sql("INSERT INTO table_csv VALUES (1, 1, '1'), (2, 2, '2')").collect(); String r = spark.sql("DESCRIBE FORMATTED table_csv").collectAsList().toString(); assertThat(r).contains("sep=,"); @@ -103,8 +102,7 @@ public void testCreateFormatTable() throws IOException { // test json table spark.sql( - "CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c STRING) USING json " - + "TBLPROPERTIES ('file.compression'='none')"); + "CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c STRING) USING json "); spark.sql("INSERT INTO table_json VALUES(1, 1, '1'), (2, 2, '2')"); assertThat( spark.sql("SELECT * FROM table_json").collectAsList().stream() diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala index cb469f4da0b3..6161ccaccdb3 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala @@ -35,8 +35,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase { test("Format table: csv with field-delimiter") { withTable("t") { - sql( - s"CREATE TABLE t (f0 INT, f1 INT) USING CSV OPTIONS ('csv.field-delimiter' ';') TBLPROPERTIES ('file.compression'='none')") + sql(s"CREATE TABLE t (f0 INT, f1 INT) USING CSV OPTIONS ('csv.field-delimiter' ';')") val table = paimonCatalog.getTable(Identifier.create(hiveDbName, "t")).asInstanceOf[FormatTable] val csvFile = @@ -107,8 +106,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase { test("Format table: CTAS with partitioned table") { withTable("t1", "t2") { - sql( - "CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv PARTITIONED BY (p1, p2) TBLPROPERTIES ('file.compression'='none')") + sql("CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv PARTITIONED BY (p1, p2)") sql("INSERT INTO t1 VALUES (1, 2, 3)") assertThrows[UnsupportedOperationException] { @@ -125,8 +123,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase { test("Format table: read compressed files") { for (format <- Seq("csv", "json")) { withTable("compress_t") { - sql( - s"CREATE TABLE compress_t (a INT, b INT, c INT) USING $format TBLPROPERTIES ('file.compression'='none')") + sql(s"CREATE TABLE compress_t (a INT, b INT, c INT) USING $format") sql("INSERT INTO compress_t VALUES (1, 2, 3)") val table = paimonCatalog @@ -149,8 +146,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase { test("Format table: field delimiter in HMS") { withTable("t1") { - sql( - "CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv OPTIONS ('csv.field-delimiter' ';') TBLPROPERTIES ('file.compression'='none')") + sql("CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv OPTIONS ('csv.field-delimiter' ';')") val row = sql("SHOW CREATE TABLE t1").collect()(0) assert(row.toString().contains("'csv.field-delimiter' = ';'")) } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala index 44c6abb297c0..f7b91a0d3cb7 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala @@ -37,10 +37,8 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase { test("PaimonFormatTableRead table: csv mode") { val tableName = "paimon_format_test_csv_malformed" withTable(tableName) { - sql( - s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV TBLPROPERTIES (" + - s"'file.compression'='none', 'seq'='|', 'lineSep'='\n', " + - "'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)") + sql(s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV TBLPROPERTIES (" + + s"'seq'='|', 'lineSep'='\n', 'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)") val table = paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable] val partition = 20250920 @@ -64,11 +62,10 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase { test("PaimonFormatTable non partition table overwrite: csv") { val tableName = "paimon_non_partiiton_overwrite_test" withTable(tableName) { - spark.sql( - s""" - |CREATE TABLE $tableName (age INT, name STRING) - |USING CSV TBLPROPERTIES ('format-table.implementation'='paimon', 'file.compression'='none') - |""".stripMargin) + spark.sql(s""" + |CREATE TABLE $tableName (age INT, name STRING) + |USING CSV TBLPROPERTIES ('format-table.implementation'='paimon') + |""".stripMargin) val table = paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable] table.fileIO().mkdirs(new Path(table.location())) @@ -88,12 +85,11 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase { test("PaimonFormatTable partition table overwrite: csv") { val tableName = "paimon_overwrite_test" withTable(tableName) { - spark.sql( - s""" - |CREATE TABLE $tableName (age INT, name STRING) - |USING CSV TBLPROPERTIES ('format-table.implementation'='paimon', 'file.compression'='none') - |PARTITIONED BY (id INT) - |""".stripMargin) + spark.sql(s""" + |CREATE TABLE $tableName (age INT, name STRING) + |USING CSV TBLPROPERTIES ('format-table.implementation'='paimon') + |PARTITIONED BY (id INT) + |""".stripMargin) val table = paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable] table.fileIO().mkdirs(new Path(table.location())) @@ -115,10 +111,8 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase { test("PaimonFormatTableRead table: csv with field-delimiter") { val tableName = "paimon_format_test_csv_options" withTable(tableName) { - sql( - s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" + - s"'file.compression'='none', 'seq'='|', 'lineSep'='\n', " + - "'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)") + sql(s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" + + s"'seq'='|', 'lineSep'='\n', 'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)") val table = paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable] table.fileIO().mkdirs(new Path(table.location())) @@ -135,10 +129,8 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase { test("PaimonFormatTable: csv with partition path only value") { val tableName = "paimon_format_test_partition_path_only_value" withTable(tableName) { - sql( - s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" + - s"'file.compression'='none','format-table.implementation'='paimon'," + - "'format-table.partition-path-only-value'='true') PARTITIONED BY (`ds` bigint)") + sql(s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" + + s"'format-table.implementation'='paimon','format-table.partition-path-only-value'='true') PARTITIONED BY (`ds` bigint)") val table = paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable] table.fileIO().mkdirs(new Path(table.location())) From 46079503de9fc3ed0246d593326b60b6ddfdcdff Mon Sep 17 00:00:00 2001 From: yantian Date: Mon, 3 Nov 2025 11:38:33 +0800 Subject: [PATCH 2/2] add doc --- docs/layouts/shortcodes/generated/core_configuration.html | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index d26fa9b59d1f..fd073c622787 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -548,6 +548,12 @@ Boolean Whether to force the use of lookup for compaction. + +
format-table.file.compression
+ (none) + String + Format table file compression. +
format-table.implementation
paimon