[ETL-261] Add --drop-duplicates functionality to the bootstrap trigger script #101

Merged: 2 commits, Oct 25, 2022
src/scripts/bootstrap_trigger/bootstrap_trigger.py: 74 additions, 0 deletions
@@ -38,6 +38,11 @@ def read_args():
"If this argument is not specified, all files in "
"the --file-view will be submitted to the "
"--glue-workflow."))
parser.add_argument("--drop-duplicates",
default=False,
action="store_true",
help=("Optional. Only use the most recently exported record "
"for a given record ID. Defaults to false"))
parser.add_argument("--diff-s3-uri",
help=("Optional. The S3 URI of a parquet dataset to diff with "
"before submitting to the --glue-workflow."))
@@ -61,6 +66,17 @@ def read_args():


def get_synapse_client(ssm_parameter=None, aws_session=None):
"""
Return an authenticated Synapse client.

Args:
ssm_parameter (str): Name of the SSM parameter containing the
BridgeDownstream Synapse password.
aws_session (boto3.session.Session)

Returns:
synapseclient.Synapse
"""
if ssm_parameter is not None:
ssm_client = aws_session.client("ssm")
token = ssm_client.get_parameter(
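
The call above is truncated by the diff view; a hedged sketch of the full SSM-to-Synapse login flow, assuming the parameter stores a Synapse auth token (the parameter name below is hypothetical):

import boto3
import synapseclient

aws_session = boto3.session.Session()
ssm_client = aws_session.client("ssm")
# Fetch the decrypted token from SSM; the parameter name is a placeholder.
token = ssm_client.get_parameter(
        Name="synapse-bridgedownstream-auth",
        WithDecryption=True)["Parameter"]["Value"]
syn = synapseclient.Synapse()
syn.login(authToken=token)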
@@ -74,6 +90,18 @@ def get_synapse_client(ssm_parameter=None, aws_session=None):


def get_synapse_df(syn, entity_view, index_field, query=None):
"""
Query a table or view on Synapse and return the result as a pandas DataFrame.

Args:
syn (synapseclient.Synapse)
entity_view (str): The Synapse ID of the entity view
index_field (str): The field name to set as the index on the returned DataFrame.
query (str): An optional query to filter out records from the entity view.

Returns:
pandas.DataFrame
"""
if query is not None:
query_string = query.format(source_table=entity_view)
else:
@@ -85,6 +113,17 @@ def get_synapse_df(syn, entity_view, index_field, query=None):


def get_parquet_dataset(dataset_uri, aws_session, columns):
"""
Return a Parquet dataset on S3 as a pandas DataFrame.

Args:
dataset_uri (str): The S3 URI of the parquet dataset.
aws_session (boto3.session.Session)
columns (list[str]): A list of columns to return from the parquet dataset.

Returns:
pandas.DataFrame
"""
session_credentials = aws_session.get_credentials()
table_source = dataset_uri.split("s3://")[-1]
s3_fs = fs.S3FileSystem(
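
Again the diff cuts off mid-call; a sketch of the complete S3-to-DataFrame path under the same credential plumbing (the bucket path and column list are placeholders):

import boto3
import pyarrow.parquet as pq
from pyarrow import fs

aws_session = boto3.session.Session()
session_credentials = aws_session.get_credentials()
s3_fs = fs.S3FileSystem(
        access_key=session_credentials.access_key,
        secret_key=session_credentials.secret_key,
        session_token=session_credentials.token,
        region=aws_session.region_name)
# "s3://" prefix stripped, as in the function above; path is a placeholder.
table_source = "example-bucket/path/to/parquet/dataset"
parquet_dataset = pq.read_table(
        table_source, columns=["recordid"], filesystem=s3_fs)
df = parquet_dataset.to_pandas()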
@@ -100,6 +139,20 @@ def get_parquet_dataset(dataset_uri, aws_session, columns):

def submit_archives_to_workflow(
syn, synapse_ids, raw_folder_id, glue_workflow, aws_session):
"""
Submit data on Synapse to a Glue workflow.

Args:
syn (synapseclient.Synapse)
synapse_ids (list[str]): The Synapse IDs to submit to the Glue workflow.
raw_folder_id (str): The Synapse ID of the Bridge Raw Data folder where
the `synapse_ids` are scoped.
glue_workflow (str): The name of the Glue workflow.
aws_session (boto3.session.Session)

Returns:
None
"""
glue_client = aws_session.client("glue")
batch_size = 100
synapse_id_groups = [
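
The comprehension is cut off above; the standard chunking idiom it presumably completes to looks like this (a sketch with placeholder IDs, not the verbatim diff):

synapse_ids = [f"syn{i}" for i in range(250)]  # placeholder IDs
batch_size = 100
# Split into groups of at most batch_size for batched Glue workflow runs.
synapse_id_groups = [
        synapse_ids[i:i + batch_size]
        for i in range(0, len(synapse_ids), batch_size)]
print([len(g) for g in synapse_id_groups])  # [100, 100, 50]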
@@ -121,6 +174,22 @@ def submit_archives_to_workflow(


def get_message(syn, synapse_id, raw_folder_id):
"""
Get Synapse file entity metadata and format it for submission to the
S3 to JSON S3 workflow. A helper function for `submit_archives_to_workflow`.

Args:
syn (synapseclient.Synapse)
synapse_id (str)
raw_folder_id (str): The Synapse ID of the Bridge Raw Data folder where
`synapse_id` is scoped.

Returns:
dict: With keys
* source_bucket
* source_key
* raw_folder_id
"""
f = syn.get(synapse_id, downloadFile=False)
bucket = f["_file_handle"]["bucketName"]
key = f["_file_handle"]["key"]
@@ -144,6 +213,10 @@ def main():
entity_view=args.file_view,
index_field=args.diff_file_view_field,
query=args.query)
if args.drop_duplicates:
sorted_synapse_df = synapse_df.sort_values(by="exportedOn")
synapse_df = sorted_synapse_df[
~sorted_synapse_df.index.duplicated(keep="last")]
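
A toy example (not part of the diff) of what the two new lines do, assuming the DataFrame is indexed by record ID and carries an exportedOn column:

import pandas as pd

# Record "r1" was exported twice; after sorting by exportedOn and dropping
# duplicated index values with keep="last", only its newest export survives.
synapse_df = pd.DataFrame(
        {"exportedOn": ["2022-10-01", "2022-10-02", "2022-10-01"]},
        index=pd.Index(["r1", "r1", "r2"], name="recordId"))
sorted_synapse_df = synapse_df.sort_values(by="exportedOn")
synapse_df = sorted_synapse_df[
        ~sorted_synapse_df.index.duplicated(keep="last")]
print(synapse_df)
#           exportedOn
# recordId
# r2        2022-10-01
# r1        2022-10-02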
if (
args.diff_s3_uri is not None
and args.diff_parquet_field is not None
@@ -156,6 +229,7 @@ def main():
synapse_df = synapse_df.drop(
parquet_dataset[args.diff_parquet_field].values)
if len(synapse_df) > 0:
print(f"Submitting { len(synapse_df) } records to { args.glue_workflow }")
submit_archives_to_workflow(
syn=syn,
synapse_ids=synapse_df.id.values,