
Fix: misc issues with Jetstream management#3543

Closed
fullykubed wants to merge 8 commits intoargoproj:masterfrom
fullykubed:fix/stream-settings-sync

Conversation

@fullykubed
Contributor

Motivation

We use Argo Events with the Jetstream client and noticed some reliability issues, so we dug in and resolved a handful of bugs in our custom patch that we run in production. I am now attempting to upstream those changes so the community can benefit. Here are the fixes:

  1. The Kubernetes controller framework did not have a logger configured, and this was generating a null pointer error on controller startup. Ultimately, this wasn't harming anything, but to remove all doubt we fixed the issue by setting a logger.
  2. The Argo Events controller would accept Jetstream streamSettings updates for the EventBus, but those were not being propagated to streams that had already been created. This patch adds that update capability and adds a watch that triggers an EventSource reconciliation anytime a connected EventBus's settings are updated.
  3. In adding (2), we found that the Argo Events controller had a problem where the Deployments for EventSources and Sensors were not always being updated. This was caused by the original code performing client-side updates with outdated resourceVersion fields read from a stale cache. There was no retry logic, so we either had to (a) implement retry logic or (b) switch to server-side apply (SSA). Since SSA has been stable since Kubernetes 1.22 and is the recommended way for controllers to manage resources, we swapped out the client-side applies for SSA. The SSA isn't 100% optimized because the CRDs don't have all the required merge configurations, but it is more performant than the original implementation and resolves the resourceVersion conflict bug.
  4. The streamSettings weren't being logged with their keys, so it was impossible to tell which settings were actually being applied without logging into NATS directly. Resolved.
  5. The EventSource Jetstream publish call did not have retries enabled. This resulted in occasional message delivery failures if a NATS node was being restarted or there was a temporary network blip. This patch enables some basic retries (already built into the nats.go client) and resolves the issues we saw.
  6. Similar to (5), the NATS client wasn't configured to retry on a failed connect. Enabling this resolved some more issues and has no downside (it is in fact the library's default -- the original code had disabled it for some reason?).
  7. Slightly bumped the Sensor fetch wait time from 1 second to 3 seconds to deal with issues in resource-constrained environments. Ultimately, this should slightly reduce the number of errors that occur, with no downside.
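Fixes (5) and (6) boil down to a few client options. Here is a minimal sketch of what enabling them can look like, assuming the nats.go client; the retry counts and wait durations below are illustrative, not the exact values used in this patch:

```go
package natsretry

import (
	"time"

	"github.com/nats-io/nats.go"
)

// connectWithRetries sketches fix (6): RetryOnFailedConnect restores the
// library default so an initial connect attempt during a NATS restart is
// retried instead of failing immediately.
func connectWithRetries(url string) (*nats.Conn, error) {
	return nats.Connect(url,
		nats.RetryOnFailedConnect(true),
		nats.MaxReconnects(-1),            // keep reconnecting indefinitely
		nats.ReconnectWait(2*time.Second), // illustrative wait between attempts
	)
}

// publishWithRetries sketches fix (5): publish retries are built into the
// nats.go JetStream API via PubOpts.
func publishWithRetries(js nats.JetStreamContext, subject string, data []byte) error {
	_, err := js.Publish(subject, data,
		nats.RetryAttempts(3),                // illustrative
		nats.RetryWait(250*time.Millisecond), // illustrative
	)
	return err
}
```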

Notes

  1. We removed the needsUpdate check for patching EventBus, Sensor, and EventSource resources in the reconciliation loop. In transitioning from client-side to server-side applies, the FieldOwner needs to be set on the finalizers even if the finalizer isn't updated, in order to allow the controller to delete the finalizers in future server-side applies. From our testing, this didn't have any significant impact since the reconciliation loop runs so infrequently -- we figured better safe than sorry here.
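As a rough illustration of the note above, a server-side apply through controller-runtime looks something like the following; the field-owner name and helper function are hypothetical, not the identifiers used in the Argo Events codebase:

```go
package reconciler

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"
)

// fieldOwner is a hypothetical manager name for this controller.
const fieldOwner = client.FieldOwner("argo-events-controller")

// serverSideApply sketches the SSA pattern: the API server merges obj into
// the live object and records fieldOwner as the manager of every field in
// obj -- including the finalizer list. A field this owner never managed
// cannot later be removed by omitting it from an apply, which is why the
// finalizers are applied on every reconciliation even when unchanged.
func serverSideApply(ctx context.Context, c client.Client, obj client.Object) error {
	obj.SetManagedFields(nil) // an apply payload must not carry managedFields
	return c.Patch(ctx, obj, client.Apply, fieldOwner, client.ForceOwnership)
}
```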

Testing

We have done the following testing:

  1. Deployed this patch in our live Kubernetes system.
  2. Confirmed that updating streamSettings on the EventBus resource now appropriately updates the stream configuration in NATS.
  3. Confirmed that all the scenarios (we know of) that created failures in publishing to NATS are now resolved.
  4. Confirmed that Sensor, EventBus, and EventSource creation still work.
  5. Confirmed that all the reconciliation errors that were once common in the Argo Events controller no longer appear and that updates to Sensors and EventSources now always and immediately propagate to their controlled Deployments.
  6. Confirmed that finalizers and deletion still work.

Future Work

  1. The Sensors use a Jetstream KV store. This is set up incorrectly with a replica count of only 1 (even if the Jetstream replica count is >1). This is a resiliency and availability problem. However, it has far less impact than on the EventSource side, where the current issues prevent Argo Events from publishing to NATS in many common scenarios such as a restart of a NATS server. We will resolve the Sensor issue in a future PR.
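For reference, the eventual fix is likely a one-field change when the KV bucket is created. A hypothetical sketch assuming the nats.go API, with `replicas` standing in for however the Jetstream replica setting gets plumbed through (these names are illustrative, not the Argo Events identifiers):

```go
// createSensorKV sizes the sensor's K/V store to match the Jetstream
// replica count instead of the current default of 1.
func createSensorKV(js nats.JetStreamContext, sensorName string, replicas int) (nats.KeyValue, error) {
	return js.CreateKeyValue(&nats.KeyValueConfig{
		Bucket:   sensorName,
		Replicas: replicas,
	})
}
```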

@fullykubed fullykubed requested a review from whynowy as a code owner March 31, 2025 00:03
Signed-off-by: fullykubed <github@fullstackjack.io>
@fullykubed fullykubed force-pushed the fix/stream-settings-sync branch from 62adeec to 000d3f1 Compare March 31, 2025 00:04
@fullykubed
Contributor Author

@whynowy There appears to be some nondeterminism in the unit test at pkg/sensors/triggers/kafka/kafka_test.go:198.

The CI runner is showing it as failing but I am not able to reproduce the failure locally and the test is not related to anything that this PR changes.

@fullykubed
Contributor Author

@whynowy For the linting, I am not sure what is wrong, and some assistance would be appreciated.

When I run make lint, I receive the following logs:

go mod tidy
golangci-lint run --fix --verbose --concurrency 4 --timeout 5m --enable goimports
INFO [config_reader] Config search paths: [./ /home/jack/repos/argo-events-2 /home/jack/repos /home/jack /home /]
INFO [config_reader] Used config file .golangci.yml
WARN [config_reader] The configuration option `linters.errcheck.ignore` is deprecated, please use `linters.errcheck.exclude-functions`.
INFO [lintersdb] Active 17 linters: [dogsled errcheck goconst gocritic gofmt goimports goprintffuncname gosimple govet ineffassign misspell nakedret rowserrcheck staticcheck unconvert unused whitespace]
INFO [loader] Go packages loading at mode 575 (types_sizes|compiled_files|files|imports|deps|exports_file|name) took 528.67725ms
INFO [runner/filename_unadjuster] Pre-built 0 adjustments in 53.024117ms
INFO [linters_context/goanalysis] analyzers took 10.021090018s with top 10 stages: SA6002: 147.005551ms, SA1002: 135.97978ms, SA1027: 134.248702ms, SA5012: 133.680941ms, SA4015: 129.134875ms, SA4005: 128.575253ms, composites: 126.653755ms, printf: 121.442686ms, SA2003: 118.94888ms, nilness: 118.416932ms
INFO [runner] fixer took 0s with no stages
INFO [runner/skip_dirs] Skipped 624 issues from dir pkg/client/clientset/versioned/typed/events/v1alpha1 by pattern pkg/client
INFO [runner/skip_dirs] Skipped 936 issues from dir pkg/client/listers/events/v1alpha1 by pattern pkg/client
INFO [runner/max_same_issues] 18/21 issues with text "conn.Logger undefined (type *JetstreamTriggerConn has no field or method Logger)" were hidden, use --max-same-issues
INFO [runner/max_same_issues] 14/17 issues with text "s.Logger undefined (type *KafkaSensor has no field or method Logger)" were hidden, use --max-same-issues
INFO [runner/max_same_issues] 12/15 issues with text "stream.Logger undefined (type *SensorJetstream has no field or method Logger)" were hidden, use --max-same-issues
INFO [runner/max_same_issues] 2/5 issues with text "s.T undefined (type *E2ESuite has no field or method T)" were hidden, use --max-same-issues
INFO [runner/max_same_issues] 2/5 issues with text "could not import k8s.io/apimachinery/pkg/types (-: could not load export data: internal error in importing \"k8s.io/apimachinery/pkg/types\" (unsupported version: 2); please report an issue)" were hidden, use --max-same-issues
INFO [runner/max_from_linter] 13/63 issues from linter typecheck were hidden, use --max-issues-per-linter
INFO [runner] Issues before processing: 21216, after processing: 50
INFO [runner] Processors filtering stat (out/in): uniq_by_line: 111/19032, max_same_issues: 63/111, sort_results: 50/50, filename_unadjuster: 21216/21216, invalid_issue: 21216/21216, skip_dirs: 19032/20592, source_code: 50/50, path_shortener: 50/50, severity-rules: 50/50, path_prettifier: 21216/21216, exclude-rules: 19032/19032, nolint: 19032/19032, diff: 111/111, max_per_file_from_linter: 111/111, max_from_linter: 50/63, path_prefixer: 50/50, cgo: 21216/21216, autogenerated_exclude: 19032/19032, exclude: 19032/19032, skip_files: 20592/21216, identifier_marker: 19032/19032, fixer: 50/50
INFO [runner] processing took 512.131492ms with stages: exclude-rules: 336.285715ms, identifier_marker: 142.835183ms, skip_files: 14.902908ms, nolint: 6.194521ms, path_prettifier: 5.069828ms, skip_dirs: 2.142447ms, cgo: 1.386051ms, filename_unadjuster: 1.168123ms, invalid_issue: 1.099903ms, uniq_by_line: 509.697µs, autogenerated_exclude: 251.728µs, source_code: 216.749µs, max_same_issues: 33.889µs, fixer: 23.34µs, max_from_linter: 4.9µs, path_shortener: 3.36µs, max_per_file_from_linter: 2.35µs, sort_results: 250ns, exclude: 180ns, diff: 170ns, severity-rules: 130ns, path_prefixer: 70ns
INFO [runner] linters took 9.19342049s with stages: goanalysis_metalinter: 8.681237428s
pkg/eventbus/jetstream/sensor/sensor_jetstream.go:50:16: stream.Init undefined (type *SensorJetstream has no field or method Init) (typecheck)
	err := stream.Init() // member of jetstreambase.Jetstream
	              ^
pkg/eventbus/jetstream/sensor/sensor_jetstream.go:56:35: stream.MgmtConnection undefined (type *SensorJetstream has no field or method MgmtConnection) (typecheck)
	stream.keyValueStore, _ = stream.MgmtConnection.JSContext.KeyValue(stream.sensorName)
	                                 ^
pkg/eventbus/jetstream/sensor/sensor_jetstream.go:59:38: stream.MgmtConnection undefined (type *SensorJetstream has no field or method MgmtConnection) (typecheck)
		stream.keyValueStore, err = stream.MgmtConnection.JSContext.CreateKeyValue(&nats.KeyValueConfig{Bucket: stream.sensorName})
		                                   ^
pkg/eventbus/jetstream/sensor/sensor_jetstream.go:64:10: stream.Logger undefined (type *SensorJetstream has no field or method Logger) (typecheck)
		stream.Logger.Infof("found existing K/V store for sensor %s, using that", stream.sensorName)
		       ^
pkg/eventbus/jetstream/sensor/sensor_jetstream.go:66:9: stream.Logger undefined (type *SensorJetstream has no field or method Logger) (typecheck)
	stream.Logger.Infof("successfully created/located K/V store for sensor %s", stream.sensorName)
	       ^
pkg/eventbus/jetstream/sensor/sensor_jetstream.go:75:22: stream.MakeConnection undefined (type *SensorJetstream has no field or method MakeConnection) (typecheck)
	conn, err := stream.MakeConnection()
	                    ^
pkg/eventbus/jetstream/sensor/sensor_jetstream.go:89:16: stream.Logger undefined (type *SensorJetstream has no field or method Logger) (typecheck)
	log := stream.Logger
	              ^
pkg/eventbus/jetstream/sensor/sensor_jetstream.go:148:13: stream.MgmtConnection undefined (type *SensorJetstream has no field or method MgmtConnection) (typecheck)
	_ = stream.MgmtConnection.JSContext.DeleteConsumer("default", durableName) // sometimes we call this on a trigger/dependency combination not sure if it actually exists or not, so
	           ^
pkg/eventbus/jetstream/sensor/trigger_conn.go:66:33: connection.Logger undefined (type *JetstreamTriggerConn has no field or method Logger) (typecheck)
	connection.Logger = connection.Logger.With("triggerName", connection.triggerName, "sensorName", connection.sensorName)
	                               ^
pkg/eventbus/jetstream/sensor/trigger_conn.go:78:13: connection.Logger undefined (type *JetstreamTriggerConn has no field or method Logger) (typecheck)
	connection.Logger.Infof("Successfully located K/V store for sensor %s", sensorName)
	           ^
pkg/eventbus/jetstream/sensor/trigger_conn.go:113:14: conn.Logger undefined (type *JetstreamTriggerConn has no field or method Logger) (typecheck)
	log := conn.Logger
	            ^
pkg/eventbus/jetstream/sensor/trigger_conn.go:141:8: conn.Logger undefined (type *JetstreamTriggerConn has no field or method Logger) (typecheck)
		conn.Logger.Debugf("durable name for sensor='%s', trigger='%s', dep='%s': '%s'", conn.sensorName, conn.triggerName, dependency.Name, durableName)
		     ^
pkg/eventbus/jetstream/sensor/trigger_conn.go:143:48: conn.JSContext undefined (type *JetstreamTriggerConn has no field or method JSContext) (typecheck)
		subscriptions[subscriptionIndex], err = conn.JSContext.PullSubscribe(subject, durableName, nats.AckExplicit(), nats.DeliverNew())
		                                             ^
pkg/eventbus/jetstream/sensor/trigger_conn.go:184:7: conn.NATSConn undefined (type *JetstreamTriggerConn has no field or method NATSConn) (typecheck)
	conn.NATSConn.Close()
	     ^
pkg/eventbus/jetstream/sensor/trigger_conn.go:185:7: conn.Logger undefined (type *JetstreamTriggerConn has no field or method Logger) (typecheck)
	conn.Logger.Debug("closed NATSConn")
	     ^
pkg/eventbus/kafka/eventsource/source_conn.go:30:6: c.Logger undefined (type *KafkaSourceConnection has no field or method Logger) (typecheck)
			c.Logger.Fatalf(
			  ^
pkg/eventbus/kafka/eventsource/source_conn.go:40:4: c.Logger undefined (type *KafkaSourceConnection has no field or method Logger) (typecheck)
	c.Logger.Infow("Published message to kafka", zap.String("topic", c.Topic), zap.String("key", key), zap.Int32("partition", partition), zap.Int64("offset", offset))
	  ^
pkg/eventbus/kafka/eventsource/source_kafka.go:29:19: s.Config undefined (type *KafkaSource has no field or method Config) (typecheck)
	config, err := s.Config()
	                 ^
pkg/eventbus/kafka/eventsource/source_kafka.go:38:36: s.Brokers undefined (type *KafkaSource has no field or method Brokers) (typecheck)
	client, err := sarama.NewClient(s.Brokers(), config)
	                                  ^
pkg/eventbus/kafka/eventsource/source_kafka.go:49:46: s.Logger undefined (type *KafkaSource has no field or method Logger) (typecheck)
		KafkaConnection: base.NewKafkaConnection(s.Logger),
		                                           ^
pkg/eventbus/kafka/sensor/kafka_handler.go:207:7: h.Lock undefined (type *KafkaHandler has no field or method Lock) (typecheck)
				h.Lock()
				  ^
pkg/eventbus/kafka/sensor/kafka_handler.go:208:13: h.Unlock undefined (type *KafkaHandler has no field or method Unlock) (typecheck)
				defer h.Unlock()
				        ^
pkg/eventbus/kafka/sensor/kafka_handler.go:225:4: h.Lock undefined (type *KafkaHandler has no field or method Lock) (typecheck)
	h.Lock()
	  ^
pkg/eventbus/kafka/sensor/kafka_handler.go:226:10: h.Unlock undefined (type *KafkaHandler has no field or method Unlock) (typecheck)
	defer h.Unlock()
	        ^
pkg/eventbus/kafka/sensor/kafka_sensor.go:107:19: s.Config undefined (type *KafkaSensor has no field or method Config) (typecheck)
	config, err := s.Config()
	                 ^
pkg/eventbus/kafka/sensor/kafka_sensor.go:115:36: s.Brokers undefined (type *KafkaSensor has no field or method Brokers) (typecheck)
	client, err := sarama.NewClient(s.Brokers(), config)
	                                  ^
pkg/eventbus/kafka/sensor/kafka_sensor.go:138:6: s.Logger undefined (type *KafkaSensor has no field or method Logger) (typecheck)
			s.Logger.Errorf("Kafka producer error", zap.Error(err))
			  ^
pkg/eventbus/kafka/sensor/kafka_sensor.go:146:20: s.Logger undefined (type *KafkaSensor has no field or method Logger) (typecheck)
		Logger:        s.Logger,
		                 ^
pkg/eventbus/kafka/sensor/kafka_sensor.go:163:4: s.Lock undefined (type *KafkaSensor has no field or method Lock) (typecheck)
	s.Lock()
	  ^
pkg/eventbus/kafka/sensor/kafka_sensor.go:164:10: s.Unlock undefined (type *KafkaSensor has no field or method Unlock) (typecheck)
	defer s.Unlock()
	        ^
pkg/eventbus/kafka/sensor/kafka_sensor.go:186:47: s.Logger undefined (type *KafkaSensor has no field or method Logger) (typecheck)
			KafkaConnection: base.NewKafkaConnection(s.Logger),
			                                           ^
pkg/eventbus/kafka/sensor/kafka_sensor.go:239:4: s.Lock undefined (type *KafkaSensor has no field or method Lock) (typecheck)
	s.Lock()
	  ^
pkg/eventbus/kafka/sensor/kafka_sensor.go:240:10: s.Unlock undefined (type *KafkaSensor has no field or method Unlock) (typecheck)
	defer s.Unlock()
	        ^
pkg/eventbus/kafka/sensor/kafka_sensor.go:246:4: s.Lock undefined (type *KafkaSensor has no field or method Lock) (typecheck)
	s.Lock()
	  ^
pkg/eventbus/kafka/sensor/kafka_sensor.go:247:10: s.Unlock undefined (type *KafkaSensor has no field or method Unlock) (typecheck)
	defer s.Unlock()
	        ^
pkg/eventbus/kafka/sensor/trigger_conn.go:45:12: e1.Source undefined (type *eventWithMetadata has no field or method Source) (typecheck)
	return e1.Source() == e2.Source() && e1.Subject() == e2.Subject()
	          ^
pkg/eventbus/kafka/sensor/trigger_handler.go:144:4: c.Logger undefined (type *KafkaTriggerConnection has no field or method Logger) (typecheck)
	c.Logger.Infow("Evaluating", zap.String("expr", c.depExpression.String()), zap.Any("parameters", parameters))
	  ^
pkg/eventbus/stan/eventsource/source_conn.go:22:11: n.STANConn undefined (type *STANSourceConn has no field or method STANConn) (typecheck)
	return n.STANConn.Publish(n.subject, msg.Body)
	         ^
pkg/eventbus/stan/eventsource/source_stan.go:28:17: n.MakeConnection undefined (type *SourceSTAN has no field or method MakeConnection) (typecheck)
	conn, err := n.MakeConnection(clientID)
	               ^
pkg/eventbus/stan/sensor/sensor_stan.go:38:17: n.MakeConnection undefined (type *SensorSTAN has no field or method MakeConnection) (typecheck)
	conn, err := n.MakeConnection(clientID)
	               ^
pkg/eventbus/stan/sensor/trigger_conn.go:33:15: n.Logger undefined (type *STANTriggerConn has no field or method Logger) (typecheck)
	n.Logger = n.Logger.With("triggerName", n.triggerName).With("clientID", n.ClientID)
	             ^
pkg/eventbus/stan/sensor/trigger_conn.go:41:76: n.ClientID undefined (type *STANTriggerConn has no field or method ClientID) (typecheck)
	return fmt.Sprintf("STANTriggerConn{ClientID:%s,Sensor:%s,Trigger:%s}", n.ClientID, n.sensorName, n.triggerName)
	                                                                          ^
pkg/eventbus/stan/sensor/trigger_conn.go:79:11: n.Logger undefined (type *STANTriggerConn has no field or method Logger) (typecheck)
	log := n.Logger
	         ^
pkg/eventbus/stan/sensor/trigger_conn.go:90:45: n.ClientID undefined (type *STANTriggerConn has no field or method ClientID) (typecheck)
	group, err := n.getGroupNameFromClientID(n.ClientID)
	                                           ^
pkg/eventbus/stan/sensor/trigger_conn.go:95:16: n.STANConn undefined (type *STANTriggerConn has no field or method STANConn) (typecheck)
	sub, err := n.STANConn.QueueSubscribe(*defaultSubject, group, func(m *stan.Msg) {
	              ^
pkg/eventbus/stan/sensor/trigger_conn.go:281:51: n.ClientID undefined (type *STANTriggerConn has no field or method ClientID) (typecheck)
	log.Debugf("Triggering actions for client %s", n.ClientID)
	                                                 ^
pkg/eventbus/stan/sensor/trigger_conn.go:290:11: n.Logger undefined (type *STANTriggerConn has no field or method Logger) (typecheck)
	log := n.Logger.With("clientID", n.ClientID)
	         ^
pkg/reconciler/cmd/start.go:11:2: could not import k8s.io/apimachinery/pkg/types (-: could not load export data: internal error in importing "k8s.io/apimachinery/pkg/types" (unsupported version: 2); please report an issue) (typecheck)
	"k8s.io/apimachinery/pkg/types"
	^
test/e2e/fixtures/e2e_suite.go:99:4: s.T undefined (type *E2ESuite has no field or method T) (typecheck)
	s.T().Log("EventBus is ready")
	  ^
test/e2e/fixtures/e2e_suite.go:112:4: s.T undefined (type *E2ESuite has no field or method T) (typecheck)
	s.T().Log("EventBus is deleted")
	  ^
INFO File cache stats: 14 entries of total size 88.3KiB
INFO Memory: 99 samples, avg is 1286.8MB, max is 1808.2MB
INFO Execution took 9.781753625s
make: *** [Makefile:177: lint] Error 1

However, the vast majority of these files are not ones that this PR changes nor are these the same errors that show up in the CI runner. Are there particular versions of the linting utilities that I should be installing?

@whynowy whynowy self-assigned this Apr 6, 2025
@fullykubed
Contributor Author

@whynowy Anything I can add to help get this integrated? I have another patch for the Jetstream sensor stability & availability improvements that I'd like to open a PR for, but it builds on the work here.

@whynowy
Member

whynowy commented Apr 13, 2025

@whynowy For the linting, I am not sure what is wrong, and some assistance would be appreciated.

	                                                                          ^
pkg/eventbus/stan/sensor/trigger_conn.go:79:11: n.Logger undefined (type *STANTriggerConn has no field or method Logger) (typecheck)
	log := n.Logger
	         ^
pkg/eventbus/stan/sensor/trigger_conn.go:90:45: n.ClientID undefined (type *STANTriggerConn has no field or method ClientID) (typecheck)
	group, err := n.getGroupNameFromClientID(n.ClientID)
	                                           ^
pkg/eventbus/stan/sensor/trigger_conn.go:95:16: n.STANConn undefined (type *STANTriggerConn has no field or method STANConn) (typecheck)
	sub, err := n.STANConn.QueueSubscribe(*defaultSubject, group, func(m *stan.Msg) {
	              ^
pkg/eventbus/stan/sensor/trigger_conn.go:281:51: n.ClientID undefined (type *STANTriggerConn has no field or method ClientID) (typecheck)
	log.Debugf("Triggering actions for client %s", n.ClientID)
	                                                 ^
pkg/eventbus/stan/sensor/trigger_conn.go:290:11: n.Logger undefined (type *STANTriggerConn has no field or method Logger) (typecheck)
	log := n.Logger.With("clientID", n.ClientID)
	         ^
pkg/reconciler/cmd/start.go:11:2: could not import k8s.io/apimachinery/pkg/types (-: could not load export data: internal error in importing "k8s.io/apimachinery/pkg/types" (unsupported version: 2); please report an issue) (typecheck)
	"k8s.io/apimachinery/pkg/types"
	^
test/e2e/fixtures/e2e_suite.go:99:4: s.T undefined (type *E2ESuite has no field or method T) (typecheck)
	s.T().Log("EventBus is ready")
	  ^
test/e2e/fixtures/e2e_suite.go:112:4: s.T undefined (type *E2ESuite has no field or method T) (typecheck)
	s.T().Log("EventBus is deleted")
	  ^
INFO File cache stats: 14 entries of total size 88.3KiB
INFO Memory: 99 samples, avg is 1286.8MB, max is 1808.2MB
INFO Execution took 9.781753625s
make: *** [Makefile:177: lint] Error 1

However, the vast majority of these files are not ones this PR changes, nor are these the same errors that show up in the CI runner. Are there particular versions of the linting utilities that I should be installing?

Some extra blank lines.

@fullykubed
Contributor Author

Some extra blank lines.

Hmmm... I don't think I am following. Looking at the diff, I don't believe we added any blank lines to the files that are failing the linter?

@whynowy
Member

whynowy commented Apr 17, 2025

Some extra blank lines.

Hmmm... I don't think I am following. Looking at the diff, I don't believe we added any blank lines to the files that are failing the linter?

Screenshot 2025-04-17 at 8 22 23 AM

@whynowy
Member

whynowy commented Apr 17, 2025

I still need more information about the outdated resourceVersion, for my understanding.

@fullykubed
Contributor Author

fullykubed commented Apr 18, 2025

I still need more information about the outdated resourceVersion, for my understanding.

I think this article provides a nice summary.

Consider the following scenario for the current state of affairs:

  1. argo-events makes an update to a Deployment that it manages (e.g., for an EventSource)
  2. Another controller makes updates to that Deployment as well (e.g., kyverno applying standard labels). This increments the resourceVersion.
  3. argo-events tries to make another update but with an outdated resourceVersion (because it isn't watching for changes to the Deployments it controls)
  4. This update fails with `the object has been modified; please apply your changes to the latest version and try again`
  5. Because no retry logic is implemented anywhere, the Deployment is now out of sync with what argo-events wants it to be, likely until the argo-events controller is restarted.

When we added the feature for propagating Jetstream updates, we noticed this other issue was also causing settings to become out-of-sync.

There are two options:

  1. Implement retries (the old method)
  2. Leverage server-side applies (the new method as of k8s 1.22)

K8s implemented SSA to tackle this specific issue, as retries are less performant and more brittle.

@whynowy
Member

whynowy commented Apr 21, 2025

I still need more information about the outdated resourceVersion, for my understanding.

I think this article provides a nice summary. [...]

Would it be easier if we update the watch logic from generation changes to resourceVersion changes? Also, why do you think there's no retry logic? Every returned reconciliation error leads to a requeue -- did I miss anything?

@fullykubed
Contributor Author

Why do you think there's no retry logic, every returned reconciliation error leads to a requeue, did I miss anything?

It's been a bit since I looked at it, but it wasn't retrying when it failed to apply the updated Deployment specs, which was most of the time in our cluster (unless the entire argo-events pod was restarted).

If it was retrying, I'd expect to see either (a) a long or infinite loop of retries (as evidenced by the failure logs), since the update could never succeed while the code never refreshes the resourceVersion, or (b) a successful reconcile. We observed neither, which is what started us on putting together this fix. However, we didn't dig too deep, as it was straightforward to just switch to server-side apply to address the issue.

Would it be easier if we update the watch logic from generation changes to resourceVersion changes?

Ultimately, how to proceed here is mostly a stylistic choice -- our only concern is 100% correctness.

The k8s team has provided server-side apply to simplify operations for controller devs because it means that you don't need to worry about tracking resource versions at all (reasoning here). There are many more benefits in addition to simply squashing this particular bug, so I'd recommend checking out that link.

While the move from client-side to server-side apply is a change worth thinking critically about, it seems to be simpler to implement and maintain than adding more layers of tracking onto the client-side approach, especially if you can now leverage the k8s API server to do that tracking + merging for you.

I'd recommend that approach as the default unless you had a specific reason to avoid it. However, if you wanted to add the additional client-side tracking necessary to ensure reconciles always succeeded, we'd definitely be happy to test it out in our environment where we were noticing reconciliation failures.

@whynowy
Member

whynowy commented Apr 23, 2025

Ultimately, how to proceed here is mostly a stylistic choice -- our only concern is 100% correctness. [...]

Thanks for all the information! I tried it myself and did see the reconciliation error you mentioned (when the log option you added is enabled); those errors are swallowed by controller-runtime, which means there's no way to retry. In this case, I tend toward the server-side apply solution you proposed. Considering this is not a corner case, do you know if any other projects have had a similar problem, and what kind of solution they took?

@fullykubed
Contributor Author

fullykubed commented Apr 24, 2025

Considering this is not a corner case, do you know if any other projects had similar problem, and what kind of solution they took?

I have not personally run into this issue with other controllers -- I believe most are defaulting to SSA nowadays since it has been available for a couple of years, so issues like this are becoming less common. Here is a blog post from the team at Google that was rolling this out in 2021, with some thoughts on best practices.

AFAIK there is no downside to SSA for controller authors other than slightly increased load on the k8s API server, which would only come into play if you were continuously making many updates -- not the case here (or for the vast majority of controllers). There are some other use cases (e.g., CI/CD pipelines) where client-side apply makes sense, but I don't think that is relevant here.

Not exactly like this particular case, but here are a few example PRs from other projects that have been migrating, in case you are interested in the specific reasons others have switched to SSA for deploying resource updates in various scenarios:

Let me know if there is any more info I can provide.

// during the migration from client-side to server-side apply. Otherwise, users
// may end up in a state where the finalizer cannot be removed automatically.
patch := &v1alpha1.EventBus{
Status: busCopy.Status,
Member


Do we need to add Status here (even though I don't think adding it makes any difference)? This is only supposed to update the finalizers.

@github-actions
Contributor

This Pull Request is stale because it has been open for 60 days with
no activity. It will be closed in 7 days if no further activity.

@github-actions github-actions bot added the stale label Jun 25, 2025
@fullykubed
Contributor Author

@whynowy Is there anything else you need from me here? Would love to get this merged in. Have some other patches that depend on this ready to open.

@github-actions github-actions bot removed the stale label Jun 26, 2025
@fullykubed
Contributor Author

@whynowy Hey, just checking in here. Anything you need on my end?

@github-actions
Contributor

This Pull Request is stale because it has been open for 60 days with
no activity. It will be closed in 7 days if no further activity.

@github-actions github-actions bot added the stale label Aug 30, 2025
@shearn89
Contributor

It would be cool to get this merged if possible!

@whynowy whynowy removed the stale label Aug 30, 2025
Member

@whynowy whynowy left a comment


Can you also fix the lint error?

@github-actions
Contributor

This Pull Request is stale because it has been open for 60 days with
no activity. It will be closed in 7 days if no further activity.

@github-actions github-actions bot added the stale label Oct 30, 2025
@github-actions github-actions bot closed this Nov 7, 2025