Conversation

@dgdelahera commented May 3, 2022

Motivation of the PR

This PR allows the Kubernetes Pod Operator to run with sidecar containers.

When running the Kubernetes Pod Operator, the task waits until the pod is in a terminal state. When the Pod Operator runs a pod with multiple containers, the pod will not reach a terminal state until all of its containers have terminated. This is a problem when the main container runs alongside sidecar containers, because in many cases those containers never receive a signal to exit. An example would be an envoy container that handles the pod's networking.
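
To make the scenario concrete, here is a minimal sketch (not part of this PR) of a task whose pod runs the workload container alongside an envoy sidecar. The images, names, and the use of `full_pod_spec` are illustrative assumptions; the only point is that a second, long-running container shares the pod.

```python
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# Pod spec with the operator's workload container ("base" by convention)
# plus an envoy sidecar that keeps running after the workload exits.
pod_with_sidecar = k8s.V1Pod(
    spec=k8s.V1PodSpec(
        containers=[
            k8s.V1Container(name="base", image="my-workload:latest", command=["run-job"]),
            k8s.V1Container(name="envoy", image="envoyproxy/envoy:v1.22.0"),
        ],
    )
)

run_with_sidecar = KubernetesPodOperator(
    task_id="run_with_sidecar",
    name="run-with-sidecar",
    namespace="default",
    full_pod_spec=pod_with_sidecar,  # reconciled with the operator's own pod settings
)
```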

Example of a Pod with one container in a terminated state. Note that the Pod status is Running.

Name:        ********
Namespace:   ********
Priority:     0
Node:        ********
Start Time:   Thu, 28 Apr 2022 15:50:25 +0200
Labels:       airflow_version=2.1.4
              dag_id=accounts-refresh
              execution_date=2022-04-28T135015.7751980000-8035c9679
              kubernetes_pod_operator=True
              task_id=********
              try_number=1
Annotations:  kubernetes.io/psp: eks.privileged
Status:       Running
IP:           ********
IPs:
  IP:  ********
Containers:
  base:
    Container ID:   ********
    Image:          ********
    Image ID:       ********
    Port:           <none>
    Host Port:      <none>
    State:          Terminated
      Reason:       Error
      Exit Code:    1
      Started:      Thu, 28 Apr 2022 15:50:26 +0200
      Finished:     Thu, 28 Apr 2022 15:50:30 +0200
    Ready:          False
    Restart Count:  0
    Environment:
      ********
    Mounts:
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-xnh99 (ro)
  envoy:
    Container ID:   ********
    Image:         ********
    Image ID:       ********
    Port:           <none>
    Host Port:      <none>
    State:          Running
      Started:      Thu, 28 Apr 2022 15:50:26 +0200

The other part that doesn't work is the cleanup. At the moment, the task is marked as failed if the pod is not in the Succeeded phase. When running with sidecars, if the sidecars haven't stopped running, the pod never reaches that phase.

Changes

As we can see in the following diagram of the run workflow, we don't need to wait until the pod has finished, so I have removed that part. Our base container (the one executing the workload) will always have finished by the time we reach that point. If we are running with an xcom container, we now wait for that container instead.

[diagram: run workflow]

For the cleanup, we now check whether the base container has completed. If the container is not in that state, it means the task has failed.
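
As a rough sketch of what such a check could look like (this helper is hypothetical, not the code from this PR), the cleanup would inspect the base container's status on the latest read of the pod rather than the pod phase:

```python
from kubernetes.client import models as k8s


def base_container_succeeded(remote_pod: k8s.V1Pod, container_name: str = "base") -> bool:
    """Return True if the named container terminated with exit code 0,
    even while sidecars keep the pod phase at Running."""
    statuses = (remote_pod.status and remote_pod.status.container_statuses) or []
    for status in statuses:
        if status.name == container_name:
            terminated = status.state.terminated
            return terminated is not None and terminated.exit_code == 0
    return False
```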

The changes have been tested for more than a week on an AWS EKS cluster.

@dgdelahera requested a review from jedcunningham as a code owner May 3, 2022 14:55
@boring-cyborg bot added the provider:cncf-kubernetes and area:providers labels May 3, 2022
@boring-cyborg (bot) commented May 3, 2022

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide and consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@jedcunningham (Member)

I'm wondering if instead we should just check the base container state, not just wait until any container finishes.

We also need to be mindful of how this will play with delete_worker_pods being false. K8s sidecars can get so messy.

@jedcunningham requested a review from dstandish May 6, 2022 23:22
@dgdelahera (Author)

My first approach was to check the base container. I think it is the simplest one, but I wasn't sure if that approach was generic enough (sidecars finishing before the workload). But yeah, we can tell users to run one workload per pod (which is good practice), and if they have any sidecar that finishes before the workload they should use an init container instead.

@dgdelahera (Author)

Hi @jedcunningham. Following your suggestion to watch the base container, I have updated the PR. Now I think we are keeping it simple.

In addition, I think that we can just check whether the container is not running, because that implies that the container is in a terminated state. According to the documentation, there are three container states (Waiting, Running, Terminated), and once a container is in the Running state it can only move to the Terminated state.
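
For reference, the container status object exposes exactly those three states, so the "not running" check boils down to something like this sketch (a simplified stand-in for the container_is_running check referenced later in the review, not the provider's actual implementation):

```python
from kubernetes.client import models as k8s


def container_is_running(pod: k8s.V1Pod, container_name: str) -> bool:
    """A container status sets exactly one of waiting/running/terminated;
    once the container has started, running=None means it has terminated."""
    statuses = (pod.status and pod.status.container_statuses) or []
    for status in statuses:
        if status.name == container_name:
            return status.state.running is not None
    return False
```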

@potiuk (Member) commented May 10, 2022

Hey @jedcunningham - not sure if you are available :) but if you are, I am preparing a release of providers now and I think this one might be a good one to include.

@potiuk (Member) commented May 11, 2022

Seems we have some related failures - it will have to wait for the next round of providers.

         while True:
             remote_pod = self.read_pod(pod)
-            if remote_pod.status.phase in PodPhase.terminal_states:
+            if not self.container_is_running(pod=remote_pod, container_name=base_container):
Review comment (Contributor):

this changes the meaning of this method from "await pod completion" to "await container completion"

Review comment (Member):

Probably worth having a separate method for doing it based on container, if we go that way, so we don't introduce a breaking change here.

             time.sleep(1)

-    def await_pod_completion(self, pod: V1Pod) -> V1Pod:
+    def await_pod_completion(self, pod: V1Pod, base_container: str) -> V1Pod:
Review comment (Contributor):

if i were gonna add this param, i would not call it 'base_container'. this method doesn't know that the caller is providing the "base" container, so better to just be generic with container_name or something. BUT i am not sure we should change this method anyway, since the purpose is to wait for pod termination.
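
One way to act on that suggestion, sketched here with hypothetical naming, would be to add a container-level wait alongside the existing pod-level one instead of changing await_pod_completion itself. The sketch reuses the container_is_running helper sketched earlier and takes the pod-reading call as a parameter:

```python
import time
from typing import Callable

from kubernetes.client import models as k8s


def await_container_completion(
    read_pod: Callable[[], k8s.V1Pod],
    container_name: str,
    poll_interval: float = 1.0,
) -> k8s.V1Pod:
    """Poll until the named container stops running and return the last
    observed pod; a container-level sibling of 'await pod completion'."""
    while True:
        remote_pod = read_pod()
        if not container_is_running(remote_pod, container_name):  # see sketch above
            return remote_pod
        time.sleep(poll_interval)
```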

@dstandish (Contributor)

i'm not seeing the await all containers option?

@jedcunningham (Member)

I still have concerns with how this will interact with delete_worker_pods being false, seems like a good way to get orphaned still-running pods in your cluster.

@dgdelahera (Author) commented Jun 9, 2022

Hi @dstandish and @jedcunningham. Thank you for your review, and sorry for the delay, it's been a busy week. Going back to this PR, I have made a diagram to clarify it, because I think that we can delete this method call: https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L418. That point can only be reached when the base container has finished, so we don't need to wait until the pod has finished. This will keep working with single containers, and it will fix the behavior when running with sidecars.

[diagram: Untitled Diagram.drawio]

Regarding the delete_worker_pods interaction, I think that the best way to do the cleanup is to inspect the base container state (the one executing the workload) instead of the pod state.

Let me know what you think about this :)

@dgdelahera (Author)

I have found another behavior that isn't working as expected with multiple containers. During the cleanup(), if the pod isn't in the Succeeded phase, the task will fail. But as you can see in the following pod status, the base container is terminated. I will make some changes and ask for a new review.

            'container_statuses': [{'container_id': '...',
                                    'image': '...',
                                    'image_id': '...',
                                    'last_state': {'running': None,
                                                   'terminated': None,
                                                   'waiting': None},
                                    'name': 'base',
                                    'ready': False,
                                    'restart_count': 0,
                                    'started': False,
                                    'state': {'running': None,
                                              'terminated': {'container_id': '...',
                                                             'exit_code': 0,
                                                             'finished_at': datetime.datetime(2022, 6, 14, 9, 35, 17, tzinfo=tzlocal()),
                                                             'message': None,
                                                             'reason': 'Completed',
                                                             'signal': None,
                                                             'started_at': datetime.datetime(2022, 6, 14, 9, 35, 2, tzinfo=tzlocal())},
                                              'waiting': None}},
                                   {'container_id': '...',
                                    'image': '...',
                                    'image_id': '...',
                                    'last_state': {'running': None,
                                                   'terminated': None,
                                                   'waiting': None},
                                    'name': 'envoy',
                                    'ready': True,
                                    'restart_count': 0,
                                    'started': True,
                                    'state': {'running': {'started_at': datetime.datetime(2022, 6, 14, 9, 35, 2, tzinfo=tzlocal())},
                                              'terminated': None,
                                              'waiting': None}}],
            'ephemeral_container_statuses': None,
            'host_ip': '...',
            'init_container_statuses': None,
            'message': None,
            'nominated_node_name': None,
            'phase': 'Running',

@dgdelahera changed the title from "Add await all containers option to Kubernetes Pod Operator" to "Kubernetes Pod Operator support running with multiple containers" Jun 14, 2022
@dstandish (Contributor)

@dgdelahera re usage of remote pod there's a related PR in flight #22092

Also, when we patch the pod, which is sometimes done in cleanup, we need to use the latest remotely read pod (i.e. remote_pod), otherwise we might get an error from patching with an outdated pod.
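
A hedged illustration of that point (this helper and its arguments are made up for the example, not part of the provider): re-read the pod immediately before patching so the object sent back to the API server is not stale.

```python
from kubernetes import client


def patch_pod_labels(core_v1: client.CoreV1Api, name: str, namespace: str, labels: dict) -> None:
    # Read the current pod from the API server first...
    remote_pod = core_v1.read_namespaced_pod(name=name, namespace=namespace)
    remote_pod.metadata.labels = {**(remote_pod.metadata.labels or {}), **labels}
    # ...and patch using that freshly read object, not a pod object built earlier.
    core_v1.patch_namespaced_pod(name=name, namespace=namespace, body=remote_pod)
```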

@dgdelahera (Author)

@dstandish My PR doesn't modify that workflow: https://github.com/apache/airflow/pull/23450/files#diff-947f9f98b4aedccee6dd81558568c350c738f5da050aad1224d8b48ec43677cdR418. There we are still reading the latest pod state before calling the cleanup.

@dstandish (Contributor)

> Hi @dstandish and @jedcunningham. Thank you for your review, and sorry for the delay, it's been a busy week. Going back to this PR, I have made a diagram to clarify it, because I think that we can delete this method call: https://github.com/apache/airflow/blob/main/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L418. That point can only be reached when the base container has finished, so we don't need to wait until the pod has finished. This will keep working with single containers, and it will fix the behavior when running with sidecars.

So, when there is an xcom sidecar, it is terminated when we call self.extract_xcom(pod=self.pod). So after we terminate the xcom sidecar, we need to wait for the pod to complete before proceeding to cleanup.
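
Put as a sketch, the ordering being described looks roughly like this. The method names follow ones already used in this thread or on the operator (pod_manager, extract_xcom, cleanup), but the exact signatures and control flow are simplified assumptions, not the operator's actual code:

```python
def run_and_cleanup(self):
    # 1. Wait only for the workload ("base") container to finish; sidecars
    #    may still be running at this point.
    self.pod_manager.await_container_completion(pod=self.pod, container_name="base")

    # 2. If xcom is enabled, extract the result; this call also terminates
    #    the xcom sidecar.
    result = None
    if self.do_xcom_push:
        result = self.extract_xcom(pod=self.pod)

    # 3. Only after the xcom sidecar has been terminated does it make sense
    #    to wait for the pod itself, then clean up with the latest remote pod.
    remote_pod = self.pod_manager.await_pod_completion(self.pod)
    self.cleanup(pod=self.pod, remote_pod=remote_pod)
    return result
```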

Separately, it seems either your description or the PR needs updating, because it's talking about adding an "await all containers" option but I'm not seeing that in the PR.

It might be helpful, in evaluating your approach, to have an example too, demonstrating how your change would help with multiple containers / sidecars.

@dgdelahera (Author)

@dstandish Oh, I didn't take into account the xcom sidecar. I will think about it again, and I will also update the description. Thank you so much for the feedback.

@dgdelahera (Author)

@dstandish @jedcunningham I have updated the description and also added a check for the xcom container. If you agree with this approach, I will add the unit tests for the new methods. Thanks 😄

@dgdelahera requested a review from dstandish June 22, 2022 15:37
@dgdelahera (Author)

Hi @jedcunningham @dstandish, would you mind doing another review? Thanks

@potiuk (Member) commented Aug 27, 2022

> Hi @jedcunningham @dstandish, would you mind doing another review? Thanks

I think it would be good to make the tests pass first.

@github-actions (bot)

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions bot added the stale label (Stale PRs per the .github/workflows/stale.yml policy file) Oct 12, 2022
@github-actions bot closed this Oct 18, 2022

Labels

area:providers · provider:cncf-kubernetes · stale
