 # See the License for the specific language governing permissions and
 # limitations under the License.

-import logging
+# standard libraries
 import json
+import logging

+# third party libraries
 import apache_beam as beam
-from apache_beam import DoFn
-from apache_beam import Filter
-from apache_beam import Map
-from apache_beam import ParDo
-from apache_beam.io import ReadFromPubSub
-from apache_beam.io import WriteToPubSub
+from apache_beam import DoFn, Filter, Map, ParDo
+from apache_beam.io import ReadFromPubSub, WriteToPubSub
 from apache_beam.options.pipeline_options import PipelineOptions

-
 PROCESSED_TAG = "processed"
 UNPROCESSED_TAG = "unprocessed"


 class PubSubOptions(PipelineOptions):
-
     @classmethod
     def _add_argparse_args(cls, parser):
         parser.add_argument(
             "--input_topic",
             default="projects/your-project/topics/your-input-test",
-            help="Input PubSub topic")
+            help="Input PubSub topic",
+        )
         parser.add_argument(
             "--output_topic",
             default="projects/your-project/topics/your-output-test",
-            help="Output PubSub topic")
+            help="Output PubSub topic",
+        )
         parser.add_argument(
             "--dlq_topic",
             default="projects/your-project/topics/your-dlq-test",
-            help="Dead Letter Queue PubSub topic")
+            help="Dead Letter Queue PubSub topic",
+        )


 def run():
     """
-    This Apache Beam pipeline processes log messages from a Google Cloud Pub/Sub topic.
-    The expected data format follows the standard Google Cloud log format,
-    which can be achieved by routing logs to a Pub/Sub topic via https://console.cloud.google.com/logs/router.
+    This Apache Beam pipeline processes log messages from a Google Cloud
+    Pub/Sub topic. The expected data format follows the standard Google Cloud
+    log format, which can be achieved by routing logs to a Pub/Sub topic via
+    https://console.cloud.google.com/logs/router.

     It performs the following steps:
     1. Input Configuration:
@@ -66,40 +66,57 @@ def run():
         b. UNPROCESSED (missing one or both of these fields).

     4. Severity Filtering:
-        - For PROCESSED messages, filters out those with severity other than "ERROR".
+        - For PROCESSED messages, filters out those with severity other than
+          "ERROR".

     5. Data Transformation:
-        - Extracts timestamp and message content from the 'jsonPayload' field for PROCESSED messages.
+        - Extracts timestamp and message content from the 'jsonPayload' field
+          for PROCESSED messages.

     6. Output Handling:
-        - Writes transformed PROCESSED messages to a specified output Pub/Sub topic.
+        - Writes transformed PROCESSED messages to a specified output Pub/Sub
+          topic.
         - Sends UNPROCESSED messages to a Dead Letter Queue (DLQ) topic.
     """

     options = PubSubOptions(streaming=True)

     with beam.Pipeline(options=options) as p:
-        split_result = (p | "Read from PubSub" >> ReadFromPubSub(topic=options.input_topic)
-                        | "Parse JSON" >> Map(lambda msg: json.loads(msg))
-                        | "Split Messages" >> ParDo(SplitMessages()).with_outputs(UNPROCESSED_TAG, PROCESSED_TAG))
+        split_result = (
+            p
+            | "Read from PubSub" >> ReadFromPubSub(topic=options.input_topic)
+            | "Parse JSON" >> Map(lambda msg: json.loads(msg))
+            | "Split Messages"
+            >> ParDo(SplitMessages()).with_outputs(
+                UNPROCESSED_TAG, PROCESSED_TAG
+            )
+        )

         # Filter processed messages and write to output topic
-        (split_result[PROCESSED_TAG]
-         | "Filter by Severity" >> Filter(filter_by_severity)
-         | "Map to PubsubMessage for output" >> Map(to_pubsub_message_for_output)
-         | "Write to PubSub" >> WriteToPubSub(options.output_topic, with_attributes=True))
+        (
+            split_result[PROCESSED_TAG]
+            | "Filter by Severity" >> Filter(filter_by_severity)
+            | "Map to PubsubMessage for output"
+            >> Map(to_pubsub_message_for_output)
+            | "Write to PubSub"
+            >> WriteToPubSub(options.output_topic, with_attributes=True)
+        )

         # Write unprocessed messages to DLQ
-        (split_result[UNPROCESSED_TAG]
-         | "Map to PubsubMessage for DLQ" >> Map(to_pubsub_message_for_dlq)
-         | "Write to DLQ" >> WriteToPubSub(options.dlq_topic, with_attributes=True))
+        (
+            split_result[UNPROCESSED_TAG]
+            | "Map to PubsubMessage for DLQ" >> Map(to_pubsub_message_for_dlq)
+            | "Write to DLQ"
+            >> WriteToPubSub(options.dlq_topic, with_attributes=True)
+        )


 class SplitMessages(DoFn):
     def process(self, element):
+        # third party libraries
         from apache_beam.pvalue import TaggedOutput

-        if ('severity' in element) & ('jsonPayload' in element):
+        if ("severity" in element) & ("jsonPayload" in element):
             yield TaggedOutput(PROCESSED_TAG, element)
         else:
             yield TaggedOutput(UNPROCESSED_TAG, element)
@@ -111,20 +128,26 @@ def filter_by_severity(log):


 def to_pubsub_message_for_dlq(msg):
+    # third party libraries
     from apache_beam.io import PubsubMessage

     return PubsubMessage(data=bytes(json.dumps(msg), "utf-8"), attributes=None)


 def to_pubsub_message_for_output(log):
+    # third party libraries
     from apache_beam.io import PubsubMessage

     # Example transformation: Extract relevant information from the log
     transformed_data = {
         "timestamp": log.get("timestamp"),
-        "message": log.get("jsonPayload").get("message")
+        "message": log.get("jsonPayload").get("message"),
     }
-    data = bytes(f"Error log message: {transformed_data['message']} [{transformed_data['timestamp']}]", "utf-8")
+    data = bytes(
+        f"Error log message: {transformed_data['message']} "
+        f"[{transformed_data['timestamp']}]",
+        "utf-8",
+    )
     return PubsubMessage(data=data, attributes=transformed_data)
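Worked example (not part of the diff): a minimal sketch of how the two helpers behave on a sample Cloud Logging entry. The field values are illustrative, and the snippet assumes the definitions above are in scope.

```python
# Illustrative Cloud Logging entry; values are made up for the example.
sample = {
    "severity": "ERROR",
    "timestamp": "2024-01-01T00:00:00Z",
    "jsonPayload": {"message": "upstream call failed"},
}

# Both "severity" and "jsonPayload" are present, so SplitMessages tags this
# element PROCESSED; per the docstring, only severity == "ERROR" survives
# the Filter step.
out = to_pubsub_message_for_output(sample)
print(out.data)        # b'Error log message: upstream call failed [2024-01-01T00:00:00Z]'
print(out.attributes)  # {'timestamp': '2024-01-01T00:00:00Z', 'message': 'upstream call failed'}

# An entry missing jsonPayload is tagged UNPROCESSED and routed to the DLQ
# as the JSON-serialized original element.
dlq = to_pubsub_message_for_dlq({"severity": "ERROR"})
print(dlq.data)        # b'{"severity": "ERROR"}'
```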
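The diff does not show an entry point. A conventional Beam main guard along these lines would launch the pipeline (this guard and the logging level are assumptions, not shown in the PR); the three topics can be overridden with the `--input_topic`, `--output_topic`, and `--dlq_topic` flags defined in `PubSubOptions`.

```python
if __name__ == "__main__":
    # Assumed entry point; INFO is a common default in Beam examples.
    logging.getLogger().setLevel(logging.INFO)
    run()
```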