diff --git a/CODEOWNERS b/CODEOWNERS index d7ad3a0b2558c..37e376e77e79d 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -40,6 +40,8 @@ extensions/filters/common/original_src @snowp @klarose /*/extensions/tracers/xray @marcomagdy @lavignes @mattklein123 # mysql_proxy extension /*/extensions/filters/network/mysql_proxy @rshriram @venilnoronha @mattklein123 +# postgres_proxy extension +/*/extensions/filters/network/postgres_proxy @fabriziomello @cpakulski @dio # quic extension /*/extensions/quic_listeners/ @alyssawilk @danzh2010 @mattklein123 @mpwarres @wu-bin # zookeeper_proxy extension diff --git a/api/BUILD b/api/BUILD index 2f50226761bbd..0dafe82267e98 100644 --- a/api/BUILD +++ b/api/BUILD @@ -211,6 +211,7 @@ proto_library( "//envoy/extensions/filters/network/local_ratelimit/v3:pkg", "//envoy/extensions/filters/network/mongo_proxy/v3:pkg", "//envoy/extensions/filters/network/mysql_proxy/v3:pkg", + "//envoy/extensions/filters/network/postgres_proxy/v3alpha:pkg", "//envoy/extensions/filters/network/ratelimit/v3:pkg", "//envoy/extensions/filters/network/rbac/v3:pkg", "//envoy/extensions/filters/network/redis_proxy/v3:pkg", diff --git a/api/envoy/extensions/filters/network/postgres_proxy/v3alpha/BUILD b/api/envoy/extensions/filters/network/postgres_proxy/v3alpha/BUILD new file mode 100644 index 0000000000000..ef3541ebcb1df --- /dev/null +++ b/api/envoy/extensions/filters/network/postgres_proxy/v3alpha/BUILD @@ -0,0 +1,9 @@ +# DO NOT EDIT. This file is generated by tools/proto_sync.py. 
+ +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = ["@com_github_cncf_udpa//udpa/annotations:pkg"], +) diff --git a/api/envoy/extensions/filters/network/postgres_proxy/v3alpha/postgres_proxy.proto b/api/envoy/extensions/filters/network/postgres_proxy/v3alpha/postgres_proxy.proto new file mode 100644 index 0000000000000..61f3ec45c8838 --- /dev/null +++ b/api/envoy/extensions/filters/network/postgres_proxy/v3alpha/postgres_proxy.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +package envoy.extensions.filters.network.postgres_proxy.v3alpha; + +import "udpa/annotations/migrate.proto"; +import "udpa/annotations/status.proto"; +import "udpa/annotations/versioning.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.filters.network.postgres_proxy.v3alpha"; +option java_outer_classname = "PostgresProxyProto"; +option java_multiple_files = true; +option (udpa.annotations.file_status).work_in_progress = true; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: Postgres proxy] +// Postgres Proxy :ref:`configuration overview +// `. +// [#extension: envoy.filters.network.postgres_proxy] + +message PostgresProxy { + // The human readable prefix to use when emitting :ref:`statistics + // `. 
+ string stat_prefix = 1 [(validate.rules).string = {min_len: 1}]; +} diff --git a/api/versioning/BUILD b/api/versioning/BUILD index 21cd386fc94e5..0ffaf85a1cdd0 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -92,6 +92,7 @@ proto_library( "//envoy/extensions/filters/network/local_ratelimit/v3:pkg", "//envoy/extensions/filters/network/mongo_proxy/v3:pkg", "//envoy/extensions/filters/network/mysql_proxy/v3:pkg", + "//envoy/extensions/filters/network/postgres_proxy/v3alpha:pkg", "//envoy/extensions/filters/network/ratelimit/v3:pkg", "//envoy/extensions/filters/network/rbac/v3:pkg", "//envoy/extensions/filters/network/redis_proxy/v3:pkg", diff --git a/docs/root/configuration/listeners/network_filters/network_filters.rst b/docs/root/configuration/listeners/network_filters/network_filters.rst index bb4d14911142a..65511250f84bb 100644 --- a/docs/root/configuration/listeners/network_filters/network_filters.rst +++ b/docs/root/configuration/listeners/network_filters/network_filters.rst @@ -19,6 +19,7 @@ filters. local_rate_limit_filter mongo_proxy_filter mysql_proxy_filter + postgres_proxy_filter rate_limit_filter rbac_filter redis_proxy_filter diff --git a/docs/root/configuration/listeners/network_filters/postgres_proxy_filter.rst b/docs/root/configuration/listeners/network_filters/postgres_proxy_filter.rst new file mode 100644 index 0000000000000..dd7b489fd344d --- /dev/null +++ b/docs/root/configuration/listeners/network_filters/postgres_proxy_filter.rst @@ -0,0 +1,92 @@ +.. _config_network_filters_postgres_proxy: + +Postgres proxy +================ + +The Postgres proxy filter decodes the wire protocol between a Postgres client (downstream) and a Postgres server +(upstream). The decoded information is currently used only to produce Postgres level statistics like sessions, +statements or transactions executed, among others. This current version does not decode SQL queries. Future versions may +add more statistics and more advanced capabilities. 
When the Postgres filter detects that a session is encrypted, the messages are ignored and no decoding takes +place. More information: + +* Postgres :ref:`architecture overview ` + +.. attention:: + + The `postgres_proxy` filter is experimental and is currently under active development. + Capabilities will be expanded over time and the configuration structures are likely to change. + + +.. warning:: + + The `postgres_proxy` filter was tested only with + `Postgres frontend/backend protocol version 3.0`_, which was introduced in + Postgres 7.4. Earlier versions are thus not supported. Testing is limited + anyway to not EOL-ed versions. + + .. _Postgres frontend/backend protocol version 3.0: https://www.postgresql.org/docs/current/protocol.html + + + +Configuration +------------- + +The Postgres proxy filter should be chained with the TCP proxy as shown in the configuration +example below: + +.. code-block:: yaml + + filter_chains: + - filters: + - name: envoy.filters.network.postgres_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.postgres_proxy.v3alpha.PostgresProxy + stat_prefix: postgres + - name: envoy.tcp_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy + stat_prefix: tcp + cluster: postgres_cluster + + +.. _config_network_filters_postgres_proxy_stats: + +Statistics +---------- + +Every configured Postgres proxy filter has statistics rooted at postgres. with the following statistics: + +.. 
csv-table:: + :header: Name, Type, Description + :widths: 2, 1, 2 + + errors, Counter, Number of times the server replied with ERROR message + errors_error, Counter, Number of times the server replied with ERROR message with ERROR severity + errors_fatal, Counter, Number of times the server replied with ERROR message with FATAL severity + errors_panic, Counter, Number of times the server replied with ERROR message with PANIC severity + errors_unknown, Counter, Number of times the server replied with ERROR message but the decoder could not parse it + messages, Counter, Total number of messages processed by the filter + messages_backend, Counter, Total number of backend messages detected by the filter + messages_frontend, Counter, Number of frontend messages detected by the filter + messages_unknown, Counter, Number of times the filter successfully decoded a message but did not know what to do with it + sessions, Counter, Total number of successful logins + sessions_encrypted, Counter, Number of times the filter detected encrypted sessions + sessions_unencrypted, Counter, Number of messages indicating unencrypted successful login + statements, Counter, Total number of SQL statements + statements_delete, Counter, Number of DELETE statements + statements_insert, Counter, Number of INSERT statements + statements_select, Counter, Number of SELECT statements + statements_update, Counter, Number of UPDATE statements + statements_other, Counter, "Number of statements other than DELETE, INSERT, SELECT or UPDATE" + transactions, Counter, Total number of SQL transactions + transactions_commit, Counter, Number of COMMIT transactions + transactions_rollback, Counter, Number of ROLLBACK transactions + notices, Counter, Total number of NOTICE messages + notices_notice, Counter, Number of NOTICE messages with NOTICE subtype + notices_log, Counter, Number of NOTICE messages with LOG subtype + notices_warning, Counter, Number of NOTICE messages with WARNING severity + notices_debug, 
Counter, Number of NOTICE messages with DEBUG severity + notices_info, Counter, Number of NOTICE messages with INFO severity + notices_unknown, Counter, Number of NOTICE messages which could not be recognized + + diff --git a/docs/root/intro/arch_overview/other_protocols/other_protocols.rst b/docs/root/intro/arch_overview/other_protocols/other_protocols.rst index 1081a96993afd..f6257a2855233 100644 --- a/docs/root/intro/arch_overview/other_protocols/other_protocols.rst +++ b/docs/root/intro/arch_overview/other_protocols/other_protocols.rst @@ -8,3 +8,4 @@ Other protocols mongo dynamo redis + postgres diff --git a/docs/root/intro/arch_overview/other_protocols/postgres.rst b/docs/root/intro/arch_overview/other_protocols/postgres.rst new file mode 100644 index 0000000000000..7fa14f5f4b687 --- /dev/null +++ b/docs/root/intro/arch_overview/other_protocols/postgres.rst @@ -0,0 +1,31 @@ +.. _arch_overview_postgres: + +Postgres +========== + +Envoy supports a network level Postgres sniffing filter to add network observability. By using the +Postgres proxy, Envoy is able to decode `Postgres frontend/backend protocol`_ and gather +statistics from the decoded information. + +The main goal of the Postgres filter is to capture runtime statistics without impacting or +generating any load on the Postgres upstream server, it is transparent to it. The filter currently +offers the following features: + +* Decode non SSL traffic, ignore SSL traffic. +* Decode session information. +* Capture transaction information, including commits and rollbacks. +* Expose counters for different types of statements (INSERTs, DELETEs, UPDATEs, etc). + The counters are updated based on decoding backend CommandComplete messages not by decoding SQL statements sent by a client. +* Count frontend, backend and unknown messages. +* Identify errors and notices backend responses. 
+ +The Postgres filter solves a notable problem for Postgres deployments: +gathering this information either imposes additional load to the server; or +requires pull-based querying for metadata from the server, sometimes requiring +external components or extensions. This filter provides valuable observability +information, without impacting the performance of the upstream Postgres +server or requiring the installation of any software. + +Postgres proxy filter :ref:`configuration reference `. + +.. _Postgres frontend/backend protocol: https://www.postgresql.org/docs/current/protocol.html diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index df79a2a34364c..662f014a72d2a 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -15,6 +15,7 @@ Changes `google.api.HttpBody `_. * http: fixed a bug where the upgrade header was not cleared on responses to non-upgrade requests. Can be reverted temporarily by setting runtime feature `envoy.reloadable_features.fix_upgrade_response` to false. +* network filters: added a :ref:`postgres proxy filter `. * router: allow retries of streaming or incomplete requests. This removes stat `rq_retry_skipped_request_not_complete`. * tracing: tracing configuration has been made fully dynamic and every HTTP connection manager can now have a separate :ref:`tracing provider `. diff --git a/generated_api_shadow/envoy/extensions/filters/network/postgres_proxy/v3alpha/BUILD b/generated_api_shadow/envoy/extensions/filters/network/postgres_proxy/v3alpha/BUILD new file mode 100644 index 0000000000000..ef3541ebcb1df --- /dev/null +++ b/generated_api_shadow/envoy/extensions/filters/network/postgres_proxy/v3alpha/BUILD @@ -0,0 +1,9 @@ +# DO NOT EDIT. This file is generated by tools/proto_sync.py. 
+ +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = ["@com_github_cncf_udpa//udpa/annotations:pkg"], +) diff --git a/generated_api_shadow/envoy/extensions/filters/network/postgres_proxy/v3alpha/postgres_proxy.proto b/generated_api_shadow/envoy/extensions/filters/network/postgres_proxy/v3alpha/postgres_proxy.proto new file mode 100644 index 0000000000000..61f3ec45c8838 --- /dev/null +++ b/generated_api_shadow/envoy/extensions/filters/network/postgres_proxy/v3alpha/postgres_proxy.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +package envoy.extensions.filters.network.postgres_proxy.v3alpha; + +import "udpa/annotations/migrate.proto"; +import "udpa/annotations/status.proto"; +import "udpa/annotations/versioning.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.filters.network.postgres_proxy.v3alpha"; +option java_outer_classname = "PostgresProxyProto"; +option java_multiple_files = true; +option (udpa.annotations.file_status).work_in_progress = true; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: Postgres proxy] +// Postgres Proxy :ref:`configuration overview +// `. +// [#extension: envoy.filters.network.postgres_proxy] + +message PostgresProxy { + // The human readable prefix to use when emitting :ref:`statistics + // `. 
+ string stat_prefix = 1 [(validate.rules).string = {min_len: 1}]; +} diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index cb8956ff560c8..87b79586312f4 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -92,6 +92,7 @@ EXTENSIONS = { "envoy.filters.network.local_ratelimit": "//source/extensions/filters/network/local_ratelimit:config", "envoy.filters.network.mongo_proxy": "//source/extensions/filters/network/mongo_proxy:config", "envoy.filters.network.mysql_proxy": "//source/extensions/filters/network/mysql_proxy:config", + "envoy.filters.network.postgres_proxy": "//source/extensions/filters/network/postgres_proxy:config", "envoy.filters.network.ratelimit": "//source/extensions/filters/network/ratelimit:config", "envoy.filters.network.rbac": "//source/extensions/filters/network/rbac:config", "envoy.filters.network.redis_proxy": "//source/extensions/filters/network/redis_proxy:config", diff --git a/source/extensions/filters/network/postgres_proxy/BUILD b/source/extensions/filters/network/postgres_proxy/BUILD new file mode 100644 index 0000000000000..05b99ade974fe --- /dev/null +++ b/source/extensions/filters/network/postgres_proxy/BUILD @@ -0,0 +1,51 @@ +licenses(["notice"]) # Apache 2 + +#package(default_visibility = ["//visibility:public"]) + +# PostgresSQL proxy L7 network filter. 
+# Public docs: docs/root/configuration/network_filters/postgres_proxy_filter.rst + +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_extension", + "envoy_cc_library", + "envoy_package", +) + +envoy_package() + +envoy_cc_library( + name = "filter", + srcs = [ + "postgres_decoder.cc", + "postgres_filter.cc", + ], + hdrs = [ + "postgres_decoder.h", + "postgres_filter.h", + "postgres_session.h", + ], + repository = "@envoy", + deps = [ + "//include/envoy/network:filter_interface", + "//include/envoy/server:filter_config_interface", + "//include/envoy/stats:stats_interface", + "//include/envoy/stats:stats_macros", + "//source/common/buffer:buffer_lib", + "//source/common/network:filter_lib", + ], +) + +envoy_cc_extension( + name = "config", + srcs = ["config.cc"], + hdrs = ["config.h"], + repository = "@envoy", + security_posture = "requires_trusted_downstream_and_upstream", + deps = [ + ":filter", + "//source/extensions/filters/network:well_known_names", + "//source/extensions/filters/network/common:factory_base_lib", + "@envoy_api//envoy/extensions/filters/network/postgres_proxy/v3alpha:pkg_cc_proto", + ], +) diff --git a/source/extensions/filters/network/postgres_proxy/config.cc b/source/extensions/filters/network/postgres_proxy/config.cc new file mode 100644 index 0000000000000..948ccd9f58a07 --- /dev/null +++ b/source/extensions/filters/network/postgres_proxy/config.cc @@ -0,0 +1,34 @@ +#include "extensions/filters/network/postgres_proxy/config.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace PostgresProxy { + +/** + * Config registration for the Postgres proxy filter. @see NamedNetworkFilterConfigFactory. 
+ */ +Network::FilterFactoryCb +NetworkFilters::PostgresProxy::PostgresConfigFactory::createFilterFactoryFromProtoTyped( + const envoy::extensions::filters::network::postgres_proxy::v3alpha::PostgresProxy& proto_config, + Server::Configuration::FactoryContext& context) { + ASSERT(!proto_config.stat_prefix().empty()); + + const std::string stat_prefix = fmt::format("postgres.{}", proto_config.stat_prefix()); + + PostgresFilterConfigSharedPtr filter_config( + std::make_shared(stat_prefix, context.scope())); + return [filter_config](Network::FilterManager& filter_manager) -> void { + filter_manager.addFilter(std::make_shared(filter_config)); + }; +} + +/** + * Static registration for the Postgres proxy filter. @see RegisterFactory. + */ +REGISTER_FACTORY(PostgresConfigFactory, Server::Configuration::NamedNetworkFilterConfigFactory); + +} // namespace PostgresProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/postgres_proxy/config.h b/source/extensions/filters/network/postgres_proxy/config.h new file mode 100644 index 0000000000000..4c5f1e4a8a504 --- /dev/null +++ b/source/extensions/filters/network/postgres_proxy/config.h @@ -0,0 +1,34 @@ +#pragma once + +#include "envoy/extensions/filters/network/postgres_proxy/v3alpha/postgres_proxy.pb.h" +#include "envoy/extensions/filters/network/postgres_proxy/v3alpha/postgres_proxy.pb.validate.h" + +#include "extensions/filters/network/common/factory_base.h" +#include "extensions/filters/network/postgres_proxy/postgres_filter.h" +#include "extensions/filters/network/well_known_names.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace PostgresProxy { + +/** + * Config registration for the Postgres proxy filter. 
+ */ +class PostgresConfigFactory + : public Common::FactoryBase< + envoy::extensions::filters::network::postgres_proxy::v3alpha::PostgresProxy> { +public: + PostgresConfigFactory() : FactoryBase{NetworkFilterNames::get().Postgres} {} + +private: + Network::FilterFactoryCb createFilterFactoryFromProtoTyped( + const envoy::extensions::filters::network::postgres_proxy::v3alpha::PostgresProxy& + proto_config, + Server::Configuration::FactoryContext& context) override; +}; + +} // namespace PostgresProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/postgres_proxy/postgres_decoder.cc b/source/extensions/filters/network/postgres_proxy/postgres_decoder.cc new file mode 100644 index 0000000000000..d4d3702c33a7f --- /dev/null +++ b/source/extensions/filters/network/postgres_proxy/postgres_decoder.cc @@ -0,0 +1,330 @@ +#include "extensions/filters/network/postgres_proxy/postgres_decoder.h" + +#include + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace PostgresProxy { + +void DecoderImpl::initialize() { + // Special handler for first message of the transaction. + first_ = MsgProcessor{"Startup", {}}; + + // Frontend messages. + FE_messages_.direction_ = "Frontend"; + + // Setup handlers for known messages. + absl::flat_hash_map& FE_known_msgs = FE_messages_.messages_; + + // Handler for known messages. 
+ FE_known_msgs['B'] = MsgProcessor{"Bind", {}}; + FE_known_msgs['C'] = MsgProcessor{"Close", {}}; + FE_known_msgs['d'] = MsgProcessor{"CopyData", {}}; + FE_known_msgs['c'] = MsgProcessor{"CopyDone", {}}; + FE_known_msgs['f'] = MsgProcessor{"CopyFail", {}}; + FE_known_msgs['D'] = MsgProcessor{"Describe", {}}; + FE_known_msgs['E'] = MsgProcessor{"Execute", {}}; + FE_known_msgs['H'] = MsgProcessor{"Flush", {}}; + FE_known_msgs['F'] = MsgProcessor{"FunctionCall", {}}; + FE_known_msgs['p'] = + MsgProcessor{"PasswordMessage/GSSResponse/SASLInitialResponse/SASLResponse", {}}; + FE_known_msgs['P'] = MsgProcessor{"Parse", {}}; + FE_known_msgs['Q'] = MsgProcessor{"Query", {}}; + FE_known_msgs['S'] = MsgProcessor{"Sync", {}}; + FE_known_msgs['X'] = MsgProcessor{"Terminate", {&DecoderImpl::decodeFrontendTerminate}}; + + // Handler for unknown messages. + FE_messages_.unknown_ = MsgProcessor{"Other", {&DecoderImpl::incMessagesUnknown}}; + + // Backend messages. + BE_messages_.direction_ = "Backend"; + + // Setup handlers for known messages. + absl::flat_hash_map& BE_known_msgs = BE_messages_.messages_; + + // Handler for known messages. 
+ BE_known_msgs['R'] = MsgProcessor{"Authentication", {&DecoderImpl::decodeAuthentication}}; + BE_known_msgs['K'] = MsgProcessor{"BackendKeyData", {}}; + BE_known_msgs['2'] = MsgProcessor{"BindComplete", {}}; + BE_known_msgs['3'] = MsgProcessor{"CloseComplete", {}}; + BE_known_msgs['C'] = MsgProcessor{"CommandComplete", {&DecoderImpl::decodeBackendStatements}}; + BE_known_msgs['d'] = MsgProcessor{"CopyData", {}}; + BE_known_msgs['c'] = MsgProcessor{"CopyDone", {}}; + BE_known_msgs['G'] = MsgProcessor{"CopyInResponse", {}}; + BE_known_msgs['H'] = MsgProcessor{"CopyOutResponse", {}}; + BE_known_msgs['D'] = MsgProcessor{"DataRow", {}}; + BE_known_msgs['I'] = MsgProcessor{"EmptyQueryResponse", {}}; + BE_known_msgs['E'] = MsgProcessor{"ErrorResponse", {&DecoderImpl::decodeBackendErrorResponse}}; + BE_known_msgs['V'] = MsgProcessor{"FunctionCallResponse", {}}; + BE_known_msgs['v'] = MsgProcessor{"NegotiateProtocolVersion", {}}; + BE_known_msgs['n'] = MsgProcessor{"NoData", {}}; + BE_known_msgs['N'] = MsgProcessor{"NoticeResponse", {&DecoderImpl::decodeBackendNoticeResponse}}; + BE_known_msgs['A'] = MsgProcessor{"NotificationResponse", {}}; + BE_known_msgs['t'] = MsgProcessor{"ParameterDescription", {}}; + BE_known_msgs['S'] = MsgProcessor{"ParameterStatus", {}}; + BE_known_msgs['1'] = MsgProcessor{"ParseComplete", {}}; + BE_known_msgs['s'] = MsgProcessor{"PortalSuspend", {}}; + BE_known_msgs['Z'] = MsgProcessor{"ReadyForQuery", {}}; + BE_known_msgs['T'] = MsgProcessor{"RowDescription", {}}; + + // Handler for unknown messages. + BE_messages_.unknown_ = MsgProcessor{"Other", {&DecoderImpl::incMessagesUnknown}}; + + // Setup hash map for handling backend statements. 
+ BE_statements_["BEGIN"] = [this](DecoderImpl*) -> void { + callbacks_->incStatements(DecoderCallbacks::StatementType::Other); + callbacks_->incTransactions(); + session_.setInTransaction(true); + }; + BE_statements_["ROLLBACK"] = [this](DecoderImpl*) -> void { + callbacks_->incStatements(DecoderCallbacks::StatementType::Noop); + callbacks_->incTransactionsRollback(); + session_.setInTransaction(false); + }; + BE_statements_["START"] = [this](DecoderImpl*) -> void { + callbacks_->incStatements(DecoderCallbacks::StatementType::Other); + callbacks_->incTransactions(); + session_.setInTransaction(true); + }; + BE_statements_["COMMIT"] = [this](DecoderImpl*) -> void { + callbacks_->incStatements(DecoderCallbacks::StatementType::Noop); + session_.setInTransaction(false); + callbacks_->incTransactionsCommit(); + }; + BE_statements_["SELECT"] = [this](DecoderImpl*) -> void { + callbacks_->incStatements(DecoderCallbacks::StatementType::Select); + callbacks_->incTransactions(); + callbacks_->incTransactionsCommit(); + }; + BE_statements_["INSERT"] = [this](DecoderImpl*) -> void { + callbacks_->incStatements(DecoderCallbacks::StatementType::Insert); + callbacks_->incTransactions(); + callbacks_->incTransactionsCommit(); + }; + BE_statements_["UPDATE"] = [this](DecoderImpl*) -> void { + callbacks_->incStatements(DecoderCallbacks::StatementType::Update); + callbacks_->incTransactions(); + callbacks_->incTransactionsCommit(); + }; + BE_statements_["DELETE"] = [this](DecoderImpl*) -> void { + callbacks_->incStatements(DecoderCallbacks::StatementType::Delete); + callbacks_->incTransactions(); + callbacks_->incTransactionsCommit(); + }; + + // Setup hash map for handling backend ErrorResponse messages. 
+ BE_errors_.keywords_["ERROR"] = [this](DecoderImpl*) -> void { + callbacks_->incErrors(DecoderCallbacks::ErrorType::Error); + }; + BE_errors_.keywords_["FATAL"] = [this](DecoderImpl*) -> void { + callbacks_->incErrors(DecoderCallbacks::ErrorType::Fatal); + }; + BE_errors_.keywords_["PANIC"] = [this](DecoderImpl*) -> void { + callbacks_->incErrors(DecoderCallbacks::ErrorType::Panic); + }; + // Setup handler which is called when decoder cannot decode the message and treats it as Unknown + // Error message. + BE_errors_.unknown_ = [this](DecoderImpl*) -> void { + callbacks_->incErrors(DecoderCallbacks::ErrorType::Unknown); + }; + + // Setup hash map for handling backend NoticeResponse messages. + BE_notices_.keywords_["WARNING"] = [this](DecoderImpl*) -> void { + callbacks_->incNotices(DecoderCallbacks::NoticeType::Warning); + }; + BE_notices_.keywords_["NOTICE"] = [this](DecoderImpl*) -> void { + callbacks_->incNotices(DecoderCallbacks::NoticeType::Notice); + }; + BE_notices_.keywords_["DEBUG"] = [this](DecoderImpl*) -> void { + callbacks_->incNotices(DecoderCallbacks::NoticeType::Debug); + }; + BE_notices_.keywords_["INFO"] = [this](DecoderImpl*) -> void { + callbacks_->incNotices(DecoderCallbacks::NoticeType::Info); + }; + BE_notices_.keywords_["LOG"] = [this](DecoderImpl*) -> void { + callbacks_->incNotices(DecoderCallbacks::NoticeType::Log); + }; + // Setup handler which is called when decoder cannot decode the message and treats it as Unknown + // Notice message. + BE_notices_.unknown_ = [this](DecoderImpl*) -> void { + callbacks_->incNotices(DecoderCallbacks::NoticeType::Unknown); + }; +} + +bool DecoderImpl::parseMessage(Buffer::Instance& data) { + ENVOY_LOG(trace, "postgres_proxy: parsing message, len {}", data.length()); + + // The minimum size of the message sufficient for parsing is 5 bytes. + if (data.length() < 5) { + // not enough data in the buffer. 
+ return false; + } + + if (!startup_) { + data.copyOut(0, 1, &command_); + ENVOY_LOG(trace, "postgres_proxy: command is {}", command_); + } + + // The 1 byte message type and message length should be in the buffer + // Check if the entire message has been read. + std::string message; + uint32_t length = data.peekBEInt(startup_ ? 0 : 1); + if (data.length() < (length + (startup_ ? 0 : 1))) { + ENVOY_LOG(trace, "postgres_proxy: cannot parse message. Need {} bytes in buffer", + length + (startup_ ? 0 : 1)); + // Not enough data in the buffer. + return false; + } + + if (startup_) { + uint32_t code = data.peekBEInt(4); + // Startup message with 1234 in the most significant 16 bits + // indicate request to encrypt. + if (code >= 0x04d20000) { + ENVOY_LOG(trace, "postgres_proxy: detected encrypted traffic."); + encrypted_ = true; + startup_ = false; + incSessionsEncrypted(); + data.drain(data.length()); + return false; + } else { + ENVOY_LOG(debug, "Detected version {}.{} of Postgres", code >> 16, code & 0x0000FFFF); + } + } + + setMessageLength(length); + + data.drain(startup_ ? 4 : 5); // Length plus optional 1st byte. + + auto bytesToRead = length - 4; + message.assign(std::string(static_cast(data.linearize(bytesToRead)), bytesToRead)); + data.drain(bytesToRead); + setMessage(message); + + ENVOY_LOG(trace, "postgres_proxy: msg parsed"); + return true; +} + +bool DecoderImpl::onData(Buffer::Instance& data, bool frontend) { + // If encrypted, just drain the traffic. + if (encrypted_) { + ENVOY_LOG(trace, "postgres_proxy: ignoring {} bytes of encrypted data", data.length()); + data.drain(data.length()); + return true; + } + + ENVOY_LOG(trace, "postgres_proxy: decoding {} bytes", data.length()); + + if (!parseMessage(data)) { + return false; + } + + MsgGroup& msg_processor = std::ref(frontend ? FE_messages_ : BE_messages_); + frontend ? callbacks_->incMessagesFrontend() : callbacks_->incMessagesBackend(); + + // Set processing to the handler of unknown messages. 
+ // If message is found, the processing will be updated. + std::reference_wrapper msg = msg_processor.unknown_; + + if (startup_) { + msg = std::ref(first_); + startup_ = false; + } else { + auto it = msg_processor.messages_.find(command_); + if (it != msg_processor.messages_.end()) { + msg = std::ref((*it).second); + } + } + + std::vector& actions = std::get<1>(msg.get()); + for (const auto& action : actions) { + action(this); + } + + ENVOY_LOG(debug, "({}) command = {} ({})", msg_processor.direction_, command_, + std::get<0>(msg.get())); + ENVOY_LOG(debug, "({}) length = {}", msg_processor.direction_, getMessageLength()); + ENVOY_LOG(debug, "({}) message = {}", msg_processor.direction_, getMessage()); + + ENVOY_LOG(trace, "postgres_proxy: {} bytes remaining in buffer", data.length()); + + return true; +} + +// Method is called when C (CommandComplete) message has been +// decoded. It extracts the keyword from message's payload +// and updates stats associated with that keyword. +void DecoderImpl::decodeBackendStatements() { + // The message_ contains the statement. Find space character + // and the statement is the first word. If space cannot be found + // take the whole message. + std::string statement = message_.substr(0, message_.find(' ')); + + auto it = BE_statements_.find(statement); + if (it != BE_statements_.end()) { + (*it).second(this); + } else { + callbacks_->incStatements(DecoderCallbacks::StatementType::Other); + callbacks_->incTransactions(); + callbacks_->incTransactionsCommit(); + } +} + +// Method is called when X (Terminate) message +// is encountered by the decoder. +void DecoderImpl::decodeFrontendTerminate() { + if (session_.inTransaction()) { + session_.setInTransaction(false); + callbacks_->incTransactionsRollback(); + } +} + +// Method does deep inspection of Authentication message. +// It looks for 4 bytes of zeros, which means that login to +// database was successful. 
+void DecoderImpl::decodeAuthentication() { + // Check if auth message indicates successful authentication. + // Length must be 8 and payload must be 0. + if ((8 == message_len_) && (0 == message_.data()[0]) && (0 == message_.data()[1]) && + (0 == message_.data()[2]) && (0 == message_.data()[3])) { + incSessionsUnencrypted(); + } +} + +// Method is used to parse Error and Notice messages. Their syntax is the same, but they +// use different keywords inside the message and statistics fields are different. +void DecoderImpl::decodeErrorNotice(MsgParserDict& types) { + // Error/Notice message should start with character "S". + if (message_[0] != 'S') { + types.unknown_(this); + return; + } + + for (const auto& it : types.keywords_) { + // Try to find a keyword with S prefix or V prefix. + // Postgres versions prior to 9.6 use only S prefix while + // versions higher than 9.6 use S and V prefixes. + if ((message_.find("S" + it.first) != std::string::npos) || + (message_.find("V" + it.first) != std::string::npos)) { + it.second(this); + return; + } + } + + // Keyword was not found in the message. Count it as Unknown. + types.unknown_(this); +} + +// Method parses E (Error) message and looks for string +// indicating that error happened. +void DecoderImpl::decodeBackendErrorResponse() { decodeErrorNotice(BE_errors_); } + +// Method parses N (Notice) message and looks for string +// indicating its meaning. It can be warning, notice, info, debug or log. 
+void DecoderImpl::decodeBackendNoticeResponse() { decodeErrorNotice(BE_notices_); } + +} // namespace PostgresProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/postgres_proxy/postgres_decoder.h b/source/extensions/filters/network/postgres_proxy/postgres_decoder.h new file mode 100644 index 0000000000000..bd779a2c24ac4 --- /dev/null +++ b/source/extensions/filters/network/postgres_proxy/postgres_decoder.h @@ -0,0 +1,152 @@ +#pragma once +#include + +#include "envoy/common/platform.h" + +#include "common/buffer/buffer_impl.h" +#include "common/common/logger.h" + +#include "extensions/filters/network/postgres_proxy/postgres_session.h" + +#include "absl/container/flat_hash_map.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace PostgresProxy { + +// General callbacks for dispatching decoded Postgres messages to a sink. +class DecoderCallbacks { +public: + virtual ~DecoderCallbacks() = default; + + virtual void incMessagesBackend() PURE; + virtual void incMessagesFrontend() PURE; + virtual void incMessagesUnknown() PURE; + + virtual void incSessionsEncrypted() PURE; + virtual void incSessionsUnencrypted() PURE; + + enum class StatementType { Insert, Delete, Select, Update, Other, Noop }; + virtual void incStatements(StatementType) PURE; + + virtual void incTransactions() PURE; + virtual void incTransactionsCommit() PURE; + virtual void incTransactionsRollback() PURE; + + enum class NoticeType { Warning, Notice, Debug, Info, Log, Unknown }; + virtual void incNotices(NoticeType) PURE; + + enum class ErrorType { Error, Fatal, Panic, Unknown }; + virtual void incErrors(ErrorType) PURE; +}; + +// Postgres message decoder. 
+class Decoder { +public: + virtual ~Decoder() = default; + + virtual bool onData(Buffer::Instance& data, bool frontend) PURE; + virtual PostgresSession& getSession() PURE; +}; + +using DecoderPtr = std::unique_ptr; + +class DecoderImpl : public Decoder, Logger::Loggable { +public: + DecoderImpl(DecoderCallbacks* callbacks) : callbacks_(callbacks) { initialize(); } + + bool onData(Buffer::Instance& data, bool frontend) override; + PostgresSession& getSession() override { return session_; } + + void setMessage(std::string message) { message_ = message; } + std::string getMessage() { return message_; } + + void setMessageLength(uint32_t message_len) { message_len_ = message_len; } + uint32_t getMessageLength() { return message_len_; } + + void setStartup(bool startup) { startup_ = startup; } + void initialize(); + + bool encrypted() const { return encrypted_; } + +protected: + // Message action defines the Decoder's method which will be invoked + // when a specific message has been decoded. + using MsgAction = std::function; + + // MsgProcessor has two fields: + // first - string with message description + // second - vector of Decoder's methods which are invoked when the message + // is processed. + using MsgProcessor = std::pair>; + + // Frontend and Backend messages. + using MsgGroup = struct { + // String describing direction (Frontend or Backend). + std::string direction_; + // Hash map indexed by messages' 1st byte points to handlers used for processing messages. + absl::flat_hash_map messages_; + // Handler used for processing messages not found in hash map. + MsgProcessor unknown_; + }; + + // Hash map binding keyword found in a message to an + // action to be executed when the keyword is found. + using KeywordProcessor = absl::flat_hash_map; + + // Structure is used for grouping keywords found in a specific message. + // Known keywords are dispatched via hash map and unknown keywords + // are handled by unknown_. 
+ using MsgParserDict = struct { + // Handler for known keywords. + KeywordProcessor keywords_; + // Handler invoked when a keyword is not found in hash map. + MsgAction unknown_; + }; + + bool parseMessage(Buffer::Instance& data); + void decode(Buffer::Instance& data); + void decodeAuthentication(); + void decodeBackendStatements(); + void decodeBackendErrorResponse(); + void decodeBackendNoticeResponse(); + void decodeFrontendTerminate(); + void decodeErrorNotice(MsgParserDict& types); + + void incMessagesUnknown() { callbacks_->incMessagesUnknown(); } + void incSessionsEncrypted() { callbacks_->incSessionsEncrypted(); } + void incSessionsUnencrypted() { callbacks_->incSessionsUnencrypted(); } + + DecoderCallbacks* callbacks_{}; + PostgresSession session_{}; + + // The following fields store result of message parsing. + char command_{}; + std::string message_; + uint32_t message_len_{}; + + bool startup_{true}; // startup stage does not have 1st byte command + bool encrypted_{false}; // tells if exchange is encrypted + + // Dispatchers for Backend (BE) and Frontend (FE) messages. + MsgGroup FE_messages_; + MsgGroup BE_messages_; + + // Handler for startup postgres message. + // Startup message message which does not start with 1 byte TYPE. + // It starts with message length and must be therefore handled + // differently. 
+ MsgProcessor first_; + + // hash map for dispatching backend transaction messages + KeywordProcessor BE_statements_; + + MsgParserDict BE_errors_; + MsgParserDict BE_notices_; +}; + +} // namespace PostgresProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/postgres_proxy/postgres_filter.cc b/source/extensions/filters/network/postgres_proxy/postgres_filter.cc new file mode 100644 index 0000000000000..c339de5dd47c7 --- /dev/null +++ b/source/extensions/filters/network/postgres_proxy/postgres_filter.cc @@ -0,0 +1,173 @@ +#include "extensions/filters/network/postgres_proxy/postgres_filter.h" + +#include "envoy/buffer/buffer.h" +#include "envoy/network/connection.h" + +#include "extensions/filters/network/postgres_proxy/postgres_decoder.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace PostgresProxy { + +PostgresFilterConfig::PostgresFilterConfig(const std::string& stat_prefix, Stats::Scope& scope) + : stat_prefix_{stat_prefix}, scope_{scope}, stats_{generateStats(stat_prefix, scope)} {} + +PostgresFilter::PostgresFilter(PostgresFilterConfigSharedPtr config) : config_{config} { + if (!decoder_) { + decoder_ = createDecoder(this); + } +} + +// Network::ReadFilter +Network::FilterStatus PostgresFilter::onData(Buffer::Instance& data, bool) { + ENVOY_CONN_LOG(trace, "echo: got {} bytes", read_callbacks_->connection(), data.length()); + + // Frontend Buffer + frontend_buffer_.add(data); + doDecode(frontend_buffer_, true); + + return Network::FilterStatus::Continue; +} + +Network::FilterStatus PostgresFilter::onNewConnection() { return Network::FilterStatus::Continue; } + +void PostgresFilter::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) { + read_callbacks_ = &callbacks; +} + +// Network::WriteFilter +Network::FilterStatus PostgresFilter::onWrite(Buffer::Instance& data, bool) { + + // Backend Buffer + backend_buffer_.add(data); + 
doDecode(backend_buffer_, false); + + return Network::FilterStatus::Continue; +} + +DecoderPtr PostgresFilter::createDecoder(DecoderCallbacks* callbacks) { + return std::make_unique(callbacks); +} + +void PostgresFilter::incMessagesBackend() { + config_->stats_.messages_.inc(); + config_->stats_.messages_backend_.inc(); +} + +void PostgresFilter::incMessagesFrontend() { + config_->stats_.messages_.inc(); + config_->stats_.messages_frontend_.inc(); +} + +void PostgresFilter::incMessagesUnknown() { + config_->stats_.messages_.inc(); + config_->stats_.messages_unknown_.inc(); +} + +void PostgresFilter::incSessionsEncrypted() { + config_->stats_.sessions_.inc(); + config_->stats_.sessions_encrypted_.inc(); +} + +void PostgresFilter::incSessionsUnencrypted() { + config_->stats_.sessions_.inc(); + config_->stats_.sessions_unencrypted_.inc(); +} + +void PostgresFilter::incTransactions() { + if (!decoder_->getSession().inTransaction()) { + config_->stats_.transactions_.inc(); + } +} + +void PostgresFilter::incTransactionsCommit() { + if (!decoder_->getSession().inTransaction()) { + config_->stats_.transactions_commit_.inc(); + } +} + +void PostgresFilter::incTransactionsRollback() { + if (!decoder_->getSession().inTransaction()) { + config_->stats_.transactions_rollback_.inc(); + } +} + +void PostgresFilter::incNotices(NoticeType type) { + config_->stats_.notices_.inc(); + switch (type) { + case DecoderCallbacks::NoticeType::Warning: + config_->stats_.notices_warning_.inc(); + break; + case DecoderCallbacks::NoticeType::Notice: + config_->stats_.notices_notice_.inc(); + break; + case DecoderCallbacks::NoticeType::Debug: + config_->stats_.notices_debug_.inc(); + break; + case DecoderCallbacks::NoticeType::Info: + config_->stats_.notices_info_.inc(); + break; + case DecoderCallbacks::NoticeType::Log: + config_->stats_.notices_log_.inc(); + break; + case DecoderCallbacks::NoticeType::Unknown: + config_->stats_.notices_unknown_.inc(); + break; + } +} + +void 
PostgresFilter::incErrors(ErrorType type) { + config_->stats_.errors_.inc(); + switch (type) { + case DecoderCallbacks::ErrorType::Error: + config_->stats_.errors_error_.inc(); + break; + case DecoderCallbacks::ErrorType::Fatal: + config_->stats_.errors_fatal_.inc(); + break; + case DecoderCallbacks::ErrorType::Panic: + config_->stats_.errors_panic_.inc(); + break; + case DecoderCallbacks::ErrorType::Unknown: + config_->stats_.errors_unknown_.inc(); + break; + } +} + +void PostgresFilter::incStatements(StatementType type) { + config_->stats_.statements_.inc(); + + switch (type) { + case DecoderCallbacks::StatementType::Insert: + config_->stats_.statements_insert_.inc(); + break; + case DecoderCallbacks::StatementType::Delete: + config_->stats_.statements_delete_.inc(); + break; + case DecoderCallbacks::StatementType::Select: + config_->stats_.statements_select_.inc(); + break; + case DecoderCallbacks::StatementType::Update: + config_->stats_.statements_update_.inc(); + break; + case DecoderCallbacks::StatementType::Other: + config_->stats_.statements_other_.inc(); + break; + case DecoderCallbacks::StatementType::Noop: + break; + } +} + +void PostgresFilter::doDecode(Buffer::Instance& data, bool frontend) { + // Keep processing data until buffer is empty or decoder says + // that it cannot process data in the buffer. 
+ while ((0 < data.length()) && (decoder_->onData(data, frontend))) { + ; + } +} + +} // namespace PostgresProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/postgres_proxy/postgres_filter.h b/source/extensions/filters/network/postgres_proxy/postgres_filter.h new file mode 100644 index 0000000000000..0355bea4b1f3b --- /dev/null +++ b/source/extensions/filters/network/postgres_proxy/postgres_filter.h @@ -0,0 +1,126 @@ +#pragma once + +#include "envoy/network/filter.h" +#include "envoy/stats/scope.h" +#include "envoy/stats/stats.h" +#include "envoy/stats/stats_macros.h" + +#include "common/buffer/buffer_impl.h" +#include "common/common/logger.h" + +#include "extensions/filters/network/postgres_proxy/postgres_decoder.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace PostgresProxy { + +/** + * All Postgres proxy stats. @see stats_macros.h + */ +#define ALL_POSTGRES_PROXY_STATS(COUNTER) \ + COUNTER(errors) \ + COUNTER(errors_error) \ + COUNTER(errors_fatal) \ + COUNTER(errors_panic) \ + COUNTER(errors_unknown) \ + COUNTER(messages) \ + COUNTER(messages_backend) \ + COUNTER(messages_frontend) \ + COUNTER(messages_unknown) \ + COUNTER(sessions) \ + COUNTER(sessions_encrypted) \ + COUNTER(sessions_unencrypted) \ + COUNTER(statements) \ + COUNTER(statements_insert) \ + COUNTER(statements_delete) \ + COUNTER(statements_update) \ + COUNTER(statements_select) \ + COUNTER(statements_other) \ + COUNTER(transactions) \ + COUNTER(transactions_commit) \ + COUNTER(transactions_rollback) \ + COUNTER(notices) \ + COUNTER(notices_notice) \ + COUNTER(notices_warning) \ + COUNTER(notices_debug) \ + COUNTER(notices_info) \ + COUNTER(notices_log) \ + COUNTER(notices_unknown) + +/** + * Struct definition for all Postgres proxy stats. 
@see stats_macros.h + */ +struct PostgresProxyStats { + ALL_POSTGRES_PROXY_STATS(GENERATE_COUNTER_STRUCT) +}; + +/** + * Configuration for the Postgres proxy filter. + */ +class PostgresFilterConfig { +public: + PostgresFilterConfig(const std::string& stat_prefix, Stats::Scope& scope); + + const std::string stat_prefix_; + Stats::Scope& scope_; + PostgresProxyStats stats_; + +private: + PostgresProxyStats generateStats(const std::string& prefix, Stats::Scope& scope) { + return PostgresProxyStats{ALL_POSTGRES_PROXY_STATS(POOL_COUNTER_PREFIX(scope, prefix))}; + } +}; + +using PostgresFilterConfigSharedPtr = std::shared_ptr; + +class PostgresFilter : public Network::Filter, + DecoderCallbacks, + Logger::Loggable { +public: + PostgresFilter(PostgresFilterConfigSharedPtr config); + ~PostgresFilter() override = default; + + // Network::ReadFilter + Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override; + Network::FilterStatus onNewConnection() override; + void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override; + + // Network::WriteFilter + Network::FilterStatus onWrite(Buffer::Instance& data, bool end_stream) override; + + // PostgresProxy::DecoderCallback + void incErrors(ErrorType) override; + void incMessagesBackend() override; + void incMessagesFrontend() override; + void incMessagesUnknown() override; + void incNotices(NoticeType) override; + void incSessionsEncrypted() override; + void incSessionsUnencrypted() override; + void incStatements(StatementType) override; + void incTransactions() override; + void incTransactionsCommit() override; + void incTransactionsRollback() override; + + void doDecode(Buffer::Instance& data, bool); + DecoderPtr createDecoder(DecoderCallbacks* callbacks); + void setDecoder(std::unique_ptr decoder) { decoder_ = std::move(decoder); } + Decoder* getDecoder() const { return decoder_.get(); } + + // Routines used during integration and unit tests + uint32_t getFrontendBufLength() const 
{ return frontend_buffer_.length(); } + uint32_t getBackendBufLength() const { return backend_buffer_.length(); } + const PostgresProxyStats& getStats() const { return config_->stats_; } + +private: + Network::ReadFilterCallbacks* read_callbacks_{}; + PostgresFilterConfigSharedPtr config_; + Buffer::OwnedImpl frontend_buffer_; + Buffer::OwnedImpl backend_buffer_; + std::unique_ptr decoder_; +}; + +} // namespace PostgresProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/postgres_proxy/postgres_session.h b/source/extensions/filters/network/postgres_proxy/postgres_session.h new file mode 100644 index 0000000000000..46d8632530bc3 --- /dev/null +++ b/source/extensions/filters/network/postgres_proxy/postgres_session.h @@ -0,0 +1,24 @@ +#pragma once +#include + +#include "common/common/logger.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace PostgresProxy { + +// Class stores data about the current state of a transaction between postgres client and server. 
+class PostgresSession { +public: + bool inTransaction() { return in_transaction_; }; + void setInTransaction(bool in_transaction) { in_transaction_ = in_transaction; }; + +private: + bool in_transaction_{false}; +}; + +} // namespace PostgresProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/well_known_names.h b/source/extensions/filters/network/well_known_names.h index 9a9b0e59d74aa..a7577b8ffd2c3 100644 --- a/source/extensions/filters/network/well_known_names.h +++ b/source/extensions/filters/network/well_known_names.h @@ -28,6 +28,8 @@ class NetworkFilterNameValues { const std::string MongoProxy = "envoy.filters.network.mongo_proxy"; // MySQL proxy filter const std::string MySQLProxy = "envoy.filters.network.mysql_proxy"; + // Postgres proxy filter + const std::string Postgres = "envoy.filters.network.postgres_proxy"; // Rate limit filter const std::string RateLimit = "envoy.filters.network.ratelimit"; // Redis proxy filter diff --git a/test/extensions/filters/network/postgres_proxy/BUILD b/test/extensions/filters/network/postgres_proxy/BUILD new file mode 100644 index 0000000000000..f319540a50e9c --- /dev/null +++ b/test/extensions/filters/network/postgres_proxy/BUILD @@ -0,0 +1,67 @@ +licenses(["notice"]) # Apache 2 + +load( + "//bazel:envoy_build_system.bzl", + "envoy_package", +) +load( + "//test/extensions:extensions_build_system.bzl", + "envoy_extension_cc_test", + "envoy_extension_cc_test_library", +) + +envoy_package() + +envoy_extension_cc_test_library( + name = "postgres_test_utils_lib", + srcs = ["postgres_test_utils.cc"], + hdrs = ["postgres_test_utils.h"], + extension_name = "envoy.filters.network.postgres_proxy", + deps = [ + "//source/common/buffer:buffer_lib", + ], +) + +envoy_extension_cc_test( + name = "postgres_decoder_tests", + srcs = [ + "postgres_decoder_test.cc", + ], + extension_name = "envoy.filters.network.postgres_proxy", + deps = [ + 
":postgres_test_utils_lib", + "//source/extensions/filters/network/postgres_proxy:filter", + "//test/mocks/network:network_mocks", + ], +) + +envoy_extension_cc_test( + name = "postgres_filter_tests", + srcs = [ + "postgres_filter_test.cc", + ], + extension_name = "envoy.filters.network.postgres_proxy", + deps = [ + ":postgres_test_utils_lib", + "//source/extensions/filters/network/postgres_proxy:filter", + "//test/mocks/network:network_mocks", + ], +) + +envoy_extension_cc_test( + name = "postgres_integration_test", + srcs = [ + "postgres_integration_test.cc", + ], + data = [ + "postgres_test_config.yaml", + ], + extension_name = "envoy.filters.network.postgres_proxy", + deps = [ + "//source/common/tcp_proxy", + "//source/extensions/filters/network/postgres_proxy:config", + "//source/extensions/filters/network/postgres_proxy:filter", + "//source/extensions/filters/network/tcp_proxy:config", + "//test/integration:integration_lib", + ], +) diff --git a/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc b/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc new file mode 100644 index 0000000000000..0714e5fd99749 --- /dev/null +++ b/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc @@ -0,0 +1,417 @@ +#include +#include + +#include "extensions/filters/network/postgres_proxy/postgres_decoder.h" + +#include "test/extensions/filters/network/postgres_proxy/postgres_test_utils.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace PostgresProxy { + +class DecoderCallbacksMock : public DecoderCallbacks { +public: + MOCK_METHOD(void, incMessagesBackend, (), (override)); + MOCK_METHOD(void, incMessagesFrontend, (), (override)); + MOCK_METHOD(void, incMessagesUnknown, (), (override)); + MOCK_METHOD(void, incSessionsEncrypted, (), (override)); + MOCK_METHOD(void, incSessionsUnencrypted, (), (override)); + MOCK_METHOD(void, incStatements, (StatementType), (override)); + MOCK_METHOD(void, 
incTransactions, (), (override)); + MOCK_METHOD(void, incTransactionsCommit, (), (override)); + MOCK_METHOD(void, incTransactionsRollback, (), (override)); + MOCK_METHOD(void, incNotices, (NoticeType), (override)); + MOCK_METHOD(void, incErrors, (ErrorType), (override)); +}; + +// Define fixture class with decoder and mock callbacks. +class PostgresProxyDecoderTestBase { +public: + PostgresProxyDecoderTestBase() { + decoder_ = std::make_unique(&callbacks_); + decoder_->initialize(); + decoder_->setStartup(false); + } + +protected: + ::testing::NiceMock callbacks_; + std::unique_ptr decoder_; + + // fields often used + Buffer::OwnedImpl data_; + char buf_[256]; + std::string payload_; +}; + +class PostgresProxyDecoderTest : public PostgresProxyDecoderTestBase, public ::testing::Test {}; + +// Class is used for parameterized tests for frontend messages. +class PostgresProxyFrontendDecoderTest : public PostgresProxyDecoderTestBase, + public ::testing::TestWithParam {}; + +// Class is used for parameterized tests for encrypted messages. +class PostgresProxyFrontendEncrDecoderTest : public PostgresProxyDecoderTestBase, + public ::testing::TestWithParam {}; + +// Class is used for parameterized tests for backend messages. +class PostgresProxyBackendDecoderTest : public PostgresProxyDecoderTestBase, + public ::testing::TestWithParam {}; + +class PostgresProxyErrorTest + : public PostgresProxyDecoderTestBase, + public ::testing::TestWithParam> {}; + +class PostgresProxyNoticeTest + : public PostgresProxyDecoderTestBase, + public ::testing::TestWithParam> {}; + +// Test processing the startup message from a client. +// For historical reasons, the first message does not include +// command (first byte). It starts with length. The startup +// message contains the protocol version. After processing the +// startup message the server should start using message format +// with command as 1st byte. 
TEST_F(PostgresProxyDecoderTest, StartupMessage) {
  decoder_->setStartup(true);

  // Start with length.
  data_.writeBEInt(12);
  // Add 8 bytes of some data.
  data_.add(buf_, 8);
  decoder_->onData(data_, true);
  ASSERT_THAT(data_.length(), 0);

  // Now feed a normal message with 1 byte as command.
  data_.add("P");
  // Add length.
  data_.writeBEInt(6); // 4 bytes of length + 2 bytes of data.
  data_.add("AB");
  decoder_->onData(data_, true);
  ASSERT_THAT(data_.length(), 0);
}

// Test processing messages which map 1:1 with buffer.
// The buffer contains just a single entire message and
// nothing more.
TEST_F(PostgresProxyDecoderTest, ReadingBufferSingleMessages) {

  // Feed empty buffer - should not crash.
  decoder_->onData(data_, true);

  // Put one byte. This is not enough to parse the message and that byte
  // should stay in the buffer.
  data_.add("P");
  decoder_->onData(data_, true);
  ASSERT_THAT(data_.length(), 1);

  // Add length of 4 bytes. It would mean a completely empty message,
  // but it should be consumed.
  data_.writeBEInt(4);
  decoder_->onData(data_, true);
  ASSERT_THAT(data_.length(), 0);

  // Create a message with 5 additional bytes.
  data_.add("P");
  // Add length.
  data_.writeBEInt(9); // 4 bytes of length field + 5 of data.
  data_.add(buf_, 5);
  decoder_->onData(data_, true);
  ASSERT_THAT(data_.length(), 0);
}

// Test simulates situation when decoder is called with incomplete message.
// The message should not be processed until the buffer is filled
// with missing bytes.
TEST_F(PostgresProxyDecoderTest, ReadingBufferLargeMessages) {
  // Fill the buffer with a message declared to be 100 bytes long
  // while the buffer contains only 98 bytes.
  // It should not be processed.
  data_.add("P");
  // Add length.
  data_.writeBEInt(100); // This also includes length field
  data_.add(buf_, 94);
  decoder_->onData(data_, true);
  // The buffer contains command (1 byte), length (4 bytes) and 94 bytes of message.
  ASSERT_THAT(data_.length(), 99);

  // Add 2 missing bytes and feed again to decoder.
  data_.add("AB");
  decoder_->onData(data_, true);
  ASSERT_THAT(data_.length(), 0);
}

// Test simulates situation when a buffer contains more than one
// message. Call to the decoder should consume only one message
// at a time and only when the buffer contains the entire message.
TEST_F(PostgresProxyDecoderTest, TwoMessagesInOneBuffer) {
  // Create the first message of 50 bytes long (+1 for command).
  data_.add("P");
  // Add length.
  data_.writeBEInt(50);
  data_.add(buf_, 46);

  // Create the second message of 50 + 46 bytes (+1 for command).
  data_.add("P");
  // Add length.
  data_.writeBEInt(96);
  data_.add(buf_, 46);
  data_.add(buf_, 46);

  // The buffer contains two messages:
  // 1st: command (1 byte), length (4 bytes), 46 bytes of data
  // 2nd: command (1 byte), length (4 bytes), 92 bytes of data
  ASSERT_THAT(data_.length(), 148);
  // Process the first message.
  decoder_->onData(data_, true);
  ASSERT_THAT(data_.length(), 97);
  // Process the second message.
  decoder_->onData(data_, true);
  ASSERT_THAT(data_.length(), 0);
}

TEST_F(PostgresProxyDecoderTest, Unknown) {
  // Create an invalid message. The first byte is invalid "=".
  // Message must be at least 5 bytes to be parsed.
  EXPECT_CALL(callbacks_, incMessagesUnknown()).Times(1);
  createPostgresMsg(data_, "=", "some not important string which will be ignored anyways");
  decoder_->onData(data_, true);
}

// Test if each frontend command calls incMessagesFrontend() method.
TEST_P(PostgresProxyFrontendDecoderTest, FrontendInc) {
  EXPECT_CALL(callbacks_, incMessagesFrontend()).Times(1);
  createPostgresMsg(data_, GetParam(), "Some message just to create payload");
  decoder_->onData(data_, true);
}

// Run the above test for each frontend message.
INSTANTIATE_TEST_SUITE_P(FrontEndMessagesTests, PostgresProxyFrontendDecoderTest,
                         ::testing::Values("B", "C", "d", "c", "f", "D", "E", "H", "F", "p", "P",
                                           "p", "Q", "S", "X"));

// Test if X message triggers incTransactionsRollback and sets proper state in transaction.
TEST_F(PostgresProxyFrontendDecoderTest, TerminateMessage) {
  // Set decoder state NOT to be in_transaction.
  decoder_->getSession().setInTransaction(false);
  EXPECT_CALL(callbacks_, incTransactionsRollback()).Times(0);
  createPostgresMsg(data_, "X");
  decoder_->onData(data_, true);

  // Now set the decoder to be in_transaction state.
  decoder_->getSession().setInTransaction(true);
  EXPECT_CALL(callbacks_, incTransactionsRollback()).Times(1);
  createPostgresMsg(data_, "X");
  decoder_->onData(data_, true);
  ASSERT_FALSE(decoder_->getSession().inTransaction());
}

// Test if each backend command calls the incMessagesBackend() method.
TEST_P(PostgresProxyBackendDecoderTest, BackendInc) {
  EXPECT_CALL(callbacks_, incMessagesBackend()).Times(1);
  createPostgresMsg(data_, GetParam(), "Some not important message");
  decoder_->onData(data_, false);
}

// Run the above test for each backend message.
INSTANTIATE_TEST_SUITE_P(BackendMessagesTests, PostgresProxyBackendDecoderTest,
                         ::testing::Values("R", "K", "2", "3", "C", "d", "c", "G", "H", "D", "I",
                                           "E", "V", "v", "n", "N", "A", "t", "S", "1", "s", "Z",
                                           "T"));
// Test parsing backend messages.
// The parser should react only to the first word until the space.
TEST_F(PostgresProxyBackendDecoderTest, ParseStatement) {
  // Payload contains a space after the keyword.
  // Rollback counter should be bumped up.
  EXPECT_CALL(callbacks_, incTransactionsRollback());
  createPostgresMsg(data_, "C", "ROLLBACK 123");
  decoder_->onData(data_, false);
  data_.drain(data_.length());

  // Now try just the keyword without a space at the end.
  EXPECT_CALL(callbacks_, incTransactionsRollback());
  createPostgresMsg(data_, "C", "ROLLBACK");
  decoder_->onData(data_, false);
  data_.drain(data_.length());

  // Partial message should be ignored.
  EXPECT_CALL(callbacks_, incTransactionsRollback()).Times(0);
  EXPECT_CALL(callbacks_, incStatements(DecoderCallbacks::StatementType::Other));
  createPostgresMsg(data_, "C", "ROLL");
  decoder_->onData(data_, false);
  data_.drain(data_.length());

  // Keyword without a space should be ignored.
  EXPECT_CALL(callbacks_, incTransactionsRollback()).Times(0);
  EXPECT_CALL(callbacks_, incStatements(DecoderCallbacks::StatementType::Other));
  createPostgresMsg(data_, "C", "ROLLBACK123");
  decoder_->onData(data_, false);
  data_.drain(data_.length());
}

// Test backend messages and make sure that they
// trigger proper stats updates.
TEST_F(PostgresProxyDecoderTest, Backend) {
  // C message
  EXPECT_CALL(callbacks_, incStatements(DecoderCallbacks::StatementType::Other));
  createPostgresMsg(data_, "C", "BEGIN 123");
  decoder_->onData(data_, false);
  data_.drain(data_.length());
  ASSERT_TRUE(decoder_->getSession().inTransaction());

  EXPECT_CALL(callbacks_, incStatements(DecoderCallbacks::StatementType::Other));
  createPostgresMsg(data_, "C", "START TR");
  decoder_->onData(data_, false);
  data_.drain(data_.length());

  EXPECT_CALL(callbacks_, incStatements(DecoderCallbacks::StatementType::Noop));
  EXPECT_CALL(callbacks_, incTransactionsCommit());
  createPostgresMsg(data_, "C", "COMMIT");
  decoder_->onData(data_, false);
  data_.drain(data_.length());

  EXPECT_CALL(callbacks_, incStatements(DecoderCallbacks::StatementType::Select));
  EXPECT_CALL(callbacks_, incTransactionsCommit());
  createPostgresMsg(data_, "C", "SELECT");
  decoder_->onData(data_, false);
  data_.drain(data_.length());

  EXPECT_CALL(callbacks_, incStatements(DecoderCallbacks::StatementType::Noop));
  EXPECT_CALL(callbacks_, incTransactionsRollback());
  createPostgresMsg(data_, "C", "ROLLBACK");
  decoder_->onData(data_, false);
  data_.drain(data_.length());

  EXPECT_CALL(callbacks_, incStatements(DecoderCallbacks::StatementType::Insert));
  EXPECT_CALL(callbacks_, incTransactionsCommit());
  createPostgresMsg(data_, "C", "INSERT 1");
  decoder_->onData(data_, false);
  data_.drain(data_.length());

  EXPECT_CALL(callbacks_, incStatements(DecoderCallbacks::StatementType::Update));
  EXPECT_CALL(callbacks_, incTransactionsCommit());
  createPostgresMsg(data_, "C", "UPDATE 123");
  decoder_->onData(data_, false);
  data_.drain(data_.length());

  EXPECT_CALL(callbacks_, incStatements(DecoderCallbacks::StatementType::Delete));
  EXPECT_CALL(callbacks_, incTransactionsCommit());
  createPostgresMsg(data_, "C", "DELETE 88");
  decoder_->onData(data_, false);
  data_.drain(data_.length());
}

// Test checks deep inspection of the R message.
// During the login/authentication phase client and server exchange
// multiple R messages. Only a payload whose length is 8 and
// whose uint32 payload number equals 0 indicates
// successful authentication.
TEST_F(PostgresProxyBackendDecoderTest, AuthenticationMsg) {
  // Create an authentication message which does not
  // mean that authentication was OK. The number of
  // sessions must not be increased.
  EXPECT_CALL(callbacks_, incSessionsUnencrypted()).Times(0);
  createPostgresMsg(data_, "R", "blah blah");
  decoder_->onData(data_, false);
  data_.drain(data_.length());

  // Create the correct payload which means that
  // authentication completed successfully.
  EXPECT_CALL(callbacks_, incSessionsUnencrypted());
  data_.add("R");
  // Add length.
  data_.writeBEInt(8);
  // Add 4-byte code.
  data_.writeBEInt(0);
  decoder_->onData(data_, false);
  data_.drain(data_.length());
}

// Test checks parsing of the E message. The message
// indicates an error.
TEST_P(PostgresProxyErrorTest, ParseErrorMsgs) {
  EXPECT_CALL(callbacks_, incErrors(std::get<1>(GetParam())));
  createPostgresMsg(data_, "E", std::get<0>(GetParam()));
  decoder_->onData(data_, false);
}

INSTANTIATE_TEST_SUITE_P(
    PostgresProxyErrorTestSuite, PostgresProxyErrorTest,
    ::testing::Values(
        std::make_tuple("blah blah", DecoderCallbacks::ErrorType::Unknown),
        std::make_tuple("SERRORC1234", DecoderCallbacks::ErrorType::Error),
        std::make_tuple("SERRORVERRORC1234", DecoderCallbacks::ErrorType::Error),
        std::make_tuple("SFATALVFATALC22012", DecoderCallbacks::ErrorType::Fatal),
        std::make_tuple("SPANICVPANICC22012", DecoderCallbacks::ErrorType::Panic),
        // This is a real German message in Postgres > 9.6. It contains the keyword
        // in English with a V prefix.
        std::make_tuple("SPANIKVPANICC42501Mkonnte Datei »pg_wal/000000010000000100000096« nicht "
                        "öffnen: Permission deniedFxlog.cL3229RXLogFileInit",
                        DecoderCallbacks::ErrorType::Panic),
        // This is a German message indicating an error. The comment field contains the word PANIC.
        // Since we do not decode other languages, it should go into the Other bucket.
        // This situation can only happen in Postgres < 9.6. Starting with version 9.6
        // messages must have severity in English with the prefix V.
        std::make_tuple("SFEHLERCP0001MMy PANIC ugly messageFpl_exec.cL3216Rexec_stmt_raise",
                        DecoderCallbacks::ErrorType::Unknown)));

// Test parsing the N message. It indicates a notice
// and carries additional information about the
// purpose of the message.
TEST_P(PostgresProxyNoticeTest, ParseNoticeMsgs) {
  EXPECT_CALL(callbacks_, incNotices(std::get<1>(GetParam())));
  createPostgresMsg(data_, "N", std::get<0>(GetParam()));
  decoder_->onData(data_, false);
}

INSTANTIATE_TEST_SUITE_P(
    PostgresProxyNoticeTestSuite, PostgresProxyNoticeTest,
    ::testing::Values(std::make_tuple("blah blah", DecoderCallbacks::NoticeType::Unknown),
                      std::make_tuple("SblalalaC2345", DecoderCallbacks::NoticeType::Unknown),
                      std::make_tuple("SblahVWARNING23345", DecoderCallbacks::NoticeType::Warning),
                      std::make_tuple("SNOTICEERRORbbal4", DecoderCallbacks::NoticeType::Notice),
                      std::make_tuple("SINFOVblabla", DecoderCallbacks::NoticeType::Info),
                      std::make_tuple("SDEBUGDEBUG", DecoderCallbacks::NoticeType::Debug),
                      std::make_tuple("SLOGGGGINFO", DecoderCallbacks::NoticeType::Log)));

// Test checks if the decoder can detect the initial message which indicates
// that the protocol uses encryption.
TEST_P(PostgresProxyFrontendEncrDecoderTest, EncyptedTraffic) {
  // Set decoder to wait for initial message.
  decoder_->setStartup(true);

  // Initial state is no-encryption.
  ASSERT_FALSE(decoder_->encrypted());

  // Create SSLRequest.
  EXPECT_CALL(callbacks_, incSessionsEncrypted());
  // Add length.
  data_.writeBEInt(8);
  // 1234 in the most significant 16 bits, and some code in the least significant 16 bits.
  // Add 4 bytes long code.
  data_.writeBEInt(GetParam());
  decoder_->onData(data_, false);
  ASSERT_TRUE(decoder_->encrypted());
  // Decoder should drain data.
  ASSERT_THAT(data_.length(), 0);

  // Now when the decoder has detected encrypted traffic it should not
  // react to any messages (even unencrypted ones).
  EXPECT_CALL(callbacks_, incMessagesFrontend()).Times(0);

  createPostgresMsg(data_, "P", "Some message just to fill the payload.");
  decoder_->onData(data_, true);
  // Decoder should drain data.
  ASSERT_THAT(data_.length(), 0);
}

// Run encryption tests.
+// 80877103 is SSL code +// 80877104 is GSS code +INSTANTIATE_TEST_SUITE_P(FrontendEncryptedMessagesTests, PostgresProxyFrontendEncrDecoderTest, + ::testing::Values(80877103, 80877104)); + +} // namespace PostgresProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/network/postgres_proxy/postgres_filter_test.cc b/test/extensions/filters/network/postgres_proxy/postgres_filter_test.cc new file mode 100644 index 0000000000000..c44d32bf94c32 --- /dev/null +++ b/test/extensions/filters/network/postgres_proxy/postgres_filter_test.cc @@ -0,0 +1,244 @@ +#include +#include + +#include + +#include "extensions/filters/network/postgres_proxy/postgres_filter.h" + +#include "test/extensions/filters/network/postgres_proxy/postgres_test_utils.h" +#include "test/mocks/network/mocks.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace PostgresProxy { + +using ::testing::WithArgs; + +// Decoder mock. +class MockDecoderTest : public Decoder { +public: + MOCK_METHOD(bool, onData, (Buffer::Instance&, bool), (override)); + MOCK_METHOD(PostgresSession&, getSession, (), (override)); +}; + +// Fixture class. +class PostgresFilterTest + : public ::testing::TestWithParam< + std::tuple, + std::function>> { +public: + PostgresFilterTest() { + config_ = std::make_shared(stat_prefix_, scope_); + filter_ = std::make_unique(config_); + + filter_->initializeReadFilterCallbacks(filter_callbacks_); + } + + Stats::IsolatedStoreImpl scope_; + std::string stat_prefix_{"test."}; + std::unique_ptr filter_; + PostgresFilterConfigSharedPtr config_; + NiceMock filter_callbacks_; + + // These variables are used internally in tests. 
+ Buffer::OwnedImpl data_; + char buf_[256]; +}; + +TEST_F(PostgresFilterTest, NewConnection) { + EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onNewConnection()); +} + +// Test reading buffer until the buffer is exhausted +// or decoder indicates that there is not enough data in a buffer +// to process a message. +TEST_P(PostgresFilterTest, ReadData) { + // Create mock decoder, obtain raw pointer to it (required for EXPECT_CALL) + // and pass the decoder to filter. + std::unique_ptr decoder = std::make_unique(); + MockDecoderTest* decoderPtr = decoder.get(); + filter_->setDecoder(std::move(decoder)); + + data_.add(buf_, 256); + + // Simulate reading entire buffer. + EXPECT_CALL(*decoderPtr, onData) + .WillOnce(WithArgs<0, 1>(Invoke([](Buffer::Instance& data, bool) -> bool { + data.drain(data.length()); + return true; + }))); + std::get<0>(GetParam())(filter_.get(), data_, false); + ASSERT_THAT(std::get<1>(GetParam())(filter_.get()), 0); + + // Simulate reading entire data in two steps. + EXPECT_CALL(*decoderPtr, onData) + .WillOnce(WithArgs<0, 1>(Invoke([](Buffer::Instance& data, bool) -> bool { + data.drain(100); + return true; + }))) + .WillOnce(WithArgs<0, 1>(Invoke([](Buffer::Instance& data, bool) -> bool { + data.drain(156); + return true; + }))); + std::get<0>(GetParam())(filter_.get(), data_, false); + ASSERT_THAT(std::get<1>(GetParam())(filter_.get()), 0); + + // Simulate reading 3 packets. The first two were processed correctly and + // for the third one there was not enough data. There should be 56 bytes + // of unprocessed data. 
+ EXPECT_CALL(*decoderPtr, onData) + .WillOnce(WithArgs<0, 1>(Invoke([](Buffer::Instance& data, bool) -> bool { + data.drain(100); + return true; + }))) + .WillOnce(WithArgs<0, 1>(Invoke([](Buffer::Instance& data, bool) -> bool { + data.drain(100); + return true; + }))) + .WillOnce(WithArgs<0, 1>(Invoke([](Buffer::Instance& data, bool) -> bool { + data.drain(0); + return false; + }))); + std::get<0>(GetParam())(filter_.get(), data_, false); + ASSERT_THAT(std::get<1>(GetParam())(filter_.get()), 56); +} + +// Parameterized test: +// First value in the tuple is method taking buffer with received data. +// Second value in the tuple is method returning how many bytes are left after processing. +INSTANTIATE_TEST_SUITE_P(ProcessDataTests, PostgresFilterTest, + ::testing::Values(std::make_tuple(&PostgresFilter::onData, + &PostgresFilter::getFrontendBufLength), + std::make_tuple(&PostgresFilter::onWrite, + &PostgresFilter::getBackendBufLength))); + +// Test generates various postgres payloads and feeds them into filter. +// It expects that certain statistics are updated. +TEST_F(PostgresFilterTest, BackendMsgsStats) { + // pretend that startup message has been received. 
+ static_cast(filter_->getDecoder())->setStartup(false); + + // unknown message + createPostgresMsg(data_, "=", "blah blah blah"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().messages_unknown_.value(), 1); + + filter_->getDecoder()->getSession().setInTransaction(true); + createPostgresMsg(data_, "C", "COMMIT"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().statements_.value(), 1); + ASSERT_THAT(filter_->getStats().transactions_.value(), 0); + ASSERT_THAT(filter_->getStats().transactions_commit_.value(), 1); + + createPostgresMsg(data_, "C", "ROLLBACK 234"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().transactions_.value(), 0); + ASSERT_THAT(filter_->getStats().statements_.value(), 2); + ASSERT_THAT(filter_->getStats().statements_other_.value(), 0); + ASSERT_THAT(filter_->getStats().transactions_rollback_.value(), 1); + + createPostgresMsg(data_, "C", "SELECT blah"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().statements_.value(), 3); + ASSERT_THAT(filter_->getStats().statements_select_.value(), 1); + + createPostgresMsg(data_, "C", "INSERT 123"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().statements_.value(), 4); + ASSERT_THAT(filter_->getStats().statements_insert_.value(), 1); + + createPostgresMsg(data_, "C", "DELETE 123"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().statements_.value(), 5); + ASSERT_THAT(filter_->getStats().statements_delete_.value(), 1); + + createPostgresMsg(data_, "C", "UPDATE 123"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().statements_.value(), 6); + ASSERT_THAT(filter_->getStats().statements_update_.value(), 1); + + createPostgresMsg(data_, "C", "BEGIN 123"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().statements_.value(), 7); + ASSERT_THAT(filter_->getStats().statements_other_.value(), 1); +} + +// Test sends series of E type error messages to the 
filter and +// verifies that statistic counters are increased. +TEST_F(PostgresFilterTest, ErrorMsgsStats) { + // Pretend that startup message has been received. + static_cast(filter_->getDecoder())->setStartup(false); + + createPostgresMsg(data_, "E", "SERRORVERRORC22012"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().errors_.value(), 1); + ASSERT_THAT(filter_->getStats().errors_error_.value(), 1); + + createPostgresMsg(data_, "E", "SFATALVFATALC22012"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().errors_.value(), 2); + ASSERT_THAT(filter_->getStats().errors_fatal_.value(), 1); + + createPostgresMsg(data_, "E", "SPANICVPANICC22012"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().errors_.value(), 3); + ASSERT_THAT(filter_->getStats().errors_panic_.value(), 1); + + createPostgresMsg(data_, "E", "SBLAHBLAHC22012"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().errors_.value(), 4); + ASSERT_THAT(filter_->getStats().errors_unknown_.value(), 1); +} + +// Test sends series of N type messages to the filter and verifies +// that corresponding stats counters are updated. +TEST_F(PostgresFilterTest, NoticeMsgsStats) { + // Pretend that startup message has been received. 
+ static_cast(filter_->getDecoder())->setStartup(false); + + createPostgresMsg(data_, "N", "SblalalaC2345"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().notices_.value(), 1); + ASSERT_THAT(filter_->getStats().notices_unknown_.value(), 1); + + createPostgresMsg(data_, "N", "SblahVWARNING23345"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().notices_.value(), 2); + ASSERT_THAT(filter_->getStats().notices_warning_.value(), 1); + + createPostgresMsg(data_, "N", "SNOTICEERRORbbal4"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().notices_.value(), 3); + ASSERT_THAT(filter_->getStats().notices_notice_.value(), 1); + + createPostgresMsg(data_, "N", "SINFOVblabla"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().notices_.value(), 4); + ASSERT_THAT(filter_->getStats().notices_info_.value(), 1); + + createPostgresMsg(data_, "N", "SDEBUGDEBUG"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().notices_.value(), 5); + ASSERT_THAT(filter_->getStats().notices_debug_.value(), 1); + + createPostgresMsg(data_, "N", "SLOGGGGINFO"); + filter_->onWrite(data_, false); + ASSERT_THAT(filter_->getStats().notices_.value(), 6); + ASSERT_THAT(filter_->getStats().notices_log_.value(), 1); +} + +// Encrypted sessions are detected based on the first received message. +TEST_F(PostgresFilterTest, EncryptedSessionStats) { + data_.writeBEInt(8); + // 1234 in the most significant 16 bits and some code in the least significant 16 bits. + data_.writeBEInt(80877103); // SSL code. 
+ filter_->onData(data_, false); + ASSERT_THAT(filter_->getStats().sessions_.value(), 1); + ASSERT_THAT(filter_->getStats().sessions_encrypted_.value(), 1); +} + +} // namespace PostgresProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/network/postgres_proxy/postgres_integration_test.cc b/test/extensions/filters/network/postgres_proxy/postgres_integration_test.cc new file mode 100644 index 0000000000000..02229fc1ea1e8 --- /dev/null +++ b/test/extensions/filters/network/postgres_proxy/postgres_integration_test.cc @@ -0,0 +1,88 @@ +#include + +#include "test/integration/fake_upstream.h" +#include "test/integration/integration.h" +#include "test/integration/utility.h" +#include "test/mocks/network/mocks.h" +#include "test/test_common/network_utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace PostgresProxy { + +class PostgresIntegrationTest : public testing::TestWithParam, + public BaseIntegrationTest { + + std::string postgresConfig() { + return fmt::format( + TestEnvironment::readFileToStringForTest(TestEnvironment::runfilesPath( + "test/extensions/filters/network/postgres_proxy/postgres_test_config.yaml")), + Network::Test::getLoopbackAddressString(GetParam()), + Network::Test::getLoopbackAddressString(GetParam()), + Network::Test::getAnyAddressString(GetParam())); + } + +public: + PostgresIntegrationTest() : BaseIntegrationTest(GetParam(), postgresConfig()){}; + + void SetUp() override { BaseIntegrationTest::initialize(); } + + void TearDown() override { + test_server_.reset(); + fake_upstreams_.clear(); + } +}; +INSTANTIATE_TEST_SUITE_P(IpVersions, PostgresIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest())); + +// Test that the filter is properly chained and reacts to successful login +// message. 
+TEST_P(PostgresIntegrationTest, Login) { + std::string str; + std::string recv; + + IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("listener_0")); + FakeRawConnectionPtr fake_upstream_connection; + ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + + // Send the startup message. + Buffer::OwnedImpl data; + std::string rcvd; + char buf[32]; + + memset(buf, 0, sizeof(buf)); + // Add length. + data.writeBEInt(12); + // Add 8 bytes of some data. + data.add(buf, 8); + tcp_client->write(data.toString()); + ASSERT_TRUE(fake_upstream_connection->waitForData(data.toString().length(), &rcvd)); + data.drain(data.length()); + + // TCP session is up. Just send the AuthenticationOK downstream. + data.add("R"); + // Add length. + data.writeBEInt(8); + uint32_t code = 0; + data.add(&code, sizeof(code)); + + rcvd.clear(); + ASSERT_TRUE(fake_upstream_connection->write(data.toString())); + rcvd.append(data.toString()); + tcp_client->waitForData(rcvd, true); + + tcp_client->close(); + ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); + + // Make sure that the successful login bumped up the number of sessions. 
+ test_server_->waitForCounterEq("postgres.postgres_stats.sessions", 1); +} + +} // namespace PostgresProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/network/postgres_proxy/postgres_test_config.yaml b/test/extensions/filters/network/postgres_proxy/postgres_test_config.yaml new file mode 100644 index 0000000000000..16202b9027a29 --- /dev/null +++ b/test/extensions/filters/network/postgres_proxy/postgres_test_config.yaml @@ -0,0 +1,36 @@ +admin: + access_log_path: /dev/null + address: + socket_address: + address: "{}" + port_value: 0 +static_resources: + clusters: + name: cluster_0 + type: STATIC + load_assignment: + cluster_name: cluster_0 + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: "{}" + port_value: 0 + listeners: + name: listener_0 + address: + socket_address: + address: "{}" + port_value: 0 + filter_chains: + - filters: + - name: postgres + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.postgres_proxy.v3alpha.PostgresProxy + stat_prefix: postgres_stats + - name: tcp + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy + stat_prefix: tcp_stats + cluster: cluster_0 diff --git a/test/extensions/filters/network/postgres_proxy/postgres_test_utils.cc b/test/extensions/filters/network/postgres_proxy/postgres_test_utils.cc new file mode 100644 index 0000000000000..51819b1ad72e2 --- /dev/null +++ b/test/extensions/filters/network/postgres_proxy/postgres_test_utils.cc @@ -0,0 +1,20 @@ +#include "test/extensions/filters/network/postgres_proxy/postgres_test_utils.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace PostgresProxy { + +// Helper function to create postgres messages. 
+void createPostgresMsg(Buffer::Instance& data, std::string type, std::string payload) { + data.drain(data.length()); + ASSERT(1 == type.length()); + data.add(type); + data.writeBEInt(4 + payload.length()); + data.add(payload); +} + +} // namespace PostgresProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/filters/network/postgres_proxy/postgres_test_utils.h b/test/extensions/filters/network/postgres_proxy/postgres_test_utils.h new file mode 100644 index 0000000000000..2011b06bdc4f8 --- /dev/null +++ b/test/extensions/filters/network/postgres_proxy/postgres_test_utils.h @@ -0,0 +1,13 @@ +#include "common/buffer/buffer_impl.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace PostgresProxy { + +void createPostgresMsg(Buffer::Instance& data, std::string type, std::string payload = ""); + +} // namespace PostgresProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index 21de3bb52722a..2bc47abeb63f8 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -118,6 +118,7 @@ GETting GLB GOAWAY GRPC +GSS GTEST GURL Grabbit @@ -222,6 +223,8 @@ PostCBs PREBIND PRNG PROT +Postgre +Postgres Prereq QUIC QoS @@ -828,6 +831,8 @@ pos posix postfix postfixes +postgres +postgresql pre preallocate preallocating