Skip to content

Commit

Permalink
Merge pull request #703 from jbiblio/importer-metrics
Browse files Browse the repository at this point in the history
Export metrics about the import process
nitram509 authored Dec 27, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents bfae739 + 13dfd3f commit 5bce254
Showing 12 changed files with 237 additions and 49 deletions.
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ The application imports the data from Zeebe using the [Hazelcast exporter](https

### Upgrading from a prior version

See the [upgrade instructions](./UPGRADE.md).
See the [upgrade instructions](UPGRADE.md).

### Docker

@@ -297,6 +297,15 @@ Refer to the [docker-compose file](docker/docker-compose.yml) for a sample confi
Please be aware that when connecting to a Redis cluster you must activate
the `useClusterClient` option.

## Metrics
The monitor exports a couple of metrics via the usual `/actuator/prometheus` endpoint.

In addition to the default metrics that are available via Spring Boot, there are some metrics exported specific
- for the import process (e.g. number of imported process instances)
- Hazelcast's ringbuffer.

All metrics are prefixed with `zeebemonitor_importer`.

## Code of Conduct

This project adheres to the Contributor Covenant [Code of
Original file line number Diff line number Diff line change
@@ -3,8 +3,6 @@
import com.hazelcast.core.HazelcastInstance;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.hazelcast.connect.java.ZeebeHazelcast;
import io.zeebe.monitor.entity.HazelcastConfig;
import io.zeebe.monitor.repository.HazelcastConfigRepository;
import io.zeebe.monitor.zeebe.protobuf.importers.ErrorProtobufImporter;
import io.zeebe.monitor.zeebe.protobuf.importers.IncidentProtobufImporter;
import io.zeebe.monitor.zeebe.protobuf.importers.JobProtobufImporter;
@@ -30,21 +28,9 @@ public class HazelcastImportService {
@Autowired private TimerProtobufImporter timerImporter;
@Autowired private ErrorProtobufImporter errorImporter;

@Autowired private HazelcastConfigRepository hazelcastConfigRepository;
@Autowired private HazelcastStateService hazelcastStateService;

public ZeebeHazelcast importFrom(final HazelcastInstance hazelcast) {

final var hazelcastConfig =
hazelcastConfigRepository
.findById("cfg")
.orElseGet(
() -> {
final var config = new HazelcastConfig();
config.setId("cfg");
config.setSequence(-1);
return config;
});

final var builder =
ZeebeHazelcast.newBuilder(hazelcast)
.addProcessListener(
@@ -79,13 +65,11 @@ record ->
messageSubscriptionImporter::importMessageStartEventSubscription))
.addErrorListener(errorImporter::importError)
.postProcessListener(
sequence -> {
hazelcastConfig.setSequence(sequence);
hazelcastConfigRepository.save(hazelcastConfig);
});
hazelcastStateService::saveSequenceNumber);

if (hazelcastConfig.getSequence() >= 0) {
builder.readFrom(hazelcastConfig.getSequence());
final var lastSequence = hazelcastStateService.getLastSequenceNumber();
if (lastSequence >= 0) {
builder.readFrom(lastSequence);
} else {
builder.readFromHead();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.zeebe.monitor.zeebe.hazelcast;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.zeebe.monitor.entity.HazelcastConfig;
import io.zeebe.monitor.repository.HazelcastConfigRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

/**
* The HazelcastStateService manages the current pointer of the Hazelcast import process.
* <p>
* That pointer is required to read the next-relevant message from the RingBuffer
* <p>
* Usually, that RingBuffer is read 1 by 1, but sometimes, the RingBuffer may overrun by the export process,
* and in that case, the Import process will set the sequence to the current position of the RingBuffer.
*/
@Component
public class HazelcastStateService {

private final HazelcastConfigRepository hazelcastConfigRepository;
private final Counter sequenceCounter;

@Autowired
public HazelcastStateService(HazelcastConfigRepository hazelcastConfigRepository, MeterRegistry meterRegistry) {
this.hazelcastConfigRepository = hazelcastConfigRepository;

sequenceCounter = Counter.builder("zeebemonitor_importer_ringbuffer_sequences_read").
description("number of items read from Hazelcast's ringbuffer (sequence counter)").
register(meterRegistry);
}

public long getLastSequenceNumber() {
return getHazelcastConfig().getSequence();
}

@Transactional
public void saveSequenceNumber(long sequence) {
HazelcastConfig config = getHazelcastConfig();

long prev = config.getSequence();

config.setSequence(sequence);

hazelcastConfigRepository.save(config);

sequenceCounter.increment(sequence - prev);
}

private HazelcastConfig getHazelcastConfig() {
return hazelcastConfigRepository
.findById("cfg")
.orElseGet(
() -> {
final var config = new HazelcastConfig();
config.setId("cfg");
config.setSequence(-1);
return config;
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.ErrorEntity;
import io.zeebe.monitor.repository.ErrorRepository;
@@ -9,7 +11,15 @@
@Component
public class ErrorProtobufImporter {

@Autowired private ErrorRepository errorRepository;
private final ErrorRepository errorRepository;
private final Counter counter;

@Autowired
public ErrorProtobufImporter(ErrorRepository errorRepository, MeterRegistry meterRegistry) {
this.errorRepository = errorRepository;

this.counter = Counter.builder("zeebemonitor_importer_error").description("number of processed errors").register(meterRegistry);
}

public void importError(final Schema.ErrorRecord record) {

@@ -32,5 +42,7 @@ public void importError(final Schema.ErrorRecord record) {
});

errorRepository.save(entity);

counter.increment();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.IncidentEntity;
import io.zeebe.monitor.repository.IncidentRepository;
@@ -10,7 +12,17 @@
@Component
public class IncidentProtobufImporter {

@Autowired private IncidentRepository incidentRepository;
private final IncidentRepository incidentRepository;
private final Counter createdCounter;
private final Counter resolvedCounter;

@Autowired
public IncidentProtobufImporter(IncidentRepository incidentRepository, MeterRegistry meterRegistry) {
this.incidentRepository = incidentRepository;

createdCounter = Counter.builder("zeebemonitor_importer_incident").tag("action", "created").description("number of processed incidents").register(meterRegistry);
resolvedCounter = Counter.builder("zeebemonitor_importer_incident").tag("action", "resolved").description("number of processed incidents").register(meterRegistry);
}

public void importIncident(final Schema.IncidentRecord record) {

@@ -39,9 +51,13 @@ public void importIncident(final Schema.IncidentRecord record) {
entity.setCreated(timestamp);
incidentRepository.save(entity);

createdCounter.increment();

} else if (intent == IncidentIntent.RESOLVED) {
entity.setResolved(timestamp);
incidentRepository.save(entity);

resolvedCounter.increment();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.JobEntity;
import io.zeebe.monitor.repository.JobRepository;
@@ -10,7 +12,15 @@
@Component
public class JobProtobufImporter {

@Autowired private JobRepository jobRepository;
private final JobRepository jobRepository;
private final Counter counter;

@Autowired
public JobProtobufImporter(JobRepository jobRepository, MeterRegistry meterRegistry) {
this.jobRepository = jobRepository;

this.counter = Counter.builder("zeebemonitor_importer_job").description("number of processed jobs").register(meterRegistry);
}

public void importJob(final Schema.JobRecord record) {

@@ -36,5 +46,7 @@ public void importJob(final Schema.JobRecord record) {
entity.setWorker(record.getWorker());
entity.setRetries(record.getRetries());
jobRepository.save(entity);

counter.increment();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.MessageEntity;
import io.zeebe.monitor.repository.MessageRepository;
@@ -10,7 +12,15 @@
@Component
public class MessageProtobufImporter {

@Autowired private MessageRepository messageRepository;
private final MessageRepository messageRepository;
private final Counter counter;

@Autowired
public MessageProtobufImporter(MessageRepository messageRepository, MeterRegistry meterRegistry) {
this.messageRepository = messageRepository;

this.counter = Counter.builder("zeebemonitor_importer_message").description("number of processed messages").register(meterRegistry);
}

public void importMessage(final Schema.MessageRecord record) {

@@ -35,5 +45,7 @@ public void importMessage(final Schema.MessageRecord record) {
entity.setState(intent.name().toLowerCase());
entity.setTimestamp(timestamp);
messageRepository.save(entity);

counter.increment();
}
}
Original file line number Diff line number Diff line change
@@ -2,6 +2,8 @@

import io.camunda.zeebe.protocol.record.intent.MessageStartEventSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.MessageSubscriptionEntity;
import io.zeebe.monitor.repository.MessageSubscriptionRepository;
@@ -12,7 +14,18 @@
@Component
public class MessageSubscriptionProtobufImporter {

@Autowired private MessageSubscriptionRepository messageSubscriptionRepository;
private final MessageSubscriptionRepository messageSubscriptionRepository;
private final Counter subsCounter;
private final Counter eventCounter;

@Autowired
public MessageSubscriptionProtobufImporter(MessageSubscriptionRepository messageSubscriptionRepository, MeterRegistry meterRegistry) {
this.messageSubscriptionRepository = messageSubscriptionRepository;

this.subsCounter =
Counter.builder("zeebemonitor_importer_message_subscription").description("number of processed message subscriptions").register(meterRegistry);
this.eventCounter = Counter.builder("zeebemonitor_importer_message_start_event_subscription").description("number of processed message start events").register(meterRegistry);
}

public void importMessageSubscription(final Schema.MessageSubscriptionRecord record) {

@@ -39,6 +52,9 @@ public void importMessageSubscription(final Schema.MessageSubscriptionRecord rec
entity.setState(intent.name().toLowerCase());
entity.setTimestamp(timestamp);
messageSubscriptionRepository.save(entity);


subsCounter.increment();
}

public void importMessageStartEventSubscription(
@@ -66,6 +82,8 @@ public void importMessageStartEventSubscription(
entity.setState(intent.name().toLowerCase());
entity.setTimestamp(timestamp);
messageSubscriptionRepository.save(entity);

eventCounter.increment();
}

private String generateId() {
Original file line number Diff line number Diff line change
@@ -3,6 +3,8 @@
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.ElementInstanceEntity;
import io.zeebe.monitor.entity.ProcessEntity;
@@ -17,11 +19,29 @@
@Component
public class ProcessAndElementProtobufImporter {

@Autowired private ProcessRepository processRepository;
@Autowired private ProcessInstanceRepository processInstanceRepository;
@Autowired private ElementInstanceRepository elementInstanceRepository;

@Autowired private ZeebeNotificationService notificationService;
private final ProcessRepository processRepository;
private final ProcessInstanceRepository processInstanceRepository;
private final ElementInstanceRepository elementInstanceRepository;
private final ZeebeNotificationService notificationService;
private final Counter processCounter;
private final Counter instanceActivatedCounter;
private final Counter instanceCompletedCounter;
private final Counter instanceTerminatedCounter;
private final Counter elementInstanceCounter;

@Autowired
public ProcessAndElementProtobufImporter(ProcessRepository processRepository, ProcessInstanceRepository processInstanceRepository, ElementInstanceRepository elementInstanceRepository, MeterRegistry meterRegistry, ZeebeNotificationService notificationService) {
this.processRepository = processRepository;
this.processInstanceRepository = processInstanceRepository;
this.elementInstanceRepository = elementInstanceRepository;
this.notificationService = notificationService;

this.processCounter = Counter.builder("zeebemonitor_importer_process").description("number of processed processes").register(meterRegistry);
this.instanceActivatedCounter = Counter.builder("zeebemonitor_importer_process_instance").tag("action", "activated").description("number of activated process instances").register(meterRegistry);
this.instanceCompletedCounter = Counter.builder("zeebemonitor_importer_process_instance").tag("action", "activated").description("number of activated process instances").register(meterRegistry);
this.instanceTerminatedCounter = Counter.builder("zeebemonitor_importer_process_instance").tag("action", "activated").description("number of activated process instances").register(meterRegistry);
this.elementInstanceCounter = Counter.builder("zeebemonitor_importer_element_instance").description("number of processed element_instances").register(meterRegistry);
}

public void importProcess(final Schema.ProcessRecord record) {
final int partitionId = record.getMetadata().getPartitionId();
@@ -38,6 +58,8 @@ public void importProcess(final Schema.ProcessRecord record) {
entity.setResource(record.getResource().toStringUtf8());
entity.setTimestamp(record.getMetadata().getTimestamp());
processRepository.save(entity);

processCounter.increment();
}

public void importProcessInstance(final Schema.ProcessInstanceRecord record) {
@@ -53,21 +75,17 @@ private void addOrUpdateProcessInstance(final Schema.ProcessInstanceRecord recor
final long timestamp = record.getMetadata().getTimestamp();
final long processInstanceKey = record.getProcessInstanceKey();

final ProcessInstanceEntity entity =
processInstanceRepository
.findById(processInstanceKey)
.orElseGet(
() -> {
final ProcessInstanceEntity newEntity = new ProcessInstanceEntity();
newEntity.setPartitionId(record.getMetadata().getPartitionId());
newEntity.setKey(processInstanceKey);
newEntity.setBpmnProcessId(record.getBpmnProcessId());
newEntity.setVersion(record.getVersion());
newEntity.setProcessDefinitionKey(record.getProcessDefinitionKey());
newEntity.setParentProcessInstanceKey(record.getParentProcessInstanceKey());
newEntity.setParentElementInstanceKey(record.getParentElementInstanceKey());
return newEntity;
});
final ProcessInstanceEntity entity = processInstanceRepository.findById(processInstanceKey).orElseGet(() -> {
final ProcessInstanceEntity newEntity = new ProcessInstanceEntity();
newEntity.setPartitionId(record.getMetadata().getPartitionId());
newEntity.setKey(processInstanceKey);
newEntity.setBpmnProcessId(record.getBpmnProcessId());
newEntity.setVersion(record.getVersion());
newEntity.setProcessDefinitionKey(record.getProcessDefinitionKey());
newEntity.setParentProcessInstanceKey(record.getParentProcessInstanceKey());
newEntity.setParentElementInstanceKey(record.getParentElementInstanceKey());
return newEntity;
});

if (intent == ProcessInstanceIntent.ELEMENT_ACTIVATED) {
entity.setState("Active");
@@ -77,6 +95,8 @@ private void addOrUpdateProcessInstance(final Schema.ProcessInstanceRecord recor
notificationService.sendCreatedProcessInstance(
record.getProcessInstanceKey(), record.getProcessDefinitionKey());

instanceActivatedCounter.increment();

} else if (intent == ProcessInstanceIntent.ELEMENT_COMPLETED) {
entity.setState("Completed");
entity.setEnd(timestamp);
@@ -85,13 +105,17 @@ private void addOrUpdateProcessInstance(final Schema.ProcessInstanceRecord recor
notificationService.sendEndedProcessInstance(
record.getProcessInstanceKey(), record.getProcessDefinitionKey());

instanceCompletedCounter.increment();

} else if (intent == ProcessInstanceIntent.ELEMENT_TERMINATED) {
entity.setState("Terminated");
entity.setEnd(timestamp);
processInstanceRepository.save(entity);

notificationService.sendEndedProcessInstance(
record.getProcessInstanceKey(), record.getProcessDefinitionKey());

instanceTerminatedCounter.increment();
}
}

@@ -111,6 +135,8 @@ private void addElementInstance(final Schema.ProcessInstanceRecord record) {
elementInstanceRepository.save(entity);
notificationService.sendUpdatedProcessInstance(
record.getProcessInstanceKey(), record.getProcessDefinitionKey());

elementInstanceCounter.increment();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.camunda.zeebe.protocol.record.intent.TimerIntent;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.TimerEntity;
import io.zeebe.monitor.repository.TimerRepository;
@@ -10,7 +12,15 @@
@Component
public class TimerProtobufImporter {

@Autowired private TimerRepository timerRepository;
private final TimerRepository timerRepository;
private final Counter timerCounter;

@Autowired
public TimerProtobufImporter(TimerRepository timerRepository, MeterRegistry meterRegistry) {
this.timerRepository = timerRepository;

this.timerCounter = Counter.builder("zeebemonitor_importer_timer").description("number of processed timers").register(meterRegistry);
}

public void importTimer(final Schema.TimerRecord record) {

@@ -41,5 +51,7 @@ public void importTimer(final Schema.TimerRecord record) {
entity.setState(intent.name().toLowerCase());
entity.setTimestamp(timestamp);
timerRepository.save(entity);

timerCounter.increment();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.zeebe.monitor.zeebe.protobuf.importers;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.zeebe.exporter.proto.Schema;
import io.zeebe.monitor.entity.VariableEntity;
import io.zeebe.monitor.repository.VariableRepository;
@@ -9,7 +11,17 @@
@Component
public class VariableProtobufImporter {

@Autowired private VariableRepository variableRepository;
private final VariableRepository variableRepository;
private final Counter variableCreatedCounter;
private final Counter variableUpdatedCounter;

@Autowired
public VariableProtobufImporter(VariableRepository variableRepository, MeterRegistry meterRegistry) {
this.variableRepository = variableRepository;

this.variableCreatedCounter = Counter.builder("zeebemonitor_importer_variable").tag("action", "imported").description("number of processed variables").register(meterRegistry);
this.variableUpdatedCounter = Counter.builder("zeebemonitor_importer_variable").tag("action", "updated").description("number of processed variables").register(meterRegistry);
}

public void importVariable(final Schema.VariableRecord record) {
final VariableEntity newVariable = new VariableEntity();
@@ -23,6 +35,12 @@ public void importVariable(final Schema.VariableRecord record) {
newVariable.setScopeKey(record.getScopeKey());
newVariable.setState(record.getMetadata().getIntent().toLowerCase());
variableRepository.save(newVariable);

if (newVariable.getState().equals("updated")) {
variableUpdatedCounter.increment();
} else {
variableCreatedCounter.increment();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.zeebe.monitor.repository;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -50,6 +52,11 @@ public PlatformTransactionManager transactionManager(@Autowired LocalContainerEn
return transactionManager;
}

@Bean
public MeterRegistry meterRegistry() {
return new SimpleMeterRegistry();
}

private Properties getAdditionalJpaProperties() {
Properties p = new Properties();
p.setProperty("database-platform", "org.hibernate.dialect.H2Dialect");

0 comments on commit 5bce254

Please sign in to comment.