Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 6 additions & 17 deletions datafusion/functions-table/src/generate_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ impl GenerateSeriesTable {
pub fn as_generator(
&self,
batch_size: usize,
projection: Option<Vec<usize>>,
) -> Result<Arc<RwLock<dyn LazyBatchGenerator>>> {
let generator: Arc<RwLock<dyn LazyBatchGenerator>> = match &self.args {
GenSeriesArgs::ContainsNull { name } => Arc::new(RwLock::new(Empty { name })),
Expand All @@ -256,7 +255,6 @@ impl GenerateSeriesTable {
batch_size,
include_end: *include_end,
name,
projection,
})),
GenSeriesArgs::TimestampArgs {
start,
Expand Down Expand Up @@ -297,7 +295,6 @@ impl GenerateSeriesTable {
batch_size,
include_end: *include_end,
name,
projection,
}))
}
GenSeriesArgs::DateArgs {
Expand Down Expand Up @@ -327,7 +324,6 @@ impl GenerateSeriesTable {
batch_size,
include_end: *include_end,
name,
projection,
})),
};

Expand All @@ -345,7 +341,6 @@ pub struct GenericSeriesState<T: SeriesValue> {
current: T,
include_end: bool,
name: &'static str,
projection: Option<Vec<usize>>,
}

impl<T: SeriesValue> GenericSeriesState<T> {
Expand Down Expand Up @@ -401,11 +396,7 @@ impl<T: SeriesValue> LazyBatchGenerator for GenericSeriesState<T> {

let array = self.current.create_array(buf)?;
let batch = RecordBatch::try_new(Arc::clone(&self.schema), vec![array])?;
let projected = match self.projection.as_ref() {
Some(projection) => batch.project(projection)?,
None => batch,
};
Ok(Some(projected))
Ok(Some(batch))
}
}

Expand Down Expand Up @@ -481,14 +472,12 @@ impl TableProvider for GenerateSeriesTable {
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let batch_size = state.config_options().execution.batch_size;
let schema = match projection {
Some(projection) => Arc::new(self.schema.project(projection)?),
None => self.schema(),
};

let generator = self.as_generator(batch_size, projection.cloned())?;
let generator = self.as_generator(batch_size)?;

Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
Ok(Arc::new(
LazyMemoryExec::try_new(self.schema(), vec![generator])?
.with_projection(projection.cloned()),
))
}
}

Expand Down
30 changes: 29 additions & 1 deletion datafusion/physical-plan/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
pub struct LazyMemoryExec {
/// Schema representing the data
schema: SchemaRef,
/// Optional projection for which columns to load
projection: Option<Vec<usize>>,
/// Functions to generate batches for each partition
batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
/// Plan properties cache storing equivalence properties, partitioning, and execution mode
Expand Down Expand Up @@ -199,12 +201,28 @@ impl LazyMemoryExec {

Ok(Self {
schema,
projection: None,
batch_generators: generators,
cache,
metrics: ExecutionPlanMetricsSet::new(),
})
}

/// Builder-style method that restricts this plan's output to the given columns.
///
/// When `projection` is `Some(columns)`:
/// * the plan's schema is replaced by the current schema projected onto
///   `columns`,
/// * the cached plan properties get fresh `EquivalenceProperties` built from
///   that projected schema, and
/// * the projection is stored on the plan.
///
/// When `projection` is `None`, `self` is returned unchanged.
///
/// # Panics
///
/// Panics if projecting the current schema onto `columns` returns an error
/// (the result is unwrapped). NOTE(review): presumably this happens when an
/// index is out of range for the schema — confirm against `Schema::project`.
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
    if let Some(columns) = projection.as_ref() {
        let projected_schema = Arc::new(self.schema.project(columns).unwrap());

        // Rebuild the equivalence properties from the projected schema
        // before the schema handle is moved into `self.schema`.
        let eq_properties = EquivalenceProperties::new(Arc::clone(&projected_schema));
        self.cache = self.cache.with_eq_properties(eq_properties);

        self.schema = projected_schema;
        self.projection = projection;
    }
    self
}

pub fn try_set_partitioning(&mut self, partitioning: Partitioning) -> Result<()> {
if partitioning.partition_count() != self.batch_generators.len() {
internal_err!(
Expand Down Expand Up @@ -320,6 +338,7 @@ impl ExecutionPlan for LazyMemoryExec {

let stream = LazyMemoryStream {
schema: Arc::clone(&self.schema),
projection: self.projection.clone(),
generator: Arc::clone(&self.batch_generators[partition]),
baseline_metrics,
};
Expand All @@ -338,6 +357,8 @@ impl ExecutionPlan for LazyMemoryExec {
/// Stream that generates record batches on demand
pub struct LazyMemoryStream {
schema: SchemaRef,
/// Optional projection for which columns to load
projection: Option<Vec<usize>>,
/// Generator to produce batches
///
/// Note: Idiomatically, DataFusion uses plan-time parallelism - each stream
Expand All @@ -361,7 +382,14 @@ impl Stream for LazyMemoryStream {
let batch = self.generator.write().generate_next_batch();

let poll = match batch {
Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
Ok(Some(batch)) => {
// return just the columns requested
let batch = match self.projection.as_ref() {
Some(columns) => batch.project(columns)?,
None => batch,
};
Poll::Ready(Some(Ok(batch)))
}
Ok(None) => Poll::Ready(None),
Err(e) => Poll::Ready(Some(Err(e))),
};
Expand Down
3 changes: 1 addition & 2 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1940,8 +1940,7 @@ impl protobuf::PhysicalPlanNode {
};

let table = GenerateSeriesTable::new(Arc::clone(&schema), args);
let generator =
table.as_generator(generate_series.target_batch_size as usize, None)?;
let generator = table.as_generator(generate_series.target_batch_size as usize)?;

Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
}
Expand Down