Merged
@@ -53,11 +53,11 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends

private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class);

private final HoodieFileWriter<IndexedRecord> fileWriter;
private final Path path;
private long recordsWritten = 0;
private long insertRecordsWritten = 0;
private long recordsDeleted = 0;
protected final HoodieFileWriter<IndexedRecord> fileWriter;
protected final Path path;
protected long recordsWritten = 0;
protected long insertRecordsWritten = 0;
protected long recordsDeleted = 0;
private Map<String, HoodieRecord<T>> recordMap;
private boolean useWriterSchema = false;

@@ -95,10 +95,10 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
protected HoodieFileWriter<IndexedRecord> fileWriter;

protected Path newFilePath;
private Path oldFilePath;
protected Path oldFilePath;
protected long recordsWritten = 0;
private long recordsDeleted = 0;
private long updatedRecordsWritten = 0;
protected long recordsDeleted = 0;
protected long updatedRecordsWritten = 0;
protected long insertRecordsWritten = 0;
protected boolean useWriterSchema;
private HoodieBaseFile baseFileToMerge;
@@ -132,6 +132,13 @@ public Schema getWriterSchema() {
return writerSchema;
}

/**
* Returns the data file name.
*/
protected String generatesDataFileName() {
return FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());
}

/**
* Extract old file path, initialize StorageWriter and WriteStatus.
*/
@@ -149,7 +156,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
partitionMetadata.trySave(getPartitionId());

oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());
String newFileName = generatesDataFileName();
String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
+ newFileName).toString();
newFilePath = new Path(config.getBasePath(), relativePath);
@@ -177,18 +184,25 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
}

/**
* Load the new incoming records in a map and return partitionPath.
* Initialize a spillable map for incoming records.
*/
private void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
protected void initializeIncomingRecordsMap() {
try {
// Load the new records in a map
long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps());
LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema));
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema));
} catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
}
}

/**
* Load the new incoming records in a map and return partitionPath.
*/
protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
initializeIncomingRecordsMap();
while (newRecordsItr.hasNext()) {
HoodieRecord<T> record = newRecordsItr.next();
// update the new location of the record, so we know where to find it next
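The merge-handle hunks above widen the write counters and paths to protected and pull the new-file naming into an overridable generatesDataFileName() hook. A minimal standalone sketch of what that default hook produces (all values are made-up placeholders, and the expected name shape is an assumption based on Hudi's usual <fileId>_<writeToken>_<instantTime>.<ext> base file convention):

import org.apache.hudi.common.fs.FSUtils;

public class DataFileNameExample {
  public static void main(String[] args) {
    // All values below are made-up placeholders, purely for illustration.
    String instantTime = "20210222093000";                    // commit instant
    String writeToken = "1-0-1";                              // task attempt write token
    String fileId = "5a2f1ef6-0c33-4c8e-b6b7-0000000000a1-0"; // file group id
    // Same call that the protected generatesDataFileName() hook delegates to.
    String fileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, ".parquet");
    // Expected shape (assumption): <fileId>_<writeToken>_<instantTime>.parquet
    System.out.println(fileName);
  }
}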
@@ -24,6 +24,7 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -39,6 +40,10 @@
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndex;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.FlinkCreateHandle;
import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.MiniBatchHandle;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
@@ -50,6 +55,7 @@

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -58,12 +64,19 @@
public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
AbstractHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {

/**
* FileID to write handle mapping in order to record the write handles for each file group,
* so that we can append the mini-batch data buffer incrementally.
*/
private Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;

public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
super(context, clientConfig);
this(context, clientConfig, false);
}

public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
super(context, writeConfig, rollbackPending);
this.bucketToHandles = new HashMap<>();
}

public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
@@ -111,7 +124,23 @@ public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTim
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT);
HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, instantTime, records);
final HoodieRecord<T> record = records.get(0);
Contributor: Why use the first record's bucket type? If the first record is I and the second one is U, the code will use FlinkCreateHandle to process all the records.

Contributor (author): Yes, that is the logic we want, because we try to merge records into small buckets.

Contributor (@hk-lrzy, Feb 23, 2021): So we also need to filter duplicate records on the query side?
final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId();
final boolean isInsert = loc.getInstantTime().equals("I");
final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
Member: Is this final?

Contributor (author): Yes, once created for a bucket, it never changes.
if (bucketToHandles.containsKey(fileID)) {
writeHandle = bucketToHandles.get(fileID);
} else {
// create the write handle if not exists
writeHandle = isInsert
? new FlinkCreateHandle<>(getConfig(), instantTime, table, record.getPartitionPath(),
fileID, table.getTaskContextSupplier())
: new FlinkMergeHandle<>(getConfig(), instantTime, table, records.listIterator(), record.getPartitionPath(),
fileID, table.getTaskContextSupplier());
bucketToHandles.put(fileID, writeHandle);
}
HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
@@ -202,6 +231,17 @@ protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatu
return getTableAndInitCtx(metaClient, operationType);
}

/**
* Clean the write handles within a checkpoint interval, this operation
* would close the underneath file handles.
*/
public void cleanHandles() {
this.bucketToHandles.values().forEach(handle -> {
((MiniBatchHandle) handle).finishWrite();
});
this.bucketToHandles.clear();
}

private HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) {
if (operationType == WriteOperationType.DELETE) {
setWriteSchemaForDeletes(metaClient);
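Taken together, the cached bucketToHandles map, the handle-aware upsert(), and the new cleanHandles() imply a per-checkpoint driving pattern: repeated upsert() calls reuse the handle cached for each file group, and cleanHandles() closes the underlying files once the interval completes. A hedged sketch of that pattern (the helper class and the mini-batch buffering are assumptions, not code from this PR; records are assumed to already carry their "I"/"U" location, one file group per batch):

import java.util.ArrayList;
import java.util.List;

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;

public final class CheckpointFlushSketch {

  /**
   * Drives one checkpoint interval: every upsert() reuses the handle cached per file group,
   * and cleanHandles() closes the underlying file handles once the interval is done.
   */
  public static <T extends HoodieRecordPayload> List<WriteStatus> flushOnCheckpoint(
      HoodieFlinkWriteClient<T> client,
      List<List<HoodieRecord<T>>> miniBatches, // buffered mini-batches, one file group per batch
      String instantTime) {
    List<WriteStatus> statuses = new ArrayList<>();
    for (List<HoodieRecord<T>> batch : miniBatches) {
      // Records are expected to carry their location already ("I" for a new file
      // group, "U" for an existing one), as the upsert() change above relies on.
      statuses.addAll(client.upsert(batch, instantTime));
    }
    // Close the per-fileId write handles accumulated during this interval.
    client.cleanHandles();
    return statuses;
  }
}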
@@ -24,18 +24,21 @@
import org.apache.hudi.table.HoodieTable;

/**
* Create handle factory for Flink writer, use the specified fileID directly
* because it is unique anyway.
* Create handle factory for Flink writer, use the specified write handle directly.
*/
public class FlinkCreateHandleFactory<T extends HoodieRecordPayload, I, K, O>
public class ExplicitCreateHandleFactory<T extends HoodieRecordPayload, I, K, O>
extends CreateHandleFactory<T, I, K, O> {
private HoodieWriteHandle<T, I, K, O> writeHandle;

public ExplicitCreateHandleFactory(HoodieWriteHandle<T, I, K, O> writeHandle) {
this.writeHandle = writeHandle;
}

@Override
public HoodieWriteHandle<T, I, K, O> create(
HoodieWriteConfig hoodieConfig, String commitTime,
HoodieTable<T, I, K, O> hoodieTable, String partitionPath,
String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
fileIdPrefix, taskContextSupplier);
return writeHandle;
}
}
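ExplicitCreateHandleFactory simply hands back whatever handle it was constructed with, so table code that resolves handles through a create-handle factory transparently picks up the handle cached by the write client. A small hypothetical sketch of that pass-through (the helper name and packaging are assumptions; the sketch is assumed to sit in the same package as ExplicitCreateHandleFactory):

import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.HoodieTable;

// Assumed to live in the same package as ExplicitCreateHandleFactory, so no extra import for it.
public final class ExplicitFactorySketch {

  /** Resolves a handle through the factory; the pre-created handle is returned unchanged. */
  public static <T extends HoodieRecordPayload, I, K, O> HoodieWriteHandle<T, I, K, O> resolveHandle(
      HoodieWriteHandle<T, I, K, O> preCreated,
      HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> table,
      String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
    ExplicitCreateHandleFactory<T, I, K, O> factory = new ExplicitCreateHandleFactory<>(preCreated);
    // The create(...) arguments are accepted only for interface compatibility.
    return factory.create(config, instantTime, table, partitionPath, fileId, taskContextSupplier);
  }
}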
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* A {@link HoodieCreateHandle} that supports create write incrementally(mini-batches).
*
* <p>For the first mini-batch, it initialize and set up the next file path to write,
* but does not close the file writer until all the mini-batches write finish. Each mini-batch
* data are appended to the same file.
*
* @param <T> Payload type
* @param <I> Input type
* @param <K> Key type
* @param <O> Output type
*/
public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
extends HoodieCreateHandle<T, I, K, O> implements MiniBatchHandle {

private static final Logger LOG = LogManager.getLogger(FlinkCreateHandle.class);
private long lastFileSize = 0L;

public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config),
taskContextSupplier);
}

public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Pair<Schema, Schema> writerSchemaIncludingAndExcludingMetadataPair,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, partitionPath, fileId, writerSchemaIncludingAndExcludingMetadataPair,
taskContextSupplier);
}

/**
* Called by the compactor code path.
*/
public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, partitionPath, fileId, recordMap, taskContextSupplier);
}

/**
* Get the incremental write status. In mini-batch write mode,
* this handle would be reused for a checkpoint bucket(the bucket is appended as mini-batches),
* thus, after a mini-batch append finish, we do not close the underneath writer but return
* the incremental WriteStatus instead.
*
* @return the incremental write status
*/

Member: Is this checkpoint bucket representing a BucketWriter?

Contributor (author): It represents a write bucket within a checkpoint. A BucketWriter may hold several bucket write handles.
private WriteStatus getIncrementalWriteStatus() {
try {
long fileSizeInBytes = FSUtils.getFileSize(fs, path);
setUpWriteStatus(fileSizeInBytes);
// reset the write status
recordsWritten = 0;
recordsDeleted = 0;
insertRecordsWritten = 0;
this.lastFileSize = fileSizeInBytes;
writeStatus.setTotalErrorRecords(0);
return writeStatus;
} catch (IOException e) {
throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
}
}

/**
* Set up the write status.
*
* @param fileSizeInBytes File size in bytes
* @throws IOException if error occurs
*/
private void setUpWriteStatus(long fileSizeInBytes) throws IOException {
HoodieWriteStat stat = new HoodieWriteStat();
stat.setPartitionPath(writeStatus.getPartitionPath());
stat.setNumWrites(recordsWritten);
stat.setNumDeletes(recordsDeleted);
stat.setNumInserts(insertRecordsWritten);
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
stat.setFileId(writeStatus.getFileId());
stat.setPath(new Path(config.getBasePath()), path);
stat.setTotalWriteBytes(fileSizeInBytes - lastFileSize);
stat.setFileSizeInBytes(fileSizeInBytes);
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
runtimeStats.setTotalCreateTime(timer.endTimer());
stat.setRuntimeStats(runtimeStats);
timer = new HoodieTimer().startTimer();
writeStatus.setStat(stat);
}

public void finishWrite() {
LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten);
try {
fileWriter.close();
} catch (IOException e) {
throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
}
}

/**
* Performs actions to durably, persist the current changes and returns a WriteStatus object.
*/
@Override
public List<WriteStatus> close() {
return Collections.singletonList(getIncrementalWriteStatus());
}
}
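Because the same file keeps growing across mini-batches, FlinkCreateHandle tracks lastFileSize and reports only the delta in each incremental WriteStatus (stat.setTotalWriteBytes(fileSizeInBytes - lastFileSize) above). A tiny self-contained worked example of that accounting, with invented sizes:

public class IncrementalBytesExample {
  public static void main(String[] args) {
    // Cumulative file sizes observed after three mini-batch flushes (made-up numbers).
    long[] fileSizeAfterBatch = {10_485_760L, 14_680_064L, 15_728_640L};
    long lastFileSize = 0L;
    for (long fileSizeInBytes : fileSizeAfterBatch) {
      // Mirrors stat.setTotalWriteBytes(fileSizeInBytes - lastFileSize) in setUpWriteStatus().
      long totalWriteBytes = fileSizeInBytes - lastFileSize;
      lastFileSize = fileSizeInBytes;
      System.out.println("incremental bytes = " + totalWriteBytes
          + ", cumulative file size = " + fileSizeInBytes);
    }
    // Prints deltas of 10485760, 4194304 and 1048576 bytes for the three mini-batches.
  }
}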