Skip to content

Commit e00b561

Browse files
authored
fixed E501 (#32)
1 parent cdbfeb4 commit e00b561

25 files changed

+174
-102
lines changed

Diff for: Python/.flake8

-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ ignore =
55
E203
66
W503
77
F841
8-
E501
98
exclude =
109
.eggs
1110
.git

Diff for: Python/.isort.cfg

-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import_heading_stdlib=standard libraries
1818
import_heading_thirdparty=third party libraries
1919
include_trailing_comma=True
2020
indent=' '
21-
known_dfml=src
2221
dedup_headings=True
2322
line_length=80
2423
multi_line_output=3

Diff for: Python/basics/combine_globally.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def run(argv=None):
2424
elements = [
2525
"Lorem ipsum dolor sit amet. Consectetur adipiscing elit",
2626
"Sed eu velit nec sem vulputate loborti",
27-
"In lobortis augue vitae sagittis molestie. Mauris volutpat tortor non purus elementum",
27+
"In lobortis augue vitae sagittis molestie. Mauris volutpat tortor non purus elementum", # noqa:E501
2828
"Ut blandit massa et risus sollicitudin auctor",
2929
]
3030

Diff for: Python/basics/combine_per_key.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,22 @@ def run(argv=None):
2424
elements = [
2525
(
2626
"Latin",
27-
"Lorem ipsum dolor sit amet. Consectetur adipiscing elit. Sed eu velit nec sem vulputate loborti",
27+
"Lorem ipsum dolor sit amet. Consectetur adipiscing elit. Sed eu velit nec sem vulputate loborti", # noqa:E501
2828
),
2929
(
3030
"Latin",
31-
"In lobortis augue vitae sagittis molestie. Mauris volutpat tortor non purus elementum",
31+
"In lobortis augue vitae sagittis molestie. Mauris volutpat tortor non purus elementum", # noqa:E501
3232
),
3333
("English", "From fairest creatures we desire increase"),
3434
("English", "That thereby beauty's rose might never die"),
3535
("English", "But as the riper should by time decease"),
3636
(
3737
"Spanish",
38-
"En un lugar de la Mancha, de cuyo nombre no quiero acordarme, no ha mucho",
38+
"En un lugar de la Mancha, de cuyo nombre no quiero acordarme, no ha mucho", # noqa:E501
3939
),
4040
(
4141
"Spanish",
42-
"tiempo que vivía un hidalgo de los de lanza en astillero, adarga antigua",
42+
"tiempo que vivía un hidalgo de los de lanza en astillero, adarga antigua", # noqa:E501
4343
),
4444
]
4545

Diff for: Python/basics/pardo.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def run(argv=None):
3030
elements = [
3131
"Lorem ipsum dolor sit amet. Consectetur adipiscing elit",
3232
"Sed eu velit nec sem vulputate loborti",
33-
"In lobortis augue vitae sagittis molestie. Mauris volutpat tortor non ",
33+
"In lobortis augue vitae sagittis molestie. Mauris volutpat tortor non ", # noqa:E501
3434
"Ut blandit massa et risus sollicitudin auctor",
3535
]
3636

Diff for: Python/bigquery/failed_rows_bigquery.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030

3131

3232
def run(argv=None):
33-
"""This pipeline shows how to access the rows that failed being inserted to BigQuery"""
33+
"""This pipeline shows how to access the rows that failed being
34+
inserted to BigQuery"""
3435
topic = "projects/pubsub-public-data/topics/taxirides-realtime"
3536

3637
class FailedRowsOptions(PipelineOptions):

Diff for: Python/bigquery/nested_bigquery.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ def _add_argparse_args(cls, parser):
4444

4545

4646
def run(argv=None):
47-
"""This pipeline shows how to read, write and modify nested fields from BigQuery"""
47+
"""This pipeline shows how to read, write and modify nested fields
48+
from BigQuery"""
4849

4950
schema = {
5051
"fields": [

Diff for: Python/bigquery/read_query_bigquery.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ def _add_argparse_args(cls, parser):
2828
parser.add_argument(
2929
"--query",
3030
default=(
31-
"SELECT repository_language, COUNT(repository_language) totalRepos"
32-
" FROM `bigquery-public-data.samples.github_timeline` GROUP BY 1"
31+
"SELECT repository_language, COUNT(repository_language) totalRepos" # noqa:E501
32+
" FROM `bigquery-public-data.samples.github_timeline` GROUP BY 1" # noqa:E501
3333
),
3434
help="BigQuery query to read data",
3535
)

Diff for: Python/bigquery/repeated_bigquery.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ def _add_argparse_args(cls, parser):
4343

4444

4545
def run(argv=None):
46-
"""This pipeline shows how to read, write and modify nested fields from BigQuery"""
46+
"""This pipeline shows how to read, write and modify nested fields
47+
from BigQuery"""
4748

4849
schema = {
4950
"fields": [

Diff for: Python/bigquery/streaming_inserts_bigquery.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class StreamingInsertsOptions(PipelineOptions):
4141
def _add_argparse_args(cls, parser):
4242
parser.add_argument("--output_table", help="BQ Table to write")
4343

44-
table_schema = "ride_status:STRING, passenger_count:INTEGER, meter_reading:FLOAT, timestamp:STRING"
44+
table_schema = "ride_status:STRING, passenger_count:INTEGER, meter_reading:FLOAT, timestamp:STRING" # noqa:E501
4545
options = StreamingInsertsOptions()
4646
with beam.Pipeline(options=options) as p:
4747
output = (

Diff for: Python/bigquery/streaming_load_jobs_bigquery.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def _add_argparse_args(cls, parser):
4747
help="Frequency to trigger the load job",
4848
)
4949

50-
table_schema = "ride_status:STRING, passenger_count:INTEGER, meter_reading:FLOAT, timestamp:STRING"
50+
table_schema = "ride_status:STRING, passenger_count:INTEGER, meter_reading:FLOAT, timestamp:STRING" # noqa:E501
5151
options = StreamingLoadOptions()
5252
with beam.Pipeline(options=options) as p:
5353
output = (

Diff for: Python/extra_examples/template.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828

2929
"""
3030
31-
This example shows how to use Classic Templates. Note that Flex Templates are the preferred method.
31+
This example shows how to use Classic Templates. Note that Flex Templates
32+
are the preferred method.
3233
"""
3334

3435

Diff for: Python/gcs/read_all_textio_streaming.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ def _add_argparse_args(cls, parser):
3333
"--topic",
3434
description="Provide a topic to read from",
3535
dest="topic",
36-
help="You need to create this topic and the messages will be GCS paths.",
36+
help="You need to create this topic and the messages will be GCS paths.", # noqa:E501
3737
)
3838

3939
options = ReadAllStreamingTextOptions(streaming=True)

Diff for: Python/gcs/read_avro.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def _add_argparse_args(cls, parser):
4040
| "ReadAvro" >> ReadFromAvro(options.path)
4141
| "CheckRow"
4242
>> Map(
43-
lambda row: f"The abbreviation of {row['name']} is {row['post_abbr']}"
43+
lambda row: f"The abbreviation of {row['name']} is {row['post_abbr']}" # noqa:E501
4444
)
4545
| Map(logging.info)
4646
)

Diff for: Python/gcs/read_parquet.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def _add_argparse_args(cls, parser):
4040
| "ReadParquet" >> ReadFromParquet(options.path)
4141
| "CheckRow"
4242
>> Map(
43-
lambda row: f"The abbreviation of {row['name']} is {row['post_abbr']}"
43+
lambda row: f"The abbreviation of {row['name']} is {row['post_abbr']}" # noqa:E501
4444
)
4545
| Map(logging.info)
4646
)

Diff for: Python/pubsub/read_pubsub_topic.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ class ReadPubSubOptions(PipelineOptions):
2828
def _add_argparse_args(cls, parser):
2929
parser.add_argument(
3030
"--topic",
31-
# Run on Dataflow or authenticate to not get 403 PermissionDenied
31+
# Run on Dataflow or authenticate to not get
32+
# 403 PermissionDenied
3233
default="projects/pubsub-public-data/topics/taxirides-realtime",
3334
help="PubSub topic to read",
3435
)

Diff for: Python/testing_windows/accumulating_fired_panes.py

+19-11
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def get_input_stream():
8585
def print_with_info(element, window=DoFn.WindowParam, pane=DoFn.PaneInfoParam):
8686
"""Helper function to print window and pane info"""
8787
print(
88-
f"Element: {element[1]} \n\t Window: {window} \n\t Pane Timing: {PaneInfoTiming.to_string(pane.timing)}"
88+
f"Element: {element[1]} \n\t Window: {window} \n\t Pane Timing: {PaneInfoTiming.to_string(pane.timing)}" # noqa:E501
8989
)
9090

9191

@@ -106,10 +106,12 @@ def run():
106106
| "Window into FixedWindows"
107107
>> WindowInto(
108108
# We are using the default triggering which triggers when watermark
109-
# crosses the window end and whenever valid late element(s) are added
109+
# crosses the window end and whenever valid late element(s)
110+
# are added
110111
FixedWindows(size=window_size_seconds),
111112
allowed_lateness=window_allowed_lateness_seconds,
112-
# Late elements are combined with previous elements within the window
113+
# Late elements are combined with previous elements within
114+
# the window
113115
accumulation_mode=AccumulationMode.ACCUMULATING,
114116
)
115117
| "Combine elements within pane by key" >> GroupByKey()
@@ -126,12 +128,18 @@ def run():
126128
"""
127129
EXPLANATION
128130
1 - Elements "On-Time-X" arrive to the pipeline before window closes
129-
2 - Elements "Late-(1 to 4)" arrive after window closing, but within the allowed late time
130-
3 - Elements "Late-(5, 6)" also arrive after window closing, but within the allowed late time
131-
4 - Element "Outside-allowed-lateness" is discarded as it's outside the allowed lateness
132-
133-
134-
The first trigger contains elements (1), since they arrive before window closing
135-
The second trigger contains elements (2) and elements (1), since we are accumulating
136-
The third trigger contains only elements (3) but also (2) and (1) because of the accumulation
131+
2 - Elements "Late-(1 to 4)" arrive after window closing, but within
132+
the allowed late time
133+
3 - Elements "Late-(5, 6)" also arrive after window closing, but within
134+
the allowed late time
135+
4 - Element "Outside-allowed-lateness" is discarded as it's outside
136+
the allowed lateness
137+
138+
139+
The first trigger contains elements (1), since they arrive before
140+
window closing
141+
The second trigger contains elements (2) and elements (1),
142+
since we are accumulating
143+
The third trigger contains only elements (3) but also (2) and (1)
144+
because of the accumulation
137145
"""

Diff for: Python/testing_windows/discarding_fired_panes.py

+18-11
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ def run():
9999
>> WindowInto(
100100
FixedWindows(size=window_size_seconds),
101101
allowed_lateness=window_allowed_lateness_seconds,
102-
# Late elements are contained within their own pane and no correlation
103-
# exists between the panes
102+
# Late elements are contained within their own pane
103+
# and no correlation exists between the panes
104104
accumulation_mode=AccumulationMode.DISCARDING,
105105
)
106106
| "Combine elements within pane by key" >> GroupByKey()
@@ -118,13 +118,20 @@ def run():
118118
119119
EXPLANATION
120120
1 - Elements "On-Time-X" arrive to the pipeline before window closes
121-
2 - Elements "Late-(1 to 4)" arrive after window closing, but within the allowed late time
122-
3 - Elements "Late-(5, 6)" also arrive after window closing, but within the allowed late time
123-
4 - Element "Outside-allowed-lateness" is discarded as it's outside the allowed lateness
124-
125-
The first trigger contains elements (1), since they arrive before window closing
126-
The second trigger contains elements (2) and only elements (2). They are triggered by themselves as they arrived late
127-
The thrid trigger contains only elements (3) since they also arrived late but at different time
128-
129-
Element "On-Time-New-Window-1" shows that this process continues until termination
121+
2 - Elements "Late-(1 to 4)" arrive after window closing,
122+
but within the allowed late time
123+
3 - Elements "Late-(5, 6)" also arrive after window closing,
124+
but within the allowed late time
125+
4 - Element "Outside-allowed-lateness" is discarded
126+
as it's outside the allowed lateness
127+
128+
The first trigger contains elements (1), since they arrive
129+
before window closing
130+
The second trigger contains elements (2) and only elements (2).
131+
They are triggered by themselves as they arrived late
132+
The thrid trigger contains only elements (3)
133+
since they also arrived late but at different time
134+
135+
Element "On-Time-New-Window-1" shows that this process continues
136+
until termination
130137
"""

Diff for: Python/testing_windows/element_count_trigger.py

+31-19
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ def get_input_stream():
4848
.advance_processing_time(2)
4949
.add_elements(
5050
[
51-
# Different event times but elements "arrived" same time in pipeline
51+
# Different event times but elements "arrived"
52+
# same time in pipeline
5253
TimestampedValue("Same-Time-1", timestamp=4),
5354
TimestampedValue("Same-Time-2", timestamp=5),
5455
TimestampedValue("Same-Time-3", timestamp=6),
@@ -112,18 +113,20 @@ def run():
112113
allowed_lateness=window_allowed_lateness_seconds,
113114
accumulation_mode=trigger.AccumulationMode.DISCARDING,
114115
# Try setting different types of triggers!
115-
# AfterWatermark only fires pane once when the watermark crosses the
116-
# Window end time
116+
# AfterWatermark only fires pane once when the watermark
117+
# crosses the Window end time
117118
# trigger=trigger.AfterWatermark()
118-
# We repeatedly fire panes forever with count as least late_pane_size
119+
# We repeatedly fire panes forever with count as
120+
# least late_pane_size
119121
# We also want to set a termination to stop above infinite fires so
120122
# use AfterWatermark a final trigger.
121123
# trigger=trigger.OrFinally(
122124
# trigger.Repeatedly(trigger.AfterCount(late_pane_size)),
123125
# trigger.AfterWatermark()
124126
# )
125-
# We need to configure how late data should be handled to prevent data
126-
# loss since AfterWatermark by default only fires once
127+
# We need to configure how late data should be handled
128+
# to prevent data loss since AfterWatermark
129+
# by default only fires once
127130
# https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/trigger.py#L596
128131
trigger=trigger.OrFinally(
129132
trigger.Repeatedly(trigger.AfterCount(late_pane_size)),
@@ -145,18 +148,27 @@ def run():
145148
"""
146149
EXPLANATION
147150
148-
`trigger.AfterCount` triggers whenever the current pane has at least `late_pane_size` elements,
149-
but there are some constrains.
150-
151-
1 - The first trigger contains "First" and "Second", which is expected since they are the first 2 elements.
152-
2 - The second trigger contains the "Third" element but also all of "Same-Time-X" elements, so it is not just 2 elements.
153-
This happens because of "Same-Time-X" arrive at the same time, so there is no second element but second elements
154-
3 - The third trigger only contains "Last-In-Window", this is because we have `trigger.OrFinally(... , trigger.AfterWatermark(...))`
155-
so it triggers when the window closes, no matter the number of elements there are in pane.
151+
`trigger.AfterCount` triggers whenever the current pane has
152+
at least `late_pane_size` elements, but there are some constrains.
153+
154+
1 - The first trigger contains "First" and "Second", which is expected
155+
since they are the first 2 elements.
156+
2 - The second trigger contains the "Third" element but also
157+
all of "Same-Time-X" elements, so it is not just 2 elements.
158+
This happens because of "Same-Time-X" arrive at the same time,
159+
so there is no second element but second elements
160+
3 - The third trigger only contains "Last-In-Window", this is because
161+
we have `trigger.OrFinally(... , trigger.AfterWatermark(...))`
162+
so it triggers when the window closes,
163+
no matter the number of elements there are in pane.
156164
4 - The fourth trigger contains "Late-(1,2,3)", since they arrive together
157-
5 - The next triggers contain the elements as they come, since we have `trigger.OrFinally(... , trigger.AfterWatermark(...))`
158-
6 - Element "Outside-Window-Lateness" is discarded as it's outside the allowed lateness
159-
160-
In a real life scenario, it's not that common two elements arrive at the exact same time, but networking conditions can affect arrival.
161-
Note: `AfterCount` can cause hanged pipeline if the condition is not met. To resolve this see note in `after_each_trigger.py`
165+
5 - The next triggers contain the elements as they come,
166+
since we have `trigger.OrFinally(... , trigger.AfterWatermark(...))`
167+
6 - Element "Outside-Window-Lateness" is discarded as
168+
it's outside the allowed lateness
169+
170+
In a real life scenario, it's not that common two elements arrive
171+
at the exact same time, but networking conditions can affect arrival.
172+
Note: `AfterCount` can cause hanged pipeline if the condition is not met.
173+
To resolve this see note in `after_each_trigger.py`
162174
"""

Diff for: Python/testing_windows/late_data.py

+25-14
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ def get_input_stream():
4545
.advance_processing_time(3)
4646
.add_elements(
4747
[
48-
# Although pane fired, below element still within lateness configured
48+
# Although pane fired, below element still
49+
# within lateness configured
4950
TimestampedValue("Late-but-allowed", timestamp=9)
5051
]
5152
)
@@ -90,10 +91,13 @@ def run():
9091
>> WindowInto(
9192
FixedWindows(size=window_size_seconds),
9293
allowed_lateness=window_allowed_lateness_seconds,
93-
# Set `accumulation_mode` as `ACCUMULATING` when you want to collect all the
94-
# panes fired in the window. To have the panes be considered individually
95-
# use `DISCARDING` mode. Use `DISCARDING` when the datum are indipendent
96-
# of each other. `ACCUMULATING` will fire each time the a panes arrives from a window
94+
# Set `accumulation_mode` as `ACCUMULATING` when you want to
95+
# collect all the panes fired in the window.
96+
# To have the panes be considered individually
97+
# use `DISCARDING` mode. Use `DISCARDING`
98+
# when the datum are independent of each other.
99+
# `ACCUMULATING` will fire each time
100+
# when a panes arrives from a window
97101
accumulation_mode=AccumulationMode.DISCARDING,
98102
)
99103
| "Group together elements within same pane and key" >> GroupByKey()
@@ -111,14 +115,21 @@ def run():
111115

112116
# EXPLANATION
113117
# 1 - Elements A,B,C,D arrive to the pipeline on time
114-
# 1 - Element "Late-but-before-window-closing" arrives 9s late, but before window closes
115-
# 2 - Element "Late-but-allowed" arrives after window closing, but within the allowed late time
116-
# 3 - Element "Late-and-outside-window" arrives after window closing and after allowed late time
117-
# 4 - Element "window-2" arrives on time but in a different window than 0 to 10000
118+
# 1 - Element "Late-but-before-window-closing" arrives 9s late,
119+
# but before window closes
120+
# 2 - Element "Late-but-allowed" arrives after window closing,
121+
# but within the allowed late time
122+
# 3 - Element "Late-and-outside-window" arrives after window closing and
123+
# after allowed late time
124+
# 4 - Element "window-2" arrives on time but in a different window
125+
# than 0 to 10000
118126
#
119-
# The first trigger with elements (1) contains all data from the first window that arrived before window closed
120-
# The second trigger with element (2) includes the data that arrived late but within the allowed late time
127+
# The first trigger with elements (1) contains all data from the first window
128+
# that arrived before window closed
129+
# The second trigger with element (2) includes the data that arrived late
130+
# but within the allowed late time
121131
# Elements in (3) are discarded, since they arrive after the allowed late time
122-
# The third trigger with element (4) appears in a new window `[10.0, 20.0)` with pane index `0`
123-
# Even though the Window is same for (1) (2) the pane index for (1) is `0` and for (2) is `1`
124-
# denoting they are logically separated
132+
# The third trigger with element (4) appears in a new window `[10.0, 20.0)`
133+
# with pane index `0`
134+
# Even though the Window is same for (1) (2) the pane index for (1) is `0`
135+
# and for (2) is `1` denoting they are logically separated

0 commit comments

Comments
 (0)