Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add correlation persistence for Infinispan #3617

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.kie.kogito.infinispan.correlation;

public record CorrelationRecord(String encodedCorrelationId, String correlatedId,
String correlation) {

@Override
public String toString() {
return "InfinispanCorrelation[" +
"encodedCorrelationId=" + encodedCorrelationId + ", " +
"correlatedId=" + correlatedId + ", " +
"correlation=" + correlation + ']';
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package org.kie.kogito.infinispan.correlation;

import java.io.UncheckedIOException;
import java.util.Optional;

import org.infinispan.client.hotrod.DefaultTemplate;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.kie.kogito.correlation.CompositeCorrelation;
import org.kie.kogito.correlation.Correlation;
import org.kie.kogito.correlation.CorrelationInstance;
import org.kie.kogito.correlation.SimpleCorrelation;
import org.kie.kogito.internal.utils.ConversionUtils;
import org.kie.kogito.jackson.utils.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

public class InfinispanCorrelationRepository {

private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanCorrelationRepository.class);

private static final String ENCODED_CORRELATION_ID_FIELD = "encodedCorrelationId";
private static final String CORRELATED_ID_FIELD = "correlatedId";
private static final String CORRELATION_FIELD = "correlation";
public static final String CORRELATIONS_CACHE_NAME = "correlations";

private final RemoteCache<String, String> cache;
private final ObjectMapper objectMapper;

public InfinispanCorrelationRepository(
RemoteCacheManager cacheManager,
String templateName) {
if (ConversionUtils.isEmpty(templateName)) {
this.cache = cacheManager.administration().getOrCreateCache(CORRELATIONS_CACHE_NAME, DefaultTemplate.LOCAL);
} else {
this.cache = cacheManager.administration().getOrCreateCache(CORRELATIONS_CACHE_NAME, templateName);
}
SimpleModule simpleModule = new SimpleModule();
simpleModule.addAbstractTypeMapping(Correlation.class, SimpleCorrelation.class);
this.objectMapper = ObjectMapperFactory.get().copy().registerModule(simpleModule);
}

public CorrelationInstance insert(String encodedCorrelationId, String correlatedId, Correlation correlation) {
CorrelationInstance correlationInstance = new CorrelationInstance(encodedCorrelationId, correlatedId, correlation);

try {
String json = this.objectMapper.writeValueAsString(correlation);
CorrelationRecord correlationRecord = new CorrelationRecord(
encodedCorrelationId, correlatedId, json);
String value = this.objectMapper.writeValueAsString(correlationRecord);
this.cache.put(encodedCorrelationId, value);
return correlationInstance;
} catch (JsonProcessingException e) {
throw new UncheckedIOException(e);
}
}

public CorrelationInstance findByEncodedCorrelationId(String encodedCorrelationId) {
String json = this.cache.get(encodedCorrelationId);
if (json == null) {
return null;
}
try {
CorrelationRecord correlation = this.objectMapper.readValue(json, CorrelationRecord.class);
CompositeCorrelation compositeCorrelation = this.objectMapper.readValue(correlation.correlation(), CompositeCorrelation.class);
return new CorrelationInstance(
correlation.encodedCorrelationId(),
correlation.correlatedId(),
compositeCorrelation);
} catch (JsonProcessingException e) {
throw new UncheckedIOException(e);
}
}

public CorrelationInstance findByCorrelatedId(String correlatedId) {

Optional<String> first = this.cache.values().stream().filter(item -> item.contains("\"correlatedId\":\"%s\"".formatted(correlatedId)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fjtirado look here

.findFirst();
if (first.isEmpty()) {
return null;
}

try {
CorrelationRecord correlationRecord = this.objectMapper.readValue(first.get(), CorrelationRecord.class);
CompositeCorrelation compositeCorrelation = this.objectMapper.readValue(correlationRecord.correlation(), CompositeCorrelation.class);
return new CorrelationInstance(
correlationRecord.encodedCorrelationId(),
correlationRecord.correlatedId(),
compositeCorrelation);
} catch (JsonProcessingException e) {
throw new UncheckedIOException(e);
}
}

public void delete(String encodedCorrelationId) {
this.cache.remove(encodedCorrelationId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.kie.kogito.infinispan.correlation;

import java.util.Optional;

import org.kie.kogito.correlation.Correlation;
import org.kie.kogito.correlation.CorrelationEncoder;
import org.kie.kogito.correlation.CorrelationInstance;
import org.kie.kogito.correlation.CorrelationService;
import org.kie.kogito.event.correlation.MD5CorrelationEncoder;

public class InfinispanCorrelationService implements CorrelationService {

private final CorrelationEncoder encoder;

private final InfinispanCorrelationRepository correlationRepository;

public InfinispanCorrelationService(InfinispanCorrelationRepository correlationRepository) {
this.correlationRepository = correlationRepository;
this.encoder = new MD5CorrelationEncoder();

}

@Override
public CorrelationInstance create(Correlation correlation, String correlatedId) {
String encodedCorrelationId = this.encoder.encode(correlation);
return this.correlationRepository.insert(
encodedCorrelationId, correlatedId, correlation);
}

@Override
public Optional<CorrelationInstance> find(Correlation correlation) {
String encodedCorrelationId = this.encoder.encode(correlation);
return Optional.ofNullable(this.correlationRepository.findByEncodedCorrelationId(encodedCorrelationId));
}

@Override
public Optional<CorrelationInstance> findByCorrelatedId(String correlatedId) {
return Optional.ofNullable(this.correlationRepository.findByCorrelatedId(correlatedId));
}

@Override
public void delete(Correlation correlation) {
String encodedCorrelationId = this.encoder.encode(correlation);
this.correlationRepository.delete(encodedCorrelationId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package org.kie.kogito.infinispan.correlation;

import java.util.Collections;
import java.util.Optional;

import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.kie.kogito.correlation.CompositeCorrelation;
import org.kie.kogito.correlation.CorrelationInstance;
import org.kie.kogito.correlation.CorrelationService;
import org.kie.kogito.correlation.SimpleCorrelation;
import org.kie.kogito.testcontainers.KogitoInfinispanContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

@Testcontainers
class CorrelationRecordServiceIT {

@Container
final static KogitoInfinispanContainer inifinispanContainer = new KogitoInfinispanContainer();
private static RemoteCacheManager remoteCacheManager;

@BeforeAll
static void setup() {
inifinispanContainer.start();
ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
configurationBuilder.addServer()
.host("127.0.0.1")
.port(inifinispanContainer.getMappedPort());
remoteCacheManager = new RemoteCacheManager(configurationBuilder.build());
}

@AfterEach
void tearDown() {
remoteCacheManager.getCache(InfinispanCorrelationRepository.CORRELATIONS_CACHE_NAME)
.clear();
}

@Test
void shouldSaveGetCorrelationCorrectly() {
// arrange
InfinispanCorrelationRepository repository = new InfinispanCorrelationRepository(remoteCacheManager, null);
CorrelationService sut = new InfinispanCorrelationService(repository);
String correlatedId = "id";
CompositeCorrelation correlation = new CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city", "Rio de Janeiro")));

// act
sut.create(correlation, correlatedId);

// assert
long quantity = remoteCacheManager.getCache(InfinispanCorrelationRepository.CORRELATIONS_CACHE_NAME)
.keySet()
.size();

remoteCacheManager.getCache(InfinispanCorrelationRepository.CORRELATIONS_CACHE_NAME)
.forEach((key, value) -> System.out.println(value));

assertThat(quantity).isPositive();
}

@Test
void shouldDeleteGetCorrelation() {
// arrange
String correlatedId = "id";
CompositeCorrelation correlation = new CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city", "São Paulo")));
InfinispanCorrelationRepository repository = new InfinispanCorrelationRepository(remoteCacheManager, null);
CorrelationService sut = new InfinispanCorrelationService(repository);
sut.create(correlation, correlatedId);

// act
sut.delete(correlation);

// assert
assertThat(remoteCacheManager.getCache(InfinispanCorrelationRepository.CORRELATIONS_CACHE_NAME).size()).isZero();
}

@Test
void shouldFindByGetCorrelatedId() {
// arrange
String correlatedId = "id";
InfinispanCorrelationRepository repository = new InfinispanCorrelationRepository(remoteCacheManager, null);
InfinispanCorrelationService sut = new InfinispanCorrelationService(repository);
CompositeCorrelation correlation = new CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city", "São Paulo")));
sut.create(correlation, correlatedId);

// act
Optional<CorrelationInstance> byCorrelatedId = sut.findByCorrelatedId(correlatedId);

// assert
assertThat(byCorrelatedId.isPresent()).isTrue();
}

@Test
void shouldFindByCorrelation() {
// arrange
CompositeCorrelation correlation = new CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city", "Osasco")));
String correlatedId = "id";
InfinispanCorrelationRepository repository = new InfinispanCorrelationRepository(remoteCacheManager, null);
InfinispanCorrelationService sut = new InfinispanCorrelationService(repository);
sut.create(correlation, correlatedId);

// act
Optional<CorrelationInstance> correlationInstance = sut.find(correlation);

// assert
assertThat(correlationInstance.isPresent()).isTrue();


}

}