Skip to content

Commit

Permalink
[fix][metadata] fixed ephemeral zk put
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Jan 27, 2025
1 parent 6afd414 commit 957cbe4
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ private void internalStorePut(OpPut opPut) {
future.completeExceptionally(getException(Code.BADVERSION, opPut.getPath()));
} else {
// The z-node does not exist, let's create it first
put(opPut.getPath(), opPut.getData(), Optional.of(-1L)).thenAccept(
s -> future.complete(s))
put(opPut.getPath(), opPut.getData(), Optional.of(-1L), opPut.getOptions())
.thenAccept(s -> future.complete(s))
.exceptionally(ex -> {
if (ex.getCause() instanceof BadVersionException) {
// The z-node exist now, let's overwrite it
Expand Down Expand Up @@ -478,7 +478,7 @@ public void close() throws Exception {

private Stat getStat(String path, org.apache.zookeeper.data.Stat zkStat) {
return new Stat(path, zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime(),
zkStat.getEphemeralOwner() != -1,
zkStat.getEphemeralOwner() != 0,
zkStat.getEphemeralOwner() == zkc.getSessionId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.metadata;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
Expand Down Expand Up @@ -66,4 +67,38 @@ public void sequentialKeys(String provider, Supplier<String> urlSupplier) throws
assertNotEquals(seq1, seq2);
assertTrue(n1 < n2);
}

@Test(dataProvider = "impl")
public void testPersistentOrEphemeralPut(String provider, Supplier<String> urlSupplier) throws Exception {
final String key1 = newKey();
MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
store.put(key1, "value-1".getBytes(), Optional.empty(), EnumSet.noneOf(CreateOption.class)).join();
var value = store.get(key1).join().get();
assertEquals(value.getValue(), "value-1".getBytes());
assertFalse(value.getStat().isEphemeral());
assertTrue(value.getStat().isFirstVersion());
var version = value.getStat().getVersion();

store.put(key1, "value-2".getBytes(), Optional.empty(), EnumSet.noneOf(CreateOption.class)).join();
value = store.get(key1).join().get();
assertEquals(value.getValue(), "value-2".getBytes());
assertFalse(value.getStat().isEphemeral());
assertEquals(value.getStat().getVersion(), version + 1);

final String key2 = newKey();
store.put(key2, "value-4".getBytes(), Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
value = store.get(key2).join().get();
assertEquals(value.getValue(), "value-4".getBytes());
assertTrue(value.getStat().isEphemeral());
assertTrue(value.getStat().isFirstVersion());
version = value.getStat().getVersion();


store.put(key2, "value-5".getBytes(), Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
value = store.get(key2).join().get();
assertEquals(value.getValue(), "value-5".getBytes());
assertTrue(value.getStat().isEphemeral());
assertEquals(value.getStat().getVersion(), version + 1);
}

}

0 comments on commit 957cbe4

Please sign in to comment.