Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: format schema cache is not cleared after the schema is dropped #781

Merged
merged 9 commits into from
Jul 5, 2024
8 changes: 3 additions & 5 deletions src/Formats/FormatFactory.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <Formats/FormatFactory.h>

#include <algorithm>
#include <Core/Settings.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -582,16 +581,15 @@ bool FormatFactory::checkIfFormatHasExternalSchemaWriter(const String & name)

ExternalSchemaWriterPtr FormatFactory::getExternalSchemaWriter(
const String & name,
std::string_view body,
ContextPtr & context,
const std::optional<FormatSettings> & _format_settings) const
const ContextPtr & context,
std::optional<FormatSettings> _format_settings) const
zliang-min marked this conversation as resolved.
Show resolved Hide resolved
{
const auto & external_schema_writer_creator = dict.at(name).external_schema_writer_creator;
if (!external_schema_writer_creator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} doesn't support schema creation.", name);

auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
return external_schema_writer_creator(body, format_settings);
return external_schema_writer_creator(std::move(format_settings));
}
/// proton: ends

Expand Down
7 changes: 3 additions & 4 deletions src/Formats/FormatFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class FormatFactory final : private boost::noncopyable
using ExternalSchemaReaderCreator = std::function<ExternalSchemaReaderPtr(const FormatSettings & settings)>;

/// proton: starts
using ExternalSchemaWriterCreator = std::function<ExternalSchemaWriterPtr(std::string_view schema_body, const FormatSettings & settings)>;
using ExternalSchemaWriterCreator = std::function<ExternalSchemaWriterPtr(FormatSettings settings)>;
/// proton: ends

/// Some formats can extract different schemas from the same source depending on
Expand Down Expand Up @@ -220,9 +220,8 @@ class FormatFactory final : private boost::noncopyable

ExternalSchemaWriterPtr getExternalSchemaWriter(
const String & name,
std::string_view body,
ContextPtr & context,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
const ContextPtr & context,
std::optional<FormatSettings> format_settings = std::nullopt) const;
/// proton: ends

/// Register schema readers for format its name.
Expand Down
20 changes: 15 additions & 5 deletions src/Formats/FormatSchemaFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <Processors/Formats/ISchemaWriter.h>

#include <format>
#include <fstream>

namespace DB
{
Expand All @@ -28,7 +29,7 @@ String basename(const std::filesystem::path & path)

}

void FormatSchemaFactory::registerSchema(const String & schema_name, const String & format, std::string_view schema_body, ExistsOP exists_op, ContextPtr & context)
void FormatSchemaFactory::registerSchema(const String & schema_name, const String & format, std::string_view schema_body, ExistsOP exists_op, const ContextPtr & context)
{
assert(!schema_name.empty());
assert(!format.empty());
Expand All @@ -41,21 +42,20 @@ void FormatSchemaFactory::registerSchema(const String & schema_name, const Strin
format_settings.schema.is_server = true;

std::lock_guard lock(mutex);
auto writer = FormatFactory::instance().getExternalSchemaWriter(format, schema_body, context, format_settings);
auto writer = FormatFactory::instance().getExternalSchemaWriter(format, context, std::move(format_settings));
assert(writer); /* confirmed with checkSchemaType */

try
{
writer->validate();
writer->validate(schema_body);
}
catch (DB::Exception & e)
{
e.addMessage(std::format("{} schema {} was invalid", format, schema_name));
e.rethrow();
}


auto result = writer->write(exists_op == ExistsOP::Replace);
auto result = writer->write(schema_body, exists_op == ExistsOP::Replace);
if (!result && exists_op == ExistsOP::Throw)
throw Exception(ErrorCodes::FORMAT_SCHEMA_ALREADY_EXISTS, "Format schema {} of type {} already exists", schema_name, format);
}
Expand Down Expand Up @@ -85,6 +85,16 @@ void FormatSchemaFactory::unregisterSchema(const String & schema_name, const Str
}

std::filesystem::remove(schema_path);

String format_name = format;
if (format_name.empty())
format_name = FormatFactory::instance().getFormatFromSchemaFileName(schema_path);

if (format_name.empty())
return;
auto writer = FormatFactory::instance().getExternalSchemaWriter(format_name, context, std::move(format_settings));
assert(writer); /* confirmed with checkSchemaType */
writer->onDeleted();
}

std::vector<FormatSchemaFactory::SchemaEntry> FormatSchemaFactory::getSchemasList(const String & format, const ContextPtr & context) const
Expand Down
4 changes: 1 addition & 3 deletions src/Formats/FormatSchemaFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
#include <boost/core/noncopyable.hpp>
#include <Interpreters/Context_fwd.h>

#include <fstream>

namespace DB
{
/// Manages format schemas.
Expand All @@ -20,7 +18,7 @@ class FormatSchemaFactory final : private boost::noncopyable

static FormatSchemaFactory & instance();

void registerSchema(const String & schema_name, const String & format, std::string_view schema_body, ExistsOP exists_op, ContextPtr & context);
void registerSchema(const String & schema_name, const String & format, std::string_view schema_body, ExistsOP exists_op, const ContextPtr & context);

void unregisterSchema(const String & schema_name, const String & format, bool throw_if_not_exists, const ContextPtr & context);

Expand Down
98 changes: 79 additions & 19 deletions src/Formats/ProtobufSchemas.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#if USE_PROTOBUF
# include <Common/Exception.h>
# include <Common/LRUCache.h>
# include <Formats/FormatSchemaInfo.h>
# include <Formats/KafkaSchemaRegistry.h>
# include <Formats/ProtobufSchemas.h>
Expand All @@ -24,6 +23,12 @@ ProtobufSchemas & ProtobufSchemas::instance()
return instance;
}

void ProtobufSchemas::clear()
{
std::lock_guard lock(mutex);
importers.clear();
}

class ProtobufSchemas::ImporterWithSourceTree : public google::protobuf::compiler::MultiFileErrorCollector
{
public:
Expand All @@ -44,30 +49,41 @@ class ProtobufSchemas::ImporterWithSourceTree : public google::protobuf::compile
return descriptor;

const auto * file_descriptor = importer.Import(schema_path);
// If there are parsing errors, AddError() throws an exception and in this case the following line
// isn't executed.
if (error)
{
auto info = error.value();
error.reset();
throw Exception(
ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA,
"Cannot parse '{}' file, found an error at line {}, column {}, {}",
info.filename,
std::to_string(info.line),
zliang-min marked this conversation as resolved.
Show resolved Hide resolved
std::to_string(info.column),
info.message);
}

assert(file_descriptor);

if (with_envelope == WithEnvelope::No)
{
const auto * message_descriptor = file_descriptor->FindMessageTypeByName(message_name);
if (!message_descriptor)
throw Exception(
"Could not find a message named '" + message_name + "' in the schema file '" + schema_path + "'", ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Could not find a message named '{}' in the schema file '{}'",
message_name, schema_path);

return message_descriptor;
}
else
{
const auto * envelope_descriptor = file_descriptor->FindMessageTypeByName("Envelope");
if (!envelope_descriptor)
throw Exception(
"Could not find a message named 'Envelope' in the schema file '" + schema_path + "'", ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Could not find a message named 'Envelope' in the schema file '{}'",
schema_path);

const auto * message_descriptor = envelope_descriptor->FindNestedTypeByName(message_name); // silly protobuf API disallows a restricting the field type to messages
if (!message_descriptor)
throw Exception(
"Could not find a message named '" + message_name + "' in the schema file '" + schema_path + "'", ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Could not find a message named '{}' in the schema file '{}'",
message_name, schema_path);

return message_descriptor;
}
Expand All @@ -77,15 +93,24 @@ class ProtobufSchemas::ImporterWithSourceTree : public google::protobuf::compile
// Overrides google::protobuf::compiler::MultiFileErrorCollector:
void AddError(const String & filename, int line, int column, const String & message) override
{
throw Exception(
"Cannot parse '" + filename + "' file, found an error at line " + std::to_string(line) + ", column " + std::to_string(column)
+ ", " + message,
ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA);
/// Protobuf library code is not exception safe, we should
/// remember the error and throw it later from our side.
error = ErrorInfo{filename, line, column, message};
}

google::protobuf::compiler::DiskSourceTree disk_source_tree;
google::protobuf::compiler::Importer importer;
const WithEnvelope with_envelope;

struct ErrorInfo
{
String filename;
int line;
int column;
String message;
};

std::optional<ErrorInfo> error;
};


Expand All @@ -100,22 +125,57 @@ const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForFormatSch
}

/// proton: starts
/// Overrides google::protobuf::io::ErrorCollector:
void ProtobufSchemas::AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message)
namespace
{

class ErrorCollectorImpl : public google::protobuf::io::ErrorCollector
{
throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA,
"Cannot parse schema, found an error at line {}, column {}, error: {}", line, column, message);
public:
struct ErrorInfo
{
int line;
int column;
String message;
};

ErrorCollectorImpl() = default;

void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override
{
/// Protobuf library code is not exception safe, we should
/// remember the error and throw it later from our side.
if (!error_) /// Only remember the first error.
error_ = {line, column, message};
}

std::optional<ErrorInfo> error()
{
return error_;
}

private:

std::optional<ErrorInfo> error_;
};

}

void ProtobufSchemas::validateSchema(std::string_view schema)
{
ErrorCollectorImpl error_collector{};

google::protobuf::io::ArrayInputStream input{schema.data(), static_cast<int>(schema.size())};
google::protobuf::io::Tokenizer tokenizer(&input, this);
google::protobuf::io::Tokenizer tokenizer(&input, &error_collector);
google::protobuf::FileDescriptorProto descriptor;
google::protobuf::compiler::Parser parser;

parser.RecordErrorsTo(this);
parser.RecordErrorsTo(&error_collector);

parser.Parse(&tokenizer, &descriptor);

if (auto error = error_collector.error(); error)
throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA,
"Cannot parse schema, found an error at line {}, column {}, error: {}", error->line, error->column, error->message);
}
/// proton: ends

Expand Down
6 changes: 3 additions & 3 deletions src/Formats/ProtobufSchemas.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class FormatSchemaInfo;
/** Keeps parsed google protobuf schemas parsed from files.
* This class is used to handle the "Protobuf" input/output formats.
*/
class ProtobufSchemas : public google::protobuf::io::ErrorCollector /* proton: updated */
class ProtobufSchemas
{
public:
enum class WithEnvelope
Expand Down Expand Up @@ -61,14 +61,14 @@ class ProtobufSchemas : public google::protobuf::io::ErrorCollector /* proton: u
};

static ProtobufSchemas & instance();
// Clear cached protobuf schemas
void clear();

/// Parses the format schema, then parses the corresponding proto file, and returns the descriptor of the message type.
/// The function never returns nullptr, it throws an exception if it cannot load or parse the file.
const google::protobuf::Descriptor * getMessageTypeForFormatSchema(const FormatSchemaInfo & info, WithEnvelope with_envelope);

/// proton: starts
void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override;

/// Validates the given schema and throw a DB::Exception if the schema is invalid.
/// The exception will contain the first error encountered when validating the schema, i.e. there could be more errors.
void validateSchema(std::string_view schema);
Expand Down
51 changes: 39 additions & 12 deletions src/Processors/Formats/ISchemaWriter.h
Original file line number Diff line number Diff line change
@@ -1,32 +1,59 @@
#pragma once

#include "Formats/FormatSettings.h"
#include "IO/ReadBuffer.h"
#include <Formats/FormatSchemaInfo.h>
#include <Formats/FormatSettings.h>
#include <IO/WriteBufferFromFile.h>

#include <filesystem>

namespace DB
{

class IExternalSchemaWriter
{
public:
IExternalSchemaWriter(std::string_view schema_body_, const FormatSettings & settings_)
: schema_body(schema_body_)
, settings(settings_)
{}
explicit IExternalSchemaWriter(FormatSettings settings_)
: settings(std::move(settings_))
{}

virtual ~IExternalSchemaWriter() = default;

/// Validates the schema input. Should throw exceptions on validation failures.
virtual void validate() = 0;
virtual void validate(std::string_view schema_body) = 0;

/// Persistents the schema.
/// If the schema already exists, and replace_if_exist is false, it returns false.
/// Otherwise it returns true. Throws exceptions if it fails to write.
virtual bool write(bool replace_if_exist) = 0;
/// Returns false if file was not written, i.e. there was an existing schema file, and it's not replaced.
/// It throws exceptions if it fails to write.
bool write(std::string_view schema_body, bool replace_if_exist)
{
auto schema_info = getSchemaInfo();
auto already_exists = std::filesystem::exists(schema_info.absoluteSchemaPath());
if (already_exists && !replace_if_exist)
return false;

WriteBufferFromFile write_buffer{schema_info.absoluteSchemaPath()};
write_buffer.write(schema_body.data(), schema_body.size());
if (already_exists)
onReplaced();

return true;
}

/// A callback will be called when a schema gets deleted.
virtual void onDeleted() {}

protected:
std::string_view schema_body;
const FormatSettings & settings;
virtual String getFormatName() const = 0;

/// A callback will be called when an existing schema gets replaced.
virtual void onReplaced() {}

FormatSchemaInfo getSchemaInfo() const
{
return {settings.schema.format_schema, getFormatName(), false, settings.schema.is_server, settings.schema.format_schema_path};
}

FormatSettings settings;
};

}
Loading
Loading