Merged

hudi-client/pom.xml (5 changes: 0 additions & 5 deletions)

@@ -85,11 +85,6 @@
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <version>${slf4j.version}</version>
-    </dependency>

     <!-- Parquet -->
     <dependency>

AbstractHoodieClient.java

@@ -26,9 +26,9 @@
 import org.apache.hudi.config.HoodieWriteConfig;

 import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.io.Serializable;
@@ -39,7 +39,7 @@
  */
 public abstract class AbstractHoodieClient implements Serializable, AutoCloseable {

-  private static final Logger LOG = LoggerFactory.getLogger(AbstractHoodieClient.class);
+  private static final Logger LOG = LogManager.getLogger(AbstractHoodieClient.class);

   protected final transient FileSystem fs;
   protected final transient JavaSparkContext jsc;

CompactionAdminClient.java

@@ -45,9 +45,9 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -65,7 +65,7 @@
  */
 public class CompactionAdminClient extends AbstractHoodieClient {

-  private static final Logger LOG = LoggerFactory.getLogger(CompactionAdminClient.class);
+  private static final Logger LOG = LogManager.getLogger(CompactionAdminClient.class);

   public CompactionAdminClient(JavaSparkContext jsc, String basePath) {
     super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build());
@@ -358,14 +358,13 @@ private List<RenameOpResult> runRenamingOps(HoodieTableMetaClient metaClient,
     if (!dryRun) {
       return jsc.parallelize(renameActions, parallelism).map(lfPair -> {
         try {
-          LOG.info("RENAME {} => {}", lfPair.getLeft().getPath(), lfPair.getRight().getPath());
+          LOG.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath());
           renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight());
           return new RenameOpResult(lfPair, true, Option.empty());
         } catch (IOException e) {
           LOG.error("Error renaming log file", e);
-          LOG.error("\n\n\n***NOTE Compaction is in inconsistent state. "
-              + "Try running \"compaction repair {} \" to recover from failure ***\n\n\n",
-              lfPair.getLeft().getBaseCommitTime());
+          LOG.error("\n\n\n***NOTE Compaction is in inconsistent state. Try running \"compaction repair "
+              + lfPair.getLeft().getBaseCommitTime() + "\" to recover from failure ***\n\n\n");
           return new RenameOpResult(lfPair, false, Option.of(e));
         }
       }).collect();
@@ -396,7 +395,7 @@ protected List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedu
     HoodieCompactionPlan plan = getCompactionPlan(metaClient, compactionInstant);
     if (plan.getOperations() != null) {
       LOG.info(
-          "Number of Compaction Operations :{} for instant :{}", plan.getOperations().size(), compactionInstant);
+          "Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant);
       List<CompactionOperation> ops = plan.getOperations().stream()
           .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
       return jsc.parallelize(ops, parallelism).flatMap(op -> {
@@ -410,7 +409,7 @@ protected List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedu
         }
       }).collect();
     }
-    LOG.warn("No operations for compaction instant : {}", compactionInstant);
+    LOG.warn("No operations for compaction instant : " + compactionInstant);
     return new ArrayList<>();
   }

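The same substitution repeats in the remaining files: SLF4J's parameterized calls (message templates with {} placeholders) become log4j 1.x calls that build the message by string concatenation. As a reference, here is a minimal standalone sketch of that target pattern; the ExampleClient class and its rename method are invented for illustration and are not part of this pull request:

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class ExampleClient {

  // Logger obtained through log4j 1.x LogManager, as in the classes touched by this PR
  private static final Logger LOG = LogManager.getLogger(ExampleClient.class);

  public void rename(String from, String to) {
    // log4j 1.x has no {} placeholder support, so the message is concatenated eagerly
    LOG.info("RENAME " + from + " => " + to);
  }
}
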
hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java (16 changes: 8 additions & 8 deletions)

@@ -39,16 +39,16 @@
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.List;

 public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {

-  private static final Logger LOG = LoggerFactory.getLogger(HoodieCleanClient.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieCleanClient.class);
   private final transient HoodieMetrics metrics;

   public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics) {
@@ -85,7 +85,7 @@ protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOExcept

     // If there are inflight(failed) or previously requested clean operation, first perform them
     table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> {
-      LOG.info("There were previously unfinished cleaner operations. Finishing Instant={}", hoodieInstant);
+      LOG.info("There were previously unfinished cleaner operations. Finishing Instant=" + hoodieInstant);
       runClean(table, hoodieInstant);
     });

@@ -122,7 +122,7 @@ protected Option<HoodieCleanerPlan> scheduleClean(String startCleanTime) {
     // Save to both aux and timeline folder
     try {
       table.getActiveTimeline().saveToCleanRequested(cleanInstant, AvroUtils.serializeCleanerPlan(cleanerPlan));
-      LOG.info("Requesting Cleaning with instant time {}", cleanInstant);
+      LOG.info("Requesting Cleaning with instant time " + cleanInstant);
     } catch (IOException e) {
       LOG.error("Got exception when saving cleaner requested file", e);
       throw new HoodieIOException(e.getMessage(), e);
@@ -173,20 +173,20 @@ private HoodieCleanMetadata runClean(HoodieTable<T> table, HoodieInstant cleanIn
       Option<Long> durationInMs = Option.empty();
       if (context != null) {
         durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
-        LOG.info("cleanerElaspsedTime (Minutes): {}", durationInMs.get() / (1000 * 60));
+        LOG.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60));
       }

       HoodieTableMetaClient metaClient = createMetaClient(true);
       // Create the metadata and save it
       HoodieCleanMetadata metadata =
           CleanerUtils.convertCleanMetadata(metaClient, cleanInstant.getTimestamp(), durationInMs, cleanStats);
-      LOG.info("Cleaned {} files. Earliest Retained : {}", metadata.getTotalFilesDeleted(), metadata.getEarliestCommitToRetain());
+      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files. Earliest Retained :" + metadata.getEarliestCommitToRetain());
       metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted());

       table.getActiveTimeline().transitionCleanInflightToComplete(
           new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant.getTimestamp()),
           AvroUtils.serializeCleanMetadata(metadata));
-      LOG.info("Marked clean started on {} as complete", cleanInstant.getTimestamp());
+      LOG.info("Marked clean started on " + cleanInstant.getTimestamp() + " as complete");
       return metadata;
     } catch (IOException e) {
       throw new HoodieIOException("Failed to clean up after commit", e);

HoodieReadClient.java

@@ -35,6 +35,8 @@
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;

+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -49,16 +51,14 @@
 import java.util.Set;
 import java.util.stream.Collectors;

-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Tuple2;

 /**
  * Provides an RDD based API for accessing/filtering Hoodie tables, based on keys.
  */
 public class HoodieReadClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {

-  private static final Logger LOG = LoggerFactory.getLogger(HoodieReadClient.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieReadClient.class);

   /**
    * TODO: We need to persist the index type into hoodie.properties and be able to access the index just with a simple