Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 38 additions & 26 deletions agents/grpc/src/grpc_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ const uint64_t auth_timer_interval = 500;

const size_t GRPC_MAX_SIZE = 4L * 1024 * 1024; // 4GB

const int PUB_KEY_SIZE = 40;
const int CONSOLE_ID_SIZE = 36;

static const char* const root_certs[] = {
#include "node_root_certs.h" // NOLINT(build/include_order)
};
Expand Down Expand Up @@ -474,7 +477,7 @@ void GrpcAgent::reset_command_stream() {
command_stream_ = std::make_unique<CommandStream>(nsolid_service_stub_.get(),
weak_from_this(),
agent_id_,
saas_);
saas());
}

void GrpcAgent::set_asset_cb(SharedEnvInst envinst,
Expand Down Expand Up @@ -994,11 +997,9 @@ int GrpcAgent::config(const json& config) {

if (utils::find_any_fields_in_diff(diff, { "/saas" })) {
auto it = config_.find("saas");
saas_.reset();
if (it != config_.end()) {
parse_saas_token(*it);
} else {
saas_.clear();
console_id_.clear();
}
}

Expand All @@ -1011,20 +1012,21 @@ int GrpcAgent::config(const json& config) {
auto insecure_str =
per_process::system_environment->Get(kNSOLID_GRPC_INSECURE);
// Only parse the insecure flag in non SaaS mode.
if (saas_.empty() && insecure_str.has_value()) {
if (!saas_ && insecure_str.has_value()) {
// insecure = std::stoull(insecure_str.value());
insecure = std::stoi(insecure_str.value());
}

const std::string endpoint = console_id_.empty() ?
it->get<std::string>() : console_id_ + ".grpc.nodesource.io:443";
const std::string& endpoint = !saas_ ?
it->get<std::string>() :
saas_->endpoint;
Debug("GrpcAgent configured. Endpoint: %s. Insecure: %d\n",
endpoint.c_str(), static_cast<unsigned>(insecure));
{
OtlpGrpcExporterOptions options;
options.endpoint = endpoint;
options.metadata = {{"nsolid-agent-id", agent_id_},
{"nsolid-saas", saas_}};
{"nsolid-saas", saas()}};
if (!insecure) {
options.use_ssl_credentials = true;
if (!custom_certs_.empty()) {
Expand All @@ -1040,7 +1042,7 @@ int GrpcAgent::config(const json& config) {
OtlpGrpcMetricExporterOptions options;
options.endpoint = endpoint;
options.metadata = {{"nsolid-agent-id", agent_id_},
{"nsolid-saas", saas_}};
{"nsolid-saas", saas()}};
if (!insecure) {
options.use_ssl_credentials = true;
if (!custom_certs_.empty()) {
Expand All @@ -1056,7 +1058,7 @@ int GrpcAgent::config(const json& config) {
OtlpGrpcLogRecordExporterOptions options;
options.endpoint = endpoint;
options.metadata = {{"nsolid-agent-id", agent_id_},
{"nsolid-saas", saas_}};
{"nsolid-saas", saas()}};
if (!insecure) {
options.use_ssl_credentials = true;
if (!custom_certs_.empty()) {
Expand All @@ -1072,7 +1074,7 @@ int GrpcAgent::config(const json& config) {
OtlpGrpcClientOptions options;
options.endpoint = endpoint;
options.metadata = {{"nsolid-agent-id", agent_id_},
{"nsolid-saas", saas_}};
{"nsolid-saas", saas()}};
if (!insecure) {
options.use_ssl_credentials = true;
if (!custom_certs_.empty()) {
Expand Down Expand Up @@ -1450,22 +1452,32 @@ void GrpcAgent::handle_command_request(CommandRequestStor&& req) {
}

void GrpcAgent::parse_saas_token(const std::string& token) {
std::string pubKey = token.substr(0, 40);
Debug("Parsing SaaS token: %s\n", token.c_str());
std::string pubKey = token.substr(0, PUB_KEY_SIZE);
std::replace(pubKey.begin(), pubKey.end(), ',', '!');
std::string saasUrl = token.substr(40, token.length());
std::string saasUrl = token.substr(PUB_KEY_SIZE, token.length());
std::string baseUrl;
std::string basePort;
std::istringstream saasStream(saasUrl);
std::getline(saasStream, baseUrl, ':');
std::getline(saasStream, basePort, ':');

if (baseUrl.empty() || basePort.empty() || pubKey.length() != 40) {
if (baseUrl.empty() || basePort.empty() || pubKey.length() != PUB_KEY_SIZE) {
Debug("Invalid SaaS token: %s\n", token.c_str());
return;
}

std::string console_id = baseUrl.substr(0, baseUrl.find('.'));
if (console_id.size() != CONSOLE_ID_SIZE) {
Debug("Invalid SaaS token: %s\n", token.c_str());
return;
}

saas_ = token;
console_id_ = baseUrl.substr(0, baseUrl.find('.'));
bool is_staging = token.find("staging") != std::string::npos;
std::string endpoint = is_staging ?
console_id + ".grpc.staging.nodesource.io:443" :
console_id + ".grpc.nodesource.io:443";
saas_ = std::make_unique<SaaSInfo>(SaaSInfo{token, std::move(endpoint)});
}

bool GrpcAgent::pending_profiles() const {
Expand Down Expand Up @@ -1557,7 +1569,7 @@ void GrpcAgent::send_blocked_loop_event(BlockedLoopStor&& stor) {
Arena::Create<grpcagent::BlockedLoopEvent>(arena.get());
PopulateBlockedLoopEvent(event, stor);

auto context = GrpcClient::MakeClientContext(agent_id_, saas_);
auto context = GrpcClient::MakeClientContext(agent_id_, saas());

GrpcClient::DelegateAsyncExport(
nsolid_service_stub_.get(), std::move(context), std::move(arena),
Expand Down Expand Up @@ -1593,7 +1605,7 @@ void GrpcAgent::send_exit() {
exit_body->set_profile(cpu_profile_state.last_main_profile);
}

auto context = GrpcClient::MakeClientContext(agent_id_, saas_);
auto context = GrpcClient::MakeClientContext(agent_id_, saas());
uv_cond_t cond;
uv_mutex_t lock;
bool signaled = false;
Expand Down Expand Up @@ -1642,7 +1654,7 @@ void GrpcAgent::send_info_event(const char* req_id) {
PopulateInfoEvent(info_event, info, req_id);
}

auto context = GrpcClient::MakeClientContext(agent_id_, saas_);
auto context = GrpcClient::MakeClientContext(agent_id_, saas());

GrpcClient::DelegateAsyncExport(
nsolid_service_stub_.get(), std::move(context), std::move(arena),
Expand All @@ -1669,7 +1681,7 @@ void GrpcAgent::send_metrics_event(const char* req_id) {
thr_metrics_cache_,
req_id);

auto context = GrpcClient::MakeClientContext(agent_id_, saas_);
auto context = GrpcClient::MakeClientContext(agent_id_, saas());

GrpcClient::DelegateAsyncExport(
nsolid_service_stub_.get(), std::move(context), std::move(arena),
Expand All @@ -1691,7 +1703,7 @@ void GrpcAgent::send_packages_event(const char* req_id) {
auto packages_event = Arena::Create<grpcagent::PackagesEvent>(arena.get());
PopulatePackagesEvent(packages_event, req_id);

auto context = GrpcClient::MakeClientContext(agent_id_, saas_);
auto context = GrpcClient::MakeClientContext(agent_id_, saas());

GrpcClient::DelegateAsyncExport(
nsolid_service_stub_.get(), std::move(context), std::move(arena),
Expand All @@ -1714,7 +1726,7 @@ void GrpcAgent::send_reconfigure_event(const char* req_id) {
Arena::Create<grpcagent::ReconfigureEvent>(arena.get());
PopulateReconfigureEvent(reconfigure_event, req_id);

auto context = GrpcClient::MakeClientContext(agent_id_, saas_);
auto context = GrpcClient::MakeClientContext(agent_id_, saas());

GrpcClient::DelegateAsyncExport(
nsolid_service_stub_.get(), std::move(context), std::move(arena),
Expand Down Expand Up @@ -1764,7 +1776,7 @@ void GrpcAgent::send_source_code_event(const grpcagent::CommandRequest& req) {
}
}

auto context = GrpcClient::MakeClientContext(agent_id_, saas_);
auto context = GrpcClient::MakeClientContext(agent_id_, saas());

GrpcClient::DelegateAsyncExport(
nsolid_service_stub_.get(), std::move(context), std::move(arena),
Expand All @@ -1786,7 +1798,7 @@ void GrpcAgent::send_startup_times_event(const char* req_id) {
auto st_event = Arena::Create<grpcagent::StartupTimesEvent>(arena.get());
PopulateStartupTimesEvent(st_event, req_id);

auto context = GrpcClient::MakeClientContext(agent_id_, saas_);
auto context = GrpcClient::MakeClientContext(agent_id_, saas());

GrpcClient::DelegateAsyncExport(
nsolid_service_stub_.get(), std::move(context), std::move(arena),
Expand All @@ -1810,7 +1822,7 @@ void GrpcAgent::send_unblocked_loop_event(BlockedLoopStor&& stor) {
Arena::Create<grpcagent::UnblockedLoopEvent>(arena.get());
PopulateUnblockedLoopEvent(event, stor);

auto context = GrpcClient::MakeClientContext(agent_id_, saas_);
auto context = GrpcClient::MakeClientContext(agent_id_, saas());

GrpcClient::DelegateAsyncExport(
nsolid_service_stub_.get(), std::move(context), std::move(arena),
Expand Down Expand Up @@ -1912,7 +1924,7 @@ ErrorType GrpcAgent::do_start_prof_end(ErrorType err,
AssetStor{type, thread_id},
weak_from_this(),
agent_id_,
saas_);
saas());
if (err != ErrorType::ESuccess) {
send_asset_error(type, req_id, std::move(opts), stream, err);
return err;
Expand Down
13 changes: 10 additions & 3 deletions agents/grpc/src/grpc_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ class GrpcAgent: public std::enable_shared_from_this<GrpcAgent>,

const std::string& agent_id() const { return agent_id_; }

const std::string& saas() const { return saas_; }
const std::string& saas() const {
static std::string empty;
return saas_ ? saas_->token : empty;
}

private:
struct CommandRequestStor {
Expand Down Expand Up @@ -139,6 +142,11 @@ class GrpcAgent: public std::enable_shared_from_this<GrpcAgent>,
ProfileOptions options;
};

struct SaaSInfo {
std::string token;
std::string endpoint;
};

GrpcAgent();

~GrpcAgent();
Expand Down Expand Up @@ -325,8 +333,7 @@ class GrpcAgent: public std::enable_shared_from_this<GrpcAgent>,
TSQueue<nlohmann::json> config_msg_q_;
nlohmann::json config_;
std::string agent_id_;
std::string saas_;
std::string console_id_;
std::unique_ptr<SaaSInfo> saas_;

nsuv::ns_timer auth_timer_;
int auth_retries_;
Expand Down
Loading