Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions src/catalog_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ bool IRCAPI::VerifySchemaExistence(ClientContext &context, IcebergCatalog &catal
auto schema_name = GetEncodedSchemaName(namespace_items);

auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPrefixComponent(catalog.prefix);
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);
bool execute_head =
Expand All @@ -156,7 +156,7 @@ bool IRCAPI::VerifyTableExistence(ClientContext &context, IcebergCatalog &catalo
auto schema_name = GetEncodedSchemaName(schema.namespace_items);

auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPrefixComponent(catalog.prefix);
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);
url_builder.AddPathComponent("tables");
Expand All @@ -171,7 +171,7 @@ static unique_ptr<HTTPResponse> GetTableMetadata(ClientContext &context, Iceberg
auto schema_name = IRCAPI::GetEncodedSchemaName(schema.namespace_items);

auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPrefixComponent(catalog.prefix);
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);
url_builder.AddPathComponent("tables");
Expand Down Expand Up @@ -216,7 +216,7 @@ vector<rest_api_objects::TableIdentifier> IRCAPI::GetTables(ClientContext &conte

do {
auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPrefixComponent(catalog.prefix);
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);
url_builder.AddPathComponent("tables");
Expand Down Expand Up @@ -266,7 +266,7 @@ vector<IRCAPISchema> IRCAPI::GetSchemas(ClientContext &context, IcebergCatalog &
string page_token = "";
do {
auto endpoint_builder = catalog.GetBaseUrl();
endpoint_builder.AddPathComponent(catalog.prefix);
endpoint_builder.AddPrefixComponent(catalog.prefix);
endpoint_builder.AddPathComponent("namespaces");
if (!parent.empty()) {
auto parent_name = GetSchemaName(parent);
Expand Down Expand Up @@ -322,7 +322,7 @@ vector<IRCAPISchema> IRCAPI::GetSchemas(ClientContext &context, IcebergCatalog &

void IRCAPI::CommitMultiTableUpdate(ClientContext &context, IcebergCatalog &catalog, const string &body) {
auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPrefixComponent(catalog.prefix);
url_builder.AddPathComponent("transactions");
url_builder.AddPathComponent("commit");
HTTPHeaders headers(*context.db);
Expand All @@ -340,7 +340,7 @@ void IRCAPI::CommitTableUpdate(ClientContext &context, IcebergCatalog &catalog,
auto schema_name = GetEncodedSchemaName(schema);

auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPrefixComponent(catalog.prefix);
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);
url_builder.AddPathComponent("tables");
Expand All @@ -359,7 +359,7 @@ void IRCAPI::CommitTableDelete(ClientContext &context, IcebergCatalog &catalog,
const string &table_name) {
auto schema_name = GetEncodedSchemaName(schema);
auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPrefixComponent(catalog.prefix);
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);

Expand All @@ -379,7 +379,7 @@ void IRCAPI::CommitTableDelete(ClientContext &context, IcebergCatalog &catalog,

void IRCAPI::CommitNamespaceCreate(ClientContext &context, IcebergCatalog &catalog, string body) {
auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPrefixComponent(catalog.prefix);
url_builder.AddPathComponent("namespaces");
HTTPHeaders headers(*context.db);
headers.Insert("Content-Type", "application/json");
Expand All @@ -394,7 +394,7 @@ void IRCAPI::CommitNamespaceCreate(ClientContext &context, IcebergCatalog &catal
void IRCAPI::CommitNamespaceDrop(ClientContext &context, IcebergCatalog &catalog, vector<string> namespace_items) {
auto url_builder = catalog.GetBaseUrl();
auto schema_name = GetEncodedSchemaName(namespace_items);
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPrefixComponent(catalog.prefix);
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(schema_name);

Expand All @@ -414,7 +414,7 @@ rest_api_objects::LoadTableResult IRCAPI::CommitNewTable(ClientContext &context,
auto &ic_schema = table->schema.Cast<IcebergSchemaEntry>();
auto table_namespace = GetEncodedSchemaName(ic_schema.namespace_items);
auto url_builder = catalog.GetBaseUrl();
url_builder.AddPathComponent(catalog.prefix);
url_builder.AddPrefixComponent(catalog.prefix);
url_builder.AddPathComponent("namespaces");
url_builder.AddPathComponent(table_namespace);
url_builder.AddPathComponent("tables");
Expand Down
19 changes: 19 additions & 0 deletions src/common/url_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,25 @@ void IRCEndpointBuilder::AddPathComponent(const string &component) {
path_components.push_back(component);
}

void IRCEndpointBuilder::AddPrefixComponent(const string &component) {
if (component.empty()) {
return;
}

// If the component contains slashes, split it into multiple segments
if (component.find('/') != string::npos) {
Comment thread
talatuyarer marked this conversation as resolved.
auto segments = StringUtil::Split(component, '/');
for (const auto &segment : segments) {
if (!segment.empty()) {
path_components.push_back(segment);
}
}
} else {
// Single component without slashes
path_components.push_back(component);
}
}

string IRCEndpointBuilder::GetHost() const {
return host;
}
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/iceberg_authorization.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ struct IcebergAuthorization {
public:
static IcebergAuthorizationType TypeFromString(const string &type);

static void ParseExtraHttpHeaders(const Value &headers_value, unordered_map<string, string> &out_headers);

public:
virtual unique_ptr<HTTPResponse> Request(RequestType request_type, ClientContext &context,
const IRCEndpointBuilder &endpoint_builder, HTTPHeaders &headers,
Expand All @@ -65,6 +67,7 @@ struct IcebergAuthorization {
public:
IcebergAuthorizationType type;
unique_ptr<HTTPClient> client;
unordered_map<string, string> extra_http_headers;
};

} // namespace duckdb
1 change: 1 addition & 0 deletions src/include/url_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class IRCEndpointBuilder {
public:
IRCEndpointBuilder();
void AddPathComponent(const string &component);
void AddPrefixComponent(const string &component);

void SetHost(const string &host);
string GetHost() const;
Expand Down
13 changes: 13 additions & 0 deletions src/storage/authorization/none.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "storage/authorization/none.hpp"
#include "api_utils.hpp"
#include "storage/catalog/iceberg_catalog.hpp"
#include "duckdb/common/types/value.hpp"

namespace duckdb {

Expand All @@ -9,12 +10,24 @@ NoneAuthorization::NoneAuthorization() : IcebergAuthorization(IcebergAuthorizati

unique_ptr<IcebergAuthorization> NoneAuthorization::FromAttachOptions(IcebergAttachOptions &input) {
auto result = make_uniq<NoneAuthorization>();

// Parse extra_http_headers if provided directly in attach options
if (input.options.count("extra_http_headers")) {
IcebergAuthorization::ParseExtraHttpHeaders(input.options["extra_http_headers"], result->extra_http_headers);
input.options.erase("extra_http_headers");
}

return std::move(result);
}

unique_ptr<HTTPResponse> NoneAuthorization::Request(RequestType request_type, ClientContext &context,
const IRCEndpointBuilder &endpoint_builder, HTTPHeaders &headers,
const string &data) {
// Merge extra HTTP headers
for (auto &entry : extra_http_headers) {
headers.Insert(entry.first, entry.second);
}

return APIUtils::Request(request_type, context, endpoint_builder, client, headers, data);
}

Expand Down
41 changes: 33 additions & 8 deletions src/storage/authorization/oauth2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "duckdb/common/exception/http_exception.hpp"
#include "duckdb/logging/logger.hpp"
#include "duckdb/common/types/blob.hpp"
#include "duckdb/common/types/value.hpp"

namespace duckdb {

Expand All @@ -17,10 +18,14 @@ namespace {
//! So we use this to deduplicate it instead
static const case_insensitive_map_t<LogicalType> &IcebergSecretOptions() {
static const case_insensitive_map_t<LogicalType> options {
{"client_id", LogicalType::VARCHAR}, {"client_secret", LogicalType::VARCHAR},
{"endpoint", LogicalType::VARCHAR}, {"token", LogicalType::VARCHAR},
{"oauth2_scope", LogicalType::VARCHAR}, {"oauth2_server_uri", LogicalType::VARCHAR},
{"oauth2_grant_type", LogicalType::VARCHAR}};
{"client_id", LogicalType::VARCHAR},
{"client_secret", LogicalType::VARCHAR},
{"endpoint", LogicalType::VARCHAR},
{"token", LogicalType::VARCHAR},
{"oauth2_scope", LogicalType::VARCHAR},
{"oauth2_server_uri", LogicalType::VARCHAR},
{"oauth2_grant_type", LogicalType::VARCHAR},
{"extra_http_headers", LogicalType::MAP(LogicalType::VARCHAR, LogicalType::VARCHAR)}};
return options;
}

Expand Down Expand Up @@ -112,8 +117,8 @@ unique_ptr<OAuth2Authorization> OAuth2Authorization::FromAttachOptions(ClientCon
Value token;

static const unordered_set<string> recognized_create_secret_options {
"oauth2_scope", "oauth2_server_uri", "oauth2_grant_type", "token",
"client_id", "client_secret", "access_delegation_mode"};
"oauth2_scope", "oauth2_server_uri", "oauth2_grant_type", "token",
"client_id", "client_secret", "access_delegation_mode", "extra_http_headers"};

for (auto &entry : input.options) {
auto lower_name = StringUtil::Lower(entry.first);
Expand Down Expand Up @@ -153,6 +158,10 @@ unique_ptr<OAuth2Authorization> OAuth2Authorization::FromAttachOptions(ClientCon
input.endpoint = endpoint_from_secret.ToString();
}
token = kv_iceberg_secret.TryGetValue("token");

// Parse extra_http_headers from secret if present
IcebergAuthorization::ParseExtraHttpHeaders(kv_iceberg_secret.TryGetValue("extra_http_headers"),
result->extra_http_headers);
} else {
if (!secret.empty()) {
set<string> option_names;
Expand All @@ -171,6 +180,10 @@ unique_ptr<OAuth2Authorization> OAuth2Authorization::FromAttachOptions(ClientCon
auto new_secret = OAuth2Authorization::CreateCatalogSecretFunction(context, create_secret_input);
auto &kv_iceberg_secret = dynamic_cast<KeyValueSecret &>(*new_secret);
token = kv_iceberg_secret.TryGetValue("token");

// Parse extra_http_headers from inline options if present
IcebergAuthorization::ParseExtraHttpHeaders(kv_iceberg_secret.TryGetValue("extra_http_headers"),
result->extra_http_headers);
}

if (token.IsNull()) {
Expand All @@ -195,7 +208,13 @@ unique_ptr<BaseSecret> OAuth2Authorization::CreateCatalogSecretFunction(ClientCo
auto &param_name = named_param.first;
auto it = accepted_parameters.find(param_name);
if (it != accepted_parameters.end()) {
result->secret_map[param_name] = named_param.second.ToString();
// Special handling for extra_http_headers (MAP type)
if (StringUtil::Lower(param_name) == "extra_http_headers") {
// Store the MAP value directly, will be parsed later when creating authorization
result->secret_map[param_name] = named_param.second;
} else {
result->secret_map[param_name] = named_param.second.ToString();
}
} else {
throw InvalidInputException("Unknown named parameter passed to CreateIRCSecretFunction: %s", param_name);
}
Expand Down Expand Up @@ -263,9 +282,15 @@ unique_ptr<BaseSecret> OAuth2Authorization::CreateCatalogSecretFunction(ClientCo
unique_ptr<HTTPResponse> OAuth2Authorization::Request(RequestType request_type, ClientContext &context,
const IRCEndpointBuilder &endpoint_builder, HTTPHeaders &headers,
const string &data) {
for (auto &entry : extra_http_headers) {
headers.Insert(entry.first, entry.second);
}

// DuckDB's Bearer token always takes precedence over custom Authorization headers
if (!token.empty()) {
headers.Insert("Authorization", StringUtil::Format("Bearer %s", token));
headers["Authorization"] = StringUtil::Format("Bearer %s", token);
}

return APIUtils::Request(request_type, context, endpoint_builder, client, headers, data);
}

Expand Down
10 changes: 10 additions & 0 deletions src/storage/authorization/sigv4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "duckdb/common/file_system.hpp"
#include "duckdb/main/setting_info.hpp"
#include "storage/catalog/iceberg_catalog.hpp"
#include "duckdb/common/types/value.hpp"

namespace duckdb {

Expand Down Expand Up @@ -49,6 +50,9 @@ unique_ptr<IcebergAuthorization> SIGV4Authorization::FromAttachOptions(IcebergAt
throw InvalidInputException("Duplicate 'secret' option detected!");
}
result->secret = StringUtil::Lower(entry.second.ToString());
} else if (lower_name == "extra_http_headers") {
// Parse extra_http_headers if provided directly in attach options
IcebergAuthorization::ParseExtraHttpHeaders(entry.second, result->extra_http_headers);
} else {
remaining_options.emplace(std::move(entry));
}
Expand Down Expand Up @@ -114,6 +118,12 @@ AWSInput SIGV4Authorization::CreateAWSInput(ClientContext &context, const IRCEnd
unique_ptr<HTTPResponse> SIGV4Authorization::Request(RequestType request_type, ClientContext &context,
const IRCEndpointBuilder &endpoint_builder, HTTPHeaders &headers,
const string &data) {
// Note: For SIGV4, custom headers should be added BEFORE signing so they're included in the signature
// Merge extra HTTP headers first
for (auto &entry : extra_http_headers) {
headers.Insert(entry.first, entry.second);
}

auto aws_input = CreateAWSInput(context, endpoint_builder);
return aws_input.Request(request_type, context, client, headers, data);
}
Expand Down
26 changes: 26 additions & 0 deletions src/storage/iceberg_authorization.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "storage/iceberg_authorization.hpp"
#include "api_utils.hpp"
#include "storage/authorization/oauth2.hpp"
#include "duckdb/common/types/value.hpp"

namespace duckdb {

Expand All @@ -24,4 +25,29 @@ IcebergAuthorizationType IcebergAuthorization::TypeFromString(const string &type
StringUtil::Join(accepted_options, ", "));
}

void IcebergAuthorization::ParseExtraHttpHeaders(const Value &headers_value,
unordered_map<string, string> &out_headers) {
if (headers_value.IsNull() || headers_value.type().id() != LogicalTypeId::MAP) {
return;
}

// MAP is internally a LIST<STRUCT(key, value)>
// Each entry in the list is a STRUCT with exactly two fields: key and value
auto &map_entries = MapValue::GetChildren(headers_value);

for (const auto &entry : map_entries) {
if (entry.type().id() != LogicalTypeId::STRUCT) {
continue;
}

auto &struct_children = StructValue::GetChildren(entry);
if (struct_children.size() != 2) {
continue;
}

// struct_children[0] = key, struct_children[1] = value
out_headers[struct_children[0].ToString()] = struct_children[1].ToString();
}
}

} // namespace duckdb
Loading