Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add correlation persistence for Infinispan #3617

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.kie.kogito.infinispan.correlation;

public record CorrelationType(String encodedCorrelationId, String correlatedId,
String correlation) {

@Override
public String toString() {
return "CorrelationType[" +
"encodedCorrelationId=" + encodedCorrelationId + ", " +
"correlatedId=" + correlatedId + ", " +
"correlation=" + correlation + ']';
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package org.kie.kogito.infinispan.correlation;

import java.io.UncheckedIOException;
import java.util.Optional;

import org.infinispan.client.hotrod.DefaultTemplate;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.kie.kogito.correlation.CompositeCorrelation;
import org.kie.kogito.correlation.Correlation;
import org.kie.kogito.correlation.CorrelationInstance;
import org.kie.kogito.correlation.SimpleCorrelation;
import org.kie.kogito.internal.utils.ConversionUtils;
import org.kie.kogito.jackson.utils.ObjectMapperFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

public class InfinispanCorrelationRepository {

public static final String CORRELATIONS_CACHE_NAME = "correlations";

// TODO: I would like to use <String, CorrelationType>
private final RemoteCache<String, String> cache;
private final ObjectMapper objectMapper;

public InfinispanCorrelationRepository(
RemoteCacheManager cacheManager,
String templateName) {
if (ConversionUtils.isEmpty(templateName)) {
this.cache = cacheManager.administration().getOrCreateCache(CORRELATIONS_CACHE_NAME, DefaultTemplate.LOCAL);
} else {
this.cache = cacheManager.administration().getOrCreateCache(CORRELATIONS_CACHE_NAME, templateName);
}
SimpleModule simpleModule = new SimpleModule();
simpleModule.addAbstractTypeMapping(Correlation.class, SimpleCorrelation.class);
this.objectMapper = ObjectMapperFactory.get().copy().registerModule(simpleModule);
}

public CorrelationInstance insert(String encodedCorrelationId, String correlatedId, Correlation correlation) {
CorrelationInstance correlationInstance = new CorrelationInstance(encodedCorrelationId, correlatedId, correlation);

try {
// TODO: I would like to remove 2 calls for writeValueAsString
String json = this.objectMapper.writeValueAsString(correlation);
CorrelationType correlationType = new CorrelationType(
encodedCorrelationId, correlatedId, json);
String value = this.objectMapper.writeValueAsString(correlationType);
this.cache.put(encodedCorrelationId, value);
return correlationInstance;
} catch (JsonProcessingException e) {
throw new UncheckedIOException(e);
}
}

public CorrelationInstance findByEncodedCorrelationId(String encodedCorrelationId) {
String json = this.cache.get(encodedCorrelationId);
if (json == null) {
return null;
}
try {
CorrelationType correlation = this.objectMapper.readValue(json, CorrelationType.class);
CompositeCorrelation compositeCorrelation = this.objectMapper.readValue(correlation.correlation(), CompositeCorrelation.class);
return new CorrelationInstance(
correlation.encodedCorrelationId(),
correlation.correlatedId(),
compositeCorrelation);
} catch (JsonProcessingException e) {
throw new UncheckedIOException(e);
}
}

public CorrelationInstance findByCorrelatedId(String correlatedId) {

Optional<String> first = this.cache.values().stream().filter(item -> item.contains("\"correlatedId\":\"%s\"".formatted(correlatedId)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fjtirado look here

.findFirst();
if (first.isEmpty()) {
return null;
}

try {
CorrelationType correlationType = this.objectMapper.readValue(first.get(), CorrelationType.class);
CompositeCorrelation compositeCorrelation = this.objectMapper.readValue(correlationType.correlation(), CompositeCorrelation.class);
return new CorrelationInstance(
correlationType.encodedCorrelationId(),
correlationType.correlatedId(),
compositeCorrelation);
} catch (JsonProcessingException e) {
throw new UncheckedIOException(e);
}
}

public void delete(String encodedCorrelationId) {
this.cache.remove(encodedCorrelationId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.kie.kogito.infinispan.correlation;

import java.util.Optional;

import org.kie.kogito.correlation.Correlation;
import org.kie.kogito.correlation.CorrelationEncoder;
import org.kie.kogito.correlation.CorrelationInstance;
import org.kie.kogito.correlation.CorrelationService;
import org.kie.kogito.event.correlation.MD5CorrelationEncoder;

public class InfinispanCorrelationService implements CorrelationService {

private final CorrelationEncoder encoder;

private final InfinispanCorrelationRepository correlationRepository;

public InfinispanCorrelationService(InfinispanCorrelationRepository correlationRepository) {
this.correlationRepository = correlationRepository;
this.encoder = new MD5CorrelationEncoder();

}

@Override
public CorrelationInstance create(Correlation correlation, String correlatedId) {
String encodedCorrelationId = this.encoder.encode(correlation);
return this.correlationRepository.insert(
encodedCorrelationId, correlatedId, correlation);
}

@Override
public Optional<CorrelationInstance> find(Correlation correlation) {
String encodedCorrelationId = this.encoder.encode(correlation);
return Optional.ofNullable(this.correlationRepository.findByEncodedCorrelationId(encodedCorrelationId));
}

@Override
public Optional<CorrelationInstance> findByCorrelatedId(String correlatedId) {
return Optional.ofNullable(this.correlationRepository.findByCorrelatedId(correlatedId));
}

@Override
public void delete(Correlation correlation) {
String encodedCorrelationId = this.encoder.encode(correlation);
this.correlationRepository.delete(encodedCorrelationId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package org.kie.kogito.infinispan.correlation;

import java.util.Collections;
import java.util.Optional;

import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.kie.kogito.correlation.CompositeCorrelation;
import org.kie.kogito.correlation.CorrelationInstance;
import org.kie.kogito.correlation.CorrelationService;
import org.kie.kogito.correlation.SimpleCorrelation;
import org.kie.kogito.testcontainers.KogitoInfinispanContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

@Testcontainers
class InfinispanCorrelationServiceIT {

@Container
final static KogitoInfinispanContainer inifinispanContainer = new KogitoInfinispanContainer();
private static RemoteCacheManager remoteCacheManager;

@BeforeAll
static void setup() {
inifinispanContainer.start();
ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
configurationBuilder.addServer()
.host("127.0.0.1")
.port(inifinispanContainer.getMappedPort());
remoteCacheManager = new RemoteCacheManager(configurationBuilder.build());
}

@AfterEach
void tearDown() {
remoteCacheManager.getCache(InfinispanCorrelationRepository.CORRELATIONS_CACHE_NAME)
.clear();
}

@Test
void shouldSaveGetCorrelationCorrectly() {
// arrange
InfinispanCorrelationRepository repository = new InfinispanCorrelationRepository(remoteCacheManager, null);
CorrelationService sut = new InfinispanCorrelationService(repository);
String correlatedId = "id";
CompositeCorrelation correlation = new CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city", "Rio de Janeiro")));

// act
sut.create(correlation, correlatedId);

// assert
long quantity = remoteCacheManager.getCache(InfinispanCorrelationRepository.CORRELATIONS_CACHE_NAME)
.keySet()
.size();

remoteCacheManager.getCache(InfinispanCorrelationRepository.CORRELATIONS_CACHE_NAME)
.forEach((key, value) -> System.out.println(value));

assertThat(quantity).isPositive();
}

@Test
void shouldDeleteGetCorrelation() {
// arrange
String correlatedId = "id";
CompositeCorrelation correlation = new CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city", "São Paulo")));
InfinispanCorrelationRepository repository = new InfinispanCorrelationRepository(remoteCacheManager, null);
CorrelationService sut = new InfinispanCorrelationService(repository);
sut.create(correlation, correlatedId);

// act
sut.delete(correlation);

// assert
assertThat(remoteCacheManager.getCache(InfinispanCorrelationRepository.CORRELATIONS_CACHE_NAME).size()).isZero();
}

@Test
void shouldFindByGetCorrelatedId() {
// arrange
String correlatedId = "id";
InfinispanCorrelationRepository repository = new InfinispanCorrelationRepository(remoteCacheManager, null);
InfinispanCorrelationService sut = new InfinispanCorrelationService(repository);
CompositeCorrelation correlation = new CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city", "São Paulo")));
sut.create(correlation, correlatedId);

// act
Optional<CorrelationInstance> byCorrelatedId = sut.findByCorrelatedId(correlatedId);

// assert
assertThat(byCorrelatedId.isPresent()).isTrue();
}

@Test
void shouldFindByCorrelation() {
// arrange
CompositeCorrelation correlation = new CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city", "Osasco")));
String correlatedId = "id";
InfinispanCorrelationRepository repository = new InfinispanCorrelationRepository(remoteCacheManager, null);
InfinispanCorrelationService sut = new InfinispanCorrelationService(repository);
sut.create(correlation, correlatedId);

// act
Optional<CorrelationInstance> correlationInstance = sut.find(correlation);

// assert
assertThat(correlationInstance.isPresent()).isTrue();


}

}
Loading