Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -72,7 +71,6 @@ public class ConsistentBucketIndexUtils {
* @param table Hoodie table
* @param partition Table partition
* @param numBuckets Default bucket number
*
* @return Consistent hashing metadata
*/
public static HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable table, String partition, int numBuckets) {
Expand All @@ -85,7 +83,7 @@ public static HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable t

// There is no metadata, so try to create a new one and save it.
HoodieConsistentHashingMetadata metadata = new HoodieConsistentHashingMetadata(partition, numBuckets);
if (saveMetadata(table, metadata, false)) {
if (saveMetadata(table, metadata)) {
return metadata;
}

Expand Down Expand Up @@ -177,25 +175,22 @@ public static Option<HoodieConsistentHashingMetadata> loadMetadata(HoodieTable t
/**
* Saves the metadata into storage
*
* @param table Hoodie table
* @param metadata Hashing metadata to be saved
* @param overwrite Whether to overwrite existing metadata
* @param table Hoodie table
* @param metadata Hashing metadata to be saved
* @return true if the metadata is saved successfully
*/
public static boolean saveMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, boolean overwrite) {
public static boolean saveMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata) {
HoodieStorage storage = table.getStorage();
StoragePath dir = FSUtils.constructAbsolutePath(
table.getMetaClient().getHashingMetadataPath(), metadata.getPartitionPath());
StoragePath fullPath = new StoragePath(dir, metadata.getFilename());
try (OutputStream out = storage.create(fullPath, overwrite)) {
byte[] bytes = metadata.toBytes();
out.write(bytes);
out.close();
try {
storage.createImmutableFileInPath(fullPath, Option.of(metadata.toBytes()));
return true;
} catch (IOException e) {
LOG.warn("Failed to update bucket metadata: " + metadata, e);
return false;
}
return false;
}

/***
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatu
.collect(Collectors.toList());
HoodieConsistentHashingMetadata newMeta = new HoodieConsistentHashingMetadata(meta.getVersion(), meta.getPartitionPath(),
instantTime, meta.getNumBuckets(), seqNo + 1, newNodes);
ConsistentBucketIndexUtils.saveMetadata(hoodieTable, newMeta, true);
if (!ConsistentBucketIndexUtils.saveMetadata(hoodieTable, newMeta)) {
throw new HoodieIndexException("Failed to save metadata for partition: " + partition);
}
});

return writeStatuses;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,6 @@ public final void createImmutableFileInPath(StoragePath path,
StoragePath tmpPath = null;

boolean needTempFile = needCreateTempFile();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need an explicit flag for this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need an explicit flag for this?

HoodieStorage::needCreateTempFile only be true for HDFS, but for other fs, such as local fs, we should also avoid visiting the intermediate state of the file. So completely, regardless of the underlying file system, we will always use the temp file method to create files in this scenario.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have assumption that the storage has two categories: HDFS and Object Store, the later should always be atomic for file creation so that the renaming is only needful for HDFS.

If you think there is other fs system we need to support, at least to fix the HoodieStorage::needCreateTempFile itself.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have assumption that the storage has two categories: HDFS and Object Store, the later should always be atomic for file creation so that the renaming is only needful for HDFS.

If you think there is other fs system we need to support, at least to fix the HoodieStorage::needCreateTempFile itself.

I fix HoodieStorage::needCreateTempFile to for all storage without write-transaction should create temp file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can not do this, the flag isWriteTransactional does not set up right for many object stores. Let's just keep it as it is, local fs is only for testing purpose so we should be good.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can not do this, the flag isWriteTransactional does not set up right for many object stores. Let's just keep it as it is, local fs is only for testing purpose so we should be good.

Got it! But can I just add a judgment of whether it's local fs or not? Because ut's tests rely on local fs, if we do not add this judgment, ut will have unexpected logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not change this logic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not change this logic.

Ok, I have restored the original logic~


try {
if (!content.isPresent()) {
fsout = create(path, false);
Expand Down
Loading