Skip to content

Commit

Permalink
[FLASH-774] Refactor process about pre-decode/block-read to support fast codec (#375)
Browse files Browse the repository at this point in the history

* refactor process about pre-decode/block-read to support fast codec

* bug fix: if a column can be null and does not exist, the flush process will sync the schema frequently
  • Loading branch information
solotzg authored Jan 8, 2020
1 parent a856ac9 commit 7894fe8
Show file tree
Hide file tree
Showing 18 changed files with 438 additions and 281 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Raft/RaftService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ RaftService::RaftService(DB::Context & db_context_)
region = it->second;
regions_to_decode.erase(it);
}
region->tryPreDecodeTiKVValue();
region->tryPreDecodeTiKVValue(db_context);
return true;
});

Expand Down
34 changes: 34 additions & 0 deletions dbms/src/Storages/Transaction/AtomicDecodedRow.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include <Storages/Transaction/AtomicDecodedRow.h>
#include <Storages/Transaction/DecodedRow.h>

namespace DB
{

/// Frees the decoded row held by this slot, if one was ever stored.
template <>
AtomicDecodedRow<false>::~AtomicDecodedRow()
{
    void * const raw = decoded.load();
    if (raw == nullptr)
        return;

    // The slot keeps the row type-erased as `void *`; restore the real type before deleting.
    delete static_cast<DecodedRow *>(raw);
    decoded = nullptr;
}

/// Returns the row currently stored in the slot, or nullptr when nothing has been stored.
template <>
const DecodedRow * AtomicDecodedRow<false>::load() const
{
    void * const raw = decoded.load();
    return static_cast<DecodedRow *>(raw);
}

/// Stores `data` in the slot if, and only if, the slot is still empty.
///
/// The call always consumes `data`: when the slot was empty the slot becomes the owner,
/// otherwise the row passed in is deleted. In both cases the caller's pointer is reset
/// to nullptr on return, so it must not be used (or deleted) afterwards.
template <>
void AtomicDecodedRow<false>::atomicUpdate(DB::DecodedRow *& data) const
{
    void * expected = nullptr;
    // The exchange succeeds only while the slot still holds nullptr; on failure the
    // already stored row is kept and the one offered here is discarded.
    if (!decoded.compare_exchange_strong(expected, static_cast<void *>(data)))
        delete data;
    data = nullptr;
}

} // namespace DB
37 changes: 37 additions & 0 deletions dbms/src/Storages/Transaction/AtomicDecodedRow.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once

#include <atomic>

#include <Storages/Transaction/Types.h>

namespace DB
{
class Field;
struct DecodedField;
struct DecodedRow;

/// Slot that holds at most one `DecodedRow`, stored type-erased behind an atomic pointer.
///
/// Only declarations live here; the member functions are defined per specialization.
/// The object is neither copyable nor movable.
template <bool is_key = false>
struct AtomicDecodedRow
{
    AtomicDecodedRow() = default;
    ~AtomicDecodedRow();

    // All four copy/move operations are deleted explicitly so the non-copyable,
    // non-movable contract is visible without reasoning about the atomic member.
    AtomicDecodedRow(const AtomicDecodedRow &) = delete;
    AtomicDecodedRow(AtomicDecodedRow &&) = delete;
    AtomicDecodedRow & operator=(const AtomicDecodedRow &) = delete;
    AtomicDecodedRow & operator=(AtomicDecodedRow &&) = delete;

    /// Returns the stored row, or nullptr if none has been stored.
    const DecodedRow * load() const;

    /// Offers `data` to the slot; `data` is passed by reference to a pointer so the
    /// implementation can reset the caller's pointer.
    void atomicUpdate(DecodedRow *& data) const;

private:
    // `mutable` because the const member function atomicUpdate() writes to it.
    mutable std::atomic<void *> decoded{nullptr};
};

/// Specialization for `is_key == true`: deliberately empty, it provides no storage and no
/// load()/atomicUpdate() members.
template <>
struct AtomicDecodedRow<true>
{
};

} // namespace DB
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <deque>

#include <Storages/Transaction/TiKVKeyValue.h>

namespace DB
Expand All @@ -8,40 +10,40 @@ namespace DB
struct RegionDefaultCFDataTrait;
struct RegionWriteCFDataTrait;

using ExtraCFDataQueue = std::deque<std::shared_ptr<const TiKVValue>>;
using CFDataPreDecodeQueue = std::deque<std::shared_ptr<const TiKVValue>>;

/// Primary template: empty for any trait that has no explicit specialization.
template <typename Trait>
struct ExtraCFData
struct CFDataPreDecode
{
};

/// Specialization for RegionDefaultCFDataTrait: a queue of shared, read-only TiKV values.
/// The struct itself does no locking; NOTE(review): callers presumably serialize access
/// externally — confirm against the call sites.
template <>
struct ExtraCFData<RegionDefaultCFDataTrait>
struct CFDataPreDecode<RegionDefaultCFDataTrait>
{
ExtraCFData() = default;
CFDataPreDecode() = default;

// Appends `e` to the back of the queue (no null check is performed here).
void add(const std::shared_ptr<const TiKVValue> & e) { queue.push_back(e); }

// Hands every queued value to the caller and leaves the internal queue empty.
// Returns an empty optional when nothing is queued.
std::optional<ExtraCFDataQueue> popAll()
std::optional<CFDataPreDecodeQueue> popAll()
{
if (queue.empty())
return {};

// Swapping with a fresh queue transfers the contents without copying the elements.
ExtraCFDataQueue res;
CFDataPreDecodeQueue res;
queue.swap(res);
return res;
}

// Not copyable.
ExtraCFData(const ExtraCFData & src) = delete;
CFDataPreDecode(const CFDataPreDecode & src) = delete;

private:
ExtraCFDataQueue queue;
CFDataPreDecodeQueue queue;
};

template <>
struct ExtraCFData<RegionWriteCFDataTrait> : ExtraCFData<RegionDefaultCFDataTrait>
struct CFDataPreDecode<RegionWriteCFDataTrait> : CFDataPreDecode<RegionDefaultCFDataTrait>
{
using Base = ExtraCFData<RegionDefaultCFDataTrait>;
using Base = CFDataPreDecode<RegionDefaultCFDataTrait>;
void add(const std::shared_ptr<const TiKVValue> & e)
{
if (!e)
Expand Down
73 changes: 73 additions & 0 deletions dbms/src/Storages/Transaction/DecodedRow.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#pragma once

#include <Storages/Transaction/Codec.h>

namespace DB
{
class Field;
struct DecodedField;
using DecodedFields = std::vector<DecodedField>;

struct DecodedField : boost::noncopyable
{
Int64 col_id;
Field field;

DecodedField & operator=(DecodedField && e)
{
if (this == &e)
return *this;
col_id = e.col_id;
field = std::move(e.field);
return *this;
}
DecodedField(DecodedField && e) : col_id(e.col_id), field(std::move(e.field)) {}
DecodedField(const Int64 col_id_, Field && field_) : col_id(col_id_), field(std::move(field_)) {}

bool operator<(const DecodedField & e) const { return col_id < e.col_id; }
};

/// force decode TiKV value into row by a specific schema, if there is data can't be decoded, store it in unknown_fields.
struct DecodedRow : boost::noncopyable
{
// In old way, tidb encode each record like: (codec-flag, column-data), (codec-flag, column-data), ...
// we can use codec-flag to tell type of column. But, in new way, https://github.com/pingcap/tidb/pull/7597,
// there is no codec-flag, and we should find type in schema by column id.
struct UnknownFields
{
// should be sorted by column id.
const DecodedFields fields;
// if there is no codec-flag (in tidb fast codec), field are all string and with_codec_flag is false.
const bool with_codec_flag;
};

DecodedRow(bool has_missing_columns_, DecodedFields && unknown_, bool has_codec_flag, DecodedFields && decoded_fields_)
: has_missing_columns(has_missing_columns_),
unknown_fields{std::move(unknown_), has_codec_flag},
decoded_fields(std::move(decoded_fields_))
{
if (!isSortedByColumnID(decoded_fields) || !isSortedByColumnID(unknown_fields.fields))
throw Exception(std::string(__PRETTY_FUNCTION__) + ": should be sorted by column id", ErrorCodes::LOGICAL_ERROR);
}

private:
static bool isSortedByColumnID(const DecodedFields & decoded_fields)
{
for (size_t i = 1; i < decoded_fields.size(); ++i)
{
if (decoded_fields[i - 1].col_id >= decoded_fields[i].col_id)
return false;
}
return true;
}

public:
// if decoded row doesn't contain column in schema.
const bool has_missing_columns;
// decoded column not in schema
const UnknownFields unknown_fields;
// decoded column in schema and default/null column. should be sorted by column id.
const DecodedFields decoded_fields;
};

} // namespace DB
148 changes: 148 additions & 0 deletions dbms/src/Storages/Transaction/PredecodeTiKVValue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
#include <Storages/StorageMergeTree.h>
#include <Storages/Transaction/PredecodeTiKVValue.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/TMTContext.h>

namespace DB
{

/// Drains the values queued for pre-decoding in the default and write column families and
/// decodes them against the storage mapped to this region. Does nothing when that storage
/// cannot be found.
void Region::tryPreDecodeTiKVValue(Context & context)
{
    auto table_id = getMappedTableID();
    auto storage = context.getTMTContext().getStorages().get(table_id);
    if (!storage)
        return;

    std::optional<CFDataPreDecodeQueue> pending_default;
    std::optional<CFDataPreDecodeQueue> pending_write;
    {
        // Only the hand-over of the queues happens under the mutex; decoding runs after it is released.
        std::lock_guard<std::mutex> guard(predecode_mutex);
        pending_default = data.defaultCF().getCFDataPreDecode().popAll();
        pending_write = data.writeCF().getCFDataPreDecode().popAll();
    }

    DB::tryPreDecodeTiKVValue(std::move(pending_default), *storage);
    DB::tryPreDecodeTiKVValue(std::move(pending_write), *storage);
}

/// Reconciles already decoded fields with the schema.
///
/// Fields whose column id is not in the schema are moved into `unknown`; `decoded_fields`
/// keeps the rest. Both lists are sorted by column id whenever any splitting was needed.
/// Returns true when a schema column is absent from `decoded_fields` and that column has
/// both the no-default-value and the not-null flags set; false otherwise.
bool ValueDecodeHelper::forceDecodeTiKVValue(DecodedFields & decoded_fields, DecodedFields & unknown)
{
    const auto in_schema = [this](const DecodedField & field) { return schema_all_column_ids.count(field.col_id) != 0; };

    // Fast path: same number of columns and every decoded id is known, so nothing is unknown or missing.
    if (decoded_fields.size() == schema_all_column_ids.size()
        && std::all_of(decoded_fields.cbegin(), decoded_fields.cend(), in_schema))
        return false;

    {
        DecodedFields known;
        known.reserve(decoded_fields.size());
        for (auto & field : decoded_fields)
        {
            DecodedFields & target = in_schema(field) ? known : unknown;
            target.emplace_back(std::move(field));
        }

        // findByColumnID does a binary search, so both lists have to be ordered by column id.
        ::std::sort(known.begin(), known.end());
        ::std::sort(unknown.begin(), unknown.end());
        decoded_fields.swap(known);
    }

    for (const auto & id_and_index : schema_all_column_ids)
    {
        const auto & column = table_info.columns[id_and_index.second];
        const bool present = findByColumnID(column.id, decoded_fields) != decoded_fields.cend();
        if (present)
            continue;

        // Stop at the first absent column that has both flags set.
        if (column.hasNoDefaultValueFlag() && column.hasNotNullFlag())
            return true;
    }

    return false;
}

void ValueDecodeHelper::forceDecodeTiKVValue(const TiKVValue & value)
{
auto & decoded_fields_info = value.getDecodedRow();
if (decoded_fields_info.load())
return;

DecodedFields decoded_fields, unknown;
// TODO: support fast codec of TiDB
{
size_t cursor = 0;

const auto & raw_value = value.getStr();
while (cursor < raw_value.size())
{
Field f = DecodeDatum(cursor, raw_value);
if (f.isNull())
break;
ColumnID col_id = f.get<ColumnID>();
decoded_fields.emplace_back(col_id, DecodeDatum(cursor, raw_value));
}

if (cursor != raw_value.size())
throw Exception(std::string(__PRETTY_FUNCTION__) + ": cursor is not end", ErrorCodes::LOGICAL_ERROR);

{
// must be sorted, for binary search.
::std::sort(decoded_fields.begin(), decoded_fields.end());
}

auto has_missing_columns = forceDecodeTiKVValue(decoded_fields, unknown);
DecodedRow * decoded_fields_ptr = new DecodedRow(has_missing_columns, std::move(unknown), true, std::move(decoded_fields));
decoded_fields_info.atomicUpdate(decoded_fields_ptr);
}
}

/// Pre-decodes every queued TiKV value against the current schema of `storage`.
///
/// `values` may be an empty optional, in which case nothing happens. Otherwise the result
/// of lockStructure() is kept alive for the whole call (presumably so the schema cannot
/// change while rows are decoded — confirm against StorageMergeTree) and each value is
/// passed to ValueDecodeHelper::forceDecodeTiKVValue.
void tryPreDecodeTiKVValue(std::optional<CFDataPreDecodeQueue> && values, StorageMergeTree & storage)
{
    if (!values)
        return;

    auto table_lock = storage.lockStructure(false, __PRETTY_FUNCTION__);

    const auto & table_info = storage.getTableInfo();

    // Maps column id -> index into table_info.columns for every column stored in the value.
    ColumnIdToIndex schema_all_column_ids;
    schema_all_column_ids.set_empty_key(EmptyColumnID);
    schema_all_column_ids.set_deleted_key(DeleteColumnID);
    for (size_t i = 0; i < table_info.columns.size(); ++i)
    {
        const auto & column = table_info.columns[i];
        // When the primary key is the row handle, that column is left out of the map.
        if (table_info.pk_is_handle && column.hasPriKeyFlag())
            continue;
        schema_all_column_ids.insert({column.id, i});
    }

    ValueDecodeHelper helper{table_info, schema_all_column_ids};
    for (const auto & value : *values)
        helper.forceDecodeTiKVValue(*value);
}

/// Binary-searches `row` for the field with column id `col_id`.
/// `row` must be sorted by column id. Returns `row.cend()` when no field has that id.
DecodedFields::const_iterator findByColumnID(Int64 col_id, const DecodedFields & row)
{
    const auto end = row.cend();
    const auto pos = std::lower_bound(
        row.cbegin(), end, col_id, [](const DecodedField & field, const Int64 wanted) -> bool { return field.col_id < wanted; });

    // lower_bound yields the first field whose id is not less than col_id; accept it only on an exact match.
    return (pos != end && pos->col_id == col_id) ? pos : end;
}

} // namespace DB
Loading

0 comments on commit 7894fe8

Please sign in to comment.