76 commits
269af4e
add parallel merge using mpi
adammoody Aug 9, 2021
9ba081b
handle case where some ranks might have 0 items
adammoody Aug 10, 2021
d29a702
add inclusive scan prefix sum
adammoody Aug 11, 2021
ed49713
report more timing info
adammoody Aug 11, 2021
e94f2a0
Update megatron/data/indexed_dataset.py
adammoody Aug 12, 2021
687ff32
Update megatron/data/indexed_dataset.py
adammoody Aug 12, 2021
af59545
rename total size variable for clarity
adammoody Aug 12, 2021
4f648a0
move translation to bin/idx file names a level deeper
adammoody Aug 13, 2021
9f2ba6a
parallel merge for cached dataset
adammoody Aug 13, 2021
72d6c9c
add alltrue function
adammoody Aug 13, 2021
8b67bec
move collectives to new distdata class, add torch.distributed
adammoody Aug 14, 2021
3eca1f3
drop unused prefix_sum function
adammoody Aug 14, 2021
a691b48
allow ranks to pass a list of files to be merged
adammoody Aug 15, 2021
e4a34e2
check that input dataset files exist
adammoody Aug 15, 2021
8b168ca
fix: using wrong doc_idx list for mmap
adammoody Aug 16, 2021
7a02693
move init dist and collectives to distdata class
adammoody Aug 16, 2021
eca2940
add --merge option, move parallel/serial to their own functions
adammoody Aug 16, 2021
b14491d
Merge branch 'main' into pmerge
adammoody Aug 16, 2021
ec11281
Update megatron/data/distdata.py
adammoody Aug 16, 2021
354d13b
Update megatron/data/indexed_dataset.py
adammoody Aug 16, 2021
2dc3f7a
Update megatron/data/indexed_dataset.py
adammoody Aug 16, 2021
980e904
Update megatron/data/indexed_dataset.py
adammoody Aug 16, 2021
ebd20a6
Update megatron/data/indexed_dataset.py
adammoody Aug 16, 2021
69b2f49
Update megatron/data/indexed_dataset.py
adammoody Aug 16, 2021
50de06a
Update megatron/data/indexed_dataset.py
adammoody Aug 16, 2021
af290ad
drop extraneous numpy tolist calls
adammoody Aug 16, 2021
4b58c74
rename self.MPI to mpi4py
adammoody Aug 16, 2021
71a2fdc
handle case where no ranks have elements in their file
adammoody Aug 16, 2021
73d3a24
rename tokenize_start to time_start
adammoody Aug 16, 2021
b9e69be
drop unrelated comment in distdata.min
adammoody Aug 16, 2021
da615c6
add comment why pointers_shift is not None and add assert
adammoody Aug 16, 2021
c42f41f
note why pointers uses sizes count and offset values
adammoody Aug 16, 2021
a3a7d53
can just rely on rank 0 for the leading 0 element
adammoody Aug 17, 2021
163310a
add write_list function
adammoody Aug 17, 2021
01b2be0
determine element size
adammoody Aug 17, 2021
4b6e8ff
add checks for consistent element_size values
adammoody Aug 17, 2021
ea08555
check that at least one rank has a file to merge
adammoody Aug 17, 2021
2524fce
assert that torch backend is gloo or mpi
adammoody Aug 17, 2021
ca14d48
add collectives for assert and raise
adammoody Aug 17, 2021
d482f36
rename to allassert and allraise_if
adammoody Aug 17, 2021
28d76f5
check dtype instead of element_size
adammoody Aug 17, 2021
f706108
add uint32 to element_sizes table
adammoody Aug 17, 2021
f122883
infer dtype from files being merged
adammoody Aug 17, 2021
57c012e
add write_header function to indexed dataset classes
adammoody Aug 17, 2021
eed8327
call write_header internally from IndexedDataset classes
adammoody Aug 17, 2021
a75cfc2
return number of bytes written from write calls
adammoody Aug 17, 2021
afcfcf9
Merge branch 'main' into pmerge
adammoody Aug 17, 2021
74b733a
move scatterv to distdata class
adammoody Aug 17, 2021
dadb51b
add functions to format status and error messages
adammoody Aug 17, 2021
a2f8fa0
defer merge_files_dist to future PR
adammoody Aug 17, 2021
39e6cd7
open files using with, refresh comments
adammoody Aug 18, 2021
2a29d99
rely on default torch datatypes
adammoody Aug 18, 2021
d6fa895
fix some status messages from preprocess script
adammoody Aug 18, 2021
1216c0a
fix: exclusive scan computing pointers list
adammoody Aug 18, 2021
a64d3da
Merge branch 'pointerfix' into pmerge
adammoody Aug 18, 2021
fde439e
fix: exclusive scan to compute mmap pointers list
adammoody Aug 18, 2021
fb274bf
abstraction to index and randomly access jsonl files
adammoody Aug 10, 2021
d428c02
rebase on parallel merge, replace mpi4py with distdata class
adammoody Aug 19, 2021
ba14351
note about seek
adammoody Aug 19, 2021
852fdd0
rename preprocess_dataset_mpi.py to preprocess_data_dist.py
adammoody Aug 19, 2021
61f4b46
update usage comments at top of script
adammoody Aug 19, 2021
18881ae
Merge branch 'pmerge' into mpijson
adammoody Aug 19, 2021
bd6f41f
look for extension .jsonl
adammoody Aug 19, 2021
3488d0b
add progress messages
adammoody Aug 25, 2021
1305fe9
rebuild index if mtime is old
adammoody Aug 26, 2021
6bcac1f
store index values in network byte order
adammoody Aug 26, 2021
813d068
add magic value and format version number to index file
adammoody Aug 26, 2021
0510081
Merge branch 'main' into mpijson
adammoody Aug 26, 2021
1fea302
clean up merge
adammoody Aug 26, 2021
d360313
clean up merge
adammoody Aug 26, 2021
20a43af
pass distctx instead of mpi_comm to IndexedJSON
adammoody Aug 28, 2021
7b08347
move existence test and stat queries to distdata
adammoody Aug 30, 2021
8d448bc
add exception handling
adammoody Aug 30, 2021
6f7519f
edit typos in comments
adammoody Aug 30, 2021
3f9078d
close shared file if open fails on any rank
adammoody Aug 30, 2021
b9aa845
store newline offsets in memory during scan
adammoody Sep 21, 2021
88 changes: 87 additions & 1 deletion megatron/data/distdata.py
@@ -1,4 +1,5 @@
import os
import stat
import numpy as np

import torch
@@ -142,7 +143,8 @@ def all_sum_(self, vals: np.array):
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM)

    def open(self, filename, truncate=None):
        """Create, truncate, and open a file shared by all ranks."""
        """Create, truncate, and open a file for writing shared by all ranks."""
        f = None

        # Don't truncate existing file until all ranks reach this point
        self.barrier()
@@ -162,6 +164,8 @@ def open(self, filename, truncate=None):

            except Exception as e:
Member: Can we instead scope everything? That would let the exception handling be more specific; for example, if truncate fails, then we need to close the file.

                err = e
                if f is not None:
                    f.close()

        # Verify that rank 0 created the file
        self.allraise_if(err)
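The scoping the reviewer suggests could look roughly like the following. This is a hypothetical standalone sketch, not the PR's code; the function name `create_and_truncate` is made up for illustration.

```python
def create_and_truncate(filename, truncate=None):
    # "Scoped" exception handling: the open and the truncate each get
    # their own try block, so a truncate failure can close the handle
    # that was just created before re-raising the error.
    try:
        f = open(filename, 'wb')
    except Exception:
        raise  # nothing to clean up; the open itself failed
    if truncate is not None:
        try:
            f.truncate(truncate)
        except Exception:
            f.close()  # close the handle before propagating the error
            raise
    return f
```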
@@ -175,6 +179,40 @@ def open(self, filename, truncate=None):
err = e

        # Verify that all ranks successfully opened the file
        if not self.alltrue(err is None):
Member: Overall I'm curious why you need to close the file. Wouldn't the raise destroy everything anyway?

Contributor Author (adammoody): Yes, good point. I think the raise should do the trick, since the file handle will not be returned and will go out of scope in that case. I'll simplify that code.

            # Someone failed to open the file.
            # If we succeeded, close our file.
            if f is not None:
                f.close()

        # All raise an exception if anyone did
        self.allraise_if(err)

        return f
    def openread(self, filename):
Member: Isn't that just self.open(filename, truncate=None)? I don't see why there's such a function; is it because you have "rb" here? I'll look further into the PR to understand the need for the duplication.

Contributor Author (adammoody): The open-for-write function is written as a two-phase process: rank 0 creates and truncates the file, then the other ranks open it afterwards. In open-for-read, all procs open the file simultaneously. I think it's useful to keep the two-phase step for creating the file, because create/truncate can be expensive on some file systems. However, I suspect this can be refactored so that openread can be merged into open with a single open call.

        """Open a shared file for reading by all ranks."""
        f = None

        # Don't attempt to open until all ranks are ready.
        self.barrier()

        # Open the file for reading on all ranks.
        # Catch exception if the rank fails.
        err = None
        try:
            f = open(filename, 'rb')
        except Exception as e:
            err = e

        # Verify that all ranks successfully opened the file
        if not self.alltrue(err is None):
            # Someone failed to open the file.
            # If we succeeded, close our file.
            if f is not None:
                f.close()

        # All raise an exception if anyone did
        self.allraise_if(err)

        return f
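The two-phase open-for-write the author describes can be sketched as follows. This is a simplified standalone illustration, not the distdata implementation: it assumes an mpi4py-style communicator providing Get_rank() and Barrier().

```python
def open_shared_for_write(comm, filename, truncate=None):
    # Phase 1: only rank 0 pays the (potentially expensive) cost of
    # creating and truncating the file on the shared file system.
    if comm.Get_rank() == 0:
        with open(filename, 'wb') as f:
            if truncate is not None:
                f.truncate(truncate)  # pre-extend to the target size
    # Phase 2: wait until the file exists, then every rank opens it.
    comm.Barrier()
    return open(filename, 'r+b')
```

On networked file systems such as Lustre or GPFS, having one rank perform the create avoids hammering the metadata server with P simultaneous create requests.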
@@ -218,3 +256,51 @@ def rename(self, srcfile, destfile):

        # Verify that the rename succeeded
        self.allraise_if(err)

    def exists(self, filename):
        """Test whether file exists and broadcast result to all ranks."""
        # We'll capture any exception in this variable
        err = None

        # Rank 0 executes the existence check
        exists = False
        if self.rank == 0:
            try:
                exists = os.path.exists(filename)
            except Exception as e:
                err = e

        # Verify that the check succeeded
        self.allraise_if(err)

        # Get value from rank 0
        exists = self.bcast(exists, root=0)
Member: Technically you could have all ranks run os.path.exists(filename) and remove one bcast.

Also, as a side note, we have a lot of:

    err = None

    # Rank 0 executes the existence check
    exists = False
    if self.rank == 0:
        try:
            do_something
        except Exception as e:
            err = e

    # Verify that the check succeeded
    self.allraise_if(err)

Can we make a helper and factor that code out somewhere? You could pass a method as an argument. (There might not even be a need for allraise_if anymore.)

Contributor Author (adammoody, Sep 7, 2021): Yes, I'll see if I can create a helper routine to account for that pattern. I tend to use this pattern for a couple of reasons.

One is that having rank 0 do the check and bcast the result tends to be more scalable than having all ranks do the check directly. For example, a stat call invokes communication between the client and the file system server, which is a remote process on networked file systems like Lustre/GPFS. With P procs, the direct method can induce O(P) messages and time at the server. Having rank 0 do the check and bcast requires one query to the server and then O(log P) time to spread the result, assuming a tree-based bcast.

A second benefit is that it guarantees all procs see a consistent result. For example, imagine that someone deletes the file while the existence check is running. With direct queries, some procs might get a result saying the file exists while others see it as missing, which leads the procs to take different code branches later on. With a single query, everyone gets the same state; it might still be wrong, but at least all procs work from a consistent state.

Member: Okay, this makes sense to me. Thanks for the great explanation!

        return exists
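The helper the reviewer asks for might factor the pattern roughly like this. The name `bcast_first` and the minimal context interface are assumptions for illustration, not code from this PR; it only presumes distdata-style bcast() and allraise_if() collectives.

```python
def bcast_first(ctx, fn, *args):
    # Hypothetical helper: rank 0 runs fn, every rank raises together
    # if it failed, and rank 0's result is broadcast to all ranks.
    err = None
    val = None
    if ctx.rank == 0:
        try:
            val = fn(*args)
        except Exception as e:
            err = e
    ctx.allraise_if(err)           # collective: all ranks raise if rank 0 failed
    return ctx.bcast(val, root=0)  # share rank 0's result
```

With such a helper, exists() could reduce to a call like bcast_first(self, os.path.exists, filename), and stat() to a similar one-liner.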

    def stat(self, filename, field):
Member: I'm not sure why we need this helper; can't all ranks just run os.stat(filename)[field]? Granted you're running that code a lot, but doing it directly removes all the communication, and we could remove a lot of code this way. Same comment for exists.

        """Lookup field from stat on file and broadcast to all ranks."""
        # We'll capture any exception in this variable
        err = None

        # Rank 0 does the actual stat call
        val = None
        if self.rank == 0:
            try:
                val = os.stat(filename)[field]
            except Exception as e:
                err = e

        # Verify that the stat succeeded
        self.allraise_if(err)

        # Get value from rank 0
        val = self.bcast(val, root=0)
        return val

    def filesize(self, filename):
        """Lookup filesize and broadcast to all ranks."""
        return self.stat(filename, stat.ST_SIZE)

    def mtime(self, filename):
        """Lookup file mtime and broadcast to all ranks."""
        return self.stat(filename, stat.ST_MTIME)
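As a quick standalone illustration of the indexing used above: the stat module's ST_* constants index the tuple form of a stat result, mirroring the named st_* attributes.

```python
import os
import stat

st = os.stat(os.getcwd())  # stat any path that exists
# Tuple indexing with stat.ST_* matches the named attributes;
# ST_MTIME yields the integer (truncated) timestamp.
assert st[stat.ST_SIZE] == st.st_size
assert st[stat.ST_MTIME] == int(st.st_mtime)
```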