@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io;

import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* A CreateFixedFileHandleFactory is used to write all data in the Spark partition into a single data file.
*
* Please use this with caution. This can end up creating very large files if not used correctly.
*/
public class CreateFixedFileHandleFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> {
Review comment (Member): Can we subclass this from CreateHandleFactory? Or call this SingleFileCreateHandleFactory?


private AtomicBoolean isHandleCreated = new AtomicBoolean(false);
private String fileId;

public CreateFixedFileHandleFactory(String fileId) {
super();
this.fileId = fileId;
}

@Override
public HoodieWriteHandle<T, I, K, O> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
Review comment (Member): Wondering why we need this, actually. Wouldn't just passing Long.MAX_VALUE as the target file size get the create handle to do this? (A sketch of that alternative follows after this class.)

final HoodieTable<T, I, K, O> hoodieTable, final String partitionPath,
final String fileIdPrefix, TaskContextSupplier taskContextSupplier) {

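// Flip the flag atomically so only the first invocation creates a handle; any later call falls through to the exception below.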
if (isHandleCreated.compareAndSet(false, true)) {
return new HoodieCreateFixedHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
fileId, // ignore idPfx, always use same fileId
taskContextSupplier);
}

throw new HoodieIOException("Fixed handle create is only expected to be invoked once");
}
}
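A minimal sketch of the alternative raised in the review comment above: keep the stock CreateHandleFactory and make the target file size effectively unlimited. The class name UnboundedFileSizeConfigSketch is hypothetical, and HoodieStorageConfig.Builder#parquetMaxFileSize and HoodieWriteConfig.Builder#withStorageConfig are assumed to exist with these signatures in the version under review; this is illustrative only.

import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;

final class UnboundedFileSizeConfigSketch {
  // Returns a copy of the given write config with an effectively unlimited parquet max file
  // size, so the regular create handle never reports itself as full and keeps writing to a
  // single file; this is the behaviour CreateFixedFileHandleFactory otherwise hard-codes.
  static HoodieWriteConfig withUnboundedTargetFileSize(HoodieWriteConfig base) {
    return HoodieWriteConfig.newBuilder()
        .withProps(base.getProps())
        .withStorageConfig(HoodieStorageConfig.newBuilder()
            .parquetMaxFileSize(Long.MAX_VALUE) // disable size-based file rolling
            .build())
        .build();
  }
}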
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io;

import org.apache.avro.Schema;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.Map;

/**
* A HoodieCreateFixedHandle writes all incoming records into a single file, ignoring the configured file size limits.
Review comment (Contributor): HoodieCreateFixedHandle
Review reply (Member Author): Fixed
Review comment (Member): This is a bit of a misnomer. Even HoodieCreateHandle only writes to a single file. Rename to HoodieUnboundedCreateHandle or something that captures the intent that this does not respect the sizing aspects.

*
* Please use this with caution. This can end up creating very large files if not used correctly.
*/
public class HoodieCreateFixedHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieCreateHandle<T, I, K, O> {

private static final Logger LOG = LogManager.getLogger(HoodieCreateFixedHandle.class);

public HoodieCreateFixedHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config),
taskContextSupplier);
}

public HoodieCreateFixedHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Pair<Schema, Schema> writerSchemaIncludingAndExcludingMetadataPair,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, partitionPath, fileId, writerSchemaIncludingAndExcludingMetadataPair,
taskContextSupplier);
}

/**
* Called by the compactor code path.
*/
public HoodieCreateFixedHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
TaskContextSupplier taskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier);
}

@Override
public boolean canWrite(HoodieRecord record) {
Review comment (Member): Let's just reuse CreateHandle with a large target file size, if we are doing all this just for a specific clustering strategy?

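// Never report the file as full: this handle ignores the configured size limits and keeps accepting records.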
return true;
}
}
@@ -20,13 +20,15 @@

import org.apache.avro.Schema;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
@@ -51,7 +53,7 @@ public ClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engine
* Note that the commit is not done as part of the strategy; committing the result is the caller's responsibility.
*/
public abstract O performClustering(final I inputRecords, final int numOutputGroups, final String instantTime,
final Map<String, String> strategyParams, final Schema schema);
final Map<String, String> strategyParams, final Schema schema, final List<HoodieFileGroupId> inputFileIds);
Review comment (Member): Can you please add javadocs for this method explaining what each param is? (A sketch of such a javadoc follows at the end of this hunk.)


protected HoodieTable<T,I,K, O> getHoodieTable() {
return this.hoodieTable;
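A sketch of what the requested javadoc for performClustering could look like; the parameter descriptions are inferred from how the method is used elsewhere in this diff, not taken from the merged code.

/**
 * Performs clustering for the given input records and writes the results as new file groups.
 *
 * @param inputRecords    records read from the file slices selected by the clustering plan
 * @param numOutputGroups number of output file groups to produce (also used to size write parallelism)
 * @param instantTime     instant time under which the clustering results are committed
 * @param strategyParams  strategy-specific parameters carried in the clustering plan
 * @param schema          reader schema (including Hudi metadata fields) of the input records
 * @param inputFileIds    file group ids of the input file slices being clustered
 * @return engine-specific collection of WriteStatus for the newly written data
 */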
@@ -22,6 +22,7 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -38,6 +39,7 @@
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;

import java.util.List;
import java.util.Map;
import java.util.Properties;

@@ -66,7 +68,7 @@ public SparkSortAndSizeExecutionStrategy(HoodieSparkMergeOnReadTable<T> table,

@Override
public JavaRDD<WriteStatus> performClustering(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups,
final String instantTime, final Map<String, String> strategyParams, final Schema schema) {
final String instantTime, final Map<String, String> strategyParams, final Schema schema, final List<HoodieFileGroupId> inputFileIds) {
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
Properties props = getWriteConfig().getProps();
props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM, String.valueOf(numOutputGroups));
@@ -57,8 +57,20 @@ public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
String idPrefix,
TaskContextSupplier taskContextSupplier,
WriteHandleFactory writeHandleFactory) {
this(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory, false);
}

public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
boolean areRecordsSorted,
HoodieWriteConfig config,
String instantTime,
HoodieTable hoodieTable,
String idPrefix,
TaskContextSupplier taskContextSupplier,
WriteHandleFactory writeHandleFactory,
boolean useWriterSchema) {
super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory);
this.useWriterSchema = false;
this.useWriterSchema = useWriterSchema;
}

@Override
@@ -32,6 +32,7 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -61,9 +62,11 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -98,7 +101,7 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusRDDStream);
JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);

HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = buildWriteMetadata(writeStatusRDD);
HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
JavaRDD<WriteStatus> statuses = updateIndex(writeStatusRDD, writeMetadata);
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collect());
// validate clustering action before committing result
@@ -148,9 +151,12 @@ private CompletableFuture<JavaRDD<WriteStatus>> runClusteringForGroupAsync(Hoodi
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> inputRecords = readRecordsForGroup(jsc, clusteringGroup);
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
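// Collect the file group ids for this clustering group so the execution strategy can relate its output files back to the input file slices.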
List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
Review comment (Member): So the input file ids are already in the serialized plan? This PR just passes them around additionally?

.map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
.collect(Collectors.toList());
return ((ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>)
ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(), table, context, config))
.performClustering(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema);
.performClustering(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds);
});

return writeStatusesFuture;
@@ -163,8 +169,10 @@ protected String getCommitActionType() {

@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan).collect(
Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
Set<HoodieFileGroupId> newFilesWritten = new HashSet<>(writeStatuses.map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect());
Review comment (Member): Rename to newFileIds.

return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
.filter(fg -> !newFilesWritten.contains(fg))
Review comment (Member): Sorry, I'm not following. Why do we need this filter?

.collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
}

/**
@@ -257,12 +265,4 @@ private HoodieRecord<? extends HoodieRecordPayload> transform(IndexedRecord inde
return hoodieRecord;
}

private HoodieWriteMetadata<JavaRDD<WriteStatus>> buildWriteMetadata(JavaRDD<WriteStatus> writeStatusJavaRDD) {
Review comment (Member): This was removed because the constructor does the same job?

HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<>();
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeStatusJavaRDD));
result.setWriteStatuses(writeStatusJavaRDD);
result.setCommitMetadata(Option.empty());
result.setCommitted(false);
return result;
}
}
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi;

import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.execution.SparkLazyInsertIterable;
import org.apache.hudi.io.CreateFixedFileHandleFactory;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
* Sample clustering strategy for testing. This strategy does not actually transform the data; it simply rewrites the same data
* in a new file.
*/
public class ClusteringIdentityTestExecutionStrategy<T extends HoodieRecordPayload<T>>
extends ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {

private static final Logger LOG = LogManager.getLogger(ClusteringIdentityTestExecutionStrategy.class);

public ClusteringIdentityTestExecutionStrategy(HoodieSparkCopyOnWriteTable<T> table,
HoodieSparkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}

public ClusteringIdentityTestExecutionStrategy(HoodieSparkMergeOnReadTable<T> table,
HoodieSparkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}

@Override
public JavaRDD<WriteStatus> performClustering(
final JavaRDD<HoodieRecord<T>> inputRecords,
final int numOutputGroups,
final String instantTime,
final Map<String, String> strategyParams,
final Schema schema,
final List<HoodieFileGroupId> inputFileIds) {
if (inputRecords.getNumPartitions() != 1 || inputFileIds.size() != 1) {
Review comment (Contributor): If only one fileId is allowed, shouldn't each clustering group have just one file group? I don't see that limit in the clustering scheduling.
Review reply (Member Author): Yes, this is enforced by setting the group size limit to a small number. See the unit test added: .withClusteringMaxBytesInGroup(10) // set small number so each file is considered a separate clustering group. (A sketch of that config follows after this class.)
Review comment (Contributor): Can we support another config, such as filegroupLocalSort? Reusing withClusteringMaxBytesInGroup and setting it so small may confuse users.

throw new HoodieClusteringException("Expected a single input partition and a single input file group for test strategy: " + getClass().getName());
}

Properties props = getWriteConfig().getProps();
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
TaskContextSupplier taskContextSupplier = getEngineContext().getTaskContextSupplier();
final HoodieTable hoodieTable = getHoodieTable();
final String schemaString = schema.toString();

JavaRDD<WriteStatus> writeStatus = inputRecords.mapPartitions(recordItr ->
insertRecords(recordItr, newConfig, instantTime, hoodieTable, schemaString, inputFileIds.get(0).getFileId(), taskContextSupplier))
.flatMap(List::iterator);
return writeStatus;
}

private static <T extends HoodieRecordPayload<T>> Iterator<List<WriteStatus>> insertRecords(final Iterator<HoodieRecord<T>> recordItr,
final HoodieWriteConfig newConfig,
final String instantTime,
final HoodieTable hoodieTable,
final String schema,
final String fileId,
final TaskContextSupplier taskContextSupplier) {

return new SparkLazyInsertIterable(recordItr, true, newConfig, instantTime, hoodieTable,
fileId, taskContextSupplier, new CreateFixedFileHandleFactory(fileId), true);
}
}
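For reference, a sketch of the test write config described in the review thread above. The class name IdentityClusteringTestConfigSketch and the basePath/schemaStr parameters are placeholders, and the HoodieClusteringConfig builder methods are assumed to match this PR.

import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;

final class IdentityClusteringTestConfigSketch {
  // basePath and schemaStr stand in for the test table location and the Avro schema string.
  static HoodieWriteConfig build(String basePath, String schemaStr) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(schemaStr)
        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
            // small group size limit so each existing file becomes its own clustering group
            .withClusteringMaxBytesInGroup(10)
            .withClusteringExecutionStrategyClass(ClusteringIdentityTestExecutionStrategy.class.getName())
            .build())
        .build();
  }
}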