@@ -32,7 +32,7 @@ Prerequisite Tasks
 Use the :class:`~airflow.providers.google.cloud.transfers.s3_to_gcs.S3ToGCSOperator`
 to transfer data from Amazon S3 to Google Cloud Storage.
 
-.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py
+.. exampleinclude:: /../tests/system/providers/google/cloud/gcs/example_s3_to_gcs.py
     :language: python
     :start-after: [START howto_transfer_s3togcs_operator]
     :end-before: [END howto_transfer_s3togcs_operator]
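For quick orientation, the operator documented above can be instantiated roughly as in the sketch
below; the task id, bucket names, prefix, and ``replace`` flag are illustrative placeholders and
are not values taken from this change.

.. code-block:: python

    from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator

    # Illustrative only: "my-s3-bucket" and "gs://my-gcs-bucket/" are placeholder names.
    copy_s3_to_gcs = S3ToGCSOperator(
        task_id="copy_s3_to_gcs",
        bucket="my-s3-bucket",  # source S3 bucket
        prefix="TESTS",  # only objects under this key prefix are copied
        dest_gcs="gs://my-gcs-bucket/",  # destination GCS bucket URL
        replace=False,  # objects already present in GCS are skipped
    )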
51 changes: 0 additions & 51 deletions tests/providers/google/cloud/transfers/test_s3_to_gcs_system.py

This file was deleted.

@@ -24,11 +24,14 @@
 from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
 
-GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'gcp-project-id')
-S3BUCKET_NAME = os.environ.get('S3BUCKET_NAME', 'example-s3bucket-name')
-GCS_BUCKET = os.environ.get('GCP_GCS_BUCKET', 'example-gcsbucket-name')
-GCS_BUCKET_URL = f"gs://{GCS_BUCKET}/"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "example_s3_to_gcs"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+GCS_BUCKET_URL = f"gs://{BUCKET_NAME}/"
 UPLOAD_FILE = '/tmp/example-file.txt'
 PREFIX = 'TESTS'
 
@@ -37,42 +40,62 @@
 def upload_file():
     """A callable to upload file to AWS bucket"""
     s3_hook = S3Hook()
-    s3_hook.load_file(filename=UPLOAD_FILE, key=PREFIX, bucket_name=S3BUCKET_NAME)
+    s3_hook.load_file(filename=UPLOAD_FILE, key=PREFIX, bucket_name=BUCKET_NAME)
 
 
 with models.DAG(
-    'example_s3_to_gcs',
+    DAG_ID,
     schedule_interval='@once',
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=['example'],
+    tags=['example', 's3'],
 ) as dag:
     create_s3_bucket = S3CreateBucketOperator(
-        task_id="create_s3_bucket", bucket_name=S3BUCKET_NAME, region_name='us-east-1'
+        task_id="create_s3_bucket", bucket_name=BUCKET_NAME, region_name='us-east-1'
     )
 
     create_gcs_bucket = GCSCreateBucketOperator(
         task_id="create_bucket",
-        bucket_name=GCS_BUCKET,
+        bucket_name=BUCKET_NAME,
         project_id=GCP_PROJECT_ID,
     )
     # [START howto_transfer_s3togcs_operator]
     transfer_to_gcs = S3ToGCSOperator(
-        task_id='s3_to_gcs_task', bucket=S3BUCKET_NAME, prefix=PREFIX, dest_gcs=GCS_BUCKET_URL
+        task_id='s3_to_gcs_task', bucket=BUCKET_NAME, prefix=PREFIX, dest_gcs=GCS_BUCKET_URL
     )
     # [END howto_transfer_s3togcs_operator]
 
     delete_s3_bucket = S3DeleteBucketOperator(
-        task_id='delete_s3_bucket', bucket_name=S3BUCKET_NAME, force_delete=True
+        task_id='delete_s3_bucket',
+        bucket_name=BUCKET_NAME,
+        force_delete=True,
+        trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_gcs_bucket = GCSDeleteBucketOperator(task_id='delete_gcs_bucket', bucket_name=GCS_BUCKET)
+    delete_gcs_bucket = GCSDeleteBucketOperator(
+        task_id='delete_gcs_bucket', bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
 
     (
-        create_s3_bucket
+        # TEST SETUP
+        create_gcs_bucket
+        >> create_s3_bucket
         >> upload_file()
-        >> create_gcs_bucket
+        # TEST BODY
         >> transfer_to_gcs
+        # TEST TEARDOWN
         >> delete_s3_bucket
         >> delete_gcs_bucket
     )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)