Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 109 additions & 23 deletions mm2src/mm2_db/src/indexed_db/drivers/cursor/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ mod multi_key_cursor;
mod single_key_bound_cursor;
mod single_key_cursor;

use crate::indexed_db::indexed_cursor::CursorCondition;
use empty_cursor::IdbEmptyCursor;
use multi_key_bound_cursor::IdbMultiKeyBoundCursor;
use multi_key_cursor::IdbMultiKeyCursor;
use single_key_bound_cursor::IdbSingleKeyBoundCursor;
use single_key_cursor::IdbSingleKeyCursor;

pub type CursorResult<T> = Result<T, MmError<CursorError>>;
type CursorCondition = Box<dyn Fn(Json) -> CursorResult<bool> + Send + 'static>;

#[derive(Debug, Display, EnumFromTrait, PartialEq)]
pub enum CursorError {
Expand Down Expand Up @@ -84,13 +84,28 @@ pub enum CursorBoundValue {
BigUint(BeBigUint),
}

/// Represents criteria for filtering and refining results when using database cursors.
#[derive(Default)]
pub struct CursorFilters {
/// Specifies key-value pairs to filter results by.
pub(crate) only_keys: Vec<(String, Json)>,
/// Specifies range filters for keys..
pub(crate) bound_keys: Vec<(String, CursorBoundValue, CursorBoundValue)>,
/// Indicates whether to sort results in reverse order.
pub(crate) reverse: bool,
}

/// Provides extended filtering options for cursor-based queries.
#[derive(Default)]
pub struct CursorFiltersExt {
Comment thread
shamardy marked this conversation as resolved.
/// An optional filter expression.
pub(crate) where_: Option<CursorCondition>,
/// The maximum number of results to return.
pub(crate) limit: Option<usize>,
/// The number of results to skip before returning.
pub(crate) offset: Option<u32>,
}

impl From<u32> for CursorBoundValue {
fn from(uint: u32) -> Self { CursorBoundValue::Uint(uint) }
}
Expand Down Expand Up @@ -192,6 +207,7 @@ pub trait CursorDriverImpl: Sized {
pub(crate) struct CursorDriver {
/// An actual cursor implementation.
inner: IdbCursorEnum,
filters_ext: CursorFiltersExt,
cursor_request: IdbRequest,
cursor_item_rx: mpsc::Receiver<Result<JsValue, JsValue>>,
/// Whether we got `CursorAction::Stop` at the last iteration or not.
Expand All @@ -202,7 +218,11 @@ pub(crate) struct CursorDriver {
}

impl CursorDriver {
pub(crate) fn init_cursor(db_index: IdbIndex, filters: CursorFilters) -> CursorResult<CursorDriver> {
pub(crate) fn init_cursor(
db_index: IdbIndex,
filters: CursorFilters,
filters_ext: CursorFiltersExt,
) -> CursorResult<CursorDriver> {
let reverse = filters.reverse;
let inner = IdbCursorEnum::new(filters)?;

Expand Down Expand Up @@ -234,6 +254,7 @@ impl CursorDriver {

Ok(CursorDriver {
inner,
filters_ext,
cursor_request,
cursor_item_rx,
stopped: false,
Expand All @@ -242,19 +263,11 @@ impl CursorDriver {
})
}

pub(crate) async fn next(&mut self, where_: Option<CursorCondition>) -> CursorResult<Option<(ItemId, Json)>> {
loop {
if self.stopped {
return Ok(None);
}

match self.process_cursor_item(where_.as_ref()).await? {
Some(result) => return Ok(Some(result)),
None => continue,
}
}
}

/// Continues the cursor according to the provided `CursorAction`.
/// If the action is `CursorAction::Continue`, the cursor advances to the next item.
/// If the action is `CursorAction::ContinueWithValue`, the cursor advances to the specified value.
/// If the action is `CursorAction::Stop`, the cursor is stopped, and subsequent calls to `next`
/// will return `None`.
async fn continue_(&mut self, cursor: &IdbCursorWithValue, cursor_action: &CursorAction) -> CursorResult<()> {
match cursor_action {
CursorAction::Continue => cursor.continue_().map_to_mm(|e| CursorError::AdvanceError {
Expand All @@ -277,7 +290,28 @@ impl CursorDriver {
Ok(())
}

async fn process_cursor_item(&mut self, where_: Option<&CursorCondition>) -> CursorResult<Option<(ItemId, Json)>> {
/// Advances the cursor by the offset specified in the `filters_ext.offset` field.
/// This operation is typically performed once at the beginning of cursor-based iteration.
/// After the offset is applied, the value in `filters_ext.offset` is cleared.
/// An error will be thrown if the cursor is currently being iterated or has iterated past its end.
// https://developer.mozilla.org/en-US/docs/Web/API/IDBCursor/advance
async fn advance_by_offset(&mut self) -> CursorResult<()> {
if let Some(offset) = self.filters_ext.offset.take() {
if let Some(cursor) = self.get_cursor_or_stop().await? {
cursor.advance(offset).map_to_mm(|e| CursorError::AdvanceError {
description: stringify_js_error(&e),
})?;
} else {
self.stopped = true;
}
}

Ok(())
}

/// Helper function to retrieve a cursor or indicate the processing should stop.
/// Handles potential errors related to opening the cursor and receiving events.
async fn get_cursor_or_stop(&mut self) -> CursorResult<Option<IdbCursorWithValue>> {
let event = match self.cursor_item_rx.next().await {
Some(event) => event,
None => {
Expand All @@ -290,14 +324,39 @@ impl CursorDriver {
description: stringify_js_error(&e),
})?;

let cursor = match cursor_from_request(&self.cursor_request)? {
cursor_from_request(&self.cursor_request)
}

/// Continuously processes cursor items until it retrieves a valid result or
/// the cursor is stopped. It returns a `CursorResult` containing either the next item
/// wrapped in `Some`, or `None` if the cursor is stopped.
pub(crate) async fn next(&mut self) -> CursorResult<Option<(ItemId, Json)>> {
// Handle offset on first iteration if there's any.
self.advance_by_offset().await?;

loop {
if self.stopped {
return Ok(None);
}

match self.process_cursor_item().await? {
Some(result) => return Ok(Some(result)),
None => continue,
}
}
}

/// Processes the next item from the cursor, which includes fetching the cursor event,
/// opening the cursor, deserializing the item, and performing actions based on the item and cursor conditions.
/// It returns an `Option` containing the item ID and value if an item is processed successfully, otherwise `None`.
async fn process_cursor_item(&mut self) -> CursorResult<Option<(ItemId, Json)>> {
let cursor = match self.get_cursor_or_stop().await? {
Some(cursor) => cursor,
None => {
self.stopped = true;
return Ok(None);
},
};

let (key, js_value) = match (cursor.key(), cursor.value()) {
(Ok(key), Ok(js_value)) => (key, js_value),
_ => {
Expand All @@ -311,25 +370,52 @@ impl CursorDriver {
let (item_action, cursor_action) = self.inner.on_iteration(key)?;

let (id, val) = item.into_pair();

// Checks if the given `where_` condition, represented by an optional closure (`cursor_condition`),
// is satisfied for the provided `item`. If the condition is met, return the corresponding `(id, val)` or skip to the next item.

if matches!(item_action, CursorItemAction::Include) {
if let Some(cursor_condition) = where_ {
if let Some(cursor_condition) = &self.filters_ext.where_ {
if cursor_condition(val.clone())? {
// stop iteration and return value.
self.stopped = true;
// Update limit (if applicable) and return
if self.filters_ext.limit.is_some() {
self.update_limit_and_continue(&cursor, &cursor_action).await?;
} else {
self.stopped = true;
};
return Ok(Some((id, val)));
Comment thread
shamardy marked this conversation as resolved.
}
} else {
self.continue_(&cursor, &cursor_action).await?;
self.update_limit_and_continue(&cursor, &cursor_action).await?;
return Ok(Some((id, val)));
};
}

self.continue_(&cursor, &cursor_action).await?;
Ok(None)
}

/// Checks the current limit set for the cursor. If the limit is greater than 1,
/// it decrements the limit by 1. If the limit becomes 1 or less, it sets the `stopped` flag
/// to true, indicating that the cursor should stop.
async fn update_limit_and_continue(
&mut self,
cursor: &IdbCursorWithValue,
cursor_action: &CursorAction,
) -> CursorResult<()> {
if let Some(limit) = self.filters_ext.limit {
// Early return if limit is reached
if limit <= 1 {
self.stopped = true;
return Ok(());
}

// Decrement limit and continue
self.filters_ext.limit = Some(limit - 1);
return self.continue_(cursor, cursor_action).await;
};

self.continue_(cursor, cursor_action).await
}
}

pub(crate) enum IdbCursorEnum {
Expand Down
Loading