Skip to content

Commit

Permalink
Merge pull request #2909 from timbess/feature/decrease-max-heap-loading-data
Browse files Browse the repository at this point in the history

Fix overallocations
  • Loading branch information
texodus authored Feb 1, 2025
2 parents c25388c + 47f1672 commit 0699018
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 95 deletions.
6 changes: 6 additions & 0 deletions cpp/perspective/src/cpp/arrow_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ using namespace perspective;

// Out-of-line definitions of ArrowLoader's special member functions, all
// compiler-generated via `= default`: the default constructor, the
// destructor, and the move constructor.
ArrowLoader::ArrowLoader() = default;
ArrowLoader::~ArrowLoader() = default;
// Declared noexcept, so moving an ArrowLoader is promised not to throw.
ArrowLoader::ArrowLoader(ArrowLoader&&) noexcept = default;

t_dtype
convert_type(const std::string& src) {
Expand Down Expand Up @@ -484,6 +485,11 @@ copy_array(
t_vocab* vocab = dest->_get_vocab();
std::string elem;

vocab->reserve(
dict->value_data()->size() + dsize, // vocab len + null bytes
dsize
);

for (std::uint64_t i = 0; i < dsize; ++i) {
std::int32_t bidx = offsets[i];
std::size_t es = offsets[i + 1] - bidx;
Expand Down
57 changes: 37 additions & 20 deletions cpp/perspective/src/cpp/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,13 +682,16 @@ ProtoServer::handle_request(
req_env.ParseFromString(data);
std::vector<ProtoServerResp<std::string>> serialized_responses;
std::vector<proto::Response> responses;

auto msg_id = req_env.msg_id();
auto entity_id = req_env.entity_id();
try {
auto resp_msg = _handle_request(client_id, req_env);
auto resp_msg = _handle_request(client_id, std::move(req_env));
for (auto& resp : resp_msg) {
ProtoServerResp<std::string> str_resp;
str_resp.data = resp.data.SerializeAsString();
str_resp.client_id = resp.client_id;
serialized_responses.emplace_back(str_resp);
serialized_responses.emplace_back(std::move(str_resp));
}
} catch (const PerspectiveException& e) {
proto::Response resp;
Expand Down Expand Up @@ -718,13 +721,13 @@ ProtoServer::handle_request(
// proto::Response resp_env;
serialized_responses.reserve(responses.size());
for (auto& resp : responses) {
resp.set_msg_id(req_env.msg_id());
resp.set_entity_id(req_env.entity_id());
resp.set_msg_id(msg_id);
resp.set_entity_id(entity_id);

ProtoServerResp<std::string> str_resp;
str_resp.data = resp.SerializeAsString();
str_resp.client_id = client_id;
serialized_responses.emplace_back(str_resp);
serialized_responses.emplace_back(std::move(str_resp));
}

return serialized_responses;
Expand Down Expand Up @@ -1148,7 +1151,7 @@ coerce_to(const t_dtype dtype, const A& val) {
}

std::vector<ProtoServerResp<ProtoServer::Response>>
ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) {
ProtoServer::_handle_request(std::uint32_t client_id, Request&& req) {
static bool is_init_expr = false;
if (!is_init_expr) {
t_computed_expression_parser::init();
Expand All @@ -1157,9 +1160,13 @@ ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) {

std::vector<ProtoServerResp<ProtoServer::Response>> proto_resp;
// proto::Response resp_env;

auto msg_id = req.msg_id();
auto entity_id = req.entity_id();

auto push_resp = [&](Response&& resp) {
resp.set_msg_id(req.msg_id());
resp.set_entity_id(req.entity_id());
resp.set_msg_id(msg_id);
resp.set_entity_id(entity_id);
ProtoServerResp<ProtoServer::Response> resp2;
resp2.data = std::move(resp);
resp2.client_id = client_id;
Expand Down Expand Up @@ -1280,32 +1287,42 @@ ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) {
dims.end_col
);

table = Table::from_arrow(index, *arrow, limit);
table = Table::from_arrow(index, std::move(*arrow), limit);
break;
}
case proto::MakeTableData::kFromArrow: {
table =
Table::from_arrow(index, r.data().from_arrow(), limit);
std::string data = r.data().from_arrow();
{ auto _ = std::move(req); }

table = Table::from_arrow(index, std::move(data), limit);
break;
}
case proto::MakeTableData::kFromCsv: {
table = Table::from_csv(index, r.data().from_csv(), limit);
std::string data = r.data().from_csv();
{ auto _ = std::move(req); }

table = Table::from_csv(index, std::move(data), limit);
break;
}
case proto::MakeTableData::kFromCols: {
table =
Table::from_cols(index, r.data().from_cols(), limit);
std::string data = r.data().from_cols();
{ auto _ = std::move(req); }

table = Table::from_cols(index, std::move(data), limit);
break;
}
case proto::MakeTableData::kFromRows: {
table =
Table::from_rows(index, r.data().from_rows(), limit);
std::string data = r.data().from_rows();
{ auto _ = std::move(req); }

table = Table::from_rows(index, std::move(data), limit);
break;
}
case proto::MakeTableData::kFromNdjson: {
table = Table::from_ndjson(
index, r.data().from_ndjson(), limit
);
std::string data = r.data().from_ndjson();
{ auto _ = std::move(req); }

table = Table::from_ndjson(index, std::move(data), limit);
break;
}
case proto::MakeTableData::kFromSchema: {
Expand All @@ -1327,7 +1344,7 @@ ProtoServer::_handle_request(std::uint32_t client_id, const Request& req) {
}
}

m_resources.host_table(req.entity_id(), table);
m_resources.host_table(entity_id, table);
proto::Response resp;
resp.mutable_make_table_resp();
push_resp(std::move(resp));
Expand Down
Loading

0 comments on commit 0699018

Please sign in to comment.