AIP-58: Add Airflow ObjectStore (AFS) #34729
Merged
Changes from all commits (83 commits)
0c456ee
IO
bolkedebruin d9c92f0
Further work
bolkedebruin bda96de
Add Airflow FS
bolkedebruin c064314
Add fsspec dependencies
bolkedebruin 29bf6b8
Move stuff to provider packages
bolkedebruin ad22ed9
Add fsspec
bolkedebruin 58753fa
Use provider style plugins
bolkedebruin 0c4700c
Add plugin registration
bolkedebruin f86c090
Move exception inline
bolkedebruin c7774bd
Clean ups
bolkedebruin 133fc4e
Make FileIO work with connection ids
bolkedebruin ea5584a
Add simple mounts
bolkedebruin 10d7582
Add simple combinations
bolkedebruin 6082452
Allow unmount to use str or Mount
bolkedebruin 11a1f2d
Pre commit stuff - what a mess that creates :/
bolkedebruin 3d57e55
PY38 fixes
bolkedebruin fbb151b
Address pre-commit
bolkedebruin ba88b12
Support contexts and PathLib concatenation
bolkedebruin 390c80d
Add s3fs to devel
bolkedebruin e8331cc
Use deterministic endpoints and generate fsid if not available
bolkedebruin 3d1595f
Fix table
bolkedebruin 48f47b4
Use PathLike object
bolkedebruin b2f5e26
Fix mypy and test
bolkedebruin 772b754
Simplify implementation
bolkedebruin 8a97977
Use ObjectStoragePath directly
bolkedebruin 0a46254
Fix docstrings
bolkedebruin b6b92b2
Check if samestore (maybe just switch to ObjectStorePath copy)
bolkedebruin fdcd1ba
Use class name instead of type
bolkedebruin b62778f
Use shutil for copying between stores
bolkedebruin 45c8183
Make sure to set alias only when not specified
bolkedebruin 1c1c2a9
Use backing copy
bolkedebruin 328eb00
Fix test
bolkedebruin 95da859
Fix test
bolkedebruin 6a1b525
Implement caching for filesystems
bolkedebruin 41785d0
Move FileTransferOperator to provider package
bolkedebruin 44a03ae
Pin dependencies
bolkedebruin bd8d091
Pin aiobotocore until new release of fsspec
bolkedebruin 6654d59
Address version name
bolkedebruin 7df70f3
Don't copy paste too much
bolkedebruin cb6f442
Use aws infrastructure for getting a session
bolkedebruin 1e40842
Make sure endpoint_url is honored
bolkedebruin 42e897a
Remove s3fs from main and keep in provider
bolkedebruin c41dcfd
Use service config
bolkedebruin 42908de
remove s3fs when testing aws
bolkedebruin 94ec2e3
Make sure prod can build
bolkedebruin 6a5de18
Fix tests to not depend on s3fs
bolkedebruin 9d7fcce
Readd s3fs to setup.py
bolkedebruin 8b3b194
Fix issues with docs
bolkedebruin caf2253
fix link
bolkedebruin 841dbdc
Add example dag
bolkedebruin 96516af
Optimize copy
bolkedebruin f085180
Extra
bolkedebruin 6dc8c1a
Regen images
bolkedebruin 73a9f57
Fix docs
bolkedebruin 74ae073
Update tests not to be dependent on s3fs
bolkedebruin b2fd6fc
Moved example test
bolkedebruin 6dae720
Add stat_result as a way to unify info and traditional stat_result
bolkedebruin 7b6a71e
Clean up
bolkedebruin 1f125da
Add extra docs
bolkedebruin 91e0e94
Fix docs
bolkedebruin acf81f5
Add words
bolkedebruin 671924c
Improve copying
bolkedebruin b1a4cbb
Upgrade fsspec and relax aiobotocore requirements
bolkedebruin 4dea223
Update docs/apache-airflow/core-concepts/objectstorage.rst
bolkedebruin 002de89
Update docs/apache-airflow/core-concepts/objectstorage.rst
bolkedebruin 159c908
Revert "Upgrade fsspec and relax aiobotocore requirements"
bolkedebruin 958eb51
Relax aiobotocore
bolkedebruin ec870c6
Make copy work as expected
bolkedebruin a58000d
Revert "Relax aiobotocore"
bolkedebruin b217eea
Make rename work only within same store
bolkedebruin 855485a
Fix tests
bolkedebruin d07d044
Fix test not to reuse alias
bolkedebruin ed2f928
Ensure templated fields for xcom
bolkedebruin cb8e4a4
Improve handling of existing directories
bolkedebruin 980c3ee
Set aiobotocore to 2.7.0
bolkedebruin 1f26001
Allow larger versions of aiobotocore
bolkedebruin 8f8912b
Update airflow/providers/amazon/aws/fs/s3.py
bolkedebruin d7fc935
Add tests for s3fs
bolkedebruin 1af0dc3
Add example dag
bolkedebruin 836131d
Improve example
bolkedebruin 2e1cba6
Improve example
bolkedebruin 88b9216
Add tutorial and improve docs
bolkedebruin 5099f45
Add extra
bolkedebruin
Adds the new `common-io` provider to the provider list in the issue form:

```diff
@@ -48,6 +48,7 @@ body:
   - celery
   - cloudant
   - cncf-kubernetes
+  - common-io
   - common-sql
   - daskexecutor
   - databricks
```
New file (135 lines): the example DAG for the object storage tutorial.

```python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

# [START tutorial]
# [START import_module]
import pendulum
import requests

from airflow.decorators import dag, task
from airflow.io.store.path import ObjectStoragePath

# [END import_module]

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
    "fmisid": "int32",
    "time": "datetime64[ns]",
    "AQINDEX_PT1H_avg": "float64",
    "PM10_PT1H_avg": "float64",
    "PM25_PT1H_avg": "float64",
    "O3_PT1H_avg": "float64",
    "CO_PT1H_avg": "float64",
    "SO2_PT1H_avg": "float64",
    "NO2_PT1H_avg": "float64",
    "TRSC_PT1H_avg": "float64",
}

# [START create_object_storage_path]
base = ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")
# [END create_object_storage_path]


# [START instantiate_dag]
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
    """
    ### Object Storage Tutorial Documentation
    This is a tutorial DAG to showcase the usage of the Object Storage API.
    Documentation that goes along with the Airflow Object Storage tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html)
    """
    # [END instantiate_dag]
    import duckdb
    import pandas as pd

    # [START get_air_quality_data]
    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exists_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path

    # [END get_air_quality_data]

    # [START analyze]
    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data and prints the results.
        """
        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())

    # [END analyze]

    # [START main_flow]
    obj_path = get_air_quality_data()
    analyze(obj_path)
    # [END main_flow]


# [START dag_invocation]
tutorial_objectstorage()
# [END dag_invocation]
# [END tutorial]
```
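The DAG above builds object paths with the `/` operator, much like `pathlib`. A minimal sketch of that naming pattern, using only the standard library (`PurePosixPath` and a fixed date stand in for the real `ObjectStoragePath` and the run's logical date, so no Airflow or S3 credentials are needed):

```python
from datetime import datetime, timezone
from pathlib import PurePosixPath

# Stand-in for the s3:// base path used in the tutorial DAG.
base = PurePosixPath("airflow-tutorial-data")

# Stand-in for the DAG run's logical date.
logical_date = datetime(2023, 10, 2, tzinfo=timezone.utc)

# Same naming scheme as the DAG: air_quality_<YYYYMMDD>.parquet
formatted_date = logical_date.strftime("%Y%m%d")
path = base / f"air_quality_{formatted_date}.parquet"

print(path)  # airflow-tutorial-data/air_quality_20231002.parquet
```

The same `/` composition works on `ObjectStoragePath`, which is why the task can return the path and hand it to the next task via XCom.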
New file (90 lines): filesystem scheme registration for the object storage API.

```python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from typing import (
    TYPE_CHECKING,
    Callable,
)

from fsspec.implementations.local import LocalFileSystem

from airflow.compat.functools import cache
from airflow.providers_manager import ProvidersManager
from airflow.stats import Stats
from airflow.utils.module_loading import import_string

if TYPE_CHECKING:
    from fsspec import AbstractFileSystem

log = logging.getLogger(__name__)


def _file(_: str | None) -> LocalFileSystem:
    return LocalFileSystem()


# builtin supported filesystems
_BUILTIN_SCHEME_TO_FS: dict[str, Callable[[str | None], AbstractFileSystem]] = {
    "file": _file,
}


@cache
def _register_filesystems() -> dict[str, Callable[[str | None], AbstractFileSystem]]:
    scheme_to_fs = _BUILTIN_SCHEME_TO_FS.copy()
    with Stats.timer("airflow.io.load_filesystems") as timer:
        manager = ProvidersManager()
        for fs_module_name in manager.filesystem_module_names:
            fs_module = import_string(fs_module_name)
            for scheme in getattr(fs_module, "schemes", []):
                if scheme in scheme_to_fs:
                    log.warning("Overriding scheme %s for %s", scheme, fs_module_name)

                method = getattr(fs_module, "get_fs", None)
                if method is None:
                    raise ImportError(f"Filesystem {fs_module_name} does not have a get_fs method")
                scheme_to_fs[scheme] = method

    log.debug("loading filesystems from providers took %.3f seconds", timer.duration)
    return scheme_to_fs


def get_fs(scheme: str, conn_id: str | None = None) -> AbstractFileSystem:
    """
    Get a filesystem by scheme.

    :param scheme: the scheme to get the filesystem for
    :param conn_id: the airflow connection id to use
    :return: the filesystem instance
    """
    filesystems = _register_filesystems()
    try:
        return filesystems[scheme](conn_id)
    except KeyError:
        raise ValueError(f"No filesystem registered for scheme {scheme}")


def has_fs(scheme: str) -> bool:
    """
    Check if a filesystem is available for a scheme.

    :param scheme: the scheme to check
    :return: True if a filesystem is available for the scheme
    """
    return scheme in _register_filesystems()
```
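The module above implements a cached scheme-to-factory registry: built-in schemes are merged with provider-supplied ones, and `get_fs` resolves a scheme to a filesystem instance. A self-contained sketch of that pattern (`DummyFileSystem` and the hard-coded registry are illustrative stand-ins; the real module discovers factories via `ProvidersManager` and returns fsspec filesystems):

```python
from __future__ import annotations

from functools import cache
from typing import Callable


class DummyFileSystem:
    """Stand-in for an fsspec AbstractFileSystem."""

    def __init__(self, conn_id: str | None = None):
        self.conn_id = conn_id


def _file(conn_id: str | None) -> DummyFileSystem:
    # Factory: each scheme maps to a callable taking an optional connection id.
    return DummyFileSystem(conn_id)


_BUILTIN_SCHEME_TO_FS: dict[str, Callable[[str | None], DummyFileSystem]] = {
    "file": _file,
}


@cache
def _register_filesystems() -> dict[str, Callable[[str | None], DummyFileSystem]]:
    # The real implementation merges provider-supplied factories in here;
    # @cache ensures provider discovery runs only once per process.
    return dict(_BUILTIN_SCHEME_TO_FS)


def get_fs(scheme: str, conn_id: str | None = None) -> DummyFileSystem:
    try:
        return _register_filesystems()[scheme](conn_id)
    except KeyError:
        raise ValueError(f"No filesystem registered for scheme {scheme}")


def has_fs(scheme: str) -> bool:
    return scheme in _register_filesystems()


print(has_fs("file"), get_fs("file", "my_conn").conn_id)  # True my_conn
```

Caching the registry means a provider that registers an already-known scheme overrides it exactly once, which is why the real module logs a warning on collision rather than raising.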