diff --git a/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java b/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java index 72da33fb..49280ba3 100644 --- a/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java +++ b/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java @@ -22,17 +22,20 @@ import com.wso2.openbanking.accelerator.event.notifications.service.dto.NotificationDTO; import com.wso2.openbanking.accelerator.event.notifications.service.exceptions.OBEventNotificationException; import com.wso2.openbanking.accelerator.event.notifications.service.internal.EventNotificationDataHolder; +import com.wso2.openbanking.accelerator.event.notifications.service.model.EventSubscription; import com.wso2.openbanking.accelerator.event.notifications.service.model.Notification; import com.wso2.openbanking.accelerator.event.notifications.service.model.NotificationEvent; import com.wso2.openbanking.accelerator.event.notifications.service.realtime.model.RealtimeEventNotification; import com.wso2.openbanking.accelerator.event.notifications.service.service.EventNotificationGenerator; import com.wso2.openbanking.accelerator.event.notifications.service.service.EventPollingService; import com.wso2.openbanking.accelerator.event.notifications.service.util.EventNotificationServiceUtil; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.identity.oauth2.IdentityOAuth2Exception; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; @@ -52,23 +55,42 @@ public EventNotificationProducerService( @Override public void run() { - String callbackUrl = EventNotificationServiceUtil.getCallbackURL(notificationDTO.getClientId()); - - LinkedBlockingQueue queue = EventNotificationDataHolder.getInstance(). - getRealtimeEventNotificationQueue(); - EventNotificationGenerator eventNotificationGenerator = EventNotificationServiceUtil. - getEventNotificationGenerator(); - RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification(); - realtimeEventNotification.setNotificationDTO(notificationDTO); - realtimeEventNotification.setCallbackUrl(callbackUrl); try { - Notification notification = eventNotificationGenerator.generateEventNotificationBody( - notificationDTO, notificationEvents); - realtimeEventNotification.setEventSET(eventNotificationGenerator.generateEventNotification( - Notification.getJsonNode(notification))); + List subscriptionList = EventNotificationServiceUtil.getEventSubscriptionService() + .getEventSubscriptionsByClientId(notificationDTO.getClientId()); + if (CollectionUtils.isEmpty(subscriptionList)) { + throw new OBEventNotificationException("No subscriptions found for the client ID: " + + notificationDTO.getClientId()); + } + + LinkedBlockingQueue queue = EventNotificationDataHolder.getInstance(). + getRealtimeEventNotificationQueue(); + EventNotificationGenerator eventNotificationGenerator = EventNotificationServiceUtil. + getEventNotificationGenerator(); + + for (EventSubscription subscription : subscriptionList) { + + List allowedEvents = new ArrayList<>(); + notificationEvents.forEach(notificationEvent -> { + if (subscription.getEventTypes().contains(notificationEvent.getEventType())) { + allowedEvents.add(notificationEvent); + } + }); + + if (!allowedEvents.isEmpty()) { + RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification(); + realtimeEventNotification.setNotificationDTO(notificationDTO); + realtimeEventNotification.setCallbackUrl(subscription.getCallbackUrl()); + + Notification notification = eventNotificationGenerator.generateEventNotificationBody( + notificationDTO, allowedEvents); + realtimeEventNotification.setEventSET(eventNotificationGenerator + .generateEventNotification(Notification.getJsonNode(notification))); - queue.put(realtimeEventNotification); // put the notification into the queue + queue.put(realtimeEventNotification); // put the notification into the queue + } + } } catch (InterruptedException e) { log.error("Error when adding the Realtime Notification with notification ID " + notificationDTO.getNotificationId() + " into the RealtimeEventNotification Queue", e); diff --git a/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java b/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java index f31ff3bf..df4ce693 100644 --- a/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java +++ b/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java @@ -24,6 +24,7 @@ import com.wso2.openbanking.accelerator.event.notifications.service.dto.NotificationDTO; import com.wso2.openbanking.accelerator.event.notifications.service.exceptions.OBEventNotificationException; import com.wso2.openbanking.accelerator.event.notifications.service.internal.EventNotificationDataHolder; +import com.wso2.openbanking.accelerator.event.notifications.service.model.EventSubscription; import com.wso2.openbanking.accelerator.event.notifications.service.model.Notification; import com.wso2.openbanking.accelerator.event.notifications.service.model.NotificationEvent; import com.wso2.openbanking.accelerator.event.notifications.service.persistence.EventPollingStoreInitializer; @@ -31,11 +32,13 @@ import com.wso2.openbanking.accelerator.event.notifications.service.service.EventNotificationGenerator; import com.wso2.openbanking.accelerator.event.notifications.service.service.EventPollingService; import com.wso2.openbanking.accelerator.event.notifications.service.util.EventNotificationServiceUtil; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.identity.oauth2.IdentityOAuth2Exception; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; @@ -63,17 +66,32 @@ public void run() { List notificationEvents = aggregatedPollingDAO. getEventsByNotificationID(notificationDTO.getNotificationId()); - Notification responseNotification = eventNotificationGenerator. - generateEventNotificationBody(notificationDTO, notificationEvents); + List subscriptionList = EventNotificationServiceUtil.getEventSubscriptionService() + .getEventSubscriptionsByClientId(notificationDTO.getClientId()); + if (CollectionUtils.isEmpty(subscriptionList)) { + throw new OBEventNotificationException("No subscriptions found for the client ID: " + + notificationDTO.getClientId()); + } - String callbackUrl = EventNotificationServiceUtil.getCallbackURL(notificationDTO.getClientId()); + for (EventSubscription subscription : subscriptionList) { + List allowedEvents = new ArrayList<>(); + notificationEvents.forEach(notificationEvent -> { + if (subscription.getEventTypes().contains(notificationEvent.getEventType())) { + allowedEvents.add(notificationEvent); + } + }); - RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification(); - realtimeEventNotification.setCallbackUrl(callbackUrl); - realtimeEventNotification.setEventSET(eventNotificationGenerator.generateEventNotification( - Notification.getJsonNode(responseNotification))); - realtimeEventNotification.setNotificationDTO(notificationDTO); - queue.put(realtimeEventNotification); // put the notification into the queue + if (!allowedEvents.isEmpty()) { + Notification responseNotification = eventNotificationGenerator. + generateEventNotificationBody(notificationDTO, allowedEvents); + RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification(); + realtimeEventNotification.setCallbackUrl(subscription.getCallbackUrl()); + realtimeEventNotification.setEventSET(eventNotificationGenerator. + generateEventNotification(Notification.getJsonNode(responseNotification))); + realtimeEventNotification.setNotificationDTO(notificationDTO); + queue.put(realtimeEventNotification); // put the notification into the queue + } + } } } catch (InterruptedException e) { log.error("Error when adding the Realtime Notification into the RealtimeEventNotification Queue", e); diff --git a/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java b/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java index 8b2ae6c9..676ce5e1 100644 --- a/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java +++ b/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java @@ -157,7 +157,7 @@ private void postWithRetry() throws OBEventNotificationException { HttpResponse response = httpClient.execute(httpPost); int statusCode = response.getStatusLine().getStatusCode(); - if (statusCode == HttpStatus.SC_OK) { + if (statusCode == HttpStatus.SC_ACCEPTED) { if (log.isDebugEnabled()) { log.debug("Real-time event notification with notificationId: " + notificationId + " sent successfully"); diff --git a/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java b/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java index a83a9cc7..78a3bff9 100644 --- a/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java +++ b/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/main/java/com/wso2/openbanking/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java @@ -29,6 +29,7 @@ import com.wso2.openbanking.accelerator.event.notifications.service.handler.DefaultEventCreationServiceHandler; import com.wso2.openbanking.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationRequestGenerator; import com.wso2.openbanking.accelerator.event.notifications.service.service.EventNotificationGenerator; +import com.wso2.openbanking.accelerator.event.notifications.service.service.EventSubscriptionService; import net.minidev.json.JSONObject; import net.minidev.json.parser.JSONParser; import net.minidev.json.parser.ParseException; @@ -135,17 +136,6 @@ public static synchronized ConsentCoreServiceImpl getConsentCoreServiceImpl() { return consentCoreService; } - /** - * Get the callback URL of the TPP from the Subscription Object. - * - * @param clientID client ID of the TPP - * @return callback URL of the TPP - */ - public static String getCallbackURL(String clientID) { - - return "http://localhost:8080/sample-tpp-server"; - } - /** * Get the default event creation service handler. * @@ -168,4 +158,8 @@ public static EventNotificationErrorDTO getErrorDTO(String error, String errorDe eventNotificationErrorDTO.setErrorDescription(errorDescription); return eventNotificationErrorDTO; } + + public static EventSubscriptionService getEventSubscriptionService() { + return new EventSubscriptionService(); + } } diff --git a/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/test/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/EventNotificationProducerServiceTests.java b/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/test/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/EventNotificationProducerServiceTests.java index 7f58cdf0..19d6fd30 100644 --- a/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/test/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/EventNotificationProducerServiceTests.java +++ b/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/test/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/EventNotificationProducerServiceTests.java @@ -25,6 +25,7 @@ import com.wso2.openbanking.accelerator.event.notifications.service.model.Notification; import com.wso2.openbanking.accelerator.event.notifications.service.realtime.model.RealtimeEventNotification; import com.wso2.openbanking.accelerator.event.notifications.service.service.DefaultEventNotificationGenerator; +import com.wso2.openbanking.accelerator.event.notifications.service.service.EventSubscriptionService; import com.wso2.openbanking.accelerator.event.notifications.service.util.EventNotificationServiceUtil; import com.wso2.openbanking.accelerator.event.notifications.service.utils.EventNotificationTestUtils; import net.minidev.json.parser.ParseException; @@ -51,7 +52,10 @@ public class EventNotificationProducerServiceTests extends PowerMockTestCase { @Test public void testRun() throws OBEventNotificationException, ParseException, InterruptedException { LinkedBlockingQueue eventQueue = new LinkedBlockingQueue<>(); - String callbackUrl = EventNotificationTestConstants.SAMPLE_CALLBACK_URL; + + EventSubscriptionService eventSubscriptionService = Mockito.mock(EventSubscriptionService.class); + Mockito.when(eventSubscriptionService.getEventSubscriptionsByClientId(any())) + .thenReturn(EventNotificationTestUtils.getEventSubscrptionList()); DefaultEventNotificationGenerator mockedEventNotificationGenerator = Mockito.mock(DefaultEventNotificationGenerator.class); @@ -63,7 +67,8 @@ public void testRun() throws OBEventNotificationException, ParseException, Inter mockedEventNotificationGenerator); PowerMockito.when(EventNotificationServiceUtil.getRealtimeEventNotificationRequestGenerator()) .thenReturn(mockedRealtimeEventNotificationRequestGenerator); - PowerMockito.when(EventNotificationServiceUtil.getCallbackURL(Mockito.any())).thenReturn(callbackUrl); + PowerMockito.when(EventNotificationServiceUtil.getEventSubscriptionService()) + .thenReturn(eventSubscriptionService); EventNotificationDataHolder eventNotificationDataHolderMock = Mockito.mock(EventNotificationDataHolder.class); Mockito.when(eventNotificationDataHolderMock.getRealtimeEventNotificationQueue()).thenReturn(eventQueue); @@ -94,6 +99,6 @@ public void testRun() throws OBEventNotificationException, ParseException, Inter Assert.assertEquals(notification.getJsonPayload(), testPayload); Assert.assertEquals(notification.getNotificationId(), EventNotificationTestConstants.SAMPLE_NOTIFICATION_ID); - Assert.assertEquals(notification.getCallbackUrl(), callbackUrl); + Assert.assertEquals(notification.getCallbackUrl(), EventNotificationTestConstants.SAMPLE_CALLBACK_URL); } } diff --git a/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/test/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderServiceTest.java b/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/test/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderServiceTest.java index b0a2595e..c04660a3 100644 --- a/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/test/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderServiceTest.java +++ b/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/test/java/com/wso2/openbanking/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderServiceTest.java @@ -28,7 +28,10 @@ import com.wso2.openbanking.accelerator.event.notifications.service.persistence.EventPollingStoreInitializer; import com.wso2.openbanking.accelerator.event.notifications.service.realtime.model.RealtimeEventNotification; import com.wso2.openbanking.accelerator.event.notifications.service.service.DefaultEventNotificationGenerator; +import com.wso2.openbanking.accelerator.event.notifications.service.service.EventSubscriptionService; import com.wso2.openbanking.accelerator.event.notifications.service.util.EventNotificationServiceUtil; +import com.wso2.openbanking.accelerator.event.notifications.service.utils.EventNotificationTestUtils; +import net.minidev.json.parser.ParseException; import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -53,9 +56,13 @@ DefaultRealtimeEventNotificationRequestGenerator.class}) public class RealtimeEventNotificationLoaderServiceTest extends PowerMockTestCase { @Test - public void testRun() throws OBEventNotificationException, InterruptedException { + public void testRun() throws OBEventNotificationException, InterruptedException, ParseException { LinkedBlockingQueue eventQueue = new LinkedBlockingQueue<>(); + EventSubscriptionService eventSubscriptionService = Mockito.mock(EventSubscriptionService.class); + Mockito.when(eventSubscriptionService.getEventSubscriptionsByClientId(any())) + .thenReturn(EventNotificationTestUtils.getEventSubscrptionList()); + DefaultEventNotificationGenerator mockedEventNotificationGenerator = Mockito.mock(DefaultEventNotificationGenerator.class); DefaultRealtimeEventNotificationRequestGenerator mockedRealtimeEventNotificationRequestGenerator = @@ -66,6 +73,8 @@ public void testRun() throws OBEventNotificationException, InterruptedException mockedEventNotificationGenerator); PowerMockito.when(EventNotificationServiceUtil.getRealtimeEventNotificationRequestGenerator()).thenReturn( mockedRealtimeEventNotificationRequestGenerator); + PowerMockito.when(EventNotificationServiceUtil.getEventSubscriptionService()) + .thenReturn(eventSubscriptionService); EventNotificationDataHolder eventNotificationDataHolderMock = Mockito.mock(EventNotificationDataHolder.class); Mockito.when(eventNotificationDataHolderMock.getRealtimeEventNotificationQueue()).thenReturn(eventQueue); @@ -87,6 +96,8 @@ public void testRun() throws OBEventNotificationException, InterruptedException AggregatedPollingDAOImpl mockAggregatedPollingDAOImpl = Mockito.mock(AggregatedPollingDAOImpl.class); doReturn(notifications).when(mockAggregatedPollingDAOImpl).getNotificationsByStatus( EventNotificationConstants.OPEN); + doReturn(EventNotificationTestUtils.getSampleNotificationsList()) + .when(mockAggregatedPollingDAOImpl).getEventsByNotificationID(any()); PowerMockito.mockStatic(EventPollingStoreInitializer.class); PowerMockito.when(EventPollingStoreInitializer.getAggregatedPollingDAO()) .thenReturn(mockAggregatedPollingDAOImpl); @@ -113,5 +124,4 @@ public void testRun() throws OBEventNotificationException, InterruptedException Assert.assertEquals(notification1.getJsonPayload(), testPayload); Assert.assertEquals(notification2.getJsonPayload(), testPayload); } - } diff --git a/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/test/java/com/wso2/openbanking/accelerator/event/notifications/service/utils/EventNotificationTestUtils.java b/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/test/java/com/wso2/openbanking/accelerator/event/notifications/service/utils/EventNotificationTestUtils.java index 2878f1a8..e7c22816 100644 --- a/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/test/java/com/wso2/openbanking/accelerator/event/notifications/service/utils/EventNotificationTestUtils.java +++ b/open-banking-accelerator/components/event-notifications/com.wso2.openbanking.accelerator.event.notifications.service/src/test/java/com/wso2/openbanking/accelerator/event/notifications/service/utils/EventNotificationTestUtils.java @@ -63,6 +63,23 @@ public static NotificationDTO getSampleNotificationDTO() { return notificationDTO; } + + public static List getEventSubscrptionList() { + EventSubscription eventSubscription = new EventSubscription(); + eventSubscription.setCallbackUrl(EventNotificationTestConstants.SAMPLE_CALLBACK_URL); + eventSubscription.setEventTypes(getSampleEventTypeList()); + List eventSubscriptions = new ArrayList<>(); + eventSubscriptions.add(eventSubscription); + return eventSubscriptions; + } + + public static List getSampleEventTypeList() { + List eventsTypes = new ArrayList<>(); + eventsTypes.add(EventNotificationTestConstants.SAMPLE_NOTIFICATION_EVENT_TYPE_1); + + return eventsTypes; + } + public static List getSampleNotificationsList() throws ParseException { List eventsList = new ArrayList(); NotificationEvent notificationEvent = new NotificationEvent();