[StarRocks On ES] Support array format. (#9693)
(cherry picked from commit 6ae4a62)
Smith-Cruise authored and wanpengfei-git committed Aug 11, 2022
1 parent 3532a37 commit ef581b7
Showing 6 changed files with 287 additions and 19 deletions.
92 changes: 83 additions & 9 deletions be/src/exec/es/es_scroll_parser.cpp
@@ -4,6 +4,7 @@

#include <fmt/format.h>

#include "column/array_column.h"
#include "column/column_helper.h"
#include "column/nullable_column.h"
#include "common/config.h"
@@ -107,7 +108,7 @@ static Status get_int_value(const rapidjson::Value& col, PrimitiveType type, voi

ScrollParser::ScrollParser(bool doc_value_mode)
: _tuple_desc(nullptr),
_docvalue_context(nullptr),
_doc_value_context(nullptr),
_size(0),
_cur_line(0),
_doc_value_mode(doc_value_mode),
@@ -173,7 +174,7 @@ Status ScrollParser::fill_chunk(RuntimeState* state, ChunkPtr* chunk, bool* line
// TODO: we could fill chunk by column rather than row
for (size_t i = 0; i < fill_sz; ++i) {
const rapidjson::Value& obj = _inner_hits_node[_cur_line + i];
bool pure_doc_value = _pure_doc_value(obj);
bool pure_doc_value = _is_pure_doc_value(obj);
bool has_source = obj.HasMember(FIELD_SOURCE);
bool has_fields = obj.HasMember(FIELD_FIELDS);

@@ -221,7 +222,7 @@ Status ScrollParser::fill_chunk(RuntimeState* state, ChunkPtr* chunk, bool* line
// if pure_doc_value is enabled, docvalue_context must contain the key
// todo: move the `pure_docvalue` check for every tuple outside fill_tuple;
// it should be checked once per table scan, not for every tuple
const char* col_name = pure_doc_value ? _docvalue_context->at(slot_desc->col_name()).c_str()
const char* col_name = pure_doc_value ? _doc_value_context->at(slot_desc->col_name()).c_str()
: slot_desc->col_name().c_str();

auto has_col = line.HasMember(col_name);
@@ -231,8 +232,7 @@ Status ScrollParser::fill_chunk(RuntimeState* state, ChunkPtr* chunk, bool* line
bool is_null = col.IsNull() || (pure_doc_value && col.IsArray() && (col.Empty() || col[0].IsNull()));
if (!is_null) {
// append value from ES to column
RETURN_IF_ERROR(
_append_value_from_json_val(column.get(), slot_desc->type().type, col, pure_doc_value));
RETURN_IF_ERROR(_append_value_from_json_val(column.get(), slot_desc->type(), col, pure_doc_value));
continue;
}
// handle null col
@@ -255,10 +255,10 @@ Status ScrollParser::fill_chunk(RuntimeState* state, ChunkPtr* chunk, bool* line
void ScrollParser::set_params(const TupleDescriptor* descs,
const std::map<std::string, std::string>* docvalue_context) {
_tuple_desc = descs;
_docvalue_context = docvalue_context;
_doc_value_context = docvalue_context;
}

bool ScrollParser::_pure_doc_value(const rapidjson::Value& obj) {
bool ScrollParser::_is_pure_doc_value(const rapidjson::Value& obj) {
if (obj.HasMember(FIELD_FIELDS)) {
return true;
}
@@ -288,8 +288,9 @@ void ScrollParser::_append_null(Column* column) {
column->append_default();
}

Status ScrollParser::_append_value_from_json_val(Column* column, PrimitiveType type, const rapidjson::Value& col,
bool pure_doc_value) {
Status ScrollParser::_append_value_from_json_val(Column* column, const TypeDescriptor& type_desc,
const rapidjson::Value& col, bool pure_doc_value) {
PrimitiveType type = type_desc.type;
switch (type) {
case TYPE_CHAR:
case TYPE_VARCHAR: {
@@ -351,6 +352,10 @@ Status ScrollParser::_append_value_from_json_val(Column* column, PrimitiveType t
RETURN_IF_ERROR(_append_date_val<TYPE_DATETIME>(col, column, pure_doc_value));
break;
}
case TYPE_ARRAY: {
RETURN_IF_ERROR(_append_array_val(col, type_desc, column, pure_doc_value));
break;
}
default: {
DCHECK(false) << "unknown type:" << type;
return Status::InvalidArgument(fmt::format("unknown type {}", type));
@@ -496,6 +501,75 @@ Status ScrollParser::_append_bool_val(const rapidjson::Value& col, Column* colum

return Status::OK();
}

Status ScrollParser::_append_array_val(const rapidjson::Value& col, const TypeDescriptor& type_desc, Column* column,
bool pure_doc_value) {
// Array type must have child type.
const auto& child_type = type_desc.children[0];
DCHECK(child_type.type != INVALID_TYPE);

// In Elasticsearch, an n-dimensional array is flattened into a one-dimensional array.
// https://www.elastic.co/guide/en/elasticsearch/reference/8.3/array.html
// So we do not allow users to create nested array columns.
// TODO: We should prevent users from creating nested array columns in the FE, but no schema validation is done there yet.
if (child_type.type == TYPE_ARRAY) {
std::string str = fmt::format("Invalid array format; Document slice is: {}.", json_value_to_string(col));
return Status::RuntimeError(str);
}

ArrayColumn* array = nullptr;

if (column->is_nullable()) {
auto* nullable_column = down_cast<NullableColumn*>(column);
auto* data_column = nullable_column->data_column().get();
NullData& null_data = nullable_column->null_column_data();
null_data.push_back(0);
array = down_cast<ArrayColumn*>(data_column);
} else {
array = down_cast<ArrayColumn*>(column);
}

auto* offsets = array->offsets_column().get();
auto* elements = array->elements_column().get();

if (pure_doc_value) {
RETURN_IF_ERROR(_append_array_val_from_docvalue(col, child_type, elements));
} else {
RETURN_IF_ERROR(_append_array_val_from_source(col, child_type, elements));
}

size_t new_size = elements->size();
offsets->append(new_size);
return Status::OK();
}

Status ScrollParser::_append_array_val_from_docvalue(const rapidjson::Value& val, const TypeDescriptor& child_type_desc,
Column* column) {
for (auto& item : val.GetArray()) {
RETURN_IF_ERROR(_append_value_from_json_val(column, child_type_desc, item, true));
}
return Status::OK();
}

Status ScrollParser::_append_array_val_from_source(const rapidjson::Value& val, const TypeDescriptor& child_type_desc,
Column* column) {
if (val.IsNull()) {
// Ignore null items in _source.
return Status::OK();
}

if (!val.IsArray()) {
// A single item, such as "1", should be treated as "[1]".
RETURN_IF_ERROR(_append_value_from_json_val(column, child_type_desc, val, false));
return Status::OK();
}

for (auto& item : val.GetArray()) {
RETURN_IF_ERROR(_append_array_val_from_source(item, child_type_desc, column));
}
return Status::OK();
}

// TODO: test here
template <PrimitiveType type, typename T>
Status ScrollParser::_append_date_val(const rapidjson::Value& col, Column* column, bool pure_doc_value) {
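The `_source` path in `_append_array_val_from_source` above is recursive because a stored document may nest arrays arbitrarily while the destination column is one-dimensional. A minimal standalone sketch of the same flattening idea (hypothetical helper name, integers only, not the parser's actual signatures):

    #include <vector>
    #include <rapidjson/document.h>

    // Mirrors the _source handling: nulls are skipped, a bare scalar counts as a
    // single-element array, and nested arrays are flattened depth-first.
    static void flatten_ints(const rapidjson::Value& val, std::vector<int>* out) {
        if (val.IsNull()) {
            return; // ignore null items
        }
        if (!val.IsArray()) {
            if (val.IsInt()) {
                out->push_back(val.GetInt()); // "1" is treated as "[1]"
            }
            return;
        }
        for (const auto& item : val.GetArray()) {
            flatten_ints(item, out); // recurse into nested arrays
        }
    }

    int main() {
        rapidjson::Document doc;
        doc.Parse("[1, [null, 3]]");
        std::vector<int> out;
        flatten_ints(doc, &out); // out == {1, 3}, matching the doc_values form
        return 0;
    }
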
33 changes: 25 additions & 8 deletions be/src/exec/es/es_scroll_parser.h
@@ -34,38 +34,55 @@ class ScrollParser {
void set_params(const TupleDescriptor* descs, const std::map<std::string, std::string>* docvalue_context);

private:
static bool _pure_doc_value(const rapidjson::Value& obj);
static bool _is_pure_doc_value(const rapidjson::Value& obj);

template <PrimitiveType type, class CppType = RunTimeCppType<type>>
static void _append_data(Column* column, CppType& value);

static void _append_null(Column* column);

Status _append_value_from_json_val(Column* column, PrimitiveType type, const rapidjson::Value& col,
Status _append_value_from_json_val(Column* column, const TypeDescriptor& type_desc, const rapidjson::Value& col,
bool pure_doc_value);

Slice _json_val_to_slice(const rapidjson::Value& val);

template <PrimitiveType type, typename T = RunTimeCppType<type>>
Status _append_int_val(const rapidjson::Value& col, Column* column, bool pure_doc_value);
static Status _append_int_val(const rapidjson::Value& col, Column* column, bool pure_doc_value);

template <PrimitiveType type, typename T = RunTimeCppType<type>>
Status _append_float_val(const rapidjson::Value& col, Column* column, bool pure_doc_value);
static Status _append_float_val(const rapidjson::Value& col, Column* column, bool pure_doc_value);

Status _append_bool_val(const rapidjson::Value& col, Column* column, bool pure_doc_value);
static Status _append_bool_val(const rapidjson::Value& col, Column* column, bool pure_doc_value);

template <PrimitiveType type, typename T = RunTimeCppType<type>>
Status _append_date_val(const rapidjson::Value& col, Column* column, bool pure_doc_value);
static Status _append_date_val(const rapidjson::Value& col, Column* column, bool pure_doc_value);

// The representation of an array value in _source and in doc_values (column storage) is inconsistent
// in Elasticsearch, so we need different processing for each to present consistent behavior.
// For example:
// original array: [1] -> _source: "1" -> doc_values: "[1]"
// original array: [1, 2] -> _source: "[1, 2]" -> doc_values: "[1, 2]"
// original array: [1, [2, 3]] -> _source: "[1, [2, 3]]" -> doc_values: "[1, 2, 3]"
// original array: [1, [null, 3]] -> _source: "[1, [null, 3]]" -> doc_values: "[1, 3]"
Status _append_array_val(const rapidjson::Value& col, const TypeDescriptor& type_desc, Column* column,
bool pure_doc_value);

Status _append_array_val_from_docvalue(const rapidjson::Value& val, const TypeDescriptor& child_type_desc,
Column* column);

// This is a recursive function.
Status _append_array_val_from_source(const rapidjson::Value& val, const TypeDescriptor& child_type_desc,
Column* column);

private:
const TupleDescriptor* _tuple_desc;
const std::map<std::string, std::string>* _docvalue_context;
const std::map<std::string, std::string>* _doc_value_context;

std::string _scroll_id;
size_t _size;
rapidjson::SizeType _cur_line;
rapidjson::Document _document_node;
rapidjson::Value _inner_hits_node;
// TODO: This value is assigned but never used.
bool _doc_value_mode;

rapidjson::StringBuffer _scratch_buffer;
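The comment block in es_scroll_parser.h above is the crux of the change: Elasticsearch returns the same array field in two shapes depending on whether the scan reads _source or doc_values. A small sketch with rapidjson, using hypothetical hit bodies, of the two shapes the parser has to normalize:

    #include <cassert>
    #include <rapidjson/document.h>

    int main() {
        // Hypothetical hits for a field "tags" that was indexed as [1, [2, 3]].
        const char* source_form   = R"({"_source": {"tags": [1, [2, 3]]}})"; // as written
        const char* docvalue_form = R"({"fields": {"tags": [1, 2, 3]}})";    // flattened by ES

        rapidjson::Document src, dv;
        src.Parse(source_form);
        dv.Parse(docvalue_form);

        // doc_values always arrive as a flat one-dimensional array ...
        assert(dv["fields"]["tags"].IsArray() && dv["fields"]["tags"].Size() == 3);

        // ... while _source keeps the nested structure, which is why
        // _append_array_val_from_source has to recurse.
        assert(src["_source"]["tags"][1].IsArray());
        return 0;
    }
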
5 changes: 3 additions & 2 deletions be/test/CMakeLists.txt
@@ -27,7 +27,6 @@ set(EXEC_FILES
./fs/fs_s3_test.cpp
./fs/fs_test.cpp
./fs/output_stream_wrapper_test.cpp
./exec/es_query_builder_test.cpp
./exec/column_value_range_test.cpp
./exec/vectorized/agg_hash_map_test.cpp
./exec/vectorized/csv_scanner_test.cpp
@@ -45,7 +44,9 @@ set(EXEC_FILES
./exec/vectorized/arrow_converter_test.cpp
./exec/vectorized/repeat_node_test.cpp
./exec/vectorized/analytor_test.cpp
./exec/es_scan_reader_test.cpp
./exec/es/es_query_builder_test.cpp
./exec/es/es_scan_reader_test.cpp
./exec/es/es_scroll_parser_test.cpp
./exec/vectorized/hdfs_scan_node_test.cpp
./exec/pipeline/pipeline_test_base.cpp
./exec/pipeline/pipeline_control_flow_test.cpp
File renamed without changes.
File renamed without changes.