@@ -24,9 +24,9 @@
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -83,7 +83,7 @@ public String showArchivedCommits(
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
blk.getRecordIterator(HoodieAvroIndexedRecord::new).forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
}
List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
.filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
@@ -157,8 +157,8 @@ public String showCommits(
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
try (ClosableIterator<HoodieRecord> recordItr = blk.getRecordIterator(HoodieAvroIndexedRecord::new)) {
recordItr.forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
recordItr.forEachRemaining(r -> readRecords.add(r.getData()));
}
}
List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
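For reviewers skimming the CLI changes: the pattern that replaces the old HoodieAvroIndexedRecord::new factory is the type-tagged iterator used above. A minimal sketch of the new shape (blk and readRecords are assumed to be in scope, as in the command; this is not a drop-in excerpt from the PR):

// Type-tagged iteration over an Avro data block (sketch only).
try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr =
    blk.getRecordIterator(HoodieRecordType.AVRO)) {
  // Each HoodieRecord exposes its IndexedRecord payload via getData(), so no cast is needed.
  recordItr.forEachRemaining(r -> readRecords.add(r.getData()));
}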
@@ -25,9 +25,9 @@
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -125,9 +125,9 @@ private int copyArchivedInstants(List<FileStatus> statuses, Set<String> actionSe
// read the avro blocks
while (reader.hasNext() && copyCount < limit) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
try (ClosableIterator<HoodieRecord> recordItr = blk.getRecordIterator(HoodieAvroIndexedRecord::new)) {
try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
while (recordItr.hasNext()) {
IndexedRecord ir = (IndexedRecord) recordItr.next().getData();
IndexedRecord ir = recordItr.next().getData();
// Archived instants are saved as avro encoded HoodieArchivedMetaEntry records. We need to get the
// metadata record from the entry and convert it to json.
HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()
@@ -25,8 +25,10 @@
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -38,6 +40,7 @@
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
@@ -125,7 +128,7 @@ public String showLogFileCommits(
instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
}
if (n instanceof HoodieDataBlock) {
try (ClosableIterator<HoodieRecord> recordItr = ((HoodieDataBlock) n).getRecordIterator(HoodieAvroIndexedRecord::new)) {
try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = ((HoodieDataBlock) n).getRecordIterator(HoodieRecordType.AVRO)) {
recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
}
}
@@ -221,11 +224,12 @@ public String showLogFileRecords(
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();
for (HoodieRecord hoodieRecord : scanner) {
Option<IndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
Option<HoodieAvroIndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
if (allRecords.size() < limit) {
allRecords.add(record.get());
allRecords.add(record.get().getData());
}
}
} else {
@@ -239,10 +243,10 @@
HoodieLogBlock n = reader.next();
if (n instanceof HoodieDataBlock) {
HoodieDataBlock blk = (HoodieDataBlock) n;
try (ClosableIterator<HoodieRecord> recordItr = blk.getRecordIterator(HoodieAvroIndexedRecord::new)) {
try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
recordItr.forEachRemaining(record -> {
if (allRecords.size() < limit) {
allRecords.add((IndexedRecord) record.getData());
allRecords.add(record.getData());
}
});
}
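Two behavioural points in this file are worth flagging for reviewers: hoodieRecord.toIndexedRecord(...) now returns an Option of HoodieAvroIndexedRecord, so the payload is unwrapped with getData(), and the merged-log scan path must supply a record merger explicitly (the .withRecordMerger(...) line added above). A minimal sketch of obtaining the default Avro merger, assuming HoodieRecordUtils.loadRecordMerger returns the HoodieRecordMerger interface:

// Load the default Avro merger reflectively, then pass it to the scanner builder
// via .withRecordMerger(merger); the other builder options stay as configured above.
HoodieRecordMerger merger =
    HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());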
@@ -23,14 +23,14 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroWriteSupport
import org.apache.hudi.client.SparkTaskContextSupplier
import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.util.BaseFileUtils
import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.{DataFrame, SQLContext}

import scala.collection.JavaConversions._
import scala.collection.mutable._

@@ -29,6 +29,7 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -38,6 +39,7 @@
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
@@ -222,6 +224,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.build();

Iterator<HoodieRecord> records = scanner.iterator();
@@ -32,6 +32,7 @@
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -344,7 +345,7 @@ public void mergeArchiveFiles(List<FileStatus> compactCandidate) throws IOExcept
// Read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
blk.getRecordIterator(HoodieAvroIndexedRecord::new).forEachRemaining(r -> records.add((IndexedRecord) r.getData()));
blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> records.add((IndexedRecord) r.getData()));
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records);
}
@@ -23,6 +23,7 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -49,8 +50,9 @@ public FullRecordBootstrapDataProvider(TypedProperties props, HoodieEngineContex
* @param tableName Hudi Table Name
* @param sourceBasePath Source Base Path
* @param partitionPaths Partition Paths
* @param config Hudi Write Config
* @return input records
*/
public abstract I generateInputRecords(String tableName,
String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths);
String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths, HoodieWriteConfig config);
}
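Note for downstream implementations: the abstract-method change above is source-incompatible, so existing providers have to add the new HoodieWriteConfig parameter. A hedged sketch of an updated override follows; the class name, the List-typed I, and the stub body are illustrative, only the signature comes from the diff:

// Illustrative provider; only the generateInputRecords signature is taken from this PR.
public class ExampleBootstrapDataProvider extends FullRecordBootstrapDataProvider<List<HoodieRecord>> {

  public ExampleBootstrapDataProvider(TypedProperties props, HoodieEngineContext context) {
    super(props, context);
  }

  @Override
  public List<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath,
      List<Pair<String, List<HoodieFileStatus>>> partitionPaths, HoodieWriteConfig config) {
    // The added HoodieWriteConfig is available here (e.g. for schema or key-generator settings).
    throw new UnsupportedOperationException("illustrative stub");
  }
}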
@@ -21,62 +21,32 @@

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReader;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;
import java.util.Iterator;
import java.util.stream.StreamSupport;
import java.util.Properties;

/**
* Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice.
*/
public class HoodieFileSliceReader<T> implements Iterator<HoodieRecord<T>> {

private final Iterator<HoodieRecord<T>> recordsIterator;

public static HoodieFileSliceReader getFileSliceReader(
Option<HoodieAvroFileReader> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
String preCombineField, Option<Pair<String, String>> simpleKeyGenFieldsOpt) throws IOException {
Option<HoodieFileReader> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, Properties props, Option<Pair<String, String>> simpleKeyGenFieldsOpt) throws IOException {
if (baseFileReader.isPresent()) {
Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
Iterator<HoodieRecord> baseIterator = baseFileReader.get().getRecordIterator(schema);
while (baseIterator.hasNext()) {
GenericRecord record = (GenericRecord) baseIterator.next();
HoodieRecord hoodieRecord = transform(
record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
scanner.processNextRecord(hoodieRecord);
scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionName(), false));
}
return new HoodieFileSliceReader(scanner.iterator());
} else {
Iterable<HoodieRecord> iterable = () -> scanner.iterator();
HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder().withPayloadOrderingField(preCombineField).build();
return new HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false)
.map(e -> {
try {
GenericRecord record = (GenericRecord) e.toIndexedRecord(schema, payloadConfig.getProps()).get();
return transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
} catch (IOException io) {
throw new HoodieIOException("Error while creating reader for file slice with no base file.", io);
}
}).iterator());
}
}

private static HoodieRecord transform(GenericRecord record,
HoodieMergedLogRecordScanner scanner,
String payloadClass,
String preCombineField,
Option<Pair<String, String>> simpleKeyGenFieldsOpt) {
return simpleKeyGenFieldsOpt.isPresent()
? SpillableMapUtils.convertToHoodieRecordPayload(record,
payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField(), Option.empty())
: SpillableMapUtils.convertToHoodieRecordPayload(record,
payloadClass, preCombineField, scanner.isWithOperationField(), scanner.getPartitionName());
return new HoodieFileSliceReader(scanner.iterator());
}

private HoodieFileSliceReader(Iterator<HoodieRecord<T>> recordsItr) {
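Call-site impact of the rework above: getFileSliceReader now takes an Option<HoodieFileReader> and a Properties bag instead of a payload class name and precombine field. A hedged sketch of the new invocation (baseFileReader, scanner and readerSchema are assumed to be in scope; the empty key-gen option is illustrative):

// Sketch of the reworked factory call; argument roles are noted inline.
Properties props = new Properties();   // payload / merge related properties
HoodieFileSliceReader fileSliceReader = HoodieFileSliceReader.getFileSliceReader(
    baseFileReader,                    // Option<HoodieFileReader> over the base file, possibly empty
    scanner,                           // HoodieMergedLogRecordScanner over the slice's log files
    readerSchema,                      // Avro Schema used for reading
    props,
    Option.empty());                   // simple key-gen fields, when not required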
@@ -22,7 +22,6 @@
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.model.HoodieAvroRecordMerge;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
@@ -120,12 +119,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
+ "compaction during each compaction run. By default. Hudi picks the log file "
+ "with most accumulated unmerged data");

public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty
.key("hoodie.compaction.merge.class")
.defaultValue(HoodieAvroRecordMerge.class.getName())
.withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord "
+ "types, such as Spark records or Flink records.");

public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE = ConfigProperty
.key("hoodie.compaction.lazy.block.read")
.defaultValue("true")
@@ -359,11 +352,6 @@ public Builder withCompactionStrategy(CompactionStrategy compactionStrategy) {
return this;
}

public Builder withMergeClass(String mergeClass) {
compactionConfig.setValue(MERGE_CLASS_NAME, mergeClass);
return this;
}

public Builder withTargetIOPerCompactionInMB(long targetIOPerCompactionInMB) {
compactionConfig.setValue(TARGET_IO_PER_COMPACTION_IN_MB, String.valueOf(targetIOPerCompactionInMB));
return this;