Skip to content

Commit

Permalink
Merge pull request #1 from Jorropo/fix-sessions
Browse files Browse the repository at this point in the history
Fix sessions
  • Loading branch information
MichaelMure authored Jul 27, 2022
2 parents f2a4f4f + c72dade commit 06e2b5d
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 40 deletions.
83 changes: 53 additions & 30 deletions blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,18 @@ func NewSession(ctx context.Context, bs BlockService) *Session {
exch := bs.Exchange()
if sessEx, ok := exch.(exchange.SessionExchange); ok {
return &Session{
sessCtx: ctx,
ses: nil,
sessEx: sessEx,
bs: bs.Blockstore(),
sessCtx: ctx,
ses: nil,
sessEx: sessEx,
bs: bs.Blockstore(),
notifier: exch,
}
}
return &Session{
ses: exch,
sessCtx: ctx,
bs: bs.Blockstore(),
ses: exch,
sessCtx: ctx,
bs: bs.Blockstore(),
notifier: exch,
}
}

Expand Down Expand Up @@ -214,19 +216,19 @@ func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, e
ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

var f func() exchange.Interface
var f func() notifiableFetcher
if s.exchange != nil {
f = s.getExchange
}

return getBlock(ctx, c, s.blockstore, f) // hash security
}

func (s *blockService) getExchange() exchange.Interface {
func (s *blockService) getExchange() notifiableFetcher {
return s.exchange
}

func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget func() exchange.Interface) (blocks.Block, error) {
func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget func() notifiableFetcher) (blocks.Block, error) {
err := verifcid.ValidateCid(c) // hash security
if err != nil {
return nil, err
Expand Down Expand Up @@ -271,15 +273,15 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan block
ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks")
defer span.End()

var f func() exchange.Interface
var f func() notifiableFetcher
if s.exchange != nil {
f = s.getExchange
}

return getBlocks(ctx, ks, s.blockstore, f) // hash security
}

func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget func() exchange.Interface) <-chan blocks.Block {
func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget func() notifiableFetcher) <-chan blocks.Block {
out := make(chan blocks.Block)

go func() {
Expand Down Expand Up @@ -397,48 +399,69 @@ func (s *blockService) Close() error {
return s.exchange.Close()
}

type notifier interface {
NotifyNewBlocks(context.Context, ...blocks.Block) error
}

// Session is a helper type to provide higher level access to bitswap sessions
type Session struct {
bs blockstore.Blockstore
ses exchange.Fetcher
sessEx exchange.SessionExchange
sessCtx context.Context
lk sync.Mutex
bs blockstore.Blockstore
ses exchange.Fetcher
sessEx exchange.SessionExchange
sessCtx context.Context
notifier notifier
lk sync.Mutex
}

func (s *Session) getSession() exchange.Interface {
type notifiableFetcher interface {
exchange.Fetcher
notifier
}

type notifiableFetcherWrapper struct {
exchange.Fetcher
notifier
}

func (s *Session) getSession() notifiableFetcher {
s.lk.Lock()
defer s.lk.Unlock()
if s.ses == nil {
s.ses = s.sessEx.NewSession(s.sessCtx)
}

// TODO: don't do that
return s.ses.(exchange.Interface)
return notifiableFetcherWrapper{s.ses, s.notifier}
}

func (s *Session) getExchange() notifiableFetcher {
return notifiableFetcherWrapper{s.ses, s.notifier}
}

func (s *Session) getFetcherFactory() func() notifiableFetcher {
if s.sessEx != nil {
return s.getSession
}
if s.ses != nil {
// Our exchange isn't session compatible, let's fallback to non sessions fetches
return s.getExchange
}
return nil
}

// GetBlock gets a block in the context of a request session
func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
defer span.End()

var f func() exchange.Interface
if s.sessEx != nil {
f = s.getSession
}
return getBlock(ctx, c, s.bs, f) // hash security
return getBlock(ctx, c, s.bs, s.getFetcherFactory()) // hash security
}

// GetBlocks gets blocks in the context of a request session
func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
ctx, span := internal.StartSpan(ctx, "Session.GetBlocks")
defer span.End()

var f func() exchange.Interface
if s.sessEx != nil {
f = s.getSession
}
return getBlocks(ctx, ks, s.bs, f) // hash security
return getBlocks(ctx, ks, s.bs, s.getFetcherFactory()) // hash security
}

var _ BlockGetter = (*Session)(nil)
8 changes: 2 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ require (
github.com/ipfs/go-ipfs-blockstore v1.2.0
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-delay v0.0.1
github.com/ipfs/go-ipfs-exchange-interface v0.1.0
github.com/ipfs/go-ipfs-exchange-offline v0.2.0
github.com/ipfs/go-ipfs-exchange-interface v0.2.0
github.com/ipfs/go-ipfs-exchange-offline v0.3.0
github.com/ipfs/go-ipfs-routing v0.2.1
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-ipld-format v0.3.0
Expand All @@ -21,10 +21,6 @@ require (
go.opentelemetry.io/otel/trace v1.7.0
)

replace github.com/ipfs/go-ipfs-exchange-interface => github.com/MichaelMure/go-ipfs-exchange-interface v0.0.2-0.20220713142804-1181846dc171

replace github.com/ipfs/go-ipfs-exchange-offline => github.com/MichaelMure/go-ipfs-exchange-offline v0.0.2-0.20220714102739-4b7a20c758a9

replace github.com/ipfs/go-bitswap => github.com/MichaelMure/go-bitswap v0.2.20-0.20220714225615-2c2a46194c4e

require (
Expand Down
9 changes: 5 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/MichaelMure/go-bitswap v0.2.20-0.20220714225615-2c2a46194c4e h1:LBNoZYGAIHUWfyolHrs9PH4M6kd3ze9MenhHdZXyHYc=
github.com/MichaelMure/go-bitswap v0.2.20-0.20220714225615-2c2a46194c4e/go.mod h1:EeOjh6Xi0IWQfdTj5LqScnXnxQtg9k4lshWjYCGSghc=
github.com/MichaelMure/go-ipfs-exchange-interface v0.0.2-0.20220713142804-1181846dc171 h1:J6IkkSKshHms3yQEOrNK/7B2YcCJ6ZbyDDmaXHwOj4Y=
github.com/MichaelMure/go-ipfs-exchange-interface v0.0.2-0.20220713142804-1181846dc171/go.mod h1:z6+RhJuDQbqKguVyslSOuVDhqF9JtTrO3eptSAiW2/Y=
github.com/MichaelMure/go-ipfs-exchange-offline v0.0.2-0.20220714102739-4b7a20c758a9 h1:R4PGpFAUOd3tJjFAJLu8dPyGOLufuICel3mZ9uwtYGo=
github.com/MichaelMure/go-ipfs-exchange-offline v0.0.2-0.20220714102739-4b7a20c758a9/go.mod h1:6SxfEhzkc8ZX1yCWGL8owaravbq3xUF28yGF13VSQ7c=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
Expand Down Expand Up @@ -293,6 +289,11 @@ github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG
github.com/ipfs/go-ipfs-ds-help v0.1.1/go.mod h1:SbBafGJuGsPI/QL3j9Fc5YPLeAu+SzOkI0gFwAg+mOs=
github.com/ipfs/go-ipfs-ds-help v1.1.0 h1:yLE2w9RAsl31LtfMt91tRZcrx+e61O5mDxFRR994w4Q=
github.com/ipfs/go-ipfs-ds-help v1.1.0/go.mod h1:YR5+6EaebOhfcqVCyqemItCLthrpVNot+rsOU/5IatU=
github.com/ipfs/go-ipfs-exchange-interface v0.1.0/go.mod h1:ych7WPlyHqFvCi/uQI48zLZuAWVP5iTQPXEfVaw5WEI=
github.com/ipfs/go-ipfs-exchange-interface v0.2.0 h1:8lMSJmKogZYNo2jjhUs0izT+dck05pqUw4mWNW9Pw6Y=
github.com/ipfs/go-ipfs-exchange-interface v0.2.0/go.mod h1:z6+RhJuDQbqKguVyslSOuVDhqF9JtTrO3eptSAiW2/Y=
github.com/ipfs/go-ipfs-exchange-offline v0.3.0 h1:c/Dg8GDPzixGd0MC8Jh6mjOwU57uYokgWRFidfvEkuA=
github.com/ipfs/go-ipfs-exchange-offline v0.3.0/go.mod h1:MOdJ9DChbb5u37M1IcbrRB02e++Z7521fMxqCNRrz9s=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-routing v0.2.1 h1:E+whHWhJkdN9YeoHZNj5itzc+OR292AJ2uE9FFiW0BY=
Expand Down

0 comments on commit 06e2b5d

Please sign in to comment.