1+ """
2+ This script is an example of code that can be used to create an AWS Glue custom visual transform
3+ to post data quality results generated by AWS Glue DataQuality in an ETL job to Amazon DataZone.
4+
5+ Dependencies:
6+ - boto3 (AWS SDK for Python)
7+ - awsglue (AWS Glue Python library)
8+ - re (Python regular expression module)
9+ - json (Python JSON module)
10+ - datetime (Python datetime module)
11+
12+ Author: Fabrizio Napolitano
13+ Version: 1.0
14+ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
15+ # SPDX-License-Identifier: MIT-0
16+ """

import boto3
import re
import json
import pyspark
from datetime import datetime
from awsglue import DynamicFrame

def post_dq_results_to_datazone(self, roleToAssume: str, dzDomain: str, tableName: str, schemaName: str, dqRuleSetName: str, maxResults: int) -> DynamicFrame:
    """
    Post data quality results to Amazon DataZone.

    Args:
        self (DynamicFrame): The instance of the class on which this method is called.
        roleToAssume (str): The ARN of the IAM role to assume.
        dzDomain (str): The Amazon DataZone domain identifier.
        tableName (str): The name of the table.
        schemaName (str): The name of the schema.
        dqRuleSetName (str): The name of the ruleset.
        maxResults (int): The maximum number of asset IDs to consider.

    Returns:
        DynamicFrame: The input DynamicFrame, unchanged, so downstream transforms can keep consuming it.
    """
    get_logger().info('Starting post data quality results to DataZone...')
    try:
        # Get the data quality results from the DynamicFrame
        df = self.toDF()
        dq_results = df.collect()

        get_logger().info(f"Data quality results: {dq_results}")

        # Create the Amazon DataZone client
        datazone = create_datazone_client(roleToAssume)
        get_logger().info('DataZone client ready!')

        # Search for the asset IDs
        entity_identifier_list = search_asset_id(datazone, dzDomain, tableName, schemaName, maxResults)

        get_logger().info(f'List of entity identifiers: {entity_identifier_list}')

        if not entity_identifier_list:
            get_logger().error("Error: no matching asset found")
        else:
            # Post the data quality results to Amazon DataZone
            ts_form = generate_ts_form(dq_results, dqRuleSetName)
            response = post_data_quality_results(datazone, dzDomain, entity_identifier_list, ts_form)
            get_logger().info(f"Data quality results posted successfully: {response}")
        return self
    except Exception as e:
        get_logger().error(f"Error posting data quality results: {e}")
        raise DataQualityJobError(f"Error calling post data quality results: {e}")

class DataQualityJobError(Exception):
    """
    Custom exception class for errors related to the data quality job.
    """
    pass

def create_datazone_client(roleToAssume: str) -> boto3.client:
    """
    Create an Amazon DataZone client with the specified role.

    Args:
        roleToAssume (str): The ARN of the IAM role to assume.

            This parameter is needed when running cross-account and is used to assume a role in the
            DataZone domain account for the client. We need to assume a different role because the
            DataZone post_time_series_data_points API does not yet support cross-account writes.
            Once the API supports cross-account writes, this method can be simplified and the
            parameter removed.

    Returns:
        boto3.client: The Amazon DataZone client.
    """
    try:
        credentials = get_credential_for_role(roleToAssume)
        if not credentials:
            # Create the DataZone client with the local role
            datazone_client = boto3.client('datazone')
        else:
            # Create the DataZone client with the assumed role credentials
            datazone_client = boto3.client(
                service_name='datazone',
                region_name=get_current_region(),
                aws_access_key_id=credentials['AccessKeyId'],
                aws_secret_access_key=credentials['SecretAccessKey'],
                aws_session_token=credentials['SessionToken']
            )
    except Exception as e:
        get_logger().error(f"Error creating DataZone client: {e}")
        raise DataQualityJobError(f"Error creating DataZone client: {e}")

    return datazone_client

def get_credential_for_role(roleToAssume: str) -> dict:
    """
    Assume the specified IAM role and return temporary credentials.

    Args:
        roleToAssume (str): The ARN of the IAM role to assume.

    Returns:
        dict: Dictionary containing the temporary credentials for the assumed role,
        or an empty dict if the role could not be assumed.
    """
    try:
        # Assume the specified IAM role in the domain account
        sts = boto3.client('sts', region_name=get_current_region())
        assumed_role_object = sts.assume_role(RoleArn=roleToAssume, RoleSessionName="AssumeRoleSession")
        credentials = assumed_role_object['Credentials']
    except Exception as e:
        get_logger().error(f"Error assuming role: {e}; proceeding with empty credentials and current role...")
        credentials = {}

    return credentials
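
# For reference, a successful sts.assume_role call returns a 'Credentials' dict with the key
# names below (per the STS AssumeRole API response; the values here are elided):
#   {'AccessKeyId': '...', 'SecretAccessKey': '...', 'SessionToken': '...', 'Expiration': datetime(...)}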

def get_current_region() -> str:
    """
    Get the current AWS region.

    Returns:
        str: The current AWS region.
    """
    try:
        session = boto3.Session()
        return session.region_name
    except Exception as e:
        get_logger().error(f"Error getting current region: {e}")
        raise DataQualityJobError(f"Error getting current region: {e}")

def search_asset_id(datazone, dzDomain, tableName, schemaName, maxResults: int) -> list:
    """
    Search for an asset in Amazon DataZone.

    Args:
        datazone (boto3.client): The Amazon DataZone client.
        dzDomain (str): The domain identifier.
        tableName (str): The name of the table.
        schemaName (str): The name of the schema.
        maxResults (int): The maximum number of results to return.

    Returns:
        list: The list of entity identifiers for the asset; empty if no match is found.
    """
    get_logger().info('Starting search...')

    entity_identifier_list = []

    try:
        response = datazone.search_listings(
            additionalAttributes=['FORMS'],
            domainIdentifier=dzDomain,
            maxResults=maxResults,
            searchText=tableName
        )

        for item in response['items']:
            forms_dict = json.loads(item['assetListing']['additionalAttributes']['forms'])
            if ('RedshiftTableForm' in forms_dict and
                    forms_dict['RedshiftTableForm']['schemaName'] == schemaName and
                    forms_dict['RedshiftTableForm']['tableName'] == tableName) or \
               ('GlueTableForm' in forms_dict and
                    f"table/{schemaName}/{tableName}" in forms_dict['GlueTableForm']['tableArn']):
                entity_identifier = item['assetListing']['entityId']
                get_logger().info(f"DZ Asset Id: {entity_identifier}")
                entity_identifier_list.append(entity_identifier)
            else:
                get_logger().info('No matching asset found in this iteration')

        get_logger().info(f"DZ Asset Id list: {entity_identifier_list}")
        return entity_identifier_list
    except Exception as e:
        get_logger().error(f"Error searching for asset ID: {e}")
        raise DataQualityJobError(f"Error searching for asset ID: {e}")
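
# For illustration, the matching above expects each listing's serialized 'forms' attribute to
# deserialize into a structure along these lines (hypothetical names and account/region values):
#
#   {"GlueTableForm": {"tableArn": "arn:aws:glue:us-east-1:111122223333:table/my_schema/my_table"}}
#
# or, for a Redshift-backed asset:
#
#   {"RedshiftTableForm": {"schemaName": "my_schema", "tableName": "my_table"}}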

def generate_ts_form(results, dqRuleSetName) -> dict:
    """
    Generate the time series form for posting to Amazon DataZone.

    Args:
        results (list): A list of evaluation objects.
        dqRuleSetName (str): The ruleset name to use as the form name.

    Returns:
        dict: The time series form dictionary.
    """
    try:
        ts_form = {
            "content": json.dumps({
                "evaluationsCount": len(results),
                "evaluations": [process_evaluation(evaluation) for evaluation in results],
                "passingPercentage": calculate_score(results)
            }),
            "formName": dqRuleSetName,  # Specify your desired ruleset name
            "typeIdentifier": "amazon.datazone.DataQualityResultFormType",
            "timestamp": datetime.now().timestamp()
        }

        get_logger().info(f"Generated time series form: {ts_form}")
        return ts_form
    except Exception as e:
        get_logger().error(f"Error generating time series form: {e}")
        raise DataQualityJobError(f"Error generating time series form: {e}")
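
# A sketch of what the serialized "content" above could look like for a hypothetical
# two-rule ruleset where one rule passes and one fails (all values are illustrative):
#
#   {
#       "evaluationsCount": 2,
#       "evaluations": [
#           {"applicableFields": ["order_id"], "types": ["Completeness"], "status": "PASS",
#            "description": "Completeness \"order_id\" > 0.95", "details": {}},
#           {"applicableFields": ["price"], "types": ["Minimum"], "status": "FAIL",
#            "description": "ColumnValues \"price\" > 0",
#            "details": {"EVALUATION_MESSAGE": "Value: -1.0 does not satisfy the constraint!"}}
#       ],
#       "passingPercentage": 50.0
#   }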

def process_evaluation(evaluation) -> dict:
    """
    Process the evaluation results and extract relevant information.

    Args:
        evaluation (object): An evaluation object containing the rule, metrics, and outcome.

    Returns:
        dict: A dictionary containing the processed evaluation information.
    """
    try:
        evaluation_detail_type = "EVALUATION_MESSAGE"
        result_value = evaluation.FailureReason

        # Extract rule, metric types, and columns from the evaluation
        rule, metric_types, columns = extract_metadata(evaluation.Rule, evaluation.EvaluatedMetrics)

        evaluation_details = {evaluation_detail_type: result_value} if result_value else {}

        result = {
            "applicableFields": columns,
            "types": metric_types,
            "status": "PASS" if evaluation.Outcome == "Passed" else "FAIL",
            "description": rule,
            "details": evaluation_details
        }

        get_logger().info(f"Processed evaluation: {result}")

        return result
    except Exception as e:
        get_logger().error(f"Error processing evaluation: {e}")
        raise DataQualityJobError(f"Error processing evaluation: {e}")

def extract_metadata(rule, evaluated_metrics):
    """
    Extract information on rule, metric types, and columns from the evaluation.

    Args:
        rule (str): The rule description.
        evaluated_metrics (dict): The evaluated metrics.

    Returns:
        tuple: A tuple containing the rule, metric types, and columns.
    """
    try:
        metric_types = [key.split(".")[-1] for key in evaluated_metrics]
        columns = re.findall(r'"(.*?)"', rule)

        get_logger().info(f"Extracted metadata: rule={rule}, metric_types={metric_types}, columns={columns}")
        return rule, metric_types, columns
    except Exception as e:
        get_logger().error(f"Error extracting metadata: {e}")
        raise DataQualityJobError(f"Error extracting metadata: {e}")
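
# A minimal worked example of the parsing above, assuming a typical DQDL rule string:
# for rule = 'ColumnValues "price" > 0' and evaluated_metrics = {'Column.price.Minimum': 0.5},
# the function returns ('ColumnValues "price" > 0', ['Minimum'], ['price']).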

def calculate_score(results) -> float:
    """
    Calculate the score based on the number of successful evaluations.

    Args:
        results (list): A list of evaluation objects.

    Returns:
        float: The calculated score as a percentage.
    """
    try:
        num_success = sum(1 for evaluation in results if evaluation.Outcome == "Passed")
        total_results = len(results)

        score = (num_success / total_results) * 100 if total_results else 0.0
        get_logger().info(f"Calculated score: {score}%")
        return score
    except Exception as e:
        get_logger().error(f"Error calculating score: {e}")
        raise DataQualityJobError(f"Error calculating score: {e}")
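
# For example, 3 "Passed" outcomes out of 4 evaluations yields (3 / 4) * 100 = 75.0.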

def post_data_quality_results(datazone, dzDomain, entity_identifier_list, ts_form) -> list:
    """
    Post data quality time series data points to Amazon DataZone.

    Args:
        datazone (boto3.client): The Amazon DataZone client.
        dzDomain (str): The domain identifier.
        entity_identifier_list (list): The list of asset identifiers.
        ts_form (dict): The time series form dictionary.

    Returns:
        list of dict: The list of responses from each post_time_series_data_points API call.
    """
    response_list = []

    try:
        for ei in entity_identifier_list:
            response = datazone.post_time_series_data_points(
                domainIdentifier=dzDomain,
                entityIdentifier=ei,
                entityType='ASSET',
                forms=[ts_form]
            )

            get_logger().info(f"Posted results: {response}")
            response_list.append(response)

        return response_list
    except Exception as e:
        get_logger().error(f"Error posting data quality results: {e}")
        raise DataQualityJobError(f"Error posting data quality results: {e}")

def get_logger():
    """Return the AWS Glue logger exposed through the Spark JVM gateway."""
    return pyspark.SparkContext.getOrCreate()._jvm.com.amazonaws.services.glue.log.GlueLogger()


# Register the transform as a method on DynamicFrame so it can be invoked as an
# AWS Glue custom visual transform.
DynamicFrame.post_dq_results_to_datazone = post_dq_results_to_datazone
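
# A minimal usage sketch (hypothetical job wiring, not part of this module): in a Glue ETL job,
# the rule-outcomes DynamicFrame produced by the EvaluateDataQuality transform could be passed
# through this custom transform. All parameter values below are placeholders.
#
# dq_outcomes.post_dq_results_to_datazone(
#     roleToAssume="arn:aws:iam::111122223333:role/DataZonePublisherRole",  # hypothetical role ARN
#     dzDomain="dzd_abc123example",                                         # hypothetical domain ID
#     tableName="my_table",
#     schemaName="my_schema",
#     dqRuleSetName="my_ruleset",
#     maxResults=10
# )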