@@ -96,6 +96,20 @@ public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throw
HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), cfg.targetBasePath,
HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, "archived");
}

if (cfg.cleanInput) {
Path inputPath = new Path(cfg.inputBasePath);
if (fs.exists(inputPath)) {
fs.delete(inputPath, true);
}
}

if (cfg.cleanOutput) {
Path outputPath = new Path(cfg.targetBasePath);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
}
}

private static HiveConf getDefaultHiveConf(Configuration cfg) {
@@ -175,9 +189,24 @@ public static class HoodieTestSuiteConfig extends HoodieDeltaStreamer.Config {
required = true)
public Long limitFileSize = 1024 * 1024 * 120L;

@Parameter(names = {"--input-parallelism"}, description = "Parallelism to use when generation input files",
required = false)
public Integer inputParallelism = 0;

@Parameter(names = {"--delete-old-input"}, description = "Delete older input files once they have been ingested",
required = false)
public Boolean deleteOldInput = false;

@Parameter(names = {"--use-deltastreamer"}, description = "Choose whether to use HoodieDeltaStreamer to "
+ "perform ingestion. If set to false, HoodieWriteClient will be used")
public Boolean useDeltaStreamer = false;

@Parameter(names = {"--clean-input"}, description = "Clean the input folders and delete all files within it "
+ "before starting the Job")
public Boolean cleanInput = false;

@Parameter(names = {"--clean-output"}, description = "Clean the output folders and delete all files within it "
+ "before starting the Job")
public Boolean cleanOutput = false;
}
}
@@ -36,15 +36,22 @@ public class DFSDeltaConfig extends DeltaConfig {
private final Long maxFileSize;
// The current batch id
private Integer batchId;
// Parallelism to use when generating input data
private int inputParallelism;
// Whether to delete older input data once it has been ingested
private boolean deleteOldInputData;

public DFSDeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInputType,
SerializableConfiguration configuration,
String deltaBasePath, String targetBasePath, String schemaStr, Long maxFileSize) {
String deltaBasePath, String targetBasePath, String schemaStr, Long maxFileSize,
int inputParallelism, boolean deleteOldInputData) {
super(deltaOutputMode, deltaInputType, configuration);
this.deltaBasePath = deltaBasePath;
this.schemaStr = schemaStr;
this.maxFileSize = maxFileSize;
this.datasetOutputPath = targetBasePath;
this.inputParallelism = inputParallelism;
this.deleteOldInputData = deleteOldInputData;
}

public String getDeltaBasePath() {
@@ -70,4 +77,12 @@ public Integer getBatchId() {
public void setBatchId(Integer batchId) {
this.batchId = batchId;
}

public int getInputParallelism() {
return inputParallelism;
}

public boolean shouldDeleteOldInputData() {
return deleteOldInputData;
}
}
@@ -83,6 +83,7 @@ public static class Config {
private static String DISABLE_INGEST = "disable_ingest";
private static String HIVE_LOCAL = "hive_local";
private static String REINIT_CONTEXT = "reinitialize_context";
private static String START_PARTITION = "start_partition";

private Map<String, Object> configsMap;

@@ -118,8 +119,12 @@ public int getNumUpsertPartitions() {
return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_UPSERT, 0).toString());
}

public int getStartPartition() {
return Integer.valueOf(configsMap.getOrDefault(START_PARTITION, 0).toString());
}

public int getNumUpsertFiles() {
return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 1).toString());
return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 0).toString());
}

public double getFractionUpsertPerFile() {
@@ -207,6 +212,11 @@ public Builder withFractionUpsertPerFile(double fractionUpsertPerFile) {
return this;
}

public Builder withStartPartition(int startPartition) {
this.configsMap.put(START_PARTITION, startPartition);
return this;
}

public Builder withNumTimesToRepeat(int repeatCount) {
this.configsMap.put(REPEAT_COUNT, repeatCount);
return this;
@@ -67,10 +67,11 @@ public void initContext(JavaSparkContext jsc) throws HoodieException {
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc);
String schemaStr = schemaProvider.getSourceSchema().toString();
this.hoodieTestSuiteWriter = new HoodieTestSuiteWriter(jsc, props, cfg, schemaStr);
int inputParallelism = cfg.inputParallelism > 0 ? cfg.inputParallelism : jsc.defaultParallelism();
this.deltaGenerator = new DeltaGenerator(
new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), DeltaInputType.valueOf(cfg.inputFormatName),
new SerializableConfiguration(jsc.hadoopConfiguration()), cfg.inputBasePath, cfg.targetBasePath,
schemaStr, cfg.limitFileSize),
schemaStr, cfg.limitFileSize, inputParallelism, cfg.deleteOldInput),
jsc, sparkSession, schemaStr, keyGenerator);
log.info(String.format("Initialized writerContext with: %s", schemaStr));
} catch (Exception e) {
@@ -28,9 +28,17 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.integ.testsuite.converter.UpdateConverter;
import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader;
import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader;
@@ -41,7 +49,6 @@
import org.apache.hudi.integ.testsuite.writer.DeltaWriterFactory;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -58,15 +65,15 @@ public class DeltaGenerator implements Serializable {

private static Logger log = LoggerFactory.getLogger(DeltaGenerator.class);

private DeltaConfig deltaOutputConfig;
private DFSDeltaConfig deltaOutputConfig;
Contributor:

What is the purpose behind this change?

Member Author:
DFSDeltaConfig extends DeltaConfig

The two settings I have added (getInputParallelism and shouldDeleteOldInputData) are in DFSDeltaConfig.
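
For illustration only, a minimal sketch (stand-in classes, not the actual Hudi code) of why the field type in DeltaGenerator was narrowed: the two new accessors exist only on the subclass, so a DeltaConfig-typed field would need a cast.

abstract class DeltaConfig {
  // shared settings: output mode, input type, hadoop configuration, ...
}

class DFSDeltaConfig extends DeltaConfig {
  // the two settings added in this PR live on the DFS-specific subclass
  int getInputParallelism() { return 4; }
  boolean shouldDeleteOldInputData() { return true; }
}

class DeltaGeneratorSketch {
  private final DFSDeltaConfig deltaOutputConfig; // narrowed from DeltaConfig

  DeltaGeneratorSketch(DFSDeltaConfig deltaOutputConfig) {
    this.deltaOutputConfig = deltaOutputConfig;
  }

  void maybeCleanOldInput() {
    // with a DeltaConfig-typed field this call would need an explicit cast
    if (deltaOutputConfig.shouldDeleteOldInputData()) {
      // ... delete the previous batch's input sub-directory
    }
  }
}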

private transient JavaSparkContext jsc;
private transient SparkSession sparkSession;
private String schemaStr;
private List<String> recordRowKeyFieldNames;
private List<String> partitionPathFieldNames;
private int batchId;

public DeltaGenerator(DeltaConfig deltaOutputConfig, JavaSparkContext jsc, SparkSession sparkSession,
public DeltaGenerator(DFSDeltaConfig deltaOutputConfig, JavaSparkContext jsc, SparkSession sparkSession,
String schemaStr, BuiltinKeyGenerator keyGenerator) {
this.deltaOutputConfig = deltaOutputConfig;
this.jsc = jsc;
@@ -77,6 +84,16 @@ public DeltaGenerator(DeltaConfig deltaOutputConfig, JavaSparkContext jsc, Spark
}

public JavaRDD<DeltaWriteStats> writeRecords(JavaRDD<GenericRecord> records) {
if (deltaOutputConfig.shouldDeleteOldInputData() && batchId > 1) {
Path oldInputDir = new Path(deltaOutputConfig.getDeltaBasePath(), Integer.toString(batchId - 1));
Contributor:

This may not work in case the last batches were rolled back. Can you take a look at RollbackNode and see what the implication will be?

Member Author:
RollbackNode will roll back the last commit. This should not interfere with these input directories.

The shouldDeleteOldInputData() setting only affects the data generated in the "input" directory (a separate directory) which is not part of the HUDI dataset under test. For each Node in the yaml, a sub-directory in the input directory (identified by batchId) is created. Within this new sub-directory, the data to be ingested as part of the Node is written as avro files.

We are deleting older input sub-directories. The default is to not delete anything.
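
To illustrate the layout described above (the paths and batch numbers here are made up for the example, not taken from the PR), the cleanup only ever touches the previous batch's sub-directory under the input base path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OldInputCleanupSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical input base path; each Node/batch writes its avro files under
    // <inputBasePath>/<batchId>/, e.g. /tmp/test-suite/input/1, /tmp/test-suite/input/2, ...
    String inputBasePath = "/tmp/test-suite/input";
    int batchId = 3; // batch currently being generated

    // Only the previous batch's input sub-directory is removed; the Hudi table under
    // targetBasePath is a separate location, so a rollback of the table is unaffected.
    Path previousBatchDir = new Path(inputBasePath, Integer.toString(batchId - 1));
    FileSystem fs = previousBatchDir.getFileSystem(new Configuration());
    if (fs.exists(previousBatchDir)) {
      fs.delete(previousBatchDir, true); // recursive delete of the old input data
    }
  }
}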

try {
FileSystem fs = FSUtils.getFs(oldInputDir.toString(), deltaOutputConfig.getConfiguration());
fs.delete(oldInputDir, true);
} catch (IOException e) {
log.error("Failed to delete older input data direcory " + oldInputDir, e);
}
}

// The following creates a new anonymous function for iterator and hence results in serialization issues
JavaRDD<DeltaWriteStats> ws = records.mapPartitions(itr -> {
try {
@@ -95,11 +112,22 @@ public JavaRDD<GenericRecord> generateInserts(Config operation) {
int numPartitions = operation.getNumInsertPartitions();
long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;
int minPayloadSize = operation.getRecordSize();
JavaRDD<GenericRecord> inputBatch = jsc.parallelize(Collections.EMPTY_LIST)
.repartition(numPartitions).mapPartitions(p -> {
int startPartition = operation.getStartPartition();
Contributor (@n3nash, Oct 26, 2020):

Can you please explain the startPartition with an example? What happens when a Spark stage is retried? Take a look at how Spark stage retries mess up the partition numbers to understand more.

Member Author:
Suppose you insert into 5 partitions. Then the following 5 new LazyRecordGeneratorIterators will be created:
new LazyRecordGeneratorIterator(..., 0)
new LazyRecordGeneratorIterator(..., 1)
new LazyRecordGeneratorIterator(..., 2)
new LazyRecordGeneratorIterator(..., 3)
new LazyRecordGeneratorIterator(..., 4)

Within the LazyRecordGeneratorIterator code, the integer partition index (0, 1, ... above) is converted into a partition timestamp (as a date offset from 1970/01/01). So the first LazyRecordGeneratorIterator will generate records for 1970/01/01, the second for 1970/01/02, and so on.

With this scheme, record generation always starts at offset 0. But what if you want to generate data for only a specific partition, or add a new partition? This is where the start_offset comes into play.

new LazyRecordGeneratorIterator(..., 0 + start_offset)
new LazyRecordGeneratorIterator(..., 1 + start_offset)
new LazyRecordGeneratorIterator(..., 2 + start_offset)
new LazyRecordGeneratorIterator(..., 3 + start_offset)
new LazyRecordGeneratorIterator(..., 4 + start_offset)

By using a start_offset you can alter where the inserts will take place. New partitions can also be created.

Spark retries can alter the partition numbers here. To handle that, we can use a pre-formatted List of partitions.

Contributor:
Okay, that makes sense @prashantwason. Spark retries are pretty common, let's handle that use case.

Member Author:
Already done. Please see the update.
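
For reference, a small standalone sketch of the index-to-partition mapping described above (the exact date conversion inside LazyRecordGeneratorIterator is an assumption here; the point is only how the start offset shifts the generated partitions):

import java.time.LocalDate;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class StartPartitionSketch {
  public static void main(String[] args) {
    int numPartitions = 5;
    int startPartition = 10; // e.g. start_partition: 10 in the yaml node config

    // Partition index -> date partition, modelled as days since 1970-01-01.
    List<String> partitions = IntStream.range(startPartition, startPartition + numPartitions)
        .mapToObj(i -> LocalDate.ofEpochDay(i).toString())
        .collect(Collectors.toList());

    // With startPartition = 0 this prints 1970-01-01 .. 1970-01-05;
    // with startPartition = 10 it prints 1970-01-11 .. 1970-01-15,
    // i.e. the offset shifts which partitions receive the generated inserts.
    partitions.forEach(System.out::println);
  }
}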


// Each spark partition below will generate records for a single partition given by the integer index.
List<Integer> partitionIndexes = IntStream.rangeClosed(0 + startPartition, numPartitions + startPartition)
.boxed().collect(Collectors.toList());

JavaRDD<GenericRecord> inputBatch = jsc.parallelize(partitionIndexes, numPartitions)
.mapPartitionsWithIndex((index, p) -> {
return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions));
});
minPayloadSize, schemaStr, partitionPathFieldNames, (Integer)index));
}, true);

if (deltaOutputConfig.getInputParallelism() < numPartitions) {
inputBatch = inputBatch.coalesce(deltaOutputConfig.getInputParallelism());
}

return inputBatch;
}

@@ -131,9 +159,11 @@ public JavaRDD<GenericRecord> generateUpdates(Config config) throws IOException
}
}

log.info("Repartitioning records");
// persist this since we will make multiple passes over this
adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism());
int numPartition = Math.min(deltaOutputConfig.getInputParallelism(),
Math.max(1, config.getNumUpsertPartitions()));
log.info("Repartitioning records into " + numPartition + " partitions");
adjustedRDD = adjustedRDD.repartition(numPartition);
log.info("Repartitioning records done");
UpdateConverter converter = new UpdateConverter(schemaStr, config.getRecordSize(),
partitionPathFieldNames, recordRowKeyFieldNames);
@@ -18,8 +18,11 @@

package org.apache.hudi.integ.testsuite.generator;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

@@ -37,18 +40,18 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
// Store last record for the partition path of the first payload to be used for all subsequent generated payloads
private GenericRecord lastRecord;
// Partition path field name
private List<String> partitionPathFieldNames;
private Set<String> partitionPathFieldNames;

public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, String schema) {
this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, GenericRecordFullPayloadGenerator.DEFAULT_NUM_DATE_PARTITIONS);
this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, 0);
}

public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, int minPayloadSize, String schemaStr,
List<String> partitionPathFieldNames, int numPartitions) {
List<String> partitionPathFieldNames, int partitionIndex) {
this.counter = maxEntriesToProduce;
this.partitionPathFieldNames = partitionPathFieldNames;
this.partitionPathFieldNames = new HashSet<>(partitionPathFieldNames);
Schema schema = new Schema.Parser().parse(schemaStr);
this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, numPartitions);
this.generator = new GenericRecordFullPayloadGenerator(schema, minPayloadSize, partitionIndex);
}

@Override
@@ -60,7 +63,7 @@ public boolean hasNext() {
public GenericRecord next() {
this.counter--;
if (lastRecord == null) {
GenericRecord record = this.generator.getNewPayload();
GenericRecord record = this.generator.getNewPayload(partitionPathFieldNames);
lastRecord = record;
return record;
} else {