Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

Permalink
Introduce ResultSetTableToken::toArrow and use it in PyHDK.
Browse files Browse the repository at this point in the history
Signed-off-by: ienkovich <[email protected]>
  • Loading branch information
ienkovich authored and alexbaden committed Jun 29, 2023
1 parent 5858692 commit 1a5aace
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 12 deletions.
4 changes: 4 additions & 0 deletions omniscidb/ResultSet/ResultSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,10 @@ bool ResultSet::hasColNames() const {
return targets_.size() == fields_.size();
}

// Gives read-only access to the column names stored in this result set.
// The names are handed out by const reference, so no copy is made here; the
// returned vector may be empty when no names have been assigned.
const std::vector<std::string>& ResultSet::getColNames() const {
  const auto& col_names = fields_;
  return col_names;
}

std::string ResultSet::colName(size_t col_idx) const {
if (fields_.empty()) {
return "col" + std::to_string(col_idx);
Expand Down
1 change: 1 addition & 0 deletions omniscidb/ResultSet/ResultSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ class ResultSet {

void setColNames(std::vector<std::string> fields);
bool hasColNames() const;
const std::vector<std::string>& getColNames() const;
std::string colName(size_t col_idx) const;

/**
Expand Down
8 changes: 8 additions & 0 deletions omniscidb/ResultSetRegistry/ResultSetRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ ResultSetTableTokenPtr ResultSetRegistry::head(const ResultSetTableToken& token,
}
}

// Copy column names to the resulting table.
auto* first_rs = table->fragments.front().rs.get();
new_results.front()->setColNames(first_rs->getColNames());

data_lock.unlock();
table_lock.unlock();

Expand Down Expand Up @@ -256,6 +260,10 @@ ResultSetTableTokenPtr ResultSetRegistry::tail(const ResultSetTableToken& token,
}
}

// Copy column names to the resulting table.
auto* first_rs = table->fragments.front().rs.get();
new_results.front()->setColNames(first_rs->getColNames());

data_lock.unlock();
table_lock.unlock();

Expand Down
24 changes: 24 additions & 0 deletions omniscidb/ResultSetRegistry/ResultSetTableToken.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include "ResultSetTableToken.h"
#include "ResultSetRegistry.h"

#include "ResultSet/ArrowResultSet.h"
#include "Shared/ArrowUtil.h"

namespace hdk {

ResultSetTableToken::ResultSetTableToken(TableInfoPtr tinfo,
Expand Down Expand Up @@ -43,4 +46,25 @@ ResultSetTableTokenPtr ResultSetTableToken::tail(size_t n) const {
return registry_->tail(*this, n);
}

/**
 * Converts all result sets referenced by this token into a single Arrow table.
 *
 * Column names are read from the first result set and reused for every other
 * result set, so all converted tables share the same column naming. When the
 * token references exactly one result set, its converted table is returned
 * directly; otherwise the per-result-set tables are combined with
 * arrow::ConcatenateTables.
 *
 * @return The resulting Arrow table.
 * @throws Whatever ARROW_ASSIGN_OR_THROW raises when the concatenation fails.
 */
std::shared_ptr<arrow::Table> ResultSetTableToken::toArrow() const {
  // Column names come from the first result set only.
  const auto first_rs = resultSet(0);
  const auto col_count = first_rs->colCount();
  std::vector<std::string> col_names;
  col_names.reserve(col_count);
  for (size_t col_idx = 0; col_idx < col_count; ++col_idx) {
    col_names.push_back(first_rs->colName(col_idx));
  }

  // Convert each result set separately, one Arrow table per result set.
  const auto rs_count = resultSetCount();
  std::vector<std::shared_ptr<arrow::Table>> converted_tables;
  converted_tables.reserve(rs_count);
  for (size_t rs_idx = 0; rs_idx < rs_count; ++rs_idx) {
    // NOTE(review): -1 is presumably the "no row limit" value of the
    // converter's last argument - confirm against ArrowResultSetConverter.
    ArrowResultSetConverter converter(resultSet(rs_idx), col_names, -1);
    converted_tables.push_back(converter.convertToArrowTable());
  }

  // A single table needs no concatenation.
  if (converted_tables.size() == 1) {
    return converted_tables.front();
  }

  ARROW_ASSIGN_OR_THROW(auto res, arrow::ConcatenateTables(converted_tables));
  return res;
}

} // namespace hdk
4 changes: 4 additions & 0 deletions omniscidb/ResultSetRegistry/ResultSetTableToken.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "DataMgr/ChunkMetadata.h"
#include "SchemaMgr/TableInfo.h"

#include "arrow/api.h"

namespace hdk {

class ResultSetRegistry;
Expand Down Expand Up @@ -55,6 +57,8 @@ class ResultSetTableToken : public std::enable_shared_from_this<ResultSetTableTo
ResultSetTableTokenPtr head(size_t n) const;
ResultSetTableTokenPtr tail(size_t n) const;

std::shared_ptr<arrow::Table> toArrow() const;

std::string toString() const {
return "ResultSetTableToken(" + std::to_string(dbId()) + ":" +
std::to_string(tableId()) + ")";
Expand Down
20 changes: 20 additions & 0 deletions omniscidb/Tests/ResultSetArrowConversion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,26 @@ TEST(ArrowTable, FixedLenArrays) {
}
}

// Checks that a query result consisting of several result sets is converted
// into one Arrow table whose columns keep one chunk per result set.
TEST(ArrowTable, MultifragResult) {
  // Remember the current setting; the guard puts it back (presumably when it
  // goes out of scope - see ScopeGuard).
  const bool saved_multifrag_flag = config().exec.enable_multifrag_execution_result;
  ScopeGuard restore_flag = [saved_multifrag_flag] {
    config().exec.enable_multifrag_execution_result = saved_multifrag_flag;
  };

  config().exec.enable_multifrag_execution_result = true;

  auto exec_res =
      runSqlQuery("select * from test_chunked;", ExecutorDeviceType::CPU, false);

  // The result is expected to hold two fragments ...
  CHECK_EQ(exec_res.getToken()->resultSetCount(), size_t{2});

  // ... which must show up as two batches in the converted table.
  auto arrow_table = exec_res.getToken()->toArrow();
  CHECK_EQ(arrow_table->column(1)->num_chunks(), 2);

  compare_columns(table6x4_col_i64, arrow_table->column(1));
  compare_columns(table6x4_col_bi, arrow_table->column(2));
  compare_columns(table6x4_col_d, arrow_table->column(3));
}

int main(int argc, char* argv[]) {
testing::InitGoogleTest(&argc, argv);
TestHelpers::init_logger_stderr_only(argc, argv);
Expand Down
10 changes: 10 additions & 0 deletions python/pyhdk/_sql.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ from libcpp.memory cimport shared_ptr, unique_ptr
from libcpp.string cimport string
from libcpp.vector cimport vector

from pyarrow.lib cimport CTable as CArrowTable

from pyhdk._common cimport CConfig, CType
from pyhdk._storage cimport CSchemaProvider, CSchemaProviderPtr, CDataProvider, CDataMgr, CBufferProvider
from pyhdk._execute cimport CExecutor, CResultSetPtr, CCompilationOptions, CExecutionOptions, CTargetMetaInfo
Expand Down Expand Up @@ -50,6 +52,13 @@ cdef extern from "omniscidb/QueryEngine/RelAlgDagBuilder.h":
cdef cppclass CRelAlgDagBuilder "RelAlgDagBuilder"(CQueryDag):
CRelAlgDagBuilder(const string&, int, CSchemaProviderPtr, shared_ptr[CConfig]) except +

# Cython declarations for the C++ class hdk::ResultSetTableToken.
cdef extern from "omniscidb/ResultSetRegistry/ResultSetTableToken.h":
cdef cppclass CResultSetTableToken "hdk::ResultSetTableToken":
# Row count reported by the token.
size_t rowCount()
# Converts the token's data to an Arrow table; `except +` makes Cython
# translate a C++ exception thrown by the call into a Python exception.
shared_ptr[CArrowTable] toArrow() except +

# Shared pointer to a const token, used as the return type of getToken().
ctypedef shared_ptr[const CResultSetTableToken] CResultSetTableTokenPtr

cdef extern from "omniscidb/QueryEngine/Descriptors/RelAlgExecutionDescriptor.h":
cdef cppclass CExecutionResult "ExecutionResult":
CExecutionResult()
Expand All @@ -60,6 +69,7 @@ cdef extern from "omniscidb/QueryEngine/Descriptors/RelAlgExecutionDescriptor.h"
const vector[CTargetMetaInfo]& getTargetsMeta()
string getExplanation()
const string& tableName()
CResultSetTableTokenPtr getToken()

CExecutionResult head(size_t) except +
CExecutionResult tail(size_t) except +
Expand Down
16 changes: 4 additions & 12 deletions python/pyhdk/_sql.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,12 @@ cdef extract_array_value(const CArrayTargetValue *array, const CType *c_type):

cdef class ExecutionResult:
def row_count(self):
cdef shared_ptr[CResultSet] c_res
c_res = self.c_result.getRows()
return int(c_res.get().rowCount())
cdef CResultSetTableTokenPtr c_token = self.c_result.getToken()
return int(c_token.get().rowCount())

def to_arrow(self):
cdef vector[string] col_names
cdef vector[CTargetMetaInfo].const_iterator it = self.c_result.getTargetsMeta().const_begin()

while it != self.c_result.getTargetsMeta().const_end():
col_names.push_back(dereference(it).get_resname())
preincrement(it)

cdef unique_ptr[CArrowResultSetConverter] converter = make_unique[CArrowResultSetConverter](self.c_result.getRows(), col_names, -1)
cdef shared_ptr[CArrowTable] at = converter.get().convertToArrowTable()
cdef CResultSetTableTokenPtr c_token = self.c_result.getToken()
cdef shared_ptr[CArrowTable] at = c_token.get().toArrow()
return pyarrow_wrap_table(at)

def to_explain_str(self):
Expand Down

0 comments on commit 1a5aace

Please sign in to comment.