Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.runtime;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.net.URI;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
import org.apache.gobblin.service.monitoring.DagActionValue;
import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent;
import org.apache.gobblin.service.monitoring.OperationType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.testng.annotations.Test;

import static org.mockito.Mockito.*;


/**
 * Tests the main functionality of {@link DagActionStoreChangeMonitor} to process {@link DagActionStoreChangeEvent} type
 * events stored in a {@link org.apache.gobblin.kafka.client.KafkaConsumerRecord}. The
 * processMessage(DecodeableKafkaRecord message) function should be able to gracefully process a variety of message
 * types, even with undesired formats, without throwing exceptions.
 */
@Slf4j
public class DagActionStoreChangeMonitorTest {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we test a negative scenario where Kafka message is too big. I have seen that error often. We need to decide what to do with those messages and handle them gracefully.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are messages all containing only a few fields, so I don't expect this case but I can test it if I find a way to create. What would the too large message cause as an error? How can we test?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this error would occur on the producer side, not on the consumer side

// Topic name mirrors the event class's simple name, matching the convention the monitor consumes from.
public static final String TOPIC = DagActionStoreChangeEvent.class.getSimpleName();
// Arbitrary fixed partition/offset for the synthetic consumer records built by these tests.
// Declared static since they are per-class constants, not per-instance state.
private static final int PARTITION = 1;
private static final int OFFSET = 1;
// Flow identifiers reused across tests; event uniqueness comes from the per-event transaction id below.
private static final String FLOW_GROUP = "flowGroup";
private static final String FLOW_NAME = "flowName";
private static final String FLOW_EXECUTION_ID = "123";
// Monitor under test, rebuilt by setup() before each test so mock invocation counts start at zero.
private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor;
// Monotonically increasing transaction id so each created event is treated as distinct by the monitor.
private int txidCounter = 0;

/**
* Note: The class methods are wrapped in a test specific method because the original methods are package protected
* and cannot be accessed by this class.
*/
class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {

public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads,
boolean isMultiActiveSchedulerEnabled) {
super(topic, config, mock(DagActionStore.class), mock(DagManager.class), numThreads, mock(FlowCatalog.class),
mock(Orchestrator.class), isMultiActiveSchedulerEnabled);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are mocking the dagManager and orchestrator, it is possible for us to actually test side effects after each processMessage by measuring the function that is called and asserting on it. See https://stackoverflow.com/questions/9841623/mockito-how-to-verify-method-was-called-on-an-object-created-within-a-method

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added verification calls to each test case to ensure the right method is called depending on the input.

}

protected void processMessageForTest(DecodeableKafkaRecord record) {
super.processMessage(record);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why to override when all the method do is call super's method? same for the startUp

Copy link
Copy Markdown
Contributor Author

@umustafi umustafi Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The methods are protected access but these are in a diff module so override it only to access the method. They are in this module to use the Kafka 09 version

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks better now. maybe we do not need super. there


}

protected void startUpForTest() {
super.startUp();
}
}

/**
 * Builds a monitor instance backed entirely by mocks, configured with dummy Kafka broker,
 * deserializer, state-store, and zookeeper settings sufficient to construct the consumer.
 */
MockDagActionStoreChangeMonitor createMockDagActionStoreChangeMonitor() {
  Config monitorConfig = ConfigFactory.empty()
      .withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
      .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
          ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
      .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore"))
      .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
  return new MockDagActionStoreChangeMonitor("dummyTopic", monitorConfig, 5, true);
}

// Called at the start of every test (rather than via a TestNG @BeforeMethod) so a fresh monitor is built
// and started each time; this resets every mock's invocation count to 0 before the verify(...) assertions.
public void setup() {
mockDagActionStoreChangeMonitor = createMockDagActionStoreChangeMonitor();
mockDagActionStoreChangeMonitor.startUpForTest();
}

/**
 * Ensures no NPE results from a HEARTBEAT type message carrying a null {@link DagActionValue}: the message
 * is filtered out as a heartbeat, so no DagManager or FlowCatalog methods are invoked.
 */
@Test
public void testProcessMessageWithHeartbeatAndNullDagAction() throws SpecNotFoundException {
  setup();
  Kafka09ConsumerClient.Kafka09ConsumerRecord heartbeatRecord =
      wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", null);
  mockDagActionStoreChangeMonitor.processMessageForTest(heartbeatRecord);
  verify(mockDagActionStoreChangeMonitor.getDagManager(), never()).handleResumeFlowRequest(anyString(), anyString(), anyLong());
  verify(mockDagActionStoreChangeMonitor.getDagManager(), never()).handleKillFlowRequest(anyString(), anyString(), anyLong());
  // Note: indirectly verifies submitFlowToDagManagerHelper is not reached, since that (unmocked) path
  // would query the mocked FlowCatalog for specs.
  verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), never()).getSpecs(any(URI.class));
}

/**
 * Ensures a HEARTBEAT type message is filtered out even when it carries non-empty flow information and a
 * dag action value: being a heartbeat, it must trigger no DagManager or FlowCatalog calls.
 */
@Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndNullDagAction")
public void testProcessMessageWithHeartbeatAndFlowInfo() throws SpecNotFoundException {
  setup();
  Kafka09ConsumerClient.Kafka09ConsumerRecord heartbeatRecord =
      wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME);
  mockDagActionStoreChangeMonitor.processMessageForTest(heartbeatRecord);
  verify(mockDagActionStoreChangeMonitor.getDagManager(), never()).handleResumeFlowRequest(anyString(), anyString(), anyLong());
  verify(mockDagActionStoreChangeMonitor.getDagManager(), never()).handleKillFlowRequest(anyString(), anyString(), anyLong());
  verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), never()).getSpecs(any(URI.class));
}

/**
 * Tests process message with an INSERT type message of a `launch` action. The mocked FlowCatalog should be
 * queried for the flow's spec exactly once — indirectly confirming the launch path
 * (submitFlowToDagManagerHelper) was taken — while no kill or resume requests reach the DagManager.
 */
@Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndFlowInfo")
public void testProcessMessageWithInsertLaunchType() throws SpecNotFoundException {
setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
// getSpecs is only reached on the launch path, so one invocation confirms the launch action was processed
verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(1)).getSpecs(any(URI.class));
}

/**
 * Tests process message with an INSERT type message of a `resume` action. It re-uses the same flow
 * information, however since a different txid is used every time the event will be considered unique and
 * submit a resume request to the DagManager.
 */
@Test (dependsOnMethods = "testProcessMessageWithInsertLaunchType")
public void testProcessMessageWithInsertResumeType() throws SpecNotFoundException {
setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(1)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class));
}

/**
 * Verifies an INSERT message carrying a `kill` dag action results in exactly one kill request to the
 * DagManager, and nothing else. Mirrors {@code testProcessMessageWithInsertResumeType} for the kill path.
 */
@Test (dependsOnMethods = "testProcessMessageWithInsertResumeType")
public void testProcessMessageWithInsertKillType() throws SpecNotFoundException {
  setup();
  Kafka09ConsumerClient.Kafka09ConsumerRecord killRecord =
      wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.KILL);
  mockDagActionStoreChangeMonitor.processMessageForTest(killRecord);
  verify(mockDagActionStoreChangeMonitor.getDagManager(), never()).handleResumeFlowRequest(anyString(), anyString(), anyLong());
  verify(mockDagActionStoreChangeMonitor.getDagManager(), times(1)).handleKillFlowRequest(anyString(), anyString(), anyLong());
  verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), never()).getSpecs(any(URI.class));
}

/**
 * Tests process message with an UPDATE type message of the 'launch' action above. Although processMessage
 * does not expect this message type it should handle it gracefully: the message is ignored, so no
 * DagManager or FlowCatalog methods are invoked.
 */
@Test (dependsOnMethods = "testProcessMessageWithInsertKillType")
public void testProcessMessageWithUpdate() throws SpecNotFoundException {
setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.UPDATE, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), times(0)).getSpecs(any(URI.class));
}

/**
 * Verifies a DELETE type message is ignored regardless of the flow information it carries: no DagManager
 * or FlowCatalog methods should be invoked.
 */
@Test (dependsOnMethods = "testProcessMessageWithUpdate")
public void testProcessMessageWithDelete() throws SpecNotFoundException {
  setup();
  Kafka09ConsumerClient.Kafka09ConsumerRecord deleteRecord =
      wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP, FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
  mockDagActionStoreChangeMonitor.processMessageForTest(deleteRecord);
  verify(mockDagActionStoreChangeMonitor.getDagManager(), never()).handleResumeFlowRequest(anyString(), anyString(), anyLong());
  verify(mockDagActionStoreChangeMonitor.getDagManager(), never()).handleKillFlowRequest(anyString(), anyString(), anyLong());
  verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), never()).getSpecs(any(URI.class));
}

/**
 * Builds a {@link DagActionStoreChangeEvent} for the given operation type, flow identifiers, and dag
 * action. Each call consumes a fresh transaction id so every event is considered distinct by the monitor.
 */
private DagActionStoreChangeEvent createDagActionStoreChangeEvent(OperationType operationType,
    String flowGroup, String flowName, String flowExecutionId, DagActionValue dagAction) {
  GenericStoreChangeEvent storeChangeEvent = new GenericStoreChangeEvent(
      getKeyForFlow(flowGroup, flowName, flowExecutionId), String.valueOf(txidCounter++),
      System.currentTimeMillis(), operationType);
  return new DagActionStoreChangeEvent(storeChangeEvent, flowGroup, flowName, flowExecutionId, dagAction);
}

/**
 * Form a key for events using the flow identifiers.
 * @return the flow identifiers joined with an '_' delimiter
 */
private String getKeyForFlow(String flowGroup, String flowName, String flowExecutionId) {
  return String.join("_", flowGroup, flowName, flowExecutionId);
}

/**
 * Wraps a {@link DagActionStoreChangeEvent} in a {@link Kafka09ConsumerClient.Kafka09ConsumerRecord} so it
 * can be fed to processMessage. If event creation fails, the exception is logged and the record is built
 * with a null value, allowing tests to exercise the monitor's handling of malformed messages.
 */
private Kafka09ConsumerClient.Kafka09ConsumerRecord wrapDagActionStoreChangeEvent(OperationType operationType, String flowGroup, String flowName,
    String flowExecutionId, DagActionValue dagAction) {
  DagActionStoreChangeEvent eventToProcess = null;
  try {
    eventToProcess =
        createDagActionStoreChangeEvent(operationType, flowGroup, flowName, flowExecutionId, dagAction);
  } catch (Exception e) {
    log.error("Exception while creating event ", e);
  }
  // TODO: handle partition and offset values better
  // Parameterized (rather than raw) ConsumerRecord avoids unchecked-conversion warnings.
  ConsumerRecord<String, DagActionStoreChangeEvent> consumerRecord = new ConsumerRecord<>(TOPIC, PARTITION, OFFSET,
      getKeyForFlow(flowGroup, flowName, flowExecutionId), eventToProcess);
  return new Kafka09ConsumerClient.Kafka09ConsumerRecord(consumerRecord);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -518,15 +518,15 @@ public static int maxFlowSpecUriLength() {
return URI_SCHEME.length() + ":".length() // URI separator
+ URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH + URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH;
}
}

/**
* Create a new FlowSpec object with the added property defined by path and value parameters
* @param path key for new property
* @param value
*/
public static FlowSpec createFlowSpecWithProperty(FlowSpec flowSpec, String path, String value) {
Config updatedConfig = flowSpec.getConfig().withValue(path, ConfigValueFactory.fromAnyRef(value));
return new Builder(flowSpec.getUri()).withConfig(updatedConfig).build();
/**
* Create a new FlowSpec object with the added property defined by path and value parameters
* @param path key for new property
* @param value
*/
public static FlowSpec createFlowSpecWithProperty(FlowSpec flowSpec, String path, String value) {
Config updatedConfig = flowSpec.getConfig().withValue(path, ConfigValueFactory.fromAnyRef(value));
return new Builder(flowSpec.getUri()).withConfig(updatedConfig).build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.apache.gobblin.runtime.api.FlowSpec.*;


public class FlowSpecTest {

Expand All @@ -51,7 +49,7 @@ public void testAddProperty() throws URISyntaxException {
properties.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "true");

FlowSpec originalFlowSpec = FlowSpec.builder(flowUri).withConfigAsProperties(properties).build();
FlowSpec updatedFlowSpec = createFlowSpecWithProperty(originalFlowSpec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
FlowSpec updatedFlowSpec = FlowSpec.Utils.createFlowSpecWithProperty(originalFlowSpec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);

Properties updatedProperties = updatedFlowSpec.getConfigAsProperties();
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.gobblin.service.monitoring;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Expand All @@ -27,6 +28,7 @@
import java.net.URISyntaxException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
Expand All @@ -42,8 +44,6 @@
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;

import static org.apache.gobblin.runtime.api.FlowSpec.*;


/**
* A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} schema to process Kafka messages received
Expand Down Expand Up @@ -79,10 +79,13 @@ public String load(String key) throws Exception {
dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build(cacheLoader);

protected DagActionStore dagActionStore;

@Getter
@VisibleForTesting
protected DagManager dagManager;
protected Orchestrator orchestrator;
protected boolean isMultiActiveSchedulerEnabled;
@Getter
@VisibleForTesting
protected FlowCatalog flowCatalog;

// Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer
Expand Down Expand Up @@ -200,7 +203,7 @@ protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, S
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
// Adds flowExecutionId to config to ensure they are consistent across hosts
FlowSpec updatedSpec = createFlowSpecWithProperty(spec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
FlowSpec updatedSpec = FlowSpec.Utils.createFlowSpecWithProperty(spec, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
this.orchestrator.submitFlowToDagManager(updatedSpec);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage());
Expand Down