From f427d09ef4204bf1262f7fd90d9202c305d61c90 Mon Sep 17 00:00:00 2001
From: Oren Leiman
Date: Thu, 7 Dec 2023 11:15:31 -0800
Subject: [PATCH] kafka/generator.py: Generate fmt::formatter specializations

For the following structs:
- `metadata_response_data`
- `metadata_response_topic`
- `metadata_response_partition`
- `metadata_response_broker`

This gives us greater control over response data formatting for
structs which might be very large.

Signed-off-by: Oren Leiman
---
 src/v/kafka/protocol/schemata/generator.py | 97 +++++++++++++++++++++-
 1 file changed, 96 insertions(+), 1 deletion(-)

diff --git a/src/v/kafka/protocol/schemata/generator.py b/src/v/kafka/protocol/schemata/generator.py
index dc05f32e8724d..02fc803209e8a 100755
--- a/src/v/kafka/protocol/schemata/generator.py
+++ b/src/v/kafka/protocol/schemata/generator.py
@@ -583,6 +583,13 @@ def make_context_field(path):
     "DeleteRecordsPartitionResult",
 ]
 
+DROP_STREAM_OPERATOR = [
+    "metadata_response_data",
+    "metadata_response_topic",
+    "metadata_response_partition",
+    "metadata_response_broker",
+]
+
 # a list of struct types which are ineligible to have default-generated
 # `operator==()`, because one or more of its member variables are not
 # comparable
@@ -827,6 +834,11 @@ def type_headers(t):
     def is_default_comparable(self):
         return all(field.is_default_comparable for field in self.fields)
 
+    @property
+    def is_streamable(self):
+        return self._name not in DROP_STREAM_OPERATOR and all(
+            field.is_streamable for field in self.fields)
+
 
 class ArrayType(FieldType):
     def __init__(self, value_type):
@@ -1066,6 +1078,11 @@ def is_default_comparable(self):
         type_name, _ = self._redpanda_type()
         return type_name not in WITHOUT_DEFAULT_EQUALITY_OPERATOR
 
+    @property
+    def is_streamable(self):
+        type_name, _ = self._redpanda_type()
+        return type_name not in DROP_STREAM_OPERATOR
+
     @property
     def is_error_code(self):
         type_name, _ = self._redpanda_type()
@@ -1080,6 +1097,9 @@ def is_error_code(self):
 #include "kafka/protocol/errors.h"
 #include "seastarx.h"
 #include "utils/fragmented_vector.h"
+{%- if not struct.is_streamable %}
+#include <fmt/core.h>
+{%- endif %}
 
 {%- for header in struct.headers("header") %}
 {%- if header.startswith("<") %}
@@ -1128,6 +1148,34 @@ def is_error_code(self):
 {%- endif %}
 {% endmacro %}
 
+{% macro render_formatter(struct) %}
+{%- if not struct.is_streamable %}
+template<>
+struct fmt::formatter<kafka::{{ struct.name }}> : fmt::formatter<std::string_view> {
+    template<typename FormatContext>
+    auto format([[maybe_unused]] const kafka::{{ struct.name }} &v, FormatContext& ctx) const
+      -> decltype(ctx.out());
+};
+{%- for field in struct.fields %}
+{%- if field.is_array and not field.is_streamable %}
+{%- set vtype = field.value_type %}
+{%- set container = (field.type_name_parts() | list)[1] %}
+template<>
+struct fmt::formatter<{{ container }}> {
+    template<typename ParseContext>
+    constexpr auto parse(ParseContext& ctx) -> decltype(ctx.begin()) {
+        return ctx.begin();
+    }
+    template<typename FormatContext>
+    auto format(
+      [[maybe_unused]] const {{ container }}& v,
+      FormatContext& ctx) const -> decltype(ctx.out());
+};
+{%- endif %}
+{%- endfor %}
+{%- endif %}
+{% endmacro %}
+
 namespace kafka {
 
 namespace protocol {
@@ -1138,7 +1186,10 @@ class response;
 
 {% for struct in struct.structs() %}
 {{ render_struct(struct) }}
+{%- if struct.is_streamable %}
 friend std::ostream& operator<<(std::ostream&, const {{ struct.name }}&);
+{%- endif %}
+
 };
 {% endfor %}
 
@@ -1150,8 +1201,9 @@ class response;
 {%- else %}
     void decode(iobuf, api_version);
 {%- endif %}
-
+{%- if struct.is_streamable %}
     friend std::ostream& operator<<(std::ostream&, const {{ struct.name }}&);
+{%- endif %}
 {%- if first_flex > 0 %}
 private:
     void encode_flex(protocol::encoder&, api_version);
@@ -1181,6 +1233,11 @@ class response;
 };
 {%- endif %}
 }
+
+{% for struct in struct.structs() %}
+{{ render_formatter(struct) }}
+{% endfor %}
+{{ render_formatter(struct) }}
 """
 
 COMBINED_SOURCE_TEMPLATE = """
@@ -1494,6 +1551,37 @@ class response;
 {%- endif %}
 {% endmacro %}
 
+{% macro render_formatter(struct) %}
+{%- if not struct.is_streamable %}
+template<>
+fmt::format_context::iterator
+fmt::formatter<kafka::{{ struct.name }}>::format(
+  [[maybe_unused]] const kafka::{{ struct.name }} &v, fmt::format_context& ctx) const
+{
+    return fmt::format_to(ctx.out(),
+      "{{'{{' + struct.format + '}}'}}",
+      {%- for field in struct.fields %}
+      {%- if field.is_sensitive %}"****"{% else %}v.{{ field.name }}{% endif %}{% if not loop.last %},{% endif %}
+
+      {%- endfor %}
+    );
+}
+{%- for field in struct.fields %}
+{%- if field.is_array and not field.is_streamable %}
+{%- set vtype = field.value_type %}
+{%- set container = (field.type_name_parts() | list)[1] %}
+template<>
+fmt::format_context::iterator
+fmt::formatter<{{ container }}>::format(
+  [[maybe_unused]] const {{ container }}& v,
+  fmt::format_context& ctx) const {
+    return fmt::format_to(ctx.out(), "{}", fmt::join(v, ","));
+}
+{%- endif %}
+{%- endfor %}
+{%- endif %}
+{% endmacro %}
+
 namespace kafka {
 
 {%- if struct.fields %}
@@ -1626,6 +1714,7 @@ class response;
 
 {% set structs = struct.structs() + [struct] %}
 {% for struct in structs %}
+{%- if struct.is_streamable %}
 {%- if struct.fields %}
 std::ostream& operator<<(std::ostream& o, [[maybe_unused]] const {{ struct.name }}& v) {
     fmt::print(o,
@@ -1642,9 +1731,15 @@ class response;
     return o << "{}";
 }
 {%- endif %}
+{%- endif %}
 
 {{ render_errored_source(struct) }}
 {% endfor %}
 }
+
+{% for struct in struct.structs() %}
+{{ render_formatter(struct) }}
+{% endfor %}
+{{ render_formatter(struct) }}
 """
 # This is the schema of the json files from the kafka tree. This isn't strictly
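
As an illustration of the idiom the generated code follows, here is a minimal,
self-contained sketch. The `broker` struct and its fields are hypothetical
stand-ins for the real `kafka::metadata_response_broker`, and unlike the
generated code (which splits the declaration across the header template and
the definition across the source template) everything here lives in one file:

#include <fmt/core.h>
#include <fmt/format.h>

#include <cstdint>
#include <iostream>
#include <string>
#include <string_view>

// Hypothetical stand-in for kafka::metadata_response_broker.
namespace kafka {
struct broker {
    int32_t node_id;
    std::string host;
    int32_t port;
};
} // namespace kafka

// The struct is kept out of operator<< entirely; formatting goes through a
// fmt::formatter specialization instead, mirroring the shape emitted by
// render_formatter(). Inheriting from fmt::formatter<std::string_view>
// supplies a default parse(), so only format() is defined.
template<>
struct fmt::formatter<kafka::broker> : fmt::formatter<std::string_view> {
    template<typename FormatContext>
    auto format(const kafka::broker& v, FormatContext& ctx) const
      -> decltype(ctx.out()) {
        // "{{" and "}}" are fmt escapes for literal braces.
        return fmt::format_to(
          ctx.out(),
          "{{node_id={} host={} port={}}}",
          v.node_id,
          v.host,
          v.port);
    }
};

int main() {
    kafka::broker b{1, "localhost", 9092};
    // fmt picks up the specialization; no operator<< needs to exist.
    std::cout << fmt::format("{}", b) << "\n";
    return 0;
}

The upshot, per the commit message, is control: call sites choose when, and in
how much detail, a potentially very large metadata response gets stringified,
rather than inheriting a one-size-fits-all stream operator.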