@@ -67,7 +67,7 @@
 @Component
 public class CompactionCommand implements CommandMarker {

-  private static Logger log = LogManager.getLogger(CompactionCommand.class);
+  private static final Logger LOG = LogManager.getLogger(CompactionCommand.class);

   private static final String TMP_DIR = "/tmp/";

@@ -249,7 +249,7 @@ private <T> T deSerializeOperationResult(String inputP, FileSystem fs) throws Ex
     ObjectInputStream in = new ObjectInputStream(fsDataInputStream);
     try {
       T result = (T) in.readObject();
-      log.info("Result : " + result);
+      LOG.info("Result : " + result);
       return result;
     } finally {
       in.close();
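Every hunk in this change applies the same convention, so it is worth showing once in isolation: the per-class logger becomes a private static final field named LOG. The sketch below is illustrative only; the ExampleCommand class and the org.apache.log4j imports are assumptions, since the hunks here do not show any file's import section.

```java
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class ExampleCommand {

  // One logger per class, following the convention the diff converges on:
  // - static: a single instance shared by the class
  // - final: the reference is never reassigned
  // - private: the logger is not part of the class's API
  // - LOG: constant-style name for a constant-like field
  private static final Logger LOG = LogManager.getLogger(ExampleCommand.class);

  public void run() {
    LOG.info("ExampleCommand started");
  }
}
```

Making the field final prevents accidental reassignment, and the constant-style name matches how the other static final fields visible in these hunks (TMP_DIR, DEFUALT_SPARK_MASTER) are already named.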
@@ -41,7 +41,7 @@
 @Component
 public class HDFSParquetImportCommand implements CommandMarker {

-  private static Logger log = LogManager.getLogger(HDFSParquetImportCommand.class);
+  private static final Logger LOG = LogManager.getLogger(HDFSParquetImportCommand.class);

   @CliCommand(value = "hdfsparquetimport", help = "Imports Parquet dataset to a hoodie dataset")
   public String convert(
@@ -41,7 +41,7 @@
  */
 public class SparkMain {

-  protected static final Logger LOG = Logger.getLogger(SparkMain.class);
+  private static final Logger LOG = Logger.getLogger(SparkMain.class);

   /**
    * Commands.
@@ -29,7 +29,7 @@
  */
 public class InputStreamConsumer extends Thread {

-  protected static final Logger LOG = Logger.getLogger(InputStreamConsumer.class.getName());
+  private static final Logger LOG = Logger.getLogger(InputStreamConsumer.class.getName());
   private InputStream is;

   public InputStreamConsumer(InputStream is) {
@@ -36,7 +36,7 @@
  */
 public class SparkUtil {

-  public static Logger logger = Logger.getLogger(SparkUtil.class);
+  private static final Logger LOG = Logger.getLogger(SparkUtil.class);
   public static final String DEFUALT_SPARK_MASTER = "yarn-client";

   /**
@@ -65,7 +65,7 @@
  */
 public class CompactionAdminClient extends AbstractHoodieClient {

-  private static Logger log = LogManager.getLogger(CompactionAdminClient.class);
+  private static final Logger LOG = LogManager.getLogger(CompactionAdminClient.class);

   public CompactionAdminClient(JavaSparkContext jsc, String basePath) {
     super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build());
@@ -350,25 +350,25 @@ private ValidationOpResult validateCompactionOperation(HoodieTableMetaClient met
   private List<RenameOpResult> runRenamingOps(HoodieTableMetaClient metaClient,
       List<Pair<HoodieLogFile, HoodieLogFile>> renameActions, int parallelism, boolean dryRun) {
     if (renameActions.isEmpty()) {
-      log.info("No renaming of log-files needed. Proceeding to removing file-id from compaction-plan");
+      LOG.info("No renaming of log-files needed. Proceeding to removing file-id from compaction-plan");
       return new ArrayList<>();
     } else {
-      log.info("The following compaction renaming operations needs to be performed to un-schedule");
+      LOG.info("The following compaction renaming operations needs to be performed to un-schedule");
       if (!dryRun) {
         return jsc.parallelize(renameActions, parallelism).map(lfPair -> {
           try {
-            log.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath());
+            LOG.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath());
             renameLogFile(metaClient, lfPair.getLeft(), lfPair.getRight());
             return new RenameOpResult(lfPair, true, Option.empty());
           } catch (IOException e) {
-            log.error("Error renaming log file", e);
-            log.error("\n\n\n***NOTE Compaction is in inconsistent state. Try running \"compaction repair "
+            LOG.error("Error renaming log file", e);
+            LOG.error("\n\n\n***NOTE Compaction is in inconsistent state. Try running \"compaction repair "
                 + lfPair.getLeft().getBaseCommitTime() + "\" to recover from failure ***\n\n\n");
             return new RenameOpResult(lfPair, false, Option.of(e));
           }
         }).collect();
       } else {
-        log.info("Dry-Run Mode activated for rename operations");
+        LOG.info("Dry-Run Mode activated for rename operations");
         return renameActions.parallelStream().map(lfPair -> new RenameOpResult(lfPair, false, false, Option.empty()))
             .collect(Collectors.toList());
       }
@@ -393,7 +393,7 @@ protected List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedu
         : new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
     HoodieCompactionPlan plan = getCompactionPlan(metaClient, compactionInstant);
     if (plan.getOperations() != null) {
-      log.info(
+      LOG.info(
           "Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant);
       List<CompactionOperation> ops = plan.getOperations().stream()
           .map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
@@ -408,7 +408,7 @@ protected List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedu
         }
       }).collect();
     }
-    log.warn("No operations for compaction instant : " + compactionInstant);
+    LOG.warn("No operations for compaction instant : " + compactionInstant);
    return new ArrayList<>();
  }

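Beyond the rename, the runRenamingOps hunk above also illustrates the dry-run pattern used in this client: when dryRun is true the planned renames are only reported, and the actual file-system rename happens only on the non-dry-run path. The following is a condensed, self-contained sketch of that control flow; the String pairs and the placeholder rename helper are hypothetical stand-ins for Hudi's Pair<HoodieLogFile, HoodieLogFile> and renameLogFile, not the real API.

```java
import java.util.ArrayList;
import java.util.List;

public class DryRunSketch {

  // Execute the planned renames, or merely report them when dryRun is true.
  static List<String> run(List<String[]> renameActions, boolean dryRun) {
    List<String> results = new ArrayList<>();
    for (String[] pair : renameActions) {
      if (dryRun) {
        // Dry run: report the plan, touch nothing on storage.
        results.add("WOULD RENAME " + pair[0] + " => " + pair[1]);
      } else {
        try {
          rename(pair[0], pair[1]); // side effect only on the real path
          results.add("RENAMED " + pair[0] + " => " + pair[1]);
        } catch (Exception e) {
          results.add("FAILED " + pair[0] + " => " + pair[1] + ": " + e.getMessage());
        }
      }
    }
    return results;
  }

  private static void rename(String from, String to) throws Exception {
    // Placeholder for the actual file-system rename.
  }
}
```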
hudi-client/src/main/java/org/apache/hudi/HoodieCleanClient.java (16 changes: 8 additions & 8 deletions)
@@ -48,7 +48,7 @@

 public class HoodieCleanClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {

-  private static Logger logger = LogManager.getLogger(HoodieCleanClient.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieCleanClient.class);
   private final transient HoodieMetrics metrics;

   public HoodieCleanClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, HoodieMetrics metrics) {
@@ -85,7 +85,7 @@ protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOExcept

     // If there are inflight(failed) or previously requested clean operation, first perform them
     table.getCleanTimeline().filterInflightsAndRequested().getInstants().forEach(hoodieInstant -> {
-      logger.info("There were previously unfinished cleaner operations. Finishing Instant=" + hoodieInstant);
+      LOG.info("There were previously unfinished cleaner operations. Finishing Instant=" + hoodieInstant);
       runClean(table, hoodieInstant.getTimestamp());
     });

@@ -122,9 +122,9 @@ protected Option<HoodieCleanerPlan> scheduleClean(String startCleanTime) {
     // Save to both aux and timeline folder
     try {
       table.getActiveTimeline().saveToCleanRequested(cleanInstant, AvroUtils.serializeCleanerPlan(cleanerPlan));
-      logger.info("Requesting Cleaning with instant time " + cleanInstant);
+      LOG.info("Requesting Cleaning with instant time " + cleanInstant);
     } catch (IOException e) {
-      logger.error("Got exception when saving cleaner requested file", e);
+      LOG.error("Got exception when saving cleaner requested file", e);
       throw new HoodieIOException(e.getMessage(), e);
     }
     return Option.of(cleanerPlan);
@@ -147,7 +147,7 @@ protected HoodieCleanMetadata runClean(HoodieTable<T> table, String cleanInstant
         cleanInstant.getState().equals(State.REQUESTED) || cleanInstant.getState().equals(State.INFLIGHT));

     try {
-      logger.info("Cleaner started");
+      LOG.info("Cleaner started");
       final Timer.Context context = metrics.getCleanCtx();

       if (!cleanInstant.isInflight()) {
@@ -165,20 +165,20 @@ protected HoodieCleanMetadata runClean(HoodieTable<T> table, String cleanInstant
       Option<Long> durationInMs = Option.empty();
       if (context != null) {
         durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
-        logger.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60));
+        LOG.info("cleanerElaspsedTime (Minutes): " + durationInMs.get() / (1000 * 60));
       }

       HoodieTableMetaClient metaClient = createMetaClient(true);
       // Create the metadata and save it
       HoodieCleanMetadata metadata =
           CleanerUtils.convertCleanMetadata(metaClient, cleanInstant.getTimestamp(), durationInMs, cleanStats);
-      logger.info("Cleaned " + metadata.getTotalFilesDeleted() + " files. Earliest Retained :" + metadata.getEarliestCommitToRetain());
+      LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files. Earliest Retained :" + metadata.getEarliestCommitToRetain());
       metrics.updateCleanMetrics(durationInMs.orElseGet(() -> -1L), metadata.getTotalFilesDeleted());

       table.getActiveTimeline().transitionCleanInflightToComplete(
           new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant.getTimestamp()),
           AvroUtils.serializeCleanMetadata(metadata));
-      logger.info("Marked clean started on " + cleanInstant.getTimestamp() + " as complete");
+      LOG.info("Marked clean started on " + cleanInstant.getTimestamp() + " as complete");
       return metadata;
     } catch (IOException e) {
       throw new HoodieIOException("Failed to clean up after commit", e);
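The runClean hunk above times the clean via metrics.getCleanCtx() and metrics.getDurationInMs(context.stop()). The Timer.Context type suggests a Dropwizard Metrics timer behind HoodieMetrics, but HoodieMetrics itself is not part of this diff, so the following is only a sketch of the general timing pattern, assuming com.codahale.metrics.Timer; the registry name and the nanosecond-to-millisecond conversion are assumptions about what getDurationInMs does.

```java
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import java.util.concurrent.TimeUnit;

public class CleanTimingSketch {

  public static void main(String[] args) throws InterruptedException {
    MetricRegistry registry = new MetricRegistry();
    Timer cleanTimer = registry.timer("clean.duration"); // hypothetical metric name

    // Start timing, do the work, then stop() returns the elapsed time in nanoseconds.
    Timer.Context context = cleanTimer.time();
    Thread.sleep(50); // stand-in for the actual clean work
    long elapsedNanos = context.stop();

    // Convert to milliseconds, mirroring how the elapsed time is logged above.
    long durationInMs = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    System.out.println("cleanerElapsedTime (Minutes): " + durationInMs / (1000 * 60));
  }
}
```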