Skip to content

Commit

Permalink
Fix entry verification
Browse files Browse the repository at this point in the history
- Need to sync entries before sampling
- Verify can now do a double-hashed find
- Make dh and non-dh verify configurable with --no-prov flag
- Optionally allow separate dhstore and indexers (provider sources) to be specified

Fixes #35
  • Loading branch information
gammazero committed Jul 13, 2023
1 parent 6269db1 commit e345865
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 49 deletions.
23 changes: 17 additions & 6 deletions pkg/find/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var FindCmd = &cli.Command{
Example usage:
ipni find -i cid.contact --cid bafybeigvgzoolc3drupxhlevdp2ugqcrbcsqfmcek2zxiw5wctk3xjpjwy`,
Flags: findFlags,
Before: beforeFind,
Action: findAction,
}

Expand All @@ -41,15 +42,13 @@ var findFlags = []cli.Flag{
},
&cli.StringSliceFlag{
Name: "indexer",
Usage: "URL of indexer to query. Multiple OK to specify providers info sources.",
Usage: "URL of indexer to query. Multiple OK to specify providers info sources for dhstore.",
Aliases: []string{"i"},
Value: cli.NewStringSlice("http://localhost:3000"),
},
&cli.StringFlag{
Name: "dhstore",
Usage: "URL of double-hashed (reader-private) store, if different from indexer",
EnvVars: []string{"DHSTORE"},
Required: false,
Name: "dhstore",
Usage: "URL of double-hashed (reader-private) store, if different from indexer",
Aliases: []string{"dhs"},
},
&cli.BoolFlag{
Name: "id-only",
Expand All @@ -65,6 +64,18 @@ var findFlags = []cli.Flag{
},
}

func beforeFind(cctx *cli.Context) error {
if len(cctx.StringSlice("indexer")) == 0 {
if cctx.Bool("no-priv") {
return cli.Exit("missing value for --indexer", 1)
}
if cctx.String("dhstore") == "" {
return cli.Exit("missing value for --dhstore and --indexer", 1)
}
}
return nil
}

func findAction(cctx *cli.Context) error {
mhArgs := cctx.StringSlice("mh")
cidArgs := cctx.StringSlice("cid")
Expand Down
207 changes: 164 additions & 43 deletions pkg/verify/ingest.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package verify

import (
"context"
"errors"
"fmt"
"math/rand"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/ipni/go-libipni/apierror"
"github.com/ipni/go-libipni/find/client"
"github.com/ipni/go-libipni/find/model"
"github.com/ipni/go-libipni/pcache"
"github.com/ipni/ipni-cli/pkg/adpub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multicodec"
Expand Down Expand Up @@ -116,12 +118,15 @@ var verifyIngestFlags = []cli.Flag{
Usage: "Path to the CAR index file from which to extract the list of multihash for verification.",
Aliases: []string{"fci"},
},
&cli.StringFlag{
&cli.StringSliceFlag{
Name: "indexer",
Usage: "URL of indexer to query",
EnvVars: []string{"INDEXER"},
Usage: "URL of indexer to query. Multiple OK to specify providers info sources for dhstore.",
Aliases: []string{"i"},
Value: "http://localhost:3000",
},
&cli.StringFlag{
Name: "dhstore",
Usage: "URL of double-hashed (reader-private) store, if different from indexer",
Aliases: []string{"dhs"},
},
&cli.Float64Flag{
Name: "sampling-prob",
Expand Down Expand Up @@ -184,9 +189,22 @@ var verifyIngestFlags = []cli.Flag{
Aliases: []string{"pum"},
Destination: &printUnindexedMhs,
},
&cli.BoolFlag{
Name: "no-priv",
Usage: "Do no use reader-privacy for queries. If --dhstore also specified, use in place of --indexer",
},
}

func beforeVerifyIngest(cctx *cli.Context) error {
if len(cctx.StringSlice("indexer")) == 0 {
if cctx.Bool("no-priv") {
return cli.Exit("missing value for --indexer", 1)
}
if cctx.String("dhstore") == "" {
return cli.Exit("missing value for --dhstore and --indexer", 1)
}
}

if samplingProb <= 0 || samplingProb > 1 {
return cli.Exit("Sampling probability must be larger than 0.0 and smaller or equal to 1.0.", 1)
}
Expand Down Expand Up @@ -262,13 +280,38 @@ func verifyIngestFromProvider(cctx *cli.Context, provID peer.ID) error {
adDepthLimitStr = fmt.Sprintf("%d", adDepthLimit)
}

find, err := client.New(cctx.String("indexer"))
if err != nil {
return err
var dhFind *client.DHashClient
var clearFind *client.Client
var provCache *pcache.ProviderCache
var err error
if cctx.Bool("no-priv") {
idxr := cctx.String("dhstore")
if idxr == "" {
idxr = cctx.StringSlice("indexer")[0]
}
clearFind, err = client.New(idxr)
if err != nil {
return err
}
provCache, err = pcache.New(pcache.WithSourceURL(idxr),
pcache.WithRefreshInterval(0))
if err != nil {
return err
}
} else {
dhFind, err = client.NewDHashClient(
client.WithProvidersURL(cctx.StringSlice("indexer")...),
client.WithDHStoreURL(cctx.String("dhstore")),
client.WithPcacheTTL(0),
)
if err != nil {
return err
}
provCache = dhFind.PCache()
}

// Get publisher address, for specified provider, from indexer.
provInfo, err := find.GetProvider(cctx.Context, provID)
provInfo, err := provCache.Get(cctx.Context, provID)
if err != nil {
var ae *apierror.Error
if errors.As(err, &ae) && ae.Status() == http.StatusNotFound {
Expand Down Expand Up @@ -321,14 +364,11 @@ func verifyIngestFromProvider(cctx *cli.Context, provID peer.ID) error {
}
fmt.Fprintf(os.Stderr, "⚠️ Failed to fully sync advertisement %s. Output shows partially synced ad.\n Error: %s\n", adCid, err.Error())
}
ads := stats.Sample(ad)

fmt.Printf("Advertisement ID: %s\n", ad.ID)
fmt.Printf("Previous Advertisement ID: %s\n", ad.PreviousID)
fmt.Printf("Verifying ingest... (%d/%s)\n", i, adDepthLimitStr)
if ads.NoLongerProvided {
fmt.Println("🧹 Removed in later advertisements; skipping verification.")
} else if ad.IsRemove {
if ad.IsRemove {
fmt.Println("✂️ Removal advertisement; skipping verification.")
} else if !ad.HasEntries() {
fmt.Println("Has no entries; skipping verification.")
Expand All @@ -338,25 +378,30 @@ func verifyIngestFromProvider(cctx *cli.Context, provID peer.ID) error {
fmt.Fprintf(os.Stderr, "⚠️ Failed to sync entries for advertisement %s: %s\n", ad.ID, err)
}

var entriesOutput string
if ads.PartiallySynced {
entriesOutput = "; ad entries are partially synced due to: " + ads.SyncErr.Error()
}

fmt.Printf("Total Entries: %d over %d chunk(s)%s\n", ads.MhCount, ads.ChunkCount, entriesOutput)
fmt.Print("Verification: ")
if len(ads.MhSample) == 0 {
fmt.Println("🔘 Skipped; sampling did not include any multihashes.")
ads := stats.Sample(ad)
if ads.NoLongerProvided {
fmt.Println("🧹 Removed in later advertisements; skipping verification.")
} else {
result, err := verifyIngestFromMhs(cctx, find, provID, ads.MhSample)
if err != nil {
return err
var entriesOutput string
if ads.PartiallySynced {
entriesOutput = "; ad entries are partially synced due to: " + ads.SyncErr.Error()
}
aggResult.add(result)
if result.passedVerification() {
fmt.Println("✅ Pass")

fmt.Printf("Total Entries: %d over %d chunk(s)%s\n", ads.MhCount, ads.ChunkCount, entriesOutput)
fmt.Print("Verification: ")
if len(ads.MhSample) == 0 {
fmt.Println("🔘 Skipped; sampling did not include any multihashes.")
} else {
fmt.Println("❌ Fail")
result, err := verifyIngestFromMhs(cctx, clearFind, dhFind, provID, ads.MhSample)
if err != nil {
return err
}
aggResult.add(result)
if result.passedVerification() {
fmt.Println("✅ Pass")
} else {
fmt.Println("❌ Fail")
}
}
}
}
Expand All @@ -383,12 +428,29 @@ func verifyIngestFromCar(cctx *cli.Context, provID peer.ID, carPath string) erro
return err
}

find, err := client.New(cctx.String("indexer"))
if err != nil {
return err
var dhFind *client.DHashClient
var clearFind *client.Client
if cctx.Bool("no-priv") {
idxr := cctx.String("dhstore")
if idxr == "" {
idxr = cctx.StringSlice("indexer")[0]
}
clearFind, err = client.New(idxr)
if err != nil {
return err
}
} else {
dhFind, err = client.NewDHashClient(
client.WithProvidersURL(cctx.StringSlice("indexer")...),
client.WithDHStoreURL(cctx.String("dhstore")),
client.WithPcacheTTL(0),
)
if err != nil {
return err
}
}

result, err := verifyIngestFromCarIterableIndex(cctx, find, provID, idx)
result, err := verifyIngestFromCarIterableIndex(cctx, clearFind, dhFind, provID, idx)
if err != nil {
return err
}
Expand Down Expand Up @@ -451,12 +513,29 @@ func verifyIngestFromCarIndex(cctx *cli.Context, provID peer.ID, carIndexPath st
return errInvalidCarIndexFormat()
}

find, err := client.New(cctx.String("indexer"))
if err != nil {
return err
var dhFind *client.DHashClient
var clearFind *client.Client
if cctx.Bool("no-priv") {
idxr := cctx.String("dhstore")
if idxr == "" {
idxr = cctx.StringSlice("indexer")[0]
}
clearFind, err = client.New(idxr)
if err != nil {
return err
}
} else {
dhFind, err = client.NewDHashClient(
client.WithProvidersURL(cctx.StringSlice("indexer")...),
client.WithDHStoreURL(cctx.String("dhstore")),
client.WithPcacheTTL(0),
)
if err != nil {
return err
}
}

result, err := verifyIngestFromCarIterableIndex(cctx, find, provID, iterIdx)
result, err := verifyIngestFromCarIterableIndex(cctx, clearFind, dhFind, provID, iterIdx)
if err != nil {
return err
}
Expand All @@ -473,7 +552,7 @@ func errVerifyIngestMultipleSources() error {
return cli.Exit("Multiple multihash sources are specified. Only a single source at a time is supported.", 1)
}

func verifyIngestFromCarIterableIndex(cctx *cli.Context, find *client.Client, provID peer.ID, idx index.IterableIndex) (*verifyResult, error) {
func verifyIngestFromCarIterableIndex(cctx *cli.Context, find *client.Client, dhFind *client.DHashClient, provID peer.ID, idx index.IterableIndex) (*verifyResult, error) {
var mhs []multihash.Multihash
if err := idx.ForEach(func(mh multihash.Multihash, _ uint64) error {
if include() {
Expand All @@ -483,7 +562,7 @@ func verifyIngestFromCarIterableIndex(cctx *cli.Context, find *client.Client, pr
}); err != nil {
return nil, err
}
return verifyIngestFromMhs(cctx, find, provID, mhs)
return verifyIngestFromMhs(cctx, find, dhFind, provID, mhs)
}

type verifyResult struct {
Expand Down Expand Up @@ -549,11 +628,11 @@ func (r *verifyResult) print(samplingProb float64, rngSeed int64, printUnindexed
}
}

func verifyIngestFromMhs(cctx *cli.Context, find *client.Client, wantProvID peer.ID, mhs []multihash.Multihash) (*verifyResult, error) {
func verifyIngestFromMhs(cctx *cli.Context, find *client.Client, dhFind *client.DHashClient, wantProvID peer.ID, mhs []multihash.Multihash) (*verifyResult, error) {
chunkSize := cctx.Int("batch-size")
aggResult := &verifyResult{}
for len(mhs) >= chunkSize {
result, err := verifyIngest(cctx, find, wantProvID, mhs[:chunkSize])
result, err := verifyIngest(cctx, find, dhFind, wantProvID, mhs[:chunkSize])
if err != nil {
return nil, err
}
Expand All @@ -562,7 +641,7 @@ func verifyIngestFromMhs(cctx *cli.Context, find *client.Client, wantProvID peer
os.Stdout.WriteString(".")
}
if len(mhs) != 0 {
result, err := verifyIngest(cctx, find, wantProvID, mhs)
result, err := verifyIngest(cctx, find, dhFind, wantProvID, mhs)
if err != nil {
return nil, err
}
Expand All @@ -571,18 +650,29 @@ func verifyIngestFromMhs(cctx *cli.Context, find *client.Client, wantProvID peer
return aggResult, nil
}

func verifyIngest(cctx *cli.Context, find *client.Client, wantProvID peer.ID, mhs []multihash.Multihash) (*verifyResult, error) {
func verifyIngest(cctx *cli.Context, find *client.Client, dhFind *client.DHashClient, wantProvID peer.ID, mhs []multihash.Multihash) (*verifyResult, error) {
result := &verifyResult{}
mhsCount := len(mhs)
result.TotalMhChecked = mhsCount
response, err := find.FindBatch(cctx.Context, mhs)

var response *model.FindResponse
var err error
if dhFind != nil {
response, err = doDHFind(cctx.Context, dhFind, mhs)
} else {
response, err = doClearFind(cctx.Context, find, mhs)
}
if err != nil {
result.FailedToVerify = mhsCount
err = fmt.Errorf("failed to connect to indexer: %w", err)
result.Errs = append(result.Errs, err)
return result, nil
}

if dhFind != nil {
fmt.Println("🔒 Reader privacy enabled")
}

if len(response.MultihashResults) == 0 {
result.Absent = mhsCount
return result, nil
Expand Down Expand Up @@ -615,3 +705,34 @@ func verifyIngest(cctx *cli.Context, find *client.Client, wantProvID peer.ID, mh
}
return result, nil
}

func doDHFind(ctx context.Context, cl *client.DHashClient, mhs []multihash.Multihash) (*model.FindResponse, error) {
var resp *model.FindResponse
for _, mh := range mhs {
r, err := cl.Find(ctx, mh)
if err != nil {
// TODO: Look for error that specifies double-hashing not supported.
var ae *apierror.Error
if errors.As(err, &ae) && ae.Status() == http.StatusNotFound {
continue
}
return nil, err
}
if resp == nil {
resp = r
} else {
resp.MultihashResults = append(resp.MultihashResults, r.MultihashResults...)
}
}
//if resp == nil && cctx.Bool("fallback") {
// return clearFind(cctx, mhs)
//}
return resp, nil
}

func doClearFind(ctx context.Context, cl *client.Client, mhs []multihash.Multihash) (*model.FindResponse, error) {
if len(mhs) == 1 {
return cl.Find(ctx, mhs[0])
}
return cl.FindBatch(ctx, mhs)
}

0 comments on commit e345865

Please sign in to comment.