@@ -98,7 +98,7 @@ private static int deduplicatePartitionPath(JavaSparkContext jsc,
      String basePath)
      throws Exception {
    DedupeSparkJob job = new DedupeSparkJob(basePath,
-        duplicatedPartitionPath,repairedOutputPath,new SQLContext(jsc), FSUtils.getFs());
+        duplicatedPartitionPath,repairedOutputPath,new SQLContext(jsc), FSUtils.getFs(basePath));
    job.fixDuplicates(true);
    return 0;
  }
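Every call site in this diff swaps the no-arg FSUtils.getFs() for a path-aware overload, so the FileSystem is resolved from the table's base path rather than from the cluster's fs.defaultFS. The overload itself sits outside the hunks shown here; below is a minimal sketch of what it presumably looks like (the class name and method bodies are illustrative, not the PR's actual code):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import com.uber.hoodie.exception.HoodieIOException;

    public class FSUtilsSketch {

      // Resolve the FileSystem from the path's own scheme (hdfs://, s3a://,
      // file://, ...) instead of always returning the default filesystem.
      public static FileSystem getFs(String path) {
        return getFs(path, new Configuration());
      }

      public static FileSystem getFs(String path, Configuration conf) {
        try {
          // Path.getFileSystem() picks the implementation registered for the
          // path's scheme, falling back to fs.defaultFS for scheme-less paths.
          return new Path(path).getFileSystem(conf);
        } catch (IOException e) {
          throw new HoodieIOException("Failed to get FileSystem for path " + path, e);
        }
      }
    }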
@@ -87,7 +87,7 @@ public class HoodieReadClient implements Serializable {
   */
  public HoodieReadClient(JavaSparkContext jsc, String basePath) {
    this.jsc = jsc;
-    this.fs = FSUtils.getFs();
+    this.fs = FSUtils.getFs(basePath);
    // Create a Hoodie table which encapsulated the commits and files visible
    this.hoodieTable = HoodieTable
        .getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null);
@@ -116,9 +116,9 @@ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) t
   * @param rollbackInFlight
   */
  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackInFlight) {
-    this.fs = FSUtils.getFs();
    this.jsc = jsc;
    this.config = clientConfig;
+    this.fs = FSUtils.getFs(config.getBasePath());
    this.index = HoodieIndex.createIndex(config, jsc);
    this.metrics = new HoodieMetrics(config, config.getTableName());
    this.archiveLog = new HoodieCommitArchiveLog(clientConfig, fs);
@@ -654,7 +654,7 @@ private void rollback(List<String> commits) {
        .map((Function<String, HoodieRollbackStat>) partitionPath -> {
          // Scan all partitions files with this commit time
          logger.info("Cleaning path " + partitionPath);
-          FileSystem fs1 = FSUtils.getFs();
+          FileSystem fs1 = FSUtils.getFs(config.getBasePath());
          FileStatus[] toBeDeleted =
              fs1.listStatus(new Path(config.getBasePath(), partitionPath), path -> {
                if(!path.toString().contains(".parquet")) {
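Two details in HoodieWriteClient are easy to miss. First, the fs assignment moves below the config assignment because it now reads config.getBasePath(). Second, in rollback() the FileSystem is re-resolved inside the map closure: Hadoop FileSystem instances are not serializable, so each Spark task must build its own handle from the serializable state it captures. A hedged sketch of that pattern (variable names illustrative):

    // Capture only serializable state; resolve the non-serializable
    // FileSystem on the executor, once per task.
    final String basePath = config.getBasePath();
    jsc.parallelize(partitionPaths, partitionPaths.size())
        .foreach(partitionPath -> {
          FileSystem taskFs = FSUtils.getFs(basePath); // fresh handle per task
          for (FileStatus status : taskFs.listStatus(new Path(basePath, partitionPath))) {
            // ... keep only files written by the rolled-back commits, then delete ...
          }
        });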
@@ -47,7 +47,7 @@ public HoodieIOHandle(HoodieWriteConfig config, String commitTime,
                        HoodieTable<T> hoodieTable) {
    this.commitTime = commitTime;
    this.config = config;
-    this.fs = FSUtils.getFs();
+    this.fs = FSUtils.getFs(config.getBasePath());
    this.hoodieTable = hoodieTable;
    this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline();
    this.fileSystemView = hoodieTable.getROFileSystemView();
@@ -74,7 +74,7 @@ public static void cleanupTmpFilesFromCurrentCommit(HoodieWriteConfig config,
                                                      String commitTime,
                                                      String partitionPath,
                                                      int taskPartitionId) {
-    FileSystem fs = FSUtils.getFs();
+    FileSystem fs = FSUtils.getFs(config.getBasePath());
    try {
      FileStatus[] prevFailedFiles = fs.globStatus(new Path(String
          .format("%s/%s/%s", config.getBasePath(), partitionPath,
@@ -136,7 +136,7 @@ private boolean isCompactionSucceeded(HoodieCompactionMetadata result) {
  private List<CompactionWriteStat> executeCompaction(HoodieTableMetaClient metaClient,
      HoodieWriteConfig config, CompactionOperation operation, String commitTime)
      throws IOException {
-    FileSystem fs = FSUtils.getFs();
+    FileSystem fs = FSUtils.getFs(config.getBasePath());
    Schema readerSchema =
        HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));

@@ -19,6 +19,7 @@
import com.uber.hoodie.avro.HoodieAvroWriteSupport;
import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.HoodieAvroUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -52,10 +53,9 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe
  private final String commitTime;
  private final Schema schema;

-
-  private static Configuration registerFileSystem(Configuration conf) {
+  private static Configuration registerFileSystem(Path file, Configuration conf) {
    Configuration returnConf = new Configuration(conf);
-    String scheme = FileSystem.getDefaultUri(conf).getScheme();
+    String scheme = FSUtils.getFs(file.toString(), conf).getScheme();
    returnConf.set("fs." + HoodieWrapperFileSystem.getHoodieScheme(scheme) + ".impl",
        HoodieWrapperFileSystem.class.getName());
    return returnConf;
@@ -69,11 +69,11 @@ public HoodieParquetWriter(String commitTime, Path file,
        parquetConfig.getPageSize(), parquetConfig.getPageSize(),
        ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
        ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, ParquetWriter.DEFAULT_WRITER_VERSION,
-        registerFileSystem(parquetConfig.getHadoopConf()));
+        registerFileSystem(file, parquetConfig.getHadoopConf()));
    this.file =
        HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf());
    this.fs = (HoodieWrapperFileSystem) this.file
-        .getFileSystem(registerFileSystem(parquetConfig.getHadoopConf()));
+        .getFileSystem(registerFileSystem(file, parquetConfig.getHadoopConf()));
    // We cannot accurately measure the snappy compressed output file size. We are choosing a conservative 10%
    // TODO - compute this compression ratio dynamically by looking at the bytes written to the stream and the actual file size reported by HDFS
    this.maxFileSize = parquetConfig.getMaxFileSize() + Math
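The registerFileSystem change in HoodieParquetWriter matters whenever the target file is not on the default filesystem: the wrapper scheme is now derived from the file's own scheme, so the correct fs.hoodie-<scheme>.impl key gets registered. An illustration, assuming getHoodieScheme simply prefixes the scheme with "hoodie-" (bucket and file names are made up):

    Path file = new Path("s3a://bucket/warehouse/trips/2017/03/15/f1.parquet");
    Configuration conf = new Configuration();

    // Old: the scheme always came from fs.defaultFS (say "hdfs"), so only
    // fs.hoodie-hdfs.impl was registered even when writing to S3.
    // New: the scheme follows the file being written.
    String scheme = FSUtils.getFs(file.toString(), conf).getScheme();       // "s3a"
    conf.set("fs." + HoodieWrapperFileSystem.getHoodieScheme(scheme) + ".impl",
        HoodieWrapperFileSystem.class.getName());                           // fs.hoodie-s3a.impl

    Path wrapped = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
    // wrapped (assumed): hoodie-s3a://bucket/warehouse/trips/2017/03/15/f1.parquet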
@@ -32,6 +32,7 @@
import java.io.IOException;

public class HoodieStorageWriterFactory {
+
  public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> getStorageWriter(
      String commitTime, Path path, HoodieTable<T> hoodieTable, HoodieWriteConfig config, Schema schema)
      throws IOException {
@@ -50,7 +51,7 @@ private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieSt
    HoodieParquetConfig parquetConfig =
        new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP,
            config.getParquetBlockSize(), config.getParquetPageSize(),
-            config.getParquetMaxFileSize(), FSUtils.getFs().getConf());
+            config.getParquetMaxFileSize(), FSUtils.getFs(config.getBasePath()).getConf());

    return new HoodieParquetWriter<>(commitTime, path, parquetConfig, schema);
  }
@@ -16,6 +16,8 @@

package com.uber.hoodie.io.storage;

+import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.exception.HoodieIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.AclEntry;
@@ -67,7 +69,12 @@ public class HoodieWrapperFileSystem extends FileSystem {

  @Override public void initialize(URI uri, Configuration conf) throws IOException {
    // Get the default filesystem to decorate
-    fileSystem = FileSystem.get(conf);
+    Path path = new Path(uri);
+    // Remove 'hoodie-' prefix from path
+    if (path.toString().startsWith(HOODIE_SCHEME_PREFIX)) {
+      path = new Path(path.toString().replace(HOODIE_SCHEME_PREFIX, ""));
+    }
+    this.fileSystem = FSUtils.getFs(path.toString(), conf);
    // Do not need to explicitly initialize the default filesystem, its done already in the above FileSystem.get
    // fileSystem.initialize(FileSystem.getDefaultUri(conf), conf);
    // fileSystem.setConf(conf);
@@ -632,8 +639,12 @@ public Path convertToHoodiePath(Path oldPath) {
  }

  public static Path convertToHoodiePath(Path file, Configuration conf) {
-    String scheme = FileSystem.getDefaultUri(conf).getScheme();
-    return convertPathWithScheme(file, getHoodieScheme(scheme));
+    try {
+      String scheme = FSUtils.getFs(file.toString(), conf).getScheme();
+      return convertPathWithScheme(file, getHoodieScheme(scheme));
+    } catch (HoodieIOException e) {
+      throw e;
+    }
}

private Path convertToDefaultPath(Path oldPath) {
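With initialize() stripping the hoodie- prefix before resolving the underlying filesystem, the wrapper can now decorate any scheme, not just the default one. A sketch of the assumed round trip (URI values illustrative):

    // A caller opens a wrapped path; the scheme names both the wrapper and,
    // once the prefix is removed, the real filesystem underneath.
    URI uri = URI.create("hoodie-s3a://bucket/warehouse/trips");
    Path path = new Path(uri);

    // initialize() unwraps "hoodie-s3a://..." back to "s3a://..." ...
    String unwrapped = path.toString().replace("hoodie-", ""); // HOODIE_SCHEME_PREFIX
    // ... and delegates to the scheme-appropriate filesystem (here S3A),
    // which the wrapper then decorates.
    FileSystem underlying = FSUtils.getFs(unwrapped, new Configuration());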
@@ -421,7 +421,7 @@ public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileLo
      throw new HoodieUpsertException("Error in finding the old file path at commit " +
          commitTime +" at fileLoc: " + fileLoc);
    } else {
-      Configuration conf = FSUtils.getFs().getConf();
+      Configuration conf = FSUtils.getFs(config.getBasePath()).getConf();
      AvroReadSupport.setAvroReadSchema(conf, upsertHandle.getSchema());
      ParquetReader<IndexedRecord> reader =
          AvroParquetReader.builder(upsertHandle.getOldFilePath()).withConf(conf).build();
hoodie-client/src/test/java/HoodieClientExample.java (1 addition, 1 deletion)
@@ -82,7 +82,7 @@ public void run() throws Exception {

    // initialize the table, if not done already
    Path path = new Path(tablePath);
-    FileSystem fs = FSUtils.getFs();
+    FileSystem fs = FSUtils.getFs(tablePath);
    if (!fs.exists(path)) {
      HoodieTableMetaClient.initTableType(fs, tablePath, HoodieTableType.valueOf(tableType), tableName);
    }
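Taken together, the example now runs against whatever filesystem tablePath points at. A minimal usage sketch under the same assumptions (the local path, table name, and table type are illustrative):

    String tablePath = "file:///tmp/hoodie/sample-table"; // hdfs:// or s3a:// work the same way
    String tableName = "sample-table";

    FileSystem fs = FSUtils.getFs(tablePath); // LocalFileSystem here, regardless of fs.defaultFS
    Path path = new Path(tablePath);
    if (!fs.exists(path)) {
      HoodieTableMetaClient.initTableType(fs, tablePath,
          HoodieTableType.COPY_ON_WRITE, tableName);
    }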