Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions runtimes/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ percent-encoding = "2.3.1"
aws-credential-types = "1.2.1"
regex = "1.11.1"
email_address = "0.2.9"
sync-cell = "0.2.0"

[build-dependencies]
prost-build = "0.12.3"
Expand Down
69 changes: 39 additions & 30 deletions runtimes/core/src/trace/log.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::convert::Infallible;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};

use crate::api::reqauth::platform;
Expand Down Expand Up @@ -58,10 +58,8 @@ pub(super) struct TraceEvent {

impl Reporter {
pub async fn start_reporting(self) {
let mut inner = Box::new(InnerTraceEventStream {
rx: self.rx,
anchor: self.anchor.clone(),
current: None,
let inner = Arc::new(InnerTraceEventStream {
rx: sync_cell::SyncCell::new(self.rx),
});
let trace_time_anchor = self.anchor.trace_header();

Expand Down Expand Up @@ -95,24 +93,26 @@ impl Reporter {

loop {
// Wait for at least one entry on rx before we open an HTTP request.
{
let Some(event) = inner.rx.recv().await else {
let event = {
let Some(event) = inner.lock().unwrap().rx.recv().await else {
// The stream is closed. This only happens if all senders have been dropped,
// which should never happen in regular use.
return;
};
inner.current = Some(StreamingTraceEvent {
StreamingTraceEvent {
event,
next: EventStreamState::Header,
});
}
};

// Construct the body stream.
let mut no_data_ticker = tokio::time::interval(std::time::Duration::from_millis(1000));
no_data_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let stream = TraceEventStream {
inner: inner.as_mut() as *mut InnerTraceEventStream,
anchor: self.anchor.clone(),
current: Some(event),
inner: inner.clone(),
num_events_this_tick: 1,
no_data_ticker,
};
Expand Down Expand Up @@ -158,53 +158,58 @@ impl Reporter {
}

struct TraceEventStream {
inner: *mut InnerTraceEventStream,
inner: InnerTraceEventStream,
anchor: TimeAnchor,

// The number of events received since the last tick.
num_events_this_tick: usize,

/// Ticks to detect when there is no data to close the stream.
no_data_ticker: tokio::time::Interval,

/// Current item received from rx and being streamed.
current: Option<StreamingTraceEvent>,
}

// Safety: the TraceEventStream only contains `poll_next` which requires a mutable reference
// to self. Therefore it is never called concurrently. The lifetime of inner is guaranteed
// to exceed the lifetime of the stream.
unsafe impl Send for TraceEventStream {}
unsafe impl Sync for TraceEventStream {}
// unsafe impl Send for TraceEventStream {}
// unsafe impl Sync for TraceEventStream {}

struct InnerTraceEventStream {
rx: tokio::sync::mpsc::UnboundedReceiver<TraceEvent>,
anchor: TimeAnchor,
rx: sync_cell::SyncCell<Option<tokio::sync::mpsc::UnboundedReceiver<TraceEvent>>>,
}

/// Current item received from rx and being streamed.
current: Option<StreamingTraceEvent>,
impl InnerTraceEventStream {
fn poll_recv(&self, cx: &mut Context) -> Poll<Option<TraceEvent>> {
let mut rx = self.rx.replace(None).unwrap();
let poll = rx.poll_recv(cx);
self.rx.replace(Some(rx));
poll
}
}

impl futures_core::stream::Stream for TraceEventStream {
type Item = Result<Bytes, Infallible>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// Safety: the inner pointer is boxed and never moved, and is kept alive
// by the start_reporting method for the lifetime of the stream.
let inner = unsafe { &mut *self.inner };

{
// If we have a current item, return it.
if inner.current.is_some() {
let next = inner.current.as_ref().unwrap().next;
if self.current.is_some() {
let next = self.current.as_ref().unwrap().next;
return match next {
EventStreamState::Header => {
inner.current.as_mut().unwrap().next = EventStreamState::Data;
Poll::Ready(Some(Ok(inner
self.current.as_mut().unwrap().next = EventStreamState::Data;
Poll::Ready(Some(Ok(self
.current
.as_ref()
.unwrap()
.header(&inner.anchor))))
.header(&self.anchor))))
}
EventStreamState::Data => {
let data = inner.current.as_ref().unwrap().event.data.clone(); // cheap clone
inner.current = None;
let data = self.current.as_ref().unwrap().event.data.clone(); // cheap clone
self.current = None;
Poll::Ready(Some(Ok(data)))
}
};
Expand All @@ -228,10 +233,14 @@ impl futures_core::stream::Stream for TraceEventStream {

// If we have no current item, poll the receiver for a new trace event.
{
match inner.rx.poll_recv(cx) {
let mut rx = self.inner.poll_recv(cx);
let poll = rx.poll_recv(cx);
self.inner.rx.replace(Some(rx));

match poll {
Poll::Ready(Some(event)) => {
self.num_events_this_tick += 1;
inner.current = Some(StreamingTraceEvent {
self.current = Some(StreamingTraceEvent {
event,
next: EventStreamState::Header,
});
Expand Down
Loading