Skip to content
Closed
4 changes: 3 additions & 1 deletion agents/grpc/proto/asset.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ message Asset {
google.protobuf.Struct metadata = 3;
string data = 4;
bool complete = 5;
uint64 duration = 6;
uint64 duration = 6; // Duration of the profile in milliseconds
double start_ts = 7; // Timestamp when the profile was taken in milliseconds from epoch
double end_ts = 8; // Timestamp when the profile was completed in milliseconds from epoch
}
1 change: 1 addition & 0 deletions agents/grpc/proto/nsolid_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package grpcagent;
service NSolidService {
rpc Command (stream CommandResponse) returns (stream CommandRequest) {}
rpc ExportAsset (stream Asset) returns (EventResponse) {}
rpc ExportContinuousProfile (stream Asset) returns (EventResponse) {}
rpc ExportExit (ExitEvent) returns (EventResponse) {}
rpc ExportInfo (InfoEvent) returns (EventResponse) {}
rpc ExportMetrics (MetricsEvent) returns (EventResponse) {}
Expand Down
15 changes: 12 additions & 3 deletions agents/grpc/src/asset_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,22 @@ AssetStream::AssetStream(
AssetStor&& stor,
std::weak_ptr<AssetStreamObserver> observer,
const std::string& agent_id,
const std::string& saas): observer_(observer),
stor_(std::move(stor)) {
const std::string& saas,
AssetStreamRpcType rpc_type): observer_(observer),
stor_(std::move(stor)) {
ASSERT_EQ(0, lock_.init(true));
context_.AddMetadata("nsolid-agent-id", agent_id);
if (!saas.empty()) {
context_.AddMetadata("nsolid-saas-token", saas);
}
stub->async()->ExportAsset(&context_, &event_response_, this);

// Call the appropriate RPC method based on the rpc_type parameter
if (rpc_type == EXPORT_CONTINUOUS_PROFILE) {
stub->async()->ExportContinuousProfile(&context_, &event_response_, this);
} else {
stub->async()->ExportAsset(&context_, &event_response_, this);
}

AddHold();
StartCall();
}
Expand Down Expand Up @@ -86,6 +94,7 @@ void AssetStream::Write(grpcagent::Asset&& asset) {

void AssetStream::WritesDone(bool) {
nsuv::ns_mutex::scoped_lock lock(lock_);
ASSERT(write_state_.write_done_called == false);
write_state_.write_done_called = true;
NextWrite();
}
Expand Down
9 changes: 8 additions & 1 deletion agents/grpc/src/asset_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ namespace grpc {
// Predeclarations
class AssetStream;

// RPC type enum for specifying which RPC method to use
enum AssetStreamRpcType {
EXPORT_ASSET,
EXPORT_CONTINUOUS_PROFILE
};

struct AssetStor {
ProfileType type;
uint64_t thread_id;
Expand All @@ -41,7 +47,8 @@ class AssetStream: public ::grpc::ClientWriteReactor<grpcagent::Asset> {
AssetStor&& stor,
std::weak_ptr<AssetStreamObserver> observer,
const std::string& agent_id,
const std::string& saas);
const std::string& saas,
AssetStreamRpcType rpc_type = EXPORT_ASSET);

~AssetStream();

Expand Down
178 changes: 170 additions & 8 deletions agents/grpc/src/grpc_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -570,12 +570,16 @@ int GrpcAgent::stop(bool profile_stopped) {
} else {
Debug("Stopping gRPC Agent(2): %d\n", profile_stopped);
if (profile_stopped) {
profile_on_exit_ = profile_stopped;
auto it = config_.find("contCpuProfile");
if (it == config_.end() || it->get<bool>() == false) {
profile_on_exit_ = profile_stopped;
}

// Wait here until the are no remaining profiles to be completed
uv_mutex_lock(&stop_lock_);
do {
while (pending_profiles()) {
uv_cond_wait(&stop_cond_, &stop_lock_);
} while (pending_profiles());
}

uv_mutex_unlock(&stop_lock_);
}
Expand Down Expand Up @@ -799,6 +803,17 @@ int GrpcAgent::start_heap_snapshot_from_js(
}
}

/*static*/ void GrpcAgent::cont_profiler_cb(
const ProfileCollector::ProfileQStor& profile_data,
WeakGrpcAgent agent_wp) {
SharedGrpcAgent agent = agent_wp.lock();
if (agent == nullptr) {
return;
}

agent->cont_profile_queue_->enqueue(profile_data);
}

void GrpcAgent::env_creation_cb_(SharedEnvInst envinst,
WeakGrpcAgent agent_wp) {
SharedGrpcAgent agent = agent_wp.lock();
Expand Down Expand Up @@ -1223,7 +1238,22 @@ void GrpcAgent::do_start() {
agent->got_profile(std::move(stor));
},
weak_from_this());
profile_collector_->initialize();

cont_profile_queue_ =
AsyncTSQueue<ProfileCollector::ProfileQStor>::create(
&loop_,
+[](ProfileCollector::ProfileQStor&& stor, WeakGrpcAgent agent_wp) {
SharedGrpcAgent agent = agent_wp.lock();
if (agent == nullptr) {
return;
}

agent->got_continuous_profile(std::move(stor));
},
weak_from_this());

EnvList::Inst()->GetContinuousProfiler()->RegisterHook(cont_profiler_cb,
weak_from_this());

ready_ = true;

Expand Down Expand Up @@ -1265,6 +1295,7 @@ void GrpcAgent::do_stop() {
env_msg_.close();
shutdown_.close();
start_profiling_msg_.close();
cont_profile_queue_.reset();
}

void GrpcAgent::got_spans(const UniqRecordables& recordables) {
Expand Down Expand Up @@ -1342,9 +1373,11 @@ void GrpcAgent::got_proc_metrics() {
void GrpcAgent::got_profile(const ProfileCollector::ProfileQStor& stor) {
google::protobuf::Struct metadata;
uint64_t thread_id;
std::visit([&metadata, &thread_id](auto& opt) {
uint64_t start_timestamp;
std::visit([&metadata, &thread_id, &start_timestamp](auto& opt) {
thread_id = opt.thread_id;
metadata = opt.metadata_pb;
start_timestamp = opt.start_timestamp;
}, stor.options);

nsuv::ns_mutex::scoped_lock lock(profile_state_lock_);
Expand Down Expand Up @@ -1391,14 +1424,15 @@ void GrpcAgent::got_profile(const ProfileCollector::ProfileQStor& stor) {
return;
}

const uint64_t duration = (uv_hrtime() - start_timestamp) / 1e6;
grpcagent::Asset asset;
PopulateCommon(asset.mutable_common(),
ProfileTypeStr[stor.type],
prof_stor.req_id.c_str());
asset.set_thread_id(thread_id);
asset.mutable_metadata()->CopyFrom(metadata);
asset.set_complete(true);
asset.set_duration(uv_now(&loop_) - prof_stor.timestamp);
asset.set_duration(duration);
prof_stor.stream->Write(std::move(asset));
prof_stor.stream->WritesDone();
} else {
Expand Down Expand Up @@ -1449,6 +1483,123 @@ void GrpcAgent::got_profile(const ProfileCollector::ProfileQStor& stor) {
}
}

void GrpcAgent::got_continuous_profile(
const ProfileCollector::ProfileQStor& stor) {
static double performance_process_start_timestamp =
performance::performance_process_start_timestamp / 1e3;
google::protobuf::Struct metadata;
uint64_t thread_id;
uint64_t start_timestamp;
std::visit([&metadata, &thread_id, &start_timestamp](auto& opt) {
thread_id = opt.thread_id;
metadata = opt.metadata_pb;
start_timestamp = opt.start_timestamp;
}, stor.options);

Debug("got_continuous_profile. len: %ld, status: %d. thread_id: %ld\n",
stor.profile.length(),
stor.status,
thread_id);

if (stor.status < 0) {
// Log error but don't send error message back for continuous profiles
auto error = translate_error(stor.status);
Debug("Continuous profile error: %d\n", static_cast<int>(error));
return;
}

// Look if entry in cont_profile_stor_map_. If not create one
auto it = cont_profile_stor_map_.find(thread_id);
if (it == cont_profile_stor_map_.end()) {
// Create a new stream for the continuous profile
AssetStream* stream = new AssetStream(nsolid_service_stub_.get(),
AssetStor{stor.type, thread_id},
weak_from_this(),
agent_id_,
saas(),
EXPORT_CONTINUOUS_PROFILE);
it = cont_profile_stor_map_.emplace(thread_id, ProfileStor{
utils::generate_unique_id(),
stream,
stor.options
}).first;
}

const ProfileStor& profile_stor = it->second;
const std::string& req_id = profile_stor.req_id;
AssetStream* stream = profile_stor.stream;

// Check if the profile is complete
bool profileStreamComplete = stor.profile.length() == 0;
if (profileStreamComplete) {
double start_ts =
performance_process_start_timestamp + start_timestamp / 1e6;
double end_ts = performance_process_start_timestamp + uv_hrtime() / 1e6;
uint64_t duration = (uv_hrtime() - start_timestamp) / 1e6;
// Create complete profile
grpcagent::Asset asset;
PopulateCommon(asset.mutable_common(),
ProfileTypeStr[stor.type],
req_id.c_str());
asset.set_thread_id(thread_id);
asset.mutable_metadata()->CopyFrom(metadata);
asset.set_complete(true);
asset.set_duration(duration);
asset.set_start_ts(start_ts);
asset.set_end_ts(end_ts);

// Remove the entry from the map
cont_profile_stor_map_.erase(it);

// Send the complete profile
stream->Write(std::move(asset));
stream->WritesDone();
return;
}

// Send profile chunks
grpcagent::Asset asset;
PopulateCommon(asset.mutable_common(),
ProfileTypeStr[stor.type],
req_id.c_str());
asset.set_thread_id(thread_id);
asset.mutable_metadata()->CopyFrom(metadata);
asset.set_data(stor.profile);

size_t asset_size = asset.ByteSizeLong();
if (asset_size > GRPC_MAX_SIZE) {
// Split the data into chunks
Debug("Continuous profile size larger than supported (%ld > %ld): "
"splitting profile into chunks\n", asset_size, GRPC_MAX_SIZE);
size_t prof_size = stor.profile.size();
size_t rest = asset_size - prof_size;
size_t offset = 0;
size_t chunk_size = GRPC_MAX_SIZE - rest - 100;

while (offset < prof_size) {
grpcagent::Asset chunk_asset;
PopulateCommon(chunk_asset.mutable_common(),
ProfileTypeStr[stor.type],
req_id.c_str());
chunk_asset.set_thread_id(thread_id);
chunk_asset.mutable_metadata()->CopyFrom(metadata);

if (offset + chunk_size > prof_size) {
chunk_size = prof_size - offset;
}

chunk_asset.set_data(stor.profile.substr(offset, chunk_size));

Debug("Sending continuous profile chunk of size: %ld\n",
chunk_asset.ByteSizeLong());
stream->Write(std::move(chunk_asset));
offset += chunk_size;
}
} else {
stream->Write(std::move(asset));
}
}

void GrpcAgent::handle_command_request(CommandRequestStor&& req) {
const grpcagent::CommandRequest& request = req.request;
Debug("Command Received: %s\n", request.DebugString().c_str());
Expand Down Expand Up @@ -1893,6 +2044,16 @@ ErrorType GrpcAgent::do_start_prof_init(
uint64_t thread_id = args.thread_id();
uint64_t duration = args.duration();
StartProfiling start_profiling = nullptr;

// Check if continuous profiling is enabled for this profile type
if (type == ProfileType::kCpu) {
auto it = config_.find("contCpuProfile");
if (it != config_.end() && it->get<bool>() == true) {
// Continuous CPU profiling is enabled, don't allow manual CPU profiles
return ErrorType::EInProgressError;
}
}

switch (type) {
case ProfileType::kCpu:
start_profiling = &GrpcAgent::do_start_cpu_prof;
Expand All @@ -1915,12 +2076,12 @@ ErrorType GrpcAgent::do_start_prof_init(
opt.thread_id = thread_id;
opt.duration = duration;
opt.metadata_pb = std::move(args.metadata());
opt.start_timestamp = uv_hrtime();
}, options);

nsuv::ns_mutex::scoped_lock lock(profile_state_lock_);
ProfileState& profile_state = profile_state_[type];
ProfileStor stor{ req.requestid(),
uv_now(&loop_),
nullptr,
options };
auto iter = profile_state.pending_profiles_map.emplace(thread_id,
Expand Down Expand Up @@ -1952,7 +2113,8 @@ ErrorType GrpcAgent::do_start_prof_end(ErrorType err,
AssetStor{type, thread_id},
weak_from_this(),
agent_id_,
saas());
saas(),
EXPORT_ASSET);
if (err != ErrorType::ESuccess) {
send_asset_error(type, req_id, std::move(opts), stream, err);
return err;
Expand Down
11 changes: 10 additions & 1 deletion agents/grpc/src/grpc_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ class GrpcAgent: public std::enable_shared_from_this<GrpcAgent>,

struct ProfileStor {
std::string req_id;
uint64_t timestamp;
AssetStream* stream;
ProfileOptions options;
bool done = false;
Expand Down Expand Up @@ -166,6 +165,9 @@ class GrpcAgent: public std::enable_shared_from_this<GrpcAgent>,

static void config_msg_cb_(nsuv::ns_async*, WeakGrpcAgent);

static void cont_profiler_cb(const ProfileCollector::ProfileQStor&,
WeakGrpcAgent);

static void env_creation_cb_(SharedEnvInst, WeakGrpcAgent);

static void env_deletion_cb_(SharedEnvInst, WeakGrpcAgent);
Expand Down Expand Up @@ -246,6 +248,8 @@ class GrpcAgent: public std::enable_shared_from_this<GrpcAgent>,

void got_profile(const ProfileCollector::ProfileQStor& stor);

void got_continuous_profile(const ProfileCollector::ProfileQStor& stor);

void got_spans(const UniqRecordables& spans);

void handle_command_request(CommandRequestStor&& req);
Expand Down Expand Up @@ -351,6 +355,11 @@ class GrpcAgent: public std::enable_shared_from_this<GrpcAgent>,
nsuv::ns_async start_profiling_msg_;
TSQueue<StartProfStor> start_profiling_msg_q_;

// Continuous Profiling
ProfileStorMap cont_profile_stor_map_;
std::shared_ptr<AsyncTSQueue<ProfileCollector::ProfileQStor>>
cont_profile_queue_;

// For the grpc client
std::unique_ptr<grpcagent::NSolidService::StubInterface> nsolid_service_stub_;
std::unique_ptr<CommandStream> command_stream_;
Expand Down
Loading
Loading