Skip to content

Conversation

@vburenin
Copy link
Contributor

@vburenin vburenin commented Feb 23, 2021

What is the purpose of the pull request

This PR adds ability to:

  • Inject kafka meta information into the received data from avro kafka source, so users can track which kafka topic, partition, offset and the key delivered a specific record (super useful for debugging consistency issues). In SQL query user would need to specify <KAFKA_FIELDS> and enable appropriate options to enable the injection.
  • specify custom kafka avro data decoder that takes into account source latest schema and different schema versions.
  • cache retrieved source and target latest schemas
  • return null for the target schema if it is not in use by specifying "null" as a target schema url. (I know, there is NullTargetSchemaProvider, however it looks redundant).

Brief change log

  • Added kafka meta info injection
  • Kafka data deserializer can be swapped with a custom one.
  • source and target latest schema are optionally can being cached
  • if target schema url is set to "null", target schema will return null as a value forcing Hudi to infer the schema from the source one.

Verify this pull request

This change added tests and can be verified as follows:

  • Added unit tests for the components that were not originally covered.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@codecov-io
Copy link

codecov-io commented Feb 24, 2021

Codecov Report

Merging #2598 (9bc13c2) into master (0dde7f9) will decrease coverage by 42.37%.
The diff coverage is 0.00%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #2598       +/-   ##
============================================
- Coverage     51.55%   9.18%   -42.38%     
+ Complexity     3282      48     -3234     
============================================
  Files           445      55      -390     
  Lines         20317    2037    -18280     
  Branches       2102     251     -1851     
============================================
- Hits          10475     187    -10288     
+ Misses         8976    1837     -7139     
+ Partials        866      13      -853     
Flag Coverage Δ Complexity Δ
hudicli ? ?
hudiclient ? ?
hudicommon ? ?
hudiflink ? ?
hudihadoopmr ? ?
hudisparkdatasource ? ?
hudisync ? ?
huditimelineservice ? ?
hudiutilities 9.18% <0.00%> (-60.37%) 0.00 <0.00> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ Complexity Δ
...i/utilities/deser/KafkaAvroSchemaDeserializer.java 0.00% <0.00%> (ø) 0.00 <0.00> (?)
.../hudi/utilities/schema/SchemaRegistryProvider.java 0.00% <0.00%> (ø) 0.00 <0.00> (ø)
...apache/hudi/utilities/sources/AvroKafkaSource.java 0.00% <0.00%> (ø) 0.00 <0.00> (ø)
...lities/sources/helpers/AvroKafkaSourceHelpers.java 0.00% <0.00%> (ø) 0.00 <0.00> (?)
.../utilities/transform/SqlQueryBasedTransformer.java 0.00% <0.00%> (-75.00%) 0.00 <0.00> (-3.00)
...va/org/apache/hudi/utilities/IdentitySplitter.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-2.00%)
...va/org/apache/hudi/utilities/schema/SchemaSet.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-3.00%)
...a/org/apache/hudi/utilities/sources/RowSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-4.00%)
.../org/apache/hudi/utilities/sources/AvroSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-1.00%)
.../org/apache/hudi/utilities/sources/JsonSource.java 0.00% <0.00%> (-100.00%) 0.00% <0.00%> (-1.00%)
... and 429 more

@vinothchandar
Copy link
Member

@vburenin thanks for this. can we create a JIRA for this?

@vinothchandar
Copy link
Member

the goals here all look good to me. Please give me and @nsivabalan sometime to review

@vburenin
Copy link
Contributor Author

@vburenin thanks for this. can we create a JIRA for this?

It is in process, I may need to create multiple tickets. What I am really interested in right now is a code review so I do not spent too much time going forward and writing unit tests and then endup rolling it back.

result = bytes;
} else {
int start = buffer.position() + buffer.arrayOffset();
DatumReader reader = new GenericDatumReader(schema, sourceSchema);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks good.

Copy link
Contributor

@nsivabalan nsivabalan Feb 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may I know whats the plan in keeping this file in sync with AbstractKafkaAvroDeserializer with version upgrades?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this work.

@Override
  protected Object deserialize(
      boolean includeSchemaAndVersion,
      String topic,
      Boolean isKey,
      byte[] payload,
      Schema readerSchema)
      throws SerializationException {
   super.deserialize(includeSchemaAndVersion,topic,isKey,payload, sourceSchema); 
// pass sourceSchema as last arg instead of readerSchema
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It actually should work, there was a history behind this change that predates me that ended up with only sourceSchema change as I currently see. So yah, it will drastically simplify things. Thanks for catching it!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one more clarification @vburenin. Is this approach (passing sourceSchema instead of reader schema) is going to be useful for everyone(kakfa source users), even if they are not interested in meta fields to assist in schema evolution? If yes, wondering if we should make this customer deserializer as default kafka deserializer for hudi going forward?
Can you throw some light here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is definitely going to be useful for everybody. Kafka fields were added after this change was introduced. We had to keep up with the schema changes in the registry and this was the way to do that. The primary assumption, that is most likely true for every user, is that a schema is evolving... so, this is the way to keep up with the schema evolution.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


public static final String INJECT_KAFKA_FIELDS = "hoodie.deltastreamer.source.inject_kafka_fields";

public static final String KAFKA_PARTITION = "_hudi_kafka_partition";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we plan to provide this as generic solution for everyone, lets have consensus on meta fields naming. @vinothchandar .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, we also can make it configurable.

@vburenin vburenin changed the title [WIP] Added custom kafka meta fields and custom kafka avro decoder. [HUDI-1648] Added custom kafka meta fields and custom kafka avro decoder. Mar 1, 2021
@nsivabalan nsivabalan added the priority:critical Production degraded; pipelines stalled label Mar 2, 2021
@vburenin
Copy link
Contributor Author

vburenin commented Mar 2, 2021

Is being split into multiple PRs. Closing.

@vburenin vburenin closed this Mar 2, 2021
@ouzhang
Copy link

ouzhang commented Apr 21, 2021

Hello. I notice that "[HUDI-1650] Custom avro kafka deserializer." has been merged. Is there another PR related to "custom kafka meta fields" ? Expecting to have this feature.

@vburenin
Copy link
Contributor Author

There is no related PR yet. I've been planning to contribute this feature to upstream in Q2, so far it seems possible.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

priority:critical Production degraded; pipelines stalled

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants