@@ -1051,7 +1051,7 @@ public int getMaxConsistencyCheckIntervalMs() {
   }
 
   public BulkInsertSortMode getBulkInsertSortMode() {
-    String sortMode = getString(BULK_INSERT_SORT_MODE);
+    String sortMode = getStringOrDefault(BULK_INSERT_SORT_MODE);
     return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
   }

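The switch from getString to getStringOrDefault in the hunk above reads like a null-safety fix: if the sort-mode key was never set explicitly, a plain lookup can return null and the following toUpperCase() would throw a NullPointerException, whereas an accessor that falls back to the property's declared default always returns a usable value. That motivation is inferred from the change itself, not stated in the diff. A minimal sketch of the pattern, using an illustrative ConfigOption class and key/default values rather than Hudi's actual API:

import java.util.Properties;

public class DefaultingConfigSketch {

  // Illustrative stand-in for a config key that carries its own default value.
  static final class ConfigOption {
    final String key;
    final String defaultValue;

    ConfigOption(String key, String defaultValue) {
      this.key = key;
      this.defaultValue = defaultValue;
    }
  }

  // Hypothetical key and default, chosen only for the example.
  static final ConfigOption SORT_MODE = new ConfigOption("example.bulkinsert.sort.mode", "GLOBAL_SORT");

  private final Properties props = new Properties();

  // Returns null when the caller never set the key; calling toUpperCase() on the
  // result then fails with a NullPointerException.
  String getString(ConfigOption option) {
    return props.getProperty(option.key);
  }

  // Falls back to the option's declared default, so callers always get a value.
  String getStringOrDefault(ConfigOption option) {
    return props.getProperty(option.key, option.defaultValue);
  }
}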
@@ -18,14 +18,18 @@

 package org.apache.hudi.client.clustering.run.strategy;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.ConcatenatingIterator;
-import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ClusteringOperation;
@@ -58,11 +62,6 @@
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -246,21 +245,28 @@ private JavaRDD<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext js
    */
   private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
                                                                 List<ClusteringOperation> clusteringOps) {
-    return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
-      List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>();
-      clusteringOpsPartition.forEachRemaining(clusteringOp -> {
-        try {
-          Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
-          HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
-          iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
-        } catch (IOException e) {
-          throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
-              + " and " + clusteringOp.getDeltaFilePaths(), e);
-        }
-      });
-
-      return new ConcatenatingIterator<>(iteratorsForPartition);
-    }).map(this::transform);
+    SerializableConfiguration hadoopConf = new SerializableConfiguration(getHoodieTable().getHadoopConf());
+    HoodieWriteConfig writeConfig = getWriteConfig();
+
+    // NOTE: It's crucial to make sure that we don't capture whole "this" object into the
+    //       closure, as this might lead to issues attempting to serialize its nested fields
+    return jsc.parallelize(clusteringOps, clusteringOps.size())
+        .mapPartitions(clusteringOpsPartition -> {
+          List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>();
+          clusteringOpsPartition.forEachRemaining(clusteringOp -> {
+            try {
+              Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
+              HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(clusteringOp.getDataFilePath()));
+              iteratorsForPartition.add(baseFileReader.getRecordIterator(readerSchema));
+            } catch (IOException e) {
+              throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
+                  + " and " + clusteringOp.getDeltaFilePaths(), e);
+            }
+          });
+
+          return new ConcatenatingIterator<>(iteratorsForPartition);
+        })
+        .map(record -> transform(record, writeConfig));
   }
 
   /**
@@ -279,12 +285,12 @@ private JavaRDD<WriteStatus>[] convertStreamToArray(Stream<JavaRDD<WriteStatus>>
   /**
    * Transform IndexedRecord into HoodieRecord.
    */
-  private HoodieRecord<T> transform(IndexedRecord indexedRecord) {
+  private static <T> HoodieRecord<T> transform(IndexedRecord indexedRecord, HoodieWriteConfig writeConfig) {
     GenericRecord record = (GenericRecord) indexedRecord;
     Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
-    if (!getWriteConfig().populateMetaFields()) {
+    if (!writeConfig.populateMetaFields()) {
       try {
-        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(getWriteConfig().getProps())));
+        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps()));
       } catch (IOException e) {
         throw new HoodieIOException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
       }
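The NOTE added in the readRecordsForGroupBaseFiles hunk is the core of the refactor: a Java lambda that calls an instance method such as getWriteConfig() or getHoodieTable() captures the enclosing this, so Spark must serialize the whole execution-strategy object, nested fields included, when shipping the mapPartitions and map closures to executors. Hoisting what the closures need into locals (writeConfig plus a SerializableConfiguration wrapper around the Hadoop configuration) and making transform a static method means only those values are captured. The sketch below reproduces the pitfall and the fix outside Hudi; the class, field, and method names are invented for illustration.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class ClosureCaptureSketch {

  // Stands in for a non-serializable member such as a Hadoop Configuration or a table handle.
  private final Object nonSerializableHandle = new Object();

  private String getSuffix() {
    return "-suffix";
  }

  // Problematic: the lambda calls an instance method, so it captures `this`. Spark then
  // tries to serialize the whole ClosureCaptureSketch instance (including
  // nonSerializableHandle) and fails with "Task not serializable".
  public JavaRDD<String> capturesThis(JavaSparkContext jsc) {
    return jsc.parallelize(Arrays.asList("a", "b"))
        .map(value -> value + getSuffix());
  }

  // Fixed: copy what the closure needs into a local first; only that String is captured.
  public JavaRDD<String> capturesLocalOnly(JavaSparkContext jsc) {
    String suffix = getSuffix();
    return jsc.parallelize(Arrays.asList("a", "b"))
        .map(value -> value + suffix);
  }
}

Both methods compile, but only the second survives Spark's closure-serialization check, which is the same reason the patch hoists hadoopConf and writeConfig out of the lambdas and passes writeConfig into the now-static transform.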