Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
public class JobExecutionPlan {
public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
public static final String JOB_PROPS_KEY = "job.props";
private static final int MAX_JOB_NAME_LENGTH = 255;
private static final int MAX_JOB_NAME_LENGTH = 128;

private final JobSpec jobSpec;
private final SpecExecutor specExecutor;
Expand Down Expand Up @@ -112,10 +112,10 @@ private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long fl
// job names are assumed to be unique within a dag.
int hash = flowInputPath.hashCode();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we still hash this one or use the whole path in jobName and then only hash that? Similar to Arjun's comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't reuse that hash because it prevents collisions where flows with the same jobnames within edges. Since we hash this string, it helps the second hash be unique as well to not run into the same scenario

jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, edgeId, hash);
// jobNames are commonly used as a directory name, which is limited to 255 characters
// jobNames are commonly used as a directory name, which is limited to 255 characters (account for potential prefixes added/file name lengths)
Copy link
Copy Markdown
Contributor

@arjun4084346 arjun4084346 Nov 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Should we add flowExecutionId before calculating hash?
  2. Why did get hash of flowInputPath ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. No because then it would create a different work directory per execution which would lead to jars not being cached etc.
  2. Not sure but I believe it was decided a while back that it would be more unique in the multihop/parallel job setting than edgeId or flowName, as you can have one edge map to multiple jobs.

if (jobName.length() >= MAX_JOB_NAME_LENGTH) {
// shorten job length to be 128 characters (flowGroup) + (hashed) flowName, hashCode length
jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName.hashCode(), hash);
// shorten job length but make it uniquely identifiable in multihop flows or concurrent jobs, max length 139 characters (128 flow group + hash)
jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, jobName.hashCode());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we avoid doing hash two times?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will only be done on long flow names so it should be fine, I wanted to get a hash of the jobName and not reuse the prior hash so that it can cover scenarios where there is the same input path in a multi hop flow with multiple edges

}
JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, flowSpec)).withConfig(jobConfig)
.withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ public void testCreateDagLongName() throws Exception {

Dag<JobExecutionPlan> dag1 = new JobExecutionPlanDagFactory().createDag(Arrays.asList(jobExecutionPlan));

Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY).length(), 142);
Assert.assertEquals(dag1.getStartNodes().get(0).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY).length(), 139);

}

@Test
Expand Down