
[Discussion] PyTorch Operator Improvement #1836

Open
kuizhiqing opened this issue Jun 17, 2023 · 36 comments

@kuizhiqing
Member

kuizhiqing commented Jun 17, 2023

Motivation

The current PyTorch Operator focuses on a one-process-per-pod architecture, which may not fully utilize the generic design of PyTorch and can underperform. When users adopt torchrun as the entrypoint, the operator does not function properly; see #1790.

Background

PyTorch Architecture

For distributed training with GPUs, frameworks like PyTorch and PaddlePaddle use a one-process-per-GPU architecture. They introduce a launcher process on each node to manage the GPU-bound processes.

Each process is identified by:
world_size: The total number of GPUs.
rank: The global rank of the process (from 0 to world_size - 1).
local_size: The number of GPUs on the node.
local_rank: The rank of the process within the node.

Thanks to argparse_util, these settings can also be passed through environment variables, which take higher priority. For this proposal, we do not distinguish between args and env vars.

Since version 1.9.0 (released in June 2021), PyTorch has provided torchrun, which is an alias for python -m torch.distributed.run.
Compared to torch.distributed.launch, torchrun provides:

  • Elasticity: Specify a range for the number of nodes and handle worker recovery.
  • Auto-rendezvous: rank and size are assigned automatically.

Note that when using torchrun, specifying rank is optional; rank can be provided with the --node_rank argument if desired, but torchrun will automatically assign ranks otherwise.

Megatron and Large Language Model Training

3D parallelism is commonly used to train large models in a distributed fashion. For example, a job could use:

  • DP (data parallel) = 8
  • TP (tensor parallel) = 8
  • PP (pipeline parallel) = 4

This requires 8x8x4 = 256 GPUs across 32 nodes of 8 GPUs each. The model has 8 (TP) x 4 (PP) = 32 partitions, with 8 replicas of the model taking different input data.

Communication overhead is typically TP > DP > PP. So we prefer to place the 8 TP partitions on the same node (DGX box), the 8 DP partitions on the same Switch, and PP partitions as close as possible.

The parallel groups (TP, DP, PP) are formed based on the rank of each worker, so the rank assigned to each worker (each bound to one GPU) effectively encodes its placement and proximity.
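As a rough illustration of this mapping, the sketch below derives a worker's parallel coordinates from its global rank. The ordering used here (TP varying fastest, then DP, then PP) is one common convention close to Megatron-LM's defaults and is an assumption; the framework's own group construction is the authoritative reference.

package main

import "fmt"

func main() {
    // Illustrative only: TP innermost, DP in the middle, PP outermost.
    const tp, dp, pp = 8, 8, 4
    fmt.Println("world_size =", tp*dp*pp) // 256
    for _, rank := range []int{0, 7, 8, 63, 64, 255} {
        tpRank := rank % tp        // position inside the TP group (same node)
        dpRank := (rank / tp) % dp // which data-parallel replica (same switch)
        ppRank := rank / (tp * dp) // pipeline stage
        fmt.Printf("rank %3d -> tp=%d dp=%d pp=%d\n", rank, tpRank, dpRank, ppRank)
    }
}

With this layout, ranks 0-7 (one full TP group) land on one node, matching the placement preference described above.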

The scheduler assigns resources to optimize proximity. The operator (or some other component) should then assign ranks accordingly.
Note also that in performance-critical scenarios, users typically run pods with the host network for maximum efficiency.

Current Design

The current operator design appears tailored for PyTorch versions before 1.9.0 and favors running without the torchrun module; specifically, it calculates WORLD_SIZE based on pod replica count, inconsistent with torchrun's methodology.

Proposed Design

The goal of the operator should be to natively support running Megatron-LM examples, such as the one shown here, using a YAML spec like the following:

apiVersion: "kubeflow.org/v1"
kind: PyTorchJob
metadata:
  name: pytorch-megatron
  namespace: kubeflow
spec:
  pytorchReplicaSpecs:
    Worker:
      replicas: 32
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              image: docker.io/example/pytorch-megatron:1
              imagePullPolicy: Always
              command:
                - "torchrun"
                - "pretrain_gpt.py"
                - "OTHER_ARGS"
              resources:
                limits:
                  example.com/gpu: 8

The operator will handle setting the distributed training environment variables for this example (a sketch of the injected environment follows the list below). It will:

  • Allocate 32 nodes * 8 GPUs = 256 total GPUs
  • Launch 8 processes per node
  • Set WORLD_SIZE=256 and LOCAL_SIZE=8
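The sketch below illustrates such an injected environment. Variable names are partly assumptions: WORLD_SIZE/LOCAL_SIZE follow the wording above, the PET_-prefixed forms are assumed to be the launcher options torch.distributed.run reads via argparse_util, and the master address/port values are placeholders; all should be verified against the PyTorch version in use.

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
)

// workerEnv sketches the per-pod environment for the 32-node x 8-GPU example.
// Names and values are illustrative, not the operator's current behaviour.
func workerEnv(nodeRank int) []corev1.EnvVar {
    return []corev1.EnvVar{
        {Name: "WORLD_SIZE", Value: "256"},       // 32 nodes * 8 GPUs
        {Name: "LOCAL_SIZE", Value: "8"},         // processes launched per pod
        {Name: "PET_NNODES", Value: "32"},        // torchrun --nnodes (assumed env form)
        {Name: "PET_NPROC_PER_NODE", Value: "8"}, // torchrun --nproc_per_node (assumed env form)
        {Name: "MASTER_ADDR", Value: "pytorch-megatron-worker-0"}, // hypothetical service name
        {Name: "MASTER_PORT", Value: "29500"},
        {Name: "NODE_RANK", Value: fmt.Sprint(nodeRank)}, // hypothetical; rank handling is the open question below
    }
}

func main() {
    for _, e := range workerEnv(0) {
        fmt.Printf("%s=%s\n", e.Name, e.Value)
    }
}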

Rank assignment remains open for discussion:

  1. Allow PyTorch to assign node ranks:
       - In dynamic mode, ranks are assigned by alphabetically sorted IP, partially optimizing locality
       - In elastic etcd mode, ranks are assigned randomly by join order
  2. Have the operator assign pod ranks by numerically sorted IP, assuming IP proximity correlates with locality
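A minimal sketch of option 2, with the operator ordering pod IPs numerically to derive node ranks. This is illustrative only, not existing operator code, and it assumes numerically adjacent IPs are topologically close, which real networks do not guarantee.

package main

import (
    "bytes"
    "fmt"
    "net"
    "sort"
)

// ranksByIP assigns node ranks by numerically sorted pod IPs
// (error handling for unparsable IPs is omitted for brevity).
func ranksByIP(podIPs []string) map[string]int {
    ips := make([]net.IP, 0, len(podIPs))
    for _, s := range podIPs {
        ips = append(ips, net.ParseIP(s).To16())
    }
    sort.Slice(ips, func(i, j int) bool { return bytes.Compare(ips[i], ips[j]) < 0 })
    ranks := make(map[string]int, len(ips))
    for rank, ip := range ips {
        ranks[ip.String()] = rank
    }
    return ranks
}

func main() {
    fmt.Println(ranksByIP([]string{"10.0.0.12", "10.0.0.2", "10.0.1.3"}))
    // numeric order: 10.0.0.2 -> 0, 10.0.0.12 -> 1, 10.0.1.3 -> 2
    // (a plain string sort would put 10.0.0.12 before 10.0.0.2)
}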

For maximum performance, users often implement custom rank assignments before calling torchrun or by modifying PyTorch's internal rank assignment logic.

Discussion List

  1. Should pod rank assignment be in the scope of the operator, or handled externally?
    There are arguments for either approach. The operator assigning ranks enables optimization but reduces flexibility. External rank assignment is more flexible but may lack optimization.

  2. Is only supporting torchrun and elastic mode acceptable, to simplify the operator? 
    I think Yes, focusing the operator on elastic distributed training with torchrun encourages a simple, robust design aligned with PyTorch's capabilities. Warnings could notify users if torchrun is not used.

  3. Is designating one pod as a master separately necessary for collective training?
    The current design is somewhat confusing and technically unnecessary.

  4. Should pod rank be omitted from pod names in elastic mode?
    Omitting rank from pod names in elastic mode decouples pod identity from rank, allowing ranks to change dynamically as nodes join or leave the cluster. This flexibility is important for elasticity.

This draft is not yet mature and there are many aspects that require further consideration. Comments and discussion are welcome.


@tenzen-y
Member

@kuizhiqing Thanks for creating this great issue! Can you create a proposal like https://github.com/kubeflow/katib/tree/master/docs/proposals?

@tenzen-y
Member

I think this great proposal is worth leaving as documentation.

@tenzen-y
Member

cc: @kubeflow/wg-training-leads

@gaocegege
Member

cc @zw0610

@kuizhiqing
Member Author

@tenzen-y Thank you for your kind feedback. I appreciate the suggestion to provide a documentation version, which could be a PR. However, I think it would be better to do that after further discussion, once we have reached some agreement on this issue. As you noted, I have posed some questions to prompt discussion, but I am still refining these ideas into a more robust proposal, especially for the elastic training part.

@tenzen-y
Member

@tenzen-y Thank you for your kind feedback. I appreciate the suggestion to provide a documentation version, which could be a PR. However, I think it would be better to do that after further discussion, once we have reached some agreement on this issue. As you noted, I have posed some questions to prompt discussion, but I am still refining these ideas into a more robust proposal, especially for the elastic training part.

I see. SGTM. I will leave my feedback later on this issue.

@johnugeorge
Member

Thanks for taking the time to write this detailed proposal. I will review it carefully soon. I have a few early questions:

  1. What are your thoughts on implementing nproc-per-node in K8s? In the operator, nodes are currently mapped one-to-one to replica pods. The same applies to elastic training as well. (Ref)

  2. What are your expectations for mapping the distributed args in the operator context?

  3. Will your new definition of WORLD_SIZE and RANK (with 3D parallelism) also hold in the default data parallelism case? How does the operator differentiate between the two schemes?

  4. Are we expecting users to always use torchrun in the container command?

@kuizhiqing
Member Author

@johnugeorge Thank you for your comment. Let me try to answer your questions:

  1. In the GPU case, nproc_per_node should equal the number of GPUs on each node, which equals the number of GPU resources declared. This can be implemented using an environment variable. In the CPU case, the operator can leave it unset, allowing the user to set it via arguments or a user-defined environment variable.

  2. For those DISTRIBUTED_ARGS:

    • --nproc_per_node was mentioned above
    • --nnodes should be the number of replicas, which is the number of pods
    • --node_rank is not obligatory for torchrun. That is the first question I proposed in the list.
    • --master_addr will be assigned the address of one pod, which could be rank 0 or another pod.
    • --master_port can be any available port, which can be achieved by random selection within a range.

Again, arguments can be set equally using environment variables.

  3. This will always follow the same scheme for the operator. I explained 3D parallelism for two main reasons:

    • The rank is important since parallel groups depend on the rank, which is proximity-sensitive.
    • The elastic granularity may not be 1. It should equal the DP degree in the simple case. For example, if we only have 31 nodes available, then we should continue the training with 28 nodes and DP=7 (see the arithmetic sketch after this list). This part is complicated, and I am still working on a better solution.
  4. That is the second question in my list. In my personal opinion, it is OK to just support torchrun since it is a generic feature of PyTorch.
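A minimal arithmetic sketch of the elastic-granularity point above, using the numbers from the Background section (8 GPUs per node, TP=8, PP=4, so one model replica spans 4 nodes); these figures are carried over as assumptions, not operator logic.

package main

import "fmt"

func main() {
    const gpusPerNode, tp, pp = 8, 8, 4
    nodesPerReplica := tp * pp / gpusPerNode // 4 nodes hold one full model replica
    available := 31                          // nodes currently schedulable
    dp := available / nodesPerReplica        // 7 data-parallel replicas fit
    usable := dp * nodesPerReplica           // 28 nodes actually used
    fmt.Printf("available=%d -> DP=%d, usable nodes=%d\n", available, dp, usable)
}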

@andreyvelich
Member

andreyvelich commented Jun 20, 2023

Thank you for driving this @kuizhiqing

In my personal opinion, it is OK to just support torchrun since it is a generic feature of PyTorch.

Does it mean that users can't use PyTorchJob without torchrun?
How can we deal with users that still use Kubeflow PyTorchJob with PyTorch versions older than 1.9.0?

@tenzen-y
Member

tenzen-y commented Jun 20, 2023

Is only supporting torchrun and elastic mode acceptable, to simplify the operator?
I think Yes, focusing the operator on elastic distributed training with torchrun encourages a simple, robust design aligned with PyTorch's capabilities. Warnings could notify users if torchrun is not used.

I think that this is a breaking change. So I would propose that we create a new v2beta1 PyTorchJob.

Rank assignment remains open for discussion:

Allow PyTorch to assign node ranks:

  • In dynamic mode, ranks are assigned by alphabetically sorted IP, partially optimizing locality
  • In elastic etcd mode, ranks are assigned randomly by join order

Have the operator assign pod ranks by numerically sorted IP, assuming IP proximity correlates with locality

At first, we can implement a simple PyTorchJob v2 assuming torchrun. Then we can evolve the API and controller to support elastic and dynamic semantics.

@kuizhiqing WDYT?

@andreyvelich
Member

@tenzen-y I am not sure that dropping support for older PyTorch versions in PyTorchJobs makes sense for our users.
Is it possible to support both distributed techniques: using torchrun, and using the old method with RANK and WORLD_SIZE?

@kuizhiqing
Member Author

kuizhiqing commented Jun 21, 2023

@andreyvelich @tenzen-y
I think we have three ways to define the entrypoint for training:

  1. python train.py - this is our current approach
  2. torch.distributed.launch train.py - this approach requires specifying the WORLD_SIZE and LOCAL_RANK environment variables.
  3. torch.distributed.run train.py - this approach does not require specifying WORLD_SIZE and LOCAL_RANK.

Overall, I want to unify all training to use torchrun, including regular training and elastic training, though unifying the approaches has been more difficult than anticipated.

For now, if we are talking about a short-term and practical solution: since approach 3 is compatible with approach 2, I prefer to make minor changes to support approaches 2 and 3 without a breaking change.
Approach 1 is unnecessary to support since its functionality can be achieved with approach 2 by setting nproc=1.

@tenzen-y
Member

Is it possible to support both distributed techniques: using torchrun, and using the old method with RANK and WORLD_SIZE?

I agree with you.
I think we can take the approach above (a new API version) if supporting both isn't possible.

For now, if we are talking about a short-term and practical solution: since approach 3 is compatible with approach 2, I prefer to make minor changes to support approaches 2 and 3 without a breaking change.
Approach 1 is unnecessary to support since its functionality can be achieved with approach 2 by setting nproc=1.

Overall it sounds good to me. But I have one more concern and question.
How will users know that they can not use approach 1 and that the training operator has stopped supporting it?
Documentation? Release notes? I think adding validation to check whether podSpec.containers.command contains torchrun or torch.distributed.run might be good. But if users define the command as the Dockerfile ENTRYPOINT, this validation doesn't work well.

@kuizhiqing @andreyvelich @johnugeorge Do you have any ideas?

@andreyvelich
Member

@tenzen-y @kuizhiqing Can we just verify the container start command and assign the appropriate env variables?

  • if the start command is python train.py, the controller sets WORLD_SIZE and RANK.
  • if the start command is torchrun or torch.distributed.launch train.py, the controller sets WORLD_SIZE and LOCAL_SIZE.
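A rough sketch of such a check follows (the helper name is hypothetical and this is not existing operator code); it can only inspect the podSpec command, not a Dockerfile ENTRYPOINT, which is the limitation discussed in this thread.

package main

import (
    "fmt"
    "strings"
)

// usesTorchrun reports whether the container command appears to launch torchrun
// (or its long forms, python -m torch.distributed.run / torch.distributed.launch).
func usesTorchrun(command []string) bool {
    joined := strings.Join(command, " ")
    return strings.Contains(joined, "torchrun") ||
        strings.Contains(joined, "torch.distributed.run") ||
        strings.Contains(joined, "torch.distributed.launch")
}

func main() {
    fmt.Println(usesTorchrun([]string{"torchrun", "pretrain_gpt.py"})) // true  -> WORLD_SIZE, LOCAL_SIZE
    fmt.Println(usesTorchrun([]string{"python", "train.py"}))          // false -> WORLD_SIZE, RANK
}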

@tenzen-y
Member

@tenzen-y @kuizhiqing Can we just verify the container start command and assign the appropriate env variables?

  • if the start command is python train.py, the controller sets WORLD_SIZE and RANK.
  • if the start command is torchrun or torch.distributed.launch train.py, the controller sets WORLD_SIZE and LOCAL_SIZE.

If users define commands as the Dockerfile ENTRYPOINT, that approach doesn't work well. But I don't have a good idea about which env vars we should set when podSpec.containers[0].command is empty.

@andreyvelich
Member

@tenzen-y I remember that we can extract the Docker entrypoint using the go-containerregistry library.
Like in Katib: https://github.com/kubeflow/katib/blob/master/pkg/webhook/v1beta1/pod/utils.go#L103-L111C25.
Not sure if that still works, but we can try to do the same for the Training Operator.
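A minimal sketch of that extraction with go-containerregistry (auth, caching, and error handling simplified); whether this covers all registry and image standards is exactly the evaluation question raised below.

package main

import (
    "fmt"
    "log"

    "github.com/google/go-containerregistry/pkg/authn"
    "github.com/google/go-containerregistry/pkg/name"
    "github.com/google/go-containerregistry/pkg/v1/remote"
)

// imageEntrypoint fetches the image config and returns its ENTRYPOINT and CMD.
func imageEntrypoint(image string) ([]string, []string, error) {
    ref, err := name.ParseReference(image)
    if err != nil {
        return nil, nil, err
    }
    img, err := remote.Image(ref, remote.WithAuthFromKeychain(authn.DefaultKeychain))
    if err != nil {
        return nil, nil, err
    }
    cfg, err := img.ConfigFile()
    if err != nil {
        return nil, nil, err
    }
    return cfg.Config.Entrypoint, cfg.Config.Cmd, nil
}

func main() {
    entrypoint, cmd, err := imageEntrypoint("docker.io/example/pytorch-megatron:1")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("ENTRYPOINT:", entrypoint, "CMD:", cmd)
}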

@tenzen-y
Member

tenzen-y commented Jun 22, 2023

@tenzen-y I remember that we can extract the Docker entrypoint using the go-containerregistry library. Like in Katib: https://github.com/kubeflow/katib/blob/master/pkg/webhook/v1beta1/pod/utils.go#L103-L111C25. Not sure if that still works, but we can try to do the same for the Training Operator.

Thanks for the great suggestion. Maybe we should evaluate the library: which standards does it support (Docker v1, Docker v2, OCI v1, OCI v2, lazy pulling, etc.)?

@johnugeorge
Member

If I understand correctly, the only change proposed is to change the semantics of how the WORLD_SIZE, RANK, LOCAL_RANK, and LOCAL_SIZE variables are populated.

  1. Having torchrun by default breaks the current deployments/examples. What is the way to provide support for the current scheme, or at a minimum provide backwards compatibility for previous deployments?
  2. In the default torchrun case, when nproc_per_node is set to the number of GPUs per node (>1), torchrun starts multiple processes, each with a LOCAL_RANK ranging from 0 to nproc_per_node - 1. In your proposal, how is this handled? Multiple containers in the same pod?
  3. In the current code, we do not extract or take any decision based on the Worker PodSpec. In your proposal, do you mean to extract the example.com/gpu info from the PodSpec to set nproc_per_node (the number of GPUs per node)?

@kuizhiqing
Member Author

If I understand correctly, the only change proposed is to change the semantics of how the WORLD_SIZE, RANK, LOCAL_RANK, and LOCAL_SIZE variables are populated.

  1. Having torchrun by default breaks the current deployments/examples. What is the way to provide support for the current scheme, or at a minimum provide backwards compatibility for previous deployments?
  2. In the default torchrun case, when nproc_per_node is set to the number of GPUs per node (>1), torchrun starts multiple processes, each with a LOCAL_RANK ranging from 0 to nproc_per_node - 1. In your proposal, how is this handled? Multiple containers in the same pod?
  3. In the current code, we do not extract or take any decision based on the Worker PodSpec. In your proposal, do you mean to extract the example.com/gpu info from the PodSpec to set nproc_per_node (the number of GPUs per node)?

@johnugeorge

  1. Current examples did not allow declaring more than one GPU resource, so if we take the GPU resource declaration into consideration when calculating WORLD_SIZE, it would be OK.
  2. The entrypoint of the pod would be torchrun, and it will run multiple processes in the same container; this is efficient and transparent to the operator.
  3. Yes, having the controller set env vars based on the PodSpec is OK for me. Do you have any concerns about it?
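A minimal sketch of deriving nproc_per_node from the declared GPU limit (the resource name example.com/gpu comes from the example spec above; this is illustrative only, not existing operator code):

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
)

// nprocPerNode reads the declared limit for a given accelerator resource name.
func nprocPerNode(c corev1.Container, gpuResource corev1.ResourceName) int64 {
    if q, ok := c.Resources.Limits[gpuResource]; ok {
        return q.Value()
    }
    return 1 // fall back to the current one-process-per-pod behaviour
}

func main() {
    c := corev1.Container{
        Name: "pytorch",
        Resources: corev1.ResourceRequirements{
            Limits: corev1.ResourceList{"example.com/gpu": resource.MustParse("8")},
        },
    }
    fmt.Println(nprocPerNode(c, "example.com/gpu")) // 8
}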

@tenzen-y
Member

  3. In the current code, we do not extract or take any decision based on the Worker PodSpec. In your proposal, do you mean to extract the example.com/gpu info from the PodSpec to set nproc_per_node (the number of GPUs per node)?
  3. Yes, having the controller set env vars based on the PodSpec is OK for me. Do you have any concerns about it?

I think we should add fields for setting nproc_per_node to the top level instead of taking example.com/gpu from the podSpec.
Then I have 2 concerns about taking example.com/gpu and setting the value as nproc_per_node:

  1. Which devices are counted toward nproc_per_node? AFAIK, the operator can not know in advance which of the resources specified in podSpec.containers.resources are accelerators, since any resource provided by device plugins (OSS ones, in-house ones, etc.) can be set there.
  2. That behavior is implicit and isn't declarative. In the K8s world, we should strive to make parameters declaratively configurable.
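A minimal sketch of what such an explicit, declarative field could look like (hypothetical field name and placement; the real API change would go through review):

package v1

// Illustrative fragment only: a top-level, declarative knob instead of
// inferring the value from resource limits.
type PyTorchJobSpecFragment struct {
    // NProcPerNode is the number of worker processes launched in each pod.
    // Defaults to 1, preserving the current one-process-per-pod behaviour.
    NProcPerNode *int32 `json:"nprocPerNode,omitempty"`
}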

@kuizhiqing
Member Author

@tenzen-y Yes, you are right; many cloud providers customize their resource declarations. Maybe adding an explicit field is better.

@zw0610
Member

zw0610 commented Jun 25, 2023

  3. In the current code, we do not extract or take any decision based on the Worker PodSpec. In your proposal, do you mean to extract the example.com/gpu info from the PodSpec to set nproc_per_node (the number of GPUs per node)?

  3. Yes, having the controller set env vars based on the PodSpec is OK for me. Do you have any concerns about it?

I think we should add fields for setting nproc_per_node to the top level instead of taking example.com/gpu from the podSpec. Then I have 2 concerns about taking example.com/gpu and setting the value as nproc_per_node:

  1. Which devices are counted toward nproc_per_node? AFAIK, the operator can not know in advance which of the resources specified in podSpec.containers.resources are accelerators, since any resource provided by device plugins (OSS ones, in-house ones, etc.) can be set there.
  2. That behavior is implicit and isn't declarative. In the K8s world, we should strive to make parameters declaratively configurable.

It would be better if such a new field for nproc_per_node were compatible with the current PyTorchJob API rather than requiring v2.

@tenzen-y
Member

@tenzen-y Yes, you are right; many cloud providers customize their resource declarations. Maybe adding an explicit field is better.

@kuizhiqing Could you update the proposal to include new fields for the nproc_per_node? Thanks for your effort.

@tenzen-y
Member

It would be better if such a new field for nproc_per_node were compatible with the current PyTorchJob API rather than requiring v2.

I'm ok with adding new fields to PyTorchJob v1 to maintain backward compatibility.

@johnugeorge
Member

Yes, a new field will help in maintaining backward compatibility while supporting all launchers.

nprocs_per_node is set to 1 by default. If this field is set to a value greater than 1, use the new way of setting WORLD_SIZE and other parameters; else, we will use the current way. The Master spec can be an explicit master or one of the pods (e.g. pod0).

@kuizhiqing
Member Author

@tenzen-y Yes, you are right; many cloud providers customize their resource declarations. Maybe adding an explicit field is better.

@kuizhiqing Could you update the proposal to include new fields for the nproc_per_node? Thanks for your effort.

I'm working on a version that tries to make a compatible change as we discussed; this PR is not yet fully tested in all cases.

@johnugeorge
Member

So we prefer to place the 8 TP partitions on the same node, the 8 DP partitions on the same DGX box

@kuizhiqing Curious which DGX box are you using? Is it a cluster of 32 DGX nodes with 8 GPUs each? You referred to having TP and DP on the same DGX?

Regarding the rank assignment discussion, what is the ideal topology and expected worker rank assignment, given your DGX cluster setup?

@tenzen-y
Member

So we prefer to place the 8 TP partitions on the same node, the 8 DP partitions on the same DGX box

@kuizhiqing Curious which DGX box are you using? Is it a cluster of 32 DGX nodes with 8 GPUs each? You referred to having TP and DP on the same DGX?

Regarding the rank assignment discussion, what is the ideal topology and expected worker rank assignment, given your DGX cluster setup?

@kuizhiqing Does DGX box mean DGX superPOD?

@kuizhiqing
Member Author

@johnugeorge @tenzen-y
Sorry, I made a mistake there: the DGX box is the same as the node here; for the DP partitions I meant Switch.
The principle still holds: the TP group should be placed closer than the DP group, which in turn should be closer than the PP group.

@kuizhiqing
Member Author

@johnugeorge @tenzen-y
BTW, PR #1840 is ready for review.

It implements part of what I propose to do; I'm continuing to work on it.

@johnugeorge
Member

@kuizhiqing After #1840, do you need more changes with respect to this issue? How do you handle rank assignment in your current deployment?

@kuizhiqing
Member Author

@johnugeorge With respect to this issue, I want to make a change that makes the separate master/worker declaration optional, as someone has already requested this feature.

Otherwise, I will leave the other ideas as long-term improvements.

@johnugeorge
Member

Do you plan to take this up within a week? The release feature freeze will happen by the end of next week.

@kuizhiqing
Member Author

@johnugeorge After evaluation, I think the feature making the master declaration optional may not be necessary in the current design, and it can be achieved with the current elastic mode. Overall, I have no more changes pending before the release. Thx.

@github-actions

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@tenzen-y
Member

/lifecycle frozen
