diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java index aeefe6c3043b1..1417e40a9f587 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java @@ -40,7 +40,6 @@ import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.io.HoodieRangeInfoHandle; import org.apache.hudi.table.HoodieTable; - import org.apache.log4j.LogManager; import org.apache.log4j.Logger; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java index c42d80c62e758..cffee5ee74081 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java @@ -20,7 +20,6 @@ package org.apache.hudi.index.bloom; import org.apache.hudi.common.data.HoodieData; -import org.apache.hudi.common.data.HoodieList; import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; @@ -60,7 +59,7 @@ public HoodiePairData findMatchingFilesForRecor HoodieData> fileComparisonPairs, Map> partitionToFileInfo, Map recordsPerPartition) { List> fileComparisonPairList = - HoodieList.getList(fileComparisonPairs).stream() + fileComparisonPairs.collectAsList().stream() .sorted(Comparator.comparing(Pair::getLeft)).collect(toList()); List keyLookupResults = new ArrayList<>(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java index c3584d234a8e5..cbb3b07f4457f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java @@ -83,7 +83,8 @@ protected HoodieRecord computeNext() { Option loc = mapper.getRecordLocation(record.getKey(), record.getPartitionPath()); return HoodieIndexUtils.getTaggedRecord(record, loc); } - } + }, + false ); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index ac75997ea5c92..dce92458601d0 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -20,7 +20,7 @@ import org.apache.hudi.async.AsyncCleanerService; import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; @@ -127,8 +127,7 @@ public List> filterExists(List> hoodieRecords) { // Create a Hoodie table which encapsulated the commits and files visible HoodieFlinkTable table = 
getHoodieTable(); Timer.Context indexTimer = metrics.getIndexCtx(); - List> recordsWithLocation = HoodieList.getList( - getIndex().tagLocation(HoodieList.of(hoodieRecords), context, table)); + List> recordsWithLocation = getIndex().tagLocation(HoodieListData.eager(hoodieRecords), context, table).collectAsList(); metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList()); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java index 62f8d4fa03ad2..76ca606009689 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java @@ -23,7 +23,7 @@ import org.apache.hudi.common.data.HoodieAccumulator; import org.apache.hudi.common.data.HoodieAtomicLongAccumulator; import org.apache.hudi.common.data.HoodieData; -import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.TaskContextSupplier; @@ -84,12 +84,12 @@ public HoodieAccumulator newAccumulator() { @Override public HoodieData emptyHoodieData() { - return HoodieList.of(Collections.emptyList()); + return HoodieListData.eager(Collections.emptyList()); } @Override public HoodieData parallelize(List data, int parallelism) { - return HoodieList.of(data); + return HoodieListData.eager(data); } public RuntimeContext getRuntimeContext() { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java index 66c1b07793ee7..be2273a8409b8 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java @@ -23,7 +23,7 @@ import org.apache.hudi.PublicAPIMethod; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.data.HoodieData; -import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -61,8 +61,8 @@ public abstract List> tagLocation(List> records, public HoodieData> tagLocation( HoodieData> records, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException { - List> hoodieRecords = tagLocation(HoodieList.getList(records.map(record -> (HoodieRecord) record)), context, hoodieTable); - return HoodieList.of(hoodieRecords.stream().map(r -> (HoodieRecord) r).collect(Collectors.toList())); + List> hoodieRecords = tagLocation(records.map(record -> (HoodieRecord) record).collectAsList(), context, hoodieTable); + return HoodieListData.eager(hoodieRecords.stream().map(r -> (HoodieRecord) r).collect(Collectors.toList())); } @Override @@ -70,6 +70,6 @@ public HoodieData> tagLocation( public HoodieData updateLocation( HoodieData writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) 
throws HoodieIndexException { - return HoodieList.of(updateLocation(HoodieList.getList(writeStatuses), context, hoodieTable)); + return HoodieListData.eager(updateLocation(writeStatuses.collectAsList(), context, hoodieTable)); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 222ff78edc9fe..81df5a93615f5 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -21,7 +21,6 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.data.HoodieData; -import org.apache.hudi.common.data.HoodieList; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.HoodieRecord; @@ -106,7 +105,7 @@ protected void commit(String instantTime, Map preppedRecords = prepRecords(partitionRecordsMap); - List preppedRecordList = HoodieList.getList(preppedRecords); + List preppedRecordList = preppedRecords.collectAsList(); try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) { if (canTriggerTableService) { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index 26149918c6549..3431c7334a3bb 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -40,8 +40,6 @@ import java.util.List; -import static org.apache.hudi.common.data.HoodieList.getList; - public abstract class HoodieFlinkTable extends HoodieTable>, List, List> implements ExplicitWriteHandleTable { @@ -78,7 +76,7 @@ public static HoodieFlinkTable create(HoodieW public static HoodieWriteMetadata> convertMetadata( HoodieWriteMetadata> metadata) { - return metadata.clone(getList(metadata.getWriteStatuses())); + return metadata.clone(metadata.getWriteStatuses().collectAsList()); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java index 8dd0c99bae299..9540354e10a74 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java @@ -19,7 +19,7 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieAvroRecord; @@ -97,8 +97,7 @@ public HoodieWriteMetadata> execute(String instantTime, dedupedKeys.stream().map(key -> new HoodieAvroRecord<>(key, new EmptyHoodieRecordPayload())).collect(Collectors.toList()); Instant beginTag = Instant.now(); // perform index look up to get existing 
location of records - List> taggedRecords = HoodieList.getList( - table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context, table)); + List> taggedRecords = table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context, table).collectAsList(); Duration tagLocationDuration = Duration.between(beginTag, Instant.now()); // filter out non existent keys/records diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index 9c17e77b91831..57e5aa9ad50c0 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -19,7 +19,7 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; @@ -83,8 +83,7 @@ public HoodieWriteMetadata> write(String instantTime, List> tag(List> dedupedRecords, HoodieEngineContext context, HoodieTable>, List, List> table) { - return HoodieList.getList( - table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context, table)); + return table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context, table).collectAsList(); } @Override diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java index e23ee4ad58e6e..b62e09eb275f6 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java @@ -18,11 +18,12 @@ package org.apache.hudi.index.bloom; +import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; -import org.apache.hudi.common.data.HoodieList; -import org.apache.hudi.common.data.HoodieMapPair; +import org.apache.hudi.common.data.HoodieListPairData; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; @@ -37,9 +38,6 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.HoodieFlinkClientTestHarness; import org.apache.hudi.testutils.HoodieFlinkWriteableTestTable; - -import org.apache.avro.Schema; -import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -185,8 +183,7 @@ public void testRangePruning(boolean rangePruning, boolean treeFiltering, boolea partitionRecordKeyMap.put(t.getLeft(), recordKeyList); }); - List> comparisonKeyList = HoodieList.getList( - index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieMapPair.of(partitionRecordKeyMap))); + List> comparisonKeyList = index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList(); 
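// Editorial note (not part of the patch): HoodieListPairData.lazy(...) stacks
// transformations on a parallel Stream and defers execution until a terminal
// operation is invoked; the collectAsList() call above is what actually runs
// the file comparisons. Unlike HoodieListData.eager(...), a lazy instance
// cannot be dereferenced by a terminal operation more than once.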
assertEquals(10, comparisonKeyList.size()); java.util.Map> recordKeyToFileComps = comparisonKeyList.stream() diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java index 054a363168d75..dfbb3336b9a3e 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java @@ -21,7 +21,8 @@ import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; @@ -133,7 +134,7 @@ protected void initMetaClient(HoodieTableType tableType) throws IOException { protected List tagLocation( HoodieIndex index, List records, HoodieTable table) { - return HoodieList.getList(index.tagLocation(HoodieList.of(records), context, table)); + return ((HoodieData) index.tagLocation(HoodieListData.eager(records), context, table)).collectAsList(); } /** diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java index fbfb85bab3b8f..b6951bc6b7874 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java @@ -20,7 +20,7 @@ import org.apache.hudi.client.common.HoodieJavaEngineContext; import org.apache.hudi.client.embedded.EmbeddedTimelineService; -import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; @@ -67,8 +67,7 @@ public List> filterExists(List> hoodieRecords) { // Create a Hoodie table which encapsulated the commits and files visible HoodieJavaTable table = HoodieJavaTable.create(config, (HoodieJavaEngineContext) context); Timer.Context indexTimer = metrics.getIndexCtx(); - List> recordsWithLocation = HoodieList.getList( - getIndex().tagLocation(HoodieList.of(hoodieRecords), context, table)); + List> recordsWithLocation = getIndex().tagLocation(HoodieListData.eager(hoodieRecords), context, table).collectAsList(); metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 
0L : indexTimer.stop())); return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList()); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java index adcbb874e84e6..456bb3cb47c0f 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java @@ -25,7 +25,7 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.JavaTaskContextSupplier; import org.apache.hudi.common.data.HoodieData; -import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.ClusteringOperation; import org.apache.hudi.common.model.HoodieAvroRecord; @@ -94,7 +94,7 @@ public HoodieWriteMetadata> performClustering( Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false), instantTime))); HoodieWriteMetadata> writeMetadata = new HoodieWriteMetadata<>(); - writeMetadata.setWriteStatuses(HoodieList.of(writeStatusList)); + writeMetadata.setWriteStatuses(HoodieListData.eager(writeStatusList)); return writeMetadata; } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java index 22d4ccabcdd67..2211c8a1030ae 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java @@ -24,7 +24,7 @@ import org.apache.hudi.common.data.HoodieAccumulator; import org.apache.hudi.common.data.HoodieAtomicLongAccumulator; import org.apache.hudi.common.data.HoodieData; -import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.TaskContextSupplier; @@ -74,12 +74,12 @@ public HoodieAccumulator newAccumulator() { @Override public HoodieData emptyHoodieData() { - return HoodieList.of(Collections.emptyList()); + return HoodieListData.eager(Collections.emptyList()); } @Override public HoodieData parallelize(List data, int parallelism) { - return HoodieList.of(data); + return HoodieListData.eager(data); } @Override diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java index dd64859cad7e5..dcc9d050dcbe5 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java @@ -23,7 +23,7 @@ import org.apache.hudi.PublicAPIMethod; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.data.HoodieData; -import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; import 
org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -58,8 +58,8 @@ public abstract List> tagLocation(List> records, public HoodieData> tagLocation( HoodieData> records, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException { - List> hoodieRecords = tagLocation(HoodieList.getList(records.map(record -> (HoodieRecord) record)), context, hoodieTable); - return HoodieList.of(hoodieRecords.stream().map(r -> (HoodieRecord) r).collect(Collectors.toList())); + List> hoodieRecords = tagLocation(records.map(record -> (HoodieRecord) record).collectAsList(), context, hoodieTable); + return HoodieListData.eager(hoodieRecords.stream().map(r -> (HoodieRecord) r).collect(Collectors.toList())); } @Override @@ -67,6 +67,6 @@ public HoodieData> tagLocation( public HoodieData updateLocation( HoodieData writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException { - return HoodieList.of(updateLocation(HoodieList.getList(writeStatuses), context, hoodieTable)); + return HoodieListData.eager(updateLocation(writeStatuses.collectAsList(), context, hoodieTable)); } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java index f9c7caff6ebbc..3c878cbc14cf8 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java @@ -36,8 +36,6 @@ import java.util.List; -import static org.apache.hudi.common.data.HoodieList.getList; - public abstract class HoodieJavaTable extends HoodieTable>, List, List> { protected HoodieJavaTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { @@ -67,7 +65,7 @@ public static HoodieJavaTable create(HoodieWr public static HoodieWriteMetadata> convertMetadata( HoodieWriteMetadata> metadata) { - return metadata.clone(getList(metadata.getWriteStatuses())); + return metadata.clone(metadata.getWriteStatuses().collectAsList()); } @Override diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java index dc6994d315f02..22c90eb8bb445 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java @@ -19,7 +19,7 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -130,8 +130,7 @@ public HoodieWriteMetadata> execute(List> inpu protected List updateIndex(List writeStatuses, HoodieWriteMetadata> result) { Instant indexStartTime = Instant.now(); // Update the index back - List statuses = HoodieList.getList( - table.getIndex().updateLocation(HoodieList.of(writeStatuses), context, table)); + List statuses = table.getIndex().updateLocation(HoodieListData.eager(writeStatuses), context, table).collectAsList(); 
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now())); result.setWriteStatuses(statuses); return statuses; @@ -339,8 +338,7 @@ public Partitioner getInsertPartitioner(WorkloadProfile profile) { public void updateIndexAndCommitIfNeeded(List writeStatuses, HoodieWriteMetadata result) { Instant indexStartTime = Instant.now(); // Update the index back - List statuses = HoodieList.getList( - table.getIndex().updateLocation(HoodieList.of(writeStatuses), context, table)); + List statuses = table.getIndex().updateLocation(HoodieListData.eager(writeStatuses), context, table).collectAsList(); result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now())); result.setWriteStatuses(statuses); result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result)); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java index f82c1c561b2c5..57d796c925298 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaDeleteHelper.java @@ -19,7 +19,7 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieAvroRecord; @@ -99,8 +99,7 @@ public HoodieWriteMetadata> execute(String instantTime, dedupedKeys.stream().map(key -> new HoodieAvroRecord<>(key, new EmptyHoodieRecordPayload())).collect(Collectors.toList()); Instant beginTag = Instant.now(); // perform index look up to get existing location of records - List> taggedRecords = HoodieList.getList( - table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context, table)); + List> taggedRecords = table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context, table).collectAsList(); Duration tagLocationDuration = Duration.between(beginTag, Instant.now()); // filter out non existent keys/records diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java index 3a1fa4b884fd0..4504a9bdccddf 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java @@ -19,7 +19,7 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; @@ -50,8 +50,7 @@ public static JavaWriteHelper newInstance() { @Override protected List> tag(List> dedupedRecords, HoodieEngineContext context, HoodieTable>, List, List> table) { - return HoodieList.getList( - table.getIndex().tagLocation(HoodieList.of(dedupedRecords), context, table)); + return table.getIndex().tagLocation(HoodieListData.eager(dedupedRecords), context, table).collectAsList(); } 
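Every call site in the hunks above follows the same two-step migration: HoodieList.of(list) becomes HoodieListData.eager(list), and HoodieList.getList(data) becomes data.collectAsList() on the returned HoodieData. A minimal self-contained sketch of the new calling convention (illustrative only; the class name and sample values are not from the patch):

import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;

import java.util.Arrays;
import java.util.List;

// Illustrative sketch: wrap a plain List in an eager HoodieListData, apply
// engine-agnostic transformations, then materialize the result back into a
// List, matching the shape of the tagLocation/updateLocation sites above.
public class EagerListDataSketch {
  public static void main(String[] args) {
    HoodieData<String> keys = HoodieListData.eager(Arrays.asList("a", "b", "b"));
    List<String> upper = keys.map(String::toUpperCase).distinct().collectAsList();
    System.out.println(upper); // prints [A, B] (element order may vary)
  }
}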
@Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java index ddcaaec0fa6ca..9ec3c4cf71592 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java @@ -31,6 +31,7 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.storage.StorageLevel; +import java.util.List; import java.util.Map; import scala.Tuple2; @@ -41,7 +42,7 @@ * @param type of key. * @param type of value. */ -public class HoodieJavaPairRDD extends HoodiePairData { +public class HoodieJavaPairRDD implements HoodiePairData { private final JavaPairRDD pairRDDData; @@ -105,8 +106,13 @@ public Map countByKey() { } @Override - public HoodiePairData reduceByKey(SerializableBiFunction func, int parallelism) { - return HoodieJavaPairRDD.of(pairRDDData.reduceByKey(func::apply, parallelism)); + public HoodiePairData> groupByKey() { + return new HoodieJavaPairRDD<>(pairRDDData.groupByKey()); + } + + @Override + public HoodiePairData reduceByKey(SerializableBiFunction combiner, int parallelism) { + return HoodieJavaPairRDD.of(pairRDDData.reduceByKey(combiner::apply, parallelism)); } @Override @@ -130,4 +136,9 @@ public HoodiePairData>> leftOuterJoin(HoodiePairData new Tuple2<>(tuple._1, new ImmutablePair<>(tuple._2._1, Option.ofNullable(tuple._2._2.orElse(null))))))); } + + @Override + public List> collectAsList() { + return pairRDDData.map(t -> Pair.of(t._1, t._2)).collect(); + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java index 0843dfc3c9920..3964fa2d6bfbf 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.function.SerializablePairFunction; import org.apache.hudi.common.util.collection.Pair; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.storage.StorageLevel; @@ -39,7 +40,7 @@ * * @param type of object. */ -public class HoodieJavaRDD extends HoodieData { +public class HoodieJavaRDD implements HoodieData { private final JavaRDD rddData; @@ -74,17 +75,16 @@ public static HoodieJavaRDD of( * @return the a {@link JavaRDD} of objects in type T. 
*/ public static JavaRDD getJavaRDD(HoodieData hoodieData) { - return ((HoodieJavaRDD) hoodieData).get(); + return ((HoodieJavaRDD) hoodieData).rddData; } - @Override - public JavaRDD get() { - return rddData; + public static JavaPairRDD getJavaRDD(HoodiePairData hoodieData) { + return ((HoodieJavaPairRDD) hoodieData).get(); } @Override - public void persist(String cacheConfig) { - rddData.persist(StorageLevel.fromString(cacheConfig)); + public void persist(String level) { + rddData.persist(StorageLevel.fromString(level)); } @Override @@ -112,20 +112,15 @@ public HoodieData mapPartitions(SerializableFunction, Iterato return HoodieJavaRDD.of(rddData.mapPartitions(func::apply, preservesPartitioning)); } - @Override - public HoodieData mapPartitions(SerializableFunction, Iterator> func) { - return HoodieJavaRDD.of(rddData.mapPartitions(func::apply)); - } - @Override public HoodieData flatMap(SerializableFunction> func) { return HoodieJavaRDD.of(rddData.flatMap(e -> func.apply(e))); } @Override - public HoodiePairData mapToPair(SerializablePairFunction mapToPairFunc) { + public HoodiePairData mapToPair(SerializablePairFunction func) { return HoodieJavaPairRDD.of(rddData.mapToPair(input -> { - Pair pair = mapToPairFunc.call(input); + Pair pair = func.call(input); return new Tuple2<>(pair.getLeft(), pair.getRight()); })); } @@ -140,13 +135,6 @@ public HoodieData distinct(int parallelism) { return HoodieJavaRDD.of(rddData.distinct(parallelism)); } - @Override - public HoodieData distinctWithKey(SerializableFunction keyGetter, int parallelism) { - return mapToPair(i -> Pair.of(keyGetter.apply(i), i)) - .reduceByKey((value1, value2) -> value1, parallelism) - .values(); - } - @Override public HoodieData filter(SerializableFunction filterFunc) { return HoodieJavaRDD.of(rddData.filter(filterFunc::apply)); @@ -154,7 +142,7 @@ public HoodieData filter(SerializableFunction filterFunc) { @Override public HoodieData union(HoodieData other) { - return HoodieJavaRDD.of(rddData.union((JavaRDD) other.get())); + return HoodieJavaRDD.of(rddData.union(((HoodieJavaRDD) other).rddData)); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java new file mode 100644 index 0000000000000..398762dc63070 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.common.data; + +import org.apache.hudi.common.util.Either; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public abstract class HoodieBaseListData { + + protected final Either, List> data; + protected final boolean lazy; + + protected HoodieBaseListData(List data, boolean lazy) { + this.data = lazy ? Either.left(data.stream().parallel()) : Either.right(data); + this.lazy = lazy; + } + + protected HoodieBaseListData(Stream dataStream, boolean lazy) { + // NOTE: In case this container is being instantiated by an eager parent, we have to + // pre-materialize the stream + this.data = lazy ? Either.left(dataStream) : Either.right(dataStream.collect(Collectors.toList())); + this.lazy = lazy; + } + + protected Stream asStream() { + return lazy ? data.asLeft() : data.asRight().parallelStream(); + } + + protected boolean isEmpty() { + if (lazy) { + return data.asLeft().findAny().isPresent(); + } else { + return data.asRight().isEmpty(); + } + } + + protected long count() { + if (lazy) { + return data.asLeft().count(); + } else { + return data.asRight().size(); + } + } + + protected List collectAsList() { + if (lazy) { + return data.asLeft().collect(Collectors.toList()); + } else { + return data.asRight(); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java index 4b391ecbab752..2d24e7dd12966 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java @@ -21,108 +21,163 @@ import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.function.SerializablePairFunction; +import org.apache.hudi.common.util.collection.Pair; import java.io.Serializable; import java.util.Iterator; import java.util.List; /** - * An abstraction for a data collection of objects in type T to store the reference - * and do transformation. + * An interface abstracting a container holding a collection of objects of type {@code T} + * allowing to perform common transformation on it. * - * @param type of object. + * This abstraction provides common API implemented by + *
<ol> + *   <li>In-memory implementation ({@code HoodieListData}, {@code HoodieListPairData}), where all objects + *   are held in-memory by the executing process</li> + *   <li>RDD-based implementation ({@code HoodieJavaRDD}, etc), where the underlying collection is held + *   by an RDD, allowing transformations to be executed by the Spark engine on the cluster</li> + * </ol> + * + * All implementations provide consistent semantics, where + * <ul> + *   <li>All non-terminal operations are executed lazily (for ex, {@code map}, {@code filter}, etc)</li> + *   <li>All terminal operations are executed eagerly, executing all previously accumulated transformations. + *   Note that the collection cannot be re-used after invoking a terminal operation on it</li> + * </ul>
+ * + * @param type of object */ -public abstract class HoodieData implements Serializable { +public interface HoodieData extends Serializable { + /** - * @return the collection of objects. + * Persists the data w/ provided {@code level} (if applicable) */ - public abstract Object get(); + void persist(String level); /** - * Caches the data. - * - * @param cacheConfig config value for caching. + * Un-persists the data (if previously persisted) */ - public abstract void persist(String cacheConfig); + void unpersist(); /** - * Removes the cached data. + * Returns whether the collection is empty. */ - public abstract void unpersist(); + boolean isEmpty(); /** - * @return whether the collection is empty. + * Returns number of objects held in the collection + * + * NOTE: This is a terminal operation */ - public abstract boolean isEmpty(); + long count(); /** - * @return the number of objects. + * Maps every element in the collection using provided mapping {@code func}. + * + * This is an intermediate operation + * + * @param func serializable map function + * @param output object type + * @return {@link HoodieData} holding mapped elements */ - public abstract long count(); + HoodieData map(SerializableFunction func); /** - * @param func serializable map function. - * @param output object type. - * @return {@link HoodieData} containing the result. Actual execution may be deferred. + * Maps every element in the collection's partition (if applicable) by applying provided + * mapping {@code func} to every collection's partition + * + * This is an intermediate operation + * + * @param func serializable map function accepting {@link Iterator} of a single + * partition's elements and returning a new {@link Iterator} mapping + * every element of the partition into a new one + * @param preservesPartitioning whether to preserve partitioning in the resulting collection + * @param output object type + * @return {@link HoodieData} holding mapped elements */ - public abstract HoodieData map(SerializableFunction func); + HoodieData mapPartitions(SerializableFunction, + Iterator> func, boolean preservesPartitioning); /** - * @param func serializable map function by taking a partition of objects - * and generating an iterator. - * @param preservesPartitioning whether to preserve partitions in the result. - * @param output object type. - * @return {@link HoodieData} containing the result. Actual execution may be deferred. + * Maps every element in the collection into a collection of the new elements (provided by + * {@link Iterator}) using provided mapping {@code func}, subsequently flattening the result + * (by concatenating) into a single collection + * + * This is an intermediate operation + * + * @param func serializable function mapping every element {@link T} into {@code Iterator} + * @param output object type + * @return {@link HoodieData} holding mapped elements */ - public abstract HoodieData mapPartitions( - SerializableFunction, Iterator> func, boolean preservesPartitioning); + HoodieData flatMap(SerializableFunction> func); /** - * @param func serializable map function by taking a partition of objects - * and generating an iterator. - * @param output object type. - * @return {@link HoodieData} containing the result. Actual execution may be deferred. + * Maps every element in the collection using provided mapping {@code func} into a {@link Pair} + * of elements {@code K} and {@code V} + *
+ * This is an intermediate operation + * + * @param func serializable map function + * @param key type of the pair + * @param value type of the pair + * @return {@link HoodiePairData} holding mapped elements */ - public abstract HoodieData mapPartitions( - SerializableFunction, Iterator> func); + HoodiePairData mapToPair(SerializablePairFunction func); /** - * @param func serializable flatmap function. - * @param output object type. - * @return {@link HoodieData} containing the result. Actual execution may be deferred. + * Returns new {@link HoodieData} collection holding only distinct objects of the original one + * + * This is a stateful intermediate operation */ - public abstract HoodieData flatMap(SerializableFunction> func); + HoodieData distinct(); /** - * @param mapToPairFunc serializable map function to generate a pair. - * @param key type of the pair. - * @param value type of the pair. - * @return {@link HoodiePairData} containing the result. Actual execution may be deferred. + * Returns new {@link HoodieData} collection holding only distinct objects of the original one + * + * This is a stateful intermediate operation */ - public abstract HoodiePairData mapToPair(SerializablePairFunction mapToPairFunc); + HoodieData distinct(int parallelism); /** - * @return distinct objects in {@link HoodieData}. + * Returns new instance of {@link HoodieData} collection only containing elements matching provided + * {@code filterFunc} (ie ones it returns true on) + * + * @param filterFunc filtering func either accepting or rejecting the elements + * @return {@link HoodieData} holding filtered elements */ - public abstract HoodieData distinct(); + HoodieData filter(SerializableFunction filterFunc); - public abstract HoodieData distinct(int parallelism); - - public abstract HoodieData distinctWithKey(SerializableFunction keyGetter, int parallelism); - - public abstract HoodieData filter(SerializableFunction filterFunc); + /** + * Unions {@link HoodieData} with another instance of {@link HoodieData}. + * Note that, it's only able to union same underlying collection implementations. + * + * This is a stateful intermediate operation + * + * @param other {@link HoodieData} collection + * @return {@link HoodieData} holding superset of elements of this and {@code other} collections + */ + HoodieData union(HoodieData other); /** - * Unions this {@link HoodieData} with other {@link HoodieData}. - * @param other {@link HoodieData} of interest. - * @return the union of two as as instance of {@link HoodieData}. + * Collects results of the underlying collection into a {@link List} + * + * This is a terminal operation */ - public abstract HoodieData union(HoodieData other); + List collectAsList(); /** - * @return collected results in {@link List}. 
+ * Re-partitions underlying collection (if applicable) making sure new {@link HoodieData} has + * exactly {@code parallelism} partitions + * + * @param parallelism target number of partitions in the underlying collection + * @return {@link HoodieData} holding re-partitioned collection */ - public abstract List collectAsList(); + HoodieData repartition(int parallelism); - public abstract HoodieData repartition(int parallelism); + default HoodieData distinctWithKey(SerializableFunction keyGetter, int parallelism) { + return mapToPair(i -> Pair.of(keyGetter.apply(i), i)) + .reduceByKey((value1, value2) -> value1, parallelism) + .values(); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java deleted file mode 100644 index 28ed2e282deb5..0000000000000 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hudi.common.data; - -import org.apache.hudi.common.function.SerializableFunction; -import org.apache.hudi.common.function.SerializablePairFunction; -import org.apache.hudi.common.util.collection.Pair; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper; -import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper; - -/** - * Holds a {@link List} of objects. - * - * @param type of object. - */ -public class HoodieList extends HoodieData { - - private final List listData; - - private HoodieList(List listData) { - this.listData = listData; - } - - /** - * @param listData a {@link List} of objects in type T. - * @param type of object. - * @return a new instance containing the {@link List} reference. - */ - public static HoodieList of(List listData) { - return new HoodieList<>(listData); - } - - /** - * @param hoodieData {@link HoodieList } instance containing the {@link List} of objects. - * @param type of object. - * @return the a {@link List} of objects in type T. 
- */ - public static List getList(HoodieData hoodieData) { - return ((HoodieList) hoodieData).get(); - } - - @Override - public List get() { - return listData; - } - - @Override - public void persist(String cacheConfig) { - // No OP - } - - @Override - public void unpersist() { - // No OP - } - - @Override - public boolean isEmpty() { - return listData.isEmpty(); - } - - @Override - public long count() { - return listData.size(); - } - - @Override - public HoodieData map(SerializableFunction func) { - return HoodieList.of(listData.stream().parallel() - .map(throwingMapWrapper(func)).collect(Collectors.toList())); - } - - @Override - public HoodieData mapPartitions(SerializableFunction, Iterator> func, boolean preservesPartitioning) { - return mapPartitions(func); - } - - @Override - public HoodieData mapPartitions(SerializableFunction, Iterator> func) { - List result = new ArrayList<>(); - throwingMapWrapper(func).apply(listData.iterator()).forEachRemaining(result::add); - return HoodieList.of(result); - } - - @Override - public HoodieData flatMap(SerializableFunction> func) { - Function> throwableFunc = throwingMapWrapper(func); - return HoodieList.of(listData.stream().flatMap(e -> { - List result = new ArrayList<>(); - Iterator iterator = throwableFunc.apply(e); - iterator.forEachRemaining(result::add); - return result.stream(); - }).collect(Collectors.toList())); - } - - @Override - public HoodiePairData mapToPair(SerializablePairFunction mapToPairFunc) { - Map> mapOfPairs = new HashMap<>(); - Function> throwableMapToPairFunc = throwingMapToPairWrapper(mapToPairFunc); - listData.forEach(data -> { - Pair pair = throwableMapToPairFunc.apply(data); - List list = mapOfPairs.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()); - list.add(pair.getValue()); - }); - return HoodieMapPair.of(mapOfPairs); - } - - @Override - public HoodieData distinct() { - return HoodieList.of(new ArrayList<>(new HashSet<>(listData))); - } - - @Override - public HoodieData distinct(int parallelism) { - return distinct(); - } - - @Override - public HoodieData distinctWithKey(SerializableFunction keyGetter, int parallelism) { - return mapToPair(i -> Pair.of(keyGetter.apply(i), i)) - .reduceByKey((value1, value2) -> value1, parallelism) - .values(); - } - - @Override - public HoodieData filter(SerializableFunction filterFunc) { - return HoodieList.of(listData - .stream() - .filter(i -> throwingMapWrapper(filterFunc).apply(i)) - .collect(Collectors.toList())); - } - - @Override - public HoodieData union(HoodieData other) { - List unionResult = new ArrayList<>(); - unionResult.addAll(listData); - unionResult.addAll(other.collectAsList()); - return HoodieList.of(unionResult); - } - - @Override - public List collectAsList() { - return listData; - } - - @Override - public HoodieData repartition(int parallelism) { - // no op - return this; - } -} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java new file mode 100644 index 0000000000000..0be9ec9fa73b3 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.data; + +import org.apache.hudi.common.function.SerializableFunction; +import org.apache.hudi.common.function.SerializablePairFunction; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; + +import java.util.Iterator; +import java.util.List; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.function.Function; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper; +import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper; + +/** + * In-memory implementation of {@link HoodieData} holding internally a {@link Stream} of objects. + * + * {@link HoodieListData} can have either of the 2 execution semantics: + * + *
<ol> + *   <li>Eager: with every operation being executed right away</li> + *   <li>Lazy: with every operation being "stacked up", with its execution postponed until + *   a "terminal" operation is invoked</li> + * </ol>
+ * + * NOTE: This is an in-memory counterpart for {@code HoodieJavaRDD}, and it strives to provide + * similar semantic as RDD container -- all intermediate (non-terminal, not de-referencing + * the stream like "collect", "groupBy", etc) operations are executed *lazily*. + * This allows to make sure that compute/memory churn is minimal since only necessary + * computations will ultimately be performed. + * + * Please note, however, that while RDD container allows the same collection to be + * de-referenced more than once (ie terminal operation invoked more than once), + * {@link HoodieListData} allows that only when instantiated w/ an eager execution semantic. + * + * @param type of object. + */ +public class HoodieListData extends HoodieBaseListData implements HoodieData { + + private HoodieListData(List data, boolean lazy) { + super(data, lazy); + } + + HoodieListData(Stream dataStream, boolean lazy) { + super(dataStream, lazy); + } + + /** + * Creates instance of {@link HoodieListData} bearing *eager* execution semantic + * + * @param listData a {@link List} of objects in type T + * @param type of object + * @return a new instance containing the {@link List} reference + */ + public static HoodieListData eager(List listData) { + return new HoodieListData<>(listData, false); + } + + /** + * Creates instance of {@link HoodieListData} bearing *lazy* execution semantic + * + * @param listData a {@link List} of objects in type T + * @param type of object + * @return a new instance containing the {@link List} reference + */ + public static HoodieListData lazy(List listData) { + return new HoodieListData<>(listData, true); + } + + @Override + public void persist(String level) { + // No OP + } + + @Override + public void unpersist() { + // No OP + } + + @Override + public HoodieData map(SerializableFunction func) { + return new HoodieListData<>(asStream().map(throwingMapWrapper(func)), lazy); + } + + @Override + public HoodieData mapPartitions(SerializableFunction, Iterator> func, boolean preservesPartitioning) { + Function, Iterator> mapper = throwingMapWrapper(func); + return new HoodieListData<>( + StreamSupport.stream( + Spliterators.spliteratorUnknownSize( + mapper.apply(asStream().iterator()), Spliterator.ORDERED), true), + lazy + ); + } + + @Override + public HoodieData flatMap(SerializableFunction> func) { + Function> mapper = throwingMapWrapper(func); + Stream mappedStream = asStream().flatMap(e -> + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(mapper.apply(e), Spliterator.ORDERED), true)); + return new HoodieListData<>(mappedStream, lazy); + } + + @Override + public HoodiePairData mapToPair(SerializablePairFunction func) { + Function> throwableMapToPairFunc = throwingMapToPairWrapper(func); + return new HoodieListPairData<>(asStream().map(throwableMapToPairFunc), lazy); + } + + @Override + public HoodieData distinct() { + return new HoodieListData<>(asStream().distinct(), lazy); + } + + @Override + public HoodieData distinct(int parallelism) { + return distinct(); + } + + @Override + public HoodieData distinctWithKey(SerializableFunction keyGetter, int parallelism) { + return mapToPair(i -> Pair.of(keyGetter.apply(i), i)) + .reduceByKey((value1, value2) -> value1, parallelism) + .values(); + } + + @Override + public HoodieData filter(SerializableFunction filterFunc) { + return new HoodieListData<>(asStream().filter(r -> throwingMapWrapper(filterFunc).apply(r)), lazy); + } + + @Override + public HoodieData union(HoodieData other) { + ValidationUtils.checkArgument(other 
instanceof HoodieListData); + return new HoodieListData<>(Stream.concat(asStream(), ((HoodieListData)other).asStream()), lazy); + } + + @Override + public HoodieData repartition(int parallelism) { + // no op + return this; + } + + @Override + public boolean isEmpty() { + return super.isEmpty(); + } + + @Override + public long count() { + return super.count(); + } + + @Override + public List collectAsList() { + return super.collectAsList(); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java new file mode 100644 index 0000000000000..a389649548e98 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.data; + +import org.apache.hudi.common.function.SerializableBiFunction; +import org.apache.hudi.common.function.SerializableFunction; +import org.apache.hudi.common.function.SerializablePairFunction; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper; +import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper; + +/** + * In-memory implementation of {@link HoodiePairData} holding internally a {@link Stream} of {@link Pair}s. + * + * {@link HoodieListData} can have either of the 2 execution semantics: + * + *
+ * <ol>
+ *   <li>Eager: with every operation being executed right away</li>
+ *   <li>Lazy: with every operation being "stacked up", with its execution postponed until
+ *   "terminal" operation is invoked</li>
+ * </ol>
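+ *
+ * <p>A usage sketch of the two semantics (values below are illustrative only):
+ * <pre>{@code
+ *   // Lazy: operations are stacked up; the backing stream may be consumed
+ *   // by a terminal operation only once
+ *   HoodieListPairData<String, Integer> lazyPairs =
+ *       HoodieListPairData.lazy(Arrays.asList(Pair.of("a", 1), Pair.of("a", 2)));
+ *   Map<String, Long> counts = lazyPairs.countByKey(); // terminal operation
+ *
+ *   // Eager: holds a materialized list and may be de-referenced repeatedly
+ *   HoodieListPairData<String, Integer> eagerPairs =
+ *       HoodieListPairData.eager(Arrays.asList(Pair.of("a", 1), Pair.of("a", 2)));
+ *   long total = eagerPairs.count();
+ *   List<Pair<String, Integer>> all = eagerPairs.collectAsList();
+ * }</pre>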
+ * + * + * NOTE: This is an in-memory counterpart for {@code HoodieJavaPairRDD}, and it strives to provide + * similar semantic as RDD container -- all intermediate (non-terminal, not de-referencing + * the stream like "collect", "groupBy", etc) operations are executed *lazily*. + * This allows to make sure that compute/memory churn is minimal since only necessary + * computations will ultimately be performed. + * + * Please note, however, that while RDD container allows the same collection to be + * de-referenced more than once (ie terminal operation invoked more than once), + * {@link HoodieListData} allows that only when instantiated w/ an eager execution semantic. + * + * @param type of the key in the pair + * @param type of the value in the pair + */ +public class HoodieListPairData extends HoodieBaseListData> implements HoodiePairData { + + private HoodieListPairData(List> data, boolean lazy) { + super(data, lazy); + } + + HoodieListPairData(Stream> dataStream, boolean lazy) { + super(dataStream, lazy); + } + + @Override + public List> get() { + return collectAsList(); + } + + @Override + public void persist(String cacheConfig) { + // no-op + } + + @Override + public void unpersist() { + // no-op + } + + @Override + public HoodieData keys() { + return new HoodieListData<>(asStream().map(Pair::getKey), lazy); + } + + @Override + public HoodieData values() { + return new HoodieListData<>(asStream().map(Pair::getValue), lazy); + } + + @Override + public Map countByKey() { + return asStream().collect(Collectors.groupingBy(Pair::getKey, Collectors.counting())); + } + + @Override + public HoodiePairData> groupByKey() { + Collector, ?, List> mappingCollector = Collectors.mapping(Pair::getValue, Collectors.toList()); + Collector, ?, Map>> groupingCollector = + Collectors.groupingBy(Pair::getKey, mappingCollector); + + Map> groupedByKey = asStream().collect(groupingCollector); + return new HoodieListPairData<>( + groupedByKey.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())), + lazy + ); + } + + @Override + public HoodiePairData reduceByKey(SerializableBiFunction combiner, int parallelism) { + Map> reducedMap = asStream().collect( + Collectors.groupingBy( + Pair::getKey, + HashMap::new, + Collectors.mapping(Pair::getValue, Collectors.reducing(combiner::apply)))); + + return new HoodieListPairData<>( + reducedMap.entrySet() + .stream() + .map(e -> Pair.of(e.getKey(), e.getValue().orElse(null))), + lazy + ); + } + + @Override + public HoodieData map(SerializableFunction, O> func) { + Function, O> uncheckedMapper = throwingMapWrapper(func); + return new HoodieListData<>(asStream().map(uncheckedMapper), lazy); + } + + @Override + public HoodiePairData mapToPair(SerializablePairFunction, L, W> mapToPairFunc) { + return new HoodieListPairData<>(asStream().map(p -> throwingMapToPairWrapper(mapToPairFunc).apply(p)), lazy); + } + + @Override + public HoodiePairData>> leftOuterJoin(HoodiePairData other) { + ValidationUtils.checkArgument(other instanceof HoodieListPairData); + + // Transform right-side container to a multi-map of [[K]] to [[List]] values + HashMap> rightStreamMap = ((HoodieListPairData) other).asStream().collect( + Collectors.groupingBy( + Pair::getKey, + HashMap::new, + Collectors.mapping(Pair::getValue, Collectors.toList()))); + + Stream>>> leftOuterJoined = asStream().flatMap(pair -> { + K key = pair.getKey(); + V leftValue = pair.getValue(); + List rightValues = rightStreamMap.get(key); + + if (rightValues == null) { + return Stream.of(Pair.of(key, Pair.of(leftValue, 
Option.empty()))); + } else { + return rightValues.stream().map(rightValue -> + Pair.of(key, Pair.of(leftValue, Option.of(rightValue)))); + } + }); + + return new HoodieListPairData<>(leftOuterJoined, lazy); + } + + @Override + public long count() { + return super.count(); + } + + @Override + public List> collectAsList() { + return super.collectAsList(); + } + + public static HoodieListPairData lazy(List> data) { + return new HoodieListPairData<>(data, true); + } + + public static HoodieListPairData eager(List> data) { + return new HoodieListPairData<>(data, false); + } + + public static HoodieListPairData lazy(Map> data) { + return new HoodieListPairData<>(explode(data), true); + } + + public static HoodieListPairData eager(Map> data) { + return new HoodieListPairData<>(explode(data), false); + } + + private static Stream> explode(Map> data) { + return data.entrySet().stream() + .flatMap(e -> e.getValue().stream().map(v -> Pair.of(e.getKey(), v))); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java deleted file mode 100644 index 1e125c90a190b..0000000000000 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hudi.common.data; - -import org.apache.hudi.common.function.FunctionWrapper; -import org.apache.hudi.common.function.SerializableBiFunction; -import org.apache.hudi.common.function.SerializableFunction; -import org.apache.hudi.common.function.SerializablePairFunction; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.ImmutablePair; -import org.apache.hudi.common.util.collection.Pair; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper; - -/** - * Implementation of {@link HoodiePairData} using Java {@link Map}. - * The pairs are organized by the key in the Map and values for the same key - * are stored in a list as the value corresponding to the key in the Map. - * - * @param type of key. - * @param type of value. - */ -public class HoodieMapPair extends HoodiePairData { - - private final Map> mapPairData; - - private HoodieMapPair(Map> mapPairData) { - this.mapPairData = mapPairData; - } - - /** - * @param mapPairData a {@link Map} of pairs. - * @param type of key. - * @param type of value. - * @return a new instance containing the {@link Map>} reference. 
- */ - public static HoodieMapPair of(Map> mapPairData) { - return new HoodieMapPair<>(mapPairData); - } - - /** - * @param hoodiePairData {@link HoodieMapPair } instance containing the {@link Map} of pairs. - * @param type of key. - * @param type of value. - * @return the {@link Map} of pairs. - */ - public static Map> getMapPair(HoodiePairData hoodiePairData) { - return ((HoodieMapPair) hoodiePairData).get(); - } - - @Override - public Map> get() { - return mapPairData; - } - - @Override - public void persist(String cacheConfig) { - // No OP - } - - @Override - public void unpersist() { - // No OP - } - - @Override - public HoodieData keys() { - return HoodieList.of(new ArrayList<>(mapPairData.keySet())); - } - - @Override - public HoodieData values() { - return HoodieList.of( - mapPairData.values().stream().flatMap(List::stream).collect(Collectors.toList())); - } - - @Override - public long count() { - return mapPairData.values().stream().map( - list -> (long) list.size()).reduce(Long::sum).orElse(0L); - } - - @Override - public Map countByKey() { - return mapPairData.entrySet().stream().collect( - Collectors.toMap(Map.Entry::getKey, entry -> (long) entry.getValue().size())); - } - - @Override - public HoodiePairData reduceByKey(SerializableBiFunction func, int parallelism) { - return HoodieMapPair.of(mapPairData.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> { - Option reducedValue = Option.fromJavaOptional(e.getValue().stream().reduce(func::apply)); - return reducedValue.isPresent() ? Collections.singletonList(reducedValue.get()) : Collections.emptyList(); - }))); - } - - @Override - public HoodieData map(SerializableFunction, O> func) { - Function, O> throwableFunc = throwingMapWrapper(func); - return HoodieList.of( - streamAllPairs().map(throwableFunc).collect(Collectors.toList())); - } - - @Override - public HoodiePairData mapToPair(SerializablePairFunction, L, W> mapToPairFunc) { - Map> newMap = new HashMap<>(); - Function, Pair> throwableMapToPairFunc = - FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc); - streamAllPairs().map(pair -> throwableMapToPairFunc.apply(pair)).forEach(newPair -> { - List list = newMap.computeIfAbsent(newPair.getKey(), k -> new ArrayList<>()); - list.add(newPair.getValue()); - }); - return HoodieMapPair.of(newMap); - } - - @Override - public HoodiePairData>> leftOuterJoin(HoodiePairData other) { - Map> otherMapPairData = HoodieMapPair.getMapPair(other); - Stream>>>> pairs = streamAllPairs() - .map(pair -> new ImmutablePair<>(pair.getKey(), new ImmutablePair<>( - pair.getValue(), Option.ofNullable(otherMapPairData.get(pair.getKey()))))); - Map>>> resultMap = new HashMap<>(); - pairs.forEach(pair -> { - K key = pair.getKey(); - ImmutablePair>> valuePair = pair.getValue(); - List>> resultList = resultMap.computeIfAbsent(key, k -> new ArrayList<>()); - if (!valuePair.getRight().isPresent()) { - resultList.add(new ImmutablePair<>(valuePair.getLeft(), Option.empty())); - } else { - resultList.addAll(valuePair.getRight().get().stream().map( - w -> new ImmutablePair<>(valuePair.getLeft(), Option.of(w))).collect(Collectors.toList())); - } - }); - return HoodieMapPair.of(resultMap); - } - - private Stream> streamAllPairs() { - return mapPairData.entrySet().stream().flatMap( - entry -> entry.getValue().stream().map(e -> new ImmutablePair<>(entry.getKey(), e))); - } -} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java index 
9ff52793d6f0e..49fa7174da9a6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.util.collection.Pair; import java.io.Serializable; +import java.util.List; import java.util.Map; /** @@ -35,71 +36,92 @@ * @param type of key. * @param type of value. */ -public abstract class HoodiePairData implements Serializable { +public interface HoodiePairData extends Serializable { /** * @return the collection of pairs. */ - public abstract Object get(); + Object get(); /** - * Caches the data. + * Persists the data (if applicable) * * @param cacheConfig config value for caching. */ - public abstract void persist(String cacheConfig); + void persist(String cacheConfig); /** - * Removes the cached data. + * Un-persists the data (if applicable) */ - public abstract void unpersist(); + void unpersist(); /** - * @return all keys in {@link HoodieData}. + * Returns a {@link HoodieData} holding the key from every corresponding pair */ - public abstract HoodieData keys(); + HoodieData keys(); /** - * @return all values in {@link HoodieData}. + * Returns a {@link HoodieData} holding the value from every corresponding pair */ - public abstract HoodieData values(); + HoodieData values(); /** - * @return the number of pairs. + * Returns number of held pairs */ - public abstract long count(); + long count(); /** - * @return the number of pairs per key in a {@link Map}. + * Counts the number of pairs grouping them by key */ - public abstract Map countByKey(); + Map countByKey(); - public abstract HoodiePairData reduceByKey(SerializableBiFunction func, int parallelism); + /** + * Groups the values for each key in the dataset into a single sequence + */ + HoodiePairData> groupByKey(); + + /** + * Reduces original sequence by de-duplicating the pairs w/ the same key, using provided + * binary operator {@code combiner}. Returns an instance of {@link HoodiePairData} holding + * the "de-duplicated" pairs, ie only pairs with unique keys. + * + * @param combiner method to combine values of the pairs with the same key + * @param parallelism target parallelism (if applicable) + */ + HoodiePairData reduceByKey(SerializableBiFunction combiner, int parallelism); /** * @param func serializable map function. * @param output object type. * @return {@link HoodieData} containing the result. Actual execution may be deferred. */ - public abstract HoodieData map(SerializableFunction, O> func); + HoodieData map(SerializableFunction, O> func); /** * @param mapToPairFunc serializable map function to generate another pair. * @param new key type. * @param new value type. - * @return {@link HoodiePairData} containing the result. Actual execution may be deferred. + * @return containing the result. Actual execution may be deferred. */ - public abstract HoodiePairData mapToPair( + HoodiePairData mapToPair( SerializablePairFunction, L, W> mapToPairFunc); /** - * Performs a left outer join of this and other. For each element (k, v) in this, - * the resulting HoodiePairData will either contain all pairs (k, (v, Some(w))) for w in other, - * or the pair (k, (v, None)) if no elements in other have key k. + * Performs a left outer join of this dataset against {@code other}. 
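+ *
+ * <p>For instance (values illustrative): joining {@code [(k1, v1)]} against
+ * {@code [(k1, w1), (k1, w2)]} yields {@code [(k1, (v1, Some(w1))), (k1, (v1, Some(w2)))]},
+ * while joining it against an empty dataset yields {@code [(k1, (v1, None))]}.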
+ * + * For each element (k, v) in this, the resulting {@link HoodiePairData} will either contain all + * pairs {@code (k, (v, Some(w)))} for every {@code w} in the {@code other}, or the pair {@code (k, (v, None))} + * if no elements in {@code other} have the pair w/ a key {@code k} * * @param other the other {@link HoodiePairData} * @param value type of the other {@link HoodiePairData} - * @return {@link HoodiePairData>>} containing the left outer join result. - * Actual execution may be deferred. + * @return containing the result of the left outer join + */ + HoodiePairData>> leftOuterJoin(HoodiePairData other); + + /** + * Collects results of the underlying collection into a {@link List>} + * + * This is a terminal operation */ - public abstract HoodiePairData>> leftOuterJoin(HoodiePairData other); + List> collectAsList(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java index c99430e284db9..5d7d193dc6b8f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java @@ -24,7 +24,7 @@ import org.apache.hudi.common.data.HoodieAccumulator; import org.apache.hudi.common.data.HoodieAtomicLongAccumulator; import org.apache.hudi.common.data.HoodieData; -import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.function.SerializableBiFunction; import org.apache.hudi.common.function.SerializableConsumer; import org.apache.hudi.common.function.SerializableFunction; @@ -71,12 +71,12 @@ public HoodieAccumulator newAccumulator() { @Override public HoodieData emptyHoodieData() { - return HoodieList.of(Collections.emptyList()); + return HoodieListData.eager(Collections.emptyList()); } @Override public HoodieData parallelize(List data, int parallelism) { - return HoodieList.of(data); + return HoodieListData.eager(data); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java index 9040a04d5edcf..bb1ef72beae3d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java @@ -34,6 +34,7 @@ import java.util.Set; import java.util.Spliterator; import java.util.Spliterators; +import java.util.function.BiFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -107,6 +108,19 @@ public static HashMap combine(Map one, Map another) { return combined; } + /** + * Combines provided {@link Map}s into one, returning new instance of {@link HashMap}. 
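+ *
+ * <p>For example (values illustrative):
+ * <pre>{@code
+ *   Map<String, Integer> first = Collections.singletonMap("k", 1);
+ *   Map<String, Integer> second = Collections.singletonMap("k", 2);
+ *   // the merge function decides the resulting value for overlapping keys
+ *   HashMap<String, Integer> combined = CollectionUtils.combine(first, second, Integer::sum); // {k=3}
+ * }</pre>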
+ * + * NOTE: That values associated with overlapping keys from the second map, will override + * values from the first one + */ + public static HashMap combine(Map one, Map another, BiFunction merge) { + HashMap combined = new HashMap<>(one.size() + another.size()); + combined.putAll(one); + another.forEach((k, v) -> combined.merge(k, v, merge)); + return combined; + } + /** * Returns difference b/w {@code one} {@link Set} of elements and {@code another} */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/Either.java b/hudi-common/src/main/java/org/apache/hudi/common/util/Either.java new file mode 100644 index 0000000000000..93accc4b75566 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/Either.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import javax.annotation.Nonnull; + +import static org.apache.hudi.TypeUtils.unsafeCast; + +/** + * Utility that could hold exclusively only either of (hence the name): + *
+ * <ul>
+ *   <li>Non-null value of type {@link L}</li>
+ *   <li>Non-null value of type {@link R}</li>
+ * </ul>
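+ *
+ * <p>Usage sketch (values illustrative):
+ * <pre>{@code
+ *   Either<String, Integer> result = Either.right(42);
+ *   if (result.isRight()) {
+ *     int value = result.asRight(); // 42
+ *   } else {
+ *     String left = result.asLeft(); // accessing the non-existent side would throw
+ *   }
+ * }</pre>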
+ * + * @param type of the "left" potential element + * @param type of the "right" potential element + */ +public abstract class Either { + + @Nonnull + protected abstract Object getValue(); + + public final boolean isLeft() { + return this instanceof EitherLeft; + } + + public final boolean isRight() { + return this instanceof EitherRight; + } + + public R asRight() { + ValidationUtils.checkArgument(isRight(), "Trying to access non-existent value of Either"); + EitherRight right = unsafeCast(this); + return right.getValue(); + } + + public L asLeft() { + ValidationUtils.checkArgument(isLeft(), "Trying to access non-existent value of Either"); + EitherLeft left = unsafeCast(this); + return left.getValue(); + } + + public static Either right(R right) { + return new EitherRight<>(right); + } + + public static Either left(L left) { + return new EitherLeft<>(left); + } + + public static class EitherRight extends Either { + private final R value; + private EitherRight(@Nonnull R right) { + this.value = right; + } + + @Nonnull + @Override + protected R getValue() { + return value; + } + } + + public static class EitherLeft extends Either { + private final L value; + private EitherLeft(@Nonnull L value) { + this.value = value; + } + + @Nonnull + @Override + protected L getValue() { + return value; + } + } +} \ No newline at end of file diff --git a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieList.java b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java similarity index 64% rename from hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieList.java rename to hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java index 6130d4af1094e..8da8be1338a30 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieList.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java @@ -21,17 +21,19 @@ import org.apache.hudi.common.util.collection.Pair; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; -class TestHoodieList { +class TestHoodieListData { private static Stream distinctWithKey() { return Stream.of( @@ -44,7 +46,22 @@ private static Stream distinctWithKey() { @ParameterizedTest @MethodSource void distinctWithKey(List> expected, List> originalList) { - List> distinctList = HoodieList.of(originalList).distinctWithKey(Pair::getLeft, 1).collectAsList(); + List> distinctList = HoodieListData.eager(originalList).distinctWithKey(Pair::getLeft, 1).collectAsList(); assertEquals(expected, distinctList); } + + @Test + void testEagerSemantic() { + List sourceList = Arrays.asList("quick", "brown", "fox"); + + HoodieListData originalListData = HoodieListData.eager(sourceList); + HoodieData lengthsListData = originalListData.map(String::length); + + List expectedLengths = sourceList.stream().map(String::length).collect(Collectors.toList()); + assertEquals(expectedLengths, lengthsListData.collectAsList()); + // Here we assert that even though we already de-referenced derivative container, + // we still can dereference its parent (multiple times) + assertEquals(3, originalListData.count()); + assertEquals(sourceList, originalListData.collectAsList()); + } } diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java similarity index 75% rename from hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java rename to hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java index 20e9a8f5d9b13..bb65909230da0 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java @@ -22,8 +22,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.common.util.collection.Pair; - -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -37,12 +36,13 @@ import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static org.apache.hudi.common.util.CollectionUtils.createImmutableList; import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap; import static org.junit.jupiter.api.Assertions.assertEquals; -public class TestHoodieMapPair { +public class TestHoodieListDataPairData { private static final String KEY1 = "key1"; private static final String KEY2 = "key2"; @@ -63,30 +63,30 @@ public class TestHoodieMapPair { private static final int INTEGER_VALUE4 = 4; private static final int INTEGER_VALUE5 = 5; - private static List> TEST_PAIRS; - private static HoodiePairData TEST_HOODIE_MAP_PAIR; + private List> testPairs; + private HoodiePairData testHoodiePairData; - @BeforeAll - public static void setup() { - TEST_PAIRS = constructPairs(); - TEST_HOODIE_MAP_PAIR = constructTestMapPairData(TEST_PAIRS); + @BeforeEach + public void setup() { + testPairs = constructPairs(); + testHoodiePairData = HoodieListPairData.lazy(testPairs); } @Test public void testKeys() { - assertHoodieDataEquals(Arrays.asList(KEY1, KEY2, KEY3, KEY4), TEST_HOODIE_MAP_PAIR.keys()); + assertHoodieDataEquals(Arrays.asList(KEY1, KEY1, KEY2, KEY2, KEY3, KEY4), testHoodiePairData.keys()); } @Test public void testValues() { assertHoodieDataEquals(Arrays.asList( STRING_VALUE1, STRING_VALUE2, STRING_VALUE3, STRING_VALUE4, STRING_VALUE5, STRING_VALUE6), - TEST_HOODIE_MAP_PAIR.values()); + testHoodiePairData.values()); } @Test public void testCount() { - assertEquals(6, TEST_HOODIE_MAP_PAIR.count()); + assertEquals(6, testHoodiePairData.count()); } @Test @@ -97,14 +97,14 @@ public void testCountByKey() { expectedResultMap.put(KEY3, 1L); expectedResultMap.put(KEY4, 1L); - assertEquals(expectedResultMap, TEST_HOODIE_MAP_PAIR.countByKey()); + assertEquals(expectedResultMap, testHoodiePairData.countByKey()); } @Test public void testMap() { assertHoodieDataEquals(Arrays.asList( "key1,value1", "key1,value2", "key2,value3", "key2,value4", "key3,value5", "key4,value6"), - TEST_HOODIE_MAP_PAIR.map(pair -> pair.getKey() + "," + pair.getValue())); + testHoodiePairData.map(pair -> pair.getKey() + "," + pair.getValue())); } @Test @@ -114,8 +114,8 @@ public void testMapToPair() { expectedResultMap.put("key20", Arrays.asList(3, 4)); expectedResultMap.put("key30", Arrays.asList(5)); expectedResultMap.put("key40", Arrays.asList(6)); - assertEquals(expectedResultMap, HoodieMapPair.getMapPair( - 
TEST_HOODIE_MAP_PAIR.mapToPair( + assertEquals(expectedResultMap, toMap( + testHoodiePairData.mapToPair( pair -> { String value = pair.getValue(); return new ImmutablePair<>(pair.getKey() + "0", @@ -129,8 +129,7 @@ private static Stream testReduceByKey() { createImmutableMap( Pair.of(1, createImmutableList(1001)), Pair.of(2, createImmutableList(2001)), - Pair.of(3, createImmutableList(3001)), - Pair.of(4, createImmutableList())), + Pair.of(3, createImmutableList(3001))), createImmutableMap( Pair.of(1, createImmutableList(1001, 1002, 1003)), Pair.of(2, createImmutableList(2001, 2002)), @@ -142,20 +141,20 @@ private static Stream testReduceByKey() { @ParameterizedTest @MethodSource public void testReduceByKey(Map> expected, Map> original) { - HoodiePairData reduced = HoodieMapPair.of(original).reduceByKey((a, b) -> a, 1); - assertEquals(expected, HoodieMapPair.getMapPair(reduced)); + HoodiePairData reduced = HoodieListPairData.lazy(original).reduceByKey((a, b) -> a, 1); + assertEquals(expected, toMap(reduced)); } @Test public void testLeftOuterJoinSingleValuePerKey() { - HoodiePairData pairData1 = constructTestMapPairData(Arrays.asList( + HoodiePairData pairData1 = HoodieListPairData.lazy(Arrays.asList( ImmutablePair.of(KEY1, STRING_VALUE1), ImmutablePair.of(KEY2, STRING_VALUE2), ImmutablePair.of(KEY3, STRING_VALUE3), ImmutablePair.of(KEY4, STRING_VALUE4) )); - HoodiePairData pairData2 = constructTestMapPairData(Arrays.asList( + HoodiePairData pairData2 = HoodieListPairData.lazy(Arrays.asList( ImmutablePair.of(KEY1, INTEGER_VALUE1), ImmutablePair.of(KEY2, INTEGER_VALUE2), ImmutablePair.of(KEY5, INTEGER_VALUE3) @@ -172,12 +171,12 @@ public void testLeftOuterJoinSingleValuePerKey() { ImmutablePair.of(STRING_VALUE4, Option.empty()))); assertEquals(expectedResultMap, - HoodieMapPair.getMapPair(pairData1.leftOuterJoin(pairData2))); + toMap(pairData1.leftOuterJoin(pairData2))); } @Test public void testLeftOuterJoinMultipleValuesPerKey() { - HoodiePairData otherPairData = constructTestMapPairData(Arrays.asList( + HoodiePairData otherPairData = HoodieListPairData.lazy(Arrays.asList( ImmutablePair.of(KEY1, INTEGER_VALUE1), ImmutablePair.of(KEY2, INTEGER_VALUE2), ImmutablePair.of(KEY2, INTEGER_VALUE3), @@ -200,7 +199,25 @@ public void testLeftOuterJoinMultipleValuesPerKey() { ImmutablePair.of(STRING_VALUE6, Option.empty()))); assertEquals(expectedResultMap, - HoodieMapPair.getMapPair(TEST_HOODIE_MAP_PAIR.leftOuterJoin(otherPairData))); + toMap(testHoodiePairData.leftOuterJoin(otherPairData))); + } + + @Test + void testEagerSemantic() { + List> sourceList = + Stream.of("quick", "brown", "fox") + .map(s -> Pair.of(s, s.length())) + .collect(Collectors.toList()); + + HoodieListPairData originalListData = HoodieListPairData.eager(sourceList); + HoodieData lengthsListData = originalListData.values(); + + List expectedLengths = sourceList.stream().map(Pair::getValue).collect(Collectors.toList()); + assertEquals(expectedLengths, lengthsListData.collectAsList()); + // Here we assert that even though we already de-referenced derivative container, + // we still can dereference its parent (multiple times) + assertEquals(3, originalListData.count()); + assertEquals(sourceList, originalListData.collectAsList()); } private static List> constructPairs() { @@ -214,11 +231,14 @@ private static List> constructPairs() { ); } - private static HoodiePairData constructTestMapPairData( - final List> pairs) { - Map> map = new HashMap<>(); - addPairsToMap(map, pairs); - return HoodieMapPair.of(map); + private static Map> 
toMap(HoodiePairData pairData) { + return ((List>>) pairData.groupByKey().get()).stream() + .collect( + Collectors.toMap( + p -> p.getKey(), + p -> StreamSupport.stream(p.getValue().spliterator(), false).collect(Collectors.toList()) + ) + ); } private static void addPairsToMap( diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java index 4eed1c4ef502e..8dadd2e2dcf62 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java @@ -20,7 +20,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.data.HoodieList; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; @@ -158,7 +158,7 @@ private void doCommit(String instant, Collection events) .collect(Collectors.toList()); HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata( - table, instant, HoodieList.of(statuses), writeClient.getConfig().getSchema()); + table, instant, HoodieListData.eager(statuses), writeClient.getConfig().getSchema()); // commit the compaction this.writeClient.commitCompaction(instant, metadata, Option.empty());
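// Illustrative end-to-end sketch (not part of the patch): how engine-agnostic code is
// expected to use the new containers after this change. Names below are hypothetical.
HoodieData<String> records = HoodieListData.lazy(Arrays.asList("a", "bb", "ccc"));
List<Integer> lengths = records
    .map(String::length)     // intermediate: stacked up, not executed yet
    .filter(len -> len > 1)  // intermediate: stacked up, not executed yet
    .collectAsList();        // terminal: the underlying stream is consumed here, once

// Eager counterpart, for when the same container must be de-referenced repeatedly
HoodieData<String> reusable = HoodieListData.eager(Arrays.asList("a", "bb", "ccc"));
long total = reusable.count();
List<String> again = reusable.collectAsList();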