From 2182f542acd4ae3d1fd239e6441e9761eb2d33a1 Mon Sep 17 00:00:00 2001
From: xiarixiaoyao
Date: Thu, 1 Jul 2021 09:59:16 +0800
Subject: [PATCH] [HUDI-2086] Redo the logic of mor_incremental_view for Hive

---
 .../hudi/hadoop/BaseFileWithLogsSplit.java    | 118 ++++++++
 .../hudi/hadoop/HoodieParquetInputFormat.java |  12 +-
 .../hudi/hadoop/PathWithLogFilePath.java      | 106 +++++++
 .../hudi/hadoop/RealtimeFileStatus.java       | 103 +++++++
 .../realtime/HoodieEmptyRecordReader.java     |  68 +++++
 .../HoodieParquetRealtimeInputFormat.java     | 205 ++++++++++++-
 .../hadoop/utils/HoodieInputFormatUtils.java  |  49 +++-
 .../utils/HoodieRealtimeInputFormatUtils.java | 107 +++++--
 .../TestHoodieRealtimeRecordReader.java       | 269 +++++++++++++++++-
 9 files changed, 1005 insertions(+), 32 deletions(-)
 create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java
 create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java
 create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java
 create mode 100644 hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieEmptyRecordReader.java

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java
new file mode 100644
index 000000000000..34e0a392bd9d
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BaseFileWithLogsSplit.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Encodes additional information in the split so that the matching base file and log files can be tracked.
+ * Hence, this class tracks a base file split together with its log files.
+ */
+public class BaseFileWithLogsSplit extends FileSplit {
+  // a flag marking whether this split was produced by an incremental query.
+  private boolean belongToIncrementalSplit = false;
+  // the log file paths of this split.
+  private List<String> deltaLogPaths = new ArrayList<>();
+  // max commit time of the current split.
+  private String maxCommitTime = "";
+  // the basePath of the current hoodie table.
+  private String basePath = "";
+  // the base file belonging to this split.
+ private String baseFilePath = ""; + + public BaseFileWithLogsSplit(Path file, long start, long length, String[] hosts) { + super(file, start, length, hosts); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeBoolean(belongToIncrementalSplit); + Text.writeString(out, maxCommitTime); + Text.writeString(out, basePath); + Text.writeString(out, baseFilePath); + out.writeInt(deltaLogPaths.size()); + for (String logPath : deltaLogPaths) { + Text.writeString(out, logPath); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + belongToIncrementalSplit = in.readBoolean(); + maxCommitTime = Text.readString(in); + basePath = Text.readString(in); + baseFilePath = Text.readString(in); + int deltaLogSize = in.readInt(); + List tempDeltaLogs = new ArrayList<>(); + for (int i = 0; i < deltaLogSize; i++) { + tempDeltaLogs.add(Text.readString(in)); + } + deltaLogPaths = tempDeltaLogs; + } + + public boolean getBelongToIncrementalSplit() { + return belongToIncrementalSplit; + } + + public void setBelongToIncrementalSplit(boolean belongToIncrementalSplit) { + this.belongToIncrementalSplit = belongToIncrementalSplit; + } + + public List getDeltaLogPaths() { + return deltaLogPaths; + } + + public void setDeltaLogPaths(List deltaLogPaths) { + this.deltaLogPaths = deltaLogPaths; + } + + public String getMaxCommitTime() { + return maxCommitTime; + } + + public void setMaxCommitTime(String maxCommitTime) { + this.maxCommitTime = maxCommitTime; + } + + public String getBasePath() { + return basePath; + } + + public void setBasePath(String basePath) { + this.basePath = basePath; + } + + public String getBaseFilePath() { + return baseFilePath; + } + + public void setBaseFilePath(String baseFilePath) { + this.baseFilePath = baseFilePath; + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java index 0288cbd4ebc2..810e6ecb413f 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java @@ -73,6 +73,14 @@ protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline tim return HoodieInputFormatUtils.filterInstantsTimeline(timeline); } + protected FileStatus[] getStatus(JobConf job) throws IOException { + return super.listStatus(job); + } + + protected boolean includeLogFilesForSnapShotView() { + return false; + } + @Override public FileStatus[] listStatus(JobConf job) throws IOException { // Segregate inputPaths[] to incremental, snapshot and non hoodie paths @@ -108,7 +116,7 @@ public FileStatus[] listStatus(JobConf job) throws IOException { // process snapshot queries next. List snapshotPaths = inputPathHandler.getSnapshotPaths(); if (snapshotPaths.size() > 0) { - returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths)); + returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapShotView())); } return returns.toArray(new FileStatus[0]); } @@ -120,7 +128,7 @@ public FileStatus[] listStatus(JobConf job) throws IOException { * partitions and then filtering based on the commits of interest, this logic first extracts the * partitions touched by the desired commits and then lists only those partitions. 
*/ - private List listStatusForIncrementalMode( + protected List listStatusForIncrementalMode( JobConf job, HoodieTableMetaClient tableMetaClient, List inputPaths) throws IOException { String tableName = tableMetaClient.getTableConfig().getTableName(); Job jobContext = Job.getInstance(job); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java new file mode 100644 index 000000000000..5b4e535e62d1 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hadoop.fs.Path; + +import java.util.ArrayList; +import java.util.List; + +/** + * Encode additional information in Path to track matching log file and base files. + * Hence, this class tracks a log/base file status. + */ +public class PathWithLogFilePath extends Path { + // a flag to mark this split is produced by incremental query or not. + private boolean belongToIncrementalPath = false; + // the log files belong this path. + private List deltaLogPaths = new ArrayList<>(); + // max commit time of current path. + private String maxCommitTime = ""; + // the basePath of current hoodie table. + private String basePath = ""; + // the base file belong to this path; + private String baseFilePath = ""; + // the bootstrap file belong to this path. + // only if current query table is bootstrap table, this field is used. 
+ private PathWithBootstrapFileStatus pathWithBootstrapFileStatus; + + public PathWithLogFilePath(Path parent, String child) { + super(parent, child); + } + + public void setBelongToIncrementalPath(boolean belongToIncrementalPath) { + this.belongToIncrementalPath = belongToIncrementalPath; + } + + public List getDeltaLogPaths() { + return deltaLogPaths; + } + + public void setDeltaLogPaths(List deltaLogPaths) { + this.deltaLogPaths = deltaLogPaths; + } + + public String getMaxCommitTime() { + return maxCommitTime; + } + + public void setMaxCommitTime(String maxCommitTime) { + this.maxCommitTime = maxCommitTime; + } + + public String getBasePath() { + return basePath; + } + + public void setBasePath(String basePath) { + this.basePath = basePath; + } + + public void setBaseFilePath(String baseFilePath) { + this.baseFilePath = baseFilePath; + } + + public boolean splitable() { + return !baseFilePath.isEmpty(); + } + + public PathWithBootstrapFileStatus getPathWithBootstrapFileStatus() { + return pathWithBootstrapFileStatus; + } + + public void setPathWithBootstrapFileStatus(PathWithBootstrapFileStatus pathWithBootstrapFileStatus) { + this.pathWithBootstrapFileStatus = pathWithBootstrapFileStatus; + } + + public boolean includeBootstrapFilePath() { + return pathWithBootstrapFileStatus != null; + } + + public BaseFileWithLogsSplit buildSplit(Path file, long start, long length, String[] hosts) { + BaseFileWithLogsSplit bs = new BaseFileWithLogsSplit(file, start, length, hosts); + bs.setBelongToIncrementalSplit(belongToIncrementalPath); + bs.setDeltaLogPaths(deltaLogPaths); + bs.setMaxCommitTime(maxCommitTime); + bs.setBasePath(basePath); + bs.setBaseFilePath(baseFilePath); + return bs; + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java new file mode 100644 index 000000000000..542720b4919b --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RealtimeFileStatus.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * With the base input format implementations in Hadoop/Hive, + * we need to encode additional information in Path to track base files and logs files for realtime read. + * Hence, this class tracks a log/base file status + * in Path. + */ +public class RealtimeFileStatus extends FileStatus { + // a flag to mark this split is produced by incremental query or not. + private boolean belongToIncrementalFileStatus = false; + // the log files belong this fileStatus. 
+ private List deltaLogPaths = new ArrayList<>(); + // max commit time of current fileStatus. + private String maxCommitTime = ""; + // the basePath of current hoodie table. + private String basePath = ""; + // the base file belong to this status; + private String baseFilePath = ""; + // the bootstrap file belong to this status. + // only if current query table is bootstrap table, this field is used. + private FileStatus bootStrapFileStatus; + + public RealtimeFileStatus(FileStatus fileStatus) throws IOException { + super(fileStatus); + } + + @Override + public Path getPath() { + Path path = super.getPath(); + PathWithLogFilePath pathWithLogFilePath = new PathWithLogFilePath(path.getParent(), path.getName()); + pathWithLogFilePath.setBelongToIncrementalPath(belongToIncrementalFileStatus); + pathWithLogFilePath.setDeltaLogPaths(deltaLogPaths); + pathWithLogFilePath.setMaxCommitTime(maxCommitTime); + pathWithLogFilePath.setBasePath(basePath); + pathWithLogFilePath.setBaseFilePath(baseFilePath); + if (bootStrapFileStatus != null) { + pathWithLogFilePath.setPathWithBootstrapFileStatus((PathWithBootstrapFileStatus)bootStrapFileStatus.getPath()); + } + return pathWithLogFilePath; + } + + public void setBelongToIncrementalFileStatus(boolean belongToIncrementalFileStatus) { + this.belongToIncrementalFileStatus = belongToIncrementalFileStatus; + } + + public List getDeltaLogPaths() { + return deltaLogPaths; + } + + public void setDeltaLogPaths(List deltaLogPaths) { + this.deltaLogPaths = deltaLogPaths; + } + + public String getMaxCommitTime() { + return maxCommitTime; + } + + public void setMaxCommitTime(String maxCommitTime) { + this.maxCommitTime = maxCommitTime; + } + + public String getBasePath() { + return basePath; + } + + public void setBasePath(String basePath) { + this.basePath = basePath; + } + + public void setBaseFilePath(String baseFilePath) { + this.baseFilePath = baseFilePath; + } + + public void setBootStrapFileStatus(FileStatus bootStrapFileStatus) { + this.bootStrapFileStatus = bootStrapFileStatus; + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieEmptyRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieEmptyRecordReader.java new file mode 100644 index 000000000000..d995e44cd16c --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieEmptyRecordReader.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hadoop.realtime; + +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; + +import java.io.IOException; + +/** + * Dummy record for log only realtime split. + */ +public class HoodieEmptyRecordReader extends AbstractRealtimeRecordReader + implements RecordReader { + + public HoodieEmptyRecordReader(RealtimeSplit split, JobConf job) { + super(split, job); + } + + @Override + public boolean next(NullWritable nullWritable, ArrayWritable arrayWritable) throws IOException { + return false; + } + + @Override + public NullWritable createKey() { + return null; + } + + @Override + public ArrayWritable createValue() { + return new ArrayWritable(Writable.class, new Writable[getHiveSchema().getFields().size()]); + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public void close() throws IOException { + + } + + @Override + public float getProgress() throws IOException { + return 0; + } +} diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java index 028641c62b96..553bfadfbec1 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java @@ -18,14 +18,32 @@ package org.apache.hudi.hadoop.realtime; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileGroup; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hadoop.BootstrapBaseFileSplit; +import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile; +import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile; +import org.apache.hudi.hadoop.RealtimeFileStatus; +import org.apache.hudi.hadoop.PathWithLogFilePath; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat; import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -41,8 +59,13 @@ import org.apache.log4j.Logger; import java.io.IOException; + +import java.util.ArrayList; import java.util.Arrays; -import java.util.stream.Stream; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.stream.Collectors; /** * Input Format, that provides a real-time view of data in a Hoodie table. 
@@ -61,9 +84,180 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is);
+    List<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, numSplits)).map(is -> (FileSplit) is).collect(Collectors.toList());
+
+    boolean isIncrementalSplits = HoodieRealtimeInputFormatUtils.isIncrementalQuerySplits(fileSplits);
+
+    return isIncrementalSplits ? HoodieRealtimeInputFormatUtils.getIncrementalRealtimeSplits(job, fileSplits.stream()) : HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits.stream());
+  }
+
+  /**
+   * Keep the logic of mor_incr_view the same as the Spark datasource.
+   * Step1: Get the list of commits to be fetched based on the start commit and max commits (for snapshot queries, max commits is -1).
+   * Step2: Get the list of affected file statuses for those commits.
+   * Step3: Construct a HoodieTableFileSystemView based on those affected file statuses.
+   * a. Filter affected partitions based on inputPaths.
+   * b. Get the list of fileGroups for the affected partitions via fsView.getAllFileGroups.
+   * Step4: Set the input paths based on the filtered affected partition paths; this narrows the original input paths passed to
+   * this method, since some partitions have no commits in the trimmed-down list of commits and hence must be dropped.
+   * Step5: Find candidate fileStatuses, since the baseFileStatus obtained from HoodieTableFileSystemView
+   * will be missing file size information.
+   * We use the candidate fileStatus to fill in the size information for the baseFileStatus.
+   * Step6: For every file group from step3(b),
+   * get the first available base file from all file slices, then use the candidate file status to update the baseFileStatus,
+   * and construct a RealtimeFileStatus and add it to the result along with its log files.
+   * If the file group only has log files, construct a RealtimeFileStatus from them and add it to the result.
+   * TODO: unify the incremental view code between hive/spark-sql and spark datasource
+   */
+  @Override
+  protected List<FileStatus> listStatusForIncrementalMode(
+      JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+
+    // step1
+    Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return result;
+    }
+    HoodieTimeline commitsTimelineToReturn = HoodieInputFormatUtils.getHoodieTimelineForIncrementalQuery(jobContext, tableName, timeline.get());
+    Option<List<HoodieInstant>> commitsToCheck = Option.of(commitsTimelineToReturn.getInstants().collect(Collectors.toList()));
+    if (!commitsToCheck.isPresent()) {
+      return result;
+    }
+    // step2
+    commitsToCheck.get().sort(HoodieInstant::compareTo);
+    List<HoodieCommitMetadata> metadataList = commitsToCheck
+        .get().stream().map(instant -> {
+          try {
+            return HoodieInputFormatUtils.getCommitMetadata(instant, commitsTimelineToReturn);
+          } catch (IOException e) {
+            throw new HoodieException(String.format("cannot get metadata for instant: %s", instant));
+          }
+        }).collect(Collectors.toList());
+
+    // list the affected file statuses for these commits
+    List<FileStatus> affectedFileStatus = Arrays.asList(HoodieInputFormatUtils
+        .listAffectedFilesForCommits(new Path(tableMetaClient.getBasePath()), metadataList));
+    // step3
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
+    // build fileGroups from the fsView
+    Path basePath = new Path(tableMetaClient.getBasePath());
+    // filter affected partitions by inputPaths
+    List<String> affectedPartition = HoodieInputFormatUtils.getWritePartitionPaths(metadataList).stream()
+        .filter(k -> k.isEmpty() ? inputPaths.contains(basePath) : inputPaths.contains(new Path(basePath, k))).collect(Collectors.toList());
+    if (affectedPartition.isEmpty()) {
+      return result;
+    }
+    List<HoodieFileGroup> fileGroups = affectedPartition.stream()
+        .flatMap(partitionPath -> fsView.getAllFileGroups(partitionPath)).collect(Collectors.toList());
+    // step4
+    setInputPaths(job, affectedPartition.stream()
+        .map(p -> p.isEmpty() ? basePath.toString() : new Path(basePath, p).toString()).collect(Collectors.joining(",")));
+
+    // step5
+    // find all file statuses in the affected partition paths.
+    FileStatus[] fileStatuses = getStatus(job);
+    Map<String, FileStatus> candidateFileStatus = new HashMap<>();
+    for (int i = 0; i < fileStatuses.length; i++) {
+      String key = fileStatuses[i].getPath().toString();
+      candidateFileStatus.put(key, fileStatuses[i]);
+    }
+
+    String maxCommitTime = fsView.getLastInstant().get().getTimestamp();
+    // step6
+    result.addAll(collectAllIncrementalFiles(fileGroups, maxCommitTime, basePath.toString(), candidateFileStatus));
+    return result;
+  }
+
+  private List<FileStatus> collectAllIncrementalFiles(List<HoodieFileGroup> fileGroups, String maxCommitTime, String basePath, Map<String, FileStatus> candidateFileStatus) {
+    List<FileStatus> result = new ArrayList<>();
+    fileGroups.stream().forEach(f -> {
+      try {
+        List<FileSlice> baseFiles = f.getAllFileSlices().filter(slice -> slice.getBaseFile().isPresent()).collect(Collectors.toList());
+        if (!baseFiles.isEmpty()) {
+          FileStatus baseFileStatus = HoodieInputFormatUtils.getFileStatus(baseFiles.get(0).getBaseFile().get());
+          String baseFilePath = baseFileStatus.getPath().toUri().toString();
+          if (!candidateFileStatus.containsKey(baseFilePath)) {
+            throw new HoodieException("Error obtaining fileStatus for file: " + baseFilePath);
+          }
+          // We cannot use baseFileStatus directly here, since the file status obtained from the fsView is missing file size information.
+          // So we use candidateFileStatus.get(baseFilePath) to get a file status with the correct size.
+          RealtimeFileStatus fileStatus = new RealtimeFileStatus(candidateFileStatus.get(baseFilePath));
+          fileStatus.setMaxCommitTime(maxCommitTime);
+          fileStatus.setBelongToIncrementalFileStatus(true);
+          fileStatus.setBasePath(basePath);
+          fileStatus.setBaseFilePath(baseFilePath);
+          fileStatus.setDeltaLogPaths(f.getLatestFileSlice().get().getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList()));
+          // try to set the bootstrap file status
+          if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+            fileStatus.setBootStrapFileStatus(baseFileStatus);
+          }
+          result.add(fileStatus);
+        }
+        // add file groups that have only log files.
+        if (f.getLatestFileSlice().isPresent() && baseFiles.isEmpty()) {
+          List<FileStatus> logFileStatus = f.getLatestFileSlice().get().getLogFiles().map(logFile -> logFile.getFileStatus()).collect(Collectors.toList());
+          if (logFileStatus.size() > 0) {
+            RealtimeFileStatus fileStatus = new RealtimeFileStatus(logFileStatus.get(0));
+            fileStatus.setBelongToIncrementalFileStatus(true);
+            fileStatus.setDeltaLogPaths(logFileStatus.stream().map(l -> l.getPath().toString()).collect(Collectors.toList()));
+            fileStatus.setMaxCommitTime(maxCommitTime);
+            fileStatus.setBasePath(basePath);
+            result.add(fileStatus);
+          }
+        }
+      } catch (IOException e) {
+        throw new HoodieException("Error obtaining data file/log file grouping ", e);
+      }
+    });
+    return result;
+  }
+
+  @Override
+  protected boolean includeLogFilesForSnapShotView() {
+    return true;
+  }
-    return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
+  @Override
+  protected boolean isSplitable(FileSystem fs, Path filename) {
+    if (filename instanceof PathWithLogFilePath) {
+      return ((PathWithLogFilePath)filename).splitable();
+    }
+    return super.isSplitable(fs, filename);
+  }
+
+  // make a split for the given path.
+  // When querying the incremental view, the files to read may be bootstrap files; we wrap those bootstrap files into
+  // PathWithLogFilePath, so they also need to be handled in this function.
+ @Override + protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) { + if (file instanceof PathWithLogFilePath) { + return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, null); + } + return super.makeSplit(file, start, length, hosts); + } + + @Override + protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) { + if (file instanceof PathWithLogFilePath) { + return doMakeSplitForPathWithLogFilePath((PathWithLogFilePath) file, start, length, hosts, inMemoryHosts); + } + return super.makeSplit(file, start, length, hosts, inMemoryHosts); + } + + private FileSplit doMakeSplitForPathWithLogFilePath(PathWithLogFilePath path, long start, long length, String[] hosts, String[] inMemoryHosts) { + if (!path.includeBootstrapFilePath()) { + return path.buildSplit(path, start, length, hosts); + } else { + FileSplit bf = + inMemoryHosts == null + ? super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts) + : super.makeSplit(path.getPathWithBootstrapFileStatus(), start, length, hosts, inMemoryHosts); + return HoodieRealtimeInputFormatUtils + .createRealtimeBoostrapBaseFileSplit((BootstrapBaseFileSplit) bf, path.getBasePath(), path.getDeltaLogPaths(), path.getMaxCommitTime()); + } } @Override @@ -119,6 +313,11 @@ public RecordReader getRecordReader(final InputSpli addProjectionToJobConf(realtimeSplit, jobConf); LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); + + // for log only split, set the parquet reader as empty. + if (FSUtils.isLogFile(realtimeSplit.getPath())) { + return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf)); + } return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, super.getRecordReader(split, jobConf, reporter)); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index c5b97f99f83a..25dde840c13b 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -21,10 +21,12 @@ import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -35,10 +37,11 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile; import org.apache.hudi.hadoop.HoodieHFileInputFormat; import org.apache.hudi.hadoop.HoodieParquetInputFormat; +import org.apache.hudi.hadoop.RealtimeFileStatus; import 
org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile; +import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile; import org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat; import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat; @@ -164,6 +167,10 @@ public static FileInputFormat getInputFormat(String path, boolean realtime, Conf if (extension.equals(HoodieFileFormat.HFILE.getFileExtension())) { return getInputFormat(HoodieFileFormat.HFILE, realtime, conf); } + // now we support read log file, try to find log file + if (FSUtils.isLogFile(new Path(path)) && realtime) { + return getInputFormat(HoodieFileFormat.PARQUET, realtime, conf); + } throw new HoodieIOException("Hoodie InputFormat not implemented for base file of type " + extension); } @@ -280,12 +287,24 @@ public static Option getFilteredCommitsTimeline(Job job, HoodieT * @return */ public static Option> getCommitsForIncrementalQuery(Job job, String tableName, HoodieTimeline timeline) { + return Option.of(getHoodieTimelineForIncrementalQuery(job, tableName, timeline) + .getInstants().collect(Collectors.toList())); + } + + /** + * Get HoodieTimeline for incremental query from Hive map reduce configuration. + * + * @param job + * @param tableName + * @param timeline + * @return + */ + public static HoodieTimeline getHoodieTimelineForIncrementalQuery(Job job, String tableName, HoodieTimeline timeline) { String lastIncrementalTs = HoodieHiveUtils.readStartCommitTime(job, tableName); // Total number of commits to return in this batch. Set this to -1 to get all the commits. Integer maxCommits = HoodieHiveUtils.readMaxCommits(job, tableName); LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs); - return Option.of(timeline.findInstantsAfter(lastIncrementalTs, maxCommits) - .getInstants().collect(Collectors.toList())); + return timeline.findInstantsAfter(lastIncrementalTs, maxCommits); } /** @@ -422,6 +441,11 @@ public static HoodieMetadataConfig buildMetadataConfig(Configuration conf) { public static List filterFileStatusForSnapshotMode(JobConf job, Map tableMetaClientMap, List snapshotPaths) throws IOException { + return filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, false); + } + + public static List filterFileStatusForSnapshotMode(JobConf job, Map tableMetaClientMap, + List snapshotPaths, boolean includeLogFiles) throws IOException { HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job); List returns = new ArrayList<>(); @@ -442,10 +466,21 @@ public static List filterFileStatusForSnapshotMode(JobConf job, Map< HoodieTableFileSystemView fsView = fsViewCache.computeIfAbsent(metaClient, tableMetaClient -> FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, tableMetaClient, buildMetadataConfig(job), timeline)); List filteredBaseFiles = new ArrayList<>(); + Map> filteredLogs = new HashMap<>(); for (Path p : entry.getValue()) { String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), p); List matched = fsView.getLatestBaseFiles(relativePartitionPath).collect(Collectors.toList()); filteredBaseFiles.addAll(matched); + if (includeLogFiles) { + List logMatched = fsView.getLatestFileSlices(relativePartitionPath) + .filter(f -> !f.getBaseFile().isPresent() && f.getLatestLogFile().isPresent()) + .collect(Collectors.toList()); + logMatched.forEach(f -> { + List logPaths = f.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) + .map(log -> 
log.getPath().toString()).collect(Collectors.toList()); + filteredLogs.put(f.getLatestLogFile().get().getFileStatus(), logPaths); + }); + } } LOG.info("Total paths to process after hoodie filter " + filteredBaseFiles.size()); @@ -456,6 +491,12 @@ public static List filterFileStatusForSnapshotMode(JobConf job, Map< filteredFile = refreshFileStatus(job, filteredFile); returns.add(getFileStatus(filteredFile)); } + + for (Map.Entry> filterLogEntry : filteredLogs.entrySet()) { + RealtimeFileStatus rs = new RealtimeFileStatus(filterLogEntry.getKey()); + rs.setDeltaLogPaths(filterLogEntry.getValue()); + returns.add(rs); + } } } finally { fsViewCache.forEach(((metaClient, fsView) -> fsView.close())); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java index f84e3440516d..9140969c6021 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java @@ -36,6 +36,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.BaseFileWithLogsSplit; import org.apache.hudi.hadoop.BootstrapBaseFileSplit; import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit; import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo; @@ -85,18 +86,7 @@ public static InputSplit[] getRealtimeSplits(Configuration conf, Stream hoodieVirtualKeyInfo = Option.empty(); if (partitionsToParquetSplits.size() > 0) { HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionsToParquetSplits.keySet().iterator().next()); - HoodieTableConfig tableConfig = metaClient.getTableConfig(); - if (!tableConfig.populateMetaFields()) { - TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); - try { - MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema(); - hoodieVirtualKeyInfo = Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(), - tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()), - parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp()))); - } catch (Exception exception) { - throw new HoodieException("Fetching table schema failed with exception ", exception); - } - } + hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(metaClient); } Option finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo; partitionsToParquetSplits.keySet().forEach(partitionPath -> { @@ -121,27 +111,24 @@ public static InputSplit[] getRealtimeSplits(Configuration conf, Stream> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream() - .collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName()))); + .collect(Collectors.groupingBy(split -> FSUtils.getFileIdFromFilePath(split.getPath()))); // Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table String maxCommitTime = metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)) .filterCompletedInstants().lastInstant().get().getTimestamp(); latestFileSlices.forEach(fileSlice -> { - List dataFileSplits = 
groupedInputSplits.get(fileSlice.getFileId()); + List dataFileSplits = groupedInputSplits.getOrDefault(fileSlice.getFileId(), new ArrayList<>()); dataFileSplits.forEach(split -> { try { List logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()) .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()); if (split instanceof BootstrapBaseFileSplit) { BootstrapBaseFileSplit eSplit = (BootstrapBaseFileSplit) split; - String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo()) - .filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0]; - String[] inMemoryHosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo()) - .filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0]; - FileSplit baseSplit = new FileSplit(eSplit.getPath(), eSplit.getStart(), eSplit.getLength(), - hosts, inMemoryHosts); - rtSplits.add(new RealtimeBootstrapBaseFileSplit(baseSplit, metaClient.getBasePath(), - logFilePaths, maxCommitTime, eSplit.getBootstrapFileSplit())); + rtSplits.add(createRealtimeBoostrapBaseFileSplit(eSplit, metaClient.getBasePath(), logFilePaths, maxCommitTime)); + } else if (split instanceof BaseFileWithLogsSplit) { + BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)split; + HoodieRealtimeFileSplit hoodieRealtimeFileSplit = new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo); + rtSplits.add(hoodieRealtimeFileSplit); } else { rtSplits.add(new HoodieRealtimeFileSplit(split, metaClient.getBasePath(), logFilePaths, maxCommitTime, finalHoodieVirtualKeyInfo)); } @@ -161,6 +148,82 @@ public static InputSplit[] getRealtimeSplits(Configuration conf, Stream fileSplits) throws IOException { + List rtSplits = new ArrayList<>(); + List fileSplitList = fileSplits.collect(Collectors.toList()); + Set partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet()); + Map partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet); + // Pre process tableConfig from first partition to fetch virtual key info + Option hoodieVirtualKeyInfo = Option.empty(); + if (partitionSet.size() > 0) { + hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next())); + } + Option finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo; + fileSplitList.stream().forEach(s -> { + // deal with incremental query. 
+ try { + if (s instanceof BaseFileWithLogsSplit) { + BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s; + if (bs.getBelongToIncrementalSplit()) { + rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo)); + } + } else if (s instanceof RealtimeBootstrapBaseFileSplit) { + rtSplits.add(s); + } + } catch (IOException e) { + throw new HoodieIOException("Error creating hoodie real time split ", e); + } + }); + LOG.info("Returning a total splits of " + rtSplits.size()); + return rtSplits.toArray(new InputSplit[0]); + } + + public static Option getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) { + HoodieTableConfig tableConfig = metaClient.getTableConfig(); + if (!tableConfig.populateMetaFields()) { + TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); + try { + MessageType parquetSchema = tableSchemaResolver.getTableParquetSchema(); + return Option.of(new HoodieVirtualKeyInfo(tableConfig.getRecordKeyFieldProp(), + tableConfig.getPartitionFieldProp(), parquetSchema.getFieldIndex(tableConfig.getRecordKeyFieldProp()), + parquetSchema.getFieldIndex(tableConfig.getPartitionFieldProp()))); + } catch (Exception exception) { + throw new HoodieException("Fetching table schema failed with exception ", exception); + } + } + return Option.empty(); + } + + public static boolean isIncrementalQuerySplits(List fileSplits) { + if (fileSplits == null || fileSplits.size() == 0) { + return false; + } + return fileSplits.stream().anyMatch(s -> { + if (s instanceof BaseFileWithLogsSplit) { + BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s; + return bs.getBelongToIncrementalSplit(); + } else { + return s instanceof RealtimeBootstrapBaseFileSplit; + } + }); + } + + public static RealtimeBootstrapBaseFileSplit createRealtimeBoostrapBaseFileSplit( + BootstrapBaseFileSplit split, String basePath, List deltaLogPaths, String maxInstantTime) { + try { + String[] hosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo()) + .filter(x -> !x.isInMemory()).toArray(String[]::new) : new String[0]; + String[] inMemoryHosts = split.getLocationInfo() != null ? Arrays.stream(split.getLocationInfo()) + .filter(SplitLocationInfo::isInMemory).toArray(String[]::new) : new String[0]; + FileSplit baseSplit = new FileSplit(split.getPath(), split.getStart(), split.getLength(), + hosts, inMemoryHosts); + return new RealtimeBootstrapBaseFileSplit(baseSplit, basePath, deltaLogPaths, maxInstantTime, split.getBootstrapFileSplit()); + } catch (IOException e) { + throw new HoodieIOException("Error creating hoodie real time split ", e); + } + } + // Return parquet file with a list of log files in the same file group. 
public static List, List>> groupLogsByBaseFile(Configuration conf, List partitionPaths) { Set partitionSet = new HashSet<>(partitionPaths); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index a647da9b9b99..497ffb315e38 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -18,16 +18,24 @@ package org.apache.hudi.hadoop.realtime; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.Reporter; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; @@ -35,8 +43,11 @@ import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; +import org.apache.hudi.hadoop.BaseFileWithLogsSplit; +import org.apache.hudi.hadoop.PathWithLogFilePath; +import org.apache.hudi.hadoop.RealtimeFileStatus; import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; +import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; @@ -66,13 +77,17 @@ import org.junit.jupiter.params.provider.MethodSource; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.Map; +import java.util.HashMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -535,4 +550,256 @@ private static Stream testArguments() { arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true) ); } + + @Test + public void testIncrementalWithOnlylog() throws Exception { + // initial commit + Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); + HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); + String instantTime = "100"; + final int numRecords = 1000; + File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, numRecords, instantTime, + HoodieTableType.MERGE_ON_READ); + //FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime); + 
createDeltaCommitFile(basePath, instantTime,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0"); + // Add the paths + FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); + + // insert new records to log file + try { + String newCommitTime = "102"; + HoodieLogFormat.Writer writer = + InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", instantTime, newCommitTime, + numRecords, numRecords, 0); + writer.close(); + createDeltaCommitFile(basePath, newCommitTime,"2016/05/01", "2016/05/01/.fileid0_100.log.1_1-0-1", "fileid0"); + + InputFormatTestUtil.setupIncremental(baseJobConf, "101", 1); + + HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); + inputFormat.setConf(baseJobConf); + InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1); + assertTrue(splits.length == 1); + JobConf newJobConf = new JobConf(baseJobConf); + List fields = schema.getFields(); + setHiveColumnNameProps(fields, newJobConf, false); + RecordReader reader = inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL); + // use reader to read log file. + NullWritable key = reader.createKey(); + ArrayWritable value = reader.createValue(); + while (reader.next(key, value)) { + Writable[] values = value.get(); + // since we set incremental start commit as 101 and commit_number as 1. + // the data belong to commit 102 should be read out. + assertEquals(newCommitTime, values[0].toString()); + key = reader.createKey(); + value = reader.createValue(); + } + reader.close(); + } catch (IOException e) { + throw new HoodieException(e.getMessage(), e); + } + } + + @Test + public void testIncrementalWithReplace() throws Exception { + // initial commit + Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); + HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); + String baseInstant = "100"; + File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 100, baseInstant, + HoodieTableType.MERGE_ON_READ); + //FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime); + createDeltaCommitFile(basePath, baseInstant,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0"); + // Add the paths + FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); + + InputFormatTestUtil.simulateInserts(partitionDir, ".parquet", "fileid1", 1, "200"); + Map> partitionToReplaceFileIds = new HashMap<>(); + List replacedFileId = new ArrayList<>(); + replacedFileId.add("fileid0"); + partitionToReplaceFileIds.put("2016/05/01", replacedFileId); + createReplaceCommitFile(basePath, + "200","2016/05/01", "2016/05/01/fileid10_1-0-1_200.parquet", "fileid10", partitionToReplaceFileIds); + + InputFormatTestUtil.setupIncremental(baseJobConf, "0", 1); + + HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); + inputFormat.setConf(baseJobConf); + InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1); + assertTrue(splits.length == 1); + JobConf newJobConf = new JobConf(baseJobConf); + List fields = schema.getFields(); + setHiveColumnNameProps(fields, newJobConf, false); + newJobConf.set("columns.types", "string,string,string,string,string,string,string,string,bigint,string,string"); + RecordReader reader = inputFormat.getRecordReader(splits[0], newJobConf, Reporter.NULL); + + // use reader to read log file. 
+ NullWritable key = reader.createKey(); + ArrayWritable value = reader.createValue(); + while (reader.next(key, value)) { + Writable[] values = value.get(); + // since we set incremental start commit as 0 and commit_number as 1. + // the data belong to commit 100 should be read out. + assertEquals("100", values[0].toString()); + key = reader.createKey(); + value = reader.createValue(); + } + reader.close(); + } + + private void createReplaceCommitFile( + java.nio.file.Path basePath, + String commitNumber, + String partitionPath, + String filePath, + String fileId, + Map> partitionToReplaceFileIds) throws IOException { + List writeStats = new ArrayList<>(); + HoodieWriteStat writeStat = createHoodieWriteStat(basePath, commitNumber, partitionPath, filePath, fileId); + writeStats.add(writeStat); + HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); + replaceMetadata.setPartitionToReplaceFileIds(partitionToReplaceFileIds); + writeStats.forEach(stat -> replaceMetadata.addWriteStat(partitionPath, stat)); + File file = basePath.resolve(".hoodie").resolve(commitNumber + ".replacecommit").toFile(); + file.createNewFile(); + FileOutputStream fileOutputStream = new FileOutputStream(file); + fileOutputStream.write(replaceMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)); + fileOutputStream.flush(); + fileOutputStream.close(); + } + + private HoodieWriteStat createHoodieWriteStat(java.nio.file.Path basePath, String commitNumber, String partitionPath, String filePath, String fileId) { + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setFileId(fileId); + writeStat.setNumDeletes(0); + writeStat.setNumUpdateWrites(100); + writeStat.setNumWrites(100); + writeStat.setPath(filePath); + writeStat.setPartitionPath(partitionPath); + writeStat.setTotalLogFilesCompacted(100L); + HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats(); + runtimeStats.setTotalScanTime(100); + runtimeStats.setTotalCreateTime(100); + runtimeStats.setTotalUpsertTime(100); + writeStat.setRuntimeStats(runtimeStats); + return writeStat; + } + + private void createDeltaCommitFile( + java.nio.file.Path basePath, + String commitNumber, + String partitionPath, + String filePath, + String fileId) throws IOException { + List writeStats = new ArrayList<>(); + HoodieWriteStat writeStat = createHoodieWriteStat(basePath, commitNumber, partitionPath, filePath, fileId); + writeStats.add(writeStat); + + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + writeStats.forEach(stat -> commitMetadata.addWriteStat(partitionPath, stat)); + File file = basePath.resolve(".hoodie").resolve(commitNumber + ".deltacommit").toFile(); + file.createNewFile(); + FileOutputStream fileOutputStream = new FileOutputStream(file); + fileOutputStream.write(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)); + fileOutputStream.flush(); + fileOutputStream.close(); + } + + @Test + public void testLogOnlyReader() throws Exception { + // initial commit + Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); + HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); + String baseInstant = "100"; + File partitionDir = InputFormatTestUtil.prepareNonPartitionedParquetTable(basePath, schema, 1, 100, baseInstant, + HoodieTableType.MERGE_ON_READ); + FileCreateUtils.createDeltaCommit(basePath.toString(), baseInstant); + // Add the paths + FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); + + FileSlice 
fileSlice = new FileSlice("default", baseInstant, "fileid1"); + try { + // update files or generate new log file + int logVersion = 1; + int baseInstantTs = Integer.parseInt(baseInstant); + String instantTime = String.valueOf(baseInstantTs + logVersion); + HoodieLogFormat.Writer writer = InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid1", baseInstant, + instantTime, 100, 0, logVersion); + long size = writer.getCurrentSize(); + writer.close(); + assertTrue(size > 0, "block - size should be > 0"); + FileCreateUtils.createDeltaCommit(basePath.toString(), instantTime); + // create a split with new log file(s) + fileSlice.addLogFile(writer.getLogFile()); + RealtimeFileStatus realtimeFileStatus = new RealtimeFileStatus(new FileStatus(0, false, 1, 1, 0, writer.getLogFile().getPath())); + realtimeFileStatus.setMaxCommitTime(instantTime); + realtimeFileStatus.setBasePath(basePath.toString()); + realtimeFileStatus.setDeltaLogPaths(fileSlice.getLogFiles().map(l -> l.getPath().toString()).collect(Collectors.toList())); + PathWithLogFilePath pathWithLogFileStatus = (PathWithLogFilePath) realtimeFileStatus.getPath(); + BaseFileWithLogsSplit bs = pathWithLogFileStatus.buildSplit(pathWithLogFileStatus, 0, 0, new String[] {""}); + HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogPaths(), bs.getMaxCommitTime(), Option.empty()); + + JobConf newJobConf = new JobConf(baseJobConf); + List fields = schema.getFields(); + setHiveColumnNameProps(fields, newJobConf, false); + // create a dummy RecordReader to be used by HoodieRealtimeRecordReader + RecordReader reader = new HoodieRealtimeRecordReader(split, newJobConf, new HoodieEmptyRecordReader(split, newJobConf)); + // use reader to read log file. 
+ NullWritable key = reader.createKey(); + ArrayWritable value = reader.createValue(); + while (reader.next(key, value)) { + Writable[] values = value.get(); + assertEquals(instantTime, values[0].toString()); + key = reader.createKey(); + value = reader.createValue(); + } + reader.close(); + } catch (Exception e) { + throw new HoodieException(e.getMessage(), e); + } + } + + @Test + public void testIncrementalWithCompaction() throws Exception { + // initial commit + Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); + HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); + String baseInstant = "100"; + File partitionDir = InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, 100, baseInstant, + HoodieTableType.MERGE_ON_READ); + createDeltaCommitFile(basePath, baseInstant,"2016/05/01", "2016/05/01/fileid0_1-0-1_100.parquet", "fileid0"); + // Add the paths + FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath()); + + createCompactionFile(basePath, "125"); + + // add inserts after compaction timestamp + InputFormatTestUtil.simulateInserts(partitionDir, ".parquet", "fileId2", 5, "200"); + InputFormatTestUtil.commit(basePath, "200"); + + InputFormatTestUtil.setupIncremental(baseJobConf, "100", 10); + + // verify that incremental reads do NOT show inserts after compaction timestamp + HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat(); + inputFormat.setConf(baseJobConf); + InputSplit[] splits = inputFormat.getSplits(baseJobConf, 1); + assertTrue(splits.length == 0); + } + + private File createCompactionFile(java.nio.file.Path basePath, String commitTime) + throws IOException { + File file = basePath.resolve(".hoodie") + .resolve(HoodieTimeline.makeRequestedCompactionFileName(commitTime)).toFile(); + assertTrue(file.createNewFile()); + FileOutputStream os = new FileOutputStream(file); + try { + HoodieCompactionPlan compactionPlan = HoodieCompactionPlan.newBuilder().setVersion(2).build(); + // Write empty commit metadata + os.write(TimelineMetadataUtils.serializeCompactionPlan(compactionPlan).get()); + return file; + } finally { + os.close(); + } + } }
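
For reference, the sketch below is a minimal, hypothetical driver (not part of this patch) showing how the reworked MOR incremental view is exercised end to end; it mirrors the flow of the new tests (setupIncremental -> getSplits -> getRecordReader). The table/partition path is a placeholder, InputFormatTestUtil is the test-scope helper used in the tests above, and the Hive column/projection properties (set via setHiveColumnNameProps in the tests) are assumed to already be configured on the JobConf; a real Hive session would set the corresponding hoodie.<table>.consume.* properties instead of calling the test helper.

// Hypothetical end-to-end driver; the input path and projection setup are assumptions, not part of the patch.
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;

public class MorIncrementalReadSketch {
  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf();
    // Point the job at one or more partition directories of the MOR table (placeholder path).
    FileInputFormat.setInputPaths(jobConf, "/tmp/hoodie_mor_table/2016/05/01");
    // Hive column name/type projection properties are assumed to be set on jobConf here,
    // the same way the tests do via setHiveColumnNameProps(...).

    // Switch to INCREMENTAL consume mode: start after commit "100", pull at most 1 commit.
    InputFormatTestUtil.setupIncremental(jobConf, "100", 1);

    HoodieParquetRealtimeInputFormat inputFormat = new HoodieParquetRealtimeInputFormat();
    inputFormat.setConf(jobConf);

    // With this patch, base-file splits, base+log splits and log-only splits in the incremental
    // window all come back as realtime splits (log-only splits use HoodieEmptyRecordReader internally).
    for (InputSplit split : inputFormat.getSplits(jobConf, 1)) {
      RecordReader<NullWritable, ArrayWritable> reader =
          inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
      NullWritable key = reader.createKey();
      ArrayWritable value = reader.createValue();
      while (reader.next(key, value)) {
        Writable[] fields = value.get();
        // fields[0] is _hoodie_commit_time when the projection is configured as in the tests.
        System.out.println("_hoodie_commit_time=" + fields[0]);
      }
      reader.close();
    }
  }
}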