Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2174] GoT YarnService Integration with DynamicScaling #4077

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.loadgen.dynamic;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
import org.apache.gobblin.temporal.dynamic.ScalingDirective;
import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;


/**
* A dummy implementation of {@link ScalingDirectiveSource} that returns a fixed set of {@link ScalingDirective}s.
*/
public class DummyScalingDirectiveSource implements ScalingDirectiveSource {
private int count = 0;
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
private final Optional<ProfileDerivation> derivedFromBaseline;
public DummyScalingDirectiveSource() {
this.derivedFromBaseline = Optional.of(new ProfileDerivation(WorkforceProfiles.BASELINE_NAME,
new ProfileOverlay.Adding(
new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, "2048"),
new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2")
)
));
}

/**
* @return {@link ScalingDirective}s - an impl. may choose to return all known directives or to give only newer
* directives than previously returned
*/
@Override
public List<ScalingDirective> getScalingDirectives() {
// Note - profile should exist already pr is derived from other profile
if (this.count == 0) {
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
this.count++;
return Arrays.asList(
new ScalingDirective("firstProfile", 3, System.currentTimeMillis(), this.derivedFromBaseline),
new ScalingDirective("secondProfile", 2, System.currentTimeMillis(), this.derivedFromBaseline)
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
);
} else if (this.count == 1) {
this.count++;
return Arrays.asList(
new ScalingDirective("firstProfile", 5, System.currentTimeMillis()),
new ScalingDirective("secondProfile", 3, System.currentTimeMillis())
);
} else if (this.count == 2) {
this.count++;
return Arrays.asList(
new ScalingDirective("firstProfile", 5, System.currentTimeMillis()),
new ScalingDirective("secondProfile", 3, System.currentTimeMillis())
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
);
}
return new ArrayList<>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.yarn;

import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import com.google.common.base.Optional;
import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.temporal.dynamic.ScalingDirective;
import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
import org.apache.gobblin.temporal.dynamic.WorkerProfile;
import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;

/**
* Service for dynamically scaling Gobblin containers running on YARN.
* This service manages workforce staffing and plans, and requests new containers as needed.
*/
@Slf4j
public class DynamicScalingYarnService extends YarnService {

/** this holds the current count of containers requested for each worker profile */
private final WorkforceStaffing workforceStaffing;
/** this holds the current total workforce plan as per latest received scaling directives */
private final WorkforcePlan workforcePlan;

public DynamicScalingYarnService(Config config, String applicationName, String applicationId,
YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception {
super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);

this.workforceStaffing = WorkforceStaffing.initialize(getInitialContainers());
this.workforcePlan = new WorkforcePlan(getConfig(), getInitialContainers());
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Revises the workforce plan and requests new containers based on the given scaling directives.
*
* @param scalingDirectives the list of scaling directives
*/
public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> scalingDirectives) {
this.workforcePlan.reviseWhenNewer(scalingDirectives);
StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.workforceStaffing);
requestNewContainersForStaffingDeltas(deltas);
}

private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
deltas.getPerProfileDeltas().forEach(profileDelta -> {
if (profileDelta.getDelta() > 0) {
WorkerProfile workerProfile = profileDelta.getProfile();
String profileName = workerProfile.getName();
int curNumContainers = this.workforceStaffing.getStaffing(profileName).orElse(0);
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
int delta = profileDelta.getDelta();
log.info("Requesting {} new containers for profile {} having currently {} containers", delta,
profileName, curNumContainers);
requestContainersForWorkerProfile(workerProfile, delta);
// update our staffing after requesting new containers
this.workforceStaffing.reviseStaffing(profileName, curNumContainers + delta, System.currentTimeMillis());
}
// TODO: Decide how to handle negative deltas
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
});
}

private synchronized void requestContainersForWorkerProfile(WorkerProfile workerProfile, int numContainers) {
int containerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
int containerCores = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
long allocationRequestId = generateAllocationRequestId(workerProfile);
requestContainers(numContainers, Resource.newInstance(containerMemoryMbs, containerCores), Optional.of(allocationRequestId));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.yarn;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.fs.FileSystem;

import com.typesafe.config.Config;
import com.google.common.util.concurrent.AbstractIdleService;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
import org.apache.gobblin.temporal.dynamic.ScalingDirective;
import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;


/**
* This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing
* the latest scaling directives to the {@link DynamicScalingYarnService} for processing.
*/
@Slf4j
public class DynamicScalingYarnServiceManager extends AbstractIdleService {

private final String DYNAMIC_SCALING_PREFIX = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.";
private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX + "directives.dir";
private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + "errors.dir";
private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX + "initial.delay";
private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60;
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
private final String DYNAMIC_SCALING_POLLING_INTERVAL = DYNAMIC_SCALING_PREFIX + "polling.interval";
private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
private final Config config;
DynamicScalingYarnService dynamicScalingYarnService;
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
private final ScheduledExecutorService dynamicScalingExecutor;
private final FileSystem fs;

public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) {
this.config = appMaster.getConfig();
this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService();
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
com.google.common.base.Optional.of("DynamicScalingExecutor")));
this.fs = appMaster.getFs();
}

@Override
protected void startUp() {
int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL,
DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
int initialDelay = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_INITIAL_DELAY,
DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS);

ScalingDirectiveSource fsScalingDirectiveSource = new FsScalingDirectiveSource(
this.fs,
this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR),
Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR))
);

// TODO: remove this line later
// Using for testing purposes only
ScalingDirectiveSource scalingDirectiveSource = new DummyScalingDirectiveSource();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be helpful for unit testing if, rather than hard-coding, this class took the ScalingDirectiveSource FQ class name? I see that could be harder based on the ctor params.

As a simpler alternative, make DynamicScalingYarnServiceManager abstract w/ a method

abstract protected ScalingDirectiveSource createScalingDirectiveSource();

and then the concrete FsSourceDynamicScalingYarnServiceManager would hard code the ScalingDirectiveSource class. you could have a different concrete DSYSM using DummyScalingDirectiveSource. one of those such FQ class names would be a param.

... which reminds me.... how is this DSYSM created and initialized at present?

Copy link
Contributor Author

@Blazer-007 Blazer-007 Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am using DummyScalingDirectiveSource() to launch containers at runtime when I run a job to test the complete e2e flow.

... which reminds me.... how is this DSYSM created and initialized at present?

here after starting yarnservice - https://github.com/apache/gobblin/blob/master/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java#L102 we initialize other service classes whose names are passed through config

public static final String APP_MASTER_SERVICE_CLASSES = GOBBLIN_YARN_PREFIX + "app.master.serviceClasses";

Copy link
Contributor Author

@Blazer-007 Blazer-007 Nov 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with point of creating abstract class, let me add that in next commit.
Refactored the code to use AbstractDSYSM and also added unit tests.


log.info("Starting the " + this.getClass().getSimpleName());
log.info("Scheduling the dynamic scaling task with an interval of {} seconds", scheduleInterval);

this.dynamicScalingExecutor.scheduleAtFixedRate(
new GetScalingDirectivesRunnable(this.dynamicScalingYarnService, scalingDirectiveSource),
initialDelay, scheduleInterval, TimeUnit.SECONDS
);
}

/** Logs that this service is stopping, then hands the polling executor to {@code ExecutorsUtils} for shutdown. */
@Override
protected void shutDown() {
  String serviceName = this.getClass().getSimpleName();
  log.info("Stopping the " + serviceName);
  ExecutorsUtils.shutdownExecutorService(
      this.dynamicScalingExecutor,
      com.google.common.base.Optional.of(log));
}

/**
* A {@link Runnable} that gets the scaling directives from the {@link ScalingDirectiveSource} and passes them to the
* {@link DynamicScalingYarnService} for processing.
*/
@AllArgsConstructor
static class GetScalingDirectivesRunnable implements Runnable {
private final DynamicScalingYarnService dynamicScalingYarnService;
private final ScalingDirectiveSource scalingDirectiveSource;

@Override
public void run() {
try {
List<ScalingDirective> scalingDirectives = scalingDirectiveSource.getScalingDirectives();
if (!scalingDirectives.isEmpty()) {
dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
}
} catch (IOException e) {
log.error("Failed to get scaling directives", e);
}
Blazer-007 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public GobblinTemporalApplicationMaster(String applicationName, String applicati
protected YarnService buildTemporalYarnService(Config config, String applicationName, String applicationId,
YarnConfiguration yarnConfiguration, FileSystem fs)
throws Exception {
return new YarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus);
return new DynamicScalingYarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus);
}

/**
Expand Down
Loading
Loading