From e819f4c6250cc3be0794c77202bc2daae7474218 Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Thu, 10 Sep 2020 21:12:16 -0700 Subject: [PATCH] [HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly --- .../hudi/payload/AWSDmsAvroPayload.java | 23 ++- .../hudi/payload/TestAWSDmsAvroPayload.java | 132 ++++++++++++++++++ 2 files changed, 151 insertions(+), 4 deletions(-) create mode 100644 hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java diff --git a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java index 975151c3ce1e7..73711c705267d 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java +++ b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java @@ -53,10 +53,12 @@ public AWSDmsAvroPayload(Option record) { this(record.get(), (record1) -> 0); // natural order } - @Override - public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) - throws IOException { - IndexedRecord insertValue = getInsertValue(schema).get(); + /** + * + * Handle a possible delete - check for "D" in Op column and return empty row if found. + * @param insertValue The new row that is being "inserted". + */ + private Option handleDeleteOperation(IndexedRecord insertValue) throws IOException { boolean delete = false; if (insertValue instanceof GenericRecord) { GenericRecord record = (GenericRecord) insertValue; @@ -65,4 +67,17 @@ public Option combineAndGetUpdateValue(IndexedRecord currentValue return delete ? 
Option.empty() : Option.of(insertValue); } + + @Override + public Option getInsertValue(Schema schema) throws IOException { + IndexedRecord insertValue = super.getInsertValue(schema).get(); + return handleDeleteOperation(insertValue); + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) + throws IOException { + IndexedRecord insertValue = super.getInsertValue(schema).get(); + return handleDeleteOperation(insertValue); + } } diff --git a/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java b/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java new file mode 100644 index 0000000000000..802096a3a74e1 --- /dev/null +++ b/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.payload; + +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; +import org.apache.hudi.common.util.Option; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class TestAWSDmsAvroPayload { + + private static final String AVRO_SCHEMA_STRING = "{\"type\": \"record\"," + + "\"name\": \"events\"," + "\"fields\": [ " + + "{\"name\": \"field1\", \"type\" : \"int\"}," + + "{\"name\": \"Op\", \"type\": \"string\"}" + + "]}"; + + @Test + public void testInsert() { + + Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); + GenericRecord record = new GenericData.Record(avroSchema); + record.put("field1", 0); + record.put("Op", "I"); + + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(record)); + + try { + Option outputPayload = payload.getInsertValue(avroSchema); + assertTrue((int) outputPayload.get().get(0) == 0); + assertTrue(outputPayload.get().get(1).toString().equals("I")); + } catch (Exception e) { + fail("Unexpected exception"); + } + + } + + @Test + public void testUpdate() { + Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); + GenericRecord newRecord = new GenericData.Record(avroSchema); + newRecord.put("field1", 1); + newRecord.put("Op", "U"); + + GenericRecord oldRecord = new GenericData.Record(avroSchema); + oldRecord.put("field1", 0); + oldRecord.put("Op", "I"); + + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(newRecord)); + + try { + Option outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema); + assertTrue((int) outputPayload.get().get(0) == 1); + 
assertTrue(outputPayload.get().get(1).toString().equals("U")); + } catch (Exception e) { + fail("Unexpected exception"); + } + + } + + @Test + public void testDelete() { + Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); + GenericRecord deleteRecord = new GenericData.Record(avroSchema); + deleteRecord.put("field1", 2); + deleteRecord.put("Op", "D"); + + GenericRecord oldRecord = new GenericData.Record(avroSchema); + oldRecord.put("field1", 2); + oldRecord.put("Op", "U"); + + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord)); + + try { + Option outputPayload = payload.combineAndGetUpdateValue(oldRecord, avroSchema); + // expect nothing to be committed to table + assertFalse(outputPayload.isPresent()); + } catch (Exception e) { + fail("Unexpected exception"); + } + + } + + @Test + public void testPreCombineWithDelete() { + Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING); + GenericRecord deleteRecord = new GenericData.Record(avroSchema); + deleteRecord.put("field1", 4); + deleteRecord.put("Op", "D"); + + GenericRecord oldRecord = new GenericData.Record(avroSchema); + oldRecord.put("field1", 4); + oldRecord.put("Op", "I"); + + AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord)); + AWSDmsAvroPayload insertPayload = new AWSDmsAvroPayload(Option.of(oldRecord)); + + try { + OverwriteWithLatestAvroPayload output = payload.preCombine(insertPayload); + Option outputPayload = output.getInsertValue(avroSchema); + // expect nothing to be committed to table + assertFalse(outputPayload.isPresent()); + } catch (Exception e) { + fail("Unexpected exception"); + } + } +}