From e5e0506fa93b5f692809058a92dd3bf60e0d4fa4 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 8 Apr 2022 09:40:30 -0700 Subject: [PATCH 01/15] Stubbed out all Avro `newBuilder` invocations on the hot-path to avoid punitive `SpecificData.getForSchema` calls --- .../hudi/metadata/HoodieMetadataPayload.java | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 58d186f971cb..ef4a8ff7f170 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -143,6 +143,18 @@ public class HoodieMetadataPayload implements HoodieRecordPayload filesystemMetadata = null; @@ -201,7 +213,7 @@ public HoodieMetadataPayload(Option recordOpt) { checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_COLUMN_STATS) == null, String.format("Valid %s record expected for type: %s", SCHEMA_FIELD_ID_COLUMN_STATS, METADATA_TYPE_COLUMN_STATS)); } else { - columnStatMetadata = HoodieMetadataColumnStats.newBuilder() + columnStatMetadata = HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB) .setFileName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME)) .setColumnName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME)) .setMinValue(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE)) @@ -605,7 +617,7 @@ private static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataC .max(Comparator.naturalOrder()) .orElse(null); - return HoodieMetadataColumnStats.newBuilder() + return HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB) .setFileName(newColumnStats.getFileName()) .setColumnName(newColumnStats.getColumnName()) .setMinValue(wrapStatisticValue(minValue)) @@ -653,11 +665,11 @@ private static Object 
wrapStatisticValue(Comparable statValue) { LocalDate localDate = statValue instanceof LocalDate ? (LocalDate) statValue : ((Date) statValue).toLocalDate(); - return DateWrapper.newBuilder().setValue((int) localDate.toEpochDay()).build(); + return DateWrapper.newBuilder(DATE_WRAPPER_BUILDER_STUB).setValue((int) localDate.toEpochDay()).build(); } else if (statValue instanceof BigDecimal) { Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema(); BigDecimal upcastDecimal = tryUpcastDecimal((BigDecimal) statValue, (LogicalTypes.Decimal) valueSchema.getLogicalType()); - return DecimalWrapper.newBuilder() + return DecimalWrapper.newBuilder(DECIMAL_WRAPPER_BUILDER_STUB) .setValue(AVRO_DECIMAL_CONVERSION.toBytes(upcastDecimal, valueSchema, valueSchema.getLogicalType())) .build(); } else if (statValue instanceof Timestamp) { @@ -665,23 +677,23 @@ private static Object wrapStatisticValue(Comparable statValue) { // rely on logical types to do proper encoding of the native Java types, // and hereby have to encode statistic manually Instant instant = ((Timestamp) statValue).toInstant(); - return TimestampMicrosWrapper.newBuilder() + return TimestampMicrosWrapper.newBuilder(TIMESTAMP_MICROS_WRAPPER_BUILDER_STUB) .setValue(instantToMicros(instant)) .build(); } else if (statValue instanceof Boolean) { - return BooleanWrapper.newBuilder().setValue((Boolean) statValue).build(); + return BooleanWrapper.newBuilder(BOOLEAN_WRAPPER_BUILDER_STUB).setValue((Boolean) statValue).build(); } else if (statValue instanceof Integer) { - return IntWrapper.newBuilder().setValue((Integer) statValue).build(); + return IntWrapper.newBuilder(INT_WRAPPER_BUILDER_STUB).setValue((Integer) statValue).build(); } else if (statValue instanceof Long) { - return LongWrapper.newBuilder().setValue((Long) statValue).build(); + return LongWrapper.newBuilder(LONG_WRAPPER_BUILDER_STUB).setValue((Long) statValue).build(); } else if (statValue instanceof Float) { - return 
FloatWrapper.newBuilder().setValue((Float) statValue).build(); + return FloatWrapper.newBuilder(FLOAT_WRAPPER_BUILDER_STUB).setValue((Float) statValue).build(); } else if (statValue instanceof Double) { - return DoubleWrapper.newBuilder().setValue((Double) statValue).build(); + return DoubleWrapper.newBuilder(DOUBLE_WRAPPER_BUILDER_STUB).setValue((Double) statValue).build(); } else if (statValue instanceof ByteBuffer) { - return BytesWrapper.newBuilder().setValue((ByteBuffer) statValue).build(); + return BytesWrapper.newBuilder(BYTES_WRAPPER_BUILDER_STUB).setValue((ByteBuffer) statValue).build(); } else if (statValue instanceof String || statValue instanceof Utf8) { - return StringWrapper.newBuilder().setValue(statValue.toString()).build(); + return StringWrapper.newBuilder(STRING_WRAPPER_BUILDER_STUB).setValue(statValue.toString()).build(); } else { throw new UnsupportedOperationException(String.format("Unsupported type of the statistic (%s)", statValue.getClass())); } From eee61c624ea619b5d15e636542048d43348fb76b Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 8 Apr 2022 09:41:15 -0700 Subject: [PATCH 02/15] Cleaned up `BaseFile` to avoid `new Path` calls in the hot-path (to avoid memory churn) --- .../apache/hudi/common/model/BaseFile.java | 33 +++++++++++++------ 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java index f12c207ee75b..cd35861b7499 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java @@ -31,26 +31,35 @@ public class BaseFile implements Serializable { private static final long serialVersionUID = 1L; + private transient FileStatus fileStatus; private final String fullPath; + private final String fileName; private long fileLen; public BaseFile(BaseFile dataFile) { - this.fileStatus = 
dataFile.fileStatus; - this.fullPath = dataFile.fullPath; - this.fileLen = dataFile.fileLen; + this(dataFile.fileStatus, + dataFile.fullPath, + dataFile.getFileName(), + dataFile.getFileLen()); } public BaseFile(FileStatus fileStatus) { - this.fileStatus = fileStatus; - this.fullPath = fileStatus.getPath().toString(); - this.fileLen = fileStatus.getLen(); + this(fileStatus, + fileStatus.getPath().toString(), + fileStatus.getPath().getName(), + fileStatus.getLen()); } public BaseFile(String filePath) { - this.fileStatus = null; - this.fullPath = filePath; - this.fileLen = -1; + this(null, filePath, getFileName(filePath), -1); + } + + private BaseFile(FileStatus fileStatus, String fullPath, String fileName, long fileLen) { + this.fileStatus = fileStatus; + this.fullPath = fullPath; + this.fileLen = fileLen; + this.fileName = fileName; } public String getPath() { @@ -58,7 +67,7 @@ public String getPath() { } public String getFileName() { - return new Path(fullPath).getName(); + return fileName; } public FileStatus getFileStatus() { @@ -98,4 +107,8 @@ public int hashCode() { public String toString() { return "BaseFile{fullPath=" + fullPath + ", fileLen=" + fileLen + '}'; } + + private static String getFileName(String fullPath) { + return new Path(fullPath).getName(); + } } From d840703137e7d0ca2f75cf31f1d2e74c05fafdd0 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 8 Apr 2022 10:10:35 -0700 Subject: [PATCH 03/15] `FileNameCachingPath` > `LazyCachingPath` --- .../common/fs/HoodieWrapperFileSystem.java | 4 +- ...eCachingPath.java => LazyCachingPath.java} | 43 +++++++++++++++++-- 2 files changed, 42 insertions(+), 5 deletions(-) rename hudi-common/src/main/java/org/apache/hudi/hadoop/{FileNameCachingPath.java => LazyCachingPath.java} (55%) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java index 4bbd94384420..57fa1f0b0b07 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java @@ -48,7 +48,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; -import org.apache.hudi.hadoop.FileNameCachingPath; +import org.apache.hudi.hadoop.LazyCachingPath; import java.io.IOException; import java.net.URI; @@ -142,7 +142,7 @@ public static Path convertPathWithScheme(Path oldPath, String newScheme) { try { newURI = new URI(newScheme, oldURI.getUserInfo(), oldURI.getHost(), oldURI.getPort(), oldURI.getPath(), oldURI.getQuery(), oldURI.getFragment()); - return new FileNameCachingPath(newURI); + return new LazyCachingPath(newURI); } catch (URISyntaxException e) { // TODO - Better Exception handling throw new RuntimeException(e); diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/FileNameCachingPath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/LazyCachingPath.java similarity index 55% rename from hudi-common/src/main/java/org/apache/hudi/hadoop/FileNameCachingPath.java rename to hudi-common/src/main/java/org/apache/hudi/hadoop/LazyCachingPath.java index 873f7f98f7c9..9c2224b3ceb7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/hadoop/FileNameCachingPath.java +++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/LazyCachingPath.java @@ -20,19 +20,46 @@ import org.apache.hadoop.fs.Path; +import javax.annotation.concurrent.ThreadSafe; import java.net.URI; /** + * This is an extension of the {@code Path} class allowing to avoid repetitive + * computations (like {@code getFileName}, {@code toString}) which are secured + * by its immutability + * * NOTE: This class is thread-safe */ -public class FileNameCachingPath extends Path { +@ThreadSafe +public class LazyCachingPath extends Path { - // NOTE: volatile keyword is redundant here and put mostly for reader notice, since all + // 
NOTE: `volatile` keyword is redundant here and put mostly for reader notice, since all // reads/writes to references are always atomic (including 64-bit JVMs) // https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.7 private volatile String fileName; + private volatile String s; + + public LazyCachingPath(String parent, String child) { + super(parent, child); + } + + public LazyCachingPath(Path parent, String child) { + super(parent, child); + } + + public LazyCachingPath(String parent, Path child) { + super(parent, child); + } + + public LazyCachingPath(Path parent, Path child) { + super(parent, child); + } - public FileNameCachingPath(URI aUri) { + public LazyCachingPath(String pathString) throws IllegalArgumentException { + super(pathString); + } + + public LazyCachingPath(URI aUri) { super(aUri); } @@ -45,4 +72,14 @@ public String getName() { } return fileName; } + + @Override + public String toString() { + // This value could be overwritten concurrently and that's okay, since + // {@code Path} is immutable + if (s == null) { + s = super.toString(); + } + return s; + } } From 1b5b94e4131a47e6409bcc60996e46ddc44cbc4c Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 8 Apr 2022 10:12:16 -0700 Subject: [PATCH 04/15] Rebased `HoodieTableMetaClient` onto `LazyCachingPath` to hold pointers to base-/meta-paths to avoid construction of Hadoop's `Path` in the hot-path; Avoid churning `Path` objects --- .../org/apache/hudi/common/fs/FSUtils.java | 24 ++++--- .../common/table/HoodieTableMetaClient.java | 68 +++++++++---------- .../view/AbstractTableFileSystemView.java | 6 +- 3 files changed, 51 insertions(+), 47 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 1bde88d3bb64..79badb48a589 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ 
-96,22 +96,26 @@ public static Configuration prepareHadoopConf(Configuration conf) { return conf; } - public static FileSystem getFs(String path, Configuration conf) { + public static FileSystem getFs(String pathStr, Configuration conf) { + return getFs(new Path(pathStr), conf); + } + + public static FileSystem getFs(Path path, Configuration conf) { FileSystem fs; prepareHadoopConf(conf); try { - fs = new Path(path).getFileSystem(conf); + fs = path.getFileSystem(conf); } catch (IOException e) { throw new HoodieIOException("Failed to get instance of " + FileSystem.class.getName(), e); } return fs; } - public static FileSystem getFs(String path, Configuration conf, boolean localByDefault) { + public static FileSystem getFs(String pathStr, Configuration conf, boolean localByDefault) { if (localByDefault) { - return getFs(addSchemeIfLocalPath(path).toString(), conf); + return getFs(addSchemeIfLocalPath(pathStr), conf); } - return getFs(path, conf); + return getFs(pathStr, conf); } /** @@ -178,7 +182,7 @@ public static String getCommitFromCommitFile(String commitFileName) { } public static String getCommitTime(String fullFileName) { - if (isLogFile(new Path(fullFileName))) { + if (isLogFile(fullFileName)) { return fullFileName.split("_")[1].split("\\.")[0]; } return fullFileName.split("_")[2].split("\\.")[0]; @@ -461,8 +465,12 @@ public static boolean isBaseFile(Path path) { } public static boolean isLogFile(Path logPath) { - Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName()); - return matcher.find() && logPath.getName().contains(".log"); + return isLogFile(logPath.getName()); + } + + public static boolean isLogFile(String fileName) { + Matcher matcher = LOG_FILE_PATTERN.matcher(fileName); + return matcher.find() && fileName.contains(".log"); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 38b5509cd577..38d9b27ec8ed 
100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -49,6 +49,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hudi.hadoop.LazyCachingPath; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -84,7 +85,6 @@ public class HoodieTableMetaClient implements Serializable { public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap"; public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat"; public static final String METADATA_TABLE_FOLDER_PATH = METAFOLDER_NAME + Path.SEPARATOR + "metadata"; - public static final String COLUMN_STATISTICS_INDEX_NAME = ".colstatsindex"; public static final String BOOTSTRAP_INDEX_BY_PARTITION_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR + ".partitions"; public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR @@ -94,9 +94,13 @@ public class HoodieTableMetaClient implements Serializable { public static final String MARKER_EXTN = ".marker"; - private String basePath; + // NOTE: Since those two parameters lay on the hot-path of a lot of computations, we + // use tailored extension of the {@code Path} class allowing to avoid repetitive + // computations secured by its immutability + private LazyCachingPath basePath; + private LazyCachingPath metaPath; + private transient HoodieWrapperFileSystem fs; - private String metaPath; private boolean loadActiveTimelineOnLoad; private SerializableConfiguration hadoopConf; private HoodieTableType tableType; @@ -114,13 +118,11 @@ private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadA this.consistencyGuardConfig = consistencyGuardConfig; this.fileSystemRetryConfig = 
fileSystemRetryConfig; this.hadoopConf = new SerializableConfiguration(conf); - Path basePathDir = new Path(basePath); - this.basePath = basePathDir.toString(); - this.metaPath = new Path(basePath, METAFOLDER_NAME).toString(); - Path metaPathDir = new Path(this.metaPath); + this.basePath = new LazyCachingPath(basePath); + this.metaPath = new LazyCachingPath(basePath, METAFOLDER_NAME); this.fs = getFs(); - TableNotFoundException.checkTableValidity(fs, basePathDir, metaPathDir); - this.tableConfig = new HoodieTableConfig(fs, metaPath, payloadClassName); + TableNotFoundException.checkTableValidity(fs, this.basePath, metaPath); + this.tableConfig = new HoodieTableConfig(fs, metaPath.toString(), payloadClassName); this.tableType = tableConfig.getTableType(); Option tableConfigVersion = tableConfig.getTimelineLayoutVersion(); if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) { @@ -147,8 +149,13 @@ private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadA public HoodieTableMetaClient() {} public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) { - return HoodieTableMetaClient.builder().setConf(oldMetaClient.hadoopConf.get()).setBasePath(oldMetaClient.basePath).setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad) - .setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null) + return HoodieTableMetaClient.builder() + .setConf(oldMetaClient.hadoopConf.get()) + .setBasePath(oldMetaClient.basePath.toString()) + .setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad) + .setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig) + .setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)) + .setPayloadClassName(null) .setFileSystemRetryConfig(oldMetaClient.fileSystemRetryConfig).build(); } @@ -166,11 +173,20 @@ private void writeObject(java.io.ObjectOutputStream out) throws 
IOException { out.defaultWriteObject(); } + /** + * Returns base path of the table + */ + public Path getBasePathV2() { + return basePath; + } + /** * @return Base path + * @deprecated please use {@link #getBasePathV2()} */ + @Deprecated public String getBasePath() { - return basePath; + return basePath.toString(); // this invocation is cached } /** @@ -184,14 +200,7 @@ public HoodieTableType getTableType() { * @return Meta path */ public String getMetaPath() { - return metaPath; - } - - /** - * @return Column Statistics index path - */ - public String getColumnStatsIndexPath() { - return new Path(metaPath, COLUMN_STATISTICS_INDEX_NAME).toString(); + return metaPath.toString(); // this invocation is cached } /** @@ -437,8 +446,7 @@ public static HoodieTableMetaClient initTableAndGetMetaClient(Configuration hado return metaClient; } - public static void initializeBootstrapDirsIfNotExists(Configuration hadoopConf, - String basePath, FileSystem fs) throws IOException { + public static void initializeBootstrapDirsIfNotExists(Configuration hadoopConf, String basePath, FileSystem fs) throws IOException { // Create bootstrap index by partition folder if it does not exist final Path bootstrap_index_folder_by_partition = @@ -542,7 +550,7 @@ public String getCommitActionType() { */ public List scanHoodieInstantsFromFileSystem(Set includedExtensions, boolean applyLayoutVersionFilters) throws IOException { - return scanHoodieInstantsFromFileSystem(new Path(metaPath), includedExtensions, applyLayoutVersionFilters); + return scanHoodieInstantsFromFileSystem(metaPath, includedExtensions, applyLayoutVersionFilters); } /** @@ -599,19 +607,7 @@ public String toString() { } public void initializeBootstrapDirsIfNotExists() throws IOException { - initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath, getFs()); - } - - public void setBasePath(String basePath) { - this.basePath = basePath; - } - - public void setMetaPath(String metaPath) { - this.metaPath = metaPath; - } - - 
public void setActiveTimeline(HoodieActiveTimeline activeTimeline) { - this.activeTimeline = activeTimeline; + initializeBootstrapDirsIfNotExists(getHadoopConf(), basePath.toString(), getFs()); } public static Builder builder() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index 208d7ef2ba45..c6e618ac4769 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -95,7 +95,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV private BootstrapIndex bootstrapIndex; private String getPartitionPathFromFilePath(String fullPath) { - return FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), new Path(fullPath).getParent()); + return FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), new Path(fullPath).getParent()); } /** @@ -172,7 +172,7 @@ protected List buildFileGroups(Stream baseFileS Map, List> logFiles = logFileStream.collect(Collectors.groupingBy((logFile) -> { String partitionPathStr = - FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), logFile.getPath().getParent()); + FSUtils.getRelativePartitionPath(metaClient.getBasePathV2(), logFile.getPath().getParent()); return Pair.of(partitionPathStr, logFile.getFileId()); })); @@ -299,7 +299,7 @@ private void ensurePartitionLoadedCorrectly(String partition) { try { LOG.info("Building file system view for partition (" + partitionPathStr + ")"); - Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPathStr); + Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPathStr); long beginLsTs = System.currentTimeMillis(); FileStatus[] statuses = listPartition(partitionPath); long endLsTs = 
System.currentTimeMillis(); From 5a4956ba6ee3baf0e86042ef2754f1fafd1743d4 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 8 Apr 2022 10:13:18 -0700 Subject: [PATCH 05/15] Fixing tests --- .../hudi/cli/integ/ITTestClusteringCommand.java | 13 +++++-------- .../apache/hudi/cli/integ/ITTestCommitsCommand.java | 10 +++++----- .../hudi/cli/integ/ITTestCompactionCommand.java | 13 +++++-------- 3 files changed, 15 insertions(+), 21 deletions(-) diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java index 17075f9d3dfb..1fedb58cbba0 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java @@ -61,21 +61,18 @@ */ public class ITTestClusteringCommand extends AbstractShellIntegrationTest { - private String tablePath; - private String tableName; - @BeforeEach public void init() throws IOException { tableName = "test_table_" + ITTestClusteringCommand.class.getName(); - tablePath = Paths.get(basePath, tableName).toString(); + basePath = Paths.get(basePath, tableName).toString(); HoodieCLI.conf = jsc.hadoopConfiguration(); // Create table and connect new TableCommand().createTable( - tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), + basePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); - metaClient.setBasePath(tablePath); - metaClient = HoodieTableMetaClient.reload(metaClient); + + initMetaClient(); } /** @@ -168,7 +165,7 @@ private void generateCommits() throws IOException { HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); // Create the write client to write some records in - HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) 
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withDeleteParallelism(2).forTable(tableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java index 18f4a387d474..fd533be09b6b 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java @@ -54,16 +54,16 @@ public class ITTestCommitsCommand extends AbstractShellIntegrationTest { @BeforeEach public void init() throws IOException { - String tableName = "test_table_" + ITTestCommitsCommand.class.getName(); - String tablePath = Paths.get(basePath, tableName).toString(); + tableName = "test_table_" + ITTestCommitsCommand.class.getName(); + basePath = Paths.get(basePath, tableName).toString(); HoodieCLI.conf = jsc.hadoopConfiguration(); // Create table and connect new TableCommand().createTable( - tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), + basePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); - metaClient.setBasePath(tablePath); - metaClient = HoodieTableMetaClient.reload(metaClient); + + initMetaClient(); } /** diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java index 4734f45e7074..a75250aadd70 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java @@ -73,21 +73,18 @@ */ public class ITTestCompactionCommand extends AbstractShellIntegrationTest { - private String tablePath; - private String tableName; - @BeforeEach 
public void init() throws IOException { tableName = "test_table_" + ITTestCompactionCommand.class.getName(); - tablePath = Paths.get(basePath, tableName).toString(); + basePath = Paths.get(basePath, tableName).toString(); HoodieCLI.conf = jsc.hadoopConfiguration(); // Create table and connect new TableCommand().createTable( - tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(), + basePath, tableName, HoodieTableType.MERGE_ON_READ.name(), "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); - metaClient.setBasePath(tablePath); - metaClient = HoodieTableMetaClient.reload(metaClient); + + initMetaClient(); } /** @@ -298,7 +295,7 @@ private void generateCommits() throws IOException { HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); // Create the write client to write some records in - HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath) + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) .withDeleteParallelism(2).forTable(tableName) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(); From e6e5a28dc19413cfdac8d8e054d00b8892f0abd7 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 8 Apr 2022 12:19:16 -0700 Subject: [PATCH 06/15] Added comments --- .../apache/hudi/metadata/HoodieMetadataPayload.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index ef4a8ff7f170..a070a2247ed9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -143,6 +143,19 @@ public class HoodieMetadataPayload implements HoodieRecordPayload Date: Fri, 8 Apr 2022 
14:04:48 -0700 Subject: [PATCH 07/15] Fixing serializability of `HoodieTableMetaClient` --- .../hudi/common/table/HoodieTableMetaClient.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 38d9b27ec8ed..b8a098a978ef 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -97,8 +97,8 @@ public class HoodieTableMetaClient implements Serializable { // NOTE: Since those two parameters lay on the hot-path of a lot of computations, we // use tailored extension of the {@code Path} class allowing to avoid repetitive // computations secured by its immutability - private LazyCachingPath basePath; - private LazyCachingPath metaPath; + private transient LazyCachingPath basePath; + private transient LazyCachingPath metaPath; private transient HoodieWrapperFileSystem fs; private boolean loadActiveTimelineOnLoad; @@ -166,11 +166,18 @@ public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) */ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); + String basePathStr = in.readUTF(); + String metaPathStr = in.readUTF(); + fs = null; // will be lazily initialized + basePath = new LazyCachingPath(basePathStr); + metaPath = new LazyCachingPath(metaPathStr); } private void writeObject(java.io.ObjectOutputStream out) throws IOException { out.defaultWriteObject(); + out.writeBytes(basePath.toString()); + out.writeBytes(metaPath.toString()); } /** From c0aa9130fdcb79661b3234ec24ec0054d916fb9a Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 8 Apr 2022 14:06:29 -0700 Subject: [PATCH 08/15] Fixing compilation --- 
.../java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java | 2 -- .../java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java | 2 -- 2 files changed, 4 deletions(-) diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java index 1fedb58cbba0..97d3d91fb63a 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java @@ -27,7 +27,6 @@ import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -37,7 +36,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.testutils.HoodieClientTestBase; - import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.junit.jupiter.api.BeforeEach; diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java index a75250aadd70..267cee70f289 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java @@ -32,7 +32,6 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import 
org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; @@ -48,7 +47,6 @@ import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; - import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.shell.core.CommandResult; From 9e06a743f02efb0b9f8453db160fb5491e0b2ca4 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 8 Apr 2022 17:17:36 -0700 Subject: [PATCH 09/15] `LazyCachingPath` > `CachingPath`; --- .../common/fs/HoodieWrapperFileSystem.java | 4 ++-- ...{LazyCachingPath.java => CachingPath.java} | 23 ++++++++++--------- 2 files changed, 14 insertions(+), 13 deletions(-) rename hudi-common/src/main/java/org/apache/hudi/hadoop/{LazyCachingPath.java => CachingPath.java} (79%) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java index 57fa1f0b0b07..a79d1571afe7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java @@ -48,7 +48,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; -import org.apache.hudi.hadoop.LazyCachingPath; +import org.apache.hudi.hadoop.CachingPath; import java.io.IOException; import java.net.URI; @@ -142,7 +142,7 @@ public static Path convertPathWithScheme(Path oldPath, String newScheme) { try { newURI = new URI(newScheme, oldURI.getUserInfo(), oldURI.getHost(), oldURI.getPort(), oldURI.getPath(), oldURI.getQuery(), oldURI.getFragment()); - return new LazyCachingPath(newURI); + return new CachingPath(newURI); } catch (URISyntaxException e) { // TODO - Better Exception handling throw new 
RuntimeException(e); diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/LazyCachingPath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java similarity index 79% rename from hudi-common/src/main/java/org/apache/hudi/hadoop/LazyCachingPath.java rename to hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java index 9c2224b3ceb7..01b3eb9d409b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/hadoop/LazyCachingPath.java +++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/CachingPath.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path; import javax.annotation.concurrent.ThreadSafe; +import java.io.Serializable; import java.net.URI; /** @@ -31,35 +32,35 @@ * NOTE: This class is thread-safe */ @ThreadSafe -public class LazyCachingPath extends Path { +public class CachingPath extends Path implements Serializable { // NOTE: `volatile` keyword is redundant here and put mostly for reader notice, since all // reads/writes to references are always atomic (including 64-bit JVMs) // https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.7 private volatile String fileName; - private volatile String s; + private volatile String fullPathStr; - public LazyCachingPath(String parent, String child) { + public CachingPath(String parent, String child) { super(parent, child); } - public LazyCachingPath(Path parent, String child) { + public CachingPath(Path parent, String child) { super(parent, child); } - public LazyCachingPath(String parent, Path child) { + public CachingPath(String parent, Path child) { super(parent, child); } - public LazyCachingPath(Path parent, Path child) { + public CachingPath(Path parent, Path child) { super(parent, child); } - public LazyCachingPath(String pathString) throws IllegalArgumentException { + public CachingPath(String pathString) throws IllegalArgumentException { super(pathString); } - public LazyCachingPath(URI aUri) { + public CachingPath(URI aUri) { super(aUri); } @@ -77,9 +78,9 @@ 
public String getName() { public String toString() { // This value could be overwritten concurrently and that's okay, since // {@code Path} is immutable - if (s == null) { - s = super.toString(); + if (fullPathStr == null) { + fullPathStr = super.toString(); } - return s; + return fullPathStr; } } From f9ec5729c8114356c1d112b109af1170c735e8c5 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 8 Apr 2022 17:18:00 -0700 Subject: [PATCH 10/15] Added `SerializablePath`; Rebased `HoodieTableMetaClient` onto `SerializablePath` --- .../common/table/HoodieTableMetaClient.java | 31 +++++++-------- .../apache/hudi/hadoop/SerializablePath.java | 38 +++++++++++++++++++ 2 files changed, 51 insertions(+), 18 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index b8a098a978ef..9573ad57f388 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -49,7 +49,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hudi.hadoop.LazyCachingPath; +import org.apache.hudi.hadoop.CachingPath; +import org.apache.hudi.hadoop.SerializablePath; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -97,8 +98,8 @@ public class HoodieTableMetaClient implements Serializable { // NOTE: Since those two parameters lay on the hot-path of a lot of computations, we // use tailored extension of the {@code Path} class allowing to avoid repetitive // computations secured by its immutability - private transient LazyCachingPath basePath; - private transient LazyCachingPath metaPath; + private transient SerializablePath basePath; + private transient 
SerializablePath metaPath; private transient HoodieWrapperFileSystem fs; private boolean loadActiveTimelineOnLoad; @@ -118,10 +119,10 @@ private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadA this.consistencyGuardConfig = consistencyGuardConfig; this.fileSystemRetryConfig = fileSystemRetryConfig; this.hadoopConf = new SerializableConfiguration(conf); - this.basePath = new LazyCachingPath(basePath); - this.metaPath = new LazyCachingPath(basePath, METAFOLDER_NAME); + this.basePath = new SerializablePath(new CachingPath(basePath)); + this.metaPath = new SerializablePath(new CachingPath(basePath, METAFOLDER_NAME)); this.fs = getFs(); - TableNotFoundException.checkTableValidity(fs, this.basePath, metaPath); + TableNotFoundException.checkTableValidity(fs, this.basePath.get(), metaPath.get()); this.tableConfig = new HoodieTableConfig(fs, metaPath.toString(), payloadClassName); this.tableType = tableConfig.getTableType(); Option tableConfigVersion = tableConfig.getTimelineLayoutVersion(); @@ -166,25 +167,19 @@ public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) */ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); - String basePathStr = in.readUTF(); - String metaPathStr = in.readUTF(); fs = null; // will be lazily initialized - basePath = new LazyCachingPath(basePathStr); - metaPath = new LazyCachingPath(metaPathStr); } private void writeObject(java.io.ObjectOutputStream out) throws IOException { out.defaultWriteObject(); - out.writeBytes(basePath.toString()); - out.writeBytes(metaPath.toString()); } /** * Returns base path of the table */ public Path getBasePathV2() { - return basePath; + return basePath.get(); } /** @@ -193,7 +188,7 @@ public Path getBasePathV2() { */ @Deprecated public String getBasePath() { - return basePath.toString(); // this invocation is cached + return basePath.get().toString(); // this invocation is cached } /** @@ 
-207,14 +202,14 @@ public HoodieTableType getTableType() { * @return Meta path */ public String getMetaPath() { - return metaPath.toString(); // this invocation is cached + return metaPath.get().toString(); // this invocation is cached } /** * @return schema folder path */ public String getSchemaFolderName() { - return new Path(metaPath, SCHEMA_FOLDER_NAME).toString(); + return new Path(metaPath.get(), SCHEMA_FOLDER_NAME).toString(); } /** @@ -286,7 +281,7 @@ public TimelineLayoutVersion getTimelineLayoutVersion() { */ public HoodieWrapperFileSystem getFs() { if (fs == null) { - FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.newCopy()); + FileSystem fileSystem = FSUtils.getFs(metaPath.get(), hadoopConf.newCopy()); if (fileSystemRetryConfig.isFileSystemActionRetryEnable()) { fileSystem = new HoodieRetryWrapperFileSystem(fileSystem, @@ -557,7 +552,7 @@ public String getCommitActionType() { */ public List scanHoodieInstantsFromFileSystem(Set includedExtensions, boolean applyLayoutVersionFilters) throws IOException { - return scanHoodieInstantsFromFileSystem(metaPath, includedExtensions, applyLayoutVersionFilters); + return scanHoodieInstantsFromFileSystem(metaPath.get(), includedExtensions, applyLayoutVersionFilters); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java new file mode 100644 index 000000000000..7e26bd81562d --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java @@ -0,0 +1,38 @@ +package org.apache.hudi.hadoop; + +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +/** + * {@link Serializable} wrapper encapsulating {@link Path} + */ +public class SerializablePath implements Serializable { + + private Path path; + + public SerializablePath(Path path) { + this.path = path; + } + + 
public Path get() { + return path; + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(path.toString()); + } + + private void readObject(ObjectInputStream in) throws IOException { + String pathStr = in.readUTF(); + path = new CachingPath(pathStr); + } + + @Override + public String toString() { + return path.toString(); + } +} From 447e25246995a655e7132b8702b5fd46486e5a55 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 8 Apr 2022 17:20:33 -0700 Subject: [PATCH 11/15] Missing license --- .../apache/hudi/hadoop/SerializablePath.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java index 7e26bd81562d..e5b258067745 100644 --- a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java +++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hudi.hadoop; import org.apache.hadoop.fs.Path; From a6b6d88511769a6f78f8909d0556ab683e04b852 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 8 Apr 2022 18:04:20 -0700 Subject: [PATCH 12/15] Removing `transient` annotations --- .../org/apache/hudi/common/table/HoodieTableMetaClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 9573ad57f388..23623976d915 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -98,8 +98,8 @@ public class HoodieTableMetaClient implements Serializable { // NOTE: Since those two parameters lay on the hot-path of a lot of computations, we // use tailored extension of the {@code Path} class allowing to avoid repetitive // computations secured by its immutability - private transient SerializablePath basePath; - private transient SerializablePath metaPath; + private SerializablePath basePath; + private SerializablePath metaPath; private transient HoodieWrapperFileSystem fs; private boolean loadActiveTimelineOnLoad; From 83979a633593b684af7a7e9924c3f2cadf464897 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 8 Apr 2022 18:48:09 -0700 Subject: [PATCH 13/15] Implement missing `equals` --- .../org/apache/hudi/hadoop/SerializablePath.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java index e5b258067745..e6c3c6449751 100644 --- a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java +++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java @@ -19,11 +19,13 @@ package 
org.apache.hudi.hadoop; import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.table.HoodieTableMetaClient; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.util.Objects; /** * {@link Serializable} wrapper encapsulating {@link Path} @@ -53,4 +55,16 @@ private void readObject(ObjectInputStream in) throws IOException { public String toString() { return path.toString(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SerializablePath that = (SerializablePath) o; + return Objects.equals(path, that.path); + } } From d68663cb71c525d4dcdd7c32dfb1a166dfb5c51b Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 8 Apr 2022 19:03:32 -0700 Subject: [PATCH 14/15] `lint` --- .../src/main/java/org/apache/hudi/hadoop/SerializablePath.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java index e6c3c6449751..5ad2307ef804 100644 --- a/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java +++ b/hudi-common/src/main/java/org/apache/hudi/hadoop/SerializablePath.java @@ -19,7 +19,6 @@ package org.apache.hudi.hadoop; import org.apache.hadoop.fs.Path; -import org.apache.hudi.common.table.HoodieTableMetaClient; import java.io.IOException; import java.io.ObjectInputStream; From a87c363fbd49ab8ce17a5f1d77d92205b7f9ada4 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Sat, 9 Apr 2022 10:38:03 -0700 Subject: [PATCH 15/15] Lazy init builder stubs to defer initialization until actually used (to workaround CI issues of building Flink w/ incorrect version of Avro, breaking code-gen) --- .../hudi/metadata/HoodieMetadataPayload.java | 49 ++++++++++--------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index a070a2247ed9..c9bdc59da976 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -54,6 +54,7 @@ import org.apache.hudi.common.util.hash.PartitionIndexID; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.io.storage.HoodieHFileReader; +import org.apache.hudi.util.Lazy; import java.io.IOException; import java.math.BigDecimal; @@ -156,17 +157,17 @@ public class HoodieMetadataPayload implements HoodieRecordPayload METADATA_COLUMN_STATS_BUILDER_STUB = Lazy.lazily(HoodieMetadataColumnStats::newBuilder); + private static final Lazy STRING_WRAPPER_BUILDER_STUB = Lazy.lazily(StringWrapper::newBuilder); + private static final Lazy BYTES_WRAPPER_BUILDER_STUB = Lazy.lazily(BytesWrapper::newBuilder); + private static final Lazy DOUBLE_WRAPPER_BUILDER_STUB = Lazy.lazily(DoubleWrapper::newBuilder); + private static final Lazy FLOAT_WRAPPER_BUILDER_STUB = Lazy.lazily(FloatWrapper::newBuilder); + private static final Lazy LONG_WRAPPER_BUILDER_STUB = Lazy.lazily(LongWrapper::newBuilder); + private static final Lazy INT_WRAPPER_BUILDER_STUB = Lazy.lazily(IntWrapper::newBuilder); + private static final Lazy BOOLEAN_WRAPPER_BUILDER_STUB = Lazy.lazily(BooleanWrapper::newBuilder); + private static final Lazy TIMESTAMP_MICROS_WRAPPER_BUILDER_STUB = Lazy.lazily(TimestampMicrosWrapper::newBuilder); + private static final Lazy DECIMAL_WRAPPER_BUILDER_STUB = Lazy.lazily(DecimalWrapper::newBuilder); + private static final Lazy DATE_WRAPPER_BUILDER_STUB = Lazy.lazily(DateWrapper::newBuilder); private String key = null; private int type = 0; @@ -226,7 +227,7 @@ public HoodieMetadataPayload(Option recordOpt) { checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_COLUMN_STATS) 
== null, String.format("Valid %s record expected for type: %s", SCHEMA_FIELD_ID_COLUMN_STATS, METADATA_TYPE_COLUMN_STATS)); } else { - columnStatMetadata = HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB) + columnStatMetadata = HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get()) .setFileName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME)) .setColumnName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME)) .setMinValue(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE)) @@ -630,7 +631,7 @@ private static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataC .max(Comparator.naturalOrder()) .orElse(null); - return HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB) + return HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get()) .setFileName(newColumnStats.getFileName()) .setColumnName(newColumnStats.getColumnName()) .setMinValue(wrapStatisticValue(minValue)) @@ -678,11 +679,13 @@ private static Object wrapStatisticValue(Comparable statValue) { LocalDate localDate = statValue instanceof LocalDate ? 
(LocalDate) statValue : ((Date) statValue).toLocalDate(); - return DateWrapper.newBuilder(DATE_WRAPPER_BUILDER_STUB).setValue((int) localDate.toEpochDay()).build(); + return DateWrapper.newBuilder(DATE_WRAPPER_BUILDER_STUB.get()) + .setValue((int) localDate.toEpochDay()) + .build(); } else if (statValue instanceof BigDecimal) { Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema(); BigDecimal upcastDecimal = tryUpcastDecimal((BigDecimal) statValue, (LogicalTypes.Decimal) valueSchema.getLogicalType()); - return DecimalWrapper.newBuilder(DECIMAL_WRAPPER_BUILDER_STUB) + return DecimalWrapper.newBuilder(DECIMAL_WRAPPER_BUILDER_STUB.get()) .setValue(AVRO_DECIMAL_CONVERSION.toBytes(upcastDecimal, valueSchema, valueSchema.getLogicalType())) .build(); } else if (statValue instanceof Timestamp) { @@ -690,23 +693,23 @@ private static Object wrapStatisticValue(Comparable statValue) { // rely on logical types to do proper encoding of the native Java types, // and hereby have to encode statistic manually Instant instant = ((Timestamp) statValue).toInstant(); - return TimestampMicrosWrapper.newBuilder(TIMESTAMP_MICROS_WRAPPER_BUILDER_STUB) + return TimestampMicrosWrapper.newBuilder(TIMESTAMP_MICROS_WRAPPER_BUILDER_STUB.get()) .setValue(instantToMicros(instant)) .build(); } else if (statValue instanceof Boolean) { - return BooleanWrapper.newBuilder(BOOLEAN_WRAPPER_BUILDER_STUB).setValue((Boolean) statValue).build(); + return BooleanWrapper.newBuilder(BOOLEAN_WRAPPER_BUILDER_STUB.get()).setValue((Boolean) statValue).build(); } else if (statValue instanceof Integer) { - return IntWrapper.newBuilder(INT_WRAPPER_BUILDER_STUB).setValue((Integer) statValue).build(); + return IntWrapper.newBuilder(INT_WRAPPER_BUILDER_STUB.get()).setValue((Integer) statValue).build(); } else if (statValue instanceof Long) { - return LongWrapper.newBuilder(LONG_WRAPPER_BUILDER_STUB).setValue((Long) statValue).build(); + return 
LongWrapper.newBuilder(LONG_WRAPPER_BUILDER_STUB.get()).setValue((Long) statValue).build(); } else if (statValue instanceof Float) { - return FloatWrapper.newBuilder(FLOAT_WRAPPER_BUILDER_STUB).setValue((Float) statValue).build(); + return FloatWrapper.newBuilder(FLOAT_WRAPPER_BUILDER_STUB.get()).setValue((Float) statValue).build(); } else if (statValue instanceof Double) { - return DoubleWrapper.newBuilder(DOUBLE_WRAPPER_BUILDER_STUB).setValue((Double) statValue).build(); + return DoubleWrapper.newBuilder(DOUBLE_WRAPPER_BUILDER_STUB.get()).setValue((Double) statValue).build(); } else if (statValue instanceof ByteBuffer) { - return BytesWrapper.newBuilder(BYTES_WRAPPER_BUILDER_STUB).setValue((ByteBuffer) statValue).build(); + return BytesWrapper.newBuilder(BYTES_WRAPPER_BUILDER_STUB.get()).setValue((ByteBuffer) statValue).build(); } else if (statValue instanceof String || statValue instanceof Utf8) { - return StringWrapper.newBuilder(STRING_WRAPPER_BUILDER_STUB).setValue(statValue.toString()).build(); + return StringWrapper.newBuilder(STRING_WRAPPER_BUILDER_STUB.get()).setValue(statValue.toString()).build(); } else { throw new UnsupportedOperationException(String.format("Unsupported type of the statistic (%s)", statValue.getClass())); }