2 changes: 1 addition & 1 deletion duckdb
Submodule duckdb updated 385 files
2 changes: 1 addition & 1 deletion extension_config.cmake
@@ -2,7 +2,7 @@

# Extension from this repo
duckdb_extension_load(ducklake
-SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}
+SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}
)

duckdb_extension_load(icu)
15 changes: 13 additions & 2 deletions src/common/ducklake_types.cpp
@@ -42,19 +42,27 @@ static constexpr const ducklake_type_array DUCKLAKE_TYPES {{{"boolean", LogicalT
{"blob", LogicalTypeId::BLOB},
{"uuid", LogicalTypeId::UUID}}};

-LogicalType ParseBaseType(const string &str) {
+static LogicalType ParseBaseType(const string &str) {
for (auto &ducklake_type : DUCKLAKE_TYPES) {
if (StringUtil::CIEquals(str, ducklake_type.name)) {
return ducklake_type.id;
}
}

if (StringUtil::CIEquals(str, "json")) {
return LogicalType::JSON();
}

if (StringUtil::CIEquals(str, "geometry")) {
LogicalType geo_type(LogicalTypeId::BLOB);
geo_type.SetAlias("GEOMETRY");
return geo_type;
}

throw InvalidInputException("Failed to parse DuckLake type - unsupported type '%s'", str);
}

-string ToStringBaseType(const LogicalType &type) {
+static string ToStringBaseType(const LogicalType &type) {
for (auto &ducklake_type : DUCKLAKE_TYPES) {
if (type.id() == ducklake_type.id) {
return ducklake_type.name;
@@ -83,6 +91,9 @@ string DuckLakeTypes::ToString(const LogicalType &type) {
if (type.IsJSONType()) {
return "json";
}
if (type.GetAlias() == "GEOMETRY" && type.id() == LogicalTypeId::BLOB) {
return "geometry";
}
throw InvalidInputException("Unsupported user-defined type");
}
switch (type.id()) {
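The two geometry hunks above make the type mapping symmetric: the catalog string "geometry" parses to a BLOB carrying a "GEOMETRY" alias, and that aliased BLOB prints back as "geometry". A minimal sketch of the round trip, assuming DuckLakeTypes::FromString is the public wrapper around ParseBaseType (the wrapper itself is not shown in this diff):

```cpp
#include "common/ducklake_types.hpp" // assumed header path, mirroring ducklake_util.hpp

using namespace duckdb;

void GeometryRoundTrip() {
	// "geometry" parses to a BLOB tagged with the GEOMETRY alias
	LogicalType geo = DuckLakeTypes::FromString("geometry");
	D_ASSERT(geo.id() == LogicalTypeId::BLOB);
	D_ASSERT(geo.GetAlias() == "GEOMETRY");
	// and the aliased BLOB serializes back to the catalog string
	D_ASSERT(DuckLakeTypes::ToString(geo) == "geometry");
}
```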
7 changes: 6 additions & 1 deletion src/common/ducklake_util.cpp
@@ -89,11 +89,16 @@ string DuckLakeUtil::StatsToString(const string &text) {
return DuckLakeUtil::SQLLiteralToString(text);
}

-string DuckLakeUtil::ValueToSQL(const Value &val) {
+string DuckLakeUtil::ValueToSQL(ClientContext &context, const Value &val) {
// FIXME: this should be upstreamed
if (val.IsNull()) {
return val.ToSQLString();
}
if (val.type().HasAlias()) {
// extension type: cast to string
auto str_val = val.CastAs(context, LogicalType::VARCHAR);
return DuckLakeUtil::ValueToSQL(context, str_val);
}
switch (val.type().id()) {
case LogicalTypeId::VARCHAR: {
auto &str_val = StringValue::Get(val);
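With the new ClientContext parameter, any value whose type carries an alias (an extension type such as GEOMETRY) is first cast to VARCHAR and then serialized through the existing string path. A hedged illustration, using a hypothetical geo_val of the aliased GEOMETRY type:

```cpp
string GeometryToSQL(ClientContext &context, const Value &geo_val) {
	// geo_val: assumed to be a Value of the GEOMETRY-aliased BLOB type
	// the new overload routes it through a VARCHAR cast internally, so this:
	string sql = DuckLakeUtil::ValueToSQL(context, geo_val);
	// matches serializing the cast explicitly, typically yielding a quoted
	// text rendering of the geometry rather than a raw blob literal:
	D_ASSERT(sql == DuckLakeUtil::ValueToSQL(context, geo_val.CastAs(context, LogicalType::VARCHAR)));
	return sql;
}
```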
36 changes: 31 additions & 5 deletions src/functions/ducklake_add_data_files.cpp
@@ -69,7 +69,8 @@ struct DuckLakeAddDataFilesState : public GlobalTableFunctionState {
bool finished = false;
};

-unique_ptr<GlobalTableFunctionState> DuckLakeAddDataFilesInit(ClientContext &context, TableFunctionInitInput &input) {
+static unique_ptr<GlobalTableFunctionState> DuckLakeAddDataFilesInit(ClientContext &context,
+                                                                     TableFunctionInitInput &input) {
return make_uniq<DuckLakeAddDataFilesState>();
}

Expand Down Expand Up @@ -241,7 +242,7 @@ Value DuckLakeFileProcessor::GetStatsValue(string name, Value val) {

void DuckLakeFileProcessor::ReadParquetStats(const string &glob) {
auto result = transaction.Query(StringUtil::Format(R"(
-SELECT file_name, column_id, num_values, coalesce(stats_min, stats_min_value), coalesce(stats_max, stats_max_value), stats_null_count, total_compressed_size
+SELECT file_name, column_id, num_values, coalesce(stats_min, stats_min_value), coalesce(stats_max, stats_max_value), stats_null_count, total_compressed_size, geo_bbox, geo_types
FROM parquet_metadata(%s)
)",
SQLString(glob)));
@@ -280,6 +281,27 @@ FROM parquet_metadata(%s)
if (!row.IsNull(6)) {
stats.column_stats.push_back(GetStatsValue("column_size_bytes", row.GetValue<string>(6)));
}
if (!row.IsNull(7)) {
// Split the bbox struct into individual entries
auto bbox_value = row.iterator.chunk->GetValue(7, row.row);
auto &bbox_type = bbox_value.type();

auto &bbox_child_types = StructType::GetChildTypes(bbox_type);
auto &bbox_child_values = StructValue::GetChildren(bbox_value);

for (idx_t child_idx = 0; child_idx < bbox_child_types.size(); child_idx++) {
auto &name = bbox_child_types[child_idx].first;
auto &value = bbox_child_values[child_idx];
if (!value.IsNull()) {
stats.column_stats.push_back(GetStatsValue("bbox_" + name, value));
}
}
}
if (!row.IsNull(8)) {
auto list_value = row.iterator.chunk->GetValue(8, row.row);
stats.column_stats.push_back(GetStatsValue("geo_types", std::move(list_value)));
}

column.column_stats.push_back(std::move(stats));
}
}
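The two new columns come back from parquet_metadata as a STRUCT (geo_bbox) and a LIST (geo_types); the struct is flattened into one stats entry per bounding-box dimension. A self-contained illustration of the flattening with a hand-built struct value (the numbers are made up):

```cpp
#include "duckdb.hpp"
#include <iostream>

using namespace duckdb;

void FlattenBBox() {
	// build a struct value shaped like a parquet_metadata geo_bbox entry
	child_list_t<Value> children;
	children.emplace_back("xmin", Value::DOUBLE(1.0));
	children.emplace_back("xmax", Value::DOUBLE(4.0));
	children.emplace_back("ymin", Value::DOUBLE(2.0));
	children.emplace_back("ymax", Value::DOUBLE(5.0));
	Value bbox_value = Value::STRUCT(std::move(children));

	auto &child_types = StructType::GetChildTypes(bbox_value.type());
	auto &child_values = StructValue::GetChildren(bbox_value);
	for (idx_t i = 0; i < child_types.size(); i++) {
		// yields stats entries named bbox_xmin, bbox_xmax, bbox_ymin, bbox_ymax
		std::cout << "bbox_" << child_types[i].first << " = " << child_values[i].ToString() << "\n";
	}
}
```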
@@ -373,6 +395,10 @@ LogicalType DuckLakeParquetTypeChecker::DeriveLogicalType(const ParquetColumn &s
return LogicalType::TIMESTAMP_NS;
} else if (StringUtil::StartsWith(s_ele.logical_type, "TimestampType(isAdjustedToUTC=1")) {
return LogicalType::TIMESTAMP_TZ;
} else if (StringUtil::StartsWith(s_ele.logical_type, "Geometry")) {
LogicalType geo_type(LogicalTypeId::BLOB);
geo_type.SetAlias("GEOMETRY");
return geo_type;
}
}
if (!s_ele.converted_type.empty()) {
@@ -440,7 +466,7 @@ LogicalType DuckLakeParquetTypeChecker::DeriveLogicalType(const ParquetColumn &s
throw InvalidInputException("Unrecognized type %s for parquet file", s_ele.type);
}

-string FormatExpectedError(const vector<LogicalType> &expected) {
+static string FormatExpectedError(const vector<LogicalType> &expected) {
string error;
for (auto &type : expected) {
if (!error.empty()) {
@@ -695,7 +721,7 @@ unique_ptr<DuckLakeNameMapEntry> DuckLakeFileProcessor::MapColumn(ParquetFileMet
return map_entry;
}

-bool SupportsHivePartitioning(const LogicalType &type) {
+static bool SupportsHivePartitioning(const LogicalType &type) {
if (type.IsNested()) {
return false;
}
@@ -830,7 +856,7 @@ vector<DuckLakeDataFile> DuckLakeFileProcessor::AddFiles(const vector<string> &g
return written_files;
}

-void DuckLakeAddDataFilesExecute(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
+static void DuckLakeAddDataFilesExecute(ClientContext &context, TableFunctionInput &data_p, DataChunk &output) {
auto &state = data_p.global_state->Cast<DuckLakeAddDataFilesState>();
auto &bind_data = data_p.bind_data->Cast<DuckLakeAddDataFilesData>();
auto &transaction = DuckLakeTransaction::Get(context, bind_data.catalog);
33 changes: 22 additions & 11 deletions src/functions/ducklake_compaction_functions.cpp
@@ -407,6 +407,7 @@ DuckLakeCompactor::GenerateCompactionCommand(vector<DuckLakeCompactionFileEntry>
auto ducklake_scan =
make_uniq<LogicalGet>(table_idx, std::move(scan_function), std::move(bind_data), copy_options.expected_types,
copy_options.names, std::move(virtual_columns));

auto &column_ids = ducklake_scan->GetMutableColumnIds();
for (idx_t i = 0; i < columns.PhysicalColumnCount(); i++) {
column_ids.emplace_back(i);
@@ -416,6 +417,16 @@
}
column_ids.emplace_back(DuckLakeMultiFileReader::COLUMN_IDENTIFIER_SNAPSHOT_ID);

// Resolve types so we can check if we need casts
ducklake_scan->ResolveOperatorTypes();

// Insert a cast projection if necessary
auto root = unique_ptr_cast<LogicalGet, LogicalOperator>(std::move(ducklake_scan));

if (DuckLakeInsert::RequireCasts(root->types)) {
root = DuckLakeInsert::InsertCasts(binder, root);
}

// generate the LogicalCopyToFile
auto copy = make_uniq<LogicalCopyToFile>(std::move(copy_options.copy_function), std::move(copy_options.bind_data),
std::move(copy_options.info));
@@ -440,8 +451,7 @@ DuckLakeCompactor::GenerateCompactionCommand(vector<DuckLakeCompactionFileEntry>
copy->preserve_order = PreserveOrderType::PRESERVE_ORDER;
copy->file_size_bytes = optional_idx();
copy->rotate = false;

-copy->children.push_back(std::move(ducklake_scan));
+copy->children.push_back(std::move(root));

optional_idx target_row_id_start;
if (files_are_adjacent) {
@@ -459,8 +469,8 @@ DuckLakeCompactor::GenerateCompactionCommand(vector<DuckLakeCompactionFileEntry>
//===--------------------------------------------------------------------===//
// Function
//===--------------------------------------------------------------------===//
-unique_ptr<LogicalOperator> GenerateCompactionOperator(TableFunctionBindInput &input, idx_t bind_index,
-                                                       vector<unique_ptr<LogicalOperator>> &compactions) {
+static unique_ptr<LogicalOperator> GenerateCompactionOperator(TableFunctionBindInput &input, idx_t bind_index,
+                                                              vector<unique_ptr<LogicalOperator>> &compactions) {
if (compactions.empty()) {
// nothing to compact - generate an empty result
vector<ColumnBinding> bindings;
@@ -478,9 +488,10 @@ unique_ptr<LogicalOperator> GenerateCompactionOperator(TableFunctionBindInput &i
return union_op;
}

-void GenerateCompaction(ClientContext &context, DuckLakeTransaction &transaction, DuckLakeCatalog &ducklake_catalog,
-                        TableFunctionBindInput &input, DuckLakeTableEntry &cur_table, CompactionType type,
-                        double delete_threshold, vector<unique_ptr<LogicalOperator>> &compactions) {
+static void GenerateCompaction(ClientContext &context, DuckLakeTransaction &transaction,
+                               DuckLakeCatalog &ducklake_catalog, TableFunctionBindInput &input,
+                               DuckLakeTableEntry &cur_table, CompactionType type, double delete_threshold,
+                               vector<unique_ptr<LogicalOperator>> &compactions) {
switch (type) {
case CompactionType::MERGE_ADJACENT_TABLES: {
DuckLakeCompactor compactor(context, ducklake_catalog, transaction, *input.binder, cur_table.GetTableId());
@@ -559,8 +570,8 @@ unique_ptr<LogicalOperator> BindCompaction(ClientContext &context, TableFunction
return GenerateCompactionOperator(input, bind_index, compactions);
}

-unique_ptr<LogicalOperator> MergeAdjacentFilesBind(ClientContext &context, TableFunctionBindInput &input,
-                                                   idx_t bind_index, vector<string> &return_names) {
+static unique_ptr<LogicalOperator> MergeAdjacentFilesBind(ClientContext &context, TableFunctionBindInput &input,
+                                                          idx_t bind_index, vector<string> &return_names) {
return_names.push_back("Success");
return BindCompaction(context, input, bind_index, CompactionType::MERGE_ADJACENT_TABLES);
}
@@ -579,8 +590,8 @@ TableFunctionSet DuckLakeMergeAdjacentFilesFunction::GetFunctions() {
return set;
}

-unique_ptr<LogicalOperator> RewriteFilesBind(ClientContext &context, TableFunctionBindInput &input, idx_t bind_index,
-                                             vector<string> &return_names) {
+static unique_ptr<LogicalOperator> RewriteFilesBind(ClientContext &context, TableFunctionBindInput &input,
+                                                    idx_t bind_index, vector<string> &return_names) {

return_names.push_back("Success");
return BindCompaction(context, input, bind_index, CompactionType::REWRITE_DELETES);
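Both this file and the inlined-data flusher now hold the plan root as a unique_ptr<LogicalOperator> obtained via unique_ptr_cast, so a cast projection can be spliced in above the scan before it is handed to LogicalCopyToFile. For reference, DuckDB's helper is morally equivalent to this sketch:

```cpp
// sketch of duckdb::unique_ptr_cast: transfer ownership while converting the
// pointee type (the real helper lives in duckdb/common/helper.hpp)
template <class S, class T>
unique_ptr<T> unique_ptr_cast(unique_ptr<S> src) {
	return unique_ptr<T>(static_cast<T *>(src.release()));
}
```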
13 changes: 9 additions & 4 deletions src/functions/ducklake_flush_inlined_data.cpp
@@ -184,8 +184,7 @@ unique_ptr<LogicalOperator> DuckLakeDataFlusher::GenerateFlushCommand() {
column_ids.emplace_back(COLUMN_IDENTIFIER_ROW_ID);
column_ids.emplace_back(DuckLakeMultiFileReader::COLUMN_IDENTIFIER_SNAPSHOT_ID);

-unique_ptr<LogicalOperator> root;
-root = std::move(ducklake_scan);
+auto root = unique_ptr_cast<LogicalGet, LogicalOperator>(std::move(ducklake_scan));

if (!copy_options.projection_list.empty()) {
// push a projection
@@ -194,6 +193,12 @@
root = std::move(proj);
}

// Add another projection with casts if necessary
root->ResolveOperatorTypes();
if (DuckLakeInsert::RequireCasts(root->types)) {
root = DuckLakeInsert::InsertCasts(binder, root);
}

// generate the LogicalCopyToFile
auto copy = make_uniq<LogicalCopyToFile>(std::move(copy_options.copy_function), std::move(copy_options.bind_data),
std::move(copy_options.info));
@@ -227,8 +232,8 @@ unique_ptr<LogicalOperator> DuckLakeDataFlusher::GenerateFlushCommand() {
//===--------------------------------------------------------------------===//
// Function
//===--------------------------------------------------------------------===//
-unique_ptr<LogicalOperator> FlushInlinedDataBind(ClientContext &context, TableFunctionBindInput &input,
-                                                 idx_t bind_index, vector<string> &return_names) {
+static unique_ptr<LogicalOperator> FlushInlinedDataBind(ClientContext &context, TableFunctionBindInput &input,
+                                                        idx_t bind_index, vector<string> &return_names) {
// gather a list of files to compact
auto &catalog = BaseMetadataFunction::GetCatalog(context, input.inputs[0]);
auto &ducklake_catalog = catalog.Cast<DuckLakeCatalog>();
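End to end, the flush path is reachable through the table function bound in this file. A hypothetical usage sketch; the attach option, the GEOMETRY column (which needs the spatial extension), and the exact call signature follow DuckLake's documented conventions, but are assumptions here rather than part of the diff:

```cpp
#include "duckdb.hpp"

using namespace duckdb;

int main() {
	DuckDB db(nullptr);
	Connection con(db);
	con.Query("INSTALL spatial; LOAD spatial;"); // for the GEOMETRY type
	// keep small inserts inlined in the metadata catalog
	con.Query("ATTACH 'ducklake:metadata.ducklake' AS my_lake (DATA_INLINING_ROW_LIMIT 10)");
	con.Query("CREATE TABLE my_lake.t(id INTEGER, geom GEOMETRY)");
	con.Query("INSERT INTO my_lake.t VALUES (1, 'POINT(1 2)'::GEOMETRY)");
	// flush the inlined rows out to Parquet data files; with this PR, a cast
	// projection is inserted for columns such as geom where required
	con.Query("CALL ducklake_flush_inlined_data('my_lake')")->Print();
}
```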
2 changes: 1 addition & 1 deletion src/include/common/ducklake_util.hpp
@@ -28,7 +28,7 @@ class DuckLakeUtil {
static string SQLIdentifierToString(const string &text);
static string SQLLiteralToString(const string &text);
static string StatsToString(const string &text);
-static string ValueToSQL(const Value &val);
+static string ValueToSQL(ClientContext &context, const Value &val);

static ParsedCatalogEntry ParseCatalogEntry(const string &input);
static string JoinPath(FileSystem &fs, const string &a, const string &b);
5 changes: 5 additions & 0 deletions src/include/storage/ducklake_insert.hpp
@@ -69,6 +69,11 @@ class DuckLakeInsert : public PhysicalOperator {
return true;
}

static bool RequireCasts(const vector<LogicalType> &types);
static void InsertCasts(const vector<LogicalType> &types, ClientContext &context, PhysicalPlanGenerator &planner,
optional_ptr<PhysicalOperator> &plan);
static unique_ptr<LogicalOperator> InsertCasts(Binder &binder, unique_ptr<LogicalOperator> &plan);

static DuckLakeColumnStats ParseColumnStats(const LogicalType &type, const vector<Value> &stats);
static DuckLakeCopyOptions GetCopyOptions(ClientContext &context, DuckLakeCopyInput &copy_input);
static PhysicalOperator &PlanCopyForInsert(ClientContext &context, PhysicalPlanGenerator &planner,
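The diff carries only these declarations; the definitions live in ducklake_insert.cpp, which is not part of this excerpt. One plausible reading of the contract, under the assumption that aliased extension types such as GEOMETRY are what trigger the extra projection (a sketch, not the actual implementation):

```cpp
#include "duckdb/planner/operator/logical_projection.hpp"
#include "duckdb/planner/expression/bound_columnref_expression.hpp"
#include "duckdb/planner/expression/bound_cast_expression.hpp"

using namespace duckdb;

bool DuckLakeInsert::RequireCasts(const vector<LogicalType> &types) {
	for (auto &type : types) {
		if (type.HasAlias()) { // e.g. the GEOMETRY-aliased BLOB
			return true;
		}
	}
	return false;
}

unique_ptr<LogicalOperator> DuckLakeInsert::InsertCasts(Binder &binder, unique_ptr<LogicalOperator> &plan) {
	// assumes plan->ResolveOperatorTypes() has been called by the caller
	vector<unique_ptr<Expression>> select_list;
	auto bindings = plan->GetColumnBindings();
	for (idx_t i = 0; i < plan->types.size(); i++) {
		auto &type = plan->types[i];
		unique_ptr<Expression> expr = make_uniq<BoundColumnRefExpression>(type, bindings[i]);
		if (type.HasAlias()) {
			// the cast target is an assumption; what matters is reaching the
			// physical representation the Parquet writer can handle
			expr = BoundCastExpression::AddCastToType(binder.context, std::move(expr), LogicalType::BLOB);
		}
		select_list.push_back(std::move(expr));
	}
	auto proj = make_uniq<LogicalProjection>(binder.GenerateTableIndex(), std::move(select_list));
	proj->children.push_back(std::move(plan));
	return std::move(proj);
}
```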
4 changes: 4 additions & 0 deletions src/include/storage/ducklake_metadata_info.hpp
@@ -99,6 +99,7 @@ struct DuckLakeColumnStatsInfo {
string min_val;
string max_val;
string contains_nan;
string extra_stats;
};

struct DuckLakeFilePartitionInfo {
@@ -195,6 +196,9 @@ struct DuckLakeGlobalColumnStatsInfo {

string max_val;
bool has_max = false;

string extra_stats;
bool has_extra_stats = false;
};

struct DuckLakeGlobalStatsInfo {
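The new extra_stats fields carry the serialized form of the per-column extra statistics (see DuckLakeColumnExtraStats::Serialize in ducklake_stats.hpp below). A hypothetical example of the payload for a geometry column; JSON is only suggested by the header comment, and the field names here are illustrative:

```cpp
DuckLakeColumnStatsInfo stats_info;
// hypothetical serialized geo stats for one file's geometry column
stats_info.extra_stats =
    R"({"xmin": 1.0, "xmax": 4.0, "ymin": 2.0, "ymax": 5.0, "geo_types": ["POINT", "LINESTRING"]})";
```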
49 changes: 49 additions & 0 deletions src/include/storage/ducklake_stats.hpp
@@ -16,10 +16,56 @@
namespace duckdb {
class BaseStatistics;

struct DuckLakeColumnExtraStats {
virtual ~DuckLakeColumnExtraStats() = default;

virtual void Merge(const DuckLakeColumnExtraStats &new_stats) = 0;
virtual unique_ptr<DuckLakeColumnExtraStats> Copy() const = 0;

// Convert the stats into a string representation for storage (e.g. JSON)
virtual string Serialize() const = 0;
// Parse the stats from a string
virtual void Deserialize(const string &stats) = 0;

template <class TARGET>
TARGET &Cast() {
DynamicCastCheck<TARGET>(this);
return reinterpret_cast<TARGET &>(*this);
}
template <class TARGET>
const TARGET &Cast() const {
DynamicCastCheck<TARGET>(this);
return reinterpret_cast<const TARGET &>(*this);
}
};

struct DuckLakeColumnGeoStats final : public DuckLakeColumnExtraStats {

DuckLakeColumnGeoStats();
void Merge(const DuckLakeColumnExtraStats &new_stats) override;
unique_ptr<DuckLakeColumnExtraStats> Copy() const override;

string Serialize() const override;
void Deserialize(const string &stats) override;

public:
double xmin, xmax, ymin, ymax, zmin, zmax, mmin, mmax;
set<string> geo_types;
};

struct DuckLakeColumnStats {
explicit DuckLakeColumnStats(LogicalType type_p) : type(std::move(type_p)) {
if (type.id() == LogicalTypeId::BLOB && type.HasAlias() && type.GetAlias() == "GEOMETRY") {
extra_stats = make_uniq<DuckLakeColumnGeoStats>();
}
}

// Copy constructor
DuckLakeColumnStats(const DuckLakeColumnStats &other);
DuckLakeColumnStats &operator=(const DuckLakeColumnStats &other);
DuckLakeColumnStats(DuckLakeColumnStats &&other) noexcept = default;
DuckLakeColumnStats &operator=(DuckLakeColumnStats &&other) noexcept = default;

LogicalType type;
string min;
string max;
@@ -32,9 +78,12 @@ struct DuckLakeColumnStats {
bool any_valid = true;
bool has_contains_nan = false;

unique_ptr<DuckLakeColumnExtraStats> extra_stats;

public:
unique_ptr<BaseStatistics> ToStats() const;
void MergeStats(const DuckLakeColumnStats &new_stats);
DuckLakeColumnStats Copy() const;

private:
unique_ptr<BaseStatistics> CreateNumericStats() const;
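The virtual interface leaves Merge to the subclass. A minimal sketch of how DuckLakeColumnGeoStats::Merge could fold one file's stats into the running table-level stats, widening the bounding box and unioning the geometry types (the real definition is in the .cpp, which is not part of this excerpt):

```cpp
void DuckLakeColumnGeoStats::Merge(const DuckLakeColumnExtraStats &new_stats) {
	auto &other = new_stats.Cast<DuckLakeColumnGeoStats>();
	// widen the bounding box on every dimension
	xmin = MinValue(xmin, other.xmin);
	xmax = MaxValue(xmax, other.xmax);
	ymin = MinValue(ymin, other.ymin);
	ymax = MaxValue(ymax, other.ymax);
	zmin = MinValue(zmin, other.zmin);
	zmax = MaxValue(zmax, other.zmax);
	mmin = MinValue(mmin, other.mmin);
	mmax = MaxValue(mmax, other.mmax);
	// union the observed geometry types
	geo_types.insert(other.geo_types.begin(), other.geo_types.end());
}
```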