From df20e89a0e276344d2188b0e812b7807c5b1ec16 Mon Sep 17 00:00:00 2001
From: Alexander Rhee
Date: Mon, 11 Aug 2025 19:54:56 -0700
Subject: [PATCH] Add lock provider scaffolding

---
 .../transaction/lock/StorageLockClient.java   |  50 ++++++
 .../lock/models/LockGetResult.java            |  38 ++++
 .../lock/models/LockUpsertResult.java         |  38 ++++
 .../lock/models/StorageLockData.java          |  71 ++++++++
 .../lock/models/StorageLockFile.java          | 133 ++++++++++++++
 .../models/StorageLockClientFileTest.java     | 165 ++++++++++++++++++
 6 files changed, 495 insertions(+)
 create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
 create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java
 create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
 create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java
 create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
 create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
new file mode 100644
index 0000000000000..ac8ff2ed9d0d6
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLockClient.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock;
+
+import org.apache.hudi.client.transaction.lock.models.LockUpsertResult;
+import org.apache.hudi.client.transaction.lock.models.StorageLockData;
+import org.apache.hudi.client.transaction.lock.models.StorageLockFile;
+import org.apache.hudi.client.transaction.lock.models.LockGetResult;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+/**
+ * Contract for a client that performs conditional writes against a lock file kept in object storage.
+ * A single lock file is expected per context (table); other instances compete to write that same file,
+ * so implementations should handle the contention accordingly (using conditional writes).
+ */
+public interface StorageLockClient extends AutoCloseable {
+  /**
+   * Tries to create or update a lock file.
+   * Every error that is not a pre-condition failure should be returned as UNKNOWN_ERROR.
+   * @param newLockData The new data to write to the lock file.
+   * @param previousLockFile The previous lock file.
+   * @return A pair containing the result state and the new lock file (if successful).
+   */
+  Pair<LockUpsertResult, Option<StorageLockFile>> tryUpsertLockFile(
+      StorageLockData newLockData,
+      Option<StorageLockFile> previousLockFile);
+
+  /**
+   * Reads the current lock file.
+   * @return A pair containing the read result and the current lock file (if successfully retrieved).
+   */
+  Pair<LockGetResult, Option<StorageLockFile>> readCurrentLockFile();
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java
new file mode 100644
index 0000000000000..68809d244c3d5
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+public enum LockGetResult {
+  /** The lock file does not exist (code 0). */
+  NOT_EXISTS(0),
+  /** The lock file was retrieved successfully (code 1). */
+  SUCCESS(1),
+  /** The lock state could not be determined due to transient errors (code 2). */
+  UNKNOWN_ERROR(2);
+
+  private final int code;
+
+  LockGetResult(int code) {
+    this.code = code;
+  }
+
+  public int getCode() {
+    return code;
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
new file mode 100644
index 0000000000000..eec707d4e56d4
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpsertResult.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+public enum LockUpsertResult {
+  /** The lock file was created or updated successfully (code 0). */
+  SUCCESS(0),
+  /** Another process has modified the lock file, i.e. a precondition failure (code 1). */
+  ACQUIRED_BY_OTHERS(1),
+  /** The lock state could not be determined due to transient errors (code 2). */
+  UNKNOWN_ERROR(2);
+
+  private final int code;
+
+  LockUpsertResult(int resultCode) {
+    this.code = resultCode;
+  }
+
+  public int getCode() {
+    return this.code;
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java
new file mode 100644
index 0000000000000..01c7d141ebda0
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Pojo for conditional writes-based lock provider.
+ */
+public class StorageLockData {
+  private final boolean expired;
+  private final long validUntil;
+  private final String owner;
+
+  /**
+   * Creates the description of a conditionally written lock.
+   * @param expired Whether the lock is expired.
+   * @param validUntil The epoch time in ms at which the lock expires.
+   * @param owner The uuid of the owner of this lock.
+   */
+  @JsonCreator
+  public StorageLockData(
+      @JsonProperty(value = "expired", required = true) boolean expired,
+      @JsonProperty(value = "validUntil", required = true) long validUntil,
+      @JsonProperty(value = "owner", required = true) String owner) {
+    this.owner = owner;
+    this.validUntil = validUntil;
+    this.expired = expired;
+  }
+
+  /**
+   * Returns the expiration time.
+   * @return The expiration as epoch time in ms.
+   */
+  public long getValidUntil() {
+    return validUntil;
+  }
+
+  /**
+   * Returns the expired flag.
+   * @return {@code true} if the lock is marked as expired.
+   */
+  public boolean isExpired() {
+    return expired;
+  }
+
+  /**
+   * Returns the owner.
+   * @return The uuid of the owner of this lock, as a string.
+   */
+  public String getOwner() {
+    return owner;
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
new file mode 100644
index 0000000000000..4929d207d9a44
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Pairs the {@link StorageLockData} stored in a lock file with the version id of that file.
+ */
+public class StorageLockFile {
+  // Custom JSON mapper (see StorageLockData for required properties): unknown properties are ignored so
+  // new ones can be added down the line; null primitives fail so validUntil / expired are never null.
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+      .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+      .enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES);
+
+  private final StorageLockData data;
+  private final String versionId;
+
+  /**
+   * Builds a StorageLockFile from the lock data and the unique versionId.
+   * @param data The data in the lock file.
+   * @param versionId The version of this lock file. Used to ensure consistency through conditional writes.
+   * @throws IllegalArgumentException If data is null, or versionId is null or empty.
+   */
+  public StorageLockFile(StorageLockData data, String versionId) {
+    ValidationUtils.checkArgument(data != null, "Data must not be null.");
+    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(versionId), "VersionId must not be null or empty.");
+    this.data = data;
+    this.versionId = versionId;
+  }
+
+  /**
+   * Factory method that parses the JSON content of an input stream into a StorageLockFile.
+   * @param inputStream The input stream of the JSON content.
+   * @param versionId The unique version identifier for the lock file.
+   * @return A new instance of StorageLockFile.
+   * @throws HoodieIOException If deserialization fails.
+   * @throws IllegalArgumentException If the parsed data is null, or versionId is null or empty.
+   */
+  public static StorageLockFile createFromStream(InputStream inputStream, String versionId) {
+    try {
+      return new StorageLockFile(OBJECT_MAPPER.readValue(inputStream, StorageLockData.class), versionId);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to deserialize JSON content into StorageLockData", ioe);
+    }
+  }
+
+  /**
+   * Serializes the lock data of this object as JSON into an output stream.
+   * @param outputStream The output stream to write the JSON to.
+   * @throws HoodieIOException If serialization fails.
+   */
+  public void writeToStream(OutputStream outputStream) {
+    try {
+      OBJECT_MAPPER.writeValue(outputStream, data);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error writing object to JSON output stream", ioe);
+    }
+  }
+
+  /**
+   * Converts the data to a byte array. Since we know the payloads will be small this is fine.
+   * @param data The lock data to serialize as JSON.
+   * @return A byte array.
+   * @throws HoodieIOException If serialization fails.
+   */
+  public static byte[] toByteArray(StorageLockData data) {
+    try {
+      return OBJECT_MAPPER.writeValueAsBytes(data);
+    } catch (JsonProcessingException jpe) {
+      throw new HoodieIOException("Error writing object to byte array", jpe);
+    }
+  }
+
+  /**
+   * Returns the version id.
+   * @return The version id as a string.
+   */
+  public String getVersionId() {
+    return versionId;
+  }
+
+  /**
+   * Returns the expiration time in ms.
+   * @return A long representing the expiration, taken from the lock data.
+   */
+  public long getValidUntilMs() {
+    return data.getValidUntil();
+  }
+
+  /**
+   * Returns whether the lock is expired.
+   * @return The expired flag of the lock data.
+   */
+  public boolean isExpired() {
+    return data.isExpired();
+  }
+
+  /**
+   * Returns the owner of the lock.
+   * @return The owner recorded in the lock data, as a string.
+   */
+  public String getOwner() {
+    return data.getOwner();
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java
new file mode 100644
index 0000000000000..33d3c9bf44ff2
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockClientFileTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction.lock.models;
+
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link StorageLockFile}: construction, argument validation and JSON (de)serialization.
+ */
+public class StorageLockClientFileTest {
+
+  private static final String JSON_DATA = "{\"expired\":false,\"validUntil\":1700000000000,\"owner\":\"testOwner\"}";
+  private static final String JSON_DATA_EXTRA_FIELD = "{\"expired\":true,\"validUntil\":1600000000000,\"owner\":\"otherOwner\",\"state\":\"active\"}";
+  private static final String INVALID_JSON = "{\"invalidField\":123}";
+  private static final String VERSION_ID = "testVersionId";
+
+  private InputStream validJsonStream;
+  private InputStream extraFieldValidJsonStream;
+  private InputStream invalidJsonStream;
+
+  @BeforeEach
+  void setup() {
+    validJsonStream = new ByteArrayInputStream(JSON_DATA.getBytes(StandardCharsets.UTF_8));
+    extraFieldValidJsonStream = new ByteArrayInputStream(JSON_DATA_EXTRA_FIELD.getBytes(StandardCharsets.UTF_8));
+    invalidJsonStream = new ByteArrayInputStream(INVALID_JSON.getBytes(StandardCharsets.UTF_8));
+  }
+
+  @Test
+  void testCreateValidInputStream() {
+    StorageLockFile file = StorageLockFile.createFromStream(validJsonStream, VERSION_ID);
+    assertEquals(1700000000000L, file.getValidUntilMs());
+    assertEquals("testOwner", file.getOwner());
+    assertEquals(VERSION_ID, file.getVersionId());
+    assertFalse(file.isExpired());
+  }
+
+  @Test
+  void testCreateValidInputStreamExtraField() {
+    StorageLockFile file = StorageLockFile.createFromStream(extraFieldValidJsonStream, VERSION_ID);
+    assertEquals(1600000000000L, file.getValidUntilMs());
+    assertEquals("otherOwner", file.getOwner());
+    assertEquals(VERSION_ID, file.getVersionId());
+    assertTrue(file.isExpired());
+  }
+
+  @Test
+  void testCreateInvalidInputStreamFromMock() throws IOException {
+    InputStream mockInputStream = mock(InputStream.class);
+
+    // Stub the bulk read overload as well: a buffering parser is expected to call that one,
+    // so stubbing only the single-byte read() may never surface the simulated failure.
+    doThrow(new IOException("Simulated IOException")).when(mockInputStream).read();
+    doThrow(new IOException("Simulated IOException")).when(mockInputStream).read(any(byte[].class), anyInt(), anyInt());
+    HoodieIOException exception = assertThrows(HoodieIOException.class, () -> StorageLockFile.createFromStream(mockInputStream, "versionId"));
+    assertTrue(exception.getMessage().contains("Failed to deserialize"));
+  }
+
+  @Test
+  void testCreateInvalidInputStreamFromBadData() {
+    // None of the required properties (expired, validUntil, owner) is present in INVALID_JSON.
+    HoodieIOException exception = assertThrows(HoodieIOException.class, () -> StorageLockFile.createFromStream(invalidJsonStream, VERSION_ID));
+    assertTrue(exception.getMessage().contains("Failed to deserialize"));
+  }
+
+  @Test
+  void testCreateNullData() {
+    IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> new StorageLockFile(null, VERSION_ID));
+    assertTrue(exception.getMessage().contains("Data must not be null"));
+  }
+
+  @Test
+  void testCreateNullVersionId() {
+    StorageLockData data = new StorageLockData(true, 1700000000000L, "testOwner");
+    // Both a null and an empty version id must be rejected.
+    IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> new StorageLockFile(data, null));
+    assertTrue(exception.getMessage().contains("VersionId must not be null or empty."));
+    exception = assertThrows(IllegalArgumentException.class, () -> new StorageLockFile(data, ""));
+    assertTrue(exception.getMessage().contains("VersionId must not be null or empty."));
+  }
+
+  @Test
+  void testToJsonStreamValidData() {
+    StorageLockFile file = StorageLockFile.createFromStream(validJsonStream, VERSION_ID);
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    file.writeToStream(outputStream);
+    String outputJson = new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
+    assertTrue(outputJson.contains("\"expired\":false"));
+    assertTrue(outputJson.contains("\"validUntil\":1700000000000"));
+    assertTrue(outputJson.contains("\"owner\":\"testOwner\""));
+  }
+
+  @Test
+  void testToJsonStreamErrorHandling() throws IOException {
+    OutputStream mockOutputStream = mock(OutputStream.class);
+
+    // Fail the bulk write overload so that writing the JSON to the stream raises an IOException.
+    doThrow(new IOException("Simulated IOException"))
+        .when(mockOutputStream)
+        .write(any(byte[].class), anyInt(), anyInt());
+    StorageLockFile file = new StorageLockFile(
+        new StorageLockData(true, System.currentTimeMillis() + 1000, "testOwner"),
+        VERSION_ID);
+
+    HoodieIOException exception = assertThrows(HoodieIOException.class, () -> file.writeToStream(mockOutputStream));
+    assertTrue(exception.getMessage().contains("Error writing object to JSON"));
+  }
+
+  @Test
+  void testToByteArrayValidData() {
+    StorageLockData data = new StorageLockData(false, 1700000000000L, "testOwner");
+    String outputJson = new String(StorageLockFile.toByteArray(data), StandardCharsets.UTF_8);
+    assertTrue(outputJson.contains("\"expired\":false"));
+    assertTrue(outputJson.contains("\"validUntil\":1700000000000"));
+    assertTrue(outputJson.contains("\"owner\":\"testOwner\""));
+  }
+
+  @Test
+  void testIsExpired() {
+    StorageLockData data = new StorageLockData(true, System.currentTimeMillis() - 1000, "testOwner");
+    StorageLockFile file = new StorageLockFile(data, VERSION_ID);
+    assertTrue(file.isExpired());
+  }
+
+  @Test
+  void testGetVersionId() {
+    StorageLockData data = new StorageLockData(false, 1700000000000L, "testOwner");
+    StorageLockFile file = new StorageLockFile(data, VERSION_ID);
+    assertEquals(VERSION_ID, file.getVersionId());
+  }
+}