@@ -75,6 +75,7 @@
import java.util.Map;
import java.util.Deque;
import java.util.LinkedList;
+import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;

@@ -169,12 +170,7 @@ public static GenericRecord jsonBytesToAvro(byte[] bytes, Schema schema) throws
}

public static boolean isMetadataField(String fieldName) {
-return HoodieRecord.COMMIT_TIME_METADATA_FIELD.equals(fieldName)
-|| HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName)
-|| HoodieRecord.RECORD_KEY_METADATA_FIELD.equals(fieldName)
-|| HoodieRecord.PARTITION_PATH_METADATA_FIELD.equals(fieldName)
-|| HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName)
-|| HoodieRecord.OPERATION_METADATA_FIELD.equals(fieldName);
+return HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldName);
}

public static Schema createHoodieWriteSchema(Schema originalSchema) {
@@ -245,7 +241,7 @@ public static Schema removeMetadataFields(Schema schema) {
return removeFields(schema, HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION);
}

-public static Schema removeFields(Schema schema, List<String> fieldsToRemove) {
+public static Schema removeFields(Schema schema, Set<String> fieldsToRemove) {
List<Schema.Field> filteredFields = schema.getFields()
.stream()
.filter(field -> !fieldsToRemove.contains(field.name()))
@@ -422,7 +418,7 @@ public static List<GenericRecord> rewriteRecords(List<GenericRecord> records, Sc
* <p>
* To better understand how it removes please check {@link #rewriteRecord(GenericRecord, Schema)}
*/
-public static GenericRecord removeFields(GenericRecord record, List<String> fieldsToRemove) {
+public static GenericRecord removeFields(GenericRecord record, Set<String> fieldsToRemove) {
Schema newSchema = removeFields(record.getSchema(), fieldsToRemove);
return rewriteRecord(record, newSchema);
}
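As a quick illustration of the Set-backed helper, here is a minimal sketch (the "uuid" and "price" field names are made up for this example) that strips Hudi metadata columns out of a list of field names, the same pattern the Flink catalog change below switches to:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;

List<String> fieldNames = Arrays.asList(HoodieRecord.COMMIT_TIME_METADATA_FIELD, "uuid", "price");
List<String> dataFields = fieldNames.stream()
    .filter(name -> !HoodieAvroUtils.isMetadataField(name))  // now a single Set.contains lookup
    .collect(Collectors.toList());
// dataFields -> [uuid, price]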
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -48,8 +49,8 @@ public abstract class HoodieRecord<T> implements Serializable {

// Temporary to support the '_hoodie_operation' field, once we solve
// the compatibility problem, it can be removed.
-public static final List<String> HOODIE_META_COLUMNS_WITH_OPERATION =
-CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
+public static final Set<String> HOODIE_META_COLUMNS_WITH_OPERATION =
+CollectionUtils.createImmutableSet(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD,
OPERATION_METADATA_FIELD);

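For context, a small usage sketch of the constant now that it is a Set; the immutability noted in the comment assumes CollectionUtils.createImmutableSet returns an unmodifiable set, as its name suggests:

import java.util.Set;

import org.apache.hudi.common.model.HoodieRecord;

Set<String> meta = HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION;
boolean isMeta = meta.contains(HoodieRecord.OPERATION_METADATA_FIELD);  // true, a hash lookup rather than a list scan
// meta.add("extra_col");  // expected to throw UnsupportedOperationException on an immutable set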
@@ -34,7 +34,7 @@
import java.nio.ByteBuffer;
import java.sql.Date;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -249,7 +249,7 @@ public void testRemoveFields() {
rec.put("non_pii_col", "val1");
rec.put("pii_col", "val2");
rec.put("timestamp", 3.5);
-GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList("pii_col"));
+GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Collections.singleton("pii_col"));
assertEquals("key1", rec1.get("_row_key"));
assertEquals("val1", rec1.get("non_pii_col"));
assertEquals(3.5, rec1.get("timestamp"));
@@ -262,7 +262,7 @@ public void testRemoveFields() {
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+ "{\"name\": \"pii_col\", \"type\": \"string\"}]},";
expectedSchema = new Schema.Parser().parse(schemaStr);
-rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList(""));
+rec1 = HoodieAvroUtils.removeFields(rec, Collections.singleton(""));
assertEquals(expectedSchema, rec1.getSchema());
}

@@ -18,6 +18,7 @@

package org.apache.hudi.table.catalog;

+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -60,7 +61,7 @@ public static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTable) {
List<FieldSchema> allCols = hiveTable.getSd().getCols().stream()
// filter out the metadata columns
-.filter(s -> !HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(s.getName()))
+.filter(s -> !HoodieAvroUtils.isMetadataField(s.getName()))
.collect(Collectors.toList());
// need to refactor the partition key field positions: they are not always in the last
allCols.addAll(hiveTable.getPartitionKeys());
@@ -344,9 +344,9 @@ object HoodieSparkSqlWriter {
}

def generateSchemaWithoutPartitionColumns(partitionParam: String, schema: Schema): Schema = {
-val fieldsToRemove = new java.util.ArrayList[String]()
+val fieldsToRemove = new java.util.HashSet[String]()
partitionParam.split(",").map(partitionField => partitionField.trim)
-.filter(s => !s.isEmpty).map(field => fieldsToRemove.add(field))
+.filter(s => s.nonEmpty).map(field => fieldsToRemove.add(field))
HoodieAvroUtils.removeFields(schema, fieldsToRemove)
}

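The Scala change above only swaps the backing collection; a rough Java rendering of the same logic (the partitionParam value is hypothetical, and schema is assumed to be an existing Avro Schema) looks like this:

import java.util.HashSet;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;

String partitionParam = "year, month";  // hypothetical "a,b"-style partition path config
Set<String> fieldsToRemove = new HashSet<>();
for (String field : partitionParam.split(",")) {
  String trimmed = field.trim();
  if (!trimmed.isEmpty()) {
    fieldsToRemove.add(trimmed);  // duplicates collapse automatically in the Set
  }
}
Schema withoutPartitionCols = HoodieAvroUtils.removeFields(schema, fieldsToRemove);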
@@ -487,7 +487,7 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
}

boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
-List<String> partitionColumns = getPartitionColumns(keyGenerator, props);
+Set<String> partitionColumns = getPartitionColumns(keyGenerator, props);
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
JavaRDD<HoodieRecord> records = avroRDD.map(record -> {
GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(record, partitionColumns) : record;
@@ -952,14 +952,14 @@ private Boolean isDropPartitionColumns() {
}

/**
-* Get the list of partition columns as a list of strings.
+* Get the partition columns as a set of strings.
*
* @param keyGenerator KeyGenerator
* @param props TypedProperties
-* @return List of partition columns.
+* @return Set of partition columns.
*/
-private List<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
+private Set<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
String partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
-return Arrays.asList(partitionColumns.split(","));
+return Arrays.stream(partitionColumns.split(",")).collect(Collectors.toSet());
}
}
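Finally, a short sketch of the new split-to-Set conversion in getPartitionColumns (the comma-separated value is hypothetical); duplicates collapse, and the resulting set gives constant-time contains() checks when removeFields is applied to every record in the RDD:

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

String partitionColumns = "year,month,year";  // hypothetical comma-separated partition columns
Set<String> cols = Arrays.stream(partitionColumns.split(","))
    .collect(Collectors.toSet());
// cols -> [year, month]; cols.contains("month") is an O(1) lookup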