@@ -339,38 +339,37 @@ def _external_items(self):
339339
340340 try :
341341 for i in range (self .partitions ):
342- self .data = {}
343- for j in range (self .spills ):
344- path = self ._get_spill_dir (j )
345- p = os .path .join (path , str (i ))
346- # do not check memory during merging
347- self .mergeCombiners (self .serializer .load_stream (open (p )), 0 )
348-
349- # limit the total partitions
350- if (self .scale * self .partitions < self .MAX_TOTAL_PARTITIONS
351- and j < self .spills - 1
352- and get_used_memory () > hard_limit ):
353- self .data .clear () # will read from disk again
354- gc .collect () # release the memory as much as possible
355- for v in self ._recursive_merged_items (i ):
356- yield v
357- break
358- else :
359- for v in self .data .iteritems ():
360- yield v
361- self .data .clear ()
362-
342+ for v in self ._merged_items (i ):
343+ yield v
344+ self .data .clear ()
363345 gc .collect ()
364346 hard_limit = self ._next_limit ()
365347
366348 # remove the merged partition
367349 for j in range (self .spills ):
368350 path = self ._get_spill_dir (j )
369351 os .remove (os .path .join (path , str (i )))
370-
371352 finally :
372353 self ._cleanup ()
373354
355+ def _merged_items (self , index , limit = 0 ):
356+ self .data = {}
357+ for j in range (self .spills ):
358+ path = self ._get_spill_dir (j )
359+ p = os .path .join (path , str (index ))
360+ # do not check memory during merging
361+ self .mergeCombiners (self .serializer .load_stream (open (p )), 0 )
362+
363+ # limit the total partitions
364+ if (self .scale * self .partitions < self .MAX_TOTAL_PARTITIONS
365+ and j < self .spills - 1
366+ and get_used_memory () > limit ):
367+ self .data .clear () # will read from disk again
368+ gc .collect () # release the memory as much as possible
369+ return self ._recursive_merged_items (index )
370+
371+ return self .data .iteritems ()
372+
374373 def _cleanup (self ):
375374 """ Clean up all the files in disks """
376375 for d in self .localdirs :
@@ -603,7 +602,23 @@ def _spill(self):
603602 self .spills += 1
604603 gc .collect () # release the memory as much as possible
605604
606- def _recursive_merged_items (self , index ):
605+ def _merge_items (self , index , limit = 0 ):
606+ size = sum (os .path .getsize (os .path .join (self ._get_spill_dir (j ), str (index )))
607+ for j in range (self .spills ))
608+ # if the memory can not hold all the partition,
609+ # then use sort based merge
610+ if (size >> 20 ) > self .memory_limit / 2 :
611+ return self ._sorted_items (index )
612+
613+ self .data = {}
614+ for j in range (self .spills ):
615+ path = self ._get_spill_dir (j )
616+ p = os .path .join (path , str (index ))
617+ # do not check memory during merging
618+ self .mergeCombiners (self .serializer .load_stream (open (p )), 0 )
619+ return self .data .iteritems ()
620+
621+ def _sorted_items (self , index ):
607622 """ load a partition from disk, then sort and group by key """
608623 def load_partition (j ):
609624 path = self ._get_spill_dir (j )
0 commit comments