diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index efc24444ddea3..ddeec6a36239a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -57,6 +57,7 @@
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+
 /**
  * Class to be used in tests to keep generating test inserts and updates against a corpus.
  *
@@ -73,14 +74,15 @@ public class HoodieTestDataGenerator {
   public static final String[] DEFAULT_PARTITION_PATHS =
       {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
   public static final int DEFAULT_PARTITION_DEPTH = 3;
-  public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
-      + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
-      + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
-      + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
-      + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
-      + "{\"name\":\"fare\",\"type\": \"double\"}]}";
+  public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+      + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+      + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+      + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+      + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
+      + "{\"name\":\"fare\",\"type\": \"double\"},"
+      + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
   public static String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
-  public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double";
+  public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double,boolean";
 
   public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
   public static Schema avroSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(avroSchema);
@@ -117,6 +119,15 @@ public static TestRawTripPayload generateRandomValue(HoodieKey key, String commi
     return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
   }
 
+  /**
+   * Generates a new avro record of the above schema format for a delete.
+   */
+  public static TestRawTripPayload generateRandomDeleteValue(HoodieKey key, String commitTime) throws IOException {
+    GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0,
+        true);
+    return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
+  }
+
   /**
    * Generates a new avro record of the above schema format, retaining the key if optionally provided.
    */
@@ -126,7 +137,12 @@ public static HoodieAvroPayload generateAvroPayload(HoodieKey key, String commit
   }
 
   public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
-      double timestamp) {
+                                                    double timestamp) {
+    return generateGenericRecord(rowKey, riderName, driverName, timestamp, false);
+  }
+
+  public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
+                                                    double timestamp, boolean isDeleteRecord) {
     GenericRecord rec = new GenericData.Record(avroSchema);
     rec.put("_row_key", rowKey);
     rec.put("timestamp", timestamp);
@@ -137,12 +153,18 @@ public static GenericRecord generateGenericRecord(String rowKey, String riderNam
     rec.put("end_lat", rand.nextDouble());
     rec.put("end_lon", rand.nextDouble());
     rec.put("fare", rand.nextDouble() * 100);
+    if (isDeleteRecord) {
+      rec.put("_hoodie_is_deleted", true);
+    } else {
+      rec.put("_hoodie_is_deleted", false);
+    }
     return rec;
   }
 
   public static void createCommitFile(String basePath, String commitTime, Configuration configuration) {
     Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
-        HoodieTimeline.makeRequestedCommitFileName(commitTime)).forEach(f -> {
+        HoodieTimeline.makeRequestedCommitFileName(commitTime))
+        .forEach(f -> {
       Path commitFile = new Path(
           basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
       FSDataOutputStream os = null;
@@ -176,7 +198,7 @@ public static void createCompactionRequestedFile(String basePath, String commitT
   }
 
   public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInstant instant,
-      Configuration configuration) throws IOException {
+                                                       Configuration configuration) throws IOException {
     Path commitFile =
         new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName());
     FileSystem fs = FSUtils.getFs(basePath, configuration);
@@ -332,7 +354,7 @@ public List<HoodieRecord> generateUpdatesWithDiffPartition(String commitTime, Li
    * list
    *
    * @param commitTime Commit Timestamp
-   * @param n Number of updates (including dups)
+   * @param n          Number of updates (including dups)
    * @return list of hoodie record updates
    */
   public List<HoodieRecord> generateUpdates(String commitTime, Integer n) throws IOException {
@@ -349,7 +371,7 @@ public List<HoodieRecord> generateUpdates(String commitTime, Integer n) throws I
    * Generates deduped updates of keys previously inserted, randomly distributed across the keys above.
    *
    * @param commitTime Commit Timestamp
-   * @param n Number of unique records
+   * @param n          Number of unique records
    * @return list of hoodie record updates
    */
   public List<HoodieRecord> generateUniqueUpdates(String commitTime, Integer n) {
@@ -370,7 +392,7 @@ public List<HoodieKey> generateUniqueDeletes(Integer n) {
    * Generates deduped updates of keys previously inserted, randomly distributed across the keys above.
    *
    * @param commitTime Commit Timestamp
-   * @param n Number of unique records
+   * @param n          Number of unique records
    * @return stream of hoodie record updates
    */
   public Stream<HoodieRecord> generateUniqueUpdatesStream(String commitTime, Integer n) {
@@ -418,11 +440,46 @@ public Stream<HoodieKey> generateUniqueDeleteStream(Integer n) {
         index = (index + 1) % numExistingKeys;
         kp = existingKeys.get(index);
       }
+      existingKeys.remove(kp);
+      numExistingKeys--;
       used.add(kp);
       return kp.key;
     });
   }
 
+  /**
+   * Generates deduped delete records previously inserted, randomly distributed across the keys above.
+   *
+   * @param commitTime Commit Timestamp
+   * @param n          Number of unique records
+   * @return stream of hoodie records for delete
+   */
+  public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String commitTime, Integer n) {
+    final Set<KeyPartition> used = new HashSet<>();
+
+    if (n > numExistingKeys) {
+      throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
+    }
+
+    return IntStream.range(0, n).boxed().map(i -> {
+      int index = numExistingKeys == 1 ? 0 : rand.nextInt(numExistingKeys - 1);
+      KeyPartition kp = existingKeys.get(index);
+      // Find the available keyPartition starting from randomly chosen one.
+      while (used.contains(kp)) {
+        index = (index + 1) % numExistingKeys;
+        kp = existingKeys.get(index);
+      }
+      existingKeys.remove(kp);
+      numExistingKeys--;
+      used.add(kp);
+      try {
+        return new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key, commitTime));
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+    });
+  }
+
   public String[] getPartitionPaths() {
     return partitionPaths;
   }
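Note: a minimal sketch of how a test might consume the delete-record generation added above. It is not part of this patch; the seeded counts, commit times, and the java.util.stream.Collectors import are illustrative assumptions.

    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
    // seed the generator first, so delete records have existing keys to pick from
    List<HoodieRecord> inserts = dataGen.generateInserts("000", 100);

    // each generated record reuses one of the seeded keys and carries _hoodie_is_deleted = true,
    // so a payload that honors the flag will treat it as a delete during an upsert
    List<HoodieRecord> deleteRecords =
        dataGen.generateUniqueDeleteRecordStream("001", 10).collect(Collectors.toList());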

diff --git a/hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java
index b13356050f729..3bf07d98b0589 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java
@@ -31,7 +31,6 @@
  * Base class for all AVRO record based payloads, that can be ordered based on a field.
  */
 public abstract class BaseAvroPayload implements Serializable {
-
   /**
    * Avro data extracted from the source converted to bytes.
    */
@@ -43,8 +42,10 @@ public abstract class BaseAvroPayload implements Serializable {
   protected final Comparable orderingVal;
 
   /**
-   * @param record
-   * @param orderingVal
+   * Instantiate {@link BaseAvroPayload}.
+   *
+   * @param record      Generic record for the payload.
+   * @param orderingVal {@link Comparable} to be used in pre combine.
    */
   public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
     try {
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 1dbd944ffa5ce..b06a9ae427c6e 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -103,7 +103,7 @@ public static Object getNestedFieldVal(GenericRecord record, String fieldName) {
 
   /**
    * Create a key generator class via reflection, passing in any configs needed.
-   *
+   * <p>
    * If the class name of key generator is configured through the properties file, i.e., {@code props}, use the
    * corresponding key generator class; otherwise, use the default key generator class specified in {@code
    * DataSourceWriteOptions}.
@@ -125,7 +125,7 @@ public static HoodieRecordPayload createPayload(String payloadClass, GenericReco
       throws IOException {
     try {
       return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
-          new Class[]{GenericRecord.class, Comparable.class}, record, orderingVal);
+          new Class[] {GenericRecord.class, Comparable.class}, record, orderingVal);
     } catch (Throwable e) {
       throw new IOException("Could not create payload for class: " + payloadClass, e);
     }
@@ -140,7 +140,7 @@ public static void checkRequiredProperties(TypedProperties props, List<String> c
   }
 
   public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
-      String tblName, Map<String, String> parameters) {
+                                                     String tblName, Map<String, String> parameters) {
 
     // inline compaction is on by default for MOR
     boolean inlineCompact = parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY())
@@ -162,7 +162,7 @@ public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String
   }
 
   public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
-      String commitTime, String operation) {
+                                                      String commitTime, String operation) {
     if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) {
       return client.bulkInsert(hoodieRecords, commitTime);
     } else if (operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())) {
@@ -174,19 +174,19 @@ public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, Ja
   }
 
   public static JavaRDD<WriteStatus> doDeleteOperation(HoodieWriteClient client, JavaRDD<HoodieKey> hoodieKeys,
-      String commitTime) {
+                                                       String commitTime) {
     return client.delete(hoodieKeys, commitTime);
   }
 
   public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,
-      String payloadClass) throws IOException {
+                                                String payloadClass) throws IOException {
     HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
     return new HoodieRecord<>(hKey, payload);
   }
 
   @SuppressWarnings("unchecked")
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
-      HoodieWriteConfig writeConfig, Option timelineService) {
+                                                     HoodieWriteConfig writeConfig, Option timelineService) {
     HoodieReadClient client = null;
     try {
       client = new HoodieReadClient<>(jssc, writeConfig, timelineService);
@@ -205,7 +205,7 @@ public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRD
 
   @SuppressWarnings("unchecked")
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
-      Map<String, String> parameters, Option timelineService) {
+                                                     Map<String, String> parameters, Option timelineService) {
     HoodieWriteConfig writeConfig =
         HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
     return dropDuplicates(jssc, incomingHoodieRecords, writeConfig, timelineService);
diff --git a/hudi-spark/src/main/java/org/apache/hudi/OverwriteWithLatestAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/OverwriteWithLatestAvroPayload.java
index e860837f2fa0b..32d584eab7e68 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/OverwriteWithLatestAvroPayload.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/OverwriteWithLatestAvroPayload.java
@@ -38,8 +38,7 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
     implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
   /**
-   * @param record
-   * @param orderingVal
+   *
    */
   public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
     super(record, orderingVal);
   }
@@ -61,8 +60,15 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
 
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
+
+    GenericRecord genericRecord = (GenericRecord) getInsertValue(schema).get();
     // combining strategy here trivially ignores currentValue on disk and writes this record
-    return getInsertValue(schema);
+    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    if (deleteMarker instanceof Boolean && (boolean) deleteMarker) {
+      return Option.empty();
+    } else {
+      return Option.of(genericRecord);
+    }
   }
 
   @Override
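Note: a rough sketch (not test code from this patch) of the merge semantics introduced in OverwriteWithLatestAvroPayload above. It assumes `schema` is an Avro Schema that includes the optional _hoodie_is_deleted field and `onDisk` is the IndexedRecord currently stored for the same key.

    GenericRecord incoming = new GenericData.Record(schema);
    // ... populate the required business fields for the key being deleted ...
    incoming.put("_hoodie_is_deleted", true);

    OverwriteWithLatestAvroPayload payload = new OverwriteWithLatestAvroPayload(incoming, 1);
    // Option.empty() tells the merge to drop the stored record, i.e. a delete;
    // a record with the flag unset (or a schema without the field) still overwrites as before.
    Option<IndexedRecord> merged = payload.combineAndGetUpdateValue(onDisk, schema);
    assert !merged.isPresent();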
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 7dfb015efa7d8..a001323d27a2f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -158,8 +158,8 @@ public class DeltaSync implements Serializable {
   private final HoodieTableType tableType;
 
   public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
-      HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
-      Function onInitializingHoodieWriteClient) throws IOException {
+                   HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
+                   Function onInitializingHoodieWriteClient) throws IOException {
 
     this.cfg = cfg;
     this.jssc = jssc;
@@ -288,7 +288,7 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource
         // default to RowBasedSchemaProvider
         schemaProvider = this.schemaProvider == null || this.schemaProvider.getTargetSchema() == null
             ? transformed.map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema())).orElse(
-            dataAndCheckpoint.getSchemaProvider())
+                dataAndCheckpoint.getSchemaProvider())
             : this.schemaProvider;
       } else {
         // Pull the data from the source & prepare the write
@@ -316,22 +316,22 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource
           (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField));
       return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
     });
+
     return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
   }
 
   /**
    * Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive if needed.
    *
-   * @param records Input Records
+   * @param records       Input Records
    * @param checkpointStr Checkpoint String
-   * @param metrics Metrics
+   * @param metrics       Metrics
    * @return Option Compaction instant if one is scheduled
    */
   private Option<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr,
-      HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {
+                                     HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {
 
     Option<String> scheduledCompactionInstant = Option.empty();
-
     // filter dupes if needed
     if (cfg.filterDupes) {
       // turn upserts to insert
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index a21d26361c396..dd266ed1f72b6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -50,10 +50,6 @@ public SourceFormatAdapter(Source source) {
 
   /**
    * Fetch new data in avro format. If the source provides data in different format, they are translated to Avro format
-   *
-   * @param lastCkptStr
-   * @param sourceLimit
-   * @return
    */
   public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
     switch (source.getSourceType()) {
@@ -78,10 +74,6 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String
 
   /**
    * Fetch new data in row format. If the source provides data in different format, they are translated to Row format
-   *
-   * @param lastCkptStr
-   * @param sourceLimit
-   * @return
    */
   public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
     switch (source.getSourceType()) {
@@ -95,7 +87,8 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
             .ofNullable(
                 r.getBatch()
                     .map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
-                        source.getSparkSession()))
+                        source.getSparkSession())
+                    )
                     .orElse(null)),
             r.getCheckpointForNextBatch(), r.getSchemaProvider());
       }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index c5f6c76767a1d..51e5b3cc52c36 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -179,7 +179,7 @@ static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, Stri
   }
 
   static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
-      String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass) {
+                                               String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass) {
     HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
     cfg.targetBasePath = basePath;
     cfg.targetTableName = "hoodie_trips";
@@ -198,7 +198,7 @@ static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, Stri
   }
 
   static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, Operation op,
-      boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
+                                                             boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
     HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
     cfg.targetBasePath = basePath;
     cfg.targetTableName = "hoodie_trips_copy";
@@ -352,11 +352,11 @@ public void testBulkInsertsAndUpserts() throws Exception {
     cfg.sourceLimit = 2000;
     cfg.operation = Operation.UPSERT;
     new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
-    TestHelpers.assertDistanceCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertRecordCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertDistanceCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
     TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
     List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
-    assertEquals(2000, counts.get(0).getLong(1));
+    assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
   }
 
   @Test
@@ -396,8 +396,8 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir
       } else {
         TestHelpers.assertAtleastNCompactionCommits(5, datasetBasePath, dfs);
       }
-      TestHelpers.assertRecordCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext);
-      TestHelpers.assertDistanceCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext);
+      TestHelpers.assertRecordCount(totalRecords + 200, datasetBasePath + "/*/*.parquet", sqlContext);
+      TestHelpers.assertDistanceCount(totalRecords + 200, datasetBasePath + "/*/*.parquet", sqlContext);
       return true;
     }, 180);
     ds.shutdownGracefully();
@@ -457,12 +457,12 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t
     cfg.sourceLimit = 2000;
     cfg.operation = Operation.UPSERT;
     new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
-    TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
-    TestHelpers.assertDistanceCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
-    TestHelpers.assertDistanceCountWithExactValue(2000, datasetBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertRecordCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertDistanceCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertDistanceCountWithExactValue(1950, datasetBasePath + "/*/*.parquet", sqlContext);
     lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
     List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
-    assertEquals(2000, counts.get(0).getLong(1));
+    assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
 
     // Incrementally pull changes in upstream hudi table and apply to downstream table
     downstreamCfg =
@@ -476,7 +476,7 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t
     String finalInstant =
         TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamDatasetBasePath, dfs, 2);
     counts = TestHelpers.countsPerCommit(downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
-    assertEquals(2000, counts.get(0).getLong(1));
+    assertEquals(2000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
 
     // Test Hive integration
     HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
@@ -566,12 +566,11 @@ public static class DistanceUDF implements UDF4<Double, Double, Double, Double,
   /**
    * Adds a new field "haversine_distance" to every row.
    */
-
   public static class TripsWithDistanceTransformer implements Transformer {
 
     @Override
     public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
-        TypedProperties properties) {
+                              TypedProperties properties) {
       rowDataset.sqlContext().udf().register("distance_udf", new DistanceUDF(), DataTypes.DoubleType);
       return rowDataset.withColumn("haversine_distance", functions.callUDF("distance_udf", functions.col("begin_lat"),
           functions.col("end_lat"), functions.col("begin_lon"), functions.col("end_lat")));
@@ -607,7 +606,7 @@ public static class DropAllTransformer implements Transformer {
 
     @Override
     public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
-        TypedProperties properties) {
+                              TypedProperties properties) {
       System.out.println("DropAllTransformer called !!");
       return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema());
     }
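Note on the revised count expectations above: the test source now folds 50 delete records into every sufficiently large update batch (see the AbstractBaseTestSource change below), and OverwriteWithLatestAvroPayload drops those keys during the upsert merge. A round that previously left 1000 existing + 1000 newly inserted = 2000 visible records therefore leaves 2000 - 50 = 1950. Summing countsPerCommit across all commits, rather than reading a single row, presumably keeps the assertion independent of which commit the surviving records end up attributed to after the rewrite.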
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
index 745b0f013a25c..4eab369cbd0f1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
@@ -76,12 +76,12 @@ public static void resetDataGen() {
   }
 
   protected AbstractBaseTestSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
-      SchemaProvider schemaProvider) {
+                                   SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
   }
 
   protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime,
-      int partition) {
+                                                        int partition) {
     int maxUniqueKeys =
         props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
 
@@ -94,10 +94,12 @@ protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int
     int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);
     int numInserts = sourceLimit - numUpdates;
     LOG.info("Before adjustments => numInserts=" + numInserts + ", numUpdates=" + numUpdates);
+    boolean reachedMax = false;
 
     if (numInserts + numExistingKeys > maxUniqueKeys) {
       // Limit inserts so that maxUniqueRecords is maintained
       numInserts = Math.max(0, maxUniqueKeys - numExistingKeys);
+      reachedMax = true;
     }
 
     if ((numInserts + numUpdates) < sourceLimit) {
@@ -105,16 +107,25 @@ protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int
       numUpdates = Math.min(numExistingKeys, sourceLimit - numInserts);
     }
 
-    LOG.info("NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
+    Stream<GenericRecord> deleteStream = Stream.empty();
+    Stream<GenericRecord> updateStream;
     long memoryUsage1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
     LOG.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory()
         + ", Free Memory=" + Runtime.getRuntime().freeMemory());
-
-    Stream<GenericRecord> updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)
-        .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
+    if (!reachedMax && numUpdates >= 50) {
+      LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + (numUpdates - 50) + ", NumDeletes=50, maxUniqueRecords="
+          + maxUniqueKeys);
+      // if we generate update followed by deletes -> some keys in update batch might be picked up for deletes. Hence generating delete batch followed by updates
+      deleteStream = dataGenerator.generateUniqueDeleteRecordStream(commitTime, 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
+      updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates - 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
+    } else {
+      LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
+      updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)
+          .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
+    }
 
     Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts)
         .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
-    return Stream.concat(updateStream, insertStream);
+    return Stream.concat(deleteStream, Stream.concat(updateStream, insertStream));
   }
 
   private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
index 2796e081d1e86..95757a3e1a23b 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
@@ -43,9 +43,15 @@
     "type" : "double"
   }, {
     "name" : "end_lon",
    "type" : "double"
-  }, {
+  },
+  {
     "name" : "fare",
     "type" : "double"
+  },
+  {
+    "name" : "_hoodie_is_deleted",
+    "type" : "boolean",
+    "default" : false
   } ]
 }
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
index 16a09ed05d22b..e8b285703422c 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
@@ -16,4 +16,4 @@
 # limitations under the License.
 ###
 include=base.properties
-hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.fare, CAST(1.0 AS DOUBLE) AS haversine_distance FROM a
+hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.fare, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM a
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
index 11e23a493b874..38e72556fe63f 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
@@ -46,9 +46,15 @@
     "type" : "double"
   }, {
     "name" : "fare",
     "type" : "double"
-  }, {
-    "name" : "haversine_distance",
-    "type" : "double"
+  },
+  {
+    "name" : "_hoodie_is_deleted",
+    "type" : "boolean",
+    "default" : false
+  },
+  {
+    "name" : "haversine_distance",
+    "type" : "double"
   }]
 }
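Finally, a hypothetical end-to-end sketch (not included in this patch) of what the new flag enables for a datasource user: deleting rows through an ordinary upsert. The table path, filter value, field names, and SparkSession setup are assumptions; the option keys and the OverwriteWithLatestAvroPayload behavior they rely on are the ones touched by this change.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.functions;

    public class SoftDeleteExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("hudi-delete-sketch").master("local[2]").getOrCreate();

        // mark the rows to be deleted by setting the new flag; everything else is a normal upsert
        Dataset<Row> toDelete = spark.read().format("org.apache.hudi").load("/tmp/hudi_trips/*/*")
            .filter("rider = 'rider-213'")
            .withColumn("_hoodie_is_deleted", functions.lit(true));

        toDelete.write().format("org.apache.hudi")
            .option("hoodie.datasource.write.operation", "upsert")
            .option("hoodie.datasource.write.recordkey.field", "_row_key")
            .option("hoodie.datasource.write.partitionpath.field", "partition")
            .option("hoodie.datasource.write.precombine.field", "timestamp")
            .option("hoodie.table.name", "hudi_trips")
            .mode(SaveMode.Append)
            .save("/tmp/hudi_trips");

        // OverwriteWithLatestAvroPayload#combineAndGetUpdateValue now returns Option.empty() for these
        // records, so the matching keys disappear from subsequent snapshot reads of the table.
      }
    }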