Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor and encapsulate for pre-decode #201

Merged
merged 7 commits into from
Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 51 additions & 64 deletions dbms/src/Storages/Transaction/RegionBlockReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionBlockReader.h>
#include <Storages/Transaction/TiDB.h>
#include <sparsehash/dense_hash_map>
#include <sparsehash/dense_hash_set>
#include <Storages/Transaction/RegionBlockReaderHelper.hpp>

namespace DB
{
Expand Down Expand Up @@ -127,13 +126,12 @@ void setPKVersionDel(ColumnUInt8 & delmark_col,
}
}

using ColumnIdToInfoIndexMap = google::dense_hash_map<ColumnID, UInt16>;
using ColumnIdToInfoIndexMap = google::dense_hash_map<ColumnID, size_t>;
using SchemaAllColumnIds = google::dense_hash_set<ColumnID>;

/// DecodeRowSkip function will try to jump over unnecessary field.
bool DecodeRowSkip(const TiKVValue & value, const ColumnIdToInfoIndexMap & column_id_to_info_index,
const SchemaAllColumnIds & schema_all_column_ids, DecodedRow & additional_decoded_row,
std::vector<DecodedRow::const_iterator> & decoded_col_iter, const bool force_decode)
const SchemaAllColumnIds & schema_all_column_ids, DecodedRecordData & decoded_data, const bool force_decode)
{
const String & raw_value = value.getStr();
size_t cursor = 0;
Expand Down Expand Up @@ -162,8 +160,7 @@ bool DecodeRowSkip(const TiKVValue & value, const ColumnIdToInfoIndexMap & colum
}
else
{
additional_decoded_row.emplace_back(col_id, DecodeDatum(cursor, raw_value));
decoded_col_iter.emplace_back(additional_decoded_row.cend() - 1);
decoded_data.emplace_back(col_id, DecodeDatum(cursor, raw_value));
}
}

Expand All @@ -179,8 +176,7 @@ bool DecodeRowSkip(const TiKVValue & value, const ColumnIdToInfoIndexMap & colum

/// DecodeRow function will try to get pre-decoded fields from value, if is none, just decode its str.
bool DecodeRow(const TiKVValue & value, const ColumnIdToInfoIndexMap & column_id_to_info_index,
const SchemaAllColumnIds & schema_all_column_ids, DecodedRow & additional_decoded_row,
std::vector<DecodedRow::const_iterator> & decoded_col_iter, const bool force_decode)
const SchemaAllColumnIds & schema_all_column_ids, DecodedRecordData & decoded_data, const bool force_decode)
{
auto & decoded_row_info = value.extraInfo();
const DecodedRow * id_fields_ptr = decoded_row_info.load();
Expand All @@ -190,7 +186,7 @@ bool DecodeRow(const TiKVValue & value, const ColumnIdToInfoIndexMap & column_id

const DecodedRow & id_fields = *id_fields_ptr;

for (auto it = id_fields.begin(); it != id_fields.end(); ++it)
for (auto it = id_fields.cbegin(); it != id_fields.cend(); ++it)
{
const auto & ele = *it;
const auto & col_id = ele.col_id;
Expand All @@ -204,7 +200,7 @@ bool DecodeRow(const TiKVValue & value, const ColumnIdToInfoIndexMap & column_id

if (column_id_to_info_index.count(col_id))
{
decoded_col_iter.emplace_back(it);
decoded_data.push_back(it);
}
}

Expand All @@ -215,7 +211,7 @@ bool DecodeRow(const TiKVValue & value, const ColumnIdToInfoIndexMap & column_id
}
else
{
return DecodeRowSkip(value, column_id_to_info_index, schema_all_column_ids, additional_decoded_row, decoded_col_iter, force_decode);
return DecodeRowSkip(value, column_id_to_info_index, schema_all_column_ids, decoded_data, force_decode);
}
}

Expand All @@ -232,21 +228,19 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,

ColumnID handle_col_id = InvalidColumnID;

constexpr size_t MustHaveColCnt = 3; // pk, del, version
constexpr ColumnID EmptyColumnID = InvalidColumnID - 1;

using ColTypePair = std::pair<MutableColumnPtr, NameAndTypePair>;
google::dense_hash_map<ColumnID, std::shared_ptr<ColTypePair>> column_map;
column_map.set_empty_key(EmptyColumnID);
// column_map contains columns in column_names_to_read exclude del and version.
ColumnDataInfoMap column_map(column_names_to_read.size() - MustHaveColCnt + 1, EmptyColumnID);

// column_id_to_info_index contains columns in column_names_to_read exclude pk, del and version
ColumnIdToInfoIndexMap column_id_to_info_index;
column_id_to_info_index.set_empty_key(EmptyColumnID);

SchemaAllColumnIds schema_all_column_ids;
schema_all_column_ids.set_empty_key(EmptyColumnID);

if (table_info.columns.size() > std::numeric_limits<ColumnIdToInfoIndexMap::mapped_type>::max())
throw Exception("Too many columns in schema", ErrorCodes::LOGICAL_ERROR);

for (size_t i = 0; i < table_info.columns.size(); i++)
{
auto & column_info = table_info.columns[i];
Expand All @@ -257,26 +251,30 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,
{
continue;
}
auto ch_col = columns.getPhysical(col_name);
column_map.insert(std::make_pair(col_id, std::make_shared<ColTypePair>(ch_col.type->createColumn(), ch_col)));
column_map[col_id]->first->reserve(data_list.size());

{
auto ch_col = columns.getPhysical(col_name);
auto mut_col = ch_col.type->createColumn();
column_map.insert(col_id, std::move(mut_col), std::move(ch_col), i, data_list.size());
}

if (table_info.pk_is_handle && column_info.hasPriKeyFlag())
handle_col_id = col_id;
else
column_id_to_info_index.insert(std::make_pair(col_id, i));
}

if (column_names_to_read.size() - 3 != column_id_to_info_index.size())
if (column_names_to_read.size() - MustHaveColCnt != column_id_to_info_index.size())
throw Exception("schema doesn't contain needed columns.", ErrorCodes::LOGICAL_ERROR);

if (!table_info.pk_is_handle)
{
auto ch_col = columns.getPhysical(MutableSupport::tidb_pk_column_name);
column_map.insert(std::make_pair(handle_col_id, std::make_shared<ColTypePair>(ch_col.type->createColumn(), ch_col)));
column_map[handle_col_id]->first->reserve(data_list.size());
auto mut_col = ch_col.type->createColumn();
column_map.insert(handle_col_id, std::move(mut_col), std::move(ch_col), -1, data_list.size());
}

const TMTPKType pk_type = getTMTPKType(*column_map[handle_col_id]->second.type);
const TMTPKType pk_type = getTMTPKType(*column_map.getNameAndTypePair(handle_col_id).type);

if (pk_type == TMTPKType::UINT64)
ReorderRegionDataReadList(data_list);
Expand All @@ -296,26 +294,19 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,
break;
}

func(*delmark_col, *version_col, column_map[handle_col_id]->first, data_list, start_ts);
func(*delmark_col, *version_col, column_map.getMutableColumnPtr(handle_col_id), data_list, start_ts);
}

const size_t target_col_size = column_names_to_read.size() - 3;

Block block;
const size_t target_col_size = column_names_to_read.size() - MustHaveColCnt;

// optimize for only need handle, tso, delmark.
if (column_names_to_read.size() > 3)
if (column_names_to_read.size() > MustHaveColCnt)
{
google::dense_hash_set<ColumnID> decoded_col_ids_set;
decoded_col_ids_set.set_empty_key(EmptyColumnID);
DecodedRecordData decoded_data(column_id_to_info_index.size());

// TODO: optimize columns' insertion, use better implementation rather than Field, it's terrible.
DecodedRow additional_decoded_row;
std::vector<DecodedRow::const_iterator> decoded_col_iter;

/// Notice: iterator of std::vector will invalid after the capacity changed, so !!! must set the capacity of
/// additional_decoded_row big enough
additional_decoded_row.reserve(table_info.columns.size());

for (const auto & [handle, write_type, commit_ts, value_ptr] : data_list)
{
Expand All @@ -325,40 +316,36 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,
if (commit_ts > start_ts)
continue;

decoded_col_iter.clear();
additional_decoded_row.clear();
decoded_data.clear();

if (write_type == Region::DelFlag)
{
for (const auto & item : column_id_to_info_index)
{
const auto & column = table_info.columns[item.second];

additional_decoded_row.emplace_back(column.id, GenDecodeRow(column));
decoded_col_iter.emplace_back(additional_decoded_row.cend() - 1);
decoded_data.emplace_back(column.id, GenDecodeRow(column));
}
}
else
{
bool schema_matches = DecodeRow(
*value_ptr, column_id_to_info_index, schema_all_column_ids, additional_decoded_row, decoded_col_iter, force_decode);
bool schema_matches = DecodeRow(*value_ptr, column_id_to_info_index, schema_all_column_ids, decoded_data, force_decode);
if (!schema_matches && !force_decode)
return std::make_tuple(block, false);
return std::make_tuple(Block(), false);
}

/// Modify `row` by adding missing column values or removing useless column values.
if (unlikely(decoded_col_iter.size() > column_id_to_info_index.size()))
if (unlikely(decoded_data.size() > column_id_to_info_index.size()))
{
throw Exception("read unexpected columns.", ErrorCodes::LOGICAL_ERROR);
}

// redundant column values (column id not in current schema) has been dropped when decoding row
// this branch handles the case when the row doesn't contain all the needed column
if (decoded_col_iter.size() < column_id_to_info_index.size())
if (decoded_data.size() < column_id_to_info_index.size())
{
decoded_col_ids_set.clear_no_resize();
for (const auto & e : decoded_col_iter)
decoded_col_ids_set.insert(e->col_id);
for (size_t i = 0; i < decoded_data.size(); ++i)
decoded_col_ids_set.insert(decoded_data[i].col_id);

for (const auto & item : column_id_to_info_index)
{
Expand All @@ -367,26 +354,23 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,

const auto & column = table_info.columns[item.second];

additional_decoded_row.emplace_back(column.id,
decoded_data.emplace_back(column.id,
column.hasNoDefaultValueFlag() ? (column.hasNotNullFlag() ? GenDecodeRow(column) : Field())
: column.defaultValueToField());
decoded_col_iter.emplace_back(additional_decoded_row.cend() - 1);
}
}

if (decoded_col_iter.size() != target_col_size)
if (decoded_data.size() != target_col_size)
throw Exception("decode row error.", ErrorCodes::LOGICAL_ERROR);

/// Transform `row` to columnar format.
for (const auto & iter : decoded_col_iter)
for (size_t data_idx = 0; data_idx < decoded_data.size(); ++data_idx)
{
const ColumnID & col_id = iter->col_id;
const Field & field = iter->field;
const ColumnInfo & column_info = table_info.columns[column_id_to_info_index[col_id]];
const ColumnID & col_id = decoded_data[data_idx].col_id;
const Field & field = decoded_data[data_idx].field;

auto it = column_map.find(col_id);
if (it == column_map.end())
throw Exception("col_id not found in column_map", ErrorCodes::LOGICAL_ERROR);
auto & col_info = column_map[col_id];
const ColumnInfo & column_info = table_info.columns[ColumnDataInfoMap::getIndex(col_info)];

DatumFlat datum(field, column_info.tp);
const Field & unflattened = datum.field();
Expand All @@ -397,21 +381,23 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,
// Otherwise return false to outer, outer should sync schema and try again.
if (force_decode)
{
const auto & data_type = it->second->second.type;
const auto & data_type = ColumnDataInfoMap::getNameAndTypePair(col_info).type;
throw Exception("Detected overflow when decoding data " + std::to_string(unflattened.get<UInt64>()) + " of column "
+ column_info.name + " with type " + data_type->getName(),
ErrorCodes::LOGICAL_ERROR);
}

return std::make_tuple(block, false);
return std::make_tuple(Block(), false);
}
auto & mut_col = it->second->first;
auto & mut_col = ColumnDataInfoMap::getMutableColumnPtr(col_info);
mut_col->insert(unflattened);
}
}
}

decoded_data.checkValid();
}

Block block;
for (const auto & name : column_names_to_read)
{
if (name == MutableSupport::delmark_column_name)
Expand All @@ -424,11 +410,12 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,
}
else
{
Int64 col_id = table_info.getColumnID(name);
block.insert({std::move(column_map[col_id]->first), column_map[col_id]->second.type, name});
ColumnID col_id = table_info.getColumnID(name);
block.insert({std::move(column_map.getMutableColumnPtr(col_id)), column_map.getNameAndTypePair(col_id).type, name});
}
}

column_map.checkValid();
return std::make_tuple(std::move(block), true);
}

Expand Down
94 changes: 94 additions & 0 deletions dbms/src/Storages/Transaction/RegionBlockReaderHelper.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#include <sparsehash/dense_hash_map>
#include <sparsehash/dense_hash_set>

namespace DB
{

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}

struct ColumnDataInfoMap
{
using ColTypeInfo = std::tuple<MutableColumnPtr, NameAndTypePair, size_t>;
using ColTypeInfoData = std::vector<ColTypeInfo>;

ColumnDataInfoMap(const size_t cap, const ColumnID empty_id)
{
column_data.reserve(cap);
column_map.set_empty_key(empty_id);
ori_cap = column_data.capacity();
}

/// Notice: iterator of std::vector will invalid after the capacity changed, so !!! must set the capacity big enough
void checkValid() const
{
if (ori_cap != column_data.capacity())
throw Exception("ColumnDataInfoMap capacity changes", ErrorCodes::LOGICAL_ERROR);
}

void insert(const ColumnID col_id, MutableColumnPtr && ptr, NameAndTypePair && name_pair, size_t index, const size_t cap)
{
column_data.emplace_back(std::move(ptr), std::move(name_pair), index);
column_map.insert(std::make_pair(col_id, column_data.end() - 1));
getMutableColumnPtr(col_id)->reserve(cap);
}

MutableColumnPtr & getMutableColumnPtr(const ColumnID col_id) { return getMutableColumnPtr((*this)[col_id]); }
static MutableColumnPtr & getMutableColumnPtr(ColTypeInfo & info) { return std::get<0>(info); }

NameAndTypePair & getNameAndTypePair(const ColumnID col_id) { return getNameAndTypePair((*this)[col_id]); }
static NameAndTypePair & getNameAndTypePair(ColTypeInfo & info) { return std::get<1>(info); }

static size_t getIndex(const ColTypeInfo & info) { return std::get<2>(info); }

ColTypeInfo & operator[](const ColumnID col_id) { return *column_map[col_id]; }

private:
ColTypeInfoData column_data;
google::dense_hash_map<ColumnID, ColTypeInfoData::iterator> column_map;
size_t ori_cap;
};

struct DecodedRecordData
{
DecodedRecordData(const size_t cap)
{
additional_decoded_row.reserve(cap);
ori_cap = additional_decoded_row.capacity();
}

/// just like ColumnDataInfoMap::checkValid
void checkValid() const
{
if (ori_cap != additional_decoded_row.capacity())
throw Exception("DecodedRecordData capacity changes", ErrorCodes::LOGICAL_ERROR);
}

size_t size() const { return decoded_col_iter.size(); }

void clear()
{
additional_decoded_row.clear();
decoded_col_iter.clear();
}

const DecodedRow::value_type & operator[](const size_t index) const { return *decoded_col_iter[index]; }

template <class... _Args>
void emplace_back(_Args &&... __args)
{
additional_decoded_row.emplace_back(std::forward<_Args>(__args)...);
decoded_col_iter.emplace_back(additional_decoded_row.cend() - 1);
}

void push_back(const DecodedRow::const_iterator & iter) { decoded_col_iter.push_back(iter); }

private:
DecodedRow additional_decoded_row;
std::vector<DecodedRow::const_iterator> decoded_col_iter;
size_t ori_cap;
};

} // namespace DB
3 changes: 1 addition & 2 deletions dbms/src/Storages/Transaction/RegionCFDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ TableID RegionCFDataBase<Trait>::insert(const TableID table_id, std::pair<Key, V
{
auto & map = data[table_id];
auto [it, ok] = map.emplace(std::move(kv_pair));
std::ignore = it;
if (!ok)
throw Exception("Found existing key in hex: " + getTiKVKey(kv_pair.second).toHex(), ErrorCodes::LOGICAL_ERROR);
throw Exception("Found existing key in hex: " + getTiKVKey(it->second).toHex(), ErrorCodes::LOGICAL_ERROR);

if constexpr (std::is_same_v<Trait, RegionWriteCFDataTrait>)
extra.add(Trait::getRowRawValuePtr(it->second));
Expand Down
Loading