Conversation

@ueshin (Member) commented Sep 6, 2017

What changes were proposed in this pull request?

This PR introduces vectorized UDFs in Python.
Note that this PR focuses on the APIs for vectorized UDFs, not on APIs for vectorized UDAFs or window operations.

Proposed API

We introduce a @pandas_udf decorator (or annotation) to define vectorized UDFs. A vectorized UDF takes one or more pandas.Series as input, or, for 0-parameter UDFs, a single integer giving the length of the input. The return value should be a pandas.Series of the specified type, with the same length as the input.

We can define vectorized UDFs as:

  @pandas_udf(DoubleType())
  def plus(v1, v2):
      return v1 + v2

or, equivalently, without the decorator:

  plus = pandas_udf(lambda v1, v2: v1 + v2, DoubleType())

We can use it similarly to row-by-row UDFs:

  df.withColumn('sum', plus(df.v1, df.v2))

A 0-parameter UDF can be defined and used as:

  @pandas_udf(LongType())
  def f0(size):
      return pd.Series(1).repeat(size)

  df.select(f0())
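
For context, here is a minimal end-to-end sketch of the proposed API; the DataFrame contents and column names are illustrative, not taken from the patch:

  from pyspark.sql import SparkSession
  from pyspark.sql.functions import pandas_udf
  from pyspark.sql.types import DoubleType

  spark = SparkSession.builder.getOrCreate()
  df = spark.createDataFrame([(1.0, 4.0), (2.0, 5.0)], ['v1', 'v2'])

  @pandas_udf(DoubleType())
  def plus(v1, v2):
      # v1 and v2 arrive as pandas.Series; the result must be a Series
      # of the same length as the input.
      return v1 + v2

  df.withColumn('sum', plus(df.v1, df.v2)).show()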

How was this patch tested?

Added new tests and ran the existing tests.

TBD

  • how to specify the size hint for 0-parameter UDFs (or for UDFs with more parameters, to be consistent)

@SparkQA commented Sep 6, 2017

Test build #81452 has finished for PR 19147 at commit a1e4f62.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

private val accumulator = funcs.head.funcs.head.accumulator

// todo: return column batch?
def compute(
@icexelloss (Contributor) commented Sep 6, 2017

This class duplicates quite a bit of logic from PythonRDD. I think the only difference is how they serialize/deserialize data (non-Arrow vs. Arrow). @ueshin @BryanCutler, what are your thoughts on refactoring this and PythonRDD?

@ueshin (Member Author) replied:

I agree with you that we should refactor the PythonRunners, but I'm not sure what you mean by refactoring PythonRDD; I think it's already simple enough.

@icexelloss (Contributor) replied:

Yes, I meant PythonRunner in PythonRDD.scala.

@BryanCutler (Member) replied:

Yes, there is a lot of duplicated code from PythonRunner that could be refactored. I'm guessing you did not use the existing code because of the Arrow stream format? While I would love to start using that in Spark, I think it would be better to do this at a later time, when the required code could be refactored and the Arrow stream format could replace where we currently use the file format.

Also, the good part about using the iterator-based file format is that on each iteration Python can communicate back an error code and exit gracefully. In my own tests with the streaming format, if an error occurred after the stream had started, Spark could lock up in a waiting state. These are the reasons I did not use the streaming format in my implementation. Would this VectorizedPythonRunner be able to handle these types of errors?

@ueshin (Member Author) commented Sep 11, 2017

@icexelloss Ah, I see, thanks! I still agree with refactoring PythonRunner.

@BryanCutler As for the error, do you mean a case like test_vectorized_udf_exception? If not, could you please describe the case so I can think about it?

@BryanCutler (Member) replied:

I was referring to the protocol between Scala and Python that is changed here and could act differently under some circumstances. Here is the behavior of the existing PythonRunner protocol and of the VectorizedPythonRunner protocol introduced here:

PythonRunner
Data blocks are framed by a special length integer. Scala reads the data blocks one at a time and checks the length code. If the code signals a PythonException, the error is read from Python and a SparkException is thrown with it as the cause.

VectorizedPythonRunner
A data stream is opened in Scala with ArrowStreamReader and batches are transferred until ArrowStreamReader indicates there is no more data. Only at that point are the special length codes checked to handle an error from Python.

This behavior change would probably only cause problems when things are not working normally. For example, what would happen if pyarrow were not installed on an executor? With PythonRunner, the ImportError would cause a PythonException to be transferred and thrown in Scala. With VectorizedPythonRunner, I believe the ArrowStreamReader would try to read the special length code and then fail somewhere internal to Arrow, never surfacing the ImportError.

My point was that this type of behavior change should probably be implemented in a separate JIRA where we could make sure to handle all of these cases.
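
To make the failure mode concrete, here is a rough Python sketch of the length-framed protocol described above; the constant values and helper name are illustrative stand-ins, not the actual definitions in PythonRDD.scala:

  import struct

  # Illustrative sentinels: negative length codes mark special events.
  PYTHON_EXCEPTION_THROWN = -2
  END_OF_DATA_SECTION = -1

  def read_blocks(stream):
      """Yield length-prefixed data blocks, failing fast on a Python error."""
      while True:
          (length,) = struct.unpack('>i', stream.read(4))
          if length == PYTHON_EXCEPTION_THROWN:
              # The error payload follows immediately, so the reader can
              # surface it right away instead of failing deep inside Arrow.
              (err_len,) = struct.unpack('>i', stream.read(4))
              raise RuntimeError(stream.read(err_len).decode('utf-8'))
          if length == END_OF_DATA_SECTION:
              return
          yield stream.read(length)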

private def loadNextBatch(): Boolean = {
  batchLoaded = reader.loadNextBatch()
  if (batchLoaded) {
    val batch = new ColumnarBatch(schema, vectors, root.getRowCount)
A Contributor commented:

A side note: how is the performance of ColumnarBatch when converting to an Iterator[InternalRow]? As far as I remember, it doesn't return unsafe rows at the moment, right?

@ueshin (Member Author) replied:

That's right, it doesn't return unsafe rows.
But I believe it's performant enough, because it can return row values directly from the column vectors without copying them into unsafe rows.

The Contributor replied:

Interesting. Does this mean it's more performant than copying the column vectors into unsafe rows? To get any value out, it has to access the memory region of the column vectors anyway, so copying bytes from the column vectors into unsafe rows doesn't improve performance?

A Member replied:

Why would copying bytes from the column vectors into unsafe rows improve performance? Isn't accessing the data directly from the column vectors faster, since it avoids the cost of copying bytes?

@SparkQA commented Sep 6, 2017

Test build #81453 has finished for PR 19147 at commit 84d2767.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung (Member) commented:

Relaying my questions from the dev@ thread:
+1 for the type in string form.

Would it be correct to assume there will be a data type check, for example that the returned pandas DataFrame column data types match what is specified? We have seen quite a few issues/confusions with that in R.

Would it make sense to have a more generic decorator name so that it could also be usable for other efficient vectorized formats in the future? Or do we anticipate the decorator being format-specific, with more such decorators to come?

@ueshin (Member Author) commented Sep 7, 2017

@felixcheung Thank you for your comment.
We already support data types in string form; I'll add a test to confirm it.

As for the decorator name, we could use a more generic one, but we would still need the format name (or something like it) to know what users want anyway, and some formats might also need additional options. IMO, it's fine to have format-specific decorators so that users understand each format's spec.
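
For reference, a minimal sketch of the equivalence being tested (hypothetical, not the actual test; pandas_udf and DoubleType as in the examples above):

  # The return type can be given as a DataType instance or in string form.
  plus_typed = pandas_udf(lambda v1, v2: v1 + v2, DoubleType())
  plus_str = pandas_udf(lambda v1, v2: v1 + v2, 'double')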

@SparkQA commented Sep 7, 2017

Test build #81516 has finished for PR 19147 at commit 3a0d4a6.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 7, 2017

Test build #81519 has finished for PR 19147 at commit 2f929d8.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin (Member Author) commented Sep 8, 2017

The test failure above should be fixed by #19158.

@ueshin (Member Author) commented Sep 8, 2017

Jenkins, retest this please.

@SparkQA commented Sep 8, 2017

Test build #81538 has finished for PR 19147 at commit 2f929d8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member) commented:

retest this please

@SparkQA commented Sep 8, 2017

Test build #81545 has finished for PR 19147 at commit 2f929d8.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

with self.assertRaisesRegexp(
        Exception,
        'The length of returned value should be the same as input value'):
    df.select(raise_exception()).collect()
A Member commented:

Also add a test for mixing a regular UDF and a vectorized UDF?

@ueshin (Member Author) replied:

Sure, I'll add a test.
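
A minimal sketch of such a mixed test (hypothetical names, assuming a DataFrame df with a numeric column v):

  from pyspark.sql.functions import udf, pandas_udf
  from pyspark.sql.types import LongType

  row_plus_one = udf(lambda v: v + 1, LongType())   # evaluated row by row

  @pandas_udf(LongType())
  def vec_plus_one(v):
      # v arrives as a pandas.Series covering a whole batch of rows
      return v + 1

  df.select(row_plus_one(df.v), vec_plus_one(df.v)).collect()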

* there should be always some rows buffered in the socket or Python process, so the pulling from
* RowQueue ALWAYS happened after pushing into it.
*/
case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
A Member commented:

Maybe rename BatchEvalPythonExec, as it is not just for batched Python UDFs now.

@ueshin (Member Author) replied:

How about BlockedEvalPythonExec or something?

The Member replied:

I feel it is better than BatchEvalPythonExec. I don't know if others have any suggestions.

@ueshin (Member Author) replied:

Thanks! Let's wait a while and see if others have any suggestions.

@HyukjinKwon (Member) commented Sep 11, 2017

BlockedEvalPythonExec sounds better to me too, but I don't have a strong preference.

@SparkQA commented Sep 11, 2017

Test build #81621 has finished for PR 19147 at commit dbc6dd2.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member) commented:

retest this please

@SparkQA commented Sep 11, 2017

Test build #81629 has finished for PR 19147 at commit dbc6dd2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 11, 2017

Test build #81635 has finished for PR 19147 at commit 803054e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

        return "UTF8Deserializer(%s)" % self.use_unicode


class VectorizedSerializer(Serializer):
A Member commented:

ArrowVectorizedSerializer?

if _have_pandas and _have_arrow:

    @since(2.3)
    def pandas_udf(f=None, returnType=StringType()):
A Member commented:

Instead of hiding pandas_udf when pandas and Arrow are not installed, should we throw an exception when users without pandas and Arrow try to use it?
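
As a sketch of that suggestion: an eager guard could raise a clear error instead of hiding the function (the message text is hypothetical; _have_pandas and _have_arrow are the flags from the diff above):

  def _require_pandas_and_arrow():
      # Fail fast with an explicit message rather than hiding pandas_udf.
      if not (_have_pandas and _have_arrow):
          raise ImportError(
              'pandas_udf requires both pandas and pyarrow to be installed')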

    .. note:: The vectorized user-defined functions must be deterministic. Due to optimization,
        duplicate invocations may be eliminated or the function may even be invoked more times
        than it is present in the query.
A Member commented:

Should we explain more about what a vectorized UDF is and about its expected input parameters and outputs?

@ueshin (Member Author) commented Sep 22, 2017

I'm closing this in favor of #18659.

@ueshin ueshin closed this Sep 22, 2017