Skip to content

Commit

Permalink
Fixes apache#303: added recurse_directories to InputSplit::Create (ap…
Browse files Browse the repository at this point in the history
…ache#310)

Co-Authored-By: Saswata Chakravarty <[email protected]>
  • Loading branch information
2 people authored and piiswrong committed Oct 3, 2017
1 parent 088ef15 commit 9627c01
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 12 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ list(APPEND SOURCE "src/io/line_split.cc")
list(APPEND SOURCE "src/io/recordio_split.cc")
list(APPEND SOURCE "src/io/indexed_recordio_split.cc")
list(APPEND SOURCE "src/io/input_split_base.cc")
list(APPEND SOURCE "src/io/filesys.cc")
list(APPEND SOURCE "src/io/local_filesys.cc")

if(USE_HDFS)
Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ endif

.PHONY: clean all test lint doc example pylint

OBJ=line_split.o indexed_recordio_split.o recordio_split.o input_split_base.o io.o local_filesys.o data.o recordio.o config.o
OBJ=line_split.o indexed_recordio_split.o recordio_split.o input_split_base.o io.o filesys.o local_filesys.o data.o recordio.o config.o

ifeq ($(USE_HDFS), 1)
OBJ += hdfs_filesys.o
Expand Down Expand Up @@ -65,6 +65,7 @@ line_split.o: src/io/line_split.cc
recordio_split.o: src/io/recordio_split.cc
indexed_recordio_split.o: src/io/indexed_recordio_split.cc
input_split_base.o: src/io/input_split_base.cc
filesys.o: src/io/filesys.cc
hdfs_filesys.o: src/io/hdfs_filesys.cc
s3_filesys.o: src/io/s3_filesys.cc
azure_filesys.o: src/io/azure_filesys.cc
Expand Down
4 changes: 3 additions & 1 deletion include/dmlc/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ class InputSplit {
* \param batch_size a hint to InputSplit what is the intended number
* of examples return per batch. Used only by
* "indexed_recordio" type
* \param recurse_directories whether to recursively traverse directories
* \return a new input split
* \sa InputSplit::Type
*/
Expand All @@ -276,7 +277,8 @@ class InputSplit {
const char *type,
const bool shuffle = false,
const int seed = 0,
const size_t batch_size = 256);
const size_t batch_size = 256,
const bool recurse_directories = false);
};

/*!
Expand Down
6 changes: 4 additions & 2 deletions src/io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ InputSplit* InputSplit::Create(const char *uri_,
const char *type,
const bool shuffle,
const int seed,
const size_t batch_size) {
const size_t batch_size,
const bool recurse_directories) {
using namespace std;
using namespace dmlc::io;
// allow cachefile in format path#cachefile
Expand All @@ -99,7 +100,8 @@ InputSplit* InputSplit::Create(const char *uri_,
}
} else if (!strcmp(type, "recordio")) {
split = new RecordIOSplitter(FileSystem::GetInstance(path),
spec.uri.c_str(), part, nsplit);
spec.uri.c_str(), part, nsplit,
recurse_directories);
} else {
LOG(FATAL) << "unknown input split type " << type;
}
Expand Down
28 changes: 28 additions & 0 deletions src/io/filesys.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright by Contributors
#include <queue>

#include "./filesys.h"

namespace dmlc {
namespace io {

void FileSystem::ListDirectoryRecursive(const URI &path,
std::vector<FileInfo> *out_list) {
std::queue<URI> queue;
queue.push(path);
while (!queue.empty()) {
std::vector<FileInfo> dfiles;
ListDirectory(queue.front(), &dfiles);
queue.pop();
for (auto dfile : dfiles) {
if (dfile.type == kDirectory) {
queue.push(dfile.path);
} else {
out_list->push_back(dfile);
}
}
}
}

} // namespace io
} // namespace dmlc
7 changes: 7 additions & 0 deletions src/io/filesys.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ class FileSystem {
* \param out_list the output information about the files
*/
virtual void ListDirectory(const URI &path, std::vector<FileInfo> *out_list) = 0;
/*!
* \brief list files in a directory recursively using ListDirectory
* \param path path to the directory to traverse
* \param out_list the output information about the files
*/
virtual void ListDirectoryRecursive(const URI &path,
std::vector<FileInfo> *out_list);
/*!
* \brief open a stream
* \param path path to file
Expand Down
14 changes: 10 additions & 4 deletions src/io/input_split_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ namespace dmlc {
namespace io {
void InputSplitBase::Init(FileSystem *filesys,
const char *uri,
size_t align_bytes) {
size_t align_bytes,
const bool recurse_directories) {
this->filesys_ = filesys;
// initialize the path
this->InitInputFileInfo(uri);
this->InitInputFileInfo(uri, recurse_directories);
file_offset_.resize(files_.size() + 1);
file_offset_[0] = 0;
for (size_t i = 0; i < files_.size(); ++i) {
Expand Down Expand Up @@ -145,14 +146,19 @@ std::vector<URI> InputSplitBase::ConvertToURIs(const std::string& uri) {
return expanded_list;
}

void InputSplitBase::InitInputFileInfo(const std::string& uri) {
void InputSplitBase::InitInputFileInfo(const std::string& uri,
const bool recurse_directories) {
std::vector<URI> expanded_list = this->ConvertToURIs(uri);
for (size_t i = 0; i < expanded_list.size(); ++i) {
const URI& path = expanded_list[i];
FileInfo info = filesys_->GetPathInfo(path);
if (info.type == kDirectory) {
std::vector<FileInfo> dfiles;
filesys_->ListDirectory(info.path, &dfiles);
if (!recurse_directories) {
filesys_->ListDirectory(info.path, &dfiles);
} else {
filesys_->ListDirectoryRecursive(info.path, &dfiles);
}
for (size_t i = 0; i < dfiles.size(); ++i) {
if (dfiles[i].size != 0 && dfiles[i].type == kFile) {
files_.push_back(dfiles[i]);
Expand Down
7 changes: 5 additions & 2 deletions src/io/input_split_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,12 @@ class InputSplitBase : public InputSplit {
* \param nsplit number of splits
* \param align_bytes the head split must be multiple of align_bytes
* this also checks if file size are multiple of align_bytes
* \param recurse_directories whether to recursively traverse directories
*/
void Init(FileSystem *fs,
const char *uri,
size_t align_bytes);
size_t align_bytes,
const bool recurse_directories = false);
// to be implemented by child class
/*!
* \brief seek to the beginning of the first record
Expand Down Expand Up @@ -179,7 +181,8 @@ class InputSplitBase : public InputSplit {
/*! \brief internal overflow buffer */
std::string overflow_;
/*! \brief initialize information in files */
void InitInputFileInfo(const std::string& uri);
void InitInputFileInfo(const std::string& uri,
const bool recurse_directories);
/*! \brief strip continuous chars at the end of str */
std::string StripEnd(std::string str, char ch);
};
Expand Down
5 changes: 3 additions & 2 deletions src/io/recordio_split.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ class RecordIOSplitter : public InputSplitBase {
RecordIOSplitter(FileSystem *fs,
const char *uri,
unsigned rank,
unsigned nsplit) {
this->Init(fs, uri, 4);
unsigned nsplit,
const bool recurse_directories) {
this->Init(fs, uri, 4, recurse_directories);
this->ResetPartition(rank, nsplit);
}

Expand Down

0 comments on commit 9627c01

Please sign in to comment.