# Patch summary
# - .github/workflows/ci.yaml, schemachange-config.yml: expose STACK
#   (vars.STACK -> env STACK -> schemachange var `stack`).
# - admin/integrations.sql: change STORAGE_AWS_ROLE_ARN of synapse_prod_warehouse_s3.
# - sage/portal_elt.py: read connection settings from the [connections] section
#   of ~/.snowsql/config and pass an authenticator instead of a password
#   loaded from ../.env; add explicit login/network/socket timeouts.
# - synapse_raw V2.16.0 - V2.16.2: add the synapse_filehandles_stage stage, the
#   filehandleassociationsnapshots table with an initial COPY, and an append
#   task that runs AFTER refresh_synapse_warehouse_s3_stage_task.
#
# NOTE(review): the debug `print(snowflake_config['username'])` and the
#   commented-out dotenv line were removed from the portal_elt.py hunk; its
#   header is now "+1,28".
# NOTE(review): this patch was restored from a whitespace-collapsed copy. Line
#   breaks follow the hunk counts, but leading indentation and the position of
#   blank context lines are inferred - verify against the target files or
#   apply with `git apply --ignore-whitespace`. The `index` blob ids are
#   carried over unchanged and may no longer match the post-image.
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 3ddf91d2..1b317533 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -36,7 +36,7 @@ jobs:
       SNOWFLAKE_WAREHOUSE: ${{ secrets.SNOWSQL_WAREHOUSE }}
       SNOWFLAKE_SYNAPSE_STAGE_STORAGE_INTEGRATION: ${{ vars.SNOWFLAKE_SYNAPSE_STAGE_STORAGE_INTEGRATION }}
       SNOWFLAKE_SYNAPSE_STAGE_URL: ${{ vars.SNOWFLAKE_SYNAPSE_STAGE_URL }}
-
+      STACK: ${{ vars.STACK }}
     steps:
       - uses: actions/checkout@v4
       - uses: actions/setup-python@v4
@@ -156,6 +156,7 @@ jobs:
       SNOWFLAKE_SYNAPSE_DATA_WAREHOUSE_DATABASE: ${{ vars.SNOWFLAKE_SYNAPSE_DATA_WAREHOUSE_DATABASE }}
       SNOWFLAKE_SYNAPSE_STAGE_STORAGE_INTEGRATION: ${{ vars.SNOWFLAKE_SYNAPSE_STAGE_STORAGE_INTEGRATION }}
       SNOWFLAKE_SYNAPSE_STAGE_URL: ${{ vars.SNOWFLAKE_SYNAPSE_STAGE_URL }}
+      STACK: ${{ vars.STACK }}
     steps:
       - uses: actions/checkout@v4
       - uses: actions/setup-python@v4
diff --git a/admin/integrations.sql b/admin/integrations.sql
index a5f0a013..bb67584e 100644
--- a/admin/integrations.sql
+++ b/admin/integrations.sql
@@ -7,7 +7,7 @@ CREATE STORAGE INTEGRATION IF NOT EXISTS synapse_prod_warehouse_s3
   TYPE = EXTERNAL_STAGE
   STORAGE_PROVIDER = 'S3'
   ENABLED = TRUE
-  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::325565585839:role/snowflake-accesss-SnowflakeServiceRole-HL66JOP7K4BT'
+  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::325565585839:role/snowflake-access-SnowflakeServiceRole-2JSCDRkX8TcW'
   STORAGE_ALLOWED_LOCATIONS = ('s3://prod.datawarehouse.sagebase.org', 's3://prod.filehandles.sagebase.org');
 
 -- DESC INTEGRATION synapse_prod_warehouse_s3;
diff --git a/sage/portal_elt.py b/sage/portal_elt.py
index d5345be1..6e427c5f 100644
--- a/sage/portal_elt.py
+++ b/sage/portal_elt.py
@@ -1,22 +1,28 @@
-from dotenv import dotenv_values
 import snowflake.connector
 from snowflake.connector.pandas_tools import write_pandas
 import synapseclient
 import pandas as pd
+import configparser
+import os
 
+config = configparser.ConfigParser()
+config.read(os.path.expanduser('~/.snowsql/config'))
 
-syn = synapseclient.login()
+snowflake_config = config['connections']
 
-config = dotenv_values("../.env")
+syn = synapseclient.login()
 
 ctx = snowflake.connector.connect(
-    user=config['user'],
-    password=config['password'],
-    account=config['snowflake_account'],
+    user=snowflake_config['username'],
+    account=snowflake_config['accountname'],
+    authenticator=snowflake_config['authenticator'],
     database="sage",
     schema="portal_raw",
     role="SYSADMIN",
-    warehouse="compute_xsmall"
+    warehouse="compute_xsmall",
+    login_timeout=60,
+    network_timeout=30,
+    socket_timeout=10
 )
 
 cs = ctx.cursor()
diff --git a/synapse_data_warehouse/schemachange-config.yml b/synapse_data_warehouse/schemachange-config.yml
index 00f35e4f..d7a31670 100644
--- a/synapse_data_warehouse/schemachange-config.yml
+++ b/synapse_data_warehouse/schemachange-config.yml
@@ -12,6 +12,7 @@ vars:
   database_name: {{env_var('SNOWFLAKE_SYNAPSE_DATA_WAREHOUSE_DATABASE')}}
   stage_storage_integration: {{env_var('SNOWFLAKE_SYNAPSE_STAGE_STORAGE_INTEGRATION')}}
   stage_url: {{env_var('SNOWFLAKE_SYNAPSE_STAGE_URL')}}
+  stack: {{env_var('STACK')}}
 # secrets:
 #   # not a good example of secrets, just here to demo the secret filtering
 #   trips_s3_bucket: s3://snowflake-workshop-lab/citibike-trips
diff --git a/synapse_data_warehouse/synapse_raw/V2.16.0__create_file_association_stages.sql b/synapse_data_warehouse/synapse_raw/V2.16.0__create_file_association_stages.sql
new file mode 100644
index 00000000..d25d22c6
--- /dev/null
+++ b/synapse_data_warehouse/synapse_raw/V2.16.0__create_file_association_stages.sql
@@ -0,0 +1,7 @@
+USE SCHEMA {{database_name}}.synapse_raw; --noqa: PRS,TMP
+CREATE STAGE IF NOT EXISTS synapse_filehandles_stage
+  STORAGE_INTEGRATION = {{stage_storage_integration}} --noqa: TMP
+  URL = 's3://{{stack}}.filehandles.sagebase.org/fileHandleAssociations/records/' --noqa: TMP
+  FILE_FORMAT = (TYPE = PARQUET COMPRESSION = AUTO)
+  DIRECTORY = (ENABLE = TRUE);
+ALTER STAGE IF EXISTS synapse_filehandles_stage REFRESH;
diff --git a/synapse_data_warehouse/synapse_raw/tables/V2.16.1__add_initial_file_association_table.sql b/synapse_data_warehouse/synapse_raw/tables/V2.16.1__add_initial_file_association_table.sql
new file mode 100644
index 00000000..7ce2806b
--- /dev/null
+++ b/synapse_data_warehouse/synapse_raw/tables/V2.16.1__add_initial_file_association_table.sql
@@ -0,0 +1,26 @@
+USE SCHEMA {{database_name}}.synapse_raw; --noqa: JJ01,PRS,TMP
+USE WAREHOUSE COMPUTE_MEDIUM;
+CREATE TABLE IF NOT EXISTS filehandleassociationsnapshots (
+    associateid INT COMMENT 'The unique identifier of the Synapse object that wraps the file.',
+    associatetype STRING COMMENT 'The type of the Synapse object that wraps the file.',
+    filehandleid INT COMMENT 'The unique identifier of the file handle.',
+    instance STRING COMMENT 'The version of the stack that processed the file association.',
+    stack STRING COMMENT 'The stack (prod, dev) on which the file handle association processed.',
+    timestamp TIMESTAMP COMMENT 'The time when the association data was collected.'
+)
+COMMENT='The table contains file handle association records that are weekly scanned. A FileHandleAssociation record is a FileHandle (identified by its id) along with a Synapse object (identified by its id and type).'
+CLUSTER BY (instance);
+
+copy into
+    filehandleassociationsnapshots
+from (
+    select
+        $1:associateid as associateid,
+        $1:associatetype as associatetype,
+        $1:filehandleid as filehandleid,
+        $1:instance as instance,
+        $1:stack as stack,
+        $1:timestamp as timestamp
+    from
+        @synapse_filehandles_stage --noqa: TMP
+);
diff --git a/synapse_data_warehouse/synapse_raw/tasks/V2.16.2__add_file_association_tasks.sql b/synapse_data_warehouse/synapse_raw/tasks/V2.16.2__add_file_association_tasks.sql
new file mode 100644
index 00000000..28d94043
--- /dev/null
+++ b/synapse_data_warehouse/synapse_raw/tasks/V2.16.2__add_file_association_tasks.sql
@@ -0,0 +1,21 @@
+use role accountadmin;
+use schema {{database_name}}.synapse_raw; --noqa: JJ01,PRS,TMP
+alter task refresh_synapse_warehouse_s3_stage_task suspend;
+create task if not exists append_to_filehandleassociationsnapshots_task
+    user_task_managed_initial_warehouse_size = 'SMALL'
+    AFTER refresh_synapse_warehouse_s3_stage_task
+as
+    copy into
+        filehandleassociationsnapshots
+    from (
+        select
+            $1:associateid as associateid,
+            $1:associatetype as associatetype,
+            $1:filehandleid as filehandleid,
+            $1:instance as instance,
+            $1:stack as stack,
+            $1:timestamp as timestamp
+        from
+            @synapse_filehandles_stage --noqa: TMP
+    );
+SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('refresh_synapse_warehouse_s3_stage_task');