Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,17 @@ public boolean isScheduled() {
return getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY);
}

/**
* Create a new FlowSpec object with the added property defined by path and value parameters
* @param path key for new property
* @param value
*/
public FlowSpec addProperty(String path, String value) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm , one last comment.
lets rename it to createFlowSpecWithProperty and move it to FlowSpec.Utils

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Properties properties = this.getConfigAsProperties();
properties.setProperty(path, value);
return new Builder(this.getUri()).withConfigAsProperties(properties).build();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you intend to clone a new FlowSpec?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should create a new FlowSpec with the shared configAsProperties object. I think you did not mean to do that.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, i think we should add configs to config field also. that is a better source of truth.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I have to clone and create a new FlowSpec because type safe Configs are immutable plus we have made Config a final variable (All variables are final). Rather than change what may be a core idea that FlowSpecs are immutable, I am creating a clone which has all the previous properties and URI plus the new one.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can also update the Config field, I thought it would get sync'd with the Properties but doesn't look like it. Making a change and updating test too.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern here is mostly that it would create a decent amount of object churn, given the size of configs it should lead to an increase of jvm GC given that this would happen once per execution.

That being said it might be an overoptimization to enforce such a rule until you instrument results on a high load, the alternative would be messy, need to add a way for compileSpec() to take in a flow execution ID and enforce using that ID instead of using the provided one in the flow config or from the system time, which would also lead to a bit of an anti-pattern of compiler expecting to use either the flow config's ID (that you provided here) or it's own generated ID.

@arjun4084346 arjun4084346 Nov 3, 2023

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I do not prefer cloning too, but config is final here. Not sure if it should be.
Regarding the extra objects, the spec created in line 201 should lose reference by line 204, so we should be good at least in this case.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with the name changed to createFlowSpecWithProperty it becomes more clear that this method is creating a new spec and callers should not hold the reference of the original spec. Maybe we can write it in the methods javadoc @umustafi

}

@Slf4j
public static class Utils {
private final static String URI_SCHEME = "gobblin-flow";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.apache.gobblin.runtime.api;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.FlowId;
import org.testng.Assert;
import org.testng.annotations.Test;


public class FlowSpecTest {

/**
* Tests that the addProperty() function to ensure the new flowSpec returned has the original properties and updated
* ones
* @throws URISyntaxException
*/
@Test
public void testAddProperty() throws URISyntaxException {
String flowGroup = "myGroup";
String flowName = "myName";
String flowExecutionId = "myId";
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);

// Create properties to be used as config
Properties properties = new Properties();
properties.setProperty(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
properties.setProperty(ConfigurationKeys.FLOW_NAME_KEY, flowName);
properties.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "true");

FlowSpec originalFlowSpec = FlowSpec.builder(flowUri).withConfigAsProperties(properties).build();
FlowSpec updatedFlowSpec = originalFlowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);

Properties updatedProperties = updatedFlowSpec.getConfigAsProperties();
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId);
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_GROUP_KEY), flowGroup);
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_NAME_KEY), flowName);
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY), "true");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@

package org.apache.gobblin.service.monitoring;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
Expand Down Expand Up @@ -165,7 +163,7 @@ protected void processMessage(DecodeableKafkaRecord message) {
throw new RuntimeException(String.format("Received LAUNCH dagAction while not in multi-active scheduler "
+ "mode for flowAction: %s", dagAction));
}
submitFlowToDagManagerHelper(flowGroup, flowName);
submitFlowToDagManagerHelper(flowGroup, flowName, flowExecutionId);
} else {
log.warn("Received unsupported dagAction {}. Expected to be a KILL, RESUME, or LAUNCH", dagActionType);
this.unexpectedErrors.mark();
Expand All @@ -192,14 +190,16 @@ protected void processMessage(DecodeableKafkaRecord message) {
dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
}

protected void submitFlowToDagManagerHelper(String flowGroup, String flowName) {
protected void submitFlowToDagManagerHelper(String flowGroup, String flowName, String flowExecutionId) {
// Retrieve job execution plan by recompiling the flow spec to send to the DagManager
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
FlowSpec spec = null;
try {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
this.orchestrator.submitFlowToDagManager(spec);
// Adds flowExecutionId to config to ensure they are consistent across hosts
FlowSpec updatedSpec = spec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
this.orchestrator.submitFlowToDagManager(updatedSpec);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {}. Exception {}", flowId, e.getMessage());
this.failedFlowLaunchSubmissions.mark();
Expand Down