Skip to content

Conversation

@wangxianghu
Copy link
Contributor

@wangxianghu wangxianghu commented Mar 1, 2022

Tips

What is the purpose of the pull request

Currently, we have Transform to transform source to target dataset before writing, but it is based on DataSet.
In some scenarios, our kafka data is not in the right format we need, such as binlog json format.
We need a way to extract/prepare the data we need from the original data before converting it into a DataSet.

Brief change log

(for example:)

  • Modify AnnotationLocation checkstyle rule in checkstyle.xml

Verify this pull request

(Please pick either of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added HoodieClientWriteTest to verify the change.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@wangxianghu wangxianghu changed the title [HUDI-3525] Introduce JsonkafkaSourceProcessor to support data prepro… [HUDI-3525] Introduce JsonkafkaSourcePostProcessor to support data preprocess before it is transformed to DataSet Mar 1, 2022
@nsivabalan
Copy link
Contributor

Can you check the CI failure please

@nsivabalan
Copy link
Contributor

@pratyakshsharma : Can you assist in reviewing this patch.

@nsivabalan nsivabalan added priority:high Significant impact; potential bugs priority:medium Moderate impact; usability gaps and removed priority:high Significant impact; potential bugs labels Mar 1, 2022
@pratyakshsharma
Copy link
Contributor

@pratyakshsharma : Can you assist in reviewing this patch.

Ack. Will review it today.

@nsivabalan nsivabalan self-assigned this Mar 2, 2022
Copy link
Contributor

@nsivabalan nsivabalan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test case by adding a test JsonKafkaPostProcessor and ensure it works.
also add a test where you set some invalid class for the new config added. and assert for exception.

@wangxianghu
Copy link
Contributor Author

Can we add a test case by adding a test JsonKafkaPostProcessor and ensure it works. also add a test where you set some invalid class for the new config added. and assert for exception.

done

* Base class for Json kafka source post processor. User can define their own processor that extends this class to do
* some post process on the incoming json string records before the records are converted to DataSet<T>.
*/
public abstract class JsonKafkaSourcePostProcessor implements Serializable {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to an abstract class so as to provide a unified constructor

@wangxianghu
Copy link
Contributor Author

@hudi-bot run azure

1 similar comment
@wangxianghu
Copy link
Contributor Author

@hudi-bot run azure

@hudi-bot
Copy link
Collaborator

hudi-bot commented Mar 3, 2022

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@wangxianghu
Copy link
Contributor Author

@nsivabalan please take another look when free :)

@pratyakshsharma
Copy link
Contributor

@wangxianghu The changes look good to me. I have a high level query. You have mentioned facing issues with binlog json format for example. Do you mean to say transformation is not possible with data in binlog json format? Data coming from binlogs also has a structure/schema assigned to it as far as I remember. Can you post a sample event where you feel this new PostProcessor you introduced might be useful?
Basically I want to understand the motivation behind introducing this PR.

@wangxianghu
Copy link
Contributor Author

wangxianghu commented Mar 6, 2022

@wangxianghu The changes look good to me. I have a high level query. You have mentioned facing issues with binlog json format for example. Do you mean to say transformation is not possible with data in binlog json format? Data coming from binlogs also has a structure/schema assigned to it as far as I remember. Can you post a sample event where you feel this new PostProcessor you introduced might be useful? Basically I want to understand the motivation behind introducing this PR.

It is possible to deal with data in binlog json format, but not very convenient.

  1. For maxwell(our company use it to capture changed data)
{
    "database": "test",
    "table": "maxwell",
    "type": "update",
    "ts": 1449786341,
    "xid": 940786,
    "commit": true,
    "data": {"id":1, "daemon": "Firebus!  Firebus!","update_time" : "2022-02-03 12:22:42"},
    "old":  {"daemon": "Stanislaw Lem"}
  }

all we want is just :

{
    "id": 1, 
    "daemon": "Firebus!  Firebus!", 
    "update_time": "2022-02-03 12:22:42"
}

we can add write a processor to extract the data from the entire json and maybe do some custom define process, without configuring a huge schema file(including all the fields in the binlog json, no matter if we need them or not)

  1. in some scenes, we need to encode some fileds for safety purpose, the processor can help us

  2. sometimes our data quality is not very well, some key field let's say precombine field have null value, we can use processor to fix it

  3. when our schema is read from jdbc or hive, we can use processor adjust our kafka data compatible to it.

All in all, with custom processor we can do anything we want on the incoming json data before they are converted into DataSet
Of course Transformer is a very useful feature too, but it is based on Spark DataSet, and have certain requirements for data quality.

@pratyakshsharma
Copy link
Contributor

Right. With Debezium, this extraction of data is already supported. But with maxwell and any other similar service, this processor will be helpful.

@wangxianghu wangxianghu changed the title [HUDI-3525] Introduce JsonkafkaSourcePostProcessor to support data preprocess before it is transformed to DataSet [HUDI-3525] Introduce JsonkafkaSourcePostProcessor to support data post process before it is transformed to DataSet Mar 6, 2022
@wangxianghu
Copy link
Contributor Author

hi @nsivabalan any orther concern ?

Copy link
Contributor

@nsivabalan nsivabalan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@nsivabalan nsivabalan merged commit c9ffdc4 into apache:master Mar 6, 2022
vingov pushed a commit to vingov/hudi that referenced this pull request Apr 3, 2022
stayrascal pushed a commit to stayrascal/hudi that referenced this pull request Apr 12, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

priority:medium Moderate impact; usability gaps

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants