Python Dataset Reader #2414

Merged Apr 4, 2024 (25 commits)

Commits:
746650a  Add skeleton for new python data reader (fiedorowicz1, Jan 3, 2024)
64a76e6  Implement basic functionality (fiedorowicz1, Jan 6, 2024)
7eb92a6  Fix initialization for distconv (fiedorowicz1, Jan 7, 2024)
9a7a150  Add support for labels (fiedorowicz1, Jan 7, 2024)
8fc35e7  Add python library supporting classes (fiedorowicz1, Jan 7, 2024)
e6465c5  clang format (fiedorowicz1, Jan 7, 2024)
c20d4b4  Raise exception if rank/io parts not set (fiedorowicz1, Jan 8, 2024)
88c52bd  Rename to python dataset (fiedorowicz1, Jan 9, 2024)
6027f19  Add optional module dir argument to add to path (fiedorowicz1, Jan 10, 2024)
05d1395  Add unit tests (fiedorowicz1, Jan 11, 2024)
faee18a  Simplify naming (fiedorowicz1, Jan 11, 2024)
4702069  Add cosmoflow example and reader helper (fiedorowicz1, Jan 17, 2024)
9e4cba8  Update release notes (fiedorowicz1, Jan 17, 2024)
44eda6d  Save dataset pickle in work dir (fiedorowicz1, Jan 17, 2024)
a2f9a15  Overhaul new data reader to support prefetching multiple samples/batches (fiedorowicz1, Mar 6, 2024)
ac2d403  Fix worker index calculation (fiedorowicz1, Mar 7, 2024)
5cf06ab  clang-format (fiedorowicz1, Mar 7, 2024)
753a52f  Clarify proto comments (fiedorowicz1, Mar 7, 2024)
944b90f  Throw error if file fails to open (fiedorowicz1, Mar 7, 2024)
de0e5fa  Add docstrings and type hints (fiedorowicz1, Mar 7, 2024)
0af89f8  Update CosmoFlow example and enable parallel IO (fiedorowicz1, Mar 7, 2024)
fa668c9  Add basic sample size checking, remove label reconstruction, general … (fiedorowicz1, Mar 7, 2024)
6c33696  Switch to multiprocessing pool (fiedorowicz1, Mar 20, 2024)
63288ac  Implement response shuffling for distconv (fiedorowicz1, Mar 25, 2024)
75aac4e  fix typo (fiedorowicz1, Mar 21, 2024)
ReleaseNotes.txt (2 additions, 0 deletions)
@@ -19,6 +19,8 @@ Experiments & Applications:
Internal features:

I/O & data ingestion:
- Added a new Python dataset reader that provides a simple, flexible way to
  write Python data readers, with distconv support.

Build system:

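For orientation, the files below all follow one pattern: subclass Dataset from lbann.util.data and hand an instance to construct_python_dataset_reader. A minimal sketch under those assumptions (the RandomDataset name and its sizes are illustrative, not part of this PR):

import numpy as np
from lbann.util.data import Dataset, Sample, SampleDims, construct_python_dataset_reader

class RandomDataset(Dataset):
    """Toy dataset that serves fixed random vectors."""

    def __init__(self, num_samples=16, sample_size=8):
        self.data = np.random.normal(
            size=(num_samples, sample_size)).astype(np.float32)

    def __len__(self):
        return self.data.shape[0]

    def __getitem__(self, index):
        # Wrap each sample array for LBANN.
        return Sample(sample=self.data[index, :])

    def get_sample_dims(self):
        return SampleDims(sample=[self.data.shape[1]])

# Build the protobuf data reader message for the 'train' role.
reader = construct_python_dataset_reader(RandomDataset(), role='train')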
applications/physics/cosmology/cosmoflow/cosmoflow_dataset.py (31 additions, 0 deletions)
@@ -0,0 +1,31 @@
import numpy as np
from glob import glob
from lbann.util.data import Sample, SampleDims, Dataset, DistConvDataset
import h5py as h5
import os


class CosmoFlowDataset(DistConvDataset):
    def __init__(self, data_dir, input_width, num_secrets):
        self.data_dir = data_dir
        self.input_width = input_width
        self.num_secrets = num_secrets
        self.samples = glob(os.path.join(data_dir, '*.hdf5'))
        self.samples.sort()

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, index) -> Sample:
        data = h5.File(self.samples[index], 'r')
        slice_width = self.input_width // self.num_io_partitions
        slice_ind = self.rank % self.num_io_partitions
        full = data['full'][:,
                            slice_ind*slice_width:(slice_ind+1)*slice_width,
                            :self.input_width,
                            :self.input_width].astype(np.float32)
        par = data['unitPar'][:].astype(np.float32)
        return Sample(sample=np.ascontiguousarray(full), response=par)

    def get_sample_dims(self):
        return SampleDims(sample=[4, self.input_width, self.input_width, self.input_width], response=self.num_secrets)
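A note on the slicing in __getitem__ above: because CosmoFlowDataset derives from DistConvDataset, the reader supplies self.rank and self.num_io_partitions (commit c20d4b4 raises an exception if they are unset), and each rank reads only its slice of the first spatial dimension. A small worked example with illustrative numbers, not taken from the PR:

# Illustrative values only:
input_width = 128
num_io_partitions = 4
rank = 5

slice_width = input_width // num_io_partitions  # 32
slice_ind = rank % num_io_partitions            # 5 % 4 == 1
# This rank loads data['full'][:, 32:64, :128, :128].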
applications/physics/cosmology/cosmoflow/train_cosmoflow.py (22 additions, 1 deletion)
@@ -10,7 +10,21 @@
import lbann.contrib.args
import lbann.contrib.launcher
from lbann.core.util import get_parallel_strategy_args
import lbann.util.data
import os
from cosmoflow_dataset import CosmoFlowDataset

def create_python_dataset_reader(args):
    """Create a python dataset reader for CosmoFlow."""

    readers = []
    for role in ['train', 'val', 'test']:
        role_dir = getattr(args, f'{role}_dir')
        dataset = CosmoFlowDataset(role_dir, args.input_width, args.num_secrets)
        reader = lbann.util.data.construct_python_dataset_reader(dataset, role=role)
        readers.append(reader)

    return lbann.reader_pb2.DataReader(reader=readers)

def create_cosmoflow_data_reader(
        train_path, val_path, test_path, num_responses):
@@ -133,6 +147,9 @@ def create_synthetic_data_reader(input_width: int, num_responses: int) -> Any:
parser.add_argument(
    '--synthetic', action='store_true',
    help='Use synthetic data')
parser.add_argument(
    '--python-dataset', action='store_true',
    help='Use python dataset reader')
parser.add_argument(
    '--no-datastore', action='store_true',
    help='Disable the data store')
@@ -220,22 +237,26 @@ def create_synthetic_data_reader(input_width: int, num_responses: int) -> Any:
# optimizer.learn_rate *= 1e-2

# Setup data reader
serialize_io = False
if args.synthetic:
    data_reader = create_synthetic_data_reader(
        args.input_width, args.num_secrets)
elif args.python_dataset:
    data_reader = create_python_dataset_reader(args)
else:
    data_reader = create_cosmoflow_data_reader(
        args.train_dir,
        args.val_dir,
        args.test_dir,
        num_responses=args.num_secrets)
    serialize_io = True

# Setup trainer
random_seed_arg = {'random_seed': args.random_seed} \
    if args.random_seed is not None else {}
trainer = lbann.Trainer(
    mini_batch_size=args.mini_batch_size,
-   serialize_io=True,
+   serialize_io=serialize_io,
    **random_seed_arg)

# Runtime parameters/arguments
ci_test/unit_tests/test_unit_datareader_python_dataset.py (137 additions, 0 deletions)
@@ -0,0 +1,137 @@
import os
[Inline review thread]

Contributor: Why not use the pytest version? Would be nice to extend the test infrastructure (maybe at a later PR?)

Contributor Author (fiedorowicz1): Can you clarify what you mean here?

Contributor: Certainly. What I meant is that our pytest version of the tests (i.e., @test_util.lbann_test) might have a nicer syntax here. However, it is designed to use the existing Python data reader and may be extended as such to use the dataset reader (for example, with a flag @test_util.lbann_test(dataset=MyObject()) or similar).

Contributor Author (fiedorowicz1): I agree we should change the test framework to use the new python dataset reader, but I think that should be a separate PR.

[A hypothetical sketch of this proposed extension appears after the file listing below.]

import os.path
import sys
import numpy as np
from lbann.util.data import Dataset, Sample, SampleDims, construct_python_dataset_reader

# Bamboo utilities
current_file = os.path.realpath(__file__)
current_dir = os.path.dirname(current_file)
sys.path.insert(0, os.path.join(os.path.dirname(current_dir), 'common_python'))
import tools

# ==============================================
# Objects for Python dataset data reader
# ==============================================
# Note: The Python dataset data reader loads the dataset constructed below.

# Data
class TestDataset(Dataset):
    def __init__(self):
        np.random.seed(20240109)
        self.num_samples = 29
        self.sample_size = 7
        self.samples = np.random.normal(size=(self.num_samples,self.sample_size)).astype(np.float32)

    def __len__(self):
        return self.num_samples

    def __getitem__(self, index):
        return Sample(sample=self.samples[index,:])

    def get_sample_dims(self):
        return SampleDims(sample=[self.sample_size])

test_dataset = TestDataset()

# ==============================================
# Setup LBANN experiment
# ==============================================

def setup_experiment(lbann, weekly):
    """Construct LBANN experiment.

    Args:
        lbann (module): Module for LBANN Python frontend
        weekly (bool): Whether this is a weekly test run

    """
    mini_batch_size = len(test_dataset) // 4
    trainer = lbann.Trainer(mini_batch_size)
    model = construct_model(lbann)
    data_reader = construct_data_reader(lbann)
    optimizer = lbann.NoOptimizer()
    return trainer, model, data_reader, optimizer, None  # Don't request any specific number of nodes

def construct_model(lbann):
    """Construct LBANN model.

    Args:
        lbann (module): Module for LBANN Python frontend

    """

    # Layer graph
    x = lbann.Input(data_field='samples')
    y = lbann.L2Norm2(x)
    layers = list(lbann.traverse_layer_graph(x))
    metric = lbann.Metric(y, name='obj')
    callbacks = []

    # Compute expected value with NumPy
    vals = []
    for i in range(len(test_dataset)):
        x = test_dataset[i].sample.astype(np.float64)
        y = tools.numpy_l2norm2(x)
        vals.append(y)
    val = np.mean(vals)
    tol = 8 * val * np.finfo(np.float32).eps
    callbacks.append(lbann.CallbackCheckMetric(
        metric=metric.name,
        lower_bound=val-tol,
        upper_bound=val+tol,
        error_on_failure=True,
        execution_modes='test'))

    # Construct model
    num_epochs = 0
    return lbann.Model(num_epochs,
                       layers=layers,
                       metrics=[metric],
                       callbacks=callbacks)

def construct_data_reader(lbann):
    """Construct Protobuf message for Python dataset data reader.

    The Python data reader will import the current Python file to
    access the sample access functions.

    Args:
        lbann (module): Module for LBANN Python frontend

    """

    dataset_path = os.path.join(work_dir, 'dataset.pkl')

    # Note: The training data reader should be removed when
    # https://github.com/LLNL/lbann/issues/1098 is resolved.
    message = lbann.reader_pb2.DataReader()
    message.reader.extend([
        construct_python_dataset_reader(
            test_dataset,
            dataset_path,
            'train',
            shuffle=False
        )
    ])
    message.reader.extend([
        construct_python_dataset_reader(
            test_dataset,
            dataset_path,
            'test',
            shuffle=False
        )
    ])
    return message

# ==============================================
# Setup PyTest
# ==============================================

work_dir = os.path.join(os.path.dirname(__file__),
                        'experiments',
                        os.path.basename(__file__).split('.py')[0])
os.makedirs(work_dir, exist_ok=True)

# Create test functions that can interact with PyTest
for _test_func in tools.create_tests(setup_experiment, __file__, work_dir=work_dir):
    globals()[_test_func.__name__] = _test_func
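Following up on the inline review thread above, here is a hypothetical sketch of the proposed decorator extension. test_util.lbann_test exists in LBANN's pytest infrastructure, but the dataset keyword is only the reviewer's suggestion and is not implemented in this PR:

import test_util

# Hypothetical API: hand the framework a Dataset object and let it construct
# the python dataset reader for the test (not implemented in this PR).
@test_util.lbann_test(dataset=TestDataset())
def test_dataset_reader():
    ...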