From ab621a7cc9113dbc5ad7971920f444f6a9f5e4a3 Mon Sep 17 00:00:00 2001 From: Bhavani Sudha Saktheeswaran Date: Fri, 10 Jul 2020 06:15:05 -0700 Subject: [PATCH 1/2] Refactoring changes for fixing point in time incremental queries on MOR - HoodieTestUtils changes to include fake write stats per affected file - Refactor AbstractRealtimeRecordReader to be able to support additional FileSplit types. Currently it only takes HoodieRealtimeFileSplit. In future, we can add another constructor with FileSplit for incremental queries. --- .../action/commit/TestUpsertPartitioner.java | 6 +- .../model/TestHoodieCommitMetadata.java | 4 +- .../common/testutils/HoodieTestUtils.java | 57 ++++++++++++++----- .../apache/hudi/hadoop/InputPathHandler.java | 2 +- .../AbstractRealtimeRecordReader.java | 52 ++++++++++------- .../RealtimeCompactedRecordReader.java | 40 +++++++++---- .../hadoop/TestHoodieParquetInputFormat.java | 7 ++- .../hadoop/testutils/InputFormatTestUtil.java | 2 +- 8 files changed, 113 insertions(+), 57 deletions(-) diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index 91b3bb084db72..e1983c2ab9ddd 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -50,7 +50,7 @@ import scala.Tuple2; -import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat; +import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStats; import static org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; @@ -104,7 +104,7 @@ private static List setupHoodieInstants() { } private static List generateCommitStatWith(int totalRecordsWritten, int totalBytesWritten) { - List writeStatsList = generateFakeHoodieWriteStat(5); + List writeStatsList = generateFakeHoodieWriteStats(5); // clear all record and byte stats except for last entry. for (int i = 0; i < writeStatsList.size() - 1; i++) { HoodieWriteStat writeStat = writeStatsList.get(i); @@ -172,7 +172,7 @@ public void testAverageBytesPerRecordForEmptyCommitTimeLine() throws Exception { @Test public void testUpsertPartitioner() throws Exception { - final String testPartitionPath = "2016/09/26"; + final String testPartitionPath = "1/09/26"; // Inserts + Updates... 
Check all updates go together & inserts subsplit UpsertPartitioner partitioner = getUpsertPartitioner(0, 200, 100, 1024, testPartitionPath, false); List insertBuckets = partitioner.getInsertBuckets(testPartitionPath); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java index 0eaaff1267d43..6af95a337644e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java @@ -18,13 +18,13 @@ package org.apache.hudi.common.model; -import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.FileIOUtils; import org.junit.jupiter.api.Test; import java.util.List; +import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStats; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -37,7 +37,7 @@ public class TestHoodieCommitMetadata { @Test public void testPerfStatPresenceInHoodieMetadata() throws Exception { - List fakeHoodieWriteStats = HoodieTestUtils.generateFakeHoodieWriteStat(100); + List fakeHoodieWriteStats = generateFakeHoodieWriteStats(100); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); fakeHoodieWriteStats.forEach(stat -> commitMetadata.addWriteStat(stat.getPartitionPath(), stat)); assertTrue(commitMetadata.getTotalCreateTime() > 0); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index caae246eccf51..b2db4ac85995b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -505,24 +505,51 @@ public static List monotonicIncreasingCommitTimestamps(int numTimestamps return commits; } - public static List generateFakeHoodieWriteStat(int limit) { + public static List generateFakeHoodieWriteStats(int limit) { + return generateFakeHoodieWriteStats(limit, "/some/fake/path", "/some/fake/partition/path"); + } + + public static List generateFakeHoodieWriteStats(int limit, String relFilePath, String relPartitionPath) { List writeStatList = new ArrayList<>(); for (int i = 0; i < limit; i++) { - HoodieWriteStat writeStat = new HoodieWriteStat(); - writeStat.setFileId(UUID.randomUUID().toString()); - writeStat.setNumDeletes(0); - writeStat.setNumUpdateWrites(100); - writeStat.setNumWrites(100); - writeStat.setPath("/some/fake/path" + i); - writeStat.setPartitionPath("/some/fake/partition/path" + i); - writeStat.setTotalLogFilesCompacted(100L); - RuntimeStats runtimeStats = new RuntimeStats(); - runtimeStats.setTotalScanTime(100); - runtimeStats.setTotalCreateTime(100); - runtimeStats.setTotalUpsertTime(100); - writeStat.setRuntimeStats(runtimeStats); - writeStatList.add(writeStat); + writeStatList.add(generateFakeHoodieWriteStat(relFilePath + i, (relPartitionPath + i))); } return writeStatList; } + + public static HoodieWriteStat generateFakeHoodieWriteStat(String relFilePath, String relPartitionPath) { + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId(UUID.randomUUID().toString()); + writeStat.setNumDeletes(0); + writeStat.setNumUpdateWrites(100); + 
writeStat.setNumWrites(100); + writeStat.setPath(relFilePath); + writeStat.setPartitionPath(relPartitionPath); + writeStat.setTotalLogFilesCompacted(100L); + writeStat.setFileSizeInBytes(1024 * 10); + writeStat.setTotalWriteBytes(1024 * 10); + RuntimeStats runtimeStats = new RuntimeStats(); + runtimeStats.setTotalScanTime(100); + runtimeStats.setTotalCreateTime(100); + runtimeStats.setTotalUpsertTime(100); + writeStat.setRuntimeStats(runtimeStats); + return writeStat; + } + + public static HoodieWriteStat generateFakeHoodieWriteStats(java.nio.file.Path basePath, HoodieLogFile logFile) { + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId(UUID.randomUUID().toString()); + writeStat.setNumDeletes(0); + writeStat.setNumUpdateWrites(100); + writeStat.setNumWrites(100); + writeStat.setPath(logFile.getPath().toString().replaceFirst(basePath.toString() + "/", "")); + writeStat.setPartitionPath("/some/fake/partition/path"); + writeStat.setTotalLogFilesCompacted(100L); + RuntimeStats runtimeStats = new RuntimeStats(); + runtimeStats.setTotalScanTime(100); + runtimeStats.setTotalCreateTime(100); + runtimeStats.setTotalUpsertTime(100); + writeStat.setRuntimeStats(runtimeStats); + return writeStat; + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java index 1ad3812906b99..0a5055a056fa2 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java @@ -51,7 +51,7 @@ public class InputPathHandler { private final List snapshotPaths; private final List nonHoodieInputPaths; - InputPathHandler(Configuration conf, Path[] inputPaths, List incrementalTables) throws IOException { + public InputPathHandler(Configuration conf, Path[] inputPaths, List incrementalTables) throws IOException { this.conf = conf; tableMetaClientMap = new HashMap<>(); snapshotPaths = new ArrayList<>(); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index 3758b9b8519c9..ac00b9daec3de 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -18,12 +18,17 @@ package org.apache.hudi.hadoop.realtime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.log.LogReaderUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; -import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; @@ -34,19 +39,22 @@ import org.apache.log4j.Logger; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; import java.util.stream.Collectors; +import static org.apache.hudi.common.table.log.LogReaderUtils.readLatestSchemaFromLogFiles; +import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.addPartitionFields; +import 
static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.generateProjectionSchema; +import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getNameToFieldMap; +import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.orderFields; +import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.readSchema; + /** * Record Reader implementation to merge fresh avro data with base parquet data, to support real time queries. */ public abstract class AbstractRealtimeRecordReader { private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class); - protected final HoodieRealtimeFileSplit split; + private final String basePath; protected final JobConf jobConf; protected final boolean usesCustomPayload; // Schema handles @@ -55,7 +63,7 @@ public abstract class AbstractRealtimeRecordReader { private Schema hiveSchema; public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job) { - this.split = split; + this.basePath = split.getBasePath(); this.jobConf = job; LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)); LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); @@ -63,14 +71,16 @@ public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job) try { this.usesCustomPayload = usesCustomPayload(); LOG.info("usesCustomPayload ==> " + this.usesCustomPayload); - init(); + String logMessage = "About to read compacted logs " + split.getDeltaLogPaths() + " for base split " + + split.getPath() + ", projecting cols %s"; + init(split.getDeltaLogPaths(), logMessage, Option.of(split.getPath())); } catch (IOException e) { - throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e); + throw new HoodieIOException("Could not create HoodieRealtimeRecordReader on path " + split.getPath(), e); } } private boolean usesCustomPayload() { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, split.getBasePath()); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, basePath); return !(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName()) || metaClient.getTableConfig().getPayloadClass().contains("org.apache.hudi.OverwriteWithLatestAvroPayload")); } @@ -80,11 +90,10 @@ private boolean usesCustomPayload() { * back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into the * job conf. */ - private void init() throws IOException { - Schema schemaFromLogFile = - LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaLogPaths(), jobConf); - if (schemaFromLogFile == null) { - writerSchema = HoodieRealtimeRecordReaderUtils.readSchema(jobConf, split.getPath()); + private void init(List deltaLogPaths, String logMessage, Option splitPath) throws IOException { + Schema schemaFromLogFile = readLatestSchemaFromLogFiles(basePath, deltaLogPaths, jobConf); + if (schemaFromLogFile == null && splitPath.isPresent()) { + writerSchema = readSchema(jobConf, splitPath.get()); LOG.debug("Writer Schema From Parquet => " + writerSchema.getFields()); } else { writerSchema = schemaFromLogFile; @@ -95,18 +104,17 @@ private void init() throws IOException { List partitioningFields = partitionFields.length() > 0 ? 
Arrays.stream(partitionFields.split("/")).collect(Collectors.toList()) : new ArrayList<>(); - writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields); - List projectionFields = HoodieRealtimeRecordReaderUtils.orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), + writerSchema = addPartitionFields(writerSchema, partitioningFields); + List projectionFields = orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR), partitioningFields); - Map schemaFieldsMap = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema); + Map schemaFieldsMap = getNameToFieldMap(writerSchema); hiveSchema = constructHiveOrderedSchema(writerSchema, schemaFieldsMap); // TODO(vc): In the future, the reader schema should be updated based on log files & be able // to null out fields not present before - readerSchema = HoodieRealtimeRecordReaderUtils.generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields); - LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s", - split.getDeltaLogPaths(), split.getPath(), projectionFields)); + readerSchema = generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields); + LOG.info(String.format(logMessage, projectionFields)); } private Schema constructHiveOrderedSchema(Schema writerSchema, Map schemaFieldsMap) { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index 02bb5eb63c130..7f505c90ce0f0 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -18,13 +18,14 @@ package org.apache.hudi.hadoop.realtime; +import java.util.List; +import org.apache.hadoop.fs.FileSystem; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.util.Option; -import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; @@ -40,6 +41,13 @@ import java.io.IOException; import java.util.Map; +import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP; +import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED; +import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE; +import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH; +import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP; +import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP; + class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader implements RecordReader { @@ -52,28 +60,40 @@ public RealtimeCompactedRecordReader(HoodieRealtimeFileSplit split, JobConf job, RecordReader realReader) throws IOException { super(split, job); this.parquetReader = realReader; - this.deltaRecordMap = 
getMergedLogRecordScanner().getRecords(); + this.deltaRecordMap = getMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), jobConf), + split.getBasePath(), + split.getDeltaLogPaths(), + usesCustomPayload, + split.getMaxCommitTime(), + jobConf + ).getRecords(); } /** * Goes through the log files and populates a map with latest version of each key logged, since the base split was * written. */ - private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOException { + public HoodieMergedLogRecordScanner getMergedLogRecordScanner(FileSystem fs, + String basePath, + List deltaLogPaths, + boolean usesCustomPayload, + String maxCommitTime, + JobConf jobConf) throws IOException { // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit // but can return records for completed commits > the commit we are trying to read (if using // readCommit() API) return new HoodieMergedLogRecordScanner( - FSUtils.getFs(split.getPath().toString(), jobConf), - split.getBasePath(), - split.getDeltaLogPaths(), + fs, + basePath, + deltaLogPaths, usesCustomPayload ? getWriterSchema() : getReaderSchema(), - split.getMaxCommitTime(), + maxCommitTime, getMaxCompactionMemoryInBytes(), - Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), + Boolean + .parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), false, - jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), - jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)); + jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), + jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH)); } @Override diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index 918aade813327..65aec6b78f6fd 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -28,7 +28,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; -import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; import org.apache.hudi.hadoop.utils.HoodieHiveUtils; @@ -52,6 +51,8 @@ import java.util.ArrayList; import java.util.List; +import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStats; +import static org.apache.hudi.common.testutils.HoodieTestUtils.init; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -102,7 +103,7 @@ public void testPendingCompactionWithActiveCommits() throws IOException { instants.add(t4); instants.add(t5); instants.add(t6); - HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.toString()); + HoodieTableMetaClient metaClient = init(basePath.toString()); HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient); timeline.setInstants(instants); @@ 
-239,7 +240,7 @@ public void testIncrementalSimple() throws IOException { private void createCommitFile(java.nio.file.Path basePath, String commitNumber, String partitionPath) throws IOException { - List writeStats = HoodieTestUtils.generateFakeHoodieWriteStat(1); + List writeStats = generateFakeHoodieWriteStats(1); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); writeStats.forEach(stat -> commitMetadata.addWriteStat(partitionPath, stat)); File file = basePath.resolve(".hoodie").resolve(commitNumber + ".commit").toFile(); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index 0201fac2d759f..967277d235395 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -59,7 +59,7 @@ public class InputFormatTestUtil { - private static String TEST_WRITE_TOKEN = "1-0-1"; + public static String TEST_WRITE_TOKEN = "1-0-1"; public static File prepareTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles, String commitNumber) From 304061316e3f633fef102f9f217c3e446ce14156 Mon Sep 17 00:00:00 2001 From: Bhavani Sudha Saktheeswaran Date: Fri, 10 Jul 2020 11:30:47 -0700 Subject: [PATCH 2/2] [HUDI-651] Fix incremental queries in MOR tables - This commit addresses two issues: 1. Honors the end time if it is less than the most recent completed commit time 2. Does not require a base parquet file to be present in the case when the begin and end times match only the deltacommits. To achieve this: - Created a separate FileSplit for handling incremental queries - New RecordReader to handle the new FileSplit - FileSlice Scanner to scan files in a File Slice. It first takes the base parquet file (if present) and applies the merged records from all log files in that slice. If the base file is not present, scanning returns the merged records from the log files. - HoodieParquetRealtimeInputFormat modified to switch to this HoodieMORIncrementalFileSplit and HoodieMORIncrementalRecordReader from getSplit(..) and getRecordReader(..) in case of incremental queries. 
- Includes unit test to test different incremental queries --- .../AbstractRealtimeRecordReader.java | 18 + .../HoodieMORIncrementalFileSplit.java | 178 +++++++++ .../HoodieMORIncrementalRecordReader.java | 86 +++++ .../HoodieMergedFileSlicesScanner.java | 244 +++++++++++++ .../HoodieParquetRealtimeInputFormat.java | 120 +++++- .../TestHoodieParquetRealtimeInputFormat.java | 341 ++++++++++++++++++ 6 files changed, 977 insertions(+), 10 deletions(-) create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMORIncrementalFileSplit.java create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMORIncrementalRecordReader.java create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergedFileSlicesScanner.java create mode 100644 hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieParquetRealtimeInputFormat.java diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index ac00b9daec3de..7b4c438b5027e 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -79,6 +79,24 @@ public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job) } } + public AbstractRealtimeRecordReader(HoodieMORIncrementalFileSplit split, JobConf job) { + this.basePath = split.getBasePath(); + this.jobConf = job; + LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)); + LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); + LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "")); + try { + this.usesCustomPayload = usesCustomPayload(); + LOG.info("usesCustomPayload ==> " + this.usesCustomPayload); + String logMessage = "About to read compacted logs for fileGroupId: " + + split.getFileGroupId().toString() + ", projecting cols %s"; + String latestBaseFilePath = split.getLatestBaseFilePath(); + init(split.getLatestLogFilePaths(), logMessage, latestBaseFilePath != null ? Option.of(new Path(latestBaseFilePath)) : Option.empty()); + } catch (IOException e) { + throw new HoodieIOException("Could not create HoodieMORIncrementalRecordReader on file group Id " + split.getFileGroupId().toString(), e); + } + } + private boolean usesCustomPayload() { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, basePath); return !(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName()) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMORIncrementalFileSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMORIncrementalFileSplit.java new file mode 100644 index 0000000000000..e73ed1a94ce2c --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMORIncrementalFileSplit.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.realtime; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieFileGroup; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.util.Option; + +/** + * Filesplit that wraps a HoodieFileGroup involved in Incremental queries. + */ +public class HoodieMORIncrementalFileSplit extends FileSplit { + + private String basePath; + private HoodieFileGroupId fileGroupId; + private List fileGroupSlices; + String maxCommitTime; + + public HoodieMORIncrementalFileSplit(HoodieFileGroup hoodieFileGroup, String basePath, String maxCommitTime) { + this.fileGroupId = hoodieFileGroup.getFileGroupId(); + this.fileGroupSlices = hoodieFileGroup.getAllFileSlices().collect(Collectors.toList()); + this.basePath = basePath; + this.maxCommitTime = maxCommitTime; + } + + public HoodieFileGroupId getFileGroupId() { + return fileGroupId; + } + + public List getFileSlices() { + return fileGroupSlices; + } + + public String getBasePath() { + return basePath; + } + + public String getMaxCommitTime() { + return maxCommitTime; + } + + // returns the most recent Parquet file path from the group of file slices + public String getLatestBaseFilePath() { + Option fileSlice = Option.fromJavaOptional(fileGroupSlices.stream().filter(slice -> slice.getBaseFile().isPresent()).findFirst()); + return fileSlice.map(slice -> slice.getBaseFile().get().getPath()).orElse(null); + } + + // returns the most recent log file paths from the group of file slices + public List getLatestLogFilePaths() { + Option fileSlice = Option.fromJavaOptional(fileGroupSlices.stream().filter(slice -> slice.getLogFiles().findAny().isPresent()).findFirst()); + return fileSlice.map(slice -> slice.getLogFiles().map(logFile -> logFile.getPath().toString()).collect(Collectors.toList())).orElse(new ArrayList<>()); + } + + private static void writeString(String str, DataOutput out) throws IOException { + byte[] bytes = str.getBytes(StandardCharsets.UTF_8); + out.writeInt(bytes.length); + out.write(bytes); + } + + private static void writeFileGroupId(HoodieFileGroupId fileGroupId, DataOutput out) throws IOException { + writeString(fileGroupId.getPartitionPath(), out); + writeString(fileGroupId.getFileId(), out); + } + + private static void writeFileStatus(FileStatus fileStatus, DataOutput out) throws IOException { + out.writeLong(fileStatus.getLen()); + writeString(fileStatus.getPath().toString(), out); + } + + private static void writeFileSlice(FileSlice slice, DataOutput out) throws IOException { + boolean isBaseFilePresent = slice.getBaseFile().isPresent(); + out.writeBoolean(isBaseFilePresent); + if 
(isBaseFilePresent) { + writeFileStatus(slice.getBaseFile().get().getFileStatus(), out); + } + List logFilePaths = slice.getLogFiles() + .map(f -> f.getPath().toString()) + .collect(Collectors.toList()); + out.writeInt(logFilePaths.size()); + for (String logFilePath : logFilePaths) { + writeString(logFilePath, out); + } + } + + @Override + public void write(DataOutput out) throws IOException { + writeString(basePath, out); + writeString(maxCommitTime, out); + writeFileGroupId(fileGroupId, out); + out.writeInt(fileGroupSlices.size()); + for (FileSlice fileSlice: fileGroupSlices) { + writeFileSlice(fileSlice, out); + } + } + + private static String readString(DataInput in) throws IOException { + byte[] bytes = new byte[in.readInt()]; + in.readFully(bytes); + return new String(bytes, StandardCharsets.UTF_8); + } + + private static HoodieFileGroupId readFileGroupId(DataInput in) throws IOException { + return new HoodieFileGroupId(readString(in), readString(in)); + } + + private static FileStatus readFileStatus(DataInput in) throws IOException { + return new FileStatus(in.readLong(), false, 0, 0, 0, new Path(readString(in))); + } + + private static FileSlice readFileSlice(HoodieFileGroupId fileGroupId, DataInput in) throws IOException { + boolean isBaseFilePresent = in.readBoolean(); + Option baseFile = isBaseFilePresent ? Option.of(new HoodieBaseFile(readFileStatus(in))) : Option.empty(); + int numLogFiles = in.readInt(); + List logFiles = new ArrayList<>(); + Option baseInstantTime = Option.empty(); + for (int i = 0; i < numLogFiles; i++) { + HoodieLogFile logFile = new HoodieLogFile(readString(in)); + logFiles.add(logFile); + if (!baseInstantTime.isPresent()) { + baseInstantTime = Option.of(logFile.getBaseCommitTime()); + } + } + FileSlice fileSlice = new FileSlice(fileGroupId, baseInstantTime.get()); + baseFile.ifPresent(fileSlice::setBaseFile); + for (HoodieLogFile logFile: logFiles) { + fileSlice.addLogFile(logFile); + } + return fileSlice; + } + + @Override + public void readFields(DataInput in) throws IOException { + basePath = readString(in); + maxCommitTime = readString(in); + fileGroupId = readFileGroupId(in); + int totalSlices = in.readInt(); + fileGroupSlices = new ArrayList<>(); + for (int i = 0; i < totalSlices; i++) { + fileGroupSlices.add(readFileSlice(fileGroupId, in)); + } + } + + @Override + public String toString() { + return "HoodieMORIncrementalFileSplit{BasePath=" + basePath + ", FileGroupId=" + fileGroupId + + ", maxcommitTime='" + maxCommitTime + + ", numFileSlices='" + fileGroupSlices.size() + '}'; + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMORIncrementalRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMORIncrementalRecordReader.java new file mode 100644 index 0000000000000..e52c6ba981e03 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMORIncrementalRecordReader.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.realtime; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +/** + * Record Reader which can do compacted (merge-on-read) record reading to serve incremental queries. + */ +public class HoodieMORIncrementalRecordReader extends AbstractRealtimeRecordReader implements RecordReader { + private static final Logger LOG = LogManager.getLogger(HoodieMORIncrementalRecordReader.class); + private final Map records; + private final Iterator iterator; + + public HoodieMORIncrementalRecordReader(HoodieMORIncrementalFileSplit split, JobConf job, Reporter reporter) { + super(split, job); + HoodieMergedFileSlicesScanner fileSlicesScanner = new HoodieMergedFileSlicesScanner(split, job, reporter, this); + try { + fileSlicesScanner.scan(); + this.records = fileSlicesScanner.getRecords(); + this.iterator = records.values().iterator(); + } catch (IOException e) { + throw new HoodieIOException("IOException when reading log file "); + } + } + + @Override + public boolean next(NullWritable key, ArrayWritable value) throws IOException { + if (!iterator.hasNext()) { + return false; + } + value.set(iterator.next().get()); + return true; + } + + @Override + public NullWritable createKey() { + return null; + } + + @Override + public ArrayWritable createValue() { + return new ArrayWritable(Writable.class); + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public void close() throws IOException { + } + + @Override + public float getProgress() throws IOException { + return 0; + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergedFileSlicesScanner.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergedFileSlicesScanner.java new file mode 100644 index 0000000000000..afb52a1268a99 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergedFileSlicesScanner.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.realtime; + +import java.io.IOException; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP; +import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED; +import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE; +import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH; +import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP; +import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP; +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS; +import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.arrayWritableToString; +import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.avroToArrayWritable; + +/** + * Scans through all files in a file slice. Constructs a map of records from Parquet file if present. Constructs merged + * log records if log files are present in the slice. Merges records from log files on top of records from base parquet + * file. + * + */ +public class HoodieMergedFileSlicesScanner { + + private static final Logger LOG = LogManager.getLogger(HoodieMergedFileSlicesScanner.class); + + private final transient MapredParquetInputFormat mapredParquetInputFormat; + private final HoodieMORIncrementalFileSplit split; + private final List fileSlices; + + // Final map of compacted/merged records + // TODO change map to external spillable map. 
But ArrayWritable is not implementing Serializable + private final Map records; + + private final JobConf jobConf; + private final Reporter reporter; + private final HoodieMORIncrementalRecordReader recordReader; + + public HoodieMergedFileSlicesScanner( + HoodieMORIncrementalFileSplit split, + JobConf jobConf, + Reporter reporter, + HoodieMORIncrementalRecordReader recordReader) { + this.split = split; + this.fileSlices = split.getFileSlices() + .stream() + .sorted(Comparator.comparing(FileSlice::getBaseInstantTime)) + .collect(Collectors.toList()); + this.mapredParquetInputFormat = new MapredParquetInputFormat(); + this.records = new HashMap<>(); + this.jobConf = jobConf; + this.reporter = reporter; + this.recordReader = recordReader; + } + + public void scan() throws IOException { + for (FileSlice fileSlice: fileSlices) { + Map newRecords = scanFileSlice(fileSlice); + mergeRecords(newRecords); + } + } + + private void mergeRecords(Map newFileSliceRecords) { + for (Entry entry: newFileSliceRecords.entrySet()) { + records.put(entry.getKey(), entry.getValue()); + } + } + + private Map scanFileSlice(FileSlice fileSlice) throws IOException { + Map baseFileRecords = fetchBaseFileRecordsMapForSlice(fileSlice); + Map> deltaRecords = fetchDeltaRecordMapForSlice(fileSlice); + if ((baseFileRecords == null || baseFileRecords.isEmpty()) && deltaRecords != null) { + Map result = new HashMap<>(); + for (Entry> entry: deltaRecords.entrySet()) { + Option aWritable = hoodieRecordToArrayWritable(entry.getValue()); + if (aWritable.isPresent()) { + result.put(entry.getKey(), aWritable.get()); + } + } + return result; + } + if (deltaRecords == null && baseFileRecords != null) { + return baseFileRecords; + } + // Apply delta records on top of base file records + return applyUpdatesToBaseFile(baseFileRecords, deltaRecords); + } + + private Map applyUpdatesToBaseFile( + Map baseFileRecords, + Map> deltaRecords) throws IOException { + Iterator> iterator = baseFileRecords.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + if (deltaRecords.containsKey(entry.getKey())) { + Option aWritable = hoodieRecordToArrayWritable(deltaRecords.get(entry.getKey())); + if (!aWritable.isPresent()) { + // case where a valid record in basefile is deleted later in log files + iterator.remove(); + } else { + // replace original value with value from log file. 
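+ // The log record's writables overwrite the base record's fields in place; the copy below is bounded by the
+ // base record's length, and any mismatch is rethrown with both records attached to ease debugging.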
+ Writable[] originalValue = entry.getValue().get(); + Writable[] replaceValue = aWritable.get().get(); + try { + System.arraycopy(replaceValue, 0, originalValue, 0, originalValue.length); + entry.getValue().set(originalValue); + } catch (RuntimeException re) { + LOG.error("Got exception when doing array copy", re); + LOG.error("Base record :" + arrayWritableToString(entry.getValue())); + LOG.error("Log record :" + arrayWritableToString(aWritable.get())); + String errMsg = "Base-record :" + arrayWritableToString(entry.getValue()) + + " ,Log-record :" + arrayWritableToString(aWritable.get()) + " ,Error :" + re.getMessage(); + throw new RuntimeException(errMsg, re); + } + } + } + } + return baseFileRecords; + } + + private Option hoodieRecordToArrayWritable( + HoodieRecord record) throws IOException { + Option rec; + if (recordReader.usesCustomPayload) { + rec = record.getData().getInsertValue(recordReader.getWriterSchema()); + } else { + rec = record.getData().getInsertValue(recordReader.getReaderSchema()); + } + if (!rec.isPresent()) { + // possibly the record is deleted + return Option.empty(); + } + GenericRecord recordToReturn = rec.get(); + if (recordReader.usesCustomPayload) { + // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from + // the writerSchema with only the projection fields + recordToReturn = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), recordReader.getReaderSchema()); + } + return Option.of((ArrayWritable) avroToArrayWritable(recordToReturn, recordReader.getHiveSchema())); + } + + private Map fetchBaseFileRecordsMapForSlice(FileSlice fileSlice) throws IOException { + if (!fileSlice.getBaseFile().isPresent()) { + return null; + } + + // create a single split from the base parquet file and read the records. + FileStatus baseFileStatus = fileSlice.getBaseFile().get().getFileStatus(); + FileSplit fileSplit = new FileSplit(baseFileStatus.getPath(), 0, baseFileStatus.getLen(), (String[]) null); + Map result = new HashMap<>(); + RecordReader recordReader = null; + try { + recordReader = mapredParquetInputFormat.getRecordReader(fileSplit, jobConf, reporter); + NullWritable key = recordReader.createKey(); + ArrayWritable value = recordReader.createValue(); + boolean hasNext = recordReader.next(key, value); + while (hasNext) { + String hoodieRecordKey = value.get()[HOODIE_RECORD_KEY_COL_POS].toString(); + result.put(hoodieRecordKey, value); + key = recordReader.createKey(); + value = recordReader.createValue(); + hasNext = recordReader.next(key, value); + } + } catch (IOException e) { + LOG.error("Got exception when iterating parquet file: " + baseFileStatus.getPath(), e); + throw new HoodieIOException("IO exception when reading parquet file"); + } finally { + if (recordReader != null) { + recordReader.close(); + } + } + return result; + } + + private Map> fetchDeltaRecordMapForSlice(FileSlice fileSlice) { + if (!fileSlice.getLogFiles().findAny().isPresent()) { + return null; + } + // reverse the log files to be in natural order + List logFilesPath = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(file -> file.getPath().toString()).collect(Collectors.toList()); + + return new HoodieMergedLogRecordScanner( + FSUtils.getFs(split.getBasePath(), jobConf), + split.getBasePath(), + logFilesPath, + recordReader.usesCustomPayload ? 
recordReader.getWriterSchema() : recordReader.getReaderSchema(), + split.getMaxCommitTime(), + recordReader.getMaxCompactionMemoryInBytes(), + Boolean + .parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), + false, + jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), + jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH) + ).getRecords(); + } + + public Map getRecords() { + return records; + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java index 11247910effaf..b0d22a0c0b9ff 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java @@ -18,17 +18,33 @@ package org.apache.hudi.hadoop.realtime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.hadoop.HoodieParquetInputFormat; +import org.apache.hudi.hadoop.InputPathHandler; import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat; +import org.apache.hudi.hadoop.utils.HoodieHiveUtils; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; @@ -42,8 +58,11 @@ import org.apache.log4j.Logger; import java.io.IOException; -import java.util.Arrays; +import java.util.stream.Collectors; import java.util.stream.Stream; +import org.mortbay.log.Log; + +import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.getIncrementalTableNames; /** * Input Format, that provides a real-time view of data in a Hoodie table. @@ -62,16 +81,93 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + // is this an incremental query + List incrementalTables = getIncrementalTableNames(Job.getInstance(job)); + if (!incrementalTables.isEmpty()) { + //TODO For now assuming the query can be either incremental or snapshot and NOT both. 
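+ // When any input table is flagged for incremental consumption in the job conf, splits are built per file group
+ // as HoodieMORIncrementalFileSplit; otherwise the regular realtime (snapshot) split generation below is used.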
+ return getSplitsForIncrementalQueries(job, incrementalTables); + } + Stream fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is); return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits); } - @Override - public FileStatus[] listStatus(JobConf job) throws IOException { - // Call the HoodieInputFormat::listStatus to obtain all latest parquet files, based on commit - // timeline. - return super.listStatus(job); + protected InputSplit[] getSplitsForIncrementalQueries(JobConf job, List incrementalTables) throws IOException { + InputPathHandler inputPathHandler = new InputPathHandler(conf, getInputPaths(job), incrementalTables); + Map tableMetaClientMap = inputPathHandler.getTableMetaClientMap(); + List splits = new ArrayList<>(); + + for (String table : incrementalTables) { + HoodieTableMetaClient metaClient = tableMetaClientMap.get(table); + if (metaClient == null) { + /* This can happen when the INCREMENTAL mode is set for a table but there were no InputPaths + * in the jobConf + */ + continue; + } + String tableName = metaClient.getTableConfig().getTableName(); + Path basePath = new Path(metaClient.getBasePath()); + HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(Job.getInstance(job), tableName); + // Total number of commits to return in this batch. Set this to -1 to get all the commits. + Integer maxCommits = HoodieHiveUtils.readMaxCommits(Job.getInstance(job), tableName); + LOG.info("Last Incremental timestamp for table: " + table + ", was set as " + lastIncrementalTs); + List commitsToCheck = timeline.findInstantsAfter(lastIncrementalTs, maxCommits) + .getInstants().collect(Collectors.toList()); + + Map> partitionToFileStatusesMap = listStatusForAffectedPartitions(basePath, commitsToCheck, timeline); + + List fileStatuses = new ArrayList<>(); + for (List statuses: partitionToFileStatusesMap.values()) { + fileStatuses.addAll(statuses); + } + LOG.info("Stats after applying Hudi incremental filter: total_commits_to_check: " + commitsToCheck.size() + + ", total_partitions_touched: " + partitionToFileStatusesMap.size() + ", total_files_processed: " + + fileStatuses.size()); + FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]); + List commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline, statuses); + + // Iterate partitions to create splits + partitionToFileStatusesMap.keySet().forEach(path -> { + // create an Incremental Split for each file group. + fsView.getAllFileGroups(path) + .forEach( + fileGroup -> splits.add( + new HoodieMORIncrementalFileSplit(fileGroup, basePath.toString(), commitsList.get(commitsList.size() - 1)) + )); + }); + } + Log.info("Total splits generated: " + splits.size()); + return splits.toArray(new InputSplit[0]); + } + + private Map> listStatusForAffectedPartitions( + Path basePath, List commitsToCheck, HoodieTimeline timeline) throws IOException { + // Extract files touched by these commits. + // TODO This might need to be done in parallel like listStatus parallelism ? 
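+ // Each commit's metadata already records, per partition, the relative path of every file it touched, so the
+ // affected files are reconstructed as synthetic FileStatus objects (sized from totalWriteBytes) without any
+ // filesystem listing; these statuses seed the HoodieTableFileSystemView that emits one split per file group.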
+ HashMap> partitionToFileStatusesMap = new HashMap<>(); + for (HoodieInstant commit: commitsToCheck) { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), + HoodieCommitMetadata.class); + for (Map.Entry> entry: commitMetadata.getPartitionToWriteStats().entrySet()) { + if (!partitionToFileStatusesMap.containsKey(entry.getKey())) { + partitionToFileStatusesMap.put(entry.getKey(), new ArrayList<>()); + } + for (HoodieWriteStat stat : entry.getValue()) { + String relativeFilePath = stat.getPath(); + Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null; + if (fullPath != null) { + //TODO Should the length of file be totalWriteBytes or fileSizeInBytes? + FileStatus fs = new FileStatus(stat.getTotalWriteBytes(), false, 0, 0, + 0, fullPath); + partitionToFileStatusesMap.get(entry.getKey()).add(fs); + } + } + } + } + return partitionToFileStatusesMap; } @Override @@ -165,11 +261,15 @@ public RecordReader getRecordReader(final InputSpli LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); // sanity check - ValidationUtils.checkArgument(split instanceof HoodieRealtimeFileSplit, + ValidationUtils.checkArgument(split instanceof HoodieRealtimeFileSplit || split instanceof HoodieMORIncrementalFileSplit, "HoodieRealtimeRecordReader can only work on HoodieRealtimeFileSplit and not with " + split); - return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, jobConf, - super.getRecordReader(split, jobConf, reporter)); + if (split instanceof HoodieRealtimeFileSplit) { + return new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) split, jobConf, + super.getRecordReader(split, jobConf, reporter)); + } + + return new HoodieMORIncrementalRecordReader((HoodieMORIncrementalFileSplit) split, jobConf, reporter); } @Override diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieParquetRealtimeInputFormat.java new file mode 100644 index 0000000000000..94197e9b88467 --- /dev/null +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieParquetRealtimeInputFormat.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hadoop.realtime; + +import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStats; +import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultHadoopConf; +import static org.apache.hudi.common.testutils.HoodieTestUtils.init; +import static org.apache.hudi.common.testutils.SchemaTestUtil.generateAvroRecordFromJson; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getEvolvedSchema; +import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP; +import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestHoodieParquetRealtimeInputFormat { + + private HoodieParquetRealtimeInputFormat inputFormat; + private JobConf jobConf; + private Configuration hadoopConf; + private FileSystem fs; + private Schema schema; + int numberOfRecords = 150; + int numRecordsPerBaseFile = numberOfRecords / 2; + int numberOfUniqueLogRecords = numberOfRecords / 6; + int numberOfFiles = 1; + String fileID = "fileid0"; + String partitionPath = "2016/05/01"; + String baseInstant1 = "100"; + String deltaCommitTime1 = "101"; + String deltaCommitTime2 = "102"; + String deltaCommitTime3 = "103"; + String baseInstant2 = "200"; + String deltaCommitTime4 = "201"; + String deltaCommitTime5 = "202"; + + @TempDir + public java.nio.file.Path basePath; + + @BeforeEach + public void setUp() throws IOException { + inputFormat = new HoodieParquetRealtimeInputFormat(); + jobConf = new JobConf(); + 
jobConf.set(MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(1024 * 1024));
+    hadoopConf = getDefaultHadoopConf();
+    fs = FSUtils.getFs(basePath.toString(), hadoopConf);
+    schema = HoodieAvroUtils.addMetadataFields(getEvolvedSchema());
+    setPropsForInputFormat(inputFormat, jobConf, schema);
+  }
+
+  private static void setPropsForInputFormat(HoodieParquetRealtimeInputFormat inputFormat, JobConf jobConf,
+      Schema schema) {
+    List<Schema.Field> fields = schema.getFields();
+    String names = fields.stream().map(Schema.Field::name).collect(Collectors.joining(","));
+    String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
+    Configuration conf = getDefaultHadoopConf();
+
+    String hiveColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase("datestr"))
+        .map(Schema.Field::name).collect(Collectors.joining(","));
+    hiveColumnNames = hiveColumnNames + ",datestr";
+
+    String hiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes("string,string,string,bigint,string,string");
+    hiveColumnTypes = hiveColumnTypes + ",string";
+    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
+    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypes);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
+    jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
+    conf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
+    conf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "datestr");
+    conf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, hiveColumnTypes);
+    inputFormat.setConf(conf);
+    jobConf.addResource(conf);
+  }
+
+  private void prepareTestData() throws IOException, InterruptedException {
+    // [inserts][initial commit] => creates one parquet file
+    init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
+    File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, numberOfFiles, numRecordsPerBaseFile, baseInstant1);
+    createCommitFile(basePath, baseInstant1, partitionPath);
+
+    moreUpdates(partitionDir, baseInstant1, deltaCommitTime1, 0);
+
+    moreUpdates(partitionDir, baseInstant1, deltaCommitTime2, 1 * numberOfUniqueLogRecords);
+
+    moreUpdates(partitionDir, baseInstant1, deltaCommitTime3, 2 * numberOfUniqueLogRecords);
+
+    InputFormatTestUtil.prepareParquetTable(basePath, schema, numberOfFiles, numRecordsPerBaseFile, baseInstant2);
+    createCommitFile(basePath, baseInstant2, partitionPath);
+
+    moreUpdates(partitionDir, baseInstant2, deltaCommitTime4, 0);
+
+    moreUpdates(partitionDir, baseInstant2, deltaCommitTime5, 1 * numberOfUniqueLogRecords);
+
+    // Add the paths to jobConf
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+  }
+
+  // moreUpdates creates a new log file on top of the latest FileSlice and publishes a new deltacommit
+  private void moreUpdates(File partitionDir, String baseCommit, String deltaCommit, int offset) throws IOException, InterruptedException {
+    HoodieLogFormat.Writer writer = writeLogFileWithUniqueUpdates(partitionDir, schema, fileID, baseCommit, deltaCommit,
+        numberOfUniqueLogRecords, offset);
+    long size = writer.getCurrentSize();
+    writer.close();
+    assertTrue(size > 0, "block - size should be > 0");
+    createDeltaCommitFile(basePath, deltaCommit, partitionPath,
+        writer);
+  }
+
+  private HoodieLogFormat.Writer writeLogFileWithUniqueUpdates(
+      File partitionDir,
+      Schema schema,
+      String fileId,
+      String baseCommit,
+      String newCommit,
+      int numberOfRecords,
+      int offset) throws InterruptedException, IOException {
+    return writeDataBlockToLogFile(partitionDir, schema, fileId, baseCommit, newCommit, numberOfRecords, offset, 0);
+  }
+
+  private HoodieLogFormat.Writer writeDataBlockToLogFile(File partitionDir, Schema schema, String fileId,
+      String baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion)
+      throws InterruptedException, IOException {
+    HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath()))
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).withLogVersion(logVersion)
+        .withLogWriteToken("1-0-1").overBaseCommit(baseCommit).withFs(fs).build();
+    List<IndexedRecord> records = new ArrayList<>();
+    for (int i = offset; i < offset + numberOfRecords; i++) {
+      records.add(generateAvroRecordFromJson(schema, i, newCommit, fileId));
+    }
+    Schema writeSchema = records.get(0).getSchema();
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writeSchema.toString());
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+    writer = writer.appendBlock(dataBlock);
+    return writer;
+  }
+
+  private void createCommitFile(java.nio.file.Path basePath, String commitNumber, String relativePartitionPath)
+      throws IOException {
+    String dataFileName = FSUtils.makeDataFileName(commitNumber, InputFormatTestUtil.TEST_WRITE_TOKEN, fileID);
+    java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
+    Path fullPath = new Path(partitionPath.resolve(dataFileName).toString());
+    String relFilePath = fullPath.toString().replaceFirst(basePath.toString() + "/", "");
+    HoodieWriteStat writeStat = HoodieTestUtils.generateFakeHoodieWriteStat(relFilePath, relativePartitionPath);
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addWriteStat(relativePartitionPath, writeStat);
+    java.nio.file.Path file = Files.createFile(basePath.resolve(Paths.get(".hoodie", commitNumber + ".commit")));
+    FileOutputStream fileOutputStream = new FileOutputStream(file.toAbsolutePath().toString());
+    fileOutputStream.write(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+    fileOutputStream.flush();
+    fileOutputStream.close();
+  }
+
+  private void createDeltaCommitFile(java.nio.file.Path basePath, String commitNumber, String partitionPath, HoodieLogFormat.Writer writer)
+      throws IOException {
+    HoodieWriteStat writeStat = generateFakeHoodieWriteStats(basePath, writer.getLogFile());
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addWriteStat(partitionPath, writeStat);
+    java.nio.file.Path file = Files.createFile(basePath.resolve(Paths.get(".hoodie", commitNumber + ".deltacommit")));
+    FileOutputStream fileOutputStream = new FileOutputStream(file.toAbsolutePath().toString());
+    fileOutputStream.write(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8));
+    fileOutputStream.flush();
+    fileOutputStream.close();
+  }
+
+  private void testIncrementalQuery(String msg, String endCommitTime, Option<String> beginCommitTime,
+      int expectedNumberOfRecordsInCommit) throws IOException {
+    int actualCount = 0;
+    InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
+    assertEquals(1,
+        splits.length);
+    for (InputSplit split : splits) {
+      RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat.getRecordReader(split, jobConf, null);
+      NullWritable key = recordReader.createKey();
+      ArrayWritable writable = recordReader.createValue();
+
+      while (recordReader.next(key, writable)) {
+        // writable returns an array with schema like org.apache.hudi.common.util.TestRecord.java
+        // Take the commit time and compare it with the one we are interested in
+        Writable[] values = writable.get();
+        String commitTime = values[HOODIE_COMMIT_TIME_COL_POS].toString();
+        boolean endCommitTimeMatches = HoodieTimeline.compareTimestamps(endCommitTime, HoodieTimeline.GREATER_THAN_OR_EQUALS, commitTime);
+        boolean beginCommitTimeMatches = !beginCommitTime.isPresent() || HoodieTimeline.compareTimestamps(beginCommitTime.get(), HoodieTimeline.LESSER_THAN, commitTime);
+        assertTrue(beginCommitTimeMatches && endCommitTimeMatches);
+        actualCount++;
+      }
+    }
+    assertEquals(expectedNumberOfRecordsInCommit, actualCount, msg);
+  }
+
+  private void ensureRecordsInCommitRange(InputSplit split, String beginCommitTime, String endCommitTime, int totalExpected)
+      throws IOException {
+    int actualCount = 0;
+    assertTrue(split instanceof HoodieRealtimeFileSplit);
+    RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat.getRecordReader(split, jobConf, null);
+    NullWritable key = recordReader.createKey();
+    ArrayWritable writable = recordReader.createValue();
+
+    while (recordReader.next(key, writable)) {
+      // writable returns an array with schema like org.apache.hudi.common.util.TestRecord.java
+      // Take the commit time and compare it with the one we are interested in
+      Writable[] values = writable.get();
+      String commitTime = values[HOODIE_COMMIT_TIME_COL_POS].toString();
+
+      // The begin and end commits are inclusive
+      boolean endCommitTimeMatches = HoodieTimeline.compareTimestamps(endCommitTime, HoodieTimeline.GREATER_THAN_OR_EQUALS, commitTime);
+      boolean beginCommitTimeMatches = HoodieTimeline.compareTimestamps(beginCommitTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, commitTime);
+      assertTrue(beginCommitTimeMatches && endCommitTimeMatches);
+      actualCount++;
+    }
+    assertEquals(totalExpected, actualCount, "Expected number of records (" + totalExpected + ") does not match "
+        + "actual number of records (" + actualCount + ") for the split: " + ((HoodieRealtimeFileSplit) split).toString());
+  }
+
+  @Test // Simple querying on MOR table. Does not test incremental querying.
+  public void testInputFormatLoadAndUpdate() throws IOException, InterruptedException {
+    prepareTestData();
+
+    InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 1);
+    assertEquals(1, inputSplits.length);
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(1, files.length);
+
+    /*
+     * Since both the begin and end commits are inclusive here, the expected number of records equals
+     * numRecordsPerBaseFile: the records in the log files are just updates to existing keys and get merged
+     * by the RecordReader.
+     */
+    ensureRecordsInCommitRange(inputSplits[0], baseInstant2, deltaCommitTime5, numRecordsPerBaseFile);
+  }
+
+  @Test
+  public void testIncrementalWithNoNewCommits() throws IOException, InterruptedException {
+    prepareTestData();
+    InputFormatTestUtil.setupIncremental(jobConf, deltaCommitTime5, 1);
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(0, files.length, "An incremental pull with start commit time 202 should exclude commit 202 itself");
+  }
+
+  @Test
+  public void testChangesSinceLastCommit() throws IOException, InterruptedException {
+    prepareTestData();
+
+    // query for the latest changes in (deltaCommitTime4, deltaCommitTime5] (meaning > deltaCommitTime4 and <= deltaCommitTime5)
+    InputFormatTestUtil.setupIncremental(jobConf, deltaCommitTime4, 1);
+
+    /*
+     * Since the begin commit is excluded from an incremental query, the expected output is the number of records
+     * that were updated in the single log file belonging to deltaCommitTime5.
+     */
+    testIncrementalQuery("Number of actual records from incremental querying does not match expected: " + numberOfUniqueLogRecords,
+        deltaCommitTime5, Option.of(deltaCommitTime4), numberOfUniqueLogRecords);
+  }
+
+  @Test
+  public void testChangesHappenedInTimePeriod() throws IOException, InterruptedException {
+    prepareTestData();
+
+    // query for changes in the period (baseInstant1, deltaCommitTime2] (meaning > baseInstant1 and <= deltaCommitTime2)
+    InputFormatTestUtil.setupIncremental(jobConf, baseInstant1, 2);
+    /*
+     * Since the begin commit is excluded from an incremental query, the expected output is the number of records
+     * that were updated in the two log files belonging to deltaCommitTime1 and deltaCommitTime2. The test data is
+     * set up so that the log records carry unique record keys, which might not be true in the real world.
+     */
+    testIncrementalQuery("Number of actual records from incremental querying does not match expected: " + numberOfUniqueLogRecords * 2,
+        deltaCommitTime2, Option.of(baseInstant1), numberOfUniqueLogRecords * 2);
+
+    // query for changes in the period (deltaCommitTime1, deltaCommitTime4] (meaning > deltaCommitTime1 and <= deltaCommitTime4)
+    InputFormatTestUtil.setupIncremental(jobConf, deltaCommitTime1, 4);
+    /*
+     * Here the expected output is the number of records that match the filter across the first and second file
+     * slices. From the first file slice, the records updated in deltaCommitTime2 and deltaCommitTime3 are considered
+     * (2 * 25 = 50 unique updates). From the second file slice, the base file (75 records) and the updates belonging
+     * to deltaCommitTime4 (25 updates on top of baseInstant2) are considered. The total expected match is 75, since
+     * the base file is a superset of all unique record keys and the rest are updates.
+     */
+    testIncrementalQuery("Number of actual records from incremental querying does not match expected: " + numRecordsPerBaseFile,
+        deltaCommitTime4, Option.of(deltaCommitTime1), numRecordsPerBaseFile);
+  }
+}
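
For context on the incremental windows these tests exercise: InputFormatTestUtil.setupIncremental(jobConf, startTs, n) is expected to mark the table for incremental consumption in the JobConf. Below is a minimal sketch of the equivalent configuration, assuming the usual hoodie.<table>.consume.* key patterns; the exact property names, the helper class, and the table name are illustrative assumptions, not something this patch defines.

import org.apache.hadoop.mapred.JobConf;

// Sketch only: builds a JobConf that asks for the next maxCommits commits strictly after startTs.
public class IncrementalPullConfSketch {
  public static JobConf incrementalJobConf(String tableName, String startTs, int maxCommits) {
    JobConf jobConf = new JobConf();
    // Switch the table from snapshot to incremental querying (assumed key pattern).
    jobConf.set(String.format("hoodie.%s.consume.mode", tableName), "INCREMENTAL");
    // Exclusive lower bound: only commits strictly after startTs are returned (assumed key pattern).
    jobConf.set(String.format("hoodie.%s.consume.start.timestamp", tableName), startTs);
    // Number of commits to consume after startTs, which fixes the inclusive upper bound (assumed key pattern).
    jobConf.set(String.format("hoodie.%s.consume.max.commits", tableName), String.valueOf(maxCommits));
    return jobConf;
  }
}

Under those assumptions, incrementalJobConf("some_table", deltaCommitTime4, 1) would describe the same (deltaCommitTime4, deltaCommitTime5] window that testChangesSinceLastCommit verifies.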