test s3 filesystem #16
Merged · 3 commits · Sep 13, 2022
cacholote/config.py (8 additions, 8 deletions)
@@ -27,11 +27,18 @@
 import fsspec
 import fsspec.implementations.dirfs
 
+EXTENSIONS = MappingProxyType(
+    {
+        "application/x-netcdf": ".nc",
+        "application/x-grib": ".grib",
+        "application/vnd+zarr": ".zarr",
+    }
+)
 _SETTINGS: Dict[str, Any] = {
     "cache_store_directory": os.path.join(tempfile.gettempdir(), "cacholote"),
     "cache_files_urlpath": None,
     "cache_files_storage_options": {},
-    "xarray_cache_type": "application/netcdf",
+    "xarray_cache_type": list(EXTENSIONS)[0],
     "io_delete_original": False,
 }
 
@@ -46,13 +53,6 @@ def _initialize_cache_store() -> None:
 
 # Immutable settings to be used by other modules
 SETTINGS = MappingProxyType(_SETTINGS)
-EXTENSIONS = MappingProxyType(
-    {
-        "application/x-netcdf": ".nc",
-        "application/x-grib": ".grib",
-        "application/vnd+zarr": ".zarr",
-    }
-)
 
 
 class set:
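The `EXTENSIONS` mapping moves above `_SETTINGS` so the default `xarray_cache_type` can be derived from it instead of being hard-coded (the old literal `"application/netcdf"` also lacked the `x-` prefix used by every key in `EXTENSIONS`). Python dicts, and `MappingProxyType` views over them, preserve insertion order, so `list(EXTENSIONS)[0]` is the first registered type. A minimal sketch of what the new default evaluates to:

```python
from types import MappingProxyType

EXTENSIONS = MappingProxyType(
    {
        "application/x-netcdf": ".nc",
        "application/x-grib": ".grib",
        "application/vnd+zarr": ".zarr",
    }
)

# Insertion order is preserved, so the first key becomes the default:
assert list(EXTENSIONS)[0] == "application/x-netcdf"
```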
cacholote/extra_encoders.py (1 addition, 1 deletion)
@@ -111,7 +111,7 @@ def decode_xr_dataset(xr_json: Dict[str, Any]) -> "xr.Dataset":
         fs = get_filesystem(xr_json["href"], xr_json["xarray:storage_options"])
         filename_or_obj = fs.get_mapper(xr_json["href"])
     else:
-        if "file:local_path":
+        if "file:local_path" in xr_json:
             filename_or_obj = xr_json["file:local_path"]
         else:
             # Download local copy
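A one-line change, but a real bug fix: `if "file:local_path":` tests the truthiness of a non-empty string literal, which is always `True`, so the "download local copy" fallback could never run. A minimal sketch of the difference (the `xr_json` value is a hypothetical entry without a local path):

```python
xr_json = {"href": "s3://bucket/data.nc"}  # hypothetical: no "file:local_path" key

assert bool("file:local_path") is True          # old check: a non-empty literal is always truthy
assert ("file:local_path" in xr_json) is False  # new check: tests whether the key exists
```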
environment.yml (2 additions, 0 deletions)
@@ -17,10 +17,12 @@ dependencies:
   - dask
   - diskcache
   - fsspec
+  - moto
   - netCDF4
   - pip
   - python-magic
   - requests
+  - s3fs
   - xarray>=2022.6.0
   - zarr
   - pip:
setup.cfg (3 additions, 0 deletions)
@@ -34,6 +34,9 @@ strict = True
 cacholote =
     py.typed
 
+[mypy-botocore.*]
+ignore_missing_imports = True
+
 [mypy-cfgrib.*]
 ignore_missing_imports = True
 
tests/test_60_cache_to_s3.py (164 additions, 0 deletions, new file)
@@ -0,0 +1,164 @@
import os
import shlex
import subprocess
import time
import typing
from typing import Any, Dict

import fsspec
import fsspec.implementations.local
import pytest

from cacholote import cache, config

try:
    import xarray as xr
except ImportError:
    pass

requests = pytest.importorskip("requests")
pytest.importorskip("moto")
pytest.importorskip("s3fs")


# TODO: See https://gist.github.com/michcio1234/7d72edc97bd751931aaf1952e4cb479c
# This is a workaround because moto.mock_s3 does not work.

PORT = 5555
ENDPOINT_URI = f"http://127.0.0.1:{PORT}/"


@typing.no_type_check
@pytest.fixture(scope="session")
def s3_base():
"""Run moto in server mode
This starts a local S3 server which we'll test against.
We must do this because if we try to use moto's s3_mock, problems with aiobotocore
arise. See https://github.com/aio-libs/aiobotocore/issues/755.
This and some other fixtures are taken from
https://github.com/fsspec/s3fs/blob/main/s3fs/tests/test_s3fs.py
"""

try:
# should fail since we didn't start server yet
r = requests.get(ENDPOINT_URI)
except: # noqa: E722
pass
else:
if r.ok:
raise RuntimeError("moto server already up")
if "AWS_SECRET_ACCESS_KEY" not in os.environ:
os.environ["AWS_SECRET_ACCESS_KEY"] = "foo"
if "AWS_ACCESS_KEY_ID" not in os.environ:
os.environ["AWS_ACCESS_KEY_ID"] = "foo"
proc = subprocess.Popen(shlex.split(f"moto_server s3 -p {PORT}"))

timeout = 5
while timeout > 0:
try:
r = requests.get(ENDPOINT_URI)
if r.ok:
break
except: # noqa: E722
pass
timeout -= 0.1
time.sleep(0.1)
yield
proc.terminate()
proc.wait()


@typing.no_type_check
def get_boto3_client():
    from botocore.session import Session

    # NB: we use the sync botocore client for setup
    session = Session()
    return session.create_client("s3", endpoint_url=ENDPOINT_URI)


@typing.no_type_check
@pytest.fixture()
def s3_config(s3_base):
"""Yields properly configured S3FileSystem instance + test bucket name"""
test_bucket_name = "test-bucket"
client = get_boto3_client()
client.create_bucket(
Bucket=test_bucket_name,
CreateBucketConfiguration={"LocationConstraint": "eu-central-1"},
)

s3 = fsspec.filesystem("s3", client_kwargs={"endpoint_url": ENDPOINT_URI})
s3.invalidate_cache()
yield {
"cache_files_urlpath": f"s3://{test_bucket_name}/",
"cache_files_storage_options": {
"client_kwargs": {"endpoint_url": ENDPOINT_URI}
},
}
s3.rm(f"s3://{test_bucket_name}/", recursive=True) # removes the bucket as well


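# Two minimal cacheable functions for the tests below: io_cached_func exercises
# cacholote's plain-file caching, xr_cached_func its xarray caching.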
@cache.cacheable
def io_cached_func(path: str) -> fsspec.implementations.local.LocalFileOpener:
    return fsspec.open(path, "rb").open()


@cache.cacheable
def xr_cached_func(data_vars: Dict[str, Any]) -> "xr.Dataset":
    return xr.Dataset(data_vars, attrs={})


@pytest.mark.parametrize("io_delete_original", [True, False])
def test_io_to_s3(
    tmpdir: str, s3_config: Dict[str, Any], io_delete_original: bool
) -> None:
    tmpfile = os.path.join(tmpdir, "test")
    with open(tmpfile, "wb") as f:
        f.write(b"test")
    checksum = fsspec.filesystem("file").checksum(tmpfile)

    # Create cache
    with config.set(**s3_config, io_delete_original=io_delete_original):
        res = io_cached_func(tmpfile)
        assert res.read() == b"test"
        assert res.path == f"test-bucket/{checksum}"
        assert config.SETTINGS["cache_store"].stats() == (0, 1)
        assert os.path.exists(tmpfile) is not io_delete_original

    # Use cache
    with config.set(**s3_config, io_delete_original=io_delete_original):
        res = io_cached_func(tmpfile)
        assert res.read() == b"test"
        assert res.path == f"test-bucket/{checksum}"
        assert config.SETTINGS["cache_store"].stats() == (1, 1)


@pytest.mark.parametrize(
    "xarray_cache_type,extension",
    [("application/x-netcdf", ".nc"), ("application/vnd+zarr", ".zarr")],
)
def test_xr_to_s3(
    tmpdir: str,
    s3_config: Dict[str, Any],
    xarray_cache_type: str,
    extension: str,
) -> None:
    pytest.importorskip("xarray")

    # Create cache
    with config.set(**s3_config, xarray_cache_type=xarray_cache_type):
        res = xr_cached_func({"foo": [0]})
        fs = config.get_cache_files_dirfs()
        xr.testing.assert_identical(res, xr.Dataset({"foo": [0]}))
        assert config.SETTINGS["cache_store"].stats() == (0, 1)
        assert fs.exists(f"247fd17e087ae491996519c097e70e48{extension}")
        checksum = fs.checksum(f"247fd17e087ae491996519c097e70e48{extension}")

    # Use cache
    with config.set(**s3_config, xarray_cache_type=xarray_cache_type):
        res = xr_cached_func({"foo": [0]})
        fs = config.get_cache_files_dirfs()
        xr.testing.assert_identical(res, xr.Dataset({"foo": [0]}))
        assert config.SETTINGS["cache_store"].stats() == (1, 1)
        assert fs.checksum(f"247fd17e087ae491996519c097e70e48{extension}") == checksum
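For context, a minimal sketch of what these settings look like outside the test suite, mirroring the dict the `s3_config` fixture yields (bucket name, endpoint, and file path are hypothetical placeholders):

```python
import fsspec
import fsspec.implementations.local

from cacholote import cache, config


@cache.cacheable
def cached_open(path: str) -> fsspec.implementations.local.LocalFileOpener:
    return fsspec.open(path, "rb").open()


# Hypothetical bucket and endpoint; any s3fs storage options can be passed through.
with config.set(
    cache_files_urlpath="s3://my-cache-bucket/",
    cache_files_storage_options={
        "client_kwargs": {"endpoint_url": "http://127.0.0.1:5555/"}
    },
):
    cached_open("/path/to/some_file")  # first call uploads to S3; repeated calls hit the cache
```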