
Parallelized Queue implementation #301

Closed
dvolgyes opened this issue Sep 17, 2020 · 25 comments

Comments

@dvolgyes
Contributor

🚀 Feature
A processing queue implementation which has a limited capacity,
but immediately emits new data samples, and doesn't wait until the queue is full.

Motivation

Speed.

Pitch

Randomize in advance, but do not stop the pre-loading until there is no empty spot in the queue.

Alternatives

Existing queue.

@dvolgyes
Contributor Author

dvolgyes commented Sep 17, 2020

The primary question is about features. The parallel programming part is not easy, but also not too hard,
if the expected behavior is defined.

Basic assumptions:

  • loading one volume and performing the augmentation takes a lot of time for each volume (30-60 seconds)
  • an augmented volume consumes a lot of memory, so only a small portion of the dataset can be stored in memory (<10%)
  • getting a patch out of a volume is / will be quick, especially with UniformSampler
  • many patches (>2) or very many (>32) patches will be extracted.
    If the volume can be reused, this is basically a factor-of-N speedup.
  • samples from a single volume are somewhat correlated, so it would be nice to mix samples from multiple volumes
  • but the volumes themselves could / should also be somewhat randomized.

Let's call the volume capacity V (how many volumes can be stored at the same time in memory),
and let's call the number of patches P.

1, Most basic approach: take V (e.g. 4) volumes and P (5) patches, make (v,p) pairs, and randomize these:
(0,0), (3,2), (1,4), ...
When these run out, you can take the next batch, e.g. (5,0), (7,3), ...

The loader logic would be: load a volume when there are fewer than V volumes in memory, and discard a volume
when P patches have been extracted.

This is somewhat close to the current implementation, except for the part of waiting for the queue to be filled,
but it raises a few questions.
What happens if at the end fewer than V volumes are present? E.g. a 4-volume-long queue, 11 volumes in total.

  • Should any previous volume be reused, and if yes, which one?
    The first ones (extra loading), or some kept from the previous batch (not discarded)?
  • Could the final queue be (almost) twice as large? E.g. just merging the last two batches,
    forming a super-batch with 7 volumes, and randomizing the volumes there?

Or should the last two batches be merged, split in half, and turned into two shorter batches,
both of them at least V/2 in size? (E.g. 4 and 2 would make 3+3.)
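
For illustration, a minimal sketch of this first approach, assuming we simply iterate over volume indices in blocks of V and shuffle the (volume, patch) pairs within each block (the function and variable names are made up, not part of any implementation):

import random

def volume_patch_pairs(num_volumes, V, P, seed=0):
    # Yield shuffled (volume, patch) index pairs block by block.
    # The last block may contain fewer than V volumes (the open question above).
    rng = random.Random(seed)
    for start in range(0, num_volumes, V):
        block = range(start, min(start + V, num_volumes))
        pairs = [(v, p) for v in block for p in range(P)]
        rng.shuffle(pairs)
        yield from pairs

# e.g. list(volume_patch_pairs(11, 4, 5)) -> 55 pairs; the last block only has volumes 8-10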

2, A volume can be loaded multiple times, but can also be discarded. This allows more randomization;
it does cost extra disk loading time, but the augmentations will also be different for the same volume.

3, Some other approach.

My proposal:
A semi-intelligent parallel queue which takes a command stream.
The queue has no upper limit; the stream determines the behaviour, and very basic commands govern it.
The parallel implementations should use this base class.
Commands:

  • load a new volume v (L#v)
  • take a patch from volume v (P#v)
  • discard volume v (assuming that all previous patches have been extracted, so it has a join) (D#v)

So a command stream would be something like this:
L#1, L#2, L#3, L#4, P#1, P#4, P#3, P#1, P#2, P#2, ..., D#3, L#5, P#5, P#2, D#2, L#6, ...

Whenever a new sample is asked for, the P#N sample will be returned. (L and D are only for memory management.)
If reloading (new augmentation) is required, then it should be a discard and a reload.
(These could be merged into a re-augment command, but it would be ugly.)
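
For illustration only, the stream above could be encoded as a plain list of (command, volume index) tuples; the exact representation is of course up for discussion:

# Hypothetical encoding of the stream above: 'L' = load, 'P' = take patch, 'D' = discard
command_stream = [
    ('L', 1), ('L', 2), ('L', 3), ('L', 4),
    ('P', 1), ('P', 4), ('P', 3), ('P', 1), ('P', 2), ('P', 2),
    ('D', 3), ('L', 5), ('P', 5), ('P', 2), ('D', 2), ('L', 6),
]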

As far as I can see, this is a flexible enough structure for implementing patient-based balancing, class-based balancing, or whatever. The specific implementations basically just yield their command stream, and this base class can handle the parallel loading.

The question though: there is a limit on the number of volumes stored in parallel in memory (V).
Should there be a queue for the patches, assuming that the patch extraction takes significant time (e.g. a weighted sampler)?
(This consumes extra memory.) Or should the patches be generated on demand?
Or should it be optional? (E.g. queue length=V by default, but it could be changed.)

So how do you feel about this command stream design, and what do you think about a queue for patches?
Should there be any other commands, e.g. taking an existing volume from memory and asking for a new augmentation?

Remark: just like the current queue, it should have no workers in the dataloader; the queue will implement its own workers.

@dvolgyes
Contributor Author

Hi,

So I more or less have a working implementation, but there are a few issues to sort out.
First, there is a new queue class; let's name it somehow.
Technically, it is a state-machine / command-stream based parallel queue implementation.
It could be called ParallelQueue, but the regular Queue is also somewhat parallel.
CommandQueue or similar could be a bit misleading / hard to understand.

But anyway, let's move to the next point.
For initialization, the class takes a few parameters:

  • queue length, that is, the number of parallel patches just before they are emitted. That is kind of simple, and it can be quite large, e.g. 64.
  • a sampler. Ideally, it yields exactly as many patches from a volume as expected, but it can restart the sampling if necessary.
  • a command stream. This is just a list of command tuples, but all the logic is implemented here. With a well-selected stream, you can exactly emulate the current queue behaviour. Or you could do a lot of other magic, see above.

I wrote two command stream generators, a basic optimizer, etc.; all of them are simple functions.
So the question is: should we provide subclasses for given strategies? Or should we implement
a few different strategies and let the queue take the strategy as a parameter?

Finally, it makes life much easier if I can use the unsync library. It is a small dependency, and I really would like to keep it.

So currently these are the functions and classes:
command stream generation:

def command_stream_generator(num_subjects, max_num_in_memory, num_patches, strategy='basic')
def basic_command_stream_generator(num_subjects, max_num_in_memory, num_patches)
def merge_incomplete(num_subjects, max_num_in_memory, num_patches)

command stream tools:

def optimize_queue(queue):
def shift_queue(queue, offset):
def randomize_queue(queue):

and the queue:

class ParallelQueue(IterableDataset):

So one possible usage:

cmd_stream = command_stream_generator(len(dataset), 5, 32, strategy='merge_incomplete')
queue = ParallelQueue(dataset, sampler, cmd_stream, patch_queue_size = 32, seed=0)

(Seed: for the subprocesses I need to set a random seed. PyTorch dataloaders do exactly the same.
If the seed is 0, I generate one based on torch.randint; otherwise the seed is set externally (for reproducibility).)

But it could be that we make a subclass, and we use something like this:

cmd_stream = command_stream_generator(len(dataset), 5, 32, strategy='merge_incomplete')
queue = ParallelQueue(dataset, sampler, cmd_stream, patch_queue_size = 32, seed=0,
        max_no_subjects_in_mem=5, patch_per_subject=32, strategy='merge_incomplete')

The question is about structure: subclass or separate command stream?
Should the stream tools be exported, or should they be hidden?
(e.g. the basic optimizer, or the other basic stream tools, assuming someone writes a more intelligent scheme)

Should we assume other queue implementations, or just put defaults into this class,
and let practicality beat generality? :)

Performance-wise it looks quite good: if you have enough patches per volume, then basically the network never waits. :)

How do you see it?

@romainVala
Contributor

Hi
Thanks for the proposal; this is indeed a very important topic, since in practice, instead of using an expensive GPU card 90% of the time, it often waits 90% of the time for the queue to be filled.
Note that the proportion (computing / waiting) for the GPU is difficult to control (or maybe it is just me, but I miss timing control for GPU usage).
But anyway, just looking at the resources during training, I do see some waste of GPU computation, which is just waiting.

I am not an expert programmer, but here are some comments.

What happens if at the end fewer than V volumes are present? E.g. a 4-volume-long queue, 11 volumes in total.

* Should any previous volume be reused, and if yes, which one?
  The first ones (extra loading), or some kept from the previous batch (not discarded)?

* Could the final queue be (almost) twice as large? E.g. just merging the last two batches,
  forming a super-batch with 7 volumes, and randomizing the volumes there?

Or should the last two batches be merged, split in half, and turned into two shorter batches,
both of them at least V/2 in size? (E.g. 4 and 2 would make 3+3.)

I would go with the last batch being shorter; no need to make it smarter, because in practice the total number of volumes is >> the number of volumes per batch.
I think this is also what the PyTorch dataloaders do ...

The queue has no upper limit,

Isn't it too dangerous? Imagine you are in a case with very little augmentation (so the queue fills very quickly) and a large model to compute on the GPU ... there is a risk that the queue will continuously grow over time ... no?

The question is about structure: subclass or separate command stream?
Should the stream tools be exported, or should they be hidden?
(e.g. the basic optimizer, or the other basic stream tools, assuming someone writes a more intelligent scheme)
I would prefer one class, with the different strategies as an argument to the class.

* a command stream. This is just a list of command tuples, but all the logic is implemented here. With a well-selected stream, you can exactly emulate the current queue behaviour. Or you could do a lot of other magic, see above.

Sorry, but I have difficulty following you here and understanding the two implementation parameters.
So strategy='basic' -> equivalent to the current implementation,
strategy='merge_incomplete' -> same as the current implementation but releases a batch before the queue is completely full (i.e. as soon as the queue contains n patches, n being the batch size)?

Maybe you can detail the two command stream arguments a little, so that they perform the same random mixing:
in the current implementation, queue_size and number_of_patches_per_volume determine the "local" random mixing between subjects in your batch (i.e. if you have a queue of 32 with 32 patches, then you only get patches from the same subject, so in practice you want the queue length to be at least 4 times the number of patches for a batch size of 4).

With 32 patches per subject I want a queue size of 32*4 to be sure to have 4 different subjects. Which parameter controls this in your example?

cmd_stream = command_stream_generator(len(dataset), 5, 32, strategy='merge_incomplete') queue = ParallelQueue(dataset, sampler, cmd_stream, patch_queue_size = 32, seed=0, max_no_subjects_in_mem=5, patch_per_subject=32, strategy='merge_incomplete')

I do not understand what max_num_in_memory is controlling.
Is max_no_subjects_in_mem similar to queue_length / nb_patches_per_volume in the current queue implementation?

I hope @fepegar will merge it; anyway, I will be pleased to test it.
Many thanks

@dvolgyes
Contributor Author

Hi

What happens if at the end fewer than V volumes are present? E.g. a 4-volume-long queue, 11 volumes in total.

* Should any previous volume be reused, and if yes, which one?
  The first ones (extra loading), or some kept from the previous batch (not discarded)?

* Could the final queue be (almost) twice as large? E.g. just merging the last two batches,
  forming a super-batch with 7 volumes, and randomizing the volumes there?

Or should the last two batches be merged, split in half, and turned into two shorter batches,
both of them at least V/2 in size? (E.g. 4 and 2 would make 3+3.)

I would go with the last batch being shorter; no need to make it smarter, because in practice the total number of volumes is >> the number of volumes per batch.
I think this is also what the PyTorch dataloaders do ...

The comparison is a bit misleading. So let's assume I have 4 subjects, 4 patches per subject, and a patch queue of 1.
Well, you can use @fepegar 's animation to figure out what happens.
In the proposed solution these aren't enough parameters; there is one more very important one: the maximum number of subjects in memory at the same time. Let's assume it is 1. In this case the command stream would look like this:
Load subject 1 (L1)
Take a patch from subject 1 (P1), and three more times (P1, P1, P1)
Delete subject 1 (D1)
Load subject 2, take 4 patches, delete the subject (L2 P2 P2 P2 P2 D2),
repeat with subjects 3 and 4.
You always get 16 patches; all subjects yield all 4 patches. The issue is randomization (and some parallelization).
However, if you have at most 2 subjects in memory at the same time, then you can take the first two lines
and mix them in any way, except that it should start with the loads and end with the deletes, e.g.
L1 L2 P1 P2 P2 P1 P1 P2 P1 P2 D1 D2
So you need a larger number of subjects in memory for diversity. But these subjects can be quite huge, depending on the dataset.
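
A rough sketch of how such a stream could be generated for this simplest case; the tuple encoding and the function name are assumptions, not the actual code:

import random

def simple_command_stream(num_subjects, max_in_memory, patches_per_subject, seed=0):
    # For each block of max_in_memory subjects: load them, interleave their patch
    # commands randomly, then discard them (L ... P ... D, as in the example above).
    rng = random.Random(seed)
    stream = []
    for start in range(0, num_subjects, max_in_memory):
        block = list(range(start, min(start + max_in_memory, num_subjects)))
        stream += [('L', s) for s in block]
        patches = [('P', s) for s in block for _ in range(patches_per_subject)]
        rng.shuffle(patches)
        stream += patches
        stream += [('D', s) for s in block]
    return stream

# simple_command_stream(4, 2, 4) starts with L0 L1, then 8 shuffled P commands, then D0 D1, then L2 L3 ...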

OK, then what is the patch queue?
The above commands run in parallel processes. The most expensive part is loading the volumes from disk and performing the augmentation. In the end it is quite cheap to extract a patch. However, many patches can eat up memory quickly.
E.g. I could load 64 patches from a single subject, or even hundreds of patches, if you think of huge histopathology images.
We also need many volumes, so the product could be too large to keep in memory.
Here comes the patch queue.
The patch queue contains the preprocessed patches. The above command stream runs in parallel, and every volume
tries to extract a patch, and when it is due it will try to insert it into the patch queue. If the queue is full,
the process will wait. This makes the memory management easy, and also keeps the order reproducible.
When the data loader queries the queue, it will yield patches from the patch queue.
If this queue has at least as many patches as the batch size, then the response is basically immediate.
(That was the goal.) Let's assume the queue is 7 items long, the batch size is 4, and the queue is full.
Every process is on hold. The dataloader asks for 4 items, gets them, and the queue is reduced to 3.
So the parallel processes wake up and start producing items. Maybe they will yield 1, maybe 2 or 3,
depending on how much time they have until the next query. But the point is the immediate wake-up and
parallel background loading. Well, which ones will be loaded? The ones next in the command stream.
Assume a patch queue size of 3 and a batch size of 2. First, it will try to fill the queue, and would stop
at the | sign. ! marks the already processed items / commands.
Loading first:
L1 L2 ! P1 P2 P2 | P1 P1 P2 P1 P2 D1 D2
Getting two items:
L1 L2 P1 P2 ! P2 | P1 P1 P2 P1 P2 D1 D2
Refilling the queue:
L1 L2 P1 P2 ! P2 P1 P1 | P2 P1 P2 D1 D2
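
A minimal sketch of the back-pressure described above, using a bounded standard-library queue; load_volume and extract_patch are hypothetical placeholders for the subject loader and the sampler:

import queue

def run_command_stream(commands, load_volume, extract_patch, patch_queue):
    # Executes ('L'|'P'|'D', idx) commands; put() blocks while the patch queue
    # is full, so the producer pauses exactly as in the walkthrough above.
    volumes = {}
    for op, idx in commands:
        if op == 'L':
            volumes[idx] = load_volume(idx)                # hypothetical loading + augmentation
        elif op == 'P':
            patch_queue.put(extract_patch(volumes[idx]))   # blocks while the queue is full
        elif op == 'D':
            del volumes[idx]
    patch_queue.put(None)                                  # sentinel: no more patches

patch_queue = queue.Queue(maxsize=3)                       # the "patch queue size 3" example
# run run_command_stream(...) in a background thread/process; the dataloader side
# simply consumes patch_queue.get() until it sees the sentinel.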

The queue has no upper limit,
Isn't it too dangerous? Imagine you are in a case with very little augmentation (so the queue fills very quickly) and a large model to compute on the GPU ... there is a risk that the queue will continuously grow over time ... no?

I was imprecise, but it will not grow forever. In fact, most of the time it uses less memory than the classic queue.
There are two parts: the command stream processor and the patch queue.
The command stream processor has no internal logic. It follows the instructions directly, and it doesn't question them.
If it gets 15 subjects to load, it will load them. Or a hundred. Whatever. It is a very dumb virtual machine;
it executes the program it gets, regardless of memory or anything else. Just like an operating system, which tries to fulfill
any memory request: it is the program's responsibility not to ask for too much memory, otherwise it will crash.
In this regard, the command stream queue has no upper limit. It will take all of the commands at once,
process them in order, etc.
However, this doesn't mean that there is no overall limit. There is; actually, there are two limits.
One is the generation of the command stream. The memory usage is proportional to the number of load instructions
without matching delete instructions. So L1 L2 L3 L4 L5 | D1 D2 D3 D4 D5 would keep 5 subjects in memory at the marker time,
and then release them. If you reorder it into L1 D1 L2 D2 L3 D3 L4 D4 L5 D5, then there is always at most 1 volume/subject in memory.
So it matters how you generate the command stream. I provide two basic strategies, but anything could be implemented with these primitives, including but not limited to the current implementation. (I do not like the current one, hence my proposal, so I did not make an effort to replicate it, but it could be done.)
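
A tiny illustration of that bookkeeping, assuming the same hypothetical (command, subject) tuple encoding as above:

def peak_subjects_in_memory(commands):
    # Maximum number of subjects held simultaneously by a command stream.
    current = peak = 0
    for op, _ in commands:
        if op == 'L':
            current += 1
            peak = max(peak, current)
        elif op == 'D':
            current -= 1
    return peak

# L1 L2 L3 L4 L5 | D1 D2 D3 D4 D5  -> peak of 5 subjects
# L1 D1 L2 D2 L3 D3 L4 D4 L5 D5    -> peak of 1 subject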
The second memory constraint is the patch queue. After a subject is loaded, you can extract patches. But if you extract
all patches at the same time without any limit, then you might run out of memory. The patch queue has a limit, but it is not exactly the same as the queue size in the original implementation. Even if it is just 1 item long, it will preload all volumes in the background and yield reasonably good performance.

The issue comes when you reach the end of the sampler. E.g. you took 4 patches, and now you need a new volume.
You should scale your patch queue this way: it should contain enough batches, e.g. 20-30 batches, so that
while your network processes these batches, the background processes have enough time to
preload the new volumes.

It makes no sense to go above number_of_subjects_in_memory * number_of_patches_per_subject.

So unlike the traditional queue, where I have no idea about performance optimization, here the message is clear:

  • a longer patch queue always helps, but after a limit it saturates, and there is no speedup, just more memory usage.
  • extracting patches is quick; if you increase the number of patches per volume, you get higher speed.

However,

  • increasing the number of volumes increases the parallel background work at peak-load moments, e.g. when you
    load all volumes at the same time (at the beginning, etc.), and also the total memory use, but a reasonably
    large number is required for getting enough diversity.

Sorry, but I have difficulty following you here and understanding the two implementation parameters.
So strategy='basic' -> equivalent to the current implementation,
strategy='merge_incomplete' -> same as the current implementation
but releases a batch before the queue is completely full (i.e. as soon as the queue contains n patches, n being the batch size)?

Unfortunately, no. Both of them release every item as soon as possible, one by one; the data loader creates batches from them.
The difference is in the order of the patches.
For variety, you need volumes from multiple subjects in memory.
When they are loaded, they can be used for getting N samples from each.
What happens if the total number of subjects is not divisible by this memory constraint S?
In the basic approach: take S subjects, take N samples from each, and shuffle these S*N patches. For the last part,
take s < S subjects, and shuffle only these s*N patches. At any time, at most S subjects are in memory.
In the merge approach the last two blocks are merged. So for the first blocks, everything is the same,
but for the last part, the block size will be S+s (< 2S); take N samples from each, and these will be shuffled,
yielding (S+s)*N items.
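
Purely as an illustration, the difference boils down to how the subjects are grouped into blocks; something along these lines (not the actual code):

def subject_blocks(num_subjects, S, merge_incomplete=False):
    # Split subject indices into blocks of (at most) S subjects.
    # With merge_incomplete=True, a short trailing block is merged into the previous one.
    blocks = [list(range(i, min(i + S, num_subjects))) for i in range(0, num_subjects, S)]
    if merge_incomplete and len(blocks) > 1 and len(blocks[-1]) < S:
        last = blocks.pop()
        blocks[-1] += last          # last block has S + s (< 2*S) subjects
    return blocks

# subject_blocks(11, 4)                        -> [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10]]
# subject_blocks(11, 4, merge_incomplete=True) -> [[0, 1, 2, 3], [4, 5, 6, 7, 8, 9, 10]]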

Why is that important? As I said, you need some time to load the new volumes, so you pump up the number of patches per subject and the queue size. The batch size might be e.g. 2, but the samples per subject is 64. In this case, the last 32 batches would get data only from one subject. However, if you merge the last two blocks, then you still get variety in the last block.

So very briefly: all implementations work in the background and yield items as soon as possible.
The question is how to generate the order and keep performance high. The classic queue implementation
is inefficient, but if you push efficiency too far, you might get low variety in the final batches.
If you want to counteract these effects, it might require more memory.

Maybe you can detail the two command stream arguments a little, so that they perform the same random mixing:
in the current implementation, queue_size and number_of_patches_per_volume determine the "local" random mixing between subjects in your batch (i.e. if you have a queue of 32 with 32 patches, then you only get patches from the same subject, so in practice you want the queue length to be at least 4 times the number of patches for a batch size of 4).

With 32 patches per subject I want a queue size of 32*4 to be sure to have 4 different subjects. Which parameter controls this in your example?

For the 4 subjects, you need to set max_no_subjects_in_mem to 4, and patch_per_subject needs to be set to 32.
But it will not guarantee it. E.g. if you have 5 subjects, then the last part will have only 1 subject in the basic strategy,
hence the strategy question: what do you do at the end, e.g. if the number of subjects is a prime?
Queue length: here it becomes a performance question. You do not need a 32-long queue. You will
get the same results with e.g. a 1-long queue too, but performance will suffer. The recommended size
is as large as the time needed to load the new volumes requires, but at most subjects*patches (128 in the example).

cmd_stream = command_stream_generator(len(dataset), 5, 32, strategy='merge_incomplete') queue = ParallelQueue(dataset, sampler, cmd_stream, patch_queue_size = 32, seed=0, max_no_subjects_in_mem=5, patch_per_subject=32, strategy='merge_incomplete')
I do not understand what max_num_in_memory is controlling.
Is max_no_subjects_in_mem similar to queue_length / nb_patches_per_volume in the current queue implementation?

Yes, it is quite similar, but not exactly the same, especially when the different numbers are not multiples of
each other.

Briefly, the conceptual difference is this: the old queueing is split into two parts, command generation and command processing. The processing is very generic. The generation is where the strategies etc. are implemented.
The design idea was to keep it flexible for future hacking, but that can make it more complicated to use, which is why I
asked for feedback. But I feel this is a bit too complex an issue, so I will just cut it down a bit, push some ugly code
with a lot of logging to the screen, and clean it up based on the feedback.

I hope @fepegar will merge it; anyway, I will be pleased to test it.
Many thanks

@romainVala
Contributor

Thanks for the details, I think I now have a better understanding.
OK, I now see the difference between 'basic' and 'merge_incomplete'.
It seems like quite a small problem to me, but that is because I work with a lot of subjects, so it does not seem to be a big deal if the last batch has examples from the same subject (1 batch out of 1000 ...),
but OK, if you have very few subjects then it may matter.

One last point about the patch queue:
the current implementation has some parallelisation thanks to the PyTorch dataloader used to fill the queue, so the higher num_workers is, the more parallelisation you get, BUT the more CPU memory you will need ...
Do you still use the PyTorch dataloader in your strategy, and will num_workers also increase CPU memory?
Is there a parameter to specify how many CPU processes you use?

@dvolgyes
Contributor Author

That is a good point. First, in the current implementation there is no way to set the number of processes; it will be equal to the number of cores at most, or the number of subjects in memory + 1, whichever is smaller. (It uses a process pool, so the exact number is a bit complicated, but it tries to use all cores.) But the dataset is shared, so the number of processes here does not affect the memory usage. The number of worker processes is a different story, see below.

(It might be that there is an issue somewhere here; I haven't found any, but parallel programming with process pools is always tricky, so I don't promise anything. :) )

As for performance: I don't use a dataloader inside the code anymore, which has two benefits: it simplifies the code, and you can also set num_workers in the (main) dataloader to nonzero. If it is larger than 1, it will consume more memory.

Most likely the optimal setting is 1: not using the main thread, but having 1 subprocess, and inside this worker the queue will spawn N subprocesses for its own job. If you use more than 1, you will double, triple, etc. the memory use, but you will not gain speed, because the subprocesses will have fewer cores, so they will be proportionally slower (not to mention disk IO). But unlike the Queue implementation, it will not crash; it will just be slow.
(Practically: everything will be doubled, tripled, etc. if num_workers > 1, but the queue itself does proper parallelization, so
at least you don't get bad results, but it will be slower than using 0 or 1 workers.)

And because randomization is part of the queueing, the shuffle option makes no sense in the dataloader.
Shuffling only applies to randomly accessible datasets, and this one is a stream. So I changed the type,
but probably the original Queue should also be an IterableDataset and not a regular Dataset.
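
For illustration, the stream nature boils down to something like this minimal sketch (the class name is made up; it is not the proposed implementation):

from torch.utils.data import DataLoader, IterableDataset

class PatchStream(IterableDataset):
    # Patches arrive in the order produced by the command stream,
    # so the DataLoader's shuffle option does not apply to this kind of dataset.
    def __init__(self, patch_source):
        self.patch_source = patch_source    # any iterable yielding patches

    def __iter__(self):
        yield from self.patch_source

loader = DataLoader(PatchStream(range(10)), batch_size=4, num_workers=0)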

Note that to try the code, you need to pip install loguru and unsync (added to the requirements).

@romainVala
Contributor

Hi, thanks for the clarification.
I like the number of cores being the number of subjects in memory;
it is indeed the "minimal" requirement to be optimal.

About num_workers=1 in the main process: I wonder if it will not limit the number of visible CPUs for this subprocess to 1,

but you would have noticed it, so I guess not ...

@dvolgyes
Contributor Author

Hi,

Number of visible CPUs: I use the pure Python multiprocessing tools, so PyTorch doesn't limit (and cannot limit) the number of cores/processes. Also, I got pretty decent results even with a low number of subjects (e.g. 3 subjects on a 16-core machine).

I am not sure about the optimal number of parallel subjects; maybe disk IO, or memory transfer speed, or something else will limit the performance, and maybe on a 16-core machine the optimal number is something like 8 or 12. I guess this also depends on the specs, like storage, CPU, memory frequency, memory lanes, etc.

Try it, I think it is easy to set up if you had the classic queue before, and let me know how it works.
Also, yesterday a few sampling improvements were committed, so now the old queue must be the largest performance limitation.
I would try a few volumes and a large number of patches per volume, like 96, with a long queue, like 96, and see how it works. (Assuming it fits into memory; if it doesn't, scale down.)
(I haven't figured out yet how to detect / indicate running out of memory, but this is also not clear in base PyTorch.)

I will clean up the logging when there is a consensus that the algorithm works and the details are clear enough.

@romainVala
Contributor

Hi @dvolgyes

Thanks for providing the new queue, it looks great.
I made a few tests.
I have access to different types of computers, so different settings, which makes the comparison difficult.

(Figure: Figure_time_para_queue)

In blue is the torchio.Queue with length 80, 16 patches/volume and num_workers=6.
In orange and green is the new ParallelQueue: orange (num_workers=0), green (num_workers=1), with 16 patches, 16 subjects in memory and queue size 64.

So there is an improvement, but I have other tests that do not show it (I am currently double-checking and will report later).

I have a memory issue that is growing during the iterations:
I log the CPU memory with

        import resource  # stdlib; ru_maxrss is the peak resident memory, reported in kB on Linux
        main_memory = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        child_memory = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss
        self.log('******** CPU Memory Usage  ********')
        self.log(f'Peak: {main_memory + child_memory} kB')

Unfortunately, this does not work with num_workers>0 (I get a small number, not consistent with the info from htop).
(Any suggestion for getting correct memory reporting?)
When I use it with num_workers=0, it seems correct, and then I see the peak memory growing through the iterations (a small step every ~100 iterations ...);
I am not sure if it is a problem with my code or with the ParallelQueue.
I wonder if it could come from the queue size (64 in my example), which does not have time to be filled at the beginning and gets filled more and more during the training ...? Would there be a way to log this queue size, or even better the maximal memory needed by the current setting (after a few iterations, could it give an estimation)?

Last point: I was impressed by the logging you added; usually with PyTorch and multiple num_workers I do not get a proper, meaningful error, but here with your code I get a very nice report (with the variable contents!!! amazing!).
The bad part is that I do not understand how the error is possible .. (it happened after 1000 iterations, unfortunately in a torchio code part I modified ... so I may be wrong ...)

Also, although there is an improvement, I am quite far from a 100% working GPU; I guess this is a limitation of my setting, where the time spent preparing the data is >> the computation time on the GPU, so I need more CPUs (and CPU memory) to go faster ... that is why I am currently testing on another computer that has more CPUs.

@romainVala
Contributor

OK, the previous timings were done on a cluster with only 12 CPUs.
I am now running a similar model on a local machine which has 48 CPUs.
(Figure: Figure_time_para_queue2)

torchio Queue: length 64 and 8 samples per volume
ParallelQueue: 16 patches per volume and 16 subjects in memory; queue length 64

Given that for the torchio.Queue I ask for 8 patches per volume but 16 for the ParallelQueue, it should be harder;
nevertheless, with num_workers=8 (AND 48 CPU cores) I get better performance.

My understanding is that although the basic queue is not optimal, putting it in a torch DataLoader with 8 workers makes the data available faster ... but I should be at least as fast with the ParallelQueue; I must have missed something ...

@dvolgyes
Contributor Author

There are a few questions, especially about the last graph:
do you have the same number of subjects for the two queues?
I mean: I am pretty sure that disk IO / memory processing still limits the parallel queue.
It seems that you only use 8 volumes for the regular queue, and 16 for the parallel one.

In my experience, the patch extraction is around a factor of 50-100 faster than loading the volumes.
The number of patches per volume does not increase memory consumption;
only the queue length and the number of volumes in memory do.
Also, the queue length does not slow down processing; it doesn't need warm-up time to be filled.
Finally, the patch size for me is around 64-100 times smaller than the volumes
(128x128x128 vs. 512x512x500-900).
So even if I extract 64 patches, the whole volume is not necessarily covered.
(Actually, I am also thinking about a patch extractor which mixes grid patches and random patches.)

Assuming you have the same volume and patch sizes as me, and a similar machine. :)
(I know it is unlikely.)
So for me, a queue size of 100 is approximately equal to 1 volume, so with N volumes I never go above
N*100, but I try to keep it close to this number.
Second, the number of patches for me is approximately the same as the ratio of the volume and patch sizes,
so let's say 96.
This basically doubles the memory usage, so I need to set the number of volumes in memory to half.
Given your example, I would try:

  • 8 volumes in memory at a time,
  • 96 patches,
  • let's say a 4*96 queue length (but I would experiment with anything between 64 and 1024).
    (For me, a 128x128x128 patch consumes 8 MB (see the quick arithmetic after this list), so a 1024-long queue would be 8 GB of CPU RAM,
    while my volumes easily consume from dozens of MB to 0.5 GB each.)
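
For reference, the arithmetic behind those numbers, assuming float32 patches:

patch_bytes = 128 * 128 * 128 * 4      # float32 voxels: 8_388_608 bytes, roughly 8 MB per patch
queue_bytes = 1024 * patch_bytes       # roughly 8.6e9 bytes, i.e. about 8 GB for a 1024-long patch queue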

Basically this is the issue: loading a volume takes T seconds. If everything is parallel, it is still T seconds.
The full queue should be able to feed the network with patches for T seconds, so that the next
volumes can be loaded in the background. The breaks in the orange and green lines
indicate that the system still mostly waits for data loading.
I would guess, from the time difference between the beginning of a ramp and its end,
and from the time difference at the jump, that you can and should increase the patch numbers
and the queue length by at least a factor of 4. But it might be that even more drastic
measures would help.

Anyway, I would recommend experimenting with larger queues; it should fit into memory (if not, reduce the
number of subjects in memory).
There is a slight cheat, and I will think about this one: the regular queue starts to preload
the next volume as soon as the first one is finished (because of the parallel dataloader).

The parallel queue can do the same, but it needs to be defined what you mean by "subjects in memory".
So e.g. the command stream can start loading N volumes, and when they are done,
the queue can start loading the next N while yielding patches from the first batch.
This is the "strategy part" I mentioned for the command stream.
Using G to indicate groups:
LG1 LG2 PG1 PG1 ... DG1 LG3 PG2 ...
The nice part would be if I could somehow send a signal that the LG1 group is finished, so LG2 would not be started in parallel with the LG1 commands, just right after them. I need to think about this part, but long story short: it can be done,
and even in the current form it can be optimized, if we sacrifice memory.

(Speaking of memory: with parallel processes, actually with many of them, I am not quite sure how to measure it.
I can and will add some profiling, like logging the current queue occupancy, average length, average time with 0 elements, etc., but it is nontrivial.)
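
One possible way to measure it across the main process and its children would be something like this sketch, assuming the psutil package is available:

import psutil

def total_rss_bytes():
    # Resident memory of this process plus all of its (recursive) children.
    parent = psutil.Process()
    rss = parent.memory_info().rss
    for child in parent.children(recursive=True):
        try:
            rss += child.memory_info().rss
        except psutil.NoSuchProcess:   # a worker may exit while we iterate
            pass
    return rss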

Side remark: unfortunately, nibabel is not very intelligent about caching, etc., so if you use
compressed volumes (.nii.gz) you should decompress them. Most likely your disk IO is faster
than your decompression algorithm. (There are exceptions, so measure it.)

You can also experiment with parallel prefetching from the data loader in your trainer.
This might or might not help. It was written for machine learning pipelines,
but the PyTorch data loader has improved in the meantime:
https://github.com/justheuristic/prefetch_generator

So, could you please perform a few more tests,
especially with a much larger number of patches per volume, and a much longer queue?

@dvolgyes
Contributor Author

Hi,

I added a new implementation; let's call it double buffering.
The idea is more or less the same as in the PyTorch dataloader:
start loading the next subject as soon as the first is in memory.
Memory-wise this is equivalent to the Queue's dataloader, which also preloads the next one,
but it doubles the memory requirement.
However, it improves speed quite a bit.
All the previous advice above still applies, but now there is an extra parameter
called double_buffer, and it is set to True by default.
In my example it improved speed quite a lot, but please test it.
With the same number of volumes in memory, I am now quite confident that the
ParallelQueue beats the Queue in most cases, and in the worst case they perform equally.

Also, unfortunately, I need an extra shared counter for some accounting,
which means even more parallel programming stuff, namely some locks and shared values.
I hope it will not cause any issues; I tried to keep it minimal, but if you experience
any freeze, let me know.

@romainVala
Contributor

Hello,
sorry to be so slow with testing, but it is not an easy task; it takes time, and it demands fine tuning for each computer type ...
I could not test with a larger number of volumes in memory, since I got problems with 8 (even 4) volumes in memory.

The first problem is that I have a kind of CPU memory accumulation that grows a lot (during the iterations) when I use your queue. (It seems worse with the new implementation, which is logical since you double the queue ...)

I am not sure if it is caused by your code, or if it is an error in my code that gets worse when I use the ParallelQueue than when I use the original one. So I ended up training again with the torchio.Queue. It seems I also see the CPU max memory growing during the iterations, but by a smaller amount, so I can manage it, and I do not get memory kills on the cluster ...
I am still trying to find out where this memory increase comes from ...

A second point which is quite annoying with your ParallelQueue is that when a dataloader gets an error,
I get a nice report (that's great) but the main process is not killed, so it continues waiting for that thread, doing nothing.
This is very annoying in a cluster environment, since I get billed for the code doing nothing.
Is there a way to raise the error in the main process?

Many thanks,
I'll keep you informed

@dvolgyes
Contributor Author

Hi,

Memory: since it is also present (somewhat) with the classic queue, I would guess it is somewhere in your code,
but every refactoring of mine also means some extra checks, so I might find something in mine too.
(I thought about some absolute memory limits, like e.g. 20GB, but it turned out it is quite hard to estimate in Python, not even counting the issues from different platforms.)
Python has a few surprising memory issues; this basically comes from the fact that it uses reference counting,
and that in a single mega-loop variables can hold references forever.
E.g. something like this would keep memory around for a long time:
for training_sample in tr_dataloader:
    ...  # training_sample keeps a reference to the last batch even after the loop ends
for validation_sample in validation_loader:
    ...  # same here: the variable outlives the loop
Instead, if you put them into functions, they will release the variables when the context ends.
My usual issues: collecting some metrics into a list/dict and forgetting that it grows forever,
or a local variable which escapes its intended scope and stays alive forever, like the above 'training_sample',
which is defined in the for loop but remains alive after the loop too. (It is a bit strange after my C++ past.)
These might also help to track down memory issues:
https://objgraph.readthedocs.io/en/stable/
https://github.com/zhuyifei1999/guppy3
Just in case, I usually try to put everything into functions, and before/after training/validation I call garbage collection,
something like this pseudo-code:

import gc

for e in range(epochs):
    gc.collect()
    train(dataloader, summary_writer, ...)       # hypothetical training function
    gc.collect()
    validation(dataloader, summary_writer, ...)  # hypothetical validation function
    gc.collect()

ParallelQueue v2: I had a different issue, namely that it uses a lot of locks and communication queues,
and it turned out that these are Unix files on a Linux system, which is fine until you realize that there is a limit on open files per process, and it is quite small (usually 1024), so I got into trouble after a few thousand iterations because reference counting didn't find them all. Garbage collection helps somewhat, but it also kills performance.
So whatever happens, v2 will be dropped / reworked.
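
For reference, the per-process limit can be inspected, and the soft limit often raised, from Python itself; a sketch:

import resource

soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f'open files: soft limit={soft}, hard limit={hard}')   # soft is often 1024

# The soft limit can usually be raised up to the hard limit without root:
new_soft = hard if hard != resource.RLIM_INFINITY else 4096
resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))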

Exception handling: I see, this is a good point. The design idea behind loguru was that the main process could handle dead processes; e.g. if it is a worker process, it could be restarted, etc. But I see how this leads to issues in machine learning. The logging was never meant to be permanent, it is just for debugging, but in the next iteration I will try to make the exception propagate further in order to stop the whole training.

Side note: on clusters it can be quite annoying to monitor events, e.g. a crashed training. If you are allowed to use internet access (not always obvious), you could use this:
https://github.com/liiight/notifiers

Basically, you can write a short wrapper notifying yourself about major events, e.g. training start (assuming you have a batch system where queuing takes a long time), or add an extra call after your training, so you would know when it ended.

@dvolgyes
Contributor Author

I need a few days' break (I have some other pressing issues), but I will be back soon with a new variant.
Besides memory consumption, there are two major concerns currently:

  • the number of open files (queues, etc.)
  • accumulating garbage in the worker processes
    and some minor ones:
  • graceful exception handling / logging
  • performance (too much data transfer because of the queues)

There are two directions I am considering:

  • going somewhat back to the original queue implementation, using sub-dataloaders
  • using asyncio coroutines instead of queues where possible.
    This isn't thread safe, so it needs some extra protection, but it also doesn't copy data that much.

Side remark: my data (.nii files) is somehow very slow to read.
So I am considering a dataset where the meta-information is kept in memory, and the tensors are cached on disk in a faster format (e.g. a memory-mapped float16 binary; float32 is needed for the computation, but not for the input data).
Any feedback / pointers are appreciated. The reason I mention it here is that the caching could be part of the queue.
In this case, the transformations should not be passed to the dataset (otherwise they will always be the same, with no random augmentation). Or it could be a helper function / tool for building your own dataset. Adding it to the queue simplifies dataset creation (you could use it on existing datasets) but also complicates the design. Adding it manually to your dataset means more lines of code, but probably an easier structure. I lean towards the second, keeping it separate from the queue, but I am open to suggestions.
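
A minimal sketch of the kind of caching I have in mind, assuming a NumPy memmap as the on-disk format (the file paths, shapes and function names are made up):

import numpy as np
import torch

def cache_volume(array, path):
    # Write a float16 copy of a volume to disk as a raw memory-mapped file.
    mm = np.memmap(path, dtype=np.float16, mode='w+', shape=array.shape)
    mm[:] = array.astype(np.float16)
    mm.flush()

def load_cached_volume(path, shape):
    # Read the cached volume back and upcast to float32 for computation.
    mm = np.memmap(path, dtype=np.float16, mode='r', shape=shape)
    return torch.from_numpy(np.asarray(mm, dtype=np.float32))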

Does anybody have their own measurements of the time spent loading subjects vs. the time spent on preprocessing/augmentation?
(Maybe the augmentations need more work in order to speed them up.)

@romainVala
Contributor

The logging was never meant to be permanent, it is just for debugging,

Yes, of course, for the different logging steps; but the logging related to errors is very good and convenient for understanding what is going on, so if it could be kept ... and @fepegar, maybe there could be a way to generalize it to all of torchio?

@romainVala
Contributor

Side remark: my data (.nii files) is somehow very slow to read.

I can imagine that with huge dimensions it can be hard ...
Although for different reasons, I am facing a similar problem.
In my application (standard T1, matrix around 200^3) it is not a big deal; the real difficulty comes when I start to use specific transforms. RandomMotion and elastic deformation require a lot of CPU work, and I get acceptable performance only on my local computer, which has 48 cores AND 192 GB of RAM (but the cluster GPUs I have access to only have a few CPUs, 3 to 10, with limited RAM, which makes it almost impossible to train in a reasonable time).

To solve it I started trying to split the process into two parts:
1. apply the random transforms and save the final transformed volumes as NIfTI; run this massively on the CPU cluster
2. run the model training on the saved samples
Of course it costs a lot of disk storage, but it is possible (especially with transient disk space).

It seems there are a lot of Python libraries to automatically handle the caching, but I do not have much experience with them.
Not sure if it is in line with what you have in mind, but I would be interested to gain insight on this topic too ...

No worries about the time (I also have a lot of less interesting work to catch up on ...)

@fepegar
Owner

fepegar commented Oct 23, 2020

Hi both. I just want to say that I really want to look at this issue and corresponding PR, but I want to properly dedicate time to it and these last weeks have been very difficult (MICCAI, flying, self-isolating, moving, PhD...). I will go through this as soon as I can.

@dvolgyes
Contributor Author

Hi,
I also got stuck with other problems, but I think there are two parts: one is the queueing system,
and one is the transformation time. I have no specific idea yet, but maybe moving the majority of the transformations to the GPU would help. See kornia, which implements all transformations as PyTorch layers. Well, of course, the issue is elastic transformation and similar stuff, which does not necessarily make sense on a patch, because it should be volume-wise. (Maybe some ideas can be borrowed from kornia, but it is for 2D, non-medical images.)

Well, on the other hand, a data loader cannot use GPUs; that is a limitation of pytorch+multiprocessing. We could experiment with returning full volumes, e.g. 4-6, with a regular PyTorch dataloader, n workers, etc., and having a GPU-based patch extraction tool,
which takes the volumes and yields patches. Something like this:

for e in range(epochs):
    model.train()
    for data in dataloader:
        for patches in Queue(data):  # internally sends it to the GPU, transforms, extracts, and copies back to CPU RAM
            prediction = model(patches['images'])
            # compute loss, backprop, etc.

@fepegar
Owner

fepegar commented Oct 29, 2020

So it seems that you're not yet convinced by your implementation. If you'd still like to have this in the package, we could add a contrib submodule and put it there until you're happy with it, the docs are complete, etc. If #311 is not ready to be merged, you could make it a draft PR for now.

I keep thinking about how to optimize transforms, loading, etc. There are some issues with multiprocessing in the new SimpleITK (SimpleITK/SimpleITK#1239), but maybe if they're fixed, we'll get better times for RandomElasticDeformation. I don't know how crucial this transform is, anyway. Do you have any quantitative comparison? I.e., comparing a run with and without it. That will help you decide whether they're worth it.

I believe kornia does have some 3D support now.

@romainVala
Contributor

I like the idea of GPU transforms: I see it being useful for whole-brain approaches, where you cannot gain speed from the queue. As you said, for patch-based approaches the PyTorch multiprocessing is intrinsically CPU.
So I guess this should be an independent issue ...

Intensity transforms should already be GPU compatible, since they rely only on torch. The difficulty comes with the spatial transforms.

For resampling and affine transforms there are the torch utilities grid_sample and affine_grid. I once tried to play with them, and I struggled to get the same convention as nibabel resampling (from the same affine), but it is doable.
Once you have the resampling, elastic deformation should not be too far away ...
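
For what it is worth, a minimal 3D sketch with those utilities; note that the affine here is in the normalized grid_sample convention, not in the nibabel voxel/world convention, which is exactly the tricky part:

import torch
import torch.nn.functional as F

volume = torch.rand(1, 1, 64, 64, 64)                 # (N, C, D, H, W)
theta = torch.tensor([[[1.0, 0.0, 0.0, 0.1],          # 3x4 affine in normalized [-1, 1] coordinates,
                       [0.0, 1.0, 0.0, 0.0],          # NOT a voxel/world (nibabel) affine
                       [0.0, 0.0, 1.0, 0.0]]])
grid = F.affine_grid(theta, size=volume.shape, align_corners=False)
resampled = F.grid_sample(volume, grid, mode='bilinear', align_corners=False)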

I did not make any comparison, but anyway it will be application dependent, so I do think it is an important transformation to keep, for sure! From a theoretical point of view it allows creating a real variety of geometries (which you cannot get with affine transforms only); for this reason I consider it important.
I find this work really great (and an amazing opportunity for torchio):
https://arxiv.org/abs/2004.10282
Non-linear deformation is then crucial for training without data! (but with the right transforms ...) (Maybe it is worth looking at their implementation, also in PyTorch, to have an alternative implementation of elastic deformation.)

Then you have motion, which relies on the FFT; here again there are GPU versions.

But it is a non-negligible amount of work ...

@dvolgyes
Contributor Author

Hi,
About my implementation: yeah, I am not sure about it; at the current stage it is quite messy.
I will make a new, cleaned-up version, which will be memory heavy. (Basically: I have access to a lot of memory,
and this way I can beat the classic queue.) When it is done, I will submit it, and we can decide about the details, like where it should go. But right now I am still quite busy, unfortunately, so it will take some time.
But in the meantime, you could consider whether you want logging. Last time I used loguru, which works well; I quite like it, and I found it useful for debugging, but I will NOT add logging to the final version, unless you decide otherwise.

@romainVala
Contributor

Hello,
I like loguru too (although I have only tested it with your queue implementation); why then do you not want it in the final version?

@dvolgyes
Contributor Author

Loguru: that is simple: it is not my project. It is very confusing if a project uses multiple logging mechanisms, or if only one subproject suddenly starts emitting log messages. Logging is a high-level decision, as are the log levels, the amount of information you need, etc., so I will not introduce a new dependency and a new logging scheme into the final product. :)

@fepegar
Owner

fepegar commented Feb 14, 2024

Closing for now unless there's more activity. Thanks everyone!

@fepegar closed this as not planned · Feb 14, 2024