Skip to content

Commit

Permalink
Initial Polars Lazyframe Support (#775)
Browse files Browse the repository at this point in the history
This PR is to aid for support of Polars LazyFrames in Hamilton.

* Extended applicable types for Polars writers

The applicable types for PolarsCSVWriter, PolarsParquetWriter, and PolarsFeatherWriter have been extended to include pl.LazyFrame in addition to the existing pl.DataFrame. This change allows these writer classes to handle both eager and lazy data frames from the polars library.

* Updated PolarsLazyFrameResult and data writers

The PolarsLazyFrameResult class now uses the PolarsLazyFrameResult instead of the PolarsDataFrameResult. The logging statement in register_types() has been removed. DataSaver classes have been updated to handle both DATAFRAME_TYPE and pl.LazyFrame types, with a check added to collect data if it's a LazyFrame before saving. Tests have been updated and expanded to cover these changes, including checks for applicable types and correct handling of LazyFrames.

* Extended support for LazyFrame in Polars extensions

The applicable_types method in the PolarsSpreadsheetWriter class and corresponding test assertions have been updated to include pl.LazyFrame, along with the existing DATAFRAME_TYPE. This change extends the functionality of our Polars extensions to handle LazyFrames as well as DataFrames.

* Added Polars LazyFrame example

This update introduces a new example demonstrating the use of Polars LazyFrame. The changes include:
- Creation of two new Python scripts: one defining functions for loading data and calculating spend per signup, and another script to execute these functions.
- Addition of a README file explaining how to run the example, visualize execution, and detailing some caveats with Polars.
- Inclusion of a requirements.txt file specifying necessary dependencies.
- Addition of sample CSV data for testing purposes.

* Updated data loading method in tests

The test methods for PolarsScanParquetReader and PolarsScanFeatherReader have been updated. Instead of using pl.DataFrame to load data, they now use pl.LazyFrame. This change aligns with the applicable types for these readers.

Notes
I've also had to update the get_dataframe_metadata in utils.py to allow it to work with Lazyframes that don't have a row count. I abstracted all the lookups so that if others passed/failed in the future for support of other read/writers they would return what they can.

---------

Co-authored-by: Tom Barber <[email protected]>
  • Loading branch information
buggtb and Tom Barber authored Mar 28, 2024
1 parent 92ebea9 commit 39ce9e0
Show file tree
Hide file tree
Showing 13 changed files with 656 additions and 36 deletions.
37 changes: 37 additions & 0 deletions examples/polars/lazyframe/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Classic Hamilton Hello World

In this example we show you how to create a simple hello world dataflow that
creates a polars lazyframe as a result. It performs a series of transforms on the
input to create columns that appear in the output.

File organization:

* `my_functions.py` houses the logic that we want to compute.
Note (1) how the functions are named, and what input
parameters they require. That is how we create a DAG modeling the dataflow we want to happen.
* `my_script.py` houses how to get Hamilton to create the DAG, specifying that we want a polars dataframe and
exercise it with some inputs.

To run things:
```bash
> python my_script.py
```

# Visualizing Execution
Here is the graph of execution - which should look the same as the pandas example:

![polars](polars.png)

# Caveat with Polars
There is one major caveat with Polars to be aware of: THERE IS NO INDEX IN POLARS LIKE THERE IS WITH PANDAS.

What this means is that when you tell Hamilton to execute and return a polars dataframe if you are using the
[provided results builder](https://github.com/dagworks-inc/hamilton/blob/sf-hamilton-1.14.1/hamilton/plugins/h_polars.py#L8), i.e. `hamilton.plugins.h_polars.PolarsResultsBuilder`, then you will have to
ensure the row order matches the order you expect for all the outputs you request. E.g. if you do a filter, or a sort,
or a join, or a groupby, you will have to ensure that when you ask Hamilton to materialize an output that it's in the
order you expect.

If you have questions, or need help with this example,
join us on [slack](https://join.slack.com/t/hamilton-opensource/shared_invite/zt-1bjs72asx-wcUTgH7q7QX1igiQ5bbdcg), and we'll try to help!

Otherwise if you have ideas on how to better make Hamilton work with Polars, please open an issue or start a discussion!
15 changes: 15 additions & 0 deletions examples/polars/lazyframe/my_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import polars as pl

from hamilton.function_modifiers import load_from, value


@load_from.csv(file=value("./sample_data.csv"))
def raw_data(data: pl.LazyFrame) -> pl.LazyFrame:
return data


def spend_per_signup(raw_data: pl.LazyFrame) -> pl.LazyFrame:
"""Computes cost per signup in relation to spend."""
return raw_data.select("spend", "signups").with_columns(
[(pl.col("spend") / pl.col("signups")).alias("spend_per_signup")]
)
33 changes: 33 additions & 0 deletions examples/polars/lazyframe/my_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import logging
import sys

from hamilton import base, driver
from hamilton.plugins import h_polars_lazyframe

logging.basicConfig(stream=sys.stdout)

# Create a driver instance. If you want to run the compute in the final node you can also use
# h_polars.PolarsDataFrameResult() and you don't need to run collect at the end. Which you use
# probably depends on whether you want to use the LazyFrame in more nodes in another DAG before
# computing the result.
adapter = base.SimplePythonGraphAdapter(result_builder=h_polars_lazyframe.PolarsLazyFrameResult())
import my_functions # where our functions are defined

dr = driver.Driver({}, my_functions, adapter=adapter)
output_columns = [
"spend_per_signup",
]
# let's create the lazyframe!
df = dr.execute(output_columns)
# Here we just print the Lazyframe plan
print(df)

# Now we run the query
df = df.collect()

# And print the table.
print(df)

# To visualize do `pip install "sf-hamilton[visualization]"` if you want these to work
# dr.visualize_execution(output_columns, './polars', {"format": "png"})
# dr.display_all_functions('./my_full_dag.dot')
2 changes: 2 additions & 0 deletions examples/polars/lazyframe/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
polars
sf-hamilton
7 changes: 7 additions & 0 deletions examples/polars/lazyframe/sample_data.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
signups,spend
1,10
10,10
50,20
100,40
200,40
400,50
1 change: 1 addition & 0 deletions hamilton/function_modifiers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"pandas",
"plotly",
"polars",
"polars_lazyframe",
"pyspark_pandas",
"spark",
"dask",
Expand Down
29 changes: 21 additions & 8 deletions hamilton/io/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,27 @@ def get_dataframe_metadata(df: pd.DataFrame) -> Dict[str, Any]:
- the column names
- the data types
"""
return {
DATAFRAME_METADATA: {
"rows": len(df),
"columns": len(df.columns),
"column_names": list(df.columns),
"datatypes": [str(t) for t in list(df.dtypes)], # for serialization purposes
}
}
metadata = {}
try:
metadata["rows"] = len(df)
except TypeError:
metadata["rows"] = None

try:
metadata["columns"] = len(df.columns)
except (AttributeError, TypeError):
metadata["columns"] = None

try:
metadata["column_names"] = list(df.columns)
except (AttributeError, TypeError):
metadata["column_names"] = None

try:
metadata["datatypes"] = [str(t) for t in list(df.dtypes)]
except (AttributeError, TypeError):
metadata["datatypes"] = None
return {DATAFRAME_METADATA: metadata}


def get_file_and_dataframe_metadata(path: str, df: pd.DataFrame) -> Dict[str, Any]:
Expand Down
2 changes: 2 additions & 0 deletions hamilton/plugins/h_polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def build_result(
(value,) = outputs.values() # this works because it's length 1.
if isinstance(value, pl.DataFrame): # it's a dataframe
return value
if isinstance(value, pl.LazyFrame): # it's a lazyframe
return value.collect()
elif not isinstance(value, pl.Series): # it's a single scalar/object
key, value = outputs.popitem()
return pl.DataFrame({key: [value]})
Expand Down
46 changes: 46 additions & 0 deletions hamilton/plugins/h_polars_lazyframe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from typing import Any, Dict, Type, Union

import polars as pl

from hamilton import base


class PolarsLazyFrameResult(base.ResultMixin):
"""A ResultBuilder that produces a polars dataframe.
Use this when you want to create a polars dataframe from the outputs. Caveat: you need to ensure that the length
of the outputs is the same, otherwise you will get an error; mixed outputs aren't that well handled.
To use:
.. code-block:: python
from hamilton import base, driver
from hamilton.plugins import polars_extensions
polars_builder = polars_extensions.PolarsLazyFrameResult()
adapter = base.SimplePythonGraphAdapter(polars_builder)
dr = driver.Driver(config, *modules, adapter=adapter)
df = dr.execute([...], inputs=...) # returns polars dataframe
Note: this is just a first attempt at something for Polars. Think it should handle more? Come chat/open a PR!
"""

def build_result(
self, **outputs: Dict[str, Union[pl.Series, pl.LazyFrame, Any]]
) -> pl.LazyFrame:
"""This is the method that Hamilton will call to build the final result. It will pass in the results
of the requested outputs that you passed in to the execute() method.
Note: this function could do smarter things; looking for contributions here!
:param outputs: The results of the requested outputs.
:return: a polars DataFrame.
"""
if len(outputs) == 1:
(value,) = outputs.values() # this works because it's length 1.
if isinstance(value, pl.LazyFrame): # it's a lazyframe
return value
return pl.LazyFrame(outputs)

def output_type(self) -> Type:
return pl.LazyFrame
56 changes: 35 additions & 21 deletions hamilton/plugins/polars_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ def _get_loading_kwargs(self):
kwargs["row_count_name"] = self.row_count_name
if self.row_count_offset is not None:
kwargs["row_count_offset"] = self.row_count_offset
if self.sample_size is not None:
kwargs["sample_size"] = self.sample_size
if self.eol_char is not None:
kwargs["eol_char"] = self.eol_char
if self.raise_if_empty is not None:
Expand All @@ -176,6 +174,7 @@ def _get_loading_kwargs(self):

def load_data(self, type_: Type) -> Tuple[DATAFRAME_TYPE, Dict[str, Any]]:
df = pl.read_csv(self.file, **self._get_loading_kwargs())

metadata = utils.get_file_and_dataframe_metadata(self.file, df)
return df, metadata

Expand Down Expand Up @@ -206,7 +205,7 @@ class PolarsCSVWriter(DataSaver):

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [DATAFRAME_TYPE]
return [DATAFRAME_TYPE, pl.LazyFrame]

def _get_saving_kwargs(self):
kwargs = {}
Expand Down Expand Up @@ -236,15 +235,12 @@ def _get_saving_kwargs(self):
kwargs["quote_style"] = self.quote_style
return kwargs

def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
if isinstance(data, pl.LazyFrame):
data = data.collect()
data.write_csv(self.file, **self._get_saving_kwargs())
return utils.get_file_and_dataframe_metadata(self.file, data)

def load_data(self, type_: Type) -> Tuple[DATAFRAME_TYPE, Dict[str, Any]]:
df = pl.read_csv(self.file, **self._get_loading_kwargs())
metadata = utils.get_file_and_dataframe_metadata(self.file, df)
return df, metadata

@classmethod
def name(cls) -> str:
return "csv"
Expand Down Expand Up @@ -330,7 +326,7 @@ class PolarsParquetWriter(DataSaver):

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [DATAFRAME_TYPE]
return [DATAFRAME_TYPE, pl.LazyFrame]

def _get_saving_kwargs(self):
kwargs = {}
Expand All @@ -348,8 +344,12 @@ def _get_saving_kwargs(self):
kwargs["pyarrow_options"] = self.pyarrow_options
return kwargs

def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
if isinstance(data, pl.LazyFrame):
data = data.collect()

data.write_parquet(self.file, **self._get_saving_kwargs())

return utils.get_file_and_dataframe_metadata(self.file, data)

@classmethod
Expand Down Expand Up @@ -422,15 +422,17 @@ class PolarsFeatherWriter(DataSaver):

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [DATAFRAME_TYPE]
return [DATAFRAME_TYPE, pl.LazyFrame]

def _get_saving_kwargs(self):
kwargs = {}
if self.compression is not None:
kwargs["compression"] = self.compression
return kwargs

def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
if isinstance(data, pl.LazyFrame):
data = data.collect()
data.write_ipc(self.file, **self._get_saving_kwargs())
return utils.get_file_and_dataframe_metadata(self.file, data)

Expand Down Expand Up @@ -484,15 +486,18 @@ class PolarsAvroWriter(DataSaver):

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [DATAFRAME_TYPE]
return [DATAFRAME_TYPE, pl.LazyFrame]

def _get_saving_kwargs(self):
kwargs = {}
if self.compression is not None:
kwargs["compression"] = self.compression
return kwargs

def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
if isinstance(data, pl.LazyFrame):
data = data.collect()

data.write_avro(self.file, **self._get_saving_kwargs())
return utils.get_file_and_dataframe_metadata(self.file, data)

Expand Down Expand Up @@ -547,7 +552,7 @@ class PolarsJSONWriter(DataSaver):

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [DATAFRAME_TYPE]
return [DATAFRAME_TYPE, pl.LazyFrame]

def _get_saving_kwargs(self):
kwargs = {}
Expand All @@ -557,7 +562,10 @@ def _get_saving_kwargs(self):
kwargs["row_oriented"] = self.row_oriented
return kwargs

def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
if isinstance(data, pl.LazyFrame):
data = data.collect()

data.write_json(self.file, **self._get_saving_kwargs())
return utils.get_file_and_dataframe_metadata(self.file, data)

Expand Down Expand Up @@ -665,7 +673,7 @@ class PolarsSpreadsheetWriter(DataSaver):

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [DATAFRAME_TYPE]
return [DATAFRAME_TYPE, pl.LazyFrame]

def _get_saving_kwargs(self):
kwargs = {}
Expand Down Expand Up @@ -713,7 +721,10 @@ def _get_saving_kwargs(self):
kwargs["freeze_panes"] = self.freeze_panes
return kwargs

def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
if isinstance(data, pl.LazyFrame):
data = data.collect()

data.write_excel(self.workbook, self.worksheet, **self._get_saving_kwargs())
return utils.get_file_and_dataframe_metadata(self.workbook, data)

Expand Down Expand Up @@ -782,7 +793,7 @@ class PolarsDatabaseWriter(DataSaver):

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [DATAFRAME_TYPE]
return [DATAFRAME_TYPE, pl.LazyFrame]

def _get_saving_kwargs(self):
kwargs = {}
Expand All @@ -792,7 +803,10 @@ def _get_saving_kwargs(self):
kwargs["engine"] = self.engine
return kwargs

def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
if isinstance(data, pl.LazyFrame):
data = data.collect()

data.write_database(
table_name=self.table_name,
connection=self.connection,
Expand Down
Loading

0 comments on commit 39ce9e0

Please sign in to comment.