From 4e5840053d84750bf1af0f952abffb5c31f800e5 Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Mon, 27 Jan 2025 15:50:58 +0000 Subject: [PATCH 01/18] single-threaded processing for area selector --- cads_adaptors/tools/area_selector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index b4a3db0d..2c81f184 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -282,7 +282,7 @@ def area_selector_path( for var in ds_area.variables: ds_area[var].encoding.setdefault("_FillValue", None) # Need to compute before writing to disk as dask loses too many jobs - ds_area.compute().to_netcdf(out_path) + ds_area.to_netcdf(out_path) out_paths.append(out_path) else: context.add_user_visible_error( @@ -304,7 +304,7 @@ def area_selector_paths( context: Context = Context(), **kwargs: Any, ) -> list[str]: - with dask.config.set(scheduler="threads"): + with dask.config.set(scheduler="single-threaded"): # We try to select the area for all paths, if any fail we return the original paths out_paths = [] for path in paths: From 3fcb607f6dd62a3578c0f35b5c3b018c069b43aa Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Mon, 27 Jan 2025 15:51:15 +0000 Subject: [PATCH 02/18] single-threaded processing for area selector --- cads_adaptors/tools/area_selector.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index 2c81f184..133a989a 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -281,7 +281,6 @@ def area_selector_path( out_path = os.path.join(target_dir, f"{fname_tag}.nc") for var in ds_area.variables: ds_area[var].encoding.setdefault("_FillValue", None) - # Need to compute before writing to disk as dask loses too many jobs ds_area.to_netcdf(out_path) out_paths.append(out_path) else: From ba380abbbebfce89082f7e58543ecc6d7a65f9cb Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Thu, 30 Jan 2025 08:39:47 +0000 Subject: [PATCH 03/18] upload_time hacky message --- cads_adaptors/tools/area_selector.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index 6585a393..6ed4f175 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -303,7 +303,9 @@ def area_selector_paths( context: Context = Context(), **kwargs: Any, ) -> list[str]: + import time with dask.config.set(scheduler="single-threaded"): + time0 = time.time() # We try to select the area for all paths, if any fail we return the original paths out_paths = [] for path in paths: @@ -316,4 +318,5 @@ def area_selector_paths( f"could not convert {path} to xarray; returning the original data" ) out_paths.append(path) + context.info("Area selection complete", upload_time=time.time() - time0) return out_paths From 6b5a1c9f2d5e998409b4a28aaf51883757903d70 Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Thu, 30 Jan 2025 09:53:31 +0000 Subject: [PATCH 04/18] upload_time hacky message --- cads_adaptors/tools/area_selector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index 6ed4f175..afa7aab7 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -318,5 +318,5 @@ def area_selector_paths( f"could not convert {path} to xarray; returning the original data" ) out_paths.append(path) - context.info("Area selection complete", upload_time=time.time() - time0) + context.upload_log("Area selection complete", upload_time=time.time() - time0) return out_paths From 7ca52dd587736b0b9e644819b4cd7c0c3fea27b4 Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Thu, 30 Jan 2025 11:20:40 +0000 Subject: [PATCH 05/18] upload_time hacky message --- cads_adaptors/tools/area_selector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index afa7aab7..26df7853 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -318,5 +318,5 @@ def area_selector_paths( f"could not convert {path} to xarray; returning the original data" ) out_paths.append(path) - context.upload_log("Area selection complete", upload_time=time.time() - time0) + context.logger.info("Area selection complete", upload_time=time.time() - time0) return out_paths From 56f4e65dbfe2ffaf1618a38671c907169bcf5daa Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Thu, 30 Jan 2025 11:48:39 +0000 Subject: [PATCH 06/18] upload_time hacky message --- cads_adaptors/tools/area_selector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index 26df7853..7837494f 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -318,5 +318,5 @@ def area_selector_paths( f"could not convert {path} to xarray; returning the original data" ) out_paths.append(path) - context.logger.info("Area selection complete", upload_time=time.time() - time0) + context.logger.info("Area selection complete", delta_time=time.time() - time0) return out_paths From 2a69f180dc0ec95a482e6522bd6b58dde9e9de3f Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Thu, 30 Jan 2025 11:52:51 +0000 Subject: [PATCH 07/18] delta_time message --- cads_adaptors/tools/area_selector.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index 7837494f..8c5736e2 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -318,5 +318,6 @@ def area_selector_paths( f"could not convert {path} to xarray; returning the original data" ) out_paths.append(path) - context.logger.info("Area selection complete", delta_time=time.time() - time0) + context.logger.info("1. context.logger.info Area selection complete", delta_time=time.time() - time0) + context.info("2. context.info Area selection complete", delta_time=time.time() - time0) return out_paths From 437dc53f4580e4bbda568e1fd373cab87bcd73f2 Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Thu, 30 Jan 2025 13:29:11 +0000 Subject: [PATCH 08/18] removing extra logs --- cads_adaptors/tools/area_selector.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index 8c5736e2..7837494f 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -318,6 +318,5 @@ def area_selector_paths( f"could not convert {path} to xarray; returning the original data" ) out_paths.append(path) - context.logger.info("1. context.logger.info Area selection complete", delta_time=time.time() - time0) - context.info("2. context.info Area selection complete", delta_time=time.time() - time0) + context.logger.info("Area selection complete", delta_time=time.time() - time0) return out_paths From 068a75d594df3c36e6496ebc299ae19b8a65cf52 Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Thu, 30 Jan 2025 14:20:38 +0000 Subject: [PATCH 09/18] DEBUG --- cads_adaptors/tools/area_selector.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index 7837494f..d390a3ab 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -256,6 +256,7 @@ def area_selector_path( open_datasets_kwargs.setdefault("decode_times", False) open_datasets_kwargs.setdefault("chunks", -1) + print(open_datasets_kwargs) # open_kwargs = ds_dict = convertors.open_file_as_xarray_dictionary( infile, @@ -265,6 +266,7 @@ def area_selector_path( "open_datasets_kwargs": open_datasets_kwargs, }, ) + print(ds_dict) ds_area_dict = { ".".join( From 8c0ba502c9839d074ed1e09163fe2fdeb619782b Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Thu, 30 Jan 2025 14:21:22 +0000 Subject: [PATCH 10/18] DEBUG --- cads_adaptors/tools/area_selector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index d390a3ab..7c1b4540 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -256,7 +256,7 @@ def area_selector_path( open_datasets_kwargs.setdefault("decode_times", False) open_datasets_kwargs.setdefault("chunks", -1) - print(open_datasets_kwargs) + print(f"ECP DEBUG 2: {open_datasets_kwargs}") # open_kwargs = ds_dict = convertors.open_file_as_xarray_dictionary( infile, @@ -266,7 +266,7 @@ def area_selector_path( "open_datasets_kwargs": open_datasets_kwargs, }, ) - print(ds_dict) + print(f"ECP DEBUG 2: {ds_dict}") ds_area_dict = { ".".join( From 551be4ed8d7e18533959b81b8803f9a7128be52c Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Thu, 30 Jan 2025 14:24:56 +0000 Subject: [PATCH 11/18] DEBUG --- cads_adaptors/tools/area_selector.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index 7c1b4540..7136bc14 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -256,7 +256,8 @@ def area_selector_path( open_datasets_kwargs.setdefault("decode_times", False) open_datasets_kwargs.setdefault("chunks", -1) - print(f"ECP DEBUG 2: {open_datasets_kwargs}") + context.info(f"ECP DEBUG 1: {open_datasets_kwargs}") + print(f"ECP DEBUG 1: {open_datasets_kwargs}") # open_kwargs = ds_dict = convertors.open_file_as_xarray_dictionary( infile, @@ -266,6 +267,7 @@ def area_selector_path( "open_datasets_kwargs": open_datasets_kwargs, }, ) + context.info(f"ECP DEBUG 2: {ds_dict}") print(f"ECP DEBUG 2: {ds_dict}") ds_area_dict = { @@ -306,6 +308,7 @@ def area_selector_paths( **kwargs: Any, ) -> list[str]: import time + context.info(f"ECP DEBUG 0: {kwargs}") with dask.config.set(scheduler="single-threaded"): time0 = time.time() # We try to select the area for all paths, if any fail we return the original paths From 54d649029ad87131cb8912349916aaa0ba11d93a Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Tue, 4 Feb 2025 11:10:25 +0000 Subject: [PATCH 12/18] Logger update to match cads-worker --- cads_adaptors/tools/area_selector.py | 14 +++++--------- cads_adaptors/tools/logger.py | 10 ++-------- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index 7136bc14..1d2be741 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -183,7 +183,7 @@ def area_selector( ds, lat_key, area["south"], area["north"], context, **extra_kwargs )[0] - context.debug(f"lat_slice: {lat_slice}\nlon_slices: {lon_slices}") + context.debug(f"Area selector: lat_slice: {lat_slice}\nlon_slices: {lon_slices}") sub_selections = [] for lon_slice in lon_slices: @@ -197,12 +197,12 @@ def area_selector( **sel_kwargs, ) ) - context.debug(f"selections: {sub_selections}") + # context.debug(f"selections: {sub_selections}") ds_area = xr.concat( sub_selections, dim=lon_key, data_vars="minimal", coords="minimal" ) - context.debug(f"ds_area: {ds_area}") + context.debug(f"Area selector: ds_area: {ds_area}") # Ensure that there are no length zero dimensions for dim in [lat_key, lon_key]: @@ -256,8 +256,6 @@ def area_selector_path( open_datasets_kwargs.setdefault("decode_times", False) open_datasets_kwargs.setdefault("chunks", -1) - context.info(f"ECP DEBUG 1: {open_datasets_kwargs}") - print(f"ECP DEBUG 1: {open_datasets_kwargs}") # open_kwargs = ds_dict = convertors.open_file_as_xarray_dictionary( infile, @@ -267,8 +265,6 @@ def area_selector_path( "open_datasets_kwargs": open_datasets_kwargs, }, ) - context.info(f"ECP DEBUG 2: {ds_dict}") - print(f"ECP DEBUG 2: {ds_dict}") ds_area_dict = { ".".join( @@ -308,7 +304,7 @@ def area_selector_paths( **kwargs: Any, ) -> list[str]: import time - context.info(f"ECP DEBUG 0: {kwargs}") + with dask.config.set(scheduler="single-threaded"): time0 = time.time() # We try to select the area for all paths, if any fail we return the original paths @@ -319,7 +315,7 @@ def area_selector_paths( path, area=area, context=context, **kwargs ) except NotImplementedError: - context.logger.debug( + context.debug( f"could not convert {path} to xarray; returning the original data" ) out_paths.append(path) diff --git a/cads_adaptors/tools/logger.py b/cads_adaptors/tools/logger.py index 09c4db70..7420d0cd 100644 --- a/cads_adaptors/tools/logger.py +++ b/cads_adaptors/tools/logger.py @@ -1,9 +1,3 @@ -# TODO: use a better logger, where better means standardised to cads logs in terms of formatting -import logging -import os +import structlog -logger = logging.getLogger("adaptors") -logger.setLevel(os.getenv("ADAPTORS_LOG_LEVEL", "DEBUG")) -ch = logging.StreamHandler() - -logger.addHandler(ch) +logger = structlog.getLogger(__name__) From 3f9381342438f6f2d02b79887baae94c56cc9903 Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Wed, 5 Feb 2025 15:16:13 +0000 Subject: [PATCH 13/18] QA --- cads_adaptors/tools/area_selector.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index 2aef833e..ec650e38 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -183,7 +183,9 @@ def area_selector( ds, lat_key, area["south"], area["north"], context, **extra_kwargs )[0] - context.debug(f"Area selector: lat_slice: {lat_slice}\nlon_slices: {lon_slices}") + context.debug( + f"Area selector: lat_slice: {lat_slice}\nlon_slices: {lon_slices}" + ) sub_selections = [] for lon_slice in lon_slices: From 7056e1fd23a673d477160ef05d2c6b0c14cdebd3 Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Fri, 7 Feb 2025 08:53:41 +0000 Subject: [PATCH 14/18] switch behaviour on envvar: CADS_ADAPTOR_DASK_SCHEDULER_MODE --- cads_adaptors/tools/area_selector.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index ec650e38..c004b02c 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -11,6 +11,7 @@ from cads_adaptors.exceptions import CdsFormatConversionError, InvalidRequest from cads_adaptors.tools import adaptor_tools, convertors +DASK_SCHEDULER_MODE = os.getenv("CADS_ADAPTOR_DASK_SCHEDULER_MODE", "threads") def area_to_checked_dictionary(area: list[float | int]) -> dict[str, float | int]: north, east, south, west = area @@ -283,6 +284,9 @@ def area_selector_path( out_path = os.path.join(target_dir, f"{fname_tag}.nc") for var in ds_area.variables: ds_area[var].encoding.setdefault("_FillValue", None) + # If threads, need to compute before writing to disk as dask loses too many jobs + if DASK_SCHEDULER_MODE == "threads": + ds_area.compute() ds_area.to_netcdf(out_path) out_paths.append(out_path) else: @@ -293,7 +297,10 @@ def area_selector_path( out_path = os.path.join(target_dir, f"{fname_tag}.nc") for var in ds_area.variables: ds_area[var].encoding.setdefault("_FillValue", None) - ds_area.compute().to_netcdf(out_path) + # If threads, need to compute before writing to disk as dask loses too many jobs + if DASK_SCHEDULER_MODE == "threads": + ds_area.compute() + ds_area.to_netcdf(out_path) out_paths.append(out_path) return out_paths @@ -307,7 +314,7 @@ def area_selector_paths( ) -> list[str]: import time - with dask.config.set(scheduler="single-threaded"): + with dask.config.set(scheduler=DASK_SCHEDULER_MODE): time0 = time.time() # We try to select the area for all paths, if any fail we return the original paths out_paths = [] From f67bec7b9137798619ebc1be50055e34067d18e9 Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Fri, 7 Feb 2025 09:22:58 +0000 Subject: [PATCH 15/18] switch behaviour on adaptor config option: dask_scheduler_mode --- cads_adaptors/tools/area_selector.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index c004b02c..46b9e7c3 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -234,6 +234,7 @@ def area_selector_path( target_dir: str | None = None, area_selector_kwargs: dict[str, Any] = {}, open_datasets_kwargs: list[dict[str, Any]] | dict[str, Any] = {}, + dask_scheduler_mode: str = "threads", **kwargs: dict[str, Any], ) -> list[str]: if isinstance(area, list): @@ -285,7 +286,7 @@ def area_selector_path( for var in ds_area.variables: ds_area[var].encoding.setdefault("_FillValue", None) # If threads, need to compute before writing to disk as dask loses too many jobs - if DASK_SCHEDULER_MODE == "threads": + if dask_scheduler_mode == "threads": ds_area.compute() ds_area.to_netcdf(out_path) out_paths.append(out_path) @@ -298,7 +299,7 @@ def area_selector_path( for var in ds_area.variables: ds_area[var].encoding.setdefault("_FillValue", None) # If threads, need to compute before writing to disk as dask loses too many jobs - if DASK_SCHEDULER_MODE == "threads": + if dask_scheduler_mode == "threads": ds_area.compute() ds_area.to_netcdf(out_path) out_paths.append(out_path) @@ -314,14 +315,15 @@ def area_selector_paths( ) -> list[str]: import time - with dask.config.set(scheduler=DASK_SCHEDULER_MODE): + dask_scheduler_mode: str = kwargs.pop("dask_scheduler_mode", DASK_SCHEDULER_MODE) + with dask.config.set(scheduler=dask_scheduler_mode): time0 = time.time() # We try to select the area for all paths, if any fail we return the original paths out_paths = [] for path in paths: try: out_paths += area_selector_path( - path, area=area, context=context, **kwargs + path, area=area, context=context, dask_scheduler_mode=dask_scheduler_mode, **kwargs ) except (NotImplementedError, CdsFormatConversionError): context.debug( From eb26b5eeaeea2a2df26202348bef39c46e7ee643 Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Fri, 7 Feb 2025 09:29:09 +0000 Subject: [PATCH 16/18] area-selector dask_scheduler_mode on config option --- tests/test_30_area_selector.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/tests/test_30_area_selector.py b/tests/test_30_area_selector.py index 86ebaf6d..1b54e14c 100644 --- a/tests/test_30_area_selector.py +++ b/tests/test_30_area_selector.py @@ -251,7 +251,13 @@ def test_area_selector_zero_length_dim(): "https://get.ecmwf.int/repository/test-data/test-data/cads-adaptors/" ) - +@pytest.mark.parametrize( + "dask_mode", + [ + "single-threaded", + "threads", + ], +) @pytest.mark.parametrize( "url", [ @@ -259,20 +265,27 @@ def test_area_selector_zero_length_dim(): f"{TEST_DATA_BASE_URL}/C3S-312bL1-L3C-MONTHLY-SRB-ATSR2_ORAC_ERS2_199506_fv3.0.nc", ], ) -def test_area_selector_real_files(url): +def test_area_selector_real_files(dask_mode, url): with tempfile.TemporaryDirectory() as temp_dir: test_file = os.path.join(temp_dir, TEMP_FILENAME) remote_file = requests.get(url) with open(test_file, "wb") as f: f.write(remote_file.content) - result = area_selector_path(test_file, area=[90, -180, -90, 180]) + result = area_selector_path(test_file, area=[90, -180, -90, 180], dask_scheduler_mode=dask_mode) assert isinstance(result, list) assert len(result) == 1 assert isinstance(result[0], str) # Test with lists of urls +@pytest.mark.parametrize( + "dask_mode", + [ + "single-threaded", + "threads", + ], +) @pytest.mark.parametrize( "urls", [ @@ -292,7 +305,7 @@ def test_area_selector_real_files(url): ], ], ) -def test_area_selector_paths_real_files(urls): +def test_area_selector_paths_real_files(dask_mode, urls): with tempfile.TemporaryDirectory() as temp_dir: test_files = [] for i, file in enumerate(urls): @@ -306,6 +319,7 @@ def test_area_selector_paths_real_files(urls): result = area_selector_paths( test_files, area=[90, -180, -90, 180], + dask_scheduler_mode=dask_mode, ) assert isinstance(result, list) assert len(result) == len(urls) From bd4fb7929d77360a0286dd9996399ed7d0ff0ee4 Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Fri, 7 Feb 2025 09:50:15 +0000 Subject: [PATCH 17/18] QA --- cads_adaptors/tools/area_selector.py | 7 ++++++- tests/test_30_area_selector.py | 5 ++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index 46b9e7c3..2ab4b00d 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -13,6 +13,7 @@ DASK_SCHEDULER_MODE = os.getenv("CADS_ADAPTOR_DASK_SCHEDULER_MODE", "threads") + def area_to_checked_dictionary(area: list[float | int]) -> dict[str, float | int]: north, east, south, west = area if north < south: @@ -323,7 +324,11 @@ def area_selector_paths( for path in paths: try: out_paths += area_selector_path( - path, area=area, context=context, dask_scheduler_mode=dask_scheduler_mode, **kwargs + path, + area=area, + context=context, + dask_scheduler_mode=dask_scheduler_mode, + **kwargs, ) except (NotImplementedError, CdsFormatConversionError): context.debug( diff --git a/tests/test_30_area_selector.py b/tests/test_30_area_selector.py index 1b54e14c..fdb59431 100644 --- a/tests/test_30_area_selector.py +++ b/tests/test_30_area_selector.py @@ -251,6 +251,7 @@ def test_area_selector_zero_length_dim(): "https://get.ecmwf.int/repository/test-data/test-data/cads-adaptors/" ) + @pytest.mark.parametrize( "dask_mode", [ @@ -272,7 +273,9 @@ def test_area_selector_real_files(dask_mode, url): with open(test_file, "wb") as f: f.write(remote_file.content) - result = area_selector_path(test_file, area=[90, -180, -90, 180], dask_scheduler_mode=dask_mode) + result = area_selector_path( + test_file, area=[90, -180, -90, 180], dask_scheduler_mode=dask_mode + ) assert isinstance(result, list) assert len(result) == 1 assert isinstance(result[0], str) From 7cd65f45e12ab6110a6dd02b83686eece51a75cb Mon Sep 17 00:00:00 2001 From: EddyCMWF Date: Fri, 7 Feb 2025 09:52:06 +0000 Subject: [PATCH 18/18] QA --- cads_adaptors/tools/area_selector.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cads_adaptors/tools/area_selector.py b/cads_adaptors/tools/area_selector.py index 2ab4b00d..58bfff2d 100644 --- a/cads_adaptors/tools/area_selector.py +++ b/cads_adaptors/tools/area_selector.py @@ -11,7 +11,7 @@ from cads_adaptors.exceptions import CdsFormatConversionError, InvalidRequest from cads_adaptors.tools import adaptor_tools, convertors -DASK_SCHEDULER_MODE = os.getenv("CADS_ADAPTOR_DASK_SCHEDULER_MODE", "threads") +DEFAULT_DASK_SCHEDULER_MODE = os.getenv("CADS_ADAPTOR_DASK_SCHEDULER_MODE", "threads") def area_to_checked_dictionary(area: list[float | int]) -> dict[str, float | int]: @@ -316,7 +316,9 @@ def area_selector_paths( ) -> list[str]: import time - dask_scheduler_mode: str = kwargs.pop("dask_scheduler_mode", DASK_SCHEDULER_MODE) + dask_scheduler_mode: str = kwargs.pop( + "dask_scheduler_mode", DEFAULT_DASK_SCHEDULER_MODE + ) with dask.config.set(scheduler=dask_scheduler_mode): time0 = time.time() # We try to select the area for all paths, if any fail we return the original paths