Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
08307d4
Add initial changes for storage-based lock provider abstract implemen…
alexr17 Apr 8, 2025
d61049d
fix formatting
alexr17 Apr 8, 2025
6a999a7
checkstyle
alexr17 Apr 8, 2025
6b2f1f0
more checkstyle
alexr17 Apr 8, 2025
2cda6c8
Update hudi-client/hudi-client-common/src/main/java/org/apache/hudi/c…
alexr17 Apr 8, 2025
96a0978
Update hudi-client/hudi-client-common/src/main/java/org/apache/hudi/c…
alexr17 Apr 8, 2025
697da63
Update hudi-client/hudi-client-common/src/main/java/org/apache/hudi/c…
alexr17 Apr 8, 2025
39d9430
Update hudi-client/hudi-client-common/src/main/java/org/apache/hudi/c…
alexr17 Apr 8, 2025
a465892
Update hudi-client/hudi-client-common/src/main/java/org/apache/hudi/c…
alexr17 Apr 8, 2025
bf66cfc
Update hudi-client/hudi-client-common/src/main/java/org/apache/hudi/c…
alexr17 Apr 8, 2025
6d67370
Update hudi-client/hudi-client-common/src/main/java/org/apache/hudi/c…
alexr17 Apr 8, 2025
95266ea
Apply suggestions from code review
alexr17 Apr 8, 2025
fdd7113
respond to feedback and refactor models
alexr17 Apr 9, 2025
5519af2
update config
alexr17 Apr 9, 2025
6954801
rename class
alexr17 Apr 10, 2025
d5310ed
conditional writes rename
alexr17 Apr 10, 2025
e081f83
add shutdown hook
alexr17 Apr 10, 2025
2d3784b
refactor configs based on feedback
alexr17 Apr 10, 2025
45d1b48
cleanup lp initialization based on feedback
alexr17 Apr 11, 2025
188735a
checkstyle
alexr17 Apr 11, 2025
06d4900
respond to minor feedback
alexr17 Apr 11, 2025
269c57d
convert to milliseconds
alexr17 Apr 11, 2025
2d596de
Address comments
yihua Apr 11, 2025
03f4a6c
Address more comment
yihua Apr 11, 2025
8f21de8
Address nits
yihua Apr 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

/**
* LockProviderHeartbeatManager is a helper class which handles the scheduling and stopping of heartbeat
* tasks. This is intended for use with the conditional write lock provider, which requires
* tasks. This is intended for use with the storage based lock provider, which requires
* a separate thread to spawn and renew the lock repeatedly.
* It should be responsible for the entire lifecycle of the heartbeat task.
* Importantly, a new instance should be created for each lock provider.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ public class StorageLockFile {
private final StorageLockData data;
private final String versionId;

// Get a custom object mapper. See ConditionalWriteLockData for required properties.
// Get a custom object mapper. See StorageLockData for required properties.
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
// This allows us to add new properties down the line.
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
// Should not let validUntil or expired be null.
.enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);

/**
* Initializes a ConditionalWriteLockFile using the data and unique versionId.
* Initializes a StorageLockFile using the data and unique versionId.
*
* @param data The data in the lock file.
* @param versionId The version of this lock file. Used to ensure consistency through conditional writes.
Expand All @@ -56,19 +56,19 @@ public StorageLockFile(StorageLockData data, String versionId) {
}

/**
* Factory method to load an input stream into a ConditionalWriteLockFile.
* Factory method to load an input stream into a StorageLockFile.
*
* @param inputStream The input stream of the JSON content.
* @param versionId The unique version identifier for the lock file.
* @return A new instance of ConditionalWriteLockFile.
* @return A new instance of StorageLockFile.
* @throws HoodieIOException If deserialization fails.
*/
public static StorageLockFile createFromStream(InputStream inputStream, String versionId) {
try {
StorageLockData data = OBJECT_MAPPER.readValue(inputStream, StorageLockData.class);
return new StorageLockFile(data, versionId);
} catch (IOException e) {
throw new HoodieIOException("Failed to deserialize JSON content into ConditionalWriteLockData", e);
throw new HoodieIOException("Failed to deserialize JSON content into StorageLockData", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.config;

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;

import java.util.concurrent.TimeUnit;

import static org.apache.hudi.common.config.HoodieCommonConfig.BASE_PATH;

public class StorageBasedLockConfig extends HoodieConfig {
private static final String SINCE_VERSION_1_0_2 = "1.0.2";
private static final String STORAGE_BASED_LOCK_PROPERTY_PREFIX = LockConfiguration.LOCK_PREFIX
+ "storage.";

public static final ConfigProperty<Long> VALIDITY_TIMEOUT_SECONDS = ConfigProperty
.key(STORAGE_BASED_LOCK_PROPERTY_PREFIX + "validity.timeout.secs")
.defaultValue(TimeUnit.MINUTES.toSeconds(5))
.markAdvanced()
.sinceVersion(SINCE_VERSION_1_0_2)
.withDocumentation(
"For storage-based lock provider, the amount of time in seconds each new lock is valid for. "
+ "The lock provider will attempt to renew its lock until it successfully extends the lock lease period "
+ "or the validity timeout is reached.");

public static final ConfigProperty<Long> HEARTBEAT_POLL_SECONDS = ConfigProperty
.key(STORAGE_BASED_LOCK_PROPERTY_PREFIX + "heartbeat.poll.secs")
.defaultValue(30L)
.markAdvanced()
.sinceVersion(SINCE_VERSION_1_0_2)
.withDocumentation(
"For storage-based lock provider, the amount of time in seconds to wait before renewing the lock. "
+ "Defaults to 30 seconds.");

public long getValiditySeconds() {
return getLong(VALIDITY_TIMEOUT_SECONDS);
}

public long getHeartbeatPollSeconds() {
return getLong(HEARTBEAT_POLL_SECONDS);
}

public String getHudiTableBasePath() {
return getString(BASE_PATH);
}

public static class Builder {
private final StorageBasedLockConfig lockConfig = new StorageBasedLockConfig();

public StorageBasedLockConfig build() {
lockConfig.setDefaults(StorageBasedLockConfig.class.getName());
return lockConfig;
}

public StorageBasedLockConfig.Builder fromProperties(TypedProperties props) {
lockConfig.getProps().putAll(props);
checkRequiredProps();
return this;
}

private void checkRequiredProps() {
String notExistsMsg = " does not exist!";
if (!lockConfig.contains(BASE_PATH)) {
throw new IllegalArgumentException(BASE_PATH.key() + notExistsMsg);
}
if (lockConfig.getLongOrDefault(VALIDITY_TIMEOUT_SECONDS) < lockConfig.getLongOrDefault(HEARTBEAT_POLL_SECONDS)
* 3) {
throw new IllegalArgumentException(
VALIDITY_TIMEOUT_SECONDS.key() + " should be more than triple " + HEARTBEAT_POLL_SECONDS.key());
}
if (lockConfig.getLongOrDefault(VALIDITY_TIMEOUT_SECONDS) < 5) {
throw new IllegalArgumentException(
VALIDITY_TIMEOUT_SECONDS.key() + " should be greater than or equal to 5 seconds.");
}
if (lockConfig.getLongOrDefault(HEARTBEAT_POLL_SECONDS) < 1) {
throw new IllegalArgumentException(
HEARTBEAT_POLL_SECONDS.key() + " should be greater than or equal to 1 second.");
}
}
}
}
Loading
Loading