feat: add support for qdrant foreign data wrapper #156

Closed
wants to merge 21 commits into from
117 changes: 117 additions & 0 deletions docs/qdrant.md
@@ -0,0 +1,117 @@
The Qdrant foreign data wrapper allows you to read data from the [Qdrant vector database](https://qdrant.tech/).

## Preparation

Before you get started, make sure the `wrappers` extension is installed on your database:

```sql
create extension if not exists wrappers;
```

and then create the foreign data wrapper:

```sql
create foreign data wrapper qdrant_wrapper
handler qdrant_fdw_handler
validator qdrant_fdw_validator;
```

### Secure your credentials (optional)

By default, Postgres stores FDW credentials inside `pg_catalog.pg_foreign_server` in plain text. Anyone with access to this table can view these credentials. Wrappers is designed to work with [Vault](https://supabase.com/docs/guides/database/vault), which provides an additional level of security for storing credentials. We recommend using Vault to store your credentials.

```sql
-- Save your Qdrant API key in Vault and retrieve the `key_id`
insert into vault.secrets (name, secret)
values (
'qdrant_api_key',
'YOUR_SECRET'
)
returning key_id;
```

### Connecting to Qdrant

We need to provide Postgres with the credentials to connect to Qdrant, and any additional options. We can do this using the `create server` command:

=== "With Vault"

```sql
create server qdrant_server
foreign data wrapper qdrant_wrapper
options (
cluster_url '<Qdrant cluster URL>', -- Qdrant cluster url, required
api_key_id '<key_ID>' -- The Key ID from above.
);
```

=== "Without Vault"

```sql
create server qdrant_server
foreign data wrapper qdrant_wrapper
options (
cluster_url '<Qdrant cluster URL>', -- Qdrant cluster url, required
api_key '<Qdrant API Key>' -- Qdrant API key, required
);
```

!!! note
The `cluster_url` must include the port. Qdrant's default HTTP port is 6333.

## Creating Foreign Tables

The Qdrant Wrapper supports data reads from Qdrant's [Scroll Points](https://qdrant.github.io/qdrant/redoc/index.html#tag/points/operation/scroll_points) endpoint (*read only*).

| Qdrant | Select | Insert | Update | Delete | Truncate |
| ----------- | :----: | :----: | :----: | :----: | :----: |
| Records | :white_check_mark:| :x: | :x: | :x: | :x: |

For example:

```sql
create foreign table my_foreign_table (
id bigint,
payload jsonb,
vector real[]
)
server qdrant_server
options (
collection_name '<collection name>'
);
```

!!! note
The names and types of the columns in the foreign table must exactly match those shown in the example above.

### Foreign table options

The full list of foreign table options is below:

- `collection_name` - Qdrant collection to fetch data from, required.

## Examples

Some examples of how to use Qdrant foreign tables.

### Basic example

This will create a "foreign table" inside your Postgres database called `qdrant_table`:

```sql
create foreign table qdrant_table (
id bigint,
payload jsonb,
vector real[]
)
server qdrant_server
options (
collection_name 'test_collection'
);
```

You can now fetch your Qdrant data from within your Postgres database:

```sql
select * from qdrant_table;
```
1 change: 1 addition & 0 deletions mkdocs.yaml
@@ -16,6 +16,7 @@ nav:
- Logflare: 'logflare.md'
- S3: 's3.md'
- Stripe: 'stripe.md'
- Qdrant: 'qdrant.md'
- Contributing: 'contributing.md'

theme:
7 changes: 7 additions & 0 deletions supabase-wrappers/src/interface.rs
@@ -45,6 +45,7 @@ pub enum Cell {
Date(Date),
Timestamp(Timestamp),
Json(JsonB),
RealsArray(Vec<f32>),
}

impl Clone for Cell {
@@ -62,6 +63,7 @@ impl Clone for Cell {
Cell::Date(v) => Cell::Date(*v),
Cell::Timestamp(v) => Cell::Timestamp(*v),
Cell::Json(v) => Cell::Json(JsonB(v.0.clone())),
Cell::RealsArray(v) => Cell::RealsArray(v.clone()),
}
}
}
@@ -95,6 +97,7 @@ impl fmt::Display for Cell {
write!(f, "'{}'", ts_cstr.to_str().unwrap())
},
Cell::Json(v) => write!(f, "{:?}", v),
Cell::RealsArray(v) => write!(f, "{:?}", v.as_slice()),
}
}
}
@@ -114,6 +117,7 @@ impl IntoDatum for Cell {
Cell::Date(v) => v.into_datum(),
Cell::Timestamp(v) => v.into_datum(),
Cell::Json(v) => v.into_datum(),
Cell::RealsArray(v) => v.into_datum(),
}
}

@@ -184,6 +188,9 @@ impl FromDatum for Cell {
PgOid::BuiltIn(PgBuiltInOids::JSONBOID) => {
Some(Cell::Json(JsonB::from_datum(datum, false).unwrap()))
}
PgOid::BuiltIn(PgBuiltInOids::FLOAT4ARRAYOID) => Some(Cell::RealsArray(
Vec::<f32>::from_datum(datum, false).unwrap(),
)),
_ => None,
}
}
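
The `Display` arm added above formats the vector with Rust's `Debug` output for a slice. A minimal sketch of what that produces, using a hypothetical reduced enum (`MiniCell` and `render` are illustrative names, not part of the crate, and the `Text` formatting is chosen only for contrast):

```rust
use std::fmt;

// Hypothetical, reduced stand-in for the crate's `Cell` enum. Only the
// variant added in this diff is modeled, plus one text variant for contrast.
#[derive(Clone, Debug, PartialEq)]
enum MiniCell {
    Text(String),
    RealsArray(Vec<f32>),
}

impl fmt::Display for MiniCell {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Illustrative formatting only; not taken from the diff.
            MiniCell::Text(v) => write!(f, "'{}'", v),
            // Same formatting call as the diff: Debug output of the slice.
            MiniCell::RealsArray(v) => write!(f, "{:?}", v.as_slice()),
        }
    }
}

// Render a cell to a String through its Display implementation.
fn render(cell: &MiniCell) -> String {
    cell.to_string()
}
```

Under these assumptions, `render(&MiniCell::RealsArray(vec![1.0, 2.5]))` yields `[1.0, 2.5]`, which is Rust slice syntax rather than a Postgres array literal such as `'{1.0,2.5}'`. That may be worth checking if the `Display` output is ever interpolated into a remote query.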
11 changes: 11 additions & 0 deletions wrappers/.ci/docker-compose.yaml
@@ -86,3 +86,14 @@ services:
timeout: 5s
retries: 3

qdrant:
image: qdrant/qdrant
container_name: qdrant
ports:
- "6333:6333" # http
- "6334:6334" # grpc
healthcheck:
test: curl --fail http://127.0.0.1:6333/readyz || exit 1
interval: 10s
timeout: 5s
retries: 3
5 changes: 3 additions & 2 deletions wrappers/Cargo.toml
@@ -15,7 +15,7 @@ pg13 = ["pgrx/pg13", "pgrx-tests/pg13", "supabase-wrappers/pg13" ]
pg14 = ["pgrx/pg14", "pgrx-tests/pg14", "supabase-wrappers/pg14" ]
pg15 = ["pgrx/pg15", "pgrx-tests/pg15", "supabase-wrappers/pg15" ]
pg16 = ["pgrx/pg16", "pgrx-tests/pg16", "supabase-wrappers/pg16" ]
pg_test = []
pg_test = ["reqwest/blocking"]

helloworld_fdw = []
bigquery_fdw = ["gcp-bigquery-client", "serde_json", "serde", "wiremock", "futures", "yup-oauth2", "thiserror"]
@@ -29,9 +29,10 @@ s3_fdw = [
]
airtable_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "serde", "url", "thiserror"]
logflare_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror", "url"]
qdrant_fdw = ["reqwest", "reqwest-middleware", "reqwest-retry", "serde_json", "thiserror", "http", "tokio", "url", "serde"]

# Does not include helloworld_fdw because of its general uselessness
all_fdws = ["airtable_fdw", "bigquery_fdw", "clickhouse_fdw", "stripe_fdw", "firebase_fdw", "s3_fdw", "logflare_fdw"]
all_fdws = ["airtable_fdw", "bigquery_fdw", "clickhouse_fdw", "stripe_fdw", "firebase_fdw", "s3_fdw", "logflare_fdw", "qdrant_fdw"]

[dependencies]
pgrx = { version = "=0.10.2" }
1 change: 0 additions & 1 deletion wrappers/src/fdw/airtable_fdw/airtable_fdw.rs
@@ -7,7 +7,6 @@ use std::collections::HashMap;
use url::Url;

use supabase_wrappers::prelude::*;
use thiserror::Error;

use super::result::AirtableResponse;
use super::{AirtableFdwError, AirtableFdwResult};
1 change: 1 addition & 0 deletions wrappers/src/fdw/bigquery_fdw/bigquery_fdw.rs
@@ -445,6 +445,7 @@ impl ForeignDataWrapper<BigQueryFdwError> for BigQueryFdw {
Cell::Date(v) => row_json[col_name] = json!(v),
Cell::Timestamp(v) => row_json[col_name] = json!(v),
Cell::Json(v) => row_json[col_name] = json!(v),
Cell::RealsArray(v) => row_json[col_name] = json!(v),
}
}
}
3 changes: 3 additions & 0 deletions wrappers/src/fdw/mod.rs
@@ -21,3 +21,6 @@ mod s3_fdw;

#[cfg(feature = "logflare_fdw")]
mod logflare_fdw;

#[cfg(feature = "qdrant_fdw")]
mod qdrant_fdw;
14 changes: 14 additions & 0 deletions wrappers/src/fdw/qdrant_fdw/README.md
@@ -0,0 +1,14 @@
# Qdrant Foreign Data Wrapper

This is a foreign data wrapper for [Qdrant](https://qdrant.tech). It is developed using [Wrappers](https://github.com/supabase/wrappers) and supports only reading data from the [Scroll Points](https://qdrant.github.io/qdrant/redoc/index.html#tag/points/operation/scroll_points) endpoint.

## Documentation

[https://supabase.github.io/wrappers/qdrant/](https://supabase.github.io/wrappers/qdrant/)


## Changelog

| Version | Date | Notes |
| ------- | ---------- | ---------------------------------------------------- |
| 0.1.16 | 2023-09-26 | Initial version |
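
Because reads go through the Scroll Points endpoint, a full table scan amounts to requesting pages until the server reports no further offset. The sketch below illustrates that loop against an in-memory stand-in rather than a real HTTP call. `Page`, `scroll_page`, and `scroll_all` are hypothetical names, and the sketch assumes numeric point IDs sorted in ascending order, with the offset being the ID to resume from.

```rust
// Hypothetical page type mirroring the shape of a scroll response:
// a batch of point ids plus the id to resume from, if any.
struct Page {
    ids: Vec<u64>,
    next_page_offset: Option<u64>,
}

// In-memory stand-in for the HTTP call; `stored` must be sorted ascending.
// `offset` is the first id to return, not a count of rows to skip.
fn scroll_page(stored: &[u64], offset: Option<u64>, limit: usize) -> Page {
    let start = match offset {
        Some(id) => stored.partition_point(|&p| p < id),
        None => 0,
    };
    let end = (start + limit).min(stored.len());
    Page {
        ids: stored[start..end].to_vec(),
        next_page_offset: stored.get(end).copied(),
    }
}

// Keep requesting pages until no further offset is reported.
fn scroll_all(stored: &[u64], limit: usize) -> Vec<u64> {
    // A zero limit would never make progress, so clamp it to at least one.
    let limit = limit.max(1);
    let mut out = Vec::new();
    let mut offset = None;
    loop {
        let page = scroll_page(stored, offset, limit);
        out.extend(page.ids);
        match page.next_page_offset {
            Some(next) => offset = Some(next),
            None => break,
        }
    }
    out
}
```

With a page size of 2 and stored IDs `[1, 2, 5, 8, 13]`, the loop issues three requests and returns all five IDs in order.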
5 changes: 5 additions & 0 deletions wrappers/src/fdw/qdrant_fdw/mod.rs
@@ -0,0 +1,5 @@
#![allow(clippy::module_inception)]

mod qdrant_client;
mod qdrant_fdw;
mod tests;
121 changes: 121 additions & 0 deletions wrappers/src/fdw/qdrant_fdw/qdrant_client/mod.rs
@@ -0,0 +1,121 @@
use crate::fdw::qdrant_fdw::qdrant_client::points::{
PointsRequestBuilder, PointsResponse, PointsResponseError, ResultPayload,
};
use http::{HeaderMap, HeaderName, HeaderValue};
use pgrx::pg_sys::panic::ErrorReport;
use pgrx::PgSqlErrorCode;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::policies::ExponentialBackoff;
use reqwest_retry::RetryTransientMiddleware;
use supabase_wrappers::prelude::*;
use thiserror::Error;
use tokio::runtime::Runtime;
use url::{ParseError, Url};

pub(crate) mod points;
mod row;
pub(crate) mod rows_iterator;

pub(crate) struct QdrantClient {
cluster_url: Url,
client: ClientWithMiddleware,
runtime: Runtime,
}

impl QdrantClient {
pub(crate) fn new(cluster_url: &str, api_key: &str) -> Result<Self, QdrantClientError> {
Ok(Self {
cluster_url: Url::parse(cluster_url)?,
client: Self::create_client(api_key)?,
runtime: create_async_runtime()?,
})
}

pub(crate) fn fetch_points(
&self,
collection_name: &str,
fetch_payload: bool,
fetch_vector: bool,
limit: Option<u64>,
offset: Option<u64>,
) -> Result<ResultPayload, QdrantClientError> {
let endpoint_url = Self::create_points_endpoint_url(&self.cluster_url, collection_name)?;
self.runtime.block_on(async {
let request = PointsRequestBuilder::new()
.fetch_payload(fetch_payload)
.fetch_vector(fetch_vector)
.limit(limit)
.offset(offset)
.build();
let response = self.client.post(endpoint_url).json(&request).send().await?;
let response = response.error_for_status()?;
let points_response = response.json::<PointsResponse>().await?;
let points = points_response.get_points_result()?;
Ok(points)
})
}

fn create_points_endpoint_url(
cluster_url: &Url,
collection_name: &str,
) -> Result<Url, QdrantClientError> {
// TODO: url encode collection_name
Ok(cluster_url.join(&format!("collections/{collection_name}/points/scroll"))?)
}

fn create_client(api_key: &str) -> Result<ClientWithMiddleware, QdrantClientError> {
let mut headers = HeaderMap::new();

let header_name = HeaderName::from_static("api-key");
let mut api_key_value =
HeaderValue::from_str(api_key).map_err(|_| QdrantClientError::InvalidApiKeyHeader)?;
api_key_value.set_sensitive(true);
headers.insert(header_name, api_key_value);

let client = reqwest::Client::builder()
.default_headers(headers)
.build()?;

let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);

Ok(ClientBuilder::new(client)
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build())
}
}

#[derive(Error, Debug)]
pub(crate) enum QdrantClientError {
#[error("{0}")]
CreateRuntimeError(#[from] CreateRuntimeError),

#[error("failed to parse cluster_url: {0}")]
UrlParseError(#[from] ParseError),

#[error("invalid api_key header")]
InvalidApiKeyHeader,

#[error("reqwest error: {0}")]
ReqwestError(#[from] reqwest::Error),

#[error("reqwest middleware error: {0}")]
ReqwestMiddlewareError(#[from] reqwest_middleware::Error),

#[error("{0}")]
PointsResponseError(#[from] PointsResponseError),
}

impl From<QdrantClientError> for ErrorReport {
fn from(value: QdrantClientError) -> Self {
match value {
QdrantClientError::CreateRuntimeError(e) => e.into(),
QdrantClientError::UrlParseError(_)
| QdrantClientError::InvalidApiKeyHeader
| QdrantClientError::ReqwestError(_)
| QdrantClientError::ReqwestMiddlewareError(_)
| QdrantClientError::PointsResponseError(_) => {
ErrorReport::new(PgSqlErrorCode::ERRCODE_FDW_ERROR, format!("{value}"), "")
}
}
}
}
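
The `TODO: url encode collection_name` in `create_points_endpoint_url` could be addressed by percent-encoding the name before it is interpolated into the path. The following is a minimal standard-library sketch of that idea, not the PR's implementation. `encode_path_segment` and `scroll_path` are hypothetical helpers, and the set of characters left unencoded is the RFC 3986 unreserved set.

```rust
// Percent-encode one URL path segment using only the standard library.
// Unreserved characters (RFC 3986: ALPHA / DIGIT / "-" / "." / "_" / "~")
// pass through unchanged; every other byte becomes %XX.
fn encode_path_segment(segment: &str) -> String {
    let mut out = String::with_capacity(segment.len());
    for byte in segment.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char)
            }
            _ => out.push_str(&format!("%{:02X}", byte)),
        }
    }
    out
}

// Hypothetical helper: build the relative scroll path that would be joined
// onto the cluster URL, with the collection name encoded as one segment.
fn scroll_path(collection_name: &str) -> String {
    format!(
        "collections/{}/points/scroll",
        encode_path_segment(collection_name)
    )
}
```

For example, `scroll_path("my docs/v1")` gives `collections/my%20docs%2Fv1/points/scroll`, so a slash in the name can no longer introduce an extra path segment. A name consisting only of dots (such as `..`) passes through unchanged and would still need separate handling. In the actual crate, the `url` crate's `Url::path_segments_mut` may be a more natural fit than a hand-rolled encoder.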