4 changes: 2 additions & 2 deletions azure-pipelines.yml
@@ -155,7 +155,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_3
displayName: UT spark-datasource
timeoutInMinutes: '150'
timeoutInMinutes: '180'
steps:
- task: Maven@4
displayName: maven install
@@ -179,7 +179,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_4
displayName: UT FT other modules
timeoutInMinutes: '150'
timeoutInMinutes: '180'
steps:
- task: Maven@4
displayName: maven install
@@ -28,6 +28,8 @@
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -118,7 +120,7 @@ public String showArchivedCommits(
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
blk.getRecordIterator().forEachRemaining(readRecords::add);
blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> readRecords.add((IndexedRecord) r.getData()));
}
List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
.filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
@@ -192,8 +194,8 @@ public String showCommits(
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
recordItr.forEachRemaining(readRecords::add);
try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
recordItr.forEachRemaining(r -> readRecords.add(r.getData()));
}
}
List<Comparable[]> readCommits = readRecords.stream().map(r -> (GenericRecord) r)
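Both CLI hunks above (showArchivedCommits and showCommits) follow the same migration pattern: `HoodieAvroDataBlock#getRecordIterator` now takes a `HoodieRecordType` and yields `HoodieRecord` wrappers rather than raw `IndexedRecord`s, so callers unwrap the Avro payload with `getData()`. A minimal sketch of the new read loop, assuming an already-open `HoodieLogFormat.Reader` as in the command code above (the helper class and method names are illustrative):

```java
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.util.ClosableIterator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

final class ArchivedLogReadSketch {
  // Drain every Avro record from the data blocks of an already-open archived-log reader.
  static List<IndexedRecord> readAvroRecords(HoodieLogFormat.Reader reader) throws IOException {
    List<IndexedRecord> readRecords = new ArrayList<>();
    while (reader.hasNext()) {
      HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
      // The iterator is now typed as HoodieRecord<IndexedRecord>; the underlying Avro
      // record is unwrapped with getData(), mirroring the hunks above.
      try (ClosableIterator<HoodieRecord<IndexedRecord>> it = blk.getRecordIterator(HoodieRecordType.AVRO)) {
        it.forEachRemaining(r -> readRecords.add(r.getData()));
      }
    }
    return readRecords;
  }
}
```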
@@ -32,6 +32,8 @@
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -121,9 +123,9 @@ private int copyArchivedInstants(List<FileStatus> statuses, Set<String> actionSe
// read the avro blocks
while (reader.hasNext() && copyCount < limit) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
while (recordItr.hasNext()) {
IndexedRecord ir = recordItr.next();
IndexedRecord ir = recordItr.next().getData();
// Archived instants are saved as arvo encoded HoodieArchivedMetaEntry records. We need to get the
// metadata record from the entry and convert it to json.
HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()
@@ -30,9 +30,11 @@
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -44,6 +46,7 @@
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
@@ -61,6 +64,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

@@ -121,7 +125,7 @@ public String showLogFileCommits(
instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
}
if (n instanceof HoodieDataBlock) {
try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) n).getRecordIterator()) {
try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = ((HoodieDataBlock) n).getRecordIterator(HoodieRecordType.AVRO)) {
recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
}
}
@@ -217,12 +221,13 @@ public String showLogFileRecords(
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
.build();
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
for (HoodieRecord hoodieRecord : scanner) {
Option<HoodieAvroIndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
if (allRecords.size() < limit) {
allRecords.add(record.get());
allRecords.add(record.get().getData());
}
}
} else {
@@ -236,10 +241,10 @@
HoodieLogBlock n = reader.next();
if (n instanceof HoodieDataBlock) {
HoodieDataBlock blk = (HoodieDataBlock) n;
try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordIterator()) {
try (ClosableIterator<HoodieRecord<IndexedRecord>> recordItr = blk.getRecordIterator(HoodieRecordType.AVRO)) {
recordItr.forEachRemaining(record -> {
if (allRecords.size() < limit) {
allRecords.add(record);
allRecords.add(record.getData());
}
});
}
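The showLogFileRecords path above changes in two ways: the merged scanner is now built with an explicit record merger (`withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))`), and per-record Avro access goes through `toIndexedRecord(schema, props)` instead of `getData().getInsertValue(schema)`. A sketch of the consuming side, assuming a scanner built exactly as in the hunk above; the helper class, method name, and limit handling are illustrative:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class MergedScanSketch {
  // Collect up to `limit` plain Avro rows from an already-configured merged scanner.
  static List<IndexedRecord> collectRows(HoodieMergedLogRecordScanner scanner,
                                         Schema readerSchema, int limit) throws IOException {
    List<IndexedRecord> rows = new ArrayList<>();
    for (HoodieRecord hoodieRecord : scanner) {
      if (rows.size() >= limit) {
        break;
      }
      // toIndexedRecord replaces getData().getInsertValue(schema): it returns an Option of
      // HoodieAvroIndexedRecord whose getData() is the raw Avro record.
      Option<HoodieAvroIndexedRecord> rec = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
      if (rec.isPresent()) {
        rows.add(rec.get().getData());
      }
    }
    return rows;
  }
}
```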
@@ -21,11 +21,11 @@
import org.apache.hudi.cli.HoodieCliSparkConfig;
import org.apache.hudi.cli.commands.SparkEnvCommand;
import org.apache.hudi.cli.commands.SparkMain;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

import org.apache.spark.HoodieSparkKryoProvider$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.launcher.SparkLauncher;
@@ -116,7 +116,7 @@ public static JavaSparkContext initJavaSparkContext(String name, Option<String>
}

public static JavaSparkContext initJavaSparkContext(SparkConf sparkConf) {
SparkRDDWriteClient.registerClasses(sparkConf);
HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());
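Kryo registration for the CLI-launched Spark context now goes through the Scala object `HoodieSparkKryoProvider` instead of `SparkRDDWriteClient.registerClasses`, which is why the `SparkRDDWriteClient` import is dropped above. A minimal sketch of the call from Java; the app name and master are placeholder values, not taken from this PR:

```java
import org.apache.spark.HoodieSparkKryoProvider$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

final class KryoRegisterSketch {
  static JavaSparkContext createContext() {
    // Placeholder app name / master; real values come from the CLI launcher configuration.
    SparkConf sparkConf = new SparkConf().setAppName("hoodie-cli-sketch").setMaster("local[2]");
    // HoodieSparkKryoProvider is a Scala object, hence the MODULE$ access from Java,
    // matching the call introduced in the diff above.
    HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
    return new JavaSparkContext(sparkConf);
  }
}
```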
@@ -18,20 +18,19 @@
package org.apache.hudi.cli

import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroWriteSupport
import org.apache.hudi.client.SparkTaskContextSupplier
import org.apache.hudi.common.bloom.{BloomFilter, BloomFilterFactory}
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.util.BaseFileUtils
import org.apache.hudi.config.{HoodieIndexConfig, HoodieStorageConfig}
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig}
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.{DataFrame, SQLContext}

import scala.collection.JavaConversions._
import scala.collection.mutable._

@@ -58,7 +57,7 @@ object SparkHelpers {
// Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)

val writer = new HoodieAvroParquetWriter[IndexedRecord](destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
val writer = new HoodieAvroParquetWriter(destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
for (rec <- sourceRecords) {
val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
if (!keysToSkip.contains(key)) {
@@ -28,16 +28,19 @@
import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
@@ -111,7 +114,7 @@ public void init() throws IOException, InterruptedException, URISyntaxException
.withFileId("test-log-fileid1").overBaseCommit("100").withFs(fs).build()) {

// write data to file
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
List<HoodieRecord> records = SchemaTestUtil.generateTestRecords(0, 100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
@@ -191,7 +194,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withFileId("test-log-fileid1").overBaseCommit(INSTANT_TIME).withFs(fs).withSizeThreshold(500).build();

List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
List<HoodieRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -228,15 +231,16 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
.withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
.build();

Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();
Iterator<HoodieRecord> records = scanner.iterator();
int num = 0;
int maxSize = 10;
List<IndexedRecord> indexRecords = new ArrayList<>();
while (records.hasNext() && num < maxSize) {
Option<IndexedRecord> hoodieRecord = records.next().getData().getInsertValue(schema);
Option<IndexedRecord> hoodieRecord = ((HoodieAvroRecord)records.next()).getData().getInsertValue(schema);
indexRecords.add(hoodieRecord.get());
num++;
}
@@ -20,14 +20,14 @@
package org.apache.hudi.cli.functional;

import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hudi.timeline.service.TimelineService;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.HoodieSparkKryoProvider$;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
@@ -100,7 +100,7 @@ public synchronized void runBeforeEach() {
initialized = spark != null;
if (!initialized) {
SparkConf sparkConf = conf();
SparkRDDWriteClient.registerClasses(sparkConf);
HoodieSparkKryoProvider$.MODULE$.register(sparkConf);
SparkRDDReadClient.addHoodieSupport(sparkConf);
spark = SparkSession.builder().config(sparkConf).getOrCreate();
sqlContext = spark.sqlContext();
@@ -19,7 +19,6 @@

package org.apache.hudi.client;

import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;

import java.io.IOException;
@@ -28,7 +27,7 @@
/**
* Client will run one round of clustering.
*/
public abstract class BaseClusterer<T extends HoodieRecordPayload, I, K, O> implements Serializable {
public abstract class BaseClusterer<T, I, K, O> implements Serializable {

private static final long serialVersionUID = 1L;

@@ -18,7 +18,6 @@

package org.apache.hudi.client;

import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;

import java.io.IOException;
@@ -27,7 +26,7 @@
/**
* Run one round of compaction.
*/
public abstract class BaseCompactor<T extends HoodieRecordPayload, I, K, O> implements Serializable {
public abstract class BaseCompactor<T, I, K, O> implements Serializable {

private static final long serialVersionUID = 1L;

@@ -44,7 +44,6 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
@@ -123,12 +122,12 @@
* Abstract Write Client providing functionality for performing commit, index updates and rollback
* Reused for regular write operations like upsert/insert/bulk-insert.. as well as bootstrap
*
* @param <T> Sub type of HoodieRecordPayload
* @param <T> Type of data
* @param <I> Type of inputs
* @param <K> Type of keys
* @param <O> Type of outputs
*/
public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieClient
public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
implements RunsTableService {

protected static final String LOOKUP_STR = "lookup";
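BaseClusterer, BaseCompactor, and BaseHoodieWriteClient above all drop the `T extends HoodieRecordPayload` bound, so these engine-facing clients are no longer tied to payload-based (Avro) record representations. A shape-only illustration of the signature change; the class name below is hypothetical, and the real classes carry abstract clustering/compaction/write hooks that are omitted here:

```java
import java.io.Serializable;

// Before: public abstract class BaseCompactor<T extends HoodieRecordPayload, I, K, O> implements Serializable { ... }
// After:  public abstract class BaseCompactor<T, I, K, O> implements Serializable { ... }
//
// With the bound removed, T can be any data type rather than only an Avro payload;
// the same relaxation applies to BaseClusterer and BaseHoodieWriteClient.
abstract class EngineClientShape<T, I, K, O> implements Serializable {
  private static final long serialVersionUID = 1L;
}
```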
@@ -30,8 +30,11 @@
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -341,7 +344,7 @@ public void mergeArchiveFiles(List<FileStatus> compactCandidate) throws IOExcept
// Read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
blk.getRecordIterator().forEachRemaining(records::add);
blk.getRecordIterator(HoodieRecordType.AVRO).forEachRemaining(r -> records.add((IndexedRecord) r.getData()));
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records);
}
@@ -668,7 +671,8 @@ private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) thro
Map<HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
final String keyField = table.getMetaClient().getTableConfig().getRecordKeyFieldProp();
HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header, keyField);
List<HoodieRecord> indexRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, header, keyField);
writer.appendBlock(block);
records.clear();
}
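In the archival write path above, `HoodieAvroDataBlock` is now constructed from `HoodieRecord` instances, so plain Avro records are wrapped in `HoodieAvroIndexedRecord` before the block is appended. A sketch of the updated flush step, assuming a writer, wrapper schema, record buffer, and key field set up as in `mergeArchiveFiles`/`writeToFile` above (the helper class and signature are illustrative):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class ArchiveWriteSketch {
  // Flush a batch of raw Avro records as one Avro data block on the archive log.
  static void writeBatch(HoodieLogFormat.Writer writer, Schema wrapperSchema,
                         List<IndexedRecord> records, String keyField) throws Exception {
    Map<HeaderMetadataType, String> header = new HashMap<>();
    header.put(HeaderMetadataType.SCHEMA, wrapperSchema.toString());
    // Wrap each IndexedRecord in the new HoodieRecord abstraction before building the block,
    // as the diff above does.
    List<HoodieRecord> indexRecords = records.stream()
        .map(HoodieAvroIndexedRecord::new)
        .collect(Collectors.toList());
    writer.appendBlock(new HoodieAvroDataBlock(indexRecords, header, keyField));
    records.clear();
  }
}
```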
@@ -23,6 +23,7 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -49,8 +50,9 @@ public FullRecordBootstrapDataProvider(TypedProperties props, HoodieEngineContex
* @param tableName Hudi Table Name
* @param sourceBasePath Source Base Path
* @param partitionPaths Partition Paths
* @param config config
* @return input records
*/
public abstract I generateInputRecords(String tableName,
String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths);
String sourceBasePath, List<Pair<String, List<HoodieFileStatus>>> partitionPaths, HoodieWriteConfig config);
}
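`FullRecordBootstrapDataProvider.generateInputRecords` now also receives the `HoodieWriteConfig`, so bootstrap providers can consult the write configuration while producing input records. A hypothetical subclass showing the updated override; the class name, type argument, method body, and the import paths for classes not shown in this hunk are best-effort assumptions, not part of this PR:

```java
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;

import java.util.Collections;
import java.util.List;

// Placeholder provider; a real implementation would return engine-specific input records.
public class SketchBootstrapDataProvider extends FullRecordBootstrapDataProvider<List<HoodieRecord>> {

  public SketchBootstrapDataProvider(TypedProperties props, HoodieEngineContext context) {
    super(props, context);
  }

  @Override
  public List<HoodieRecord> generateInputRecords(String tableName, String sourceBasePath,
      List<Pair<String, List<HoodieFileStatus>>> partitionPaths, HoodieWriteConfig config) {
    // Read source files under sourceBasePath and convert them to HoodieRecords here;
    // the new `config` argument exposes the writer configuration to that conversion.
    return Collections.emptyList();
  }
}
```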