diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index be6fd9758..e8c1fa6e5 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -41,12 +41,12 @@ impl Partition { let mut maybe_start_offset = None; for segment in self.segments.iter() { - if segment.time_indexes.is_none() { + if segment.time_indices.is_none() { continue; } - let time_indexes = segment.time_indexes.as_ref().unwrap(); - if time_indexes.is_empty() { + let time_indices = segment.time_indices.as_ref().unwrap(); + if time_indices.is_empty() { continue; } diff --git a/server/src/streaming/partitions/partition.rs b/server/src/streaming/partitions/partition.rs index 12a3a27b1..769b10669 100644 --- a/server/src/streaming/partitions/partition.rs +++ b/server/src/streaming/partitions/partition.rs @@ -8,6 +8,7 @@ use iggy::consumer::ConsumerKind; use iggy::models::messages::Message; use iggy::utils::timestamp::TimeStamp; use std::collections::HashMap; +use std::fmt; use std::sync::Arc; use tokio::sync::RwLock; @@ -32,6 +33,48 @@ pub struct Partition { pub(crate) storage: Arc, } +impl fmt::Display for Partition { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "stream ID: {}", self.stream_id)?; + write!(f, "topic ID: {}", self.topic_id)?; + write!(f, "partition ID: {}", self.partition_id)?; + write!(f, "path: {}", self.path)?; + write!(f, "current offset: {}", self.current_offset)?; + if let Some(cache) = &self.cache { + write!(f, "cache: {:?}", cache)?; + }; + if let Some(cmt) = &self.cached_memory_tracker { + write!(f, "cached memory tracker: {:?}", cmt)?; + }; + if let Some(mdd) = &self.message_deduplicator { + write!(f, "message deduplicator: {:?}", mdd)?; + }; + write!(f, "unsaved message count: {}", self.unsaved_messages_count)?; + write!( + f, + "should increment offset: {}", + self.should_increment_offset + )?; + write!(f, "created at: {}", self.created_at)?; + if let Some(me) = &self.message_expiry { + write!(f, "message expiry: {}", me)?; + }; + write!( + f, + "consumer offset count: {}", + self.consumer_offsets.blocking_read().len() + )?; + write!( + f, + "consumer group offset count: {}", + self.consumer_group_offsets.blocking_read().len() + )?; + write!(f, "segment count: {}", self.segments.len())?; + write!(f, "config: {}", self.config)?; + write!(f, "storage: {:?}", self.storage) + } +} + #[derive(Debug, PartialEq)] pub struct ConsumerOffset { pub kind: ConsumerKind, diff --git a/server/src/streaming/segments/messages.rs b/server/src/streaming/segments/messages.rs index 48faf6ff6..a614d1e08 100644 --- a/server/src/streaming/segments/messages.rs +++ b/server/src/streaming/segments/messages.rs @@ -112,7 +112,7 @@ impl Segment { return Ok(EMPTY_MESSAGES); } - if let Some(indexes) = &self.indexes { + if let Some(indexes) = &self.indices { let relative_start_offset = start_offset - self.start_offset; let relative_end_offset = end_offset - self.start_offset; let start_index = indexes.get(relative_start_offset as usize); @@ -187,26 +187,26 @@ impl Segment { let unsaved_messages = self.unsaved_messages.get_or_insert_with(Vec::new); unsaved_messages.reserve(len); - if let Some(indexes) = &mut self.indexes { + if let Some(indexes) = &mut self.indices { indexes.reserve(len); } - if let Some(time_indexes) = &mut self.time_indexes { + if let Some(time_indexes) = &mut self.time_indices { time_indexes.reserve(len); } // Not the prettiest code. It's done this way to avoid repeatably // checking if indexes and time_indexes are Some or None. - if self.indexes.is_some() && self.time_indexes.is_some() { + if self.indices.is_some() && self.time_indices.is_some() { for message in messages { let relative_offset = (message.offset - self.start_offset) as u32; - self.indexes.as_mut().unwrap().push(Index { + self.indices.as_mut().unwrap().push(Index { relative_offset, position: self.current_size_bytes, }); - self.time_indexes.as_mut().unwrap().push(TimeIndex { + self.time_indices.as_mut().unwrap().push(TimeIndex { relative_offset, timestamp: message.timestamp, }); @@ -215,11 +215,11 @@ impl Segment { self.current_offset = message.offset; unsaved_messages.push(message.clone()); } - } else if self.indexes.is_some() { + } else if self.indices.is_some() { for message in messages { let relative_offset = (message.offset - self.start_offset) as u32; - self.indexes.as_mut().unwrap().push(Index { + self.indices.as_mut().unwrap().push(Index { relative_offset, position: self.current_size_bytes, }); @@ -228,11 +228,11 @@ impl Segment { self.current_offset = message.offset; unsaved_messages.push(message.clone()); } - } else if self.time_indexes.is_some() { + } else if self.time_indices.is_some() { for message in messages { let relative_offset = (message.offset - self.start_offset) as u32; - self.time_indexes.as_mut().unwrap().push(TimeIndex { + self.time_indices.as_mut().unwrap().push(TimeIndex { relative_offset, timestamp: message.timestamp, }); diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index 43c2f709f..df75d4f82 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -4,6 +4,7 @@ use crate::streaming::segments::time_index::TimeIndex; use crate::streaming::storage::SystemStorage; use iggy::models::messages::Message; use iggy::utils::timestamp::TimeStamp; +use std::fmt; use std::sync::Arc; pub const LOG_EXTENSION: &str = "log"; @@ -27,11 +28,41 @@ pub struct Segment { pub(crate) message_expiry: Option, pub(crate) unsaved_messages: Option>>, pub(crate) config: Arc, - pub(crate) indexes: Option>, - pub(crate) time_indexes: Option>, + pub(crate) indices: Option>, + pub(crate) time_indices: Option>, pub(crate) storage: Arc, } +impl fmt::Display for Segment { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "stream ID: {}", self.stream_id)?; + write!(f, "topic ID: {}", self.topic_id)?; + write!(f, "partition ID: {}", self.partition_id)?; + write!(f, "start offset: {}", self.start_offset)?; + write!(f, "end offset: {}", self.end_offset)?; + write!(f, "current offset: {}", self.current_offset)?; + write!(f, "index path: {}", self.index_path)?; + write!(f, "log path: {}", self.log_path)?; + write!(f, "time index path: {}", self.time_index_path)?; + write!(f, "current size (bytes): {}", self.current_size_bytes)?; + write!(f, "is closed: {}", self.is_closed)?; + if let Some(me) = &self.message_expiry { + write!(f, "message expiry: {}", me)?; + }; + if let Some(usm) = &self.unsaved_messages { + write!(f, "unsaved messages count: {}", usm.len())?; + }; + write!(f, "config: {}", self.config)?; + if let Some(indices) = &self.indices { + write!(f, "indices count: {}", indices.len())?; + }; + if let Some(t_indices) = &self.time_indices { + write!(f, "time indices count: {}", t_indices.len())?; + }; + write!(f, "storage: {:?}", self.storage) + } +} + impl Segment { pub fn create( stream_id: u32, @@ -56,11 +87,11 @@ impl Segment { time_index_path: Self::get_time_index_path(&path), current_size_bytes: 0, message_expiry, - indexes: match config.segment.cache_indexes { + indices: match config.segment.cache_indexes { true => Some(Vec::new()), false => None, }, - time_indexes: match config.segment.cache_time_indexes { + time_indices: match config.segment.cache_time_indexes { true => Some(Vec::new()), false => None, }, @@ -154,8 +185,8 @@ mod tests { assert_eq!(segment.time_index_path, time_index_path); assert_eq!(segment.message_expiry, message_expiry); assert!(segment.unsaved_messages.is_none()); - assert!(segment.indexes.is_some()); - assert!(segment.time_indexes.is_some()); + assert!(segment.indices.is_some()); + assert!(segment.time_indices.is_some()); assert!(!segment.is_closed); assert!(!segment.is_full().await); } @@ -185,7 +216,7 @@ mod tests { None, ); - assert!(segment.indexes.is_none()); + assert!(segment.indices.is_none()); } #[test] @@ -213,6 +244,6 @@ mod tests { None, ); - assert!(segment.time_indexes.is_none()); + assert!(segment.time_indices.is_none()); } } diff --git a/server/src/streaming/segments/storage.rs b/server/src/streaming/segments/storage.rs index 2cde45362..dd1950528 100644 --- a/server/src/streaming/segments/storage.rs +++ b/server/src/streaming/segments/storage.rs @@ -56,10 +56,10 @@ impl Storage for FileSegmentStorage { ); if segment.config.segment.cache_indexes { - segment.indexes = Some(segment.storage.segment.load_all_indexes(segment).await?); + segment.indices = Some(segment.storage.segment.load_all_indexes(segment).await?); info!( "Loaded {} indexes for segment with start offset: {} and partition with ID: {} for topic with ID: {} and stream with ID: {}.", - segment.indexes.as_ref().unwrap().len(), + segment.indices.as_ref().unwrap().len(), segment.start_offset, segment.partition_id, segment.topic_id, @@ -72,12 +72,12 @@ impl Storage for FileSegmentStorage { if !time_indexes.is_empty() { let last_index = time_indexes.last().unwrap(); segment.current_offset = segment.start_offset + last_index.relative_offset as u64; - segment.time_indexes = Some(time_indexes); + segment.time_indices = Some(time_indexes); } info!( "Loaded {} time indexes for segment with start offset: {} and partition with ID: {} for topic with ID: {} and stream with ID: {}.", - segment.time_indexes.as_ref().unwrap().len(), + segment.time_indices.as_ref().unwrap().len(), segment.start_offset, segment.partition_id, segment.topic_id, diff --git a/server/src/streaming/streams/storage.rs b/server/src/streaming/streams/storage.rs index 47964772d..883eafe6c 100644 --- a/server/src/streaming/streams/storage.rs +++ b/server/src/streaming/streams/storage.rs @@ -128,7 +128,7 @@ impl Storage for FileStreamStorage { continue; } - if stream.topics_ids.contains_key(&topic.name) { + if stream.topic_ids.contains_key(&topic.name) { error!( "Topic with name: '{}' already exists for stream with ID: {}.", &topic.name, &stream.stream_id @@ -136,7 +136,7 @@ impl Storage for FileStreamStorage { continue; } - stream.topics_ids.insert(topic.name.clone(), topic.topic_id); + stream.topic_ids.insert(topic.name.clone(), topic.topic_id); stream.topics.insert(topic.topic_id, topic); } diff --git a/server/src/streaming/streams/stream.rs b/server/src/streaming/streams/stream.rs index aecee132d..d0f21f009 100644 --- a/server/src/streaming/streams/stream.rs +++ b/server/src/streaming/streams/stream.rs @@ -3,6 +3,7 @@ use crate::streaming::storage::SystemStorage; use crate::streaming::topics::topic::Topic; use iggy::utils::timestamp::TimeStamp; use std::collections::HashMap; +use std::fmt; use std::sync::Arc; #[derive(Debug)] @@ -13,11 +14,25 @@ pub struct Stream { pub topics_path: String, pub created_at: u64, pub(crate) topics: HashMap, - pub(crate) topics_ids: HashMap, + pub(crate) topic_ids: HashMap, pub(crate) config: Arc, pub(crate) storage: Arc, } +impl fmt::Display for Stream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "stream ID: {}", self.stream_id)?; + write!(f, "name: {}", self.name)?; + write!(f, "path: {}", self.path)?; + write!(f, "topics path: {}", self.topics_path)?; + write!(f, "created at: {}", self.created_at)?; + write!(f, "topics count: {}", self.topics.len())?; + write!(f, "topic ids count: {}", self.topic_ids.len())?; + write!(f, "config: {}", self.config)?; + write!(f, "storage: {:?}", self.storage) + } +} + impl Stream { pub fn empty(id: u32, config: Arc, storage: Arc) -> Self { Stream::create(id, "", config, storage) @@ -39,7 +54,7 @@ impl Stream { topics_path, config, topics: HashMap::new(), - topics_ids: HashMap::new(), + topic_ids: HashMap::new(), storage, created_at: TimeStamp::now().to_micros(), } diff --git a/server/src/streaming/streams/topics.rs b/server/src/streaming/streams/topics.rs index 9f9d98439..fcc17964f 100644 --- a/server/src/streaming/streams/topics.rs +++ b/server/src/streaming/streams/topics.rs @@ -22,7 +22,7 @@ impl Stream { } let name = text::to_lowercase_non_whitespace(name); - if self.topics_ids.contains_key(&name) { + if self.topic_ids.contains_key(&name) { return Err(Error::TopicNameAlreadyExists(name, self.stream_id)); } @@ -40,7 +40,7 @@ impl Stream { "Created topic: {} with ID: {}, partitions: {}", name, id, partitions_count ); - self.topics_ids.insert(name, id); + self.topic_ids.insert(name, id); self.topics.insert(id, topic); Ok(()) @@ -61,7 +61,7 @@ impl Stream { let updated_name = text::to_lowercase_non_whitespace(name); { - if let Some(topic_id_by_name) = self.topics_ids.get(&updated_name) { + if let Some(topic_id_by_name) = self.topic_ids.get(&updated_name) { if *topic_id_by_name != topic_id { return Err(Error::TopicNameAlreadyExists( updated_name.to_string(), @@ -77,8 +77,8 @@ impl Stream { }; { - self.topics_ids.remove(&old_topic_name.clone()); - self.topics_ids.insert(updated_name.clone(), topic_id); + self.topic_ids.remove(&old_topic_name.clone()); + self.topic_ids.insert(updated_name.clone(), topic_id); let topic = self.get_topic_mut(id)?; topic.name = updated_name; topic.message_expiry = message_expiry; @@ -125,7 +125,7 @@ impl Stream { } fn get_topic_by_name(&self, name: &str) -> Result<&Topic, Error> { - let topic_id = self.topics_ids.get(name); + let topic_id = self.topic_ids.get(name); if topic_id.is_none() { return Err(Error::TopicNameNotFound(name.to_string(), self.stream_id)); } @@ -143,7 +143,7 @@ impl Stream { } fn get_topic_by_name_mut(&mut self, name: &str) -> Result<&mut Topic, Error> { - let topic_id = self.topics_ids.get(name); + let topic_id = self.topic_ids.get(name); if topic_id.is_none() { return Err(Error::TopicNameNotFound(name.to_string(), self.stream_id)); } @@ -164,7 +164,7 @@ impl Stream { } let topic = self.topics.remove(&topic_id).unwrap(); - self.topics_ids.remove(&topic_name); + self.topic_ids.remove(&topic_name); Ok(topic) }