diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLock.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLock.java new file mode 100644 index 0000000000000..6ed518039069d --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/StorageLock.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction.lock; + +import org.apache.hudi.client.transaction.lock.models.StorageLockData; +import org.apache.hudi.client.transaction.lock.models.StorageLockFile; +import org.apache.hudi.client.transaction.lock.models.LockGetResult; +import org.apache.hudi.client.transaction.lock.models.LockUpdateResult; +import org.apache.hudi.common.util.collection.Pair; + +import java.util.function.Supplier; + +/** + * Defines a contract for a service which should be able to perform conditional writes to object storage. + * It expects to be interacting with a single lock file per context (table), and will be competing with other instances + * to perform writes, so it should handle these cases accordingly (using conditional writes). + */ +public interface StorageLock extends AutoCloseable { + /** + * Tries once to create or update a lock file. + * @param newLockData The new data to update the lock file with. + * @param previousLockFile The previous lock file, use this to conditionally update the lock file. + * @return A pair containing the result state and the new lock file (if successful) + */ + Pair tryCreateOrUpdateLockFile( + StorageLockData newLockData, + StorageLockFile previousLockFile); + + /** + * Tries to create or update a lock file while retrying N times. + * All non pre-condition failure related errors should be retried. + * @param newLockDataSupplier The new data supplier + * @param previousLockFile The previous lock file + * @param retryCount Number of retries to attempt + * @return A pair containing the result state and the new lock file (if successful) + */ + Pair tryCreateOrUpdateLockFileWithRetry( + Supplier newLockDataSupplier, + StorageLockFile previousLockFile, + long retryCount); + + /** + * Reads the current lock file. + * @return The lock retrieve result and the current lock file if successfully retrieved. + * */ + Pair readCurrentLockFile(); +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java new file mode 100644 index 0000000000000..68809d244c3d5 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockGetResult.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction.lock.models; + +public enum LockGetResult { + // Lock file does not exist with code 0 + NOT_EXISTS(0), + // Successfully retrieved the lock file with code 1 + SUCCESS(1), + // Unable to determine lock state due to transient errors with code 2 + UNKNOWN_ERROR(2); + + private final int code; + + LockGetResult(int code) { + this.code = code; + } + + public int getCode() { + return code; + } +} \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpdateResult.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpdateResult.java new file mode 100644 index 0000000000000..b9ee0ad4260ad --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/LockUpdateResult.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction.lock.models; + +public enum LockUpdateResult { + // Lock was successfully created/updated with code 0 + SUCCESS(0), + // Another process has modified the lock file (precondition failure) with code 1 + ACQUIRED_BY_OTHERS(1), + // Unable to determine lock state due to transient errors with code 2 + UNKNOWN_ERROR(2); + + private final int code; + + LockUpdateResult(int code) { + this.code = code; + } + + public int getCode() { + return code; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java new file mode 100644 index 0000000000000..01c7d141ebda0 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockData.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction.lock.models; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Pojo for conditional writes-based lock provider. + */ +public class StorageLockData { + private final boolean expired; + private final long validUntil; + private final String owner; + + /** + * Initializes an object describing a conditionally written lock. + * @param expired Whether the lock is expired. + * @param validUntil The epoch in ms when the lock is expired. + * @param owner The uuid owner of the owner of this lock. + */ + @JsonCreator + public StorageLockData( + @JsonProperty(value = "expired", required = true) boolean expired, + @JsonProperty(value = "validUntil", required = true) long validUntil, + @JsonProperty(value = "owner", required = true) String owner) { + this.expired = expired; + this.validUntil = validUntil; + this.owner = owner; + } + + /** + * Gets the expiration. + * @return The long representing the expiration in ms. + */ + public long getValidUntil() { + return this.validUntil; + } + + /** + * Whether the lock is expired. + * @return True boolean representing whether the lock is expired. + */ + public boolean isExpired() { + return this.expired; + } + + /** + * The owner. + * @return A string representing the uuid of the owner of this lock. + */ + public String getOwner() { + return this.owner; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java new file mode 100644 index 0000000000000..2a045f31aefc3 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/models/StorageLockFile.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction.lock.models; + +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieIOException; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public class StorageLockFile { + + private final StorageLockData data; + private final String versionId; + + // Get a custom object mapper. See ConditionalWriteLockData for required properties. + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() + // This allows us to add new properties down the line. + .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) + // Should not let validUntil or expired be null. + .enable(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES); + + /** + * Initializes a ConditionalWriteLockFile using the data and unique versionId. + * + * @param data The data in the lock file. + * @param versionId The version of this lock file. Used to ensure consistency through conditional writes. + */ + public StorageLockFile(StorageLockData data, String versionId) { + ValidationUtils.checkArgument(data != null, "Data must not be null."); + ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(versionId), "VersionId must not be null or empty."); + this.data = data; + this.versionId = versionId; + } + + /** + * Factory method to load an input stream into a ConditionalWriteLockFile. + * + * @param inputStream The input stream of the JSON content. + * @param versionId The unique version identifier for the lock file. + * @return A new instance of ConditionalWriteLockFile. + * @throws HoodieIOException If deserialization fails. + */ + public static StorageLockFile createFromStream(InputStream inputStream, String versionId) { + try { + StorageLockData data = OBJECT_MAPPER.readValue(inputStream, StorageLockData.class); + return new StorageLockFile(data, versionId); + } catch (IOException e) { + throw new HoodieIOException("Failed to deserialize JSON content into ConditionalWriteLockData", e); + } + } + + /** + * Writes the serialized JSON representation of this object to an output stream. + * + * @param outputStream The output stream to write the JSON to. + * @throws HoodieIOException If serialization fails. + */ + public void writeToStream(OutputStream outputStream) { + try { + OBJECT_MAPPER.writeValue(outputStream, this.data); + } catch (IOException e) { + throw new HoodieIOException("Error writing object to JSON output stream", e); + } + } + + /** + * Converts the data to a bytearray. Since we know the payloads will be small this is fine. + * @return A byte array. + * @throws HoodieIOException If serialization fails. + */ + public static byte[] toByteArray(StorageLockData data) { + try { + return OBJECT_MAPPER.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + throw new HoodieIOException("Error writing object to byte array", e); + } + } + + /** + * Gets the version id. + * @return A string for the version id. + */ + public String getVersionId() { + return this.versionId; + } + + /** + * Gets the expiration time in ms. + * @return A long representing the expiration. + */ + public long getValidUntil() { + return this.data.getValidUntil(); + } + + /** + * Gets whether the lock is expired. + * @return A boolean representing expired. + */ + public boolean isExpired() { + return this.data.isExpired(); + } + + /** + * Gets the owner of the lock. + * @return A string for the owner of the lock. + */ + public String getOwner() { + return this.data.getOwner(); + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockFileTest.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockFileTest.java new file mode 100644 index 0000000000000..d1dca9dbaa309 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/lock/models/StorageLockFileTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.transaction.lock.models; + +import org.apache.hudi.exception.HoodieIOException; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +public class StorageLockFileTest { + + private static final String JSON_DATA = "{\"expired\":false,\"validUntil\":1700000000000,\"owner\":\"testOwner\"}"; + private static final String JSON_DATA_EXTRA_FIELD = "{\"expired\":true,\"validUntil\":1600000000000,\"owner\":\"otherOwner\",\"state\":\"active\"}"; + private static final String INVALID_JSON = "{\"invalidField\":123}"; + private static final String VERSION_ID = "testVersionId"; + + private InputStream validJsonStream; + private InputStream extraFieldValidJsonStream; + private InputStream invalidJsonStream; + + @BeforeEach + void setup() { + validJsonStream = new ByteArrayInputStream(JSON_DATA.getBytes()); + extraFieldValidJsonStream = new ByteArrayInputStream(JSON_DATA_EXTRA_FIELD.getBytes()); + invalidJsonStream = new ByteArrayInputStream(INVALID_JSON.getBytes()); + } + + @Test + void testCreateValidInputStream() { + StorageLockFile file = StorageLockFile.createFromStream(validJsonStream, VERSION_ID); + assertEquals(1700000000000L, file.getValidUntil()); + assertEquals("testOwner", file.getOwner()); + assertEquals(VERSION_ID, file.getVersionId()); + assertFalse(file.isExpired()); + } + + @Test + void testCreateValidInputStreamExtraField() { + StorageLockFile file = StorageLockFile.createFromStream(extraFieldValidJsonStream, VERSION_ID); + assertEquals(1600000000000L, file.getValidUntil()); + assertEquals("otherOwner", file.getOwner()); + assertEquals(VERSION_ID, file.getVersionId()); + assertTrue(file.isExpired()); + } + + @Test + void testCreateInvalidInputStreamFromMock() throws IOException { + InputStream mockInputStream = mock(InputStream.class); + + doThrow(new IOException("Simulated IOException")) + .when(mockInputStream) + .read(); + HoodieIOException exception = assertThrows(HoodieIOException.class, () -> StorageLockFile.createFromStream(mockInputStream, "versionId")); + assertTrue(exception.getMessage().contains("Failed to deserialize")); + } + + @Test + void testCreateInvalidInputStreamFromBadData() { + HoodieIOException exception = assertThrows(HoodieIOException.class, () -> + StorageLockFile.createFromStream(invalidJsonStream, VERSION_ID) + ); + assertTrue(exception.getMessage().contains("Failed to deserialize")); + } + + @Test + void testCreateNullData() { + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> + new StorageLockFile(null, VERSION_ID) + ); + assertTrue(exception.getMessage().contains("Data must not be null")); + } + + @Test + void testCreateNullVersionId() { + StorageLockData data = new StorageLockData(true, 1700000000000L, "testOwner"); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> + new StorageLockFile(data, null) + ); + assertTrue(exception.getMessage().contains("VersionId must not be null or empty.")); + exception = assertThrows(IllegalArgumentException.class, () -> + new StorageLockFile(data, "") + ); + assertTrue(exception.getMessage().contains("VersionId must not be null or empty.")); + } + + @Test + void testToJsonStreamValidData() { + StorageLockFile file = StorageLockFile.createFromStream(validJsonStream, VERSION_ID); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + file.writeToStream(outputStream); + String outputJson = new String(outputStream.toByteArray()); + assertTrue(outputJson.contains("\"expired\":false")); + assertTrue(outputJson.contains("\"validUntil\":1700000000000")); + assertTrue(outputJson.contains("\"owner\":\"testOwner\"")); + } + + @Test + void testToJsonStreamErrorHandling() throws IOException { + OutputStream mockOutputStream = mock(OutputStream.class); + + doThrow(new IOException("Simulated IOException")) + .when(mockOutputStream) + .write(any(byte[].class), anyInt(), anyInt()); + StorageLockFile file = new StorageLockFile( + new StorageLockData(true, System.currentTimeMillis() + 1000, "testOwner"), + VERSION_ID); + + HoodieIOException exception = assertThrows(HoodieIOException.class, () -> file.writeToStream(mockOutputStream)); + assertTrue(exception.getMessage().contains("Error writing object to JSON")); + } + + @Test + void testToByteArrayValidData() { + StorageLockData data = new StorageLockData(false, 1700000000000L, "testOwner"); + String outputJson = new String(StorageLockFile.toByteArray(data)); + assertTrue(outputJson.contains("\"expired\":false")); + assertTrue(outputJson.contains("\"validUntil\":1700000000000")); + assertTrue(outputJson.contains("\"owner\":\"testOwner\"")); + } + + @Test + void testIsExpired() { + StorageLockData data = new StorageLockData(true, System.currentTimeMillis() - 1000, "testOwner"); + StorageLockFile file = new StorageLockFile(data, VERSION_ID); + assertTrue(file.isExpired()); + } + + @Test + void testGetVersionId() { + StorageLockData data = new StorageLockData(false, 1700000000000L, "testOwner"); + StorageLockFile file = new StorageLockFile(data, VERSION_ID); + assertEquals(VERSION_ID, file.getVersionId()); + } +}