Commit 25f8d53

Fix escaping in ParquetJson schema formatter, add JSON_AOS format alias
sergiimk committed Oct 26, 2024
1 parent 380c944 commit 25f8d53
Showing 6 changed files with 118 additions and 9 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,13 @@ Recommendation: for ease of reading, use the following order:
- Fixed
-->

## [Unreleased]
### Changed
- GraphQL: Removed deprecated `JSON_LD` in favor of `ND_JSON` in `DataBatchFormat`
- GraphQL: In `DataBatchFormat` introduced the `JSON_AOS` format to replace the now-deprecated `JSON` in an effort to harmonize format names with the REST API
### Fixed
- GraphQL: Fixed invalid JSON encoding in `PARQUET_JSON` schema format when column names contain special characters (#746)

## [0.206.1] - 2024-10-24
### Changed
- `kamu repo list`: supports all types of output
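
To make the `PARQUET_JSON` fix concrete: the formatter previously interpolated column names verbatim between hard-coded quotes, so a column named `a " b` yielded unparseable JSON. A minimal standalone sketch of the failure mode and the fix, using plain serde_json (illustrative only, not code from this commit):

fn main() {
    let name = r#"a " b"#;

    // Old approach: the raw name lands between literal quotes -> invalid JSON
    let broken = format!(r#"{{"name": "{}"}}"#, name);
    assert!(serde_json::from_str::<serde_json::Value>(&broken).is_err());

    // Fixed approach: serialize the name as a proper JSON string first
    let fixed = format!(r#"{{"name": {}}}"#, serde_json::to_string(name).unwrap());
    let value: serde_json::Value = serde_json::from_str(&fixed).unwrap();
    assert_eq!(value["name"], name);
}
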
5 changes: 5 additions & 0 deletions resources/schema.gql
@@ -341,7 +341,12 @@ type DataBatch {
}

enum DataBatchFormat {
"""
Deprecated: Use `JSON_AOS` instead and expect it to become the default in
future versions
"""
JSON
JSON_AOS
JSON_SOA
JSON_AOA
ND_JSON
11 changes: 9 additions & 2 deletions src/adapter/graphql/src/scalars/data_batch.rs
@@ -18,8 +18,11 @@ const MAX_SOA_BUFFER_SIZE: usize = 100_000_000;

#[derive(Enum, Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataBatchFormat {
/// Deprecated: Use `JSON_AOS` instead and expect it to become the default
/// in future versions
#[default]
Json,
JsonAOS,
JsonSOA,
JsonAOA,
NdJson,
@@ -45,7 +48,9 @@ impl DataBatch {
DataBatchFormat::Csv | DataBatchFormat::JsonLD | DataBatchFormat::NdJson => {
String::new()
}
- DataBatchFormat::Json | DataBatchFormat::JsonAOA => String::from("[]"),
+ DataBatchFormat::Json | DataBatchFormat::JsonAOS | DataBatchFormat::JsonAOA => {
+     String::from("[]")
+ }
DataBatchFormat::JsonSOA => String::from("{}"),
},
num_records: 0,
@@ -77,7 +82,9 @@
..Default::default()
},
)),
- DataBatchFormat::Json => Box::new(JsonArrayOfStructsWriter::new(&mut buf)),
+ DataBatchFormat::Json | DataBatchFormat::JsonAOS => {
+     Box::new(JsonArrayOfStructsWriter::new(&mut buf))
+ }
DataBatchFormat::JsonSOA => {
Box::new(JsonStructOfArraysWriter::new(&mut buf, MAX_SOA_BUFFER_SIZE))
}
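
The dual match arms above make the rename non-breaking: the deprecated `Json` and the new `JsonAOS` select the same writer, so clients still sending `JSON` receive identical responses. A stripped-down sketch of this alias-dispatch pattern (hypothetical standalone types, not the kamu API):

#[derive(Clone, Copy)]
enum Format {
    Json,    // deprecated spelling
    JsonAos, // new harmonized spelling
    JsonSoa,
}

fn empty_payload(format: Format) -> &'static str {
    match format {
        // Old and new names share one arm, guaranteeing identical output
        Format::Json | Format::JsonAos => "[]",
        Format::JsonSoa => "{}",
    }
}

fn main() {
    assert_eq!(empty_payload(Format::Json), empty_payload(Format::JsonAos));
}
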
46 changes: 43 additions & 3 deletions src/adapter/graphql/src/scalars/data_schema.rs
@@ -116,7 +116,7 @@ mod test {
use super::*;

#[test_log::test(tokio::test)]
- async fn test_from_parquet_schema_parquet() {
+ async fn test_data_schema_parquet() {
let df = get_test_df().await;

let result = DataSchema::from_data_frame_schema(df.schema(), DataSchemaFormat::Parquet);
@@ -140,7 +140,7 @@ }
}

#[test_log::test(tokio::test)]
- async fn test_from_parquet_schema_parquet_json() {
+ async fn test_data_schema_parquet_json() {
let df = get_test_df().await;

let result = DataSchema::from_data_frame_schema(df.schema(), DataSchemaFormat::ParquetJson);
@@ -173,7 +173,47 @@ }
}

#[test_log::test(tokio::test)]
- async fn test_from_parquet_schema_parquet_arrow_json() {
+ async fn test_data_schema_parquet_json_escaping() {
let schema = Arc::new(Schema::new(vec![Field::new(
"a \" b",
DataType::Utf8,
false,
)]));

let batch =
RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(vec!["a"]))]).unwrap();

let ctx = SessionContext::new();
ctx.register_batch("t", batch).unwrap();
let df = ctx.table("t").await.unwrap();

let result = DataSchema::from_data_frame_schema(df.schema(), DataSchemaFormat::ParquetJson);

let data_schema = result.unwrap();

assert_eq!(data_schema.format, DataSchemaFormat::ParquetJson);

let schema_content = data_schema.content;

let data_schema_json = serde_json::from_str::<Value>(schema_content.as_str()).unwrap();

assert_eq!(
data_schema_json,
serde_json::json!({
"fields": [{
"logicalType": "STRING",
"name": "a \" b",
"repetition": "REQUIRED",
"type": "BYTE_ARRAY"
}],
"name": "arrow_schema",
"type": "struct"
})
);
}

#[test_log::test(tokio::test)]
async fn test_data_schema_arrow_json() {
let df = get_test_df().await;

let result = DataSchema::from_data_frame_schema(df.schema(), DataSchemaFormat::ArrowJson);
32 changes: 32 additions & 0 deletions src/infra/core/tests/tests/test_schema_utils.rs
@@ -13,6 +13,38 @@ use datafusion::parquet::basic::{ConvertedType, LogicalType, Repetition, Type as
use datafusion::parquet::schema::types::Type;
use kamu_data_utils::schema::format::write_schema_parquet_json;

#[test]
fn test_write_schema_parquet_json_escaping() {
let f1 = Type::primitive_type_builder("a \" b", PhysicalType::INT32)
.build()
.unwrap();
let fields = vec![Arc::new(f1)];
let message = Type::group_type_builder("schema")
.with_fields(fields)
.with_id(Some(1))
.build()
.unwrap();

let mut buf = Vec::new();
write_schema_parquet_json(&mut buf, &message).unwrap();

println!("{}", std::str::from_utf8(&buf).unwrap());

let actual: serde_json::Value = serde_json::from_slice(&buf).unwrap();

let expected = serde_json::json!({
"name": "schema",
"type": "struct",
"fields": [{
"name": "a \" b",
"repetition": "OPTIONAL",
"type": "INT32",
}]
});

assert_eq!(actual, expected);
}

#[test]
fn test_write_schema_parquet_json_group() {
let f1 = Type::primitive_type_builder("f1", PhysicalType::INT32)
26 changes: 22 additions & 4 deletions src/utils/data-utils/src/schema/format.rs
@@ -84,8 +84,8 @@ impl<'a> ParquetJsonSchemaWriter<'a> {
} => {
write!(
self.output,
r#"{{"name": "{}", "repetition": "{}""#,
basic_info.name(),
r#"{{"name": {}, "repetition": "{}""#,
JsonEscapedString(basic_info.name()),
basic_info.repetition()
)?;

@@ -117,8 +117,8 @@ impl<'a> ParquetJsonSchemaWriter<'a> {
} => {
write!(
self.output,
r#"{{"name": "{}", "type": "struct""#,
basic_info.name()
r#"{{"name": {}, "type": "struct""#,
JsonEscapedString(basic_info.name())
)?;

if basic_info.has_repetition() {
@@ -227,3 +227,21 @@ impl<'a> ParquetJsonSchemaWriter<'a> {
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

struct JsonEscapedString<'a>(&'a str);

impl<'a> std::fmt::Display for JsonEscapedString<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use serde::Serializer;

// TODO: PERF: Find a way to avoid allocation and write directly into
// formatter's buffer
let mut buf = Vec::new();
let mut serializer = serde_json::Serializer::new(&mut buf);
serializer.serialize_str(self.0).unwrap();
write!(f, "{}", std::str::from_utf8(&buf).unwrap())?;
Ok(())
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
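
Regarding the `PERF` TODO above: one way to avoid the temporary `Vec` is a small adapter that implements `std::io::Write` on top of the `fmt::Formatter`, letting serde_json serialize directly into the Display output. A self-contained sketch under stated assumptions (the `FmtIoWriter` name is hypothetical, not part of this commit, and it relies on serde_json emitting valid UTF-8 in each chunk it writes, which holds for string serialization because all escape sequences are ASCII):

use std::fmt;
use std::io;

// Hypothetical adapter: forwards io::Write bytes into a fmt::Formatter
struct FmtIoWriter<'a, 'b>(&'a mut fmt::Formatter<'b>);

impl io::Write for FmtIoWriter<'_, '_> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Safe to decode here because serde_json writes valid UTF-8 chunks
        // when serializing a string (escape sequences are pure ASCII)
        let s = std::str::from_utf8(buf)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        self.0
            .write_str(s)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

// Same shape as the commit's type, with the Display impl rewritten to
// stream through the adapter instead of allocating a Vec
struct JsonEscapedString<'a>(&'a str);

impl fmt::Display for JsonEscapedString<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        use serde::Serializer;
        let mut serializer = serde_json::Serializer::new(FmtIoWriter(f));
        serializer.serialize_str(self.0).map_err(|_| fmt::Error)
    }
}

fn main() {
    // Quotes are escaped and the output is itself a JSON string literal
    assert_eq!(JsonEscapedString(r#"a " b"#).to_string(), r#""a \" b""#);
}
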
