Skip to content

Commit

Permalink
Airflow armada (#191)
Browse files Browse the repository at this point in the history
  • Loading branch information
Martynas Asipauskas authored and GitHub Enterprise committed Aug 8, 2024
1 parent 407b849 commit 548a9e2
Show file tree
Hide file tree
Showing 17 changed files with 424 additions and 1,516 deletions.
3 changes: 1 addition & 2 deletions third_party/airflow/armada/auth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import Dict, Any, Tuple, Protocol

from typing import Any, Dict, Protocol, Tuple

""" We use this interface for objects fetching Kubernetes auth tokens. Since
it's used within the Trigger, it must be serialisable."""
Expand Down
144 changes: 144 additions & 0 deletions third_party/airflow/armada/log_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
from __future__ import annotations

import math
import threading
from http.client import HTTPResponse
from typing import Dict, List, Optional, Tuple, cast

import pendulum
from airflow.utils.log.logging_mixin import LoggingMixin
from armada.auth import TokenRetriever
from kubernetes import client, config
from pendulum import DateTime
from pendulum.parsing.exceptions import ParserError
from urllib3.exceptions import HTTPError


class KubernetesPodLogManager(LoggingMixin):
"""Monitor logs of Kubernetes pods asynchronously."""

CLIENTS_LOCK = threading.Lock()
CLIENTS: Dict[str, client.CoreV1Api] = {}

def __init__(
self,
token_retriever: Optional[TokenRetriever] = None,
):
"""
Create PodLogManger.
:param token_retriever: Retrieves auth tokens
"""
super().__init__()
self._token_retriever = token_retriever

def _k8s_client(self, k8s_context) -> client.CoreV1Api:
"""
K8S Clients are expensive to initialize (especially loading configuration).
We want to setup as few of them as possible, instead of one per long-running job.
We cache them in class level cache.
Access to this method can be from multiple-threads.
"""
if k8s_context not in KubernetesPodLogManager.CLIENTS:
with KubernetesPodLogManager.CLIENTS_LOCK:
configuration = client.Configuration()
config.load_kube_config(
client_configuration=configuration, context=k8s_context
)
k8s_client = client.CoreV1Api(
api_client=client.ApiClient(configuration=configuration)
)
k8s_client.api_client.configuration.api_key_prefix["authorization"] = (
"Bearer"
)
KubernetesPodLogManager.CLIENTS[k8s_context] = k8s_client
return KubernetesPodLogManager.CLIENTS[k8s_context]

def _with_bearer_auth(self, client):
client.api_client.configuration.api_key["authorization"] = (
self._token_retriever.get_token()
)

def fetch_container_logs(
self,
*,
k8s_context: str,
namespace: str,
pod: str,
container: str,
since_time: Optional[DateTime],
) -> Optional[DateTime]:
"""
Fetches container logs, do not follow container logs.
"""
client = self._k8s_client(k8s_context)
self._with_bearer_auth(client)
since_seconds = (
math.ceil((pendulum.now() - since_time).total_seconds())
if since_time
else None
)
try:
logs = client.read_namespaced_pod_log(
namespace=namespace,
name=pod,
container=container,
follow=False,
timestamps=True,
since_seconds=since_seconds,
_preload_content=False,
)
if logs.status == 404:
self.log.warning(f"Unable to fetch logs - pod {pod} has been deleted.")
return since_time
except HTTPError as e:
self.log.exception(f"There was an error reading the kubernetes API: {e}.")
raise

return self._stream_logs(container, since_time, logs)

def _stream_logs(
self, container: str, since_time: Optional[DateTime], logs: HTTPResponse
) -> Optional[DateTime]:
messages: List[str] = []
message_timestamp = None
try:
chunk = logs.read()
lines = chunk.decode("utf-8", errors="backslashreplace").splitlines()
for raw_line in lines:
line_timestamp, message = self._parse_log_line(raw_line)

if line_timestamp: # detect new log-line (starts with timestamp)
if since_time and line_timestamp <= since_time:
continue
self._log_container_message(container, messages)
messages.clear()
message_timestamp = line_timestamp
messages.append(message)
except HTTPError as e:
self.log.warning(
f"Reading of logs interrupted for container {container} with error {e}."
)

self._log_container_message(container, messages)
return message_timestamp

def _log_container_message(self, container: str, messages: List[str]):
if messages:
self.log.info("[%s] %s", container, "\n".join(messages))

def _parse_log_line(self, line: bytes) -> Tuple[DateTime | None, str]:
"""
Parse K8s log line and returns the final state.
:param line: k8s log line
:return: timestamp and log message
"""
timestamp, sep, message = line.strip().partition(" ")
if not sep:
return None, line
try:
last_log_time = cast(DateTime, pendulum.parse(timestamp))
except ParserError:
return None, line
return last_log_time, message
Loading

0 comments on commit 548a9e2

Please sign in to comment.