Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](Paimon) support deletion vector for Paimon naive reader (#34743) #35241

Merged
merged 1 commit into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/io/fs/hdfs_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ uint64 HdfsFileSystemCache::_hdfs_hash_code(const THdfsParams& hdfs_params,
} else if (hdfs_params.__isset.fs_name) {
hash_code ^= Fingerprint(hdfs_params.fs_name);
}

if (hdfs_params.__isset.user) {
hash_code ^= Fingerprint(hdfs_params.user);
}
Expand Down
81 changes: 81 additions & 0 deletions be/src/util/deletion_vector.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <stdexcept>

#include "common/status.h"
#include "roaring/roaring.hh"

namespace doris {
class DeletionVector {
public:
const static uint32_t MAGIC_NUMBER = 1581511376;
DeletionVector(roaring::Roaring roaring_bitmap) : _roaring_bitmap(std::move(roaring_bitmap)) {};
~DeletionVector() = default;

bool checked_delete(uint32_t postition) { return _roaring_bitmap.addChecked(postition); }

bool is_delete(uint32_t postition) const { return _roaring_bitmap.contains(postition); }

bool is_empty() const { return _roaring_bitmap.isEmpty(); }

uint32_t maximum() const { return _roaring_bitmap.maximum(); }

uint32_t minimum() const { return _roaring_bitmap.minimum(); }

static Result<DeletionVector> deserialize(const char* buf, size_t length) {
uint32_t actual_length;
std::memcpy(reinterpret_cast<char*>(&actual_length), buf, 4);
// change byte order to big endian
std::reverse(reinterpret_cast<char*>(&actual_length),
reinterpret_cast<char*>(&actual_length) + 4);
buf += 4;
if (actual_length != length - 4) {
return ResultError(
Status::RuntimeError("DeletionVector deserialize error: length not match, "
"actual length: {}, expect length: {}",
actual_length, length - 4));
}
uint32_t magic_number;
std::memcpy(reinterpret_cast<char*>(&magic_number), buf, 4);
// change byte order to big endian
std::reverse(reinterpret_cast<char*>(&magic_number),
reinterpret_cast<char*>(&magic_number) + 4);
buf += 4;
if (magic_number != MAGIC_NUMBER) {
return ResultError(Status::RuntimeError(
"DeletionVector deserialize error: invalid magic number {}", magic_number));
}
roaring::Roaring roaring_bitmap;
try {
roaring_bitmap = roaring::Roaring::readSafe(buf, length);
} catch (std::runtime_error) {
return ResultError(Status::RuntimeError(
"DeletionVector deserialize error: failed to deserialize roaring bitmap"));
}
return DeletionVector(roaring_bitmap);
}

private:
roaring::Roaring _roaring_bitmap;
};
} // namespace doris
20 changes: 15 additions & 5 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ void OrcReader::_collect_profile_before_close() {
COUNTER_UPDATE(_orc_profile.set_fill_column_time, _statistics.set_fill_column_time);
COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time);
COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time);
COUNTER_UPDATE(_orc_profile.filter_block_time, _statistics.filter_block_time);

if (_file_input_stream != nullptr) {
_file_input_stream->collect_profile_before_close();
Expand Down Expand Up @@ -224,6 +225,8 @@ void OrcReader::_init_profile() {
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeValueTime", orc_profile, 1);
_orc_profile.decode_null_map_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeNullMapTime", orc_profile, 1);
_orc_profile.filter_block_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "FilterBlockTime", orc_profile, 1);
}
}

Expand Down Expand Up @@ -1605,8 +1608,11 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
return Status::OK();
}
_execute_filter_position_delete_rowids(*_filter);

RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter, *_filter));
{
SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, *_filter));
}
if (!_not_single_slot_filter_conjuncts.empty()) {
static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec));
RETURN_IF_CATCH_EXCEPTION(
Expand Down Expand Up @@ -1733,8 +1739,11 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
return Status::OK();
}
_execute_filter_position_delete_rowids(result_filter);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, result_filter));
{
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, result_filter));
}
if (!_not_single_slot_filter_conjuncts.empty()) {
static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec));
RETURN_IF_CATCH_EXCEPTION(
Expand All @@ -1748,15 +1757,16 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
} else {
if (_delete_rows_filter_ptr) {
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter,
(*_delete_rows_filter_ptr)));
} else {
std::unique_ptr<IColumn::Filter> filter(new IColumn::Filter(block->rows(), 1));
_execute_filter_position_delete_rowids(*filter);
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, (*filter)));
}

Block::erase_useless_column(block, column_to_keep);
static_cast<void>(_convert_dict_cols_to_string_cols(block, &batch_vec));
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class OrcReader : public GenericReader {
int64_t set_fill_column_time = 0;
int64_t decode_value_time = 0;
int64_t decode_null_map_time = 0;
int64_t filter_block_time = 0;
};

OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params,
Expand Down Expand Up @@ -222,6 +223,7 @@ class OrcReader : public GenericReader {
RuntimeProfile::Counter* set_fill_column_time = nullptr;
RuntimeProfile::Counter* decode_value_time = nullptr;
RuntimeProfile::Counter* decode_null_map_time = nullptr;
RuntimeProfile::Counter* filter_block_time = nullptr;
};

class ORCFilterImpl : public orc::ORCFilter {
Expand Down
11 changes: 0 additions & 11 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,6 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool*
return _shrink_block_if_need(block);
}

Status IcebergTableReader::set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
return _file_format_reader->set_fill_columns(partition_columns, missing_columns);
}

bool IcebergTableReader::fill_all_columns() const {
return _file_format_reader->fill_all_columns();
};

Status IcebergTableReader::get_columns(
std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) {
Expand Down
7 changes: 0 additions & 7 deletions be/src/vec/exec/format/table/iceberg_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,6 @@ class IcebergTableReader : public TableFormatReader {

Status get_next_block(Block* block, size_t* read_rows, bool* eof) final;

Status set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) final;

bool fill_all_columns() const final;

Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) final;

Expand Down
94 changes: 94 additions & 0 deletions be/src/vec/exec/format/table/paimon_jni_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "paimon_jni_reader.h"

#include <map>
#include <ostream>

#include "runtime/descriptors.h"
#include "runtime/types.h"
#include "vec/core/types.h"

namespace doris {
class RuntimeProfile;
class RuntimeState;

namespace vectorized {
class Block;
} // namespace vectorized
} // namespace doris

namespace doris::vectorized {

const std::string PaimonJniReader::PAIMON_OPTION_PREFIX = "paimon_option_prefix.";

PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state, RuntimeProfile* profile,
const TFileRangeDesc& range)
: _file_slot_descs(file_slot_descs), _state(state), _profile(profile) {
std::vector<std::string> column_names;
std::vector<std::string> column_types;
for (auto& desc : _file_slot_descs) {
column_names.emplace_back(desc->col_name());
column_types.emplace_back(JniConnector::get_jni_type(desc->type()));
}
std::map<String, String> params;
params["db_name"] = range.table_format_params.paimon_params.db_name;
params["table_name"] = range.table_format_params.paimon_params.table_name;
params["paimon_split"] = range.table_format_params.paimon_params.paimon_split;
params["paimon_column_names"] = range.table_format_params.paimon_params.paimon_column_names;
params["paimon_predicate"] = range.table_format_params.paimon_params.paimon_predicate;
params["ctl_id"] = std::to_string(range.table_format_params.paimon_params.ctl_id);
params["db_id"] = std::to_string(range.table_format_params.paimon_params.db_id);
params["tbl_id"] = std::to_string(range.table_format_params.paimon_params.tbl_id);
params["last_update_time"] =
std::to_string(range.table_format_params.paimon_params.last_update_time);
params["required_fields"] = join(column_names, ",");
params["columns_types"] = join(column_types, "#");

// Used to create paimon option
for (auto& kv : range.table_format_params.paimon_params.paimon_options) {
params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
}
_jni_connector = std::make_unique<JniConnector>("org/apache/doris/paimon/PaimonJniScanner",
params, column_names);
}

Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
RETURN_IF_ERROR(_jni_connector->get_next_block(block, read_rows, eof));
if (*eof) {
RETURN_IF_ERROR(_jni_connector->close());
}
return Status::OK();
}

Status PaimonJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) {
for (auto& desc : _file_slot_descs) {
name_to_type->emplace(desc->col_name(), desc->type());
}
return Status::OK();
}

Status PaimonJniReader::init_reader(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
return _jni_connector->open(_state, _profile);
}
} // namespace doris::vectorized
77 changes: 77 additions & 0 deletions be/src/vec/exec/format/table/paimon_jni_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "common/status.h"
#include "exec/olap_common.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/format/table/table_format_reader.h"
#include "vec/exec/jni_connector.h"

namespace doris {
class RuntimeProfile;
class RuntimeState;
class SlotDescriptor;
namespace vectorized {
class Block;
} // namespace vectorized
struct TypeDescriptor;
} // namespace doris

namespace doris::vectorized {

/**
* The demo usage of JniReader, showing how to read data from java scanner.
* The java side is also a mock reader that provide values for each type.
* This class will only be retained during the functional testing phase to verify that
* the communication and data exchange with the jvm are correct.
*/
class PaimonJniReader : public GenericReader {
ENABLE_FACTORY_CREATOR(PaimonJniReader);

public:
static const std::string PAIMON_OPTION_PREFIX;
PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
RuntimeProfile* profile, const TFileRangeDesc& range);

~PaimonJniReader() override = default;

Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;

Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;

Status init_reader(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);

private:
const std::vector<SlotDescriptor*>& _file_slot_descs;
RuntimeState* _state = nullptr;
RuntimeProfile* _profile = nullptr;
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
std::unique_ptr<JniConnector> _jni_connector;
};

} // namespace doris::vectorized
Loading
Loading