Skip to content

Commit

Permalink
Fix missing read-resize of rate_limiter
Browse files Browse the repository at this point in the history
Signed-off-by: Bob Weinand <[email protected]>
  • Loading branch information
bwoebi committed Oct 1, 2024
1 parent a70f928 commit 9667e4a
Showing 1 changed file with 30 additions and 2 deletions.
32 changes: 30 additions & 2 deletions ipc/src/rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::platform::{FileBackedHandle, MappedMem, NamedShmHandle};
use crate::platform::{FileBackedHandle, MappedMem, MemoryHandle, NamedShmHandle};
use ddcommon::rate_limiter::{Limiter, LocalLimiter};
use std::cell::UnsafeCell;
use std::ffi::CString;
use std::ffi::{c_char, CStr, CString};
use std::fmt::{Debug, Formatter};
use std::io;
use std::marker::PhantomData;
Expand All @@ -23,13 +23,15 @@ struct ShmLimiterData<'a, Inner> {

pub struct ShmLimiterMemory<Inner = ()> {
mem: Arc<RwLock<MappedMem<NamedShmHandle>>>,
last_size: AtomicU32,
_phantom: PhantomData<Inner>,
}

impl<Inner> Clone for ShmLimiterMemory<Inner> {
fn clone(&self) -> Self {
ShmLimiterMemory {
mem: self.mem.clone(),
last_size: AtomicU32::new(self.last_size.load(Ordering::Relaxed)),
_phantom: Default::default(),
}
}
Expand All @@ -55,6 +57,7 @@ impl<Inner> ShmLimiterMemory<Inner> {

fn new(handle: MappedMem<NamedShmHandle>) -> Self {
Self {
last_size: AtomicU32::new(handle.get_size() as u32),
mem: Arc::new(RwLock::new(handle)),
_phantom: Default::default(),
}
Expand Down Expand Up @@ -115,12 +118,36 @@ impl<Inner> ShmLimiterMemory<Inner> {
};
reference
}

fn ensure_index(&self, idx: u32) -> Option<()> {
let end = idx + std::mem::size_of::<ShmLimiterData<Inner>>() as u32;
if end <= self.last_size.load(Ordering::Relaxed) {
Some(())
} else {
let mut mem = self.mem.write().unwrap();
let cur_size = mem.mem.get_size() as u32;
if cur_size >= end {
self.last_size.store(cur_size, Ordering::Relaxed);
Some(())
} else {
mem.ensure_space(end as usize);
let cur_size = mem.mem.get_size() as u32;
self.last_size.store(cur_size, Ordering::Relaxed);
if cur_size >= end {
Some(())
} else {
None
}
}
}
}

pub fn get(&self, idx: u32) -> Option<ShmLimiter<Inner>> {
assert_eq!(
idx % std::mem::size_of::<ShmLimiterData<Inner>>() as u32,
Self::START_OFFSET
);
self.ensure_index(idx);
let reference = ShmLimiter {
idx,
memory: self.clone(),
Expand Down Expand Up @@ -148,6 +175,7 @@ impl<Inner> ShmLimiterMemory<Inner> {
let mut cur = Self::START_OFFSET;
let mem = self.mem.read().unwrap();
loop {
self.ensure_index(cur);
let data: &ShmLimiterData<Inner> =
unsafe { &*mem.as_slice().as_ptr().add(cur as usize).cast() };
if data.next_free.load(Ordering::Relaxed) == 0 {
Expand Down

0 comments on commit 9667e4a

Please sign in to comment.