@@ -18,17 +18,15 @@
package org.apache.hudi.cli

import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroWriteSupport
import org.apache.hudi.client.SparkTaskContextSupplier
import org.apache.hudi.common.HoodieJsonPayload
import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.util.BaseFileUtils
import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieParquetWriter}
import org.apache.hudi.io.storage.{HoodieAvroParquetConfig, HoodieAvroParquetWriter}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -50,13 +48,13 @@ object SparkHelpers {
// Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)

val writer = new HoodieParquetWriter[HoodieJsonPayload, IndexedRecord](instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier(),
val writer = new HoodieAvroParquetWriter(instantTime, destinationFile, parquetConfig, schema, new SparkTaskContextSupplier(),
true)
for (rec <- sourceRecords) {
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
if (!keysToSkip.contains(key)) {

writer.writeAvro(key, rec)
writer.write(key, rec)
}
}
writer.close
@@ -18,14 +18,14 @@

package org.apache.hudi.client.utils;

import java.util.Iterator;
import java.util.function.Function;
import org.apache.avro.generic.GenericRecord;

import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;

public class MergingIterator<T extends GenericRecord> implements Iterator<T> {
import java.util.Iterator;
import java.util.function.Function;

public class MergingIterator<T extends IndexedRecord> implements Iterator<T> {

private final Iterator<T> leftIterator;
private final Iterator<T> rightIterator;
@@ -31,7 +31,7 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
@@ -132,7 +132,7 @@ public static List<String> filterKeysFromFile(Path filePath, List<String> candid
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
HoodieTimer timer = new HoodieTimer().startTimer();
HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
HoodieAvroFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
Set<String> fileRowKeys = fileReader.filterRowKeys(new TreeSet<>(candidateRecordKeys));
foundRecordKeys.addAll(fileRowKeys);
LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
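For context, the reader side of this change only swaps the declared type: HoodieFileReaderFactory.getFileReader(...) now hands back a HoodieAvroFileReader, and existing calls such as filterRowKeys(...) are untouched. Below is a minimal caller-side sketch based solely on the calls visible in this hunk; the helper class and method names are illustrative, not part of the PR.

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

public class AvroFileReaderSketch {
  // Hypothetical helper mirroring filterKeysFromFile above: probe a base file
  // for the subset of candidate record keys it actually contains.
  static Set<String> probeKeys(Configuration conf, Path baseFilePath,
                               List<String> candidateRecordKeys) throws IOException {
    try (HoodieAvroFileReader reader = HoodieFileReaderFactory.getFileReader(conf, baseFilePath)) {
      // filterRowKeys returns only the keys present in the file.
      return reader.filterRowKeys(new TreeSet<>(candidateRecordKeys));
    }
  }
}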
@@ -18,6 +18,7 @@

package org.apache.hudi.io;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
@@ -29,8 +30,6 @@
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -93,7 +92,7 @@ public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTa
public void write(GenericRecord oldRecord) {
String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
try {
fileWriter.writeAvro(key, oldRecord);
fileWriter.write(key, oldRecord);
} catch (IOException | RuntimeException e) {
String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
@@ -18,6 +18,10 @@

package org.apache.hudi.io;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -32,14 +36,9 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieAvroFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -53,7 +52,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends

private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class);

protected final HoodieFileWriter<IndexedRecord> fileWriter;
protected final HoodieAvroFileWriter fileWriter;
protected final Path path;
protected long recordsWritten = 0;
protected long insertRecordsWritten = 0;
@@ -142,9 +141,9 @@ public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
if (preserveHoodieMetadata) {
// do not preserve FILENAME_METADATA_FIELD
recordWithMetadataInSchema.put(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD), path.getName());
fileWriter.writeAvro(record.getRecordKey(), recordWithMetadataInSchema);
fileWriter.write(record.getRecordKey(), recordWithMetadataInSchema);
} else {
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
fileWriter.writeWithMetadata(recordWithMetadataInSchema, record);
}
// update the new location of record, so we know where to find it next
record.unseal();
@@ -18,6 +18,7 @@

package org.apache.hudi.io;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
@@ -31,10 +32,8 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -93,7 +92,7 @@ private BloomFilter getBloomFilter() {
new HoodieDynamicBoundedBloomFilter(StandardCharsets.UTF_8.decode(bloomFilterByteBuffer.get()).toString(),
BloomFilterTypeCode.DYNAMIC_V0);
} else {
try (HoodieFileReader reader = createNewFileReader()) {
try (HoodieAvroFileReader reader = createNewFileReader()) {
bloomFilter = reader.readBloomFilter();
}
}
@@ -18,6 +18,10 @@

package org.apache.hudi.io;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -39,17 +43,12 @@
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieAvroFileWriter;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -96,7 +95,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H

protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set<String> writtenRecordKeys;
protected HoodieFileWriter<IndexedRecord> fileWriter;
protected HoodieAvroFileWriter fileWriter;
private boolean preserveMetadata = false;

protected Path newFilePath;
@@ -294,9 +293,9 @@ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get());
if (preserveMetadata) {
fileWriter.writeAvro(hoodieRecord.getRecordKey(), recordWithMetadataInSchema);
fileWriter.write(hoodieRecord.getRecordKey(), recordWithMetadataInSchema);
} else {
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
fileWriter.writeWithMetadata(recordWithMetadataInSchema, hoodieRecord);
}
recordsWritten++;
} else {
@@ -354,7 +353,7 @@ public void write(GenericRecord oldRecord) {
if (copyOldRecord) {
// this should work as it is, since this is an existing record
try {
fileWriter.writeAvro(key, oldRecord);
fileWriter.write(key, oldRecord);
} catch (IOException | RuntimeException e) {
String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
@@ -426,7 +425,7 @@ public void performMergeDataValidationCheck(WriteStatus writeStatus) {

long oldNumWrites = 0;
try {
HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
HoodieAvroFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
oldNumWrites = reader.getTotalRecords();
} catch (IOException e) {
throw new HoodieUpsertException("Failed to check for merge data validation", e);
@@ -21,7 +21,7 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.table.HoodieTable;

import java.io.IOException;
Expand All @@ -37,7 +37,7 @@ public HoodieRangeInfoHandle(HoodieWriteConfig config, HoodieTable<T, I, K, O> h
}

public String[] getMinMaxKeys() throws IOException {
try (HoodieFileReader reader = createNewFileReader()) {
try (HoodieAvroFileReader reader = createNewFileReader()) {
return reader.readMinMaxRecordKeys();
}
}
@@ -25,7 +25,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.table.HoodieTable;

@@ -62,7 +62,7 @@ protected HoodieBaseFile getLatestDataFile() {
.getLatestBaseFile(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight()).get();
}

protected HoodieFileReader createNewFileReader() throws IOException {
protected HoodieAvroFileReader createNewFileReader() throws IOException {
return HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(),
new Path(getLatestDataFile().getPath()));
}
@@ -18,6 +18,11 @@

package org.apache.hudi.io;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -30,16 +35,10 @@
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieAvroFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -256,8 +255,8 @@ protected long getAttemptId() {
return taskContextSupplier.getAttemptIdSupplier().get();
}

protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, HoodieTable<T, I, K, O> hoodieTable,
HoodieWriteConfig config, Schema schema, TaskContextSupplier taskContextSupplier) throws IOException {
protected HoodieAvroFileWriter createNewFileWriter(String instantTime, Path path, HoodieTable<T, I, K, O> hoodieTable,
HoodieWriteConfig config, Schema schema, TaskContextSupplier taskContextSupplier) throws IOException {
return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, taskContextSupplier);
}

@@ -18,28 +18,34 @@

package org.apache.hudi.io.storage;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.io.storage.HoodieRecordFileWriter;
import org.apache.hudi.common.model.HoodieRecord;

import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

public interface HoodieFileWriter<R extends IndexedRecord> {

void writeAvroWithMetadata(R newRecord, HoodieRecord record) throws IOException;

boolean canWrite();
public interface HoodieAvroFileWriter extends HoodieRecordFileWriter<IndexedRecord> {

void close() throws IOException;
long getBytesWritten();

void writeAvro(String key, R oldRecord) throws IOException;
// TODO rename
@Override
default void writeWithMetadata(HoodieRecord record, Schema schema, Properties props) throws IOException {
record.writeWithMetadata(this, schema, props);
}

long getBytesWritten();
// TODO rename
@Override
default void write(HoodieRecord record, Schema schema, Properties props) throws IOException {
record.write(this, schema, props);
}

default void prepRecordWithMetadata(R avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
default void prepRecordWithMetadata(IndexedRecord avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
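This final hunk replaces the old HoodieFileWriter<R extends IndexedRecord> interface with HoodieAvroFileWriter, which extends the new HoodieRecordFileWriter<IndexedRecord> and delegates the HoodieRecord-level write/writeWithMetadata defaults back to the record itself. At the call sites earlier in this diff only the method names change: writeAvro becomes write and writeAvroWithMetadata becomes writeWithMetadata. A minimal caller-side sketch under that assumption follows; the wrapper class and helper method are hypothetical, not part of the PR.

import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.io.storage.HoodieAvroFileWriter;

public class AvroFileWriterSketch {
  // Hypothetical helper showing the renamed single-record calls. With the old
  // interface these were fileWriter.writeAvro(key, rec) and
  // fileWriter.writeAvroWithMetadata(rec, hoodieRecord).
  static void writeOne(HoodieAvroFileWriter fileWriter,
                       HoodieRecord<?> hoodieRecord,
                       GenericRecord recordWithMetaFields,
                       boolean preserveMetadata) throws IOException {
    if (preserveMetadata) {
      // Record already carries its Hudi meta columns; write it as-is under its key.
      fileWriter.write(hoodieRecord.getRecordKey(), recordWithMetaFields);
    } else {
      // Let the writer stamp commit time, sequence id and file name itself.
      fileWriter.writeWithMetadata(recordWithMetaFields, hoodieRecord);
    }
  }
}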