Skip to content

Commit

Permalink
Download Helm repository indexes w/ Helm's getter
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddeco committed Apr 9, 2020
1 parent 22bbab7 commit 81873e7
Show file tree
Hide file tree
Showing 7 changed files with 402 additions and 63 deletions.
19 changes: 14 additions & 5 deletions api/v1alpha1/helmrepository_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ import (

// HelmRepositorySpec defines the desired state of HelmRepository
type HelmRepositorySpec struct {
// +kubebuilder:validation:MinLength=4

// The repository address
Url string `json:"url"`
// +kubebuilder:validation:MinLength=4
URL string `json:"url"`

// The interval at which to check for repository updates
Interval metav1.Duration `json:"interval"`
Expand All @@ -35,6 +34,15 @@ type HelmRepositorySpec struct {
type HelmRepositoryStatus struct {
// +optional
Conditions []RepositoryCondition `json:"conditions,omitempty"`

// LastUpdateTime is the timestamp corresponding to the last status
// change of this repository.
// +optional
LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"`

// Path to the artifact of the last repository index.
// +optional
Artifact string `json:"artifact,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down Expand Up @@ -67,6 +75,7 @@ func init() {
}

const (
IndexDownloadFailedReason string = "IndexDownloadFailed"
IndexDownloadSucceedReason string = "IndexDownloadSucceed"
InvalidHelmRepositoryURLReason string = "InvalidHelmRepositoryURL"
IndexFetchFailedReason string = "IndexFetchFailedReason"
IndexFetchSucceededReason string = "IndexFetchSucceed"
)
4 changes: 4 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions config/crd/bases/source.fluxcd.io_helmrepositories.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ spec:
status:
description: HelmRepositoryStatus defines the observed state of HelmRepository
properties:
artifact:
description: Path to the artifact (index file) of the last repository
sync.
type: string
conditions:
items:
description: RepositoryCondition contains condition information for
Expand Down Expand Up @@ -93,6 +97,11 @@ spec:
- type
type: object
type: array
lastUpdateTime:
description: LastUpdateTime is the timestamp corresponding to the last
status change of this repository.
format: date-time
type: string
type: object
type: object
version: v1alpha1
Expand Down
242 changes: 187 additions & 55 deletions controllers/helmrepository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,38 @@ package controllers

import (
"context"
"crypto/sha1"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"time"

"github.com/go-logr/logr"
"gopkg.in/yaml.v2"
"helm.sh/helm/v3/pkg/getter"
"helm.sh/helm/v3/pkg/repo"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

sourcev1 "github.com/fluxcd/source-controller/api/v1alpha1"
)

// HelmRepositoryReconciler reconciles a HelmRepository object
type HelmRepositoryReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Log logr.Logger
Scheme *runtime.Scheme
StoragePath string
Getters getter.Providers
}

// +kubebuilder:rbac:groups=source.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -52,97 +61,220 @@ func (r *HelmRepositoryReconciler) Reconcile(req ctrl.Request) (ctrl.Result, err

log := r.Log.WithValues("helmrepository", req.NamespacedName)

var repo sourcev1.HelmRepository

if err := r.Get(ctx, req.NamespacedName, &repo); err != nil {
var repository sourcev1.HelmRepository
if err := r.Get(ctx, req.NamespacedName, &repository); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

readyCondition := sourcev1.RepositoryCondition{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionUnknown,
}
result := ctrl.Result{RequeueAfter: repository.Spec.Interval.Duration}

if len(repo.Status.Conditions) == 0 {
log.Info("Starting index download")
repo.Status.Conditions = []sourcev1.RepositoryCondition{readyCondition}
if err := r.Status().Update(ctx, &repo); err != nil {
log.Error(err, "unable to update HelmRepository status")
return ctrl.Result{}, err
// set initial status
if r.shouldResetStatus(repository) {
log.Info("Initialising repository")
repository.Status = sourcev1.HelmRepositoryStatus{}
repository.Status.Conditions = []sourcev1.RepositoryCondition{
{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionUnknown,
},
}
} else {
for _, condition := range repo.Status.Conditions {
if condition.Type == sourcev1.RepositoryConditionReady {
readyCondition = condition
break
}
if err := r.Status().Update(ctx, &repository); err != nil {
log.Error(err, "unable to update HelmRepository status")
return result, err
}
}

if err := r.downloadIndex(repo.Spec.Url); err != nil {
log.Info("Index download error", "error", err.Error())
readyCondition.Reason = sourcev1.IndexDownloadFailedReason
readyCondition.Message = err.Error()
readyCondition.Status = corev1.ConditionFalse
// try to download index
readyCondition, artifact, err := r.index(repository)
if err != nil {
log.Info("Helm repository index failed", "error", err.Error())
} else {
log.Info("Index download successful")
readyCondition.Reason = sourcev1.IndexDownloadSucceedReason
readyCondition.Message = "Repository is ready"
readyCondition.Status = corev1.ConditionTrue
// update artifact if path changed
if repository.Status.Artifact != artifact {
timeNew := metav1.Now()
repository.Status.LastUpdateTime = &timeNew
repository.Status.Artifact = artifact
}
log.Info("Helm repository index succeeded", "msg", readyCondition.Message)
}

// update status
timeNew := metav1.Now()
readyCondition.LastTransitionTime = &timeNew
repo.Status.Conditions = []sourcev1.RepositoryCondition{readyCondition}
repository.Status.Conditions = []sourcev1.RepositoryCondition{readyCondition}

if err := r.Status().Update(ctx, &repo); err != nil {
if err := r.Status().Update(ctx, &repository); err != nil {
log.Error(err, "unable to update HelmRepository status")
return ctrl.Result{}, err
return result, err
}

return ctrl.Result{RequeueAfter: repo.Spec.Interval.Duration}, nil
// requeue repository
return result, nil
}

func (r *HelmRepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&sourcev1.HelmRepository{}).
WithEventFilter(RepositoryChangePredicate{}).
WithEventFilter(predicate.Funcs{
DeleteFunc: func(e event.DeleteEvent) bool {
// delete artifacts
repoDir := filepath.Join(r.StoragePath,
fmt.Sprintf("helmrepositories/%s-%s", e.Meta.GetName(), e.Meta.GetNamespace()))
if err := os.RemoveAll(repoDir); err != nil {
r.Log.Error(err, "unable to delete artifacts",
"helmrepository", fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName()))
} else {
r.Log.Info("Helm repository artifacts deleted",
"helmrepository", fmt.Sprintf("%s/%s", e.Meta.GetNamespace(), e.Meta.GetName()))
}
return false
},
}).
Complete(r)
}

func (r *HelmRepositoryReconciler) downloadIndex(repoUrl string) error {
parsedURL, err := url.Parse(repoUrl)
func (r *HelmRepositoryReconciler) index(repository sourcev1.HelmRepository) (sourcev1.RepositoryCondition, string, error) {
u, err := url.Parse(repository.Spec.URL)
if err != nil {
return sourcev1.RepositoryCondition{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.InvalidHelmRepositoryURLReason,
Message: err.Error(),
}, "", err
}

c, err := r.Getters.ByScheme(u.Scheme)
if err != nil {
return sourcev1.RepositoryCondition{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.InvalidHelmRepositoryURLReason,
Message: err.Error(),
}, "", err
}

u.RawPath = path.Join(u.RawPath, "index.yaml")
u.Path = path.Join(u.Path, "index.yaml")

indexURL := u.String()
// TODO(hidde): add authentication config
res, err := c.Get(indexURL, getter.WithURL(repository.Spec.URL))
if err != nil {
return sourcev1.RepositoryCondition{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.IndexFetchFailedReason,
Message: err.Error(),
}, "", err
}

index, err := ioutil.ReadAll(res)
if err != nil {
return fmt.Errorf("unable to parse repository url %w", err)
return sourcev1.RepositoryCondition{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.IndexFetchFailedReason,
Message: err.Error(),
}, "", err
}

i := &repo.IndexFile{}
if err := yaml.Unmarshal(index, i); err != nil {
return sourcev1.RepositoryCondition{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.IndexFetchFailedReason,
Message: err.Error(),
}, "", err
}
parsedURL.RawPath = path.Join(parsedURL.RawPath, "index.yaml")
parsedURL.Path = path.Join(parsedURL.Path, "index.yaml")
indexURL := parsedURL.String()

res, err := http.DefaultClient.Get(indexURL)
b, err := yaml.Marshal(i)
if err != nil {
return fmt.Errorf("unable to download repository index %w", err)
return sourcev1.RepositoryCondition{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.IndexFetchFailedReason,
Message: err.Error(),
}, "", err
}

defer res.Body.Close()
repoPath := fmt.Sprintf("helmrepositories/%s-%s", repository.Name, repository.Namespace)
storage := filepath.Join(r.StoragePath, repoPath)
sum := checksum(b)
indexFileName := fmt.Sprintf("index-%s.yaml", sum)
indexFilePath := filepath.Join(storage, indexFileName)
artifactsURL := fmt.Sprintf("http://%s/helmrepositories/%s/%s", host(), repoPath, indexFileName)

if res.StatusCode > 300 {
return fmt.Errorf("unable to download repository index, respose status code %v", res.StatusCode)
if file, err := os.Stat(indexFilePath); !os.IsNotExist(err) && !file.IsDir() {
if fb, err := ioutil.ReadFile(indexFilePath); err == nil && sum == checksum(fb) {
return sourcev1.RepositoryCondition{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionTrue,
Reason: "GitCloneSucceed",
Message: fmt.Sprintf("Fetched artifact is available at %s", indexFilePath),
}, artifactsURL, nil
}
}

body, err := ioutil.ReadAll(res.Body)
err = os.MkdirAll(storage, 0755)
if err != nil {
err = fmt.Errorf("unable to create repository index directory: %w", err)
return sourcev1.RepositoryCondition{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.IndexFetchFailedReason,
Message: err.Error(),
}, "", err
}
err = ioutil.WriteFile(indexFilePath, index, 0644)
if err != nil {
return fmt.Errorf("unable to read repository index %w", err)
err = fmt.Errorf("unable to write repository index file: %w", err)
return sourcev1.RepositoryCondition{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionFalse,
Reason: sourcev1.IndexFetchFailedReason,
Message: err.Error(),
}, "", err
}
return sourcev1.RepositoryCondition{
Type: sourcev1.RepositoryConditionReady,
Status: corev1.ConditionTrue,
Reason: sourcev1.IndexFetchSucceededReason,
Message: fmt.Sprintf("Fetched artifact is available at %s", indexFilePath),
}, artifactsURL, nil
}

index := struct {
APIVersion string `json:"apiVersion"`
Generated time.Time `json:"generated"`
}{}
func (r *HelmRepositoryReconciler) shouldResetStatus(repository sourcev1.HelmRepository) bool {
resetStatus := false
if repository.Status.Artifact != "" {
pathParts := strings.Split(repository.Status.Artifact, "/")
path := fmt.Sprintf("helmrepositories/%s-%s/%s", repository.Name, repository.Namespace, pathParts[len(pathParts)-1])
if _, err := os.Stat(filepath.Join(r.StoragePath, path)); err != nil {
resetStatus = true
}
}

if err := yaml.Unmarshal(body, &index); err != nil {
return fmt.Errorf("unable to unmarshal repository index %w", err)
// set initial status
if len(repository.Status.Conditions) == 0 || resetStatus {
resetStatus = true
}

return nil
return resetStatus
}

// Checksum returns the SHA1 checksum for the given bytes as a string.
func checksum(b []byte) string {
return fmt.Sprintf("%x", sha1.Sum(b))
}

func host() string {
hostname := "localhost"
if os.Getenv("RUNTIME_NAMESPACE") != "" {
svcParts := strings.Split(os.Getenv("HOSTNAME"), "-")
hostname = fmt.Sprintf("%s.%s",
strings.Join(svcParts[:len(svcParts)-2], "-"), os.Getenv("RUNTIME_NAMESPACE"))
}
return hostname
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/onsi/ginkgo v1.11.0
github.com/onsi/gomega v1.8.1
gopkg.in/yaml.v2 v2.2.4
helm.sh/helm/v3 v3.1.2
k8s.io/api v0.17.2
k8s.io/apimachinery v0.17.2
k8s.io/client-go v0.17.2
Expand Down
Loading

0 comments on commit 81873e7

Please sign in to comment.