-
Notifications
You must be signed in to change notification settings - Fork 196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for TimeStreamWrite and TimeStreamQuery #2707
Changes from all commits
233d9d6
fdb0076
347be17
77018c5
be66955
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,287 @@ | ||||||||||
/* | ||||||||||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||||||||||
* SPDX-License-Identifier: Apache-2.0 | ||||||||||
*/ | ||||||||||
|
||||||||||
//! Maintain a cache of discovered endpoints | ||||||||||
|
||||||||||
use aws_smithy_async::rt::sleep::AsyncSleep; | ||||||||||
use aws_smithy_async::time::TimeSource; | ||||||||||
use aws_smithy_client::erase::boxclone::BoxFuture; | ||||||||||
use aws_smithy_http::endpoint::{ResolveEndpoint, ResolveEndpointError}; | ||||||||||
use aws_smithy_types::endpoint::Endpoint; | ||||||||||
use std::fmt::{Debug, Formatter}; | ||||||||||
use std::future::Future; | ||||||||||
use std::sync::{Arc, Mutex}; | ||||||||||
use std::time::{Duration, SystemTime}; | ||||||||||
use tokio::sync::oneshot::error::TryRecvError; | ||||||||||
use tokio::sync::oneshot::{Receiver, Sender}; | ||||||||||
|
||||||||||
/// Endpoint reloader | ||||||||||
#[must_use] | ||||||||||
pub struct ReloadEndpoint { | ||||||||||
loader: Box<dyn Fn() -> BoxFuture<(Endpoint, SystemTime), ResolveEndpointError> + Send + Sync>, | ||||||||||
endpoint: Arc<Mutex<Option<ExpiringEndpoint>>>, | ||||||||||
error: Arc<Mutex<Option<ResolveEndpointError>>>, | ||||||||||
rx: Receiver<()>, | ||||||||||
sleep: Arc<dyn AsyncSleep>, | ||||||||||
time: Arc<dyn TimeSource>, | ||||||||||
} | ||||||||||
|
||||||||||
impl Debug for ReloadEndpoint { | ||||||||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { | ||||||||||
f.debug_struct("ReloadEndpoint").finish() | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
impl ReloadEndpoint { | ||||||||||
/// Reload the endpoint once | ||||||||||
pub async fn reload_once(&self) { | ||||||||||
match (self.loader)().await { | ||||||||||
Ok((endpoint, expiry)) => { | ||||||||||
*self.endpoint.lock().unwrap() = Some(ExpiringEndpoint { endpoint, expiry }) | ||||||||||
} | ||||||||||
Err(err) => *self.error.lock().unwrap() = Some(err), | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
/// An infinite loop task that will reload the endpoint | ||||||||||
/// | ||||||||||
/// This task will terminate when the corresponding [`Client`](crate::Client) is dropped. | ||||||||||
pub async fn reload_task(mut self) { | ||||||||||
loop { | ||||||||||
match self.rx.try_recv() { | ||||||||||
Ok(_) | Err(TryRecvError::Closed) => break, | ||||||||||
_ => {} | ||||||||||
} | ||||||||||
self.reload_increment(self.time.now()).await; | ||||||||||
self.sleep.sleep(Duration::from_secs(60)).await; | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
async fn reload_increment(&self, now: SystemTime) { | ||||||||||
let should_reload = self | ||||||||||
.endpoint | ||||||||||
.lock() | ||||||||||
.unwrap() | ||||||||||
.as_ref() | ||||||||||
.map(|e| e.is_expired(now)) | ||||||||||
.unwrap_or(true); | ||||||||||
if should_reload { | ||||||||||
tracing::debug!("reloading endpoint, previous endpoint was expired"); | ||||||||||
self.reload_once().await; | ||||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
#[derive(Debug, Clone)] | ||||||||||
pub(crate) struct EndpointCache { | ||||||||||
error: Arc<Mutex<Option<ResolveEndpointError>>>, | ||||||||||
endpoint: Arc<Mutex<Option<ExpiringEndpoint>>>, | ||||||||||
// When the sender is dropped, this allows the reload loop to stop | ||||||||||
_drop_guard: Arc<Sender<()>>, | ||||||||||
} | ||||||||||
|
||||||||||
impl<T> ResolveEndpoint<T> for EndpointCache { | ||||||||||
fn resolve_endpoint(&self, _params: &T) -> aws_smithy_http::endpoint::Result { | ||||||||||
self.resolve_endpoint() | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
#[derive(Debug)] | ||||||||||
struct ExpiringEndpoint { | ||||||||||
endpoint: Endpoint, | ||||||||||
expiry: SystemTime, | ||||||||||
} | ||||||||||
|
||||||||||
impl ExpiringEndpoint { | ||||||||||
fn is_expired(&self, now: SystemTime) -> bool { | ||||||||||
tracing::debug!(expiry = ?self.expiry, now = ?now, delta = ?self.expiry.duration_since(now), "checking expiry status of endpoint"); | ||||||||||
match self.expiry.duration_since(now) { | ||||||||||
Err(_) => true, | ||||||||||
Ok(t) => t < Duration::from_secs(120), | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm guessing this value is meant to always be 2x the 1min duration, perhaps linking them together with |
||||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
pub(crate) async fn create_cache<F>( | ||||||||||
loader_fn: impl Fn() -> F + Send + Sync + 'static, | ||||||||||
sleep: Arc<dyn AsyncSleep>, | ||||||||||
time: Arc<dyn TimeSource>, | ||||||||||
) -> Result<(EndpointCache, ReloadEndpoint), ResolveEndpointError> | ||||||||||
where | ||||||||||
F: Future<Output = Result<(Endpoint, SystemTime), ResolveEndpointError>> + Send + 'static, | ||||||||||
{ | ||||||||||
let error_holder = Arc::new(Mutex::new(None)); | ||||||||||
let endpoint_holder = Arc::new(Mutex::new(None)); | ||||||||||
let (tx, rx) = tokio::sync::oneshot::channel(); | ||||||||||
let cache = EndpointCache { | ||||||||||
error: error_holder.clone(), | ||||||||||
endpoint: endpoint_holder.clone(), | ||||||||||
_drop_guard: Arc::new(tx), | ||||||||||
}; | ||||||||||
let reloader = ReloadEndpoint { | ||||||||||
loader: Box::new(move || Box::pin((loader_fn)()) as _), | ||||||||||
endpoint: endpoint_holder, | ||||||||||
error: error_holder, | ||||||||||
rx, | ||||||||||
sleep, | ||||||||||
time, | ||||||||||
}; | ||||||||||
reloader.reload_once().await; | ||||||||||
// if we didn't successfully get an endpoint, bail out so the client knows | ||||||||||
// configuration failed to work | ||||||||||
Comment on lines
+132
to
+133
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
cache.resolve_endpoint()?; | ||||||||||
Ok((cache, reloader)) | ||||||||||
} | ||||||||||
|
||||||||||
impl EndpointCache { | ||||||||||
fn resolve_endpoint(&self) -> aws_smithy_http::endpoint::Result { | ||||||||||
self.endpoint | ||||||||||
.lock() | ||||||||||
.unwrap() | ||||||||||
.as_ref() | ||||||||||
.map(|e| e.endpoint.clone()) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need a
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is plucking out the endpoint in the map (not just cloning) |
||||||||||
.ok_or_else(|| { | ||||||||||
self.error | ||||||||||
.lock() | ||||||||||
.unwrap() | ||||||||||
.take() | ||||||||||
.unwrap_or_else(|| ResolveEndpointError::message("no endpoint loaded")) | ||||||||||
}) | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
#[cfg(test)] | ||||||||||
mod test { | ||||||||||
use crate::endpoint_discovery::create_cache; | ||||||||||
use aws_smithy_async::rt::sleep::TokioSleep; | ||||||||||
use aws_smithy_async::test_util::controlled_time_and_sleep; | ||||||||||
use aws_smithy_async::time::SystemTimeSource; | ||||||||||
use aws_smithy_types::endpoint::Endpoint; | ||||||||||
use std::sync::atomic::{AtomicUsize, Ordering}; | ||||||||||
use std::sync::Arc; | ||||||||||
use std::time::{Duration, SystemTime, UNIX_EPOCH}; | ||||||||||
use tokio::time::timeout; | ||||||||||
|
||||||||||
fn check_send_v<T: Send>(t: T) -> T { | ||||||||||
t | ||||||||||
} | ||||||||||
|
||||||||||
#[tokio::test] | ||||||||||
#[allow(unused_must_use)] | ||||||||||
async fn check_traits() { | ||||||||||
let (cache, reloader) = create_cache( | ||||||||||
|| async { | ||||||||||
Ok(( | ||||||||||
Endpoint::builder().url("http://foo.com").build(), | ||||||||||
SystemTime::now(), | ||||||||||
)) | ||||||||||
}, | ||||||||||
Arc::new(TokioSleep::new()), | ||||||||||
Arc::new(SystemTimeSource::new()), | ||||||||||
) | ||||||||||
.await | ||||||||||
.unwrap(); | ||||||||||
check_send_v(reloader.reload_task()); | ||||||||||
check_send_v(cache); | ||||||||||
} | ||||||||||
|
||||||||||
#[tokio::test] | ||||||||||
async fn erroring_endpoint_always_reloaded() { | ||||||||||
let expiry = UNIX_EPOCH + Duration::from_secs(123456789); | ||||||||||
let ct = Arc::new(AtomicUsize::new(0)); | ||||||||||
let (cache, reloader) = create_cache( | ||||||||||
move || { | ||||||||||
let shared_ct = ct.clone(); | ||||||||||
shared_ct.fetch_add(1, Ordering::AcqRel); | ||||||||||
async move { | ||||||||||
Ok(( | ||||||||||
Endpoint::builder() | ||||||||||
.url(format!("http://foo.com/{shared_ct:?}")) | ||||||||||
.build(), | ||||||||||
expiry, | ||||||||||
)) | ||||||||||
} | ||||||||||
}, | ||||||||||
Arc::new(TokioSleep::new()), | ||||||||||
Arc::new(SystemTimeSource::new()), | ||||||||||
) | ||||||||||
.await | ||||||||||
.expect("returns an endpoint"); | ||||||||||
assert_eq!( | ||||||||||
cache.resolve_endpoint().expect("ok").url(), | ||||||||||
"http://foo.com/1" | ||||||||||
); | ||||||||||
// 120 second buffer | ||||||||||
reloader | ||||||||||
.reload_increment(expiry - Duration::from_secs(240)) | ||||||||||
.await; | ||||||||||
assert_eq!( | ||||||||||
cache.resolve_endpoint().expect("ok").url(), | ||||||||||
"http://foo.com/1" | ||||||||||
); | ||||||||||
|
||||||||||
reloader.reload_increment(expiry).await; | ||||||||||
assert_eq!( | ||||||||||
cache.resolve_endpoint().expect("ok").url(), | ||||||||||
"http://foo.com/2" | ||||||||||
); | ||||||||||
} | ||||||||||
|
||||||||||
#[tokio::test] | ||||||||||
async fn test_advance_of_task() { | ||||||||||
let expiry = UNIX_EPOCH + Duration::from_secs(123456789); | ||||||||||
// expires in 8 minutes | ||||||||||
let (time, sleep, mut gate) = controlled_time_and_sleep(expiry - Duration::from_secs(239)); | ||||||||||
let ct = Arc::new(AtomicUsize::new(0)); | ||||||||||
let (cache, reloader) = create_cache( | ||||||||||
move || { | ||||||||||
let shared_ct = ct.clone(); | ||||||||||
shared_ct.fetch_add(1, Ordering::AcqRel); | ||||||||||
async move { | ||||||||||
Ok(( | ||||||||||
Endpoint::builder() | ||||||||||
.url(format!("http://foo.com/{shared_ct:?}")) | ||||||||||
.build(), | ||||||||||
expiry, | ||||||||||
)) | ||||||||||
} | ||||||||||
}, | ||||||||||
Arc::new(sleep.clone()), | ||||||||||
Arc::new(time.clone()), | ||||||||||
) | ||||||||||
.await | ||||||||||
.expect("first load success"); | ||||||||||
let reload_task = tokio::spawn(reloader.reload_task()); | ||||||||||
assert!(!reload_task.is_finished()); | ||||||||||
// expiry occurs after 2 sleeps | ||||||||||
// t = 0 | ||||||||||
assert_eq!( | ||||||||||
gate.expect_sleep().await.duration(), | ||||||||||
Duration::from_secs(60) | ||||||||||
); | ||||||||||
assert_eq!(cache.resolve_endpoint().unwrap().url(), "http://foo.com/1"); | ||||||||||
// t = 60 | ||||||||||
|
||||||||||
let sleep = gate.expect_sleep().await; | ||||||||||
// we're still holding the drop guard, so we haven't expired yet. | ||||||||||
assert_eq!(cache.resolve_endpoint().unwrap().url(), "http://foo.com/1"); | ||||||||||
assert_eq!(sleep.duration(), Duration::from_secs(60)); | ||||||||||
sleep.allow_progress(); | ||||||||||
// t = 120 | ||||||||||
|
||||||||||
let sleep = gate.expect_sleep().await; | ||||||||||
assert_eq!(cache.resolve_endpoint().unwrap().url(), "http://foo.com/2"); | ||||||||||
sleep.allow_progress(); | ||||||||||
|
||||||||||
let sleep = gate.expect_sleep().await; | ||||||||||
drop(cache); | ||||||||||
sleep.allow_progress(); | ||||||||||
|
||||||||||
timeout(Duration::from_secs(1), reload_task) | ||||||||||
.await | ||||||||||
.expect("task finishes successfully") | ||||||||||
.expect("finishes"); | ||||||||||
} | ||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How was the 1min increment decided? A comment would be appreciated.