-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
memory_limit.rs
330 lines (292 loc) · 10.1 KB
/
memory_limit.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! This module contains tests for limiting memory at runtime in DataFusion
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use futures::StreamExt;
use std::sync::Arc;
use datafusion::datasource::streaming::{PartitionStream, StreamingTable};
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionState;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::physical_optimizer::pipeline_fixer::PipelineFixer;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion_common::assert_contains;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_execution::TaskContext;
use test_utils::AccessLogGenerator;
// One-time test-binary setup: the `ctor` crate runs this before any test,
// wiring up env_logger so RUST_LOG controls log output during tests.
#[cfg(test)]
#[ctor::ctor]
fn init() {
    // Enable RUST_LOG logging configuration for test.
    // Result deliberately ignored: a second init attempt errors harmlessly.
    let _ = env_logger::try_init();
}
/// Sorting the whole table must fail once the sort exceeds the memory
/// budget, since spilling to disk is disabled.
#[tokio::test]
async fn oom_sort() {
    let expected = vec![
        "Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)",
    ];
    run_limit_test("select * from t order by host DESC", expected, 200_000).await
}
/// Aggregation with no GROUP BY clause should exhaust memory inside
/// `AggregateStream`.
#[tokio::test]
async fn group_by_none() {
    let expected = vec![
        "Resources exhausted: Failed to allocate additional",
        "AggregateStream",
    ];
    run_limit_test("select median(image) from t", expected, 20_000).await
}
/// Hash aggregation grouped on a numeric column should exhaust memory
/// inside `GroupedHashAggregateStream`.
#[tokio::test]
async fn group_by_row_hash() {
    let expected = vec![
        "Resources exhausted: Failed to allocate additional",
        "GroupedHashAggregateStream",
    ];
    run_limit_test("select count(*) from t GROUP BY response_bytes", expected, 2_000).await
}
/// Hash aggregation grouped on dictionary-encoded columns should exhaust
/// memory inside `GroupedHashAggregateStream`.
#[tokio::test]
async fn group_by_hash() {
    let expected = vec![
        "Resources exhausted: Failed to allocate additional",
        "GroupedHashAggregateStream",
    ];
    // group by dict column
    let query = "select count(*) from t GROUP BY service, host, pod, container";
    run_limit_test(query, expected, 1_000).await
}
/// Equi-join over two partitions should exhaust memory while building the
/// hash join input for partition 0.
#[tokio::test]
async fn join_by_key_multiple_partitions() {
    let expected = vec![
        "Resources exhausted: Failed to allocate additional",
        "HashJoinInput[0]",
    ];
    run_limit_test_with_config(
        "select t1.* from t t1 JOIN t t2 ON t1.service = t2.service",
        expected,
        1_000,
        SessionConfig::new().with_target_partitions(2),
    )
    .await
}
/// Equi-join on a single partition should exhaust memory while building
/// the hash join input.
#[tokio::test]
async fn join_by_key_single_partition() {
    let expected = vec![
        "Resources exhausted: Failed to allocate additional",
        "HashJoinInput",
    ];
    run_limit_test_with_config(
        "select t1.* from t t1 JOIN t t2 ON t1.service = t2.service",
        expected,
        1_000,
        SessionConfig::new().with_target_partitions(1),
    )
    .await
}
/// A non-equi join condition forces a nested loop join, which should
/// exhaust memory while loading its build side.
#[tokio::test]
async fn join_by_expression() {
    let expected = vec![
        "Resources exhausted: Failed to allocate additional",
        "NestedLoopJoinLoad[0]",
    ];
    let query = "select t1.* from t t1 JOIN t t2 ON t1.service != t2.service";
    run_limit_test(query, expected, 1_000).await
}
/// A cross join should exhaust memory inside `CrossJoinExec`.
#[tokio::test]
async fn cross_join() {
    let expected = vec![
        "Resources exhausted: Failed to allocate additional",
        "CrossJoinExec",
    ];
    run_limit_test("select t1.* from t t1 CROSS JOIN t t2", expected, 1_000).await
}
/// A sort-merge join should exhaust memory inside `SMJStream`.
#[tokio::test]
async fn merge_join() {
    // Planner chooses MergeJoin only if number of partitions > 1,
    // and only when hash joins are not preferred.
    let config = SessionConfig::new()
        .with_target_partitions(2)
        .set_bool("datafusion.optimizer.prefer_hash_join", false);
    let expected = vec![
        "Resources exhausted: Failed to allocate additional",
        "SMJStream",
    ];
    let query = "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time";
    run_limit_test_with_config(query, expected, 1_000, config).await
}
/// Against the (infinite) streaming table, the planner uses a symmetric
/// hash join, which should exhaust memory inside `SymmetricHashJoinStream`.
#[tokio::test]
async fn test_limit_symmetric_hash_join() {
    let expected = vec![
        "Resources exhausted: Failed to allocate additional",
        "SymmetricHashJoinStream",
    ];
    let query = "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time";
    run_streaming_test_with_config(query, expected, 1_000, SessionConfig::new()).await
}
/// Fraction of the configured memory limit that is actually made
/// available to the memory pool (passed to
/// `RuntimeConfig::with_memory_limit` alongside each test's byte limit).
const MEMORY_FRACTION: f64 = 0.95;
/// Runs `query` against 1000 generated rows under `memory_limit` bytes,
/// with the disk manager disabled (no spilling) and the default
/// `SessionConfig`. Panics unless the query fails with the expected
/// memory-limit error text.
async fn run_limit_test(
    query: &str,
    expected_error_contains: Vec<&str>,
    memory_limit: usize,
) {
    run_limit_test_with_config(
        query,
        expected_error_contains,
        memory_limit,
        SessionConfig::new(),
    )
    .await
}
/// Runs `query` against 1000 generated rows under `memory_limit` bytes,
/// with the disk manager disabled (no spilling), using the supplied
/// `SessionConfig`. Panics if the query succeeds; otherwise asserts the
/// error message contains every substring in `expected_error_contains`.
async fn run_limit_test_with_config(
    query: &str,
    expected_error_contains: Vec<&str>,
    memory_limit: usize,
    config: SessionConfig,
) {
    // 1000 rows of access-log data, split into batches of at most 50 rows
    let batches: Vec<_> = AccessLogGenerator::new()
        .with_row_limit(1000)
        .with_max_batch_size(50)
        .collect();
    let schema = batches[0].schema();
    let table = MemTable::try_new(schema, vec![batches]).unwrap();

    // Cap the memory pool and forbid spilling, so the limit must bite
    let rt_config = RuntimeConfig::new()
        .with_disk_manager(DiskManagerConfig::Disabled)
        .with_memory_limit(memory_limit, MEMORY_FRACTION);
    let runtime = RuntimeEnv::new(rt_config).unwrap();

    // Disabling physical optimizer rules to avoid sorts / repartitions
    // (since RepartitionExec / SortExec also has a memory budget which we'll likely hit first)
    let state = SessionState::with_config_rt(config, Arc::new(runtime))
        .with_physical_optimizer_rules(vec![]);
    let ctx = SessionContext::with_state(state);
    ctx.register_table("t", Arc::new(table))
        .expect("registering table");

    let df = ctx.sql(query).await.expect("Planning query");
    match df.collect().await {
        Ok(_batches) => {
            panic!("Unexpected success when running, expected memory limit failure")
        }
        Err(e) => {
            for error_substring in expected_error_contains {
                assert_contains!(e.to_string(), error_substring);
            }
        }
    }
}
/// A stream partition backed by a fixed, pre-generated set of in-memory
/// record batches; used to feed the streaming-table tests.
struct DummyStreamPartition {
    // Schema shared by all batches in this partition
    schema: SchemaRef,
    // Batches replayed, in order, each time the partition is executed
    batches: Vec<RecordBatch>,
}
// Serves the stored batches as a record batch stream. Each call to
// `execute` replays the same batches from the start (they are cloned).
impl PartitionStream for DummyStreamPartition {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }
    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        // We create an iterator from the record batches and map them into Ok values,
        // converting the iterator into a futures::stream::Stream
        Box::pin(RecordBatchStreamAdapter::new(
            self.schema.clone(),
            futures::stream::iter(self.batches.clone()).map(Ok),
        ))
    }
}
/// Streaming variant of the limit test: serves 1000 generated rows
/// through a `StreamingTable` marked infinite, keeps only the
/// `PipelineFixer` physical optimizer rule, caps memory at
/// `memory_limit` bytes with spilling disabled, and asserts that `query`
/// fails with an error containing every expected substring.
async fn run_streaming_test_with_config(
    query: &str,
    expected_error_contains: Vec<&str>,
    memory_limit: usize,
    config: SessionConfig,
) {
    // 1000 rows of access-log data in batches of at most 50 rows
    let batches: Vec<_> = AccessLogGenerator::new()
        .with_row_limit(1000)
        .with_max_batch_size(50)
        .collect();

    // Expose the batches through an (allegedly) infinite streaming table
    let schema = batches[0].schema();
    let partition = DummyStreamPartition {
        schema: schema.clone(),
        batches: batches.clone(),
    };
    let table = StreamingTable::try_new(schema, vec![Arc::new(partition)])
        .unwrap()
        .with_infinite_table(true);

    // Cap the memory pool and forbid spilling, so the limit must bite
    let rt_config = RuntimeConfig::new()
        .with_disk_manager(DiskManagerConfig::Disabled)
        .with_memory_limit(memory_limit, MEMORY_FRACTION);
    let runtime = RuntimeEnv::new(rt_config).unwrap();

    // Keep only PipelineFixer: other rules could inject sorts /
    // repartitions whose own memory budgets might trip first
    let state = SessionState::with_config_rt(config, Arc::new(runtime))
        .with_physical_optimizer_rules(vec![Arc::new(PipelineFixer::new())]);
    let ctx = SessionContext::with_state(state);
    ctx.register_table("t", Arc::new(table))
        .expect("registering table");

    let df = ctx.sql(query).await.expect("Planning query");
    match df.collect().await {
        // Success means the memory limit was never enforced
        Ok(_batches) => {
            panic!("Unexpected success when running, expected memory limit failure")
        }
        // Verify every expected substring appears in the error text
        Err(e) => {
            for error_substring in expected_error_contains {
                assert_contains!(e.to_string(), error_substring);
            }
        }
    }
}