Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions hudi-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@
<import>${basedir}/src/main/avro/HoodieClusteringPlan.avsc</import>
<import>${basedir}/src/main/avro/HoodieRequestedReplaceMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieIndexPartitionInfo.avsc</import>
<import>${basedir}/src/main/avro/HoodieIndexPlan.avsc</import>
<import>${basedir}/src/main/avro/HoodieIndexCommitMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieArchivedMetaEntry.avsc</import>
</imports>
</configuration>
Expand Down
8 changes: 8 additions & 0 deletions hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@
"HoodieCommitMetadata"
],
"default": null
},
{
"name":"hoodieIndexCommitMetadata",
"type":[
"null",
"HoodieIndexCommitMetadata"
],
"default": null
}
]
}
51 changes: 51 additions & 0 deletions hudi-common/src/main/avro/HoodieIndexCommitMetadata.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace": "org.apache.hudi.avro.model",
"type": "record",
"name": "HoodieIndexCommitMetadata",
"fields": [
{
"name": "version",
"type": [
"int",
"null"
],
"default": 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't the default version be 0 ?

},
{
"name": "operationType",
"type": [
"null",
"string"
],
"default": null
},
{
"name":"partitionToWriteStats",
"type":["null", {
"type":"map",
"values":{
"type":"array",
"items":"HoodieWriteStat"
}
}],
"default": null
}
]
}
48 changes: 48 additions & 0 deletions hudi-common/src/main/avro/HoodieIndexPartitionInfo.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace": "org.apache.hudi.avro.model",
"type": "record",
"name": "HoodieIndexPartitionInfo",
"fields": [
{
"name": "version",
"type": [
"int",
"null"
],
"default": 1
},
{
"name": "metadataPartitionPath",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "indexUptoInstant",
"type": [
"null",
"HoodieInstantInfo"
],
"default": null
}
]
}
43 changes: 43 additions & 0 deletions hudi-common/src/main/avro/HoodieIndexPlan.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace": "org.apache.hudi.avro.model",
"type": "record",
"name": "HoodieIndexPlan",
"fields": [
{
"name": "version",
"type": [
"int",
"null"
],
"default": 1
},
{
"name": "indexPartitionInfos",
"type": [
"null",
{
"type": "array",
"items": "HoodieIndexPartitionInfo"
}
],
"default": null
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public enum WriteOperationType {
INSERT_OVERWRITE_TABLE("insert_overwrite_table"),
// compact
COMPACT("compact"),

INDEX("index"),
// used for old version
UNKNOWN("unknown");

Expand Down Expand Up @@ -86,6 +88,8 @@ public static WriteOperationType fromValue(String value) {
return CLUSTER;
case "compact":
return COMPACT;
case "index":
return INDEX;
case "unknown":
return UNKNOWN;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION,
INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION,
ROLLBACK_EXTENSION, REQUESTED_ROLLBACK_EXTENSION, INFLIGHT_ROLLBACK_EXTENSION,
REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION));
REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION,
REQUESTED_INDEX_COMMIT_EXTENSION, INFLIGHT_INDEX_COMMIT_EXTENSION, INDEX_COMMIT_EXTENSION));
private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
protected HoodieTableMetaClient metaClient;

Expand All @@ -98,7 +99,6 @@ public static String createNewInstantTime() {
return HoodieInstantTimeGenerator.createNewInstantTime(0);
}


/**
* Returns next instant time that adds N milliseconds to current time.
* Ensures each instant time is atleast 1 second apart since we create instant times at second granularity
Expand Down Expand Up @@ -599,6 +599,65 @@ public void saveToRollbackRequested(HoodieInstant instant, Option<byte[]> conten
createFileInMetaPath(instant.getFileName(), content, false);
}

/**
* Transition index instant state from requested to inflight.
*
* @param requestedInstant Inflight Instant
* @return inflight instant
*/
public HoodieInstant transitionIndexRequestedToInflight(HoodieInstant requestedInstant, Option<byte[]> data) {
ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.INDEX_ACTION),
String.format("%s is not equal to %s action", requestedInstant.getAction(), INDEX_ACTION));
ValidationUtils.checkArgument(requestedInstant.isRequested(),
String.format("Instant %s not in requested state", requestedInstant.getTimestamp()));
HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, INDEX_ACTION, requestedInstant.getTimestamp());
transitionState(requestedInstant, inflightInstant, data);
return inflightInstant;
}

/**
* Transition index instant state from inflight to completed.
* @param inflightInstant Inflight Instant
* @return completed instant
*/
public HoodieInstant transitionIndexInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.INDEX_ACTION),
String.format("%s is not equal to %s action", inflightInstant.getAction(), INDEX_ACTION));
ValidationUtils.checkArgument(inflightInstant.isInflight(),
String.format("Instant %s not inflight", inflightInstant.getTimestamp()));
HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, INDEX_ACTION, inflightInstant.getTimestamp());
transitionState(inflightInstant, commitInstant, data);
return commitInstant;
}

/**
* Revert index instant state from inflight to requested.
* @param inflightInstant Inflight Instant
* @return requested instant
*/
public HoodieInstant revertIndexInflightToRequested(HoodieInstant inflightInstant) {
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.INDEX_ACTION),
String.format("%s is not equal to %s action", inflightInstant.getAction(), INDEX_ACTION));
ValidationUtils.checkArgument(inflightInstant.isInflight(),
String.format("Instant %s not inflight", inflightInstant.getTimestamp()));
HoodieInstant requestedInstant = new HoodieInstant(State.REQUESTED, INDEX_ACTION, inflightInstant.getTimestamp());
if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
transitionState(inflightInstant, requestedInstant, Option.empty());
} else {
deleteInflight(inflightInstant);
}
return requestedInstant;
}

/**
* Save content for inflight/requested index instant.
*/
public void saveToPendingIndexCommit(HoodieInstant instant, Option<byte[]> content) {
ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.INDEX_ACTION),
String.format("%s is not equal to %s action", instant.getAction(), INDEX_ACTION));
createFileInMetaPath(instant.getFileName(), content, false);
}

private void createFileInMetaPath(String filename, Option<byte[]> content, boolean allowOverwrite) {
Path fullPath = new Path(metaClient.getMetaPath(), filename);
if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
*
* @deprecated
*/
public HoodieArchivedTimeline() {}
public HoodieArchivedTimeline() {
}

/**
* This method is only used when this object is deserialized in a spark executor.
Expand Down Expand Up @@ -190,6 +191,8 @@ private Option<String> getMetadataKey(String action) {
return Option.of("hoodieCompactionPlan");
case HoodieTimeline.REPLACE_COMMIT_ACTION:
return Option.of("hoodieReplaceCommitMetadata");
case HoodieTimeline.INDEX_ACTION:
return Option.of("hoodieIndexCommitMetadata");
default:
LOG.error(String.format("Unknown action in metadata (%s)", action));
return Option.empty();
Expand Down Expand Up @@ -328,7 +331,7 @@ private int getArchivedFileSuffix(FileStatus f) {
@Override
public HoodieDefaultTimeline getWriteTimeline() {
// filter in-memory instants
Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION, INDEX_ACTION);
return new HoodieDefaultTimeline(getInstants().filter(i ->
readCommits.keySet().contains(i.getTimestamp()))
.filter(s -> validActions.contains(s.getAction())), details);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public void setInstants(List<HoodieInstant> instants) {
*
* @deprecated
*/
public HoodieDefaultTimeline() {}
public HoodieDefaultTimeline() {
}

@Override
public HoodieTimeline filterInflights() {
Expand Down Expand Up @@ -108,7 +109,7 @@ public HoodieTimeline filterCompletedAndCompactionInstants() {

@Override
public HoodieDefaultTimeline getWriteTimeline() {
Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION, INDEX_ACTION);
return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details);
}

Expand Down Expand Up @@ -181,6 +182,16 @@ public HoodieTimeline filter(Predicate<HoodieInstant> filter) {
return new HoodieDefaultTimeline(instants.stream().filter(filter), details);
}

@Override
public HoodieTimeline filterPendingIndexTimeline() {
return new HoodieDefaultTimeline(instants.stream().filter(s -> s.getAction().equals(INDEX_ACTION) && !s.isCompleted()), details);
}

@Override
public HoodieTimeline filterCompletedIndexTimeline() {
return new HoodieDefaultTimeline(instants.stream().filter(s -> s.getAction().equals(INDEX_ACTION)).filter(HoodieInstant::isCompleted), details);
}

/**
* Get all instants (commits, delta commits) that produce new data, in the active timeline.
*/
Expand All @@ -194,7 +205,7 @@ public HoodieTimeline getCommitsTimeline() {
*/
public HoodieTimeline getAllCommitsTimeline() {
return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION,
CLEAN_ACTION, COMPACTION_ACTION, SAVEPOINT_ACTION, ROLLBACK_ACTION, REPLACE_COMMIT_ACTION));
CLEAN_ACTION, COMPACTION_ACTION, SAVEPOINT_ACTION, ROLLBACK_ACTION, REPLACE_COMMIT_ACTION, INDEX_ACTION));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ public String getFileName() {
return isInflight() ? HoodieTimeline.makeInflightReplaceFileName(timestamp)
: isRequested() ? HoodieTimeline.makeRequestedReplaceFileName(timestamp)
: HoodieTimeline.makeReplaceFileName(timestamp);
} else if (HoodieTimeline.INDEX_ACTION.equals(action)) {
return isInflight() ? HoodieTimeline.makeInflightIndexFileName(timestamp)
: isRequested() ? HoodieTimeline.makeRequestedIndexFileName(timestamp)
: HoodieTimeline.makeIndexCommitFileName(timestamp);
}
throw new IllegalArgumentException("Cannot get file name for unknown action " + action);
}
Expand Down
Loading