@@ -44,7 +44,13 @@ public long addInserts(long numInserts) {
   }
 
   public long addUpdates(HoodieRecordLocation location, long numUpdates) {
-    updateLocationToCount.put(location.getFileId(), Pair.of(location.getInstantTime(), numUpdates));
+    long accNumUpdates = 0;
+    if (updateLocationToCount.containsKey(location.getFileId())) {
+      accNumUpdates = updateLocationToCount.get(location.getFileId()).getRight();
+    }
+    updateLocationToCount.put(
+        location.getFileId(),
+        Pair.of(location.getInstantTime(), numUpdates + accNumUpdates));
     return this.numUpdates += numUpdates;
   }
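Why this matters: before the patch, a second addUpdates call for the same file overwrote the per-file count in updateLocationToCount, so it could disagree with the running numUpdates total. A minimal sketch of the fixed behavior, assuming the enclosing class is org.apache.hudi.table.WorkloadStat (the class name is inferred from the methods in the hunk, not shown in this diff):

import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.table.WorkloadStat; // assumed package, inferred from the hunk

class WorkloadStatSketch {
  static void example() {
    HoodieRecordLocation location = new HoodieRecordLocation("20210101000000", "file-0001");
    WorkloadStat stat = new WorkloadStat();
    stat.addUpdates(location, 10);
    stat.addUpdates(location, 5);
    // Before the fix: the per-file count for "file-0001" ended up as 5
    // (overwritten), while the running numUpdates total was 15.
    // After the fix: the per-file count accumulates to 10 + 5 = 15,
    // consistent with the total returned by addUpdates.
  }
}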
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * Helper class for a bucket's type (INSERT and UPDATE) and its file location.
@@ -29,6 +30,24 @@ public class BucketInfo implements Serializable {
   String fileIdPrefix;
   String partitionPath;
 
+  public BucketInfo(BucketType bucketType, String fileIdPrefix, String partitionPath) {
+    this.bucketType = bucketType;
+    this.fileIdPrefix = fileIdPrefix;
+    this.partitionPath = partitionPath;
+  }
+
+  public BucketType getBucketType() {
+    return bucketType;
+  }
+
+  public String getFileIdPrefix() {
+    return fileIdPrefix;
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("BucketInfo {");
@@ -38,4 +57,23 @@ public String toString() {
     sb.append('}');
     return sb.toString();
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    BucketInfo that = (BucketInfo) o;
+    return bucketType == that.bucketType
+        && fileIdPrefix.equals(that.fileIdPrefix)
+        && partitionPath.equals(that.partitionPath);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(bucketType, fileIdPrefix, partitionPath);
+  }
 }
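With equals and hashCode now defined over all three fields, two BucketInfo instances describing the same bucket compare equal, so the class can safely key hash-based collections. A small sketch of the property this buys (BucketType.UPDATE is taken from the class javadoc; the counting map is illustrative):

import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType; // assumed to live beside BucketInfo

class BucketInfoSketch {
  static void example() {
    Map<BucketInfo, Long> recordsPerBucket = new HashMap<>();
    BucketInfo a = new BucketInfo(BucketType.UPDATE, "file-0001", "par1");
    BucketInfo b = new BucketInfo(BucketType.UPDATE, "file-0001", "par1");
    recordsPerBucket.merge(a, 1L, Long::sum);
    recordsPerBucket.merge(b, 1L, Long::sum);
    // recordsPerBucket.size() == 1 with a count of 2; before this patch each
    // instance was a distinct key under the default identity hashCode.
  }
}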
@@ -30,6 +30,7 @@
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CommitUtils;
@@ -249,7 +250,17 @@ public String getLastCompletedInstant(String tableType) {
   public void deletePendingInstant(String tableType, String instant) {
     HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
     String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
-    table.getMetaClient().getActiveTimeline()
-        .deletePending(new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, instant));
+    HoodieActiveTimeline activeTimeline = table.getMetaClient().getActiveTimeline();
+    activeTimeline.deletePending(HoodieInstant.State.INFLIGHT, commitType, instant);
+    activeTimeline.deletePending(HoodieInstant.State.REQUESTED, commitType, instant);
   }
+
+  public void transitionRequestedToInflight(String tableType, String inFlightInstant) {
+    HoodieFlinkTable<T> table = HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    String commitType = CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
+    HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant);
+    activeTimeline.transitionRequestedToInflight(requested, Option.empty(),
+        config.shouldAllowMultiWriteOnSameInstant());
+  }
 }
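For context, an instant on the Hudi timeline moves from REQUESTED to INFLIGHT to COMPLETED, which is why deleting a pending instant must now remove both the requested and the inflight meta files. A hedged sketch of how a caller might drive the two methods (the write-client type and the startCommit call are assumed from the base client API, not part of this hunk):

import org.apache.hudi.client.HoodieFlinkWriteClient; // assumed package of the diffed class

class InstantLifecycleSketch {
  static void example(HoodieFlinkWriteClient<?> writeClient) {
    String tableType = "COPY_ON_WRITE";
    // Roll back an instant that never completed: with this patch both the
    // .inflight and the .requested meta files are deleted.
    writeClient.deletePendingInstant(tableType, "20210101000000");
    // Start a fresh instant and eagerly mark it inflight so other timeline
    // consumers see an active write before any data is flushed.
    String newInstant = writeClient.startCommit();
    writeClient.transitionRequestedToInflight(tableType, newInstant);
  }
}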
@@ -25,7 +25,6 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.FlinkHoodieIndex;
@@ -62,47 +61,14 @@ public FlinkInMemoryStateIndex(HoodieFlinkEngineContext context, HoodieWriteConf
   public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> records,
                                            HoodieEngineContext context,
                                            HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException {
-    return context.map(records, record -> {
-      try {
-        if (mapState.contains(record.getKey())) {
-          record.unseal();
-          record.setCurrentLocation(mapState.get(record.getKey()));
-          record.seal();
-        }
-      } catch (Exception e) {
-        LOG.error(String.format("Tag record location failed, key = %s, %s", record.getRecordKey(), e.getMessage()));
-      }
-      return record;
-    }, 0);
+    throw new UnsupportedOperationException("No need to tag location for FlinkInMemoryStateIndex");
Review thread on this change:

Contributor: @wangxianghu Do you agree with this?

Contributor: @danny0405 If we do not query the location, how do we identify whether each record's operation is an insert or an update?

Contributor (Author): See the document of BucketAssignerFunction.

Member: nit: Can we add some comments about the indexing here, pointing to BucketAssignerFunction? This will be helpful for others reading the code. It's fine for now; we can do this in your next PR.
   }
 
   @Override
   public List<WriteStatus> updateLocation(List<WriteStatus> writeStatuses,
                                           HoodieEngineContext context,
                                           HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException {
-    return context.map(writeStatuses, writeStatus -> {
-      for (HoodieRecord record : writeStatus.getWrittenRecords()) {
-        if (!writeStatus.isErrored(record.getKey())) {
-          HoodieKey key = record.getKey();
-          Option<HoodieRecordLocation> newLocation = record.getNewLocation();
-          if (newLocation.isPresent()) {
-            try {
-              mapState.put(key, newLocation.get());
-            } catch (Exception e) {
-              LOG.error(String.format("Update record location failed, key = %s, %s", record.getRecordKey(), e.getMessage()));
-            }
-          } else {
-            // Delete existing index for a deleted record
-            try {
-              mapState.remove(key);
-            } catch (Exception e) {
-              LOG.error(String.format("Remove record location failed, key = %s, %s", record.getRecordKey(), e.getMessage()));
-            }
-          }
-        }
-      }
-      return writeStatus;
-    }, 0);
+    throw new UnsupportedOperationException("No need to update location for FlinkInMemoryStateIndex");
   }
 
   @Override
@@ -128,6 +94,6 @@ public boolean canIndexLogFiles() {
    */
   @Override
   public boolean isImplicitWithStorage() {
-    return false;
+    return true;
   }
 }
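Per the review thread above, location tagging moves out of the index class and into BucketAssignerFunction, which can keep the key-to-location mapping in Flink keyed state. A hypothetical sketch of that pattern; every name below is illustrative and not taken from this PR:

import org.apache.flink.api.common.state.ValueState;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;

class BucketAssignSketch {
  // Hypothetical keyed state, partitioned by record key, as a
  // BucketAssignerFunction might hold it.
  private ValueState<HoodieRecordLocation> locationState;

  void tag(HoodieRecord<?> record) throws Exception {
    HoodieRecordLocation location = locationState.value();
    if (location != null) {
      // Known key: route the record to an UPDATE bucket of its file group.
      record.unseal();
      record.setCurrentLocation(location);
      record.seal();
    }
    // Unknown key: leave the location unset and assign an INSERT bucket.
  }
}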
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Create handle factory for Flink writer, use the specified fileID directly
+ * because it is unique anyway.
+ */
+public class FlinkCreateHandleFactory<T extends HoodieRecordPayload, I, K, O>
+    extends CreateHandleFactory<T, I, K, O> {
+
+  @Override
+  public HoodieWriteHandle<T, I, K, O> create(
+      HoodieWriteConfig hoodieConfig, String commitTime,
+      HoodieTable<T, I, K, O> hoodieTable, String partitionPath,
+      String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
+    return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
+        fileIdPrefix, taskContextSupplier);
+  }
+}

Review thread on the class declaration:

Contributor: What's the difference between CreateHandleFactory and FlinkCreateHandleFactory? They seem to be the same.

Contributor (Author): I explained the reason in the document: they differ in how they roll over new files. FlinkCreateHandleFactory always uses the file handle name we specify and never rolls over with a number suffix.
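A short usage sketch of the difference the author describes, under the assumption that the base CreateHandleFactory derives a fresh file ID from the prefix on each call while this variant passes fileIdPrefix through unchanged (raw generics for brevity; the surrounding parameters are assumed to come from the Flink write task):

package org.apache.hudi.io; // placed beside the factory so HoodieWriteHandle resolves

import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

class HandleFactorySketch {
  static HoodieWriteHandle newHandle(HoodieWriteConfig config, String instantTime,
                                     HoodieTable table, TaskContextSupplier supplier) {
    FlinkCreateHandleFactory factory = new FlinkCreateHandleFactory();
    // The returned handle writes to file group "file-0001" exactly as given;
    // the base factory would roll the prefix over to a new file ID instead.
    return factory.create(config, instantTime, table, "par1", "file-0001", supplier);
  }
}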