diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index 5a10aa7a32506..fa4f523ea448f 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -89,6 +89,9 @@ class GCSToGCSOperator(BaseOperator):
         account from the list granting this role to the originating account (templated).
     :param source_object_required: Whether you want to raise an exception when the source object
         doesn't exist. It doesn't have any effect when the source objects are folders or patterns.
+    :param exact_match: When True, only the object whose name is exactly equal to the source object
+        is copied; objects that merely share that name as a prefix are skipped. Only applies
+        to source objects without a wildcard.
 
     :Example:
 
@@ -189,6 +192,7 @@ def __init__(
         is_older_than=None,
         impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
         source_object_required=False,
+        exact_match=False,
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -208,6 +212,7 @@ def __init__(
         self.is_older_than = is_older_than
         self.impersonation_chain = impersonation_chain
         self.source_object_required = source_object_required
+        self.exact_match = exact_match
 
     def execute(self, context: 'Context'):
 
@@ -341,6 +346,10 @@ def _copy_source_without_wildcard(self, hook, prefix):
                 raise AirflowException(msg)
 
         for source_obj in objects:
+            # A prefix listing also returns objects that merely start with the requested name;
+            # with exact_match only the object named exactly ``prefix`` is copied.
+            if self.exact_match and source_obj != prefix:
+                continue
             if self.destination_object is None:
                 destination_object = source_obj
             else:
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index 7d5af935eab41..61186591f35a1 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -121,6 +121,34 @@ def test_execute_no_wildcard_with_replace_flag_false(self, mock_hook):
         ]
         mock_hook.return_value.list.assert_has_calls(mock_calls)
 
+    @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
+    def test_copy_file_with_exact_match(self, mock_hook):
+        """With exact_match=True only the object named exactly like source_object is copied."""
+        SOURCE_FILES = [
+            'test_object.txt',
+            'test_object.txt.copy/',
+            'test_object.txt.folder/',
+        ]
+        mock_hook.return_value.list.return_value = SOURCE_FILES
+        operator = GCSToGCSOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_NO_WILDCARD,
+            destination_bucket=DESTINATION_BUCKET,
+            exact_match=True,
+        )
+
+        operator.execute(None)
+        mock_calls = [
+            mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None),
+        ]
+        mock_hook.return_value.list.assert_has_calls(mock_calls)
+        # Listing alone does not prove the filter worked: check that the two prefix-only
+        # matches were skipped and exactly one copy was issued.
+        mock_hook.return_value.rewrite.assert_called_once_with(
+            TEST_BUCKET, 'test_object.txt', DESTINATION_BUCKET, 'test_object.txt'
+        )
+
     @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
     def test_execute_prefix_and_suffix(self, mock_hook):
         operator = GCSToGCSOperator(