Skip to content

Commit

Permalink
chore: fix wasm & python ci
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 27, 2024
1 parent bd68bab commit 4f203c1
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 65 deletions.
6 changes: 3 additions & 3 deletions bindings/python/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
};

use pyo3::{pyclass, pymethods};
use tonbo::record::{ColumnDesc, Datatype};
use tonbo::record::{Datatype, Value};

use crate::datatype::DataType;

Expand Down Expand Up @@ -64,9 +64,9 @@ impl From<Column> for ColumnDesc {
ColumnDesc::new(col.name, datatype, col.nullable)
}
}
impl From<Column> for tonbo::record::Column {
impl From<Column> for Value {
fn from(col: Column) -> Self {
let datatype = Datatype::from(col.datatype);
tonbo::record::Column::new(datatype, col.name, col.value, col.nullable)
Value::new(datatype, col.name, col.value, col.nullable)
}
}
10 changes: 5 additions & 5 deletions bindings/python/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use pyo3::{
use pyo3_asyncio::tokio::{future_into_py, get_runtime};
use tonbo::{
executor::tokio::TokioExecutor,
record::{ColumnDesc, DynRecord},
record::DynRecord,
DB,
};

use tonbo::record::{DynSchema, Value};
use crate::{
column::Column,
error::{CommitError, DbError},
Expand Down Expand Up @@ -57,14 +57,14 @@ impl TonboDB {
desc.push(ColumnDesc::from(col));
}
}
let schema = DynSchema::new(desc, primary_key_index.unwrap());
let option = option.into_option(primary_key_index.unwrap(), primary_key_name.unwrap());
let db = get_runtime()
.block_on(async {
DB::with_schema(
option,
TokioExecutor::new(),
desc,
primary_key_index.unwrap(),
schema,
)
.await
})
Expand All @@ -87,7 +87,7 @@ impl TonboDB {
for i in 0..values.len()? {
let value = values.get_item(i)?;
if let Ok(bound_col) = value.downcast::<Column>() {
let col = tonbo::record::Column::from(bound_col.extract::<Column>()?);
let col = Value::from(bound_col.extract::<Column>()?);
cols.push(col);
}
}
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/src/range.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::ops;

use pyo3::{pyclass, FromPyObject, Py, PyAny, Python};

use tonbo::record::Value;
use crate::{utils::to_col, Column};

#[pyclass]
Expand All @@ -12,7 +12,7 @@ pub enum Bound {
}

impl Bound {
pub(crate) fn to_bound(&self, py: Python, col: &Column) -> ops::Bound<tonbo::record::Column> {
pub(crate) fn to_bound(&self, py: Python, col: &Column) -> ops::Bound<Value> {
match self {
Bound::Included { key } => ops::Bound::Included(to_col(py, col, key.clone_ref(py))),
Bound::Excluded { key } => ops::Bound::Excluded(to_col(py, col, key.clone_ref(py))),
Expand Down
8 changes: 4 additions & 4 deletions bindings/python/src/record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ use pyo3::{
types::{PyAnyMethods, PyMapping, PyMappingMethods},
Py, PyAny, PyResult, Python,
};
use tonbo::record::DynRecord;
use tonbo::record::{DynRecord, Value};

use crate::Column;

#[derive(Clone)]
struct Record {
columns: Vec<tonbo::record::Column>,
columns: Vec<Value>,
primary_key_index: usize,
}

impl Record {
fn new(columns: Vec<tonbo::record::Column>, primary_key_index: usize) -> Self {
fn new(columns: Vec<Value>, primary_key_index: usize) -> Self {
Self {
columns,
primary_key_index,
Expand Down Expand Up @@ -58,7 +58,7 @@ impl RecordBatch {
if col.primary_key {
primary_key_index = col_idx;
}
let col = tonbo::record::Column::from(col);
let col = Value::from(col);
cols.push(col);
col_idx += 1;
}
Expand Down
12 changes: 6 additions & 6 deletions bindings/python/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use pyo3::{
};
use pyo3_asyncio::tokio::future_into_py;
use tonbo::{record::DynRecord, transaction, Projection};

use tonbo::record::Value;
use crate::{
column::Column,
error::{repeated_commit_err, CommitError, DbError},
Expand Down Expand Up @@ -123,7 +123,7 @@ impl Transaction {
let tuple = x.downcast::<PyTuple>()?;
let col = tuple.get_item(1)?;
if let Ok(bound_col) = col.downcast::<Column>() {
let col = tonbo::record::Column::from(bound_col.extract::<Column>()?);
let col = Value::from(bound_col.extract::<Column>()?);
cols.push(col);
}
}
Expand Down Expand Up @@ -182,14 +182,14 @@ impl Transaction {
let mut scan = txn.scan((
unsafe {
transmute::<
std::ops::Bound<&tonbo::record::Column>,
std::ops::Bound<&'static tonbo::record::Column>,
std::ops::Bound<&Value>,
std::ops::Bound<&'static Value>,
>(lower.as_ref())
},
unsafe {
transmute::<
std::ops::Bound<&tonbo::record::Column>,
std::ops::Bound<&'static tonbo::record::Column>,
std::ops::Bound<&Value>,
std::ops::Bound<&'static Value>,
>(high.as_ref())
},
));
Expand Down
12 changes: 6 additions & 6 deletions bindings/python/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use pyo3::{
types::{PyBytes, PyDict, PyDictMethods},
Bound, Py, PyAny, Python,
};
use tonbo::record::Datatype;
use tonbo::record::{Datatype, Value};

use crate::{column::Column, datatype::DataType, range};

pub(crate) fn to_dict(
py: Python,
primary_key_index: usize,
record: Vec<tonbo::record::Column>,
record: Vec<Value>,
) -> Bound<PyDict> {
let dict = PyDict::new_bound(py);
for (idx, col) in record.iter().enumerate() {
Expand Down Expand Up @@ -185,8 +185,8 @@ pub(crate) fn to_key(
}
}

pub(crate) fn to_col(py: Python, col: &Column, key: Py<PyAny>) -> tonbo::record::Column {
tonbo::record::Column::new(
pub(crate) fn to_col(py: Python, col: &Column, key: Py<PyAny>) -> Value {
Value::new(
Datatype::from(&col.datatype),
col.name.to_owned(),
to_key(py, &col.datatype, key),
Expand All @@ -200,8 +200,8 @@ pub(crate) fn to_bound(
lower: Option<Py<range::Bound>>,
high: Option<Py<range::Bound>>,
) -> (
std::ops::Bound<tonbo::record::Column>,
std::ops::Bound<tonbo::record::Column>,
std::ops::Bound<Value>,
std::ops::Bound<Value>,
) {
let lower = match lower {
Some(bound) => bound.get().to_bound(py, col),
Expand Down
74 changes: 35 additions & 39 deletions tests/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,55 +7,54 @@ mod tests {
use futures::StreamExt;
use tonbo::{
executor::opfs::OpfsExecutor,
record::{Datatype, DynRecord, Record, Value, ValueDesc},
record::{Datatype, DynRecord, DynSchema, Record, Value, ValueDesc},
DbOption, Projection, DB,
};
use wasm_bindgen_test::wasm_bindgen_test;

fn test_dyn_item_schema() -> (Vec<ColumnDesc>, usize) {
fn test_dyn_item_schema() -> DynSchema {
let descs = vec![
ColumnDesc::new("id".to_string(), Datatype::Int64, false),
ColumnDesc::new("age".to_string(), Datatype::Int8, true),
ColumnDesc::new("name".to_string(), Datatype::String, false),
ColumnDesc::new("email".to_string(), Datatype::String, true),
ColumnDesc::new("bytes".to_string(), Datatype::Bytes, true),
ValueDesc::new("id".to_string(), Datatype::Int64, false),
ValueDesc::new("age".to_string(), Datatype::Int8, true),
ValueDesc::new("name".to_string(), Datatype::String, false),
ValueDesc::new("email".to_string(), Datatype::String, true),
ValueDesc::new("bytes".to_string(), Datatype::Bytes, true),
];
(descs, 0)
DynSchema::new(descs, 0)
}

fn test_dyn_items() -> Vec<DynRecord> {
let mut items = vec![];
for i in 0..50 {
let columns = vec![
Column::new(Datatype::Int64, "id".to_string(), Arc::new(i as i64), false),
Column::new(
Value::new(Datatype::Int64, "id".to_string(), Arc::new(i as i64), false),
Value::new(
Datatype::Int8,
"age".to_string(),
Arc::new(Some(i as i8)),
true,
),
Column::new(
Value::new(
Datatype::String,
"name".to_string(),
Arc::new(i.to_string()),
false,
),
Column::new(
Value::new(
Datatype::String,
"email".to_string(),
Arc::new(Some(format!("{}@tonbo.io", i))),
true,
),
Column::new(
Value::new(
Datatype::Bytes,
"bytes".to_string(),
Arc::new(Some((i as i32).to_le_bytes().to_vec())),
true,
),
];

let record = DynRecord::new(columns, 0);
items.push(record);
items.push(DynRecord::new(columns, 0));
}
items
}
Expand All @@ -69,21 +68,20 @@ mod tests {

#[wasm_bindgen_test]
async fn test_wasm_read_write() {
let (cols_desc, primary_key_index) = test_dyn_item_schema();
let schema = test_dyn_item_schema();
let path = Path::from_opfs_path("opfs_dir_rw").unwrap();
let fs = fusio::disk::LocalFs {};
fs.create_dir_all(&path).await.unwrap();

let option = DbOption::with_path(
Path::from_opfs_path("opfs_dir_rw").unwrap(),
"id".to_string(),
primary_key_index,
0,
);

let db: DB<DynRecord, OpfsExecutor> =
DB::with_schema(option, OpfsExecutor::new(), cols_desc, primary_key_index)
.await
.unwrap();
let db: DB<DynRecord, OpfsExecutor> = DB::with_schema(option, OpfsExecutor::new(), schema)
.await
.unwrap();

for item in test_dyn_items().into_iter() {
db.insert(item).await.unwrap();
Expand All @@ -94,7 +92,7 @@ mod tests {
let tx = db.transaction().await;

for i in 0..50 {
let key = Column::new(Datatype::Int64, "id".to_string(), Arc::new(i as i64), false);
let key = Value::new(Datatype::Int64, "id".to_string(), Arc::new(i as i64), false);
let option1 = tx.get(&key, Projection::All).await.unwrap();
let entry = option1.unwrap();
let record_ref = entry.get();
Expand Down Expand Up @@ -154,7 +152,7 @@ mod tests {

#[wasm_bindgen_test]
async fn test_wasm_transaction() {
let (cols_desc, primary_key_index) = test_dyn_item_schema();
let schema = test_dyn_item_schema();

let fs = fusio::disk::LocalFs {};
let path = Path::from_opfs_path("opfs_dir_txn").unwrap();
Expand All @@ -163,13 +161,12 @@ mod tests {
let option = DbOption::with_path(
Path::from_opfs_path("opfs_dir_txn").unwrap(),
"id".to_string(),
primary_key_index,
0,
);

let db: DB<DynRecord, OpfsExecutor> =
DB::with_schema(option, OpfsExecutor::new(), cols_desc, primary_key_index)
.await
.unwrap();
let db: DB<DynRecord, OpfsExecutor> = DB::with_schema(option, OpfsExecutor::new(), schema)
.await
.unwrap();

{
let mut txn = db.transaction().await;
Expand All @@ -182,8 +179,8 @@ mod tests {
// test scan
{
let txn = db.transaction().await;
let lower = Column::new(Datatype::Int64, "id".to_owned(), Arc::new(5_i64), false);
let upper = Column::new(Datatype::Int64, "id".to_owned(), Arc::new(47_i64), false);
let lower = Value::new(Datatype::Int64, "id".to_owned(), Arc::new(5_i64), false);
let upper = Value::new(Datatype::Int64, "id".to_owned(), Arc::new(47_i64), false);
let mut scan = txn
.scan((Bound::Included(&lower), Bound::Included(&upper)))
.projection(vec![0, 2, 4])
Expand Down Expand Up @@ -239,20 +236,20 @@ mod tests {

#[wasm_bindgen_test]
async fn test_wasm_schema_recover() {
let (cols_desc, primary_key_index) = test_dyn_item_schema();
let schema = test_dyn_item_schema();
let path = Path::from_opfs_path("opfs_dir").unwrap();
let fs = fusio::disk::LocalFs {};
fs.create_dir_all(&path).await.unwrap();

let option = DbOption::with_path(
Path::from_opfs_path("opfs_dir").unwrap(),
"id".to_string(),
primary_key_index,
0,
);

{
let db: DB<DynRecord, OpfsExecutor> =
DB::with_schema(option, OpfsExecutor::new(), cols_desc, primary_key_index)
DB::with_schema(option, OpfsExecutor::new(), schema)
.await
.unwrap();

Expand All @@ -263,16 +260,15 @@ mod tests {
db.flush_wal().await.unwrap();
}

let (cols_desc, primary_key_index) = test_dyn_item_schema();
let schema = test_dyn_item_schema();
let option = DbOption::with_path(
Path::from_opfs_path("opfs_dir").unwrap(),
"id".to_string(),
primary_key_index,
0,
);
let db: DB<DynRecord, OpfsExecutor> =
DB::with_schema(option, OpfsExecutor::new(), cols_desc, primary_key_index)
.await
.unwrap();
let db: DB<DynRecord, OpfsExecutor> = DB::with_schema(option, OpfsExecutor::new(), schema)
.await
.unwrap();

let mut sort_items = BTreeMap::new();
for item in test_dyn_items() {
Expand Down

0 comments on commit 4f203c1

Please sign in to comment.