
Conversation

@schlunma
Contributor

@schlunma schlunma commented Nov 21, 2023

Description

Many variables in the Earth system show a strong diurnal cycle. The reason for this is, of course, Earth's rotation around its own axis, which leads to a diurnal cycle of the incoming solar radiation. While UTC is a very good absolute time measure, it is not well suited to analyzing diurnal cycles over larger regions. For example, the diurnal cycle of temperature over Russia and the USA is phase-shifted by ~180° = 12 hrs in UTC time.

This is where the local solar time (LST) comes into play: for a given location, 12:00 noon LST is defined as the moment when the sun reaches its highest point in the sky. By using this definition, which is based on the origin of the diurnal cycle (the sun), we can directly compare diurnal cycles across the globe. Due to the eccentricity of Earth's orbit, LST depends not only on longitude, but also on the day of year:

[Figure: day-of-year dependence of the local solar time correction]
(from https://www.pveducation.org/pvcdrom/properties-of-sunlight/solar-time).

Since this correction is at most ~15 min, which is usually smaller than the highest-frequency output of CMIP6 models (1 hr), we ignore it here and use the mean LST, which depends solely on longitude:

local_solar_time = utc_time + timedelta(hours=longitude / 180 * 12)  # longitude in [-180, 180]

This (more or less) exact LST is then put into bins given by the bounds of the time coordinate of the input cube to match its shape. E.g., if the exact LST at a given location is 11:47, then the corresponding data is put into the time step 12:00 with bounds (11:30, 12:30).
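For illustration, here is a small standalone sketch of these two steps (the longitude, timestamp, and hourly bin width are hypothetical; this is not the ESMValCore code itself):

```python
from datetime import datetime, timedelta

longitude = -52.5  # degrees, in [-180, 180]
utc_time = datetime(2000, 1, 1, 15, 17)

# Mean local solar time: 12 hours of offset per 180 degrees of longitude.
lst = utc_time + timedelta(hours=longitude / 180 * 12)
print(lst)  # 2000-01-01 11:47:00

# Put the exact LST into hourly bins centred on the full hour, i.e. with bounds
# (hh:30, hh+1:30): 11:47 ends up in the 12:00 time step, as in the example above.
binned = (lst + timedelta(minutes=30)).replace(minute=0, second=0, microsecond=0)
print(binned)  # 2000-01-01 12:00:00
```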

To transform the data to LST, the new preprocessor local_solar_time shifts data along the time dimension based on longitude. The input and output data have the same shape and temporal frequency; the input cubes are not modified at all.

Internally, this preprocessor uses numpy's advanced indexing to reorder the data. Unfortunately, this is not yet fully supported by dask. Thus, the preprocessor uses dask.array.apply_gufunc with appropriately chunked input arrays to perform the calculation fully lazily. In addition, the function properly handles auxiliary coordinates, ancillary variables, and cell measures (i.e., those that do not span time or longitude are left unchanged; those that span both are reordered just like the cube's data).
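For illustration, here is a minimal, self-contained sketch of the dask.array.apply_gufunc pattern described above. It is not the actual ESMValCore implementation: the array layout, the hourly shift values, and the use of a simple np.roll instead of the real bounds-based re-indexing are assumptions made for the example.

```python
import dask.array as da
import numpy as np

ntime, nlat, nlon = 240, 4, 8
lons = np.linspace(-180.0, 180.0, nlon, endpoint=False)

# Hypothetical hourly data laid out as (lat, lon, time); the core dimension
# (time) must consist of a single chunk, the loop dimensions may be chunked.
data = da.random.random((nlat, nlon, ntime), chunks=(2, nlon, ntime))

# Integer number of hourly time steps by which each longitude is shifted.
shifts = da.asarray(np.round(lons / 180.0 * 12.0).astype(int))

def _shift_column(column, shift):
    """Shift a single (time,) column by an integer number of steps."""
    return np.roll(column, int(shift))

# apply_gufunc broadcasts the loop dimensions (lat, lon) and lazily applies the
# function to every (time,) column.
shifted = da.apply_gufunc(
    _shift_column,
    "(t),()->(t)",
    data,
    shifts,
    vectorize=True,
    output_dtypes=data.dtype,
)
print(shifted.shape)  # (4, 8, 240), still a lazy dask array
```

The important point is that the dimension handed to the gufunc as a core dimension (time here) must not be split across chunks, which is why the preprocessor rechunks its inputs first.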

Here is a recipe to test this preprocessor: recipe_000.yml.txt

Closes #2257

Link to documentation:



@schlunma schlunma added the preprocessor Related to the preprocessor label Nov 21, 2023
@schlunma schlunma added this to the v2.11.0 milestone Nov 21, 2023
@schlunma schlunma self-assigned this Nov 21, 2023
@codecov

codecov bot commented Nov 21, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (1f55ec9) 93.53% compared to head (45024f7) 93.61%.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2258      +/-   ##
==========================================
+ Coverage   93.53%   93.61%   +0.07%     
==========================================
  Files         238      238              
  Lines       12955    13113     +158     
==========================================
+ Hits        12118    12276     +158     
  Misses        837      837              


@schlunma schlunma marked this pull request as ready for review November 21, 2023 16:21
@valeriupredoi
Contributor

valeriupredoi commented Nov 22, 2023

@schlunma Manu, thanks a lot for this superb PR! But here's me putting the lazy dev hat on - it's an awful lot of specialized utilitarian code just to be able to get this local solar time computed - wouldn't it make more sense to have this all dumped into iris (by you, of course, not asking them to implement it)? On the other hand, probably a fair bit of all that private utilitarian code could be used by other preprocessors - can we not e.g. transform to local time of Paris or Tokyo for regional models, or have a public rechunk_cube() (obviously outside the time module)?

@schlunma
Contributor Author

Hi V, I agree that this is very specialized code, so in my opinion it is out of scope for the iris package and fits much better into ESMValCore!

As I mentioned in the PR description, the main problem here is that dask does not support multidimensional advanced indexing at the moment. We could probably drop 80% of the code here if that were possible (all the rechunking, handling of aux coords, cell measures and ancillary variables, etc.). Unfortunately, that approach only works for cubes with realized data, and since this preprocessor is specifically designed to be applied to subdaily data (which tends to be large), this is not really suitable. And please don't ask me to open a PR in dask, there is probably a good reason why they haven't implemented that yet 😅

Putting the rechunking code into a more general module is a good idea, I will do that! The rest of the code is rather specialized to this function, so I don't think it makes sense to put that anywhere else.

@valeriupredoi
Contributor

thanks a lot, Manu, cheers! No way I'd ask you to get stuff into Dask, that'd be a crapload of work - and, unless it's something we know 100% is a good fit in Dask, we shouldn't put our reputation on the line and get refused - let's keep that interaction for when we really must. Cool then, if you do a public func with rechunking, that'd be fab 🍺

@schlunma
Contributor Author

I just updated the PR description and will put a similar text into the doc, hope that makes things clearer 👍

@schlunma
Contributor Author

@valeriupredoi, I moved the rechunking to the iris_helpers module in 646c8af 👍

@valeriupredoi
Contributor

many thanks, Manu! I'll revisit my review on Monday, off to the pub now 🍺 Have a good weekend!

@schlunma
Contributor Author

Thanks for the review V! 🚀

A recipe to test this is already available in the PR description:

Here is a recipe to test this preprocessor: recipe_000.yml.txt

Regarding an example recipe: I will add a recipe that uses this preprocessor to the general model evaluation recipes (see ESMValGroup/ESMValTool#3421) once this is merged. Would that work for you?

@ESMValGroup/technical-lead-development-team would anyone of you have time to do a second check of this and merge it eventually?

mask=mask,
)
cube.data = _transform_arr(
cube.lazy_data(),
Member

Would it make sense to also support realized data? E.g. use cube.core_data() here? Most preprocessor functions do that.

Contributor Author

Good point. I also thought about this, but unfortunately it's not as simple as using core_data() instead of lazy_data() here, since da.apply_gufunc will always return a dask array. In addition, we also need to consider other dimensional metadata here. For example, a cube can have realized data, but lazy cell measures.

I'll try to think of a solution that properly handles all cases.

One question: how should rechunk_cube deal with numpy arrays? Leave them like this? Or transform them to a dask array?

Member

Rechunking is not needed for numpy arrays because they have no chunks, so the most logical solution to me seems to not apply that function to numpy arrays.

Contributor

err well, that's not entirely true - before going to Numpy, data has HDF5 chunks, and the mapping of these chunks is held in the file's B-tree, which is loaded by netCDF4 and I believe is available to Numpy when assembling the data into an array. What's done with it afterwards is probably nothing, especially for Numpy arrays realized/blown up into memory. There is, however, the question of Numpy lazy data but I am not too familiar with that yet

Contributor Author

All right, implemented this in b77f6b9. local_solar_time will only return dask arrays if the input is already lazy.

rechunk_cube now never touches numpy arrays (9a2fa74).
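For reference, a minimal sketch of the behaviour described here (a hypothetical helper, not the actual ESMValCore code): da.apply_gufunc always returns a dask array, so the result is realized again whenever the input cube's data was not lazy.

```python
import dask.array as da

def _restore_laziness(cube, new_data):
    """Assign new data to an iris cube, preserving whether its data was lazy."""
    if not cube.has_lazy_data() and isinstance(new_data, da.Array):
        new_data = new_data.compute()  # realize to match the realized input
    cube.data = new_data
```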

Member

There is, however, the question of Numpy lazy data but I am not too familiar with that yet

I haven't heard of that before, do you have a reference? The only thing resembling lazy data in numpy that I know of are views; some numpy functions return those instead of computing a new array.

Contributor

was thinking of mmap() - here's a cool side-by-side of mmap vs HDF5 vs Zarr https://pythonspeed.com/articles/mmap-vs-zarr-hdf5/

@bouweandela
Member

@ESMValGroup/science-reviewers Could one of you please review the algorithm and documentation?

@schlunma
Contributor Author

schlunma commented Dec 7, 2023

Thanks for the reviews @valeriupredoi and @bouweandela. I just tested this with 10 years of 3-hourly data:

| Setup | Runtime [s] | Memory usage [GiB] |
| --- | --- | --- |
| empty dask.yml | 160 | 17.8 |
| 10 workers with 3 GiB each | 90 | 14.2 |
| 10 workers with 3 GiB each, ONLY rechunking | 90 | 9.4 |

As you can see, we get a nice improvement with dask.distributed.

However, when increasing this to 30 years of 3-hourly data, the process just hangs on my machine (32 GiB RAM) when using the default scheduler, and fails with all kinds of wild warnings for the distributed scheduler (I also tested various configurations). I guess the reason is that the rechunking is very expensive and needs to move data around workers (see table: the preprocessor takes the same time with only the rechunking enabled [no re-indexing]).

Does anyone have an idea how to improve this? The data must not be chunked along the dimension that I am re-indexing, so I need this rechunking.
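(For reference, a minimal sketch, with hypothetical shapes, of the constraint mentioned above: the re-indexed dimensions are collapsed into single chunks, and another dimension is split more finely so that individual chunks stay a reasonable size.)

```python
import dask.array as da

# Hypothetical (time, lat, lon) array, e.g. one year of hourly data, chunked
# along time as it typically comes out of loading.
arr = da.random.random((8760, 96, 192), chunks=(1000, 96, 192))

# One chunk along time (axis 0) and longitude (axis 2), which are re-indexed;
# split latitude instead to keep chunks around 100 MiB.
arr = arr.rechunk({0: -1, 1: 8, 2: -1})
print(arr.chunks)
```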

@bouweandela
Member

bouweandela commented Dec 7, 2023

I suspect your dask graph may be too big. Could you check what the value is of len(result_cube.lazy_data().dask)?

@schlunma
Contributor Author

schlunma commented Dec 7, 2023

I don't think the task graph is too long:

| Setup | Tasks (10 yrs) | Tasks (30 yrs) |
| --- | --- | --- |
| no custom preproc | 297 | 811 |
| + rechunking | 814 | 3007 |
| + reindexing | 942 | 3519 |

[Screen recording "Peek 2023-12-07 16-45": Dask dashboard while processing 30 years]

I attached a video of the dask dashboard when processing 30 years (the one that fails). In the beginning, everything looks fine, but then during rechunking a lot of transfers (red boxes) and spilling (orange boxes) happen.

@bouweandela
Member

When workers start spilling to disk, they're running out of memory. You could try to increase the memory per worker. I think it may help with debugging to run the workers on a different machine from the scheduler, because if you're using too much memory, your OS will start killing processes, which makes the whole computer act strangely. I usually run the esmvaltool command in the Levante interactive queue with a few processors and e.g. 16 GB of memory, and then use a compute node for the workers.

@bouweandela
Member

bouweandela commented Dec 7, 2023

There is also some work going on to improve rechunking, see here, but I'm not sure if it quite works yet.

@alistairsellar
Contributor

@ESMValGroup/science-reviewers Could one of you please review the algorithm and documentation?

Hi @bouweandela @schlunma I reckon I could science-review this one, but not til next week. Is that soon enough?

@schlunma
Contributor Author

schlunma commented Dec 8, 2023

When workers start spilling to disk, they're running out of memory. You could try to increase the memory per worker.

Yeah, I already tried that, up to 16 GiB per worker. I am pretty sure that even more memory will solve this. However, the total size of the 30 years of data is ~6 GiB, so I am really wondering why this does not work smoothly with almost 3x more memory per worker...

I think it may help with debugging to run the workers on a different machine from the scheduler, because if you're using too much memory, your OS will start killing processes, which makes the whole computer act strangely. I usually run the esmvaltool command in the Levante interactive queue with a few processors and e.g. 16 GB of memory, and then use a compute node for the workers.

That's a good idea! I am usually running this on an interactive node with a LocalCluster. Will try to use a SLURMCluster later today.

Anyway, I don't think there's much we can do at the moment. We got a nice speedup with dask distributed and with more memory we can also analyze larger datasets.

@schlunma
Contributor Author

schlunma commented Dec 8, 2023

Hi @bouweandela @schlunma I reckon I could science-review this one, but not til next week. Is that soon enough?

That would be awesome @alistairsellar! Next week is totally fine, this is not urgent 👍

@bouweandela
Member

When workers start spilling to disk, they're running out of memory. You could try to increase the memory per worker.

Yeah, I already tried that, up to 16 GiB per worker. I am pretty sure that even more memory will solve this. However, the total size of the 30 years of data is ~6 GiB, so I am really wondering why this does not work smoothly with almost 3x more memory per worker...

Maybe you're accidentally creating some very large intermediate chunks somewhere that increase in size with the input data. Here is a bit of code that allows you to check that.
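(The snippet linked above is not reproduced here; as a rough, hypothetical alternative, the largest chunk of any dask array built along the way can be reported like this. Note that it only inspects the array it is given, so it has to be called on the intermediate arrays as they are constructed.)

```python
import numpy as np
import dask.array as da

def largest_chunk_mib(arr: da.Array) -> float:
    """Return the size in MiB of the largest chunk of a dask array."""
    max_elements = np.prod([max(sizes) for sizes in arr.chunks])
    return float(max_elements) * arr.dtype.itemsize / 2**20
```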

@schlunma
Contributor Author

Maybe you're accidentally creating some very large intermediate chunks somewhere that increase in size with the input data. Here is a bit of code that allows you to check that.

That's also not the case, the largest chunk I encountered is 127.8 MiB.

I think it may help with debugging to run the workers on a different machine from the scheduler, because if you're using too much memory, your OS will start killing processes, which makes the whole computer act strangely. I usually run the esmvaltool command in the Levante interactive queue with a few processors and e.g. 16 GB of memory, and then use a compute node for the workers.

That did the trick!! Reserving an entire compute node with 64 workers (4 GiB per worker) did the rechunking and reindexing of 30 years of data in less than a minute 🎉. However, saving the data takes ~2 hours since this only uses one process... We need parallel writes! 😁

Here is my dask.yml:

# For SLURM cluster:
# - cores: total number of cores PER JOB
# - memory: total amount of memory PER JOB
# - processes: number of processes PER JOB
# --> Number of threads PER JOB = cores / processes
# --> Number of jobs = n_workers / processes (1 worker == 1 process)

# Spawn one job that will use entire compute node (make sure that processes ==
# n_workers!!)
cluster:
  type: dask_jobqueue.SLURMCluster
  queue: compute
  account: bd1179
  cores: 128
  memory: 256GiB
  processes: 64
  interface: ib0
  local_directory: '/scratch/b/b309141/dask-tmp'
  n_workers: 64
  walltime: '02:00:00'

@axel-lauer
Contributor

I just tested this new preprocessor with 2-dim and 3-dim variables, also trying different combinations with additional preprocessors such as extract_region and zonal_statistics, and the results look as expected. Very nice work @schlunma!

Contributor

@alistairsellar alistairsellar left a comment

Very nice work indeed @schlunma! Approving science review, with a few suggestions that you are free to ignore.

The documentation is excellent! Particularly important for this preprocessor.

@schlunma
Contributor Author

Thanks for your review and comments @alistairsellar 👍 I think I addressed everything now. This can be merged @ESMValGroup/technical-lead-development-team.

@valeriupredoi
Contributor

well done, folks! Manu, you're getting a merge 🎁 from me 🎄

@valeriupredoi valeriupredoi merged commit 0698ef8 into main Dec 21, 2023
@valeriupredoi valeriupredoi deleted the local_time_preproc branch December 21, 2023 12:26

Labels

preprocessor Related to the preprocessor

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add preprocessor that converts time to local solar time

6 participants