-
Notifications
You must be signed in to change notification settings - Fork 0
/
beamTesting.py
108 lines (89 loc) · 3.2 KB
/
beamTesting.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/python
#https://github.com/apache/beam/tree/master/sdks/python
import apache_beam as beam
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions
from apache_beam.utils.pipeline_options import StandardOptions
import os
import time
# Settings shared by the demo functions in this script.
# Google Cloud project id handed to the pipeline options by runQuery.
project = 'cgc-05-0016'
# BigQuery table targeted by the annotation query (project.dataset.table).
input_table = 'isb-cgc.test.TCGA_Annotation'
# Local text output: beamBasic writes under <directory>/<filename>, and the
# other demos read back the files whose names start with <filename>.
directory, filename = ".", "names"
def runQuery():
    """Query ten 'Observation' annotations from BigQuery and save them as text.

    Builds a local DirectRunner pipeline for ``project`` that reads the rows
    returned by the query over ``input_table`` with ``beam.io.BigQuerySource``
    and writes each element to sharded text file(s) prefixed ``./beam_test``.
    Follows the read/transform/write pattern of
    https://beam.apache.org/get-started/wordcount-example/
    """
    # The table name is quoted with backticks, which is standard-SQL syntax;
    # BigQuerySource defaults to legacy SQL, hence use_standard_sql=True below.
    input_query = ('SELECT annotation_gdc_id, category, classification, '
                   'status, entity_type, project_short_name, entity_barcode '
                   'FROM `%s` '
                   "WHERE classification = 'Observation' "
                   'LIMIT 10') % input_table
    options = PipelineOptions()
    options.view_as(GoogleCloudOptions).project = project
    # Run locally like the other demos.  To run on Dataflow instead, set the
    # runner to 'DataflowRunner' and give GoogleCloudOptions a gs://
    # temp_location (an earlier draft used "gs://tdp-test/beam-test").
    options.view_as(StandardOptions).runner = 'DirectRunner'
    p = beam.Pipeline(options=options)
    (p
     | 'read' >> beam.io.Read(
         beam.io.BigQuerySource(query=input_query, use_standard_sql=True))
     # WriteToText is itself a transform; it must not be wrapped in Write().
     | 'save' >> beam.io.WriteToText('./beam_test'))
    p.run()
def beamBasic():
    """Write the three sample names as text using a local DirectRunner pipeline.

    WriteToText treats its argument as a path prefix and adds a shard suffix,
    so the output lands in file(s) whose names start with
    ``<directory>/<filename>``.
    """
    output_prefix = '/'.join([directory, filename])
    pipeline = beam.Pipeline('DirectRunner')
    stooges = beam.Create(['Larry', 'Curly', 'Moe'])
    writer = beam.io.WriteToText(output_prefix)
    # `>>` binds tighter than `|`, so each label attaches to its transform.
    pipeline | 'add names' >> stooges | 'save' >> writer
    pipeline.run()
def beamMap():
    """Read the names written by beamBasic and save one greeting per name.

    Each input line becomes ``'Nyuk!, <name>!'`` via ``beam.Map`` (the extra
    argument after the lambda is passed as ``msg``).  Results go to sharded
    text file(s) prefixed ``./greetings``.
    """
    # WriteToText appends a shard suffix (by default '-SSSSS-of-NNNNN') to the
    # prefix it is given, and ReadFromText accepts glob patterns, so one
    # pattern reads every shard.  This replaces a prefix scan of os.listdir(),
    # which resolved names against the working directory instead of
    # `directory` and could also match temporary files Beam creates while
    # writing.  ReadFromText should validate the pattern by default, so
    # missing input fails when the read is built.
    names_pattern = os.path.join(directory, filename) + '-*-of-*'
    print(names_pattern)
    p = beam.Pipeline('DirectRunner')
    (p
     | 'load names' >> beam.io.ReadFromText(names_pattern)
     | 'add greeting' >> beam.Map(
         lambda names, msg: '%s, %s!' % (msg, names), 'Nyuk!')
     | 'save' >> beam.io.WriteToText('./greetings'))
    p.run()
def basicFlatmap():
    """Read the names written by beamBasic and save two greetings per name.

    ``beam.FlatMap`` expands each name into one ``'<msg> <name>!'`` line per
    message in ``['Nyuk', 'Soitanley']``.  Results go to sharded text file(s)
    prefixed ``./multi_greetings``.
    """
    # Match every shard WriteToText produced (default suffix
    # '-SSSSS-of-NNNNN') with a single glob.  This replaces a prefix scan of
    # os.listdir(), which resolved names against the working directory instead
    # of `directory`, could match Beam's temporary write files, and could build
    # several 'load names' transforms with the same label.
    names_pattern = os.path.join(directory, filename) + '-*-of-*'
    p = beam.Pipeline('DirectRunner')
    (p
     | 'load names' >> beam.io.ReadFromText(names_pattern)
     | 'add greetings' >> beam.FlatMap(
         lambda name, messages: ['%s %s!' % (msg, name) for msg in messages],
         ['Nyuk', 'Soitanley'])
     | 'save' >> beam.io.WriteToText('./multi_greetings'))
    p.run()
def add_greetings(name, messages):
    """Generate one greeting string per entry in *messages*.

    Used by complexFlatMap as a named ``beam.FlatMap`` callable.  Each
    yielded value has the form ``'<message> <name>#'``.
    """
    # NOTE(review): basicFlatmap ends its greetings with '!' while this helper
    # uses '#'; confirm the difference is intentional.
    line_format = '%s %s#'
    for message in messages:
        greeting = line_format % (message, name)
        yield greeting
def complexFlatMap():
    """Like basicFlatmap, but expand each name with the named add_greetings.

    Results go to sharded text file(s) prefixed ``./complex_greetings``.
    """
    # Match every shard WriteToText produced (default suffix
    # '-SSSSS-of-NNNNN') with a single glob.  This replaces a prefix scan of
    # os.listdir(), which resolved names against the working directory instead
    # of `directory`, could match Beam's temporary write files, and could build
    # several 'load names' transforms with the same label.
    names_pattern = os.path.join(directory, filename) + '-*-of-*'
    p = beam.Pipeline('DirectRunner')
    (p
     | 'load names' >> beam.io.ReadFromText(names_pattern)
     | 'add greetings' >> beam.FlatMap(add_greetings, ['Nyuk', 'Soitanley'])
     | 'save' >> beam.io.WriteToText('./complex_greetings'))
    p.run()
def main():
    """Run the currently selected demo (the BigQuery query).

    The other demos are kept as commented-out calls; uncomment one to run it.
    beamMap, basicFlatmap and complexFlatMap read the files beamBasic writes,
    so run beamBasic first.
    """
    # runQuery()
    # print("Running beamBasic")
    # beamBasic()
    # Need to sleep otherwise beamMap reads temporary files and blows up
    # time.sleep(1)
    # print("Running beamMap")
    # beamMap()
    # time.sleep(1)
    # print("Running basicFlatMap")
    # basicFlatmap()
    # print("Running complexFlatMap")
    # complexFlatMap()
    print("Running BQ query")
    runQuery()


if __name__ == '__main__':
    main()