Skip to content

Commit

Permalink
Increase perfomance of BulkAppend and BulkFlush (apache#14067)
Browse files Browse the repository at this point in the history
* Better bulkappend

* Fix lint
  • Loading branch information
ptrendx authored and vdantu committed Mar 31, 2019
1 parent 3344954 commit 66d1c3c
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions src/engine/threaded_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,10 @@ class ThreadedEngine : public Engine {
BulkStatus& bulk_status = *BulkStatusStore::Get();
std::swap(bulk_status.bulk_size, bulk_size);
if (bulk_status.count >= bulk_status.bulk_size) BulkFlush();
if (!bulk_status.functions) {
bulk_status.functions.reset(new std::vector<SyncFn>());
}
bulk_status.functions->reserve(bulk_size);
return bulk_size;
}

Expand All @@ -416,7 +420,7 @@ class ThreadedEngine : public Engine {
/*! \brief context of current ops */
Context ctx;
/*! \brief current op functions */
SyncFn fn;
std::shared_ptr<std::vector<SyncFn>> functions;
/*! \brief constant variables */
std::vector<VarHandle> const_vars;
/*! \brief mutable variables */
Expand Down Expand Up @@ -472,15 +476,12 @@ class ThreadedEngine : public Engine {
std::vector<VarHandle> const& const_vars,
std::vector<VarHandle> const& mutable_vars) {
BulkStatus& bulk_status = *BulkStatusStore::Get();
if (!bulk_status.functions) {
bulk_status.functions.reset(new std::vector<SyncFn>());
}
bulk_status.functions->push_back(exec_fn);
if (!bulk_status.count) {
bulk_status.ctx = exec_ctx;
bulk_status.fn = std::move(exec_fn);
} else {
auto prev_fn = std::move(bulk_status.fn);
bulk_status.fn = [exec_fn, prev_fn](RunContext rctx) {
prev_fn(rctx);
exec_fn(rctx);
};
}

++bulk_status.count;
Expand All @@ -497,10 +498,12 @@ class ThreadedEngine : public Engine {
if (!bulk_status.count) return;
bulk_status.count = 0;
DeduplicateVarHandle(&bulk_status.const_vars, &bulk_status.mutable_vars);
SyncFn fn = std::move(bulk_status.fn);
this->PushAsync([fn](RunContext ctx, CallbackOnComplete on_complete) {
auto functions = bulk_status.functions;
this->PushAsync([functions](RunContext ctx, CallbackOnComplete on_complete) {
ctx.is_bulk = true;
fn(ctx);
for (auto& fn : *functions) {
fn(ctx);
}
ctx.is_bulk = false;
bool is_gpu = ctx.ctx.dev_mask() == gpu::kDevMask;
if (is_gpu) {
Expand All @@ -510,6 +513,8 @@ class ThreadedEngine : public Engine {
}, bulk_status.ctx, bulk_status.const_vars, bulk_status.mutable_vars,
FnProperty::kNormal, 0, "ImperativeBulk");

bulk_status.functions.reset(new std::vector<SyncFn>());
bulk_status.functions->reserve(bulk_status.bulk_size);
bulk_status.const_vars.clear();
bulk_status.mutable_vars.clear();
}
Expand Down

0 comments on commit 66d1c3c

Please sign in to comment.