Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,49 +631,6 @@ mod tests {
);
}

#[test]
fn arr_obj_ignore_all_null_field() {
    // Ingest three identical rows where field "c" is null in every row:
    // a column that is null throughout should not appear in the batch.
    let row = json!({
        "a": 1,
        "b": "hello",
        "c": null
    });
    let json = json!([row.clone(), row.clone(), row]);

    let req = TestRequest::default().to_http_request();

    let (_, rb, _) = into_event_batch(
        req,
        Bytes::from(serde_json::to_vec(&json).unwrap()),
        HashMap::default(),
        None,
        None,
    )
    .unwrap();

    // All three rows survive; 6 columns total — presumably "a", "b" plus
    // internal columns added by ingestion, with all-null "c" dropped.
    assert_eq!(rb.num_rows(), 3);
    assert_eq!(rb.num_columns(), 6);
    assert_eq!(
        rb.column_by_name("a").unwrap().as_int64_arr(),
        &Int64Array::from(vec![Some(1); 3])
    );
    assert_eq!(
        rb.column_by_name("b").unwrap().as_utf8_arr(),
        &StringArray::from(vec![Some("hello"); 3])
    );
}

#[test]
fn arr_schema_mismatch() {
let json = json!([
Expand Down
9 changes: 7 additions & 2 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http();
.to_http()?;

let time = time.elapsed().as_secs_f64();

Expand Down Expand Up @@ -293,12 +293,17 @@ pub enum QueryError {
EventError(#[from] EventError),
#[error("Error: {0}")]
MalformedQuery(String),
#[error(
r#"Error: Failed to Parse Record Batch into Json
Description: {0}"#
)]
JsonParse(String),
}

impl actix_web::ResponseError for QueryError {
fn status_code(&self) -> http::StatusCode {
match self {
QueryError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR,
QueryError::Execute(_) | QueryError::JsonParse(_) => StatusCode::INTERNAL_SERVER_ERROR,
_ => StatusCode::BAD_REQUEST,
}
}
Expand Down
10 changes: 5 additions & 5 deletions server/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
*
*/

use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
use actix_web::{web, Responder};
use datafusion::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use serde_json::{json, Value};

use crate::utils::arrow::record_batches_to_json;

pub struct QueryResponse {
pub records: Vec<RecordBatch>,
pub fields: Vec<String>,
Expand All @@ -31,10 +30,11 @@ pub struct QueryResponse {
}

impl QueryResponse {
pub fn to_http(&self) -> impl Responder {
pub fn to_http(&self) -> Result<impl Responder, QueryError> {
log::info!("{}", "Returning query results");
let records: Vec<&RecordBatch> = self.records.iter().collect();
let mut json_records = record_batches_to_json(&records);
let mut json_records = record_batches_to_json(&records)
.map_err(|err| QueryError::JsonParse(err.to_string()))?;
if self.fill_null {
for map in &mut json_records {
for field in &self.fields {
Expand All @@ -55,6 +55,6 @@ impl QueryResponse {
Value::Array(values)
};

web::Json(response)
Ok(web::Json(response))
}
}
25 changes: 19 additions & 6 deletions server/src/utils/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod batch_adapter;
pub mod merged_reader;
pub mod reverse_reader;

use anyhow::Result;
pub use batch_adapter::adapt_batch;
pub use merged_reader::MergedRecordReader;
use serde_json::{Map, Value};
Expand Down Expand Up @@ -63,19 +64,23 @@ pub fn replace_columns(
/// * `records` - The record batches to convert.
///
/// # Returns
/// * Result<Vec<Map<String, Value>>>
///
/// A vector of JSON objects representing the record batches.
pub fn record_batches_to_json(records: &[&RecordBatch]) -> Vec<Map<String, Value>> {
pub fn record_batches_to_json(records: &[&RecordBatch]) -> Result<Vec<Map<String, Value>>> {
    // Serialize all batches as one JSON array into an in-memory buffer.
    let buf = vec![];
    let mut writer = arrow_json::ArrayWriter::new(buf);
    writer.write_batches(records)?;
    writer.finish()?;

    let buf = writer.into_inner();

    // When the batches contain no rows the writer emits nothing at all
    // (not even "[]"), which serde_json would reject as premature EOF —
    // treat that case as "no rows" instead of swallowing every parse
    // error (the previous `Err(_) => vec![]` hid genuine failures).
    if buf.is_empty() {
        return Ok(Vec::new());
    }

    let json_rows: Vec<Map<String, Value>> = serde_json::from_reader(buf.as_slice())?;

    Ok(json_rows)
}

/// Retrieves a field from a slice of fields by name.
Expand Down Expand Up @@ -105,7 +110,7 @@ mod tests {
use arrow_array::{Array, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

use super::replace_columns;
use super::{record_batches_to_json, replace_columns};

#[test]
fn check_replace() {
Expand Down Expand Up @@ -135,4 +140,12 @@ mod tests {
assert_eq!(new_rb.num_columns(), 3);
assert_eq!(new_rb.num_rows(), 3)
}

#[test]
fn check_empty_json_to_record_batches() {
    // A record batch with no columns and no rows must convert to an
    // empty JSON row list rather than erroring out.
    let empty = RecordBatch::new_empty(Arc::new(Schema::empty()));
    let json_rows = record_batches_to_json(&[&empty]).unwrap();
    assert!(json_rows.is_empty());
}
}