rdd_1.py
from itertools import tee, islice
import io
from collections.abc import Iterable
from collections import defaultdict
from multiprocessing import Pool
from functools import reduce
# import cloudpickle  # not available on the cluster; plain pickle is used instead
import pickle
class Partition:
    # One in-memory slice of an RDD's data, identified by its index.
    def __init__(self, x, idx):
        self.index = idx
        self._x = list(x)

    def get(self):
        return self._x

class RDD:
    def __init__(self, partitions, ctx):
        self._p = list(partitions)
        self.ctx = ctx
        self.name = None

    def compute(self, split, task_context):
        return split.get()

    def collect(self):
        return self.ctx.runJob(self, unit_map, resultHandler=unit_collect)

    def filt(self, f):
        return MapPartitionedRDD(self, lambda tc, i, x: (xx for xx in x if f(xx)))

    def flatMap(self, f):
        return MapPartitionedRDD(self, lambda tc, i, x: (e for xx in x for e in f(xx)))

    def getLength(self):
        return self.ctx.runJob(self, lambda tc, i: sum(1 for _ in i), resultHandler=sum)
    def groupByKey(self, numPartitions=None):
        # Groups are built on the driver: collect everything, bucket by key,
        # then re-distribute the (key, [values]) pairs.
        if not numPartitions:
            numPartitions = len(self._p)
        r = defaultdict(list)
        for k, v in self.collect():
            r[k].append(v)
        return self.ctx.parallelize(r.items(), numPartitions)

    def join(self, other, numPartitions=None):
        # Inner join on the driver over the keys present in both RDDs.
        if not numPartitions:
            numPartitions = len(self._p)
        d1 = dict(self.collect())
        d2 = dict(other.collect())
        k = set(d1.keys()) & set(d2.keys())
        return self.ctx.parallelize(((x, (d1[x], d2[x])) for x in k), numPartitions)
    def Map(self, f):
        return MapPartitionedRDD(self, lambda tc, i, x: (f(e) for e in x))

    def MapPartition(self, f):
        return MapPartitionedRDD(self, lambda tc, i, x: f(x))

    def MapValues(self, f):
        return MapPartitionedRDD(self, lambda tc, i, x: ((e[0], f(e[1])) for e in x))

    def partitions(self):
        return self._p

    def reduceByKey(self, f, numPartitions=None):
        return self.groupByKey(numPartitions).MapValues(lambda x: reduce(f, x))
    def saveAsPicklefile(self, filename):
        def _map(filename, obj):
            stream = io.BytesIO()
            pickle.dump(obj, stream)
            stream.seek(0)
            with io.open(filename, 'wb') as f:
                for c in stream:
                    f.write(c)
        _map(filename, self.collect())
        return self
    def saveAsTextfile(self, filename):
        def to_stringio(data):
            stringio = io.StringIO()
            for line in data:
                stringio.write('{}\n'.format(line))
            stringio.seek(0)
            return stringio
        with io.open(filename, 'wb') as f:
            for c in io.BytesIO(to_stringio(self.collect()).read().encode('utf8')):
                f.write(c)
        return self
    def sortBy(self, keyfun, ascending=True, numPartitions=None):
        if not numPartitions:
            numPartitions = len(self._p)
        return self.ctx.parallelize(
            sorted(self.collect(), key=keyfun, reverse=not ascending), numPartitions)

    def summation(self):
        return self.ctx.runJob(self, lambda tc, i: sum(i), resultHandler=sum)
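
# Transformations above (Map, filt, flatMap, MapPartition, MapValues) are lazy:
# each call only wraps the parent in a MapPartitionedRDD that records the
# per-partition function.  Actions (collect, getLength, summation, saveAs*)
# trigger Context.runJob, which evaluates the chain of compute() calls
# partition by partition.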

class MapPartitionedRDD(RDD):
    def __init__(self, prev, f):
        RDD.__init__(self, prev.partitions(), prev.ctx)
        self.prev = prev
        self.f = f

    def compute(self, split, task_context):
        return self.f(task_context, split.index,
                      self.prev.compute(split, task_context._create_child()))

    def partitions(self):
        return self.prev.partitions()

class TaskContext:
    def __init__(self, stage_id=0, partition_id=0):
        self.stage_id = stage_id
        self.partition_id = partition_id

    def _create_child(self):
        return TaskContext(stage_id=self.stage_id + 1, partition_id=self.partition_id)

def _run_task(task_context, rdd, func, partition):
    return func(task_context, rdd.compute(partition, task_context))


def unit_map(task_context, elements):
    if isinstance(elements, Iterable):
        return list(elements)
    else:
        return [elements]


def unit_collect(l):
    return [x for p in l for x in p]


def runJob_map(i):
    (serialized_func_rdd, serialized_task_context, serialized_data) = i
    func, rdd = pickle.loads(serialized_func_rdd)
    partition = pickle.loads(serialized_data)
    task_context = pickle.loads(serialized_task_context)
    result = _run_task(task_context, rdd, func, partition)
    return pickle.dumps(result)  # should be cloudpickle.dumps

class Context:
    _last_rdd_id = 0

    def runJob(self, rdd, func, partitions=None, resultHandler=None, allowLocal=True):
        if not partitions:
            partitions = rdd.partitions()

        def _runJob_local(self, rdd, func, partitions):
            for partition in partitions:
                task_context = TaskContext(stage_id=0, partition_id=partition.index)
                yield _run_task(task_context, rdd, func, partition)

        # Every pickle.dumps here should really be cloudpickle.dumps, but the
        # cluster does not support cloudpickle, so plain pickle is used instead.
        # That means only single-process execution can be tested on the cluster;
        # the multiprocessing path can be tested and runs correctly in the
        # virtual environment on thumm01.
        def _runJob_multi(self, rdd, func, partitions=None, resultHandler=None):
            if not partitions:
                partitions = rdd.partitions()
            pool = Pool(len(partitions))
            serialized_func_rdd = pickle.dumps((func, rdd))

            def prepare(partition):
                task_context = TaskContext(stage_id=0, partition_id=partition.index)
                serialized_task_context = pickle.dumps(task_context)
                serialized_partition = pickle.dumps(partition)
                return (serialized_func_rdd, serialized_task_context, serialized_partition)

            prepared_partitions = (prepare(partition) for partition in partitions)
            for d in pool.map(runJob_map, prepared_partitions):
                map_result = pickle.loads(d)
                yield map_result
            pool.close()

        if allowLocal:
            map_result = _runJob_local(self, rdd, func, partitions)
        else:
            map_result = _runJob_multi(self, rdd, func, partitions)
        result = (resultHandler(map_result) if resultHandler is not None
                  else list(map_result))
        return result
    def parallelize(self, x, numPartitions):
        if numPartitions == 1:
            return RDD([Partition(x, 0)], self)
        i1, i2 = tee(x)
        len_x = sum(1 for _ in i1)

        def partitioned():
            # Carve the input into numPartitions contiguous chunks; each islice
            # consumes the next chunk from the shared iterator i2, and the last
            # partition takes whatever remains.
            for i in range(numPartitions):
                start = int(i * len_x / numPartitions)
                end = int((i + 1) * len_x / numPartitions)
                if i + 1 == numPartitions:
                    end = len_x
                yield islice(i2, end - start)
        return RDD((Partition(data, i) for i, data in enumerate(partitioned())), self)
    def pickleFile(self, filename):
        # filename may be a comma-separated list of paths, one partition per file.
        a = filename.split(',')
        rdd_filenames = self.parallelize(a, len(a))

        def load_pickle(filename):
            with io.open(filename, 'rb') as f:
                return io.BytesIO(f.read())
        return rdd_filenames.flatMap(lambda filename: pickle.load(load_pickle(filename)))

    def textFile(self, filename):
        # filename may be a comma-separated list of paths, one partition per file.
        a = filename.split(',')
        rdd_filenames = self.parallelize(a, len(a))

        def load_text(filename, encoding='utf8'):
            with io.open(filename, 'r', encoding=encoding) as f:
                return io.StringIO(f.read())
        return rdd_filenames.flatMap(
            lambda filename: load_text(filename).read().splitlines())
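

# ---------------------------------------------------------------------------
# Minimal usage sketch: a quick smoke test of the local (single-process)
# execution path.  The data below is illustrative only.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    ctx = Context()
    # Square the numbers 0..9 across 3 partitions and keep the even results.
    numbers = ctx.parallelize(range(10), 3)
    print(numbers.Map(lambda x: x * x).filt(lambda x: x % 2 == 0).collect())
    # Key/value aggregation: sum the counts per key.
    pairs = ctx.parallelize([('a', 1), ('b', 1), ('a', 1)], 2)
    print(pairs.reduceByKey(lambda x, y: x + y).collect())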