hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodieCLI.java (3 changes: 2 additions & 1 deletion)
@@ -17,6 +17,7 @@
 package com.uber.hoodie.cli;
 
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.util.FSUtils;
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -36,7 +37,7 @@ public enum CLIState {
 
   public static boolean initConf() {
     if (HoodieCLI.conf == null) {
-      HoodieCLI.conf = new Configuration();
+      HoodieCLI.conf = FSUtils.prepareHadoopConf(new Configuration());
       return true;
     }
     return false;
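Most of the remaining hunks apply the same substitution: call sites that used the no-argument FSUtils.getFs(), or passed the shared HoodieCLI.fs handle into HoodieTableMetaClient, now take the table base path plus a Hadoop Configuration prepared by FSUtils.prepareHadoopConf. A minimal sketch of the new call pattern, using only the signatures visible in this diff (the class name and base path below are hypothetical placeholders):

    import com.uber.hoodie.common.table.HoodieTableMetaClient;
    import com.uber.hoodie.common.util.FSUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class ConfBasedAccessSketch {
      public static void main(String[] args) throws Exception {
        String basePath = "/tmp/hoodie/sample-table";                    // hypothetical table path
        // Prepare the Hadoop configuration once, as HoodieCLI.initConf() now does.
        Configuration conf = FSUtils.prepareHadoopConf(new Configuration());
        // Resolve the FileSystem from path + configuration instead of a cached handle.
        FileSystem fs = FSUtils.getFs(basePath, conf);
        // Build the meta client from the configuration rather than from a FileSystem.
        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf, basePath);
        System.out.println(metaClient.getTableConfig().getTableName() + " on " + fs.getUri());
      }
    }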
@@ -54,12 +54,14 @@ public String showCommits(
 
     System.out
         .println("===============> Showing only " + limit + " archived commits <===============");
-    FileStatus[] fsStatuses = FSUtils.getFs().globStatus(
-        new Path(HoodieCLI.tableMetadata.getBasePath() + "/.hoodie/.commits_.archive*"));
+    String basePath = HoodieCLI.tableMetadata.getBasePath();
+    FileStatus[] fsStatuses = FSUtils.getFs(basePath, HoodieCLI.conf)
+        .globStatus(new Path(basePath + "/.hoodie/.commits_.archive*"));
     List<String[]> allCommits = new ArrayList<>();
     for (FileStatus fs : fsStatuses) {
       //read the archived file
-      HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(FSUtils.getFs(),
+      HoodieLogFormat.Reader reader = HoodieLogFormat
+          .newReader(FSUtils.getFs(basePath, HoodieCLI.conf),
           new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema(), false);
 
       List<IndexedRecord> readRecords = new ArrayList<>();
@@ -77,7 +77,7 @@ public String showCleans() throws IOException {
   @CliCommand(value = "cleans refresh", help = "Refresh the commits")
   public String refreshCleans() throws IOException {
     HoodieTableMetaClient metadata =
-        new HoodieTableMetaClient(HoodieCLI.fs, HoodieCLI.tableMetadata.getBasePath());
+        new HoodieTableMetaClient(HoodieCLI.conf, HoodieCLI.tableMetadata.getBasePath());
     HoodieCLI.setTableMetadata(metadata);
     return "Metadata for table " + metadata.getTableConfig().getTableName() + " refreshed.";
   }
@@ -96,7 +96,7 @@ public String showCommits(
   @CliCommand(value = "commits refresh", help = "Refresh the commits")
   public String refreshCommits() throws IOException {
     HoodieTableMetaClient metadata =
-        new HoodieTableMetaClient(HoodieCLI.fs, HoodieCLI.tableMetadata.getBasePath());
+        new HoodieTableMetaClient(HoodieCLI.conf, HoodieCLI.tableMetadata.getBasePath());
     HoodieCLI.setTableMetadata(metadata);
     return "Metadata for table " + metadata.getTableConfig().getTableName() + " refreshed.";
   }
@@ -224,14 +224,13 @@ public boolean isCompareCommitsAvailable() {
   public String compareCommits(
       @CliOption(key = {"path"}, help = "Path of the dataset to compare to")
       final String path) throws Exception {
-    HoodieTableMetaClient target = new HoodieTableMetaClient(HoodieCLI.fs, path);
+
+    HoodieTableMetaClient target = new HoodieTableMetaClient(HoodieCLI.conf, path);
     HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline()
         .filterCompletedInstants();
-    ;
     HoodieTableMetaClient source = HoodieCLI.tableMetadata;
     HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline()
         .filterCompletedInstants();
-    ;
     String targetLatestCommit =
         targetTimeline.getInstants().iterator().hasNext() ? "0"
             : targetTimeline.lastInstant().get().getTimestamp();
@@ -266,7 +265,7 @@ public boolean isSyncCommitsAvailable() {
   public String syncCommits(
       @CliOption(key = {"path"}, help = "Path of the dataset to compare to")
       final String path) throws Exception {
-    HoodieCLI.syncTableMetadata = new HoodieTableMetaClient(HoodieCLI.fs, path);
+    HoodieCLI.syncTableMetadata = new HoodieTableMetaClient(HoodieCLI.conf, path);
     HoodieCLI.state = HoodieCLI.CLIState.SYNC;
     return "Load sync state between " + HoodieCLI.tableMetadata.getTableConfig().getTableName()
         + " and " + HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
@@ -33,7 +33,7 @@ public String connect(
       final String path) throws IOException {
     boolean initialized = HoodieCLI.initConf();
     HoodieCLI.initFS(initialized);
-    HoodieCLI.setTableMetadata(new HoodieTableMetaClient(HoodieCLI.fs, path));
+    HoodieCLI.setTableMetadata(new HoodieTableMetaClient(HoodieCLI.conf, path));
     HoodieCLI.state = HoodieCLI.CLIState.DATASET;
     return "Metadata for table " + HoodieCLI.tableMetadata.getTableConfig().getTableName()
         + " loaded";
@@ -138,7 +138,7 @@ public String rollbackToSavepoint(
   @CliCommand(value = "savepoints refresh", help = "Refresh the savepoints")
   public String refreshMetaClient() throws IOException {
     HoodieTableMetaClient metadata =
-        new HoodieTableMetaClient(HoodieCLI.fs, HoodieCLI.tableMetadata.getBasePath());
+        new HoodieTableMetaClient(HoodieCLI.conf, HoodieCLI.tableMetadata.getBasePath());
     HoodieCLI.setTableMetadata(metadata);
     return "Metadata for table " + metadata.getTableConfig().getTableName() + " refreshed.";
   }
@@ -97,8 +97,8 @@ private static int deduplicatePartitionPath(JavaSparkContext jsc,
       String repairedOutputPath,
       String basePath)
       throws Exception {
-    DedupeSparkJob job = new DedupeSparkJob(basePath,
-        duplicatedPartitionPath, repairedOutputPath, new SQLContext(jsc), FSUtils.getFs());
+    DedupeSparkJob job = new DedupeSparkJob(basePath, duplicatedPartitionPath, repairedOutputPath,
+        new SQLContext(jsc), FSUtils.getFs(basePath, jsc.hadoopConfiguration()));
     job.fixDuplicates(true);
     return 0;
   }
@@ -18,6 +18,7 @@
 
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.cli.commands.SparkMain;
+import com.uber.hoodie.common.util.FSUtils;
 import java.io.File;
 import java.net.URISyntaxException;
 import org.apache.log4j.Logger;
@@ -66,6 +67,7 @@ public static JavaSparkContext initJavaSparkConf(String name) {
     sparkConf = HoodieWriteClient.registerClasses(sparkConf);
     JavaSparkContext jsc = new JavaSparkContext(sparkConf);
     jsc.hadoopConfiguration().setBoolean("parquet.enable.summary-metadata", false);
+    FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());
     return jsc;
   }
 }
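On the Spark path, the same preparation is applied to the context's Hadoop configuration, which the deduplication job above then uses via FSUtils.getFs(basePath, jsc.hadoopConfiguration()). A small sketch of that lookup, under the same assumptions (the wrapper class name is a placeholder):

    import com.uber.hoodie.common.util.FSUtils;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SparkFsSketch {
      // Assumes the JavaSparkContext came from the initJavaSparkConf(...) helper above,
      // which already ran FSUtils.prepareHadoopConf(jsc.hadoopConfiguration()).
      static FileSystem resolveFs(JavaSparkContext jsc, String basePath) {
        return FSUtils.getFs(basePath, jsc.hadoopConfiguration());
      }
    }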
@@ -75,7 +75,7 @@ class DedupeSparkJob(basePath: String,
     val tmpTableName = s"htbl_${System.currentTimeMillis()}"
     val dedupeTblName = s"${tmpTableName}_dupeKeys"
 
-    val metadata = new HoodieTableMetaClient(fs, basePath)
+    val metadata = new HoodieTableMetaClient(fs.getConf, basePath)
 
     val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"${basePath}/${duplicatedPartitionPath}"))
     val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
@@ -127,7 +127,7 @@
 
 
   def fixDuplicates(dryRun: Boolean = true) = {
-    val metadata = new HoodieTableMetaClient(fs, basePath)
+    val metadata = new HoodieTableMetaClient(fs.getConf, basePath)
 
     val allFiles = fs.listStatus(new Path(s"${basePath}/${duplicatedPartitionPath}"))
     val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
@@ -38,7 +38,7 @@ import scala.collection.mutable._
 object SparkHelpers {
   @throws[Exception]
   def skipKeysAndWriteNewFile(commitTime: String, fs: FileSystem, sourceFile: Path, destinationFile: Path, keysToSkip: Set[String]) {
-    val sourceRecords = ParquetUtils.readAvroRecords(sourceFile)
+    val sourceRecords = ParquetUtils.readAvroRecords(fs.getConf, sourceFile)
     val schema: Schema = sourceRecords.get(0).getSchema
     val filter: BloomFilter = new BloomFilter(HoodieIndexConfig.DEFAULT_BLOOM_FILTER_NUM_ENTRIES.toInt, HoodieIndexConfig.DEFAULT_BLOOM_FILTER_FPP.toDouble)
     val writeSupport: HoodieAvroWriteSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), schema, filter)
@@ -47,6 +47,7 @@ object SparkHelpers {
     for (rec <- sourceRecords) {
       val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString
       if (!keysToSkip.contains(key)) {
+
         writer.writeAvro(key, rec)
       }
     }
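In the two Scala helpers above, only a FileSystem handle is in scope, so the Hadoop Configuration is recovered from it with fs.getConf instead of being threaded in separately. The same idea expressed in Java, as a sketch (the wrapper class is a placeholder; the Hoodie/Hadoop calls are the ones shown in this diff):

    import com.uber.hoodie.common.table.HoodieTableMetaClient;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class FsConfSketch {
      static HoodieTableMetaClient metaClientFrom(FileSystem fs, String basePath) throws Exception {
        Configuration conf = fs.getConf();                 // recover the conf from the existing handle
        return new HoodieTableMetaClient(conf, basePath);  // constructor form used throughout this diff
      }
    }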
@@ -67,10 +67,10 @@ public class HoodieReadClient implements Serializable {
    */
   public HoodieReadClient(JavaSparkContext jsc, String basePath) {
     this.jsc = jsc;
-    this.fs = FSUtils.getFs();
+    this.fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
     // Create a Hoodie table which encapsulated the commits and files visible
     this.hoodieTable = HoodieTable
-        .getHoodieTable(new HoodieTableMetaClient(fs, basePath, true), null);
+        .getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true), null);
     this.commitTimeline = hoodieTable.getCommitTimeline().filterCompletedInstants();
     this.index =
         new HoodieBloomIndex(HoodieWriteConfig.newBuilder().withPath(basePath).build(), jsc);
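With this constructor, HoodieReadClient derives both its FileSystem and its meta client from the Spark context's Hadoop configuration, so callers keep passing just the context and the base path. A usage sketch (the wrapper class, the package of HoodieReadClient, and the path are assumptions, not taken from this diff):

    import com.uber.hoodie.HoodieReadClient;               // assumed package, matching HoodieWriteClient above
    import org.apache.spark.api.java.JavaSparkContext;

    public class ReadClientSketch {
      static HoodieReadClient open(JavaSparkContext jsc) {
        // The constructor now resolves the FileSystem from jsc.hadoopConfiguration() internally.
        return new HoodieReadClient(jsc, "/tmp/hoodie/sample-table");  // hypothetical table path
      }
    }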
@@ -129,8 +129,8 @@ public Dataset<Row> read(JavaRDD<HoodieKey> hoodieKeys, int parallelism)
     JavaPairRDD<HoodieKey, Row> keyRowRDD = originalDF.javaRDD()
         .mapToPair(row -> {
           HoodieKey key = new HoodieKey(
-              row.<String>getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD),
-              row.<String>getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+              row.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD),
+              row.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
           return new Tuple2<>(key, row);
         });
 