Skip to content

Commit

Permalink
[improve] Upgrade to Oxia client 0.2.0 (#22663)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored May 7, 2024
1 parent 8167554 commit 09364a9
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 60 deletions.
29 changes: 0 additions & 29 deletions distribution/licenses/LICENSE-Reactive-gRPC.txt

This file was deleted.

9 changes: 2 additions & 7 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,10 @@ The Apache Software License, Version 2.0
* Prometheus
- io.prometheus-simpleclient_httpserver-0.16.0.jar
* Oxia
- io.streamnative.oxia-oxia-client-0.1.6.jar
- io.streamnative.oxia-oxia-client-metrics-api-0.1.6.jar
- io.streamnative.oxia-oxia-client-api-0.2.0.jar
- io.streamnative.oxia-oxia-client-0.2.0.jar
* OpenHFT
- net.openhft-zero-allocation-hashing-0.16.jar
* Project reactor
- io.projectreactor-reactor-core-3.5.2.jar
* Java JSON WebTokens
- io.jsonwebtoken-jjwt-api-0.11.1.jar
- io.jsonwebtoken-jjwt-impl-0.11.1.jar
Expand Down Expand Up @@ -552,9 +550,6 @@ BSD 3-clause "New" or "Revised" License
* JSR305 -- com.google.code.findbugs-jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt
* JLine -- jline-jline-2.14.6.jar -- ../licenses/LICENSE-JLine.txt
* JLine3 -- org.jline-jline-3.21.0.jar -- ../licenses/LICENSE-JLine.txt
* Reactive gRPC
- com.salesforce.servicelibs-reactive-grpc-common-1.2.4.jar -- ../licenses/LICENSE-Reactive-gRPC.txt
- com.salesforce.servicelibs-reactor-grpc-stub-1.2.4.jar -- ../licenses/LICENSE-Reactive-gRPC.txt

BSD 2-Clause License
* HdrHistogram -- org.hdrhistogram-HdrHistogram-2.1.9.jar -- ../licenses/LICENSE-HdrHistogram.txt
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ flexible messaging model and an intuitive client API.</description>
<apache-http-client.version>4.5.13</apache-http-client.version>
<apache-httpcomponents.version>4.4.15</apache-httpcomponents.version>
<jetcd.version>0.7.5</jetcd.version>
<oxia.version>0.1.6</oxia.version>
<oxia.version>0.2.0</oxia.version>
<snakeyaml.version>2.0</snakeyaml.version>
<ant.version>1.10.12</ant.version>
<seancfoley.ipaddress.version>5.3.3</seancfoley.ipaddress.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,23 @@
*/
package org.apache.pulsar.metadata.impl.oxia;

import io.streamnative.oxia.client.OxiaClientBuilder;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.DeleteOption;
import io.streamnative.oxia.client.api.KeyAlreadyExistsException;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.api.OxiaClientBuilder;
import io.streamnative.oxia.client.api.PutOption;
import io.streamnative.oxia.client.api.PutResult;
import io.streamnative.oxia.client.api.UnexpectedVersionIdException;
import io.streamnative.oxia.client.api.Version;
import io.streamnative.oxia.client.api.exceptions.KeyAlreadyExistsException;
import io.streamnative.oxia.client.api.exceptions.UnexpectedVersionIdException;
import java.time.Duration;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -69,7 +72,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
this.synchronizer = Optional.ofNullable(metadataStoreConfig.getSynchronizer());
identity = UUID.randomUUID().toString();
client =
new OxiaClientBuilder(serviceAddress)
OxiaClientBuilder.create(serviceAddress)
.clientIdentifier(identity)
.namespace(namespace)
.sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis()))
Expand Down Expand Up @@ -153,14 +156,14 @@ protected CompletableFuture<Void> storeDelete(String path, Optional<Long> expect
return getChildrenFromStore(path)
.thenCompose(
children -> {
if (children.size() > 0) {
if (!children.isEmpty()) {
return CompletableFuture.failedFuture(
new MetadataStoreException("Key '" + path + "' has children"));
} else {
var delOption =
Set<DeleteOption> delOption =
expectedVersion
.map(DeleteOption::ifVersionIdEquals)
.orElse(DeleteOption.Unconditionally);
.map(v -> Collections.singleton(DeleteOption.IfVersionIdEquals(v)))
.orElse(Collections.emptySet());
CompletableFuture<Boolean> result = client.delete(path, delOption);
return result
.thenCompose(
Expand Down Expand Up @@ -205,20 +208,20 @@ protected CompletableFuture<Stat> storePut(
} else {
actualPath = CompletableFuture.completedFuture(path);
}
var versionCondition =
expectedVersion
.map(
ver -> {
if (ver == -1) {
return PutOption.IfRecordDoesNotExist;
}
return PutOption.ifVersionIdEquals(ver);
})
.orElse(PutOption.Unconditionally);
var putOptions =
options.contains(CreateOption.Ephemeral)
? new PutOption[] {PutOption.AsEphemeralRecord, versionCondition}
: new PutOption[] {versionCondition};
Set<PutOption> putOptions = new HashSet<>();
expectedVersion
.map(
ver -> {
if (ver == -1) {
return PutOption.IfRecordDoesNotExist;
}
return PutOption.IfVersionIdEquals(ver);
})
.ifPresent(putOptions::add);

if (options.contains(CreateOption.Ephemeral)) {
putOptions.add(PutOption.AsEphemeralRecord);
}
return actualPath
.thenCompose(
aPath ->
Expand All @@ -242,6 +245,10 @@ private <T> CompletionStage<T> convertException(Throwable ex) {
}
}

private static final byte[] EMPTY_VALUE = new byte[0];
private static final Set<PutOption> IF_RECORD_DOES_NOT_EXIST =
Collections.singleton(PutOption.IfRecordDoesNotExist);

private CompletableFuture<Void> createParents(String path) {
var parent = parent(path);
if (parent == null || parent.isEmpty()) {
Expand All @@ -254,7 +261,7 @@ private CompletableFuture<Void> createParents(String path) {
return CompletableFuture.completedFuture(null);
} else {
return client
.put(parent, new byte[] {}, PutOption.IfRecordDoesNotExist)
.put(parent, EMPTY_VALUE, IF_RECORD_DOES_NOT_EXIST)
.thenCompose(__ -> createParents(parent));
}
})
Expand Down

0 comments on commit 09364a9

Please sign in to comment.