diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index c2c242ab1816a..7b3324e4b569e 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -96,6 +96,20 @@ public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throw
       HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), cfg.targetBasePath,
           HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived");
     }
+
+    if (cfg.cleanInput) {
+      Path inputPath = new Path(cfg.inputBasePath);
+      if (fs.exists(inputPath)) {
+        fs.delete(inputPath, true);
+      }
+    }
+
+    if (cfg.cleanOutput) {
+      Path outputPath = new Path(cfg.targetBasePath);
+      if (fs.exists(outputPath)) {
+        fs.delete(outputPath, true);
+      }
+    }
   }
 
   private static HiveConf getDefaultHiveConf(Configuration cfg) {
@@ -175,9 +189,24 @@ public static class HoodieTestSuiteConfig extends HoodieDeltaStreamer.Config {
         required = true)
     public Long limitFileSize = 1024 * 1024 * 120L;
 
+    @Parameter(names = {"--input-parallelism"}, description = "Parallelism to use when generating input files",
+        required = false)
+    public Integer inputParallelism = 0;
+
+    @Parameter(names = {"--delete-old-input"}, description = "Delete older input files once they have been ingested",
+        required = false)
+    public Boolean deleteOldInput = false;
+
     @Parameter(names = {"--use-deltastreamer"}, description = "Choose whether to use HoodieDeltaStreamer to "
        + "perform ingestion. If set to false, HoodieWriteClient will be used")
     public Boolean useDeltaStreamer = false;
+
+    @Parameter(names = {"--clean-input"}, description = "Clean the input folders and delete all files within them "
+        + "before starting the Job")
+    public Boolean cleanInput = false;
+
+    @Parameter(names = {"--clean-output"}, description = "Clean the output folders and delete all files within them "
+        + "before starting the Job")
+    public Boolean cleanOutput = false;
   }
 }
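Note on the `--clean-input` / `--clean-output` flags added above: the job now performs a guarded recursive delete against whichever FileSystem backs the configured base paths before the run starts. A minimal, self-contained sketch of the same pattern (class and method names here are illustrative, not part of the patch):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PreRunCleanupSketch {

  // Mirror of the --clean-input / --clean-output behavior: recursively delete a base path if it exists.
  static void cleanIfRequested(boolean clean, String basePath, Configuration conf) throws IOException {
    if (!clean) {
      return;
    }
    Path path = new Path(basePath);
    FileSystem fs = path.getFileSystem(conf);  // resolve the FileSystem that backs this path
    if (fs.exists(path)) {
      fs.delete(path, true);                   // 'true' requests a recursive delete
    }
  }
}
```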
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
index 2915628081ed5..0ac36687f485c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java
@@ -36,15 +36,22 @@ public class DFSDeltaConfig extends DeltaConfig {
   private final Long maxFileSize;
   // The current batch id
   private Integer batchId;
+  // Parallelism to use when generating input data
+  private int inputParallelism;
+  // Whether to delete older input data once it has been ingested
+  private boolean deleteOldInputData;
 
   public DFSDeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInputType,
                         SerializableConfiguration configuration,
-                        String deltaBasePath, String targetBasePath, String schemaStr, Long maxFileSize) {
+                        String deltaBasePath, String targetBasePath, String schemaStr, Long maxFileSize,
+                        int inputParallelism, boolean deleteOldInputData) {
     super(deltaOutputMode, deltaInputType, configuration);
     this.deltaBasePath = deltaBasePath;
     this.schemaStr = schemaStr;
     this.maxFileSize = maxFileSize;
     this.datasetOutputPath = targetBasePath;
+    this.inputParallelism = inputParallelism;
+    this.deleteOldInputData = deleteOldInputData;
   }
 
   public String getDeltaBasePath() {
@@ -70,4 +77,12 @@ public Integer getBatchId() {
   public void setBatchId(Integer batchId) {
     this.batchId = batchId;
   }
+
+  public int getInputParallelism() {
+    return inputParallelism;
+  }
+
+  public boolean shouldDeleteOldInputData() {
+    return deleteOldInputData;
+  }
 }
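Existing `DFSDeltaConfig` call sites must now pass the two trailing arguments; the `TestDFSHoodieTestSuiteWriterAdapter` change at the end of this patch does exactly that. A sketch of the call shape, with placeholder paths and schema and imports omitted:

```java
// Placeholder values; in the job these come from HoodieTestSuiteConfig and the schema provider.
DFSDeltaConfig deltaConfig = new DFSDeltaConfig(
    DeltaOutputMode.DFS, DeltaInputType.AVRO,
    new SerializableConfiguration(jsc.hadoopConfiguration()),
    "/tmp/hudi-test-suite/input",      // deltaBasePath
    "/tmp/hudi-test-suite/output",     // targetBasePath
    schemaStr,                         // source schema as a JSON string
    1024 * 1024 * 120L,                // maxFileSize
    jsc.defaultParallelism(),          // inputParallelism (new)
    false);                            // deleteOldInputData (new)
```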
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
index 7a66681e59ed6..db156046418ac 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -83,6 +83,7 @@ public static class Config {
     private static String DISABLE_INGEST = "disable_ingest";
     private static String HIVE_LOCAL = "hive_local";
     private static String REINIT_CONTEXT = "reinitialize_context";
+    private static String START_PARTITION = "start_partition";
 
     private Map<String, Object> configsMap;
 
@@ -118,8 +119,12 @@ public int getNumUpsertPartitions() {
       return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_UPSERT, 0).toString());
     }
 
+    public int getStartPartition() {
+      return Integer.valueOf(configsMap.getOrDefault(START_PARTITION, 0).toString());
+    }
+
     public int getNumUpsertFiles() {
-      return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 1).toString());
+      return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 0).toString());
     }
 
     public double getFractionUpsertPerFile() {
@@ -207,6 +212,11 @@ public Builder withFractionUpsertPerFile(double fractionUpsertPerFile) {
       return this;
     }
 
+    public Builder withStartPartition(int startPartition) {
+      this.configsMap.put(START_PARTITION, startPartition);
+      return this;
+    }
+
    public Builder withNumTimesToRepeat(int repeatCount) {
       this.configsMap.put(REPEAT_COUNT, repeatCount);
       return this;
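The new `start_partition` key flows through `Config.Builder.withStartPartition(...)` into `Config.getStartPartition()`. A hypothetical wiring sketch; only `withStartPartition` and `withNumTimesToRepeat` appear in this patch, and the builder construction and `build()` call are assumed to follow the class's existing conventions:

```java
// Hypothetical fragment: assumes the nested Config.Builder is instantiated directly
// and exposes a build() method, as is conventional for the existing with-* setters.
Config insertOperation = new Config.Builder()
    .withNumTimesToRepeat(1)    // shown elsewhere in DeltaConfig
    .withStartPartition(10)     // new: first dataset partition index to generate inserts for
    .build();
```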
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
index 320c986323b32..00cb2d3e64a4c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
@@ -67,10 +67,11 @@ public void initContext(JavaSparkContext jsc) throws HoodieException {
       this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc);
       String schemaStr = schemaProvider.getSourceSchema().toString();
       this.hoodieTestSuiteWriter = new HoodieTestSuiteWriter(jsc, props, cfg, schemaStr);
+      int inputParallelism = cfg.inputParallelism > 0 ? cfg.inputParallelism : jsc.defaultParallelism();
       this.deltaGenerator = new DeltaGenerator(
           new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), DeltaInputType.valueOf(cfg.inputFormatName),
               new SerializableConfiguration(jsc.hadoopConfiguration()), cfg.inputBasePath, cfg.targetBasePath,
-              schemaStr, cfg.limitFileSize),
+              schemaStr, cfg.limitFileSize, inputParallelism, cfg.deleteOldInput),
           jsc, sparkSession, schemaStr, keyGenerator);
       log.info(String.format("Initialized writerContext with: %s", schemaStr));
     } catch (Exception e) {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
index 8dc7f4be52ca7..dc991b11e7602 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
@@ -28,9 +28,17 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.StreamSupport;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.integ.testsuite.converter.UpdateConverter;
 import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader;
 import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader;
@@ -41,7 +49,6 @@
 import org.apache.hudi.integ.testsuite.writer.DeltaWriterFactory;
 import org.apache.hudi.keygen.BuiltinKeyGenerator;
 import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
-import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -58,7 +65,7 @@ public class DeltaGenerator implements Serializable {
 
   private static Logger log = LoggerFactory.getLogger(DeltaGenerator.class);
 
-  private DeltaConfig deltaOutputConfig;
+  private DFSDeltaConfig deltaOutputConfig;
   private transient JavaSparkContext jsc;
   private transient SparkSession sparkSession;
   private String schemaStr;
@@ -66,7 +73,7 @@ public class DeltaGenerator implements Serializable {
   private List<String> partitionPathFieldNames;
   private int batchId;
 
-  public DeltaGenerator(DeltaConfig deltaOutputConfig, JavaSparkContext jsc, SparkSession sparkSession,
+  public DeltaGenerator(DFSDeltaConfig deltaOutputConfig, JavaSparkContext jsc, SparkSession sparkSession,
       String schemaStr, BuiltinKeyGenerator keyGenerator) {
     this.deltaOutputConfig = deltaOutputConfig;
     this.jsc = jsc;
@@ -77,6 +84,16 @@ public DeltaGenerator(DeltaConfig deltaOutputConfig, JavaSparkContext jsc, Spark
   }
 
   public JavaRDD writeRecords(JavaRDD records) {
+    if (deltaOutputConfig.shouldDeleteOldInputData() && batchId > 1) {
+      Path oldInputDir = new Path(deltaOutputConfig.getDeltaBasePath(), Integer.toString(batchId - 1));
+      try {
+        FileSystem fs = FSUtils.getFs(oldInputDir.toString(), deltaOutputConfig.getConfiguration());
+        fs.delete(oldInputDir, true);
+      } catch (IOException e) {
+        log.error("Failed to delete older input data directory " + oldInputDir, e);
+      }
+    }
+
     // The following creates a new anonymous function for iterator and hence results in serialization issues
     JavaRDD ws = records.mapPartitions(itr -> {
       try {
@@ -95,11 +112,22 @@ public JavaRDD generateInserts(Config operation) {
     int numPartitions = operation.getNumInsertPartitions();
     long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;
     int minPayloadSize = operation.getRecordSize();
-    JavaRDD inputBatch = jsc.parallelize(Collections.EMPTY_LIST)
-        .repartition(numPartitions).mapPartitions(p -> {
+    int startPartition = operation.getStartPartition();
+
+    // Each Spark partition below will generate records for a single partition, given by the integer index.
+    List<Integer> partitionIndexes = IntStream.rangeClosed(0 + startPartition, numPartitions + startPartition)
+        .boxed().collect(Collectors.toList());
+
+    JavaRDD inputBatch = jsc.parallelize(partitionIndexes, numPartitions)
+        .mapPartitionsWithIndex((index, p) -> {
           return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
-              minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions));
-        });
+              minPayloadSize, schemaStr, partitionPathFieldNames, (Integer) index));
+        }, true);
+
+    if (deltaOutputConfig.getInputParallelism() < numPartitions) {
+      inputBatch = inputBatch.coalesce(deltaOutputConfig.getInputParallelism());
+    }
+
     return inputBatch;
   }
@@ -131,9 +159,11 @@ public JavaRDD generateUpdates(Config config) throws IOException
       }
     }
 
-    log.info("Repartitioning records");
     // persist this since we will make multiple passes over this
-    adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism());
+    int numPartition = Math.min(deltaOutputConfig.getInputParallelism(),
+        Math.max(1, config.getNumUpsertPartitions()));
+    log.info("Repartitioning records into " + numPartition + " partitions");
+    adjustedRDD = adjustedRDD.repartition(numPartition);
     log.info("Repartitioning records done");
     UpdateConverter converter = new UpdateConverter(schemaStr, config.getRecordSize(),
         partitionPathFieldNames, recordRowKeyFieldNames);
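The rewritten insert path keys each Spark partition to a dataset partition index via `mapPartitionsWithIndex`, then coalesces down to the configured input parallelism when that is smaller than the partition count. A standalone sketch of that shape, using a toy `String` payload instead of Avro records:

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PartitionedInputSketch {

  // One Spark partition per dataset partition index, then shrink to the requested parallelism.
  static JavaRDD<String> generate(JavaSparkContext jsc, int startPartition, int numPartitions,
      int inputParallelism) {
    List<Integer> partitionIndexes = IntStream.range(startPartition, startPartition + numPartitions)
        .boxed().collect(Collectors.toList());
    JavaRDD<String> batch = jsc.parallelize(partitionIndexes, numPartitions)
        .mapPartitionsWithIndex((index, itr) ->
            Collections.singletonList("records-for-partition-" + index).iterator(), true);
    if (inputParallelism > 0 && inputParallelism < numPartitions) {
      // coalesce merges partitions without a shuffle, so the generated data stays grouped by index
      batch = batch.coalesce(inputParallelism);
    }
    return batch;
  }
}
```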
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
index 512118fa1df2c..270dcd1696abb 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
@@ -18,8 +18,11 @@
 
 package org.apache.hudi.integ.testsuite.generator;
 
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
@@ -37,18 +40,18 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator
-  private List<String> partitionPathFieldNames;
+  private Set<String> partitionPathFieldNames;
 
   public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, String schema) {
-    this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, GenericRecordFullPayloadGenerator.DEFAULT_NUM_DATE_PARTITIONS);
+    this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, 0);
   }
 
   public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, int minPayloadSize, String schemaStr,
-      List<String> partitionPathFieldNames, int numPartitions) {
+      List<String> partitionPathFieldNames, int partitionIndex) {
     this.counter = maxEntriesToProduce;
-    this.partitionPathFieldNames = partitionPathFieldNames;
+    this.partitionPathFieldNames = new HashSet<>(partitionPathFieldNames);
     Schema schema = new Schema.Parser().parse(schemaStr);
-    this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, numPartitions);
+    this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, partitionIndex);
   }
 
   @Override
@@ -60,7 +63,7 @@ public boolean hasNext() {
   public GenericRecord next() {
     this.counter--;
     if (lastRecord == null) {
-      GenericRecord record = this.generator.getNewPayload();
+      GenericRecord record = this.generator.getNewPayload(partitionPathFieldNames);
       lastRecord = record;
       return record;
     } else {
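With the signature change above, each Spark task now builds the iterator with the index of the single dataset partition it is generating. A usage fragment (`schemaStr`, `partitionFields` and `partitionIndex` are placeholders):

```java
// Sketch only: produce 1000 records of at least ~10 KB each for one dataset partition.
FlexibleSchemaRecordGenerationIterator itr = new FlexibleSchemaRecordGenerationIterator(
    1000, 10 * 1024, schemaStr, partitionFields, partitionIndex);
while (itr.hasNext()) {
  GenericRecord record = itr.next();
  // hand the record to the delta writer here
}
```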
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
index df9a4499254af..f61fad6cd54ac 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
@@ -22,10 +22,12 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -47,14 +49,13 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
 
   private static Logger LOG = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
   public static final int DEFAULT_PAYLOAD_SIZE = 1024 * 10; // 10 KB
-  public static final int DEFAULT_NUM_DATE_PARTITIONS = 50;
   protected final Random random = new Random();
   // The source schema used to generate a payload
   private final transient Schema baseSchema;
   // Used to validate a generic record
   private final transient GenericData genericData = new GenericData();
-  // The number of unique dates to create
-  private int numDatePartitions = DEFAULT_NUM_DATE_PARTITIONS;
+  // The index of the partition for which records are being generated
+  private int partitionIndex = 0;
   // The size of a full record where every field of a generic record created contains 1 random value
   private final int estimatedFullPayloadSize;
   // Number of extra entries to add in a complex/collection field to achieve the desired record size
@@ -89,9 +90,9 @@ public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize) {
     }
   }
 
-  public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, int numDatePartitions) {
+  public GenericRecordFullPayloadGenerator(Schema schema, int minPayloadSize, int partitionIndex) {
     this(schema, minPayloadSize);
-    this.numDatePartitions = numDatePartitions;
+    this.partitionIndex = partitionIndex;
   }
 
   protected static boolean isPrimitive(Schema localSchema) {
@@ -115,7 +116,50 @@ public GenericRecord getNewPayload() {
   }
 
   protected GenericRecord getNewPayload(Schema schema) {
-    return randomize(new GenericData.Record(schema), null);
+    return create(schema, null);
+  }
+
+  /**
+   * Create a new {@link GenericRecord} with random values according to the given schema.
+   *
+   * Long fields which are specified within partitionPathFieldNames are constrained to the value of the partition
+   * for which records are being generated.
+   *
+   * @return {@link GenericRecord} with random value
+   */
+  public GenericRecord getNewPayload(Set<String> partitionPathFieldNames) {
+    return create(baseSchema, partitionPathFieldNames);
+  }
+
+  protected GenericRecord create(Schema schema, Set<String> partitionPathFieldNames) {
+    GenericRecord result = new GenericData.Record(schema);
+    for (Schema.Field f : schema.getFields()) {
+      if (isPartialLongField(f, partitionPathFieldNames)) {
+        // This is a long field used as a partition field. Set it to the partition index as seconds since epoch.
+        long value = TimeUnit.SECONDS.convert(partitionIndex, TimeUnit.DAYS);
+        result.put(f.name(), (long) value);
+      } else {
+        result.put(f.name(), typeConvert(f));
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Return true if this is a partition field of type long which should be set to the partition index.
+   * @return true if the field should be pinned to the partition index
+   */
+  private boolean isPartialLongField(Schema.Field field, Set<String> partitionPathFieldNames) {
+    if ((partitionPathFieldNames == null) || !partitionPathFieldNames.contains(field.name())) {
+      return false;
+    }
+
+    Schema fieldSchema = field.schema();
+    if (isOption(fieldSchema)) {
+      fieldSchema = getNonNull(fieldSchema);
+    }
+
+    return fieldSchema.getType() == org.apache.avro.Schema.Type.LONG;
   }
 
   /**
@@ -125,7 +169,7 @@ protected GenericRecord getNewPayload(Schema schema) {
    * @param blacklistFields Fields whose value should not be touched
    * @return The updated {@link GenericRecord}
    */
-  public GenericRecord getUpdatePayload(GenericRecord record, List<String> blacklistFields) {
+  public GenericRecord getUpdatePayload(GenericRecord record, Set<String> blacklistFields) {
     return randomize(record, blacklistFields);
   }
 
@@ -158,7 +202,7 @@ protected GenericRecord convertPartial(Schema schema) {
    * @param blacklistFields blacklistFields where the filed will not be randomized.
    * @return Randomized GenericRecord.
    */
-  protected GenericRecord randomize(GenericRecord record, List<String> blacklistFields) {
+  protected GenericRecord randomize(GenericRecord record, Set<String> blacklistFields) {
     for (Schema.Field f : record.getSchema().getFields()) {
       if (blacklistFields == null || !blacklistFields.contains(f.name())) {
         record.put(f.name(), typeConvert(f));
@@ -167,12 +211,6 @@ protected GenericRecord randomize(GenericRecord record, List blacklistFi
     return record;
   }
 
-  private long getNextConstrainedLong() {
-    int numPartitions = random.nextInt(numDatePartitions);
-    long unixTimeStamp = TimeUnit.SECONDS.convert(numPartitions, TimeUnit.DAYS);
-    return unixTimeStamp;
-  }
-
   /**
    * Generate random value according to their type.
    */
@@ -191,7 +229,7 @@ private Object typeConvert(Schema.Field field) {
       case INT:
         return random.nextInt();
       case LONG:
-        return getNextConstrainedLong();
+        return random.nextLong();
       case STRING:
         return UUID.randomUUID().toString();
       case ENUM:
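The partition pinning above maps a partition index to a deterministic timestamp: index i becomes i days expressed as seconds since epoch, so every record generated for partition i falls into the same date partition. A tiny sketch of that conversion:

```java
import java.util.concurrent.TimeUnit;

public class PartitionTimestampSketch {

  // Partition index -> seconds since epoch at day granularity, the value pinned into long partition fields.
  static long partitionIndexToEpochSeconds(int partitionIndex) {
    return TimeUnit.SECONDS.convert(partitionIndex, TimeUnit.DAYS);
  }

  public static void main(String[] args) {
    System.out.println(partitionIndexToEpochSeconds(0));  // 0
    System.out.println(partitionIndexToEpochSeconds(3));  // 259200 (3 days)
  }
}
```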
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
index a33ef0c7fe400..d9d137a422a2e 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.integ.testsuite.generator;
 
-import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
@@ -31,14 +33,14 @@ public class UpdateGeneratorIterator implements Iterator {
 
   // Use the full payload generator as default
   private GenericRecordFullPayloadGenerator generator;
-  private List<String> blackListedFields;
+  private Set<String> blackListedFields;
   // iterator
   private Iterator<GenericRecord> itr;
 
   public UpdateGeneratorIterator(Iterator<GenericRecord> itr, String schemaStr, List<String> partitionPathFieldNames,
       List<String> recordKeyFieldNames, int minPayloadSize) {
     this.itr = itr;
-    this.blackListedFields = new ArrayList<>();
+    this.blackListedFields = new HashSet<>();
     this.blackListedFields.addAll(partitionPathFieldNames);
     this.blackListedFields.addAll(recordKeyFieldNames);
     Schema schema = new Schema.Parser().parse(schemaStr);
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index e209118db1eda..cfe7991f43ee5 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -36,6 +36,7 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
@@ -78,8 +79,10 @@ public DFSHoodieDatasetInputReader(JavaSparkContext jsc, String basePath, String
   }
 
   protected List<String> getPartitions(Option partitionsLimit) throws IOException {
-    List<String> partitionPaths = FSUtils
-        .getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), false);
+    // Using FSUtils.getFs here instead of metaClient.getFs() since we don't want to count these listStatus
+    // calls in metrics as they are not part of normal Hudi operation.
+    FileSystem fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf());
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, metaClient.getBasePath(), false);
     // Sort partition so we can pick last N partitions by default
     Collections.sort(partitionPaths);
     if (!partitionPaths.isEmpty()) {
@@ -136,6 +139,9 @@ private JavaRDD fetchRecordsFromDataset(Option numPartit
     // Read all file slices in the partition
     JavaPairRDD<String, Iterator<FileSlice>> partitionToFileSlice = getPartitionToFileSlice(metaClient,
         partitionPaths);
+    Map<String, Integer> partitionToFileIdCountMap = partitionToFileSlice
+        .mapToPair(p -> new Tuple2<>(p._1, iteratorSize(p._2))).collectAsMap();
+
     // TODO : read record count from metadata
     // Read the records in a single file
     long recordsInSingleFile = iteratorSize(readParquetOrLogFiles(getSingleSliceFromRDD(partitionToFileSlice)));
@@ -144,7 +150,11 @@ private JavaRDD fetchRecordsFromDataset(Option numPartit
     if (!numFiles.isPresent() || numFiles.get() == 0) {
       // If num files are not passed, find the number of files to update based on total records to update and records
       // per file
-      numFilesToUpdate = (int) (numRecordsToUpdate.get() / recordsInSingleFile);
+      numFilesToUpdate = (int) Math.ceil((double) numRecordsToUpdate.get() / recordsInSingleFile);
+      // recordsInSingleFile is not an average, so we still need to account for bias in the record distribution
+      // across files. Limit to the maximum number of files available.
+      int totalExistingFilesCount = partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get();
+      numFilesToUpdate = Math.min(numFilesToUpdate, totalExistingFilesCount);
       log.info("Files to update {}", numFilesToUpdate);
       numRecordsToUpdatePerFile = recordsInSingleFile;
     } else {
@@ -154,9 +164,10 @@ private JavaRDD fetchRecordsFromDataset(Option numPartit
       numRecordsToUpdatePerFile = percentageRecordsPerFile.isPresent() ?
          (long) (recordsInSingleFile * percentageRecordsPerFile.get()) : numRecordsToUpdate.get() / numFilesToUpdate;
     }
+    // Adjust the number of files to read per partition based on the requested partition & file counts
     Map<String, Integer> adjustedPartitionToFileIdCountMap = getFilesToReadPerPartition(partitionToFileSlice,
-        partitionPaths.size(), numFilesToUpdate);
+        partitionPaths.size(), numFilesToUpdate, partitionToFileIdCountMap);
     JavaRDD updates = projectSchema(generateUpdates(adjustedPartitionToFileIdCountMap,
        partitionToFileSlice, numFilesToUpdate, (int) numRecordsToUpdatePerFile));
     if (numRecordsToUpdate.isPresent() && numFiles.isPresent() && numFiles.get() != 0 && numRecordsToUpdate.get()
@@ -190,10 +201,7 @@ private JavaRDD generateUpdates(Map adjustedPart
   }
 
   private Map<String, Integer> getFilesToReadPerPartition(JavaPairRDD<String, Iterator<FileSlice>>
-      partitionToFileSlice, Integer numPartitions, Integer numFiles) {
-    int numFilesPerPartition = (int) Math.ceil(numFiles / numPartitions);
-    Map<String, Integer> partitionToFileIdCountMap = partitionToFileSlice
-        .mapToPair(p -> new Tuple2<>(p._1, iteratorSize(p._2))).collectAsMap();
+      partitionToFileSlice, Integer numPartitions, Integer numFiles, Map<String, Integer> partitionToFileIdCountMap) {
     long totalExistingFilesCount = partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get();
     ValidationUtils.checkArgument(totalExistingFilesCount >= numFiles, "Cannot generate updates "
        + "for more files than present in the dataset, file requested " + numFiles + ", files present "
@@ -204,7 +212,9 @@ private Map getFilesToReadPerPartition(JavaPairRDD
         e2, LinkedHashMap::new));
 
+    // Limit files to be read per partition
+    int numFilesPerPartition = (int) Math.ceil((double) numFiles / numPartitions);
     Map<String, Integer> adjustedPartitionToFileIdCountMap = new HashMap<>();
     partitionToFileIdCountSortedMap.entrySet().stream().forEach(e -> {
       if (e.getValue() <= numFilesPerPartition) {
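The sizing logic above now rounds up instead of truncating and caps the result at the number of files that actually exist; the per-partition limit likewise promotes to double so that, for example, 10 files across 4 partitions yields 3 per partition rather than 2. A small sketch of both calculations (assuming positive inputs):

```java
public class UpdateSizingSketch {

  // Files to rewrite: round up instead of truncating, then cap at the number of files that exist.
  static int filesToUpdate(long numRecordsToUpdate, long recordsInSingleFile, int totalExistingFiles) {
    int numFilesToUpdate = (int) Math.ceil((double) numRecordsToUpdate / recordsInSingleFile);
    return Math.min(numFilesToUpdate, totalExistingFiles);
  }

  // Files to read per partition: integer division would floor (10 / 4 == 2), so promote to double first.
  static int filesPerPartition(int numFiles, int numPartitions) {
    return (int) Math.ceil((double) numFiles / numPartitions);
  }

  public static void main(String[] args) {
    System.out.println(filesToUpdate(2500, 1000, 2));  // ceil(2.5) = 3, capped at the 2 existing files
    System.out.println(filesPerPartition(10, 4));      // 3
  }
}
```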
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
index ff41b44dd6253..ff92bd037d558 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
@@ -125,7 +125,7 @@ public void testDFSTwoFilesWriteWithRollover() throws IOException {
   public void testDFSWorkloadSinkWithMultipleFilesFunctional() throws IOException {
     DeltaConfig dfsSinkConfig = new DFSDeltaConfig(DeltaOutputMode.DFS, DeltaInputType.AVRO,
         new SerializableConfiguration(jsc.hadoopConfiguration()), dfsBasePath, dfsBasePath,
-        schemaProvider.getSourceSchema().toString(), 10240L);
+        schemaProvider.getSourceSchema().toString(), 10240L, jsc.defaultParallelism(), false);
     DeltaWriterAdapter dfsDeltaWriterAdapter = DeltaWriterFactory
         .getDeltaWriterAdapter(dfsSinkConfig, 1);
     FlexibleSchemaRecordGenerationIterator itr = new FlexibleSchemaRecordGenerationIterator(1000,
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
index 7524d4af8e9b7..94515959d01cd 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
@@ -24,7 +24,9 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.IntStream;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -92,7 +94,8 @@ public void testUpdatePayloadGenerator() throws IOException {
       insertRowKeys.add(record.get("_row_key").toString());
       insertTimeStamps.add((Long) record.get("timestamp"));
     });
-    List<String> blacklistFields = Arrays.asList("_row_key");
+    Set<String> blacklistFields = new HashSet<>();
+    blacklistFields.add("_row_key");
     records.stream().forEach(a -> {
       // Generate 10 updated records
       GenericRecord record = payloadGenerator.getUpdatePayload(a, blacklistFields);