feat(rust,python): Add GPU support to the LazyFrame profiler #20693
Conversation
Codecov Report
Attention: Patch coverage is …

Additional details and impacted files:

@@            Coverage Diff             @@
##             main    #20693      +/-  ##
==========================================
- Coverage   79.99%    79.98%   -0.02%
==========================================
  Files        1598      1598
  Lines      229199    229285      +86
  Branches     2620      2623       +3
==========================================
+ Hits       183352    183387      +35
- Misses      45248     45297      +49
- Partials      599       601       +2
) -> PyResult<(PyDataFrame, PyDataFrame)> {
    // if we don't allow threads and we have udfs trying to acquire the gil from different
    // threads we deadlock.
    let (df, time_df) = py.allow_threads(|| {
We should use `enter_polars`, which handles the `allow_threads`.
@@ -1706,6 +1708,30 @@ def profile(
│ group_by_partitioned(a) ┆ 5   ┆ 470  │
│ sort(a)                 ┆ 475 ┆ 1964 │
└─────────────────────────┴─────┴──────┘)
>>> lf.group_by("a", maintain_order=True).agg(pl.all().sum()).sort("a").profile( |
Nit: Can we make this query multiline? Something like:

(
    lf.group_by("a", maintain_order=True)
    .agg(pl.all().sum())
    .sort("a")
    .profile()
)
let ldf = self.ldf.clone();
if let Some(lambda) = lambda_post_opt {
    ldf._profile_post_opt(|root, lp_arena, expr_arena| {
        Python::with_gil(|py| {
I believe this code is exactly the same as in collect. Can we put it in a function?
@@ -14,6 +15,39 @@ pub(crate) struct PythonScanExec {
    pub(crate) predicate_serialized: Option<Vec<u8>>,
}

#[pyclass]
pub struct PyNodeTimer {
I don't think it is needed to leak those internals. See comment: crates/polars-mem-engine/src/executors/scan/python_scan.rs
) {
    let generator_init = if matches!(self.options.python_source, PythonScanSource::Cuda) {
        let py_node_timer = PyNodeTimer::new(state.node_timer.clone());
        let args = (
Instead of leaking our timer nodes, which I really don't like as CuDF would then depend on internals other than our DSL, the Python callable could accept an argument `profile: bool` and, if set, return a list of timing tuples: `[(operation: str, start: time, end: time)]`. We can then unpack those tuples here and update the `NodeTimer` accordingly.
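A minimal Python sketch of what such a callable protocol might look like. The function name, argument list, and the exact tuple shape are assumptions based on this suggestion, not an actual cuDF or polars API; a list of dicts stands in for the real DataFrame:

```python
import time

def gpu_scan_callable(with_columns, predicate, n_rows, profile: bool):
    """Hypothetical scan callable. When profile=True it also returns a
    list of (operation, start_ns, end_ns) timing tuples, as suggested."""
    timings = []

    start = time.monotonic_ns()
    # Stand-in for the actual GPU scan work.
    rows = [{"a": i} for i in range(n_rows or 3)]
    end = time.monotonic_ns()
    timings.append(("scan", start, end))

    if profile:
        return rows, timings
    return rows
```

On the Rust side, the engine would then unpack the second element of the returned tuple and feed each `(operation, start, end)` entry into the timer.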
Thanks for the review @ritchie46. There might be a casting problem with this approach. Is there a way to convert a `u64` to a `std::time::Instant`?
Instead of storing the `Instant`s, we could convert them to `u64` immediately in the `store` method.
Once I time the node in cuDF, I'll get two `u64` values. The `store` method of `NodeTimer` takes `Instant`s. I don't think there's a way to convert a `u64` to an `Instant`. Is that right?
We can bypass the `store` method for the GPU, and store `u64`s instead of `Instant`s.
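The real `NodeTimer` lives in Rust, but the idea of two storage paths can be sketched in Python. Everything here is illustrative: the class and method names mirror the discussion, not the actual crate internals:

```python
import time

class SketchNodeTimer:
    """Illustrative timer with two storage paths: `store` normalizes
    CPU-side timestamps against the query start, while `store_raw`
    accepts pre-measured integer nanoseconds (e.g. from the GPU engine)
    without touching any Instant-like value."""

    def __init__(self):
        # CPU path epoch; the raw path deliberately bypasses it.
        self.query_start_ns = time.monotonic_ns()
        self.data = []  # list of (name, start_ns, end_ns)

    def store(self, name, start_ns, end_ns):
        # CPU path: same monotonic clock, so shift to the query epoch.
        self.data.append(
            (name, start_ns - self.query_start_ns, end_ns - self.query_start_ns)
        )

    def store_raw(self, name, start_ns, end_ns):
        # GPU path: keep the raw integers; any normalization happens later.
        self.data.append((name, start_ns, end_ns))
```

The design choice being discussed is exactly this split: the raw path avoids ever needing to build an `Instant` from a `u64`, which `std::time` does not allow portably.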
I'm giving your suggestion a try @ritchie46.

- How would I unpack the return call? I have something like:

let args = (
    python_scan_function,
    with_columns.map(|x| x.into_iter().map(|x| x.to_string()).collect::<Vec<_>>()),
    predicate,
    n_rows,
    true,
);
// returns tuple[DataFrame, tuple[str, int, int]] in Python
callable.call1(args).map_err(to_compute_err)

- How would I bypass the `store` method, which relies upon `std::time::Instant`s?
You can use pyo3 to access the items of a tuple and then `extract` the proper types (see later in this file how we get the `DataFrame`).

> How would I bypass the store method which relies upon std::time::Instants?

Make a `store_raw` method.
Update: I'm able to extract the items from the `PyTuple` returned from the Python callback. I also added `store_raw` to `NodeTimer`. Here's what profiling with the GPU looks like so far:
In [1]: import polars as pl

In [2]: lf = pl.LazyFrame(
   ...:     {
   ...:         "a": ["a", "b", "a", "b", "b", "c"],
   ...:         "b": [1, 2, 3, 4, 5, 6],
   ...:         "c": [6, 5, 4, 3, 2, 1],
   ...:     }
   ...: )
   ...: lf.group_by("a", maintain_order=True).agg(pl.all().sum()).sort(
   ...:     "a"
   ...: ).profile(engine="gpu")
Out[2]:
(shape: (3, 3)
┌─────┬─────┬─────┐
│ a ┆ b ┆ c │
│ --- ┆ --- ┆ --- │
│ str ┆ i64 ┆ i64 │
╞═════╪═════╪═════╡
│ a ┆ 4 ┆ 10 │
│ b ┆ 11 ┆ 10 │
│ c ┆ 6 ┆ 1 │
└─────┴─────┴─────┘,
shape: (2, 3)
┌──────────────┬──────────────────┬──────────────────┐
│ node ┆ start ┆ end │
│ --- ┆ --- ┆ --- │
│ str ┆ u64 ┆ u64 │
╞══════════════╪══════════════════╪══════════════════╡
│ optimization ┆ 0 ┆ 9591759057601683 │
│ sort ┆ 9591759057601683 ┆ 9591759094456985 │
└──────────────┴──────────────────┴──────────────────┘)
*The times are in nanoseconds, not microseconds.
There are a couple of problems with the result that I'm encountering.

- The `NodeTimer` keeps a `query_start` attribute (a `std::time::Instant`) which I cannot use to adjust all of the raw `u64` times. Do you have any ideas on capturing the start time and having it available in `NodeTimer::finalize`?
- If you look at the output above, only the final `sort` operation is being captured. There's probably a copy happening somewhere, so that I'm not updating the same `NodeTimer` object each time I call `store_raw`. I'm surprised, because the execution state is mutable. Do I need some sort of global `NodeTimer` to resolve this?
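The first problem, adjusting raw `u64` times without a usable `query_start`, can be worked around by normalizing against the earliest raw timestamp instead. A small sketch under that assumption (all timestamps from one monotonic clock; the earliest start stands in for the query-start `Instant` that `finalize` would normally use):

```python
def normalize_timings(raw):
    """Shift raw (name, start_ns, end_ns) tuples so the profile starts at 0.

    Hypothetical helper: uses the earliest start as the epoch, rather
    than a query_start Instant, so it works on raw integer timestamps.
    """
    if not raw:
        return []
    t0 = min(start for _, start, _ in raw)
    return [(name, start - t0, end - t0) for name, start, end in raw]
```

Applied to the output above, the huge absolute values (e.g. 9591759057601683) would collapse to offsets from the first recorded start.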
Tried an alternative approach in #21534.
Closing in favor of #21534. Thanks for your help on this @ritchie46!
Closes #20039