Skip to content

Commit

Permalink
fix: reset context progress value after compact hook
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed Dec 15, 2023
1 parent 61b4af4 commit 9541020
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/common/base/src/base/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ impl Progress {
.fetch_add(progress_values.bytes, Ordering::Relaxed);
}

/// Overwrites the current progress counters with the given snapshot.
///
/// Both `rows` and `bytes` are replaced (not accumulated) with the values
/// carried by `progress_values`. The two counters are written by separate
/// relaxed atomic stores, so they are not updated as a single atomic unit.
pub fn set(&self, progress_values: &ProgressValues) {
    // Read the snapshot first, then publish each counter independently.
    let (rows, bytes) = (progress_values.rows, progress_values.bytes);
    self.rows.store(rows, Ordering::Relaxed);
    self.bytes.store(bytes, Ordering::Relaxed);
}

pub fn fetch(&self) -> ProgressValues {
let rows = self.rows.fetch_min(0, Ordering::SeqCst);
let bytes = self.bytes.fetch_min(0, Ordering::SeqCst);
Expand Down
5 changes: 5 additions & 0 deletions src/query/service/src/interpreters/common/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,15 @@ async fn compact_table(

let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;

// keep the original progress value
let progress_value = ctx.get_write_progress_value();
// Clears previously generated segment locations to avoid duplicate data in the refresh phase
ctx.clear_segment_locations()?;
ctx.set_executor(complete_executor.get_inner())?;
complete_executor.execute()?;

// reset the progress value
ctx.get_write_progress().set(&progress_value);
}
Ok(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
expects .stats.write_progress.rows be 2
expects .error be null
2
null
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env bash
#
# Checks that the write progress reported by the HTTP query API for a
# `copy into <table>` statement equals the number of rows actually copied.
# NOTE(review): the database name suggests this guards issue 13947 — confirm.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../../../shell_env.sh

# Set up: create a table holding two rows and unload them into a stage.
cat <<EOF | $BENDSQL_CLIENT_CONNECT
create database i13947;
use i13947;
create stage test_stage;
create table tmp(id int);
insert into tmp values(1);
insert into tmp values(2);
copy into @test_stage from (select * from tmp);
EOF


# Extracting .stats.write_progress.rows from bendsql output is inconvenient,
# so curl is used against the HTTP query endpoint instead. curl sends a single
# request and does not keep polling until the query finishes, so to avoid
# reading a premature (still in-progress) result, pagination.wait_time_secs
# is set to 6 seconds.

# The stage holds the 2 rows unloaded above, so copying it back into tmp
# should report exactly 2 written rows and no error.
echo "expects .stats.write_progress.rows be 2"
echo "expects .error be null"
curl -s -u root: -XPOST "http://localhost:8000/v1/query" \
--header 'Content-Type: application/json' \
-d '{"sql": "copy into i13947.tmp from (select * from @test_stage)", "pagination": { "wait_time_secs": 6}}' \
| jq -r '.stats.write_progress.rows, .error'

# Tear down the test database.
echo "DROP database i13947;" | $BENDSQL_CLIENT_CONNECT

0 comments on commit 9541020

Please sign in to comment.