diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java index 17075f9d3dfb6..97d3d91fb63ad 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java @@ -27,7 +27,6 @@ import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -37,7 +36,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.testutils.HoodieClientTestBase; - import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.junit.jupiter.api.BeforeEach; @@ -61,21 +59,18 @@ */ public class ITTestClusteringCommand extends AbstractShellIntegrationTest { - private String tablePath; - private String tableName; - @BeforeEach public void init() throws IOException { tableName = "test_table_" + ITTestClusteringCommand.class.getName(); - tablePath = Paths.get(basePath, tableName).toString(); + basePath = Paths.get(basePath, tableName).toString(); HoodieCLI.conf = jsc.hadoopConfiguration(); // Create table and connect new TableCommand().createTable( - tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), + basePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); - metaClient.setBasePath(tablePath); - metaClient = HoodieTableMetaClient.reload(metaClient); + + initMetaClient(); } /** @@ -168,7 +163,7 @@ private void generateCommits() throws IOException { HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); // Create the write client to write some records in - HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withDeleteParallelism(2).forTable(tableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java index 18f4a387d474e..fd533be09b6be 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java @@ -54,16 +54,16 @@ public class ITTestCommitsCommand extends AbstractShellIntegrationTest { @BeforeEach public void init() throws IOException { - String tableName = "test_table_" + ITTestCommitsCommand.class.getName(); - String tablePath = Paths.get(basePath, tableName).toString(); + tableName = "test_table_" + ITTestCommitsCommand.class.getName(); + basePath = Paths.get(basePath, tableName).toString(); HoodieCLI.conf = jsc.hadoopConfiguration(); // Create table and connect new TableCommand().createTable( - tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), + basePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); - metaClient.setBasePath(tablePath); - metaClient = HoodieTableMetaClient.reload(metaClient); + + initMetaClient(); } /** diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java index 4734f45e7074b..267cee70f2893 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java @@ -32,7 +32,6 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -48,7 +47,6 @@ import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; - import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.shell.core.CommandResult; @@ -73,21 +71,18 @@ */ public class ITTestCompactionCommand extends AbstractShellIntegrationTest { - private String tablePath; - private String tableName; - @BeforeEach public void init() throws IOException { tableName = "test_table_" + ITTestCompactionCommand.class.getName(); - tablePath = Paths.get(basePath, tableName).toString(); + basePath = Paths.get(basePath, tableName).toString(); HoodieCLI.conf = jsc.hadoopConfiguration(); // Create table and connect new TableCommand().createTable( - tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(), + basePath, tableName, HoodieTableType.MERGE_ON_READ.name(), "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); - metaClient.setBasePath(tablePath); - metaClient = HoodieTableMetaClient.reload(metaClient); + + initMetaClient(); } /** @@ -298,7 +293,7 @@ private void generateCommits() throws IOException { HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); // Create the write client to write some records in - HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withDeleteParallelism(2).forTable(tableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 1bde88d3bb647..79badb48a5895 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -96,22 +96,26 @@ public static Configuration prepareHadoopConf(Configuration conf) { return conf; } - public static FileSystem getFs(String path, Configuration conf) { + public static FileSystem getFs(String pathStr, Configuration conf) { + return getFs(new Path(pathStr), conf); + } + + public static FileSystem getFs(Path path, Configuration conf) { FileSystem fs; prepareHadoopConf(conf); try { - fs = new Path(path).getFileSystem(conf); + fs = path.getFileSystem(conf); } catch (IOException e) { throw new HoodieIOException("Failed to get instance of " + FileSystem.class.getName(), e); } return fs; } - public static FileSystem getFs(String path, Configuration conf, boolean localByDefault) { + public static FileSystem getFs(String pathStr, Configuration conf, boolean localByDefault) { if (localByDefault) { - return getFs(addSchemeIfLocalPath(path).toString(), conf); + return getFs(addSchemeIfLocalPath(pathStr), conf); } - return getFs(path, conf); + return getFs(pathStr, conf); } /** @@ -178,7 +182,7 @@ public static String getCommitFromCommitFile(String commitFileName) { } public static String getCommitTime(String fullFileName) { - if (isLogFile(new Path(fullFileName))) { + if (isLogFile(fullFileName)) { return fullFileName.split("_")[1].split("\\.")[0]; } return fullFileName.split("_")[2].split("\\.")[0]; @@ -461,8 +465,12 @@ public static boolean isBaseFile(Path path) { } public static boolean isLogFile(Path logPath) { - Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName()); - return matcher.find() && logPath.getName().contains(".log"); + return isLogFile(logPath.getName()); + } + + public static boolean isLogFile(String fileName) { + Matcher matcher = LOG_FILE_PATTERN.matcher(fileName); + return matcher.find() && fileName.contains(".log"); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java index 4bbd94384420d..a79d1571afe73 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java @@ -48,7 +48,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; -import org.apache.hudi.hadoop.FileNameCachingPath; +import org.apache.hudi.hadoop.CachingPath; import java.io.IOException; import java.net.URI; @@ -142,7 +142,7 @@ public static Path convertPathWithScheme(Path oldPath, String newScheme) { try { newURI = new URI(newScheme, oldURI.getUserInfo(), oldURI.getHost(), oldURI.getPort(), oldURI.getPath(), oldURI.getQuery(), oldURI.getFragment()); - return new FileNameCachingPath(newURI); + return new CachingPath(newURI); } catch (URISyntaxException e) { // TODO - Better Exception handling throw new RuntimeException(e); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java index f12c207ee75b6..cd35861b7499e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java @@ -31,26 +31,35 @@ public class BaseFile implements Serializable { private static final long serialVersionUID = 1L; + private transient FileStatus fileStatus; private final String fullPath; + private final String fileName; private long fileLen; public BaseFile(BaseFile dataFile) { - this.fileStatus = dataFile.fileStatus; - this.fullPath = dataFile.fullPath; - this.fileLen = dataFile.fileLen; + this(dataFile.fileStatus, + dataFile.fullPath, + dataFile.getFileName(), + dataFile.getFileLen()); } public BaseFile(FileStatus fileStatus) { - this.fileStatus = fileStatus; - this.fullPath = fileStatus.getPath().toString(); - this.fileLen = fileStatus.getLen(); + this(fileStatus, + fileStatus.getPath().toString(), + fileStatus.getPath().getName(), + fileStatus.getLen()); } public BaseFile(String filePath) { - this.fileStatus = null; - this.fullPath = filePath; - this.fileLen = -1; + this(null, filePath, getFileName(filePath), -1); + } + + private BaseFile(FileStatus fileStatus, String fullPath, String fileName, long fileLen) { + this.fileStatus = fileStatus; + this.fullPath = fullPath; + this.fileLen = fileLen; + this.fileName = fileName; } public String getPath() { @@ -58,7 +67,7 @@ public String getPath() { } public String getFileName() { - return new Path(fullPath).getName(); + return fileName; } public FileStatus getFileStatus() { @@ -98,4 +107,8 @@ public int hashCode() { public String toString() { return "BaseFile{fullPath=" + fullPath + ", fileLen=" + fileLen + '}'; } + + private static String getFileName(String fullPath) { + return new Path(fullPath).getName(); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 38b5509cd577f..23623976d915d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -49,6 +49,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hudi.hadoop.CachingPath; +import org.apache.hudi.hadoop.SerializablePath; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -84,7 +86,6 @@ public class HoodieTableMetaClient implements Serializable { public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap"; public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat"; public static final String METADATA_TABLE_FOLDER_PATH = METAFOLDER_NAME + Path.SEPARATOR + "metadata"; - public static final String COLUMN_STATISTICS_INDEX_NAME = ".colstatsindex"; public static final String BOOTSTRAP_INDEX_BY_PARTITION_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR + ".partitions"; public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR @@ -94,9 +95,13 @@ public class HoodieTableMetaClient implements Serializable { public static final String MARKER_EXTN = ".marker"; - private String basePath; + // NOTE: Since those two parameters lay on the hot-path of a lot of computations, we + // use tailored extension of the {@code Path} class allowing to avoid repetitive + // computations secured by its immutability + private SerializablePath basePath; + private SerializablePath metaPath; + private transient HoodieWrapperFileSystem fs; - private String metaPath; private boolean loadActiveTimelineOnLoad; private SerializableConfiguration hadoopConf; private HoodieTableType tableType; @@ -114,13 +119,11 @@ private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadA this.consistencyGuardConfig = consistencyGuardConfig; this.fileSystemRetryConfig = fileSystemRetryConfig; this.hadoopConf = new SerializableConfiguration(conf); - Path basePathDir = new Path(basePath); - this.basePath = basePathDir.toString(); - this.metaPath = new Path(basePath, METAFOLDER_NAME).toString(); - Path metaPathDir = new Path(this.metaPath); + this.basePath = new SerializablePath(new CachingPath(basePath)); + this.metaPath = new SerializablePath(new CachingPath(basePath, METAFOLDER_NAME)); this.fs = getFs(); - TableNotFoundException.checkTableValidity(fs, basePathDir, metaPathDir); - this.tableConfig = new HoodieTableConfig(fs, metaPath, payloadClassName); + TableNotFoundException.checkTableValidity(fs, this.basePath.get(), metaPath.get()); + this.tableConfig = new HoodieTableConfig(fs, metaPath.toString(), payloadClassName); this.tableType = tableConfig.getTableType(); Option tableConfigVersion = tableConfig.getTimelineLayoutVersion(); if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) { @@ -147,8 +150,13 @@ private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadA public HoodieTableMetaClient() {} public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) { - return HoodieTableMetaClient.builder().setConf(oldMetaClient.hadoopConf.get()).setBasePath(oldMetaClient.basePath).setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad) - .setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null) + return HoodieTableMetaClient.builder() + .setConf(oldMetaClient.hadoopConf.get()) + .setBasePath(oldMetaClient.basePath.toString()) + .setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad) + .setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig) + .setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)) + .setPayloadClassName(null) .setFileSystemRetryConfig(oldMetaClient.fileSystemRetryConfig).build(); } @@ -159,6 +167,7 @@ public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) */ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); + fs = null; // will be lazily initialized } @@ -166,11 +175,20 @@ private void writeObject(java.io.ObjectOutputStream out) throws IOException { out.defaultWriteObject(); } + /** + * Returns base path of the table + */ + public Path getBasePathV2() { + return basePath.get(); + } + /** * @return Base path + * @deprecated please use {@link #getBasePathV2()} */ + @Deprecated public String getBasePath() { - return basePath; + return basePath.get().toString(); // this invocation is cached } /** @@ -184,21 +202,14 @@ public HoodieTableType getTableType() { * @return Meta path */ public String getMetaPath() { - return metaPath; - } - - /** - * @return Column Statistics index path - */ - public String getColumnStatsIndexPath() { - return new Path(metaPath, COLUMN_STATISTICS_INDEX_NAME).toString(); + return metaPath.get().toString(); // this invocation is cached } /** * @return schema folder path */ public String getSchemaFolderName() { - return new Path(metaPath, SCHEMA_FOLDER_NAME).toString(); + return new Path(metaPath.get(), SCHEMA_FOLDER_NAME).toString(); } /** @@ -270,7 +281,7 @@ public TimelineLayoutVersion getTimelineLayoutVersion() { */ public HoodieWrapperFileSystem getFs() { if (fs == null) { - FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.newCopy()); + FileSystem fileSystem = FSUtils.getFs(metaPath.get(), hadoopConf.newCopy()); if (fileSystemRetryConfig.isFileSystemActionRetryEnable()) { fileSystem = new HoodieRetryWrapperFileSystem(fileSystem, @@ -437,8 +448,7 @@ public static HoodieTableMetaClient initTableAndGetMetaClient(Configuration hado return metaClient; } - public static void initializeBootstrapDirsIfNotExists(Configuration hadoopConf, - String basePath, FileSystem fs) throws IOException { + public static void initializeBootstrapDirsIfNotExists(Configuration hadoopConf, String basePath, FileSystem fs) throws IOException { // Create bootstrap index by partition folder if it does not exist final Path bootstrap_index_folder_by_partition = @@ -542,7 +552,7 @@ public String getCommitActionType() { */ public List scanHoodieInstantsFromFileSystem(Set includedExtensions, boolean applyLayoutVersionFilters) throws IOException { - return scanHoodieInstantsFromFileSystem(new Path(metaPath), includedExtensions, applyLayoutVersionFilters); + return scanHoodieInstantsFromFileSystem(metaPath.get(), includedExtensions, applyLayoutVersionFilters); } /** @@ -599,19 +609,7 @@ public String toString() { } public void initializeBootstrapDirsIfNotExists() throws IOException { - initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath, getFs()); - } - - public void setBasePath(String basePath) { - this.basePath = basePath; - } - - public void setMetaPath(String metaPath) { - this.metaPath = metaPath; - } - - public void setActiveTimeline(HoodieActiveTimeline activeTimeline) { - this.activeTimeline = activeTimeline; + initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath.toString(), getFs()); } public static Builder builder() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index 208d7ef2ba456..c6e618ac4769a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -95,7 +95,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV private BootstrapIndex bootstrapIndex; private String getPartitionPathFromFilePath(String fullPath) { - return FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), new Path(fullPath).getParent()); + return FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), new Path(fullPath).getParent()); } /** @@ -172,7 +172,7 @@ protected List buildFileGroups(Stream baseFileS Map, List> logFiles = logFileStream.collect(Collectors.groupingBy((logFile) -> { String partitionPathStr = - FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), logFile.getPath().getParent()); + FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), logFile.getPath().getParent()); return Pair.of(partitionPathStr, logFile.getFileId()); })); @@ -299,7 +299,7 @@ private void ensurePartitionLoadedCorrectly(String partition) { try { LOG.info("Building file system view for partition (" + partitionPathStr + ")"); - Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPathStr); + Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPathStr); long beginLsTs = System.currentTimeMillis(); FileStatus[] statuses = listPartition(partitionPath); long endLsTs = System.currentTimeMillis(); diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/FileNameCachingPath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java similarity index 54% rename from hudi-common/src/main/java/org/apache/hudi/hadoop/FileNameCachingPath.java rename to hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java index 873f7f98f7c9e..01b3eb9d409bb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/hadoop/FileNameCachingPath.java +++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java @@ -20,19 +20,47 @@ import org.apache.hadoop.fs.Path; +import javax.annotation.concurrent.ThreadSafe; +import java.io.Serializable; import java.net.URI; /** + * This is an extension of the {@code Path} class allowing to avoid repetitive + * computations (like {@code getFileName}, {@code toString}) which are secured + * by its immutability + * * NOTE: This class is thread-safe */ -public class FileNameCachingPath extends Path { +@ThreadSafe +public class CachingPath extends Path implements Serializable { - // NOTE: volatile keyword is redundant here and put mostly for reader notice, since all + // NOTE: `volatile` keyword is redundant here and put mostly for reader notice, since all // reads/writes to references are always atomic (including 64-bit JVMs) // https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.7 private volatile String fileName; + private volatile String fullPathStr; + + public CachingPath(String parent, String child) { + super(parent, child); + } + + public CachingPath(Path parent, String child) { + super(parent, child); + } + + public CachingPath(String parent, Path child) { + super(parent, child); + } + + public CachingPath(Path parent, Path child) { + super(parent, child); + } - public FileNameCachingPath(URI aUri) { + public CachingPath(String pathString) throws IllegalArgumentException { + super(pathString); + } + + public CachingPath(URI aUri) { super(aUri); } @@ -45,4 +73,14 @@ public String getName() { } return fileName; } + + @Override + public String toString() { + // This value could be overwritten concurrently and that's okay, since + // {@code Path} is immutable + if (fullPathStr == null) { + fullPathStr = super.toString(); + } + return fullPathStr; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java new file mode 100644 index 0000000000000..5ad2307ef804a --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.Objects; + +/** + * {@link Serializable} wrapper encapsulating {@link Path} + */ +public class SerializablePath implements Serializable { + + private Path path; + + public SerializablePath(Path path) { + this.path = path; + } + + public Path get() { + return path; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(path.toString()); + } + + private void readObject(ObjectInputStream in) throws IOException { + String pathStr = in.readUTF(); + path = new CachingPath(pathStr); + } + + @Override + public String toString() { + return path.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SerializablePath that = (SerializablePath) o; + return Objects.equals(path, that.path); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 58d186f971cb8..c9bdc59da9763 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -54,6 +54,7 @@ import org.apache.hudi.common.util.hash.PartitionIndexID; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.io.storage.HoodieHFileReader; +import org.apache.hudi.util.Lazy; import java.io.IOException; import java.math.BigDecimal; @@ -143,6 +144,31 @@ public class HoodieMetadataPayload implements HoodieRecordPayload METADATA_COLUMN_STATS_BUILDER_STUB = Lazy.lazily(HoodieMetadataColumnStats::newBuilder); + private static final Lazy STRING_WRAPPER_BUILDER_STUB = Lazy.lazily(StringWrapper::newBuilder); + private static final Lazy BYTES_WRAPPER_BUILDER_STUB = Lazy.lazily(BytesWrapper::newBuilder); + private static final Lazy DOUBLE_WRAPPER_BUILDER_STUB = Lazy.lazily(DoubleWrapper::newBuilder); + private static final Lazy FLOAT_WRAPPER_BUILDER_STUB = Lazy.lazily(FloatWrapper::newBuilder); + private static final Lazy LONG_WRAPPER_BUILDER_STUB = Lazy.lazily(LongWrapper::newBuilder); + private static final Lazy INT_WRAPPER_BUILDER_STUB = Lazy.lazily(IntWrapper::newBuilder); + private static final Lazy BOOLEAN_WRAPPER_BUILDER_STUB = Lazy.lazily(BooleanWrapper::newBuilder); + private static final Lazy TIMESTAMP_MICROS_WRAPPER_BUILDER_STUB = Lazy.lazily(TimestampMicrosWrapper::newBuilder); + private static final Lazy DECIMAL_WRAPPER_BUILDER_STUB = Lazy.lazily(DecimalWrapper::newBuilder); + private static final Lazy DATE_WRAPPER_BUILDER_STUB = Lazy.lazily(DateWrapper::newBuilder); + private String key = null; private int type = 0; private Map filesystemMetadata = null; @@ -201,7 +227,7 @@ public HoodieMetadataPayload(Option recordOpt) { checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_COLUMN_STATS) == null, String.format("Valid %s record expected for type: %s", SCHEMA_FIELD_ID_COLUMN_STATS, METADATA_TYPE_COLUMN_STATS)); } else { - columnStatMetadata = HoodieMetadataColumnStats.newBuilder() + columnStatMetadata = HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get()) .setFileName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME)) .setColumnName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME)) .setMinValue(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE)) @@ -605,7 +631,7 @@ private static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataC .max(Comparator.naturalOrder()) .orElse(null); - return HoodieMetadataColumnStats.newBuilder() + return HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get()) .setFileName(newColumnStats.getFileName()) .setColumnName(newColumnStats.getColumnName()) .setMinValue(wrapStatisticValue(minValue)) @@ -653,11 +679,13 @@ private static Object wrapStatisticValue(Comparable statValue) { LocalDate localDate = statValue instanceof LocalDate ? (LocalDate) statValue : ((Date) statValue).toLocalDate(); - return DateWrapper.newBuilder().setValue((int) localDate.toEpochDay()).build(); + return DateWrapper.newBuilder(DATE_WRAPPER_BUILDER_STUB.get()) + .setValue((int) localDate.toEpochDay()) + .build(); } else if (statValue instanceof BigDecimal) { Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema(); BigDecimal upcastDecimal = tryUpcastDecimal((BigDecimal) statValue, (LogicalTypes.Decimal) valueSchema.getLogicalType()); - return DecimalWrapper.newBuilder() + return DecimalWrapper.newBuilder(DECIMAL_WRAPPER_BUILDER_STUB.get()) .setValue(AVRO_DECIMAL_CONVERSION.toBytes(upcastDecimal, valueSchema, valueSchema.getLogicalType())) .build(); } else if (statValue instanceof Timestamp) { @@ -665,23 +693,23 @@ private static Object wrapStatisticValue(Comparable statValue) { // rely on logical types to do proper encoding of the native Java types, // and hereby have to encode statistic manually Instant instant = ((Timestamp) statValue).toInstant(); - return TimestampMicrosWrapper.newBuilder() + return TimestampMicrosWrapper.newBuilder(TIMESTAMP_MICROS_WRAPPER_BUILDER_STUB.get()) .setValue(instantToMicros(instant)) .build(); } else if (statValue instanceof Boolean) { - return BooleanWrapper.newBuilder().setValue((Boolean) statValue).build(); + return BooleanWrapper.newBuilder(BOOLEAN_WRAPPER_BUILDER_STUB.get()).setValue((Boolean) statValue).build(); } else if (statValue instanceof Integer) { - return IntWrapper.newBuilder().setValue((Integer) statValue).build(); + return IntWrapper.newBuilder(INT_WRAPPER_BUILDER_STUB.get()).setValue((Integer) statValue).build(); } else if (statValue instanceof Long) { - return LongWrapper.newBuilder().setValue((Long) statValue).build(); + return LongWrapper.newBuilder(LONG_WRAPPER_BUILDER_STUB.get()).setValue((Long) statValue).build(); } else if (statValue instanceof Float) { - return FloatWrapper.newBuilder().setValue((Float) statValue).build(); + return FloatWrapper.newBuilder(FLOAT_WRAPPER_BUILDER_STUB.get()).setValue((Float) statValue).build(); } else if (statValue instanceof Double) { - return DoubleWrapper.newBuilder().setValue((Double) statValue).build(); + return DoubleWrapper.newBuilder(DOUBLE_WRAPPER_BUILDER_STUB.get()).setValue((Double) statValue).build(); } else if (statValue instanceof ByteBuffer) { - return BytesWrapper.newBuilder().setValue((ByteBuffer) statValue).build(); + return BytesWrapper.newBuilder(BYTES_WRAPPER_BUILDER_STUB.get()).setValue((ByteBuffer) statValue).build(); } else if (statValue instanceof String || statValue instanceof Utf8) { - return StringWrapper.newBuilder().setValue(statValue.toString()).build(); + return StringWrapper.newBuilder(STRING_WRAPPER_BUILDER_STUB.get()).setValue(statValue.toString()).build(); } else { throw new UnsupportedOperationException(String.format("Unsupported type of the statistic (%s)", statValue.getClass())); }