Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(query): hash table scatter will always send agg meta #17245

Merged
merged 14 commits into from
Jan 13, 2025
11 changes: 5 additions & 6 deletions src/query/expression/src/aggregate/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,15 +401,14 @@ impl Payload {
true
}

pub fn empty_block(&self) -> DataBlock {
let columns = self
.aggrs
.iter()
.map(|f| ColumnBuilder::with_capacity(&f.return_type().unwrap(), 0).build())
pub fn empty_block(&self, fake_rows: Option<usize>) -> DataBlock {
let fake_rows = fake_rows.unwrap_or(0);
let columns = (0..self.aggrs.len())
.map(|_| ColumnBuilder::repeat_default(&DataType::Binary, fake_rows).build())
.chain(
self.group_types
.iter()
.map(|t| ColumnBuilder::with_capacity(t, 0).build()),
.map(|t| ColumnBuilder::repeat_default(t, fake_rows).build()),
)
.collect_vec();
DataBlock::new_from_columns(columns)
Expand Down
4 changes: 2 additions & 2 deletions src/query/expression/src/aggregate/payload_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl Payload {
}

if blocks.is_empty() {
return Ok(self.empty_block());
return Ok(self.empty_block(None));
}
DataBlock::concat(&blocks)
}
Expand Down Expand Up @@ -173,7 +173,7 @@ impl Payload {
}

if blocks.is_empty() {
return Ok(self.empty_block());
return Ok(self.empty_block(None));
}

DataBlock::concat(&blocks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,26 +173,20 @@ impl FlightScatter for HashTableHashScatter {
AggregateMeta::Partitioned { .. } => unreachable!(),
AggregateMeta::AggregateSpilling(payload) => {
for p in scatter_partitioned_payload(payload, self.buckets)? {
blocks.push(match p.len() == 0 {
true => DataBlock::empty(),
false => DataBlock::empty_with_meta(
AggregateMeta::create_agg_spilling(p),
),
});
blocks.push(DataBlock::empty_with_meta(
AggregateMeta::create_agg_spilling(p),
));
}
}
AggregateMeta::AggregatePayload(p) => {
for payload in scatter_payload(p.payload, self.buckets)? {
blocks.push(match payload.len() == 0 {
true => DataBlock::empty(),
false => {
DataBlock::empty_with_meta(AggregateMeta::create_agg_payload(
p.bucket,
payload,
p.max_partition_count,
))
}
});
blocks.push(DataBlock::empty_with_meta(
AggregateMeta::create_agg_payload(
p.bucket,
payload,
p.max_partition_count,
),
));
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ impl NewTransformPartitionBucket {
#[allow(unused_assignments)]
fn add_bucket(&mut self, mut data_block: DataBlock) -> Result<(isize, usize)> {
let (mut bucket, mut partition_count) = (0, 0);
let mut is_empty_block = false;
if let Some(block_meta) = data_block.get_meta() {
if let Some(block_meta) = AggregateMeta::downcast_ref_from(block_meta) {
(bucket, partition_count) = match block_meta {
Expand Down Expand Up @@ -250,7 +251,11 @@ impl NewTransformPartitionBucket {
if let Some(AggregateMeta::Spilled(buckets_payload)) =
AggregateMeta::downcast_from(meta)
{
let partition_count = buckets_payload[0].max_partition_count;
let partition_count = if !buckets_payload.is_empty() {
buckets_payload[0].max_partition_count
} else {
MAX_PARTITION_COUNT
};
self.max_partition_count =
self.max_partition_count.max(partition_count);

Expand All @@ -274,12 +279,14 @@ impl NewTransformPartitionBucket {
unreachable!()
}
AggregateMeta::Serialized(payload) => {
is_empty_block = payload.data_block.is_empty();
self.max_partition_count =
self.max_partition_count.max(payload.max_partition_count);

(payload.bucket, payload.max_partition_count)
}
AggregateMeta::AggregatePayload(payload) => {
is_empty_block = payload.payload.len() == 0;
self.max_partition_count =
self.max_partition_count.max(payload.max_partition_count);

Expand All @@ -298,23 +305,25 @@ impl NewTransformPartitionBucket {
));
}

if self.all_inputs_init {
if partition_count != self.max_partition_count {
return Err(ErrorCode::Internal(
if !is_empty_block {
if self.all_inputs_init {
if partition_count != self.max_partition_count {
return Err(ErrorCode::Internal(
"Internal, the partition count does not equal the max partition count on TransformPartitionBucket.
",
));
}
match self.buckets_blocks.entry(bucket) {
Entry::Vacant(v) => {
v.insert(vec![data_block]);
}
Entry::Occupied(mut v) => {
v.get_mut().push(data_block);
}
};
} else {
self.unpartitioned_blocks.push(data_block);
match self.buckets_blocks.entry(bucket) {
Entry::Vacant(v) => {
v.insert(vec![data_block]);
}
Entry::Occupied(mut v) => {
v.get_mut().push(data_block);
}
};
} else {
self.unpartitioned_blocks.push(data_block);
}
}

Ok((bucket, partition_count))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,23 @@ pub struct AggregateSerdeMeta {
pub columns_layout: Vec<usize>,
// used by the new aggregate hash table
pub max_partition_count: usize,
pub is_empty: bool,
}

impl AggregateSerdeMeta {
pub fn create(bucket: isize) -> BlockMetaInfoPtr {
Box::new(AggregateSerdeMeta {
typ: BUCKET_TYPE,
bucket,
location: None,
data_range: None,
columns_layout: vec![],
max_partition_count: 0,
})
}

pub fn create_agg_payload(bucket: isize, max_partition_count: usize) -> BlockMetaInfoPtr {
pub fn create_agg_payload(
bucket: isize,
max_partition_count: usize,
is_empty: bool,
) -> BlockMetaInfoPtr {
Box::new(AggregateSerdeMeta {
typ: BUCKET_TYPE,
bucket,
location: None,
data_range: None,
columns_layout: vec![],
max_partition_count,
is_empty,
})
}

Expand All @@ -61,6 +56,7 @@ impl AggregateSerdeMeta {
location: String,
data_range: Range<u64>,
columns_layout: Vec<usize>,
is_empty: bool,
) -> BlockMetaInfoPtr {
Box::new(AggregateSerdeMeta {
typ: SPILLED_TYPE,
Expand All @@ -69,6 +65,7 @@ impl AggregateSerdeMeta {
location: Some(location),
data_range: Some(data_range),
max_partition_count: 0,
is_empty,
})
}

Expand All @@ -86,6 +83,7 @@ impl AggregateSerdeMeta {
location: Some(location),
data_range: Some(data_range),
max_partition_count,
is_empty: false,
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ pub struct SerializeAggregateStream {
pub payload: Pin<Box<SerializePayload>>,
flush_state: PayloadFlushState,
end_iter: bool,
nums: usize,
}

unsafe impl Send for SerializeAggregateStream {}
Expand All @@ -198,6 +199,7 @@ impl SerializeAggregateStream {
flush_state: PayloadFlushState::default(),
_params: params.clone(),
end_iter: false,
nums: 0,
}
}
}
Expand Down Expand Up @@ -225,10 +227,32 @@ impl SerializeAggregateStream {
}

match block {
Some(block) => Ok(Some(block.add_meta(Some(
AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count),
))?)),
None => Ok(None),
Some(block) => {
self.nums += 1;
Ok(Some(block.add_meta(Some(
AggregateSerdeMeta::create_agg_payload(
p.bucket,
p.max_partition_count,
false,
),
))?))
}
None => {
// always return at least one block
if self.nums == 0 {
self.nums += 1;
let block = p.payload.empty_block(Some(1));
Ok(Some(block.add_meta(Some(
AggregateSerdeMeta::create_agg_payload(
p.bucket,
p.max_partition_count,
true,
),
))?))
} else {
Ok(None)
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,26 @@ impl TransformDeserializer {
}
Some(meta) => {
return match meta.typ == BUCKET_TYPE {
true => Ok(DataBlock::empty_with_meta(
AggregateMeta::create_serialized(
meta.bucket,
deserialize_block(
dict,
fragment_data,
&self.schema,
self.arrow_schema.clone(),
)?,
meta.max_partition_count,
),
)),
true => {
let mut block = deserialize_block(
dict,
fragment_data,
&self.schema,
self.arrow_schema.clone(),
)?;

if meta.is_empty {
block = block.slice(0..0);
}

Ok(DataBlock::empty_with_meta(
AggregateMeta::create_serialized(
meta.bucket,
block,
meta.max_partition_count,
),
))
}
false => {
let data_schema = Arc::new(exchange_defines::spilled_schema());
let arrow_schema = Arc::new(exchange_defines::spilled_arrow_schema());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,31 +149,28 @@ impl BlockMetaTransform<ExchangeShuffleMeta> for TransformExchangeAggregateSeria
}

Some(AggregateMeta::AggregatePayload(p)) => {
let (bucket, max_partition_count) = (p.bucket, p.max_partition_count);

if index == self.local_pos {
serialized_blocks.push(FlightSerialized::DataBlock(
block.add_meta(Some(Box::new(AggregateMeta::AggregatePayload(p))))?,
));
continue;
}

let bucket = compute_block_number(p.bucket, p.max_partition_count)?;
let block_number = compute_block_number(bucket, max_partition_count)?;
let stream = SerializeAggregateStream::create(
&self.params,
SerializePayload::AggregatePayload(p),
);
let mut stream_blocks = stream.into_iter().collect::<Result<Vec<_>>>()?;

if stream_blocks.is_empty() {
serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty()));
} else {
let mut c = DataBlock::concat(&stream_blocks)?;
if let Some(meta) = stream_blocks[0].take_meta() {
c.replace_meta(meta);
}

let c = serialize_block(bucket, c, &self.options)?;
serialized_blocks.push(FlightSerialized::DataBlock(c));
debug_assert!(!stream_blocks.is_empty());
let mut c = DataBlock::concat(&stream_blocks)?;
if let Some(meta) = stream_blocks[0].take_meta() {
c.replace_meta(meta);
}
let c = serialize_block(block_number, c, &self.options)?;
serialized_blocks.push(FlightSerialized::DataBlock(c));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl TransformFinalAggregate {
AggregateMeta::Serialized(payload) => match agg_hashtable.as_mut() {
Some(ht) => {
debug_assert!(bucket == payload.bucket);

let payload = payload.convert_to_partitioned_payload(
self.params.group_data_types.clone(),
self.params.aggregate_functions.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ impl IEJoinState {
fn intersection(&self, left_block: &DataBlock, right_block: &DataBlock) -> bool {
let left_len = left_block.num_rows();
let right_len = right_block.num_rows();
if left_len == 0 || right_len == 0 {
return false;
}

let left_l1_column = left_block.columns()[0]
.value
.convert_to_full_column(&self.l1_data_type, left_len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ impl BlockingTransform for TransformSRF {
}

let input = self.input.take().unwrap();
if input.is_empty() {
return Ok(None);
}

let mut result_size = 0;
let mut used = 0;
Expand Down
2 changes: 1 addition & 1 deletion tests/suites/0_stateless/19_fuzz/19_0004_fuzz_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@

for i in range(num_predicates):
sql = f"explain analyze partial select * from t where a >= {int(step * i)};"
client1.run(sql)
client1.run(sql)
Loading