-
Notifications
You must be signed in to change notification settings - Fork 748
Add framework and unit tests for DagActionStoreChangeMonitor #3817
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
25bd334
2ee3551
f30dcfb
101d283
64ea98f
a249afb
07b6fac
e6b87d0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,154 @@ | ||
| package org.apache.gobblin.runtime; | ||
|
|
||
| import com.typesafe.config.Config; | ||
| import com.typesafe.config.ConfigFactory; | ||
| import com.typesafe.config.ConfigValueFactory; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.apache.gobblin.configuration.ConfigurationKeys; | ||
| import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; | ||
| import org.apache.gobblin.kafka.client.Kafka09ConsumerClient; | ||
| import org.apache.gobblin.runtime.api.DagActionStore; | ||
| import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; | ||
| import org.apache.gobblin.service.modules.orchestration.DagManager; | ||
| import org.apache.gobblin.service.modules.orchestration.Orchestrator; | ||
| import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent; | ||
| import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor; | ||
| import org.apache.gobblin.service.monitoring.DagActionValue; | ||
| import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent; | ||
| import org.apache.gobblin.service.monitoring.OperationType; | ||
| import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
| import org.testng.annotations.BeforeClass; | ||
| import org.testng.annotations.Test; | ||
|
|
||
| import static org.mockito.Mockito.*; | ||
|
|
||
|
|
||
| /** | ||
| * Tests the main functionality of {@link DagActionStoreChangeMonitor} to process {@link DagActionStoreChangeEvent} type | ||
| * events stored in a {@link org.apache.gobblin.kafka.client.KafkaConsumerRecord}. The | ||
| * processMessage(DecodeableKafkaRecord message) function should be able to gracefully process a variety of message | ||
| * types, even with undesired formats, without throwing exceptions. | ||
| */ | ||
| @Slf4j | ||
| public class DagActionStoreChangeMonitorTest { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we test a negative scenario where Kafka message is too big. I have seen that error often. We need to decide what to do with those messages and handle them gracefully.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are messages all containing only a few fields, so I don't expect this case but I can test it if I find a way to create. What would the too large message cause as an error? How can we test?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this error would occur on the producer side, not on the consumer side |
||
| public static final String TOPIC = DagActionStoreChangeEvent.class.getSimpleName(); | ||
| private final int PARTITION = 1; | ||
| private final int OFFSET = 1; | ||
| private final String FLOW_GROUP = "flowGroup"; | ||
| private final String FLOW_NAME = "flowName"; | ||
| private final String FLOW_EXECUTION_ID = "123"; | ||
| private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor; | ||
| private int txidCounter = 0; | ||
|
|
||
| class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor { | ||
|
|
||
| public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads, | ||
| boolean isMultiActiveSchedulerEnabled) { | ||
| super(topic, config, mock(DagActionStore.class), mock(DagManager.class), numThreads, mock(FlowCatalog.class), | ||
| mock(Orchestrator.class), isMultiActiveSchedulerEnabled); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we are mocking the dagManager and orchestrator, it is possible for us to actually test side effects after each processMessage by measuring the function that is called and asserting on it. See https://stackoverflow.com/questions/9841623/mockito-how-to-verify-method-was-called-on-an-object-created-within-a-method
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added verification calls to each test case to ensure the right method is called depending on the input. |
||
| } | ||
|
|
||
| @Override | ||
| protected void processMessage(DecodeableKafkaRecord record) { | ||
| super.processMessage(record); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why to override when all the method do is call super's method? same for the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The methods are protected access but these are in a diff module so override it only to access the method. They are in this module to use the Kafka 09 version
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks better now. maybe we do not need |
||
| } | ||
|
|
||
| @Override | ||
| protected void startUp() { | ||
| super.startUp(); | ||
| } | ||
| } | ||
|
|
||
| MockDagActionStoreChangeMonitor createMockDagActionStoreChangeMonitor() { | ||
| Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000")) | ||
| .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer")) | ||
| .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore")) | ||
| .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121")); | ||
| return new MockDagActionStoreChangeMonitor("dummyTopic", config, 5, true); | ||
| } | ||
|
|
||
| @BeforeClass | ||
| public void setup() { | ||
| mockDagActionStoreChangeMonitor = createMockDagActionStoreChangeMonitor(); | ||
| mockDagActionStoreChangeMonitor.startUp(); | ||
| } | ||
|
|
||
| /** | ||
| * Ensure no NPE results from passing a HEARTBEAT type message with a null {@link DagActionValue} | ||
| */ | ||
| @Test | ||
| public void testProcessMessageWithHeartbeat() { | ||
| Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = | ||
| wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", null); | ||
| mockDagActionStoreChangeMonitor.processMessage(consumerRecord); | ||
| } | ||
|
|
||
| /** | ||
| * Tests process message with an INSERT type message | ||
| */ | ||
| @Test | ||
| public void testProcessMessageWithInsert() { | ||
| Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = | ||
| wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH); | ||
| mockDagActionStoreChangeMonitor.processMessage(consumerRecord); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we just need to test that
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I want to verify more state ie: check meter counts but for simplicity mocked the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a few more verifications that checks which mocked classes are called with |
||
| } | ||
|
|
||
|
|
||
| /** | ||
| * Tests process message with an UPDATE type message | ||
| */ | ||
| @Test | ||
| public void testProcessMessageWithUpdate() { | ||
| Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = | ||
| wrapDagActionStoreChangeEvent(OperationType.UPDATE, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH); | ||
| mockDagActionStoreChangeMonitor.processMessage(consumerRecord); | ||
| } | ||
|
|
||
| /** | ||
| * Tests process message with a DELETE type message | ||
| */ | ||
| @Test | ||
| public void testProcessMessageWithDelete() { | ||
| Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord = | ||
| wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH); | ||
| mockDagActionStoreChangeMonitor.processMessage(consumerRecord); | ||
| } | ||
|
|
||
| /** | ||
| * Util to create a general DagActionStoreChange type event | ||
| */ | ||
| private DagActionStoreChangeEvent createDagActionStoreChangeEvent(OperationType operationType, | ||
| String flowGroup, String flowName, String flowExecutionId, DagActionValue dagAction) { | ||
| String key = getKeyForFlow(flowGroup, flowName, flowExecutionId); | ||
| GenericStoreChangeEvent genericStoreChangeEvent = | ||
| new GenericStoreChangeEvent(key, String.valueOf(txidCounter), System.currentTimeMillis(), operationType); | ||
| txidCounter++; | ||
| return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, flowName, flowExecutionId, dagAction); | ||
| } | ||
|
|
||
| /** | ||
| * Form a key for events using the flow identifiers | ||
| * @return a key formed by adding an '_' delimiter between the flow identifiers | ||
| */ | ||
| private String getKeyForFlow(String flowGroup, String flowName, String flowExecutionId) { | ||
| return flowGroup + "_" + flowName + "_" + flowExecutionId; | ||
| } | ||
|
|
||
| /** | ||
| * Util to create wrapper around DagActionStoreChangeEvent | ||
| */ | ||
| private Kafka09ConsumerClient.Kafka09ConsumerRecord wrapDagActionStoreChangeEvent(OperationType operationType, String flowGroup, String flowName, | ||
| String flowExecutionId, DagActionValue dagAction) { | ||
| DagActionStoreChangeEvent eventToProcess = null; | ||
| try { | ||
| eventToProcess = | ||
| createDagActionStoreChangeEvent(operationType, flowGroup, flowName, flowExecutionId, dagAction); | ||
| } catch (Exception e) { | ||
| log.error("Exception while creating event ", e); | ||
| } | ||
| // TODO: handle partition and offset values better | ||
| ConsumerRecord consumerRecord = new ConsumerRecord<>(TOPIC, PARTITION, OFFSET, | ||
| getKeyForFlow(flowGroup, flowName, flowExecutionId), eventToProcess); | ||
| return new Kafka09ConsumerClient.Kafka09ConsumerRecord(consumerRecord); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have a test that creates records with undesired format or some other kind of bad data ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1