@@ -29,6 +29,7 @@
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -117,7 +118,17 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
if (extraMetadata.isPresent()) {
extraMetadata.get().forEach(metadata::addMetadata);
}
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
String schema = config.getSchema();
if (config.updatePartialFields()) {
Member: Can we have this so that whether or not we can update partial fields is controlled simply by the payload implementation? I am concerned that a config like config.updatePartialFields is not in line with how we treat payloads; for example, for MOR there is no writeConfig when we merge the fields via the record reader.

try {
TableSchemaResolver resolver = new TableSchemaResolver(table.getMetaClient());
Member: Do you need to create the resolver again? Does config.getLastSchema() work here?

Contributor (author): I tested it end to end and found it would not get the lastSchema from the config, since the config objects are different.

Member: We can get the last schema from the commit metadata, right?
(A sketch of that approach follows this hunk.)

schema = resolver.getTableAvroSchemaWithoutMetadataFields().toString();
} catch (Exception e) {
// ignore exception.
schema = config.getSchema();
Member: We are potentially reducing the schema here, so I think this can lead to issues. Can we throw an error? At the least, can you add a LOG here to make sure this gets noticed?

Contributor (author): It handles the case where users enable partial-field updates for the first time; my original idea was not to throw an error in that case, and adding a LOG here sounds reasonable to me.

}
}
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema);
metadata.setOperationType(operationType);

try {
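Responding to the suggestion above to read the last schema from the commit metadata instead of constructing another TableSchemaResolver: a minimal sketch of that approach, assuming the usual timeline and commit-metadata calls already imported in this class (the helper name and its placement are hypothetical, not part of this PR):

```java
// Sketch only: fetch the schema recorded by the most recent completed commit from its metadata.
private Option<String> getLastCommittedSchema(HoodieTableMetaClient metaClient) {
  try {
    HoodieTimeline commits = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
    if (commits.lastInstant().isPresent()) {
      HoodieInstant lastInstant = commits.lastInstant().get();
      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
          .fromBytes(commits.getInstantDetails(lastInstant).get(), HoodieCommitMetadata.class);
      String lastSchema = commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
      return (lastSchema == null || lastSchema.isEmpty()) ? Option.empty() : Option.of(lastSchema);
    }
  } catch (IOException e) {
    // No readable commit metadata; the caller can fall back to config.getSchema().
  }
  return Option.empty();
}
```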
@@ -61,6 +61,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
public static final String BASE_PATH_PROP = "hoodie.base.path";
public static final String AVRO_SCHEMA = "hoodie.avro.schema";
public static final String LAST_AVRO_SCHEMA = "hoodie.last.avro.schema";
public static final String AVRO_SCHEMA_VALIDATE = "hoodie.avro.schema.validate";
public static final String DEFAULT_AVRO_SCHEMA_VALIDATE = "false";
public static final String DEFAULT_PARALLELISM = "1500";
@@ -94,6 +95,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode";
public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT
.toString();
public static final String DELETE_MARKER_FIELD_PROP = "hoodie.write.delete.marker.field";
Member: Is this needed for this change? What is it used for?

Contributor (author): Good catch, not needed. Will revert.

Member: +1

public static final String DEFAULT_DELETE_MARKER_FIELD = "_hoodie_is_deleted";

public static final String UPDATE_PARTIAL_FIELDS = "hoodie.update.partial.fields";
Member: Javadocs on what these mean, please.
(Possible wording is sketched after this file's changes.)

public static final String DEFAULT_UPDATE_PARTIAL_FIELDS = "false";

public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
@@ -161,10 +167,18 @@ public String getSchema() {
return props.getProperty(AVRO_SCHEMA);
}

public String getLastSchema() {
return props.getProperty(LAST_AVRO_SCHEMA);
}

public void setSchema(String schemaStr) {
props.setProperty(AVRO_SCHEMA, schemaStr);
}

public void setLastSchema(String schemaStr) {
props.setProperty(LAST_AVRO_SCHEMA, schemaStr);
}

public boolean getAvroSchemaValidate() {
return Boolean.parseBoolean(props.getProperty(AVRO_SCHEMA_VALIDATE));
}
@@ -274,6 +288,14 @@ public BulkInsertSortMode getBulkInsertSortMode() {
return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
}

public String getDeleteMarkerField() {
return props.getProperty(DELETE_MARKER_FIELD_PROP);
}

public Boolean updatePartialFields() {
return Boolean.parseBoolean(props.getProperty(UPDATE_PARTIAL_FIELDS));
}

/**
* compaction properties.
*/
@@ -784,6 +806,11 @@ public Builder withSchema(String schemaStr) {
return this;
}

public Builder withLastSchema(String schemaStr) {
props.setProperty(LAST_AVRO_SCHEMA, schemaStr);
return this;
}

public Builder withAvroSchemaValidate(boolean enable) {
props.setProperty(AVRO_SCHEMA_VALIDATE, String.valueOf(enable));
return this;
@@ -940,6 +967,11 @@ public Builder withExternalSchemaTrasformation(boolean enabled) {
return this;
}

public Builder withUpdatePartialFields(boolean updatePartialFields) {
props.setProperty(UPDATE_PARTIAL_FIELDS, String.valueOf(updatePartialFields));
return this;
}

public Builder withProperties(Properties properties) {
this.props.putAll(properties);
return this;
@@ -950,6 +982,7 @@ protected void setDefaults() {
setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(BULKINSERT_PARALLELISM), BULKINSERT_PARALLELISM,
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(UPDATE_PARTIAL_FIELDS), UPDATE_PARTIAL_FIELDS, DEFAULT_UPDATE_PARTIAL_FIELDS);
setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
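On the request above for javadocs: possible wording for the new properties, plus an illustrative use of the builder option added here, is sketched below. The doc text, the helper method, and the base path are suggestions rather than part of the PR.

```java
/**
 * Suggested doc: when hoodie.update.partial.fields is true, incoming records may carry only a
 * subset of the table's fields. During upsert, the missing fields are filled from the record
 * already stored in the table, and the last committed table schema (tracked internally via
 * hoodie.last.avro.schema) is used when rewriting records and recording the commit schema.
 */
public static final String UPDATE_PARTIAL_FIELDS = "hoodie.update.partial.fields";
public static final String DEFAULT_UPDATE_PARTIAL_FIELDS = "false";

// Illustrative usage; the base path is a placeholder and the schema is that of the partial records.
static HoodieWriteConfig partialUpdateConfig(String partialRecordAvroSchema) {
  return HoodieWriteConfig.newBuilder()
      .withPath("/tmp/hoodie/partial_update_table")
      .withSchema(partialRecordAvroSchema)
      .withUpdatePartialFields(true)
      .build();
}
```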
@@ -34,7 +34,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.generic.GenericRecord;
@@ -60,7 +59,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri

public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config),
this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config, hoodieTable),
sparkTaskContextSupplier);
}

@@ -79,7 +78,7 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
partitionMetadata.trySave(getPartitionId());
createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension()));
this.fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, writerSchemaWithMetafields, this.sparkTaskContextSupplier);
this.fileWriter = createNewFileWriter(instantTime, path, hoodieTable, config, writerSchemaWithMetafields, this.sparkTaskContextSupplier);
} catch (IOException e) {
throw new HoodieInsertException("Failed to initialize HoodieStorageWriter for path " + path, e);
}
@@ -25,9 +25,11 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
@@ -65,7 +67,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends H
public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
String fileId, HoodieTable<T> hoodieTable, SparkTaskContextSupplier sparkTaskContextSupplier) {
this(config, instantTime, partitionPath, fileId, hoodieTable,
getWriterSchemaIncludingAndExcludingMetadataPair(config), sparkTaskContextSupplier);
getWriterSchemaIncludingAndExcludingMetadataPair(config, hoodieTable), sparkTaskContextSupplier);
}

protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath, String fileId,
@@ -90,9 +92,19 @@ protected HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String
* @param config Write Config
* @return
*/
protected static Pair<Schema, Schema> getWriterSchemaIncludingAndExcludingMetadataPair(HoodieWriteConfig config) {
protected static Pair<Schema, Schema> getWriterSchemaIncludingAndExcludingMetadataPair(HoodieWriteConfig config, HoodieTable hoodieTable) {
Schema originalSchema = new Schema.Parser().parse(config.getSchema());
Schema hoodieSchema = HoodieAvroUtils.addMetadataFields(originalSchema);
boolean updatePartialFields = config.updatePartialFields();
if (updatePartialFields) {
try {
TableSchemaResolver resolver = new TableSchemaResolver(hoodieTable.getMetaClient());
Member: This is only applicable to MergeHandle, if I understand correctly. Do you think it is better to override this in MergeHandle?

Contributor (author): Sounds reasonable.
(A sketch of that option follows this file's changes.)

Schema lastSchema = resolver.getTableAvroSchema();
config.setLastSchema(lastSchema.toString());
} catch (Exception e) {
// Ignore exception.
}
}
return Pair.of(originalSchema, hoodieSchema);
}

@@ -164,7 +176,11 @@ public void write(HoodieRecord record, Option<IndexedRecord> avroRecord, Option<
* Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
*/
protected GenericRecord rewriteRecord(GenericRecord record) {
return HoodieAvroUtils.rewriteRecord(record, writerSchemaWithMetafields);
if (config.updatePartialFields() && !StringUtils.isNullOrEmpty(config.getLastSchema())) {
return HoodieAvroUtils.rewriteRecord(record, new Schema.Parser().parse(config.getLastSchema()));
} else {
return HoodieAvroUtils.rewriteRecord(record, writerSchemaWithMetafields);
}
}

public abstract WriteStatus close();
Expand Down Expand Up @@ -192,6 +208,10 @@ protected long getAttemptId() {

protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, HoodieTable<T> hoodieTable,
HoodieWriteConfig config, Schema schema, SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, sparkTaskContextSupplier);
if (config.updatePartialFields() && !StringUtils.isNullOrEmpty(config.getLastSchema())) {
return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, new Schema.Parser().parse(config.getLastSchema()), sparkTaskContextSupplier);
Member: Same question as above: is it better to override this only in MergeHandle?

} else {
return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, sparkTaskContextSupplier);
}
}
}
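A sketch of the override-in-MergeHandle idea discussed above: the table-schema resolution would live in HoodieMergeHandle rather than in the shared handle, leaving create handles on the configured schema. The method name and placement are assumptions; the body mirrors the PR's logic.

```java
// Sketch only: a HoodieMergeHandle-local variant, so HoodieWriteHandle/HoodieCreateHandle keep
// using the plain configured schema and only the merge path resolves the existing table schema.
protected static Pair<Schema, Schema> getMergeWriterSchemaPair(HoodieWriteConfig config, HoodieTable hoodieTable) {
  Schema configuredSchema = new Schema.Parser().parse(config.getSchema());
  if (config.updatePartialFields()) {
    try {
      // Resolve the schema of the data already written so the merged file keeps the columns
      // that the incoming partial records do not carry.
      Schema tableSchema = new TableSchemaResolver(hoodieTable.getMetaClient()).getTableAvroSchema();
      config.setLastSchema(tableSchema.toString());
    } catch (Exception e) {
      // For example, the first commit on the table: no existing schema, so keep the configured one.
    }
  }
  return Pair.of(configuredSchema, HoodieAvroUtils.addMetadataFields(configuredSchema));
}
```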
@@ -37,6 +37,7 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
@@ -237,6 +238,9 @@ protected void finalizeWrite(String instantTime, List<HoodieWriteStat> stats, Ho
* By default, return the writer schema in Write Config for storing in commit.
*/
protected String getSchemaToStoreInCommit() {
if (config.updatePartialFields() && !StringUtils.isNullOrEmpty(config.getLastSchema())) {
Member: This check is repeated in multiple places, and I usually find this pattern error-prone. Is it possible to reorganize? For example, we could always expect config.getSchema() to represent the full table schema and add a new config.getUpdateSchema() that tracks the partial fields being updated.

With that approach, I think we can use getUpdateSchema() only in MergeHandle and its helpers. Storing the schema would work as before, since we store the full table schema, i.e. config.getSchema().
(A sketch of that shape follows this hunk.)

return config.getLastSchema();
}
return config.getSchema();
}

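The reorganization proposed above could take roughly the following shape: config.getSchema() always holds the full table schema, while a separate, hypothetical update-schema property carries the partially updated fields, so this method and the other call sites no longer branch on updatePartialFields. The property and method names here are illustrative only.

```java
// In HoodieWriteConfig (hypothetical names, sketch only):
public static final String UPDATE_SCHEMA = "hoodie.update.schema"; // schema of the fields being partially updated

public String getUpdateSchema() {
  return props.getProperty(UPDATE_SCHEMA);
}

// getSchemaToStoreInCommit() in the table would then stay as it was before this change:
protected String getSchemaToStoreInCommit() {
  return config.getSchema(); // always the full table schema
}
```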
@@ -30,6 +30,7 @@
import org.apache.hudi.client.utils.MergingIterator;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.exception.HoodieException;
@@ -73,7 +74,11 @@ public static <T extends HoodieRecordPayload<T>> void runMerge(HoodieTable<T> ta
} else {
gReader = null;
gWriter = null;
readSchema = upsertHandle.getWriterSchemaWithMetafields();
if (table.getConfig().updatePartialFields() && !StringUtils.isNullOrEmpty(table.getConfig().getLastSchema())) {
readSchema = new Schema.Parser().parse(table.getConfig().getLastSchema());
Member: Similar comment as before: if we make config.getSchema() always track the full table schema, this can be simplified.

} else {
readSchema = upsertHandle.getWriterSchemaWithMetafields();
Member: We are also calling getWriterSchemaWithMetafields in other places in this class (for example, line 163). Don't we need to read getLastSchema() there too?
(A sketch of one way to centralize that choice follows this hunk.)

}
}

BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
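One way to address the concern above about the other getWriterSchemaWithMetafields call sites in this class is a single helper that owns the read-schema decision and is used everywhere a read schema is needed. This is a sketch of that idea, not code from the PR, and it assumes the handle type is HoodieMergeHandle.

```java
// Sketch only: centralize the choice of read schema so every call site in this merge helper,
// including the external-schema-transformation path, behaves consistently.
private static <T extends HoodieRecordPayload<T>> Schema resolveReadSchema(
    HoodieTable<T> table, HoodieMergeHandle<T> upsertHandle) {
  HoodieWriteConfig config = table.getConfig();
  if (config.updatePartialFields() && !StringUtils.isNullOrEmpty(config.getLastSchema())) {
    return new Schema.Parser().parse(config.getLastSchema());
  }
  return upsertHandle.getWriterSchemaWithMetafields();
}
```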
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.PartialUpdatePayload;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_TRIP_SCHEMA;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_SCHEMA;
import static org.apache.hudi.common.testutils.Transformations.recordsToHoodieKeys;
import static org.apache.hudi.common.util.ParquetUtils.readAvroRecords;
import static org.apache.hudi.common.util.ParquetUtils.readAvroSchema;
import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestHoodiePartialUpdate extends HoodieClientTestBase {

@Test
public void testCopyOnWritePartialUpdate() {
final String testPartitionPath = "2016/09/26";
HoodieWriteClient client = getHoodieWriteClient(getConfig(true, false));
dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});

String commitTime1 = "001";
client.startCommitWithTime(commitTime1);

List<HoodieRecord> inserts1 =
dataGen.generateInsertsStream(commitTime1, 100, false, TRIP_SCHEMA).collect(Collectors.toList()); // this writes ~500kb

List<HoodieKey> insertKeys = recordsToHoodieKeys(inserts1);
upsertAndCheck(client, insertKeys, commitTime1, false);

client = getHoodieWriteClient(getConfig(true, true));
String commitTime2 = "002";
client.startCommitWithTime(commitTime2);

WriteStatus writeStatus = upsertAndCheck(client, insertKeys, commitTime2, true);

Schema schema = readAvroSchema(hadoopConf, new Path(basePath, writeStatus.getStat().getPath()));
List<String> oldSchemaFieldNames = AVRO_TRIP_SCHEMA.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
List<String> parquetFieldNames = schema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());

for (String name : oldSchemaFieldNames) {
assertTrue(parquetFieldNames.contains(name));
}

List<GenericRecord> records1 = readAvroRecords(hadoopConf, new Path(basePath, writeStatus.getStat().getPath()));
for (GenericRecord record : records1) {
assertEquals("rider-" + commitTime1, record.get("rider").toString());
assertEquals("driver-" + commitTime1, record.get("driver").toString());
assertEquals(String.valueOf(1.0), record.get("timestamp").toString());
}
}

private WriteStatus upsertAndCheck(HoodieWriteClient client, List<HoodieKey> insertKeys, String commitTime, boolean partial) {
List<HoodieRecord> records = new ArrayList<>();
for (HoodieKey hoodieKey : insertKeys) {
PartialUpdatePayload payload;
if (partial) {
payload = dataGen.generatePartialUpdatePayloadForPartialTripSchema(hoodieKey, commitTime);
} else {
payload = dataGen.generatePartialUpdatePayloadForTripSchema(hoodieKey, commitTime);
}
HoodieRecord record = new HoodieRecord(hoodieKey, payload);
records.add(record);
}

JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(records, 1);
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime).collect();

assertNoWriteErrors(statuses);

assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
assertEquals(100,
readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records");

return statuses.get(0);
}
}
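The test relies on PartialUpdatePayload, whose implementation is not shown in this diff. As a rough, standalone illustration of the field-level merge such a payload performs (not the PR's class), the combine step might look like this:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Standalone sketch: keep the stored value for any field the incoming (partial) record does not
// provide, and take the incoming value otherwise. Illustration only, not the PR's payload class.
public final class PartialMergeSketch {
  public static GenericRecord merge(GenericRecord stored, GenericRecord partial, Schema tableSchema) {
    GenericRecord merged = new GenericData.Record(tableSchema);
    for (Schema.Field field : tableSchema.getFields()) {
      boolean provided = partial.getSchema().getField(field.name()) != null
          && partial.get(field.name()) != null;
      merged.put(field.name(), provided ? partial.get(field.name()) : stored.get(field.name()));
    }
    return merged;
  }
}
```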