Skip to content

Commit 3133a60

Browse files
committed
fix accumulator with reused worker
1 parent 760ab1f commit 3133a60

File tree

2 files changed

+12
-0
lines changed

2 files changed

+12
-0
lines changed

python/pyspark/tests.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1106,6 +1106,17 @@ def test_after_jvm_exception(self):
1106 1106
rdd = self.sc.parallelize(range(100), 1)
1107 1107
self.assertEqual(100, rdd.map(str).count())
1108 1108

1109+
def test_accumulator_when_reuse_worker(self):
1110+
from pyspark.accumulators import INT_ACCUMULATOR_PARAM
1111+
acc1 = self.sc.accumulator(0, INT_ACCUMULATOR_PARAM)
1112+
self.sc.parallelize(range(100), 20).foreach(lambda x: acc1.add(x))
1113+
self.assertEqual(sum(range(100)), acc1.value)
1114+
1115+
acc2 = self.sc.accumulator(0, INT_ACCUMULATOR_PARAM)
1116+
self.sc.parallelize(range(100), 20).foreach(lambda x: acc2.add(x))
1117+
self.assertEqual(sum(range(100)), acc2.value)
1118+
self.assertEqual(sum(range(100)), acc1.value)
1119+
1109 1120

1110 1121
class TestSparkSubmit(unittest.TestCase):
1111 1122

python/pyspark/worker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ def main(infile, outfile):
76 76
bid = - bid - 1
77 77
_broadcastRegistry.pop(bid, None)
78 78

79+
_accumulatorRegistry.clear()
79 80
command = pickleSer._read_with_length(infile)
80 81
(func, deserializer, serializer) = command
81 82
init_time = time.time()

0 commit comments

Comments
 (0)