
Conversation

@adammoody
Contributor

@adammoody adammoody commented Aug 4, 2021

This is work in progress, but I'll post it early to get feedback.

This PR includes a few things at once.

Updates megatron/data/indexed_dataset.py to use numpy to compute sample offsets to improve speed while writing an index file.

Adds a new tools/preprocess_dataset_mpi.py script:

  • uses HuggingFace datasets as the source to build (.bin and .idx) input files for Megatron
  • uses MPI to distribute work via mpi4py to support multiple nodes
  • --split option to select the HF dataset split; defaults to train
  • --columns option to specify dataset feature (column) names to process from each row
  • --shuffle option to randomly shuffle data samples
  • --seed for random number generator on shuffle operations
  • --count option to limit the number of selected samples, e.g. --count 10000 to use 10k samples
  • --mpi4py option to instruct script to use mpi4py instead of torch.distributed
  • --torch-backend option to select between gloo/mpi
  • --local_rank to support torch.distributed.launch when using torch.distributed
  • --log-interval to specify seconds between progress messages or 0 to disable

Assuming srun has been configured to launch MPI jobs, one can run this script with something like:

srun -n 320 -N 8 python preprocess_dataset_mpi.py \
       --input openwebtext \
       --output-prefix openwebtext-bert \
       --vocab bert-large-uncased-vocab.txt \
       --dataset-impl mmap \
       --tokenizer-type BertWordPieceLowerCase \
       --split-sentences \
       --shuffle \
       --seed 100

The script can use MPI via mpi4py. It requires a shared file system, like Lustre or GPFS, so that one process can read a file written by another process. In particular, there may be problems on NFS due to client-side caching.

TODO:

  • The resulting merged file is different in size from a file written directly. I need to identify the cause. --
    The size discrepancy was in the index file, which is resolved with the doc_idx fix here:
    self._doc_idx.extend( (offset + index.doc_idx)[1:] )
    With the above fix, I verified that both the .bin and .idx files are identical using cmp after disabling the data shuffle. (A short sketch of this offset logic follows the list.)
  • I have not tested reading the resulting input files, so they may be totally bogus right now. -- Resolved after verifying merged files from multiple ranks are identical to the files produced by a single process.
  • Add some more exception handling to prevent MPI deadlocks on problems. -- Resolved by surrounding per-rank I/O step with try/except block and an allreduce to check that all succeeded. Avoids merge if anyone fails, but cleans up per-rank files regardless.
  • Support options for shuffling if desired, like enable/disable shuffle and define a random seed. -- Resolved by adding --shuffle and --seed options in 32fc48f.
  • Use standard convention for setting rank/size in torch.distributed.init_process_group
  • Catch exceptions to avoid MPI deadlocks, but re-raise exceptions where possible
  • Settle on HF_DATASETS_OFFLINE item
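A minimal sketch of that doc_idx offset logic (an illustration, not the builder's actual code), assuming doc_idx starts at 0 and records the sentence index at which each document ends, while sizes holds one entry per sentence:

import numpy as np

merged_doc_idx = [0]   # builder state so far
merged_sizes = []      # one entry per sentence already merged

def merge_index(doc_idx, sizes):
    # shift the incoming doc_idx by the number of sentences already merged;
    # its leading 0 duplicates the current end-of-data marker, so drop it
    offset = len(merged_sizes)
    merged_doc_idx.extend((offset + np.asarray(doc_idx))[1:])
    merged_sizes.extend(sizes)

merge_index([0, 2, 5], [3, 7, 4, 6, 2])   # rank 0: 2 docs, 5 sentences
merge_index([0, 3], [5, 1, 8])            # rank 1: 1 doc, 3 sentences
assert merged_doc_idx == [0, 2, 5, 8]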

In my testing with 320 procs on 8 nodes, startup takes 2 minutes, it takes about 15 minutes for all processes to write their .bin/.idx files, and the merge takes 3 more minutes. The full openwebtext dataset is processed in about 20 minutes.

Opening dataset openwebtext
Dataset({
    features: ['text'],
    num_rows: 8013769
})
> building BertWordPieceLowerCase tokenizer ...
 > padded vocab (size: 30522) with 70 dummy tokens (new size: 30592)
Seconds to startup: 146.8052520751953
Vocab size: 30522
Output prefix: openwebtext-6-bert
> building BertWordPieceLowerCase tokenizer ...
 > padded vocab (size: 30522) with 70 dummy tokens (new size: 30592)
Seconds to tokenize: 912.1747498512268
Documents= 8013769 docs/sec= 8785.344037759238
Sentences= 307384162 sent/sec= 336979.46807904245
Bytes= 39386887788 bytes/sec= 43179103.34003862
Merging rank files ...
Merging file openwebtext-6-bert_text_sentence_0
<snip>
Seconds to merge: 169.25763988494873
Merged 320 files into openwebtext-6-bert
Bytes= 17425293230 bytes/sec= 102951295.0898091
Deleting rank files ...

real	20m51.461s
user	0m0.197s
sys	0m0.084s

The file system hosting the source dataset, the intermediate .bin/.idx files, and the final merged file is GPFS, which provides 120GB/s write bandwidth.

@stas00
Contributor

stas00 commented Aug 5, 2021

Thank you, @adammoody

@thomasw21 and @ontocord - since you have been working on this area of Megatron recently, would you be inspired to work with Adam to find the best outcome?

@stas00
Contributor

stas00 commented Aug 5, 2021

Specifically to MPI: won't the same be accomplished if the deepspeed launcher is used to do the communication? I haven't had a chance to read the code yet, so perhaps my question is obviously wrong.

Just trying to avoid a dependency on MPI, as it often camouflages an invalid distributed setup, and then the code fails elsewhere where MPI is not installed. So I actively avoid installing it so that I can detect errors early.

For example, if you try to run deepspeed in a notebook without proper dist-like env emulation, the program fails, but with MPI it often still works. I think there are a few other circumstances where it makes things work, obfuscating problems.

@stas00
Contributor

stas00 commented Aug 5, 2021

I have not tested reading the resulting input files, so they may be totally bogus right now.

This is just an idea - use a synthetic input made from the same single line replicated multiple times, and then compare the resulting index output with the original script and yours - they should be the same.

But perhaps even normal data will work, since from what I remember the Megatron preprocessing script doesn't shuffle data.

@thomasw21
Member

thomasw21 commented Aug 5, 2021

So I might get something completely wrong here but:

  • deepspeed doesn't allow low-level handling of communication, i.e., using map/reduce functions
  • I think under the hood it uses torch.distributed as the communication API, but CPU map/reduce is not supported by torch.distributed with the nccl backend, only with mpi and gloo (https://pytorch.org/docs/stable/distributed.html). I think this script should be run on CPU-only nodes. So it should all be the same between deepspeed and this implementation, no? Maybe use torch.distributed instead of mpi4py?

Sorry, I might be mixing a lot of concepts together; I'm not super clear on how deepspeed works right now.

@huu4ontocord
Contributor

huu4ontocord commented Aug 5, 2021

I am such a GitHub noob that I didn't even see this thread until now :) Yeah, I'm happy to discuss. I actually don't know MPI. HF Datasets does not easily support multi-node cluster processing; I'm trying to get it to work with Dask in my data-tooling project. I'm not sure if this answers your question. As for multiprocessing, I know that torch has its own fork functions and doesn't use Python's native multiprocessing, if that is any help. Ideally we would want datasets, or any enhancement to it (my version is called datastore), to use different types of communication channels to distribute loads across nodes: torch.distributed, MPI, whatever. You need to be able to serialize the parameters of the map, tell each node which shard it is working on, and then get back the results (or the .bin/.idx file in the indexed_dataset case) somehow. Ideally the channel to get back data would be fast.

@adammoody
Contributor Author

I think torch.distributed may have enough collective calls to sub in for mpi4py. I'll look into it. If so, we can likely abstract the collectives that are used.

@adammoody
Contributor Author

adammoody commented Aug 5, 2021

I was able to use torch.distributed collectives as another option. It's a bit more cumbersome than the mpi4py code, but it seems to work. Also, one has to use gloo or perhaps mpi as the backend, since the objects being communicated are on the CPU.

The script is currently hardcoded to set the rank and world_size from Open MPI environment variables. Those are set for each process separately by the job launcher I happen to be using. We'll need to make that more general. If you want to test, you'll likely need to change these lines.

https://github.com/adammoody/Megatron-DeepSpeed/blob/7b27853d6f829798e64bd9001dcad9a9da26d067/tools/preprocess_dataset_mpi.py#L318-L321

It expects MASTER_ADDR and MASTER_PORT to be set in the environment.
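For reference, a hypothetical sketch of that initialization (not the exact code in the PR), assuming the Open MPI launcher sets OMPI_COMM_WORLD_RANK/OMPI_COMM_WORLD_SIZE and that MASTER_ADDR/MASTER_PORT are already exported:

import os
import torch.distributed as dist

rank = int(os.environ["OMPI_COMM_WORLD_RANK"])
world_size = int(os.environ["OMPI_COMM_WORLD_SIZE"])

# gloo (or mpi) backend, since the objects being communicated live on the CPU;
# the default env:// init method picks up MASTER_ADDR and MASTER_PORT
dist.init_process_group(backend="gloo", rank=rank, world_size=world_size)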

And I've left MPI in there. It defaults to using mpi4py. One can switch to torch.distributed by setting use_mpi = False near the top of the script. Obviously, if both stay in there long term, this should be a command line option instead.

https://github.com/adammoody/Megatron-DeepSpeed/blob/7b27853d6f829798e64bd9001dcad9a9da26d067/tools/preprocess_dataset_mpi.py#L69

Contributor

@huu4ontocord huu4ontocord left a comment

dtype_size = dtype().itemsize
address = 0
pointers = []
def _get_pointers(sizes, npdtype):
Contributor

As I understand the changes, this is equivalent to the previous Python list + loop based approach. Would this make things faster because you don't need to create a list and then convert to numpy? It looks right. Did you test to see if the output of the two methods is the same?

Contributor Author

As an aside, I found that pytorch-biggraph also uses torch.distributed/gloo/MPI to communicate, if you are interested:

@ontocord , thanks for your review, and thanks for the pointers to this other project. I'll take a look when I get a chance.

Contributor Author

Yes, it should be equivalent. I checked by verifying that the resulting files are identical with cmp. Though I may put that on my list to double-check again, just to be sure.

I found that creating this list of pointers was taking a long time when writing out the index file. This list for the full openwebtext dataset turned out to be ~300 million items. The cost seemed to be concentrated in the loop that computes the byte offsets. Converting that loop into numpy calls cut the compute time significantly.
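To illustrate the kind of change (a sketch of the idea, not the exact code in the PR), the per-element loop that accumulates byte offsets can be replaced with a vectorized prefix sum:

import numpy as np

def pointers_loop(sizes, dtype_size):
    # original approach: Python-level loop over ~300 million sentence sizes
    address = 0
    pointers = []
    for size in sizes:
        pointers.append(address)
        address += size * dtype_size
    return pointers

def pointers_numpy(sizes, dtype_size):
    # same result: exclusive prefix sum of the element counts, in bytes
    sizes = np.asarray(sizes, dtype=np.int64)
    pointers = np.zeros(len(sizes), dtype=np.int64)
    np.cumsum(sizes[:-1] * dtype_size, out=pointers[1:])
    return pointers

assert pointers_numpy([3, 5, 2], 2).tolist() == pointers_loop([3, 5, 2], 2)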

Contributor

Awesome optimization! Thank you.

# allocate a tensor of appropriate size
# initialize tensor with list values on root
rank, size = get_rank_size(args)
if rank == root:
Contributor

Are we assuming it will always be int64? If we have large lists but the values are in the int8, int32, etc. range, would it be more efficient to send based on a smaller size? Perhaps as a parameter to bcast? I guess in the case of MPI, it's fixed at int64, and dtype won't do anything?

Contributor Author

Yes, that's a good point. It ultimately depends on the values within the list. In this code, I'm only calling this broadcast function to send the sample index values of the dataset that is being processed, so those values will be in the range [0, num_samples). We could likely size things down given the value of num_samples.
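A hypothetical sketch of that idea (not the PR's bcast helper), assuming every rank already knows how many indices are being sent:

import torch
import torch.distributed as dist

def broadcast_index_list(idx_list, count, num_samples, root=0):
    # int32 is enough whenever the largest sample index fits in 32 bits
    dtype = torch.int64 if num_samples > torch.iinfo(torch.int32).max else torch.int32
    if dist.get_rank() == root:
        tensor = torch.tensor(idx_list, dtype=dtype)
    else:
        tensor = torch.zeros(count, dtype=dtype)
    dist.broadcast(tensor, src=root)
    return tensor.tolist()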

Contributor

If the communication doesn't happen often and is small, it's probably not worth optimizing. If it's often and large, we could consider this, I think. Thanks for the explanation!

j = idx[i]
for key in columns:
# tokenize text for the given sample index
text = dset[j][key]
Contributor

If you want to increase the speed on each node, you could use dset.map(... batched=True, batch_size=..., num_proc=...) And have each process write to a .bin/.idx file for merging later.
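A rough sketch of that suggestion (parameter values are illustrative, and encoder.encode stands in for whatever tokenizer wrapper is used):

def tokenize_batch(batch):
    # with batched=True the function receives a dict of column -> list of values
    return {"tokens": [encoder.encode(text) for text in batch["text"]]}

tokenized = dset.map(
    tokenize_batch,
    batched=True,
    batch_size=1000,
    num_proc=8,                        # worker processes on this node
    remove_columns=dset.column_names,  # keep only the new "tokens" column
)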

Member

@ontocord Is it really faster if num_proc = 1?

Contributor Author

For multi-process within a node, I have been launching with one process per CPU core. For example, with a launch command like srun -n320 -N8, the job runs on 8 nodes with 40 procs per node. Each of those procs then writes its own .bin/.idx file for merging.

As you say, with some more work, one could launch say one process per node and then use python multiprocessing to run multiple procs on the node. In fact, that strategy could perhaps help with improved load balancing vs. the static work allocation that I'm using.

However, I settled on the simpler model in this first pass. For now, it's up to the user to launch multiple procs per node.

Contributor

@huu4ontocord huu4ontocord Aug 6, 2021

Cool. Makes sense to me. FYI, we ran some benchmarks (the code of which I can share with you) for various ways to do multiprocessing to make it faster on the machine we were working on. It turns out a queue method was fastest, and multi-reader, multi-writer was next (which is the way you are doing it here). But your mileage may vary depending on your hardware/OS. The queuing code was pushed through this PR if you are interested: #18

Contributor

@thomasw21 I know we've discussed this before, and my opinion is that you can still run more processes than the number of cores in some cases if you have to wait for disk for some processes. But if that is not an issue, num_proc=1 doesn't make sense :)

Member

Ah right, I forgot about that one. Personally I'd suggest not mixing multiple ways of having multiple processes; I think they are usually a nightmare, and the gain in performance is just not worth it for a script we're going to run just a few times. But if you feel it's really worth a shot then maybe please go ahead!

vocab_size=tokenizer.vocab_size)

# merge all ranks into one file
for rank in range(proc_size):
Contributor

@huu4ontocord huu4ontocord Aug 6, 2021

I don't know how fast shared disk access is, but if it is a bottleneck, you could do the merging as each .bin/.idx file becomes available from a node/process on the node. This will require some communication plumbing, unless you just want to scan the disk for files whose size hasn't changed in a while, or both.

Contributor Author

In an ideal world, there is not much time for overlap here. All procs get about the same amount of work, and they ideally all finish writing their .bin/.idx files at about the same time.

I think where a bottleneck can show up is that only a single process (rank 0) works to merge the final file. For users who have parallel file systems, like Lustre/GPFS, that final merge step could be done in parallel by having each process write its portion of the data directly into the merge file. That could be worthwhile, though it will require some significant I/O development work in the indexed_dataset class, e.g., to compute the offsets where each process needs to write. That implementation would also require a POSIX-compliant file system, so NFS users would often have to fall back to something where just a single process writes the file.

I think even in my current implementation, NFS users may hit some snags since the .bin/.idx file written by a process on one node may not be immediately visible to rank 0. If that problem surfaces, we could probably work around it by adding a sleep to wait out the NFS cache timeout. For now, I've just got a note in there that the backing file system needs to support shared file access when running on multiple nodes.
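For future reference, a rough sketch of what that parallel .bin merge could look like on a POSIX-compliant shared file system (an assumption for later work, not part of this PR; the .idx merge would still need the doc_idx/pointer adjustments, and a real version would copy in chunks rather than reading the whole shard into memory):

import os
from mpi4py import MPI

def parallel_merge_bin(my_bin_path, merged_path):
    comm = MPI.COMM_WORLD
    my_size = os.path.getsize(my_bin_path)
    # exclusive prefix sum of shard sizes gives this rank's byte offset
    offset = comm.exscan(my_size, op=MPI.SUM)
    if comm.rank == 0:
        offset = 0
        open(merged_path, "wb").close()   # create/truncate the merged file
    comm.barrier()                        # ensure the file exists before anyone writes
    with open(my_bin_path, "rb") as src, open(merged_path, "r+b") as dst:
        dst.seek(offset)
        dst.write(src.read())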

Contributor

@huu4ontocord huu4ontocord Aug 6, 2021

Gotcha. If it's not an issue because load is balanced, then no big deal. I don't really like the way the builder is written because it is not restartable from an existing .bin/.idx file, so I gave up on an idea of doing fancy merging. I find merging is pretty fast actually. It's just "cat" at the end of the day.

For the issue of files in flight, I wrote some code to see if a file has finished loading before I work on it, for what it's worth. It probably has to be modified for files that are not yet visible, because this assumes all files are there and are just being written to. You might also want to pass in a minimum wait increment. You mentioned your system has a 4 min (!) increment between updates.

You have to call it in an iteration (list, or next):

import os
import time

# This is used for seeing if files from a remote drive have finished loading.
# Files could be in flight while we try to retrieve them.
def wait_until_files_loaded(flist, max_tries=120, fs=None):  # wait ~2 hrs max
    if fs is None:
        fs = os
    if isinstance(flist, str):
        flist = [[flist, 0]]
    else:
        flist = [[f, 0] for f in flist]
    for j in range(len(flist) * max_tries):
        num_done = 0
        for i, val in enumerate(flist):
            if val is None:
                num_done += 1
                continue
            (f, incr) = val
            if incr > max_tries:
                raise RuntimeError("Timed out while trying to wait for file " + str(f))
            size1 = fs.stat(f).st_size
            time.sleep(min(600, 1 + incr))
            incr += 1
            if fs.stat(f).st_size == size1:
                # size did not change over the wait interval, treat the file as complete
                flist[i] = None
                num_done += 1
                yield f
            else:
                flist[i] = [f, incr]
        if num_done == len(flist):
            return
    return

You could do something like


for bin_file in wait_until_files_loaded(all_bin_files):
    idx_file = next(wait_until_files_loaded(bin_file.replace(".bin", ".idx")))
    ...

Contributor Author

Oh, yeah. That could be handy. Thanks. We could gather the actual file size to rank 0 after each process finishes, and then we could have rank 0 wait with code like this until both the file exists and reaches the expected size. Let's keep this in mind for future work.

Contributor

Sounds like a good plan.

Member

@thomasw21 thomasw21 left a comment

Nice work! So I'd like to start off by saying I don't know much about MPI and such. I have a few questions:

  • Is it possible to not send the entire list of indices to everyone? To my understanding, you're sending it all to every rank, and then they use their start and end variables to compute which subset they need to take care of. torch.distributed has a scatter/gather mechanism that seems helpful (see the sketch after this list); is there something similar in mpi4py?
  • I think the index is not needed, as you can do all the things you're doing with indices using datasets. Then the only thing you need to communicate is the start and end index, no?
  • In case of failure we probably want the exception message raised or logged somewhere?
  • You're not able to monitor the current progress live, right? I think it's worth trying to keep that, especially since this can take quite some time.
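A sketch of the scatter alternative raised in the first bullet (my illustration, not code from the PR; assumes a PyTorch version that provides scatter_object_list and a CPU backend such as gloo):

import numpy as np
import torch.distributed as dist

def scatter_sample_index(num_samples, seed, root=0):
    rank, world_size = dist.get_rank(), dist.get_world_size()
    if rank == root:
        idx = np.random.default_rng(seed).permutation(num_samples)
        chunks = list(np.array_split(idx, world_size))
    else:
        chunks = None
    out = [None]
    dist.scatter_object_list(out, chunks, src=root)
    return out[0]   # only this rank's subset of shuffled sample indices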


print("Bytes=", dset_stats[2], "bytes/sec=", byterate)

# allreduce to check whether all ranks wrote their part successfully
success = all_true(args, success)
Member

I don't think that's very useful, no? Unless it's used for cleanup only.

Contributor Author

We basically skip the merge if any process fails. We'll also want to print an error message in that case.

Member

All of this can probably go somewhere like def process_shard(rank) l.428

dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
return (tensor.tolist()[0] == size)

def select_sample_list(args, dset_size):
Member

I don't think this is needed, you can do shuffling/truncation using datasets instead. This would remove the necessity to communicate the idx between ranks I believe.

Contributor Author

Yeah, I think that's true. The main reason I do this work on rank 0 and bcast is to handle randomness. We need all ranks to generate a consistent random sequence. I think we could have each process build the sequence locally if we ensure they all use a consistent random seed.

For really large datasets, there might also be some savings in memory by having one rank build and shuffle the sequence if we replace the bcast with a scatter.

I'll review the datasets API in more detail.
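A sketch of the consistent-seed idea (an assumption about how it could look, not the current code): each rank seeds an identical RNG, builds the same permutation locally, and keeps only its own slice, so nothing needs to be broadcast:

import numpy as np

def local_sample_indices(num_samples, seed, rank, world_size):
    idx = np.random.default_rng(seed).permutation(num_samples)
    start = rank * num_samples // world_size
    end = (rank + 1) * num_samples // world_size
    return idx[start:end]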

Contributor Author

@thomasw21, the Datasets API looks pretty rich. However, it'll take some time to comprehend how to use it all. Mind if we push that change to a future PR?

More immediately, do you know if there is a good way to query whether a dataset is cached without having to load it? It seems like there must be a way.

I'm thinking it may be a good idea to suggest that people download/extract the dataset as a separate step, especially for really large datasets. That can be done with a single process in advance, rather than burning a lot of cluster CPU time while the whole allocation waits for one process to download/extract. We could detect that condition and print a message.

Member

So if you have a single rank handling shuffling/truncation, you might not even need to share indices, as all you need is start and end, and since those two values can be computed on each rank, you don't need to communicate anything. I admit I don't think this is really a bottleneck, but I feel this would simplify the code a bit by removing the indexing mechanism.

Member

I don't mind, please create an issue if you don't want to tackle it now, so we can track it and somebody else might take a look.

Concerning the way things are cached:

  • setting HF_DATASETS_OFFLINE=1 as an env variable enables offline mode, i.e., if it tries to download, it'll raise an error (illustrated in the snippet below)
  • I don't think there's a simple way to either enforce getting from cache, or check if it exists in cache. Maybe @stas00 knows?
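A minimal illustration of the documented switch from the first bullet; the dataset must already be in the local cache or load_dataset will raise:

import os
os.environ["HF_DATASETS_OFFLINE"] = "1"   # set before importing datasets

from datasets import load_dataset

dset = load_dataset("openwebtext", split="train")   # raises if it needs the network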

I agree that having a separate process might be the best idea, especially since you lock a lot of resources just to wait it out. (Might also apply to the merging part, as only rank=0 is merging everything, and that can take quite some time depending on the size of your dataset, TBD I guess.)

Contributor

@stas00 stas00 Aug 9, 2021

Your assert is great, Adam.

Why do you think it may break in the future? HF_DATASETS_OFFLINE is now part of the datasets public API.

Unless you refer to something else when you say "it's not a documented interface".

Member

HF_DATASETS_OFFLINE as an env variable is documented; the one from config is not.

Contributor

Thank you for clarifying, @thomasw21! Perhaps that one should be documented too?

Member

Maybe they are not meant to be tweaked? The advantage of env variables usually is that once they are set in the script, they are set for the whole duration of the script. I'm guessing playing around with the config in the script can have undesired consequences?

Contributor

I filed a request to both document and define precedence: huggingface/datasets#2776

Member

@thomasw21 thomasw21 left a comment

Some more minor comments, but no real blocking issue, as this is a separate script that mostly has no impact on the rest of the code. I'll approve, as the only critical part for me was the changes in indexed_dataset.py that impact other scripts, and that looks much better.

Thanks! Awesome work!

print(f"ERROR: At least one process failed to write its file, skipping merge and cleaning up")

# delete per-rank files, do this even on error
print("Deleting rank files ...", flush=True)
Member

You can actually delete in parallel: each rank can remove its own file. And all this code can move to remove_shards.

Contributor Author

Sure. Will change to do a parallel delete.

try:
    os.remove(binfile)
except:
    pass
Member

I'm curious why it can fail; can we add a log here?

Contributor Author

The main thing I was guarding against here is if some process fails before it creates its file, in which case the file for that rank won't exist. This turned up in testing when I was forcing some process failures. I could likely add an os.path.exists check before the remove to help mitigate that.
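What that guard might look like (a sketch):

import os

if os.path.exists(binfile):
    try:
        os.remove(binfile)
    except OSError as e:
        print(f"Failed to remove {binfile}: {e}", flush=True)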

Member

@thomasw21 thomasw21 Aug 8, 2021

Ah I see! But if you do parallel removal and a process crashes, it won't remove its files (since it would have already crashed ...). We could wrap everything in try/except ... Also, if one process crashes, you probably end up in a deadlock somewhere around success = all_true(args, success) (right before the merge), no? Or does MPI realise that one process has died and broadcast to only the live processes?

I think it's complicated to handle all failures perfectly; maybe we can make the assumption that everything runs well. And if not, there might be manual steps to run (I usually just ran rm {prefix}_* from the folder without much care ...). Or just realise that the next run will overwrite your files.

Contributor Author

Right. I think we'll be able to auto-cleanup the rank files in most cases, like no failures and perhaps some common failures like out of disk space. There are some other cases where we might leave some files behind, and the user can fallback to use rm then.

It depends on the MPI runtime and the job launcher that is used, but MPI normally tears down a job when it detects that some process in the job has failed. I have seen some hangs in my testing, though, especially when Python exceptions are raised. I haven't figured out why MPI is failing to tear down those jobs yet.

try:
    os.remove(idxfile)
except:
    pass
Member

ditto

print("Bytes=", dset_stats[2], "bytes/sec=", byterate)

# allreduce to check whether all ranks wrote their part successfully
success = all_true(args, success)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of this can probably go somewhere like def process_shard(rank) l.428

byterate = numbytes / secs if secs > 0.0 else 0.0
print("Seconds to merge:", merge_end - merge_start)
print(f"Merged {proc_size} files into {args.output_prefix}")
print(f"Bytes=", numbytes, "bytes/sec=", byterate)
Member

l.510 to here can go to merge_shards method?

Contributor Author

Created three functions for the create, merge, and delete operations of the per-rank files. That cleans up the main body a lot.

@thomasw21
Member

thomasw21 commented Aug 9, 2021

I suggest we take that parallel merge into another PR; the reason is that this PR is getting huge. The good thing is we can probably leverage this in all the other scripts, so if it really is much faster we probably want to change the other scripts too.

FYI we could run things at 40-45 MB/s on 60 workers (though it was single node).

@adammoody
Contributor Author

FYI we could run things at 40-45 MB/s on 60 workers (though it was single node).

Thanks for that data point, @thomasw21. I noticed that kind of rate from your PR comments last week. That's an impressive result. It also suggests that this multi-node implementation must still be leaving performance on the table somewhere.

@adammoody
Contributor Author

adammoody commented Aug 9, 2021

The hot spot in my tests seems to be the sample encode cost. I ran a few tests, each with 8 nodes processing a total of 1M samples:

# with --split-sentences
Seconds to tokenize: 330.50502824783325
Bytes= 4910965824 bytes/sec= 14858974.61238457

# without --split-sentences
Seconds to tokenize: 49.16486191749573
Bytes= 4910965824 bytes/sec= 99887717.21236934

# read text sample, but skip encode step
Seconds to tokenize: 0.48508715629577637
Bytes= 4922582861 bytes/sec= 10147831780.560505

Reading the samples from disk without encoding, I see a processing rate of ~10GB/s. That's reasonable given the file system. Encoding those text samples without splitting sentences drops performance to ~100MB/s -- 100x slower. Splitting sentences drops to ~15MB/s -- another factor of 7x.

@adammoody
Contributor Author

Alright, I'm settled on this one, assuming you guys are all ok with it. Thanks for the help.

I suppose the name preprocess_dataset_mpi.py is a bit misleading now since we also support torch.distributed and mpi4py is not the default.

Also, maybe replace dataset with hfdataset to be more specific?

I'd be interested to know whether it works for anyone else, either with torch.distributed or mpi4py. BTW, if you don't have mpi4py but you do have MPI installed on your system, mpi4py is typically easy to install. However, it should be compiled against the MPI that's on your system. To do that, I just do something like the following:

<check that mpicc is in your $PATH>
wget https://bitbucket.org/mpi4py/mpi4py/downloads/mpi4py-3.0.3.tar.gz
tar xf mpi4py-3.0.3.tar.gz
cd mpi4py-3.0.3
python setup.py install

Then if srun is configured to launch MPI jobs, one can start the script with something like:

srun -n40 tools/preprocess_dataset_mpi.py ...

In particular, I have been using a command like:

srun -n 40 python tools/preprocess_dataset_mpi.py \
       --input openwebtext \
       --mpi4py \
       --count 10000 \
       --output-prefix openwebtext-gpt2 \
       --dataset-impl mmap \
       --tokenizer-type GPT2BPETokenizer \
       --vocab gpt2-vocab.json \
       --merge-file gpt2-merges.txt \
       --seed 101

I have less experience with torch.distributed.launch, but I was able to do some basic testing with that.

@adammoody
Contributor Author

And a simple test program to check that mpi4py is working:

from mpi4py import MPI
rank = MPI.COMM_WORLD.Get_rank()
size = MPI.COMM_WORLD.Get_size()
print(f"rank {rank} of {size}")

And to run:

>>: srun -n2 python mpitest.py
rank 0 of 2
rank 1 of 2

@thomasw21
Member

thomasw21 commented Aug 10, 2021

Okay I've tried a couple of things:

  • We can't have multi-node + access to the internet on JZ. So we'll most likely only run this code w/o download.
  • torch.distributed didn't seem to work for me (something about the rank not being initialised); do I have to use the torch.distributed launcher? Oops, I forgot that we have to run python -m torch.distributed.launch {script}.
  • The mpi version worked for me.

I haven't run benchmark, just checked that I could get it working fairly easily.

Btw, can we have logs that are easier to read? convert bytes/sec to MB/sec. We use the convention that 1MB = 1024 * 1024 bytes.

I was only able to run on stas/openwebtext-10k as I'm having issues running with openwebtext (issues are unrelated to this PR).

@huu4ontocord
Contributor

Reading the samples from disk without encoding, I see a processing rate of ~10GB/s. That's reasonable given the file system. Encoding those text samples without splitting sentences drops performance to ~100MB/s -- 100x slower. Splitting sentences drops to ~15MB/s -- another factor of 7x.

This is very interesting. Splitting uses nltk, so unless we want to use something else, we are stuck with that. Maybe something from spaCy might be faster, or we could split on ". ", but that's not the same semantics.

As for the encoding, which encoder did you use? HFBert, HFGPT, or the GPT in the repo? We could optimize some of the encoders in the repo to use the LRU/word-length-based cache that we did for GPT to squeeze out more performance. But unless the performance is a bottleneck, we were good enough with the performance we got on the optimized GPT encoder we used.

@adammoody
Contributor Author

Thank you for testing, @thomasw21. It's good to know that it's working for you and on another system.

@ontocord , I'm basically using the same Encoder setup from the original preprocess_data.py script. You can see that here:

https://github.com/adammoody/Megatron-DeepSpeed/blob/a8e9b2e4ac03ede99ea9b523c41b2b26edb5783c/tools/preprocess_dataset_mpi.py#L84-L117

For tokenizers, I've tried BertWordPieceLowerCase and GPT2BPETokenizer, but those seemed to be similar in performance. I also wonder whether my system setup (CPU arch + software setup) might be slowing things down. This is on a PowerPC, which may not vectorize as well as x86. And I don't know how well my software is optimized for the actual hardware. The good news is that it is easy for me to parallelize sample encoding across procs, so I can always throw more nodes at the problem.

@adammoody
Contributor Author

I think this script will need some work to handle very large datasets. There are a few aspects that come to mind:

  • Ranks will need to only hold a subset of index values at once rather than the full copy, as mentioned in the review. That can be done through gather/scatter or even writing the list of index values into a file, from which each rank reads its portion.
  • It may be necessary to parallelize the shuffle, both for performance and memory consumption reasons.
  • Reading samples from storage may need some I/O optimization work. Right now, each process directly reads its samples from storage. After a shuffle, that may mean each process is reading from storage with random access. It may be more efficient to read in contiguous blocks and distribute data to procs via the network.

We can look at all of that in future work.

For some context, for a large dataset like oscar, how many different examples are there in the training set?

@adammoody
Contributor Author

adammoody commented Aug 10, 2021

I've also got a start on an extension to allow this to process files in jsonl format. The ranks work to scan the file to identify the byte offset of newline (\n) characters. They record those offsets in a global index list, and then procs use that list to seek/read each sample. It still needs some work, so I'll save that for a future PR.

adammoody@409e2ef
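A sketch of that indexing step (my illustration, not the code in the linked commit): each rank scans its byte range of the file for newlines, and the offsets can then be gathered into a global sample index. Handling of the very first record and of ranges that split a line is left out here:

def scan_newline_offsets(path, start, end, chunk_size=1 << 20):
    """Return byte offsets just past each newline found in [start, end)."""
    offsets = []
    with open(path, "rb") as f:
        f.seek(start)
        pos = start
        while pos < end:
            data = f.read(min(chunk_size, end - pos))
            if not data:
                break
            i = data.find(b"\n")
            while i != -1:
                offsets.append(pos + i + 1)   # next sample starts here
                i = data.find(b"\n", i + 1)
            pos += len(data)
    return offsets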

@adammoody
Contributor Author

Btw, can we have logs that are easier to read? convert bytes/sec to MB/sec. We use the convention that 1MB = 1024 * 1024 bytes.

Sure. Just pushed a commit to make that change.

@thomasw21
Member

  • Ranks will need to only hold a subset of index values at once rather than the full copy, as mentioned in the review. That can be done through gather/scatter or even writing the list of index values into a file, from which each rank reads its portion.

This would be fixed by migrating to datasets, as there would be no need to have the index anymore.

  • Reading samples from storage may need some I/O optimization work. Right now, each process directly reads its samples from storage. After a shuffle, that may mean each process is reading from storage with random access. It may be more efficient to read in contiguous blocks and distribute data to procs via the network.

So I don't know if datasets handles this after shuffling; I'd guess yes.

For some context, for a large dataset like oscar, how many different examples are there in the training set?

It depends on the split you take; that information is available here: https://huggingface.co/datasets/oscar

Member

@thomasw21 thomasw21 left a comment

Btw I've noticed that HF datasets can sometimes require a subset config for some datasets (for example: wiki40b). Though it should be just an additional argument.

Comment on lines 293 to 295
# import datasets after potentially setting environment variables
from datasets import load_dataset, logging
from datasets.utils.file_utils import OfflineModeIsEnabled
Member

Let's move this to the top with the other imports, since you have updated to use config.

# Alternatively, one could set datasets.config.HF_DATASETS_OFFLINE.
# This seems to work even after the import statement,
# but it feels like a bigger hack.
import datasets
Member

ditto

Contributor Author

Good point. Moved those up.

@thomasw21
Member

thomasw21 commented Aug 11, 2021

Feel free to merge whenever you can! This will allow us to look at the parallel merge part.

@stas00 stas00 merged commit 5069622 into bigscience-workshop:main Aug 11, 2021
@stas00
Contributor

stas00 commented Aug 11, 2021

Thank you @adammoody, @thomasw21 and @ontocord for working on this. Awesome addition!

@adammoody, you may now want to ask the upstreams to integrate this new tool into their arsenal, so it's not limited to this repo.

Or we can add it to the list here: #10, and the upstreams may or may not pick it up from there.

@adammoody adammoody deleted the hfdset branch August 11, 2021 22:25
@adammoody
Contributor Author

Thanks, @stas00. Do you find it's easier to upstream with one vs. the other?

I've got a few more PRs planned to further this, so I may let it settle a bit before making the request.

@stas00
Contributor

stas00 commented Aug 11, 2021

I haven't had a chance to participate closely in your PR, as there are many other burning issues to attend to at the moment, so I am trusting Thomas to have experimented with it and found it to be an excellent contribution.

I will definitely try it the next time I need to do pre-processing and will follow up if I run into any issues.

@stas00
Contributor

stas00 commented Aug 13, 2021

FYI, we now have a test suite: #64

If you'd like you can add a simple test to https://github.com/bigscience-workshop/Megatron-DeepSpeed/blob/main/tests/test_preprocessing.py by copying the test and adjusting it to use preprocess_dataset_mpi.py.

@thomasw21, you may want to do the same for tools/preprocess_data_many_cores.py

Hopefully it should be an easy task for both.

If you have any questions please don't hesitate to ask. We will refactor as we go.

adammoody added a commit to adammoody/Megatron-DeepSpeed that referenced this pull request Oct 27, 2022
Co-authored-by: Ammar Ahmad Awan <[email protected]>
Co-authored-by: Conglong Li <[email protected]>
Co-authored-by: Jeff Rasley <[email protected]>
Co-authored-by: Minjia Zhang <[email protected]>
Co-authored-by: Reza Yazdani <[email protected]>
Co-authored-by: yaozhewei <[email protected]>
Co-authored-by: Adam Moody <[email protected]>