
Conversation

@cloud-fan
Contributor

What changes were proposed in this pull request?

This PR introduces a new API: the Python Dataset. Conceptually it is a combination of the Python DataFrame and the Python RDD, supporting both typed operations (e.g. map, flatMap, filter) and untyped operations (e.g. select, sort). This is a simpler version of #11117, without the aggregation part.

There are at least three ways to add the Dataset API to Python:

  • Extending DataFrame to support typed operations and aliasing it as Dataset
# conceptually there is only DataFrame, but users can call it Dataset if they like
df = sqlContext.createDataFrame(...)
df2 = df.map(...)
df3 = df2.select(...)
df4 = df2.applySchema(...)
  • Renaming DataFrame to Dataset and keeping DataFrame as an alias of Dataset for compatibility (so conceptually we only have Dataset). This also requires renaming a lot of related things such as createDataFrame, some documentation, etc.
# conceptually there is only Dataset, but previous users can still use DataFrame
# for compatibility
ds = sqlContext.createDataset(...)
ds2 = ds.map(...)
ds3 = ds2.select(...)
ds4 = ds2.applySchema(...)
  • Adding typed operations to DataFrame and making Dataset extend DataFrame. Typed operations always return a Dataset, structured operations always return a DataFrame, and Dataset has an extra API called applySchema to turn it into a DataFrame.
# We have both DataFrame and Dataset in concept, while DataFrame always has row as
# element, Dataset always has custom object as element
df = sqlContext.createDataFrame(...)
ds = df.map(...)
df2 = ds.select(...)
df3 = ds.applySchema(...)

This PR chooses the first approach.
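
To make the chosen approach more concrete, here is a hedged usage sketch. The sample data, the element type produced by map, and the applySchema signature (taking a StructType) are assumptions for illustration and may not match the PR exactly.

# Hedged sketch only: DataFrame gains typed operations, Dataset is an alias.
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

df = sqlContext.createDataFrame([(1, "a"), (2, "b")], ["id", "name"])

# typed operation: each element becomes an arbitrary Python object (here a tuple)
ds = df.map(lambda row: (row.id * 2, row.name.upper()))

# assumed signature: applySchema re-attaches a schema so untyped operations work again
schema = StructType([StructField("doubled", IntegerType()),
                     StructField("upper_name", StringType())])
df2 = ds.applySchema(schema)
df2.select("doubled").show()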

How was this patch tested?

New tests are added in pyspark/sql/tests.py.

Contributor Author

I'm quite confused about where to put the Python plans.

For the Scala ones, we have an object.scala under the org.apache.spark.sql.catalyst.plans.logical package that contains all encoder-related logical plans, and an objects.scala under the org.apache.spark.sql.execution package that contains all encoder-related physical plans.

For the Python ones, we currently put all of them under the org.apache.spark.sql.execution.python package, including logical plans, physical plans, rules, etc.

cc @rxin

Contributor

org.apache.spark.sql.execution.python is probably OK, since there is no concept of Python in Catalyst itself.

@SparkQA

SparkQA commented Feb 24, 2016

Test build #51878 has finished for PR 11347 at commit a073f83.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 24, 2016

Test build #51880 has finished for PR 11347 at commit f29ed29.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Dataset(object):
    • class PipelinedDataset(Dataset):
    • case class PythonMapPartitions(
    • case class PythonMapPartitions(

@SparkQA

SparkQA commented Feb 25, 2016

Test build #51924 has finished for PR 11347 at commit 32a04de.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Feb 25, 2016

Test build #51925 has finished for PR 11347 at commit e549d48.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 25, 2016

Test build #51926 has finished for PR 11347 at commit e549d48.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 25, 2016

Test build #51975 has finished for PR 11347 at commit 22ef406.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 26, 2016

Test build #52009 has finished for PR 11347 at commit effac22.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan cloud-fan changed the title [SPARK-13233][SQL][WIP] Python Dataset (basic version) [SPARK-13233][SQL] Python Dataset (basic version) Feb 26, 2016
@SparkQA

SparkQA commented Feb 26, 2016

Test build #52018 has finished for PR 11347 at commit 60ea3d3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

cc @yhuai

@rxin
Contributor

rxin commented Feb 26, 2016

cc @davies too

Contributor

Is there a test for backward compatibility?

@rxin
Contributor

rxin commented Feb 26, 2016

@cloud-fan can we break this into multiple patches? I find the size a bit hard to review.

Also, maybe let's not do the renaming for now. It might make more sense in Python to have DataFrame be the main class and Dataset just be an alias (there is no practical difference in Python), since Python users are more familiar with data frames.
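
A minimal sketch of the alias idea, assuming nothing more than a plain name alias (an illustration, not necessarily how the PR wires it up):

# Hypothetical: expose Dataset as a bare alias of DataFrame, so the two names
# refer to the same class and behave identically.
from pyspark.sql import DataFrame

Dataset = DataFrame

assert Dataset is DataFrame  # no practical difference in Python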

@rxin
Contributor

rxin commented Feb 26, 2016

We should also add tests for the compatibility methods.

@SparkQA

SparkQA commented Feb 26, 2016

Test build #52039 has finished for PR 11347 at commit c178beb.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 26, 2016

Test build #52041 has finished for PR 11347 at commit 7da3ffc.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 26, 2016

Test build #52052 has finished for PR 11347 at commit adb2aa9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Feb 26, 2016

@cloud-fan Could you update the description of the PR to say which approach is taken in this PR? Thanks!

# If the underlying Java DataFrame's output is pickled, the query engine does
# not know the real schema of the data and just keeps one pickled binary per
# custom object (no batching), so we need to use a non-batched serializer here.
deserializer = PickleSerializer()
Contributor

The overhead of PickleSerializer is pretty high: it serializes the class for each row. Could you run a benchmark to see the difference between non-batched and batched serialization (both size and CPU time)?

@davies
Contributor

davies commented Feb 26, 2016

In Scala, it's clear that DataFrame is Dataset[Row]; some functions work with DataFrame and some don't, and the compiler can check the types.

But in Python it's confusing to me: sometimes the record is a Row object, sometimes it is an arbitrary object (for example, an int). Especially when we create a new DataFrame, for example with range() or text(), will these return a DataFrame of Row or a DataFrame of int/string?

Before this PR, it was clear that a Python DataFrame always holds Rows with a known schema, while df.rdd or df.map returns an RDD, which can hold arbitrary objects. Would it make more sense for Dataset to replace RDD, rather than DataFrame?

For example:

df.rdd returns an RDD
df.ds returns a Dataset
df.map() return a Dataset

@SparkQA

SparkQA commented Feb 29, 2016

Test build #52168 has finished for PR 11347 at commit 9beffc6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

Hi @davies
I did a simple benchmark of the pickle serializer with 1 million (int, string) rows. The execution time only covers serializing the results on the Python side and sending the bytes to the JVM side. The result: the batched serializer is about 2.67 times faster and the serialized size is about 30% smaller.

I think this is a problem. We should find another way to overcome the issue that the binary data on the JVM side has the wrong row count, so that we can still use the batched serializer.
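
For reference, a rough sketch of the kind of micro-benchmark discussed here, assuming pyspark.serializers; it is not the actual benchmark from this thread and only measures Python-side serialization into an in-memory buffer.

import io
import time

from pyspark.serializers import BatchedSerializer, PickleSerializer

rows = [(i, "value_%d" % i) for i in range(1000000)]

def measure(serializer):
    # Serialize all rows into an in-memory buffer; report elapsed time and size.
    buf = io.BytesIO()
    start = time.time()
    serializer.dump_stream(iter(rows), buf)
    return time.time() - start, len(buf.getvalue())

print("non-batched: %.2fs, %d bytes" % measure(PickleSerializer()))
print("batched:     %.2fs, %d bytes" % measure(BatchedSerializer(PickleSerializer(), 100)))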

@davies
Contributor

davies commented Feb 29, 2016

I think we already solved this problem in RDD; you could take that as an example.
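
As a small illustration of the row-count mismatch being discussed (assuming pyspark.serializers): with a batched serializer, each framed record is a whole batch of rows, so the receiving side sees fewer records than rows.

import io

from pyspark.serializers import BatchedSerializer, PickleSerializer

rows = [(i, "x") for i in range(10)]

buf = io.BytesIO()
BatchedSerializer(PickleSerializer(), 4).dump_stream(iter(rows), buf)
buf.seek(0)

# Each framed record is a list holding one batch of rows.
batches = list(PickleSerializer().load_stream(buf))
print(len(batches))                  # 3 framed records (batches of 4, 4, 2)
print(sum(len(b) for b in batches))  # still 10 rows in total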

# planner should not crash without a join
broadcast(df1)._jdf.queryExecution().executedPlan()

def test_basic_typed_operations(self):
Contributor

Maybe it will be easier to reason about test failures if we break it into multiple defs?
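
A hypothetical shape for such a split, purely illustrative; the fixture self.df and the exact semantics of the typed operations are assumptions.

# Illustrative only: smaller, focused tests instead of one large
# test_basic_typed_operations. Assumes self.df is a small test DataFrame
# with an integer column "key" and a string column "value".
def test_map(self):
    ds = self.df.map(lambda row: (row.key, row.value))
    self.assertEqual(self.df.count(), len(ds.collect()))

def test_filter(self):
    ds = self.df.filter(lambda row: row.key > 0)
    for r in ds.collect():
        self.assertTrue(r[0] > 0)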

@cloud-fan cloud-fan closed this Mar 1, 2016
@cloud-fan
Contributor Author

Will send a simpler version that just uses RDDs.
