diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 2fbd71dc6ab26..41b0a107c2c44 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -301,6 +301,7 @@ private String getDefaultIndexType(EngineType engineType) {
       case SPARK:
         return HoodieIndex.IndexType.BLOOM.name();
       case FLINK:
+      case JAVA:
        return HoodieIndex.IndexType.INMEMORY.name();
       default:
        throw new HoodieNotSupportedException("Unsupported engine " + engineType);
diff --git a/hudi-client/hudi-java-client/pom.xml b/hudi-client/hudi-java-client/pom.xml
index 0ef741f924772..8a020c0e83195 100644
--- a/hudi-client/hudi-java-client/pom.xml
+++ b/hudi-client/hudi-java-client/pom.xml
@@ -66,6 +66,19 @@
      <version>${project.version}</version>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>${hive.groupid}</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>${hive.version}</version>
+      <scope>test</scope>
+      <classifier>${hive.exec.classifier}</classifier>
+    </dependency>
+    <dependency>
+      <groupId>${hive.groupid}</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <version>${hive.version}</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaBulkInsertInternalPartitionerFactory.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaBulkInsertInternalPartitionerFactory.java
new file mode 100644
index 0000000000000..62523d3399054
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaBulkInsertInternalPartitionerFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+/**
+ * A factory that generates a built-in partitioner to repartition input records into the
+ * expected number of output partitions for the bulk insert operation.
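+ * <p>
+ * Only {@code BulkInsertSortMode.NONE} and {@code BulkInsertSortMode.GLOBAL_SORT} are supported
+ * by the Java client; any other sort mode results in a {@link HoodieException}. A minimal,
+ * illustrative usage sketch (this mirrors how {@code JavaBulkInsertHelper} picks a partitioner
+ * when no user-defined partitioner is supplied):
+ * <pre>{@code
+ *   BulkInsertPartitioner<List<HoodieRecord<T>>> partitioner =
+ *       JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
+ *   List<HoodieRecord<T>> repartitioned =
+ *       partitioner.repartitionRecords(records, config.getBulkInsertShuffleParallelism());
+ * }</pre>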
+ */ +public abstract class JavaBulkInsertInternalPartitionerFactory { + + public static BulkInsertPartitioner get(BulkInsertSortMode sortMode) { + switch (sortMode) { + case NONE: + return new JavaNonSortPartitioner(); + case GLOBAL_SORT: + return new JavaGlobalSortPartitioner(); + default: + throw new HoodieException("The bulk insert sort mode \"" + sortMode.name() + + "\" is not supported in java client."); + } + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java new file mode 100644 index 0000000000000..fded0ffab51bd --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.table.BulkInsertPartitioner; + +import java.util.Comparator; +import java.util.List; + +/** + * A built-in partitioner that does global sorting for the input records across partitions + * after repartition for bulk insert operation, corresponding to the + * {@code BulkInsertSortMode.GLOBAL_SORT} mode. + * + * @param HoodieRecordPayload type + */ +public class JavaGlobalSortPartitioner + implements BulkInsertPartitioner>> { + + @Override + public List> repartitionRecords(List> records, + int outputSparkPartitions) { + // Now, sort the records and line them up nicely for loading. 
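+    // Records are compared on the concatenation of partition path and record key (see the
+    // comparator below), so records of the same partition end up together in the output list,
+    // ordered by record key within each partition.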
+ records.sort(new Comparator() { + @Override + public int compare(Object o1, Object o2) { + HoodieRecord o11 = (HoodieRecord) o1; + HoodieRecord o22 = (HoodieRecord) o2; + String left = new StringBuilder() + .append(o11.getPartitionPath()) + .append("+") + .append(o11.getRecordKey()) + .toString(); + String right = new StringBuilder() + .append(o22.getPartitionPath()) + .append("+") + .append(o22.getRecordKey()) + .toString(); + return left.compareTo(right); + } + }); + return records; + } + + @Override + public boolean arePartitionRecordsSorted() { + return true; + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaNonSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaNonSortPartitioner.java new file mode 100644 index 0000000000000..b40459d838444 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaNonSortPartitioner.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.table.BulkInsertPartitioner; + +import java.util.List; + +/** + * A built-in partitioner that only does coalesce for input records for bulk insert operation, + * corresponding to the {@code BulkInsertSortMode.NONE} mode. 
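+ * <p>
+ * For the list-based Java writer this is effectively a pass-through: the input records are
+ * returned unchanged, with no sorting or redistribution applied.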
+ * + * @param HoodieRecordPayload type + */ +public class JavaNonSortPartitioner + implements BulkInsertPartitioner>> { + + @Override + public List> repartitionRecords(List> records, + int outputPartitions) { + return records; + } + + @Override + public boolean arePartitionRecordsSorted() { + return false; + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index 9895df3a3e254..157e11a55d6de 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -39,10 +39,17 @@ import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata; import org.apache.hudi.table.action.clean.JavaCleanActionExecutor; import org.apache.hudi.table.action.commit.JavaDeleteCommitActionExecutor; +import org.apache.hudi.table.action.commit.JavaBulkInsertCommitActionExecutor; +import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor; +import org.apache.hudi.table.action.commit.JavaInsertOverwriteCommitActionExecutor; +import org.apache.hudi.table.action.commit.JavaInsertOverwriteTableCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor; import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor; +import org.apache.hudi.table.action.restore.JavaCopyOnWriteRestoreActionExecutor; +import org.apache.hudi.table.action.rollback.JavaCopyOnWriteRollbackActionExecutor; +import org.apache.hudi.table.action.savepoint.SavepointActionExecutor; import java.util.List; import java.util.Map; @@ -75,7 +82,8 @@ public HoodieWriteMetadata> bulkInsert(HoodieEngineContext con String instantTime, List> records, Option>>> bulkInsertPartitioner) { - throw new HoodieNotSupportedException("BulkInsert is not supported yet"); + return new JavaBulkInsertCommitActionExecutor((HoodieJavaEngineContext) context, config, + this, instantTime, records, bulkInsertPartitioner).execute(); } @Override @@ -112,21 +120,24 @@ public HoodieWriteMetadata> bulkInsertPrepped(HoodieEngineCont String instantTime, List> preppedRecords, Option>>> bulkInsertPartitioner) { - throw new HoodieNotSupportedException("BulkInsertPrepped is not supported yet"); + return new JavaBulkInsertPreppedCommitActionExecutor((HoodieJavaEngineContext) context, config, + this, instantTime, preppedRecords, bulkInsertPartitioner).execute(); } @Override public HoodieWriteMetadata> insertOverwrite(HoodieEngineContext context, String instantTime, List> records) { - throw new HoodieNotSupportedException("InsertOverwrite is not supported yet"); + return new JavaInsertOverwriteCommitActionExecutor( + context, config, this, instantTime, records).execute(); } @Override public HoodieWriteMetadata> insertOverwriteTable(HoodieEngineContext context, String instantTime, List> records) { - throw new HoodieNotSupportedException("InsertOverwrite is not supported yet"); + return new JavaInsertOverwriteTableCommitActionExecutor( + context, config, this, instantTime, records).execute(); } @Override @@ -175,7 +186,8 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant 
commitInstant, boolean deleteInstants) { - throw new HoodieNotSupportedException("Rollback is not supported yet"); + return new JavaCopyOnWriteRollbackActionExecutor( + context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute(); } @Override @@ -183,13 +195,15 @@ public HoodieSavepointMetadata savepoint(HoodieEngineContext context, String instantToSavepoint, String user, String comment) { - throw new HoodieNotSupportedException("Savepoint is not supported yet"); + return new SavepointActionExecutor( + context, config, this, instantToSavepoint, user, comment).execute(); } @Override public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) { - throw new HoodieNotSupportedException("Restore is not supported yet"); + return new JavaCopyOnWriteRestoreActionExecutor((HoodieJavaEngineContext) context, + config, this, restoreInstantTime, instantToRestore).execute(); } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java index 3e0b80c5d5a41..a4a6a4f92108c 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java @@ -121,6 +121,7 @@ public HoodieWriteMetadata> execute(List> inpu } }); updateIndex(writeStatuses, result); + updateIndexAndCommitIfNeeded(writeStatuses, result); return result; } @@ -297,8 +298,7 @@ protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, } @Override - public Iterator> handleInsert(String idPfx, Iterator> recordItr) - throws Exception { + public Iterator> handleInsert(String idPfx, Iterator> recordItr) { // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records if (!recordItr.hasNext()) { LOG.info("Empty partition"); @@ -325,4 +325,13 @@ public Partitioner getInsertPartitioner(WorkloadProfile profile) { return getUpsertPartitioner(profile); } + public void updateIndexAndCommitIfNeeded(List writeStatuses, HoodieWriteMetadata result) { + Instant indexStartTime = Instant.now(); + // Update the index back + List statuses = table.getIndex().updateLocation(writeStatuses, context, table); + result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now())); + result.setWriteStatuses(statuses); + result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(statuses)); + commitOnAutoCommit(result); + } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertCommitActionExecutor.java new file mode 100644 index 0000000000000..9780262fb2b92 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertCommitActionExecutor.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieInsertException; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.util.List; +import java.util.Map; + +public class JavaBulkInsertCommitActionExecutor> extends BaseJavaCommitActionExecutor { + + private final List> inputRecords; + private final Option>>> bulkInsertPartitioner; + + public JavaBulkInsertCommitActionExecutor(HoodieJavaEngineContext context, HoodieWriteConfig config, HoodieTable table, + String instantTime, List> inputRecords, + Option>>> bulkInsertPartitioner) { + this(context, config, table, instantTime, inputRecords, bulkInsertPartitioner, Option.empty()); + } + + public JavaBulkInsertCommitActionExecutor(HoodieJavaEngineContext context, HoodieWriteConfig config, HoodieTable table, + String instantTime, List> inputRecords, + Option>>> bulkInsertPartitioner, + Option> extraMetadata) { + super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata); + this.inputRecords = inputRecords; + this.bulkInsertPartitioner = bulkInsertPartitioner; + } + + @Override + public HoodieWriteMetadata> execute() { + try { + return JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, table, config, + this, true, bulkInsertPartitioner); + } catch (HoodieInsertException ie) { + throw ie; + } catch (Throwable e) { + throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e); + } + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java new file mode 100644 index 0000000000000..cce8ad1b000df --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.execution.JavaLazyInsertIterable; +import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory; +import org.apache.hudi.io.CreateHandleFactory; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.util.ArrayList; +import java.util.List; + +/** + * A java implementation of {@link AbstractBulkInsertHelper}. + * + * @param + */ +@SuppressWarnings("checkstyle:LineLength") +public class JavaBulkInsertHelper extends AbstractBulkInsertHelper>, + List, List, R> { + + private JavaBulkInsertHelper() { + } + + private static class BulkInsertHelperHolder { + private static final JavaBulkInsertHelper JAVA_BULK_INSERT_HELPER = new JavaBulkInsertHelper(); + } + + public static JavaBulkInsertHelper newInstance() { + return BulkInsertHelperHolder.JAVA_BULK_INSERT_HELPER; + } + + @Override + public HoodieWriteMetadata> bulkInsert(final List> inputRecords, + final String instantTime, + final HoodieTable>, List, List> table, + final HoodieWriteConfig config, + final BaseCommitActionExecutor>, List, List, R> executor, + final boolean performDedupe, + final Option> userDefinedBulkInsertPartitioner) { + HoodieWriteMetadata result = new HoodieWriteMetadata(); + + //transition bulk_insert state to inflight + table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED, + table.getMetaClient().getCommitActionType(), instantTime), Option.empty(), + config.shouldAllowMultiWriteOnSameInstant()); + // write new files + List writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism()); + //update index + ((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result); + return result; + } + + @Override + public List bulkInsert(List> inputRecords, + String instantTime, + HoodieTable>, List, List> table, + HoodieWriteConfig config, + boolean performDedupe, + Option> userDefinedBulkInsertPartitioner, + boolean useWriterSchema, + int parallelism) { + + // De-dupe/merge if needed + List> dedupedRecords = inputRecords; + + if (performDedupe) { + dedupedRecords = (List>) JavaWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords, + parallelism, table); + } + + final List> repartitionedRecords; + BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent() + ? 
userDefinedBulkInsertPartitioner.get() + : JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()); + repartitionedRecords = (List>) partitioner.repartitionRecords(dedupedRecords, parallelism); + + String idPfx = FSUtils.createNewFileIdPfx(); + + List writeStatuses = new ArrayList<>(); + + new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true, config, instantTime, table, idPfx, + table.getTaskContextSupplier(), new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll); + + return writeStatuses; + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java new file mode 100644 index 0000000000000..37b56b6325bc3 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieInsertException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.BulkInsertPartitioner; + +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.util.List; + +public class JavaBulkInsertPreppedCommitActionExecutor> + extends BaseJavaCommitActionExecutor { + + private final List> preppedInputRecord; + private final Option> userDefinedBulkInsertPartitioner; + + public JavaBulkInsertPreppedCommitActionExecutor(HoodieJavaEngineContext context, + HoodieWriteConfig config, HoodieTable table, + String instantTime, List> preppedInputRecord, + Option> userDefinedBulkInsertPartitioner) { + super(context, config, table, instantTime, WriteOperationType.BULK_INSERT); + this.preppedInputRecord = preppedInputRecord; + this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner; + } + + @Override + public HoodieWriteMetadata> execute() { + try { + return JavaBulkInsertHelper.newInstance().bulkInsert(preppedInputRecord, instantTime, table, config, + this, false, userDefinedBulkInsertPartitioner); + } catch (Throwable e) { + if (e instanceof HoodieInsertException) { + throw e; + } + throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e); + } + } +} \ No newline at end of file diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteCommitActionExecutor.java new file mode 100644 index 0000000000000..519cb76fc24af --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteCommitActionExecutor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class JavaInsertOverwriteCommitActionExecutor> + extends BaseJavaCommitActionExecutor { + + private final List> inputRecords; + + public JavaInsertOverwriteCommitActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, HoodieTable table, + String instantTime, List> inputRecords) { + this(context, config, table, instantTime, inputRecords, WriteOperationType.INSERT_OVERWRITE); + } + + public JavaInsertOverwriteCommitActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, HoodieTable table, + String instantTime, List> inputRecords, + WriteOperationType writeOperationType) { + super(context, config, table, instantTime, writeOperationType); + this.inputRecords = inputRecords; + } + + @Override + public HoodieWriteMetadata> execute() { + return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table, + config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false); + } + + @Override + protected String getCommitActionType() { + return HoodieTimeline.REPLACE_COMMIT_ACTION; + } + + @Override + protected Map> getPartitionToReplacedFileIds(List writeStatuses) { + return context.mapToPair( + writeStatuses.stream().map(status -> status.getStat().getPartitionPath()).distinct().collect(Collectors.toList()), + partitionPath -> + Pair.of(partitionPath, getAllExistingFileIds(partitionPath)), 1 + ); + } + + private List getAllExistingFileIds(String partitionPath) { + // because new commit is not complete. it is safe to mark all existing file Ids as old files + return table.getSliceView().getLatestFileSlices(partitionPath).map(fg -> fg.getFileId()).distinct().collect(Collectors.toList()); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteTableCommitActionExecutor.java new file mode 100644 index 0000000000000..ca6885ccf52d2 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertOverwriteTableCommitActionExecutor.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class JavaInsertOverwriteTableCommitActionExecutor> + extends JavaInsertOverwriteCommitActionExecutor { + + public JavaInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, HoodieTable table, + String instantTime, List> inputRecords) { + super(context, config, table, instantTime, inputRecords, WriteOperationType.INSERT_OVERWRITE_TABLE); + } + + protected List getAllExistingFileIds(String partitionPath) { + return table.getSliceView().getLatestFileSlices(partitionPath) + .map(fg -> fg.getFileId()).distinct().collect(Collectors.toList()); + } + + @Override + protected Map> getPartitionToReplacedFileIds(List writeStatuses) { + Map> partitionToExistingFileIds = new HashMap<>(); + List partitionPaths = FSUtils.getAllPartitionPaths(context, + table.getMetaClient().getBasePath(), config.useFileListingMetadata(), + config.getFileListingMetadataVerify(), config.shouldAssumeDatePartitioning()); + + if (partitionPaths != null && partitionPaths.size() > 0) { + partitionToExistingFileIds = context.mapToPair(partitionPaths, + partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)), 1); + } + return partitionToExistingFileIds; + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java new file mode 100644 index 0000000000000..75c1e0e30f255 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/restore/JavaCopyOnWriteRestoreActionExecutor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.restore; + +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.rollback.JavaCopyOnWriteRollbackActionExecutor; + +import java.util.List; + +public class JavaCopyOnWriteRestoreActionExecutor extends + BaseRestoreActionExecutor>, List, List> { + + public JavaCopyOnWriteRestoreActionExecutor(HoodieJavaEngineContext context, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + String restoreInstantTime) { + super(context, config, table, instantTime, restoreInstantTime); + } + + @Override + protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback) { + table.getMetaClient().reloadActiveTimeline(); + JavaCopyOnWriteRollbackActionExecutor rollbackActionExecutor = new JavaCopyOnWriteRollbackActionExecutor( + context, + config, + table, + HoodieActiveTimeline.createNewInstantTime(), + instantToRollback, + true, + true, + false); + if (!instantToRollback.getAction().equals(HoodieTimeline.COMMIT_ACTION) + && !instantToRollback.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { + throw new HoodieRollbackException("Unsupported action in rollback instant:" + instantToRollback); + } + return rollbackActionExecutor.execute(); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java new file mode 100644 index 0000000000000..15e393220f083 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaCopyOnWriteRollbackActionExecutor.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.rollback; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +import java.util.List; + +@SuppressWarnings("checkstyle:LineLength") +public class JavaCopyOnWriteRollbackActionExecutor extends + BaseCopyOnWriteRollbackActionExecutor>, List, List> { + public JavaCopyOnWriteRollbackActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable>, List, List> table, + String instantTime, + HoodieInstant commitInstant, + boolean deleteInstants) { + super(context, config, table, instantTime, commitInstant, deleteInstants); + } + + public JavaCopyOnWriteRollbackActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable>, List, List> table, + String instantTime, + HoodieInstant commitInstant, + boolean deleteInstants, + boolean skipTimelinePublish, + boolean useMarkerBasedStrategy) { + super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy); + } + + @Override + protected BaseRollbackActionExecutor.RollbackStrategy getRollbackStrategy() { + if (useMarkerBasedStrategy) { + return new JavaMarkerBasedRollbackStrategy(table, context, config, instantTime); + } else { + return this::executeRollbackUsingFileListing; + } + } + + @Override + protected List executeRollbackUsingFileListing(HoodieInstant instantToRollback) { + List rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW( + context, table.getMetaClient().getBasePath(), config); + return new JavaListingBasedRollbackHelper(table.getMetaClient(), config) + .performRollback(context, instantToRollback, rollbackRequests); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java new file mode 100644 index 0000000000000..5331ca5891c28 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaListingBasedRollbackHelper.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.rollback; + +import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieCommandBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.collection.ImmutablePair; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieRollbackException; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.PathFilter; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Performs Rollback of Hoodie Tables. + */ +public class JavaListingBasedRollbackHelper implements Serializable { + + private static final Logger LOG = LogManager.getLogger(JavaListingBasedRollbackHelper.class); + + private final HoodieTableMetaClient metaClient; + private final HoodieWriteConfig config; + + public JavaListingBasedRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig config) { + this.metaClient = metaClient; + this.config = config; + } + + /** + * Performs all rollback actions that we have collected in parallel. + */ + public List performRollback(HoodieEngineContext context, HoodieInstant instantToRollback, List rollbackRequests) { + Map partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, true); + + Map>> collect = partitionPathRollbackStatsPairs.entrySet() + .stream() + .map(x -> Pair.of(x.getKey(), x.getValue())).collect(Collectors.groupingBy(Pair::getLeft)); + return collect.values().stream() + .map(pairs -> pairs.stream().map(Pair::getRight).reduce(RollbackUtils::mergeRollbackStat).orElse(null)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + /** + * Collect all file info that needs to be rollbacked. + */ + public List collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback, List rollbackRequests) { + Map partitionPathRollbackStatsPairs = maybeDeleteAndCollectStats(context, instantToRollback, rollbackRequests, false); + return new ArrayList<>(partitionPathRollbackStatsPairs.values()); + } + + /** + * May be delete interested files and collect stats or collect stats only. + * + * @param context instance of {@link HoodieEngineContext} to use. + * @param instantToRollback {@link HoodieInstant} of interest for which deletion or collect stats is requested. + * @param rollbackRequests List of {@link ListingBasedRollbackRequest} to be operated on. + * @param doDelete {@code true} if deletion has to be done. {@code false} if only stats are to be collected w/o performing any deletes. + * @return stats collected with or w/o actual deletions. 
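+   * <p>
+   * {@code DELETE_DATA_FILES_ONLY} and {@code DELETE_DATA_AND_LOG_FILES} requests delete the
+   * matching base (and log) files written by the instant, while {@code APPEND_ROLLBACK_BLOCK}
+   * appends a rollback command block to the corresponding log file instead of deleting it.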
+ */ + Map maybeDeleteAndCollectStats(HoodieEngineContext context, + HoodieInstant instantToRollback, + List rollbackRequests, + boolean doDelete) { + return context.mapToPair(rollbackRequests, rollbackRequest -> { + switch (rollbackRequest.getType()) { + case DELETE_DATA_FILES_ONLY: { + final Map filesToDeletedStatus = deleteBaseFiles(metaClient, config, instantToRollback.getTimestamp(), + rollbackRequest.getPartitionPath(), doDelete); + return new ImmutablePair<>(rollbackRequest.getPartitionPath(), + HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) + .withDeletedFileResults(filesToDeletedStatus).build()); + } + case DELETE_DATA_AND_LOG_FILES: { + final Map filesToDeletedStatus = deleteBaseAndLogFiles(metaClient, config, instantToRollback.getTimestamp(), rollbackRequest.getPartitionPath(), doDelete); + return new ImmutablePair<>(rollbackRequest.getPartitionPath(), + HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) + .withDeletedFileResults(filesToDeletedStatus).build()); + } + case APPEND_ROLLBACK_BLOCK: { + HoodieLogFormat.Writer writer = null; + try { + writer = HoodieLogFormat.newWriterBuilder() + .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath())) + .withFileId(rollbackRequest.getFileId().get()) + .overBaseCommit(rollbackRequest.getLatestBaseInstant().get()).withFs(metaClient.getFs()) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); + + // generate metadata + if (doDelete) { + Map header = generateHeader(instantToRollback.getTimestamp()); + // if update belongs to an existing log file + writer.appendBlock(new HoodieCommandBlock(header)); + } + } catch (IOException | InterruptedException io) { + throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io); + } finally { + try { + if (writer != null) { + writer.close(); + } + } catch (IOException io) { + throw new HoodieIOException("Error appending rollback block..", io); + } + } + + // This step is intentionally done after writer is closed. Guarantees that + // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in + // cloud-storage : HUDI-168 + Map filesToNumBlocksRollback = Collections.singletonMap( + metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), 1L + ); + return new ImmutablePair<>(rollbackRequest.getPartitionPath(), + HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) + .withRollbackBlockAppendResults(filesToNumBlocksRollback).build()); + } + default: + throw new IllegalStateException("Unknown Rollback action " + rollbackRequest); + } + }, 0); + } + + /** + * Common method used for cleaning out base files under a partition path during rollback of a set of commits. 
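+   * Base files are matched by the commit time encoded in the file name, and log files by the
+   * base commit time of the log file; only files belonging to the rolled-back commit are removed.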
+ */ + private Map deleteBaseAndLogFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, + String commit, String partitionPath, boolean doDelete) throws IOException { + LOG.info("Cleaning path " + partitionPath); + String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); + SerializablePathFilter filter = (path) -> { + if (path.toString().endsWith(basefileExtension)) { + String fileCommitTime = FSUtils.getCommitTime(path.getName()); + return commit.equals(fileCommitTime); + } else if (FSUtils.isLogFile(path)) { + // Since the baseCommitTime is the only commit for new log files, it's okay here + String fileCommitTime = FSUtils.getBaseCommitTimeFromLogPath(path); + return commit.equals(fileCommitTime); + } + return false; + }; + + final Map results = new HashMap<>(); + FileSystem fs = metaClient.getFs(); + FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); + for (FileStatus file : toBeDeleted) { + if (doDelete) { + boolean success = fs.delete(file.getPath(), false); + results.put(file, success); + LOG.info("Delete file " + file.getPath() + "\t" + success); + } else { + results.put(file, true); + } + } + return results; + } + + /** + * Common method used for cleaning out base files under a partition path during rollback of a set of commits. + */ + private Map deleteBaseFiles(HoodieTableMetaClient metaClient, HoodieWriteConfig config, + String commit, String partitionPath, boolean doDelete) throws IOException { + final Map results = new HashMap<>(); + LOG.info("Cleaning path " + partitionPath); + FileSystem fs = metaClient.getFs(); + String basefileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); + PathFilter filter = (path) -> { + if (path.toString().contains(basefileExtension)) { + String fileCommitTime = FSUtils.getCommitTime(path.getName()); + return commit.equals(fileCommitTime); + } + return false; + }; + FileStatus[] toBeDeleted = fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter); + for (FileStatus file : toBeDeleted) { + if (doDelete) { + boolean success = fs.delete(file.getPath(), false); + results.put(file, success); + LOG.info("Delete file " + file.getPath() + "\t" + success); + } else { + results.put(file, true); + } + } + return results; + } + + private Map generateHeader(String commit) { + // generate metadata + Map header = new HashMap<>(3); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp()); + header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit); + header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + return header; + } + + public interface SerializablePathFilter extends PathFilter, Serializable { + + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java new file mode 100644 index 0000000000000..f1e2bf354e81b --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/rollback/JavaMarkerBasedRollbackStrategy.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.rollback; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.MarkerFiles; + +import java.util.List; +import java.util.stream.Collectors; + +@SuppressWarnings("checkstyle:LineLength") +public class JavaMarkerBasedRollbackStrategy extends AbstractMarkerBasedRollbackStrategy>, List, List> { + public JavaMarkerBasedRollbackStrategy(HoodieTable>, List, List> table, + HoodieEngineContext context, + HoodieWriteConfig config, + String instantTime) { + super(table, context, config, instantTime); + } + + @Override + public List execute(HoodieInstant instantToRollback) { + try { + MarkerFiles markerFiles = new MarkerFiles(table, instantToRollback.getTimestamp()); + List rollbackStats = context.map(markerFiles.allMarkerFilePaths(), markerFilePath -> { + String typeStr = markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1); + IOType type = IOType.valueOf(typeStr); + switch (type) { + case MERGE: + return undoMerge(MarkerFiles.stripMarkerSuffix(markerFilePath)); + case APPEND: + return undoAppend(MarkerFiles.stripMarkerSuffix(markerFilePath), instantToRollback); + case CREATE: + return undoCreate(MarkerFiles.stripMarkerSuffix(markerFilePath)); + default: + throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback); + } + }, 0); + + return rollbackStats.stream().map(rollbackStat -> Pair.of(rollbackStat.getPartitionPath(), rollbackStat)) + .collect(Collectors.groupingBy(Pair::getKey)) + .values() + .stream() + .map(x -> x.stream().map(y -> y.getValue()).reduce(RollbackUtils::mergeRollbackStat).get()) + .collect(Collectors.toList()); + } catch (Exception e) { + throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e); + } + } +} diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java new file mode 100644 index 0000000000000..17b174279b4d3 --- /dev/null +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java @@ -0,0 +1,479 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.HoodieJavaWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.testutils.RawTripTestPayload; +import org.apache.hudi.common.testutils.Transformations; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ParquetUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieStorageConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.hadoop.HoodieParquetInputFormat; +import org.apache.hudi.hadoop.utils.HoodieHiveUtils; +import org.apache.hudi.io.HoodieCreateHandle; +import org.apache.hudi.table.HoodieJavaCopyOnWriteTable; +import org.apache.hudi.table.HoodieJavaTable; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hudi.testutils.HoodieJavaClientTestBase; +import org.apache.hudi.testutils.MetadataMergeWriteStatus; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.hadoop.ParquetReader; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; +import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase { + + private static final Logger LOG = 
LogManager.getLogger(TestJavaCopyOnWriteActionExecutor.class); + private static final Schema SCHEMA = getSchemaFromResource(TestJavaCopyOnWriteActionExecutor.class, "/exampleSchema.avsc"); + + @Test + public void testMakeNewPath() { + String fileName = UUID.randomUUID().toString(); + String partitionPath = "2016/05/04"; + + String instantTime = makeNewCommitTime(); + HoodieWriteConfig config = makeHoodieClientConfig(); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieJavaTable.create(config, context, metaClient); + + Pair newPathWithWriteToken = Arrays.asList(1).stream().map(x -> { + HoodieRecord record = mock(HoodieRecord.class); + when(record.getPartitionPath()).thenReturn(partitionPath); + String writeToken = FSUtils.makeWriteToken(context.getTaskContextSupplier().getPartitionIdSupplier().get(), + context.getTaskContextSupplier().getStageIdSupplier().get(), + context.getTaskContextSupplier().getAttemptIdSupplier().get()); + HoodieCreateHandle io = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileName, + context.getTaskContextSupplier()); + return Pair.of(io.makeNewPath(record.getPartitionPath()), writeToken); + }).collect(Collectors.toList()).get(0); + + assertEquals(newPathWithWriteToken.getKey().toString(), Paths.get(this.basePath, partitionPath, + FSUtils.makeDataFileName(instantTime, newPathWithWriteToken.getRight(), fileName)).toString()); + } + + private HoodieWriteConfig makeHoodieClientConfig() { + return makeHoodieClientConfigBuilder().build(); + } + + private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() { + // Prepare the AvroParquetIO + return HoodieWriteConfig.newBuilder() + .withEngineType(EngineType.JAVA) + .withPath(basePath) + .withSchema(SCHEMA.toString()); + } + + @Test + public void testUpdateRecords() throws Exception { + // Prepare the AvroParquetIO + HoodieWriteConfig config = makeHoodieClientConfig(); + String firstCommitTime = makeNewCommitTime(); + HoodieJavaWriteClient writeClient = getHoodieWriteClient(config); + writeClient.startCommitWithTime(firstCommitTime); + metaClient = HoodieTableMetaClient.reload(metaClient); + + String partitionPath = "2016/01/31"; + HoodieJavaCopyOnWriteTable table = (HoodieJavaCopyOnWriteTable) HoodieJavaTable.create(config, context, metaClient); + + // Get some records belong to the same partition (2016/01/31) + String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; + String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; + String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; + String recordStr4 = "{\"_row_key\":\"8eb5b87d-1fej-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":51}"; + + List records = new ArrayList<>(); + RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1); + records.add(new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1)); + RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2); + records.add(new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2)); + RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3); + records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3)); + + // Insert new 
records + final HoodieJavaCopyOnWriteTable cowTable = table; + writeClient.insert(records, firstCommitTime); + + FileStatus[] allFiles = getIncrementalFiles(partitionPath, "0", -1); + assertEquals(1, allFiles.length); + + // Read back the bloom filter and make sure it can answer whether a record exists or not + Path parquetFilePath = allFiles[0].getPath(); + BloomFilter filter = ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, parquetFilePath); + for (HoodieRecord record : records) { + assertTrue(filter.mightContain(record.getRecordKey())); + } + + // Read the parquet file and check the record content + List<GenericRecord> fileRecords = ParquetUtils.readAvroRecords(hadoopConf, parquetFilePath); + GenericRecord newRecord; + int index = 0; + for (GenericRecord record : fileRecords) { + //System.out.println("Got :" + record.get("_row_key").toString() + ", Exp :" + records.get(index).getRecordKey()); + assertEquals(records.get(index).getRecordKey(), record.get("_row_key").toString()); + index++; + } + + // We update the 1st record & add a new record + String updateRecordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; + RawTripTestPayload updateRowChanges1 = new RawTripTestPayload(updateRecordStr1); + HoodieRecord updatedRecord1 = new HoodieRecord( + new HoodieKey(updateRowChanges1.getRowKey(), updateRowChanges1.getPartitionPath()), updateRowChanges1); + + RawTripTestPayload rowChange4 = new RawTripTestPayload(recordStr4); + HoodieRecord insertedRecord1 = + new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); + + List<HoodieRecord> updatedRecords = Arrays.asList(updatedRecord1, insertedRecord1); + + Thread.sleep(1000); + String newCommitTime = makeNewCommitTime(); + metaClient = HoodieTableMetaClient.reload(metaClient); + writeClient.startCommitWithTime(newCommitTime); + List<WriteStatus> statuses = writeClient.upsert(updatedRecords, newCommitTime); + + allFiles = getIncrementalFiles(partitionPath, firstCommitTime, -1); + assertEquals(1, allFiles.length); + // verify the new incremental file group is the same as the previous one + assertEquals(FSUtils.getFileId(parquetFilePath.getName()), FSUtils.getFileId(allFiles[0].getPath().getName())); + + // Check whether the record has been updated + Path updatedParquetFilePath = allFiles[0].getPath(); + BloomFilter updatedFilter = + ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, updatedParquetFilePath); + for (HoodieRecord record : records) { + // No change to the _row_key + assertTrue(updatedFilter.mightContain(record.getRecordKey())); + } + + assertTrue(updatedFilter.mightContain(insertedRecord1.getRecordKey())); + records.add(insertedRecord1); // add this so it is covered by the checks below + + ParquetReader updatedReader = ParquetReader.builder(new AvroReadSupport<>(), updatedParquetFilePath).build(); + index = 0; + while ((newRecord = (GenericRecord) updatedReader.read()) != null) { + assertEquals(newRecord.get("_row_key").toString(), records.get(index).getRecordKey()); + if (index == 0) { + assertEquals("15", newRecord.get("number").toString()); + } + index++; + } + updatedReader.close(); + // Also check the numRecordsWritten + WriteStatus writeStatus = statuses.get(0); + assertEquals(1, statuses.size(), "Should be only one file generated"); + assertEquals(4, writeStatus.getStat().getNumWrites()); // 3 rewritten records + 1 new record + } + + private FileStatus[] getIncrementalFiles(String partitionPath, String startCommitTime, int numCommitsToPull) + throws Exception {
+ // initialize parquet input format + HoodieParquetInputFormat hoodieInputFormat = new HoodieParquetInputFormat(); + JobConf jobConf = new JobConf(hadoopConf); + hoodieInputFormat.setConf(jobConf); + HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE); + setupIncremental(jobConf, startCommitTime, numCommitsToPull); + FileInputFormat.setInputPaths(jobConf, Paths.get(basePath, partitionPath).toString()); + return hoodieInputFormat.listStatus(jobConf); + } + + private void setupIncremental(JobConf jobConf, String startCommit, int numberOfCommitsToPull) { + String modePropertyName = + String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.set(modePropertyName, HoodieHiveUtils.INCREMENTAL_SCAN_MODE); + + String startCommitTimestampName = + String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.set(startCommitTimestampName, startCommit); + + String maxCommitPulls = + String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, HoodieTestUtils.RAW_TRIPS_TEST_NAME); + jobConf.setInt(maxCommitPulls, numberOfCommitsToPull); + } + + private List<HoodieRecord> newHoodieRecords(int n, String time) throws Exception { + List<HoodieRecord> records = new ArrayList<>(); + for (int i = 0; i < n; i++) { + String recordStr = + String.format("{\"_row_key\":\"%s\",\"time\":\"%s\",\"number\":%d}", UUID.randomUUID().toString(), time, i); + RawTripTestPayload rowChange = new RawTripTestPayload(recordStr); + records.add(new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange)); + } + return records; + } + + // Check if record level metadata is aggregated properly at the end of write. + @Test + public void testMetadataAggregateFromWriteStatus() throws Exception { + // Prepare the AvroParquetIO + HoodieWriteConfig config = + makeHoodieClientConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class).build(); + String firstCommitTime = makeNewCommitTime(); + metaClient = HoodieTableMetaClient.reload(metaClient); + + HoodieJavaCopyOnWriteTable table = (HoodieJavaCopyOnWriteTable) HoodieJavaTable.create(config, context, metaClient); + + // Get some records belonging to the same partition (2016/01/31) + String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; + String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; + String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; + + List<HoodieRecord> records = new ArrayList<>(); + RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1); + records.add(new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1)); + RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2); + records.add(new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2)); + RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3); + records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3)); + + // Insert new records + BaseJavaCommitActionExecutor actionExecutor = new JavaInsertCommitActionExecutor(context, config, table, + firstCommitTime, records); + List<WriteStatus> writeStatuses = new ArrayList<>(); + actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator()) +
.forEachRemaining(x -> writeStatuses.addAll((List<WriteStatus>) x)); + + Map<String, String> allWriteStatusMergedMetadataMap = + MetadataMergeWriteStatus.mergeMetadataForWriteStatuses(writeStatuses); + assertTrue(allWriteStatusMergedMetadataMap.containsKey("InputRecordCount_1506582000")); + // For metadata key InputRecordCount_1506582000, the value is 2 for each record, so the sum + // should be 2 * 3 + assertEquals("6", allWriteStatusMergedMetadataMap.get("InputRecordCount_1506582000")); + } + + private void verifyStatusResult(List<WriteStatus> statuses, Map<String, Long> expectedPartitionNumRecords) { + Map<String, Long> actualPartitionNumRecords = new HashMap<>(); + + for (int i = 0; i < statuses.size(); i++) { + WriteStatus writeStatus = statuses.get(i); + String partitionPath = writeStatus.getPartitionPath(); + actualPartitionNumRecords.put( + partitionPath, + actualPartitionNumRecords.getOrDefault(partitionPath, 0L) + writeStatus.getTotalRecords()); + assertEquals(0, writeStatus.getFailedRecords().size()); + } + + assertEquals(expectedPartitionNumRecords, actualPartitionNumRecords); + } + + @Test + public void testInsertRecords() throws Exception { + HoodieWriteConfig config = makeHoodieClientConfig(); + String instantTime = makeNewCommitTime(); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieJavaCopyOnWriteTable table = (HoodieJavaCopyOnWriteTable) HoodieJavaTable.create(config, context, metaClient); + + // Case 1: + // 10 records for partition 1, 1 record for partition 2. + List<HoodieRecord> records = newHoodieRecords(10, "2016-01-31T03:16:41.415Z"); + records.addAll(newHoodieRecords(1, "2016-02-01T03:16:41.415Z")); + + // Insert new records + final List<HoodieRecord> recs2 = records; + BaseJavaCommitActionExecutor actionExecutor = new JavaInsertPreppedCommitActionExecutor(context, config, table, + instantTime, recs2); + + final List<WriteStatus> returnedStatuses = new ArrayList<>(); + actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs2.iterator()) + .forEachRemaining(x -> returnedStatuses.addAll((List<WriteStatus>) x)); + + assertEquals(2, returnedStatuses.size()); + Map<String, Long> expectedPartitionNumRecords = new HashMap<>(); + expectedPartitionNumRecords.put("2016/01/31", 10L); + expectedPartitionNumRecords.put("2016/02/01", 1L); + verifyStatusResult(returnedStatuses, expectedPartitionNumRecords); + + // Case 2: + // 1 record for partition 1, 5 records for partition 2, 1 record for partition 3.
+ records = newHoodieRecords(1, "2016-01-31T03:16:41.415Z"); + records.addAll(newHoodieRecords(5, "2016-02-01T03:16:41.415Z")); + records.addAll(newHoodieRecords(1, "2016-02-02T03:16:41.415Z")); + + // Insert new records + final List<HoodieRecord> recs3 = records; + BaseJavaCommitActionExecutor newActionExecutor = new JavaUpsertPreppedCommitActionExecutor(context, config, table, + instantTime, recs3); + + final List<WriteStatus> returnedStatuses1 = new ArrayList<>(); + newActionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs3.iterator()) + .forEachRemaining(x -> returnedStatuses1.addAll((List<WriteStatus>) x)); + + assertEquals(3, returnedStatuses1.size()); + expectedPartitionNumRecords.clear(); + expectedPartitionNumRecords.put("2016/01/31", 1L); + expectedPartitionNumRecords.put("2016/02/01", 5L); + expectedPartitionNumRecords.put("2016/02/02", 1L); + verifyStatusResult(returnedStatuses1, expectedPartitionNumRecords); + } + + @Test + public void testFileSizeUpsertRecords() throws Exception { + HoodieWriteConfig config = makeHoodieClientConfigBuilder().withStorageConfig(HoodieStorageConfig.newBuilder() + .parquetMaxFileSize(64 * 1024).hfileMaxFileSize(64 * 1024) + .parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024).build()).build(); + + String instantTime = makeNewCommitTime(); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieJavaCopyOnWriteTable table = (HoodieJavaCopyOnWriteTable) HoodieJavaTable.create(config, context, metaClient); + + List<HoodieRecord> records = new ArrayList<>(); + // Approx 1150 records are written for a block size of 64KB + for (int i = 0; i < 2000; i++) { + String recordStr = "{\"_row_key\":\"" + UUID.randomUUID().toString() + + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":" + i + "}"; + RawTripTestPayload rowChange = new RawTripTestPayload(recordStr); + records.add(new HoodieRecord(new HoodieKey(rowChange.getRowKey(), rowChange.getPartitionPath()), rowChange)); + } + + // Insert new records + BaseJavaCommitActionExecutor actionExecutor = new JavaUpsertCommitActionExecutor(context, config, table, + instantTime, records); + + Arrays.asList(1).stream() + .map(i -> actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator())) + .map(Transformations::flatten).collect(Collectors.toList()); + + // Check the updated file + int counts = 0; + for (File file : Paths.get(basePath, "2016/01/31").toFile().listFiles()) { + if (file.getName().endsWith(".parquet") && FSUtils.getCommitTime(file.getName()).equals(instantTime)) { + LOG.info(file.getName() + "-" + file.length()); + counts++; + } + } + assertEquals(3, counts, "If the number of records is more than 1150, there should be a new file"); + } + + @Test + public void testInsertUpsertWithHoodieAvroPayload() throws Exception { + Schema schema = getSchemaFromResource(TestJavaCopyOnWriteActionExecutor.class, "/testDataGeneratorSchema.txt"); + HoodieWriteConfig config = HoodieWriteConfig.newBuilder() + .withEngineType(EngineType.JAVA) + .withPath(basePath) + .withSchema(schema.toString()) + .withStorageConfig(HoodieStorageConfig.newBuilder() + .parquetMaxFileSize(1000 * 1024).hfileMaxFileSize(1000 * 1024).build()) + .build(); + metaClient = HoodieTableMetaClient.reload(metaClient); + final HoodieJavaCopyOnWriteTable table = (HoodieJavaCopyOnWriteTable) HoodieJavaTable.create(config, context, metaClient); + String instantTime = "000"; + // Perform inserts of 100 records to test CreateHandle and BufferedExecutor + final List<HoodieRecord> inserts = dataGen.generateInsertsWithHoodieAvroPayload(instantTime, 100); + BaseJavaCommitActionExecutor
actionExecutor = new JavaInsertCommitActionExecutor(context, config, table, + instantTime, inserts); + + final List<List<WriteStatus>> ws = new ArrayList<>(); + actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator()) + .forEachRemaining(x -> ws.add((List<WriteStatus>) x)); + + WriteStatus writeStatus = ws.get(0).get(0); + String fileId = writeStatus.getFileId(); + metaClient.getFs().create(new Path(Paths.get(basePath, ".hoodie", "000.commit").toString())).close(); + final List<HoodieRecord> updates = dataGen.generateUpdatesWithHoodieAvroPayload(instantTime, inserts); + + String partitionPath = writeStatus.getPartitionPath(); + long numRecordsInPartition = updates.stream().filter(u -> u.getPartitionPath().equals(partitionPath)).count(); + BaseJavaCommitActionExecutor newActionExecutor = new JavaUpsertCommitActionExecutor(context, config, table, + instantTime, updates); + + taskContextSupplier.reset(); + final List<List<WriteStatus>> updateStatus = new ArrayList<>(); + newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator()) + .forEachRemaining(x -> updateStatus.add((List<WriteStatus>) x)); + assertEquals(updates.size() - numRecordsInPartition, updateStatus.get(0).get(0).getTotalErrorRecords()); + } + + public void testBulkInsertRecords(String bulkInsertMode) throws Exception { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder() + .withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA) + .withBulkInsertParallelism(2).withBulkInsertSortMode(bulkInsertMode).build(); + String instantTime = makeNewCommitTime(); + HoodieJavaWriteClient writeClient = getHoodieWriteClient(config); + writeClient.startCommitWithTime(instantTime); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieJavaCopyOnWriteTable table = (HoodieJavaCopyOnWriteTable) HoodieJavaTable.create(config, context, metaClient); + + // Insert new records + final List<HoodieRecord> inputRecords = generateTestRecordsForBulkInsert(); + JavaBulkInsertCommitActionExecutor bulkInsertExecutor = new JavaBulkInsertCommitActionExecutor( + context, config, table, instantTime, inputRecords, Option.empty()); + List<WriteStatus> returnedStatuses = (List<WriteStatus>) bulkInsertExecutor.execute().getWriteStatuses(); + verifyStatusResult(returnedStatuses, generateExpectedPartitionNumRecords(inputRecords)); + } + + public static Map<String, Long> generateExpectedPartitionNumRecords(List<HoodieRecord> records) { + return records.stream().map(record -> Pair.of(record.getPartitionPath(), 1)) + .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting())); + } + + public static List<HoodieRecord> generateTestRecordsForBulkInsert() { + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + // First batch of input records + List<HoodieRecord> records1 = dataGenerator.generateInserts("0", 100); + // Second batch of input records + List<HoodieRecord> records2 = dataGenerator.generateInserts("0", 150); + records1.addAll(records2); + return records1; + } +} diff --git a/hudi-client/hudi-java-client/src/test/resources/testDataGeneratorSchema.txt b/hudi-client/hudi-java-client/src/test/resources/testDataGeneratorSchema.txt new file mode 100644 index 0000000000000..ada01b3530ff5 --- /dev/null +++ b/hudi-client/hudi-java-client/src/test/resources/testDataGeneratorSchema.txt @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "type" : "record", + "name" : "triprec", + "fields" : [ + { + "name" : "timestamp", + "type" : "long" + }, { + "name" : "_row_key", + "type" : "string" + }, { + "name" : "rider", + "type" : "string" + }, { + "name" : "driver", + "type" : "string" + }, { + "name" : "begin_lat", + "type" : "double" + }, { + "name" : "begin_lon", + "type" : "double" + }, { + "name" : "end_lat", + "type" : "double" + }, { + "name" : "end_lon", + "type" : "double" + }, { + "name" : "distance_in_meters", + "type" : "int" + }, { + "name" : "seconds_since_epoch", + "type" : "long" + }, { + "name" : "weight", + "type" : "float" + },{ + "name" : "nation", + "type" : "bytes" + },{ + "name" : "current_date", + "type" : { + "type" : "int", + "logicalType" : "date" + } + },{ + "name" : "current_ts", + "type" : { + "type" : "long" + } + },{ + "name" : "height", + "type" : { + "type" : "fixed", + "name" : "abc", + "size" : 5, + "logicalType" : "decimal", + "precision" : 10, + "scale": 6 + } + }, { + "name" :"city_to_state", + "type" : { + "type" : "map", + "values": "string" + } + }, + { + "name" : "fare", + "type" : { + "type" : "record", + "name" : "fare", + "fields" : [ + { + "name" : "amount", + "type" : "double" + }, + { + "name" : "currency", + "type" : "string" + } + ] + } + }, + { + "name" : "tip_history", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "tip_history", + "fields" : [ + { + "name" : "amount", + "type" : "double" + }, + { + "name" : "currency", + "type" : "string" + } + ] + } + } + }, + { + "name" : "_hoodie_is_deleted", + "type" : "boolean", + "default" : false + } ] +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineType.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineType.java index 5834fa979c221..5010452a5bffb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/EngineType.java @@ -22,5 +22,5 @@ * Hoodie data processing engine. support only Apache Spark and Apache Flink for now. */ public enum EngineType { - SPARK, FLINK + SPARK, FLINK, JAVA }
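The JAVA engine type registered at the end of this diff is what lets the write path exercised by these tests run without Spark or Flink. Below is a minimal sketch of that flow, mirroring testUpdateRecords: build a HoodieWriteConfig with EngineType.JAVA, start a commit, insert, then upsert. The HoodieJavaEngineContext package and the HoodieJavaWriteClient constructor are assumptions about the Java client module rather than something shown in this patch; the table path and inline schema are illustrative, the table is assumed to be already initialized under that path, and RawTripTestPayload and makeNewCommitTime are test utilities reused here for brevity.

import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext; // assumed location of the Java engine context
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.config.HoodieWriteConfig;

import org.apache.hadoop.conf.Configuration;

import java.util.Collections;
import java.util.List;
import java.util.UUID;

import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;

public class JavaWriteClientSketch {

  public static void main(String[] args) throws Exception {
    // Illustrative path; a real run needs a Hudi table already initialized here.
    String tablePath = "file:///tmp/hoodie/java_client_sample";

    // Minimal illustrative Avro schema matching the JSON records built below.
    String schemaStr = "{\"type\":\"record\",\"name\":\"triprec\",\"fields\":["
        + "{\"name\":\"_row_key\",\"type\":\"string\"},"
        + "{\"name\":\"time\",\"type\":\"string\"},"
        + "{\"name\":\"number\",\"type\":\"int\"}]}";

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withEngineType(EngineType.JAVA) // the engine type added by this patch
        .withPath(tablePath)
        .withSchema(schemaStr)
        .build();

    // Assumption: the Java client is built from a Hadoop-Configuration-backed engine context.
    HoodieJavaEngineContext context = new HoodieJavaEngineContext(new Configuration());
    HoodieJavaWriteClient<RawTripTestPayload> client = new HoodieJavaWriteClient<>(context, config);

    // One record in partition 2016/01/31, built the same way the tests build theirs.
    String recordStr = "{\"_row_key\":\"" + UUID.randomUUID() + "\","
        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
    RawTripTestPayload payload = new RawTripTestPayload(recordStr);
    List<HoodieRecord<RawTripTestPayload>> records = Collections.singletonList(
        new HoodieRecord<>(new HoodieKey(payload.getRowKey(), payload.getPartitionPath()), payload));

    // First commit: insert.
    String firstCommit = makeNewCommitTime();
    client.startCommitWithTime(firstCommit);
    client.insert(records, firstCommit);

    // Second commit: upsert the same key, which rewrites the existing file group.
    String secondCommit = makeNewCommitTime();
    client.startCommitWithTime(secondCommit);
    List<WriteStatus> statuses = client.upsert(records, secondCommit);
    statuses.forEach(s -> System.out.println("file " + s.getFileId() + " wrote " + s.getStat().getNumWrites() + " records"));

    client.close();
  }
}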
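The incremental listing done by getIncrementalFiles and setupIncremental in the test is not test-specific: the same HoodieParquetInputFormat plus HoodieHiveUtils job properties can be pointed at any Hudi table registered under a name. A small standalone sketch of that pattern follows; the class name and the tableName/partitionDir parameters are illustrative, and the tests above pass -1 for maxCommits.

import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class IncrementalListingSketch {

  // Lists the files in one partition directory that belong to commits after startCommit,
  // bounded by maxCommits, mirroring getIncrementalFiles/setupIncremental in the test above.
  public static FileStatus[] listSince(Configuration hadoopConf, String tableName,
                                       String partitionDir, String startCommit, int maxCommits) throws Exception {
    HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
    JobConf jobConf = new JobConf(hadoopConf);
    inputFormat.setConf(jobConf);

    // Put this table into incremental scan mode and bound the commits to pull.
    jobConf.set(String.format(HoodieHiveUtils.HOODIE_CONSUME_MODE_PATTERN, tableName),
        HoodieHiveUtils.INCREMENTAL_SCAN_MODE);
    jobConf.set(String.format(HoodieHiveUtils.HOODIE_START_COMMIT_PATTERN, tableName), startCommit);
    jobConf.setInt(String.format(HoodieHiveUtils.HOODIE_MAX_COMMIT_PATTERN, tableName), maxCommits);

    FileInputFormat.setInputPaths(jobConf, partitionDir);
    return inputFormat.listStatus(jobConf);
  }
}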