[BLOCKING] Handle empty rows in data iterators correctly #5929

Merged · 4 commits · Jul 25, 2020 · Changes from 3 commits
@@ -171,4 +171,31 @@ class MissingValueHandlingSuite extends FunSuite with PerTest {
val model = new XGBoostClassifier(paramMap).fit(inputDF)
model.transform(inputDF).collect()
}

// https://github.com/dmlc/xgboost/pull/5929
test("handle the empty last row correctly with a missing value as 0") {
val spark = ss
import spark.implicits._
// Spark uses 1.5 * (nnz + 1.0) < size as the condition to decide whether to use a sparse or
// a dense vector, so the all-zero last row is assembled as an empty sparse vector.
val testDF = Seq(
(7.0f, 0.0f, -1.0f, 1.0f, 1.0),
(1.0f, 0.0f, 1.0f, 1.0f, 1.0),
(0.0f, 1.0f, 0.0f, 1.0f, 0.0),
(1.0f, 0.0f, 1.0f, 1.0f, 1.0),
(1.0f, -1.0f, 0.0f, 1.0f, 0.0),
(0.0f, 0.0f, 0.0f, 1.0f, 1.0),
(0.0f, 0.0f, 0.0f, 0.0f, 0.0)
).toDF("col1", "col2", "col3", "col4", "label")
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("col1", "col2", "col3", "col4"))
.setOutputCol("features")
val inputDF = vectorAssembler.transform(testDF).select("features", "label")
inputDF.show()
val paramMap = List("eta" -> "1", "max_depth" -> "2",
"objective" -> "binary:logistic", "missing" -> 0.0f,
"num_workers" -> 1, "allow_non_zero_for_missing" -> "true").toMap
val model = new XGBoostClassifier(paramMap).fit(inputDF)
model.transform(inputDF).collect()
}
}
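
Note on why this test produces an empty row: with "missing" set to 0.0f and allow_non_zero_for_missing enabled, every element of the all-zero last row is treated as missing, so no entries survive for that row. The standalone C++ sketch below illustrates that filtering rule; IsValid is a hypothetical helper for illustration and is not taken from the XGBoost sources.

#include <cmath>
#include <iostream>
#include <vector>

// Hypothetical predicate (illustration only): an element survives only if it is
// neither NaN nor equal to the configured `missing` value.
bool IsValid(float value, float missing) {
  return !std::isnan(value) && value != missing;
}

int main() {
  const float missing = 0.0f;                        // the test sets "missing" -> 0.0f
  std::vector<float> last_row{0.0f, 0.0f, 0.0f, 0.0f};
  int kept = 0;
  for (float v : last_row) {
    if (IsValid(v, missing)) ++kept;
  }
  // No entry survives, so the last row reaches the native layer as an empty row.
  std::cout << "entries kept: " << kept << '\n';     // prints: entries kept: 0
}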
8 changes: 4 additions & 4 deletions src/data/data.cc
@@ -833,9 +833,9 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread
uint64_t max_columns = 0;

// First-pass over the batch counting valid elements
size_t num_lines = batch.Size();
size_t batch_size = batch.Size();
#pragma omp parallel for schedule(static)
for (omp_ulong i = 0; i < static_cast<omp_ulong>(num_lines);
for (omp_ulong i = 0; i < static_cast<omp_ulong>(batch_size);
++i) { // NOLINT(*)
int tid = omp_get_thread_num();
auto line = batch.GetLine(i);
@@ -847,7 +847,7 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread
size_t key = element.row_idx - base_rowid;
// Adapter row index is absolute, here we want it relative to
// current page
CHECK_GE(key, builder_base_row_offset);
builder.AddBudget(key, tid);
}
}
@@ -856,7 +856,7 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread

// Second pass over batch, placing elements in correct position
#pragma omp parallel for schedule(static)
for (omp_ulong i = 0; i < static_cast<omp_ulong>(num_lines);
for (omp_ulong i = 0; i < static_cast<omp_ulong>(batch_size);
++i) { // NOLINT(*)
int tid = omp_get_thread_num();
auto line = batch.GetLine(i);
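The hunk above only renames num_lines to batch_size, but the surrounding function is the two-pass CSR build that the rest of the PR relies on: a first pass counts valid elements per row (the "budget"), a second pass places each element at its final position. The sketch below is a simplified, single-threaded illustration of that pattern; the names are hypothetical and do not match the real SparsePage API.

#include <cstddef>
#include <iostream>
#include <vector>

struct Entry { std::size_t column; float value; };

// Simplified, single-threaded sketch of the two-pass idea: size the rows, then fill them.
void PushBatch(const std::vector<std::vector<Entry>>& batch,   // one inner vector per row
               std::vector<std::size_t>* offsets,
               std::vector<Entry>* data) {
  if (offsets->empty()) offsets->push_back(0);
  std::size_t base = offsets->size() - 1;          // rows already stored before this batch

  // Pass 1: extend the offsets with the running element count per row.
  for (const auto& row : batch) {
    offsets->push_back(offsets->back() + row.size());
  }

  // Pass 2: place every element at its final position.
  data->resize(offsets->back());
  for (std::size_t i = 0; i < batch.size(); ++i) {
    std::size_t out = (*offsets)[base + i];
    for (const auto& e : batch[i]) {
      (*data)[out++] = e;
    }
  }
}

int main() {
  std::vector<std::size_t> offsets;
  std::vector<Entry> data;
  // Three rows; the last one is empty but still gets its own offset entry.
  PushBatch({{{0, 1.f}, {1, 2.f}}, {{1, 3.f}}, {}}, &offsets, &data);
  for (auto o : offsets) std::cout << o << ' ';    // prints: 0 2 3 3
  std::cout << '\n';
}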
19 changes: 17 additions & 2 deletions src/data/simple_dmatrix.cc
@@ -6,6 +6,7 @@
*/
#include <vector>
#include <limits>
#include <type_traits>
#include <algorithm>

#include "xgboost/data.h"
@@ -103,13 +104,16 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
auto& offset_vec = sparse_page_.offset.HostVector();
auto& data_vec = sparse_page_.data.HostVector();
uint64_t inferred_num_columns = 0;
uint64_t total_batch_size = 0;
// batch_size is either number of rows or cols, depending on data layout

adapter->BeforeFirst();
// Iterate over batches of input data
while (adapter->Next()) {
auto& batch = adapter->Value();
auto batch_max_columns = sparse_page_.Push(batch, missing, nthread);
inferred_num_columns = std::max(batch_max_columns, inferred_num_columns);
total_batch_size += batch.Size();
// Append meta information if available
if (batch.Labels() != nullptr) {
auto& labels = info_.labels_.HostVector();
@@ -153,16 +157,27 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
info_.num_col_ = adapter->NumColumns();
}


// Synchronise worker columns
rabit::Allreduce<rabit::op::Max>(&info_.num_col_, 1);

if (adapter->NumRows() == kAdapterUnknownSize) {
info_.num_row_ = offset_vec.size() - 1;
using IteratorAdapterT
    = IteratorAdapter<DataIterHandle, XGBCallbackDataIterNext, XGBoostBatchCSR>;

Review thread on this declaration:

Member: Would be good if you could add a comment or two on what the template conditionals are doing.

Collaborator (Author): Basically we are filtering for IteratorAdapter and FileAdapter types. I will add a comment.

Collaborator (Author): Added a comment.

Collaborator (Author): I created #5937 to remember to create a less ad-hoc fix in the future.

if (std::is_same<AdapterT, IteratorAdapterT>::value
|| std::is_same<AdapterT, FileAdapter>::value) {
info_.num_row_ = total_batch_size;
while (offset_vec.size() - 1 < total_batch_size) {
offset_vec.emplace_back(offset_vec.back());
}
} else {
CHECK((std::is_same<AdapterT, CSCAdapter>::value)) << "Expecting CSCAdapter";
info_.num_row_ = offset_vec.size() - 1;
}
} else {
if (offset_vec.empty()) {
offset_vec.emplace_back(0);
}

while (offset_vec.size() - 1 < adapter->NumRows()) {
offset_vec.emplace_back(offset_vec.back());
}
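The core of the fix is the while (offset_vec.size() - 1 < total_batch_size) loop above: when an iterator- or file-based adapter cannot report its row count up front, rows are counted by summing batch sizes, and the CSR offset vector is padded by repeating its last value so trailing empty rows still occupy a slot. A minimal standalone sketch of that idea, assuming the usual CSR convention that row i spans [offsets[i], offsets[i+1]); the helper name is hypothetical.

#include <cstdint>
#include <iostream>
#include <vector>

// Repeating the last offset appends rows that own zero entries, which is exactly
// how trailing empty rows are represented in CSR storage.
void PadTrailingEmptyRows(std::vector<std::uint64_t>* offsets, std::uint64_t total_rows) {
  if (offsets->empty()) offsets->push_back(0);
  while (offsets->size() - 1 < total_rows) {
    offsets->push_back(offsets->back());
  }
}

int main() {
  std::vector<std::uint64_t> offsets{0, 3, 6, 9, 12, 15};  // 5 rows with 3 entries each
  PadTrailingEmptyRows(&offsets, 6);                       // the 6th row is empty
  for (auto o : offsets) std::cout << o << ' ';            // prints: 0 3 6 9 12 15 15
  std::cout << '\n';
}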
27 changes: 18 additions & 9 deletions tests/cpp/data/test_adapter.cc
@@ -26,12 +26,13 @@ TEST(Adapter, CSRAdapter) {
EXPECT_EQ(line0.GetElement(1).value, 2);

auto line1 = batch.GetLine(1);
EXPECT_EQ(line1 .GetElement(0).value, 3);
EXPECT_EQ(line1 .GetElement(1).value, 4);
EXPECT_EQ(line1.GetElement(0).value, 3);
EXPECT_EQ(line1.GetElement(1).value, 4);

auto line2 = batch.GetLine(2);
EXPECT_EQ(line2 .GetElement(0).value, 5);
EXPECT_EQ(line2 .GetElement(0).row_idx, 2);
EXPECT_EQ(line2 .GetElement(0).column_idx, 1);
EXPECT_EQ(line2.GetElement(0).value, 5);
EXPECT_EQ(line2.GetElement(0).row_idx, 2);
EXPECT_EQ(line2.GetElement(0).column_idx, 1);
}

TEST(Adapter, CSCAdapterColsMoreThanRows) {
@@ -73,10 +74,11 @@ class CSRIterForTest {
std::vector<std::remove_pointer<decltype(std::declval<XGBoostBatchCSR>().index)>::type>
feature_idx_ {0, 1, 0, 1, 1};
std::vector<std::remove_pointer<decltype(std::declval<XGBoostBatchCSR>().offset)>::type>
row_ptr_ {0, 2, 4, 5};
row_ptr_ {0, 2, 4, 5, 5};
size_t iter_ {0};

public:
size_t static constexpr kRows { 4 }; // Test for the last row being empty
size_t static constexpr kCols { 13 }; // Test for having some missing columns

XGBoostBatchCSR Next() {
@@ -88,7 +90,7 @@
batch.offset = dmlc::BeginPtr(row_ptr_);
batch.index = dmlc::BeginPtr(feature_idx_);
batch.value = dmlc::BeginPtr(data_);
batch.size = 3;
batch.size = kRows;

batch.label = nullptr;
batch.weight = nullptr;
@@ -117,16 +119,23 @@ int CSRSetDataNextForTest(DataIterHandle data_handle,
}
}

TEST(Adapter, IteratorAdaper) {
TEST(Adapter, IteratorAdapter) {
CSRIterForTest iter;
data::IteratorAdapter<DataIterHandle, XGBCallbackDataIterNext,
XGBoostBatchCSR> adapter{&iter, CSRSetDataNextForTest};
constexpr size_t kRows { 6 };
constexpr size_t kRows { 8 };

std::unique_ptr<DMatrix> data {
DMatrix::Create(&adapter, std::numeric_limits<float>::quiet_NaN(), 1)
};
ASSERT_EQ(data->Info().num_col_, CSRIterForTest::kCols);
ASSERT_EQ(data->Info().num_row_, kRows);
int num_batch = 0;
for (auto const& batch : data->GetBatches<SparsePage>()) {
ASSERT_EQ(batch.offset.HostVector(), std::vector<bst_row_t>({0, 2, 4, 5, 5, 7, 9, 10, 10}));
++num_batch;
}
ASSERT_EQ(num_batch, 1);
}

} // namespace xgboost
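
To see where the expected offsets {0, 2, 4, 5, 5, 7, 9, 10, 10} come from: the test iterator yields the same 4-row batch (row_ptr {0, 2, 4, 5, 5}, last row empty) twice, and the second batch's offsets are appended after being shifted by the first batch's 5 stored entries. The sketch below is illustrative only; ConcatOffsets is a hypothetical helper, not part of XGBoost.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// Appending a second CSR batch means shifting its offsets by the number of
// entries already stored by the first batch.
std::vector<std::uint64_t> ConcatOffsets(std::vector<std::uint64_t> a,
                                         const std::vector<std::uint64_t>& b) {
  const std::uint64_t shift = a.back();          // entries stored by the first batch
  for (std::size_t i = 1; i < b.size(); ++i) {   // b[0] == 0, skip it
    a.push_back(shift + b[i]);
  }
  return a;
}

int main() {
  const std::vector<std::uint64_t> batch{0, 2, 4, 5, 5};  // 4 rows, the last one empty
  const auto merged = ConcatOffsets(batch, batch);        // the iterator yields it twice
  for (auto o : merged) std::cout << o << ' ';            // prints: 0 2 4 5 5 7 9 10 10
  std::cout << '\n';
}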
14 changes: 10 additions & 4 deletions tests/cpp/data/test_simple_dmatrix.cc
@@ -185,16 +185,22 @@ TEST(SimpleDMatrix, FromCSC) {
TEST(SimpleDMatrix, FromFile) {
std::string filename = "test.libsvm";
CreateBigTestData(filename, 3 * 5);
// Add an empty row at the end of the matrix
{
std::ofstream fo(filename, std::ios::app | std::ios::out);
fo << "0\n";
}
constexpr size_t kExpectedNumRow = 6;
std::unique_ptr<dmlc::Parser<uint32_t>> parser(
dmlc::Parser<uint32_t>::Create(filename.c_str(), 0, 1, "auto"));

auto verify_batch = [](SparsePage const &batch) {
EXPECT_EQ(batch.Size(), 5);
auto verify_batch = [kExpectedNumRow](SparsePage const &batch) {
EXPECT_EQ(batch.Size(), kExpectedNumRow);
EXPECT_EQ(batch.offset.HostVector(),
std::vector<bst_row_t>({0, 3, 6, 9, 12, 15}));
std::vector<bst_row_t>({0, 3, 6, 9, 12, 15, 15}));
EXPECT_EQ(batch.base_rowid, 0);

for (auto i = 0ull; i < batch.Size(); i++) {
for (auto i = 0ull; i < batch.Size() - 1; i++) {
if (i % 2 == 0) {
EXPECT_EQ(batch[i][0].index, 0);
EXPECT_EQ(batch[i][1].index, 1);
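On the appended "0\n" line: in LIBSVM format a line consists of a label followed by index:value pairs, so a line containing only "0" is a labelled row with no features, i.e. exactly the trailing empty row that the new expected offsets {0, 3, 6, 9, 12, 15, 15} account for. A tiny sketch of that reading of the format; CountFeatures is a hypothetical helper, not XGBoost or dmlc parser code.

#include <iostream>
#include <sstream>
#include <string>

// Count the index:value pairs on a LIBSVM line (illustration only).
std::size_t CountFeatures(const std::string& line) {
  std::istringstream ss(line);
  std::string label;
  ss >> label;               // first token is the label
  std::size_t n = 0;
  std::string pair;
  while (ss >> pair) ++n;    // remaining tokens are index:value pairs
  return n;
}

int main() {
  std::cout << CountFeatures("0 0:1 1:2 2:3") << '\n';  // prints: 3
  std::cout << CountFeatures("0") << '\n';              // prints: 0 (an empty row)
}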