Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions airflow/providers/google/cloud/transfers/gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class GCSToGCSOperator(BaseOperator):
account from the list granting this role to the originating account (templated).
:param source_object_required: Whether you want to raise an exception when the source object
doesn't exist. It doesn't have any effect when the source objects are folders or patterns.
:param exact_match: When specified, only exact match of the source object (filename) will be
copied.

:Example:

Expand Down Expand Up @@ -189,6 +191,7 @@ def __init__(
is_older_than=None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
source_object_required=False,
exact_match=False,
**kwargs,
):
super().__init__(**kwargs)
Expand All @@ -208,6 +211,7 @@ def __init__(
self.is_older_than = is_older_than
self.impersonation_chain = impersonation_chain
self.source_object_required = source_object_required
self.exact_match = exact_match

def execute(self, context: 'Context'):

Expand Down Expand Up @@ -341,6 +345,8 @@ def _copy_source_without_wildcard(self, hook, prefix):
raise AirflowException(msg)

for source_obj in objects:
if self.exact_match and (source_obj != prefix or not source_obj.endswith(prefix)):
continue
Comment on lines +348 to +349
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m probably misunderstanding this; shouldn’t prefix be compared with startswith instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@uranusjr That's the issue. According to issue #22675, treating these objects as a prefix copies objects from the source bucket that do not match the specified source_object. So, checking not source_obj.endswith(prefix) makes sure we only copy the exact match instead of copying all objects with the same prefix as the source_object.

Copy link
Member

@potiuk potiuk Jun 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I see the point. Object store semantics are funny because they resemble filesystem semantics, but what looks like a path is in fact just an object name, so "prefix" can often mean the wrong thing. For example:

  • test_file.zip - might be an object (file) stored
  • test_file.zip/another_file.zip -> might be ANOTHER object that looks like it is inside a "test_file.zip" folder.

This is not the case in a "real filesystem", where you cannot have a "file" and a "folder" with the same name. That's why "exact match" is sometimes the only choice, as you have no other way to skip the "nested" object.

if self.destination_object is None:
destination_object = source_obj
else:
Expand Down
22 changes: 22 additions & 0 deletions tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,28 @@ def test_execute_no_wildcard_with_replace_flag_false(self, mock_hook):
]
mock_hook.return_value.list.assert_has_calls(mock_calls)

@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
def test_copy_file_with_exact_match(self, mock_hook):
    """With ``exact_match=True`` only the object named exactly like the source is copied.

    The mocked bucket listing returns the requested object together with
    other objects that merely share its name as a prefix; those must be
    skipped by the operator.
    """
    source_files = [
        'test_object.txt',
        'test_object.txt.copy/',
        'test_object.txt.folder/',
    ]
    mock_hook.return_value.list.return_value = source_files
    operator = GCSToGCSOperator(
        task_id=TASK_ID,
        source_bucket=TEST_BUCKET,
        source_object=SOURCE_OBJECT_NO_WILDCARD,
        destination_bucket=DESTINATION_BUCKET,
        exact_match=True,
    )

    operator.execute(None)

    # The bucket is still listed by prefix; filtering happens afterwards.
    mock_calls = [
        mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None),
    ]
    mock_hook.return_value.list.assert_has_calls(mock_calls)
    # Asserting on ``list`` alone would pass even if ``exact_match`` were
    # ignored, so also verify that exactly one copy happened and that it was
    # for the exactly-matching object.
    # NOTE(review): assumes the operator copies via
    # ``hook.rewrite(src_bucket, src_obj, dst_bucket, dst_obj)`` with
    # positional arguments - confirm against ``_copy_single_object``.
    mock_hook.return_value.rewrite.assert_called_once_with(
        TEST_BUCKET, 'test_object.txt', DESTINATION_BUCKET, 'test_object.txt'
    )

@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
def test_execute_prefix_and_suffix(self, mock_hook):
operator = GCSToGCSOperator(
Expand Down