[HUDI-2086] Refactor hive mor_incremental_view #3203
New file: `BaseFileWithLogsSplit.java` (@@ -0,0 +1,118 @@)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.hadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Encodes additional information in the split to track the matching log files and base file.
 * Hence, this class tracks a log/base file split.
 */
public class BaseFileWithLogsSplit extends FileSplit {
  // Flag marking whether this split was produced by an incremental query.
  private boolean belongToIncrementalSplit = false;
  // The log file paths of this split.
  private List<String> deltaLogPaths = new ArrayList<>();
  // Max commit time of the current split.
  private String maxCommitTime = "";
  // The basePath of the current hoodie table.
  private String basePath = "";
  // The base file belonging to this split.
  private String baseFilePath = "";

  public BaseFileWithLogsSplit(Path file, long start, long length, String[] hosts) {
    super(file, start, length, hosts);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    super.write(out);
    out.writeBoolean(belongToIncrementalSplit);
    Text.writeString(out, maxCommitTime);
    Text.writeString(out, basePath);
    Text.writeString(out, baseFilePath);
    out.writeInt(deltaLogPaths.size());
    for (String logPath : deltaLogPaths) {
      Text.writeString(out, logPath);
    }
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    super.readFields(in);
    belongToIncrementalSplit = in.readBoolean();
    maxCommitTime = Text.readString(in);
    basePath = Text.readString(in);
    baseFilePath = Text.readString(in);
    int deltaLogSize = in.readInt();
    List<String> tempDeltaLogs = new ArrayList<>();
    for (int i = 0; i < deltaLogSize; i++) {
      tempDeltaLogs.add(Text.readString(in));
    }
    deltaLogPaths = tempDeltaLogs;
  }

  public boolean getBelongToIncrementalSplit() {
    return belongToIncrementalSplit;
  }

  public void setBelongToIncrementalSplit(boolean belongToIncrementalSplit) {
    this.belongToIncrementalSplit = belongToIncrementalSplit;
  }

  public List<String> getDeltaLogPaths() {
    return deltaLogPaths;
  }

  public void setDeltaLogPaths(List<String> deltaLogPaths) {
    this.deltaLogPaths = deltaLogPaths;
  }

  public String getMaxCommitTime() {
    return maxCommitTime;
  }

  public void setMaxCommitTime(String maxCommitTime) {
    this.maxCommitTime = maxCommitTime;
  }

  public String getBasePath() {
    return basePath;
  }

  public void setBasePath(String basePath) {
    this.basePath = basePath;
  }

  public String getBaseFilePath() {
    return baseFilePath;
  }

  public void setBaseFilePath(String baseFilePath) {
    this.baseFilePath = baseFilePath;
  }
}
```
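For context, a minimal round-trip sketch of how the extra split fields survive MapReduce serialization via `write`/`readFields`. The file paths, commit time, and the driver class below are made up for illustration and are not part of the PR.

```java
import org.apache.hadoop.fs.Path;
import org.apache.hudi.hadoop.BaseFileWithLogsSplit;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Arrays;

public class BaseFileWithLogsSplitRoundTrip {
  public static void main(String[] args) throws Exception {
    // Build a split for a hypothetical base file with two log files attached.
    BaseFileWithLogsSplit split =
        new BaseFileWithLogsSplit(new Path("/tmp/hudi/part/base_file.parquet"), 0, 1024, new String[0]);
    split.setBelongToIncrementalSplit(true);
    split.setMaxCommitTime("20210701000000");
    split.setBasePath("/tmp/hudi");
    split.setBaseFilePath("/tmp/hudi/part/base_file.parquet");
    split.setDeltaLogPaths(Arrays.asList("/tmp/hudi/part/.log.1", "/tmp/hudi/part/.log.2"));

    // Serialize the split the same way MapReduce would ship it to a task.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    split.write(new DataOutputStream(bytes));

    // Deserialize into a fresh instance; readFields restores the extra fields
    // that a plain FileSplit would lose.
    BaseFileWithLogsSplit copy =
        new BaseFileWithLogsSplit(new Path("/placeholder"), 0, 0, new String[0]);
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    System.out.println(copy.getBelongToIncrementalSplit()); // true
    System.out.println(copy.getDeltaLogPaths());            // the two log paths set above
  }
}
```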
Modified file (existing input format, `listStatus` path):

```diff
@@ -73,6 +73,14 @@ protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline tim
     return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
   }
 
+  protected FileStatus[] getStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  protected boolean includeLogFilesForSnapShotView() {
+    return false;
+  }
+
   @Override
   public FileStatus[] listStatus(JobConf job) throws IOException {
     // Segregate inputPaths[] to incremental, snapshot and non hoodie paths
@@ -108,7 +116,7 @@ public FileStatus[] listStatus(JobConf job) throws IOException {
     // process snapshot queries next.
     List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
     if (snapshotPaths.size() > 0) {
-      returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths));
+      returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapShotView()));
     }
     return returns.toArray(new FileStatus[0]);
   }
@@ -120,7 +128,7 @@ public FileStatus[] listStatus(JobConf job) throws IOException {
    * partitions and then filtering based on the commits of interest, this logic first extracts the
    * partitions touched by the desired commits and then lists only those partitions.
    */
-  private List<FileStatus> listStatusForIncrementalMode(
+  protected List<FileStatus> listStatusForIncrementalMode(
       JobConf job, HoodieTableMetaClient tableMetaClient, List<Path> inputPaths) throws IOException {
     String tableName = tableMetaClient.getTableConfig().getTableName();
     Job jobContext = Job.getInstance(job);
```
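Taken together, these changes turn the snapshot and incremental listing paths into extension points: `includeLogFilesForSnapShotView()` lets a subclass ask the snapshot listing to return log-file statuses, and `listStatusForIncrementalMode` is now `protected` so it can be reused. A rough sketch of a subclass using the new hook follows; it assumes the class modified above is `HoodieParquetInputFormat` (not confirmed by this excerpt), and the subclass name is invented.

```java
package org.apache.hudi.hadoop.realtime;

import org.apache.hudi.hadoop.HoodieParquetInputFormat;

// Hypothetical merge-on-read input format built on the new hooks.
// Assumption: the parent class here is the one modified in the hunks above.
public class ExampleRealtimeInputFormat extends HoodieParquetInputFormat {

  // Ask the snapshot listing to also return log files, so realtime reads
  // can merge base files with their delta logs.
  @Override
  protected boolean includeLogFilesForSnapShotView() {
    return true;
  }
}
```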
New file: `PathWithLogFilePath.java` (@@ -0,0 +1,106 @@)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.hadoop;

import org.apache.hadoop.fs.Path;

import java.util.ArrayList;
import java.util.List;

/**
 * Encodes additional information in the Path to track the matching log files and base file.
 * Hence, this class tracks a log/base file path.
 */
public class PathWithLogFilePath extends Path {
  // Flag marking whether this path was produced by an incremental query.
  private boolean belongToIncrementalPath = false;
  // The log files belonging to this path.
  private List<String> deltaLogPaths = new ArrayList<>();
  // Max commit time of the current path.
  private String maxCommitTime = "";
  // The basePath of the current hoodie table.
  private String basePath = "";
  // The base file belonging to this path.
  private String baseFilePath = "";
  // The bootstrap file belonging to this path.
  // Used only when the table being queried is a bootstrap table.
  private PathWithBootstrapFileStatus pathWithBootstrapFileStatus;

  public PathWithLogFilePath(Path parent, String child) {
    super(parent, child);
  }

  public void setBelongToIncrementalPath(boolean belongToIncrementalPath) {
    this.belongToIncrementalPath = belongToIncrementalPath;
  }

  public List<String> getDeltaLogPaths() {
    return deltaLogPaths;
  }

  public void setDeltaLogPaths(List<String> deltaLogPaths) {
    this.deltaLogPaths = deltaLogPaths;
  }

  public String getMaxCommitTime() {
    return maxCommitTime;
  }

  public void setMaxCommitTime(String maxCommitTime) {
    this.maxCommitTime = maxCommitTime;
  }

  public String getBasePath() {
    return basePath;
  }

  public void setBasePath(String basePath) {
    this.basePath = basePath;
  }

  public void setBaseFilePath(String baseFilePath) {
    this.baseFilePath = baseFilePath;
  }

  public boolean splitable() {
    return !baseFilePath.isEmpty();
  }

  public PathWithBootstrapFileStatus getPathWithBootstrapFileStatus() {
    return pathWithBootstrapFileStatus;
  }

  public void setPathWithBootstrapFileStatus(PathWithBootstrapFileStatus pathWithBootstrapFileStatus) {
    this.pathWithBootstrapFileStatus = pathWithBootstrapFileStatus;
  }

  public boolean includeBootstrapFilePath() {
    return pathWithBootstrapFileStatus != null;
  }

  public BaseFileWithLogsSplit buildSplit(Path file, long start, long length, String[] hosts) {
    BaseFileWithLogsSplit bs = new BaseFileWithLogsSplit(file, start, length, hosts);
    bs.setBelongToIncrementalSplit(belongToIncrementalPath);
    bs.setDeltaLogPaths(deltaLogPaths);
    bs.setMaxCommitTime(maxCommitTime);
    bs.setBasePath(basePath);
    bs.setBaseFilePath(baseFilePath);
    return bs;
  }
}
```
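A small sketch of how split construction might branch on the path type so that the extra metadata reaches the record reader through `buildSplit`. The `makeSplit` helper and its enclosing class are hypothetical, not code from the PR.

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hudi.hadoop.BaseFileWithLogsSplit;
import org.apache.hudi.hadoop.PathWithLogFilePath;

public class SplitBuildingSketch {
  static FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
    if (file instanceof PathWithLogFilePath) {
      // Carry the log files, commit time, and table base path into the split,
      // so a realtime record reader can merge base and log data later.
      return ((PathWithLogFilePath) file).buildSplit(file, start, length, hosts);
    }
    // Plain paths fall back to an ordinary FileSplit.
    return new FileSplit(file, start, length, hosts);
  }
}
```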
New file: `RealtimeFileStatus.java` (@@ -0,0 +1,103 @@)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.hadoop;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * With the base input format implementations in Hadoop/Hive,
 * we need to encode additional information in the Path to track base files and log files for realtime reads.
 * Hence, this class tracks a log/base file status in the Path it returns.
 */
public class RealtimeFileStatus extends FileStatus {
  // Flag marking whether this file status was produced by an incremental query.
  private boolean belongToIncrementalFileStatus = false;
  // The log files belonging to this fileStatus.
  private List<String> deltaLogPaths = new ArrayList<>();
  // Max commit time of the current fileStatus.
  private String maxCommitTime = "";
  // The basePath of the current hoodie table.
  private String basePath = "";
  // The base file belonging to this status.
  private String baseFilePath = "";
  // The bootstrap file belonging to this status.
  // Used only when the table being queried is a bootstrap table.
  private FileStatus bootStrapFileStatus;

  public RealtimeFileStatus(FileStatus fileStatus) throws IOException {
    super(fileStatus);
  }

  @Override
  public Path getPath() {
    Path path = super.getPath();
    PathWithLogFilePath pathWithLogFilePath = new PathWithLogFilePath(path.getParent(), path.getName());
    pathWithLogFilePath.setBelongToIncrementalPath(belongToIncrementalFileStatus);
    pathWithLogFilePath.setDeltaLogPaths(deltaLogPaths);
    pathWithLogFilePath.setMaxCommitTime(maxCommitTime);
    pathWithLogFilePath.setBasePath(basePath);
    pathWithLogFilePath.setBaseFilePath(baseFilePath);
    if (bootStrapFileStatus != null) {
      pathWithLogFilePath.setPathWithBootstrapFileStatus((PathWithBootstrapFileStatus) bootStrapFileStatus.getPath());
    }
    return pathWithLogFilePath;
  }

  public void setBelongToIncrementalFileStatus(boolean belongToIncrementalFileStatus) {
    this.belongToIncrementalFileStatus = belongToIncrementalFileStatus;
  }

  public List<String> getDeltaLogPaths() {
    return deltaLogPaths;
  }

  public void setDeltaLogPaths(List<String> deltaLogPaths) {
    this.deltaLogPaths = deltaLogPaths;
  }

  public String getMaxCommitTime() {
    return maxCommitTime;
  }

  public void setMaxCommitTime(String maxCommitTime) {
    this.maxCommitTime = maxCommitTime;
  }

  public String getBasePath() {
    return basePath;
  }

  public void setBasePath(String basePath) {
    this.basePath = basePath;
  }

  public void setBaseFilePath(String baseFilePath) {
    this.baseFilePath = baseFilePath;
  }

  public void setBootStrapFileStatus(FileStatus bootStrapFileStatus) {
    this.bootStrapFileStatus = bootStrapFileStatus;
  }
}
```
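A short sketch of how a listing layer could wrap a base file's FileStatus so that downstream code which only calls `getPath()` still receives the log files and commit time. The helper name and argument values are illustrative, not from the PR.

```java
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.hadoop.RealtimeFileStatus;

import java.io.IOException;
import java.util.List;

public class RealtimeFileStatusSketch {
  // Wrap a base file's status with its delta log paths; getPath() on the
  // returned status yields a PathWithLogFilePath carrying these fields.
  static FileStatus attachLogFiles(FileStatus baseFile,
                                   List<String> logPaths,
                                   String maxCommitTime,
                                   String tableBasePath) throws IOException {
    RealtimeFileStatus rtStatus = new RealtimeFileStatus(baseFile);
    rtStatus.setBelongToIncrementalFileStatus(false);
    rtStatus.setDeltaLogPaths(logPaths);
    rtStatus.setMaxCommitTime(maxCommitTime);
    rtStatus.setBasePath(tableBasePath);
    rtStatus.setBaseFilePath(baseFile.getPath().toString());
    return rtStatus;
  }
}
```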