
refactor(executor): implement time slicing for shared executor scheduling across queries #14594


Closed
wants to merge 11 commits into from
193 changes: 147 additions & 46 deletions src/query/service/src/pipelines/executor/executor_graph.rs
@@ -15,6 +15,7 @@
use std::collections::VecDeque;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@@ -102,24 +103,36 @@ impl Node {
}
}

const POINTS_MASK: u64 = 0xFFFFFFFF00000000;
const EPOCH_MASK: u64 = 0x00000000FFFFFFFF;

// TODO Replace with a variable, not a const value
const MAX_POINTS: u64 = 3;

struct ExecutingGraph {
finished_nodes: AtomicUsize,
graph: StableGraph<Arc<Node>, EdgeInfo>,
/// Stores two values:
///
/// - the high 32 bits hold the number of points that can still be consumed
/// - the low 32 bits hold the epoch these points belong to
points: AtomicU64,
}
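
The `points` field packs both counters into a single `AtomicU64` so they can be updated together. For reference, a minimal standalone sketch of that layout, using the masks and `MAX_POINTS` defined above (the `pack`/`unpack` helpers are hypothetical, for illustration only; the patch manipulates the raw value directly in `can_perform_task` below):

```rust
// Illustration of the `points` layout: high 32 bits = remaining points,
// low 32 bits = the epoch those points belong to.
const POINTS_MASK: u64 = 0xFFFFFFFF00000000;
const EPOCH_MASK: u64 = 0x00000000FFFFFFFF;
const MAX_POINTS: u64 = 3;

fn pack(points: u64, epoch: u32) -> u64 {
    (points << 32) | epoch as u64
}

fn unpack(value: u64) -> (u64, u32) {
    ((value & POINTS_MASK) >> 32, (value & EPOCH_MASK) as u32)
}

fn main() {
    // Matches the initial value built in `ExecutingGraph::create`:
    // `(MAX_POINTS << 32) | init_epoch as u64`.
    let initial = pack(MAX_POINTS, 0);
    assert_eq!(unpack(initial), (MAX_POINTS, 0));
}
```
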

type StateLockGuard = ExecutingGraph;

impl ExecutingGraph {
pub fn create(mut pipeline: Pipeline) -> Result<ExecutingGraph> {
pub fn create(mut pipeline: Pipeline, init_epoch: u32) -> Result<ExecutingGraph> {
let mut graph = StableGraph::new();
Self::init_graph(&mut pipeline, &mut graph);
Ok(ExecutingGraph {
graph,
finished_nodes: AtomicUsize::new(0),
points: AtomicU64::new((MAX_POINTS << 32) | init_epoch as u64),
})
}

pub fn from_pipelines(mut pipelines: Vec<Pipeline>) -> Result<ExecutingGraph> {
pub fn from_pipelines(mut pipelines: Vec<Pipeline>, init_epoch: u32) -> Result<ExecutingGraph> {
let mut graph = StableGraph::new();

for pipeline in &mut pipelines {
@@ -129,6 +142,7 @@ impl ExecutingGraph {
Ok(ExecutingGraph {
finished_nodes: AtomicUsize::new(0),
graph,
points: AtomicU64::new((MAX_POINTS << 32) | init_epoch as u64),
})
}

@@ -228,10 +242,11 @@
pub unsafe fn init_schedule_queue(
locker: &StateLockGuard,
capacity: usize,
graph: &Arc<RunningGraph>,
) -> Result<ScheduleQueue> {
let mut schedule_queue = ScheduleQueue::with_capacity(capacity);
for sink_index in locker.graph.externals(Direction::Outgoing) {
ExecutingGraph::schedule_queue(locker, sink_index, &mut schedule_queue)?;
ExecutingGraph::schedule_queue(locker, sink_index, &mut schedule_queue, graph)?;
}

Ok(schedule_queue)
@@ -244,6 +259,7 @@
locker: &StateLockGuard,
index: NodeIndex,
schedule_queue: &mut ScheduleQueue,
graph: &Arc<RunningGraph>,
) -> Result<()> {
let mut need_schedule_nodes = VecDeque::new();
let mut need_schedule_edges = VecDeque::new();
@@ -304,11 +320,17 @@
}
Event::NeedData | Event::NeedConsume => State::Idle,
Event::Sync => {
schedule_queue.push_sync(node.processor.clone());
schedule_queue.push_sync(ProcessorWrapper {
processor: node.processor.clone(),
graph: graph.clone(),
});
State::Processing
}
Event::Async => {
schedule_queue.push_async(node.processor.clone());
schedule_queue.push_async(ProcessorWrapper {
processor: node.processor.clone(),
graph: graph.clone(),
});
State::Processing
}
};
@@ -320,11 +342,48 @@

Ok(())
}

/// Check whether we can perform a task in the current epoch. If we can, consume one point and return true.
pub fn can_perform_task(&self, global_epoch: u32, max_points: u64) -> bool {
let mut expected_value = 0;
let mut desired_value = 0;
loop {
match self.points.compare_exchange_weak(
expected_value,
desired_value,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(old_value) => {
return (old_value & EPOCH_MASK) as u32 == global_epoch;
}
Err(new_expected) => {
let remain_points = (new_expected & POINTS_MASK) >> 32;
let epoch = new_expected & EPOCH_MASK;

expected_value = new_expected;
if epoch != global_epoch as u64 {
desired_value = new_expected;
} else if remain_points >= 1 {
desired_value = (remain_points - 1) << 32 | epoch;
} else {
desired_value = max_points << 32 | (epoch + 1);
}
}
}
}
}
}
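
To make the compare-and-exchange loop above easier to follow, here is a hypothetical single-threaded paraphrase of the same decision (illustration only; the real method applies it atomically via `compare_exchange_weak`, retrying until the update sticks):

```rust
// Sketch of the decision encoded by `can_perform_task`, without atomics.
// `points` and `epoch` stand for the two unpacked halves of the AtomicU64.
fn can_perform_task_sketch(
    points: &mut u64,
    epoch: &mut u32,
    global_epoch: u32,
    max_points: u64,
) -> bool {
    if *epoch != global_epoch {
        // The graph's epoch does not match the executor's: defer the task.
        return false;
    }
    if *points >= 1 {
        // Budget left in the current epoch: consume one point and run now.
        *points -= 1;
    } else {
        // Budget exhausted: refill the points and advance into the next
        // epoch; the task that observed the matching epoch still runs.
        *points = max_points;
        *epoch += 1;
    }
    true
}
```

In this patch, the callers (`schedule_tail`, `schedule_sync`, `schedule`) push tasks that get `false` here into the deferred task list passed as the last argument to `push_tasks` instead of running them immediately, which is what realizes the per-query time slicing.
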

#[derive(Clone)]
pub struct ProcessorWrapper {
pub processor: ProcessorPtr,
pub graph: Arc<RunningGraph>,
}

pub struct ScheduleQueue {
pub sync_queue: VecDeque<ProcessorPtr>,
pub async_queue: VecDeque<ProcessorPtr>,
pub sync_queue: VecDeque<ProcessorWrapper>,
pub async_queue: VecDeque<ProcessorWrapper>,
}

impl ScheduleQueue {
@@ -336,23 +395,35 @@ impl ScheduleQueue {
}

#[inline]
pub fn push_sync(&mut self, processor: ProcessorPtr) {
pub fn push_sync(&mut self, processor: ProcessorWrapper) {
self.sync_queue.push_back(processor);
}

#[inline]
pub fn push_async(&mut self, processor: ProcessorPtr) {
pub fn push_async(&mut self, processor: ProcessorWrapper) {
self.async_queue.push_back(processor);
}

pub fn schedule_tail(mut self, global: &ExecutorTasksQueue, ctx: &mut ExecutorWorkerContext) {
let mut tasks = VecDeque::with_capacity(self.sync_queue.len());

pub fn schedule_tail(
mut self,
global: &ExecutorTasksQueue,
ctx: &mut ExecutorWorkerContext,
executor: &Arc<PipelineExecutor>,
) {
let mut current_tasks = VecDeque::with_capacity(self.sync_queue.len());
let mut next_tasks = VecDeque::with_capacity(self.sync_queue.len());
while let Some(processor) = self.sync_queue.pop_front() {
tasks.push_back(ExecutorTask::Sync(processor));
if processor
.graph
.can_perform_task(executor.epoch.load(Ordering::SeqCst), MAX_POINTS)
{
current_tasks.push_back(ExecutorTask::Sync(processor));
} else {
next_tasks.push_back(ExecutorTask::Sync(processor));
}
}

global.push_tasks(ctx, tasks)
let worker_id = ctx.get_worker_id();
global.push_tasks(worker_id, Some(current_tasks), Some(next_tasks));
}

pub fn schedule(
@@ -364,27 +435,36 @@
debug_assert!(!context.has_task());

while let Some(processor) = self.async_queue.pop_front() {
Self::schedule_async_task(
processor,
context.query_id.clone(),
executor,
context.get_worker_id(),
context.get_workers_condvar().clone(),
global.clone(),
)
if processor
.graph
.can_perform_task(executor.epoch.load(Ordering::SeqCst), MAX_POINTS)
{
Self::schedule_async_task(
processor,
context.query_id.clone(),
executor,
context.get_worker_id(),
context.get_workers_condvar().clone(),
global.clone(),
)
} else {
let mut tasks = VecDeque::with_capacity(1);
tasks.push_back(ExecutorTask::Async(processor));
global.push_tasks(context.get_worker_id(), None, Some(tasks));
}
}

if !self.sync_queue.is_empty() {
self.schedule_sync(global, context);
self.schedule_sync(global, context, executor);
}

if !self.sync_queue.is_empty() {
self.schedule_tail(global, context);
self.schedule_tail(global, context, executor);
}
}

pub fn schedule_async_task(
proc: ProcessorPtr,
proc: ProcessorWrapper,
query_id: Arc<String>,
executor: &Arc<PipelineExecutor>,
wakeup_worker_id: usize,
@@ -394,18 +474,20 @@
unsafe {
workers_condvar.inc_active_async_worker();
let weak_executor = Arc::downgrade(executor);
let node_profile = executor.graph.get_node_profile(proc.id()).clone();
let process_future = proc.async_process();
let graph = proc.graph;
let node_profile = executor.graph.get_node_profile(proc.processor.id()).clone();
let process_future = proc.processor.async_process();
executor.async_runtime.spawn(
query_id.as_ref().clone(),
TrackedFuture::create(ProcessorAsyncTask::create(
query_id,
wakeup_worker_id,
proc.clone(),
proc.processor.clone(),
global_queue,
workers_condvar,
weak_executor,
node_profile,
graph,
process_future,
))
.in_span(Span::enter_with_local_parent(std::any::type_name::<
@@ -415,41 +497,56 @@
}
}

fn schedule_sync(&mut self, _: &ExecutorTasksQueue, ctx: &mut ExecutorWorkerContext) {
if let Some(processor) = self.sync_queue.pop_front() {
ctx.set_task(ExecutorTask::Sync(processor));
fn schedule_sync(
&mut self,
global: &ExecutorTasksQueue,
ctx: &mut ExecutorWorkerContext,
executor: &Arc<PipelineExecutor>,
) {
while let Some(processor) = self.sync_queue.pop_front() {
if processor
.graph
.can_perform_task(executor.epoch.load(Ordering::SeqCst), MAX_POINTS)
{
ctx.set_task(ExecutorTask::Sync(processor));
break;
} else {
let mut tasks = VecDeque::with_capacity(1);
tasks.push_back(ExecutorTask::Sync(processor));
global.push_tasks(ctx.get_worker_id(), None, Some(tasks));
}
}
}
}

pub struct RunningGraph(ExecutingGraph);

impl RunningGraph {
pub fn create(pipeline: Pipeline) -> Result<RunningGraph> {
let graph_state = ExecutingGraph::create(pipeline)?;
pub fn create(pipeline: Pipeline, init_epoch: u32) -> Result<Arc<RunningGraph>> {
let graph_state = ExecutingGraph::create(pipeline, init_epoch)?;
debug!("Create running graph:{:?}", graph_state);
Ok(RunningGraph(graph_state))
Ok(Arc::new(RunningGraph(graph_state)))
}

pub fn from_pipelines(pipelines: Vec<Pipeline>) -> Result<RunningGraph> {
let graph_state = ExecutingGraph::from_pipelines(pipelines)?;
pub fn from_pipelines(pipelines: Vec<Pipeline>, init_epoch: u32) -> Result<Arc<RunningGraph>> {
let graph_state = ExecutingGraph::from_pipelines(pipelines, init_epoch)?;
debug!("Create running graph:{:?}", graph_state);
Ok(RunningGraph(graph_state))
Ok(Arc::new(RunningGraph(graph_state)))
}

/// # Safety
///
/// Method is not thread-safe; the caller must ensure thread-safe invocation
pub unsafe fn init_schedule_queue(&self, capacity: usize) -> Result<ScheduleQueue> {
ExecutingGraph::init_schedule_queue(&self.0, capacity)
pub unsafe fn init_schedule_queue(self: Arc<Self>, capacity: usize) -> Result<ScheduleQueue> {
ExecutingGraph::init_schedule_queue(&self.0, capacity, &self)
}

/// # Safety
///
/// Method is not thread-safe; the caller must ensure thread-safe invocation
pub unsafe fn schedule_queue(&self, node_index: NodeIndex) -> Result<ScheduleQueue> {
pub unsafe fn schedule_queue(self: Arc<Self>, node_index: NodeIndex) -> Result<ScheduleQueue> {
let mut schedule_queue = ScheduleQueue::with_capacity(0);
ExecutingGraph::schedule_queue(&self.0, node_index, &mut schedule_queue)?;
ExecutingGraph::schedule_queue(&self.0, node_index, &mut schedule_queue, &self)?;
Ok(schedule_queue)
}

@@ -485,6 +582,10 @@ impl RunningGraph {
}
}

pub fn can_perform_task(&self, global_epoch: u32, max_points: u64) -> bool {
self.0.can_perform_task(global_epoch, max_points)
}

pub fn format_graph_nodes(&self) -> String {
pub struct NodeDisplay {
id: usize,
@@ -627,15 +728,15 @@ impl Debug for ScheduleQueue {

for item in &self.sync_queue {
sync_queue.push(QueueItem {
id: item.id().index(),
name: item.name().to_string(),
id: item.processor.id().index(),
name: item.processor.name().to_string(),
})
}

for item in &self.async_queue {
async_queue.push(QueueItem {
id: item.id().index(),
name: item.name().to_string(),
id: item.processor.id().index(),
name: item.processor.name().to_string(),
})
}
