From 5bad850333cee50a1e6c21772429d74c80b6e43b Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Wed, 20 Mar 2024 14:21:34 +0100 Subject: [PATCH] feat: optional PUT, local data store --- docs/environment-variables.md | 22 +++ go.mod | 7 +- go.sum | 12 ++ main.go | 24 ++- server.go | 120 +++++++++----- server_routers.go | 295 +++++++++++++++++++++++++++++++--- server_routers_test.go | 193 ++++++++++++++++++++++ 7 files changed, 601 insertions(+), 72 deletions(-) diff --git a/docs/environment-variables.md b/docs/environment-variables.md index f4de147..94c87bb 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -5,6 +5,8 @@ - [Configuration](#configuration) - [`SOMEGUY_LISTEN_ADDRESS`](#someguy_listen_address) - [`SOMEGUY_ACCELERATED_DHT`](#someguy_accelerated_dht) + - [`SOMEGUY_PUT_ENABLED`](#someguy_put_enabled) + - [`SOMEGUY_DATADIR`](#someguy_datadir) - [`SOMEGUY_PROVIDER_ENDPOINTS`](#someguy_provider_endpoints) - [`SOMEGUY_PEER_ENDPOINTS`](#someguy_peer_endpoints) - [`SOMEGUY_IPNS_ENDPOINTS`](#someguy_ipns_endpoints) @@ -28,6 +30,26 @@ Whether or not the Accelerated DHT is enabled or not. Default: `true` +### `SOMEGUY_PUT_ENABLED` + +Whether or not to accept Delegated Routing V1 PUT requests. Affects all PUT requests: +provider records, peer records and IPNS records. + +By default, PUT requests are ignored. Therefore, they will neither be stored locally, +nor sent to other remote endpoints. + +Default: `false` + +### `SOMEGUY_DATADIR` + +Used in conjunction with [`SOMEGUY_PUT_ENABLED`](#someguy_put_enabled). + +The LevelDB data directory to persist PUT records. When receiving PUT requests, +the records will be stored in this database. The database is queried for GET +requests. + +Default: none + ### `SOMEGUY_PROVIDER_ENDPOINTS` Comma-separated list of other Delegated Routing V1 endpoints to proxy provider requests to. diff --git a/go.mod b/go.mod index 9b8ddef..139326c 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,8 @@ require ( github.com/felixge/httpsnoop v1.0.4 github.com/ipfs/boxo v0.18.1-0.20240319120907-07fc4754a2d2 github.com/ipfs/go-cid v0.4.1 + github.com/ipfs/go-datastore v0.6.0 + github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/libp2p/go-libp2p v0.33.0 github.com/libp2p/go-libp2p-kad-dht v0.25.2 @@ -15,6 +17,7 @@ require ( github.com/multiformats/go-multihash v0.2.3 github.com/prometheus/client_golang v1.19.0 github.com/rs/cors v1.10.1 + github.com/samber/lo v1.39.0 github.com/slok/go-http-metrics v0.11.0 github.com/stretchr/testify v1.9.0 github.com/urfave/cli/v2 v2.27.1 @@ -41,6 +44,7 @@ require ( github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 // indirect github.com/google/uuid v1.6.0 // indirect @@ -50,7 +54,6 @@ require ( github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/huin/goupnp v1.3.0 // indirect - github.com/ipfs/go-datastore v0.6.0 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipld/go-ipld-prime v0.21.0 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect @@ -101,9 +104,9 @@ require ( github.com/quic-go/webtransport-go v0.6.0 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/samber/lo v1.39.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/stretchr/objx v0.5.2 // indirect + github.com/syndtr/goleveldb v1.0.0 // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect github.com/xrash/smetrics v0.0.0-20231213231151-1d8dd44e695e // indirect go.opencensus.io v0.24.0 // indirect diff --git a/go.sum b/go.sum index 28d4074..17fbe45 100644 --- a/go.sum +++ b/go.sum @@ -135,6 +135,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f h1:jopqB+UTSdJGEJT8tEqYyE29zN91fi2827oLET8tl7k= github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f/go.mod h1:nOPhAkwVliJdNTkj3gXpljmWhjc4wCaVqbMJcPKWP4s= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -182,6 +184,7 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= @@ -193,12 +196,15 @@ github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= github.com/ipfs/go-datastore v0.1.0/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw= +github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk= github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0Myk= github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-ds-badger v0.0.7/go.mod h1:qt0/fWzZDoPW6jpQeqUjR5kBfhDNB65jd9YlmAvpQBk= github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZCg+cSZvYb8= +github.com/ipfs/go-ds-leveldb v0.5.0 h1:s++MEBbD3ZKc9/8/njrn4flZLnCuY9I79v94gBUNumo= +github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc= github.com/ipfs/go-ipfs-util v0.0.3 h1:2RFdGez6bu2ZlZdI+rWfIdbQb1KudQp3VGwPtdNCmE0= @@ -244,6 +250,7 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv github.com/koron/go-ssdp v0.0.4 h1:1IDwrghSKYM7yLf7XCzbByg2sJ/JcNOZRXS2jczTwz0= github.com/koron/go-ssdp v0.0.4/go.mod h1:oDXq+E5IL5q0U8uSBcoAXzTzInwy5lEgC91HoKtbmZk= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= @@ -370,6 +377,7 @@ github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOEL github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo/v2 v2.15.0 h1:79HwNRBAZHOEwrczrgSOPy+eFTTlIGELKy5as+ClttY= github.com/onsi/ginkgo/v2 v2.15.0/go.mod h1:HlxMHtYF57y6Dpf+mc5529KKmSq9h2FpCF+/ZkwUxKM= @@ -489,6 +497,7 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= @@ -714,13 +723,16 @@ google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7 google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/src-d/go-cli.v0 v0.0.0-20181105080154-d492247bbc0d/go.mod h1:z+K8VcOYVYcSwSjGebuDL6176A1XskgbtNl64NSg+n8= gopkg.in/src-d/go-log.v1 v1.0.1/go.mod h1:GN34hKP0g305ysm2/hctJ0Y8nWP3zxXXJ8GFabTyABE= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go index 45dff22..bf84cac 100644 --- a/main.go +++ b/main.go @@ -36,6 +36,18 @@ func main() { EnvVars: []string{"SOMEGUY_ACCELERATED_DHT"}, Usage: "run the accelerated DHT client", }, + &cli.BoolFlag{ + Name: "put-enabled", + Value: false, + EnvVars: []string{"SOMEGUY_PUT_ENABLED"}, + Usage: "enables HTTP PUT endpoints", + }, + &cli.StringFlag{ + Name: "datadir", + Value: "", + EnvVars: []string{"SOMEGUY_DATADIR"}, + Usage: "directory for persistent data", + }, &cli.StringSliceFlag{ Name: "provider-endpoints", Value: cli.NewStringSlice(cidContactEndpoint), @@ -56,7 +68,17 @@ func main() { }, }, Action: func(ctx *cli.Context) error { - return start(ctx.Context, ctx.String("listen-address"), ctx.Bool("accelerated-dht"), ctx.StringSlice("provider-endpoints"), ctx.StringSlice("peer-endpoints"), ctx.StringSlice("ipns-endpoints")) + options := &serverOptions{ + listenAddress: ctx.String("listen-address"), + acceleratedDHT: ctx.Bool("accelerated-dht"), + putEnabled: ctx.Bool("put-enabled"), + contentEndpoints: ctx.StringSlice("provider-endpoints"), + peerEndpoints: ctx.StringSlice("peer-endpoints"), + ipnsEndpoints: ctx.StringSlice("ipns-endpoints"), + dataDirectory: ctx.String("datadir"), + } + + return startServer(ctx.Context, options) }, }, { diff --git a/server.go b/server.go index 08abef6..9168cce 100644 --- a/server.go +++ b/server.go @@ -32,59 +32,35 @@ func withRequestLogger(next http.Handler) http.Handler { }) } -func start(ctx context.Context, listenAddress string, runAcceleratedDHTClient bool, contentEndpoints, peerEndpoints, ipnsEndpoints []string) error { - h, err := newHost(runAcceleratedDHTClient) - if err != nil { - return err - } - - var dhtRouting routing.Routing - if runAcceleratedDHTClient { - wrappedDHT, err := newBundledDHT(ctx, h) - if err != nil { - return err - } - dhtRouting = wrappedDHT - } else { - standardDHT, err := dht.New(ctx, h, dht.Mode(dht.ModeClient), dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...)) - if err != nil { - return err - } - dhtRouting = standardDHT - } - - crRouters, err := getCombinedRouting(contentEndpoints, dhtRouting) - if err != nil { - return err - } - - prRouters, err := getCombinedRouting(peerEndpoints, dhtRouting) - if err != nil { - return err - } +type serverOptions struct { + listenAddress string + acceleratedDHT bool + putEnabled bool + contentEndpoints []string + peerEndpoints []string + ipnsEndpoints []string + dataDirectory string +} - ipnsRouters, err := getCombinedRouting(ipnsEndpoints, dhtRouting) +func startServer(ctx context.Context, options *serverOptions) error { + router, err := newRouter(ctx, options) if err != nil { return err } - _, port, err := net.SplitHostPort(listenAddress) + _, port, err := net.SplitHostPort(options.listenAddress) if err != nil { return err } - log.Printf("Listening on %s", listenAddress) + log.Printf("Listening on %s", options.listenAddress) log.Printf("Delegated Routing API on http://127.0.0.1:%s/routing/v1", port) mdlw := middleware.New(middleware.Config{ Recorder: metrics.NewRecorder(metrics.Config{Prefix: "someguy"}), }) - handler := server.Handler(&composableRouter{ - providers: crRouters, - peers: prRouters, - ipns: ipnsRouters, - }) + handler := server.Handler(router) // Add CORS. handler = cors.New(cors.Options{ @@ -108,8 +84,66 @@ func start(ctx context.Context, listenAddress string, runAcceleratedDHTClient bo http.Handle("/debug/metrics/prometheus", promhttp.Handler()) http.Handle("/", handler) - server := &http.Server{Addr: listenAddress, Handler: nil} + server := &http.Server{Addr: options.listenAddress, Handler: nil} return server.ListenAndServe() + + // TODO: call .Close in database on exit +} + +func newRouter(ctx context.Context, options *serverOptions) (server.ContentRouter, error) { + h, err := newHost(options.acceleratedDHT) + if err != nil { + return nil, err + } + + var dhtRouting routing.Routing + if options.acceleratedDHT { + wrappedDHT, err := newBundledDHT(ctx, h) + if err != nil { + return nil, err + } + dhtRouting = wrappedDHT + } else { + standardDHT, err := dht.New(ctx, h, dht.Mode(dht.ModeClient), dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...)) + if err != nil { + return nil, err + } + dhtRouting = standardDHT + } + + crRouters, err := getCombinedRouting(options.contentEndpoints, dhtRouting, options.putEnabled) + if err != nil { + return nil, err + } + + prRouters, err := getCombinedRouting(options.peerEndpoints, dhtRouting, options.putEnabled) + if err != nil { + return nil, err + } + + ipnsRouters, err := getCombinedRouting(options.ipnsEndpoints, dhtRouting, options.putEnabled) + if err != nil { + return nil, err + } + + remoteRouter := &composableRouter{ + providers: crRouters, + peers: prRouters, + ipns: ipnsRouters, + } + + if options.dataDirectory == "" { + return remoteRouter, nil + } + + localRouter, err := newLocalRouter(options.dataDirectory) + if err != nil { + return nil, err + } + + return ¶llelRouter{ + routers: []router{localRouter, remoteRouter}, + }, nil } func newHost(highOutboundLimits bool) (host.Host, error) { @@ -146,9 +180,9 @@ func newHost(highOutboundLimits bool) (host.Host, error) { return h, nil } -func getCombinedRouting(endpoints []string, dht routing.Routing) (router, error) { +func getCombinedRouting(endpoints []string, dht routing.Routing, putEnabled bool) (router, error) { if len(endpoints) == 0 { - return libp2pRouter{routing: dht}, nil + return libp2pRouter{routing: dht, putEnabled: putEnabled}, nil } var routers []router @@ -158,10 +192,10 @@ func getCombinedRouting(endpoints []string, dht routing.Routing) (router, error) if err != nil { return nil, err } - routers = append(routers, clientRouter{Client: drclient}) + routers = append(routers, clientRouter{client: drclient, putEnabled: putEnabled}) } return parallelRouter{ - routers: append(routers, libp2pRouter{routing: dht}), + routers: append(routers, libp2pRouter{routing: dht, putEnabled: putEnabled}), }, nil } diff --git a/server_routers.go b/server_routers.go index b7c5745..0dba29d 100644 --- a/server_routers.go +++ b/server_routers.go @@ -2,8 +2,10 @@ package main import ( "context" + "encoding/json" "errors" "math" + "path/filepath" "sync" "time" @@ -13,8 +15,11 @@ import ( "github.com/ipfs/boxo/routing/http/types" "github.com/ipfs/boxo/routing/http/types/iter" "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + leveldb "github.com/ipfs/go-ds-leveldb" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" + "github.com/samber/lo" ) type router interface { @@ -362,29 +367,29 @@ func (r parallelRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipn var _ router = libp2pRouter{} type libp2pRouter struct { - routing routing.Routing + putEnabled bool + routing routing.Routing } -func (d libp2pRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { +func (r libp2pRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { ctx, cancel := context.WithCancel(ctx) - ch := d.routing.FindProvidersAsync(ctx, key, limit) - return iter.ToResultIter[types.Record](&peerChanIter{ + ch := r.routing.FindProvidersAsync(ctx, key, limit) + return iter.ToResultIter(&peerChanIter{ ch: ch, cancel: cancel, }), nil } -func (d libp2pRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { - // NOTE: this router cannot provide further to the DHT, since we can only - // announce CIDs that our own node has, which is not the case. +func (r libp2pRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + // NOTE: the libp2p router cannot provide records further into the DHT. return 0, routing.ErrNotSupported } -func (d libp2pRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { +func (r libp2pRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { ctx, cancel := context.WithCancel(ctx) defer cancel() - addr, err := d.routing.FindPeer(ctx, pid) + addr, err := r.routing.FindPeer(ctx, pid) if err != nil { return nil, err } @@ -398,18 +403,19 @@ func (d libp2pRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (it rec.Addrs = append(rec.Addrs, types.Multiaddr{Multiaddr: addr}) } - return iter.ToResultIter[*types.PeerRecord](iter.FromSlice[*types.PeerRecord]([]*types.PeerRecord{rec})), nil + return iter.ToResultIter(iter.FromSlice([]*types.PeerRecord{rec})), nil } func (r libp2pRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + // NOTE: the libp2p router cannot provide peers further into the DHT. return 0, routing.ErrNotSupported } -func (d libp2pRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { +func (r libp2pRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() - raw, err := d.routing.GetValue(ctx, string(name.RoutingKey())) + raw, err := r.routing.GetValue(ctx, string(name.RoutingKey())) if err != nil { return nil, err } @@ -417,7 +423,11 @@ func (d libp2pRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record return ipns.UnmarshalRecord(raw) } -func (d libp2pRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { +func (r libp2pRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { + if !r.putEnabled { + return routing.ErrNotSupported + } + ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -426,7 +436,7 @@ func (d libp2pRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns. return err } - return d.routing.PutValue(ctx, string(name.RoutingKey()), raw) + return r.routing.PutValue(ctx, string(name.RoutingKey()), raw) } type peerChanIter struct { @@ -470,30 +480,51 @@ func (it *peerChanIter) Close() error { var _ router = clientRouter{} type clientRouter struct { - *client.Client + putEnabled bool + client *client.Client } -func (d clientRouter) FindProviders(ctx context.Context, cid cid.Cid, limit int) (iter.ResultIter[types.Record], error) { - return d.Client.FindProviders(ctx, cid) +func (r clientRouter) FindProviders(ctx context.Context, cid cid.Cid, limit int) (iter.ResultIter[types.Record], error) { + return r.client.FindProviders(ctx, cid) } -func (d clientRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { - return d.provide(func() (iter.ResultIter[*types.AnnouncementResponseRecord], error) { - return d.Client.ProvideRecords(ctx, req) +func (r clientRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + if !r.putEnabled { + return 0, routing.ErrNotSupported + } + + return r.provide(func() (iter.ResultIter[*types.AnnouncementResponseRecord], error) { + return r.client.ProvideRecords(ctx, req) }) } -func (d clientRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { - return d.Client.FindPeers(ctx, pid) +func (r clientRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { + return r.client.FindPeers(ctx, pid) } -func (d clientRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { - return d.provide(func() (iter.ResultIter[*types.AnnouncementResponseRecord], error) { - return d.Client.ProvidePeerRecords(ctx, req) +func (r clientRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + if !r.putEnabled { + return 0, routing.ErrNotSupported + } + + return r.provide(func() (iter.ResultIter[*types.AnnouncementResponseRecord], error) { + return r.client.ProvidePeerRecords(ctx, req) }) } -func (d clientRouter) provide(do func() (iter.ResultIter[*types.AnnouncementResponseRecord], error)) (time.Duration, error) { +func (r clientRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { + return r.client.GetIPNS(ctx, name) +} + +func (r clientRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { + if !r.putEnabled { + return routing.ErrNotSupported + } + + return r.client.PutIPNS(ctx, name, record) +} + +func (r clientRouter) provide(do func() (iter.ResultIter[*types.AnnouncementResponseRecord], error)) (time.Duration, error) { resultsIter, err := do() if err != nil { return 0, err @@ -511,3 +542,215 @@ func (d clientRouter) provide(do func() (iter.ResultIter[*types.AnnouncementResp return records[0].TTL, nil } + +var _ router = localRouter{} + +type localRouter struct { + datastore datastore.Batching +} + +func providersKey(cid cid.Cid) datastore.Key { + return datastore.KeyWithNamespaces([]string{"providers", cid.String()}) +} + +func peersKey(pid peer.ID) datastore.Key { + return datastore.KeyWithNamespaces([]string{"peers", pid.String()}) +} + +func ipnsKey(name ipns.Name) datastore.Key { + return datastore.NewKey("ipns-" + name.String()) +} + +func newLocalRouter(datadir string) (*localRouter, error) { + ds, err := leveldb.NewDatastore(filepath.Join(datadir, "leveldb"), nil) + if err != nil { + return nil, err + } + + return &localRouter{ + datastore: ds, + }, nil +} + +func (r localRouter) FindProviders(ctx context.Context, cid cid.Cid, limit int) (iter.ResultIter[types.Record], error) { + raw, err := r.datastore.Get(ctx, providersKey(cid)) + if errors.Is(err, datastore.ErrNotFound) { + return iter.ToResultIter(iter.FromSlice([]types.Record{})), nil + } else if err != nil { + return nil, err + } + + var peerRecords []*types.PeerRecord + err = json.Unmarshal(raw, &peerRecords) + if err != nil { + return nil, err + } + + var records []types.Record + for _, r := range peerRecords { + records = append(records, r) + } + + return iter.ToResultIter(iter.FromSlice(records)), nil +} + +// Note: we don't verify the record since that is already done by the caller, +// i.e., Boxo's implementation of the server. This also facilitates our tests. +func (r localRouter) Provide(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + key := providersKey(req.Payload.CID) + + var records []types.PeerRecord + + raw, err := r.datastore.Get(ctx, key) + if errors.Is(err, datastore.ErrNotFound) { + // Nothing + } else if err != nil { + return 0, err + } else { + err = json.Unmarshal(raw, &records) + if err != nil { + return 0, err + } + } + + // NOTE: this is a very naive storage. We just transform the announcement + // record into a peer record and append it to the list. This will be returned + // when FindPeers is called. + records = append(records, types.PeerRecord{ + Schema: types.SchemaPeer, + ID: req.Payload.ID, + Addrs: req.Payload.Addrs, + Protocols: req.Payload.Protocols, + }) + + raw, err = json.Marshal(records) + if err != nil { + return 0, err + } + + return req.Payload.TTL, r.datastore.Put(ctx, key, raw) +} + +func (r localRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { + raw, err := r.datastore.Get(ctx, peersKey(pid)) + if errors.Is(err, datastore.ErrNotFound) { + return iter.ToResultIter(iter.FromSlice([]*types.PeerRecord{})), nil + } else if err != nil { + return nil, err + } + + var record *types.PeerRecord + err = json.Unmarshal(raw, &record) + if err != nil { + return nil, err + } + + return iter.ToResultIter(iter.FromSlice([]*types.PeerRecord{record})), nil +} + +// Note: we don't verify the record since that is already done by the caller, +// i.e., Boxo's implementation of the server. This also facilitates our tests. +func (r localRouter) ProvidePeer(ctx context.Context, req *types.AnnouncementRecord) (time.Duration, error) { + key := peersKey(*req.Payload.ID) + + // Make a [types.PeerRecord] based on the given [types.AnnouncementRecord]. + record := &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: req.Payload.ID, + Addrs: req.Payload.Addrs, + Protocols: req.Payload.Protocols, + } + + raw, err := r.datastore.Get(ctx, key) + if errors.Is(err, datastore.ErrNotFound) { + // Nothing + } else if err != nil { + return 0, err + } else { + // If we already had a record for the same peer, merge them together. + var oldRecord *types.PeerRecord + err = json.Unmarshal(raw, &oldRecord) + if err != nil { + return 0, err + } + + record.Addrs = lo.Uniq(append(record.Addrs, oldRecord.Addrs...)) + record.Protocols = lo.Uniq(append(record.Protocols, oldRecord.Protocols...)) + } + + raw, err = json.Marshal(record) + if err != nil { + return 0, err + } + + return req.Payload.TTL, r.datastore.Put(ctx, key, raw) +} + +func (r localRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { + raw, err := r.datastore.Get(ctx, ipnsKey(name)) + if errors.Is(err, datastore.ErrNotFound) { + return nil, routing.ErrNotFound + } + if err != nil { + return nil, err + } + + return ipns.UnmarshalRecord(raw) +} + +func (r localRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { + shouldStore, err := r.shouldStoreNewIPNS(ctx, name, record) + if err != nil { + return err + } + + if !shouldStore { + return nil + } + + data, err := ipns.MarshalRecord(record) + if err != nil { + return err + } + + return r.datastore.Put(ctx, ipnsKey(name), data) +} + +func (r localRouter) shouldStoreNewIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) (bool, error) { + raw, err := r.datastore.Get(ctx, ipnsKey(name)) + if errors.Is(err, datastore.ErrNotFound) { + return true, nil + } + + if err != nil { + return false, err + } + + oldRecord, err := ipns.UnmarshalRecord(raw) + if err != nil { + return false, err + } + + oldSequence, err := oldRecord.Sequence() + if err != nil { + return false, err + } + + oldValidity, err := oldRecord.Validity() + if err != nil { + return false, err + } + + sequence, err := record.Sequence() + if err != nil { + return false, err + } + + validity, err := record.Validity() + if err != nil { + return false, err + } + + // Only store new record if sequence is higher or the validity is higher. + return sequence > oldSequence || validity.After(oldValidity), nil +} diff --git a/server_routers_test.go b/server_routers_test.go index 32680b0..848c38f 100644 --- a/server_routers_test.go +++ b/server_routers_test.go @@ -63,6 +63,10 @@ func (m *mockRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.R return args.Error(0) } +func (m *mockRouter) Close() error { + return nil +} + func makeName(t *testing.T) (crypto.PrivKey, ipns.Name) { sk, _, err := crypto.GenerateEd25519Key(rand.Reader) require.NoError(t, err) @@ -681,3 +685,192 @@ func TestManyIter(t *testing.T) { require.NoError(t, manyIter.Close()) }) } + +func equalRecords(t *testing.T, a, b *ipns.Record) { + aValue, err := a.Value() + require.NoError(t, err) + + aSequence, err := a.Sequence() + require.NoError(t, err) + + aTTL, err := a.TTL() + require.NoError(t, err) + + aValidity, err := a.Validity() + require.NoError(t, err) + + bValue, err := b.Value() + require.NoError(t, err) + + bSequence, err := b.Sequence() + require.NoError(t, err) + + bTTL, err := b.TTL() + require.NoError(t, err) + + bValidity, err := b.Validity() + require.NoError(t, err) + + require.Equal(t, aValue, bValue) + require.Equal(t, aSequence, bSequence) + require.Equal(t, aTTL, bTTL) + require.Equal(t, aValidity, bValidity) +} + +func TestLocalRouter(t *testing.T) { + t.Parallel() + + t.Run("Providers", func(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + r, err := newLocalRouter(tempDir) + require.NoError(t, err) + + _, name := makeName(t) + pid := name.Peer() + cid := makeCID() + + t.Run("Store and Retrieve Record for CID", func(t *testing.T) { + resultsIter, err := r.FindProviders(context.Background(), cid, 10) + require.NoError(t, err) + + results, err := iter.ReadAllResults(resultsIter) + require.NoError(t, err) + require.Len(t, results, 0) + + _, err = r.Provide(context.Background(), &types.AnnouncementRecord{ + Payload: types.AnnouncementPayload{ + CID: cid, + ID: &pid, + Protocols: []string{"a"}, + }, + }) + require.NoError(t, err) + + resultsIter, err = r.FindProviders(context.Background(), cid, 10) + require.NoError(t, err) + + results, err = iter.ReadAllResults(resultsIter) + require.NoError(t, err) + require.Len(t, results, 1) + }) + + t.Run("Provide New Record", func(t *testing.T) { + _, err = r.Provide(context.Background(), &types.AnnouncementRecord{ + Payload: types.AnnouncementPayload{ + ID: &pid, + CID: cid, + Protocols: []string{"b", "a", "a"}, + }, + }) + require.NoError(t, err) + + resultsIter, err := r.FindProviders(context.Background(), cid, 10) + require.NoError(t, err) + + results, err := iter.ReadAllResults(resultsIter) + require.NoError(t, err) + require.Len(t, results, 2) + }) + }) + + t.Run("Peers", func(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + r, err := newLocalRouter(tempDir) + require.NoError(t, err) + + _, name := makeName(t) + pid := name.Peer() + + t.Run("Store and Retrieve Peer", func(t *testing.T) { + resultsIter, err := r.FindPeers(context.Background(), pid, 1) + require.NoError(t, err) + + results, err := iter.ReadAllResults(resultsIter) + require.NoError(t, err) + require.Len(t, results, 0) + + _, err = r.ProvidePeer(context.Background(), &types.AnnouncementRecord{ + Payload: types.AnnouncementPayload{ + ID: &pid, + Protocols: []string{"a"}, + }, + }) + require.NoError(t, err) + + resultsIter, err = r.FindPeers(context.Background(), pid, 1) + require.NoError(t, err) + + results, err = iter.ReadAllResults(resultsIter) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, []string{"a"}, results[0].Protocols) + }) + + t.Run("Provide Updated Information", func(t *testing.T) { + _, err = r.ProvidePeer(context.Background(), &types.AnnouncementRecord{ + Payload: types.AnnouncementPayload{ + ID: &pid, + Protocols: []string{"b", "a", "a"}, + }, + }) + require.NoError(t, err) + + resultsIter, err := r.FindPeers(context.Background(), pid, 1) + require.NoError(t, err) + + results, err := iter.ReadAllResults(resultsIter) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, []string{"b", "a"}, results[0].Protocols) + }) + }) + + t.Run("IPNS", func(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + r, err := newLocalRouter(tempDir) + require.NoError(t, err) + + sk, name := makeName(t) + rec, _ := makeIPNSRecord(t, sk) + + time.Sleep(time.Millisecond * 100) + newerRecord, _ := makeIPNSRecord(t, sk) + + t.Run("Store and Retrieve IPNS Record", func(t *testing.T) { + _, err = r.GetIPNS(context.Background(), name) + require.ErrorIs(t, err, routing.ErrNotFound) + + err = r.PutIPNS(context.Background(), name, rec) + require.NoError(t, err) + + storedRecord, err := r.GetIPNS(context.Background(), name) + require.NoError(t, err) + equalRecords(t, rec, storedRecord) + }) + + t.Run("Should Replace With Newer Record", func(t *testing.T) { + err = r.PutIPNS(context.Background(), name, newerRecord) + require.NoError(t, err) + + storedRecord, err := r.GetIPNS(context.Background(), name) + require.NoError(t, err) + equalRecords(t, newerRecord, storedRecord) + }) + + t.Run("Should Not Replace With Older Record", func(t *testing.T) { + err = r.PutIPNS(context.Background(), name, rec) + require.NoError(t, err) + + storedRecord, err := r.GetIPNS(context.Background(), name) + require.NoError(t, err) + equalRecords(t, newerRecord, storedRecord) + }) + }) + +}