Skip to content

Commit

Permalink
Fix issues in realtime notification
Browse files Browse the repository at this point in the history
  • Loading branch information
Ashi1993 committed Nov 5, 2024
1 parent 2a0ac2c commit 99086fa
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@
import com.wso2.openbanking.accelerator.event.notifications.service.dto.NotificationDTO;
import com.wso2.openbanking.accelerator.event.notifications.service.exceptions.OBEventNotificationException;
import com.wso2.openbanking.accelerator.event.notifications.service.internal.EventNotificationDataHolder;
import com.wso2.openbanking.accelerator.event.notifications.service.model.EventSubscription;
import com.wso2.openbanking.accelerator.event.notifications.service.model.Notification;
import com.wso2.openbanking.accelerator.event.notifications.service.model.NotificationEvent;
import com.wso2.openbanking.accelerator.event.notifications.service.realtime.model.RealtimeEventNotification;
import com.wso2.openbanking.accelerator.event.notifications.service.service.EventNotificationGenerator;
import com.wso2.openbanking.accelerator.event.notifications.service.service.EventPollingService;
import com.wso2.openbanking.accelerator.event.notifications.service.util.EventNotificationServiceUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.identity.oauth2.IdentityOAuth2Exception;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

Expand All @@ -52,23 +55,42 @@ public EventNotificationProducerService(

@Override
public void run() {
String callbackUrl = EventNotificationServiceUtil.getCallbackURL(notificationDTO.getClientId());

LinkedBlockingQueue<RealtimeEventNotification> queue = EventNotificationDataHolder.getInstance().
getRealtimeEventNotificationQueue();
EventNotificationGenerator eventNotificationGenerator = EventNotificationServiceUtil.
getEventNotificationGenerator();
RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification();
realtimeEventNotification.setNotificationDTO(notificationDTO);
realtimeEventNotification.setCallbackUrl(callbackUrl);

try {
Notification notification = eventNotificationGenerator.generateEventNotificationBody(
notificationDTO, notificationEvents);
realtimeEventNotification.setEventSET(eventNotificationGenerator.generateEventNotification(
Notification.getJsonNode(notification)));
List<EventSubscription> subscriptionList = EventNotificationServiceUtil.getEventSubscriptionService()
.getEventSubscriptionsByClientId(notificationDTO.getClientId());
if (CollectionUtils.isEmpty(subscriptionList)) {
throw new OBEventNotificationException("No subscriptions found for the client ID: " +
notificationDTO.getClientId());
}

LinkedBlockingQueue<RealtimeEventNotification> queue = EventNotificationDataHolder.getInstance().
getRealtimeEventNotificationQueue();
EventNotificationGenerator eventNotificationGenerator = EventNotificationServiceUtil.
getEventNotificationGenerator();

for (EventSubscription subscription : subscriptionList) {

List<NotificationEvent> allowedEvents = new ArrayList<>();
notificationEvents.forEach(notificationEvent -> {
if (subscription.getEventTypes().contains(notificationEvent.getEventType())) {
allowedEvents.add(notificationEvent);
}
});

if (!allowedEvents.isEmpty()) {
RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification();
realtimeEventNotification.setNotificationDTO(notificationDTO);
realtimeEventNotification.setCallbackUrl(subscription.getCallbackUrl());

Notification notification = eventNotificationGenerator.generateEventNotificationBody(
notificationDTO, allowedEvents);
realtimeEventNotification.setEventSET(eventNotificationGenerator
.generateEventNotification(Notification.getJsonNode(notification)));

queue.put(realtimeEventNotification); // put the notification into the queue
queue.put(realtimeEventNotification); // put the notification into the queue
}
}
} catch (InterruptedException e) {
log.error("Error when adding the Realtime Notification with notification ID " +
notificationDTO.getNotificationId() + " into the RealtimeEventNotification Queue", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,21 @@
import com.wso2.openbanking.accelerator.event.notifications.service.dto.NotificationDTO;
import com.wso2.openbanking.accelerator.event.notifications.service.exceptions.OBEventNotificationException;
import com.wso2.openbanking.accelerator.event.notifications.service.internal.EventNotificationDataHolder;
import com.wso2.openbanking.accelerator.event.notifications.service.model.EventSubscription;
import com.wso2.openbanking.accelerator.event.notifications.service.model.Notification;
import com.wso2.openbanking.accelerator.event.notifications.service.model.NotificationEvent;
import com.wso2.openbanking.accelerator.event.notifications.service.persistence.EventPollingStoreInitializer;
import com.wso2.openbanking.accelerator.event.notifications.service.realtime.model.RealtimeEventNotification;
import com.wso2.openbanking.accelerator.event.notifications.service.service.EventNotificationGenerator;
import com.wso2.openbanking.accelerator.event.notifications.service.service.EventPollingService;
import com.wso2.openbanking.accelerator.event.notifications.service.util.EventNotificationServiceUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.identity.oauth2.IdentityOAuth2Exception;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

Expand Down Expand Up @@ -63,17 +66,32 @@ public void run() {
List<NotificationEvent> notificationEvents = aggregatedPollingDAO.
getEventsByNotificationID(notificationDTO.getNotificationId());

Notification responseNotification = eventNotificationGenerator.
generateEventNotificationBody(notificationDTO, notificationEvents);
List<EventSubscription> subscriptionList = EventNotificationServiceUtil.getEventSubscriptionService()
.getEventSubscriptionsByClientId(notificationDTO.getClientId());
if (CollectionUtils.isEmpty(subscriptionList)) {
throw new OBEventNotificationException("No subscriptions found for the client ID: " +
notificationDTO.getClientId());
}

String callbackUrl = EventNotificationServiceUtil.getCallbackURL(notificationDTO.getClientId());
for (EventSubscription subscription : subscriptionList) {
List<NotificationEvent> allowedEvents = new ArrayList<>();
notificationEvents.forEach(notificationEvent -> {
if (subscription.getEventTypes().contains(notificationEvent.getEventType())) {
allowedEvents.add(notificationEvent);
}
});

RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification();
realtimeEventNotification.setCallbackUrl(callbackUrl);
realtimeEventNotification.setEventSET(eventNotificationGenerator.generateEventNotification(
Notification.getJsonNode(responseNotification)));
realtimeEventNotification.setNotificationDTO(notificationDTO);
queue.put(realtimeEventNotification); // put the notification into the queue
if (!allowedEvents.isEmpty()) {
Notification responseNotification = eventNotificationGenerator.
generateEventNotificationBody(notificationDTO, allowedEvents);
RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification();
realtimeEventNotification.setCallbackUrl(subscription.getCallbackUrl());
realtimeEventNotification.setEventSET(eventNotificationGenerator.
generateEventNotification(Notification.getJsonNode(responseNotification)));
realtimeEventNotification.setNotificationDTO(notificationDTO);
queue.put(realtimeEventNotification); // put the notification into the queue
}
}
}
} catch (InterruptedException e) {
log.error("Error when adding the Realtime Notification into the RealtimeEventNotification Queue", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private void postWithRetry() throws OBEventNotificationException {

HttpResponse response = httpClient.execute(httpPost);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK) {
if (statusCode == HttpStatus.SC_ACCEPTED) {
if (log.isDebugEnabled()) {
log.debug("Real-time event notification with notificationId: " + notificationId
+ " sent successfully");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.wso2.openbanking.accelerator.event.notifications.service.handler.DefaultEventCreationServiceHandler;
import com.wso2.openbanking.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationRequestGenerator;
import com.wso2.openbanking.accelerator.event.notifications.service.service.EventNotificationGenerator;
import com.wso2.openbanking.accelerator.event.notifications.service.service.EventSubscriptionService;
import net.minidev.json.JSONObject;
import net.minidev.json.parser.JSONParser;
import net.minidev.json.parser.ParseException;
Expand Down Expand Up @@ -135,17 +136,6 @@ public static synchronized ConsentCoreServiceImpl getConsentCoreServiceImpl() {
return consentCoreService;
}

/**
* Get the callback URL of the TPP from the Subscription Object.
*
* @param clientID client ID of the TPP
* @return callback URL of the TPP
*/
public static String getCallbackURL(String clientID) {

return "http://localhost:8080/sample-tpp-server";
}

/**
* Get the default event creation service handler.
*
Expand All @@ -168,4 +158,8 @@ public static EventNotificationErrorDTO getErrorDTO(String error, String errorDe
eventNotificationErrorDTO.setErrorDescription(errorDescription);
return eventNotificationErrorDTO;
}

public static EventSubscriptionService getEventSubscriptionService() {
return new EventSubscriptionService();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.wso2.openbanking.accelerator.event.notifications.service.model.Notification;
import com.wso2.openbanking.accelerator.event.notifications.service.realtime.model.RealtimeEventNotification;
import com.wso2.openbanking.accelerator.event.notifications.service.service.DefaultEventNotificationGenerator;
import com.wso2.openbanking.accelerator.event.notifications.service.service.EventSubscriptionService;
import com.wso2.openbanking.accelerator.event.notifications.service.util.EventNotificationServiceUtil;
import com.wso2.openbanking.accelerator.event.notifications.service.utils.EventNotificationTestUtils;
import net.minidev.json.parser.ParseException;
Expand All @@ -51,7 +52,10 @@ public class EventNotificationProducerServiceTests extends PowerMockTestCase {
@Test
public void testRun() throws OBEventNotificationException, ParseException, InterruptedException {
LinkedBlockingQueue<RealtimeEventNotification> eventQueue = new LinkedBlockingQueue<>();
String callbackUrl = EventNotificationTestConstants.SAMPLE_CALLBACK_URL;

EventSubscriptionService eventSubscriptionService = Mockito.mock(EventSubscriptionService.class);
Mockito.when(eventSubscriptionService.getEventSubscriptionsByClientId(any()))
.thenReturn(EventNotificationTestUtils.getEventSubscrptionList());

DefaultEventNotificationGenerator mockedEventNotificationGenerator =
Mockito.mock(DefaultEventNotificationGenerator.class);
Expand All @@ -63,7 +67,8 @@ public void testRun() throws OBEventNotificationException, ParseException, Inter
mockedEventNotificationGenerator);
PowerMockito.when(EventNotificationServiceUtil.getRealtimeEventNotificationRequestGenerator())
.thenReturn(mockedRealtimeEventNotificationRequestGenerator);
PowerMockito.when(EventNotificationServiceUtil.getCallbackURL(Mockito.any())).thenReturn(callbackUrl);
PowerMockito.when(EventNotificationServiceUtil.getEventSubscriptionService())
.thenReturn(eventSubscriptionService);

EventNotificationDataHolder eventNotificationDataHolderMock = Mockito.mock(EventNotificationDataHolder.class);
Mockito.when(eventNotificationDataHolderMock.getRealtimeEventNotificationQueue()).thenReturn(eventQueue);
Expand Down Expand Up @@ -94,6 +99,6 @@ public void testRun() throws OBEventNotificationException, ParseException, Inter

Assert.assertEquals(notification.getJsonPayload(), testPayload);
Assert.assertEquals(notification.getNotificationId(), EventNotificationTestConstants.SAMPLE_NOTIFICATION_ID);
Assert.assertEquals(notification.getCallbackUrl(), callbackUrl);
Assert.assertEquals(notification.getCallbackUrl(), EventNotificationTestConstants.SAMPLE_CALLBACK_URL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import com.wso2.openbanking.accelerator.event.notifications.service.persistence.EventPollingStoreInitializer;
import com.wso2.openbanking.accelerator.event.notifications.service.realtime.model.RealtimeEventNotification;
import com.wso2.openbanking.accelerator.event.notifications.service.service.DefaultEventNotificationGenerator;
import com.wso2.openbanking.accelerator.event.notifications.service.service.EventSubscriptionService;
import com.wso2.openbanking.accelerator.event.notifications.service.util.EventNotificationServiceUtil;
import com.wso2.openbanking.accelerator.event.notifications.service.utils.EventNotificationTestUtils;
import net.minidev.json.parser.ParseException;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
Expand All @@ -53,9 +56,13 @@
DefaultRealtimeEventNotificationRequestGenerator.class})
public class RealtimeEventNotificationLoaderServiceTest extends PowerMockTestCase {
@Test
public void testRun() throws OBEventNotificationException, InterruptedException {
public void testRun() throws OBEventNotificationException, InterruptedException, ParseException {
LinkedBlockingQueue<RealtimeEventNotification> eventQueue = new LinkedBlockingQueue<>();

EventSubscriptionService eventSubscriptionService = Mockito.mock(EventSubscriptionService.class);
Mockito.when(eventSubscriptionService.getEventSubscriptionsByClientId(any()))
.thenReturn(EventNotificationTestUtils.getEventSubscrptionList());

DefaultEventNotificationGenerator mockedEventNotificationGenerator =
Mockito.mock(DefaultEventNotificationGenerator.class);
DefaultRealtimeEventNotificationRequestGenerator mockedRealtimeEventNotificationRequestGenerator =
Expand All @@ -66,6 +73,8 @@ public void testRun() throws OBEventNotificationException, InterruptedException
mockedEventNotificationGenerator);
PowerMockito.when(EventNotificationServiceUtil.getRealtimeEventNotificationRequestGenerator()).thenReturn(
mockedRealtimeEventNotificationRequestGenerator);
PowerMockito.when(EventNotificationServiceUtil.getEventSubscriptionService())
.thenReturn(eventSubscriptionService);

EventNotificationDataHolder eventNotificationDataHolderMock = Mockito.mock(EventNotificationDataHolder.class);
Mockito.when(eventNotificationDataHolderMock.getRealtimeEventNotificationQueue()).thenReturn(eventQueue);
Expand All @@ -87,6 +96,8 @@ public void testRun() throws OBEventNotificationException, InterruptedException
AggregatedPollingDAOImpl mockAggregatedPollingDAOImpl = Mockito.mock(AggregatedPollingDAOImpl.class);
doReturn(notifications).when(mockAggregatedPollingDAOImpl).getNotificationsByStatus(
EventNotificationConstants.OPEN);
doReturn(EventNotificationTestUtils.getSampleNotificationsList())
.when(mockAggregatedPollingDAOImpl).getEventsByNotificationID(any());
PowerMockito.mockStatic(EventPollingStoreInitializer.class);
PowerMockito.when(EventPollingStoreInitializer.getAggregatedPollingDAO())
.thenReturn(mockAggregatedPollingDAOImpl);
Expand All @@ -113,5 +124,4 @@ public void testRun() throws OBEventNotificationException, InterruptedException
Assert.assertEquals(notification1.getJsonPayload(), testPayload);
Assert.assertEquals(notification2.getJsonPayload(), testPayload);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@ public static NotificationDTO getSampleNotificationDTO() {

return notificationDTO;
}

public static List<EventSubscription> getEventSubscrptionList() {
EventSubscription eventSubscription = new EventSubscription();
eventSubscription.setCallbackUrl(EventNotificationTestConstants.SAMPLE_CALLBACK_URL);
eventSubscription.setEventTypes(getSampleEventTypeList());
List<EventSubscription> eventSubscriptions = new ArrayList<>();
eventSubscriptions.add(eventSubscription);
return eventSubscriptions;
}

public static List<String> getSampleEventTypeList() {
List<String> eventsTypes = new ArrayList<>();
eventsTypes.add(EventNotificationTestConstants.SAMPLE_NOTIFICATION_EVENT_TYPE_1);

return eventsTypes;
}

public static List<NotificationEvent> getSampleNotificationsList() throws ParseException {
List<NotificationEvent> eventsList = new ArrayList<NotificationEvent>();
NotificationEvent notificationEvent = new NotificationEvent();
Expand Down

0 comments on commit 99086fa

Please sign in to comment.