Closed · Changes from all commits · 28 commits
6231606 · Replicated `HoodieRecordPayload` methods into `HoodieRecord` (tempora… · Feb 4, 2022
7585733 · Separated out decl and impl of the APIs b/w `HoodieRecord` and `Hoodi… · Feb 4, 2022
3d20b4e · Added new APIs to `HoodieRecord` to make it drop-in replacement for `… · Feb 5, 2022
2555085 · `HoodieFileWriter` > `HoodieAvroFileWriter` · Feb 5, 2022
01a6382 · Removed generics from `HoodieAvroFileWriter` · Feb 5, 2022
31a3548 · Added generic `HoodieFileWriter` to operate on `HoodieRecord`; · Feb 5, 2022
43bda22 · Rebased `HoodieAvroFileWriter` to inherit from `HoodieRecordFileWriter` · Feb 5, 2022
8e338e1 · Missing `SkipHoodieRecordPayload` impl · Feb 7, 2022
5af245f · Missing license · Feb 7, 2022
2e15d44 · `lint` · Feb 7, 2022
536f18d · Fixing compilation · Feb 7, 2022
4cdaeb5 · `HoodieFileReader` > `HoodieAvroFileReader` · Feb 8, 2022
0105f4e · Removed generic arguments for `HoodieAvroFileReader`; · Feb 8, 2022
34ca461 · Cleaned up `MergeHelper`s to rely on `HoodieAvroUtils.rewriteRecord` · Feb 8, 2022
65d9360 · Added `HoodieFileReader` interface; · Feb 8, 2022
e08ebbd · Rebased `HoodieFileWriterFactory` to return `HoodieFileWriter` · Feb 5, 2022
c289410 · Rebased `HoodieWriteHandle` to operate on `HoodieRecord` instead of Avro · Feb 5, 2022
09a70f6 · Rebased `HoodieCreateHandle` onto using `HoodieRecord` instead of Avr… · Feb 5, 2022
79dde16 · Rebased `HoodieDataBlock`s impls to operate on `HoodieRecord` instead … · Feb 7, 2022
2b36199 · Make `HoodieParquetStreamWriter` implement `HoodieAvroFileWriter` · Feb 7, 2022
ad5133a · Fixed `AbstractHoodieLogRecordReader` · Feb 7, 2022
91992b3 · Rebased `HoodieAppendHandle` to operate on `HoodieRecord` · Feb 7, 2022
2a056b7 · Moved `HoodieAvroFileWriter` to "hudi-common" · Feb 7, 2022
71ecb31 · Moved Writer's interfaces under "storage.io" package · Feb 8, 2022
a3bcff0 · Moved `HoodieRecordMapper` into `HoodieRecord` · Feb 8, 2022
00f9241 · Rebased `MergeHelper`s to operate on `HoodieRecord` · Feb 8, 2022
d92d55f · Tidying up · Feb 8, 2022
1cdae64 · Rebased Clustering Execution Strategies to operate on `HoodieRecord`; · Feb 8, 2022
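Taken together, these commits shift Hudi's write path from pre-converting every `HoodieRecordPayload` into Avro toward passing `HoodieRecord` (along with its schema and properties) through readers, writers, and handles, with Avro-specific classes such as `HoodieAvroFileWriter` and `HoodieAvroFileReader` split out behind format-agnostic interfaces. A minimal sketch of that shift in writer signatures, using hypothetical simplified interfaces rather than Hudi's actual declarations:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

import java.util.Properties;

// Hypothetical, heavily simplified shapes illustrating the direction of the
// refactoring; the real Hudi interfaces carry much more context.
interface AvroCentricWriter {
  // Before: writers accepted pre-converted Avro, so every caller paid the
  // payload -> IndexedRecord conversion up front.
  void writeAvro(String key, IndexedRecord avroRecord);
}

interface RecordCentricWriter<R> {
  // After: writers accept the HoodieRecord-like type directly, together with
  // the schema/props needed to materialize bytes only when required.
  void write(R record, Schema schema, Properties props);
}
```

The file diffs below show the concrete instances of this shift.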
SparkHelpers.scala
@@ -18,17 +18,15 @@
 package org.apache.hudi.cli

 import org.apache.avro.Schema
-import org.apache.avro.generic.IndexedRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.avro.HoodieAvroWriteSupport
 import org.apache.hudi.client.SparkTaskContextSupplier
 import org.apache.hudi.common.HoodieJsonPayload
 import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
 import org.apache.hudi.common.util.BaseFileUtils
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
-import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieParquetWriter}
+import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieAvroParquetWriter}
 import org.apache.parquet.avro.AvroSchemaConverter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -50,13 +48,13 @@ object SparkHelpers {
     // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
     parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)

-    val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier(),
+    val writer = new HoodieAvroParquetWriter(instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier(),
       true)
     for (rec <- sourceRecords) {
       val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
       if (!keysToSkip.contains(key)) {

-        writer.writeAvro(key, rec)
+        writer.write(key, rec)
       }
     }
     writer.close
MergingIterator.java
@@ -18,20 +18,18 @@

 package org.apache.hudi.client.utils;

-import java.util.Iterator;
-import java.util.function.Function;
-import org.apache.avro.generic.GenericRecord;
-
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;

-public class MergingIterator<T extends GenericRecord> implements Iterator<T> {
+import java.util.Iterator;
+import java.util.function.BiFunction;
+
+public class MergingIterator<T> implements Iterator<T> {

   private final Iterator<T> leftIterator;
   private final Iterator<T> rightIterator;
-  private final Function<Pair<T,T>, T> mergeFunction;
+  private final BiFunction<T, T, T> mergeFunction;

-  public MergingIterator(Iterator<T> leftIterator, Iterator<T> rightIterator, Function<Pair<T,T>, T> mergeFunction) {
+  public MergingIterator(Iterator<T> leftIterator, Iterator<T> rightIterator, BiFunction<T, T, T> mergeFunction) {
     this.leftIterator = leftIterator;
     this.rightIterator = rightIterator;
     this.mergeFunction = mergeFunction;
@@ -47,6 +45,6 @@ public boolean hasNext() {

   @Override
   public T next() {
-    return mergeFunction.apply(Pair.of(leftIterator.next(), rightIterator.next()));
+    return mergeFunction.apply(leftIterator.next(), rightIterator.next());
   }
 }
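With `Pair` gone, callers now supply a plain `BiFunction` that merges the two iterators element-wise. A minimal usage sketch; the element type and merge logic here are made up for illustration:

```java
import org.apache.hudi.client.utils.MergingIterator;

import java.util.Arrays;
import java.util.Iterator;
import java.util.function.BiFunction;

public class MergingIteratorExample {
  public static void main(String[] args) {
    // Two equally-sized iterators, e.g. old and new versions of the same rows.
    Iterator<String> left = Arrays.asList("a", "b").iterator();
    Iterator<String> right = Arrays.asList("1", "2").iterator();

    // Element-wise merge; before this change it would have been typed as
    // Function<Pair<String, String>, String>.
    BiFunction<String, String, String> merge = (l, r) -> l + ":" + r;

    MergingIterator<String> merged = new MergingIterator<>(left, right, merge);
    while (merged.hasNext()) {
      System.out.println(merged.next()); // prints "a:1" then "b:2"
    }
  }
}
```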
CopyOnWriteInsertHandler.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.execution;

 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -69,8 +70,8 @@ public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime,

   @Override
   public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
-    final HoodieRecord insertPayload = payload.record;
-    String partitionPath = insertPayload.getPartitionPath();
+    final HoodieRecord record = payload.record;
+    String partitionPath = record.getPartitionPath();
     HoodieWriteHandle<?,?,?,?> handle = handles.get(partitionPath);
     if (handle == null) {
       // If the records are sorted, this means that we encounter a new partition path
@@ -81,7 +82,7 @@ public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
       }
       // Lazily initialize the handle, for the first time
       handle = writeHandleFactory.create(config, instantTime, hoodieTable,
-          insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
+          record.getPartitionPath(), idPrefix, taskContextSupplier);
       handles.put(partitionPath, handle);
     }
@@ -90,10 +91,10 @@ public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
       statuses.addAll(handle.close());
       // Open new handle
       handle = writeHandleFactory.create(config, instantTime, hoodieTable,
-          insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
+          record.getPartitionPath(), idPrefix, taskContextSupplier);
       handles.put(partitionPath, handle);
     }
-    handle.write(insertPayload, payload.insertValue, payload.exception);
+    handle.write(record, payload.schema, new TypedProperties(payload.props));
   }

   @Override
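The handle-per-partition bookkeeping around the changed lines is untouched by this diff: a handle is opened lazily the first time a partition path is seen, and a full handle is closed and replaced. A standalone sketch of that caching pattern, using a made-up minimal `Handle` type rather than Hudi's `HoodieWriteHandle`:

```java
import java.util.HashMap;
import java.util.Map;

// Made-up minimal handle, just to show the per-partition caching pattern.
class Handle {
  private int written = 0;
  private static final int MAX_RECORDS = 2; // tiny cap for illustration

  boolean canWrite() { return written < MAX_RECORDS; }
  void write(String record) { written++; }
  void close() { /* flush the underlying file */ }
}

public class PerPartitionHandles {
  private final Map<String, Handle> handles = new HashMap<>();

  void consume(String partitionPath, String record) {
    // Lazily initialize the handle the first time a partition is seen.
    Handle handle = handles.computeIfAbsent(partitionPath, p -> new Handle());
    if (!handle.canWrite()) {
      // Handle is full: close it and open a fresh one for the same partition.
      handle.close();
      handle = new Handle();
      handles.put(partitionPath, handle);
    }
    handle.write(record);
  }
}
```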
HoodieLazyInsertIterable.java
@@ -18,21 +18,18 @@

 package org.apache.hudi.execution;

+import org.apache.avro.Schema;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.LazyIterableIterator;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.table.HoodieTable;

-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
@@ -79,18 +76,14 @@ public HoodieLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr, boolean are

   // Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
   public static class HoodieInsertValueGenResult<T extends HoodieRecord> {
-    public T record;
-    public Option<IndexedRecord> insertValue;
-    // It caches the exception seen while fetching insert value.
-    public Option<Exception> exception = Option.empty();
+    public final T record;
+    public final Schema schema;
+    public final Properties props;

     public HoodieInsertValueGenResult(T record, Schema schema, Properties properties) {
       this.record = record;
-      try {
-        this.insertValue = ((HoodieRecordPayload) record.getData()).getInsertValue(schema, properties);
-      } catch (Exception e) {
-        this.exception = Option.of(e);
-      }
+      this.schema = schema;
+      this.props = properties;
     }
   }
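The net effect of this change: the buffering (producer) thread no longer calls `getInsertValue` and no longer caches a conversion exception; it just packages record, schema, and props, so conversion cost and any conversion failure now surface where the consumer calls `handle.write(...)`. A simplified, illustrative stand-in for the new result type (names and types are not Hudi's):

```java
import org.apache.avro.Schema;

import java.util.Properties;

// Simplified stand-in for the new HoodieInsertValueGenResult, for illustration.
final class GenResult<R> {
  final R record;
  final Schema schema;
  final Properties props;

  GenResult(R record, Schema schema, Properties props) {
    // No getInsertValue() call here anymore: the producer thread only packages
    // what the write handle needs, deferring Avro materialization to the
    // consumer side of the queue.
    this.record = record;
    this.schema = schema;
    this.props = props;
  }
}
```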
@@ -31,7 +31,7 @@
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIndexException;
-import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
@@ -132,7 +132,7 @@ public static List<String> filterKeysFromFile(Path filePath, List<String> candid
     // Load all rowKeys from the file, to double-confirm
     if (!candidateRecordKeys.isEmpty()) {
       HoodieTimer timer = new HoodieTimer().startTimer();
-      HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
+      HoodieAvroFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
       Set<String> fileRowKeys = fileReader.filterRowKeys(new TreeSet<>(candidateRecordKeys));
       foundRecordKeys.addAll(fileRowKeys);
       LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
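For context, the code above looks like the verification half of an index key lookup: a probabilistic structure (such as a bloom filter, which can yield false positives but never false negatives) proposes candidate keys, and the file's actual row keys "double-confirm" them, as the comment says. A self-contained sketch of that probe-then-verify step, with simplified types rather than the Hudi API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class ProbeThenVerify {
  /** Keep only the candidates that actually exist in the file's key set. */
  static List<String> confirm(List<String> candidateKeys, Set<String> fileRowKeys) {
    List<String> found = new ArrayList<>();
    for (String key : new TreeSet<>(candidateKeys)) { // sorted, de-duplicated probe set
      if (fileRowKeys.contains(key)) {
        found.add(key); // bloom-filter hit confirmed against real data
      }
    }
    return found;
  }
}
```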