Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/search/index_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ Status IndexManager::Create(engine::Context &ctx, std::unique_ptr<kqir::IndexInf
SearchKey index_key(info->ns, info->name);
auto cf = storage->GetCFHandle(ColumnFamilyID::Search);

auto batch = storage->GetWriteBatchBase();
auto batch = storage->GetWriteBatchBase(ctx);

std::string meta_val;
info->metadata.Encode(&meta_val);
Expand Down Expand Up @@ -244,7 +244,7 @@ Status IndexManager::Drop(engine::Context &ctx, std::string_view index_name, con
SearchKey index_key(info->ns, info->name);
auto cf = storage->GetCFHandle(ColumnFamilyID::Search);

auto batch = storage->GetWriteBatchBase();
auto batch = storage->GetWriteBatchBase(ctx);

auto s = batch->Delete(cf, index_key.ConstructIndexMeta());
if (!s.ok()) {
Expand Down
8 changes: 4 additions & 4 deletions src/search/indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ Status IndexUpdater::UpdateTagIndex(engine::Context &ctx, std::string_view key,
}

auto *storage = indexer->storage;
auto batch = storage->GetWriteBatchBase();
auto batch = storage->GetWriteBatchBase(ctx);
auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);

for (const auto &tag : tags_to_delete) {
Expand Down Expand Up @@ -256,7 +256,7 @@ Status IndexUpdater::UpdateNumericIndex(engine::Context &ctx, std::string_view k
CHECK(current.IsNull() || current.Is<kqir::Numeric>());

auto *storage = indexer->storage;
auto batch = storage->GetWriteBatchBase();
auto batch = storage->GetWriteBatchBase(ctx);
auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);

if (!original.IsNull()) {
Expand Down Expand Up @@ -295,14 +295,14 @@ Status IndexUpdater::UpdateHnswVectorIndex(engine::Context &ctx, std::string_vie
auto hnsw = HnswIndex(search_key, vector, storage);

if (!original.IsNull()) {
auto batch = storage->GetWriteBatchBase();
auto batch = storage->GetWriteBatchBase(ctx);
GET_OR_RET(hnsw.DeleteVectorEntry(ctx, key, batch));
auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
}

if (!current.IsNull()) {
auto batch = storage->GetWriteBatchBase();
auto batch = storage->GetWriteBatchBase(ctx);
GET_OR_RET(hnsw.InsertVectorEntry(ctx, key, current.Get<kqir::NumericArray>(), batch));
auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) return {Status::NotOK, s.ToString()};
Expand Down
6 changes: 3 additions & 3 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ rocksdb::Status Database::Expire(engine::Context &ctx, const Slice &user_key, ui
} else {
EncodeFixed32(value.data() + 1, Metadata::ExpireMsToS(timestamp));
}
auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisNone, {std::to_string(kRedisCmdExpire)});
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) {
Expand Down Expand Up @@ -168,7 +168,7 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice> &k
ns_keys.emplace_back(std::move(ns_key));
}

auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisNone);
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) {
Expand Down Expand Up @@ -668,7 +668,7 @@ rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key, con

if (key == new_key) return rocksdb::Status::OK();

auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(type);
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/redis_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ rocksdb::Status PubSub::Publish(engine::Context &ctx, const Slice &channel, cons
if (storage_->GetConfig()->IsSlave()) {
return rocksdb::Status::NotSupported("can't publish to db in slave mode");
}
auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
auto s = batch->Put(pubsub_cf_handle_, channel, value);
if (!s.ok()) {
return s;
Expand Down
24 changes: 12 additions & 12 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -714,13 +714,7 @@ rocksdb::Status Storage::writeToDB(engine::Context &ctx, const rocksdb::WriteOpt
}

if (ctx.txn_context_enabled) {
// Extract writes from the updates and append to the ctx.batch
if (ctx.batch == nullptr) {
ctx.batch = std::make_unique<rocksdb::WriteBatchWithIndex>();
}
WriteBatchIndexer handle(ctx);
auto s = updates->Iterate(&handle);
if (!s.ok()) return s;
CHECK(ctx.batch->GetWriteBatch() == updates);
} else {
CHECK(ctx.batch == nullptr);
}
Expand All @@ -730,7 +724,7 @@ rocksdb::Status Storage::writeToDB(engine::Context &ctx, const rocksdb::WriteOpt

rocksdb::Status Storage::Delete(engine::Context &ctx, const rocksdb::WriteOptions &options,
rocksdb::ColumnFamilyHandle *cf_handle, const rocksdb::Slice &key) {
auto batch = GetWriteBatchBase();
auto batch = GetWriteBatchBase(ctx);
auto s = batch->Delete(cf_handle, key);
if (!s.ok()) {
return s;
Expand All @@ -740,7 +734,7 @@ rocksdb::Status Storage::Delete(engine::Context &ctx, const rocksdb::WriteOption

rocksdb::Status Storage::DeleteRange(engine::Context &ctx, const rocksdb::WriteOptions &options,
rocksdb::ColumnFamilyHandle *cf_handle, Slice begin, Slice end) {
auto batch = GetWriteBatchBase();
auto batch = GetWriteBatchBase(ctx);
auto s = batch->DeleteRange(cf_handle, begin, end);
if (!s.ok()) {
return s;
Expand All @@ -760,7 +754,7 @@ rocksdb::Status Storage::FlushScripts(engine::Context &ctx, const rocksdb::Write
// didn't contain the end key.
end_key[end_key.size() - 1] += 1;

auto batch = GetWriteBatchBase();
auto batch = GetWriteBatchBase(ctx);
auto s = batch->DeleteRange(cf_handle, begin_key, end_key);
if (!s.ok()) {
return s;
Expand Down Expand Up @@ -975,10 +969,16 @@ Status Storage::CommitTxn() {
return {Status::NotOK, s.ToString()};
}

ObserverOrUniquePtr<rocksdb::WriteBatchBase> Storage::GetWriteBatchBase() {
ObserverOrUniquePtr<rocksdb::WriteBatchBase> Storage::GetWriteBatchBase(Context &ctx) {
if (is_txn_mode_) {
return ObserverOrUniquePtr<rocksdb::WriteBatchBase>(txn_write_batch_.get(), ObserverOrUnique::Observer);
}
if (ctx.txn_context_enabled) {
if (!ctx.batch) {
ctx.batch = std::make_unique<rocksdb::WriteBatchWithIndex>();
}
return ObserverOrUniquePtr<rocksdb::WriteBatchBase>(ctx.batch.get(), ObserverOrUnique::Observer);
}
return ObserverOrUniquePtr<rocksdb::WriteBatchBase>(
new rocksdb::WriteBatch(0 /*reserved_bytes*/, GetWriteBatchMaxBytes()), ObserverOrUnique::Unique);
}
Expand All @@ -987,7 +987,7 @@ Status Storage::WriteToPropagateCF(engine::Context &ctx, const std::string &key,
if (config_->IsSlave()) {
return {Status::NotOK, "cannot write to propagate column family in slave mode"};
}
auto batch = GetWriteBatchBase();
auto batch = GetWriteBatchBase(ctx);
auto cf = GetCFHandle(ColumnFamilyID::Propagate);
auto s = batch->Put(cf, key, value);
s = Write(ctx, default_write_opts_, batch->GetWriteBatch());
Expand Down
2 changes: 1 addition & 1 deletion src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ class Storage {

Status BeginTxn();
Status CommitTxn();
ObserverOrUniquePtr<rocksdb::WriteBatchBase> GetWriteBatchBase();
ObserverOrUniquePtr<rocksdb::WriteBatchBase> GetWriteBatchBase(Context &ctx);

Storage(const Storage &) = delete;
Storage &operator=(const Storage &) = delete;
Expand Down
6 changes: 3 additions & 3 deletions src/types/redis_bitmap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ rocksdb::Status Bitmap::SetBit(engine::Context &ctx, const Slice &user_key, uint
auto *data_ptr = reinterpret_cast<uint8_t *>(value.data());
*old_bit = util::lsb::GetBit(data_ptr, bit_offset_in_segment);
util::lsb::SetBitTo(data_ptr, bit_offset_in_segment, new_bit);
auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisBitmap, {std::to_string(kRedisCmdSetBit), std::to_string(bit_offset)});
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
Expand Down Expand Up @@ -483,7 +483,7 @@ rocksdb::Status Bitmap::BitOp(engine::Context &ctx, BitOpFlags op_flag, const st
}
size_t num_keys = meta_pairs.size();

auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
if (max_bitmap_size == 0) {
/* Compute the bit operation, if all bitmap is empty. cleanup the dest bitmap. */
auto s = batch->Delete(metadata_cf_handle_, ns_key);
Expand Down Expand Up @@ -850,7 +850,7 @@ rocksdb::Status Bitmap::bitfield(engine::Context &ctx, const Slice &user_key, co

if constexpr (!ReadOnly) {
// Write changes into storage.
auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
if (bitfieldWriteAheadLog(batch, ops)) {
auto s = cache.BatchForFlush(batch);
if (!s.ok()) {
Expand Down
4 changes: 2 additions & 2 deletions src/types/redis_bitmap_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ rocksdb::Status BitmapString::SetBit(engine::Context &ctx, const Slice &ns_key,

*raw_value = raw_value->substr(0, header_offset);
raw_value->append(string_value);
auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisString);
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) {
Expand Down Expand Up @@ -260,7 +260,7 @@ rocksdb::Status BitmapString::Bitfield(engine::Context &ctx, const Slice &ns_key

raw_value->resize(header_offset);
raw_value->append(string_value);
auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisString);
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) {
Expand Down
4 changes: 2 additions & 2 deletions src/types/redis_bloom_chain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ rocksdb::Status BloomChain::createBloomChain(engine::Context &ctx, const Slice &

auto [block_split_bloom_filter, _] = CreateBlockSplitBloomFilter(metadata->bloom_bytes);

auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisBloomFilter, {"createBloomChain"});
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
Expand Down Expand Up @@ -176,7 +176,7 @@ rocksdb::Status BloomChain::InsertCommon(engine::Context &ctx, const Slice &user
getItemHashList(items, &item_hash_list);

uint64_t origin_size = metadata.size;
auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisBloomFilter, {"insert"});
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
Expand Down
8 changes: 4 additions & 4 deletions src/types/redis_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ rocksdb::Status Hash::IncrBy(engine::Context &ctx, const Slice &user_key, const
}

*new_value = old_value + increment;
auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisHash);
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
Expand Down Expand Up @@ -140,7 +140,7 @@ rocksdb::Status Hash::IncrByFloat(engine::Context &ctx, const Slice &user_key, c
}

*new_value = n;
auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisHash);
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
Expand Down Expand Up @@ -205,7 +205,7 @@ rocksdb::Status Hash::Delete(engine::Context &ctx, const Slice &user_key, const
std::string ns_key = AppendNamespacePrefix(user_key);

HashMetadata metadata(false);
auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisHash);
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
Expand Down Expand Up @@ -252,7 +252,7 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st
ttl_updated = true;
}
int added = 0;
auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisHash);
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
Expand Down
4 changes: 2 additions & 2 deletions src/types/redis_hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ rocksdb::Status HyperLogLog::Add(engine::Context &ctx, const Slice &user_key,
return s;
}

auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisHyperLogLog);
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
Expand Down Expand Up @@ -252,7 +252,7 @@ rocksdb::Status HyperLogLog::Merge(engine::Context &ctx, const Slice &dest_user_
s = mergeUserKeys(ctx, all_user_keys, &registers);
}

auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisHyperLogLog);
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
Expand Down
6 changes: 3 additions & 3 deletions src/types/redis_json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
namespace redis {

rocksdb::Status Json::write(engine::Context &ctx, Slice ns_key, JsonMetadata *metadata, const JsonValue &json_val) {
auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisJson);
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
Expand Down Expand Up @@ -94,7 +94,7 @@ rocksdb::Status Json::create(engine::Context &ctx, const std::string &ns_key, Js
}

rocksdb::Status Json::del(engine::Context &ctx, const Slice &ns_key) {
auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisJson);
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
Expand Down Expand Up @@ -554,7 +554,7 @@ rocksdb::Status Json::MSet(engine::Context &ctx, const std::vector<std::string>
ns_keys.emplace_back(std::move(ns_key));
}

auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisJson);

// A single JSON key may be modified multiple times in the MSET command,
Expand Down
16 changes: 8 additions & 8 deletions src/types/redis_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ rocksdb::Status List::push(engine::Context &ctx, const Slice &user_key, const st
std::string ns_key = AppendNamespacePrefix(user_key);

ListMetadata metadata;
auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
RedisCommand cmd = left ? kRedisCmdLPush : kRedisCmdRPush;
WriteBatchLogData log_data(kRedisList, {std::to_string(cmd)});
auto s = batch->PutLogData(log_data.Encode());
Expand Down Expand Up @@ -112,7 +112,7 @@ rocksdb::Status List::PopMulti(engine::Context &ctx, const rocksdb::Slice &user_
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;

auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
RedisCommand cmd = left ? kRedisCmdLPop : kRedisCmdRPop;
WriteBatchLogData log_data(kRedisList, {std::to_string(cmd)});
s = batch->PutLogData(log_data.Encode());
Expand Down Expand Up @@ -210,7 +210,7 @@ rocksdb::Status List::Rem(engine::Context &ctx, const Slice &user_key, int count
return rocksdb::Status::NotFound();
}

auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisList, {std::to_string(kRedisCmdLRem), std::to_string(count), elem.ToString()});
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
Expand Down Expand Up @@ -298,7 +298,7 @@ rocksdb::Status List::Insert(engine::Context &ctx, const Slice &user_key, const
return rocksdb::Status::NotFound();
}

auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisList,
{std::to_string(kRedisCmdLInsert), before ? "1" : "0", pivot.ToString(), elem.ToString()});
s = batch->PutLogData(log_data.Encode());
Expand Down Expand Up @@ -476,7 +476,7 @@ rocksdb::Status List::Set(engine::Context &ctx, const Slice &user_key, int index
}
if (value == elem) return rocksdb::Status::OK();

auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisList, {std::to_string(kRedisCmdLSet), std::to_string(index)});
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
Expand Down Expand Up @@ -525,7 +525,7 @@ rocksdb::Status List::lmoveOnSingleList(engine::Context &ctx, const rocksdb::Sli
return rocksdb::Status::OK();
}

auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisList, {std::to_string(kRedisCmdLMove), src.ToString(), src.ToString(),
src_left ? "left" : "right", dst_left ? "left" : "right"});
s = batch->PutLogData(log_data.Encode());
Expand Down Expand Up @@ -576,7 +576,7 @@ rocksdb::Status List::lmoveOnTwoLists(engine::Context &ctx, const rocksdb::Slice

elem->clear();

auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisList, {std::to_string(kRedisCmdLMove), src.ToString(), dst.ToString(),
src_left ? "left" : "right", dst_left ? "left" : "right"});
s = batch->PutLogData(log_data.Encode());
Expand Down Expand Up @@ -642,7 +642,7 @@ rocksdb::Status List::Trim(engine::Context &ctx, const Slice &user_key, int star
}
if (start < 0) start = 0;

auto batch = storage_->GetWriteBatchBase();
auto batch = storage_->GetWriteBatchBase(ctx);
WriteBatchLogData log_data(kRedisList, std::vector<std::string>{std::to_string(kRedisCmdLTrim), std::to_string(start),
std::to_string(stop)});
s = batch->PutLogData(log_data.Encode());
Expand Down
Loading
Loading