@@ -50,7 +50,7 @@

import scala.Tuple2;

import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat;
import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStats;
import static org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
@@ -104,7 +104,7 @@ private static List<HoodieInstant> setupHoodieInstants() {
}

private static List<HoodieWriteStat> generateCommitStatWith(int totalRecordsWritten, int totalBytesWritten) {
List<HoodieWriteStat> writeStatsList = generateFakeHoodieWriteStat(5);
List<HoodieWriteStat> writeStatsList = generateFakeHoodieWriteStats(5);
// clear all record and byte stats except for last entry.
for (int i = 0; i < writeStatsList.size() - 1; i++) {
HoodieWriteStat writeStat = writeStatsList.get(i);
@@ -172,7 +172,7 @@ public void testAverageBytesPerRecordForEmptyCommitTimeLine() throws Exception {

@Test
public void testUpsertPartitioner() throws Exception {
final String testPartitionPath = "2016/09/26";
final String testPartitionPath = "1/09/26";
// Inserts + Updates... Check all updates go together & inserts subsplit
UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024, testPartitionPath, false);
List<InsertBucket> insertBuckets = partitioner.getInsertBuckets(testPartitionPath);
@@ -18,13 +18,13 @@

package org.apache.hudi.common.model;

import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.FileIOUtils;

import org.junit.jupiter.api.Test;

import java.util.List;

import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStats;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -37,7 +37,7 @@ public class TestHoodieCommitMetadata {
@Test
public void testPerfStatPresenceInHoodieMetadata() throws Exception {

List<HoodieWriteStat> fakeHoodieWriteStats = HoodieTestUtils.generateFakeHoodieWriteStat(100);
List<HoodieWriteStat> fakeHoodieWriteStats = generateFakeHoodieWriteStats(100);
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
fakeHoodieWriteStats.forEach(stat -> commitMetadata.addWriteStat(stat.getPartitionPath(), stat));
assertTrue(commitMetadata.getTotalCreateTime() > 0);
@@ -505,24 +505,51 @@ public static List<String> monotonicIncreasingCommitTimestamps(int numTimestamps
return commits;
}

public static List<HoodieWriteStat> generateFakeHoodieWriteStat(int limit) {
public static List<HoodieWriteStat> generateFakeHoodieWriteStats(int limit) {
return generateFakeHoodieWriteStats(limit, "/some/fake/path", "/some/fake/partition/path");
}

public static List<HoodieWriteStat> generateFakeHoodieWriteStats(int limit, String relFilePath, String relPartitionPath) {
List<HoodieWriteStat> writeStatList = new ArrayList<>();
for (int i = 0; i < limit; i++) {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(UUID.randomUUID().toString());
writeStat.setNumDeletes(0);
writeStat.setNumUpdateWrites(100);
writeStat.setNumWrites(100);
writeStat.setPath("/some/fake/path" + i);
writeStat.setPartitionPath("/some/fake/partition/path" + i);
writeStat.setTotalLogFilesCompacted(100L);
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalScanTime(100);
runtimeStats.setTotalCreateTime(100);
runtimeStats.setTotalUpsertTime(100);
writeStat.setRuntimeStats(runtimeStats);
writeStatList.add(writeStat);
writeStatList.add(generateFakeHoodieWriteStat(relFilePath + i, (relPartitionPath + i)));
}
return writeStatList;
}

public static HoodieWriteStat generateFakeHoodieWriteStat(String relFilePath, String relPartitionPath) {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(UUID.randomUUID().toString());
writeStat.setNumDeletes(0);
writeStat.setNumUpdateWrites(100);
writeStat.setNumWrites(100);
writeStat.setPath(relFilePath);
writeStat.setPartitionPath(relPartitionPath);
writeStat.setTotalLogFilesCompacted(100L);
writeStat.setFileSizeInBytes(1024 * 10);
writeStat.setTotalWriteBytes(1024 * 10);
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalScanTime(100);
runtimeStats.setTotalCreateTime(100);
runtimeStats.setTotalUpsertTime(100);
writeStat.setRuntimeStats(runtimeStats);
return writeStat;
}

public static HoodieWriteStat generateFakeHoodieWriteStats(java.nio.file.Path basePath, HoodieLogFile logFile) {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(UUID.randomUUID().toString());
writeStat.setNumDeletes(0);
writeStat.setNumUpdateWrites(100);
writeStat.setNumWrites(100);
writeStat.setPath(logFile.getPath().toString().replaceFirst(basePath.toString() + "/", ""));
writeStat.setPartitionPath("/some/fake/partition/path");
writeStat.setTotalLogFilesCompacted(100L);
RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalScanTime(100);
runtimeStats.setTotalCreateTime(100);
runtimeStats.setTotalUpsertTime(100);
writeStat.setRuntimeStats(runtimeStats);
return writeStat;
}
}
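
For readers following the rename, here is a minimal, illustrative sketch (not part of this PR) of how a test could exercise the new overloads. The test name, path prefixes, and counts are made up; it assumes the usual test imports (HoodieWriteStat, List, @Test) plus the static imports shown:

import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat;
import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStats;
import static org.junit.jupiter.api.Assertions.assertEquals;

@Test
public void testFakeWriteStatHelpers() {
  // Default overload: five stats under the built-in "/some/fake/..." prefixes.
  List<HoodieWriteStat> defaults = generateFakeHoodieWriteStats(5);
  assertEquals(5, defaults.size());

  // Overload with explicit relative prefixes; the loop index is appended to each stat's paths.
  List<HoodieWriteStat> custom = generateFakeHoodieWriteStats(2, "/fake/file", "/fake/partition");
  assertEquals("/fake/file0", custom.get(0).getPath());
  assertEquals("/fake/partition1", custom.get(1).getPartitionPath());

  // Single-stat overload for one explicit file/partition pair.
  HoodieWriteStat single = generateFakeHoodieWriteStat("/fake/file", "/fake/partition");
  assertEquals(100, single.getNumWrites());
}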
@@ -51,7 +51,7 @@ public class InputPathHandler {
private final List<Path> snapshotPaths;
private final List<Path> nonHoodieInputPaths;

InputPathHandler(Configuration conf, Path[] inputPaths, List<String> incrementalTables) throws IOException {
public InputPathHandler(Configuration conf, Path[] inputPaths, List<String> incrementalTables) throws IOException {
this.conf = conf;
tableMetaClientMap = new HashMap<>();
snapshotPaths = new ArrayList<>();
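
Since the constructor above is now public, code outside the org.apache.hudi.hadoop package can construct the handler directly. A rough sketch of such a call site, with the configuration, input paths, and table name invented purely for illustration (assumes the usual Hadoop Configuration/Path and java.util imports; the constructor throws IOException, so callers handle or propagate it):

Configuration conf = new Configuration();
Path[] inputPaths = new Path[] {new Path("/warehouse/hudi_trips/2020/05/01")};
List<String> incrementalTables = Collections.singletonList("hudi_trips");

// On construction, the handler sorts the input paths into incremental, snapshot and non-Hudi buckets.
InputPathHandler pathHandler = new InputPathHandler(conf, inputPaths, incrementalTables);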
@@ -18,12 +18,17 @@

package org.apache.hudi.hadoop.realtime;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.LogReaderUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
@@ -34,19 +34,22 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.log.LogReaderUtils.readLatestSchemaFromLogFiles;
import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.addPartitionFields;
import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.generateProjectionSchema;
import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getNameToFieldMap;
import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.orderFields;
import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.readSchema;

/**
* Record Reader implementation to merge fresh avro data with base parquet data, to support real time queries.
*/
public abstract class AbstractRealtimeRecordReader {
private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);

protected final HoodieRealtimeFileSplit split;
private final String basePath;
protected final JobConf jobConf;
protected final boolean usesCustomPayload;
// Schema handles
@@ -55,22 +63,42 @@ public abstract class AbstractRealtimeRecordReader {
private Schema hiveSchema;

public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job) {
this.split = split;
this.basePath = split.getBasePath();
this.jobConf = job;
LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
try {
this.usesCustomPayload = usesCustomPayload();
LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
String logMessage = "About to read compacted logs " + split.getDeltaLogPaths() + " for base split "
+ split.getPath() + ", projecting cols %s";
init(split.getDeltaLogPaths(), logMessage, Option.of(split.getPath()));
} catch (IOException e) {
throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + split.getPath(), e);
}
}

public AbstractRealtimeRecordReader(HoodieMORIncrementalFileSplit split, JobConf job) {
this.basePath = split.getBasePath();
this.jobConf = job;
LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
try {
this.usesCustomPayload = usesCustomPayload();
LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
init();
String logMessage = "About to read compacted logs for fileGroupId: "
+ split.getFileGroupId().toString() + ", projecting cols %s";
String latestBaseFilePath = split.getLatestBaseFilePath();
init(split.getLatestLogFilePaths(), logMessage, latestBaseFilePath != null ? Option.of(new Path(latestBaseFilePath)) : Option.empty());
} catch (IOException e) {
throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e);
throw new HoodieIOException("Could not create HoodieMORIncrementalRecordReader on file group Id " + split.getFileGroupId().toString(), e);
}
}

private boolean usesCustomPayload() {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, split.getBasePath());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, basePath);
return !(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName())
|| metaClient.getTableConfig().getPayloadClass().contains("org.apache.hudi.OverwriteWithLatestAvroPayload"));
}
@@ -80,11 +108,10 @@ private boolean usesCustomPayload() {
* back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into the
* job conf.
*/
private void init() throws IOException {
Schema schemaFromLogFile =
LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogPaths(), jobConf);
if (schemaFromLogFile == null) {
writerSchema = HoodieRealtimeRecordReaderUtils.readSchema(jobConf, split.getPath());
private void init(List<String> deltaLogPaths, String logMessage, Option<Path> splitPath) throws IOException {
Schema schemaFromLogFile = readLatestSchemaFromLogFiles(basePath, deltaLogPaths, jobConf);
if (schemaFromLogFile == null && splitPath.isPresent()) {
writerSchema = readSchema(jobConf, splitPath.get());
LOG.debug("Writer Schema From Parquet => " + writerSchema.getFields());
} else {
writerSchema = schemaFromLogFile;
Expand All @@ -95,18 +122,17 @@ private void init() throws IOException {
List<String> partitioningFields =
partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
: new ArrayList<>();
writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields);
List<String> projectionFields = HoodieRealtimeRecordReaderUtils.orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
writerSchema = addPartitionFields(writerSchema, partitioningFields);
List<String> projectionFields = orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields);

Map<String, Field> schemaFieldsMap = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema);
Map<String, Field> schemaFieldsMap = getNameToFieldMap(writerSchema);
hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap);
// TODO(vc): In the future, the reader schema should be updated based on log files & be able
// to null out fields not present before

readerSchema = HoodieRealtimeRecordReaderUtils.generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
split.getDeltaLogPaths(), split.getPath(), projectionFields));
readerSchema = generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
LOG.info(String.format(logMessage, projectionFields));
}

private Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Field> schemaFieldsMap) {
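
As a closing note on the record-reader change: the generalized init(...) resolves the writer schema from the latest delta log files first, and only falls back to the base parquet file when the split actually has one, which is what lets log-only MOR file groups pass Option.empty(). A condensed, illustrative restatement of that precedence (the helper below is hypothetical; the utility calls are the ones referenced in the diff):

// Hypothetical helper mirroring the schema precedence inside init(...) above.
private Schema resolveWriterSchema(List<String> deltaLogPaths, Option<Path> baseFilePath) throws IOException {
  Schema fromLogs = readLatestSchemaFromLogFiles(basePath, deltaLogPaths, jobConf);
  if (fromLogs == null && baseFilePath.isPresent()) {
    // No schema committed to the logs yet, but a base parquet file exists: use its footer schema.
    return readSchema(jobConf, baseFilePath.get());
  }
  // Log schema found, or a log-only file group (in which case this can still be null).
  return fromLogs;
}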