From 50480099c25d2b8eca22350abb1af946565b7dc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josef=20=C5=A0im=C3=A1nek?= Date: Mon, 16 Feb 2026 13:37:47 +0100 Subject: [PATCH] Pass Credentials object to GCSFileSystem for automatic token refresh Previously, get_fs() extracted a static access token string via _get_access_token() and passed it to GCSFileSystem. This token expires after ~1 hour with no way to refresh, causing 401 errors in long-running tasks. Passing the Credentials object directly lets gcsfs refresh the token automatically before each request via its built-in maybe_refresh() mechanism. --- .../airflow/providers/google/cloud/fs/gcs.py | 2 +- .../tests/unit/google/cloud/fs/__init__.py | 16 ++++ .../tests/unit/google/cloud/fs/test_gcs.py | 82 +++++++++++++++++++ 3 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 providers/google/tests/unit/google/cloud/fs/__init__.py create mode 100644 providers/google/tests/unit/google/cloud/fs/test_gcs.py diff --git a/providers/google/src/airflow/providers/google/cloud/fs/gcs.py b/providers/google/src/airflow/providers/google/cloud/fs/gcs.py index b10aa6ef9f591..ec1286f4ae9ea 100644 --- a/providers/google/src/airflow/providers/google/cloud/fs/gcs.py +++ b/providers/google/src/airflow/providers/google/cloud/fs/gcs.py @@ -51,7 +51,7 @@ def get_fs(conn_id: str | None, storage_options: dict[str, str] | None = None) - options = { "project": g.project_id, "access": g.extras.get(GCS_ACCESS, "full_control"), - "token": g._get_access_token(), + "token": g.get_credentials(), "consistency": g.extras.get(GCS_CONSISTENCY, "none"), "cache_timeout": g.extras.get(GCS_CACHE_TIMEOUT), "requester_pays": g.extras.get(GCS_REQUESTER_PAYS, False), diff --git a/providers/google/tests/unit/google/cloud/fs/__init__.py b/providers/google/tests/unit/google/cloud/fs/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/google/tests/unit/google/cloud/fs/__init__.py @@ -0,0 +1,16 @@ +# 
Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/google/tests/unit/google/cloud/fs/test_gcs.py b/providers/google/tests/unit/google/cloud/fs/test_gcs.py new file mode 100644 index 0000000000000..70e6f0fc32100 --- /dev/null +++ b/providers/google/tests/unit/google/cloud/fs/test_gcs.py @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+pytest.importorskip("gcsfs")  # skip this whole module when gcsfs is not installed
+
+TEST_CONN = "google_cloud_test_conn"
+
+
+@pytest.fixture(scope="module", autouse=True)
+def _setup_connections():
+    with pytest.MonkeyPatch.context() as mp_ctx:
+        mp_ctx.setenv(f"AIRFLOW_CONN_{TEST_CONN}".upper(), "google-cloud-platform://")
+        yield
+
+
+class TestGCSFilesystem:
+    @patch("airflow.providers.google.cloud.fs.gcs.GoogleBaseHook")
+    @patch("gcsfs.GCSFileSystem")
+    def test_get_fs_passes_credentials_object(self, mock_gcsfs, mock_hook):
+        """Test that get_fs passes the hook's get_credentials() result as the GCSFileSystem token."""
+        from airflow.providers.google.cloud.fs.gcs import get_fs
+
+        mock_credentials = MagicMock()
+        mock_hook_instance = MagicMock()
+        mock_hook_instance.get_credentials.return_value = mock_credentials
+        mock_hook_instance.project_id = "test-project"
+        mock_hook_instance.extras = {}
+        mock_hook.return_value = mock_hook_instance
+
+        get_fs(conn_id=TEST_CONN)
+
+        mock_hook_instance.get_credentials.assert_called_once()
+        call_kwargs = mock_gcsfs.call_args.kwargs
+        assert call_kwargs["token"] is mock_credentials
+
+    @patch("gcsfs.GCSFileSystem")
+    def test_get_fs_no_conn_id(self, mock_gcsfs):
+        """Test that get_fs(conn_id=None) constructs GCSFileSystem with no arguments."""
+        from airflow.providers.google.cloud.fs.gcs import get_fs
+
+        get_fs(conn_id=None)
+
+        mock_gcsfs.assert_called_once_with()
+
+    @patch("airflow.providers.google.cloud.fs.gcs.GoogleBaseHook")
+    @patch("gcsfs.GCSFileSystem")
+    def test_get_fs_with_anonymous_credentials(self, mock_gcsfs, mock_hook):
+        """Test that AnonymousCredentials returned by the hook reach GCSFileSystem as the token."""
+        from google.auth.credentials import AnonymousCredentials
+
+        from airflow.providers.google.cloud.fs.gcs import get_fs
+
+        anonymous_creds = AnonymousCredentials()
+        mock_hook_instance = MagicMock()
+        mock_hook_instance.get_credentials.return_value = anonymous_creds
+        mock_hook_instance.project_id = None
+        mock_hook_instance.extras = {}
+        mock_hook.return_value = mock_hook_instance
+
+        get_fs(conn_id=TEST_CONN)
+
+        call_kwargs = mock_gcsfs.call_args.kwargs
+        assert isinstance(call_kwargs["token"], AnonymousCredentials)