Skip to content

Commit

Permalink
implement Display for Partition, Segment and Stream
Browse files Browse the repository at this point in the history
Signed-off-by: Berend Sliedrecht <[email protected]>
  • Loading branch information
berendsliedrecht committed Jan 7, 2024
1 parent bea57fa commit cb4de4f
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 37 deletions.
6 changes: 3 additions & 3 deletions server/src/streaming/partitions/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ impl Partition {

let mut maybe_start_offset = None;
for segment in self.segments.iter() {
if segment.time_indexes.is_none() {
if segment.time_indices.is_none() {
continue;
}

let time_indexes = segment.time_indexes.as_ref().unwrap();
if time_indexes.is_empty() {
let time_indices = segment.time_indices.as_ref().unwrap();
if time_indices.is_empty() {
continue;
}

Expand Down
43 changes: 43 additions & 0 deletions server/src/streaming/partitions/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use iggy::consumer::ConsumerKind;
use iggy::models::messages::Message;
use iggy::utils::timestamp::TimeStamp;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use tokio::sync::RwLock;

Expand All @@ -32,6 +33,48 @@ pub struct Partition {
pub(crate) storage: Arc<SystemStorage>,
}

impl fmt::Display for Partition {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "stream ID: {}", self.stream_id)?;
write!(f, "topic ID: {}", self.topic_id)?;
write!(f, "partition ID: {}", self.partition_id)?;
write!(f, "path: {}", self.path)?;
write!(f, "current offset: {}", self.current_offset)?;
if let Some(cache) = &self.cache {
write!(f, "cache: {:?}", cache)?;
};
if let Some(cmt) = &self.cached_memory_tracker {
write!(f, "cached memory tracker: {:?}", cmt)?;
};
if let Some(mdd) = &self.message_deduplicator {
write!(f, "message deduplicator: {:?}", mdd)?;
};
write!(f, "unsaved message count: {}", self.unsaved_messages_count)?;
write!(
f,
"should increment offset: {}",
self.should_increment_offset
)?;
write!(f, "created at: {}", self.created_at)?;
if let Some(me) = &self.message_expiry {
write!(f, "message expiry: {}", me)?;
};
write!(
f,
"consumer offset count: {}",
self.consumer_offsets.blocking_read().len()
)?;
write!(
f,
"consumer group offset count: {}",
self.consumer_group_offsets.blocking_read().len()
)?;
write!(f, "segment count: {}", self.segments.len())?;
write!(f, "config: {}", self.config)?;
write!(f, "storage: {:?}", self.storage)
}
}

#[derive(Debug, PartialEq)]
pub struct ConsumerOffset {
pub kind: ConsumerKind,
Expand Down
20 changes: 10 additions & 10 deletions server/src/streaming/segments/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Segment {
return Ok(EMPTY_MESSAGES);
}

if let Some(indexes) = &self.indexes {
if let Some(indexes) = &self.indices {
let relative_start_offset = start_offset - self.start_offset;
let relative_end_offset = end_offset - self.start_offset;
let start_index = indexes.get(relative_start_offset as usize);
Expand Down Expand Up @@ -187,26 +187,26 @@ impl Segment {
let unsaved_messages = self.unsaved_messages.get_or_insert_with(Vec::new);
unsaved_messages.reserve(len);

if let Some(indexes) = &mut self.indexes {
if let Some(indexes) = &mut self.indices {
indexes.reserve(len);
}

if let Some(time_indexes) = &mut self.time_indexes {
if let Some(time_indexes) = &mut self.time_indices {
time_indexes.reserve(len);
}

// Not the prettiest code. It's done this way to avoid repeatably
// checking if indexes and time_indexes are Some or None.
if self.indexes.is_some() && self.time_indexes.is_some() {
if self.indices.is_some() && self.time_indices.is_some() {
for message in messages {
let relative_offset = (message.offset - self.start_offset) as u32;

self.indexes.as_mut().unwrap().push(Index {
self.indices.as_mut().unwrap().push(Index {
relative_offset,
position: self.current_size_bytes,
});

self.time_indexes.as_mut().unwrap().push(TimeIndex {
self.time_indices.as_mut().unwrap().push(TimeIndex {
relative_offset,
timestamp: message.timestamp,
});
Expand All @@ -215,11 +215,11 @@ impl Segment {
self.current_offset = message.offset;
unsaved_messages.push(message.clone());
}
} else if self.indexes.is_some() {
} else if self.indices.is_some() {
for message in messages {
let relative_offset = (message.offset - self.start_offset) as u32;

self.indexes.as_mut().unwrap().push(Index {
self.indices.as_mut().unwrap().push(Index {
relative_offset,
position: self.current_size_bytes,
});
Expand All @@ -228,11 +228,11 @@ impl Segment {
self.current_offset = message.offset;
unsaved_messages.push(message.clone());
}
} else if self.time_indexes.is_some() {
} else if self.time_indices.is_some() {
for message in messages {
let relative_offset = (message.offset - self.start_offset) as u32;

self.time_indexes.as_mut().unwrap().push(TimeIndex {
self.time_indices.as_mut().unwrap().push(TimeIndex {
relative_offset,
timestamp: message.timestamp,
});
Expand Down
47 changes: 39 additions & 8 deletions server/src/streaming/segments/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::streaming::segments::time_index::TimeIndex;
use crate::streaming::storage::SystemStorage;
use iggy::models::messages::Message;
use iggy::utils::timestamp::TimeStamp;
use std::fmt;
use std::sync::Arc;

pub const LOG_EXTENSION: &str = "log";
Expand All @@ -27,11 +28,41 @@ pub struct Segment {
pub(crate) message_expiry: Option<u32>,
pub(crate) unsaved_messages: Option<Vec<Arc<Message>>>,
pub(crate) config: Arc<SystemConfig>,
pub(crate) indexes: Option<Vec<Index>>,
pub(crate) time_indexes: Option<Vec<TimeIndex>>,
pub(crate) indices: Option<Vec<Index>>,
pub(crate) time_indices: Option<Vec<TimeIndex>>,
pub(crate) storage: Arc<SystemStorage>,
}

impl fmt::Display for Segment {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "stream ID: {}", self.stream_id)?;
write!(f, "topic ID: {}", self.topic_id)?;
write!(f, "partition ID: {}", self.partition_id)?;
write!(f, "start offset: {}", self.start_offset)?;
write!(f, "end offset: {}", self.end_offset)?;
write!(f, "current offset: {}", self.current_offset)?;
write!(f, "index path: {}", self.index_path)?;
write!(f, "log path: {}", self.log_path)?;
write!(f, "time index path: {}", self.time_index_path)?;
write!(f, "current size (bytes): {}", self.current_size_bytes)?;
write!(f, "is closed: {}", self.is_closed)?;
if let Some(me) = &self.message_expiry {
write!(f, "message expiry: {}", me)?;
};
if let Some(usm) = &self.unsaved_messages {
write!(f, "unsaved messages count: {}", usm.len())?;
};
write!(f, "config: {}", self.config)?;
if let Some(indices) = &self.indices {
write!(f, "indices count: {}", indices.len())?;
};
if let Some(t_indices) = &self.time_indices {
write!(f, "time indices count: {}", t_indices.len())?;
};
write!(f, "storage: {:?}", self.storage)
}
}

impl Segment {
pub fn create(
stream_id: u32,
Expand All @@ -56,11 +87,11 @@ impl Segment {
time_index_path: Self::get_time_index_path(&path),
current_size_bytes: 0,
message_expiry,
indexes: match config.segment.cache_indexes {
indices: match config.segment.cache_indexes {
true => Some(Vec::new()),
false => None,
},
time_indexes: match config.segment.cache_time_indexes {
time_indices: match config.segment.cache_time_indexes {
true => Some(Vec::new()),
false => None,
},
Expand Down Expand Up @@ -154,8 +185,8 @@ mod tests {
assert_eq!(segment.time_index_path, time_index_path);
assert_eq!(segment.message_expiry, message_expiry);
assert!(segment.unsaved_messages.is_none());
assert!(segment.indexes.is_some());
assert!(segment.time_indexes.is_some());
assert!(segment.indices.is_some());
assert!(segment.time_indices.is_some());
assert!(!segment.is_closed);
assert!(!segment.is_full().await);
}
Expand Down Expand Up @@ -185,7 +216,7 @@ mod tests {
None,
);

assert!(segment.indexes.is_none());
assert!(segment.indices.is_none());
}

#[test]
Expand Down Expand Up @@ -213,6 +244,6 @@ mod tests {
None,
);

assert!(segment.time_indexes.is_none());
assert!(segment.time_indices.is_none());
}
}
8 changes: 4 additions & 4 deletions server/src/streaming/segments/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ impl Storage<Segment> for FileSegmentStorage {
);

if segment.config.segment.cache_indexes {
segment.indexes = Some(segment.storage.segment.load_all_indexes(segment).await?);
segment.indices = Some(segment.storage.segment.load_all_indexes(segment).await?);
info!(
"Loaded {} indexes for segment with start offset: {} and partition with ID: {} for topic with ID: {} and stream with ID: {}.",
segment.indexes.as_ref().unwrap().len(),
segment.indices.as_ref().unwrap().len(),
segment.start_offset,
segment.partition_id,
segment.topic_id,
Expand All @@ -72,12 +72,12 @@ impl Storage<Segment> for FileSegmentStorage {
if !time_indexes.is_empty() {
let last_index = time_indexes.last().unwrap();
segment.current_offset = segment.start_offset + last_index.relative_offset as u64;
segment.time_indexes = Some(time_indexes);
segment.time_indices = Some(time_indexes);
}

info!(
"Loaded {} time indexes for segment with start offset: {} and partition with ID: {} for topic with ID: {} and stream with ID: {}.",
segment.time_indexes.as_ref().unwrap().len(),
segment.time_indices.as_ref().unwrap().len(),
segment.start_offset,
segment.partition_id,
segment.topic_id,
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/streams/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ impl Storage<Stream> for FileStreamStorage {
continue;
}

if stream.topics_ids.contains_key(&topic.name) {
if stream.topic_ids.contains_key(&topic.name) {
error!(
"Topic with name: '{}' already exists for stream with ID: {}.",
&topic.name, &stream.stream_id
);
continue;
}

stream.topics_ids.insert(topic.name.clone(), topic.topic_id);
stream.topic_ids.insert(topic.name.clone(), topic.topic_id);
stream.topics.insert(topic.topic_id, topic);
}

Expand Down
19 changes: 17 additions & 2 deletions server/src/streaming/streams/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::streaming::storage::SystemStorage;
use crate::streaming::topics::topic::Topic;
use iggy::utils::timestamp::TimeStamp;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;

#[derive(Debug)]
Expand All @@ -13,11 +14,25 @@ pub struct Stream {
pub topics_path: String,
pub created_at: u64,
pub(crate) topics: HashMap<u32, Topic>,
pub(crate) topics_ids: HashMap<String, u32>,
pub(crate) topic_ids: HashMap<String, u32>,
pub(crate) config: Arc<SystemConfig>,
pub(crate) storage: Arc<SystemStorage>,
}

impl fmt::Display for Stream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "stream ID: {}", self.stream_id)?;
write!(f, "name: {}", self.name)?;
write!(f, "path: {}", self.path)?;
write!(f, "topics path: {}", self.topics_path)?;
write!(f, "created at: {}", self.created_at)?;
write!(f, "topics count: {}", self.topics.len())?;
write!(f, "topic ids count: {}", self.topic_ids.len())?;
write!(f, "config: {}", self.config)?;
write!(f, "storage: {:?}", self.storage)
}
}

impl Stream {
pub fn empty(id: u32, config: Arc<SystemConfig>, storage: Arc<SystemStorage>) -> Self {
Stream::create(id, "", config, storage)
Expand All @@ -39,7 +54,7 @@ impl Stream {
topics_path,
config,
topics: HashMap::new(),
topics_ids: HashMap::new(),
topic_ids: HashMap::new(),
storage,
created_at: TimeStamp::now().to_micros(),
}
Expand Down
16 changes: 8 additions & 8 deletions server/src/streaming/streams/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl Stream {
}

let name = text::to_lowercase_non_whitespace(name);
if self.topics_ids.contains_key(&name) {
if self.topic_ids.contains_key(&name) {
return Err(Error::TopicNameAlreadyExists(name, self.stream_id));
}

Expand All @@ -40,7 +40,7 @@ impl Stream {
"Created topic: {} with ID: {}, partitions: {}",
name, id, partitions_count
);
self.topics_ids.insert(name, id);
self.topic_ids.insert(name, id);
self.topics.insert(id, topic);

Ok(())
Expand All @@ -61,7 +61,7 @@ impl Stream {
let updated_name = text::to_lowercase_non_whitespace(name);

{
if let Some(topic_id_by_name) = self.topics_ids.get(&updated_name) {
if let Some(topic_id_by_name) = self.topic_ids.get(&updated_name) {
if *topic_id_by_name != topic_id {
return Err(Error::TopicNameAlreadyExists(
updated_name.to_string(),
Expand All @@ -77,8 +77,8 @@ impl Stream {
};

{
self.topics_ids.remove(&old_topic_name.clone());
self.topics_ids.insert(updated_name.clone(), topic_id);
self.topic_ids.remove(&old_topic_name.clone());
self.topic_ids.insert(updated_name.clone(), topic_id);
let topic = self.get_topic_mut(id)?;
topic.name = updated_name;
topic.message_expiry = message_expiry;
Expand Down Expand Up @@ -125,7 +125,7 @@ impl Stream {
}

fn get_topic_by_name(&self, name: &str) -> Result<&Topic, Error> {
let topic_id = self.topics_ids.get(name);
let topic_id = self.topic_ids.get(name);
if topic_id.is_none() {
return Err(Error::TopicNameNotFound(name.to_string(), self.stream_id));
}
Expand All @@ -143,7 +143,7 @@ impl Stream {
}

fn get_topic_by_name_mut(&mut self, name: &str) -> Result<&mut Topic, Error> {
let topic_id = self.topics_ids.get(name);
let topic_id = self.topic_ids.get(name);
if topic_id.is_none() {
return Err(Error::TopicNameNotFound(name.to_string(), self.stream_id));
}
Expand All @@ -164,7 +164,7 @@ impl Stream {
}

let topic = self.topics.remove(&topic_id).unwrap();
self.topics_ids.remove(&topic_name);
self.topic_ids.remove(&topic_name);

Ok(topic)
}
Expand Down

0 comments on commit cb4de4f

Please sign in to comment.