Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction.lock;

import org.apache.hudi.client.transaction.lock.models.LockUpsertResult;
import org.apache.hudi.client.transaction.lock.models.StorageLockData;
import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
import org.apache.hudi.client.transaction.lock.models.LockGetResult;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

/**
* Defines a contract for a service which should be able to perform conditional writes to object storage.
* It expects to be interacting with a single lock file per context (table), and will be competing with other instances
* to perform writes, so it should handle these cases accordingly (using conditional writes).
*/
public interface StorageLockClient extends AutoCloseable {
/**
* Tries to create or update a lock file.
* All non pre-condition failure related errors should be returned as UNKNOWN_ERROR.
* @param newLockData The new data to write to the lock file
* @param previousLockFile The previous lock file
* @return A pair containing the result state and the new lock file (if successful)
*/
Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile(
StorageLockData newLockData,
Option<StorageLockFile> previousLockFile);

/**
* Reads the current lock file.
* @return The lock retrieve result and the current lock file if successfully retrieved.
* */
Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction.lock.models;

public enum LockGetResult {
// Lock file does not exist with code 0
NOT_EXISTS(0),
// Successfully retrieved the lock file with code 1
SUCCESS(1),
// Unable to determine lock state due to transient errors with code 2
UNKNOWN_ERROR(2);

private final int code;

LockGetResult(int code) {
this.code = code;
}

public int getCode() {
return code;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction.lock.models;

public enum LockUpsertResult {
// Lock was successfully created/updated with code 0
SUCCESS(0),
// Another process has modified the lock file (precondition failure) with code 1
ACQUIRED_BY_OTHERS(1),
// Unable to determine lock state due to transient errors with code 2
UNKNOWN_ERROR(2);

private final int code;

LockUpsertResult(int code) {
this.code = code;
}

public int getCode() {
return code;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction.lock.models;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Pojo for conditional writes-based lock provider.
*/
public class StorageLockData {
private final boolean expired;
private final long validUntil;
private final String owner;

/**
* Initializes an object describing a conditionally written lock.
* @param expired Whether the lock is expired.
* @param validUntil The epoch in ms when the lock is expired.
* @param owner The uuid owner of the owner of this lock.
*/
@JsonCreator
public StorageLockData(
@JsonProperty(value = "expired", required = true) boolean expired,
@JsonProperty(value = "validUntil", required = true) long validUntil,
@JsonProperty(value = "owner", required = true) String owner) {
this.expired = expired;
this.validUntil = validUntil;
this.owner = owner;
}

/**
* Gets the expiration.
* @return The long representing the expiration in ms.
*/
public long getValidUntil() {
return this.validUntil;
}

/**
* Whether the lock is expired.
* @return True boolean representing whether the lock is expired.
*/
public boolean isExpired() {
return this.expired;
}

/**
* The owner.
* @return A string representing the uuid of the owner of this lock.
*/
public String getOwner() {
return this.owner;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.transaction.lock.models;

import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class StorageLockFile {

private final StorageLockData data;
private final String versionId;

// Get a custom object mapper. See StorageLockData for required properties.
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
// This allows us to add new properties down the line.
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
// Should not let validUntil or expired be null.
.enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);

/**
* Initializes a StorageLockFile using the data and unique versionId.
*
* @param data The data in the lock file.
* @param versionId The version of this lock file. Used to ensure consistency through conditional writes.
*/
public StorageLockFile(StorageLockData data, String versionId) {
ValidationUtils.checkArgument(data != null, "Data must not be null.");
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(versionId), "VersionId must not be null or empty.");
this.data = data;
this.versionId = versionId;
}

/**
* Factory method to load an input stream into a StorageLockFile.
*
* @param inputStream The input stream of the JSON content.
* @param versionId The unique version identifier for the lock file.
* @return A new instance of StorageLockFile.
* @throws HoodieIOException If deserialization fails.
*/
public static StorageLockFile createFromStream(InputStream inputStream, String versionId) {
try {
StorageLockData data = OBJECT_MAPPER.readValue(inputStream, StorageLockData.class);
return new StorageLockFile(data, versionId);
} catch (IOException e) {
throw new HoodieIOException("Failed to deserialize JSON content into StorageLockData", e);
}
}

/**
* Writes the serialized JSON representation of this object to an output stream.
*
* @param outputStream The output stream to write the JSON to.
* @throws HoodieIOException If serialization fails.
*/
public void writeToStream(OutputStream outputStream) {
try {
OBJECT_MAPPER.writeValue(outputStream, this.data);
} catch (IOException e) {
throw new HoodieIOException("Error writing object to JSON output stream", e);
}
}

/**
* Converts the data to a bytearray. Since we know the payloads will be small this is fine.
* @return A byte array.
* @throws HoodieIOException If serialization fails.
*/
public static byte[] toByteArray(StorageLockData data) {
try {
return OBJECT_MAPPER.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new HoodieIOException("Error writing object to byte array", e);
}
}

/**
* Gets the version id.
* @return A string for the version id.
*/
public String getVersionId() {
return this.versionId;
}

/**
* Gets the expiration time in ms.
* @return A long representing the expiration.
*/
public long getValidUntilMs() {
return this.data.getValidUntil();
}

/**
* Gets whether the lock is expired.
* @return A boolean representing expired.
*/
public boolean isExpired() {
return this.data.isExpired();
}

/**
* Gets the owner of the lock.
* @return A string for the owner of the lock.
*/
public String getOwner() {
return this.data.getOwner();
}
}
Loading
Loading