diff --git a/daft/expressions/expressions.py b/daft/expressions/expressions.py index 9f5085ac3a..7ff129232a 100644 --- a/daft/expressions/expressions.py +++ b/daft/expressions/expressions.py @@ -441,7 +441,9 @@ def __lshift__(self, other: Expression) -> Expression: def __rshift__(self, other: Expression) -> Expression: """Shifts the bits of an integer expression to the right (``e1 >> e2``) + .. NOTE:: + For unsigned integers, this expression perform a logical right shift. For signed integers, this expression perform an arithmetic right shift. @@ -676,6 +678,7 @@ def shift_left(self, other: Expression) -> Expression: def shift_right(self, other: Expression) -> Expression: """Shifts the bits of an integer expression to the right (``expr >> other``) + .. NOTE:: For unsigned integers, this expression perform a logical right shift. For signed integers, this expression perform an arithmetic right shift. diff --git a/docs/source/api_docs/dataframe.rst b/docs/source/api_docs/dataframe.rst index 38a623798c..077590c0c0 100644 --- a/docs/source/api_docs/dataframe.rst +++ b/docs/source/api_docs/dataframe.rst @@ -73,6 +73,7 @@ Reordering DataFrame.sort DataFrame.repartition + DataFrame.into_partitions Combining ********* diff --git a/docs/source/migration_guides/coming_from_dask.rst b/docs/source/migration_guides/coming_from_dask.rst index f1df5b8a21..f8c4551820 100644 --- a/docs/source/migration_guides/coming_from_dask.rst +++ b/docs/source/migration_guides/coming_from_dask.rst @@ -32,20 +32,20 @@ Dask aims for as much feature-parity with pandas as possible, including maintain Daft drops the need for an Index to make queries more readable and consistent. How you write a query should not change because of the state of an index or a reset_index call. In our opinion, eliminating the index makes things simpler, more explicit, more readable and therefore less error-prone. Daft achieves this by using the [Expressions API](/user_guide/basic_concepts/expressions.rst). -In Dask you would index your DataFrame to return row `b` as follows: +In Dask you would index your DataFrame to return row ``b`` as follows: -`ddf.loc[[“b”]]` +``ddf.loc[[“b”]]`` -In Daft, you would accomplish the same by using a `col` Expression to refer to the column that contains `b`: +In Daft, you would accomplish the same by using a ``col`` Expression to refer to the column that contains ``b``: -`df.where(daft.col(“alpha”)==”b”)` +``df.where(daft.col(“alpha”)==”b”)`` More about Expressions in the sections below. Daft does not try to copy the pandas syntax ------------------------------------------- -Dask is built as a parallelizable version of pandas and Dask partitions are in fact pandas DataFrames. When you call a Dask function you are often applying a pandas function on each partition. This makes Dask relatively easy to learn for people familiar with pandas, but it also causes complications when pandas logic (built for sequential processing) does not translate well to a distributed context. When reading the documentation, Dask users will often encounter this phrase `“This docstring was copied from pandas.core… Some inconsistencies with the Dask version may exist.”` It is often unclear what these inconsistencies are and how they might affect performance. +Dask is built as a parallelizable version of pandas and Dask partitions are in fact pandas DataFrames. When you call a Dask function you are often applying a pandas function on each partition. This makes Dask relatively easy to learn for people familiar with pandas, but it also causes complications when pandas logic (built for sequential processing) does not translate well to a distributed context. When reading the documentation, Dask users will often encounter this phrase ``“This docstring was copied from pandas.core… Some inconsistencies with the Dask version may exist.”`` It is often unclear what these inconsistencies are and how they might affect performance. Daft does not try to copy the pandas syntax. Instead, we believe that efficiency is best achieved by defining logic specifically for the unique challenges of distributed computing. This means that we trade a slightly higher learning curve for pandas users against improved performance and more clarity for the developer experience. @@ -68,13 +68,14 @@ Dask currently does not support full-featured query optimization. Daft uses Expressions and UDFs to perform computations in parallel ------------------------------------------------------------------ -Dask provides a `map_partitions` method to map computations over the partitions in your DataFrame. Since Dask partitions are pandas DataFrames, you can pass pandas functions to `map_partitions`. You can also map arbitrary Python functions over Dask partitions using `map_partitions`. +Dask provides a ``map_partitions`` method to map computations over the partitions in your DataFrame. Since Dask partitions are pandas DataFrames, you can pass pandas functions to ``map_partitions``. You can also map arbitrary Python functions over Dask partitions using `map_partitions`. For example: .. code:: python + def my_function(**kwargs): - return … + return ... res = ddf.map_partitions(my_function, **kwargs) @@ -84,7 +85,7 @@ Daft implements two APIs for mapping computations over the data in your DataFram .. code:: python # Add 1 to each element in column "A" - df = df.with_column("A_add_one", daft.col(“A”) + 1) + df = df.with_column("A_add_one", daft.col("A") + 1) You can use User-Defined Functions (UDFs) to run computations over multiple rows or columns: @@ -94,12 +95,11 @@ You can use User-Defined Functions (UDFs) to run computations over multiple rows # apply a custom function “crop_image” to the image column @daft.udf(...) def crop_image(**kwargs): - … - return … + return ... df = df.with_column( "cropped", - crop_image(daft.col(“image”), **kwargs), + crop_image(daft.col("image"), **kwargs), ) diff --git a/docs/source/user_guide/integrations/delta_lake.rst b/docs/source/user_guide/integrations/delta_lake.rst index 777d7c4d51..f27b42547d 100644 --- a/docs/source/user_guide/integrations/delta_lake.rst +++ b/docs/source/user_guide/integrations/delta_lake.rst @@ -5,7 +5,7 @@ Delta Lake Daft currently supports: -1. **Parallel + Distributed Reads:** Daft parallelizes Delta Lake table reads over all cores of your machine, if using the default multithreading runner, or all cores + machines of your Ray cluster, if using the :ref:`distributed Ray runner `. +1. **Parallel + Distributed Reads:** Daft parallelizes Delta Lake table reads over all cores of your machine, if using the default multithreading runner, or all cores + machines of your Ray cluster, if using the :ref:`distributed Ray runner `. 2. **Skipping Filtered Data:** Daft ensures that only data that matches your :meth:`df.where(...) ` filter will be read, often skipping entire files/partitions. 3. **Multi-cloud Support:** Daft supports reading Delta Lake tables from AWS S3, Azure Blob Store, and GCS, as well as local files. diff --git a/docs/source/user_guide/integrations/hudi.rst b/docs/source/user_guide/integrations/hudi.rst index 433410eec2..59f903863e 100644 --- a/docs/source/user_guide/integrations/hudi.rst +++ b/docs/source/user_guide/integrations/hudi.rst @@ -5,7 +5,7 @@ Apache Hudi Daft currently supports: -1. **Parallel + Distributed Reads:** Daft parallelizes Hudi table reads over all cores of your machine, if using the default multithreading runner, or all cores + machines of your Ray cluster, if using the :ref:`distributed Ray runner `. +1. **Parallel + Distributed Reads:** Daft parallelizes Hudi table reads over all cores of your machine, if using the default multithreading runner, or all cores + machines of your Ray cluster, if using the :ref:`distributed Ray runner `. 2. **Skipping Filtered Data:** Daft ensures that only data that matches your :meth:`df.where(...) ` filter will be read, often skipping entire files/partitions. 3. **Multi-cloud Support:** Daft supports reading Hudi tables from AWS S3, Azure Blob Store, and GCS, as well as local files. diff --git a/docs/source/user_guide/integrations/huggingface.rst b/docs/source/user_guide/integrations/huggingface.rst index 547f5ed856..3e3d22a553 100644 --- a/docs/source/user_guide/integrations/huggingface.rst +++ b/docs/source/user_guide/integrations/huggingface.rst @@ -1,5 +1,5 @@ Huggingface Datasets -=========== +==================== Daft is able to read datasets directly from Huggingface via the ``hf://`` protocol. diff --git a/docs/source/user_guide/integrations/microsoft-azure.rst b/docs/source/user_guide/integrations/microsoft-azure.rst index 20939e6db8..17385f825f 100644 --- a/docs/source/user_guide/integrations/microsoft-azure.rst +++ b/docs/source/user_guide/integrations/microsoft-azure.rst @@ -64,7 +64,7 @@ pass a different :class:`daft.io.AzureConfig` per function call if you wish! df2 = daft.read_csv("az://my_container/my_other_path/**/*", io_config=io_config) Connect to Microsoft Fabric/OneLake -**************************** +*********************************** If you are connecting to storage in OneLake or another Microsoft Fabric service, set the `use_fabric_endpoint` parameter to ``True`` in the :class:`daft.io.AzureConfig` object. diff --git a/docs/source/user_guide/integrations/sql.rst b/docs/source/user_guide/integrations/sql.rst index 6a679a1ae6..485c5e8d30 100644 --- a/docs/source/user_guide/integrations/sql.rst +++ b/docs/source/user_guide/integrations/sql.rst @@ -6,7 +6,7 @@ You can read the results of SQL queries from databases, data warehouses, and que Daft currently supports: 1. **20+ SQL Dialects:** Daft supports over 20 databases, data warehouses, and query engines by using `SQLGlot `_ to convert SQL queries across dialects. See the full list of supported dialects `here `__. -2. **Parallel + Distributed Reads:** Daft parallelizes SQL reads by using all local machine cores with its default multithreading runner, or all cores across multiple machines if using the :ref:`distributed Ray runner `. +2. **Parallel + Distributed Reads:** Daft parallelizes SQL reads by using all local machine cores with its default multithreading runner, or all cores across multiple machines if using the :ref:`distributed Ray runner `. 3. **Skipping Filtered Data:** Daft ensures that only data that matches your :meth:`df.select(...) `, :meth:`df.limit(...) `, and :meth:`df.where(...) ` expressions will be read, often skipping entire partitions/columns. Installing Daft with SQL Support @@ -77,7 +77,7 @@ You can also directly provide a SQL alchemy connection via a **connection factor Parallel + Distributed Reads **************************** -For large datasets, Daft can parallelize SQL reads by using all local machine cores with its default multithreading runner, or all cores across multiple machines if using the :ref:`distributed Ray runner `. +For large datasets, Daft can parallelize SQL reads by using all local machine cores with its default multithreading runner, or all cores across multiple machines if using the :ref:`distributed Ray runner `. Supply the :meth:`daft.read_sql` function with a **partition column** and optionally the **number of partitions** to enable parallel reads. diff --git a/docs/source/user_guide/poweruser/distributed-computing.rst b/docs/source/user_guide/poweruser/distributed-computing.rst index f3b93f3a1f..4950007823 100644 --- a/docs/source/user_guide/poweruser/distributed-computing.rst +++ b/docs/source/user_guide/poweruser/distributed-computing.rst @@ -1,3 +1,5 @@ +.. _distributed_computing: + Distributed Computing ===================== diff --git a/docs/source/user_guide/poweruser/partitioning.rst b/docs/source/user_guide/poweruser/partitioning.rst index b76ddd54a5..aaf0114c4a 100644 --- a/docs/source/user_guide/poweruser/partitioning.rst +++ b/docs/source/user_guide/poweruser/partitioning.rst @@ -1,6 +1,116 @@ Partitioning ============ -This guide is a Work-In-Progress! +Daft is a **distributed** dataframe. This means internally, data is represented as partitions which are then spread out across your system -Please comment on this `Github issue `_ if you wish to expedite the filling out of this guide. +Why do we need partitions? +-------------------------- + +When running in a distributed settings (a cluster of machines), Daft spreads your dataframe's data across these machines. This means that your +workload is able to efficiently utilize all the resources in your cluster because each machine is able to work on its assigned partition(s) independently. + +Additionally, certain global operations in a distributed setting requires data to be partitioned in a specific way for the operation to be correct, because +all the data matching a certain criteria needs to be on the same machine and in the same partition. For example, in a groupby-aggregation Daft needs to bring +together all the data for a given key into the same partition before it can perform a definitive local groupby-aggregation which is then globally correct. +Daft refers to this as a "clustering specification", and you are able to see this in the plans that it constructs as well. + +.. NOTE:: + When running locally on just a single machine, Daft is currently still using partitioning as well. This is still useful for + controlling parallelism and how much data is being materialized at a time. + + However, Daft's new experimental execution engine will remove the concept of partitioning entirely for local execution. + You may enable it with ``DAFT_ENABLE_NATIVE_EXECUTOR=1``. Instead of using partitioning to control parallelism, + this new execution engine performs a streaming-based execution on small "morsels" of data, which provides much + more stable memory utilization while improving the user experience with not having to worry about partitioning. + +This user guide helps you think about how to correctly partition your data to improve performance as well as memory stability in Daft. + +General rule of thumb: + +1. **Have Enough Partitions**: our general recommendation for high throughput and maximal resource utilization is to have *at least* ``2 x TOTAL_NUM_CPUS`` partitions, which allows Daft to fully saturate your CPUs. +2. **More Partitions**: if you are observing memory issues (excessive spilling or out-of-memory (OOM) issues) then you may choose to increase the number of partitions. This increases the amount of overhead in your system, but improves overall memory stability (since each partition will be smaller). +3. **Fewer Partitions**: if you are observing a large amount of overhead (e.g. if you observe that shuffle operations such as joins and sorts are taking too much time), then you may choose to decrease the number of partitions. This decreases the amount of overhead in the system, at the cost of using more memory (since each partition will be larger). + +.. seealso:: + :doc:`./memory` - a guide for dealing with memory issues when using Daft + +How is my data partitioned? +--------------------------- + +Daft will automatically use certain heuristics to determine the number of partitions for you when you create a DataFrame. When reading data from files (e.g. Parquet, CSV or JSON), Daft will group small files/split large files appropriately +into nicely-sized partitions based on their estimated in-memory data sizes. + +To interrogate the partitioning of your current DataFrame, you may use the :meth:`df.explain(show_all=True) ` method. Here is an example output from a simple +``df = daft.read_parquet(...)`` call on a fairly large number of Parquet files. + +.. code::python + + import daft + + df = daft.read_parquet("s3://bucket/path_to_100_parquet_files/**") + df.explain(show_all=True) + +.. code:: + + == Unoptimized Logical Plan == + + * GlobScanOperator + | Glob paths = [s3://bucket/path_to_100_parquet_files/**] + | ... + + + ... + + + == Physical Plan == + + * TabularScan: + | Num Scan Tasks = 3 + | Estimated Scan Bytes = 72000000 + | Clustering spec = { Num partitions = 3 } + | ... + +In the above example, the call to ``df.read_parquet`` read 100 Parquet files, but the Physical Plan indicates that Daft will only create 3 partitions. This is because these files are quite small (in this example, totalling about 72MB of data) and Daft recognizes that it should be able to read them as just 3 partitions, each with about 33 files each! + +How can I change the way my data is partitioned? +------------------------------------------------ + +You can change the way your data is partitioned by leveraging certain DataFrame methods: + +1. :meth:`daft.DataFrame.repartition`: repartitions your data into `N` partitions by performing a hash-bucketing that ensure that all data with the same values for the specified columns ends up in the same partition. Expensive, requires data movement between partitions and machines. +2. :meth:`daft.DataFrame.into_partitions`: splits or coalesces adjacent partitions to meet the specified target number of total partitions. This is less expensive than a call to ``df.repartition`` because it does not require shuffling or moving data between partitions. +3. Many global dataframe operations such as :meth:`daft.DataFrame.join`, :meth:`daft.DataFrame.sort` and :meth:`daft.GroupedDataframe.agg` will change the partitioning of your data. This is because they require shuffling data between partitions to be globally correct. + +Note that many of these methods will change both the *number of partitions* as well as the *clustering specification* of the new partitioning. For example, when calling ``df.repartition(8, col("x"))``, the resultant dataframe will now have 8 partitions in total with the additional guarantee that all rows with the same value of ``col("x")`` are in the same partition! This is called "hash partitioning". + +.. code::python + + df = df.repartition(8, daft.col("x")) + df.explain(show_all=True) + +.. code:: + + == Unoptimized Logical Plan == + + * Repartition: Scheme = Hash + | Num partitions = Some(8) + | By = col(x) + | + * GlobScanOperator + | Glob paths = [s3://bucket/path_to_1000_parquet_files/**] + | ... + + ... + + == Physical Plan == + + * ReduceMerge + | + * FanoutByHash: 8 + | Partition by = col(url) + | + * TabularScan: + | Num Scan Tasks = 3 + | Estimated Scan Bytes = 72000000 + | Clustering spec = { Num partitions = 3 } + | ...