diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java similarity index 100% rename from hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/common/function/FunctionWrapper.java diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java new file mode 100644 index 000000000000..022170567fa3 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/DummyTaskContextSupplier.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi; + +import org.apache.hudi.client.common.EngineProperty; +import org.apache.hudi.client.common.TaskContextSupplier; +import org.apache.hudi.common.util.Option; + +import java.util.function.Supplier; + +public class DummyTaskContextSupplier extends TaskContextSupplier { + + @Override + public Supplier getPartitionIdSupplier() { + return null; + } + + @Override + public Supplier getStageIdSupplier() { + return null; + } + + @Override + public Supplier getAttemptIdSupplier() { + return null; + } + + @Override + public Option getProperty(EngineProperty prop) { + return null; + } +} diff --git a/hudi-client/hudi-java-client/pom.xml b/hudi-client/hudi-java-client/pom.xml new file mode 100644 index 000000000000..6429adedc6e1 --- /dev/null +++ b/hudi-client/hudi-java-client/pom.xml @@ -0,0 +1,152 @@ + + + + + hudi-client + org.apache.hudi + 0.6.1-SNAPSHOT + + 4.0.0 + + hudi-java-client + ${parent.version} + + hudi-java-client + jar + + + + + org.apache.hudi + hudi-client-common + ${parent.version} + + + + + org.apache.hudi + hudi-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-client-common + ${project.version} + tests + test-jar + test + + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.junit.vintage + junit-vintage-engine + test + + + org.junit.jupiter + junit-jupiter-params + test + + + org.mockito + mockito-junit-jupiter + test + + + org.junit.platform + junit-platform-runner + test + + + org.junit.platform + junit-platform-suite-api + test + + + org.junit.platform + junit-platform-commons + test + + + + + + + org.jacoco + jacoco-maven-plugin + + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + + compile + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + test-compile + + + + false + + + + org.apache.rat + apache-rat-plugin + + + + + + src/main/resources + + + src/test/resources + + + + + diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java new file mode 100644 index 000000000000..a04a18b19096 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.common; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.client.common.function.SerializableConsumer; +import org.apache.hudi.client.common.function.SerializableFunction; +import org.apache.hudi.client.common.function.SerializablePairFunction; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.util.Option; + +import scala.Tuple2; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; +import static org.apache.hudi.client.common.function.FunctionWrapper.throwingFlatMapWrapper; +import static org.apache.hudi.client.common.function.FunctionWrapper.throwingForeachWrapper; +import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMapToPairWrapper; +import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMapWrapper; + +/** + * A java engine implementation of HoodieEngineContext. + */ +public class HoodieJavaEngineContext extends HoodieEngineContext { + + public HoodieJavaEngineContext(Configuration conf, TaskContextSupplier taskContextSupplier) { + super(new SerializableConfiguration(conf), taskContextSupplier); + } + + @Override + public List map(List data, SerializableFunction func, int parallelism) { + return data.stream().parallel().map(throwingMapWrapper(func)).collect(toList()); + } + + @Override + public List flatMap(List data, SerializableFunction> func, int parallelism) { + return data.stream().parallel().flatMap(throwingFlatMapWrapper(func)).collect(toList()); + } + + @Override + public void foreach(List data, SerializableConsumer consumer, int parallelism) { + data.stream().forEach(throwingForeachWrapper(consumer)); + } + + @Override + public Map mapToPair(List data, SerializablePairFunction func, Integer parallelism) { + return data.stream().map(throwingMapToPairWrapper(func)).collect( + Collectors.toMap(Tuple2::_1, Tuple2::_2, (oldVal, newVal) -> newVal) + ); + } + + @Override + public void setProperty(EngineProperty key, String value) { + // no operation for now + } + + @Override + public Option getProperty(EngineProperty key) { + return Option.empty(); + } + + @Override + public void setJobStatus(String activeModule, String activityDescription) { + // no operation for now + } +} diff --git a/hudi-client/hudi-java-client/src/main/resources/log4j.properties b/hudi-client/hudi-java-client/src/main/resources/log4j.properties new file mode 100644 index 000000000000..ff268faf6363 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/resources/log4j.properties @@ -0,0 +1,23 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=INFO, A1 +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/common/TestHoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/common/TestHoodieJavaEngineContext.java new file mode 100644 index 000000000000..b81c11b710f7 --- /dev/null +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/common/TestHoodieJavaEngineContext.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.common; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.DummyTaskContextSupplier; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Assertions; + +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class TestHoodieJavaEngineContext { + private HoodieJavaEngineContext context = + new HoodieJavaEngineContext(new Configuration(), new DummyTaskContextSupplier()); + + @Test + public void testMap() { + List mapList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + List result = context.map(mapList, x -> x + 1, 2); + result.removeAll(mapList); + + Assertions.assertEquals(1, result.size()); + Assertions.assertEquals(11, result.get(0)); + } + + @Test + public void testFlatMap() { + List list1 = Arrays.asList("a", "b", "c"); + List list2 = Arrays.asList("d", "e", "f"); + List list3 = Arrays.asList("g", "h", "i"); + + List> inputList = new ArrayList<>(); + inputList.add(list1); + inputList.add(list2); + inputList.add(list3); + + List result = context.flatMap(inputList, Collection::stream, 2); + + Assertions.assertEquals(9, result.size()); + } + + @Test + public void testForeach() { + List mapList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + List result = new ArrayList<>(10); + context.foreach(mapList, result::add, 2); + + Assertions.assertEquals(result.size(), mapList.size()); + Assertions.assertTrue(result.containsAll(mapList)); + } + + @Test + public void testMapToPair() { + List mapList = Arrays.asList("hudi_flink", "hudi_spark", "hudi_java"); + + Map resultMap = context.mapToPair(mapList, x -> { + String[] splits = x.split("_"); + return Tuple2.apply(splits[0], splits[1]); + }, 2); + + Assertions.assertNotNull(resultMap.get("hudi")); + } +} diff --git a/hudi-client/hudi-java-client/src/test/resources/log4j-surefire.properties b/hudi-client/hudi-java-client/src/test/resources/log4j-surefire.properties new file mode 100644 index 000000000000..32af462093ae --- /dev/null +++ b/hudi-client/hudi-java-client/src/test/resources/log4j-surefire.properties @@ -0,0 +1,31 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=WARN, CONSOLE +log4j.logger.org.apache=INFO +log4j.logger.org.apache.hudi=DEBUG +log4j.logger.org.apache.hadoop.hbase=ERROR + +# A1 is set to be a ConsoleAppender. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +# A1 uses PatternLayout. +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n +log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter +log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true +log4j.appender.CONSOLE.filter.a.LevelMin=WARN +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml index e8ff9e905bd4..cb838390e07a 100644 --- a/hudi-client/pom.xml +++ b/hudi-client/pom.xml @@ -32,6 +32,7 @@ hudi-client-common + hudi-java-client hudi-spark-client hudi-flink-client