Skip to content

Commit

Permalink
kafka/generator.py: Generate fmt::formatter specializations
Browse files Browse the repository at this point in the history
For the following structs:
- `metadata_response_data`
- `metadata_response_topic`
- `metadata_response_partition`
- `metadata_response_broker`

This provides us with greater control of response data formatting for
structs which might be very large.

Signed-off-by: Oren Leiman <[email protected]>
  • Loading branch information
oleiman committed Dec 12, 2023
1 parent c3876e6 commit f427d09
Showing 1 changed file with 96 additions and 1 deletion.
97 changes: 96 additions & 1 deletion src/v/kafka/protocol/schemata/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,13 @@ def make_context_field(path):
"DeleteRecordsPartitionResult",
]

DROP_STREAM_OPERATOR = [
"metadata_response_data",
"metadata_response_topic",
"metadata_response_partition",
"metadata_response_broker",
]

# a list of struct types which are ineligible to have default-generated
# `operator==()`, because one or more of its member variables are not
# comparable
Expand Down Expand Up @@ -827,6 +834,11 @@ def type_headers(t):
def is_default_comparable(self):
return all(field.is_default_comparable for field in self.fields)

@property
def is_streamable(self):
return self._name not in DROP_STREAM_OPERATOR and all(
field.is_streamable for field in self.fields)


class ArrayType(FieldType):
def __init__(self, value_type):
Expand Down Expand Up @@ -1066,6 +1078,11 @@ def is_default_comparable(self):
type_name, _ = self._redpanda_type()
return type_name not in WITHOUT_DEFAULT_EQUALITY_OPERATOR

@property
def is_streamable(self):
type_name, _ = self._redpanda_type()
return type_name not in DROP_STREAM_OPERATOR

@property
def is_error_code(self):
type_name, _ = self._redpanda_type()
Expand All @@ -1080,6 +1097,9 @@ def is_error_code(self):
#include "kafka/protocol/errors.h"
#include "seastarx.h"
#include "utils/fragmented_vector.h"
{%- if not struct.is_streamable %}
#include <fmt/format.h>
{%- endif %}
{%- for header in struct.headers("header") %}
{%- if header.startswith("<") %}
Expand Down Expand Up @@ -1128,6 +1148,34 @@ def is_error_code(self):
{%- endif %}
{% endmacro %}
{% macro render_formatter(struct) %}
{%- if not struct.is_streamable %}
template<>
struct fmt::formatter<kafka::{{ struct.name }}> : fmt::formatter<std::string_view> {
template <typename FormatContext>
auto format([[maybe_unused]] const kafka::{{ struct.name }} &v, FormatContext& ctx) const
-> decltype(ctx.out());
};
{%- for field in struct.fields %}
{%- if field.is_array and not field.is_streamable %}
{%- set vtype = field.value_type %}
{%- set container = (field.type_name_parts() | list)[1] %}
template<>
struct fmt::formatter<{{ container }}<kafka::{{ vtype }}>> {
template<typename ParseContext>
constexpr auto parse(ParseContext& ctx) -> decltype(ctx.begin()) {
return ctx.begin();
}
template<typename FormatContext>
auto format(
[[maybe_unused]] const {{ container }}<kafka::{{ vtype}}>& v,
FormatContext& ctx) const -> decltype(ctx.out());
};
{%- endif %}
{%- endfor %}
{%- endif %}
{% endmacro %}
namespace kafka {
namespace protocol {
Expand All @@ -1138,7 +1186,10 @@ class response;
{% for struct in struct.structs() %}
{{ render_struct(struct) }}
{%- if struct.is_streamable %}
friend std::ostream& operator<<(std::ostream&, const {{ struct.name }}&);
{%- endif %}
};
{% endfor %}
Expand All @@ -1150,8 +1201,9 @@ class response;
{%- else %}
void decode(iobuf, api_version);
{%- endif %}
{%- if struct.is_streamable %}
friend std::ostream& operator<<(std::ostream&, const {{ struct.name }}&);
{%- endif %}
{%- if first_flex > 0 %}
private:
void encode_flex(protocol::encoder&, api_version);
Expand Down Expand Up @@ -1181,6 +1233,11 @@ class response;
};
{%- endif %}
}
{% for struct in struct.structs() %}
{{ render_formatter(struct) }}
{% endfor %}
{{ render_formatter(struct) }}
"""

COMBINED_SOURCE_TEMPLATE = """
Expand Down Expand Up @@ -1494,6 +1551,37 @@ class response;
{%- endif %}
{% endmacro %}
{% macro render_formatter(struct) %}
{%- if not struct.is_streamable %}
template<>
fmt::format_context::iterator
fmt::formatter<kafka::{{ struct.name }}>::format(
[[maybe_unused]] const kafka::{{ struct.name }} &v, fmt::format_context& ctx) const
{
return fmt::format_to(ctx.out(),
"{{'{{' + struct.format + '}}'}}",
{%- for field in struct.fields %}
{%- if field.is_sensitive %}"****"{% else %}v.{{ field.name }}{% endif %}{% if not loop.last %},{% endif %}
{%- endfor %}
);
}
{%- for field in struct.fields %}
{%- if field.is_array and not field.is_streamable %}
{%- set vtype = field.value_type %}
{%- set container = (field.type_name_parts() | list)[1] %}
template<>
fmt::format_context::iterator
fmt::formatter<{{ container }}<kafka::{{ vtype }}>>::format(
[[maybe_unused]] const {{ container }}<kafka::{{ vtype }}>& v,
fmt::format_context& ctx) const {
return fmt::format_to(ctx.out(), "{}", fmt::join(v, ","));
}
{%- endif %}
{%- endfor %}
{%- endif %}
{% endmacro %}
namespace kafka {
{%- if struct.fields %}
Expand Down Expand Up @@ -1626,6 +1714,7 @@ class response;
{% set structs = struct.structs() + [struct] %}
{% for struct in structs %}
{%- if struct.is_streamable %}
{%- if struct.fields %}
std::ostream& operator<<(std::ostream& o, [[maybe_unused]] const {{ struct.name }}& v) {
fmt::print(o,
Expand All @@ -1642,9 +1731,15 @@ class response;
return o << "{}";
}
{%- endif %}
{%- endif %}
{{ render_errored_source(struct) }}
{% endfor %}
}
{% for struct in struct.structs() %}
{{ render_formatter(struct) }}
{% endfor %}
{{ render_formatter(struct) }}
"""

# This is the schema of the json files from the kafka tree. This isn't strictly
Expand Down

0 comments on commit f427d09

Please sign in to comment.