Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions datafusion/functions-table/src/generate_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ impl GenerateSeriesTable {
pub fn as_generator(
&self,
batch_size: usize,
projection: Option<Vec<usize>>,
) -> Result<Arc<RwLock<dyn LazyBatchGenerator>>> {
let generator: Arc<RwLock<dyn LazyBatchGenerator>> = match &self.args {
GenSeriesArgs::ContainsNull { name } => Arc::new(RwLock::new(Empty { name })),
Expand All @@ -255,6 +256,7 @@ impl GenerateSeriesTable {
batch_size,
include_end: *include_end,
name,
projection,
})),
GenSeriesArgs::TimestampArgs {
start,
Expand Down Expand Up @@ -295,6 +297,7 @@ impl GenerateSeriesTable {
batch_size,
include_end: *include_end,
name,
projection,
}))
}
GenSeriesArgs::DateArgs {
Expand Down Expand Up @@ -324,6 +327,7 @@ impl GenerateSeriesTable {
batch_size,
include_end: *include_end,
name,
projection,
})),
};

Expand All @@ -341,6 +345,7 @@ pub struct GenericSeriesState<T: SeriesValue> {
current: T,
include_end: bool,
name: &'static str,
projection: Option<Vec<usize>>,
}

impl<T: SeriesValue> GenericSeriesState<T> {
Expand Down Expand Up @@ -396,7 +401,11 @@ impl<T: SeriesValue> LazyBatchGenerator for GenericSeriesState<T> {

let array = self.current.create_array(buf)?;
let batch = RecordBatch::try_new(Arc::clone(&self.schema), vec![array])?;
Ok(Some(batch))
let projected = match self.projection.as_ref() {
Copy link
Contributor Author

@mkleen mkleen Oct 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that the MemoryStream projects in https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/memory.rs#L103 while the LazyMemoryStream used in generate_series doesn't. An alternative approach could be to project generically in the LazyMemoryStream instead inside of the generator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like a great idea. Perhaps you could do so as a follow on PR

Some(projection) => batch.project(projection)?,
None => batch,
};
Ok(Some(projected))
}
}

Expand Down Expand Up @@ -477,7 +486,7 @@ impl TableProvider for GenerateSeriesTable {
None => self.schema(),
};

let generator = self.as_generator(batch_size)?;
let generator = self.as_generator(batch_size, projection.cloned())?;

Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1940,7 +1940,8 @@ impl protobuf::PhysicalPlanNode {
};

let table = GenerateSeriesTable::new(Arc::clone(&schema), args);
let generator = table.as_generator(generate_series.target_batch_size as usize)?;
let generator =
table.as_generator(generate_series.target_batch_size as usize, None)?;

Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
}
Expand Down
15 changes: 15 additions & 0 deletions datafusion/sqllogictest/test_files/table_functions.slt
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,21 @@ SELECT generate_series(1, t1.end) FROM generate_series(3, 5) as t1(end)
[1, 2, 3, 4]
[1, 2, 3]

# join with projection on generate_series
query I
select g1.value from generate_series(1, 3) g1 CROSS JOIN generate_series(1, 3) g2;
----
1
1
1
2
2
2
3
3
3


# Test range table function
query I
SELECT * FROM range(6)
Expand Down