feat(flow): flow refill state (Part 1) (#5295)
* feat(flow): (Part 1) refill utils

* chore: after rebase fix

* chore: more rebase

* rm refill.rs to reduce pr size

* chore: simpler args

* refactor: per review

* docs: more explain for instant requests

* refactor: per review
discord9 authored Jan 8, 2025
1 parent 369b59c commit 0ee4133
Showing 17 changed files with 756 additions and 15 deletions.
src/common/meta/src/key/table_info.rs (+7, -0)

@@ -190,6 +190,13 @@ impl TableInfoManager {
))
}

/// Checks if the table exists.
pub async fn exists(&self, table_id: TableId) -> Result<bool> {
let key = TableInfoKey::new(table_id);
let raw_key = key.to_bytes();
self.kv_backend.exists(&raw_key).await
}

pub async fn get(
&self,
table_id: TableId,
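
A minimal usage sketch for the new existence check (the helper name and the `TableId` import path are assumptions, not part of this commit):

```rust
use common_meta::key::table_info::TableInfoManager;
use table::metadata::TableId;

// Hypothetical helper: probe key existence instead of fetching and
// deserializing the full table info when only presence matters.
async fn is_table_alive(manager: &TableInfoManager, table_id: TableId) -> bool {
    manager.exists(table_id).await.unwrap_or(false)
}
```
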
src/common/recordbatch/src/recordbatch.rs (+1, -1)

@@ -35,7 +35,7 @@ use crate::DfRecordBatch;
#[derive(Clone, Debug, PartialEq)]
pub struct RecordBatch {
pub schema: SchemaRef,
- columns: Vec<VectorRef>,
+ pub columns: Vec<VectorRef>,
df_record_batch: DfRecordBatch,
}

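
The field is made public so downstream code can consume the column vectors directly; the `From<RecordBatch> for Batch` impl later in this diff does exactly that. A small sketch (helper name and `VectorRef` import path assumed):

```rust
use common_recordbatch::RecordBatch;
use datatypes::vectors::VectorRef;

// Take ownership of the column vectors without cloning through an accessor.
fn into_columns(rb: RecordBatch) -> Vec<VectorRef> {
    rb.columns
}
```
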
src/flow/src/adapter.rs (+2, -0)

@@ -804,6 +804,8 @@ impl FlowWorkerManager {
}
}

node_ctx.add_flow_plan(flow_id, flow_plan.clone());

let _ = comment;
let _ = flow_options;

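
The plan is registered in the node context at creation time so it can be fetched again by flow id. A sketch of the lookup side, reusing the `node_ctx` and `flow_id` bindings from the hunk above (the logging line assumes `TypedPlan` implements `Debug`):

```rust
// Once `add_flow_plan` has run, other code paths (e.g. the refill logic this
// series builds towards) can retrieve the plan by flow id.
if let Some(plan) = node_ctx.get_flow_plan(&flow_id) {
    // `plan` is a cloned `TypedPlan` that can be inspected independently.
    common_telemetry::trace!("flow {flow_id} has plan {plan:?}");
}
```
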
src/flow/src/adapter/node_context.rs (+40, -0)

@@ -18,6 +18,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use common_recordbatch::RecordBatch;
use common_telemetry::trace;
use datatypes::prelude::ConcreteDataType;
use session::context::QueryContext;
@@ -31,6 +32,7 @@ use crate::error::{Error, EvalSnafu, TableNotFoundSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::{Batch, GlobalId};
use crate::metrics::METRIC_FLOW_INPUT_BUF_SIZE;
use crate::plan::TypedPlan;
use crate::repr::{DiffRow, RelationDesc, BATCH_SIZE, BROADCAST_CAP, SEND_BUF_CAP};

/// A context that holds the information of the dataflow
@@ -40,6 +42,7 @@ pub struct FlownodeContext {
pub source_to_tasks: BTreeMap<TableId, BTreeSet<FlowId>>,
/// mapping from task to sink table, useful for sending data back to the client when a task is done running
pub flow_to_sink: BTreeMap<FlowId, TableName>,
/// mapping from flow id to its compiled plan, kept so the plan can be looked up later (e.g. for refill)
pub flow_plans: BTreeMap<FlowId, TypedPlan>,
pub sink_to_flow: BTreeMap<TableName, FlowId>,
/// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender
///
@@ -63,6 +66,7 @@ impl FlownodeContext {
Self {
source_to_tasks: Default::default(),
flow_to_sink: Default::default(),
flow_plans: Default::default(),
sink_to_flow: Default::default(),
source_sender: Default::default(),
sink_receiver: Default::default(),
@@ -179,6 +183,22 @@ impl SourceSender {

Ok(0)
}

/// Send a [`RecordBatch`] into this sender's buffer, converting it to a [`Batch`].
pub async fn send_record_batch(&self, batch: RecordBatch) -> Result<usize, Error> {
let row_cnt = batch.num_rows();
let batch = Batch::from(batch);

self.send_buf_row_cnt.fetch_add(row_cnt, Ordering::SeqCst);

self.send_buf_tx.send(batch).await.map_err(|e| {
crate::error::InternalSnafu {
reason: format!("Failed to send batch, error = {:?}", e),
}
.build()
})?;
Ok(row_cnt)
}
}

impl FlownodeContext {
@@ -200,6 +220,16 @@ impl FlownodeContext {
sender.send_rows(rows, batch_datatypes).await
}

/// Send a [`RecordBatch`] to the source sender of the given table.
pub async fn send_rb(&self, table_id: TableId, batch: RecordBatch) -> Result<usize, Error> {
let sender = self
.source_sender
.get(&table_id)
.with_context(|| TableNotFoundSnafu {
name: table_id.to_string(),
})?;
sender.send_record_batch(batch).await
}

/// flush all sender's buf
///
/// return numbers being sent
@@ -235,6 +265,15 @@ impl FlownodeContext {
self.sink_to_flow.insert(sink_table_name, task_id);
}

/// add flow plan to worker context
pub fn add_flow_plan(&mut self, task_id: FlowId, plan: TypedPlan) {
self.flow_plans.insert(task_id, plan);
}

/// Get a clone of the flow plan for the given flow id, if it exists.
pub fn get_flow_plan(&self, task_id: &FlowId) -> Option<TypedPlan> {
self.flow_plans.get(task_id).cloned()
}

/// remove flow from worker context
pub fn remove_flow(&mut self, task_id: FlowId) {
if let Some(sink_table_name) = self.flow_to_sink.remove(&task_id) {
@@ -246,6 +285,7 @@
self.source_sender.remove(source_table_id);
}
}
self.flow_plans.remove(&task_id);
}

/// try add source sender, if already exist, do nothing
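
A sketch of how a write path might use the new `send_rb` entry point (the function name is illustrative; module paths follow this crate's layout):

```rust
use common_recordbatch::RecordBatch;
use table::metadata::TableId;

use crate::adapter::node_context::FlownodeContext;
use crate::error::Error;

// Queue an incoming batch on the source table's channel and report how
// many rows were accepted.
async fn forward_to_flow(
    ctx: &FlownodeContext,
    table_id: TableId,
    batch: RecordBatch,
) -> Result<usize, Error> {
    // Fails with `TableNotFound` if no source sender is registered for this table.
    ctx.send_rb(table_id, batch).await
}
```
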
src/flow/src/adapter/table_source.rs (+33, -0)

@@ -82,6 +82,31 @@ impl ManagedTableSource {
}
}

/// Get the time index column (position and schema) for the given table id.
pub async fn get_time_index_column_from_table_id(
&self,
table_id: TableId,
) -> Result<(usize, datatypes::schema::ColumnSchema), Error> {
let info = self
.table_info_manager
.get(table_id)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.context(UnexpectedSnafu {
reason: format!("Table id = {:?}, couldn't found table info", table_id),
})?;
let raw_schema = &info.table_info.meta.schema;
let Some(ts_index) = raw_schema.timestamp_index else {
UnexpectedSnafu {
reason: format!("Table id = {:?}, couldn't found timestamp index", table_id),
}
.fail()?
};
let col_schema = raw_schema.column_schemas[ts_index].clone();
Ok((ts_index, col_schema))
}

pub async fn get_table_id_from_proto_name(
&self,
name: &greptime_proto::v1::TableName,
@@ -168,6 +193,14 @@ impl ManagedTableSource {
let desc = table_info_value_to_relation_desc(table_info_value)?;
Ok((table_name, desc))
}

pub async fn check_table_exist(&self, table_id: &TableId) -> Result<bool, Error> {
self.table_info_manager
.exists(*table_id)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
}

impl std::fmt::Debug for ManagedTableSource {
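
A sketch combining the two new helpers: guard against a dropped table, then resolve its time index column (the function name is illustrative):

```rust
use table::metadata::TableId;

use crate::adapter::table_source::ManagedTableSource;
use crate::error::Error;

// Describe a source table's time index column, tolerating the table
// having been dropped in the meantime.
async fn describe_time_index(
    source: &ManagedTableSource,
    table_id: TableId,
) -> Result<String, Error> {
    if !source.check_table_exist(&table_id).await? {
        return Ok(format!("table {table_id} no longer exists"));
    }
    let (idx, col) = source.get_time_index_column_from_table_id(table_id).await?;
    Ok(format!("time index is column {idx} ({})", col.name))
}
```
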
src/flow/src/error.rs (+7, -1)

@@ -21,7 +21,7 @@ use common_error::{define_into_tonic_status, from_err_code_msg_to_header};
use common_macro::stack_trace_debug;
use common_telemetry::common_error::ext::ErrorExt;
use common_telemetry::common_error::status_code::StatusCode;
- use snafu::{Location, Snafu};
+ use snafu::{Location, ResultExt, Snafu};
use tonic::metadata::MetadataMap;

use crate::adapter::FlowId;
@@ -259,3 +259,9 @@ impl ErrorExt for Error {
}

define_into_tonic_status!(Error);

impl From<EvalError> for Error {
fn from(e: EvalError) -> Self {
// `context` only works on `Result`, so round-trip through a temporary
// `Err` to let snafu attach the error location, then unwrap it back out.
Err::<(), _>(e).context(EvalSnafu).unwrap_err()
}
}
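
With the impl in place, `?` converts evaluation errors automatically. A sketch:

```rust
use crate::error::Error;
use crate::expr::error::EvalError;

// Evaluation errors can now be propagated with `?` instead of spelling
// out `.context(EvalSnafu)` at every call site.
fn run_eval_step(step: impl FnOnce() -> Result<(), EvalError>) -> Result<(), Error> {
    step()?; // `EvalError` -> `Error` via the new `From` impl
    Ok(())
}
```
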
src/flow/src/expr.rs (+11, -0)

@@ -22,6 +22,7 @@ mod linear;
pub(crate) mod relation;
mod scalar;
mod signature;
pub(crate) mod utils;

use arrow::compute::FilterBuilder;
use datatypes::prelude::{ConcreteDataType, DataType};
@@ -54,6 +55,16 @@ pub struct Batch {
diffs: Option<VectorRef>,
}

impl From<common_recordbatch::RecordBatch> for Batch {
fn from(value: common_recordbatch::RecordBatch) -> Self {
Self {
row_count: value.num_rows(),
batch: value.columns,
diffs: None,
}
}
}

impl PartialEq for Batch {
fn eq(&self, other: &Self) -> bool {
let mut batch_eq = true;
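
A sketch of the conversion in use (helper name illustrative): `diffs: None` means every row in the converted batch is treated as an insertion.

```rust
use common_recordbatch::RecordBatch;

use crate::expr::Batch;

// Adapt a query result into the dataflow batch type.
fn to_flow_batch(rb: RecordBatch) -> Batch {
    Batch::from(rb)
}
```
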
src/flow/src/expr/linear.rs (+24, -0)

@@ -94,6 +94,30 @@ impl MapFilterProject {
}
}

/// Get the expression that produces the `n`-th output column, resolving
/// chains of plain column references back to the input where possible.
pub fn get_nth_expr(&self, n: usize) -> Option<ScalarExpr> {
let idx = *self.projection.get(n)?;
if idx < self.input_arity {
Some(ScalarExpr::Column(idx))
} else {
// follow chains of column references until we reach either an
// input column or a non-trivial expression

let mut expr = self.expressions.get(idx - self.input_arity)?;
loop {
match expr {
ScalarExpr::Column(prev) => {
if *prev < self.input_arity {
return Some(ScalarExpr::Column(*prev));
} else {
expr = self.expressions.get(*prev - self.input_arity)?;
continue;
}
}
_ => return Some(expr.clone()),
}
}
}
}

/// The number of columns expected in the output row.
pub fn output_arity(&self) -> usize {
self.projection.len()
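
A sketch of the resolution behavior. The `new`/`map`/`project` builder calls follow the pattern used elsewhere in this crate but are an assumption here, as is `ScalarExpr: PartialEq` for the assertions:

```rust
use crate::expr::linear::MapFilterProject;
use crate::expr::ScalarExpr;

fn nth_expr_demo() -> Result<(), crate::error::Error> {
    // Two input columns, plus one mapped expression (overall index 2)
    // that merely aliases input column 1.
    let mfp = MapFilterProject::new(2)
        .map(vec![ScalarExpr::Column(1)])?
        .project(vec![0, 2])?;

    // Output 0 passes input column 0 straight through.
    assert_eq!(mfp.get_nth_expr(0), Some(ScalarExpr::Column(0)));
    // Output 1 is chased through the alias back to input column 1.
    assert_eq!(mfp.get_nth_expr(1), Some(ScalarExpr::Column(1)));
    Ok(())
}
```
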
src/flow/src/expr/scalar.rs (+3, -0)

@@ -311,6 +311,9 @@ impl ScalarExpr {
}

/// Eval this expression with the given values.
///
/// TODO(discord9): add tests to make sure `eval_batch` produces the same results as
/// `eval` in most cases
pub fn eval(&self, values: &[Value]) -> Result<Value, EvalError> {
match self {
ScalarExpr::Column(index) => Ok(values[*index].clone()),
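
A sketch of row-wise evaluation for a plain column reference; per the TODO above, `eval_batch` is expected to agree with this on the columnar path:

```rust
use datatypes::value::Value;

use crate::expr::ScalarExpr;

fn eval_demo() -> Result<(), crate::expr::error::EvalError> {
    let row = [Value::from(1i64), Value::from(2i64)];
    // `Column(1)` simply clones the value at index 1.
    assert_eq!(ScalarExpr::Column(1).eval(&row)?, Value::from(2i64));
    Ok(())
}
```
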
(The remaining 8 changed files in this commit are not shown here.)
