diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithCustomAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithCustomAvroPayload.java new file mode 100644 index 0000000000000..e61f66b532b8f --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithCustomAvroPayload.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.ColumnNotFoundException; +import org.apache.hudi.exception.UpdateKeyNotFoundException; +import org.apache.hudi.exception.WriteOperationException; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.stream.Collectors; + +/** + * subclass of OverwriteWithLatestAvroPayload used for delta streamer. + * + *
    + *
  1. combineAndGetUpdateValue - Accepts the column names to be updated; + *
  2. splitKeys - Split keys based upon keys; + *
+ */ +public class OverwriteWithCustomAvroPayload extends OverwriteWithLatestAvroPayload { + + public OverwriteWithCustomAvroPayload(GenericRecord record, Comparable orderingVal) { + super(record, orderingVal); + } + + /** + * split keys over. + */ + public List splitKeys(String keys) throws UpdateKeyNotFoundException { + if (keys == null) { + throw new UpdateKeyNotFoundException("keys cannot be null"); + } else if (keys.equals("")) { + throw new UpdateKeyNotFoundException("keys cannot be blank"); + } else { + return Arrays.stream(keys.split(",")).collect(Collectors.toList()); + } + } + + /** + * check column exi. + */ + public boolean checkColumnExists(List keys, Schema schema) { + List field = schema.getFields(); + List common = new ArrayList<>(); + for (Schema.Field columns : field) { + if (keys.contains(columns.name())) { + common.add(columns); + } + } + return common.size() == keys.size(); + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) + throws WriteOperationException, IOException, ColumnNotFoundException, UpdateKeyNotFoundException { + + if (!properties.getProperty("hoodie.datasource.write.operation").equals("upsert")) { + throw new WriteOperationException("write should be upsert"); + } + + Option recordOption = getInsertValue(schema); + + if (!recordOption.isPresent()) { + return Option.empty(); + } + + GenericRecord existingRecord = (GenericRecord) currentValue; + GenericRecord incomingRecord = (GenericRecord) recordOption.get(); + List keys = splitKeys(properties.getProperty("hoodie.update.keys")); + + if (checkColumnExists(keys, schema)) { + for (String key : keys) { + Object value = incomingRecord.get(key); + existingRecord.put(key, value); + } + return Option.of(existingRecord); + } else { + throw new ColumnNotFoundException("Update key not present please check the names"); + } + } + +} diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/ColumnNotFoundException.java b/hudi-common/src/main/java/org/apache/hudi/exception/ColumnNotFoundException.java new file mode 100644 index 0000000000000..6cb1bcb305bec --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/exception/ColumnNotFoundException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.exception; + +public class ColumnNotFoundException extends HoodieIOException { + public ColumnNotFoundException(String message) { + super(message); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/UpdateKeyNotFoundException.java b/hudi-common/src/main/java/org/apache/hudi/exception/UpdateKeyNotFoundException.java new file mode 100644 index 0000000000000..800cbb4e6d58f --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/exception/UpdateKeyNotFoundException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.exception; + +public class UpdateKeyNotFoundException extends HoodieIOException { + public UpdateKeyNotFoundException(String message) { + super(message); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/WriteOperationException.java b/hudi-common/src/main/java/org/apache/hudi/exception/WriteOperationException.java new file mode 100644 index 0000000000000..788b477dfacf9 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/exception/WriteOperationException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.exception; + +public class WriteOperationException extends HoodieException { + public WriteOperationException(String message) { + super(message); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithCustomAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithCustomAvroPayload.java new file mode 100644 index 0000000000000..5da1f4b6809fb --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithCustomAvroPayload.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Unit tests {@link OverwriteWithCustomAvroPayload}. + */ +public class TestOverwriteWithCustomAvroPayload { + + private Schema schema; + + @BeforeEach + public void setUp() throws Exception { + schema = Schema.createRecord(Arrays.asList( + new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null), + new Schema.Field("partition", Schema.create(Schema.Type.STRING), "", null), + new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null), + new Schema.Field("_hoodie_is_deleted", Schema.create(Schema.Type.BOOLEAN), "", false) + )); + } + + @Test + public void testsplitKeys() throws IOException { + GenericRecord record1 = new GenericData.Record(schema); + record1.put("id", "1"); + record1.put("partition", "partition0"); + record1.put("ts", 0L); + record1.put("_hoodie_is_deleted", false); + OverwriteWithCustomAvroPayload payload1 = new OverwriteWithCustomAvroPayload(record1, 1); + assertEquals(payload1.splitKeys("id,ts").size(), Arrays.asList("id", "ts").size()); + assertEquals(payload1.splitKeys("id,ts"), Arrays.asList("id", "ts")); + } + +} + diff --git a/hudi-sync/hudi-hive-sync/pom.xml b/hudi-sync/hudi-hive-sync/pom.xml index 44e5f9eeedcdc..6f3635b6e192a 100644 --- a/hudi-sync/hudi-hive-sync/pom.xml +++ b/hudi-sync/hudi-hive-sync/pom.xml @@ -99,7 +99,7 @@ hadoop-hdfs tests - + ${hive.groupid} diff --git a/pom.xml b/pom.xml index 8a9a9937cad46..26661e9bd2d9b 100644 --- a/pom.xml +++ b/pom.xml @@ -1017,6 +1017,10 @@ false + + pentaho.org + https://public.nexus.pentaho.org/repository/proxy-public-3rd-party-release/ + cloudera-repo-releases https://repository.cloudera.com/artifactory/public/ diff --git a/style/checkstyle.xml b/style/checkstyle.xml index 7dbce7973bfde..27c7f244d9507 100644 --- a/style/checkstyle.xml +++ b/style/checkstyle.xml @@ -1,37 +1,9 @@ - - - - - - + @@ -42,11 +14,12 @@ + - + @@ -55,8 +28,10 @@ - - + + @@ -77,12 +52,14 @@ - + - + @@ -90,9 +67,9 @@ - + value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement (4.1.3)"/> + @@ -137,58 +114,58 @@ + value="Package name ''{0}'' must match pattern ''{1}''."/> + value="Type name ''{0}'' must match pattern ''{1}''."/> + value="Member name ''{0}'' must match pattern ''{1}''."/> + value="Parameter name ''{0}'' must match pattern ''{1}''."/> + value="Catch parameter name ''{0}'' must match pattern ''{1}''."/> + value="Local variable name ''{0}'' must match pattern ''{1}''."/> + value="Class type name ''{0}'' must match pattern ''{1}''."/> + value="Method type name ''{0}'' must match pattern ''{1}''."/> + value="Interface type name ''{0}'' must match pattern ''{1}''."/> - - - + value="GenericWhitespace ''{0}'' is followed by whitespace."/> + + + @@ -213,13 +190,14 @@ - + - + @@ -254,7 +232,7 @@ + value="Method name ''{0}'' must match pattern ''{1}''."/> @@ -296,10 +274,10 @@ - + - + @@ -310,4 +288,4 @@ - + \ No newline at end of file