Merged

Changes from all commits (93 commits)
a211291
add arrangement backfill executor
kwannoel Jun 9, 2023
72a50c6
iter over pk bounds of state table
kwannoel Jun 9, 2023
6e573c2
add ordered iter stub
kwannoel Jun 13, 2023
8c6dc84
add reference to schema
kwannoel Jun 13, 2023
d6bc636
duplicate collect_data_chunk for now
kwannoel Jun 13, 2023
f9c688d
enable collect_data_chunk
kwannoel Jun 13, 2023
022710f
add back instrumentation
kwannoel Jun 13, 2023
2f37d1b
checkpoint the work on merge_sort
kwannoel Jun 13, 2023
6f8a97e
make compile
kwannoel Jun 14, 2023
f55d375
tmp commit
kwannoel Jun 14, 2023
098bedf
hack
kwannoel Jun 14, 2023
2b827c5
ok works lol
kwannoel Jun 14, 2023
c9ed209
finally last part of merge sort
kwannoel Jun 14, 2023
57f1edd
make it compile with arrangement backfill too
kwannoel Jun 15, 2023
7ea8503
refactor merge_sort to separate module
kwannoel Jun 15, 2023
b7b5173
unified collect_chunk somewhat
kwannoel Jun 15, 2023
b540eb7
make the diff better
kwannoel Jun 15, 2023
7fdcb2f
remove outdated comment
kwannoel Jun 15, 2023
faf7c50
fix outdated comment
kwannoel Jun 15, 2023
cbb2127
risedev cf
kwannoel Jun 15, 2023
120bb70
some more refactoring + run on ci to make sure nothing broke
kwannoel Jun 15, 2023
5b3e034
remove outdated code
kwannoel Jun 16, 2023
bd2cf4a
borrow chunk when flushing to downstream
kwannoel Jun 16, 2023
16ece5b
finish implementing buffered chunk logic
kwannoel Jun 16, 2023
1f52547
fix pk_in_output_indices
kwannoel Jun 16, 2023
17cad18
minor
kwannoel Jun 19, 2023
6508378
doc
kwannoel Jun 19, 2023
3ff06bc
do not drain upstream_chunk_buffer, it needs to be flushed to state_t…
kwannoel Jun 19, 2023
3cd524d
add more docs
kwannoel Jun 19, 2023
e49f1f3
remove unused args
kwannoel Jun 19, 2023
1a4a25c
fix logic
kwannoel Jun 19, 2023
dd5f7c2
add notes on further optimization
kwannoel Jun 19, 2023
87a01de
commit to state table too
kwannoel Jun 19, 2023
633e184
fix
kwannoel Jun 19, 2023
c17c557
Merge branch 'main' into kwannoel/arrangement-backfill
kwannoel Jun 19, 2023
a82bcbd
remove unrelated changes
kwannoel Jun 20, 2023
8eec53a
remove unnecessary backwards compat code for arrangement backfill
kwannoel Jun 20, 2023
372ed2a
fix double for-loop
kwannoel Jun 20, 2023
02a7199
add merge_sort to iterator
kwannoel Jun 20, 2023
1392d4b
merge merge_sort implementations
kwannoel Jun 20, 2023
d2b7ed0
refactor backfill -> no_shuffle_backfill
kwannoel Jun 20, 2023
0166d08
use crate imports
kwannoel Jun 20, 2023
bbfc060
interim
kwannoel Jun 20, 2023
9d170a0
unify utils: part 1
kwannoel Jun 20, 2023
73fb08a
unify utils: part 2
kwannoel Jun 20, 2023
ab1c71e
refactor snapshot read internals
kwannoel Jun 21, 2023
de00edb
finish persist state
kwannoel Jun 21, 2023
402aa38
refactor snapshot read
kwannoel Jun 21, 2023
6eb6e21
fix replicated state store init
kwannoel Jun 21, 2023
3d32ee0
change instances of state table
kwannoel Jun 21, 2023
600dae9
add fixme
kwannoel Jun 21, 2023
b7b065b
fix
kwannoel Jun 21, 2023
d4be00d
Merge branch 'main' into kwannoel/arrangement-backfill
kwannoel Jun 21, 2023
e108a94
Merge branch 'main' into kwannoel/arrangement-backfill
kwannoel Jun 26, 2023
30e9eb1
use match for range_bounds
kwannoel Jun 26, 2023
19c8069
separate MergeSort to separate module
kwannoel Jun 27, 2023
f40f109
cmp datum iter
kwannoel Jun 27, 2023
4c7e6c0
debug_assert data_chunk cardinality
kwannoel Jun 27, 2023
c15244d
rm compact + risedev cf
kwannoel Jun 27, 2023
4321e7e
docs + add current_pos map
kwannoel Jun 27, 2023
f8de0c0
just use a vec
kwannoel Jun 27, 2023
424a7ca
add mark chunk per vnode
kwannoel Jun 27, 2023
565ff49
add docs + FIXME
kwannoel Jun 27, 2023
16cf7c6
add snapshot_read_per_vnode_skeleton
kwannoel Jul 3, 2023
056a1ed
finish implement snapshot read per vnode
kwannoel Jul 3, 2023
4b95e43
docs
kwannoel Jul 3, 2023
c8936cc
replace all snapshot_read with snapshot_read_per_vnode
kwannoel Jul 3, 2023
042ce2d
refactor barrier processing
kwannoel Jul 3, 2023
dbd4491
fix some warnings
kwannoel Jul 3, 2023
39b7cbb
implement compute vnode
kwannoel Jul 3, 2023
352d8dd
risedev cf
kwannoel Jul 4, 2023
687b719
clean
kwannoel Jul 4, 2023
75cd4fa
clean
kwannoel Jul 4, 2023
317c7c3
enforce barrier should be present
kwannoel Jul 4, 2023
6d6fbfd
handle stats + TODOs
kwannoel Jul 4, 2023
60b6a07
docs
kwannoel Jul 4, 2023
ba3a7e6
remove TODO
kwannoel Jul 4, 2023
694b11e
add function to fetch progress per vnode
kwannoel Jul 4, 2023
d5a4dd3
start refactoring state persistence to be per vnode
kwannoel Jul 4, 2023
bc39753
add update_pos_per_vnode interface + interleave by chunk in snapshot …
kwannoel Jul 4, 2023
b697a5c
finish refactor to produce chunk + vnode
kwannoel Jul 4, 2023
d522787
workaround current_pos_map reference, by cloning it
kwannoel Jul 4, 2023
da8fffb
add FIXME
kwannoel Jul 4, 2023
5e4a5c2
update persist_data per vnode
kwannoel Jul 4, 2023
3558a0d
naming + init backfill state
kwannoel Jul 4, 2023
12d7d93
refactor to snapshot_read with backfill_state
kwannoel Jul 4, 2023
df4942a
update progress to backfill_state
kwannoel Jul 4, 2023
55d26de
update mark_chunk_ref_by_vnode to use backfill_state
kwannoel Jul 4, 2023
56d853d
interim commit: persist_state_per_vnode
kwannoel Jul 4, 2023
259c278
use persist_state_per_vnode
kwannoel Jul 4, 2023
d1ff383
clean
kwannoel Jul 4, 2023
c3edb09
minor
kwannoel Jul 4, 2023
9cc6ad2
use try_join_all
kwannoel Jul 7, 2023
8 changes: 4 additions & 4 deletions src/batch/src/executor/row_seq_scan.rs
@@ -34,7 +34,7 @@ use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{Distribution, TableIter};
use risingwave_storage::table::{collect_data_chunk, get_second, Distribution};
use risingwave_storage::{dispatch_state_store, StateStore};

use crate::executor::{
@@ -416,14 +416,14 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
ordered,
PrefetchOptions::new_for_exhaust_iter(),
)
.await?;
.await?
.map(get_second);

pin_mut!(iter);
loop {
let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

let chunk = iter
.collect_data_chunk(table.schema(), Some(chunk_size))
let chunk = collect_data_chunk(&mut iter, table.schema(), Some(chunk_size))
.await
.map_err(RwError::from)?;

1 change: 0 additions & 1 deletion src/storage/src/table/batch_table/mod.rs
@@ -12,5 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod iter_utils;
pub mod storage_table;
4 changes: 2 additions & 2 deletions src/storage/src/table/batch_table/storage_table.rs
@@ -39,14 +39,14 @@ use risingwave_hummock_sdk::key::{end_bound_of_prefix, next_key, prefixed_range}
use risingwave_hummock_sdk::HummockReadEpoch;
use tracing::trace;

use super::iter_utils;
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::row_serde::row_serde_util::{
parse_raw_key_to_vnode_and_key, serialize_pk, serialize_pk_with_vnode,
};
use crate::row_serde::{find_columns_by_ids, ColumnMapping};
use crate::store::{PrefetchOptions, ReadOptions};
use crate::table::merge_sort::merge_sort;
use crate::table::{compute_vnode, Distribution, TableIter, DEFAULT_VNODE};
use crate::StateStore;

@@ -511,7 +511,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
// Concat all iterators if not to preserve order.
_ if !ordered => futures::stream::iter(iterators).flatten(),
// Merge all iterators if to preserve order.
_ => iter_utils::merge_sort(iterators.into_iter().map(Box::pin).collect()),
_ => merge_sort(iterators.into_iter().map(Box::pin).collect()),
};

Ok(iter)
src/storage/src/table/merge_sort.rs
@@ -14,64 +14,60 @@

use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;
use std::error::Error;

use futures::StreamExt;
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::row::OwnedRow;

use super::storage_table::PkAndRowStream;
use crate::error::StorageError;
pub trait MergeSortKey = Eq + PartialEq + Ord + PartialOrd;

/// We use a binary heap to merge the results of the different streams in order.
/// This is the node type of the heap.
struct Node<S: PkAndRowStream> {
struct Node<K: MergeSortKey, S> {
stream: S,

/// The next item polled from `stream` previously. Since the `eq` and `cmp` must be synchronous
/// functions, we need to implement peeking manually.
peeked: (Vec<u8>, OwnedRow),
peeked: (K, OwnedRow),
}

impl<S: PkAndRowStream> PartialEq for Node<S> {
impl<K: MergeSortKey, S> PartialEq for Node<K, S> {
fn eq(&self, other: &Self) -> bool {
match self.peeked.0 == other.peeked.0 {
true => unreachable!("primary key from different iters should be unique"),
false => false,
}
}
}
impl<S: PkAndRowStream> Eq for Node<S> {}
impl<K: MergeSortKey, S> Eq for Node<K, S> {}

impl<S: PkAndRowStream> PartialOrd for Node<S> {
impl<K: MergeSortKey, S> PartialOrd for Node<K, S> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<S: PkAndRowStream> Ord for Node<S> {

impl<K: MergeSortKey, S> Ord for Node<K, S> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// The heap is a max heap, so we need to reverse the order.
self.peeked.0.cmp(&other.peeked.0).reverse()
}
}

/// Merge multiple streams of primary key and rows into a single stream, sorted by primary key.
/// We should ensure that the primary key from different streams are unique.
#[try_stream(ok = (Vec<u8>, OwnedRow), error = StorageError)]
pub(super) async fn merge_sort<S>(streams: Vec<S>)
#[try_stream(ok=(K, OwnedRow), error=E)]
pub async fn merge_sort<'a, K, E, R>(streams: Vec<R>)
where
S: PkAndRowStream + Unpin,
K: MergeSortKey + 'a,
E: Error + 'a,
R: Stream<Item = Result<(K, OwnedRow), E>> + 'a + Unpin,
{
let mut heap = BinaryHeap::with_capacity(streams.len());
let mut heap = BinaryHeap::new();
for mut stream in streams {
if let Some(peeked) = stream.next().await.transpose()? {
heap.push(Node { stream, peeked });
}
}

while let Some(mut node) = heap.peek_mut() {
// Note: If the `next` returns `Err`, we'll fail to yield the previous item.
// This is acceptable since we're not going to handle errors from cell-based table
// iteration, so where to fail does not matter. Or we need an `Option` for this.
yield match node.stream.next().await.transpose()? {
// There still remains data in the stream, take and update the peeked value.
Some(new_peeked) => std::mem::replace(&mut node.peeked, new_peeked),
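The module above implements a k-way merge: every source stream contributes its peeked head to a binary heap, and the `Ord` impl is reversed so the max-heap yields the smallest key first. Below is a minimal, self-contained sketch of the same technique over plain iterators; the names (`Head`, `merge_sorted`) are illustrative and not part of this PR.

```rust
use std::cmp::Ordering;
use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;

/// One source plus the last value peeked from it. Mirrors `Node` above,
/// but over a plain `Iterator` instead of a `Stream`.
struct Head<K: Ord, I> {
    iter: I,
    peeked: K,
}

impl<K: Ord, I> PartialEq for Head<K, I> {
    fn eq(&self, other: &Self) -> bool {
        self.peeked == other.peeked
    }
}
impl<K: Ord, I> Eq for Head<K, I> {}
impl<K: Ord, I> PartialOrd for Head<K, I> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl<K: Ord, I> Ord for Head<K, I> {
    fn cmp(&self, other: &Self) -> Ordering {
        // `BinaryHeap` is a max-heap; reverse so the smallest key is on top.
        self.peeked.cmp(&other.peeked).reverse()
    }
}

/// Merge already-sorted sources into one sorted `Vec`, assuming keys are
/// unique across sources (the same invariant the stream version relies on).
fn merge_sorted<K: Ord, I: Iterator<Item = K>>(sources: Vec<I>) -> Vec<K> {
    let mut heap = BinaryHeap::new();
    for mut iter in sources {
        if let Some(peeked) = iter.next() {
            heap.push(Head { iter, peeked });
        }
    }
    let mut out = Vec::new();
    while let Some(mut head) = heap.peek_mut() {
        out.push(match head.iter.next() {
            // More data: swap in the new head and yield the old one.
            Some(next) => std::mem::replace(&mut head.peeked, next),
            // Source exhausted: pop it and yield its last value.
            None => PeekMut::pop(head).peeked,
        });
    }
    out
}

fn main() {
    let merged = merge_sorted(vec![
        vec![1, 4, 7].into_iter(),
        vec![2, 5, 8].into_iter(),
        vec![3, 6, 9].into_iter(),
    ]);
    assert_eq!(merged, (1..=9).collect::<Vec<_>>());
}
```

The `PeekMut` guard restores the heap invariant when the mutated top element is released, which is why both the PR code and this sketch replace the peeked value in place instead of popping and re-pushing.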
69 changes: 40 additions & 29 deletions src/storage/src/table/mod.rs
@@ -13,9 +13,11 @@
// limitations under the License.

pub mod batch_table;
pub mod merge_sort;

use std::sync::{Arc, LazyLock};

use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
@@ -81,43 +83,52 @@ impl Distribution {
#[async_trait::async_trait]
pub trait TableIter: Send {
async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>>;
}

async fn collect_data_chunk(
&mut self,
schema: &Schema,
chunk_size: Option<usize>,
) -> StorageResult<Option<DataChunk>> {
let mut builders = schema.create_array_builders(chunk_size.unwrap_or(0));

let mut row_count = 0;
for _ in 0..chunk_size.unwrap_or(usize::MAX) {
match self.next_row().await? {
Some(row) => {
for (datum, builder) in row.iter().zip_eq_fast(builders.iter_mut()) {
builder.append(datum);
}
row_count += 1;
/// Collects data chunks from stream of rows.
pub async fn collect_data_chunk<E, S>(
stream: &mut S,
schema: &Schema,
chunk_size: Option<usize>,
) -> Result<Option<DataChunk>, E>
where
S: Stream<Item = Result<OwnedRow, E>> + Unpin,
{
let mut builders = schema.create_array_builders(chunk_size.unwrap_or(0));

let mut row_count = 0;
for _ in 0..chunk_size.unwrap_or(usize::MAX) {
match stream.next().await.transpose()? {
Some(row) => {
for (datum, builder) in row.iter().zip_eq_fast(builders.iter_mut()) {
builder.append(datum);
}
None => break,
}
None => break,
}

let chunk = {
let columns: Vec<_> = builders
.into_iter()
.map(|builder| builder.finish().into())
.collect();
DataChunk::new(columns, row_count)
};

if chunk.cardinality() == 0 {
Ok(None)
} else {
Ok(Some(chunk))
}
row_count += 1;
}

let chunk = {
let columns: Vec<_> = builders
.into_iter()
.map(|builder| builder.finish().into())
.collect();
DataChunk::new(columns, row_count)
};

if chunk.cardinality() == 0 {
Ok(None)
} else {
Ok(Some(chunk))
}
}

pub fn get_second<T, U, E>(arg: Result<(T, U), E>) -> Result<U, E> {
arg.map(|x| x.1)
}

/// Get vnode value with `indices` on the given `row`.
pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> VirtualNode {
let vnode = if indices.is_empty() {
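For context, the new free function `collect_data_chunk` batches up to `chunk_size` rows from any `Stream<Item = Result<OwnedRow, E>>` into a `DataChunk`, and `get_second` adapts the storage table's `(pk, row)` stream into that shape. The sketch below mirrors the same batching pattern in a runnable, simplified form, with `Vec<i64>` standing in for `OwnedRow` and a plain `Vec` of rows standing in for `DataChunk`; it is illustrative only, not the RisingWave API.

```rust
use futures::{stream, Stream, StreamExt};

/// Simplified stand-in for `OwnedRow`.
type Row = Vec<i64>;

/// Same shape as `get_second` above: keep only the row of a `(pk, row)` pair.
fn get_second<T, U, E>(arg: Result<(T, U), E>) -> Result<U, E> {
    arg.map(|x| x.1)
}

/// Pull up to `chunk_size` rows (or everything, if `None`) from `stream` and
/// return them as one batch; `Ok(None)` once the stream is exhausted.
async fn collect_chunk<E, S>(
    stream: &mut S,
    chunk_size: Option<usize>,
) -> Result<Option<Vec<Row>>, E>
where
    S: Stream<Item = Result<Row, E>> + Unpin,
{
    let mut rows = Vec::new();
    for _ in 0..chunk_size.unwrap_or(usize::MAX) {
        match stream.next().await.transpose()? {
            Some(row) => rows.push(row),
            None => break,
        }
    }
    Ok(if rows.is_empty() { None } else { Some(rows) })
}

fn main() {
    futures::executor::block_on(async {
        // A keyed stream like the storage table's `(serialized_pk, row)` iterator.
        let keyed = stream::iter(vec![
            Ok::<_, ()>((b"k1".to_vec(), vec![1, 10])),
            Ok((b"k2".to_vec(), vec![2, 20])),
            Ok((b"k3".to_vec(), vec![3, 30])),
        ]);
        // Drop the key, then batch the remaining rows two at a time.
        let mut rows = keyed.map(get_second);
        while let Some(chunk) = collect_chunk(&mut rows, Some(2)).await.unwrap() {
            println!("{chunk:?}");
        }
    });
}
```

The PR's `row_seq_scan.rs` composes the real functions in the same way: the ordered table iterator is adapted with `.map(get_second)` and then drained through `collect_data_chunk` in a loop.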
29 changes: 13 additions & 16 deletions src/stream/src/common/table/state_table.rs
@@ -45,7 +45,7 @@ use risingwave_storage::row_serde::row_serde_util::{
use risingwave_storage::store::{
LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, StateStoreIterItemStream,
};
use risingwave_storage::table::{compute_chunk_vnode, compute_vnode, Distribution};
use risingwave_storage::table::{compute_chunk_vnode, compute_vnode, get_second, Distribution};
use risingwave_storage::StateStore;
use tracing::{trace, Instrument};

@@ -62,6 +62,7 @@ const STATE_CLEANING_PERIOD_EPOCH: usize = 5;
pub struct StateTableInner<
S,
SD = BasicSerde,
const IS_REPLICATED: bool = false,
W = WatermarkBufferByEpoch<STATE_CLEANING_PERIOD_EPOCH>,
> where
S: StateStore,
@@ -120,9 +121,10 @@ pub struct StateTableInner<

/// `StateTable` will use `BasicSerde` as default
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
pub type ReplicatedStateTable<S> = StateTableInner<S, BasicSerde, true>;

// initialize
impl<S, SD, W> StateTableInner<S, SD, W>
impl<S, SD, const IS_REPLICATED: bool, W> StateTableInner<S, SD, IS_REPLICATED, W>
where
S: StateStore,
SD: ValueRowSerde,
@@ -188,13 +190,12 @@ where
};

let table_option = TableOption::build_table_option(table_catalog.get_properties());
let local_state_store = store
.new_local(NewLocalOptions::new(
table_id,
is_consistent_op,
table_option,
))
.await;
let new_local_options = if IS_REPLICATED {
NewLocalOptions::new_replicated(table_id, is_consistent_op, table_option)
} else {
NewLocalOptions::new(table_id, is_consistent_op, table_option)
};
let local_state_store = store.new_local(new_local_options).await;

let pk_data_types = pk_indices
.iter()
@@ -511,7 +512,7 @@
}

// point get
impl<S, SD> StateTableInner<S, SD>
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
S: StateStore,
SD: ValueRowSerde,
@@ -609,7 +610,7 @@
}

// write
impl<S, SD> StateTableInner<S, SD>
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
S: StateStore,
SD: ValueRowSerde,
@@ -870,12 +871,8 @@
}
}

fn get_second<T, U>(arg: StreamExecutorResult<(T, U)>) -> StreamExecutorResult<U> {
arg.map(|x| x.1)
}

// Iterator functions
impl<S, SD, W> StateTableInner<S, SD, W>
impl<S, SD, const IS_REPLICATED: bool, W> StateTableInner<S, SD, IS_REPLICATED, W>
where
S: StateStore,
SD: ValueRowSerde,
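The `state_table.rs` changes thread a `const IS_REPLICATED: bool` parameter (defaulting to `false`) through `StateTableInner`, so `ReplicatedStateTable` selects replicated local-store options at construction time without storing a runtime flag. A minimal sketch of this const-generic pattern, using stand-in names (`Table`, `LocalOptions`) rather than the actual `NewLocalOptions` API:

```rust
/// Stand-in for `NewLocalOptions`: records which constructor was used.
#[derive(Debug)]
struct LocalOptions {
    replicated: bool,
}

impl LocalOptions {
    fn new() -> Self {
        Self { replicated: false }
    }
    fn new_replicated() -> Self {
        Self { replicated: true }
    }
}

/// The const parameter defaults to `false`, so existing users of the plain
/// alias compile unchanged; only the replicated alias flips the flag.
struct Table<S, const IS_REPLICATED: bool = false> {
    _store: S,
    options: LocalOptions,
}

type ReplicatedTable<S> = Table<S, true>;

impl<S, const IS_REPLICATED: bool> Table<S, IS_REPLICATED> {
    fn new(store: S) -> Self {
        // The branch is resolved per instantiation at compile time.
        let options = if IS_REPLICATED {
            LocalOptions::new_replicated()
        } else {
            LocalOptions::new()
        };
        Self { _store: store, options }
    }
}

fn main() {
    let plain: Table<()> = Table::new(());
    let replicated: ReplicatedTable<()> = ReplicatedTable::new(());
    println!("{:?} {:?}", plain.options, replicated.options);
}
```

Because the flag lives in the type rather than in a field, the two aliases differ only in the const argument and share all of the inherent impls, which is exactly how the PR keeps one `StateTableInner` implementation for both the regular and replicated tables.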