
Commit 110fb8b

davies authored and JoshRosen committed
[SPARK-2334] fix AttributeError when calling PipelineRDD.id()
The underlying JavaRDD for a PipelineRDD is created lazily; it is not built until _jrdd is accessed. The id of the JavaRDD is cached as `_id`, which saves a py4j RPC call on later invocations. Closes apache#1276.

Author: Davies Liu <[email protected]>

Closes apache#2296 from davies/id and squashes the following commits:

e197958 [Davies Liu] fix style
9721716 [Davies Liu] fix id of PipelineRDD
1 parent 21a1e1b commit 110fb8b
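
The fix applies a simple lazy-caching idiom: the id is fetched from the JVM once, on first request, and reused afterwards. A minimal, framework-free sketch of that idiom (the RemoteHandle class and _fetch_id_over_rpc helper below are purely illustrative stand-ins, not part of PySpark):

    class RemoteHandle(object):
        """Illustrative stand-in for an object whose JVM-backed RDD is created lazily."""

        def __init__(self):
            self._id = None  # cached id, filled in on the first call to id()

        def _fetch_id_over_rpc(self):
            # stands in for the py4j round trip (self._jrdd.id() in the real code)
            return 42

        def id(self):
            if self._id is None:  # only the first call pays the RPC cost
                self._id = self._fetch_id_over_rpc()
            return self._id

Both PipelinedRDD and SchemaRDD follow this shape in the diffs below.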

File tree

3 files changed: +20 −4 lines
  python/pyspark/rdd.py
  python/pyspark/sql.py
  python/pyspark/tests.py


python/pyspark/rdd.py

Lines changed: 6 additions & 0 deletions
@@ -2075,6 +2075,7 @@ def pipeline_func(split, iterator):
         self.ctx = prev.ctx
         self.prev = prev
         self._jrdd_val = None
+        self._id = None
         self._jrdd_deserializer = self.ctx.serializer
         self._bypass_serializer = False
         self._partitionFunc = prev._partitionFunc if self.preservesPartitioning else None
@@ -2105,6 +2106,11 @@ def _jrdd(self):
         self._jrdd_val = python_rdd.asJavaRDD()
         return self._jrdd_val
 
+    def id(self):
+        if self._id is None:
+            self._id = self._jrdd.id()
+        return self._id
+
     def _is_pipelinable(self):
         return not (self.is_cached or self.is_checkpointed)
 
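
With this change, calling id() on a pipelined RDD (one produced by transformations such as map or filter, whose JavaRDD has not been materialized yet) no longer raises AttributeError; it forces _jrdd once and caches the result. A small usage sketch, assuming a local Spark installation with Spark 1.x-era PySpark:

    from pyspark import SparkContext

    sc = SparkContext("local", "rdd-id-example")
    pipelined = sc.parallelize(range(10)).map(str)  # a PipelinedRDD; its JavaRDD is still lazy
    first = pipelined.id()           # used to raise AttributeError; now builds _jrdd and caches the id
    assert first == pipelined.id()   # the second call is served from _id, no extra py4j call
    sc.stop()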

python/pyspark/sql.py

Lines changed: 5 additions & 4 deletions
@@ -1525,7 +1525,7 @@ def __init__(self, jschema_rdd, sql_ctx):
         self.sql_ctx = sql_ctx
         self._sc = sql_ctx._sc
         self._jschema_rdd = jschema_rdd
-
+        self._id = None
         self.is_cached = False
         self.is_checkpointed = False
         self.ctx = self.sql_ctx._sc
@@ -1543,9 +1543,10 @@ def _jrdd(self):
         self._lazy_jrdd = self._jschema_rdd.javaToPython()
         return self._lazy_jrdd
 
-    @property
-    def _id(self):
-        return self._jrdd.id()
+    def id(self):
+        if self._id is None:
+            self._id = self._jrdd.id()
+        return self._id
 
     def saveAsParquetFile(self, path):
         """Save the contents as a Parquet file, preserving the schema.

python/pyspark/tests.py

Lines changed: 9 additions & 0 deletions
@@ -281,6 +281,15 @@ def func():
 
 class TestRDDFunctions(PySparkTestCase):
 
+    def test_id(self):
+        rdd = self.sc.parallelize(range(10))
+        id = rdd.id()
+        self.assertEqual(id, rdd.id())
+        rdd2 = rdd.map(str).filter(bool)
+        id2 = rdd2.id()
+        self.assertEqual(id + 1, id2)
+        self.assertEqual(id2, rdd2.id())
+
     def test_failed_sparkcontext_creation(self):
         # Regression test for SPARK-1550
         self.sc.stop()
