diff --git a/CHANGELOG.md b/CHANGELOG.md index 81529861906..b9712052c54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,7 +72,7 @@ New deprecation(s): ### Other -- TODO +- **RabbitMQ Scaler:** Move from `streadway/amqp` to `rabbitmq/amqp091-go` ([#4004](https://github.com/kedacore/keda/pull/4039)) ## v2.9.1 diff --git a/go.mod b/go.mod index 46b4f6b7187..6fc979400a2 100644 --- a/go.mod +++ b/go.mod @@ -55,10 +55,10 @@ require ( github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.37.0 + github.com/rabbitmq/amqp091-go v1.5.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/robfig/cron/v3 v3.0.1 github.com/spf13/pflag v1.0.5 - github.com/streadway/amqp v1.0.0 github.com/stretchr/testify v1.8.1 github.com/tidwall/gjson v1.14.4 github.com/xdg/scram v1.0.5 diff --git a/go.sum b/go.sum index ed5f922fe67..229d3bad68b 100644 --- a/go.sum +++ b/go.sum @@ -793,6 +793,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= +github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg= +github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= @@ -829,8 +831,6 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= -github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= -github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -959,6 +959,7 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= @@ -1213,6 +1214,7 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go index eb937516343..0ae43bdb178 100644 --- a/pkg/scalers/rabbitmq_scaler.go +++ b/pkg/scalers/rabbitmq_scaler.go @@ -12,7 +12,7 @@ import ( "time" "github.com/go-logr/logr" - "github.com/streadway/amqp" + amqp "github.com/rabbitmq/amqp091-go" v2 "k8s.io/api/autoscaling/v2" "k8s.io/metrics/pkg/apis/external_metrics" diff --git a/vendor/github.com/rabbitmq/amqp091-go/.gitignore b/vendor/github.com/rabbitmq/amqp091-go/.gitignore new file mode 100644 index 00000000000..a93cced225e --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/.gitignore @@ -0,0 +1,6 @@ +certs/* +spec/spec +examples/simple-consumer/simple-consumer +examples/simple-producer/simple-producer + +.idea/ diff --git a/vendor/github.com/rabbitmq/amqp091-go/.golangci.yml b/vendor/github.com/rabbitmq/amqp091-go/.golangci.yml new file mode 100644 index 00000000000..4341bcf984a --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/.golangci.yml @@ -0,0 +1,3 @@ +run: + build-tags: + - integration diff --git a/vendor/github.com/rabbitmq/amqp091-go/CODE_OF_CONDUCT.md b/vendor/github.com/rabbitmq/amqp091-go/CODE_OF_CONDUCT.md new file mode 100644 index 00000000000..24b5675902f --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/CODE_OF_CONDUCT.md @@ -0,0 +1,77 @@ +# Contributor Covenant Code of Conduct + +## Our Pledge + +In the interest of fostering an open and welcoming environment, we as +contributors and maintainers pledge to making participation in RabbitMQ Operator project and +our community a harassment-free experience for everyone, regardless of age, body +size, disability, ethnicity, sex characteristics, gender identity and expression, +level of experience, education, socio-economic status, nationality, personal +appearance, race, religion, or sexual identity and orientation. + +## Our Standards + +Examples of behavior that contributes to creating a positive environment +include: + +* Using welcoming and inclusive language +* Being respectful of differing viewpoints and experiences +* Gracefully accepting constructive criticism +* Focusing on what is best for the community +* Showing empathy towards other community members + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery and unwelcome sexual attention or + advances +* Trolling, insulting/derogatory comments, and personal or political attacks +* Public or private harassment +* Publishing others' private information, such as a physical or electronic + address, without explicit permission +* Other conduct which could reasonably be considered inappropriate in a + professional setting + +## Our Responsibilities + +Project maintainers are responsible for clarifying the standards of acceptable +behavior and are expected to take appropriate and fair corrective action in +response to any instances of unacceptable behavior. + +Project maintainers have the right and responsibility to remove, edit, or +reject comments, commits, code, wiki edits, issues, and other contributions +that are not aligned to this Code of Conduct, or to ban temporarily or +permanently any contributor for other behaviors that they deem inappropriate, +threatening, offensive, or harmful. + +## Scope + +This Code of Conduct applies both within project spaces and in public spaces +when an individual is representing the project or its community. Examples of +representing a project or community include using an official project e-mail +address, posting via an official social media account, or acting as an appointed +representative at an online or offline event. Representation of a project may be +further defined and clarified by project maintainers. + +## Enforcement + +Instances of abusive, harassing, or otherwise unacceptable behavior may be +reported by contacting the project team at oss-coc@vmware.com. All +complaints will be reviewed and investigated and will result in a response that +is deemed necessary and appropriate to the circumstances. The project team is +obligated to maintain confidentiality with regard to the reporter of an incident. +Further details of specific enforcement policies may be posted separately. + +Project maintainers who do not follow or enforce the Code of Conduct in good +faith may face temporary or permanent repercussions as determined by other +members of the project's leadership. + +## Attribution + +This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, +available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html + +[homepage]: https://www.contributor-covenant.org + +For answers to common questions about this code of conduct, see +https://www.contributor-covenant.org/faq + diff --git a/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md b/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md new file mode 100644 index 00000000000..ed1b971fccb --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/CONTRIBUTING.md @@ -0,0 +1,48 @@ +# Contributing + +## Workflow + +Here is the recommended workflow: + +1. Fork this repository, **github.com/rabbitmq/amqp091-go** +1. Create your feature branch (`git checkout -b my-new-feature`) +1. Run Static Checks +1. Run integration tests (see below) +1. **Implement tests** +1. Implement fixs +1. Commit your changes (`git commit -am 'Add some feature'`) +1. Push to a branch (`git push -u origin my-new-feature`) +1. Submit a pull request + +## Running Static Checks + +golangci-lint must be installed to run the static checks. See [installation +docs](https://golangci-lint.run/usage/install/) for more information. + +The static checks can be run via: + +```shell +make checks +``` + +## Running Tests + +### Integration Tests + +Running the Integration tests require: + +* A running RabbitMQ node with all defaults: + [https://www.rabbitmq.com/download.html](https://www.rabbitmq.com/download.html) +* That the server is either reachable via `amqp://guest:guest@127.0.0.1:5672/` + or the environment variable `AMQP_URL` set to it's URL + (e.g.: `export AMQP_URL="amqp://guest:verysecretpasswd@rabbitmq-host:5772/`) + +The integration tests can be run via: + +```shell +make tests +``` + +All integration tests should use the `integrationConnection(...)` test +helpers defined in `integration_test.go` to setup the integration environment +and logging. diff --git a/vendor/github.com/streadway/amqp/LICENSE b/vendor/github.com/rabbitmq/amqp091-go/LICENSE similarity index 89% rename from vendor/github.com/streadway/amqp/LICENSE rename to vendor/github.com/rabbitmq/amqp091-go/LICENSE index 07b89680a72..72fa55ebcba 100644 --- a/vendor/github.com/streadway/amqp/LICENSE +++ b/vendor/github.com/rabbitmq/amqp091-go/LICENSE @@ -1,5 +1,7 @@ -Copyright (c) 2012-2019, Sean Treadway, SoundCloud Ltd. -All rights reserved. +AMQP 0-9-1 Go Client +Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. + +Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: diff --git a/vendor/github.com/rabbitmq/amqp091-go/Makefile b/vendor/github.com/rabbitmq/amqp091-go/Makefile new file mode 100644 index 00000000000..1bf3f28caa8 --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/Makefile @@ -0,0 +1,29 @@ +.DEFAULT_GOAL := list + +# Insert a comment starting with '##' after a target, and it will be printed by 'make' and 'make list' +.PHONY: list +list: ## list Makefile targets + @echo "The most used targets: \n" + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}' + +.PHONY: fmt +fmt: ## Run go fmt against code + go fmt ./... + +.PHONY: tests +tests: ## Run all tests and requires a running rabbitmq-server + go test -cpu 1,2 -race -v -tags integration + +.PHONY: check +check: + golangci-lint run ./... + +.PHONY: rabbitmq-server +rabbitmq-server: + docker run --detach --rm --name amqp091-go-rabbitmq \ + --publish 5672:5672 --publish 15672:15672 \ + --pull always rabbitmq:3-management + +.PHONY: stop-rabbitmq-server +stop-rabbitmq-server: + docker stop $$(docker inspect --format='{{.Id}}' amqp091-go-rabbitmq) diff --git a/vendor/github.com/rabbitmq/amqp091-go/README.md b/vendor/github.com/rabbitmq/amqp091-go/README.md new file mode 100644 index 00000000000..d0512b19486 --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/README.md @@ -0,0 +1,107 @@ +# Go RabbitMQ Client Library + +[![amqp091-go](https://github.com/rabbitmq/amqp091-go/actions/workflows/tests.yml/badge.svg)](https://github.com/rabbitmq/amqp091-go/actions/workflows/tests.yml) +[![Go Reference](https://pkg.go.dev/badge/github.com/rabbitmq/amqp091-go.svg)](https://pkg.go.dev/github.com/rabbitmq/amqp091-go) + +This is a Go AMQP 0.9.1 client maintained by the [RabbitMQ core team](https://github.com/rabbitmq). +It was [originally developed by Sean Treadway](https://github.com/streadway/amqp). + +## Differences from streadway/amqp + +Some things are different compared to the original client, +others haven't changed. + +### Package Name + +This library uses a different package name. If moving from `streadway/amqp`, +using an alias may reduce the number of changes needed: + +``` go +amqp "github.com/rabbitmq/amqp091-go" +``` + +### License + +This client uses the same 2-clause BSD license as the original project. + +### Public API Evolution + + This client retains key API elements as practically possible. + It is, however, open to reasonable breaking public API changes suggested by the community. + We don't have the "no breaking public API changes ever" rule and fully recognize + that a good client API evolves over time. + + +## Project Maturity + +This project is based on a mature Go client that's been around for over a decade. + +We expect this client to undergo moderate breaking public API changes in 2021. +Major and minor versions will be updated accordingly. + + +## Supported Go Versions + +This client supports two most recent Go release series. + + +## Supported RabbitMQ Versions + +This project supports RabbitMQ versions starting with `2.0` but primarily tested +against [currently supported RabbitMQ release series](https://www.rabbitmq.com/versions.html). + +Some features and behaviours may be server version-specific. + +## Goals + +Provide a functional interface that closely represents the AMQP 0.9.1 model +targeted to RabbitMQ as a server. This includes the minimum necessary to +interact the semantics of the protocol. + +## Non-goals + +Things not intended to be supported. + + * Auto reconnect and re-synchronization of client and server topologies. + * Reconnection would require understanding the error paths when the + topology cannot be declared on reconnect. This would require a new set + of types and code paths that are best suited at the call-site of this + package. AMQP has a dynamic topology that needs all peers to agree. If + this doesn't happen, the behavior is undefined. Instead of producing a + possible interface with undefined behavior, this package is designed to + be simple for the caller to implement the necessary connection-time + topology declaration so that reconnection is trivial and encapsulated in + the caller's application code. + * AMQP Protocol negotiation for forward or backward compatibility. + * 0.9.1 is stable and widely deployed. AMQP 1.0 is a divergent + specification (a different protocol) and belongs to a different library. + * Anything other than PLAIN and EXTERNAL authentication mechanisms. + * Keeping the mechanisms interface modular makes it possible to extend + outside of this package. If other mechanisms prove to be popular, then + we would accept patches to include them in this package. + * Support for [`basic.return` and `basic.ack` frame ordering](https://www.rabbitmq.com/confirms.html#when-publishes-are-confirmed). + This client uses Go channels for certain protocol events and ordering between + events sent to two different channels generally cannot be guaranteed. + +## Usage + +See the [_examples](_examples) subdirectory for simple producers and consumers executables. +If you have a use-case in mind which isn't well-represented by the examples, +please file an issue. + +## Documentation + + * [Godoc API reference](http://godoc.org/github.com/rabbitmq/amqp091-go) + * [RabbitMQ tutorials in Go](https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go) + +## Contributing + +Pull requests are very much welcomed. Create your pull request on a non-main +branch, make sure a test or example is included that covers your change, and +your commits represent coherent changes that include a reason for the change. + +See [CONTRIBUTING.md](CONTRIBUTING.md) for more information. + +## License + +BSD 2 clause, see LICENSE for more details. diff --git a/vendor/github.com/streadway/amqp/allocator.go b/vendor/github.com/rabbitmq/amqp091-go/allocator.go similarity index 89% rename from vendor/github.com/streadway/amqp/allocator.go rename to vendor/github.com/rabbitmq/amqp091-go/allocator.go index 53620e7d0ce..0688e4b6433 100644 --- a/vendor/github.com/streadway/amqp/allocator.go +++ b/vendor/github.com/rabbitmq/amqp091-go/allocator.go @@ -1,4 +1,9 @@ -package amqp +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package amqp091 import ( "bytes" diff --git a/vendor/github.com/streadway/amqp/auth.go b/vendor/github.com/rabbitmq/amqp091-go/auth.go similarity index 63% rename from vendor/github.com/streadway/amqp/auth.go rename to vendor/github.com/rabbitmq/amqp091-go/auth.go index 435c94b12e3..0c07bb3ece5 100644 --- a/vendor/github.com/streadway/amqp/auth.go +++ b/vendor/github.com/rabbitmq/amqp091-go/auth.go @@ -1,11 +1,12 @@ -// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Source code and contact info at http://github.com/streadway/amqp -package amqp +package amqp091 import ( + "bytes" "fmt" ) @@ -43,13 +44,33 @@ func (auth *AMQPlainAuth) Mechanism() string { return "AMQPLAIN" } -// Response returns the null character delimited encoding for the SASL PLAIN Mechanism. +// Response returns an AMQP encoded credentials table, without the field table size. func (auth *AMQPlainAuth) Response() string { - return fmt.Sprintf("LOGIN:%sPASSWORD:%s", auth.Username, auth.Password) + var buf bytes.Buffer + table := Table{"LOGIN": auth.Username, "PASSWORD": auth.Password} + if err := writeTable(&buf, table); err != nil { + return "" + } + return buf.String()[4:] +} + +// ExternalAuth for RabbitMQ-auth-mechanism-ssl. +type ExternalAuth struct { +} + +// Mechanism returns "EXTERNAL" +func (*ExternalAuth) Mechanism() string { + return "EXTERNAL" +} + +// Response returns an AMQP encoded credentials table, without the field table size. +func (*ExternalAuth) Response() string { + return "\000*\000*" } // Finds the first mechanism preferred by the client that the server supports. func pickSASLMechanism(client []Authentication, serverMechanisms []string) (auth Authentication, ok bool) { + for _, auth = range client { for _, mech := range serverMechanisms { if auth.Mechanism() == mech { diff --git a/vendor/github.com/streadway/amqp/certs.sh b/vendor/github.com/rabbitmq/amqp091-go/certs.sh similarity index 98% rename from vendor/github.com/streadway/amqp/certs.sh rename to vendor/github.com/rabbitmq/amqp091-go/certs.sh index 834f4224270..403e80c544e 100644 --- a/vendor/github.com/streadway/amqp/certs.sh +++ b/vendor/github.com/rabbitmq/amqp091-go/certs.sh @@ -38,7 +38,7 @@ serial = $dir/serial default_crl_days = 7 default_days = 3650 -default_md = sha1 +default_md = sha256 policy = testca_policy x509_extensions = certificate_extensions @@ -57,7 +57,7 @@ basicConstraints = CA:false [ req ] default_bits = 2048 default_keyfile = ./private/cakey.pem -default_md = sha1 +default_md = sha256 prompt = yes distinguished_name = root_ca_distinguished_name x509_extensions = root_ca_extensions diff --git a/vendor/github.com/rabbitmq/amqp091-go/change_version.sh b/vendor/github.com/rabbitmq/amqp091-go/change_version.sh new file mode 100644 index 00000000000..c6401adee35 --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/change_version.sh @@ -0,0 +1,4 @@ +#/bin/bash +echo $1 > VERSION +sed -i -e "s/.*buildVersion = \"*.*/buildVersion = \"$1\"/" ./connection.go +go fmt ./... diff --git a/vendor/github.com/streadway/amqp/channel.go b/vendor/github.com/rabbitmq/amqp091-go/channel.go similarity index 88% rename from vendor/github.com/streadway/amqp/channel.go rename to vendor/github.com/rabbitmq/amqp091-go/channel.go index cd19ce7ee0a..862e57c8a78 100644 --- a/vendor/github.com/streadway/amqp/channel.go +++ b/vendor/github.com/rabbitmq/amqp091-go/channel.go @@ -1,11 +1,13 @@ -// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Source code and contact info at http://github.com/streadway/amqp -package amqp +package amqp091 import ( + "context" + "errors" "reflect" "sync" "sync/atomic" @@ -66,7 +68,7 @@ type Channel struct { errors chan *Error // State machine that manages frame order, must only be mutated by the connection - recv func(*Channel, frame) error + recv func(*Channel, frame) // Current state for frame re-assembly, only mutated from recv message messageWithContent @@ -87,9 +89,16 @@ func newChannel(c *Connection, id uint16) *Channel { } } +// Signal that from now on, Channel.send() should call Channel.sendClosed() +func (ch *Channel) setClosed() { + atomic.StoreInt32(&ch.closed, 1) +} + // shutdown is called by Connection after the channel has been removed from the // connection registry. func (ch *Channel) shutdown(e *Error) { + ch.setClosed() + ch.destructor.Do(func() { ch.m.Lock() defer ch.m.Unlock() @@ -105,10 +114,6 @@ func (ch *Channel) shutdown(e *Error) { } } - // Signal that from now on, Channel.send() should call - // Channel.sendClosed() - atomic.StoreInt32(&ch.closed, 1) - // Notify RPC if we're selecting if e != nil { ch.errors <- e @@ -154,7 +159,7 @@ func (ch *Channel) shutdown(e *Error) { // only 'channel.close' is sent to the server. func (ch *Channel) send(msg message) (err error) { // If the channel is closed, use Channel.sendClosed() - if atomic.LoadInt32(&ch.closed) == 1 { + if ch.IsClosed() { return ch.sendClosed(msg) } @@ -230,6 +235,11 @@ func (ch *Channel) sendOpen(msg message) (err error) { size = len(body) } + // If the channel is closed, use Channel.sendClosed() + if ch.IsClosed() { + return ch.sendClosed(msg) + } + if err = ch.connection.send(&methodFrame{ ChannelId: ch.id, Method: content, @@ -260,6 +270,11 @@ func (ch *Channel) sendOpen(msg message) (err error) { } } } else { + // If the channel is closed, use Channel.sendClosed() + if ch.IsClosed() { + return ch.sendClosed(msg) + } + err = ch.connection.send(&methodFrame{ ChannelId: ch.id, Method: msg, @@ -274,11 +289,16 @@ func (ch *Channel) sendOpen(msg message) (err error) { func (ch *Channel) dispatch(msg message) { switch m := msg.(type) { case *channelClose: + // Note: channel state is set to closed immedately after the message is + // decoded by the Connection + // lock before sending connection.close-ok // to avoid unexpected interleaving with basic.publish frames if // publishing is happening concurrently ch.m.Lock() - ch.send(&channelCloseOk{}) + if err := ch.send(&channelCloseOk{}); err != nil { + Logger.Printf("error sending channelCloseOk, channel id: %d error: %+v", ch.id, err) + } ch.m.Unlock() ch.connection.closeChannel(ch, newError(m.ReplyCode, m.ReplyText)) @@ -288,7 +308,9 @@ func (ch *Channel) dispatch(msg message) { c <- m.Active } ch.notifyM.RUnlock() - ch.send(&channelFlowOk{Active: m.Active}) + if err := ch.send(&channelFlowOk{Active: m.Active}); err != nil { + Logger.Printf("error sending channelFlowOk, channel id: %d error: %+v", ch.id, err) + } case *basicCancel: ch.notifyM.RLock() @@ -334,40 +356,41 @@ func (ch *Channel) dispatch(msg message) { } } -func (ch *Channel) transition(f func(*Channel, frame) error) error { +func (ch *Channel) transition(f func(*Channel, frame)) { ch.recv = f - return nil } -func (ch *Channel) recvMethod(f frame) error { +func (ch *Channel) recvMethod(f frame) { switch frame := f.(type) { case *methodFrame: if msg, ok := frame.Method.(messageWithContent); ok { ch.body = make([]byte, 0) ch.message = msg - return ch.transition((*Channel).recvHeader) + ch.transition((*Channel).recvHeader) + return } ch.dispatch(frame.Method) // termination state - return ch.transition((*Channel).recvMethod) + ch.transition((*Channel).recvMethod) case *headerFrame: // drop - return ch.transition((*Channel).recvMethod) + ch.transition((*Channel).recvMethod) case *bodyFrame: // drop - return ch.transition((*Channel).recvMethod) - } + ch.transition((*Channel).recvMethod) - panic("unexpected frame type") + default: + panic("unexpected frame type") + } } -func (ch *Channel) recvHeader(f frame) error { +func (ch *Channel) recvHeader(f frame) { switch frame := f.(type) { case *methodFrame: // interrupt content and handle method - return ch.recvMethod(f) + ch.recvMethod(f) case *headerFrame: // start collecting if we expect body frames @@ -376,29 +399,31 @@ func (ch *Channel) recvHeader(f frame) error { if frame.Size == 0 { ch.message.setContent(ch.header.Properties, ch.body) ch.dispatch(ch.message) // termination state - return ch.transition((*Channel).recvMethod) + ch.transition((*Channel).recvMethod) + return } - return ch.transition((*Channel).recvContent) + ch.transition((*Channel).recvContent) case *bodyFrame: // drop and reset - return ch.transition((*Channel).recvMethod) - } + ch.transition((*Channel).recvMethod) - panic("unexpected frame type") + default: + panic("unexpected frame type") + } } // state after method + header and before the length // defined by the header has been reached -func (ch *Channel) recvContent(f frame) error { +func (ch *Channel) recvContent(f frame) { switch frame := f.(type) { case *methodFrame: // interrupt content and handle method - return ch.recvMethod(f) + ch.recvMethod(f) case *headerFrame: // drop and reset - return ch.transition((*Channel).recvMethod) + ch.transition((*Channel).recvMethod) case *bodyFrame: if cap(ch.body) == 0 { @@ -409,13 +434,15 @@ func (ch *Channel) recvContent(f frame) error { if uint64(len(ch.body)) >= ch.header.Size { ch.message.setContent(ch.header.Properties, ch.body) ch.dispatch(ch.message) // termination state - return ch.transition((*Channel).recvMethod) + ch.transition((*Channel).recvMethod) + return } - return ch.transition((*Channel).recvContent) - } + ch.transition((*Channel).recvContent) - panic("unexpected frame type") + default: + panic("unexpected frame type") + } } /* @@ -433,6 +460,12 @@ func (ch *Channel) Close() error { ) } +// IsClosed returns true if the channel is marked as closed, otherwise false +// is returned. +func (ch *Channel) IsClosed() bool { + return atomic.LoadInt32(&ch.closed) == 1 +} + /* NotifyClose registers a listener for when the server sends a channel or connection exception in the form of a Connection.Close or Channel.Close method. @@ -443,6 +476,9 @@ this channel. The chan provided will be closed when the Channel is closed and on a graceful close, no error will be sent. +In case of a non graceful close the error will be notified synchronously by the library +so that it will be necessary to consume the Channel from the caller in order to avoid deadlocks + */ func (ch *Channel) NotifyClose(c chan *Error) chan *Error { ch.notifyM.Lock() @@ -594,6 +630,9 @@ or Channel while confirms are in-flight. It's advisable to wait for all Confirmations to arrive before calling Channel.Close() or Connection.Close(). +It is also advisable for the caller to consume from the channel returned till it is closed +to avoid possible deadlocks + */ func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation { ch.notifyM.Lock() @@ -1082,7 +1121,7 @@ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, return nil, err } - return (<-chan Delivery)(deliveries), nil + return deliveries, nil } /* @@ -1322,10 +1361,75 @@ confirmations start at 1. Exit when all publishings are confirmed. When Publish does not return an error and the channel is in confirm mode, the internal counter for DeliveryTags with the first confirmation starts at 1. +Deprecated: Use PublishWithContext instead. */ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error { + _, err := ch.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, mandatory, immediate, msg) + return err +} + +/* +PublishWithContext sends a Publishing from the client to an exchange on the server. + +When you want a single message to be delivered to a single queue, you can +publish to the default exchange with the routingKey of the queue name. This is +because every declared queue gets an implicit route to the default exchange. + +Since publishings are asynchronous, any undeliverable message will get returned +by the server. Add a listener with Channel.NotifyReturn to handle any +undeliverable message when calling publish with either the mandatory or +immediate parameters as true. + +Publishings can be undeliverable when the mandatory flag is true and no queue is +bound that matches the routing key, or when the immediate flag is true and no +consumer on the matched queue is ready to accept the delivery. + +This can return an error when the channel, connection or socket is closed. The +error or lack of an error does not indicate whether the server has received this +publishing. + +It is possible for publishing to not reach the broker if the underlying socket +is shut down without pending publishing packets being flushed from the kernel +buffers. The easy way of making it probable that all publishings reach the +server is to always call Connection.Close before terminating your publishing +application. The way to ensure that all publishings reach the server is to add +a listener to Channel.NotifyPublish and put the channel in confirm mode with +Channel.Confirm. Publishing delivery tags and their corresponding +confirmations start at 1. Exit when all publishings are confirmed. + +When Publish does not return an error and the channel is in confirm mode, the +internal counter for DeliveryTags with the first confirmation starts at 1. +*/ +func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { + _, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg) + return err +} + +/* +PublishWithDeferredConfirm behaves identically to Publish but additionally returns a +DeferredConfirmation, allowing the caller to wait on the publisher confirmation +for this message. If the channel has not been put into confirm mode, +the DeferredConfirmation will be nil. + +Deprecated: Use PublishWithDeferredConfirmWithContext instead. +*/ +func (ch *Channel) PublishWithDeferredConfirm(exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { + return ch.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, mandatory, immediate, msg) +} + +/* +PublishWithDeferredConfirmWithContext behaves identically to Publish but additionally returns a +DeferredConfirmation, allowing the caller to wait on the publisher confirmation +for this message. If the channel has not been put into confirm mode, +the DeferredConfirmation will be nil. +*/ +func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { + if ctx == nil { + return nil, errors.New("amqp091-go: nil Context") + } + if err := msg.Headers.Validate(); err != nil { - return err + return nil, err } ch.m.Lock() @@ -1353,14 +1457,14 @@ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg AppId: msg.AppId, }, }); err != nil { - return err + return nil, err } if ch.confirming { - ch.confirms.Publish() + return ch.confirms.Publish(ctx), nil } - return nil + return nil, nil } /* @@ -1530,6 +1634,11 @@ If the deliveries cannot be recovered, an error will be returned and the channel will be closed. Note: this method is not implemented on RabbitMQ, use Delivery.Nack instead + +Deprecated: This method is deprecated in RabbitMQ. RabbitMQ used Recover(true) +as a mechanism for consumers to tell the broker that they were ready for more +deliveries, back in 2008-2009. Support for this will be removed from RabbitMQ in +a future release. Use Nack() with requeue=true instead. */ func (ch *Channel) Recover(requeue bool) error { return ch.call( @@ -1591,3 +1700,12 @@ func (ch *Channel) Reject(tag uint64, requeue bool) error { Requeue: requeue, }) } + +// GetNextPublishSeqNo returns the sequence number of the next message to be +// published, when in confirm mode. +func (ch *Channel) GetNextPublishSeqNo() uint64 { + ch.confirms.m.Lock() + defer ch.confirms.m.Unlock() + + return ch.confirms.published + 1 +} diff --git a/vendor/github.com/rabbitmq/amqp091-go/confirms.go b/vendor/github.com/rabbitmq/amqp091-go/confirms.go new file mode 100644 index 00000000000..c7e671c2756 --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/confirms.go @@ -0,0 +1,190 @@ +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package amqp091 + +import ( + "context" + "sync" +) + +// confirms resequences and notifies one or multiple publisher confirmation listeners +type confirms struct { + m sync.Mutex + listeners []chan Confirmation + sequencer map[uint64]Confirmation + deferredConfirmations *deferredConfirmations + published uint64 + publishedMut sync.Mutex + expecting uint64 +} + +// newConfirms allocates a confirms +func newConfirms() *confirms { + return &confirms{ + sequencer: map[uint64]Confirmation{}, + deferredConfirmations: newDeferredConfirmations(), + published: 0, + expecting: 1, + } +} + +func (c *confirms) Listen(l chan Confirmation) { + c.m.Lock() + defer c.m.Unlock() + + c.listeners = append(c.listeners, l) +} + +// Publish increments the publishing counter +func (c *confirms) Publish(ctx context.Context) *DeferredConfirmation { + c.publishedMut.Lock() + defer c.publishedMut.Unlock() + + c.published++ + return c.deferredConfirmations.Add(ctx, c.published) +} + +// confirm confirms one publishing, increments the expecting delivery tag, and +// removes bookkeeping for that delivery tag. +func (c *confirms) confirm(confirmation Confirmation) { + delete(c.sequencer, c.expecting) + c.expecting++ + for _, l := range c.listeners { + l <- confirmation + } +} + +// resequence confirms any out of order delivered confirmations +func (c *confirms) resequence() { + c.publishedMut.Lock() + defer c.publishedMut.Unlock() + + for c.expecting <= c.published { + sequenced, found := c.sequencer[c.expecting] + if !found { + return + } + c.confirm(sequenced) + } +} + +// One confirms one publishing and all following in the publishing sequence +func (c *confirms) One(confirmed Confirmation) { + c.m.Lock() + defer c.m.Unlock() + + c.deferredConfirmations.Confirm(confirmed) + + if c.expecting == confirmed.DeliveryTag { + c.confirm(confirmed) + } else { + c.sequencer[confirmed.DeliveryTag] = confirmed + } + c.resequence() +} + +// Multiple confirms all publishings up until the delivery tag +func (c *confirms) Multiple(confirmed Confirmation) { + c.m.Lock() + defer c.m.Unlock() + + c.deferredConfirmations.ConfirmMultiple(confirmed) + + for c.expecting <= confirmed.DeliveryTag { + c.confirm(Confirmation{c.expecting, confirmed.Ack}) + } + c.resequence() +} + +// Cleans up the confirms struct and its dependencies. +// Closes all listeners, discarding any out of sequence confirmations +func (c *confirms) Close() error { + c.m.Lock() + defer c.m.Unlock() + + c.deferredConfirmations.Close() + + for _, l := range c.listeners { + close(l) + } + c.listeners = nil + return nil +} + +type deferredConfirmations struct { + m sync.Mutex + confirmations map[uint64]*DeferredConfirmation +} + +func newDeferredConfirmations() *deferredConfirmations { + return &deferredConfirmations{ + confirmations: map[uint64]*DeferredConfirmation{}, + } +} + +func (d *deferredConfirmations) Add(ctx context.Context, tag uint64) *DeferredConfirmation { + d.m.Lock() + defer d.m.Unlock() + + dc := &DeferredConfirmation{DeliveryTag: tag} + dc.ctx, dc.cancel = context.WithCancel(ctx) + d.confirmations[tag] = dc + return dc +} + +func (d *deferredConfirmations) Confirm(confirmation Confirmation) { + d.m.Lock() + defer d.m.Unlock() + + dc, found := d.confirmations[confirmation.DeliveryTag] + if !found { + // we should never receive a confirmation for a tag that hasn't been published, but a test causes this to happen + return + } + dc.Confirm(confirmation.Ack) + delete(d.confirmations, confirmation.DeliveryTag) +} + +func (d *deferredConfirmations) ConfirmMultiple(confirmation Confirmation) { + d.m.Lock() + defer d.m.Unlock() + + for k, v := range d.confirmations { + if k <= confirmation.DeliveryTag { + v.Confirm(confirmation.Ack) + delete(d.confirmations, k) + } + } +} + +// Nacks all pending DeferredConfirmations being blocked by dc.Wait() +func (d *deferredConfirmations) Close() { + d.m.Lock() + defer d.m.Unlock() + + for k, v := range d.confirmations { + v.Confirm(false) + delete(d.confirmations, k) + } +} + +// Confirm ack confirmation. +func (d *DeferredConfirmation) Confirm(ack bool) { + d.m.Lock() + defer d.m.Unlock() + + d.confirmation.Ack = ack + d.cancel() +} + +// Waits for publisher confirmation. Returns true if server successfully received the publishing. +func (d *DeferredConfirmation) Wait() bool { + <-d.ctx.Done() + + d.m.Lock() + defer d.m.Unlock() + return d.confirmation.Ack +} diff --git a/vendor/github.com/streadway/amqp/connection.go b/vendor/github.com/rabbitmq/amqp091-go/connection.go similarity index 79% rename from vendor/github.com/streadway/amqp/connection.go rename to vendor/github.com/rabbitmq/amqp091-go/connection.go index b9d8e8eee17..189f748c204 100644 --- a/vendor/github.com/streadway/amqp/connection.go +++ b/vendor/github.com/rabbitmq/amqp091-go/connection.go @@ -1,15 +1,19 @@ -// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Source code and contact info at http://github.com/streadway/amqp -package amqp +package amqp091 import ( "bufio" "crypto/tls" + "crypto/x509" + "errors" + "fmt" "io" "net" + "os" "reflect" "strconv" "strings" @@ -23,8 +27,9 @@ const ( defaultHeartbeat = 10 * time.Second defaultConnectionTimeout = 30 * time.Second - defaultProduct = "https://github.com/streadway/amqp" - defaultVersion = "β" + defaultProduct = "AMQP 0.9.1 Client" + buildVersion = "1.5.0" + platform = "golang" // Safer default that makes channel leaks a lot easier to spot // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593. defaultChannelMax = (2 << 10) - 1 @@ -71,6 +76,13 @@ type Config struct { Dial func(network, addr string) (net.Conn, error) } +// NewConnectionProperties initialises an amqp.Table struct to empty value. This +// amqp.Table can be used as Properties in amqp.Config to set the connection +// name, using amqp.DialConfig() +func NewConnectionProperties() Table { + return make(Table) +} + // Connection manages the serialization and deserialization of frames from IO // and dispatches the frames to the appropriate channel. All RPC methods and // asynchronous Publishing, Delivery, Ack, Nack and Return messages are @@ -157,6 +169,23 @@ func DialTLS(url string, amqps *tls.Config) (*Connection, error) { }) } +// DialTLS_ExternalAuth accepts a string in the AMQP URI format and returns a +// new Connection over TCP using EXTERNAL auth. Defaults to a server heartbeat +// interval of 10 seconds and sets the initial read deadline to 30 seconds. +// +// This mechanism is used, when RabbitMQ is configured for EXTERNAL auth with +// ssl_cert_login plugin for userless/passwordless logons +// +// DialTLS_ExternalAuth uses the provided tls.Config when encountering an +// amqps:// scheme. +func DialTLS_ExternalAuth(url string, amqps *tls.Config) (*Connection, error) { + return DialConfig(url, Config{ + Heartbeat: defaultHeartbeat, + TLSClientConfig: amqps, + SASL: []Authentication{&ExternalAuth{}}, + }) +} + // DialConfig accepts a string in the AMQP URI format and a configuration for // the transport and connection setup, returning a new Connection. Defaults to // a server heartbeat interval of 10 seconds and sets the initial read deadline @@ -192,7 +221,11 @@ func DialConfig(url string, config Config) (*Connection, error) { if uri.Scheme == "amqps" { if config.TLSClientConfig == nil { - config.TLSClientConfig = new(tls.Config) + tlsConfig, err := tlsConfigFromURI(uri) + if err != nil { + return nil, fmt.Errorf("create TLS config from URI: %w", err) + } + config.TLSClientConfig = tlsConfig } // If ServerName has not been specified in TLSClientConfig, @@ -203,7 +236,6 @@ func DialConfig(url string, config Config) (*Connection, error) { client := tls.Client(conn, config.TLSClientConfig) if err := client.Handshake(); err != nil { - conn.Close() return nil, err } @@ -234,6 +266,22 @@ func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) { return c, c.open(config) } +/* +UpdateSecret updates the secret used to authenticate this connection. It is used when +secrets have an expiration date and need to be renewed, like OAuth 2 tokens. + +It returns an error if the operation is not successful, or if the connection is closed. +*/ +func (c *Connection) UpdateSecret(newSecret, reason string) error { + if c.IsClosed() { + return ErrClosed + } + return c.call(&connectionUpdateSecret{ + NewSecret: newSecret, + Reason: reason, + }, &connectionUpdateSecretOk{}) +} + /* LocalAddr returns the local TCP peer address, or ":0" (the zero value of net.TCPAddr) as a fallback default value if the underlying transport does not support LocalAddr(). @@ -247,6 +295,18 @@ func (c *Connection) LocalAddr() net.Addr { return &net.TCPAddr{} } +/* +RemoteAddr returns the remote TCP peer address, if known. +*/ +func (c *Connection) RemoteAddr() net.Addr { + if conn, ok := c.conn.(interface { + RemoteAddr() net.Addr + }); ok { + return conn.RemoteAddr() + } + return &net.TCPAddr{} +} + // ConnectionState returns basic TLS details of the underlying transport. // Returns a zero value when the underlying connection does not implement // ConnectionState() tls.ConnectionState. @@ -263,7 +323,11 @@ func (c *Connection) ConnectionState() tls.ConnectionState { NotifyClose registers a listener for close events either initiated by an error accompanying a connection.close method or by a normal shutdown. -On normal shutdowns, the chan will be closed. +The chan provided will be closed when the Connection is closed and on a +graceful close, no error will be sent. + +In case of a non graceful close the error will be notified synchronously by the library +so that it will be necessary to consume the Channel from the caller in order to avoid deadlocks To reconnect after a transport or protocol error, register a listener here and re-run your setup process. @@ -443,11 +507,10 @@ func (c *Connection) dispatch0(f frame) { switch m := mf.Method.(type) { case *connectionClose: // Send immediately as shutdown will close our side of the writer. - c.send(&methodFrame{ - ChannelId: 0, - Method: &connectionCloseOk{}, - }) - + f := &methodFrame{ChannelId: 0, Method: &connectionCloseOk{}} + if err := c.send(f); err != nil { + Logger.Printf("error sending connectionCloseOk, error: %+v", err) + } c.shutdown(newError(m.ReplyCode, m.ReplyText)) case *connectionBlocked: for _, c := range c.blocks { @@ -464,15 +527,20 @@ func (c *Connection) dispatch0(f frame) { // kthx - all reads reset our deadline. so we can drop this default: // lolwat - channel0 only responds to methods and heartbeats - c.closeWith(ErrUnexpectedFrame) + if err := c.closeWith(ErrUnexpectedFrame); err != nil { + Logger.Printf("error sending connectionCloseOk with ErrUnexpectedFrame, error: %+v", err) + } } } func (c *Connection) dispatchN(f frame) { c.m.Lock() channel := c.channels[f.channel()] + updateChannel(f, channel) c.m.Unlock() + // Note: this could result in concurrent dispatch depending on + // how channels are managed in an application if channel != nil { channel.recv(channel, f) } else { @@ -496,15 +564,17 @@ func (c *Connection) dispatchClosed(f frame) { if mf, ok := f.(*methodFrame); ok { switch mf.Method.(type) { case *channelClose: - c.send(&methodFrame{ - ChannelId: f.channel(), - Method: &channelCloseOk{}, - }) + f := &methodFrame{ChannelId: f.channel(), Method: &channelCloseOk{}} + if err := c.send(f); err != nil { + Logger.Printf("error sending channelCloseOk, channel id: %d error: %+v", f.channel(), err) + } case *channelCloseOk: // we are already closed, so do nothing default: // unexpected method on closed channel - c.closeWith(ErrClosed) + if err := c.closeWith(ErrClosed); err != nil { + Logger.Printf("error sending connectionCloseOk with ErrClosed, error: %+v", err) + } } } } @@ -517,6 +587,8 @@ func (c *Connection) reader(r io.Reader) { frames := &reader{buf} conn, haveDeadliner := r.(readDeadliner) + defer close(c.rpc) + for { frame, err := frames.ReadFrame() @@ -528,7 +600,12 @@ func (c *Connection) reader(r io.Reader) { c.demux(frame) if haveDeadliner { - c.deadlines <- conn + select { + case c.deadlines <- conn: + default: + // On c.Close() c.heartbeater() might exit just before c.deadlines <- conn is called. + // Which results in this goroutine being stuck forever. + } } } } @@ -571,7 +648,13 @@ func (c *Connection) heartbeater(interval time.Duration, done chan *Error) { // When reading, reset our side of the deadline, if we've negotiated one with // a deadline that covers at least 2 server heartbeats if interval > 0 { - conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval)) + if err := conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval)); err != nil { + var opErr *net.OpError + if !errors.As(err, &opErr) { + Logger.Printf("error setting read deadline in heartbeater: %+v", err) + return + } + } } case <-done: @@ -662,27 +745,26 @@ func (c *Connection) call(req message, res ...message) error { } } - select { - case err, ok := <-c.errors: - if !ok { + msg, ok := <-c.rpc + if !ok { + err, errorsChanIsOpen := <-c.errors + if !errorsChanIsOpen { return ErrClosed } return err + } - case msg := <-c.rpc: - // Try to match one of the result types - for _, try := range res { - if reflect.TypeOf(msg) == reflect.TypeOf(try) { - // *res = *msg - vres := reflect.ValueOf(try).Elem() - vmsg := reflect.ValueOf(msg).Elem() - vres.Set(vmsg) - return nil - } + // Try to match one of the result types + for _, try := range res { + if reflect.TypeOf(msg) == reflect.TypeOf(try) { + // *res = *msg + vres := reflect.ValueOf(try).Elem() + vmsg := reflect.ValueOf(msg).Elem() + vres.Set(vmsg) + return nil } - return ErrCommandInvalid } - // unreachable + return ErrCommandInvalid } // Connection = open-Connection *use-Connection close-Connection @@ -712,7 +794,7 @@ func (c *Connection) openStart(config Config) error { c.Major = int(start.VersionMajor) c.Minor = int(start.VersionMinor) - c.Properties = Table(start.ServerProperties) + c.Properties = start.ServerProperties c.Locales = strings.Split(start.Locales, " ") // eventually support challenge/response here by also responding to @@ -734,14 +816,17 @@ func (c *Connection) openStart(config Config) error { func (c *Connection) openTune(config Config, auth Authentication) error { if len(config.Properties) == 0 { config.Properties = Table{ - "product": defaultProduct, - "version": defaultVersion, + "product": defaultProduct, + "version": buildVersion, + "platform": platform, } } config.Properties["capabilities"] = Table{ "connection.blocked": true, "consumer_cancel_notify": true, + "basic.nack": true, + "publisher_confirms": true, } ok := &connectionStartOk{ @@ -779,7 +864,7 @@ func (c *Connection) openTune(config Config, auth Authentication) error { // "The client should start sending heartbeats after receiving a // Connection.Tune method" - go c.heartbeater(c.Config.Heartbeat, c.NotifyClose(make(chan *Error, 1))) + go c.heartbeater(c.Config.Heartbeat/2, c.NotifyClose(make(chan *Error, 1))) if err := c.send(&methodFrame{ ChannelId: 0, @@ -825,6 +910,45 @@ func (c *Connection) openComplete() error { return nil } +// tlsConfigFromURI tries to create TLS configuration based on query parameters. +// Returns default (empty) config in case no suitable client cert and/or client key not provided. +// Returns error in case certificates can not be parsed. +func tlsConfigFromURI(uri URI) (*tls.Config, error) { + var certPool *x509.CertPool + if uri.CACertFile != "" { + data, err := os.ReadFile(uri.CACertFile) + if err != nil { + return nil, fmt.Errorf("read CA certificate: %w", err) + } + + certPool = x509.NewCertPool() + certPool.AppendCertsFromPEM(data) + } else if sysPool, err := x509.SystemCertPool(); err != nil { + return nil, fmt.Errorf("load system certificates: %w", err) + } else { + certPool = sysPool + } + + if uri.CertFile == "" || uri.KeyFile == "" { + // no client auth (mTLS), just server auth + return &tls.Config{ + RootCAs: certPool, + ServerName: uri.ServerName, + }, nil + } + + certificate, err := tls.LoadX509KeyPair(uri.CertFile, uri.KeyFile) + if err != nil { + return nil, fmt.Errorf("load client certificate: %w", err) + } + + return &tls.Config{ + Certificates: []tls.Certificate{certificate}, + RootCAs: certPool, + ServerName: uri.ServerName, + }, nil +} + func max(a, b int) int { if a > b { return a diff --git a/vendor/github.com/streadway/amqp/consumers.go b/vendor/github.com/rabbitmq/amqp091-go/consumers.go similarity index 94% rename from vendor/github.com/streadway/amqp/consumers.go rename to vendor/github.com/rabbitmq/amqp091-go/consumers.go index 887ac7494ec..8c23fadab70 100644 --- a/vendor/github.com/streadway/amqp/consumers.go +++ b/vendor/github.com/rabbitmq/amqp091-go/consumers.go @@ -1,9 +1,9 @@ -// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Source code and contact info at http://github.com/streadway/amqp -package amqp +package amqp091 import ( "os" diff --git a/vendor/github.com/streadway/amqp/delivery.go b/vendor/github.com/rabbitmq/amqp091-go/delivery.go similarity index 96% rename from vendor/github.com/streadway/amqp/delivery.go rename to vendor/github.com/rabbitmq/amqp091-go/delivery.go index 72412644231..e94cf343703 100644 --- a/vendor/github.com/streadway/amqp/delivery.go +++ b/vendor/github.com/rabbitmq/amqp091-go/delivery.go @@ -1,9 +1,9 @@ -// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Source code and contact info at http://github.com/streadway/amqp -package amqp +package amqp091 import ( "errors" @@ -13,7 +13,7 @@ import ( var errDeliveryNotInitialized = errors.New("delivery not initialized") // Acknowledger notifies the server of successful or failed consumption of -// delivieries via identifier found in the Delivery.DeliveryTag field. +// deliveries via identifier found in the Delivery.DeliveryTag field. // // Applications can provide mock implementations in tests of Delivery handlers. type Acknowledger interface { diff --git a/vendor/github.com/streadway/amqp/doc.go b/vendor/github.com/rabbitmq/amqp091-go/doc.go similarity index 62% rename from vendor/github.com/streadway/amqp/doc.go rename to vendor/github.com/rabbitmq/amqp091-go/doc.go index ee69c5b3822..dbd4bad49e4 100644 --- a/vendor/github.com/streadway/amqp/doc.go +++ b/vendor/github.com/rabbitmq/amqp091-go/doc.go @@ -1,10 +1,10 @@ -// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Source code and contact info at http://github.com/streadway/amqp /* -Package amqp is an AMQP 0.9.1 client with RabbitMQ extensions +Package amqp091 is an AMQP 0.9.1 client with RabbitMQ extensions Understand the AMQP 0.9.1 messaging model by reviewing these links first. Much of the terminology in this library directly relates to AMQP concepts. @@ -59,7 +59,7 @@ of band from an RPC call like basic.ack or basic.flow. Any asynchronous events, including Deliveries and Publishings must always have a receiver until the corresponding chans are closed. Without asynchronous -receivers, the sychronous methods will block. +receivers, the synchronous methods will block. Use Case @@ -104,5 +104,65 @@ encounters an amqp:// scheme. SSL/TLS in RabbitMQ is documented here: http://www.rabbitmq.com/ssl.html +Best practises to handle library notifications. + +Best practises for Connections and Channels notifications: + +In order to be notified when a connection or channel gets closed both the structures offer the possibility to register channels using the `notifyClose` function like: +notifyConnClose := make(chan *amqp.Error) +conn.NotifyClose(notifyConnClose) +No errors will be sent in case of a graceful connection close. +In case of a non-graceful close, because of a network issue of forced disconnection from the UI, the error will be notified synchronously by the library. +You can see that in the shutdown function of connection and channel (see connection.go and channel.go) + + if err != nil { + for _, c := range c.closes { + c <- err + } + } + +The error is sent synchronously to the channel so that the flow will wait until the channel will be consumed by the caller. +To avoid deadlocks it is necessary to consume the messages from the channels. +This could be done inside a different goroutine with a select listening on the two channels inside a for loop like: + + go func() { + for notifyConnClose != nil || notifyChanClose != nil { + select { + case err, ok := <-notifyConnClose: + if !(ok) { + notifyConnClose = nil + } else { + fmt.Printf("connection closed, error %s", err) + } + case err, ok := <-notifyChanClose: + if !(ok) { + notifyChanClose = nil + } else { + fmt.Printf("channel closed, error %s", err) + } + } + } + }() + +Best practises for NotifyPublish notifications: + +Similary to the previous sceneario using the NotifyPublish method allows the caller of the library to be notified through a go channel when a message has been received +from the broker after Channel.Confirm has been set. +It's advisable to wait for all Confirmations to arrive before calling Channel.Close() or Connection.Close(). +It is also necessary for the caller to always consume from this channel till it get closed from the library to avoid possible deadlocks. +Confirmations go channel are indeed notified inside the confirm function of the Confirm struct synchronously: + + // confirm confirms one publishing, increments the expecting delivery tag, and + // removes bookkeeping for that delivery tag. + func (c *confirms) confirm(confirmation Confirmation) { + delete(c.sequencer, c.expecting) + c.expecting++ + for _, l := range c.listeners { + l <- confirmation + } + } + +It is so necessary to have a goroutine consuming from this channel till it get closed. + */ -package amqp +package amqp091 diff --git a/vendor/github.com/rabbitmq/amqp091-go/fuzz.go b/vendor/github.com/rabbitmq/amqp091-go/fuzz.go new file mode 100644 index 00000000000..c9f03ea4e61 --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/fuzz.go @@ -0,0 +1,23 @@ +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build gofuzz +// +build gofuzz + +package amqp091 + +import "bytes" + +func Fuzz(data []byte) int { + r := reader{bytes.NewReader(data)} + frame, err := r.ReadFrame() + if err != nil { + if frame != nil { + panic("frame is not nil") + } + return 0 + } + return 1 +} diff --git a/vendor/github.com/streadway/amqp/gen.sh b/vendor/github.com/rabbitmq/amqp091-go/gen.sh similarity index 100% rename from vendor/github.com/streadway/amqp/gen.sh rename to vendor/github.com/rabbitmq/amqp091-go/gen.sh diff --git a/vendor/github.com/rabbitmq/amqp091-go/log.go b/vendor/github.com/rabbitmq/amqp091-go/log.go new file mode 100644 index 00000000000..7540f137afe --- /dev/null +++ b/vendor/github.com/rabbitmq/amqp091-go/log.go @@ -0,0 +1,23 @@ +// Copyright (c) 2022 VMware, Inc. or its affiliates. All Rights Reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package amqp091 + +type Logging interface { + Printf(format string, v ...interface{}) +} + +var Logger Logging = NullLogger{} + +// Enables logging using a custom Logging instance. Note that this is +// not thread safe and should be called at application start +func SetLogger(logger Logging) { + Logger = logger +} + +type NullLogger struct { +} + +func (l NullLogger) Printf(format string, v ...interface{}) { +} diff --git a/vendor/github.com/streadway/amqp/read.go b/vendor/github.com/rabbitmq/amqp091-go/read.go similarity index 94% rename from vendor/github.com/streadway/amqp/read.go rename to vendor/github.com/rabbitmq/amqp091-go/read.go index 3aa0b338112..97a97743c0b 100644 --- a/vendor/github.com/streadway/amqp/read.go +++ b/vendor/github.com/rabbitmq/amqp091-go/read.go @@ -1,9 +1,9 @@ -// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Source code and contact info at http://github.com/streadway/amqp -package amqp +package amqp091 import ( "bytes" @@ -14,7 +14,7 @@ import ( ) /* -Reads a frame from an input stream and returns an interface that can be cast into +ReadFrame reads a frame from an input stream and returns an interface that can be cast into one of the following: methodFrame @@ -50,7 +50,7 @@ func (r *reader) ReadFrame() (frame frame, err error) { return } - typ := uint8(scratch[0]) + typ := scratch[0] channel := binary.BigEndian.Uint16(scratch[1:3]) size := binary.BigEndian.Uint32(scratch[3:7]) @@ -131,20 +131,6 @@ func readDecimal(r io.Reader) (v Decimal, err error) { return } -func readFloat32(r io.Reader) (v float32, err error) { - if err = binary.Read(r, binary.BigEndian, &v); err != nil { - return - } - return -} - -func readFloat64(r io.Reader) (v float64, err error) { - if err = binary.Read(r, binary.BigEndian, &v); err != nil { - return - } - return -} - func readTimestamp(r io.Reader) (v time.Time, err error) { var sec int64 if err = binary.Read(r, binary.BigEndian, &sec); err != nil { @@ -161,7 +147,8 @@ func readTimestamp(r io.Reader) (v time.Time, err error) { 'S': string 'T': time.Time 'V': nil -'b': byte +'b': int8 +'B': byte 'd': float64 'f': float32 'l': int64 @@ -183,13 +170,20 @@ func readField(r io.Reader) (v interface{}, err error) { } return (value != 0), nil - case 'b': + case 'B': var value [1]byte if _, err = io.ReadFull(r, value[0:1]); err != nil { return } return value[0], nil + case 'b': + var value int8 + if err = binary.Read(r, binary.BigEndian, &value); err != nil { + return + } + return value, nil + case 's': var value int16 if err = binary.Read(r, binary.BigEndian, &value); err != nil { @@ -309,7 +303,7 @@ func readArray(r io.Reader) ([]interface{}, error) { var ( lim = &io.LimitedReader{R: r, N: int64(size)} - arr = []interface{}{} + arr []interface{} field interface{} ) diff --git a/vendor/github.com/streadway/amqp/return.go b/vendor/github.com/rabbitmq/amqp091-go/return.go similarity index 93% rename from vendor/github.com/streadway/amqp/return.go rename to vendor/github.com/rabbitmq/amqp091-go/return.go index 10dcedb2c8c..cdc3875edce 100644 --- a/vendor/github.com/streadway/amqp/return.go +++ b/vendor/github.com/rabbitmq/amqp091-go/return.go @@ -1,9 +1,9 @@ -// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Source code and contact info at http://github.com/streadway/amqp -package amqp +package amqp091 import ( "time" diff --git a/vendor/github.com/streadway/amqp/spec091.go b/vendor/github.com/rabbitmq/amqp091-go/spec091.go similarity index 97% rename from vendor/github.com/streadway/amqp/spec091.go rename to vendor/github.com/rabbitmq/amqp091-go/spec091.go index cd53ebe7401..d86e753a953 100644 --- a/vendor/github.com/streadway/amqp/spec091.go +++ b/vendor/github.com/rabbitmq/amqp091-go/spec091.go @@ -1,12 +1,12 @@ -// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Source code and contact info at http://github.com/streadway/amqp /* GENERATED FILE - DO NOT EDIT */ /* Rebuild from the spec/gen.go tool */ -package amqp +package amqp091 import ( "encoding/binary" @@ -552,6 +552,66 @@ func (msg *connectionUnblocked) read(r io.Reader) (err error) { return } +type connectionUpdateSecret struct { + NewSecret string + Reason string +} + +func (msg *connectionUpdateSecret) id() (uint16, uint16) { + return 10, 70 +} + +func (msg *connectionUpdateSecret) wait() bool { + return true +} + +func (msg *connectionUpdateSecret) write(w io.Writer) (err error) { + + if err = writeLongstr(w, msg.NewSecret); err != nil { + return + } + + if err = writeShortstr(w, msg.Reason); err != nil { + return + } + + return +} + +func (msg *connectionUpdateSecret) read(r io.Reader) (err error) { + + if msg.NewSecret, err = readLongstr(r); err != nil { + return + } + + if msg.Reason, err = readShortstr(r); err != nil { + return + } + + return +} + +type connectionUpdateSecretOk struct { +} + +func (msg *connectionUpdateSecretOk) id() (uint16, uint16) { + return 10, 71 +} + +func (msg *connectionUpdateSecretOk) wait() bool { + return true +} + +func (msg *connectionUpdateSecretOk) write(w io.Writer) (err error) { + + return +} + +func (msg *connectionUpdateSecretOk) read(r io.Reader) (err error) { + + return +} + type channelOpen struct { reserved1 string } @@ -2852,6 +2912,22 @@ func (r *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err err } mf.Method = method + case 70: // connection update-secret + //fmt.Println("NextMethod: class:10 method:70") + method := &connectionUpdateSecret{} + if err = method.read(r.r); err != nil { + return + } + mf.Method = method + + case 71: // connection update-secret-ok + //fmt.Println("NextMethod: class:10 method:71") + method := &connectionUpdateSecretOk{} + if err = method.read(r.r); err != nil { + return + } + mf.Method = method + default: return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) } diff --git a/vendor/github.com/streadway/amqp/types.go b/vendor/github.com/rabbitmq/amqp091-go/types.go similarity index 91% rename from vendor/github.com/streadway/amqp/types.go rename to vendor/github.com/rabbitmq/amqp091-go/types.go index 83bd92f9779..80f73622248 100644 --- a/vendor/github.com/streadway/amqp/types.go +++ b/vendor/github.com/rabbitmq/amqp091-go/types.go @@ -1,13 +1,15 @@ -// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Source code and contact info at http://github.com/streadway/amqp -package amqp +package amqp091 import ( + "context" "fmt" "io" + "sync" "time" ) @@ -29,7 +31,7 @@ var ( ErrChannelMax = &Error{Code: ChannelError, Reason: "channel id space exhausted"} // ErrSASL is returned from Dial when the authentication mechanism could not - // be negoated. + // be negotiated. ErrSASL = &Error{Code: AccessRefused, Reason: "SASL could not negotiate a shared mechanism"} // ErrCredentials is returned when the authenticated client is not authorized @@ -179,6 +181,17 @@ type Blocking struct { Reason string // Server reason for activation } +// DeferredConfirmation represents a future publisher confirm for a message. It +// allows users to directly correlate a publishing to a confirmation. These are +// returned from PublishWithDeferredConfirm on Channels. +type DeferredConfirmation struct { + m sync.Mutex + ctx context.Context + cancel context.CancelFunc + DeliveryTag uint64 + confirmation Confirmation +} + // Confirmation notifies the acknowledgment or negative acknowledgement of a // publishing identified by its delivery tag. Use NotifyPublish on the Channel // to consume these events. @@ -198,6 +211,7 @@ type Decimal struct { // // bool // byte +// int8 // float32 // float64 // int @@ -226,7 +240,7 @@ type Table map[string]interface{} func validateField(f interface{}) error { switch fv := f.(type) { - case nil, bool, byte, int, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time: + case nil, bool, byte, int8, int, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time: return nil case []interface{}: @@ -254,17 +268,12 @@ func (t Table) Validate() error { return validateField(t) } -// Heap interface for maintaining delivery tags -type tagSet []uint64 - -func (set tagSet) Len() int { return len(set) } -func (set tagSet) Less(i, j int) bool { return (set)[i] < (set)[j] } -func (set tagSet) Swap(i, j int) { (set)[i], (set)[j] = (set)[j], (set)[i] } -func (set *tagSet) Push(tag interface{}) { *set = append(*set, tag.(uint64)) } -func (set *tagSet) Pop() interface{} { - val := (*set)[len(*set)-1] - *set = (*set)[:len(*set)-1] - return val +// Sets the connection name property. This property can be used in +// amqp.Config to set a custom connection name during amqp.DialConfig(). This +// can be helpful to identify specific connections in RabbitMQ, for debugging or +// tracing purposes. +func (t Table) SetClientConnectionName(connName string) { + t["connection_name"] = connName } type message interface { @@ -310,6 +319,18 @@ type frame interface { channel() uint16 } +/* +Perform any updates on the channel immediately after the frame is decoded while the +connection mutex is held. +*/ +func updateChannel(f frame, channel *Channel) { + if mf, isMethodFrame := f.(*methodFrame); isMethodFrame { + if _, isChannelClose := mf.Method.(*channelClose); isChannelClose { + channel.setClosed() + } + } +} + type reader struct { r io.Reader } diff --git a/vendor/github.com/streadway/amqp/uri.go b/vendor/github.com/rabbitmq/amqp091-go/uri.go similarity index 74% rename from vendor/github.com/streadway/amqp/uri.go rename to vendor/github.com/rabbitmq/amqp091-go/uri.go index e584715497f..320daaedec7 100644 --- a/vendor/github.com/streadway/amqp/uri.go +++ b/vendor/github.com/rabbitmq/amqp091-go/uri.go @@ -1,9 +1,9 @@ -// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Source code and contact info at http://github.com/streadway/amqp -package amqp +package amqp091 import ( "errors" @@ -32,12 +32,16 @@ var defaultURI = URI{ // URI represents a parsed AMQP URI string. type URI struct { - Scheme string - Host string - Port int - Username string - Password string - Vhost string + Scheme string + Host string + Port int + Username string + Password string + Vhost string + CertFile string // client TLS auth - path to certificate (PEM) + CACertFile string // client TLS auth - path to CA certificate (PEM) + KeyFile string // client TLS auth - path to private key (PEM) + ServerName string // client TLS auth - server name } // ParseURI attempts to parse the given AMQP URI according to the spec. @@ -52,10 +56,21 @@ type URI struct { // Password: guest // Vhost: / // +// Supports TLS query parameters. See https://www.rabbitmq.com/uri-query-parameters.html +// +// certfile: +// keyfile: +// cacertfile: +// server_name_indication: +// +// If cacertfile is not provided, system CA certificates will be used. +// Mutual TLS (client auth) will be enabled only in case keyfile AND certfile provided. +// +// If Config.TLSClientConfig is set, TLS parameters from URI will be ignored. func ParseURI(uri string) (URI, error) { builder := defaultURI - if strings.Contains(uri, " ") == true { + if strings.Contains(uri, " ") { return builder, errURIWhitespace } @@ -113,6 +128,13 @@ func ParseURI(uri string) (URI, error) { } } + // see https://www.rabbitmq.com/uri-query-parameters.html + params := u.Query() + builder.CertFile = params.Get("certfile") + builder.KeyFile = params.Get("keyfile") + builder.CACertFile = params.Get("cacertfile") + builder.ServerName = params.Get("server_name_indication") + return builder, nil } diff --git a/vendor/github.com/streadway/amqp/write.go b/vendor/github.com/rabbitmq/amqp091-go/write.go similarity index 96% rename from vendor/github.com/streadway/amqp/write.go rename to vendor/github.com/rabbitmq/amqp091-go/write.go index 94a46d115e2..3cee0c346b8 100644 --- a/vendor/github.com/streadway/amqp/write.go +++ b/vendor/github.com/rabbitmq/amqp091-go/write.go @@ -1,9 +1,9 @@ -// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd. +// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved. +// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Source code and contact info at http://github.com/streadway/amqp -package amqp +package amqp091 import ( "bufio" @@ -212,7 +212,7 @@ func writeFrame(w io.Writer, typ uint8, channel uint16, payload []byte) (err err size := uint(len(payload)) _, err = w.Write([]byte{ - byte(typ), + typ, byte((channel & 0xff00) >> 8), byte((channel & 0x00ff) >> 0), byte((size & 0xff000000) >> 24), @@ -276,7 +276,8 @@ func writeLongstr(w io.Writer, s string) (err error) { 'S': string 'T': time.Time 'V': nil -'b': byte +'b': int8 +'B': byte 'd': float64 'f': float32 'l': int64 @@ -299,8 +300,13 @@ func writeField(w io.Writer, value interface{}) (err error) { enc = buf[:2] case byte: + buf[0] = 'B' + buf[1] = v + enc = buf[:2] + + case int8: buf[0] = 'b' - buf[1] = byte(v) + buf[1] = uint8(v) enc = buf[:2] case int16: @@ -335,7 +341,7 @@ func writeField(w io.Writer, value interface{}) (err error) { case Decimal: buf[0] = 'D' - buf[1] = byte(v.Scale) + buf[1] = v.Scale binary.BigEndian.PutUint32(buf[2:6], uint32(v.Value)) enc = buf[:6] @@ -412,5 +418,5 @@ func writeTable(w io.Writer, table Table) (err error) { } } - return writeLongstr(w, string(buf.Bytes())) + return writeLongstr(w, buf.String()) } diff --git a/vendor/github.com/streadway/amqp/.gitignore b/vendor/github.com/streadway/amqp/.gitignore deleted file mode 100644 index 667fb50c57c..00000000000 --- a/vendor/github.com/streadway/amqp/.gitignore +++ /dev/null @@ -1,12 +0,0 @@ -certs/* -spec/spec -examples/simple-consumer/simple-consumer -examples/simple-producer/simple-producer - -.idea/**/workspace.xml -.idea/**/tasks.xml -.idea/**/usage.statistics.xml -.idea/**/dictionaries -.idea/**/shelf - -.idea/**/contentModel.xml diff --git a/vendor/github.com/streadway/amqp/.travis.yml b/vendor/github.com/streadway/amqp/.travis.yml deleted file mode 100644 index 7eee262b4b9..00000000000 --- a/vendor/github.com/streadway/amqp/.travis.yml +++ /dev/null @@ -1,25 +0,0 @@ -language: go - -go: - - 1.10.x - - 1.11.x - - 1.12.x - - 1.13.x - -addons: - apt: - packages: - - rabbitmq-server - -services: - - rabbitmq - -env: - - GO111MODULE=on AMQP_URL=amqp://guest:guest@127.0.0.1:5672/ - -before_install: - - go get -v golang.org/x/lint/golint - -script: - - ./pre-commit - - go test -cpu=1,2 -v -tags integration ./... diff --git a/vendor/github.com/streadway/amqp/CONTRIBUTING.md b/vendor/github.com/streadway/amqp/CONTRIBUTING.md deleted file mode 100644 index c87f3d7e0f1..00000000000 --- a/vendor/github.com/streadway/amqp/CONTRIBUTING.md +++ /dev/null @@ -1,35 +0,0 @@ -## Prequisites - -1. Go: [https://golang.org/dl/](https://golang.org/dl/) -1. Golint `go get -u -v github.com/golang/lint/golint` - -## Contributing - -The workflow is pretty standard: - -1. Fork github.com/streadway/amqp -1. Add the pre-commit hook: `ln -s ../../pre-commit .git/hooks/pre-commit` -1. Create your feature branch (`git checkout -b my-new-feature`) -1. Run integration tests (see below) -1. **Implement tests** -1. Implement fixs -1. Commit your changes (`git commit -am 'Add some feature'`) -1. Push to a branch (`git push -u origin my-new-feature`) -1. Submit a pull request - -## Running Tests - -The test suite assumes that: - - * A RabbitMQ node is running on localhost with all defaults: [https://www.rabbitmq.com/download.html](https://www.rabbitmq.com/download.html) - * `AMQP_URL` is exported to `amqp://guest:guest@127.0.0.1:5672/` - -### Integration Tests - -After starting a local RabbitMQ, run integration tests with the following: - - env AMQP_URL=amqp://guest:guest@127.0.0.1:5672/ go test -v -cpu 2 -tags integration -race - -All integration tests should use the `integrationConnection(...)` test -helpers defined in `integration_test.go` to setup the integration environment -and logging. diff --git a/vendor/github.com/streadway/amqp/README.md b/vendor/github.com/streadway/amqp/README.md deleted file mode 100644 index 287830b28b0..00000000000 --- a/vendor/github.com/streadway/amqp/README.md +++ /dev/null @@ -1,93 +0,0 @@ -[![Build Status](https://api.travis-ci.org/streadway/amqp.svg)](http://travis-ci.org/streadway/amqp) [![GoDoc](https://godoc.org/github.com/streadway/amqp?status.svg)](http://godoc.org/github.com/streadway/amqp) - -# Go RabbitMQ Client Library - -This is an AMQP 0.9.1 client with RabbitMQ extensions in Go. - -## Project Maturity - -This project has been used in production systems for many years. It is reasonably mature -and feature complete, and as of November 2016 has [a team of maintainers](https://github.com/streadway/amqp/issues/215). - -Future API changes are unlikely but possible. They will be discussed on [Github -issues](https://github.com/streadway/amqp/issues) along with any bugs or -enhancements. - -## Supported Go Versions - -This library supports two most recent Go release series, currently 1.10 and 1.11. - - -## Supported RabbitMQ Versions - -This project supports RabbitMQ versions starting with `2.0` but primarily tested -against reasonably recent `3.x` releases. Some features and behaviours may be -server version-specific. - -## Goals - -Provide a functional interface that closely represents the AMQP 0.9.1 model -targeted to RabbitMQ as a server. This includes the minimum necessary to -interact the semantics of the protocol. - -## Non-goals - -Things not intended to be supported. - - * Auto reconnect and re-synchronization of client and server topologies. - * Reconnection would require understanding the error paths when the - topology cannot be declared on reconnect. This would require a new set - of types and code paths that are best suited at the call-site of this - package. AMQP has a dynamic topology that needs all peers to agree. If - this doesn't happen, the behavior is undefined. Instead of producing a - possible interface with undefined behavior, this package is designed to - be simple for the caller to implement the necessary connection-time - topology declaration so that reconnection is trivial and encapsulated in - the caller's application code. - * AMQP Protocol negotiation for forward or backward compatibility. - * 0.9.1 is stable and widely deployed. Versions 0.10 and 1.0 are divergent - specifications that change the semantics and wire format of the protocol. - We will accept patches for other protocol support but have no plans for - implementation ourselves. - * Anything other than PLAIN and EXTERNAL authentication mechanisms. - * Keeping the mechanisms interface modular makes it possible to extend - outside of this package. If other mechanisms prove to be popular, then - we would accept patches to include them in this package. - -## Usage - -See the 'examples' subdirectory for simple producers and consumers executables. -If you have a use-case in mind which isn't well-represented by the examples, -please file an issue. - -## Documentation - -Use [Godoc documentation](http://godoc.org/github.com/streadway/amqp) for -reference and usage. - -[RabbitMQ tutorials in -Go](https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go) are also -available. - -## Contributing - -Pull requests are very much welcomed. Create your pull request on a non-master -branch, make sure a test or example is included that covers your change and -your commits represent coherent changes that include a reason for the change. - -To run the integration tests, make sure you have RabbitMQ running on any host, -export the environment variable `AMQP_URL=amqp://host/` and run `go test -tags -integration`. TravisCI will also run the integration tests. - -Thanks to the [community of contributors](https://github.com/streadway/amqp/graphs/contributors). - -## External packages - - * [Google App Engine Dialer support](https://github.com/soundtrackyourbrand/gaeamqp) - * [RabbitMQ examples in Go](https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go) - -## License - -BSD 2 clause - see LICENSE for more details. - - diff --git a/vendor/github.com/streadway/amqp/confirms.go b/vendor/github.com/streadway/amqp/confirms.go deleted file mode 100644 index 06cbaa71103..00000000000 --- a/vendor/github.com/streadway/amqp/confirms.go +++ /dev/null @@ -1,94 +0,0 @@ -package amqp - -import "sync" - -// confirms resequences and notifies one or multiple publisher confirmation listeners -type confirms struct { - m sync.Mutex - listeners []chan Confirmation - sequencer map[uint64]Confirmation - published uint64 - expecting uint64 -} - -// newConfirms allocates a confirms -func newConfirms() *confirms { - return &confirms{ - sequencer: map[uint64]Confirmation{}, - published: 0, - expecting: 1, - } -} - -func (c *confirms) Listen(l chan Confirmation) { - c.m.Lock() - defer c.m.Unlock() - - c.listeners = append(c.listeners, l) -} - -// publish increments the publishing counter -func (c *confirms) Publish() uint64 { - c.m.Lock() - defer c.m.Unlock() - - c.published++ - return c.published -} - -// confirm confirms one publishing, increments the expecting delivery tag, and -// removes bookkeeping for that delivery tag. -func (c *confirms) confirm(confirmation Confirmation) { - delete(c.sequencer, c.expecting) - c.expecting++ - for _, l := range c.listeners { - l <- confirmation - } -} - -// resequence confirms any out of order delivered confirmations -func (c *confirms) resequence() { - for c.expecting <= c.published { - sequenced, found := c.sequencer[c.expecting] - if !found { - return - } - c.confirm(sequenced) - } -} - -// one confirms one publishing and all following in the publishing sequence -func (c *confirms) One(confirmed Confirmation) { - c.m.Lock() - defer c.m.Unlock() - - if c.expecting == confirmed.DeliveryTag { - c.confirm(confirmed) - } else { - c.sequencer[confirmed.DeliveryTag] = confirmed - } - c.resequence() -} - -// multiple confirms all publishings up until the delivery tag -func (c *confirms) Multiple(confirmed Confirmation) { - c.m.Lock() - defer c.m.Unlock() - - for c.expecting <= confirmed.DeliveryTag { - c.confirm(Confirmation{c.expecting, confirmed.Ack}) - } - c.resequence() -} - -// Close closes all listeners, discarding any out of sequence confirmations -func (c *confirms) Close() error { - c.m.Lock() - defer c.m.Unlock() - - for _, l := range c.listeners { - close(l) - } - c.listeners = nil - return nil -} diff --git a/vendor/github.com/streadway/amqp/fuzz.go b/vendor/github.com/streadway/amqp/fuzz.go deleted file mode 100644 index 16e626ce751..00000000000 --- a/vendor/github.com/streadway/amqp/fuzz.go +++ /dev/null @@ -1,17 +0,0 @@ -// +build gofuzz - -package amqp - -import "bytes" - -func Fuzz(data []byte) int { - r := reader{bytes.NewReader(data)} - frame, err := r.ReadFrame() - if err != nil { - if frame != nil { - panic("frame is not nil") - } - return 0 - } - return 1 -} diff --git a/vendor/github.com/streadway/amqp/pre-commit b/vendor/github.com/streadway/amqp/pre-commit deleted file mode 100644 index 37155300735..00000000000 --- a/vendor/github.com/streadway/amqp/pre-commit +++ /dev/null @@ -1,67 +0,0 @@ -#!/bin/sh - -LATEST_STABLE_SUPPORTED_GO_VERSION="1.11" - -main() { - if local_go_version_is_latest_stable - then - run_gofmt - run_golint - run_govet - fi - run_unit_tests -} - -local_go_version_is_latest_stable() { - go version | grep -q $LATEST_STABLE_SUPPORTED_GO_VERSION -} - -log_error() { - echo "$*" 1>&2 -} - -run_gofmt() { - GOFMT_FILES=$(gofmt -l .) - if [ -n "$GOFMT_FILES" ] - then - log_error "gofmt failed for the following files: -$GOFMT_FILES - -please run 'gofmt -w .' on your changes before committing." - exit 1 - fi -} - -run_golint() { - GOLINT_ERRORS=$(golint ./... | grep -v "Id should be") - if [ -n "$GOLINT_ERRORS" ] - then - log_error "golint failed for the following reasons: -$GOLINT_ERRORS - -please run 'golint ./...' on your changes before committing." - exit 1 - fi -} - -run_govet() { - GOVET_ERRORS=$(go tool vet ./*.go 2>&1) - if [ -n "$GOVET_ERRORS" ] - then - log_error "go vet failed for the following reasons: -$GOVET_ERRORS - -please run 'go tool vet ./*.go' on your changes before committing." - exit 1 - fi -} - -run_unit_tests() { - if [ -z "$NOTEST" ] - then - log_error 'Running short tests...' - env AMQP_URL= go test -short - fi -} - -main diff --git a/vendor/modules.txt b/vendor/modules.txt index 70397d4aaf3..05b6ec7ecba 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -964,6 +964,9 @@ github.com/prometheus/common/model github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util +# github.com/rabbitmq/amqp091-go v1.5.0 +## explicit; go 1.16 +github.com/rabbitmq/amqp091-go # github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 ## explicit github.com/rcrowley/go-metrics @@ -989,9 +992,6 @@ github.com/spf13/cobra # github.com/spf13/pflag v1.0.5 ## explicit; go 1.12 github.com/spf13/pflag -# github.com/streadway/amqp v1.0.0 -## explicit; go 1.10 -github.com/streadway/amqp # github.com/stretchr/objx v0.5.0 ## explicit; go 1.12 github.com/stretchr/objx