
Add Athena Connection Support #91

Merged
merged 15 commits into main on Mar 22, 2023

Conversation

denimalpaca
Contributor

Add an Athena URI builder to make_connection_string(), assuming for now that Athena is the only connection when an AWS connection type is given. This is an incorrect assumption, but we currently have no asks for other use cases, and figuring out how to differentiate these may be a non-trivial issue.

Closes: #90
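For illustration, the Athena branch of make_connection_string() could look roughly like the sketch below. This is not the PR's actual implementation; the function name, parameter names, and signature here are assumptions, with only the awsathena+rest URI shape taken from pyathena's SQLAlchemy URL format discussed in this thread.

```python
from urllib.parse import quote_plus

def make_athena_connection_string(region: str, database: str, s3_staging_dir: str) -> str:
    """Sketch of an awsathena+rest SQLAlchemy URI builder (names are illustrative).

    No credentials are embedded in the URI, so pyathena would fall back to
    the standard boto3 credential chain (env vars, instance role, etc.).
    """
    return (
        f"awsathena+rest://@athena.{region}.amazonaws.com/"
        f"{database}?s3_staging_dir={quote_plus(s3_staging_dir)}"
    )

print(make_athena_connection_string("us-east-1", "my_db", "s3://my-athena-results-bucket"))
```

Note that the s3_staging_dir value must be URL-encoded, since it is itself a URI carried inside a query parameter.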

denimalpaca and others added 4 commits February 6, 2023 13:48
Add an Athena URI builder to make_connection_string(),
assuming for now that Athena is the only connection when an AWS
connection type is given. This is an incorrect assumption, but we
currently have no asks for other use cases, and figuring out how
to differentiate these may be a non-trivial issue.

Signed-off-by: Benji Lampel <[email protected]>
…sure if this is correct use of params...

Signed-off-by: Benji Lampel <[email protected]>
@denimalpaca
Contributor Author

@diman82 let me know if this is working for you and I'll merge+release

@diman82

diman82 commented Feb 8, 2023

@denimalpaca no problem, it'll just take some time, as I'm facing another issue that blocks me from testing (and I need to create a new environment for testing)

@deathwebo

Hello, thank you for this PR! Just when I needed to add Athena validation to a project.

I have a particular use case where my combination of parameters causes a conflict:

  GreatExpectationsOperator(
      task_id="my_gx_validation",
      data_asset_name="some_data_asset_name_not_important",
      query_to_validate="SELECT * FROM my_db.my_table WHERE dt = '2023-02-07-03'",
      conn_id="aws_default",
      checkpoint_name="my_checkpoint",
      data_context_root_dir=ge_root_dir,
  )

I have the athena connection string specified in my datasources in the great_expectations config:

datasources:
  awsathena_datasource:
    module_name: great_expectations.datasource
    data_connectors:
      default_runtime_data_connector_name:
        module_name: great_expectations.datasource.data_connector
        batch_identifiers:
          - default_identifier_name
        class_name: RuntimeDataConnector
      default_inferred_data_connector_name:
        module_name: great_expectations.datasource.data_connector
        include_schema_name: true
        class_name: InferredAssetSqlDataConnector
    execution_engine:
      module_name: great_expectations.execution_engine
      connection_string: awsathena+rest://@athena.us-east-1.amazonaws.com?s3_staging_dir=s3://my-athena-results-bucket
      class_name: SqlAlchemyExecutionEngine
    class_name: Datasource
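As a side note, the awsathena connection string in the config above decomposes like any other SQLAlchemy URL; a stdlib-only sketch of its anatomy (the string itself is copied verbatim from the datasource config):

```python
from urllib.parse import urlsplit, parse_qs

# The connection string from the awsathena_datasource config above.
cs = "awsathena+rest://@athena.us-east-1.amazonaws.com?s3_staging_dir=s3://my-athena-results-bucket"

parts = urlsplit(cs)
print(parts.scheme)    # dialect+driver pair, e.g. awsathena+rest
print(parts.hostname)  # the regional Athena endpoint
print(parse_qs(parts.query)["s3_staging_dir"][0])  # where Athena writes query results
```

The dialect+driver pair in the scheme is what SQLAlchemy uses to look up the plugin, which matters for the NoSuchModuleError discussed later in this thread.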

My checkpoint config:

name: my_checkpoint
config_version: 1.0
template_name:
module_name: great_expectations.checkpoint
class_name: Checkpoint
run_name_template: '%Y%m%d-%H%M%S-my-run-name-template'
expectation_suite_name: my_expectation_suite_name
batch_request: {}
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: store_evaluation_params
    action:
      class_name: StoreEvaluationParametersAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
      site_names: []
evaluation_parameters: {}
runtime_configuration: {}
validations:
  - batch_request:
      datasource_name: awsathena_datasource
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: my_data_asset_name
      data_connector_query:
        index: -1
    expectation_suite_name: my_expectation_suite_name
profilers: []
ge_cloud_id:
expectation_suite_ge_cloud_id:

So, if I try to remove the data_asset_name from the operator parameters

  GreatExpectationsOperator(
      task_id="my_gx_validation",
      #data_asset_name="some_data_asset_name_not_important",
      query_to_validate="SELECT * FROM my_db.my_table WHERE dt = '2023-02-07-03'",
      conn_id="aws_default",
      checkpoint_name="my_checkpoint",
      data_context_root_dir=ge_root_dir,
  )

in order to instead let the operator pick the existing datasource from the checkpoint, the operator fails at line 199 during the constructor validation:

        # A data asset name is also used to determine if a runtime env will be used; if it is not passed in,
        # then the data asset name is assumed to be configured in the data context passed in.
        if (self.is_dataframe or self.query_to_validate or self.conn_id) and not self.data_asset_name:
            raise ValueError("A data_asset_name must be specified with a runtime_data_source or conn_id.")

Is it possible to have this issue addressed in this PR? Or maybe I'm not using the correct combination of parameters 😄

Thanks again for your work denimalpaca!

@diman82

diman82 commented Feb 15, 2023

@denimalpaca OK, so I've set up the following code in my DAG:

TABLE = "country_codes"
REGION = "us-west-2"
ATHENA_DB = "dbr"
S3_PATH = "s3://seekingalpha-data/aws-athena-query-results-744522205193-us-west-2"
base_path = Path(__file__).parents[2]
ge_root_dir = os.path.join(base_path, "include", "great_expectations")

with DAG(
    "great_expectations.athena",
    start_date=datetime(2023, 2, 2),
    description="Example DAG showcasing loading and data quality checking with Athena and Great Expectations.",
    doc_md=__doc__,
    schedule_interval=None,
    template_searchpath=os.path.join(sql_templates_dir, 'great_expectations'),
    catchup=False,
) as dag:
    ge_athena_validation = GreatExpectationsOperator(
        task_id="ge_athena_validation",
        data_context_root_dir=ge_root_dir,
        conn_id="aws_default",
        params={"region": REGION, "athena_db": ATHENA_DB, "s3_path": TABLE},
        expectation_suite_name="dbr.country_codes",
        data_asset_name=TABLE,
        fail_task_on_validation_failure=False,
    )
    chain(
        ge_athena_validation,
    )

And I get the following error message:

[2023-02-15, 14:01:29 UTC] {base.py:73} INFO - Using connection ID 'aws_default' for task execution.
[2023-02-15, 14:01:29 UTC] {taskinstance.py:1768} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/great_expectations/datasource/new_datasource.py", line 66, in __init__
    self._execution_engine = instantiate_class_from_config(
  File "/usr/local/lib/python3.9/site-packages/great_expectations/data_context/util.py", line 92, in instantiate_class_from_config
    class_instance = class_(**config_with_defaults)
  File "/usr/local/lib/python3.9/site-packages/great_expectations/execution_engine/sqlalchemy_execution_engine.py", line 328, in __init__
    self.engine = sa.create_engine(connection_string, **kwargs)
  File "<string>", line 2, in create_engine
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/deprecations.py", line 375, in warned
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/create.py", line 522, in create_engine
    entrypoint = u._get_entrypoint()
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/url.py", line 662, in _get_entrypoint
    cls = registry.load(name)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 343, in load
    raise exc.NoSuchModuleError(
sqlalchemy.exc.NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:awsathena.rest

@deathwebo

@diman82 can you try installing the package "pyathena[SQLAlchemy]"? That provides the awsathena SQLAlchemy dialect the connection string needs (pip install "pyathena[SQLAlchemy]").

@denimalpaca
Contributor Author

denimalpaca commented Feb 16, 2023

(quoting @deathwebo's full comment above)

Hey @deathwebo, in this case you should remove the conn_id, as it is only supposed to be specified if you aren't using a checkpoint or data context config. The conn_id param was added to allow users who are primarily Airflow users to avoid writing checkpoint configs or data context configs.

Or, conversely, remove the checkpoint_name and let the operator build the data sources and checkpoint for you. This will ignore the datasource you wrote above.
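The constructor check quoted earlier in this thread boils down to a small predicate: any runtime input (a dataframe, a query, or an Airflow conn_id) requires an explicit data_asset_name. A stand-alone sketch that mirrors the rule (not the operator's actual code; the function and parameter names are illustrative):

```python
def requires_data_asset_name(is_dataframe=False, query_to_validate=None,
                             conn_id=None, data_asset_name=None):
    # Mirrors the operator's constructor validation: runtime inputs demand
    # a data_asset_name; a checkpoint-only setup does not.
    return bool((is_dataframe or query_to_validate or conn_id) and not data_asset_name)

# The failing combination above: query + conn_id but no data_asset_name.
assert requires_data_asset_name(query_to_validate="SELECT 1", conn_id="aws_default")
# With no runtime inputs, the checkpoint carries the datasource and
# no data_asset_name is required.
assert not requires_data_asset_name()
# Providing a data_asset_name alongside conn_id also passes.
assert not requires_data_asset_name(conn_id="aws_default", data_asset_name="my_table")
```

Note that query_to_validate and conn_id each independently trigger the requirement, which is why dropping data_asset_name alone raises the ValueError.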

@denimalpaca
Contributor Author

@diman82 have you had a chance to test this?

@diman82

diman82 commented Mar 1, 2023

@denimalpaca Sorry, I was too busy the last 2 weeks.
I've started testing it and it seems to be working fine, but I need a couple more days to verify and get back with confirmation/additional findings.

@denimalpaca
Contributor Author

denimalpaca commented Mar 13, 2023

@diman82 any news here? Would love to merge this PR and do a release.

denimalpaca and others added 11 commits March 21, 2023 09:13
Add an Athena URI builder to make_connection_string(),
assuming for now that Athena is the only connection when an AWS
connection type is given. This is an incorrect assumption, but we
currently have no asks for other use cases, and figuring out how
to differentiate these may be a non-trivial issue.

Signed-off-by: Benji Lampel <[email protected]>
…sure if this is correct use of params...

Signed-off-by: Benji Lampel <[email protected]>
Signed-off-by: Benji Lampel <[email protected]>
Signed-off-by: Benji Lampel <[email protected]>
Signed-off-by: Benji Lampel <[email protected]>
@denimalpaca denimalpaca merged commit 9291e1f into main Mar 22, 2023
Successfully merging this pull request may close these issues.

add support for Athena (aws_default) airflow connection