src/patchset.py: Implement Patchset service #342

Merged · 1 commit · Mar 28, 2024
2 changes: 2 additions & 0 deletions .gitignore
@@ -2,3 +2,5 @@
.docker-env
data
*.pyc
*.venv

9 changes: 9 additions & 0 deletions config/kernelci.toml
@@ -13,6 +13,15 @@ kdir = "/home/kernelci/data/src/linux"
output = "/home/kernelci/data/output"
storage_config = "docker-host"

[patchset]
kdir = "/home/kernelci/data/src/linux-patchset"
output = "/home/kernelci/data/output"
storage_config = "docker-host"
patchset_tmp_file_prefix = "kernel-patch"
patchset_short_hash_len = 13
allowed_domains = ["patchwork.kernel.org"]
polling_delay_secs = 30

[scheduler]
output = "/home/kernelci/data/output"

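The new [patchset] section carries all of the service's tunables. As a rough illustration of what the section contains (a standalone sketch using Python 3.11+ stdlib tomllib; the service itself receives these values through the --settings option and self._service_config rather than reading the file directly):

import tomllib  # Python 3.11+

with open("config/kernelci.toml", "rb") as f:
    patchset_settings = tomllib.load(f)["patchset"]

# Values as defined in the section above
assert patchset_settings["patchset_short_hash_len"] == 13
assert "patchwork.kernel.org" in patchset_settings["allowed_domains"]
print(f"Polling every {patchset_settings['polling_delay_secs']} seconds")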
15 changes: 15 additions & 0 deletions docker-compose.yaml
@@ -193,3 +193,18 @@ services:
      - '--mode=holdoff'
    extra_hosts:
      - "host.docker.internal:host-gateway"

  patchset:
    <<: *base-service
    container_name: 'kernelci-pipeline-patchset'
    command:
      - './pipeline/patchset.py'
      - '--settings=${KCI_SETTINGS:-/home/kernelci/config/kernelci.toml}'
      - 'run'
    volumes:
      - './src:/home/kernelci/pipeline'
      - './config:/home/kernelci/config'
      - './data/ssh:/home/kernelci/data/ssh'
      - './data/src:/home/kernelci/data/src'
      - './data/output:/home/kernelci/data/output'

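With this service definition in place, the patchset service can be started on its own with docker-compose up patchset, assuming the shared base-service anchor and the ./data bind-mount directories exist as they do for the other pipeline services.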
7 changes: 4 additions & 3 deletions src/monitor.py
@@ -60,9 +60,10 @@ def _run(self, sub_id):
        event = self._api.receive_event(sub_id)
        obj = event.data
        dt = datetime.datetime.fromisoformat(event['time'])
-       commit = (obj['data']['kernel_revision']['commit'][:12]
-                 if 'kernel_revision' in obj['data']
-                 else str(None))
+       try:
+           commit = obj['data']['kernel_revision']['commit'][:12]
+       except (KeyError, TypeError):
+           commit = str(None)
        result = result_map[obj['result']] if obj['result'] else str(None)
        print(self.LOG_FMT.format(
            time=dt.strftime('%Y-%m-%d %H:%M:%S.%f'),
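The try/except form also covers the case where obj['data'] is None or not a dict, which the previous membership test ('kernel_revision' in obj['data']) would turn into an unhandled TypeError. A minimal standalone sketch of the new behavior (illustration only, not pipeline code):

def short_commit(obj):
    # Any missing or malformed field yields "None" instead of raising
    try:
        return obj['data']['kernel_revision']['commit'][:12]
    except (KeyError, TypeError):
        return str(None)

print(short_commit({'data': {'kernel_revision': {'commit': 'a' * 40}}}))  # 'aaaaaaaaaaaa'
print(short_commit({'data': {}}))    # KeyError path  -> 'None'
print(short_commit({'data': None}))  # TypeError path -> 'None'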
329 changes: 329 additions & 0 deletions src/patchset.py
@@ -0,0 +1,329 @@
#!/usr/bin/env python3
#
# SPDX-License-Identifier: LGPL-2.1-or-later
#
# Copyright (c) Meta Platforms, Inc. and affiliates.
# Author: Nikolay Yurin <[email protected]>

import os
import sys
import json
import requests
import time
import tempfile
import hashlib
from datetime import datetime, timedelta
from urllib.parse import urlparse
from urllib.request import urlopen

import kernelci
import kernelci.build
import kernelci.config
from kernelci.legacy.cli import Args, Command, parse_opts
import kernelci.storage

from base import Service
from tarball import Tarball


class Patchset(Tarball):
    TAR_CREATE_CMD = """\
set -e
cd {target_dir}
tar --create --transform "s/^/{prefix}\\//" * | gzip > {tarball_path}
"""

    APPLY_PATCH_SHELL_CMD = """\
set -e
cd {checkout_path}
patch -p1 < {patch_file}
"""

    # FIXME: I really don't have a good idea what I'm doing here
    # This code probably needs rework and should move into kernelci.patch
    def _hash_patch(self, patch_name, patch_file):
        allowed_prefixes = {
            "old mode",  # Old file permissions
            "new mode",  # New file permissions
            "-",  # This covers both removed lines and the source file
            "+",  # This covers both added lines and the target file
            # "@" I don't know how we should handle hunks yet
        }
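        # Only lines that change file contents or mode end up in the
        # hash, so patch metadata (timestamps, index lines, hunk
        # context) cannot change a patch's identity.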
        hashable_patch_lines = []
        for line in patch_file.readlines():
            if not line:
                continue

            for prefix in allowed_prefixes:
                if line.startswith(prefix):
                    hashable_patch_lines.append(line)
                    break

        hashable_content = "\n".join(hashable_patch_lines)
        self.log.debug("Hashable content:\n" + hashable_content)
        patch_hash_digest = hashlib.sha256(
            hashable_content.encode("utf-8")
        ).hexdigest()
        self.log.debug(f"Patch {patch_name} hash: {patch_hash_digest}")
        return patch_hash_digest

    # FIXME: move into kernelci.patch
    def _apply_patch(self, checkout_path, patch_name, patch_url):
        self.log.info(
            f"Applying patch {patch_name}, url: {patch_url}",
        )
        try:
            encoding = urlopen(patch_url).headers.get_charsets()[0]
        except Exception as e:
            self.log.warn(
                "Failed to fetch encoding from patch "
                f"{patch_name} headers: {e}"
            )
            self.log.warn("Falling back to utf-8 encoding")
            encoding = "utf-8"

        # Text mode ("w+") so the detected charset applies;
        # _hash_patch then reads back str lines
        with tempfile.NamedTemporaryFile(
            mode="w+",
            prefix="{}-{}-".format(
                self._service_config.patchset_tmp_file_prefix,
                patch_name
            ),
            encoding=encoding
        ) as tmp_f:
            if not kernelci.build._download_file(patch_url, tmp_f.name):
                raise FileNotFoundError(
                    f"Error downloading patch from {patch_url}"
                )

            kernelci.shell_cmd(self.APPLY_PATCH_SHELL_CMD.format(
                checkout_path=checkout_path,
                patch_file=tmp_f.name,
            ))

            return self._hash_patch(patch_name, tmp_f)

    # FIXME: move into kernelci.patch
    def _apply_patches(self, checkout_path, patch_artifacts):
        patchset_hash = hashlib.sha256()
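        # The patchset hash chains the per-patch hashes: each hex
        # digest is fed into one running SHA-256, so the result is
        # sensitive to both the set of patches and their order.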
        for patch_name, patch_url in patch_artifacts.items():
            patch_hash = self._apply_patch(checkout_path, patch_name, patch_url)
            patchset_hash.update(patch_hash.encode("utf-8"))

        patchset_hash_digest = patchset_hash.hexdigest()
        self.log.debug(f"Patchset hash: {patchset_hash_digest}")
        return patchset_hash_digest

    def _download_checkout_archive(self, download_path, tarball_url, retries=3):
        self.log.info(f"Downloading checkout tarball, url: {tarball_url}")
        tar_filename = os.path.basename(urlparse(tarball_url).path)
        kernelci.build.pull_tarball(
            kdir=download_path,
            url=tarball_url,
            dest_filename=tar_filename,
            retries=retries,
            delete=True
        )

    def _update_node(
        self,
        patchset_node,
        checkout_node,
        tarball_url,
        patchset_hash
    ):
        patchset_data = checkout_node.get("data", {}).copy()
        patchset_data["kernel_revision"]["patchset"] = patchset_hash

        updated_node = patchset_node.copy()
        updated_node["artifacts"]["tarball"] = tarball_url
        updated_node["state"] = "available"
        updated_node["data"] = patchset_data
updated_node["holdoff"] = str(
datetime.utcnow() + timedelta(minutes=10)
)

        try:
            self._api.node.update(updated_node)
        except requests.exceptions.HTTPError as err:
            err_msg = json.loads(err.response.content).get("detail", [])
            self.log.error(err_msg)

    def _setup(self, *args):
        return self._api_helper.subscribe_filters({
            "op": "created",
            "name": "patchset",
            "state": "running",
        })

    def _has_allowed_domain(self, url):
        domain = urlparse(url).hostname
        if domain not in self._service_config.allowed_domains:
            raise RuntimeError(
                f"Forbidden mbox domain {domain}, allowed domains: "
                f"{self._service_config.allowed_domains}"
            )

    def _get_patch_artifacts(self, patchset_node):
        node_artifacts = patchset_node.get("artifacts")
        if not node_artifacts:
            raise ValueError(
                f"Patchset node {patchset_node['id']} has no artifacts"
            )

        for patch_mbox_url in node_artifacts.values():
            self._has_allowed_domain(patch_mbox_url)

        return node_artifacts

    def _gen_checkout_name(self, checkout_node):
        revision = checkout_node["data"]["kernel_revision"]
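        # Example result (hypothetical values):
        # "linux-mainline-master-v6.8-rc7-178-gabcdef123456"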
return "-".join([
"linux",
revision["tree"],
revision["branch"],
revision["describe"],
])

    def _process_patchset(self, checkout_node, patchset_node):
        patch_artifacts = self._get_patch_artifacts(patchset_node)

        # Tarball download implicitly removes the destination dir,
        # so there's no need to clean it up here
        self._download_checkout_archive(
            download_path=self._service_config.kdir,
            tarball_url=checkout_node["artifacts"]["tarball"]
        )

        checkout_name = self._gen_checkout_name(checkout_node)
        checkout_path = os.path.join(self._service_config.kdir, checkout_name)

        patchset_hash = self._apply_patches(checkout_path, patch_artifacts)
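        # Only the tarball name uses the short hash (13 characters by
        # default, patchset_short_hash_len); the node keeps the full
        # hash in its kernel_revision data.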
        patchset_hash_short = patchset_hash[
            :self._service_config.patchset_short_hash_len
        ]

        tarball_path = self._make_tarball(
            target_dir=checkout_path,
            tarball_name=f"{checkout_name}-{patchset_hash_short}"
        )
        tarball_url = self._push_tarball(tarball_path)

        self._update_node(
            patchset_node=patchset_node,
            checkout_node=checkout_node,
            tarball_url=tarball_url,
            patchset_hash=patchset_hash
        )

    def _mark_failed(self, patchset_node):
        node = patchset_node.copy()
        node.update({
            "state": "done",
            "result": "fail",
        })
        try:
            self._api.node.update(node)
        except requests.exceptions.HTTPError as err:
            err_msg = json.loads(err.response.content).get("detail", [])
            self.log.error(err_msg)

    def _mark_failed_if_no_parent(self, patchset_node):
        if not patchset_node["parent"]:
            self.log.error(
                f"Patchset node {patchset_node['id']} has no parent "
                "checkout node, marking node as failed",
            )
            self._mark_failed(patchset_node)
            return True

        return False

    def _mark_failed_if_parent_failed(self, patchset_node, checkout_node):
        if (
            checkout_node["state"] == "done" and
            checkout_node["result"] == "fail"
        ):
            self.log.error(
                f"Parent checkout node {checkout_node['id']} failed, "
                f"marking patchset node {patchset_node['id']} as failed",
            )
            self._mark_failed(patchset_node)
            return True

        return False

    def _run(self, _sub_id):
        self.log.info("Listening for new trigger events")
        self.log.info("Press Ctrl-C to stop.")

        while True:
            patchset_nodes = self._api.node.find({
                "name": "patchset",
                "state": "running",
            })

            if patchset_nodes:
                self.log.debug(f"Found patchset nodes: {patchset_nodes}")

            for patchset_node in patchset_nodes:
                if self._mark_failed_if_no_parent(patchset_node):
                    continue

                checkout_node = self._api.node.get(patchset_node["parent"])

                if self._mark_failed_if_parent_failed(
                    patchset_node,
                    checkout_node
                ):
                    continue

                if checkout_node["state"] == "running":
                    self.log.info(
                        f"Patchset node {patchset_node['id']} is waiting "
                        f"for checkout node {checkout_node['id']} to complete",
                    )
                    continue
Comment on lines +282 to +287
Collaborator:
Okay, now I understand why you are finding all the running nodes on every event.
According to the state machine design, a checkout node should only have children after its state changes to available.
Instead of manually creating checkout and patchset nodes, we can listen for available checkout-node events in the patchset service and create the node automatically from the service itself.

Contributor Author:

So, for every patchset node I should create a new checkout node and wait for it to become available? That would pause the queue for quite some time and seems quite inefficient.
However, it gave me another idea: ideally I want to apply a patchset to whatever latest commit is available on a tree. Is it possible to query the latest available checkout node?

Collaborator (@JenySadadia, Mar 5, 2024):

Yes, that's what I am suggesting. You can listen for available checkout nodes and create patchset nodes from the service directly.
For instance, you can use the filter below to get available checkout-node events:

def _setup(self, *args):
        return self._api_helper.subscribe_filters({
            "kind": "checkout",
            "state": "available",
        })

Then you can get the checkout node from the event, just like https://github.com/kernelci/kernelci-pipeline/blob/main/src/tarball.py#L130, and create a patchset node from it.

Contributor Author (@yurinnick, Mar 7, 2024):

So it should be done outside of the patchset service, right? In another service that will prepare a checkout node and then pass it to patchset?

Contributor Author:

I think the greatest challenge with checkout nodes is picking the right one. I'm not sure how to determine an appropriate version to apply a patch to, and then correctly query the right node. If we have checkouts for 6.4, 6.8, etc., how do we pick the right version? Maybe we should pick the latest stable and next branches and create two patchset nodes based on them.

Collaborator:

"In another service that will prepare a checkout node and then pass it to patchset?"

The pipeline trigger service is already creating checkout nodes whenever a new commit is detected for a particular kernel revision. As I have not been involved in the patchset-related discussions, I am not aware of all the requirements, but do you think you can use the checkout nodes created by the trigger service?
Once the trigger service creates a checkout node, the tarball service will create and push a tarball and change the node's status to available. Then you can use the filter below in the patchset service and create a patchset node from it:

    def _setup(self, *args):
        return self._api_helper.subscribe_filters({
            "op": "updated",
            "name": "checkout",
            "state": "available",
        })

    def _run(self, _sub_id):
        self.log.info("Listening for new trigger events")
        self.log.info("Press Ctrl-C to stop.")

        while True:
            checkout_node = self._api_helper.receive_event_node(sub_id)
            patchset_node = { create_patchset_node_from_checkout() }
            patchset_node = self._api.node.add(patchset_node)
            self._process_patchset(checkout_node, patchset_node)

Contributor Author:

As we discussed with @gctucker in kernelci-pipeline#295, the patchset service only processes patchset nodes created by another service (a patchwork<>kernelci connector). So the patchset service should treat these nodes as created on-demand and process them accordingly.

I am open to suggestions on how to link existing checkout nodes and freshly created patchset nodes. I think it's a topic for a larger discussion on how we want to test kernel patches in general.

Contributor:

@JenySadadia @yurinnick I agree it will be an important next step to find the best way to connect patchset nodes with the checkout ones created by the trigger service. Let's keep the scope of this PR to just processing patchset nodes created on-demand, with an assumption of an already existing parent checkout node (as in the verification steps posted in this PR).

I created #490 to track this task. @JenySadadia Please let me know if there's anything else that should be done before starting the integration of this service.

Collaborator:

I think we should merge this PR now and improvements can be discussed as follow-ups.


                try:
                    self.log.info(
                        f"Processing patchset node: {patchset_node['id']}",
                    )
                    self._process_patchset(checkout_node, patchset_node)
                except Exception as e:
                    self.log.error(
                        f"Patchset node {patchset_node['id']} "
                        f"processing failed: {e}",
                    )
                    self.log.traceback()
                    self._mark_failed(patchset_node)

            self.log.info(
                "Waiting %d seconds for new nodes..." %
                self._service_config.polling_delay_secs,
            )
            time.sleep(self._service_config.polling_delay_secs)


class cmd_run(Command):
    help = (
        "Wait for a checkout node to be available "
        "and push a source+patchset tarball"
    )
    args = [
        Args.kdir, Args.output, Args.api_config, Args.storage_config,
    ]
    opt_args = [
        Args.verbose, Args.storage_cred,
    ]

    def __call__(self, configs, args):
        return Patchset(configs, args).run(args)


if __name__ == "__main__":
    opts = parse_opts("patchset", globals())
    configs = kernelci.config.load("config")
    status = opts.command(configs, opts)
    sys.exit(0 if status is True else 1)
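For reference, the on-demand patchset nodes this service consumes would look roughly like the sketch below. The field names follow the code above (name/state from _setup, parent from _mark_failed_if_no_parent, artifacts from _get_patch_artifacts), while the ID and URL values are hypothetical:

# Hypothetical example of a patchset node created by an external
# service (e.g. a patchwork<>kernelci connector):
patchset_node = {
    "name": "patchset",
    "state": "running",
    "parent": "<id of an existing checkout node>",
    "artifacts": {
        # patch name -> mbox URL; must be on an allowed domain,
        # applied (and hashed) in this order
        "0001-example.patch":
            "https://patchwork.kernel.org/.../0001-example.mbox",
    },
}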