diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 3e2b78d3fbf4..889adf5a86c4 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -548,6 +548,12 @@
             <td>Boolean</td>
             <td>Whether to force the use of lookup for compaction.</td>
         </tr>
+        <tr>
+            <td><h5>format-table.file.compression</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Format table file compression.</td>
+        </tr>
         <tr>
             <td><h5>format-table.implementation</h5></td>
             <td style="word-wrap: break-word;">paimon</td>
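The "(none)" default above is not literal: compression is resolved with a fixed precedence, where an explicit 'file.compression' wins, then 'format-table.file.compression', then a per-format default (snappy for parquet, zstd for avro and orc, none for csv and json). A stand-alone sketch of that order in plain Java; the class and the resolve helper are illustrative, not Paimon API:

import java.util.HashMap;
import java.util.Map;

public class CompressionPrecedence {

    // Illustrative mirror of the resolution order implemented in
    // CoreOptions#formatTableFileCompression below.
    static String resolve(Map<String, String> options, String format) {
        if (options.containsKey("file.compression")) {
            return options.get("file.compression");
        }
        if (options.containsKey("format-table.file.compression")) {
            return options.get("format-table.file.compression");
        }
        switch (format) {
            case "parquet":
                return "snappy";
            case "avro":
            case "orc":
                return "zstd";
            case "csv":
            case "json":
                return "none";
            default:
                throw new UnsupportedOperationException("Unsupported format: " + format);
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        System.out.println(resolve(options, "parquet")); // snappy: per-format default
        options.put("format-table.file.compression", "gzip");
        System.out.println(resolve(options, "parquet")); // gzip: the new key applies
        options.put("file.compression", "zstd");
        System.out.println(resolve(options, "parquet")); // zstd: explicit file.compression wins
    }
}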
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 7f8bc8728a32..590b7b6dd894 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -229,6 +229,8 @@ public InlineElement getDescription() {
     public static final String FILE_FORMAT_ORC = "orc";
     public static final String FILE_FORMAT_AVRO = "avro";
     public static final String FILE_FORMAT_PARQUET = "parquet";
+    public static final String FILE_FORMAT_CSV = "csv";
+    public static final String FILE_FORMAT_JSON = "json";
 
     public static final ConfigOption<String> FILE_FORMAT =
             key("file.format")
@@ -2022,6 +2024,13 @@ public InlineElement getDescription() {
                     .defaultValue(false)
                     .withDescription("Format table file path only contain partition value.");
 
+    public static final ConfigOption<String> FORMAT_TABLE_FILE_COMPRESSION =
+            ConfigOptions.key("format-table.file.compression")
+                    .stringType()
+                    .noDefaultValue()
+                    .withFallbackKeys(FILE_COMPRESSION.key())
+                    .withDescription("Format table file compression.");
+
     public static final ConfigOption<String> BLOB_FIELD =
             key("blob-field")
                     .stringType()
@@ -2294,6 +2303,29 @@ public String fileCompression() {
         return options.get(FILE_COMPRESSION);
     }
 
+    public String formatTableFileCompression() {
+        if (options.containsKey(FILE_COMPRESSION.key())) {
+            return options.get(FILE_COMPRESSION.key());
+        } else if (options.containsKey(FORMAT_TABLE_FILE_COMPRESSION.key())) {
+            return options.get(FORMAT_TABLE_FILE_COMPRESSION.key());
+        } else {
+            String format = formatType();
+            switch (format) {
+                case FILE_FORMAT_PARQUET:
+                    return "snappy";
+                case FILE_FORMAT_AVRO:
+                case FILE_FORMAT_ORC:
+                    return "zstd";
+                case FILE_FORMAT_CSV:
+                case FILE_FORMAT_JSON:
+                    return "none";
+                default:
+                    throw new UnsupportedOperationException(
+                            String.format("Unsupported format: %s", format));
+            }
+        }
+    }
+
     public MemorySize fileReaderAsyncThreshold() {
         return options.get(FILE_READER_ASYNC_THRESHOLD);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
index 3d6af1b227ae..5e6c59c57491 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
@@ -65,7 +65,7 @@ public FormatTableFileWriter(
                 options.changelogFilePrefix(),
                 options.legacyPartitionName(),
                 options.fileSuffixIncludeCompression(),
-                options.fileCompression(),
+                options.formatTableFileCompression(),
                 options.dataFilePathDirectory(),
                 null,
                 false);
@@ -113,6 +113,6 @@ private FormatTableRecordWriter createWriter(BinaryRow partition) {
                 pathFactory.createFormatTableDataFilePathFactory(
                         partition, options.formatTablePartitionOnlyValueInPath()),
                 writeRowType,
-                options.fileCompression());
+                options.formatTableFileCompression());
     }
 }
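With the writer now calling formatTableFileCompression(), overriding compression for a format table is just another option at create time. A minimal sketch using Paimon's Schema builder, mirroring what the test below does; the class name and column are illustrative, and the Catalog#createTable call is elided:

import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class FormatTableCompressionSchema {

    // Sketch: a csv format table whose data files should be written
    // gzip-compressed instead of the csv default of "none".
    public static Schema csvWithGzip() {
        return Schema.newBuilder()
                .column("f1", DataTypes.INT())
                .option("type", "format-table")
                .option("file.format", "csv")
                .option("format-table.file.compression", "gzip")
                .build();
    }
}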
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 4ddd6ab05fc3..45475cc5bc03 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -574,6 +574,58 @@ public void testCreateTable() throws Exception {
                 .isInstanceOf(RuntimeException.class);
     }
 
+    @Test
+    public void testFormatTableFileCompression() throws Exception {
+        if (!supportsFormatTable()) {
+            return;
+        }
+        String dbName = "test_format_table_file_compression";
+        catalog.createDatabase(dbName, true);
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f1", DataTypes.INT());
+        schemaBuilder.option("type", "format-table");
+        Pair<String, String>[] format2ExpectDefaultFileCompression = {
+            Pair.of("csv", "none"),
+            Pair.of("parquet", "snappy"),
+            Pair.of("json", "none"),
+            Pair.of("orc", "zstd")
+        };
+        for (Pair<String, String> format2Compression : format2ExpectDefaultFileCompression) {
+            Identifier identifier =
+                    Identifier.create(
+                            dbName,
+                            "partition_table_file_compression_" + format2Compression.getKey());
+            schemaBuilder.option("file.format", format2Compression.getKey());
+            catalog.createTable(identifier, schemaBuilder.build(), true);
+            String fileCompression =
+                    catalog.getTable(identifier)
+                            .options()
+                            .get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key());
+            assertEquals(fileCompression, format2Compression.getValue());
+        }
+        // table has option file.compression
+        String expectFileCompression = "gzip";
+        schemaBuilder.option("file.format", "csv");
+        schemaBuilder.option("file.compression", expectFileCompression);
+        Identifier identifier = Identifier.create(dbName, "partition_table_file_compression_a");
+        catalog.createTable(identifier, schemaBuilder.build(), true);
+        String fileCompression =
+                catalog.getTable(identifier)
+                        .options()
+                        .get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key());
+        assertEquals(fileCompression, expectFileCompression);
+
+        // table has option format-table.file.compression
+        schemaBuilder.option("format-table.file.compression", expectFileCompression);
+        identifier = Identifier.create(dbName, "partition_table_file_compression_b");
+        catalog.createTable(identifier, schemaBuilder.build(), true);
+        fileCompression =
+                catalog.getTable(identifier)
+                        .options()
+                        .get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key());
+        assertEquals(fileCompression, expectFileCompression);
+    }
+
     @Test
     public void testFormatTableOnlyPartitionValueRead() throws Exception {
         if (!supportsFormatTable()) {
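As the assertions above rely on, the resolved compression ends up persisted in the created table's options, so callers can read it back without repeating the fallback logic. A sketch, assuming a catalog and identifier are already at hand; the class name is illustrative:

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;

public class ResolvedCompressionLookup {

    // Reads the compression that the catalog persisted at create time,
    // mirroring the lookups in testFormatTableFileCompression above.
    public static String resolvedCompression(Catalog catalog, Identifier identifier)
            throws Catalog.TableNotExistException {
        return catalog.getTable(identifier)
                .options()
                .get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key());
    }
}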
- sql("Drop TABLE %s", tableName); + for (String format : new String[] {"parquet", "csv", "json"}) { + String tableName = "format_table_parquet_" + format.toLowerCase(); + Identifier identifier = Identifier.create("default", tableName); + sql( + "CREATE TABLE %s (a DECIMAL(8, 3), b INT, c INT) WITH ('file.format'='%s', 'type'='format-table')", + tableName, format); + RESTToken expiredDataToken = + new RESTToken( + ImmutableMap.of( + "akId", + "akId-expire", + "akSecret", + UUID.randomUUID().toString()), + System.currentTimeMillis() + 1000_000); + restCatalogServer.setDataToken(identifier, expiredDataToken); + sql("INSERT INTO %s VALUES (%s, 1, 1), (%s, 2, 2)", tableName, decimal, decimal); + assertThat(sql("SELECT a, b FROM %s", tableName)) + .containsExactlyInAnyOrder( + Row.of(new BigDecimal(bigDecimalStr), 1), + Row.of(new BigDecimal(bigDecimalStr), 2)); + sql("Drop TABLE %s", tableName); + } } @Test diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 7cb38c361719..bc99c35a4a9b 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -914,7 +914,6 @@ public void createFormatTable(Identifier identifier, Schema schema) { List primaryKeys = schema.primaryKeys(); Map options = schema.options(); int highestFieldId = RowType.currentHighestFieldId(fields); - TableSchema newSchema = new TableSchema( 0, diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java index bcd69cdeb7da..583522822b9e 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java @@ -89,8 +89,7 @@ public void testCreateFormatTable() throws IOException { // test csv table spark.sql( - "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c STRING) USING csv OPTIONS ('csv.field-delimiter' ',') " - + "TBLPROPERTIES ('file.compression'='none')"); + "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c STRING) USING csv OPTIONS ('csv.field-delimiter' ',')"); spark.sql("INSERT INTO table_csv VALUES (1, 1, '1'), (2, 2, '2')").collect(); String r = spark.sql("DESCRIBE FORMATTED table_csv").collectAsList().toString(); assertThat(r).contains("sep=,"); @@ -103,8 +102,7 @@ public void testCreateFormatTable() throws IOException { // test json table spark.sql( - "CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c STRING) USING json " - + "TBLPROPERTIES ('file.compression'='none')"); + "CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c STRING) USING json "); spark.sql("INSERT INTO table_json VALUES(1, 1, '1'), (2, 2, '2')"); assertThat( spark.sql("SELECT * FROM table_json").collectAsList().stream() diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala index 23b2873817d2..4a895b10b645 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala @@ 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
index 23b2873817d2..4a895b10b645 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -35,8 +35,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
 
   test("Format table: csv with field-delimiter") {
     withTable("t") {
-      sql(
-        s"CREATE TABLE t (f0 INT, f1 INT) USING CSV OPTIONS ('csv.field-delimiter' ';') TBLPROPERTIES ('file.compression'='none')")
+      sql(s"CREATE TABLE t (f0 INT, f1 INT) USING CSV OPTIONS ('csv.field-delimiter' ';')")
       val table =
         paimonCatalog.getTable(Identifier.create(hiveDbName, "t")).asInstanceOf[FormatTable]
       val csvFile =
@@ -107,8 +106,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
 
   test("Format table: CTAS with partitioned table") {
     withTable("t1", "t2") {
-      sql(
-        "CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv PARTITIONED BY (p1, p2) TBLPROPERTIES ('file.compression'='none')")
+      sql("CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv PARTITIONED BY (p1, p2)")
       sql("INSERT INTO t1 VALUES (1, 2, 3)")
 
       assertThrows[UnsupportedOperationException] {
@@ -125,8 +123,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
   test("Format table: read compressed files") {
     for (format <- Seq("csv", "json")) {
       withTable("compress_t") {
-        sql(
-          s"CREATE TABLE compress_t (a INT, b INT, c INT) USING $format TBLPROPERTIES ('file.compression'='none')")
+        sql(s"CREATE TABLE compress_t (a INT, b INT, c INT) USING $format")
         sql("INSERT INTO compress_t VALUES (1, 2, 3)")
         val table =
           paimonCatalog
@@ -149,8 +146,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
 
   test("Format table: field delimiter in HMS") {
     withTable("t1") {
-      sql(
-        "CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv OPTIONS ('csv.field-delimiter' ';') TBLPROPERTIES ('file.compression'='none')")
+      sql("CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv OPTIONS ('csv.field-delimiter' ';')")
       val row = sql("SHOW CREATE TABLE t1").collect()(0)
       assert(row.toString().contains("'csv.field-delimiter' = ';'"))
     }
@@ -158,8 +154,8 @@
 
   test("Format table: broadcast join for small table") {
     withTable("t") {
-      sql("CREATE TABLE t1 (f0 INT, f1 INT) USING CSV TBLPROPERTIES ('file.compression'='none')")
-      sql("CREATE TABLE t2 (f0 INT, f2 INT) USING CSV TBLPROPERTIES ('file.compression'='none')")
+      sql("CREATE TABLE t1 (f0 INT, f1 INT) USING CSV")
+      sql("CREATE TABLE t2 (f0 INT, f2 INT) USING CSV")
       sql("INSERT INTO t1 VALUES (1, 1)")
       sql("INSERT INTO t2 VALUES (1, 1)")
       val df = sql("SELECT t1.f0, t1.f1, t2.f2 FROM t1, t2 WHERE t1.f0 = t2.f0")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index d2b110448049..fc65cc28fea0 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -37,10 +37,8 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
   test("PaimonFormatTableRead table: csv mode") {
     val tableName = "paimon_format_test_csv_malformed"
     withTable(tableName) {
-      sql(
-        s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV TBLPROPERTIES (" +
-          s"'file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
-          "'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)")
+      sql(s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV TBLPROPERTIES (" +
+        s"'seq'='|', 'lineSep'='\n', 'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)")
       val table =
         paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
       val partition = 20250920
@@ -83,11 +81,10 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
   test("PaimonFormatTable non partition table overwrite: csv") {
     val tableName = "paimon_non_partiiton_overwrite_test"
     withTable(tableName) {
-      spark.sql(
-        s"""
-           |CREATE TABLE $tableName (age INT, name STRING)
-           |USING CSV TBLPROPERTIES ('format-table.implementation'='paimon', 'file.compression'='none')
-           |""".stripMargin)
+      spark.sql(s"""
+                   |CREATE TABLE $tableName (age INT, name STRING)
+                   |USING CSV TBLPROPERTIES ('format-table.implementation'='paimon')
+                   |""".stripMargin)
       val table =
         paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
       table.fileIO().mkdirs(new Path(table.location()))
@@ -107,12 +104,11 @@
   test("PaimonFormatTable partition table overwrite: csv") {
     val tableName = "paimon_overwrite_test"
     withTable(tableName) {
-      spark.sql(
-        s"""
-           |CREATE TABLE $tableName (age INT, name STRING)
-           |USING CSV TBLPROPERTIES ('format-table.implementation'='paimon', 'file.compression'='none')
-           |PARTITIONED BY (id INT)
-           |""".stripMargin)
+      spark.sql(s"""
+                   |CREATE TABLE $tableName (age INT, name STRING)
+                   |USING CSV TBLPROPERTIES ('format-table.implementation'='paimon')
+                   |PARTITIONED BY (id INT)
+                   |""".stripMargin)
       val table =
         paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
       table.fileIO().mkdirs(new Path(table.location()))
@@ -134,10 +130,8 @@
   test("PaimonFormatTableRead table: csv with field-delimiter") {
     val tableName = "paimon_format_test_csv_options"
     withTable(tableName) {
-      sql(
-        s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" +
-          s"'file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
-          "'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)")
+      sql(s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" +
+        s"'seq'='|', 'lineSep'='\n', 'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)")
       val table =
         paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
       table.fileIO().mkdirs(new Path(table.location()))
@@ -154,10 +148,8 @@
   test("PaimonFormatTable: csv with partition path only value") {
     val tableName = "paimon_format_test_partition_path_only_value"
     withTable(tableName) {
-      sql(
-        s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" +
-          s"'file.compression'='none','format-table.implementation'='paimon'," +
-          "'format-table.partition-path-only-value'='true') PARTITIONED BY (`ds` bigint)")
+      sql(s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" +
+        s"'format-table.implementation'='paimon','format-table.partition-path-only-value'='true') PARTITIONED BY (`ds` bigint)")
       val table =
         paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
       table.fileIO().mkdirs(new Path(table.location()))
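With csv and json now defaulting to 'none', the Spark-side DDL in the tests above no longer needs an explicit TBLPROPERTIES override. A sketch of the simplified create statement through Spark's Java API; SparkSession construction is elided and the class and table names are illustrative:

import org.apache.spark.sql.SparkSession;

public class FormatTableDdl {

    // Creates a csv format table; compression falls back to the
    // per-format default ("none" for csv), so no TBLPROPERTIES entry is needed.
    public static void createCsvTable(SparkSession spark) {
        spark.sql(
                "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c STRING) "
                        + "USING csv OPTIONS ('csv.field-delimiter' ',')");
    }
}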