Merge pull request apache#16 from dato-code/improve_sframe_iter
Improve sframe iter
Jay Gu committed Mar 29, 2016
2 parents 2a12c3a + d8a78a2 commit 5dbcf9f
Showing 10 changed files with 323 additions and 123 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -97,3 +97,4 @@ tags

miniconda.sh
deps_version
*.noseids
3 changes: 2 additions & 1 deletion Makefile
@@ -96,7 +96,8 @@ endif

# SFrame flexible_type
FLEXIBLE_TYPE = $(ROOTDIR)/flexible_type
LIB_DEP += $(FLEXIBLE_TYPE)/build/libflexible_type.a
LIB_DEP += $(FLEXIBLE_TYPE)/build/libflexible_type.a
LDFLAGS += -lpng -ljpeg -lz

# plugins
include $(MXNET_PLUGINS)
4 changes: 2 additions & 2 deletions flexible_type/Makefile
@@ -16,7 +16,7 @@ clean:
flexible_type: build/libflexible_type.a

test: build/flexible_type_test
OBJS = $(addprefix build/, flexible_type/flexible_type.o image/image_type.o)
OBJS = $(addprefix build/, flexible_type/flexible_type.o image/image_type.o image/jpeg_io.o image/png_io.o)

build/libflexible_type.a: $(OBJS)
ar crv $@ $(filter %.o, $?)
@@ -27,4 +27,4 @@ build/%.o: $(SFRAME_SRC)/%.cpp
$(CXX) $(CFLAGS) -c $< -o $@

build/flexible_type_test: test/flexible_type_test.cpp
$(CXX) $(CFLAGS) $< -o $@ -L build -lflexible_type
$(CXX) $(CFLAGS) $< -o $@ -L build -lflexible_type -lpng -ljpeg -lz
21 changes: 15 additions & 6 deletions include/mxnet/c_api.h
@@ -227,21 +227,30 @@ MXNET_DLL int MXNDArrayLoad(const char* fname,
MXNET_DLL int MXNDArraySyncCopyFromCPU(NDArrayHandle handle,
const void *data,
size_t size);
/*!
* \brief State passed to the SFrame copy callback.
* \param handle Handle of NDArray
* \param idx the offset of the destination array for the write
* \param batch_size size of the current batch
* \param field_length_p array containing the size (number of floats) of the i-th data field. Length of the array is "size".
*/
struct SFrameCallbackHandle {
NDArrayHandle handle;
size_t idx;
size_t batch_size;
size_t* field_length_p;
};

/*!
* \brief Perform a synchronized copy using an SFrame callback.
* This function will call WaitToWrite before the copy is performed.
* This is useful for copying data from an existing memory region that is
* not wrapped by an NDArray (thus the dependency is not tracked).
*
* \param callback_handle pointer to struct of callback function state
* \param data the data source to copy from
* \param size the element size to copy
* \param callback_handle pointer to struct of callback function state
*/
struct SFrameCallbackHandle {
NDArrayHandle handle;
size_t idx;
size_t batch_size;
};
MXNET_DLL int MXNDArraySyncCopyFromSFrame(const void *data,
size_t size,
void* callback_handle);
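
The new field_length_p member tells the callback how many floats each data field contributes to a row; the copy itself happens in NDArray::SyncCopyFromSFrame (next file). A minimal sketch of the layout arithmetic this implies, under the assumption that row idx of the batch is written contiguously at a per-row offset (the values below are made up):

# Illustrative only: assumed destination layout for one callback invocation.
field_length = [1, 1, 784]       # e.g. two numeric fields and one 784-float array field
row_width = sum(field_length)    # floats occupied by one SFrame row
batch_size = 32
idx = 3                          # position of the current row within the batch
assert idx < batch_size
dst_offset = idx * row_width     # start (in floats) where this row is written
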
4 changes: 3 additions & 1 deletion include/mxnet/ndarray.h
@@ -217,11 +217,13 @@ class NDArray {
* \param size field length of data
* \param idx index in batch
* \param batch_size total batch size
* \param field_length_p size (in number of floats) of each data element in the row
*/
void SyncCopyFromSFrame(const graphlab::flexible_type *data,
size_t size,
size_t idx,
size_t batch_size) const;
size_t batch_size,
size_t* field_length_p) const;
/*!
* \brief Do a synchronized copy to a contiguous CPU memory region.
*
99 changes: 84 additions & 15 deletions python/mxnet/io.py
@@ -14,12 +14,14 @@
from .base import c_array, c_str, mx_uint, py_str
from .base import DataIterHandle, NDArrayHandle
from .base import check_call, ctypes2docstring
from array import array as _array
from .ndarray import NDArray
from .ndarray import array
from .ndarray import _copy_from_sarray, _copy_from_sframe

DataBatch = namedtuple('DataBatch', ['data', 'label', 'pad', 'index'])


class DataIter(object):
"""DataIter object in mxnet. """

@@ -356,36 +358,57 @@ def getpad(self):
else:
return 0

try:
import graphlab as gl
except:
try:
import sframe as gl
except:
pass


class SFrameIter(DataIter):
def __init__(self, sframe, data_field, data_shape, label_field=None, batch_size=1):
def __init__(self, sframe, data_field, label_field=None, batch_size=1):
"""
Single data input, single label SFrame iterator
Iterator over an SFrame
Parameters
----------
sframe: SFrame object
source SFrmae
source SFrame
data_field: string or list(string)
select fields of training data. For image or array type, only support string
data_shape: tuple
input data shape
label_field: string (optional)
label field in SFrame
batch_size: int
batch_size: int (optional)
batch size
"""

super(SFrameIter, self).__init__()
if not isinstance(sframe, gl.SFrame):
raise TypeError
if not (isinstance(data_field, str) or isinstance(data_field, list)):
raise TypeError
if not (label_field is None or isinstance(label_field, str)):
raise TypeError

if type(data_field) is str:
data_field = [data_field]

self._type_check(sframe, data_field, label_field)
self.data_field = data_field
self.label_field = label_field
self.data_sframe = sframe[data_field]
if label_field != None:
if label_field is not None:
self.label_sframe = sframe[label_field]

# allocate ndarray
data_shape = list(data_shape)
inferred_shape = self.infer_shape()
data_shape = list(inferred_shape["final_shape"])
data_shape.insert(0, batch_size)
self.data_shape = tuple(data_shape)
self.label_shape = (batch_size, )
self.field_length = inferred_shape["field_length"]
self.data_ndarray = array(np.zeros(self.data_shape))
self.label_ndarray = array(np.zeros(self.label_shape))
self.data = _init_data(self.data_ndarray, allow_empty=False, default_name="data")
@@ -399,6 +422,7 @@ def __init__(self, sframe, data_field, data_shape, label_field=None, batch_size=
def provide_data(self):
"""The name and shape of data provided by this iterator"""
return [(k, tuple([self.batch_size] + list(v.shape[1:]))) for k, v in self.data]

@property
def provide_label(self):
"""The name and shape of label provided by this iterator"""
@@ -409,13 +433,59 @@ def reset(self):
self.cursor = 0
self.has_next = True

def _copy(self, start, end, bias=0):
if isinstance(self.data_field, list):
_copy_from_sframe(self.data_sframe, self.data_ndarray, start, end, bias)
def _type_check(self, sframe, data_field, label_field):
if label_field is not None:
label_column_type = sframe[label_field].dtype()
if label_column_type not in [int, float]:
raise TypeError('Unexpected type for label_field \"%s\". Expect int or float, got %s' %
(label_field, str(label_column_type)))
for col in data_field:
col_type = sframe[col].dtype()
if col_type not in [int, float, _array, gl.Image]:
raise TypeError('Unexpected type for data_field \"%s\". Expect int, float, array or image, got %s' %
(col, str(col_type)))

def _infer_column_shape(self, sarray):
dtype = sarray.dtype()
if (dtype in [int, float]):
return (1, )
elif dtype is _array:
lengths = sarray.item_length()
if lengths.min() != lengths.max():
raise ValueError('Array column does not have the same length')
else:
return (lengths.max(), )
elif dtype is gl.Image:
first_image = sarray.dropna()[0]
return (first_image.channels, first_image.height, first_image.width)

def infer_shape(self):
ret = {"field_length": [], "final_shape": None}
features = self.data_sframe.column_names()
assert len(features) > 0
if len(features) > 1:
# If more than one feature, all features must be numeric or array
shape = 0
for col in features:
colshape = self._infer_column_shape(self.data_sframe[col])
if len(colshape) != 1:
raise ValueError('Only one column is allowed if input is image typed')
shape += colshape[0]
ret["field_length"].append(colshape[0])
ret["final_shape"] = (shape,)
else:
_copy_from_sarray(self.data_sframe, self.data_ndarray, start, end, bias)
if isinstance(self.label_field, str):
_copy_from_sarray(self.label_sframe, self.label_ndarray, start, end)
col_shape = self._infer_column_shape(self.data_sframe[features[0]])
ret["final_shape"] = col_shape
length = 1
for x in col_shape:
length = length * x
ret["field_length"].append(length)
return ret
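
For concreteness, a small usage sketch of the shape inference above. It assumes a graphlab (or sframe) environment and made-up column names; the expected values follow from the rules in infer_shape and _infer_column_shape:

import graphlab as gl            # or: import sframe as gl
from array import array
from mxnet.io import SFrameIter

# One numeric column plus one fixed-length array column.
sf = gl.SFrame({'a': [1.0, 2.0],
                'b': [array('d', [0.1, 0.2, 0.3]), array('d', [0.4, 0.5, 0.6])]})
it = SFrameIter(sf, data_field=['a', 'b'])
# Expected: field_length == [1, 3] and, with the default batch_size of 1,
# data_shape == (1, 4) since the final shape (4,) gains a batch dimension.
print(it.field_length, it.data_shape)
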

def _copy(self, start, end, bias=0):
_copy_from_sframe(self.data_sframe, self.data_ndarray, start, end, self.field_length, bias)
if self.label_field is not None:
_copy_from_sarray(self.label_sframe, self.label_ndarray, start, end, 1, bias)

def iter_next(self):
if self.has_next:
@@ -448,7 +518,6 @@ def getpad(self):
return self.pad



class MXDataIter(DataIter):
"""DataIter built in MXNet. List all the needed functions here.
Parameters
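
Putting the pieces together, a minimal end-to-end sketch of the new constructor, again assuming a graphlab (or sframe) environment and hypothetical column names. Unlike the previous signature, no data_shape argument is passed; the shape is inferred from the selected columns:

import graphlab as gl            # or: import sframe as gl
from mxnet.io import SFrameIter

sf = gl.SFrame({'x1': [0.0, 1.0, 2.0, 3.0],
                'x2': [1, 2, 3, 4],
                'y':  [0, 0, 1, 1]})

it = SFrameIter(sf, data_field=['x1', 'x2'], label_field='y', batch_size=2)
print(it.provide_data)    # expected: [('data', (2, 2))] -- two numeric fields, batch_size 2
print(it.provide_label)   # label shape (2,); the label name comes from the iterator defaults

if it.iter_next():
    # the batch's data and label NDArrays are then available via the
    # iterator's accessors (not shown in this diff)
    pass
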
41 changes: 27 additions & 14 deletions python/mxnet/ndarray.py
@@ -62,31 +62,44 @@ def _new_alloc_handle(shape, ctx, delay_alloc, dtype=mx_real_t):
ctypes.byref(hdl)))
return hdl

try:
import sframe as gl
except:
pass

try:
import graphlab as gl
except:
pass


class SFrameCallBackHandle(ctypes.Structure):
_fields_ = [('handle', ctypes.c_void_p), ('idx', ctypes.c_ulonglong), ('batch_size', ctypes.c_ulonglong)]
_fields_ = [('handle', ctypes.c_void_p),
('idx', ctypes.c_ulonglong),
('batch_size', ctypes.c_ulonglong),
('field_length_p', ctypes.POINTER(ctypes.c_ulonglong))]

def _copy_from_sframe(sf, arr, start, end, bias=0):

def _copy_from_sframe(sf, arr, start, end, field_length, bias=0):
assert isinstance(sf, gl.SFrame)
callback = _LIB.MXNDArraySyncCopyFromSFrame
callback.argtypes = [ctypes.c_void_p, ctypes.c_ulonglong, ctypes.c_void_p]
callback.restype = ctypes.c_int
addr = ctypes.cast(callback, ctypes.c_void_p).value
callback_handle = SFrameCallBackHandle(arr.handle.value, bias, (end - start))
try:
import graphlab as gl
except:
import sframe as gl
assert isinstance(sf, gl.SFrame)
num_fields = sf.num_columns()
c_field_length_arr = (ctypes.c_ulonglong * num_fields)()
for i in range(num_fields):
c_field_length_arr[i] = int(field_length[i])
callback_handle = SFrameCallBackHandle(arr.handle.value, bias, (end - start),
ctypes.cast(c_field_length_arr, ctypes.POINTER(ctypes.c_ulonglong)))
gl.extensions.sframe_callback(sf, addr, ctypes.addressof(callback_handle), start, end)

def _copy_from_sarray(sa, arr, start, end, bias=0):
try:
import graphlab as gl
except:
import sframe as gl

def _copy_from_sarray(sa, arr, start, end, field_length, bias=0):
assert isinstance(sa, gl.SArray)
sf = gl.SFrame({'__tmp__': sa})
_copy_from_sframe(sf, arr, start, end, bias)
_copy_from_sframe(sf, arr, start, end, [field_length], bias)


def waitall():
"""Wait all async operation to finish in MXNet
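
A small sketch of how these helpers are driven from SFrameIter._copy above (hypothetical data, assuming a graphlab/sframe environment with the sframe_callback extension available). Note that _copy_from_sarray wraps its scalar field_length into a one-element list before delegating to _copy_from_sframe:

import numpy as np
import graphlab as gl                       # or: import sframe as gl
from mxnet.ndarray import array, _copy_from_sframe, _copy_from_sarray

sf = gl.SFrame({'x1': [0.0, 1.0, 2.0, 3.0],
                'x2': [4.0, 5.0, 6.0, 7.0]})
labels = gl.SArray([0.0, 1.0, 1.0, 0.0])

data_nd = array(np.zeros((2, 2)))           # batch_size 2, two numeric fields
label_nd = array(np.zeros((2,)))

# Copy SFrame rows [0, 2) into the batch buffer; each column contributes 1 float.
_copy_from_sframe(sf, data_nd, 0, 2, [1, 1])
# Labels use a scalar field_length of 1, as in SFrameIter._copy.
_copy_from_sarray(labels, label_nd, 0, 2, 1)
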
2 changes: 1 addition & 1 deletion src/c_api/c_api.cc
@@ -192,7 +192,7 @@ int MXNDArraySyncCopyFromSFrame(const void *data,
const graphlab::flexible_type *flex_data
= reinterpret_cast<const graphlab::flexible_type*>(data);
static_cast<NDArray*>(callback_handle->handle)->SyncCopyFromSFrame(flex_data,
size, callback_handle->idx, callback_handle->batch_size);
size, callback_handle->idx, callback_handle->batch_size, callback_handle->field_length_p);
++callback_handle->idx;
API_END();
}
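
The callback in c_api.cc writes one SFrame row per invocation and then advances idx, so iterating rows [start, end) fills the batch buffer row by row. A rough Python model of that loop (an illustration of the assumed behaviour, not the actual implementation):

import numpy as np

def copy_row(batch_buf, idx, field_length, row_values):
    # Model of one MXNDArraySyncCopyFromSFrame call: write row `idx`, then advance it.
    row_width = sum(field_length)
    flat = np.concatenate([np.asarray(v, dtype=np.float32).ravel() for v in row_values])
    assert flat.size == row_width
    batch_buf[idx * row_width:(idx + 1) * row_width] = flat
    return idx + 1                          # mirrors ++callback_handle->idx

field_length = [1, 2]                       # one scalar field and one length-2 array field
batch_buf = np.zeros(3 * sum(field_length), dtype=np.float32)
idx = 0
for row in [(1.0, [2.0, 3.0]), (4.0, [5.0, 6.0]), (7.0, [8.0, 9.0])]:
    idx = copy_row(batch_buf, idx, field_length, row)
# batch_buf now holds the three rows back to back.
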
(Diffs for the remaining 2 changed files are not shown.)
