Skip to content

Commit

Permalink
[CLN]: remove offset ID atomic from record segment reader
Browse files Browse the repository at this point in the history
  • Loading branch information
codetheweb committed Jan 3, 2025
1 parent be300a3 commit 4635765
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 17 deletions.
5 changes: 2 additions & 3 deletions rust/worker/src/execution/operators/limit.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{cmp::Ordering, num::TryFromIntError, sync::atomic};
use std::{cmp::Ordering, num::TryFromIntError};

use async_trait::async_trait;
use chroma_blockstore::provider::BlockfileProvider;
Expand Down Expand Up @@ -120,8 +120,7 @@ impl<'me> SeekScanner<'me> {

let mut size = self
.record_segment
.get_current_max_offset_id()
.load(atomic::Ordering::Relaxed)
.get_max_offset_id()
.max(self.log_offset_ids.max().unwrap_or(0));
if size == 0 {
return Ok(0);
Expand Down
3 changes: 2 additions & 1 deletion rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ impl CompactOrchestrator {
match RecordSegmentReader::from_segment(&record_segment, &self.blockfile_provider).await
{
Ok(reader) => {
let current_max_offset_id = reader.get_current_max_offset_id();
let current_max_offset_id =
Arc::new(AtomicU32::new(reader.get_max_offset_id()));
current_max_offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(current_max_offset_id)
}
Expand Down
14 changes: 7 additions & 7 deletions rust/worker/src/segment/record_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ pub struct RecordSegmentReader<'me> {
user_id_to_id: BlockfileReader<'me, &'me str, u32>,
id_to_user_id: BlockfileReader<'me, u32, &'me str>,
id_to_data: BlockfileReader<'me, u32, DataRecord<'me>>,
curr_max_offset_id: Arc<AtomicU32>,
max_offset_id: u32,
}

impl Debug for RecordSegmentReader<'_> {
Expand Down Expand Up @@ -708,10 +708,10 @@ impl RecordSegmentReader<'_> {
};
let exising_max_offset_id = match max_offset_id_bf_reader {
Some(reader) => match reader.get("", MAX_OFFSET_ID).await {
Ok(Some(max_offset_id)) => Arc::new(AtomicU32::new(max_offset_id)),
Ok(None) | Err(_) => Arc::new(AtomicU32::new(0)),
Ok(Some(max_offset_id)) => max_offset_id,
Ok(None) | Err(_) => 0,
},
None => Arc::new(AtomicU32::new(0)),
None => 0,
};

let user_id_to_id = match blockfile_provider
Expand Down Expand Up @@ -773,12 +773,12 @@ impl RecordSegmentReader<'_> {
user_id_to_id,
id_to_user_id,
id_to_data,
curr_max_offset_id: existing_max_offset_id,
max_offset_id: existing_max_offset_id,
})
}

pub(crate) fn get_current_max_offset_id(&self) -> Arc<AtomicU32> {
self.curr_max_offset_id.clone()
pub(crate) fn get_max_offset_id(&self) -> u32 {
self.max_offset_id
}

pub(crate) async fn get_offset_id_for_user_id(
Expand Down
10 changes: 4 additions & 6 deletions rust/worker/src/segment/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,14 +501,12 @@ impl<'log_data> Iterator for MaterializeLogsResultIter<'log_data> {
}
}

/// Materializes a chunk of log records.
/// - `record_segment_reader` can be `None` if the record segment is uninitialized.
/// - `next_offset_id` must be provided if the log was partitioned and `materialize_logs()` is called for each partition: if it is not provided, generated offset IDs will conflict between partitions. When it is not provided, it is initialized from the max offset ID in the record segment.
pub async fn materialize_logs(
// Is None when record segment is uninitialized.
record_segment_reader: &Option<RecordSegmentReader<'_>>,
logs: Chunk<LogRecord>,
// Is None for readers. In that case, the materializer reads
// the current maximum from the record segment and uses that
// for materializing. Writers pass this value to the materializer
// because they need to share this across all log partitions.
next_offset_id: Option<Arc<AtomicU32>>,
) -> Result<MaterializeLogsResult, LogMaterializerError> {
// Trace the total_len since len() iterates over the entire chunk
Expand All @@ -520,7 +518,7 @@ pub async fn materialize_logs(
None => {
match record_segment_reader.as_ref() {
Some(reader) => {
let offset_id = reader.get_current_max_offset_id();
let offset_id = Arc::new(AtomicU32::new(reader.get_max_offset_id()));
offset_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
offset_id
}
Expand Down

0 comments on commit 4635765

Please sign in to comment.