diff --git a/rust/kcl-lib/src/engine/mod.rs b/rust/kcl-lib/src/engine/mod.rs index 6cef90c8e2..d78d3c20cd 100644 --- a/rust/kcl-lib/src/engine/mod.rs +++ b/rust/kcl-lib/src/engine/mod.rs @@ -93,6 +93,16 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static { /// Get the artifact commands that have accumulated so far. fn artifact_commands(&self) -> Arc>>; + /// Take the batch of commands that have accumulated so far and clear them. + async fn take_batch(&self) -> Vec<(WebSocketRequest, SourceRange)> { + std::mem::take(&mut *self.batch().write().await) + } + + /// Take the batch of end commands that have accumulated so far and clear them. + async fn take_batch_end(&self) -> IndexMap { + std::mem::take(&mut *self.batch_end().write().await) + } + /// Clear all artifact commands that have accumulated so far. async fn clear_artifact_commands(&self) { self.artifact_commands().write().await.clear(); @@ -370,11 +380,11 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static { source_range: SourceRange, ) -> Result { let all_requests = if batch_end { - let mut requests = self.batch().read().await.clone(); - requests.extend(self.batch_end().read().await.values().cloned()); + let mut requests = self.take_batch().await.clone(); + requests.extend(self.take_batch_end().await.values().cloned()); requests } else { - self.batch().read().await.clone() + self.take_batch().await.clone() }; // Return early if we have no commands to send. @@ -442,11 +452,6 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static { } } - // Throw away the old batch queue. - self.batch().write().await.clear(); - if batch_end { - self.batch_end().write().await.clear(); - } self.stats().batches_sent.fetch_add(1, Ordering::Relaxed); // We pop off the responses to cleanup our mappings.