From 95d11d8933bcd45d0a1bbf316257f6e60c0feb5a Mon Sep 17 00:00:00 2001 From: Jorropo Date: Wed, 27 Jul 2022 19:26:58 +0200 Subject: [PATCH 1/2] chore: bump deps --- go.mod | 8 ++------ go.sum | 9 +++++---- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index 88c1d75..0abc165 100644 --- a/go.mod +++ b/go.mod @@ -10,8 +10,8 @@ require ( github.com/ipfs/go-ipfs-blockstore v1.2.0 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-delay v0.0.1 - github.com/ipfs/go-ipfs-exchange-interface v0.1.0 - github.com/ipfs/go-ipfs-exchange-offline v0.2.0 + github.com/ipfs/go-ipfs-exchange-interface v0.2.0 + github.com/ipfs/go-ipfs-exchange-offline v0.3.0 github.com/ipfs/go-ipfs-routing v0.2.1 github.com/ipfs/go-ipfs-util v0.0.2 github.com/ipfs/go-ipld-format v0.3.0 @@ -21,10 +21,6 @@ require ( go.opentelemetry.io/otel/trace v1.7.0 ) -replace github.com/ipfs/go-ipfs-exchange-interface => github.com/MichaelMure/go-ipfs-exchange-interface v0.0.2-0.20220713142804-1181846dc171 - -replace github.com/ipfs/go-ipfs-exchange-offline => github.com/MichaelMure/go-ipfs-exchange-offline v0.0.2-0.20220714102739-4b7a20c758a9 - replace github.com/ipfs/go-bitswap => github.com/MichaelMure/go-bitswap v0.2.20-0.20220714225615-2c2a46194c4e require ( diff --git a/go.sum b/go.sum index f4cfff9..c1b524b 100644 --- a/go.sum +++ b/go.sum @@ -15,10 +15,6 @@ github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/MichaelMure/go-bitswap v0.2.20-0.20220714225615-2c2a46194c4e h1:LBNoZYGAIHUWfyolHrs9PH4M6kd3ze9MenhHdZXyHYc= github.com/MichaelMure/go-bitswap v0.2.20-0.20220714225615-2c2a46194c4e/go.mod h1:EeOjh6Xi0IWQfdTj5LqScnXnxQtg9k4lshWjYCGSghc= -github.com/MichaelMure/go-ipfs-exchange-interface v0.0.2-0.20220713142804-1181846dc171 h1:J6IkkSKshHms3yQEOrNK/7B2YcCJ6ZbyDDmaXHwOj4Y= -github.com/MichaelMure/go-ipfs-exchange-interface v0.0.2-0.20220713142804-1181846dc171/go.mod h1:z6+RhJuDQbqKguVyslSOuVDhqF9JtTrO3eptSAiW2/Y= -github.com/MichaelMure/go-ipfs-exchange-offline v0.0.2-0.20220714102739-4b7a20c758a9 h1:R4PGpFAUOd3tJjFAJLu8dPyGOLufuICel3mZ9uwtYGo= -github.com/MichaelMure/go-ipfs-exchange-offline v0.0.2-0.20220714102739-4b7a20c758a9/go.mod h1:6SxfEhzkc8ZX1yCWGL8owaravbq3xUF28yGF13VSQ7c= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= @@ -293,6 +289,11 @@ github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG github.com/ipfs/go-ipfs-ds-help v0.1.1/go.mod h1:SbBafGJuGsPI/QL3j9Fc5YPLeAu+SzOkI0gFwAg+mOs= github.com/ipfs/go-ipfs-ds-help v1.1.0 h1:yLE2w9RAsl31LtfMt91tRZcrx+e61O5mDxFRR994w4Q= github.com/ipfs/go-ipfs-ds-help v1.1.0/go.mod h1:YR5+6EaebOhfcqVCyqemItCLthrpVNot+rsOU/5IatU= +github.com/ipfs/go-ipfs-exchange-interface v0.1.0/go.mod h1:ych7WPlyHqFvCi/uQI48zLZuAWVP5iTQPXEfVaw5WEI= +github.com/ipfs/go-ipfs-exchange-interface v0.2.0 h1:8lMSJmKogZYNo2jjhUs0izT+dck05pqUw4mWNW9Pw6Y= +github.com/ipfs/go-ipfs-exchange-interface v0.2.0/go.mod h1:z6+RhJuDQbqKguVyslSOuVDhqF9JtTrO3eptSAiW2/Y= +github.com/ipfs/go-ipfs-exchange-offline v0.3.0 h1:c/Dg8GDPzixGd0MC8Jh6mjOwU57uYokgWRFidfvEkuA= +github.com/ipfs/go-ipfs-exchange-offline v0.3.0/go.mod h1:MOdJ9DChbb5u37M1IcbrRB02e++Z7521fMxqCNRrz9s= github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY= github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY= github.com/ipfs/go-ipfs-routing v0.2.1 h1:E+whHWhJkdN9YeoHZNj5itzc+OR292AJ2uE9FFiW0BY= From c72dade75a0ec8b9a719ed80dada29bdde114f42 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Wed, 27 Jul 2022 22:01:37 +0200 Subject: [PATCH 2/2] fix: correctly fallback to non session exchanges with sessions. --- blockservice.go | 83 +++++++++++++++++++++++++++++++------------------ 1 file changed, 53 insertions(+), 30 deletions(-) diff --git a/blockservice.go b/blockservice.go index 015f81a..b86f2eb 100644 --- a/blockservice.go +++ b/blockservice.go @@ -118,16 +118,18 @@ func NewSession(ctx context.Context, bs BlockService) *Session { exch := bs.Exchange() if sessEx, ok := exch.(exchange.SessionExchange); ok { return &Session{ - sessCtx: ctx, - ses: nil, - sessEx: sessEx, - bs: bs.Blockstore(), + sessCtx: ctx, + ses: nil, + sessEx: sessEx, + bs: bs.Blockstore(), + notifier: exch, } } return &Session{ - ses: exch, - sessCtx: ctx, - bs: bs.Blockstore(), + ses: exch, + sessCtx: ctx, + bs: bs.Blockstore(), + notifier: exch, } } @@ -214,7 +216,7 @@ func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, e ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - var f func() exchange.Interface + var f func() notifiableFetcher if s.exchange != nil { f = s.getExchange } @@ -222,11 +224,11 @@ func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, e return getBlock(ctx, c, s.blockstore, f) // hash security } -func (s *blockService) getExchange() exchange.Interface { +func (s *blockService) getExchange() notifiableFetcher { return s.exchange } -func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget func() exchange.Interface) (blocks.Block, error) { +func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget func() notifiableFetcher) (blocks.Block, error) { err := verifcid.ValidateCid(c) // hash security if err != nil { return nil, err @@ -271,7 +273,7 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan block ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks") defer span.End() - var f func() exchange.Interface + var f func() notifiableFetcher if s.exchange != nil { f = s.getExchange } @@ -279,7 +281,7 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan block return getBlocks(ctx, ks, s.blockstore, f) // hash security } -func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget func() exchange.Interface) <-chan blocks.Block { +func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget func() notifiableFetcher) <-chan blocks.Block { out := make(chan blocks.Block) go func() { @@ -397,24 +399,53 @@ func (s *blockService) Close() error { return s.exchange.Close() } +type notifier interface { + NotifyNewBlocks(context.Context, ...blocks.Block) error +} + // Session is a helper type to provide higher level access to bitswap sessions type Session struct { - bs blockstore.Blockstore - ses exchange.Fetcher - sessEx exchange.SessionExchange - sessCtx context.Context - lk sync.Mutex + bs blockstore.Blockstore + ses exchange.Fetcher + sessEx exchange.SessionExchange + sessCtx context.Context + notifier notifier + lk sync.Mutex } -func (s *Session) getSession() exchange.Interface { +type notifiableFetcher interface { + exchange.Fetcher + notifier +} + +type notifiableFetcherWrapper struct { + exchange.Fetcher + notifier +} + +func (s *Session) getSession() notifiableFetcher { s.lk.Lock() defer s.lk.Unlock() if s.ses == nil { s.ses = s.sessEx.NewSession(s.sessCtx) } - // TODO: don't do that - return s.ses.(exchange.Interface) + return notifiableFetcherWrapper{s.ses, s.notifier} +} + +func (s *Session) getExchange() notifiableFetcher { + return notifiableFetcherWrapper{s.ses, s.notifier} +} + +func (s *Session) getFetcherFactory() func() notifiableFetcher { + if s.sessEx != nil { + return s.getSession + } + if s.ses != nil { + // Our exchange isn't session compatible, let's fallback to non sessions fetches + return s.getExchange + } + return nil } // GetBlock gets a block in the context of a request session @@ -422,11 +453,7 @@ func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() - var f func() exchange.Interface - if s.sessEx != nil { - f = s.getSession - } - return getBlock(ctx, c, s.bs, f) // hash security + return getBlock(ctx, c, s.bs, s.getFetcherFactory()) // hash security } // GetBlocks gets blocks in the context of a request session @@ -434,11 +461,7 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo ctx, span := internal.StartSpan(ctx, "Session.GetBlocks") defer span.End() - var f func() exchange.Interface - if s.sessEx != nil { - f = s.getSession - } - return getBlocks(ctx, ks, s.bs, f) // hash security + return getBlocks(ctx, ks, s.bs, s.getFetcherFactory()) // hash security } var _ BlockGetter = (*Session)(nil)