Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction.lock;

import org.apache.hudi.client.transaction.lock.models.StorageLockData;
import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
import org.apache.hudi.client.transaction.lock.models.LockGetResult;
import org.apache.hudi.client.transaction.lock.models.LockUpdateResult;
import org.apache.hudi.common.util.collection.Pair;

import java.util.function.Supplier;

/**
* Defines a contract for a service which should be able to perform conditional writes to object storage.
* It expects to be interacting with a single lock file per context (table), and will be competing with other instances
* to perform writes, so it should handle these cases accordingly (using conditional writes).
*/
public interface StorageLock extends AutoCloseable {
/**
* Tries once to create or update a lock file.
* @param newLockData The new data to update the lock file with.
* @param previousLockFile The previous lock file, use this to conditionally update the lock file.
* @return A pair containing the result state and the new lock file (if successful)
*/
Pair<LockUpdateResult, StorageLockFile> tryCreateOrUpdateLockFile(
StorageLockData newLockData,
StorageLockFile previousLockFile);

/**
* Tries to create or update a lock file while retrying N times.
* All non pre-condition failure related errors should be retried.
* @param newLockDataSupplier The new data supplier
* @param previousLockFile The previous lock file
* @param retryCount Number of retries to attempt
* @return A pair containing the result state and the new lock file (if successful)
*/
Pair<LockUpdateResult, StorageLockFile> tryCreateOrUpdateLockFileWithRetry(
Supplier<StorageLockData> newLockDataSupplier,
StorageLockFile previousLockFile,
long retryCount);

/**
* Reads the current lock file.
* @return The lock retrieve result and the current lock file if successfully retrieved.
* */
Pair<LockGetResult, StorageLockFile> readCurrentLockFile();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction.lock.models;

public enum LockGetResult {
// Lock file does not exist with code 0
NOT_EXISTS(0),
// Successfully retrieved the lock file with code 1
SUCCESS(1),
// Unable to determine lock state due to transient errors with code 2
UNKNOWN_ERROR(2);

private final int code;

LockGetResult(int code) {
this.code = code;
}

public int getCode() {
return code;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction.lock.models;

public enum LockUpdateResult {
// Lock was successfully created/updated with code 0
SUCCESS(0),
// Another process has modified the lock file (precondition failure) with code 1
ACQUIRED_BY_OTHERS(1),
// Unable to determine lock state due to transient errors with code 2
UNKNOWN_ERROR(2);

private final int code;

LockUpdateResult(int code) {
this.code = code;
}

public int getCode() {
return code;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction.lock.models;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Pojo for conditional writes-based lock provider.
*/
public class StorageLockData {
private final boolean expired;
private final long validUntil;
private final String owner;

/**
* Initializes an object describing a conditionally written lock.
* @param expired Whether the lock is expired.
* @param validUntil The epoch in ms when the lock is expired.
* @param owner The uuid owner of the owner of this lock.
*/
@JsonCreator
public StorageLockData(
@JsonProperty(value = "expired", required = true) boolean expired,
@JsonProperty(value = "validUntil", required = true) long validUntil,
@JsonProperty(value = "owner", required = true) String owner) {
this.expired = expired;
this.validUntil = validUntil;
this.owner = owner;
}

/**
* Gets the expiration.
* @return The long representing the expiration in ms.
*/
public long getValidUntil() {
return this.validUntil;
}

/**
* Whether the lock is expired.
* @return True boolean representing whether the lock is expired.
*/
public boolean isExpired() {
return this.expired;
}

/**
* The owner.
* @return A string representing the uuid of the owner of this lock.
*/
public String getOwner() {
return this.owner;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction.lock.models;

import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class StorageLockFile {

private final StorageLockData data;
private final String versionId;

// Get a custom object mapper. See ConditionalWriteLockData for required properties.
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
// This allows us to add new properties down the line.
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
// Should not let validUntil or expired be null.
.enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);

/**
* Initializes a ConditionalWriteLockFile using the data and unique versionId.
*
* @param data The data in the lock file.
* @param versionId The version of this lock file. Used to ensure consistency through conditional writes.
*/
public StorageLockFile(StorageLockData data, String versionId) {
ValidationUtils.checkArgument(data != null, "Data must not be null.");
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(versionId), "VersionId must not be null or empty.");
this.data = data;
this.versionId = versionId;
}

/**
* Factory method to load an input stream into a ConditionalWriteLockFile.
*
* @param inputStream The input stream of the JSON content.
* @param versionId The unique version identifier for the lock file.
* @return A new instance of ConditionalWriteLockFile.
* @throws HoodieIOException If deserialization fails.
*/
public static StorageLockFile createFromStream(InputStream inputStream, String versionId) {
try {
StorageLockData data = OBJECT_MAPPER.readValue(inputStream, StorageLockData.class);
return new StorageLockFile(data, versionId);
} catch (IOException e) {
throw new HoodieIOException("Failed to deserialize JSON content into ConditionalWriteLockData", e);
}
}

/**
* Writes the serialized JSON representation of this object to an output stream.
*
* @param outputStream The output stream to write the JSON to.
* @throws HoodieIOException If serialization fails.
*/
public void writeToStream(OutputStream outputStream) {
try {
OBJECT_MAPPER.writeValue(outputStream, this.data);
} catch (IOException e) {
throw new HoodieIOException("Error writing object to JSON output stream", e);
}
}

/**
* Converts the data to a bytearray. Since we know the payloads will be small this is fine.
* @return A byte array.
* @throws HoodieIOException If serialization fails.
*/
public static byte[] toByteArray(StorageLockData data) {
try {
return OBJECT_MAPPER.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new HoodieIOException("Error writing object to byte array", e);
}
}

/**
* Gets the version id.
* @return A string for the version id.
*/
public String getVersionId() {
return this.versionId;
}

/**
* Gets the expiration time in ms.
* @return A long representing the expiration.
*/
public long getValidUntil() {
return this.data.getValidUntil();
}

/**
* Gets whether the lock is expired.
* @return A boolean representing expired.
*/
public boolean isExpired() {
return this.data.isExpired();
}

/**
* Gets the owner of the lock.
* @return A string for the owner of the lock.
*/
public String getOwner() {
return this.data.getOwner();
}
}
Loading
Loading