Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Avoid unnecessary chunk stats recomputation on append. #646

Merged
merged 1 commit into from
Aug 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions omniscidb/ArrowStorage/ArrowStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,7 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
(static_cast<size_t>(at->num_rows()) + table.fragment_size - 1 - first_frag_size) /
table.fragment_size +
1;
size_t last_orig_frag_idx = fragments.empty() ? 0 : fragments.size() - 1;
// Pre-allocate fragment infos and table stats for each column for the following
// parallel data import.
fragments.resize(frag_count);
Expand Down Expand Up @@ -815,9 +816,9 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
if (col_type->isFixedLenArray()) {
elems_count = col_type->size() / elem_type->size();
}
// Compute stats for each fragment.
// Compute stats for each added/modified fragment.
tbb::parallel_for(
tbb::blocked_range(size_t(0), frag_count), [&](auto frag_range) {
tbb::blocked_range(last_orig_frag_idx, frag_count), [&](auto frag_range) {
for (size_t frag_idx = frag_range.begin(); frag_idx != frag_range.end();
++frag_idx) {
auto& frag = fragments[frag_idx];
Expand Down Expand Up @@ -855,17 +856,21 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
}
}); // each fragment

// Merge fragment stats to the table stats.
// Merge added/modified fragment stats to the table stats.
auto& column_stats = table_stats.at(col_info->column_id);
column_stats = fragments[0].metadata[col_idx]->chunkStats();
for (size_t frag_idx = 1; frag_idx < frag_count; ++frag_idx) {
if (!last_orig_frag_idx) {
column_stats = fragments[0].metadata[col_idx]->chunkStats();
}
for (size_t frag_idx = last_orig_frag_idx ? last_orig_frag_idx : 1;
frag_idx < frag_count;
++frag_idx) {
mergeStats(column_stats,
fragments[frag_idx].metadata[col_idx]->chunkStats(),
col_type);
}
} else {
bool has_nulls = false;
for (size_t frag_idx = 0; frag_idx < frag_count; ++frag_idx) {
for (size_t frag_idx = last_orig_frag_idx; frag_idx < frag_count; ++frag_idx) {
auto& frag = fragments[frag_idx];
frag.offset =
frag_idx ? ((frag_idx - 1) * table.fragment_size + first_frag_size) : 0;
Expand All @@ -886,7 +891,8 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
}

auto& column_stats = table_stats.at(col_info->column_id);
column_stats.has_nulls = has_nulls;
column_stats.has_nulls =
last_orig_frag_idx ? (has_nulls || column_stats.has_nulls) : has_nulls;
column_stats.min.stringval = nullptr;
column_stats.max.stringval = nullptr;
}
Expand Down