-
Notifications
You must be signed in to change notification settings - Fork 730
fix: invalidate index fragment bitmaps after data replacement and stale merge #5929
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
7358bda
6ba3166
0122b69
1eb9b55
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1302,20 +1302,21 @@ impl BTreeIndex { | |
| ))) | ||
| } | ||
|
|
||
| async fn into_old_data(self) -> Result<Arc<dyn ExecutionPlan>> { | ||
| let stream = self.into_data_stream().await?; | ||
| Ok(Arc::new(OneShotExec::new(stream))) | ||
| } | ||
|
|
||
| async fn combine_old_new( | ||
| self, | ||
| new_data: SendableRecordBatchStream, | ||
| chunk_size: u64, | ||
| valid_old_fragments: Option<RoaringBitmap>, | ||
| ) -> Result<SendableRecordBatchStream> { | ||
| let value_column_index = new_data.schema().index_of(VALUE_COLUMN_NAME)?; | ||
|
|
||
| let new_input = Arc::new(OneShotExec::new(new_data)); | ||
| let old_input = self.into_old_data().await?; | ||
| let old_stream = self.into_data_stream().await?; | ||
| let old_stream = match valid_old_fragments { | ||
| Some(valid_frags) => filter_row_ids_by_fragments(old_stream, valid_frags), | ||
| None => old_stream, | ||
| }; | ||
| let old_input = Arc::new(OneShotExec::new(old_stream)); | ||
| debug_assert_eq!( | ||
| old_input.schema().flattened_fields().len(), | ||
| new_input.schema().flattened_fields().len() | ||
|
|
@@ -1344,6 +1345,30 @@ impl BTreeIndex { | |
| } | ||
| } | ||
|
|
||
| /// Filter a stream of record batches to only include rows whose row address | ||
| /// belongs to a fragment in `valid_fragments`. Row addresses encode the fragment | ||
| /// ID in the upper 32 bits. | ||
| fn filter_row_ids_by_fragments( | ||
| stream: SendableRecordBatchStream, | ||
| valid_fragments: RoaringBitmap, | ||
| ) -> SendableRecordBatchStream { | ||
| let schema = stream.schema(); | ||
| let filtered = stream.map(move |batch_result| { | ||
| let batch = batch_result?; | ||
| let row_ids = batch | ||
| .column(1) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Minor nit: I've been trying to slowly switch over to using string constants like
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I can try that. |
||
| .as_any() | ||
| .downcast_ref::<arrow_array::UInt64Array>() | ||
| .expect("expected UInt64Array for row_id column"); | ||
| let mask: arrow_array::BooleanArray = row_ids | ||
| .iter() | ||
| .map(|id| id.map(|id| valid_fragments.contains((id >> 32) as u32))) | ||
| .collect(); | ||
| Ok(arrow_select::filter::filter_record_batch(&batch, &mask)?) | ||
| }); | ||
| Box::pin(RecordBatchStreamAdapter::new(schema, filtered)) | ||
| } | ||
|
|
||
| fn wrap_bound(bound: &Bound<ScalarValue>) -> Bound<OrderableScalarValue> { | ||
| match bound { | ||
| Bound::Unbounded => Bound::Unbounded, | ||
|
|
@@ -1570,11 +1595,12 @@ impl ScalarIndex for BTreeIndex { | |
| &self, | ||
| new_data: SendableRecordBatchStream, | ||
| dest_store: &dyn IndexStore, | ||
| valid_old_fragments: Option<&RoaringBitmap>, | ||
| ) -> Result<CreatedIndex> { | ||
| // Merge the existing index data with the new data and then retrain the index on the merged stream | ||
| let merged_data_source = self | ||
| .clone() | ||
| .combine_old_new(new_data, self.batch_size) | ||
| .combine_old_new(new_data, self.batch_size, valid_old_fragments.cloned()) | ||
| .await?; | ||
| train_btree_index(merged_data_source, dest_store, self.batch_size, None, None).await?; | ||
|
|
||
|
|
@@ -3984,7 +4010,7 @@ mod tests { | |
|
|
||
| // update the ranged index | ||
| ranged_index | ||
| .update(update_data_source, new_store.as_ref()) | ||
| .update(update_data_source, new_store.as_ref(), None) | ||
| .await | ||
| .expect("Error in updating ranged index"); | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this only matters for btree because it is the only index that scans the old portion of the index instead of rescanning the whole column?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, bitmap does this too. I need to fix that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed bitmap. Other indices should be done as a follow-up, but I'm not aware of a code path where they cause a bug yet. Many of them, like ZoneMap and BloomFilter, give inexact results.