Skip to content

Commit

Permalink
[ISSUE apache#4804] Fix SubStreamHandler exception loop by closeOnErr…
Browse files Browse the repository at this point in the history
…or (apache#4807)

* Handle exception loop by closeOnError

* Lombok optimization

* some format optimization

* Avoid closing multiple times

* Remove redundant set null

* Revert "Avoid closing multiple times"

This reverts commit 774397f.

* Use synchronized latch to keep senderOnComplete called once

* Use boolean to prevent latch called by somebody else

* Remove the unique callee/caller close() of onCompleted()
  • Loading branch information
Pil0tXia authored Apr 13, 2024
1 parent a003c03 commit fe2bd7c
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ public EventMeshGrpcConsumer(final EventMeshGrpcClientConfig clientConfig) {
}

public void init() {
this.channel = ManagedChannelBuilder.forAddress(clientConfig.getServerAddr(), clientConfig.getServerPort()).usePlaintext()
.build();
this.channel = ManagedChannelBuilder.forAddress(clientConfig.getServerAddr(), clientConfig.getServerPort()).usePlaintext().build();
this.consumerClient = ConsumerServiceGrpc.newBlockingStub(channel);
this.consumerAsyncClient = ConsumerServiceGrpc.newStub(channel);
this.heartbeatClient = HeartbeatServiceGrpc.newBlockingStub(channel);
Expand Down Expand Up @@ -125,8 +124,8 @@ public void subscribe(final List<SubscriptionItem> subscriptionItems) {

addSubscription(subscriptionItems, SDK_STREAM_URL, GrpcType.STREAM);

CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null,
subscriptionItems);
CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(
clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, subscriptionItems);
synchronized (this) {
if (subStreamHandler == null) {
subStreamHandler = new SubStreamHandler<>(consumerAsyncClient, clientConfig, listener);
Expand All @@ -137,8 +136,8 @@ public void subscribe(final List<SubscriptionItem> subscriptionItems) {
}

private Response subscribeWebhook(List<SubscriptionItem> subscriptionItems, String url) {
final CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE,
url, subscriptionItems);
final CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(
clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, subscriptionItems);
try {
CloudEvent response = consumerClient.subscribe(subscription);
log.info("Received response:{}", response);
Expand Down Expand Up @@ -169,8 +168,8 @@ public Response unsubscribe(final List<SubscriptionItem> subscriptionItems, fina

removeSubscription(subscriptionItems);

final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url,
subscriptionItems);
final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(
clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, subscriptionItems);
try {
final CloudEvent response = consumerClient.unsubscribe(cloudEvent);
log.info("Received response:{}", response);
Expand All @@ -191,8 +190,8 @@ public Response unsubscribe(final List<SubscriptionItem> subscriptionItems) {

removeSubscription(subscriptionItems);

final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null,
subscriptionItems);
final CloudEvent cloudEvent = EventMeshCloudEventBuilder.buildEventSubscription(
clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, null, subscriptionItems);

try {
final CloudEvent response = consumerClient.unsubscribe(cloudEvent);
Expand Down Expand Up @@ -277,14 +276,12 @@ private void resubscribe() {

subscriptionGroup.forEach((url, items) -> {
if (isStreamSub.get()) {
CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE,
url,
items);
CloudEvent subscription = EventMeshCloudEventBuilder.buildEventSubscription(
clientConfig, EventMeshProtocolType.EVENT_MESH_MESSAGE, url, items);
subStreamHandler.sendSubscription(subscription);
} else {
subscribeWebhook(items, url);
}

});
}

Expand All @@ -303,6 +300,7 @@ public void close() {
}
}

@Data
private static class SubscriptionInfo {

private transient SubscriptionItem subscriptionItem;
Expand All @@ -314,25 +312,5 @@ private static class SubscriptionInfo {
this.url = url;
this.grpcType = grpcType;
}

public GrpcType getGrpcType() {
return grpcType;
}

public SubscriptionItem getSubscriptionItem() {
return subscriptionItem;
}

public void setSubscriptionItem(final SubscriptionItem subscriptionItem) {
this.subscriptionItem = subscriptionItem;
}

public String getUrl() {
return url;
}

public void setUrl(final String url) {
this.url = url;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ public void onNext(final CloudEvent message) {
@Override
public void onError(final Throwable t) {
log.error("Received Server side error", t);
close();
}

@Override
public void onCompleted() {
log.info("Finished receiving messages from server.");
close();
}
};
}
Expand Down Expand Up @@ -134,7 +134,6 @@ public void close() {
}

latch.countDown();

log.info("SubStreamHandler closed.");
}

Expand All @@ -145,6 +144,7 @@ private void senderOnNext(final CloudEvent subscription) {
}
} catch (Exception e) {
log.error("StreamObserver Error onNext", e);
close();
}
}

Expand Down

0 comments on commit fe2bd7c

Please sign in to comment.