Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Add ResultSetRegistry storage [1/N] #331

Merged
merged 3 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ add_subdirectory(omniscidb/SqliteConnector)
add_subdirectory(omniscidb/QueryBuilder)
add_subdirectory(omniscidb/QueryEngine)
add_subdirectory(omniscidb/ResultSet)
add_subdirectory(omniscidb/ResultSetRegistry)

# Source
add_subdirectory(src)
Expand All @@ -421,7 +422,7 @@ if(BUILD_SHARED_LIBS AND ENABLE_PYTHON)
add_subdirectory(python)
endif()

install(TARGETS OSDependent Logger Shared Utils Calcite ArrowStorage StringDictionary DataMgr CudaMgr SchemaMgr L0Mgr QueryBuilder QueryEngine Analyzer IR ConfigBuilder SqliteConnector ResultSet RUNTIME)
install(TARGETS OSDependent Logger Shared Utils Calcite ArrowStorage StringDictionary DataMgr CudaMgr SchemaMgr L0Mgr QueryBuilder QueryEngine Analyzer IR ConfigBuilder SqliteConnector ResultSet ResultSetRegistry RUNTIME)
if(ENABLE_CUDA)
install(FILES ${CMAKE_BINARY_DIR}/omniscidb/QueryEngine/cuda_mapd_rt.fatbin DESTINATION QueryEngine COMPONENT "exe")
endif()
Expand Down
3 changes: 2 additions & 1 deletion omniscidb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ if(MSVC)
endif()

find_package(Git)
find_package(Glog REQUIRED)
find_package(Glog CONFIG REQUIRED)
find_package(ZLIB REQUIRED)

option(ENABLE_FOLLY "Use Folly" ON)
Expand Down Expand Up @@ -660,6 +660,7 @@ add_subdirectory(IR)
add_subdirectory(L0Mgr)
add_subdirectory(Logger)
add_subdirectory(ResultSet)
add_subdirectory(ResultSetRegistry)
add_subdirectory(Shared)
add_subdirectory(SchemaMgr)
add_subdirectory(OSDependent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ void PersistentStorageMgr::registerDataProvider(
mgr_by_schema_id_[schema_id] = provider;
}

bool PersistentStorageMgr::hasDataProvider(int schema_id) const {
return mgr_by_schema_id_.count(schema_id);
}

std::shared_ptr<AbstractBufferMgr> PersistentStorageMgr::getDataProvider(
int schema_id) const {
CHECK_EQ(mgr_by_schema_id_.count(schema_id), (size_t)1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class PersistentStorageMgr : public AbstractBufferMgr {

void registerDataProvider(int schema_id, std::shared_ptr<AbstractBufferMgr>);

bool hasDataProvider(int schema_id) const;
std::shared_ptr<AbstractBufferMgr> getDataProvider(int schema_id) const;

protected:
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/QueryEngine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ set(query_engine_source_files
CgenState.cpp
Codec.cpp
ColRangeInfo.cpp
ColumnarResults.cpp
ColumnFetcher.cpp
ColumnIR.cpp
CompareIR.cpp
Expand Down Expand Up @@ -326,6 +325,7 @@ set(QUERY_ENGINE_LIBS
Utils
Logger
ResultSet
ResultSetRegistry
Shared
SchemaMgr
SqliteConnector
Expand Down
7 changes: 2 additions & 5 deletions omniscidb/QueryEngine/CardinalityEstimator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,8 @@ size_t RelAlgExecutor::getNDVEstimation(const WorkUnit& work_unit,
false,
data_provider_,
column_cache);
if (estimator_result.empty()) {
return 1;
}
CHECK_EQ(estimator_result.getFragCount(), 1);
return std::max(estimator_result[0]->getNDVEstimator(), size_t(1));
CHECK_EQ(estimator_result.size(), (size_t)1);
return std::max(estimator_result.result(0)->getNDVEstimator(), size_t(1));
} catch (const QueryExecutionError& e) {
if (e.getErrorCode() == Executor::ERR_OUT_OF_TIME) {
throw std::runtime_error("Cardinality estimation query ran out of time");
Expand Down
34 changes: 18 additions & 16 deletions omniscidb/QueryEngine/ColumnFetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ inline const ColumnarResults* columnarize_result(
for (size_t i = 0; i < result->colCount(); ++i) {
col_types.push_back(result->colType(i)->canonicalize());
}
return new ColumnarResults(
row_set_mem_owner, *result, result->colCount(), col_types, thread_idx, executor);
return new ColumnarResults(row_set_mem_owner,
*result,
result->colCount(),
col_types,
thread_idx,
executor->getConfig());
}

std::string getMemoryLevelString(Data_Namespace::MemoryLevel memoryLevel) {
Expand Down Expand Up @@ -123,13 +127,12 @@ std::pair<const int8_t*, size_t> ColumnFetcher::getOneColumnFragment(
}
auto& frag_id_to_result = column_cache[table_id];
if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
auto& tmp_table =
get_temporary_table(executor->temporary_tables_, hash_col.tableId());
auto token = get_temporary_table(executor->temporary_tables_, hash_col.tableId());
frag_id_to_result.insert(
std::make_pair(frag_id,
std::shared_ptr<const ColumnarResults>(
columnarize_result(executor->row_set_mem_owner_,
tmp_table.getResultSet(frag_id),
token->resultSet(frag_id),
thread_idx,
frag_id,
executor))));
Expand Down Expand Up @@ -332,8 +335,7 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
col_buffer,
fragment.getNumTuples(),
chunk_meta_it->second->type,
thread_idx,
executor_));
thread_idx));
}
auto merged_results =
ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
Expand All @@ -356,15 +358,15 @@ const int8_t* ColumnFetcher::getResultSetColumn(
const int device_id,
DeviceAllocator* device_allocator,
const size_t thread_idx) const {
return getResultSetColumn(
get_temporary_table(executor_->temporary_tables_, table_id).getResultSet(frag_id),
table_id,
frag_id,
col_id,
memory_level,
device_id,
device_allocator,
thread_idx);
return getResultSetColumn(get_temporary_table(executor_->temporary_tables_, table_id)
->resultSet((size_t)frag_id),
table_id,
frag_id,
col_id,
memory_level,
device_id,
device_allocator,
thread_idx);
}

const int8_t* ColumnFetcher::linearizeColumnFragments(
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/QueryEngine/ColumnFetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
#include "DataMgr/Allocators/DeviceAllocator.h"
#include "DataProvider/DataProvider.h"
#include "IR/Expr.h"
#include "QueryEngine/ColumnarResults.h"
#include "QueryEngine/Descriptors/QueryFragmentDescriptor.h"
#include "QueryEngine/JoinHashTable/Runtime/HashJoinRuntime.h"
#include "ResultSetRegistry/ColumnarResults.h"
#include "Shared/hash.h"

struct FetchResult {
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/QueryEngine/DataRecycler/DataRecycler.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
#pragma once

#include "Analyzer/Analyzer.h"
#include "QueryEngine/ColumnarResults.h"
#include "QueryEngine/Descriptors/InputDescriptors.h"
#include "QueryEngine/Descriptors/RelAlgExecutionDescriptor.h"
#include "QueryEngine/JoinHashTable/HashTable.h"
#include "QueryEngine/RelAlgExecutionUnit.h"
#include "ResultSet/ResultSet.h"
#include "ResultSetRegistry/ColumnarResults.h"
#include "Shared/mapd_shared_mutex.h"
#include "Shared/misc.h"

Expand Down
47 changes: 5 additions & 42 deletions omniscidb/QueryEngine/Descriptors/RelAlgExecutionDescriptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,9 @@ ExecutionResult::ExecutionResult()
, execution_time_ms_(0)
, type_(QueryResult) {}

ExecutionResult::ExecutionResult(const ResultSetPtr& rows,
ExecutionResult::ExecutionResult(hdk::ResultSetTableTokenPtr token,
const std::vector<TargetMetaInfo>& targets_meta)
: results_(rows)
, targets_meta_(targets_meta)
, filter_push_down_enabled_(false)
, success_(true)
, execution_time_ms_(0)
, type_(QueryResult) {}

ExecutionResult::ExecutionResult(ResultSetPtr&& result,
const std::vector<TargetMetaInfo>& targets_meta)
: results_(std::move(result))
, targets_meta_(targets_meta)
, filter_push_down_enabled_(false)
, success_(true)
, execution_time_ms_(0)
, type_(QueryResult) {}

ExecutionResult::ExecutionResult(const TemporaryTable& results,
const std::vector<TargetMetaInfo>& targets_meta)
: results_(results)
, targets_meta_(targets_meta)
, filter_push_down_enabled_(false)
, success_(true)
, execution_time_ms_(0)
, type_(QueryResult) {}

ExecutionResult::ExecutionResult(TemporaryTable&& results,
const std::vector<TargetMetaInfo>& targets_meta)
: results_(results)
: result_token_(std::move(token))
, targets_meta_(targets_meta)
, filter_push_down_enabled_(false)
, success_(true)
Expand All @@ -70,7 +43,7 @@ ExecutionResult::ExecutionResult(const ExecutionResult& that)
(filter_push_down_enabled_ && pushed_down_filter_info_.empty())) {
return;
}
results_ = that.results_;
result_token_ = that.result_token_;
}

ExecutionResult::ExecutionResult(ExecutionResult&& that)
Expand All @@ -84,7 +57,7 @@ ExecutionResult::ExecutionResult(ExecutionResult&& that)
(filter_push_down_enabled_ && pushed_down_filter_info_.empty())) {
return;
}
results_ = std::move(that.results_);
result_token_ = std::move(that.result_token_);
}

ExecutionResult::ExecutionResult(
Expand All @@ -103,7 +76,7 @@ ExecutionResult& ExecutionResult::operator=(const ExecutionResult& that) {
filter_push_down_enabled_ = that.filter_push_down_enabled_;
return *this;
}
results_ = that.results_;
result_token_ = that.result_token_;
targets_meta_ = that.targets_meta_;
success_ = that.success_;
execution_time_ms_ = that.execution_time_ms_;
Expand All @@ -116,16 +89,6 @@ const std::vector<PushedDownFilterInfo>& ExecutionResult::getPushedDownFilterInf
return pushed_down_filter_info_;
}

void ExecutionResult::updateResultSet(const std::string& query,
RType type,
bool success) {
targets_meta_.clear();
pushed_down_filter_info_.clear();
success_ = success;
type_ = type;
results_ = std::make_shared<ResultSet>(query);
}

std::string ExecutionResult::getExplanation() {
if (!empty()) {
return getRows()->getExplanation();
Expand Down
57 changes: 22 additions & 35 deletions omniscidb/QueryEngine/Descriptors/RelAlgExecutionDescriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "QueryEngine/JoinFilterPushDown.h"
#include "ResultSet/QueryMemoryDescriptor.h"
#include "ResultSet/ResultSet.h"
#include "ResultSetRegistry/ResultSetRegistry.h"
#include "Shared/TargetInfo.h"
#include "Shared/toString.h"

Expand All @@ -28,15 +29,7 @@ class ExecutionResult {
public:
ExecutionResult();

ExecutionResult(const ResultSetPtr& rows,
const std::vector<TargetMetaInfo>& targets_meta);

ExecutionResult(ResultSetPtr&& result, const std::vector<TargetMetaInfo>& targets_meta);

ExecutionResult(const TemporaryTable& results,
const std::vector<TargetMetaInfo>& targets_meta);

ExecutionResult(TemporaryTable&& results,
ExecutionResult(hdk::ResultSetTableTokenPtr token,
const std::vector<TargetMetaInfo>& targets_meta);

ExecutionResult(const ExecutionResult& that);
Expand All @@ -48,19 +41,17 @@ class ExecutionResult {

ExecutionResult& operator=(const ExecutionResult& that);

const ResultSetPtr& getRows() const {
CHECK_EQ(results_.getFragCount(), 1);
return results_[0];
}

bool empty() const { return results_.empty(); }
hdk::ResultSetTableTokenPtr getToken() const { return result_token_; }

const ResultSetPtr& getDataPtr() const {
CHECK_EQ(results_.getFragCount(), 1);
return results_[0];
ResultSetPtr getRows() const {
CHECK(result_token_);
CHECK_EQ(result_token_->resultSetCount(), (size_t)1);
return result_token_->resultSet(0);
}

const TemporaryTable& getTable() const { return results_; }
bool empty() const { return !result_token_; }

ResultSetPtr getDataPtr() const { return getRows(); }

const std::vector<TargetMetaInfo>& getTargetsMeta() const { return targets_meta_; }

Expand All @@ -69,19 +60,24 @@ class ExecutionResult {
const bool isFilterPushDownEnabled() const { return filter_push_down_enabled_; }

void setQueueTime(const int64_t queue_time_ms) {
CHECK(!results_.empty());
results_[0]->setQueueTime(queue_time_ms);
CHECK(false);
getRows()->setQueueTime(queue_time_ms);
}

std::string toString() const {
return ::typeName(this) + "(" + ::toString(results_) + ", " +
::toString(targets_meta_) + ")";
std::string res = ::typeName(this) + "(";
if (result_token_) {
res += ::toString(result_token_);
} else {
res += "empty";
}
res += ", " + ::toString(targets_meta_) + ")";
return res;
}

enum RType { QueryResult, SimpleResult, Explaination, CalciteDdl };

std::string getExplanation();
void updateResultSet(const std::string& query_ra, RType type, bool success = true);
RType getResultType() const { return type_; }
void setResultType(RType type) { type_ = type; }
int64_t getExecutionTime() const { return execution_time_ms_; }
Expand All @@ -93,7 +89,7 @@ class ExecutionResult {
}

private:
TemporaryTable results_;
hdk::ResultSetTableTokenPtr result_token_;
std::vector<TargetMetaInfo> targets_meta_;
// filters chosen to be pushed down
std::vector<PushedDownFilterInfo> pushed_down_filter_info_;
Expand All @@ -111,16 +107,7 @@ class Node;

class RaExecutionDesc {
public:
RaExecutionDesc(const hdk::ir::Node* body)
: body_(body)
, result_(std::make_shared<ResultSet>(std::vector<TargetInfo>{},
ExecutorDeviceType::CPU,
QueryMemoryDescriptor(),
nullptr,
nullptr,
0,
0),
{}) {}
RaExecutionDesc(const hdk::ir::Node* body) : body_(body) {}

const ExecutionResult& getResult() const { return result_; }

Expand Down
Loading