
Commit ac23d25

[HUDI-1357] Added a check to validate records are not lost during merges. (#2216)
- Turned off by default
1 parent: b826c53

6 files changed: 122 additions (+), 2 deletions (−)


hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

Lines changed: 15 additions & 0 deletions

@@ -117,6 +117,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
   public static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
 
+  // Data validation check performed during merges before actual commits
+  private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = "hoodie.merge.data.validation.enabled";
+  private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false";
+
   /**
    * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
    * multiple write operations (upsert/buk-insert/...) to be executed within a single commit.

@@ -282,6 +286,10 @@ public BulkInsertSortMode getBulkInsertSortMode() {
     return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
   }
 
+  public boolean isMergeDataValidationCheckEnabled() {
+    return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED));
+  }
+
   /**
    * compaction properties.
    */

@@ -983,6 +991,11 @@ public Builder withExternalSchemaTrasformation(boolean enabled) {
       return this;
     }
 
+    public Builder withMergeDataValidationCheckEnabled(boolean enabled) {
+      props.setProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED, String.valueOf(enabled));
+      return this;
+    }
+
     public Builder withProperties(Properties properties) {
       this.props.putAll(properties);
       return this;

@@ -1032,6 +1045,8 @@ protected void setDefaults() {
       setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE), AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE);
       setDefaultOnCondition(props, !props.containsKey(BULKINSERT_SORT_MODE),
           BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
+      setDefaultOnCondition(props, !props.containsKey(MERGE_DATA_VALIDATION_CHECK_ENABLED),
+          MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED);
 
       // Make sure the props is propagated
       setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build());
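The check is off by default (DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false"). A minimal sketch, not part of this commit, of enabling it through the builder setter added above; the base path is a hypothetical placeholder and the rest of the write configuration is assumed to be set elsewhere:

import org.apache.hudi.config.HoodieWriteConfig;

public class MergeValidationConfigExample {
  // Sketch only: "/tmp/hudi_table" is a hypothetical base path, and withPath is
  // the builder's pre-existing base-path setter (not part of this diff).
  public static HoodieWriteConfig buildConfig() {
    return HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")
        .withMergeDataValidationCheckEnabled(true) // hoodie.merge.data.validation.enabled=true
        .build();
  }
}

Equivalently, the test added in this commit flips the raw property with cfg.getProps().setProperty("hoodie.merge.data.validation.enabled", "true").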

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java

Lines changed: 27 additions & 0 deletions

@@ -34,8 +34,11 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.table.HoodieTable;

@@ -292,6 +295,8 @@ public WriteStatus close() {
       runtimeStats.setTotalUpsertTime(timer.endTimer());
       stat.setRuntimeStats(runtimeStats);
 
+      performMergeDataValidationCheck(writeStatus);
+
       LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(),
           stat.getFileId(), runtimeStats.getTotalUpsertTime()));

@@ -301,6 +306,28 @@ public WriteStatus close() {
     }
   }
 
+  public void performMergeDataValidationCheck(WriteStatus writeStatus) {
+    if (!config.isMergeDataValidationCheckEnabled()) {
+      return;
+    }
+
+    long oldNumWrites = 0;
+    try {
+      HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
+      oldNumWrites = reader.getTotalRecords();
+    } catch (IOException e) {
+      throw new HoodieUpsertException("Failed to check for merge data validation", e);
+    }
+
+    if ((writeStatus.getStat().getNumWrites() + writeStatus.getStat().getNumDeletes()) < oldNumWrites) {
+      throw new HoodieCorruptedDataException(
+          String.format("Record write count decreased for file: %s, Partition Path: %s (%s:%d + %d < %s:%d)",
+              writeStatus.getFileId(), writeStatus.getPartitionPath(),
+              instantTime, writeStatus.getStat().getNumWrites(), writeStatus.getStat().getNumDeletes(),
+              FSUtils.getCommitTime(oldFilePath.toString()), oldNumWrites));
+    }
+  }
+
   public Path getOldFilePath() {
     return oldFilePath;
   }
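The invariant enforced above is that a merge may rewrite or delete records but never silently drop them: the new file's write count plus delete count must be at least the record count of the base file being replaced. An illustrative paraphrase of just that condition follows (the real method reads the old count via HoodieFileReader and throws HoodieCorruptedDataException with file, partition, and instant details):

// Illustrative paraphrase only; not the patch code.
final class MergeRecordCountCheck {
  static void checkNoRecordsLost(long numWrites, long numDeletes, long oldNumRecords) {
    if (numWrites + numDeletes < oldNumRecords) {
      throw new IllegalStateException(
          "Record write count decreased during merge: "
              + (numWrites + numDeletes) + " < " + oldNumRecords);
    }
  }
}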

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java

Lines changed: 49 additions & 0 deletions

@@ -47,10 +47,12 @@
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;

@@ -376,6 +378,53 @@ private void testUpsertsInternal(HoodieWriteConfig config,
         instants.get(3));
     assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "006"),
         instants.get(4));
+
+    final HoodieWriteConfig cfg = hoodieWriteConfig;
+    final String instantTime = "007";
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    String basePathStr = basePath;
+    HoodieTable table = getHoodieTable(metaClient, cfg);
+    jsc.parallelize(Arrays.asList(1)).map(e -> {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+          .fromBytes(metaClient.getActiveTimeline().getInstantDetails(
+              metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get()).get(),
+              HoodieCommitMetadata.class);
+      String filePath = commitMetadata.getPartitionToWriteStats().values().stream()
+          .flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(".parquet")).findAny()
+          .map(ee -> ee.getPath()).orElse(null);
+      String partitionPath = commitMetadata.getPartitionToWriteStats().values().stream()
+          .flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(".parquet")).findAny()
+          .map(ee -> ee.getPartitionPath()).orElse(null);
+      Path parquetFilePath = new Path(basePathStr, filePath);
+      HoodieBaseFile baseFile = new HoodieBaseFile(parquetFilePath.toString());
+
+      try {
+        HoodieMergeHandle handle = new HoodieMergeHandle(cfg, instantTime, table, new HashMap<>(),
+            partitionPath, FSUtils.getFileId(parquetFilePath.getName()), baseFile, new SparkTaskContextSupplier());
+        WriteStatus writeStatus = new WriteStatus(false, 0.0);
+        writeStatus.setStat(new HoodieWriteStat());
+        writeStatus.getStat().setNumWrites(0);
+        handle.performMergeDataValidationCheck(writeStatus);
+      } catch (HoodieCorruptedDataException e1) {
+        fail("Exception not expected because merge validation check is disabled");
+      }
+
+      try {
+        final String newInstantTime = "006";
+        cfg.getProps().setProperty("hoodie.merge.data.validation.enabled", "true");
+        HoodieWriteConfig cfg2 = HoodieWriteConfig.newBuilder().withProps(cfg.getProps()).build();
+        HoodieMergeHandle handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new HashMap<>(),
+            partitionPath, FSUtils.getFileId(parquetFilePath.getName()), baseFile, new SparkTaskContextSupplier());
+        WriteStatus writeStatus = new WriteStatus(false, 0.0);
+        writeStatus.setStat(new HoodieWriteStat());
+        writeStatus.getStat().setNumWrites(0);
+        handle.performMergeDataValidationCheck(writeStatus);
+        fail("The above line should have thrown an exception");
+      } catch (HoodieCorruptedDataException e2) {
+        // expected
+      }
+      return true;
+    }).collect();
   }
 
   /**

hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java

Lines changed: 17 additions & 0 deletions

@@ -39,6 +39,7 @@
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;

@@ -261,6 +262,22 @@ public static List<GenericRecord> readAvroRecords(Configuration configuration, P
     return records;
   }
 
+  /**
+   * Returns the number of records in the parquet file.
+   *
+   * @param conf Configuration
+   * @param parquetFilePath path of the file
+   */
+  public static long getRowCount(Configuration conf, Path parquetFilePath) {
+    ParquetMetadata footer;
+    long rowCount = 0;
+    footer = readMetadata(conf, parquetFilePath);
+    for (BlockMetaData b : footer.getBlocks()) {
+      rowCount += b.getRowCount();
+    }
+    return rowCount;
+  }
+
   static class RecordKeysFilterFunction implements Function<String, Boolean> {
 
     private final Set<String> candidateKeys;

hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java

Lines changed: 1 addition & 2 deletions

@@ -74,7 +74,6 @@ public void close() {
 
   @Override
   public long getTotalRecords() {
-    // TODO Auto-generated method stub
-    return 0;
+    return ParquetUtils.getRowCount(conf, path);
  }
 }
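Since getTotalRecords() is now backed by ParquetUtils.getRowCount, which sums row counts from the parquet footer metadata, the pre-merge record count can be obtained without scanning record data. A minimal sketch, assuming a Hadoop Configuration and a hypothetical base-file path, of the same reader path HoodieMergeHandle uses above:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

public class BaseFileRecordCountExample {
  // Sketch only: the path below is a hypothetical placeholder for an existing
  // Hudi parquet base file.
  public static long recordCount(Configuration conf) throws IOException {
    Path baseFilePath = new Path("/tmp/hudi_table/2020/10/01/some-base-file.parquet");
    HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(conf, baseFilePath);
    return reader.getTotalRecords(); // the parquet reader delegates to ParquetUtils.getRowCount
  }
}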

hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java

Lines changed: 13 additions & 0 deletions

@@ -36,6 +36,7 @@
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;

@@ -147,6 +148,18 @@ public void testFetchRecordKeyPartitionPathFromParquet(String typeCode) throws E
     }
   }
 
+  @Test
+  public void testReadCounts() throws Exception {
+    String filePath = basePath + "/test.parquet";
+    List<String> rowKeys = new ArrayList<>();
+    for (int i = 0; i < 123; i++) {
+      rowKeys.add(UUID.randomUUID().toString());
+    }
+    writeParquetFile(BloomFilterTypeCode.SIMPLE.name(), filePath, rowKeys);
+
+    assertEquals(123, ParquetUtils.getRowCount(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)));
+  }
+
   private void writeParquetFile(String typeCode, String filePath, List<String> rowKeys) throws Exception {
     writeParquetFile(typeCode, filePath, rowKeys, HoodieAvroUtils.getRecordKeySchema(), false, "");
   }
