diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java index 1564b87b2bec8..7513f876e193a 100644 --- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java +++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SparkMain.java @@ -98,7 +98,7 @@ private static int deduplicatePartitionPath(JavaSparkContext jsc, String basePath) throws Exception { DedupeSparkJob job = new DedupeSparkJob(basePath, - duplicatedPartitionPath,repairedOutputPath,new SQLContext(jsc), FSUtils.getFs()); + duplicatedPartitionPath,repairedOutputPath,new SQLContext(jsc), FSUtils.getFs(basePath)); job.fixDuplicates(true); return 0; } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java index b57bd3285207e..84b1e43016fb8 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java @@ -87,7 +87,7 @@ public class HoodieReadClient implements Serializable { */ public HoodieReadClient(JavaSparkContext jsc, String basePath) { this.jsc = jsc; - this.fs = FSUtils.getFs(); + this.fs = FSUtils.getFs(basePath); // Create a Hoodie table which encapsulated the commits and files visible this.hoodieTable = HoodieTable .getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index c24227d399482..cc8e4b548cb20 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -116,9 +116,9 @@ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) t * @param rollbackInFlight */ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackInFlight) { - this.fs = FSUtils.getFs(); this.jsc = jsc; this.config = clientConfig; + this.fs = FSUtils.getFs(config.getBasePath()); this.index = HoodieIndex.createIndex(config, jsc); this.metrics = new HoodieMetrics(config, config.getTableName()); this.archiveLog = new HoodieCommitArchiveLog(clientConfig, fs); @@ -654,7 +654,7 @@ private void rollback(List commits) { .map((Function) partitionPath -> { // Scan all partitions files with this commit time logger.info("Cleaning path " + partitionPath); - FileSystem fs1 = FSUtils.getFs(); + FileSystem fs1 = FSUtils.getFs(config.getBasePath()); FileStatus[] toBeDeleted = fs1.listStatus(new Path(config.getBasePath(), partitionPath), path -> { if(!path.toString().contains(".parquet")) { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java index 8ba4068ad071d..428c08bc0cbfa 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java @@ -47,7 +47,7 @@ public HoodieIOHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable) { this.commitTime = commitTime; this.config = config; - this.fs = FSUtils.getFs(); + this.fs = FSUtils.getFs(config.getBasePath()); this.hoodieTable = hoodieTable; this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline(); this.fileSystemView = hoodieTable.getROFileSystemView(); @@ -74,7 +74,7 @@ public static void 
cleanupTmpFilesFromCurrentCommit(HoodieWriteConfig config, String commitTime, String partitionPath, int taskPartitionId) { - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(config.getBasePath()); try { FileStatus[] prevFailedFiles = fs.globStatus(new Path(String .format("%s/%s/%s", config.getBasePath(), partitionPath, diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index 003f8e69981b7..ac507b2caf002 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -136,7 +136,7 @@ private boolean isCompactionSucceeded(HoodieCompactionMetadata result) { private List executeCompaction(HoodieTableMetaClient metaClient, HoodieWriteConfig config, CompactionOperation operation, String commitTime) throws IOException { - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(config.getBasePath()); Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema())); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java index ca3ed57091bcc..23b58bea5d0e9 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java @@ -19,6 +19,7 @@ import com.uber.hoodie.avro.HoodieAvroWriteSupport; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -52,10 +53,9 @@ public class HoodieParquetWriter HoodieStorageWriter getStorageWriter( String commitTime, Path path, HoodieTable hoodieTable, HoodieWriteConfig config, Schema schema) throws IOException { @@ -50,7 +51,7 @@ private static HoodieSt HoodieParquetConfig parquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, config.getParquetBlockSize(), config.getParquetPageSize(), - config.getParquetMaxFileSize(), FSUtils.getFs().getConf()); + config.getParquetMaxFileSize(), FSUtils.getFs(config.getBasePath()).getConf()); return new HoodieParquetWriter<>(commitTime, path, parquetConfig, schema); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java index 7cd2b8377e9a8..f76f4f73cdaf9 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java @@ -16,6 +16,8 @@ package com.uber.hoodie.io.storage; +import com.uber.hoodie.common.util.FSUtils; +import com.uber.hoodie.exception.HoodieIOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.AclEntry; @@ -67,7 +69,12 @@ public class HoodieWrapperFileSystem extends FileSystem { @Override public void initialize(URI uri, Configuration conf) throws IOException { // Get the default filesystem to decorate - fileSystem = FileSystem.get(conf); + Path path = new Path(uri); + // Remove 'hoodie-' 
prefix from path + if (path.toString().startsWith(HOODIE_SCHEME_PREFIX)) { + path = new Path(path.toString().replace(HOODIE_SCHEME_PREFIX, "")); + } + this.fileSystem = FSUtils.getFs(path.toString(), conf); // Do not need to explicitly initialize the default filesystem, its done already in the above FileSystem.get // fileSystem.initialize(FileSystem.getDefaultUri(conf), conf); // fileSystem.setConf(conf); @@ -632,8 +639,12 @@ public Path convertToHoodiePath(Path oldPath) { } public static Path convertToHoodiePath(Path file, Configuration conf) { - String scheme = FileSystem.getDefaultUri(conf).getScheme(); - return convertPathWithScheme(file, getHoodieScheme(scheme)); + try { + String scheme = FSUtils.getFs(file.toString(), conf).getScheme(); + return convertPathWithScheme(file, getHoodieScheme(scheme)); + } catch (HoodieIOException e) { + throw e; + } } private Path convertToDefaultPath(Path oldPath) { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 23fec955dc000..3520e2e6c5515 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -421,7 +421,7 @@ public Iterator> handleUpdate(String commitTime, String fileLo throw new HoodieUpsertException("Error in finding the old file path at commit " + commitTime +" at fileLoc: " + fileLoc); } else { - Configuration conf = FSUtils.getFs().getConf(); + Configuration conf = FSUtils.getFs(config.getBasePath()).getConf(); AvroReadSupport.setAvroReadSchema(conf, upsertHandle.getSchema()); ParquetReader reader = AvroParquetReader.builder(upsertHandle.getOldFilePath()).withConf(conf).build(); diff --git a/hoodie-client/src/test/java/HoodieClientExample.java b/hoodie-client/src/test/java/HoodieClientExample.java index 1045418c37bdc..bcc7732e94855 100644 --- a/hoodie-client/src/test/java/HoodieClientExample.java +++ b/hoodie-client/src/test/java/HoodieClientExample.java @@ -82,7 +82,7 @@ public void run() throws Exception { // initialize the table, if not done already Path path = new Path(tablePath); - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(tablePath); if (!fs.exists(path)) { HoodieTableMetaClient.initTableType(fs, tablePath, HoodieTableType.valueOf(tableType), tableName); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java index fadb9255f7101..6ac8d7722d4a2 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClient.java @@ -203,7 +203,7 @@ public void testUpserts() throws Exception { HoodieWriteConfig cfg = getConfig(); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); HoodieIndex index = HoodieIndex.createIndex(cfg, jsc); - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(basePath); /** * Write 1 (only inserts) @@ -282,7 +282,7 @@ public void testDeletes() throws Exception { HoodieWriteConfig cfg = getConfig(); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); HoodieIndex index = HoodieIndex.createIndex(cfg, jsc); - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(basePath); /** * Write 1 (inserts and deletes) @@ -362,7 +362,7 @@ public void testCreateSavepoint() throws Exception { 
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1) .build()).build(); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(basePath); HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath); /** @@ -446,7 +446,7 @@ public void testRollbackToSavepoint() throws Exception { .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1) .build()).build(); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(basePath); HoodieTestDataGenerator.writePartitionMetadata(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath); /** @@ -550,7 +550,7 @@ public void testInsertAndCleanByVersions() throws Exception { .retainFileVersions(maxVersions).build()).build(); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); HoodieIndex index = HoodieIndex.createIndex(cfg, jsc); - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(basePath); /** * do a big insert @@ -644,7 +644,7 @@ public void testInsertAndCleanByCommits() throws Exception { .retainCommits(maxCommits).build()).build(); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); HoodieIndex index = HoodieIndex.createIndex(cfg, jsc); - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(basePath); /** * do a big insert @@ -724,7 +724,7 @@ public void testRollbackCommit() throws Exception { String commitTime2 = "20160502020601"; String commitTime3 = "20160506030611"; new File(basePath + "/.hoodie").mkdirs(); - HoodieTestDataGenerator.writePartitionMetadata(FSUtils.getFs(), + HoodieTestDataGenerator.writePartitionMetadata(FSUtils.getFs(basePath), new String[] {"2016/05/01", "2016/05/02", "2016/05/06"}, basePath); @@ -817,7 +817,7 @@ public void testAutoRollbackCommit() throws Exception { String commitTime2 = "20160502020601"; String commitTime3 = "20160506030611"; new File(basePath + "/.hoodie").mkdirs(); - HoodieTestDataGenerator.writePartitionMetadata(FSUtils.getFs(), + HoodieTestDataGenerator.writePartitionMetadata(FSUtils.getFs(basePath), new String[] {"2016/05/01", "2016/05/02", "2016/05/06"}, basePath); @@ -896,7 +896,7 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize) { @Test public void testSmallInsertHandlingForUpserts() throws Exception { - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(basePath); final String TEST_PARTITION_PATH = "2016/09/26"; final int INSERT_SPLIT_LIMIT = 10; // setup the small file handling params @@ -1013,7 +1013,7 @@ public void testSmallInsertHandlingForInserts() throws Exception { List statuses= client.insert(insertRecordsRDD1, commitTime1).collect(); assertNoWriteErrors(statuses); - assertPartitionMetadata(new String[]{TEST_PARTITION_PATH}, FSUtils.getFs()); + assertPartitionMetadata(new String[]{TEST_PARTITION_PATH}, FSUtils.getFs(basePath)); assertEquals("Just 1 file needs to be added.", 1, statuses.size()); String file1 = statuses.get(0).getFileId(); @@ -1052,7 +1052,7 @@ public void testSmallInsertHandlingForInserts() throws Exception { assertEquals("2 files needs to be committed.", 2, statuses.size()); - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config); List files = @@ -1084,7 +1084,7 @@ public void 
testKeepLatestFileVersions() throws IOException { String file1P0C0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "000"); String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[1], "000"); HoodieTable table = HoodieTable - .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(basePath), config.getBasePath(), true), config); List hoodieCleanStatsOne = table.clean(jsc); assertEquals("Must not clean any files" , 0, getCleanStat(hoodieCleanStatsOne, partitionPaths[0]).getSuccessDeleteFiles().size()); @@ -1095,7 +1095,7 @@ public void testKeepLatestFileVersions() throws IOException { // make next commit, with 1 insert & 1 update per partition HoodieTestUtils.createCommitFiles(basePath, "001"); table = HoodieTable - .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(basePath), config.getBasePath(), true), config); String file2P0C1 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "001"); // insert String file2P1C1 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[1], "001"); // insert @@ -1113,7 +1113,7 @@ public void testKeepLatestFileVersions() throws IOException { // make next commit, with 2 updates to existing files, and 1 insert HoodieTestUtils.createCommitFiles(basePath, "002"); table = HoodieTable - .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(basePath), config.getBasePath(), true), config); HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "002", file1P0C0); // update HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "002", file2P0C1); // update @@ -1183,7 +1183,7 @@ public void testKeepLatestCommits() throws IOException { String file1P1C0 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[1], "000"); HoodieTable table = HoodieTable - .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(basePath), config.getBasePath(), true), config); List hoodieCleanStatsOne = table.clean(jsc); assertEquals("Must not clean any files" , 0, getCleanStat(hoodieCleanStatsOne, partitionPaths[0]).getSuccessDeleteFiles().size()); @@ -1194,7 +1194,7 @@ public void testKeepLatestCommits() throws IOException { // make next commit, with 1 insert & 1 update per partition HoodieTestUtils.createCommitFiles(basePath, "001"); table = HoodieTable - .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(basePath), config.getBasePath(), true), config); String file2P0C1 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[0], "001"); // insert String file2P1C1 = HoodieTestUtils.createNewDataFile(basePath, partitionPaths[1], "001"); // insert @@ -1212,7 +1212,7 @@ public void testKeepLatestCommits() throws IOException { // make next commit, with 2 updates to existing files, and 1 insert HoodieTestUtils.createCommitFiles(basePath, "002"); table = HoodieTable - .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(basePath), config.getBasePath(), true), config); HoodieTestUtils.createDataFile(basePath, 
partitionPaths[0], "002", file1P0C0); // update HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "002", file2P0C1); // update @@ -1228,7 +1228,7 @@ public void testKeepLatestCommits() throws IOException { // make next commit, with 2 updates to existing files, and 1 insert HoodieTestUtils.createCommitFiles(basePath, "003"); table = HoodieTable - .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(basePath), config.getBasePath(), true), config); HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "003", file1P0C0); // update HoodieTestUtils.createDataFile(basePath, partitionPaths[0], "003", file2P0C1); // update @@ -1268,7 +1268,7 @@ public void testCleaningWithZeroPartitonPaths() throws IOException { HoodieTestUtils.createCommitFiles(basePath, "000"); HoodieTable table = HoodieTable - .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(basePath), config.getBasePath(), true), config); List hoodieCleanStatsOne = table.clean(jsc); @@ -1332,7 +1332,7 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) { updateAllFilesInPartition(filesP2C0, partitionPaths[2], "003"); HoodieTable table = HoodieTable - .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(), config.getBasePath(), true), config); + .getHoodieTable(new HoodieTableMetaClient(FSUtils.getFs(basePath), config.getBasePath(), true), config); List hoodieCleanStats = table.clean(jsc); assertEquals(100, getCleanStat(hoodieCleanStats, partitionPaths[0]).getSuccessDeleteFiles().size()); @@ -1354,7 +1354,7 @@ public void testCommitWritesRelativePaths() throws Exception { HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build(); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs, basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java index d13dd33733a8a..94c37d1b23d9f 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestMergeOnReadTable.java @@ -73,7 +73,7 @@ public class TestMergeOnReadTable { private transient JavaSparkContext jsc = null; private transient SQLContext sqlContext; - private String basePath = null; + private static String basePath = null; private HoodieCompactor compactor; private FileSystem fs; @@ -92,7 +92,7 @@ public static void cleanUp() throws Exception { FSUtils.setFs(null); // TEMPFIX(vc): Fix failing build //FileSystem.closeAll(); - HoodieTestUtils.resetFS(); + HoodieTestUtils.resetFS(basePath); } @BeforeClass @@ -106,21 +106,21 @@ public static void setUpDFS() throws IOException { dfs = dfsCluster.getFileSystem(); } FSUtils.setFs(dfs); - HoodieTestUtils.resetFS(); + HoodieTestUtils.resetFS(basePath); } @Before public void init() throws IOException { - this.fs = FSUtils.getFs(); // Initialize a local spark env jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieMergeOnReadTable")); - jsc.hadoopConfiguration().addResource(FSUtils.getFs().getConf()); + jsc.hadoopConfiguration().addResource(FSUtils.getFs(basePath).getConf()); // Create a temp folder as the base 
path TemporaryFolder folder = new TemporaryFolder(); folder.create(); basePath = folder.getRoot().getAbsolutePath(); + fs = FSUtils.getFs(basePath); dfs.mkdirs(new Path(basePath)); FSUtils.setFs(dfs); HoodieTestUtils.initTableType(basePath, HoodieTableType.MERGE_ON_READ); @@ -296,7 +296,7 @@ public void testSimpleInsertAndDelete() throws Exception { assertTrue(dataFilesToRead.findAny().isPresent()); List dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); - List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles); + List recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath); //Wrote 40 records and deleted 20 records, so remaining 40-20 = 20 assertEquals("Must contain 20 records", 20, recordsRead.size()); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieMergeOnReadTestUtils.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieMergeOnReadTestUtils.java index 4b2424eb72f35..3e23722b5a19d 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieMergeOnReadTestUtils.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieMergeOnReadTestUtils.java @@ -43,11 +43,11 @@ */ public class HoodieMergeOnReadTestUtils { - public static List getRecordsUsingInputFormat(List inputPaths) throws IOException { + public static List getRecordsUsingInputFormat(List inputPaths, String basePath) throws IOException { JobConf jobConf = new JobConf(); Schema schema = HoodieAvroUtils.addMetadataFields(Schema.parse(TRIP_EXAMPLE_SCHEMA)); HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat(); - setPropsForInputFormat(inputFormat, jobConf, schema); + setPropsForInputFormat(inputFormat, jobConf, schema, basePath); return inputPaths.stream().map(path -> { setInputPath(jobConf, path); List records = new ArrayList<>(); @@ -75,11 +75,11 @@ public static List getRecordsUsingInputFormat(List inputP }).get(); } - private static void setPropsForInputFormat(HoodieRealtimeInputFormat inputFormat, JobConf jobConf, Schema schema) { + private static void setPropsForInputFormat(HoodieRealtimeInputFormat inputFormat, JobConf jobConf, Schema schema, String basePath) { List fields = schema.getFields(); String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(",")); String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); - Configuration conf = FSUtils.getFs().getConf(); + Configuration conf = FSUtils.getFs(basePath).getConf(); jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions); jobConf.set("partition_columns", "datestr"); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java index fe9c9fd49a1e8..bca5bdf13154e 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java @@ -187,7 +187,7 @@ public static GenericRecord generateGenericRecord(String rowKey, String riderNam public static void createCommitFile(String basePath, String commitTime) throws IOException { Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCommitFileName(commitTime)); - FileSystem fs = FSUtils.getFs(); + FileSystem fs = 
FSUtils.getFs(basePath); FSDataOutputStream os = fs.create(commitFile, true); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); try { @@ -203,7 +203,7 @@ public static void createSavepointFile(String basePath, String commitTime) throw Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline .makeSavePointFileName(commitTime)); - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(basePath); FSDataOutputStream os = fs.create(commitFile, true); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); try { diff --git a/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java b/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java index 955865e1f070d..edb7fcf8f40ed 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java @@ -56,7 +56,7 @@ public void init() throws Exception { public void testSchemaEvolutionOnUpdate() throws Exception { // Create a bunch of records with a old version of schema HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.txt"); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); + HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(basePath), basePath); HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metadata); String recordStr1 = @@ -81,12 +81,12 @@ public void testSchemaEvolutionOnUpdate() throws Exception { Iterator> insertResult = table.handleInsert("100", records.iterator()); Path commitFile = new Path(config.getBasePath() + "/.hoodie/" + HoodieTimeline.makeCommitFileName("100")); - FSUtils.getFs().create(commitFile); + FSUtils.getFs(config.getBasePath()).create(commitFile); // Now try an update with an evolved schema // Evolved schema does not have guarantee on preserving the original field ordering config = makeHoodieClientConfig("/exampleEvolvedSchema.txt"); - metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); + metadata = new HoodieTableMetaClient(FSUtils.getFs(basePath), basePath); String fileId = insertResult.next().get(0).getFileId(); System.out.println(fileId); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java index 1a49b5953312c..2861c3772b76a 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java @@ -80,7 +80,10 @@ public class TestHoodieBloomIndex { private Schema schema; public TestHoodieBloomIndex() throws Exception { - fs = FSUtils.getFs(); + TemporaryFolder folder = new TemporaryFolder(); + folder.create(); + basePath = folder.getRoot().getAbsolutePath(); + fs = FSUtils.getFs(basePath); } @Before @@ -88,9 +91,6 @@ public void init() throws IOException { // Initialize a local spark env jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("TestHoodieBloomIndex")); // Create a temp folder as the base path - TemporaryFolder folder = new TemporaryFolder(); - folder.create(); - basePath = folder.getRoot().getAbsolutePath(); HoodieTestUtils.init(basePath); // We have some records to be tagged (two different partitions) schemaStr = IOUtils.toString(getClass().getResourceAsStream("/exampleSchema.txt"), "UTF-8"); diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java index 2934f97956b12..d507392c98cd8 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java @@ -56,7 +56,7 @@ public void init() throws Exception { folder.create(); basePath = folder.getRoot().getAbsolutePath(); HoodieTestUtils.init(basePath); - fs = FSUtils.getFs(); + fs = FSUtils.getFs(basePath); } @Test @@ -111,7 +111,7 @@ public void testArchiveDatasetWithArchival() throws IOException { originalCommits.removeAll(timeline.getInstants().collect(Collectors.toList())); //read the file - HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(), + HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(basePath), new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1")), HoodieArchivedMetaEntry.getClassSchema()); int archivedRecordsCount = 0; diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java index 1ff8c300fc941..611271f56d1c7 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java @@ -106,7 +106,7 @@ private HoodieWriteConfig.Builder getConfigBuilder() { public void testCompactionOnCopyOnWriteFail() throws Exception { HoodieTestUtils.initTableType(basePath, HoodieTableType.COPY_ON_WRITE); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(basePath), basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig()); compactor.compact(jsc, getConfig(), table); @@ -114,7 +114,7 @@ public void testCompactionOnCopyOnWriteFail() throws Exception { @Test public void testCompactionEmpty() throws Exception { - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(basePath), basePath); HoodieWriteConfig config = getConfig(); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config); HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); @@ -133,7 +133,7 @@ public void testCompactionEmpty() throws Exception { @Test public void testLogFileCountsAfterCompaction() throws Exception { - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(basePath); // insert 100 records HoodieWriteConfig config = getConfig(); HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java index f01c4c80178e8..5ea4c48d615a4 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java @@ -88,7 +88,7 @@ public void testMakeNewPath() throws Exception { String commitTime = HoodieTestUtils.makeNewCommitTime(); HoodieWriteConfig config = makeHoodieClientConfig(); - HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(), basePath); + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(FSUtils.getFs(basePath), 
basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config); HoodieCreateHandle io = new HoodieCreateHandle(config, commitTime, table, partitionPath); @@ -113,7 +113,7 @@ public void testUpdateRecords() throws Exception { // Prepare the AvroParquetIO HoodieWriteConfig config = makeHoodieClientConfig(); String firstCommitTime = HoodieTestUtils.makeNewCommitTime(); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); + HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(basePath), basePath); String partitionPath = "/2016/01/31"; HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metadata); @@ -176,7 +176,7 @@ public void testUpdateRecords() throws Exception { Thread.sleep(1000); String newCommitTime = HoodieTestUtils.makeNewCommitTime(); - metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); + metadata = new HoodieTableMetaClient(FSUtils.getFs(basePath), basePath); table = new HoodieCopyOnWriteTable(config, metadata); Iterator> iter = table.handleUpdate(newCommitTime, updatedRecord1.getCurrentLocation().getFileId(), updatedRecords.iterator()); @@ -241,7 +241,7 @@ private List newHoodieRecords(int n, String time) throws Exception @Test public void testInsertWithPartialFailures() throws Exception { HoodieWriteConfig config = makeHoodieClientConfig(); String commitTime = HoodieTestUtils.makeNewCommitTime(); - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(config.getBasePath()); HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs, basePath); HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metadata); @@ -280,7 +280,7 @@ private List newHoodieRecords(int n, String time) throws Exception @Test public void testInsertRecords() throws Exception { HoodieWriteConfig config = makeHoodieClientConfig(); String commitTime = HoodieTestUtils.makeNewCommitTime(); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); + HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(basePath), basePath); HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metadata); // Case 1: @@ -327,7 +327,7 @@ private List newHoodieRecords(int n, String time) throws Exception HoodieStorageConfig.newBuilder().limitFileSize(64 * 1024).parquetBlockSize(64 * 1024) .parquetPageSize(64 * 1024).build()).build(); String commitTime = HoodieTestUtils.makeNewCommitTime(); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); + HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(basePath), basePath); HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metadata); List records = new ArrayList<>(); @@ -371,7 +371,7 @@ private List testUpsertPartitioner(int smal HoodieClientTestUtils.fakeCommitFile(basePath, "001"); HoodieClientTestUtils.fakeDataFile(basePath, TEST_PARTITION_PATH, "001", "file1", fileSize); - HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(), basePath); + HoodieTableMetaClient metadata = new HoodieTableMetaClient(FSUtils.getFs(basePath), basePath); HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, metadata); HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{TEST_PARTITION_PATH}); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java index 
b0387d870acd5..1d80d6d875ed3 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java @@ -101,7 +101,7 @@ public HoodieTableMetaClient() { private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); - this.fs = FSUtils.getFs(); + this.fs = FSUtils.getFs(basePath); } private void writeObject(java.io.ObjectOutputStream out) diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java index e0887846716a6..d493845862843 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java @@ -141,7 +141,7 @@ public WriterBuilder onParentPath(Path parentPath) { public Writer build() throws IOException, InterruptedException { log.info("Building HoodieLogFormat Writer"); if (fs == null) { - fs = FSUtils.getFs(); + fs = FSUtils.getFs(parentPath.toString()); } if (logFileId == null) { throw new IllegalArgumentException("FileID is not specified"); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java index 2a5b463000901..29d59a5d3f143 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieActiveTimeline.java @@ -117,7 +117,7 @@ public HoodieActiveTimeline() { private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); - this.fs = FSUtils.getFs(); + this.fs = FSUtils.getFs(metaPath); } /** diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java index 458cf6eb8dce3..474928a84bd1f 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/timeline/HoodieArchivedTimeline.java @@ -92,7 +92,7 @@ public HoodieArchivedTimeline() { private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); - this.fs = FSUtils.getFs(); + this.fs = FSUtils.getFs(metaPath); } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java index 6f1d63c44091c..338466dce2008 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemView.java @@ -106,7 +106,7 @@ public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); - this.fs = FSUtils.getFs(); + this.fs = FSUtils.getFs(metaClient.getBasePath()); } private void writeObject(java.io.ObjectOutputStream out) diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java index 18ec2b3fb05e2..d7f5797e454dc 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java @@ -66,17 +66,23 @@ public static void setFs(FileSystem fs) { FSUtils.fs = fs; } - - public static FileSystem getFs() { + public static FileSystem getFs(String path) { if (fs != null) { return fs; } Configuration conf = new Configuration(); conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); + return getFs(path, conf); + } + + public static FileSystem getFs(String path, Configuration conf) { + if (fs != null) { + return fs; + } FileSystem fs; try { - fs = FileSystem.get(conf); + fs = new Path(path).getFileSystem(conf); } catch (IOException e) { throw new HoodieIOException("Failed to get instance of " + FileSystem.class.getName(), e); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java index 541d60b8306e9..ab76608d263cd 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java @@ -56,7 +56,7 @@ public class ParquetUtils { */ public static Set readRowKeysFromParquet(Path filePath) { Configuration conf = new Configuration(); - conf.addResource(getFs().getConf()); + conf.addResource(getFs(filePath.toString()).getConf()); Schema readSchema = HoodieAvroUtils.getRecordKeySchema(); AvroReadSupport.setAvroReadSchema(conf, readSchema); AvroReadSupport.setRequestedProjection(conf, readSchema); @@ -102,7 +102,7 @@ public static ParquetMetadata readMetadata(Configuration conf, Path parquetFileP ParquetMetadata footer; try { // TODO(vc): Should we use the parallel reading version here? 
- footer = ParquetFileReader.readFooter(getFs().getConf(), parquetFilePath); + footer = ParquetFileReader.readFooter(getFs(parquetFilePath.toString()).getConf(), parquetFilePath); } catch (IOException e) { throw new HoodieIOException("Failed to read footer for parquet " + parquetFilePath, e); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index f3ed874763a69..68199cea8ce47 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -68,18 +68,19 @@ import static org.junit.Assert.fail; public class HoodieTestUtils { - public static FileSystem fs = FSUtils.getFs(); + public static FileSystem fs; public static final String TEST_EXTENSION = ".test"; public static final String RAW_TRIPS_TEST_NAME = "raw_trips"; public static final int DEFAULT_TASK_PARTITIONID = 1; public static final String[] DEFAULT_PARTITION_PATHS = {"2016/03/15", "2015/03/16", "2015/03/17"}; private static Random rand = new Random(46474747); - public static void resetFS() { - HoodieTestUtils.fs = FSUtils.getFs(); + public static void resetFS(String basePath) { + HoodieTestUtils.fs = FSUtils.getFs(basePath); } public static HoodieTableMetaClient init(String basePath) throws IOException { + fs = FSUtils.getFs(basePath); return initTableType(basePath, HoodieTableType.COPY_ON_WRITE); } @@ -87,6 +88,7 @@ public static HoodieTableMetaClient initTableType(String basePath, HoodieTableTy Properties properties = new Properties(); properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME); properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, tableType.name()); + fs = FSUtils.getFs(basePath); return HoodieTableMetaClient.initializePathAsHoodieDataset(fs, basePath, properties); } @@ -183,7 +185,7 @@ public static String makeInflightTestFileName(String instant) { public static void createCleanFiles(String basePath, String commitTime) throws IOException { Path commitFile = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCleanerFileName(commitTime)); - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(basePath); FSDataOutputStream os = fs.create(commitFile, true); try { HoodieCleanStat cleanStats = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, @@ -233,6 +235,8 @@ public static T serializeDeserialize(T object, Class } public static void writeRecordsToLogFiles(String basePath, Schema schema, List updatedRecords) { + + fs = FSUtils.getFs(basePath); Map> groupedUpdated = updatedRecords.stream() .collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)); diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java index 4e92d9aeff5db..a9e9459ba44e0 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java @@ -318,7 +318,7 @@ public void testAppendAndReadOnCorruptedLog() writer.close(); // Append some arbit byte[] to thee end of the log (mimics a partially written commit) - fs = FileSystem.get(fs.getConf()); + fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); FSDataOutputStream outputStream = 
fs.append(writer.getLogFile().getPath()); // create a block with outputStream.write(HoodieLogFormat.MAGIC); @@ -484,7 +484,7 @@ public void testAvroLogRecordReaderWithRollbackPartialBlock() writer.close(); // Append some arbit byte[] to thee end of the log (mimics a partially written commit) - fs = FileSystem.get(fs.getConf()); + fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf()); FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath()); // create a block with outputStream.write(HoodieLogFormat.MAGIC); diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java index b68a299dbc88f..6a9b10487c83c 100644 --- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java +++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReader.java @@ -126,8 +126,7 @@ private void readAndCompactLog() throws IOException { split.getDeltaFilePaths(), split.getPath(), projectionFields)); HoodieCompactedLogRecordScanner compactedLogRecordScanner = - new HoodieCompactedLogRecordScanner(FSUtils.getFs(), split.getDeltaFilePaths(), - readerSchema); + new HoodieCompactedLogRecordScanner(FSUtils.getFs(split.getPath().toString()), split.getDeltaFilePaths(), readerSchema); // NOTE: HoodieCompactedLogRecordScanner will not return records for an in-flight commit // but can return records for completed commits > the commit we are trying to read (if using readCommit() API) diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java index 2968e0d0f11d0..8bb712f9bd8fe 100644 --- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java +++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java @@ -75,7 +75,7 @@ private HoodieLogFormat.Writer writeLogFile(File partitionDir, Schema schema, St String baseCommit, String newCommit, int numberOfRecords) throws InterruptedException,IOException { HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(partitionDir.getPath())) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId) - .overBaseCommit(baseCommit).withFs(FSUtils.getFs()).build(); + .overBaseCommit(baseCommit).withFs(FSUtils.getFs(basePath.getRoot().getAbsolutePath())).build(); List records = new ArrayList<>(); for(int i=0; i < numberOfRecords; i++) { records.add(InputFormatTestUtil.generateAvroRecordFromJson(schema, i, newCommit, "fileid0")); @@ -114,7 +114,7 @@ public void testReader() throws Exception { RecordReader reader = new MapredParquetInputFormat(). getRecordReader(new FileSplit(split.getPath(), 0, - FSUtils.getFs().getLength(split.getPath()), (String[]) null), jobConf, null); + FSUtils.getFs(basePath.getRoot().getAbsolutePath()).getLength(split.getPath()), (String[]) null), jobConf, null); JobConf jobConf = new JobConf(); List fields = schema.getFields(); String names = fields.stream().map(f -> f.name().toString()).collect(Collectors.joining(",")); @@ -168,10 +168,9 @@ public void testReaderWithNestedAndComplexSchema() throws Exception { RecordReader reader = new MapredParquetInputFormat(). 
getRecordReader(new FileSplit(split.getPath(), 0, - FSUtils.getFs().getLength(split.getPath()), (String[]) null), jobConf, null); + FSUtils.getFs(basePath.toString()).getLength(split.getPath()), (String[]) null), jobConf, null); JobConf jobConf = new JobConf(); List fields = schema.getFields(); - String names = fields.stream().map(f -> f.name()).collect(Collectors.joining(",")); String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); diff --git a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncTool.java b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncTool.java index 1268e69e85695..d513d9c9a78d5 100644 --- a/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncTool.java +++ b/hoodie-hive/src/main/java/com/uber/hoodie/hive/HiveSyncTool.java @@ -185,7 +185,7 @@ public static void main(String[] args) throws Exception { cmd.usage(); System.exit(1); } - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(cfg.basePath); HiveConf hiveConf = new HiveConf(); hiveConf.addResource(fs.getConf()); new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable(); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java index a8338b727e952..9ae63799bb0c0 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java @@ -70,7 +70,7 @@ public class HDFSParquetImporter implements Serializable{ public HDFSParquetImporter( Config cfg) throws IOException { this.cfg = cfg; - fs = FSUtils.getFs(); + fs = FSUtils.getFs(cfg.targetPath); } public static class FormatValidator implements IValueValidator { diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java index 9bb7869bd1b55..28d9a4ff78d69 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieSnapshotCopier.java @@ -67,7 +67,7 @@ static class Config implements Serializable { } public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDir, final boolean shouldAssumeDatePartitioning) throws IOException { - FileSystem fs = FSUtils.getFs(); + FileSystem fs = FSUtils.getFs(baseDir); final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs, baseDir); final TableFileSystemView.ReadOptimizedView fsView = new HoodieTableFileSystemView(tableMetadata, tableMetadata.getActiveTimeline().getCommitsAndCompactionsTimeline().filterCompletedInstants()); @@ -95,7 +95,7 @@ public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDi jsc.parallelize(partitions, partitions.size()) .flatMap(partition -> { // Only take latest version files <= latestCommit. 
- FileSystem fs1 = FSUtils.getFs(); + FileSystem fs1 = FSUtils.getFs(baseDir); List> filePaths = new ArrayList<>(); Stream dataFiles = fsView.getLatestDataFilesBeforeOrOn(partition, latestCommitTimestamp); dataFiles.forEach(hoodieDataFile -> filePaths.add(new Tuple2<>(partition, hoodieDataFile.getPath()))); @@ -111,7 +111,7 @@ public void snapshot(JavaSparkContext jsc, String baseDir, final String outputDi String partition = tuple._1(); Path sourceFilePath = new Path(tuple._2()); Path toPartitionPath = new Path(outputDir, partition); - FileSystem fs1 = FSUtils.getFs(); + FileSystem fs1 = FSUtils.getFs(baseDir); if (!fs1.exists(toPartitionPath)) { fs1.mkdirs(toPartitionPath); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java index ec0cbccc9e9d0..0e58b57bc6e2b 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -114,7 +114,7 @@ public class HoodieDeltaStreamer implements Serializable { public HoodieDeltaStreamer(Config cfg) throws IOException { this.cfg = cfg; - this.fs = FSUtils.getFs(); + this.fs = FSUtils.getFs(cfg.targetBasePath); if (fs.exists(new Path(cfg.targetBasePath))) { @@ -193,7 +193,7 @@ private void sync() throws Exception { } else { Properties properties = new Properties(); properties.put(HoodieWriteConfig.TABLE_NAME, cfg.targetTableName); - HoodieTableMetaClient.initializePathAsHoodieDataset(FSUtils.getFs(), cfg.targetBasePath, properties); + HoodieTableMetaClient.initializePathAsHoodieDataset(FSUtils.getFs(cfg.targetBasePath), cfg.targetBasePath, properties); } log.info("Checkpoint to resume from : " + resumeCheckpointStr); diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java index dc09a703a14a7..073125a8793cd 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/FilebasedSchemaProvider.java @@ -51,7 +51,7 @@ static class Config { public FilebasedSchemaProvider(PropertiesConfiguration config) { super(config); - this.fs = FSUtils.getFs(); + this.fs = FSUtils.getFs(config.getBasePath()); UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.SOURCE_SCHEMA_FILE_PROP, Config.TARGET_SCHEMA_FILE_PROP)); try { diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java index 9440dffd207f8..4cf8d0754e8d5 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/DFSSource.java @@ -65,7 +65,7 @@ static class Config { public DFSSource(PropertiesConfiguration config, JavaSparkContext sparkContext, SourceDataFormat dataFormat, SchemaProvider schemaProvider) { super(config, sparkContext, dataFormat, schemaProvider); - this.fs = FSUtils.getFs(); + this.fs = FSUtils.getFs(config.getBasePath()); UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP)); } diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java index 6ebc1103e24cd..1f31e0ace3314 100644 --- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java +++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/sources/HiveIncrPullSource.java @@ -73,7 +73,7 @@ static class Config { public HiveIncrPullSource(PropertiesConfiguration config, JavaSparkContext sparkContext, SourceDataFormat dataFormat, SchemaProvider schemaProvider) { super(config, sparkContext, dataFormat, schemaProvider); - this.fs = FSUtils.getFs(); + this.fs = FSUtils.getFs(config.getBasePath()); UtilHelpers.checkRequiredProperties(config, Arrays.asList(Config.ROOT_INPUT_PATH_PROP)); this.incrPullRootPath = config.getString(Config.ROOT_INPUT_PATH_PROP); } diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java index 33459b9fcb8cf..cfe1742c091c0 100644 --- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java @@ -49,7 +49,7 @@ public void init() throws IOException { basePath = rootPath + "/" + HoodieTestUtils.RAW_TRIPS_TEST_NAME; HoodieTestUtils.init(basePath); outputPath = rootPath + "/output"; - fs = FSUtils.getFs(); + fs = FSUtils.getFs(basePath); // Start a local Spark job SparkConf conf = new SparkConf().setAppName("snapshot-test-job").setMaster("local[2]"); jsc = new JavaSparkContext(conf); diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestMultipleFSExample.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestMultipleFSExample.java new file mode 100644 index 0000000000000..53b0ef17a3395 --- /dev/null +++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestMultipleFSExample.java @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2017 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.utilities; + + +import static org.junit.Assert.assertEquals; + +import com.beust.jcommander.Parameter; +import com.uber.hoodie.HoodieReadClient; +import com.uber.hoodie.HoodieWriteClient; +import com.uber.hoodie.common.HoodieTestDataGenerator; +import com.uber.hoodie.common.minicluster.HdfsTestService; +import com.uber.hoodie.common.model.HoodieRecord; +import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.common.table.HoodieTableMetaClient; +import com.uber.hoodie.common.util.FSUtils; +import java.io.Serializable; +import java.util.*; + +import com.uber.hoodie.config.HoodieIndexConfig; +import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.index.HoodieIndex; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestMultipleFSExample implements Serializable { + private static String dfsBasePath; + private static HdfsTestService hdfsTestService; + private static MiniDFSCluster dfsCluster; + private static DistributedFileSystem dfs; + private static Logger logger = LogManager.getLogger(TestMultipleFSExample.class); + private String tablePath = "file:///tmp/hoodie/sample-table"; + private String tableName = "hoodie_rt"; + private String tableType = HoodieTableType.COPY_ON_WRITE.name(); + + @BeforeClass + public static void initClass() throws Exception { + hdfsTestService = new HdfsTestService(); + dfsCluster = hdfsTestService.start(true); + + // Create a temp folder as the base path + dfs = dfsCluster.getFileSystem(); + dfsBasePath = dfs.getWorkingDirectory().toString(); + dfs.mkdirs(new Path(dfsBasePath)); + } + + @AfterClass + public static void cleanupClass() throws Exception { + if (hdfsTestService != null) { + hdfsTestService.stop(); + } + } + + @Test + public void readLocalWriteHDFS() throws Exception { + + SparkConf sparkConf = new SparkConf().setAppName("hoodie-client-example"); + sparkConf.setMaster("local[1]"); + sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + sparkConf.set("spark.kryoserializer.buffer.max", "512m"); + JavaSparkContext jsc = new JavaSparkContext(sparkConf); + SQLContext sqlContext = new SQLContext(jsc); + + // Generator of some records to be loaded in. 
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + + // Initialize table and filesystem + FileSystem hdfs = FSUtils.getFs(dfsBasePath); + HoodieTableMetaClient.initTableType(hdfs, dfsBasePath, HoodieTableType.valueOf(tableType), tableName); + + //Create write client to write some records in + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(dfsBasePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .forTable(tableName).withIndexConfig( + HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .build(); + HoodieWriteClient hdfsWriteClient = new HoodieWriteClient(jsc, cfg); + + // Write generated data to hdfs (only inserts) + String readCommitTime = hdfsWriteClient.startCommit(); + logger.info("Starting commit " + readCommitTime); + List records = dataGen.generateInserts(readCommitTime, 100); + JavaRDD writeRecords = jsc.parallelize(records, 1); + hdfsWriteClient.upsert(writeRecords, readCommitTime); + + // Read from hdfs + HoodieReadClient hdfsReadClient = new HoodieReadClient(jsc, dfsBasePath, sqlContext); + Dataset readRecords = hdfsReadClient.readCommit(readCommitTime); + assertEquals("Should contain 100 records", readRecords.count(), records.size()); + + // Write to local + FileSystem local = FSUtils.getFs(tablePath); + HoodieTableMetaClient.initTableType(local, tablePath, HoodieTableType.valueOf(tableType), tableName); + HoodieWriteConfig localConfig = HoodieWriteConfig.newBuilder().withPath(tablePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) + .forTable(tableName).withIndexConfig( + HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) + .build(); + HoodieWriteClient localWriteClient = new HoodieWriteClient(jsc, localConfig); + + String writeCommitTime = localWriteClient.startCommit(); + logger.info("Starting write commit " + writeCommitTime); + List localRecords = dataGen.generateInserts(writeCommitTime, 100); + JavaRDD localWriteRecords = jsc.parallelize(localRecords, 1); + logger.info("Writing to path: " + tablePath); + localWriteClient.upsert(localWriteRecords, writeCommitTime); + + logger.info("Reading from path: " + tablePath); + HoodieReadClient localReadClient = new HoodieReadClient(jsc, tablePath, sqlContext); + Dataset localReadRecords = localReadClient.readCommit(writeCommitTime); + assertEquals("Should contain 100 records", localReadRecords.count(), localRecords.size()); + + } +}
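
Note on the pattern introduced throughout this patch: every call site now passes a concrete path into FSUtils.getFs(path[, conf]), and the FileSystem is resolved from that path's scheme rather than from the process-wide default returned by FileSystem.get(conf). The sketch below is illustrative only (the class name and sample paths are not part of the patch; only Hadoop's Path, FileSystem and Configuration APIs are assumed) and shows the core lookup in isolation.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative class; not part of the patch.
public class PathAwareFsExample {

  // The core of the change: resolve the FileSystem from the path's scheme
  // (file://, hdfs://, ...) instead of FileSystem.get(conf), which always
  // returns the default filesystem of the current configuration.
  static FileSystem getFs(String path, Configuration conf) throws IOException {
    return new Path(path).getFileSystem(conf);
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // A file:// base path resolves to the local filesystem; with an hdfs://
    // (or s3://, etc.) base path the same call returns the client for that
    // scheme, so a single job can read from one filesystem and write to
    // another, as exercised by TestMultipleFSExample above.
    FileSystem localFs = getFs("file:///tmp/hoodie/sample-table", conf);
    System.out.println("Resolved filesystem: " + localFs.getUri());
  }
}

Because FSUtils.getFs still short-circuits on an explicitly set filesystem (FSUtils.setFs), tests such as TestMergeOnReadTable can keep pinning a MiniDFS instance while production code resolves the filesystem per base path.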