re_query::Caches -> re_query::QueryCache #7915

Merged · 4 commits · Oct 28, 2024
4 changes: 2 additions & 2 deletions crates/store/re_dataframe/src/engine.rs
@@ -1,7 +1,7 @@
use re_chunk::{EntityPath, TransportChunk};
use re_chunk_store::{ChunkStore, ColumnDescriptor, QueryExpression};
use re_log_types::EntityPathFilter;
-use re_query::Caches;
+use re_query::QueryCache;

use crate::QueryHandle;

@@ -31,7 +31,7 @@ pub type RecordBatch = TransportChunk;
// first, and this is not as straightforward as it seems.
pub struct QueryEngine<'a> {
pub store: &'a ChunkStore,
-pub cache: &'a Caches,
+pub cache: &'a QueryCache,
}

impl QueryEngine<'_> {
2 changes: 1 addition & 1 deletion crates/store/re_dataframe/src/lib.rs
@@ -20,7 +20,7 @@ pub use self::external::re_log_types::{
EntityPath, EntityPathFilter, ResolvedTimeRange, TimeInt, Timeline,
};
#[doc(no_inline)]
-pub use self::external::re_query::Caches as QueryCache;
+pub use self::external::re_query::QueryCache;
Member Author commented:
Note that this won't even affect the only truly public API where this was used, since we anticipated the change (see the sketch after this file's diff).


pub mod external {
pub use re_chunk;
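The comment above holds because the old re-export already aliased `Caches` to `QueryCache`, so `re_dataframe` users never saw the old name. A minimal sketch of such a call site, which compiles identically before and after this PR (the helper function is hypothetical; `QueryEngine`'s public fields are as shown in `engine.rs` above):

```rust
use re_chunk_store::ChunkStore;
use re_dataframe::{QueryCache, QueryEngine};

// Hypothetical downstream helper. It always referred to `QueryCache`,
// because the previous `Caches as QueryCache` re-export already exposed
// the type under that name -- the rename is invisible here.
fn make_engine<'a>(store: &'a ChunkStore, cache: &'a QueryCache) -> QueryEngine<'a> {
    QueryEngine { store, cache }
}
```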
6 changes: 3 additions & 3 deletions crates/store/re_entity_db/src/entity_db.rs
@@ -65,7 +65,7 @@ pub struct EntityDb {
data_store: ChunkStore,

/// Query caches for the data in [`Self::data_store`].
-query_caches: re_query::Caches,
+query_caches: re_query::QueryCache,

stats: IngestionStatistics,
}
@@ -77,7 +77,7 @@ impl EntityDb {

pub fn with_store_config(store_id: StoreId, store_config: ChunkStoreConfig) -> Self {
let data_store = ChunkStore::new(store_id.clone(), store_config);
-let query_caches = re_query::Caches::new(&data_store);
+let query_caches = re_query::QueryCache::new(&data_store);

Self {
data_source: None,
@@ -117,7 +117,7 @@ impl EntityDb {
}

#[inline]
-pub fn query_caches(&self) -> &re_query::Caches {
+pub fn query_caches(&self) -> &re_query::QueryCache {
&self.query_caches
}

10 changes: 5 additions & 5 deletions crates/store/re_query/benches/latest_at.rs
@@ -10,7 +10,7 @@ use re_chunk::{Chunk, RowId};
use re_chunk_store::{ChunkStore, ChunkStoreSubscriber, LatestAtQuery};
use re_log_types::{entity_path, EntityPath, TimeInt, TimeType, Timeline};
use re_query::clamped_zip_1x1;
-use re_query::{Caches, LatestAtResults};
+use re_query::{LatestAtResults, QueryCache};
use re_types::{
archetypes::Points2D,
components::{Color, Position2D, Text},
@@ -245,12 +245,12 @@ fn build_strings_chunks(paths: &[EntityPath], num_strings: usize) -> Vec<Arc<Chu
.collect()
}

-fn insert_chunks<'a>(msgs: impl Iterator<Item = &'a Arc<Chunk>>) -> (Caches, ChunkStore) {
+fn insert_chunks<'a>(msgs: impl Iterator<Item = &'a Arc<Chunk>>) -> (QueryCache, ChunkStore) {
let mut store = ChunkStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
Default::default(),
);
-let mut caches = Caches::new(&store);
+let mut caches = QueryCache::new(&store);

msgs.for_each(|chunk| {
caches.on_events(&store.insert_chunk(chunk).unwrap());
@@ -265,7 +265,7 @@ struct SavePoint {
}

fn query_and_visit_points(
-caches: &Caches,
+caches: &QueryCache,
store: &ChunkStore,
paths: &[EntityPath],
) -> Vec<SavePoint> {
@@ -303,7 +303,7 @@ struct SaveString {
}

fn query_and_visit_strings(
-caches: &Caches,
+caches: &QueryCache,
store: &ChunkStore,
paths: &[EntityPath],
) -> Vec<SaveString> {
2 changes: 1 addition & 1 deletion crates/store/re_query/examples/latest_at.rs
@@ -24,7 +24,7 @@ fn main() -> anyhow::Result<()> {
let query = LatestAtQuery::latest(timeline);
eprintln!("query:{query:?}");

-let caches = re_query::Caches::new(&store);
+let caches = re_query::QueryCache::new(&store);

// First, get the (potentially cached) results for this query.
let results: LatestAtResults = caches.latest_at(
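Inside the repo the migration is a pure rename with no behavioral change. A minimal sketch of an updated call site (the store/chunk setup is assumed; `insert_chunk`/`on_events` follow the benchmark usage above):

```rust
use std::sync::Arc;

use re_chunk::Chunk;
use re_chunk_store::{ChunkStore, ChunkStoreSubscriber as _};
use re_query::QueryCache;

// Was: `re_query::Caches::new(&store)` -- only the type name changes.
fn build_cache(store: &mut ChunkStore, chunk: &Arc<Chunk>) -> QueryCache {
    let mut caches = QueryCache::new(store);
    // Keep the cache in sync with the store, as the benchmark does.
    caches.on_events(&store.insert_chunk(chunk).unwrap());
    caches
}
```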
2 changes: 1 addition & 1 deletion crates/store/re_query/examples/range.rs
@@ -21,7 +21,7 @@ fn main() -> anyhow::Result<()> {
let query = RangeQuery::new(timeline, ResolvedTimeRange::EVERYTHING);
eprintln!("query:{query:?}");

-let caches = re_query::Caches::new(&store);
+let caches = re_query::QueryCache::new(&store);

// First, get the (potentially cached) results for this query.
let results: RangeResults = caches.range(
34 changes: 18 additions & 16 deletions crates/store/re_query/src/cache.rs
@@ -16,15 +16,15 @@ use crate::{LatestAtCache, RangeCache};

// ---

-/// Uniquely identifies cached query results in the [`Caches`].
+/// Uniquely identifies cached query results in the [`QueryCache`].
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
-pub struct CacheKey {
+pub struct QueryCacheKey {
pub entity_path: EntityPath,
pub timeline: Timeline,
pub component_name: ComponentName,
}

-impl re_types_core::SizeBytes for CacheKey {
+impl re_types_core::SizeBytes for QueryCacheKey {
#[inline]
fn heap_size_bytes(&self) -> u64 {
let Self {
@@ -38,7 +38,7 @@ impl re_types_core::SizeBytes for CacheKey {
}
}

-impl std::fmt::Debug for CacheKey {
+impl std::fmt::Debug for QueryCacheKey {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
Expand All @@ -53,7 +53,7 @@ impl std::fmt::Debug for CacheKey {
}
}

-impl CacheKey {
+impl QueryCacheKey {
#[inline]
pub fn new(
entity_path: impl Into<EntityPath>,
@@ -68,8 +68,7 @@ impl CacheKey {
}
}

-// TODO(cmc): this needs to be renamed to `QueryCache` as this will slowly become part of the public API now.
-pub struct Caches {
+pub struct QueryCache {
/// The [`StoreId`] of the associated [`ChunkStore`].
pub(crate) store_id: StoreId,

@@ -82,13 +81,13 @@ pub struct Caches {
pub(crate) might_require_clearing: RwLock<IntSet<EntityPath>>,

// NOTE: `Arc` so we can cheaply free the top-level lock early when needed.
-pub(crate) latest_at_per_cache_key: RwLock<HashMap<CacheKey, Arc<RwLock<LatestAtCache>>>>,
+pub(crate) latest_at_per_cache_key: RwLock<HashMap<QueryCacheKey, Arc<RwLock<LatestAtCache>>>>,

// NOTE: `Arc` so we can cheaply free the top-level lock early when needed.
-pub(crate) range_per_cache_key: RwLock<HashMap<CacheKey, Arc<RwLock<RangeCache>>>>,
+pub(crate) range_per_cache_key: RwLock<HashMap<QueryCacheKey, Arc<RwLock<RangeCache>>>>,
}

-impl std::fmt::Debug for Caches {
+impl std::fmt::Debug for QueryCache {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
store_id,
@@ -147,7 +146,7 @@ impl std::fmt::Debug for Caches {
}
}

-impl Caches {
+impl QueryCache {
#[inline]
pub fn new(store: &ChunkStore) -> Self {
Self {
@@ -173,7 +172,7 @@ impl Caches {
}
}

-impl ChunkStoreSubscriber for Caches {
+impl ChunkStoreSubscriber for QueryCache {
#[inline]
fn name(&self) -> String {
"rerun.store_subscribers.QueryCache".into()
@@ -195,8 +194,8 @@ impl ChunkStoreSubscriber for Caches {
#[derive(Default, Debug)]
struct CompactedEvents {
static_: HashMap<(EntityPath, ComponentName), BTreeSet<ChunkId>>,
-temporal_latest_at: HashMap<CacheKey, TimeInt>,
-temporal_range: HashMap<CacheKey, BTreeSet<ChunkId>>,
+temporal_latest_at: HashMap<QueryCacheKey, TimeInt>,
+temporal_range: HashMap<QueryCacheKey, BTreeSet<ChunkId>>,
}

let mut compacted_events = CompactedEvents::default();
@@ -244,8 +243,11 @@ impl ChunkStoreSubscriber for Caches {

for (timeline, per_component) in chunk.time_range_per_component() {
for (component_name, time_range) in per_component {
-let key =
-CacheKey::new(chunk.entity_path().clone(), timeline, component_name);
+let key = QueryCacheKey::new(
+chunk.entity_path().clone(),
+timeline,
+component_name,
+);

// latest-at
{
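`QueryCacheKey` (née `CacheKey`) is re-exported from the crate root as well, as the `lib.rs` diff below shows. An illustrative sketch of building one by hand; the entity path, timeline, and component chosen here are arbitrary examples, not taken from this PR:

```rust
use re_log_types::{EntityPath, Timeline};
use re_query::QueryCacheKey;
use re_types::components::Position2D;
use re_types_core::Loggable as _;

// A key is (entity path, timeline, component name), mirroring how
// `on_events` builds keys in the subscriber above.
fn example_key() -> QueryCacheKey {
    QueryCacheKey::new(
        EntityPath::from("points"),
        Timeline::log_time(),
        Position2D::name(),
    )
}
```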
10 changes: 5 additions & 5 deletions crates/store/re_query/src/cache_stats.rs
@@ -2,17 +2,17 @@ use std::collections::BTreeMap;

use re_types_core::SizeBytes as _;

-use crate::{CacheKey, Caches};
+use crate::{QueryCache, QueryCacheKey};

// ---

/// Stats for all primary caches.
///
-/// Fetch them via [`Caches::stats`].
+/// Fetch them via [`QueryCache::stats`].
#[derive(Default, Debug, Clone)]
pub struct CachesStats {
-pub latest_at: BTreeMap<CacheKey, CacheStats>,
-pub range: BTreeMap<CacheKey, CacheStats>,
+pub latest_at: BTreeMap<QueryCacheKey, CacheStats>,
+pub range: BTreeMap<QueryCacheKey, CacheStats>,
}

impl CachesStats {
@@ -49,7 +49,7 @@ pub struct CacheStats {
pub total_actual_size_bytes: u64,
}

-impl Caches {
+impl QueryCache {
/// Computes the stats for all primary caches.
pub fn stats(&self) -> CachesStats {
re_tracing::profile_function!();
14 changes: 7 additions & 7 deletions crates/store/re_query/src/latest_at.rs
@@ -14,7 +14,7 @@ use re_types_core::{
components::ClearIsRecursive, Component, ComponentName, Loggable as _, SizeBytes,
};

-use crate::{CacheKey, Caches, QueryError};
+use crate::{QueryCache, QueryCacheKey, QueryError};

// --- Public API ---

Expand All @@ -35,7 +35,7 @@ fn compare_indices(lhs: (TimeInt, RowId), rhs: (TimeInt, RowId)) -> std::cmp::Or
}
}

-impl Caches {
+impl QueryCache {
/// Queries for the given `component_names` using latest-at semantics.
///
/// See [`LatestAtResults`] for more information about how to handle the results.
@@ -96,7 +96,7 @@ impl Caches {
continue;
}

-let key = CacheKey::new(
+let key = QueryCacheKey::new(
clear_entity_path.clone(),
query.timeline(),
ClearIsRecursive::name(),
@@ -144,7 +144,7 @@ impl Caches {
}

for component_name in component_names {
-let key = CacheKey::new(entity_path.clone(), query.timeline(), component_name);
+let key = QueryCacheKey::new(entity_path.clone(), query.timeline(), component_name);

let cache = Arc::clone(
self.latest_at_per_cache_key
@@ -538,10 +538,10 @@ impl LatestAtResults {

// --- Cached implementation ---

-/// Caches the results of `LatestAt` queries for a given [`CacheKey`].
+/// Caches the results of `LatestAt` queries for a given [`QueryCacheKey`].
pub struct LatestAtCache {
/// For debugging purposes.
-pub cache_key: CacheKey,
+pub cache_key: QueryCacheKey,

/// Organized by _query_ time.
///
Expand All @@ -562,7 +562,7 @@ pub struct LatestAtCache {

impl LatestAtCache {
#[inline]
-pub fn new(cache_key: CacheKey) -> Self {
+pub fn new(cache_key: QueryCacheKey) -> Self {
Self {
cache_key,
per_query_time: Default::default(),
2 changes: 1 addition & 1 deletion crates/store/re_query/src/lib.rs
@@ -8,7 +8,7 @@ mod range;
pub mod clamped_zip;
pub mod range_zip;

-pub use self::cache::{CacheKey, Caches};
+pub use self::cache::{QueryCache, QueryCacheKey};
pub use self::cache_stats::{CacheStats, CachesStats};
pub use self::clamped_zip::*;
pub use self::latest_at::LatestAtResults;
16 changes: 8 additions & 8 deletions crates/store/re_query/src/range.rs
@@ -9,11 +9,11 @@ use re_chunk_store::{ChunkStore, RangeQuery, TimeInt};
use re_log_types::{EntityPath, ResolvedTimeRange};
use re_types_core::{ComponentName, DeserializationError, SizeBytes};

-use crate::{CacheKey, Caches};
+use crate::{QueryCache, QueryCacheKey};

// --- Public API ---

-impl Caches {
+impl QueryCache {
/// Queries for the given `component_names` using range semantics.
///
/// See [`RangeResults`] for more information about how to handle the results.
@@ -38,7 +38,7 @@ impl Caches {
});

for component_name in component_names {
-let key = CacheKey::new(entity_path.clone(), query.timeline(), component_name);
+let key = QueryCacheKey::new(entity_path.clone(), query.timeline(), component_name);

let cache = Arc::clone(
self.range_per_cache_key
@@ -127,10 +127,10 @@ impl RangeResults {

// --- Cache implementation ---

-/// Caches the results of `Range` queries for a given [`CacheKey`].
+/// Caches the results of `Range` queries for a given [`QueryCacheKey`].
pub struct RangeCache {
/// For debugging purposes.
-pub cache_key: CacheKey,
+pub cache_key: QueryCacheKey,

/// All the [`Chunk`]s currently cached.
///
@@ -148,7 +148,7 @@ pub struct RangeCache {

impl RangeCache {
#[inline]
-pub fn new(cache_key: CacheKey) -> Self {
+pub fn new(cache_key: QueryCacheKey) -> Self {
Self {
cache_key,
chunks: HashMap::default(),
@@ -205,7 +205,7 @@ impl std::fmt::Debug for RangeCache {
pub struct RangeCachedChunk {
pub chunk: Chunk,

-/// When a `Chunk` gets cached, it is pre-processed according to the current [`CacheKey`],
+/// When a `Chunk` gets cached, it is pre-processed according to the current [`QueryCacheKey`],
/// e.g. it is time-sorted on the appropriate timeline.
///
/// In the happy case, pre-processing a `Chunk` is a no-op, and the cached `Chunk` is just a
@@ -264,7 +264,7 @@ impl RangeCache {
// It's fine to run the query every time -- the index scan itself is not the costly part of a
// range query.
//
-// For all relevant chunks that we find, we process them according to the [`CacheKey`], and
+// For all relevant chunks that we find, we process them according to the [`QueryCacheKey`], and
// cache them.

let raw_chunks = store.range_relevant_chunks(query, entity_path, component_name);