Skip to content
This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

Change the way NDArrayIter handle the last batch #12285

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,4 @@ List of Contributors
* [Aaron Markham](https://github.com/aaronmarkham)
* [Sam Skalicky](https://github.com/samskalicky)
* [Per Goncalves da Silva](https://github.com/perdasilva)
* [Cheng-Che Lee](https://github.com/stu1130)
188 changes: 123 additions & 65 deletions python/mxnet/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@
from .ndarray.sparse import array as sparse_array
from .ndarray import _ndarray_cls
from .ndarray import array
from .ndarray import concatenate
from .ndarray import arange
from .ndarray.random import shuffle as random_shuffle
from .ndarray import concat

class DataDesc(namedtuple('DataDesc', ['name', 'shape'])):
"""DataDesc is used to store name, shape, type and layout
Expand Down Expand Up @@ -601,6 +599,22 @@ class NDArrayIter(DataIter):
...
>>> batchidx # Remaining examples are discarded. So, 10/3 batches are created.
3
>>> dataiter = mx.io.NDArrayIter(data, labels, 3, False, last_batch_handle='roll_over')
>>> batchidx = 0
>>> for batch in dataiter:
... batchidx += 1
...
>>> batchidx # Remaining examples are rolled over to the next iteration.
3
>>> dataiter.reset()
>>> dataiter.next().data[0].asnumpy()
[[[ 36. 37.]
[ 38. 39.]]
[[ 0. 1.]
[ 2. 3.]]
[[ 4. 5.]
[ 6. 7.]]]
(3L, 2L, 2L)

`NDArrayIter` also supports multiple input and labels.

Expand Down Expand Up @@ -633,8 +647,11 @@ class NDArrayIter(DataIter):
Only supported if no h5py.Dataset inputs are used.
last_batch_handle : str, optional
How to handle the last batch. This parameter can be 'pad', 'discard' or
'roll_over'. 'roll_over' is intended for training and can cause problems
if used for prediction.
'roll_over'.
If 'pad', the last batch will be padded with data starting from the begining
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how is pad and roll_over different, it is not clear in the documentation? In both it would seem you are taking data from the first batch of off the next epoch and adding it to the current last batch

Copy link
Contributor Author

@stu1130 stu1130 Aug 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let say data look like this [1,2,3,4,5,6,7,8,9,10] with batch_size 3
pad would be like [1,2,3],...[7,8,9],[10,1,2], while roll_over would be [1,2,3],...[7,8,9] and second iteration would be [10,1,2], [3,4,5], [6,7,8] after calling reset().
I've updated example starting from line 610

Copy link
Contributor

@chinakook chinakook Aug 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, It's so clear with an example.

If 'discard', the last batch will be discarded
If 'roll_over', the remaining elements will be rolled over to the next iteration and
note that it is intended for training and can cause problems if used for prediction.
data_name : str, optional
The data name.
label_name : str, optional
Expand All @@ -653,28 +670,20 @@ def __init__(self, data, label=None, batch_size=1, shuffle=False,
raise NotImplementedError("`NDArrayIter` only supports ``CSRNDArray``" \
" with `last_batch_handle` set to `discard`.")

# shuffle data
if shuffle:
tmp_idx = arange(self.data[0][1].shape[0], dtype=np.int32)
self.idx = random_shuffle(tmp_idx, out=tmp_idx).asnumpy()
self.data = _shuffle(self.data, self.idx)
self.label = _shuffle(self.label, self.idx)
else:
self.idx = np.arange(self.data[0][1].shape[0])

# batching
if last_batch_handle == 'discard':
new_n = self.data[0][1].shape[0] - self.data[0][1].shape[0] % batch_size
self.idx = self.idx[:new_n]
self.idx = np.arange(self.data[0][1].shape[0])
self.shuffle = shuffle
self.last_batch_handle = last_batch_handle
self.batch_size = batch_size
self.cursor = -self.batch_size
self.num_data = self.idx.shape[0]
# shuffle
self.reset()

self.data_list = [x[1] for x in self.data] + [x[1] for x in self.label]
self.num_source = len(self.data_list)
self.num_data = self.idx.shape[0]
assert self.num_data >= batch_size, \
"batch_size needs to be smaller than data size."
self.cursor = -batch_size
self.batch_size = batch_size
self.last_batch_handle = last_batch_handle
# used for 'roll_over'
self._cache_data = None
self._cache_label = None

@property
def provide_data(self):
Expand All @@ -694,74 +703,123 @@ def provide_label(self):

def hard_reset(self):
"""Ignore roll over data and set to start."""
if self.shuffle:
self._shuffle()
self.cursor = -self.batch_size
self._cache_data = None
self._cache_label = None

def reset(self):
if self.last_batch_handle == 'roll_over' and self.cursor > self.num_data:
self.cursor = -self.batch_size + (self.cursor%self.num_data)%self.batch_size
"""Resets the iterator to the beginning of the data."""
if self.shuffle:
self._shuffle()
# the range below indicate the last batch
if self.last_batch_handle == 'roll_over' and \
self.num_data - self.batch_size < self.cursor < self.num_data:
# (self.cursor - self.num_data) represents the data we have for the last batch
self.cursor = self.cursor - self.num_data - self.batch_size
else:
self.cursor = -self.batch_size

def iter_next(self):
"""Increments the coursor and check current cursor if exceed num of data."""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doc string does not make sense and has mistakes. What is num of data?

self.cursor += self.batch_size
return self.cursor < self.num_data

def next(self):
if self.iter_next():
return DataBatch(data=self.getdata(), label=self.getlabel(), \
pad=self.getpad(), index=None)
else:
"""Returns the next batch of data."""
if not self.iter_next():
raise StopIteration
data = self.getdata()
label = self.getlabel()
# iter should stop when last batch is not complete
if data[0].shape[0] != self.batch_size:
# in this case, cache it for next epoch
self._cache_data = data
self._cache_label = label
raise StopIteration
return DataBatch(data=data, label=label, \
pad=self.getpad(), index=None)

def _getdata(self, data_source, start=None, end=None):
"""Load data from underlying arrays."""
assert start is not None or end is not None, 'should at least specify start or end'
start = start if start is not None else 0
end = end if end is not None else data_source[0][1].shape[0]
s = slice(start, end)
return [
x[1][s]
if isinstance(x[1], (np.ndarray, NDArray)) else
# h5py (only supports indices in increasing order)
array(x[1][sorted(self.idx[s])][[
list(self.idx[s]).index(i)
for i in sorted(self.idx[s])
]]) for x in data_source
]

def _getdata(self, data_source):
def _concat(self, first_data, second_data):
"""Helper function to concat two NDArrays."""
return [
concat(first_data[0], second_data[0], dim=0)
]

def _batchify(self, data_source):
"""Load data from underlying arrays, internal use only."""
assert(self.cursor < self.num_data), "DataIter needs reset."
if self.cursor + self.batch_size <= self.num_data:
return [
# np.ndarray or NDArray case
x[1][self.cursor:self.cursor + self.batch_size]
if isinstance(x[1], (np.ndarray, NDArray)) else
# h5py (only supports indices in increasing order)
array(x[1][sorted(self.idx[
self.cursor:self.cursor + self.batch_size])][[
list(self.idx[self.cursor:
self.cursor + self.batch_size]).index(i)
for i in sorted(self.idx[
self.cursor:self.cursor + self.batch_size])
]]) for x in data_source
]
else:
assert self.cursor < self.num_data, 'DataIter needs reset.'
# first batch of next epoch with 'roll_over'
if self.last_batch_handle == 'roll_over' and \
-self.batch_size < self.cursor < 0:
assert self._cache_data is not None or self._cache_label is not None, \
'next epoch should have cached data'
cache_data = self._cache_data if self._cache_data is not None else self._cache_label
second_data = self._getdata(
data_source, end=self.cursor + self.batch_size)
if self._cache_data is not None:
self._cache_data = None
else:
self._cache_label = None
return self._concat(cache_data, second_data)
# last batch with 'pad'
elif self.last_batch_handle == 'pad' and \
self.cursor + self.batch_size > self.num_data:
pad = self.batch_size - self.num_data + self.cursor
return [
# np.ndarray or NDArray case
concatenate([x[1][self.cursor:], x[1][:pad]])
if isinstance(x[1], (np.ndarray, NDArray)) else
# h5py (only supports indices in increasing order)
concatenate([
array(x[1][sorted(self.idx[self.cursor:])][[
list(self.idx[self.cursor:]).index(i)
for i in sorted(self.idx[self.cursor:])
]]),
array(x[1][sorted(self.idx[:pad])][[
list(self.idx[:pad]).index(i)
for i in sorted(self.idx[:pad])
]])
]) for x in data_source
]
first_data = self._getdata(data_source, start=self.cursor)
second_data = self._getdata(data_source, end=pad)
return self._concat(first_data, second_data)
# normal case
else:
if self.cursor + self.batch_size < self.num_data:
end_idx = self.cursor + self.batch_size
# get incomplete last batch
else:
end_idx = self.num_data
return self._getdata(data_source, self.cursor, end_idx)

def getdata(self):
return self._getdata(self.data)
"""Get data."""
return self._batchify(self.data)

def getlabel(self):
return self._getdata(self.label)
"""Get label."""
return self._batchify(self.label)

def getpad(self):
"""Get pad value of DataBatch."""
if self.last_batch_handle == 'pad' and \
self.cursor + self.batch_size > self.num_data:
return self.cursor + self.batch_size - self.num_data
# check the first batch
elif self.last_batch_handle == 'roll_over' and \
-self.batch_size < self.cursor < 0:
return -self.cursor
else:
return 0

def _shuffle(self):
"""Shuffle the data."""
np.random.shuffle(self.idx)
self.data = _shuffle(self.data, self.idx)
self.label = _shuffle(self.label, self.idx)

class MXDataIter(DataIter):
"""A python wrapper a C++ data iterator.
Expand Down
98 changes: 42 additions & 56 deletions tests/python/unittest/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,82 +87,68 @@ def test_Cifar10Rec():
for i in range(10):
assert(labelcount[i] == 5000)


def test_NDArrayIter():
def _init_NDArrayIter_data():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doc string?

data = np.ones([1000, 2, 2])
label = np.ones([1000, 1])
labels = np.ones([1000, 1])
for i in range(1000):
data[i] = i / 100
label[i] = i / 100
dataiter = mx.io.NDArrayIter(
data, label, 128, True, last_batch_handle='pad')
batchidx = 0
for batch in dataiter:
batchidx += 1
assert(batchidx == 8)
dataiter = mx.io.NDArrayIter(
data, label, 128, False, last_batch_handle='pad')
batchidx = 0
labelcount = [0 for i in range(10)]
for batch in dataiter:
label = batch.label[0].asnumpy().flatten()
assert((batch.data[0].asnumpy()[:, 0, 0] == label).all())
for i in range(label.shape[0]):
labelcount[int(label[i])] += 1
labels[i] = i / 100
return data, labels

for i in range(10):
if i == 0:
assert(labelcount[i] == 124)
else:
assert(labelcount[i] == 100)
def _test_last_batch_handle(data, labels):
idx = 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this being initialized here? It is not needed as you do for idx in range(... later.

last_batch_handle_list = ['pad', 'discard' , 'roll_over']
labelcount_list = [(124, 100), (100, 96), (100, 96)]
batch_count_list = [8, 7, 7]

for idx in range(len(last_batch_handle_list)):
dataiter = mx.io.NDArrayIter(
data, labels, 128, False, last_batch_handle=last_batch_handle_list[idx])
batch_count = 0
labelcount = [0 for i in range(10)]
for batch in dataiter:
label = batch.label[0].asnumpy().flatten()
assert((batch.data[0].asnumpy()[:, 0, 0] == label).all()), last_batch_handle_list[idx]
for i in range(label.shape[0]):
labelcount[int(label[i])] += 1
batch_count += 1
# assert result
assert(labelcount[0] == labelcount_list[idx][0]), last_batch_handle_list[idx]
assert(labelcount[8] == labelcount_list[idx][1]), last_batch_handle_list[idx]

assert batch_count == batch_count_list[idx]
# shuffle equals True for sanity test
dataiter = mx.io.NDArrayIter(
data, labels, 128, True, last_batch_handle=last_batch_handle_list[idx])
batch_count = 0
for _ in dataiter:
batch_count += 1
assert batch_count == batch_count_list[idx]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have a test where you verify that the data has indeed been shuffled

Copy link
Contributor Author

@stu1130 stu1130 Aug 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I can't come up with a good solution to test if shuffle work. Shuffle testing will make unit test nondeterministic. if you have any idea, I would love to implement that

Copy link
Member

@anirudhacharya anirudhacharya Aug 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Effectively testing shuffling will be like testing a random number generator, which is a very involved problem by itself. We do not have to do that here. What I suggest is to test if we have the same set of elements pre and post shuffling and ensure that they are not in the same order.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought there is a tiny chance that the data remain the same after shuffling?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there are n elements being shuffled, the chance that the list remains the same after shuffling is 1/n!(assuming unique elements). For example if there are 10 elements in the shuffled list, the probability of the list being unaltered post shuffling is 2.75573192e-7

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for bringing up this issue. As @sandeep-krishnamurthy suggested, I would check if the data points are moved to the right positions based on index array. Within shuffle's implementation, the index array would shuffle first and then we get the data by their shuffled index.


def test_NDArrayIter():
data, labels = _init_NDArrayIter_data()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add doc string for this method. same for other methods in this module.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to add doc string for the test

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be good to have a couple of comments describing what use-cases are being tested?

_test_last_batch_handle(data, labels)

def test_NDArrayIter_h5py():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doc string?

if not h5py:
return

data = np.ones([1000, 2, 2])
label = np.ones([1000, 1])
for i in range(1000):
data[i] = i / 100
label[i] = i / 100
data, labels = _init_NDArrayIter_data()

try:
os.remove("ndarraytest.h5")
os.remove('ndarraytest.h5')
except OSError:
pass
with h5py.File("ndarraytest.h5") as f:
f.create_dataset("data", data=data)
f.create_dataset("label", data=label)

dataiter = mx.io.NDArrayIter(
f["data"], f["label"], 128, True, last_batch_handle='pad')
batchidx = 0
for batch in dataiter:
batchidx += 1
assert(batchidx == 8)

dataiter = mx.io.NDArrayIter(
f["data"], f["label"], 128, False, last_batch_handle='pad')
labelcount = [0 for i in range(10)]
for batch in dataiter:
label = batch.label[0].asnumpy().flatten()
assert((batch.data[0].asnumpy()[:, 0, 0] == label).all())
for i in range(label.shape[0]):
labelcount[int(label[i])] += 1
with h5py.File('ndarraytest.h5') as f:
Copy link
Member

@anirudhacharya anirudhacharya Aug 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NDArrayIter is supposed to return iterators for mx.nd.NDArray, numpy.ndarray, h5py.Dataset mx.nd.sparse.CSRNDArray or scipy.sparse.csr_matrix. Do we have a test for scipy.sparse.csr_matrix like for h5py

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good Catch. will implement that

f.create_dataset('data', data=data)
f.create_dataset('label', data=labels)

_test_last_batch_handle(f['data'], f['label'])
try:
os.remove("ndarraytest.h5")
except OSError:
pass

for i in range(10):
if i == 0:
assert(labelcount[i] == 124)
else:
assert(labelcount[i] == 100)


def test_NDArrayIter_csr():
# creating toy data
num_rows = rnd.randint(5, 15)
Expand Down