Skip to content

Commit

Permalink
bugfix: format schema cache is not cleared after the schema is dropped (
Browse files Browse the repository at this point in the history
  • Loading branch information
zliang-min authored Jul 5, 2024
1 parent 4588e5c commit 2b723fb
Show file tree
Hide file tree
Showing 13 changed files with 199 additions and 93 deletions.
10 changes: 4 additions & 6 deletions src/Formats/FormatFactory.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <Formats/FormatFactory.h>

#include <algorithm>
#include <Core/Settings.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -582,16 +581,15 @@ bool FormatFactory::checkIfFormatHasExternalSchemaWriter(const String & name)

ExternalSchemaWriterPtr FormatFactory::getExternalSchemaWriter(
const String & name,
std::string_view body,
ContextPtr & context,
const std::optional<FormatSettings> & _format_settings) const
const ContextPtr & context,
std::optional<FormatSettings> format_settings_) const
{
const auto & external_schema_writer_creator = dict.at(name).external_schema_writer_creator;
if (!external_schema_writer_creator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Format {} doesn't support schema creation.", name);

auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
return external_schema_writer_creator(body, format_settings);
auto format_settings = format_settings_ ? *format_settings_ : getFormatSettings(context);
return external_schema_writer_creator(std::move(format_settings));
}
/// proton: ends

Expand Down
7 changes: 3 additions & 4 deletions src/Formats/FormatFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class FormatFactory final : private boost::noncopyable
using ExternalSchemaReaderCreator = std::function<ExternalSchemaReaderPtr(const FormatSettings & settings)>;

/// proton: starts
using ExternalSchemaWriterCreator = std::function<ExternalSchemaWriterPtr(std::string_view schema_body, const FormatSettings & settings)>;
using ExternalSchemaWriterCreator = std::function<ExternalSchemaWriterPtr(FormatSettings settings)>;
/// proton: ends

/// Some formats can extract different schemas from the same source depending on
Expand Down Expand Up @@ -220,9 +220,8 @@ class FormatFactory final : private boost::noncopyable

ExternalSchemaWriterPtr getExternalSchemaWriter(
const String & name,
std::string_view body,
ContextPtr & context,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
const ContextPtr & context,
std::optional<FormatSettings> format_settings_ = std::nullopt) const;
/// proton: ends

/// Register schema readers for a format by its name.
Expand Down
20 changes: 15 additions & 5 deletions src/Formats/FormatSchemaFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <Processors/Formats/ISchemaWriter.h>

#include <format>
#include <fstream>

namespace DB
{
Expand All @@ -28,7 +29,7 @@ String basename(const std::filesystem::path & path)

}

void FormatSchemaFactory::registerSchema(const String & schema_name, const String & format, std::string_view schema_body, ExistsOP exists_op, ContextPtr & context)
void FormatSchemaFactory::registerSchema(const String & schema_name, const String & format, std::string_view schema_body, ExistsOP exists_op, const ContextPtr & context)
{
assert(!schema_name.empty());
assert(!format.empty());
Expand All @@ -41,21 +42,20 @@ void FormatSchemaFactory::registerSchema(const String & schema_name, const Strin
format_settings.schema.is_server = true;

std::lock_guard lock(mutex);
auto writer = FormatFactory::instance().getExternalSchemaWriter(format, schema_body, context, format_settings);
auto writer = FormatFactory::instance().getExternalSchemaWriter(format, context, std::move(format_settings));
assert(writer); /* confirmed with checkSchemaType */

try
{
writer->validate();
writer->validate(schema_body);
}
catch (DB::Exception & e)
{
e.addMessage(std::format("{} schema {} was invalid", format, schema_name));
e.rethrow();
}


auto result = writer->write(exists_op == ExistsOP::Replace);
auto result = writer->write(schema_body, exists_op == ExistsOP::Replace);
if (!result && exists_op == ExistsOP::Throw)
throw Exception(ErrorCodes::FORMAT_SCHEMA_ALREADY_EXISTS, "Format schema {} of type {} already exists", schema_name, format);
}
Expand Down Expand Up @@ -85,6 +85,16 @@ void FormatSchemaFactory::unregisterSchema(const String & schema_name, const Str
}

std::filesystem::remove(schema_path);

String format_name = format;
if (format_name.empty())
format_name = FormatFactory::instance().getFormatFromSchemaFileName(schema_path);

if (format_name.empty())
return;
auto writer = FormatFactory::instance().getExternalSchemaWriter(format_name, context, std::move(format_settings));
assert(writer); /* confirmed with checkSchemaType */
writer->onDeleted();
}

std::vector<FormatSchemaFactory::SchemaEntry> FormatSchemaFactory::getSchemasList(const String & format, const ContextPtr & context) const
Expand Down
4 changes: 1 addition & 3 deletions src/Formats/FormatSchemaFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
#include <boost/core/noncopyable.hpp>
#include <Interpreters/Context_fwd.h>

#include <fstream>

namespace DB
{
/// Manages format schemas.
Expand All @@ -20,7 +18,7 @@ class FormatSchemaFactory final : private boost::noncopyable

static FormatSchemaFactory & instance();

void registerSchema(const String & schema_name, const String & format, std::string_view schema_body, ExistsOP exists_op, ContextPtr & context);
void registerSchema(const String & schema_name, const String & format, std::string_view schema_body, ExistsOP exists_op, const ContextPtr & context);

void unregisterSchema(const String & schema_name, const String & format, bool throw_if_not_exists, const ContextPtr & context);

Expand Down
98 changes: 79 additions & 19 deletions src/Formats/ProtobufSchemas.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#if USE_PROTOBUF
# include <Common/Exception.h>
# include <Common/LRUCache.h>
# include <Formats/FormatSchemaInfo.h>
# include <Formats/KafkaSchemaRegistry.h>
# include <Formats/ProtobufSchemas.h>
Expand All @@ -24,6 +23,12 @@ ProtobufSchemas & ProtobufSchemas::instance()
return instance;
}

/// Empties the `importers` cache while holding `mutex`.
/// NOTE(review): presumably called when a schema file is replaced or dropped so that
/// stale parsed schemas are not served again — confirm against the callers.
void ProtobufSchemas::clear()
{
    std::lock_guard guard{mutex};
    importers.clear();
}

class ProtobufSchemas::ImporterWithSourceTree : public google::protobuf::compiler::MultiFileErrorCollector
{
public:
Expand All @@ -44,30 +49,41 @@ class ProtobufSchemas::ImporterWithSourceTree : public google::protobuf::compile
return descriptor;

const auto * file_descriptor = importer.Import(schema_path);
// If there are parsing errors, AddError() throws an exception and in this case the following line
// isn't executed.
if (error)
{
auto info = error.value();
error.reset();
throw Exception(
ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA,
"Cannot parse '{}' file, found an error at line {}, column {}, {}",
info.filename,
info.line,
info.column,
info.message);
}

assert(file_descriptor);

if (with_envelope == WithEnvelope::No)
{
const auto * message_descriptor = file_descriptor->FindMessageTypeByName(message_name);
if (!message_descriptor)
throw Exception(
"Could not find a message named '" + message_name + "' in the schema file '" + schema_path + "'", ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Could not find a message named '{}' in the schema file '{}'",
message_name, schema_path);

return message_descriptor;
}
else
{
const auto * envelope_descriptor = file_descriptor->FindMessageTypeByName("Envelope");
if (!envelope_descriptor)
throw Exception(
"Could not find a message named 'Envelope' in the schema file '" + schema_path + "'", ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Could not find a message named 'Envelope' in the schema file '{}'",
schema_path);

const auto * message_descriptor = envelope_descriptor->FindNestedTypeByName(message_name); // silly protobuf API disallows a restricting the field type to messages
if (!message_descriptor)
throw Exception(
"Could not find a message named '" + message_name + "' in the schema file '" + schema_path + "'", ErrorCodes::BAD_ARGUMENTS);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Could not find a message named '{}' in the schema file '{}'",
message_name, schema_path);

return message_descriptor;
}
Expand All @@ -77,15 +93,24 @@ class ProtobufSchemas::ImporterWithSourceTree : public google::protobuf::compile
// Overrides google::protobuf::compiler::MultiFileErrorCollector:
void AddError(const String & filename, int line, int column, const String & message) override
{
throw Exception(
"Cannot parse '" + filename + "' file, found an error at line " + std::to_string(line) + ", column " + std::to_string(column)
+ ", " + message,
ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA);
/// Protobuf library code is not exception safe, we should
/// remember the error and throw it later from our side.
error = ErrorInfo{filename, line, column, message};
}

google::protobuf::compiler::DiskSourceTree disk_source_tree;
google::protobuf::compiler::Importer importer;
const WithEnvelope with_envelope;

/// Snapshot of the arguments passed to AddError(), stored so that the parse
/// failure can be thrown later from our own code.
struct ErrorInfo
{
/// File name reported to AddError().
String filename;
/// Line reported to AddError().
int line;
/// Column reported to AddError().
int column;
/// Error text reported to AddError().
String message;
};

std::optional<ErrorInfo> error;
};


Expand All @@ -100,22 +125,57 @@ const google::protobuf::Descriptor * ProtobufSchemas::getMessageTypeForFormatSch
}

/// proton: starts
/// Overrides google::protobuf::io::ErrorCollector:
void ProtobufSchemas::AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message)
namespace
{

class ErrorCollectorImpl : public google::protobuf::io::ErrorCollector
{
throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA,
"Cannot parse schema, found an error at line {}, column {}, error: {}", line, column, message);
public:
/// Snapshot of the arguments passed to AddError(); read back through error()
/// to build the exception message.
struct ErrorInfo
{
/// Line reported to AddError().
int line;
/// Column reported to AddError().
int column;
/// Error text reported to AddError().
String message;
};

ErrorCollectorImpl() = default;

void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override
{
/// Protobuf library code is not exception safe, we should
/// remember the error and throw it later from our side.
if (!error_) /// Only remember the first error.
error_ = {line, column, message};
}

std::optional<ErrorInfo> error()
{
return error_;
}

private:

std::optional<ErrorInfo> error_;
};

}

void ProtobufSchemas::validateSchema(std::string_view schema)
{
ErrorCollectorImpl error_collector{};

google::protobuf::io::ArrayInputStream input{schema.data(), static_cast<int>(schema.size())};
google::protobuf::io::Tokenizer tokenizer(&input, this);
google::protobuf::io::Tokenizer tokenizer(&input, &error_collector);
google::protobuf::FileDescriptorProto descriptor;
google::protobuf::compiler::Parser parser;

parser.RecordErrorsTo(this);
parser.RecordErrorsTo(&error_collector);

parser.Parse(&tokenizer, &descriptor);

if (auto error = error_collector.error(); error)
throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA,
"Cannot parse schema, found an error at line {}, column {}, error: {}", error->line, error->column, error->message);
}
/// proton: ends

Expand Down
6 changes: 3 additions & 3 deletions src/Formats/ProtobufSchemas.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class FormatSchemaInfo;
/** Keeps google protobuf schemas parsed from files.
* This class is used to handle the "Protobuf" input/output formats.
*/
class ProtobufSchemas : public google::protobuf::io::ErrorCollector /* proton: updated */
class ProtobufSchemas
{
public:
enum class WithEnvelope
Expand Down Expand Up @@ -61,14 +61,14 @@ class ProtobufSchemas : public google::protobuf::io::ErrorCollector /* proton: u
};

static ProtobufSchemas & instance();
// Clear cached protobuf schemas
void clear();

/// Parses the format schema, then parses the corresponding proto file, and returns the descriptor of the message type.
/// The function never returns nullptr, it throws an exception if it cannot load or parse the file.
const google::protobuf::Descriptor * getMessageTypeForFormatSchema(const FormatSchemaInfo & info, WithEnvelope with_envelope);

/// proton: starts
void AddError(int line, google::protobuf::io::ColumnNumber column, const std::string & message) override;

/// Validates the given schema and throws a DB::Exception if the schema is invalid.
/// The exception will contain the first error encountered when validating the schema, i.e. there could be more errors.
void validateSchema(std::string_view schema);
Expand Down
51 changes: 39 additions & 12 deletions src/Processors/Formats/ISchemaWriter.h
Original file line number Diff line number Diff line change
@@ -1,32 +1,59 @@
#pragma once

#include "Formats/FormatSettings.h"
#include "IO/ReadBuffer.h"
#include <Formats/FormatSchemaInfo.h>
#include <Formats/FormatSettings.h>
#include <IO/WriteBufferFromFile.h>

#include <filesystem>

namespace DB
{

class IExternalSchemaWriter
{
public:
IExternalSchemaWriter(std::string_view schema_body_, const FormatSettings & settings_)
: schema_body(schema_body_)
, settings(settings_)
{}
explicit IExternalSchemaWriter(FormatSettings settings_)
: settings(std::move(settings_))
{}

virtual ~IExternalSchemaWriter() = default;

/// Validates the schema input. Should throw exceptions on validation failures.
virtual void validate() = 0;
virtual void validate(std::string_view schema_body) = 0;

/// Persists the schema.
/// If the schema already exists, and replace_if_exist is false, it returns false.
/// Otherwise it returns true. Throws exceptions if it fails to write.
virtual bool write(bool replace_if_exist) = 0;
/// Writes `schema_body` to the file at getSchemaInfo().absoluteSchemaPath().
/// Returns false if file was not written, i.e. there was an existing schema file, and it's not replaced.
/// Returns true once the body has been written; onReplaced() is invoked when an existing file was overwritten.
/// It throws exceptions if it fails to write.
bool write(std::string_view schema_body, bool replace_if_exist)
{
    const auto schema_info = getSchemaInfo();
    const auto & schema_path = schema_info.absoluteSchemaPath();

    const bool already_exists = std::filesystem::exists(schema_path);
    if (already_exists && !replace_if_exist)
        return false;

    WriteBufferFromFile write_buffer{schema_path};
    write_buffer.write(schema_body.data(), schema_body.size());

    /// Flush explicitly. Otherwise the data may only reach the file when the buffer is
    /// destroyed, i.e. after onReplaced() has already run, and a flush failure at that
    /// point could not be reported to the caller.
    /// NOTE(review): assumes WriteBuffer::finalize() is available in this code base — confirm.
    write_buffer.finalize();

    if (already_exists)
        onReplaced();

    return true;
}

/// A callback will be called when a schema gets deleted.
virtual void onDeleted() {}

protected:
std::string_view schema_body;
const FormatSettings & settings;
virtual String getFormatName() const = 0;

/// A callback will be called when an existing schema gets replaced.
virtual void onReplaced() {}

/// Builds the FormatSchemaInfo for this writer from `settings.schema` and getFormatName().
/// The third constructor argument is always passed as `false`.
FormatSchemaInfo getSchemaInfo() const
{
    const auto & schema_settings = settings.schema;
    return FormatSchemaInfo{
        schema_settings.format_schema,
        getFormatName(),
        false,
        schema_settings.is_server,
        schema_settings.format_schema_path};
}

FormatSettings settings;
};

}
Loading

0 comments on commit 2b723fb

Please sign in to comment.