
Conversation

huleilei
Contributor

Changes Made

Add the ClickHouse data sink and use it to write a DataFrame to a ClickHouse table

Related Issues

Checklist

  • Documented in API Docs (if applicable)
  • Documented in User Guide (if applicable)
  • If adding a new documentation page, doc is added to docs/mkdocs.yml navigation
  • Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)

@github-actions github-actions bot added the feat label Jul 25, 2025
Contributor

@greptile-apps greptile-apps bot left a comment


Greptile Summary

This PR adds ClickHouse data sink functionality to Daft, enabling users to write DataFrames directly to ClickHouse tables. The implementation follows Daft's established DataSink pattern used by other database connectors like Lance and Turbopuffer.

The changes include:

  • New dependency management: Added clickhouse-connect >= 0.8.18 as an optional dependency in pyproject.toml and pinned version in requirements-dev.txt
  • ClickHouse data sink implementation: Created ClickHouseDataSink class in daft/io/clickhouse/clickhouse_data_sink.py that handles connection management, micropartition processing, and result aggregation
  • DataFrame integration: Added write_clickhouse() method to the DataFrame class with comprehensive parameter support for host, port, credentials, database, table, and custom client/write options
  • Module structure: Created the daft/io/clickhouse/ package directory with __init__.py

The implementation integrates with Daft's existing data sink architecture, using the write/finalize pattern for distributed execution. Users can now write DataFrames to ClickHouse using df.write_clickhouse(host="localhost", port=8123, table="my_table") and receive aggregated statistics about the write operation.
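For illustration, a minimal usage sketch based on the API described above; the connection values and table name are placeholders:

```python
import daft

df = daft.from_pydict({"id": [1, 2, 3], "name": ["a", "b", "c"]})

# Write the DataFrame to ClickHouse; the call returns a DataFrame of
# aggregated statistics about the write operation.
stats = df.write_clickhouse(host="localhost", port=8123, table="my_table")
stats.show()
```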

Confidence score: 2/5

  • This PR has several critical issues that need to be addressed before it can be safely merged
  • The main concerns are incomplete module setup, naming inconsistencies, and potential configuration security issues
  • Files that need more attention: daft/io/clickhouse/__init__.py, daft/io/clickhouse/clickhouse_data_sink.py, and pyproject.toml

Critical Issues:

  1. Empty module interface: daft/io/clickhouse/__init__.py is empty, making the ClickHouse functionality inaccessible to users who try to import it (see the sketch after this list)
  2. Naming inconsistencies: The data sink implementation contains multiple references to "ByteHouse" instead of "ClickHouse" in comments and variable names, suggesting incomplete adaptation from another codebase
  3. Configuration security flaw: The client kwargs merging logic allows user-provided parameters to potentially override explicit connection parameters
  4. Incomplete dependency setup: The 'clickhouse' extra is not included in the 'all' extras list in pyproject.toml
  5. Unused code: The ClickHouseConfig class is defined but never used
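For issue 1, the fix (confirmed later in this thread, which notes that __init__.py now imports and exposes ClickHouseDataSink through __all__) has roughly this shape:

```python
# daft/io/clickhouse/__init__.py
from daft.io.clickhouse.clickhouse_data_sink import ClickHouseDataSink

__all__ = ["ClickHouseDataSink"]
```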

5 files reviewed, 6 comments


pyproject.toml Outdated
@@ -25,6 +25,7 @@ requires-python = ">=3.9"
all = ["daft[aws, azure, gcp, ray, pandas, numpy, iceberg, deltalake, spark, sql, unity]"]
Contributor

logic: The 'all' extra should include 'clickhouse' to ensure users who install all optional dependencies get ClickHouse support

Suggested change
all = ["daft[aws, azure, gcp, ray, pandas, numpy, iceberg, deltalake, spark, sql, unity]"]
all = ["daft[aws, azure, gcp, ray, pandas, numpy, iceberg, deltalake, spark, sql, unity, clickhouse]"]

Comment on lines 18 to 25
class ClickHouseConfig:
def __init__(self, host: str, port: int, user: str, password: str, database: str, table: str) -> None:
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
self.table = table
Contributor

style: This ClickHouseConfig class is defined but never used in the implementation. Consider removing it or integrating it into the main class design.

Comment on lines 49 to 50
client_kwargs = client_kwargs or {}
self._client_kwargs = {**client_kwargs, **self._client_kwargs}
Contributor

logic: The merging order allows client_kwargs to override explicit parameters. This could cause unexpected behavior if users pass conflicting values.

Suggested change
client_kwargs = client_kwargs or {}
self._client_kwargs = {**client_kwargs, **self._client_kwargs}
self._client_kwargs = {**self._client_kwargs, **client_kwargs}
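A small sketch of why the merge order matters; the values are illustrative:

```python
explicit = {"host": "clickhouse.internal", "port": 8123}    # from named parameters
user_kwargs = {"host": "localhost", "connect_timeout": 10}  # from client_kwargs

# In a dict merge, later keys win, so the order decides which side takes precedence.
merged_bad = {**explicit, **user_kwargs}   # host == "localhost": user kwargs override
merged_good = {**user_kwargs, **explicit}  # host == "clickhouse.internal": explicit wins
```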

return self._result_schema

def write(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[QuerySummary]]:
"""Writes to Bytehouse from the given micropartitions."""
Contributor

syntax: Comment incorrectly references "Bytehouse" instead of "ClickHouse".

Suggested change
"""Writes to Bytehouse from the given micropartitions."""
"""Writes to ClickHouse from the given micropartitions."""

def write(self, micropartitions: Iterator[MicroPartition]) -> Iterator[WriteResult[QuerySummary]]:
"""Writes to Bytehouse from the given micropartitions."""
# socket cannot be serialized, so we need to create a new client in write
bh_client = get_client(**self._client_kwargs)
Contributor

style: Variable name bh_client suggests ByteHouse instead of ClickHouse. Consider renaming for consistency.

Suggested change
bh_client = get_client(**self._client_kwargs)
ch_client = get_client(**self._client_kwargs)
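The "socket cannot be serialized" comment reflects a common pattern in distributed sinks: a connected client cannot be pickled and shipped to workers, so each write task opens and closes its own. A hedged sketch of that pattern, assuming clickhouse-connect's get_client and insert_arrow APIs (the helper name is illustrative, not the PR's exact code):

```python
import pyarrow as pa
from clickhouse_connect import get_client

def write_arrow_table(table: pa.Table, client_kwargs: dict, target: str) -> None:
    # Create a fresh client inside the task; sockets cannot be serialized.
    client = get_client(**client_kwargs)
    try:
        client.insert_arrow(target, table)  # Arrow-native insert
    finally:
        client.close()  # always release the connection
```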

bh_client.close()

def finalize(self, write_results: list[WriteResult[QuerySummary]]) -> MicroPartition:
"""Finish write to ByteHouse dataset. Returns a DataFrame with the stats of the dataset."""
Contributor

syntax: Comment incorrectly references "ByteHouse" instead of "ClickHouse".

Suggested change
"""Finish write to ByteHouse dataset. Returns a DataFrame with the stats of the dataset."""
"""Finish write to ClickHouse dataset. Returns a DataFrame with the stats of the dataset."""



class ClickHouseDataSink(DataSink[QuerySummary]):
def __init__(
Contributor

ClickHouseConfig has been defined. Why are the parameters still passed separately here?

Contributor Author

ClickHouseConfig is not needed.



#clickhouse
clickhouse-connect==0.8.18
Contributor

@Jay-ju Jay-ju Jul 25, 2025

Does this version have to be pinned? Why is it different from pyproject.toml?

tbl = MicroPartition.from_pydict(
{
"total_written_rows": pa.array([total_written_rows], pa.int64()),
"total_written_bytes": pa.array([total_written_bytes], pa.int64()),
Contributor

Does total_written_bytes exist only in the ClickHouse data sink and not in other data sinks? Could it be abstracted into common sink metrics?

Contributor Author

I think the total_written_bytes field is specific to ClickHouse. Different sinks report different metrics.
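For context, a hedged sketch of the aggregation finalize performs; written_rows and written_bytes are fields on clickhouse-connect's QuerySummary, while the accessor on WriteResult is an assumption:

```python
# Sum per-partition write stats into the single-row result shown above.
total_written_rows = sum(r.result.written_rows for r in write_results)
total_written_bytes = sum(r.result.written_bytes for r in write_results)
```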


codecov bot commented Jul 25, 2025

Codecov Report

❌ Patch coverage is 98.11321% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 75.79%. Comparing base (64b3fe1) to head (ab6c5a7).
⚠️ Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
daft/io/clickhouse/clickhouse_data_sink.py 97.77% 1 Missing ⚠️
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #4850      +/-   ##
==========================================
- Coverage   76.11%   75.79%   -0.32%     
==========================================
  Files         933      936       +3     
  Lines      128478   129552    +1074     
==========================================
+ Hits        97787    98200     +413     
- Misses      30691    31352     +661     
Files with missing lines Coverage Δ
daft/dataframe/dataframe.py 86.81% <100.00%> (+0.05%) ⬆️
daft/io/clickhouse/__init__.py 100.00% <100.00%> (ø)
daft/io/clickhouse/clickhouse_data_sink.py 97.77% <97.77%> (ø)

... and 44 files with indirect coverage changes


Contributor

@greptile-apps greptile-apps bot left a comment


Greptile Summary

This PR adds ClickHouse data sink functionality to Daft, enabling users to write DataFrames directly to ClickHouse tables. The implementation follows Daft's established DataSink pattern used by other database connectors like Lance and Turbopuffer.

The recent changes address several critical issues from the previous review:

  • Fixed module interface: The daft/io/clickhouse/__init__.py file now properly imports and exposes the ClickHouseDataSink class through __all__
  • Corrected dependency configuration: The 'clickhouse' extra is now properly included in the 'all' extras list in pyproject.toml
  • Improved parameter handling: The ClickHouse data sink implementation properly merges connection parameters with user-provided client kwargs
  • Proper development setup: Added clickhouse-connect>=0.8.18 to requirements-dev.txt for development environment consistency

The implementation integrates seamlessly with Daft's existing data sink architecture, using the write/finalize pattern for distributed execution. Users can now write DataFrames to ClickHouse using df.write_clickhouse(host="localhost", port=8123, table="my_table") and receive aggregated statistics about the write operation. The data sink handles Arrow table conversion, manages ClickHouse client connections appropriately for distributed environments, and provides proper resource cleanup.

Confidence score: 4/5

  • This PR is now much safer to merge after addressing the critical issues from the previous review
  • The implementation follows established patterns and includes proper error handling and resource management
  • Files still needing attention: daft/io/clickhouse/clickhouse_data_sink.py for potential parameter override security considerations

5 files reviewed, 2 comments


user: Optional[str] = None,
password: Optional[str] = None,
database: Optional[str] = None,
table: Optional[str] = None,
Contributor

logic: The table parameter should be required since the ClickHouseDataSink constructor raises ValueError if table is None/empty
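A sketch of the validation being referenced; the exact message is an assumption:

```python
if not table:
    raise ValueError("table is required to write to ClickHouse")
```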

Comment on lines 121 to 122
#clickhouse
clickhouse-connect>=0.8.18
Contributor

style: Consider adding a comment explaining why version >=0.8.18 is required, similar to other pinned dependencies in this file.

password: str | None = None,
database: str | None = None,
table: str | None = None,
client_kwargs: dict[str, Any] | None = None,
Contributor

Aren't host/port already part of client_kwargs? What is usually passed here?

Contributor Author

Other params, e.g. timeout and access_token.
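A hedged example of that usage; connect_timeout and send_receive_timeout are standard clickhouse-connect client options, and the rest of the call mirrors the earlier example:

```python
df.write_clickhouse(
    host="localhost",
    port=8123,
    table="my_table",
    client_kwargs={"connect_timeout": 10, "send_receive_timeout": 300},
)
```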

@huleilei
Contributor Author

@Jay-ju Can you help me review the code? Thanks.

@srilman
Contributor

srilman commented Aug 18, 2025

@huleilei Once you're ready for a review on our side, can you add one of us as a reviewer?

@huleilei
Contributor Author

huleilei commented Aug 19, 2025

@srilman @rchowell @universalmind303 @Jay-ju please review when convenient. Thanks.

Collaborator

@desmondcheongzx desmondcheongzx left a comment


This is great, thanks!

@desmondcheongzx desmondcheongzx enabled auto-merge (squash) August 19, 2025 22:05
auto-merge was automatically disabled August 20, 2025 04:02

Head branch was pushed to by a user without write access

@huleilei
Contributor Author

@desmondcheongzx hi, please review when you have a chance. Thanks.

@desmondcheongzx desmondcheongzx merged commit 23fd719 into Eventual-Inc:main Aug 20, 2025
52 checks passed
colin-ho pushed a commit that referenced this pull request Aug 20, 2025
## Changes Made

Add the ClickHouse data sink and use it to write a DataFrame to a
ClickHouse table
## Related Issues


## Checklist

- [ ] Documented in API Docs (if applicable)
- [ ] Documented in User Guide (if applicable)
- [ ] If adding a new documentation page, doc is added to
`docs/mkdocs.yml` navigation
- [ ] Documentation builds and is formatted properly (tag @/ccmao1130
for docs review)

---------

Co-authored-by: Desmond Cheong <[email protected]>