diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java similarity index 100% rename from hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/Partitioner.java diff --git a/hudi-client/hudi-java-client/pom.xml b/hudi-client/hudi-java-client/pom.xml index 6429adedc6e12..30835e2440e11 100644 --- a/hudi-client/hudi-java-client/pom.xml +++ b/hudi-client/hudi-java-client/pom.xml @@ -55,6 +55,12 @@ test-jar test + + org.apache.hudi + hudi-hadoop-mr + ${project.version} + test + diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java new file mode 100644 index 0000000000000..67a6071599665 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client; + +import com.codahale.metrics.Timer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.client.embedded.EmbeddedTimelineService; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.index.JavaHoodieIndex; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.hudi.table.HoodieJavaTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class HoodieJavaWriteClient extends + AbstractHoodieWriteClient>, List, List> { + + public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) { + super(context, clientConfig); + } + + public HoodieJavaWriteClient(HoodieEngineContext context, + HoodieWriteConfig writeConfig, + boolean rollbackPending, + Option timelineService) { + super(context, writeConfig, rollbackPending, timelineService); + } + + @Override + public List> filterExists(List> hoodieRecords) { + // Create a Hoodie table which encapsulated the commits and files visible + HoodieJavaTable table = HoodieJavaTable.create(config, (HoodieJavaEngineContext) context); + Timer.Context indexTimer = metrics.getIndexCtx(); + List> recordsWithLocation = getIndex().tagLocation(hoodieRecords, context, table); + metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 
0L : indexTimer.stop())); + return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList()); + } + + @Override + protected HoodieIndex>, List, List> createIndex(HoodieWriteConfig writeConfig) { + return JavaHoodieIndex.createIndex(config); + } + + @Override + public boolean commit(String instantTime, + List writeStatuses, + Option> extraMetadata, + String commitActionType, + Map> partitionToReplacedFileIds) { + List writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()); + return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds); + } + + @Override + protected HoodieTable>, List, List> createTable(HoodieWriteConfig config, + Configuration hadoopConf) { + return HoodieJavaTable.create(config, context); + } + + @Override + public List upsert(List> records, + String instantTime) { + HoodieTable>, List, List> table = + getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); + table.validateUpsertSchema(); + setOperationType(WriteOperationType.UPSERT); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); + HoodieWriteMetadata> result = table.upsert(context, instantTime, records); + if (result.getIndexLookupDuration().isPresent()) { + metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); + } + return postWrite(result, instantTime, table); + } + + @Override + public List upsertPreppedRecords(List> preppedRecords, + String instantTime) { + HoodieTable>, List, List> table = + getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime); + table.validateUpsertSchema(); + setOperationType(WriteOperationType.UPSERT_PREPPED); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); + HoodieWriteMetadata> result = table.upsertPrepped(context,instantTime, preppedRecords); + return postWrite(result, instantTime, table); + } + + @Override + public List insert(List> records, String instantTime) { + HoodieTable>, List, List> table = + getTableAndInitCtx(WriteOperationType.INSERT, instantTime); + table.validateUpsertSchema(); + setOperationType(WriteOperationType.INSERT); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); + HoodieWriteMetadata> result = table.insert(context, instantTime, records); + if (result.getIndexLookupDuration().isPresent()) { + metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); + } + return postWrite(result, instantTime, table); + } + + @Override + public List insertPreppedRecords(List> preppedRecords, + String instantTime) { + HoodieTable>, List, List> table = + getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime); + table.validateInsertSchema(); + setOperationType(WriteOperationType.INSERT_PREPPED); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); + HoodieWriteMetadata> result = table.insertPrepped(context,instantTime, preppedRecords); + return postWrite(result, instantTime, table); + } + + @Override + public List bulkInsert(List> records, + String instantTime) { + throw new HoodieNotSupportedException("BulkInsert is not supported in HoodieJavaClient"); + } + + @Override + public List bulkInsert(List> records, + String instantTime, + Option>>> userDefinedBulkInsertPartitioner) { + throw new HoodieNotSupportedException("BulkInsert is not supported in HoodieJavaClient"); + } + + @Override 
+ public List bulkInsertPreppedRecords(List> preppedRecords, + String instantTime, + Option>>> bulkInsertPartitioner) { + throw new HoodieNotSupportedException("BulkInsertPreppedRecords is not supported in HoodieJavaClient"); + } + + @Override + public List delete(List keys, + String instantTime) { + throw new HoodieNotSupportedException("Delete is not supported in HoodieJavaClient"); + } + + @Override + protected List postWrite(HoodieWriteMetadata> result, + String instantTime, + HoodieTable>, List, List> hoodieTable) { + if (result.getIndexLookupDuration().isPresent()) { + metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis()); + } + if (result.isCommitted()) { + // Perform post commit operations. + if (result.getFinalizeDuration().isPresent()) { + metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(), + result.getWriteStats().get().size()); + } + + postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime, Option.empty()); + + emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType()); + } + return result.getWriteStatuses(); + } + + @Override + public void commitCompaction(String compactionInstantTime, + List writeStatuses, + Option> extraMetadata) throws IOException { + throw new HoodieNotSupportedException("CommitCompaction is not supported in HoodieJavaClient"); + } + + @Override + protected void completeCompaction(HoodieCommitMetadata metadata, + List writeStatuses, + HoodieTable>, List, List> table, + String compactionCommitTime) { + throw new HoodieNotSupportedException("CompleteCompaction is not supported in HoodieJavaClient"); + } + + @Override + protected List compact(String compactionInstantTime, + boolean shouldComplete) { + throw new HoodieNotSupportedException("Compact is not supported in HoodieJavaClient"); + } + + @Override + protected HoodieTable>, List, List> getTableAndInitCtx(WriteOperationType operationType, String instantTime) { + HoodieTableMetaClient metaClient = createMetaClient(true); + // new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime); + return getTableAndInitCtx(metaClient, operationType); + } + + private HoodieTable>, List, List> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) { + if (operationType == WriteOperationType.DELETE) { + setWriteSchemaForDeletes(metaClient); + } + // Create a Hoodie table which encapsulated the commits and files visible + HoodieJavaTable table = HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient); + if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) { + writeTimer = metrics.getCommitCtx(); + } else { + writeTimer = metrics.getDeltaCommitCtx(); + } + return table; + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java index a5cbdd1c1bb4a..72663102adba3 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java @@ -43,6 +43,10 @@ */ public class HoodieJavaEngineContext extends HoodieEngineContext { + public HoodieJavaEngineContext(Configuration conf) { + this(conf, new 
JavaTaskContextSupplier()); + } + public HoodieJavaEngineContext(Configuration conf, TaskContextSupplier taskContextSupplier) { super(new SerializableConfiguration(conf), taskContextSupplier); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java new file mode 100644 index 0000000000000..100d237172709 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/JavaTaskContextSupplier.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.common; + +import org.apache.hudi.common.util.Option; + +import java.util.function.Supplier; + +public class JavaTaskContextSupplier extends TaskContextSupplier { + @Override + public Supplier getPartitionIdSupplier() { + return () -> 0; + } + + @Override + public Supplier getStageIdSupplier() { + return () -> 0; + } + + @Override + public Supplier getAttemptIdSupplier() { + return () -> 0L; + } + + @Override + public Option getProperty(EngineProperty prop) { + return Option.empty(); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java new file mode 100644 index 0000000000000..08c4831d95143 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.execution; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.TaskContextSupplier; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; +import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.io.WriteHandleFactory; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.Schema; + +import java.util.Iterator; +import java.util.List; + +public class JavaLazyInsertIterable extends HoodieLazyInsertIterable { + public JavaLazyInsertIterable(Iterator> recordItr, + boolean areRecordsSorted, + HoodieWriteConfig config, + String instantTime, + HoodieTable hoodieTable, + String idPrefix, + TaskContextSupplier taskContextSupplier) { + super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier); + } + + public JavaLazyInsertIterable(Iterator> recordItr, + boolean areRecordsSorted, + HoodieWriteConfig config, + String instantTime, + HoodieTable hoodieTable, + String idPrefix, + TaskContextSupplier taskContextSupplier, + WriteHandleFactory writeHandleFactory) { + super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory); + } + + @Override + protected List computeNext() { + // Executor service used for launching writer thread. + BoundedInMemoryExecutor, HoodieInsertValueGenResult, List> bufferedIteratorExecutor = + null; + try { + final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); + bufferedIteratorExecutor = + new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema)); + final List result = bufferedIteratorExecutor.execute(); + assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining(); + return result; + } catch (Exception e) { + throw new HoodieException(e); + } finally { + if (null != bufferedIteratorExecutor) { + bufferedIteratorExecutor.shutdownNow(); + } + } + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java new file mode 100644 index 0000000000000..3cec3fbacd1f0 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndex.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index; + +import org.apache.hudi.ApiMaturityLevel; +import org.apache.hudi.PublicAPIMethod; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.table.HoodieTable; + +import java.util.List; + +public abstract class JavaHoodieIndex extends HoodieIndex>, List, List> { + protected JavaHoodieIndex(HoodieWriteConfig config) { + super(config); + } + + public static JavaHoodieIndex createIndex(HoodieWriteConfig config) { + // first use index class config to create index. + if (!StringUtils.isNullOrEmpty(config.getIndexClass())) { + Object instance = ReflectionUtils.loadClass(config.getIndexClass(), config); + if (!(instance instanceof HoodieIndex)) { + throw new HoodieIndexException(config.getIndexClass() + " is not a subclass of HoodieIndex"); + } + return (JavaHoodieIndex) instance; + } + + // TODO more indexes to be added + switch (config.getIndexType()) { + case INMEMORY: + return new JavaInMemoryHashIndex(config); + default: + throw new HoodieIndexException("Unsupported index type " + config.getIndexType()); + } + } + + @Override + @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) + public abstract List updateLocation(List writeStatuses, + HoodieEngineContext context, + HoodieTable>, List, List> hoodieTable) throws HoodieIndexException; + + @Override + @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) + public abstract List> tagLocation(List> records, + HoodieEngineContext context, + HoodieTable>, List, List> hoodieTable) throws HoodieIndexException; +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaInMemoryHashIndex.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaInMemoryHashIndex.java new file mode 100644 index 0000000000000..e95ee6108a87b --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaInMemoryHashIndex.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +/** + * Hoodie Index implementation backed by an in-memory Hash map. + *

+ * ONLY USE FOR LOCAL TESTING + */ +@SuppressWarnings("checkstyle:LineLength") +public class JavaInMemoryHashIndex extends JavaHoodieIndex { + + private static ConcurrentMap recordLocationMap; + + public JavaInMemoryHashIndex(HoodieWriteConfig config) { + super(config); + synchronized (JavaInMemoryHashIndex.class) { + if (recordLocationMap == null) { + recordLocationMap = new ConcurrentHashMap<>(); + } + } + } + + @Override + public List> tagLocation(List> records, HoodieEngineContext context, + HoodieTable>, List, List> hoodieTable) { + List> taggedRecords = new ArrayList<>(); + records.stream().forEach(record -> { + if (recordLocationMap.containsKey(record.getKey())) { + record.unseal(); + record.setCurrentLocation(recordLocationMap.get(record.getKey())); + record.seal(); + } + taggedRecords.add(record); + }); + return taggedRecords; + } + + @Override + public List updateLocation(List writeStatusList, + HoodieEngineContext context, + HoodieTable>, List, List> hoodieTable) { + return writeStatusList.stream().map(writeStatus -> { + for (HoodieRecord record : writeStatus.getWrittenRecords()) { + if (!writeStatus.isErrored(record.getKey())) { + HoodieKey key = record.getKey(); + Option newLocation = record.getNewLocation(); + if (newLocation.isPresent()) { + recordLocationMap.put(key, newLocation.get()); + } else { + // Delete existing index for a deleted record + recordLocationMap.remove(key); + } + } + } + return writeStatus; + }).collect(Collectors.toList()); + } + + @Override + public boolean rollbackCommit(String instantTime) { + return true; + } + + /** + * Only looks up by recordKey. + */ + @Override + public boolean isGlobal() { + return true; + } + + /** + * Mapping is available in HBase already. + */ + @Override + public boolean canIndexLogFiles() { + return true; + } + + /** + * Index needs to be explicitly updated after storage write. + */ + @Override + public boolean isImplicitWithStorage() { + return false; + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java new file mode 100644 index 0000000000000..7c45b75fb223f --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.avro.model.HoodieSavepointMetadata; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata; +import org.apache.hudi.table.action.clean.JavaCleanActionExecutor; +import org.apache.hudi.table.action.commit.JavaInsertCommitActionExecutor; +import org.apache.hudi.table.action.commit.JavaInsertPreppedCommitActionExecutor; +import org.apache.hudi.table.action.commit.JavaUpsertCommitActionExecutor; +import org.apache.hudi.table.action.commit.JavaUpsertPreppedCommitActionExecutor; + +import java.util.List; +import java.util.Map; + +public class HoodieJavaCopyOnWriteTable extends HoodieJavaTable { + protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config, + HoodieEngineContext context, + HoodieTableMetaClient metaClient) { + super(config, context, metaClient); + } + + @Override + public HoodieWriteMetadata> upsert(HoodieEngineContext context, + String instantTime, + List> records) { + return new JavaUpsertCommitActionExecutor<>(context, config, + this, instantTime, records).execute(); + } + + @Override + public HoodieWriteMetadata> insert(HoodieEngineContext context, + String instantTime, + List> records) { + return new JavaInsertCommitActionExecutor<>(context, config, + this, instantTime, records).execute(); + } + + @Override + public HoodieWriteMetadata> bulkInsert(HoodieEngineContext context, + String instantTime, + List> records, + Option>>> bulkInsertPartitioner) { + throw new HoodieNotSupportedException("BulkInsert is not supported yet"); + } + + @Override + public HoodieWriteMetadata> delete(HoodieEngineContext context, + String instantTime, + List keys) { + throw new HoodieNotSupportedException("Delete is not supported yet"); + } + + @Override + public HoodieWriteMetadata> upsertPrepped(HoodieEngineContext context, + String instantTime, + List> preppedRecords) { + return new JavaUpsertPreppedCommitActionExecutor<>((HoodieJavaEngineContext) context, config, + this, instantTime, preppedRecords).execute(); + + } + + @Override + public HoodieWriteMetadata> insertPrepped(HoodieEngineContext context, + String instantTime, + List> preppedRecords) { + return new JavaInsertPreppedCommitActionExecutor<>((HoodieJavaEngineContext) context, config, + this, instantTime, preppedRecords).execute(); + } + + @Override + public HoodieWriteMetadata> bulkInsertPrepped(HoodieEngineContext context, + String instantTime, + List> preppedRecords, + Option>>> bulkInsertPartitioner) { + throw new HoodieNotSupportedException("BulkInsertPrepped is not supported yet"); + } + + @Override + public HoodieWriteMetadata> insertOverwrite(HoodieEngineContext 
context, + String instantTime, + List> records) { + throw new HoodieNotSupportedException("InsertOverwrite is not supported yet"); + } + + @Override + public HoodieWriteMetadata> insertOverwriteTable(HoodieEngineContext context, + String instantTime, + List> records) { + throw new HoodieNotSupportedException("InsertOverwrite is not supported yet"); + } + + @Override + public Option scheduleCompaction(HoodieEngineContext context, + String instantTime, + Option> extraMetadata) { + throw new HoodieNotSupportedException("ScheduleCompaction is not supported yet"); + } + + @Override + public HoodieWriteMetadata> compact(HoodieEngineContext context, + String compactionInstantTime) { + throw new HoodieNotSupportedException("Compact is not supported yet"); + } + + @Override + public HoodieBootstrapWriteMetadata> bootstrap(HoodieEngineContext context, + Option> extraMetadata) { + throw new HoodieNotSupportedException("Bootstrap is not supported yet"); + } + + @Override + public void rollbackBootstrap(HoodieEngineContext context, + String instantTime) { + throw new HoodieNotSupportedException("RollbackBootstrap is not supported yet"); + } + + @Override + public HoodieCleanMetadata clean(HoodieEngineContext context, + String cleanInstantTime) { + return new JavaCleanActionExecutor(context, config, this, cleanInstantTime).execute(); + } + + @Override + public HoodieRollbackMetadata rollback(HoodieEngineContext context, + String rollbackInstantTime, + HoodieInstant commitInstant, + boolean deleteInstants) { + throw new HoodieNotSupportedException("Rollback is not supported yet"); + } + + @Override + public HoodieSavepointMetadata savepoint(HoodieEngineContext context, + String instantToSavepoint, + String user, + String comment) { + throw new HoodieNotSupportedException("Savepoint is not supported yet"); + } + + @Override + public HoodieRestoreMetadata restore(HoodieEngineContext context, + String restoreInstantTime, + String instantToRestore) { + throw new HoodieNotSupportedException("Restore is not supported yet"); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java new file mode 100644 index 0000000000000..b446abe379c8a --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table; + +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.config.HoodieWriteConfig; + +public class HoodieJavaMergeOnReadTable extends HoodieJavaCopyOnWriteTable { + protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { + super(config, context, metaClient); + } + // TODO not support yet. +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java new file mode 100644 index 0000000000000..60a306561cafb --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.index.JavaHoodieIndex; +import org.apache.hudi.index.HoodieIndex; + +import java.util.List; + +public abstract class HoodieJavaTable + extends HoodieTable>, List, List> { + protected HoodieJavaTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { + super(config, context, metaClient); + } + + public static HoodieJavaTable create(HoodieWriteConfig config, HoodieEngineContext context) { + HoodieTableMetaClient metaClient = new HoodieTableMetaClient( + context.getHadoopConf().get(), + config.getBasePath(), + true, + config.getConsistencyGuardConfig(), + Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())) + ); + return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient); + } + + public static HoodieJavaTable create(HoodieWriteConfig config, + HoodieJavaEngineContext context, + HoodieTableMetaClient metaClient) { + switch (metaClient.getTableType()) { + case COPY_ON_WRITE: + return new HoodieJavaCopyOnWriteTable<>(config, context, metaClient); + case MERGE_ON_READ: + throw new 
HoodieNotSupportedException("MERGE_ON_READ is not supported yet"); + default: + throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); + } + } + + @Override + protected HoodieIndex>, List, List> getIndex(HoodieWriteConfig config, HoodieEngineContext context) { + return JavaHoodieIndex.createIndex(config); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java new file mode 100644 index 0000000000000..d1626c81db1de --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/clean/JavaCleanActionExecutor.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.clean; + +import org.apache.hudi.avro.model.HoodieActionInstant; +import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.HoodieCleanStat; +import org.apache.hudi.common.model.CleanFileInfo; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ImmutablePair; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class JavaCleanActionExecutor extends + BaseCleanActionExecutor>, List, List> { + + private static final Logger LOG = LogManager.getLogger(JavaCleanActionExecutor.class); + + public JavaCleanActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable>, List, List> table, + String instantTime) { + super(context, config, table, instantTime); + } + + @Override + List clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) { + + Iterator> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream() + .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))).iterator(); + + Stream> partitionCleanStats = + 
deleteFilesFunc(filesToBeDeletedPerPartition, table) + .collect(Collectors.groupingBy(Pair::getLeft)) + .entrySet().stream() + .map(x -> new ImmutablePair(x.getKey(), x.getValue().stream().map(y -> y.getRight()).reduce(PartitionCleanStat::merge).get())); + + Map partitionCleanStatsMap = partitionCleanStats + .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); + + // Return PartitionCleanStat for each partition passed. + return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> { + PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath) + ? partitionCleanStatsMap.get(partitionPath) + : new PartitionCleanStat(partitionPath); + HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain(); + return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath) + .withEarliestCommitRetained(Option.ofNullable( + actionInstant != null + ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()), + actionInstant.getAction(), actionInstant.getTimestamp()) + : null)) + .withDeletePathPattern(partitionCleanStat.deletePathPatterns()) + .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles()) + .withFailedDeletes(partitionCleanStat.failedDeleteFiles()) + .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns()) + .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles()) + .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles()) + .build(); + }).collect(Collectors.toList()); + } + + private static Stream> deleteFilesFunc(Iterator> iter, HoodieTable table) { + Map partitionCleanStatMap = new HashMap<>(); + FileSystem fs = table.getMetaClient().getFs(); + + while (iter.hasNext()) { + Pair partitionDelFileTuple = iter.next(); + String partitionPath = partitionDelFileTuple.getLeft(); + Path deletePath = new Path(partitionDelFileTuple.getRight().getFilePath()); + String deletePathStr = deletePath.toString(); + Boolean deletedFileResult = null; + try { + deletedFileResult = deleteFileAndGetResult(fs, deletePathStr); + } catch (IOException e) { + LOG.error("Delete file failed"); + } + if (!partitionCleanStatMap.containsKey(partitionPath)) { + partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath)); + } + boolean isBootstrapBasePathFile = partitionDelFileTuple.getRight().isBootstrapBaseFile(); + PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath); + if (isBootstrapBasePathFile) { + // For Bootstrap Base file deletions, store the full file path. 
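+ // (bootstrap source files can live outside the table's base path, so the name alone would be ambiguous)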
+ partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true); + partitionCleanStat.addDeletedFileResult(deletePath.toString(), deletedFileResult, true); + } else { + partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false); + partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false); + } + } + return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue())); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java new file mode 100644 index 0000000000000..17b02e8c15259 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieCommitException; +import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.execution.JavaLazyInsertIterable; +import org.apache.hudi.io.CreateHandleFactory; +import org.apache.hudi.io.HoodieMergeHandle; +import org.apache.hudi.io.HoodieSortedMergeHandle; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.WorkloadProfile; +import org.apache.hudi.table.WorkloadStat; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import 
java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public abstract class BaseJavaCommitActionExecutor extends + BaseCommitActionExecutor>, List, List, HoodieWriteMetadata> { + + private static final Logger LOG = LogManager.getLogger(BaseJavaCommitActionExecutor.class); + + public BaseJavaCommitActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + WriteOperationType operationType) { + super(context, config, table, instantTime, operationType, Option.empty()); + } + + public BaseJavaCommitActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + WriteOperationType operationType, + Option extraMetadata) { + super(context, config, table, instantTime, operationType, extraMetadata); + } + + @Override + public HoodieWriteMetadata> execute(List> inputRecords) { + HoodieWriteMetadata> result = new HoodieWriteMetadata<>(); + + WorkloadProfile profile = null; + if (isWorkloadProfileNeeded()) { + profile = new WorkloadProfile(buildProfile(inputRecords)); + LOG.info("Workload profile :" + profile); + try { + saveWorkloadProfileMetadataToInflight(profile, instantTime); + } catch (Exception e) { + HoodieTableMetaClient metaClient = table.getMetaClient(); + HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime); + try { + if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) { + throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e); + } + } catch (IOException ex) { + LOG.error("Check file exists failed"); + throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex); + } + } + } + + final Partitioner partitioner = getPartitioner(profile); + Map>> partitionedRecords = partition(inputRecords, partitioner); + + List writeStatuses = new LinkedList<>(); + partitionedRecords.forEach((partition, records) -> { + if (WriteOperationType.isChangingRecords(operationType)) { + handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll); + } else { + handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll); + } + }); + updateIndex(writeStatuses, result); + return result; + } + + protected void updateIndex(List writeStatuses, HoodieWriteMetadata> result) { + Instant indexStartTime = Instant.now(); + // Update the index back + List statuses = table.getIndex().updateLocation(writeStatuses, context, table); + result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now())); + result.setWriteStatuses(statuses); + } + + @Override + protected String getCommitActionType() { + return table.getMetaClient().getCommitActionType(); + } + + private Partitioner getPartitioner(WorkloadProfile profile) { + if (WriteOperationType.isChangingRecords(operationType)) { + return getUpsertPartitioner(profile); + } else { + return getInsertPartitioner(profile); + } + } + + private Map>> partition(List> dedupedRecords, Partitioner partitioner) { + Map>, HoodieRecord>>> partitionedMidRecords = dedupedRecords + .stream() + .map(record -> Pair.of(Pair.of(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record)) + .collect(Collectors.groupingBy(x -> 
partitioner.getPartition(x.getLeft()))); + Map>> results = new LinkedHashMap<>(); + partitionedMidRecords.forEach((key, value) -> results.put(key, value.stream().map(x -> x.getRight()).collect(Collectors.toList()))); + return results; + } + + protected Pair, WorkloadStat> buildProfile(List> inputRecords) { + HashMap partitionPathStatMap = new HashMap<>(); + WorkloadStat globalStat = new WorkloadStat(); + + Map>, Long> partitionLocationCounts = inputRecords + .stream() + .map(record -> Pair.of( + Pair.of(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record)) + .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting())); + + for (Map.Entry>, Long> e : partitionLocationCounts.entrySet()) { + String partitionPath = e.getKey().getLeft(); + Long count = e.getValue(); + Option locOption = e.getKey().getRight(); + + if (!partitionPathStatMap.containsKey(partitionPath)) { + partitionPathStatMap.put(partitionPath, new WorkloadStat()); + } + + if (locOption.isPresent()) { + // update + partitionPathStatMap.get(partitionPath).addUpdates(locOption.get(), count); + globalStat.addUpdates(locOption.get(), count); + } else { + // insert + partitionPathStatMap.get(partitionPath).addInserts(count); + globalStat.addInserts(count); + } + } + return Pair.of(partitionPathStatMap, globalStat); + } + + @Override + protected void commit(Option> extraMetadata, HoodieWriteMetadata> result) { + commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList())); + } + + protected void commit(Option> extraMetadata, HoodieWriteMetadata> result, List writeStats) { + String actionType = getCommitActionType(); + LOG.info("Committing " + instantTime + ", action Type " + actionType); + result.setCommitted(true); + result.setWriteStats(writeStats); + // Finalize write + finalizeWrite(instantTime, writeStats, result); + + try { + LOG.info("Committing " + instantTime + ", action Type " + getCommitActionType()); + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(), + extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType()); + + activeTimeline.saveAsComplete(new HoodieInstant(true, getCommitActionType(), instantTime), + Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + LOG.info("Committed " + instantTime); + result.setCommitMetadata(Option.of(metadata)); + } catch (IOException e) { + throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, + e); + } + } + + protected Map> getPartitionToReplacedFileIds(List writeStatuses) { + return Collections.emptyMap(); + } + + @Override + protected boolean isWorkloadProfileNeeded() { + return true; + } + + @SuppressWarnings("unchecked") + protected Iterator> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr, + Partitioner partitioner) { + UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner; + BucketInfo binfo = upsertPartitioner.getBucketInfo(partition); + BucketType btype = binfo.bucketType; + try { + if (btype.equals(BucketType.INSERT)) { + return handleInsert(binfo.fileIdPrefix, recordItr); + } else if (btype.equals(BucketType.UPDATE)) { + return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr); + } else { + throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition); + } + } 
catch (Throwable t) { + String msg = "Error upserting bucketType " + btype + " for partition :" + partition; + LOG.error(msg, t); + throw new HoodieUpsertException(msg, t); + } + } + + protected Iterator> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr, + Partitioner partitioner) { + return handleUpsertPartition(instantTime, partition, recordItr, partitioner); + } + + @Override + public Iterator> handleUpdate(String partitionPath, String fileId, + Iterator> recordItr) + throws IOException { + // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records + if (!recordItr.hasNext()) { + LOG.info("Empty partition with fileId => " + fileId); + return Collections.singletonList((List) Collections.EMPTY_LIST).iterator(); + } + // these are updates + HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, recordItr); + return handleUpdateInternal(upsertHandle, fileId); + } + + protected Iterator> handleUpdateInternal(HoodieMergeHandle upsertHandle, String fileId) + throws IOException { + if (upsertHandle.getOldFilePath() == null) { + throw new HoodieUpsertException( + "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId); + } else { + JavaMergeHelper.newInstance().runMerge(table, upsertHandle); + } + + // TODO(vc): This needs to be revisited + if (upsertHandle.getWriteStatus().getPartitionPath() == null) { + LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " + + upsertHandle.getWriteStatus()); + } + return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())).iterator(); + } + + protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator> recordItr) { + if (table.requireSortedRecords()) { + return new HoodieSortedMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier); + } else { + return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier); + } + } + + protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, + Map> keyToNewRecords, + HoodieBaseFile dataFileToBeMerged) { + return new HoodieMergeHandle<>(config, instantTime, table, keyToNewRecords, + partitionPath, fileId, dataFileToBeMerged, taskContextSupplier); + } + + @Override + public Iterator> handleInsert(String idPfx, Iterator> recordItr) + throws Exception { + // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records + if (!recordItr.hasNext()) { + LOG.info("Empty partition"); + return Collections.singletonList((List) Collections.EMPTY_LIST).iterator(); + } + return new JavaLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx, + taskContextSupplier, new CreateHandleFactory<>()); + } + + /** + * Provides a partitioner to perform the upsert operation, based on the workload profile. + */ + public Partitioner getUpsertPartitioner(WorkloadProfile profile) { + if (profile == null) { + throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner."); + } + return new UpsertPartitioner(profile, context, table, config); + } + + /** + * Provides a partitioner to perform the insert operation, based on the workload profile. 
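+ * Currently this simply reuses the upsert partitioner.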
+ */ + public Partitioner getInsertPartitioner(WorkloadProfile profile) { + return getUpsertPartitioner(profile); + } + +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertCommitActionExecutor.java new file mode 100644 index 0000000000000..45cf3d65f190a --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertCommitActionExecutor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.util.List; + +public class JavaInsertCommitActionExecutor> extends BaseJavaCommitActionExecutor { + + private List> inputRecords; + + public JavaInsertCommitActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + List> inputRecords) { + super(context, config, table, instantTime, WriteOperationType.INSERT); + this.inputRecords = inputRecords; + } + + @Override + public HoodieWriteMetadata> execute() { + return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table, + config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertPreppedCommitActionExecutor.java new file mode 100644 index 0000000000000..349cf69dcc30b --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaInsertPreppedCommitActionExecutor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.util.List; + +public class JavaInsertPreppedCommitActionExecutor> + extends BaseJavaCommitActionExecutor { + + private final List> preppedRecords; + + public JavaInsertPreppedCommitActionExecutor(HoodieJavaEngineContext context, + HoodieWriteConfig config, HoodieTable table, + String instantTime, List> preppedRecords) { + super(context, config, table, instantTime, WriteOperationType.INSERT_PREPPED); + this.preppedRecords = preppedRecords; + } + + @Override + public HoodieWriteMetadata> execute() { + return super.execute(preppedRecords); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java new file mode 100644 index 0000000000000..bd596bea541e1 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaMergeHelper.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; +import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.io.HoodieMergeHandle; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +public class JavaMergeHelper extends AbstractMergeHelper>, + List, List> { + + private JavaMergeHelper() { + } + + private static class MergeHelperHolder { + private static final JavaMergeHelper JAVA_MERGE_HELPER = new JavaMergeHelper(); + } + + public static JavaMergeHelper newInstance() { + return JavaMergeHelper.MergeHelperHolder.JAVA_MERGE_HELPER; + } + + @Override + public void runMerge(HoodieTable>, List, List> table, + HoodieMergeHandle>, List, List> upsertHandle) throws IOException { + final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation(); + Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf()); + HoodieMergeHandle>, List, List> mergeHandle = upsertHandle; + HoodieBaseFile baseFile = mergeHandle.baseFileForMerge(); + + final GenericDatumWriter gWriter; + final GenericDatumReader gReader; + Schema readSchema; + if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) { + readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema(); + gWriter = new GenericDatumWriter<>(readSchema); + gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetafields()); + } else { + gReader = null; + gWriter = null; + readSchema = mergeHandle.getWriterSchemaWithMetafields(); + } + + BoundedInMemoryExecutor wrapper = null; + HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); + try { + final Iterator readerIterator; + if (baseFile.getBootstrapBaseFile().isPresent()) { + readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation); + } else { + readerIterator = reader.getRecordIterator(readSchema); + } + + ThreadLocal encoderCache = new ThreadLocal<>(); + ThreadLocal decoderCache = new ThreadLocal<>(); + wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator), + Option.of(new UpdateHandler(mergeHandle)), record -> { + if (!externalSchemaTransformation) { + return record; + } + return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record); + }); + wrapper.execute(); + } catch (Exception e) { + throw new HoodieException(e); + } finally { + if (reader != null) { + reader.close(); + 
} + mergeHandle.close(); + if (null != wrapper) { + wrapper.shutdownNow(); + } + } + } + +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertCommitActionExecutor.java new file mode 100644 index 0000000000000..cdb252776472c --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertCommitActionExecutor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.util.List; + +public class JavaUpsertCommitActionExecutor> extends BaseJavaCommitActionExecutor { + + private List> inputRecords; + + public JavaUpsertCommitActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + List> inputRecords) { + super(context, config, table, instantTime, WriteOperationType.UPSERT); + this.inputRecords = inputRecords; + } + + @Override + public HoodieWriteMetadata> execute() { + return JavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table, + config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPreppedCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPreppedCommitActionExecutor.java new file mode 100644 index 0000000000000..8eea5b5105826 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPreppedCommitActionExecutor.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
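Returning to JavaMergeHelper.runMerge above: when external schema transformation is enabled (or the base file has a bootstrap source), every record read with the old file's schema is re-encoded into the writer schema with meta fields before reaching the merge handle. The Avro mechanics behind that step look roughly like this self-contained sketch; both schemas below are invented for illustration and are not the table schema from this patch:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    public class AvroReencodeSketch {

      public static void main(String[] args) throws IOException {
        // "Old" schema as read from the existing base file, and the writer schema the merge
        // handle expects (here the writer schema just adds a nullable field with a default).
        Schema readSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
                + "{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"ts\",\"type\":\"long\"}]}");
        Schema writerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
                + "{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"ts\",\"type\":\"long\"},"
                + "{\"name\":\"rider\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        GenericRecord oldRecord = new GenericData.Record(readSchema);
        oldRecord.put("uuid", "uuid-0001");
        oldRecord.put("ts", 1602000000L);

        // Serialize with the read schema...
        GenericDatumWriter<GenericRecord> gWriter = new GenericDatumWriter<>(readSchema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        gWriter.write(oldRecord, encoder);
        encoder.flush();

        // ...and deserialize with readSchema -> writerSchema resolution, yielding a record
        // shaped for the new writer schema (the added field falls back to its default).
        GenericDatumReader<GenericRecord> gReader = new GenericDatumReader<>(readSchema, writerSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord transformed = gReader.read(null, decoder);
        System.out.println(transformed);
      }
    }

The ThreadLocal encoder/decoder caches in runMerge exist only so the per-record transform can reuse Avro encoder and decoder instances instead of allocating new ones for every record.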
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.util.List; + +public class JavaUpsertPreppedCommitActionExecutor> + extends BaseJavaCommitActionExecutor { + + private final List> preppedRecords; + + public JavaUpsertPreppedCommitActionExecutor(HoodieJavaEngineContext context, + HoodieWriteConfig config, HoodieTable table, + String instantTime, List> preppedRecords) { + super(context, config, table, instantTime, WriteOperationType.UPSERT_PREPPED); + this.preppedRecords = preppedRecords; + } + + @Override + public HoodieWriteMetadata> execute() { + return super.execute(preppedRecords); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java new file mode 100644 index 0000000000000..ec7ea1641a442 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.index.HoodieIndex; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +public class JavaWriteHelper extends AbstractWriteHelper>, + List, List, R> { + + private JavaWriteHelper() { + } + + private static class WriteHelperHolder { + private static final JavaWriteHelper JAVA_WRITE_HELPER = new JavaWriteHelper(); + } + + public static JavaWriteHelper newInstance() { + return WriteHelperHolder.JAVA_WRITE_HELPER; + } + + @Override + public List> deduplicateRecords(List> records, + HoodieIndex>, List, List> index, + int parallelism) { + boolean isIndexingGlobal = index.isGlobal(); + Map>>> keyedRecords = records.stream().map(record -> { + HoodieKey hoodieKey = record.getKey(); + // If index used is global, then records are expected to differ in their partitionPath + Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey; + return Pair.of(key, record); + }).collect(Collectors.groupingBy(Pair::getLeft)); + + return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> { + @SuppressWarnings("unchecked") + T reducedData = (T) rec1.getData().preCombine(rec2.getData()); + // we cannot allow the user to change the key or partitionPath, since that will affect + // everything + // so pick it from one of the records. + return new HoodieRecord(rec1.getKey(), reducedData); + }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList()); + } +} diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java new file mode 100644 index 0000000000000..4b0fcdf951dfb --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
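deduplicateRecords groups the incoming records by key (just the record key under a global index, the full HoodieKey otherwise) and folds duplicates together with the payload's preCombine, keeping the key and partition path from one of the originals. The same group-then-reduce shape, stripped of Hudi types, looks like this toy example, where a "largest timestamp wins" rule stands in for preCombine:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class DedupSketch {

      // Toy stand-in for a HoodieRecord: a key plus a payload ordered by ts.
      static final class Rec {
        final String key;
        final long ts;
        Rec(String key, long ts) { this.key = key; this.ts = ts; }
        // Stand-in for HoodieRecordPayload#preCombine: keep the payload with the larger ts.
        Rec preCombine(Rec other) { return this.ts >= other.ts ? this : other; }
        public String toString() { return key + "@" + ts; }
      }

      public static void main(String[] args) {
        List<Rec> input = Arrays.asList(
            new Rec("uuid-1", 10L), new Rec("uuid-2", 5L), new Rec("uuid-1", 42L));

        // Group by key, then reduce each group with preCombine, mirroring JavaWriteHelper.
        Map<String, List<Rec>> grouped = input.stream().collect(Collectors.groupingBy(r -> r.key));
        List<Rec> deduped = grouped.values().stream()
            .map(group -> group.stream().reduce(Rec::preCombine).orElseThrow(IllegalStateException::new))
            .collect(Collectors.toList());

        System.out.println(deduped); // two records survive; uuid-1 keeps ts=42
      }
    }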
+ */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.NumericUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ImmutablePair; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.WorkloadProfile; +import org.apache.hudi.table.WorkloadStat; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Packs incoming records to be upserted, into buckets. + */ +public class UpsertPartitioner> implements Partitioner { + + private static final Logger LOG = LogManager.getLogger(UpsertPartitioner.class); + + /** + * List of all small files to be corrected. + */ + protected List smallFiles = new ArrayList<>(); + /** + * Total number of RDD partitions, is determined by total buckets we want to pack the incoming workload into. + */ + private int totalBuckets = 0; + /** + * Stat for the current workload. Helps in determining inserts, upserts etc. + */ + private WorkloadProfile profile; + /** + * Helps decide which bucket an incoming update should go to. + */ + private HashMap updateLocationToBucket; + /** + * Helps us pack inserts into 1 or more buckets depending on number of incoming records. + */ + private HashMap> partitionPathToInsertBucketInfos; + /** + * Remembers what type each bucket is for later. 
+ */ + private HashMap bucketInfoMap; + + protected final HoodieTable table; + + protected final HoodieWriteConfig config; + + public UpsertPartitioner(WorkloadProfile profile, HoodieEngineContext context, HoodieTable table, + HoodieWriteConfig config) { + updateLocationToBucket = new HashMap<>(); + partitionPathToInsertBucketInfos = new HashMap<>(); + bucketInfoMap = new HashMap<>(); + this.profile = profile; + this.table = table; + this.config = config; + assignUpdates(profile); + assignInserts(profile, context); + + LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n" + + "Partition to insert buckets => " + partitionPathToInsertBucketInfos + ", \n" + + "UpdateLocations mapped to buckets =>" + updateLocationToBucket); + } + + private void assignUpdates(WorkloadProfile profile) { + // each update location gets a partition + Set> partitionStatEntries = profile.getPartitionPathStatMap().entrySet(); + for (Map.Entry partitionStat : partitionStatEntries) { + for (Map.Entry> updateLocEntry : + partitionStat.getValue().getUpdateLocationToCount().entrySet()) { + addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey()); + } + } + } + + private int addUpdateBucket(String partitionPath, String fileIdHint) { + int bucket = totalBuckets; + updateLocationToBucket.put(fileIdHint, bucket); + BucketInfo bucketInfo = new BucketInfo(); + bucketInfo.bucketType = BucketType.UPDATE; + bucketInfo.fileIdPrefix = fileIdHint; + bucketInfo.partitionPath = partitionPath; + bucketInfoMap.put(totalBuckets, bucketInfo); + totalBuckets++; + return bucket; + } + + private void assignInserts(WorkloadProfile profile, HoodieEngineContext context) { + // for new inserts, compute buckets depending on how many records we have for each partition + Set partitionPaths = profile.getPartitionPaths(); + long averageRecordSize = + averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(), + config); + LOG.info("AvgRecordSize => " + averageRecordSize); + + Map> partitionSmallFilesMap = + getSmallFilesForPartitions(new ArrayList(partitionPaths), context); + + for (String partitionPath : partitionPaths) { + WorkloadStat pStat = profile.getWorkloadStat(partitionPath); + if (pStat.getNumInserts() > 0) { + + List smallFiles = partitionSmallFilesMap.get(partitionPath); + this.smallFiles.addAll(smallFiles); + + LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles); + + long totalUnassignedInserts = pStat.getNumInserts(); + List bucketNumbers = new ArrayList<>(); + List recordsPerBucket = new ArrayList<>(); + + // first try packing this into one of the smallFiles + for (SmallFile smallFile : smallFiles) { + long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, + totalUnassignedInserts); + if (recordsToAppend > 0 && totalUnassignedInserts > 0) { + // create a new bucket or re-use an existing bucket + int bucket; + if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) { + bucket = updateLocationToBucket.get(smallFile.location.getFileId()); + LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); + } else { + bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId()); + LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); + } + bucketNumbers.add(bucket); + recordsPerBucket.add(recordsToAppend); + totalUnassignedInserts -= recordsToAppend; + } + } + + // if we have 
anything more, create new insert buckets, like normal + if (totalUnassignedInserts > 0) { + long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize(); + if (config.shouldAutoTuneInsertSplits()) { + insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize; + } + + int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket); + LOG.info("After small file assignment: unassignedInserts => " + totalUnassignedInserts + + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " + insertRecordsPerBucket); + for (int b = 0; b < insertBuckets; b++) { + bucketNumbers.add(totalBuckets); + recordsPerBucket.add(totalUnassignedInserts / insertBuckets); + BucketInfo bucketInfo = new BucketInfo(); + bucketInfo.bucketType = BucketType.INSERT; + bucketInfo.partitionPath = partitionPath; + bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx(); + bucketInfoMap.put(totalBuckets, bucketInfo); + totalBuckets++; + } + } + + // Go over all such buckets, and assign weights as per amount of incoming inserts. + List insertBuckets = new ArrayList<>(); + double curentCumulativeWeight = 0; + for (int i = 0; i < bucketNumbers.size(); i++) { + InsertBucket bkt = new InsertBucket(); + bkt.bucketNumber = bucketNumbers.get(i); + bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts(); + curentCumulativeWeight += bkt.weight; + insertBuckets.add(new InsertBucketCumulativeWeightPair(bkt, curentCumulativeWeight)); + } + LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets); + partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets); + } + } + } + + private Map> getSmallFilesForPartitions(List partitionPaths, HoodieEngineContext context) { + Map> partitionSmallFilesMap = new HashMap<>(); + if (partitionPaths != null && partitionPaths.size() > 0) { + context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions"); + partitionSmallFilesMap = context.mapToPair(partitionPaths, + partitionPath -> new ImmutablePair<>(partitionPath, getSmallFiles(partitionPath)), 0); + } + return partitionSmallFilesMap; + } + + /** + * Returns a list of small files in the given partition path. 
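The insert assignment above is a simple bin-packing pass: each small file can absorb roughly (parquetMaxFileSize - currentSize) / averageRecordSize more records, and whatever remains is split across ceil(remaining / insertRecordsPerBucket) brand-new INSERT buckets. With illustrative numbers (not asserted to be the shipped defaults), the arithmetic works out as follows:

    public class BucketSizingSketch {

      public static void main(String[] args) {
        // Illustrative configuration values only.
        long parquetMaxFileSize = 120L * 1024 * 1024;   // target base file size
        long avgRecordSize = 1024;                      // from averageBytesPerRecord(...)
        long insertSplitSize = 500_000;                 // records per brand-new insert bucket

        long pendingInserts = 300_000;

        // One existing small file of ~40MB can absorb this many more records:
        long smallFileBytes = 40L * 1024 * 1024;
        long recordsToAppend = Math.min((parquetMaxFileSize - smallFileBytes) / avgRecordSize, pendingInserts);
        pendingInserts -= recordsToAppend;              // 300,000 - 81,920 = 218,080 left

        // The remainder is spread over new insert buckets.
        int newBuckets = (int) Math.ceil((1.0 * pendingInserts) / insertSplitSize);   // 1 bucket here
        long recordsPerNewBucket = pendingInserts / Math.max(newBuckets, 1);

        System.out.println("append=" + recordsToAppend
            + ", newBuckets=" + newBuckets + ", perBucket=" + recordsPerNewBucket);
      }
    }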
+ */ + protected List getSmallFiles(String partitionPath) { + + // smallFiles only for partitionPath + List smallFileLocations = new ArrayList<>(); + + HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants(); + + if (!commitTimeline.empty()) { // if we have some commits + HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); + List allFiles = table.getBaseFileOnlyView() + .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList()); + + for (HoodieBaseFile file : allFiles) { + if (file.getFileSize() < config.getParquetSmallFileLimit()) { + String filename = file.getFileName(); + SmallFile sf = new SmallFile(); + sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); + sf.sizeBytes = file.getFileSize(); + smallFileLocations.add(sf); + } + } + } + + return smallFileLocations; + } + + public BucketInfo getBucketInfo(int bucketNumber) { + return bucketInfoMap.get(bucketNumber); + } + + public List getInsertBuckets(String partitionPath) { + return partitionPathToInsertBucketInfos.get(partitionPath); + } + + @Override + public int getNumPartitions() { + return totalBuckets; + } + + @Override + public int getPartition(Object key) { + Pair> keyLocation = + (Pair>) key; + if (keyLocation.getRight().isPresent()) { + HoodieRecordLocation location = keyLocation.getRight().get(); + return updateLocationToBucket.get(location.getFileId()); + } else { + String partitionPath = keyLocation.getLeft().getPartitionPath(); + List targetBuckets = partitionPathToInsertBucketInfos.get(partitionPath); + // pick the target bucket to use based on the weights. + final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts()); + final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation.getLeft().getRecordKey()); + final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts; + + int index = Collections.binarySearch(targetBuckets, new InsertBucketCumulativeWeightPair(new InsertBucket(), r)); + + if (index >= 0) { + return targetBuckets.get(index).getKey().bucketNumber; + } + + if ((-1 * index - 1) < targetBuckets.size()) { + return targetBuckets.get((-1 * index - 1)).getKey().bucketNumber; + } + + // return first one, by default + return targetBuckets.get(0).getKey().bucketNumber; + } + } + + /** + * Obtains the average record size based on records written during previous commits. Used for estimating how many + * records pack into one file. + */ + protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) { + long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate(); + long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit()); + try { + if (!commitTimeline.empty()) { + // Go over the reverse ordered commits to get a more recent estimate of average record size. 
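For records without a known location, getPartition maps the record key to a stable point r in [0, 1) (an MD5-based hash modulo the partition's insert count) and then searches the cumulative-weight list built in assignInserts, so insert buckets receive records in proportion to their assigned share. A standalone sketch of that selection with hypothetical buckets and weights follows; the hash below is only a stand-in for NumericUtils.getMessageDigestHash, not a claim about its exact implementation:

    import java.math.BigInteger;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;
    import java.util.List;

    public class WeightedBucketSketch {

      public static void main(String[] args) throws NoSuchAlgorithmException {
        // Cumulative weights for three hypothetical buckets: 50%, 30%, 20% of the inserts.
        List<Double> cumulativeWeights = Arrays.asList(0.5, 0.8, 1.0);
        int[] bucketNumbers = {7, 8, 9};
        long totalInserts = 218_080;  // inserts headed to this partition path

        String recordKey = "uuid-0001";
        byte[] digest = MessageDigest.getInstance("MD5").digest(recordKey.getBytes(StandardCharsets.UTF_8));
        long hashOfKey = new BigInteger(1, digest).longValue();
        double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;

        // Pick the first bucket whose cumulative weight is >= r (a linear scan here; the
        // partitioner does the same with Collections.binarySearch over weight pairs).
        int chosen = bucketNumbers[bucketNumbers.length - 1];
        for (int i = 0; i < cumulativeWeights.size(); i++) {
          if (r <= cumulativeWeights.get(i)) {
            chosen = bucketNumbers[i];
            break;
          }
        }
        System.out.println("r=" + r + " -> bucket " + chosen);
      }
    }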
+ Iterator instants = commitTimeline.getReverseOrderedInstants().iterator(); + while (instants.hasNext()) { + HoodieInstant instant = instants.next(); + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); + long totalBytesWritten = commitMetadata.fetchTotalBytesWritten(); + long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten(); + if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) { + avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten); + break; + } + } + } + } catch (Throwable t) { + // make this fail safe. + LOG.error("Error trying to compute average bytes/record ", t); + } + return avgSize; + } +} diff --git a/hudi-examples/pom.xml b/hudi-examples/pom.xml index ba132904e19f1..647b1b6d4e652 100644 --- a/hudi-examples/pom.xml +++ b/hudi-examples/pom.xml @@ -133,6 +133,12 @@ ${project.version} + + org.apache.hudi + hudi-java-client + ${project.version} + + org.apache.hudi hudi-spark-client diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java b/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java index 4a9868bd39fea..71c6408ccb2cd 100644 --- a/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java +++ b/hudi-examples/src/main/java/org/apache/hudi/examples/common/HoodieExampleDataGenerator.java @@ -55,7 +55,7 @@ public class HoodieExampleDataGenerator> { public static final String[] DEFAULT_PARTITION_PATHS = {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH}; public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ " - + "{\"name\": \"ts\",\"type\": \"double\"},{\"name\": \"uuid\", \"type\": \"string\"}," + + "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": \"string\"}," + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"}," + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"}," + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"}," diff --git a/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java new file mode 100644 index 0000000000000..31fccfa7725e8 --- /dev/null +++ b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
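Back in UpsertPartitioner, averageBytesPerRecord walks the completed commits newest-first and returns ceil(totalBytesWritten / totalRecordsWritten) from the first commit large enough to be representative, falling back to the configured estimate otherwise. A compact sketch of that estimate with made-up commit stats:

    import java.util.Arrays;
    import java.util.List;

    public class RecordSizeEstimateSketch {

      // Minimal stand-in for the totals pulled out of HoodieCommitMetadata.
      static final class CommitStats {
        final long bytesWritten;
        final long recordsWritten;
        CommitStats(long bytesWritten, long recordsWritten) {
          this.bytesWritten = bytesWritten;
          this.recordsWritten = recordsWritten;
        }
      }

      static long averageBytesPerRecord(List<CommitStats> newestFirst, long fallbackEstimate, long fileSizeThreshold) {
        for (CommitStats stats : newestFirst) {
          if (stats.bytesWritten > fileSizeThreshold && stats.recordsWritten > 0) {
            return (long) Math.ceil((1.0 * stats.bytesWritten) / stats.recordsWritten);
          }
        }
        return fallbackEstimate;
      }

      public static void main(String[] args) {
        // Newest commit first: a tiny commit below the threshold is skipped, the next one is used.
        List<CommitStats> commits = Arrays.asList(
            new CommitStats(2L * 1024 * 1024, 1_000),        // too small to be representative
            new CommitStats(512L * 1024 * 1024, 400_000));   // 512MB over 400k records => ~1343 bytes
        System.out.println(averageBytesPerRecord(commits, 1024, 100L * 1024 * 1024));
      }
    }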
+ */
+
+package org.apache.hudi.examples.java;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
+import org.apache.hudi.index.HoodieIndex;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+
+/**
+ * Simple example of {@link HoodieJavaWriteClient}.
+ *
+ * Usage: HoodieJavaWriteClientExample <tablePath> <tableName>
+ * <tablePath> and <tableName> describe the root path of the Hudi table and the table name,
+ * for example, `HoodieJavaWriteClientExample file:///tmp/hoodie/sample-table hoodie_rt`
+ */
+public class HoodieJavaWriteClientExample {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieJavaWriteClientExample.class);
+
+  private static String tableType = HoodieTableType.COPY_ON_WRITE.name();
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("Usage: HoodieJavaWriteClientExample <tablePath> <tableName>");
+      System.exit(1);
+    }
+    String tablePath = args[0];
+    String tableName = args[1];
+
+    // Generator of some records to be loaded in.
+    HoodieExampleDataGenerator<HoodieAvroPayload> dataGen = new HoodieExampleDataGenerator<>();
+
+    Configuration hadoopConf = new Configuration();
+    // initialize the table, if not done already
+    Path path = new Path(tablePath);
+    FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
+    if (!fs.exists(path)) {
+      HoodieTableMetaClient.initTableType(hadoopConf, tablePath, HoodieTableType.valueOf(tableType),
+          tableName, HoodieAvroPayload.class.getName());
+    }
+
+    // Create the write client to write some records in
+    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
+        .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
+        .withDeleteParallelism(2).forTable(tableName)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
+    HoodieJavaWriteClient<HoodieAvroPayload> client =
+        new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
+
+    // inserts (written through upsert, which tags the records and then writes them)
+    String newCommitTime = client.startCommit();
+    LOG.info("Starting commit " + newCommitTime);
+
+    List<HoodieRecord<HoodieAvroPayload>> records = dataGen.generateInserts(newCommitTime, 10);
+    List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
+    List<HoodieRecord<HoodieAvroPayload>> writeRecords =
+        recordsSoFar.stream().map(r -> new HoodieRecord<>(r)).collect(Collectors.toList());
+    client.upsert(writeRecords, newCommitTime);
+
+    // updates
+    newCommitTime = client.startCommit();
+    LOG.info("Starting commit " + newCommitTime);
+    List<HoodieRecord<HoodieAvroPayload>> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
+    records.addAll(toBeUpdated);
+    recordsSoFar.addAll(toBeUpdated);
+    writeRecords =
+        recordsSoFar.stream().map(r -> new HoodieRecord<>(r)).collect(Collectors.toList());
+    client.upsert(writeRecords, newCommitTime);
+
+    client.close();
+  }
+}
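A quick way to exercise the example end to end (the arguments are the placeholders from its own javadoc; it needs the hudi-java-client dependency added to hudi-examples above plus the usual Hadoop and Avro jars on the classpath):

    // Runs two commits (inserts, then updates) against a local COPY_ON_WRITE table.
    HoodieJavaWriteClientExample.main(new String[]{"file:///tmp/hoodie/sample-table", "hoodie_rt"});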