Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
kb1ns committed Dec 14, 2023
1 parent 024c0ea commit d4e61af
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 23 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# v0.7.0-rc.8
# v0.7.0-rc.9

- online test of migration

Expand Down
37 changes: 36 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 34 additions & 21 deletions engine/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,29 +89,42 @@ mod v1_to_v2 {
.unwrap();
data.into_raw(file).unwrap();
if !ignore_sequences {
let r = futures::executor::block_on(async move {
let sql = format!(
"select f_id,f_cmd,f_status,f_timestamp from t_sequence where f_id > {}",
event_id
);
sqlx::query(&sql)
.map(|row: sqlx::mysql::MySqlRow| -> (u64, Command, u8) {
let mut cmd: Command = serde_json::from_str(row.get("f_cmd")).unwrap();
cmd.timestamp = Some(
row.get::<sqlx::types::time::OffsetDateTime, &str>("f_timestamp")
.unix_timestamp() as u64,
);
(row.get("f_id"), cmd, row.get("f_status"))
})
.fetch_all(pool.as_ref())
.await
.unwrap()
});
for cmd in r {
if cmd.2 != 2 {
crate::sequencer::save(cmd.0, serde_json::to_vec(&cmd.1).unwrap()).unwrap();
let mut cursor = event_id;
loop {
cursor = migrate_sequences(&pool, cursor, 1000).await;
if cursor == event_id {
break;
}
}
}
}

async fn migrate_sequences(pool: &Pool<MySql>, event_id: u64, limit: usize) -> u64 {
let r = futures::executor::block_on(async move {
let sql = format!(
"select f_id,f_cmd,f_status,f_timestamp from t_sequence where f_id > {} limit {}",
event_id, limit
);
sqlx::query(&sql)
.map(|row: sqlx::mysql::MySqlRow| -> (u64, Command, u8) {
let mut cmd: Command = serde_json::from_str(row.get("f_cmd")).unwrap();
cmd.timestamp = Some(
row.get::<sqlx::types::time::OffsetDateTime, &str>("f_timestamp")
.unix_timestamp() as u64,
);
(row.get("f_id"), cmd, row.get("f_status"))
})
.fetch_all(pool)
.await
.unwrap()
});
let mut cursor = event_id;
for cmd in r {
if cmd.2 != 2 {
crate::sequencer::save(cmd.0, serde_json::to_vec(&cmd.1).unwrap()).unwrap();
cursor = cmd.0;
}
}
cursor
}
}

0 comments on commit d4e61af

Please sign in to comment.