Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@ public String compactionsAll(
try {
// This could be a completed compaction. Assume a compaction request file is present but skip if fails
compactionPlan = AvroUtils.deserializeCompactionPlan(
activeTimeline.readPlanAsBytes(
activeTimeline.readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
} catch (HoodieIOException ioe) {
// SKIP
}
} else {
compactionPlan = AvroUtils.deserializeCompactionPlan(activeTimeline.readPlanAsBytes(
compactionPlan = AvroUtils.deserializeCompactionPlan(activeTimeline.readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
}

Expand Down Expand Up @@ -156,7 +156,7 @@ public String compactionShow(
HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
activeTimeline.readPlanAsBytes(
activeTimeline.readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());

List<Comparable[]> rows = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public List<RenameOpResult> repairCompaction(String compactionInstant, int paral
private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, String compactionInstant)
throws IOException {
return AvroUtils.deserializeCompactionPlan(
metaClient.getActiveTimeline().readPlanAsBytes(
metaClient.getActiveTimeline().readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,7 @@ public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus>
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
timeline.readPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
timeline.readCompactionPlanAsBytes(HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
// Merge extra meta-data passed by user with the one already in inflight compaction
Option<Map<String, String>> mergedMetaData = extraMetadata.map(m -> {
Map<String, String> merged = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -279,18 +280,28 @@ public Option<byte[]> getInstantDetails(HoodieInstant instant) {
return readDataFromPath(detailPath);
}

public Option<byte[]> readCleanerInfoAsBytes(HoodieInstant instant) {
// Cleaner metadata are always stored only in timeline .hoodie
return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
}

//-----------------------------------------------------------------
// BEGIN - COMPACTION RELATED META-DATA MANAGEMENT.
//-----------------------------------------------------------------

public Option<byte[]> readPlanAsBytes(HoodieInstant instant) {
Path detailPath = null;
if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
detailPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
} else {
detailPath = new Path(metaClient.getMetaPath(), instant.getFileName());
public Option<byte[]> readCompactionPlanAsBytes(HoodieInstant instant) {
try {
// Reading from auxiliary path first. In future release, we will cleanup compaction management
// to only write to timeline and skip auxiliary and this code will be able to handle it.
return readDataFromPath(new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName()));
} catch (HoodieIOException e) {
// This will be removed in future release. See HUDI-546
if (e.getIOException() instanceof FileNotFoundException) {
return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName()));
} else {
throw e;
}
}
return readDataFromPath(detailPath);
}

/**
Expand Down Expand Up @@ -344,14 +355,9 @@ public HoodieInstant transitionCompactionInflightToComplete(HoodieInstant inflig
}

private void createFileInAuxiliaryFolder(HoodieInstant instant, Option<byte[]> data) {
if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
/**
* For latest version, since we write immutable files directly in timeline directory, there is no need to write
* additional immutable files in .aux folder
*/
Path fullPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
createFileInPath(fullPath, data);
}
// This will be removed in future release. See HUDI-546
Path fullPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
createFileInPath(fullPath, data);
}

//-----------------------------------------------------------------
Expand All @@ -369,8 +375,6 @@ public HoodieInstant transitionCleanInflightToComplete(HoodieInstant inflightIns
Preconditions.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
Preconditions.checkArgument(inflightInstant.isInflight());
HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, CLEAN_ACTION, inflightInstant.getTimestamp());
// First write metadata to aux folder
createFileInAuxiliaryFolder(commitInstant, data);
// Then write to timeline
transitionState(inflightInstant, commitInstant, data);
return commitInstant;
Expand Down Expand Up @@ -471,8 +475,6 @@ public void saveToCompactionRequested(HoodieInstant instant, Option<byte[]> cont
public void saveToCleanRequested(HoodieInstant instant, Option<byte[]> content) {
Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
Preconditions.checkArgument(instant.getState().equals(State.REQUESTED));
// Write workload to auxiliary folder
createFileInAuxiliaryFolder(instant, content);
// Plan is stored in meta path
createFileInMetaPath(instant.getFileName(), content, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient metaC
throws IOException {
CleanMetadataMigrator metadataMigrator = new CleanMetadataMigrator(metaClient);
HoodieCleanMetadata cleanMetadata = AvroUtils.deserializeHoodieCleanMetadata(
metaClient.getActiveTimeline().readPlanAsBytes(cleanInstant).get());
metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get());
return metadataMigrator.upgradeToLatest(cleanMetadata, cleanMetadata.getVersion());
}

Expand All @@ -85,7 +85,7 @@ public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient metaC
*/
public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient metaClient, HoodieInstant cleanInstant)
throws IOException {
return AvroUtils.deserializeAvroMetadata(metaClient.getActiveTimeline().readPlanAsBytes(cleanInstant).get(),
return AvroUtils.deserializeAvroMetadata(metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get(),
HoodieCleanerPlan.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaC
throws IOException {
CompactionPlanMigrator migrator = new CompactionPlanMigrator(metaClient);
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
metaClient.getActiveTimeline().readPlanAsBytes(
metaClient.getActiveTimeline().readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstant)).get());
return migrator.upgradeToLatest(compactionPlan, compactionPlan.getVersion());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

package org.apache.hudi.common.table.string;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.HoodieCommonTestHarness;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.model.TimelineLayoutVersion;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
Expand All @@ -31,6 +34,7 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.util.stream.Stream;

import static org.junit.Assert.assertEquals;
Expand All @@ -52,7 +56,7 @@ public void setUp() throws Exception {
}

@Test
public void testLoadingInstantsFromFiles() {
public void testLoadingInstantsFromFiles() throws IOException {
HoodieInstant instant1 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "1");
HoodieInstant instant2 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "3");
HoodieInstant instant3 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "5");
Expand Down Expand Up @@ -96,6 +100,30 @@ public void testLoadingInstantsFromFiles() {
timeline.getCommitTimeline().filterCompletedInstants().getInstants());
HoodieTestUtils.assertStreamEquals("Check the instants stream", Stream.of(instant5),
timeline.getCommitTimeline().filterPendingExcludingCompaction().getInstants());

// Backwards compatibility testing for reading compaction plans
HoodieInstant instant6 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "9");
byte[] dummy = new byte[5];
HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline(new HoodieTableMetaClient(metaClient.getHadoopConf(),
metaClient.getBasePath(), true, metaClient.getConsistencyGuardConfig(),
Option.of(new TimelineLayoutVersion(TimelineLayoutVersion.VERSION_0))));
// Old Timeline writes both to aux and timeline folder
oldTimeline.saveToCompactionRequested(instant6, Option.of(dummy));
// Now use latest timeline version
timeline = timeline.reload();
// Ensure aux file is present
assertTrue(metaClient.getFs().exists(new Path(metaClient.getMetaAuxiliaryPath(), instant6.getFileName())));
// Read 5 bytes
assertEquals(timeline.readCompactionPlanAsBytes(instant6).get().length, 5);

// Delete auxiliary file to mimic future release where we stop writing to aux
metaClient.getFs().delete(new Path(metaClient.getMetaAuxiliaryPath(), instant6.getFileName()));

// Ensure requested instant is not present in aux
assertFalse(metaClient.getFs().exists(new Path(metaClient.getMetaAuxiliaryPath(), instant6.getFileName())));

// Now read compaction plan again which should not throw exception
assertEquals(timeline.readCompactionPlanAsBytes(instant6).get().length, 5);
}

@Test
Expand Down