Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.common.config;

import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;

/**
* An extension of {@link java.util.Properties} that maintains the order.
* The implementation is not thread-safe.
*/
public class OrderedProperties extends Properties {

private final HashSet<Object> keys = new LinkedHashSet<>();

public OrderedProperties() {
super(null);
}

public OrderedProperties(Properties defaults) {
if (Objects.nonNull(defaults)) {
for (String key : defaults.stringPropertyNames()) {
put(key, defaults.getProperty(key));
}
}
}

@Override
public Enumeration propertyNames() {
return Collections.enumeration(keys);
}

@Override
public synchronized Enumeration<Object> keys() {
return Collections.enumeration(keys);
}

@Override
public Set<String> stringPropertyNames() {
Set<String> set = new LinkedHashSet<>();
for (Object key : this.keys) {
if (key instanceof String) {
set.add((String) key);
}
}
return set;
}

public synchronized void putAll(Properties t) {
for (Map.Entry<?, ?> e : t.entrySet()) {
if (!containsKey(String.valueOf(e.getKey()))) {
keys.add(e.getKey());
}
super.put(e.getKey(), e.getValue());
}
}

@Override
public synchronized Object put(Object key, Object value) {
keys.remove(key);
keys.add(key);
return super.put(key, value);
}

public synchronized Object putIfAbsent(Object key, Object value) {
if (!containsKey(String.valueOf(key))) {
keys.add(key);
}
return super.putIfAbsent(key, value);
}

@Override
public Object remove(Object key) {
keys.remove(key);
return super.remove(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,16 @@

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Type-aware extension of {@link java.util.Properties}.
*/
public class TypedProperties extends Properties implements Serializable {

private final HashSet<Object> keys = new LinkedHashSet<>();

public TypedProperties() {
super(null);
}
Expand All @@ -50,56 +42,6 @@ public TypedProperties(Properties defaults) {
}
}

@Override
public Enumeration propertyNames() {
return Collections.enumeration(keys);
}

@Override
public synchronized Enumeration<Object> keys() {
return Collections.enumeration(keys);
}

@Override
public Set<String> stringPropertyNames() {
Set<String> set = new LinkedHashSet<>();
for (Object key : this.keys) {
if (key instanceof String) {
set.add((String) key);
}
}
return set;
}

public synchronized void putAll(Properties t) {
for (Map.Entry<?, ?> e : t.entrySet()) {
if (!containsKey(String.valueOf(e.getKey()))) {
keys.add(e.getKey());
}
super.put(e.getKey(), e.getValue());
}
}

@Override
public synchronized Object put(Object key, Object value) {
keys.remove(key);
keys.add(key);
return super.put(key, value);
}

public synchronized Object putIfAbsent(Object key, Object value) {
if (!containsKey(String.valueOf(key))) {
keys.add(key);
}
return super.putIfAbsent(key, value);
}

@Override
public Object remove(Object key) {
keys.remove(key);
return super.remove(key);
}

private void checkKey(String property) {
if (!containsKey(property)) {
throw new IllegalArgumentException("Property " + property + " not found");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,13 @@

package org.apache.hudi.common.table;

import org.apache.avro.Schema;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.bootstrap.index.NoOpBootstrapIndex;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.OrderedProperties;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
Expand All @@ -47,12 +41,17 @@
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -220,10 +219,7 @@ public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName
setValue(PAYLOAD_CLASS_NAME, payloadClassName);
// FIXME(vc): wonder if this can be removed. Need to look into history.
try (FSDataOutputStream outputStream = fs.create(propertyPath)) {
if (!isValidChecksum()) {
setValue(TABLE_CHECKSUM, String.valueOf(generateChecksum(props)));
}
props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
storeProperties(props, outputStream);
}
}
} catch (IOException e) {
Expand All @@ -233,6 +229,34 @@ public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName
"hoodie.properties file seems invalid. Please check for left over `.updated` files if any, manually copy it to hoodie.properties and retry");
}

private static Properties getOrderedPropertiesWithTableChecksum(Properties props) {
Properties orderedProps = new OrderedProperties(props);
orderedProps.put(TABLE_CHECKSUM.key(), String.valueOf(generateChecksum(props)));
return orderedProps;
}

/**
* Write the properties to the given output stream and return the table checksum.
*
* @param props - properties to be written
* @param outputStream - output stream to which properties will be written
* @return return the table checksum
* @throws IOException
*/
private static String storeProperties(Properties props, FSDataOutputStream outputStream) throws IOException {
String checksum;
if (props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props)) {
checksum = props.getProperty(TABLE_CHECKSUM.key());
props.store(outputStream, "Updated at " + Instant.now());
} else {
Properties propsWithChecksum = getOrderedPropertiesWithTableChecksum(props);
propsWithChecksum.store(outputStream, "Properties saved on " + Instant.now());
checksum = propsWithChecksum.getProperty(TABLE_CHECKSUM.key());
props.setProperty(TABLE_CHECKSUM.key(), checksum);
}
return checksum;
}

private boolean isValidChecksum() {
return contains(TABLE_CHECKSUM) && validateChecksum(props);
}
Expand Down Expand Up @@ -316,13 +340,7 @@ private static void modify(FileSystem fs, Path metadataFolder, Properties modify
Properties props = new TypedProperties();
props.load(in);
modifyFn.accept(props, modifyProps);
if (props.containsKey(TABLE_CHECKSUM.key()) && validateChecksum(props)) {
checksum = props.getProperty(TABLE_CHECKSUM.key());
} else {
checksum = String.valueOf(generateChecksum(props));
props.setProperty(TABLE_CHECKSUM.key(), checksum);
}
props.store(out, "Updated at " + System.currentTimeMillis());
checksum = storeProperties(props, out);
}
// 4. verify and remove backup.
try (FSDataInputStream in = fs.open(cfgPath)) {
Expand Down Expand Up @@ -385,12 +403,7 @@ public static void create(FileSystem fs, Path metadataFolder, Properties propert
if (hoodieConfig.contains(TIMELINE_TIMEZONE)) {
HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getString(TIMELINE_TIMEZONE)));
}
if (hoodieConfig.contains(TABLE_CHECKSUM)) {
hoodieConfig.setValue(TABLE_CHECKSUM, hoodieConfig.getString(TABLE_CHECKSUM));
} else {
hoodieConfig.setValue(TABLE_CHECKSUM, String.valueOf(generateChecksum(hoodieConfig.getProps())));
}
hoodieConfig.getProps().store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
storeProperties(hoodieConfig.getProps(), outputStream);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.common.properties;

import org.apache.hudi.common.config.OrderedProperties;

import org.junit.jupiter.api.Test;

import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestOrderedProperties {

@Test
public void testPutPropertiesOrder() {
Properties properties = new OrderedProperties();
properties.put("key0", "true");
properties.put("key1", "false");
properties.put("key2", "true");
properties.put("key3", "false");
properties.put("key4", "true");
properties.put("key5", "true");
properties.put("key6", "false");
properties.put("key7", "true");
properties.put("key8", "false");
properties.put("key9", "true");

OrderedProperties typedProperties = new OrderedProperties(properties);
assertTypeProperties(typedProperties, 0);
}

@Test
void testPutAllPropertiesOrder() {
Properties firstProp = new OrderedProperties();
firstProp.put("key0", "true");
firstProp.put("key1", "false");
firstProp.put("key2", "true");

OrderedProperties firstProperties = new OrderedProperties(firstProp);
assertTypeProperties(firstProperties, 0);

OrderedProperties secondProperties = new OrderedProperties();
secondProperties.put("key3", "true");
secondProperties.put("key4", "false");
secondProperties.put("key5", "true");
assertTypeProperties(secondProperties, 3);

OrderedProperties thirdProperties = new OrderedProperties();
thirdProperties.putAll(firstProp);
thirdProperties.putAll(secondProperties);

assertEquals(3, firstProp.stringPropertyNames().size());
assertEquals(3, secondProperties.stringPropertyNames().size());
assertEquals(6, thirdProperties.stringPropertyNames().size());
}

private void assertTypeProperties(OrderedProperties typedProperties, int start) {
String[] props = typedProperties.stringPropertyNames().toArray(new String[0]);
for (int i = start; i < props.length; i++) {
assertEquals(String.format("key%d", i), props[i]);
}
}
}
Loading