Skip to content

[FLASH-774] Refactor process about pre-decode/block-read to support fast codec #375

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Jan 8, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions dbms/src/Storages/Transaction/AtomicDecodedRow.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include <Storages/Transaction/AtomicDecodedRow.h>
#include <Storages/Transaction/DecodedRow.h>

namespace DB
{

template <>
AtomicDecodedRow<false>::~AtomicDecodedRow()
{
auto ptr = decoded.load();
if (ptr)
{
auto decoded_ptr = reinterpret_cast<DecodedRow *>(ptr);
delete decoded_ptr;
decoded = nullptr;
}
}

template <>
const DecodedRow * AtomicDecodedRow<false>::load() const
{
return reinterpret_cast<DecodedRow *>(decoded.load());
}

template <>
void AtomicDecodedRow<false>::atomicUpdate(DB::DecodedRow *& data) const
{
void * expected = nullptr;
if (!decoded.compare_exchange_strong(expected, (void *)data))
delete data;
data = nullptr;
}

} // namespace DB
37 changes: 37 additions & 0 deletions dbms/src/Storages/Transaction/AtomicDecodedRow.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once

#include <atomic>

#include <Storages/Transaction/Types.h>

namespace DB
{
class Field;
struct DecodedField;
struct DecodedRow;

template <bool is_key = false>
struct AtomicDecodedRow
{
~AtomicDecodedRow();

const DecodedRow * load() const;

void atomicUpdate(DecodedRow *& data) const;

AtomicDecodedRow() = default;

private:
AtomicDecodedRow(const AtomicDecodedRow &) = delete;
AtomicDecodedRow(AtomicDecodedRow &&) = delete;

private:
mutable std::atomic<void *> decoded{nullptr};
};

template <>
struct AtomicDecodedRow<true>
{
};

} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,40 @@ namespace DB
struct RegionDefaultCFDataTrait;
struct RegionWriteCFDataTrait;

using ExtraCFDataQueue = std::deque<std::shared_ptr<const TiKVValue>>;
using CFDataPreDecodeQueue = std::deque<std::shared_ptr<const TiKVValue>>;

template <typename Trait>
struct ExtraCFData
struct CFDataPreDecode
{
};

template <>
struct ExtraCFData<RegionDefaultCFDataTrait>
struct CFDataPreDecode<RegionDefaultCFDataTrait>
{
ExtraCFData() = default;
CFDataPreDecode() = default;

void add(const std::shared_ptr<const TiKVValue> & e) { queue.push_back(e); }

std::optional<ExtraCFDataQueue> popAll()
std::optional<CFDataPreDecodeQueue> popAll()
{
if (queue.empty())
return {};

ExtraCFDataQueue res;
CFDataPreDecodeQueue res;
queue.swap(res);
return res;
}

ExtraCFData(const ExtraCFData & src) = delete;
CFDataPreDecode(const CFDataPreDecode & src) = delete;

private:
ExtraCFDataQueue queue;
CFDataPreDecodeQueue queue;
};

template <>
struct ExtraCFData<RegionWriteCFDataTrait> : ExtraCFData<RegionDefaultCFDataTrait>
struct CFDataPreDecode<RegionWriteCFDataTrait> : CFDataPreDecode<RegionDefaultCFDataTrait>
{
using Base = ExtraCFData<RegionDefaultCFDataTrait>;
using Base = CFDataPreDecode<RegionDefaultCFDataTrait>;
void add(const std::shared_ptr<const TiKVValue> & e)
{
if (!e)
Expand Down
73 changes: 73 additions & 0 deletions dbms/src/Storages/Transaction/DecodedRow.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#pragma once

#include <Storages/Transaction/Codec.h>

namespace DB
{
class Field;
struct DecodedField;
using DecodedFields = std::vector<DecodedField>;

struct DecodedField : boost::noncopyable
{
Int64 col_id;
Field field;

DecodedField & operator=(DecodedField && e)
{
if (this == &e)
return *this;
col_id = e.col_id;
field = std::move(e.field);
return *this;
}
DecodedField(DecodedField && e) : col_id(e.col_id), field(std::move(e.field)) {}
DecodedField(const Int64 col_id_, Field && field_) : col_id(col_id_), field(std::move(field_)) {}

bool operator<(const DecodedField & e) const { return col_id < e.col_id; }
};

/// force decode TiKV value into row by a specific schema, if there is data can't be decoded, store it in unknown_fields.
struct DecodedRow : boost::noncopyable
{
// In old way, tidb encode each record like: (codec-flag, column-data), (codec-flag, column-data), ...
// we can use codec-flag to tell type of column. But, in new way, https://github.com/pingcap/tidb/pull/7597,
// there is codec-flag, and we should find type in schema by column id.
struct UnknownFields
{
// should be sorted by column id.
const DecodedFields fields;
// if there is no codec-flag (in tidb fast codec), field are all string and with_codec_flag is false.
const bool with_codec_flag;
};

DecodedRow(bool has_missing_columns_, DecodedFields && unknown_, bool has_codec_flag, DecodedFields && decoded_fields_)
: has_missing_columns(has_missing_columns_),
unknown_fields{std::move(unknown_), has_codec_flag},
decoded_fields(std::move(decoded_fields_))
{
if (!isSortedByColumnID(decoded_fields) || !isSortedByColumnID(unknown_fields.fields))
throw Exception(std::string(__PRETTY_FUNCTION__) + ": should be sorted by column id", ErrorCodes::LOGICAL_ERROR);
}

private:
static bool isSortedByColumnID(const DecodedFields & decoded_fields)
{
for (size_t i = 1; i < decoded_fields.size(); ++i)
{
if (decoded_fields[i - 1].col_id >= decoded_fields[i].col_id)
return false;
}
return true;
}

public:
// if decoded row doesn't contain column in schema.
const bool has_missing_columns;
// decoded column not in schema
const UnknownFields unknown_fields;
// decoded column in schema and default/null column. should be sorted by column id.
const DecodedFields decoded_fields;
};

} // namespace DB
53 changes: 26 additions & 27 deletions dbms/src/Storages/Transaction/PredecodeTiKVValue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,35 @@ void Region::tryPreDecodeTiKVValue(Context & context)
if (!storage)
return;

std::optional<ExtraCFDataQueue> default_val, write_val;
std::optional<CFDataPreDecodeQueue> default_val, write_val;
{
std::lock_guard<std::mutex> predecode_lock(predecode_mutex);
default_val = data.defaultCF().getExtra().popAll();
write_val = data.writeCF().getExtra().popAll();
default_val = data.defaultCF().getCFDataPreDecode().popAll();
write_val = data.writeCF().getCFDataPreDecode().popAll();
}

DB::tryPreDecodeTiKVValue(std::move(default_val), *storage);
DB::tryPreDecodeTiKVValue(std::move(write_val), *storage);
}

bool ValueDecodeHelper::forceDecodeTiKVValue(DecodedRow & decoded_row, DecodedRow & unknown)
bool ValueDecodeHelper::forceDecodeTiKVValue(DecodedFields & decoded_fields, DecodedFields & unknown)
{
bool schema_match = decoded_row.size() == schema_all_column_ids.size();
bool has_dropped_column = false;
bool schema_match = decoded_fields.size() == schema_all_column_ids.size();
bool has_missing_columns = false;

for (auto it = decoded_row.cbegin(); schema_match && it != decoded_row.cend(); ++it)
for (auto it = decoded_fields.cbegin(); schema_match && it != decoded_fields.cend(); ++it)
{
if (!schema_all_column_ids.count(it->col_id))
schema_match = false;
}

if (schema_match)
return has_dropped_column;
return has_missing_columns;

{
DecodedRow tmp_row;
tmp_row.reserve(decoded_row.size());
for (auto && item : decoded_row)
DecodedFields tmp_row;
tmp_row.reserve(decoded_fields.size());
for (auto && item : decoded_fields)
{
if (schema_all_column_ids.count(item.col_id))
tmp_row.emplace_back(std::move(item));
Expand All @@ -53,32 +53,32 @@ bool ValueDecodeHelper::forceDecodeTiKVValue(DecodedRow & decoded_row, DecodedRo
// must be sorted, for binary search.
::std::sort(tmp_row.begin(), tmp_row.end());
::std::sort(unknown.begin(), unknown.end());
tmp_row.swap(decoded_row);
tmp_row.swap(decoded_fields);
}

for (const auto & column_index : schema_all_column_ids)
{
const auto & column = table_info.columns[column_index.second];
if (auto it = findByColumnID(column.id, decoded_row); it != decoded_row.end())
if (auto it = findByColumnID(column.id, decoded_fields); it != decoded_fields.end())
continue;

if (column.hasNoDefaultValueFlag() && column.hasNotNullFlag())
{
has_dropped_column = true;
has_missing_columns = true;
break;
}
}

return has_dropped_column;
return has_missing_columns;
}

void ValueDecodeHelper::forceDecodeTiKVValue(const TiKVValue & value)
{
auto & decoded_row_info = value.extraInfo();
if (decoded_row_info.load())
auto & decoded_fields_info = value.getDecodedRow();
if (decoded_fields_info.load())
return;

DecodedRow decoded_row, unknown;
DecodedFields decoded_fields, unknown;
// TODO: support fast codec of TiDB
{
size_t cursor = 0;
Expand All @@ -90,25 +90,24 @@ void ValueDecodeHelper::forceDecodeTiKVValue(const TiKVValue & value)
if (f.isNull())
break;
ColumnID col_id = f.get<ColumnID>();
decoded_row.emplace_back(col_id, DecodeDatum(cursor, raw_value));
decoded_fields.emplace_back(col_id, DecodeDatum(cursor, raw_value));
}

if (cursor != raw_value.size())
throw Exception(std::string(__PRETTY_FUNCTION__) + ": cursor is not end", ErrorCodes::LOGICAL_ERROR);

{
// must be sorted, for binary search.
::std::sort(decoded_row.begin(), decoded_row.end());
::std::sort(decoded_fields.begin(), decoded_fields.end());
}

auto has_dropped_column = forceDecodeTiKVValue(decoded_row, unknown);
DecodedRowBySchema * decoded_row_ptr
= new DecodedRowBySchema(table_info.schema_version, has_dropped_column, std::move(decoded_row), std::move(unknown), true);
decoded_row_info.atomicUpdate(decoded_row_ptr);
auto has_missing_columns = forceDecodeTiKVValue(decoded_fields, unknown);
DecodedRow * decoded_fields_ptr = new DecodedRow(has_missing_columns, std::move(unknown), true, std::move(decoded_fields));
decoded_fields_info.atomicUpdate(decoded_fields_ptr);
}
}

void tryPreDecodeTiKVValue(std::optional<ExtraCFDataQueue> && values, StorageMergeTree & storage)
void tryPreDecodeTiKVValue(std::optional<CFDataPreDecodeQueue> && values, StorageMergeTree & storage)
{
if (!values)
return;
Expand Down Expand Up @@ -137,9 +136,9 @@ void tryPreDecodeTiKVValue(std::optional<ExtraCFDataQueue> && values, StorageMer
helper.forceDecodeTiKVValue(*value);
}

DecodedRow::const_iterator findByColumnID(Int64 col_id, const DecodedRow & row)
DecodedFields::const_iterator findByColumnID(Int64 col_id, const DecodedFields & row)
{
const static auto cmp = [](const DecodedRowElement & e, const Int64 cid) -> bool { return e.col_id < cid; };
const static auto cmp = [](const DecodedField & e, const Int64 cid) -> bool { return e.col_id < cid; };
auto it = std::lower_bound(row.cbegin(), row.cend(), col_id, cmp);
if (it != row.cend() && it->col_id == col_id)
return it;
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/Transaction/PredecodeTiKVValue.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include <Storages/Transaction/TiKVDecodedValue.h>
#include <Storages/Transaction/DecodedRow.h>
#include <Storages/Transaction/TiKVKeyValue.h>

#include <sparsehash/dense_hash_map>
Expand Down Expand Up @@ -29,11 +29,11 @@ struct ValueDecodeHelper
void forceDecodeTiKVValue(const TiKVValue & value);

private:
bool forceDecodeTiKVValue(DecodedRow & decoded_row, DecodedRow & unknown);
bool forceDecodeTiKVValue(DecodedFields & decoded_fields, DecodedFields & unknown);
};

using ExtraCFDataQueue = std::deque<std::shared_ptr<const TiKVValue>>;
void tryPreDecodeTiKVValue(std::optional<ExtraCFDataQueue> && values, StorageMergeTree & storage);
DecodedRow::const_iterator findByColumnID(const Int64 col_id, const DecodedRow & row);
using CFDataPreDecodeQueue = std::deque<std::shared_ptr<const TiKVValue>>;
void tryPreDecodeTiKVValue(std::optional<CFDataPreDecodeQueue> && values, StorageMergeTree & storage);
DecodedFields::const_iterator findByColumnID(const Int64 col_id, const DecodedFields & row);

} // namespace DB
12 changes: 6 additions & 6 deletions dbms/src/Storages/Transaction/RegionBlockReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,19 +246,19 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,
else
{
const TiKVValue & value = *value_ptr;
const DecodedRowBySchema * row = value.extraInfo().load();
const DecodedRow * row = value.getDecodedRow().load();
if (!row)
{
helper.forceDecodeTiKVValue(value);
row = value.extraInfo().load();
row = value.getDecodedRow().load();
}

const DecodedRow & id_fields = row->row;
const DecodedRow & unknown_col = row->unknown_data.row;
const DecodedFields & id_fields = row->decoded_fields;
const DecodedFields & unknown_col = row->unknown_fields.fields;

if (!force_decode)
{
if (row->has_dropped_column || !unknown_col.empty())
if (row->has_missing_columns || !unknown_col.empty())
return std::make_tuple(Block(), false);
}

Expand All @@ -274,7 +274,7 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,

if (auto it = findByColumnID(item.first, unknown_col); it != unknown_col.end())
{
if (likely(row->unknown_data.known_type))
if (likely(row->unknown_fields.with_codec_flag))
decoded_data.push_back(it);
else
{
Expand Down
Loading