From 14fc6e4250ade9217583eee80bee0dbd68b1747a Mon Sep 17 00:00:00 2001 From: Michael Holtermann Date: Fri, 26 Apr 2024 11:42:44 +0200 Subject: [PATCH 1/9] count imported instances for the Hazelcast import --- README.md | 9 +++++++++ .../protobuf/importers/ErrorProtobufImporter.java | 5 +++++ .../importers/IncidentProtobufImporter.java | 7 +++++++ .../protobuf/importers/JobProtobufImporter.java | 5 +++++ .../protobuf/importers/MessageProtobufImporter.java | 5 +++++ .../MessageSubscriptionProtobufImporter.java | 7 +++++++ .../ProcessAndElementProtobufImporter.java | 13 +++++++++++++ .../protobuf/importers/TimerProtobufImporter.java | 5 +++++ .../importers/VariableProtobufImporter.java | 5 +++++ .../repository/TestContextJpaConfiguration.java | 7 +++++++ 10 files changed, 68 insertions(+) diff --git a/README.md b/README.md index 0749ac7b..65340509 100644 --- a/README.md +++ b/README.md @@ -297,6 +297,15 @@ Refer to the [docker-compose file](docker/docker-compose.yml) for a sample confi Please be aware that when connecting to a Redis cluster you must activate the `useClusterClient` option. +## Metrics +The monitor exports a couple of metrics via the usual `/actuator/prometheus` endpoint. + +In addition to the default metrics that are available via Spring Boot, there are some metrics exported specific +- for the import process (e.g. number of imported process instances) +- Hazelcast's ringbuffer. + +All metrics are prefixed with `zeebemonitor_importer`. + ## Code of Conduct This project adheres to the Contributor Covenant [Code of diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java index 136fe5bc..e882ff3a 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java @@ -1,5 +1,7 @@ package io.zeebe.monitor.zeebe.protobuf.importers; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; import io.zeebe.exporter.proto.Schema; import io.zeebe.monitor.entity.ErrorEntity; import io.zeebe.monitor.repository.ErrorRepository; @@ -10,6 +12,7 @@ public class ErrorProtobufImporter { @Autowired private ErrorRepository errorRepository; + @Autowired private MeterRegistry meterRegistry; public void importError(final Schema.ErrorRecord record) { @@ -32,5 +35,7 @@ public void importError(final Schema.ErrorRecord record) { }); errorRepository.save(entity); + + Counter.builder("zeebemonitor_importer_error_imported").description("number of processed errors").register(meterRegistry).increment(); } } diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java index 79057c1b..4e533953 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java @@ -1,6 +1,8 @@ package io.zeebe.monitor.zeebe.protobuf.importers; import io.camunda.zeebe.protocol.record.intent.IncidentIntent; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; import io.zeebe.exporter.proto.Schema; import io.zeebe.monitor.entity.IncidentEntity; import io.zeebe.monitor.repository.IncidentRepository; @@ -11,6 +13,7 @@ public class IncidentProtobufImporter { @Autowired private IncidentRepository incidentRepository; + @Autowired private MeterRegistry meterRegistry; public void importIncident(final Schema.IncidentRecord record) { @@ -39,9 +42,13 @@ public void importIncident(final Schema.IncidentRecord record) { entity.setCreated(timestamp); incidentRepository.save(entity); + Counter.builder("zeebemonitor_importer_incident").tag("action", "created").description("number of processed incidents").register(meterRegistry).increment(); + } else if (intent == IncidentIntent.RESOLVED) { entity.setResolved(timestamp); incidentRepository.save(entity); + + Counter.builder("zeebemonitor_importer_incident").tag("action", "resolved").description("number of processed incidents").register(meterRegistry).increment(); } } } diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java index 401a5ae5..6caa0482 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java @@ -1,6 +1,8 @@ package io.zeebe.monitor.zeebe.protobuf.importers; import io.camunda.zeebe.protocol.record.intent.JobIntent; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; import io.zeebe.exporter.proto.Schema; import io.zeebe.monitor.entity.JobEntity; import io.zeebe.monitor.repository.JobRepository; @@ -11,6 +13,7 @@ public class JobProtobufImporter { @Autowired private JobRepository jobRepository; + @Autowired private MeterRegistry meterRegistry; public void importJob(final Schema.JobRecord record) { @@ -36,5 +39,7 @@ public void importJob(final Schema.JobRecord record) { entity.setWorker(record.getWorker()); entity.setRetries(record.getRetries()); jobRepository.save(entity); + + Counter.builder("zeebemonitor_importer_job").tag("action", "imported").tag("state", entity.getState()).description("number of processed jobs").register(meterRegistry).increment(); } } diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java index cd3154b6..49c9460a 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java @@ -1,6 +1,8 @@ package io.zeebe.monitor.zeebe.protobuf.importers; import io.camunda.zeebe.protocol.record.intent.MessageIntent; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; import io.zeebe.exporter.proto.Schema; import io.zeebe.monitor.entity.MessageEntity; import io.zeebe.monitor.repository.MessageRepository; @@ -11,6 +13,7 @@ public class MessageProtobufImporter { @Autowired private MessageRepository messageRepository; + @Autowired private MeterRegistry meterRegistry; public void importMessage(final Schema.MessageRecord record) { @@ -35,5 +38,7 @@ public void importMessage(final Schema.MessageRecord record) { entity.setState(intent.name().toLowerCase()); entity.setTimestamp(timestamp); messageRepository.save(entity); + + Counter.builder("zeebemonitor_importer_message").tag("action", "imported").tag("state", intent.name().toLowerCase()).description("number of processed messages").register(meterRegistry).increment(); } } diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java index 7d513661..c18b9dbf 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java @@ -2,6 +2,8 @@ import io.camunda.zeebe.protocol.record.intent.MessageStartEventSubscriptionIntent; import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; import io.zeebe.exporter.proto.Schema; import io.zeebe.monitor.entity.MessageSubscriptionEntity; import io.zeebe.monitor.repository.MessageSubscriptionRepository; @@ -13,6 +15,7 @@ public class MessageSubscriptionProtobufImporter { @Autowired private MessageSubscriptionRepository messageSubscriptionRepository; + @Autowired private MeterRegistry meterRegistry; public void importMessageSubscription(final Schema.MessageSubscriptionRecord record) { @@ -39,6 +42,8 @@ public void importMessageSubscription(final Schema.MessageSubscriptionRecord rec entity.setState(intent.name().toLowerCase()); entity.setTimestamp(timestamp); messageSubscriptionRepository.save(entity); + + Counter.builder("zeebemonitor_importer_message_subscription").tag("action", "imported").tag("state", entity.getState()).description("number of processed message subscriptions").register(meterRegistry).increment(); } public void importMessageStartEventSubscription( @@ -66,6 +71,8 @@ public void importMessageStartEventSubscription( entity.setState(intent.name().toLowerCase()); entity.setTimestamp(timestamp); messageSubscriptionRepository.save(entity); + + Counter.builder("zeebemonitor_importer_message_subscription").tag("action", "imported").tag("state", entity.getState()).description("number of processed message start events").register(meterRegistry).increment(); } private String generateId() { diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java index df8c3c7a..3c336e86 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java @@ -3,6 +3,8 @@ import io.camunda.zeebe.protocol.Protocol; import io.camunda.zeebe.protocol.record.intent.Intent; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; import io.zeebe.exporter.proto.Schema; import io.zeebe.monitor.entity.ElementInstanceEntity; import io.zeebe.monitor.entity.ProcessEntity; @@ -20,6 +22,7 @@ public class ProcessAndElementProtobufImporter { @Autowired private ProcessRepository processRepository; @Autowired private ProcessInstanceRepository processInstanceRepository; @Autowired private ElementInstanceRepository elementInstanceRepository; + @Autowired private MeterRegistry meterRegistry; @Autowired private ZeebeNotificationService notificationService; @@ -38,6 +41,8 @@ public void importProcess(final Schema.ProcessRecord record) { entity.setResource(record.getResource().toStringUtf8()); entity.setTimestamp(record.getMetadata().getTimestamp()); processRepository.save(entity); + + Counter.builder("zeebemonitor_importer_process").tag("action", "imported").description("number of processed processes").register(meterRegistry).increment(); } public void importProcessInstance(final Schema.ProcessInstanceRecord record) { @@ -77,6 +82,8 @@ private void addOrUpdateProcessInstance(final Schema.ProcessInstanceRecord recor notificationService.sendCreatedProcessInstance( record.getProcessInstanceKey(), record.getProcessDefinitionKey()); + Counter.builder("zeebemonitor_importer_process_instance").tag("action", "activated").description("number of activated process instances").register(meterRegistry).increment(); + } else if (intent == ProcessInstanceIntent.ELEMENT_COMPLETED) { entity.setState("Completed"); entity.setEnd(timestamp); @@ -85,6 +92,8 @@ private void addOrUpdateProcessInstance(final Schema.ProcessInstanceRecord recor notificationService.sendEndedProcessInstance( record.getProcessInstanceKey(), record.getProcessDefinitionKey()); + Counter.builder("zeebemonitor_importer_process_instance").tag("action", "completed").description("number of processed process instances").register(meterRegistry).increment(); + } else if (intent == ProcessInstanceIntent.ELEMENT_TERMINATED) { entity.setState("Terminated"); entity.setEnd(timestamp); @@ -92,6 +101,8 @@ private void addOrUpdateProcessInstance(final Schema.ProcessInstanceRecord recor notificationService.sendEndedProcessInstance( record.getProcessInstanceKey(), record.getProcessDefinitionKey()); + + Counter.builder("zeebemonitor_importer_process_instance").tag("action", "terminated").description("number of processed process instances").register(meterRegistry).increment(); } } @@ -111,6 +122,8 @@ private void addElementInstance(final Schema.ProcessInstanceRecord record) { elementInstanceRepository.save(entity); notificationService.sendUpdatedProcessInstance( record.getProcessInstanceKey(), record.getProcessDefinitionKey()); + + Counter.builder("zeebemonitor_importer_element_instance").tag("action", "imported").tag("type", entity.getBpmnElementType()).description("number of processed element_instances").register(meterRegistry).increment(); } } } diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java index 06bd1973..0d8e96d9 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java @@ -1,6 +1,8 @@ package io.zeebe.monitor.zeebe.protobuf.importers; import io.camunda.zeebe.protocol.record.intent.TimerIntent; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; import io.zeebe.exporter.proto.Schema; import io.zeebe.monitor.entity.TimerEntity; import io.zeebe.monitor.repository.TimerRepository; @@ -11,6 +13,7 @@ public class TimerProtobufImporter { @Autowired private TimerRepository timerRepository; + @Autowired private MeterRegistry meterRegistry; public void importTimer(final Schema.TimerRecord record) { @@ -41,5 +44,7 @@ public void importTimer(final Schema.TimerRecord record) { entity.setState(intent.name().toLowerCase()); entity.setTimestamp(timestamp); timerRepository.save(entity); + + Counter.builder("zeebemonitor_importer_timer").tag("action", "imported").tag("state", entity.getState()).description("number of processed timers").register(meterRegistry).increment(); } } diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java index 3cdef87c..0f5aeb6e 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java @@ -1,5 +1,7 @@ package io.zeebe.monitor.zeebe.protobuf.importers; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; import io.zeebe.exporter.proto.Schema; import io.zeebe.monitor.entity.VariableEntity; import io.zeebe.monitor.repository.VariableRepository; @@ -10,6 +12,7 @@ public class VariableProtobufImporter { @Autowired private VariableRepository variableRepository; + @Autowired private MeterRegistry meterRegistry; public void importVariable(final Schema.VariableRecord record) { final VariableEntity newVariable = new VariableEntity(); @@ -23,6 +26,8 @@ public void importVariable(final Schema.VariableRecord record) { newVariable.setScopeKey(record.getScopeKey()); newVariable.setState(record.getMetadata().getIntent().toLowerCase()); variableRepository.save(newVariable); + + Counter.builder("zeebemonitor_importer_variable").tag("action", "imported").tag("state", newVariable.getState()).description("number of processed variables").register(meterRegistry).increment(); } } } diff --git a/src/test/java/io/zeebe/monitor/repository/TestContextJpaConfiguration.java b/src/test/java/io/zeebe/monitor/repository/TestContextJpaConfiguration.java index 8020de67..15f9f041 100644 --- a/src/test/java/io/zeebe/monitor/repository/TestContextJpaConfiguration.java +++ b/src/test/java/io/zeebe/monitor/repository/TestContextJpaConfiguration.java @@ -1,5 +1,7 @@ package io.zeebe.monitor.repository; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -50,6 +52,11 @@ public PlatformTransactionManager transactionManager(@Autowired LocalContainerEn return transactionManager; } + @Bean + public MeterRegistry meterRegistry() { + return new SimpleMeterRegistry(); + } + private Properties getAdditionalJpaProperties() { Properties p = new Properties(); p.setProperty("database-platform", "org.hibernate.dialect.H2Dialect"); From 014b31eab867673febfecca0f7b73ea37f815909 Mon Sep 17 00:00:00 2001 From: Michael Holtermann Date: Fri, 26 Apr 2024 12:08:27 +0200 Subject: [PATCH 2/9] measure reliability of the Hazelcast import process --- .../hazelcast/HazelcastImportService.java | 20 ++---- .../importers/HazelcastStateService.java | 62 +++++++++++++++++++ 2 files changed, 66 insertions(+), 16 deletions(-) create mode 100644 src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/HazelcastStateService.java diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java b/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java index f0fb258d..7809ae60 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java +++ b/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java @@ -33,18 +33,6 @@ public class HazelcastImportService { @Autowired private HazelcastConfigRepository hazelcastConfigRepository; public ZeebeHazelcast importFrom(final HazelcastInstance hazelcast) { - - final var hazelcastConfig = - hazelcastConfigRepository - .findById("cfg") - .orElseGet( - () -> { - final var config = new HazelcastConfig(); - config.setId("cfg"); - config.setSequence(-1); - return config; - }); - final var builder = ZeebeHazelcast.newBuilder(hazelcast) .addProcessListener( @@ -80,12 +68,12 @@ record -> .addErrorListener(errorImporter::importError) .postProcessListener( sequence -> { - hazelcastConfig.setSequence(sequence); - hazelcastConfigRepository.save(hazelcastConfig); + hazelcastStateService.saveSequenceNumber(sequence); }); - if (hazelcastConfig.getSequence() >= 0) { - builder.readFrom(hazelcastConfig.getSequence()); + final var lastSequence = hazelcastStateService.getLastSequenceNumber(); + if (lastSequence >= 0) { + builder.readFrom(lastSequence); } else { builder.readFromHead(); } diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/HazelcastStateService.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/HazelcastStateService.java new file mode 100644 index 00000000..8c455263 --- /dev/null +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/HazelcastStateService.java @@ -0,0 +1,62 @@ +package io.zeebe.monitor.zeebe.hazelcast.importers; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.zeebe.monitor.entity.HazelcastConfig; +import io.zeebe.monitor.repository.HazelcastConfigRepository; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +/** + * The HazelcastStateService manages the current pointer of the Hazelcast import process. + * + * That pointer is required to read the next-relevant message from the RingBuffer + * + * Usually, that RingBuffer is read 1 by 1, but sometimes, the RingBuffer may overrun by the export process, + * and in that case, the Import process will set the sequence to the current position of the RingBuffer. + */ +@Component +public class HazelcastStateService { + + private final HazelcastConfigRepository hazelcastConfigRepository; + private final Counter sequenceCounter; + + @Autowired + public HazelcastStateService(HazelcastConfigRepository hazelcastConfigRepository, MeterRegistry meterRegistry) { + this.hazelcastConfigRepository = hazelcastConfigRepository; + + sequenceCounter = Counter.builder("zeebemonitor_importer_ringbuffer_sequences_read"). + description("number of items read from Hazelcast's ringbuffer (sequence counter)"). + register(meterRegistry); + } + + public long getLastSequenceNumber() { + return getHazelcastConfig().getSequence(); + } + + @Transactional + public void saveSequenceNumber(long sequence) { + HazelcastConfig config = getHazelcastConfig(); + + long prev = config.getSequence(); + + config.setSequence(sequence); + + hazelcastConfigRepository.save(config); + + sequenceCounter.increment(sequence - prev); + } + + private HazelcastConfig getHazelcastConfig() { + return hazelcastConfigRepository + .findById("cfg") + .orElseGet( + () -> { + final var config = new HazelcastConfig(); + config.setId("cfg"); + config.setSequence(-1); + return config; + }); + } +} From c8a18fc2615edd162f720527e03d572e71bceaf1 Mon Sep 17 00:00:00 2001 From: Michael Holtermann Date: Fri, 26 Apr 2024 12:09:15 +0200 Subject: [PATCH 3/9] measure reliability of the Hazelcast import process --- .../zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java | 4 +--- .../zeebe/protobuf/importers/HazelcastStateService.java | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java b/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java index 7809ae60..8efc6e2d 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java +++ b/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java @@ -67,9 +67,7 @@ record -> messageSubscriptionImporter::importMessageStartEventSubscription)) .addErrorListener(errorImporter::importError) .postProcessListener( - sequence -> { - hazelcastStateService.saveSequenceNumber(sequence); - }); + sequence -> hazelcastStateService.saveSequenceNumber(sequence)); final var lastSequence = hazelcastStateService.getLastSequenceNumber(); if (lastSequence >= 0) { diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/HazelcastStateService.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/HazelcastStateService.java index 8c455263..0feb47d9 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/HazelcastStateService.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/HazelcastStateService.java @@ -10,9 +10,9 @@ /** * The HazelcastStateService manages the current pointer of the Hazelcast import process. - * + *

* That pointer is required to read the next-relevant message from the RingBuffer - * + *

* Usually, that RingBuffer is read 1 by 1, but sometimes, the RingBuffer may overrun by the export process, * and in that case, the Import process will set the sequence to the current position of the RingBuffer. */ From a3cee0d75bca2f8b1954dfbd22a05d1cf1f510fe Mon Sep 17 00:00:00 2001 From: Michael Holtermann Date: Fri, 26 Apr 2024 12:09:21 +0200 Subject: [PATCH 4/9] typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 65340509..dcdd2bd5 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ The application imports the data from Zeebe using the [Hazelcast exporter](https ### Upgrading from a prior version -See the [upgrade instructions](./UPGRADE.md). +See the [upgrade instructions](UPGRADE.md). ### Docker From 3e8c8c0bf87d13cf57617056c485459edbe472fd Mon Sep 17 00:00:00 2001 From: Michael Holtermann Date: Fri, 26 Apr 2024 13:10:22 +0200 Subject: [PATCH 5/9] switch to constructor-based injection, create fields for counters --- .../importers/ErrorProtobufImporter.java | 14 +++- .../importers/IncidentProtobufImporter.java | 17 +++- .../importers/JobProtobufImporter.java | 12 ++- .../importers/MessageProtobufImporter.java | 13 +++- .../MessageSubscriptionProtobufImporter.java | 18 ++++- .../ProcessAndElementProtobufImporter.java | 77 +++++++++++-------- .../importers/TimerProtobufImporter.java | 12 ++- .../importers/VariableProtobufImporter.java | 18 ++++- 8 files changed, 124 insertions(+), 57 deletions(-) diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java index e882ff3a..130cd88b 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java @@ -11,8 +11,16 @@ @Component public class ErrorProtobufImporter { - @Autowired private ErrorRepository errorRepository; - @Autowired private MeterRegistry meterRegistry; + private final Counter counter; + + private final ErrorRepository errorRepository; + + @Autowired + public ErrorHazelcastImporter(ErrorRepository errorRepository, MeterRegistry meterRegistry) { + this.errorRepository = errorRepository; + + this.counter = Counter.builder("zeebemonitor_importer_error").description("number of processed errors").register(meterRegistry); + } public void importError(final Schema.ErrorRecord record) { @@ -36,6 +44,6 @@ public void importError(final Schema.ErrorRecord record) { errorRepository.save(entity); - Counter.builder("zeebemonitor_importer_error_imported").description("number of processed errors").register(meterRegistry).increment(); + counter.increment(); } } diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java index 4e533953..637d2cd0 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java @@ -12,8 +12,17 @@ @Component public class IncidentProtobufImporter { - @Autowired private IncidentRepository incidentRepository; - @Autowired private MeterRegistry meterRegistry; + private final IncidentRepository incidentRepository; + private final Counter createdCounter; + private final Counter resolvedCounter; + + @Autowired + public IncidentHazelcastImporter(IncidentRepository incidentRepository, MeterRegistry meterRegistry) { + this.incidentRepository = incidentRepository; + + createdCounter = Counter.builder("zeebemonitor_importer_incident").tag("action", "created").description("number of processed incidents").register(meterRegistry); + resolvedCounter = Counter.builder("zeebemonitor_importer_incident").tag("action", "resolved").description("number of processed incidents").register(meterRegistry); + } public void importIncident(final Schema.IncidentRecord record) { @@ -42,13 +51,13 @@ public void importIncident(final Schema.IncidentRecord record) { entity.setCreated(timestamp); incidentRepository.save(entity); - Counter.builder("zeebemonitor_importer_incident").tag("action", "created").description("number of processed incidents").register(meterRegistry).increment(); + createdCounter.increment(); } else if (intent == IncidentIntent.RESOLVED) { entity.setResolved(timestamp); incidentRepository.save(entity); - Counter.builder("zeebemonitor_importer_incident").tag("action", "resolved").description("number of processed incidents").register(meterRegistry).increment(); + resolvedCounter.increment(); } } } diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java index 6caa0482..9385351e 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java @@ -12,8 +12,14 @@ @Component public class JobProtobufImporter { - @Autowired private JobRepository jobRepository; - @Autowired private MeterRegistry meterRegistry; + private final JobRepository jobRepository; + private final Counter counter; + + public JobHazelcastImporter(JobRepository jobRepository, MeterRegistry meterRegistry) { + this.jobRepository = jobRepository; + + counter = Counter.builder("zeebemonitor_importer_job").description("number of processed jobs").register(meterRegistry); + } public void importJob(final Schema.JobRecord record) { @@ -40,6 +46,6 @@ public void importJob(final Schema.JobRecord record) { entity.setRetries(record.getRetries()); jobRepository.save(entity); - Counter.builder("zeebemonitor_importer_job").tag("action", "imported").tag("state", entity.getState()).description("number of processed jobs").register(meterRegistry).increment(); + counter.increment(); } } diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java index 49c9460a..14ed4bef 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java @@ -12,8 +12,15 @@ @Component public class MessageProtobufImporter { - @Autowired private MessageRepository messageRepository; - @Autowired private MeterRegistry meterRegistry; + private final MessageRepository messageRepository; + private final Counter counter; + + @Autowired + public MessageHazelcastImporter(MessageRepository messageRepository, MeterRegistry meterRegistry) { + this.messageRepository = messageRepository; + + this.counter = Counter.builder("zeebemonitor_importer_message").description("number of processed messages").register(meterRegistry); + } public void importMessage(final Schema.MessageRecord record) { @@ -39,6 +46,6 @@ public void importMessage(final Schema.MessageRecord record) { entity.setTimestamp(timestamp); messageRepository.save(entity); - Counter.builder("zeebemonitor_importer_message").tag("action", "imported").tag("state", intent.name().toLowerCase()).description("number of processed messages").register(meterRegistry).increment(); + counter.increment(); } } diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java index c18b9dbf..a02864b4 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java @@ -14,8 +14,17 @@ @Component public class MessageSubscriptionProtobufImporter { - @Autowired private MessageSubscriptionRepository messageSubscriptionRepository; - @Autowired private MeterRegistry meterRegistry; + private final MessageSubscriptionRepository messageSubscriptionRepository; + private final Counter subsCounter; + private final Counter eventCounter; + + public MessageSubscriptionHazelcastImporter(MessageSubscriptionRepository messageSubscriptionRepository, MeterRegistry meterRegistry) { + this.messageSubscriptionRepository = messageSubscriptionRepository; + + this.subsCounter = + Counter.builder("zeebemonitor_importer_message_subscription").description("number of processed message subscriptions").register(meterRegistry); + this.eventCounter = Counter.builder("zeebemonitor_importer_message_start_event_subscription").description("number of processed message start events").register(meterRegistry); + } public void importMessageSubscription(final Schema.MessageSubscriptionRecord record) { @@ -43,7 +52,8 @@ public void importMessageSubscription(final Schema.MessageSubscriptionRecord rec entity.setTimestamp(timestamp); messageSubscriptionRepository.save(entity); - Counter.builder("zeebemonitor_importer_message_subscription").tag("action", "imported").tag("state", entity.getState()).description("number of processed message subscriptions").register(meterRegistry).increment(); + + subsCounter.increment(); } public void importMessageStartEventSubscription( @@ -72,7 +82,7 @@ public void importMessageStartEventSubscription( entity.setTimestamp(timestamp); messageSubscriptionRepository.save(entity); - Counter.builder("zeebemonitor_importer_message_subscription").tag("action", "imported").tag("state", entity.getState()).description("number of processed message start events").register(meterRegistry).increment(); + eventCounter.increment(); } private String generateId() { diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java index 3c336e86..0b96404a 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java @@ -19,12 +19,29 @@ @Component public class ProcessAndElementProtobufImporter { - @Autowired private ProcessRepository processRepository; - @Autowired private ProcessInstanceRepository processInstanceRepository; - @Autowired private ElementInstanceRepository elementInstanceRepository; - @Autowired private MeterRegistry meterRegistry; - - @Autowired private ZeebeNotificationService notificationService; + private final ProcessRepository processRepository; + private final ProcessInstanceRepository processInstanceRepository; + private final ElementInstanceRepository elementInstanceRepository; + private final ZeebeNotificationService notificationService; + private final Counter processCounter; + private final Counter instanceActivatedCounter; + private final Counter instanceCompletedCounter; + private final Counter instanceTerminatedCounter; + private final Counter elementInstanceCounter; + + @Autowired + public ProcessAndElementHazelcastImporter(ProcessRepository processRepository, ProcessInstanceRepository processInstanceRepository, ElementInstanceRepository elementInstanceRepository, MeterRegistry meterRegistry, ZeebeNotificationService notificationService) { + this.processRepository = processRepository; + this.processInstanceRepository = processInstanceRepository; + this.elementInstanceRepository = elementInstanceRepository; + this.notificationService = notificationService; + + this.processCounter = Counter.builder("zeebemonitor_importer_process").description("number of processed processes").register(meterRegistry); + this.instanceActivatedCounter = Counter.builder("zeebemonitor_importer_process_instance").tag("action", "activated").description("number of activated process instances").register(meterRegistry); + this.instanceCompletedCounter = Counter.builder("zeebemonitor_importer_process_instance").tag("action", "activated").description("number of activated process instances").register(meterRegistry); + this.instanceTerminatedCounter = Counter.builder("zeebemonitor_importer_process_instance").tag("action", "activated").description("number of activated process instances").register(meterRegistry); + this.elementInstanceCounter = Counter.builder("zeebemonitor_importer_element_instance").description("number of processed element_instances").register(meterRegistry); + } public void importProcess(final Schema.ProcessRecord record) { final int partitionId = record.getMetadata().getPartitionId(); @@ -42,7 +59,7 @@ public void importProcess(final Schema.ProcessRecord record) { entity.setTimestamp(record.getMetadata().getTimestamp()); processRepository.save(entity); - Counter.builder("zeebemonitor_importer_process").tag("action", "imported").description("number of processed processes").register(meterRegistry).increment(); + processCounter.increment(); } public void importProcessInstance(final Schema.ProcessInstanceRecord record) { @@ -58,51 +75,44 @@ private void addOrUpdateProcessInstance(final Schema.ProcessInstanceRecord recor final long timestamp = record.getMetadata().getTimestamp(); final long processInstanceKey = record.getProcessInstanceKey(); - final ProcessInstanceEntity entity = - processInstanceRepository - .findById(processInstanceKey) - .orElseGet( - () -> { - final ProcessInstanceEntity newEntity = new ProcessInstanceEntity(); - newEntity.setPartitionId(record.getMetadata().getPartitionId()); - newEntity.setKey(processInstanceKey); - newEntity.setBpmnProcessId(record.getBpmnProcessId()); - newEntity.setVersion(record.getVersion()); - newEntity.setProcessDefinitionKey(record.getProcessDefinitionKey()); - newEntity.setParentProcessInstanceKey(record.getParentProcessInstanceKey()); - newEntity.setParentElementInstanceKey(record.getParentElementInstanceKey()); - return newEntity; - }); + final ProcessInstanceEntity entity = processInstanceRepository.findById(processInstanceKey).orElseGet(() -> { + final ProcessInstanceEntity newEntity = new ProcessInstanceEntity(); + newEntity.setPartitionId(record.getMetadata().getPartitionId()); + newEntity.setKey(processInstanceKey); + newEntity.setBpmnProcessId(record.getBpmnProcessId()); + newEntity.setVersion(record.getVersion()); + newEntity.setProcessDefinitionKey(record.getProcessDefinitionKey()); + newEntity.setParentProcessInstanceKey(record.getParentProcessInstanceKey()); + newEntity.setParentElementInstanceKey(record.getParentElementInstanceKey()); + return newEntity; + }); if (intent == ProcessInstanceIntent.ELEMENT_ACTIVATED) { entity.setState("Active"); entity.setStart(timestamp); processInstanceRepository.save(entity); - notificationService.sendCreatedProcessInstance( - record.getProcessInstanceKey(), record.getProcessDefinitionKey()); + notificationService.sendCreatedProcessInstance(record.getProcessInstanceKey(), record.getProcessDefinitionKey()); - Counter.builder("zeebemonitor_importer_process_instance").tag("action", "activated").description("number of activated process instances").register(meterRegistry).increment(); + instanceActivatedCounter.increment(); } else if (intent == ProcessInstanceIntent.ELEMENT_COMPLETED) { entity.setState("Completed"); entity.setEnd(timestamp); processInstanceRepository.save(entity); - notificationService.sendEndedProcessInstance( - record.getProcessInstanceKey(), record.getProcessDefinitionKey()); + notificationService.sendEndedProcessInstance(record.getProcessInstanceKey(), record.getProcessDefinitionKey()); - Counter.builder("zeebemonitor_importer_process_instance").tag("action", "completed").description("number of processed process instances").register(meterRegistry).increment(); + instanceCompletedCounter.increment(); } else if (intent == ProcessInstanceIntent.ELEMENT_TERMINATED) { entity.setState("Terminated"); entity.setEnd(timestamp); processInstanceRepository.save(entity); - notificationService.sendEndedProcessInstance( - record.getProcessInstanceKey(), record.getProcessDefinitionKey()); + notificationService.sendEndedProcessInstance(record.getProcessInstanceKey(), record.getProcessDefinitionKey()); - Counter.builder("zeebemonitor_importer_process_instance").tag("action", "terminated").description("number of processed process instances").register(meterRegistry).increment(); + instanceTerminatedCounter.increment(); } } @@ -120,10 +130,9 @@ private void addElementInstance(final Schema.ProcessInstanceRecord record) { entity.setProcessDefinitionKey(record.getProcessDefinitionKey()); entity.setBpmnElementType(record.getBpmnElementType()); elementInstanceRepository.save(entity); - notificationService.sendUpdatedProcessInstance( - record.getProcessInstanceKey(), record.getProcessDefinitionKey()); + notificationService.sendUpdatedProcessInstance(record.getProcessInstanceKey(), record.getProcessDefinitionKey()); - Counter.builder("zeebemonitor_importer_element_instance").tag("action", "imported").tag("type", entity.getBpmnElementType()).description("number of processed element_instances").register(meterRegistry).increment(); + elementInstanceCounter.increment(); } } } diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java index 0d8e96d9..0e4bff26 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java @@ -12,8 +12,14 @@ @Component public class TimerProtobufImporter { - @Autowired private TimerRepository timerRepository; - @Autowired private MeterRegistry meterRegistry; + private final TimerRepository timerRepository; + private final Counter timerCounter; + + public TimerHazelcastImporter(TimerRepository timerRepository, MeterRegistry meterRegistry) { + this.timerRepository = timerRepository; + + this.timerCounter = Counter.builder("zeebemonitor_importer_timer").description("number of processed timers").register(meterRegistry); + } public void importTimer(final Schema.TimerRecord record) { @@ -45,6 +51,6 @@ public void importTimer(final Schema.TimerRecord record) { entity.setTimestamp(timestamp); timerRepository.save(entity); - Counter.builder("zeebemonitor_importer_timer").tag("action", "imported").tag("state", entity.getState()).description("number of processed timers").register(meterRegistry).increment(); + timerCounter.increment(); } } diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java index 0f5aeb6e..6aaeb8ee 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java @@ -11,8 +11,16 @@ @Component public class VariableProtobufImporter { - @Autowired private VariableRepository variableRepository; - @Autowired private MeterRegistry meterRegistry; + private final VariableRepository variableRepository; + private final Counter variableCreatedCounter; + private final Counter variableUpdatedCounter; + + public VariableHazelcastImporter(VariableRepository variableRepository, MeterRegistry meterRegistry) { + this.variableRepository = variableRepository; + + this.variableCreatedCounter = Counter.builder("zeebemonitor_importer_variable").tag("action", "imported").description("number of processed variables").register(meterRegistry); + this.variableUpdatedCounter = Counter.builder("zeebemonitor_importer_variable").tag("action", "updated").description("number of processed variables").register(meterRegistry); + } public void importVariable(final Schema.VariableRecord record) { final VariableEntity newVariable = new VariableEntity(); @@ -27,7 +35,11 @@ public void importVariable(final Schema.VariableRecord record) { newVariable.setState(record.getMetadata().getIntent().toLowerCase()); variableRepository.save(newVariable); - Counter.builder("zeebemonitor_importer_variable").tag("action", "imported").tag("state", newVariable.getState()).description("number of processed variables").register(meterRegistry).increment(); + if (newVariable.getState().equals("updated")) { + variableUpdatedCounter.increment(); + } else { + variableCreatedCounter.increment(); + } } } } From cdebc5c6265e9cc430bade99b7161fc086565256 Mon Sep 17 00:00:00 2001 From: Michael Holtermann Date: Fri, 26 Apr 2024 13:11:38 +0200 Subject: [PATCH 6/9] switch to constructor-based injection, create fields for counters --- .../monitor/zeebe/protobuf/importers/JobProtobufImporter.java | 1 + .../protobuf/importers/MessageSubscriptionProtobufImporter.java | 1 + .../monitor/zeebe/protobuf/importers/TimerProtobufImporter.java | 1 + .../zeebe/protobuf/importers/VariableProtobufImporter.java | 1 + 4 files changed, 4 insertions(+) diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java index 9385351e..ceed9fd6 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java @@ -15,6 +15,7 @@ public class JobProtobufImporter { private final JobRepository jobRepository; private final Counter counter; + @Autowired public JobHazelcastImporter(JobRepository jobRepository, MeterRegistry meterRegistry) { this.jobRepository = jobRepository; diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java index a02864b4..235bbfae 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java @@ -18,6 +18,7 @@ public class MessageSubscriptionProtobufImporter { private final Counter subsCounter; private final Counter eventCounter; + @Autowired public MessageSubscriptionHazelcastImporter(MessageSubscriptionRepository messageSubscriptionRepository, MeterRegistry meterRegistry) { this.messageSubscriptionRepository = messageSubscriptionRepository; diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java index 0e4bff26..8d397b77 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java @@ -15,6 +15,7 @@ public class TimerProtobufImporter { private final TimerRepository timerRepository; private final Counter timerCounter; + @Autowired public TimerHazelcastImporter(TimerRepository timerRepository, MeterRegistry meterRegistry) { this.timerRepository = timerRepository; diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java index 6aaeb8ee..bbb0bc4e 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java @@ -15,6 +15,7 @@ public class VariableProtobufImporter { private final Counter variableCreatedCounter; private final Counter variableUpdatedCounter; + @Autowired public VariableHazelcastImporter(VariableRepository variableRepository, MeterRegistry meterRegistry) { this.variableRepository = variableRepository; From a05a8a793f38bf7466a64ff3df685a84bdad190c Mon Sep 17 00:00:00 2001 From: Michael Holtermann Date: Fri, 26 Apr 2024 13:17:35 +0200 Subject: [PATCH 7/9] no star import --- .../zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java b/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java index 8efc6e2d..d3f52eb2 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java +++ b/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java @@ -67,7 +67,7 @@ record -> messageSubscriptionImporter::importMessageStartEventSubscription)) .addErrorListener(errorImporter::importError) .postProcessListener( - sequence -> hazelcastStateService.saveSequenceNumber(sequence)); + hazelcastStateService::saveSequenceNumber); final var lastSequence = hazelcastStateService.getLastSequenceNumber(); if (lastSequence >= 0) { From 2149b59d19a1f57661268b46bfd1f39e9353ebdd Mon Sep 17 00:00:00 2001 From: Michael Holtermann Date: Fri, 26 Apr 2024 14:36:07 +0200 Subject: [PATCH 8/9] rebase --- .../monitor/zeebe/hazelcast/HazelcastImportService.java | 4 +--- .../importers => hazelcast}/HazelcastStateService.java | 2 +- .../zeebe/protobuf/importers/ErrorProtobufImporter.java | 5 ++--- .../zeebe/protobuf/importers/IncidentProtobufImporter.java | 2 +- .../zeebe/protobuf/importers/JobProtobufImporter.java | 2 +- .../zeebe/protobuf/importers/MessageProtobufImporter.java | 2 +- .../importers/MessageSubscriptionProtobufImporter.java | 2 +- .../importers/ProcessAndElementProtobufImporter.java | 2 +- .../zeebe/protobuf/importers/TimerProtobufImporter.java | 2 +- .../zeebe/protobuf/importers/VariableProtobufImporter.java | 2 +- 10 files changed, 11 insertions(+), 14 deletions(-) rename src/main/java/io/zeebe/monitor/zeebe/{protobuf/importers => hazelcast}/HazelcastStateService.java (97%) diff --git a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java b/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java index d3f52eb2..f369bdc5 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java +++ b/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastImportService.java @@ -3,8 +3,6 @@ import com.hazelcast.core.HazelcastInstance; import io.zeebe.exporter.proto.Schema; import io.zeebe.hazelcast.connect.java.ZeebeHazelcast; -import io.zeebe.monitor.entity.HazelcastConfig; -import io.zeebe.monitor.repository.HazelcastConfigRepository; import io.zeebe.monitor.zeebe.protobuf.importers.ErrorProtobufImporter; import io.zeebe.monitor.zeebe.protobuf.importers.IncidentProtobufImporter; import io.zeebe.monitor.zeebe.protobuf.importers.JobProtobufImporter; @@ -30,7 +28,7 @@ public class HazelcastImportService { @Autowired private TimerProtobufImporter timerImporter; @Autowired private ErrorProtobufImporter errorImporter; - @Autowired private HazelcastConfigRepository hazelcastConfigRepository; + @Autowired private HazelcastStateService hazelcastStateService; public ZeebeHazelcast importFrom(final HazelcastInstance hazelcast) { final var builder = diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/HazelcastStateService.java b/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastStateService.java similarity index 97% rename from src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/HazelcastStateService.java rename to src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastStateService.java index 0feb47d9..65311f2f 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/HazelcastStateService.java +++ b/src/main/java/io/zeebe/monitor/zeebe/hazelcast/HazelcastStateService.java @@ -1,4 +1,4 @@ -package io.zeebe.monitor.zeebe.hazelcast.importers; +package io.zeebe.monitor.zeebe.hazelcast; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java index 130cd88b..cd15a518 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ErrorProtobufImporter.java @@ -11,12 +11,11 @@ @Component public class ErrorProtobufImporter { - private final Counter counter; - private final ErrorRepository errorRepository; + private final Counter counter; @Autowired - public ErrorHazelcastImporter(ErrorRepository errorRepository, MeterRegistry meterRegistry) { + public ErrorProtobufImporter(ErrorRepository errorRepository, MeterRegistry meterRegistry) { this.errorRepository = errorRepository; this.counter = Counter.builder("zeebemonitor_importer_error").description("number of processed errors").register(meterRegistry); diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java index 637d2cd0..df0c58e3 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/IncidentProtobufImporter.java @@ -17,7 +17,7 @@ public class IncidentProtobufImporter { private final Counter resolvedCounter; @Autowired - public IncidentHazelcastImporter(IncidentRepository incidentRepository, MeterRegistry meterRegistry) { + public IncidentProtobufImporter(IncidentRepository incidentRepository, MeterRegistry meterRegistry) { this.incidentRepository = incidentRepository; createdCounter = Counter.builder("zeebemonitor_importer_incident").tag("action", "created").description("number of processed incidents").register(meterRegistry); diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java index ceed9fd6..801c40eb 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java @@ -16,7 +16,7 @@ public class JobProtobufImporter { private final Counter counter; @Autowired - public JobHazelcastImporter(JobRepository jobRepository, MeterRegistry meterRegistry) { + public JobProtobufImporter(JobRepository jobRepository, MeterRegistry meterRegistry) { this.jobRepository = jobRepository; counter = Counter.builder("zeebemonitor_importer_job").description("number of processed jobs").register(meterRegistry); diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java index 14ed4bef..d65fd17a 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageProtobufImporter.java @@ -16,7 +16,7 @@ public class MessageProtobufImporter { private final Counter counter; @Autowired - public MessageHazelcastImporter(MessageRepository messageRepository, MeterRegistry meterRegistry) { + public MessageProtobufImporter(MessageRepository messageRepository, MeterRegistry meterRegistry) { this.messageRepository = messageRepository; this.counter = Counter.builder("zeebemonitor_importer_message").description("number of processed messages").register(meterRegistry); diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java index 235bbfae..5ae954b1 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/MessageSubscriptionProtobufImporter.java @@ -19,7 +19,7 @@ public class MessageSubscriptionProtobufImporter { private final Counter eventCounter; @Autowired - public MessageSubscriptionHazelcastImporter(MessageSubscriptionRepository messageSubscriptionRepository, MeterRegistry meterRegistry) { + public MessageSubscriptionProtobufImporter(MessageSubscriptionRepository messageSubscriptionRepository, MeterRegistry meterRegistry) { this.messageSubscriptionRepository = messageSubscriptionRepository; this.subsCounter = diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java index 0b96404a..189dde9c 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java @@ -30,7 +30,7 @@ public class ProcessAndElementProtobufImporter { private final Counter elementInstanceCounter; @Autowired - public ProcessAndElementHazelcastImporter(ProcessRepository processRepository, ProcessInstanceRepository processInstanceRepository, ElementInstanceRepository elementInstanceRepository, MeterRegistry meterRegistry, ZeebeNotificationService notificationService) { + public ProcessAndElementProtobufImporter(ProcessRepository processRepository, ProcessInstanceRepository processInstanceRepository, ElementInstanceRepository elementInstanceRepository, MeterRegistry meterRegistry, ZeebeNotificationService notificationService) { this.processRepository = processRepository; this.processInstanceRepository = processInstanceRepository; this.elementInstanceRepository = elementInstanceRepository; diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java index 8d397b77..50397118 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/TimerProtobufImporter.java @@ -16,7 +16,7 @@ public class TimerProtobufImporter { private final Counter timerCounter; @Autowired - public TimerHazelcastImporter(TimerRepository timerRepository, MeterRegistry meterRegistry) { + public TimerProtobufImporter(TimerRepository timerRepository, MeterRegistry meterRegistry) { this.timerRepository = timerRepository; this.timerCounter = Counter.builder("zeebemonitor_importer_timer").description("number of processed timers").register(meterRegistry); diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java index bbb0bc4e..34766d3e 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/VariableProtobufImporter.java @@ -16,7 +16,7 @@ public class VariableProtobufImporter { private final Counter variableUpdatedCounter; @Autowired - public VariableHazelcastImporter(VariableRepository variableRepository, MeterRegistry meterRegistry) { + public VariableProtobufImporter(VariableRepository variableRepository, MeterRegistry meterRegistry) { this.variableRepository = variableRepository; this.variableCreatedCounter = Counter.builder("zeebemonitor_importer_variable").tag("action", "imported").description("number of processed variables").register(meterRegistry); From 13dfd3f78979a330dd346aa270c1ba9806a027be Mon Sep 17 00:00:00 2001 From: Michael Holtermann Date: Fri, 26 Apr 2024 14:47:02 +0200 Subject: [PATCH 9/9] rebase --- .../protobuf/importers/JobProtobufImporter.java | 2 +- .../importers/ProcessAndElementProtobufImporter.java | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java index 801c40eb..681a9aaa 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/JobProtobufImporter.java @@ -19,7 +19,7 @@ public class JobProtobufImporter { public JobProtobufImporter(JobRepository jobRepository, MeterRegistry meterRegistry) { this.jobRepository = jobRepository; - counter = Counter.builder("zeebemonitor_importer_job").description("number of processed jobs").register(meterRegistry); + this.counter = Counter.builder("zeebemonitor_importer_job").description("number of processed jobs").register(meterRegistry); } public void importJob(final Schema.JobRecord record) { diff --git a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java index 189dde9c..71b55073 100644 --- a/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java +++ b/src/main/java/io/zeebe/monitor/zeebe/protobuf/importers/ProcessAndElementProtobufImporter.java @@ -92,7 +92,8 @@ private void addOrUpdateProcessInstance(final Schema.ProcessInstanceRecord recor entity.setStart(timestamp); processInstanceRepository.save(entity); - notificationService.sendCreatedProcessInstance(record.getProcessInstanceKey(), record.getProcessDefinitionKey()); + notificationService.sendCreatedProcessInstance( + record.getProcessInstanceKey(), record.getProcessDefinitionKey()); instanceActivatedCounter.increment(); @@ -101,7 +102,8 @@ private void addOrUpdateProcessInstance(final Schema.ProcessInstanceRecord recor entity.setEnd(timestamp); processInstanceRepository.save(entity); - notificationService.sendEndedProcessInstance(record.getProcessInstanceKey(), record.getProcessDefinitionKey()); + notificationService.sendEndedProcessInstance( + record.getProcessInstanceKey(), record.getProcessDefinitionKey()); instanceCompletedCounter.increment(); @@ -110,7 +112,8 @@ private void addOrUpdateProcessInstance(final Schema.ProcessInstanceRecord recor entity.setEnd(timestamp); processInstanceRepository.save(entity); - notificationService.sendEndedProcessInstance(record.getProcessInstanceKey(), record.getProcessDefinitionKey()); + notificationService.sendEndedProcessInstance( + record.getProcessInstanceKey(), record.getProcessDefinitionKey()); instanceTerminatedCounter.increment(); } @@ -130,7 +133,8 @@ private void addElementInstance(final Schema.ProcessInstanceRecord record) { entity.setProcessDefinitionKey(record.getProcessDefinitionKey()); entity.setBpmnElementType(record.getBpmnElementType()); elementInstanceRepository.save(entity); - notificationService.sendUpdatedProcessInstance(record.getProcessInstanceKey(), record.getProcessDefinitionKey()); + notificationService.sendUpdatedProcessInstance( + record.getProcessInstanceKey(), record.getProcessDefinitionKey()); elementInstanceCounter.increment(); }