Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ConcurrentSampler #37

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.14)

project(babylon VERSION 1.2.0)
project(babylon VERSION 1.2.1)

include(CTest) # for BUILD_TESTING option
include(CMakePackageConfigHelpers) # for write_basic_package_version_file
Expand All @@ -16,8 +16,8 @@ if(BUILD_DEPS)
)
FetchContent_Declare(
protobuf
URL "https://github.com/protocolbuffers/protobuf/archive/refs/tags/v25.3.tar.gz"
URL_HASH SHA256=d19643d265b978383352b3143f04c0641eea75a75235c111cc01a1350173180e
URL "https://github.com/protocolbuffers/protobuf/archive/refs/tags/v27.2.tar.gz"
URL_HASH SHA256=e4ff2aeb767da6f4f52485c2e72468960ddfe5262483879ef6ad552e52757a77
)
FetchContent_Declare(
boost
Expand Down
4 changes: 2 additions & 2 deletions MODULE.bazel
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module(
name = 'babylon',
version = '1.2.0',
version = '1.2.1',
compatibility_level = 1,
)

Expand Down Expand Up @@ -29,7 +29,7 @@ bazel_dep(name = 'googletest', version = '1.14.0', repo_name = 'com_google_googl

# --registry=file://%workspace%/registry
# protobuf 25.3 is not officially support in BCR
single_version_override(module_name = 'protobuf', version = '27.1')
single_version_override(module_name = 'protobuf', version = '27.2')
# rules_cuda latest release 0.2.1 is too old and do not have auto detect feature
bazel_dep(name = 'rules_cuda', version = '0.2.2-dev', dev_dependency = True)

Expand Down
2 changes: 1 addition & 1 deletion example/use-arena-with-brpc/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ int main(int argc, char* argv[]) {
stub.Echo(controller, &request, response,
::google::protobuf::NewCallback(finish, response, controller));
auto use_us = ::butil::gettimeofday_us() - begin_us;

if (use_us < expected_us) {
usleep(expected_us - use_us);
}
Expand Down
6 changes: 4 additions & 2 deletions example/use-arena-with-brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ class EchoServiceImpl : public EchoService {
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
response->mutable_payload()->CopyFrom(request->payload());
LOG_EVERY_SECOND(INFO) << "Request SpaceUsedLong = " << request->SpaceUsedLong()
<< " Response SpaceUsedLong = " << response->SpaceUsedLong();
LOG_EVERY_SECOND(INFO) << "Request SpaceUsedLong = "
<< request->SpaceUsedLong()
<< " Response SpaceUsedLong = "
<< response->SpaceUsedLong();
}
};
} // namespace example
Expand Down
5 changes: 5 additions & 0 deletions example/use-counter-with-bvar/.bazelrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
common --registry=https://bcr.bazel.build
common --registry=https://baidu.github.io/babylon/registry
common --registry=https://raw.githubusercontent.com/bazelboost/registry/main

build --compilation_mode opt --cxxopt=-std=c++17
1 change: 1 addition & 0 deletions example/use-counter-with-bvar/.bazelversion
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
7.1.2
20 changes: 20 additions & 0 deletions example/use-counter-with-bvar/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
cc_library(
name = 'recorder',
srcs = ['recorder.cpp', 'recorder.trick.cpp'],
hdrs = ['recorder.h'],
copts = ['-fno-access-control'],
deps = [
'@brpc//:bvar',
'@babylon//:concurrent_counter',
],
)

cc_binary(
name = 'example',
srcs = ['example.cpp'],
deps = [
':recorder',
'@brpc',
'@tcmalloc//tcmalloc',
],
)
4 changes: 4 additions & 0 deletions example/use-counter-with-bvar/MODULE.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
bazel_dep(name = 'babylon', version = '1.2.0')
bazel_dep(name = 'brpc', version = '1.9.0')
bazel_dep(name = 'tcmalloc', version = '0.0.0-20240411-5ed309d')
single_version_override(module_name = 'protobuf', version = '25.3')
71 changes: 71 additions & 0 deletions example/use-counter-with-bvar/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Use arena for brpc

brpc在调用用户的service前,需要在内部先完成Request和Response的实例构建和以及前后对应的正反序列化。对应的代码实现在相应的Protocol中,默认的方式实例采用动态堆内存分配模式创建,对于比较复杂的结构,内存分配释放和Message结构的构建和析构可能也会带来可见的开销。

利用babylon::SwissMemoryResource可以将堆内存分配该用Arena机制分配到内存池上,降低内存分配的成本且提升局部性。进一步使用babylon::ReusableManager可以在保持内存池聚集分配的局部性的同时,进一步通过尽可能复用Message结构降低构建和析构的开销。

下面实现了一个集成了对应功能的brpc::Protocol,演示了响应功能的使用方式,并配套对应的例子来演示性能对比。相应的Protocol实际也在baidu内部广泛使用,预期可以支持在生产环境直接使用。

## 示例构成

- `:reusable_rpc_protocol`: 独立的brpc:Protocol实现,集成了内存复用和实例复用的功能
- `reusable_rpc_protocol.h`&`reusable_rpc_protocol.cpp`: 独立逻辑,和对应brpc版本无关
- `reusable_rpc_protocol.trick.cpp`: 拷贝自`src/brpc/policy/baidu_rpc_protocol.cpp`并进行简要修改
- `:client`&`:server`: 模拟比较复杂的Message演示性能对比

## 使用手册

```
#include "reusable_rpc_protocol.h"

// 向brpc注册新的protocol,默认使用
// protocol type = 72
// protocol name = "baidu_std_reuse"
if (0 != ::babylon::ReusableRPCProtocol::register_protocol()) {
// 注册protocol失败
}
// 返回失败很可能因为type冲突,可以更换type和name
if (0 != ::babylon::ReusableRPCProtocol::register_protocol(type, name)) {
// 注册protocol失败
}

// ReusableRPCProtocol协议和baidu_std相同,注册后,默认依然会走baidu_std
// 需要通过显式在option中指定来启用
::baidu::rpc::ServerOptions options;
options.enabled_protocols = "baidu_std_reuse";

// 下面正常注册服务,启动服务器即可
class SomeServiceImpl : public SomeService {
public:
virtual void some_method(::google::protobuf::RpcController* controller,
const SomeRequest* request,
SomeResponse* response,
::google::protobuf::Closure* done) {
... // 正常进行业务处理,对应的request和response已经改用内存池或者实例复用托管了
}
};

// 影响运行时的flag
// --babylon_rpc_full_reuse,是否启用实例重用,默认false
// --babylon_rpc_closure_cache_num,内存池和ReusableManager实例本身也会通过对象池复用,设置对象池大小
// --babylon_rpc_page_size,内存池单页大小,超过单页大小的申请会直接改为动态申请
// --babylon_rpc_page_cache_num,内存池页本身通过对象池复用,设置对象池大小
```

## 性能演示

CPU: AMD EPYC 7W83 64-Core Processor, taskset 0-3 core
QPS: 750

- 原始模式
- latency_percentiles: "[1923,2222,2944,3447]"
- process_cpu_usage : 1.489
- process_memory_resident : 59244544
- `--use_arena`模式
- latency_percentiles: "[1378,1607,2263,2716]"
- process_cpu_usage : 0.695
- process_memory_resident : 54255616
- `--use_arena`&`--babylon_rpc_full_reuse`模式
- latency_percentiles: "[1096,1256,1612,1938]"
- process_cpu_usage : 0.612
- process_memory_resident : 101576704
4 changes: 4 additions & 0 deletions example/use-counter-with-bvar/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh
set -ex

bazel build example
145 changes: 145 additions & 0 deletions example/use-counter-with-bvar/example.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#include "brpc/server.h"
#include "gflags/gflags.h"
#include "recorder.h"

#include <random>
#include <thread>

DEFINE_int32(dummy_port, 8000, "TCP Port of this dummy server");
DEFINE_uint64(concurrency, 4, "Concurrent counting thread num");
DEFINE_uint64(vars, 10, "Counting bvar num");
DEFINE_bool(use_counter, false, "use babylon counter implemented bvar");
DEFINE_string(mode, "latency_recorder",
"adder/maxer/int_recorder/latency_recorder");

template <typename T>
void __attribute__((noinline)) work(T& var, uint32_t value) {
var << value;
}

template <typename S>
void run_loop(::std::string prefix) {
::std::vector<::std::unique_ptr<S>> vec;
for (size_t i = 0; i < FLAGS_vars; ++i) {
auto s = ::std::make_unique<S>();
s->expose("test-" + prefix + "-" + ::std::to_string(i));
vec.emplace_back(::std::move(s));
}

::bvar::LatencyRecorder latency("test-" + prefix);
::std::vector<::std::thread> threads;
for (size_t i = 0; i < FLAGS_concurrency; ++i) {
threads.emplace_back([&] {
::std::mt19937_64 gen {::std::random_device {}()};
::std::normal_distribution<> dis(600, 100);
while (true) {
auto begin = ::butil::cpuwide_time_ns();
auto v = dis(gen);
for (size_t i = 0; i < 1000; ++i) {
for (auto& s : vec) {
work(s->var, v);
}
}
auto use = (::butil::cpuwide_time_ns() - begin) / 1000 / vec.size();
latency << use;
}
});
}
usleep(1000000000);
}

template <typename V>
void run(::std::string prefix) {
struct S {
void expose(::std::string name) {
win.expose(name);
}
V var;
::bvar::Window<V, ::bvar::SERIES_IN_SECOND> win {&var, -1};
};
run_loop<S>(prefix);
}

template <>
void run<::bvar::LatencyRecorder>(::std::string prefix) {
struct S {
void expose(::std::string name) {
var.expose(name);
}
::bvar::LatencyRecorder var;
};
run_loop<S>(prefix);
}

template <>
void run<::babylon::BvarLatencyRecorder>(::std::string prefix) {
struct S {
void expose(::std::string name) {
var.expose(name);
}
::babylon::BvarLatencyRecorder var;
};
run_loop<S>(prefix);
}

int main(int argc, char* argv[]) {
::gflags::ParseCommandLineFlags(&argc, &argv, true);

::brpc::StartDummyServerAt(FLAGS_dummy_port);

if (FLAGS_mode == "adder") {
if (FLAGS_use_counter) {
run<::babylon::BvarAdder>("babylon");
} else {
run<::bvar::Adder<ssize_t>>("bvar");
}
} else if (FLAGS_mode == "maxer") {
if (FLAGS_use_counter) {
run<::babylon::BvarMaxer>("babylon");
} else {
run<::bvar::Maxer<ssize_t>>("bvar");
}
} else if (FLAGS_mode == "int_recorder") {
if (FLAGS_use_counter) {
run<::babylon::BvarIntRecorder>("babylon");
} else {
run<::bvar::IntRecorder>("bvar");
}
} else if (FLAGS_mode == "latency_recorder") {
if (FLAGS_use_counter) {
run<::babylon::BvarLatencyRecorder>("babylon");
} else {
run<::bvar::LatencyRecorder>("bvar");
}
}

/*
::babylon::BvarAdder adder;
adder.expose("xxxx_adder");

::bvar::Window<::babylon::BvarAdder, ::bvar::SERIES_IN_SECOND> adder_window {
&adder, -1};
adder_window.expose("xxxx_adder_win");

::babylon::BvarMaxer maxer;
maxer.expose("xxxx_maxer");

::bvar::Window<::babylon::BvarMaxer, ::bvar::SERIES_IN_SECOND> maxer_window {
&maxer, -1};
maxer_window.expose("xxxx_maxer_win");

::babylon::BvarIntRecorder int_recorder;
int_recorder.expose("xxxx_int_recorder");

::bvar::Window<::babylon::BvarIntRecorder, ::bvar::SERIES_IN_SECOND>
int_recorder_window {&int_recorder, -1};
int_recorder_window.expose("xxxx_int_recorder_win");

::babylon::BvarPercentile percentile;
::bvar::Window<::babylon::BvarPercentile, ::bvar::SERIES_IN_SECOND>
percentile_window {&percentile, -1};
percentile_window.expose("xxxx_percentile_win");
*/

return 0;
}
Loading