diff --git a/control-plane/pkg/reconciler/base/reconciler.go b/control-plane/pkg/reconciler/base/reconciler.go index 1583d2dcbe..73b19bf6b0 100644 --- a/control-plane/pkg/reconciler/base/reconciler.go +++ b/control-plane/pkg/reconciler/base/reconciler.go @@ -5,13 +5,13 @@ import ( "fmt" "github.com/gogo/protobuf/proto" - "go.uber.org/multierr" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/util/retry" coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config" ) @@ -115,7 +115,8 @@ func (r *Reconciler) UpdateDataPlaneConfigMap(brokersTriggers *coreconfig.Broker _, err = r.KubeClient.CoreV1().ConfigMaps(configMap.Namespace).Update(configMap) if err != nil { - return fmt.Errorf("failed to update config map %s/%s: %w", configMap.Namespace, configMap.Name, err) + // Return the same error, so that we can handle conflicting updates. + return err } return nil @@ -123,24 +124,30 @@ func (r *Reconciler) UpdateDataPlaneConfigMap(brokersTriggers *coreconfig.Broker func (r *Reconciler) UpdateDispatcherPodsAnnotation(logger *zap.Logger, volumeGeneration uint64) error { - labelSelector := labels.SelectorFromSet(map[string]string{"app": DispatcherLabel}) - pods, errors := r.PodLister.Pods(r.SystemNamespace).List(labelSelector) - if errors != nil { - return fmt.Errorf("failed to list dispatcher pods in namespace %s: %w", r.SystemNamespace, errors) - } + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + + labelSelector := labels.SelectorFromSet(map[string]string{"app": DispatcherLabel}) + pods, errors := r.PodLister.Pods(r.SystemNamespace).List(labelSelector) + if errors != nil { + return fmt.Errorf("failed to list dispatcher pods in namespace %s: %w", r.SystemNamespace, errors) + } - return r.updatePodsAnnotation(logger, volumeGeneration, pods) + return r.updatePodsAnnotation(logger, volumeGeneration, pods) + }) } func (r *Reconciler) UpdateReceiverPodsAnnotation(logger *zap.Logger, volumeGeneration uint64) error { - labelSelector := labels.SelectorFromSet(map[string]string{"app": ReceiverLabel}) - pods, errors := r.PodLister.Pods(r.SystemNamespace).List(labelSelector) - if errors != nil { - return fmt.Errorf("failed to list receiver pods in namespace %s: %w", r.SystemNamespace, errors) - } + return retry.RetryOnConflict(retry.DefaultRetry, func() error { - return r.updatePodsAnnotation(logger, volumeGeneration, pods) + labelSelector := labels.SelectorFromSet(map[string]string{"app": ReceiverLabel}) + pods, errors := r.PodLister.Pods(r.SystemNamespace).List(labelSelector) + if errors != nil { + return fmt.Errorf("failed to list receiver pods in namespace %s: %w", r.SystemNamespace, errors) + } + + return r.updatePodsAnnotation(logger, volumeGeneration, pods) + }) } func (r *Reconciler) updatePodsAnnotation(logger *zap.Logger, volumeGeneration uint64, pods []*corev1.Pod) error { @@ -167,14 +174,13 @@ func (r *Reconciler) updatePodsAnnotation(logger *zap.Logger, volumeGeneration u pod.SetAnnotations(annotations) if _, err := r.KubeClient.CoreV1().Pods(pod.Namespace).Update(pod); err != nil { - - errors = multierr.Append(errors, fmt.Errorf( - "failed to update pod %s/%s: %w", - pod.Namespace, - pod.Name, - err, - )) + // Return the same error, so that we can handle conflicting updates. + return err } } return errors } + +func (r *Reconciler) HandleConflicts(f func() error) error { + return retry.RetryOnConflict(retry.DefaultRetry, f) +} diff --git a/control-plane/pkg/reconciler/broker/broker.go b/control-plane/pkg/reconciler/broker/broker.go index af7d0b33a4..a5991c9452 100644 --- a/control-plane/pkg/reconciler/broker/broker.go +++ b/control-plane/pkg/reconciler/broker/broker.go @@ -27,6 +27,7 @@ import ( "github.com/Shopify/sarama" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/retry" eventing "knative.dev/eventing/pkg/apis/eventing/v1beta1" "knative.dev/eventing/pkg/logging" "knative.dev/pkg/configmap" @@ -61,6 +62,12 @@ type Reconciler struct { } func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + return r.reconcileKind(ctx, broker) + }) +} + +func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event { logger := log.Logger(ctx, "broker", broker) @@ -123,7 +130,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) // Update the configuration map with the new brokersTriggers data. if err := r.UpdateDataPlaneConfigMap(brokersTriggers, brokersTriggersConfigMap); err != nil { - return statusConditionManager.failedToUpdateBrokersTriggersConfigMap(err) + return err } statusConditionManager.brokersTriggersConfigMapUpdated() @@ -137,7 +144,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) // Update volume generation annotation of receiver pods if err := r.UpdateReceiverPodsAnnotation(logger, brokersTriggers.VolumeGeneration); err != nil { - return statusConditionManager.failedToUpdateReceiverPodsAnnotation(err) + return err } logger.Debug("Updated receiver pod annotation") @@ -161,6 +168,13 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) } func (r *Reconciler) FinalizeKind(ctx context.Context, broker *eventing.Broker) reconciler.Event { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + return r.finalizeKind(ctx, broker) + }) +} + +func (r *Reconciler) finalizeKind(ctx context.Context, broker *eventing.Broker) reconciler.Event { + logger := log.Logger(ctx, "broker", broker) logger.Debug("Finalizing broker") @@ -192,7 +206,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, broker *eventing.Broker) // Update the configuration map with the new brokersTriggers data. if err := r.UpdateDataPlaneConfigMap(brokersTriggers, brokersTriggersConfigMap); err != nil { - return fmt.Errorf("failed to update configuration map: %w", err) + return err } logger.Debug("Brokers and triggers config map updated") @@ -208,7 +222,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, broker *eventing.Broker) logger.Debug("Topic deleted", zap.String("topic", topic)) - return reconciledNormal(broker.Namespace, broker.Name) + return nil } func incrementVolumeGeneration(generation uint64) uint64 { @@ -289,6 +303,7 @@ func (r *Reconciler) SetBootstrapServers(servers string) error { config := sarama.NewConfig() config.Version = sarama.MaxVersion config.Net.KeepAlive = time.Second * 60 + config.Metadata.RefreshFrequency = time.Minute kafkaClusterAdmin, err := NewClusterAdmin(addrs, config) if err != nil { diff --git a/control-plane/pkg/reconciler/broker/broker_lifecycle.go b/control-plane/pkg/reconciler/broker/broker_lifecycle.go index 179e7d9cfb..59b2537a11 100644 --- a/control-plane/pkg/reconciler/broker/broker_lifecycle.go +++ b/control-plane/pkg/reconciler/broker/broker_lifecycle.go @@ -135,7 +135,7 @@ func (manager *statusConditionManager) reconciled() reconciler.Event { } broker.GetConditionSet().Manage(&broker.Status).MarkTrue(ConditionAddressable) - return reconciledNormal(broker.Namespace, broker.Name) + return nil } func (manager *statusConditionManager) failedToUpdateDispatcherPodsAnnotation(err error) { @@ -161,11 +161,3 @@ func (manager *statusConditionManager) failedToGetBrokerConfig(err error) reconc return fmt.Errorf("failed to get broker configuration: %w", err) } - -func reconciledNormal(namespace, name string) reconciler.Event { - return reconciler.NewEvent( - corev1.EventTypeNormal, - Reconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, namespace, name), - ) -} diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index d64e11f896..a6fa65e106 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -107,11 +107,6 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - Reconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName), - ), }, WantUpdates: []clientgotesting.UpdateActionImpl{ ConfigMapUpdate(&configs, &coreconfig.Brokers{ @@ -164,11 +159,6 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - Reconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName), - ), }, WantUpdates: []clientgotesting.UpdateActionImpl{ ConfigMapUpdate(&configs, &coreconfig.Brokers{ @@ -285,11 +275,6 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - Reconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName), - ), }, WantUpdates: []clientgotesting.UpdateActionImpl{ ConfigMapUpdate(&configs, &coreconfig.Brokers{ @@ -355,11 +340,6 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - Reconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName), - ), }, WantUpdates: []clientgotesting.UpdateActionImpl{ ConfigMapUpdate(&configs, &coreconfig.Brokers{ @@ -447,11 +427,6 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - Reconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName), - ), }, WantUpdates: []clientgotesting.UpdateActionImpl{ ConfigMapUpdate(&configs, &coreconfig.Brokers{ @@ -539,11 +514,6 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - Reconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName), - ), }, WantUpdates: []clientgotesting.UpdateActionImpl{ ConfigMapUpdate(&configs, &coreconfig.Brokers{ @@ -621,11 +591,6 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) { Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - Reconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName), - ), }, WantUpdates: []clientgotesting.UpdateActionImpl{ ConfigMapUpdate(&configs, &coreconfig.Brokers{ @@ -771,13 +736,6 @@ func brokerFinalization(t *testing.T, format string, configs Configs) { }, &configs), }, Key: testKey, - WantEvents: []string{ - Eventf( - corev1.EventTypeNormal, - Reconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName), - ), - }, WantUpdates: []clientgotesting.UpdateActionImpl{ ConfigMapUpdate(&configs, &coreconfig.Brokers{ Broker: []*coreconfig.Broker{}, @@ -805,13 +763,6 @@ func brokerFinalization(t *testing.T, format string, configs Configs) { }, &configs), }, Key: testKey, - WantEvents: []string{ - Eventf( - corev1.EventTypeNormal, - Reconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName), - ), - }, WantUpdates: []clientgotesting.UpdateActionImpl{ ConfigMapUpdate(&configs, &coreconfig.Brokers{ VolumeGeneration: 1, @@ -912,13 +863,6 @@ func brokerFinalization(t *testing.T, format string, configs Configs) { }, &configs), }, Key: testKey, - WantEvents: []string{ - Eventf( - corev1.EventTypeNormal, - Reconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName), - ), - }, WantUpdates: []clientgotesting.UpdateActionImpl{ ConfigMapUpdate(&configs, &coreconfig.Brokers{ Broker: []*coreconfig.Broker{ @@ -953,13 +897,6 @@ func brokerFinalization(t *testing.T, format string, configs Configs) { }, &configs), }, Key: testKey, - WantEvents: []string{ - Eventf( - corev1.EventTypeNormal, - Reconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName), - ), - }, WantUpdates: []clientgotesting.UpdateActionImpl{ ConfigMapUpdate(&configs, &coreconfig.Brokers{ Broker: []*coreconfig.Broker{ @@ -992,13 +929,6 @@ func brokerFinalization(t *testing.T, format string, configs Configs) { }, &configs), }, Key: testKey, - WantEvents: []string{ - Eventf( - corev1.EventTypeNormal, - Reconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName), - ), - }, }, } diff --git a/control-plane/pkg/reconciler/trigger/trigger.go b/control-plane/pkg/reconciler/trigger/trigger.go index fc4878d2d5..a7d2a658d2 100644 --- a/control-plane/pkg/reconciler/trigger/trigger.go +++ b/control-plane/pkg/reconciler/trigger/trigger.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/util/retry" eventing "knative.dev/eventing/pkg/apis/eventing/v1beta1" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1beta1" "knative.dev/pkg/controller" @@ -52,96 +53,9 @@ type Reconciler struct { } func (r *Reconciler) ReconcileKind(ctx context.Context, trigger *eventing.Trigger) reconciler.Event { - - logger := log.Logger(ctx, "trigger", trigger) - - logger.Debug("Reconciling Trigger", zap.Any("trigger", trigger)) - - statusConditionManager := statusConditionManager{ - Trigger: trigger, - Configs: r.Configs, - Recorder: controller.GetEventRecorder(ctx), - } - - broker, err := r.BrokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker) - if err != nil && !apierrors.IsNotFound(err) { - return statusConditionManager.failedToGetBroker(err) - } - - if apierrors.IsNotFound(err) || !broker.GetDeletionTimestamp().IsZero() { - // The associated broker doesn't exist anymore, so clean up Trigger resources. - return r.FinalizeKind(ctx, trigger) - } - - statusConditionManager.propagateBrokerCondition(broker) - - // Get data plane config map. - dataPlaneConfigMap, err := r.GetDataPlaneConfigMap() - if err != nil { - return statusConditionManager.failedToGetDataPlaneConfigMap(err) - } - - logger.Debug("Got brokers and triggers config map") - - // Get data plane config data. - dataPlaneConfig, err := r.GetDataPlaneConfigMapData(logger, dataPlaneConfigMap) - if err != nil || dataPlaneConfig == nil { - return statusConditionManager.failedToGetDataPlaneConfigFromConfigMap(err) - } - - logger.Debug( - "Got brokers and triggers data from config map", - zap.Any(base.BrokersTriggersDataLogKey, log.BrokersMarshaller{Brokers: dataPlaneConfig}), - ) - - brokerIndex := brokerreconciler.FindBroker(dataPlaneConfig, broker) - if brokerIndex == brokerreconciler.NoBroker { - return statusConditionManager.brokerNotFoundInDataPlaneConfigMap() - } - triggerIndex := findTrigger(dataPlaneConfig.Broker[brokerIndex].Triggers, trigger) - - triggerConfig, err := r.GetTriggerConfig(trigger) - if err != nil { - return statusConditionManager.failedToResolveTriggerConfig(err) - } - - statusConditionManager.subscriberResolved() - - if triggerIndex == noTrigger { - dataPlaneConfig.Broker[brokerIndex].Triggers = append( - dataPlaneConfig.Broker[brokerIndex].Triggers, - &triggerConfig, - ) - } else { - dataPlaneConfig.Broker[brokerIndex].Triggers[triggerIndex] = &triggerConfig - } - - // Increment volumeGeneration - dataPlaneConfig.VolumeGeneration = incrementVolumeGeneration(dataPlaneConfig.VolumeGeneration) - - // Update the configuration map with the new dataPlaneConfig data. - if err := r.UpdateDataPlaneConfigMap(dataPlaneConfig, dataPlaneConfigMap); err != nil { - return fmt.Errorf("failed to update configuration map: %w", err) - } - - // Update volume generation annotation of dispatcher pods - if err := r.UpdateDispatcherPodsAnnotation(logger, dataPlaneConfig.VolumeGeneration); err != nil { - // Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds. - // Since the dispatcher side is the consumer side, we don't lose availability, and we can consider the Trigger - // ready. So, log out the error and move on to the next step. - logger.Warn( - "Failed to update dispatcher pod annotation to trigger an immediate config map refresh", - zap.Error(err), - ) - - statusConditionManager.failedToUpdateDispatcherPodsAnnotation(err) - } else { - logger.Debug("Updated dispatcher pod annotation") - } - - logger.Debug("Brokers and triggers config map updated") - - return statusConditionManager.reconciled() + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + return r.reconcileKind(ctx, trigger) + }) } func (r *Reconciler) FinalizeKind(ctx context.Context, trigger *eventing.Trigger) reconciler.Event { @@ -157,7 +71,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, trigger *eventing.Trigger if apierrors.IsNotFound(err) { // If the broker is deleted, resources associated with the Trigger will be deleted. - return reconciledNormal(trigger.Namespace, trigger.Name) + return nil } // Get data plane config map. @@ -182,7 +96,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, trigger *eventing.Trigger brokerIndex := brokerreconciler.FindBroker(brokersTriggers, broker) if brokerIndex == brokerreconciler.NoBroker { // If the broker is not there, resources associated with the Trigger are deleted accordingly. - return reconciledNormal(trigger.Namespace, trigger.Name) + return nil } logger.Debug("Found Broker", zap.Int("brokerIndex", brokerIndex)) @@ -191,7 +105,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, trigger *eventing.Trigger triggerIndex := findTrigger(triggers, trigger) if triggerIndex == noTrigger { // The trigger is not there, resources associated with the Trigger are deleted accordingly. - return reconciledNormal(trigger.Namespace, trigger.Name) + return nil } logger.Debug("Found Trigger", zap.Int("triggerIndex", brokerIndex)) @@ -205,7 +119,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, trigger *eventing.Trigger // Update data plane config map. err = r.UpdateDataPlaneConfigMap(brokersTriggers, dataPlaneConfigMap) if err != nil { - return fmt.Errorf("failed to update data plane config map %s: %w", r.Configs.DataPlaneConfigMapAsString(), err) + return err } logger.Debug("Updated data plane config map", zap.String("configmap", r.Configs.DataPlaneConfigMapAsString())) @@ -223,7 +137,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, trigger *eventing.Trigger logger.Debug("Updated dispatcher pod annotation successfully") } - return reconciledNormal(trigger.Namespace, trigger.Name) + return nil } func (r *Reconciler) GetTriggerConfig(trigger *eventing.Trigger) (coreconfig.Trigger, error) { @@ -266,6 +180,99 @@ func (r *Reconciler) deleteTrigger(triggers []*coreconfig.Trigger, index int) [] return triggers[:len(triggers)-1] } +func (r *Reconciler) reconcileKind(ctx context.Context, trigger *eventing.Trigger) reconciler.Event { + + logger := log.Logger(ctx, "trigger", trigger) + + logger.Debug("Reconciling Trigger", zap.Any("trigger", trigger)) + + statusConditionManager := statusConditionManager{ + Trigger: trigger, + Configs: r.Configs, + Recorder: controller.GetEventRecorder(ctx), + } + + broker, err := r.BrokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker) + if err != nil && !apierrors.IsNotFound(err) { + return statusConditionManager.failedToGetBroker(err) + } + + if apierrors.IsNotFound(err) || !broker.GetDeletionTimestamp().IsZero() { + // The associated broker doesn't exist anymore, so clean up Trigger resources. + return r.FinalizeKind(ctx, trigger) + } + + statusConditionManager.propagateBrokerCondition(broker) + + // Get data plane config map. + dataPlaneConfigMap, err := r.GetDataPlaneConfigMap() + if err != nil { + return statusConditionManager.failedToGetDataPlaneConfigMap(err) + } + + logger.Debug("Got brokers and triggers config map") + + // Get data plane config data. + dataPlaneConfig, err := r.GetDataPlaneConfigMapData(logger, dataPlaneConfigMap) + if err != nil || dataPlaneConfig == nil { + return statusConditionManager.failedToGetDataPlaneConfigFromConfigMap(err) + } + + logger.Debug( + "Got brokers and triggers data from config map", + zap.Any(base.BrokersTriggersDataLogKey, log.BrokersMarshaller{Brokers: dataPlaneConfig}), + ) + + brokerIndex := brokerreconciler.FindBroker(dataPlaneConfig, broker) + if brokerIndex == brokerreconciler.NoBroker { + return statusConditionManager.brokerNotFoundInDataPlaneConfigMap() + } + triggerIndex := findTrigger(dataPlaneConfig.Broker[brokerIndex].Triggers, trigger) + + triggerConfig, err := r.GetTriggerConfig(trigger) + if err != nil { + return statusConditionManager.failedToResolveTriggerConfig(err) + } + + statusConditionManager.subscriberResolved() + + if triggerIndex == noTrigger { + dataPlaneConfig.Broker[brokerIndex].Triggers = append( + dataPlaneConfig.Broker[brokerIndex].Triggers, + &triggerConfig, + ) + } else { + dataPlaneConfig.Broker[brokerIndex].Triggers[triggerIndex] = &triggerConfig + } + + // Increment volumeGeneration + dataPlaneConfig.VolumeGeneration = incrementVolumeGeneration(dataPlaneConfig.VolumeGeneration) + + // Update the configuration map with the new dataPlaneConfig data. + if err := r.UpdateDataPlaneConfigMap(dataPlaneConfig, dataPlaneConfigMap); err != nil { + return err + } + + // Update volume generation annotation of dispatcher pods + if err := r.UpdateDispatcherPodsAnnotation(logger, dataPlaneConfig.VolumeGeneration); err != nil { + // Failing to update dispatcher pods annotation leads to config map refresh delayed by several seconds. + // Since the dispatcher side is the consumer side, we don't lose availability, and we can consider the Trigger + // ready. So, log out the error and move on to the next step. + logger.Warn( + "Failed to update dispatcher pod annotation to trigger an immediate config map refresh", + zap.Error(err), + ) + + statusConditionManager.failedToUpdateDispatcherPodsAnnotation(err) + } else { + logger.Debug("Updated dispatcher pod annotation") + } + + logger.Debug("Brokers and triggers config map updated") + + return statusConditionManager.reconciled() +} + func incrementVolumeGeneration(generation uint64) uint64 { return (generation + 1) % (math.MaxUint64 - 1) } diff --git a/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go b/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go index 75d31217a6..0ed40db453 100644 --- a/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go +++ b/control-plane/pkg/reconciler/trigger/trigger_lifecycle.go @@ -94,7 +94,7 @@ func (m *statusConditionManager) reconciled() reconciler.Event { Status: corev1.ConditionTrue, }) - return reconciledNormal(m.Trigger.Namespace, m.Trigger.Name) + return nil } func (m *statusConditionManager) failedToResolveTriggerConfig(err error) reconciler.Event { @@ -125,11 +125,3 @@ func (m *statusConditionManager) failedToUpdateDispatcherPodsAnnotation(err erro func (m *statusConditionManager) subscriberResolved() { m.Trigger.Status.MarkSubscriberResolvedSucceeded() } - -func reconciledNormal(namespace, name string) reconciler.Event { - return reconciler.NewEvent( - corev1.EventTypeNormal, - triggerReconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, trigger, namespace, name), - ) -} diff --git a/control-plane/pkg/reconciler/trigger/trigger_test.go b/control-plane/pkg/reconciler/trigger/trigger_test.go index f8ce0f5a8c..8637814099 100644 --- a/control-plane/pkg/reconciler/trigger/trigger_test.go +++ b/control-plane/pkg/reconciler/trigger/trigger_test.go @@ -96,11 +96,6 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - triggerReconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, trigger, triggerNamespace, triggerName), - ), }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(), @@ -191,11 +186,6 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - triggerReconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, trigger, triggerNamespace, triggerName), - ), }, WantPatches: []clientgotesting.PatchActionImpl{ patchFinalizers(), @@ -338,11 +328,6 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - triggerReconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, trigger, triggerNamespace, triggerName), - ), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{ { @@ -367,11 +352,6 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - triggerReconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, trigger, triggerNamespace, triggerName), - ), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{ { @@ -402,11 +382,6 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - triggerReconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, trigger, triggerNamespace, triggerName), - ), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{ { @@ -448,11 +423,6 @@ func triggerReconciliation(t *testing.T, format string, configs broker.Configs) Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - triggerReconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, trigger, triggerNamespace, triggerName), - ), }, WantUpdates: []clientgotesting.UpdateActionImpl{ ConfigMapUpdate(&configs, &coreconfig.Brokers{ @@ -540,11 +510,6 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - triggerReconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, trigger, triggerNamespace, triggerName), - ), }, WantUpdates: []clientgotesting.UpdateActionImpl{ ConfigMapUpdate(&configs, &coreconfig.Brokers{ @@ -595,11 +560,6 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - triggerReconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, trigger, triggerNamespace, triggerName), - ), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{ { @@ -624,11 +584,6 @@ func triggerFinalizer(t *testing.T, format string, configs broker.Configs) { Key: testKey, WantEvents: []string{ finalizerUpdatedEvent, - Eventf( - corev1.EventTypeNormal, - triggerReconciled, - fmt.Sprintf(`%s reconciled: "%s/%s"`, trigger, triggerNamespace, triggerName), - ), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{ { diff --git a/data-plane/config/100-config-kafka-broker-dispatcher.yaml b/data-plane/config/100-config-kafka-broker-dispatcher.yaml index 7ecd30bcf8..16838f15aa 100644 --- a/data-plane/config/100-config-kafka-broker-dispatcher.yaml +++ b/data-plane/config/100-config-kafka-broker-dispatcher.yaml @@ -6,15 +6,13 @@ metadata: data: config-kafka-broker-producer.properties: | bootstrap.servers=my-cluster-kafka-bootstrap.kafka:9092 - # key.serializer=org.apache.kafka.common.serialization.StringSerializer - # value.serializer=org.apache.kafka.common.serialization.StringSerializer acks=1 buffer.memory=33554432 # compression.type=snappy retries=2147483647 batch.size=16384 client.dns.lookup=use_all_dns_ips - client.id=KKBD # Knative Kafka Broker Dispatcher + client.id=KKBD connections.max.idle.ms=600000 delivery.timeout.ms=120000 linger.ms=0 @@ -38,10 +36,7 @@ data: # transactional.id=null config-kafka-broker-consumer.properties: | bootstrap.servers=my-cluster-kafka-bootstrap.kafka:9092 - # key.serializer=org.apache.kafka.common.serialization.StringSerializer - # value.serializer=org.apache.kafka.common.serialization.StringSerializer fetch.min.bytes=1 - # group.id= dynamically set heartbeat.interval.ms=3000 max.partition.fetch.bytes=1048576 session.timeout.ms=10000 @@ -51,18 +46,17 @@ data: # ssl.truststore.location= # ssl.truststore.password= allow.auto.create.topics=true - auto.offsets.reset=latest + auto.offset.reset=latest client.dns.lookup=use_all_dns_ips connections.max.idle.ms=540000 default.api.timeout.ms=60000 enable.auto.commit=false - exclude.internals.topics=false + exclude.internal.topics=true fetch.max.bytes=52428800 - # group.instance.id= isolation.level=read_uncommitted max.poll.interval.ms=300000 max.poll.records=500 - partition.assignement.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor # Kafka 2.3 required + # partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor receive.buffer.bytes=65536 request.timeout.ms=30000 # sasl.client.callback.handler.class= @@ -79,7 +73,7 @@ data: # ssl.provider= auto.commit.interval.ms=5000 check.crcs=true - client.id=KKBD # Knative Kafka Broker Dispatcher + client.id=KKBD # client.rack= fetch.max.wait.ms=500 # interceptor.classes= @@ -103,5 +97,3 @@ data: # ssl.keymanager.algorithm # ssl.secure.random.implementation # ssl.trustmanager.algorithm - - diff --git a/data-plane/config/100-config-kafka-broker-receiver.yaml b/data-plane/config/100-config-kafka-broker-receiver.yaml index 75049a54f6..a39951db38 100644 --- a/data-plane/config/100-config-kafka-broker-receiver.yaml +++ b/data-plane/config/100-config-kafka-broker-receiver.yaml @@ -6,15 +6,13 @@ metadata: data: config-kafka-broker-producer.properties: | bootstrap.servers=my-cluster-kafka-bootstrap.kafka:9092 - # key.serializer=org.apache.kafka.common.serialization.StringSerializer - # value.serializer=org.apache.kafka.common.serialization.StringSerializer acks=1 buffer.memory=33554432 # compression.type=snappy retries=2147483647 batch.size=16384 client.dns.lookup=use_all_dns_ips - client.id=KKBR # Knative Kafka Broker Receiver + client.id=KKBR connections.max.idle.ms=600000 delivery.timeout.ms=120000 linger.ms=0 diff --git a/data-plane/config/template/500-dispatcher.yaml b/data-plane/config/template/500-dispatcher.yaml index b19097b1f2..c0ffc99424 100644 --- a/data-plane/config/template/500-dispatcher.yaml +++ b/data-plane/config/template/500-dispatcher.yaml @@ -48,6 +48,10 @@ spec: value: "100" - name: TRIGGERS_INITIAL_CAPACITY value: "20" + - name: INSTANCE_ID + valueFrom: + fieldRef: + fieldPath: metadata.uid command: - "java" # TODO add JVM arguments diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/BrokerWrapper.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/BrokerWrapper.java index 7f03c886d8..b23ae27e09 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/BrokerWrapper.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/BrokerWrapper.java @@ -76,7 +76,7 @@ public boolean equals(Object o) { && broker.getDeadLetterSink().equals(that.deadLetterSink()) && broker.getTopic().equals(that.topic()) && broker.getName().equals(that.name()) - && broker.getNamespace().equals(that.name()); + && broker.getNamespace().equals(that.namespace()); } @Override @@ -89,4 +89,11 @@ public int hashCode() { broker.getName() ); } + + @Override + public String toString() { + return "BrokerWrapper{" + + "broker=" + broker + + '}'; + } } diff --git a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/BrokerWrapperTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/BrokerWrapperTest.java index be88f01c21..78edea5360 100644 --- a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/BrokerWrapperTest.java +++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/BrokerWrapperTest.java @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import dev.knative.eventing.kafka.broker.core.config.BrokersConfig; import dev.knative.eventing.kafka.broker.core.config.BrokersConfig.Broker; import java.util.stream.Stream; import org.junit.jupiter.api.Test; @@ -62,7 +63,7 @@ public void topicCallShouldBeDelegatedToWrappedBroker() { public void testTriggerEquality( final dev.knative.eventing.kafka.broker.core.Broker b1, final dev.knative.eventing.kafka.broker.core.Broker b2) { - + assertThat(b1).isEqualTo(b2); assertThat(b1.hashCode()).isEqualTo(b2.hashCode()); } @@ -123,6 +124,54 @@ public static Stream equalTriggersProvider() { Broker.newBuilder().build() ) ), + Arguments.of( + new BrokerWrapper( + Broker.newBuilder() + .setName("broker") + .setNamespace("test-event-transformation-for-trigger-v1-broker-v1-6wlx9") + .setId("93ab71bd-9e3c-42ee-a0a2-aeec6dca48c9") + .setTopic( + "knative-broker-test-event-transformation-for-trigger-v1-broker-v1-6wlx9-broker") + .addTriggers(BrokersConfig.Trigger.newBuilder() + .setDestination( + "http://trans-pod.test-event-transformation-for-trigger-v1-broker-v1-6wlx9.svc.cluster.local/") + .setId("9bfd1eb6-5f09-49cf-b3fa-b70377aff64f") + .putAttributes("source", "source1") + .putAttributes("type", "type1") + .build()) + .addTriggers(BrokersConfig.Trigger.newBuilder() + .setDestination( + "http://recordevents-pod.test-event-transformation-for-trigger-v1-broker-v1-6wlx9.svc.cluster.local/") + .setId("2417e90f-fc9a-430b-a9f7-68ee3b5929cd") + .putAttributes("source", "source2") + .putAttributes("type", "type2") + .build()) + .build() + ), + new BrokerWrapper( + Broker.newBuilder() + .setName("broker") + .setNamespace("test-event-transformation-for-trigger-v1-broker-v1-6wlx9") + .setId("93ab71bd-9e3c-42ee-a0a2-aeec6dca48c9") + .setTopic( + "knative-broker-test-event-transformation-for-trigger-v1-broker-v1-6wlx9-broker") + .addTriggers(BrokersConfig.Trigger.newBuilder() + .setDestination( + "http://trans-pod.test-event-transformation-for-trigger-v1-broker-v1-6wlx9.svc.cluster.local/") + .setId("9bfd1eb6-5f09-49cf-b3fa-b70377aff64f") + .putAttributes("source", "source1") + .putAttributes("type", "type1") + .build()) + .addTriggers(BrokersConfig.Trigger.newBuilder() + .setDestination( + "http://recordevents-pod.test-event-transformation-for-trigger-v1-broker-v1-6wlx9.svc.cluster.local/") + .setId("2417e90f-fc9a-430b-a9f7-68ee3b5929cd") + .putAttributes("source", "source2") + .putAttributes("type", "type2") + .build()) + .build() + ) + ), Arguments.of( new BrokerWrapper( Broker.newBuilder() diff --git a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/TriggerWrapperTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/TriggerWrapperTest.java index fdc43d1a1c..6fb92e06c7 100644 --- a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/TriggerWrapperTest.java +++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/TriggerWrapperTest.java @@ -55,7 +55,6 @@ public void idCallShouldBeDelegatedToWrappedTrigger() { ); assertThat(triggerWrapper.id()).isEqualTo(id); - } @Test diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordHandler.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordHandler.java index 1fa44027b0..3770e7e7af 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordHandler.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordHandler.java @@ -116,14 +116,20 @@ record -> Future.failedFuture("no DLQ configured") @Override public void handle(final KafkaConsumerRecord record) { + logger.debug("handling record {}", record); + receiver.recordReceived(record); if (filter.match(record.value())) { + logger.debug("record match filtering {}", record); + subscriberSender.send(record) .compose(sinkResponseHandler::handle) .onSuccess(response -> onSuccessfullySentToSubscriber(record)) .onFailure(cause -> onFailedToSendToSubscriber(record, cause)); } else { + logger.debug("record doesn't match filtering {}", record); + receiver.recordDiscarded(record); } } @@ -167,6 +173,7 @@ private static void logFailedSendTo( keyValue("topic", record.topic()), keyValue("partition", record.partition()), keyValue("offset", record.offset()), + keyValue("event", record.value()), cause ); } @@ -178,7 +185,8 @@ private static void logSuccessfulSendTo( logger.debug("record successfully handled by " + component + " {} {} {}", keyValue("topic", record.topic()), keyValue("partition", record.partition()), - keyValue("offset", record.offset()) + keyValue("offset", record.offset()), + keyValue("event", record.value()) ); } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Main.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Main.java index d33c2d0690..14630839f3 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Main.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Main.java @@ -47,6 +47,7 @@ public class Main { private static final String CONSUMER_CONFIG_FILE_PATH = "CONSUMER_CONFIG_FILE_PATH"; private static final String BROKERS_INITIAL_CAPACITY = "BROKERS_INITIAL_CAPACITY"; private static final String TRIGGERS_INITIAL_CAPACITY = "TRIGGERS_INITIAL_CAPACITY"; + public static final String INSTANCE_ID = "INSTANCE_ID"; /** * Dispatcher entry point. @@ -75,6 +76,7 @@ public static void main(final String[] args) { final var producerConfigs = config(json.getString(PRODUCER_CONFIG_FILE_PATH)); final var consumerConfigs = config(json.getString(CONSUMER_CONFIG_FILE_PATH)); + final var instanceID = json.getString(INSTANCE_ID); final ConsumerRecordOffsetStrategyFactory consumerRecordOffsetStrategyFactory = ConsumerRecordOffsetStrategyFactory.create(); diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/UnorderedConsumerRecordOffsetStrategy.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/UnorderedConsumerRecordOffsetStrategy.java index 3c50f98592..2982614330 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/UnorderedConsumerRecordOffsetStrategy.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/UnorderedConsumerRecordOffsetStrategy.java @@ -106,11 +106,13 @@ public void recordDiscarded(final KafkaConsumerRecord record) { private Future> commit( final KafkaConsumerRecord record) { + logger.debug("committing record {}", record); + final Promise> promise = Promise.promise(); final var topicPartitionsToCommit = Map.of( topicPartition(record), - new OffsetAndMetadata(record.offset(), "") + new OffsetAndMetadata(record.offset() + 1, "") ); consumer.commit(topicPartitionsToCommit, promise); diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpConsumerRecordSender.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpConsumerRecordSender.java index 04e8588fd8..7067d5a3e8 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpConsumerRecordSender.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpConsumerRecordSender.java @@ -16,6 +16,8 @@ package dev.knative.eventing.kafka.broker.dispatcher.http; +import static net.logstash.logback.argument.StructuredArguments.keyValue; + import dev.knative.eventing.kafka.broker.dispatcher.ConsumerRecordSender; import io.cloudevents.CloudEvent; import io.cloudevents.http.vertx.VertxMessageFactory; @@ -26,10 +28,14 @@ import io.vertx.kafka.client.consumer.KafkaConsumerRecord; import java.net.URI; import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class HttpConsumerRecordSender implements ConsumerRecordSender { + private static final Logger logger = LoggerFactory.getLogger(HttpConsumerRecordSender.class); + private final HttpClient client; private final String subscriberURI; @@ -60,6 +66,7 @@ public HttpConsumerRecordSender( public Future send(final KafkaConsumerRecord record) { final Promise promise = Promise.promise(); final var request = client.postAbs(subscriberURI) + .setTimeout(10000) .exceptionHandler(promise::tryFail) .handler(response -> { if (response.statusCode() >= 300) { @@ -67,6 +74,19 @@ public Future send(final KafkaConsumerRecord https://github.com/knative/eventing/issues/2411). promise.tryFail("response status code is not 2xx - got: " + response.statusCode()); + if (logger.isDebugEnabled()) { + logger.error("failed to send event to subscriber {} {} {}", + keyValue("subscriberURI", subscriberURI), + keyValue("statusCode", response.statusCode()), + keyValue("event", record.value()) + ); + } else { + logger.error("failed to send event to subscriber {} {}", + keyValue("subscriberURI", subscriberURI), + keyValue("statusCode", response.statusCode()) + ); + } + return; } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpConsumerVerticleFactory.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpConsumerVerticleFactory.java index 9d29759073..01d7c811b4 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpConsumerVerticleFactory.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/http/HttpConsumerVerticleFactory.java @@ -17,7 +17,6 @@ package dev.knative.eventing.kafka.broker.dispatcher.http; import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG; import dev.knative.eventing.kafka.broker.core.Broker; import dev.knative.eventing.kafka.broker.core.Trigger; @@ -56,10 +55,11 @@ public class HttpConsumerVerticleFactory implements ConsumerVerticleFactory @@ -163,7 +163,6 @@ protected io.vertx.kafka.client.consumer.KafkaConsumer creat // consumerConfigs is a shared object and it acts as a prototype for each consumer instance. final var consumerConfigs = (Properties) this.consumerConfigs.clone(); consumerConfigs.setProperty(GROUP_ID_CONFIG, trigger.id()); - consumerConfigs.setProperty(GROUP_INSTANCE_ID_CONFIG, trigger.id()); // TODO this isn't unique // Note: KafkaConsumer instances are not thread-safe. // There are methods thread-safe, but in general they're not. diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/BrokersManagerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/BrokersManagerTest.java index 9fc240871c..1a89e93311 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/BrokersManagerTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/BrokersManagerTest.java @@ -64,12 +64,12 @@ public void shouldAddBrokerAndDeployVerticles(final Vertx vertx, final VertxTest 100 ); - final var reconciled = brokersManager.reconcile(brokers); - - reconciled.onSuccess(ignored -> context.verify(() -> { - assertThat(vertx.deploymentIDs()).hasSize(numTriggers); - checkpoints.flag(); - })); + brokersManager.reconcile(brokers) + .onSuccess(ignored -> context.verify(() -> { + assertThat(vertx.deploymentIDs()).hasSize(numTriggers); + checkpoints.flag(); + })) + .onFailure(context::failNow); } @Test @@ -91,10 +91,12 @@ public void shouldNotDeployWhenFailedToGetVerticle( 100 ); - brokersManager.reconcile(brokers).onFailure(ignored -> context.verify(() -> { - assertThat(vertx.deploymentIDs()).hasSize(0); - checkpoint.flag(); - })); + brokersManager.reconcile(brokers) + .onFailure(ignored -> context.verify(() -> { + assertThat(vertx.deploymentIDs()).hasSize(0); + checkpoint.flag(); + })) + .onFailure(context::failNow); } @Test @@ -123,18 +125,22 @@ public void shouldStopVerticleWhenTriggerDeleted( 100 ); - brokersManager.reconcile(brokersOld).onSuccess(ignored -> { - - context.verify(() -> { - assertThat(vertx.deploymentIDs()).hasSize(numTriggersOld); - checkpoints.flag(); - }); - - brokersManager.reconcile(brokersNew).onSuccess(ok -> context.verify(() -> { - assertThat(vertx.deploymentIDs()).hasSize(numTriggersNew); - checkpoints.flag(); - })); - }); + brokersManager.reconcile(brokersOld) + .onSuccess(ignored -> { + + context.verify(() -> { + assertThat(vertx.deploymentIDs()).hasSize(numTriggersOld); + checkpoints.flag(); + }); + + brokersManager.reconcile(brokersNew) + .onSuccess(ok -> context.verify(() -> { + assertThat(vertx.deploymentIDs()).hasSize(numTriggersNew); + checkpoints.flag(); + })) + .onFailure(context::failNow); + }) + .onFailure(context::failNow); } @Test @@ -164,25 +170,28 @@ public void shouldStopVerticlesWhenBrokerDeleted( 100 ); - brokersManager.reconcile(brokersOld).onSuccess(ignored -> { - context.verify(() -> { - assertThat(vertx.deploymentIDs()).hasSize(numTriggersOld); - checkpoints.flag(); - }); - - brokersManager.reconcile(brokersNew).onSuccess(ok -> context.verify(() -> { - assertThat(vertx.deploymentIDs()).hasSize(numTriggersNew); - checkpoints.flag(); - })); - }); + brokersManager.reconcile(brokersOld) + .onSuccess(ignored -> { + context.verify(() -> { + assertThat(vertx.deploymentIDs()).hasSize(numTriggersOld); + checkpoints.flag(); + }); + + brokersManager.reconcile(brokersNew) + .onSuccess(ok -> context.verify(() -> { + assertThat(vertx.deploymentIDs()).hasSize(numTriggersNew); + checkpoints.flag(); + })) + .onFailure(context::failNow); + }) + .onFailure(context::failNow); } @Test @Timeout(value = 2) public void shouldStopAndStartVerticlesWhenTriggerDeletedAndReAdded( final Vertx vertx, - final VertxTestContext context) - throws Exception { + final VertxTestContext context) { final var brokersOld = Map.of( broker1(), Set.of(trigger1(), trigger2(), trigger4()), @@ -207,33 +216,37 @@ public void shouldStopAndStartVerticlesWhenTriggerDeletedAndReAdded( ); final var oldDeployments = vertx.deploymentIDs(); - brokersManager.reconcile(brokersOld).onSuccess(ignored -> { - - context.verify(() -> { - assertThat(oldDeployments).hasSize(numTriggersOld); - checkpoints.flag(); - }); - - brokersManager.reconcile(brokersNew).onSuccess(ok -> { - context.verify(() -> { - assertThat(vertx.deploymentIDs()).hasSize(numTriggersNew); - assertThat(vertx.deploymentIDs()).contains(oldDeployments.toArray(new String[0])); - checkpoints.flag(); - }); - - brokersManager.reconcile(brokersOld).onSuccess(ok2 -> context.verify(() -> { - assertThat(oldDeployments).hasSize(numTriggersOld); - checkpoints.flag(); - })); - }); - }); + brokersManager.reconcile(brokersOld) + .onSuccess(ignored -> { + + context.verify(() -> { + assertThat(oldDeployments).hasSize(numTriggersOld); + checkpoints.flag(); + }); + + brokersManager.reconcile(brokersNew) + .onSuccess(ok -> { + context.verify(() -> { + assertThat(vertx.deploymentIDs()).hasSize(numTriggersNew); + assertThat(vertx.deploymentIDs()).contains(oldDeployments.toArray(new String[0])); + checkpoints.flag(); + }); + + brokersManager.reconcile(brokersOld).onSuccess(ok2 -> context.verify(() -> { + assertThat(oldDeployments).hasSize(numTriggersOld); + checkpoints.flag(); + })); + }) + .onFailure(context::failNow); + }) + .onFailure(context::failNow); } @Test @Timeout(value = 2) public void shouldDoNothingWhenTheStateIsTheSame( final Vertx vertx, - final VertxTestContext context) throws InterruptedException { + final VertxTestContext context) { final var brokers = Map.of( broker1(), Set.of(trigger1(), trigger2(), trigger4()), @@ -255,20 +268,22 @@ public void shouldDoNothingWhenTheStateIsTheSame( 100, 100 ); - brokersManager.reconcile(brokers).onSuccess(ignored -> { - - final var deployments = vertx.deploymentIDs(); - - context.verify(() -> { - assertThat(deployments).hasSize(numTriggers); - checkpoints.flag(); - }); - - brokersManager.reconcile(brokers2).onSuccess(ok -> context.verify(() -> { - assertThat(vertx.deploymentIDs()).containsExactly(deployments.toArray(new String[0])); - checkpoints.flag(); - })); - }); + brokersManager.reconcile(brokers) + .onSuccess(ignored -> { + + final var deployments = vertx.deploymentIDs(); + + context.verify(() -> { + assertThat(deployments).hasSize(numTriggers); + checkpoints.flag(); + }); + + brokersManager.reconcile(brokers2).onSuccess(ok -> context.verify(() -> { + assertThat(vertx.deploymentIDs()).containsExactly(deployments.toArray(new String[0])); + checkpoints.flag(); + })); + }) + .onFailure(context::failNow); } @Test diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/UnorderedConsumerRecordOffsetStrategyTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/UnorderedConsumerRecordOffsetStrategyTest.java index d1ddb12c05..ce0c2a7d27 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/UnorderedConsumerRecordOffsetStrategyTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/UnorderedConsumerRecordOffsetStrategyTest.java @@ -136,7 +136,7 @@ final var record = new KafkaConsumerRecordImpl<>( assertThat(committed).hasSize(1); assertThat(committed.keySet()).containsExactlyInAnyOrder(topicPartition); - assertThat(committed.values()).containsExactlyInAnyOrder(new OffsetAndMetadata(offset, "")); + assertThat(committed.values()).containsExactlyInAnyOrder(new OffsetAndMetadata(offset + 1, "")); } @SuppressWarnings("unchecked") diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/CloudEventRequestToRecordMapper.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/CloudEventRequestToRecordMapper.java index 934441f95e..4b81310e7a 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/CloudEventRequestToRecordMapper.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/CloudEventRequestToRecordMapper.java @@ -17,6 +17,7 @@ package dev.knative.eventing.kafka.broker.receiver; import static java.lang.String.join; +import static net.logstash.logback.argument.StructuredArguments.keyValue; import dev.knative.eventing.kafka.broker.core.cloudevents.PartitionKey; import io.cloudevents.CloudEvent; @@ -27,9 +28,14 @@ import io.vertx.core.http.HttpServerRequest; import io.vertx.kafka.client.producer.KafkaProducerRecord; import java.util.StringTokenizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CloudEventRequestToRecordMapper implements RequestToRecordMapper { + private static final Logger logger = LoggerFactory + .getLogger(CloudEventRequestToRecordMapper.class); + static final int PATH_TOKEN_NUMBER = 2; static final String PATH_DELIMITER = "/"; static final String TOPIC_DELIMITER = "-"; @@ -48,6 +54,8 @@ public Future> recordFromRequest( return Future.failedFuture(new IllegalArgumentException("event cannot be null")); } + logger.debug("received event {}", keyValue("event", event)); + final var topic = topic(request.path()); if (topic == null) { return Future.failedFuture(new IllegalArgumentException("unable to determine topic")); diff --git a/go.mod b/go.mod index f334081ef2..439739ab36 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/google/uuid v1.1.1 github.com/kelseyhightower/envconfig v1.4.0 github.com/stretchr/testify v1.5.1 - go.uber.org/multierr v1.5.0 go.uber.org/zap v1.14.1 gotest.tools v2.2.0+incompatible k8s.io/api v0.17.6 diff --git a/test/e2e-tests.sh b/test/e2e-tests.sh index 6f19a80fb2..544817f452 100755 --- a/test/e2e-tests.sh +++ b/test/e2e-tests.sh @@ -24,6 +24,6 @@ wait_until_pods_running knative-eventing || fail_test "Knative Eventing did not header "Running tests" -go_test_e2e -timeout=10m ./test/... || fail_test "Integration test failed" +go_test_e2e -timeout=30m ./test/... || fail_test "Integration test failed" success diff --git a/test/e2e/conformance/data_plane_conformance_test.go b/test/e2e/conformance/data_plane_conformance_test.go index b77876f0bd..02c7eb2f88 100644 --- a/test/e2e/conformance/data_plane_conformance_test.go +++ b/test/e2e/conformance/data_plane_conformance_test.go @@ -21,6 +21,7 @@ package conformance import ( "testing" + eventing "knative.dev/eventing/pkg/apis/eventing/v1beta1" conformance "knative.dev/eventing/test/conformance/helpers" testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/resources" @@ -28,16 +29,32 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka" ) -func TestDataPlaneConformanceTest(t *testing.T) { +func TestBrokerIngressV1Beta1(t *testing.T) { client := testlib.Setup(t, true) defer testlib.TearDown(client) + broker := createBroker(client) + + conformance.BrokerV1Beta1IngressDataPlaneTestHelper(t, client, broker) +} + +func TestBrokerConsumerV1Beta1(t *testing.T) { + + client := testlib.Setup(t, true) + defer testlib.TearDown(client) + + broker := createBroker(client) + + conformance.BrokerV1Beta1ConsumerDataPlaneTestHelper(t, client, broker) +} + +func createBroker(client *testlib.Client) *eventing.Broker { + broker := client.CreateBrokerV1Beta1OrFail("broker", resources.WithBrokerClassForBrokerV1Beta1(kafka.BrokerClass), ) client.WaitForResourceReadyOrFail(broker.Name, testlib.BrokerTypeMeta) - - conformance.BrokerV1Beta1IngressDataPlaneTestHelper(t, client, broker) + return broker }