From 71aece516b173526db5445d3ba1f6cdfa956d6eb Mon Sep 17 00:00:00 2001 From: ashirwadadayarathne Date: Thu, 12 Dec 2024 09:07:46 +0530 Subject: [PATCH] Adding realtime notification implementation --- .../accelerators/fs-is/pom.xml | 8 + .../common/util/HTTPClientUtils.java | 12 +- .../accelerator/common/util/JWTUtils.java | 10 +- .../pom.xml | 3 +- .../service/EventCreationService.java | 12 +- .../service/EventPollingService.java | 2 +- .../service/EventSubscriptionService.java | 2 +- .../service/RealtimeNotificationService.java | 144 +++++++++++++ .../constants/EventNotificationConstants.java | 38 ++++ .../internal/EventNotificationComponent.java | 30 ++- .../internal/EventNotificationDataHolder.java | 27 +-- .../model/RealtimeEventNotification.java | 60 ++++++ ...timeEventNotificationRequestGenerator.java | 43 ++++ .../EventNotificationProducerService.java | 105 +++++++++ ...ealtimeEventNotificationLoaderService.java | 95 +++++++++ ...timeEventNotificationRequestGenerator.java | 46 ++++ ...ealtimeEventNotificationSenderService.java | 201 ++++++++++++++++++ ...EventNotificationConsumerJobActivator.java | 91 ++++++++ .../job/EventNotificationConsumerJob.java | 90 ++++++++ ...EventNotificationConsumerJobScheduler.java | 78 +++++++ .../util/EventNotificationServiceUtil.java | 29 ++- 21 files changed, 1070 insertions(+), 56 deletions(-) create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/model/RealtimeEventNotification.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/DefaultRealtimeEventNotificationRequestGenerator.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationRequestGenerator.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/activator/PeriodicalEventNotificationConsumerJobActivator.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/job/EventNotificationConsumerJob.java create mode 100644 financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/scheduler/PeriodicalEventNotificationConsumerJobScheduler.java diff --git a/financial-services-accelerator/accelerators/fs-is/pom.xml b/financial-services-accelerator/accelerators/fs-is/pom.xml index e53ceba6..c33a60a5 100644 --- a/financial-services-accelerator/accelerators/fs-is/pom.xml +++ b/financial-services-accelerator/accelerators/fs-is/pom.xml @@ -48,6 +48,7 @@ **/commons-beanutils-1.9.4.jar **/hibernate-validator-6.0.20.Final.jar **/validation-api-2.0.1.Final.jar + **/quartz-2.3.2.jar @@ -117,6 +118,13 @@ regex="org.wso2.financial.services.accelerator.consent.mgt.dao-(\d.*?)\.jar$"/> + + + + + diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/HTTPClientUtils.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/HTTPClientUtils.java index 633ea10c..59cb2767 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/HTTPClientUtils.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/HTTPClientUtils.java @@ -66,7 +66,7 @@ public class HTTPClientUtils { * @return Closeable https client * @throws FinancialServicesException FinancialServicesException exception */ - @Generated(message = "Ignoring because ServerConfiguration cannot be mocked") + @Generated(message = "Ignoring since method contains no logics") public static CloseableHttpClient getHttpsClient() throws FinancialServicesException { SSLConnectionSocketFactory sslsf = createSSLConnectionSocketFactory(); @@ -89,13 +89,13 @@ public static CloseableHttpClient getHttpsClient() throws FinancialServicesExcep } /** - * Get closeable https client to send realtime event notifications. + * Get closeable https client with given max Total and max per route values. * * @return Closeable https client * @throws FinancialServicesException FinancialServicesException exception */ @Generated(message = "Ignoring since method contains no logics") - public static CloseableHttpClient getRealtimeEventNotificationHttpsClient() throws FinancialServicesException { + public static CloseableHttpClient getHttpsClient(int maxTotal, int maxPerRoute) throws FinancialServicesException { SSLConnectionSocketFactory sslsf = createSSLConnectionSocketFactory(); @@ -109,10 +109,8 @@ public static CloseableHttpClient getRealtimeEventNotificationHttpsClient() thro new PoolingHttpClientConnectionManager(); // configuring default maximum connections - connectionManager.setMaxTotal(FinancialServicesConfigParser.getInstance() - .getRealtimeEventNotificationMaxRetries() + 1); - connectionManager.setDefaultMaxPerRoute(FinancialServicesConfigParser.getInstance() - .getRealtimeEventNotificationMaxRetries() + 1); + connectionManager.setMaxTotal(maxTotal); + connectionManager.setDefaultMaxPerRoute(maxPerRoute); return HttpClients.custom().setConnectionManager(connectionManager).build(); } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/JWTUtils.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/JWTUtils.java index 234fc03f..1b6bace0 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/JWTUtils.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.common/src/main/java/org/wso2/financial/services/accelerator/common/util/JWTUtils.java @@ -373,8 +373,14 @@ public static KeyStore getTrustStore() throws ConsentManagementException { */ public static String signJWTWithDefaultKey(String body) throws Exception { KeyStoreManager keyStoreManager = KeyStoreManager.getInstance(-1234); - Key privateKey = keyStoreManager.getDefaultPrivateKey(); - return generateJWT(body, privateKey); + KeyStore primaryKeyStore = keyStoreManager.getPrimaryKeyStore(); + if (primaryKeyStore != null) { + Key privateKey = keyStoreManager.getDefaultPrivateKey(); + return generateJWT(body, privateKey); + } else { + throw new FinancialServicesRuntimeException("Error while retrieving the Primary Keystore"); + } + } /** diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/pom.xml b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/pom.xml index 4c3f4f99..edc8d0a5 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/pom.xml +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/pom.xml @@ -204,8 +204,9 @@ org.wso2.carbon.identity.oauth2.*;version="${identity.inbound.auth.oauth.version.range}", org.wso2.financial.services.accelerator.common.*;version="${project.version}", org.wso2.financial.services.accelerator.consent.mgt.dao.*;version="${project.version}", - org.wso2.financial.services.accelerator.consent.mgt.service.*;version="${project.version}", + org.wso2.financial.services.accelerator.consent.mgt.service.*;version="${project.version}" + * !org.wso2.financial.services.accelerator.event.notifications.service.internal, org.wso2.financial.services.accelerator.event.notifications.service.*;version="${project.version}", diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventCreationService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventCreationService.java index acefc46a..cd7e4d87 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventCreationService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventCreationService.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.json.JSONObject; +import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser; import org.wso2.financial.services.accelerator.common.util.DatabaseUtils; import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants; import org.wso2.financial.services.accelerator.event.notifications.service.dao.EventNotificationDAO; @@ -29,6 +30,7 @@ import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification; import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationEvent; import org.wso2.financial.services.accelerator.event.notifications.service.persistence.EventNotificationStoreInitializer; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.EventNotificationProducerService; import java.sql.Connection; import java.util.ArrayList; @@ -36,7 +38,7 @@ import java.util.UUID; /** - * This is the event creation service class. + * Event creation service class. */ public class EventCreationService { @@ -64,11 +66,9 @@ public String publishEventNotification(NotificationCreationDTO notificationCreat eventResponse = eventCreationDAO.persistEventNotification(connection, notification, eventsList); DatabaseUtils.commitTransaction(connection); - //TODO: - // Check whether the real time event notification is enabled. -// if (FinancialServicesConfigParser.getInstance().isRealtimeEventNotificationEnabled()) { -// new Thread(new EventNotificationProducerService(notification, eventsList)).start(); -// } + if (FinancialServicesConfigParser.getInstance().isRealtimeEventNotificationEnabled()) { + new Thread(new EventNotificationProducerService(notification, eventsList)).start(); + } return eventResponse; } catch (FSEventNotificationException e) { DatabaseUtils.rollbackTransaction(connection); diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventPollingService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventPollingService.java index e2513844..bcda12d5 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventPollingService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventPollingService.java @@ -42,7 +42,7 @@ import java.util.Map; /** - * This is the event polling service. + * Event polling service. */ public class EventPollingService { diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventSubscriptionService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventSubscriptionService.java index 0c7db316..36266b34 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventSubscriptionService.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/EventSubscriptionService.java @@ -33,7 +33,7 @@ import java.util.List; /** - * This is the event subscription service class. + * Event subscription service class. */ public class EventSubscriptionService { private static final Log log = LogFactory.getLog(EventSubscriptionService.class); diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java new file mode 100644 index 00000000..162b172d --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/RealtimeNotificationService.java @@ -0,0 +1,144 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.json.JSONException; +import org.wso2.financial.services.accelerator.common.util.DatabaseUtils; +import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants; +import org.wso2.financial.services.accelerator.event.notifications.service.dao.EventNotificationDAO; +import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException; +import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification; +import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationEvent; +import org.wso2.financial.services.accelerator.event.notifications.service.persistence.EventNotificationStoreInitializer; + +import java.sql.Connection; +import java.util.List; + +/** + * Realtime Notification service class. + */ +public class RealtimeNotificationService { + + private static final Log log = LogFactory.getLog(RealtimeNotificationService.class); + + /** + * Method to retrieve notification by status. + * + * @param status Notification status to retrieve + * @return List of notifications by status + * @throws FSEventNotificationException Exception when retrieving notifications by status + */ + public List getNotificationsByStatus(String status) + throws FSEventNotificationException { + + EventNotificationDAO eventNotificationDAO = EventNotificationStoreInitializer.getEventNotificationDAO(); + + Connection connection = DatabaseUtils.getDBConnection(); + + try { + List events = eventNotificationDAO. + getNotificationsByStatus(connection, status); + if (log.isDebugEnabled()) { + log.debug(String.format("Event Notifications with %s status retrieved successfully.", + status.replaceAll("[\r\n]", ""))); + } + DatabaseUtils.commitTransaction(connection); + return events; + } catch (FSEventNotificationException e) { + log.error("Error while retrieving event notification.", e); + DatabaseUtils.rollbackTransaction(connection); + throw new FSEventNotificationException(EventNotificationConstants.ERROR_STORING_EVENT_SUBSCRIPTION, e); + } finally { + log.debug(EventNotificationConstants.DATABASE_CONNECTION_CLOSE_LOG_MSG); + DatabaseUtils.closeConnection(connection); + } + } + + /** + * Method to retrieve notifications by NotificationID. + * + * @param notificationId Notification ID to retrieve + * @return List of notifications by notification ID + * @throws FSEventNotificationException Exception when retrieving notifications by notification ID + */ + public List getEventsByNotificationID(String notificationId) + throws FSEventNotificationException { + + EventNotificationDAO eventNotificationDAO = EventNotificationStoreInitializer.getEventNotificationDAO(); + + Connection connection = DatabaseUtils.getDBConnection(); + + try { + //store event subscription data in the database + List notificationEvents = eventNotificationDAO. + getEventsByNotificationID(connection, notificationId); + if (log.isDebugEnabled()) { + log.debug(String.format("Event Notifications with notification id %s retrieved successfully.", + notificationId.replaceAll("[\r\n]", ""))); + } + DatabaseUtils.commitTransaction(connection); + return notificationEvents; + } catch (FSEventNotificationException e) { + log.error("Error while retrieving event notification.", e); + DatabaseUtils.rollbackTransaction(connection); + throw new FSEventNotificationException(EventNotificationConstants.ERROR_STORING_EVENT_SUBSCRIPTION, e); + } finally { + log.debug(EventNotificationConstants.DATABASE_CONNECTION_CLOSE_LOG_MSG); + DatabaseUtils.closeConnection(connection); + } + } + + /** + * Method to update the notification status by ID, allowed values are. + * OPEN,ACK and ERR + * + * @param notificationId Notification ID to update + * @param notificationStatus Notification status to update + * @throws FSEventNotificationException Exception when updating notification status by ID + */ + public void updateNotificationStatusById(String notificationId, + EventNotificationConstants.EventNotificationStatusEnum notificationStatus) + throws FSEventNotificationException { + + Connection connection = DatabaseUtils.getDBConnection(); + + EventNotificationDAO eventNotificationDAO = EventNotificationStoreInitializer.getEventNotificationDAO(); + + try { + //update the stored event notification + eventNotificationDAO.updateNotificationStatusById(connection, notificationId, + notificationStatus.toString()); + + log.debug("Event Notification updated successfully."); + DatabaseUtils.commitTransaction(connection); + } catch (JSONException e) { + log.error("Error while Parsing the stored request Object", e); + throw new FSEventNotificationException("Error while Parsing the stored request Object", e); + } catch (FSEventNotificationException e) { + log.error("Error while updating event notification.", e); + DatabaseUtils.rollbackTransaction(connection); + throw new FSEventNotificationException(e.getMessage(), e); + } finally { + log.debug(EventNotificationConstants.DATABASE_CONNECTION_CLOSE_LOG_MSG); + DatabaseUtils.closeConnection(connection); + } + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/constants/EventNotificationConstants.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/constants/EventNotificationConstants.java index 78943682..7d4caf2e 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/constants/EventNotificationConstants.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/constants/EventNotificationConstants.java @@ -18,6 +18,10 @@ package org.wso2.financial.services.accelerator.event.notifications.service.constants; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + /** * Event Notification Constants. */ @@ -31,6 +35,40 @@ public class EventNotificationConstants { public static final String ERROR = "ERR"; public static final String OPEN = "OPEN"; + /** + * Specifies the Schema Names of Debtor Account. + */ + public enum EventNotificationStatusEnum { + + ACK("ACK"), + + ERROR("ERR"), + + OPEN("OPEN"); + + private final String value; + + EventNotificationStatusEnum(String value) { + this.value = value; + } + + @Override + public String toString() { + return String.valueOf(value); + } + + public static EventNotificationStatusEnum fromValue(String text) { + + List valueList = Arrays.asList(EventNotificationStatusEnum.values()); + Optional accountOpt = valueList + .stream() + .filter(i -> String.valueOf(i.value).equals(text)) + .findAny(); + + return accountOpt.orElse(null); + } + } + //Response Status public static final String NOT_FOUND = "NOTFOUND"; public static final String OK = "OK"; diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java index d6124767..53b6ff8a 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationComponent.java @@ -6,7 +6,7 @@ * in compliance with the License. * You may obtain a copy of the License at *

- * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -29,6 +29,8 @@ import org.wso2.carbon.identity.oauth2.OAuth2Service; import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser; import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigurationService; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationLoaderService; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.activator.PeriodicalEventNotificationConsumerJobActivator; /** * The Component class for activating event notification osgi service. @@ -37,10 +39,11 @@ name = "org.wso2.financial.services.accelerator.event.notifications.service.internal.EventNotificationComponent", immediate = true) public class EventNotificationComponent { - private static Log log = LogFactory.getLog(EventNotificationComponent.class); + + private static final Log log = LogFactory.getLog(EventNotificationComponent.class); @Activate - protected void activate(ComponentContext context) { + protected void activate(ComponentContext context) throws Exception { log.debug("Event Notification Service Component Activated"); // Check if realtime event notification enabled @@ -50,16 +53,16 @@ protected void activate(ComponentContext context) { * Initialize the quartz job for consuming the realtime event notifications * Initialize the thread for producing the open state realtime event notifications */ - //TODO: -// new Thread(new RealtimeEventNotificationLoaderService()).start(); -// new PeriodicalEventNotificationConsumerJobActivator().activate(); + log.debug("Realtime Event Notification Service Activated"); + new Thread(new RealtimeEventNotificationLoaderService()).start(); + new PeriodicalEventNotificationConsumerJobActivator().activate(); } } /** * Setters for the descendent OSGI services of the EventNotificationComponent. * This is added to run the EventNotification OSGI component after the Common module - * @param configService OpenBankingConfigurationService + * @param configurationService FinancialServicesConfigurationService */ @Reference( service = FinancialServicesConfigurationService.class, @@ -67,12 +70,12 @@ protected void activate(ComponentContext context) { policy = ReferencePolicy.DYNAMIC, unbind = "unsetConfigService" ) - public void setConfigService(FinancialServicesConfigurationService configService) { - EventNotificationDataHolder.getInstance().setFinancialServicesConfigurationService(configService); + public void setConfigService(FinancialServicesConfigurationService configurationService) { + EventNotificationDataHolder.getInstance().setConfigService(configurationService); } - public void unsetConfigService(FinancialServicesConfigurationService configService) { - EventNotificationDataHolder.getInstance().setFinancialServicesConfigurationService(null); + public void unsetConfigService(FinancialServicesConfigurationService configurationService) { + EventNotificationDataHolder.getInstance().setConfigService(null); } /** @@ -85,11 +88,6 @@ public void unsetConfigService(FinancialServicesConfigurationService configServi policy = ReferencePolicy.DYNAMIC, unbind = "unsetOAuth2Service" ) - - /** - * Setters for the descendent OSGI services of the EventNotificationComponent. - * @param oAuth2Service OAuth2Service - */ public void setOAuth2Service(OAuth2Service oAuth2Service) { } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java index d16bb2f7..63d08354 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/internal/EventNotificationDataHolder.java @@ -21,6 +21,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigurationService; +import org.wso2.financial.services.accelerator.event.notifications.service.model.RealtimeEventNotification; + +import java.util.concurrent.LinkedBlockingQueue; /** * Data holder for Open Banking Event Notifications. @@ -28,12 +31,11 @@ public class EventNotificationDataHolder { private static Log log = LogFactory.getLog(EventNotificationDataHolder.class); private static volatile EventNotificationDataHolder instance; -// private volatile LinkedBlockingQueue realtimeEventNotificationQueue; + private volatile LinkedBlockingQueue realtimeEventNotificationQueue; private FinancialServicesConfigurationService configService; private EventNotificationDataHolder() { - //TODO -// this.realtimeEventNotificationQueue = new LinkedBlockingQueue<>(); + this.realtimeEventNotificationQueue = new LinkedBlockingQueue<>(); } /** @@ -52,22 +54,21 @@ public static synchronized EventNotificationDataHolder getInstance() { return instance; } -// public LinkedBlockingQueue getRealtimeEventNotificationQueue() { -// return realtimeEventNotificationQueue; -// } + public LinkedBlockingQueue getRealtimeEventNotificationQueue() { + return realtimeEventNotificationQueue; + } - public FinancialServicesConfigurationService getFinancialServicesConfigurationService() { + public void setRealtimeEventNotificationQueue(LinkedBlockingQueue queue) { + this.realtimeEventNotificationQueue = queue; + } + + public FinancialServicesConfigurationService getConfigService() { return configService; } - public void setFinancialServicesConfigurationService( - FinancialServicesConfigurationService configService) { + public void setConfigService(FinancialServicesConfigurationService configService) { this.configService = configService; } - -// public void setRealtimeEventNotificationQueue(LinkedBlockingQueue queue) { -// this.realtimeEventNotificationQueue = queue; -// } } diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/model/RealtimeEventNotification.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/model/RealtimeEventNotification.java new file mode 100644 index 00000000..92ed3483 --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/model/RealtimeEventNotification.java @@ -0,0 +1,60 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.model; + +import org.json.JSONObject; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationRequestGenerator; +import org.wso2.financial.services.accelerator.event.notifications.service.util.EventNotificationServiceUtil; + +/** + * Model class for real time event notifications. + */ +public class RealtimeEventNotification { + private String callbackUrl = null; + private String securityEventToken = null; // Security Event Token to hold the Event Notification Data + private Notification notification = null; + + public void setCallbackUrl(String callbackUrl) { + this.callbackUrl = callbackUrl; + } + + public void setSecurityEventToken(String notification) { + this.securityEventToken = notification; + } + + public void setNotification(Notification notification) { + this.notification = notification; + } + + public String getCallbackUrl() { + return callbackUrl; + } + + public JSONObject getJsonPayload() { + RealtimeEventNotificationRequestGenerator eventNotificationRequestGenerator = + EventNotificationServiceUtil.getRealtimeEventNotificationRequestGenerator(); + return eventNotificationRequestGenerator + .getRealtimeEventNotificationPayload(notification, securityEventToken); + } + + public String getNotificationId() { + return notification.getNotificationId(); + } + +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/DefaultRealtimeEventNotificationRequestGenerator.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/DefaultRealtimeEventNotificationRequestGenerator.java new file mode 100644 index 00000000..a6198eb9 --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/DefaultRealtimeEventNotificationRequestGenerator.java @@ -0,0 +1,43 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service; + +import org.json.JSONObject; +import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification; + +import java.util.HashMap; +import java.util.Map; + +/** + * Default class for realtime event notification request generation. + * This is to generate the realtime event notification request payload and headers. + */ +public class DefaultRealtimeEventNotificationRequestGenerator implements RealtimeEventNotificationRequestGenerator { + + @Override + public JSONObject getRealtimeEventNotificationPayload(Notification notificationDTO, String eventSET) { + return new JSONObject("{\"notificationId\": " + notificationDTO.getNotificationId() + ", \"SET\": " + + eventSET + "}"); + } + + @Override + public Map getAdditionalHeaders() { + return new HashMap<>(); + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java new file mode 100644 index 00000000..54692e26 --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/EventNotificationProducerService.java @@ -0,0 +1,105 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service; + +import com.nimbusds.jose.JOSEException; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.identity.oauth2.IdentityOAuth2Exception; +import org.wso2.financial.services.accelerator.event.notifications.service.EventNotificationGenerator; +import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException; +import org.wso2.financial.services.accelerator.event.notifications.service.internal.EventNotificationDataHolder; +import org.wso2.financial.services.accelerator.event.notifications.service.model.EventSubscription; +import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification; +import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationEvent; +import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationResponse; +import org.wso2.financial.services.accelerator.event.notifications.service.model.RealtimeEventNotification; +import org.wso2.financial.services.accelerator.event.notifications.service.util.EventNotificationServiceUtil; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * This thread is used to produce the event notification and put it into the realtime event notification queue. + */ +public class EventNotificationProducerService implements Runnable { + private static final Log log = LogFactory.getLog(EventNotificationProducerService.class); + private final Notification notification; + private final List notificationEvents; + + public EventNotificationProducerService( + Notification notification, List notificationEvents) { + + this.notification = notification; + this.notificationEvents = notificationEvents; + } + + @Override + public void run() { + + try { + List subscriptionList = EventNotificationServiceUtil.getEventSubscriptionService() + .getEventSubscriptionsByClientId(notification.getClientId()); + if (CollectionUtils.isEmpty(subscriptionList)) { + throw new FSEventNotificationException("No subscriptions found for the client ID: " + + notification.getClientId()); + } + + LinkedBlockingQueue queue = EventNotificationDataHolder.getInstance(). + getRealtimeEventNotificationQueue(); + EventNotificationGenerator eventNotificationGenerator = EventNotificationServiceUtil. + getEventNotificationGenerator(); + + for (EventSubscription subscription : subscriptionList) { + + List allowedEvents = new ArrayList<>(); + notificationEvents.forEach(notificationEvent -> { + if (subscription.getEventTypes().contains(notificationEvent.getEventType())) { + allowedEvents.add(notificationEvent); + } + }); + + if (!allowedEvents.isEmpty() && StringUtils.isNotEmpty(subscription.getCallbackUrl())) { + RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification(); + realtimeEventNotification.setNotification(notification); + realtimeEventNotification.setCallbackUrl(subscription.getCallbackUrl()); + + NotificationResponse notificationResponse = eventNotificationGenerator + .generateEventNotificationBody(notification, allowedEvents); + realtimeEventNotification.setSecurityEventToken(eventNotificationGenerator + .generateEventNotification(NotificationResponse.getJsonNode(notificationResponse))); + + queue.put(realtimeEventNotification); // put the notification into the queue + } + } + } catch (InterruptedException e) { + log.error("Error when adding the Realtime Notification with notification ID " + + notification.getNotificationId().replaceAll("[\r\n]", "") + + " into the RealtimeEventNotification Queue", e); + } catch (FSEventNotificationException e) { + log.error("Error when generating the event notification", e); + } catch (IOException | JOSEException | IdentityOAuth2Exception e) { + log.error("Error while processing event notification JSON object", e); + } + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java new file mode 100644 index 00000000..01fd2e64 --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationLoaderService.java @@ -0,0 +1,95 @@ +/** + * Copyright (c) 2023, WSO2 LLC. (https://www.wso2.com). All Rights Reserved. + * + * This software is the property of WSO2 LLC. and its suppliers, if any. + * Dissemination of any information or reproduction of any material contained + * herein in any form is strictly forbidden, unless permitted by WSO2 expressly. + * You may not alter or remove any copyright or other notice from copies of this content. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service; + +import com.nimbusds.jose.JOSEException; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.identity.oauth2.IdentityOAuth2Exception; +import org.wso2.financial.services.accelerator.event.notifications.service.EventNotificationGenerator; +import org.wso2.financial.services.accelerator.event.notifications.service.RealtimeNotificationService; +import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants; +import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException; +import org.wso2.financial.services.accelerator.event.notifications.service.internal.EventNotificationDataHolder; +import org.wso2.financial.services.accelerator.event.notifications.service.model.EventSubscription; +import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification; +import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationEvent; +import org.wso2.financial.services.accelerator.event.notifications.service.model.NotificationResponse; +import org.wso2.financial.services.accelerator.event.notifications.service.model.RealtimeEventNotification; +import org.wso2.financial.services.accelerator.event.notifications.service.util.EventNotificationServiceUtil; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * This service is used to add open state event notifications to the realtime event notification queue. + * This service is called whenever the server starts. + */ +public class RealtimeEventNotificationLoaderService implements Runnable { + + private static final Log log = LogFactory.getLog(RealtimeEventNotificationLoaderService.class); + + @Override + public void run() { + // Get all open state event notifications from the database and add them to the queue + try { + LinkedBlockingQueue queue = EventNotificationDataHolder.getInstance(). + getRealtimeEventNotificationQueue(); + EventNotificationGenerator eventNotificationGenerator = EventNotificationServiceUtil. + getEventNotificationGenerator(); + RealtimeNotificationService realtimeNotificationService = EventNotificationServiceUtil + .getRealtimeNotificationService(); + List openNotifications = realtimeNotificationService + .getNotificationsByStatus(EventNotificationConstants.OPEN); + + for (Notification notification : openNotifications) { + //Get events by notificationId + List notificationEvents = realtimeNotificationService. + getEventsByNotificationID(notification.getNotificationId()); + + List subscriptionList = EventNotificationServiceUtil.getEventSubscriptionService() + .getEventSubscriptionsByClientId(notification.getClientId()); + if (subscriptionList.isEmpty()) { + throw new FSEventNotificationException("No subscriptions found for the client ID: " + + notification.getClientId()); + } + + for (EventSubscription subscription : subscriptionList) { + List allowedEvents = new ArrayList<>(); + notificationEvents.forEach(notificationEvent -> { + if (subscription.getEventTypes().contains(notificationEvent.getEventType())) { + allowedEvents.add(notificationEvent); + } + }); + + if (!allowedEvents.isEmpty() && StringUtils.isNotEmpty(subscription.getCallbackUrl())) { + NotificationResponse responseNotification = eventNotificationGenerator. + generateEventNotificationBody(notification, allowedEvents); + RealtimeEventNotification realtimeEventNotification = new RealtimeEventNotification(); + realtimeEventNotification.setCallbackUrl(subscription.getCallbackUrl()); + realtimeEventNotification.setSecurityEventToken(eventNotificationGenerator. + generateEventNotification(NotificationResponse.getJsonNode(responseNotification))); + realtimeEventNotification.setNotification(notification); + queue.put(realtimeEventNotification); // put the notification into the queue + } + } + } + } catch (InterruptedException e) { + log.error("Error when adding the Realtime Notification into the RealtimeEventNotification Queue", e); + } catch (FSEventNotificationException e) { + log.error("Error when generating the event notification", e); + } catch (IOException | JOSEException | IdentityOAuth2Exception e) { + log.error("Error while processing event notification JSON object", e); + } + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationRequestGenerator.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationRequestGenerator.java new file mode 100644 index 00000000..703f645a --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationRequestGenerator.java @@ -0,0 +1,46 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service; + +import org.json.JSONObject; +import org.wso2.financial.services.accelerator.event.notifications.service.model.Notification; + +import java.util.Map; + +/** + * Interface for event notification request metadata generation. For custom class extensions the class name + * is to be referred from the realtime_event_notification_request_generator in deployment.toml + */ +public interface RealtimeEventNotificationRequestGenerator { + /** + * This method is to generate realtime event notification payload. To generate custom values + * for the body this method should be extended. + * + * @return String payload + */ + JSONObject getRealtimeEventNotificationPayload(Notification notificationDTO, String eventSET); + + /** + * This method is to generate realtime event notification request headers. To generate custom values + * for the body this method should be extended. + * + * @return Map of headers + */ + Map getAdditionalHeaders(); +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java new file mode 100644 index 00000000..455d481d --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/service/RealtimeEventNotificationSenderService.java @@ -0,0 +1,201 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.service; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.json.JSONObject; +import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser; +import org.wso2.financial.services.accelerator.common.exception.FinancialServicesException; +import org.wso2.financial.services.accelerator.common.util.HTTPClientUtils; +import org.wso2.financial.services.accelerator.event.notifications.service.RealtimeNotificationService; +import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants; +import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException; +import org.wso2.financial.services.accelerator.event.notifications.service.util.EventNotificationServiceUtil; + +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.time.LocalTime; +import java.util.Map; + +/** + * This method is used to send the HTTP requests to the TPP provided callback URL. + * Exponential backoff and Circuit breaker based retry policy is used to retry failed POST requests. + */ +public class RealtimeEventNotificationSenderService implements Runnable { + + private static final Log log = LogFactory.getLog(RealtimeEventNotificationSenderService.class); + + private static final FinancialServicesConfigParser configParser = FinancialServicesConfigParser.getInstance(); + private static final int MAX_RETRIES = configParser.getRealtimeEventNotificationMaxRetries(); + private static final int INITIAL_BACKOFF_TIME_IN_SECONDS = + configParser.getRealtimeEventNotificationInitialBackoffTimeInSeconds(); + private static final String BACKOFF_FUNCTION = + configParser.getRealtimeEventNotificationBackoffFunction().replaceAll("[\r\n]", ""); + private static final int CIRCUIT_BREAKER_OPEN_TIMEOUT_IN_SECONDS = + configParser.getRealtimeEventNotificationCircuitBreakerOpenTimeoutInSeconds(); + private static final int TIMEOUT_IN_SECONDS = configParser.getRealtimeEventNotificationTimeoutInSeconds(); + + private CloseableHttpClient httpClient; + private RealtimeEventNotificationRequestGenerator httpRequestGenerator; + private String notificationId; + private String callbackUrl; + private JSONObject payloadJson; + + public RealtimeEventNotificationSenderService(String callbackUrl, JSONObject payloadJson, + String notificationId) { + int maxRetryCount = FinancialServicesConfigParser.getInstance() + .getRealtimeEventNotificationMaxRetries() + 1; + + try { + this.httpClient = HTTPClientUtils.getHttpsClient(maxRetryCount, maxRetryCount); + } catch (FinancialServicesException e) { + log.error("Failed to initialize the HTTP client for the realtime event notification", e); + } + + this.httpRequestGenerator = EventNotificationServiceUtil.getRealtimeEventNotificationRequestGenerator(); + this.notificationId = notificationId; + this.callbackUrl = callbackUrl; + this.payloadJson = payloadJson; + } + + public void run() { + try { + postWithRetry(); + } catch (FSEventNotificationException e) { + log.error("Failed to send the Real-time event notification with notificationId: " + + notificationId.replaceAll("[\r\n]", ""), e); + } + } + + /** + * This method is used to send the HTTP requests to the TPP provided callback URL. + * Exponential backoff and Circuit breaker based retry policy is used to retry failed POST requests. + * + * @throws FSEventNotificationException + */ + private void postWithRetry() throws FSEventNotificationException { + RealtimeNotificationService realtimeNotificationService = EventNotificationServiceUtil + .getRealtimeNotificationService(); + int retryCount = 0; + long backoffTimeMs = INITIAL_BACKOFF_TIME_IN_SECONDS * 1000L; + boolean circuitBreakerOpen = false; + LocalTime startTime = LocalTime.now(); + + while (retryCount <= MAX_RETRIES && !circuitBreakerOpen) { + try { + // This if closure will execute only if the initial POST request is failed. + // This includes the retry policy and will execute according to the configurations. + if (retryCount > 0) { + if (log.isDebugEnabled()) { + log.debug("HTTP request Retry #" + retryCount + " - waiting for " + + backoffTimeMs + " ms before trying again"); + } + Thread.sleep(backoffTimeMs); + + switch (BACKOFF_FUNCTION) { + case "CONSTANT": + // Backoff time will not be changed + // Retries will happen in constant time frames + break; + case "LINEAR": + // Backoff time will be doubled after each retry + // nextWaitingTime = 2 x previousWaitingTime + backoffTimeMs *= 2; + break; + case "EX": + // Backoff time will be increased exponentially + // nextWaitingTime = startWaitingTime x e^(retryCount) + backoffTimeMs = (long) + (INITIAL_BACKOFF_TIME_IN_SECONDS + * 1000 * Math.exp(retryCount)); + break; + default: + log.error("Invalid backoff function for the realtime event notification retry policy: " + + BACKOFF_FUNCTION); + throw new IllegalArgumentException( + "Invalid backoff function for the realtime event notification retry policy: " + + BACKOFF_FUNCTION); + } + } + + HttpPost httpPost = new HttpPost(URI.create(callbackUrl)); + + for (Map.Entry entry : httpRequestGenerator.getAdditionalHeaders().entrySet()) { + String headerName = entry.getKey(); + String headerValue = entry.getValue(); + httpPost.setHeader(headerName, headerValue); + } + + httpPost.setEntity(new StringEntity(String.valueOf(payloadJson), ContentType.APPLICATION_JSON)); + RequestConfig requestConfig = RequestConfig.custom() + .setConnectTimeout(TIMEOUT_IN_SECONDS * 1000) + .setConnectionRequestTimeout(TIMEOUT_IN_SECONDS * 1000) + .setSocketTimeout(TIMEOUT_IN_SECONDS * 1000) + .build(); + httpPost.setConfig(requestConfig); + + HttpResponse response = httpClient.execute(httpPost); + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == HttpStatus.SC_ACCEPTED) { + if (log.isDebugEnabled()) { + log.debug("Real-time event notification with notificationId: " + + notificationId.replaceAll("[\r\n]", "") + " sent successfully"); + } + realtimeNotificationService.updateNotificationStatusById(notificationId, + EventNotificationConstants.EventNotificationStatusEnum.ACK); + return; + } else { + if (log.isDebugEnabled()) { + log.debug("Real-time event notification with notificationId: " + + notificationId.replaceAll("[\r\n]", "") + + " sent failed with status code: " + statusCode); + } + } + + // Circuit breaker will be opened if the retrying time exceeds the configured circuit breaker timeout. + if (Duration.between(startTime, LocalTime.now()).toMillis() + > CIRCUIT_BREAKER_OPEN_TIMEOUT_IN_SECONDS * 1000) { + circuitBreakerOpen = true; + if (log.isDebugEnabled()) { + log.debug("Circuit breaker open for the realtime event notification with notificationId: " + + notificationId.replaceAll("[\r\n]", "")); + } + } + retryCount++; + + // If the circuit breaker is opened or the maximum retry count is exceeded, + // the notification status will be updated as ERROR. + realtimeNotificationService.updateNotificationStatusById(notificationId, + EventNotificationConstants.EventNotificationStatusEnum.ERROR); + } catch (IOException | InterruptedException | FSEventNotificationException e) { + log.error("Real-time event notification with notificationId: " + + notificationId.replaceAll("[\r\n]", "") + " sent failed", e); + } + } + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/activator/PeriodicalEventNotificationConsumerJobActivator.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/activator/PeriodicalEventNotificationConsumerJobActivator.java new file mode 100644 index 00000000..cb61616c --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/activator/PeriodicalEventNotificationConsumerJobActivator.java @@ -0,0 +1,91 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.activator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.quartz.CronExpression; +import org.quartz.JobDetail; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.Trigger; +import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser; +import org.wso2.financial.services.accelerator.common.util.Generated; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.job.EventNotificationConsumerJob; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.scheduler.PeriodicalEventNotificationConsumerJobScheduler; + +import java.text.ParseException; +import java.util.Date; + +import static org.quartz.JobBuilder.newJob; +import static org.quartz.TriggerBuilder.newTrigger; + +/** + * Scheduled Task definition and trigger to perform realtime event notification sending based on the cron string. + */ +@Generated(message = "Excluding from code coverage") +public class PeriodicalEventNotificationConsumerJobActivator { + + private static Log log = LogFactory.getLog(PeriodicalEventNotificationConsumerJobActivator.class); + + public void activate() { + int cronInSeconds = 60; + String periodicCronExpression = FinancialServicesConfigParser.getInstance() + .getRealtimeEventNotificationSchedulerCronExpression().replaceAll("[\r\n]", ""); + + try { + CronExpression cron = new CronExpression(periodicCronExpression); + + Date nextValidTime = cron.getNextValidTimeAfter(new Date()); + Date secondValidTime = cron.getNextValidTimeAfter(nextValidTime); + + cronInSeconds = (int) (secondValidTime.getTime() - nextValidTime.getTime()) / 1000; + + } catch (ParseException e) { + log.error("Error while parsing the event notification scheduler cron expression : " + + periodicCronExpression, e); + } + + JobDetail job = newJob(EventNotificationConsumerJob.class) + .withIdentity("RealtimeEventNotificationJob", "group2") + .build(); + + Trigger trigger = newTrigger() + .withIdentity("periodicalEvenNotificationTrigger", "group2") + .withSchedule(SimpleScheduleBuilder.simpleSchedule() + .withIntervalInSeconds(cronInSeconds) + .repeatForever()) + .build(); + + try { + Scheduler scheduler = PeriodicalEventNotificationConsumerJobScheduler.getInstance().getScheduler(); + // this check is to remove already stored jobs in clustered mode. + if (scheduler.checkExists(job.getKey())) { + scheduler.deleteJob(job.getKey()); + } + + scheduler.scheduleJob(job, trigger); + log.info("Periodical Realtime Event Notification sender Started with cron : " + + periodicCronExpression); + } catch (SchedulerException e) { + log.error("Error while starting Periodical Realtime Event Notification sender", e); + } + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/job/EventNotificationConsumerJob.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/job/EventNotificationConsumerJob.java new file mode 100644 index 00000000..8f16605a --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/job/EventNotificationConsumerJob.java @@ -0,0 +1,90 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.job; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.json.JSONObject; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.wso2.financial.services.accelerator.common.config.FinancialServicesConfigParser; +import org.wso2.financial.services.accelerator.common.util.Generated; +import org.wso2.financial.services.accelerator.event.notifications.service.internal.EventNotificationDataHolder; +import org.wso2.financial.services.accelerator.event.notifications.service.model.RealtimeEventNotification; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationSenderService; + +import java.util.ArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Scheduled Task to send realtime event notifications to callback Urls. + * This task is scheduled to run periodically. + * This task consumes all the notifications in the queue and send them to the callback urls. + */ +@Generated(message = "Excluding from code coverage") +@DisallowConcurrentExecution +public class EventNotificationConsumerJob implements Job { + + private static final Log log = LogFactory.getLog(EventNotificationConsumerJob.class); + private static final int THREAD_POOL_SIZE = FinancialServicesConfigParser + .getInstance().getEventNotificationThreadPoolSize(); + + @Override + public void execute(JobExecutionContext jobExecutionContext) { + ArrayList notifications = consumeNotifications(); + // send notifications to the callback urls + int threads = Math.min(notifications.size(), THREAD_POOL_SIZE); + int threadPoolSize = Math.max(threads, 2); + + ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize); + + for (RealtimeEventNotification notification : notifications) { + String callbackUrl = notification.getCallbackUrl(); + JSONObject payload = notification.getJsonPayload(); + Runnable worker = new RealtimeEventNotificationSenderService(callbackUrl, + payload, notification.getNotificationId()); + executor.execute(worker); + } + + executor.shutdown(); + } + + private static ArrayList consumeNotifications() { + + LinkedBlockingQueue queue = EventNotificationDataHolder.getInstance() + .getRealtimeEventNotificationQueue(); + ArrayList notifications = new ArrayList<>(); + + // consume all notifications in the queue + int key = 0; + while (!queue.isEmpty() && key < THREAD_POOL_SIZE) { + key++; + try { + RealtimeEventNotification notification = queue.take(); + notifications.add(notification); + } catch (InterruptedException ex) { + log.error("Error while consuming notifications from the event notification queue", ex); + } + } + return notifications; + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/scheduler/PeriodicalEventNotificationConsumerJobScheduler.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/scheduler/PeriodicalEventNotificationConsumerJobScheduler.java new file mode 100644 index 00000000..0c63162d --- /dev/null +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/realtime/util/scheduler/PeriodicalEventNotificationConsumerJobScheduler.java @@ -0,0 +1,78 @@ +/** + * Copyright (c) 2024, WSO2 LLC. (https://www.wso2.com). + *

+ * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.wso2.financial.services.accelerator.event.notifications.service.realtime.util.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.impl.StdSchedulerFactory; +import org.wso2.financial.services.accelerator.common.util.Generated; + +/** + * Periodic realtime event notification job scheduler class. + * This class initialize the scheduler and schedule configured jobs and triggers. + */ +@Generated(message = "Excluding from code coverage") +public class PeriodicalEventNotificationConsumerJobScheduler { + private static volatile PeriodicalEventNotificationConsumerJobScheduler instance; + private static volatile Scheduler scheduler; + private static Log log = LogFactory.getLog(PeriodicalEventNotificationConsumerJobScheduler.class); + + private PeriodicalEventNotificationConsumerJobScheduler() { + initScheduler(); + } + + public static synchronized PeriodicalEventNotificationConsumerJobScheduler getInstance() { + + if (instance == null) { + synchronized (PeriodicalEventNotificationConsumerJobScheduler.class) { + if (instance == null) { + instance = new PeriodicalEventNotificationConsumerJobScheduler(); + } + } + } + return instance; + } + + private void initScheduler() { + + if (instance != null) { + return; + } + synchronized (PeriodicalEventNotificationConsumerJobScheduler.class) { + try { + scheduler = StdSchedulerFactory.getDefaultScheduler(); + scheduler.start(); + } catch (SchedulerException e) { + log.error("Exception while initializing the Real-time Event notification scheduler", e); + } + } + } + + /** + * Returns the scheduler. + * + * @return Scheduler scheduler. + */ + public Scheduler getScheduler() { + return scheduler; + } +} diff --git a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java index b3916f14..496afb29 100644 --- a/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java +++ b/financial-services-accelerator/components/org.wso2.financial.services.accelerator.event.notifications.service/src/main/java/org/wso2/financial/services/accelerator/event/notifications/service/util/EventNotificationServiceUtil.java @@ -31,8 +31,11 @@ import org.wso2.financial.services.accelerator.common.util.Generated; import org.wso2.financial.services.accelerator.consent.mgt.service.impl.ConsentCoreServiceImpl; import org.wso2.financial.services.accelerator.event.notifications.service.EventNotificationGenerator; +import org.wso2.financial.services.accelerator.event.notifications.service.EventSubscriptionService; +import org.wso2.financial.services.accelerator.event.notifications.service.RealtimeNotificationService; import org.wso2.financial.services.accelerator.event.notifications.service.constants.EventNotificationConstants; import org.wso2.financial.services.accelerator.event.notifications.service.exception.FSEventNotificationException; +import org.wso2.financial.services.accelerator.event.notifications.service.realtime.service.RealtimeEventNotificationRequestGenerator; import java.util.Optional; @@ -61,15 +64,14 @@ public static EventNotificationGenerator getEventNotificationGenerator() { * * @return RealtimeEventNotificationRequestGenerator */ - //TODO -// public static RealtimeEventNotificationRequestGenerator getRealtimeEventNotificationRequestGenerator() { -// -// RealtimeEventNotificationRequestGenerator realtimeEventNotificationRequestGenerator = -// (RealtimeEventNotificationRequestGenerator) FinancialServicesUtils -// .getClassInstanceFromFQN(FinancialServicesConfigParser.getInstance(). -// getRealtimeEventNotificationRequestGenerator()); -// return realtimeEventNotificationRequestGenerator; -// } + public static RealtimeEventNotificationRequestGenerator getRealtimeEventNotificationRequestGenerator() { + + RealtimeEventNotificationRequestGenerator realtimeEventNotificationRequestGenerator = + (RealtimeEventNotificationRequestGenerator) FinancialServicesUtils + .getClassInstanceFromFQN(FinancialServicesConfigParser.getInstance(). + getRealtimeEventNotificationRequestGenerator()); + return realtimeEventNotificationRequestGenerator; + } /** * Method to get event JSON from eventInformation payload string. @@ -141,4 +143,13 @@ public static String getErrorDTO(String error, String errorDescription) { eventNotificationError.put(EventNotificationConstants.ERROR_DESCRIPTION_FIELD, errorDescription); return eventNotificationError.toString(); } + + public static EventSubscriptionService getEventSubscriptionService() { + return new EventSubscriptionService(); + } + + public static RealtimeNotificationService getRealtimeNotificationService() { + return new RealtimeNotificationService(); + } + }