@@ -36,6 +36,7 @@
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
@@ -191,7 +192,6 @@ private JavaRDD<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext js
LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
try {
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
HoodieFileReader<? extends IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(table.getMetaClient().getFs())
.withBasePath(table.getMetaClient().getBasePath())
@@ -205,6 +205,9 @@ private JavaRDD<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext js
.withSpillableMapBasePath(config.getSpillableMapBasePath())
.build();

Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
? Option.empty()
: Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema,
tableConfig.getPayloadClass(),
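A minimal sketch of the guard introduced in the hunk above, assuming the same Hudi utilities that appear there (Option, StringUtils, HoodieFileReaderFactory); the class and method names below are illustrative only and not part of the PR:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;

import java.io.IOException;

// Illustrative sketch, not part of this PR: the same guard as the hunk above,
// pulled into a helper so the "no base file" case is explicit at the call site.
public class OptionalBaseFileReaderExample {
  public static Option<HoodieFileReader> buildBaseFileReader(Configuration conf, String dataFilePath) throws IOException {
    return StringUtils.isNullOrEmpty(dataFilePath)
        ? Option.empty()   // log-only file slice: nothing to read from a base file
        : Option.of(HoodieFileReaderFactory.getFileReader(conf, new Path(dataFilePath)));
  }
}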
@@ -45,13 +45,15 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Tag("functional")
class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTestHarness {
@@ -111,15 +113,17 @@ void testClustering(boolean doUpdates, boolean populateMetaFields, boolean prese
client.startCommitWithTime(newCommitTime);

List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");

/*
* Write 2 (more inserts to create new files)
*/
// we already set small file size to small number to force inserts to go into new file.
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
dataFiles = insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");

if (doUpdates) {
/*
@@ -144,28 +148,101 @@ void testClustering(boolean doUpdates, boolean populateMetaFields, boolean prese
assertEquals(allFiles.length, hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count());

// Do the clustering and validate
client.cluster(clusteringCommitTime, true);
doClusteringAndValidate(client, clusteringCommitTime, metaClient, cfg, dataGen);
}
}

metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieTable clusteredTable = HoodieSparkTable.create(cfg, context(), metaClient);
clusteredTable.getHoodieView().sync();
Stream<HoodieBaseFile> dataFilesToRead = Arrays.stream(dataGen.getPartitionPaths())
.flatMap(p -> clusteredTable.getBaseFileOnlyView().getLatestBaseFiles(p));
// verify there should be only one base file per partition after clustering.
assertEquals(dataGen.getPartitionPaths().length, dataFilesToRead.count());

HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
assertEquals(1, timeline.findInstantsAfter("003", Integer.MAX_VALUE).countInstants(),
"Expecting a single commit.");
assertEquals(clusteringCommitTime, timeline.lastInstant().get().getTimestamp());
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, timeline.lastInstant().get().getAction());
if (cfg.populateMetaFields()) {
assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.of("000")),
"Must contain 200 records");
} else {
assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.empty()));
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testClusteringWithNoBaseFiles(boolean doUpdates) throws Exception {
// set a low compaction small file size to generate more file groups.
HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
.withPath(basePath())
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withDeleteParallelism(2)
.withAutoCommit(true)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(10L)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder()
.hfileMaxFileSize(1024 * 1024 * 1024)
.parquetMaxFileSize(1024 * 1024 * 1024).build())
.withEmbeddedTimelineServerEnabled(true)
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
.withEnableBackupForRemoteFileSystemView(false).build())
// set index type to INMEMORY so that log files can be indexed, and it is safe to send
// inserts straight to the log to produce file slices with only log files and no data files
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0)
.withInlineClustering(true)
.withInlineClusteringNumCommits(1).build())
.withRollbackUsingMarkers(false);
HoodieWriteConfig cfg = cfgBuilder.build();
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, cfg.getProps());
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();

try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
// two batches of inserts
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 400);
Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records.subList(0, 200), client, cfg, newCommitTime);
assertTrue(!dataFiles.findAny().isPresent(), "should not have any base files");
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
dataFiles = insertRecords(metaClient, records.subList(200, 400), client, cfg, newCommitTime);
assertTrue(!dataFiles.findAny().isPresent(), "should not have any base files");
// run updates
if (doUpdates) {
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);
records = dataGen.generateUpdates(newCommitTime, 100);
updateRecords(metaClient, records, client, cfg, newCommitTime);
}

HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
hoodieTable.getHoodieView().sync();
FileStatus[] allBaseFiles = listAllBaseFilesInPath(hoodieTable);
// expect 0 base files for each partition
assertEquals(0, allBaseFiles.length);

String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
// verify log files are included in clustering plan for each partition.
assertEquals(dataGen.getPartitionPaths().length, hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count());

// do the clustering and validate
doClusteringAndValidate(client, clusteringCommitTime, metaClient, cfg, dataGen);
}
}

private void doClusteringAndValidate(SparkRDDWriteClient client,
String clusteringCommitTime,
HoodieTableMetaClient metaClient,
HoodieWriteConfig cfg,
HoodieTestDataGenerator dataGen) {
client.cluster(clusteringCommitTime, true);
metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieTable clusteredTable = HoodieSparkTable.create(cfg, context(), metaClient);
clusteredTable.getHoodieView().sync();
Stream<HoodieBaseFile> dataFilesToRead = Arrays.stream(dataGen.getPartitionPaths())
.flatMap(p -> clusteredTable.getBaseFileOnlyView().getLatestBaseFiles(p));
assertEquals(dataGen.getPartitionPaths().length, dataFilesToRead.count());
HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
assertEquals(1, timeline.findInstantsAfter("003", Integer.MAX_VALUE).countInstants(),
"Expecting a single commit.");
assertEquals(clusteringCommitTime, timeline.lastInstant().get().getTimestamp());
assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, timeline.lastInstant().get().getAction());
if (cfg.populateMetaFields()) {
assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.of("000")),
"Must contain 400 records");
} else {
assertEquals(400, HoodieClientTestUtils.countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline, Option.empty()));
}
}
}
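The new test above relies on the INMEMORY index to route inserts straight to log files, producing file slices with no base files before clustering runs. A short sketch of how that state could be asserted directly through the file-system view, assuming the usual slice-view accessors (HoodieTable#getSliceView, FileSlice#getBaseFile); the helper class and method names are illustrative and not part of the PR:

import java.util.stream.Stream;

import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.table.HoodieTable;

// Illustrative helper (not part of this PR): returns true when every latest file
// slice in the given partition is log-only, i.e. carries no base file. This is
// the state the INMEMORY-index config above is meant to produce before clustering.
public class LogOnlySliceCheckExample {
  public static boolean allFileSlicesAreLogOnly(HoodieTable hoodieTable, String partitionPath) {
    Stream<FileSlice> latestSlices = hoodieTable.getSliceView().getLatestFileSlices(partitionPath);
    return latestSlices.noneMatch(slice -> slice.getBaseFile().isPresent());
  }
}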
@@ -21,6 +21,7 @@

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
@@ -57,6 +58,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -94,7 +96,8 @@ public void testIncrementalReadsWithCompaction() throws Exception {
client.startCommitWithTime(commitTime1);

List<HoodieRecord> records001 = dataGen.generateInserts(commitTime1, 200);
insertRecords(metaClient, records001, client, cfg, commitTime1);
Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records001, client, cfg, commitTime1);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");

// verify only one base file shows up with commit time 001
FileStatus[] snapshotROFiles = getROSnapshotFiles(partitionPath);
@@ -142,7 +145,8 @@ public void testIncrementalReadsWithCompaction() throws Exception {
String insertsTime = "006";
List<HoodieRecord> records006 = dataGen.generateInserts(insertsTime, 200);
client.startCommitWithTime(insertsTime);
insertRecords(metaClient, records006, client, cfg, insertsTime);
dataFiles = insertRecords(metaClient, records006, client, cfg, insertsTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");

// verify new write shows up in snapshot mode even though there is pending compaction
snapshotROFiles = getROSnapshotFiles(partitionPath);
@@ -95,7 +95,8 @@ public void testSimpleInsertAndUpdate(HoodieFileFormat fileFormat, boolean popul
client.startCommitWithTime(newCommitTime);

List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
insertRecords(metaClient, records, client, cfg, newCommitTime);
Stream<HoodieBaseFile> dataFiles = insertRecords(metaClient, records, client, cfg, newCommitTime);
assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");

/*
* Write 2 (updates)
@@ -203,7 +203,8 @@ protected JavaRDD<WriteStatus> updateLocation(
index.updateLocation(HoodieJavaRDD.of(writeStatus), context, table));
}

protected void insertRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
protected Stream<HoodieBaseFile> insertRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records,
SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);

JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
@@ -228,8 +229,7 @@ protected void insertRecords(HoodieTableMetaClient metaClient, List<HoodieRecord

roView = getHoodieTableFileSystemView(reloadedMetaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(),
"should list the base files we wrote in the delta commit");
return dataFilesToRead;
}

protected void updateRecords(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
@@ -23,33 +23,53 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieFileReader;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;
import java.util.Iterator;
import java.util.stream.StreamSupport;

/**
* Reads records from base file and merges any updates from log files and provides iterable over all records in the file slice.
*/
public class HoodieFileSliceReader<T extends HoodieRecordPayload> implements Iterator<HoodieRecord<T>> {
private Iterator<HoodieRecord<T>> recordsIterator;
private final Iterator<HoodieRecord<T>> recordsIterator;

public static <R extends IndexedRecord, T> HoodieFileSliceReader getFileSliceReader(
HoodieFileReader<R> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
String preCombineField, Option<Pair<String,String>> simpleKeyGenFieldsOpt) throws IOException {
Iterator<R> baseIterator = baseFileReader.getRecordIterator(schema);
while (baseIterator.hasNext()) {
GenericRecord record = (GenericRecord) baseIterator.next();
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = simpleKeyGenFieldsOpt.isPresent()
? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField())
: SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, scanner.isWithOperationField());
scanner.processNextRecord(hoodieRecord);
public static HoodieFileSliceReader getFileSliceReader(
Option<HoodieFileReader> baseFileReader, HoodieMergedLogRecordScanner scanner, Schema schema, String payloadClass,
String preCombineField, Option<Pair<String, String>> simpleKeyGenFieldsOpt) throws IOException {
if (baseFileReader.isPresent()) {
Iterator baseIterator = baseFileReader.get().getRecordIterator(schema);
while (baseIterator.hasNext()) {
GenericRecord record = (GenericRecord) baseIterator.next();
HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
scanner.processNextRecord(hoodieRecord);
}
return new HoodieFileSliceReader(scanner.iterator());
} else {
Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator();
return new HoodieFileSliceReader(StreamSupport.stream(iterable.spliterator(), false)
.map(e -> {
try {
GenericRecord record = (GenericRecord) e.getData().getInsertValue(schema).get();
return transform(record, scanner, payloadClass, preCombineField, simpleKeyGenFieldsOpt);
} catch (IOException io) {
throw new HoodieIOException("Error while creating reader for file slice with no base file.", io);
}
}).iterator());
}
return new HoodieFileSliceReader(scanner.iterator());
}

private static HoodieRecord<? extends HoodieRecordPayload> transform(
GenericRecord record, HoodieMergedLogRecordScanner scanner, String payloadClass,
String preCombineField, Option<Pair<String, String>> simpleKeyGenFieldsOpt) {
return simpleKeyGenFieldsOpt.isPresent()
? SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField())
: SpillableMapUtils.convertToHoodieRecordPayload(record, payloadClass, preCombineField, scanner.isWithOperationField());
}

private HoodieFileSliceReader(Iterator<HoodieRecord<T>> recordsItr) {
Expand Down