Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-7995] MercifulJsonConverter support convert number to fixed #11637

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,17 @@
import org.apache.hudi.exception.HoodieIOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -38,6 +42,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.hudi.avro.HoodieAvroUtils.DECIMAL_CONVERSION;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

/**
Expand Down Expand Up @@ -301,6 +306,14 @@ private static JsonToAvroFieldProcessor generateFixedTypeHandler() {
return new JsonToAvroFieldProcessor() {
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
// the value can be Number
if (value instanceof Number) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why number type will use Fixed type schema?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my case, I dump mysql binlog to kafka via debezium.
There is a Decimal(20, 0) type field and a Decimal(20, 2) field in the mysql source table, when dumping to kafka, the 2 fields are both stored as number instead of list of Integer, eg 1234567 for Decimal(20, 0), 1234567.89 for Decimal(20, 2).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, so I think you avro schema decimal_field should not be fixed, should be float or double

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, in my case, the decimal(20, 0) is for bigint and decimal(20, 2) for double, So the debezium decode as I expected.
And the hudi JdbcSchemaProvider convert the mysql table schema as expected to Decimal(20, 0) and Decimal(20, 2).

But when hudi try decoding the bigint and double field from kafka, it throw exception which is unexpected.

the source table schema like

CREATE TABLE `xxxx` (
  `push_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '',
  `plan_id` bigint(20) unsigned NOT NULL COMMENT '',
  `to_station_id` bigint(20) unsigned NOT NULL,
  `plan_status` tinyint(3) NOT NULL,
  `api_type` tinyint(3) NOT NULL COMMENT ''
  PRIMARY KEY (`push_id`),
)

tranlated avro schema like

{
    "type": "record",
    "name": "sample_table",
    "namespace": "sample_namespace",
    "fields": [
        {
            "name": "push_id",
            "type": [
                "null",
                {
                    "type": "fixed",
                    "name": "fixed",
                    "namespace": "sample_namespace.sample_table.push_id",
                    "size": 9,
                    "logicalType": "decimal",
                    "precision": 20,
                    "scale": 0
                }
            ],
            "default": null
        },
        {
            "name": "plan_id",
            "type": [
                "null",
                {
                    "type": "fixed",
                    "name": "fixed",
                    "namespace": "sample_namespace.sample_table.plan_id",
                    "size": 9,
                    "logicalType": "decimal",
                    "precision": 20,
                    "scale": 0
                }
            ],
            "default": null
        },
        {
            "name": "to_station_id",
            "type": [
                "null",
                {
                    "type": "fixed",
                    "name": "fixed",
                    "namespace": "sample_namespace.sample_table.to_station_id",
                    "size": 9,
                    "logicalType": "decimal",
                    "precision": 20,
                    "scale": 0
                }
            ],
            "default": null
        },
        {
            "name": "plan_status",
            "type": [
                "null",
                "int"
            ],
            "default": null
        },
        {
            "name": "api_type",
            "type": [
                "null",
                "int"
            ],
            "default": null
        }
    ]
}

The bigint(20) is tranlated to Decimal(20, 0), but the data in kafka is just long, not the list of Integer, so the json convert throw below exception.

Caused by: org.apache.hudi.internal.schema.HoodieSchemaException: Failed to convert schema from json to avro: {"_event":{"old":null,"type":"insert","ts_ms":1720447722000,"version":"1.9.5.Final","connector":"mysql","name":"debezium.xx","ts":1720447722,"snapshot":"false","database":"xxx","sequence":null,"table":"xxx","server_id":241149,"gtid":"xxx","file":"mysql-bin.000416","position":161676701,"row":0,"thread":1459349,"query":null},"push_id":11193065,"plan_id":8928834333,"to_station_id":2028,"plan_status":5,"api_type":4}
at org.apache.hudi.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:121)
at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:33)
at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
... 18 more
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.util.List
at org.apache.hudi.avro.MercifulJsonConverter$9.convert(MercifulJsonConverter.java:306)
at org.apache.hudi.avro.MercifulJsonConverter$JsonToAvroFieldProcessor.convertToAvro(MercifulJsonConverter.java:203)
at org.apache.hudi.avro.MercifulJsonConverter.convertJsonToAvroField(MercifulJsonConverter.java:192)
at org.apache.hudi.avro.MercifulJsonConverter.convertJsonToAvro(MercifulJsonConverter.java:134)
at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:115)
at org.apache.hudi.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:118)
... 24 more

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it.

LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) schema.getLogicalType();
BigDecimal bigDecimal = new java.math.BigDecimal(value.toString()).setScale(decimalType.getScale(), RoundingMode.HALF_UP);
GenericFixed genericFixed = DECIMAL_CONVERSION.toFixed(bigDecimal, schema, schema.getLogicalType());
return Pair.of(true, new GenericData.Fixed(schema, genericFixed.bytes()));
}

// The ObjectMapper use List to represent FixedType
// eg: "decimal_val": [0, 0, 14, -63, -52] will convert to ArrayList<Integer>
List<Integer> converval = (List<Integer>) value;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi, do you know why this value is a list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Decimal type uses a byte array to store data internally

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, can you provide a JSON example? and add it to ut for make ut more complete

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

already added a json example in ut

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in ut, you directly specified the list, is should be meaningless, List should use Array Type. I don't know why here value can be List type? I am not sure, does mapper.readValue(json, Map.class) will create List type expect Json Array.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,24 @@

import org.apache.hudi.common.testutils.SchemaTestUtil;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Conversions;
import org.apache.avro.Conversions.DecimalConversion;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Fixed;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestMercifulJsonConverter {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final MercifulJsonConverter CONVERTER = new MercifulJsonConverter(true,"__");
Expand Down Expand Up @@ -99,4 +106,44 @@ public void conversionWithFieldNameAliases() throws IOException {

Assertions.assertEquals(rec, CONVERTER.convert(json, sanitizedSchema));
}

@Test
public void testConvertNumberToFixed() throws IOException {
String testSchemaStr = "{\"type\": \"record\",\"name\": \"test_record\",\"namespace\": \"test_namespace\",\"fields\": "
+ "[{\"name\": \"decimal_field\",\"type\": [\"null\",{\"type\": \"fixed\",\"name\": \"fixed\",\"namespace\": \"test_namespace.decimal_field\",\"size\": 9,"
+ " \"logicalType\": \"decimal\",\"precision\": 20,\"scale\": 0}],\"default\": null},"
+ "{\"name\": \"decimal2_field\",\"type\": [\"null\",{\"type\": \"fixed\",\"name\": \"fixed\",\"namespace\": \"test_namespace.decimal2_field\",\"size\": 9,"
+ " \"logicalType\": \"decimal\",\"precision\": 20,\"scale\": 2}],\"default\": null},"
+ "{\"name\": \"decimal3_field\",\"type\": [\"null\",{\"type\": \"fixed\",\"name\": \"fixed\",\"namespace\": \"test_namespace.decimal3_field\",\"size\": 9,"
+ " \"logicalType\": \"decimal\",\"precision\": 20,\"scale\": 2}],\"default\": null},"
+ "{\"name\": \"int_field\",\"type\": [\"null\",\"int\"],\"default\": null},"
+ "{\"name\": \"long_field\",\"type\": [\"null\",\"long\"],\"default\": null},"
+ "{\"name\": \"string_field\",\"type\": [\"null\",\"string\"],\"default\": null}]}";
Schema schema = Schema.parse(testSchemaStr);

String testValueStr = "{\n"
+ " \"decimal_field\": 1720623716,\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the schema type is Fixed, why use Number value for test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because avro store Decimal type as Fixed type internally

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the decimal type is logicalType, data type is Fixed, which is bytes, the logical type decimal I think in fixed is used to conver bytes to decimal, not conver number to decimal.

+ " \"decimal2_field\": 1720623716.23,\n"
+ " \"decimal3_field\": [0, 0, 0, 0, 40, 15, -73, 111, 39],\n"
+ " \"int_field\": 1720623716,\n"
+ " \"long_field\": 1720623716,\n"
+ " \"string_field\": \"STRING040467046577\"\n"
+ "}";

GenericRecord record = CONVERTER.convert(testValueStr, schema);
ObjectMapper objectMapper = new ObjectMapper();
JsonNode root = objectMapper.readTree(testValueStr);

assertEquals(root.get("decimal_field").asLong(), convertFixedToDecimal((Fixed) record.get("decimal_field")).longValue());
assertEquals(root.get("decimal2_field").asDouble(), convertFixedToDecimal((Fixed) record.get("decimal2_field")).doubleValue());

Fixed testFixedValue = new Fixed(((Fixed) record.get("decimal3_field")).getSchema(), new byte[] {0, 0, 0, 0, 40, 15, -73, 111, 39});
assertEquals(1720623716.23, convertFixedToDecimal(testFixedValue).doubleValue());
assertEquals(testFixedValue, record.get("decimal3_field"));
}

private BigDecimal convertFixedToDecimal(GenericData.Fixed fixedRecord) {
DecimalConversion decimalConversion = new Conversions.DecimalConversion();
return decimalConversion.fromFixed(fixedRecord, fixedRecord.getSchema(), fixedRecord.getSchema().getLogicalType());
}
}
Loading