diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java index dacef2d286ae4..804b2cea34abf 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java @@ -105,7 +105,6 @@ public JavaRDD> tagLocation(JavaRDD> recordRDD, recordRDD.unpersist(); // unpersist the input Record RDD keyFilenamePairRDD.unpersist(); } - return taggedRecordRDD; } @@ -321,8 +320,9 @@ JavaRDD> explodeRecordRDDWithFileComparisons( String recordKey = partitionRecordKeyPair._2(); String partitionPath = partitionRecordKeyPair._1(); - return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream() - .map(matchingFile -> new Tuple2<>(matchingFile, new HoodieKey(recordKey, partitionPath))) + return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream() + .map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(), + new HoodieKey(recordKey, partitionPath))) .collect(Collectors.toList()); }).flatMap(List::iterator); } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java index 2881ce006f7b5..22042875cb116 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java @@ -66,7 +66,8 @@ class LazyKeyCheckIterator extends LazyIterableIterator computeNext() { @@ -113,6 +114,7 @@ protected List computeNext() { } @Override - protected void end() {} + protected void end() { + } } } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java index 8db45e035d75f..89291b78cb283 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java @@ -33,12 +33,11 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.Optional; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.stream.Collectors; import scala.Tuple2; @@ -59,7 +58,7 @@ public HoodieGlobalBloomIndex(HoodieWriteConfig config) { @Override @VisibleForTesting List> loadInvolvedFiles(List partitions, final JavaSparkContext jsc, - final HoodieTable hoodieTable) { + final HoodieTable hoodieTable) { HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); try { List allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), @@ -74,7 +73,7 @@ List> loadInvolvedFiles(List partitio * For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be * checked. For datasets, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files * to be compared gets cut down a lot from range pruning. - * + *
* <p>
* Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on * recordKey ranges in the index info. the partition path of the incoming record (partitionRecordKeyPairRDD._2()) will * be ignored since the search scope should be bigger than that @@ -85,10 +84,6 @@ List> loadInvolvedFiles(List partitio JavaRDD> explodeRecordRDDWithFileComparisons( final Map> partitionToFileIndexInfo, JavaPairRDD partitionRecordKeyPairRDD) { - Map indexToPartitionMap = new HashMap<>(); - for (Entry> entry : partitionToFileIndexInfo.entrySet()) { - entry.getValue().forEach(indexFile -> indexToPartitionMap.put(indexFile.getFileId(), entry.getKey())); - } IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges() ? new IntervalTreeBasedGlobalIndexFileFilter(partitionToFileIndexInfo) @@ -98,26 +93,37 @@ JavaRDD> explodeRecordRDDWithFileComparisons( String recordKey = partitionRecordKeyPair._2(); String partitionPath = partitionRecordKeyPair._1(); - return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream() - .map(file -> new Tuple2<>(file, new HoodieKey(recordKey, indexToPartitionMap.get(file)))) + return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey).stream() + .map(partitionFileIdPair -> new Tuple2<>(partitionFileIdPair.getRight(), + new HoodieKey(recordKey, partitionFileIdPair.getLeft()))) .collect(Collectors.toList()); }).flatMap(List::iterator); } - /** * Tagging for global index should only consider the record key. */ @Override protected JavaRDD> tagLocationBacktoRecords( - JavaPairRDD keyFilenamePairRDD, JavaRDD> recordRDD) { - JavaPairRDD> rowKeyRecordPairRDD = + JavaPairRDD keyLocationPairRDD, JavaRDD> recordRDD) { + + JavaPairRDD> incomingRowKeyRecordPairRDD = recordRDD.mapToPair(record -> new Tuple2<>(record.getRecordKey(), record)); - // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), - // so we do left outer join. - return rowKeyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), p._2))) - .values().map(value -> getTaggedRecord(value._1, Option.ofNullable(value._2.orNull()))); + JavaPairRDD> existingRecordKeyToRecordLocationHoodieKeyMap = + keyLocationPairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), new Tuple2<>(p._2, p._1))); + + // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join. 
+ return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().map(record -> { + final HoodieRecord hoodieRecord = record._1; + final Optional> recordLocationHoodieKeyPair = record._2; + if (recordLocationHoodieKeyPair.isPresent()) { + // Record key matched to file + return getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()), Option.ofNullable(recordLocationHoodieKeyPair.get()._1)); + } else { + return getTaggedRecord(hoodieRecord, Option.empty()); + } + }); } @Override diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/IndexFileFilter.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/IndexFileFilter.java index 78173ff2ea124..017a5692252a3 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/IndexFileFilter.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/IndexFileFilter.java @@ -18,6 +18,8 @@ package org.apache.hudi.index.bloom; +import org.apache.hudi.common.util.collection.Pair; + import java.io.Serializable; import java.util.Set; @@ -27,12 +29,13 @@ public interface IndexFileFilter extends Serializable { /** - * Fetches all matching files for a given record key and partition. + * Fetches all matching files and partition pair for a given record key and partition path. * * @param partitionPath the partition path of interest - * @param recordKey the record key to be looked up - * @return the {@link Set} of matching file names where the record could potentially be present. + * @param recordKey the record key to be looked up + * @return the {@link Set} of matching pairs where the record could potentially be + * present. */ - Set getMatchingFiles(String partitionPath, String recordKey); + Set> getMatchingFilesAndPartition(String partitionPath, String recordKey); } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java index f3001ea05b435..ea670bb8a968e 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java @@ -18,13 +18,15 @@ package org.apache.hudi.index.bloom; -import java.util.Collection; +import org.apache.hudi.common.util.collection.Pair; + +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; /** * Interval Tree based index look up for Global Index. Builds an {@link KeyRangeLookupTree} for all index files (across @@ -34,6 +36,7 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter { private final KeyRangeLookupTree indexLookUpTree = new KeyRangeLookupTree(); private final Set filesWithNoRanges = new HashSet<>(); + private final Map fileIdToPartitionPathMap = new HashMap<>(); /** * Instantiates {@link IntervalTreeBasedGlobalIndexFileFilter}. 
@@ -41,8 +44,15 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter { * @param partitionToFileIndexInfo Map of partition to List of {@link BloomIndexFileInfo}s */ IntervalTreeBasedGlobalIndexFileFilter(final Map> partitionToFileIndexInfo) { - List allIndexFiles = - partitionToFileIndexInfo.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); + List allIndexFiles = new ArrayList<>(); + + partitionToFileIndexInfo.forEach((partition, bloomIndexFileInfoList) -> { + bloomIndexFileInfoList.forEach(file -> { + fileIdToPartitionPathMap.put(file.getFileId(), partition); + allIndexFiles.add(file); + }); + }); + // Note that the interval tree implementation doesn't have auto-balancing to ensure logN search time. // So, we are shuffling the input here hoping the tree will not have any skewness. If not, the tree could be skewed // which could result in N search time instead of NlogN. @@ -58,10 +68,12 @@ class IntervalTreeBasedGlobalIndexFileFilter implements IndexFileFilter { } @Override - public Set getMatchingFiles(String partitionPath, String recordKey) { - Set toReturn = new HashSet<>(); - toReturn.addAll(indexLookUpTree.getMatchingIndexFiles(recordKey)); - toReturn.addAll(filesWithNoRanges); + public Set> getMatchingFilesAndPartition(String partitionPath, String recordKey) { + Set matchingFiles = new HashSet<>(); + matchingFiles.addAll(indexLookUpTree.getMatchingIndexFiles(recordKey)); + matchingFiles.addAll(filesWithNoRanges); + Set> toReturn = new HashSet<>(); + matchingFiles.forEach(file -> toReturn.add(Pair.of(fileIdToPartitionPathMap.get(file), file))); return toReturn; } } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedIndexFileFilter.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedIndexFileFilter.java index 849cdc6945e14..53ef653eb107d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedIndexFileFilter.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/IntervalTreeBasedIndexFileFilter.java @@ -18,6 +18,8 @@ package org.apache.hudi.index.bloom; +import org.apache.hudi.common.util.collection.Pair; + import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -62,14 +64,16 @@ class IntervalTreeBasedIndexFileFilter implements IndexFileFilter { } @Override - public Set getMatchingFiles(String partitionPath, String recordKey) { - Set toReturn = new HashSet<>(); + public Set> getMatchingFilesAndPartition(String partitionPath, String recordKey) { + Set> toReturn = new HashSet<>(); // could be null, if there are no files in a given partition yet or if all index files have no ranges if (partitionToFileIndexLookUpTree.containsKey(partitionPath)) { - toReturn.addAll(partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey)); + partitionToFileIndexLookUpTree.get(partitionPath).getMatchingIndexFiles(recordKey).forEach(file -> + toReturn.add(Pair.of(partitionPath, file))); } if (partitionToFilesWithNoRanges.containsKey(partitionPath)) { - toReturn.addAll(partitionToFilesWithNoRanges.get(partitionPath)); + partitionToFilesWithNoRanges.get(partitionPath).forEach(file -> + toReturn.add(Pair.of(partitionPath, file))); } return toReturn; } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/KeyRangeNode.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/KeyRangeNode.java index 2c8971aaa10b2..a5d27a74150d9 100644 --- 
a/hudi-client/src/main/java/org/apache/hudi/index/bloom/KeyRangeNode.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/KeyRangeNode.java @@ -73,8 +73,8 @@ public String toString() { * * @param that the {@link KeyRangeNode} to be compared with * @return the result of comparison. 0 if both min and max are equal in both. 1 if this {@link KeyRangeNode} is - * greater than the {@code that} keyRangeNode. -1 if {@code that} keyRangeNode is greater than this - * {@link KeyRangeNode} + * greater than the {@code that} keyRangeNode. -1 if {@code that} keyRangeNode is greater than this {@link + * KeyRangeNode} */ @Override public int compareTo(KeyRangeNode that) { diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/ListBasedGlobalIndexFileFilter.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/ListBasedGlobalIndexFileFilter.java index 1f3e04e2fc4b5..6f6fbdaec7ab8 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/ListBasedGlobalIndexFileFilter.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/ListBasedGlobalIndexFileFilter.java @@ -18,6 +18,8 @@ package org.apache.hudi.index.bloom; +import org.apache.hudi.common.util.collection.Pair; + import java.util.HashSet; import java.util.List; import java.util.Map; @@ -35,17 +37,14 @@ class ListBasedGlobalIndexFileFilter extends ListBasedIndexFileFilter { } @Override - public Set getMatchingFiles(String partitionPath, String recordKey) { - Set toReturn = new HashSet<>(); - partitionToFileIndexInfo.values().forEach(indexInfos -> { - if (indexInfos != null) { // could be null, if there are no files in a given partition yet. - // for each candidate file in partition, that needs to be compared. - for (BloomIndexFileInfo indexInfo : indexInfos) { - if (shouldCompareWithFile(indexInfo, recordKey)) { - toReturn.add(indexInfo.getFileId()); - } + public Set> getMatchingFilesAndPartition(String partitionPath, String recordKey) { + Set> toReturn = new HashSet<>(); + partitionToFileIndexInfo.forEach((partition, bloomIndexFileInfoList) -> { + bloomIndexFileInfoList.forEach(file -> { + if (shouldCompareWithFile(file, recordKey)) { + toReturn.add(Pair.of(partition, file.getFileId())); } - } + }); }); return toReturn; } diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/ListBasedIndexFileFilter.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/ListBasedIndexFileFilter.java index b9a6ce382f301..c5e39146fbdf9 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/ListBasedIndexFileFilter.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/ListBasedIndexFileFilter.java @@ -18,6 +18,8 @@ package org.apache.hudi.index.bloom; +import org.apache.hudi.common.util.collection.Pair; + import java.util.HashSet; import java.util.List; import java.util.Map; @@ -41,14 +43,14 @@ class ListBasedIndexFileFilter implements IndexFileFilter { } @Override - public Set getMatchingFiles(String partitionPath, String recordKey) { + public Set> getMatchingFilesAndPartition(String partitionPath, String recordKey) { List indexInfos = partitionToFileIndexInfo.get(partitionPath); - Set toReturn = new HashSet<>(); + Set> toReturn = new HashSet<>(); if (indexInfos != null) { // could be null, if there are no files in a given partition yet. // for each candidate file in partition, that needs to be compared. 
for (BloomIndexFileInfo indexInfo : indexInfos) { if (shouldCompareWithFile(indexInfo, recordKey)) { - toReturn.add(indexInfo.getFileId()); + toReturn.add(Pair.of(partitionPath, indexInfo.getFileId())); } } } diff --git a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java index 0b3b88a55ae8f..24d832b75417c 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java @@ -95,7 +95,7 @@ protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean } protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit, - HoodieIndex index) { + HoodieIndex index) { return new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index); } @@ -112,6 +112,9 @@ protected HoodieWriteConfig getConfig() { return getConfigBuilder().build(); } + protected HoodieWriteConfig getConfig(IndexType indexType) { + return getConfigBuilder(indexType).build(); + } /** * Get Config builder with default configs set. @@ -127,7 +130,20 @@ HoodieWriteConfig.Builder getConfigBuilder() { * * @return Config Builder */ + HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) { + return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType); + } + HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) { + return getConfigBuilder(schemaStr, IndexType.BLOOM); + } + + /** + * Get Config builder with default configs set. + * + * @return Config Builder + */ + HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) { return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2) .withWriteStatusClass(MetadataMergeWriteStatus.class) @@ -135,7 +151,7 @@ HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) { .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build()) .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build()) .forTable("test-trip-table") - .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build()) .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder() .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()); } @@ -162,7 +178,7 @@ static void assertNoWriteErrors(List statuses) { * Ensure presence of partition meta-data at known depth. * * @param partitionPaths Partition paths to check - * @param fs File System + * @param fs File System * @throws IOException in case of error */ void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException { @@ -178,7 +194,7 @@ void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOEx * Ensure records have location field set. * * @param taggedRecords Tagged Records - * @param commitTime Commit Timestamp + * @param commitTime Commit Timestamp */ void checkTaggedRecords(List taggedRecords, String commitTime) { for (HoodieRecord rec : taggedRecords) { @@ -212,7 +228,7 @@ void assertNodupesWithinPartition(List records) { * to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is * guaranteed by record-generation function itself. 
* - * @param writeConfig Hoodie Write Config + * @param writeConfig Hoodie Write Config * @param recordGenFunction Records Generation function * @return Wrapped function */ @@ -233,7 +249,7 @@ private Function2, String, Integer> wrapRecordsGenFunctionFor * to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is * guaranteed by key-generation function itself. * - * @param writeConfig Hoodie Write Config + * @param writeConfig Hoodie Write Config * @param keyGenFunction Keys Generation function * @return Wrapped function */ @@ -255,12 +271,12 @@ private Function2, String, Integer> wrapDeleteKeysGenFunctionFor * Generate wrapper for record generation function for testing Prepped APIs. * * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs - * @param writeConfig Hoodie Write Config - * @param wrapped Actual Records Generation function + * @param writeConfig Hoodie Write Config + * @param wrapped Actual Records Generation function * @return Wrapped Function */ Function2, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI, - HoodieWriteConfig writeConfig, Function2, String, Integer> wrapped) { + HoodieWriteConfig writeConfig, Function2, String, Integer> wrapped) { if (isPreppedAPI) { return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped); } else { @@ -272,12 +288,12 @@ Function2, String, Integer> generateWrapRecordsFn(boolean isP * Generate wrapper for delete key generation function for testing Prepped APIs. * * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs - * @param writeConfig Hoodie Write Config - * @param wrapped Actual Records Generation function + * @param writeConfig Hoodie Write Config + * @param wrapped Actual Records Generation function * @return Wrapped Function */ Function2, String, Integer> generateWrapDeleteKeysFn(boolean isPreppedAPI, - HoodieWriteConfig writeConfig, Function2, String, Integer> wrapped) { + HoodieWriteConfig writeConfig, Function2, String, Integer> wrapped) { if (isPreppedAPI) { return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped); } else { @@ -288,22 +304,22 @@ Function2, String, Integer> generateWrapDeleteKeysFn(boolean isP /** * Helper to insert first batch of records and do regular assertions on the state after successful completion. 
* - * @param writeConfig Hoodie Write Config - * @param client Hoodie Write Client - * @param newCommitTime New Commit Timestamp to be used - * @param initCommitTime Begin Timestamp (usually "000") + * @param writeConfig Hoodie Write Config + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param initCommitTime Begin Timestamp (usually "000") * @param numRecordsInThisCommit Number of records to be added in the new commit - * @param writeFn Write Function to be used for insertion - * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records - * @param assertForCommit Enable Assertion of Writes + * @param writeFn Write Function to be used for insertion + * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records + * @param assertForCommit Enable Assertion of Writes * @param expRecordsInThisCommit Expected number of records in this commit * @return RDD of write-status * @throws Exception in case of error */ JavaRDD insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, - String initCommitTime, int numRecordsInThisCommit, - Function3, HoodieWriteClient, JavaRDD, String> writeFn, boolean isPreppedAPI, - boolean assertForCommit, int expRecordsInThisCommit) throws Exception { + String initCommitTime, int numRecordsInThisCommit, + Function3, HoodieWriteClient, JavaRDD, String> writeFn, boolean isPreppedAPI, + boolean assertForCommit, int expRecordsInThisCommit) throws Exception { final Function2, String, Integer> recordGenFunction = generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts); @@ -314,27 +330,27 @@ JavaRDD insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWrite /** * Helper to upsert batch of records and do regular assertions on the state after successful completion. 
* - * @param writeConfig Hoodie Write Config - * @param client Hoodie Write Client - * @param newCommitTime New Commit Timestamp to be used - * @param prevCommitTime Commit Timestamp used in previous commit + * @param writeConfig Hoodie Write Config + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param prevCommitTime Commit Timestamp used in previous commit * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime - * @param initCommitTime Begin Timestamp (usually "000") - * @param numRecordsInThisCommit Number of records to be added in the new commit - * @param writeFn Write Function to be used for upsert - * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records - * @param assertForCommit Enable Assertion of Writes - * @param expRecordsInThisCommit Expected number of records in this commit - * @param expTotalRecords Expected number of records when scanned - * @param expTotalCommits Expected number of commits (including this commit) + * @param initCommitTime Begin Timestamp (usually "000") + * @param numRecordsInThisCommit Number of records to be added in the new commit + * @param writeFn Write Function to be used for upsert + * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records + * @param assertForCommit Enable Assertion of Writes + * @param expRecordsInThisCommit Expected number of records in this commit + * @param expTotalRecords Expected number of records when scanned + * @param expTotalCommits Expected number of commits (including this commit) * @return RDD of write-status * @throws Exception in case of error */ JavaRDD updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, - String prevCommitTime, Option> commitTimesBetweenPrevAndNew, String initCommitTime, - int numRecordsInThisCommit, - Function3, HoodieWriteClient, JavaRDD, String> writeFn, boolean isPreppedAPI, - boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception { + String prevCommitTime, Option> commitTimesBetweenPrevAndNew, String initCommitTime, + int numRecordsInThisCommit, + Function3, HoodieWriteClient, JavaRDD, String> writeFn, boolean isPreppedAPI, + boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception { final Function2, String, Integer> recordGenFunction = generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates); @@ -346,25 +362,25 @@ JavaRDD updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClien /** * Helper to delete batch of keys and do regular assertions on the state after successful completion. 
* - * @param writeConfig Hoodie Write Config - * @param client Hoodie Write Client - * @param newCommitTime New Commit Timestamp to be used - * @param prevCommitTime Commit Timestamp used in previous commit - * @param initCommitTime Begin Timestamp (usually "000") + * @param writeConfig Hoodie Write Config + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param prevCommitTime Commit Timestamp used in previous commit + * @param initCommitTime Begin Timestamp (usually "000") * @param numRecordsInThisCommit Number of records to be added in the new commit - * @param deleteFn Delete Function to be used for deletes - * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records - * @param assertForCommit Enable Assertion of Writes + * @param deleteFn Delete Function to be used for deletes + * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records + * @param assertForCommit Enable Assertion of Writes * @param expRecordsInThisCommit Expected number of records in this commit - * @param expTotalRecords Expected number of records when scanned + * @param expTotalRecords Expected number of records when scanned * @return RDD of write-status * @throws Exception in case of error */ JavaRDD deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, - String prevCommitTime, String initCommitTime, - int numRecordsInThisCommit, - Function3, HoodieWriteClient, JavaRDD, String> deleteFn, boolean isPreppedAPI, - boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception { + String prevCommitTime, String initCommitTime, + int numRecordsInThisCommit, + Function3, HoodieWriteClient, JavaRDD, String> deleteFn, boolean isPreppedAPI, + boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception { final Function2, String, Integer> keyGenFunction = generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes); @@ -376,25 +392,25 @@ JavaRDD deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClien /** * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion. 
* - * @param client Hoodie Write Client - * @param newCommitTime New Commit Timestamp to be used - * @param prevCommitTime Commit Timestamp used in previous commit + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param prevCommitTime Commit Timestamp used in previous commit * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime - * @param initCommitTime Begin Timestamp (usually "000") - * @param numRecordsInThisCommit Number of records to be added in the new commit - * @param recordGenFunction Records Generation Function - * @param writeFn Write Function to be used for upsert - * @param assertForCommit Enable Assertion of Writes - * @param expRecordsInThisCommit Expected number of records in this commit - * @param expTotalRecords Expected number of records when scanned - * @param expTotalCommits Expected number of commits (including this commit) + * @param initCommitTime Begin Timestamp (usually "000") + * @param numRecordsInThisCommit Number of records to be added in the new commit + * @param recordGenFunction Records Generation Function + * @param writeFn Write Function to be used for upsert + * @param assertForCommit Enable Assertion of Writes + * @param expRecordsInThisCommit Expected number of records in this commit + * @param expTotalRecords Expected number of records when scanned + * @param expTotalCommits Expected number of commits (including this commit) * @throws Exception in case of error */ JavaRDD writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime, - Option> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit, - Function2, String, Integer> recordGenFunction, - Function3, HoodieWriteClient, JavaRDD, String> writeFn, - boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception { + Option> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit, + Function2, String, Integer> recordGenFunction, + Function3, HoodieWriteClient, JavaRDD, String> writeFn, + boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception { // Write 1 (only inserts) client.startCommitWithTime(newCommitTime); @@ -447,22 +463,22 @@ JavaRDD writeBatch(HoodieWriteClient client, String newCommitTime, /** * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion. 
* - * @param client Hoodie Write Client - * @param newCommitTime New Commit Timestamp to be used - * @param prevCommitTime Commit Timestamp used in previous commit - * @param initCommitTime Begin Timestamp (usually "000") - * @param keyGenFunction Key Generation function - * @param deleteFn Write Function to be used for delete - * @param assertForCommit Enable Assertion of Writes + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param prevCommitTime Commit Timestamp used in previous commit + * @param initCommitTime Begin Timestamp (usually "000") + * @param keyGenFunction Key Generation function + * @param deleteFn Write Function to be used for delete + * @param assertForCommit Enable Assertion of Writes * @param expRecordsInThisCommit Expected number of records in this commit - * @param expTotalRecords Expected number of records when scanned + * @param expTotalRecords Expected number of records when scanned * @throws Exception in case of error */ JavaRDD deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime, - String initCommitTime, int numRecordsInThisCommit, - Function2, String, Integer> keyGenFunction, - Function3, HoodieWriteClient, JavaRDD, String> deleteFn, - boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception { + String initCommitTime, int numRecordsInThisCommit, + Function2, String, Integer> keyGenFunction, + Function3, HoodieWriteClient, JavaRDD, String> deleteFn, + boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception { // Delete 1 (only deletes) client.startCommitWithTime(newCommitTime); @@ -510,7 +526,7 @@ JavaRDD deleteBatch(HoodieWriteClient client, String newCommitTime, * Get Cleaner state corresponding to a partition path. * * @param hoodieCleanStatsTwo List of Clean Stats - * @param partitionPath Partition path for filtering + * @param partitionPath Partition path for filtering * @return Cleaner state corresponding to partition path */ HoodieCleanStat getCleanStat(List hoodieCleanStatsTwo, String partitionPath) { @@ -520,9 +536,9 @@ HoodieCleanStat getCleanStat(List hoodieCleanStatsTwo, String p /** * Utility to simulate commit touching files in a partition. * - * @param files List of file-Ids to be touched + * @param files List of file-Ids to be touched * @param partitionPath Partition - * @param commitTime Commit Timestamp + * @param commitTime Commit Timestamp * @throws IOException in case of error */ void updateAllFilesInPartition(List files, String partitionPath, String commitTime) throws IOException { @@ -535,8 +551,8 @@ void updateAllFilesInPartition(List files, String partitionPath, String * Helper methods to create new data files in a partition. 
* * @param partitionPath Partition - * @param commitTime Commit Timestamp - * @param numFiles Number of files to be added + * @param commitTime Commit Timestamp + * @param numFiles Number of files to be added * @return Created files * @throws IOException in case of error */ diff --git a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java index bb716d650aee8..60655b81073d7 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java @@ -45,6 +45,7 @@ import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.table.HoodieTable; import org.apache.avro.generic.GenericRecord; @@ -138,7 +139,7 @@ public void testAutoCommitOnBulkInsertPrepped() throws Exception { * @throws Exception in case of failure */ private void testAutoCommit(Function3, HoodieWriteClient, JavaRDD, String> writeFn, - boolean isPrepped) throws Exception { + boolean isPrepped) throws Exception { // Set autoCommit false HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); try (HoodieWriteClient client = getHoodieWriteClient(cfg);) { @@ -261,12 +262,12 @@ public void testUpsertsPrepped() throws Exception { /** * Test one of HoodieWriteClient upsert(Prepped) APIs. * - * @param config Write Config + * @param config Write Config * @param writeFn One of Hoodie Write Function API * @throws Exception in case of error */ private void testUpsertsInternal(HoodieWriteConfig config, - Function3, HoodieWriteClient, JavaRDD, String> writeFn, boolean isPrepped) + Function3, HoodieWriteClient, JavaRDD, String> writeFn, boolean isPrepped) throws Exception { // Force using older timeline layout HoodieWriteConfig hoodieWriteConfig = getConfigBuilder().withProps(config.getProps()).withTimelineLayoutVersion( @@ -382,6 +383,59 @@ public void testDeletes() throws Exception { HoodieWriteClient::upsert, true, 50, 150, 2); } + /** + * Test update of a record to a different partition with Global Index. + */ + @Test + public void testUpsertToDiffPartitionGlobalIndex() throws Exception { + HoodieWriteClient client = getHoodieWriteClient(getConfig(IndexType.GLOBAL_BLOOM), false); + /** + * Write 1 (only inserts): insert 10 records. + */ + String newCommitTime = "001"; + List inserts1 = dataGen.generateInserts(newCommitTime, 10); + + // Write 1 (only inserts) + client.startCommitWithTime(newCommitTime); + JavaRDD writeRecords = jsc.parallelize(inserts1, 1); + + JavaRDD result = client.insert(writeRecords, newCommitTime); + List statuses = result.collect(); + assertNoWriteErrors(statuses); + + // check the partition metadata is written out + assertPartitionMetadata(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, fs); + String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; + for (int i = 0; i < fullPartitionPaths.length; i++) { + fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); + } + assertEquals("Must contain " + 10 + " records", 10, + HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); + + /** + * Write 2. 
Updates with different partition + */ + newCommitTime = "004"; + client.startCommitWithTime(newCommitTime); + + List updates1 = dataGen.generateUpdatesWithDiffPartition(newCommitTime, inserts1); + JavaRDD updateRecords = jsc.parallelize(updates1, 1); + + JavaRDD result1 = client.upsert(updateRecords, newCommitTime); + List statuses1 = result1.collect(); + assertNoWriteErrors(statuses1); + + // check the partition metadata is written out + assertPartitionMetadata(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, fs); + // Check the entire dataset has all records still + fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; + for (int i = 0; i < fullPartitionPaths.length; i++) { + fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); + } + assertEquals("Must contain " + 10 + " records", 10, + HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); + } + /** * Test scenario of new file-group getting added during upsert(). */ @@ -391,7 +445,7 @@ public void testSmallInsertHandlingForUpserts() throws Exception { final int insertSplitLimit = 100; // setup the small file handling params HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max - dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath}); + dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); HoodieWriteClient client = getHoodieWriteClient(config, false); @@ -504,7 +558,7 @@ public void testSmallInsertHandlingForInserts() throws Exception { final int insertSplitLimit = 100; // setup the small file handling params HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max - dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath}); + dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); HoodieWriteClient client = getHoodieWriteClient(config, false); // Inserts => will write file1 @@ -516,7 +570,7 @@ public void testSmallInsertHandlingForInserts() throws Exception { List statuses = client.insert(insertRecordsRDD1, commitTime1).collect(); assertNoWriteErrors(statuses); - assertPartitionMetadata(new String[]{testPartitionPath}, fs); + assertPartitionMetadata(new String[] {testPartitionPath}, fs); assertEquals("Just 1 file needs to be added.", 1, statuses.size()); String file1 = statuses.get(0).getFileId(); @@ -586,7 +640,7 @@ public void testDeletesWithDeleteApi() throws Exception { List keysSoFar = new ArrayList<>(); // setup the small file handling params HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max - dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath}); + dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); HoodieWriteClient client = getHoodieWriteClient(config, false); @@ -648,7 +702,7 @@ public void testDeletesWithDeleteApi() throws Exception { } private Pair, List> testUpdates(String commitTime, HoodieWriteClient client, - int sizeToInsertAndUpdate, int expectedTotalRecords) + int sizeToInsertAndUpdate, int expectedTotalRecords) throws IOException { client.startCommitWithTime(commitTime); List inserts = dataGen.generateInserts(commitTime, sizeToInsertAndUpdate); @@ -672,7 +726,7 @@ private Pair, List> testUpdates(String commitTime, Hoo } private void testDeletes(HoodieWriteClient client, List previousRecords, int sizeToDelete, - String existingFile, String commitTime, int exepctedRecords, List keys) { + 
String existingFile, String commitTime, int exepctedRecords, List keys) { client.startCommitWithTime(commitTime); List hoodieKeysToDelete = HoodieClientTestUtils @@ -714,7 +768,7 @@ public void testDeletesWithoutInserts() throws Exception { final int insertSplitLimit = 100; // setup the small file handling params HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, true); // hold upto 200 records max - dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath}); + dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); HoodieWriteClient client = getHoodieWriteClient(config, false); diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java index 75ce74a76df72..e6f8d15e56c11 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java @@ -315,6 +315,24 @@ public List generateUpdates(String commitTime, List return updates; } + public List generateUpdatesWithDiffPartition(String commitTime, List baseRecords) + throws IOException { + List updates = new ArrayList<>(); + for (HoodieRecord baseRecord : baseRecords) { + String partition = baseRecord.getPartitionPath(); + String newPartition = ""; + if (partitionPaths[0].equalsIgnoreCase(partition)) { + newPartition = partitionPaths[1]; + } else { + newPartition = partitionPaths[0]; + } + HoodieKey key = new HoodieKey(baseRecord.getRecordKey(), newPartition); + HoodieRecord record = generateUpdateRecord(key, commitTime); + updates.add(record); + } + return updates; + } + /** * Generates new updates, randomly distributed across the keys above. 
There can be duplicates within the returned * list diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java index ebfb8cfa5a327..1a3128dda6904 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java @@ -22,7 +22,9 @@ import org.apache.hudi.config.HoodieHBaseIndexConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.index.bloom.HoodieBloomIndex; +import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex; import org.apache.hudi.index.hbase.HBaseIndex; import org.junit.After; @@ -62,5 +64,8 @@ public void testCreateIndex() throws Exception { config = clientConfigBuilder.withPath(basePath) .withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); assertTrue(HoodieIndex.createIndex(config, jsc) instanceof HoodieBloomIndex); + config = clientConfigBuilder.withPath(basePath) + .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.GLOBAL_BLOOM).build()).build(); + assertTrue(HoodieIndex.createIndex(config, jsc) instanceof HoodieGlobalBloomIndex); } } diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java index d140c547a29e7..55afb11dc234a 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java @@ -61,7 +61,8 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { private String schemaStr; private Schema schema; - public TestHoodieGlobalBloomIndex() throws Exception {} + public TestHoodieGlobalBloomIndex() throws Exception { + } @Before public void setUp() throws Exception { @@ -171,7 +172,7 @@ public void testExplodeRecordRDDWithFileComparisons() { partitionToFileIndexInfo.put("2017/10/23", Arrays.asList(new BloomIndexFileInfo("f4", "002", "007"), new BloomIndexFileInfo("f5", "009", "010"))); - // the partition partition of the key of the incoming records will be ignored + // the partition of the key of the incoming records will be ignored JavaPairRDD partitionRecordKeyPairRDD = jsc.parallelize(Arrays.asList(new Tuple2<>("2017/10/21", "003"), new Tuple2<>("2017/10/22", "002"), new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/23", "004"))).mapToPair(t -> t); @@ -240,7 +241,7 @@ public void testTagLocation() throws Exception { TestRawTripPayload rowChange5 = new TestRawTripPayload("{\"_row_key\":\"003\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}"); HoodieRecord record5 = - new HoodieRecord(new HoodieKey(rowChange5.getRowKey(), rowChange5.getPartitionPath()), rowChange4); + new HoodieRecord(new HoodieKey(rowChange5.getRowKey(), rowChange5.getPartitionPath()), rowChange5); JavaRDD recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5)); @@ -257,7 +258,6 @@ public void testTagLocation() throws Exception { metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - // Add some commits new File(basePath + "/.hoodie").mkdirs(); @@ -267,12 +267,19 @@ public void testTagLocation() throws Exception { for (HoodieRecord record : taggedRecordRDD.collect()) 
{ if (record.getRecordKey().equals("000")) { assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename0))); + assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange1.getJsonData()); } else if (record.getRecordKey().equals("001")) { assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename2))); + assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange2.getJsonData()); } else if (record.getRecordKey().equals("002")) { assertTrue(!record.isCurrentLocationKnown()); + assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange3.getJsonData()); + } else if (record.getRecordKey().equals("003")) { + assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3))); + assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange5.getJsonData()); } else if (record.getRecordKey().equals("004")) { assertTrue(record.getCurrentLocation().getFileId().equals(FSUtils.getFileId(filename3))); + assertEquals(((TestRawTripPayload) record.getData()).getJsonData(), rowChange4.getJsonData()); } } }
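
Note on the change above: the IndexFileFilter contract now returns (partitionPath, fileId) pairs via getMatchingFilesAndPartition(), and HoodieGlobalBloomIndex#tagLocationBacktoRecords joins on the record key alone, rebuilding the tagged record with the HoodieKey of the already-existing location. What follows is a minimal, dependency-free sketch of that contract change, written against plain JDK types with hypothetical names (FileRange, GlobalIndexFilterSketch); it is not the actual Hudi code, just an illustration of the idea under those assumptions.

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class GlobalIndexFilterSketch {

  // Simplified stand-in for BloomIndexFileInfo: a file id plus the min/max record key it covers.
  static final class FileRange {
    final String fileId;
    final String minKey;
    final String maxKey;

    FileRange(String fileId, String minKey, String maxKey) {
      this.fileId = fileId;
      this.minKey = minKey;
      this.maxKey = maxKey;
    }
  }

  // List-based variant of getMatchingFilesAndPartition: scan every partition's files, keep those
  // whose key range could contain the record key, and pair each match with the partition that
  // owns it -- the information the old getMatchingFiles() signature discarded.
  static Set<Entry<String, String>> getMatchingFilesAndPartition(
      Map<String, List<FileRange>> partitionToFileRanges, String recordKey) {
    Set<Entry<String, String>> matches = new HashSet<>();
    partitionToFileRanges.forEach((partition, files) ->
        files.forEach(f -> {
          boolean inRange = f.minKey.compareTo(recordKey) <= 0 && f.maxKey.compareTo(recordKey) >= 0;
          if (inRange) {
            matches.add(Map.entry(partition, f.fileId)); // (partitionPath, fileId)
          }
        }));
    return matches;
  }

  public static void main(String[] args) {
    Map<String, List<FileRange>> index = new HashMap<>();
    index.put("2017/10/22", List.of(new FileRange("f1", "000", "008")));
    index.put("2017/10/23", List.of(new FileRange("f4", "002", "007")));

    // Key "003" falls inside both ranges, so both (partition, fileId) pairs come back,
    // regardless of which partition path the incoming update was sent to.
    getMatchingFilesAndPartition(index, "003")
        .forEach(p -> System.out.println(p.getKey() + " -> " + p.getValue()));
  }
}

Because each match carries its owning partition, the global index can tag an update arriving under a new partition path back to the partition and file group that already hold the key, which is the behavior exercised by testUpsertToDiffPartitionGlobalIndex and the "003" assertions in testTagLocation above.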