
[HUDI-7995] MercifulJsonConverter support convert number to fixed #11637

Closed

Conversation

waitingF
Contributor

Change Logs

When using JsonKafkaSource to ingest data, a field with the decimal logical type is converted to the Avro fixed type. But the source value can be a Number instead of a Fixed byte array, which makes the conversion fail. This PR fixes that small bug.
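
At a high level, the fix treats a Number value as a BigDecimal and encodes it with Avro's DecimalConversion, which the existing fixed handler already uses. A minimal standalone sketch of that idea (illustrative only, not the exact patch; the class name, method name and the HALF_UP rounding are my own assumptions):

import java.math.BigDecimal;
import java.math.RoundingMode;

import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;

public class NumberToFixedSketch {
  private static final Conversions.DecimalConversion DECIMAL_CONVERSION =
      new Conversions.DecimalConversion();

  // Encode a numeric JSON value into an Avro fixed field carrying a decimal logical type.
  static GenericData.Fixed numberToFixed(Number value, Schema fixedSchema) {
    LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) fixedSchema.getLogicalType();
    // Scale the number to the schema's declared scale before encoding (rounding mode assumed).
    BigDecimal bigDecimal = new BigDecimal(value.toString())
        .setScale(decimalType.getScale(), RoundingMode.HALF_UP);
    GenericFixed genericFixed =
        DECIMAL_CONVERSION.toFixed(bigDecimal, fixedSchema, decimalType);
    return new GenericData.Fixed(fixedSchema, genericFixed.bytes());
  }
}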

Impact

none

Risk level (write none, low, medium or high below)

low

Documentation Update

no doc update

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:S PR with lines of changes in (10, 100] label Jul 16, 2024
@hudi-bot

CI report:

Bot commands
@hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

GenericFixed genericFixed = DECIMAL_CONVERSION.toFixed(bigDecimal, schema, schema.getLogicalType());
return Pair.of(true, new GenericData.Fixed(schema, genericFixed.bytes()));
}

// The ObjectMapper use List to represent FixedType
// eg: "decimal_val": [0, 0, 14, -63, -52] will convert to ArrayList<Integer>
List<Integer> converval = (List<Integer>) value;
Contributor

hi, do you know why this value is a list?

Contributor Author

The Decimal type uses a byte array to store data internally

Contributor

Thanks. Can you provide a JSON example and add it to the unit tests to make the UT more complete?

Contributor Author

Already added a JSON example in the UT.

Contributor

In the UT you specified the list directly, which seems meaningless; a List should correspond to an Array type. I don't understand why the value here can be a List. I'm not sure whether mapper.readValue(json, Map.class) will produce a List for anything except a JSON array.
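
For what it's worth, a quick standalone check (my own illustration, not code from this PR) of how Jackson binds values when reading a record into a plain Map: a JSON array comes back as an ArrayList, while a bare number comes back as an Integer or Long, which is why both shapes can reach the fixed handler:

import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonValueShapes {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    String json = "{\"decimal_val\": [0, 0, 14, -63, -52], \"long_val\": 1234567}";
    Map<?, ?> record = mapper.readValue(json, Map.class);
    // A JSON array is bound to java.util.ArrayList; a bare number to an Integer (or Long).
    System.out.println(record.get("decimal_val").getClass().getName()); // java.util.ArrayList
    System.out.println(record.get("long_val").getClass().getName());    // java.lang.Integer
  }
}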

@@ -301,6 +306,14 @@ private static JsonToAvroFieldProcessor generateFixedTypeHandler() {
return new JsonToAvroFieldProcessor() {
@Override
public Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
// the value can be Number
if (value instanceof Number) {
Contributor

Why would a number type use a Fixed-type schema?

Contributor Author

In my case, I dump the MySQL binlog to Kafka via Debezium.
There is a Decimal(20, 0) field and a Decimal(20, 2) field in the MySQL source table. When dumped to Kafka, both fields are stored as numbers instead of lists of integers, e.g. 1234567 for Decimal(20, 0) and 1234567.89 for Decimal(20, 2).

Contributor

Yes, so I think your Avro schema's decimal_field should not be fixed; it should be float or double.

Contributor Author

Yeah, in my case Decimal(20, 0) corresponds to bigint and Decimal(20, 2) to double, so Debezium decodes them as I expect.
And Hudi's JdbcSchemaProvider converts the MySQL table schema to Decimal(20, 0) and Decimal(20, 2) as expected.

But when Hudi tries to decode the bigint and double fields from Kafka, it throws an exception, which is unexpected.

The source table schema looks like:

CREATE TABLE `xxxx` (
  `push_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '',
  `plan_id` bigint(20) unsigned NOT NULL COMMENT '',
  `to_station_id` bigint(20) unsigned NOT NULL,
  `plan_status` tinyint(3) NOT NULL,
  `api_type` tinyint(3) NOT NULL COMMENT '',
  PRIMARY KEY (`push_id`)
)

The translated Avro schema looks like:

{
    "type": "record",
    "name": "sample_table",
    "namespace": "sample_namespace",
    "fields": [
        {
            "name": "push_id",
            "type": [
                "null",
                {
                    "type": "fixed",
                    "name": "fixed",
                    "namespace": "sample_namespace.sample_table.push_id",
                    "size": 9,
                    "logicalType": "decimal",
                    "precision": 20,
                    "scale": 0
                }
            ],
            "default": null
        },
        {
            "name": "plan_id",
            "type": [
                "null",
                {
                    "type": "fixed",
                    "name": "fixed",
                    "namespace": "sample_namespace.sample_table.plan_id",
                    "size": 9,
                    "logicalType": "decimal",
                    "precision": 20,
                    "scale": 0
                }
            ],
            "default": null
        },
        {
            "name": "to_station_id",
            "type": [
                "null",
                {
                    "type": "fixed",
                    "name": "fixed",
                    "namespace": "sample_namespace.sample_table.to_station_id",
                    "size": 9,
                    "logicalType": "decimal",
                    "precision": 20,
                    "scale": 0
                }
            ],
            "default": null
        },
        {
            "name": "plan_status",
            "type": [
                "null",
                "int"
            ],
            "default": null
        },
        {
            "name": "api_type",
            "type": [
                "null",
                "int"
            ],
            "default": null
        }
    ]
}

The bigint(20) is translated to Decimal(20, 0), but the data in Kafka is just a long, not a list of integers, so the JSON conversion throws the exception below.

Caused by: org.apache.hudi.internal.schema.HoodieSchemaException: Failed to convert schema from json to avro: {"_event":{"old":null,"type":"insert","ts_ms":1720447722000,"version":"1.9.5.Final","connector":"mysql","name":"debezium.xx","ts":1720447722,"snapshot":"false","database":"xxx","sequence":null,"table":"xxx","server_id":241149,"gtid":"xxx","file":"mysql-bin.000416","position":161676701,"row":0,"thread":1459349,"query":null},"push_id":11193065,"plan_id":8928834333,"to_station_id":2028,"plan_status":5,"api_type":4}
at org.apache.hudi.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:121)
at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:33)
at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:40)
at org.apache.hudi.utilities.schema.LazyCastingIterator.computeNext(LazyCastingIterator.java:30)
at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
... 18 more
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.util.List
at org.apache.hudi.avro.MercifulJsonConverter$9.convert(MercifulJsonConverter.java:306)
at org.apache.hudi.avro.MercifulJsonConverter$JsonToAvroFieldProcessor.convertToAvro(MercifulJsonConverter.java:203)
at org.apache.hudi.avro.MercifulJsonConverter.convertJsonToAvroField(MercifulJsonConverter.java:192)
at org.apache.hudi.avro.MercifulJsonConverter.convertJsonToAvro(MercifulJsonConverter.java:134)
at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:115)
at org.apache.hudi.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:118)
... 24 more

Contributor

got it.

Schema schema = Schema.parse(testSchemaStr);

String testValueStr = "{\n"
+ " \"decimal_field\": 1720623716,\n"
Contributor

The schema type is Fixed, so why use a Number value for the test?

Contributor Author

Because Avro stores the Decimal logical type as the Fixed type internally.

Contributor

The decimal type is a logical type; the underlying data type is Fixed, which is bytes. I think the decimal logical type on fixed is meant to convert bytes to a decimal, not a number to a decimal.
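
To make the bytes-vs-numbers point concrete, here is a tiny self-contained round trip (an illustration only; the 5-byte decimal(10, 2) schema and the value 9671.16 are made up, not taken from this PR). It shows that a fixed field with the decimal logical type carries the decimal's unscaled value as bytes rather than a plain number:

import java.math.BigDecimal;
import java.util.Arrays;

import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericFixed;

public class FixedDecimalRoundTrip {
  public static void main(String[] args) {
    // Hypothetical 5-byte fixed schema with a decimal(10, 2) logical type.
    Schema fixedSchema = new Schema.Parser().parse(
        "{\"type\": \"fixed\", \"name\": \"dec\", \"size\": 5,"
            + " \"logicalType\": \"decimal\", \"precision\": 10, \"scale\": 2}");
    Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();

    BigDecimal original = new BigDecimal("9671.16");
    // Encode: the decimal's unscaled value is written as big-endian bytes.
    GenericFixed fixed = conversion.toFixed(original, fixedSchema, fixedSchema.getLogicalType());
    // Decode: the same bytes are read back into a BigDecimal using the schema's scale.
    BigDecimal decoded = conversion.fromFixed(fixed, fixedSchema, fixedSchema.getLogicalType());

    System.out.println(Arrays.toString(fixed.bytes()) + " -> " + decoded);
  }
}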

Contributor

@KnightChess KnightChess left a comment

Strictly speaking, I think the Fixed type should not handle numeric data types, even though it supports the Decimal logical type; ideally it should handle data types like bytes or strings. In a broader sense, though, enhancing support for numeric types can be considered. Yet when converting from JSON there is ambiguity about what the value should be converted to: the actual Avro type or the logical type?
What do you think? I also checked the official Avro tools for converting JSON to Avro, and they throw errors directly when handling numeric types.

@waitingF
Contributor Author

Strictly speaking, I think the Fixed type should not handle numeric data types, even though it supports the Decimal logical type; ideally it should handle data types like bytes or strings. In a broader sense, though, enhancing support for numeric types can be considered. Yet when converting from JSON there is ambiguity about what the value should be converted to: the actual Avro type or the logical type? What do you think? I also checked the official Avro tools for converting JSON to Avro, and they throw errors directly when handling numeric types.

Yeah, I think support for numeric types should be considered.
In my case, it should be Debezium's responsibility to encode the bigint(20) as fixed (a byte array). But Hudi, as a data platform, should be compatible with as many cases as possible for convenience.

Contributor

@KnightChess KnightChess left a comment

@bvaradar can you help review this?

@yihua
Contributor

yihua commented Jul 24, 2024

This PR covers the decimal support: #11265. There will be more changes on top of it to support different decimal input formats.

@waitingF waitingF closed this Aug 12, 2024
Labels
size:S PR with lines of changes in (10, 100]

4 participants