diff --git a/go/pkg/kv/doc.go b/go/pkg/kv/doc.go deleted file mode 100644 index 10969bdec1..0000000000 --- a/go/pkg/kv/doc.go +++ /dev/null @@ -1,37 +0,0 @@ -// Package kv provides a key-value store abstraction with TTL support and workspace isolation. -// -// The package defines a Store interface that can be implemented by different backends. -// Currently, a MySQL-based implementation is provided in the stores/mysql subpackage. -// -// Key features: -// - Automatic TTL expiration on read operations -// - Workspace-based isolation -// - Cursor-based pagination for listing operations -// - Primary/read-replica database connection support -// - Simple key-value model optimized for performance -// -// Example usage: -// -// import ( -// "github.com/unkeyed/unkey/go/pkg/kv" -// "github.com/unkeyed/unkey/go/pkg/kv/stores/mysql" -// ) -// -// store, err := mysql.NewStore(mysql.Config{ -// PrimaryDSN: "user:pass@tcp(localhost:3306)/db?parseTime=true", -// Logger: logger, -// }) -// if err != nil { -// // handle error -// } -// -// // Set a key with TTL -// ttl := 5 * time.Minute -// err = store.Set(ctx, "user:123", "workspace1", []byte("data"), &ttl) -// -// // Get a key -// data, found, err := store.Get(ctx, "user:123") -// -// // List keys by workspace with cursor pagination -// entries, err := store.ListByWorkspace(ctx, "workspace1", 0, 10) -package kv diff --git a/go/pkg/kv/store.go b/go/pkg/kv/store.go deleted file mode 100644 index 356bb0f924..0000000000 --- a/go/pkg/kv/store.go +++ /dev/null @@ -1,24 +0,0 @@ -package kv - -import ( - "context" - "time" -) - -// Store defines the interface for a key-value store with TTL support -type Store interface { - Get(ctx context.Context, key string) ([]byte, bool, error) - Set(ctx context.Context, key string, workspaceID string, value []byte, ttl *time.Duration) error - Delete(ctx context.Context, key string) error - ListByWorkspace(ctx context.Context, workspaceID string, cursor int64, limit int) ([]KvEntry, error) -} - -// KvEntry represents a key-value entry in the store -type KvEntry struct { - ID int64 - Key string - WorkspaceID string - Value []byte - TTL *int64 - CreatedAt int64 -} diff --git a/go/pkg/kv/stores/mysql/db.go b/go/pkg/kv/stores/mysql/db.go deleted file mode 100644 index 03ec3423c3..0000000000 --- a/go/pkg/kv/stores/mysql/db.go +++ /dev/null @@ -1,31 +0,0 @@ -// Code generated by sqlc. DO NOT EDIT. -// versions: -// sqlc v1.28.0 - -package mysql - -import ( - "context" - "database/sql" -) - -type DBTX interface { - ExecContext(context.Context, string, ...interface{}) (sql.Result, error) - PrepareContext(context.Context, string) (*sql.Stmt, error) - QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) - QueryRowContext(context.Context, string, ...interface{}) *sql.Row -} - -func New(db DBTX) *Queries { - return &Queries{db: db} -} - -type Queries struct { - db DBTX -} - -func (q *Queries) WithTx(tx *sql.Tx) *Queries { - return &Queries{ - db: tx, - } -} diff --git a/go/pkg/kv/stores/mysql/delete.sql.go b/go/pkg/kv/stores/mysql/delete.sql.go deleted file mode 100644 index a907385bea..0000000000 --- a/go/pkg/kv/stores/mysql/delete.sql.go +++ /dev/null @@ -1,19 +0,0 @@ -// Code generated by sqlc. DO NOT EDIT. -// versions: -// sqlc v1.28.0 -// source: delete.sql - -package mysql - -import ( - "context" -) - -const delete = `-- name: Delete :exec -DELETE FROM kv WHERE ` + "`" + `key` + "`" + ` = ? -` - -func (q *Queries) Delete(ctx context.Context, key string) error { - _, err := q.db.ExecContext(ctx, delete, key) - return err -} diff --git a/go/pkg/kv/stores/mysql/delete_expired.sql.go b/go/pkg/kv/stores/mysql/delete_expired.sql.go deleted file mode 100644 index 0229f04753..0000000000 --- a/go/pkg/kv/stores/mysql/delete_expired.sql.go +++ /dev/null @@ -1,25 +0,0 @@ -// Code generated by sqlc. DO NOT EDIT. -// versions: -// sqlc v1.28.0 -// source: delete_expired.sql - -package mysql - -import ( - "context" - "database/sql" -) - -const deleteExpired = `-- name: DeleteExpired :exec -DELETE FROM kv WHERE ` + "`" + `key` + "`" + ` = ? AND ttl IS NOT NULL AND ttl <= ? -` - -type DeleteExpiredParams struct { - Key string - Ttl sql.NullInt64 -} - -func (q *Queries) DeleteExpired(ctx context.Context, arg DeleteExpiredParams) error { - _, err := q.db.ExecContext(ctx, deleteExpired, arg.Key, arg.Ttl) - return err -} diff --git a/go/pkg/kv/stores/mysql/doc.go b/go/pkg/kv/stores/mysql/doc.go deleted file mode 100644 index d339b61132..0000000000 --- a/go/pkg/kv/stores/mysql/doc.go +++ /dev/null @@ -1,28 +0,0 @@ -// Package mysql provides a MySQL-backed implementation of the kv.Store interface. -// -// This implementation uses sqlc-generated code for type-safe database operations -// and supports both primary and read-replica database connections for optimal -// performance in production environments. -// -// The store automatically handles: -// - TTL expiration by deleting expired keys on read -// - Cursor-based pagination using created_at timestamps -// - Connection routing (reads to replica, writes to primary) -// - Auto-incrementing primary keys for efficient storage -// -// Database schema (inspired by GitHub's approach): -// -// CREATE TABLE kv ( -// id BIGINT(20) NOT NULL AUTO_INCREMENT, -// `key` VARCHAR(255) NOT NULL, -// workspace_id VARCHAR(255) NOT NULL, -// value BLOB NOT NULL, -// ttl BIGINT NULL, -// created_at BIGINT NOT NULL, -// -// PRIMARY KEY (id), -// UNIQUE KEY unique_key (`key`), -// INDEX idx_workspace_id (workspace_id), -// INDEX idx_ttl (ttl) -// ); -package mysql diff --git a/go/pkg/kv/stores/mysql/get.sql.go b/go/pkg/kv/stores/mysql/get.sql.go deleted file mode 100644 index e36a0c9be3..0000000000 --- a/go/pkg/kv/stores/mysql/get.sql.go +++ /dev/null @@ -1,35 +0,0 @@ -// Code generated by sqlc. DO NOT EDIT. -// versions: -// sqlc v1.28.0 -// source: get.sql - -package mysql - -import ( - "context" - "database/sql" -) - -const get = `-- name: Get :one -SELECT id, workspace_id, ` + "`" + `key` + "`" + `, value, ttl, created_at FROM kv -WHERE ` + "`" + `key` + "`" + ` = ? AND (ttl IS NULL OR ttl > ?) -` - -type GetParams struct { - Key string - Ttl sql.NullInt64 -} - -func (q *Queries) Get(ctx context.Context, arg GetParams) (Kv, error) { - row := q.db.QueryRowContext(ctx, get, arg.Key, arg.Ttl) - var i Kv - err := row.Scan( - &i.ID, - &i.WorkspaceID, - &i.Key, - &i.Value, - &i.Ttl, - &i.CreatedAt, - ) - return i, err -} diff --git a/go/pkg/kv/stores/mysql/list_by_workspace.sql.go b/go/pkg/kv/stores/mysql/list_by_workspace.sql.go deleted file mode 100644 index 1c1ad699bd..0000000000 --- a/go/pkg/kv/stores/mysql/list_by_workspace.sql.go +++ /dev/null @@ -1,62 +0,0 @@ -// Code generated by sqlc. DO NOT EDIT. -// versions: -// sqlc v1.28.0 -// source: list_by_workspace.sql - -package mysql - -import ( - "context" - "database/sql" -) - -const listByWorkspace = `-- name: ListByWorkspace :many -SELECT id, workspace_id, ` + "`" + `key` + "`" + `, value, ttl, created_at FROM kv -WHERE workspace_id = ? -AND (ttl IS NULL OR ttl > ?) -AND id > ? -ORDER BY id ASC -LIMIT ? -` - -type ListByWorkspaceParams struct { - WorkspaceID string - Ttl sql.NullInt64 - ID int64 - Limit int32 -} - -func (q *Queries) ListByWorkspace(ctx context.Context, arg ListByWorkspaceParams) ([]Kv, error) { - rows, err := q.db.QueryContext(ctx, listByWorkspace, - arg.WorkspaceID, - arg.Ttl, - arg.ID, - arg.Limit, - ) - if err != nil { - return nil, err - } - defer rows.Close() - var items []Kv - for rows.Next() { - var i Kv - if err := rows.Scan( - &i.ID, - &i.WorkspaceID, - &i.Key, - &i.Value, - &i.Ttl, - &i.CreatedAt, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} diff --git a/go/pkg/kv/stores/mysql/models.go b/go/pkg/kv/stores/mysql/models.go deleted file mode 100644 index ff7954c3ff..0000000000 --- a/go/pkg/kv/stores/mysql/models.go +++ /dev/null @@ -1,18 +0,0 @@ -// Code generated by sqlc. DO NOT EDIT. -// versions: -// sqlc v1.28.0 - -package mysql - -import ( - "database/sql" -) - -type Kv struct { - ID int64 - WorkspaceID string - Key string - Value []byte - Ttl sql.NullInt64 - CreatedAt int64 -} diff --git a/go/pkg/kv/stores/mysql/queries/delete.sql b/go/pkg/kv/stores/mysql/queries/delete.sql deleted file mode 100644 index 0887f03d4b..0000000000 --- a/go/pkg/kv/stores/mysql/queries/delete.sql +++ /dev/null @@ -1,2 +0,0 @@ --- name: Delete :exec -DELETE FROM kv WHERE `key` = ?; \ No newline at end of file diff --git a/go/pkg/kv/stores/mysql/queries/delete_expired.sql b/go/pkg/kv/stores/mysql/queries/delete_expired.sql deleted file mode 100644 index 9c681a23d2..0000000000 --- a/go/pkg/kv/stores/mysql/queries/delete_expired.sql +++ /dev/null @@ -1,2 +0,0 @@ --- name: DeleteExpired :exec -DELETE FROM kv WHERE `key` = ? AND ttl IS NOT NULL AND ttl <= ?; \ No newline at end of file diff --git a/go/pkg/kv/stores/mysql/queries/get.sql b/go/pkg/kv/stores/mysql/queries/get.sql deleted file mode 100644 index fe0c680b18..0000000000 --- a/go/pkg/kv/stores/mysql/queries/get.sql +++ /dev/null @@ -1,3 +0,0 @@ --- name: Get :one -SELECT * FROM kv -WHERE `key` = ? AND (ttl IS NULL OR ttl > ?); \ No newline at end of file diff --git a/go/pkg/kv/stores/mysql/queries/list_by_workspace.sql b/go/pkg/kv/stores/mysql/queries/list_by_workspace.sql deleted file mode 100644 index 295c50e3a7..0000000000 --- a/go/pkg/kv/stores/mysql/queries/list_by_workspace.sql +++ /dev/null @@ -1,7 +0,0 @@ --- name: ListByWorkspace :many -SELECT * FROM kv -WHERE workspace_id = ? -AND (ttl IS NULL OR ttl > ?) -AND id > ? -ORDER BY id ASC -LIMIT ?; \ No newline at end of file diff --git a/go/pkg/kv/stores/mysql/queries/set.sql b/go/pkg/kv/stores/mysql/queries/set.sql deleted file mode 100644 index 733a7b121d..0000000000 --- a/go/pkg/kv/stores/mysql/queries/set.sql +++ /dev/null @@ -1,7 +0,0 @@ --- name: Set :exec -INSERT INTO kv (`key`, workspace_id, value, ttl, created_at) -VALUES (?, ?, ?, ?, ?) -ON DUPLICATE KEY UPDATE - value = VALUES(value), - ttl = VALUES(ttl), - created_at = VALUES(created_at); \ No newline at end of file diff --git a/go/pkg/kv/stores/mysql/schema.sql b/go/pkg/kv/stores/mysql/schema.sql deleted file mode 100644 index cf8e5f1c74..0000000000 --- a/go/pkg/kv/stores/mysql/schema.sql +++ /dev/null @@ -1,13 +0,0 @@ -CREATE TABLE kv ( - id BIGINT(20) NOT NULL AUTO_INCREMENT, - workspace_id VARCHAR(255) NOT NULL, - `key` VARCHAR(255) NOT NULL, - value BLOB NOT NULL, - ttl BIGINT NULL, - created_at BIGINT NOT NULL, - - PRIMARY KEY (id), - UNIQUE KEY unique_key (`key`), - INDEX idx_workspace_id (workspace_id), - INDEX idx_ttl (ttl) -); diff --git a/go/pkg/kv/stores/mysql/set.sql.go b/go/pkg/kv/stores/mysql/set.sql.go deleted file mode 100644 index a58a489437..0000000000 --- a/go/pkg/kv/stores/mysql/set.sql.go +++ /dev/null @@ -1,39 +0,0 @@ -// Code generated by sqlc. DO NOT EDIT. -// versions: -// sqlc v1.28.0 -// source: set.sql - -package mysql - -import ( - "context" - "database/sql" -) - -const set = `-- name: Set :exec -INSERT INTO kv (` + "`" + `key` + "`" + `, workspace_id, value, ttl, created_at) -VALUES (?, ?, ?, ?, ?) -ON DUPLICATE KEY UPDATE - value = VALUES(value), - ttl = VALUES(ttl), - created_at = VALUES(created_at) -` - -type SetParams struct { - Key string - WorkspaceID string - Value []byte - Ttl sql.NullInt64 - CreatedAt int64 -} - -func (q *Queries) Set(ctx context.Context, arg SetParams) error { - _, err := q.db.ExecContext(ctx, set, - arg.Key, - arg.WorkspaceID, - arg.Value, - arg.Ttl, - arg.CreatedAt, - ) - return err -} diff --git a/go/pkg/kv/stores/mysql/sqlc.json b/go/pkg/kv/stores/mysql/sqlc.json deleted file mode 100644 index 07dffd8cb0..0000000000 --- a/go/pkg/kv/stores/mysql/sqlc.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "version": "2", - "sql": [ - { - "engine": "mysql", - "queries": "queries/", - "schema": "schema.sql", - "gen": { - "go": { - "package": "mysql", - "out": "." - } - } - } - ] -} diff --git a/go/pkg/kv/stores/mysql/store.go b/go/pkg/kv/stores/mysql/store.go deleted file mode 100644 index a1774a0ae8..0000000000 --- a/go/pkg/kv/stores/mysql/store.go +++ /dev/null @@ -1,213 +0,0 @@ -package mysql - -import ( - "context" - "database/sql" - "fmt" - "strings" - "time" - - _ "github.com/go-sql-driver/mysql" - - "github.com/unkeyed/unkey/go/pkg/fault" - "github.com/unkeyed/unkey/go/pkg/kv" - "github.com/unkeyed/unkey/go/pkg/otel/logging" - "github.com/unkeyed/unkey/go/pkg/retry" -) - -// Config defines the parameters needed to establish database connections. -type Config struct { - // The primary DSN for your database. This must support both reads and writes. - PrimaryDSN string - - // The readonly replica will be used for most read queries. - // If omitted, the primary is used. - ReadOnlyDSN string - - // Logger for database-related operations - Logger logging.Logger -} - -// Store implements the kv.Store interface using MySQL -type Store struct { - primary *sql.DB - readonly *sql.DB - queries *Queries - logger logging.Logger -} - -func open(dsn string, logger logging.Logger) (db *sql.DB, err error) { - if !strings.Contains(dsn, "parseTime=true") { - return nil, fault.New("DSN must contain parseTime=true") - } - - err = retry.New( - retry.Attempts(3), - retry.Backoff(func(n int) time.Duration { - return time.Duration(n) * time.Second - }), - ).Do(func() error { - db, err = sql.Open("mysql", dsn) - if err != nil { - logger.Info("mysql not ready yet, retrying...", "error", err.Error()) - } - return err - }) - - return db, err -} - -// NewStore creates a new MySQL-backed KV store -func NewStore(config Config) (kv.Store, error) { - primary, err := open(config.PrimaryDSN, config.Logger) - if err != nil { - return nil, fault.Wrap(err, fault.Internal("cannot open primary database")) - } - - readonly := primary // Default to primary for reads - if config.ReadOnlyDSN != "" { - readonly, err = open(config.ReadOnlyDSN, config.Logger) - if err != nil { - return nil, fault.Wrap(err, fault.Internal("cannot open readonly database")) - } - config.Logger.Info("kv store configured with separate read replica") - } else { - config.Logger.Info("kv store configured without separate read replica, using primary for reads") - } - - return &Store{ - primary: primary, - readonly: readonly, - queries: New(primary), - logger: config.Logger, - }, nil -} - -func (s *Store) Get(ctx context.Context, key string) ([]byte, bool, error) { - now := time.Now().UnixMilli() - - // Use readonly connection for Get operations - queries := New(s.readonly) - row, err := queries.Get(ctx, GetParams{ - Key: key, - Ttl: sql.NullInt64{Int64: now, Valid: true}, - }) - - if err != nil { - if err == sql.ErrNoRows { - return nil, false, nil - } - return nil, false, fmt.Errorf("failed to get key %s: %w", key, err) - } - - // Check if TTL is expired and delete if so - if row.Ttl.Valid && row.Ttl.Int64 <= now { - // Delete the expired key using primary connection - err = s.queries.DeleteExpired(ctx, DeleteExpiredParams{ - Key: key, - Ttl: sql.NullInt64{Int64: now, Valid: true}, - }) - if err != nil { - s.logger.Warn("failed to delete expired key", "key", key, "error", err.Error()) - } - return nil, false, nil - } - - return row.Value, true, nil -} - -func (s *Store) Set(ctx context.Context, key string, workspaceID string, value []byte, ttl *time.Duration) error { - now := time.Now().UnixMilli() - - var ttlValue sql.NullInt64 - if ttl != nil { - ttlMs := now + ttl.Milliseconds() - ttlValue = sql.NullInt64{Int64: ttlMs, Valid: true} - } - - err := s.queries.Set(ctx, SetParams{ - Key: key, - WorkspaceID: workspaceID, - Value: value, - Ttl: ttlValue, - CreatedAt: now, - }) - - if err != nil { - return fmt.Errorf("failed to set key %s: %w", key, err) - } - - return nil -} - -func (s *Store) Delete(ctx context.Context, key string) error { - err := s.queries.Delete(ctx, key) - if err != nil { - return fmt.Errorf("failed to delete key %s: %w", key, err) - } - return nil -} - -func (s *Store) ListByWorkspace(ctx context.Context, workspaceID string, cursor int64, limit int) ([]kv.KvEntry, error) { - now := time.Now().UnixMilli() - - // cursor = 0 means start from the beginning (oldest records first) - - // Use readonly connection for List operations - queries := New(s.readonly) - rows, err := queries.ListByWorkspace(ctx, ListByWorkspaceParams{ - WorkspaceID: workspaceID, - Ttl: sql.NullInt64{Int64: now, Valid: true}, - ID: cursor, - Limit: int32(limit), - }) - - if err != nil { - return nil, fmt.Errorf("failed to list by workspace: %w", err) - } - - return s.convertRows(rows) -} - -func (s *Store) convertRows(rows []Kv) ([]kv.KvEntry, error) { - entries := make([]kv.KvEntry, len(rows)) - - for i, row := range rows { - var ttl *int64 - if row.Ttl.Valid { - ttl = &row.Ttl.Int64 - } - - entries[i] = kv.KvEntry{ - ID: row.ID, - Key: row.Key, - WorkspaceID: row.WorkspaceID, - Value: row.Value, - TTL: ttl, - CreatedAt: row.CreatedAt, - } - } - - return entries, nil -} - -// Close closes the database connections -func (s *Store) Close() error { - var errs []error - - if err := s.primary.Close(); err != nil { - errs = append(errs, fmt.Errorf("failed to close primary connection: %w", err)) - } - - if s.readonly != s.primary { - if err := s.readonly.Close(); err != nil { - errs = append(errs, fmt.Errorf("failed to close readonly connection: %w", err)) - } - } - - if len(errs) > 0 { - return fmt.Errorf("errors closing connections: %v", errs) - } - - return nil -}