Skip to content

Commit

Permalink
more refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
zliang-min committed Jun 26, 2024
1 parent ecab238 commit b340636
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 20 deletions.
7 changes: 3 additions & 4 deletions src/Formats/FormatFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,17 +579,16 @@ bool FormatFactory::checkIfFormatHasExternalSchemaWriter(const String & name)
return static_cast<bool>(target.external_schema_writer_creator);
}

/// Creates the external schema writer for the format called `name`.
/// Throws an Exception with ErrorCodes::LOGICAL_ERROR when the format's entry in `dict`
/// has no external schema writer creator.
/// When `_format_settings` holds a value it is used as the writer's settings; otherwise
/// the settings are obtained from getFormatSettings(context).
/// NOTE(review): this span lists two signature variants (const-reference and by-value
/// optional settings) and two return statements back to back — presumably the removed
/// and added lines of a rendered diff; confirm which variant the real file contains.
ExternalSchemaWriterPtr FormatFactory::getExternalSchemaWriter(
const String & name,
ExternalSchemaWriterPtr FormatFactory::getExternalSchemaWriter(const String & name,
ContextPtr & context,
const std::optional<FormatSettings> & _format_settings) const
std::optional<FormatSettings> _format_settings) const
{
const auto & external_schema_writer_creator = dict.at(name).external_schema_writer_creator;
if (!external_schema_writer_creator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} doesn't support schema creation.", name);

/// Fall back to the context-derived settings only when the caller passed none.
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
return external_schema_writer_creator(format_settings);
return external_schema_writer_creator(std::move(format_settings));
}
/// proton: ends

Expand Down
4 changes: 2 additions & 2 deletions src/Formats/FormatFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class FormatFactory final : private boost::noncopyable
using ExternalSchemaReaderCreator = std::function<ExternalSchemaReaderPtr(const FormatSettings & settings)>;

/// proton: starts
using ExternalSchemaWriterCreator = std::function<ExternalSchemaWriterPtr(const FormatSettings & settings)>;
using ExternalSchemaWriterCreator = std::function<ExternalSchemaWriterPtr(FormatSettings settings)>;
/// proton: ends

/// Some formats can extract different schemas from the same source depending on
Expand Down Expand Up @@ -221,7 +221,7 @@ class FormatFactory final : private boost::noncopyable
ExternalSchemaWriterPtr getExternalSchemaWriter(
const String & name,
ContextPtr & context,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
std::optional<FormatSettings> format_settings = std::nullopt) const;
/// proton: ends

/// Register schema readers for a format by its name.
Expand Down
10 changes: 8 additions & 2 deletions src/Formats/FormatSchemaFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void FormatSchemaFactory::registerSchema(const String & schema_name, const Strin
format_settings.schema.is_server = true;

std::lock_guard lock(mutex);
auto writer = FormatFactory::instance().getExternalSchemaWriter(format, context, format_settings);
auto writer = FormatFactory::instance().getExternalSchemaWriter(format, context, std::move(format_settings));
assert(writer); /* confirmed with checkSchemaType */

try
Expand Down Expand Up @@ -86,7 +86,13 @@ void FormatSchemaFactory::unregisterSchema(const String & schema_name, const Str

std::filesystem::remove(schema_path);

auto writer = FormatFactory::instance().getExternalSchemaWriter(format, context, format_settings);
String format_name = format;
if (format_name.empty())
format_name = FormatFactory::instance().getFormatFromSchemaFileName(schema_path);

if (format_name.empty())
return;
auto writer = FormatFactory::instance().getExternalSchemaWriter(format_name, context, std::move(format_settings));
assert(writer); /* confirmed with checkSchemaType */
writer->onDeleted();
}
Expand Down
16 changes: 12 additions & 4 deletions src/Processors/Formats/ISchemaWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ namespace DB
class IExternalSchemaWriter
{
public:
/// Initializes `schema_info` from the given format name and the schema-related fields
/// of `settings` (format_schema, is_server, format_schema_path).
/// The third argument is hard-coded to false; its meaning is defined by the
/// FormatSchemaInfo constructor, which is not visible here.
IExternalSchemaWriter(const String & format, const FormatSettings & settings)
: schema_info(settings.schema.format_schema, format, false, settings.schema.is_server, settings.schema.format_schema_path)
{}
/// Takes the format settings by value and moves them into the `settings` member.
explicit IExternalSchemaWriter(FormatSettings settings_)
: settings(std::move(settings_))
{}

virtual ~IExternalSchemaWriter() = default;

Expand All @@ -26,6 +26,7 @@ class IExternalSchemaWriter
/// It throws exceptions if it fails to write.
bool write(std::string_view schema_body, bool replace_if_exist)
{
auto schema_info = getSchemaInfo();
auto already_exists = std::filesystem::exists(schema_info.absoluteSchemaPath());
if (already_exists && !replace_if_exist)
return false;
Expand All @@ -42,10 +43,17 @@ class IExternalSchemaWriter
virtual void onDeleted() {}

protected:
/// Returns the name of the format this writer handles; implemented by each concrete writer.
virtual String getFormatName() const = 0;

/// A callback will be called when an existing schema gets replaced.
virtual void onReplaced() {}

FormatSchemaInfo schema_info;
/// Builds a FormatSchemaInfo from the stored `settings` (format_schema, is_server,
/// format_schema_path) and the name returned by getFormatName().
/// A new object is constructed on every call; nothing is cached.
/// The third argument is hard-coded to false; its meaning is defined by the
/// FormatSchemaInfo constructor, which is not visible here.
FormatSchemaInfo getSchemaInfo() const
{
return {settings.schema.format_schema, getFormatName(), false, settings.schema.is_server, settings.schema.format_schema_path};
}

FormatSettings settings;
};

}
13 changes: 9 additions & 4 deletions src/Processors/Formats/Impl/AvroRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1067,11 +1067,16 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node)
}

/// proton: starts
/// Constructs the Avro schema writer by passing the format settings on to the
/// IExternalSchemaWriter base; the constructor body itself does nothing.
/// NOTE(review): two signature/initializer variants appear back to back here —
/// presumably the removed and added lines of a rendered diff; confirm against the real file.
AvroSchemaWriter::AvroSchemaWriter(const FormatSettings & settings)
: IExternalSchemaWriter("Avro", settings)
AvroSchemaWriter::AvroSchemaWriter(const FormatSettings & settings_)
: IExternalSchemaWriter(settings_)
{
}

/// Returns the format name for this writer: "Avro".
String AvroSchemaWriter::getFormatName() const
{
return "Avro";
}

void AvroSchemaWriter::validate(std::string_view schema_body)
{
try
Expand Down Expand Up @@ -1130,8 +1135,8 @@ void registerAvroSchemaReader(FormatFactory & factory)
/// proton: starts
factory.registerSchemaFileExtension("avsc", "Avro");

factory.registerExternalSchemaWriter("Avro", [](std::string_view schema_body, const FormatSettings & settings) {
return std::make_shared<AvroSchemaWriter>(schema_body, settings);
factory.registerExternalSchemaWriter("Avro", [](const FormatSettings & settings) {
return std::make_shared<AvroSchemaWriter>(settings);
});
/// proton: ends
}
Expand Down
3 changes: 3 additions & 0 deletions src/Processors/Formats/Impl/AvroRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ class AvroSchemaWriter : public IExternalSchemaWriter
explicit AvroSchemaWriter(const FormatSettings & settings_);

void validate(std::string_view schema_body) override;

protected:
String getFormatName() const override;
};
/// proton: ends

Expand Down
13 changes: 9 additions & 4 deletions src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,16 @@ bool ProtobufConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadE
return true;
}

/// Constructs the Protobuf schema writer by passing the format settings on to the
/// IExternalSchemaWriter base; the constructor body itself does nothing.
/// NOTE(review): two signature/initializer variants appear back to back here —
/// presumably the removed and added lines of a rendered diff; confirm against the real file.
ProtobufSchemaWriter::ProtobufSchemaWriter(const FormatSettings & settings)
: IExternalSchemaWriter("Protobuf", settings)
ProtobufSchemaWriter::ProtobufSchemaWriter(const FormatSettings & settings_)
: IExternalSchemaWriter(settings_)
{
}

/// Returns the format name for this writer: "Protobuf".
String ProtobufSchemaWriter::getFormatName() const
{
return "Protobuf";
}

void ProtobufSchemaWriter::validate(std::string_view schema_body)
{
ProtobufSchemas::instance().validateSchema(schema_body);
Expand All @@ -349,8 +354,8 @@ void registerProtobufSchemaReader(FormatFactory & factory)
/// proton: starts
factory.registerSchemaFileExtension("proto", "Protobuf");

factory.registerExternalSchemaWriter("Protobuf", [](std::string_view schema_body, const FormatSettings & settings) {
return std::make_shared<ProtobufSchemaWriter>(schema_body, settings);
factory.registerExternalSchemaWriter("Protobuf", [](const FormatSettings & settings) {
return std::make_shared<ProtobufSchemaWriter>(settings);
});
/// proton: ends

Expand Down
1 change: 1 addition & 0 deletions src/Processors/Formats/Impl/ProtobufRowInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class ProtobufSchemaWriter : public IExternalSchemaWriter
void validate(std::string_view schema_body) override;

protected:
String getFormatName() const override;
void onReplaced() override;
void onDeleted() override;
};
Expand Down

0 comments on commit b340636

Please sign in to comment.