Skip to content

Commit

Permalink
Work with Gerbil v0.18-35-g137852c7
Browse files Browse the repository at this point in the history
Add postgres support.
  • Loading branch information
fare committed Nov 29, 2023
1 parent 05836d6 commit 8a5e40d
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 59 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
manifest.ss
version.ss
build-deps
run/
2 changes: 1 addition & 1 deletion build.ss
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
;; gxpkg install github.com/fare/gerbil-crypto
;; gxpkg install github.com/fare/gerbil-poo

(import :std/misc/process :clan/building :clan/multicall)
(import :std/cli/multicall :std/misc/process :clan/building)
(init-build-environment!
name: "Gerbil-persist"
deps: '("clan" "clan/poo" "clan/crypto"))
Expand Down
22 changes: 11 additions & 11 deletions db-queue.ss
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,24 @@
:clan/poo/object :clan/poo/mop :clan/poo/io :clan/poo/number :clan/poo/type
./db)

(def PairNatNat (Pair Nat Nat)) ;; used to represent start and length of queue
(def PairUIntUInt (Pair UInt UInt)) ;; used to represent start and length of queue

;; : Bytes <- Bytes Nat
;; : Bytes <- Bytes UInt
(def (db-indexed-key db-key index)
(u8vector-append db-key (bytes<- Nat index)))
(u8vector-append db-key (bytes<- UInt index)))

;; A DB Queue
(defstruct DbQueue
(mx ;; : Mutex
key ;; : Bytes ;; db-key
start ;; : Nat ;; Next index to dequeue
length ;; : Nat ;; Number of items in the queue. end = start + length
start ;; : UInt ;; Next index to dequeue
length ;; : UInt ;; Number of items in the queue. end = start + length
manager)) ;; : Thread ;; or should we have a condition variable instead?

;; Assumes we already have a lock of the queue object, and the tx is open
;; : <- DbQueue TX
(def (%DbQueue-update q tx)
(db-put! (DbQueue-key q) (bytes<- PairNatNat (cons (DbQueue-start q) (DbQueue-length q))) tx))
(db-put! (DbQueue-key q) (bytes<- PairUIntUInt (cons (DbQueue-start q) (DbQueue-length q))) tx))

;; Internal: wake up the manager of a queue that isn't empty anymore.
;; : <- DbQueue Any
Expand Down Expand Up @@ -78,9 +78,9 @@
(zero? (DbQueue-length q)))

;; Get the state of a DbQueue from the database, given its db-key
;; : (Pair Nat Nat) <- Bytes TX
;; : (Pair UInt UInt) <- Bytes TX
(def (DbQueue-state db-key tx)
(cond ((db-get db-key tx) => (cut <-bytes PairNatNat <>))
(cond ((db-get db-key tx) => (cut <-bytes PairUIntUInt <>))
(else '(0 . 0)))) ;; owl of you

;; Restore a DbQueue from its persisted state, or start a new one if none is present.
Expand All @@ -105,8 +105,8 @@

;; DB Committed Queue: only dequeue things that were fully committed
(defstruct (DbCommittedQueue DbQueue)
(committed-end ;; : Nat ;; Next index to not dequeue yet
pending)) ;; : (Dequeue (Tuple Nat Completion Nat)) ;; dequeue of batch-id, batch-completion, end
(committed-end ;; : UInt ;; Next index to not dequeue yet
pending)) ;; : (Dequeue (Tuple UInt Completion UInt)) ;; dequeue of batch-id, batch-completion, end

(def (%DbCommittedQueue-update-pending q tx) ;; the end was increased, so add to pending
(def c (DbTransaction-connection tx))
Expand Down Expand Up @@ -141,7 +141,7 @@
(>= (DbQueue-start q) (DbCommittedQueue-committed-end q)))

;; Restore a DbQueue from its persisted state, or start a new one if none is present.
;; : DbCommittedQueue <- Any Bytes (<- Nat Bytes TX)
;; : DbCommittedQueue <- Any Bytes (<- UInt Bytes TX)
(def (DbCommittedQueue-restore name db-key processor)
(def q (match (with-tx (tx) (DbQueue-state db-key tx))
([start . length]
Expand Down
35 changes: 35 additions & 0 deletions kvs-postgres.ss
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
;;;; Key Value Store Interface for Postgres

(import
:std/db/dbi
:std/db/postgresql
:std/db/postgresql-driver
:std/iter
:std/misc/path
:std/sugar
:clan/path-config
:clan/persist/kvs
:clan/persist/kvs-sql)

(defstruct (KvsPostgres KvsSql)
(begin-tx-stmt commit-tx-stmt abort-tx-stmt
read-stmt write-stmt delete-stmt)
constructor: :init!)


(defmethod {:init! KvsPostgres}
(lambda (self . args)
(def connection (apply sql-connect args))
(sql-eval connection (string-append
"CREATE TABLE IF NOT EXISTS kvs ( "
"key BLOB, "
"value BLOB NOT NULL, "
"PRIMARY KEY (key)) ;"))
(struct-instance-init!
self connection
(sql-prepare connection "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ WRITE")
(sql-prepare connection "COMMIT TRANSACTION")
(sql-prepare connection "ROLLBACK TRANSACTION")
(sql-prepare connection "SELECT value FROM kvs WHERE key = ?")
(sql-prepare connection "INSERT INTO kvs (key, value) VALUES (?, ?) ON CONFLICT DO UPDATE SET value = excluded.value")
(sql-prepare connection "DELETE FROM kvs WHERE key = ?"))))
46 changes: 46 additions & 0 deletions kvs-sql.ss
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
;;;; Key Value Store for SQL in general (to be specialized by SQLite, PostgreSQL, etc.)

(import
:std/db/dbi
:std/iter
:std/misc/path
:std/sugar
:clan/path-config
:clan/persist/kvs)

(export #t)

(defstruct (KvsSql Kvs)
(begin-tx-stmt commit-tx-stmt abort-tx-stmt
read-stmt write-stmt delete-stmt)
constructor: :init!)

(defmethod {:init! KvsSql}
(lambda (self connection begin-tx-stmt commit-tx-stmt abort-tx-stmt read-stmt write-stmt delete-stmt)
(struct-instance-init! self connection begin-tx-stmt commit-tx-stmt abort-tx-stmt read-stmt write-stmt delete-stmt)))

(defmethod {begin-transaction KvsSql} (lambda (self) (sql-exec (KvsSql-begin-tx-stmt self))))
(defmethod {abort-transaction KvsSql} (lambda (self) (sql-exec (KvsSql-abort-tx-stmt self))))
(defmethod {commit-transaction KvsSql} (lambda (self) (sql-exec (KvsSql-commit-tx-stmt self))))

(defrule (with-statement (var stmt args ...) body ...)
(let ((var stmt))
(try {bind var args ...} body ...
(finally (sql-reset/clear stmt)))))

(defmethod {read-key KvsSql}
(lambda (K key)
(with-statement (s (KvsSql-read-stmt K) key)
(match {query-fetch s}
((eq? #!void) (values {query-row s} #t))
((eq? iter-end) (values #f #f))))))

(defmethod {write-key KvsSql}
(lambda (K k v)
(with-statement (s (KvsSql-write-stmt K) k v)
{exec s})))

(defmethod {delete-key KvsSql}
(lambda (K k)
(with-statement (s (KvsSql-delete-stmt K) k)
{exec s})))
40 changes: 3 additions & 37 deletions kvs-sqlite.ss
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
;;;; Key Value Store Interface
;;;; Key Value Store Interface for sqlite

(import
:std/db/dbi
Expand All @@ -8,45 +8,11 @@
:std/misc/path
:std/sugar
:clan/path-config
:clan/persist/kvs)
:clan/persist/kvs
:clan/persist/kvs-sql)

(export #t)

(defstruct (KvsSql Kvs)
(begin-tx-stmt commit-tx-stmt abort-tx-stmt
read-stmt write-stmt delete-stmt)
constructor: :init!)

(defmethod {:init! KvsSql}
(lambda (self connection begin-tx-stmt commit-tx-stmt abort-tx-stmt read-stmt write-stmt delete-stmt)
(struct-instance-init! self connection begin-tx-stmt commit-tx-stmt abort-tx-stmt read-stmt write-stmt delete-stmt)))

(defmethod {begin-transaction KvsSql} (lambda (self) (sql-exec (KvsSql-begin-tx-stmt self))))
(defmethod {abort-transaction KvsSql} (lambda (self) (sql-exec (KvsSql-abort-tx-stmt self))))
(defmethod {commit-transaction KvsSql} (lambda (self) (sql-exec (KvsSql-commit-tx-stmt self))))

(defrule (with-statement (var stmt args ...) body ...)
(let ((var stmt))
(try {bind var args ...} body ...
(finally (sql-reset/clear stmt)))))

(defmethod {read-key KvsSql}
(lambda (K key)
(with-statement (s (KvsSql-read-stmt K) key)
(match {query-fetch s}
((eq? #!void) (values {query-row s} #t))
((eq? iter-end) (values #f #f))))))

(defmethod {write-key KvsSql}
(lambda (K k v)
(with-statement (s (KvsSql-write-stmt K) k v)
{exec s})))

(defmethod {delete-key KvsSql}
(lambda (K k)
(with-statement (s (KvsSql-delete-stmt K) k)
{exec s})))

(defstruct (KvsSqlite KvsSql)
(begin-tx-stmt commit-tx-stmt abort-tx-stmt
read-stmt write-stmt delete-stmt)
Expand Down
2 changes: 1 addition & 1 deletion merkle-trie.ss
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
(raise-type-error "Digest doesn't match: " D trie-digest D digest up)))))
;; TODO: support negative proofs
(_ (raise-type-error "No leaf" sub up)))))
(def (MerkleTrie Key: (Key Nat) Height: (Height Nat)
(def (MerkleTrie Key: (Key UInt) Height: (Height UInt)
Value: (Value Any) Digesting: (.digesting keccak-addressing))
{(:: @ [MerkleTrie.]) Key Height Value .digesting
sexp: `(MerkleTrie Key: ,(.@ Key sexp) Height: ,(.@ Height sexp) Value: ,(.@ Value sexp)
Expand Down
10 changes: 5 additions & 5 deletions t/db-test.ss
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@
(def test-val (random-integer 1000000000))
(def key (string->bytes "test-key1"))
(with-db-connection (c-path "testdb")
(with-committed-tx (tx) (db-put! key (nat->u8vector test-val) tx))
(check-equal? (u8vector->nat (with-tx (tx) (db-get key tx))) test-val)
(with-committed-tx (tx) (db-put! key (nat->u8vector (1+ test-val)) tx))
(check-equal? (u8vector->nat (with-tx (tx) (db-get key tx))) (1+ test-val))))
(with-committed-tx (tx) (db-put! key (uint->u8vector test-val) tx))
(check-equal? (u8vector->uint (with-tx (tx) (db-get key tx))) test-val)
(with-committed-tx (tx) (db-put! key (uint->u8vector (1+ test-val)) tx))
(check-equal? (u8vector->uint (with-tx (tx) (db-get key tx))) (1+ test-val))))
(test-case "test-trivial-testdb-commits"
(def test-base (* 65536 (random-integer 65536)))
(def n-workers 200)
(def failures [])
(def (test-commit i)
(def key (nat->u8vector (+ i 65536)))
(def key (uint->u8vector (+ i 65536)))
(def val (string->bytes (number->string (+ test-base i))))
(try
(match (current-db-transaction)
Expand Down
4 changes: 3 additions & 1 deletion t/kvs-sqlite-test.ss
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
(export kvs-sqlite-test)

(import
:std/sugar
:std/test
:clan/base :clan/path-config
:clan/base
:clan/path-config
:clan/persist/kvs-sqlite
./kvs-test)

Expand Down
9 changes: 6 additions & 3 deletions t/persist-test.ss
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
(test-suite "test suite for persist/persist"
(test-case "define-persistent-variable"
(define-persistent-variable my-string String "my-string" "foo")
(define-persistent-variable my-num UInt256 "my-num" 42)
(with-db-connection (c "testdb")
;; Delete the persistent variables, in case the testdb is dirty
(with-tx (tx)
Expand All @@ -18,15 +19,17 @@
(check (my-string) => "foo")
(set! (my-string) "bar")
(check (my-string) => "bar")
(define-persistent-variable my-num UInt256 "my-num" 42)
(check (my-num) => 42)
(set! (my-num) 69)
(check (my-num) => 69))
(with-db-connection (c "testdb")
(check (my-string) => "bar")
(set! (my-string) "baz")
(check (my-string) => "baz")
(define-persistent-variable my-num UInt256 "my-num" 42)
(check (my-num) => 69)
(set! (my-num) 100)
(check (my-num) => 100)))))
(check (my-num) => 100))
;; Reset for next tests
(with-db-connection (c "testdb")
(set! (my-string) "foo")
(set! (my-num) 42)))))

0 comments on commit 8a5e40d

Please sign in to comment.