Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

ResultSet refactoring and clean-up [08/N] #227

Merged
merged 1 commit into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
589 changes: 66 additions & 523 deletions omniscidb/QueryEngine/Descriptors/QueryMemoryDescriptor.cpp

Large diffs are not rendered by default.

49 changes: 21 additions & 28 deletions omniscidb/QueryEngine/Descriptors/QueryMemoryDescriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#ifndef QUERYENGINE_QUERYMEMORYDESCRIPTOR_H
#define QUERYENGINE_QUERYMEMORYDESCRIPTOR_H

#include "../CompilationOptions.h"
#include "ColSlotContext.h"
#include "CountDistinctDescriptor.h"
#include "Types.h"
Expand All @@ -41,11 +40,16 @@
#include <unordered_map>
#include <vector>

#include "Shared/Config.h"
#include "Shared/DeviceType.h"
#include "Shared/SqlTypesLayout.h"
#include "Shared/TargetInfo.h"

class Executor;
namespace Data_Namespace {
class DataMgr;
}

class BufferProvider;
class QueryExecutionContext;
class RowSetMemoryOwner;
struct InputTableInfo;
Expand All @@ -71,7 +75,8 @@ class QueryMemoryDescriptor {
QueryMemoryDescriptor();

// constructor for init call
QueryMemoryDescriptor(const Executor* executor,
QueryMemoryDescriptor(Data_Namespace::DataMgr* data_mgr,
ConfigPtr config,
const RelAlgExecutionUnit& ra_exe_unit,
const std::vector<InputTableInfo>& query_infos,
const bool allow_multifrag,
Expand All @@ -90,7 +95,8 @@ class QueryMemoryDescriptor {
const bool must_use_baseline_sort,
const bool use_streaming_top_n);

QueryMemoryDescriptor(const Executor* executor,
QueryMemoryDescriptor(Data_Namespace::DataMgr* data_mgr,
ConfigPtr config,
const size_t entry_count,
const QueryDescriptionType query_desc_type,
const bool is_table_function);
Expand All @@ -104,7 +110,8 @@ class QueryMemoryDescriptor {
bool operator==(const QueryMemoryDescriptor& other) const;

static std::unique_ptr<QueryMemoryDescriptor> init(
const Executor* executor,
Data_Namespace::DataMgr* data_mgr,
ConfigPtr config,
const RelAlgExecutionUnit& ra_exe_unit,
const std::vector<InputTableInfo>& query_infos,
const ColRangeInfo& col_range_info,
Expand All @@ -119,21 +126,6 @@ class QueryMemoryDescriptor {
const bool output_columnar_hint,
const bool streaming_top_n_hint);

std::unique_ptr<QueryExecutionContext> getQueryExecutionContext(
const RelAlgExecutionUnit&,
Executor* executor,
const ExecutorDeviceType device_type,
const ExecutorDispatchMode dispatch_mode,
const bool use_groupby_buffer_desc,
const int device_id,
const int64_t num_rows,
const std::vector<std::vector<const int8_t*>>& col_buffers,
const std::vector<std::vector<uint64_t>>& frag_offsets,
std::shared_ptr<RowSetMemoryOwner>,
const bool output_columnar,
const bool sort_on_gpu,
const size_t thread_idx) const;

static bool many_entries(const int64_t max_val,
const int64_t min_val,
const int64_t bucket) {
Expand All @@ -153,14 +145,7 @@ class QueryMemoryDescriptor {
return countDescriptorsLogicallyEmpty(count_distinct_descriptors_);
}

static int8_t pick_target_compact_width(const RelAlgExecutionUnit& ra_exe_unit,
const std::vector<InputTableInfo>& query_infos,
const int8_t crt_min_byte_width,
bool bigint_count);

// Getters and Setters
const Executor* getExecutor() const { return executor_; }

QueryDescriptionType getQueryDescriptionType() const { return query_desc_type_; }
void setQueryDescriptionType(const QueryDescriptionType val) { query_desc_type_ = val; }
bool isSingleColumnGroupByWithPerfectHash() const {
Expand Down Expand Up @@ -338,13 +323,21 @@ class QueryMemoryDescriptor {
return col_slot_context_.slotIsVarlen(slot_idx);
}

Data_Namespace::DataMgr* getDataMgr() const;
BufferProvider* getBufferProvider() const;

protected:
void resetGroupColWidths(const std::vector<int8_t>& new_group_col_widths) {
group_col_widths_ = new_group_col_widths;
}

int8_t warpSize() const;
unsigned gridSize() const;
unsigned blockSize() const;

private:
const Executor* executor_;
Data_Namespace::DataMgr* data_mgr_;
ConfigPtr config_;
QueryDescriptionType query_desc_type_;
bool keyless_hash_;
bool interleaved_bins_on_gpu_;
Expand Down
11 changes: 6 additions & 5 deletions omniscidb/QueryEngine/Execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1898,8 +1898,8 @@ TemporaryTable Executor::executeWorkUnitImpl(
plan_state_.reset(new PlanState(false, query_infos, this));
plan_state_->allocateLocalColumnIds(ra_exe_unit.input_col_descs);
CHECK(!query_mem_desc_owned);
query_mem_desc_owned.reset(
new QueryMemoryDescriptor(this, 0, QueryDescriptionType::Projection, false));
query_mem_desc_owned.reset(new QueryMemoryDescriptor(
data_mgr_, config_, 0, QueryDescriptionType::Projection, false));
}

query_comp_descs_owned.insert(std::make_pair(dt, std::move(query_comp_desc_owned)));
Expand Down Expand Up @@ -2093,7 +2093,8 @@ ResultSetPtr Executor::executeTableFunction(
INJECT_TIMER(Exec_executeTableFunction);

if (eo.just_validate) {
QueryMemoryDescriptor query_mem_desc(this,
QueryMemoryDescriptor query_mem_desc(data_mgr_,
config_,
/*entry_count=*/0,
QueryDescriptionType::Projection,
/*is_table_function=*/true);
Expand Down Expand Up @@ -2263,6 +2264,7 @@ void fill_entries_for_empty_input(std::vector<TargetInfo>& target_infos,
}

ResultSetPtr build_row_for_empty_input(
const Executor* executor,
const std::vector<const hdk::ir::Expr*>& target_exprs_in,
const QueryMemoryDescriptor& query_mem_desc,
const ExecutorDeviceType device_type) {
Expand All @@ -2288,7 +2290,6 @@ ResultSetPtr build_row_for_empty_input(
}
std::vector<TargetInfo> target_infos;
std::vector<int64_t> entry;
const auto executor = query_mem_desc.getExecutor();
fill_entries_for_empty_input(target_infos,
entry,
target_exprs,
Expand Down Expand Up @@ -2322,7 +2323,7 @@ ResultSetPtr Executor::collectAllDeviceResults(
if (result_per_device.empty() && query_mem_desc.getQueryDescriptionType() ==
QueryDescriptionType::NonGroupedAggregate) {
return build_row_for_empty_input(
ra_exe_unit.target_exprs, query_mem_desc, device_type);
this, ra_exe_unit.target_exprs, query_mem_desc, device_type);
}
if (use_speculative_top_n(ra_exe_unit, query_mem_desc)) {
try {
Expand Down
3 changes: 2 additions & 1 deletion omniscidb/QueryEngine/Execute.h
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,8 @@ class Executor {
llvm::LLVMContext& getContext() { return *context_.get(); }
void update_extension_modules(bool update_runtime_modules_only = false);

bool isLazyFetchAllowed() const { return plan_state_->allow_lazy_fetch_; }

private:
std::vector<int8_t> serializeLiterals(
const std::unordered_map<int, CgenState::LiteralValues>& literals,
Expand Down Expand Up @@ -1074,7 +1076,6 @@ class Executor {
friend class HashJoin; // cgen_state_
friend class RowFuncBuilder;
friend class QueryCompilationDescriptor;
friend class QueryMemoryDescriptor;
friend class QueryMemoryInitializer;
friend class QueryFragmentDescriptor;
friend class QueryExecutionContext;
Expand Down
58 changes: 30 additions & 28 deletions omniscidb/QueryEngine/ExecutionKernel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,19 +358,20 @@ void ExecutionKernel::runImpl(Executor* executor,
if (eo.executor_type == ExecutorType::Native) {
try {
query_exe_context_owned =
query_mem_desc.getQueryExecutionContext(ra_exe_unit_,
executor,
chosen_device_type,
kernel_dispatch_mode,
query_comp_desc.useGroupByBufferDesc(),
chosen_device_id,
total_num_input_rows,
fetch_result->col_buffers,
fetch_result->frag_offsets,
executor->getRowSetMemoryOwner(),
compilation_result.output_columnar,
query_mem_desc.sortOnGpu(),
thread_idx);
QueryExecutionContext::create(ra_exe_unit_,
query_mem_desc,
executor,
chosen_device_type,
kernel_dispatch_mode,
query_comp_desc.useGroupByBufferDesc(),
chosen_device_id,
total_num_input_rows,
fetch_result->col_buffers,
fetch_result->frag_offsets,
executor->getRowSetMemoryOwner(),
compilation_result.output_columnar,
query_mem_desc.sortOnGpu(),
thread_idx);
} catch (const OutOfHostMemory& e) {
throw QueryExecutionError(Executor::ERR_OUT_OF_CPU_MEM);
}
Expand Down Expand Up @@ -484,21 +485,22 @@ void KernelSubtask::runImpl(Executor* executor) {
std::vector<std::vector<uint64_t>> frag_offsets(
fetch_result_->frag_offsets.size(),
std::vector<uint64_t>(fetch_result_->frag_offsets[0].size()));
query_exe_context_owned = kernel_.query_mem_desc.getQueryExecutionContext(
kernel_.ra_exe_unit_,
executor,
kernel_.chosen_device_type,
kernel_.kernel_dispatch_mode,
kernel_.query_comp_desc.useGroupByBufferDesc(),
kernel_.chosen_device_id,
total_num_input_rows_,
col_buffers,
frag_offsets,
executor->getRowSetMemoryOwner(),
compilation_result.output_columnar,
kernel_.query_mem_desc.sortOnGpu(),
// TODO: use TBB thread id to choose allocator
thread_idx_);
query_exe_context_owned =
QueryExecutionContext::create(kernel_.ra_exe_unit_,
kernel_.query_mem_desc,
executor,
kernel_.chosen_device_type,
kernel_.kernel_dispatch_mode,
kernel_.query_comp_desc.useGroupByBufferDesc(),
kernel_.chosen_device_id,
total_num_input_rows_,
col_buffers,
frag_offsets,
executor->getRowSetMemoryOwner(),
compilation_result.output_columnar,
kernel_.query_mem_desc.sortOnGpu(),
// TODO: use TBB thread id to choose allocator
thread_idx_);
} catch (const OutOfHostMemory& e) {
throw QueryExecutionError(Executor::ERR_OUT_OF_CPU_MEM);
}
Expand Down
Loading