Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.hudi.common.fs.inline;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.ValidationUtils;

import java.io.File;

/**
* Utils to parse InLineFileSystem paths.
Expand All @@ -29,46 +32,58 @@
public class InLineFSUtils {
private static final String START_OFFSET_STR = "start_offset";
private static final String LENGTH_STR = "length";
private static final String PATH_SEPARATOR = "/";
private static final String SCHEME_SEPARATOR = ":";
private static final String EQUALS_STR = "=";
private static final String LOCAL_FILESYSTEM_SCHEME = "file";

/**
* Fetch inline file path from outer path.
* Eg
* Input:
* Path = s3a://file1, origScheme: file, startOffset = 20, length = 40
* Output: "inlinefs:/file1/s3a/?start_offset=20&length=40"
* Get the InlineFS Path for a given schema and its Path.
* <p>
* Examples:
* Input Path: s3a://file1, origScheme: file, startOffset = 20, length = 40
* Output: "inlinefs://file1/s3a/?start_offset=20&length=40"
*
* @param outerPath
* @param origScheme
* @param inLineStartOffset
* @param inLineLength
* @return
* @param outerPath The outer file Path
* @param origScheme The file schema
* @param inLineStartOffset Start offset for the inline file
* @param inLineLength Length for the inline file
* @return InlineFS Path for the requested outer path and schema
*/
public static Path getInlineFilePath(Path outerPath, String origScheme, long inLineStartOffset, long inLineLength) {
String subPath = outerPath.toString().substring(outerPath.toString().indexOf(":") + 1);
final String subPath = new File(outerPath.toString().substring(outerPath.toString().indexOf(":") + 1)).getPath();
return new Path(
InLineFileSystem.SCHEME + "://" + subPath + "/" + origScheme
+ "/" + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset
InLineFileSystem.SCHEME + SCHEME_SEPARATOR + PATH_SEPARATOR + subPath + PATH_SEPARATOR + origScheme
+ PATH_SEPARATOR + "?" + START_OFFSET_STR + EQUALS_STR + inLineStartOffset
+ "&" + LENGTH_STR + EQUALS_STR + inLineLength
);
}

/**
* Inline file format
* "inlinefs://<path_to_outer_file>/<outer_file_scheme>/?start_offset=start_offset>&length=<length>"
* Outer File format
* "<outer_file_scheme>://<path_to_outer_file>"
* InlineFS Path format:
* "inlinefs://path/to/outer/file/outer_file_schema/?start_offset=start_offset>&length=<length>"
* <p>
* Eg input : "inlinefs://file1/sa3/?start_offset=20&length=40".
* Output : "sa3://file1"
* Outer File Path format:
* "outer_file_schema://path/to/outer/file"
* <p>
* Example
* Input: "inlinefs://file1/s3a/?start_offset=20&length=40".
* Output: "s3a://file1"
*
* @param inlinePath inline file system path
* @return
* @param inlineFSPath InLineFS Path to get the outer file Path
* @return Outer file Path from the InLineFS Path
*/
public static Path getOuterfilePathFromInlinePath(Path inlinePath) {
String scheme = inlinePath.getParent().getName();
Path basePath = inlinePath.getParent().getParent();
return new Path(basePath.toString().replaceFirst(InLineFileSystem.SCHEME, scheme));
public static Path getOuterFilePathFromInlinePath(Path inlineFSPath) {
final String scheme = inlineFSPath.getParent().getName();
final Path basePath = inlineFSPath.getParent().getParent();
ValidationUtils.checkArgument(basePath.toString().contains(SCHEME_SEPARATOR),
"Invalid InLineFSPath: " + inlineFSPath);

final String pathExceptScheme = basePath.toString().substring(basePath.toString().indexOf(SCHEME_SEPARATOR) + 1);
final String fullPath = scheme + SCHEME_SEPARATOR
+ (scheme.equals(LOCAL_FILESYSTEM_SCHEME) ? PATH_SEPARATOR : "")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does this work for hdfs scheme? does n't that expect a / , just like file (LOCAL_FILESYSTEM_SCHEME)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if I am not wrong, pathExceptScheme has the slashes. even I had the same doubt when I reviewed the patch and had to clarify.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vinothchandar
Right, hdfs/s3a/s3 and all other non-local fs needs :// as scheme separator. Where as the local fs needs :/ or :///. Unit test was added to verify the expected inlinefs forward conversion and backward conversions - https://github.com/apache/hudi/pull/3977/files#diff-5b10f493d7ba18b2f58b6c2fe4e544413d47ceb0d24a980ec07b9b1b14dca167R312

+ pathExceptScheme;
return new Path(fullPath);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public String getScheme() {

@Override
public FSDataInputStream open(Path inlinePath, int bufferSize) throws IOException {
Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
Path outerPath = InLineFSUtils.getOuterFilePathFromInlinePath(inlinePath);
FileSystem outerFs = outerPath.getFileSystem(conf);
FSDataInputStream outerStream = outerFs.open(outerPath, bufferSize);
return new InLineFsDataInputStream(InLineFSUtils.startOffset(inlinePath), outerStream, InLineFSUtils.length(inlinePath));
Expand All @@ -80,7 +80,7 @@ public boolean exists(Path f) {

@Override
public FileStatus getFileStatus(Path inlinePath) throws IOException {
Path outerPath = InLineFSUtils.getOuterfilePathFromInlinePath(inlinePath);
Path outerPath = InLineFSUtils.getOuterFilePathFromInlinePath(inlinePath);
FileSystem outerFs = outerPath.getFileSystem(conf);
FileStatus status = outerFs.getFileStatus(outerPath);
FileStatus toReturn = new FileStatus(InLineFSUtils.length(inlinePath), status.isDirectory(), status.getReplication(), status.getBlockSize(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private final Schema readerSchema;
private final boolean readBlocksLazily;
private final boolean reverseLogReader;
private final boolean enableInLineReading;
private int bufferSize;

private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
Expand All @@ -62,6 +63,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
this.reverseLogReader = reverseLogReader;
this.bufferSize = bufferSize;
this.prevReadersInOpenState = new ArrayList<>();
this.enableInLineReading = enableInlineReading;
if (logFiles.size() > 0) {
HoodieLogFile nextLogFile = logFiles.remove(0);
this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading);
Expand Down Expand Up @@ -104,7 +106,8 @@ public boolean hasNext() {
this.prevReadersInOpenState.add(currentReader);
}
this.currentReader =
new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false,
this.enableInLineReading);
} catch (IOException io) {
throw new HoodieIOException("unable to initialize read with log file ", io);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,64 @@ public void testsetWorkingDirectory() throws IOException {
}, "Should have thrown exception");
}

static class TestFSPath {
final Path inputPath;
final Path expectedInLineFSPath;
final Path transformedInputPath;

TestFSPath(final Path inputPath, final Path expectedInLineFSPath, final Path transformedInputPath) {
this.inputPath = inputPath;
this.expectedInLineFSPath = expectedInLineFSPath;
this.transformedInputPath = transformedInputPath;
}
}

@Test
public void testInLineFSPathConversions() {
final List<TestFSPath> expectedInLinePaths = Arrays.asList(
new TestFSPath(
new Path("/zero/524bae7e-f01d-47ae-b7cd-910400a81336"),
new Path("inlinefs://zero/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"),
new Path("file:/zero/524bae7e-f01d-47ae-b7cd-910400a81336")),
new TestFSPath(
new Path("file:/one/524bae7e-f01d-47ae-b7cd-910400a81336"),
new Path("inlinefs://one/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"),
new Path("file:/one/524bae7e-f01d-47ae-b7cd-910400a81336")),
new TestFSPath(
new Path("file://two/524bae7e-f01d-47ae-b7cd-910400a81336"),
new Path("inlinefs://two/524bae7e-f01d-47ae-b7cd-910400a81336/file/?start_offset=10&length=10"),
new Path("file:/two/524bae7e-f01d-47ae-b7cd-910400a81336")),
new TestFSPath(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vinothchandar : here are examples for hdfs path conversion to inline paths.

new Path("hdfs://three/524bae7e-f01d-47ae-b7cd-910400a81336"),
new Path("inlinefs://three/524bae7e-f01d-47ae-b7cd-910400a81336/hdfs/?start_offset=10&length=10"),
new Path("hdfs://three/524bae7e-f01d-47ae-b7cd-910400a81336")),
new TestFSPath(
new Path("s3://four/524bae7e-f01d-47ae-b7cd-910400a81336"),
new Path("inlinefs://four/524bae7e-f01d-47ae-b7cd-910400a81336/s3/?start_offset=10&length=10"),
new Path("s3://four/524bae7e-f01d-47ae-b7cd-910400a81336")),
new TestFSPath(
new Path("s3a://five/524bae7e-f01d-47ae-b7cd-910400a81336"),
new Path("inlinefs://five/524bae7e-f01d-47ae-b7cd-910400a81336/s3a/?start_offset=10&length=10"),
new Path("s3a://five/524bae7e-f01d-47ae-b7cd-910400a81336"))
);

for (TestFSPath entry : expectedInLinePaths) {
final Path inputPath = entry.inputPath;
final Path expectedInLineFSPath = entry.expectedInLineFSPath;
final Path expectedTransformedInputPath = entry.transformedInputPath;

String scheme = "file";
if (inputPath.toString().contains(":")) {
scheme = inputPath.toString().split(":")[0];
}
final Path actualInLineFSPath = InLineFSUtils.getInlineFilePath(inputPath, scheme, 10, 10);
assertEquals(expectedInLineFSPath, actualInLineFSPath);

final Path actualOuterFilePath = InLineFSUtils.getOuterFilePathFromInlinePath(actualInLineFSPath);
assertEquals(expectedTransformedInputPath, actualOuterFilePath);
}
}

@Test
public void testExists() throws IOException {
Path inlinePath = getRandomInlinePath();
Expand Down