From 9376574b0f58e0cc0407ed21acc6c4e6a6775d3d Mon Sep 17 00:00:00 2001 From: Brian McGee Date: Thu, 12 Oct 2023 11:22:30 +0100 Subject: [PATCH] pathinfo (#1) * feat: implement path info Signed-off-by: Brian McGee --- .github/workflows/ci.yaml | 6 +- README.md | 7 +- go.mod | 29 ++++-- go.sum | 60 +++++++---- internal/cli/store/run.go | 17 ++- nix/dev/tvix.nix | 3 +- nix/packages.nix | 2 +- pkg/blob/grpc.go | 16 +-- pkg/blob/grpc_test.go | 18 ++-- pkg/directory/grpc.go | 78 ++++++-------- pkg/pathinfo/config.go | 93 +++++++++++++++++ pkg/pathinfo/grpc.go | 210 ++++++++++++++++++++++++++++++++++++++ pkg/store/cache.go | 6 ++ pkg/store/cdc.go | 49 +++++++++ pkg/store/cdc_test.go | 53 ++++++++++ pkg/store/nats.go | 65 +++++++++++- pkg/store/types.go | 3 + pkg/util/iterator.go | 6 ++ 18 files changed, 616 insertions(+), 105 deletions(-) create mode 100644 pkg/pathinfo/config.go create mode 100644 pkg/pathinfo/grpc.go create mode 100644 pkg/util/iterator.go diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 5437c3f..99502b6 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -21,8 +21,8 @@ jobs: uses: actions/setup-go@v4 with: go-version: ${{ matrix.go }} - - name: go test -race -v ./... - run: go test -race -v ./... + - name: go test -v ./... + run: go test -v ./... coverage: runs-on: 'ubuntu-latest' @@ -34,7 +34,7 @@ jobs: with: go-version: '1.20' - name: Run coverage - run: go test -race -covermode=atomic -coverprofile=coverage.out -v ./... + run: go test -covermode=atomic -coverprofile=coverage.out -v ./... - name: Convert coverage.out to coverage.lcov uses: jandelgado/gcov2lcov-action@v1 - name: Coveralls diff --git a/README.md b/README.md index a673d15..ba81fca 100644 --- a/README.md +++ b/README.md @@ -12,10 +12,11 @@ It's primary focus is implementing [TVIX](https://cs.tvl.fyi/depot/-/tree/tvix) ## Roadmap -- [ ] Store +- [x] Store - [x] Blob Service - - [ ] Path Info Service - - [ ] Directory Service + - [x] Path Info Service + - [x] Directory Service +- [ ] Improve test coverage ## Requirements diff --git a/go.mod b/go.mod index fdeb109..cf7e11a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,8 @@ module github.com/brianmcgee/nvix go 1.20 require ( - code.tvl.fyi/tvix/castore/protos v0.0.0-20230922125121-72355662d742 + code.tvl.fyi/tvix/castore/protos v0.0.0-20231009220507-d6e0c5ab9bb7 + code.tvl.fyi/tvix/store/protos v0.0.0-20231010202247-8ec3506856aa github.com/SaveTheRbtz/fastcdc-go v0.3.0 github.com/alecthomas/kong v0.8.0 github.com/charmbracelet/log v0.2.5 @@ -12,14 +13,16 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf github.com/juju/errors v1.0.0 - github.com/nats-io/nats-server/v2 v2.9.22 + github.com/multiformats/go-multihash v0.2.1 + github.com/nats-io/nats-server/v2 v2.10.2 github.com/nats-io/nats.go v1.30.2 github.com/nats-io/nuid v1.0.1 + github.com/nix-community/go-nix v0.0.0-20231009143713-ebca3299475b github.com/prometheus/client_golang v1.17.0 github.com/stretchr/testify v1.8.4 github.com/ztrue/shutdown v0.1.1 - golang.org/x/sync v0.3.0 - google.golang.org/grpc v1.58.2 + golang.org/x/sync v0.4.0 + google.golang.org/grpc v1.58.3 google.golang.org/protobuf v1.31.0 lukechampine.com/blake3 v1.2.1 ) @@ -28,7 +31,7 @@ require ( github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/charmbracelet/lipgloss v0.8.0 // indirect + github.com/charmbracelet/lipgloss v0.9.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logfmt/logfmt v0.6.0 // indirect github.com/klauspost/compress v1.17.0 // indirect @@ -38,21 +41,25 @@ require ( github.com/mattn/go-runewidth v0.0.15 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/minio/highwayhash v1.0.2 // indirect + github.com/minio/sha256-simd v1.0.0 // indirect + github.com/mr-tron/base58 v1.2.0 // indirect github.com/muesli/reflow v0.3.0 // indirect github.com/muesli/termenv v0.15.2 // indirect + github.com/multiformats/go-varint v0.0.6 // indirect github.com/nats-io/jwt/v2 v2.5.2 // indirect github.com/nats-io/nkeys v0.4.5 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect + github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.44.0 // indirect - github.com/prometheus/procfs v0.11.1 // indirect + github.com/prometheus/procfs v0.12.0 // indirect github.com/rivo/uniseg v0.4.4 // indirect - golang.org/x/crypto v0.13.0 // indirect - golang.org/x/net v0.15.0 // indirect - golang.org/x/sys v0.12.0 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect + golang.org/x/crypto v0.14.0 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect golang.org/x/time v0.3.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 3686ec2..36d3e17 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ -code.tvl.fyi/tvix/castore/protos v0.0.0-20230922125121-72355662d742 h1:x7LsxggggaN3acnCMNDO5LZLAV+A+rZ+R8TXzr+Lgsk= -code.tvl.fyi/tvix/castore/protos v0.0.0-20230922125121-72355662d742/go.mod h1:Ejhyvc0dJUWQMxtJxddfFuAF5N8IKIO94q5CP4czY8Y= +code.tvl.fyi/tvix/castore/protos v0.0.0-20231009220507-d6e0c5ab9bb7 h1:gX2LWo/QHwGZK2QsDap9Lx1GrKLPX6mfgeNbGK3mwrU= +code.tvl.fyi/tvix/castore/protos v0.0.0-20231009220507-d6e0c5ab9bb7/go.mod h1:hj0y8RPthqn1QPj8u2jFe2vzH7NouUoclrwo1/CSbuc= +code.tvl.fyi/tvix/store/protos v0.0.0-20231010202247-8ec3506856aa h1:FwMglyEtXebo5rXgtLkxs+y11UVxAekGboFotdbfHcg= +code.tvl.fyi/tvix/store/protos v0.0.0-20231010202247-8ec3506856aa/go.mod h1:RmijF3bfElwtZpNkBtW66QEj/jldGNu+W2HlgZro7lw= github.com/SaveTheRbtz/fastcdc-go v0.3.0 h1:JdHvLlnijDuisYIwpRDcHZEjbxvCqtEmJ3gf35VJBgA= github.com/SaveTheRbtz/fastcdc-go v0.3.0/go.mod h1:2kMKqvBv1h9wCaUfETqsVkSESsCiFhp4YyEHyz7/SfE= github.com/alecthomas/assert/v2 v2.1.0 h1:tbredtNcQnoSd3QBhQWI7QZ3XHOVkw1Moklp2ojoH/0= @@ -12,8 +14,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/charmbracelet/lipgloss v0.8.0 h1:IS00fk4XAHcf8uZKc3eHeMUTCxUH6NkaTrdyCQk84RU= -github.com/charmbracelet/lipgloss v0.8.0/go.mod h1:p4eYUZZJ/0oXTuCQKFF8mqyKCz0ja6y+7DniDDw5KKU= +github.com/charmbracelet/lipgloss v0.9.0 h1:BHIM7U4vX77xGEld8GrTKspBMtSv7j0wxPCH73nrdxE= +github.com/charmbracelet/lipgloss v0.9.0/go.mod h1:h8KDyaivONasw1Bhb4nWiKlk4P1wHPly+3+3v6EFMmA= github.com/charmbracelet/log v0.2.5 h1:1yVvyKCKVV639RR4LIq1iy1Cs1AKxuNO+Hx2LJtk7Wc= github.com/charmbracelet/log v0.2.5/go.mod h1:nQGK8tvc4pS9cvVEH/pWJiZ50eUq1aoXUOjGpXvdD0k= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -37,6 +39,7 @@ github.com/juju/errors v1.0.0 h1:yiq7kjCLll1BiaRuNY53MGI0+EQ3rF6GB+wvboZDefM= github.com/juju/errors v1.0.0/go.mod h1:B5x9thDqx0wIMH3+aLIMP9HjItInYWObRovoCFM5Qe8= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -52,60 +55,73 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zk github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= +github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= +github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= +github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/muesli/reflow v0.3.0 h1:IFsN6K9NfGtjeggFP+68I4chLZV2yIKsXJFNZ+eWh6s= github.com/muesli/reflow v0.3.0/go.mod h1:pbwTDkVPibjO2kyvBQRBxTWEEGDGq0FlB1BIKtnHY/8= github.com/muesli/termenv v0.15.2 h1:GohcuySI0QmI3wN8Ok9PtKGkgkFIk7y6Vpb5PvrY+Wo= github.com/muesli/termenv v0.15.2/go.mod h1:Epx+iuz8sNs7mNKhxzH4fWXGNpZwUaJKRS1noLXviQ8= +github.com/multiformats/go-multihash v0.2.1 h1:aem8ZT0VA2nCHHk7bPJ1BjUbHNciqZC/d16Vve9l108= +github.com/multiformats/go-multihash v0.2.1/go.mod h1:WxoMcYG85AZVQUyRyo9s4wULvW5qrI9vb2Lt6evduFc= +github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2W/KhfNY= +github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats-server/v2 v2.9.22 h1:rzl88pqWFFrU4G00ed+JnY+uGHSLZ+3jrxDnJxzKwGA= -github.com/nats-io/nats-server/v2 v2.9.22/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq7My2IgFmnykc4C0= +github.com/nats-io/nats-server/v2 v2.10.2 h1:2o/OOyc/dxeMCQtrF1V/9er0SU0A3LKhDlv/+rqreBM= +github.com/nats-io/nats-server/v2 v2.10.2/go.mod h1:lzrskZ/4gyMAh+/66cCd+q74c6v7muBypzfWhP/MAaM= github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY= github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM= github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/nix-community/go-nix v0.0.0-20231009143713-ebca3299475b h1:AWEKOdDO3JnHApQDOmONEKLXbMCQJhYJJfJpiWB9VGI= +github.com/nix-community/go-nix v0.0.0-20231009143713-ebca3299475b/go.mod h1:hHM9UK2zOCjvmiLgeaW4LVbOW/vBaRWFJGzfi31/slQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= -github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM= -github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= -github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= -github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/ztrue/shutdown v0.1.1 h1:GKR2ye2OSQlq1GNVE/s2NbrIMsFdmL+NdR6z6t1k+Tg= github.com/ztrue/shutdown v0.1.1/go.mod h1:hcMWcM2SwIsQk7Wb49aYme4tX66x6iLzs07w1OYAQLw= -golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= -golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= -golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= -golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 h1:6GQBEOdGkX6MMTLT9V+TjtIRZCw9VPD5Z+yHY9wMgS0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97/go.mod h1:v7nGkzlmW8P3n/bKmWBn2WpBjpOEx8Q6gMueudAmKfY= -google.golang.org/grpc v1.58.2 h1:SXUpjxeVF3FKrTYQI4f4KvbGD5u2xccdYdurwowix5I= -google.golang.org/grpc v1.58.2/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c h1:jHkCUWkseRf+W+edG5hMzr/Uh1xkDREY4caybAq4dpY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c/go.mod h1:4cYg8o5yUbm77w8ZX00LhMVNl/YVBFJRYWDc0uYWMs0= +google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ= +google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= diff --git a/internal/cli/store/run.go b/internal/cli/store/run.go index df93b62..f46c620 100644 --- a/internal/cli/store/run.go +++ b/internal/cli/store/run.go @@ -6,6 +6,9 @@ import ( "runtime/debug" "syscall" + tvpb "code.tvl.fyi/tvix/store/protos" + "github.com/brianmcgee/nvix/pkg/pathinfo" + "github.com/brianmcgee/nvix/pkg/directory" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery" @@ -46,16 +49,21 @@ func (r *Run) Run() error { log.Fatalf("failed to connect to nats: %v", err) } - blobService, err := blob.NewService(conn) + blobServer, err := blob.NewServer(conn) if err != nil { log.Fatalf("failed to create blob service: %v", err) } - directoryService, err := directory.NewService(conn) + directoryServer, err := directory.NewServer(conn) if err != nil { log.Fatalf("failed to create directory service: %v", err) } + pathInfoServer, err := pathinfo.NewServer(conn, blobServer, directoryServer) + if err != nil { + log.Fatalf("failed to create path info service: %v", err) + } + // setup metrics srvMetrics := grpcprom.NewServerMetrics( grpcprom.WithServerHandlingTimeHistogram( @@ -95,8 +103,9 @@ func (r *Run) Run() error { } grpcServer := grpc.NewServer(opts...) - pb.RegisterBlobServiceServer(grpcServer, blobService) - pb.RegisterDirectoryServiceServer(grpcServer, directoryService) + pb.RegisterBlobServiceServer(grpcServer, blobServer) + pb.RegisterDirectoryServiceServer(grpcServer, directoryServer) + tvpb.RegisterPathInfoServiceServer(grpcServer, pathInfoServer) srvMetrics.InitializeMetrics(grpcServer) diff --git a/nix/dev/tvix.nix b/nix/dev/tvix.nix index b55dfed..26c7997 100644 --- a/nix/dev/tvix.nix +++ b/nix/dev/tvix.nix @@ -21,12 +21,11 @@ } { name = "PATH_INFO_SERVICE_ADDR"; - eval = "sled://$TVIX_HOME/store/path-info"; + value = "grpc+http://localhost:5000"; } { name = "DIRECTORY_SERVICE_ADDR"; value = "grpc+http://localhost:5000"; - # eval = "sled://$TVIX_HOME/store/directory"; } { name = "TVIX_MOUNT_DIR"; diff --git a/nix/packages.nix b/nix/packages.nix index 76ebcf5..7238acc 100644 --- a/nix/packages.nix +++ b/nix/packages.nix @@ -15,7 +15,7 @@ version = "0.0.1+dev"; src = ../.; - vendorSha256 = "sha256-43ufDyfbkjtV6T5yM9Xm29Xb2au/34w5FNiTZu/n2WQ="; + vendorSha256 = "sha256-BswbommN6CfUiWk/DyP0LPUm3HmOyXtfZsSAAVMx5NY="; ldflags = [ "-X 'build.Name=${pname}'" diff --git a/pkg/blob/grpc.go b/pkg/blob/grpc.go index 1c1ee6c..4216669 100644 --- a/pkg/blob/grpc.go +++ b/pkg/blob/grpc.go @@ -14,7 +14,7 @@ import ( "google.golang.org/grpc/status" ) -func NewService(conn *nats.Conn) (capb.BlobServiceServer, error) { +func NewServer(conn *nats.Conn) (*Server, error) { js, err := conn.JetStream() if err != nil { return nil, errors.Annotate(err, "failed to create a JetStream context") @@ -28,7 +28,7 @@ func NewService(conn *nats.Conn) (capb.BlobServiceServer, error) { return nil, errors.Annotate(err, "failed to create memory based stream") } - return &service{ + return &Server{ conn: conn, store: &store.CdcStore{ Meta: NewMetaStore(conn), @@ -37,14 +37,14 @@ func NewService(conn *nats.Conn) (capb.BlobServiceServer, error) { }, nil } -type service struct { +type Server struct { capb.UnimplementedBlobServiceServer conn *nats.Conn store *store.CdcStore } -func (s *service) Stat(ctx context.Context, request *capb.StatBlobRequest) (*capb.BlobMeta, error) { +func (s *Server) Stat(ctx context.Context, request *capb.StatBlobRequest) (*capb.BlobMeta, error) { l := log.WithPrefix("blob.stat") l.Debug("executing", "digest", store.Digest(request.GetDigest())) @@ -60,7 +60,7 @@ func (s *service) Stat(ctx context.Context, request *capb.StatBlobRequest) (*cap return &capb.BlobMeta{}, nil } -func (s *service) Read(request *capb.ReadBlobRequest, server capb.BlobService_ReadServer) error { +func (s *Server) Read(request *capb.ReadBlobRequest, server capb.BlobService_ReadServer) error { l := log.WithPrefix("blob.read") ctx, cancel := context.WithCancel(server.Context()) @@ -100,7 +100,11 @@ func (s *service) Read(request *capb.ReadBlobRequest, server capb.BlobService_Re return nil } -func (s *service) Put(server capb.BlobService_PutServer) (err error) { +func (s *Server) GetByDigest(digest store.Digest, ctx context.Context) (io.ReadCloser, error) { + return s.store.Get(digest, ctx) +} + +func (s *Server) Put(server capb.BlobService_PutServer) (err error) { l := log.WithPrefix("blob.put") ctx, cancel := context.WithCancel(server.Context()) diff --git a/pkg/blob/grpc_test.go b/pkg/blob/grpc_test.go index 6556079..61d3761 100644 --- a/pkg/blob/grpc_test.go +++ b/pkg/blob/grpc_test.go @@ -3,8 +3,8 @@ package blob import ( "bytes" "context" + "crypto/rand" "io" - "math/rand" "net" "testing" @@ -43,7 +43,7 @@ var sizes = []bytesize.ByteSize{ func blobServer(s *server.Server, t test.TestingT) (*grpc.Server, net.Listener) { t.Helper() - blobService, err := NewService(test.NatsConn(t, s)) + blobService, err := NewServer(test.NatsConn(t, s)) if err != nil { t.Fatalf("failed to create blob service: %v", err) } @@ -83,9 +83,11 @@ func BenchmarkBlobService_Put(b *testing.B) { b.ResetTimer() b.RunParallel(func(p *testing.PB) { - rng := rand.New(rand.NewSource(1)) data := make([]byte, size) - rng.Read(data) + _, err := rand.Read(data) + if err != nil { + b.Fatal(err) + } r := bytes.NewReader(data) @@ -133,9 +135,11 @@ func BenchmarkBlobService_Read(b *testing.B) { for _, size := range sizes { size := size - rng := rand.New(rand.NewSource(1)) data := make([]byte, size) - rng.Read(data) + _, err := rand.Read(data) + if err != nil { + b.Fatal(err) + } r := bytes.NewReader(data) @@ -210,7 +214,7 @@ func TestBlobService_Put(t *testing.T) { conn := test.GrpcConn(lis, t) client := pb.NewBlobServiceClient(conn) - payload := make([]byte, 100*1024*1024) + payload := make([]byte, 16*1024*1024) _, err := rand.Read(payload) if err != nil { t.Fatalf("failed to generate random bytes: %v", err) diff --git a/pkg/directory/grpc.go b/pkg/directory/grpc.go index 2bbab1e..46bf950 100644 --- a/pkg/directory/grpc.go +++ b/pkg/directory/grpc.go @@ -18,7 +18,7 @@ import ( "github.com/nats-io/nats.go" ) -func NewService(conn *nats.Conn) (capb.DirectoryServiceServer, error) { +func NewServer(conn *nats.Conn) (*Server, error) { js, err := conn.JetStream() if err != nil { return nil, errors.Annotate(err, "failed to create a JetStream context") @@ -32,24 +32,19 @@ func NewService(conn *nats.Conn) (capb.DirectoryServiceServer, error) { return nil, errors.Annotate(err, "failed to create memory based stream") } - return &service{ + return &Server{ conn: conn, store: NewDirectoryStore(conn), }, nil } -type service struct { +type Server struct { capb.UnimplementedDirectoryServiceServer conn *nats.Conn store store.Store } -// Get retrieves a stream of Directory messages, by using the lookup -// parameters in GetDirectoryRequest. -// Keep in mind multiple DirectoryNodes in different parts of the graph might -// have the same digest if they have the same underlying contents, -// so sending subsequent ones can be omitted. -func (s *service) Get(req *capb.GetDirectoryRequest, server capb.DirectoryService_GetServer) error { +func (s *Server) Get(req *capb.GetDirectoryRequest, server capb.DirectoryService_GetServer) error { l := log.WithPrefix("directory.get") rootDigest := store.Digest(req.GetDigest()) @@ -58,32 +53,9 @@ func (s *service) Get(req *capb.GetDirectoryRequest, server capb.DirectoryServic ctx, cancel := context.WithCancel(server.Context()) defer cancel() - fetch := func(digest store.Digest) (*capb.Directory, error) { - reader, err := s.store.Get(digest.String(), ctx) - if err != nil { - l.Errorf("failure: %v", err) - return nil, status.Errorf(codes.NotFound, "digest not found: %v", digest) - } - defer func() { - _ = reader.Close() - }() - - b, err := io.ReadAll(reader) - if err != nil { - l.Errorf("failure: %v", err) - return nil, status.Error(codes.Internal, "failed to read directory entry from store") - } - var dir capb.Directory - if err = proto.Unmarshal(b, &dir); err != nil { - l.Errorf("failure: %v", err) - return nil, status.Error(codes.Internal, "failed to unmarshal directory entry from store") - } - return &dir, nil - } - // todo handle get by what - rootDirectory, err := fetch(rootDigest) + rootDirectory, err := s.GetByDigest(rootDigest, ctx) if err != nil { l.Errorf("failure: %v", err) return status.Errorf(codes.NotFound, "directory not found: %v", rootDigest) @@ -94,7 +66,7 @@ func (s *service) Get(req *capb.GetDirectoryRequest, server capb.DirectoryServic iterateDirs := func(directory *capb.Directory) error { for _, dir := range directory.Directories { digest := store.Digest(dir.Digest) - if d, err := fetch(digest); err != nil { + if d, err := s.GetByDigest(digest, ctx); err != nil { return err } else if req.Recursive { dirs = append(dirs, d) @@ -118,16 +90,32 @@ func (s *service) Get(req *capb.GetDirectoryRequest, server capb.DirectoryServic return nil } -// Put uploads a graph of Directory messages. -// Individual Directory messages need to be send in an order walking up -// from the leaves to the root - a Directory message can only refer to -// Directory messages previously sent in the same stream. -// Keep in mind multiple DirectoryNodes in different parts of the graph might -// have the same digest if they have the same underlying contents, -// so sending subsequent ones can be omitted. -// We might add a separate method, allowing to send partial graphs at a later -// time, if requiring to send the full graph turns out to be a problem. -func (s *service) Put(server capb.DirectoryService_PutServer) error { +func (s *Server) GetByDigest(digest store.Digest, ctx context.Context) (*capb.Directory, error) { + l := log.WithPrefix("directory.getByDigest") + + reader, err := s.store.Get(digest.String(), ctx) + if err != nil { + l.Errorf("failure: %v", err) + return nil, status.Errorf(codes.NotFound, "digest not found: %v", digest) + } + defer func() { + _ = reader.Close() + }() + + b, err := io.ReadAll(reader) + if err != nil { + l.Errorf("failure: %v", err) + return nil, status.Error(codes.Internal, "failed to read directory entry from store") + } + var dir capb.Directory + if err = proto.Unmarshal(b, &dir); err != nil { + l.Errorf("failure: %v", err) + return nil, status.Error(codes.Internal, "failed to unmarshal directory entry from store") + } + return &dir, nil +} + +func (s *Server) Put(server capb.DirectoryService_PutServer) error { l := log.WithPrefix("directory.put") ctx, cancel := context.WithCancel(server.Context()) @@ -204,6 +192,6 @@ func (s *service) Put(server capb.DirectoryService_PutServer) error { l.Error("failed to send put response", "err", err) } - l.Debug("finished") + l.Debug("finished", "digest", store.Digest(rootDigest).String()) return nil } diff --git a/pkg/pathinfo/config.go b/pkg/pathinfo/config.go new file mode 100644 index 0000000..7929fc3 --- /dev/null +++ b/pkg/pathinfo/config.go @@ -0,0 +1,93 @@ +package pathinfo + +import ( + "github.com/brianmcgee/nvix/pkg/store" + "github.com/brianmcgee/nvix/pkg/subject" + "github.com/nats-io/nats.go" +) + +var ( + DiskBasedStreamConfig = nats.StreamConfig{ + Name: "path_info_store", + Subjects: []string{ + subject.WithPrefix("STORE.PATH_INFO.*"), + subject.WithPrefix("STORE.PATH_INFO_OUT_IDX.*"), + }, + Replicas: 1, + Discard: nats.DiscardOld, + MaxMsgsPerSubject: 1, + Storage: nats.FileStorage, + AllowRollup: true, + AllowDirect: true, + Compression: nats.S2Compression, + // automatically publish into the cache topic + RePublish: &nats.RePublish{ + Source: subject.WithPrefix("STORE.*.*"), + Destination: subject.WithPrefix("CACHE.{{wildcard(1)}}.{{wildcard(2)}}"), + }, + } + + MemoryBasedStreamConfig = nats.StreamConfig{ + Name: "path_info_cache", + Subjects: []string{ + subject.WithPrefix("CACHE.PATH_INFO.*"), + subject.WithPrefix("CACHE.PATH_INFO_OUT_IDX.*"), + }, + Replicas: 1, + Discard: nats.DiscardOld, + MaxMsgsPerSubject: 1, + Storage: nats.MemoryStorage, + AllowRollup: true, + AllowDirect: true, + } +) + +func NewPathInfoStore(conn *nats.Conn) store.Store { + diskPrefix := DiskBasedStreamConfig.Subjects[0] + diskPrefix = diskPrefix[:len(diskPrefix)-2] + + memoryPrefix := MemoryBasedStreamConfig.Subjects[0] + memoryPrefix = memoryPrefix[:len(memoryPrefix)-2] + + disk := &store.NatsStore{ + Conn: conn, + StreamConfig: &DiskBasedStreamConfig, + SubjectPrefix: diskPrefix, + } + + memory := &store.NatsStore{ + Conn: conn, + StreamConfig: &MemoryBasedStreamConfig, + SubjectPrefix: memoryPrefix, + } + + return &store.CachingStore{ + Disk: disk, + Memory: memory, + } +} + +func NewPathInfoOutIdxStore(conn *nats.Conn) store.Store { + diskPrefix := DiskBasedStreamConfig.Subjects[1] + diskPrefix = diskPrefix[:len(diskPrefix)-2] + + memoryPrefix := MemoryBasedStreamConfig.Subjects[1] + memoryPrefix = memoryPrefix[:len(memoryPrefix)-2] + + disk := &store.NatsStore{ + Conn: conn, + StreamConfig: &DiskBasedStreamConfig, + SubjectPrefix: diskPrefix, + } + + memory := &store.NatsStore{ + Conn: conn, + StreamConfig: &MemoryBasedStreamConfig, + SubjectPrefix: memoryPrefix, + } + + return &store.CachingStore{ + Disk: disk, + Memory: memory, + } +} diff --git a/pkg/pathinfo/grpc.go b/pkg/pathinfo/grpc.go new file mode 100644 index 0000000..594cff0 --- /dev/null +++ b/pkg/pathinfo/grpc.go @@ -0,0 +1,210 @@ +package pathinfo + +import ( + "bytes" + "context" + "io" + + capb "code.tvl.fyi/tvix/castore/protos" + tvpb "code.tvl.fyi/tvix/store/protos" + + "github.com/brianmcgee/nvix/pkg/blob" + "github.com/brianmcgee/nvix/pkg/directory" + "github.com/brianmcgee/nvix/pkg/store" + "github.com/charmbracelet/log" + "github.com/golang/protobuf/proto" + "github.com/juju/errors" + multihash "github.com/multiformats/go-multihash/core" + "github.com/nats-io/nats.go" + "github.com/nix-community/go-nix/pkg/hash" + "github.com/nix-community/go-nix/pkg/nixbase32" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func NewServer(conn *nats.Conn, blob *blob.Server, directory *directory.Server) (*Service, error) { + js, err := conn.JetStream() + if err != nil { + return nil, errors.Annotate(err, "failed to create a JetStream context") + } + + if _, err := js.AddStream(&DiskBasedStreamConfig); err != nil { + return nil, errors.Annotate(err, "failed to create disk based stream") + } + + if _, err := js.AddStream(&MemoryBasedStreamConfig); err != nil { + return nil, errors.Annotate(err, "failed to create memory based stream") + } + + return &Service{ + conn: conn, + store: NewPathInfoStore(conn), + outIdx: NewPathInfoOutIdxStore(conn), + blob: blob, + directory: directory, + }, nil +} + +type Service struct { + tvpb.UnimplementedPathInfoServiceServer + conn *nats.Conn + store store.Store + outIdx store.Store + blob *blob.Server + directory *directory.Server +} + +func pathInfoDigest(node *capb.Node) (*store.Digest, error) { + var digest store.Digest + if node.GetDirectory() != nil { + digest = store.Digest(node.GetDirectory().Digest) + // todo check directory exists + } else if node.GetFile() != nil { + digest = store.Digest(node.GetFile().Digest) + } else if node.GetSymlink() != nil { + digest = store.Digest(node.GetSymlink().Target) + } else { + return nil, status.Error(codes.Internal, "unexpected node type") + } + + return &digest, nil +} + +func (s *Service) Get(ctx context.Context, req *tvpb.GetPathInfoRequest) (*tvpb.PathInfo, error) { + // only supporting get by output hash + l := log.WithPrefix("path_info.get") + + outHash := nixbase32.EncodeToString(req.GetByOutputHash()) + l.Debug("executing", "outHash", outHash) + + reader, err := s.outIdx.Get(outHash, ctx) + if err != nil { + return nil, err + } + defer func() { + _ = reader.Close() + }() + + digest, err := io.ReadAll(reader) + if err != nil { + return nil, err + } + + reader, err = s.store.Get(store.Digest(digest).String(), ctx) + if err != nil { + return nil, status.Error(codes.Internal, "failed to get path info") + } + + b, err := io.ReadAll(reader) + if err != nil { + return nil, status.Error(codes.Internal, "failed to read path info") + } + + var pathInfo tvpb.PathInfo + if err = proto.Unmarshal(b, &pathInfo); err != nil { + return nil, status.Error(codes.Internal, "failed to unmarshal path info") + } + + return &pathInfo, nil +} + +func (s *Service) Put(ctx context.Context, pathInfo *tvpb.PathInfo) (*tvpb.PathInfo, error) { + digest, err := pathInfoDigest(pathInfo.Node) + if err != nil { + return nil, err + } + + l := log.WithPrefix("path_info.put") + l.Debug("executing", "digest", digest.String()) + + b, err := proto.Marshal(pathInfo) + if err != nil { + return nil, status.Error(codes.Internal, "failed to marshal path info") + } + + err = s.store.Put(digest.String(), io.NopCloser(bytes.NewReader(b)), ctx) + if err != nil { + return nil, status.Error(codes.Internal, "failed to put path info") + } + + err = s.outIdx.Put( + nixbase32.EncodeToString(pathInfo.Narinfo.NarSha256), + io.NopCloser(bytes.NewReader(digest[:])), + ctx, + ) + + // no signatures to add so just return the same path info + return pathInfo, err +} + +func (s *Service) CalculateNAR(ctx context.Context, node *capb.Node) (*tvpb.CalculateNARResponse, error) { + r, w := io.Pipe() + + go func() { + err := tvpb.Export(w, node, + func(digest []byte) (*capb.Directory, error) { + return s.directory.GetByDigest(store.Digest(digest), ctx) + }, func(digest []byte) (io.ReadCloser, error) { + return s.blob.GetByDigest(store.Digest(digest), ctx) + }) + _ = w.CloseWithError(err) + }() + + hasher, err := hash.New(multihash.SHA2_256) + if err != nil { + return nil, status.Error(codes.Internal, "failed to create a SHA256 hasher") + } + + if _, err = io.Copy(hasher, r); err != nil { + return nil, status.Error(codes.Internal, "failed to write NAR to SHA256 hasher") + } + + return &tvpb.CalculateNARResponse{ + NarSize: hasher.BytesWritten(), + NarSha256: hasher.Digest(), + }, nil +} + +func (s *Service) List(_ *tvpb.ListPathInfoRequest, server tvpb.PathInfoService_ListServer) error { + // only list all currently to implement + ctx, cancel := context.WithCancel(server.Context()) + defer cancel() + + l := log.WithPrefix("path_info.list") + l.Debug("executing") + + iter, err := s.store.List(ctx) + if err != nil { + return status.Error(codes.Internal, "failed to create store iterator") + } + + defer func() { + _ = iter.Close() + }() + + for { + reader, err := iter.Next() + if err == io.EOF { + break + } else if err != nil { + return status.Error(codes.Internal, "failed to read next path info") + } + + b, err := io.ReadAll(reader) + _ = reader.Close() + if err != nil { + return status.Error(codes.Internal, "failed to read path info") + } + + var pathInfo tvpb.PathInfo + if err = proto.Unmarshal(b, &pathInfo); err != nil { + return status.Error(codes.Internal, "failed to unmarshal path info") + } + + if err = server.Send(&pathInfo); err != nil { + return err + } + } + + return nil +} diff --git a/pkg/store/cache.go b/pkg/store/cache.go index 609cc36..028cc7d 100644 --- a/pkg/store/cache.go +++ b/pkg/store/cache.go @@ -5,6 +5,8 @@ import ( "context" "io" + "github.com/brianmcgee/nvix/pkg/util" + "github.com/nats-io/nats.go" "github.com/charmbracelet/log" @@ -15,6 +17,10 @@ type CachingStore struct { Memory Store } +func (c *CachingStore) List(ctx context.Context) (util.Iterator[io.ReadCloser], error) { + return c.Disk.List(ctx) +} + func (c *CachingStore) Stat(key string, ctx context.Context) (ok bool, err error) { ok, err = c.Memory.Stat(key, ctx) if err == nil { diff --git a/pkg/store/cdc.go b/pkg/store/cdc.go index 6db94ed..298f360 100644 --- a/pkg/store/cdc.go +++ b/pkg/store/cdc.go @@ -5,6 +5,7 @@ import ( "context" "io" + "github.com/brianmcgee/nvix/pkg/util" "golang.org/x/sync/errgroup" "github.com/nats-io/nats.go" @@ -53,6 +54,18 @@ func (c *CdcStore) getMeta(key string, ctx context.Context) (*pb.BlobMeta, error return &meta, nil } +func (c *CdcStore) List(ctx context.Context) (util.Iterator[io.ReadCloser], error) { + metaIterator, err := c.Meta.List(ctx) + if err != nil { + return nil, err + } + return &blobIterator{ + ctx: ctx, + chunks: c.Chunks, + metaIterator: metaIterator, + }, nil +} + func (c *CdcStore) Stat(digest Digest, ctx context.Context) (ok bool, err error) { return c.Meta.Stat(digest.String(), ctx) } @@ -156,6 +169,42 @@ func (c *CdcStore) Delete(digest Digest, ctx context.Context) error { return nil } +type blobIterator struct { + ctx context.Context + chunks Store + metaIterator util.Iterator[io.ReadCloser] +} + +func (b *blobIterator) Next() (io.ReadCloser, error) { + metaReader, err := b.metaIterator.Next() + if err != nil { + return nil, err + } + defer func() { + _ = metaReader.Close() + }() + + metaBytes, err := io.ReadAll(metaReader) + if err != nil { + return nil, errors.Annotate(err, "failed to read blob metadata") + } + + var meta pb.BlobMeta + if err = proto.Unmarshal(metaBytes, &meta); err != nil { + return nil, err + } + + return &blobReader{ + blob: &meta, + store: b.chunks, + ctx: b.ctx, + }, nil +} + +func (b *blobIterator) Close() error { + return b.metaIterator.Close() +} + type blobReader struct { blob *pb.BlobMeta store Store diff --git a/pkg/store/cdc_test.go b/pkg/store/cdc_test.go index 4ebd2fc..1a4e1b8 100644 --- a/pkg/store/cdc_test.go +++ b/pkg/store/cdc_test.go @@ -119,6 +119,59 @@ func newCdcStore(t test.TestingT, conn *nats.Conn, js nats.JetStreamContext) *Cd } } +func TestCdcStore_List(t *testing.T) { + as := assert.New(t) + + s := test.RunBasicJetStreamServer(t) + defer test.ShutdownJSServerAndRemoveStorage(t, s) + + conn, js := test.JsClient(t, s) + + js, err := conn.JetStream() + if err != nil { + t.Fatal(err) + } + + store := newCdcStore(t, conn, js) + rng := rand.New(rand.NewSource(1)) + + digests := make(map[string]bool) + + writeCount := 10 + for i := 0; i < writeCount; i++ { + data := make([]byte, 8*1024*1024) + rng.Read(data) + + digest, err := store.Put(io.NopCloser(bytes.NewReader(data)), context.Background()) + as.Nil(err) + + digests[digest.String()] = true + } + + iter, err := store.List(context.Background()) + as.Nil(err) + + readCount := 0 + for { + reader, err := iter.Next() + if err == io.EOF { + break + } + + hasher := blake3.New(32, nil) + as.Nil(err) + + _, err = io.Copy(hasher, reader) + as.Nil(err) + + blobDigest := Digest(hasher.Sum(nil)) + as.True(digests[blobDigest.String()]) + readCount += 1 + } + + as.Equal(writeCount, readCount) +} + func TestCdcStore_PutAndGet(t *testing.T) { as := assert.New(t) diff --git a/pkg/store/nats.go b/pkg/store/nats.go index 6da7a2d..76256dd 100644 --- a/pkg/store/nats.go +++ b/pkg/store/nats.go @@ -4,6 +4,9 @@ import ( "bytes" "context" "io" + "time" + + "github.com/brianmcgee/nvix/pkg/util" "github.com/juju/errors" "github.com/nats-io/nats.go" @@ -96,6 +99,23 @@ func (n *NatsStore) Delete(key string, ctx context.Context) error { }) } +func (n *NatsStore) List(ctx context.Context) (util.Iterator[io.ReadCloser], error) { + js, err := n.js(ctx) + if err != nil { + return nil, err + } + sub, err := js.SubscribeSync(n.subject("*"), nats.DeliverAll()) + if err != nil { + return nil, err + } + return &natsIterator{ + ctx: ctx, + sub: sub, + fetchTimeout: 5 * time.Second, + numPending: 1, + }, nil +} + func (n *NatsStore) js(_ context.Context) (nats.JetStreamContext, error) { // todo potentially extract js from ctx js, err := n.Conn.JetStream(nats.DirectGet()) @@ -105,6 +125,49 @@ func (n *NatsStore) js(_ context.Context) (nats.JetStreamContext, error) { return js, err } +type natsIterator struct { + ctx context.Context + sub *nats.Subscription + fetchTimeout time.Duration + + numPending uint64 +} + +func (n *natsIterator) Next() (io.ReadCloser, error) { + if n.numPending == 0 { + // we have caught up + return nil, io.EOF + } + + select { + case <-n.ctx.Done(): + return nil, n.ctx.Err() + default: + ctx, cancel := context.WithTimeout(n.ctx, n.fetchTimeout) + defer cancel() + + msg, err := n.sub.NextMsgWithContext(ctx) + if err != nil { + return nil, err + } + + meta, err := msg.Metadata() + if err != nil { + return nil, errors.Annotate(err, "failed to get msg metadata") + } + + n.numPending = meta.NumPending + + return &natsMsgReader{ + reader: bytes.NewReader(msg.Data), + }, nil + } +} + +func (n *natsIterator) Close() error { + return n.sub.Unsubscribe() +} + type natsMsgReader struct { js nats.JetStreamContext stream string @@ -115,7 +178,7 @@ type natsMsgReader struct { } func (r *natsMsgReader) Read(p []byte) (n int, err error) { - if r.msg == nil { + if r.reader == nil && r.msg == nil { r.msg, err = r.js.GetLastMsg(r.stream, r.subject) if err == nats.ErrMsgNotFound { return 0, ErrKeyNotFound diff --git a/pkg/store/types.go b/pkg/store/types.go index 73ce012..9006d4c 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -5,6 +5,8 @@ import ( "encoding/base64" "io" + "github.com/brianmcgee/nvix/pkg/util" + "github.com/nats-io/nats.go" "github.com/juju/errors" @@ -24,6 +26,7 @@ type Store interface { Get(key string, ctx context.Context) (io.ReadCloser, error) Put(key string, reader io.ReadCloser, ctx context.Context) error PutAsync(key string, reader io.ReadCloser, ctx context.Context) (nats.PubAckFuture, error) + List(ctx context.Context) (util.Iterator[io.ReadCloser], error) Stat(key string, ctx context.Context) (bool, error) Delete(key string, ctx context.Context) error } diff --git a/pkg/util/iterator.go b/pkg/util/iterator.go new file mode 100644 index 0000000..00fd41a --- /dev/null +++ b/pkg/util/iterator.go @@ -0,0 +1,6 @@ +package util + +type Iterator[T any] interface { + Next() (T, error) + Close() error +}