Skip to content

Commit

Permalink
Allow tag sending
Browse files Browse the repository at this point in the history
Signed-off-by: Bob Weinand <[email protected]>
  • Loading branch information
bwoebi committed Jun 27, 2024
1 parent eb37744 commit bc824b8
Show file tree
Hide file tree
Showing 15 changed files with 160 additions and 43 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions live-debugger-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@ crate-type = ["lib", "staticlib", "cdylib"]
datadog-live-debugger = { path = "../live-debugger" }
ddcommon = { path = "../ddcommon" }
ddcommon-ffi = { path = "../ddcommon-ffi" }
percent-encoding = "2.1"
uuid = { version = "1.7.0", features = ["v4"] }
serde_json = "1.0"
tokio = "1.36.0"
log = "0.4.21"

[features]
default = ["cbindgen"]
cbindgen = ["build_common/cbindgen", "ddcommon-ffi/cbindgen"]

[build-dependencies]
build_common = { path = "../build-common" }
10 changes: 10 additions & 0 deletions live-debugger-ffi/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
extern crate build_common;

use build_common::generate_and_configure_header;

fn main() {
let header_name = "live-debugger.h";
generate_and_configure_header(header_name);
}
1 change: 1 addition & 0 deletions live-debugger-ffi/src/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ pub extern "C" fn ddog_evaluated_value_get<'a>(
#[no_mangle]
pub extern "C" fn ddog_evaluated_value_drop(_: Box<InternalIntermediateValue>) {}

#[allow(clippy::boxed_local)]
pub fn ddog_evaluated_value_into_string<'a>(
value: Box<InternalIntermediateValue<'a>>,
context: &'a mut c_void,
Expand Down
3 changes: 3 additions & 0 deletions live-debugger-ffi/src/send_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ pub extern "C" fn ddog_update_payload_message<'a>(
}

#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_snapshot_entry<'a>(
payload: &mut DebuggerPayload<'a>,
) -> *mut DebuggerCapture<'a> {
Expand All @@ -173,6 +174,7 @@ pub unsafe extern "C" fn ddog_snapshot_entry<'a>(
}

#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_snapshot_lines<'a>(
payload: &mut DebuggerPayload<'a>,
line: u32,
Expand All @@ -194,6 +196,7 @@ pub unsafe extern "C" fn ddog_snapshot_lines<'a>(
}

#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_snapshot_exit<'a>(
payload: &mut DebuggerPayload<'a>,
) -> *mut DebuggerCapture<'a> {
Expand Down
24 changes: 21 additions & 3 deletions live-debugger-ffi/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use ddcommon_ffi::{CharSlice, MaybeError};
use log::warn;
use std::sync::Arc;
use std::thread::JoinHandle;
use percent_encoding::{CONTROLS, percent_encode};
use tokio::sync::mpsc;
use datadog_live_debugger::sender::generate_tags;
use ddcommon::tag::Tag;

macro_rules! try_c {
($failable:expr) => {
Expand Down Expand Up @@ -37,21 +40,23 @@ enum SendData {
Wrapped(OwnedCharSlice),
}

async fn sender_routine(endpoint: Arc<Endpoint>, mut receiver: mpsc::Receiver<SendData>) {
async fn sender_routine(endpoint: Arc<Endpoint>, tags: String, mut receiver: mpsc::Receiver<SendData>) {
let tags = Arc::new(tags);
loop {
let data = match receiver.recv().await {
None => break,
Some(data) => data,
};

let endpoint = endpoint.clone();
let tags = tags.clone();
tokio::spawn(async move {
let data = match &data {
SendData::Raw(vec) => vec.as_slice(),
SendData::Wrapped(wrapped) => wrapped.slice.as_bytes(),
};

if let Err(e) = sender::send(data, &endpoint).await {
if let Err(e) = sender::send(data, &endpoint, &tags).await {
warn!("Failed to send debugger data: {e:?}");
}
});
Expand All @@ -63,9 +68,20 @@ pub struct SenderHandle {
channel: mpsc::Sender<SendData>,
}

#[no_mangle]
pub extern "C" fn ddog_live_debugger_build_tags(debugger_version: CharSlice, env: CharSlice, version: CharSlice, runtime_id: CharSlice, global_tags: ddcommon_ffi::Vec<Tag>) -> Box<String> {
Box::new(generate_tags(&debugger_version.to_utf8_lossy(), &env.to_utf8_lossy(), &version.to_utf8_lossy(), &runtime_id.to_utf8_lossy(), &mut global_tags.into_iter()))
}

#[no_mangle]
pub extern "C" fn ddog_live_debugger_tags_from_raw(tags: CharSlice) -> Box<String> {
Box::new(percent_encode(tags.as_bytes(), CONTROLS).to_string())
}

#[no_mangle]
pub extern "C" fn ddog_live_debugger_spawn_sender(
endpoint: &Endpoint,
tags: Box<String>,
handle: &mut *mut SenderHandle,
) -> MaybeError {
let runtime = try_c!(tokio::runtime::Builder::new_current_thread()
Expand All @@ -77,7 +93,7 @@ pub extern "C" fn ddog_live_debugger_spawn_sender(

*handle = Box::into_raw(Box::new(SenderHandle {
join: std::thread::spawn(move || {
runtime.block_on(sender_routine(endpoint, mailbox));
runtime.block_on(sender_routine(endpoint, *tags, mailbox));
runtime.shutdown_background();
}),
channel: tx,
Expand Down Expand Up @@ -106,11 +122,13 @@ pub extern "C" fn ddog_live_debugger_send_payload(
}

#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_live_debugger_drop_sender(sender: *mut SenderHandle) {
drop(Box::from_raw(sender));
}

#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_live_debugger_join_sender(sender: *mut SenderHandle) {
let sender = Box::from_raw(sender);
drop(sender.channel);
Expand Down
2 changes: 2 additions & 0 deletions live-debugger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ ddcommon = { path = "../ddcommon" }
hyper = { version = "0.14", features = ["client"] }
regex = "1.9.3"
json = "0.12.4"
percent-encoding = "2.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sys-info = { version = "0.9.0" }
uuid = { version = "1.0", features = ["v4"] }
2 changes: 1 addition & 1 deletion live-debugger/src/expr_eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ where
{
let mut eval = Eval { eval, it: None };
eval.value(&value.0)
.and_then(|v| Ok(v.try_use(&mut eval)?))
.and_then(|v| v.try_use(&mut eval))
.map_err(|e| SnapshotEvaluationError {
expr: value.to_string(),
message: e.0,
Expand Down
33 changes: 21 additions & 12 deletions live-debugger/src/sender.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use std::fmt::Display;
use crate::debugger_defs::DebuggerPayload;
use ddcommon::connector::Connector;
use ddcommon::Endpoint;
Expand All @@ -9,7 +10,10 @@ use hyper::{Body, Client, Method, Uri};
use serde::Serialize;
use std::hash::Hash;
use std::str::FromStr;
use std::sync::Arc;
use percent_encoding::{CONTROLS, percent_encode};
use uuid::Uuid;
use ddcommon::tag::Tag;

pub const PROD_INTAKE_SUBDOMAIN: &str = "http-intake.logs";

Expand All @@ -18,7 +22,7 @@ const AGENT_TELEMETRY_URL_PATH: &str = "/debugger/v1/input";

#[derive(Default)]
pub struct Config {
pub endpoint: Option<Endpoint>,
pub endpoint: Option<Arc<Endpoint>>,
}

impl Config {
Expand All @@ -34,7 +38,7 @@ impl Config {
}

endpoint.url = Uri::from_parts(uri_parts)?;
self.endpoint = Some(endpoint);
self.endpoint = Some(Arc::new(endpoint));
Ok(())
}
}
Expand All @@ -43,7 +47,20 @@ pub fn encode<S: Eq + Hash + Serialize>(data: Vec<DebuggerPayload>) -> Vec<u8> {
serde_json::to_vec(&data).unwrap()
}

pub async fn send(payload: &[u8], endpoint: &Endpoint) -> anyhow::Result<()> {
pub fn generate_tags(debugger_version: &dyn Display, env: &dyn Display, version: &dyn Display, runtime_id: &dyn Display, custom_tags: &mut dyn Iterator<Item=&Tag>) -> String {
let mut tags = format!("debugger_version:{debugger_version},env:{env},version:{version},runtime_id:{runtime_id}");
if let Ok(hostname) = sys_info::hostname() {
tags.push_str(",host_name:");
tags.push_str(hostname.as_str());
}
for tag in custom_tags {
tags.push(',');
tags.push_str(tag.as_ref());
}
percent_encode(tags.as_bytes(), CONTROLS).to_string()
}

pub async fn send(payload: &[u8], endpoint: &Endpoint, percent_encoded_tags: &str) -> anyhow::Result<()> {
let mut req = hyper::Request::builder()
.header(
hyper::header::USER_AGENT,
Expand All @@ -58,17 +75,9 @@ pub async fn send(payload: &[u8], endpoint: &Endpoint) -> anyhow::Result<()> {
}

let mut parts = url.into_parts();
let mut query = String::from(parts.path_and_query.unwrap().as_str());
query.push_str("?ddtags=host:");
query.push_str(""); // TODO hostname
// TODO container tags and such
let query = format!("{}?ddtags={}", parts.path_and_query.unwrap(), percent_encoded_tags);
parts.path_and_query = Some(PathAndQuery::from_str(&query)?);
url = Uri::from_parts(parts)?;
// "env:" + config.getEnv(),
// "version:" + config.getVersion(),
// "debugger_version:" + DDTraceCoreInfo.VERSION,
// "agent_version:" + DebuggerAgent.getAgentVersion(),
// "host_name:" + config.getHostName());

// SAFETY: we ensure the reference exists across the request
let req = req.uri(url).body(Body::from(unsafe {
Expand Down
7 changes: 6 additions & 1 deletion sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ pub unsafe extern "C" fn ddog_sidecar_send_trace_v04_bytes(
pub unsafe extern "C" fn ddog_sidecar_send_debugger_data(
transport: &mut Box<SidecarTransport>,
instance_id: &InstanceId,
queue_id: QueueId,
payloads: Vec<DebuggerPayload>,
) -> MaybeError {
if payloads.is_empty() {
Expand All @@ -631,6 +632,7 @@ pub unsafe extern "C" fn ddog_sidecar_send_debugger_data(
try_c!(blocking::send_debugger_data_shm_vec(
transport,
instance_id,
queue_id,
payloads,
));

Expand All @@ -643,9 +645,10 @@ pub unsafe extern "C" fn ddog_sidecar_send_debugger_data(
pub unsafe extern "C" fn ddog_sidecar_send_debugger_datum(
transport: &mut Box<SidecarTransport>,
instance_id: &InstanceId,
queue_id: QueueId,
payload: Box<DebuggerPayload>,
) -> MaybeError {
ddog_sidecar_send_debugger_data(transport, instance_id, vec![*payload])
ddog_sidecar_send_debugger_data(transport, instance_id, queue_id, vec![*payload])
}

#[no_mangle]
Expand All @@ -657,6 +660,7 @@ pub unsafe extern "C" fn ddog_sidecar_set_remote_config_data(
service_name: ffi::CharSlice,
env_name: ffi::CharSlice,
app_version: ffi::CharSlice,
global_tags: ddcommon_ffi::Vec<Tag>,
) -> MaybeError {
try_c!(blocking::set_remote_config_data(
transport,
Expand All @@ -665,6 +669,7 @@ pub unsafe extern "C" fn ddog_sidecar_set_remote_config_data(
service_name.to_utf8_lossy().into(),
env_name.to_utf8_lossy().into(),
app_version.to_utf8_lossy().into(),
global_tags.into(),
));

MaybeError::None
Expand Down
9 changes: 9 additions & 0 deletions sidecar/src/service/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
time::{Duration, Instant},
};
use tracing::info;
use ddcommon::tag::Tag;

/// `SidecarTransport` is a wrapper around a BlockingTransport struct from the `datadog_ipc` crate
/// that handles transparent reconnection.
Expand Down Expand Up @@ -278,6 +279,7 @@ pub fn send_trace_v04_shm(
///
/// * `transport` - The transport used for communication.
/// * `instance_id` - The ID of the instance.
/// * `queue_id` - The unique identifier for the trace context.
/// * `handle` - The handle to the shared memory.
///
/// # Returns
Expand All @@ -286,10 +288,12 @@ pub fn send_trace_v04_shm(
pub fn send_debugger_data_shm(
transport: &mut SidecarTransport,
instance_id: &InstanceId,
queue_id: QueueId,
handle: ShmHandle,
) -> io::Result<()> {
transport.send(SidecarInterfaceRequest::SendDebuggerDataShm {
instance_id: instance_id.clone(),
queue_id,
handle,
})
}
Expand All @@ -300,6 +304,7 @@ pub fn send_debugger_data_shm(
///
/// * `transport` - The transport used for communication.
/// * `instance_id` - The ID of the instance.
/// * `queue_id` - The unique identifier for the trace context.
/// * `payloads` - The payloads to be sent
///
/// # Returns
Expand All @@ -308,6 +313,7 @@ pub fn send_debugger_data_shm(
pub fn send_debugger_data_shm_vec(
transport: &mut SidecarTransport,
instance_id: &InstanceId,
queue_id: QueueId,
payloads: Vec<datadog_live_debugger::debugger_defs::DebuggerPayload>,
) -> anyhow::Result<()> {
struct SizeCount(usize);
Expand All @@ -331,6 +337,7 @@ pub fn send_debugger_data_shm_vec(
Ok(send_debugger_data_shm(
transport,
instance_id,
queue_id,
mapped.into(),
)?)
}
Expand Down Expand Up @@ -358,13 +365,15 @@ pub fn set_remote_config_data(
service_name: String,
env_name: String,
app_version: String,
global_tags: Vec<Tag>,
) -> io::Result<()> {
transport.send(SidecarInterfaceRequest::SetRemoteConfigData {
instance_id: instance_id.clone(),
queue_id: *queue_id,
service_name,
env_name,
app_version,
global_tags,
})
}

Expand Down
Loading

0 comments on commit bc824b8

Please sign in to comment.