Skip to content

Commit

Permalink
fix: unloaded CSV files could exceed max_file_size.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Jun 7, 2024
1 parent af82370 commit de6380e
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub(super) struct LimitFileSizeProcessor {
input: Arc<InputPort>,
output: Arc<OutputPort>,
threshold: usize,
flushing: bool,
buffered_size: usize,

input_data: Option<DataBlock>,
output_data: Option<DataBlock>,
Expand All @@ -42,23 +44,25 @@ impl LimitFileSizeProcessor {
pub(super) fn try_create(
input: Arc<InputPort>,
output: Arc<OutputPort>,
threshold: usize,
max_file_size: usize,
) -> Result<ProcessorPtr> {
let p = Self {
input,
output,
threshold,
threshold: max_file_size,
input_data: None,
output_data: None,
buffers: Vec::new(),
flushing: false,
buffered_size: 0,
};
Ok(ProcessorPtr::create(Box::new(p)))
}
}

impl Processor for LimitFileSizeProcessor {
fn name(&self) -> String {
String::from("ResizeProcessor")
String::from("LimitFileSizeProcessor")
}

fn as_any(&mut self) -> &mut dyn Any {
Expand All @@ -79,20 +83,20 @@ impl Processor for LimitFileSizeProcessor {
Ok(Event::NeedConsume)
}
None => {
if self.input_data.is_some() {
// backwards
if self.buffered_size > self.threshold || self.input_data.is_some() {
Ok(Event::Sync)
} else if self.input.has_data() {
self.input_data = Some(self.input.pull_data().unwrap()?);
Ok(Event::Sync)
} else if self.input.is_finished() {
if self.buffers.is_empty() {
assert_eq!(self.buffered_size, 0);
self.output.finish();
Ok(Event::Finished)
} else {
let buffers = std::mem::take(&mut self.buffers);
self.output
.push_data(Ok(FileOutputBuffers::create_block(buffers)));
Ok(Event::NeedConsume)
self.flushing = true;
Ok(Event::Sync)
}
} else {
self.input.set_need_data();
Expand All @@ -103,39 +107,38 @@ impl Processor for LimitFileSizeProcessor {
}
}

fn process(&mut self) -> databend_common_exception::Result<()> {
fn process(&mut self) -> Result<()> {
assert!(self.output_data.is_none());
assert!(self.input_data.is_some());
assert!(self.input_data.is_some() || self.flushing || self.buffered_size > self.threshold);

let block = self.input_data.take().unwrap();
let block_meta = block.get_owned_meta().unwrap();
let buffers = FileOutputBuffers::downcast_from(block_meta).unwrap();
let buffers = buffers.buffers;

self.buffers.extend(buffers);
if self.buffered_size <= self.threshold {
if let Some(block) = self.input_data.take() {
let block_meta = block.get_owned_meta().unwrap();
let buffers = FileOutputBuffers::downcast_from(block_meta).unwrap();
let buffers = buffers.buffers;
self.buffered_size += buffers.iter().map(|b| b.buffer.len()).sum::<usize>();
self.buffers.extend(buffers);
}
}

let mut size = 0;
let mut buffers = mem::take(&mut self.buffers);
let break_idx = buffers
.iter()
.enumerate()
.find_map(|(idx, b)| {
size += b.buffer.len();
if size >= self.threshold {
Some(idx)
} else {
None
}
})
.unwrap_or(buffers.len());
if break_idx == buffers.len() {
self.buffers = buffers;
Ok(())
} else {
let remain = buffers.split_off(break_idx + 1);
self.output_data = Some(FileOutputBuffers::create_block(buffers));
self.buffers = remain;
Ok(())
for i in 0..self.buffers.len() {
size += self.buffers[i].buffer.len();
if size > self.threshold {
let mut buffers = mem::take(&mut self.buffers);
self.buffers = buffers.split_off(i + 1);
self.buffered_size = self.buffers.iter().map(|b| b.buffer.len()).sum::<usize>();
self.output_data = Some(FileOutputBuffers::create_block(buffers));
return Ok(());
}
}
if self.flushing {
assert!(self.input_data.is_none());
let buffers = mem::take(&mut self.buffers);
self.output
.push_data(Ok(FileOutputBuffers::create_block(buffers)));
self.buffered_size = 0;
}
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl RowBasedFileWriter {
#[async_trait]
impl Processor for RowBasedFileWriter {
fn name(&self) -> String {
"StageSink".to_string()
"RowBasedFileWriter".to_string()
}

fn as_any(&mut self) -> &mut dyn Any {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
>>>> drop stage if exists unload_csv_max_file_size;
>>>> create stage unload_csv_max_file_size url='fs:///tmp/unload_csv_max_file_size/'
>>>> copy into @unload_csv_max_file_size from (select * from numbers(100000000)) max_file_size=100000 file_format=(type=csv)
100000000 888888890 888888890
100000000
>>>> drop stage if exists unload_csv_max_file_size;
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env bash
# Test that `copy into ... max_file_size=100000 file_format=(type=csv)` splits
# the unloaded data into multiple files, none of which is drastically larger
# than the configured limit.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../../../shell_env.sh

name="unload_csv_max_file_size"

path="/tmp/$name/"
# Ensure the stage directory exists and is empty (the original `rm -r $path/*`
# errored on the first run when the directory did not exist yet).
mkdir -p "$path"
rm -rf "${path:?}"/*

stmt "drop stage if exists ${name};"
stmt "create stage ${name} url='fs://$path'"
stmt "copy into @${name} from (select * from numbers(100000000)) max_file_size=100000 file_format=(type=csv)"

# Total row count across all unloaded files (sed strips the leading padding
# that BSD `wc` emits on macOS).
cat "$path"/* | wc -l | sed 's/ //g'


# Collect all file sizes, largest first. `stat` format flags differ between
# BSD (macOS) and GNU coreutils.
if [[ "$OSTYPE" == "darwin"* ]]; then
    file_sizes=($(find "$path" -type f -exec stat -f "%z" {} + | sort -n -r))
else
    file_sizes=($(find "$path" -type f -exec stat -c "%s" {} + | sort -n -r))
fi
# Bash arrays are 0-indexed, so the largest size is [0]. The original read
# [1], which silently skipped the largest file and weakened the check.
max_size=${file_sizes[0]}

# Allow ~20% overshoot of max_file_size; printing here makes the test's
# expected-output comparison fail when the limit is not honored.
if [ "$max_size" -gt 120000 ]; then
    echo "max_size is $max_size"
fi
stmt "drop stage if exists ${name};"

0 comments on commit de6380e

Please sign in to comment.