
Conversation

@YannByron (Contributor) commented Aug 23, 2022

Change Logs

This PR adds CDC (change data capture) support for Spark.

The changes are:

  1. Add a new field named cdcPath to HoodieWriteStat to hold the CDC file path.
  2. Upgrade HoodieMergeHandle and its subclasses to persist the CDC data to a CDC log file when needed.
  3. Extract common CDC classes into hudi-common, to be used to implement a CDCReader for different engines.
  4. Add CDCReader to serve CDC queries on Spark.

Impact

Low

@YannByron YannByron changed the title [RFC-51][HUDI-3478] Support CDC for Spark in Hudi [HUDI-3478] Support CDC for Spark in Hudi Aug 23, 2022
@XuQianJin-Stars (Contributor) commented:

nice work!

@YannByron (Contributor, Author) commented:

@prasannarajaperumal @xushiyan
This PR has been updated according to the updated RFC: #6256.
Please help review it.

@xushiyan xushiyan added the priority:blocker Production down; release blocker label Aug 26, 2022
@xushiyan (Member) left a comment:

Will review a few more rounds. Can you please also populate the PR description as per the PR template?

totalLogBlocks.incrementAndGet();
if (logBlock.getBlockType() == CDC_DATA_BLOCK) {
// hit a cdc block, just skip.
continue;
A member commented on the diff:

Getting the instant time above should also be skipped if we skip here. Also, should totalLogBlocks be incremented? Please confirm.

The author (@YannByron) replied:

According to the existing reader logic, totalLogBlocks should still be incremented even if the block is not read. (screenshot of the referenced reader code omitted)
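The skip-but-count behavior under discussion can be sketched as follows. The block types and counter names here are simplified stand-ins for Hudi's log-reader internals, not its actual API.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class LogBlockScan {
    enum BlockType { DATA_BLOCK, CDC_DATA_BLOCK }

    // Count every block encountered (CDC blocks included), but only "read" non-CDC blocks.
    static long countReadableBlocks(List<BlockType> blocks, AtomicLong totalLogBlocks) {
        long read = 0;
        for (BlockType type : blocks) {
            totalLogBlocks.incrementAndGet(); // incremented even for skipped CDC blocks
            if (type == BlockType.CDC_DATA_BLOCK) {
                continue; // hit a CDC block: skip without materializing its records
            }
            read++;
        }
        return read;
    }
}
```

This mirrors the reply above: the counter reflects all blocks seen, while CDC blocks are skipped before any record is deserialized.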

@YannByron (Contributor, Author) commented:

@xushiyan I have updated this and addressed the comments above. Please review again.

@prasannarajaperumal (Contributor) left a comment:

@YannByron - Thanks for making the implementation changes. I made an initial pass and suggested some changes.

I would really like this PR to be split into two parts:

  1. All changes related to writing the CDC log blocks. We will try to land this first: when I enable CDC and set a logging mode, the CDC data is written, but it is not yet queryable.
  2. Changes related to constructing the CDC stream (identifying CDC splits).

Doing part 1 first will make the abstractions we want in the write path clear. For example, I am not fully convinced about moving the log-writer code into HoodieMergeHandle; I am still thinking about what an alternative would be.

Let's split this PR, and we can discuss more there.

Does that make sense? @YannByron
cc fyi @xushiyan @vinothchandar @yihua

protected HoodieLogFormat.Writer cdcWriter;
// the cdc data
protected Map<String, SerializableRecord> cdcData;
//
A contributor commented on the diff:

Nit: doc comment missing.

The author (@YannByron) replied:

Yep, I missed it.

import static org.apache.hudi.common.table.timeline.HoodieTimeline.isInRange;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;

public class CDCExtractor {
A contributor commented on the diff:

Please add a Javadoc.

The author (@YannByron) replied:

OK.

if (StringUtils.isNullOrEmpty(writeStat.getCdcPath())) {
// no cdc log files can be used directly. we reuse the existing data file to retrieve the change data.
String path = writeStat.getPath();
if (path.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
@prasannarajaperumal (Contributor) commented Sep 5, 2022:

Not extensible to other file types.

The author (@YannByron) replied:

Let me upgrade this to support other base file types.

The author (@YannByron) replied:

Supporting other base file types like ORC in Spark is blocked by https://issues.apache.org/jira/browse/HUDI-4496.

@YannByron (Contributor, Author) commented:

@prasannarajaperumal Thank you for reviewing this PR.

I would prefer one single PR. I think it is easy to judge which changes relate to writing and which to reading, and a single PR makes it easier to run the whole end-to-end test process.

    "I am not too convinced about moving the log writer code into HoodieMergeHandle, I am still thinking what an alternative here would be."

One key reason to put the log-writer code in HoodieMergeHandle is that persisting CDC data needs both the old value and the current value, and these values can be obtained directly in HoodieMergeHandle without any extra overhead.

GenericRecord oldRecord, GenericRecord newRecord) {
GenericData.Record record;
if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER)) {
record = CDCUtils.cdcRecord(operation.getValue(), instantTime,
A member commented on the diff:

Can we prefix classes with Hoodie, like HoodieCDCUtils? That is the convention in the codebase.

The author (@YannByron) replied:

Done.

Comment on lines 479 to 487
protected Option<AppendResult> writeCDCData() {
if (!cdcEnabled || cdcData.isEmpty() || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
// the following cases where we do not need to write out the cdc file:
// case 1: all the data from the previous file slice are deleted. and no new data is inserted;
// case 2: all the data are new-coming,
return Option.empty();
}
A member commented on the diff:

Can we encapsulate the CDC writer logic within a dedicated CDC writer as much as possible? The concern is that we don't want to spread these condition checks across different classes, which would be hard to maintain if any of these assumptions change in the future. Grouping all the CDC logic together really helps with evolving and maintaining it.

@prasannarajaperumal (Contributor) replied:

If the inheritance hierarchy is complicated, let's not do that. Instead, can we create a HoodieCDCLogger helper class and move all the code related to writing CDC data into that abstraction? You could just initialize a HoodieCDCLogger from HoodieMergeHandle.

Would that work?
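The helper-class idea could look roughly like this. The class name follows the suggestion above, but the signatures are illustrative, not Hudi's actual API; the write condition mirrors the one in the quoted writeCDCData() snippet.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of a CDC logger that owns the "should we write a CDC file?" decision,
// so the condition lives in one place instead of being spread across merge handles.
class HoodieCDCLoggerSketch {
    private final boolean cdcEnabled;
    private final Map<String, Object> cdcData = new HashMap<>();

    HoodieCDCLoggerSketch(boolean cdcEnabled) {
        this.cdcEnabled = cdcEnabled;
    }

    void log(String recordKey, Object cdcRecord) {
        if (cdcEnabled) {
            cdcData.put(recordKey, cdcRecord);
        }
    }

    // Mirrors writeCDCData(): skip when CDC is off, nothing was buffered,
    // everything from the previous file slice was deleted, or every record is a new insert.
    boolean shouldWriteCDCFile(long recordsWritten, long insertRecordsWritten) {
        return cdcEnabled
            && !cdcData.isEmpty()
            && recordsWritten != 0L
            && recordsWritten != insertRecordsWritten;
    }
}
```

A merge handle would then only construct the logger and call log(...) and shouldWriteCDCFile(...), keeping the CDC-specific assumptions out of the handle hierarchy.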

The author (@YannByron) replied:

I think that should work. I will do this ASAP.

A contributor replied:

Second that: CDC is non-trivial logic that would otherwise proliferate across the write handles, so we should encapsulate its state, along with a well-defined API, within a single component.

Comment on lines 121 to 122
this.keyField = config.populateMetaFields() ? HoodieRecord.RECORD_KEY_METADATA_FIELD
: hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
A member commented on the diff:

This could be multiple keys that the user configured. We might need keyFields as a list.

The author (@YannByron) replied:

Done.

Comment on lines 358 to 359
public List<HoodieInstant> getInstantsAsList() {
return instants;
A member commented on the diff:

We should have a cleanup ticket to rename this to getInstants and the other method to getInstantsAsStream. Public API names should not cause confusion.

The author (@YannByron) replied:

Agreed.

A contributor replied:

Where is this method used? I can't find any usage.

reusableRecordBuilder.build()
}

private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = {
A member commented on the diff:

Can you clarify this big chunk of code removal? Is it a result of refactoring? If so, let's keep the change minimal and CDC-related; try to achieve the same functionality without moving code around. Note that other refactoring work may be going on around this, and conflicts can be tricky.

The author (@YannByron) replied:

HoodieCDCRDD needs to use LogFileIterator and RecordMergingFileIterator to extract the CDC data from the log files of MOR tables. So I just extended the arguments of the constructors so they can be reused, and moved them to a common place. That's all.

@YannByron force-pushed the master_cdc_code branch 2 times, most recently from afde1d3 to 2de30cf on September 9, 2022 09:27
@YannByron (Contributor, Author) commented:

@xushiyan @prasannarajaperumal I updated the PR to address the comments above. Please continue to review, thanks.

@YannByron (Contributor, Author) commented:

@hudi-bot run azure

@prasannarajaperumal (Contributor) left a comment:

I made a pass. Looks good to me except for a small abstraction comment. Let's make sure we create JIRAs for all follow-up work and leave TODO comments in the code. This is a big feature, and thanks for driving it.


return writeRecord(hoodieRecord, indexedRecord, isDelete);
boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
if (cdcEnabled) {
String recordKey = StringUtils.join(
A contributor commented on the diff:

This might not match the record key. We should use the proper record key here.

The author (@YannByron) replied:

This is a custom key just for CDC, used when users configure more than one record key field. This key will be parsed and used in HoodieCDCRDD.
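The composite-key round trip being described might look like this. The separator and helper names are hypothetical, chosen only to illustrate joining multiple configured key fields on the write side and parsing them back in the reader.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for the composite CDC key: multiple configured record-key
// fields are joined into one string when logging, and parsed back when reading.
class CdcCompositeKey {
    private static final String SEPARATOR = ",";

    static String join(List<String> keyValues) {
        return String.join(SEPARATOR, keyValues);
    }

    static List<String> parse(String compositeKey) {
        // -1 keeps trailing empty fields, so empty key parts survive the round trip
        return Arrays.asList(compositeKey.split(SEPARATOR, -1));
    }
}
```

Note that a real implementation would need to handle key values that themselves contain the separator, which is one reason the reviewer suggests reusing the proper key generator instead.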

protected HoodieFileWriter<IndexedRecord> fileWriter;
// a flag that indicate whether allow the change data to write out a cdc log file.
protected boolean cdcEnabled = false;
protected String cdcSupplementalLoggingMode;
A contributor commented on the diff:

Why is this a String?

The author (@YannByron) replied:

Do you suggest we use an enum?

A contributor replied:

I was reviewing this before I caught up with the updated version of the RFC, so I got confused. Yeah, let's use an enum for this one.
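A minimal enum along the lines agreed above. The mode names are an assumption loosely modeled on the "with before/after" config value in the quoted snippet, not Hudi's actual constants.

```java
import java.util.Locale;

// Sketch of replacing the String-typed cdcSupplementalLoggingMode with an enum,
// so invalid config values fail fast and comparisons are type-safe.
enum CdcSupplementalLoggingModeSketch {
    OP_KEY_ONLY,        // log only the operation and record key
    DATA_BEFORE,        // additionally log the before-image
    DATA_BEFORE_AFTER;  // log both before- and after-images

    static CdcSupplementalLoggingModeSketch parse(String configValue) {
        // case-insensitive parse of the user-facing config string
        return valueOf(configValue.trim().toUpperCase(Locale.ROOT));
    }

    boolean logsBeforeImage() {
        return this != OP_KEY_ONLY;
    }
}
```

Equality checks like `cdcSupplementalLoggingMode.equals(...)` in the snippet above would then become simple `==` comparisons against enum constants.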

return new SerializableRecord(record);
}

protected GenericRecord addCommitMetadata(GenericRecord record, String recordKey, String partitionPath) {
A contributor commented on the diff:

Why are we adding the metadata to the CDC payload?

The author (@YannByron) replied:

If the before or after value is persisted in the CDC block, we want the CDC record to carry all the fields.

A contributor replied:

Meta fields carry purely semantic information related to their persistence by Hudi. They aren't part of the record's payload, and we shouldn't carry them within the CDC payload.

@YannByron (Contributor, Author) replied Sep 19, 2022:

That makes sense. So we won't add the meta fields, and we should also remove the meta fields from the old record. This has been updated in this PR and in the separate writing-related PR.
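Removing the meta fields from the old record, as agreed above, can be sketched as a simple filter over the record's fields. The `_hoodie_` prefix check is a simplification for illustration; a real implementation would consult Hudi's actual meta-field list.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: drop Hudi meta columns (commit time, record key, file name, ...) from a
// record's field map before embedding it as the CDC before-image.
class MetaFieldStripper {
    private static final String META_FIELD_PREFIX = "_hoodie_";

    static Map<String, Object> stripMetaFields(Map<String, Object> fields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (!e.getKey().startsWith(META_FIELD_PREFIX)) {
                out.put(e.getKey(), e.getValue()); // keep only user-schema fields
            }
        }
        return out;
    }
}
```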


: hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER)) {
A contributor commented on the diff:

Let's abstract this conditional into a method (I believe you already have one below).

.withFs(fs)
.withRolloverLogWriteToken(writeToken)
.withLogWriteToken(logWriteToken)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
A contributor commented on the diff:

Are we mixing CDC blocks with normal delta data blocks?

I don't think we can do that, as it will severely affect performance for pure non-CDC queries on MOR tables.

The author (@YannByron) replied:

The CDC log file containing a CDCBlock is a normal log file, the same as the others. But when reading MOR tables in non-CDC mode, we skip the CDC block directly, at the cost of only reading the first few bytes from the InputStream of the log file.

A contributor replied:

So one of the prerequisites of CDC is:

  • When we are issuing a normal data query (not a CDC one), there should be no performance impact.

Moreover, we should clearly disambiguate the CDC infra from the data infra without needing to fetch even the first block of the file. We can still use the same log format, but we should definitely create a separate naming scheme for CDC log files so they are not mixed up with the data delta log files.

@alexeykudinkin (Contributor) left a comment:

@YannByron this is a gigantic effort. Thanks for taking this forward!

I'd really like us to split this PR into two parts to accelerate our convergence and make sure we can iterate and move forward faster -- it's already 4k LOC, and it's quite hard to digest all of it at once, let alone iterate on it.

It seems very natural to me to split it up into:

  1. the CDC writing part
  2. the CDC reading part

WDYT?

if (oldRecord == null) {
recordKey = hoodieRecord.getRecordKey();
} else {
recordKey = StringUtils.join(
A contributor commented on the diff:

Please check my previous comment: we have to use the proper key generator here.

The author (@YannByron) replied:

Fixed; it now uses the key generator.

fileWriter.writeAvroWithMetadata(key, rewriteRecord(avroRecord));
}
if (cdcEnabled) {
cdcLogger.getAndIncrement();
A contributor commented on the diff:

This should not be needed.

*
* This class wraps [[GenericData.Record]].
*/
public class SerializableRecord implements Serializable {
A contributor commented on the diff:

This is not going to work -- Avro's Schema is not serializable by Java's default serialization protocol.

@prasannarajaperumal (Contributor) replied Sep 16, 2022:

The serialization of the external map is done by Kryo here. Is there a serializer registered for Avro Schema with Kryo? I don't see the test for this with ExternalSpillableMap anymore - @YannByron, can you check?

The author (@YannByron) replied:

I will use HoodieAvroPayload as the value type of the ExternalSpillableMap. This PR has been updated, but the SerializableRecord code has not been removed yet. And I am going to split this PR into two per your suggestion. @alexeykudinkin @prasannarajaperumal

*/
public static GenericData.Record cdcRecord(
String op, String commitTime, GenericRecord before, GenericRecord after) {
String beforeJsonStr = recordToJson(before);
A contributor commented on the diff:

Why are we using JSON and not, for example, Avro?

import java.util.function.Function;
import java.util.stream.Collectors;

public class HoodieCDCLogger<T extends HoodieRecordPayload> implements Closeable {
A contributor commented on the diff:

I don't think we need to parameterize this by HoodieRecordPayload.

The author (@YannByron) replied:

Yes. Updated.


@Override
public boolean hasNext() {
if (null == itr || !itr.hasNext()) {
A contributor commented on the diff:

Let's keep the code style consistent (itr == null).


}
(shortName(), sqlSchema)

if (CDCRelation.isCDCTable(metaClient) &&
A contributor commented on the diff:

isCDCEnabled (the "CDCTable" reference is confusing)

A contributor replied:

Let's also limit this conditional to fetching the schema:

val schema = if (cdc) {
  // cdc-schema
} else {
  // ...
}

(shortName(), sqlSchema)

if (CDCRelation.isCDCTable(metaClient) &&
parameters.get(QUERY_TYPE.key).contains(QUERY_TYPE_INCREMENTAL_OPT_VAL) &&
A contributor commented on the diff:

We can reuse isCdcQuery.

* @param sqlContext instance of SqlContext.
*/
class EmptyRelation(val sqlContext: SQLContext, metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan {
class EmptyRelation(val sqlContext: SQLContext, metaClient: HoodieTableMetaClient, isCDCQuery: Boolean) extends BaseRelation with TableScan {
A contributor commented on the diff:

Instead, let's modify EmptyRelation to accept a schema.

@hudi-bot (Collaborator) commented:

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@xushiyan (Member) commented:

Split into read and write PRs.

@xushiyan closed this Sep 26, 2022

Labels

big-needle-movers priority:blocker Production down; release blocker

Projects

Status: Done


7 participants