Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,12 @@
<td>Boolean</td>
<td>Whether to force the use of lookup for compaction.</td>
</tr>
<tr>
<td><h5>format-table.file.compression</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Format table file compression.</td>
</tr>
<tr>
<td><h5>format-table.implementation</h5></td>
<td style="word-wrap: break-word;">paimon</td>
Expand Down
17 changes: 17 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2023,6 +2023,13 @@ public InlineElement getDescription() {
.defaultValue(false)
.withDescription("Format table file path only contain partition value.");

/**
 * File compression for format tables (CSV/JSON/Parquet/ORC tables managed outside the
 * Paimon data-file layout). No default here: {@code file.compression} is declared as a
 * fallback key, and catalogs fill in a format-specific default at table creation time.
 */
public static final ConfigOption<String> FORMAT_TABLE_FILE_COMPRESSION =
        ConfigOptions.key("format-table.file.compression")
                .stringType()
                .noDefaultValue()
                .withFallbackKeys(FILE_COMPRESSION.key())
                .withDescription("Format table file compression.");

public static final ConfigOption<String> BLOB_FIELD =
key("blob-field")
.stringType()
Expand Down Expand Up @@ -2295,6 +2302,16 @@ public String fileCompression() {
return options.get(FILE_COMPRESSION);
}

public String formatTableFileImplementation() {
if (options.containsKey(FILE_COMPRESSION.key())) {
return options.get(FILE_COMPRESSION);
} else if (options.containsKey(FORMAT_TABLE_FILE_COMPRESSION.key())) {
return options.get(FORMAT_TABLE_FILE_COMPRESSION);
} else {
return fileCompression();
Copy link

Copilot AI Oct 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fallback logic at line 2311 calls fileCompression() which returns options.get(FILE_COMPRESSION) and can be null. This creates a circular logic issue: if FILE_COMPRESSION key is not present (checked at line 2306), calling fileCompression() will still return null. This method can return null, causing potential NullPointerException when used. The method should either return a non-null default value based on format type (like CatalogUtils.getFormatTableFileCompression() does) or be annotated as @Nullable.

Suggested change
return fileCompression();
// Fallback to a sensible default value if neither key is present.
// For example, "none" or another appropriate default.
return "none";

Copilot uses AI. Check for mistakes.
}
}

public MemorySize fileReaderAsyncThreshold() {
return options.get(FILE_READER_ASYNC_THRESHOLD);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,17 @@
import java.util.function.Function;

import static org.apache.paimon.CoreOptions.AUTO_CREATE;
import static org.apache.paimon.CoreOptions.FILE_COMPRESSION;
import static org.apache.paimon.CoreOptions.FORMAT_TABLE_FILE_COMPRESSION;
import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
import static org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGACY_NAME;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.PRIMARY_KEY;
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.table.FormatTable.Format.CSV;
import static org.apache.paimon.table.FormatTable.Format.JSON;
import static org.apache.paimon.table.system.AllPartitionsTable.ALL_PARTITIONS;
import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
import static org.apache.paimon.table.system.AllTablesTable.ALL_TABLES;
Expand Down Expand Up @@ -273,6 +277,32 @@ public static Table loadTable(
return table;
}

/**
 * Resolves the file compression for a format table from the raw table options.
 *
 * <p>Precedence: an explicit {@code file.compression}, then {@code
 * format-table.file.compression}, and finally a per-format default (parquet: snappy,
 * orc: zstd, csv/json: none).
 *
 * @param options raw table options
 * @return the compression codec name to use; never null
 * @throws UnsupportedOperationException if the file format has no known default
 */
public static String getFormatTableFileCompression(Map<String, String> options) {
    // Explicit settings win, checked in order of precedence.
    if (options.containsKey(FILE_COMPRESSION.key())) {
        return options.get(FILE_COMPRESSION.key());
    }
    if (options.containsKey(FORMAT_TABLE_FILE_COMPRESSION.key())) {
        return options.get(FORMAT_TABLE_FILE_COMPRESSION.key());
    }

    // No explicit setting: derive a sensible default from the table's file format.
    String formatName =
            options.getOrDefault(
                    CoreOptions.FILE_FORMAT.key(), CoreOptions.FILE_FORMAT.defaultValue());
    FormatTable.Format format = FormatTable.parseFormat(formatName);
    switch (format) {
        case PARQUET:
            return "snappy";
        case ORC:
            return "zstd";
        case CSV:
        case JSON:
            return "none";
        default:
            throw new UnsupportedOperationException(
                    String.format("Unsupported format: %s", format));
    }
}

private static Table createGlobalSystemTable(String tableName, Catalog catalog)
throws Catalog.TableNotExistException {
switch (tableName.toLowerCase()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.PagedList;
import org.apache.paimon.Snapshot;
import org.apache.paimon.TableType;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
Expand All @@ -33,6 +34,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.function.Function;
import org.apache.paimon.function.FunctionChange;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.partition.PartitionStatistics;
import org.apache.paimon.rest.exceptions.AlreadyExistsException;
Expand Down Expand Up @@ -76,10 +78,13 @@
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.BRANCH;
import static org.apache.paimon.CoreOptions.FORMAT_TABLE_FILE_COMPRESSION;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.getFormatTableFileCompression;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.catalog.CatalogUtils.validateCreateTable;
Expand Down Expand Up @@ -443,6 +448,10 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
checkNotSystemTable(identifier, "createTable");
validateCreateTable(schema);
createExternalTablePathIfNotExist(schema);
if (Options.fromMap(schema.options()).get(TYPE) == TableType.FORMAT_TABLE) {
String fileCompression = getFormatTableFileCompression(schema.options());
schema.options().put(FORMAT_TABLE_FILE_COMPRESSION.key(), fileCompression);
}
api.createTable(identifier, schema);
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ private FormatTableRecordWriter createWriter(BinaryRow partition) {
pathFactory.createFormatTableDataFilePathFactory(
partition, options.formatTablePartitionOnlyValueInPath()),
writeRowType,
options.fileCompression());
options.formatTableFileImplementation());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,58 @@ public void testCreateTable() throws Exception {
.isInstanceOf(RuntimeException.class);
}

@Test
public void testFormatTableFileCompression() throws Exception {
    // Format-table support is optional per catalog implementation.
    if (!supportsFormatTable()) {
        return;
    }
    String dbName = "test_format_table_file_compression";
    catalog.createDatabase(dbName, true);
    Schema.Builder schemaBuilder = Schema.newBuilder();
    schemaBuilder.column("f1", DataTypes.INT());
    schemaBuilder.option("type", "format-table");

    // Without an explicit compression option the catalog must fill in a per-format default.
    Pair[] format2ExpectDefaultFileCompression = {
        Pair.of("csv", "none"),
        Pair.of("parquet", "snappy"),
        Pair.of("json", "none"),
        Pair.of("orc", "zstd")
    };
    for (Pair<String, String> format2Compression : format2ExpectDefaultFileCompression) {
        Identifier identifier =
                Identifier.create(
                        dbName,
                        "partition_table_file_compression_" + format2Compression.getKey());
        schemaBuilder.option("file.format", format2Compression.getKey());
        catalog.createTable(identifier, schemaBuilder.build(), true);
        String fileCompression =
                catalog.getTable(identifier)
                        .options()
                        .get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key());
        // JUnit convention: expected value first, actual value second — the original
        // call had them swapped, which produces misleading failure messages.
        assertEquals(format2Compression.getValue(), fileCompression);
    }

    // An explicit file.compression overrides the per-format default.
    String expectFileCompression = "none";
    schemaBuilder.option("file.format", "csv");
    schemaBuilder.option("file.compression", expectFileCompression);
    Identifier identifier = Identifier.create(dbName, "partition_table_file_compression_a");
    catalog.createTable(identifier, schemaBuilder.build(), true);
    String fileCompression =
            catalog.getTable(identifier)
                    .options()
                    .get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key());
    assertEquals(expectFileCompression, fileCompression);

    // An explicit format-table.file.compression is honored as well.
    schemaBuilder.option("format-table.file.compression", expectFileCompression);
    identifier = Identifier.create(dbName, "partition_table_file_compression_b");
    catalog.createTable(identifier, schemaBuilder.build(), true);
    fileCompression =
            catalog.getTable(identifier)
                    .options()
                    .get(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key());
    assertEquals(expectFileCompression, fileCompression);
}

@Test
public void testFormatTableOnlyPartitionValueRead() throws Exception {
if (!supportsFormatTable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.getFormatTableFileCompression;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.catalog.Identifier.DEFAULT_MAIN_BRANCH;
Expand Down Expand Up @@ -914,7 +915,8 @@ public void createFormatTable(Identifier identifier, Schema schema) {
List<String> primaryKeys = schema.primaryKeys();
Map<String, String> options = schema.options();
int highestFieldId = RowType.currentHighestFieldId(fields);

String fileCompression = getFormatTableFileCompression(schema.options());
options.put(CoreOptions.FORMAT_TABLE_FILE_COMPRESSION.key(), fileCompression);
TableSchema newSchema =
new TableSchema(
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ public void testCreateFormatTable() throws IOException {
// test csv table

spark.sql(
"CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c STRING) USING csv OPTIONS ('csv.field-delimiter' ',') "
+ "TBLPROPERTIES ('file.compression'='none')");
"CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c STRING) USING csv OPTIONS ('csv.field-delimiter' ',')");
spark.sql("INSERT INTO table_csv VALUES (1, 1, '1'), (2, 2, '2')").collect();
String r = spark.sql("DESCRIBE FORMATTED table_csv").collectAsList().toString();
assertThat(r).contains("sep=,");
Expand All @@ -103,8 +102,7 @@ public void testCreateFormatTable() throws IOException {
// test json table

spark.sql(
"CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c STRING) USING json "
+ "TBLPROPERTIES ('file.compression'='none')");
"CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c STRING) USING json ");
spark.sql("INSERT INTO table_json VALUES(1, 1, '1'), (2, 2, '2')");
assertThat(
spark.sql("SELECT * FROM table_json").collectAsList().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {

test("Format table: csv with field-delimiter") {
withTable("t") {
sql(
s"CREATE TABLE t (f0 INT, f1 INT) USING CSV OPTIONS ('csv.field-delimiter' ';') TBLPROPERTIES ('file.compression'='none')")
sql(s"CREATE TABLE t (f0 INT, f1 INT) USING CSV OPTIONS ('csv.field-delimiter' ';')")
val table =
paimonCatalog.getTable(Identifier.create(hiveDbName, "t")).asInstanceOf[FormatTable]
val csvFile =
Expand Down Expand Up @@ -107,8 +106,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {

test("Format table: CTAS with partitioned table") {
withTable("t1", "t2") {
sql(
"CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv PARTITIONED BY (p1, p2) TBLPROPERTIES ('file.compression'='none')")
sql("CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv PARTITIONED BY (p1, p2)")
sql("INSERT INTO t1 VALUES (1, 2, 3)")

assertThrows[UnsupportedOperationException] {
Expand All @@ -125,8 +123,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
test("Format table: read compressed files") {
for (format <- Seq("csv", "json")) {
withTable("compress_t") {
sql(
s"CREATE TABLE compress_t (a INT, b INT, c INT) USING $format TBLPROPERTIES ('file.compression'='none')")
sql(s"CREATE TABLE compress_t (a INT, b INT, c INT) USING $format")
sql("INSERT INTO compress_t VALUES (1, 2, 3)")
val table =
paimonCatalog
Expand All @@ -149,8 +146,7 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {

test("Format table: field delimiter in HMS") {
withTable("t1") {
sql(
"CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv OPTIONS ('csv.field-delimiter' ';') TBLPROPERTIES ('file.compression'='none')")
sql("CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv OPTIONS ('csv.field-delimiter' ';')")
val row = sql("SHOW CREATE TABLE t1").collect()(0)
assert(row.toString().contains("'csv.field-delimiter' = ';'"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
test("PaimonFormatTableRead table: csv mode") {
val tableName = "paimon_format_test_csv_malformed"
withTable(tableName) {
sql(
s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV TBLPROPERTIES (" +
s"'file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
"'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)")
sql(s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV TBLPROPERTIES (" +
s"'seq'='|', 'lineSep'='\n', 'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)")
val table =
paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
val partition = 20250920
Expand All @@ -64,11 +62,10 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
test("PaimonFormatTable non partition table overwrite: csv") {
val tableName = "paimon_non_partiiton_overwrite_test"
withTable(tableName) {
spark.sql(
s"""
|CREATE TABLE $tableName (age INT, name STRING)
|USING CSV TBLPROPERTIES ('format-table.implementation'='paimon', 'file.compression'='none')
|""".stripMargin)
spark.sql(s"""
|CREATE TABLE $tableName (age INT, name STRING)
|USING CSV TBLPROPERTIES ('format-table.implementation'='paimon')
|""".stripMargin)
val table =
paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
table.fileIO().mkdirs(new Path(table.location()))
Expand All @@ -88,12 +85,11 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
test("PaimonFormatTable partition table overwrite: csv") {
val tableName = "paimon_overwrite_test"
withTable(tableName) {
spark.sql(
s"""
|CREATE TABLE $tableName (age INT, name STRING)
|USING CSV TBLPROPERTIES ('format-table.implementation'='paimon', 'file.compression'='none')
|PARTITIONED BY (id INT)
|""".stripMargin)
spark.sql(s"""
|CREATE TABLE $tableName (age INT, name STRING)
|USING CSV TBLPROPERTIES ('format-table.implementation'='paimon')
|PARTITIONED BY (id INT)
|""".stripMargin)
val table =
paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
table.fileIO().mkdirs(new Path(table.location()))
Expand All @@ -115,10 +111,8 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
test("PaimonFormatTableRead table: csv with field-delimiter") {
val tableName = "paimon_format_test_csv_options"
withTable(tableName) {
sql(
s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" +
s"'file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
"'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)")
sql(s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" +
s"'seq'='|', 'lineSep'='\n', 'format-table.implementation'='paimon') PARTITIONED BY (`ds` bigint)")
val table =
paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
table.fileIO().mkdirs(new Path(table.location()))
Expand All @@ -135,10 +129,8 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
test("PaimonFormatTable: csv with partition path only value") {
val tableName = "paimon_format_test_partition_path_only_value"
withTable(tableName) {
sql(
s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" +
s"'file.compression'='none','format-table.implementation'='paimon'," +
"'format-table.partition-path-only-value'='true') PARTITIONED BY (`ds` bigint)")
sql(s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES (" +
s"'format-table.implementation'='paimon','format-table.partition-path-only-value'='true') PARTITIONED BY (`ds` bigint)")
val table =
paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
table.fileIO().mkdirs(new Path(table.location()))
Expand Down
Loading