@@ -99,7 +99,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem

   protected final HoodieWriteConfig config;
   protected final HoodieTableMetaClient metaClient;
-  protected final HoodieIndex<T, I, K, O> index;
+  protected final HoodieIndex index;
   private SerializableConfiguration hadoopConfiguration;
   protected final TaskContextSupplier taskContextSupplier;
   private final HoodieTableMetadata metadata;
@@ -123,7 +123,7 @@ protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context, Hoo
     this.taskContextSupplier = context.getTaskContextSupplier();
   }
 
-  protected abstract HoodieIndex<T, I, K, O> getIndex(HoodieWriteConfig config, HoodieEngineContext context);
+  protected abstract HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext context);
 
   private synchronized FileSystemViewManager getViewManager() {
     if (null == viewManager) {
@@ -345,7 +345,7 @@ public HoodieActiveTimeline getActiveTimeline() {
   /**
    * Return the index.
    */
-  public HoodieIndex<T, I, K, O> getIndex() {
+  public HoodieIndex getIndex() {
     return index;
   }
 
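
For illustration, a minimal, self-contained sketch of the tradeoff the three hunks above make by replacing HoodieIndex<T, I, K, O> with the raw HoodieIndex type: simpler signatures on HoodieTable, at the cost of compile-time type checking at call sites. Every name below is invented for the sketch; none of it is Hudi code.

import java.util.List;

// Generic form: the record type is checked at compile time.
interface Index<R> {
  List<R> tagLocation(List<R> records);
}

// Raw form, mirroring this diff: the field and getter drop the type
// arguments, so callers cast or accept unchecked warnings.
class Table {
  @SuppressWarnings("rawtypes")
  private final Index index;

  @SuppressWarnings("rawtypes")
  Table(Index index) {
    this.index = index;
  }

  @SuppressWarnings("rawtypes")
  Index getIndex() {
    return index;
  }
}
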
@@ -46,6 +46,29 @@ public class HoodieWriteMetadata<O> {
   public HoodieWriteMetadata() {
   }
 
+  public <T> HoodieWriteMetadata<T> clone(T transformedWriteStatuses) {
+    HoodieWriteMetadata<T> newMetadataInstance = new HoodieWriteMetadata<>();
+    newMetadataInstance.setWriteStatuses(transformedWriteStatuses);
+    if (indexLookupDuration.isPresent()) {
+      newMetadataInstance.setIndexLookupDuration(indexLookupDuration.get());
+    }
+    newMetadataInstance.setCommitted(isCommitted);
+    newMetadataInstance.setCommitMetadata(commitMetadata);
+    if (writeStats.isPresent()) {
+      newMetadataInstance.setWriteStats(writeStats.get());
+    }
+    if (indexUpdateDuration.isPresent()) {
+      newMetadataInstance.setIndexUpdateDuration(indexUpdateDuration.get());
+    }
+    if (finalizeDuration.isPresent()) {
+      newMetadataInstance.setFinalizeDuration(finalizeDuration.get());
+    }
+    if (partitionToReplaceFileIds.isPresent()) {
+      newMetadataInstance.setPartitionToReplaceFileIds(partitionToReplaceFileIds.get());
+    }
+    return newMetadataInstance;
+  }
+
   public O getWriteStatuses() {
     return writeStatuses;
   }
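
The clone(T) method added above copies every populated field (index lookup/update and finalize durations, commit flag and metadata, write stats, replaced file IDs) into a fresh HoodieWriteMetadata parameterized by the transformed write-status container. A hedged usage sketch, assuming the caller has already materialized the statuses into a List; nothing here asserts how that collection happens:

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.data.HoodieData;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import java.util.List;

class CloneUsageSketch {
  // Re-key metadata from HoodieData<WriteStatus> to List<WriteStatus>
  // while carrying over all the other fields.
  static HoodieWriteMetadata<List<WriteStatus>> toListBased(
      HoodieWriteMetadata<HoodieData<WriteStatus>> engineResult,
      List<WriteStatus> collectedStatuses) {
    return engineResult.clone(collectedStatuses);
  }
}
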
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.table.action.bootstrap;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieData;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BaseCommitHelper;

import java.io.Serializable;
import java.util.Map;

public abstract class BaseBootstrapHelper<T extends HoodieRecordPayload<T>> implements Serializable {
  public abstract HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> execute(
      HoodieEngineContext context, HoodieTable table, HoodieWriteConfig config,
      Option<Map<String, String>> extraMetadata,
      BaseCommitHelper<T> commitHelper);
}
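
Engine modules are expected to supply the concrete helper. A minimal stub that compiles against the signature above (the class name and body are illustrative, not part of this PR; imports as in the file above):

public class NoOpBootstrapHelper<T extends HoodieRecordPayload<T>>
    extends BaseBootstrapHelper<T> {
  @Override
  public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> execute(
      HoodieEngineContext context, HoodieTable table, HoodieWriteConfig config,
      Option<Map<String, String>> extraMetadata,
      BaseCommitHelper<T> commitHelper) {
    // A real helper performs the engine-specific bootstrap write here.
    throw new UnsupportedOperationException("bootstrap not implemented");
  }
}
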
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.table.action.bootstrap;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieData;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
import org.apache.hudi.table.action.commit.BaseCommitHelper;

import java.util.Map;

public class BootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>>
    extends BaseCommitActionExecutor<T, HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>>> {

  private final BaseBootstrapHelper<T> bootstrapHelper;

  public BootstrapCommitActionExecutor(
      HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table,
      Option<Map<String, String>> extraMetadata,
      BaseCommitHelper<T> commitHelper, BaseBootstrapHelper<T> bootstrapHelper) {
    super(context, config, table, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
        WriteOperationType.BOOTSTRAP, extraMetadata, commitHelper);
    this.bootstrapHelper = bootstrapHelper;
  }

  @Override
  public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> execute() {
    return bootstrapHelper.execute(context, table, config, extraMetadata, commitHelper);
  }
}
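
The executor stays engine-agnostic: it pins the instant to HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS and the operation to WriteOperationType.BOOTSTRAP, then delegates to the injected helper. A hedged wiring sketch, where both helpers would come from an engine-specific module not shown in this diff (imports as in the file above):

class BootstrapWiringSketch<T extends HoodieRecordPayload<T>> {
  HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> run(
      HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table,
      BaseCommitHelper<T> commitHelper, BaseBootstrapHelper<T> bootstrapHelper) {
    // Option.empty(): no extra commit metadata in this sketch.
    return new BootstrapCommitActionExecutor<>(
        context, config, table, Option.empty(), commitHelper, bootstrapHelper).execute();
  }
}
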
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.table.action.cluster;

import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

public abstract class BaseClusteringPlanHelper<T extends HoodieRecordPayload<T>> {
  public abstract Option<HoodieClusteringPlan> createClusteringPlan(
      HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table);
}
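
A trivial subclass that compiles against the signature above and schedules nothing; the name and behavior are illustrative only (imports as in the file above):

public class EmptyClusteringPlanHelper<T extends HoodieRecordPayload<T>>
    extends BaseClusteringPlanHelper<T> {
  @Override
  public Option<HoodieClusteringPlan> createClusteringPlan(
      HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table) {
    return Option.empty(); // no plan: the caller sees Option.empty() and writes no instant
  }
}
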
@@ -20,14 +20,18 @@

 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieData;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
@@ -36,24 +40,24 @@
 import java.util.Collections;
 import java.util.Map;
 
-public abstract class BaseClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieClusteringPlan>> {
+public class ClusteringPlanActionExecutor<T extends HoodieRecordPayload<T>>
+    extends BaseActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, Option<HoodieClusteringPlan>> {
 
+  private final BaseClusteringPlanHelper<T> clusteringPlanHelper;
   private final Option<Map<String, String>> extraMetadata;
 
-  public BaseClusteringPlanActionExecutor(HoodieEngineContext context,
-                                          HoodieWriteConfig config,
-                                          HoodieTable<T, I, K, O> table,
-                                          String instantTime,
-                                          Option<Map<String, String>> extraMetadata) {
+  public ClusteringPlanActionExecutor(
+      HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table,
+      String instantTime, Option<Map<String, String>> extraMetadata,
+      BaseClusteringPlanHelper<T> clusteringPlanHelper) {
     super(context, config, table, instantTime);
     this.extraMetadata = extraMetadata;
+    this.clusteringPlanHelper = clusteringPlanHelper;
   }
 
-  protected abstract Option<HoodieClusteringPlan> createClusteringPlan();
-
   @Override
   public Option<HoodieClusteringPlan> execute() {
-    Option<HoodieClusteringPlan> planOption = createClusteringPlan();
+    Option<HoodieClusteringPlan> planOption = clusteringPlanHelper.createClusteringPlan(context, config, table);
     if (planOption.isPresent()) {
       HoodieInstant clusteringInstant =
           new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime);
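
With the plan helper injected, scheduling clustering becomes a plain call: if the helper returns a plan, execute() above serializes it into a REQUESTED replacecommit instant. A hedged usage sketch (EmptyClusteringPlanHelper is the hypothetical subclass sketched earlier; imports as in the file above):

class ClusteringScheduleSketch<T extends HoodieRecordPayload<T>> {
  Option<HoodieClusteringPlan> schedule(
      HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table,
      String instantTime) {
    return new ClusteringPlanActionExecutor<T>(
        context, config, table, instantTime, Option.empty(),
        new EmptyClusteringPlanHelper<T>()).execute();
  }
}
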
@@ -18,31 +18,33 @@

 package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieData;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-public abstract class AbstractBulkInsertHelper<T extends HoodieRecordPayload, I, K, O, R> {
+public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload<T>> {
 
   /**
    * Mark instant as inflight, write input records, update index and return result.
    */
-  public abstract HoodieWriteMetadata<O> bulkInsert(I inputRecords, String instantTime,
-                                                    HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
-                                                    BaseCommitActionExecutor<T, I, K, O, R> executor, boolean performDedupe,
-                                                    Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner);
+  public abstract HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsert(
+      HoodieData<HoodieRecord<T>> inputRecords, String instantTime, HoodieTable table,
+      HoodieWriteConfig config, boolean performDedupe,
+      Option<BulkInsertPartitioner<HoodieData<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner,
+      BaseCommitHelper<T> commitHelper);
 
   /**
    * Only write input records. Does not change timeline/index. Return information about new files created.
    */
-  public abstract O bulkInsert(I inputRecords, String instantTime,
-                               HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
-                               boolean performDedupe,
-                               Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
-                               boolean addMetadataFields,
-                               int parallelism,
-                               boolean preserveMetadata);
+  public abstract HoodieData<WriteStatus> bulkInsert(
+      HoodieData<HoodieRecord<T>> inputRecords, String instantTime, HoodieTable table,
+      HoodieWriteConfig config, boolean performDedupe,
+      Option<BulkInsertPartitioner<HoodieData<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner,
+      boolean addMetadataFields, int parallelism, boolean preserveMetadata);
 }
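
The two overloads split responsibilities: the first drives the full path (mark the instant inflight, write, update the index via the commit helper), while the second only writes files, for callers that manage the timeline and index themselves. A hedged sketch of the full path with default choices (imports as in the file above):

class BulkInsertSketch<T extends HoodieRecordPayload<T>> {
  HoodieWriteMetadata<HoodieData<WriteStatus>> committed(
      BaseBulkInsertHelper<T> helper, HoodieData<HoodieRecord<T>> records,
      String instantTime, HoodieTable table, HoodieWriteConfig config,
      BaseCommitHelper<T> commitHelper) {
    // performDedupe = true, no user-defined partitioner.
    return helper.bulkInsert(records, instantTime, table, config,
        true, Option.empty(), commitHelper);
  }
}
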
@@ -17,19 +17,22 @@
  * under the License.
  */
 
-package org.apache.hudi.table.action.deltacommit;
+package org.apache.hudi.table.action.commit;
 
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieData;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-public abstract class BaseJavaDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
+import java.util.Map;
 
-  public BaseJavaDeltaCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table,
-                                           String instantTime, WriteOperationType operationType) {
-    super(context, config, table, instantTime, operationType);
-  }
+public abstract class BaseClusteringHelper<T extends HoodieRecordPayload<T>> {
+  public abstract HoodieWriteMetadata<HoodieData<WriteStatus>> execute(
+      HoodieEngineContext context, String instantTime, HoodieTable table, HoodieWriteConfig config,
+      Option<Map<String, String>> extraMetadata,
+      BaseCommitHelper<T> commitHelper);
 }