Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 212defe

Browse files
authored
Require TBB for CPU multithreading (#539)
Remove the TBB/std threading abstraction layer in favor of using TBB directly, and remove all conditional enabling of TBB.
1 parent df473d6 commit 212defe

28 files changed: +239 / -989 lines changed

CMakeLists.txt

-2
Original file line number | Diff line number | Diff line change
@@ -165,8 +165,6 @@ include_directories(${Boost_INCLUDE_DIR})
165165

166166
# TBB
167167
find_package(TBB REQUIRED)
168-
add_definitions("-DENABLE_TBB")
169-
add_definitions("-DHAVE_TBB")
170168
add_definitions("-DTBB_PREVIEW_TASK_GROUP_EXTENSIONS=1")
171169

172170
# Cost Model

omniscidb/ArrowStorage/ArrowStorage.cpp

+116-120
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,6 @@
1818
#include "IR/Type.h"
1919
#include "Shared/ArrowUtil.h"
2020
#include "Shared/measure.h"
21-
#include "Shared/threading.h"
2221

2322
#ifdef __GNUC__
2423
#pragma GCC diagnostic push
@@ -32,6 +31,8 @@
3231
#include <arrow/util/value_parsing.h>
3332
#include <parquet/api/reader.h>
3433
#include <parquet/arrow/reader.h>
34+
#include <tbb/blocked_range.h>
35+
#include <tbb/parallel_for.h>
3536

3637
#ifdef __GNUC__
3738
#pragma GCC diagnostic pop
@@ -682,131 +683,126 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
682683
}
683684
}
684685

685-
threading::parallel_for(
686-
threading::blocked_range(0, (int)at->columns().size()), [&](auto range) {
687-
for (auto col_idx = range.begin(); col_idx != range.end(); col_idx++) {
688-
auto col_info = getColumnInfo(db_id_, table_id, columnId(col_idx));
689-
auto col_type = col_info->type;
690-
auto col_arr = at->column(col_idx);
691-
692-
// Conversion of empty string to Nulls and further processing handled
693-
// separately.
694-
if (!col_type->nullable() && col_arr->null_count() != 0 &&
695-
col_arr->type()->id() != arrow::Type::STRING) {
696-
throw std::runtime_error("Null values used in non-nullable type: "s +
697-
col_type->toString());
698-
}
686+
tbb::parallel_for(tbb::blocked_range(0, (int)at->columns().size()), [&](auto range) {
687+
for (auto col_idx = range.begin(); col_idx != range.end(); col_idx++) {
688+
auto col_info = getColumnInfo(db_id_, table_id, columnId(col_idx));
689+
auto col_type = col_info->type;
690+
auto col_arr = at->column(col_idx);
699691

700-
DictionaryData* dict_data = nullptr;
701-
auto elem_type =
702-
col_type->isArray()
703-
? dynamic_cast<const hdk::ir::ArrayBaseType*>(col_type)->elemType()
704-
: col_type;
705-
if (elem_type->isExtDictionary()) {
706-
dict_data = dicts_
707-
.at(dynamic_cast<const hdk::ir::ExtDictionaryType*>(elem_type)
708-
->dictId())
709-
.get();
710-
}
692+
// Conversion of empty string to Nulls and further processing handled
693+
// separately.
694+
if (!col_type->nullable() && col_arr->null_count() != 0 &&
695+
col_arr->type()->id() != arrow::Type::STRING) {
696+
throw std::runtime_error("Null values used in non-nullable type: "s +
697+
col_type->toString());
698+
}
711699

712-
if (col_type->isDecimal()) {
713-
col_arr = convertDecimalToInteger(col_arr, col_type);
714-
} else if (col_type->isExtDictionary()) {
715-
switch (col_arr->type()->id()) {
716-
case arrow::Type::STRING:
717-
// if the dictionary has already been materialized, append indices
718-
if (!config_->storage.enable_lazy_dict_materialization ||
719-
dict_data->is_materialized) {
720-
col_arr = createDictionaryEncodedColumn(
721-
dict_data->dict()->stringDict.get(), col_arr, col_type);
722-
}
723-
break;
724-
case arrow::Type::DICTIONARY:
725-
col_arr = convertArrowDictionary(
726-
dict_data->dict()->stringDict.get(), col_arr, col_type);
727-
break;
728-
default:
729-
CHECK(false);
700+
DictionaryData* dict_data = nullptr;
701+
auto elem_type =
702+
col_type->isArray()
703+
? dynamic_cast<const hdk::ir::ArrayBaseType*>(col_type)->elemType()
704+
: col_type;
705+
if (elem_type->isExtDictionary()) {
706+
dict_data =
707+
dicts_
708+
.at(dynamic_cast<const hdk::ir::ExtDictionaryType*>(elem_type)->dictId())
709+
.get();
710+
}
711+
712+
if (col_type->isDecimal()) {
713+
col_arr = convertDecimalToInteger(col_arr, col_type);
714+
} else if (col_type->isExtDictionary()) {
715+
switch (col_arr->type()->id()) {
716+
case arrow::Type::STRING:
717+
// if the dictionary has already been materialized, append indices
718+
if (!config_->storage.enable_lazy_dict_materialization ||
719+
dict_data->is_materialized) {
720+
col_arr = createDictionaryEncodedColumn(
721+
dict_data->dict()->stringDict.get(), col_arr, col_type);
730722
}
731-
} else if (col_type->isString()) {
732-
} else {
733-
col_arr = replaceNullValues(
734-
col_arr,
735-
col_type,
736-
dict_data ? dict_data->dict()->stringDict.get() : nullptr);
737-
}
723+
break;
724+
case arrow::Type::DICTIONARY:
725+
col_arr = convertArrowDictionary(
726+
dict_data->dict()->stringDict.get(), col_arr, col_type);
727+
break;
728+
default:
729+
CHECK(false);
730+
}
731+
} else if (col_type->isString()) {
732+
} else {
733+
col_arr = replaceNullValues(
734+
col_arr, col_type, dict_data ? dict_data->dict()->stringDict.get() : nullptr);
735+
}
738736

739-
col_data[col_idx] = col_arr;
737+
col_data[col_idx] = col_arr;
740738

741-
bool compute_stats = !col_type->isString();
742-
if (compute_stats) {
743-
size_t elems_count = 1;
744-
if (col_type->isFixedLenArray()) {
745-
elems_count = col_type->size() / elem_type->size();
746-
}
747-
// Compute stats for each fragment.
748-
threading::parallel_for(
749-
threading::blocked_range(size_t(0), frag_count), [&](auto frag_range) {
750-
for (size_t frag_idx = frag_range.begin(); frag_idx != frag_range.end();
751-
++frag_idx) {
752-
auto& frag = fragments[frag_idx];
753-
754-
frag.offset =
755-
frag_idx
756-
? ((frag_idx - 1) * table.fragment_size + first_frag_size)
757-
: 0;
758-
frag.row_count =
759-
frag_idx
760-
? std::min(table.fragment_size,
761-
static_cast<size_t>(at->num_rows()) - frag.offset)
762-
: first_frag_size;
763-
764-
size_t num_bytes;
765-
if (col_type->isFixedLenArray()) {
766-
num_bytes = frag.row_count * col_type->size();
767-
} else if (col_type->isVarLenArray()) {
768-
num_bytes =
769-
computeTotalStringsLength(col_arr, frag.offset, frag.row_count);
770-
} else {
771-
num_bytes = frag.row_count * col_type->size();
772-
}
773-
auto meta = std::make_shared<ChunkMetadata>(
774-
col_info->type, num_bytes, frag.row_count);
775-
776-
if (!lazy_fetch_cols[col_idx]) {
777-
meta->fillChunkStats(computeStats(
778-
col_arr->Slice(frag.offset, frag.row_count * elems_count),
779-
col_type));
780-
} else {
781-
int32_t min = 0;
782-
int32_t max = -1;
783-
meta->fillChunkStats(min, max, /*has_nulls=*/true);
784-
}
785-
frag.metadata[col_idx] = meta;
786-
}
787-
}); // each fragment
788-
} else {
789-
for (size_t frag_idx = 0; frag_idx < frag_count; ++frag_idx) {
790-
auto& frag = fragments[frag_idx];
791-
frag.offset =
792-
frag_idx ? ((frag_idx - 1) * table.fragment_size + first_frag_size) : 0;
793-
frag.row_count =
794-
frag_idx ? std::min(table.fragment_size,
795-
static_cast<size_t>(at->num_rows()) - frag.offset)
796-
: first_frag_size;
797-
CHECK(col_type->isText());
798-
auto meta = std::make_shared<ChunkMetadata>(
799-
col_info->type,
800-
computeTotalStringsLength(col_arr, frag.offset, frag.row_count),
801-
frag.row_count);
802-
meta->fillStringChunkStats(
803-
col_arr->Slice(frag.offset, frag.row_count)->null_count());
804-
805-
frag.metadata[col_idx] = meta;
806-
}
807-
}
739+
bool compute_stats = !col_type->isString();
740+
if (compute_stats) {
741+
size_t elems_count = 1;
742+
if (col_type->isFixedLenArray()) {
743+
elems_count = col_type->size() / elem_type->size();
808744
}
809-
}); // each column
745+
// Compute stats for each fragment.
746+
tbb::parallel_for(
747+
tbb::blocked_range(size_t(0), frag_count), [&](auto frag_range) {
748+
for (size_t frag_idx = frag_range.begin(); frag_idx != frag_range.end();
749+
++frag_idx) {
750+
auto& frag = fragments[frag_idx];
751+
752+
frag.offset =
753+
frag_idx ? ((frag_idx - 1) * table.fragment_size + first_frag_size)
754+
: 0;
755+
frag.row_count =
756+
frag_idx ? std::min(table.fragment_size,
757+
static_cast<size_t>(at->num_rows()) - frag.offset)
758+
: first_frag_size;
759+
760+
size_t num_bytes;
761+
if (col_type->isFixedLenArray()) {
762+
num_bytes = frag.row_count * col_type->size();
763+
} else if (col_type->isVarLenArray()) {
764+
num_bytes =
765+
computeTotalStringsLength(col_arr, frag.offset, frag.row_count);
766+
} else {
767+
num_bytes = frag.row_count * col_type->size();
768+
}
769+
auto meta = std::make_shared<ChunkMetadata>(
770+
col_info->type, num_bytes, frag.row_count);
771+
772+
if (!lazy_fetch_cols[col_idx]) {
773+
meta->fillChunkStats(computeStats(
774+
col_arr->Slice(frag.offset, frag.row_count * elems_count),
775+
col_type));
776+
} else {
777+
int32_t min = 0;
778+
int32_t max = -1;
779+
meta->fillChunkStats(min, max, /*has_nulls=*/true);
780+
}
781+
frag.metadata[col_idx] = meta;
782+
}
783+
}); // each fragment
784+
} else {
785+
for (size_t frag_idx = 0; frag_idx < frag_count; ++frag_idx) {
786+
auto& frag = fragments[frag_idx];
787+
frag.offset =
788+
frag_idx ? ((frag_idx - 1) * table.fragment_size + first_frag_size) : 0;
789+
frag.row_count =
790+
frag_idx ? std::min(table.fragment_size,
791+
static_cast<size_t>(at->num_rows()) - frag.offset)
792+
: first_frag_size;
793+
CHECK(col_type->isText());
794+
auto meta = std::make_shared<ChunkMetadata>(
795+
col_info->type,
796+
computeTotalStringsLength(col_arr, frag.offset, frag.row_count),
797+
frag.row_count);
798+
meta->fillStringChunkStats(
799+
col_arr->Slice(frag.offset, frag.row_count)->null_count());
800+
801+
frag.metadata[col_idx] = meta;
802+
}
803+
}
804+
}
805+
}); // each column
810806
dict_lock.unlock();
811807

812808
if (table.row_count) {

omniscidb/ArrowStorage/ArrowStorageUtils.cpp

-1
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,6 @@
1717
#include "IR/Context.h"
1818
#include "Shared/InlineNullValues.h"
1919

20-
// TODO: use <Shared/threading.h>
2120
#include <tbb/parallel_for.h>
2221
#include <tbb/task_group.h>
2322

omniscidb/CMakeLists.txt

+1-16
Original file line number | Diff line number | Diff line change
@@ -590,26 +590,11 @@ endif()
590590

591591
# TBB
592592

593-
option(ENABLE_TBB "Enable OneTBB for threading (if found)" ON)
594-
set(TBB_LIBS "")
595-
find_package(TBB)
593+
find_package(TBB REQUIRED)
596594
if(TBB_FOUND)
597595
message(STATUS "TBB library is found with ${TBB_DIR}")
598-
add_definitions("-DHAVE_TBB")
599596
add_definitions("-DTBB_PREVIEW_TASK_GROUP_EXTENSIONS=1")
600597
list(APPEND TBB_LIBS ${TBB_LIBRARIES})
601-
if(ENABLE_TBB)
602-
add_definitions("-DENABLE_TBB")
603-
else()
604-
message(STATUS "Using TBB for threading is DISABLED")
605-
endif()
606-
else()
607-
set(ENABLE_TBB OFF)
608-
endif()
609-
610-
option(DISABLE_CONCURRENCY "Disable parallellism at the threading layer" OFF)
611-
if(DISABLE_CONCURRENCY)
612-
add_definitions("-DDISABLE_CONCURRENCY")
613598
endif()
614599

615600
list(APPEND ADDITIONAL_MAKE_CLEAN_FILES ${CMAKE_BINARY_DIR}/gen-cpp/)

omniscidb/QueryEngine/ArrowResultSetConverter.cpp

+5-6
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,6 @@
2020
#include "Execute.h"
2121
#include "Shared/ArrowUtil.h"
2222
#include "Shared/DateConverters.h"
23-
#include "Shared/threading.h"
2423
#include "Shared/toString.h"
2524

2625
// arrow headers
@@ -341,7 +340,7 @@ int64_t create_bitmap_parallel_for_avx512(uint8_t* bitmap_data,
341340
bitmap_data_ptr, const_cast<TYPE*>(values_data_ptr), processing_count);
342341
};
343342

344-
threading::parallel_for(
343+
tbb::parallel_for(
345344
tbb::blocked_range<size_t>(
346345
0, (avx512_processing_count + min_block_size - 1) / min_block_size),
347346
br_par_processor);
@@ -394,7 +393,7 @@ void convert_column(ResultSetPtr result,
394393

395394
std::vector<std::shared_ptr<arrow::Array>> fragments(values.size(), nullptr);
396395

397-
threading::parallel_for(static_cast<size_t>(0), values.size(), [&](size_t idx) {
396+
tbb::parallel_for(static_cast<size_t>(0), values.size(), [&](size_t idx) {
398397
size_t chunk_rows_count = chunks[idx].second;
399398

400399
auto res = arrow::AllocateBuffer((chunk_rows_count + 7) / 8);
@@ -421,7 +420,7 @@ void convert_column(ResultSetPtr result,
421420
? std::make_shared<NumArray>(
422421
chunk_rows_count, values[idx], is_valid, null_count)
423422
: std::make_shared<NumArray>(chunk_rows_count, values[idx]);
424-
}); // threading::parallel_for
423+
}); // tbb::parallel_for
425424

426425
out = std::make_shared<arrow::ChunkedArray>(std::move(fragments));
427426
}
@@ -1492,7 +1491,7 @@ std::shared_ptr<arrow::Table> ArrowResultSetConverter::getArrowTable(
14921491
results_->isTruncated());
14931492
} else {
14941493
auto timer = DEBUG_TIMER("fetch data in parallel_for");
1495-
threading::parallel_for(
1494+
tbb::parallel_for(
14961495
static_cast<size_t>(0), entry_count, stride, [&](size_t start_entry) {
14971496
const size_t i = start_entry / stride;
14981497
const size_t end_entry = std::min(entry_count, start_entry + stride);
@@ -1513,7 +1512,7 @@ std::shared_ptr<arrow::Table> ArrowResultSetConverter::getArrowTable(
15131512

15141513
{
15151514
auto timer = DEBUG_TIMER("append rows to arrow, finish builders");
1516-
threading::parallel_for(static_cast<size_t>(0), col_count, [&](size_t i) {
1515+
tbb::parallel_for(static_cast<size_t>(0), col_count, [&](size_t i) {
15171516
if (!columnar_conversion_flags[i]) {
15181517
for (size_t j = 0; j < segments_count; ++j) {
15191518
if (column_value_segs[j][i]) {

0 commit comments

Comments
 (0)