Merged
11 changes: 5 additions & 6 deletions hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
@@ -869,12 +869,11 @@ private void startCommit(String instantTime) {
     LOG.info("Generate a new instant time " + instantTime);
     HoodieTableMetaClient metaClient = createMetaClient(true);
     // if there are pending compactions, their instantTime must not be greater than that of this instant time
-    metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending -> {
-      Preconditions.checkArgument(
-          HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER),
-          "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
-              + latestPending + ", Ingesting at " + instantTime);
-    });
+    metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending ->
+        Preconditions.checkArgument(
+            HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER),
+            "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
+                + latestPending + ", Ingesting at " + instantTime));
     HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     String commitActionType = table.getMetaClient().getCommitActionType();
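The hunk above only converts a statement lambda to an expression lambda; the ordering guard itself is unchanged. Below is a minimal sketch of that invariant with Hudi's timeline types replaced by plain strings, assuming instant times are fixed-width "yyyyMMddHHmmss" stamps (as COMMIT_FORMATTER further down this diff suggests), so lexicographic order matches chronological order.

```java
import java.util.Optional;

public class CommitGuardSketch {

  static void checkNoLaterPendingCompaction(Optional<String> latestPendingCompaction, String newInstantTime) {
    latestPendingCompaction.ifPresent(pending -> {
      // Lexicographic comparison works because instant times are fixed-width digit strings.
      if (pending.compareTo(newInstantTime) >= 0) {
        throw new IllegalArgumentException("Latest pending compaction instant time must be earlier than "
            + "this instant time. Latest Compaction: " + pending + ", Ingesting at " + newInstantTime);
      }
    });
  }

  public static void main(String[] args) {
    checkNoLaterPendingCompaction(Optional.of("20190101000000"), "20190102000000"); // passes silently
    try {
      checkNoLaterPendingCompaction(Optional.of("20190103000000"), "20190102000000");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // pending compaction is newer: rejected
    }
  }
}
```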
@@ -49,15 +49,13 @@ public class MercifulJsonConverter {
    * Build type processor map for each avro type.
    */
   private static Map<Schema.Type, JsonToAvroFieldProcessor> getFieldTypeProcessors() {
-    Map<Schema.Type, JsonToAvroFieldProcessor> processorMap =
-        new ImmutableMap.Builder<Schema.Type, JsonToAvroFieldProcessor>().put(Type.STRING, generateStringTypeHandler())
-            .put(Type.BOOLEAN, generateBooleanTypeHandler()).put(Type.DOUBLE, generateDoubleTypeHandler())
-            .put(Type.FLOAT, generateFloatTypeHandler()).put(Type.INT, generateIntTypeHandler())
-            .put(Type.LONG, generateLongTypeHandler()).put(Type.ARRAY, generateArrayTypeHandler())
-            .put(Type.RECORD, generateRecordTypeHandler()).put(Type.ENUM, generateEnumTypeHandler())
-            .put(Type.MAP, generateMapTypeHandler()).put(Type.BYTES, generateBytesTypeHandler())
-            .put(Type.FIXED, generateFixedTypeHandler()).build();
-    return processorMap;
+    return new ImmutableMap.Builder<Type, JsonToAvroFieldProcessor>().put(Type.STRING, generateStringTypeHandler())
+        .put(Type.BOOLEAN, generateBooleanTypeHandler()).put(Type.DOUBLE, generateDoubleTypeHandler())
+        .put(Type.FLOAT, generateFloatTypeHandler()).put(Type.INT, generateIntTypeHandler())
+        .put(Type.LONG, generateLongTypeHandler()).put(Type.ARRAY, generateArrayTypeHandler())
+        .put(Type.RECORD, generateRecordTypeHandler()).put(Type.ENUM, generateEnumTypeHandler())
+        .put(Type.MAP, generateMapTypeHandler()).put(Type.BYTES, generateBytesTypeHandler())
+        .put(Type.FIXED, generateFixedTypeHandler()).build();
   }
 
   /**
@@ -286,7 +284,7 @@ private static JsonToAvroFieldProcessor generateArrayTypeHandler() {
       public Pair<Boolean, Object> convert(Object value, String name, Schema schema)
           throws HoodieJsonToAvroConversionException {
         Schema elementSchema = schema.getElementType();
-        List listRes = new ArrayList();
+        List<Object> listRes = new ArrayList<>();
         for (Object v : (List) value) {
           listRes.add(convertJsonToAvroField(v, name, elementSchema));
         }
@@ -301,7 +299,7 @@ private static JsonToAvroFieldProcessor generateMapTypeHandler() {
       public Pair<Boolean, Object> convert(Object value, String name, Schema schema)
           throws HoodieJsonToAvroConversionException {
         Schema valueSchema = schema.getValueType();
-        Map<String, Object> mapRes = new HashMap<String, Object>();
+        Map<String, Object> mapRes = new HashMap<>();
         for (Map.Entry<String, Object> v : ((Map<String, Object>) value).entrySet()) {
           mapRes.put(v.getKey(), convertJsonToAvroField(v.getValue(), name, valueSchema));
         }
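The first hunk drops the processorMap temporary and returns the builder chain directly (also shortening the builder's type argument to the imported Type); the later hunks replace raw types with generics and the diamond operator. A hedged sketch of the same dispatch-map pattern, where Kind and handlers() are illustrative names, not Hudi's:

```java
import com.google.common.collect.ImmutableMap;

import java.util.Map;
import java.util.function.Function;

public class DispatchMapSketch {

  enum Kind { STRING, INT }

  static Map<Kind, Function<String, Object>> handlers() {
    // No temporary local: the builder result is the return value.
    return new ImmutableMap.Builder<Kind, Function<String, Object>>()
        .put(Kind.STRING, s -> s)
        .put(Kind.INT, Integer::parseInt)
        .build();
  }

  public static void main(String[] args) {
    System.out.println(handlers().get(Kind.INT).apply("42")); // 42
  }
}
```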
@@ -98,8 +98,7 @@ public void add(Key key) {
 
   @Override
   public void and(InternalFilter filter) {
-    if (filter == null
-        || !(filter instanceof InternalDynamicBloomFilter)
+    if (!(filter instanceof InternalDynamicBloomFilter)
         || filter.vectorSize != this.vectorSize
         || filter.nbHash != this.nbHash) {
       throw new IllegalArgumentException("filters cannot be and-ed");
@@ -122,8 +121,8 @@ public boolean membershipTest(Key key) {
       return true;
     }
 
-    for (int i = 0; i < matrix.length; i++) {
-      if (matrix[i].membershipTest(key)) {
+    for (BloomFilter bloomFilter : matrix) {
+      if (bloomFilter.membershipTest(key)) {
         return true;
       }
     }
@@ -133,15 +132,14 @@ public boolean membershipTest(Key key) {
 
   @Override
   public void not() {
-    for (int i = 0; i < matrix.length; i++) {
-      matrix[i].not();
+    for (BloomFilter bloomFilter : matrix) {
+      bloomFilter.not();
     }
   }
 
   @Override
   public void or(InternalFilter filter) {
-    if (filter == null
-        || !(filter instanceof InternalDynamicBloomFilter)
+    if (!(filter instanceof InternalDynamicBloomFilter)
         || filter.vectorSize != this.vectorSize
         || filter.nbHash != this.nbHash) {
       throw new IllegalArgumentException("filters cannot be or-ed");
@@ -159,8 +157,7 @@ public void or(InternalFilter filter) {
 
   @Override
   public void xor(InternalFilter filter) {
-    if (filter == null
-        || !(filter instanceof InternalDynamicBloomFilter)
+    if (!(filter instanceof InternalDynamicBloomFilter)
         || filter.vectorSize != this.vectorSize
         || filter.nbHash != this.nbHash) {
       throw new IllegalArgumentException("filters cannot be xor-ed");
@@ -180,8 +177,8 @@ public void xor(InternalFilter filter) {
   public String toString() {
     StringBuilder res = new StringBuilder();
 
-    for (int i = 0; i < matrix.length; i++) {
-      res.append(matrix[i]);
+    for (BloomFilter bloomFilter : matrix) {
+      res.append(bloomFilter);
       res.append(Character.LINE_SEPARATOR);
     }
     return res.toString();
@@ -195,8 +192,8 @@ public void write(DataOutput out) throws IOException {
     out.writeInt(nr);
     out.writeInt(currentNbRecord);
     out.writeInt(matrix.length);
-    for (int i = 0; i < matrix.length; i++) {
-      matrix[i].write(out);
+    for (BloomFilter bloomFilter : matrix) {
+      bloomFilter.write(out);
     }
   }
 
@@ -217,13 +214,9 @@ public void readFields(DataInput in) throws IOException {
    * Adds a new row to <i>this</i> dynamic Bloom filter.
    */
   private void addRow() {
-    org.apache.hadoop.util.bloom.BloomFilter[] tmp = new org.apache.hadoop.util.bloom.BloomFilter[matrix.length + 1];
-
-    for (int i = 0; i < matrix.length; i++) {
-      tmp[i] = matrix[i];
-    }
-
-    tmp[tmp.length - 1] = new org.apache.hadoop.util.bloom.BloomFilter(vectorSize, nbHash, hashType);
+    BloomFilter[] tmp = new BloomFilter[matrix.length + 1];
+    System.arraycopy(matrix, 0, tmp, 0, matrix.length);
+    tmp[tmp.length - 1] = new BloomFilter(vectorSize, nbHash, hashType);
     matrix = tmp;
   }
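Three things happen above: the redundant `filter == null` guards disappear (`instanceof` already evaluates to false for null, so behavior is identical), indexed loops become for-each, and addRow() grows the matrix with System.arraycopy instead of a hand-written copy loop. A minimal sketch of that growth pattern, with int[] standing in for the BloomFilter[] matrix:

```java
import java.util.Arrays;

public class GrowArraySketch {

  static int[] addRow(int[] matrix, int newValue) {
    int[] tmp = new int[matrix.length + 1];
    System.arraycopy(matrix, 0, tmp, 0, matrix.length); // bulk copy of the old rows
    tmp[tmp.length - 1] = newValue;                      // append the new row
    return tmp;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(addRow(new int[] {1, 2}, 3))); // [1, 2, 3]
  }
}
```

Arrays.copyOf(matrix, matrix.length + 1) would collapse the first two lines further; the hunk keeps the explicit arraycopy.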
@@ -41,7 +41,7 @@
 
 public class SimpleBloomFilter implements BloomFilter {
 
-  private org.apache.hadoop.util.bloom.BloomFilter filter = null;
+  private org.apache.hadoop.util.bloom.BloomFilter filter;
 
   /**
    * Create a new Bloom filter with the given configurations.
@@ -114,8 +114,7 @@ private void writeObject(ObjectOutputStream os)
     filter.write(os);
   }
 
-  private void readObject(ObjectInputStream is)
-      throws IOException, ClassNotFoundException {
+  private void readObject(ObjectInputStream is) throws IOException {
     filter = new org.apache.hadoop.util.bloom.BloomFilter();
     filter.readFields(is);
   }
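readObject above can drop ClassNotFoundException because these private hooks are located reflectively by Java serialization rather than overridden, so each may declare only what its body can actually throw, and filter.readFields(is) throws only IOException. A self-contained sketch of the hook pair (the class and field are illustrative, not Hudi's):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationHookSketch implements Serializable {

  private transient StringBuilder state = new StringBuilder("hello");

  private void writeObject(ObjectOutputStream os) throws IOException {
    os.writeUTF(state.toString()); // serialize the transient field by hand
  }

  private void readObject(ObjectInputStream is) throws IOException {
    // Nothing here can throw ClassNotFoundException, so it is not declared --
    // the same reasoning that lets SimpleBloomFilter.readObject drop it.
    state = new StringBuilder(is.readUTF());
  }

  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    ObjectOutputStream out = new ObjectOutputStream(buf);
    out.writeObject(new SerializationHookSketch());
    out.flush();
    SerializationHookSketch copy = (SerializationHookSketch)
        new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray())).readObject();
    System.out.println(copy.state); // hello
  }
}
```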
@@ -114,7 +114,7 @@ public static String getHoodieScheme(String scheme) {
   }
 
   @Override
-  public void initialize(URI uri, Configuration conf) throws IOException {
+  public void initialize(URI uri, Configuration conf) {
     // Get the default filesystem to decorate
     Path path = new Path(uri);
     // Remove 'hoodie-' prefix from path
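Same idea as the readObject change, but for overriding: an overriding method may declare fewer checked exceptions than the method it overrides, so the unthrown IOException can simply be removed. A minimal sketch (Initializable and NeverFails are illustrative names):

```java
import java.io.IOException;

public class NarrowThrowsSketch {

  interface Initializable {
    void initialize(String uri) throws IOException;
  }

  static class NeverFails implements Initializable {
    @Override
    public void initialize(String uri) { // narrower throws clause: legal
      System.out.println("initialized " + uri);
    }
  }

  public static void main(String[] args) throws IOException {
    Initializable i = new NeverFails();
    i.initialize("hoodie-example"); // callers through the interface still see IOException
  }
}
```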
@@ -66,8 +66,8 @@ private static class TimelineLayoutV1 extends TimelineLayout {
     @Override
     public Stream<HoodieInstant> filterHoodieInstants(Stream<HoodieInstant> instantStream) {
       return instantStream.collect(Collectors.groupingBy(instant -> Pair.of(instant.getTimestamp(),
-          HoodieInstant.getComparableAction(instant.getAction())))).entrySet().stream()
-          .map(e -> e.getValue().stream().reduce((x, y) -> {
+          HoodieInstant.getComparableAction(instant.getAction())))).values().stream()
+          .map(hoodieInstants -> hoodieInstants.stream().reduce((x, y) -> {
             // Pick the one with the highest state
             if (x.getState().compareTo(y.getState()) >= 0) {
               return x;
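When only the grouped values matter, values().stream() replaces the entrySet().stream().map(e -> e.getValue()) detour. A compact sketch of the same group-then-reduce shape, with strings standing in for HoodieInstant (first char plays the grouping key, second char the state):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GroupReduceSketch {

  public static void main(String[] args) {
    List<String> latestPerKey = Stream.of("a1", "a3", "b2")
        .collect(Collectors.groupingBy(s -> s.charAt(0))) // Map<Character, List<String>>
        .values().stream()                                // skip the entrySet() detour
        .map(group -> group.stream()
            .reduce((x, y) -> x.charAt(1) >= y.charAt(1) ? x : y) // pick the highest "state"
            .get())                                       // groups are never empty
        .collect(Collectors.toList());
    System.out.println(latestPerKey); // [a3, b2] (HashMap iteration order not guaranteed)
  }
}
```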
@@ -64,10 +64,10 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   public static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss");
 
   public static final Set<String> VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = new HashSet<>(Arrays.asList(
-      new String[]{COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION,
-          INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION,
-          INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION,
-          INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION}));
+      COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION,
+      INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION,
+      INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION,
+      INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION));
 
   private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
   protected HoodieTableMetaClient metaClient;
@@ -79,7 +79,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
    */
   public static String createNewInstantTime() {
     return lastInstantTime.updateAndGet((oldVal) -> {
-      String newCommitTime = null;
+      String newCommitTime;
       do {
         newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date());
       } while (HoodieTimeline.compareTimestamps(newCommitTime, oldVal, LESSER_OR_EQUAL));
@@ -255,7 +255,7 @@ public void deletePending(HoodieInstant instant) {
 
   public void deleteCompactionRequested(HoodieInstant instant) {
     Preconditions.checkArgument(instant.isRequested());
-    Preconditions.checkArgument(instant.getAction() == HoodieTimeline.COMPACTION_ACTION);
+    Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
     deleteInstantFile(instant);
   }
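The last hunk above is the substantive one: == on strings compares references, so deleteCompactionRequested could reject a logically equal action string that is not the same object. equals() compares contents:

```java
public class StringEqualsSketch {

  public static void main(String[] args) {
    String action = new String("compaction"); // e.g. built at runtime from an instant file name
    String constant = "compaction";
    System.out.println(action == constant);      // false: different objects
    System.out.println(action.equals(constant)); // true: same contents
  }
}
```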
@@ -65,7 +65,7 @@ public void setInstants(List<HoodieInstant> instants) {
     final MessageDigest md;
     try {
       md = MessageDigest.getInstance(HASHING_ALGORITHM);
-      this.instants.stream().forEach(i -> md
+      this.instants.forEach(i -> md
           .update(StringUtils.joinUsingDelim("_", i.getTimestamp(), i.getAction(), i.getState().name()).getBytes()));
     } catch (NoSuchAlgorithmException nse) {
       throw new HoodieException(nse);
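List already exposes forEach, so the intermediate .stream() adds nothing. A runnable sketch of the same digest fold ("MD5" is a stand-in here; the actual HASHING_ALGORITHM constant is not visible in this diff):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

public class ForEachDigestSketch {

  public static void main(String[] args) throws NoSuchAlgorithmException {
    List<String> instants = Arrays.asList("001_commit_COMPLETED", "002_clean_INFLIGHT");
    MessageDigest md = MessageDigest.getInstance("MD5");
    instants.forEach(i -> md.update(i.getBytes(StandardCharsets.UTF_8))); // no .stream() needed
    System.out.println(md.digest().length); // 16 (MD5 digest size in bytes)
  }
}
```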
@@ -45,13 +45,13 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
       .put(HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.COMMIT_ACTION).build();
 
   public static final Comparator<HoodieInstant> ACTION_COMPARATOR =
-      Comparator.<HoodieInstant, String>comparing(instant -> getComparableAction(instant.getAction()));
+      Comparator.comparing(instant -> getComparableAction(instant.getAction()));
 
   public static final Comparator<HoodieInstant> COMPARATOR = Comparator.comparing(HoodieInstant::getTimestamp)
       .thenComparing(ACTION_COMPARATOR).thenComparing(HoodieInstant::getState);
 
-  public static final String getComparableAction(String action) {
-    return COMPARABLE_ACTIONS.containsKey(action) ? COMPARABLE_ACTIONS.get(action) : action;
+  public static String getComparableAction(String action) {
+    return COMPARABLE_ACTIONS.getOrDefault(action, action);
   }
 
   /**
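getOrDefault collapses the containsKey/get pair into one lookup, falling back to the key itself when no mapping exists. A tiny sketch:

```java
import java.util.Collections;
import java.util.Map;

public class GetOrDefaultSketch {

  public static void main(String[] args) {
    Map<String, String> comparableActions = Collections.singletonMap("compaction", "commit");
    System.out.println(comparableActions.getOrDefault("compaction", "compaction")); // commit
    System.out.println(comparableActions.getOrDefault("clean", "clean"));           // clean (unmapped)
  }
}
```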
@@ -95,7 +95,7 @@ public SyncableFileSystemView getFileSystemView(String basePath) {
    * Closes all views opened.
    */
   public void close() {
-    this.globalViewMap.values().stream().forEach(v -> v.close());
+    this.globalViewMap.values().forEach(SyncableFileSystemView::close);
    this.globalViewMap.clear();
   }
 
@@ -196,7 +196,7 @@ public static FileSystemViewManager createViewManager(final SerializableConfigur
     return new FileSystemViewManager(conf, config, (basePath, viewConfig) -> {
       RemoteHoodieTableFileSystemView remoteFileSystemView =
           createRemoteFileSystemView(conf, viewConfig, new HoodieTableMetaClient(conf.newCopy(), basePath));
-      SyncableFileSystemView secondaryView = null;
+      SyncableFileSystemView secondaryView;
       switch (viewConfig.getSecondaryStorageType()) {
         case MEMORY:
           secondaryView = createInMemoryFileSystemView(conf, viewConfig, basePath);
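Leaving secondaryView unassigned (instead of = null) lets definite-assignment analysis verify that every switch branch sets it before use; "= null" would silently mask a missed branch. The same pattern appears in createNewInstantTime above. A sketch (the enum is an illustrative subset, not Hudi's FileSystemViewStorageType):

```java
public class DefiniteAssignmentSketch {

  enum StorageType { MEMORY, SPILLABLE_DISK }

  static String create(StorageType type) {
    String view; // no "= null": the compiler proves every path assigns it
    switch (type) {
      case MEMORY:
        view = "in-memory view";
        break;
      case SPILLABLE_DISK:
        view = "spillable view";
        break;
      default:
        throw new IllegalArgumentException("Unknown type " + type);
    }
    return view;
  }

  public static void main(String[] args) {
    System.out.println(create(StorageType.MEMORY));
  }
}
```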
@@ -69,7 +69,7 @@ public FileSystemViewStorageType getStorageType() {
   }
 
   public boolean isIncrementalTimelineSyncEnabled() {
-    return Boolean.valueOf(props.getProperty(FILESYSTEM_VIEW_INCREMENTAL_SYNC_MODE));
+    return Boolean.parseBoolean(props.getProperty(FILESYSTEM_VIEW_INCREMENTAL_SYNC_MODE));
   }
 
   public String getRemoteViewServerHost() {
@@ -87,10 +87,8 @@ public long getMaxMemoryForFileGroupMap() {
 
   public long getMaxMemoryForPendingCompaction() {
     long totalMemory = Long.parseLong(props.getProperty(FILESYSTEM_VIEW_SPILLABLE_MEM));
-    long reservedForPendingComaction =
-        new Double(totalMemory * Double.parseDouble(props.getProperty(FILESYSTEM_VIEW_PENDING_COMPACTION_MEM_FRACTION)))
-            .longValue();
-    return reservedForPendingComaction;
+    return new Double(totalMemory * Double.parseDouble(props.getProperty(FILESYSTEM_VIEW_PENDING_COMPACTION_MEM_FRACTION)))
+        .longValue();
   }
 
   public String getBaseStoreDir() {
@@ -113,12 +111,9 @@ public static class Builder {
     private final Properties props = new Properties();
 
     public Builder fromFile(File propertiesFile) throws IOException {
-      FileReader reader = new FileReader(propertiesFile);
-      try {
+      try (FileReader reader = new FileReader(propertiesFile)) {
         props.load(reader);
         return this;
-      } finally {
-        reader.close();
       }
     }
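fromFile is now a try-with-resources block, so the reader is closed on every exit path without the explicit finally; isIncrementalTimelineSyncEnabled switches to Boolean.parseBoolean, which returns the primitive directly instead of boxing through Boolean.valueOf. A sketch of the resource-handling shape (the properties file name is hypothetical):

```java
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

public class TryWithResourcesSketch {

  static Properties load(String path) throws IOException {
    Properties props = new Properties();
    try (FileReader reader = new FileReader(path)) { // closed on every exit path
      props.load(reader);
    }
    return props;
  }

  public static void main(String[] args) throws IOException {
    System.out.println(load("view-storage.properties")); // assumes the file exists
  }
}
```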