Cubed for larger-than-memory workloads on a single machine #492

Open
TomNicholas opened this issue Jul 13, 2024 · 12 comments

@TomNicholas
Member

TomNicholas commented Jul 13, 2024

One thing that's really obvious at SciPy this year is that people absolutely love DuckDB. The reason is that although DuckDB doesn't offer multi-machine parallelism, and so can never scale horizontally, it can nevertheless operate on larger-than-memory workloads, and is extremely reliable at just chugging through them until they complete. That confidence is all people really need for 90% of real-world workloads. It seems like a lot of users are happy to give up horizontal scaling in favour of the simplicity and reliability of no longer trying to run a distributed system.

This has got me thinking about whether we could write an executor for Cubed that is single-machine but can handle larger-than-memory workloads. It would parallelize to fully utilize that machine but only that machine.


I've also been reading this blog post on Memory Management in DuckDB, and basically all they seem to do is
a) have a preset memory usage limit (defaulting to 80% of machine RAM),
b) spill to disk when necessary,
c) try to minimize spilling to disk by efficient caching,
d) fully utilize the parallelism available to them on that one machine to get vertical scaling (see the sketch after this list).
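
For concreteness, those knobs correspond to DuckDB settings you can set from Python. A minimal sketch, with purely illustrative values rather than recommendations; the caching in (c) is internal and has no user-facing knob:

```python
import duckdb

con = duckdb.connect()
con.execute("SET memory_limit = '12GB'")               # (a) preset cap on memory usage
con.execute("SET temp_directory = '/tmp/duck_spill'")  # (b) where to spill when the cap is hit
con.execute("SET threads = 8")                         # (d) use all the cores on this one machine
```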

I feel like in Cubed's model we have all the information we need to take a similar approach. We have parallel single-machine executors already (synchronous and asynchronous), but (please correct me if I've misunderstood @tomwhite) neither of these executors would work on larger-than-memory workloads, because they currently would naively try to launch more cubed tasks than could fit in memory (because Cubed's allowed_mem is a per-task limit).

(I'm assuming this because I notice that neither single-machine executor actually uses the cubed.Spec object for anything. So whilst they would raise if you tried to run a workload where a single task would use > allowed_mem (because cubed wouldn't even let you execute in that case), they can't possibly have a model of exactly how many tasks they can safely run in parallel before running out of RAM.)

So what if we wrote an executor which (roughly sketched in code after this list):

  • Looks to see how much RAM your machine has,
  • For each stage in the plan:
    • Calculates how many tasks it can safely run in parallel using the multiprocessing code we have (calculated as safe_num_tasks = total_machine_RAM / projected_mem_per_task),
    • Runs that many tasks at once,
      • If that's all of the tasks needed to complete that stage then don't bother writing the results to storage, just keep them in memory ready for the next stage,
      • If that's not all of the tasks needed then run them in batches, spilling results to disk via Zarr as normal. This is what would allow running on larger-than-memory workloads (I think it's basically the same idea as dask's P2P rechunking).
      • (Maybe have some way of only spilling the minimum number of tasks necessary to disk and keeping the rest in memory)
    • Now move on to the next stage of the plan.
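
As a very rough, untested sketch of that loop (run_task_in_memory, run_task_spilling_to_zarr and the stage attributes here are made up for illustration, not Cubed's actual API):

```python
from concurrent.futures import ProcessPoolExecutor

import psutil


def execute_plan(stages, usable_fraction=0.8):
    """Run each stage of a plan in memory-bounded batches (hypothetical API)."""
    usable_ram = psutil.virtual_memory().total * usable_fraction

    for stage in stages:
        # how many tasks fit in RAM at once for this stage
        safe_num_tasks = max(1, int(usable_ram // stage.projected_mem_per_task))
        tasks = list(stage.tasks)

        with ProcessPoolExecutor(max_workers=safe_num_tasks) as pool:
            if len(tasks) <= safe_num_tasks:
                # everything fits: keep results around for the next stage
                stage.results = list(pool.map(run_task_in_memory, tasks))
            else:
                # doesn't fit: run in batches, spilling each batch to Zarr on disk
                for i in range(0, len(tasks), safe_num_tasks):
                    batch = tasks[i : i + safe_num_tasks]
                    list(pool.map(run_task_spilling_to_zarr, batch))
```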

Maybe I'm completely misunderstanding something about how sharing memory across processes works, but wouldn't this strategy basically have the same characteristics that the DuckDB approach has?

cc @rabernat @jakirkham

@TomNicholas TomNicholas added the enhancement and runtime labels Jul 13, 2024
@TomNicholas TomNicholas changed the title from "Cubed for out-of-memory workloads on a single machine" to "Cubed for larger-than-memory workloads on a single machine" Jul 13, 2024
@TomNicholas
Member Author

TomNicholas commented Jul 13, 2024

Thinking more about how we might prototype this idea in Cubed today...

In #187 @tomwhite added support for TensorStore as an alternative on-disk Zarr store for writing intermediate results to. That choice of storage layer is currently a global config flag, but what if we made it so that each stage in the plan could make its own choice about this flag, and so choose to write out to a different type of Zarr store?

Then we could have the single-machine executor described above write out to an on-disk Zarr store for stages where it knows some spilling to disk is required, but for other stages have it write to an in-memory Zarr store instead (or maybe even a RedisStore...). That way data is easily accessed by the correct task in the next stage without being written to disk.
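
Something like this, as a rough sketch (the fits_in_memory flag is made up, and the zarr v2 store classes are just to illustrate the idea):

```python
import zarr


def intermediate_store_for(stage, workdir):
    # hypothetical per-stage choice: in-memory when the stage's output fits
    # in RAM, on-disk when we know spilling is required
    if stage.fits_in_memory:
        return zarr.MemoryStore()
    return zarr.DirectoryStore(f"{workdir}/{stage.name}.zarr")
```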

EDIT: Actually we don't need this in order to just prototype the idea - simply batching the tasks as above and writing an intermediate store to disk for every stage, as cubed currently does, would be enough to test running on a larger-than-memory workload; it would just do a lot more IO than necessary when running on one machine.

@tomwhite
Member

Excited to see this! I agree this would be very attractive for some users and workloads.

I also think that we basically have what you describe today using the processes executor. As long as num_processes * allowed_mem <= total_mem things will just work. (The reason I didn't add that check in #411 was because it would need the psutil dependency - but that should be straightforward to add.)
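
(That check could be as simple as something like the sketch below, assuming psutil were added as a dependency; this is illustrative, not what #411 would actually contain.)

```python
import psutil


def check_total_memory(num_processes: int, allowed_mem: int) -> None:
    total_mem = psutil.virtual_memory().total
    if num_processes * allowed_mem > total_mem:
        raise ValueError(
            f"{num_processes} processes x {allowed_mem} bytes of allowed_mem "
            f"exceeds this machine's {total_mem} bytes of RAM"
        )
```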

The optimizer is pretty good at reducing the amount of intermediate data written, so it would be worth trying it out on some workloads as it stands. That may of course suggest further IO-reducing improvements we could make.

@alxmrs
Contributor

alxmrs commented Jul 14, 2024

Maybe I'm completely misunderstanding something about how sharing memory across processes works, but wouldn't this strategy basically have the same characteristics that the DuckDB approach has?

Inter-process communication is expensive. This approach is different from DuckDB, which is a single-process query engine. IIUC, it’s also embeddable within other applications (I don’t think that would be a good goal for Cubed). With multithreading, I believe DuckDB takes advantage of shared memory between threads.

For a prototype, I think single-machine multiprocessing is OK, but to get the next level of performance, multithreading is necessary. To this end, I think exploring #497 is warranted.

@rabernat

I also love DuckDB! 🦆

However, from a software architecture point of view, it is almost the complete opposite of Cubed. DuckDB was designed from the beginning as a high-performance, vertically scalable, standalone C++ application. Cubed was designed from the beginning as a fully distributed, horizontally scalable, serverless execution framework.

I support the idea of a local multiprocessing scheduler. But we shouldn't expect the same algorithms and approaches that work well in the serverless, horizontally scaling context to automatically translate to good vertical scaling.

One great innovation of cubed has been the development of an Intermediate Representation (the Cubed Plan) for array computations. This is an important part of any high-performance database query engine and could help work towards the DuckDB-type idea. However, I'd argue it is closer to a Physical Plan than a Logical Plan. A very different Physical Plan may be needed for a local, vertically scaling executor.

@rabernat

p.s. Python has a shared memory pool for IPC.
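
(Presumably that means multiprocessing.shared_memory, added in Python 3.8. A minimal sketch of backing a NumPy array with a shared block so another process can read the same buffer without copying:)

```python
import numpy as np
from multiprocessing import shared_memory

# create a shared block and view it as a NumPy array (8 MiB = 1024*1024 float64s)
shm = shared_memory.SharedMemory(create=True, size=8 * 1024**2)
a = np.ndarray((1024, 1024), dtype=np.float64, buffer=shm.buf)
a[:] = 1.0

# another process can attach to the same block by name, with no copy:
#   existing = shared_memory.SharedMemory(name=shm.name)
#   b = np.ndarray((1024, 1024), dtype=np.float64, buffer=existing.buf)

shm.close()
shm.unlink()  # free the block once every process is done with it
```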

@TomNicholas
Member Author

TomNicholas commented Jul 15, 2024

Cubed was designed from the beginning as a fully distributed, horizontally scalable, serverless execution framework.

Sure, you could do better by designing something for the vertical case from the ground up, but I think we still have a good chance of making something really useful by clever implementation of the stages that cubed's model breaks everything up into.

One great innovation of cubed has been the development of an Intermediate Representation (the Cubed Plan) for array computations.

I think the real power of cubed's model is that it allows us to clearly reason about the shuffle in advance of starting it, and categorize it into the in-memory and out-of-memory cases (see #502). We could get a very long way just by optimizing implementation details of that shuffle.

p.s. Python has a shared memory pool for IPC.

@applio and I discussed this today during the sprint, and he is literally the person who maintains this part of the python standard library (🤯), so I'm hoping he can give us some pointers on how to optimise the in-memory shuffle by sharing memory intelligently between threads/processes!

@tomwhite
Member

it can nevertheless operate on larger-than-memory workloads, and is extremely reliable at just chugging through them until they complete

As an experiment I tried running the Cubed benchmarks for Quadratic Means on my local machine with 8 cores and 16GB of memory using the processes executor. The CPUs were fully utilised, and there were no memory problems. The 5000 case below is 150GB of input data.

| name | start | duration (s) |
| --- | --- | --- |
| test_quadratic_means_xarray[50-new-optimizer] | 2024-07-18 15:25:50.892829 | 3.7533631324768066 |
| test_quadratic_means_xarray[500-new-optimizer] | 2024-07-18 15:28:18.81189 | 30.925524950027466 |
| test_quadratic_means_xarray[5000-new-optimizer] | 2024-07-18 15:34:14.146469 | 294.28265595436096 |

No doubt there's room for improvement, but this is a good starting point.

@tomwhite
Member

I'd like to emphasise the single machine case more prominently in the examples (#505), since it's an easy way to get started with Cubed.

@rabernat

That's a great result Tom!

@rabernat

150 GB / 295 s = 508 MB/s

That's not bad throughput and might be getting close to the I/O limit. I assume you've got an SSD. Some SSDs can go 5x faster, but for others this is consistent with published benchmarks.

@tomwhite
Member

Thanks Ryan. Yes, it's using an SSD on a Mac mini M1.

@tomwhite
Member

I've opened #514 to track the work on this.
