5 changes: 5 additions & 0 deletions hudi-common/pom.xml
@@ -84,6 +84,11 @@
<import>${basedir}/src/main/avro/HoodieBootstrapSourceFilePartitionInfo.avsc</import>
<import>${basedir}/src/main/avro/HoodieBootstrapIndexInfo.avsc</import>
<import>${basedir}/src/main/avro/HoodieBootstrapMetadata.avsc</import>
<import>${basedir}/src/main/avro/HoodieSliceInfo.avsc</import>
<import>${basedir}/src/main/avro/HoodieClusteringGroup.avsc</import>
<import>${basedir}/src/main/avro/HoodieClusteringStrategy.avsc</import>
<import>${basedir}/src/main/avro/HoodieClusteringPlan.avsc</import>
<import>${basedir}/src/main/avro/HoodieRequestedReplaceMetadata.avsc</import>
</imports>
</configuration>
</plugin>
49 changes: 49 additions & 0 deletions hudi-common/src/main/avro/HoodieClusteringGroup.avsc
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace":"org.apache.hudi.avro.model",
"type":"record",
"name":"HoodieClusteringGroup",
"type":"record",
"fields":[
{
/* Group of files that need to be merged. All the slices in a group will initially belong to the same partition.
 * Files of different partitions may be grouped later, once we have a better on-disk layout with indexing support.
 */
"name":"slices",
"type":["null", {
"type":"array",
"items": "HoodieSliceInfo"
}],
"default": null
},
{
"name":"metrics",
"type":["null", {
"type":"map",
"values":"double"
}],
"default": null
},
{
"name":"version",
"type":["int", "null"],
"default": 1
}
]
}
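As a quick illustration of how these records fit together, here is a hedged sketch that populates a group through the builders avro-maven-plugin generates for these schemas (all field values are made up):

import java.util.Collections;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieSliceInfo;

// Describe one file slice to be clustered (values are illustrative).
HoodieSliceInfo slice = HoodieSliceInfo.newBuilder()
    .setFileId("fg-0001")
    .setPartitionPath("2020/10/01")
    .setDataFilePath("/tbl/2020/10/01/fg-0001_0-0-0_20201001.parquet")
    .setDeltaFilePaths(Collections.emptyList())
    .setVersion(1)
    .build();

// Group the slices; all slices in a group start out in the same partition.
HoodieClusteringGroup group = HoodieClusteringGroup.newBuilder()
    .setSlices(Collections.singletonList(slice))
    .setMetrics(Collections.singletonMap("TOTAL_IO_READ_MB", 512.0)) // metric key is hypothetical
    .setVersion(1)
    .build();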
50 changes: 50 additions & 0 deletions hudi-common/src/main/avro/HoodieClusteringPlan.avsc
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace":"org.apache.hudi.avro.model",
"type":"record",
"name":"HoodieClusteringPlan",
"fields":[
{
"name":"inputGroups",
"type":["null", {
"type":"array",
"items": "HoodieClusteringGroup"
}],
"default": null
},
{
"name":"strategy",
"type":["HoodieClusteringStrategy", "null"],
"default": null
},
{
"name":"extraMetadata",
"type":["null", {
"type":"map",
"values":"string"
}],
"default": null
},
{
"name":"version",
"type":["int", "null"],
"default": 1
}
]
}
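Continuing the sketch above, a plan just bundles the input groups with a strategy (built in the strategy sketch after the next file) plus optional free-form metadata:

HoodieClusteringPlan plan = HoodieClusteringPlan.newBuilder()
    .setInputGroups(Collections.singletonList(group)) // from the group sketch above
    .setStrategy(strategy)                            // see the strategy sketch below
    .setExtraMetadata(Collections.emptyMap())
    .setVersion(1)
    .build();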
42 changes: 42 additions & 0 deletions hudi-common/src/main/avro/HoodieClusteringStrategy.avsc
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace":"org.apache.hudi.avro.model",
Contributor: At a high level, I don't think we need a clustering strategy avsc for this; we can just drive it from reflection and class names, like CompactionPlans?

Member (author): There needs to be coordination between scheduling and executing clustering. For example, when scheduling we specify parameters such as 'sortColumns', 'targetFileSize', etc. These need to be passed to the async job that clusters the data, so they are serialized and stored in the strategy/plan.

Contributor: Serializing parameters such as 'sortColumns' and 'targetFileSize' to the clustering job makes sense to me; my question is about the strategy class name itself being written to the file. Is it just for tracking the strategy, or will the async clustering job actually use this strategy to read out the strategyParams?

Member (author): Yes, the async clustering job would read the strategy to instantiate the class. That strategy object will be used to write the clustered records (for example, the strategy may use a custom partitioner to respect sortColumns).

"name":"HoodieClusteringStrategy",
"type":"record",
"fields":[
{
"name":"strategyClassName", /* have to be subclass of ClusteringStrategy interface defined in hudi. ClusteringStrategy class include methods like getPartitioner */
"type":["null","string"],
"default": null
},
{
"name":"strategyParams", /* Parameters could be different for different strategies. example, if sorting is needed for the strategy, parameters can contain sortColumns */
"type":["null", {
"type":"map",
"values":"string"
}],
"default": null
},
{
"name":"version",
"type":["int", "null"],
"default": 1
}
]
}
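To make the scheduling/execution handshake from the review thread concrete, here is a hedged sketch of a scheduler serializing a strategy; the class name and parameter keys are hypothetical, not defined by this PR:

import java.util.HashMap;
import java.util.Map;
import org.apache.hudi.avro.model.HoodieClusteringStrategy;

Map<String, String> strategyParams = new HashMap<>();
strategyParams.put("sortColumns", "city,ts");                              // hypothetical key
strategyParams.put("targetFileSize", String.valueOf(1024L * 1024 * 1024)); // hypothetical key

HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()
    .setStrategyClassName("com.example.SortingClusteringStrategy") // hypothetical implementation
    .setStrategyParams(strategyParams)
    .setVersion(1)
    .build();

// The async clustering job later reads strategyClassName back from the plan,
// instantiates it via reflection, and hands it the strategyParams.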
47 changes: 47 additions & 0 deletions hudi-common/src/main/avro/HoodieRequestedReplaceMetadata.avsc
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace":"org.apache.hudi.avro.model",
Contributor: Same comment: just roll up all the data structures you need into the HoodieClusteringPlan. Is it possible to merge the clustering plan with the insert-overwrite plan?

Member (author): I think this model is flexible for future operations. Replace could be used for other operations such as column pruning and many other use cases. Trying to fit metadata for all these special operations into one format may cause problems (insert-overwrite doesn't really have a plan today, so we would have to generate a plan just for the sake of creating it). We also don't want to introduce other top-level commits because that is a lot of work, so I want to keep this as generic as possible.

Contributor: Keeping the model flexible sounds fair to me; I'm not following how that is connected to introducing other top-level commits. Could you please expand?

Member (author): Initially we were considering replacecommit and clusteringcommit as separate things. The only reason not to introduce a new commit type is to keep it simple, as the post-commit metadata looks very similar. I was trying to say that if these were separate top-level commits, this file would not be needed.

"type":"record",
"name":"HoodieRequestedReplaceMetadata",
"fields":[
{
"name":"operationType",
"type":["null", "string"],
"default": ""
},
{
"name":"clusteringPlan", /* only set if operationType == clustering" */
"type":["HoodieClusteringPlan", "null"],
"default": null
},
{
"name":"extraMetadata",
"type":["null", {
"type":"map",
"values":"string"
}],
"default": null
},
{
"name":"version",
"type":["int", "null"],
"default": 1
}
]
}
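Putting the pieces together, a hedged sketch of the requested replace metadata written when clustering is scheduled, reusing plan from the earlier sketches (the operation-type string mirrors WriteOperationType.CLUSTER added below):

HoodieRequestedReplaceMetadata requested = HoodieRequestedReplaceMetadata.newBuilder()
    .setOperationType("cluster") // value of WriteOperationType.CLUSTER
    .setClusteringPlan(plan)
    .setVersion(1)
    .build();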
56 changes: 56 additions & 0 deletions hudi-common/src/main/avro/HoodieSliceInfo.avsc
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace":"org.apache.hudi.avro.model",
"type":"record",
"name":"HoodieSliceInfo",
"fields":[
{
"name":"dataFilePath",
"type":["null","string"],
"default": null
},
{
"name":"deltaFilePaths",
"type":["null", {
"type":"array",
"items":"string"
}],
"default": null
},
{
"name":"fileId",
"type":["null","string"]
},
{
"name":"partitionPath",
"type":["null","string"],
"default": null
},
{
"name":"bootstrapFilePath",
"type":["null", "string"],
"default": null
},
{
"name":"version",
Member: Can we add this as the first field (so there are no issues with reordering, etc.)? We should always be able to read it.

Member: Realized we can achieve this by passing writer/reader schemas, and there are places where this is not being done uniformly. So treat this more as a nit.

"type":["int", "null"],
"default": 1
}
]
}
@@ -40,6 +40,8 @@ public enum WriteOperationType {
BOOTSTRAP("bootstrap"),
// insert overwrite
INSERT_OVERWRITE("insert_overwrite"),
// cluster
CLUSTER("cluster"),
// used for old version
UNKNOWN("unknown");

@@ -412,6 +412,14 @@ public void saveToCompactionRequested(HoodieInstant instant, Option<byte[]> cont
createFileInMetaPath(instant.getFileName(), content, overwrite);
}

/**
 * Saves content for an inflight/requested REPLACE instant.
*/
public void saveToPendingReplaceCommit(HoodieInstant instant, Option<byte[]> content) {
ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
createFileInMetaPath(instant.getFileName(), content, false);
}
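
For example, a scheduler might persist the requested metadata from the earlier sketches like this (the timestamp is made up, and activeTimeline is an assumed HoodieActiveTimeline handle):

HoodieInstant requestedInstant = new HoodieInstant(
    State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, "20201001120000");
// serializeRequestedReplaceMetadata is added to TimelineMetadataUtils later in this PR.
activeTimeline.saveToPendingReplaceCommit(
    requestedInstant, TimelineMetadataUtils.serializeRequestedReplaceMetadata(requested));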

public void saveToCleanRequested(HoodieInstant instant, Option<byte[]> content) {
ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.CLEAN_ACTION));
ValidationUtils.checkArgument(instant.getState().equals(State.REQUESTED));
@@ -118,6 +118,12 @@ public HoodieTimeline getCompletedReplaceTimeline() {
instants.stream().filter(s -> s.getAction().equals(REPLACE_COMMIT_ACTION)).filter(s -> s.isCompleted()), details);
}

@Override
public HoodieTimeline filterPendingReplaceTimeline() {
return new HoodieDefaultTimeline(instants.stream().filter(
s -> s.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) && !s.isCompleted()), details);
}
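
A usage sketch: before scheduling new clustering, a writer could check for in-flight replace instants (metaClient is an assumed HoodieTableMetaClient):

Option<HoodieInstant> earliestPendingReplace =
    metaClient.getActiveTimeline().filterPendingReplaceTimeline().firstInstant();
if (earliestPendingReplace.isPresent()) {
  // a replacecommit is still requested/inflight; skip or defer scheduling
}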

@Override
public HoodieTimeline filterPendingCompactionTimeline() {
return new HoodieDefaultTimeline(
@@ -151,6 +151,12 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline filterPendingCompactionTimeline();

/**
* Filter this timeline to just include requested and inflight replacecommit instants.
*/
HoodieTimeline filterPendingReplaceTimeline();

/**
* Create a new Timeline with all the instants after startTs.
*/
@@ -18,10 +18,20 @@

package org.apache.hudi.common.table.timeline;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
@@ -31,16 +41,6 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
@@ -115,6 +115,10 @@ public static Option<byte[]> serializeRestoreMetadata(HoodieRestoreMetadata rest
return serializeAvroMetadata(restoreMetadata, HoodieRestoreMetadata.class);
}

public static Option<byte[]> serializeRequestedReplaceMetadata(HoodieRequestedReplaceMetadata requestedReplaceMetadata) throws IOException {
return serializeAvroMetadata(requestedReplaceMetadata, HoodieRequestedReplaceMetadata.class);
}

public static <T extends SpecificRecordBase> Option<byte[]> serializeAvroMetadata(T metadata, Class<T> clazz)
throws IOException {
DatumWriter<T> datumWriter = new SpecificDatumWriter<>(clazz);
@@ -146,6 +150,10 @@ public static HoodieSavepointMetadata deserializeHoodieSavepointMetadata(byte[]
return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class);
}

public static HoodieRequestedReplaceMetadata deserializeRequestedReplaceMetadata(byte[] bytes) throws IOException {
return deserializeAvroMetadata(bytes, HoodieRequestedReplaceMetadata.class);
}
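
A quick round-trip sketch through the two helpers added in this file, reusing requested from the earlier sketches:

Option<byte[]> bytes = TimelineMetadataUtils.serializeRequestedReplaceMetadata(requested);
HoodieRequestedReplaceMetadata roundTripped =
    TimelineMetadataUtils.deserializeRequestedReplaceMetadata(bytes.get());
// roundTripped carries the same clustering plan, strategy, and groups that were scheduled.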

public static <T extends SpecificRecordBase> T deserializeAvroMetadata(byte[] bytes, Class<T> clazz)
throws IOException {
DatumReader<T> reader = new SpecificDatumReader<>(clazz);