Skip to content

Commit

Permalink
make image pull policy configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
aslom committed Mar 27, 2019
1 parent c4a7041 commit e42e43f
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 19 deletions.
4 changes: 3 additions & 1 deletion contrib/kafka/config/500-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ spec:
containers:
- name: manager
image: github.com/knative/eventing-sources/contrib/kafka/cmd/controller
imagePullPolicy: Always
imagePullPolicy: IfNotPresent
env:
- name: KAFKA_RA_IMAGE
value: github.com/knative/eventing-sources/contrib/kafka/cmd/receive_adapter
- name: KAFKA_RA_IMAGE_PULL_POLLICY
value: IfNotPresent
volumeMounts:
resources:
limits:
Expand Down
46 changes: 34 additions & 12 deletions contrib/kafka/pkg/reconciler/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import (
"github.com/knative/eventing-sources/contrib/kafka/pkg/apis/sources/v1alpha1"
"github.com/knative/pkg/logging"
"go.uber.org/zap"
"k8s.io/api/apps/v1"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -44,34 +45,54 @@ import (
)

const (
controllerAgentName = "kafka-source-controller"
raImageEnvVar = "KAFKA_RA_IMAGE"
controllerAgentName = "kafka-source-controller"
raImageEnvVar = "KAFKA_RA_IMAGE"
raImagePullPolicyEnvVar = "KAFKA_RA_IMAGE_PULL_POLLICY"
)

func Add(mgr manager.Manager) error {
raImage, defined := os.LookupEnv(raImageEnvVar)
if !defined {
return fmt.Errorf("required environment variable '%s' not defined", raImageEnvVar)
}
raImagePullPolicyStr, defined := os.LookupEnv(raImagePullPolicyEnvVar)
var raImagePullPolicy corev1.PullPolicy
if defined {
switch raImagePullPolicyStr {
case string(corev1.PullAlways):
raImagePullPolicy = corev1.PullAlways
case string(corev1.PullIfNotPresent):
raImagePullPolicy = corev1.PullIfNotPresent
case string(corev1.PullNever):
raImagePullPolicy = corev1.PullNever
default:
return fmt.Errorf("unsupported value %s for environment variable '%s'", raImagePullPolicyStr, raImagePullPolicyEnvVar)
}
} else {
raImagePullPolicy = corev1.PullIfNotPresent
}

log.Println("Adding the Apache Kafka Source controller.")
log.Printf("%s is %s\n", raImagePullPolicyEnvVar, string(raImagePullPolicy))
p := &sdk.Provider{
AgentName: controllerAgentName,
Parent: &v1alpha1.KafkaSource{},
Owns: []runtime.Object{&v1.Deployment{}},
Reconciler: &reconciler{
scheme: mgr.GetScheme(),
receiveAdapterImage: raImage,
scheme: mgr.GetScheme(),
receiveAdapterImage: raImage,
receiveAdapterImagePullPolicy: raImagePullPolicy,
},
}

return p.Add(mgr)
}

type reconciler struct {
client client.Client
scheme *runtime.Scheme
receiveAdapterImage string
client client.Client
scheme *runtime.Scheme
receiveAdapterImage string
receiveAdapterImagePullPolicy corev1.PullPolicy
}

func (r *reconciler) InjectClient(c client.Client) error {
Expand Down Expand Up @@ -121,10 +142,11 @@ func (r *reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Kaf
return ra, nil
}
svc := resources.MakeReceiveAdapter(&resources.ReceiveAdapterArgs{
Image: r.receiveAdapterImage,
Source: src,
Labels: getLabels(src),
SinkURI: sinkURI,
Image: r.receiveAdapterImage,
ImagePullPolicy: r.receiveAdapterImagePullPolicy,
Source: src,
Labels: getLabels(src),
SinkURI: sinkURI,
})

if err := controllerutil.SetControllerReference(src, svc, r.scheme); err != nil {
Expand Down
14 changes: 8 additions & 6 deletions contrib/kafka/pkg/reconciler/resources/receive_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@ package resources

import (
"fmt"
"strconv"

"github.com/knative/eventing-sources/contrib/kafka/pkg/apis/sources/v1alpha1"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strconv"
)

type ReceiveAdapterArgs struct {
Image string
Source *v1alpha1.KafkaSource
Labels map[string]string
SinkURI string
Image string
ImagePullPolicy corev1.PullPolicy
Source *v1alpha1.KafkaSource
Labels map[string]string
SinkURI string
}

func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment {
Expand Down Expand Up @@ -58,7 +60,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment {
{
Name: "receive-adapter",
Image: args.Image,
ImagePullPolicy: "Always",
ImagePullPolicy: args.ImagePullPolicy,
Env: []corev1.EnvVar{
{
Name: "KAFKA_BOOTSTRAP_SERVERS",
Expand Down

0 comments on commit e42e43f

Please sign in to comment.