Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OB3] Publishing OB Analytics logs required to implement ELK Support for the OB Accelerator #35

Merged
merged 3 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,13 @@
<Enabled>false</Enabled>
{% endif %}
</APIMAnalytics>
<ELKAnalytics>
{% if open_banking.analytics.elk is defined %}
<Enabled>{{open_banking.analytics.elk.enabled}}</Enabled>
{% else %}
<Enabled>false</Enabled>
{% endif %}
</ELKAnalytics>
<DataPublishing>
{% if open_banking.data_publishing.enable is defined %}
<Enabled>{{open_banking.data_publishing.enable}}</Enabled>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,9 @@ priority = 1000
[open_banking.apim.analytics]
enable=false

[open_banking.analytics.elk]
enabled = false

[open_banking.data_publishing]
enable = false
username="$ref{super_admin.username}@carbon.super"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,9 @@ priority = 1000
[open_banking.apim.analytics]
enable=false

[open_banking.analytics.elk]
enabled = false

[open_banking.data_publishing]
enable = false
username="$ref{super_admin.username}@carbon.super"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,9 @@ priority = 1000
[open_banking.apim.analytics]
enable=false

[open_banking.analytics.elk]
enabled = false

[open_banking.data_publishing]
enable = false
username="$ref{super_admin.username}@carbon.super"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,13 @@
<IdpStep>{{open_banking.sca.idp.step}}</IdpStep>
{% endif %}
</SCA>
<ELKAnalytics>
{% if open_banking.analytics.elk is defined %}
<Enabled>{{open_banking.analytics.elk.enabled}}</Enabled>
{% else %}
<Enabled>false</Enabled>
{% endif %}
</ELKAnalytics>
<DataPublishing>
{% if open_banking.data_publishing.enable is defined %}
<Enabled>{{open_banking.data_publishing.enable}}</Enabled>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,9 @@ ssa_property_value_for_production = "production"
#required = false
#allowed_values = ["web"]

[open_banking.analytics.elk]
enabled = false

[open_banking.data_publishing]
enable = false
username="$ref{super_admin.username}@carbon.super"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,9 @@ ssa_property_value_for_production = "production"
#required = false
#allowed_values = ["web"]

[open_banking.analytics.elk]
enabled = false

[open_banking.data_publishing]
enable = false
username="$ref{super_admin.username}@carbon.super"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,9 @@ ssa_property_value_for_production = "production"
#required = false
#allowed_values = ["web"]

[open_banking.analytics.elk]
enabled = false

[open_banking.data_publishing]
enable = false
username="$ref{super_admin.username}@carbon.super"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright (c) 2023, WSO2 LLC. (https://www.wso2.com).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.wso2.openbanking.accelerator.common.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wso2.openbanking.accelerator.common.exception.OpenBankingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Map;

/**
* Open Banking common utility class to publish analytics logs
*/
public class AnalyticsLogsUtils {

private static final Log log = LogFactory.getLog(AnalyticsLogsUtils.class);
private static final String LOG_FORMAT = "Data Stream : %s , Data Stream Version : %s , Data : {\"payload\":%s}";
private static final String DATA_PROCESSING_ERROR = "Error occurred while processing the analytics dataset";

/**
* Method to add analytics logs to the OB analytics log file
*
* @param logFile Name of the logger which is used to log analytics data to the log file
* @param dataStream Name of the data stream to which the data belongs
* @param dataVersion Version of the data stream to which the data belongs
* @param analyticsData Data which belongs to the given data stream that needs to be logged via the given logger
*/
public static void addAnalyticsLogs (String logFile, String dataStream, String dataVersion, Map<String,
ChinthakaJ98 marked this conversation as resolved.
Show resolved Hide resolved
Object> analyticsData) throws OpenBankingException {
Log customLog = LogFactory.getLog(logFile);
try {
customLog.info(String.format(LOG_FORMAT, dataStream,
dataVersion, new ObjectMapper().writeValueAsString(analyticsData)));
} catch (JsonProcessingException e) {
log.error(DATA_PROCESSING_ERROR);
throw new OpenBankingException(DATA_PROCESSING_ERROR, e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ public class DataPublishingConstants {
public static final String DATA_PUBLISHING_POOL_WAIT_TIME = "DataPublishing.PoolWaitTimeMs";
public static final String DATA_PUBLISHING_PROTOCOL = "DataPublishing.Protocol";
public static final String DATA_PUBLISHING_ENABLED = "DataPublishing.Enabled";
public static final String ELK_ANALYTICS_ENABLED = "ELKAnalytics.Enabled";
public static final String APIM_ANALYTICS_ENABLED = "APIMAnalytics.Enabled";
public static final String QUEUE_SIZE = "DataPublishing.QueueSize";
public static final String WORKER_THREAD_COUNT = "DataPublishing.WorkerThreadCount";

public static final String THRIFT_PUBLISHING_TIMEOUT = "DataPublishing.Thrift.PublishingTimeout";
public static final String LOG_FILE_NAME = "OB_LOG";

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package com.wso2.openbanking.accelerator.data.publisher.common.util;

import com.wso2.openbanking.accelerator.common.config.OpenBankingConfigParser;
import com.wso2.openbanking.accelerator.common.exception.OpenBankingException;
import com.wso2.openbanking.accelerator.common.util.AnalyticsLogsUtils;
import com.wso2.openbanking.accelerator.data.publisher.common.DataPublisherPool;
import com.wso2.openbanking.accelerator.data.publisher.common.EventQueue;
import com.wso2.openbanking.accelerator.data.publisher.common.OpenBankingDataPublisher;
Expand Down Expand Up @@ -60,6 +63,17 @@ public static void releaseDataPublishingInstance(OpenBankingDataPublisher instan
*/
public static void publishData(String streamName, String streamVersion, Map<String, Object> analyticsData) {

// Analytics data will be added to the OB analytics logfile for processing if ELK is configured for the server.
if (Boolean.parseBoolean((String) OpenBankingConfigParser.getInstance().getConfiguration()
.get(DataPublishingConstants.ELK_ANALYTICS_ENABLED))) {
try {
AnalyticsLogsUtils.addAnalyticsLogs(DataPublishingConstants.LOG_FILE_NAME, streamName,
streamVersion, analyticsData);
} catch (OpenBankingException e) {
log.error("Error occurred while writing analytics logs", e);
}
}

if (Boolean.parseBoolean((String) OBAnalyticsDataHolder.getInstance().getConfigurationMap()
.get(DataPublishingConstants.DATA_PUBLISHING_ENABLED))) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,22 @@

package com.wso2.openbanking.accelerator.data.publisher.common;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wso2.openbanking.accelerator.common.config.OpenBankingConfigParser;
import com.wso2.openbanking.accelerator.common.config.OpenBankingConfigurationService;
import com.wso2.openbanking.accelerator.common.exception.OpenBankingRuntimeException;
import com.wso2.openbanking.accelerator.data.publisher.common.internal.OBAnalyticsDataHolder;
import com.wso2.openbanking.accelerator.data.publisher.common.util.OBDataPublisherUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.testng.PowerMockTestCase;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand All @@ -36,7 +46,12 @@
/**
* Open Banking analytics event queue test.
*/
public class OBAnalyticsEventQueueTest {
@PowerMockIgnore({"jdk.internal.reflect.*"})
@PrepareForTest({OpenBankingConfigParser.class})
public class OBAnalyticsEventQueueTest extends PowerMockTestCase {

@Mock
OpenBankingConfigParser openBankingConfigParser;

private static ByteArrayOutputStream outContent;
private static Logger logger = null;
Expand All @@ -45,6 +60,7 @@ public class OBAnalyticsEventQueueTest {
@BeforeClass
public void beforeTests() {

MockitoAnnotations.initMocks(this);
outContent = new ByteArrayOutputStream();
printStream = new PrintStream(outContent);
System.setOut(printStream);
Expand All @@ -59,6 +75,12 @@ public void testAddingDataToQueue() {
configs.put("DataPublishing.WorkerThreadCount", "3");
configs.put("DataPublishing.QueueSize", "10");
configs.put("DataPublishing.Enabled", "true");
configs.put("ELKAnalytics.Enabled", "true");

PowerMockito.mockStatic(OpenBankingConfigParser.class);
Mockito.when(OpenBankingConfigParser.getInstance())
.thenReturn(openBankingConfigParser);
Mockito.when(openBankingConfigParser.getConfiguration()).thenReturn(configs);

OpenBankingConfigurationService openBankingConfigurationService =
Mockito.mock(OpenBankingConfigurationService.class);
Expand All @@ -67,7 +89,14 @@ public void testAddingDataToQueue() {
OBAnalyticsDataHolder.getInstance().setOpenBankingConfigurationService(openBankingConfigurationService);

OBDataPublisherUtil.publishData("testStream", "1.0", configs);
Assert.assertTrue(outContent.toString().isEmpty());
try {
Assert.assertTrue(outContent.toString().contains("Data Stream : testStream , Data Stream Version : 1.0 , " +
"Data : {\"payload\":" + new ObjectMapper().writeValueAsString(configs) + "}"));
Assert.assertFalse(outContent.toString().contains("Data publishing is disabled. " +
"Failed to obtain a data publisher instance."));
} catch (JsonProcessingException e) {
throw new OpenBankingRuntimeException("Error in processing JSON payload", e);
}
}

@Test
Expand All @@ -78,6 +107,12 @@ public void tryAddingToQueueWhenDataPublishingDisabled() {
configs.put("DataPublishing.WorkerThreadCount", "3");
configs.put("DataPublishing.QueueSize", "10");
configs.put("DataPublishing.Enabled", "false");
configs.put("ELKAnalytics.Enabled", "true");

PowerMockito.mockStatic(OpenBankingConfigParser.class);
Mockito.when(OpenBankingConfigParser.getInstance())
.thenReturn(openBankingConfigParser);
Mockito.when(openBankingConfigParser.getConfiguration()).thenReturn(configs);

OpenBankingConfigurationService openBankingConfigurationService =
Mockito.mock(OpenBankingConfigurationService.class);
Expand Down
Loading