2 changes: 2 additions & 0 deletions _duckdb-stubs/__init__.pyi
@@ -720,6 +720,7 @@ class DuckDBPyRelation:
partition_by: pytyping.List[str] | None = None,
write_partition_columns: bool | None = None,
append: bool | None = None,
filename_pattern: str | None = None,
) -> None: ...
def to_table(self, table_name: str) -> None: ...
def to_view(self, view_name: str, replace: bool = True) -> DuckDBPyRelation: ...
@@ -772,6 +773,7 @@ class DuckDBPyRelation:
partition_by: pytyping.List[str] | None = None,
write_partition_columns: bool | None = None,
append: bool | None = None,
filename_pattern: str | None = None,
) -> None: ...
@property
def alias(self) -> str: ...
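For orientation, a minimal sketch (not part of this diff) of how the new keyword surfaces to Python callers; the relation, output directory, and pattern below are illustrative:

```python
import duckdb

# Build a small relation; "out_dir" and the column names are placeholders.
rel = duckdb.sql("SELECT 1 AS id, 'a' AS category")

# With partition_by, each partition directory gets files named after the
# pattern; "orders_{i}" would yield e.g. orders_0.parquet per partition.
rel.to_parquet("out_dir", partition_by=["category"], filename_pattern="orders_{i}")
```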
2 changes: 1 addition & 1 deletion src/duckdb_py/include/duckdb_python/pyrelation.hpp
@@ -214,7 +214,7 @@ struct DuckDBPyRelation {
const py::object &row_group_size = py::none(), const py::object &overwrite = py::none(),
const py::object &per_thread_output = py::none(), const py::object &use_tmp_file = py::none(),
const py::object &partition_by = py::none(), const py::object &write_partition_columns = py::none(),
const py::object &append = py::none());
const py::object &append = py::none(), const py::object &filename_pattern = py::none());

void ToCSV(const string &filename, const py::object &sep = py::none(), const py::object &na_rep = py::none(),
const py::object &header = py::none(), const py::object &quotechar = py::none(),
10 changes: 9 additions & 1 deletion src/duckdb_py/pyrelation.cpp
@@ -1213,7 +1213,8 @@ void DuckDBPyRelation::ToParquet(const string &filename, const py::object &compr
const py::object &row_group_size_bytes, const py::object &row_group_size,
const py::object &overwrite, const py::object &per_thread_output,
const py::object &use_tmp_file, const py::object &partition_by,
const py::object &write_partition_columns, const py::object &append) {
const py::object &write_partition_columns, const py::object &append,
const py::object &filename_pattern) {
case_insensitive_map_t<vector<Value>> options;

if (!py::none().is(compression)) {
@@ -1304,6 +1305,13 @@ void DuckDBPyRelation::ToParquet(const string &filename, const py::object &compr
options["use_tmp_file"] = {Value::BOOLEAN(py::bool_(use_tmp_file))};
}

if (!py::none().is(filename_pattern)) {
if (!py::isinstance<py::str>(filename_pattern)) {
throw InvalidInputException("to_parquet only accepts 'filename_pattern' as a string");
}
options["filename_pattern"] = {Value(py::str(filename_pattern))};
}

auto write_parquet = rel->WriteParquetRel(filename, std::move(options));
PyExecuteRelation(write_parquet);
}
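The check added above rejects non-string values. A hedged sketch of the resulting failure mode as seen from Python (the relation and output directory are illustrative; the error text is the one thrown above):

```python
import duckdb
import pytest  # used here only to illustrate the expected error

rel = duckdb.sql("SELECT 1 AS id, 'a' AS category")

# A non-string filename_pattern should hit the InvalidInputException path added above.
with pytest.raises(duckdb.InvalidInputException, match="filename_pattern"):
    rel.to_parquet("out_dir", partition_by=["category"], filename_pattern=123)
```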
3 changes: 2 additions & 1 deletion src/duckdb_py/pyrelation/initialize.cpp
@@ -36,7 +36,8 @@ static void InitializeConsumers(py::class_<DuckDBPyRelation> &m) {
py::arg("row_group_size_bytes") = py::none(), py::arg("row_group_size") = py::none(),
py::arg("overwrite") = py::none(), py::arg("per_thread_output") = py::none(),
py::arg("use_tmp_file") = py::none(), py::arg("partition_by") = py::none(),
py::arg("write_partition_columns") = py::none(), py::arg("append") = py::none());
py::arg("write_partition_columns") = py::none(), py::arg("append") = py::none(),
py::arg("filename_pattern") = py::none());

DefineMethod(
{"to_csv", "write_csv"}, m, &DuckDBPyRelation::ToCSV, "Write the relation object to a CSV file in 'file_name'",
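Since the collected options are forwarded to the relation's Parquet writer, the new keyword should correspond to DuckDB's existing FILENAME_PATTERN option for COPY ... TO. A hedged sketch of that SQL-level counterpart, run through the Python API (table and directory names are illustrative):

```python
import duckdb

con = duckdb.connect()
con.sql("CREATE TABLE orders AS SELECT 1 AS id, 'a' AS category")

# Roughly what the Python keyword argument maps onto at the COPY level.
con.sql(
    "COPY orders TO 'out_dir' "
    "(FORMAT PARQUET, PARTITION_BY (category), FILENAME_PATTERN 'orders_{i}')"
)
```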
55 changes: 55 additions & 0 deletions tests/fast/api/test_to_parquet.py
@@ -1,4 +1,6 @@
import os
import pathlib
import re
import tempfile

import pytest
@@ -170,3 +172,56 @@ def test_append(self, pd):
("shinji", 123.0, "a"),
]
assert result.execute().fetchall() == expected

@pytest.mark.parametrize("pd", [NumpyPandas(), ArrowPandas()])
def test_filename_pattern_with_index(self, pd):
temp_file_name = os.path.join(tempfile.mkdtemp(), next(tempfile._get_candidate_names())) # noqa: PTH118
df = pd.DataFrame(
{
"name": ["rei", "shinji", "asuka", "kaworu"],
"float": [321.0, 123.0, 23.0, 340.0],
"category": ["a", "a", "b", "c"],
}
)
rel = duckdb.from_df(df)
rel.to_parquet(temp_file_name, partition_by=["category"], filename_pattern="orders_{i}")
# Check that files follow the pattern with {i}
files_a = list(pathlib.Path(f"{temp_file_name}/category=a").iterdir())
files_b = list(pathlib.Path(f"{temp_file_name}/category=b").iterdir())
files_c = list(pathlib.Path(f"{temp_file_name}/category=c").iterdir())
filename_pattern = re.compile(r"^orders_[09]+\.parquet$")
assert all(filename_pattern.search(str(f.name)) for f in files_a)
assert all(filename_pattern.search(str(f.name)) for f in files_b)
assert all(filename_pattern.search(str(f.name)) for f in files_c)

# Verify data integrity
result = duckdb.sql(f"FROM read_parquet('{temp_file_name}/*/*.parquet', hive_partitioning=TRUE)")
expected = [("rei", 321.0, "a"), ("shinji", 123.0, "a"), ("asuka", 23.0, "b"), ("kaworu", 340.0, "c")]
assert result.execute().fetchall() == expected

@pytest.mark.parametrize("pd", [NumpyPandas(), ArrowPandas()])
def test_filename_pattern_with_uuid(self, pd):
temp_file_name = os.path.join(tempfile.mkdtemp(), next(tempfile._get_candidate_names())) # noqa: PTH118
df = pd.DataFrame(
{
"name": ["rei", "shinji", "asuka", "kaworu"],
"float": [321.0, 123.0, 23.0, 340.0],
"category": ["a", "a", "b", "c"],
}
)
rel = duckdb.from_df(df)
rel.to_parquet(temp_file_name, partition_by=["category"], filename_pattern="file_{uuid}")
# Check that files follow the pattern with {uuid}
files_a = list(pathlib.Path(f"{temp_file_name}/category=a").iterdir())
files_b = list(pathlib.Path(f"{temp_file_name}/category=b").iterdir())
files_c = list(pathlib.Path(f"{temp_file_name}/category=c").iterdir())
filename_pattern = re.compile(r"^file_[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}\.parquet$")
print(files_a)
assert all(filename_pattern.search(str(f.name)) for f in files_a)
assert all(filename_pattern.search(str(f.name)) for f in files_b)
assert all(filename_pattern.search(str(f.name)) for f in files_c)

# Verify data integrity
result = duckdb.sql(f"FROM read_parquet('{temp_file_name}/*/*.parquet', hive_partitioning=TRUE)")
expected = [("rei", 321.0, "a"), ("shinji", 123.0, "a"), ("asuka", 23.0, "b"), ("kaworu", 340.0, "c")]
assert result.execute().fetchall() == expected
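As a follow-on check (a sketch, assuming a directory written with filename_pattern as in the tests above), read_parquet's filename flag can be used to confirm which pattern-named file each row landed in:

```python
import duckdb

# "out_dir" stands in for the temporary directory used in the tests.
result = duckdb.sql(
    "SELECT filename, count(*) AS n "
    "FROM read_parquet('out_dir/*/*.parquet', hive_partitioning=TRUE, filename=TRUE) "
    "GROUP BY filename"
)
print(result.fetchall())
```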