
Commit 2fc5ea6

Merge branch 'main' into bump-opendal
2 parents c95bff3 + 61e962e commit 2fc5ea6

14 files changed, +156 -79 lines changed

src/query/expression/src/aggregate/payload.rs

+5-6
@@ -402,15 +402,14 @@ impl Payload {
         true
     }
 
-    pub fn empty_block(&self) -> DataBlock {
-        let columns = self
-            .aggrs
-            .iter()
-            .map(|f| ColumnBuilder::with_capacity(&f.return_type().unwrap(), 0).build())
+    pub fn empty_block(&self, fake_rows: Option<usize>) -> DataBlock {
+        let fake_rows = fake_rows.unwrap_or(0);
+        let columns = (0..self.aggrs.len())
+            .map(|_| ColumnBuilder::repeat_default(&DataType::Binary, fake_rows).build())
             .chain(
                 self.group_types
                     .iter()
-                    .map(|t| ColumnBuilder::with_capacity(t, 0).build()),
+                    .map(|t| ColumnBuilder::repeat_default(t, fake_rows).build()),
             )
             .collect_vec();
         DataBlock::new_from_columns(columns)
src/query/expression/src/aggregate/payload_flush.rs

+2-2
@@ -121,7 +121,7 @@ impl Payload {
         }
 
         if blocks.is_empty() {
-            return Ok(self.empty_block());
+            return Ok(self.empty_block(None));
         }
         DataBlock::concat(&blocks)
     }
@@ -173,7 +173,7 @@ impl Payload {
         }
 
         if blocks.is_empty() {
-            return Ok(self.empty_block());
+            return Ok(self.empty_block(None));
         }
 
         DataBlock::concat(&blocks)

src/query/service/src/interpreters/access/privilege_access.rs

+15-1
@@ -1151,7 +1151,21 @@ impl AccessChecker for PrivilegeAccess {
                 self.validate_access(&GrantObject::Global, UserPrivilegeType::Grant,false, false)
                     .await?;
             }
-            Plan::Set(_) | Plan::Unset(_) | Plan::Kill(_) | Plan::SetPriority(_) | Plan::System(_) => {
+            Plan::Set(plan) => {
+                use databend_common_ast::ast::SetType;
+                if let SetType::SettingsGlobal = plan.set_type {
+                    self.validate_access(&GrantObject::Global, UserPrivilegeType::Super, false, false)
+                        .await?;
+                }
+            }
+            Plan::Unset(plan) => {
+                use databend_common_ast::ast::SetType;
+                if let SetType::SettingsGlobal = plan.unset_type {
+                    self.validate_access(&GrantObject::Global, UserPrivilegeType::Super, false, false)
+                        .await?;
+                }
+            }
+            Plan::Kill(_) | Plan::SetPriority(_) | Plan::System(_) => {
                 self.validate_access(&GrantObject::Global, UserPrivilegeType::Super, false, false)
                     .await?;
             }
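
Note: `SET`/`UNSET` are no longer lumped in with the always-`Super` statements; only the global-settings variants now require the Super privilege. A simplified, self-contained sketch of that dispatch, with hypothetical stand-ins for the real `Plan`/`SetType`/privilege types:

// Hypothetical stand-ins for the planner types; only the dispatch shape matters.
#[derive(Clone, Copy, PartialEq)]
enum SetType {
    SettingsGlobal,
    SettingsSession,
}

enum Plan {
    Set { set_type: SetType },
    Unset { unset_type: SetType },
    Kill,
}

#[derive(Debug, PartialEq)]
enum Privilege {
    Super,
}

// Which privilege (if any) the statement requires.
fn required_privilege(plan: &Plan) -> Option<Privilege> {
    match plan {
        // Only SET/UNSET ... GLOBAL touches shared state, so only it needs Super.
        Plan::Set { set_type } | Plan::Unset { unset_type: set_type } => {
            (*set_type == SetType::SettingsGlobal).then_some(Privilege::Super)
        }
        // KILL (and friends) still always requires Super, as before.
        Plan::Kill => Some(Privilege::Super),
    }
}

fn main() {
    assert_eq!(
        required_privilege(&Plan::Set { set_type: SetType::SettingsGlobal }),
        Some(Privilege::Super)
    );
    assert_eq!(
        required_privilege(&Plan::Unset { unset_type: SetType::SettingsSession }),
        None
    );
    assert_eq!(required_privilege(&Plan::Kill), Some(Privilege::Super));
}
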

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs

+10-16
@@ -173,26 +173,20 @@ impl FlightScatter for HashTableHashScatter {
                 AggregateMeta::Partitioned { .. } => unreachable!(),
                 AggregateMeta::AggregateSpilling(payload) => {
                     for p in scatter_partitioned_payload(payload, self.buckets)? {
-                        blocks.push(match p.len() == 0 {
-                            true => DataBlock::empty(),
-                            false => DataBlock::empty_with_meta(
-                                AggregateMeta::create_agg_spilling(p),
-                            ),
-                        });
+                        blocks.push(DataBlock::empty_with_meta(
+                            AggregateMeta::create_agg_spilling(p),
+                        ));
                     }
                 }
                 AggregateMeta::AggregatePayload(p) => {
                     for payload in scatter_payload(p.payload, self.buckets)? {
-                        blocks.push(match payload.len() == 0 {
-                            true => DataBlock::empty(),
-                            false => {
-                                DataBlock::empty_with_meta(AggregateMeta::create_agg_payload(
-                                    p.bucket,
-                                    payload,
-                                    p.max_partition_count,
-                                ))
-                            }
-                        });
+                        blocks.push(DataBlock::empty_with_meta(
+                            AggregateMeta::create_agg_payload(
+                                p.bucket,
+                                payload,
+                                p.max_partition_count,
+                            ),
+                        ));
                     }
                 }
             };
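
Note: the scatter path no longer special-cases empty payloads into bare `DataBlock::empty()`; every partition keeps its aggregate metadata even when it carries zero rows. A toy before/after sketch with simplified stand-in types (not the real `DataBlock`/`AggregateMeta`):

// Hypothetical, stripped-down stand-ins for DataBlock / AggregateMeta.
#[derive(Debug)]
struct Meta {
    bucket: isize,
    rows: usize,
}

#[derive(Debug)]
struct Block {
    meta: Option<Meta>,
}

// Old behaviour: zero-row partitions were sent as bare empty blocks, losing their meta.
fn scatter_old(rows_per_partition: &[usize], bucket: isize) -> Vec<Block> {
    rows_per_partition
        .iter()
        .map(|&rows| match rows == 0 {
            true => Block { meta: None },
            false => Block { meta: Some(Meta { bucket, rows }) },
        })
        .collect()
}

// New behaviour: every partition keeps its meta, even with zero rows.
fn scatter_new(rows_per_partition: &[usize], bucket: isize) -> Vec<Block> {
    rows_per_partition
        .iter()
        .map(|&rows| Block { meta: Some(Meta { bucket, rows }) })
        .collect()
}

fn main() {
    let rows_per_partition = [3, 0, 5];
    assert!(scatter_old(&rows_per_partition, 1)[1].meta.is_none());
    assert!(scatter_new(&rows_per_partition, 1)[1].meta.is_some());
}
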

src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs

+23-14
@@ -212,6 +212,7 @@ impl NewTransformPartitionBucket {
     #[allow(unused_assignments)]
     fn add_bucket(&mut self, mut data_block: DataBlock) -> Result<(isize, usize)> {
         let (mut bucket, mut partition_count) = (0, 0);
+        let mut is_empty_block = false;
         if let Some(block_meta) = data_block.get_meta() {
             if let Some(block_meta) = AggregateMeta::downcast_ref_from(block_meta) {
                 (bucket, partition_count) = match block_meta {
@@ -250,7 +251,11 @@
                     if let Some(AggregateMeta::Spilled(buckets_payload)) =
                         AggregateMeta::downcast_from(meta)
                     {
-                        let partition_count = buckets_payload[0].max_partition_count;
+                        let partition_count = if !buckets_payload.is_empty() {
+                            buckets_payload[0].max_partition_count
+                        } else {
+                            MAX_PARTITION_COUNT
+                        };
                         self.max_partition_count =
                             self.max_partition_count.max(partition_count);
 
@@ -274,12 +279,14 @@
                         unreachable!()
                     }
                     AggregateMeta::Serialized(payload) => {
+                        is_empty_block = payload.data_block.is_empty();
                         self.max_partition_count =
                             self.max_partition_count.max(payload.max_partition_count);
 
                         (payload.bucket, payload.max_partition_count)
                     }
                     AggregateMeta::AggregatePayload(payload) => {
+                        is_empty_block = payload.payload.len() == 0;
                         self.max_partition_count =
                             self.max_partition_count.max(payload.max_partition_count);
 
@@ -298,23 +305,25 @@
             ));
         }
 
-        if self.all_inputs_init {
-            if partition_count != self.max_partition_count {
-                return Err(ErrorCode::Internal(
+        if !is_empty_block {
+            if self.all_inputs_init {
+                if partition_count != self.max_partition_count {
+                    return Err(ErrorCode::Internal(
                         "Internal, the partition count does not equal the max partition count on TransformPartitionBucket.
 ",
                     ));
-            }
-            match self.buckets_blocks.entry(bucket) {
-                Entry::Vacant(v) => {
-                    v.insert(vec![data_block]);
-                }
-                Entry::Occupied(mut v) => {
-                    v.get_mut().push(data_block);
                 }
-            };
-        } else {
-            self.unpartitioned_blocks.push(data_block);
+                match self.buckets_blocks.entry(bucket) {
+                    Entry::Vacant(v) => {
+                        v.insert(vec![data_block]);
+                    }
+                    Entry::Occupied(mut v) => {
+                        v.get_mut().push(data_block);
+                    }
+                };
+            } else {
+                self.unpartitioned_blocks.push(data_block);
+            }
         }
 
         Ok((bucket, partition_count))
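
Note: two defensive changes here. The spilled-payload partition count falls back to `MAX_PARTITION_COUNT` when the payload list arrives empty (instead of indexing `[0]`), and blocks flagged as empty are neither bucketed nor treated as a partition-count mismatch. A small standalone sketch of the fallback, with hypothetical simplified types and an illustrative constant value:

// Assumed constant; the value here is illustrative only.
const MAX_PARTITION_COUNT: usize = 128;

struct SpilledPayload {
    max_partition_count: usize,
}

// Mirrors the new fallback: an empty payload list no longer panics on `[0]`.
fn partition_count_of(buckets_payload: &[SpilledPayload]) -> usize {
    if !buckets_payload.is_empty() {
        buckets_payload[0].max_partition_count
    } else {
        MAX_PARTITION_COUNT
    }
}

fn main() {
    let empty: Vec<SpilledPayload> = vec![];
    assert_eq!(partition_count_of(&empty), MAX_PARTITION_COUNT);
    let spilled = [SpilledPayload { max_partition_count: 64 }];
    assert_eq!(partition_count_of(&spilled), 64);
}
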

src/query/service/src/pipelines/processors/transforms/aggregator/serde/serde_meta.rs

+10-12
@@ -31,28 +31,23 @@ pub struct AggregateSerdeMeta {
     pub columns_layout: Vec<usize>,
     // use for new agg hashtable
     pub max_partition_count: usize,
+    pub is_empty: bool,
 }
 
 impl AggregateSerdeMeta {
-    pub fn create(bucket: isize) -> BlockMetaInfoPtr {
-        Box::new(AggregateSerdeMeta {
-            typ: BUCKET_TYPE,
-            bucket,
-            location: None,
-            data_range: None,
-            columns_layout: vec![],
-            max_partition_count: 0,
-        })
-    }
-
-    pub fn create_agg_payload(bucket: isize, max_partition_count: usize) -> BlockMetaInfoPtr {
+    pub fn create_agg_payload(
+        bucket: isize,
+        max_partition_count: usize,
+        is_empty: bool,
+    ) -> BlockMetaInfoPtr {
         Box::new(AggregateSerdeMeta {
             typ: BUCKET_TYPE,
             bucket,
             location: None,
             data_range: None,
             columns_layout: vec![],
             max_partition_count,
+            is_empty,
         })
     }
 
@@ -61,6 +56,7 @@ impl AggregateSerdeMeta {
         location: String,
         data_range: Range<u64>,
         columns_layout: Vec<usize>,
+        is_empty: bool,
     ) -> BlockMetaInfoPtr {
         Box::new(AggregateSerdeMeta {
             typ: SPILLED_TYPE,
@@ -69,6 +65,7 @@ impl AggregateSerdeMeta {
             location: Some(location),
             data_range: Some(data_range),
             max_partition_count: 0,
+            is_empty,
         })
     }
 
@@ -86,6 +83,7 @@ impl AggregateSerdeMeta {
             location: Some(location),
             data_range: Some(data_range),
             max_partition_count,
+            is_empty: false,
        })
    }
}

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs

+28-4
@@ -183,6 +183,7 @@ pub struct SerializeAggregateStream {
     pub payload: Pin<Box<SerializePayload>>,
     flush_state: PayloadFlushState,
     end_iter: bool,
+    nums: usize,
 }
 
 unsafe impl Send for SerializeAggregateStream {}
@@ -198,6 +199,7 @@ impl SerializeAggregateStream {
             flush_state: PayloadFlushState::default(),
             _params: params.clone(),
             end_iter: false,
+            nums: 0,
         }
     }
 }
@@ -225,10 +227,32 @@ impl SerializeAggregateStream {
         }
 
         match block {
-            Some(block) => Ok(Some(block.add_meta(Some(
-                AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count),
-            ))?)),
-            None => Ok(None),
+            Some(block) => {
+                self.nums += 1;
+                Ok(Some(block.add_meta(Some(
+                    AggregateSerdeMeta::create_agg_payload(
+                        p.bucket,
+                        p.max_partition_count,
+                        false,
+                    ),
+                ))?))
+            }
+            None => {
+                // always return at least one block
+                if self.nums == 0 {
+                    self.nums += 1;
+                    let block = p.payload.empty_block(Some(1));
+                    Ok(Some(block.add_meta(Some(
+                        AggregateSerdeMeta::create_agg_payload(
+                            p.bucket,
+                            p.max_partition_count,
+                            true,
+                        ),
+                    ))?))
+                } else {
+                    Ok(None)
+                }
+            }
         }
     }
 }
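
Note: the stream now tracks how many blocks it has emitted and, if the payload flushes to nothing, produces one placeholder block (a single fake row, flagged `is_empty = true`) so downstream always receives something for the bucket. A self-contained sketch of that at-least-one-item pattern, using a plain iterator and row counts as hypothetical stand-ins for the real blocks and `AggregateSerdeMeta`:

// Simplified: the inner iterator yields flushed "blocks" (here just row counts);
// the wrapper guarantees at least one item is emitted per bucket.
struct AtLeastOne<I: Iterator<Item = usize>> {
    inner: I,
    nums: usize, // how many blocks we have already produced
    done: bool,
}

impl<I: Iterator<Item = usize>> Iterator for AtLeastOne<I> {
    // (rows, is_empty_placeholder) -- the real code attaches AggregateSerdeMeta instead.
    type Item = (usize, bool);

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        match self.inner.next() {
            Some(rows) => {
                self.nums += 1;
                Some((rows, false))
            }
            None => {
                self.done = true;
                if self.nums == 0 {
                    self.nums += 1;
                    // like p.payload.empty_block(Some(1)): one fake row, flagged as empty
                    Some((1, true))
                } else {
                    None
                }
            }
        }
    }
}

fn main() {
    let empty = AtLeastOne { inner: std::iter::empty::<usize>(), nums: 0, done: false };
    assert_eq!(empty.collect::<Vec<_>>(), vec![(1, true)]);

    let non_empty = AtLeastOne { inner: [10usize, 20].into_iter(), nums: 0, done: false };
    assert_eq!(non_empty.collect::<Vec<_>>(), vec![(10, false), (20, false)]);
}
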

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_deserializer.rs

+20-12
@@ -89,18 +89,26 @@ impl TransformDeserializer {
             }
             Some(meta) => {
                 return match meta.typ == BUCKET_TYPE {
-                    true => Ok(DataBlock::empty_with_meta(
-                        AggregateMeta::create_serialized(
-                            meta.bucket,
-                            deserialize_block(
-                                dict,
-                                fragment_data,
-                                &self.schema,
-                                self.arrow_schema.clone(),
-                            )?,
-                            meta.max_partition_count,
-                        ),
-                    )),
+                    true => {
+                        let mut block = deserialize_block(
+                            dict,
+                            fragment_data,
+                            &self.schema,
+                            self.arrow_schema.clone(),
+                        )?;
+
+                        if meta.is_empty {
+                            block = block.slice(0..0);
+                        }
+
+                        Ok(DataBlock::empty_with_meta(
+                            AggregateMeta::create_serialized(
+                                meta.bucket,
+                                block,
+                                meta.max_partition_count,
+                            ),
+                        ))
+                    }
                     false => {
                         let data_schema = Arc::new(exchange_defines::spilled_schema());
                         let arrow_schema = Arc::new(exchange_defines::spilled_arrow_schema());
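
Note: the receiver honours the new `is_empty` flag by slicing the deserialized block back to zero rows, so the sender's placeholder row never reaches aggregation. A toy sketch of that trim, with a plain `Vec` standing in for the deserialized block:

// Hypothetical stand-ins: a "block" is just its rows here.
struct Meta {
    is_empty: bool,
}

fn restore_block(mut rows: Vec<u64>, meta: &Meta) -> Vec<u64> {
    if meta.is_empty {
        // like `block = block.slice(0..0)`: drop the placeholder row the sender added
        rows.truncate(0);
    }
    rows
}

fn main() {
    assert!(restore_block(vec![0], &Meta { is_empty: true }).is_empty());
    assert_eq!(restore_block(vec![7, 8], &Meta { is_empty: false }), vec![7, 8]);
}
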

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs

+9-12
@@ -149,31 +149,28 @@ impl BlockMetaTransform<ExchangeShuffleMeta> for TransformExchangeAggregateSeria
                 }
 
                 Some(AggregateMeta::AggregatePayload(p)) => {
+                    let (bucket, max_partition_count) = (p.bucket, p.max_partition_count);
+
                     if index == self.local_pos {
                         serialized_blocks.push(FlightSerialized::DataBlock(
                             block.add_meta(Some(Box::new(AggregateMeta::AggregatePayload(p))))?,
                         ));
                         continue;
                     }
 
-                    let bucket = compute_block_number(p.bucket, p.max_partition_count)?;
+                    let block_number = compute_block_number(bucket, max_partition_count)?;
                     let stream = SerializeAggregateStream::create(
                         &self.params,
                         SerializePayload::AggregatePayload(p),
                     );
                     let mut stream_blocks = stream.into_iter().collect::<Result<Vec<_>>>()?;
-
-                    if stream_blocks.is_empty() {
-                        serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty()));
-                    } else {
-                        let mut c = DataBlock::concat(&stream_blocks)?;
-                        if let Some(meta) = stream_blocks[0].take_meta() {
-                            c.replace_meta(meta);
-                        }
-
-                        let c = serialize_block(bucket, c, &self.options)?;
-                        serialized_blocks.push(FlightSerialized::DataBlock(c));
+                    debug_assert!(!stream_blocks.is_empty());
+                    let mut c = DataBlock::concat(&stream_blocks)?;
+                    if let Some(meta) = stream_blocks[0].take_meta() {
+                        c.replace_meta(meta);
                     }
+                    let c = serialize_block(block_number, c, &self.options)?;
+                    serialized_blocks.push(FlightSerialized::DataBlock(c));
                 }
             };
         }

src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs

+1
@@ -59,6 +59,7 @@ impl TransformFinalAggregate {
                 AggregateMeta::Serialized(payload) => match agg_hashtable.as_mut() {
                     Some(ht) => {
                         debug_assert!(bucket == payload.bucket);
+
                         let payload = payload.convert_to_partitioned_payload(
                             self.params.group_data_types.clone(),
                             self.params.aggregate_functions.clone(),

src/query/service/src/pipelines/processors/transforms/range_join/ie_join_state.rs

+4
@@ -125,6 +125,10 @@ impl IEJoinState {
     fn intersection(&self, left_block: &DataBlock, right_block: &DataBlock) -> bool {
         let left_len = left_block.num_rows();
         let right_len = right_block.num_rows();
+        if left_len == 0 || right_len == 0 {
+            return false;
+        }
+
         let left_l1_column = left_block.columns()[0]
             .value
             .convert_to_full_column(&self.l1_data_type, left_len);
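
Note: `intersection` now returns `false` up front when either side has no rows, which avoids touching `columns()[0]` of an empty block. A minimal sketch of the same guard over plain slices (the real method compares join-key bounds on `DataBlock`s; this only illustrates the empty check):

// Simplified: do [min(a), max(a)] and [min(b), max(b)] overlap?
fn intersection(left: &[i64], right: &[i64]) -> bool {
    // New guard: empty inputs cannot intersect and have no min/max to inspect.
    if left.is_empty() || right.is_empty() {
        return false;
    }
    let (l_min, l_max) = (left.iter().min().unwrap(), left.iter().max().unwrap());
    let (r_min, r_max) = (right.iter().min().unwrap(), right.iter().max().unwrap());
    l_min <= r_max && r_min <= l_max
}

fn main() {
    assert!(!intersection(&[], &[1, 2, 3]));
    assert!(intersection(&[1, 5], &[4, 9]));
    assert!(!intersection(&[1, 2], &[10, 20]));
}
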

src/query/service/src/pipelines/processors/transforms/transform_srf.rs

+3
@@ -113,6 +113,9 @@ impl BlockingTransform for TransformSRF {
         }
 
         let input = self.input.take().unwrap();
+        if input.is_empty() {
+            return Ok(None);
+        }
 
         let mut result_size = 0;
         let mut used = 0;
