diff --git a/_duckdb-stubs/__init__.pyi b/_duckdb-stubs/__init__.pyi
index a4965632..6b323184 100644
--- a/_duckdb-stubs/__init__.pyi
+++ b/_duckdb-stubs/__init__.pyi
@@ -720,6 +720,7 @@ class DuckDBPyRelation:
         partition_by: pytyping.List[str] | None = None,
         write_partition_columns: bool | None = None,
         append: bool | None = None,
+        filename_pattern: str | None = None,
     ) -> None: ...
     def to_table(self, table_name: str) -> None: ...
     def to_view(self, view_name: str, replace: bool = True) -> DuckDBPyRelation: ...
@@ -772,6 +773,7 @@ class DuckDBPyRelation:
         partition_by: pytyping.List[str] | None = None,
         write_partition_columns: bool | None = None,
         append: bool | None = None,
+        filename_pattern: str | None = None,
     ) -> None: ...
     @property
     def alias(self) -> str: ...
diff --git a/src/duckdb_py/include/duckdb_python/pyrelation.hpp b/src/duckdb_py/include/duckdb_python/pyrelation.hpp
index e272ca41..06cf9e94 100644
--- a/src/duckdb_py/include/duckdb_python/pyrelation.hpp
+++ b/src/duckdb_py/include/duckdb_python/pyrelation.hpp
@@ -214,7 +214,7 @@ struct DuckDBPyRelation {
 	                const py::object &row_group_size = py::none(), const py::object &overwrite = py::none(),
 	                const py::object &per_thread_output = py::none(), const py::object &use_tmp_file = py::none(),
 	                const py::object &partition_by = py::none(), const py::object &write_partition_columns = py::none(),
-	                const py::object &append = py::none());
+	                const py::object &append = py::none(), const py::object &filename_pattern = py::none());
 
 	void ToCSV(const string &filename, const py::object &sep = py::none(), const py::object &na_rep = py::none(),
 	           const py::object &header = py::none(), const py::object &quotechar = py::none(),
diff --git a/src/duckdb_py/pyrelation.cpp b/src/duckdb_py/pyrelation.cpp
index 08b001be..bbc7a2ec 100644
--- a/src/duckdb_py/pyrelation.cpp
+++ b/src/duckdb_py/pyrelation.cpp
@@ -1213,7 +1213,8 @@ void DuckDBPyRelation::ToParquet(const string &filename, const py::object &compr
                                  const py::object &row_group_size_bytes, const py::object &row_group_size,
                                  const py::object &overwrite, const py::object &per_thread_output,
                                  const py::object &use_tmp_file, const py::object &partition_by,
-                                 const py::object &write_partition_columns, const py::object &append) {
+                                 const py::object &write_partition_columns, const py::object &append,
+                                 const py::object &filename_pattern) {
 	case_insensitive_map_t<vector<Value>> options;
 
 	if (!py::none().is(compression)) {
@@ -1304,6 +1305,13 @@ void DuckDBPyRelation::ToParquet(const string &filename, const py::object &compr
 		options["use_tmp_file"] = {Value::BOOLEAN(py::bool_(use_tmp_file))};
 	}
 
+	if (!py::none().is(filename_pattern)) {
+		if (!py::isinstance<py::str>(filename_pattern)) {
+			throw InvalidInputException("to_parquet only accepts 'filename_pattern' as a string");
+		}
+		options["filename_pattern"] = {Value(py::str(filename_pattern))};
+	}
+
 	auto write_parquet = rel->WriteParquetRel(filename, std::move(options));
 	PyExecuteRelation(write_parquet);
 }
diff --git a/src/duckdb_py/pyrelation/initialize.cpp b/src/duckdb_py/pyrelation/initialize.cpp
index cd1f042c..7bfea441 100644
--- a/src/duckdb_py/pyrelation/initialize.cpp
+++ b/src/duckdb_py/pyrelation/initialize.cpp
@@ -36,7 +36,8 @@ static void InitializeConsumers(py::class_<DuckDBPyRelation> &m) {
 	    py::arg("row_group_size_bytes") = py::none(), py::arg("row_group_size") = py::none(),
 	    py::arg("overwrite") = py::none(), py::arg("per_thread_output") = py::none(),
 	    py::arg("use_tmp_file") = py::none(), py::arg("partition_by") = py::none(),
-	    py::arg("write_partition_columns") = py::none(), py::arg("append") = py::none());
+	    py::arg("write_partition_columns") = py::none(), py::arg("append") = py::none(),
+	    py::arg("filename_pattern") = py::none());
 	DefineMethod(
 	    {"to_csv", "write_csv"}, m, &DuckDBPyRelation::ToCSV,
 	    "Write the relation object to a CSV file in 'file_name'",
diff --git a/tests/fast/api/test_to_parquet.py b/tests/fast/api/test_to_parquet.py
index 8d8162b0..f0952e68 100644
--- a/tests/fast/api/test_to_parquet.py
+++ b/tests/fast/api/test_to_parquet.py
@@ -1,4 +1,6 @@
 import os
+import pathlib
+import re
 import tempfile
 
 import pytest
@@ -170,3 +172,56 @@ def test_append(self, pd):
             ("shinji", 123.0, "a"),
         ]
         assert result.execute().fetchall() == expected
+
+    @pytest.mark.parametrize("pd", [NumpyPandas(), ArrowPandas()])
+    def test_filename_pattern_with_index(self, pd):
+        temp_file_name = os.path.join(tempfile.mkdtemp(), next(tempfile._get_candidate_names()))  # noqa: PTH118
+        df = pd.DataFrame(
+            {
+                "name": ["rei", "shinji", "asuka", "kaworu"],
+                "float": [321.0, 123.0, 23.0, 340.0],
+                "category": ["a", "a", "b", "c"],
+            }
+        )
+        rel = duckdb.from_df(df)
+        rel.to_parquet(temp_file_name, partition_by=["category"], filename_pattern="orders_{i}")
+        # Check that files follow the pattern with {i}
+        files_a = list(pathlib.Path(f"{temp_file_name}/category=a").iterdir())
+        files_b = list(pathlib.Path(f"{temp_file_name}/category=b").iterdir())
+        files_c = list(pathlib.Path(f"{temp_file_name}/category=c").iterdir())
+        filename_pattern = re.compile(r"^orders_[0-9]+\.parquet$")
+        assert all(filename_pattern.search(str(f.name)) for f in files_a)
+        assert all(filename_pattern.search(str(f.name)) for f in files_b)
+        assert all(filename_pattern.search(str(f.name)) for f in files_c)
+
+        # Verify data integrity
+        result = duckdb.sql(f"FROM read_parquet('{temp_file_name}/*/*.parquet', hive_partitioning=TRUE)")
+        expected = [("rei", 321.0, "a"), ("shinji", 123.0, "a"), ("asuka", 23.0, "b"), ("kaworu", 340.0, "c")]
+        assert result.execute().fetchall() == expected
+
+    @pytest.mark.parametrize("pd", [NumpyPandas(), ArrowPandas()])
+    def test_filename_pattern_with_uuid(self, pd):
+        temp_file_name = os.path.join(tempfile.mkdtemp(), next(tempfile._get_candidate_names()))  # noqa: PTH118
+        df = pd.DataFrame(
+            {
+                "name": ["rei", "shinji", "asuka", "kaworu"],
+                "float": [321.0, 123.0, 23.0, 340.0],
+                "category": ["a", "a", "b", "c"],
+            }
+        )
+        rel = duckdb.from_df(df)
+        rel.to_parquet(temp_file_name, partition_by=["category"], filename_pattern="file_{uuid}")
+        # Check that files follow the pattern with {uuid}
+        files_a = list(pathlib.Path(f"{temp_file_name}/category=a").iterdir())
+        files_b = list(pathlib.Path(f"{temp_file_name}/category=b").iterdir())
+        files_c = list(pathlib.Path(f"{temp_file_name}/category=c").iterdir())
+        filename_pattern = re.compile(r"^file_[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}\.parquet$")
+        print(files_a)
+        assert all(filename_pattern.search(str(f.name)) for f in files_a)
+        assert all(filename_pattern.search(str(f.name)) for f in files_b)
+        assert all(filename_pattern.search(str(f.name)) for f in files_c)
+
+        # Verify data integrity
+        result = duckdb.sql(f"FROM read_parquet('{temp_file_name}/*/*.parquet', hive_partitioning=TRUE)")
+        expected = [("rei", 321.0, "a"), ("shinji", 123.0, "a"), ("asuka", 23.0, "b"), ("kaworu", 340.0, "c")]
+        assert result.execute().fetchall() == expected
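
Usage sketch (not part of the patch): a minimal example of how the new keyword argument is expected to be called from Python, mirroring the tests above. The directory name "out_dir" is a hypothetical placeholder.

    import duckdb

    rel = duckdb.sql("SELECT 'a' AS category, 42 AS value")

    # Name each partition's file orders_0.parquet, orders_1.parquet, ... via the {i} placeholder
    rel.to_parquet("out_dir", partition_by=["category"], filename_pattern="orders_{i}")

    # Or embed a UUID in each filename via the {uuid} placeholder
    rel.to_parquet("out_dir", partition_by=["category"], filename_pattern="file_{uuid}", overwrite=True)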