Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
Expand All @@ -37,7 +36,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.testutils.HoodieClientTestBase;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -61,21 +59,18 @@
*/
public class ITTestClusteringCommand extends AbstractShellIntegrationTest {

private String tablePath;
private String tableName;

@BeforeEach
public void init() throws IOException {
tableName = "test_table_" + ITTestClusteringCommand.class.getName();
tablePath = Paths.get(basePath, tableName).toString();
basePath = Paths.get(basePath, tableName).toString();

HoodieCLI.conf = jsc.hadoopConfiguration();
// Create table and connect
new TableCommand().createTable(
tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
basePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
metaClient.setBasePath(tablePath);
metaClient = HoodieTableMetaClient.reload(metaClient);

initMetaClient();
}

/**
Expand Down Expand Up @@ -168,7 +163,7 @@ private void generateCommits() throws IOException {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();

// Create the write client to write some records in
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withDeleteParallelism(2).forTable(tableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ public class ITTestCommitsCommand extends AbstractShellIntegrationTest {

@BeforeEach
public void init() throws IOException {
String tableName = "test_table_" + ITTestCommitsCommand.class.getName();
String tablePath = Paths.get(basePath, tableName).toString();
tableName = "test_table_" + ITTestCommitsCommand.class.getName();
basePath = Paths.get(basePath, tableName).toString();

HoodieCLI.conf = jsc.hadoopConfiguration();
// Create table and connect
new TableCommand().createTable(
tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
basePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
metaClient.setBasePath(tablePath);
metaClient = HoodieTableMetaClient.reload(metaClient);

initMetaClient();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
Expand All @@ -48,7 +47,6 @@
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;
Expand All @@ -73,21 +71,18 @@
*/
public class ITTestCompactionCommand extends AbstractShellIntegrationTest {

private String tablePath;
private String tableName;

@BeforeEach
public void init() throws IOException {
tableName = "test_table_" + ITTestCompactionCommand.class.getName();
tablePath = Paths.get(basePath, tableName).toString();
basePath = Paths.get(basePath, tableName).toString();

HoodieCLI.conf = jsc.hadoopConfiguration();
// Create table and connect
new TableCommand().createTable(
tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
basePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
metaClient.setBasePath(tablePath);
metaClient = HoodieTableMetaClient.reload(metaClient);

initMetaClient();
}

/**
Expand Down Expand Up @@ -298,7 +293,7 @@ private void generateCommits() throws IOException {
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();

// Create the write client to write some records in
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withDeleteParallelism(2).forTable(tableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
Expand Down
24 changes: 16 additions & 8 deletions hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,26 @@ public static Configuration prepareHadoopConf(Configuration conf) {
return conf;
}

public static FileSystem getFs(String path, Configuration conf) {
public static FileSystem getFs(String pathStr, Configuration conf) {
return getFs(new Path(pathStr), conf);
}

public static FileSystem getFs(Path path, Configuration conf) {
FileSystem fs;
prepareHadoopConf(conf);
try {
fs = new Path(path).getFileSystem(conf);
fs = path.getFileSystem(conf);
} catch (IOException e) {
throw new HoodieIOException("Failed to get instance of " + FileSystem.class.getName(), e);
}
return fs;
}

public static FileSystem getFs(String path, Configuration conf, boolean localByDefault) {
public static FileSystem getFs(String pathStr, Configuration conf, boolean localByDefault) {
if (localByDefault) {
return getFs(addSchemeIfLocalPath(path).toString(), conf);
return getFs(addSchemeIfLocalPath(pathStr), conf);
}
return getFs(path, conf);
return getFs(pathStr, conf);
}

/**
Expand Down Expand Up @@ -178,7 +182,7 @@ public static String getCommitFromCommitFile(String commitFileName) {
}

public static String getCommitTime(String fullFileName) {
if (isLogFile(new Path(fullFileName))) {
if (isLogFile(fullFileName)) {
return fullFileName.split("_")[1].split("\\.")[0];
}
return fullFileName.split("_")[2].split("\\.")[0];
Expand Down Expand Up @@ -461,8 +465,12 @@ public static boolean isBaseFile(Path path) {
}

public static boolean isLogFile(Path logPath) {
Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName());
return matcher.find() && logPath.getName().contains(".log");
return isLogFile(logPath.getName());
}

public static boolean isLogFile(String fileName) {
Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
return matcher.find() && fileName.contains(".log");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import org.apache.hudi.hadoop.FileNameCachingPath;
import org.apache.hudi.hadoop.CachingPath;

import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -142,7 +142,7 @@ public static Path convertPathWithScheme(Path oldPath, String newScheme) {
try {
newURI = new URI(newScheme, oldURI.getUserInfo(), oldURI.getHost(), oldURI.getPort(), oldURI.getPath(),
oldURI.getQuery(), oldURI.getFragment());
return new FileNameCachingPath(newURI);
return new CachingPath(newURI);
} catch (URISyntaxException e) {
// TODO - Better Exception handling
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,43 @@
public class BaseFile implements Serializable {

private static final long serialVersionUID = 1L;

private transient FileStatus fileStatus;
private final String fullPath;
private final String fileName;
private long fileLen;

public BaseFile(BaseFile dataFile) {
this.fileStatus = dataFile.fileStatus;
this.fullPath = dataFile.fullPath;
this.fileLen = dataFile.fileLen;
this(dataFile.fileStatus,
dataFile.fullPath,
dataFile.getFileName(),
dataFile.getFileLen());
}

public BaseFile(FileStatus fileStatus) {
this.fileStatus = fileStatus;
this.fullPath = fileStatus.getPath().toString();
this.fileLen = fileStatus.getLen();
this(fileStatus,
fileStatus.getPath().toString(),
fileStatus.getPath().getName(),
fileStatus.getLen());
}

public BaseFile(String filePath) {
this.fileStatus = null;
this.fullPath = filePath;
this.fileLen = -1;
this(null, filePath, getFileName(filePath), -1);
}

private BaseFile(FileStatus fileStatus, String fullPath, String fileName, long fileLen) {
this.fileStatus = fileStatus;
this.fullPath = fullPath;
this.fileLen = fileLen;
this.fileName = fileName;
}

public String getPath() {
return fullPath;
}

public String getFileName() {
return new Path(fullPath).getName();
return fileName;
}

public FileStatus getFileStatus() {
Expand Down Expand Up @@ -98,4 +107,8 @@ public int hashCode() {
public String toString() {
return "BaseFile{fullPath=" + fullPath + ", fileLen=" + fileLen + '}';
}

private static String getFileName(String fullPath) {
return new Path(fullPath).getName();
}
}
Loading