@@ -24,7 +24,9 @@
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -81,7 +83,7 @@ public String showArchivedCommits(
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
blk.getRecordIterator().forEachRemaining(readRecords::add);
blk.getRecordIterator(HoodieAvroIndexedRecord::new).forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
}
List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
.filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
@@ -155,8 +157,8 @@ public String showCommits(
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
recordItr.forEachRemaining(readRecords::add);
try (ClosableIterator<HoodieRecord> recordItr = blk.getRecordIterator(HoodieAvroIndexedRecord::new)) {
recordItr.forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
}
}
List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
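The hunks above share one migration: HoodieAvroDataBlock#getRecordIterator now takes a wrapper constructor (here HoodieAvroIndexedRecord::new) and yields HoodieRecord objects, so call sites unwrap the Avro payload themselves via getData(). A minimal read-side sketch of that pattern; the helper class is illustrative and not part of this PR, and the ClosableIterator import path is assumed from this commit:

```java
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.util.ClosableIterator;

import java.util.ArrayList;
import java.util.List;

// Illustrative helper, not part of the PR: drain one Avro data block into plain IndexedRecords.
public final class AvroBlockReadSketch {
  static List<IndexedRecord> readBlock(HoodieAvroDataBlock blk) {
    List<IndexedRecord> readRecords = new ArrayList<>();
    // The iterator now yields HoodieRecord wrappers; HoodieAvroIndexedRecord::new tells the block
    // how to wrap each decoded Avro record.
    try (ClosableIterator<HoodieRecord> itr = blk.getRecordIterator(HoodieAvroIndexedRecord::new)) {
      itr.forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
    }
    return readRecords;
  }
}
```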

@@ -25,7 +25,9 @@
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -124,9 +126,9 @@ private int copyArchivedInstants(List<FileStatus> statuses, Set<String> actionSe
// read the avro blocks
while (reader.hasNext() && copyCount < limit) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
try (ClosableIterator<HoodieRecord> recordItr = blk.getRecordIterator(HoodieAvroIndexedRecord::new)) {
while (recordItr.hasNext()) {
IndexedRecord ir = recordItr.next();
IndexedRecord ir = (IndexedRecord) recordItr.next().getData();
// Archived instants are saved as avro encoded HoodieArchivedMetaEntry records. We need to get the
// metadata record from the entry and convert it to json.
HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()
@@ -157,7 +159,7 @@ private int copyArchivedInstants(List<FileStatus> statuses, Set<String> actionSe
default:
throw new HoodieException("Unknown type of action " + action);
}

final String instantTime = archiveEntryRecord.get("commitTime").toString();
final String outPath = localFolder + Path.SEPARATOR + instantTime + "." + action;
writeToFile(outPath, HoodieAvroUtils.avroToJson(metadata, true));

@@ -24,9 +24,9 @@
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -61,6 +61,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

@@ -124,7 +125,7 @@ public String showLogFileCommits(
instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
}
if (n instanceof HoodieDataBlock) {
try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) n).getRecordIterator()) {
try (ClosableIterator<HoodieRecord> recordItr = ((HoodieDataBlock) n).getRecordIterator(HoodieAvroIndexedRecord::new)) {
recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
}
}
@@ -221,8 +222,8 @@ public String showLogFileRecords(
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.build();
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
for (HoodieRecord hoodieRecord : scanner) {
Option<IndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
if (allRecords.size() < limit) {
allRecords.add(record.get());
}
@@ -238,10 +239,10 @@ public String showLogFileRecords(
HoodieLogBlock n = reader.next();
if (n instanceof HoodieDataBlock) {
HoodieDataBlock blk = (HoodieDataBlock) n;
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
try (ClosableIterator<HoodieRecord> recordItr = blk.getRecordIterator(HoodieAvroIndexedRecord::new)) {
recordItr.forEachRemaining(record -> {
if (allRecords.size() < limit) {
allRecords.add(record);
allRecords.add((IndexedRecord) record.getData());
}
});
}
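The scanner loop above switches from the payload-specific getData().getInsertValue(schema) to the engine-neutral HoodieRecord#toIndexedRecord(schema, props). A minimal sketch of that read path, assuming a HoodieMergedLogRecordScanner built as in the hunk; the helper name is illustrative, and the IOException declaration is a conservative assumption based on how toIndexedRecord is handled in the file-slice reader later in this diff:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Illustrative helper, not part of the PR: collect up to `limit` merged records as Avro records.
public final class MergedRecordReadSketch {
  static List<IndexedRecord> collectMerged(HoodieMergedLogRecordScanner scanner,
                                           Schema readerSchema,
                                           int limit) throws IOException {
    List<IndexedRecord> allRecords = new ArrayList<>();
    for (HoodieRecord hoodieRecord : scanner) {
      // Replaces hoodieRecord.getData().getInsertValue(readerSchema); works for any HoodieRecord
      // implementation, not just payload-backed Avro records.
      Option<IndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
      if (record.isPresent() && allRecords.size() < limit) {
        allRecords.add(record.get());
      }
    }
    return allRecords;
  }
}
```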

@@ -18,12 +18,10 @@
package org.apache.hudi.cli

import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroWriteSupport
import org.apache.hudi.client.SparkTaskContextSupplier
import org.apache.hudi.common.HoodieJsonPayload
import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.util.BaseFileUtils
@@ -50,7 +48,7 @@ object SparkHelpers {
// Add the current class loader to the config, otherwise it will throw ClassNotFoundException for 'HoodieWrapperFileSystem'.
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)

val writer = new HoodieAvroParquetWriter[IndexedRecord](destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
val writer = new HoodieAvroParquetWriter(destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
for (rec <- sourceRecords) {
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
if (!keysToSkip.contains(key)) {

@@ -27,9 +27,10 @@
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
@@ -104,7 +105,7 @@ public void init() throws IOException, InterruptedException, URISyntaxException
.withFileId("test-log-fileid1").overBaseCommit("100").withFs(fs).build()) {

// write data to file
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
List<HoodieRecord> records = SchemaTestUtil.generateTestRecords(0, 100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
@@ -184,7 +185,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-log-fileid1").overBaseCommit(INSTANT_TIME).withFs(fs).withSizeThreshold(500).build();

List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<HoodieRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -223,12 +224,12 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.build();

Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();
Iterator<HoodieRecord> records = scanner.iterator();
int num = 0;
int maxSize = 10;
List<IndexedRecord> indexRecords = new ArrayList<>();
while (records.hasNext() && num < maxSize) {
Option<IndexedRecord> hoodieRecord = records.next().getData().getInsertValue(schema);
Option<IndexedRecord> hoodieRecord = ((HoodieAvroRecord)records.next()).getData().getInsertValue(schema);
indexRecords.add(hoodieRecord.get());
num++;
}

@@ -19,7 +19,6 @@

package org.apache.hudi.client;

import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;

import java.io.IOException;
@@ -28,7 +27,7 @@
/**
* Client will run one round of clustering.
*/
public abstract class BaseClusterer<T extends HoodieRecordPayload, I, K, O> implements Serializable {
public abstract class BaseClusterer<T, I, K, O> implements Serializable {

private static final long serialVersionUID = 1L;


@@ -18,7 +18,6 @@

package org.apache.hudi.client;

import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;

import java.io.IOException;
@@ -27,7 +26,7 @@
/**
* Run one round of compaction.
*/
public abstract class BaseCompactor<T extends HoodieRecordPayload, I, K, O> implements Serializable {
public abstract class BaseCompactor<T, I, K, O> implements Serializable {

private static final long serialVersionUID = 1L;


@@ -43,7 +43,6 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
@@ -124,7 +123,7 @@
* @param <K> Type of keys
* @param <O> Type of outputs
*/
public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieClient
public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
implements RunsTableService {

protected static final String LOOKUP_STR = "lookup";

@@ -30,8 +30,10 @@
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -339,7 +341,7 @@ public void mergeArchiveFiles(List<FileStatus> compactCandidate) throws IOExcept
// Read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
blk.getRecordIterator().forEachRemaining(records::add);
blk.getRecordIterator(HoodieAvroIndexedRecord::new).forEachRemaining(r -> records.add((IndexedRecord) r.getData()));
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records);
}
@@ -646,7 +648,8 @@ private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) thro
Map<HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
final String keyField = table.getMetaClient().getTableConfig().getRecordKeyFieldProp();
HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header, keyField);
List<HoodieRecord> indexRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, header, keyField);
writer.appendBlock(block);
records.clear();
}
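On the write side, HoodieAvroDataBlock now takes a List<HoodieRecord>, so plain Avro records are wrapped with HoodieAvroIndexedRecord first, exactly as mergeArchiveFiles and writeToFile do above. A condensed sketch of that wrapping step; the helper class is illustrative, and header/keyField are assumed to be prepared as in writeToFile:

```java
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative helper, not part of the PR: build an Avro data block from plain Avro records.
public final class ArchiveBlockWriteSketch {
  static HoodieAvroDataBlock toDataBlock(List<IndexedRecord> records,
                                         Map<HeaderMetadataType, String> header,
                                         String keyField) {
    // Wrap each Avro record in the new HoodieRecord abstraction before handing it to the block.
    List<HoodieRecord> hoodieRecords = records.stream()
        .map(HoodieAvroIndexedRecord::new)
        .collect(Collectors.toList());
    return new HoodieAvroDataBlock(hoodieRecords, header, keyField);
  }
}
```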

@@ -18,20 +18,18 @@

package org.apache.hudi.client.utils;

import java.util.Iterator;
import java.util.function.Function;
import org.apache.avro.generic.GenericRecord;

import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;

public class MergingIterator<T extends GenericRecord> implements Iterator<T> {
import java.util.Iterator;
import java.util.function.BiFunction;

public class MergingIterator<T> implements Iterator<T> {

private final Iterator<T> leftIterator;
private final Iterator<T> rightIterator;
private final Function<Pair<T,T>, T> mergeFunction;
private final BiFunction<T, T, T> mergeFunction;

public MergingIterator(Iterator<T> leftIterator, Iterator<T> rightIterator, Function<Pair<T,T>, T> mergeFunction) {
public MergingIterator(Iterator<T> leftIterator, Iterator<T> rightIterator, BiFunction<T, T, T> mergeFunction) {
this.leftIterator = leftIterator;
this.rightIterator = rightIterator;
this.mergeFunction = mergeFunction;
@@ -47,6 +45,6 @@ public boolean hasNext() {

@Override
public T next() {
return mergeFunction.apply(Pair.of(leftIterator.next(), rightIterator.next()));
return mergeFunction.apply(leftIterator.next(), rightIterator.next());
}
}
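MergingIterator drops both the GenericRecord bound on T and the per-element Pair allocation; the merge function is now an ordinary BiFunction. A self-contained usage sketch (the Integer element type is only possible because the GenericRecord bound is gone):

```java
import org.apache.hudi.client.utils.MergingIterator;

import java.util.Arrays;
import java.util.Iterator;

public class MergingIteratorSketch {
  public static void main(String[] args) {
    Iterator<Integer> left = Arrays.asList(1, 2, 3).iterator();
    Iterator<Integer> right = Arrays.asList(10, 20, 30).iterator();
    // Before this change the third argument was a Function<Pair<T, T>, T>; now it is a BiFunction<T, T, T>.
    MergingIterator<Integer> merged = new MergingIterator<>(left, right, Integer::sum);
    merged.forEachRemaining(System.out::println); // prints 11, 22, 33
  }
}
```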

@@ -20,13 +20,12 @@
package org.apache.hudi.common.table.log;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieAvroFileReader;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -38,28 +37,28 @@
/**
* Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice.
*/
public class HoodieFileSliceReader<T extends HoodieRecordPayload> implements Iterator<HoodieRecord<T>> {
public class HoodieFileSliceReader<T> implements Iterator<HoodieRecord<T>> {
private final Iterator<HoodieRecord<T>> recordsIterator;

public static HoodieFileSliceReader getFileSliceReader(
Option<HoodieFileReader> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
Option<HoodieAvroFileReader> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
String preCombineField, Option<Pair<String, String>> simpleKeyGenFieldsOpt) throws IOException {
if (baseFileReader.isPresent()) {
Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
while (baseIterator.hasNext()) {
GenericRecord record = (GenericRecord) baseIterator.next();
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = transform(
HoodieRecord hoodieRecord = transform(
record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
scanner.processNextRecord(hoodieRecord);
}
return new HoodieFileSliceReader(scanner.iterator());
} else {
Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator();
Iterable<HoodieRecord> iterable = () -> scanner.iterator();
HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder().withPayloadOrderingField(preCombineField).build();
return new HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false)
.map(e -> {
try {
GenericRecord record = (GenericRecord) e.getData().getInsertValue(schema, payloadConfig.getProps()).get();
GenericRecord record = (GenericRecord) e.toIndexedRecord(schema, payloadConfig.getProps()).get();
return transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
} catch (IOException io) {
throw new HoodieIOException("Error while creating reader for file slice with no base file.", io);
@@ -68,9 +67,11 @@ public static HoodieFileSliceReader getFileSliceReader(
}
}

private static HoodieRecord<? extends HoodieRecordPayload> transform(
GenericRecord record, HoodieMergedLogRecordScanner scanner, String payloadClass,
String preCombineField, Option<Pair<String, String>> simpleKeyGenFieldsOpt) {
private static HoodieRecord transform(GenericRecord record,
HoodieMergedLogRecordScanner scanner,
String payloadClass,
String preCombineField,
Option<Pair<String, String>> simpleKeyGenFieldsOpt) {
return simpleKeyGenFieldsOpt.isPresent()
? SpillableMapUtils.convertToHoodieRecordPayload(record,
payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField(), Option.empty())
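Finally, HoodieFileSliceReader keeps its public shape but is typed against the new abstractions: the base-file reader is an Option<HoodieAvroFileReader>, and log records are materialized through toIndexedRecord. A minimal log-only invocation sketch, assuming a scanner and schema prepared as elsewhere in this PR; the helper class, payload class, and precombine field are placeholders, not values taken from this diff:

```java
import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieFileSliceReader;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.storage.HoodieAvroFileReader;

import java.io.IOException;

// Illustrative only: iterate a file slice that has no base file, so every record comes from the scanner.
public final class FileSliceReadSketch {
  static void printMergedKeys(HoodieMergedLogRecordScanner scanner, Schema schema) throws IOException {
    HoodieFileSliceReader<?> reader = HoodieFileSliceReader.getFileSliceReader(
        Option.<HoodieAvroFileReader>empty(), scanner, schema,
        "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload", // placeholder payload class
        "ts",                                                          // placeholder precombine field
        Option.empty());
    while (reader.hasNext()) {
      HoodieRecord<?> record = reader.next();
      System.out.println(record.getRecordKey());
    }
  }
}
```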